Kubernetes Operator for Tangled Spindles

Change kubernetes_engine to send logs on each RunStep call rather than sending them all on step 0.

evan.jarrett.net 0e219020 7f48e70d

verified
+161 -114
-1
.tangled/workflows/workflow-amd64.yaml
··· 1 - 2 1 engine: kubernetes 3 2 image: golang:1.25-trixie 4 3 architecture: amd64
-1
.tangled/workflows/workflow-arm64.yaml
··· 1 - 2 1 engine: kubernetes 3 2 image: golang:1.25-trixie 4 3 architecture: arm64
+161 -112
internal/engine/kubernetes_engine.go
··· 7 7 "fmt" 8 8 "io" 9 9 "strings" 10 + "sync" 10 11 "time" 11 12 12 13 "gopkg.in/yaml.v3" ··· 25 26 loomv1alpha1 "tangled.org/evan.jarrett.net/loom/api/v1alpha1" 26 27 ) 27 28 29 + // workflowLogStream holds the state for streaming logs from a workflow's pod 30 + type workflowLogStream struct { 31 + scanner *bufio.Scanner 32 + stream io.ReadCloser 33 + pod *corev1.Pod 34 + } 35 + 28 36 // KubernetesEngine implements the spindle Engine interface for Kubernetes Jobs. 29 37 type KubernetesEngine struct { 30 38 client client.Client ··· 37 45 38 46 // Store current knot for Job annotations 39 47 currentKnot string 48 + 49 + // Active log streams per workflow - persist across RunStep calls 50 + logStreams map[string]*workflowLogStream 51 + streamMutex sync.RWMutex 40 52 } 41 53 42 54 // NewKubernetesEngine creates a new Kubernetes-based spindle engine. ··· 47 59 namespace: namespace, 48 60 template: template, 49 61 spindleSets: make(map[string]*loomv1alpha1.SpindleSet), 62 + logStreams: make(map[string]*workflowLogStream), 50 63 } 51 64 } 52 65 ··· 231 244 // Remove from tracking map 232 245 delete(e.spindleSets, wid.String()) 233 246 247 + // Close any open log streams for this workflow 248 + e.closeLogStream(wid) 249 + 234 250 logger.Info("SpindleSet cleaned up successfully") 235 251 return nil 236 252 } 237 253 238 - // RunStep waits for the Job to complete and streams logs. 239 - // For Kubernetes engine, all steps run in a single Job, but we need to wait for completion. 254 + // RunStep streams logs for the specific step and waits for that step to complete. 255 + // For Kubernetes engine, all steps run in a single Job, but we stream logs incrementally 256 + // as each step executes. Each RunStep call blocks until that step's "end" control event is received. 
240 257 func (e *KubernetesEngine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 241 258 logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.PipelineId.Rkey, "step", idx) 242 259 243 - // Only wait on the first step - the Job runs all steps together 244 - if idx != 0 { 245 - return nil 246 - } 247 - 248 - // Query for the Job created by SpindleSetReconciler 249 - spindleSet, exists := e.spindleSets[wid.String()] 250 - if !exists { 251 - return fmt.Errorf("no SpindleSet found for workflow") 252 - } 253 - 254 - // Wait for Job to be created by controller 260 + // Query for the Job created by SpindleSetReconciler (only on first step) 255 261 var job *batchv1.Job 256 - deadline := time.Now().Add(5 * time.Minute) 257 - for { 258 - if time.Now().After(deadline) { 259 - return fmt.Errorf("timeout waiting for Job to be created by controller") 262 + if idx == 0 { 263 + spindleSet, exists := e.spindleSets[wid.String()] 264 + if !exists { 265 + return fmt.Errorf("no SpindleSet found for workflow") 260 266 } 261 267 262 - jobList := &batchv1.JobList{} 263 - err := e.client.List(ctx, jobList, 264 - client.InNamespace(e.namespace), 265 - client.MatchingLabels{ 266 - "loom.j5t.io/spindleset": spindleSet.Name, 267 - "loom.j5t.io/workflow": w.Name, 268 - }) 269 - if err != nil { 270 - return fmt.Errorf("failed to list jobs: %w", err) 271 - } 268 + // Wait for Job to be created by controller 269 + deadline := time.Now().Add(5 * time.Minute) 270 + for { 271 + if time.Now().After(deadline) { 272 + return fmt.Errorf("timeout waiting for Job to be created by controller") 273 + } 274 + 275 + jobList := &batchv1.JobList{} 276 + err := e.client.List(ctx, jobList, 277 + client.InNamespace(e.namespace), 278 + client.MatchingLabels{ 279 + "loom.j5t.io/spindleset": spindleSet.Name, 280 + "loom.j5t.io/workflow": w.Name, 281 + }) 282 + if err != nil { 283 + 
return fmt.Errorf("failed to list jobs: %w", err) 284 + } 272 285 273 - if len(jobList.Items) > 0 { 274 - job = &jobList.Items[0] 275 - break 286 + if len(jobList.Items) > 0 { 287 + job = &jobList.Items[0] 288 + break 289 + } 290 + 291 + time.Sleep(2 * time.Second) 276 292 } 277 293 278 - time.Sleep(2 * time.Second) 294 + logger.Info("Found Job for workflow", "jobName", job.Name) 279 295 } 280 296 281 - logger.Info("Found Job for workflow, waiting for completion", "jobName", job.Name) 297 + // Get or create log stream (creates on first step, reuses on subsequent steps) 298 + stream, err := e.getOrCreateLogStream(ctx, wid, job) 299 + if err != nil { 300 + return fmt.Errorf("failed to get log stream: %w", err) 301 + } 282 302 283 - // Stream logs from the Job's pod 303 + // Read from stream until this step's end event 284 304 if wfLogger != nil { 285 - if err := e.streamJobLogs(ctx, job, w, wfLogger); err != nil { 286 - logger.Error(err, "Failed to stream logs") 287 - // Don't return error - continue to check Job status 305 + logger.Info("Reading logs for step", "stepID", idx) 306 + if err := e.readUntilStepEnd(stream, idx, w, wfLogger); err != nil { 307 + logger.Error(err, "Failed to read step logs") 308 + // Clean up stream on error 309 + e.closeLogStream(wid) 310 + return fmt.Errorf("failed to read logs for step %d: %w", idx, err) 288 311 } 312 + logger.Info("Step completed", "stepID", idx) 289 313 } 290 314 291 - // Wait for the Job to complete 292 - return e.waitForJobCompletion(ctx, job) 315 + // Clean up stream after last step 316 + if idx == len(w.Steps)-1 { 317 + logger.Info("Last step completed, closing log stream") 318 + e.closeLogStream(wid) 319 + } 320 + 321 + return nil 293 322 } 294 323 295 - // waitForJobCompletion polls the Job status until it completes or times out 296 - func (e *KubernetesEngine) waitForJobCompletion(ctx context.Context, job *batchv1.Job) error { 297 - logger := log.FromContext(ctx).WithValues("jobName", job.Name) 324 + // LogEvent 
represents a structured log event from the runner binary 325 + type LogEvent struct { 326 + Kind string `json:"kind"` // "control" or "data" 327 + Event string `json:"event,omitempty"` // "start", "end" (for control events) 328 + StepID int `json:"step_id"` // 0-based step index 329 + StepName string `json:"step_name,omitempty"` // Step name 330 + WorkflowName string `json:"workflow_name"` // Workflow name for log separation 331 + Stream string `json:"stream,omitempty"` // "stdout" or "stderr" (for data events) 332 + Content string `json:"content,omitempty"` // Log line content (for data events) 333 + ExitCode *int `json:"exit_code,omitempty"` // Exit code (for control/end events) 334 + Timestamp string `json:"timestamp"` // ISO 8601 timestamp 335 + } 298 336 299 - timeout := time.After(e.WorkflowTimeout()) 300 - ticker := time.NewTicker(5 * time.Second) 301 - defer ticker.Stop() 337 + // getOrCreateLogStream gets an existing log stream or creates a new one for step 0 338 + func (e *KubernetesEngine) getOrCreateLogStream(ctx context.Context, wid models.WorkflowId, job *batchv1.Job) (*workflowLogStream, error) { 339 + widKey := wid.String() 302 340 303 - for { 304 - select { 305 - case <-ctx.Done(): 306 - return ctx.Err() 307 - case <-timeout: 308 - return fmt.Errorf("job timed out after %v", e.WorkflowTimeout()) 309 - case <-ticker.C: 310 - // Check Job status 311 - currentJob := &batchv1.Job{} 312 - err := e.client.Get(ctx, client.ObjectKey{ 313 - Namespace: job.Namespace, 314 - Name: job.Name, 315 - }, currentJob) 316 - if err != nil { 317 - return fmt.Errorf("failed to get job status: %w", err) 318 - } 319 - 320 - // Check if Job completed 321 - if currentJob.Status.Succeeded > 0 { 322 - logger.Info("Kubernetes Job completed successfully") 323 - return nil 324 - } 341 + // Check if stream already exists 342 + e.streamMutex.RLock() 343 + stream, exists := e.logStreams[widKey] 344 + e.streamMutex.RUnlock() 325 345 326 - // Check if Job failed 327 - if 
currentJob.Status.Failed > 0 { 328 - logger.Error(nil, "Kubernetes Job failed") 329 - return fmt.Errorf("job failed") 330 - } 331 - 332 - // Still running, continue waiting 333 - } 346 + if exists { 347 + return stream, nil 334 348 } 335 - } 336 349 337 - // streamJobLogs streams logs from the Job's pod to the WorkflowLogger 338 - func (e *KubernetesEngine) streamJobLogs(ctx context.Context, job *batchv1.Job, workflow *models.Workflow, wfLogger *models.WorkflowLogger) error { 339 - logger := log.FromContext(ctx).WithValues("jobName", job.Name) 350 + // Create new stream 351 + logger := log.FromContext(ctx).WithValues("workflow", wid.Name, "pipeline", wid.PipelineId.Rkey) 340 352 341 353 // Create kubernetes clientset for log streaming 342 354 clientset, err := kubernetes.NewForConfig(e.config) 343 355 if err != nil { 344 - return fmt.Errorf("failed to create kubernetes clientset: %w", err) 356 + return nil, fmt.Errorf("failed to create kubernetes clientset: %w", err) 345 357 } 346 358 347 359 // Wait for pod to be created ··· 349 361 deadline := time.Now().Add(2 * time.Minute) 350 362 for { 351 363 if time.Now().After(deadline) { 352 - return fmt.Errorf("timeout waiting for pod to be created") 364 + return nil, fmt.Errorf("timeout waiting for pod to be created") 353 365 } 354 366 355 367 pods := &corev1.PodList{} 356 368 err := e.client.List(ctx, pods, client.InNamespace(job.Namespace), client.MatchingLabels(job.Spec.Template.Labels)) 357 369 if err != nil { 358 - return fmt.Errorf("failed to list pods: %w", err) 370 + return nil, fmt.Errorf("failed to list pods: %w", err) 359 371 } 360 372 361 373 if len(pods.Items) > 0 { ··· 372 384 deadline = time.Now().Add(5 * time.Minute) 373 385 for { 374 386 if time.Now().After(deadline) { 375 - return fmt.Errorf("timeout waiting for pod to start") 387 + return nil, fmt.Errorf("timeout waiting for pod to start") 376 388 } 377 389 378 390 currentPod := &corev1.Pod{} ··· 381 393 Name: pod.Name, 382 394 }, currentPod) 383 395 
if err != nil { 384 - return fmt.Errorf("failed to get pod: %w", err) 396 + return nil, fmt.Errorf("failed to get pod: %w", err) 385 397 } 386 398 387 399 if currentPod.Status.Phase == corev1.PodRunning || currentPod.Status.Phase == corev1.PodSucceeded || currentPod.Status.Phase == corev1.PodFailed { ··· 400 412 Follow: true, 401 413 }) 402 414 403 - stream, err := req.Stream(ctx) 415 + logStream, err := req.Stream(ctx) 404 416 if err != nil { 405 - return fmt.Errorf("failed to open log stream: %w", err) 417 + return nil, fmt.Errorf("failed to open log stream: %w", err) 406 418 } 407 - defer stream.Close() 408 419 409 - // Parse logs and send to WorkflowLogger 410 - return e.parseLogs(stream, workflow, wfLogger) 420 + // Create scanner 421 + scanner := bufio.NewScanner(logStream) 422 + buf := make([]byte, 0, 64*1024) 423 + scanner.Buffer(buf, 1024*1024) 424 + 425 + // Create and store stream 426 + stream = &workflowLogStream{ 427 + scanner: scanner, 428 + stream: logStream, 429 + pod: pod, 430 + } 431 + 432 + e.streamMutex.Lock() 433 + e.logStreams[widKey] = stream 434 + e.streamMutex.Unlock() 435 + 436 + return stream, nil 411 437 } 412 438 413 - // LogEvent represents a structured log event from the runner binary 414 - type LogEvent struct { 415 - Kind string `json:"kind"` // "control" or "data" 416 - Event string `json:"event,omitempty"` // "start", "end" (for control events) 417 - StepID int `json:"step_id"` // 0-based step index 418 - StepName string `json:"step_name,omitempty"` // Step name 419 - WorkflowName string `json:"workflow_name"` // Workflow name for log separation 420 - Stream string `json:"stream,omitempty"` // "stdout" or "stderr" (for data events) 421 - Content string `json:"content,omitempty"` // Log line content (for data events) 422 - ExitCode *int `json:"exit_code,omitempty"` // Exit code (for control/end events) 423 - Timestamp string `json:"timestamp"` // ISO 8601 timestamp 439 + // closeLogStream closes and removes a log stream 440 + func 
(e *KubernetesEngine) closeLogStream(wid models.WorkflowId) { 441 + widKey := wid.String() 442 + 443 + e.streamMutex.Lock() 444 + defer e.streamMutex.Unlock() 445 + 446 + if stream, exists := e.logStreams[widKey]; exists { 447 + stream.stream.Close() 448 + delete(e.logStreams, widKey) 449 + } 424 450 } 425 451 426 - // parseLogs reads log lines as JSON events and sends them to WorkflowLogger 427 - func (e *KubernetesEngine) parseLogs(stream io.Reader, workflow *models.Workflow, wfLogger *models.WorkflowLogger) error { 428 - scanner := bufio.NewScanner(stream) 429 - // Increase buffer size for long lines 430 - buf := make([]byte, 0, 64*1024) 431 - scanner.Buffer(buf, 1024*1024) 452 + // readUntilStepEnd reads from the log stream until the end event for the specified step 453 + func (e *KubernetesEngine) readUntilStepEnd(stream *workflowLogStream, stepID int, workflow *models.Workflow, wfLogger *models.WorkflowLogger) error { 454 + scanner := stream.scanner 432 455 433 456 for scanner.Scan() { 434 457 line := scanner.Text() ··· 436 459 // Try to parse as JSON event 437 460 var event LogEvent 438 461 if err := json.Unmarshal([]byte(line), &event); err != nil { 439 - // Not JSON or parse error - log as debug and skip 462 + // Not JSON or parse error - skip 440 463 continue 441 464 } 442 465 ··· 445 468 continue 446 469 } 447 470 471 + // Only process events for the current step 472 + if event.StepID != stepID { 473 + // Got event for a different step - this shouldn't happen in sequential execution 474 + // but log it and continue 475 + continue 476 + } 477 + 478 + step := workflow.Steps[event.StepID] 479 + 448 480 switch event.Kind { 449 481 case "control": 450 - // Control events (start/end) are already written by the spindle engine in engine.go 451 - // We don't need to write them again here to avoid duplicates 452 - continue 482 + // Write control events from runner JSON 483 + var status models.StepStatus 484 + if event.Event == "start" { 485 + status = 
models.StepStatusStart 486 + } else if event.Event == "end" { 487 + status = models.StepStatusEnd 488 + // Write the end event 489 + controlWriter := wfLogger.ControlWriter(event.StepID, step, status) 490 + controlWriter.Write([]byte{}) 491 + // Step is done, return 492 + return nil 493 + } else { 494 + continue // Unknown control event 495 + } 496 + 497 + // Write start event 498 + controlWriter := wfLogger.ControlWriter(event.StepID, step, status) 499 + controlWriter.Write([]byte{}) 453 500 454 501 case "data": 455 502 // Log output from step ··· 462 509 } 463 510 464 511 if err := scanner.Err(); err != nil { 465 - // EOF or context canceled is expected 512 + // EOF or context canceled is expected when pod terminates 466 513 if err != io.EOF && !strings.Contains(err.Error(), "context canceled") { 467 514 return fmt.Errorf("error reading logs: %w", err) 468 515 } 469 516 } 470 517 471 - return nil 518 + // If we get here, scanner ended without seeing step end event 519 + // This could mean pod terminated early 520 + return fmt.Errorf("log stream ended before step %d completed", stepID) 472 521 } 473 522 474 523 // Ensure KubernetesEngine implements the Engine interface