A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

clean up relay-compare script

evan.jarrett.net 08272197 7c064ba8

verified
+131 -40
+131 -40
cmd/relay-compare/main.go
··· 8 8 9 9 import ( 10 10 "context" 11 + "encoding/json" 11 12 "flag" 12 13 "fmt" 14 + "net/http" 13 15 "net/url" 14 16 "os" 15 17 "sort" ··· 17 19 "sync" 18 20 "time" 19 21 20 - "atcr.io/pkg/atproto" 22 + "github.com/bluesky-social/indigo/atproto/identity" 23 + "github.com/bluesky-social/indigo/atproto/syntax" 24 + "github.com/bluesky-social/indigo/xrpc" 21 25 ) 22 26 23 27 // ANSI color codes (disabled via --no-color or NO_COLOR env) ··· 37 41 38 42 // All io.atcr.* collections to compare 39 43 var allCollections = []string{ 40 - atproto.ManifestCollection, // io.atcr.manifest 41 - atproto.TagCollection, // io.atcr.tag 42 - atproto.SailorProfileCollection, // io.atcr.sailor.profile 43 - atproto.StarCollection, // io.atcr.sailor.star 44 - atproto.RepoPageCollection, // io.atcr.repo.page 45 - atproto.CaptainCollection, // io.atcr.hold.captain 46 - atproto.CrewCollection, // io.atcr.hold.crew 47 - atproto.LayerCollection, // io.atcr.hold.layer 48 - atproto.StatsCollection, // io.atcr.hold.stats 49 - atproto.ScanCollection, // io.atcr.hold.scan 44 + "io.atcr.manifest", 45 + "io.atcr.tag", 46 + "io.atcr.sailor.profile", 47 + "io.atcr.sailor.star", 48 + "io.atcr.repo.page", 49 + "io.atcr.hold.captain", 50 + "io.atcr.hold.crew", 51 + "io.atcr.hold.layer", 52 + "io.atcr.hold.stats", 53 + "io.atcr.hold.scan", 50 54 } 51 55 52 56 type summaryRow struct { ··· 76 80 relayIdx int 77 81 } 78 82 83 + // XRPC response types for listReposByCollection 84 + type listReposByCollectionResult struct { 85 + Repos []repoRef `json:"repos"` 86 + Cursor string `json:"cursor,omitempty"` 87 + } 88 + 89 + type repoRef struct { 90 + DID string `json:"did"` 91 + } 92 + 93 + // XRPC response types for listRecords 94 + type listRecordsResult struct { 95 + Records []json.RawMessage `json:"records"` 96 + Cursor string `json:"cursor,omitempty"` 97 + } 98 + 99 + // Shared identity directory for DID resolution 100 + var dir identity.Directory 101 + 79 102 func main() { 80 103 noColor := 
flag.Bool("no-color", false, "disable colored output") 81 104 verify := flag.Bool("verify", false, "verify diffs against PDS to distinguish real gaps from ghost entries") 105 + hideGhosts := flag.Bool("hide-ghosts", false, "with --verify, hide ghost and deactivated entries from output") 82 106 collection := flag.String("collection", "", "compare only this collection") 83 107 timeout := flag.Duration("timeout", 2*time.Minute, "timeout for all relay queries") 84 108 flag.Usage = func() { ··· 112 136 113 137 ctx, cancel := context.WithTimeout(context.Background(), *timeout) 114 138 defer cancel() 139 + 140 + dir = identity.DefaultDirectory() 115 141 116 142 // Short display names for each relay 117 143 names := make([]string, len(relays)) ··· 253 279 fmt.Printf("\n %sMissing from %s (%d):%s\n", cRed, names[ri], len(missing), cReset) 254 280 for _, did := range missing { 255 281 suffix := "" 282 + skip := false 256 283 if *verify { 257 284 vr, ok := verified[key{col, did}] 258 285 if !ok { ··· 263 290 suffix = fmt.Sprintf(" %s← deactivated%s", cDim, cReset) 264 291 row.deactivated++ 265 292 totalDeactivated++ 293 + skip = *hideGhosts 266 294 } else if vr.exists { 267 295 suffix = fmt.Sprintf(" %s← real gap%s", cRed, cReset) 268 296 row.realGaps++ ··· 271 299 suffix = fmt.Sprintf(" %s← ghost (not on PDS)%s", cDim, cReset) 272 300 row.ghosts++ 273 301 totalGhosts++ 302 + skip = *hideGhosts 274 303 } 275 304 } 276 - fmt.Printf(" %s- %s%s%s\n", cRed, did, cReset, suffix) 305 + if !skip { 306 + fmt.Printf(" %s- %s%s%s\n", cRed, did, cReset, suffix) 307 + } 277 308 } 278 309 } 279 310 ··· 283 314 } 284 315 285 316 if inSync { 286 - notes := formatSyncNotes(row.ghosts, row.deactivated) 317 + notes := "" 318 + if !*hideGhosts { 319 + notes = formatSyncNotes(row.ghosts, row.deactivated) 320 + } 287 321 if notes != "" { 288 322 fmt.Printf(" %s✓ in sync%s %s(%s)%s\n", cGreen, cReset, cDim, notes, cReset) 289 323 } else { ··· 297 331 } 298 332 299 333 // Summary table 300 - 
printSummary(summary, names, maxNameLen, totalMissing, *verify, totalRealGaps, totalGhosts, totalDeactivated) 334 + printSummary(summary, names, maxNameLen, totalMissing, *verify, *hideGhosts, totalRealGaps, totalGhosts, totalDeactivated) 301 335 } 302 336 303 - func printSummary(rows []summaryRow, names []string, maxNameLen, totalMissing int, showVerify bool, totalRealGaps, totalGhosts, totalDeactivated int) { 337 + func printSummary(rows []summaryRow, names []string, maxNameLen, totalMissing int, showVerify, hideGhosts bool, totalRealGaps, totalGhosts, totalDeactivated int) { 304 338 fmt.Printf("\n%s%s━━━ Summary ━━━%s\n\n", cBold, cCyan, cReset) 305 339 306 - colW := 28 307 - relayW := maxNameLen + 2 308 - if relayW < 8 { 309 - relayW = 8 340 + // Build short labels (A, B, C, ...) for compact columns 341 + labels := make([]string, len(names)) 342 + for i, name := range names { 343 + labels[i] = string(rune('A' + i)) 344 + fmt.Printf(" %s%s%s: %s\n", cBold, labels[i], cReset, name) 345 + } 346 + fmt.Println() 347 + 348 + colW := len("Collection") 349 + for _, row := range rows { 350 + if len(row.collection) > colW { 351 + colW = len(row.collection) 352 + } 310 353 } 354 + relayW := 6 311 355 312 356 // Header 313 357 fmt.Printf(" %-*s", colW, "Collection") 314 - for _, name := range names { 315 - fmt.Printf(" %*s", relayW, name) 358 + for _, label := range labels { 359 + fmt.Printf(" %*s", relayW, label) 316 360 } 317 361 fmt.Printf(" Status\n") 318 362 319 363 // Separator 320 364 fmt.Printf(" %s", strings.Repeat("─", colW)) 321 - for range names { 365 + for range labels { 322 366 fmt.Printf(" %s", strings.Repeat("─", relayW)) 323 367 } 324 368 fmt.Printf(" %s\n", strings.Repeat("─", 14)) ··· 336 380 } 337 381 switch row.status { 338 382 case "sync": 339 - notes := formatSyncNotes(row.ghosts, row.deactivated) 383 + notes := "" 384 + if !hideGhosts { 385 + notes = formatSyncNotes(row.ghosts, row.deactivated) 386 + } 340 387 if notes != "" { 341 388 fmt.Printf(" 
%s✓ in sync%s %s(%s)%s", cGreen, cReset, cDim, notes, cReset) 342 389 } else { ··· 344 391 } 345 392 case "diff": 346 393 if showVerify { 347 - notes := formatSyncNotes(row.ghosts, row.deactivated) 348 - if notes != "" { 349 - notes = ", " + notes 394 + if hideGhosts { 395 + fmt.Printf(" %s≠ %d missing%s", cYellow, row.realGaps, cReset) 396 + } else { 397 + notes := formatSyncNotes(row.ghosts, row.deactivated) 398 + if notes != "" { 399 + notes = ", " + notes 400 + } 401 + fmt.Printf(" %s≠ %d missing%s %s(%d real%s)%s", 402 + cYellow, row.realGaps, cReset, cDim, row.realGaps, notes, cReset) 350 403 } 351 - fmt.Printf(" %s≠ %d missing%s %s(%s)%s", 352 - cYellow, row.realGaps, cReset, cDim, fmt.Sprintf("%d real%s", row.realGaps, notes), cReset) 353 404 } else { 354 405 fmt.Printf(" %s≠ %d missing%s", cYellow, row.diffCount, cReset) 355 406 } ··· 363 414 fmt.Println() 364 415 if totalMissing > 0 { 365 416 if showVerify && totalRealGaps == 0 { 366 - notes := formatSyncNotes(totalGhosts, totalDeactivated) 367 - fmt.Printf("%s✓ All relays in sync%s %s(%s)%s\n", cGreen, cReset, cDim, notes, cReset) 417 + if hideGhosts { 418 + fmt.Printf("%s✓ All relays in sync%s\n", cGreen, cReset) 419 + } else { 420 + notes := formatSyncNotes(totalGhosts, totalDeactivated) 421 + fmt.Printf("%s✓ All relays in sync%s %s(%s)%s\n", cGreen, cReset, cDim, notes, cReset) 422 + } 368 423 } else { 369 424 if showVerify { 370 425 fmt.Printf("%s%d real gaps across relays%s", cYellow, totalRealGaps, cReset) 371 - notes := formatSyncNotes(totalGhosts, totalDeactivated) 372 - if notes != "" { 373 - fmt.Printf(" %s(%s)%s", cDim, notes, cReset) 426 + if !hideGhosts { 427 + notes := formatSyncNotes(totalGhosts, totalDeactivated) 428 + if notes != "" { 429 + fmt.Printf(" %s(%s)%s", cDim, notes, cReset) 430 + } 374 431 } 375 432 fmt.Println() 376 433 } else { ··· 425 482 sem <- struct{}{} 426 483 defer func() { <-sem }() 427 484 428 - pds, err := atproto.ResolveDIDToPDS(ctx, did) 485 + pds, err := 
resolveDIDToPDS(ctx, did) 429 486 mu.Lock() 430 487 if err != nil { 431 488 pdsErrors[did] = err ··· 466 523 } 467 524 468 525 pds := pdsEndpoints[dc.did] 469 - client := atproto.NewClient(pds, "", "") 470 - records, _, err := client.ListRecordsForRepo(ctx, dc.did, dc.col, 1, "") 526 + client := &xrpc.Client{Host: pds, Client: http.DefaultClient} 527 + var listResult listRecordsResult 528 + err := client.LexDo(ctx, "GET", "", "com.atproto.repo.listRecords", map[string]any{ 529 + "repo": dc.did, 530 + "collection": dc.col, 531 + "limit": 1, 532 + }, nil, &listResult) 471 533 mu.Lock() 472 534 if err != nil { 473 535 errStr := err.Error() ··· 480 542 results[k] = verifyResult{err: err} 481 543 } 482 544 } else { 483 - results[k] = verifyResult{exists: len(records) > 0} 545 + results[k] = verifyResult{exists: len(listResult.Records) > 0} 484 546 } 485 547 mu.Unlock() 486 548 }(dc) ··· 490 552 return results 491 553 } 492 554 555 + // resolveDIDToPDS resolves a DID to its PDS endpoint using the shared identity directory. 556 + func resolveDIDToPDS(ctx context.Context, did string) (string, error) { 557 + didParsed, err := syntax.ParseDID(did) 558 + if err != nil { 559 + return "", fmt.Errorf("invalid DID: %w", err) 560 + } 561 + 562 + ident, err := dir.LookupDID(ctx, didParsed) 563 + if err != nil { 564 + return "", fmt.Errorf("failed to resolve DID: %w", err) 565 + } 566 + 567 + pdsEndpoint := ident.PDSEndpoint() 568 + if pdsEndpoint == "" { 569 + return "", fmt.Errorf("no PDS endpoint found for DID") 570 + } 571 + 572 + return pdsEndpoint, nil 573 + } 574 + 493 575 // fetchAllDIDs paginates through listReposByCollection to collect all DIDs. 
494 576 func fetchAllDIDs(ctx context.Context, relay, collection string) (map[string]struct{}, error) { 495 - client := atproto.NewClient(relay, "", "") 577 + client := &xrpc.Client{Host: relay, Client: http.DefaultClient} 496 578 dids := make(map[string]struct{}) 497 579 var cursor string 498 580 499 581 for { 500 - result, err := client.ListReposByCollection(ctx, collection, 1000, cursor) 582 + params := map[string]any{ 583 + "collection": collection, 584 + "limit": 1000, 585 + } 586 + if cursor != "" { 587 + params["cursor"] = cursor 588 + } 589 + 590 + var result listReposByCollectionResult 591 + err := client.LexDo(ctx, "GET", "", "com.atproto.sync.listReposByCollection", params, nil, &result) 501 592 if err != nil { 502 - return dids, err 593 + return dids, fmt.Errorf("listReposByCollection failed: %w", err) 503 594 } 504 595 505 596 for _, repo := range result.Repos {