Prefect server in Zig

feat: add lease-based concurrency management (phase 2)

- add in-memory lease storage module (src/concurrency/lease_storage.zig)
- thread-safe singleton with mutex protection
- lease CRUD: create, read, renew, revoke
- TTL-based expiration tracking

- extend API with lease endpoints:
  - POST /increment now creates leases (mode=concurrency, default)
  - POST /decrement supports lease_id for release
  - POST /leases/filter - list active leases
  - POST /leases/renew - extend lease TTL
  - DELETE /leases/{id} - revoke lease and release slots

- add mode parameter: "concurrency" (with leases) vs "rate_limit" (no leases)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+471 -9
+1 -1
loq.toml
··· 34 34 35 35 [[rules]] 36 36 path = "src/api/concurrency_limits_v2.zig" 37 - max_lines = 600 37 + max_lines = 850
+256 -8
src/api/concurrency_limits_v2.zig
··· 7 7 const uuid_util = @import("../utilities/uuid.zig"); 8 8 const time_util = @import("../utilities/time.zig"); 9 9 const json_util = @import("../utilities/json.zig"); 10 + const lease_storage = @import("../concurrency/lease_storage.zig"); 10 11 11 12 pub fn handle(r: zap.Request) !void { 12 13 const target = r.path orelse "/"; 13 14 const method = r.method orelse "GET"; 15 + 16 + // Lease routes must come first (more specific paths) 17 + // POST /v2/concurrency_limits/leases/filter - list leases 18 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/leases/filter")) { 19 + try filterLeases(r); 20 + return; 21 + } 22 + 23 + // POST /v2/concurrency_limits/leases/renew - renew lease 24 + if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/leases/renew")) { 25 + try renewLease(r); 26 + return; 27 + } 28 + 29 + // DELETE /v2/concurrency_limits/leases/{lease_id} - revoke lease 30 + if (mem.eql(u8, method, "DELETE") and mem.indexOf(u8, target, "/leases/") != null) { 31 + try revokeLease(r); 32 + return; 33 + } 14 34 15 35 // POST /v2/concurrency_limits/filter - list 16 36 if (mem.eql(u8, method, "POST") and mem.endsWith(u8, target, "/filter")) { ··· 113 133 // Don't match special endpoints 114 134 if (mem.eql(u8, rest, "filter") or 115 135 mem.eql(u8, rest, "increment") or 116 - mem.eql(u8, rest, "decrement")) 136 + mem.eql(u8, rest, "decrement") or 137 + mem.startsWith(u8, rest, "leases")) 117 138 { 118 139 return null; 119 140 } ··· 360 381 else => 1, 361 382 } else 1; 362 383 384 + // mode: "concurrency" (default, uses leases) or "rate_limit" (no leases) 385 + const mode: []const u8 = if (obj.get("mode")) |v| switch (v) { 386 + .string => |s| s, 387 + else => "concurrency", 388 + } else "concurrency"; 389 + 390 + // TTL for leases (default 30 seconds) 391 + const ttl_seconds: f64 = if (obj.get("ttl_seconds")) |v| switch (v) { 392 + .float => |f| f, 393 + .integer => |i| @as(f64, @floatFromInt(i)), 394 + else => 30.0, 395 + } else 30.0; 396 
+ 363 397 const names = obj.get("names") orelse { 364 398 json_util.sendStatus(r, "{\"detail\":\"names required\"}", .bad_request); 365 399 return; ··· 373 407 var ts_buf: [32]u8 = undefined; 374 408 const now = time_util.timestamp(&ts_buf); 375 409 var acquired_limits = std.ArrayListUnmanaged(db.concurrency_limits.ConcurrencyLimitRow){}; 410 + var acquired_ids = std.ArrayListUnmanaged([]const u8){}; 376 411 var all_acquired = true; 377 412 378 413 // Try to acquire slots for each named limit ··· 395 430 const acquired = try db.concurrency_limits.tryAcquireSlots(lim.id, slots, now); 396 431 if (acquired) { 397 432 try acquired_limits.append(alloc, lim); 433 + try acquired_ids.append(alloc, try alloc.dupe(u8, lim.id)); 398 434 } else { 399 435 all_acquired = false; 400 436 } ··· 402 438 } 403 439 404 440 if (all_acquired and acquired_limits.items.len > 0) { 405 - // Success - return minimal response 441 + // Success - create lease if using concurrency mode 442 + var lease_id: ?[]const u8 = null; 443 + 444 + if (mem.eql(u8, mode, "concurrency")) { 445 + const storage = lease_storage.getLeaseStorage(alloc); 446 + const lease = storage.createLease( 447 + acquired_ids.items, 448 + ttl_seconds, 449 + .{ .slots = slots }, 450 + ) catch { 451 + // Failed to create lease - rollback slot acquisition 452 + for (acquired_limits.items) |lim| { 453 + _ = try db.concurrency_limits.releaseSlots(lim.id, slots, null, now); 454 + } 455 + json_util.sendStatus(r, "{\"detail\":\"failed to create lease\"}", .internal_server_error); 456 + return; 457 + }; 458 + lease_id = lease.id; 459 + } 460 + 461 + // Return response with optional lease_id 406 462 var output = std.ArrayListUnmanaged(u8){}; 407 463 try output.append(alloc, '['); 408 464 409 465 for (acquired_limits.items, 0..) 
|lim, i| { 410 466 if (i > 0) try output.append(alloc, ','); 411 - try output.writer(alloc).print("{{\"id\":\"{s}\",\"name\":\"{s}\",\"limit\":{d}}}", .{ 412 - lim.id, lim.name, lim.limit, 413 - }); 467 + if (lease_id) |lid| { 468 + try output.writer(alloc).print("{{\"id\":\"{s}\",\"name\":\"{s}\",\"limit\":{d},\"lease_id\":\"{s}\"}}", .{ 469 + lim.id, lim.name, lim.limit, lid, 470 + }); 471 + } else { 472 + try output.writer(alloc).print("{{\"id\":\"{s}\",\"name\":\"{s}\",\"limit\":{d}}}", .{ 473 + lim.id, lim.name, lim.limit, 474 + }); 475 + } 414 476 } 415 477 416 478 try output.append(alloc, ']'); ··· 468 530 }; 469 531 470 532 const obj = parsed.value.object; 533 + var ts_buf: [32]u8 = undefined; 534 + const now = time_util.timestamp(&ts_buf); 471 535 536 + // Check if lease_id is provided - if so, use lease-based release 537 + if (obj.get("lease_id")) |lease_val| { 538 + if (lease_val == .string) { 539 + const lease_id = lease_val.string; 540 + const storage = lease_storage.getLeaseStorage(alloc); 541 + 542 + // Read the lease to get resource_ids and slots 543 + if (storage.readLease(lease_id)) |lease| { 544 + // Calculate occupancy from lease creation time 545 + const occupancy = storage.calculateOccupancy(lease_id); 546 + 547 + // Release slots for each resource in the lease 548 + for (lease.resource_ids) |resource_id| { 549 + _ = try db.concurrency_limits.releaseSlots(resource_id, lease.slots, occupancy, now); 550 + } 551 + 552 + // Revoke the lease 553 + storage.revokeLease(lease_id); 554 + 555 + r.setStatus(.ok); 556 + r.setHeader("content-type", "application/json") catch {}; 557 + r.setHeader("access-control-allow-origin", "*") catch {}; 558 + try r.sendBody("[]"); 559 + return; 560 + } else { 561 + // Lease not found - still return success (idempotent) 562 + r.setStatus(.ok); 563 + r.setHeader("content-type", "application/json") catch {}; 564 + r.setHeader("access-control-allow-origin", "*") catch {}; 565 + try r.sendBody("[]"); 566 + return; 567 + } 
568 + } 569 + } 570 + 571 + // Fallback to names-based release (no lease) 472 572 const slots: i64 = if (obj.get("slots")) |v| switch (v) { 473 573 .integer => |i| i, 474 574 else => 1, ··· 481 581 } else null; 482 582 483 583 const names = obj.get("names") orelse { 484 - json_util.sendStatus(r, "{\"detail\":\"names required\"}", .bad_request); 584 + json_util.sendStatus(r, "{\"detail\":\"names or lease_id required\"}", .bad_request); 485 585 return; 486 586 }; 487 587 ··· 490 590 return; 491 591 } 492 592 493 - var ts_buf: [32]u8 = undefined; 494 - const now = time_util.timestamp(&ts_buf); 495 593 var released_limits = std.ArrayListUnmanaged(db.concurrency_limits.ConcurrencyLimitRow){}; 496 594 497 595 // Release slots for each named limit ··· 552 650 row.avg_slot_occupancy_seconds, 553 651 }); 554 652 } 653 + 654 + // Lease endpoints 655 + 656 + fn filterLeases(r: zap.Request) !void { 657 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 658 + defer arena.deinit(); 659 + const alloc = arena.allocator(); 660 + 661 + var limit: usize = 200; 662 + var offset: usize = 0; 663 + 664 + if (r.body) |body| { 665 + if (json.parseFromSlice(json.Value, alloc, body, .{})) |parsed| { 666 + if (parsed.value.object.get("limit")) |v| { 667 + if (v == .integer) limit = @intCast(v.integer); 668 + } 669 + if (parsed.value.object.get("offset")) |v| { 670 + if (v == .integer) offset = @intCast(v.integer); 671 + } 672 + } else |_| {} 673 + } 674 + 675 + const storage = lease_storage.getLeaseStorage(alloc); 676 + const lease_ids = try storage.readActiveLeaseIds(alloc, limit, offset); 677 + 678 + var output = std.ArrayListUnmanaged(u8){}; 679 + try output.append(alloc, '['); 680 + 681 + for (lease_ids, 0..) 
|lease_id, i| { 682 + if (i > 0) try output.append(alloc, ','); 683 + 684 + if (storage.readLease(lease_id)) |lease| { 685 + try output.writer(alloc).print( 686 + \\{{"id":"{s}","slots":{d},"expiration_us":{d},"created_at_us":{d} 687 + , .{ lease.id, lease.slots, lease.expiration_us, lease.created_at_us }); 688 + 689 + // Add resource_ids array 690 + try output.appendSlice(alloc, ",\"resource_ids\":["); 691 + for (lease.resource_ids, 0..) |rid, j| { 692 + if (j > 0) try output.append(alloc, ','); 693 + try output.writer(alloc).print("\"{s}\"", .{rid}); 694 + } 695 + try output.append(alloc, ']'); 696 + 697 + // Add optional holder info 698 + if (lease.holder_type) |ht| { 699 + try output.writer(alloc).print(",\"holder_type\":\"{s}\"", .{ht}); 700 + } 701 + if (lease.holder_id) |hi| { 702 + try output.writer(alloc).print(",\"holder_id\":\"{s}\"", .{hi}); 703 + } 704 + 705 + try output.append(alloc, '}'); 706 + } 707 + } 708 + 709 + try output.append(alloc, ']'); 710 + 711 + r.setStatus(.ok); 712 + r.setHeader("content-type", "application/json") catch {}; 713 + r.setHeader("access-control-allow-origin", "*") catch {}; 714 + try r.sendBody(output.items); 715 + } 716 + 717 + fn renewLease(r: zap.Request) !void { 718 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 719 + defer arena.deinit(); 720 + const alloc = arena.allocator(); 721 + 722 + const body = r.body orelse { 723 + json_util.sendStatus(r, "{\"detail\":\"body required\"}", .bad_request); 724 + return; 725 + }; 726 + 727 + const parsed = json.parseFromSlice(json.Value, alloc, body, .{}) catch { 728 + json_util.sendStatus(r, "{\"detail\":\"invalid json\"}", .bad_request); 729 + return; 730 + }; 731 + 732 + const obj = parsed.value.object; 733 + 734 + const lease_id = (obj.get("lease_id") orelse { 735 + json_util.sendStatus(r, "{\"detail\":\"lease_id required\"}", .bad_request); 736 + return; 737 + }).string; 738 + 739 + // TTL for renewal (default 30 seconds) 740 + const ttl_seconds: f64 = 
if (obj.get("ttl_seconds")) |v| switch (v) { 741 + .float => |f| f, 742 + .integer => |i| @as(f64, @floatFromInt(i)), 743 + else => 30.0, 744 + } else 30.0; 745 + 746 + const storage = lease_storage.getLeaseStorage(alloc); 747 + const renewed = storage.renewLease(lease_id, ttl_seconds); 748 + 749 + if (renewed) { 750 + r.setStatus(.ok); 751 + r.setHeader("content-type", "application/json") catch {}; 752 + r.setHeader("access-control-allow-origin", "*") catch {}; 753 + try r.sendBody("{\"renewed\":true}"); 754 + } else { 755 + json_util.sendStatus(r, "{\"detail\":\"lease not found\"}", .not_found); 756 + } 757 + } 758 + 759 + fn revokeLease(r: zap.Request) !void { 760 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 761 + defer arena.deinit(); 762 + const alloc = arena.allocator(); 763 + 764 + const target = r.path orelse "/"; 765 + 766 + // Extract lease_id from path: /v2/concurrency_limits/leases/{lease_id} 767 + const prefix1 = "/v2/concurrency_limits/leases/"; 768 + const prefix2 = "/api/v2/concurrency_limits/leases/"; 769 + 770 + var lease_id: []const u8 = undefined; 771 + if (mem.startsWith(u8, target, prefix2)) { 772 + lease_id = target[prefix2.len..]; 773 + } else if (mem.startsWith(u8, target, prefix1)) { 774 + lease_id = target[prefix1.len..]; 775 + } else { 776 + json_util.sendStatus(r, "{\"detail\":\"invalid path\"}", .bad_request); 777 + return; 778 + } 779 + 780 + if (lease_id.len == 0) { 781 + json_util.sendStatus(r, "{\"detail\":\"lease_id required\"}", .bad_request); 782 + return; 783 + } 784 + 785 + const storage = lease_storage.getLeaseStorage(alloc); 786 + var ts_buf: [32]u8 = undefined; 787 + const now = time_util.timestamp(&ts_buf); 788 + 789 + // Read lease to release slots before revoking 790 + if (storage.readLease(lease_id)) |lease| { 791 + const occupancy = storage.calculateOccupancy(lease_id); 792 + for (lease.resource_ids) |resource_id| { 793 + _ = try db.concurrency_limits.releaseSlots(resource_id, lease.slots, 
occupancy, now); 794 + } 795 + storage.revokeLease(lease_id); 796 + } 797 + 798 + // Always return success (idempotent) 799 + r.setStatus(.no_content); 800 + r.setHeader("access-control-allow-origin", "*") catch {}; 801 + try r.sendBody(""); 802 + }
+214
src/concurrency/lease_storage.zig
··· 1 + const std = @import("std"); 2 + const Allocator = std.mem.Allocator; 3 + 4 + const uuid_util = @import("../utilities/uuid.zig"); 5 + const time_util = @import("../utilities/time.zig"); 6 + 7 + /// Metadata associated with a concurrency lease 8 + pub const LeaseMetadata = struct { 9 + slots: i64, 10 + holder_type: ?[]const u8 = null, // "flow_run", "task_run", "deployment" 11 + holder_id: ?[]const u8 = null, 12 + }; 13 + 14 + /// A lease on concurrency limit resources 15 + pub const ResourceLease = struct { 16 + id: []const u8, 17 + resource_ids: []const []const u8, // IDs of concurrency limits this lease is for 18 + expiration_us: i64, // microseconds since epoch 19 + created_at_us: i64, 20 + slots: i64, 21 + holder_type: ?[]const u8, 22 + holder_id: ?[]const u8, 23 + }; 24 + 25 + /// In-memory lease storage (singleton) 26 + /// Thread-safe via mutex for concurrent access 27 + pub const LeaseStorage = struct { 28 + allocator: Allocator, 29 + leases: std.StringHashMap(ResourceLease), 30 + expirations: std.StringHashMap(i64), // lease_id -> expiration_us 31 + mutex: std.Thread.Mutex, 32 + 33 + var instance: ?*LeaseStorage = null; 34 + 35 + pub fn get(_: Allocator) *LeaseStorage { 36 + if (instance) |storage| { 37 + return storage; 38 + } 39 + // Use page_allocator for the singleton - it lives for the entire program lifetime 40 + const alloc = std.heap.page_allocator; 41 + const storage = alloc.create(LeaseStorage) catch unreachable; 42 + storage.* = LeaseStorage{ 43 + .allocator = alloc, 44 + .leases = std.StringHashMap(ResourceLease).init(alloc), 45 + .expirations = std.StringHashMap(i64).init(alloc), 46 + .mutex = .{}, 47 + }; 48 + instance = storage; 49 + return storage; 50 + } 51 + 52 + /// Create a new lease with the given TTL (in seconds) 53 + pub fn createLease( 54 + self: *LeaseStorage, 55 + resource_ids: []const []const u8, 56 + ttl_seconds: f64, 57 + metadata: LeaseMetadata, 58 + ) !ResourceLease { 59 + self.mutex.lock(); 60 + defer 
self.mutex.unlock(); 61 + 62 + // Generate lease ID 63 + var id_buf: [36]u8 = undefined; 64 + const lease_id = uuid_util.generate(&id_buf); 65 + const owned_id = try self.allocator.dupe(u8, lease_id); 66 + 67 + // Copy resource IDs 68 + const owned_resource_ids = try self.allocator.alloc([]const u8, resource_ids.len); 69 + for (resource_ids, 0..) |rid, i| { 70 + owned_resource_ids[i] = try self.allocator.dupe(u8, rid); 71 + } 72 + 73 + const now_us = time_util.nowMicros(); 74 + const ttl_us: i64 = @intFromFloat(ttl_seconds * 1_000_000); 75 + const expiration_us = now_us + ttl_us; 76 + 77 + // Copy optional strings 78 + const owned_holder_type = if (metadata.holder_type) |ht| try self.allocator.dupe(u8, ht) else null; 79 + const owned_holder_id = if (metadata.holder_id) |hi| try self.allocator.dupe(u8, hi) else null; 80 + 81 + const lease = ResourceLease{ 82 + .id = owned_id, 83 + .resource_ids = owned_resource_ids, 84 + .expiration_us = expiration_us, 85 + .created_at_us = now_us, 86 + .slots = metadata.slots, 87 + .holder_type = owned_holder_type, 88 + .holder_id = owned_holder_id, 89 + }; 90 + 91 + try self.leases.put(owned_id, lease); 92 + try self.expirations.put(owned_id, expiration_us); 93 + 94 + return lease; 95 + } 96 + 97 + /// Read a lease by ID 98 + pub fn readLease(self: *LeaseStorage, lease_id: []const u8) ?ResourceLease { 99 + self.mutex.lock(); 100 + defer self.mutex.unlock(); 101 + 102 + return self.leases.get(lease_id); 103 + } 104 + 105 + /// Renew a lease by updating its expiration 106 + pub fn renewLease(self: *LeaseStorage, lease_id: []const u8, ttl_seconds: f64) bool { 107 + self.mutex.lock(); 108 + defer self.mutex.unlock(); 109 + 110 + if (!self.leases.contains(lease_id)) { 111 + // Clean up orphaned expiration entry 112 + _ = self.expirations.remove(lease_id); 113 + return false; 114 + } 115 + 116 + const now_us = time_util.nowMicros(); 117 + const ttl_us: i64 = @intFromFloat(ttl_seconds * 1_000_000); 118 + const new_expiration = now_us + 
ttl_us; 119 + 120 + // Update expiration in lease 121 + if (self.leases.getPtr(lease_id)) |lease| { 122 + lease.expiration_us = new_expiration; 123 + } 124 + 125 + // Update expiration index 126 + if (self.expirations.getPtr(lease_id)) |exp| { 127 + exp.* = new_expiration; 128 + } 129 + 130 + return true; 131 + } 132 + 133 + /// Revoke a lease (remove it from storage) 134 + pub fn revokeLease(self: *LeaseStorage, lease_id: []const u8) void { 135 + self.mutex.lock(); 136 + defer self.mutex.unlock(); 137 + 138 + // Remove from expirations FIRST (before freeing keys) 139 + _ = self.expirations.remove(lease_id); 140 + 141 + if (self.leases.fetchRemove(lease_id)) |kv| { 142 + // Free the lease data - id last since it's the key 143 + for (kv.value.resource_ids) |rid| { 144 + self.allocator.free(rid); 145 + } 146 + self.allocator.free(kv.value.resource_ids); 147 + if (kv.value.holder_type) |ht| self.allocator.free(ht); 148 + if (kv.value.holder_id) |hi| self.allocator.free(hi); 149 + // Free id last since it was used as the key 150 + self.allocator.free(kv.value.id); 151 + } 152 + } 153 + 154 + /// Get IDs of expired leases 155 + pub fn readExpiredLeaseIds(self: *LeaseStorage, allocator: Allocator, limit: usize) ![][]const u8 { 156 + self.mutex.lock(); 157 + defer self.mutex.unlock(); 158 + 159 + const now_us = time_util.nowMicros(); 160 + var result = std.ArrayListUnmanaged([]const u8){}; 161 + 162 + var it = self.expirations.iterator(); 163 + while (it.next()) |entry| { 164 + if (result.items.len >= limit) break; 165 + if (entry.value_ptr.* < now_us) { 166 + try result.append(allocator, try allocator.dupe(u8, entry.key_ptr.*)); 167 + } 168 + } 169 + 170 + return result.toOwnedSlice(allocator); 171 + } 172 + 173 + /// Get IDs of active (non-expired) leases 174 + pub fn readActiveLeaseIds(self: *LeaseStorage, allocator: Allocator, limit: usize, offset: usize) ![][]const u8 { 175 + self.mutex.lock(); 176 + defer self.mutex.unlock(); 177 + 178 + const now_us = 
time_util.nowMicros(); 179 + var result = std.ArrayListUnmanaged([]const u8){}; 180 + var skipped: usize = 0; 181 + 182 + var it = self.expirations.iterator(); 183 + while (it.next()) |entry| { 184 + if (result.items.len >= limit) break; 185 + if (entry.value_ptr.* > now_us) { 186 + if (skipped < offset) { 187 + skipped += 1; 188 + continue; 189 + } 190 + try result.append(allocator, try allocator.dupe(u8, entry.key_ptr.*)); 191 + } 192 + } 193 + 194 + return result.toOwnedSlice(allocator); 195 + } 196 + 197 + /// Calculate occupancy seconds for a lease (time since creation) 198 + pub fn calculateOccupancy(self: *LeaseStorage, lease_id: []const u8) ?f64 { 199 + self.mutex.lock(); 200 + defer self.mutex.unlock(); 201 + 202 + if (self.leases.get(lease_id)) |lease| { 203 + const now_us = time_util.nowMicros(); 204 + const duration_us: f64 = @floatFromInt(now_us - lease.created_at_us); 205 + return duration_us / 1_000_000.0; // Convert to seconds 206 + } 207 + return null; 208 + } 209 + }; 210 + 211 + /// Get the global lease storage instance 212 + pub fn getLeaseStorage(allocator: Allocator) *LeaseStorage { 213 + return LeaseStorage.get(allocator); 214 + }