Kubernetes Operator for Tangled Spindles

Fix log streaming and parallel-cleanup issues: use a fresh timeout context when deleting SpindleSets so cleanup succeeds after the original context is canceled (preventing orphaned SpindleSets in parallel workflow runs), and only use Follow mode for running pods — completed pods need their existing logs read, with the pod phase recorded at stream creation to distinguish success, failure, and premature stream termination.

evan.jarrett.net e42b257a 53c53224

verified
+37 -11
internal/engine/kubernetes_engine.go
··· 30 30 31 31 // workflowLogStream holds the state for streaming logs from a workflow's pod 32 32 type workflowLogStream struct { 33 - scanner *bufio.Scanner 34 - stream io.ReadCloser 35 - pod *corev1.Pod 33 + scanner *bufio.Scanner 34 + stream io.ReadCloser 35 + pod *corev1.Pod 36 + podPhase corev1.PodPhase // Track pod phase at stream creation time 36 37 } 37 38 38 39 // KubernetesEngine implements the spindle Engine interface for Kubernetes Jobs. ··· 278 279 PropagationPolicy: &deletePolicy, 279 280 } 280 281 281 - if err := e.client.Delete(ctx, spindleSet, deleteOptions); err != nil { 282 + // Use a fresh context for cleanup to ensure deletion succeeds even if the 283 + // original context was canceled (e.g., by errgroup when another workflow completes). 284 + // This prevents orphaned SpindleSets when running multiple workflows in parallel. 285 + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 286 + defer cancel() 287 + 288 + if err := e.client.Delete(cleanupCtx, spindleSet, deleteOptions); err != nil { 282 289 // Ignore not found errors (SpindleSet may have already been deleted) 283 290 if client.IgnoreNotFound(err) != nil { 284 291 return fmt.Errorf("failed to delete SpindleSet: %w", err) ··· 436 443 time.Sleep(1 * time.Second) 437 444 } 438 445 439 - logger.Info("Pod is ready, streaming logs", "podName", pod.Name, "phase", pod.Status.Phase) 446 + // Only use Follow mode for running pods. For completed pods, we need to read 447 + // existing logs (Follow:true only streams NEW logs after connection). 
448 + shouldFollow := pod.Status.Phase == corev1.PodRunning 449 + if !shouldFollow { 450 + logger.Info("Pod already completed, reading existing logs", "podName", pod.Name, "phase", pod.Status.Phase) 451 + } else { 452 + logger.Info("Pod is running, streaming logs", "podName", pod.Name, "phase", pod.Status.Phase) 453 + } 440 454 441 455 // Stream logs from the main container (not init containers) 442 456 req := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{ 443 457 Container: "runner", 444 - Follow: true, 458 + Follow: shouldFollow, 445 459 }) 446 460 447 461 logStream, err := req.Stream(ctx) ··· 456 470 457 471 // Create and store stream 458 472 stream = &workflowLogStream{ 459 - scanner: scanner, 460 - stream: logStream, 461 - pod: pod, 473 + scanner: scanner, 474 + stream: logStream, 475 + pod: pod, 476 + podPhase: pod.Status.Phase, 462 477 } 463 478 464 479 e.streamMutex.Lock() ··· 534 549 } 535 550 } 536 551 537 - // If we get here, scanner ended without seeing step end event 538 - // This could mean pod terminated early 552 + // Scanner ended without seeing step end event. 553 + // Check pod phase to determine if this is an error or expected behavior. 554 + if stream.podPhase == corev1.PodSucceeded { 555 + // Pod succeeded but we didn't find the control event - treat as success. 556 + // This can happen if logs were truncated or runner didn't emit events. 557 + return nil 558 + } 559 + 560 + if stream.podPhase == corev1.PodFailed { 561 + return fmt.Errorf("pod failed before step %d completed", stepID) 562 + } 563 + 564 + // Pod was running when we started but stream ended unexpectedly 539 565 return fmt.Errorf("log stream ended before step %d completed", stepID) 540 566 } 541 567