prefect server in zig

feat: add lease cleanup background service (phase 4)

- add lease_cleanup service (src/services/lease_cleanup.zig)
- runs every 10 seconds
- processes up to 100 expired leases per cycle
- releases slots and revokes expired leases automatically

- register service in services.zig

completes concurrency limits v2 implementation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+116
+2
src/services.zig
··· 4 4 pub const event_persister = @import("services/event_persister.zig"); 5 5 pub const event_broadcaster = @import("services/event_broadcaster.zig"); 6 6 pub const scheduler = @import("services/scheduler.zig"); 7 + pub const lease_cleanup = @import("services/lease_cleanup.zig"); 7 8 8 9 pub const Service = struct { 9 10 name: []const u8, ··· 15 16 .{ .name = "event_persister", .start = event_persister.start, .stop = event_persister.stop }, 16 17 .{ .name = "event_broadcaster", .start = event_broadcaster.start, .stop = event_broadcaster.stop }, 17 18 .{ .name = "scheduler", .start = scheduler.start, .stop = scheduler.stop }, 19 + .{ .name = "lease_cleanup", .start = lease_cleanup.start, .stop = lease_cleanup.stop }, 18 20 }; 19 21 20 22 pub fn startAll() !void {
+114
src/services/lease_cleanup.zig
··· 1 + const std = @import("std"); 2 + const Thread = std.Thread; 3 + const log = @import("../logging.zig"); 4 + const db = @import("../db/sqlite.zig"); 5 + const time_util = @import("../utilities/time.zig"); 6 + const lease_storage = @import("../concurrency/lease_storage.zig"); 7 + 8 + /// Cleanup interval in milliseconds (default: 10 seconds) 9 + const CLEANUP_INTERVAL_MS: u64 = 10_000; 10 + 11 + /// Max leases to process per cleanup cycle 12 + const BATCH_SIZE: usize = 100; 13 + 14 + var cleanup_thread: ?Thread = null; 15 + var running: bool = false; 16 + var mutex: Thread.Mutex = .{}; 17 + 18 + pub fn start() !void { 19 + mutex.lock(); 20 + defer mutex.unlock(); 21 + 22 + if (running) return; 23 + running = true; 24 + 25 + log.info("lease_cleanup", "starting (interval: {d}ms, batch: {d})", .{ CLEANUP_INTERVAL_MS, BATCH_SIZE }); 26 + 27 + cleanup_thread = try Thread.spawn(.{}, cleanupLoop, .{}); 28 + } 29 + 30 + pub fn stop() void { 31 + mutex.lock(); 32 + const was_running = running; 33 + running = false; 34 + mutex.unlock(); 35 + 36 + if (!was_running) return; 37 + 38 + log.info("lease_cleanup", "stopping", .{}); 39 + 40 + if (cleanup_thread) |t| { 41 + t.join(); 42 + cleanup_thread = null; 43 + } 44 + } 45 + 46 + fn cleanupLoop() void { 47 + while (true) { 48 + // Check if we should stop 49 + mutex.lock(); 50 + const should_run = running; 51 + mutex.unlock(); 52 + 53 + if (!should_run) break; 54 + 55 + // Sleep before next cleanup cycle 56 + Thread.sleep(CLEANUP_INTERVAL_MS * std.time.ns_per_ms); 57 + 58 + // Check again after sleep 59 + mutex.lock(); 60 + const still_running = running; 61 + mutex.unlock(); 62 + 63 + if (!still_running) break; 64 + 65 + // Perform cleanup 66 + cleanupExpiredLeases() catch |err| { 67 + log.err("lease_cleanup", "cleanup error: {}", .{err}); 68 + }; 69 + } 70 + } 71 + 72 + fn cleanupExpiredLeases() !void { 73 + const storage = lease_storage.getLeaseStorage(std.heap.page_allocator); 74 + 75 + // Get expired lease IDs 76 + const expired_ids = try storage.readExpiredLeaseIds(std.heap.page_allocator, BATCH_SIZE); 77 + defer { 78 + for (expired_ids) |id| { 79 + std.heap.page_allocator.free(id); 80 + } 81 + std.heap.page_allocator.free(expired_ids); 82 + } 83 + 84 + if (expired_ids.len == 0) return; 85 + 86 + log.debug("lease_cleanup", "processing {d} expired leases", .{expired_ids.len}); 87 + 88 + var ts_buf: [32]u8 = undefined; 89 + const now = time_util.timestamp(&ts_buf); 90 + var released: usize = 0; 91 + 92 + for (expired_ids) |lease_id| { 93 + // Read lease to get resource_ids and slots 94 + if (storage.readLease(lease_id)) |lease| { 95 + // Calculate occupancy time 96 + const occupancy = storage.calculateOccupancy(lease_id); 97 + 98 + // Release slots for each resource 99 + for (lease.resource_ids) |resource_id| { 100 + _ = db.concurrency_limits.releaseSlots(resource_id, lease.slots, occupancy, now) catch |err| { 101 + log.warn("lease_cleanup", "failed to release slots for {s}: {}", .{ resource_id, err }); 102 + }; 103 + } 104 + 105 + // Revoke the lease 106 + storage.revokeLease(lease_id); 107 + released += 1; 108 + } 109 + } 110 + 111 + if (released > 0) { 112 + log.info("lease_cleanup", "cleaned up {d} expired leases", .{released}); 113 + } 114 + }