Monorepo for wisp.place, a static site hosting service built on top of the AT Protocol.

Add logging and a proper concurrency-limited queue system for firehose events

+210 -99
+3 -3
apps/hosting-service/package.json
··· 3 3 "version": "1.0.0", 4 4 "type": "module", 5 5 "scripts": { 6 - "dev": "tsx --env-file=.env src/index.ts", 6 + "dev": "NODE_OPTIONS='--max-old-space-size=2048' tsx --env-file=.env src/index.ts", 7 7 "build": "bun run build.ts", 8 - "start": "tsx src/index.ts", 8 + "start": "NODE_OPTIONS='--max-old-space-size=2048' tsx src/index.ts", 9 9 "check": "tsc --noEmit", 10 - "backfill": "tsx src/index.ts --backfill" 10 + "backfill": "NODE_OPTIONS='--max-old-space-size=2048' tsx src/index.ts --backfill" 11 11 }, 12 12 "dependencies": { 13 13 "@atproto/api": "^0.17.4",
+6 -3
apps/hosting-service/src/index.ts
··· 80 80 } 81 81 82 82 // Add health check endpoint 83 - app.get('/health', (c) => { 83 + app.get('/health', async (c) => { 84 84 const firehoseHealth = firehose.getHealth(); 85 + const storageStats = await storage.getStats(); 86 + 85 87 return c.json({ 86 88 status: 'ok', 87 89 firehose: firehoseHealth, 90 + storage: storageStats, 88 91 }); 89 92 }); 90 93 ··· 120 123 // Graceful shutdown 121 124 process.on('SIGINT', async () => { 122 125 console.log('\n🛑 Shutting down...'); 123 - firehose.stop(); 126 + await firehose.stop(); 124 127 stopDomainCacheCleanup(); 125 128 await closeDatabase(); 126 129 server.close(); ··· 129 132 130 133 process.on('SIGTERM', async () => { 131 134 console.log('\n🛑 Shutting down...'); 132 - firehose.stop(); 135 + await firehose.stop(); 133 136 stopDomainCacheCleanup(); 134 137 await closeDatabase(); 135 138 server.close();
+2 -1
apps/hosting-service/src/lib/backfill.ts
··· 102 102 103 103 try { 104 104 // Download and cache site 105 + console.log(`[Backfill] Caching site from backfill - ${site.did}:${site.rkey}`); 105 106 await downloadAndCacheSite(site.did, site.rkey, siteData.record, pdsEndpoint, siteData.cid); 106 107 // Clear redirect rules cache since the site was updated 107 108 clearRedirectRulesCache(site.did, site.rkey); 108 109 stats.cached++; 109 110 processed++; 110 111 logger.info('Successfully cached site during backfill', { did: site.did, rkey: site.rkey }); 111 - console.log(`✅ [${processed}/${sites.length}] Cached: ${site.display_name || site.rkey}`); 112 + console.log(`✅ [${processed}/${sites.length}] Cached (backfill): ${site.display_name || site.rkey}`); 112 113 } finally { 113 114 // Always unmark, even if caching fails 114 115 unmarkSiteAsBeingCached(site.did, site.rkey);
+14 -5
apps/hosting-service/src/lib/file-serving.ts
··· 18 18 19 19 /** 20 20 * Helper to retrieve a file with metadata from tiered storage 21 + * Logs which tier the file was served from 21 22 */ 22 23 async function getFileWithMetadata(did: string, rkey: string, filePath: string) { 23 24 const key = `${did}/${rkey}/${filePath}`; 24 - return await storage.getWithMetadata(key); 25 + const result = await storage.getWithMetadata(key); 26 + 27 + if (result) { 28 + const tier = result.metadata?.tier || 'unknown'; 29 + const size = result.data ? (result.data as Uint8Array).length : 0; 30 + console.log(`[Storage] Served ${filePath} from ${tier} tier (${size} bytes) - ${did}:${rkey}`); 31 + } 32 + 33 + return result; 25 34 } 26 35 27 36 /** ··· 41 50 // Check for redirect rules first (_redirects wins over settings) 42 51 let redirectRules = getRedirectRulesFromCache(did, rkey); 43 52 44 - if (redirectRules === undefined) { 45 - // Load rules for the first time 53 + if (redirectRules === null) { 54 + // Load rules (not in cache or evicted) 46 55 redirectRules = await loadRedirectRules(did, rkey); 47 56 setRedirectRulesInCache(did, rkey, redirectRules); 48 57 } ··· 393 402 // Check for redirect rules first (_redirects wins over settings) 394 403 let redirectRules = getRedirectRulesFromCache(did, rkey); 395 404 396 - if (redirectRules === undefined) { 397 - // Load rules for the first time 405 + if (redirectRules === null) { 406 + // Load rules (not in cache or evicted) 398 407 redirectRules = await loadRedirectRules(did, rkey); 399 408 setRedirectRulesInCache(did, rkey, redirectRules); 400 409 }
+101 -53
apps/hosting-service/src/lib/firehose.ts
··· 23 23 private eventCount = 0 24 24 private cacheCleanupInterval: NodeJS.Timeout | null = null 25 25 private healthCheckInterval: NodeJS.Timeout | null = null 26 + private processingQueue: Set<Promise<void>> = new Set() 27 + private readonly maxConcurrency = parseInt(process.env.FIREHOSE_MAX_CONCURRENCY || '5', 10) 26 28 27 29 constructor( 28 30 private logger?: (msg: string, data?: Record<string, unknown>) => void ··· 36 38 log(`[FirehoseWorker] ${msg}`, data || {}) 37 39 } 38 40 41 + /** 42 + * Queue a task with concurrency limiting 43 + * Waits if max concurrent tasks are already running 44 + */ 45 + private async queueTask(task: () => Promise<void>): Promise<void> { 46 + // Wait if we're at max concurrency 47 + if (this.processingQueue.size >= this.maxConcurrency) { 48 + this.log(`Queue at max capacity (${this.maxConcurrency}), waiting for slot...`, { 49 + queueSize: this.processingQueue.size 50 + }) 51 + await Promise.race(this.processingQueue) 52 + } 53 + 54 + // Execute task and track in queue 55 + const promise = task() 56 + .catch(err => { 57 + // Errors are already logged in the handlers 58 + }) 59 + .finally(() => { 60 + this.processingQueue.delete(promise) 61 + }) 62 + 63 + this.processingQueue.add(promise) 64 + 65 + // Don't await here - we want handleEvent to return quickly 66 + // The task will process in the background with concurrency limiting 67 + } 68 + 39 69 private startCacheCleanup() { 40 70 // Clear IdResolver cache every hour to prevent unbounded memory growth 41 71 // The IdResolver has an internal cache that never expires and can cause heap exhaustion ··· 74 104 this.connect() 75 105 } 76 106 77 - stop() { 107 + async stop() { 78 108 this.log('Stopping firehose worker') 79 109 this.isShuttingDown = true 80 110 ··· 91 121 if (this.firehose) { 92 122 this.firehose.destroy() 93 123 this.firehose = null 124 + } 125 + 126 + // Wait for all queued tasks to complete 127 + if (this.processingQueue.size > 0) { 128 + this.log(`Waiting for 
${this.processingQueue.size} queued tasks to complete...`) 129 + await Promise.all(this.processingQueue) 130 + this.log('All queued tasks completed') 94 131 } 95 132 } 96 133 ··· 130 167 rkey: evt.rkey 131 168 }) 132 169 133 - try { 134 - await this.handleCreateOrUpdate( 135 - evt.did, 136 - evt.rkey, 137 - record, 138 - evt.cid?.toString() 139 - ) 140 - } catch (err) { 141 - console.error('Full error details:', err); 142 - this.log('Error handling event', { 143 - did: evt.did, 144 - event: evt.event, 145 - rkey: evt.rkey, 146 - error: 147 - err instanceof Error 148 - ? err.message 149 - : String(err) 150 - }) 151 - } 170 + await this.queueTask(async () => { 171 + try { 172 + await this.handleCreateOrUpdate( 173 + evt.did, 174 + evt.rkey, 175 + record, 176 + evt.cid?.toString() 177 + ) 178 + } catch (err) { 179 + console.error('Full error details:', err); 180 + this.log('Error handling event', { 181 + did: evt.did, 182 + event: evt.event, 183 + rkey: evt.rkey, 184 + error: 185 + err instanceof Error 186 + ? err.message 187 + : String(err) 188 + }) 189 + } 190 + }) 152 191 } 153 192 // Handle settings changes 154 193 else if (evt.collection === 'place.wisp.settings') { ··· 158 197 rkey: evt.rkey 159 198 }) 160 199 161 - try { 162 - await this.handleSettingsChange(evt.did, evt.rkey) 163 - } catch (err) { 164 - this.log('Error handling settings change', { 165 - did: evt.did, 166 - event: evt.event, 167 - rkey: evt.rkey, 168 - error: 169 - err instanceof Error 170 - ? err.message 171 - : String(err) 172 - }) 173 - } 200 + await this.queueTask(async () => { 201 + try { 202 + await this.handleSettingsChange(evt.did, evt.rkey) 203 + } catch (err) { 204 + this.log('Error handling settings change', { 205 + did: evt.did, 206 + event: evt.event, 207 + rkey: evt.rkey, 208 + error: 209 + err instanceof Error 210 + ? 
err.message 211 + : String(err) 212 + }) 213 + } 214 + }) 174 215 } 175 216 } else if ( 176 217 evt.event === 'delete' && ··· 181 222 rkey: evt.rkey 182 223 }) 183 224 184 - try { 185 - await this.handleDelete(evt.did, evt.rkey) 186 - } catch (err) { 187 - this.log('Error handling delete', { 188 - did: evt.did, 189 - rkey: evt.rkey, 190 - error: 191 - err instanceof Error ? err.message : String(err) 192 - }) 193 - } 225 + await this.queueTask(async () => { 226 + try { 227 + await this.handleDelete(evt.did, evt.rkey) 228 + } catch (err) { 229 + this.log('Error handling delete', { 230 + did: evt.did, 231 + rkey: evt.rkey, 232 + error: 233 + err instanceof Error ? err.message : String(err) 234 + }) 235 + } 236 + }) 194 237 } else if ( 195 238 evt.event === 'delete' && 196 239 evt.collection === 'place.wisp.settings' ··· 200 243 rkey: evt.rkey 201 244 }) 202 245 203 - try { 204 - await this.handleSettingsChange(evt.did, evt.rkey) 205 - } catch (err) { 206 - this.log('Error handling settings delete', { 207 - did: evt.did, 208 - rkey: evt.rkey, 209 - error: 210 - err instanceof Error ? err.message : String(err) 211 - }) 212 - } 246 + await this.queueTask(async () => { 247 + try { 248 + await this.handleSettingsChange(evt.did, evt.rkey) 249 + } catch (err) { 250 + this.log('Error handling settings delete', { 251 + did: evt.did, 252 + rkey: evt.rkey, 253 + error: 254 + err instanceof Error ? 
err.message : String(err) 255 + }) 256 + } 257 + }) 213 258 } 214 259 }, 215 260 onError: (err: any) => { ··· 237 282 record: any, 238 283 eventCid?: string 239 284 ) { 285 + console.log(`[Firehose] Processing create/update from firehose - ${did}:${site}`) 240 286 this.log('Processing create/update', { did, site }) 241 287 242 288 // Record is already validated in handleEvent ··· 458 504 connected: isConnected, 459 505 lastEventTime: this.lastEventTime, 460 506 timeSinceLastEvent, 507 + queueSize: this.processingQueue.size, 508 + maxConcurrency: this.maxConcurrency, 461 509 healthy: isConnected && timeSinceLastEvent < 300000 // 5 minutes 462 510 } 463 511 }
+13 -7
apps/hosting-service/src/lib/site-cache.ts
··· 4 4 5 5 import { createLogger } from '@wisp/observability'; 6 6 import { fetchSiteRecord, getPdsForDid, downloadAndCacheSite, isCached } from './utils'; 7 - import { markSiteAsBeingCached, unmarkSiteAsBeingCached } from './cache'; 7 + import { markSiteAsBeingCached, unmarkSiteAsBeingCached, LRUCache } from './cache'; 8 8 import type { RedirectRule } from './redirects'; 9 9 10 10 const logger = createLogger('hosting-service'); 11 11 12 - // Cache for redirect rules (per site) 13 - const redirectRulesCache = new Map<string, RedirectRule[]>(); 12 + // Cache for redirect rules (per site) - LRU with 1000 site limit 13 + // Each entry is relatively small (array of redirect rules), so 1000 sites should be < 10MB 14 + const redirectRulesCache = new LRUCache<RedirectRule[]>(10 * 1024 * 1024, 1000); // 10MB, 1000 sites 14 15 15 16 /** 16 17 * Clear redirect rules cache for a specific site ··· 24 25 /** 25 26 * Get redirect rules from cache 26 27 */ 27 - export function getRedirectRulesFromCache(did: string, rkey: string): RedirectRule[] | undefined { 28 + export function getRedirectRulesFromCache(did: string, rkey: string): RedirectRule[] | null { 28 29 const cacheKey = `${did}:${rkey}`; 29 30 return redirectRulesCache.get(cacheKey); 30 31 } ··· 34 35 */ 35 36 export function setRedirectRulesInCache(did: string, rkey: string, rules: RedirectRule[]) { 36 37 const cacheKey = `${did}:${rkey}`; 37 - redirectRulesCache.set(cacheKey, rules); 38 + // Estimate size: roughly 100 bytes per rule 39 + const estimatedSize = rules.length * 100; 40 + redirectRulesCache.set(cacheKey, rules, estimatedSize); 38 41 } 39 42 40 43 /** ··· 43 46 */ 44 47 export async function ensureSiteCached(did: string, rkey: string): Promise<boolean> { 45 48 if (await isCached(did, rkey)) { 49 + console.log(`[Cache Hit] Site already cached - ${did}:${rkey}`); 46 50 return true; 47 51 } 48 52 49 53 // Fetch and cache the site 54 + console.log(`[On-Demand] Caching site on first request - ${did}:${rkey}`); 50 
55 const siteData = await fetchSiteRecord(did, rkey); 51 56 if (!siteData) { 52 57 logger.error('Site record not found', null, { did, rkey }); ··· 66 71 await downloadAndCacheSite(did, rkey, siteData.record, pdsEndpoint, siteData.cid); 67 72 // Clear redirect rules cache since the site was updated 68 73 clearRedirectRulesCache(did, rkey); 69 - logger.info('Site cached successfully', { did, rkey }); 74 + logger.info('Site cached successfully (on-demand)', { did, rkey }); 75 + console.log(`[On-Demand] Successfully cached ${did}:${rkey}`); 70 76 return true; 71 77 } catch (err) { 72 - logger.error('Failed to cache site', err, { did, rkey }); 78 + logger.error('Failed to cache site on-demand', err, { did, rkey }); 73 79 return false; 74 80 } finally { 75 81 // Always unmark, even if caching fails
+13 -6
apps/hosting-service/src/lib/utils.ts
··· 404 404 // Fetch site settings (optional) 405 405 const settings = await fetchSiteSettings(did, rkey); 406 406 407 + // Determine if this is an incremental update or full cache 408 + const isIncremental = Object.keys(existingFileCids).length > 0; 409 + const updateType = isIncremental ? 'incremental update' : 'full cache'; 410 + console.log(`[Cache] Starting ${updateType} for ${did}:${rkey}`); 411 + 407 412 // Download files directly to tiered storage (with incremental logic) 408 413 await cacheFiles(did, rkey, expandedRoot.entries, pdsEndpoint, '', existingFileCids); 409 414 await saveCacheMetadata(did, rkey, recordCid, newFileCids, settings); 410 415 411 - console.log('Successfully cached site', did, rkey); 416 + console.log(`[Cache] Successfully cached site ${did}:${rkey} (${updateType})`); 412 417 } 413 418 414 419 ··· 422 427 ): Promise<void> { 423 428 // Collect file download tasks (skip unchanged files) 424 429 const downloadTasks: Array<() => Promise<void>> = []; 430 + let skippedCount = 0; 425 431 426 432 function collectFileTasks( 427 433 entries: Entry[], ··· 440 446 // Check if file is unchanged (same CID as existing cache) 441 447 if (cid && existingFileCids[currentPath] === cid) { 442 448 // File unchanged - skip download (already in tiered storage) 443 - console.log(`Skipping unchanged file: ${currentPath}`); 449 + skippedCount++; 444 450 } else { 445 451 // File new or changed - download it 446 452 downloadTasks.push(() => cacheFileBlob( ··· 460 466 461 467 collectFileTasks(entries, pathPrefix); 462 468 463 - console.log(`[Incremental Update] Files to download: ${downloadTasks.length}`); 469 + console.log(`[Incremental Update] Files to copy: ${skippedCount}, Files to download: ${downloadTasks.length}`); 464 470 465 471 // Download new/changed files concurrently 466 472 const downloadLimit = 20; ··· 554 560 metadata: customMetadata, 555 561 }); 556 562 557 - // Log completion 563 + // Log completion with tier info 564 + const tierInfo = 'to 
warm/cold tiers'; 558 565 if (encoding === 'gzip' && mimeType) { 559 - console.log('Cached file', filePath, content.length, 'bytes (gzipped,', mimeType + ')'); 566 + console.log(`[Cache] Stored ${filePath} ${tierInfo} (${content.length} bytes, gzipped, ${mimeType})`); 560 567 } else { 561 - console.log('Cached file', filePath, content.length, 'bytes'); 568 + console.log(`[Cache] Stored ${filePath} ${tierInfo} (${content.length} bytes)`); 562 569 } 563 570 } 564 571
+6
packages/@wisp/observability/.env.example
··· 27 27 # GRAFANA_PROMETHEUS_PASSWORD=your-password 28 28 29 29 # ============================================================================ 30 + # Optional: Override API paths (use defaults if not set) 31 + # ============================================================================ 32 + # GRAFANA_LOKI_PATH=/loki/api/v1/push 33 + # GRAFANA_PROMETHEUS_PATH=/v1/metrics 34 + 35 + # ============================================================================ 30 36 # Optional: Override service metadata 31 37 # ============================================================================ 32 38 # SERVICE_NAME=wisp-app
+5 -1
packages/@wisp/observability/package.json
··· 24 24 } 25 25 }, 26 26 "peerDependencies": { 27 - "hono": "^4.10.7" 27 + "hono": "^4.10.7", 28 + "elysia": "^1.0.0" 28 29 }, 29 30 "peerDependenciesMeta": { 30 31 "hono": { 32 + "optional": true 33 + }, 34 + "elysia": { 31 35 "optional": true 32 36 } 33 37 },
+39 -15
packages/@wisp/observability/src/exporters.ts
··· 4 4 */ 5 5 6 6 import type { LogEntry, ErrorEntry, MetricEntry } from './core' 7 - import { metrics, type MeterProvider } from '@opentelemetry/api' 7 + import { metrics, type MeterProvider, type Counter, type Histogram } from '@opentelemetry/api' 8 8 import { MeterProvider as SdkMeterProvider, PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics' 9 9 import { OTLPMetricExporter } from '@opentelemetry/exporter-metrics-otlp-http' 10 10 import { Resource } from '@opentelemetry/resources' 11 11 import { ATTR_SERVICE_NAME, ATTR_SERVICE_VERSION } from '@opentelemetry/semantic-conventions' 12 + import os from 'node:os' 13 + import { gzipSync } from 'node:zlib' 12 14 13 15 // ============================================================================ 14 16 // Types ··· 208 210 const [service, level] = key.split('-') 209 211 const values: Array<[string, string]> = entries.map(entry => { 210 212 const logLine = JSON.stringify({ 213 + _msg: entry.message, 211 214 message: entry.message, 212 215 context: entry.context, 213 216 traceId: entry.traceId, ··· 229 232 }) 230 233 } 231 234 232 - // Create streams for errors 233 - if (errors.length > 0) { 234 - const errorValues: Array<[string, string]> = errors.map(entry => { 235 + // Group errors by service (similar to logs) 236 + const errorGroups = new Map<string, ErrorEntry[]>() 237 + for (const error of errors) { 238 + const service = error.service 239 + const group = errorGroups.get(service) || [] 240 + group.push(error) 241 + errorGroups.set(service, group) 242 + } 243 + 244 + // Create streams for errors (one per service) 245 + for (const [service, entries] of errorGroups) { 246 + const errorValues: Array<[string, string]> = entries.map(entry => { 235 247 const logLine = JSON.stringify({ 248 + _msg: entry.message, 236 249 message: entry.message, 237 250 stack: entry.stack, 238 251 context: entry.context, ··· 245 258 246 259 streams.push({ 247 260 stream: { 248 - service: errors[0]?.service || 'unknown', 261 + 
service: service, 249 262 level: 'error', 250 263 job: this.config.serviceName || 'wisp-app', 251 264 type: 'aggregated_error' ··· 261 274 if (!this.config.lokiUrl) return 262 275 263 276 const headers: Record<string, string> = { 264 - 'Content-Type': 'application/json' 277 + 'Content-Type': 'application/json', 278 + 'Content-Encoding': 'gzip' 265 279 } 266 280 267 281 // Add authentication ··· 272 286 headers['Authorization'] = `Basic ${auth}` 273 287 } 274 288 275 - const response = await fetch(`${this.config.lokiUrl}/loki/api/v1/push`, { 289 + // Gzip compress the payload 290 + const jsonPayload = JSON.stringify(batch) 291 + const compressedPayload = gzipSync(jsonPayload) 292 + 293 + const lokiPath = process.env.GRAFANA_LOKI_PATH || '/loki/api/v1/push' 294 + const response = await fetch(`${this.config.lokiUrl}${lokiPath}`, { 276 295 method: 'POST', 277 296 headers, 278 - body: JSON.stringify(batch) 297 + body: compressedPayload 279 298 }) 280 299 281 300 if (!response.ok) { ··· 291 310 292 311 class MetricsExporter { 293 312 private meterProvider?: MeterProvider 294 - private requestCounter?: any 295 - private requestDuration?: any 296 - private errorCounter?: any 313 + private requestCounter?: Counter 314 + private requestDuration?: Histogram 315 + private errorCounter?: Counter 297 316 private config: GrafanaConfig = {} 298 317 299 318 initialize(config: GrafanaConfig) { ··· 302 321 if (!this.config.enabled || !this.config.prometheusUrl) return 303 322 304 323 // Create OTLP exporter with Prometheus endpoint 324 + const prometheusPath = process.env.GRAFANA_PROMETHEUS_PATH || '/v1/metrics' 305 325 const exporter = new OTLPMetricExporter({ 306 - url: `${this.config.prometheusUrl}/v1/metrics`, 326 + url: `${this.config.prometheusUrl}${prometheusPath}`, 307 327 headers: this.getAuthHeaders(), 308 - timeoutMillis: 10000 328 + timeoutMillis: 10000, 329 + compression: 'gzip' 309 330 }) 310 331 311 332 // Create meter provider with periodic exporting 333 + const 
hostname = os.hostname() 334 + const serviceName = this.config.serviceName || 'wisp-app' 312 335 const meterProvider = new SdkMeterProvider({ 313 336 resource: new Resource({ 314 - [ATTR_SERVICE_NAME]: this.config.serviceName || 'wisp-app', 315 - [ATTR_SERVICE_VERSION]: this.config.serviceVersion || '1.0.0' 337 + [ATTR_SERVICE_NAME]: serviceName, 338 + [ATTR_SERVICE_VERSION]: this.config.serviceVersion || '1.0.0', 339 + 'instance': `${serviceName}-${hostname}` 316 340 }), 317 341 readers: [ 318 342 new PeriodicExportingMetricReader({
+8 -5
packages/@wisp/observability/src/middleware/elysia.ts
··· 1 + import type { Context } from 'elysia' 1 2 import { metricsCollector, logCollector } from '../core' 2 3 3 4 /** ··· 6 7 */ 7 8 export function observabilityMiddleware(service: string) { 8 9 return { 9 - beforeHandle: ({ request }: any) => { 10 + beforeHandle: ({ request }: Context) => { 10 11 // Store start time on request object 11 12 (request as any).__startTime = Date.now() 12 13 }, 13 - afterHandle: ({ request, set }: any) => { 14 - const duration = Date.now() - ((request as any).__startTime || Date.now()) 14 + afterHandle: ({ request, set }: Context) => { 15 + const startTime = (request as any).__startTime || Date.now() 16 + const duration = Date.now() - startTime 15 17 const url = new URL(request.url) 16 18 17 19 metricsCollector.recordRequest( ··· 22 24 service 23 25 ) 24 26 }, 25 - onError: ({ request, error, set }: any) => { 26 - const duration = Date.now() - ((request as any).__startTime || Date.now()) 27 + onError: ({ request, error, set }: Context & { error: Error }) => { 28 + const startTime = (request as any).__startTime || Date.now() 29 + const duration = Date.now() - startTime 27 30 const url = new URL(request.url) 28 31 29 32 metricsCollector.recordRequest(