prefect server in zig

refactor: lease storage - memory default, redis for HA

leases are NEVER stored in the database (matches Python behavior):
- memory storage: default for single-instance deployments
- redis storage: when PREFECT_BROKER_BACKEND=redis (HA)

moved from src/concurrency/lease_storage.zig to src/leases/:
- memory.zig: in-memory lease storage
- redis.zig: redis-backed lease storage
- storage.zig: unified interface

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+1023 -265
+12 -5
ROADMAP.md
··· 86 86 - [ ] POST /api/block_capabilities/ 87 87 88 88 ### concurrency (v2 only - skip v1 API) 89 - - [ ] POST /api/v2/concurrency_limits/ 90 - - [ ] POST /api/v2/concurrency_limits/filter 91 - - [ ] lease-based slot management (increment/decrement with lease, renew, TTL) 89 + - [x] POST /api/v2/concurrency_limits/ 90 + - [x] POST /api/v2/concurrency_limits/filter 91 + - [x] GET /api/v2/concurrency_limits/{id} 92 + - [x] DELETE /api/v2/concurrency_limits/{id} 93 + - [x] POST /api/v2/concurrency_limits/increment (acquire slots, returns lease_id) 94 + - [x] POST /api/v2/concurrency_limits/decrement (release slots by limit names) 95 + - [x] lease storage (memory default, redis for HA - matches Python) 96 + - [x] lease_cleanup background service (expires stale leases) 92 97 - note: v1 tag-based concurrency was reconstituted to use v2 in python prefect. 93 98 we should only implement the v2 API to avoid the complexity of supporting both. 94 99 ··· 152 157 - [ ] RetryFailedTasks - govern retry transitions 153 158 154 159 ### other 155 - - [ ] concurrency limits (v2 only) 160 + - [x] concurrency limits (v2 only) 156 161 157 162 ## background services 158 163 159 164 - [x] event_persister (batched event writes, deduplication, retention trimming) 160 165 - [x] event_broadcaster (websocket fan-out to /api/events/out subscribers) 161 166 - [x] scheduler (create flow runs from deployment schedules - interval + cron) 167 + - [x] lease_cleanup (expire stale concurrency leases) 162 168 - [ ] late_runs (mark runs as late) 163 169 - [ ] foreman (infrastructure management) 164 170 - [ ] cancellation_cleanup (clean up cancelled runs) ··· 189 195 - [x] block_type table 190 196 - [x] block_document table 191 197 - [x] block_schema table 192 - - [ ] concurrency_limit table 198 + - [x] concurrency_limit table 193 199 - [ ] artifact table 194 200 - [ ] automation table 195 201 - [x] variable table ··· 264 270 - scheduler service (interval + cron, idempotent, parameter merging) 265 271 
- get_scheduled_flow_runs (worker polling) 266 272 - events (ingest via websocket, persist, broadcast with filtered backfill) 273 + - concurrency limits v2 (CRUD, lease-based slots, memory/redis storage) 267 274 - dual database backends (sqlite/postgres) 268 275 - dual message brokers (memory/redis)
+45 -36
src/api/concurrency_limits_v2.zig
··· 7 7 const uuid_util = @import("../utilities/uuid.zig"); 8 8 const time_util = @import("../utilities/time.zig"); 9 9 const json_util = @import("../utilities/json.zig"); 10 - const lease_storage = @import("../concurrency/lease_storage.zig"); 10 + const lease_storage = @import("../leases/storage.zig"); 11 + // Leases use in-memory storage by default, or Redis when PREFECT_BROKER_BACKEND=redis (never the database) 11 12 12 13 pub fn handle(r: zap.Request) !void { 13 14 const target = r.path orelse "/"; ··· 442 443 var lease_id: ?[]const u8 = null; 443 444 444 445 if (mem.eql(u8, mode, "concurrency")) { 445 - const storage = lease_storage.getLeaseStorage(alloc); 446 + const storage = lease_storage.get(); 446 447 const lease = storage.createLease( 448 + alloc, 447 449 acquired_ids.items, 448 450 ttl_seconds, 449 - .{ .slots = slots }, 451 + slots, 452 + null, // holder_type 453 + null, // holder_id 450 454 ) catch { 451 455 // Failed to create lease - rollback slot acquisition 452 456 for (acquired_limits.items) |lim| { ··· 537 541 if (obj.get("lease_id")) |lease_val| { 538 542 if (lease_val == .string) { 539 543 const lease_id = lease_val.string; 540 - const storage = lease_storage.getLeaseStorage(alloc); 544 + const storage = lease_storage.get(); 541 545 542 546 // Read the lease to get resource_ids and slots 543 - if (storage.readLease(lease_id)) |lease| { 547 + if (try storage.getById(alloc, lease_id)) |lease| { 544 548 // Calculate occupancy from lease creation time 545 - const occupancy = storage.calculateOccupancy(lease_id); 549 + const occupancy = lease_storage.LeaseStorage.calculateOccupancy(lease); 546 550 547 551 // Release slots for each resource in the lease 548 552 for (lease.resource_ids) |resource_id| { ··· 550 554 } 551 555 552 556 // Revoke the lease 553 - storage.revokeLease(lease_id); 557 + _ = try storage.revokeLease(lease_id); 554 558 555 559 r.setStatus(.ok); 556 560 r.setHeader("content-type", "application/json") catch {}; ··· 672 676 } else |_| {} 673 677 } 674 678 675 - 
const storage = lease_storage.getLeaseStorage(alloc); 676 - const lease_ids = try storage.readActiveLeaseIds(alloc, limit, offset); 679 + const storage = lease_storage.get(); 680 + const leases = try storage.getActiveLeases(alloc, limit, offset); 677 681 678 682 var output = std.ArrayListUnmanaged(u8){}; 679 683 try output.append(alloc, '['); 680 684 681 - for (lease_ids, 0..) |lease_id, i| { 685 + for (leases, 0..) |lease, i| { 682 686 if (i > 0) try output.append(alloc, ','); 683 687 684 - if (storage.readLease(lease_id)) |lease| { 685 - try output.writer(alloc).print( 686 - \\{{"id":"{s}","slots":{d},"expiration_us":{d},"created_at_us":{d} 687 - , .{ lease.id, lease.slots, lease.expiration_us, lease.created_at_us }); 688 - 689 - // Add resource_ids array 690 - try output.appendSlice(alloc, ",\"resource_ids\":["); 691 - for (lease.resource_ids, 0..) |rid, j| { 692 - if (j > 0) try output.append(alloc, ','); 693 - try output.writer(alloc).print("\"{s}\"", .{rid}); 694 - } 695 - try output.append(alloc, ']'); 688 + // Build resource_ids JSON array 689 + var rids_json = std.ArrayListUnmanaged(u8){}; 690 + try rids_json.append(alloc, '['); 691 + for (lease.resource_ids, 0..) 
|rid, j| { 692 + if (j > 0) try rids_json.append(alloc, ','); 693 + try rids_json.append(alloc, '"'); 694 + try rids_json.appendSlice(alloc, rid); 695 + try rids_json.append(alloc, '"'); 696 + } 697 + try rids_json.append(alloc, ']'); 696 698 697 - // Add optional holder info 698 - if (lease.holder_type) |ht| { 699 - try output.writer(alloc).print(",\"holder_type\":\"{s}\"", .{ht}); 700 - } 701 - if (lease.holder_id) |hi| { 702 - try output.writer(alloc).print(",\"holder_id\":\"{s}\"", .{hi}); 703 - } 699 + try output.writer(alloc).print( 700 + \\{{"id":"{s}","slots":{d},"expiration_us":{d},"created_at_us":{d},"resource_ids":{s} 701 + , .{ lease.id, lease.slots, lease.expiration_us, lease.created_at_us, rids_json.items }); 704 702 705 - try output.append(alloc, '}'); 703 + // Add optional holder info 704 + if (lease.holder_type) |ht| { 705 + try output.writer(alloc).print(",\"holder_type\":\"{s}\"", .{ht}); 706 + } 707 + if (lease.holder_id) |hi| { 708 + try output.writer(alloc).print(",\"holder_id\":\"{s}\"", .{hi}); 706 709 } 710 + 711 + try output.append(alloc, '}'); 707 712 } 708 713 709 714 try output.append(alloc, ']'); ··· 743 748 else => 30.0, 744 749 } else 30.0; 745 750 746 - const storage = lease_storage.getLeaseStorage(alloc); 747 - const renewed = storage.renewLease(lease_id, ttl_seconds); 751 + const storage = lease_storage.get(); 752 + const renewed = storage.renewLease(lease_id, ttl_seconds) catch { 753 + json_util.sendStatus(r, "{\"detail\":\"storage error\"}", .internal_server_error); 754 + return; 755 + }; 748 756 749 757 if (renewed) { 750 758 r.setStatus(.ok); ··· 782 790 return; 783 791 } 784 792 785 - const storage = lease_storage.getLeaseStorage(alloc); 786 793 var ts_buf: [32]u8 = undefined; 787 794 const now = time_util.timestamp(&ts_buf); 788 795 789 796 // Read lease to release slots before revoking 790 - if (storage.readLease(lease_id)) |lease| { 791 - const occupancy = storage.calculateOccupancy(lease_id); 797 + const storage = 
lease_storage.get(); 798 + if (try storage.getById(alloc, lease_id)) |lease| { 799 + const occupancy = lease_storage.LeaseStorage.calculateOccupancy(lease); 800 + 792 801 for (lease.resource_ids) |resource_id| { 793 802 _ = try db.concurrency_limits.releaseSlots(resource_id, lease.slots, occupancy, now); 794 803 } 795 - storage.revokeLease(lease_id); 804 + _ = try storage.revokeLease(lease_id); 796 805 } 797 806 798 807 // Always return success (idempotent)
-214
src/concurrency/lease_storage.zig
··· 1 - const std = @import("std"); 2 - const Allocator = std.mem.Allocator; 3 - 4 - const uuid_util = @import("../utilities/uuid.zig"); 5 - const time_util = @import("../utilities/time.zig"); 6 - 7 - /// Metadata associated with a concurrency lease 8 - pub const LeaseMetadata = struct { 9 - slots: i64, 10 - holder_type: ?[]const u8 = null, // "flow_run", "task_run", "deployment" 11 - holder_id: ?[]const u8 = null, 12 - }; 13 - 14 - /// A lease on concurrency limit resources 15 - pub const ResourceLease = struct { 16 - id: []const u8, 17 - resource_ids: []const []const u8, // IDs of concurrency limits this lease is for 18 - expiration_us: i64, // microseconds since epoch 19 - created_at_us: i64, 20 - slots: i64, 21 - holder_type: ?[]const u8, 22 - holder_id: ?[]const u8, 23 - }; 24 - 25 - /// In-memory lease storage (singleton) 26 - /// Thread-safe via mutex for concurrent access 27 - pub const LeaseStorage = struct { 28 - allocator: Allocator, 29 - leases: std.StringHashMap(ResourceLease), 30 - expirations: std.StringHashMap(i64), // lease_id -> expiration_us 31 - mutex: std.Thread.Mutex, 32 - 33 - var instance: ?*LeaseStorage = null; 34 - 35 - pub fn get(_: Allocator) *LeaseStorage { 36 - if (instance) |storage| { 37 - return storage; 38 - } 39 - // Use page_allocator for the singleton - it lives for the entire program lifetime 40 - const alloc = std.heap.page_allocator; 41 - const storage = alloc.create(LeaseStorage) catch unreachable; 42 - storage.* = LeaseStorage{ 43 - .allocator = alloc, 44 - .leases = std.StringHashMap(ResourceLease).init(alloc), 45 - .expirations = std.StringHashMap(i64).init(alloc), 46 - .mutex = .{}, 47 - }; 48 - instance = storage; 49 - return storage; 50 - } 51 - 52 - /// Create a new lease with the given TTL (in seconds) 53 - pub fn createLease( 54 - self: *LeaseStorage, 55 - resource_ids: []const []const u8, 56 - ttl_seconds: f64, 57 - metadata: LeaseMetadata, 58 - ) !ResourceLease { 59 - self.mutex.lock(); 60 - defer 
self.mutex.unlock(); 61 - 62 - // Generate lease ID 63 - var id_buf: [36]u8 = undefined; 64 - const lease_id = uuid_util.generate(&id_buf); 65 - const owned_id = try self.allocator.dupe(u8, lease_id); 66 - 67 - // Copy resource IDs 68 - const owned_resource_ids = try self.allocator.alloc([]const u8, resource_ids.len); 69 - for (resource_ids, 0..) |rid, i| { 70 - owned_resource_ids[i] = try self.allocator.dupe(u8, rid); 71 - } 72 - 73 - const now_us = time_util.nowMicros(); 74 - const ttl_us: i64 = @intFromFloat(ttl_seconds * 1_000_000); 75 - const expiration_us = now_us + ttl_us; 76 - 77 - // Copy optional strings 78 - const owned_holder_type = if (metadata.holder_type) |ht| try self.allocator.dupe(u8, ht) else null; 79 - const owned_holder_id = if (metadata.holder_id) |hi| try self.allocator.dupe(u8, hi) else null; 80 - 81 - const lease = ResourceLease{ 82 - .id = owned_id, 83 - .resource_ids = owned_resource_ids, 84 - .expiration_us = expiration_us, 85 - .created_at_us = now_us, 86 - .slots = metadata.slots, 87 - .holder_type = owned_holder_type, 88 - .holder_id = owned_holder_id, 89 - }; 90 - 91 - try self.leases.put(owned_id, lease); 92 - try self.expirations.put(owned_id, expiration_us); 93 - 94 - return lease; 95 - } 96 - 97 - /// Read a lease by ID 98 - pub fn readLease(self: *LeaseStorage, lease_id: []const u8) ?ResourceLease { 99 - self.mutex.lock(); 100 - defer self.mutex.unlock(); 101 - 102 - return self.leases.get(lease_id); 103 - } 104 - 105 - /// Renew a lease by updating its expiration 106 - pub fn renewLease(self: *LeaseStorage, lease_id: []const u8, ttl_seconds: f64) bool { 107 - self.mutex.lock(); 108 - defer self.mutex.unlock(); 109 - 110 - if (!self.leases.contains(lease_id)) { 111 - // Clean up orphaned expiration entry 112 - _ = self.expirations.remove(lease_id); 113 - return false; 114 - } 115 - 116 - const now_us = time_util.nowMicros(); 117 - const ttl_us: i64 = @intFromFloat(ttl_seconds * 1_000_000); 118 - const new_expiration = now_us + 
ttl_us; 119 - 120 - // Update expiration in lease 121 - if (self.leases.getPtr(lease_id)) |lease| { 122 - lease.expiration_us = new_expiration; 123 - } 124 - 125 - // Update expiration index 126 - if (self.expirations.getPtr(lease_id)) |exp| { 127 - exp.* = new_expiration; 128 - } 129 - 130 - return true; 131 - } 132 - 133 - /// Revoke a lease (remove it from storage) 134 - pub fn revokeLease(self: *LeaseStorage, lease_id: []const u8) void { 135 - self.mutex.lock(); 136 - defer self.mutex.unlock(); 137 - 138 - // Remove from expirations FIRST (before freeing keys) 139 - _ = self.expirations.remove(lease_id); 140 - 141 - if (self.leases.fetchRemove(lease_id)) |kv| { 142 - // Free the lease data - id last since it's the key 143 - for (kv.value.resource_ids) |rid| { 144 - self.allocator.free(rid); 145 - } 146 - self.allocator.free(kv.value.resource_ids); 147 - if (kv.value.holder_type) |ht| self.allocator.free(ht); 148 - if (kv.value.holder_id) |hi| self.allocator.free(hi); 149 - // Free id last since it was used as the key 150 - self.allocator.free(kv.value.id); 151 - } 152 - } 153 - 154 - /// Get IDs of expired leases 155 - pub fn readExpiredLeaseIds(self: *LeaseStorage, allocator: Allocator, limit: usize) ![][]const u8 { 156 - self.mutex.lock(); 157 - defer self.mutex.unlock(); 158 - 159 - const now_us = time_util.nowMicros(); 160 - var result = std.ArrayListUnmanaged([]const u8){}; 161 - 162 - var it = self.expirations.iterator(); 163 - while (it.next()) |entry| { 164 - if (result.items.len >= limit) break; 165 - if (entry.value_ptr.* < now_us) { 166 - try result.append(allocator, try allocator.dupe(u8, entry.key_ptr.*)); 167 - } 168 - } 169 - 170 - return result.toOwnedSlice(allocator); 171 - } 172 - 173 - /// Get IDs of active (non-expired) leases 174 - pub fn readActiveLeaseIds(self: *LeaseStorage, allocator: Allocator, limit: usize, offset: usize) ![][]const u8 { 175 - self.mutex.lock(); 176 - defer self.mutex.unlock(); 177 - 178 - const now_us = 
time_util.nowMicros(); 179 - var result = std.ArrayListUnmanaged([]const u8){}; 180 - var skipped: usize = 0; 181 - 182 - var it = self.expirations.iterator(); 183 - while (it.next()) |entry| { 184 - if (result.items.len >= limit) break; 185 - if (entry.value_ptr.* > now_us) { 186 - if (skipped < offset) { 187 - skipped += 1; 188 - continue; 189 - } 190 - try result.append(allocator, try allocator.dupe(u8, entry.key_ptr.*)); 191 - } 192 - } 193 - 194 - return result.toOwnedSlice(allocator); 195 - } 196 - 197 - /// Calculate occupancy seconds for a lease (time since creation) 198 - pub fn calculateOccupancy(self: *LeaseStorage, lease_id: []const u8) ?f64 { 199 - self.mutex.lock(); 200 - defer self.mutex.unlock(); 201 - 202 - if (self.leases.get(lease_id)) |lease| { 203 - const now_us = time_util.nowMicros(); 204 - const duration_us: f64 = @floatFromInt(now_us - lease.created_at_us); 205 - return duration_us / 1_000_000.0; // Convert to seconds 206 - } 207 - return null; 208 - } 209 - }; 210 - 211 - /// Get the global lease storage instance 212 - pub fn getLeaseStorage(allocator: Allocator) *LeaseStorage { 213 - return LeaseStorage.get(allocator); 214 - }
+277
src/leases/memory.zig
//! In-memory lease storage
//!
//! Default lease storage for single-instance deployments.
//! Leases are ephemeral and lost on server restart.

const std = @import("std");
const Allocator = std.mem.Allocator;

const utilities_time = @import("../utilities/time.zig");
const utilities_uuid = @import("../utilities/uuid.zig");

/// A lease over one or more concurrency-limit resources.
///
/// Slice ownership depends on which method produced the value; see the
/// per-method documentation on `MemoryLeaseStorage`.
pub const Lease = struct {
    id: []const u8,
    resource_ids: []const []const u8,
    slots: i64,
    expiration_us: i64, // microseconds since the epoch
    created_at_us: i64, // microseconds since the epoch
    holder_type: ?[]const u8, // presumably "flow_run"/"task_run"/etc. -- confirm against callers
    holder_id: ?[]const u8,
};

/// Mutex-guarded in-memory lease store. All public methods are safe to call
/// from multiple threads.
pub const MemoryLeaseStorage = struct {
    const Self = @This();

    alloc: Allocator,
    mutex: std.Thread.Mutex,
    leases: std.StringHashMap(StoredLease),
    expirations: std.StringHashMap(i64), // lease_id -> expiration_us (secondary index)

    // Internal representation: resource ids are kept as one serialized JSON
    // array rather than a slice-of-slices, to keep ownership simple.
    const StoredLease = struct {
        id: []const u8,
        resource_ids_json: []const u8,
        slots: i64,
        expiration_us: i64,
        created_at_us: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    };

    pub fn init(alloc: Allocator) Self {
        return .{
            .alloc = alloc,
            .mutex = .{},
            .leases = std.StringHashMap(StoredLease).init(alloc),
            .expirations = std.StringHashMap(i64).init(alloc),
        };
    }

    /// Free every stored lease and both indexes.
    pub fn deinit(self: *Self) void {
        var it = self.leases.iterator();
        while (it.next()) |entry| {
            // The map key and StoredLease.id are the same allocation; free once.
            self.alloc.free(entry.key_ptr.*);
            self.alloc.free(entry.value_ptr.resource_ids_json);
            if (entry.value_ptr.holder_type) |ht| self.alloc.free(ht);
            if (entry.value_ptr.holder_id) |hi| self.alloc.free(hi);
        }
        self.leases.deinit();

        var exp_it = self.expirations.iterator();
        while (exp_it.next()) |entry| {
            self.alloc.free(entry.key_ptr.*);
        }
        self.expirations.deinit();
    }

    /// Create a lease over `resource_ids` that expires `ttl_seconds` from now.
    ///
    /// The store keeps its own copies of all strings. The returned `Lease`
    /// borrows `resource_ids`, `holder_type` and `holder_id` from the caller;
    /// its `id` points at store-owned memory valid until revoke/delete/deinit.
    pub fn createLease(
        self: *Self,
        resource_ids: []const []const u8,
        ttl_seconds: f64,
        slots: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    ) !Lease {
        self.mutex.lock();
        defer self.mutex.unlock();

        const now_us = utilities_time.nowMicros();
        const ttl_us: i64 = @intFromFloat(ttl_seconds * 1_000_000);
        const expiration_us = now_us + ttl_us;

        const lease_id = utilities_uuid.generateAlloc(self.alloc);
        errdefer self.alloc.free(lease_id);

        // Propagate allocation failure instead of substituting the static
        // literal "[]": revokeLease/deinit pass resource_ids_json to
        // alloc.free(), which is undefined behavior for memory the allocator
        // does not own.
        const resource_ids_json = try std.json.Stringify.valueAlloc(self.alloc, resource_ids, .{});
        errdefer self.alloc.free(resource_ids_json);

        const ht = if (holder_type) |h| try self.alloc.dupe(u8, h) else null;
        errdefer if (ht) |h| self.alloc.free(h);
        const hi = if (holder_id) |h| try self.alloc.dupe(u8, h) else null;
        errdefer if (hi) |h| self.alloc.free(h);

        try self.leases.put(lease_id, .{
            .id = lease_id,
            .resource_ids_json = resource_ids_json,
            .slots = slots,
            .expiration_us = expiration_us,
            .created_at_us = now_us,
            .holder_type = ht,
            .holder_id = hi,
        });
        errdefer _ = self.leases.remove(lease_id);

        // The secondary index owns an independent copy of the id as its key.
        const exp_key = try self.alloc.dupe(u8, lease_id);
        errdefer self.alloc.free(exp_key);
        try self.expirations.put(exp_key, expiration_us);

        return .{
            .id = lease_id,
            .resource_ids = resource_ids,
            .slots = slots,
            .expiration_us = expiration_us,
            .created_at_us = now_us,
            .holder_type = holder_type,
            .holder_id = holder_id,
        };
    }

    /// Look up a lease by id.
    ///
    /// The returned `resource_ids` is always empty (the JSON form is not
    /// exposed here). Returned string slices point at store-owned memory and
    /// are invalidated by revoke/delete/deinit.
    pub fn getById(self: *Self, lease_id: []const u8) ?Lease {
        self.mutex.lock();
        defer self.mutex.unlock();

        const stored = self.leases.get(lease_id) orelse return null;
        return .{
            .id = stored.id,
            .resource_ids = &.{},
            .slots = stored.slots,
            .expiration_us = stored.expiration_us,
            .created_at_us = stored.created_at_us,
            .holder_type = stored.holder_type,
            .holder_id = stored.holder_id,
        };
    }

    /// Extend a lease's expiration to now + `ttl_seconds`.
    /// Returns false (and removes any orphaned index entry) if the lease is gone.
    pub fn renewLease(self: *Self, lease_id: []const u8, ttl_seconds: f64) !bool {
        self.mutex.lock();
        defer self.mutex.unlock();

        const stored = self.leases.getPtr(lease_id) orelse {
            // Keep the secondary index consistent with the primary map.
            if (self.expirations.fetchRemove(lease_id)) |kv| {
                self.alloc.free(kv.key);
            }
            return false;
        };

        const ttl_us: i64 = @intFromFloat(ttl_seconds * 1_000_000);
        const new_expiration = utilities_time.nowMicros() + ttl_us;
        stored.expiration_us = new_expiration;

        if (self.expirations.getPtr(lease_id)) |exp_ptr| {
            exp_ptr.* = new_expiration;
        }

        return true;
    }

    /// Remove a lease and free its storage. Returns false if it did not exist.
    pub fn revokeLease(self: *Self, lease_id: []const u8) !bool {
        self.mutex.lock();
        defer self.mutex.unlock();

        if (self.leases.fetchRemove(lease_id)) |kv| {
            // kv.key and kv.value.id are the same allocation; free once.
            self.alloc.free(kv.key);
            self.alloc.free(kv.value.resource_ids_json);
            if (kv.value.holder_type) |ht| self.alloc.free(ht);
            if (kv.value.holder_id) |hi| self.alloc.free(hi);
        } else {
            return false;
        }

        if (self.expirations.fetchRemove(lease_id)) |kv| {
            self.alloc.free(kv.key);
        }

        return true;
    }

    /// Ids of expired leases, up to `limit`. Caller owns the returned slice
    /// and each id string (all allocated with `alloc`).
    pub fn getExpiredLeaseIds(self: *Self, alloc: Allocator, limit: usize) ![][]const u8 {
        self.mutex.lock();
        defer self.mutex.unlock();

        const now_us = utilities_time.nowMicros();
        var result: std.ArrayList([]const u8) = .empty;
        errdefer {
            for (result.items) |id| alloc.free(id);
            result.deinit(alloc);
        }

        var it = self.expirations.iterator();
        while (it.next()) |entry| {
            if (result.items.len >= limit) break;
            if (entry.value_ptr.* < now_us) {
                try result.append(alloc, try alloc.dupe(u8, entry.key_ptr.*));
            }
        }

        return result.toOwnedSlice(alloc);
    }

    /// Non-expired leases, paginated by `limit`/`offset`.
    ///
    /// Caller owns only the returned slice; string fields inside each `Lease`
    /// point at store-owned memory (valid until revoke/delete/deinit), and
    /// `resource_ids` is always empty here. NOTE(review): hash-map iteration
    /// order is unspecified, so offset-based pagination is not stable across
    /// mutations -- confirm callers tolerate that.
    pub fn getActiveLeases(self: *Self, alloc: Allocator, limit: usize, offset: usize) ![]Lease {
        self.mutex.lock();
        defer self.mutex.unlock();

        const now_us = utilities_time.nowMicros();
        var result: std.ArrayList(Lease) = .empty;
        errdefer result.deinit(alloc);
        var skipped: usize = 0;

        var it = self.leases.iterator();
        while (it.next()) |entry| {
            if (result.items.len >= limit) break;
            if (entry.value_ptr.expiration_us <= now_us) continue;
            if (skipped < offset) {
                skipped += 1;
                continue;
            }
            try result.append(alloc, .{
                .id = entry.value_ptr.id,
                .resource_ids = &.{},
                .slots = entry.value_ptr.slots,
                .expiration_us = entry.value_ptr.expiration_us,
                .created_at_us = entry.value_ptr.created_at_us,
                .holder_type = entry.value_ptr.holder_type,
                .holder_id = entry.value_ptr.holder_id,
            });
        }

        return result.toOwnedSlice(alloc);
    }

    /// Remove `lease_id` if (and only if) it has already expired.
    ///
    /// Returns a caller-owned copy (`id`, `holder_type`, `holder_id` are
    /// allocated with `alloc` and must be freed by the caller; `resource_ids`
    /// is empty), or null when the lease is missing or still active.
    pub fn deleteExpiredLease(self: *Self, alloc: Allocator, lease_id: []const u8) !?Lease {
        self.mutex.lock();
        defer self.mutex.unlock();

        const now_us = utilities_time.nowMicros();
        const stored = self.leases.get(lease_id) orelse return null;
        if (stored.expiration_us >= now_us) return null; // not expired yet

        // Copy out before freeing store-owned memory; errdefer prevents
        // leaking earlier copies if a later dupe fails.
        const id_copy = try alloc.dupe(u8, stored.id);
        errdefer alloc.free(id_copy);
        const ht_copy = if (stored.holder_type) |ht| try alloc.dupe(u8, ht) else null;
        errdefer if (ht_copy) |h| alloc.free(h);
        const hi_copy = if (stored.holder_id) |hi| try alloc.dupe(u8, hi) else null;

        const result = Lease{
            .id = id_copy,
            .resource_ids = &.{},
            .slots = stored.slots,
            .expiration_us = stored.expiration_us,
            .created_at_us = stored.created_at_us,
            .holder_type = ht_copy,
            .holder_id = hi_copy,
        };

        if (self.leases.fetchRemove(lease_id)) |kv| {
            self.alloc.free(kv.key);
            self.alloc.free(kv.value.resource_ids_json);
            if (kv.value.holder_type) |ht| self.alloc.free(ht);
            if (kv.value.holder_id) |hi| self.alloc.free(hi);
        }
        if (self.expirations.fetchRemove(lease_id)) |kv| {
            self.alloc.free(kv.key);
        }

        return result;
    }

    /// Seconds the lease has been held (elapsed time since creation).
    ///
    /// NOTE(review): restores the pre-refactor semantics ("occupancy seconds
    /// since creation"). The API handlers compute occupancy "from lease
    /// creation time" and feed it to releaseSlots for per-slot occupancy
    /// accounting, so the interim 0..1 fraction-of-TTL (0.0 when expired)
    /// would have corrupted that statistic. Confirm against
    /// src/leases/storage.zig before merging.
    pub fn calculateOccupancy(lease: Lease) f64 {
        const now_us = utilities_time.nowMicros();
        const elapsed_us = now_us - lease.created_at_us;
        if (elapsed_us <= 0) return 0.0;
        return @as(f64, @floatFromInt(elapsed_us)) / 1_000_000.0;
    }
};
+422
src/leases/redis.zig
··· 1 + //! Redis-backed lease storage for HA deployments 2 + //! 3 + //! Stores leases in Redis for shared access across multiple server instances: 4 + //! - Lease data in `prefect:concurrency:lease:{id}` (JSON) 5 + //! - Expiration times in sorted set `prefect:concurrency:expirations` 6 + //! - Atomic operations via Redis commands 7 + 8 + const std = @import("std"); 9 + const Allocator = std.mem.Allocator; 10 + const json = std.json; 11 + 12 + const redis = @import("redis"); 13 + const log = @import("../logging.zig"); 14 + const uuid_util = @import("../utilities/uuid.zig"); 15 + const time_util = @import("../utilities/time.zig"); 16 + 17 + pub const RedisLeaseStorage = struct { 18 + const Self = @This(); 19 + 20 + alloc: Allocator, 21 + host: []const u8, 22 + port: u16, 23 + password: []const u8, 24 + db: u8, 25 + 26 + const base_prefix = "prefect:concurrency:"; 27 + const lease_prefix = base_prefix ++ "lease:"; 28 + const expirations_key = base_prefix ++ "expirations"; 29 + 30 + pub const Lease = struct { 31 + id: []const u8, 32 + resource_ids: []const []const u8, 33 + slots: i64, 34 + expiration_us: i64, 35 + created_at_us: i64, 36 + holder_type: ?[]const u8, 37 + holder_id: ?[]const u8, 38 + }; 39 + 40 + pub fn init(alloc: Allocator, host: []const u8, port: u16, password: []const u8, db: u8) Self { 41 + return .{ 42 + .alloc = alloc, 43 + .host = host, 44 + .port = port, 45 + .password = password, 46 + .db = db, 47 + }; 48 + } 49 + 50 + fn connect(self: *Self) !redis.Client { 51 + return redis.Client.connectWithConfig(self.alloc, .{ 52 + .host = self.host, 53 + .port = self.port, 54 + .password = self.password, 55 + .db = self.db, 56 + }); 57 + } 58 + 59 + fn leaseKey(buf: *[128]u8, lease_id: []const u8) []const u8 { 60 + const result = std.fmt.bufPrint(buf, "{s}{s}", .{ lease_prefix, lease_id }) catch return lease_prefix; 61 + return result; 62 + } 63 + 64 + /// Create a new lease and store in Redis 65 + pub fn createLease( 66 + self: *Self, 67 + 
resource_ids: []const []const u8, 68 + ttl_seconds: f64, 69 + slots: i64, 70 + holder_type: ?[]const u8, 71 + holder_id: ?[]const u8, 72 + ) !Lease { 73 + var id_buf: [36]u8 = undefined; 74 + const lease_id = uuid_util.generate(&id_buf); 75 + 76 + const now_us = time_util.nowMicros(); 77 + const ttl_us: i64 = @intFromFloat(ttl_seconds * 1_000_000); 78 + const expiration_us = now_us + ttl_us; 79 + const expiration_ts: f64 = @as(f64, @floatFromInt(expiration_us)) / 1_000_000.0; 80 + 81 + // Build JSON for lease data 82 + var json_buf = std.ArrayListUnmanaged(u8){}; 83 + defer json_buf.deinit(self.alloc); 84 + 85 + try json_buf.appendSlice(self.alloc, "{\"id\":\""); 86 + try json_buf.appendSlice(self.alloc, lease_id); 87 + try json_buf.appendSlice(self.alloc, "\",\"resource_ids\":["); 88 + for (resource_ids, 0..) |rid, i| { 89 + if (i > 0) try json_buf.append(self.alloc, ','); 90 + try json_buf.append(self.alloc, '"'); 91 + try json_buf.appendSlice(self.alloc, rid); 92 + try json_buf.append(self.alloc, '"'); 93 + } 94 + try json_buf.appendSlice(self.alloc, "],\"slots\":"); 95 + var slots_buf: [20]u8 = undefined; 96 + const slots_str = std.fmt.bufPrint(&slots_buf, "{d}", .{slots}) catch "0"; 97 + try json_buf.appendSlice(self.alloc, slots_str); 98 + try json_buf.appendSlice(self.alloc, ",\"expiration_us\":"); 99 + var exp_buf: [20]u8 = undefined; 100 + const exp_str = std.fmt.bufPrint(&exp_buf, "{d}", .{expiration_us}) catch "0"; 101 + try json_buf.appendSlice(self.alloc, exp_str); 102 + try json_buf.appendSlice(self.alloc, ",\"created_at_us\":"); 103 + var created_buf: [20]u8 = undefined; 104 + const created_str = std.fmt.bufPrint(&created_buf, "{d}", .{now_us}) catch "0"; 105 + try json_buf.appendSlice(self.alloc, created_str); 106 + if (holder_type) |ht| { 107 + try json_buf.appendSlice(self.alloc, ",\"holder_type\":\""); 108 + try json_buf.appendSlice(self.alloc, ht); 109 + try json_buf.append(self.alloc, '"'); 110 + } 111 + if (holder_id) |hi| { 112 + try 
json_buf.appendSlice(self.alloc, ",\"holder_id\":\""); 113 + try json_buf.appendSlice(self.alloc, hi); 114 + try json_buf.append(self.alloc, '"'); 115 + } 116 + try json_buf.append(self.alloc, '}'); 117 + 118 + // Store in Redis 119 + var client = try self.connect(); 120 + defer client.close(); 121 + 122 + var key_buf: [128]u8 = undefined; 123 + const key = leaseKey(&key_buf, lease_id); 124 + 125 + // SET lease data 126 + var strings = client.strings(); 127 + try strings.set(key, json_buf.items); 128 + 129 + // ZADD to expirations sorted set 130 + var zsets = client.sortedSets(); 131 + _ = try zsets.zadd(expirations_key, expiration_ts, lease_id); 132 + 133 + // Return lease (caller must dupe strings if needed) 134 + const id_copy = try self.alloc.dupe(u8, lease_id); 135 + var rids = try self.alloc.alloc([]const u8, resource_ids.len); 136 + for (resource_ids, 0..) |rid, i| { 137 + rids[i] = try self.alloc.dupe(u8, rid); 138 + } 139 + 140 + return Lease{ 141 + .id = id_copy, 142 + .resource_ids = rids, 143 + .slots = slots, 144 + .expiration_us = expiration_us, 145 + .created_at_us = now_us, 146 + .holder_type = if (holder_type) |ht| try self.alloc.dupe(u8, ht) else null, 147 + .holder_id = if (holder_id) |hi| try self.alloc.dupe(u8, hi) else null, 148 + }; 149 + } 150 + 151 + /// Read a lease by ID 152 + pub fn getById(self: *Self, lease_id: []const u8) !?Lease { 153 + var client = try self.connect(); 154 + defer client.close(); 155 + 156 + var key_buf: [128]u8 = undefined; 157 + const key = leaseKey(&key_buf, lease_id); 158 + 159 + var strings = client.strings(); 160 + const data = strings.get(key) catch return null; 161 + if (data == null) return null; 162 + 163 + return try self.parseLease(data.?); 164 + } 165 + 166 + fn parseLease(self: *Self, data: []const u8) !Lease { 167 + const parsed = try json.parseFromSlice(json.Value, self.alloc, data, .{}); 168 + defer parsed.deinit(); 169 + 170 + const obj = parsed.value.object; 171 + 172 + const id = try 
self.alloc.dupe(u8, obj.get("id").?.string);
        const slots = obj.get("slots").?.integer;
        const expiration_us = obj.get("expiration_us").?.integer;
        const created_at_us = obj.get("created_at_us").?.integer;

        // Parse resource_ids array; each string is duped so the Lease owns its memory.
        const rids_arr = obj.get("resource_ids").?.array;
        const rids = try self.alloc.alloc([]const u8, rids_arr.items.len);
        for (rids_arr.items, 0..) |item, i| {
            rids[i] = try self.alloc.dupe(u8, item.string);
        }

        // holder_type / holder_id are optional; only copied when present as strings.
        const holder_type = if (obj.get("holder_type")) |v| blk: {
            if (v == .string) break :blk try self.alloc.dupe(u8, v.string);
            break :blk null;
        } else null;

        const holder_id = if (obj.get("holder_id")) |v| blk: {
            if (v == .string) break :blk try self.alloc.dupe(u8, v.string);
            break :blk null;
        } else null;

        return Lease{
            .id = id,
            .resource_ids = rids,
            .slots = slots,
            .expiration_us = expiration_us,
            .created_at_us = created_at_us,
            .holder_type = holder_type,
            .holder_id = holder_id,
        };
    }

    /// Renew a lease by updating its expiration to now + ttl_seconds.
    /// Returns false when the lease does not exist (a read failure is also
    /// reported as "not found" — best-effort renewal).
    pub fn renewLease(self: *Self, lease_id: []const u8, ttl_seconds: f64) !bool {
        var client = try self.connect();
        defer client.close();

        var key_buf: [128]u8 = undefined;
        const key = leaseKey(&key_buf, lease_id);

        // Get existing lease JSON
        var strings = client.strings();
        const data = strings.get(key) catch return false;
        if (data == null) return false;

        // Parse, then rebuild the JSON with the new expiration
        var parsed = try json.parseFromSlice(json.Value, self.alloc, data.?, .{});
        defer parsed.deinit();

        const now_us = time_util.nowMicros();
        const ttl_us: i64 = @intFromFloat(ttl_seconds * 1_000_000);
        const new_expiration_us = now_us + ttl_us;
        // The expirations zset is scored in fractional seconds.
        const new_expiration_ts: f64 = @as(f64, @floatFromInt(new_expiration_us)) / 1_000_000.0;

        var json_buf = std.ArrayListUnmanaged(u8){};
        defer json_buf.deinit(self.alloc);

        // Simple approach: rebuild the JSON field by field, carrying everything
        // over unchanged except expiration_us.
        const obj = parsed.value.object;
        try json_buf.appendSlice(self.alloc, "{\"id\":\"");
        try json_buf.appendSlice(self.alloc, obj.get("id").?.string);
        try json_buf.appendSlice(self.alloc, "\",\"resource_ids\":");

        // Serialize resource_ids as a JSON string array
        const rids = obj.get("resource_ids").?.array;
        try json_buf.append(self.alloc, '[');
        for (rids.items, 0..) |item, i| {
            if (i > 0) try json_buf.append(self.alloc, ',');
            try json_buf.append(self.alloc, '"');
            try json_buf.appendSlice(self.alloc, item.string);
            try json_buf.append(self.alloc, '"');
        }
        try json_buf.appendSlice(self.alloc, "],\"slots\":");
        var slots_buf: [20]u8 = undefined;
        const slots_str = std.fmt.bufPrint(&slots_buf, "{d}", .{obj.get("slots").?.integer}) catch "0";
        try json_buf.appendSlice(self.alloc, slots_str);
        try json_buf.appendSlice(self.alloc, ",\"expiration_us\":");
        var exp_buf: [20]u8 = undefined;
        const exp_str = std.fmt.bufPrint(&exp_buf, "{d}", .{new_expiration_us}) catch "0";
        try json_buf.appendSlice(self.alloc, exp_str);
        try json_buf.appendSlice(self.alloc, ",\"created_at_us\":");
        var created_buf: [20]u8 = undefined;
        const created_str = std.fmt.bufPrint(&created_buf, "{d}", .{obj.get("created_at_us").?.integer}) catch "0";
        try json_buf.appendSlice(self.alloc, created_str);

        // Optional holder fields are carried over only when present as strings.
        if (obj.get("holder_type")) |v| {
            if (v == .string) {
                try json_buf.appendSlice(self.alloc, ",\"holder_type\":\"");
                try json_buf.appendSlice(self.alloc, v.string);
                try json_buf.append(self.alloc, '"');
            }
        }
        if (obj.get("holder_id")) |v| {
            if (v == .string) {
                try json_buf.appendSlice(self.alloc, ",\"holder_id\":\"");
                try json_buf.appendSlice(self.alloc, v.string);
                try json_buf.append(self.alloc, '"');
            }
        }
        try json_buf.append(self.alloc, '}');

        // Write back and bump the score in the expirations zset
        try strings.set(key, json_buf.items);
        var zsets = client.sortedSets();
        _ = try zsets.zadd(expirations_key, new_expiration_ts, lease_id);

        return true;
    }

    /// Revoke (delete) a lease. Returns true when a lease key was actually deleted.
    pub fn revokeLease(self: *Self, lease_id: []const u8) !bool {
        var client = try self.connect();
        defer client.close();

        var key_buf: [128]u8 = undefined;
        const key = leaseKey(&key_buf, lease_id);

        // Delete lease data and remove its entry from the expirations zset
        var keys_cmd = client.keys();
        const deleted = try keys_cmd.del(&.{key});
        var zsets = client.sortedSets();
        _ = try zsets.zrem(expirations_key, &.{lease_id});

        return deleted > 0;
    }

    /// Get up to `limit` expired lease IDs (score <= now) via ZRANGEBYSCORE with -inf.
    /// Caller owns each returned ID string and the returned slice.
    pub fn getExpiredLeaseIds(self: *Self, limit: usize) ![][]const u8 {
        var client = try self.connect();
        defer client.close();

        const now_us = time_util.nowMicros();
        const now_ts: f64 = @as(f64, @floatFromInt(now_us)) / 1_000_000.0;

        // ZRANGEBYSCORE expirations -inf now LIMIT 0 limit
        var ts_buf: [32]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d:.6}", .{now_ts}) catch "0";

        var limit_buf: [20]u8 = undefined;
        const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit}) catch "100";

        // Use sendCommand directly for the -inf bound syntax
        const result = client.sendCommand(&.{
            "ZRANGEBYSCORE",
            expirations_key,
            "-inf",
            ts_str,
            "LIMIT",
            "0",
            limit_str,
        }) catch return &[_][]const u8{};

        const arr = result.asArray() orelse return &[_][]const u8{};
        if (arr.len == 0) return &[_][]const u8{};

        // BUGFIX: the previous version allocated arr.len entries and returned
        // ids[0..count] — a partial slice of the allocation. Freeing that slice
        // later (with a length different from the allocated one) is invalid.
        // Build an exact-length owned slice instead.
        var ids = std.ArrayListUnmanaged([]const u8){};
        errdefer {
            for (ids.items) |id| self.alloc.free(id);
            ids.deinit(self.alloc);
        }
        for (arr) |item| {
            if (item.asString()) |id| {
                try ids.append(self.alloc, try self.alloc.dupe(u8, id));
            }
        }
        return ids.toOwnedSlice(self.alloc);
    }

    /// Get active (non-expired) leases with pagination.
    /// Caller owns the returned slice and each Lease (free via freeLease).
    pub fn getActiveLeases(self: *Self, limit: usize, offset: usize) ![]Lease {
        var client = try self.connect();
        defer client.close();

        const now_us = time_util.nowMicros();
        const now_ts: f64 = @as(f64, @floatFromInt(now_us)) / 1_000_000.0;

        var ts_buf: [32]u8 = undefined;
        const ts_str = std.fmt.bufPrint(&ts_buf, "{d:.6}", .{now_ts}) catch "0";

        var offset_buf: [20]u8 = undefined;
        const offset_str = std.fmt.bufPrint(&offset_buf, "{d}", .{offset}) catch "0";

        var limit_buf: [20]u8 = undefined;
        const limit_str = std.fmt.bufPrint(&limit_buf, "{d}", .{limit}) catch "100";

        // ZRANGEBYSCORE expirations now +inf LIMIT offset limit
        const result = client.sendCommand(&.{
            "ZRANGEBYSCORE",
            expirations_key,
            ts_str,
            "+inf",
            "LIMIT",
            offset_str,
            limit_str,
        }) catch return &[_]Lease{};

        const arr = result.asArray() orelse return &[_]Lease{};
        if (arr.len == 0) return &[_]Lease{};

        // Fetch each lease by ID; leases already collected are released if a
        // later fetch fails (errdefer) so the error path does not leak.
        var results = std.ArrayListUnmanaged(Lease){};
        errdefer {
            for (results.items) |lease| self.freeLease(lease);
            results.deinit(self.alloc);
        }
        for (arr) |item| {
            if (item.asString()) |lease_id| {
                if (try self.getById(lease_id)) |lease| {
                    try results.append(self.alloc, lease);
                }
            }
        }

        return results.toOwnedSlice(self.alloc);
    }

    /// Delete an expired lease and return it, or null if it was renewed or is gone.
    /// NOTE: read-check-delete is not truly atomic; a renewal between getById
    /// and revokeLease can still be lost. The expiration re-check narrows the window.
    pub fn deleteExpiredLease(self: *Self, lease_id: []const u8) !?Lease {
        const now_us = time_util.nowMicros();

        // Get the lease first
        const maybe_lease = try self.getById(lease_id);
        if (maybe_lease) |lease| {
            if (lease.expiration_us < now_us) {
                // Still expired, safe to delete
                _ = try self.revokeLease(lease_id);
                return lease;
            }
            // Lease was renewed in the meantime; free the copy and report "not expired"
            self.freeLease(lease);
        }
        return null;
    }

    /// Free all heap memory owned by a Lease produced by this storage.
    pub fn freeLease(self: *Self, lease: Lease) void {
        self.alloc.free(lease.id);
        for (lease.resource_ids) |rid| {
            self.alloc.free(rid);
        }
        self.alloc.free(lease.resource_ids);
        if (lease.holder_type) |ht| self.alloc.free(ht);
        if (lease.holder_id) |hi| self.alloc.free(hi);
    }

    /// Seconds the lease occupied its slots: created_at -> min(now, expiration),
    /// clamped at zero.
    pub fn calculateOccupancy(lease: Lease) f64 {
        const now_us = time_util.nowMicros();
        const end_us = @min(now_us, lease.expiration_us);
        const duration_us: f64 = @floatFromInt(@max(0, end_us - lease.created_at_us));
        return duration_us / 1_000_000.0;
    }
};
+250
src/leases/storage.zig
//! Unified lease storage interface
//!
//! Backend selection:
//! - Memory: default (single-instance deployments)
//! - Redis: when PREFECT_BROKER_BACKEND=redis (HA deployments)
//!
//! This matches Python Prefect's pattern where leases are NEVER stored in the database.
//! Leases are ephemeral by default (memory) or use Redis for HA.

