Monorepo for wisp.place (wisp.place), a static site hosting service built on top of the AT Protocol.

xack ongoing revalidation requests

+97 -6
+1
apps/firehose-service/.env.example
··· 7 7 8 8 # Redis (cache invalidation + revalidation queue) 9 9 REDIS_URL=redis://localhost:6379 10 + WISP_REVALIDATE_FAILURE_BACKOFF_SECONDS=600 10 11 11 12 # S3 Storage (leave empty for local disk fallback) 12 13 S3_BUCKET=
+52
apps/firehose-service/src/lib/cache-writer.ts
··· 33 33 } 34 34 } 35 35 36 + export class SiteBlobBackoffError extends Error { 37 + constructor( 38 + public readonly did: string, 39 + public readonly rkey: string, 40 + public readonly until: number, 41 + public readonly failures: number 42 + ) { 43 + super(`Site blob fetch backoff active until ${new Date(until).toISOString()}`); 44 + this.name = 'SiteBlobBackoffError'; 45 + } 46 + } 47 + 36 48 function isHttp500Error(err: unknown): boolean { 37 49 if (typeof err === 'object' && err !== null) { 38 50 const value = err as Record<string, unknown>; ··· 59 71 const until = Date.now() + BLOB_500_BACKOFF_MS; 60 72 blob500BackoffUntil.set(blobKey, until); 61 73 return until; 74 + } 75 + 76 + function getBlobBackoffUntil(error: unknown): number | null { 77 + if (error instanceof Blob500BackoffError) return error.until; 78 + return null; 62 79 } 63 80 64 81 /** ··· 691 708 const downloadFailures = await downloadFiles(filesToDownload); 692 709 const deleteFailures = await deleteKeys(keysToDelete); 693 710 711 + const incrementalBackoffUntil = downloadFailures.reduce<number | null>((maxUntil, failure) => { 712 + const until = getBlobBackoffUntil(failure.error); 713 + if (!until) return maxUntil; 714 + if (!maxUntil) return until; 715 + return Math.max(maxUntil, until); 716 + }, null); 717 + const allIncrementalDownloadsBackoffed = 718 + downloadFailures.length > 0 && 719 + downloadFailures.every((failure) => getBlobBackoffUntil(failure.error) !== null); 720 + 721 + if (allIncrementalDownloadsBackoffed && deleteFailures.length === 0 && incrementalBackoffUntil) { 722 + logger.warn(`Incremental sync blocked by blob backoff for ${did}/${rkey}`, { 723 + did, 724 + rkey, 725 + downloadFailures: downloadFailures.length, 726 + backoffUntil: new Date(incrementalBackoffUntil).toISOString(), 727 + }); 728 + throw new SiteBlobBackoffError(did, rkey, incrementalBackoffUntil, downloadFailures.length); 729 + } 730 + 694 731 // Recovery path: wipe site prefix and perform full rebuild if incremental had failures 695 732 if (downloadFailures.length > 0 || deleteFailures.length > 0) { 696 733 logger.warn(`Incremental sync failed for ${did}/${rkey}; falling back to full rebuild`, { ··· 718 755 719 756 const fullDownloadFailures = await downloadFiles(newFiles); 720 757 if (fullDownloadFailures.length > 0) { 758 + const fullBackoffUntil = fullDownloadFailures.reduce<number | null>((maxUntil, failure) => { 759 + const until = getBlobBackoffUntil(failure.error); 760 + if (!until) return maxUntil; 761 + if (!maxUntil) return until; 762 + return Math.max(maxUntil, until); 763 + }, null); 764 + const allFullDownloadsBackoffed = 765 + fullDownloadFailures.length > 0 && 766 + fullDownloadFailures.every((failure) => getBlobBackoffUntil(failure.error) !== null); 767 + 721 768 logger.error(`Full rebuild failed for ${did}/${rkey}`, undefined, { 722 769 did, 723 770 rkey, ··· 727 774 error: f.error instanceof Error ? f.error.message : String(f.error), 728 775 })), 729 776 }); 777 + 778 + if (allFullDownloadsBackoffed && fullBackoffUntil) { 779 + throw new SiteBlobBackoffError(did, rkey, fullBackoffUntil, fullDownloadFailures.length); 780 + } 781 + 730 782 throw new Error(`Full rebuild failed for ${did}/${rkey}`); 731 783 } 732 784 }
+44 -6
apps/firehose-service/src/lib/revalidate-worker.ts
··· 2 2 import os from 'os'; 3 3 import { createLogger } from '@wispplace/observability'; 4 4 import { config } from '../config'; 5 - import { fetchSiteRecord, handleSiteCreateOrUpdate } from './cache-writer'; 5 + import { SiteBlobBackoffError, fetchSiteRecord, handleSiteCreateOrUpdate } from './cache-writer'; 6 6 7 7 const logger = createLogger('firehose-service'); 8 8 const consumerName = process.env.WISP_REVALIDATE_CONSUMER || `${os.hostname()}:${process.pid}`; 9 9 const batchSize = Number.parseInt(process.env.WISP_REVALIDATE_BATCH_SIZE || '10', 10); 10 10 const claimIdleMs = Number.parseInt(process.env.WISP_REVALIDATE_CLAIM_IDLE_MS || '60000', 10); 11 11 const blockMs = Number.parseInt(process.env.WISP_REVALIDATE_BLOCK_MS || '5000', 10); 12 + const failureBackoffSeconds = parsePositiveInt(process.env.WISP_REVALIDATE_FAILURE_BACKOFF_SECONDS, 600); 12 13 13 14 let redis: Redis | null = null; 14 15 let running = false; 15 16 let loopPromise: Promise<void> | null = null; 17 + 18 + function parsePositiveInt(value: string | undefined, fallback: number): number { 19 + if (!value) return fallback; 20 + const parsed = Number.parseInt(value, 10); 21 + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; 22 + } 23 + 24 + function getFailureBackoffKey(did: string, rkey: string): string { 25 + return `revalidate:site:failure-backoff:${did}:${rkey}`; 26 + } 16 27 17 28 function parseFields(raw: string[]): Record<string, string> { 18 29 const fields: Record<string, string> = {}; ··· 42 53 43 54 logger.info(`[Revalidate] Received message ${id}: ${did}/${rkey} (${reason})`); 44 55 56 + const failureBackoffKey = getFailureBackoffKey(did, rkey); 57 + const activeBackoffTtl = await redis.ttl(failureBackoffKey); 58 + if (activeBackoffTtl > 0) { 59 + logger.info(`[Revalidate] Acking ${id}: ${did}/${rkey} site backoff active (${activeBackoffTtl}s remaining)`); 60 + await redis.xack(config.revalidateStream, config.revalidateGroup, id); 61 + return; 62 + } 63 + 45 64 const record = await fetchSiteRecord(did, rkey); 46 65 if (!record) { 47 66 logger.warn(`[Revalidate] Site record not found on PDS: ${did}/${rkey}`); ··· 53 72 const forceDownload = reason.startsWith('storage-miss'); 54 73 const forceRewriteHtml = reason.startsWith('rewrite-miss'); 55 74 56 - await handleSiteCreateOrUpdate(did, rkey, record.record, record.cid, { 57 - skipInvalidation: true, 58 - forceDownload, 59 - forceRewriteHtml, 60 - }); 75 + try { 76 + await handleSiteCreateOrUpdate(did, rkey, record.record, record.cid, { 77 + skipInvalidation: true, 78 + forceDownload, 79 + forceRewriteHtml, 80 + }); 81 + } catch (err) { 82 + if (err instanceof SiteBlobBackoffError) { 83 + const now = Date.now(); 84 + const until = Math.max(err.until, now + 1000); 85 + const ttlSeconds = Math.max(failureBackoffSeconds, Math.ceil((until - now) / 1000)); 86 + await redis.set(failureBackoffKey, until.toString(), 'EX', ttlSeconds); 87 + logger.warn(`[Revalidate] Blob backoff for ${did}/${rkey}; acking ${id} and suppressing retries`, { 88 + did, 89 + rkey, 90 + failures: err.failures, 91 + backoffUntil: new Date(until).toISOString(), 92 + ttlSeconds, 93 + }); 94 + await redis.xack(config.revalidateStream, config.revalidateGroup, id); 95 + return; 96 + } 97 + throw err; 98 + } 61 99 62 100 logger.info(`[Revalidate] Completed ${id}: ${did}/${rkey}`); 63 101 await redis.xack(config.revalidateStream, config.revalidateGroup, id);