a tool for shared writing and social publishing

actually de-duplicate pubs and docs

+449 -336
+5 -2
actions/getIdentityData.ts
··· 3 3 import { cookies } from "next/headers"; 4 4 import { supabaseServerClient } from "supabase/serverClient"; 5 5 import { cache } from "react"; 6 + import { deduplicateByUri } from "src/utils/deduplicateRecords"; 6 7 export const getIdentityData = cache(uncachedGetIdentityData); 7 8 export async function uncachedGetIdentityData() { 8 9 let cookieStore = await cookies(); ··· 44 45 if (!auth_res?.data?.identities) return null; 45 46 if (auth_res.data.identities.atp_did) { 46 47 //I should create a relationship table so I can do this in the above query 47 - let { data: publications } = await supabaseServerClient 48 + let { data: rawPublications } = await supabaseServerClient 48 49 .from("publications") 49 50 .select("*") 50 51 .eq("identity_did", auth_res.data.identities.atp_did); 52 + // Deduplicate records that may exist under both pub.leaflet and site.standard namespaces 53 + const publications = deduplicateByUri(rawPublications || []); 51 54 return { 52 55 ...auth_res.data.identities, 53 - publications: publications || [], 56 + publications, 54 57 }; 55 58 } 56 59
+5 -1
app/(home-pages)/discover/getPublications.ts
··· 5 5 normalizePublicationRow, 6 6 hasValidPublication, 7 7 } from "src/utils/normalizeRecords"; 8 + import { deduplicateByUri } from "src/utils/deduplicateRecords"; 8 9 9 10 export type Cursor = { 10 11 indexed_at?: string; ··· 42 43 return { publications: [], nextCursor: null }; 43 44 } 44 45 46 + // Deduplicate records that may exist under both pub.leaflet and site.standard namespaces 47 + const dedupedPublications = deduplicateByUri(publications || []); 48 + 45 49 // Filter out publications without documents 46 - const allPubs = (publications || []).filter( 50 + const allPubs = dedupedPublications.filter( 47 51 (pub) => pub.documents_in_publications.length > 0, 48 52 ); 49 53
+6 -1
app/(home-pages)/p/[didOrHandle]/getProfilePosts.ts
··· 7 7 normalizeDocumentRecord, 8 8 normalizePublicationRecord, 9 9 } from "src/utils/normalizeRecords"; 10 + import { deduplicateByUriOrdered } from "src/utils/deduplicateRecords"; 10 11 11 12 export type Cursor = { 12 13 indexed_at: string; ··· 38 39 ); 39 40 } 40 41 41 - let [{ data: docs }, { data: pubs }, { data: profile }] = await Promise.all([ 42 + let [{ data: rawDocs }, { data: rawPubs }, { data: profile }] = await Promise.all([ 42 43 query, 43 44 supabaseServerClient 44 45 .from("publications") ··· 50 51 .eq("did", did) 51 52 .single(), 52 53 ]); 54 + 55 + // Deduplicate records that may exist under both pub.leaflet and site.standard namespaces 56 + const docs = deduplicateByUriOrdered(rawDocs || []); 57 + const pubs = deduplicateByUriOrdered(rawPubs || []); 53 58 54 59 // Build a map of publications for quick lookup 55 60 let pubMap = new Map<string, NonNullable<typeof pubs>[number]>();
+6 -2
app/(home-pages)/reader/getReaderFeed.ts
··· 14 14 type NormalizedDocument, 15 15 type NormalizedPublication, 16 16 } from "src/utils/normalizeRecords"; 17 + import { deduplicateByUriOrdered } from "src/utils/deduplicateRecords"; 17 18 18 19 export type Cursor = { 19 20 timestamp: string; ··· 45 46 `indexed_at.lt.${cursor.timestamp},and(indexed_at.eq.${cursor.timestamp},uri.lt.${cursor.uri})`, 46 47 ); 47 48 } 48 - let { data: feed, error } = await query; 49 + let { data: rawFeed, error } = await query; 50 + 51 + // Deduplicate records that may exist under both pub.leaflet and site.standard namespaces 52 + const feed = deduplicateByUriOrdered(rawFeed || []); 49 53 50 54 let posts = ( 51 55 await Promise.all( 52 - feed?.map(async (post) => { 56 + feed.map(async (post) => { 53 57 let pub = post.documents_in_publications[0].publications!; 54 58 let uri = new AtUri(post.uri); 55 59 let handle = await idResolver.did.resolve(uri.host);
+5 -1
app/(home-pages)/tag/[tag]/getDocumentsByTag.ts
··· 9 9 normalizeDocumentRecord, 10 10 normalizePublicationRecord, 11 11 } from "src/utils/normalizeRecords"; 12 + import { deduplicateByUriOrdered } from "src/utils/deduplicateRecords"; 12 13 13 14 export async function getDocumentsByTag( 14 15 tag: string, 15 16 ): Promise<{ posts: Post[] }> { 16 17 // Query documents that have this tag 17 - const { data: documents, error } = await supabaseServerClient 18 + const { data: rawDocuments, error } = await supabaseServerClient 18 19 .from("documents") 19 20 .select( 20 21 `*, ··· 30 31 console.error("Error fetching documents by tag:", error); 31 32 return { posts: [] }; 32 33 } 34 + 35 + // Deduplicate records that may exist under both pub.leaflet and site.standard namespaces 36 + const documents = deduplicateByUriOrdered(rawDocuments || []); 33 37 34 38 const posts = await Promise.all( 35 39 documents.map(async (doc) => {
+377 -304
app/api/inngest/functions/migrate_user_to_standard.ts
··· 1 1 import { supabaseServerClient } from "supabase/serverClient"; 2 2 import { inngest } from "../client"; 3 3 import { restoreOAuthSession } from "src/atproto-oauth"; 4 - import { AtpBaseClient, SiteStandardPublication, SiteStandardDocument, SiteStandardGraphSubscription } from "lexicons/api"; 4 + import { 5 + AtpBaseClient, 6 + SiteStandardPublication, 7 + SiteStandardDocument, 8 + SiteStandardGraphSubscription, 9 + } from "lexicons/api"; 5 10 import { AtUri } from "@atproto/syntax"; 6 11 import { Json } from "supabase/database.types"; 7 - import { normalizePublicationRecord, normalizeDocumentRecord } from "src/utils/normalizeRecords"; 12 + import { 13 + normalizePublicationRecord, 14 + normalizeDocumentRecord, 15 + } from "src/utils/normalizeRecords"; 8 16 9 17 type MigrationResult = 10 18 | { success: true; oldUri: string; newUri: string; skipped?: boolean } ··· 17 25 } 18 26 const credentialSession = result.value; 19 27 return new AtpBaseClient( 20 - credentialSession.fetchHandler.bind(credentialSession) 28 + credentialSession.fetchHandler.bind(credentialSession), 21 29 ); 22 30 } 23 31 ··· 39 47 await step.run("verify-oauth-session", async () => { 40 48 const result = await restoreOAuthSession(did); 41 49 if (!result.ok) { 42 - throw new Error(`Failed to restore OAuth session: ${result.error.message}`); 50 + throw new Error( 51 + `Failed to restore OAuth session: ${result.error.message}`, 52 + ); 43 53 } 44 54 return { success: true }; 45 55 }); 46 56 47 57 // Step 2: Get user's pub.leaflet.publication records 48 - const oldPublications = await step.run("fetch-old-publications", async () => { 49 - const { data, error } = await supabaseServerClient 50 - .from("publications") 51 - .select("*") 52 - .eq("identity_did", did) 53 - .like("uri", `at://${did}/pub.leaflet.publication/%`); 58 + const oldPublications = await step.run( 59 + "fetch-old-publications", 60 + async () => { 61 + const { data, error } = await supabaseServerClient 62 + .from("publications") 63 + .select("*") 64 + .eq("identity_did", did) 65 + .like("uri", `at://${did}/pub.leaflet.publication/%`); 54 66 55 - if (error) throw new Error(`Failed to fetch publications: ${error.message}`); 56 - return data || []; 57 - }); 67 + if (error) 68 + throw new Error(`Failed to fetch publications: ${error.message}`); 69 + return data || []; 70 + }, 71 + ); 58 72 59 - // Step 3: Migrate each publication 73 + // Step 3: Migrate all publications in parallel 60 74 const publicationUriMap: Record<string, string> = {}; // old URI -> new URI 61 75 62 - for (const pub of oldPublications) { 63 - const aturi = new AtUri(pub.uri); 76 + // Prepare publications that need migration 77 + const publicationsToMigrate = oldPublications 78 + .map((pub) => { 79 + const aturi = new AtUri(pub.uri); 64 80 65 - // Skip if already a site.standard.publication 66 - if (aturi.collection === "site.standard.publication") { 67 - publicationUriMap[pub.uri] = pub.uri; 68 - continue; 69 - } 81 + // Skip if already a site.standard.publication 82 + if (aturi.collection === "site.standard.publication") { 83 + publicationUriMap[pub.uri] = pub.uri; 84 + return null; 85 + } 70 86 71 - const rkey = aturi.rkey; 72 - const normalized = normalizePublicationRecord(pub.record); 87 + const rkey = aturi.rkey; 88 + const normalized = normalizePublicationRecord(pub.record); 73 89 74 - if (!normalized) { 75 - stats.errors.push(`Publication ${pub.uri}: Failed to normalize publication record`); 76 - continue; 77 - } 90 + if (!normalized) { 91 + stats.errors.push( 92 + `Publication ${pub.uri}: Failed to normalize publication record`, 93 + ); 94 + return null; 95 + } 78 96 79 - // Build site.standard.publication record 80 - const newRecord: SiteStandardPublication.Record = { 81 - $type: "site.standard.publication", 82 - name: normalized.name, 83 - url: normalized.url, 84 - description: normalized.description, 85 - icon: normalized.icon, 86 - theme: normalized.theme, 87 - basicTheme: normalized.basicTheme, 88 - preferences: normalized.preferences, 89 - }; 97 + const newRecord: SiteStandardPublication.Record = { 98 + $type: "site.standard.publication", 99 + name: normalized.name, 100 + url: normalized.url, 101 + description: normalized.description, 102 + icon: normalized.icon, 103 + theme: normalized.theme, 104 + basicTheme: normalized.basicTheme, 105 + preferences: normalized.preferences, 106 + }; 90 107 91 - // Step: Write to PDS 92 - const pdsResult = await step.run(`pds-write-publication-${pub.uri}`, async () => { 93 - const agent = await createAuthenticatedAgent(did); 94 - const putResult = await agent.com.atproto.repo.putRecord({ 95 - repo: did, 96 - collection: "site.standard.publication", 97 - rkey, 98 - record: newRecord, 99 - validate: false, 100 - }); 101 - return { newUri: putResult.data.uri }; 102 - }); 108 + return { pub, rkey, normalized, newRecord }; 109 + }) 110 + .filter((x) => x !== null); 103 111 104 - const newUri = pdsResult.newUri; 112 + // Run all PDS writes in parallel 113 + const pubPdsResults = await Promise.all( 114 + publicationsToMigrate.map(({ pub, rkey, newRecord }) => 115 + step.run(`pds-write-publication-${pub.uri}`, async () => { 116 + const agent = await createAuthenticatedAgent(did); 117 + const putResult = await agent.com.atproto.repo.putRecord({ 118 + repo: did, 119 + collection: "site.standard.publication", 120 + rkey, 121 + record: newRecord, 122 + validate: false, 123 + }); 124 + return { oldUri: pub.uri, newUri: putResult.data.uri }; 125 + }), 126 + ), 127 + ); 105 128 106 - // Step: Write to database 107 - const dbResult = await step.run(`db-write-publication-${pub.uri}`, async () => { 108 - const { error: dbError } = await supabaseServerClient 109 - .from("publications") 110 - .upsert({ 111 - uri: newUri, 112 - identity_did: did, 113 - name: normalized.name, 114 - record: newRecord as Json, 115 - }); 129 + // Run all DB writes in parallel 130 + const pubDbResults = await Promise.all( 131 + publicationsToMigrate.map(({ pub, normalized, newRecord }, index) => { 132 + const newUri = pubPdsResults[index].newUri; 133 + return step.run(`db-write-publication-${pub.uri}`, async () => { 134 + const { error: dbError } = await supabaseServerClient 135 + .from("publications") 136 + .upsert({ 137 + uri: newUri, 138 + identity_did: did, 139 + name: normalized.name, 140 + record: newRecord as Json, 141 + }); 116 142 117 - if (dbError) { 118 - return { success: false as const, error: dbError.message }; 119 - } 120 - return { success: true as const }; 121 - }); 143 + if (dbError) { 144 + return { 145 + success: false as const, 146 + oldUri: pub.uri, 147 + newUri, 148 + error: dbError.message, 149 + }; 150 + } 151 + return { success: true as const, oldUri: pub.uri, newUri }; 152 + }); 153 + }), 154 + ); 122 155 123 - if (dbResult.success) { 124 - publicationUriMap[pub.uri] = newUri; 156 + // Process results 157 + for (const result of pubDbResults) { 158 + if (result.success) { 159 + publicationUriMap[result.oldUri] = result.newUri; 125 160 stats.publicationsMigrated++; 126 161 } else { 127 - stats.errors.push(`Publication ${pub.uri}: Database error: ${dbResult.error}`); 162 + stats.errors.push( 163 + `Publication ${result.oldUri}: Database error: ${result.error}`, 164 + ); 128 165 } 129 166 } 130 167 131 - // Step 4: Get ALL user's pub.leaflet.document records (both in publications and standalone) 132 - const oldDocuments = await step.run("fetch-old-documents", async () => { 133 - const { data, error } = await supabaseServerClient 134 - .from("documents") 135 - .select("uri, data") 136 - .like("uri", `at://${did}/pub.leaflet.document/%`); 168 + // Step 4: Get ALL user's documents and their publication associations in parallel 169 + const [oldDocuments, allDocumentPublications] = await Promise.all([ 170 + step.run("fetch-old-documents", async () => { 171 + const { data, error } = await supabaseServerClient 172 + .from("documents") 173 + .select("uri, data") 174 + .like("uri", `at://${did}/pub.leaflet.document/%`); 137 175 138 - if (error) throw new Error(`Failed to fetch documents: ${error.message}`); 139 - return data || []; 140 - }); 176 + if (error) 177 + throw new Error(`Failed to fetch documents: ${error.message}`); 178 + return data || []; 179 + }), 180 + step.run("fetch-document-publications", async () => { 181 + const { data, error } = await supabaseServerClient 182 + .from("documents_in_publications") 183 + .select("document, publication") 184 + .like("document", `at://${did}/pub.leaflet.document/%`); 141 185 142 - // Also fetch publication associations for documents 143 - const documentPublicationMap = await step.run("fetch-document-publications", async () => { 144 - const docUris = oldDocuments.map(d => d.uri); 145 - if (docUris.length === 0) return {}; 186 + if (error) 187 + throw new Error( 188 + `Failed to fetch document publications: ${error.message}`, 189 + ); 190 + return data || []; 191 + }), 192 + ]); 146 193 147 - const { data, error } = await supabaseServerClient 148 - .from("documents_in_publications") 149 - .select("document, publication") 150 - .in("document", docUris); 151 - 152 - if (error) throw new Error(`Failed to fetch document publications: ${error.message}`); 153 - 154 - // Create a map of document URI -> publication URI 155 - const map: Record<string, string> = {}; 156 - for (const row of data || []) { 157 - map[row.document] = row.publication; 158 - } 159 - return map; 160 - }); 194 + // Create a map of document URI -> publication URI 195 + const documentPublicationMap: Record<string, string> = {}; 196 + for (const row of allDocumentPublications) { 197 + documentPublicationMap[row.document] = row.publication; 198 + } 161 199 162 200 const documentUriMap: Record<string, string> = {}; // old URI -> new URI 163 201 164 - for (const doc of oldDocuments) { 165 - const aturi = new AtUri(doc.uri); 166 - 167 - // Skip if already a site.standard.document 168 - if (aturi.collection === "site.standard.document") { 169 - documentUriMap[doc.uri] = doc.uri; 170 - continue; 171 - } 202 + // Prepare documents that need migration 203 + const documentsToMigrate = oldDocuments 204 + .map((doc) => { 205 + const aturi = new AtUri(doc.uri); 172 206 173 - const rkey = aturi.rkey; 174 - const normalized = normalizeDocumentRecord(doc.data, doc.uri); 207 + // Skip if already a site.standard.document 208 + if (aturi.collection === "site.standard.document") { 209 + documentUriMap[doc.uri] = doc.uri; 210 + return null; 211 + } 175 212 176 - if (!normalized) { 177 - stats.errors.push(`Document ${doc.uri}: Failed to normalize document record`); 178 - continue; 179 - } 213 + const rkey = aturi.rkey; 214 + const normalized = normalizeDocumentRecord(doc.data, doc.uri); 180 215 181 - // Determine the site field: 182 - // - If document is in a publication, use the new publication URI (if migrated) or old URI 183 - // - If standalone, use the HTTPS URL format 184 - const oldPubUri = documentPublicationMap[doc.uri]; 185 - let siteValue: string; 216 + if (!normalized) { 217 + stats.errors.push( 218 + `Document ${doc.uri}: Failed to normalize document record`, 219 + ); 220 + return null; 221 + } 186 222 187 - if (oldPubUri) { 188 - // Document is in a publication - use new URI if migrated, otherwise keep old 189 - siteValue = publicationUriMap[oldPubUri] || oldPubUri; 190 - } else { 191 - // Standalone document - use HTTPS URL format 192 - siteValue = `https://leaflet.pub/p/${did}`; 193 - } 223 + // Determine the site field: 224 + // - If document is in a publication, use the new publication URI (if migrated) or old URI 225 + // - If standalone, use the HTTPS URL format 226 + const oldPubUri = documentPublicationMap[doc.uri]; 227 + let siteValue: string; 194 228 195 - // Build site.standard.document record 196 - const newRecord: SiteStandardDocument.Record = { 197 - $type: "site.standard.document", 198 - title: normalized.title || "Untitled", 199 - site: siteValue, 200 - path: rkey, 201 - publishedAt: normalized.publishedAt || new Date().toISOString(), 202 - description: normalized.description, 203 - content: normalized.content, 204 - tags: normalized.tags, 205 - coverImage: normalized.coverImage, 206 - bskyPostRef: normalized.bskyPostRef, 207 - }; 229 + if (oldPubUri) { 230 + // Document is in a publication - use new URI if migrated, otherwise keep old 231 + siteValue = publicationUriMap[oldPubUri] || oldPubUri; 232 + } else { 233 + // Standalone document - use HTTPS URL format 234 + siteValue = `https://leaflet.pub/p/${did}`; 235 + } 208 236 209 - // Step: Write to PDS 210 - const pdsResult = await step.run(`pds-write-document-${doc.uri}`, async () => { 211 - const agent = await createAuthenticatedAgent(did); 212 - const putResult = await agent.com.atproto.repo.putRecord({ 213 - repo: did, 214 - collection: "site.standard.document", 215 - rkey, 216 - record: newRecord, 217 - validate: false, 218 - }); 219 - return { newUri: putResult.data.uri }; 220 - }); 237 + // Build site.standard.document record 238 + const newRecord: SiteStandardDocument.Record = { 239 + $type: "site.standard.document", 240 + title: normalized.title || "Untitled", 241 + site: siteValue, 242 + path: rkey, 243 + publishedAt: normalized.publishedAt || new Date().toISOString(), 244 + description: normalized.description, 245 + content: normalized.content, 246 + tags: normalized.tags, 247 + coverImage: normalized.coverImage, 248 + bskyPostRef: normalized.bskyPostRef, 249 + }; 221 250 222 - const newUri = pdsResult.newUri; 251 + return { doc, rkey, normalized, newRecord, oldPubUri }; 252 + }) 253 + .filter((x) => x !== null); 223 254 224 - // Step: Write to database 225 - const dbResult = await step.run(`db-write-document-${doc.uri}`, async () => { 226 - const { error: dbError } = await supabaseServerClient 227 - .from("documents") 228 - .upsert({ 229 - uri: newUri, 230 - data: newRecord as Json, 255 + // Run all PDS writes in parallel 256 + const docPdsResults = await Promise.all( 257 + documentsToMigrate.map(({ doc, rkey, newRecord }) => 258 + step.run(`pds-write-document-${doc.uri}`, async () => { 259 + const agent = await createAuthenticatedAgent(did); 260 + const putResult = await agent.com.atproto.repo.putRecord({ 261 + repo: did, 262 + collection: "site.standard.document", 263 + rkey, 264 + record: newRecord, 265 + validate: false, 231 266 }); 232 - 233 - if (dbError) { 234 - return { success: false as const, error: dbError.message }; 235 - } 267 + return { oldUri: doc.uri, newUri: putResult.data.uri }; 268 + }), 269 + ), 270 + ); 236 271 237 - // If document was in a publication, add to documents_in_publications with new URIs 238 - if (oldPubUri) { 239 - const newPubUri = publicationUriMap[oldPubUri] || oldPubUri; 240 - await supabaseServerClient 241 - .from("documents_in_publications") 272 + // Run all DB writes in parallel 273 + const docDbResults = await Promise.all( 274 + documentsToMigrate.map(({ doc, newRecord, oldPubUri }, index) => { 275 + const newUri = docPdsResults[index].newUri; 276 + return step.run(`db-write-document-${doc.uri}`, async () => { 277 + const { error: dbError } = await supabaseServerClient 278 + .from("documents") 242 279 .upsert({ 243 - publication: newPubUri, 244 - document: newUri, 280 + uri: newUri, 281 + data: newRecord as Json, 245 282 }); 246 - } 283 + 284 + if (dbError) { 285 + return { 286 + success: false as const, 287 + oldUri: doc.uri, 288 + newUri, 289 + error: dbError.message, 290 + }; 291 + } 292 + 293 + // If document was in a publication, add to documents_in_publications with new URIs 294 + if (oldPubUri) { 295 + const newPubUri = publicationUriMap[oldPubUri] || oldPubUri; 296 + await supabaseServerClient 297 + .from("documents_in_publications") 298 + .upsert({ 299 + publication: newPubUri, 300 + document: newUri, 301 + }); 302 + } 247 303 248 - return { success: true as const }; 249 - }); 304 + return { success: true as const, oldUri: doc.uri, newUri }; 305 + }); 306 + }), 307 + ); 250 308 251 - if (dbResult.success) { 252 - documentUriMap[doc.uri] = newUri; 309 + // Process results 310 + for (const result of docDbResults) { 311 + if (result.success) { 312 + documentUriMap[result.oldUri] = result.newUri; 253 313 stats.documentsMigrated++; 254 314 } else { 255 - stats.errors.push(`Document ${doc.uri}: Database error: ${dbResult.error}`); 315 + stats.errors.push( 316 + `Document ${result.oldUri}: Database error: ${result.error}`, 317 + ); 256 318 } 257 319 } 258 320 259 - // Step 5: Update references in database tables 321 + // Step 5: Update references in database tables (all in parallel) 260 322 await step.run("update-references", async () => { 261 - // Update leaflets_in_publications - update publication and doc references 262 - for (const [oldUri, newUri] of Object.entries(publicationUriMap)) { 263 - const { error } = await supabaseServerClient 264 - .from("leaflets_in_publications") 265 - .update({ publication: newUri }) 266 - .eq("publication", oldUri); 267 - 268 - if (!error) stats.referencesUpdated++; 269 - } 270 - 271 - for (const [oldUri, newUri] of Object.entries(documentUriMap)) { 272 - const { error } = await supabaseServerClient 273 - .from("leaflets_in_publications") 274 - .update({ doc: newUri }) 275 - .eq("doc", oldUri); 276 - 277 - if (!error) stats.referencesUpdated++; 278 - } 323 + const pubEntries = Object.entries(publicationUriMap); 324 + const docEntries = Object.entries(documentUriMap); 279 325 280 - // Update leaflets_to_documents - update document references 281 - for (const [oldUri, newUri] of Object.entries(documentUriMap)) { 282 - const { error } = await supabaseServerClient 283 - .from("leaflets_to_documents") 284 - .update({ document: newUri }) 285 - .eq("document", oldUri); 286 - 287 - if (!error) stats.referencesUpdated++; 288 - } 289 - 290 - // Update publication_domains - update publication references 291 - for (const [oldUri, newUri] of Object.entries(publicationUriMap)) { 292 - const { error } = await supabaseServerClient 293 - .from("publication_domains") 294 - .update({ publication: newUri }) 295 - .eq("publication", oldUri); 296 - 297 - if (!error) stats.referencesUpdated++; 298 - } 299 - 300 - // Update comments_on_documents - update document references 301 - for (const [oldUri, newUri] of Object.entries(documentUriMap)) { 302 - const { error } = await supabaseServerClient 303 - .from("comments_on_documents") 304 - .update({ document: newUri }) 305 - .eq("document", oldUri); 306 - 307 - if (!error) stats.referencesUpdated++; 308 - } 309 - 310 - // Update document_mentions_in_bsky - update document references 311 - for (const [oldUri, newUri] of Object.entries(documentUriMap)) { 312 - const { error } = await supabaseServerClient 313 - .from("document_mentions_in_bsky") 314 - .update({ document: newUri }) 315 - .eq("document", oldUri); 316 - 317 - if (!error) stats.referencesUpdated++; 318 - } 319 - 320 - // Update subscribers_to_publications - update publication references 321 - for (const [oldUri, newUri] of Object.entries(publicationUriMap)) { 322 - const { error } = await supabaseServerClient 323 - .from("subscribers_to_publications") 324 - .update({ publication: newUri }) 325 - .eq("publication", oldUri); 326 - 327 - if (!error) stats.referencesUpdated++; 328 - } 329 - 330 - // Update publication_subscriptions - update publication references for incoming subscriptions 331 - for (const [oldUri, newUri] of Object.entries(publicationUriMap)) { 332 - const { error } = await supabaseServerClient 333 - .from("publication_subscriptions") 334 - .update({ publication: newUri }) 335 - .eq("publication", oldUri); 336 - 337 - if (!error) stats.referencesUpdated++; 338 - } 326 + const updatePromises = [ 327 + // Update leaflets_in_publications - publication references 328 + ...pubEntries.map(([oldUri, newUri]) => 329 + supabaseServerClient 330 + .from("leaflets_in_publications") 331 + .update({ publication: newUri }) 332 + .eq("publication", oldUri), 333 + ), 334 + // Update leaflets_in_publications - doc references 335 + ...docEntries.map(([oldUri, newUri]) => 336 + supabaseServerClient 337 + .from("leaflets_in_publications") 338 + .update({ doc: newUri }) 339 + .eq("doc", oldUri), 340 + ), 341 + // Update leaflets_to_documents - document references 342 + ...docEntries.map(([oldUri, newUri]) => 343 + supabaseServerClient 344 + .from("leaflets_to_documents") 345 + .update({ document: newUri }) 346 + .eq("document", oldUri), 347 + ), 348 + // Update publication_domains - publication references 349 + ...pubEntries.map(([oldUri, newUri]) => 350 + supabaseServerClient 351 + .from("publication_domains") 352 + .update({ publication: newUri }) 353 + .eq("publication", oldUri), 354 + ), 355 + // Update comments_on_documents - document references 356 + ...docEntries.map(([oldUri, newUri]) => 357 + supabaseServerClient 358 + .from("comments_on_documents") 359 + .update({ document: newUri }) 360 + .eq("document", oldUri), 361 + ), 362 + // Update document_mentions_in_bsky - document references 363 + ...docEntries.map(([oldUri, newUri]) => 364 + supabaseServerClient 365 + .from("document_mentions_in_bsky") 366 + .update({ document: newUri }) 367 + .eq("document", oldUri), 368 + ), 369 + // Update subscribers_to_publications - publication references 370 + ...pubEntries.map(([oldUri, newUri]) => 371 + supabaseServerClient 372 + .from("subscribers_to_publications") 373 + .update({ publication: newUri }) 374 + .eq("publication", oldUri), 375 + ), 376 + // Update publication_subscriptions - publication references 377 + ...pubEntries.map(([oldUri, newUri]) => 378 + supabaseServerClient 379 + .from("publication_subscriptions") 380 + .update({ publication: newUri }) 381 + .eq("publication", oldUri), 382 + ), 383 + ]; 339 384 385 + const results = await Promise.all(updatePromises); 386 + stats.referencesUpdated = results.filter((r) => !r.error).length; 340 387 return stats.referencesUpdated; 341 388 }); 342 389 343 390 // Step 6: Migrate user's own subscriptions - subscriptions BY this user to other publications 344 - const userSubscriptions = await step.run("fetch-user-subscriptions", async () => { 345 - const { data, error } = await supabaseServerClient 346 - .from("publication_subscriptions") 347 - .select("*") 348 - .eq("identity", did) 349 - .like("uri", `at://${did}/pub.leaflet.graph.subscription/%`); 391 + const userSubscriptions = await step.run( 392 + "fetch-user-subscriptions", 393 + async () => { 394 + const { data, error } = await supabaseServerClient 395 + .from("publication_subscriptions") 396 + .select("*") 397 + .eq("identity", did) 398 + .like("uri", `at://${did}/pub.leaflet.graph.subscription/%`); 350 399 351 - if (error) throw new Error(`Failed to fetch user subscriptions: ${error.message}`); 352 - return data || []; 353 - }); 400 + if (error) 401 + throw new Error( 402 + `Failed to fetch user subscriptions: ${error.message}`, 403 + ); 404 + return data || []; 405 + }, 406 + ); 354 407 355 408 const userSubscriptionUriMap: Record<string, string> = {}; // old URI -> new URI 356 409 357 - for (const sub of userSubscriptions) { 358 - const aturi = new AtUri(sub.uri); 359 - 360 - // Skip if already a site.standard.graph.subscription 361 - if (aturi.collection === "site.standard.graph.subscription") { 362 - userSubscriptionUriMap[sub.uri] = sub.uri; 363 - continue; 364 - } 410 + // Prepare subscriptions that need migration 411 + const subscriptionsToMigrate = userSubscriptions 412 + .map((sub) => { 413 + const aturi = new AtUri(sub.uri); 365 414 366 - const rkey = aturi.rkey; 415 + // Skip if already a site.standard.graph.subscription 416 + if (aturi.collection === "site.standard.graph.subscription") { 417 + userSubscriptionUriMap[sub.uri] = sub.uri; 418 + return null; 419 + } 367 420 368 - // Build site.standard.graph.subscription record 369 - const newRecord: SiteStandardGraphSubscription.Record = { 370 - $type: "site.standard.graph.subscription", 371 - publication: sub.publication, 372 - }; 421 + const rkey = aturi.rkey; 422 + const newRecord: SiteStandardGraphSubscription.Record = { 423 + $type: "site.standard.graph.subscription", 424 + publication: sub.publication, 425 + }; 373 426 374 - // Step: Write to PDS 375 - const pdsResult = await step.run(`pds-write-subscription-${sub.uri}`, async () => { 376 - const agent = await createAuthenticatedAgent(did); 377 - const putResult = await agent.com.atproto.repo.putRecord({ 378 - repo: did, 379 - collection: "site.standard.graph.subscription", 380 - rkey, 381 - record: newRecord, 382 - validate: false, 383 - }); 384 - return { newUri: putResult.data.uri }; 385 - }); 427 + return { sub, rkey, newRecord }; 428 + }) 429 + .filter((x) => x !== null); 386 430 387 - const newUri = pdsResult.newUri; 431 + // Run all PDS writes in parallel 432 + const subPdsResults = await Promise.all( 433 + subscriptionsToMigrate.map(({ sub, rkey, newRecord }) => 434 + step.run(`pds-write-subscription-${sub.uri}`, async () => { 435 + const agent = await createAuthenticatedAgent(did); 436 + const putResult = await agent.com.atproto.repo.putRecord({ 437 + repo: did, 438 + collection: "site.standard.graph.subscription", 439 + rkey, 440 + record: newRecord, 441 + validate: false, 442 + }); 443 + return { oldUri: sub.uri, newUri: putResult.data.uri }; 444 + }), 445 + ), 446 + ); 388 447 389 - // Step: Write to database 390 - const dbResult = await step.run(`db-write-subscription-${sub.uri}`, async () => { 391 - const { error: dbError } = await supabaseServerClient 392 - .from("publication_subscriptions") 393 - .update({ 394 - uri: newUri, 395 - record: newRecord as Json, 396 - }) 397 - .eq("uri", sub.uri); 448 + // Run all DB writes in parallel 449 + const subDbResults = await Promise.all( 450 + subscriptionsToMigrate.map(({ sub, newRecord }, index) => { 451 + const newUri = subPdsResults[index].newUri; 452 + return step.run(`db-write-subscription-${sub.uri}`, async () => { 453 + const { error: dbError } = await supabaseServerClient 454 + .from("publication_subscriptions") 455 + .update({ 456 + uri: newUri, 457 + record: newRecord as Json, 458 + }) 459 + .eq("uri", sub.uri); 398 460 399 - if (dbError) { 400 - return { success: false as const, error: dbError.message }; 401 - } 402 - return { success: true as const }; 403 - }); 461 + if (dbError) { 462 + return { 463 + success: false as const, 464 + oldUri: sub.uri, 465 + newUri, 466 + error: dbError.message, 467 + }; 468 + } 469 + return { success: true as const, oldUri: sub.uri, newUri }; 470 + }); 471 + }), 472 + ); 404 473 405 - if (dbResult.success) { 406 - userSubscriptionUriMap[sub.uri] = newUri; 474 + // Process results 475 + for (const result of subDbResults) { 476 + if (result.success) { 477 + userSubscriptionUriMap[result.oldUri] = result.newUri; 407 478 stats.userSubscriptionsMigrated++; 408 479 } else { 409 - stats.errors.push(`User subscription ${sub.uri}: Database error: ${dbResult.error}`); 480 + stats.errors.push( 481 + `User subscription ${result.oldUri}: Database error: ${result.error}`, 482 + ); 410 483 } 411 484 } 412 485 ··· 424 497 documentUriMap, 425 498 userSubscriptionUriMap, 426 499 }; 427 - } 500 + }, 428 501 );
+6 -2
app/api/rpc/[command]/get_profile_data.ts
··· 10 10 normalizePublicationRow, 11 11 hasValidPublication, 12 12 } from "src/utils/normalizeRecords"; 13 + import { deduplicateByUri } from "src/utils/deduplicateRecords"; 13 14 14 15 export type GetProfileDataReturnType = Awaited< 15 16 ReturnType<(typeof get_profile_data)["handler"]> ··· 58 59 .select("*") 59 60 .eq("identity_did", did); 60 61 61 - let [{ data: profile }, { data: publications }] = await Promise.all([ 62 + let [{ data: profile }, { data: rawPublications }] = await Promise.all([ 62 63 profileReq, 63 64 publicationsReq, 64 65 ]); 65 66 67 + // Deduplicate records that may exist under both pub.leaflet and site.standard namespaces 68 + const publications = deduplicateByUri(rawPublications || []); 69 + 66 70 // Normalize publication records before returning 67 - const normalizedPublications = (publications || []) 71 + const normalizedPublications = publications 68 72 .map(normalizePublicationRow) 69 73 .filter(hasValidPublication); 70 74
+9 -2
app/api/rpc/[command]/get_publication_data.ts
··· 52 52 ) 53 53 )`, 54 54 ) 55 - .or(`name.eq."${publication_name}", uri.eq."${pubLeafletUri}", uri.eq."${siteStandardUri}"`) 55 + .or( 56 + `name.eq."${publication_name}", uri.eq."${pubLeafletUri}", uri.eq."${siteStandardUri}"`, 57 + ) 56 58 .eq("identity_did", did) 59 + .order("uri", { ascending: false }) 60 + .limit(1) 57 61 .single(); 58 62 59 63 let leaflet_data = await getFactsFromHomeLeaflets.handler( ··· 70 74 const documents = (publication?.documents_in_publications || []) 71 75 .map((dip) => { 72 76 if (!dip.documents) return null; 73 - const normalized = normalizeDocumentRecord(dip.documents.data, dip.documents.uri); 77 + const normalized = normalizeDocumentRecord( 78 + dip.documents.data, 79 + dip.documents.uri, 80 + ); 74 81 if (!normalized) return null; 75 82 return { 76 83 uri: dip.documents.uri,
+5 -1
app/api/rpc/[command]/search_publication_names.ts
··· 2 2 import { makeRoute } from "../lib"; 3 3 import type { Env } from "./route"; 4 4 import { getPublicationURL } from "app/lish/createPub/getPublicationURL"; 5 + import { deduplicateByUri } from "src/utils/deduplicateRecords"; 5 6 6 7 export type SearchPublicationNamesReturnType = Awaited< 7 8 ReturnType<(typeof search_publication_names)["handler"]> ··· 15 16 }), 16 17 handler: async ({ query, limit }, { supabase }: Pick<Env, "supabase">) => { 17 18 // Search publications by name in record (case-insensitive partial match) 18 - const { data: publications, error } = await supabase 19 + const { data: rawPublications, error } = await supabase 19 20 .from("publications") 20 21 .select("uri, record") 21 22 .ilike("record->>name", `%${query}%`) ··· 24 25 if (error) { 25 26 throw new Error(`Failed to search publications: ${error.message}`); 26 27 } 28 + 29 + // Deduplicate records that may exist under both pub.leaflet and site.standard namespaces 30 + const publications = deduplicateByUri(rawPublications || []); 27 31 28 32 const result = publications.map((p) => { 29 33 const record = p.record as { name?: string };
+6 -2
app/lish/[did]/[publication]/generateFeed.ts
··· 19 19 let renderToReadableStream = await import("react-dom/server").then( 20 20 (module) => module.renderToReadableStream, 21 21 ); 22 - let { data: publications } = await supabaseServerClient 22 + let { data: publications, error } = await supabaseServerClient 23 23 .from("publications") 24 24 .select( 25 25 `*, ··· 31 31 .or(publicationNameOrUriFilter(did, publication_name)) 32 32 .order("uri", { ascending: false }) 33 33 .limit(1); 34 + console.log(error); 34 35 let publication = publications?.[0]; 35 36 36 37 const pubRecord = normalizePublicationRecord(publication?.record); ··· 54 55 await Promise.all( 55 56 publication.documents_in_publications.map(async (doc) => { 56 57 if (!doc.documents) return; 57 - const record = normalizeDocumentRecord(doc.documents?.data, doc.documents?.uri); 58 + const record = normalizeDocumentRecord( 59 + doc.documents?.data, 60 + doc.documents?.uri, 61 + ); 58 62 const uri = new AtUri(doc.documents?.uri); 59 63 const rkey = uri.rkey; 60 64 if (!record) return;
+8 -8
lexicons/api/lexicons.ts
··· 1441 1441 properties: { 1442 1442 title: { 1443 1443 type: 'string', 1444 - maxLength: 1280, 1445 - maxGraphemes: 128, 1444 + maxLength: 5000, 1445 + maxGraphemes: 500, 1446 1446 }, 1447 1447 postRef: { 1448 1448 type: 'ref', ··· 1450 1450 }, 1451 1451 description: { 1452 1452 type: 'string', 1453 - maxLength: 3000, 1454 - maxGraphemes: 300, 1453 + maxLength: 30000, 1454 + maxGraphemes: 3000, 1455 1455 }, 1456 1456 publishedAt: { 1457 1457 type: 'string', ··· 2128 2128 type: 'blob', 2129 2129 }, 2130 2130 description: { 2131 - maxGraphemes: 300, 2132 - maxLength: 3000, 2131 + maxGraphemes: 3000, 2132 + maxLength: 30000, 2133 2133 type: 'string', 2134 2134 }, 2135 2135 path: { ··· 2165 2165 type: 'ref', 2166 2166 }, 2167 2167 title: { 2168 - maxGraphemes: 128, 2169 - maxLength: 1280, 2168 + maxGraphemes: 500, 2169 + maxLength: 5000, 2170 2170 type: 'string', 2171 2171 }, 2172 2172 updatedAt: {
+4 -4
lexicons/pub/leaflet/document.json
··· 18 18 "properties": { 19 19 "title": { 20 20 "type": "string", 21 - "maxLength": 1280, 22 - "maxGraphemes": 128 21 + "maxLength": 5000, 22 + "maxGraphemes": 500 23 23 }, 24 24 "postRef": { 25 25 "type": "ref", ··· 27 27 }, 28 28 "description": { 29 29 "type": "string", 30 - "maxLength": 3000, 31 - "maxGraphemes": 300 30 + "maxLength": 30000, 31 + "maxGraphemes": 3000 32 32 }, 33 33 "publishedAt": { 34 34 "type": "string",
+4 -4
lexicons/site/standard/document.json
··· 19 19 "type": "blob" 20 20 }, 21 21 "description": { 22 - "maxGraphemes": 300, 23 - "maxLength": 3000, 22 + "maxGraphemes": 3000, 23 + "maxLength": 30000, 24 24 "type": "string" 25 25 }, 26 26 "path": { ··· 53 53 "type": "ref" 54 54 }, 55 55 "title": { 56 - "maxGraphemes": 128, 57 - "maxLength": 1280, 56 + "maxGraphemes": 500, 57 + "maxLength": 5000, 58 58 "type": "string" 59 59 }, 60 60 "updatedAt": {
+2 -2
lexicons/src/document.ts
··· 16 16 type: "object", 17 17 required: ["pages", "author", "title"], 18 18 properties: { 19 - title: { type: "string", maxLength: 1280, maxGraphemes: 128 }, 19 + title: { type: "string", maxLength: 5000, maxGraphemes: 500 }, 20 20 postRef: { type: "ref", ref: "com.atproto.repo.strongRef" }, 21 - description: { type: "string", maxLength: 3000, maxGraphemes: 300 }, 21 + description: { type: "string", maxLength: 30000, maxGraphemes: 3000 }, 22 22 publishedAt: { type: "string", format: "datetime" }, 23 23 publication: { type: "string", format: "at-uri" }, 24 24 author: { type: "string", format: "at-identifier" },
+1
package.json
··· 4 4 "description": "", 5 5 "main": "index.js", 6 6 "scripts": { 7 + "lint": "next lint", 7 8 "dev": "TZ=UTC next dev --turbo", 8 9 "publish-lexicons": "tsx lexicons/publish.ts", 9 10 "generate-db-types": "supabase gen types --local > supabase/database.types.ts && drizzle-kit introspect && rm -rf ./drizzle/*.sql ./drizzle/meta",