···149149 // Set global refresher for middleware
150150 middleware.SetGlobalRefresher(refresher)
151151152152- // Set global database for pull/push metrics tracking
153153- metricsDB := db.NewMetricsDB(uiDatabase)
154154- middleware.SetGlobalDatabase(metricsDB)
152152+ // Set global database for hold DID lookups (used by blob routing)
153153+ holdDIDDB := db.NewHoldDIDDB(uiDatabase)
154154+ middleware.SetGlobalDatabase(holdDIDDB)
155155156156 // Create RemoteHoldAuthorizer for hold authorization with caching
157157 holdAuthorizer := auth.NewRemoteHoldAuthorizer(uiDatabase, testMode)
···160160161161 // Initialize Jetstream workers (background services before HTTP routes)
162162 initializeJetstream(uiDatabase, &cfg.Jetstream, defaultHoldDID, testMode, refresher)
163163+164164+ // Run stats migration to holds (one-time migration, skipped if already done)
165165+ go func() {
166166+ // Wait for services to be ready (Docker startup race condition)
167167+ time.Sleep(10 * time.Second)
168168+ // Create service token getter callback that uses auth.GetOrFetchServiceToken
169169+ getServiceToken := func(ctx context.Context, userDID, holdDID, pdsEndpoint string) (string, error) {
170170+ return auth.GetOrFetchServiceToken(ctx, refresher, userDID, holdDID, pdsEndpoint)
171171+ }
172172+ if err := db.MigrateStatsToHolds(context.Background(), uiDatabase, getServiceToken); err != nil {
173173+ slog.Warn("Stats migration failed", "error", err)
174174+ }
175175+ }()
163176164177 // Create main chi router
165178 mainRouter := chi.NewRouter()
+12-45
pkg/appview/db/queries.go
···14461446}
1447144714481448// UpsertRepositoryStats inserts or updates repository stats
14491449+// Note: star_count is calculated dynamically from the stars table, not stored here
14491450func UpsertRepositoryStats(db *sql.DB, stats *RepositoryStats) error {
14501451 _, err := db.Exec(`
14511451- INSERT INTO repository_stats (did, repository, star_count, pull_count, last_pull, push_count, last_push)
14521452- VALUES (?, ?, ?, ?, ?, ?, ?)
14521452+ INSERT INTO repository_stats (did, repository, pull_count, last_pull, push_count, last_push)
14531453+ VALUES (?, ?, ?, ?, ?, ?)
14531454 ON CONFLICT(did, repository) DO UPDATE SET
14541454- star_count = excluded.star_count,
14551455 pull_count = excluded.pull_count,
14561456 last_pull = excluded.last_pull,
14571457 push_count = excluded.push_count,
14581458 last_push = excluded.last_push
14591459- `, stats.DID, stats.Repository, stats.StarCount, stats.PullCount, stats.LastPull, stats.PushCount, stats.LastPush)
14591459+ `, stats.DID, stats.Repository, stats.PullCount, stats.LastPull, stats.PushCount, stats.LastPush)
14601460 return err
14611461}
14621462···15931593 return nil
15941594}
1595159515961596-// IncrementPullCount increments the pull count for a repository
15971597-func IncrementPullCount(db *sql.DB, did, repository string) error {
15981598- _, err := db.Exec(`
15991599- INSERT INTO repository_stats (did, repository, pull_count, last_pull)
16001600- VALUES (?, ?, 1, datetime('now'))
16011601- ON CONFLICT(did, repository) DO UPDATE SET
16021602- pull_count = pull_count + 1,
16031603- last_pull = datetime('now')
16041604- `, did, repository)
16051605- return err
16061606-}
16071607-16081608-// IncrementPushCount increments the push count for a repository
16091609-func IncrementPushCount(db *sql.DB, did, repository string) error {
16101610- _, err := db.Exec(`
16111611- INSERT INTO repository_stats (did, repository, push_count, last_push)
16121612- VALUES (?, ?, 1, datetime('now'))
16131613- ON CONFLICT(did, repository) DO UPDATE SET
16141614- push_count = push_count + 1,
16151615- last_push = datetime('now')
16161616- `, did, repository)
16171617- return err
16181618-}
16191619-16201596// parseTimestamp parses a timestamp string with multiple format attempts
16211597func parseTimestamp(s string) (time.Time, error) {
16221598 formats := []string{
···16341610 return time.Time{}, fmt.Errorf("unable to parse timestamp: %s", s)
16351611}
1636161216371637-// MetricsDB wraps a sql.DB and implements the metrics interface for middleware
16381638-type MetricsDB struct {
16131613+// HoldDIDDB wraps a sql.DB and implements the HoldDIDLookup interface for middleware
16141614+// This is a minimal wrapper that only provides hold DID lookups for blob routing
16151615+type HoldDIDDB struct {
16391616 db *sql.DB
16401617}
1641161816421642-// NewMetricsDB creates a new metrics database wrapper
16431643-func NewMetricsDB(db *sql.DB) *MetricsDB {
16441644- return &MetricsDB{db: db}
16451645-}
16461646-16471647-// IncrementPullCount increments the pull count for a repository
16481648-func (m *MetricsDB) IncrementPullCount(did, repository string) error {
16491649- return IncrementPullCount(m.db, did, repository)
16501650-}
16511651-16521652-// IncrementPushCount increments the push count for a repository
16531653-func (m *MetricsDB) IncrementPushCount(did, repository string) error {
16541654- return IncrementPushCount(m.db, did, repository)
16191619+// NewHoldDIDDB creates a new hold DID database wrapper
16201620+func NewHoldDIDDB(db *sql.DB) *HoldDIDDB {
16211621+ return &HoldDIDDB{db: db}
16551622}
1656162316571624// GetLatestHoldDIDForRepo returns the hold DID from the most recent manifest for a repository
16581658-func (m *MetricsDB) GetLatestHoldDIDForRepo(did, repository string) (string, error) {
16591659- return GetLatestHoldDIDForRepo(m.db, did, repository)
16251625+func (h *HoldDIDDB) GetLatestHoldDIDForRepo(did, repository string) (string, error) {
16261626+ return GetLatestHoldDIDForRepo(h.db, did, repository)
16601627}
1661162816621629// GetFeaturedRepositories fetches top repositories sorted by stars and pulls
+231
pkg/appview/db/stats_migration.go
···11+package db
22+33+import (
44+ "bytes"
55+ "context"
66+ "database/sql"
77+ "encoding/json"
88+ "fmt"
99+ "io"
1010+ "log/slog"
1111+ "net/http"
1212+ "time"
1313+1414+ "atcr.io/pkg/atproto"
1515+)
1616+1717+// ServiceTokenGetter is a function type for getting service tokens.
1818+// This avoids importing auth from db (which would create import cycles with tests).
1919+type ServiceTokenGetter func(ctx context.Context, userDID, holdDID, pdsEndpoint string) (string, error)
2020+2121+// MigrateStatsToHolds migrates existing repository_stats data to hold services.
2222+// This is a one-time migration that runs on startup.
2323+//
2424+// The migration:
2525+// 1. Checks if migration has already completed
2626+// 2. Reads all repository_stats entries
2727+// 3. For each entry, looks up the hold DID from manifests table
2828+// 4. Gets a service token for the user and calls the hold's setStats endpoint
2929+// 5. Marks migration complete after all entries are processed
3030+//
3131+// If a hold is offline, the migration logs a warning and continues.
3232+// The hold will receive real-time stats updates via Jetstream once online.
3333+//
3434+// The getServiceToken parameter is a callback to avoid import cycles with pkg/auth.
3535+func MigrateStatsToHolds(ctx context.Context, db *sql.DB, getServiceToken ServiceTokenGetter) error {
3636+ // Check if migration already done
3737+ var migrationDone bool
3838+ err := db.QueryRowContext(ctx, `
3939+ SELECT EXISTS(
4040+ SELECT 1 FROM schema_migrations WHERE version = 1000
4141+ )
4242+ `).Scan(&migrationDone)
4343+4444+ // Table might not exist yet on fresh install
4545+ if err == sql.ErrNoRows {
4646+ migrationDone = false
4747+ } else if err != nil {
4848+ // Check if it's a "no such table" error (fresh install)
4949+ if err.Error() != "no such table: schema_migrations" {
5050+ return fmt.Errorf("failed to check migration status: %w", err)
5151+ }
5252+ migrationDone = false
5353+ }
5454+5555+ if migrationDone {
5656+ slog.Debug("Stats migration already complete, skipping", "component", "migration")
5757+ return nil
5858+ }
5959+6060+ slog.Info("Starting stats migration to holds", "component", "migration")
6161+6262+ // Get all repository_stats entries
6363+ rows, err := db.QueryContext(ctx, `
6464+ SELECT did, repository, pull_count, last_pull, push_count, last_push
6565+ FROM repository_stats
6666+ WHERE pull_count > 0 OR push_count > 0
6767+ `)
6868+ if err != nil {
6969+ // Table might not exist on fresh install
7070+ if err.Error() == "no such table: repository_stats" {
7171+ slog.Info("No repository_stats table found, skipping migration", "component", "migration")
7272+ return markMigrationComplete(db)
7373+ }
7474+ return fmt.Errorf("failed to query repository_stats: %w", err)
7575+ }
7676+ defer rows.Close()
7777+7878+ var stats []struct {
7979+ DID string
8080+ Repository string
8181+ PullCount int64
8282+ LastPull sql.NullString
8383+ PushCount int64
8484+ LastPush sql.NullString
8585+ }
8686+8787+ for rows.Next() {
8888+ var stat struct {
8989+ DID string
9090+ Repository string
9191+ PullCount int64
9292+ LastPull sql.NullString
9393+ PushCount int64
9494+ LastPush sql.NullString
9595+ }
9696+ if err := rows.Scan(&stat.DID, &stat.Repository, &stat.PullCount, &stat.LastPull, &stat.PushCount, &stat.LastPush); err != nil {
9797+ return fmt.Errorf("failed to scan stat: %w", err)
9898+ }
9999+ stats = append(stats, stat)
100100+ }
101101+102102+ if len(stats) == 0 {
103103+ slog.Info("No stats to migrate", "component", "migration")
104104+ return markMigrationComplete(db)
105105+ }
106106+107107+ slog.Info("Found stats entries to migrate", "component", "migration", "count", len(stats))
108108+109109+ // Process each stat
110110+ successCount := 0
111111+ skipCount := 0
112112+ errorCount := 0
113113+114114+ for _, stat := range stats {
115115+ // Look up hold DID from manifests table
116116+ holdDID, err := GetLatestHoldDIDForRepo(db, stat.DID, stat.Repository)
117117+ if err != nil || holdDID == "" {
118118+ slog.Debug("No hold DID found for repo, skipping", "component", "migration",
119119+ "did", stat.DID, "repository", stat.Repository)
120120+ skipCount++
121121+ continue
122122+ }
123123+124124+ // Get user's PDS endpoint
125125+ user, err := GetUserByDID(db, stat.DID)
126126+ if err != nil || user == nil {
127127+ slog.Debug("User not found in database, skipping", "component", "migration",
128128+ "did", stat.DID, "repository", stat.Repository)
129129+ skipCount++
130130+ continue
131131+ }
132132+133133+ // Get service token for the user
134134+ serviceToken, err := getServiceToken(ctx, stat.DID, holdDID, user.PDSEndpoint)
135135+ if err != nil {
136136+ slog.Warn("Failed to get service token, skipping", "component", "migration",
137137+ "did", stat.DID, "repository", stat.Repository, "error", err)
138138+ errorCount++
139139+ continue
140140+ }
141141+142142+ // Resolve hold DID to HTTP URL
143143+ holdURL := atproto.ResolveHoldURL(holdDID)
144144+ if holdURL == "" {
145145+ slog.Warn("Failed to resolve hold DID, skipping", "component", "migration",
146146+ "hold_did", holdDID)
147147+ errorCount++
148148+ continue
149149+ }
150150+151151+ // Call hold's setStats endpoint
152152+ err = callSetStats(ctx, holdURL, serviceToken, stat.DID, stat.Repository,
153153+ stat.PullCount, stat.PushCount, stat.LastPull.String, stat.LastPush.String)
154154+ if err != nil {
155155+ slog.Warn("Failed to migrate stats to hold, continuing", "component", "migration",
156156+ "did", stat.DID, "repository", stat.Repository, "hold", holdDID, "error", err)
157157+ errorCount++
158158+ continue
159159+ }
160160+161161+ successCount++
162162+ slog.Debug("Migrated stats", "component", "migration",
163163+ "did", stat.DID, "repository", stat.Repository, "hold", holdDID,
164164+ "pull_count", stat.PullCount, "push_count", stat.PushCount)
165165+ }
166166+167167+ slog.Info("Stats migration completed", "component", "migration",
168168+ "success", successCount, "skipped", skipCount, "errors", errorCount, "total", len(stats))
169169+170170+ // Mark migration complete (even if some failed - they'll get updates via Jetstream)
171171+ return markMigrationComplete(db)
172172+}
173173+174174+// markMigrationComplete records that the stats migration has been done
175175+func markMigrationComplete(db *sql.DB) error {
176176+ _, err := db.Exec(`
177177+ INSERT INTO schema_migrations (version, applied_at)
178178+ VALUES (1000, datetime('now'))
179179+ ON CONFLICT(version) DO NOTHING
180180+ `)
181181+ if err != nil {
182182+ return fmt.Errorf("failed to mark migration complete: %w", err)
183183+ }
184184+ return nil
185185+}
186186+187187+// callSetStats calls the hold's io.atcr.hold.setStats endpoint
188188+func callSetStats(ctx context.Context, holdURL, serviceToken, ownerDID, repository string, pullCount, pushCount int64, lastPull, lastPush string) error {
189189+ // Build request
190190+ reqBody := map[string]any{
191191+ "ownerDid": ownerDID,
192192+ "repository": repository,
193193+ "pullCount": pullCount,
194194+ "pushCount": pushCount,
195195+ }
196196+ if lastPull != "" {
197197+ reqBody["lastPull"] = lastPull
198198+ }
199199+ if lastPush != "" {
200200+ reqBody["lastPush"] = lastPush
201201+ }
202202+203203+ body, err := json.Marshal(reqBody)
204204+ if err != nil {
205205+ return fmt.Errorf("failed to marshal request: %w", err)
206206+ }
207207+208208+ // Create HTTP request
209209+ req, err := http.NewRequestWithContext(ctx, "POST", holdURL+atproto.HoldSetStats, bytes.NewReader(body))
210210+ if err != nil {
211211+ return fmt.Errorf("failed to create request: %w", err)
212212+ }
213213+214214+ req.Header.Set("Content-Type", "application/json")
215215+ req.Header.Set("Authorization", "Bearer "+serviceToken)
216216+217217+ // Send request with timeout
218218+ client := &http.Client{Timeout: 10 * time.Second}
219219+ resp, err := client.Do(req)
220220+ if err != nil {
221221+ return fmt.Errorf("request failed: %w", err)
222222+ }
223223+ defer resp.Body.Close()
224224+225225+ if resp.StatusCode != http.StatusOK {
226226+ body, _ := io.ReadAll(resp.Body)
227227+ return fmt.Errorf("setStats failed: status %d, body: %s", resp.StatusCode, body)
228228+ }
229229+230230+ return nil
231231+}
+2-2
pkg/appview/jetstream/backfill.go
···48484949 return &BackfillWorker{
5050 db: database,
5151- client: client, // This points to the relay
5252- processor: NewProcessor(database, false), // No cache for batch processing
5151+ client: client, // This points to the relay
5252+ processor: NewProcessor(database, false, nil), // No cache for batch processing, no stats
5353 defaultHoldDID: defaultHoldDID,
5454 testMode: testMode,
5555 refresher: refresher,
+63-6
pkg/appview/jetstream/processor.go
···1616// Processor handles shared database operations for both Worker (live) and Backfill (sync)
1717// This eliminates code duplication between the two data ingestion paths
1818type Processor struct {
1919- db *sql.DB
2020- userCache *UserCache // Optional - enabled for Worker, disabled for Backfill
2121- useCache bool
1919+ db *sql.DB
2020+ userCache *UserCache // Optional - enabled for Worker, disabled for Backfill
2121+ statsCache *StatsCache // In-memory cache for per-hold stats aggregation
2222+ useCache bool
2223}
23242425// NewProcessor creates a new shared processor
2526// useCache: true for Worker (live streaming), false for Backfill (batch processing)
2626-func NewProcessor(database *sql.DB, useCache bool) *Processor {
2727+// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing)
2828+func NewProcessor(database *sql.DB, useCache bool, statsCache *StatsCache) *Processor {
2729 p := &Processor{
2828- db: database,
2929- useCache: useCache,
3030+ db: database,
3131+ useCache: useCache,
3232+ statsCache: statsCache,
3033 }
31343235 if useCache {
···367370 "new_handle", newHandle)
368371369372 return nil
373373+}
374374+375375+// ProcessStats handles stats record events from hold PDSes
376376+// This is called when Jetstream receives a stats create/update/delete event from a hold
377377+// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository
378378+func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
379379+ // Skip if no stats cache configured
380380+ if p.statsCache == nil {
381381+ return nil
382382+ }
383383+384384+ // Unmarshal stats record
385385+ var statsRecord atproto.StatsRecord
386386+ if err := json.Unmarshal(recordData, &statsRecord); err != nil {
387387+ return fmt.Errorf("failed to unmarshal stats record: %w", err)
388388+ }
389389+390390+ if isDelete {
391391+ // Delete from in-memory cache
392392+ p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository)
393393+ } else {
394394+ // Parse timestamps
395395+ var lastPull, lastPush *time.Time
396396+ if statsRecord.LastPull != "" {
397397+ t, err := time.Parse(time.RFC3339, statsRecord.LastPull)
398398+ if err == nil {
399399+ lastPull = &t
400400+ }
401401+ }
402402+ if statsRecord.LastPush != "" {
403403+ t, err := time.Parse(time.RFC3339, statsRecord.LastPush)
404404+ if err == nil {
405405+ lastPush = &t
406406+ }
407407+ }
408408+409409+ // Update in-memory cache
410410+ p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository,
411411+ statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush)
412412+ }
413413+414414+ // Get aggregated stats across all holds
415415+ totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated(
416416+ statsRecord.OwnerDID, statsRecord.Repository)
417417+418418+ // Upsert aggregated stats to repository_stats
419419+ return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{
420420+ DID: statsRecord.OwnerDID,
421421+ Repository: statsRecord.Repository,
422422+ PullCount: int(totalPull),
423423+ PushCount: int(totalPush),
424424+ LastPull: latestPull,
425425+ LastPush: latestPush,
426426+ })
370427}
371428372429// ProcessAccount handles account status events (deactivation/reactivation)
+9-9
pkg/appview/jetstream/processor_test.go
···115115116116 for _, tt := range tests {
117117 t.Run(tt.name, func(t *testing.T) {
118118- p := NewProcessor(database, tt.useCache)
118118+ p := NewProcessor(database, tt.useCache, nil)
119119 if p == nil {
120120 t.Fatal("NewProcessor returned nil")
121121 }
···139139 database := setupTestDB(t)
140140 defer database.Close()
141141142142- p := NewProcessor(database, false)
142142+ p := NewProcessor(database, false, nil)
143143 ctx := context.Background()
144144145145 // Create test manifest record
···238238 database := setupTestDB(t)
239239 defer database.Close()
240240241241- p := NewProcessor(database, false)
241241+ p := NewProcessor(database, false, nil)
242242 ctx := context.Background()
243243244244 // Create test manifest list record
···322322 database := setupTestDB(t)
323323 defer database.Close()
324324325325- p := NewProcessor(database, false)
325325+ p := NewProcessor(database, false, nil)
326326 ctx := context.Background()
327327328328 // Create test tag record (using ManifestDigest field for simplicity)
···403403 database := setupTestDB(t)
404404 defer database.Close()
405405406406- p := NewProcessor(database, false)
406406+ p := NewProcessor(database, false, nil)
407407 ctx := context.Background()
408408409409 // Create test star record
···463463 database := setupTestDB(t)
464464 defer database.Close()
465465466466- p := NewProcessor(database, false)
466466+ p := NewProcessor(database, false, nil)
467467 ctx := context.Background()
468468469469 manifestRecord := &atproto.ManifestRecord{
···514514 database := setupTestDB(t)
515515 defer database.Close()
516516517517- p := NewProcessor(database, false)
517517+ p := NewProcessor(database, false, nil)
518518 ctx := context.Background()
519519520520 // Manifest with nil annotations
···555555 db := setupTestDB(t)
556556 defer db.Close()
557557558558- processor := NewProcessor(db, false)
558558+ processor := NewProcessor(db, false, nil)
559559560560 // Setup: Create test user
561561 testDID := "did:plc:alice123"
···621621 db := setupTestDB(t)
622622 defer db.Close()
623623624624- processor := NewProcessor(db, false)
624624+ processor := NewProcessor(db, false, nil)
625625626626 // Setup: Create test user
627627 testDID := "did:plc:bob456"
+100
pkg/appview/jetstream/stats_cache.go
···11+package jetstream
22+33+import (
44+ "sync"
55+ "time"
66+)
77+88+// HoldRepoStats represents stats for a single owner+repo from a specific hold
99+type HoldRepoStats struct {
1010+ OwnerDID string
1111+ Repository string
1212+ PullCount int64
1313+ PushCount int64
1414+ LastPull *time.Time
1515+ LastPush *time.Time
1616+}
1717+1818+// StatsCache provides in-memory caching of per-hold stats with aggregation
1919+// This allows summing stats across multiple holds for the same owner+repo
2020+type StatsCache struct {
2121+ mu sync.RWMutex
2222+ // holdDID -> (ownerDID/repo -> stats)
2323+ holds map[string]map[string]*HoldRepoStats
2424+}
2525+2626+// NewStatsCache creates a new in-memory stats cache
2727+func NewStatsCache() *StatsCache {
2828+ return &StatsCache{
2929+ holds: make(map[string]map[string]*HoldRepoStats),
3030+ }
3131+}
3232+3333+// makeKey creates a cache key from ownerDID and repository
3434+func makeKey(ownerDID, repo string) string {
3535+ return ownerDID + "/" + repo
3636+}
3737+3838+// Update stores or updates stats for a hold+owner+repo combination
3939+func (c *StatsCache) Update(holdDID, ownerDID, repo string, pullCount, pushCount int64, lastPull, lastPush *time.Time) {
4040+ c.mu.Lock()
4141+ defer c.mu.Unlock()
4242+4343+ // Ensure hold map exists
4444+ if c.holds[holdDID] == nil {
4545+ c.holds[holdDID] = make(map[string]*HoldRepoStats)
4646+ }
4747+4848+ key := makeKey(ownerDID, repo)
4949+ c.holds[holdDID][key] = &HoldRepoStats{
5050+ OwnerDID: ownerDID,
5151+ Repository: repo,
5252+ PullCount: pullCount,
5353+ PushCount: pushCount,
5454+ LastPull: lastPull,
5555+ LastPush: lastPush,
5656+ }
5757+}
5858+5959+// Delete removes stats for a hold+owner+repo combination
6060+func (c *StatsCache) Delete(holdDID, ownerDID, repo string) {
6161+ c.mu.Lock()
6262+ defer c.mu.Unlock()
6363+6464+ if c.holds[holdDID] != nil {
6565+ key := makeKey(ownerDID, repo)
6666+ delete(c.holds[holdDID], key)
6767+ }
6868+}
6969+7070+// GetAggregated returns aggregated stats for an owner+repo by summing across all holds
7171+// Returns (pullCount, pushCount, lastPull, lastPush)
7272+func (c *StatsCache) GetAggregated(ownerDID, repo string) (int64, int64, *time.Time, *time.Time) {
7373+ c.mu.RLock()
7474+ defer c.mu.RUnlock()
7575+7676+ key := makeKey(ownerDID, repo)
7777+ var totalPull, totalPush int64
7878+ var latestPull, latestPush *time.Time
7979+8080+ for _, holdStats := range c.holds {
8181+ if stats, ok := holdStats[key]; ok {
8282+ totalPull += stats.PullCount
8383+ totalPush += stats.PushCount
8484+8585+ // Track latest timestamps
8686+ if stats.LastPull != nil {
8787+ if latestPull == nil || stats.LastPull.After(*latestPull) {
8888+ latestPull = stats.LastPull
8989+ }
9090+ }
9191+ if stats.LastPush != nil {
9292+ if latestPush == nil || stats.LastPush.After(*latestPush) {
9393+ latestPush = stats.LastPush
9494+ }
9595+ }
9696+ }
9797+ }
9898+9999+ return totalPull, totalPush, latestPull, latestPush
100100+}
+39-2
pkg/appview/jetstream/worker.go
···3434 startCursor int64
3535 wantedCollections []string
3636 debugCollectionCount int
3737- processor *Processor // Shared processor for DB operations
3737+ processor *Processor // Shared processor for DB operations
3838+ statsCache *StatsCache // In-memory cache for stats aggregation across holds
3839 eventCallback EventCallback
3940 connStartTime time.Time // Track when connection started for debugging
4041···5657 jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe"
5758 }
58596060+ // Create shared stats cache for aggregating across holds
6161+ statsCache := NewStatsCache()
6262+5963 return &Worker{
6064 db: database,
6165 jetstreamURL: jetstreamURL,
···6367 wantedCollections: []string{
6468 "io.atcr.*", // Subscribe to all ATCR collections
6569 },
6666- processor: NewProcessor(database, true), // Use cache for live streaming
7070+ statsCache: statsCache,
7171+ processor: NewProcessor(database, true, statsCache), // Use cache for live streaming
6772 }
6873}
6974···313318 case atproto.RepoPageCollection:
314319 slog.Info("Jetstream processing repo page event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
315320 return w.processRepoPage(commit)
321321+ case atproto.StatsCollection:
322322+ slog.Info("Jetstream processing stats event", "did", commit.DID, "operation", commit.Operation, "rkey", commit.RKey)
323323+ return w.processStats(commit)
316324 default:
317325 // Ignore other collections
318326 return nil
···470478471479 // Use shared processor for DB operations
472480 return w.processor.ProcessRepoPage(context.Background(), commit.DID, commit.RKey, recordBytes, false)
481481+}
482482+483483+// processStats processes a stats commit event from a hold PDS
484484+func (w *Worker) processStats(commit *CommitEvent) error {
485485+ isDelete := commit.Operation == "delete"
486486+487487+ if isDelete {
488488+ // For delete events, we need to parse the rkey to get ownerDID + repository
489489+ // The rkey is deterministic: base32(sha256(ownerDID + "/" + repository)[:16])
490490+ // Unfortunately, we can't reverse this - we need the record data
491491+ // Delete events don't include record data, so we can't delete from cache
492492+ // This is acceptable - stats will be refreshed on next update from hold
493493+ slog.Debug("Jetstream ignoring stats delete event (cannot reverse rkey)", "did", commit.DID, "rkey", commit.RKey)
494494+ return nil
495495+ }
496496+497497+ // Parse stats record
498498+ if commit.Record == nil {
499499+ return nil
500500+ }
501501+502502+ // Marshal map to bytes for processing
503503+ recordBytes, err := json.Marshal(commit.Record)
504504+ if err != nil {
505505+ return fmt.Errorf("failed to marshal record: %w", err)
506506+ }
507507+508508+ // Use shared processor - commit.DID is the hold's DID
509509+ return w.processor.ProcessStats(context.Background(), commit.DID, recordBytes, false)
473510}
474511475512// processIdentity processes an identity event (handle change)
+10-10
pkg/appview/middleware/registry.go
···174174// After initialization, request handling uses the NamespaceResolver's instance fields.
175175var (
176176 globalRefresher *oauth.Refresher
177177- globalDatabase storage.DatabaseMetrics
177177+ globalDatabase storage.HoldDIDLookup
178178 globalAuthorizer auth.HoldAuthorizer
179179)
180180···186186187187// SetGlobalDatabase sets the database instance during initialization
188188// Must be called before the registry starts serving requests
189189-func SetGlobalDatabase(database storage.DatabaseMetrics) {
189189+func SetGlobalDatabase(database storage.HoldDIDLookup) {
190190 globalDatabase = database
191191}
192192···204204// NamespaceResolver wraps a namespace and resolves names
205205type NamespaceResolver struct {
206206 distribution.Namespace
207207- defaultHoldDID string // Default hold DID (e.g., "did:web:hold01.atcr.io")
208208- baseURL string // Base URL for error messages (e.g., "https://atcr.io")
209209- testMode bool // If true, fallback to default hold when user's hold is unreachable
210210- refresher *oauth.Refresher // OAuth session manager (copied from global on init)
211211- database storage.DatabaseMetrics // Metrics database (copied from global on init)
212212- authorizer auth.HoldAuthorizer // Hold authorization (copied from global on init)
213213- validationCache *validationCache // Request-level service token cache
214214- readmeFetcher *readme.Fetcher // README fetcher for repo pages
207207+ defaultHoldDID string // Default hold DID (e.g., "did:web:hold01.atcr.io")
208208+ baseURL string // Base URL for error messages (e.g., "https://atcr.io")
209209+ testMode bool // If true, fallback to default hold when user's hold is unreachable
210210+ refresher *oauth.Refresher // OAuth session manager (copied from global on init)
211211+ database storage.HoldDIDLookup // Database for hold DID lookups (copied from global on init)
212212+ authorizer auth.HoldAuthorizer // Hold authorization (copied from global on init)
213213+ validationCache *validationCache // Request-level service token cache
214214+ readmeFetcher *readme.Fetcher // README fetcher for repo pages
215215}
216216217217// initATProtoResolver initializes the name resolution middleware
+3-5
pkg/appview/storage/context.go
···77 "atcr.io/pkg/auth/oauth"
88)
991010-// DatabaseMetrics interface for tracking pull/push counts and querying hold DIDs
1111-type DatabaseMetrics interface {
1212- IncrementPullCount(did, repository string) error
1313- IncrementPushCount(did, repository string) error
1010+// HoldDIDLookup interface for querying hold DIDs from manifests
1111+type HoldDIDLookup interface {
1412 GetLatestHoldDIDForRepo(did, repository string) (string, error)
1513}
1614···3230 PullerPDSEndpoint string // Puller's PDS endpoint URL
33313432 // Shared services (same for all requests)
3535- Database DatabaseMetrics // Metrics tracking database
3333+ Database HoldDIDLookup // Database for hold DID lookups
3634 Authorizer auth.HoldAuthorizer // Hold access authorization
3735 Refresher *oauth.Refresher // OAuth session manager
3836 ReadmeFetcher *readme.Fetcher // README fetcher for repo pages
···33//go:generate go run generate.go
4455import (
66+ "crypto/sha256"
77+ "encoding/base32"
68 "encoding/base64"
79 "encoding/json"
810 "fmt"
···3436 // LayerCollection is the collection name for container layer metadata
3537 // Stored in hold's embedded PDS to track which layers are stored
3638 LayerCollection = "io.atcr.hold.layer"
3939+4040+ // StatsCollection is the collection name for repository statistics
4141+ // Stored in hold's embedded PDS to track pull/push counts per owner+repo
4242+ StatsCollection = "io.atcr.hold.stats"
37433844 // TangledProfileCollection is the collection name for tangled profiles
3945 // Stored in hold's embedded PDS (singleton record at rkey "self")
···618624 UserHandle: userHandle,
619625 CreatedAt: time.Now().Format(time.RFC3339),
620626 }
627627+}
628628+629629+// StatsRecord represents repository statistics stored in the hold's PDS
630630+// Collection: io.atcr.hold.stats
631631+// Stored in the hold's embedded PDS for tracking manifest pull/push counts
632632+// Uses CBOR encoding for efficient storage in hold's carstore
633633+// RKey is deterministic: base32(sha256(ownerDID + "/" + repository)[:16])
634634+type StatsRecord struct {
635635+ Type string `json:"$type" cborgen:"$type"`
636636+ OwnerDID string `json:"ownerDid" cborgen:"ownerDid"` // DID of the image owner (e.g., "did:plc:xyz123")
637637+ Repository string `json:"repository" cborgen:"repository"` // Repository name (e.g., "myapp")
638638+ PullCount int64 `json:"pullCount" cborgen:"pullCount"` // Number of manifest downloads
639639+ LastPull string `json:"lastPull,omitempty" cborgen:"lastPull,omitempty"`
640640+ PushCount int64 `json:"pushCount" cborgen:"pushCount"` // Number of manifest uploads
641641+ LastPush string `json:"lastPush,omitempty" cborgen:"lastPush,omitempty"`
642642+ UpdatedAt string `json:"updatedAt" cborgen:"updatedAt"` // RFC3339 timestamp
643643+}
644644+645645+// NewStatsRecord creates a new stats record
646646+func NewStatsRecord(ownerDID, repository string) *StatsRecord {
647647+ return &StatsRecord{
648648+ Type: StatsCollection,
649649+ OwnerDID: ownerDID,
650650+ Repository: repository,
651651+ PullCount: 0,
652652+ PushCount: 0,
653653+ UpdatedAt: time.Now().Format(time.RFC3339),
654654+ }
655655+}
656656+657657+// StatsRecordKey generates a deterministic record key for stats
658658+// Uses base32 encoding of first 16 bytes of SHA-256 hash of "ownerDID/repository"
659659+// This ensures same owner+repo always maps to same rkey
660660+func StatsRecordKey(ownerDID, repository string) string {
661661+ combined := ownerDID + "/" + repository
662662+ hash := sha256.Sum256([]byte(combined))
663663+ // Use first 16 bytes (128 bits) for collision resistance
664664+ // Encode with base32 (alphanumeric, lowercase, no padding) for ATProto rkey compatibility
665665+ return strings.ToLower(base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash[:16]))
621666}
622667623668// TangledProfileRecord represents a Tangled profile for the hold
+159-75
pkg/hold/oci/xrpc.go
···5050 r.Post(atproto.HoldCompleteUpload, h.HandleCompleteUpload)
5151 r.Post(atproto.HoldAbortUpload, h.HandleAbortUpload)
5252 r.Post(atproto.HoldNotifyManifest, h.HandleNotifyManifest)
5353+ r.Post(atproto.HoldSetStats, h.HandleSetStats)
5354 })
5455}
5556···201202 })
202203}
203204204204-// HandleNotifyManifest handles manifest upload notifications from AppView
205205-// Creates layer records and optionally posts to Bluesky
205205+// HandleNotifyManifest handles manifest notifications from AppView
206206+// For pushes: Creates layer records and optionally posts to Bluesky
207207+// For pulls: Just increments stats (no layer records or posts)
208208+// Always increments stats (pull or push counts)
206209func (h *XRPCHandler) HandleNotifyManifest(w http.ResponseWriter, r *http.Request) {
207210 ctx := r.Context()
208211···219222 Tag string `json:"tag"`
220223 UserDID string `json:"userDid"`
221224 UserHandle string `json:"userHandle"`
225225+ Operation string `json:"operation"` // "push" or "pull", defaults to "push" for backward compatibility
222226 Manifest struct {
223227 MediaType string `json:"mediaType"`
224228 Config struct {
···253257 return
254258 }
255259256256- // Check if manifest posts are enabled
257257- // Read from captain record (which is synced with HOLD_BLUESKY_POSTS_ENABLED env var)
258258- postsEnabled := false
259259- _, captain, err := h.pds.GetCaptainRecord(ctx)
260260- if err == nil {
261261- postsEnabled = captain.EnableBlueskyPosts
262262- } else {
263263- // Fallback to env var if captain record doesn't exist (shouldn't happen in normal operation)
264264- postsEnabled = h.enableBlueskyPosts
260260+ // Default operation to "push" for backward compatibility
261261+ operation := req.Operation
262262+ if operation == "" {
263263+ operation = "push"
265264 }
266265267267- // Create layer records for each blob
268268- layersCreated := 0
269269- for _, layer := range req.Manifest.Layers {
270270- record := atproto.NewLayerRecord(
271271- layer.Digest,
272272- layer.Size,
273273- layer.MediaType,
274274- req.Repository,
275275- req.UserDID,
276276- req.UserHandle,
277277- )
266266+ // Validate operation
267267+ if operation != "push" && operation != "pull" {
268268+ RespondError(w, http.StatusBadRequest, fmt.Sprintf("invalid operation: %s (must be 'push' or 'pull')", operation))
269269+ return
270270+ }
271271+272272+ var layersCreated int
273273+ var postCreated bool
274274+ var postURI string
278275279279- _, _, err := h.pds.CreateLayerRecord(ctx, record)
280280- if err != nil {
281281- slog.Error("Failed to create layer record", "error", err)
282282- // Continue creating other records
276276+ // Only create layer records and Bluesky posts for pushes
277277+ if operation == "push" {
278278+ // Check if manifest posts are enabled
279279+ // Read from captain record (which is synced with HOLD_BLUESKY_POSTS_ENABLED env var)
280280+ postsEnabled := false
281281+ _, captain, err := h.pds.GetCaptainRecord(ctx)
282282+ if err == nil {
283283+ postsEnabled = captain.EnableBlueskyPosts
283284 } else {
284284- layersCreated++
285285+ // Fallback to env var if captain record doesn't exist (shouldn't happen in normal operation)
286286+ postsEnabled = h.enableBlueskyPosts
285287 }
286286- }
287288288288- // Check if this is a multi-arch image (has manifests instead of layers)
289289- isMultiArch := len(req.Manifest.Manifests) > 0
289289+ // Create layer records for each blob
290290+ for _, layer := range req.Manifest.Layers {
291291+ record := atproto.NewLayerRecord(
292292+ layer.Digest,
293293+ layer.Size,
294294+ layer.MediaType,
295295+ req.Repository,
296296+ req.UserDID,
297297+ req.UserHandle,
298298+ )
290299291291- // Calculate total size from all layers (for single-arch images)
292292- var totalSize int64
293293- for _, layer := range req.Manifest.Layers {
294294- totalSize += layer.Size
295295- }
296296- totalSize += req.Manifest.Config.Size // Add config blob size
297297-298298- // Extract platforms for multi-arch images
299299- var platforms []string
300300- if isMultiArch {
301301- for _, m := range req.Manifest.Manifests {
302302- if m.Platform != nil {
303303- platforms = append(platforms, m.Platform.OS+"/"+m.Platform.Architecture)
300300+ _, _, err := h.pds.CreateLayerRecord(ctx, record)
301301+ if err != nil {
302302+ slog.Error("Failed to create layer record", "error", err)
303303+ // Continue creating other records
304304+ } else {
305305+ layersCreated++
304306 }
305307 }
306306- }
308308+309309+ // Check if this is a multi-arch image (has manifests instead of layers)
310310+ isMultiArch := len(req.Manifest.Manifests) > 0
307311308308- // Create Bluesky post if enabled
309309- var postURI string
310310- postCreated := false
311311- if postsEnabled {
312312- // Extract manifest digest from first layer (or use config digest as fallback)
313313- manifestDigest := req.Manifest.Config.Digest
314314- if len(req.Manifest.Layers) > 0 {
315315- manifestDigest = req.Manifest.Layers[0].Digest
312312+ // Calculate total size from all layers (for single-arch images)
313313+ var totalSize int64
314314+ for _, layer := range req.Manifest.Layers {
315315+ totalSize += layer.Size
316316 }
317317+ totalSize += req.Manifest.Config.Size // Add config blob size
317318318318- postURI, err = h.pds.CreateManifestPost(
319319- ctx,
320320- h.driver,
321321- req.Repository,
322322- req.Tag,
323323- req.UserHandle,
324324- req.UserDID,
325325- manifestDigest,
326326- totalSize,
327327- platforms,
328328- )
329329- if err != nil {
330330- slog.Error("Failed to create manifest post", "error", err)
331331- } else {
332332- postCreated = true
319319+ // Extract platforms for multi-arch images
320320+ var platforms []string
321321+ if isMultiArch {
322322+ for _, m := range req.Manifest.Manifests {
323323+ if m.Platform != nil {
324324+ platforms = append(platforms, m.Platform.OS+"/"+m.Platform.Architecture)
325325+ }
326326+ }
327327+ }
328328+329329+ // Create Bluesky post if enabled
330330+ if postsEnabled {
331331+ // Extract manifest digest from first layer (or use config digest as fallback)
332332+ manifestDigest := req.Manifest.Config.Digest
333333+ if len(req.Manifest.Layers) > 0 {
334334+ manifestDigest = req.Manifest.Layers[0].Digest
335335+ }
336336+337337+ postURI, err = h.pds.CreateManifestPost(
338338+ ctx,
339339+ h.driver,
340340+ req.Repository,
341341+ req.Tag,
342342+ req.UserHandle,
343343+ req.UserDID,
344344+ manifestDigest,
345345+ totalSize,
346346+ platforms,
347347+ )
348348+ if err != nil {
349349+ slog.Error("Failed to create manifest post", "error", err)
350350+ } else {
351351+ postCreated = true
352352+ }
333353 }
354354+ }
355355+356356+ // ALWAYS increment stats (even if Bluesky posts disabled, even for pulls)
357357+ statsUpdated := false
358358+ if err := h.pds.IncrementStats(ctx, req.UserDID, req.Repository, operation); err != nil {
359359+ slog.Error("Failed to increment stats", "operation", operation, "error", err)
360360+ } else {
361361+ statsUpdated = true
334362 }
335363336364 // Return response
337365 resp := map[string]any{
338338- "success": layersCreated > 0 || postCreated,
339339- "layersCreated": layersCreated,
340340- "postCreated": postCreated,
366366+ "success": statsUpdated || layersCreated > 0 || postCreated,
367367+ "operation": operation,
368368+ "statsUpdated": statsUpdated,
369369+ }
370370+371371+ // Only include push-specific fields for push operations
372372+ if operation == "push" {
373373+ resp["layersCreated"] = layersCreated
374374+ resp["postCreated"] = postCreated
375375+ if postURI != "" {
376376+ resp["postUri"] = postURI
377377+ }
378378+ }
379379+380380+ RespondJSON(w, http.StatusOK, resp)
381381+}
382382+383383+// HandleSetStats sets absolute stats values for a repository (used by migration)
384384+// This is a migration-only endpoint that allows AppView to sync existing stats to holds
385385+func (h *XRPCHandler) HandleSetStats(w http.ResponseWriter, r *http.Request) {
386386+ ctx := r.Context()
387387+388388+ // Validate service token (same auth as blob:write endpoints)
389389+ validatedUser, err := pds.ValidateBlobWriteAccess(r, h.pds, h.httpClient)
390390+ if err != nil {
391391+ RespondError(w, http.StatusForbidden, fmt.Sprintf("authorization failed: %v", err))
392392+ return
341393 }
342342- if postURI != "" {
343343- resp["postUri"] = postURI
394394+395395+ // Parse request
396396+ var req struct {
397397+ OwnerDID string `json:"ownerDid"`
398398+ Repository string `json:"repository"`
399399+ PullCount int64 `json:"pullCount"`
400400+ PushCount int64 `json:"pushCount"`
401401+ LastPull string `json:"lastPull,omitempty"`
402402+ LastPush string `json:"lastPush,omitempty"`
344403 }
345345- if err != nil && layersCreated == 0 && !postCreated {
346346- resp["error"] = err.Error()
404404+405405+ if err := DecodeJSON(r, &req); err != nil {
406406+ RespondError(w, http.StatusBadRequest, err.Error())
407407+ return
347408 }
348409349349- RespondJSON(w, http.StatusOK, resp)
410410+ // Verify user DID matches token (user can only set stats for their own repos)
411411+ if req.OwnerDID != validatedUser.DID {
412412+ RespondError(w, http.StatusForbidden, "owner DID mismatch")
413413+ return
414414+ }
415415+416416+ // Validate required fields
417417+ if req.OwnerDID == "" || req.Repository == "" {
418418+ RespondError(w, http.StatusBadRequest, "ownerDid and repository are required")
419419+ return
420420+ }
421421+422422+ // Set stats using the SetStats method
423423+ if err := h.pds.SetStats(ctx, req.OwnerDID, req.Repository, req.PullCount, req.PushCount, req.LastPull, req.LastPush); err != nil {
424424+ slog.Error("Failed to set stats", "error", err)
425425+ RespondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to set stats: %v", err))
426426+ return
427427+ }
428428+429429+ slog.Info("Stats set via migration", "owner_did", req.OwnerDID, "repository", req.Repository, "pull_count", req.PullCount, "push_count", req.PushCount)
430430+431431+ RespondJSON(w, http.StatusOK, map[string]any{
432432+ "success": true,
433433+ })
350434}
351435352436// requireBlobWriteAccess middleware - validates DPoP + OAuth and checks for blob:write permission
+2-1
pkg/hold/pds/server.go
···2222// init registers our custom ATProto types with indigo's lexutil type registry
2323// This allows repomgr.GetRecord to automatically unmarshal our types
2424func init() {
2525- // Register captain, crew, tangled profile, and layer record types
2525+ // Register captain, crew, tangled profile, layer, and stats record types
2626 // These must match the $type field in the records
2727 lexutil.RegisterType(atproto.CaptainCollection, &atproto.CaptainRecord{})
2828 lexutil.RegisterType(atproto.CrewCollection, &atproto.CrewRecord{})
2929 lexutil.RegisterType(atproto.LayerCollection, &atproto.LayerRecord{})
3030 lexutil.RegisterType(atproto.TangledProfileCollection, &atproto.TangledProfileRecord{})
3131+ lexutil.RegisterType(atproto.StatsCollection, &atproto.StatsRecord{})
3132}
32333334// HoldPDS is a minimal ATProto PDS implementation for a hold service
+218
pkg/hold/pds/stats.go
···11+package pds
22+33+import (
44+ "bytes"
55+ "context"
66+ "errors"
77+ "fmt"
88+ "log/slog"
99+ "strings"
1010+ "time"
1111+1212+ "atcr.io/pkg/atproto"
1313+ "github.com/bluesky-social/indigo/repo"
1414+ "github.com/ipfs/go-cid"
1515+)
1616+1717+// IncrementStats increments the pull or push count for a repository
1818+// operation should be "pull" or "push"
1919+// Creates a new record if none exists, updates existing record otherwise
2020+func (p *HoldPDS) IncrementStats(ctx context.Context, ownerDID, repository, operation string) error {
2121+ if operation != "pull" && operation != "push" {
2222+ return fmt.Errorf("invalid operation: %s (must be 'pull' or 'push')", operation)
2323+ }
2424+2525+ rkey := atproto.StatsRecordKey(ownerDID, repository)
2626+ now := time.Now().Format(time.RFC3339)
2727+2828+ // Try to get existing record
2929+ _, existing, err := p.GetStats(ctx, ownerDID, repository)
3030+ if err != nil {
3131+ // Record doesn't exist - create new one
3232+ record := atproto.NewStatsRecord(ownerDID, repository)
3333+ if operation == "pull" {
3434+ record.PullCount = 1
3535+ record.LastPull = now
3636+ } else {
3737+ record.PushCount = 1
3838+ record.LastPush = now
3939+ }
4040+ record.UpdatedAt = now
4141+4242+ _, _, err := p.repomgr.PutRecord(ctx, p.uid, atproto.StatsCollection, rkey, record)
4343+ if err != nil {
4444+ return fmt.Errorf("failed to create stats record: %w", err)
4545+ }
4646+4747+ slog.Debug("Created stats record",
4848+ "ownerDID", ownerDID,
4949+ "repository", repository,
5050+ "operation", operation)
5151+ return nil
5252+ }
5353+5454+ // Record exists - update it
5555+ if operation == "pull" {
5656+ existing.PullCount++
5757+ existing.LastPull = now
5858+ } else {
5959+ existing.PushCount++
6060+ existing.LastPush = now
6161+ }
6262+ existing.UpdatedAt = now
6363+6464+ _, err = p.repomgr.UpdateRecord(ctx, p.uid, atproto.StatsCollection, rkey, existing)
6565+ if err != nil {
6666+ return fmt.Errorf("failed to update stats record: %w", err)
6767+ }
6868+6969+ slog.Debug("Updated stats record",
7070+ "ownerDID", ownerDID,
7171+ "repository", repository,
7272+ "operation", operation,
7373+ "pullCount", existing.PullCount,
7474+ "pushCount", existing.PushCount)
7575+ return nil
7676+}
7777+7878+// GetStats retrieves the stats record for a repository
7979+// Returns nil, nil if no stats record exists
8080+func (p *HoldPDS) GetStats(ctx context.Context, ownerDID, repository string) (cid.Cid, *atproto.StatsRecord, error) {
8181+ rkey := atproto.StatsRecordKey(ownerDID, repository)
8282+8383+ recordCID, val, err := p.repomgr.GetRecord(ctx, p.uid, atproto.StatsCollection, rkey, cid.Undef)
8484+ if err != nil {
8585+ return cid.Undef, nil, err
8686+ }
8787+8888+ statsRecord, ok := val.(*atproto.StatsRecord)
8989+ if !ok {
9090+ return cid.Undef, nil, fmt.Errorf("unexpected type for stats record: %T", val)
9191+ }
9292+9393+ return recordCID, statsRecord, nil
9494+}
9595+9696+// SetStats directly sets the stats for a repository (used for migration)
9797+// Creates or updates the stats record with the specified counts
9898+func (p *HoldPDS) SetStats(ctx context.Context, ownerDID, repository string, pullCount, pushCount int64, lastPull, lastPush string) error {
9999+ rkey := atproto.StatsRecordKey(ownerDID, repository)
100100+ now := time.Now().Format(time.RFC3339)
101101+102102+ // Try to get existing record
103103+ _, existing, err := p.GetStats(ctx, ownerDID, repository)
104104+ if err != nil {
105105+ // Record doesn't exist - create new one
106106+ record := &atproto.StatsRecord{
107107+ Type: atproto.StatsCollection,
108108+ OwnerDID: ownerDID,
109109+ Repository: repository,
110110+ PullCount: pullCount,
111111+ PushCount: pushCount,
112112+ LastPull: lastPull,
113113+ LastPush: lastPush,
114114+ UpdatedAt: now,
115115+ }
116116+117117+ _, _, err := p.repomgr.PutRecord(ctx, p.uid, atproto.StatsCollection, rkey, record)
118118+ if err != nil {
119119+ return fmt.Errorf("failed to create stats record: %w", err)
120120+ }
121121+ return nil
122122+ }
123123+124124+ // Record exists - update it
125125+ existing.PullCount = pullCount
126126+ existing.PushCount = pushCount
127127+ existing.LastPull = lastPull
128128+ existing.LastPush = lastPush
129129+ existing.UpdatedAt = now
130130+131131+ _, err = p.repomgr.UpdateRecord(ctx, p.uid, atproto.StatsCollection, rkey, existing)
132132+ if err != nil {
133133+ return fmt.Errorf("failed to update stats record: %w", err)
134134+ }
135135+136136+ return nil
137137+}
138138+139139+// ListStats returns all stats records in the hold's PDS
140140+// This is used by AppView to aggregate stats from all holds
141141+func (p *HoldPDS) ListStats(ctx context.Context) ([]*atproto.StatsRecord, error) {
142142+ // Get read-only session from carstore
143143+ session, err := p.carstore.ReadOnlySession(p.uid)
144144+ if err != nil {
145145+ return nil, fmt.Errorf("failed to get read-only session: %w", err)
146146+ }
147147+148148+ // Get repo head
149149+ head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
150150+ if err != nil {
151151+ return nil, fmt.Errorf("failed to get repo head: %w", err)
152152+ }
153153+154154+ if !head.Defined() {
155155+ // No repo yet, return empty list
156156+ return []*atproto.StatsRecord{}, nil
157157+ }
158158+159159+ // Open repo
160160+ r, err := repo.OpenRepo(ctx, session, head)
161161+ if err != nil {
162162+ return nil, fmt.Errorf("failed to open repo: %w", err)
163163+ }
164164+165165+ var stats []*atproto.StatsRecord
166166+167167+ // Iterate over all stats records
168168+ err = r.ForEach(ctx, atproto.StatsCollection, func(k string, v cid.Cid) error {
169169+ // Extract collection and rkey from full path (k is like "io.atcr.hold.stats/abcd1234...")
170170+ parts := strings.Split(k, "/")
171171+ if len(parts) < 2 {
172172+ return nil // Skip invalid keys
173173+ }
174174+175175+ // Extract actual collection
176176+ actualCollection := strings.Join(parts[:len(parts)-1], "/")
177177+178178+ // MST keys are sorted, so once we hit a different collection, stop walking
179179+ if actualCollection != atproto.StatsCollection {
180180+ return repo.ErrDoneIterating
181181+ }
182182+183183+ // Get record bytes
184184+ _, recBytes, err := r.GetRecordBytes(ctx, k)
185185+ if err != nil {
186186+ slog.Warn("Failed to get stats record bytes", "key", k, "error", err)
187187+ return nil // Continue with other records
188188+ }
189189+190190+ if recBytes == nil {
191191+ return nil
192192+ }
193193+194194+ // Unmarshal the CBOR bytes
195195+ var statsRecord atproto.StatsRecord
196196+ if err := statsRecord.UnmarshalCBOR(bytes.NewReader(*recBytes)); err != nil {
197197+ slog.Warn("Failed to unmarshal stats record", "key", k, "error", err)
198198+ return nil // Continue with other records
199199+ }
200200+201201+ stats = append(stats, &statsRecord)
202202+ return nil
203203+ })
204204+205205+ if err != nil {
206206+ // ErrDoneIterating is expected when we stop walking early
207207+ if errors.Is(err, repo.ErrDoneIterating) {
208208+ // Successfully stopped at collection boundary
209209+ } else if strings.Contains(err.Error(), "not found") {
210210+ // Collection doesn't exist yet - return empty list
211211+ return []*atproto.StatsRecord{}, nil
212212+ } else {
213213+ return nil, fmt.Errorf("failed to iterate stats records: %w", err)
214214+ }
215215+ }
216216+217217+ return stats, nil
218218+}