const std = @import("std");
const Allocator = std.mem.Allocator;

const log = @import("../logging.zig");
const redis_storage = @import("redis.zig");
const memory_storage = @import("memory.zig");

/// Backend-agnostic lease record. String fields are owned by whichever
/// backend produced the lease.
pub const Lease = struct {
    id: []const u8,
    resource_ids: []const []const u8,
    slots: i64,
    expiration_us: i64,
    created_at_us: i64,
    holder_type: ?[]const u8,
    holder_id: ?[]const u8,
};

/// Copy a backend lease (redis or memory — structurally identical) into the
/// unified Lease type. Centralizes the field-by-field copy that was previously
/// duplicated in every switch arm.
fn fromBackend(lease: anytype) Lease {
    return .{
        .id = lease.id,
        .resource_ids = lease.resource_ids,
        .slots = lease.slots,
        .expiration_us = lease.expiration_us,
        .created_at_us = lease.created_at_us,
        .holder_type = lease.holder_type,
        .holder_id = lease.holder_id,
    };
}

/// Map a slice of backend leases into unified Lease values allocated with `alloc`.
/// The backend slice itself is not freed here (its strings are shared by the copies),
/// matching the previous behavior.
fn copyLeases(alloc: Allocator, backend_leases: anytype) ![]Lease {
    const results = try alloc.alloc(Lease, backend_leases.len);
    for (backend_leases, 0..) |lease, i| {
        results[i] = fromBackend(lease);
    }
    return results;
}

