prefect server in zig

fix concurrency plan: lease storage separate from database

matches python architecture - database stores limit config/counters,
lease storage (memory/filesystem/redis) handles transient lease state.
removes concurrency_lease table, adds proper lease storage abstraction.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+102 -41
+102 -41
docs/concurrency-limits.md
··· 35 35 36 36 ## database schema 37 37 38 + the database only stores limit configuration and counters - NOT leases. 39 + 38 40 ```sql 39 41 CREATE TABLE concurrency_limit ( 40 42 id TEXT PRIMARY KEY, ··· 49 51 avg_slot_occupancy_seconds REAL DEFAULT 2.0 50 52 ); 51 53 52 - CREATE TABLE concurrency_lease ( 53 - id TEXT PRIMARY KEY, 54 - created TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 55 - expiration TIMESTAMP NOT NULL, 56 - -- JSON array of concurrency_limit IDs this lease holds slots for 57 - resource_ids TEXT NOT NULL, 58 - -- JSON: {slots: int, holder: {type: "task_run"|"flow_run"|"deployment", id: uuid}} 59 - metadata TEXT 60 - ); 54 + CREATE UNIQUE INDEX uq_concurrency_limit_name ON concurrency_limit(name); 55 + ``` 61 56 62 - CREATE INDEX ix_concurrency_lease_expiration ON concurrency_lease(expiration); 63 - ``` 57 + leases are stored separately in lease storage (see below). 64 58 65 59 ## api endpoints 66 60 ··· 123 117 124 118 ## lease storage 125 119 126 - start with in-memory (like python's default): 120 + leases are stored **separately from the database**, matching python's architecture: 121 + 122 + ``` 123 + python: prefect.server.concurrency.lease_storage/ 124 + ├── __init__.py # ConcurrencyLeaseStorage protocol 125 + ├── memory.py # in-memory singleton (default) 126 + └── filesystem.py # file-based persistence 127 + 128 + zig: src/concurrency/ 129 + ├── lease_storage.zig # LeaseStorage interface + memory implementation 130 + └── (future: filesystem.zig, redis.zig) 131 + ``` 132 + 133 + ### why separate from database? 134 + - leases are high-frequency operations (acquire/release/renew) 135 + - ephemeral by design - TTL handles cleanup 136 + - database stores durable config; lease storage handles transient state 137 + - if server crashes, `active_slots` counter may drift but lease expiration handles cleanup 138 + 139 + ### lease storage interface 127 140 128 141 ```zig 129 - const Lease = struct { 142 + pub const ResourceLease = struct { 130 143 id: []const u8, 144 + created_at: i64, // microseconds 145 + expiration: i64, // microseconds 131 146 resource_ids: []const []const u8, // concurrency_limit IDs 132 - expiration: i64, // microseconds since epoch 133 - metadata: struct { 134 - slots: i64, 135 - holder: ?struct { 136 - type: []const u8, // "task_run", "flow_run", "deployment" 137 - id: []const u8, 138 - }, 139 - }, 147 + metadata: ?LeaseMetadata, 148 + }; 149 + 150 + pub const LeaseMetadata = struct { 151 + slots: i64, 152 + holder: ?LeaseHolder, 153 + }; 154 + 155 + pub const LeaseHolder = struct { 156 + type: []const u8, // "task_run", "flow_run", "deployment" 157 + id: []const u8, 158 + }; 159 + 160 + pub const LeaseStorage = struct { 161 + // required methods 162 + createLease: fn(resource_ids: [][]const u8, ttl_ms: i64, metadata: ?LeaseMetadata) !ResourceLease, 163 + readLease: fn(lease_id: []const u8) ?ResourceLease, 164 + renewLease: fn(lease_id: []const u8, ttl_ms: i64) !bool, 165 + revokeLease: fn(lease_id: []const u8) void, 166 + 167 + // query methods 168 + readActiveLeaseIds: fn(limit: usize, offset: usize) [][]const u8, 169 + readExpiredLeaseIds: fn(limit: usize) [][]const u8, 170 + listHoldersForLimit: fn(limit_id: []const u8) []struct { lease_id: []const u8, holder: LeaseHolder }, 140 171 }; 172 + ``` 141 173 142 - // Global lease storage (protected by mutex) 143 - var leases: std.StringHashMap(Lease) = ...; 144 - var expirations: std.StringHashMap(i64) = ...; // lease_id -> expiration 174 + ### memory implementation (default) 175 + 176 + ```zig 177 + // singleton, protected by mutex 178 + var leases: std.StringHashMap(ResourceLease) = ...; 179 + var expirations: std.StringHashMap(i64) = ...; // lease_id -> expiration_micros 180 + 181 + pub fn createLease(...) !ResourceLease { 182 + const id = uuid_util.generate(&buf); 183 + const now = time_util.nowMicros(); 184 + const lease = ResourceLease{ 185 + .id = id, 186 + .created_at = now, 187 + .expiration = now + ttl_ms * 1000, 188 + .resource_ids = resource_ids, 189 + .metadata = metadata, 190 + }; 191 + leases.put(id, lease); 192 + expirations.put(id, lease.expiration); 193 + return lease; 194 + } 195 + 196 + pub fn readExpiredLeaseIds(limit: usize) [][]const u8 { 197 + const now = time_util.nowMicros(); 198 + // return lease_ids where expiration < now 199 + } 145 200 ``` 146 201 147 - later can add: 148 - - filesystem storage (persist across restarts) 149 - - redis storage (distributed) 150 - - database storage (concurrency_lease table) 202 + ### future backends 203 + - **filesystem**: persist to JSON files (survives restart) 204 + - **redis**: use our existing broker infrastructure for distributed lease storage 151 205 152 206 ## background service: lease_cleanup 153 207 ··· 166 220 ## implementation phases 167 221 168 222 ### phase 1: core infrastructure 169 - - [ ] database schema (concurrency_limit table) 223 + - [ ] database schema (concurrency_limit table only - no lease table) 170 224 - [ ] db module: `src/db/concurrency_limits.zig` 171 225 - [ ] basic CRUD operations 226 + - [ ] api module: `src/api/concurrency_limits_v2.zig` 172 227 173 - ### phase 2: slot management (no leases) 174 - - [ ] increment endpoint with atomic slot acquisition 175 - - [ ] decrement endpoint with occupancy tracking 176 - - [ ] slot decay calculation 177 - - [ ] retry-after calculation 228 + ### phase 2: lease storage 229 + - [ ] lease storage module: `src/concurrency/lease_storage.zig` 230 + - [ ] in-memory implementation (singleton with mutex) 231 + - [ ] ResourceLease, LeaseMetadata, LeaseHolder types 178 232 179 - ### phase 3: lease support 180 - - [ ] in-memory lease storage 181 - - [ ] increment-with-lease endpoint 182 - - [ ] decrement-with-lease endpoint 233 + ### phase 3: slot management 234 + - [ ] increment endpoint (no lease) - atomic slot acquisition with decay 235 + - [ ] increment-with-lease endpoint - creates lease after acquisition 236 + - [ ] decrement endpoint (no lease) - releases slots, updates avg_occupancy 237 + - [ ] decrement-with-lease endpoint - finds lease, releases, revokes 183 238 - [ ] lease renewal endpoint 184 - - [ ] lease_cleanup background service 239 + - [ ] retry-after calculation (slot_decay or avg_occupancy based) 240 + 241 + ### phase 4: background service 242 + - [ ] lease_cleanup service: `src/services/lease_cleanup.zig` 243 + - [ ] finds expired leases, decrements active_slots, revokes leases 244 + - [ ] runs on interval (e.g., 30 seconds) 185 245 186 - ### phase 4: orchestration integration 246 + ### phase 5: orchestration integration 187 247 - [ ] task run orchestration rule to check concurrency limits 188 248 - [ ] integration with flow_runs.zig for flow-level concurrency 189 249 190 - ### phase 5 (optional): v1 compatibility 250 + ### phase 6 (optional): v1 compatibility 191 251 - [ ] adapter endpoints for `/api/concurrency_limits/` 192 252 - [ ] tag: prefix convention handling 253 + - [ ] 100-year TTL leases for v1 compatibility 193 254 194 255 ## testing strategy 195 256