A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

better quiet mode

+118 -65
+1 -1
bundle.go
··· 49 49 50 50 // FetchNextBundle fetches the next bundle from PLC directory 51 51 func (m *Manager) FetchNextBundle(ctx context.Context) (*Bundle, error) { 52 - b, err := m.internal.FetchNextBundle(ctx, false) 52 + b, _, err := m.internal.FetchNextBundle(ctx, false, false) 53 53 if err != nil { 54 54 return nil, err 55 55 }
+75 -37
bundle/manager.go
··· 435 435 } 436 436 437 437 // SaveBundle saves a bundle to disk and updates the index 438 - func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, quiet bool) (time.Duration, error) { 438 + func (m *Manager) SaveBundle(ctx context.Context, bundle *Bundle, verbose bool, quiet bool, stats types.BundleProductionStats) (time.Duration, error) { 439 + 440 + totalStart := time.Now() 439 441 if err := bundle.ValidateForSave(); err != nil { 440 442 return 0, fmt.Errorf("bundle validation failed: %w", err) 441 443 } ··· 481 483 Hostname: hostname, 482 484 } 483 485 484 - m.logger.Printf("DEBUG: Calling operations.SaveBundle with bundle=%d", bundleInfo.BundleNumber) 486 + if m.config.Verbose { 487 + m.logger.Printf("DEBUG: Calling operations.SaveBundle with bundle=%d", bundleInfo.BundleNumber) 488 + } 485 489 486 490 // Save to disk with 3 parameters 487 491 uncompressedHash, compressedHash, uncompressedSize, compressedSize, err := m.operations.SaveBundle(path, bundle.Operations, bundleInfo) ··· 490 494 return 0, fmt.Errorf("failed to save bundle: %w", err) 491 495 } 492 496 493 - m.logger.Printf("DEBUG: SaveBundle SUCCESS, setting bundle fields") 497 + if m.config.Verbose { 498 + m.logger.Printf("DEBUG: SaveBundle SUCCESS, setting bundle fields") 499 + } 494 500 501 + bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash) 495 502 bundle.ContentHash = uncompressedHash 496 503 bundle.CompressedHash = compressedHash 497 504 bundle.UncompressedSize = uncompressedSize ··· 499 506 bundle.CreatedAt = time.Now().UTC() 500 507 bundle.Hash = m.operations.CalculateChainHash(parent, bundle.ContentHash) 501 508 502 - m.logger.Printf("DEBUG: Adding bundle %d to index", bundle.BundleNumber) 509 + if m.config.Verbose { 510 + m.logger.Printf("DEBUG: Adding bundle %d to index", bundle.BundleNumber) 511 + } 503 512 504 513 // Add to index 505 514 m.index.AddBundle(bundle.ToMetadata()) 506 515 507 - m.logger.Printf("DEBUG: Index now has %d bundles", m.index.Count()) 516 + if m.config.Verbose { 517 + m.logger.Printf("DEBUG: Index now has %d bundles", m.index.Count()) 518 + } 508 519 509 520 // Save index 510 521 if err := m.SaveIndex(); err != nil { ··· 512 523 return 0, fmt.Errorf("failed to save index: %w", err) 513 524 } 514 525 515 - m.logger.Printf("DEBUG: Index saved, last bundle = %d", m.index.GetLastBundle().BundleNumber) 526 + if m.config.Verbose { 527 + m.logger.Printf("DEBUG: Index saved, last bundle = %d", m.index.GetLastBundle().BundleNumber) 528 + } 529 + 530 + saveDuration := time.Since(totalStart) 516 531 517 532 // Clean up old mempool 518 533 oldMempoolFile := m.mempool.GetFilename() ··· 539 554 m.logger.Printf("Warning: failed to update DID index: %v", err) 540 555 } else { 541 556 indexUpdateDuration = time.Since(indexUpdateStart) 542 - if !quiet { 557 + if !quiet && m.config.Verbose { 543 558 m.logger.Printf(" [DID Index] Updated in %s", indexUpdateDuration) 544 559 } 545 560 } 561 + } 562 + 563 + if !quiet { 564 + msg := fmt.Sprintf("→ Bundle %06d | %s | time: %s (%d reqs)", 565 + bundle.BundleNumber, 566 + bundle.Hash[0:7], 567 + stats.TotalDuration.Round(time.Millisecond), 568 + stats.TotalFetches, 569 + ) 570 + if indexUpdateDuration > 0 { 571 + msg += fmt.Sprintf(" | index: %s", indexUpdateDuration.Round(time.Millisecond)) 572 + } 573 + m.logger.Println(msg) 574 + } 575 + 576 + if m.config.Verbose { 577 + m.logger.Printf("DEBUG: Bundle done = %d, finish duration = %s", 578 + m.index.GetLastBundle().BundleNumber, 579 + saveDuration.Round(time.Millisecond)) 546 580 } 547 581 548 582 return indexUpdateDuration, nil ··· 1163 1197 } 1164 1198 1165 1199 // FetchNextBundle fetches operations and creates a bundle, looping until caught up 1166 - func (m *Manager) FetchNextBundle(ctx context.Context, quiet bool) (*Bundle, error) { 1200 + func (m *Manager) FetchNextBundle(ctx context.Context, verbose bool, quiet bool) (*Bundle, types.BundleProductionStats, error) { 1167 1201 if m.plcClient == nil { 1168 - return nil, fmt.Errorf("PLC client not configured") 1202 + return nil, types.BundleProductionStats{}, fmt.Errorf("PLC client not configured") 1169 1203 } 1170 1204 1171 1205 lastBundle := m.index.GetLastBundle() ··· 1183 1217 prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber) 1184 1218 if err == nil { 1185 1219 _, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations) 1186 - if !quiet { 1220 + if verbose { 1187 1221 m.logger.Printf("Loaded %d boundary CIDs from bundle %06d (at %s)", 1188 1222 len(prevBoundaryCIDs), lastBundle.BundleNumber, 1189 1223 lastBundle.EndTime.Format(time.RFC3339)[:19]) ··· 1196 1230 if m.mempool.Count() > 0 { 1197 1231 mempoolLastTime := m.mempool.GetLastTime() 1198 1232 if mempoolLastTime != "" { 1199 - if !quiet { 1200 - m.logger.Printf("Mempool has %d ops, resuming from %s", 1233 + if verbose { 1234 + m.logger.Printf("[DEBUG] Mempool has %d ops, resuming from %s", 1201 1235 m.mempool.Count(), mempoolLastTime[:19]) 1202 1236 } 1203 1237 afterTime = mempoolLastTime ··· 1207 1241 if len(mempoolOps) > 0 { 1208 1242 _, mempoolBoundaries := m.operations.GetBoundaryCIDs(mempoolOps) 1209 1243 prevBoundaryCIDs = mempoolBoundaries 1210 - if !quiet { 1244 + if verbose { 1211 1245 m.logger.Printf("Using %d boundary CIDs from mempool", len(prevBoundaryCIDs)) 1212 1246 } 1213 1247 } 1214 1248 } 1215 1249 } 1216 1250 1217 - if !quiet { 1218 - m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count()) 1219 - m.logger.Printf("Starting cursor: %s", afterTime) 1251 + if verbose { 1252 + m.logger.Printf("[DEBUG] Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count()) 1253 + m.logger.Printf("[DEBUG] Starting cursor: %s", afterTime) 1220 1254 } 1221 1255 1222 1256 totalFetches := 0 ··· 1239 1273 afterTime, 1240 1274 prevBoundaryCIDs, 1241 1275 needed, 1242 - quiet, 1276 + !verbose, 1243 1277 m.mempool, 1244 1278 totalFetches, 1245 1279 ) ··· 1257 1291 // Stop if caught up or error 1258 1292 if err != nil || len(newOps) == 0 || gotIncompleteBatch { 1259 1293 caughtUp = true 1260 - if !quiet && totalFetches > 0 { 1261 - m.logger.Printf(" Caught up to latest PLC data") 1294 + if verbose && totalFetches > 0 { 1295 + m.logger.Printf("DEBUG: Caught up to latest PLC data") 1262 1296 } 1263 1297 break 1264 1298 } ··· 1272 1306 1273 1307 if m.mempool.Count() < types.BUNDLE_SIZE { 1274 1308 if caughtUp { 1275 - return nil, fmt.Errorf("insufficient operations: have %d, need %d (caught up to latest PLC data)", 1309 + return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (caught up to latest PLC data)", 1276 1310 m.mempool.Count(), types.BUNDLE_SIZE) 1277 1311 } else { 1278 - return nil, fmt.Errorf("insufficient operations: have %d, need %d (max attempts reached)", 1312 + return nil, types.BundleProductionStats{}, fmt.Errorf("insufficient operations: have %d, need %d (max attempts reached)", 1279 1313 m.mempool.Count(), types.BUNDLE_SIZE) 1280 1314 } 1281 1315 } ··· 1283 1317 // Create bundle 1284 1318 operations, err := m.mempool.Take(types.BUNDLE_SIZE) 1285 1319 if err != nil { 1286 - return nil, err 1320 + return nil, types.BundleProductionStats{}, err 1287 1321 } 1288 1322 1289 1323 syncBundle := internalsync.CreateBundle(nextBundleNum, operations, afterTime, prevBundleHash, m.operations) ··· 1301 1335 CreatedAt: syncBundle.CreatedAt, 1302 1336 } 1303 1337 1304 - if !quiet { 1305 - avgPerFetch := float64(types.BUNDLE_SIZE) / float64(totalFetches) 1306 - throughput := float64(types.BUNDLE_SIZE) / totalDuration.Seconds() 1307 - m.logger.Printf("✓ Bundle %06d ready (%d ops, %d DIDs) - %d fetches in %s (avg %.0f/fetch, %.0f ops/sec)", 1308 - bundle.BundleNumber, len(bundle.Operations), bundle.DIDCount, 1309 - totalFetches, totalDuration.Round(time.Millisecond), avgPerFetch, throughput) 1338 + stats := types.BundleProductionStats{ 1339 + TotalFetches: totalFetches, 1340 + TotalDuration: totalDuration, 1341 + AvgPerFetch: float64(types.BUNDLE_SIZE) / float64(totalFetches), 1342 + Throughput: float64(types.BUNDLE_SIZE) / totalDuration.Seconds(), 1310 1343 } 1311 1344 1312 - return bundle, nil 1345 + return bundle, stats, nil 1313 1346 } 1314 1347 1315 1348 // CloneFromRemote clones bundles from a remote endpoint ··· 1443 1476 } 1444 1477 1445 1478 // FetchAndSaveNextBundle fetches and saves next bundle, returns bundle number and index time 1446 - func (m *Manager) FetchAndSaveNextBundle(ctx context.Context, quiet bool) (int, time.Duration, error) { 1447 - bundle, err := m.FetchNextBundle(ctx, quiet) 1479 + func (m *Manager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool) (int, *types.BundleProductionStats, error) { 1480 + bundle, stats, err := m.FetchNextBundle(ctx, verbose, quiet) 1448 1481 if err != nil { 1449 - return 0, 0, err 1482 + return 0, nil, err 1450 1483 } 1451 1484 1452 - indexTime, err := m.SaveBundle(ctx, bundle, quiet) 1485 + indexTime, err := m.SaveBundle(ctx, bundle, verbose, quiet, stats) 1453 1486 if err != nil { 1454 - return 0, 0, err 1487 + return 0, nil, err 1455 1488 } 1489 + stats.IndexTime = indexTime 1456 1490 1457 - return bundle.BundleNumber, indexTime, nil 1491 + return bundle.BundleNumber, &types.BundleProductionStats{}, nil 1458 1492 } 1459 1493 1460 1494 // RunSyncLoop runs continuous sync loop (delegates to internal/sync) ··· 1577 1611 } 1578 1612 1579 1613 resolveStart := time.Now() 1580 - m.logger.Printf("Resolving handle: %s", input) 1614 + if !m.config.Quiet { 1615 + m.logger.Printf("Resolving handle: %s", input) 1616 + } 1581 1617 did, err := m.handleResolver.ResolveHandle(ctx, input) 1582 1618 resolveTime := time.Since(resolveStart) 1583 1619 ··· 1585 1621 return "", resolveTime, fmt.Errorf("failed to resolve handle '%s': %w", input, err) 1586 1622 } 1587 1623 1588 - m.logger.Printf("Resolved: %s → %s (in %s)", input, did, resolveTime) 1624 + if !m.config.Quiet { 1625 + m.logger.Printf("Resolved: %s → %s (in %s)", input, did, resolveTime) 1626 + } 1589 1627 return did, resolveTime, nil 1590 1628 } 1591 1629
+2
bundle/types.go
··· 128 128 RebuildProgress func(current, total int) // Progress callback for rebuild 129 129 Logger types.Logger 130 130 Verbose bool 131 + Quiet bool 131 132 } 132 133 133 134 // DefaultConfig returns default configuration ··· 142 143 RebuildProgress: nil, // No progress callback by default 143 144 Logger: nil, 144 145 Verbose: false, 146 + Quiet: false, 145 147 } 146 148 } 147 149
+6 -2
cmd/plcbundle/commands/common.go
··· 14 14 "tangled.org/atscan.net/plcbundle/internal/didindex" 15 15 "tangled.org/atscan.net/plcbundle/internal/plcclient" 16 16 internalsync "tangled.org/atscan.net/plcbundle/internal/sync" 17 + "tangled.org/atscan.net/plcbundle/internal/types" 17 18 ) 18 19 19 20 // BundleManager interface (for testing/mocking) ··· 29 30 ValidateMempool() error 30 31 RefreshMempool() error 31 32 ClearMempool() error 32 - FetchNextBundle(ctx context.Context, quiet bool) (*bundle.Bundle, error) 33 - SaveBundle(ctx context.Context, b *bundle.Bundle, quiet bool) (time.Duration, error) 33 + FetchNextBundle(ctx context.Context, verbose bool, quiet bool) (*bundle.Bundle, types.BundleProductionStats, error) 34 + SaveBundle(ctx context.Context, b *bundle.Bundle, verbose bool, quiet bool, stats types.BundleProductionStats) (time.Duration, error) 34 35 SaveIndex() error 35 36 GetDIDIndexStats() map[string]interface{} 36 37 GetDIDIndex() *didindex.Manager ··· 127 128 if opts.Cmd != nil { 128 129 globalVerbose, _ := opts.Cmd.Root().PersistentFlags().GetBool("verbose") 129 130 localVerbose, _ := opts.Cmd.Flags().GetBool("verbose") 131 + globalQuiet, _ := opts.Cmd.Root().PersistentFlags().GetBool("quiet") 132 + localQuiet, _ := opts.Cmd.Flags().GetBool("quiet") 130 133 131 134 // Use OR logic: verbose if EITHER flag is set 132 135 config.Verbose = globalVerbose || localVerbose 136 + config.Quiet = globalQuiet || localQuiet 133 137 } 134 138 135 139 // Create PLC client if URL provided
-10
cmd/plcbundle/commands/did.go
··· 215 215 return err 216 216 } 217 217 218 - // Show resolution timing if it was a handle 219 - if input != did { 220 - if verbose { 221 - fmt.Fprintf(os.Stderr, "Handle resolution: %s → %s (%s)\n", 222 - input, did, handleResolveTime) 223 - } else { 224 - fmt.Fprintf(os.Stderr, "Resolved handle '%s' → %s\n", input, did) 225 - } 226 - } 227 - 228 218 if verbose { 229 219 fmt.Fprintf(os.Stderr, "Resolving DID: %s\n", did) 230 220 mgr.GetDIDIndex().SetVerbose(true)
+16 -8
internal/didindex/builder.go
··· 244 244 } 245 245 246 246 groupDuration := time.Since(groupStart) 247 - dim.logger.Printf(" [DID Index] Grouped operations into %d shards in %s", 248 - len(shardOps), groupDuration) 247 + if dim.verbose { 248 + dim.logger.Printf(" [DID Index] Grouped operations into %d shards in %s", 249 + len(shardOps), groupDuration) 250 + } 249 251 250 252 // STEP 2: Write ALL shards to .tmp files FIRST (PARALLEL) 251 253 writeStart := time.Now() ··· 270 272 semaphore := make(chan struct{}, workers) 271 273 var wg sync.WaitGroup 272 274 273 - dim.logger.Printf(" [DID Index] Updating %d shards in parallel (%d workers)...", 274 - len(shardOps), workers) 275 + if dim.verbose { 276 + dim.logger.Printf(" [DID Index] Updating %d shards in parallel (%d workers)...", 277 + len(shardOps), workers) 278 + } 275 279 276 280 // Process each shard in parallel 277 281 for shardNum, newOps := range shardOps { ··· 316 320 close(errChan) 317 321 318 322 writeDuration := time.Since(writeStart) 319 - dim.logger.Printf(" [DID Index] Wrote %d temp files in %s (%.1f shards/sec)", 320 - len(tmpShards), writeDuration, float64(len(tmpShards))/writeDuration.Seconds()) 323 + if dim.verbose { 324 + dim.logger.Printf(" [DID Index] Wrote %d temp files in %s (%.1f shards/sec)", 325 + len(tmpShards), writeDuration, float64(len(tmpShards))/writeDuration.Seconds()) 326 + } 321 327 322 328 // Check for errors 323 329 if err := <-errChan; err != nil { ··· 354 360 totalDuration := time.Since(totalStart) 355 361 356 362 // Summary log 357 - dim.logger.Printf(" [DID Index] ✓ Bundle %06d indexed: +%d DIDs, %d shards updated in %s", 358 - bundle.BundleNumber, deltaCount, len(tmpShards), totalDuration) 363 + if dim.verbose { 364 + dim.logger.Printf(" [DID Index] ✓ Bundle %06d indexed: +%d DIDs, %d shards updated in %s", 365 + bundle.BundleNumber, deltaCount, len(tmpShards), totalDuration) 366 + } 359 367 360 368 if dim.verbose { 361 369 dim.logger.Printf(" Breakdown: group=%s write=%s commit=%s config=%s",
+4 -3
internal/sync/sync_test.go
··· 14 14 "tangled.org/atscan.net/plcbundle/internal/plcclient" 15 15 "tangled.org/atscan.net/plcbundle/internal/storage" 16 16 internalsync "tangled.org/atscan.net/plcbundle/internal/sync" 17 + "tangled.org/atscan.net/plcbundle/internal/types" 17 18 ) 18 19 19 20 type testLogger struct { ··· 689 690 return m.mempoolCount 690 691 } 691 692 692 - func (m *mockSyncManager) FetchAndSaveNextBundle(ctx context.Context, quiet bool) (int, time.Duration, error) { 693 + func (m *mockSyncManager) FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool) (int, *types.BundleProductionStats, error) { 693 694 m.mu.Lock() 694 695 defer m.mu.Unlock() 695 696 ··· 701 702 if m.mempoolCount >= 10000 { 702 703 m.lastBundle++ 703 704 m.mempoolCount -= 10000 704 - return m.lastBundle, 10 * time.Millisecond, nil 705 + return m.lastBundle, nil, nil 705 706 } 706 707 707 708 // Not enough ops 708 - return 0, 0, fmt.Errorf("insufficient operations") 709 + return 0, nil, fmt.Errorf("insufficient operations") 709 710 } 710 711 711 712 func (m *mockSyncManager) SaveMempool() error {
+3 -3
internal/sync/syncer.go
··· 31 31 GetLastBundleNumber() int 32 32 GetMempoolCount() int 33 33 // Returns: bundleNumber, indexUpdateTime, error 34 - FetchAndSaveNextBundle(ctx context.Context, quiet bool) (int, time.Duration, error) 34 + FetchAndSaveNextBundle(ctx context.Context, verbose bool, quiet bool) (int, *types.BundleProductionStats, error) 35 35 SaveMempool() error 36 36 } 37 37 ··· 50 50 mempoolBefore := mgr.GetMempoolCount() 51 51 52 52 // Attempt to fetch and save next bundle 53 - bundleNum, indexTime, err := mgr.FetchAndSaveNextBundle(ctx, !verbose) 53 + bundleNum, stats, err := mgr.FetchAndSaveNextBundle(ctx, verbose, false) 54 54 55 55 // Check if we made any progress 56 56 bundleAfter := mgr.GetLastBundleNumber() ··· 71 71 72 72 // Success 73 73 fetchedCount++ 74 - totalIndexTime += indexTime 74 + totalIndexTime += stats.IndexTime 75 75 76 76 // Callback if provided 77 77 if config.OnBundleSynced != nil {
+10
internal/types/types.go
··· 1 1 package types 2 2 3 + import "time" 4 + 3 5 // Logger is a simple logging interface used throughout plcbundle 4 6 type Logger interface { 5 7 Printf(format string, v ...interface{}) ··· 16 18 // INDEX_VERSION is the current index format version 17 19 INDEX_VERSION = "1.0" 18 20 ) 21 + 22 + type BundleProductionStats struct { 23 + TotalFetches int 24 + TotalDuration time.Duration 25 + AvgPerFetch float64 26 + Throughput float64 27 + IndexTime time.Duration 28 + }
+1 -1
types.go
··· 40 40 } 41 41 42 42 // Helper to convert internal bundle to public 43 - func toBundlePublic(b interface{}) *Bundle { 43 + func toBundlePublic(_ interface{}) *Bundle { 44 44 // Implement conversion from internal bundle to public Bundle 45 45 return &Bundle{} // placeholder 46 46 }