tangled
alpha
login
or
join now
nekomimi.pet
/
wisp.place-monorepo
87
fork
atom
Monorepo for wisp.place. A static site hosting service built on top of the AT Protocol.
wisp.place
87
fork
atom
overview
issues
9
pulls
pipelines
backfill fills db rpoperly
nekomimi.pet
3 weeks ago
f136ee7c
016b8cad
1/2
deploy-wisp.yml
success
39s
test.yml
failed
32s
+208
-10
6 changed files
expand all
collapse all
unified
split
apps
firehose-service
package.json
src
config.ts
index.ts
lib
cache-writer.ts
db.ts
packages
@wispplace
constants
src
index.ts
+3
-1
apps/firehose-service/package.json
···
5
5
"scripts": {
6
6
"dev": "NODE_OPTIONS='--max-old-space-size=2048' tsx --env-file=.env src/index.ts",
7
7
"dev:backfill": "NODE_OPTIONS='--max-old-space-size=2048' tsx --env-file=.env src/index.ts --backfill",
8
8
+
"dev:db-fill": "NODE_OPTIONS='--max-old-space-size=2048' tsx --env-file=.env src/index.ts --db-fill-only",
8
9
"build": "bun run build.ts",
9
10
"start": "NODE_OPTIONS='--max-old-space-size=2048' tsx src/index.ts",
10
11
"check": "tsc --noEmit",
11
11
-
"backfill": "NODE_OPTIONS='--max-old-space-size=2048' tsx src/index.ts --backfill"
12
12
+
"backfill": "NODE_OPTIONS='--max-old-space-size=2048' tsx src/index.ts --backfill",
13
13
+
"db-fill": "NODE_OPTIONS='--max-old-space-size=2048' tsx src/index.ts --db-fill-only"
12
14
},
13
15
"dependencies": {
14
16
"@atproto/api": "^0.17.4",
+2
-1
apps/firehose-service/src/config.ts
···
28
28
revalidateGroup: process.env.WISP_REVALIDATE_GROUP || 'firehose-service',
29
29
30
30
// Mode
31
31
-
isBackfill: process.argv.includes('--backfill'),
31
31
+
isDbFillOnly: process.argv.includes('--db-fill-only'),
32
32
+
isBackfill: process.argv.includes('--backfill') || process.argv.includes('--db-fill-only'),
32
33
} as const;
+72
-6
apps/firehose-service/src/index.ts
···
4
4
* Modes:
5
5
* - Normal: Watch firehose for place.wisp.fs events
6
6
* - Backfill: Process existing sites from database
7
7
+
* - DB Fill Only: Collect DIDs and backfill sites table (skip S3 writes)
7
8
*/
8
9
9
10
import { Hono } from 'hono';
10
11
import { serve } from '@hono/node-server';
11
12
import { config } from './config';
12
13
import { startFirehose, stopFirehose, getFirehoseHealth } from './lib/firehose';
13
13
-
import { closeDatabase, listAllSiteCaches, listAllSites, getSiteCache } from './lib/db';
14
14
+
import { closeDatabase, listAllKnownDids, listAllSiteCaches, listAllSites, getSiteCache, upsertSite } from './lib/db';
14
15
import { storage } from './lib/storage';
15
15
-
import { handleSiteCreateOrUpdate, fetchSiteRecord } from './lib/cache-writer';
16
16
+
import { handleSiteCreateOrUpdate, fetchSiteRecord, listSiteRecordsForDid } from './lib/cache-writer';
16
17
import { startRevalidateWorker, stopRevalidateWorker } from './lib/revalidate-worker';
17
18
import { closeCacheInvalidationPublisher } from './lib/cache-invalidation';
18
19
import { initializeGrafanaExporters, createLogger, logCollector, errorTracker, metricsCollector } from '@wispplace/observability';
···
41
42
42
43
return c.json({
43
44
status: firehoseHealth.healthy ? 'healthy' : 'degraded',
44
44
-
mode: config.isBackfill ? 'backfill' : 'firehose',
45
45
+
mode: config.isDbFillOnly ? 'db-fill-only' : (config.isBackfill ? 'backfill' : 'firehose'),
45
46
firehose: firehoseHealth,
46
47
storage: storageStats,
47
48
});
···
69
70
process.on('SIGTERM', () => shutdown('SIGTERM'));
70
71
71
72
/**
72
72
-
* Backfill mode - process existing sites from database
73
73
+
* Backfill phase 1+2:
74
74
+
* - Collect all known DIDs from DB
75
75
+
* - Backfill each DID's place.wisp.fs records into the sites table
76
76
+
*/
77
77
+
async function backfillSitesTableFromKnownDids(): Promise<void> {
78
78
+
logger.info('Phase 1/3: Collecting known DIDs');
79
79
+
const dids = await listAllKnownDids();
80
80
+
logger.info(`Collected ${dids.length} known DIDs`);
81
81
+
82
82
+
if (dids.length === 0) {
83
83
+
logger.warn('No known DIDs found; skipping sites table backfill');
84
84
+
return;
85
85
+
}
86
86
+
87
87
+
logger.info('Phase 2/3: Backfilling place.wisp.fs records into sites table');
88
88
+
89
89
+
let didsProcessed = 0;
90
90
+
let didsFailed = 0;
91
91
+
let sitesSynced = 0;
92
92
+
let sitesFailed = 0;
93
93
+
94
94
+
for (const did of dids) {
95
95
+
try {
96
96
+
const records = await listSiteRecordsForDid(did);
97
97
+
for (const row of records) {
98
98
+
try {
99
99
+
const siteName = typeof row.record.site === 'string' && row.record.site.length > 0
100
100
+
? row.record.site
101
101
+
: row.rkey;
102
102
+
await upsertSite(did, row.rkey, siteName);
103
103
+
sitesSynced++;
104
104
+
} catch (err) {
105
105
+
logger.error(`[Backfill:sites] Failed to upsert site ${did}/${row.rkey}`, err);
106
106
+
sitesFailed++;
107
107
+
}
108
108
+
}
109
109
+
didsProcessed++;
110
110
+
logger.info(`[Backfill:sites] Progress ${didsProcessed + didsFailed}/${dids.length} DIDs`);
111
111
+
} catch (err) {
112
112
+
logger.error(`[Backfill:sites] Failed to list records for DID ${did}`, err);
113
113
+
didsFailed++;
114
114
+
}
115
115
+
}
116
116
+
117
117
+
logger.info(
118
118
+
`Phase 2/3 complete: ${didsProcessed} DIDs processed, ${didsFailed} DIDs failed, ${sitesSynced} sites synced, ${sitesFailed} sites failed`
119
119
+
);
120
120
+
}
121
121
+
122
122
+
/**
123
123
+
* Backfill phase 3:
124
124
+
* - process sites from database and backfill blobs into S3
73
125
*/
74
126
async function runBackfill(): Promise<void> {
75
127
logger.info('Starting backfill mode');
76
128
const startTime = Date.now();
77
129
const forceRewriteHtml = process.env.BACKFILL_FORCE_REWRITE_HTML === 'true';
130
130
+
const forceDownload = process.env.BACKFILL_FORCE_DOWNLOAD === 'true';
78
131
79
132
if (forceRewriteHtml) {
80
133
logger.info('Forcing HTML rewrite for all sites');
134
134
+
}
135
135
+
if (forceDownload) {
136
136
+
logger.info('Forcing full file download/write for all backfilled sites');
81
137
}
82
138
139
139
+
await backfillSitesTableFromKnownDids();
140
140
+
141
141
+
if (config.isDbFillOnly) {
142
142
+
logger.info('DB fill only mode complete; skipping phase 3/3 S3 backfill');
143
143
+
return;
144
144
+
}
145
145
+
146
146
+
logger.info('Phase 3/3: Backfilling site blobs into S3');
147
147
+
83
148
let sites = await listAllSites();
84
149
if (sites.length === 0) {
85
150
const cachedSites = await listAllSiteCaches();
···
106
171
107
172
const existingCache = await getSiteCache(site.did, site.rkey);
108
173
// Check if CID matches (already up to date)
109
109
-
if (!forceRewriteHtml && existingCache && result.cid === existingCache.record_cid) {
174
174
+
if (!forceRewriteHtml && !forceDownload && existingCache && result.cid === existingCache.record_cid) {
110
175
logger.info(`Site already up to date: ${site.did}/${site.rkey}`);
111
176
skipped++;
112
177
continue;
···
115
180
// Process the site
116
181
await handleSiteCreateOrUpdate(site.did, site.rkey, result.record, result.cid, {
117
182
forceRewriteHtml,
183
183
+
forceDownload,
118
184
});
119
185
processed++;
120
186
···
164
230
// Main entry point
165
231
async function main() {
166
232
logger.info('Starting firehose-service');
167
167
-
logger.info(`Mode: ${config.isBackfill ? 'backfill' : 'firehose'}`);
233
233
+
logger.info(`Mode: ${config.isDbFillOnly ? 'db-fill-only' : (config.isBackfill ? 'backfill' : 'firehose')}`);
168
234
logger.info(`S3 Bucket: ${config.s3Bucket || '(disk fallback)'}`);
169
235
170
236
// Start health server
+56
apps/firehose-service/src/lib/cache-writer.ts
···
50
50
}
51
51
52
52
/**
53
53
+
* List all place.wisp.fs records for a DID.
54
54
+
*/
55
55
+
export async function listSiteRecordsForDid(
56
56
+
did: string
57
57
+
): Promise<Array<{ rkey: string; record: WispFsRecord; cid: string }>> {
58
58
+
const pdsEndpoint = await getPdsForDid(did);
59
59
+
if (!pdsEndpoint) {
60
60
+
logger.error('Failed to get PDS endpoint for DID (listRecords)', undefined, { did });
61
61
+
return [];
62
62
+
}
63
63
+
64
64
+
const records: Array<{ rkey: string; record: WispFsRecord; cid: string }> = [];
65
65
+
let cursor: string | undefined;
66
66
+
67
67
+
while (true) {
68
68
+
const params = new URLSearchParams({
69
69
+
repo: did,
70
70
+
collection: 'place.wisp.fs',
71
71
+
limit: '100',
72
72
+
});
73
73
+
if (cursor) params.set('cursor', cursor);
74
74
+
75
75
+
const url = `${pdsEndpoint}/xrpc/com.atproto.repo.listRecords?${params.toString()}`;
76
76
+
const data = await safeFetchJson(url) as {
77
77
+
records?: Array<{ uri?: string; value?: unknown; cid?: string }>;
78
78
+
cursor?: string;
79
79
+
};
80
80
+
81
81
+
const pageRecords = Array.isArray(data.records) ? data.records : [];
82
82
+
for (const row of pageRecords) {
83
83
+
const uri = row.uri;
84
84
+
const record = row.value as WispFsRecord | undefined;
85
85
+
if (!uri || !record || record.$type !== 'place.wisp.fs') continue;
86
86
+
87
87
+
const uriParts = uri.split('/');
88
88
+
const rkey = uriParts[uriParts.length - 1];
89
89
+
if (!rkey) continue;
90
90
+
91
91
+
records.push({
92
92
+
rkey,
93
93
+
record,
94
94
+
cid: row.cid || '',
95
95
+
});
96
96
+
}
97
97
+
98
98
+
const nextCursor = typeof data.cursor === 'string' && data.cursor.length > 0
99
99
+
? data.cursor
100
100
+
: undefined;
101
101
+
if (!nextCursor) break;
102
102
+
cursor = nextCursor;
103
103
+
}
104
104
+
105
105
+
return records;
106
106
+
}
107
107
+
108
108
+
/**
53
109
* Fetch a settings record from the PDS
54
110
*/
55
111
export async function fetchSettingsRecord(
+73
apps/firehose-service/src/lib/db.ts
···
49
49
`;
50
50
}
51
51
52
52
+
/**
53
53
+
* List all known DIDs from all DID-bearing tables.
54
54
+
* Missing tables are skipped to keep bootstrapping resilient.
55
55
+
*/
56
56
+
export async function listAllKnownDids(): Promise<string[]> {
57
57
+
const sources: Array<{ name: string; fetch: () => Promise<Array<{ did: string }>> }> = [
58
58
+
{
59
59
+
name: 'sites',
60
60
+
fetch: () => sql<Array<{ did: string }>>`
61
61
+
SELECT DISTINCT did
62
62
+
FROM sites
63
63
+
WHERE did IS NOT NULL AND did <> ''
64
64
+
`,
65
65
+
},
66
66
+
{
67
67
+
name: 'site_cache',
68
68
+
fetch: () => sql<Array<{ did: string }>>`
69
69
+
SELECT DISTINCT did
70
70
+
FROM site_cache
71
71
+
WHERE did IS NOT NULL AND did <> ''
72
72
+
`,
73
73
+
},
74
74
+
{
75
75
+
name: 'site_settings_cache',
76
76
+
fetch: () => sql<Array<{ did: string }>>`
77
77
+
SELECT DISTINCT did
78
78
+
FROM site_settings_cache
79
79
+
WHERE did IS NOT NULL AND did <> ''
80
80
+
`,
81
81
+
},
82
82
+
{
83
83
+
name: 'domains',
84
84
+
fetch: () => sql<Array<{ did: string }>>`
85
85
+
SELECT DISTINCT did
86
86
+
FROM domains
87
87
+
WHERE did IS NOT NULL AND did <> ''
88
88
+
`,
89
89
+
},
90
90
+
{
91
91
+
name: 'custom_domains',
92
92
+
fetch: () => sql<Array<{ did: string }>>`
93
93
+
SELECT DISTINCT did
94
94
+
FROM custom_domains
95
95
+
WHERE did IS NOT NULL AND did <> ''
96
96
+
`,
97
97
+
},
98
98
+
{
99
99
+
name: 'supporter',
100
100
+
fetch: () => sql<Array<{ did: string }>>`
101
101
+
SELECT DISTINCT did
102
102
+
FROM supporter
103
103
+
WHERE did IS NOT NULL AND did <> ''
104
104
+
`,
105
105
+
},
106
106
+
];
107
107
+
const dids = new Set<string>();
108
108
+
109
109
+
for (const source of sources) {
110
110
+
try {
111
111
+
const rows = await source.fetch();
112
112
+
for (const row of rows) {
113
113
+
if (typeof row.did === 'string' && row.did.length > 0) {
114
114
+
dids.add(row.did);
115
115
+
}
116
116
+
}
117
117
+
} catch {
118
118
+
logger.warn(`[DB] Skipping DID source table ${source.name}`);
119
119
+
}
120
120
+
}
121
121
+
122
122
+
return [...dids].sort();
123
123
+
}
124
124
+
52
125
// Write functions
53
126
54
127
export async function upsertSiteCache(
+2
-2
packages/@wispplace/constants/src/index.ts
···
82
82
// CLI Binary URLs
83
83
export const CLI_BINARY_BASE_URL = "https://sites.wisp.place/nekomimi.pet/wisp-cli-binaries";
84
84
export const CLI_BINARIES = {
85
85
-
"darwin-universal": `${CLI_BINARY_BASE_URL}/wisp-cli-darwin-universal`,
85
85
+
"darwin-universal": `${CLI_BINARY_BASE_URL}/wisp-cli-universal-darwin`,
86
86
"darwin-arm64": `${CLI_BINARY_BASE_URL}/wisp-cli-aarch64-darwin`,
87
87
-
"darwin-x86_64": `${CLI_BINARY_BASE_URL}/wisp-cli-darwin-x86_64`,
87
87
+
"darwin-x86_64": `${CLI_BINARY_BASE_URL}/wisp-cli-x86_64-darwin`,
88
88
"linux-arm64": `${CLI_BINARY_BASE_URL}/wisp-cli-aarch64-linux`,
89
89
"linux-x86_64": `${CLI_BINARY_BASE_URL}/wisp-cli-x86_64-linux`,
90
90
} as const;