Prefect server in Zig

Add scheduler service for interval schedules

Evaluates active deployment schedules every 30 seconds, creating
SCHEDULED flow runs based on interval configuration. Supports
max_scheduled_runs limits and skips paused deployments.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+196 -2
+21
src/db/deployment_schedules.zig
/// List all active schedules, oldest first (used by the scheduler service).
/// Caller owns the returned slice; rows are allocated with `alloc`.
pub fn listActive(alloc: Allocator, limit: usize) ![]DeploymentScheduleRow {
    var rows = backend.db.query(
        "SELECT " ++ select_cols ++ " FROM deployment_schedule WHERE active = 1 ORDER BY created ASC LIMIT ?",
        .{@as(i64, @intCast(limit))},
    ) catch |err| {
        log.err("database", "list active schedules error: {}", .{err});
        return err;
    };
    defer rows.deinit();

    var out = std.ArrayListUnmanaged(DeploymentScheduleRow){};
    errdefer out.deinit(alloc);

    while (rows.next()) |row| {
        try out.append(alloc, try rowFromResult(alloc, &row));
    }

    return out.toOwnedSlice(alloc);
}
+4 -2
src/db/flow_runs.zig
··· 34 34 work_queue_name: ?[]const u8 = null, 35 35 work_queue_id: ?[]const u8 = null, 36 36 auto_scheduled: bool = false, 37 + expected_start_time: ?[]const u8 = null, 37 38 }; 38 39 39 40 pub fn insert( ··· 47 48 ) !void { 48 49 backend.db.exec( 49 50 \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp, 50 - \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled) 51 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 51 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, expected_start_time) 52 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 52 53 , .{ 53 54 id, 54 55 flow_id, ··· 61 62 params.work_queue_name, 62 63 params.work_queue_id, 63 64 @as(i64, if (params.auto_scheduled) 1 else 0), 65 + params.expected_start_time, 64 66 }) catch |err| { 65 67 log.err("database", "insert flow_run error: {}", .{err}); 66 68 return err;
+2
src/services/mod.zig
··· 3 3 4 4 pub const event_persister = @import("event_persister.zig"); 5 5 pub const event_broadcaster = @import("event_broadcaster.zig"); 6 + pub const scheduler = @import("scheduler.zig"); 6 7 7 8 pub const Service = struct { 8 9 name: []const u8, ··· 13 14 const all = [_]Service{ 14 15 .{ .name = "event_persister", .start = event_persister.start, .stop = event_persister.stop }, 15 16 .{ .name = "event_broadcaster", .start = event_broadcaster.start, .stop = event_broadcaster.stop }, 17 + .{ .name = "scheduler", .start = scheduler.start, .stop = scheduler.stop }, 16 18 }; 17 19 18 20 pub fn startAll() !void {
+169
src/services/scheduler.zig
//! Background scheduler service: periodically evaluates active deployment
//! schedules and creates SCHEDULED flow runs for interval-based schedules.
const std = @import("std");
const Thread = std.Thread;
const json = std.json;
const log = @import("../logging.zig");
const db = @import("../db/sqlite.zig");
const time_util = @import("../utilities/time.zig");
const uuid_util = @import("../utilities/uuid.zig");

// Configuration
const SCHEDULER_INTERVAL_MS: u64 = 30_000; // 30 seconds between evaluation ticks
const MAX_SCHEDULED_RUNS: usize = 50; // Default cap on pending runs per schedule
const BATCH_SIZE: usize = 100; // Max schedules to evaluate per tick
const STOP_POLL_MS: u64 = 100; // Granularity at which the loop observes stop()

var scheduler_thread: ?Thread = null;
// Written by start()/stop(), read by the worker thread's loop condition.
// A plain `bool` here is a data race; use an atomic so the cross-thread
// read/write is well-defined.
var running = std.atomic.Value(bool).init(false);
// Serializes start()/stop() against each other (thread handle management).
var mutex: Thread.Mutex = .{};

/// Start the scheduler background thread.
/// Idempotent: calling start() while already running is a no-op.
/// Errors: propagates Thread.spawn failure (the running flag is rolled
/// back so a later start() can retry).
pub fn start() !void {
    mutex.lock();
    defer mutex.unlock();

    if (running.load(.acquire)) return;
    running.store(true, .release);

    log.info("scheduler", "starting (interval: {}ms, max_runs: {})", .{ SCHEDULER_INTERVAL_MS, MAX_SCHEDULED_RUNS });
    scheduler_thread = Thread.spawn(.{}, schedulerLoop, .{}) catch |err| {
        // Spawn failed: undo the flag, otherwise the service is stuck
        // "running" with no thread and can never be started again.
        running.store(false, .release);
        return err;
    };
}

/// Stop the scheduler and join its thread. Safe to call when not running.
pub fn stop() void {
    mutex.lock();
    const was_running = running.load(.acquire);
    running.store(false, .release);
    // Snapshot and clear the handle while still holding the mutex so a
    // concurrent start() cannot observe a half-torn-down state.
    const thread = scheduler_thread;
    scheduler_thread = null;
    mutex.unlock();

    if (!was_running) return;

    log.info("scheduler", "stopping", .{});

    if (thread) |t| {
        t.join();
    }
}

/// Worker loop: evaluate schedules, then sleep until the next tick.
fn schedulerLoop() void {
    while (running.load(.acquire)) {
        evaluateSchedules() catch |err| {
            log.err("scheduler", "error evaluating schedules: {}", .{err});
        };
        // Sleep in short slices so stop() is observed promptly instead of
        // blocking shutdown for the full 30s interval.
        var slept_ms: u64 = 0;
        while (slept_ms < SCHEDULER_INTERVAL_MS and running.load(.acquire)) {
            Thread.sleep(STOP_POLL_MS * std.time.ns_per_ms);
            slept_ms += STOP_POLL_MS;
        }
    }
}

/// One scheduler tick: load up to BATCH_SIZE active schedules and evaluate
/// each. Per-schedule errors are logged and skipped so one bad schedule
/// cannot starve the rest. All allocations live in a per-tick arena.
fn evaluateSchedules() !void {
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const alloc = arena.allocator();

    const schedules = try db.deployment_schedules.listActive(alloc, BATCH_SIZE);
    if (schedules.len == 0) return;

    var ts_buf: [32]u8 = undefined;
    const now = time_util.timestamp(&ts_buf);
    const now_micros = time_util.nowMicros();

    var runs_created: usize = 0;
    for (schedules) |schedule| {
        const count = evaluateSchedule(alloc, schedule, now, now_micros) catch |err| {
            log.err("scheduler", "error evaluating schedule {s}: {}", .{ schedule.id, err });
            continue;
        };
        runs_created += count;
    }

    if (runs_created > 0) {
        log.info("scheduler", "created {} scheduled flow runs", .{runs_created});
    }
}

/// Evaluate a single schedule row. Returns the number of flow runs created.
/// Skips missing or paused deployments and unparseable schedule JSON.
fn evaluateSchedule(alloc: std.mem.Allocator, schedule: db.deployment_schedules.DeploymentScheduleRow, now: []const u8, now_micros: i64) !usize {
    // Get deployment to check if it's paused; a missing deployment is
    // treated as "nothing to do", not an error.
    const deployment = db.deployments.getById(alloc, schedule.deployment_id) catch null orelse return 0;
    if (deployment.paused) return 0;

    // Parse schedule JSON; parse artifacts are freed with the tick arena.
    const parsed = json.parseFromSlice(json.Value, alloc, schedule.schedule, .{}) catch return 0;
    const obj = parsed.value.object;

    // Determine schedule type and evaluate
    if (obj.get("interval")) |interval_val| {
        return evaluateIntervalSchedule(alloc, schedule, deployment, interval_val, now, now_micros);
    }
    // TODO: cron and rrule schedules
    return 0;
}

/// Create SCHEDULED flow runs for an interval schedule, filling the horizon
/// from the latest already-scheduled run up to `max_runs` intervals ahead
/// of now. Returns the number of runs created.
fn evaluateIntervalSchedule(
    alloc: std.mem.Allocator,
    schedule: db.deployment_schedules.DeploymentScheduleRow,
    deployment: db.deployments.DeploymentRow,
    interval_val: json.Value,
    now: []const u8,
    now_micros: i64,
) !usize {
    // Interval may arrive as an integer or float number of seconds.
    const interval_seconds: i64 = switch (interval_val) {
        .integer => |i| i,
        .float => |f| @intFromFloat(f),
        else => return 0,
    };
    if (interval_seconds <= 0) return 0;

    const interval_micros = interval_seconds * 1_000_000;

    // Get existing scheduled runs for this deployment
    const existing_runs = db.flow_runs.getScheduledByDeployment(alloc, deployment.id, MAX_SCHEDULED_RUNS) catch return 0;
    const existing_count = existing_runs.len;

    // Respect max_scheduled_runs limit. Guard against negative DB values:
    // @intCast of a negative i64 into usize would be a panic/UB.
    const max_runs: usize = if (schedule.max_scheduled_runs) |m|
        (if (m > 0) @as(usize, @intCast(m)) else 0)
    else
        MAX_SCHEDULED_RUNS;
    if (existing_count >= max_runs) return 0;

    // Find latest scheduled time; never schedule before now.
    var latest_scheduled_micros: i64 = now_micros;
    for (existing_runs) |run| {
        if (run.expected_start_time) |est| {
            if (time_util.parse(est)) |ts| {
                if (ts > latest_scheduled_micros) {
                    latest_scheduled_micros = ts;
                }
            }
        }
    }

    // Schedule runs starting from latest + interval, bounded both by the
    // remaining run quota and by a horizon of max_runs intervals from now.
    var runs_created: usize = 0;
    var next_time = latest_scheduled_micros + interval_micros;
    const max_future = now_micros + (interval_micros * @as(i64, @intCast(max_runs)));

    // existing_count < max_runs is guaranteed by the early return above,
    // so this unsigned subtraction cannot underflow.
    while (runs_created < max_runs - existing_count and next_time <= max_future) {
        // Create the scheduled flow run
        var id_buf: [36]u8 = undefined;
        const run_id = uuid_util.generate(&id_buf);

        var name_buf: [64]u8 = undefined;
        const run_name = std.fmt.bufPrint(&name_buf, "{s}-{s}", .{
            deployment.name[0..@min(deployment.name.len, 20)],
            run_id[0..8],
        }) catch "scheduled-run";

        var scheduled_time_buf: [32]u8 = undefined;
        const scheduled_time = time_util.formatMicros(&scheduled_time_buf, next_time);

        db.flow_runs.insert(run_id, deployment.flow_id, run_name, "SCHEDULED", "Scheduled", now, .{
            .deployment_id = deployment.id,
            .deployment_version = deployment.version,
            .work_queue_name = deployment.work_queue_name,
            .work_queue_id = deployment.work_queue_id,
            .auto_scheduled = true,
            .expected_start_time = scheduled_time,
        }) catch {
            // Insert failure: stop scheduling this deployment for this tick
            // rather than hammering a failing database.
            break;
        };

        runs_created += 1;
        next_time += interval_micros;
    }

    return runs_created;
}