tangled
alpha
login
or
join now
atscan.net
/
plcbundle
14
fork
atom
A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
14
fork
atom
overview
issues
2
pulls
pipelines
non-verbose start
tree.fail
4 months ago
e87aec3e
34dfc443
1/1
build.yml
success
4mo ago
+56
-44
6 changed files
expand all
collapse all
unified
split
bundle
bundle_test.go
manager.go
types.go
cmd
plcbundle
commands
common.go
internal
mempool
mempool.go
mempool_test.go
+6
-6
bundle/bundle_test.go
···
201
201
202
202
t.Run("CreateAndAdd", func(t *testing.T) {
203
203
minTime := time.Now().Add(-time.Hour)
204
204
-
m, err := mempool.NewMempool(tmpDir, 1, minTime, logger)
204
204
+
m, err := mempool.NewMempool(tmpDir, 1, minTime, logger, false)
205
205
if err != nil {
206
206
t.Fatalf("NewMempool failed: %v", err)
207
207
}
···
221
221
222
222
t.Run("ChronologicalValidation", func(t *testing.T) {
223
223
minTime := time.Now().Add(-time.Hour)
224
224
-
m, err := mempool.NewMempool(tmpDir, 2, minTime, logger)
224
224
+
m, err := mempool.NewMempool(tmpDir, 2, minTime, logger, false)
225
225
if err != nil {
226
226
t.Fatalf("NewMempool failed: %v", err)
227
227
}
···
249
249
250
250
t.Run("TakeOperations", func(t *testing.T) {
251
251
minTime := time.Now().Add(-time.Hour)
252
252
-
m, err := mempool.NewMempool(tmpDir, 3, minTime, logger)
252
252
+
m, err := mempool.NewMempool(tmpDir, 3, minTime, logger, false)
253
253
if err != nil {
254
254
t.Fatalf("NewMempool failed: %v", err)
255
255
}
···
271
271
272
272
t.Run("SaveAndLoad", func(t *testing.T) {
273
273
minTime := time.Now().Add(-time.Hour)
274
274
-
m, err := mempool.NewMempool(tmpDir, 4, minTime, logger)
274
274
+
m, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false)
275
275
if err != nil {
276
276
t.Fatalf("NewMempool failed: %v", err)
277
277
}
···
284
284
}
285
285
286
286
// Create new mempool and load
287
287
-
m2, err := mempool.NewMempool(tmpDir, 4, minTime, logger)
287
287
+
m2, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false)
288
288
if err != nil {
289
289
t.Fatalf("NewMempool failed: %v", err)
290
290
}
···
296
296
297
297
t.Run("Validate", func(t *testing.T) {
298
298
minTime := time.Now().Add(-time.Hour)
299
299
-
m, err := mempool.NewMempool(tmpDir, 5, minTime, logger)
299
299
+
m, err := mempool.NewMempool(tmpDir, 5, minTime, logger, false)
300
300
if err != nil {
301
301
t.Fatalf("NewMempool failed: %v", err)
302
302
}
+5
-4
bundle/manager.go
···
150
150
)
151
151
}
152
152
153
153
-
config.Logger.Printf("Loaded index with %d bundles (origin: %s)", index.Count(), index.Origin)
153
153
+
if config.Verbose {
154
154
+
config.Logger.Printf("Loaded index with %d bundles (origin: %s)", index.Count(), index.Origin)
155
155
+
}
154
156
155
157
// Check if there are bundle files not in the index
156
158
if hasBundleFiles && len(bundleFiles) > index.Count() {
···
283
285
minTimestamp = lastBundle.EndTime
284
286
}
285
287
286
286
-
mempool, err := mempool.NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger)
288
288
+
mempool, err := mempool.NewMempool(config.BundleDir, nextBundleNum, minTimestamp, config.Logger, config.Verbose)
287
289
if err != nil {
288
290
return nil, fmt.Errorf("failed to initialize mempool: %w", err)
289
291
}
···
299
301
var handleResolver *handleresolver.Client
300
302
if config.HandleResolverURL != "" {
301
303
handleResolver = handleresolver.NewClient(config.HandleResolverURL)
302
302
-
config.Logger.Printf("Handle resolver configured: %s", config.HandleResolverURL)
303
304
}
304
305
305
306
return &Manager{
···
487
488
nextBundle := bundle.BundleNumber + 1
488
489
minTimestamp := bundle.EndTime
489
490
490
490
-
newMempool, err := mempool.NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger)
491
491
+
newMempool, err := mempool.NewMempool(m.config.BundleDir, nextBundle, minTimestamp, m.logger, m.config.Verbose)
491
492
if err != nil {
492
493
return 0, fmt.Errorf("failed to create new mempool: %w", err)
493
494
}
+2
bundle/types.go
···
127
127
RebuildWorkers int // Number of workers for parallel rebuild (0 = auto-detect)
128
128
RebuildProgress func(current, total int) // Progress callback for rebuild
129
129
Logger types.Logger
130
130
+
Verbose bool
130
131
}
131
132
132
133
// DefaultConfig returns default configuration
···
140
141
RebuildWorkers: 0, // 0 means auto-detect CPU count
141
142
RebuildProgress: nil, // No progress callback by default
142
143
Logger: nil,
144
144
+
Verbose: false,
143
145
}
144
146
}
145
147
+7
cmd/plcbundle/commands/common.go
···
116
116
config := bundle.DefaultConfig(absDir)
117
117
config.AutoInit = opts.AutoInit
118
118
119
119
+
// Set verbose from command if available
120
120
+
if opts.Cmd != nil {
121
121
+
if verbose, err := opts.Cmd.Root().PersistentFlags().GetBool("verbose"); err == nil {
122
122
+
config.Verbose = verbose
123
123
+
}
124
124
+
}
125
125
+
119
126
// Create PLC client if URL provided
120
127
var client *plcclient.Client
121
128
if opts.PLCURL != "" {
+4
-2
internal/mempool/mempool.go
···
27
27
logger types.Logger
28
28
validated bool
29
29
dirty bool
30
30
+
verbose bool
30
31
31
32
// Incremental save tracking
32
33
lastSavedLen int // How many ops are persisted
···
36
37
}
37
38
38
39
// NewMempool creates a new mempool for a specific bundle number
39
39
-
func NewMempool(bundleDir string, targetBundle int, minTimestamp time.Time, logger types.Logger) (*Mempool, error) {
40
40
+
func NewMempool(bundleDir string, targetBundle int, minTimestamp time.Time, logger types.Logger, verbose bool) (*Mempool, error) {
40
41
filename := fmt.Sprintf("%s%06d.jsonl", MEMPOOL_FILE_PREFIX, targetBundle)
41
42
42
43
m := &Mempool{
···
46
47
operations: make([]plcclient.PLCOperation, 0),
47
48
logger: logger,
48
49
validated: false,
50
50
+
verbose: verbose,
49
51
lastSavedLen: 0,
50
52
lastSaveTime: time.Now(),
51
53
saveThreshold: 100,
···
386
388
m.lastSaveTime = time.Now()
387
389
m.dirty = false
388
390
389
389
-
if len(m.operations) > 0 {
391
391
+
if m.verbose && len(m.operations) > 0 {
390
392
m.logger.Printf("Loaded %d operations from mempool for bundle %06d", len(m.operations), m.targetBundle)
391
393
}
392
394
+32
-32
internal/mempool/mempool_test.go
···
36
36
37
37
t.Run("RejectOutOfOrder", func(t *testing.T) {
38
38
minTime := baseTime
39
39
-
m, err := mempool.NewMempool(tmpDir, 1, minTime, logger)
39
39
+
m, err := mempool.NewMempool(tmpDir, 1, minTime, logger, false)
40
40
if err != nil {
41
41
t.Fatalf("NewMempool failed: %v", err)
42
42
}
···
70
70
71
71
t.Run("RejectBeforeMinTimestamp", func(t *testing.T) {
72
72
minTime := baseTime.Add(10 * time.Second)
73
73
-
m, err := mempool.NewMempool(tmpDir, 2, minTime, logger)
73
73
+
m, err := mempool.NewMempool(tmpDir, 2, minTime, logger, false)
74
74
if err != nil {
75
75
t.Fatalf("NewMempool failed: %v", err)
76
76
}
···
88
88
89
89
t.Run("AllowEqualTimestamps", func(t *testing.T) {
90
90
minTime := baseTime
91
91
-
m, err := mempool.NewMempool(tmpDir, 3, minTime, logger)
91
91
+
m, err := mempool.NewMempool(tmpDir, 3, minTime, logger, false)
92
92
if err != nil {
93
93
t.Fatalf("NewMempool failed: %v", err)
94
94
}
···
113
113
114
114
t.Run("ChronologicalAfterReload", func(t *testing.T) {
115
115
minTime := baseTime
116
116
-
m, err := mempool.NewMempool(tmpDir, 4, minTime, logger)
116
116
+
m, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false)
117
117
if err != nil {
118
118
t.Fatalf("NewMempool failed: %v", err)
119
119
}
···
127
127
m.Save()
128
128
129
129
// Reload mempool
130
130
-
m2, err := mempool.NewMempool(tmpDir, 4, minTime, logger)
130
130
+
m2, err := mempool.NewMempool(tmpDir, 4, minTime, logger, false)
131
131
if err != nil {
132
132
t.Fatalf("NewMempool reload failed: %v", err)
133
133
}
···
159
159
160
160
t.Run("StrictIncreasingOrder", func(t *testing.T) {
161
161
minTime := baseTime
162
162
-
m, err := mempool.NewMempool(tmpDir, 5, minTime, logger)
162
162
+
m, err := mempool.NewMempool(tmpDir, 5, minTime, logger, false)
163
163
if err != nil {
164
164
t.Fatalf("NewMempool failed: %v", err)
165
165
}
···
194
194
195
195
t.Run("SameCIDTwice", func(t *testing.T) {
196
196
minTime := baseTime
197
197
-
m, err := mempool.NewMempool(tmpDir, 6, minTime, logger)
197
197
+
m, err := mempool.NewMempool(tmpDir, 6, minTime, logger, false)
198
198
if err != nil {
199
199
t.Fatalf("NewMempool failed: %v", err)
200
200
}
···
230
230
231
231
t.Run("DuplicateAcrossSaveLoad", func(t *testing.T) {
232
232
minTime := baseTime
233
233
-
m, err := mempool.NewMempool(tmpDir, 7, minTime, logger)
233
233
+
m, err := mempool.NewMempool(tmpDir, 7, minTime, logger, false)
234
234
if err != nil {
235
235
t.Fatalf("NewMempool failed: %v", err)
236
236
}
···
246
246
m.Save()
247
247
248
248
// Reload
249
249
-
m2, err := mempool.NewMempool(tmpDir, 7, minTime, logger)
249
249
+
m2, err := mempool.NewMempool(tmpDir, 7, minTime, logger, false)
250
250
if err != nil {
251
251
t.Fatalf("reload failed: %v", err)
252
252
}
···
268
268
269
269
t.Run("DuplicatesInBatch", func(t *testing.T) {
270
270
minTime := baseTime
271
271
-
m, err := mempool.NewMempool(tmpDir, 8, minTime, logger)
271
271
+
m, err := mempool.NewMempool(tmpDir, 8, minTime, logger, false)
272
272
if err != nil {
273
273
t.Fatalf("NewMempool failed: %v", err)
274
274
}
···
307
307
308
308
t.Run("SaveAndLoad", func(t *testing.T) {
309
309
minTime := baseTime
310
310
-
m, err := mempool.NewMempool(tmpDir, 9, minTime, logger)
310
310
+
m, err := mempool.NewMempool(tmpDir, 9, minTime, logger, false)
311
311
if err != nil {
312
312
t.Fatalf("NewMempool failed: %v", err)
313
313
}
···
320
320
}
321
321
322
322
// Reload
323
323
-
m2, err := mempool.NewMempool(tmpDir, 9, minTime, logger)
323
323
+
m2, err := mempool.NewMempool(tmpDir, 9, minTime, logger, false)
324
324
if err != nil {
325
325
t.Fatalf("reload failed: %v", err)
326
326
}
···
341
341
// Fix the IncrementalSave test - line ~353
342
342
t.Run("IncrementalSave", func(t *testing.T) {
343
343
minTime := baseTime
344
344
-
m, err := mempool.NewMempool(tmpDir, 10, minTime, logger)
344
344
+
m, err := mempool.NewMempool(tmpDir, 10, minTime, logger, false)
345
345
if err != nil {
346
346
t.Fatalf("NewMempool failed: %v", err)
347
347
}
···
358
358
m.Save()
359
359
360
360
// Reload - should have all 20
361
361
-
m2, err := mempool.NewMempool(tmpDir, 10, minTime, logger)
361
361
+
m2, err := mempool.NewMempool(tmpDir, 10, minTime, logger, false)
362
362
if err != nil {
363
363
t.Fatalf("reload failed: %v", err)
364
364
}
···
376
376
os.WriteFile(mempoolFile, []byte("{invalid json\n{also bad"), 0644)
377
377
378
378
// Should error on load
379
379
-
_, err := mempool.NewMempool(tmpDir, 11, minTime, logger)
379
379
+
_, err := mempool.NewMempool(tmpDir, 11, minTime, logger, false)
380
380
if err == nil {
381
381
t.Error("expected error loading corrupted mempool")
382
382
}
···
384
384
385
385
t.Run("DeleteMempool", func(t *testing.T) {
386
386
minTime := baseTime
387
387
-
m, err := mempool.NewMempool(tmpDir, 12, minTime, logger)
387
387
+
m, err := mempool.NewMempool(tmpDir, 12, minTime, logger, false)
388
388
if err != nil {
389
389
t.Fatalf("NewMempool failed: %v", err)
390
390
}
···
422
422
423
423
t.Run("TakeExact", func(t *testing.T) {
424
424
minTime := baseTime
425
425
-
m, err := mempool.NewMempool(tmpDir, 13, minTime, logger)
425
425
+
m, err := mempool.NewMempool(tmpDir, 13, minTime, logger, false)
426
426
if err != nil {
427
427
t.Fatalf("NewMempool failed: %v", err)
428
428
}
···
445
445
446
446
t.Run("TakeMoreThanAvailable", func(t *testing.T) {
447
447
minTime := baseTime
448
448
-
m, err := mempool.NewMempool(tmpDir, 14, minTime, logger)
448
448
+
m, err := mempool.NewMempool(tmpDir, 14, minTime, logger, false)
449
449
if err != nil {
450
450
t.Fatalf("NewMempool failed: %v", err)
451
451
}
···
469
469
470
470
t.Run("TakePreservesOrder", func(t *testing.T) {
471
471
minTime := baseTime
472
472
-
m, err := mempool.NewMempool(tmpDir, 15, minTime, logger)
472
472
+
m, err := mempool.NewMempool(tmpDir, 15, minTime, logger, false)
473
473
if err != nil {
474
474
t.Fatalf("NewMempool failed: %v", err)
475
475
}
···
500
500
501
501
t.Run("TakeFromEmpty", func(t *testing.T) {
502
502
minTime := baseTime
503
503
-
m, err := mempool.NewMempool(tmpDir, 16, minTime, logger)
503
503
+
m, err := mempool.NewMempool(tmpDir, 16, minTime, logger, false)
504
504
if err != nil {
505
505
t.Fatalf("NewMempool failed: %v", err)
506
506
}
···
527
527
528
528
t.Run("ValidateChronological", func(t *testing.T) {
529
529
minTime := baseTime
530
530
-
m, err := mempool.NewMempool(tmpDir, 17, minTime, logger)
530
530
+
m, err := mempool.NewMempool(tmpDir, 17, minTime, logger, false)
531
531
if err != nil {
532
532
t.Fatalf("NewMempool failed: %v", err)
533
533
}
···
542
542
543
543
t.Run("ValidateDetectsMinTimestampViolation", func(t *testing.T) {
544
544
minTime := baseTime.Add(10 * time.Second)
545
545
-
_, err := mempool.NewMempool(tmpDir, 18, minTime, logger)
545
545
+
_, err := mempool.NewMempool(tmpDir, 18, minTime, logger, false)
546
546
if err != nil {
547
547
t.Fatalf("NewMempool failed: %v", err)
548
548
}
···
574
574
575
575
t.Run("ConcurrentReads", func(t *testing.T) {
576
576
minTime := baseTime
577
577
-
m, err := mempool.NewMempool(tmpDir, 19, minTime, logger)
577
577
+
m, err := mempool.NewMempool(tmpDir, 19, minTime, logger, false)
578
578
if err != nil {
579
579
t.Fatalf("NewMempool failed: %v", err)
580
580
}
···
603
603
604
604
t.Run("ConcurrentAddAndRead", func(t *testing.T) {
605
605
minTime := baseTime
606
606
-
m, err := mempool.NewMempool(tmpDir, 20, minTime, logger)
606
606
+
m, err := mempool.NewMempool(tmpDir, 20, minTime, logger, false)
607
607
if err != nil {
608
608
t.Fatalf("NewMempool failed: %v", err)
609
609
}
···
659
659
660
660
t.Run("StatsEmpty", func(t *testing.T) {
661
661
minTime := baseTime
662
662
-
m, err := mempool.NewMempool(tmpDir, 21, minTime, logger)
662
662
+
m, err := mempool.NewMempool(tmpDir, 21, minTime, logger, false)
663
663
if err != nil {
664
664
t.Fatalf("NewMempool failed: %v", err)
665
665
}
···
681
681
682
682
t.Run("StatsPopulated", func(t *testing.T) {
683
683
minTime := baseTime
684
684
-
m, err := mempool.NewMempool(tmpDir, 22, minTime, logger)
684
684
+
m, err := mempool.NewMempool(tmpDir, 22, minTime, logger, false)
685
685
if err != nil {
686
686
t.Fatalf("NewMempool failed: %v", err)
687
687
}
···
714
714
715
715
t.Run("StatsCanCreateBundle", func(t *testing.T) {
716
716
minTime := baseTime
717
717
-
m, err := mempool.NewMempool(tmpDir, 23, minTime, logger)
717
717
+
m, err := mempool.NewMempool(tmpDir, 23, minTime, logger, false)
718
718
if err != nil {
719
719
t.Fatalf("NewMempool failed: %v", err)
720
720
}
···
741
741
742
742
t.Run("FindDIDOperations", func(t *testing.T) {
743
743
minTime := baseTime
744
744
-
m, err := mempool.NewMempool(tmpDir, 24, minTime, logger)
744
744
+
m, err := mempool.NewMempool(tmpDir, 24, minTime, logger, false)
745
745
if err != nil {
746
746
t.Fatalf("NewMempool failed: %v", err)
747
747
}
···
772
772
773
773
t.Run("FindLatestDIDOperation", func(t *testing.T) {
774
774
minTime := baseTime
775
775
-
m, err := mempool.NewMempool(tmpDir, 25, minTime, logger)
775
775
+
m, err := mempool.NewMempool(tmpDir, 25, minTime, logger, false)
776
776
if err != nil {
777
777
t.Fatalf("NewMempool failed: %v", err)
778
778
}
···
801
801
802
802
t.Run("FindLatestDIDOperation_AllNullified", func(t *testing.T) {
803
803
minTime := baseTime
804
804
-
m, err := mempool.NewMempool(tmpDir, 26, minTime, logger)
804
804
+
m, err := mempool.NewMempool(tmpDir, 26, minTime, logger, false)
805
805
if err != nil {
806
806
t.Fatalf("NewMempool failed: %v", err)
807
807
}
···
824
824
825
825
t.Run("FindDIDOperations_NotFound", func(t *testing.T) {
826
826
minTime := baseTime
827
827
-
m, err := mempool.NewMempool(tmpDir, 27, minTime, logger)
827
827
+
m, err := mempool.NewMempool(tmpDir, 27, minTime, logger, false)
828
828
if err != nil {
829
829
t.Fatalf("NewMempool failed: %v", err)
830
830
}
···
850
850
851
851
t.Run("ClearPopulated", func(t *testing.T) {
852
852
minTime := baseTime
853
853
-
m, err := mempool.NewMempool(tmpDir, 28, minTime, logger)
853
853
+
m, err := mempool.NewMempool(tmpDir, 28, minTime, logger, false)
854
854
if err != nil {
855
855
t.Fatalf("NewMempool failed: %v", err)
856
856
}