···2222 atproto.NSIDGrinder,
2323 atproto.NSIDBrewer,
2424 atproto.NSIDLike,
2525+ atproto.NSIDComment,
2526}
26272728// Config holds configuration for the Jetstream consumer
+4
internal/firehose/consumer.go
···349349 if err := c.index.UpsertLike(event.DID, commit.RKey, subjectURI); err != nil {
350350 log.Warn().Err(err).Str("did", event.DID).Str("subject", subjectURI).Msg("failed to index like")
351351 }
352352+ // Create notification for the like
353353+ c.index.CreateLikeNotification(event.DID, subjectURI)
352354 }
353355 }
354356 }
···379381 if err := c.index.UpsertComment(event.DID, commit.RKey, subjectURI, parentURI, commit.CID, text, createdAt); err != nil {
380382 log.Warn().Err(err).Str("did", event.DID).Str("subject", subjectURI).Msg("failed to index comment")
381383 }
384384+ // Create notification for the comment
385385+ c.index.CreateCommentNotification(event.DID, subjectURI, parentURI)
382386 }
383387 }
384388 }
+22-1
internal/firehose/index.go
···174174 BucketCommentCounts,
175175 BucketCommentsByActor,
176176 BucketCommentChildren,
177177+ BucketNotifications,
178178+ BucketNotificationsMeta,
177179 }
178180 for _, bucket := range buckets {
179181 if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
···13101312 existingSubject := commentsByActor.Get(actorKey)
13111313 isNew := existingSubject == nil
1312131413151315+ // If the comment already exists, delete the old entry from BucketComments
13161316+ // to prevent duplicates (the key includes timestamp which may differ between calls)
13171317+ if !isNew {
13181318+ oldPrefix := []byte(string(existingSubject) + ":")
13191319+ suffix := ":" + actorDID + ":" + rkey
13201320+ cur := comments.Cursor()
13211321+ for k, _ := cur.Seek(oldPrefix); k != nil && strings.HasPrefix(string(k), string(oldPrefix)); k, _ = cur.Next() {
13221322+ if strings.HasSuffix(string(k), suffix) {
13231323+ _ = comments.Delete(k)
13241324+ break
13251325+ }
13261326+ }
13271327+ }
13281328+13131329 // Extract parent rkey from parent URI if present
13141330 var parentRKey string
13151331 if parentURI != "" {
···1382139813831399 actorKey := []byte(actorDID + ":" + rkey)
1384140013851385- // Check if comment exists
14011401+ // Check if comment exists and get subject URI from index
13861402 existingSubject := commentsByActor.Get(actorKey)
13871403 if existingSubject == nil {
13881404 return nil // Comment doesn't exist, nothing to do
14051405+ }
14061406+14071407+ // Use the subject URI from the index if not provided
14081408+ if subjectURI == "" {
14091409+ subjectURI = string(existingSubject)
13891410 }
1390141113911412 // Find and delete the comment by iterating over comments with matching subject
+257
internal/firehose/notifications.go
···11+package firehose
22+33+import (
44+ "encoding/json"
55+ "fmt"
66+ "strings"
77+ "time"
88+99+ "arabica/internal/models"
1010+1111+ "github.com/rs/zerolog/log"
1212+ bolt "go.etcd.io/bbolt"
1313+)
1414+1515+// Bucket names for notifications
1616+var (
1717+ // BucketNotifications stores notifications: {target_did}:{inverted_timestamp}:{id} -> {Notification JSON}
1818+ BucketNotifications = []byte("notifications")
1919+2020+ // BucketNotificationsMeta stores per-user metadata: {target_did}:last_read -> {timestamp RFC3339}
2121+ BucketNotificationsMeta = []byte("notifications_meta")
2222+)
2323+2424+// CreateNotification stores a notification for the target user.
2525+// Deduplicates by (type + actorDID + subjectURI) to prevent duplicates from backfills.
2626+// Self-notifications (actorDID == targetDID) are silently skipped.
2727+func (idx *FeedIndex) CreateNotification(targetDID string, notif models.Notification) error {
2828+ if targetDID == "" || targetDID == notif.ActorDID {
2929+ return nil // skip self-notifications
3030+ }
3131+3232+ return idx.db.Update(func(tx *bolt.Tx) error {
3333+ b := tx.Bucket(BucketNotifications)
3434+3535+ // Deduplication: scan for existing notification with same type+actor+subject
3636+ prefix := []byte(targetDID + ":")
3737+ c := b.Cursor()
3838+ for k, v := c.Seek(prefix); k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = c.Next() {
3939+ var existing models.Notification
4040+ if err := json.Unmarshal(v, &existing); err != nil {
4141+ continue
4242+ }
4343+ if existing.Type == notif.Type && existing.ActorDID == notif.ActorDID && existing.SubjectURI == notif.SubjectURI {
4444+ return nil // duplicate, skip
4545+ }
4646+ }
4747+4848+ // Generate ID from timestamp
4949+ if notif.ID == "" {
5050+ notif.ID = fmt.Sprintf("%d", notif.CreatedAt.UnixNano())
5151+ }
5252+5353+ data, err := json.Marshal(notif)
5454+ if err != nil {
5555+ return fmt.Errorf("failed to marshal notification: %w", err)
5656+ }
5757+5858+ // Key: {target_did}:{inverted_timestamp}:{id} for reverse chronological order
5959+ inverted := ^uint64(notif.CreatedAt.UnixNano())
6060+ key := fmt.Sprintf("%s:%016x:%s", targetDID, inverted, notif.ID)
6161+ return b.Put([]byte(key), data)
6262+ })
6363+}
6464+6565+// GetNotifications returns notifications for a user, newest first.
6666+// Uses cursor-based pagination. Returns notifications, next cursor, and error.
6767+func (idx *FeedIndex) GetNotifications(targetDID string, limit int, cursor string) ([]models.Notification, string, error) {
6868+ var notifications []models.Notification
6969+ var nextCursor string
7070+7171+ if limit <= 0 {
7272+ limit = 20
7373+ }
7474+7575+ // Get last_read timestamp for marking read status
7676+ lastRead := idx.getLastRead(targetDID)
7777+7878+ err := idx.db.View(func(tx *bolt.Tx) error {
7979+ b := tx.Bucket(BucketNotifications)
8080+ c := b.Cursor()
8181+8282+ prefix := []byte(targetDID + ":")
8383+ var k, v []byte
8484+8585+ if cursor != "" {
8686+ // Seek to cursor position, then advance past it
8787+ k, v = c.Seek([]byte(cursor))
8888+ if k != nil && string(k) == cursor {
8989+ k, v = c.Next()
9090+ }
9191+ } else {
9292+ k, v = c.Seek(prefix)
9393+ }
9494+9595+ var lastKey []byte
9696+ count := 0
9797+ for ; k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = c.Next() {
9898+ if count >= limit {
9999+ // There are more items beyond our limit
100100+ nextCursor = string(lastKey)
101101+ break
102102+ }
103103+104104+ var notif models.Notification
105105+ if err := json.Unmarshal(v, ¬if); err != nil {
106106+ continue
107107+ }
108108+109109+ // Determine read status based on last_read timestamp
110110+ if !lastRead.IsZero() && !notif.CreatedAt.After(lastRead) {
111111+ notif.Read = true
112112+ }
113113+114114+ notifications = append(notifications, notif)
115115+ lastKey = make([]byte, len(k))
116116+ copy(lastKey, k)
117117+ count++
118118+ }
119119+120120+ return nil
121121+ })
122122+123123+ return notifications, nextCursor, err
124124+}
125125+126126+// GetUnreadCount returns the number of unread notifications for a user.
127127+func (idx *FeedIndex) GetUnreadCount(targetDID string) int {
128128+ if targetDID == "" {
129129+ return 0
130130+ }
131131+132132+ lastRead := idx.getLastRead(targetDID)
133133+134134+ var count int
135135+ _ = idx.db.View(func(tx *bolt.Tx) error {
136136+ b := tx.Bucket(BucketNotifications)
137137+ c := b.Cursor()
138138+139139+ prefix := []byte(targetDID + ":")
140140+ for k, v := c.Seek(prefix); k != nil && strings.HasPrefix(string(k), string(prefix)); k, v = c.Next() {
141141+ var notif models.Notification
142142+ if err := json.Unmarshal(v, ¬if); err != nil {
143143+ continue
144144+ }
145145+ // If no last_read set, all are unread
146146+ if lastRead.IsZero() || notif.CreatedAt.After(lastRead) {
147147+ count++
148148+ } else {
149149+ // Since keys are in reverse chronological order,
150150+ // once we hit a read notification, all remaining are also read
151151+ break
152152+ }
153153+ }
154154+ return nil
155155+ })
156156+157157+ return count
158158+}
159159+160160+// MarkAllRead updates the last_read timestamp to now for the user.
161161+func (idx *FeedIndex) MarkAllRead(targetDID string) error {
162162+ return idx.db.Update(func(tx *bolt.Tx) error {
163163+ b := tx.Bucket(BucketNotificationsMeta)
164164+ key := []byte(targetDID + ":last_read")
165165+ return b.Put(key, []byte(time.Now().Format(time.RFC3339Nano)))
166166+ })
167167+}
168168+169169+// getLastRead returns the last_read timestamp for a user.
170170+func (idx *FeedIndex) getLastRead(targetDID string) time.Time {
171171+ var lastRead time.Time
172172+ _ = idx.db.View(func(tx *bolt.Tx) error {
173173+ b := tx.Bucket(BucketNotificationsMeta)
174174+ v := b.Get([]byte(targetDID + ":last_read"))
175175+ if v != nil {
176176+ if t, err := time.Parse(time.RFC3339Nano, string(v)); err == nil {
177177+ lastRead = t
178178+ }
179179+ }
180180+ return nil
181181+ })
182182+ return lastRead
183183+}
184184+185185+// parseTargetDID extracts the DID from an AT-URI (at://did:plc:xxx/collection/rkey)
186186+func parseTargetDID(atURI string) string {
187187+ if !strings.HasPrefix(atURI, "at://") {
188188+ return ""
189189+ }
190190+ rest := atURI[5:] // strip "at://"
191191+ parts := strings.SplitN(rest, "/", 2)
192192+ if len(parts) == 0 {
193193+ return ""
194194+ }
195195+ did := parts[0]
196196+ if !strings.HasPrefix(did, "did:") {
197197+ return ""
198198+ }
199199+ return did
200200+}
201201+202202+// CreateLikeNotification creates a notification for a like event
203203+func (idx *FeedIndex) CreateLikeNotification(actorDID, subjectURI string) {
204204+ targetDID := parseTargetDID(subjectURI)
205205+ if targetDID == "" || targetDID == actorDID {
206206+ return
207207+ }
208208+209209+ notif := models.Notification{
210210+ Type: models.NotificationLike,
211211+ ActorDID: actorDID,
212212+ SubjectURI: subjectURI,
213213+ CreatedAt: time.Now(),
214214+ }
215215+216216+ if err := idx.CreateNotification(targetDID, notif); err != nil {
217217+ log.Warn().Err(err).Str("actor", actorDID).Str("subject", subjectURI).Msg("failed to create like notification")
218218+ }
219219+}
220220+221221+// CreateCommentNotification creates notifications for a comment event.
222222+// Notifies the brew owner (comment) and the parent comment author (reply).
223223+func (idx *FeedIndex) CreateCommentNotification(actorDID, subjectURI, parentURI string) {
224224+ now := time.Now()
225225+226226+ // Notify the brew owner
227227+ targetDID := parseTargetDID(subjectURI)
228228+ if targetDID != "" && targetDID != actorDID {
229229+ notif := models.Notification{
230230+ Type: models.NotificationComment,
231231+ ActorDID: actorDID,
232232+ SubjectURI: subjectURI,
233233+ CreatedAt: now,
234234+ }
235235+ if err := idx.CreateNotification(targetDID, notif); err != nil {
236236+ log.Warn().Err(err).Str("actor", actorDID).Str("subject", subjectURI).Msg("failed to create comment notification")
237237+ }
238238+ }
239239+240240+ // If this is a reply, also notify the parent comment's author.
241241+ // We store the brew's subjectURI (not the parent comment URI) so the
242242+ // notification links directly to the brew page with comments.
243243+ if parentURI != "" {
244244+ parentAuthorDID := parseTargetDID(parentURI)
245245+ if parentAuthorDID != "" && parentAuthorDID != actorDID && parentAuthorDID != targetDID {
246246+ replyNotif := models.Notification{
247247+ Type: models.NotificationCommentReply,
248248+ ActorDID: actorDID,
249249+ SubjectURI: subjectURI, // brew URI, not parent comment URI
250250+ CreatedAt: now,
251251+ }
252252+ if err := idx.CreateNotification(parentAuthorDID, replyNotif); err != nil {
253253+ log.Warn().Err(err).Str("actor", actorDID).Str("parent", parentURI).Msg("failed to create reply notification")
254254+ }
255255+ }
256256+ }
257257+}