prefect server in zig

add logfire-zig instrumentation

- HTTP layer: wrap onRequest with http.request span
- DB layer: wrap exec/query methods with db.exec/db.query spans
- records errors on failures via span.recordError()

without LOGFIRE_WRITE_TOKEN, spans are no-op.
with token, exports to Logfire via OTLP/protobuf.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+74 -9
+6
build.zig
··· 35 35 .optimize = optimize, 36 36 }); 37 37 38 + const logfire = b.dependency("logfire", .{ 39 + .target = target, 40 + .optimize = optimize, 41 + }); 42 + 38 43 const imports: []const std.Build.Module.Import = &.{ 39 44 .{ .name = "uuid", .module = uuid_dep.module("uuid") }, 40 45 .{ .name = "zqlite", .module = zqlite.module("zqlite") }, ··· 42 47 .{ .name = "pg", .module = pg.module("pg") }, 43 48 .{ .name = "cron", .module = cron.module("cron") }, 44 49 .{ .name = "redis", .module = redis_dep.module("redis") }, 50 + .{ .name = "logfire", .module = logfire.module("logfire") }, 45 51 }; 46 52 47 53 const exe = b.addExecutable(.{
+4
build.zig.zon
··· 28 28 .url = "https://tangled.sh/zzstoatzz.io/redis/archive/main", 29 29 .hash = "redis-0.1.0-NGPovmBFAgA-97WY7lE9Wr1hqrZtZhhiILKgZFcc09Nt", 30 30 }, 31 + .logfire = .{ 32 + .url = "https://tangled.sh/zzstoatzz.io/logfire-zig/archive/main", 33 + .hash = "logfire_zig-0.1.0-x2yDLhGVAQDgxmrR773DfqZeiCeY_DoqALwpPJhKg7gw", 34 + }, 31 35 }, 32 36 .paths = .{ 33 37 "build.zig",
+1 -1
loq.toml
··· 2 2 3 3 [[rules]] 4 4 path = "src/db/backend.zig" 5 - max_lines = 535 5 + max_lines = 575 6 6 7 7 [[rules]] 8 8 path = "scripts/benchmark"
+38 -8
src/db/backend.zig
··· 6 6 pub const Dialect = dialect_mod.Dialect; 7 7 8 8 const log = @import("../logging.zig"); 9 + const logfire = @import("logfire"); 9 10 10 11 // backend libraries 11 12 const zqlite = @import("zqlite"); ··· 235 236 postgres: PostgresBackend, 236 237 }; 237 238 239 + fn dbSystem(self: *Backend) []const u8 { 240 + return switch (self.impl) { 241 + .sqlite => "sqlite", 242 + .postgres => "postgresql", 243 + }; 244 + } 245 + 238 246 const SqliteBackend = struct { 239 247 conn: zqlite.Conn, 240 248 }; ··· 315 323 } 316 324 317 325 /// Execute a statement that doesn't return rows (thread-safe) 318 - /// Note: For SQLite, uses mutex for thread safety. For Postgres, pool handles concurrency. 319 326 pub fn exec(self: *Backend, sql: []const u8, args: anytype) !void { 327 + const span = logfire.span("db.exec", .{ .@"db.system" = self.dbSystem() }); 328 + defer span.end(); 329 + 320 330 switch (self.impl) { 321 331 .sqlite => { 322 332 self.mutex.lock(); 323 333 defer self.mutex.unlock(); 324 - return self.execUnsafe(sql, args); 334 + self.execUnsafe(sql, args) catch |err| { 335 + span.recordError(err); 336 + return err; 337 + }; 325 338 }, 326 339 .postgres => { 327 340 // Postgres pool handles concurrency - no mutex needed 328 - return self.execUnsafe(sql, args); 341 + self.execUnsafe(sql, args) catch |err| { 342 + span.recordError(err); 343 + return err; 344 + }; 329 345 }, 330 346 } 331 347 } 332 348 333 349 /// Execute a statement and return the number of affected rows (thread-safe) 334 - /// Note: For SQLite, uses mutex for thread safety. For Postgres, pool handles concurrency. 335 350 pub fn execWithRowCount(self: *Backend, sql: []const u8, args: anytype) !i64 { 351 + const span = logfire.span("db.exec", .{ .@"db.system" = self.dbSystem() }); 352 + defer span.end(); 353 + 336 354 switch (self.impl) { 337 355 .sqlite => |*s| { 338 356 self.mutex.lock(); 339 357 defer self.mutex.unlock(); 340 358 s.conn.exec(sql, args) catch |err| { 359 + span.recordError(err); 341 360 log.err("database", "exec error: {}", .{err}); 342 361 return err; 343 362 }; 344 - return @intCast(s.conn.changes()); 363 + const count = @as(i64, @intCast(s.conn.changes())); 364 + span.setAttribute("db.response.rows_affected", count); 365 + return count; 345 366 }, 346 367 .postgres => |*p| { 347 368 // Postgres pool handles concurrency - no mutex needed ··· 349 370 defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten); 350 371 351 372 const affected = p.pool.exec(rewritten, args) catch |err| { 373 + span.recordError(err); 352 374 log.err("database", "postgres exec error: {}", .{err}); 353 375 return err; 354 376 }; 355 - return affected orelse 0; 377 + const count = affected orelse 0; 378 + span.setAttribute("db.response.rows_affected", count); 379 + return count; 356 380 }, 357 381 } 358 382 } 359 383 360 384 /// Query for a single row 361 - /// Note: For SQLite, uses mutex for thread safety. For Postgres, pool handles concurrency. 362 385 pub fn row(self: *Backend, sql: []const u8, args: anytype) !?Row { 386 + const span = logfire.span("db.query", .{ .@"db.system" = self.dbSystem() }); 387 + defer span.end(); 388 + 363 389 switch (self.impl) { 364 390 .sqlite => |*s| { 365 391 self.mutex.lock(); ··· 385 411 } 386 412 387 413 /// Query for multiple rows 388 - /// Note: For SQLite, uses mutex for thread safety. For Postgres, pool handles concurrency. 389 414 pub fn query(self: *Backend, sql: []const u8, args: anytype) !Rows { 415 + const span = logfire.span("db.query", .{ .@"db.system" = self.dbSystem() }); 416 + defer span.end(); 417 + 390 418 switch (self.impl) { 391 419 .sqlite => |*s| { 392 420 self.mutex.lock(); 393 421 defer self.mutex.unlock(); 394 422 const rows = s.conn.rows(sql, args) catch |err| { 423 + span.recordError(err); 395 424 log.err("database", "query error: {}", .{err}); 396 425 return err; 397 426 }; ··· 403 432 defer if (rewritten.ptr != sql.ptr) self.allocator.free(rewritten); 404 433 405 434 const result = p.pool.query(rewritten, args) catch |err| { 435 + span.recordError(err); 406 436 log.err("database", "postgres query error: {}", .{err}); 407 437 return err; 408 438 };
+25
src/main.zig
··· 9 9 const events = @import("api/events.zig"); 10 10 const log = @import("logging.zig"); 11 11 const services = @import("services.zig"); 12 + const logfire = @import("logfire"); 12 13 13 14 // Graceful shutdown state 14 15 var shutdown_requested: bool = false; 16 + 17 + // Logfire instance for telemetry 18 + var lf: ?*logfire.Logfire = null; 15 19 16 20 fn signalHandler(sig: c_int) callconv(.c) void { 17 21 _ = sig; ··· 36 40 const method = r.method orelse "?"; 37 41 const path = r.path orelse "/"; 38 42 43 + // create span for request tracing 44 + const span = logfire.span("http.request", .{ 45 + .@"http.request.method" = method, 46 + .@"url.path" = path, 47 + }); 48 + defer span.end(); 49 + 39 50 routes.handle(r) catch |err| { 51 + span.recordError(err); 40 52 log.err("server", "{s} {s} - error: {}", .{ method, path, err }); 41 53 r.setStatus(.internal_server_error); 42 54 r.sendBody("{\"detail\":\"internal error\"}") catch {}; ··· 104 116 105 117 pub fn main() void { 106 118 log.init(); 119 + 120 + // initialize logfire telemetry 121 + lf = logfire.configure(.{ 122 + .service_name = "prefect-server", 123 + .service_version = "0.0.1", 124 + }) catch |err| blk: { 125 + log.warn("telemetry", "logfire init failed: {} - continuing without tracing", .{err}); 126 + break :blk null; 127 + }; 128 + defer if (lf) |l| { 129 + l.flush() catch {}; 130 + l.shutdown(); 131 + }; 107 132 108 133 const args = parseArgs(); 109 134