A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.

start researching quotas based on layer size per DID

evan.jarrett.net 53e196a2 f74bc301

+846 -1304
docs/QUOTAS.md  +324 -1073
··· 1 1 # ATCR Quota System 2 2 3 - This document describes ATCR's storage quota implementation, inspired by Harbor's proven approach to per-project blob tracking with deduplication. 3 + This document describes ATCR's storage quota implementation using ATProto records for per-user layer tracking. 4 4 5 5 ## Table of Contents 6 6 7 7 - [Overview](#overview) 8 - - [Harbor's Approach (Reference Implementation)](#harbors-approach-reference-implementation) 9 - - [Storage Options](#storage-options) 10 - - [Quota Data Model](#quota-data-model) 11 - - [Push Flow (Detailed)](#push-flow-detailed) 8 + - [Quota Model](#quota-model) 9 + - [Layer Record Schema](#layer-record-schema) 10 + - [Quota Calculation](#quota-calculation) 11 + - [Push Flow](#push-flow) 12 12 - [Delete Flow](#delete-flow) 13 13 - [Garbage Collection](#garbage-collection) 14 - - [Quota Reconciliation](#quota-reconciliation) 15 14 - [Configuration](#configuration) 16 - - [Trade-offs & Design Decisions](#trade-offs--design-decisions) 17 15 - [Future Enhancements](#future-enhancements) 18 16 19 17 ## Overview 20 18 21 19 ATCR implements per-user storage quotas to: 22 20 1. **Limit storage consumption** on shared hold services 23 - 2. **Track actual S3 costs** (what new data was added) 24 - 3. **Benefit from deduplication** (users only pay once per layer) 25 - 4. **Provide transparency** (show users their storage usage) 21 + 2. **Provide transparency** (show users their storage usage) 22 + 3. **Enable fair billing** (users pay for what they use) 26 23 27 - **Key principle:** Users pay for layers they've uploaded, but only ONCE per layer regardless of how many images reference it. 24 + **Key principle:** Users pay for layers they reference, deduplicated per-user. If you push the same layer in multiple images, you only pay once. 28 25 29 26 ### Example Scenario 30 27 31 28 ``` 32 29 Alice pushes myapp:v1 (layers A, B, C - each 100MB) 33 - → Alice's quota: +300MB (all new layers) 30 + → Creates 3 layer records in hold's PDS 31 + → Alice's quota: 300MB (3 unique layers) 34 32 35 33 Alice pushes myapp:v2 (layers A, B, D) 36 - → Layers A, B already claimed by Alice 37 - → Layer D is new (100MB) 38 - → Alice's quota: +100MB (only D is new) 39 - → Total: 400MB 34 + → Creates 3 more layer records (A, B again, plus D) 35 + → Alice's quota: 400MB (4 unique layers: A, B, C, D) 36 + → Layers A, B appear twice in records but deduplicated in quota calc 40 37 41 38 Bob pushes his-app:latest (layers A, E) 42 - → Layer A already exists in S3 (uploaded by Alice) 43 - → Bob claims it for first time → +100MB to Bob's quota 44 - → Layer E is new → +100MB to Bob's quota 45 - → Bob's quota: 200MB 39 + → Creates 2 layer records for Bob 40 + → Bob's quota: 200MB (2 unique layers: A, E) 41 + → Layer A shared with Alice in S3, but Bob pays for his own usage 46 42 47 - Physical S3 storage: 500MB (A, B, C, D, E) 48 - Claimed storage: 600MB (Alice: 400MB, Bob: 200MB) 49 - Deduplication savings: 100MB (layer A shared) 43 + Physical S3 storage: 500MB (A, B, C, D, E - deduplicated globally) 44 + Alice's quota: 400MB 45 + Bob's quota: 200MB 50 46 ``` 51 47 52 - ## Harbor's Approach (Reference Implementation) 48 + ## Quota Model 53 49 54 - Harbor is built on distribution/distribution (same as ATCR) and implements quotas as middleware. Their approach: 50 + ### Everyone Pays for What They Upload 55 51 56 - ### Key Insights from Harbor 52 + Each user is charged for all unique layers they reference, regardless of whether those layers exist in S3 from other users' uploads. 
57 53 58 - 1. **"Shared blobs are only computed once per project"** 59 - - Each project tracks which blobs it has uploaded 60 - - Same blob used in multiple images counts only once per project 61 - - Different projects claiming the same blob each pay for it 54 + **Why this model?** 55 + - **Simple mental model**: "I pushed 500MB of layers, I use 500MB of quota" 56 + - **Predictable**: Your quota doesn't change based on others' actions 57 + - **Clean deletion**: Delete manifest → layer records removed → quota freed 58 + - **No cross-user dependencies**: Users are isolated 62 59 63 - 2. **Quota checked when manifest is pushed** 64 - - Blobs upload first (presigned URLs, can't intercept) 65 - - Manifest pushed last → quota check happens here 66 - - Can reject manifest if quota exceeded (orphaned blobs cleaned by GC) 60 + **Trade-off:** 61 + - Total claimed storage can exceed physical S3 storage 62 + - This is acceptable - deduplication is an operational benefit for ATCR, not a billing feature 67 63 68 - 3. **Middleware-based implementation** 69 - - distribution/distribution has NO built-in quota support 70 - - Harbor added it as request preprocessing middleware 71 - - Uses database (PostgreSQL) or Redis for quota storage 64 + ### ATProto-Native Storage 72 65 73 - 4. **Per-project ownership model** 74 - - Blobs are physically deduplicated globally 75 - - Quota accounting is logical (per-project claims) 76 - - Total claimed storage can exceed physical storage 66 + Layer tracking uses ATProto records stored in the hold's embedded PDS: 67 + - **Collection**: `io.atcr.hold.layer` 68 + - **Repository**: Hold's DID (e.g., `did:web:hold01.atcr.io`) 69 + - **Records**: One per manifest-layer relationship (TID-based keys) 77 70 78 - ### References 71 + This approach: 72 + - Keeps quota data in ATProto (no separate database) 73 + - Enables standard ATProto sync/query mechanisms 74 + - Provides full audit trail of layer usage 79 75 80 - - Harbor Quota Documentation: https://goharbor.io/docs/1.10/administration/configure-project-quotas/ 81 - - Harbor Source: https://github.com/goharbor/harbor (see `src/controller/quota`) 76 + ## Layer Record Schema 82 77 83 - ## Storage Options 78 + ### LayerRecord 84 79 85 - The hold service needs to store quota data somewhere. Two options: 86 - 87 - ### Option 1: S3-Based Storage (Recommended for BYOS) 88 - 89 - Store quota metadata alongside blobs in the same S3 bucket: 80 + ```go 81 + // pkg/atproto/lexicon.go 90 82 91 - ``` 92 - Bucket structure: 93 - /docker/registry/v2/blobs/sha256/ab/abc123.../data ← actual blobs 94 - /atcr/quota/did:plc:alice.json ← quota tracking 95 - /atcr/quota/did:plc:bob.json 83 + type LayerRecord struct { 84 + Type string `json:"$type"` // "io.atcr.hold.layer" 85 + Digest string `json:"digest"` // Layer digest (sha256:abc123...) 
86 + Size int64 `json:"size"` // Size in bytes 87 + MediaType string `json:"mediaType"` // e.g., "application/vnd.oci.image.layer.v1.tar+gzip" 88 + Manifest string `json:"manifest"` // at://did:plc:alice/io.atcr.manifest/abc123 89 + UserDID string `json:"userDid"` // User's DID for quota grouping 90 + CreatedAt string `json:"createdAt"` // ISO 8601 timestamp 91 + } 96 92 ``` 97 93 98 - **Pros:** 99 - - ✅ No separate database needed 100 - - ✅ Single S3 bucket (better UX - no second bucket to configure) 101 - - ✅ Quota data lives with the blobs 102 - - ✅ Hold service stays relatively stateless 103 - - ✅ Works with any S3-compatible service (Storj, Minio, Upcloud, Fly.io) 104 - 105 - **Cons:** 106 - - ❌ Slower than local database (network round-trip) 107 - - ❌ Eventual consistency issues 108 - - ❌ Race conditions on concurrent updates 109 - - ❌ Extra S3 API costs (GET/PUT per upload) 94 + ### Record Key 110 95 111 - **Performance:** 112 - - Each blob upload: 1 HEAD (blob exists?) + 1 GET (quota) + 1 PUT (update quota) 113 - - Typical latency: 100-200ms total overhead 114 - - For high-throughput registries, consider SQLite 96 + Records use TID (timestamp-based ID) as the rkey. This means: 97 + - Multiple records can exist for the same layer (from different manifests) 98 + - Deduplication happens at query time, not storage time 99 + - Simple append-only writes on manifest push 115 100 116 - ### Option 2: SQLite Database (Recommended for Shared Holds) 101 + ### Example Records 117 102 118 - Local database in hold service: 119 - 120 - ```bash 121 - /var/lib/atcr/hold-quota.db 122 103 ``` 123 - 124 - **Pros:** 125 - - ✅ Fast local queries (no network latency) 126 - - ✅ ACID transactions (no race conditions) 127 - - ✅ Efficient for high-throughput registries 128 - - ✅ Can use foreign keys and joins 104 + Manifest A (layers X, Y, Z) → creates 3 records 105 + Manifest B (layers X, W) → creates 2 records 129 106 130 - **Cons:** 131 - - ❌ Makes hold service stateful (persistent volume needed) 132 - - ❌ Not ideal for ephemeral BYOS deployments 133 - - ❌ Backup/restore complexity 134 - - ❌ Multi-instance scaling requires shared database 135 - 136 - **Schema:** 137 - ```sql 138 - CREATE TABLE user_quotas ( 139 - did TEXT PRIMARY KEY, 140 - quota_limit INTEGER NOT NULL DEFAULT 10737418240, -- 10GB 141 - quota_used INTEGER NOT NULL DEFAULT 0, 142 - updated_at TIMESTAMP 143 - ); 144 - 145 - CREATE TABLE claimed_layers ( 146 - did TEXT NOT NULL, 147 - digest TEXT NOT NULL, 148 - size INTEGER NOT NULL, 149 - claimed_at TIMESTAMP, 150 - PRIMARY KEY(did, digest) 151 - ); 107 + io.atcr.hold.layer collection: 108 + ┌──────────────┬────────┬──────┬───────────────────────────────────┬─────────────────┐ 109 + │ rkey (TID) │ digest │ size │ manifest │ userDid │ 110 + ├──────────────┼────────┼──────┼───────────────────────────────────┼─────────────────┤ 111 + │ 3jui7...001 │ X │ 100 │ at://did:plc:alice/.../manifestA │ did:plc:alice │ 112 + │ 3jui7...002 │ Y │ 200 │ at://did:plc:alice/.../manifestA │ did:plc:alice │ 113 + │ 3jui7...003 │ Z │ 150 │ at://did:plc:alice/.../manifestA │ did:plc:alice │ 114 + │ 3jui7...004 │ X │ 100 │ at://did:plc:alice/.../manifestB │ did:plc:alice │ ← duplicate digest 115 + │ 3jui7...005 │ W │ 300 │ at://did:plc:alice/.../manifestB │ did:plc:alice │ 116 + └──────────────┴────────┴──────┴───────────────────────────────────┴─────────────────┘ 152 117 ``` 153 118 154 - ### Recommendation 119 + ## Quota Calculation 155 120 156 - - **BYOS (user-owned holds):** S3-based (keeps hold service 
ephemeral) 157 - - **Shared holds (multi-user):** SQLite (better performance and consistency) 158 - - **High-traffic production:** SQLite or PostgreSQL (Harbor uses this) 121 + ### Query: User's Unique Storage 159 122 160 - ## Quota Data Model 161 - 162 - ### Quota File Format (S3-based) 163 - 164 - ```json 165 - { 166 - "did": "did:plc:alice123", 167 - "limit": 10737418240, 168 - "used": 5368709120, 169 - "claimed_layers": { 170 - "sha256:abc123...": 104857600, 171 - "sha256:def456...": 52428800, 172 - "sha256:789ghi...": 209715200 173 - }, 174 - "last_updated": "2025-10-09T12:34:56Z", 175 - "version": 1 176 - } 123 + ```sql 124 + -- Calculate quota by deduplicating layers 125 + SELECT SUM(size) FROM ( 126 + SELECT DISTINCT digest, size 127 + FROM io.atcr.hold.layer 128 + WHERE userDid = ? 129 + ) 177 130 ``` 178 131 179 - **Fields:** 180 - - `did`: User's ATProto DID 181 - - `limit`: Maximum storage in bytes (default: 10GB) 182 - - `used`: Current storage usage in bytes (sum of claimed_layers) 183 - - `claimed_layers`: Map of digest → size for all layers user has uploaded 184 - - `last_updated`: Timestamp of last quota update 185 - - `version`: Schema version for future migrations 186 - 187 - ### Why Track Individual Layers? 188 - 189 - **Q: Can't we just track a counter?** 190 - 191 - **A: We need layer tracking for:** 192 - 193 - 1. **Deduplication detection** 194 - - Check if user already claimed a layer → free upload 195 - - Example: Updating an image reuses most layers 196 - 197 - 2. **Accurate deletes** 198 - - When manifest deleted, only decrement unclaimed layers 199 - - User may have 5 images sharing layer A - deleting 1 image doesn't free layer A 200 - 201 - 3. **Quota reconciliation** 202 - - Verify quota matches reality by listing user's manifests 203 - - Recalculate from layers in manifests vs claimed_layers map 132 + Using the example above: 133 + - Layer X appears twice but counted once: 100 134 + - Layers Y, Z, W counted once each: 200 + 150 + 300 135 + - **Total: 750 bytes** 204 136 205 - 4. **Auditing** 206 - - "Show me what I'm storing" 207 - - Users can see which layers consume their quota 208 - 209 - ## Push Flow (Detailed) 210 - 211 - ### Step-by-Step: User Pushes Image 212 - 213 - ``` 214 - ┌──────────┐ ┌──────────┐ ┌──────────┐ 215 - │ Client │ │ Hold │ │ S3 │ 216 - │ (Docker) │ │ Service │ │ Bucket │ 217 - └──────────┘ └──────────┘ └──────────┘ 218 - │ │ │ 219 - │ 1. PUT /v2/.../blobs/ │ │ 220 - │ upload?digest=sha256:abc│ │ 221 - ├───────────────────────────>│ │ 222 - │ │ │ 223 - │ │ 2. Check if blob exists │ 224 - │ │ (Stat/HEAD request) │ 225 - │ ├───────────────────────────>│ 226 - │ │<───────────────────────────┤ 227 - │ │ 200 OK (exists) or │ 228 - │ │ 404 Not Found │ 229 - │ │ │ 230 - │ │ 3. Read user quota │ 231 - │ │ GET /atcr/quota/{did} │ 232 - │ ├───────────────────────────>│ 233 - │ │<───────────────────────────┤ 234 - │ │ quota.json │ 235 - │ │ │ 236 - │ │ 4. Calculate quota impact │ 237 - │ │ - If digest in │ 238 - │ │ claimed_layers: 0 │ 239 - │ │ - Else: size │ 240 - │ │ │ 241 - │ │ 5. Check quota limit │ 242 - │ │ used + impact <= limit? │ 243 - │ │ │ 244 - │ │ 6. Update quota │ 245 - │ │ PUT /atcr/quota/{did} │ 246 - │ ├───────────────────────────>│ 247 - │ │<───────────────────────────┤ 248 - │ │ 200 OK │ 249 - │ │ │ 250 - │ 7. Presigned URL │ │ 251 - │<───────────────────────────┤ │ 252 - │ {url: "https://s3..."} │ │ 253 - │ │ │ 254 - │ 8. 
Upload blob to S3 │ │ 255 - ├────────────────────────────┼───────────────────────────>│ 256 - │ │ │ 257 - │ 9. 200 OK │ │ 258 - │<───────────────────────────┼────────────────────────────┤ 259 - │ │ │ 260 - ``` 261 - 262 - ### Implementation (Pseudocode) 137 + ### Implementation 263 138 264 139 ```go 265 - // cmd/hold/main.go - HandlePutPresignedURL 140 + // pkg/hold/quota/quota.go 266 141 267 - func (s *HoldService) HandlePutPresignedURL(w http.ResponseWriter, r *http.Request) { 268 - var req PutPresignedURLRequest 269 - json.NewDecoder(r.Body).Decode(&req) 142 + type QuotaManager struct { 143 + pds *pds.Server // Hold's embedded PDS 144 + } 270 145 271 - // Step 1: Check if blob already exists in S3 272 - blobPath := fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/%s/data", 273 - algorithm, digest[:2], digest) 274 - 275 - _, err := s.driver.Stat(ctx, blobPath) 276 - blobExists := (err == nil) 277 - 278 - // Step 2: Read quota from S3 (or SQLite) 279 - quota, err := s.quotaManager.GetQuota(req.DID) 146 + // GetUsage calculates a user's current quota usage 147 + func (q *QuotaManager) GetUsage(ctx context.Context, userDID string) (int64, error) { 148 + // List all layer records for this user 149 + records, err := q.pds.ListRecords(ctx, LayerCollection, userDID) 280 150 if err != nil { 281 - // First upload - create quota with defaults 282 - quota = &Quota{ 283 - DID: req.DID, 284 - Limit: s.config.QuotaDefaultLimit, 285 - Used: 0, 286 - ClaimedLayers: make(map[string]int64), 287 - } 151 + return 0, err 288 152 } 289 153 290 - // Step 3: Calculate quota impact 291 - quotaImpact := req.Size // Default: assume new layer 292 - 293 - if _, alreadyClaimed := quota.ClaimedLayers[req.Digest]; alreadyClaimed { 294 - // User already uploaded this layer before 295 - quotaImpact = 0 296 - log.Printf("Layer %s already claimed by %s, no quota impact", 297 - req.Digest, req.DID) 298 - } else if blobExists { 299 - // Blob exists in S3 (uploaded by another user) 300 - // But this user is claiming it for first time 301 - // Still counts against their quota 302 - log.Printf("Layer %s exists globally but new to %s, quota impact: %d", 303 - req.Digest, req.DID, quotaImpact) 304 - } else { 305 - // Brand new blob - will be uploaded to S3 306 - log.Printf("New layer %s for %s, quota impact: %d", 307 - req.Digest, req.DID, quotaImpact) 154 + // Deduplicate by digest 155 + uniqueLayers := make(map[string]int64) // digest -> size 156 + for _, record := range records { 157 + var layer LayerRecord 158 + if err := json.Unmarshal(record.Value, &layer); err != nil { 159 + continue 160 + } 161 + if layer.UserDID == userDID { 162 + uniqueLayers[layer.Digest] = layer.Size 163 + } 308 164 } 309 165 310 - // Step 4: Check quota limit 311 - if quota.Used + quotaImpact > quota.Limit { 312 - http.Error(w, fmt.Sprintf( 313 - "quota exceeded: used=%d, impact=%d, limit=%d", 314 - quota.Used, quotaImpact, quota.Limit, 315 - ), http.StatusPaymentRequired) // 402 316 - return 317 - } 318 - 319 - // Step 5: Update quota (optimistic - before upload completes) 320 - quota.Used += quotaImpact 321 - if quotaImpact > 0 { 322 - quota.ClaimedLayers[req.Digest] = req.Size 166 + // Sum unique layer sizes 167 + var total int64 168 + for _, size := range uniqueLayers { 169 + total += size 323 170 } 324 - quota.LastUpdated = time.Now() 325 171 326 - if err := s.quotaManager.SaveQuota(quota); err != nil { 327 - http.Error(w, "failed to update quota", http.StatusInternalServerError) 328 - return 329 - } 172 + return total, nil 173 + } 330 174 331 - // 
Step 6: Generate presigned URL 332 - presignedURL, err := s.getUploadURL(ctx, req.Digest, req.Size, req.DID) 175 + // CheckQuota returns true if user has space for additional bytes 176 + func (q *QuotaManager) CheckQuota(ctx context.Context, userDID string, additional int64, limit int64) (bool, int64, error) { 177 + current, err := q.GetUsage(ctx, userDID) 333 178 if err != nil { 334 - // Rollback quota update on error 335 - quota.Used -= quotaImpact 336 - delete(quota.ClaimedLayers, req.Digest) 337 - s.quotaManager.SaveQuota(quota) 338 - 339 - http.Error(w, "failed to generate presigned URL", http.StatusInternalServerError) 340 - return 341 - } 342 - 343 - // Step 7: Return presigned URL + quota info 344 - resp := PutPresignedURLResponse{ 345 - URL: presignedURL, 346 - ExpiresAt: time.Now().Add(15 * time.Minute), 347 - QuotaInfo: QuotaInfo{ 348 - Used: quota.Used, 349 - Limit: quota.Limit, 350 - Available: quota.Limit - quota.Used, 351 - Impact: quotaImpact, 352 - AlreadyClaimed: quotaImpact == 0, 353 - }, 179 + return false, 0, err 354 180 } 355 181 356 - w.Header().Set("Content-Type", "application/json") 357 - json.NewEncoder(w).Encode(resp) 182 + return current+additional <= limit, current, nil 358 183 } 359 184 ``` 360 185 361 - ### Race Condition Handling 362 - 363 - **Problem:** Two concurrent uploads of the same blob 186 + ### Quota Response 364 187 365 - ``` 366 - Time User A User B 367 - 0ms Upload layer X (100MB) 368 - 10ms Upload layer X (100MB) 369 - 20ms Check exists: NO Check exists: NO 370 - 30ms Quota impact: 100MB Quota impact: 100MB 371 - 40ms Update quota A: +100MB Update quota B: +100MB 372 - 50ms Generate presigned URL Generate presigned URL 373 - 100ms Upload to S3 completes Upload to S3 (overwrites A's) 188 + ```go 189 + type QuotaInfo struct { 190 + Used int64 `json:"used"` // Current usage (deduplicated) 191 + Limit int64 `json:"limit"` // User's quota limit 192 + Available int64 `json:"available"` // Remaining space 193 + } 374 194 ``` 375 195 376 - **Result:** Both users charged 100MB, but only 100MB stored in S3. 196 + ## Push Flow 377 197 378 - **Mitigation strategies:** 379 - 380 - 1. **Accept eventual consistency** (recommended for S3-based) 381 - - Run periodic reconciliation to fix discrepancies 382 - - Small inconsistency window (minutes) is acceptable 383 - - Reconciliation uses PDS as source of truth 384 - 385 - 2. **Optimistic locking** (S3 ETags) 386 - ```go 387 - // Use S3 ETags for conditional writes 388 - oldETag := getQuotaFileETag(did) 389 - err := putQuotaFileWithCondition(quota, oldETag) 390 - if err == PreconditionFailed { 391 - // Retry with fresh read 392 - } 393 - ``` 394 - 395 - 3. **Database transactions** (SQLite-based) 396 - ```sql 397 - BEGIN TRANSACTION; 398 - SELECT * FROM user_quotas WHERE did = ? FOR UPDATE; 399 - UPDATE user_quotas SET used = used + ? WHERE did = ?; 400 - COMMIT; 401 - ``` 402 - 403 - ## Delete Flow 404 - 405 - ### Manifest Deletion via AppView UI 406 - 407 - When a user deletes a manifest through the AppView web interface: 198 + ### Step-by-Step: User Pushes Image 408 199 409 200 ``` 410 201 ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ 411 - │ User │ │ AppView │ │ Hold │ │ PDS │ 412 - │ UI │ │ Database │ │ Service │ │ │ 202 + │ Client │ │ AppView │ │ Hold │ │ User PDS │ 203 + │ (Docker) │ │ │ │ Service │ │ │ 413 204 └──────────┘ └──────────┘ └──────────┘ └──────────┘ 414 205 │ │ │ │ 415 - │ DELETE manifest │ │ │ 206 + │ 1. Upload blobs │ │ │ 416 207 ├─────────────────────>│ │ │ 208 + │ │ 2. 
Route to hold │ │ 209 + │ ├─────────────────────>│ │ 210 + │ │ │ 3. Store in S3 │ 417 211 │ │ │ │ 418 - │ │ 1. Get manifest │ │ 419 - │ │ and layers │ │ 212 + │ 4. PUT manifest │ │ │ 213 + ├─────────────────────>│ │ │ 420 214 │ │ │ │ 421 - │ │ 2. Check which │ │ 422 - │ │ layers still │ │ 423 - │ │ referenced by │ │ 424 - │ │ user's other │ │ 425 - │ │ manifests │ │ 215 + │ │ 5. Calculate quota │ │ 216 + │ │ impact for new │ │ 217 + │ │ layers │ │ 218 + │ │ │ │ 219 + │ │ 6. Check quota limit │ │ 220 + │ ├─────────────────────>│ │ 221 + │ │<─────────────────────┤ │ 426 222 │ │ │ │ 427 - │ │ 3. DELETE manifest │ │ 428 - │ │ from PDS │ │ 223 + │ │ 7. Store manifest │ │ 429 224 │ ├──────────────────────┼─────────────────────>│ 430 225 │ │ │ │ 431 - │ │ 4. POST /quota/decrement │ 226 + │ │ 8. Create layer │ │ 227 + │ │ records │ │ 432 228 │ ├─────────────────────>│ │ 433 - │ │ {layers: [...]} │ │ 434 - │ │ │ │ 435 - │ │ │ 5. Update quota │ 436 - │ │ │ Remove unclaimed │ 437 - │ │ │ layers │ 438 - │ │ │ │ 439 - │ │ 6. 200 OK │ │ 440 - │ │<─────────────────────┤ │ 441 - │ │ │ │ 442 - │ │ 7. Delete from DB │ │ 229 + │ │ │ 9. Write to │ 230 + │ │ │ hold's PDS │ 443 231 │ │ │ │ 444 - │ 8. Success │ │ │ 232 + │ 10. 201 Created │ │ │ 445 233 │<─────────────────────┤ │ │ 446 - │ │ │ │ 447 234 ``` 448 235 449 - ### AppView Implementation 236 + ### Implementation 450 237 451 238 ```go 452 - // pkg/appview/handlers/manifest.go 239 + // pkg/appview/storage/routing_repository.go 453 240 454 - func (h *ManifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Request) { 455 - did := r.Context().Value("auth.did").(string) 456 - repository := chi.URLParam(r, "repository") 457 - digest := chi.URLParam(r, "digest") 241 + func (r *RoutingRepository) PutManifest(ctx context.Context, manifest distribution.Manifest) error { 242 + // Parse manifest to get layers 243 + layers := extractLayers(manifest) 458 244 459 - // Step 1: Get manifest and its layers from database 460 - manifest, err := db.GetManifest(h.db, digest) 245 + // Get user's current unique layers from hold 246 + existingLayers, err := r.holdClient.GetUserLayers(ctx, r.userDID) 461 247 if err != nil { 462 - http.Error(w, "manifest not found", 404) 463 - return 248 + return err 464 249 } 250 + existingSet := makeDigestSet(existingLayers) 465 251 466 - layers, err := db.GetLayersForManifest(h.db, manifest.ID) 467 - if err != nil { 468 - http.Error(w, "failed to get layers", 500) 469 - return 470 - } 471 - 472 - // Step 2: For each layer, check if user still references it 473 - // in other manifests 474 - layersToDecrement := []LayerInfo{} 475 - 252 + // Calculate quota impact (only new unique layers) 253 + var quotaImpact int64 476 254 for _, layer := range layers { 477 - // Query: does this user have other manifests using this layer? 
478 - stillReferenced, err := db.CheckLayerReferencedByUser( 479 - h.db, did, repository, layer.Digest, manifest.ID, 480 - ) 481 - 482 - if err != nil { 483 - http.Error(w, "failed to check layer references", 500) 484 - return 255 + if !existingSet[layer.Digest] { 256 + quotaImpact += layer.Size 485 257 } 258 + } 486 259 487 - if !stillReferenced { 488 - // This layer is no longer used by user 489 - layersToDecrement = append(layersToDecrement, LayerInfo{ 490 - Digest: layer.Digest, 491 - Size: layer.Size, 492 - }) 493 - } 260 + // Check quota 261 + ok, current, err := r.quotaManager.CheckQuota(ctx, r.userDID, quotaImpact, r.quotaLimit) 262 + if err != nil { 263 + return err 264 + } 265 + if !ok { 266 + return fmt.Errorf("quota exceeded: used=%d, impact=%d, limit=%d", 267 + current, quotaImpact, r.quotaLimit) 494 268 } 495 269 496 - // Step 3: Delete manifest from user's PDS 497 - atprotoClient := atproto.NewClient(manifest.PDSEndpoint, did, accessToken) 498 - err = atprotoClient.DeleteRecord(ctx, atproto.ManifestCollection, manifestRKey) 270 + // Store manifest in user's PDS 271 + manifestURI, err := r.atprotoClient.PutManifest(ctx, manifest) 499 272 if err != nil { 500 - http.Error(w, "failed to delete from PDS", 500) 501 - return 273 + return err 502 274 } 503 275 504 - // Step 4: Notify hold service to decrement quota 505 - if len(layersToDecrement) > 0 { 506 - holdClient := &http.Client{} 507 - 508 - decrementReq := QuotaDecrementRequest{ 509 - DID: did, 510 - Layers: layersToDecrement, 276 + // Create layer records in hold's PDS 277 + for _, layer := range layers { 278 + record := LayerRecord{ 279 + Type: "io.atcr.hold.layer", 280 + Digest: layer.Digest, 281 + Size: layer.Size, 282 + MediaType: layer.MediaType, 283 + Manifest: manifestURI, 284 + UserDID: r.userDID, 285 + CreatedAt: time.Now().Format(time.RFC3339), 511 286 } 512 - 513 - body, _ := json.Marshal(decrementReq) 514 - resp, err := holdClient.Post( 515 - manifest.HoldEndpoint + "/quota/decrement", 516 - "application/json", 517 - bytes.NewReader(body), 518 - ) 519 - 520 - if err != nil || resp.StatusCode != 200 { 521 - log.Printf("Warning: failed to update quota on hold service: %v", err) 522 - // Continue anyway - GC reconciliation will fix it 287 + if err := r.holdClient.CreateLayerRecord(ctx, record); err != nil { 288 + log.Printf("Warning: failed to create layer record: %v", err) 289 + // Continue - reconciliation will fix 523 290 } 524 291 } 525 292 526 - // Step 5: Delete from AppView database 527 - err = db.DeleteManifest(h.db, did, repository, digest) 528 - if err != nil { 529 - http.Error(w, "failed to delete from database", 500) 530 - return 531 - } 532 - 533 - w.WriteHeader(http.StatusNoContent) 293 + return nil 534 294 } 535 295 ``` 536 296 537 - ### Hold Service Decrement Endpoint 538 - 539 - ```go 540 - // cmd/hold/main.go 541 - 542 - type QuotaDecrementRequest struct { 543 - DID string `json:"did"` 544 - Layers []LayerInfo `json:"layers"` 545 - } 297 + ### Quota Check Timing 546 298 547 - type LayerInfo struct { 548 - Digest string `json:"digest"` 549 - Size int64 `json:"size"` 550 - } 299 + Quota is checked when the **manifest is pushed** (after blobs are uploaded): 300 + - Blobs upload first via presigned URLs 301 + - Manifest pushed last triggers quota check 302 + - If quota exceeded, manifest is rejected (orphaned blobs cleaned by GC) 551 303 552 - func (s *HoldService) HandleQuotaDecrement(w http.ResponseWriter, r *http.Request) { 553 - var req QuotaDecrementRequest 554 - if err := 
json.NewDecoder(r.Body).Decode(&req); err != nil { 555 - http.Error(w, "invalid request", 400) 556 - return 557 - } 304 + This matches Harbor's approach and is the industry standard. 558 305 559 - // Read current quota 560 - quota, err := s.quotaManager.GetQuota(req.DID) 561 - if err != nil { 562 - http.Error(w, "quota not found", 404) 563 - return 564 - } 306 + ## Delete Flow 565 307 566 - // Decrement quota for each layer 567 - for _, layer := range req.Layers { 568 - if size, claimed := quota.ClaimedLayers[layer.Digest]; claimed { 569 - // Remove from claimed layers 570 - delete(quota.ClaimedLayers, layer.Digest) 571 - quota.Used -= size 308 + ### Manifest Deletion 572 309 573 - log.Printf("Decremented quota for %s: layer %s (%d bytes)", 574 - req.DID, layer.Digest, size) 575 - } else { 576 - log.Printf("Warning: layer %s not in claimed_layers for %s", 577 - layer.Digest, req.DID) 578 - } 579 - } 310 + When a user deletes a manifest: 580 311 581 - // Ensure quota.Used doesn't go negative (defensive) 582 - if quota.Used < 0 { 583 - log.Printf("Warning: quota.Used went negative for %s, resetting to 0", req.DID) 584 - quota.Used = 0 585 - } 586 - 587 - // Save updated quota 588 - quota.LastUpdated = time.Now() 589 - if err := s.quotaManager.SaveQuota(quota); err != nil { 590 - http.Error(w, "failed to save quota", 500) 591 - return 592 - } 593 - 594 - // Return updated quota info 595 - json.NewEncoder(w).Encode(map[string]any{ 596 - "used": quota.Used, 597 - "limit": quota.Limit, 598 - }) 599 - } 600 312 ``` 601 - 602 - ### SQL Query: Check Layer References 603 - 604 - ```sql 605 - -- pkg/appview/db/queries.go 606 - 607 - -- Check if user still references this layer in other manifests 608 - SELECT COUNT(*) 609 - FROM layers l 610 - JOIN manifests m ON l.manifest_id = m.id 611 - WHERE m.did = ? -- User's DID 612 - AND l.digest = ? -- Layer digest 613 - AND m.id != ? -- Exclude the manifest being deleted 313 + ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ 314 + │ User │ │ AppView │ │ Hold │ │ User PDS │ 315 + │ UI │ │ │ │ Service │ │ │ 316 + └──────────┘ └──────────┘ └──────────┘ └──────────┘ 317 + │ │ │ │ 318 + │ DELETE manifest │ │ │ 319 + ├─────────────────────>│ │ │ 320 + │ │ │ │ 321 + │ │ 1. Delete manifest │ │ 322 + │ │ from user's PDS │ │ 323 + │ ├──────────────────────┼─────────────────────>│ 324 + │ │ │ │ 325 + │ │ 2. Delete layer │ │ 326 + │ │ records for this │ │ 327 + │ │ manifest │ │ 328 + │ ├─────────────────────>│ │ 329 + │ │ │ 3. Remove records │ 330 + │ │ │ where manifest │ 331 + │ │ │ == deleted URI │ 332 + │ │ │ │ 333 + │ 4. 204 No Content │ │ │ 334 + │<─────────────────────┤ │ │ 614 335 ``` 615 336 616 - ## Garbage Collection 617 - 618 - ### Background: Orphaned Blobs 619 - 620 - Orphaned blobs accumulate when: 621 - 1. Manifest push fails after blobs uploaded (presigned URLs bypass hold) 622 - 2. Quota exceeded - manifest rejected, blobs already in S3 623 - 3. 
User deletes manifest - blobs no longer referenced 624 - 625 - **GC periodically cleans these up.** 626 - 627 - ### GC Cron Implementation 628 - 629 - Similar to AppView's backfill worker, the hold service can run periodic GC: 337 + ### Implementation 630 338 631 339 ```go 632 - // cmd/hold/gc/gc.go 340 + // pkg/appview/handlers/manifest.go 633 341 634 - type GarbageCollector struct { 635 - driver storagedriver.StorageDriver 636 - appviewURL string 637 - holdURL string 638 - quotaManager *quota.Manager 639 - } 342 + func (h *ManifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Request) { 343 + userDID := auth.GetDID(r.Context()) 344 + repository := chi.URLParam(r, "repository") 345 + digest := chi.URLParam(r, "digest") 640 346 641 - // Run garbage collection 642 - func (gc *GarbageCollector) Run(ctx context.Context) error { 643 - log.Println("Starting garbage collection...") 347 + // Get manifest URI before deletion 348 + manifestURI := fmt.Sprintf("at://%s/%s/%s", userDID, ManifestCollection, digest) 644 349 645 - // Step 1: Get list of referenced blobs from AppView 646 - referenced, err := gc.getReferencedBlobs() 647 - if err != nil { 648 - return fmt.Errorf("failed to get referenced blobs: %w", err) 350 + // Delete manifest from user's PDS 351 + if err := h.atprotoClient.DeleteRecord(ctx, ManifestCollection, digest); err != nil { 352 + http.Error(w, "failed to delete manifest", 500) 353 + return 649 354 } 650 355 651 - referencedSet := make(map[string]bool) 652 - for _, digest := range referenced { 653 - referencedSet[digest] = true 356 + // Delete associated layer records from hold's PDS 357 + if err := h.holdClient.DeleteLayerRecords(ctx, manifestURI); err != nil { 358 + log.Printf("Warning: failed to delete layer records: %v", err) 359 + // Continue - reconciliation will clean up 654 360 } 655 361 656 - log.Printf("AppView reports %d referenced blobs", len(referenced)) 657 - 658 - // Step 2: Walk S3 blobs 659 - deletedCount := 0 660 - reclaimedBytes := int64(0) 661 - 662 - err = gc.driver.Walk(ctx, "/docker/registry/v2/blobs", func(fileInfo storagedriver.FileInfo) error { 663 - if fileInfo.IsDir() { 664 - return nil // Skip directories 665 - } 666 - 667 - // Extract digest from path 668 - // Path: /docker/registry/v2/blobs/sha256/ab/abc123.../data 669 - digest := extractDigestFromPath(fileInfo.Path()) 670 - 671 - if !referencedSet[digest] { 672 - // Unreferenced blob - delete it 673 - size := fileInfo.Size() 674 - 675 - if err := gc.driver.Delete(ctx, fileInfo.Path()); err != nil { 676 - log.Printf("Failed to delete blob %s: %v", digest, err) 677 - return nil // Continue anyway 678 - } 679 - 680 - deletedCount++ 681 - reclaimedBytes += size 682 - 683 - log.Printf("GC: Deleted unreferenced blob %s (%d bytes)", digest, size) 684 - } 685 - 686 - return nil 687 - }) 688 - 689 - if err != nil { 690 - return fmt.Errorf("failed to walk blobs: %w", err) 691 - } 692 - 693 - log.Printf("GC complete: deleted %d blobs, reclaimed %d bytes", 694 - deletedCount, reclaimedBytes) 695 - 696 - return nil 697 - } 698 - 699 - // Get referenced blobs from AppView 700 - func (gc *GarbageCollector) getReferencedBlobs() ([]string, error) { 701 - // Query AppView for all blobs referenced by manifests 702 - // stored in THIS hold service 703 - url := fmt.Sprintf("%s/internal/blobs/referenced?hold=%s", 704 - gc.appviewURL, url.QueryEscape(gc.holdURL)) 705 - 706 - resp, err := http.Get(url) 707 - if err != nil { 708 - return nil, err 709 - } 710 - defer resp.Body.Close() 711 - 712 - var result struct { 
713 - Blobs []string `json:"blobs"` 714 - } 715 - 716 - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 717 - return nil, err 718 - } 719 - 720 - return result.Blobs, nil 362 + w.WriteHeader(http.StatusNoContent) 721 363 } 722 364 ``` 723 365 724 - ### AppView Internal API 366 + ### Hold Service: Delete Layer Records 725 367 726 368 ```go 727 - // pkg/appview/handlers/internal.go 369 + // pkg/hold/pds/xrpc.go 728 370 729 - // Get all referenced blobs for a specific hold 730 - func (h *InternalHandler) GetReferencedBlobs(w http.ResponseWriter, r *http.Request) { 731 - holdEndpoint := r.URL.Query().Get("hold") 732 - if holdEndpoint == "" { 733 - http.Error(w, "missing hold parameter", 400) 734 - return 735 - } 736 - 737 - // Query database for all layers in manifests stored in this hold 738 - query := ` 739 - SELECT DISTINCT l.digest 740 - FROM layers l 741 - JOIN manifests m ON l.manifest_id = m.id 742 - WHERE m.hold_endpoint = ? 743 - ` 744 - 745 - rows, err := h.db.Query(query, holdEndpoint) 371 + func (s *Server) DeleteLayerRecords(ctx context.Context, manifestURI string) error { 372 + // List all layer records 373 + records, err := s.ListRecords(ctx, LayerCollection, "") 746 374 if err != nil { 747 - http.Error(w, "database error", 500) 748 - return 375 + return err 749 376 } 750 - defer rows.Close() 751 377 752 - blobs := []string{} 753 - for rows.Next() { 754 - var digest string 755 - if err := rows.Scan(&digest); err != nil { 378 + // Delete records matching this manifest 379 + for _, record := range records { 380 + var layer LayerRecord 381 + if err := json.Unmarshal(record.Value, &layer); err != nil { 756 382 continue 757 383 } 758 - blobs = append(blobs, digest) 759 - } 760 - 761 - json.NewEncoder(w).Encode(map[string]any{ 762 - "blobs": blobs, 763 - "count": len(blobs), 764 - "hold": holdEndpoint, 765 - }) 766 - } 767 - ``` 768 - 769 - ### GC Cron Schedule 770 - 771 - ```go 772 - // cmd/hold/main.go 773 - 774 - func main() { 775 - // ... service setup ... 776 - 777 - // Start GC cron if enabled 778 - if os.Getenv("GC_ENABLED") == "true" { 779 - gcInterval := 24 * time.Hour // Daily by default 780 - 781 - go func() { 782 - ticker := time.NewTicker(gcInterval) 783 - defer ticker.Stop() 784 - 785 - for range ticker.C { 786 - if err := garbageCollector.Run(context.Background()); err != nil { 787 - log.Printf("GC error: %v", err) 788 - } 384 + if layer.Manifest == manifestURI { 385 + if err := s.DeleteRecord(ctx, LayerCollection, record.RKey); err != nil { 386 + log.Printf("Failed to delete layer record %s: %v", record.RKey, err) 789 387 } 790 - }() 791 - 792 - log.Printf("GC cron started: runs every %v", gcInterval) 388 + } 793 389 } 794 390 795 - // Start server... 391 + return nil 796 392 } 797 393 ``` 798 394 799 - ## Quota Reconciliation 395 + ### Quota After Deletion 800 396 801 - ### PDS as Source of Truth 397 + After deleting a manifest: 398 + - Layer records for that manifest are removed 399 + - Quota recalculated with `SELECT DISTINCT` query 400 + - If layer was only in deleted manifest → quota decreases 401 + - If layer exists in other manifests → quota unchanged (still deduplicated) 802 402 803 - **Key insight:** Manifest records in PDS are publicly readable (no OAuth needed for reads). 
403 + ## Garbage Collection 804 404 805 - Each manifest contains: 806 - - Repository name 807 - - Digest 808 - - Layers array with digest + size 809 - - Hold endpoint 405 + ### Orphaned Blobs 810 406 811 - The hold service can query the PDS to calculate the user's true quota: 407 + Orphaned blobs accumulate when: 408 + 1. Manifest push fails after blobs uploaded 409 + 2. Quota exceeded - manifest rejected 410 + 3. User deletes manifest - blobs may no longer be referenced 812 411 813 - ``` 814 - 1. List all io.atcr.manifest records for user 815 - 2. Filter manifests where holdEndpoint == this hold service 816 - 3. Extract unique layers (deduplicate by digest) 817 - 4. Sum layer sizes = true quota usage 818 - 5. Compare to quota file 819 - 6. Fix discrepancies 820 - ``` 821 - 822 - ### Implementation 412 + ### GC Process 823 413 824 414 ```go 825 - // cmd/hold/quota/reconcile.go 415 + // pkg/hold/gc/gc.go 826 416 827 - type Reconciler struct { 828 - quotaManager *Manager 829 - atprotoResolver *atproto.Resolver 830 - holdURL string 831 - } 832 - 833 - // ReconcileUser recalculates quota from PDS manifests 834 - func (r *Reconciler) ReconcileUser(ctx context.Context, did string) error { 835 - log.Printf("Reconciling quota for %s", did) 836 - 837 - // Step 1: Resolve user's PDS endpoint 838 - identity, err := r.atprotoResolver.ResolveIdentity(ctx, did) 417 + func (gc *GarbageCollector) Run(ctx context.Context) error { 418 + // Step 1: Get all referenced digests from layer records 419 + records, err := gc.pds.ListRecords(ctx, LayerCollection, "") 839 420 if err != nil { 840 - return fmt.Errorf("failed to resolve DID: %w", err) 421 + return err 841 422 } 842 423 843 - // Step 2: Create unauthenticated ATProto client 844 - // (manifest records are public - no OAuth needed) 845 - client := atproto.NewClient(identity.PDSEndpoint, did, "") 846 - 847 - // Step 3: List all manifest records for this user 848 - manifests, err := client.ListRecords(ctx, atproto.ManifestCollection, 1000) 849 - if err != nil { 850 - return fmt.Errorf("failed to list manifests: %w", err) 851 - } 852 - 853 - // Step 4: Filter manifests stored in THIS hold service 854 - // and extract unique layers 855 - uniqueLayers := make(map[string]int64) // digest -> size 856 - 857 - for _, record := range manifests { 858 - var manifest atproto.ManifestRecord 859 - if err := json.Unmarshal(record.Value, &manifest); err != nil { 860 - log.Printf("Warning: failed to parse manifest: %v", err) 861 - continue 862 - } 863 - 864 - // Only count manifests stored in this hold 865 - if manifest.HoldEndpoint != r.holdURL { 424 + referenced := make(map[string]bool) 425 + for _, record := range records { 426 + var layer LayerRecord 427 + if err := json.Unmarshal(record.Value, &layer); err != nil { 866 428 continue 867 429 } 868 - 869 - // Add config blob 870 - if manifest.Config.Digest != "" { 871 - uniqueLayers[manifest.Config.Digest] = manifest.Config.Size 872 - } 873 - 874 - // Add layer blobs 875 - for _, layer := range manifest.Layers { 876 - uniqueLayers[layer.Digest] = layer.Size 877 - } 430 + referenced[layer.Digest] = true 878 431 } 879 432 880 - // Step 5: Calculate true quota usage 881 - trueUsage := int64(0) 882 - for _, size := range uniqueLayers { 883 - trueUsage += size 884 - } 885 - 886 - log.Printf("User %s true usage from PDS: %d bytes (%d unique layers)", 887 - did, trueUsage, len(uniqueLayers)) 433 + log.Printf("Found %d referenced blobs", len(referenced)) 888 434 889 - // Step 6: Compare with current quota file 890 - quota, err := 
r.quotaManager.GetQuota(did) 891 - if err != nil { 892 - log.Printf("No existing quota for %s, creating new", did) 893 - quota = &Quota{ 894 - DID: did, 895 - Limit: r.quotaManager.DefaultLimit, 896 - ClaimedLayers: make(map[string]int64), 435 + // Step 2: Walk S3 blobs and delete unreferenced 436 + var deleted, reclaimed int64 437 + err = gc.driver.Walk(ctx, "/docker/registry/v2/blobs", func(fi storagedriver.FileInfo) error { 438 + if fi.IsDir() { 439 + return nil 897 440 } 898 - } 899 441 900 - // Step 7: Fix discrepancies 901 - if quota.Used != trueUsage || len(quota.ClaimedLayers) != len(uniqueLayers) { 902 - log.Printf("Quota mismatch for %s: recorded=%d, actual=%d (diff=%d)", 903 - did, quota.Used, trueUsage, trueUsage - quota.Used) 904 - 905 - // Update quota to match PDS truth 906 - quota.Used = trueUsage 907 - quota.ClaimedLayers = uniqueLayers 908 - quota.LastUpdated = time.Now() 909 - 910 - if err := r.quotaManager.SaveQuota(quota); err != nil { 911 - return fmt.Errorf("failed to save reconciled quota: %w", err) 912 - } 913 - 914 - log.Printf("Reconciled quota for %s: %d bytes", did, trueUsage) 915 - } else { 916 - log.Printf("Quota for %s is accurate", did) 917 - } 918 - 919 - return nil 920 - } 921 - 922 - // ReconcileAll reconciles all users (run periodically) 923 - func (r *Reconciler) ReconcileAll(ctx context.Context) error { 924 - // Get list of all users with quota files 925 - users, err := r.quotaManager.ListUsers() 926 - if err != nil { 927 - return err 928 - } 929 - 930 - log.Printf("Starting reconciliation for %d users", len(users)) 931 - 932 - for _, did := range users { 933 - if err := r.ReconcileUser(ctx, did); err != nil { 934 - log.Printf("Failed to reconcile %s: %v", did, err) 935 - // Continue with other users 442 + digest := extractDigestFromPath(fi.Path()) 443 + if !referenced[digest] { 444 + size := fi.Size() 445 + if err := gc.driver.Delete(ctx, fi.Path()); err != nil { 446 + log.Printf("Failed to delete %s: %v", digest, err) 447 + return nil 448 + } 449 + deleted++ 450 + reclaimed += size 451 + log.Printf("GC: deleted %s (%d bytes)", digest, size) 936 452 } 937 - } 453 + return nil 454 + }) 938 455 939 - log.Println("Reconciliation complete") 940 - return nil 456 + log.Printf("GC complete: deleted %d blobs, reclaimed %d bytes", deleted, reclaimed) 457 + return err 941 458 } 942 459 ``` 943 460 944 - ### Reconciliation Cron 945 - 946 - ```go 947 - // cmd/hold/main.go 948 - 949 - func main() { 950 - // ... setup ... 951 - 952 - // Start reconciliation cron 953 - if os.Getenv("QUOTA_RECONCILE_ENABLED") == "true" { 954 - reconcileInterval := 24 * time.Hour // Daily 955 - 956 - go func() { 957 - ticker := time.NewTicker(reconcileInterval) 958 - defer ticker.Stop() 959 - 960 - for range ticker.C { 961 - if err := reconciler.ReconcileAll(context.Background()); err != nil { 962 - log.Printf("Reconciliation error: %v", err) 963 - } 964 - } 965 - }() 461 + ### GC Schedule 966 462 967 - log.Printf("Quota reconciliation cron started: runs every %v", reconcileInterval) 968 - } 969 - 970 - // ... start server ... 971 - } 463 + ```bash 464 + # Environment variable 465 + GC_ENABLED=true 466 + GC_INTERVAL=24h # Daily by default 972 467 ``` 973 468 974 - ### Why PDS as Source of Truth Works 975 - 976 - 1. **Manifests are canonical** - If manifest exists in PDS, user owns those layers 977 - 2. **Public reads** - No OAuth needed, just resolve DID → PDS endpoint 978 - 3. **ATProto durability** - PDS is user's authoritative data store 979 - 4. 
**AppView is cache** - AppView database might lag or have inconsistencies 980 - 5. **Reconciliation fixes drift** - Periodic sync from PDS ensures accuracy 981 - 982 - **Example reconciliation scenarios:** 983 - 984 - - **Orphaned quota entries:** User deleted manifest from PDS, but hold quota still has it 985 - → Reconciliation removes from claimed_layers 986 - 987 - - **Missing quota entries:** User pushed manifest, but quota update failed 988 - → Reconciliation adds to claimed_layers 989 - 990 - - **Race condition duplicates:** Two concurrent pushes double-counted a layer 991 - → Reconciliation fixes to actual usage 992 - 993 469 ## Configuration 994 470 995 471 ### Hold Service Environment Variables ··· 997 473 ```bash 998 474 # .env.hold 999 475 1000 - # ============================================================================ 1001 476 # Quota Configuration 1002 - # ============================================================================ 1003 - 1004 - # Enable quota enforcement 1005 477 QUOTA_ENABLED=true 1006 - 1007 - # Default quota limit per user (bytes) 1008 - # 10GB = 10737418240 1009 - # 50GB = 53687091200 1010 - # 100GB = 107374182400 1011 - QUOTA_DEFAULT_LIMIT=10737418240 1012 - 1013 - # Storage backend for quota data 1014 - # Options: s3, sqlite 1015 - QUOTA_STORAGE_BACKEND=s3 1016 - 1017 - # For S3-based storage: 1018 - # Quota files stored in same bucket as blobs 1019 - QUOTA_STORAGE_PREFIX=/atcr/quota/ 478 + QUOTA_DEFAULT_LIMIT=10737418240 # 10GB in bytes 1020 479 1021 - # For SQLite-based storage: 1022 - QUOTA_DB_PATH=/var/lib/atcr/hold-quota.db 1023 - 1024 - # ============================================================================ 1025 480 # Garbage Collection 1026 - # ============================================================================ 1027 - 1028 - # Enable periodic garbage collection 1029 481 GC_ENABLED=true 1030 - 1031 - # GC interval (default: 24h) 1032 482 GC_INTERVAL=24h 1033 - 1034 - # AppView URL for GC reference checking 1035 - APPVIEW_URL=https://atcr.io 1036 - 1037 - # ============================================================================ 1038 - # Quota Reconciliation 1039 - # ============================================================================ 1040 - 1041 - # Enable quota reconciliation from PDS 1042 - QUOTA_RECONCILE_ENABLED=true 1043 - 1044 - # Reconciliation interval (default: 24h) 1045 - QUOTA_RECONCILE_INTERVAL=24h 1046 - 1047 - # ============================================================================ 1048 - # Hold Service Identity (Required) 1049 - # ============================================================================ 1050 - 1051 - # Public URL of this hold service 1052 - HOLD_PUBLIC_URL=https://hold1.example.com 1053 - 1054 - # Owner DID (for auto-registration) 1055 - HOLD_OWNER=did:plc:xyz123 1056 483 ``` 1057 484 1058 - ### AppView Configuration 485 + ### Quota Limits by Bytes 1059 486 1060 - ```bash 1061 - # .env.appview 1062 - 1063 - # Internal API endpoint for hold services 1064 - # Used for GC reference checking 1065 - ATCR_INTERNAL_API_ENABLED=true 1066 - 1067 - # Optional: authentication token for internal APIs 1068 - ATCR_INTERNAL_API_TOKEN=secret123 1069 - ``` 1070 - 1071 - ## Trade-offs & Design Decisions 1072 - 1073 - ### 1. 
Claimed Storage vs Physical Storage 1074 - 1075 - **Decision:** Track claimed storage (logical accounting) 1076 - 1077 - **Why:** 1078 - - Predictable for users: "you pay for what you upload" 1079 - - No complex cross-user dependencies 1080 - - Delete always gives you quota back 1081 - - Matches Harbor's proven model 1082 - 1083 - **Trade-off:** 1084 - - Total claimed can exceed physical storage 1085 - - Users might complain "I uploaded 10GB but S3 only has 6GB" 1086 - 1087 - **Mitigation:** 1088 - - Show deduplication savings metric 1089 - - Educate users: "You claimed 10GB, but deduplication saved 4GB" 1090 - 1091 - ### 2. S3 vs SQLite for Quota Storage 1092 - 1093 - **Decision:** Support both, recommend based on use case 1094 - 1095 - **S3 Pros:** 1096 - - No database to manage 1097 - - Quota data lives with blobs 1098 - - Better for ephemeral BYOS 1099 - 1100 - **SQLite Pros:** 1101 - - Faster (no network) 1102 - - ACID transactions (no race conditions) 1103 - - Better for high-traffic shared holds 1104 - 1105 - **Trade-off:** 1106 - - S3: eventual consistency, race conditions 1107 - - SQLite: stateful service, scaling challenges 1108 - 1109 - **Mitigation:** 1110 - - Reconciliation fixes S3 inconsistencies 1111 - - SQLite can use shared DB for multi-instance 1112 - 1113 - ### 3. Optimistic Quota Update 1114 - 1115 - **Decision:** Update quota BEFORE upload completes 1116 - 1117 - **Why:** 1118 - - Prevent race conditions (two users uploading simultaneously) 1119 - - Can reject before presigned URL generated 1120 - - Simpler flow 1121 - 1122 - **Trade-off:** 1123 - - If upload fails, quota already incremented (user "paid" for nothing) 1124 - 1125 - **Mitigation:** 1126 - - Reconciliation from PDS fixes orphaned quota entries 1127 - - Acceptable for MVP (upload failures are rare) 1128 - 1129 - ### 4. AppView as Intermediary 1130 - 1131 - **Decision:** AppView notifies hold service on deletes 1132 - 1133 - **Why:** 1134 - - AppView already has manifest/layer database 1135 - - Can efficiently check if layer still referenced 1136 - - Hold service doesn't need to query PDS on every delete 1137 - 1138 - **Trade-off:** 1139 - - AppView → Hold dependency 1140 - - Network hop on delete 1141 - 1142 - **Mitigation:** 1143 - - If notification fails, reconciliation fixes quota 1144 - - Eventually consistent is acceptable 1145 - 1146 - ### 5. PDS as Source of Truth 1147 - 1148 - **Decision:** Use PDS manifests for reconciliation 1149 - 1150 - **Why:** 1151 - - Manifests in PDS are canonical user data 1152 - - Public reads (no OAuth for reconciliation) 1153 - - AppView database might lag or be inconsistent 1154 - 1155 - **Trade-off:** 1156 - - Reconciliation requires PDS queries (slower) 1157 - - Limited to 1000 manifests per query 1158 - 1159 - **Mitigation:** 1160 - - Run reconciliation daily (not real-time) 1161 - - Paginate if user has >1000 manifests 487 + | Size | Bytes | 488 + |------|-------| 489 + | 1 GB | 1073741824 | 490 + | 5 GB | 5368709120 | 491 + | 10 GB | 10737418240 | 492 + | 50 GB | 53687091200 | 493 + | 100 GB | 107374182400 | 1162 494 1163 495 ## Future Enhancements 1164 496 1165 497 ### 1. 
Quota API Endpoints 1166 498 1167 499 ``` 1168 - GET /quota/usage - Get current user's quota 1169 - GET /quota/breakdown - Get storage by repository 1170 - POST /quota/limit - Update user's quota limit (admin) 1171 - GET /quota/stats - Get hold-wide statistics 500 + GET /xrpc/io.atcr.hold.getQuota?did={userDID} - Get user's quota usage 501 + GET /xrpc/io.atcr.hold.getQuotaBreakdown - Storage by repository 1172 502 ``` 1173 503 1174 504 ### 2. Quota Alerts 1175 505 1176 - Notify users when approaching limit: 1177 - - Email/webhook at 80%, 90%, 95% 1178 - - Reject uploads at 100% (currently implemented) 1179 - - Grace period: allow 105% temporarily 506 + - Warning thresholds at 80%, 90%, 95% 507 + - Email/webhook notifications 508 + - Grace period before hard enforcement 1180 509 1181 510 ### 3. Tiered Quotas 1182 511 1183 - Different limits based on user tier: 1184 - - Free: 10GB 1185 - - Pro: 100GB 1186 - - Enterprise: unlimited 1187 - 1188 - ### 4. Quota Purchasing 1189 - 1190 - Allow users to buy additional storage: 1191 - - Stripe integration 1192 - - $0.10/GB/month pricing 1193 - - Dynamic limit updates 1194 - 1195 - ### 5. Cross-Hold Deduplication 1196 - 1197 - If multiple holds share same S3 bucket: 1198 - - Track blob ownership globally 1199 - - Split costs proportionally 1200 - - More complex, but maximizes deduplication 1201 - 1202 - ### 6. Manifest-Based Quota (Alternative Model) 1203 - 1204 - Instead of tracking layers, track manifests: 1205 - - Simpler: just count manifest sizes 1206 - - No deduplication benefits for users 1207 - - Might be acceptable for some use cases 1208 - 1209 - ### 7. Redis-Based Quota (High Performance) 1210 - 1211 - For high-traffic registries: 1212 - - Use Redis instead of S3/SQLite 1213 - - Sub-millisecond quota checks 1214 - - Harbor-proven approach 1215 - 1216 - ### 8. Quota Visualizations 1217 - 1218 - Web UI showing: 1219 - - Storage usage over time 1220 - - Top consumers by repository 1221 - - Deduplication savings graph 1222 - - Layer size distribution 512 + | Tier | Limit | 513 + |------|-------| 514 + | Free | 10 GB | 515 + | Pro | 100 GB | 516 + | Enterprise | Unlimited | 1223 517 1224 - ## Appendix: SQL Queries 518 + ### 4. Rate Limiting 1225 519 1226 - ### Check if User Still References Layer 520 + Pull rate limits (Docker Hub style): 521 + - Anonymous: 100 pulls per 6 hours per IP 522 + - Authenticated: 200 pulls per 6 hours 523 + - Paid: Unlimited 1227 524 1228 - ```sql 1229 - -- After deleting manifest, check if user has other manifests using this layer 1230 - SELECT COUNT(*) 1231 - FROM layers l 1232 - JOIN manifests m ON l.manifest_id = m.id 1233 - WHERE m.did = ? -- User's DID 1234 - AND l.digest = ? -- Layer digest to check 1235 - AND m.id != ? -- Exclude the manifest being deleted 1236 - ``` 525 + ### 5. Quota Purchasing 1237 526 1238 - ### Get All Unique Layers for User 1239 - 1240 - ```sql 1241 - -- Calculate true quota usage for a user 1242 - SELECT DISTINCT l.digest, l.size 1243 - FROM layers l 1244 - JOIN manifests m ON l.manifest_id = m.id 1245 - WHERE m.did = ? 1246 - AND m.hold_endpoint = ? 1247 - ``` 1248 - 1249 - ### Get Referenced Blobs for Hold 1250 - 1251 - ```sql 1252 - -- For GC: get all blobs still referenced by any user of this hold 1253 - SELECT DISTINCT l.digest 1254 - FROM layers l 1255 - JOIN manifests m ON l.manifest_id = m.id 1256 - WHERE m.hold_endpoint = ? 
1257 - ``` 1258 - 1259 - ### Get Storage Stats by Repository 1260 - 1261 - ```sql 1262 - -- User's storage broken down by repository 1263 - SELECT 1264 - m.repository, 1265 - COUNT(DISTINCT m.id) as manifest_count, 1266 - COUNT(DISTINCT l.digest) as unique_layers, 1267 - SUM(l.size) as total_size 1268 - FROM manifests m 1269 - JOIN layers l ON l.manifest_id = m.id 1270 - WHERE m.did = ? 1271 - AND m.hold_endpoint = ? 1272 - GROUP BY m.repository 1273 - ORDER BY total_size DESC 1274 - ``` 527 + - Stripe integration for additional storage 528 + - $0.10/GB/month pricing (industry standard) 1275 529 1276 530 ## References 1277 531 1278 532 - **Harbor Quotas:** https://goharbor.io/docs/1.10/administration/configure-project-quotas/ 1279 - - **Harbor Source:** https://github.com/goharbor/harbor 1280 533 - **ATProto Spec:** https://atproto.com/specs/record 1281 534 - **OCI Distribution Spec:** https://github.com/opencontainers/distribution-spec 1282 - - **S3 API Reference:** https://docs.aws.amazon.com/AmazonS3/latest/API/ 1283 - - **Distribution GC:** https://github.com/distribution/distribution/blob/main/registry/storage/garbagecollect.go 1284 535 1285 536 --- 1286 537 1287 - **Document Version:** 1.0 1288 - **Last Updated:** 2025-10-09 1289 - **Author:** Generated from implementation research and Harbor analysis 538 + **Document Version:** 2.0 539 + **Last Updated:** 2026-01-04 540 + **Model:** Per-user layer tracking with ATProto records
lexicons/io/atcr/hold/layer.json  +4 -9
··· 8 8 "description": "Represents metadata about a container layer stored in the hold. Stored in the hold's embedded PDS for tracking and analytics.", 9 9 "record": { 10 10 "type": "object", 11 - "required": ["digest", "size", "mediaType", "repository", "userDid", "userHandle", "createdAt"], 11 + "required": ["digest", "size", "mediaType", "manifest", "userDid", "createdAt"], 12 12 "properties": { 13 13 "digest": { 14 14 "type": "string", ··· 24 24 "description": "Media type (e.g., application/vnd.oci.image.layer.v1.tar+gzip)", 25 25 "maxLength": 128 26 26 }, 27 - "repository": { 27 + "manifest": { 28 28 "type": "string", 29 - "description": "Repository this layer belongs to", 30 - "maxLength": 255 29 + "format": "at-uri", 30 + "description": "AT-URI of the manifest that included this layer (e.g., at://did:plc:xyz/io.atcr.manifest/abc123)" 31 31 }, 32 32 "userDid": { 33 33 "type": "string", 34 34 "format": "did", 35 35 "description": "DID of user who uploaded this layer" 36 - }, 37 - "userHandle": { 38 - "type": "string", 39 - "format": "handle", 40 - "description": "Handle of user (for display purposes)" 41 36 }, 42 37 "createdAt": { 43 38 "type": "string",
pkg/appview/handlers/storage.go  +129
··· 1 + package handlers 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "html/template" 7 + "log/slog" 8 + "net/http" 9 + 10 + "atcr.io/pkg/appview/middleware" 11 + "atcr.io/pkg/appview/storage" 12 + "atcr.io/pkg/atproto" 13 + "atcr.io/pkg/auth/oauth" 14 + ) 15 + 16 + // StorageHandler handles the storage quota API endpoint 17 + // Returns an HTML partial for HTMX to swap into the settings page 18 + type StorageHandler struct { 19 + Templates *template.Template 20 + Refresher *oauth.Refresher 21 + } 22 + 23 + // QuotaStats mirrors the hold service response 24 + type QuotaStats struct { 25 + UserDID string `json:"userDid"` 26 + UniqueBlobs int `json:"uniqueBlobs"` 27 + TotalSize int64 `json:"totalSize"` 28 + } 29 + 30 + func (h *StorageHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 31 + user := middleware.GetUser(r) 32 + if user == nil { 33 + http.Error(w, "Unauthorized", http.StatusUnauthorized) 34 + return 35 + } 36 + 37 + // Create ATProto client with session provider 38 + client := atproto.NewClientWithSessionProvider(user.PDSEndpoint, user.DID, h.Refresher) 39 + 40 + // Get user's sailor profile to find their default hold 41 + profile, err := storage.GetProfile(r.Context(), client) 42 + if err != nil { 43 + slog.Warn("Failed to get profile for storage quota", "did", user.DID, "error", err) 44 + h.renderError(w, "Failed to load profile") 45 + return 46 + } 47 + 48 + if profile == nil || profile.DefaultHold == "" { 49 + // No default hold configured - can't check quota 50 + h.renderNoHold(w) 51 + return 52 + } 53 + 54 + // Resolve hold URL from DID 55 + holdURL := atproto.ResolveHoldURL(profile.DefaultHold) 56 + if holdURL == "" { 57 + slog.Warn("Failed to resolve hold URL", "did", user.DID, "holdDid", profile.DefaultHold) 58 + h.renderError(w, "Failed to resolve hold service") 59 + return 60 + } 61 + 62 + // Call the hold's quota endpoint 63 + quotaURL := fmt.Sprintf("%s%s?userDid=%s", holdURL, atproto.HoldGetQuota, user.DID) 64 + resp, err := http.Get(quotaURL) 65 + if err != nil { 66 + slog.Warn("Failed to fetch quota from hold", "did", user.DID, "holdURL", holdURL, "error", err) 67 + h.renderError(w, "Failed to connect to hold service") 68 + return 69 + } 70 + defer resp.Body.Close() 71 + 72 + if resp.StatusCode != http.StatusOK { 73 + slog.Warn("Hold returned error for quota", "did", user.DID, "status", resp.StatusCode) 74 + h.renderError(w, "Hold service returned an error") 75 + return 76 + } 77 + 78 + var stats QuotaStats 79 + if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil { 80 + slog.Warn("Failed to decode quota response", "did", user.DID, "error", err) 81 + h.renderError(w, "Failed to parse quota data") 82 + return 83 + } 84 + 85 + // Render the stats partial 86 + h.renderStats(w, stats) 87 + } 88 + 89 + func (h *StorageHandler) renderStats(w http.ResponseWriter, stats QuotaStats) { 90 + data := struct { 91 + UniqueBlobs int 92 + TotalSize int64 93 + HumanSize string 94 + }{ 95 + UniqueBlobs: stats.UniqueBlobs, 96 + TotalSize: stats.TotalSize, 97 + HumanSize: humanizeBytes(stats.TotalSize), 98 + } 99 + 100 + w.Header().Set("Content-Type", "text/html") 101 + if err := h.Templates.ExecuteTemplate(w, "storage_stats", data); err != nil { 102 + slog.Error("Failed to render storage stats template", "error", err) 103 + http.Error(w, "Failed to render template", http.StatusInternalServerError) 104 + } 105 + } 106 + 107 + func (h *StorageHandler) renderError(w http.ResponseWriter, message string) { 108 + w.Header().Set("Content-Type", "text/html") 109 + 
fmt.Fprintf(w, `<div class="storage-error"><i data-lucide="alert-circle"></i> %s</div>`, message) 110 + } 111 + 112 + func (h *StorageHandler) renderNoHold(w http.ResponseWriter) { 113 + w.Header().Set("Content-Type", "text/html") 114 + fmt.Fprint(w, `<div class="storage-info"><i data-lucide="info"></i> No hold configured. Set a default hold above to see storage usage.</div>`) 115 + } 116 + 117 + // humanizeBytes converts bytes to human-readable format 118 + func humanizeBytes(bytes int64) string { 119 + const unit = 1024 120 + if bytes < unit { 121 + return fmt.Sprintf("%d B", bytes) 122 + } 123 + div, exp := int64(unit), 0 124 + for n := bytes / unit; n >= unit; n /= unit { 125 + div *= unit 126 + exp++ 127 + } 128 + return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp]) 129 + }
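The `humanizeBytes` helper is the usual base-1024 formatting loop. For a rough sense of its output (illustrative sizes only, not fixtures from this change):

```go
// Illustrative humanizeBytes output for a few sizes (base-1024 units).
fmt.Println(humanizeBytes(512))        // 512 B
fmt.Println(humanizeBytes(1536))       // 1.5 KB
fmt.Println(humanizeBytes(104857600))  // 100.0 MB
fmt.Println(humanizeBytes(1073741824)) // 1.0 GB
```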
+5
pkg/appview/routes/routes.go
··· 174 174 RegistryURL: registryURL, 175 175 }).ServeHTTP) 176 176 177 + r.Get("/api/storage", (&uihandlers.StorageHandler{ 178 + Templates: deps.Templates, 179 + Refresher: deps.Refresher, 180 + }).ServeHTTP) 181 + 177 182 r.Post("/api/profile/default-hold", (&uihandlers.UpdateDefaultHoldHandler{ 178 183 Refresher: deps.Refresher, 179 184 }).ServeHTTP)
+5 -4
pkg/appview/storage/manifest_store.go
··· 325 325 serviceToken := s.ctx.ServiceToken 326 326 327 327 // Build notification request 328 + // Note: userHandle is resolved from userDid on the hold side (cached, 24-hour TTL) 328 329 notifyReq := map[string]any{ 329 - "repository": s.ctx.Repository, 330 - "userDid": s.ctx.DID, 331 - "userHandle": s.ctx.Handle, 332 - "operation": operation, 330 + "repository": s.ctx.Repository, 331 + "userDid": s.ctx.DID, 332 + "manifestDigest": manifestDigest, 333 + "operation": operation, 333 334 } 334 335 335 336 // For push operations, include full manifest data
+59
pkg/appview/templates/pages/settings.html
··· 29 29 </div> 30 30 </section> 31 31 32 + <!-- Storage Usage Section --> 33 + <section class="settings-section storage-section"> 34 + <h2>Storage Usage</h2> 35 + <p>Estimated storage usage on your default hold.</p> 36 + <div id="storage-stats" hx-get="/api/storage" hx-trigger="load" hx-swap="innerHTML"> 37 + <p><i data-lucide="loader-2" class="spin"></i> Loading...</p> 38 + </div> 39 + </section> 40 + 32 41 <!-- Default Hold Section --> 33 42 <section class="settings-section"> 34 43 <h2>Default Hold</h2> ··· 200 209 </script> 201 210 202 211 <style> 212 + /* Storage Section Styles */ 213 + .storage-section .storage-stats { 214 + background: var(--code-bg); 215 + padding: 1rem; 216 + border-radius: 4px; 217 + margin-top: 0.5rem; 218 + } 219 + .storage-section .stat-row { 220 + display: flex; 221 + justify-content: space-between; 222 + padding: 0.5rem 0; 223 + border-bottom: 1px solid var(--border); 224 + } 225 + .storage-section .stat-row:last-child { 226 + border-bottom: none; 227 + } 228 + .storage-section .stat-label { 229 + color: var(--fg-muted); 230 + } 231 + .storage-section .stat-value { 232 + font-weight: bold; 233 + font-family: monospace; 234 + } 235 + .storage-section .storage-error, 236 + .storage-section .storage-info { 237 + padding: 1rem; 238 + border-radius: 4px; 239 + margin-top: 0.5rem; 240 + display: flex; 241 + align-items: center; 242 + gap: 0.5rem; 243 + } 244 + .storage-section .storage-error { 245 + background: var(--error-bg, #fef2f2); 246 + color: var(--error, #dc2626); 247 + border: 1px solid var(--error, #dc2626); 248 + } 249 + .storage-section .storage-info { 250 + background: var(--info-bg, #eff6ff); 251 + color: var(--info, #2563eb); 252 + border: 1px solid var(--info, #2563eb); 253 + } 254 + .spin { 255 + animation: spin 1s linear infinite; 256 + } 257 + @keyframes spin { 258 + from { transform: rotate(0deg); } 259 + to { transform: rotate(360deg); } 260 + } 261 + 203 262 /* Devices Section Styles */ 204 263 .devices-section .setup-instructions { 205 264 margin: 1rem 0;
+12
pkg/appview/templates/partials/storage_stats.html
··· 1 + {{ define "storage_stats" }} 2 + <div class="storage-stats"> 3 + <div class="stat-row"> 4 + <span class="stat-label">Unique Blobs:</span> 5 + <span class="stat-value">{{ .UniqueBlobs }}</span> 6 + </div> 7 + <div class="stat-row"> 8 + <span class="stat-label">Total Storage:</span> 9 + <span class="stat-value">{{ .HumanSize }}</span> 10 + </div> 11 + </div> 12 + {{ end }}
+36 -70
pkg/atproto/cbor_gen.go
··· 654 654 655 655 cw := cbg.NewCborWriter(w) 656 656 657 - if _, err := cw.Write([]byte{168}); err != nil { 657 + if _, err := cw.Write([]byte{167}); err != nil { 658 658 return err 659 659 } 660 660 ··· 749 749 return err 750 750 } 751 751 752 + // t.Manifest (string) (string) 753 + if len("manifest") > 8192 { 754 + return xerrors.Errorf("Value in field \"manifest\" was too long") 755 + } 756 + 757 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("manifest"))); err != nil { 758 + return err 759 + } 760 + if _, err := cw.WriteString(string("manifest")); err != nil { 761 + return err 762 + } 763 + 764 + if len(t.Manifest) > 8192 { 765 + return xerrors.Errorf("Value in field t.Manifest was too long") 766 + } 767 + 768 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Manifest))); err != nil { 769 + return err 770 + } 771 + if _, err := cw.WriteString(string(t.Manifest)); err != nil { 772 + return err 773 + } 774 + 752 775 // t.CreatedAt (string) (string) 753 776 if len("createdAt") > 8192 { 754 777 return xerrors.Errorf("Value in field \"createdAt\" was too long") ··· 794 817 if _, err := cw.WriteString(string(t.MediaType)); err != nil { 795 818 return err 796 819 } 797 - 798 - // t.Repository (string) (string) 799 - if len("repository") > 8192 { 800 - return xerrors.Errorf("Value in field \"repository\" was too long") 801 - } 802 - 803 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("repository"))); err != nil { 804 - return err 805 - } 806 - if _, err := cw.WriteString(string("repository")); err != nil { 807 - return err 808 - } 809 - 810 - if len(t.Repository) > 8192 { 811 - return xerrors.Errorf("Value in field t.Repository was too long") 812 - } 813 - 814 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Repository))); err != nil { 815 - return err 816 - } 817 - if _, err := cw.WriteString(string(t.Repository)); err != nil { 818 - return err 819 - } 820 - 821 - // t.UserHandle (string) (string) 822 - if len("userHandle") > 8192 { 823 - return xerrors.Errorf("Value in field \"userHandle\" was too long") 824 - } 825 - 826 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("userHandle"))); err != nil { 827 - return err 828 - } 829 - if _, err := cw.WriteString(string("userHandle")); err != nil { 830 - return err 831 - } 832 - 833 - if len(t.UserHandle) > 8192 { 834 - return xerrors.Errorf("Value in field t.UserHandle was too long") 835 - } 836 - 837 - if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.UserHandle))); err != nil { 838 - return err 839 - } 840 - if _, err := cw.WriteString(string(t.UserHandle)); err != nil { 841 - return err 842 - } 843 820 return nil 844 821 } 845 822 ··· 868 845 869 846 n := extra 870 847 871 - nameBuf := make([]byte, 10) 848 + nameBuf := make([]byte, 9) 872 849 for i := uint64(0); i < n; i++ { 873 850 nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 8192) 874 851 if err != nil { ··· 942 920 943 921 t.UserDID = string(sval) 944 922 } 923 + // t.Manifest (string) (string) 924 + case "manifest": 925 + 926 + { 927 + sval, err := cbg.ReadStringWithMax(cr, 8192) 928 + if err != nil { 929 + return err 930 + } 931 + 932 + t.Manifest = string(sval) 933 + } 945 934 // t.CreatedAt (string) (string) 946 935 case "createdAt": 947 936 ··· 963 952 } 964 953 965 954 t.MediaType = string(sval) 966 - } 967 - // t.Repository (string) (string) 968 - case "repository": 969 - 970 - { 971 - sval, err := cbg.ReadStringWithMax(cr, 8192) 972 - if err != nil { 973 - 
return err 974 - } 975 - 976 - t.Repository = string(sval) 977 - } 978 - // t.UserHandle (string) (string) 979 - case "userHandle": 980 - 981 - { 982 - sval, err := cbg.ReadStringWithMax(cr, 8192) 983 - if err != nil { 984 - return err 985 - } 986 - 987 - t.UserHandle = string(sval) 988 955 } 989 956 990 957 default:
+6
pkg/atproto/endpoints.go
··· 51 51 // Request: {"ownerDid": "...", "repository": "...", "pullCount": 10, "pushCount": 5, "lastPull": "...", "lastPush": "..."} 52 52 // Response: {"success": true} 53 53 HoldSetStats = "/xrpc/io.atcr.hold.setStats" 54 + 55 + // HoldGetQuota returns storage quota information for a user. 56 + // Method: GET 57 + // Query: userDid={did} 58 + // Response: {"userDid": "...", "uniqueBlobs": 10, "totalSize": 1073741824} 59 + HoldGetQuota = "/xrpc/io.atcr.hold.getQuota" 54 60 ) 55 61 56 62 // Hold service crew management endpoints (io.atcr.hold.*)
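A minimal sketch of how a caller might use the new endpoint, matching the request/response shape documented above. The `fetchQuota` helper and its error handling are illustrative, not part of this change; `QuotaStats` refers to the struct added in the appview storage handler.

```go
// fetchQuota queries a hold's public quota endpoint for a user DID.
// Sketch only: hold URL resolution and request timeouts are out of scope here.
func fetchQuota(holdURL, userDID string) (*QuotaStats, error) {
	quotaURL := fmt.Sprintf("%s%s?userDid=%s",
		holdURL, atproto.HoldGetQuota, url.QueryEscape(userDID))

	resp, err := http.Get(quotaURL)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("hold returned status %d", resp.StatusCode)
	}

	var stats QuotaStats
	if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
		return nil, err
	}
	return &stats, nil
}
```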
+16 -17
pkg/atproto/lexicon.go
··· 602 602 // Stored in the hold's embedded PDS for tracking and analytics 603 603 // Uses CBOR encoding for efficient storage in hold's carstore 604 604 type LayerRecord struct { 605 - Type string `json:"$type" cborgen:"$type"` 606 - Digest string `json:"digest" cborgen:"digest"` // Layer digest (e.g., "sha256:abc123...") 607 - Size int64 `json:"size" cborgen:"size"` // Size in bytes 608 - MediaType string `json:"mediaType" cborgen:"mediaType"` // Media type (e.g., "application/vnd.oci.image.layer.v1.tar+gzip") 609 - Repository string `json:"repository" cborgen:"repository"` // Repository this layer belongs to 610 - UserDID string `json:"userDid" cborgen:"userDid"` // DID of user who uploaded this layer 611 - UserHandle string `json:"userHandle" cborgen:"userHandle"` // Handle of user (for display purposes) 612 - CreatedAt string `json:"createdAt" cborgen:"createdAt"` // RFC3339 timestamp 605 + Type string `json:"$type" cborgen:"$type"` 606 + Digest string `json:"digest" cborgen:"digest"` // Layer digest (e.g., "sha256:abc123...") 607 + Size int64 `json:"size" cborgen:"size"` // Size in bytes 608 + MediaType string `json:"mediaType" cborgen:"mediaType"` // Media type (e.g., "application/vnd.oci.image.layer.v1.tar+gzip") 609 + Manifest string `json:"manifest" cborgen:"manifest"` // AT-URI of manifest that included this layer 610 + UserDID string `json:"userDid" cborgen:"userDid"` // DID of user who uploaded this layer 611 + CreatedAt string `json:"createdAt" cborgen:"createdAt"` // RFC3339 timestamp 613 612 } 614 613 615 614 // NewLayerRecord creates a new layer record 616 - func NewLayerRecord(digest string, size int64, mediaType, repository, userDID, userHandle string) *LayerRecord { 615 + // manifestURI: AT-URI of the manifest (e.g., "at://did:plc:xyz/io.atcr.manifest/abc123") 616 + func NewLayerRecord(digest string, size int64, mediaType, userDID, manifestURI string) *LayerRecord { 617 617 return &LayerRecord{ 618 - Type: LayerCollection, 619 - Digest: digest, 620 - Size: size, 621 - MediaType: mediaType, 622 - Repository: repository, 623 - UserDID: userDID, 624 - UserHandle: userHandle, 625 - CreatedAt: time.Now().Format(time.RFC3339), 618 + Type: LayerCollection, 619 + Digest: digest, 620 + Size: size, 621 + MediaType: mediaType, 622 + Manifest: manifestURI, 623 + UserDID: userDID, 624 + CreatedAt: time.Now().Format(time.RFC3339), 626 625 } 627 626 } 628 627
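With the new signature, a layer record is built from the layer's digest, size, and media type plus the uploader DID and the manifest AT-URI. A sketch with made-up values:

```go
// Hypothetical values, shown only to illustrate the new field set
// (manifest AT-URI in place of the old repository/userHandle pair).
rec := atproto.NewLayerRecord(
	"sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f", // digest
	12582912, // size in bytes (12 MB)
	"application/vnd.oci.image.layer.v1.tar+gzip",  // media type
	"did:plc:evan123",                              // uploader DID
	"at://did:plc:evan123/io.atcr.manifest/abc123", // manifest AT-URI
)
// rec.CreatedAt is set to the current RFC3339 timestamp by the constructor.
```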
+36 -49
pkg/atproto/lexicon_test.go
··· 1089 1089 1090 1090 func TestNewLayerRecord(t *testing.T) { 1091 1091 tests := []struct { 1092 - name string 1093 - digest string 1094 - size int64 1095 - mediaType string 1096 - repository string 1097 - userDID string 1098 - userHandle string 1092 + name string 1093 + digest string 1094 + size int64 1095 + mediaType string 1096 + userDID string 1097 + manifestURI string 1099 1098 }{ 1100 1099 { 1101 - name: "standard layer", 1102 - digest: "sha256:abc123", 1103 - size: 1024, 1104 - mediaType: "application/vnd.oci.image.layer.v1.tar+gzip", 1105 - repository: "myapp", 1106 - userDID: "did:plc:user123", 1107 - userHandle: "alice.bsky.social", 1100 + name: "standard layer", 1101 + digest: "sha256:abc123", 1102 + size: 1024, 1103 + mediaType: "application/vnd.oci.image.layer.v1.tar+gzip", 1104 + userDID: "did:plc:user123", 1105 + manifestURI: "at://did:plc:user123/io.atcr.manifest/abc123", 1108 1106 }, 1109 1107 { 1110 - name: "large layer", 1111 - digest: "sha256:def456", 1112 - size: 1073741824, // 1GB 1113 - mediaType: "application/vnd.oci.image.layer.v1.tar+gzip", 1114 - repository: "largeapp", 1115 - userDID: "did:plc:user456", 1116 - userHandle: "bob.example.com", 1108 + name: "large layer", 1109 + digest: "sha256:def456", 1110 + size: 1073741824, // 1GB 1111 + mediaType: "application/vnd.oci.image.layer.v1.tar+gzip", 1112 + userDID: "did:plc:user456", 1113 + manifestURI: "at://did:plc:user456/io.atcr.manifest/def456", 1117 1114 }, 1118 1115 { 1119 - name: "empty values", 1120 - digest: "", 1121 - size: 0, 1122 - mediaType: "", 1123 - repository: "", 1124 - userDID: "", 1125 - userHandle: "", 1116 + name: "empty values", 1117 + digest: "", 1118 + size: 0, 1119 + mediaType: "", 1120 + userDID: "", 1121 + manifestURI: "", 1126 1122 }, 1127 1123 { 1128 - name: "config layer", 1129 - digest: "sha256:config123", 1130 - size: 512, 1131 - mediaType: "application/vnd.oci.image.config.v1+json", 1132 - repository: "app/subapp", 1133 - userDID: "did:web:example.com", 1134 - userHandle: "charlie.tangled.io", 1124 + name: "config layer", 1125 + digest: "sha256:config123", 1126 + size: 512, 1127 + mediaType: "application/vnd.oci.image.config.v1+json", 1128 + userDID: "did:web:example.com", 1129 + manifestURI: "at://did:web:example.com/io.atcr.manifest/config123", 1135 1130 }, 1136 1131 } 1137 1132 1138 1133 for _, tt := range tests { 1139 1134 t.Run(tt.name, func(t *testing.T) { 1140 - record := NewLayerRecord(tt.digest, tt.size, tt.mediaType, tt.repository, tt.userDID, tt.userHandle) 1135 + record := NewLayerRecord(tt.digest, tt.size, tt.mediaType, tt.userDID, tt.manifestURI) 1141 1136 1142 1137 // Verify all fields 1143 1138 if record == nil { ··· 1160 1155 t.Errorf("MediaType = %q, want %q", record.MediaType, tt.mediaType) 1161 1156 } 1162 1157 1163 - if record.Repository != tt.repository { 1164 - t.Errorf("Repository = %q, want %q", record.Repository, tt.repository) 1158 + if record.Manifest != tt.manifestURI { 1159 + t.Errorf("Manifest = %q, want %q", record.Manifest, tt.manifestURI) 1165 1160 } 1166 1161 1167 1162 if record.UserDID != tt.userDID { 1168 1163 t.Errorf("UserDID = %q, want %q", record.UserDID, tt.userDID) 1169 - } 1170 - 1171 - if record.UserHandle != tt.userHandle { 1172 - t.Errorf("UserHandle = %q, want %q", record.UserHandle, tt.userHandle) 1173 1164 } 1174 1165 1175 1166 // Verify CreatedAt is set and is a valid RFC3339 timestamp ··· 1192 1183 "sha256:abc123", 1193 1184 1024, 1194 1185 "application/vnd.oci.image.layer.v1.tar+gzip", 1195 - "myapp", 1196 1186 
"did:plc:user123", 1197 - "alice.bsky.social", 1187 + "at://did:plc:user123/io.atcr.manifest/abc123", 1198 1188 ) 1199 1189 1200 1190 // Marshal to JSON ··· 1222 1212 if decoded.MediaType != record.MediaType { 1223 1213 t.Errorf("MediaType = %q, want %q", decoded.MediaType, record.MediaType) 1224 1214 } 1225 - if decoded.Repository != record.Repository { 1226 - t.Errorf("Repository = %q, want %q", decoded.Repository, record.Repository) 1215 + if decoded.Manifest != record.Manifest { 1216 + t.Errorf("Manifest = %q, want %q", decoded.Manifest, record.Manifest) 1227 1217 } 1228 1218 if decoded.UserDID != record.UserDID { 1229 1219 t.Errorf("UserDID = %q, want %q", decoded.UserDID, record.UserDID) 1230 - } 1231 - if decoded.UserHandle != record.UserHandle { 1232 - t.Errorf("UserHandle = %q, want %q", decoded.UserHandle, record.UserHandle) 1233 1220 } 1234 1221 if decoded.CreatedAt != record.CreatedAt { 1235 1222 t.Errorf("CreatedAt = %q, want %q", decoded.CreatedAt, record.CreatedAt)
+18 -9
pkg/hold/oci/xrpc.go
··· 217 217 218 218 // Parse request 219 219 var req struct { 220 - Repository string `json:"repository"` 221 - Tag string `json:"tag"` 222 - UserDID string `json:"userDid"` 223 - UserHandle string `json:"userHandle"` 224 - Operation string `json:"operation"` // "push" or "pull", defaults to "push" for backward compatibility 225 - Manifest struct { 220 + Repository string `json:"repository"` 221 + Tag string `json:"tag"` 222 + UserDID string `json:"userDid"` 223 + ManifestDigest string `json:"manifestDigest"` // For building layer record AT-URIs 224 + Operation string `json:"operation"` // "push" or "pull", defaults to "push" for backward compatibility 225 + Manifest struct { 226 226 MediaType string `json:"mediaType"` 227 227 Config struct { 228 228 Digest string `json:"digest"` ··· 287 287 postsEnabled = h.enableBlueskyPosts 288 288 } 289 289 290 + // Build manifest AT-URI for layer records 291 + manifestURI := atproto.BuildManifestURI(req.UserDID, req.ManifestDigest) 292 + 290 293 // Create layer records for each blob 291 294 for _, layer := range req.Manifest.Layers { 292 295 record := atproto.NewLayerRecord( 293 296 layer.Digest, 294 297 layer.Size, 295 298 layer.MediaType, 296 - req.Repository, 297 299 req.UserDID, 298 - req.UserHandle, 300 + manifestURI, 299 301 ) 300 302 301 303 _, _, err := h.pds.CreateLayerRecord(ctx, record) ··· 329 331 330 332 // Create Bluesky post if enabled 331 333 if postsEnabled { 334 + // Resolve handle from DID (cached, 24-hour TTL) 335 + _, userHandle, _, resolveErr := atproto.ResolveIdentity(ctx, req.UserDID) 336 + if resolveErr != nil { 337 + slog.Warn("Failed to resolve handle for user", "did", req.UserDID, "error", resolveErr) 338 + userHandle = req.UserDID // Fallback to DID if resolution fails 339 + } 340 + 332 341 // Extract manifest digest from first layer (or use config digest as fallback) 333 342 manifestDigest := req.Manifest.Config.Digest 334 343 if len(req.Manifest.Layers) > 0 { ··· 340 349 h.driver, 341 350 req.Repository, 342 351 req.Tag, 343 - req.UserHandle, 352 + userHandle, 344 353 req.UserDID, 345 354 manifestDigest, 346 355 totalSize,
+103
pkg/hold/pds/layer.go
··· 5 5 "fmt" 6 6 7 7 "atcr.io/pkg/atproto" 8 + lexutil "github.com/bluesky-social/indigo/lex/util" 9 + "github.com/bluesky-social/indigo/repo" 8 10 ) 9 11 10 12 // CreateLayerRecord creates a new layer record in the hold's PDS ··· 57 59 // not for runtime queries 58 60 return nil, "", fmt.Errorf("ListLayerRecords not yet implemented") 59 61 } 62 + 63 + // QuotaStats represents storage quota information for a user 64 + type QuotaStats struct { 65 + UserDID string `json:"userDid"` 66 + UniqueBlobs int `json:"uniqueBlobs"` 67 + TotalSize int64 `json:"totalSize"` 68 + } 69 + 70 + // GetQuotaForUser calculates storage quota for a specific user 71 + // It iterates through all layer records, filters by userDid, deduplicates by digest, 72 + // and sums the sizes of unique blobs. 73 + func (p *HoldPDS) GetQuotaForUser(ctx context.Context, userDID string) (*QuotaStats, error) { 74 + if p.recordsIndex == nil { 75 + return nil, fmt.Errorf("records index not available") 76 + } 77 + 78 + // Get session for reading record data 79 + session, err := p.carstore.ReadOnlySession(p.uid) 80 + if err != nil { 81 + return nil, fmt.Errorf("failed to create session: %w", err) 82 + } 83 + 84 + head, err := p.carstore.GetUserRepoHead(ctx, p.uid) 85 + if err != nil { 86 + return nil, fmt.Errorf("failed to get repo head: %w", err) 87 + } 88 + 89 + if !head.Defined() { 90 + // Empty repo - return zero stats 91 + return &QuotaStats{UserDID: userDID}, nil 92 + } 93 + 94 + repoHandle, err := repo.OpenRepo(ctx, session, head) 95 + if err != nil { 96 + return nil, fmt.Errorf("failed to open repo: %w", err) 97 + } 98 + 99 + // Track unique digests and their sizes 100 + digestSizes := make(map[string]int64) 101 + 102 + // Iterate all layer records via the index 103 + cursor := "" 104 + batchSize := 1000 // Process in batches 105 + 106 + for { 107 + records, nextCursor, err := p.recordsIndex.ListRecords(atproto.LayerCollection, batchSize, cursor, true) 108 + if err != nil { 109 + return nil, fmt.Errorf("failed to list layer records: %w", err) 110 + } 111 + 112 + for _, rec := range records { 113 + // Construct record path and get the record data 114 + recordPath := rec.Collection + "/" + rec.Rkey 115 + 116 + _, recBytes, err := repoHandle.GetRecordBytes(ctx, recordPath) 117 + if err != nil { 118 + // Skip records we can't read 119 + continue 120 + } 121 + 122 + // Decode the layer record 123 + recordValue, err := lexutil.CborDecodeValue(*recBytes) 124 + if err != nil { 125 + continue 126 + } 127 + 128 + layerRecord, ok := recordValue.(*atproto.LayerRecord) 129 + if !ok { 130 + continue 131 + } 132 + 133 + // Filter by userDID 134 + if layerRecord.UserDID != userDID { 135 + continue 136 + } 137 + 138 + // Deduplicate by digest - keep the size (could be different pushes of same blob) 139 + // Store the size - we only count each unique digest once 140 + if _, exists := digestSizes[layerRecord.Digest]; !exists { 141 + digestSizes[layerRecord.Digest] = layerRecord.Size 142 + } 143 + } 144 + 145 + if nextCursor == "" { 146 + break 147 + } 148 + cursor = nextCursor 149 + } 150 + 151 + // Calculate totals 152 + var totalSize int64 153 + for _, size := range digestSizes { 154 + totalSize += size 155 + } 156 + 157 + return &QuotaStats{ 158 + UserDID: userDID, 159 + UniqueBlobs: len(digestSizes), 160 + TotalSize: totalSize, 161 + }, nil 162 + }
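The quota arithmetic reduces to a dedup-by-digest sum; a standalone sketch of that step (hypothetical digests and sizes, separate from the carstore plumbing above):

```go
// Two records that share a digest count once, mirroring how
// GetQuotaForUser charges each unique blob a single time.
records := []struct {
	Digest string
	Size   int64
}{
	{"sha256:aaa", 100 << 20}, // 100 MiB
	{"sha256:bbb", 100 << 20}, // 100 MiB
	{"sha256:aaa", 100 << 20}, // duplicate digest - ignored in the sum
}

digestSizes := make(map[string]int64)
for _, r := range records {
	if _, exists := digestSizes[r.Digest]; !exists {
		digestSizes[r.Digest] = r.Size
	}
}

var totalSize int64
for _, size := range digestSizes {
	totalSize += size
}
// uniqueBlobs == 2, totalSize == 209715200 (200 MiB)
```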
+62 -73
pkg/hold/pds/layer_test.go
··· 22 22 "sha256:abc123def456", 23 23 1048576, // 1 MB 24 24 "application/vnd.oci.image.layer.v1.tar+gzip", 25 - "myapp", 26 25 "did:plc:alice123", 27 - "alice.bsky.social", 26 + "at://did:plc:alice123/io.atcr.manifest/abc123def456", 28 27 ), 29 28 wantErr: false, 30 29 }, ··· 34 33 "sha256:fedcba987654", 35 34 1073741824, // 1 GB 36 35 "application/vnd.docker.image.rootfs.diff.tar.gzip", 37 - "debian", 38 36 "did:plc:bob456", 39 - "bob.example.com", 37 + "at://did:plc:bob456/io.atcr.manifest/fedcba987654", 40 38 ), 41 39 wantErr: false, 42 40 }, 43 41 { 44 42 name: "invalid record type", 45 43 record: &atproto.LayerRecord{ 46 - Type: "wrong.type", 47 - Digest: "sha256:abc123", 48 - Size: 1024, 49 - MediaType: "application/vnd.oci.image.layer.v1.tar", 50 - Repository: "test", 51 - UserDID: "did:plc:test", 52 - UserHandle: "test.example.com", 44 + Type: "wrong.type", 45 + Digest: "sha256:abc123", 46 + Size: 1024, 47 + MediaType: "application/vnd.oci.image.layer.v1.tar", 48 + Manifest: "at://did:plc:test/io.atcr.manifest/abc123", 49 + UserDID: "did:plc:test", 53 50 }, 54 51 wantErr: true, 55 52 errSubstr: "invalid record type", ··· 57 54 { 58 55 name: "missing digest", 59 56 record: &atproto.LayerRecord{ 60 - Type: atproto.LayerCollection, 61 - Digest: "", 62 - Size: 1024, 63 - MediaType: "application/vnd.oci.image.layer.v1.tar", 64 - Repository: "test", 65 - UserDID: "did:plc:test", 66 - UserHandle: "test.example.com", 57 + Type: atproto.LayerCollection, 58 + Digest: "", 59 + Size: 1024, 60 + MediaType: "application/vnd.oci.image.layer.v1.tar", 61 + Manifest: "at://did:plc:test/io.atcr.manifest/abc123", 62 + UserDID: "did:plc:test", 67 63 }, 68 64 wantErr: true, 69 65 errSubstr: "digest is required", ··· 71 67 { 72 68 name: "zero size", 73 69 record: &atproto.LayerRecord{ 74 - Type: atproto.LayerCollection, 75 - Digest: "sha256:abc123", 76 - Size: 0, 77 - MediaType: "application/vnd.oci.image.layer.v1.tar", 78 - Repository: "test", 79 - UserDID: "did:plc:test", 80 - UserHandle: "test.example.com", 70 + Type: atproto.LayerCollection, 71 + Digest: "sha256:abc123", 72 + Size: 0, 73 + MediaType: "application/vnd.oci.image.layer.v1.tar", 74 + Manifest: "at://did:plc:test/io.atcr.manifest/abc123", 75 + UserDID: "did:plc:test", 81 76 }, 82 77 wantErr: true, 83 78 errSubstr: "size must be positive", ··· 85 80 { 86 81 name: "negative size", 87 82 record: &atproto.LayerRecord{ 88 - Type: atproto.LayerCollection, 89 - Digest: "sha256:abc123", 90 - Size: -1, 91 - MediaType: "application/vnd.oci.image.layer.v1.tar", 92 - Repository: "test", 93 - UserDID: "did:plc:test", 94 - UserHandle: "test.example.com", 83 + Type: atproto.LayerCollection, 84 + Digest: "sha256:abc123", 85 + Size: -1, 86 + MediaType: "application/vnd.oci.image.layer.v1.tar", 87 + Manifest: "at://did:plc:test/io.atcr.manifest/abc123", 88 + UserDID: "did:plc:test", 95 89 }, 96 90 wantErr: true, 97 91 errSubstr: "size must be positive", ··· 134 128 func TestCreateLayerRecord_MultipleRecords(t *testing.T) { 135 129 // Test creating multiple layer records for the same manifest 136 130 pds, ctx := setupTestPDS(t) 131 + 132 + manifestURI := "at://did:plc:test123/io.atcr.manifest/manifestabc123" 137 133 138 134 layers := []struct { 139 135 digest string ··· 151 147 layer.digest, 152 148 layer.size, 153 149 "application/vnd.oci.image.layer.v1.tar+gzip", 154 - "multi-layer-app", 155 150 "did:plc:test123", 156 - "test.example.com", 151 + manifestURI, 157 152 ) 158 153 159 154 rkey, cid, err := pds.CreateLayerRecord(ctx, record) ··· 180 175 
digest := "sha256:abc123def456" 181 176 size := int64(1048576) 182 177 mediaType := "application/vnd.oci.image.layer.v1.tar+gzip" 183 - repository := "myapp" 184 178 userDID := "did:plc:alice123" 185 - userHandle := "alice.bsky.social" 179 + manifestURI := "at://did:plc:alice123/io.atcr.manifest/abc123def456" 186 180 187 - record := atproto.NewLayerRecord(digest, size, mediaType, repository, userDID, userHandle) 181 + record := atproto.NewLayerRecord(digest, size, mediaType, userDID, manifestURI) 188 182 189 183 if record == nil { 190 184 t.Fatal("NewLayerRecord() returned nil") ··· 207 201 t.Errorf("MediaType = %q, want %q", record.MediaType, mediaType) 208 202 } 209 203 210 - if record.Repository != repository { 211 - t.Errorf("Repository = %q, want %q", record.Repository, repository) 204 + if record.Manifest != manifestURI { 205 + t.Errorf("Manifest = %q, want %q", record.Manifest, manifestURI) 212 206 } 213 207 214 208 if record.UserDID != userDID { 215 209 t.Errorf("UserDID = %q, want %q", record.UserDID, userDID) 216 - } 217 - 218 - if record.UserHandle != userHandle { 219 - t.Errorf("UserHandle = %q, want %q", record.UserHandle, userHandle) 220 210 } 221 211 222 212 if record.CreatedAt == "" { ··· 229 219 func TestLayerRecord_FieldValidation(t *testing.T) { 230 220 // Test various field values 231 221 tests := []struct { 232 - name string 233 - digest string 234 - size int64 235 - mediaType string 236 - repository string 237 - userDID string 238 - userHandle string 222 + name string 223 + digest string 224 + size int64 225 + mediaType string 226 + userDID string 227 + manifestURI string 239 228 }{ 240 229 { 241 - name: "typical OCI layer", 242 - digest: "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f", 243 - size: 12582912, // 12 MB 244 - mediaType: "application/vnd.oci.image.layer.v1.tar+gzip", 245 - repository: "hsm-secrets-operator", 246 - userDID: "did:plc:evan123", 247 - userHandle: "evan.jarrett.net", 230 + name: "typical OCI layer", 231 + digest: "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f", 232 + size: 12582912, // 12 MB 233 + mediaType: "application/vnd.oci.image.layer.v1.tar+gzip", 234 + userDID: "did:plc:evan123", 235 + manifestURI: "at://did:plc:evan123/io.atcr.manifest/abc123", 248 236 }, 249 237 { 250 - name: "Docker layer format", 251 - digest: "sha256:abc123", 252 - size: 1024, 253 - mediaType: "application/vnd.docker.image.rootfs.diff.tar.gzip", 254 - repository: "nginx", 255 - userDID: "did:plc:user456", 256 - userHandle: "user.example.com", 238 + name: "Docker layer format", 239 + digest: "sha256:abc123", 240 + size: 1024, 241 + mediaType: "application/vnd.docker.image.rootfs.diff.tar.gzip", 242 + userDID: "did:plc:user456", 243 + manifestURI: "at://did:plc:user456/io.atcr.manifest/def456", 257 244 }, 258 245 { 259 - name: "uncompressed layer", 260 - digest: "sha256:def456", 261 - size: 2048, 262 - mediaType: "application/vnd.oci.image.layer.v1.tar", 263 - repository: "alpine", 264 - userDID: "did:plc:user789", 265 - userHandle: "user.bsky.social", 246 + name: "uncompressed layer", 247 + digest: "sha256:def456", 248 + size: 2048, 249 + mediaType: "application/vnd.oci.image.layer.v1.tar", 250 + userDID: "did:plc:user789", 251 + manifestURI: "at://did:plc:user789/io.atcr.manifest/ghi789", 266 252 }, 267 253 } 268 254 ··· 272 258 tt.digest, 273 259 tt.size, 274 260 tt.mediaType, 275 - tt.repository, 276 261 tt.userDID, 277 - tt.userHandle, 262 + tt.manifestURI, 278 263 ) 279 264 280 265 if record == nil { ··· 288 
273 289 274 if record.Digest != tt.digest { 290 275 t.Errorf("Digest = %q, want %q", record.Digest, tt.digest) 276 + } 277 + 278 + if record.Manifest != tt.manifestURI { 279 + t.Errorf("Manifest = %q, want %q", record.Manifest, tt.manifestURI) 291 280 } 292 281 }) 293 282 }
+31
pkg/hold/pds/xrpc.go
··· 192 192 r.Use(h.requireAuth) 193 193 r.Post(atproto.HoldRequestCrew, h.HandleRequestCrew) 194 194 }) 195 + 196 + // Public quota endpoint (no auth - quota is per-user, just needs userDid param) 197 + r.Get(atproto.HoldGetQuota, h.HandleGetQuota) 195 198 } 196 199 197 200 // HandleHealth returns health check information ··· 1513 1516 // Clients should use multipart upload flow via com.atproto.repo.uploadBlob 1514 1517 return "" 1515 1518 } 1519 + 1520 + // HandleGetQuota returns storage quota information for a user 1521 + // This calculates the total unique blob storage used by a specific user 1522 + // by iterating layer records and deduplicating by digest. 1523 + func (h *XRPCHandler) HandleGetQuota(w http.ResponseWriter, r *http.Request) { 1524 + userDID := r.URL.Query().Get("userDid") 1525 + if userDID == "" { 1526 + http.Error(w, "missing required parameter: userDid", http.StatusBadRequest) 1527 + return 1528 + } 1529 + 1530 + // Validate DID format 1531 + if !atproto.IsDID(userDID) { 1532 + http.Error(w, "invalid userDid format", http.StatusBadRequest) 1533 + return 1534 + } 1535 + 1536 + // Get quota stats 1537 + stats, err := h.pds.GetQuotaForUser(r.Context(), userDID) 1538 + if err != nil { 1539 + slog.Error("Failed to get quota", "userDid", userDID, "error", err) 1540 + http.Error(w, fmt.Sprintf("failed to get quota: %v", err), http.StatusInternalServerError) 1541 + return 1542 + } 1543 + 1544 + w.Header().Set("Content-Type", "application/json") 1545 + json.NewEncoder(w).Encode(stats) 1546 + }