prefect server in zig

feat: add late_runs background service

marks flow runs as Late if they haven't started within
PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS (default 15s)
after their scheduled start time.

- add getLateRuns query to flow_runs.zig
- add late_runs service with configurable threshold and loop interval
- register service in services.zig

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+165 -1
+2 -1
ROADMAP.md
··· 165 165 - [x] event_broadcaster (websocket fan-out to /api/events/out subscribers) 166 166 - [x] scheduler (create flow runs from deployment schedules - interval + cron) 167 167 - [x] lease_cleanup (expire stale concurrency leases) 168 - - [ ] late_runs (mark runs as late) 168 + - [x] late_runs (mark runs as late) 169 169 - [ ] foreman (infrastructure management) 170 170 - [ ] cancellation_cleanup (clean up cancelled runs) 171 171 - [ ] pause_expirations (expire paused runs) ··· 268 268 - work pools, work queues, workers (full CRUD + heartbeat) 269 269 - deployments + schedules (full CRUD, `.serve()` support) 270 270 - scheduler service (interval + cron, idempotent, parameter merging) 271 + - late_runs service (marks overdue scheduled runs as Late) 271 272 - get_scheduled_flow_runs (worker polling) 272 273 - events (ingest via websocket, persist, broadcast with filtered backfill) 273 274 - concurrency limits v2 (CRUD, lease-based slots, memory/redis storage)
+20
src/db/flow_runs.zig
··· 590 590 // TODO: add infrastructure_pid column and update it 591 591 } 592 592 593 + /// Get late flow runs (SCHEDULED with next_scheduled_start_time before cutoff). 594 + pub fn getLateRuns(alloc: Allocator, cutoff_timestamp: []const u8, limit: usize) ![]FlowRunRow { 595 + var results = std.ArrayListUnmanaged(FlowRunRow){}; 596 + errdefer results.deinit(alloc); 597 + var rows = backend.db.query( 598 + \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 599 + \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 600 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy, state_transition_id 601 + \\FROM flow_run WHERE state_type = 'SCHEDULED' AND state_name = 'Scheduled' 602 + \\ AND next_scheduled_start_time IS NOT NULL AND next_scheduled_start_time <= ? 603 + \\ORDER BY next_scheduled_start_time ASC LIMIT ? 604 + , .{ cutoff_timestamp, @as(i64, @intCast(limit)) }) catch |err| { 605 + log.err("database", "get late flow_runs error: {}", .{err}); 606 + return err; 607 + }; 608 + defer rows.deinit(); 609 + while (rows.next()) |r| try results.append(alloc, rowToFlowRun(alloc, r)); 610 + return results.toOwnedSlice(alloc); 611 + } 612 + 593 613 pub fn delete(id: []const u8) !bool { 594 614 // check if flow run exists 595 615 var rows = backend.db.query(
+2
src/services.zig
··· 5 5 pub const event_broadcaster = @import("services/event_broadcaster.zig"); 6 6 pub const scheduler = @import("services/scheduler.zig"); 7 7 pub const lease_cleanup = @import("services/lease_cleanup.zig"); 8 + pub const late_runs = @import("services/late_runs.zig"); 8 9 9 10 pub const Service = struct { 10 11 name: []const u8, ··· 17 18 .{ .name = "event_broadcaster", .start = event_broadcaster.start, .stop = event_broadcaster.stop }, 18 19 .{ .name = "scheduler", .start = scheduler.start, .stop = scheduler.stop }, 19 20 .{ .name = "lease_cleanup", .start = lease_cleanup.start, .stop = lease_cleanup.stop }, 21 + .{ .name = "late_runs", .start = late_runs.start, .stop = late_runs.stop }, 20 22 }; 21 23 22 24 pub fn startAll() !void {
+141
src/services/late_runs.zig
··· 1 + //! Late runs service 2 + //! 3 + //! Background service that marks flow runs as "Late" if they haven't started 4 + //! within the configured threshold after their scheduled start time. 5 + //! 6 + //! Configuration (env vars): 7 + //! - PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS: loop interval (default: 5) 8 + //! - PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS: threshold (default: 15) 9 + 10 + const std = @import("std"); 11 + const Thread = std.Thread; 12 + const log = @import("../logging.zig"); 13 + const db = @import("../db/sqlite.zig"); 14 + const time_util = @import("../utilities/time.zig"); 15 + const uuid_util = @import("../utilities/uuid.zig"); 16 + 17 + /// Max runs to process per cycle 18 + const BATCH_SIZE: usize = 100; 19 + 20 + fn getLoopIntervalMs() u64 { 21 + const env_val = std.posix.getenv("PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS"); 22 + if (env_val) |val| { 23 + const seconds = std.fmt.parseInt(u64, val, 10) catch return 5_000; 24 + return seconds * 1000; 25 + } 26 + return 5_000; // Default 5 seconds 27 + } 28 + 29 + fn getLateAfterSeconds() i64 { 30 + const env_val = std.posix.getenv("PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS"); 31 + if (env_val) |val| { 32 + return std.fmt.parseInt(i64, val, 10) catch return 15; 33 + } 34 + return 15; // Default 15 seconds 35 + } 36 + 37 + var service_thread: ?Thread = null; 38 + var running: bool = false; 39 + var mutex: Thread.Mutex = .{}; 40 + 41 + pub fn start() !void { 42 + mutex.lock(); 43 + defer mutex.unlock(); 44 + 45 + if (running) return; 46 + running = true; 47 + 48 + const interval_ms = getLoopIntervalMs(); 49 + const threshold = getLateAfterSeconds(); 50 + log.info("late_runs", "starting (interval: {}ms, threshold: {}s)", .{ interval_ms, threshold }); 51 + 52 + service_thread = try Thread.spawn(.{}, serviceLoop, .{}); 53 + } 54 + 55 + pub fn stop() void { 56 + mutex.lock(); 57 + const was_running = running; 58 + running = false; 59 + mutex.unlock(); 60 + 61 + if (!was_running) return; 62 + 63 + log.info("late_runs", "stopping", .{}); 64 + 65 + if (service_thread) |t| { 66 + t.join(); 67 + service_thread = null; 68 + } 69 + } 70 + 71 + fn serviceLoop() void { 72 + const interval_ms = getLoopIntervalMs(); 73 + 74 + while (true) { 75 + mutex.lock(); 76 + const should_run = running; 77 + mutex.unlock(); 78 + 79 + if (!should_run) break; 80 + 81 + markLateRuns() catch |err| { 82 + log.err("late_runs", "error marking late runs: {}", .{err}); 83 + }; 84 + 85 + Thread.sleep(interval_ms * std.time.ns_per_ms); 86 + } 87 + } 88 + 89 + fn markLateRuns() !void { 90 + var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator); 91 + defer arena.deinit(); 92 + const alloc = arena.allocator(); 93 + 94 + const threshold_seconds = getLateAfterSeconds(); 95 + const now_micros = time_util.nowMicros(); 96 + const cutoff_micros = now_micros - (threshold_seconds * 1_000_000); 97 + 98 + // Format cutoff as ISO timestamp 99 + var cutoff_buf: [32]u8 = undefined; 100 + const cutoff_timestamp = time_util.formatMicros(&cutoff_buf, cutoff_micros); 101 + 102 + // Query for late runs 103 + const late_runs = try db.flow_runs.getLateRuns(alloc, cutoff_timestamp, BATCH_SIZE); 104 + if (late_runs.len == 0) return; 105 + 106 + log.debug("late_runs", "found {} late runs", .{late_runs.len}); 107 + 108 + var ts_buf: [32]u8 = undefined; 109 + const now = time_util.timestamp(&ts_buf); 110 + var marked: usize = 0; 111 + 112 + for (late_runs) |run| { 113 + // Generate new state ID 114 + var state_id_buf: [36]u8 = undefined; 115 + const state_id = uuid_util.generate(&state_id_buf); 116 + 117 + // Mark as Late (state_type stays SCHEDULED, state_name becomes "Late") 118 + db.flow_runs.setState( 119 + run.id, 120 + state_id, 121 + "SCHEDULED", // state_type unchanged 122 + "Late", // state_name 123 + now, 124 + run.start_time, // preserve 125 + run.end_time, // preserve 126 + run.run_count, // preserve 127 + run.total_run_time, // preserve 128 + run.expected_start_time, // preserve 129 + null, // no transition_id needed 130 + ) catch |err| { 131 + log.warn("late_runs", "failed to mark run {s} as late: {}", .{ run.id, err }); 132 + continue; 133 + }; 134 + 135 + marked += 1; 136 + } 137 + 138 + if (marked > 0) { 139 + log.info("late_runs", "marked {} runs as late", .{marked}); 140 + } 141 + }