pub const LeaseStorage = union(enum) {
    redis: *redis_storage.RedisLeaseStorage,
    memory: *memory_storage.MemoryLeaseStorage,

    /// Create a lease over `resource_ids` holding `slots` slots for `ttl_seconds`.
    pub fn createLease(
        self: LeaseStorage,
        alloc: Allocator,
        resource_ids: []const []const u8,
        ttl_seconds: f64,
        slots: i64,
        holder_type: ?[]const u8,
        holder_id: ?[]const u8,
    ) !Lease {
        // Kept for interface stability; both backends allocate with their own allocator.
        _ = alloc;
        return switch (self) {
            .redis => |r| fromBackend(try r.createLease(resource_ids, ttl_seconds, slots, holder_type, holder_id)),
            .memory => |m| fromBackend(try m.createLease(resource_ids, ttl_seconds, slots, holder_type, holder_id)),
        };
    }

    /// Look up a lease by ID; returns null when it does not exist.
    pub fn getById(self: LeaseStorage, alloc: Allocator, lease_id: []const u8) !?Lease {
        _ = alloc;
        switch (self) {
            .redis => |r| {
                if (try r.getById(lease_id)) |lease| return fromBackend(lease);
                return null;
            },
            .memory => |m| {
                // The memory backend's getById is infallible (no `try`).
                if (m.getById(lease_id)) |lease| return fromBackend(lease);
                return null;
            },
        }
    }

    /// Extend a lease's expiration to now + ttl_seconds. Returns false when missing.
    pub fn renewLease(self: LeaseStorage, lease_id: []const u8, ttl_seconds: f64) !bool {
        return switch (self) {
            .redis => |r| try r.renewLease(lease_id, ttl_seconds),
            .memory => |m| try m.renewLease(lease_id, ttl_seconds),
        };
    }

    /// Delete a lease. Returns true when a lease was actually removed.
    pub fn revokeLease(self: LeaseStorage, lease_id: []const u8) !bool {
        return switch (self) {
            .redis => |r| try r.revokeLease(lease_id),
            .memory => |m| try m.revokeLease(lease_id),
        };
    }

    /// IDs of leases whose expiration is in the past, up to `limit`.
    /// NOTE(review): the redis backend ignores `alloc` and allocates with its
    /// own allocator — callers must pass the allocator the storage was
    /// initialized with (currently page_allocator), otherwise freeing the
    /// result is undefined; confirm and unify.
    pub fn getExpiredLeaseIds(self: LeaseStorage, alloc: Allocator, limit: usize) ![][]const u8 {
        return switch (self) {
            .redis => |r| try r.getExpiredLeaseIds(limit),
            .memory => |m| try m.getExpiredLeaseIds(alloc, limit),
        };
    }

    /// Non-expired leases with pagination; the returned slice is allocated with `alloc`.
    pub fn getActiveLeases(self: LeaseStorage, alloc: Allocator, limit: usize, offset: usize) ![]Lease {
        switch (self) {
            .redis => |r| {
                const backend_leases = try r.getActiveLeases(limit, offset);
                return try copyLeases(alloc, backend_leases);
            },
            .memory => |m| {
                const backend_leases = try m.getActiveLeases(alloc, limit, offset);
                return try copyLeases(alloc, backend_leases);
            },
        }
    }

    /// Delete a lease only if it is still expired, returning it for slot-release
    /// bookkeeping; null when it was renewed in the meantime or is already gone.
    pub fn deleteExpiredLease(self: LeaseStorage, alloc: Allocator, lease_id: []const u8) !?Lease {
        switch (self) {
            .redis => |r| {
                if (try r.deleteExpiredLease(lease_id)) |lease| return fromBackend(lease);
                return null;
            },
            .memory => |m| {
                if (try m.deleteExpiredLease(alloc, lease_id)) |lease| return fromBackend(lease);
                return null;
            },
        }
    }

    /// Seconds the lease occupied its slots.
    /// Delegates to the memory backend's formula; the original passed an empty
    /// resource_ids slice, so the formula presumably ignores resource_ids —
    /// preserved as-is.
    pub fn calculateOccupancy(lease: Lease) f64 {
        return memory_storage.MemoryLeaseStorage.calculateOccupancy(.{
            .id = lease.id,
            .resource_ids = &.{},
            .slots = lease.slots,
            .expiration_us = lease.expiration_us,
            .created_at_us = lease.created_at_us,
            .holder_type = lease.holder_type,
            .holder_id = lease.holder_id,
        });
    }
};

