Monorepo for wisp.place — a static site hosting service built on top of the AT Protocol.

Add negative cache for blob 500s

+657 -33
+8 -2
apps/firehose-service/src/config.ts
··· 28 28 revalidateGroup: process.env.WISP_REVALIDATE_GROUP || 'firehose-service', 29 29 30 30 // Mode 31 - isDbFillOnly: process.argv.includes('--db-fill-only'), 32 - isBackfill: process.argv.includes('--backfill') || process.argv.includes('--db-fill-only'), 31 + isDbFillOnly: 32 + process.argv.includes('--db-fill-only') || 33 + process.env.DB_FILL_ONLY === 'true', 34 + isBackfill: 35 + process.argv.includes('--backfill') || 36 + process.argv.includes('--db-fill-only') || 37 + process.env.BACKFILL === 'true' || 38 + process.env.DB_FILL_ONLY === 'true', 33 39 } as const;
+65 -1
apps/firehose-service/src/lib/cache-writer.ts
··· 19 19 import { publishCacheInvalidation } from './cache-invalidation'; 20 20 21 21 const logger = createLogger('firehose-service'); 22 + const BLOB_500_BACKOFF_MS = Number.parseInt(process.env.BLOB_500_BACKOFF_MS || `${10 * 60 * 1000}`, 10); 23 + const blob500BackoffUntil = new Map<string, number>(); 24 + 25 + class Blob500BackoffError extends Error { 26 + constructor( 27 + public readonly blobKey: string, 28 + public readonly until: number, 29 + public readonly originalError?: unknown 30 + ) { 31 + super(`Blob fetch backoff active until ${new Date(until).toISOString()}`); 32 + this.name = 'Blob500BackoffError'; 33 + } 34 + } 35 + 36 + function isHttp500Error(err: unknown): boolean { 37 + if (typeof err === 'object' && err !== null) { 38 + const value = err as Record<string, unknown>; 39 + const status = value.status ?? value.statusCode; 40 + if (typeof status === 'number' && status === 500) return true; 41 + if (typeof status === 'string' && status === '500') return true; 42 + } 43 + 44 + const msg = err instanceof Error ? 
err.message : String(err); 45 + return /\bHTTP\s*500\b/i.test(msg); 46 + } 47 + 48 + function getBackoffUntil(blobKey: string): number | null { 49 + const until = blob500BackoffUntil.get(blobKey); 50 + if (!until) return null; 51 + if (Date.now() >= until) { 52 + blob500BackoffUntil.delete(blobKey); 53 + return null; 54 + } 55 + return until; 56 + } 57 + 58 + function set500Backoff(blobKey: string): number { 59 + const until = Date.now() + BLOB_500_BACKOFF_MS; 60 + blob500BackoffUntil.set(blobKey, until); 61 + return until; 62 + } 22 63 23 64 /** 24 65 * Fetch a site record from the PDS ··· 400 441 pdsEndpoint: string 401 442 ): Promise<void> { 402 443 const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 444 + const blobKey = `${did}:${file.cid}`; 445 + 446 + const backoffUntil = getBackoffUntil(blobKey); 447 + if (backoffUntil) { 448 + throw new Blob500BackoffError(blobKey, backoffUntil); 449 + } 403 450 404 451 logger.debug(`Downloading ${file.path}`); 405 452 406 - let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 453 + let content: Uint8Array; 454 + try { 455 + content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 456 + } catch (err) { 457 + if (isHttp500Error(err)) { 458 + const until = set500Backoff(blobKey); 459 + logger.warn(`Caching blob HTTP 500 for ${BLOB_500_BACKOFF_MS}ms`, { 460 + did, 461 + rkey, 462 + path: file.path, 463 + cid: file.cid, 464 + backoffUntil: new Date(until).toISOString(), 465 + }); 466 + throw new Blob500BackoffError(blobKey, until, err); 467 + } 468 + throw err; 469 + } 470 + blob500BackoffUntil.delete(blobKey); 407 471 let encoding = file.encoding; 408 472 409 473 // Decode base64 if needed
+12 -2
apps/hosting-service/src/lib/db.ts
··· 28 28 const key = domain.toLowerCase(); 29 29 return cache.getOrFetch('customDomains', key, async () => { 30 30 const result = await sql<CustomDomainLookup[]>` 31 - SELECT id, domain, did, rkey, verified FROM custom_domains 32 - WHERE domain = ${key} AND verified = true LIMIT 1 31 + SELECT cd.id, cd.domain, cd.did, cd.rkey, cd.verified 32 + FROM custom_domains cd 33 + LEFT JOIN site_cache sc 34 + ON sc.did = cd.did 35 + AND sc.rkey = cd.rkey 36 + WHERE cd.domain = ${key} AND cd.verified = true 37 + ORDER BY 38 + (cd.rkey IS NOT NULL) DESC, 39 + (sc.did IS NOT NULL) DESC, 40 + cd.last_verified_at DESC NULLS LAST, 41 + cd.created_at DESC 42 + LIMIT 1 33 43 `; 34 44 return result[0] || null; 35 45 });
+29 -2
apps/hosting-service/src/lib/file-serving.ts
··· 409 409 filePath: string, 410 410 settings: WispSettings | null = null, 411 411 requestHeaders?: Record<string, string>, 412 - trace?: RequestTrace | null 412 + trace?: RequestTrace | null, 413 + allowExpectedMissRecovery = true, 413 414 ): Promise<Response> { 414 415 let expectedFileCids: Record<string, string> | null | undefined; 415 416 let expectedMissPath: string | null = null; ··· 435 436 } 436 437 }; 437 438 438 - const maybeReturnStorageMiss = async (): Promise<Response | null> => { 439 + const RECOVERED_RETRY = 'RECOVERED_RETRY' as const; 440 + const maybeReturnStorageMiss = async (): Promise<Response | typeof RECOVERED_RETRY | null> => { 439 441 if (!expectedMissPath) return null; 442 + 443 + if (allowExpectedMissRecovery) { 444 + logger.warn('Expected storage miss, attempting on-demand recovery before returning 503', { 445 + did, 446 + rkey, 447 + path: expectedMissPath, 448 + }); 449 + 450 + const recovered = await fetchAndCacheSite(did, rkey); 451 + if (recovered) { 452 + // Clear any per-site negative cache entries so retry can discover restored files. 453 + cache.deletePrefix('siteFiles', `${did}:${rkey}:`); 454 + return RECOVERED_RETRY; 455 + } 456 + } 457 + 440 458 recordStorageMiss(expectedMissPath); 441 459 await enqueueRevalidate(did, rkey, `storage-miss:${expectedMissPath}`); 442 460 return buildStorageMissResponse(requestHeaders); ··· 477 495 const directoryEntries = await listDirectoryEntries(did, rkey, requestPath, fileCids ? 
Object.keys(fileCids) : null); 478 496 if (directoryEntries.length > 0) { 479 497 const missResponse = await maybeReturnStorageMiss(); 498 + if (missResponse === RECOVERED_RETRY) { 499 + return serveFileInternal(did, rkey, filePath, settings, requestHeaders, trace, false); 500 + } 480 501 if (missResponse) return missResponse; 481 502 const html = generateDirectoryListing(requestPath, directoryEntries); 482 503 return new Response(html, { ··· 570 591 const rootEntries = await listDirectoryEntries(did, rkey, '', fileCids ? Object.keys(fileCids) : null); 571 592 if (rootEntries.length > 0) { 572 593 const missResponse = await maybeReturnStorageMiss(); 594 + if (missResponse === RECOVERED_RETRY) { 595 + return serveFileInternal(did, rkey, filePath, settings, requestHeaders, trace, false); 596 + } 573 597 if (missResponse) return missResponse; 574 598 const html = generateDirectoryListing('', rootEntries); 575 599 return new Response(html, { ··· 583 607 } 584 608 585 609 const missResponse = await maybeReturnStorageMiss(); 610 + if (missResponse === RECOVERED_RETRY) { 611 + return serveFileInternal(did, rkey, filePath, settings, requestHeaders, trace, false); 612 + } 586 613 if (missResponse) return missResponse; 587 614 588 615 // Last resort: if site not in DB at all, try on-demand fetch
+195 -14
apps/hosting-service/src/lib/on-demand-cache.ts
··· 26 26 27 27 // Track in-flight fetches to avoid duplicate work 28 28 const inFlightFetches = new Map<string, Promise<boolean>>(); 29 + const BLOB_500_BACKOFF_MS = 10 * 60 * 1000; 30 + const blob500BackoffUntil = new Map<string, number>(); 29 31 30 32 interface FileInfo { 31 33 path: string; ··· 36 38 base64?: boolean; 37 39 } 38 40 41 + class Blob500BackoffError extends Error { 42 + constructor( 43 + public readonly blobKey: string, 44 + public readonly until: number, 45 + public readonly originalError?: unknown 46 + ) { 47 + super(`Blob fetch backoff active until ${new Date(until).toISOString()}`); 48 + this.name = 'Blob500BackoffError'; 49 + } 50 + } 51 + 52 + function isHttp500Error(err: unknown): boolean { 53 + if (typeof err === 'object' && err !== null) { 54 + const value = err as Record<string, unknown>; 55 + const status = value.status ?? value.statusCode; 56 + if (typeof status === 'number' && status === 500) return true; 57 + if (typeof status === 'string' && status === '500') return true; 58 + } 59 + 60 + const msg = err instanceof Error ? 
err.message : String(err); 61 + return /\bHTTP\s*500\b/i.test(msg); 62 + } 63 + 64 + function getBackoffUntil(blobKey: string): number | null { 65 + const until = blob500BackoffUntil.get(blobKey); 66 + if (!until) return null; 67 + if (Date.now() >= until) { 68 + blob500BackoffUntil.delete(blobKey); 69 + return null; 70 + } 71 + return until; 72 + } 73 + 74 + function set500Backoff(blobKey: string): number { 75 + const until = Date.now() + BLOB_500_BACKOFF_MS; 76 + blob500BackoffUntil.set(blobKey, until); 77 + return until; 78 + } 79 + 80 + function formatUnknownError(err: unknown): Record<string, unknown> { 81 + if (err instanceof Error) { 82 + return { 83 + name: err.name, 84 + message: err.message, 85 + stack: err.stack, 86 + }; 87 + } 88 + 89 + if (typeof err === 'object' && err !== null) { 90 + const value = err as Record<string, unknown>; 91 + const out: Record<string, unknown> = {}; 92 + 93 + for (const key of ['name', 'message', 'code', 'status', 'statusCode']) { 94 + if (value[key] !== undefined) out[key] = value[key]; 95 + } 96 + 97 + try { 98 + out.raw = JSON.stringify(err); 99 + } catch { 100 + out.raw = String(err); 101 + } 102 + 103 + return out; 104 + } 105 + 106 + return { message: String(err) }; 107 + } 108 + 39 109 /** 40 110 * Attempt to fetch and cache a completely missing site on-demand. 41 111 * Returns true if the site was successfully cached, false otherwise. 
··· 78 148 // Fetch site record from PDS 79 149 const pdsEndpoint = await getPdsForDid(did); 80 150 if (!pdsEndpoint) { 81 - logger.error('Could not resolve PDS', { did }); 151 + logger.error('Could not resolve PDS', undefined, { did }); 82 152 return false; 83 153 } 84 154 ··· 92 162 if (msg.includes('HTTP 404') || msg.includes('Not Found')) { 93 163 logger.info('Site record not found on PDS', { did, rkey }); 94 164 } else { 95 - logger.error('Failed to fetch site record', { did, rkey, error: msg }); 165 + logger.error('Failed to fetch site record', undefined, { did, rkey, error: msg }); 96 166 } 97 167 return false; 98 168 } ··· 101 171 const recordCid = data.cid || ''; 102 172 103 173 if (!record?.root?.entries) { 104 - logger.error('Invalid record structure', { did, rkey }); 174 + logger.error('Invalid record structure', undefined, { did, rkey }); 105 175 return false; 106 176 } 107 177 ··· 111 181 // Validate limits 112 182 const fileCount = countFilesInDirectory(expandedRoot); 113 183 if (fileCount > MAX_FILE_COUNT) { 114 - logger.error('Site exceeds file limit', { did, rkey, fileCount, maxFileCount: MAX_FILE_COUNT }); 184 + logger.error('Site exceeds file limit', undefined, { did, rkey, fileCount, maxFileCount: MAX_FILE_COUNT }); 115 185 return false; 116 186 } 117 187 118 188 const totalSize = calculateTotalBlobSize(expandedRoot); 119 189 const sizeLimit = await isSupporter(did) ? 
MAX_SITE_SIZE_SUPPORTER : MAX_SITE_SIZE; 120 190 if (totalSize > sizeLimit) { 121 - logger.error('Site exceeds size limit', { did, rkey, totalSize, sizeLimit }); 191 + logger.error('Site exceeds size limit', undefined, { did, rkey, totalSize, sizeLimit }); 122 192 return false; 123 193 } 124 194 ··· 133 203 const CONCURRENCY = 10; 134 204 let downloaded = 0; 135 205 let failed = 0; 206 + const downloadedPaths = new Set<string>(); 136 207 137 208 for (let i = 0; i < files.length; i += CONCURRENCY) { 138 209 const batch = files.slice(i, i + CONCURRENCY); ··· 140 211 batch.map(file => downloadAndWriteBlob(did, rkey, file, pdsEndpoint)) 141 212 ); 142 213 143 - for (const result of results) { 214 + results.forEach((result, idx) => { 215 + const file = batch[idx]; 144 216 if (result.status === 'fulfilled') { 145 217 downloaded++; 218 + if (file) downloadedPaths.add(file.path); 146 219 } else { 220 + if (result.reason instanceof Blob500BackoffError) { 221 + logger.warn('Skipping blob download due cached HTTP 500 backoff', { 222 + did, 223 + rkey, 224 + filePath: file?.path, 225 + cid: file?.cid, 226 + backoffUntil: new Date(result.reason.until).toISOString(), 227 + }); 228 + failed++; 229 + return; 230 + } 147 231 failed++; 148 - logger.error('Failed to download blob', { did, rkey, error: result.reason }); 232 + logger.error('Failed to download blob', undefined, { 233 + did, 234 + rkey, 235 + filePath: file?.path, 236 + cid: file?.cid, 237 + error: formatUnknownError(result.reason), 238 + }); 149 239 } 150 - } 240 + }); 151 241 } 152 242 153 243 logger.info('Downloaded files', { did, rkey, downloaded, failed }); 244 + if (failed > 0) { 245 + logger.warn('Partial on-demand cache: some blobs could not be downloaded', { 246 + did, 247 + rkey, 248 + downloaded, 249 + failed, 250 + }); 251 + } 154 252 155 - // Update DB with file CIDs so future storage misses can be detected 156 - await upsertSiteCache(did, rkey, recordCid, fileCids); 253 + // Keep site_cache aligned with 
what we actually fetched to avoid 254 + // expected-miss loops ("site is updating" forever) on permanently failing blobs. 255 + const successfullyCachedFileCids: Record<string, string> = {}; 256 + for (const path of downloadedPaths) { 257 + const cid = fileCids[path]; 258 + if (cid) { 259 + successfullyCachedFileCids[path] = cid; 260 + } 261 + } 262 + 263 + if (downloaded === 0) { 264 + logger.warn('On-demand cache could not fetch any blobs', { did, rkey, totalFiles: files.length }); 265 + return false; 266 + } 267 + 268 + // Update DB with file CIDs that are actually present in storage. 269 + await upsertSiteCache(did, rkey, recordCid, successfullyCachedFileCids); 157 270 158 271 // Enqueue revalidate so firehose-service backfills S3 (cold tier) 159 272 await enqueueRevalidate(did, rkey, `storage-miss:on-demand`); ··· 161 274 logger.info('Successfully cached site', { did, rkey, downloaded }); 162 275 return downloaded > 0; 163 276 } catch (err) { 164 - logger.error('Error caching site', { did, rkey, error: err }); 277 + logger.error('Error caching site', err, { did, rkey }); 165 278 return false; 166 279 } finally { 167 280 await releaseLock(lockKey); ··· 222 335 pdsEndpoint: string 223 336 ): Promise<void> { 224 337 const blobUrl = `${pdsEndpoint}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(file.cid)}`; 338 + const blobKey = `${did}:${file.cid}`; 339 + 340 + const backoffUntil = getBackoffUntil(blobKey); 341 + if (backoffUntil) { 342 + throw new Blob500BackoffError(blobKey, backoffUntil); 343 + } 225 344 226 - let content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 345 + let content: Uint8Array; 346 + try { 347 + content = await safeFetchBlob(blobUrl, { maxSize: MAX_BLOB_SIZE, timeout: 300000 }); 348 + } catch (err) { 349 + if (isHttp500Error(err)) { 350 + const until = set500Backoff(blobKey); 351 + throw new Blob500BackoffError(blobKey, until, err); 352 + } 353 + throw err; 354 + } 355 + 
// Successful fetch clears any stale backoff for this blob. 356 + blob500BackoffUntil.delete(blobKey); 227 357 let encoding = file.encoding; 228 358 229 359 // Decode base64 if flagged 230 360 if (file.base64) { 231 361 const base64String = new TextDecoder().decode(content); 232 362 content = Buffer.from(base64String, 'base64'); 363 + } else if (isTextLikeMime(file.mimeType, file.path)) { 364 + // Heuristic fallback: some records omit base64 flag but content is base64 text 365 + const decoded = tryDecodeBase64(content); 366 + if (decoded) { 367 + logger.warn(`Decoded base64 fallback for ${file.path} (base64 flag missing)`, { did, rkey }); 368 + content = decoded; 369 + } 233 370 } 234 371 235 372 // Decompress if needed and shouldn't stay compressed ··· 240 377 try { 241 378 content = gunzipSync(content); 242 379 encoding = undefined; 243 - } catch { 244 - // Keep gzipped if decompression fails 380 + } catch (error) { 381 + logger.warn(`Failed to decompress ${file.path}, storing gzipped`, { did, rkey, error }); 382 + } 383 + } else if (encoding === 'gzip' && content.length >= 2 && 384 + !(content[0] === 0x1f && content[1] === 0x8b)) { 385 + // If marked gzip but doesn't look gzipped, attempt base64 decode and retry 386 + const decoded = tryDecodeBase64(content); 387 + if (decoded && decoded.length >= 2 && decoded[0] === 0x1f && decoded[1] === 0x8b) { 388 + logger.warn(`Decoded base64+gzip fallback for ${file.path}`, { did, rkey }); 389 + try { 390 + content = gunzipSync(decoded); 391 + encoding = undefined; 392 + } catch (error) { 393 + logger.warn(`Failed to decompress base64+gzip fallback for ${file.path}, storing gzipped`, { did, rkey, error }); 394 + content = decoded; 395 + } 245 396 } 246 397 } 247 398 ··· 286 437 lower.endsWith('.xml') || 287 438 lower.endsWith('.svg'); 288 439 } 440 + 441 + function looksLikeBase64(content: Uint8Array): boolean { 442 + if (content.length === 0) return false; 443 + let nonWhitespace = 0; 444 + for (const byte of content) { 
445 + if (byte === 0x0a || byte === 0x0d || byte === 0x20 || byte === 0x09) { 446 + continue; 447 + } 448 + nonWhitespace++; 449 + const isBase64Char = 450 + (byte >= 0x41 && byte <= 0x5a) || // A-Z 451 + (byte >= 0x61 && byte <= 0x7a) || // a-z 452 + (byte >= 0x30 && byte <= 0x39) || // 0-9 453 + byte === 0x2b || // + 454 + byte === 0x2f || // / 455 + byte === 0x3d; // = 456 + if (!isBase64Char) return false; 457 + } 458 + return nonWhitespace % 4 === 0; 459 + } 460 + 461 + function tryDecodeBase64(content: Uint8Array): Uint8Array | null { 462 + if (!looksLikeBase64(content)) return null; 463 + const base64String = new TextDecoder().decode(content).replace(/\s+/g, ''); 464 + try { 465 + return Buffer.from(base64String, 'base64'); 466 + } catch { 467 + return null; 468 + } 469 + }
+54 -2
apps/main-app/src/lib/dns-verification-worker.ts
··· 30 30 } 31 31 } 32 32 33 + private async cleanupDuplicateDomainRows(): Promise<number> { 34 + const rows = await db<Array<{ removed: number | string }>>` 35 + WITH ranked AS ( 36 + SELECT 37 + ctid, 38 + ROW_NUMBER() OVER ( 39 + PARTITION BY domain 40 + ORDER BY 41 + verified DESC, 42 + (rkey IS NOT NULL) DESC, 43 + last_verified_at DESC NULLS LAST, 44 + created_at DESC, 45 + id DESC 46 + ) AS rn 47 + FROM custom_domains 48 + ), 49 + deleted AS ( 50 + DELETE FROM custom_domains cd 51 + USING ranked r 52 + WHERE cd.ctid = r.ctid 53 + AND r.rn > 1 54 + RETURNING 1 55 + ) 56 + SELECT COUNT(*)::int AS removed FROM deleted 57 + `; 58 + 59 + const value = rows[0]?.removed ?? 0; 60 + return typeof value === 'string' ? Number.parseInt(value, 10) : value; 61 + } 62 + 33 63 async start() { 34 64 if (this.isRunning) { 35 65 this.log('DNS verification worker already running'); ··· 71 101 }; 72 102 73 103 try { 104 + const removed = await this.cleanupDuplicateDomainRows(); 105 + if (removed > 0) { 106 + this.log('Cleaned duplicate custom domain rows before verification', { removed }); 107 + } 108 + 74 109 // Get all custom domains (both verified and pending) 75 110 const domains = await db<Array<{ 76 111 id: string; ··· 78 113 did: string; 79 114 verified: boolean; 80 115 }>>` 81 - SELECT id, domain, did, verified FROM custom_domains 116 + SELECT DISTINCT ON (domain) id, domain, did, verified 117 + FROM custom_domains 118 + ORDER BY 119 + domain, 120 + verified DESC, 121 + (rkey IS NOT NULL) DESC, 122 + last_verified_at DESC NULLS LAST, 123 + created_at DESC, 124 + id DESC 82 125 `; 83 126 84 127 if (!domains || domains.length === 0) { ··· 107 150 // Double-check: ensure this record is still the current owner in database 108 151 // This prevents race conditions where domain ownership changed during verification 109 152 const currentOwner = await db<Array<{ id: string; did: string; verified: boolean }>>` 110 - SELECT id, did, verified FROM custom_domains WHERE domain = 
${domain} 153 + SELECT id, did, verified 154 + FROM custom_domains 155 + WHERE domain = ${domain} 156 + ORDER BY 157 + verified DESC, 158 + (rkey IS NOT NULL) DESC, 159 + last_verified_at DESC NULLS LAST, 160 + created_at DESC, 161 + id DESC 162 + LIMIT 1 111 163 `; 112 164 113 165 const isStillOwner = currentOwner.length > 0 && currentOwner[0].id === id;
+4
apps/main-app/src/lib/migrations.ts
··· 91 91 `; 92 92 }); 93 93 94 + await runMigration("ensure unique custom_domains.domain", async () => { 95 + await db`CREATE UNIQUE INDEX IF NOT EXISTS idx_custom_domains_domain_unique ON custom_domains(domain)`; 96 + }, { silent: true }); 97 + 94 98 // Enforce mapped site rkeys belong to same DID as mapped domain. 95 99 await runMigration("add fk_domains_site_owner", async () => { 96 100 await db`
+150 -10
packages/@wispplace/tiered-storage/src/tiers/S3StorageTier.ts
··· 7 7 ListObjectsV2Command, 8 8 DeleteObjectsCommand, 9 9 CopyObjectCommand, 10 + type GetObjectCommandOutput, 11 + type HeadObjectCommandOutput, 10 12 type S3ClientConfig, 11 13 } from '@aws-sdk/client-s3'; 12 14 import { Upload } from '@aws-sdk/lib-storage'; 13 15 import type { Readable } from 'node:stream'; 16 + import { lookup as mimeLookup } from 'mime-types'; 14 17 import type { 15 18 StorageTier, 16 19 StorageMetadata, ··· 177 180 }), 178 181 ); 179 182 180 - if (!response.Body || !response.Metadata) { 183 + if (!response.Body) { 181 184 return null; 182 185 } 183 186 184 - const data = await this.streamToUint8Array(response.Body as Readable); 185 - const metadata = this.s3ToMetadata(response.Metadata); 187 + const rawData = await this.streamToUint8Array(response.Body as Readable); 188 + const { data, metadata } = response.Metadata 189 + ? { data: rawData, metadata: this.s3ToMetadata(response.Metadata) } 190 + : this.recoverDataAndMetadataFromObject(key, rawData, response); 186 191 187 192 return { data, metadata }; 188 193 } catch (error) { ··· 214 219 }), 215 220 ); 216 221 217 - if (!response.Body || !response.Metadata) { 222 + if (!response.Body) { 218 223 return null; 219 224 } 220 225 221 - const metadata = this.s3ToMetadata(response.Metadata); 226 + const metadata = response.Metadata 227 + ? this.s3ToMetadata(response.Metadata) 228 + : this.metadataFromObjectResponse(key, response.ContentLength, response); 222 229 223 230 return { stream: response.Body as Readable, metadata }; 224 231 } catch (error) { ··· 390 397 }), 391 398 ); 392 399 393 - if (!response.Metadata) { 394 - return null; 395 - } 396 - 397 - return this.s3ToMetadata(response.Metadata); 400 + return response.Metadata 401 + ? 
this.s3ToMetadata(response.Metadata) 402 + : this.metadataFromObjectResponse(key, response.ContentLength, response); 398 403 } catch (error) { 399 404 if (this.isNoSuchKeyError(error)) { 400 405 return null; ··· 471 476 return s3Key.slice(this.prefix.length); 472 477 } 473 478 return s3Key; 479 + } 480 + 481 + /** 482 + * Build conservative metadata when object metadata headers are missing. 483 + */ 484 + private metadataFromObjectResponse( 485 + key: string, 486 + size: number | undefined, 487 + response: GetObjectCommandOutput | HeadObjectCommandOutput, 488 + inferred?: { mimeType?: string; encoding?: string }, 489 + ): StorageMetadata { 490 + const now = new Date(); 491 + const customMetadata: Record<string, string> = {}; 492 + 493 + const mimeType = inferred?.mimeType ?? this.normalizeContentType(response.ContentType); 494 + if (mimeType) { 495 + customMetadata.mimeType = mimeType; 496 + } 497 + const encoding = inferred?.encoding ?? response.ContentEncoding; 498 + if (encoding) { 499 + customMetadata.encoding = encoding; 500 + } 501 + 502 + const rawChecksum = typeof response.ETag === 'string' ? response.ETag.replace(/"/g, '') : ''; 503 + 504 + return { 505 + key, 506 + size: Math.max(0, size ?? 0), 507 + createdAt: now, 508 + lastAccessed: now, 509 + accessCount: 0, 510 + compressed: false, 511 + checksum: rawChecksum, 512 + ...(Object.keys(customMetadata).length > 0 && { customMetadata }), 513 + }; 514 + } 515 + 516 + /** 517 + * Recover legacy/partial objects when S3 metadata headers are absent. 518 + * Mirrors firehose heuristics: base64 decode for text-like files and gzip detection. 519 + */ 520 + private recoverDataAndMetadataFromObject( 521 + key: string, 522 + data: Uint8Array, 523 + response: GetObjectCommandOutput, 524 + ): { data: Uint8Array; metadata: StorageMetadata } { 525 + const mimeType = this.normalizeContentType(response.ContentType) ?? 
this.mimeTypeFromKey(key); 526 + let recovered = data; 527 + let inferredEncoding: string | undefined; 528 + 529 + if (this.isTextLikeMime(mimeType, key)) { 530 + const decoded = this.tryDecodeBase64(recovered); 531 + if (decoded) { 532 + recovered = decoded; 533 + } 534 + } 535 + 536 + if (!inferredEncoding && this.shouldDetectGzip(mimeType, key) && this.isGzip(recovered)) { 537 + inferredEncoding = 'gzip'; 538 + } 539 + 540 + const metadata = this.metadataFromObjectResponse(key, recovered.length, response, { 541 + ...(mimeType ? { mimeType } : {}), 542 + ...(inferredEncoding ? { encoding: inferredEncoding } : {}), 543 + }); 544 + return { data: recovered, metadata }; 545 + } 546 + 547 + private normalizeContentType(contentType?: string): string | undefined { 548 + if (!contentType) return undefined; 549 + return contentType.split(';')[0]?.trim() || undefined; 550 + } 551 + 552 + private mimeTypeFromKey(key: string): string | undefined { 553 + const guessed = mimeLookup(key); 554 + return typeof guessed === 'string' ? 
guessed : undefined; 555 + } 556 + 557 + private shouldDetectGzip(mimeType?: string, key?: string): boolean { 558 + if (mimeType) { 559 + if (mimeType.startsWith('text/')) return true; 560 + if (mimeType === 'application/javascript') return true; 561 + if (mimeType === 'application/json') return true; 562 + if (mimeType === 'application/xml') return true; 563 + if (mimeType === 'image/svg+xml') return true; 564 + } 565 + if (!key) return false; 566 + const lower = key.toLowerCase(); 567 + return ( 568 + lower.endsWith('.html') || 569 + lower.endsWith('.htm') || 570 + lower.endsWith('.css') || 571 + lower.endsWith('.js') || 572 + lower.endsWith('.json') || 573 + lower.endsWith('.xml') || 574 + lower.endsWith('.svg') 575 + ); 576 + } 577 + 578 + private isTextLikeMime(mimeType?: string, key?: string): boolean { 579 + return this.shouldDetectGzip(mimeType, key); 580 + } 581 + 582 + private isGzip(content: Uint8Array): boolean { 583 + return content.length >= 2 && content[0] === 0x1f && content[1] === 0x8b; 584 + } 585 + 586 + private looksLikeBase64(content: Uint8Array): boolean { 587 + if (content.length === 0) return false; 588 + let nonWhitespace = 0; 589 + for (const byte of content) { 590 + if (byte === 0x0a || byte === 0x0d || byte === 0x20 || byte === 0x09) { 591 + continue; 592 + } 593 + nonWhitespace++; 594 + const isBase64Char = 595 + (byte >= 0x41 && byte <= 0x5a) || // A-Z 596 + (byte >= 0x61 && byte <= 0x7a) || // a-z 597 + (byte >= 0x30 && byte <= 0x39) || // 0-9 598 + byte === 0x2b || // + 599 + byte === 0x2f || // / 600 + byte === 0x3d; // = 601 + if (!isBase64Char) return false; 602 + } 603 + return nonWhitespace % 4 === 0; 604 + } 605 + 606 + private tryDecodeBase64(content: Uint8Array): Uint8Array | null { 607 + if (!this.looksLikeBase64(content)) return null; 608 + const base64String = new TextDecoder().decode(content).replace(/\s+/g, ''); 609 + try { 610 + return Buffer.from(base64String, 'base64'); 611 + } catch { 612 + return null; 613 + } 474 
614 } 475 615 476 616 /**
+140
packages/@wispplace/tiered-storage/test/S3StorageTier.test.ts
··· 1 + import { describe, it, expect } from 'vitest'; 2 + import { Readable } from 'node:stream'; 3 + import { gzipSync } from 'node:zlib'; 4 + import { S3StorageTier } from '../src/tiers/S3StorageTier.js'; 5 + 6 + describe('S3StorageTier metadata fallback', () => { 7 + it('getWithMetadata should synthesize metadata when S3 metadata headers are missing', async () => { 8 + const tier = new S3StorageTier({ 9 + bucket: 'test-bucket', 10 + region: 'us-east-1', 11 + }); 12 + 13 + (tier as any).client = { 14 + send: async () => ({ 15 + Body: Readable.from([Buffer.from('hello')]), 16 + Metadata: undefined, 17 + ContentLength: 5, 18 + ContentType: 'text/plain', 19 + ContentEncoding: 'gzip', 20 + ETag: '"etag-123"', 21 + }), 22 + }; 23 + 24 + const result = await tier.getWithMetadata('did:plc:abc/site/index.html'); 25 + 26 + expect(result).not.toBeNull(); 27 + expect(Buffer.from(result!.data).toString()).toBe('hello'); 28 + expect(result!.metadata.key).toBe('did:plc:abc/site/index.html'); 29 + expect(result!.metadata.size).toBe(5); 30 + expect(result!.metadata.checksum).toBe('etag-123'); 31 + expect(result!.metadata.customMetadata).toEqual({ 32 + mimeType: 'text/plain', 33 + encoding: 'gzip', 34 + }); 35 + }); 36 + 37 + it('getStream should synthesize metadata when S3 metadata headers are missing', async () => { 38 + const tier = new S3StorageTier({ 39 + bucket: 'test-bucket', 40 + region: 'us-east-1', 41 + }); 42 + 43 + (tier as any).client = { 44 + send: async () => ({ 45 + Body: Readable.from([Buffer.from('abc')]), 46 + Metadata: undefined, 47 + ContentLength: 3, 48 + ETag: '"etag-stream"', 49 + }), 50 + }; 51 + 52 + const result = await tier.getStream('did:plc:abc/site/style.css'); 53 + 54 + expect(result).not.toBeNull(); 55 + expect(result!.metadata.key).toBe('did:plc:abc/site/style.css'); 56 + expect(result!.metadata.size).toBe(3); 57 + expect(result!.metadata.checksum).toBe('etag-stream'); 58 + }); 59 + 60 + it('getMetadata should synthesize metadata from HeadObject 
when headers are missing', async () => { 61 + const tier = new S3StorageTier({ 62 + bucket: 'test-bucket', 63 + region: 'us-east-1', 64 + }); 65 + 66 + (tier as any).client = { 67 + send: async () => ({ 68 + Metadata: undefined, 69 + ContentLength: 42, 70 + ContentType: 'text/html', 71 + ETag: '"etag-head"', 72 + }), 73 + }; 74 + 75 + const metadata = await tier.getMetadata('did:plc:abc/site/page.html'); 76 + 77 + expect(metadata).not.toBeNull(); 78 + expect(metadata!.key).toBe('did:plc:abc/site/page.html'); 79 + expect(metadata!.size).toBe(42); 80 + expect(metadata!.checksum).toBe('etag-head'); 81 + expect(metadata!.customMetadata).toEqual({ mimeType: 'text/html' }); 82 + }); 83 + 84 + it('getWithMetadata should infer gzip encoding from magic bytes for text-like content', async () => { 85 + const tier = new S3StorageTier({ 86 + bucket: 'test-bucket', 87 + region: 'us-east-1', 88 + }); 89 + 90 + const gzipped = gzipSync(Buffer.from('<html>ok</html>')); 91 + 92 + (tier as any).client = { 93 + send: async () => ({ 94 + Body: Readable.from([gzipped]), 95 + Metadata: undefined, 96 + ContentLength: gzipped.length, 97 + ContentType: 'text/html; charset=utf-8', 98 + ETag: '"etag-gzip"', 99 + }), 100 + }; 101 + 102 + const result = await tier.getWithMetadata('did:plc:abc/site/index.html'); 103 + 104 + expect(result).not.toBeNull(); 105 + expect(Buffer.from(result!.data).equals(gzipped)).toBe(true); 106 + expect(result!.metadata.customMetadata).toEqual({ 107 + mimeType: 'text/html', 108 + encoding: 'gzip', 109 + }); 110 + }); 111 + 112 + it('getWithMetadata should decode base64 payload and infer gzip encoding for text-like content', async () => { 113 + const tier = new S3StorageTier({ 114 + bucket: 'test-bucket', 115 + region: 'us-east-1', 116 + }); 117 + 118 + const gzipped = gzipSync(Buffer.from('console.log("ok")')); 119 + const base64 = Buffer.from(gzipped).toString('base64'); 120 + 121 + (tier as any).client = { 122 + send: async () => ({ 123 + Body: 
Readable.from([Buffer.from(base64)]), 124 + Metadata: undefined, 125 + ContentLength: base64.length, 126 + ContentType: 'application/javascript', 127 + ETag: '"etag-base64-gzip"', 128 + }), 129 + }; 130 + 131 + const result = await tier.getWithMetadata('did:plc:abc/site/app.js'); 132 + 133 + expect(result).not.toBeNull(); 134 + expect(Buffer.from(result!.data).equals(gzipped)).toBe(true); 135 + expect(result!.metadata.customMetadata).toEqual({ 136 + mimeType: 'application/javascript', 137 + encoding: 'gzip', 138 + }); 139 + }); 140 + });