tangled
alpha
login
or
join now
evan.jarrett.net
/
at-container-registry
66
fork
atom
A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
atcr.io
docker
container
atproto
go
66
fork
atom
overview
issues
1
pulls
pipelines
update repomgr to support prevdata
evan.jarrett.net
3 weeks ago
b235e4a7
7d74e767
verified
This commit was signed with the committer's
known signature
.
evan.jarrett.net
SSH Key Fingerprint:
SHA256:bznk0uVPp7XFOl67P0uTM1pCjf2A4ojeP/lsUE7uauQ=
1/2
lint.yaml
failed
3m 31s
tests.yml
success
3m 27s
+246
-53
4 changed files
expand all
collapse all
unified
split
pkg
hold
pds
events.go
events_test.go
import.go
repomgr.go
+83
-43
pkg/hold/pds/events.go
reviewed
···
50
50
51
51
// RepoCommitEvent represents a #commit event in subscribeRepos
52
52
type RepoCommitEvent struct {
53
53
-
Seq int64 `json:"seq" cborgen:"seq"`
54
54
-
Repo string `json:"repo" cborgen:"repo"`
55
55
-
Commit string `json:"commit" cborgen:"commit"` // CID string
56
56
-
Rev string `json:"rev" cborgen:"rev"`
57
57
-
Since *string `json:"since,omitempty" cborgen:"since,omitempty"`
58
58
-
Blocks []byte `json:"blocks" cborgen:"blocks"` // CAR slice bytes
59
59
-
Ops []*atproto.SyncSubscribeRepos_RepoOp `json:"ops" cborgen:"ops"`
60
60
-
Time string `json:"time" cborgen:"time"`
61
61
-
Type string `json:"$type" cborgen:"$type"` // Always "#commit"
53
53
+
Seq int64 `json:"seq" cborgen:"seq"`
54
54
+
Repo string `json:"repo" cborgen:"repo"`
55
55
+
Commit string `json:"commit" cborgen:"commit"` // CID string
56
56
+
Rev string `json:"rev" cborgen:"rev"`
57
57
+
Since *string `json:"since,omitempty" cborgen:"since,omitempty"`
58
58
+
PrevData string `json:"prevData,omitempty" cborgen:"prevData,omitempty"` // MST root CID string of previous commit
59
59
+
Blocks []byte `json:"blocks" cborgen:"blocks"` // CAR slice bytes
60
60
+
Ops []*atproto.SyncSubscribeRepos_RepoOp `json:"ops" cborgen:"ops"`
61
61
+
Time string `json:"time" cborgen:"time"`
62
62
+
Type string `json:"$type" cborgen:"$type"` // Always "#commit"
62
63
}
63
64
64
65
// NewEventBroadcaster creates a new event broadcaster with persistent storage
···
155
156
commit_cid TEXT NOT NULL,
156
157
rev TEXT NOT NULL,
157
158
since_rev TEXT,
159
159
+
prev_data TEXT,
158
160
repo_slice BLOB NOT NULL,
159
161
ops_json TEXT NOT NULL,
160
162
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
···
167
169
return err
168
170
}
169
171
}
172
172
+
173
173
+
// Migration: add prev_data column if missing (existing databases)
174
174
+
b.db.Exec("ALTER TABLE firehose_events ADD COLUMN prev_data TEXT")
170
175
171
176
// Load last sequence number from database
172
177
var lastSeq sql.NullInt64
···
522
527
sinceRev = sql.NullString{String: *event.Since, Valid: true}
523
528
}
524
529
530
530
+
// Get prev_data value (may be empty)
531
531
+
var prevData sql.NullString
532
532
+
if event.PrevData != "" {
533
533
+
prevData = sql.NullString{String: event.PrevData, Valid: true}
534
534
+
}
535
535
+
525
536
// Insert event
526
537
query := `
527
527
-
INSERT INTO firehose_events (seq, commit_cid, rev, since_rev, repo_slice, ops_json)
528
528
-
VALUES (?, ?, ?, ?, ?, ?)
538
538
+
INSERT INTO firehose_events (seq, commit_cid, rev, since_rev, prev_data, repo_slice, ops_json)
539
539
+
VALUES (?, ?, ?, ?, ?, ?, ?)
529
540
`
530
530
-
_, err = b.db.Exec(query, event.Seq, event.Commit, event.Rev, sinceRev, event.Blocks, opsJSON)
541
541
+
_, err = b.db.Exec(query, event.Seq, event.Commit, event.Rev, sinceRev, prevData, event.Blocks, opsJSON)
531
542
return err
532
543
}
533
544
···
556
567
// Event.NewRoot is a cid.Cid, convert to string
557
568
commitCID := event.NewRoot.String()
558
569
570
570
+
// Convert PrevData CID to string if present
571
571
+
var prevData string
572
572
+
if event.PrevData != nil {
573
573
+
prevData = event.PrevData.String()
574
574
+
}
575
575
+
559
576
return &RepoCommitEvent{
560
560
-
Seq: seq,
561
561
-
Repo: b.holdDID, // Set to hold's DID
562
562
-
Commit: commitCID,
563
563
-
Rev: event.Rev,
564
564
-
Since: event.Since,
565
565
-
Blocks: event.RepoSlice, // CAR slice bytes
566
566
-
Ops: ops,
567
567
-
Time: time.Now().Format(time.RFC3339),
568
568
-
Type: "#commit",
577
577
+
Seq: seq,
578
578
+
Repo: b.holdDID, // Set to hold's DID
579
579
+
Commit: commitCID,
580
580
+
Rev: event.Rev,
581
581
+
Since: event.Since,
582
582
+
PrevData: prevData,
583
583
+
Blocks: event.RepoSlice, // CAR slice bytes
584
584
+
Ops: ops,
585
585
+
Time: time.Now().Format(time.RFC3339),
586
586
+
Type: "#commit",
569
587
}
570
588
}
571
589
···
605
623
// Query events where seq > cursor, ordered by seq
606
624
// Include created_at to preserve original event timestamp
607
625
query := `
608
608
-
SELECT seq, commit_cid, rev, since_rev, repo_slice, ops_json, created_at
626
626
+
SELECT seq, commit_cid, rev, since_rev, prev_data, repo_slice, ops_json, created_at
609
627
FROM firehose_events
610
628
WHERE seq > ?
611
629
ORDER BY seq ASC
···
623
641
commitCID string
624
642
rev string
625
643
sinceRev sql.NullString
644
644
+
prevData sql.NullString
626
645
repoSlice []byte
627
646
opsJSON []byte
628
647
createdAt time.Time
629
648
)
630
649
631
631
-
if err := rows.Scan(&seq, &commitCID, &rev, &sinceRev, &repoSlice, &opsJSON, &createdAt); err != nil {
650
650
+
if err := rows.Scan(&seq, &commitCID, &rev, &sinceRev, &prevData, &repoSlice, &opsJSON, &createdAt); err != nil {
632
651
slog.Error("Error scanning event row", "error", err)
633
652
continue
634
653
}
···
646
665
since = &sinceRev.String
647
666
}
648
667
668
668
+
var prevDataStr string
669
669
+
if prevData.Valid {
670
670
+
prevDataStr = prevData.String
671
671
+
}
672
672
+
649
673
event := &RepoCommitEvent{
650
650
-
Seq: seq,
651
651
-
Repo: b.holdDID,
652
652
-
Commit: commitCID,
653
653
-
Rev: rev,
654
654
-
Since: since,
655
655
-
Blocks: repoSlice,
656
656
-
Ops: ops,
657
657
-
Time: createdAt.Format(time.RFC3339), // Use original event time from database
658
658
-
Type: "#commit",
674
674
+
Seq: seq,
675
675
+
Repo: b.holdDID,
676
676
+
Commit: commitCID,
677
677
+
Rev: rev,
678
678
+
Since: since,
679
679
+
PrevData: prevDataStr,
680
680
+
Blocks: repoSlice,
681
681
+
Ops: ops,
682
682
+
Time: createdAt.Format(time.RFC3339), // Use original event time from database
683
683
+
Type: "#commit",
659
684
}
660
685
661
686
// Send to subscriber
···
780
805
// Convert blocks to LexBytes
781
806
blocks := lexutil.LexBytes(event.Blocks)
782
807
808
808
+
// Convert prevData CID string to LexLink if present
809
809
+
var prevDataLink *lexutil.LexLink
810
810
+
if event.PrevData != "" {
811
811
+
prevDataCID, err := cid.Decode(event.PrevData)
812
812
+
if err != nil {
813
813
+
slog.Warn("Failed to parse prevData CID",
814
814
+
"cid", event.PrevData,
815
815
+
"error", err)
816
816
+
} else {
817
817
+
link := lexutil.LexLink(prevDataCID)
818
818
+
prevDataLink = &link
819
819
+
}
820
820
+
}
821
821
+
783
822
return &atproto.SyncSubscribeRepos_Commit{
784
784
-
Seq: event.Seq,
785
785
-
Repo: event.Repo,
786
786
-
Commit: commitLink,
787
787
-
Rev: event.Rev,
788
788
-
Since: event.Since,
789
789
-
Blocks: blocks,
790
790
-
Ops: event.Ops,
791
791
-
Time: event.Time,
792
792
-
Blobs: []lexutil.LexLink{}, // Empty for now, we don't track blob refs in our simplified model
793
793
-
Rebase: false, // DEPRECATED field
794
794
-
TooBig: false, // Not implementing tooBig for now
823
823
+
Seq: event.Seq,
824
824
+
Repo: event.Repo,
825
825
+
Commit: commitLink,
826
826
+
Rev: event.Rev,
827
827
+
Since: event.Since,
828
828
+
PrevData: prevDataLink,
829
829
+
Blocks: blocks,
830
830
+
Ops: event.Ops,
831
831
+
Time: event.Time,
832
832
+
Blobs: []lexutil.LexLink{}, // Empty for now, we don't track blob refs in our simplified model
833
833
+
Rebase: false, // DEPRECATED field
834
834
+
TooBig: false, // Not implementing tooBig for now
795
835
}
796
836
}
797
837
+64
pkg/hold/pds/events_test.go
reviewed
···
184
184
broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10, "")
185
185
186
186
testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke")
187
187
+
prevDataCID, _ := cid.Decode("bafyreie5737gdxlw5i64mnsc7x35mha5ee4w5vqnivxceag4pfd2mhqtsu")
187
188
since := "prev-rev"
188
189
189
190
event := &RepoEvent{
190
191
NewRoot: testCID,
192
192
+
PrevData: &prevDataCID,
191
193
Rev: "test-rev-123",
192
194
Since: &since,
193
195
RepoSlice: []byte("test CAR data"),
···
234
236
235
237
if commitEvent.Since == nil || *commitEvent.Since != since {
236
238
t.Errorf("Expected since=%s, got %v", since, commitEvent.Since)
239
239
+
}
240
240
+
241
241
+
// Verify prevData is set
242
242
+
if commitEvent.PrevData != prevDataCID.String() {
243
243
+
t.Errorf("Expected prevData=%s, got %s", prevDataCID.String(), commitEvent.PrevData)
237
244
}
238
245
239
246
if string(commitEvent.Blocks) != "test CAR data" {
···
312
319
313
320
if commitEvent.Since != nil {
314
321
t.Errorf("Expected nil since, got %v", commitEvent.Since)
322
322
+
}
323
323
+
324
324
+
// PrevData should be empty when not set
325
325
+
if commitEvent.PrevData != "" {
326
326
+
t.Errorf("Expected empty prevData when not set, got %s", commitEvent.PrevData)
327
327
+
}
328
328
+
}
329
329
+
330
330
+
// TestConvertToIndigoCommit_PrevData tests that prevData is correctly set on the indigo commit
331
331
+
func TestConvertToIndigoCommit_PrevData(t *testing.T) {
332
332
+
testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke")
333
333
+
prevDataCID, _ := cid.Decode("bafyreie5737gdxlw5i64mnsc7x35mha5ee4w5vqnivxceag4pfd2mhqtsu")
334
334
+
335
335
+
event := &RepoCommitEvent{
336
336
+
Seq: 1,
337
337
+
Repo: "did:web:hold.example.com",
338
338
+
Commit: testCID.String(),
339
339
+
Rev: "test-rev",
340
340
+
PrevData: prevDataCID.String(),
341
341
+
Blocks: []byte("test data"),
342
342
+
Ops: []*atproto.SyncSubscribeRepos_RepoOp{},
343
343
+
Time: time.Now().Format(time.RFC3339),
344
344
+
Type: "#commit",
345
345
+
}
346
346
+
347
347
+
indigoCommit := convertToIndigoCommit(event)
348
348
+
349
349
+
// PrevData should be set
350
350
+
if indigoCommit.PrevData == nil {
351
351
+
t.Fatal("Expected non-nil PrevData on indigo commit")
352
352
+
}
353
353
+
354
354
+
if cid.Cid(*indigoCommit.PrevData) != prevDataCID {
355
355
+
t.Errorf("Expected PrevData CID=%s, got %s", prevDataCID.String(), cid.Cid(*indigoCommit.PrevData).String())
356
356
+
}
357
357
+
}
358
358
+
359
359
+
// TestConvertToIndigoCommit_NoPrevData tests that empty prevData results in nil PrevData
360
360
+
func TestConvertToIndigoCommit_NoPrevData(t *testing.T) {
361
361
+
testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke")
362
362
+
363
363
+
event := &RepoCommitEvent{
364
364
+
Seq: 1,
365
365
+
Repo: "did:web:hold.example.com",
366
366
+
Commit: testCID.String(),
367
367
+
Rev: "test-rev",
368
368
+
Blocks: []byte("test data"),
369
369
+
Ops: []*atproto.SyncSubscribeRepos_RepoOp{},
370
370
+
Time: time.Now().Format(time.RFC3339),
371
371
+
Type: "#commit",
372
372
+
}
373
373
+
374
374
+
indigoCommit := convertToIndigoCommit(event)
375
375
+
376
376
+
// PrevData should be nil when not set
377
377
+
if indigoCommit.PrevData != nil {
378
378
+
t.Errorf("Expected nil PrevData when not set, got %v", indigoCommit.PrevData)
315
379
}
316
380
}
317
381
+8
pkg/hold/pds/import.go
reviewed
···
120
120
return err
121
121
}
122
122
123
123
+
// Capture previous MST root before commit overwrites it
124
124
+
var prevData *cid.Cid
125
125
+
if head.Defined() {
126
126
+
pd := r.DataCid()
127
127
+
prevData = &pd
128
128
+
}
129
129
+
123
130
ops := make([]RepoOp, 0, len(records))
124
131
for _, rec := range records {
125
132
rpath := rec.Collection + "/" + rec.Rkey
···
169
176
User: p.uid,
170
177
OldRoot: oldroot,
171
178
NewRoot: nroot,
179
179
+
PrevData: prevData,
172
180
Rev: nrev,
173
181
Since: &rev,
174
182
Ops: ops,
+91
-10
pkg/hold/pds/repomgr.go
reviewed
···
13
13
// - Added PutRecord method (lines 309-381) for creating records with explicit rkeys
14
14
// (like CreateRecord but with specified rkey instead of auto-generated TID)
15
15
// Based on streamplace/indigo implementation
16
16
+
// - Added prevData to support Sync 1.1
16
17
package pds
17
18
18
19
import (
···
102
103
User models.Uid
103
104
OldRoot *cid.Cid
104
105
NewRoot cid.Cid
106
106
+
PrevData *cid.Cid // MST root CID of the previous commit (for firehose prevData field)
105
107
Since *string
106
108
Rev string
107
109
RepoSlice []byte
···
196
198
return "", cid.Undef, err
197
199
}
198
200
201
201
+
// Capture previous MST root before commit overwrites it
202
202
+
var prevData *cid.Cid
203
203
+
if head.Defined() {
204
204
+
pd := r.DataCid()
205
205
+
prevData = &pd
206
206
+
}
207
207
+
199
208
cc, tid, err := r.CreateRecord(ctx, collection, rec)
200
209
if err != nil {
201
210
return "", cid.Undef, err
···
218
227
219
228
if rm.events != nil {
220
229
rm.events(ctx, &RepoEvent{
221
221
-
User: user,
222
222
-
OldRoot: oldroot,
223
223
-
NewRoot: nroot,
224
224
-
Rev: nrev,
225
225
-
Since: &rev,
230
230
+
User: user,
231
231
+
OldRoot: oldroot,
232
232
+
NewRoot: nroot,
233
233
+
PrevData: prevData,
234
234
+
Rev: nrev,
235
235
+
Since: &rev,
226
236
Ops: []RepoOp{{
227
237
Kind: EvtKindCreateRecord,
228
238
Collection: collection,
···
260
270
return cid.Undef, err
261
271
}
262
272
273
273
+
// Capture previous MST root before commit overwrites it
274
274
+
var prevData *cid.Cid
275
275
+
if head.Defined() {
276
276
+
pd := r.DataCid()
277
277
+
prevData = &pd
278
278
+
}
279
279
+
263
280
rpath := collection + "/" + rkey
264
281
cc, err := r.UpdateRecord(ctx, rpath, rec)
265
282
if err != nil {
···
297
314
User: user,
298
315
OldRoot: oldroot,
299
316
NewRoot: nroot,
317
317
+
PrevData: prevData,
300
318
Rev: nrev,
301
319
Since: &rev,
302
320
Ops: []RepoOp{op},
···
334
352
return "", cid.Undef, err
335
353
}
336
354
355
355
+
// Capture previous MST root before commit overwrites it
356
356
+
var prevData *cid.Cid
357
357
+
if head.Defined() {
358
358
+
pd := r.DataCid()
359
359
+
prevData = &pd
360
360
+
}
361
361
+
337
362
rpath := collection + "/" + rkey
338
363
cc, err := r.PutRecord(ctx, rpath, rec)
339
364
if err != nil {
···
371
396
User: user,
372
397
OldRoot: oldroot,
373
398
NewRoot: nroot,
399
399
+
PrevData: prevData,
374
400
Rev: nrev,
375
401
Since: &rev,
376
402
Ops: []RepoOp{op},
···
407
433
return "", cid.Undef, false, err
408
434
}
409
435
436
436
+
// Capture previous MST root before commit overwrites it
437
437
+
var prevData *cid.Cid
438
438
+
if head.Defined() {
439
439
+
pd := r.DataCid()
440
440
+
prevData = &pd
441
441
+
}
442
442
+
410
443
rpath := collection + "/" + rkey
411
444
412
445
// Check if record exists
···
459
492
User: user,
460
493
OldRoot: oldroot,
461
494
NewRoot: nroot,
495
495
+
PrevData: prevData,
462
496
Rev: nrev,
463
497
Since: &rev,
464
498
Ops: []RepoOp{op},
···
490
524
r, err := repo.OpenRepo(ctx, ds, head)
491
525
if err != nil {
492
526
return err
527
527
+
}
528
528
+
529
529
+
// Capture previous MST root before commit overwrites it
530
530
+
var prevData *cid.Cid
531
531
+
if head.Defined() {
532
532
+
pd := r.DataCid()
533
533
+
prevData = &pd
493
534
}
494
535
495
536
rpath := collection + "/" + rkey
···
514
555
515
556
if rm.events != nil {
516
557
rm.events(ctx, &RepoEvent{
517
517
-
User: user,
518
518
-
OldRoot: oldroot,
519
519
-
NewRoot: nroot,
520
520
-
Rev: nrev,
521
521
-
Since: &rev,
558
558
+
User: user,
559
559
+
OldRoot: oldroot,
560
560
+
NewRoot: nroot,
561
561
+
PrevData: prevData,
562
562
+
Rev: nrev,
563
563
+
Since: &rev,
522
564
Ops: []RepoOp{{
523
565
Kind: EvtKindDeleteRecord,
524
566
Collection: collection,
···
750
792
return fmt.Errorf("check repo sig: %w", err)
751
793
}
752
794
795
795
+
// Capture previous MST root from old repo state if it exists
796
796
+
var prevData *cid.Cid
797
797
+
if ds.BaseCid().Defined() {
798
798
+
oldrepo, err := repo.OpenRepo(ctx, ds, ds.BaseCid())
799
799
+
if err == nil {
800
800
+
pd := oldrepo.DataCid()
801
801
+
prevData = &pd
802
802
+
}
803
803
+
}
804
804
+
753
805
evtops := make([]RepoOp, 0, len(ops))
754
806
for _, op := range ops {
755
807
parts := strings.SplitN(op.Path, "/", 2)
···
809
861
User: uid,
810
862
//OldRoot: prev,
811
863
NewRoot: root,
864
864
+
PrevData: prevData,
812
865
Rev: nrev,
813
866
Since: since,
814
867
Ops: evtops,
···
846
899
}
847
900
848
901
var skipcids map[cid.Cid]bool
902
902
+
var prevData *cid.Cid
849
903
if ds.BaseCid().Defined() {
850
904
oldrepo, err := repo.OpenRepo(ctx, ds, ds.BaseCid())
851
905
if err != nil {
852
906
return fmt.Errorf("failed to check data root in old repo: %w", err)
853
907
}
854
908
909
909
+
// Capture previous MST root for prevData
910
910
+
pd := oldrepo.DataCid()
911
911
+
prevData = &pd
912
912
+
855
913
// if the old commit has a 'prev', CalcDiff will error out while trying
856
914
// to walk it. This is an old repo thing that is being deprecated.
857
915
// This check is a temporary workaround until all repos get migrated
···
933
991
User: uid,
934
992
//OldRoot: prev,
935
993
NewRoot: root,
994
994
+
PrevData: prevData,
936
995
Rev: nrev,
937
996
Since: since,
938
997
Ops: evtops,
···
967
1026
return err
968
1027
}
969
1028
1029
1029
+
// Capture previous MST root before commit overwrites it
1030
1030
+
var prevData *cid.Cid
1031
1031
+
if head.Defined() {
1032
1032
+
pd := r.DataCid()
1033
1033
+
prevData = &pd
1034
1034
+
}
1035
1035
+
970
1036
ops := make([]RepoOp, 0, len(writes))
971
1037
for _, w := range writes {
972
1038
switch {
···
1054
1120
User: user,
1055
1121
OldRoot: oldroot,
1056
1122
NewRoot: nroot,
1123
1123
+
PrevData: prevData,
1057
1124
RepoSlice: rslice,
1058
1125
Rev: nrev,
1059
1126
Since: &rev,
···
1095
1162
return fmt.Errorf("ImportNewRepo called with incorrect base")
1096
1163
}
1097
1164
1165
1165
+
// Capture previous MST root before import overwrites it
1166
1166
+
var prevData *cid.Cid
1167
1167
+
if curhead.Defined() {
1168
1168
+
robs, err := rm.cs.ReadOnlySession(user)
1169
1169
+
if err == nil {
1170
1170
+
oldrepo, err := repo.OpenRepo(ctx, robs, curhead)
1171
1171
+
if err == nil {
1172
1172
+
pd := oldrepo.DataCid()
1173
1173
+
prevData = &pd
1174
1174
+
}
1175
1175
+
}
1176
1176
+
}
1177
1177
+
1098
1178
err = rm.processNewRepo(ctx, user, r, rev, func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error {
1099
1179
r, err := repo.OpenRepo(ctx, bs, root)
1100
1180
if err != nil {
···
1139
1219
User: user,
1140
1220
//OldRoot: oldroot,
1141
1221
NewRoot: root,
1222
1222
+
PrevData: prevData,
1142
1223
Rev: scom.Rev,
1143
1224
Since: &currev,
1144
1225
RepoSlice: slice,