its for when you want to get like notifications for your reposts

feat!: rewrite notification type, add like record and profile record

ptr.pet cd597809 ff5169de

verified
+104 -116
+59 -36
server/main.go
··· 7 7 "log/slog" 8 8 "net/http" 9 9 "sync/atomic" 10 + "time" 10 11 11 12 "github.com/bluesky-social/indigo/api/bsky" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" ··· 25 26 const ListenTypeFollows = "follows" 26 27 27 28 type SubscriberData struct { 28 - ForActor syntax.DID 29 - Conn *websocket.Conn 30 - ListenType string 31 - ListenTo Set[syntax.DID] 29 + forActor syntax.DID 30 + conn *websocket.Conn 31 + listenType string 32 + listenTo Set[syntax.DID] 32 33 } 33 34 34 35 type ActorData struct { 35 - targets *hashmap.Map[string, *SubscriberData] 36 - likes map[syntax.RecordKey]bsky.FeedLike 37 - follows *hashmap.Map[syntax.RecordKey, bsky.GraphFollow] 38 - followsCursor atomic.Pointer[string] 36 + targets *hashmap.Map[string, *SubscriberData] 37 + likes map[syntax.RecordKey]bsky.FeedLike 38 + follows *hashmap.Map[syntax.RecordKey, bsky.GraphFollow] 39 + followsCursor atomic.Pointer[string] 40 + profile *bsky.ActorDefs_ProfileViewDetailed 41 + profileFetchedAt time.Time 42 + } 43 + 44 + type NotificationActor struct { 45 + DID syntax.DID `json:"did"` // the DID of the actor that (un)liked the post 46 + Profile *bsky.ActorDefs_ProfileViewDetailed `json:"profile"` // the detailed profile of the actor that (un)liked the post 39 47 } 40 48 41 49 type NotificationMessage struct { 42 - Liked bool `json:"liked"` 43 - ByDid syntax.DID `json:"did"` 44 - RepostURI syntax.ATURI `json:"repost_uri"` 45 - PostURI syntax.ATURI `json:"post_uri"` 50 + Liked bool `json:"liked"` // whether the message was liked or unliked 51 + Actor NotificationActor `json:"actor"` // information about the actor that (un)liked 52 + Record bsky.FeedLike `json:"record"` // the raw like record 53 + Time int64 `json:"time"` // when the like event came in 46 54 } 47 55 48 56 type SubscriberMessage struct { ··· 75 83 func getSubscriberDids() []string { 76 84 _dids := make(Set[string], subscribers.Len()) 77 85 subscribers.Range(func(s string, sd *SubscriberData) bool { 78 - _dids[string(sd.ForActor)] = struct{}{} 86 + _dids[string(sd.forActor)] = struct{}{} 79 87 return true 80 88 }) 81 89 dids := make([]string, 0, len(_dids)) ··· 159 167 160 168 ud := getActorData(did) 161 169 sd := &SubscriberData{ 162 - ForActor: did, 163 - Conn: conn, 164 - ListenType: listenType, 170 + forActor: did, 171 + conn: conn, 172 + listenType: listenType, 165 173 } 166 174 167 175 switch listenType { ··· 171 179 logger.Error("error fetching follows", "error", err) 172 180 return 173 181 } 174 - sd.ListenTo = make(Set[syntax.DID]) 182 + sd.listenTo = make(Set[syntax.DID]) 175 183 // use we have stored 176 184 ud.follows.Range(func(rk syntax.RecordKey, f bsky.GraphFollow) bool { 177 - sd.ListenTo[syntax.DID(f.Subject)] = struct{}{} 185 + sd.listenTo[syntax.DID(f.Subject)] = struct{}{} 178 186 return true 179 187 }) 180 188 if len(follows) > 0 { ··· 182 190 ud.followsCursor.Store((*string)(&follows[len(follows)-1].rkey)) 183 191 for _, f := range follows { 184 192 ud.follows.Insert(f.rkey, f.follow) 185 - sd.ListenTo[syntax.DID(f.follow.Subject)] = struct{}{} 193 + sd.listenTo[syntax.DID(f.follow.Subject)] = struct{}{} 186 194 } 187 195 } 188 196 logger.Info("fetched follows") 189 197 case ListenTypeNone: 190 - sd.ListenTo = make(Set[syntax.DID]) 198 + sd.listenTo = make(Set[syntax.DID]) 191 199 default: 192 200 http.Error(w, "invalid listen type", http.StatusBadRequest) 193 201 return 194 202 } 195 203 196 204 subscribers.Set(sid, sd) 197 - for listenDid := range sd.ListenTo { 205 + for listenDid := range sd.listenTo { 198 206 markActorForLikes(sid, sd, listenDid) 199 207 } 200 208 updateFollowStreamOpts() 201 209 // delete subscriber after we are done 202 210 defer func() { 203 - for listenDid := range sd.ListenTo { 211 + for listenDid := range sd.listenTo { 204 212 unmarkActorForLikes(sid, listenDid) 205 213 } 206 214 subscribers.Del(sid) ··· 219 227 switch msg.Type { 220 228 case "update_listen_to": 221 229 // only allow this if we arent managing listen to 222 - if sd.ListenType != ListenTypeNone { 230 + if sd.listenType != ListenTypeNone { 223 231 continue 224 232 } 225 233 ··· 229 237 break 230 238 } 231 239 // remove all current listens and add the ones the user requested 232 - for listenDid := range sd.ListenTo { 240 + for listenDid := range sd.listenTo { 233 241 unmarkActorForLikes(sid, listenDid) 234 - delete(sd.ListenTo, listenDid) 242 + delete(sd.listenTo, listenDid) 235 243 } 236 244 for _, listenDid := range innerMsg.ListenTo { 237 - sd.ListenTo[listenDid] = struct{}{} 245 + sd.listenTo[listenDid] = struct{}{} 238 246 markActorForLikes(sid, sd, listenDid) 239 247 } 240 248 } ··· 276 284 return nil 277 285 } 278 286 287 + logger := logger.With("actor", byDid, "type", "like") 288 + 279 289 deleted := event.Commit.Operation == models.CommitOperationDelete 280 290 rkey := syntax.RecordKey(event.Commit.RKey) 281 291 ··· 315 325 return err 316 326 } 317 327 ud.targets.Range(func(sid string, sd *SubscriberData) bool { 318 - if sd.ForActor != reposterDID { 328 + if sd.forActor != reposterDID { 319 329 return true 320 330 } 321 331 332 + if ud.profile == nil || time.Now().Sub(ud.profileFetchedAt) > time.Hour*24 { 333 + profile, err := fetchProfile(ctx, byDid) 334 + if err != nil { 335 + logger.Error("cant fetch profile", "error", err) 336 + } else { 337 + ud.profile = profile 338 + ud.profileFetchedAt = time.Now() 339 + } 340 + } 341 + 322 342 notification := NotificationMessage{ 323 - Liked: !deleted, 324 - ByDid: byDid, 325 - RepostURI: repostURI, 326 - PostURI: syntax.ATURI(like.Subject.Uri), 343 + Liked: !deleted, 344 + Actor: NotificationActor{ 345 + DID: byDid, 346 + Profile: ud.profile, 347 + }, 348 + Record: like, 349 + Time: event.TimeUS, 327 350 } 328 351 329 - if err := sd.Conn.WriteJSON(notification); err != nil { 330 - logger.Error("failed to send notification", "subscriber", sd.ForActor, "error", err) 352 + if err := sd.conn.WriteJSON(notification); err != nil { 353 + logger.Error("failed to send notification", "error", err) 331 354 } 332 355 return true 333 356 }) ··· 369 392 } 370 393 ud.targets.Range(func(sid string, sd *SubscriberData) bool { 371 394 // if we arent managing then we dont need to update anything 372 - if sd.ListenType != ListenTypeFollows { 395 + if sd.listenType != ListenTypeFollows { 373 396 return true 374 397 } 375 398 subjectDid := syntax.DID(r.Subject) 376 399 if deleted { 377 400 unmarkActorForLikes(sid, subjectDid) 378 - delete(sd.ListenTo, subjectDid) 401 + delete(sd.listenTo, subjectDid) 379 402 } else { 380 - sd.ListenTo[subjectDid] = struct{}{} 403 + sd.listenTo[subjectDid] = struct{}{} 381 404 markActorForLikes(sid, sd, subjectDid) 382 405 } 383 406 return true
+2 -11
server/xrpc.go
··· 10 10 "github.com/bluesky-social/indigo/atproto/identity" 11 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 12 "github.com/bluesky-social/indigo/xrpc" 13 - "github.com/bluesky-social/jetstream/pkg/models" 14 13 ) 15 14 16 15 func findUserPDS(ctx context.Context, did syntax.DID) (string, error) { ··· 26 25 return pdsURI, nil 27 26 } 28 27 29 - func fetchRecord[v any](ctx context.Context, xrpcClient *xrpc.Client, val *v, event *models.Event) error { 30 - out, err := atproto.RepoGetRecord(ctx, xrpcClient, "", event.Commit.Collection, event.Did, event.Commit.RKey) 31 - if err != nil { 32 - return err 33 - } 34 - raw, _ := out.Value.MarshalJSON() 35 - if err := json.Unmarshal(raw, val); err != nil { 36 - return err 37 - } 38 - return nil 28 + func fetchProfile(ctx context.Context, did syntax.DID) (*bsky.ActorDefs_ProfileViewDetailed, error) { 29 + return bsky.ActorGetProfile(ctx, &xrpc.Client{Host: "https://public.api.bsky.app"}, string(did)) 39 30 } 40 31 41 32 func fetchRecords[v any](ctx context.Context, xrpcClient *xrpc.Client, cb func(syntax.ATURI, v), cursor *string, collection string, did syntax.DID) error {
+20 -61
webapp/src/ActivityItem.tsx
··· 1 - import { Did, Handle } from "@atcute/lexicons"; 2 - import { Client, ok, simpleFetchHandler } from "@atcute/client"; 3 1 import type {} from "@atcute/bluesky"; 4 2 import type {} from "@atcute/atproto"; 5 - import { isDid, parseCanonicalResourceUri } from "@atcute/lexicons/syntax"; 3 + import { parseCanonicalResourceUri } from "@atcute/lexicons/syntax"; 6 4 import { Component, createSignal, createEffect } from "solid-js"; 7 - import { ATProtoActivity } from "./types.js"; 8 - 9 - const profileCache = new Map<string, ActorData>(); 10 - const handler = simpleFetchHandler({ 11 - service: "https://public.api.bsky.app", 12 - }); 13 - const rpc = new Client({ handler }); 5 + import { Notification } from "./types.js"; 14 6 15 7 interface ActivityItemProps { 16 - data: ATProtoActivity; 17 - } 18 - 19 - interface ActorData { 20 - did: Did; 21 - handle: Handle; 22 - displayName?: string; 8 + data: Notification; 23 9 } 24 10 25 11 export const ActivityItem: Component<ActivityItemProps> = (props) => { 26 - const [actorData, setActorData] = createSignal<ActorData | null>(null); 27 - const [error, setError] = createSignal<string | null>(null); 28 12 const [postUrl, setPostUrl] = createSignal<string | null>(null); 29 13 30 - const fetchProfile = async (did: string) => { 31 - if (!isDid(did)) { 32 - setError("user DID invalid"); 33 - return; 34 - } 35 - const resp = ok( 36 - await rpc.get("app.bsky.actor.getProfile", { 37 - params: { actor: did }, 38 - }), 39 - ); 40 - const data: ActorData = { 41 - did, 42 - handle: resp.handle, 43 - displayName: resp.displayName, 44 - }; 45 - setActorData(data); 46 - profileCache.set(data.did, data); 47 - }; 14 + const profile = props.data.actor.profile; 48 15 49 16 createEffect(() => { 50 - const actorData = profileCache.get(props.data.did); 51 - if (actorData) { 52 - setActorData(actorData); 53 - } else { 54 - fetchProfile(props.data.did); 55 - } 56 - 57 - const postUri = parseCanonicalResourceUri(props.data.post_uri); 17 + const postUri = parseCanonicalResourceUri( 18 + props.data.record.subject?.uri ?? "", 19 + ); 58 20 if (postUri.ok) { 59 21 setPostUrl( 60 22 `https://bsky.app/profile/${postUri.value.repo}/post/${postUri.value.rkey}`, ··· 72 34 > 73 35 <p text-wrap> 74 36 <span text-lg>{props.data.liked ? "❤️" : "💔"}</span>{" "} 75 - {(actorData() && ( 37 + {(profile && ( 76 38 <span font-medium text="sm gray-700"> 77 - {actorData()!.displayName ?? actorData()!.handle}{" "} 78 - {actorData()!.displayName && ( 39 + {profile!.displayName ?? profile!.handle}{" "} 40 + {profile!.displayName && ( 79 41 <span font-normal text-gray-500> 80 - (@{actorData()!.handle}) 42 + (@{profile!.handle}) 81 43 </span> 82 44 )} 83 45 </span> 84 - )) || 85 - (error() && ( 86 - <span italic text="xs red-500"> 87 - !{error()}! 88 - </span> 89 - )) || ( 90 - <span font-medium text="sm gray-700"> 91 - {props.data.did} 92 - </span> 93 - )}{" "} 46 + )) || ( 47 + <span font-medium text="sm gray-700"> 48 + {props.data.actor.did} 49 + </span> 50 + )}{" "} 94 51 <span text-gray-800>{props.data.liked ? "liked" : "unliked"}</span>{" "} 95 52 <a 96 53 text-blue-800 97 54 hover="underline text-blue-400" 98 - href={postUrl() ?? props.data.post_uri} 55 + href={postUrl() ?? props.data.record.subject?.uri} 99 56 > 100 57 this post 101 58 </a>{" "} 102 59 </p> 103 60 <div grow /> 104 - <div text="xs gray-500 end">{new Date().toLocaleTimeString()}</div> 61 + <div text="xs gray-500 end"> 62 + {new Date(props.data.time / 1000).toLocaleTimeString()} 63 + </div> 105 64 </div> 106 65 ); 107 66 };
+10 -4
webapp/src/App.tsx
··· 4 4 import type {} from "@atcute/atproto"; 5 5 import { isDid, isHandle } from "@atcute/lexicons/syntax"; 6 6 import { XrpcHandleResolver } from "@atcute/identity-resolver"; 7 - import { ATProtoActivity } from "./types.js"; 7 + import { Notification } from "./types.js"; 8 8 import { ActivityItem } from "./ActivityItem.jsx"; 9 9 10 10 const handleResolver = new XrpcHandleResolver({ ··· 15 15 const [actorId, setActorId] = createSignal<string>(""); 16 16 const [serviceDomain, setWsUrl] = createSignal<string>("likes.gaze.systems"); 17 17 const [isConnected, setIsConnected] = createSignal<boolean>(false); 18 - const [items, setItems] = createSignal<ATProtoActivity[]>([]); 18 + const [items, setItems] = createSignal<Notification[]>([]); 19 19 const [connectionStatus, setConnectionStatus] = createSignal< 20 20 "disconnected" | "connecting..." | "connected" | "error" 21 21 >("disconnected"); ··· 62 62 ws.close(); 63 63 } 64 64 65 - const url = `wss://${host}/subscribe/${did}`; 65 + let proto = "wss"; 66 + const domain = host.split(":").at(0) ?? ""; 67 + if (["localhost", "0.0.0.0", "127.0.0.1"].some((v) => v === domain)) { 68 + proto = "ws"; 69 + } 70 + 71 + const url = `${proto}://${host}/subscribe/${did}`; 66 72 67 73 try { 68 74 ws = new WebSocket(url); ··· 76 82 77 83 ws.onmessage = (event: MessageEvent) => { 78 84 try { 79 - const data: ATProtoActivity = JSON.parse(event.data); 85 + const data: Notification = JSON.parse(event.data); 80 86 setItems((prev) => [data, ...prev]); // add new items to the top 81 87 } catch (error) { 82 88 console.error("Error parsing JSON:", error);
+13 -4
webapp/src/types.ts
··· 1 - export interface ATProtoActivity { 2 - did: string; 1 + import { AppBskyFeedLike } from "@atcute/bluesky"; 2 + import { ProfileViewDetailed } from "@atcute/bluesky/types/app/actor/defs"; 3 + import { Did } from "@atcute/lexicons"; 4 + 5 + export interface Notification { 3 6 liked: boolean; 4 - repost_uri: string; 5 - post_uri: string; 7 + actor: NotificationActor; 8 + record: AppBskyFeedLike.Main; 9 + time: number; 10 + } 11 + 12 + export interface NotificationActor { 13 + did: Did; 14 + profile?: ProfileViewDetailed; 6 15 }