avikav.net / core
forked from tangled.org/core
this repo has no description
avikav.net · 8 months ago
e009ee94 · 0a4a028a

pipelines: 0/3
build.yml: failed (8mo ago)
fmt.yml: failed (8mo ago)
test.yml: failed (8mo ago)
7 changed files, +546 -415
.idea/modules.xml
.idea/tangled.iml
.idea/vcs.xml
.idea/workspace.xml
a
spindle/engine/docker/docker.go
spindle/engine/engine.go
.idea/modules.xml (+8)

+ <?xml version="1.0" encoding="UTF-8"?>
+ <project version="4">
+   <component name="ProjectModuleManager">
+     <modules>
+       <module fileurl="file://$PROJECT_DIR$/.idea/tangled.iml" filepath="$PROJECT_DIR$/.idea/tangled.iml" />
+     </modules>
+   </component>
+ </project>

.idea/tangled.iml (+9)

+ <?xml version="1.0" encoding="UTF-8"?>
+ <module type="WEB_MODULE" version="4">
+   <component name="Go" enabled="true" />
+   <component name="NewModuleRootManager">
+     <content url="file://$MODULE_DIR$" />
+     <orderEntry type="inheritedJdk" />
+     <orderEntry type="sourceFolder" forTests="false" />
+   </component>
+ </module>

.idea/vcs.xml (+6)

+ <?xml version="1.0" encoding="UTF-8"?>
+ <project version="4">
+   <component name="VcsDirectoryMappings">
+     <mapping directory="" vcs="Git" />
+   </component>
+ </project>

.idea/workspace.xml (+88)

+ <?xml version="1.0" encoding="UTF-8"?>
+ <project version="4">
+   <component name="AutoImportSettings">
+     <option name="autoReloadType" value="ALL" />
+   </component>
+   <component name="ChangeListManager">
+     <list default="true" id="b6a35564-2dda-4b9c-bba0-2bff8baa1df9" name="Changes" comment="">
+       <change afterPath="$PROJECT_DIR$/.idea/modules.xml" afterDir="false" />
+       <change afterPath="$PROJECT_DIR$/.idea/tangled.iml" afterDir="false" />
+       <change afterPath="$PROJECT_DIR$/.idea/vcs.xml" afterDir="false" />
+       <change afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
+       <change afterPath="$PROJECT_DIR$/spindle/engine/engine.go" afterDir="false" />
+       <change beforePath="$PROJECT_DIR$/spindle/engine/engine.go" beforeDir="false" afterPath="$PROJECT_DIR$/spindle/engine/docker/docker.go" afterDir="false" />
+     </list>
+     <option name="SHOW_DIALOG" value="false" />
+     <option name="HIGHLIGHT_CONFLICTS" value="true" />
+     <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
+     <option name="LAST_RESOLUTION" value="IGNORE" />
+   </component>
+   <component name="FileTemplateManagerImpl">
+     <option name="RECENT_TEMPLATES">
+       <list>
+         <option value="Go File" />
+       </list>
+     </option>
+   </component>
+   <component name="GOROOT" url="file:///opt/homebrew/opt/go/libexec" />
+   <component name="Git.Settings">
+     <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
+   </component>
+   <component name="ProjectColorInfo"><![CDATA[{
+   "associatedIndex": 6
+ }]]></component>
+   <component name="ProjectId" id="2zpS3z7RgLfB5UMJTppV7uRKN5t" />
+   <component name="ProjectViewState">
+     <option name="hideEmptyMiddlePackages" value="true" />
+     <option name="showLibraryContents" value="true" />
+   </component>
+   <component name="PropertiesComponent"><![CDATA[{
+   "keyToString": {
+     "DefaultGoTemplateProperty": "Go File",
+     "ModuleVcsDetector.initialDetectionPerformed": "true",
+     "RunOnceActivity.GoLinterPluginOnboarding": "true",
+     "RunOnceActivity.GoLinterPluginStorageMigration": "true",
+     "RunOnceActivity.ShowReadmeOnStart": "true",
+     "RunOnceActivity.git.unshallow": "true",
+     "RunOnceActivity.go.formatter.settings.were.checked": "true",
+     "RunOnceActivity.go.migrated.go.modules.settings": "true",
+     "RunOnceActivity.go.modules.go.list.on.any.changes.was.set": "true",
+     "git-widget-placeholder": "0a4a028a",
+     "go.import.settings.migrated": "true",
+     "go.sdk.automatically.set": "true",
+     "last_opened_file_path": "/Users/avi/code/tangled",
+     "node.js.detected.package.eslint": "true",
+     "node.js.selected.package.eslint": "(autodetect)",
+     "nodejs_package_manager_path": "npm"
+   }
+ }]]></component>
+   <component name="RecentsManager">
+     <key name="MoveFile.RECENT_KEYS">
+       <recent name="$PROJECT_DIR$/spindle/engine/docker" />
+     </key>
+   </component>
+   <component name="SharedIndexes">
+     <attachedChunks>
+       <set>
+         <option value="bundled-gosdk-3b128438d3f6-07d2d2d66b1e-org.jetbrains.plugins.go.sharedIndexes.bundled-GO-251.26927.50" />
+         <option value="bundled-js-predefined-d6986cc7102b-09060db00ec0-JavaScript-GO-251.26927.50" />
+       </set>
+     </attachedChunks>
+   </component>
+   <component name="TaskManager">
+     <task active="true" id="Default" summary="Default task">
+       <changelist id="b6a35564-2dda-4b9c-bba0-2bff8baa1df9" name="Changes" comment="" />
+       <created>1752426325757</created>
+       <option name="number" value="Default" />
+       <option name="presentableId" value="Default" />
+       <updated>1752426325757</updated>
+     </task>
+     <servers />
+   </component>
+   <component name="TypeScriptGeneratedFilesManager">
+     <option name="version" value="3" />
+   </component>
+   <component name="VgoProject">
+     <settings-migrated>true</settings-migrated>
+   </component>
+ </project>

a
This is a binary file and will not be displayed.
spindle/engine/docker/docker.go (+433)

+ package docker
+
+ import (
+     "context"
+     "errors"
+     "fmt"
+     "io"
+     "log/slog"
+     "os"
+     "strings"
+     "sync"
+     "tangled.sh/tangled.sh/core/spindle/engine"
+     "time"
+
+     "github.com/docker/docker/api/types/container"
+     "github.com/docker/docker/api/types/image"
+     "github.com/docker/docker/api/types/mount"
+     "github.com/docker/docker/api/types/network"
+     "github.com/docker/docker/api/types/volume"
+     "github.com/docker/docker/client"
+     "github.com/docker/docker/pkg/stdcopy"
+     "tangled.sh/tangled.sh/core/log"
+     "tangled.sh/tangled.sh/core/notifier"
+     "tangled.sh/tangled.sh/core/spindle/config"
+     "tangled.sh/tangled.sh/core/spindle/db"
+     "tangled.sh/tangled.sh/core/spindle/models"
+ )
+
+ const (
+     workspaceDir = "/tangled/workspace"
+ )
+
+ type cleanupFunc func(context.Context) error
+
+ type Engine struct {
+     backend client.APIClient
+     l       *slog.Logger
+     db      *db.DB
+     n       *notifier.Notifier
+     cfg     *config.Config
+
+     cleanupMu sync.Mutex
+     cleanup   map[string][]cleanupFunc
+ }
+
+ func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
+     dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
+     if err != nil {
+         return nil, err
+     }
+
+     l := log.FromContext(ctx).With("component", "spindle")
+
+     e := &Engine{
+         backend: dcli,
+         l:       l,
+         db:      db,
+         n:       n,
+         cfg:     cfg,
+     }
+
+     e.cleanup = make(map[string][]cleanupFunc)
+
+     return e, nil
+ }
+
+ func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
+     e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
+
+     wg := sync.WaitGroup{}
+     for _, w := range pipeline.Workflows {
+         wg.Add(1)
+         go func() error {
+             defer wg.Done()
+             wid := models.WorkflowId{
+                 PipelineId: pipelineId,
+                 Name:       w.Name,
+             }
+
+             err := e.db.StatusRunning(wid, e.n)
+             if err != nil {
+                 return err
+             }
+
+             err = e.SetupWorkflow(ctx, wid)
+             if err != nil {
+                 e.l.Error("setting up worklow", "wid", wid, "err", err)
+                 return err
+             }
+             defer e.DestroyWorkflow(ctx, wid)
+
+             reader, err := e.backend.ImagePull(ctx, w.Image, image.PullOptions{})
+             if err != nil {
+                 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
+
+                 err := e.db.StatusFailed(wid, err.Error(), -1, e.n)
+                 if err != nil {
+                     return err
+                 }
+
+                 return fmt.Errorf("pulling image: %w", err)
+             }
+             defer reader.Close()
+             io.Copy(os.Stdout, reader)
+
+             workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
+             workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
+             if err != nil {
+                 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
+                 workflowTimeout = 5 * time.Minute
+             }
+             e.l.Info("using workflow timeout", "timeout", workflowTimeout)
+             ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
+             defer cancel()
+
+             err = e.StartSteps(ctx, w.Steps, wid, w.Image)
+             if err != nil {
+                 if errors.Is(err, engine.ErrTimedOut) {
+                     dbErr := e.db.StatusTimeout(wid, e.n)
+                     if dbErr != nil {
+                         return dbErr
+                     }
+                 } else {
+                     dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
+                     if dbErr != nil {
+                         return dbErr
+                     }
+                 }
+
+                 return fmt.Errorf("starting steps image: %w", err)
+             }
+
+             err = e.db.StatusSuccess(wid, e.n)
+             if err != nil {
+                 return err
+             }
+
+             return nil
+         }()
+     }
+
+     wg.Wait()
+ }
+
+ // SetupWorkflow sets up a new network for the workflow and volumes for
+ // the workspace and Nix store. These are persisted across steps and are
+ // destroyed at the end of the workflow.
+ func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
+     e.l.Info("setting up workflow", "workflow", wid)
+
+     _, err := e.backend.VolumeCreate(ctx, volume.CreateOptions{
+         Name:   workspaceVolume(wid),
+         Driver: "local",
+     })
+     if err != nil {
+         return err
+     }
+     e.registerCleanup(wid, func(ctx context.Context) error {
+         return e.backend.VolumeRemove(ctx, workspaceVolume(wid), true)
+     })
+
+     _, err = e.backend.VolumeCreate(ctx, volume.CreateOptions{
+         Name:   nixVolume(wid),
+         Driver: "local",
+     })
+     if err != nil {
+         return err
+     }
+     e.registerCleanup(wid, func(ctx context.Context) error {
+         return e.backend.VolumeRemove(ctx, nixVolume(wid), true)
+     })
+
+     _, err = e.backend.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
+         Driver: "bridge",
+     })
+     if err != nil {
+         return err
+     }
+     e.registerCleanup(wid, func(ctx context.Context) error {
+         return e.backend.NetworkRemove(ctx, networkName(wid))
+     })
+
+     return nil
+ }
+
+ // StartSteps starts all steps sequentially with the same base image.
+ // ONLY marks pipeline as failed if container's exit code is non-zero.
+ // All other errors are bubbled up.
+ // Fixed version of the step execution logic
+ func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
+
+     for stepIdx, step := range steps {
+         select {
+         case <-ctx.Done():
+             return ctx.Err()
+         default:
+         }
+
+         envs := engine.ConstructEnvs(step.Environment)
+         envs.AddEnv("HOME", workspaceDir)
+         e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
+
+         hostConfig := hostConfig(wid)
+         resp, err := e.backend.ContainerCreate(ctx, &container.Config{
+             Image:      image,
+             Cmd:        []string{"bash", "-c", step.Command},
+             WorkingDir: workspaceDir,
+             Tty:        false,
+             Hostname:   "spindle",
+             Env:        envs.Slice(),
+         }, hostConfig, nil, nil, "")
+         defer e.DestroyStep(ctx, resp.ID)
+         if err != nil {
+             return fmt.Errorf("creating container: %w", err)
+         }
+
+         err = e.backend.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
+         if err != nil {
+             return fmt.Errorf("connecting network: %w", err)
+         }
+
+         err = e.backend.ContainerStart(ctx, resp.ID, container.StartOptions{})
+         if err != nil {
+             return err
+         }
+         e.l.Info("started container", "name", resp.ID, "step", step.Name)
+
+         // start tailing logs in background
+         tailDone := make(chan error, 1)
+         go func() {
+             tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx, step)
+         }()
+
+         // wait for container completion or timeout
+         waitDone := make(chan struct{})
+         var state *container.State
+         var waitErr error
+
+         go func() {
+             defer close(waitDone)
+             state, waitErr = e.WaitStep(ctx, resp.ID)
+         }()
+
+         select {
+         case <-waitDone:
+
+             // wait for tailing to complete
+             <-tailDone
+
+         case <-ctx.Done():
+             e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
+             err = e.DestroyStep(context.Background(), resp.ID)
+             if err != nil {
+                 e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
+             }
+
+             // wait for both goroutines to finish
+             <-waitDone
+             <-tailDone
+
+             return engine.ErrTimedOut
+         }
+
+         select {
+         case <-ctx.Done():
+             return ctx.Err()
+         default:
+         }
+
+         if waitErr != nil {
+             return waitErr
+         }
+
+         err = e.DestroyStep(ctx, resp.ID)
+         if err != nil {
+             return err
+         }
+
+         if state.ExitCode != 0 {
+             e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
+             if state.OOMKilled {
+                 return engine.ErrOOMKilled
+             }
+             return engine.ErrWorkflowFailed
+         }
+     }
+
+     return nil
+ }
+
+ func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
+     wait, errCh := e.backend.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
+     select {
+     case err := <-errCh:
+         if err != nil {
+             return nil, err
+         }
+     case <-wait:
+     }
+
+     e.l.Info("waited for container", "name", containerID)
+
+     info, err := e.backend.ContainerInspect(ctx, containerID)
+     if err != nil {
+         return nil, err
+     }
+
+     return info.State, nil
+ }
+
+ func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
+     wfLogger, err := engine.NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
+     if err != nil {
+         e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
+         return err
+     }
+     defer wfLogger.Close()
+
+     ctl := wfLogger.ControlWriter(stepIdx, step)
+     ctl.Write([]byte(step.Name))
+
+     logs, err := e.backend.ContainerLogs(ctx, containerID, container.LogsOptions{
+         Follow:     true,
+         ShowStdout: true,
+         ShowStderr: true,
+         Details:    false,
+         Timestamps: false,
+     })
+     if err != nil {
+         return err
+     }
+
+     _, err = stdcopy.StdCopy(
+         wfLogger.DataWriter("stdout"),
+         wfLogger.DataWriter("stderr"),
+         logs,
+     )
+     if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
+         return fmt.Errorf("failed to copy logs: %w", err)
+     }
+
+     return nil
+ }
+
+ func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
+     err := e.backend.ContainerKill(ctx, containerID, "9") // SIGKILL
+     if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
+         return err
+     }
+
+     if err := e.backend.ContainerRemove(ctx, containerID, container.RemoveOptions{
+         RemoveVolumes: true,
+         RemoveLinks:   false,
+         Force:         false,
+     }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
+         return err
+     }
+
+     return nil
+ }
+
+ func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
+     e.cleanupMu.Lock()
+     key := wid.String()
+
+     fns := e.cleanup[key]
+     delete(e.cleanup, key)
+     e.cleanupMu.Unlock()
+
+     for _, fn := range fns {
+         if err := fn(ctx); err != nil {
+             e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
+         }
+     }
+     return nil
+ }
+
+ func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
+     e.cleanupMu.Lock()
+     defer e.cleanupMu.Unlock()
+
+     key := wid.String()
+     e.cleanup[key] = append(e.cleanup[key], fn)
+ }
+
+ func hostConfig(wid models.WorkflowId) *container.HostConfig {
+     hostConfig := &container.HostConfig{
+         Mounts: []mount.Mount{
+             {
+                 Type:   mount.TypeVolume,
+                 Source: engine.workspaceVolume(wid),
+                 Target: workspaceDir,
+             },
+             {
+                 Type:   mount.TypeVolume,
+                 Source: engine.nixVolume(wid),
+                 Target: "/nix",
+             },
+             {
+                 Type:     mount.TypeTmpfs,
+                 Target:   "/tmp",
+                 ReadOnly: false,
+                 TmpfsOptions: &mount.TmpfsOptions{
+                     Mode: 0o1777, // world-writeable sticky bit
+                     Options: [][]string{
+                         {"exec"},
+                     },
+                 },
+             },
+             {
+                 Type:   mount.TypeVolume,
+                 Source: "etc-nix-" + wid.String(),
+                 Target: "/etc/nix",
+             },
+         },
+         ReadonlyRootfs: false,
+         CapDrop:        []string{"ALL"},
+         CapAdd:         []string{"CAP_DAC_OVERRIDE"},
+         SecurityOpt:    []string{"no-new-privileges"},
+         ExtraHosts:     []string{"host.docker.internal:host-gateway"},
+     }
+
+     return hostConfig
+ }
+
+ // thanks woodpecker
+ func isErrContainerNotFoundOrNotRunning(err error) bool {
+     // Error response from daemon: Cannot kill container: ...: No such container: ...
+     // Error response from daemon: Cannot kill container: ...: Container ... is not running"
+     // Error response from podman daemon: can only kill running containers. ... is in state exited
+     // Error: No such container: ...
+     return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
+ }
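For illustration only, and not part of this commit: a minimal Go sketch of how a caller might drive the relocated engine. Only the docker package path, docker.New, and StartWorkflows are taken from the file above; the surrounding package, the helper name, and the cfg, db, notifier, and pipeline values are assumptions. Note that engine.workspaceVolume and engine.nixVolume, referenced in hostConfig above, are unexported identifiers of the engine package and would not be accessible from the docker package as written, which may be why build.yml fails.

package example // hypothetical caller, not a file in this repository

import (
    "context"

    "tangled.sh/tangled.sh/core/notifier"
    "tangled.sh/tangled.sh/core/spindle/config"
    "tangled.sh/tangled.sh/core/spindle/db"
    "tangled.sh/tangled.sh/core/spindle/engine/docker"
    "tangled.sh/tangled.sh/core/spindle/models"
)

// runPipeline is a hypothetical helper showing the call sequence only.
func runPipeline(ctx context.Context, cfg *config.Config, d *db.DB, n *notifier.Notifier, p *models.Pipeline, pid models.PipelineId) error {
    // New builds a Docker client from the environment (DOCKER_HOST, etc.)
    // and initialises the per-workflow cleanup registry.
    eng, err := docker.New(ctx, cfg, d, n)
    if err != nil {
        return err
    }

    // StartWorkflows runs each workflow of the pipeline in its own goroutine,
    // records status transitions through db, and blocks until all finish.
    eng.StartWorkflows(ctx, p, pid)
    return nil
}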
spindle/engine/engine.go (+2 -415)

···
      "os"
      "strings"
      "sync"
-     "time"
-
-     "github.com/docker/docker/api/types/container"
-     "github.com/docker/docker/api/types/image"
-     "github.com/docker/docker/api/types/mount"
-     "github.com/docker/docker/api/types/network"
-     "github.com/docker/docker/api/types/volume"
-     "github.com/docker/docker/client"
-     "github.com/docker/docker/pkg/stdcopy"
      "tangled.sh/tangled.sh/core/log"
      "tangled.sh/tangled.sh/core/notifier"
      "tangled.sh/tangled.sh/core/spindle/config"
      "tangled.sh/tangled.sh/core/spindle/db"
+     "tangled.sh/tangled.sh/core/spindle/engine"
      "tangled.sh/tangled.sh/core/spindle/models"
+     "time"
  )

- const (
-     workspaceDir = "/tangled/workspace"
- )
-
- type cleanupFunc func(context.Context) error
-
- type Engine struct {
-     docker client.APIClient
-     l      *slog.Logger
-     db     *db.DB
-     n      *notifier.Notifier
-     cfg    *config.Config
-
-     cleanupMu sync.Mutex
-     cleanup   map[string][]cleanupFunc
- }
-
- func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier) (*Engine, error) {
-     dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
-     if err != nil {
-         return nil, err
-     }
-
-     l := log.FromContext(ctx).With("component", "spindle")
-
-     e := &Engine{
-         docker: dcli,
-         l:      l,
-         db:     db,
-         n:      n,
-         cfg:    cfg,
-     }
-
-     e.cleanup = make(map[string][]cleanupFunc)
-
-     return e, nil
- }
-
- func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) {
-     e.l.Info("starting all workflows in parallel", "pipeline", pipelineId)
-
-     wg := sync.WaitGroup{}
-     for _, w := range pipeline.Workflows {
-         wg.Add(1)
-         go func() error {
-             defer wg.Done()
-             wid := models.WorkflowId{
-                 PipelineId: pipelineId,
-                 Name:       w.Name,
-             }
-
-             err := e.db.StatusRunning(wid, e.n)
-             if err != nil {
-                 return err
-             }
-
-             err = e.SetupWorkflow(ctx, wid)
-             if err != nil {
-                 e.l.Error("setting up worklow", "wid", wid, "err", err)
-                 return err
-             }
-             defer e.DestroyWorkflow(ctx, wid)
-
-             reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{})
-             if err != nil {
-                 e.l.Error("pipeline failed!", "workflowId", wid, "error", err.Error())
-
-                 err := e.db.StatusFailed(wid, err.Error(), -1, e.n)
-                 if err != nil {
-                     return err
-                 }
-
-                 return fmt.Errorf("pulling image: %w", err)
-             }
-             defer reader.Close()
-             io.Copy(os.Stdout, reader)
-
-             workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout
-             workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
-             if err != nil {
-                 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
-                 workflowTimeout = 5 * time.Minute
-             }
-             e.l.Info("using workflow timeout", "timeout", workflowTimeout)
-             ctx, cancel := context.WithTimeout(ctx, workflowTimeout)
-             defer cancel()
-
-             err = e.StartSteps(ctx, w.Steps, wid, w.Image)
-             if err != nil {
-                 if errors.Is(err, ErrTimedOut) {
-                     dbErr := e.db.StatusTimeout(wid, e.n)
-                     if dbErr != nil {
-                         return dbErr
-                     }
-                 } else {
-                     dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n)
-                     if dbErr != nil {
-                         return dbErr
-                     }
-                 }
-
-                 return fmt.Errorf("starting steps image: %w", err)
-             }
-
-             err = e.db.StatusSuccess(wid, e.n)
-             if err != nil {
-                 return err
-             }
-
-             return nil
-         }()
-     }
-
-     wg.Wait()
- }
-
- // SetupWorkflow sets up a new network for the workflow and volumes for
- // the workspace and Nix store. These are persisted across steps and are
- // destroyed at the end of the workflow.
- func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error {
-     e.l.Info("setting up workflow", "workflow", wid)
-
-     _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{
-         Name:   workspaceVolume(wid),
-         Driver: "local",
-     })
-     if err != nil {
-         return err
-     }
-     e.registerCleanup(wid, func(ctx context.Context) error {
-         return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true)
-     })
-
-     _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{
-         Name:   nixVolume(wid),
-         Driver: "local",
-     })
-     if err != nil {
-         return err
-     }
-     e.registerCleanup(wid, func(ctx context.Context) error {
-         return e.docker.VolumeRemove(ctx, nixVolume(wid), true)
-     })
-
-     _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
-         Driver: "bridge",
-     })
-     if err != nil {
-         return err
-     }
-     e.registerCleanup(wid, func(ctx context.Context) error {
-         return e.docker.NetworkRemove(ctx, networkName(wid))
-     })
-
-     return nil
- }
-
- // StartSteps starts all steps sequentially with the same base image.
- // ONLY marks pipeline as failed if container's exit code is non-zero.
- // All other errors are bubbled up.
- // Fixed version of the step execution logic
- func (e *Engine) StartSteps(ctx context.Context, steps []models.Step, wid models.WorkflowId, image string) error {
-
-     for stepIdx, step := range steps {
-         select {
-         case <-ctx.Done():
-             return ctx.Err()
-         default:
-         }
-
-         envs := ConstructEnvs(step.Environment)
-         envs.AddEnv("HOME", workspaceDir)
-         e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice())
-
-         hostConfig := hostConfig(wid)
-         resp, err := e.docker.ContainerCreate(ctx, &container.Config{
-             Image:      image,
-             Cmd:        []string{"bash", "-c", step.Command},
-             WorkingDir: workspaceDir,
-             Tty:        false,
-             Hostname:   "spindle",
-             Env:        envs.Slice(),
-         }, hostConfig, nil, nil, "")
-         defer e.DestroyStep(ctx, resp.ID)
-         if err != nil {
-             return fmt.Errorf("creating container: %w", err)
-         }
-
-         err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil)
-         if err != nil {
-             return fmt.Errorf("connecting network: %w", err)
-         }
-
-         err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
-         if err != nil {
-             return err
-         }
-         e.l.Info("started container", "name", resp.ID, "step", step.Name)
-
-         // start tailing logs in background
-         tailDone := make(chan error, 1)
-         go func() {
-             tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx, step)
-         }()
-
-         // wait for container completion or timeout
-         waitDone := make(chan struct{})
-         var state *container.State
-         var waitErr error
-
-         go func() {
-             defer close(waitDone)
-             state, waitErr = e.WaitStep(ctx, resp.ID)
-         }()
-
-         select {
-         case <-waitDone:
-
-             // wait for tailing to complete
-             <-tailDone
-
-         case <-ctx.Done():
-             e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name)
-             err = e.DestroyStep(context.Background(), resp.ID)
-             if err != nil {
-                 e.l.Error("failed to destroy step", "container", resp.ID, "error", err)
-             }
-
-             // wait for both goroutines to finish
-             <-waitDone
-             <-tailDone
-
-             return ErrTimedOut
-         }
-
-         select {
-         case <-ctx.Done():
-             return ctx.Err()
-         default:
-         }
-
-         if waitErr != nil {
-             return waitErr
-         }
-
-         err = e.DestroyStep(ctx, resp.ID)
-         if err != nil {
-             return err
-         }
-
-         if state.ExitCode != 0 {
-             e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled)
-             if state.OOMKilled {
-                 return ErrOOMKilled
-             }
-             return ErrWorkflowFailed
-         }
-     }
-
-     return nil
- }
-
- func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) {
-     wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
-     select {
-     case err := <-errCh:
-         if err != nil {
-             return nil, err
-         }
-     case <-wait:
-     }
-
-     e.l.Info("waited for container", "name", containerID)
-
-     info, err := e.docker.ContainerInspect(ctx, containerID)
-     if err != nil {
-         return nil, err
-     }
-
-     return info.State, nil
- }
-
- func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
-     wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid)
-     if err != nil {
-         e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err)
-         return err
-     }
-     defer wfLogger.Close()
-
-     ctl := wfLogger.ControlWriter(stepIdx, step)
-     ctl.Write([]byte(step.Name))
-
-     logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{
-         Follow:     true,
-         ShowStdout: true,
-         ShowStderr: true,
-         Details:    false,
-         Timestamps: false,
-     })
-     if err != nil {
-         return err
-     }
-
-     _, err = stdcopy.StdCopy(
-         wfLogger.DataWriter("stdout"),
-         wfLogger.DataWriter("stderr"),
-         logs,
-     )
-     if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
-         return fmt.Errorf("failed to copy logs: %w", err)
-     }
-
-     return nil
- }
-
- func (e *Engine) DestroyStep(ctx context.Context, containerID string) error {
-     err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL
-     if err != nil && !isErrContainerNotFoundOrNotRunning(err) {
-         return err
-     }
-
-     if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{
-         RemoveVolumes: true,
-         RemoveLinks:   false,
-         Force:         false,
-     }); err != nil && !isErrContainerNotFoundOrNotRunning(err) {
-         return err
-     }
-
-     return nil
- }
-
- func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
-     e.cleanupMu.Lock()
-     key := wid.String()
-
-     fns := e.cleanup[key]
-     delete(e.cleanup, key)
-     e.cleanupMu.Unlock()
-
-     for _, fn := range fns {
-         if err := fn(ctx); err != nil {
-             e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
-         }
-     }
-     return nil
- }
-
- func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
-     e.cleanupMu.Lock()
-     defer e.cleanupMu.Unlock()
-
-     key := wid.String()
-     e.cleanup[key] = append(e.cleanup[key], fn)
- }
-
  func workspaceVolume(wid models.WorkflowId) string {
      return fmt.Sprintf("workspace-%s", wid)
  }
···
  func networkName(wid models.WorkflowId) string {
      return fmt.Sprintf("workflow-network-%s", wid)
  }
-
- func hostConfig(wid models.WorkflowId) *container.HostConfig {
-     hostConfig := &container.HostConfig{
-         Mounts: []mount.Mount{
-             {
-                 Type:   mount.TypeVolume,
-                 Source: workspaceVolume(wid),
-                 Target: workspaceDir,
-             },
-             {
-                 Type:   mount.TypeVolume,
-                 Source: nixVolume(wid),
-                 Target: "/nix",
-             },
-             {
-                 Type:     mount.TypeTmpfs,
-                 Target:   "/tmp",
-                 ReadOnly: false,
-                 TmpfsOptions: &mount.TmpfsOptions{
-                     Mode: 0o1777, // world-writeable sticky bit
-                     Options: [][]string{
-                         {"exec"},
-                     },
-                 },
-             },
-             {
-                 Type:   mount.TypeVolume,
-                 Source: "etc-nix-" + wid.String(),
-                 Target: "/etc/nix",
-             },
-         },
-         ReadonlyRootfs: false,
-         CapDrop:        []string{"ALL"},
-         CapAdd:         []string{"CAP_DAC_OVERRIDE"},
-         SecurityOpt:    []string{"no-new-privileges"},
-         ExtraHosts:     []string{"host.docker.internal:host-gateway"},
-     }
-
-     return hostConfig
- }
-
- // thanks woodpecker
- func isErrContainerNotFoundOrNotRunning(err error) bool {
-     // Error response from daemon: Cannot kill container: ...: No such container: ...
-     // Error response from daemon: Cannot kill container: ...: Container ... is not running"
-     // Error response from podman daemon: can only kill running containers. ... is in state exited
-     // Error: No such container: ...
-     return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers"))
- }