prefect server in zig

add concurrency limits implementation plan

documents v1 vs v2 architecture, migration story, and our v2-only
strategy. includes database schema, api endpoints, slot math, lease
storage design, and phased implementation plan.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+209
+209
docs/concurrency-limits.md
··· 1 + # concurrency limits implementation plan 2 + 3 + ## background 4 + 5 + python prefect has two concurrency limit APIs: 6 + 7 + ### v1 (legacy, `/api/concurrency_limits/`) 8 + - tag-based: limits are identified by tag name 9 + - `active_slots` stored as list of task_run_ids directly on the limit record 10 + - no TTL - slots held forever until explicitly released 11 + - **problem**: crashed tasks never release slots, causing deadlocks 12 + 13 + ### v2 (current, `/api/v2/concurrency_limits/`) 14 + - named limits with lease-based slot management 15 + - `active_slots` is just a counter; actual holder info in separate lease storage 16 + - leases have TTL and auto-expire (solves the crash problem) 17 + - supports `slot_decay_per_second` for rate limiting 18 + - tracks `avg_slot_occupancy_seconds` for smart retry-after calculation 19 + - tracks `denied_slots` for detecting contention 20 + 21 + ### migration story 22 + python prefect migrated v1 to v2 in `9e83011d1f2a_migrate_v1_concurrency_limits_to_v2.py`: 23 + - v1 limits converted to v2 with name `tag:{tag_name}` 24 + - v1 API became an adapter that routes to v2 under the hood 25 + - v1 creates leases with ~100 year TTL for backward compatibility 26 + 27 + ## our strategy: v2 only 28 + 29 + since we have no legacy to support, we implement **only the v2 API**. benefits: 30 + - simpler codebase (one system, not two) 31 + - no adapter complexity 32 + - tag-based concurrency still works via naming convention (`tag:foo`) 33 + 34 + if older clients need v1 compatibility later, we can add thin adapters. 35 + 36 + ## database schema 37 + 38 + ```sql 39 + CREATE TABLE concurrency_limit ( 40 + id TEXT PRIMARY KEY, 41 + created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 42 + updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 43 + name TEXT NOT NULL UNIQUE, 44 + limit INTEGER NOT NULL, 45 + active BOOLEAN DEFAULT TRUE, 46 + active_slots INTEGER DEFAULT 0, 47 + denied_slots INTEGER DEFAULT 0, 48 + slot_decay_per_second REAL DEFAULT 0.0, 49 + avg_slot_occupancy_seconds REAL DEFAULT 2.0 50 + ); 51 + 52 + CREATE TABLE concurrency_lease ( 53 + id TEXT PRIMARY KEY, 54 + created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 55 + expiration TIMESTAMP NOT NULL, 56 + -- JSON array of concurrency_limit IDs this lease holds slots for 57 + resource_ids TEXT NOT NULL, 58 + -- JSON: {slots: int, holder: {type: "task_run"|"flow_run"|"deployment", id: uuid}} 59 + metadata TEXT 60 + ); 61 + 62 + CREATE INDEX ix_concurrency_lease_expiration ON concurrency_lease(expiration); 63 + ``` 64 + 65 + ## api endpoints 66 + 67 + ### crud 68 + - `POST /api/v2/concurrency_limits/` - create limit 69 + - `GET /api/v2/concurrency_limits/{id_or_name}` - read by id or name 70 + - `POST /api/v2/concurrency_limits/filter` - list with pagination 71 + - `PATCH /api/v2/concurrency_limits/{id_or_name}` - update 72 + - `DELETE /api/v2/concurrency_limits/{id_or_name}` - delete 73 + 74 + ### slot management (simple - no lease) 75 + - `POST /api/v2/concurrency_limits/increment` - acquire slots 76 + - `POST /api/v2/concurrency_limits/decrement` - release slots 77 + 78 + ### slot management (with lease) 79 + - `POST /api/v2/concurrency_limits/increment-with-lease` - acquire with lease 80 + - `POST /api/v2/concurrency_limits/decrement-with-lease` - release by lease_id 81 + - `POST /api/v2/concurrency_limits/leases/{lease_id}/renew` - extend lease TTL 82 + 83 + ## increment logic (the tricky part) 84 + 85 + the increment operation must be atomic and handle slot decay: 86 + 87 + ```sql 88 + -- Calculate active_slots after decay (for rate limits) 89 + -- active_slots_after_decay = MAX(0, active_slots - FLOOR(slot_decay_per_second * seconds_since_update)) 90 + 91 + -- Atomically check and increment 92 + UPDATE concurrency_limit 93 + SET 94 + active_slots = active_slots_after_decay + :requested_slots, 95 + denied_slots = denied_slots_after_decay, 96 + updated = CURRENT_TIMESTAMP 97 + WHERE 98 + id IN (:limit_ids) 99 + AND active = TRUE 100 + AND active_slots_after_decay + :requested_slots <= limit; 101 + 102 + -- If rowcount != len(limit_ids), acquisition failed (at capacity) 103 + ``` 104 + 105 + on failure (423 Locked): 106 + - increment `denied_slots` for each limit 107 + - calculate `Retry-After` header based on `avg_slot_occupancy_seconds` or decay rate 108 + 109 + ## decrement logic 110 + 111 + ```sql 112 + UPDATE concurrency_limit 113 + SET 114 + active_slots = MAX(0, active_slots_after_decay - :slots), 115 + denied_slots = denied_slots_after_decay, 116 + -- Update avg_slot_occupancy_seconds as weighted average 117 + avg_slot_occupancy_seconds = avg_slot_occupancy_seconds 118 + + (:occupancy_seconds / :slots / (limit * 2)) 119 + - (avg_slot_occupancy_seconds / (limit * 2)), 120 + updated = CURRENT_TIMESTAMP 121 + WHERE id IN (:limit_ids); 122 + ``` 123 + 124 + ## lease storage 125 + 126 + start with in-memory (like python's default): 127 + 128 + ```zig 129 + const Lease = struct { 130 + id: []const u8, 131 + resource_ids: []const []const u8, // concurrency_limit IDs 132 + expiration: i64, // microseconds since epoch 133 + metadata: struct { 134 + slots: i64, 135 + holder: ?struct { 136 + type: []const u8, // "task_run", "flow_run", "deployment" 137 + id: []const u8, 138 + }, 139 + }, 140 + }; 141 + 142 + // Global lease storage (protected by mutex) 143 + var leases: std.StringHashMap(Lease) = ...; 144 + var expirations: std.StringHashMap(i64) = ...; // lease_id -> expiration 145 + ``` 146 + 147 + later can add: 148 + - filesystem storage (persist across restarts) 149 + - redis storage (distributed) 150 + - database storage (concurrency_lease table) 151 + 152 + ## background service: lease_cleanup 153 + 154 + runs periodically to clean up expired leases: 155 + 156 + ``` 157 + 1. find expired leases: SELECT id, resource_ids, metadata FROM concurrency_lease WHERE expiration < NOW() 158 + 2. for each expired lease: 159 + a. decrement active_slots for each resource_id 160 + b. delete the lease 161 + 3. sleep for cleanup_interval (e.g., 30 seconds) 162 + ``` 163 + 164 + this prevents slot leakage when tasks crash without releasing. 165 + 166 + ## implementation phases 167 + 168 + ### phase 1: core infrastructure 169 + - [ ] database schema (concurrency_limit table) 170 + - [ ] db module: `src/db/concurrency_limits.zig` 171 + - [ ] basic CRUD operations 172 + 173 + ### phase 2: slot management (no leases) 174 + - [ ] increment endpoint with atomic slot acquisition 175 + - [ ] decrement endpoint with occupancy tracking 176 + - [ ] slot decay calculation 177 + - [ ] retry-after calculation 178 + 179 + ### phase 3: lease support 180 + - [ ] in-memory lease storage 181 + - [ ] increment-with-lease endpoint 182 + - [ ] decrement-with-lease endpoint 183 + - [ ] lease renewal endpoint 184 + - [ ] lease_cleanup background service 185 + 186 + ### phase 4: orchestration integration 187 + - [ ] task run orchestration rule to check concurrency limits 188 + - [ ] integration with flow_runs.zig for flow-level concurrency 189 + 190 + ### phase 5 (optional): v1 compatibility 191 + - [ ] adapter endpoints for `/api/concurrency_limits/` 192 + - [ ] tag: prefix convention handling 193 + 194 + ## testing strategy 195 + 196 + 1. **unit tests**: slot math, decay calculation, retry-after calculation 197 + 2. **api tests**: add to test-api-sequence 198 + - create/read/update/delete limits 199 + - increment/decrement with capacity checks 200 + - lease lifecycle (acquire, renew, expire) 201 + - concurrent access (multiple workers) 202 + 3. **integration tests**: task run orchestration with concurrency limits 203 + 204 + ## reference files (python) 205 + 206 + - `src/prefect/server/api/concurrency_limits_v2.py` - API routes 207 + - `src/prefect/server/models/concurrency_limits_v2.py` - DB operations 208 + - `src/prefect/server/concurrency/lease_storage/` - lease implementations 209 + - `src/prefect/server/database/orm_models.py` - ConcurrencyLimitV2 model