Kubernetes Operator for Tangled Spindles

clean up orphaned spindlesets

evan.jarrett.net d45d7b83 c307718d

verified
+81 -17
+81 -17
internal/controller/spindleset_controller.go
··· 109 109 } 110 110 } 111 111 112 - // Ensure Jobs are created for workflows 112 + // Update status FIRST so Phase is current before job creation decisions 113 + if err := r.updateStatus(ctx, spindleSet); err != nil { 114 + logger.Error(err, "Failed to update status") 115 + return ctrl.Result{}, err 116 + } 117 + 118 + // Re-fetch after status update to get the latest resource version 119 + if err := r.Get(ctx, req.NamespacedName, spindleSet); err != nil { 120 + if apierrors.IsNotFound(err) { 121 + return ctrl.Result{}, nil 122 + } 123 + return ctrl.Result{}, err 124 + } 125 + 126 + // Clean up orphaned Jobs in this namespace (always runs) 127 + if err := r.cleanupOrphanedJobs(ctx, spindleSet.Namespace); err != nil { 128 + logger.Error(err, "Failed to cleanup orphaned Jobs") 129 + } 130 + 131 + // If SpindleSet is terminal, don't create new jobs — check for TTL cleanup 132 + if spindleSet.Status.Phase == "Succeeded" || spindleSet.Status.Phase == "Failed" { 133 + age := time.Since(spindleSet.CreationTimestamp.Time) 134 + const spindleSetMaxLifetime = 4 * time.Hour 135 + if age > spindleSetMaxLifetime { 136 + logger.Info("Cleaning up terminal SpindleSet past max lifetime", 137 + "phase", spindleSet.Status.Phase, "age", age) 138 + if err := r.Delete(ctx, spindleSet); client.IgnoreNotFound(err) != nil { 139 + return ctrl.Result{}, err 140 + } 141 + return ctrl.Result{}, nil 142 + } 143 + // Requeue for cleanup later 144 + return ctrl.Result{RequeueAfter: spindleSetMaxLifetime - age}, nil 145 + } 146 + 147 + // Clean up ancient SpindleSets that never reached a terminal phase (pre-fix orphans) 148 + if spindleSet.Status.Phase == "" || spindleSet.Status.Phase == "Pending" { 149 + age := time.Since(spindleSet.CreationTimestamp.Time) 150 + const orphanMaxLifetime = 6 * time.Hour 151 + if age > orphanMaxLifetime { 152 + logger.Info("Cleaning up ancient SpindleSet with no terminal phase", 153 + "phase", spindleSet.Status.Phase, "age", age) 154 + if err := r.Delete(ctx, spindleSet); client.IgnoreNotFound(err) != nil { 155 + return ctrl.Result{}, err 156 + } 157 + return ctrl.Result{}, nil 158 + } 159 + } 160 + 161 + // Ensure Jobs are created for workflows (only for non-terminal SpindleSets) 113 162 var jobsErr error 114 163 if err := r.ensurePipelineJobs(ctx, spindleSet); err != nil { 115 164 logger.Error(err, "Failed to ensure pipeline Jobs") 116 165 jobsErr = err 117 - // Continue to update status even on error for better observability 118 166 } 119 167 120 168 // Monitor Job statuses 121 169 if err := r.monitorJobStatuses(ctx, spindleSet); err != nil { 122 170 logger.Error(err, "Failed to monitor job statuses") 123 - // Don't return error - we'll retry on next reconcile 124 - } 125 - 126 - // Clean up orphaned Jobs in this namespace 127 - if err := r.cleanupOrphanedJobs(ctx, spindleSet.Namespace); err != nil { 128 - logger.Error(err, "Failed to cleanup orphaned Jobs") 129 - // Don't return error - we'll retry on next reconcile 130 171 } 131 172 132 - // Update status based on current Jobs 133 - if err := r.updateStatus(ctx, spindleSet); err != nil { 134 - logger.Error(err, "Failed to update status") 135 - return ctrl.Result{}, err 136 - } 137 - 138 - // Return error from job creation if any (after status update) 139 173 if jobsErr != nil { 140 174 return ctrl.Result{}, jobsErr 141 175 } 142 176 143 177 // Requeue after 30 seconds to update status 144 - return ctrl.Result{RequeueAfter: 30 * ctrl.Result{}.RequeueAfter}, nil 178 + return ctrl.Result{RequeueAfter: 30 * time.Second}, nil 145 179 } 146 180 147 181 // handleDeletion handles cleanup when SpindleSet is being deleted ··· 280 314 statusChanged = true 281 315 } 282 316 317 + // Compute Phase with latch semantics: once terminal, never revert. 318 + // This prevents job recreation after TTL deletes completed Jobs. 319 + if spindleSet.Status.Phase != "Succeeded" && spindleSet.Status.Phase != "Failed" { 320 + expectedWorkflows := int32(len(spindleSet.Spec.PipelineRun.Workflows)) 321 + totalTerminal := completed + failed 322 + 323 + var newPhase string 324 + switch { 325 + case expectedWorkflows > 0 && totalTerminal >= expectedWorkflows && failed > 0: 326 + newPhase = "Failed" 327 + case expectedWorkflows > 0 && totalTerminal >= expectedWorkflows: 328 + newPhase = "Succeeded" 329 + case running > 0: 330 + newPhase = "Running" 331 + default: 332 + newPhase = "Pending" 333 + } 334 + 335 + if spindleSet.Status.Phase != newPhase { 336 + logger.Info("SpindleSet phase transition", "from", spindleSet.Status.Phase, "to", newPhase) 337 + spindleSet.Status.Phase = newPhase 338 + statusChanged = true 339 + } 340 + } 341 + 283 342 // Update conditions 284 343 readyCondition := metav1.Condition{ 285 344 Type: "Ready", ··· 345 404 346 405 // ensurePipelineJobs ensures Jobs are created for all workflows in a pipeline run 347 406 func (r *SpindleSetReconciler) ensurePipelineJobs(ctx context.Context, spindleSet *loomv1alpha1.SpindleSet) error { 407 + // Don't recreate jobs for completed pipelines 408 + if spindleSet.Status.Phase == "Succeeded" || spindleSet.Status.Phase == "Failed" { 409 + return nil 410 + } 411 + 348 412 logger := log.FromContext(ctx) 349 413 350 414 pipelineRun := spindleSet.Spec.PipelineRun