A tool for parsing traffic on the jetstream and applying a moderation workstream based on regexp based rules

fix: improve async error handling in main.ts

- Fixed unsafe type assertions for metricsServer variable
- Added proper type conversions for template literal expressions
- Replaced hasOwnProperty with Object.hasOwn for better practice
- Converted async event handlers to non-async with void for fire-and-forget
- Added comprehensive error type annotations throughout
- Used Promise.allSettled for concurrent operations
- Removed unused imports and applied destructuring

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

+45 -47
+45 -47
src/main.ts
··· 5 5 CommitUpdateEvent, 6 6 IdentityEvent } from '@skyware/jetstream'; 7 7 import { 8 - CommitUpdate, 9 8 Jetstream, 10 9 } from '@skyware/jetstream'; 11 10 ··· 23 22 import logger from './logger.js'; 24 23 import { startMetricsServer } from './metrics.js'; 25 24 import type { Post, LinkFeature } from './types.js'; 26 - import { Handle } from './types.js'; 27 25 28 26 let cursor = 0; 29 27 let cursorUpdateInterval: NodeJS.Timeout; ··· 35 33 try { 36 34 logger.info('Trying to read cursor from cursor.txt...'); 37 35 cursor = Number(fs.readFileSync('cursor.txt', 'utf8')); 38 - logger.info(`Cursor found: ${cursor} (${epochUsToDateTime(cursor)})`); 36 + logger.info(`Cursor found: ${cursor.toString()} (${epochUsToDateTime(cursor)})`); 39 37 } catch (error) { 40 38 if (error instanceof Error && 'code' in error && error.code === 'ENOENT') { 41 39 cursor = Math.floor(Date.now() * 1000); 42 40 logger.info( 43 - `Cursor not found in cursor.txt, setting cursor to: ${cursor} (${epochUsToDateTime(cursor)})`, 41 + `Cursor not found in cursor.txt, setting cursor to: ${cursor.toString()} (${epochUsToDateTime(cursor)})`, 44 42 ); 45 43 fs.writeFileSync('cursor.txt', cursor.toString(), 'utf8'); 46 44 } else { ··· 58 56 jetstream.on('open', () => { 59 57 if (jetstream.cursor) { 60 58 logger.info( 61 - `Connected to Jetstream at ${FIREHOSE_URL} with cursor ${jetstream.cursor} (${epochUsToDateTime(jetstream.cursor)})`, 59 + `Connected to Jetstream at ${FIREHOSE_URL} with cursor ${jetstream.cursor.toString()} (${epochUsToDateTime(jetstream.cursor)})`, 62 60 ); 63 61 } else { 64 62 logger.info( ··· 68 66 cursorUpdateInterval = setInterval(() => { 69 67 if (jetstream.cursor) { 70 68 logger.info( 71 - `Cursor updated to: ${jetstream.cursor} (${epochUsToDateTime(jetstream.cursor)})`, 69 + `Cursor updated to: ${jetstream.cursor.toString()} (${epochUsToDateTime(jetstream.cursor)})`, 72 70 ); 73 71 fs.writeFile('cursor.txt', jetstream.cursor.toString(), (err) => { 74 72 if (err) logger.error(err); ··· 90 88 91 89 jetstream.onCreate( 92 90 'app.bsky.feed.post', 93 - async (event: CommitCreateEvent<'app.bsky.feed.post'>) => { 91 + (event: CommitCreateEvent<'app.bsky.feed.post'>) => { 94 92 try { 95 93 const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`; 96 - const hasFacets = event.commit.record.hasOwnProperty('facets'); 97 - const hasText = event.commit.record.hasOwnProperty('text'); 94 + const hasFacets = Object.hasOwn(event.commit.record, 'facets'); 95 + const hasText = Object.hasOwn(event.commit.record, 'text'); 98 96 99 97 const tasks: Promise<void>[] = []; 100 98 ··· 125 123 cid: event.commit.cid, 126 124 }, 127 125 ]; 128 - tasks.push(checkPosts(posts).catch((error) => { 126 + tasks.push(checkPosts(posts).catch((error: unknown) => { 129 127 logger.error(`Error checking post links for ${event.did}:`, error); 130 128 })); 131 129 }); ··· 141 139 cid: event.commit.cid, 142 140 }, 143 141 ]; 144 - tasks.push(checkPosts(posts).catch((error) => { 142 + tasks.push(checkPosts(posts).catch((error: unknown) => { 145 143 logger.error(`Error checking post text for ${event.did}:`, error); 146 144 })); 147 145 } 148 146 149 147 // Wait for all tasks to complete 150 148 if (tasks.length > 0) { 151 - await Promise.allSettled(tasks); 149 + void Promise.allSettled(tasks); 152 150 } 153 - } catch (error) { 151 + } catch (error: unknown) { 154 152 logger.error(`Error processing post event for ${event.did}:`, error); 155 153 } 156 154 }, ··· 159 157 // Check for profile updates 160 158 jetstream.onUpdate( 161 159 'app.bsky.actor.profile', 162 - async (event: CommitUpdateEvent<'app.bsky.actor.profile'>) => { 160 + (event: CommitUpdateEvent<'app.bsky.actor.profile'>) => { 163 161 try { 164 162 const tasks: Promise<void>[] = []; 165 163 ··· 169 167 170 168 tasks.push( 171 169 checkDescription(event.did, event.time_us, displayName, description) 172 - .catch((error) => { 170 + .catch((error: unknown) => { 173 171 logger.error(`Error checking profile description for ${event.did}:`, error); 174 172 }) 175 173 ); 176 174 177 175 tasks.push( 178 176 checkDisplayName(event.did, event.time_us, displayName, description) 179 - .catch((error) => { 177 + .catch((error: unknown) => { 180 178 logger.error(`Error checking profile display name for ${event.did}:`, error); 181 179 }) 182 180 ); ··· 185 183 if (event.commit.record.joinedViaStarterPack) { 186 184 tasks.push( 187 185 checkStarterPack(event.did, event.time_us, event.commit.record.joinedViaStarterPack.uri) 188 - .catch((error) => { 186 + .catch((error: unknown) => { 189 187 logger.error(`Error checking starter pack for ${event.did}:`, error); 190 188 }) 191 189 ); ··· 193 191 194 192 // Wait for all tasks to complete 195 193 if (tasks.length > 0) { 196 - await Promise.allSettled(tasks); 194 + void Promise.allSettled(tasks); 197 195 } 198 - } catch (error) { 196 + } catch (error: unknown) { 199 197 logger.error(`Error processing profile update event for ${event.did}:`, error); 200 198 } 201 199 }, ··· 205 203 206 204 jetstream.onCreate( 207 205 'app.bsky.actor.profile', 208 - async (event: CommitCreateEvent<'app.bsky.actor.profile'>) => { 206 + (event: CommitCreateEvent<'app.bsky.actor.profile'>) => { 209 207 try { 210 208 const tasks: Promise<void>[] = []; 211 209 ··· 215 213 216 214 tasks.push( 217 215 checkDescription(event.did, event.time_us, displayName, description) 218 - .catch((error) => { 216 + .catch((error: unknown) => { 219 217 logger.error(`Error checking profile description for ${event.did}:`, error); 220 218 }) 221 219 ); 222 220 223 221 tasks.push( 224 222 checkDisplayName(event.did, event.time_us, displayName, description) 225 - .catch((error) => { 223 + .catch((error: unknown) => { 226 224 logger.error(`Error checking profile display name for ${event.did}:`, error); 227 225 }) 228 226 ); ··· 230 228 if (event.commit.record.joinedViaStarterPack) { 231 229 tasks.push( 232 230 checkStarterPack(event.did, event.time_us, event.commit.record.joinedViaStarterPack.uri) 233 - .catch((error) => { 231 + .catch((error: unknown) => { 234 232 logger.error(`Error checking starter pack for ${event.did}:`, error); 235 233 }) 236 234 ); ··· 238 236 239 237 // Wait for all tasks to complete 240 238 if (tasks.length > 0) { 241 - await Promise.allSettled(tasks); 239 + void Promise.allSettled(tasks); 242 240 } 243 241 } 244 - } catch (error) { 242 + } catch (error: unknown) { 245 243 logger.error(`Error processing profile creation event for ${event.did}:`, error); 246 244 } 247 245 }, ··· 249 247 250 248 jetstream.onCreate( 251 249 'app.bsky.graph.starterpack', 252 - async (event: CommitCreateEvent<'app.bsky.graph.starterpack'>) => { 250 + (event: CommitCreateEvent<'app.bsky.graph.starterpack'>) => { 253 251 try { 254 252 const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`; 255 - const name = event.commit.record.name ?? ''; 256 - const description = event.commit.record.description ?? ''; 253 + const { name, description } = event.commit.record; 257 254 258 - await checkNewStarterPack( 255 + void checkNewStarterPack( 259 256 event.did, 260 257 event.time_us, 261 258 atURI, 262 259 event.commit.cid, 263 260 name, 264 261 description, 265 - ).catch((error) => { 262 + ).catch((error: unknown) => { 266 263 logger.error(`Error checking new starter pack for ${event.did}:`, error); 267 264 }); 268 - } catch (error) { 265 + } catch (error: unknown) { 269 266 logger.error(`Error processing starter pack creation event for ${event.did}:`, error); 270 267 } 271 268 }, ··· 273 270 274 271 jetstream.onUpdate( 275 272 'app.bsky.graph.starterpack', 276 - async (event: CommitUpdateEvent<'app.bsky.graph.starterpack'>) => { 273 + (event: CommitUpdateEvent<'app.bsky.graph.starterpack'>) => { 277 274 try { 278 275 const atURI = `at://${event.did}/app.bsky.feed.post/${event.commit.rkey}`; 279 - const name = event.commit.record.name ?? ''; 280 - const description = event.commit.record.description ?? ''; 276 + const { name, description } = event.commit.record; 281 277 282 - await checkNewStarterPack( 278 + void checkNewStarterPack( 283 279 event.did, 284 280 event.time_us, 285 281 atURI, 286 282 event.commit.cid, 287 283 name, 288 284 description, 289 - ).catch((error) => { 285 + ).catch((error: unknown) => { 290 286 logger.error(`Error checking updated starter pack for ${event.did}:`, error); 291 287 }); 292 - } catch (error) { 288 + } catch (error: unknown) { 293 289 logger.error(`Error processing starter pack update event for ${event.did}:`, error); 294 290 } 295 291 }, 296 292 ); 297 293 298 294 // Check for handle updates 299 - jetstream.on('identity', async (event: IdentityEvent) => { 295 + jetstream.on('identity', (event: IdentityEvent) => { 300 296 try { 301 297 if (event.identity.handle) { 302 - await checkHandle(event.identity.did, event.identity.handle, event.time_us) 303 - .catch((error) => { 298 + void checkHandle(event.identity.did, event.identity.handle, event.time_us) 299 + .catch((error: unknown) => { 304 300 logger.error(`Error checking handle for ${event.identity.did}:`, error); 305 301 }); 306 302 } 307 - } catch (error) { 303 + } catch (error: unknown) { 308 304 logger.error(`Error processing identity event for ${event.identity.did}:`, error); 309 305 } 310 306 }); 311 307 312 308 // Start metrics server with error handling 313 - let metricsServer; 309 + let metricsServer: ReturnType<typeof startMetricsServer> | undefined; 314 310 try { 315 311 metricsServer = startMetricsServer(METRICS_PORT); 316 - logger.info(`Metrics server started on port ${METRICS_PORT}`); 317 - } catch (error) { 312 + logger.info(`Metrics server started on port ${METRICS_PORT.toString()}`); 313 + } catch (error: unknown) { 318 314 logger.error('Failed to start metrics server:', error); 319 315 process.exit(1); 320 316 } ··· 331 327 try { 332 328 jetstream.start(); 333 329 logger.info('Jetstream started successfully'); 334 - } catch (error) { 330 + } catch (error: unknown) { 335 331 logger.error('Failed to start jetstream:', error); 336 332 process.exit(1); 337 333 } ··· 344 340 } 345 341 jetstream.close(); 346 342 if (metricsServer) { 347 - metricsServer.close(); 343 + metricsServer.close(() => { 344 + logger.info('Metrics server closed'); 345 + }); 348 346 } 349 347 logger.info('Shutdown completed successfully'); 350 - } catch (error) { 348 + } catch (error: unknown) { 351 349 logger.error('Error shutting down gracefully:', error); 352 350 process.exit(1); 353 351 }