Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol. wisp.place

serialize work per did/rkey so out of order updates cant verwrite newer state with older data.

+48 -11
+31 -8
apps/firehose-service/src/lib/firehose.ts
··· 24 24 // Track firehose health 25 25 let lastEventTime = Date.now(); 26 26 let isConnected = false; 27 - let eventQueue: Array<() => Promise<void>> = []; 28 27 let activeHandlers = 0; 28 + let queuedHandlers = 0; 29 + const siteQueues = new Map<string, Promise<void>>(); 29 30 30 31 export function getFirehoseHealth() { 31 32 return { 32 33 connected: isConnected, 33 34 lastEventTime, 34 35 timeSinceLastEvent: Date.now() - lastEventTime, 35 - queueSize: eventQueue.length, 36 + queueSize: queuedHandlers, 36 37 activeHandlers, 37 38 healthy: isConnected && (Date.now() - lastEventTime < 60000), 38 39 }; ··· 56 57 } 57 58 58 59 /** 60 + * Schedule work so each site (did/rkey) is processed in event order. 61 + * This prevents stale writes when multiple updates arrive quickly. 62 + */ 63 + function scheduleSiteWork(siteKey: string, handler: () => Promise<void>): void { 64 + const previous = siteQueues.get(siteKey) ?? Promise.resolve(); 65 + queuedHandlers++; 66 + 67 + const next = previous 68 + .catch(() => undefined) 69 + .then(() => processWithConcurrencyLimit(handler)) 70 + .catch((err) => { 71 + logger.error(`[firehose] Unhandled site work error for ${siteKey}`, err); 72 + }) 73 + .finally(() => { 74 + queuedHandlers = Math.max(0, queuedHandlers - 1); 75 + if (siteQueues.get(siteKey) === next) { 76 + siteQueues.delete(siteKey); 77 + } 78 + }); 79 + 80 + siteQueues.set(siteKey, next); 81 + } 82 + 83 + /** 59 84 * Handle a firehose event 60 85 */ 61 86 async function handleEvent(evt: Event | CommitEvt): Promise<void> { ··· 77 102 // Handle place.wisp.fs events 78 103 if (collection === 'place.wisp.fs') { 79 104 logger.info(`[place.wisp.fs] Received ${commitEvt.event} event`, { did, rkey, cid: cid?.toString() || 'unknown' }); 80 - processWithConcurrencyLimit(async () => { 105 + const siteKey = `${did}/${rkey}`; 106 + scheduleSiteWork(siteKey, async () => { 81 107 try { 82 108 logger.debug(`[place.wisp.fs] Processing ${commitEvt.event} event`, { did, rkey }); 83 109 if (commitEvt.event === 'delete') { ··· 96 122 } catch (err) { 97 123 logger.error(`[place.wisp.fs] Error handling event`, err, { did, rkey, event: commitEvt.event }); 98 124 } 99 - }).catch(err => { 100 - logger.error(`[place.wisp.fs] Error processing event`, err, { did, rkey, event: commitEvt.event }); 101 125 }); 102 126 } 103 127 104 128 // Handle place.wisp.settings events 105 129 if (collection === 'place.wisp.settings') { 106 - processWithConcurrencyLimit(async () => { 130 + const siteKey = `${did}/${rkey}`; 131 + scheduleSiteWork(siteKey, async () => { 107 132 try { 108 133 if (commitEvt.event === 'delete') { 109 134 await handleSettingsDelete(did, rkey); ··· 114 139 } catch (err) { 115 140 logger.error(`[place.wisp.settings] Error handling event`, err, { did, rkey, event: commitEvt.event }); 116 141 } 117 - }).catch(err => { 118 - logger.error(`[place.wisp.settings] Error processing event`, err, { did, rkey, event: commitEvt.event }); 119 142 }); 120 143 } 121 144 } catch (err) {
+17 -3
apps/hosting-service/src/lib/file-serving.ts
··· 10 10 import { getCachedSettings } from './utils'; 11 11 import { loadRedirectRules, matchRedirectRule, parseCookies, parseQueryString } from './redirects'; 12 12 import { isHtmlContent, rewriteHtmlPaths } from './html-rewriter'; 13 - import { generate404Page, generateDirectoryListing } from './page-generators'; 13 + import { generate404Page, generateDirectoryListing, siteUpdatingResponse } from './page-generators'; 14 14 import { getIndexFiles, applyCustomHeaders } from './request-utils'; 15 15 import { cache } from './cache-manager'; 16 16 import { storage } from './storage'; ··· 65 65 return storage.exists(key); 66 66 } 67 67 68 - function buildStorageMissResponse(): Response { 68 + function shouldServeUpdatingPage(requestHeaders?: Record<string, string>): boolean { 69 + const accept = (requestHeaders?.accept ?? '').toLowerCase(); 70 + if (accept.includes('text/html') || accept.includes('application/xhtml+xml')) { 71 + return true; 72 + } 73 + 74 + const fetchDest = (requestHeaders?.['sec-fetch-dest'] ?? '').toLowerCase(); 75 + return fetchDest === 'document' || fetchDest === 'iframe' || fetchDest === 'frame'; 76 + } 77 + 78 + function buildStorageMissResponse(requestHeaders?: Record<string, string>): Response { 79 + if (shouldServeUpdatingPage(requestHeaders)) { 80 + return siteUpdatingResponse(); 81 + } 82 + 69 83 return new Response('Storage temporarily unavailable', { 70 84 status: 503, 71 85 headers: { ··· 368 382 if (!expectedMissPath) return null; 369 383 recordStorageMiss(expectedMissPath); 370 384 await enqueueRevalidate(did, rkey, `storage-miss:${expectedMissPath}`); 371 - return buildStorageMissResponse(); 385 + return buildStorageMissResponse(requestHeaders); 372 386 }; 373 387 374 388 const indexFiles = getIndexFiles(settings);