Kubernetes Operator for Tangled Spindles

Fix issues encountered when actually running the workflows

evan.jarrett.net ea55cbd7 0ec6f8ac

verified
+53 -39
+33 -30
cmd/runner/main.go
··· 15 15 16 16 // LogEvent represents a structured log event emitted by the runner. 17 17 type LogEvent struct { 18 - Kind string `json:"kind"` // "control" or "data" 19 - Event string `json:"event,omitempty"` // "start", "end" (for control events) 20 - StepID int `json:"step_id"` // 0-based step index 21 - StepName string `json:"step_name,omitempty"` // Step name 22 - Stream string `json:"stream,omitempty"` // "stdout" or "stderr" (for data events) 23 - Content string `json:"content,omitempty"` // Log line content (for data events) 24 - ExitCode *int `json:"exit_code,omitempty"` // Exit code (for control/end events) 25 - Timestamp string `json:"timestamp"` // ISO 8601 timestamp 18 + Kind string `json:"kind"` // "control" or "data" 19 + Event string `json:"event,omitempty"` // "start", "end" (for control events) 20 + StepID int `json:"step_id"` // 0-based step index 21 + StepName string `json:"step_name,omitempty"` // Step name 22 + WorkflowName string `json:"workflow_name"` // Workflow name for log separation 23 + Stream string `json:"stream,omitempty"` // "stdout" or "stderr" (for data events) 24 + Content string `json:"content,omitempty"` // Log line content (for data events) 25 + ExitCode *int `json:"exit_code,omitempty"` // Exit code (for control/end events) 26 + Timestamp string `json:"timestamp"` // ISO 8601 timestamp 26 27 } 27 28 28 29 func main() { ··· 56 57 // Execute each step 57 58 ctx := context.Background() 58 59 for i, step := range workflow.Steps { 59 - if err := executeStep(ctx, i, step); err != nil { 60 + if err := executeStep(ctx, i, step, workflow.Name); err != nil { 60 61 return fmt.Errorf("step %d (%s) failed: %w", i, step.Name, err) 61 62 } 62 63 } ··· 64 65 return nil 65 66 } 66 67 67 - func executeStep(ctx context.Context, stepID int, step loomv1alpha1.WorkflowStep) error { 68 + func executeStep(ctx context.Context, stepID int, step loomv1alpha1.WorkflowStep, workflowName string) error { 68 69 // Emit step start event 69 - 
emitControlEvent(stepID, step.Name, "start", nil) 70 + emitControlEvent(stepID, step.Name, workflowName, "start", nil) 70 71 71 72 // Set step-specific environment variables 72 73 if step.Environment != nil { ··· 95 96 // Start the command 96 97 if err := cmd.Start(); err != nil { 97 98 exitCode := 1 98 - emitControlEvent(stepID, step.Name, "end", &exitCode) 99 + emitControlEvent(stepID, step.Name, workflowName, "end", &exitCode) 99 100 return fmt.Errorf("failed to start command: %w", err) 100 101 } 101 102 102 103 // Stream stdout and stderr concurrently 103 104 done := make(chan error, 2) 104 - go streamOutput(stdout, stepID, "stdout", done) 105 - go streamOutput(stderr, stepID, "stderr", done) 105 + go streamOutput(stdout, stepID, workflowName, "stdout", done) 106 + go streamOutput(stderr, stepID, workflowName, "stderr", done) 106 107 107 108 // Wait for both streams to complete 108 109 for i := 0; i < 2; i++ { ··· 124 125 } 125 126 126 127 // Emit step end event 127 - emitControlEvent(stepID, step.Name, "end", &exitCode) 128 + emitControlEvent(stepID, step.Name, workflowName, "end", &exitCode) 128 129 129 130 if exitCode != 0 { 130 131 return fmt.Errorf("command exited with code %d", exitCode) ··· 133 134 return nil 134 135 } 135 136 136 - func streamOutput(reader io.Reader, stepID int, stream string, done chan<- error) { 137 + func streamOutput(reader io.Reader, stepID int, workflowName, stream string, done chan<- error) { 137 138 scanner := bufio.NewScanner(reader) 138 139 // Increase buffer size for long lines 139 140 buf := make([]byte, 0, 64*1024) ··· 141 142 142 143 for scanner.Scan() { 143 144 line := scanner.Text() 144 - emitDataEvent(stepID, stream, line) 145 + emitDataEvent(stepID, workflowName, stream, line) 145 146 } 146 147 147 148 done <- scanner.Err() 148 149 } 149 150 150 - func emitControlEvent(stepID int, stepName, event string, exitCode *int) { 151 + func emitControlEvent(stepID int, stepName, workflowName, event string, exitCode *int) { 151 
152 ev := LogEvent{ 152 - Kind: "control", 153 - Event: event, 154 - StepID: stepID, 155 - StepName: stepName, 156 - ExitCode: exitCode, 157 - Timestamp: time.Now().UTC().Format(time.RFC3339Nano), 153 + Kind: "control", 154 + Event: event, 155 + StepID: stepID, 156 + StepName: stepName, 157 + WorkflowName: workflowName, 158 + ExitCode: exitCode, 159 + Timestamp: time.Now().UTC().Format(time.RFC3339Nano), 158 160 } 159 161 emitJSON(ev) 160 162 } 161 163 162 - func emitDataEvent(stepID int, stream, content string) { 164 + func emitDataEvent(stepID int, workflowName, stream, content string) { 163 165 ev := LogEvent{ 164 - Kind: "data", 165 - StepID: stepID, 166 - Stream: stream, 167 - Content: content, 168 - Timestamp: time.Now().UTC().Format(time.RFC3339Nano), 166 + Kind: "data", 167 + StepID: stepID, 168 + WorkflowName: workflowName, 169 + Stream: stream, 170 + Content: content, 171 + Timestamp: time.Now().UTC().Format(time.RFC3339Nano), 169 172 } 170 173 emitJSON(ev) 171 174 }
+1 -1
config/manager/kustomization.yaml
··· 8 8 images: 9 9 - name: controller 10 10 newName: atcr.io/evan.jarrett.net/loom 11 - newTag: v0.0.8 11 + newTag: v0.0.9
+10
internal/controller/spindleset_controller.go
··· 252 252 // setCondition adds or updates a condition in the list 253 253 func setCondition(conditions *[]metav1.Condition, newCondition metav1.Condition) { 254 254 if conditions == nil { 255 + return // Pointer itself is nil, nothing we can do 256 + } 257 + 258 + if *conditions == nil { 255 259 *conditions = []metav1.Condition{} 256 260 } 257 261 ··· 313 317 Image: workflowSpec.Image, 314 318 Architecture: workflowSpec.Architecture, 315 319 Steps: jobSteps, 320 + WorkflowSpec: workflowSpec, // Pass full workflow spec to runner 316 321 RepoURL: pipelineRun.RepoURL, 317 322 CommitSHA: pipelineRun.CommitSHA, 318 323 Secrets: nil, // TODO: Handle secrets ··· 334 339 335 340 logger.Info("Creating Job for workflow", "workflow", workflowSpec.Name, "job", job.Name) 336 341 if err := r.Create(ctx, job); err != nil { 342 + if apierrors.IsAlreadyExists(err) { 343 + // Job already exists (possibly from previous deployment), skip 344 + logger.Info("Job already exists, skipping creation", "workflow", workflowSpec.Name, "job", job.Name) 345 + continue 346 + } 337 347 return fmt.Errorf("failed to create job for workflow %s: %w", workflowSpec.Name, err) 338 348 } 339 349
+9 -8
internal/engine/kubernetes_engine.go
··· 412 412 413 413 // LogEvent represents a structured log event from the runner binary 414 414 type LogEvent struct { 415 - Kind string `json:"kind"` // "control" or "data" 416 - Event string `json:"event,omitempty"` // "start", "end" (for control events) 417 - StepID int `json:"step_id"` // 0-based step index 418 - StepName string `json:"step_name,omitempty"` // Step name 419 - Stream string `json:"stream,omitempty"` // "stdout" or "stderr" (for data events) 420 - Content string `json:"content,omitempty"` // Log line content (for data events) 421 - ExitCode *int `json:"exit_code,omitempty"` // Exit code (for control/end events) 422 - Timestamp string `json:"timestamp"` // ISO 8601 timestamp 415 + Kind string `json:"kind"` // "control" or "data" 416 + Event string `json:"event,omitempty"` // "start", "end" (for control events) 417 + StepID int `json:"step_id"` // 0-based step index 418 + StepName string `json:"step_name,omitempty"` // Step name 419 + WorkflowName string `json:"workflow_name"` // Workflow name for log separation 420 + Stream string `json:"stream,omitempty"` // "stdout" or "stderr" (for data events) 421 + Content string `json:"content,omitempty"` // Log line content (for data events) 422 + ExitCode *int `json:"exit_code,omitempty"` // Exit code (for control/end events) 423 + Timestamp string `json:"timestamp"` // ISO 8601 timestamp 423 424 } 424 425 425 426 // parseLogs reads log lines as JSON events and sends them to WorkflowLogger