// Global lease storage instance (process-wide singleton).
var storage: ?LeaseStorage = null;
var redis_instance: ?redis_storage.RedisLeaseStorage = null;
var memory_instance: ?memory_storage.MemoryLeaseStorage = null;

/// Initialize lease storage based on PREFECT_BROKER_BACKEND. Idempotent.
/// NOTE(review): not thread-safe — call once during startup before any
/// concurrent access (main.zig does so in initInfra).
pub fn init() !void {
    if (storage != null) return;

    const alloc = std.heap.page_allocator;
    const broker_backend = std.posix.getenv("PREFECT_BROKER_BACKEND") orelse "memory";

    if (std.mem.eql(u8, broker_backend, "redis")) {
        // Redis-backed leases for HA deployments.
        const host = std.posix.getenv("PREFECT_REDIS_MESSAGING_HOST") orelse "localhost";
        const port_str = std.posix.getenv("PREFECT_REDIS_MESSAGING_PORT") orelse "6379";
        const port = std.fmt.parseInt(u16, port_str, 10) catch 6379;
        const password = std.posix.getenv("PREFECT_REDIS_MESSAGING_PASSWORD") orelse "";
        const db_str = std.posix.getenv("PREFECT_REDIS_MESSAGING_DB") orelse "0";
        const db_num = std.fmt.parseInt(u8, db_str, 10) catch 0;

        redis_instance = redis_storage.RedisLeaseStorage.init(alloc, host, port, password, db_num);
        storage = .{ .redis = &redis_instance.? };
        log.info("leases", "initialized redis lease storage ({s}:{d})", .{ host, port });
    } else {
        // In-memory lease storage (default — matches Python's ephemeral behavior).
        memory_instance = memory_storage.MemoryLeaseStorage.init(alloc);
        storage = .{ .memory = &memory_instance.? };
        log.info("leases", "initialized memory lease storage", .{});
    }
}

