tangled
alpha
login
or
join now
byarielm.fyi
/
atlast
16
fork
atom
ATlast — you'll never need to find your favorites on another platform again. Find your favs in the ATmosphere.
atproto
16
fork
atom
overview
issues
1
pulls
pipelines
add bulk inserts to db, replace indiv queries
byarielm.fyi
4 months ago
74cc4d30
b098fd8e
+290
-145
2 changed files
expand all
collapse all
unified
split
netlify
functions
db-helpers.ts
save-results.ts
+201
-82
netlify/functions/db-helpers.ts
···
1
1
import { getDbClient } from './db';
2
2
3
3
-
// Normalize username for consistent matching
4
4
-
export function normalizeUsername(username: string): string {
5
5
-
return username.toLowerCase().replace(/[._-]/g, '');
3
3
+
export async function createUpload(
4
4
+
uploadId: string,
5
5
+
did: string,
6
6
+
sourcePlatform: string,
7
7
+
totalUsers: number,
8
8
+
matchedUsers: number
9
9
+
) {
10
10
+
const sql = getDbClient();
11
11
+
await sql`
12
12
+
INSERT INTO user_uploads (upload_id, did, source_platform, total_users, matched_users, unmatched_users)
13
13
+
VALUES (${uploadId}, ${did}, ${sourcePlatform}, ${totalUsers}, ${matchedUsers}, ${totalUsers - matchedUsers})
14
14
+
ON CONFLICT (upload_id) DO NOTHING
15
15
+
`;
6
16
}
7
17
8
8
-
// Get or create a source account, returns the source_account_id
9
18
export async function getOrCreateSourceAccount(
10
10
-
platform: string,
11
11
-
username: string
19
19
+
sourcePlatform: string,
20
20
+
sourceUsername: string
12
21
): Promise<number> {
13
22
const sql = getDbClient();
14
14
-
const normalized = normalizeUsername(username);
23
23
+
const normalized = sourceUsername.toLowerCase().replace(/[._-]/g, '');
15
24
16
25
const result = await sql`
17
26
INSERT INTO source_accounts (source_platform, source_username, normalized_username)
18
18
-
VALUES (${platform}, ${username}, ${normalized})
19
19
-
ON CONFLICT (source_platform, normalized_username)
20
20
-
DO UPDATE SET source_username = ${username}
27
27
+
VALUES (${sourcePlatform}, ${sourceUsername}, ${normalized})
28
28
+
ON CONFLICT (source_platform, normalized_username) DO UPDATE SET
29
29
+
source_username = ${sourceUsername}
21
30
RETURNING id
22
31
`;
23
32
24
24
-
return (result as Array<{ id: number }>)[0].id;
33
33
+
return (result as any[])[0].id;
25
34
}
26
35
27
27
-
// Link a user to a source account
28
36
export async function linkUserToSourceAccount(
29
37
uploadId: string,
30
38
did: string,
31
39
sourceAccountId: number,
32
32
-
sourceDate?: string
33
33
-
): Promise<void> {
40
40
+
sourceDate: string
41
41
+
) {
34
42
const sql = getDbClient();
35
35
-
36
43
await sql`
37
44
INSERT INTO user_source_follows (upload_id, did, source_account_id, source_date)
38
38
-
VALUES (${uploadId}, ${did}, ${sourceAccountId}, ${sourceDate || null})
45
45
+
VALUES (${uploadId}, ${did}, ${sourceAccountId}, ${sourceDate})
39
46
ON CONFLICT (upload_id, source_account_id) DO NOTHING
40
47
`;
41
48
}
42
49
43
43
-
// Store ATProto match for account (handles duplicates), returns atproto_match_id
44
50
export async function storeAtprotoMatch(
45
51
sourceAccountId: number,
46
52
atprotoDid: string,
47
53
atprotoHandle: string,
48
48
-
displayName: string | undefined,
49
49
-
avatar: string | undefined,
54
54
+
atprotoDisplayName: string | undefined,
55
55
+
atprotoAvatar: string | undefined,
50
56
matchScore: number
51
57
): Promise<number> {
52
58
const sql = getDbClient();
53
53
-
54
59
const result = await sql`
55
60
INSERT INTO atproto_matches (
56
56
-
source_account_id,
57
57
-
atproto_did,
58
58
-
atproto_handle,
59
59
-
atproto_display_name,
60
60
-
atproto_avatar,
61
61
-
match_score,
62
62
-
last_verified
61
61
+
source_account_id, atproto_did, atproto_handle,
62
62
+
atproto_display_name, atproto_avatar, match_score
63
63
)
64
64
VALUES (
65
65
-
${sourceAccountId},
66
66
-
${atprotoDid},
67
67
-
${atprotoHandle},
68
68
-
${displayName || null},
69
69
-
${avatar || null},
70
70
-
${matchScore},
71
71
-
NOW()
65
65
+
${sourceAccountId}, ${atprotoDid}, ${atprotoHandle},
66
66
+
${atprotoDisplayName || null}, ${atprotoAvatar || null}, ${matchScore}
72
67
)
73
73
-
ON CONFLICT (source_account_id, atproto_did)
74
74
-
DO UPDATE SET
68
68
+
ON CONFLICT (source_account_id, atproto_did) DO UPDATE SET
75
69
atproto_handle = ${atprotoHandle},
76
76
-
atproto_display_name = ${displayName || null},
77
77
-
atproto_avatar = ${avatar || null},
70
70
+
atproto_display_name = ${atprotoDisplayName || null},
71
71
+
atproto_avatar = ${atprotoAvatar || null},
78
72
match_score = ${matchScore},
79
73
last_verified = NOW()
80
74
RETURNING id
81
75
`;
82
76
83
83
-
return (result as Array<{ id: number }>)[0].id;
77
77
+
return (result as any[])[0].id;
84
78
}
85
79
86
86
-
// Mark source account as having matches
87
87
-
export async function markSourceAccountMatched(sourceAccountId: number): Promise<void> {
80
80
+
export async function markSourceAccountMatched(sourceAccountId: number) {
88
81
const sql = getDbClient();
89
89
-
90
82
await sql`
91
83
UPDATE source_accounts
92
92
-
SET match_found = TRUE, match_found_at = NOW()
84
84
+
SET match_found = true, match_found_at = NOW()
93
85
WHERE id = ${sourceAccountId}
94
86
`;
95
87
}
96
88
97
97
-
// Create user match status (tracks if user has viewed/followed this match)
98
89
export async function createUserMatchStatus(
99
90
did: string,
100
91
atprotoMatchId: number,
101
92
sourceAccountId: number,
102
102
-
viewed: boolean = true
103
103
-
): Promise<void> {
93
93
+
viewed: boolean = false
94
94
+
) {
104
95
const sql = getDbClient();
105
105
-
106
96
await sql`
107
97
INSERT INTO user_match_status (did, atproto_match_id, source_account_id, viewed, viewed_at)
108
108
-
VALUES (
109
109
-
${did},
110
110
-
${atprotoMatchId},
111
111
-
${sourceAccountId},
112
112
-
${viewed},
113
113
-
${viewed ? 'NOW()' : null}
114
114
-
)
115
115
-
ON CONFLICT (did, atproto_match_id) DO NOTHING
98
98
+
VALUES (${did}, ${atprotoMatchId}, ${sourceAccountId}, ${viewed}, ${viewed ? 'NOW()' : null})
99
99
+
ON CONFLICT (did, atproto_match_id) DO UPDATE SET
100
100
+
viewed = ${viewed},
101
101
+
viewed_at = CASE WHEN ${viewed} THEN NOW() ELSE user_match_status.viewed_at END
102
102
+
`;
103
103
+
}
104
104
+
105
105
+
// NEW: Bulk operations for Phase 2
106
106
+
export async function bulkCreateSourceAccounts(
107
107
+
sourcePlatform: string,
108
108
+
usernames: string[]
109
109
+
): Promise<Map<string, number>> {
110
110
+
const sql = getDbClient();
111
111
+
112
112
+
// Prepare bulk insert values
113
113
+
const values = usernames.map(username => ({
114
114
+
platform: sourcePlatform,
115
115
+
username: username,
116
116
+
normalized: username.toLowerCase().replace(/[._-]/g, '')
117
117
+
}));
118
118
+
119
119
+
// Build bulk insert query with unnest
120
120
+
const platforms = values.map(v => v.platform);
121
121
+
const source_usernames = values.map(v => v.username);
122
122
+
const normalized = values.map(v => v.normalized);
123
123
+
124
124
+
const result = await sql`
125
125
+
INSERT INTO source_accounts (source_platform, source_username, normalized_username)
126
126
+
SELECT *
127
127
+
FROM UNNEST(
128
128
+
${platforms}::text[],
129
129
+
${source_usernames}::text[],
130
130
+
${normalized}::text[]
131
131
+
) AS t(source_platform, source_username, normalized_username)
132
132
+
ON CONFLICT (source_platform, normalized_username) DO UPDATE
133
133
+
SET source_username = EXCLUDED.source_username
134
134
+
RETURNING id, normalized_username
116
135
`;
136
136
+
137
137
+
138
138
+
// Create map of normalized username to ID
139
139
+
const idMap = new Map<string, number>();
140
140
+
for (const row of result as any[]) {
141
141
+
idMap.set(row.normalized_username, row.id);
142
142
+
}
143
143
+
144
144
+
return idMap;
117
145
}
118
146
119
119
-
// Create upload record
120
120
-
export async function createUpload(
147
147
+
// ==================== THIS FUNCTION IS NOW FIXED ====================
148
148
+
export async function bulkLinkUserToSourceAccounts(
121
149
uploadId: string,
122
150
did: string,
123
123
-
platform: string,
124
124
-
totalUsers: number,
125
125
-
matchedUsers: number
126
126
-
): Promise<void> {
151
151
+
links: Array<{ sourceAccountId: number; sourceDate: string }>
152
152
+
) {
127
153
const sql = getDbClient();
128
154
155
155
+
const numLinks = links.length;
156
156
+
if (numLinks === 0) return;
157
157
+
158
158
+
// Extract arrays for columns that change
159
159
+
const sourceAccountIds = links.map(l => l.sourceAccountId);
160
160
+
const sourceDates = links.map(l => l.sourceDate);
161
161
+
162
162
+
// Create arrays for the static columns
163
163
+
const uploadIds = Array(numLinks).fill(uploadId);
164
164
+
const dids = Array(numLinks).fill(did);
165
165
+
166
166
+
// Use the parallel UNNEST pattern, which is proven to work in other functions
129
167
await sql`
130
130
-
INSERT INTO user_uploads (upload_id, did, source_platform, total_users, matched_users, unmatched_users)
131
131
-
VALUES (
132
132
-
${uploadId},
133
133
-
${did},
134
134
-
${platform},
135
135
-
${totalUsers},
136
136
-
${matchedUsers},
137
137
-
${totalUsers - matchedUsers}
168
168
+
INSERT INTO user_source_follows (upload_id, did, source_account_id, source_date)
169
169
+
SELECT * FROM UNNEST(
170
170
+
${uploadIds}::text[],
171
171
+
${dids}::text[],
172
172
+
${sourceAccountIds}::integer[],
173
173
+
${sourceDates}::text[]
174
174
+
) AS t(upload_id, did, source_account_id, source_date)
175
175
+
ON CONFLICT (upload_id, source_account_id) DO NOTHING
176
176
+
`;
177
177
+
}
178
178
+
// ====================================================================
179
179
+
180
180
+
export async function bulkStoreAtprotoMatches(
181
181
+
matches: Array<{
182
182
+
sourceAccountId: number;
183
183
+
atprotoDid: string;
184
184
+
atprotoHandle: string;
185
185
+
atprotoDisplayName?: string;
186
186
+
atprotoAvatar?: string;
187
187
+
matchScore: number;
188
188
+
}>
189
189
+
): Promise<Map<string, number>> {
190
190
+
const sql = getDbClient();
191
191
+
192
192
+
if (matches.length === 0) return new Map();
193
193
+
194
194
+
const sourceAccountId = matches.map(m => m.sourceAccountId)
195
195
+
const atprotoDid = matches.map(m => m.atprotoDid)
196
196
+
const atprotoHandle = matches.map(m => m.atprotoHandle)
197
197
+
const atprotoDisplayName = matches.map(m => m.atprotoDisplayName || null)
198
198
+
const atprotoAvatar = matches.map(m => m.atprotoAvatar || null)
199
199
+
const matchScore = matches.map(m => m.matchScore)
200
200
+
201
201
+
const result = await sql`
202
202
+
INSERT INTO atproto_matches (
203
203
+
source_account_id, atproto_did, atproto_handle,
204
204
+
atproto_display_name, atproto_avatar, match_score
205
205
+
)
206
206
+
SELECT * FROM UNNEST(
207
207
+
${sourceAccountId}::integer[],
208
208
+
${atprotoDid}::text[],
209
209
+
${atprotoHandle}::text[],
210
210
+
${atprotoDisplayName}::text[],
211
211
+
${atprotoAvatar}::text[],
212
212
+
${matchScore}::integer[]
213
213
+
) AS t(
214
214
+
source_account_id, atproto_did, atproto_handle,
215
215
+
atproto_display_name, atproto_avatar, match_score
138
216
)
217
217
+
ON CONFLICT (source_account_id, atproto_did) DO UPDATE SET
218
218
+
atproto_handle = EXCLUDED.atproto_handle,
219
219
+
atproto_display_name = EXCLUDED.atproto_display_name,
220
220
+
atproto_avatar = EXCLUDED.atproto_avatar,
221
221
+
match_score = EXCLUDED.match_score,
222
222
+
last_verified = NOW()
223
223
+
RETURNING id, source_account_id, atproto_did
139
224
`;
225
225
+
226
226
+
// Create map of "sourceAccountId:atprotoDid" to match ID
227
227
+
const idMap = new Map<string, number>();
228
228
+
for (const row of result as any[]) {
229
229
+
idMap.set(`${row.source_account_id}:${row.atproto_did}`, row.id);
230
230
+
}
231
231
+
232
232
+
return idMap;
140
233
}
141
234
142
142
-
// Get user's uploads
143
143
-
export async function getUserUploads(did: string, platform?: string) {
235
235
+
export async function bulkMarkSourceAccountsMatched(sourceAccountIds: number[]) {
144
236
const sql = getDbClient();
145
237
146
146
-
if (platform) {
147
147
-
return await sql`
148
148
-
SELECT * FROM user_uploads
149
149
-
WHERE did = ${did} AND source_platform = ${platform}
150
150
-
ORDER BY created_at DESC
151
151
-
`;
152
152
-
}
238
238
+
if (sourceAccountIds.length === 0) return;
153
239
154
154
-
return await sql`
155
155
-
SELECT * FROM user_uploads
156
156
-
WHERE did = ${did}
157
157
-
ORDER BY created_at DESC
240
240
+
await sql`
241
241
+
UPDATE source_accounts
242
242
+
SET match_found = true, match_found_at = NOW()
243
243
+
WHERE id = ANY(${sourceAccountIds})
244
244
+
`;
245
245
+
}
246
246
+
247
247
+
export async function bulkCreateUserMatchStatus(
248
248
+
statuses: Array<{
249
249
+
did: string;
250
250
+
atprotoMatchId: number;
251
251
+
sourceAccountId: number;
252
252
+
viewed: boolean;
253
253
+
}>
254
254
+
) {
255
255
+
const sql = getDbClient();
256
256
+
257
257
+
if (statuses.length === 0) return;
258
258
+
259
259
+
const did = statuses.map(s => s.did)
260
260
+
const atprotoMatchId = statuses.map(s => s.atprotoMatchId)
261
261
+
const sourceAccountId = statuses.map(s => s.sourceAccountId)
262
262
+
const viewedFlags = statuses.map(s => s.viewed);
263
263
+
const viewedDates = statuses.map(s => s.viewed ? new Date() : null);
264
264
+
265
265
+
await sql`
266
266
+
INSERT INTO user_match_status (did, atproto_match_id, source_account_id, viewed, viewed_at)
267
267
+
SELECT * FROM UNNEST(
268
268
+
${did}::text[],
269
269
+
${atprotoMatchId}::integer[],
270
270
+
${sourceAccountId}::integer[],
271
271
+
${viewedFlags}::boolean[],
272
272
+
${viewedDates}::timestamp[]
273
273
+
) AS t(did, atproto_match_id, source_account_id, viewed, viewed_at)
274
274
+
ON CONFLICT (did, atproto_match_id) DO UPDATE SET
275
275
+
viewed = EXCLUDED.viewed,
276
276
+
viewed_at = CASE WHEN EXCLUDED.viewed THEN NOW() ELSE user_match_status.viewed_at END
158
277
`;
159
278
}
+89
-63
netlify/functions/save-results.ts
···
2
2
import { userSessions } from './oauth-stores-db';
3
3
import cookie from 'cookie';
4
4
import {
5
5
-
getOrCreateSourceAccount,
6
6
-
linkUserToSourceAccount,
7
7
-
storeAtprotoMatch,
8
8
-
markSourceAccountMatched,
9
9
-
createUserMatchStatus,
10
10
-
createUpload
5
5
+
createUpload,
6
6
+
bulkCreateSourceAccounts,
7
7
+
bulkLinkUserToSourceAccounts,
8
8
+
bulkStoreAtprotoMatches,
9
9
+
bulkMarkSourceAccountsMatched,
10
10
+
bulkCreateUserMatchStatus
11
11
} from './db-helpers';
12
12
import { getDbClient } from './db';
13
13
···
100
100
};
101
101
}
102
102
103
103
-
// IMPORTANT: Create upload record FIRST before processing results
104
104
-
// This is required because user_source_follows has a foreign key to user_uploads
103
103
+
// Create upload record FIRST
105
104
await createUpload(
106
105
uploadId,
107
106
userSession.did,
108
107
sourcePlatform,
109
108
results.length,
110
110
-
0 // We'll update this after processing
109
109
+
0
111
110
);
112
111
113
113
-
const BATCH_SIZE = 100;
114
114
-
const batches = [];
115
115
-
for (let i = 0; i < results.length; i += BATCH_SIZE) {
116
116
-
batches.push(results.slice(i, i + BATCH_SIZE));
117
117
-
}
118
118
-
119
119
-
for (const batch of batches) {
120
120
-
// Process batch in parallel
121
121
-
await Promise.all(batch.map(async (result) => {
122
122
-
try {
123
123
-
// 1. Get or create source account (handles race conditions)
124
124
-
const sourceAccountId = await getOrCreateSourceAccount(
125
125
-
sourcePlatform,
126
126
-
result.tiktokUser.username
127
127
-
);
128
128
-
129
129
-
// 2. Link this user to the source account
130
130
-
await linkUserToSourceAccount(
131
131
-
uploadId,
132
132
-
userSession.did,
112
112
+
// BULK OPERATION 1: Create all source accounts at once
113
113
+
const allUsernames = results.map(r => r.tiktokUser.username);
114
114
+
const sourceAccountIdMap = await bulkCreateSourceAccounts(sourcePlatform, allUsernames);
115
115
+
116
116
+
// BULK OPERATION 2: Link all users to source accounts
117
117
+
const links = results.map(result => {
118
118
+
const normalized = result.tiktokUser.username.toLowerCase().replace(/[._-]/g, '');
119
119
+
const sourceAccountId = sourceAccountIdMap.get(normalized);
120
120
+
return {
121
121
+
sourceAccountId: sourceAccountId!,
122
122
+
sourceDate: result.tiktokUser.date
123
123
+
};
124
124
+
}).filter(link => link.sourceAccountId !== undefined);
125
125
+
126
126
+
await bulkLinkUserToSourceAccounts(uploadId, userSession.did, links);
127
127
+
128
128
+
// BULK OPERATION 3: Store all atproto matches at once
129
129
+
const allMatches: Array<{
130
130
+
sourceAccountId: number;
131
131
+
atprotoDid: string;
132
132
+
atprotoHandle: string;
133
133
+
atprotoDisplayName?: string;
134
134
+
atprotoAvatar?: string;
135
135
+
matchScore: number;
136
136
+
}> = [];
137
137
+
138
138
+
const matchedSourceAccountIds: number[] = [];
139
139
+
140
140
+
for (const result of results) {
141
141
+
const normalized = result.tiktokUser.username.toLowerCase().replace(/[._-]/g, '');
142
142
+
const sourceAccountId = sourceAccountIdMap.get(normalized);
143
143
+
144
144
+
if (sourceAccountId && result.atprotoMatches && result.atprotoMatches.length > 0) {
145
145
+
matchedCount++;
146
146
+
matchedSourceAccountIds.push(sourceAccountId);
147
147
+
148
148
+
for (const match of result.atprotoMatches) {
149
149
+
allMatches.push({
133
150
sourceAccountId,
134
134
-
result.tiktokUser.date
135
135
-
);
136
136
-
137
137
-
// 3. If matches found, store them
138
138
-
if (result.atprotoMatches && result.atprotoMatches.length > 0) {
139
139
-
matchedCount++;
140
140
-
141
141
-
// Mark source account as matched
142
142
-
await markSourceAccountMatched(sourceAccountId);
143
143
-
144
144
-
// Store each match
145
145
-
for (const match of result.atprotoMatches) {
146
146
-
const atprotoMatchId = await storeAtprotoMatch(
147
147
-
sourceAccountId,
148
148
-
match.did,
149
149
-
match.handle,
150
150
-
match.displayName,
151
151
-
match.avatar,
152
152
-
match.matchScore
153
153
-
);
154
154
-
155
155
-
// Create user match status (viewed = true since they just searched)
156
156
-
await createUserMatchStatus(
157
157
-
userSession.did,
158
158
-
atprotoMatchId,
159
159
-
sourceAccountId,
160
160
-
true
161
161
-
);
162
162
-
}
163
163
-
}
164
164
-
} catch (error) {
165
165
-
console.error(`Error processing result for ${result.tiktokUser.username}:`, error);
166
166
-
// Continue processing other results
151
151
+
atprotoDid: match.did,
152
152
+
atprotoHandle: match.handle,
153
153
+
atprotoDisplayName: match.displayName,
154
154
+
atprotoAvatar: match.avatar,
155
155
+
matchScore: match.matchScore
156
156
+
});
167
157
}
168
168
-
}));
158
158
+
}
159
159
+
}
160
160
+
161
161
+
// Store all matches in one operation
162
162
+
let matchIdMap = new Map<string, number>();
163
163
+
if (allMatches.length > 0) {
164
164
+
matchIdMap = await bulkStoreAtprotoMatches(allMatches);
165
165
+
}
166
166
+
167
167
+
// BULK OPERATION 4: Mark all matched source accounts
168
168
+
if (matchedSourceAccountIds.length > 0) {
169
169
+
await bulkMarkSourceAccountsMatched(matchedSourceAccountIds);
170
170
+
}
171
171
+
172
172
+
// BULK OPERATION 5: Create all user match statuses
173
173
+
const statuses: Array<{
174
174
+
did: string;
175
175
+
atprotoMatchId: number;
176
176
+
sourceAccountId: number;
177
177
+
viewed: boolean;
178
178
+
}> = [];
179
179
+
180
180
+
for (const match of allMatches) {
181
181
+
const key = `${match.sourceAccountId}:${match.atprotoDid}`;
182
182
+
const matchId = matchIdMap.get(key);
183
183
+
if (matchId) {
184
184
+
statuses.push({
185
185
+
did: userSession.did,
186
186
+
atprotoMatchId: matchId,
187
187
+
sourceAccountId: match.sourceAccountId,
188
188
+
viewed: true
189
189
+
});
190
190
+
}
191
191
+
}
192
192
+
193
193
+
if (statuses.length > 0) {
194
194
+
await bulkCreateUserMatchStatus(statuses);
169
195
}
170
196
171
197
// Update upload record with final counts