A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory

Update shard cache: replace the mutex-guarded map with sync.Map plus per-shard atomic refcounts, so lookups release shards explicitly and eviction only unmaps shards with refCount == 0

+160 -101
+157 -98
internal/didindex/manager.go
··· 7 7 "os" 8 8 "path/filepath" 9 9 "sort" 10 + "sync/atomic" 10 11 "syscall" 11 12 "time" 12 13 ··· 41 42 indexDir: indexDir, 42 43 shardDir: shardDir, 43 44 configPath: configPath, 44 - shardCache: make(map[uint8]*mmapShard), 45 45 maxCache: 5, 46 46 evictionThreshold: 5, 47 47 config: config, ··· 51 51 52 52 // Close unmaps all shards and cleans up 53 53 func (dim *Manager) Close() error { 54 - dim.cacheMu.Lock() 55 - defer dim.cacheMu.Unlock() 54 + // Mark all shards for eviction 55 + var shards []*mmapShard 56 + 57 + dim.shardCache.Range(func(key, value interface{}) bool { 58 + shard := value.(*mmapShard) 59 + shards = append(shards, shard) 60 + dim.shardCache.Delete(key) 61 + return true 62 + }) 56 63 57 - for _, shard := range dim.shardCache { 64 + // Wait for refcounts to drop to 0 65 + for _, shard := range shards { 66 + for atomic.LoadInt64(&shard.refCount) > 0 { 67 + time.Sleep(1 * time.Millisecond) 68 + } 58 69 dim.unmapShard(shard) 59 70 } 60 - 61 - dim.shardCache = make(map[uint8]*mmapShard) 62 71 63 72 return nil 64 73 } ··· 69 78 70 79 // GetDIDLocations returns all bundle+position locations for a DID 71 80 func (dim *Manager) GetDIDLocations(did string) ([]OpLocation, error) { 72 - dim.indexMu.RLock() 73 - defer dim.indexMu.RUnlock() 74 - 75 - // Validate and extract identifier 76 81 identifier, err := extractDIDIdentifier(did) 77 82 if err != nil { 78 83 return nil, err 79 84 } 80 85 81 - // Calculate shard number 82 86 shardNum := dim.calculateShard(identifier) 83 87 if dim.verbose { 84 88 dim.logger.Printf("DEBUG: DID %s -> identifier '%s' -> shard %02x", did, identifier, shardNum) 85 89 } 86 90 87 - // Load shard 88 91 shard, err := dim.loadShard(shardNum) 89 92 if err != nil { 90 93 if dim.verbose { ··· 93 96 return nil, fmt.Errorf("failed to load shard %02x: %w", shardNum, err) 94 97 } 95 98 99 + // CRITICAL: Release shard when done 100 + defer dim.releaseShard(shard) 101 + 96 102 if shard.data == nil { 97 103 if dim.verbose { 98 104 
dim.logger.Printf("DEBUG: Shard %02x has no data (empty shard)", shardNum) ··· 104 110 dim.logger.Printf("DEBUG: Shard %02x loaded, size: %d bytes", shardNum, len(shard.data)) 105 111 } 106 112 107 - // Binary search 113 + // ✅ Safe to read - refcount prevents eviction 108 114 locations := dim.searchShard(shard, identifier) 115 + 109 116 if dim.verbose { 110 117 dim.logger.Printf("DEBUG: Binary search found %d locations", len(locations)) 111 118 if len(locations) > 0 { ··· 113 120 } 114 121 } 115 122 116 - dim.cacheMu.RLock() 117 - cacheSize := len(dim.shardCache) 118 - dim.cacheMu.RUnlock() 119 - 120 - if dim.verbose || cacheSize > dim.maxCache { 121 - dim.logger.Printf("DEBUG: Shard cache size: %d/%d shards (after lookup)", cacheSize, dim.maxCache) 122 - } 123 - 124 123 return locations, nil 125 124 } 126 125 ··· 134 133 135 134 // loadShard loads a shard from cache or disk (with madvise optimization) 136 135 func (dim *Manager) loadShard(shardNum uint8) (*mmapShard, error) { 137 - dim.cacheMu.Lock() 138 - defer dim.cacheMu.Unlock() 136 + // Fast path: cache hit 137 + if val, ok := dim.shardCache.Load(shardNum); ok { 138 + shard := val.(*mmapShard) 139 139 140 - // Check cache 141 - if shard, exists := dim.shardCache[shardNum]; exists { 142 - shard.lastUsed = time.Now() 143 - shard.accessCount++ // Track for eviction 140 + // Increment refcount BEFORE returning 141 + atomic.AddInt64(&shard.refCount, 1) 142 + atomic.StoreInt64(&shard.lastUsed, time.Now().Unix()) 143 + atomic.AddInt64(&shard.accessCount, 1) 144 + 144 145 return shard, nil 145 146 } 146 147 147 - // Load from disk 148 + // Cache miss - load from disk 148 149 shardPath := filepath.Join(dim.shardDir, fmt.Sprintf("%02x.idx", shardNum)) 149 150 150 - // Check if file exists 151 151 if _, err := os.Stat(shardPath); os.IsNotExist(err) { 152 + // Empty shard - no refcount needed 152 153 return &mmapShard{ 153 154 shardNum: shardNum, 154 155 data: nil, 155 - lastUsed: time.Now(), 156 + lastUsed: 
time.Now().Unix(), 157 + refCount: 0, // Not in cache 156 158 }, nil 157 159 } 158 160 159 - // Open file 160 161 file, err := os.Open(shardPath) 161 162 if err != nil { 162 163 return nil, err 163 164 } 164 165 165 - // Get file size 166 166 info, err := file.Stat() 167 167 if err != nil { 168 168 file.Close() ··· 174 174 return &mmapShard{ 175 175 shardNum: shardNum, 176 176 data: nil, 177 - lastUsed: time.Now(), 177 + lastUsed: time.Now().Unix(), 178 + refCount: 0, 178 179 }, nil 179 180 } 180 181 ··· 186 187 return nil, fmt.Errorf("mmap failed: %w", err) 187 188 } 188 189 189 - // ✨ NEW: Apply madvise hints 190 190 if err := dim.applyMadviseHints(data, info.Size()); err != nil { 191 191 if dim.verbose { 192 192 dim.logger.Printf("DEBUG: madvise failed (non-fatal): %v", err) ··· 197 197 shardNum: shardNum, 198 198 data: data, 199 199 file: file, 200 - lastUsed: time.Now(), 200 + lastUsed: time.Now().Unix(), 201 201 accessCount: 1, 202 + refCount: 1, 202 203 } 203 204 204 - // Add to cache 205 - dim.shardCache[shardNum] = shard 205 + // Try to store 206 + actual, loaded := dim.shardCache.LoadOrStore(shardNum, shard) 206 207 207 - // Lazy eviction 208 - if len(dim.shardCache) > dim.evictionThreshold { 209 - dim.evictMultiple(len(dim.shardCache) - dim.maxCache) 208 + if loaded { 209 + // Someone else loaded it - cleanup ours 210 + dim.unmapShard(shard) 211 + 212 + actualShard := actual.(*mmapShard) 213 + atomic.AddInt64(&actualShard.refCount, 1) // Increment their refcount 214 + atomic.StoreInt64(&actualShard.lastUsed, time.Now().Unix()) 215 + atomic.AddInt64(&actualShard.accessCount, 1) 216 + return actualShard, nil 210 217 } 218 + 219 + // We stored it - maybe evict 220 + go dim.evictIfNeeded() // Run async to avoid blocking 211 221 212 222 return shard, nil 213 223 } ··· 239 249 } 240 250 241 251 return nil 242 - } 243 - 244 - // evictMultiple evicts multiple shards at once 245 - func (dim *Manager) evictMultiple(count int) { 246 - if count <= 0 { 247 - return 
248 - } 249 - 250 - type entry struct { 251 - num uint8 252 - lastUsed time.Time 253 - } 254 - 255 - entries := make([]entry, 0, len(dim.shardCache)) 256 - for num, shard := range dim.shardCache { 257 - entries = append(entries, entry{num, shard.lastUsed}) 258 - } 259 - 260 - // Sort by lastUsed (oldest first) 261 - sort.Slice(entries, func(i, j int) bool { 262 - return entries[i].lastUsed.Before(entries[j].lastUsed) 263 - }) 264 - 265 - // Evict oldest 'count' entries 266 - for i := 0; i < count && i < len(entries); i++ { 267 - if victim, exists := dim.shardCache[entries[i].num]; exists { 268 - dim.unmapShard(victim) 269 - delete(dim.shardCache, entries[i].num) 270 - } 271 - } 272 252 } 273 253 274 254 // searchShard performs optimized binary search using prefix index ··· 478 458 479 459 // GetStats returns index statistics 480 460 func (dim *Manager) GetStats() map[string]interface{} { 481 - dim.cacheMu.RLock() 482 - defer dim.cacheMu.RUnlock() 461 + cachedShards := make([]int, 0) 462 + 463 + dim.shardCache.Range(func(key, value interface{}) bool { 464 + cachedShards = append(cachedShards, int(key.(uint8))) 465 + return true 466 + }) 483 467 484 - cachedShards := make([]int, 0, len(dim.shardCache)) 485 - for num := range dim.shardCache { 486 - cachedShards = append(cachedShards, int(num)) 487 - } 488 468 sort.Ints(cachedShards) 489 469 490 470 return map[string]interface{}{ 491 471 "total_dids": dim.config.TotalDIDs, 492 472 "last_bundle": dim.config.LastBundle, 493 473 "shard_count": dim.config.ShardCount, 494 - "cached_shards": len(dim.shardCache), 474 + "cached_shards": len(cachedShards), 495 475 "cache_limit": dim.maxCache, 496 476 "cache_order": cachedShards, 497 477 "updated_at": dim.config.UpdatedAt, ··· 506 486 507 487 // TrimCache trims cache to keep only most recent shard 508 488 func (dim *Manager) TrimCache() { 509 - dim.cacheMu.Lock() 510 - defer dim.cacheMu.Unlock() 489 + // Count current size 490 + size := 0 491 + dim.shardCache.Range(func(k, v 
interface{}) bool { 492 + size++ 493 + return true 494 + }) 511 495 512 - if len(dim.shardCache) <= 1 { 496 + if size <= 1 { 513 497 return 514 498 } 515 499 516 - // Find most recent shard to keep 517 - var newestTime time.Time 500 + // Find most recent shard 501 + var newestTime int64 518 502 var keepNum uint8 519 - for num, shard := range dim.shardCache { 520 - if shard.lastUsed.After(newestTime) { 521 - newestTime = shard.lastUsed 522 - keepNum = num 503 + 504 + dim.shardCache.Range(func(key, value interface{}) bool { 505 + shard := value.(*mmapShard) 506 + lastUsed := atomic.LoadInt64(&shard.lastUsed) 507 + if lastUsed > newestTime { 508 + newestTime = lastUsed 509 + keepNum = key.(uint8) 523 510 } 524 - } 511 + return true 512 + }) 525 513 526 - // Evict all except the newest 527 - for num, shard := range dim.shardCache { 514 + // Evict all except newest 515 + dim.shardCache.Range(func(key, value interface{}) bool { 516 + num := key.(uint8) 528 517 if num != keepNum { 529 - dim.unmapShard(shard) 530 - delete(dim.shardCache, num) 518 + if val, ok := dim.shardCache.LoadAndDelete(key); ok { 519 + shard := val.(*mmapShard) 520 + dim.unmapShard(shard) 521 + } 531 522 } 532 - } 523 + return true 524 + }) 533 525 } 534 526 535 527 // GetConfig returns the index configuration ··· 578 570 return nil 579 571 } 580 572 581 - // invalidateShard removes a shard from cache 582 573 func (dim *Manager) invalidateShard(shardNum uint8) { 583 - dim.cacheMu.Lock() 584 - defer dim.cacheMu.Unlock() 574 + if val, ok := dim.shardCache.LoadAndDelete(shardNum); ok { 575 + shard := val.(*mmapShard) 576 + 577 + for atomic.LoadInt64(&shard.refCount) > 0 { 578 + time.Sleep(1 * time.Millisecond) 579 + } 585 580 586 - if cached, exists := dim.shardCache[shardNum]; exists { 587 - dim.unmapShard(cached) 588 - delete(dim.shardCache, shardNum) 581 + dim.unmapShard(shard) 589 582 } 590 583 } 591 584 ··· 806 799 807 800 return &config, nil 808 801 } 802 + 803 + func (dim *Manager) evictIfNeeded() 
{ 804 + size := 0 805 + dim.shardCache.Range(func(_, _ interface{}) bool { 806 + size++ 807 + return true 808 + }) 809 + 810 + if size <= dim.evictionThreshold { 811 + return 812 + } 813 + 814 + type entry struct { 815 + num uint8 816 + lastUsed int64 817 + refCount int64 818 + } 819 + 820 + var entries []entry 821 + 822 + dim.shardCache.Range(func(key, value interface{}) bool { 823 + shard := value.(*mmapShard) 824 + entries = append(entries, entry{ 825 + num: key.(uint8), 826 + lastUsed: atomic.LoadInt64(&shard.lastUsed), 827 + refCount: atomic.LoadInt64(&shard.refCount), 828 + }) 829 + return true 830 + }) 831 + 832 + // Sort by lastUsed (oldest first) 833 + sort.Slice(entries, func(i, j int) bool { 834 + return entries[i].lastUsed < entries[j].lastUsed 835 + }) 836 + 837 + // Evict oldest shards that are NOT in use 838 + toEvict := size - dim.maxCache 839 + evicted := 0 840 + 841 + for i := 0; i < len(entries) && evicted < toEvict; i++ { 842 + // Only evict if refCount == 0 (not in use) 843 + if entries[i].refCount == 0 { 844 + if val, ok := dim.shardCache.LoadAndDelete(entries[i].num); ok { 845 + shard := val.(*mmapShard) 846 + 847 + // Double-check refcount (race protection) 848 + if atomic.LoadInt64(&shard.refCount) == 0 { 849 + dim.unmapShard(shard) 850 + evicted++ 851 + } else { 852 + // Someone started using it - put it back 853 + dim.shardCache.Store(entries[i].num, shard) 854 + } 855 + } 856 + } 857 + } 858 + } 859 + 860 + // releaseShard decrements reference count 861 + func (dim *Manager) releaseShard(shard *mmapShard) { 862 + if shard == nil || shard.data == nil { 863 + return 864 + } 865 + 866 + atomic.AddInt64(&shard.refCount, -1) 867 + }
+3 -3
internal/didindex/types.go
··· 36 36 configPath string 37 37 38 38 // LRU cache for hot shards 39 - shardCache map[uint8]*mmapShard 39 + shardCache sync.Map 40 40 maxCache int 41 - cacheMu sync.RWMutex 42 41 evictionThreshold int 43 42 44 43 config *Config ··· 53 52 shardNum uint8 54 53 data []byte 55 54 file interface{} // *os.File (avoid import) 56 - lastUsed time.Time 55 + lastUsed int64 57 56 accessCount int64 57 + refCount int64 58 58 } 59 59 60 60 // Config stores index metadata