bsky feeds about music music-atmosphere-feed.plyr.fm/
bsky feed zig

add socket read timeout for auto-reconnection

turbostream sends data constantly, so 2 minutes of silence means
the connection is dead. set a socket read timeout so a stalled read
fails and triggers a reconnect instead of hanging forever.

also update zat dependency to new location (zat.dev/zat)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+563 -128
+2 -2
build.zig.zon
··· 13 13 .hash = "zqlite-0.0.0-RWLaY_y_mADh2LdbDrG_2HT2dBAcsAR8Jig_7-dOJd0B", 14 14 }, 15 15 .zat = .{ 16 - .url = "https://tangled.org/zzstoatzz.io/zat/archive/main", 17 - .hash = "zat-0.0.2-5PuC7rVbAQDTFAVjoBrKZZ2RCF1v3OuK2n4wIBABnJpM", 16 + .url = "https://tangled.sh/zat.dev/zat/archive/main", 17 + .hash = "zat-0.1.0-5PuC7heIAQA4j2UVmJT-oivQh5AwZTrFQ-NC4CJi2-_R", 18 18 }, 19 19 }, 20 20 .paths = .{
+3 -2
justfile
··· 18 18 fly logs 19 19 20 20 # publish feed to bluesky (requires HANDLE, PASSWORD, FEED_HOSTNAME in .env) 21 - publish: 22 - set -a && source .env && uv run scripts/publish.py 21 + # usage: just publish [--organic | --following | --all] 22 + publish *ARGS: 23 + set -a && source .env && uv run scripts/publish.py {{ARGS}}
+7
scripts/publish.py
··· 9 9 usage: 10 10 HANDLE=you.bsky.social PASSWORD=xxx FEED_HOSTNAME=your.domain ./scripts/publish.py 11 11 HANDLE=you.bsky.social PASSWORD=xxx FEED_HOSTNAME=your.domain ./scripts/publish.py --following 12 + HANDLE=you.bsky.social PASSWORD=xxx FEED_HOSTNAME=your.domain ./scripts/publish.py --organic 12 13 HANDLE=you.bsky.social PASSWORD=xxx FEED_HOSTNAME=your.domain ./scripts/publish.py --all 13 14 """ 14 15 ··· 25 26 "music-following": { 26 27 "display_name": "music (following)", 27 28 "description": "music posts from people you follow | https://music-atmosphere-feed.plyr.fm", 29 + }, 30 + "music-organic": { 31 + "display_name": "music (organic)", 32 + "description": "music posts, excluding high-volume bots | https://music-atmosphere-feed.plyr.fm", 28 33 }, 29 34 } 30 35 ··· 62 67 feeds_to_publish = list(FEEDS.keys()) 63 68 elif "--following" in sys.argv: 64 69 feeds_to_publish = ["music-following"] 70 + elif "--organic" in sys.argv: 71 + feeds_to_publish = ["music-organic"] 65 72 else: 66 73 # default: just the main feed (backwards compatible) 67 74 record_name = os.environ.get("FEED_RECORD_NAME", "music-atmosphere")
+14
src/feed/config.zig
··· 13 13 pub const FeedType = enum { 14 14 all, // music-atmosphere: all music posts 15 15 following, // music-following: only from people you follow 16 + organic, // music-organic: excludes high-volume bot posters 16 17 }; 17 18 18 19 var service_did_buf: [256]u8 = undefined; 19 20 var feed_uri_buf: [512]u8 = undefined; 20 21 var feed_following_uri_buf: [512]u8 = undefined; 22 + var feed_organic_uri_buf: [512]u8 = undefined; 21 23 var initialized = false; 22 24 var cached_service_did: []const u8 = ""; 23 25 var cached_feed_uri: []const u8 = ""; 24 26 var cached_feed_following_uri: []const u8 = ""; 27 + var cached_feed_organic_uri: []const u8 = ""; 25 28 26 29 fn ensureInit() void { 27 30 if (initialized) return; ··· 38 41 39 42 // build following feed URI 40 43 cached_feed_following_uri = std.fmt.bufPrint(&feed_following_uri_buf, "at://{s}/app.bsky.feed.generator/music-following", .{publisher_did}) catch ""; 44 + 45 + // build organic feed URI 46 + cached_feed_organic_uri = std.fmt.bufPrint(&feed_organic_uri_buf, "at://{s}/app.bsky.feed.generator/music-organic", .{publisher_did}) catch ""; 41 47 42 48 initialized = true; 43 49 } ··· 77 83 return cached_feed_following_uri; 78 84 } 79 85 86 + pub fn getFeedOrganicUri() []const u8 { 87 + ensureInit(); 88 + return cached_feed_organic_uri; 89 + } 90 + 80 91 /// check if a feed URI is supported and return its type 81 92 pub fn getFeedType(feed_uri: []const u8) ?FeedType { 82 93 ensureInit(); ··· 85 96 } 86 97 if (std.mem.eql(u8, feed_uri, cached_feed_following_uri)) { 87 98 return .following; 99 + } 100 + if (std.mem.eql(u8, feed_uri, cached_feed_organic_uri)) { 101 + return .organic; 88 102 } 89 103 return null; 90 104 }
+40 -57
src/feed/filter.zig
··· 5 5 const stats = @import("../server/stats.zig"); 6 6 7 7 const Record = json.ObjectMap; 8 - const Filter = *const fn (Record) ?bool; 9 8 10 - /// filters that can exclude posts (returning false = exclude, null = no opinion) 11 - const exclusion_filters: []const Filter = &.{ 12 - excludeNsfwLabels, 13 - }; 9 + /// returns true if the post should be included in the feed. 10 + /// hydrated contains turbostream's hydrated_metadata with quoted/parent posts 11 + pub fn matches(record: Record, hydrated: ?json.Value) bool { 12 + // exclusion filters 13 + if (excludeNsfwLabels(record) == false) return false; 14 14 15 - /// filters that can include posts (returning true = include, null = no opinion) 16 - const inclusion_filters: []const Filter = &.{ 17 - includeMusicLinks, 18 - }; 15 + // inclusion filters - check record and hydrated metadata 16 + if (includeMusicLinks(record) == true) return true; 17 + if (hydratedHasMusicLinks(hydrated)) return true; 19 18 20 - /// returns true if the post should be included in the feed. 
21 - pub fn matches(record: Record) bool { 22 - for (exclusion_filters) |f| { 23 - if (f(record) == false) return false; 24 - } 25 - for (inclusion_filters) |f| { 26 - if (f(record) == true) return true; 27 - } 28 19 return false; 29 20 } 30 21 ··· 75 66 if (containsMusicDomain(uri)) return true; 76 67 } 77 68 78 - // check quoted post's embed (embed.record.value.embed.external.uri) 79 - if (zat.json.getString(val, "embed.record.value.embed.external.uri")) |uri| { 80 - if (containsMusicDomain(uri)) return true; 81 - } 82 - 83 - // check recordWithMedia quoted post (embed.record.record.value.embed.external.uri) 84 - if (zat.json.getString(val, "embed.record.record.value.embed.external.uri")) |uri| { 85 - if (containsMusicDomain(uri)) return true; 86 - } 87 - 88 69 // check facets (inline links) 89 70 if (zat.json.getArray(val, "facets")) |facets| { 90 71 if (checkFacetsForMusic(facets)) return true; 91 72 } 92 73 93 - // check quoted post's facets 94 - if (zat.json.getArray(val, "embed.record.value.facets")) |facets| { 95 - if (checkFacetsForMusic(facets)) return true; 96 - } 97 - 74 + // note: quoted post content is in hydrated_metadata, checked by hydratedHasMusicLinks 98 75 return null; 99 76 } 100 77 ··· 111 88 return false; 112 89 } 113 90 91 + /// check turbostream's hydrated_metadata for music links in quoted posts 92 + fn hydratedHasMusicLinks(hydrated: ?json.Value) bool { 93 + const h = hydrated orelse return false; 94 + 95 + // turbostream hydrates quoted post as "quote_post" (postView structure) in hydrated_metadata 96 + // the actual post record is in quote_post.record 97 + // check quote_post.record.embed.external.uri 98 + if (zat.json.getString(h, "quote_post.record.embed.external.uri")) |uri| { 99 + if (containsMusicDomain(uri)) return true; 100 + } 101 + 102 + // check quote_post.record.facets for inline links 103 + if (zat.json.getArray(h, "quote_post.record.facets")) |facets| { 104 + if (checkFacetsForMusic(facets)) return true; 105 + } 106 + 107 + 
return false; 108 + } 109 + 114 110 /// returns true if the post matched because it quotes a music post (not direct link) 115 - pub fn isQuoteMatch(record: Record) bool { 111 + pub fn isQuoteMatch(record: Record, hydrated: ?json.Value) bool { 116 112 const val: json.Value = .{ .object = record }; 117 113 118 114 // if post has direct music links, it's not a quote match ··· 123 119 if (checkFacetsForMusic(facets)) return false; 124 120 } 125 121 126 - // check if music is in quoted content 127 - if (zat.json.getString(val, "embed.record.value.embed.external.uri")) |uri| { 128 - if (containsMusicDomain(uri)) return true; 129 - } 130 - if (zat.json.getArray(val, "embed.record.value.facets")) |facets| { 131 - if (checkFacetsForMusic(facets)) return true; 132 - } 133 - if (zat.json.getString(val, "embed.record.record.value.embed.external.uri")) |uri| { 134 - if (containsMusicDomain(uri)) return true; 135 - } 122 + // check hydrated_metadata for quoted post with music 123 + if (hydratedHasMusicLinks(hydrated)) return true; 136 124 137 125 return false; 138 126 } ··· 204 192 }; 205 193 206 194 /// detect all unique platforms in a post record 207 - pub fn detectAllPlatformsFromRecord(record: Record) PlatformSet { 195 + pub fn detectAllPlatformsFromRecord(record: Record, hydrated: ?json.Value) PlatformSet { 208 196 var result = PlatformSet{}; 209 197 const val: json.Value = .{ .object = record }; 210 198 211 199 // check embed.external.uri 212 200 if (zat.json.getString(val, "embed.external.uri")) |uri| { 213 - if (detectPlatform(uri)) |p| result.add(p); 214 - } 215 - 216 - // check quoted post's embed 217 - if (zat.json.getString(val, "embed.record.value.embed.external.uri")) |uri| { 218 - if (detectPlatform(uri)) |p| result.add(p); 219 - } 220 - 221 - // check recordWithMedia quoted post 222 - if (zat.json.getString(val, "embed.record.record.value.embed.external.uri")) |uri| { 223 201 if (detectPlatform(uri)) |p| result.add(p); 224 202 } 225 203 ··· 228 206 
detectPlatformsInFacets(facets, &result); 229 207 } 230 208 231 - // check quoted post's facets 232 - if (zat.json.getArray(val, "embed.record.value.facets")) |facets| { 233 - detectPlatformsInFacets(facets, &result); 209 + // check hydrated quoted post (quote_post is a postView, record is nested) 210 + if (hydrated) |h| { 211 + if (zat.json.getString(h, "quote_post.record.embed.external.uri")) |uri| { 212 + if (detectPlatform(uri)) |p| result.add(p); 213 + } 214 + if (zat.json.getArray(h, "quote_post.record.facets")) |facets| { 215 + detectPlatformsInFacets(facets, &result); 216 + } 234 217 } 235 218 236 219 return result;
+36 -7
src/jetstream.zig
··· 20 20 rkey: []const u8, 21 21 }; 22 22 23 + // socket read timeout: if no data for this long, reconnect 24 + // turbostream sends data constantly, so 2 minutes without data means dead connection 25 + const READ_TIMEOUT_MS: u32 = 120_000; 26 + 23 27 pub fn consumer(allocator: Allocator) void { 24 28 var backoff: u64 = 1; 25 29 const max_backoff: u64 = 60; 30 + var had_successful_connection = false; 26 31 27 32 while (true) { 28 - connect(allocator) catch |err| { 33 + had_successful_connection = false; 34 + 35 + connect(allocator, &had_successful_connection) catch |err| { 29 36 std.debug.print("turbostream error: {}, reconnecting in {}s...\n", .{ err, backoff }); 30 37 }; 38 + 39 + // if we received messages, connection was working - reset backoff 40 + if (had_successful_connection) { 41 + backoff = 1; 42 + } 43 + 31 44 posix.nanosleep(backoff, 0); 32 45 backoff = @min(backoff * 2, max_backoff); 33 46 } 34 47 } 35 48 36 - fn connect(allocator: Allocator) !void { 49 + fn connect(allocator: Allocator, had_successful_connection: *bool) !void { 37 50 const host = posix.getenv("TURBOSTREAM_HOST") orelse TURBOSTREAM_HOST; 38 51 const path = TURBOSTREAM_PATH; 39 52 ··· 58 71 return err; 59 72 }; 60 73 61 - std.debug.print("turbostream connected!\n", .{}); 74 + // set socket read timeout - if no data for READ_TIMEOUT_MS, read will fail 75 + // and we'll reconnect. turbostream sends data constantly so silence = dead connection 76 + client.readTimeout(READ_TIMEOUT_MS) catch |err| { 77 + std.debug.print("failed to set read timeout: {}\n", .{err}); 78 + }; 79 + 80 + std.debug.print("turbostream connected! 
(read timeout={}ms)\n", .{READ_TIMEOUT_MS}); 62 81 stats.get().recordConnected(); 63 82 64 - var handler = Handler{ .allocator = allocator }; 83 + var handler = Handler{ .allocator = allocator, .had_successful_connection = had_successful_connection }; 65 84 client.readLoop(&handler) catch |err| { 66 85 std.debug.print("websocket read loop error: {}\n", .{err}); 67 86 return err; ··· 70 89 71 90 const Handler = struct { 72 91 allocator: Allocator, 92 + had_successful_connection: *bool, 73 93 msg_count: usize = 0, 74 94 last_summary_time: i64 = 0, 75 95 76 96 pub fn serverMessage(self: *Handler, data: []const u8) !void { 77 97 self.msg_count += 1; 98 + self.had_successful_connection.* = true; // mark connection as working 78 99 79 100 self.processRecord(data) catch |err| { 80 101 if (err != error.NotAPost and err != error.NoMatch) { ··· 150 171 const post_record = commit.object.get("record") orelse return error.NotAPost; 151 172 if (post_record != .object) return error.NotAPost; 152 173 174 + // get hydrated_metadata for quoted/parent posts 175 + const hydrated = record_obj.get("hydrated_metadata"); 176 + 153 177 // check if post matches filter criteria 154 - if (!filter.matches(post_record.object)) return error.NoMatch; 178 + if (!filter.matches(post_record.object, hydrated)) return error.NoMatch; 155 179 156 180 // get uri from turbostream (it provides at_uri directly) 157 181 const at_uri = record_obj.get("at_uri") orelse return error.NotAPost; ··· 166 190 const s = stats.get(); 167 191 s.recordMatch(); 168 192 193 + // track poster 194 + if (record_obj.get("did")) |did_val| { 195 + if (did_val == .string) s.recordPoster(did_val.string); 196 + } 197 + 169 198 // track all platforms in post 170 - const platforms = filter.detectAllPlatformsFromRecord(post_record.object); 199 + const platforms = filter.detectAllPlatformsFromRecord(post_record.object, hydrated); 171 200 if (platforms.soundcloud) s.recordPlatform(.soundcloud); 172 201 if (platforms.bandcamp) 
s.recordPlatform(.bandcamp); 173 202 if (platforms.spotify) s.recordPlatform(.spotify); ··· 175 204 if (platforms.apple) s.recordPlatform(.apple); 176 205 177 206 // track match types 178 - if (filter.isQuoteMatch(post_record.object)) s.recordQuoteMatch(); 207 + if (filter.isQuoteMatch(post_record.object, hydrated)) s.recordQuoteMatch(); 179 208 if (platforms.count() >= 2) s.recordMultiPlatform(); 180 209 181 210 std.debug.print("added: {s}\n", .{at_uri.string});
+153 -12
src/server/dashboard.zig
··· 124 124 \\ justify-content: space-between; 125 125 \\ } 126 126 \\ footer a { color: #555; } 127 + \\ .poster-cards { display: flex; gap: 0.5rem; flex-wrap: wrap; margin-top: 0.5rem; } 128 + \\ .poster-card { 129 + \\ display: flex; 130 + \\ align-items: center; 131 + \\ gap: 0.4rem; 132 + \\ padding: 0.3rem 0.5rem; 133 + \\ background: #151515; 134 + \\ border-radius: 999px; 135 + \\ text-decoration: none; 136 + \\ transition: background 0.15s; 137 + \\ } 138 + \\ .poster-card:hover { background: #222; } 139 + \\ .poster-avatar { 140 + \\ border-radius: 50%; 141 + \\ background: #333; 142 + \\ flex-shrink: 0; 143 + \\ } 144 + \\ .poster-count { color: #666; font-size: 11px; } 145 + \\ .insight-row { margin-bottom: 0.4rem; } 146 + \\ .insight-row:last-child { margin-bottom: 0; } 147 + \\ .hint { border-bottom: 1px dotted #555; cursor: help; } 148 + \\ .subsection { color: #555; margin-top: 0.75rem; margin-bottom: 0.25rem; font-size: 11px; } 149 + \\ details { margin-bottom: 1.5rem; font-size: 12px; } 150 + \\ summary { color: #444; cursor: pointer; user-select: none; } 151 + \\ summary:hover { color: #666; } 152 + \\ summary::marker { color: #333; } 153 + \\ .stats-grid { display: grid; grid-template-columns: 1fr 1fr; gap: 1rem; margin-top: 0.75rem; } 154 + \\ .stats-group h4 { color: #555; font-size: 11px; font-weight: normal; margin-bottom: 0.4rem; } 155 + \\ .stats-group p { color: #666; margin: 0.2rem 0; } 156 + \\ .stats-group span { color: #888; } 127 157 \\ </style> 128 158 \\</head> 129 159 \\<body> ··· 143 173 \\ <div class="feed-info"> 144 174 \\ <div class="feed-name">music (following)</div> 145 175 \\ <div class="feed-desc">only from people you follow</div> 176 + \\ </div> 177 + \\ <svg viewBox="0 0 568 501"><path d="M123.121 33.664C188.241 82.553 258.281 181.68 284 234.873c25.719-53.192 95.759-152.32 160.879-201.21C491.866-1.611 568-28.906 568 57.947c0 17.346-9.945 145.713-15.778 166.555-20.275 72.453-94.155 90.933-159.875 79.748C507.222 323.8 
536.444 388.56 473.333 453.32c-119.86 122.992-172.272-30.859-185.702-70.281-2.462-7.227-3.614-10.608-3.631-7.733-.017-2.875-1.169.506-3.631 7.733-13.43 39.422-65.842 193.273-185.702 70.281-63.111-64.76-33.89-129.52 80.986-149.07-65.72 11.185-139.6-7.295-159.875-79.748C9.945 203.659 0 75.291 0 57.946 0-28.906 76.135-1.612 123.121 33.664Z"/></svg> 178 + \\ </a> 179 + \\ <a href="https://bsky.app/profile/did:plc:vs3hnzq2daqbszxlysywzy54/feed/music-organic" class="feed-item"> 180 + \\ <div class="feed-info"> 181 + \\ <div class="feed-name">music (organic)</div> 182 + \\ <div class="feed-desc">excludes high-volume bots</div> 146 183 \\ </div> 147 184 \\ <svg viewBox="0 0 568 501"><path d="M123.121 33.664C188.241 82.553 258.281 181.68 284 234.873c25.719-53.192 95.759-152.32 160.879-201.21C491.866-1.611 568-28.906 568 57.947c0 17.346-9.945 145.713-15.778 166.555-20.275 72.453-94.155 90.933-159.875 79.748C507.222 323.8 536.444 388.56 473.333 453.32c-119.86 122.992-172.272-30.859-185.702-70.281-2.462-7.227-3.614-10.608-3.631-7.733-.017-2.875-1.169.506-3.631 7.733-13.43 39.422-65.842 193.273-185.702 70.281-63.111-64.76-33.89-129.52 80.986-149.07-65.72 11.185-139.6-7.295-159.875-79.748C9.945 203.659 0 75.291 0 57.946 0-28.906 76.135-1.612 123.121 33.664Z"/></svg> 148 185 \\ </a> ··· 226 263 227 264 // match rate 228 265 const match_rate = s.getMatchRate(); 229 - try w.print("{d:.1} posts/hour", .{match_rate}); 266 + try w.print("<div class=\"insight-row\">{d:.1} posts/hour</div>", .{match_rate}); 230 267 231 - // quote matches 268 + // quote matches and multi-link 232 269 const quote_matches = s.getQuoteMatches(); 233 270 const multi_platform = s.getMultiPlatform(); 234 271 const matches = s.getMatches(); 235 272 236 273 if (matches > 0) { 274 + try w.writeAll("<div class=\"insight-row\">"); 275 + try w.print("<span class=\"hint\" title=\"posts that quote another post containing a music link\">{d} quotes</span>", .{quote_matches}); 237 276 try w.writeAll(" · "); 238 - if 
(quote_matches > 0) { 239 - const quote_pct: f64 = @as(f64, @floatFromInt(quote_matches)) / @as(f64, @floatFromInt(matches)) * 100.0; 240 - try w.print("{d} quotes ({d:.1}%)", .{ quote_matches, quote_pct }); 241 - } else { 242 - try w.writeAll("0 quotes"); 277 + try w.print("<span class=\"hint\" title=\"posts with links to multiple music platforms\">{d} multi-link</span>", .{multi_platform}); 278 + try w.writeAll("</div>"); 279 + } 280 + 281 + // top posters subsection 282 + try w.writeAll( 283 + \\<div class="subsection">top posters</div> 284 + ); 285 + 286 + // get top 5 posters 287 + const top = s.getTopPosters(5); 288 + const unique_count = s.getUniquePosterCount(); 289 + const concentration = s.getTopConcentration(5); 290 + const posters_since = s.getPostersSince(); 291 + 292 + if (unique_count > 0) { 293 + // show distribution insight with "since" qualifier 294 + try w.print("{d} unique · top 5 = {d:.0}%", .{ unique_count, concentration }); 295 + 296 + // format timestamp with day/time 297 + if (posters_since > 0) { 298 + const months = [_][]const u8{ "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 299 + const days = [_][]const u8{ "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat" }; 300 + const epoch_secs: u64 = @intCast(posters_since); 301 + const es = std.time.epoch.EpochSeconds{ .secs = epoch_secs }; 302 + const epoch_day = es.getEpochDay(); 303 + // epoch (Jan 1 1970) was Thursday (4), so: (day + 4) % 7 304 + const dow: usize = @intCast(@mod(epoch_day.day + 4, 7)); 305 + const yd = epoch_day.calculateYearDay(); 306 + const md = yd.calculateMonthDay(); 307 + const day_secs = es.getDaySeconds(); 308 + const hour = day_secs.getHoursIntoDay(); 309 + const min = day_secs.getMinutesIntoHour(); 310 + const hour_12 = if (hour == 0) 12 else if (hour > 12) hour - 12 else hour; 311 + const ampm: []const u8 = if (hour < 12) "am" else "pm"; 312 + try w.print(" <span style=\"color:#444\">(since {s} {s} {d} at {d}:{d:0>2}{s} 
UTC)</span>", .{ 313 + days[dow], 314 + months[md.month.numeric() - 1], 315 + md.day_index + 1, 316 + hour_12, 317 + min, 318 + ampm, 319 + }); 320 + } 321 + 322 + // find max count for relative sizing 323 + var max_count: u32 = 1; 324 + for (top) |entry| { 325 + if (entry.count > max_count) max_count = entry.count; 243 326 } 244 - try w.writeAll(" · "); 245 - if (multi_platform > 0) { 246 - try w.print("{d} multi-link", .{multi_platform}); 247 - } else { 248 - try w.writeAll("0 multi-link"); 327 + 328 + try w.writeAll( 329 + \\<div class="poster-cards"> 330 + ); 331 + 332 + // render avatar cards with relative sizing 333 + for (top) |entry| { 334 + if (entry.count == 0) continue; 335 + const did = entry.getDid(); 336 + // size: 20-36px based on relative post count 337 + const ratio: f64 = @as(f64, @floatFromInt(entry.count)) / @as(f64, @floatFromInt(max_count)); 338 + const size: u32 = 20 + @as(u32, @intFromFloat(ratio * 16)); 339 + try w.print( 340 + \\<a href="https://bsky.app/profile/{s}" class="poster-card" data-did="{s}"><img class="poster-avatar" width="{d}" height="{d}" alt=""><span class="poster-count">{d}</span></a> 341 + , .{ did, did, size, size, entry.count }); 249 342 } 343 + 344 + try w.writeAll("</div>"); 345 + } else { 346 + try w.writeAll("collecting data..."); 250 347 } 251 348 252 349 try w.writeAll( ··· 258 355 \\ <div class="criteria-list">posts with nsfw <a href="https://docs.bsky.app/docs/advanced-guides/moderation">labels</a></div> 259 356 \\ </div> 260 357 \\ 358 + \\ <details> 359 + \\ <summary>firehose stats</summary> 360 + \\ <div class="stats-grid"> 361 + \\ <div class="stats-group"> 362 + \\ <h4>throughput</h4> 363 + ); 364 + 365 + // firehose throughput - the interesting stuff not shown above 366 + const total_messages = s.getMessages(); 367 + const total_matches = s.getMatches(); 368 + try w.print("<p><span>messages:</span> {d}</p>", .{total_messages}); 369 + try w.print("<p><span>matches:</span> {d}</p>", .{total_matches}); 370 + 
if (total_messages > 0) { 371 + const match_pct: f64 = @as(f64, @floatFromInt(total_matches)) / @as(f64, @floatFromInt(total_messages)) * 100.0; 372 + try w.print("<p><span>match rate:</span> {d:.3}%</p>", .{match_pct}); 373 + } 374 + 375 + try w.writeAll( 376 + \\ </div> 377 + \\ <div class="stats-group"> 378 + \\ <h4>infrastructure</h4> 379 + ); 380 + 381 + try w.writeAll("<p><span>region:</span> ord</p>"); 382 + try w.writeAll("<p><span>memory:</span> 256 MB</p>"); 383 + 384 + try w.writeAll( 385 + \\ </div> 386 + \\ </div> 387 + \\ </details> 388 + \\ 261 389 \\ <footer> 262 390 \\ <a href="https://tangled.sh/@zzstoatzz.io/music-atmosphere-feed">source</a> 263 391 \\ <span>powered by <a href="https://graze.social/docs/graze-turbostream" target="_blank">turbostream</a></span> ··· 286 414 \\ document.getElementById('uptime').textContent = fmt(Date.now() - startedAt); 287 415 \\ }, 1000); 288 416 \\ document.getElementById('uptime').textContent = fmt(Date.now() - startedAt); 417 + \\ 418 + \\ // fetch avatars and names for top posters 419 + \\ document.querySelectorAll('.poster-card').forEach(async card => { 420 + \\ const did = card.dataset.did; 421 + \\ if (!did) return; 422 + \\ try { 423 + \\ const res = await fetch(`https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile?actor=${did}`); 424 + \\ if (!res.ok) return; 425 + \\ const data = await res.json(); 426 + \\ if (data.avatar) card.querySelector('img').src = data.avatar; 427 + \\ card.title = data.displayName || data.handle || did; 428 + \\ } catch {} 429 + \\ }); 289 430 \\ </script> 290 431 \\</body> 291 432 \\</html>
+17 -3
src/server/http.zig
··· 102 102 const service_did = config.getServiceDid(); 103 103 const feed_uri = config.getFeedUri(); 104 104 const feed_following_uri = config.getFeedFollowingUri(); 105 + const feed_organic_uri = config.getFeedOrganicUri(); 105 106 106 107 var buf: std.ArrayList(u8) = .{}; 107 108 defer buf.deinit(alloc); 108 109 const w = buf.writer(alloc); 109 110 110 111 try w.print( 111 - \\{{"encoding":"application/json","body":{{"did":"{s}","feeds":[{{"uri":"{s}"}},{{"uri":"{s}"}}]}}}} 112 - , .{ service_did, feed_uri, feed_following_uri }); 112 + \\{{"encoding":"application/json","body":{{"did":"{s}","feeds":[{{"uri":"{s}"}},{{"uri":"{s}"}},{{"uri":"{s}"}}]}}}} 113 + , .{ service_did, feed_uri, feed_following_uri, feed_organic_uri }); 113 114 114 115 try sendJson(request, .ok, buf.items); 115 116 } ··· 176 177 var author_filter: ?[][]const u8 = null; 177 178 defer if (author_filter) |f| api.freeFollows(alloc, f); 178 179 180 + // get exclude filter for organic feed (bots with >500 posts) 181 + var exclude_dids: ?[][]const u8 = null; 182 + defer if (exclude_dids) |e| posts.freeDidList(alloc, e); 183 + 179 184 if (feed_type == .following) { 180 185 if (requester_did) |did| { 181 186 // fetch follows ··· 197 202 try sendJson(request, .ok, empty_response); 198 203 return; 199 204 } 205 + } else if (feed_type == .organic) { 206 + // get high-volume posters (>500 posts = likely bots) 207 + exclude_dids = posts.getHighVolumePosters(alloc, 500) catch |err| blk: { 208 + std.debug.print("failed to get high-volume posters: {}\n", .{err}); 209 + break :blk null; 210 + }; 211 + if (exclude_dids) |e| { 212 + std.debug.print("organic feed: excluding {d} high-volume posters\n", .{e.len}); 213 + } 200 214 } 201 215 202 216 // query posts 203 - const query_result = posts.query(alloc, cursor, limit, author_filter) catch |err| { 217 + const query_result = posts.queryWithExclude(alloc, cursor, limit, author_filter, exclude_dids) catch |err| { 204 218 std.debug.print("feed error: {}\n", 
.{err}); 205 219 try sendJson(request, .internal_server_error, "{\"error\":\"internal error\"}"); 206 220 return;
+206 -3
src/server/stats.zig
··· 3 3 const json = std.json; 4 4 const Atomic = std.atomic.Value; 5 5 const Thread = std.Thread; 6 + const Mutex = Thread.Mutex; 6 7 7 8 const STATS_PATH = "/data/stats.json"; 8 9 9 10 pub const Platform = enum { soundcloud, bandcamp, spotify, plyr, apple }; 11 + 12 + // top posters tracking 13 + pub const MAX_POSTERS = 100; 14 + 15 + pub const PosterEntry = struct { 16 + did: [64]u8 = undefined, // DIDs are typically 32 chars but allow headroom 17 + did_len: u8 = 0, 18 + count: u32 = 0, 19 + 20 + pub fn getDid(self: *const PosterEntry) []const u8 { 21 + return self.did[0..self.did_len]; 22 + } 23 + 24 + pub fn setDid(self: *PosterEntry, did: []const u8) void { 25 + const len = @min(did.len, 64); 26 + @memcpy(self.did[0..len], did[0..len]); 27 + self.did_len = @intCast(len); 28 + } 29 + }; 30 + 31 + pub const TopPosters = struct { 32 + entries: [MAX_POSTERS]PosterEntry = [_]PosterEntry{.{}} ** MAX_POSTERS, 33 + count: usize = 0, 34 + mutex: Mutex = .{}, 35 + 36 + pub fn record(self: *TopPosters, did: []const u8) void { 37 + self.mutex.lock(); 38 + defer self.mutex.unlock(); 39 + 40 + // look for existing entry 41 + for (self.entries[0..self.count]) |*entry| { 42 + if (std.mem.eql(u8, entry.getDid(), did)) { 43 + entry.count += 1; 44 + return; 45 + } 46 + } 47 + 48 + // add new entry if space 49 + if (self.count < MAX_POSTERS) { 50 + self.entries[self.count].setDid(did); 51 + self.entries[self.count].count = 1; 52 + self.count += 1; 53 + } 54 + } 55 + 56 + pub fn getTop(self: *TopPosters, comptime N: usize) [N]PosterEntry { 57 + self.mutex.lock(); 58 + defer self.mutex.unlock(); 59 + 60 + // copy and sort 61 + var sorted: [MAX_POSTERS]PosterEntry = self.entries; 62 + const slice = sorted[0..self.count]; 63 + 64 + std.mem.sort(PosterEntry, slice, {}, struct { 65 + fn cmp(_: void, a: PosterEntry, b: PosterEntry) bool { 66 + return a.count > b.count; 67 + } 68 + }.cmp); 69 + 70 + var result: [N]PosterEntry = [_]PosterEntry{.{}} ** N; 71 + const copy_len = @min(N, 
self.count); 72 + @memcpy(result[0..copy_len], slice[0..copy_len]); 73 + return result; 74 + } 75 + 76 + pub fn toJson(self: *TopPosters, alloc: std.mem.Allocator) ![]const u8 { 77 + self.mutex.lock(); 78 + defer self.mutex.unlock(); 79 + 80 + var buf = std.ArrayList(u8).init(alloc); 81 + const w = buf.writer(); 82 + try w.writeAll("{"); 83 + var first = true; 84 + for (self.entries[0..self.count]) |entry| { 85 + if (entry.count == 0) continue; 86 + if (!first) try w.writeAll(","); 87 + first = false; 88 + try w.print("\"{s}\":{d}", .{ entry.getDid(), entry.count }); 89 + } 90 + try w.writeAll("}"); 91 + return buf.toOwnedSlice(); 92 + } 93 + 94 + pub fn loadFromJson(self: *TopPosters, data: []const u8) void { 95 + self.mutex.lock(); 96 + defer self.mutex.unlock(); 97 + 98 + const parsed = json.parseFromSlice(json.Value, std.heap.page_allocator, data, .{}) catch return; 99 + defer parsed.deinit(); 100 + 101 + if (parsed.value != .object) return; 102 + const obj = parsed.value.object; 103 + 104 + self.count = 0; 105 + var iter = obj.iterator(); 106 + while (iter.next()) |kv| { 107 + if (self.count >= MAX_POSTERS) break; 108 + if (kv.value_ptr.* != .integer) continue; 109 + self.entries[self.count].setDid(kv.key_ptr.*); 110 + self.entries[self.count].count = @intCast(@max(0, kv.value_ptr.integer)); 111 + self.count += 1; 112 + } 113 + } 114 + }; 10 115 11 116 pub const Stats = struct { 12 117 started_at: i64, ··· 33 138 // for lag trend tracking 34 139 prev_lag_ms: Atomic(i64), 35 140 141 + // top posters 142 + top_posters: TopPosters = .{}, 143 + posters_since: Atomic(i64) = Atomic(i64).init(0), // when tracking started 144 + 36 145 pub fn init() Stats { 37 146 var self = Stats{ 38 147 .started_at = std.time.timestamp(), ··· 60 169 const file = fs.openFileAbsolute(STATS_PATH, .{}) catch return; 61 170 defer file.close(); 62 171 63 - var buf: [4096]u8 = undefined; 172 + var buf: [16384]u8 = undefined; 64 173 const len = file.readAll(&buf) catch return; 65 174 if (len 
== 0) return; 66 175 ··· 100 209 self.multi_platform.store(@intCast(@max(0, v.integer)), .monotonic); 101 210 }; 102 211 212 + // load top posters 213 + if (root.get("top_posters")) |v| if (v == .object) { 214 + var iter = v.object.iterator(); 215 + while (iter.next()) |kv| { 216 + if (self.top_posters.count >= MAX_POSTERS) break; 217 + if (kv.value_ptr.* != .integer) continue; 218 + self.top_posters.entries[self.top_posters.count].setDid(kv.key_ptr.*); 219 + self.top_posters.entries[self.top_posters.count].count = @intCast(@max(0, kv.value_ptr.integer)); 220 + self.top_posters.count += 1; 221 + } 222 + }; 223 + if (root.get("posters_since")) |v| if (v == .integer) { 224 + self.posters_since.store(v.integer, .monotonic); 225 + }; 226 + 103 227 std.debug.print("loaded stats from {s}\n", .{STATS_PATH}); 104 228 } 105 229 ··· 111 235 const session_uptime: u64 = @intCast(@max(0, now - self.started_at)); 112 236 const total_uptime = self.prior_uptime + session_uptime; 113 237 114 - var buf: [768]u8 = undefined; 238 + // build top_posters json 239 + var posters_buf: [8192]u8 = undefined; 240 + var posters_len: usize = 0; 241 + { 242 + self.top_posters.mutex.lock(); 243 + defer self.top_posters.mutex.unlock(); 244 + 245 + posters_buf[0] = '{'; 246 + posters_len = 1; 247 + var first = true; 248 + for (self.top_posters.entries[0..self.top_posters.count]) |entry| { 249 + if (entry.count == 0) continue; 250 + const prefix: []const u8 = if (first) "\"" else ",\""; 251 + first = false; 252 + const written = std.fmt.bufPrint(posters_buf[posters_len..], "{s}{s}\":{d}", .{ 253 + prefix, 254 + entry.getDid(), 255 + entry.count, 256 + }) catch break; 257 + posters_len += written.len; 258 + } 259 + posters_buf[posters_len] = '}'; 260 + posters_len += 1; 261 + } 262 + 263 + var buf: [16384]u8 = undefined; 115 264 const data = std.fmt.bufPrint(&buf, 116 - 
\\{{"messages":{},"matches":{},"cumulative_uptime":{},"soundcloud":{},"bandcamp":{},"spotify":{},"plyr":{},"apple":{},"quote_matches":{},"multi_platform":{}}} 265 + \\{{"messages":{},"matches":{},"cumulative_uptime":{},"soundcloud":{},"bandcamp":{},"spotify":{},"plyr":{},"apple":{},"quote_matches":{},"multi_platform":{},"posters_since":{},"top_posters":{s}}} 117 266 , .{ 118 267 self.messages.load(.monotonic), 119 268 self.matches.load(.monotonic), ··· 125 274 self.apple.load(.monotonic), 126 275 self.quote_matches.load(.monotonic), 127 276 self.multi_platform.load(.monotonic), 277 + self.posters_since.load(.monotonic), 278 + posters_buf[0..posters_len], 128 279 }) catch return; 129 280 130 281 file.writeAll(data) catch return; ··· 192 343 193 344 pub fn recordMultiPlatform(self: *Stats) void { 194 345 _ = self.multi_platform.fetchAdd(1, .monotonic); 346 + } 347 + 348 + pub fn recordPoster(self: *Stats, did: []const u8) void { 349 + // set tracking start time on first poster 350 + if (self.posters_since.load(.monotonic) == 0) { 351 + self.posters_since.store(std.time.timestamp(), .monotonic); 352 + } 353 + self.top_posters.record(did); 354 + } 355 + 356 + pub fn getPostersSince(self: *Stats) i64 { 357 + return self.posters_since.load(.monotonic); 358 + } 359 + 360 + pub fn getTopPosters(self: *Stats, comptime N: usize) [N]PosterEntry { 361 + return self.top_posters.getTop(N); 362 + } 363 + 364 + pub fn getUniquePosterCount(self: *Stats) usize { 365 + self.top_posters.mutex.lock(); 366 + defer self.top_posters.mutex.unlock(); 367 + return self.top_posters.count; 368 + } 369 + 370 + /// returns concentration of top N posters as percentage of tracked poster posts 371 + pub fn getTopConcentration(self: *Stats, comptime N: usize) f64 { 372 + self.top_posters.mutex.lock(); 373 + defer self.top_posters.mutex.unlock(); 374 + 375 + // sum all tracked poster posts 376 + var total_tracked: u64 = 0; 377 + for (self.top_posters.entries[0..self.top_posters.count]) |entry| { 378 
+ total_tracked += entry.count; 379 + } 380 + if (total_tracked == 0) return 0; 381 + 382 + // sum top N 383 + var sorted: [MAX_POSTERS]PosterEntry = self.top_posters.entries; 384 + const slice = sorted[0..self.top_posters.count]; 385 + std.mem.sort(PosterEntry, slice, {}, struct { 386 + fn cmp(_: void, a: PosterEntry, b: PosterEntry) bool { 387 + return a.count > b.count; 388 + } 389 + }.cmp); 390 + 391 + var top_sum: u64 = 0; 392 + const limit = @min(N, self.top_posters.count); 393 + for (slice[0..limit]) |entry| { 394 + top_sum += entry.count; 395 + } 396 + 397 + return @as(f64, @floatFromInt(top_sum)) / @as(f64, @floatFromInt(total_tracked)) * 100.0; 195 398 } 196 399 197 400 pub fn getQuoteMatches(self: *const Stats) u64 {
+85 -42
src/store/posts.zig
··· 71 71 limit: usize, 72 72 author_filter: ?[]const []const u8, 73 73 ) !struct { posts: []Post, next_cursor: ?Cursor } { 74 + return queryWithExclude(alloc, cursor, limit, author_filter, null); 75 + } 76 + 77 + /// query posts with cursor-based pagination, optionally excluding specific authors 78 + pub fn queryWithExclude( 79 + alloc: std.mem.Allocator, 80 + cursor: ?Cursor, 81 + limit: usize, 82 + author_filter: ?[]const []const u8, 83 + exclude_dids: ?[]const []const u8, 84 + ) !struct { posts: []Post, next_cursor: ?Cursor } { 74 85 db.lock(); 75 86 defer db.unlock(); 76 87 ··· 84 95 85 96 var next_cursor: ?Cursor = null; 86 97 98 + // build dynamic query 99 + var query_buf: std.ArrayList(u8) = .empty; 100 + defer query_buf.deinit(alloc); 101 + const qw = query_buf.writer(alloc); 102 + 103 + try qw.writeAll("SELECT uri, cid, indexed_at FROM posts WHERE "); 104 + 105 + // author include filter 87 106 if (author_filter) |authors| { 88 107 if (authors.len == 0) { 89 108 return .{ .posts = try post_list.toOwnedSlice(alloc), .next_cursor = null }; 90 109 } 91 - 92 - // build IN clause query 93 - var query_buf: std.ArrayList(u8) = .empty; 94 - defer query_buf.deinit(alloc); 95 - const qw = query_buf.writer(alloc); 96 - 97 - try qw.writeAll("SELECT uri, cid, indexed_at FROM posts WHERE author_did IN ("); 98 - 110 + try qw.writeAll("author_did IN ("); 99 111 for (authors, 0..) |did, i| { 100 112 if (i > 0) try qw.writeAll(","); 101 113 try qw.print("'{s}'", .{did}); 102 114 } 115 + try qw.writeAll(") AND "); 116 + } 103 117 104 - try qw.print( 105 - ") AND (indexed_at < {d} OR (indexed_at = {d} AND cid < '{s}')) ORDER BY indexed_at DESC, cid DESC LIMIT {d}", 106 - .{ cursor_ts, cursor_ts, cursor_cid, limit }, 107 - ); 118 + // author exclude filter 119 + if (exclude_dids) |excludes| { 120 + if (excludes.len > 0) { 121 + try qw.writeAll("(author_did IS NULL OR author_did NOT IN ("); 122 + for (excludes, 0..) 
|did, i| { 123 + if (i > 0) try qw.writeAll(","); 124 + try qw.print("'{s}'", .{did}); 125 + } 126 + try qw.writeAll(")) AND "); 127 + } 128 + } 108 129 109 - try query_buf.append(alloc, 0); 110 - const query_z: [:0]const u8 = query_buf.items[0 .. query_buf.items.len - 1 :0]; 130 + // cursor condition and ordering 131 + try qw.print( 132 + "(indexed_at < {d} OR (indexed_at = {d} AND cid < '{s}')) ORDER BY indexed_at DESC, cid DESC LIMIT {d}", 133 + .{ cursor_ts, cursor_ts, cursor_cid, limit }, 134 + ); 111 135 112 - var rows = conn.rows(query_z, .{}) catch return error.QueryFailed; 113 - defer rows.deinit(); 136 + try query_buf.append(alloc, 0); 137 + const query_z: [:0]const u8 = query_buf.items[0 .. query_buf.items.len - 1 :0]; 114 138 115 - while (rows.next()) |row| { 116 - const uri = try alloc.dupe(u8, row.text(0)); 117 - const cid = try alloc.dupe(u8, row.text(1)); 118 - const ts = row.int(2); 139 + var rows = conn.rows(query_z, .{}) catch return error.QueryFailed; 140 + defer rows.deinit(); 119 141 120 - try post_list.append(alloc, .{ .uri = uri, .cid = cid, .indexed_at = ts }); 121 - next_cursor = .{ .timestamp = ts, .cid = cid }; 122 - } 123 - } else { 124 - var rows = conn.rows( 125 - \\SELECT uri, cid, indexed_at FROM posts 126 - \\WHERE indexed_at < ? OR (indexed_at = ? AND cid < ?) 127 - \\ORDER BY indexed_at DESC, cid DESC 128 - \\LIMIT ? 
129 - , .{ cursor_ts, cursor_ts, cursor_cid, limit }) catch return error.QueryFailed; 130 - defer rows.deinit(); 131 - 132 - while (rows.next()) |row| { 133 - const uri = try alloc.dupe(u8, row.text(0)); 134 - const cid = try alloc.dupe(u8, row.text(1)); 135 - const ts = row.int(2); 142 + while (rows.next()) |row| { 143 + const uri = try alloc.dupe(u8, row.text(0)); 144 + const cid = try alloc.dupe(u8, row.text(1)); 145 + const ts = row.int(2); 136 146 137 - try post_list.append(alloc, .{ .uri = uri, .cid = cid, .indexed_at = ts }); 138 - next_cursor = .{ .timestamp = ts, .cid = cid }; 139 - } 147 + try post_list.append(alloc, .{ .uri = uri, .cid = cid, .indexed_at = ts }); 148 + next_cursor = .{ .timestamp = ts, .cid = cid }; 140 149 } 141 150 142 151 return .{ .posts = try post_list.toOwnedSlice(alloc), .next_cursor = next_cursor }; 143 152 } 144 153 145 - pub fn freeQueryResult(alloc: std.mem.Allocator, posts: []Post) void { 146 - for (posts) |p| { 154 + pub fn freeQueryResult(alloc: std.mem.Allocator, result_posts: []Post) void { 155 + for (result_posts) |p| { 147 156 alloc.free(p.uri); 148 157 alloc.free(p.cid); 149 158 } 150 - alloc.free(posts); 159 + alloc.free(result_posts); 160 + } 161 + 162 + /// get DIDs with more than threshold posts (for bot detection) 163 + pub fn getHighVolumePosters(alloc: std.mem.Allocator, threshold: usize) ![][]const u8 { 164 + db.lock(); 165 + defer db.unlock(); 166 + 167 + const conn = db.getConn() orelse return error.NotInitialized; 168 + 169 + var did_list: std.ArrayList([]const u8) = .empty; 170 + errdefer { 171 + for (did_list.items) |did| alloc.free(did); 172 + did_list.deinit(alloc); 173 + } 174 + 175 + var rows = conn.rows( 176 + \\SELECT author_did, COUNT(*) as cnt FROM posts 177 + \\WHERE author_did IS NOT NULL 178 + \\GROUP BY author_did 179 + \\HAVING cnt > ? 
180 + , .{threshold}) catch return error.QueryFailed; 181 + defer rows.deinit(); 182 + 183 + while (rows.next()) |row| { 184 + const did = try alloc.dupe(u8, row.text(0)); 185 + try did_list.append(alloc, did); 186 + } 187 + 188 + return try did_list.toOwnedSlice(alloc); 189 + } 190 + 191 + pub fn freeDidList(alloc: std.mem.Allocator, dids: [][]const u8) void { 192 + for (dids) |did| alloc.free(did); 193 + alloc.free(dids); 151 194 } 152 195 153 196 // helpers