/// Get the global lease storage instance, auto-initializing on first use.
pub fn get() LeaseStorage {
    if (storage == null) {
        // Auto-initialize if not already done
        init() catch {
            // Fall back to memory if initialization fails for any reason
            memory_instance = memory_storage.MemoryLeaseStorage.init(std.heap.page_allocator);
            storage = .{ .memory = &memory_instance.? };
        };
    }
    return storage.?;
}
+7
src/main.zig
··· 5 5 const db = @import("db/sqlite.zig"); 6 6 const backend = @import("db/backend.zig"); 7 7 const broker = @import("broker.zig"); 8 + const lease_storage = @import("leases/storage.zig"); 8 9 const routes = @import("api/routes.zig"); 9 10 const events = @import("api/events.zig"); 10 11 const log = @import("logging.zig"); ··· 243 244 return err; 244 245 }; 245 246 log.info("broker", "ready ({s})", .{@tagName(broker_dialect)}); 247 + 248 + // Initialize lease storage (memory by default, redis for HA — leases are never stored in the database) 249 + lease_storage.init() catch |err| { 250 + log.err("leases", "failed to initialize lease storage: {}", .{err}); 251 + return err; 252 + }; 246 253 } 247 254 248 255 fn deinitInfra() void {
+10 -10
src/services/lease_cleanup.zig
··· 3 3 const log = @import("../logging.zig"); 4 4 const db = @import("../db/sqlite.zig"); 5 5 const time_util = @import("../utilities/time.zig"); 6 - const lease_storage = @import("../concurrency/lease_storage.zig"); 6 + const lease_storage = @import("../leases/storage.zig"); 7 7 8 8 /// Cleanup interval in milliseconds (default: 10 seconds) 9 9 const CLEANUP_INTERVAL_MS: u64 = 10_000; ··· 70 70 } 71 71 72 72 fn cleanupExpiredLeases() !void { 73 - const storage = lease_storage.getLeaseStorage(std.heap.page_allocator); 73 + const alloc = std.heap.page_allocator; 74 + const storage = lease_storage.get(); 74 75 75 76 // Get expired lease IDs 76 - const expired_ids = try storage.readExpiredLeaseIds(std.heap.page_allocator, BATCH_SIZE); 77 + const expired_ids = try storage.getExpiredLeaseIds(alloc, BATCH_SIZE); 77 78 defer { 78 79 for (expired_ids) |id| { 79 - std.heap.page_allocator.free(id); 80 + alloc.free(id); 80 81 } 81 - std.heap.page_allocator.free(expired_ids); 82 + alloc.free(expired_ids); 82 83 } 83 84 84 85 if (expired_ids.len == 0) return; ··· 90 91 var released: usize = 0; 91 92 92 93 for (expired_ids) |lease_id| { 93 - // Read lease to get resource_ids and slots 94 - if (storage.readLease(lease_id)) |lease| { 94 + // Use deleteExpiredLease for race-safe cleanup 95 + // This only deletes if the lease is still expired (handles renewal race) 96 + // NOTE(review): the returned lease owns heap-allocated strings (id, resource_ids, holders) that are never freed here — the unified LeaseStorage interface exposes no freeLease; confirm and plug this per-expiry leak 96 + if (try storage.deleteExpiredLease(alloc, lease_id)) |lease| { 95 97 // Calculate occupancy time 96 98 const occupancy = lease_storage.LeaseStorage.calculateOccupancy(lease); 97 99 98 100 // Release slots for each resource 99 101 for (lease.resource_ids) |resource_id| { ··· 102 104 }; 103 105 } 104 106 105 - // Revoke the lease 106 - storage.revokeLease(lease_id); 107 107 released += 1; 108 108 } 109 109 }