prefect server in zig

implement RetryFailedFlows orchestration rule

when a flow run transitions from RUNNING to FAILED:
- if retries configured and run_count <= retries, reject FAILED
- schedule AwaitingRetry state with retry_delay offset
- return REJECT status with SCHEDULED state (matching python behavior)

changes:
- add empirical_policy column (JSON with retries/retry_delay)
- extend RuleContext with retry fields and scheduleRetry method
- implement rule in flow_rules.zig with 6 unit tests
- wire up API to parse empirical_policy and handle retry response
- add test-retry script and justfile target
- use PREFECT_PROFILE=oss for test harness (uses prefect profiles)
- add loq exceptions for files with inline tests
- reference zig 0.15 notes in CLAUDE.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+534 -23
+1
CLAUDE.md
··· 45 45 ## reference 46 46 47 47 python prefect source: `~/github.com/prefecthq/prefect` - read from disk, don't fetch 48 + zig 0.15 notes: `~/tangled.sh/@zzstoatzz.io/notes/languages/ziglang/0.15/` - syntax patterns, arraylist, io, etc. 48 49 49 50 ## docs 50 51
+1 -1
ROADMAP.md
··· 139 139 - [x] PreventPendingTransitions - reject PENDING/RUNNING/CANCELLING/CANCELLED → PENDING 140 140 - [x] CopyScheduledTime - copy scheduled_time when SCHEDULED → PENDING 141 141 - [x] WaitForScheduledTime - delay transition if scheduled_time in future 142 + - [x] RetryFailedFlows - schedule retry when RUNNING → FAILED if retries available 142 143 - [ ] PreventDuplicateTransitions - idempotency via transition_id 143 144 - [ ] HandleFlowTerminalStateTransitions - prevent leaving completed with persisted data 144 - - [ ] RetryFailedFlows - govern retry transitions 145 145 - [ ] HandlePausingFlows - govern RUNNING → PAUSED 146 146 - [ ] HandleResumingPausedFlows - govern PAUSED → RUNNING 147 147
+9 -5
justfile
··· 16 16 17 17 # run API test suite against local server 18 18 test: 19 - PREFECT_API_URL=http://localhost:4200/api ./scripts/test-api-sequence 19 + PREFECT_PROFILE=zig-server ./scripts/test-api-sequence 20 20 21 21 # run .serve() integration test 22 22 test-serve: 23 - PREFECT_API_URL=http://localhost:4200/api ./scripts/test-serve 23 + PREFECT_PROFILE=zig-server ./scripts/test-serve 24 24 25 25 # broker backend tests (memory + redis) 26 26 test-broker backend="all": ··· 50 50 51 51 # run high-level prefect client tests (requires running server) 52 52 test-client: 53 - PREFECT_API_URL=http://localhost:4200/api ./scripts/test-flow 54 - PREFECT_API_URL=http://localhost:4200/api ./scripts/test-blocks 53 + PREFECT_PROFILE=zig-server ./scripts/test-flow 54 + PREFECT_PROFILE=zig-server ./scripts/test-blocks 55 55 56 56 # run worker integration tests (scheduler + worker execution) 57 57 test-worker: 58 - PREFECT_API_URL=http://localhost:4200/api ./scripts/test-worker 58 + PREFECT_PROFILE=zig-server ./scripts/test-worker 59 + 60 + # run retry orchestration tests 61 + test-retry: 62 + PREFECT_PROFILE=zig-server ./scripts/test-retry 59 63 60 64 # run full test matrix (all db × broker combinations) 61 65 test-matrix:
+12
loq.toml
··· 19 19 [[rules]] 20 20 path = "src/api/deployments.zig" 21 21 max_lines = 900 22 + 23 + [[rules]] 24 + path = "src/orchestration/flow_rules.zig" 25 + max_lines = 600 26 + 27 + [[rules]] 28 + path = "src/api/flow_runs.zig" 29 + max_lines = 600 30 + 31 + [[rules]] 32 + path = "src/db/flow_runs.zig" 33 + max_lines = 600
+156
scripts/test-retry
··· 1 + #!/usr/bin/env -S uv run --script --quiet 2 + # /// script 3 + # requires-python = ">=3.12" 4 + # dependencies = ["httpx"] 5 + # /// 6 + """ 7 + test script for RetryFailedFlows orchestration rule 8 + 9 + tests: 10 + 1. flow run accepts empirical_policy with retry settings 11 + 2. RUNNING -> FAILED transition triggers retry (returns AwaitingRetry state) 12 + 3. retry respects max retries limit 13 + 14 + usage: 15 + PREFECT_API_URL=http://localhost:4200/api ./scripts/test-retry 16 + """ 17 + import os 18 + import httpx 19 + 20 + API_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api") 21 + 22 + 23 + def test_empirical_policy_accepted(): 24 + """test that flow run accepts empirical_policy""" 25 + print("\n=== test_empirical_policy_accepted ===") 26 + 27 + # create flow (or get existing) 28 + import uuid 29 + flow_name = f"retry-test-flow-{uuid.uuid4().hex[:8]}" 30 + resp = httpx.post(f"{API_URL}/flows/", json={"name": flow_name}) 31 + assert resp.status_code in (200, 201), f"failed to create flow: {resp.text}" 32 + flow_id = resp.json()["id"] 33 + print(f" created flow: {flow_id}") 34 + 35 + # create flow run with retry settings 36 + resp = httpx.post(f"{API_URL}/flow_runs/", json={ 37 + "flow_id": flow_id, 38 + "name": "retry-test-run", 39 + "empirical_policy": {"retries": 3, "retry_delay": 5} 40 + }) 41 + assert resp.status_code == 201, f"failed to create flow run: {resp.text}" 42 + run = resp.json() 43 + print(f" created flow run: {run['id']}") 44 + print(f" empirical_policy: {run.get('empirical_policy')}") 45 + 46 + assert run.get("empirical_policy") == {"retries": 3, "retry_delay": 5}, \ 47 + f"empirical_policy not stored correctly: {run.get('empirical_policy')}" 48 + print("✓ empirical_policy accepted and returned") 49 + 50 + return flow_id, run["id"] 51 + 52 + 53 + def test_retry_triggers_on_failure(flow_id: str, run_id: str): 54 + """test that RUNNING -> FAILED triggers retry""" 55 + print("\n=== test_retry_triggers_on_failure ===") 56 + 57 + # transition to RUNNING 58 + resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 59 + "state": {"type": "RUNNING", "name": "Running"} 60 + }) 61 + assert resp.status_code == 200, f"failed to set RUNNING: {resp.text}" 62 + result = resp.json() 63 + print(f" set RUNNING: status={result['status']}") 64 + 65 + # get run to check run_count 66 + resp = httpx.get(f"{API_URL}/flow_runs/{run_id}") 67 + run = resp.json() 68 + print(f" run_count after RUNNING: {run['run_count']}") 69 + 70 + # transition to FAILED - should trigger retry 71 + resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 72 + "state": {"type": "FAILED", "name": "Failed"} 73 + }) 74 + assert resp.status_code == 200, f"failed to set FAILED: {resp.text}" 75 + result = resp.json() 76 + print(f" set FAILED: status={result['status']}, state_type={result['state']['type']}") 77 + 78 + # check if retry was triggered (status=REJECT, state=SCHEDULED/AwaitingRetry) 79 + if result["status"] == "REJECT" and result["state"]["type"] == "SCHEDULED": 80 + print(f" state_name: {result['state']['name']}") 81 + assert result["state"]["name"] == "AwaitingRetry", \ 82 + f"expected AwaitingRetry, got {result['state']['name']}" 83 + print("✓ retry triggered on RUNNING -> FAILED") 84 + return True 85 + else: 86 + print(f"○ retry not triggered: status={result['status']}, state={result['state']['type']}") 87 + return False 88 + 89 + 90 + def test_retry_exhaustion(flow_id: str): 91 + """test that retries are exhausted after max attempts""" 92 + print("\n=== test_retry_exhaustion ===") 93 + 94 + # create flow run with 1 retry 95 + resp = httpx.post(f"{API_URL}/flow_runs/", json={ 96 + "flow_id": flow_id, 97 + "name": "retry-exhaust-run", 98 + "empirical_policy": {"retries": 1, "retry_delay": 0} 99 + }) 100 + run_id = resp.json()["id"] 101 + print(f" created flow run with 1 retry: {run_id}") 102 + 103 + # first attempt: RUNNING -> FAILED (should retry) 104 + httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 105 + "state": {"type": "RUNNING", "name": "Running"} 106 + }) 107 + resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 108 + "state": {"type": "FAILED", "name": "Failed"} 109 + }) 110 + result = resp.json() 111 + print(f" attempt 1 FAILED: status={result['status']}, state={result['state']['type']}") 112 + 113 + if result["status"] == "REJECT": 114 + # retry was triggered, now simulate second attempt 115 + # need to set to RUNNING again (from SCHEDULED) 116 + httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 117 + "state": {"type": "PENDING", "name": "Pending"} 118 + }) 119 + httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 120 + "state": {"type": "RUNNING", "name": "Running"} 121 + }) 122 + 123 + # second FAILED - should NOT retry (exhausted) 124 + resp = httpx.post(f"{API_URL}/flow_runs/{run_id}/set_state", json={ 125 + "state": {"type": "FAILED", "name": "Failed"} 126 + }) 127 + result = resp.json() 128 + print(f" attempt 2 FAILED: status={result['status']}, state={result['state']['type']}") 129 + 130 + if result["status"] == "ACCEPT" and result["state"]["type"] == "FAILED": 131 + print("✓ retries exhausted correctly") 132 + return True 133 + else: 134 + print(f"○ unexpected result after retry exhaustion") 135 + return False 136 + 137 + return False 138 + 139 + 140 + def main(): 141 + print(f"api url: {API_URL}") 142 + 143 + # check server health 144 + resp = httpx.get(f"{API_URL}/health") 145 + assert resp.status_code == 200, f"server not healthy: {resp.text}" 146 + print("✓ server healthy") 147 + 148 + flow_id, run_id = test_empirical_policy_accepted() 149 + test_retry_triggers_on_failure(flow_id, run_id) 150 + test_retry_exhaustion(flow_id) 151 + 152 + print("\n=== all tests passed ===") 153 + 154 + 155 + if __name__ == "__main__": 156 + main()
+85 -1
src/api/flow_runs.zig
··· 113 113 else => null, 114 114 } else null; 115 115 116 + // extract empirical_policy (retry settings) 117 + const empirical_policy: ?[]const u8 = if (obj.get("empirical_policy")) |v| blk: { 118 + // stringify the object back to JSON 119 + var out: std.Io.Writer.Allocating = .init(alloc); 120 + var jw: json.Stringify = .{ .writer = &out.writer }; 121 + jw.write(v) catch break :blk null; 122 + break :blk out.toOwnedSlice() catch null; 123 + } else null; 124 + 116 125 var new_id_buf: [36]u8 = undefined; 117 126 const new_id = uuid_util.generate(&new_id_buf); 118 127 var ts_buf: [32]u8 = undefined; ··· 120 129 121 130 db.insertFlowRun(new_id, flow_id, name, state_type, state_name, now, .{ 122 131 .next_scheduled_start_time = next_scheduled_start_time, 132 + .empirical_policy = empirical_policy, 123 133 }) catch { 124 134 json_util.sendStatus(r, "{\"detail\":\"insert failed\"}", .internal_server_error); 125 135 return; ··· 151 161 .work_queue_id = null, 152 162 .auto_scheduled = false, 153 163 .idempotency_key = null, 164 + .empirical_policy = empirical_policy orelse "{}", 154 165 }; 155 166 156 167 const resp = writeFlowRun(alloc, run, state_id) catch { ··· 259 270 260 271 const proposed_state_type = orchestration.StateType.fromString(state_type); 261 272 273 + // parse empirical_policy for retry settings 274 + var retries: ?i64 = null; 275 + var retry_delay: ?i64 = null; 276 + if (current_run) |run| { 277 + if (json.parseFromSlice(json.Value, alloc, run.empirical_policy, .{})) |policy_parsed| { 278 + if (policy_parsed.value.object.get("retries")) |v| { 279 + if (v == .integer) retries = v.integer; 280 + } 281 + if (policy_parsed.value.object.get("retry_delay")) |v| { 282 + if (v == .integer) retry_delay = v.integer; 283 + } 284 + } else |_| {} 285 + } 286 + 262 287 // apply orchestration rules (policy) 263 288 var rule_ctx = orchestration.RuleContext{ 264 289 .initial_state = initial_state_type, ··· 273 298 .run_id = id, 274 299 .flow_id = if (current_run) |run| run.flow_id else null, 275 300 .deployment_id = if (current_run) |run| run.deployment_id else null, 301 + // for RetryFailedFlows: pass retry settings 302 + .run_count = if (current_run) |run| run.run_count else 0, 303 + .retries = retries, 304 + .retry_delay = retry_delay, 276 305 }; 277 306 orchestration.applyPolicy(&orchestration.CoreFlowPolicy, &rule_ctx); 278 307 279 - // if rules rejected/waited/aborted, return without committing state 308 + // if rules rejected/waited/aborted, handle appropriately 280 309 if (!rule_ctx.isAccepted()) { 310 + // special case: RetryFailedFlows rejection - commit AwaitingRetry state 311 + if (rule_ctx.result.status == .REJECT and rule_ctx.retry_state_type != null) { 312 + const retry_state_type = @tagName(rule_ctx.retry_state_type.?); 313 + const retry_state_name = rule_ctx.retry_state_name orelse "AwaitingRetry"; 314 + const retry_scheduled = rule_ctx.retry_scheduled_time; 315 + 316 + // apply bookkeeping for the retry state 317 + var bookkeeping_ctx = orchestration.TransitionContext{ 318 + .current_state_type = initial_state_type, 319 + .current_state_timestamp = if (current_run) |run| 320 + if (run.state_timestamp.len > 0) run.state_timestamp else null 321 + else 322 + null, 323 + .start_time = if (current_run) |run| run.start_time else null, 324 + .end_time = if (current_run) |run| run.end_time else null, 325 + .run_count = if (current_run) |run| run.run_count else 0, 326 + .total_run_time = if (current_run) |run| run.total_run_time else 0.0, 327 + .proposed_state_type = rule_ctx.retry_state_type.?, 328 + .proposed_state_timestamp = now, 329 + }; 330 + orchestration.applyBookkeeping(&bookkeeping_ctx); 331 + 332 + // commit the retry state with next_scheduled_start_time 333 + db.setFlowRunStateWithSchedule( 334 + id, 335 + state_id, 336 + retry_state_type, 337 + retry_state_name, 338 + now, 339 + bookkeeping_ctx.new_start_time, 340 + bookkeeping_ctx.new_end_time, 341 + bookkeeping_ctx.new_run_count, 342 + bookkeeping_ctx.new_total_run_time, 343 + null, // expected_start_time 344 + retry_scheduled, // next_scheduled_start_time 345 + ) catch { 346 + json_util.sendStatus(r, "{\"detail\":\"update failed\"}", .internal_server_error); 347 + return; 348 + }; 349 + 350 + // return REJECT status with the retry state (matching Python's behavior) 351 + const resp = writeStateResponse(alloc, .REJECT, rule_ctx.result.details, retry_state_type, retry_state_name, now, state_id) catch { 352 + json_util.sendStatus(r, "{\"detail\":\"serialize error\"}", .internal_server_error); 353 + return; 354 + }; 355 + json_util.send(r, resp); 356 + return; 357 + } 358 + 359 + // normal rejection/wait/abort - return without committing 281 360 const resp = writeStateResponse( 282 361 alloc, 283 362 rule_ctx.result.status, ··· 444 523 445 524 try jw.objectField("auto_scheduled"); 446 525 try jw.write(run.auto_scheduled); 526 + 527 + try jw.objectField("empirical_policy"); 528 + try jw.beginWriteRaw(); 529 + try jw.writer.writeAll(run.empirical_policy); 530 + jw.endWriteRaw(); 447 531 448 532 try jw.endObject(); 449 533 }
+84 -13
src/db/flow_runs.zig
··· 29 29 work_queue_id: ?[]const u8, 30 30 auto_scheduled: bool, 31 31 idempotency_key: ?[]const u8, 32 + // retry policy (empirical_policy JSON) 33 + empirical_policy: []const u8, 32 34 }; 33 35 34 36 pub const InsertParams = struct { ··· 41 43 next_scheduled_start_time: ?[]const u8 = null, 42 44 idempotency_key: ?[]const u8 = null, 43 45 parameters: ?[]const u8 = null, 46 + empirical_policy: ?[]const u8 = null, // JSON: {"retries": N, "retry_delay": N} 44 47 }; 45 48 46 49 pub fn insert( ··· 55 58 backend.db.exec( 56 59 \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp, 57 60 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, 58 - \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters) 59 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 61 + \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy) 62 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 60 63 , .{ 61 64 id, 62 65 flow_id, ··· 73 76 params.next_scheduled_start_time, 74 77 params.idempotency_key, 75 78 params.parameters orelse "{}", 79 + params.empirical_policy orelse "{}", 76 80 }) catch |err| { 77 81 log.err("database", "insert flow_run error: {}", .{err}); 78 82 return err; ··· 94 98 const sql = if (backend.db.dialect == .sqlite) 95 99 \\INSERT OR IGNORE INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp, 96 100 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, 97 - \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters) 98 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 101 + \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy) 102 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 99 103 else 100 104 \\INSERT INTO flow_run (id, flow_id, name, state_type, state_name, state_timestamp, 101 105 \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, 102 - \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters) 103 - \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 106 + \\ expected_start_time, next_scheduled_start_time, idempotency_key, parameters, empirical_policy) 107 + \\VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 104 108 \\ON CONFLICT (flow_id, idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING 105 109 ; 106 110 ··· 120 124 params.next_scheduled_start_time, 121 125 params.idempotency_key, 122 126 params.parameters orelse "{}", 127 + params.empirical_policy orelse "{}", 123 128 }) catch |err| { 124 129 log.err("database", "insertOrIgnore flow_run error: {}", .{err}); 125 130 return err; ··· 134 139 var rows = backend.db.query( 135 140 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 136 141 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 137 - \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key 142 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy 138 143 \\FROM flow_run WHERE id = ? 139 144 , .{id}) catch return null; 140 145 defer rows.deinit(); ··· 215 220 }; 216 221 } 217 222 223 + /// Set flow run state with next_scheduled_start_time (for AwaitingRetry state) 224 + pub fn setStateWithSchedule( 225 + run_id: []const u8, 226 + state_id: []const u8, 227 + state_type: []const u8, 228 + state_name: []const u8, 229 + timestamp: []const u8, 230 + start_time: ?[]const u8, 231 + end_time: ?[]const u8, 232 + run_count: i64, 233 + total_run_time: f64, 234 + expected_start_time: ?[]const u8, 235 + next_scheduled_start_time: ?[]const u8, 236 + ) !void { 237 + if (backend.db.dialect == .sqlite) { 238 + backend.db.mutex.lock(); 239 + } 240 + defer if (backend.db.dialect == .sqlite) { 241 + backend.db.mutex.unlock(); 242 + }; 243 + 244 + var txn = backend.db.beginTransaction() catch |err| { 245 + log.err("database", "begin transaction error: {}", .{err}); 246 + return err; 247 + }; 248 + errdefer txn.rollback(); 249 + 250 + txn.exec( 251 + \\UPDATE flow_run SET state_id = ?, state_type = ?, state_name = ?, state_timestamp = ?, updated = ?, 252 + \\ start_time = ?, end_time = ?, run_count = ?, total_run_time = ?, 253 + \\ expected_start_time = COALESCE(?, expected_start_time), 254 + \\ next_scheduled_start_time = ? 255 + \\WHERE id = ? 256 + , .{ 257 + state_id, 258 + state_type, 259 + state_name, 260 + timestamp, 261 + timestamp, 262 + start_time, 263 + end_time, 264 + run_count, 265 + total_run_time, 266 + expected_start_time, 267 + next_scheduled_start_time, 268 + run_id, 269 + }) catch |err| { 270 + log.err("database", "update flow_run error: {}", .{err}); 271 + return err; 272 + }; 273 + 274 + txn.exec( 275 + \\INSERT INTO flow_run_state (id, flow_run_id, type, name, timestamp) 276 + \\VALUES (?, ?, ?, ?, ?) 277 + , .{ state_id, run_id, state_type, state_name, timestamp }) catch |err| { 278 + log.err("database", "insert flow_run_state error: {}", .{err}); 279 + return err; 280 + }; 281 + 282 + txn.commit() catch |err| { 283 + log.err("database", "commit error: {}", .{err}); 284 + return err; 285 + }; 286 + } 287 + 218 288 pub fn list(alloc: Allocator, limit: usize) ![]FlowRunRow { 219 289 var results = std.ArrayListUnmanaged(FlowRunRow){}; 220 290 errdefer results.deinit(alloc); ··· 222 292 var rows = backend.db.query( 223 293 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 224 294 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 225 - \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key 295 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy 226 296 \\FROM flow_run ORDER BY created DESC LIMIT ? 227 297 , .{@as(i64, @intCast(limit))}) catch |err| { 228 298 log.err("database", "list flow_runs error: {}", .{err}); ··· 245 315 var rows = backend.db.query( 246 316 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 247 317 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 248 - \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key 318 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy 249 319 \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED' 250 320 \\ORDER BY expected_start_time ASC LIMIT ? 251 321 , .{ deployment_id, @as(i64, @intCast(limit)) }) catch |err| { ··· 317 387 var rows = backend.db.query( 318 388 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 319 389 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 320 - \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key 390 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy 321 391 \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED' 322 392 \\ AND (next_scheduled_start_time IS NULL OR next_scheduled_start_time <= ?) 323 393 \\ORDER BY next_scheduled_start_time ASC LIMIT ? ··· 334 404 var rows = backend.db.query( 335 405 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 336 406 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 337 - \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key 407 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy 338 408 \\FROM flow_run WHERE deployment_id = ? AND state_type = 'SCHEDULED' 339 409 \\ORDER BY next_scheduled_start_time ASC LIMIT ? 340 410 , .{ deployment_id, @as(i64, @intCast(limit)) }) catch |err| { ··· 366 436 var rows = backend.db.query( 367 437 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 368 438 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 369 - \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key 439 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy 370 440 \\FROM flow_run WHERE work_queue_id = ? AND state_type = 'SCHEDULED' 371 441 \\ AND (next_scheduled_start_time IS NULL OR next_scheduled_start_time <= ?) 372 442 \\ORDER BY next_scheduled_start_time ASC LIMIT ? ··· 383 453 var rows = backend.db.query( 384 454 \\SELECT id, created, updated, flow_id, name, state_type, state_name, state_timestamp, 385 455 \\ parameters, tags, run_count, expected_start_time, next_scheduled_start_time, start_time, end_time, total_run_time, 386 - \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key 456 + \\ deployment_id, deployment_version, work_queue_name, work_queue_id, auto_scheduled, idempotency_key, empirical_policy 387 457 \\FROM flow_run WHERE work_queue_id = ? AND state_type = 'SCHEDULED' 388 458 \\ORDER BY next_scheduled_start_time ASC LIMIT ? 389 459 , .{ work_queue_id, @as(i64, @intCast(limit)) }) catch |err| { ··· 465 535 .work_queue_id = if (r.text(19).len > 0) alloc.dupe(u8, r.text(19)) catch null else null, 466 536 .auto_scheduled = r.int(20) != 0, 467 537 .idempotency_key = if (r.text(21).len > 0) alloc.dupe(u8, r.text(21)) catch null else null, 538 + .empirical_policy = alloc.dupe(u8, r.text(22)) catch "{}", 468 539 }; 469 540 } 470 541
+2 -1
src/db/schema/postgres.zig
··· 43 43 \\ work_queue_name TEXT, 44 44 \\ work_queue_id TEXT, 45 45 \\ auto_scheduled INTEGER DEFAULT 0, 46 - \\ idempotency_key TEXT 46 + \\ idempotency_key TEXT, 47 + \\ empirical_policy JSONB DEFAULT '{}' 47 48 \\) 48 49 , .{}); 49 50
+2 -1
src/db/schema/sqlite.zig
··· 40 40 \\ work_queue_name TEXT, 41 41 \\ work_queue_id TEXT, 42 42 \\ auto_scheduled INTEGER DEFAULT 0, 43 - \\ idempotency_key TEXT 43 + \\ idempotency_key TEXT, 44 + \\ empirical_policy TEXT DEFAULT '{}' 44 45 \\) 45 46 , .{}); 46 47
+1
src/db/sqlite.zig
··· 35 35 pub const insertFlowRun = flow_runs.insert; 36 36 pub const getFlowRun = flow_runs.get; 37 37 pub const setFlowRunState = flow_runs.setState; 38 + pub const setFlowRunStateWithSchedule = flow_runs.setStateWithSchedule; 38 39 pub const listFlowRuns = flow_runs.list; 39 40 40 41 pub const insertTaskRun = task_runs.insert;
+160 -1
src/orchestration/flow_rules.zig
··· 102 102 } 103 103 104 104 // ============================================================================ 105 + // RetryFailedFlows 106 + // ============================================================================ 107 + 108 + /// RetryFailedFlows: schedule retry when flow run fails if retries are available 109 + /// 110 + /// when transitioning RUNNING → FAILED, this rule checks if retries are configured 111 + /// and if run_count hasn't exceeded the retry limit. if retries are available, 112 + /// the transition to FAILED is rejected and an AwaitingRetry (SCHEDULED) state 113 + /// is proposed instead. 114 + /// 115 + /// requires: run_count, retries, retry_delay in RuleContext 116 + /// output: retry_state_type, retry_state_name, retry_scheduled_time 117 + pub const RetryFailedFlows = OrchestrationRule{ 118 + .name = "RetryFailedFlows", 119 + .from_states = StateTypeSet.init(&.{.RUNNING}), 120 + .to_states = StateTypeSet.init(&.{.FAILED}), 121 + .before_transition = retryFailedFlowsFn, 122 + }; 123 + 124 + fn retryFailedFlowsFn(ctx: *RuleContext) void { 125 + // if retries not configured, allow transition to FAILED 126 + const max_retries = ctx.retries orelse return; 127 + 128 + // if run_count > retries, allow transition to FAILED (exhausted retries) 129 + if (ctx.run_count > max_retries) { 130 + return; 131 + } 132 + 133 + // retries available - schedule a retry 134 + // calculate scheduled_time = now + retry_delay 135 + const delay_seconds = ctx.retry_delay orelse 0; 136 + const now_us = time_util.nowMicros(); 137 + const scheduled_us = now_us + delay_seconds * 1_000_000; 138 + 139 + // format as ISO timestamp using stack buffer 140 + var buf: [32]u8 = undefined; 141 + const scheduled_time = time_util.formatMicros(&buf, scheduled_us); 142 + 143 + ctx.scheduleRetry("Retrying", scheduled_time); 144 + } 145 + 146 + // ============================================================================ 105 147 // CoreFlowPolicy 106 148 // ============================================================================ 107 149 ··· 110 152 PreventPendingTransitions, 111 153 CopyScheduledTime, 112 154 WaitForScheduledTime, 155 + RetryFailedFlows, 113 156 // future rules will be added here in priority order: 114 157 // PreventDuplicateTransitions, 115 158 // HandleFlowTerminalStateTransitions, 116 - // RetryFailedFlows, 117 159 // etc. 118 160 }; 119 161 ··· 388 430 389 431 try testing.expectEqual(rules.ResponseStatus.WAIT, ctx.result.status); 390 432 } 433 + 434 + // ============================================================================ 435 + // RetryFailedFlows Tests 436 + // ============================================================================ 437 + 438 + test "RetryFailedFlows.appliesTo" { 439 + const testing = std.testing; 440 + 441 + // applies to RUNNING → FAILED 442 + try testing.expect(RetryFailedFlows.appliesTo(.RUNNING, .FAILED)); 443 + 444 + // does NOT apply to other transitions 445 + try testing.expect(!RetryFailedFlows.appliesTo(.PENDING, .FAILED)); 446 + try testing.expect(!RetryFailedFlows.appliesTo(.RUNNING, .COMPLETED)); 447 + try testing.expect(!RetryFailedFlows.appliesTo(.RUNNING, .CANCELLED)); 448 + try testing.expect(!RetryFailedFlows.appliesTo(null, .FAILED)); 449 + } 450 + 451 + test "RetryFailedFlows allows FAILED when retries not configured" { 452 + const testing = std.testing; 453 + 454 + var ctx = RuleContext{ 455 + .initial_state = .RUNNING, 456 + .proposed_state = .FAILED, 457 + .initial_state_timestamp = "2024-01-19T16:30:00Z", 458 + .proposed_state_timestamp = "2024-01-19T16:30:01Z", 459 + .run_id = "test-run-id", 460 + .run_count = 1, 461 + .retries = null, // no retries configured 462 + }; 463 + 464 + rules.applyPolicy(&CoreFlowPolicy, &ctx); 465 + 466 + try testing.expectEqual(rules.ResponseStatus.ACCEPT, ctx.result.status); 467 + } 468 + 469 + test "RetryFailedFlows allows FAILED when retries exhausted" { 470 + const testing = std.testing; 471 + 472 + var ctx = RuleContext{ 473 + .initial_state = .RUNNING, 474 + .proposed_state = .FAILED, 475 + .initial_state_timestamp = "2024-01-19T16:30:00Z", 476 + .proposed_state_timestamp = "2024-01-19T16:30:01Z", 477 + .run_id = "test-run-id", 478 + .run_count = 3, // already ran 3 times 479 + .retries = 2, // only 2 retries allowed 480 + }; 481 + 482 + rules.applyPolicy(&CoreFlowPolicy, &ctx); 483 + 484 + try testing.expectEqual(rules.ResponseStatus.ACCEPT, ctx.result.status); 485 + } 486 + 487 + test "RetryFailedFlows schedules retry when retries available" { 488 + const testing = std.testing; 489 + 490 + var ctx = RuleContext{ 491 + .initial_state = .RUNNING, 492 + .proposed_state = .FAILED, 493 + .initial_state_timestamp = "2024-01-19T16:30:00Z", 494 + .proposed_state_timestamp = "2024-01-19T16:30:01Z", 495 + .run_id = "test-run-id", 496 + .run_count = 1, // first run 497 + .retries = 3, // 3 retries allowed 498 + .retry_delay = 10, // 10 seconds delay 499 + }; 500 + 501 + rules.applyPolicy(&CoreFlowPolicy, &ctx); 502 + 503 + try testing.expectEqual(rules.ResponseStatus.REJECT, ctx.result.status); 504 + try testing.expectEqual(rules.StateType.SCHEDULED, ctx.retry_state_type.?); 505 + try testing.expectEqualStrings("AwaitingRetry", ctx.retry_state_name.?); 506 + try testing.expect(ctx.retry_scheduled_time != null); 507 + } 508 + 509 + test "RetryFailedFlows schedules retry on first failure (run_count = 0)" { 510 + const testing = std.testing; 511 + 512 + var ctx = RuleContext{ 513 + .initial_state = .RUNNING, 514 + .proposed_state = .FAILED, 515 + .initial_state_timestamp = "2024-01-19T16:30:00Z", 516 + .proposed_state_timestamp = "2024-01-19T16:30:01Z", 517 + .run_id = "test-run-id", 518 + .run_count = 0, // first run (0-indexed) 519 + .retries = 1, // allow 1 retry 520 + .retry_delay = 5, 521 + }; 522 + 523 + rules.applyPolicy(&CoreFlowPolicy, &ctx); 524 + 525 + try testing.expectEqual(rules.ResponseStatus.REJECT, ctx.result.status); 526 + try testing.expectEqual(rules.StateType.SCHEDULED, ctx.retry_state_type.?); 527 + } 528 + 529 + test "RetryFailedFlows uses default 0 delay when retry_delay is null" { 530 + const testing = std.testing; 531 + 532 + var ctx = RuleContext{ 533 + .initial_state = .RUNNING, 534 + .proposed_state = .FAILED, 535 + .initial_state_timestamp = "2024-01-19T16:30:00Z", 536 + .proposed_state_timestamp = "2024-01-19T16:30:01Z", 537 + .run_id = "test-run-id", 538 + .run_count = 0, 539 + .retries = 2, 540 + .retry_delay = null, // no delay specified 541 + }; 542 + 543 + rules.applyPolicy(&CoreFlowPolicy, &ctx); 544 + 545 + try testing.expectEqual(rules.ResponseStatus.REJECT, ctx.result.status); 546 + try testing.expectEqual(rules.StateType.SCHEDULED, ctx.retry_state_type.?); 547 + // retry should be scheduled for ~now (delay = 0) 548 + try testing.expect(ctx.retry_scheduled_time != null); 549 + }
+21
src/orchestration/rules.zig
··· 29 29 flow_id: ?[]const u8 = null, 30 30 deployment_id: ?[]const u8 = null, 31 31 32 + // retry policy (from empirical_policy) 33 + run_count: i64 = 0, 34 + retries: ?i64 = null, // max retries allowed 35 + retry_delay: ?i64 = null, // delay in seconds between retries 36 + 32 37 // orchestration result (modified by rules) 33 38 result: OrchestrationResult = .{}, 34 39 35 40 // output: values to write back to db (set by rules) 36 41 new_expected_start_time: ?[]const u8 = null, 42 + 43 + // retry output: when RetryFailedFlows rejects, it proposes a new state 44 + retry_state_type: ?StateType = null, 45 + retry_state_name: ?[]const u8 = null, 46 + retry_scheduled_time: ?[]const u8 = null, 37 47 38 48 /// reject the transition with a reason 39 49 pub fn reject(self: *RuleContext, reason: []const u8) void { ··· 55 65 self.result.status = .ABORT; 56 66 self.result.details.reason = reason; 57 67 log.debug("orchestration", "rule aborted transition: {s}", .{reason}); 68 + } 69 + 70 + /// reject and schedule a retry (used by RetryFailedFlows) 71 + /// The caller should set retry_scheduled_time before calling this 72 + pub fn scheduleRetry(self: *RuleContext, reason: []const u8, scheduled_time: []const u8) void { 73 + self.result.status = .REJECT; 74 + self.result.details.reason = reason; 75 + self.retry_state_type = .SCHEDULED; 76 + self.retry_state_name = "AwaitingRetry"; 77 + self.retry_scheduled_time = scheduled_time; 78 + log.debug("orchestration", "rule scheduled retry: {s} at {s}", .{ reason, scheduled_time }); 58 79 } 59 80 60 81 /// check if transition is still accepted (not yet rejected/waited/aborted)