tangled
alpha
login
or
join now
willdot.net
/
back-at-it
1
fork
atom
A tool for backing up ATProto related data to S3
1
fork
atom
overview
issues
pulls
pipelines
improve error handling
willdot.net
1 month ago
c47b5846
35f4b757
+52
-25
2 changed files
expand all
collapse all
unified
split
pds.go
tangled_knot.go
+18
-5
pds.go
···
27
27
if err != nil {
28
28
slog.Error("backup repo", "error", err)
29
29
bugsnag.Notify(err)
30
30
-
return
31
30
}
32
31
33
32
err = s.backupBlobs(ctx)
34
33
if err != nil {
35
34
slog.Error("backup blobs", "error", err)
36
35
bugsnag.Notify(err)
37
37
-
return
38
36
}
39
37
40
38
slog.Info("finished PDS backup")
···
71
69
err = os.Remove(filename)
72
70
if err != nil {
73
71
slog.Error("failed to delete pds repo file after uploading", "error", err, "filename", f.Name())
72
72
+
metadata := bugsnag.MetaData{
73
73
+
"file": {
74
74
+
"filename": f.Name(),
75
75
+
},
76
76
+
}
77
77
+
bugsnag.Notify(fmt.Errorf("delete pds repo file after uploading: %w", err), metadata)
74
78
}
75
79
}()
76
80
···
95
99
96
100
fi, err := f.Stat()
97
101
if err != nil {
98
98
-
return fmt.Errorf("stat file: %w", err)
102
102
+
return fmt.Errorf("stat written file: %w", err)
99
103
}
100
104
101
105
_, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo.zip", f, fi.Size(), minio.PutObjectOptions{})
102
106
if err != nil {
103
103
-
return fmt.Errorf("stream repo to bucket: %w", err)
107
107
+
return fmt.Errorf("put repo file to bucket: %w", err)
104
108
}
105
109
106
110
return nil
···
123
127
124
128
zipWriter := zip.NewWriter(f)
125
129
for _, cid := range cids {
126
126
-
slog.Info("processing cid", "cid", cid)
127
130
blob, err := s.getBlob(ctx, cid)
128
131
if err != nil {
129
132
slog.Error("failed to get blob", "cid", cid, "error", err)
···
222
225
return existing, nil
223
226
}
224
227
228
228
+
slog.Info("blob not found locally - downloading", "did", s.did, "cid", cid)
229
229
+
225
230
// TODO: do proper url encoding of query params
226
231
url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid)
227
232
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
···
243
248
err = os.WriteFile(filename, b, os.ModePerm)
244
249
if err != nil {
245
250
slog.Error("writing blob", "error", err, "cid", cid)
251
251
+
metadata := bugsnag.MetaData{
252
252
+
"blob": {
253
253
+
"filename": filename,
254
254
+
"cid": cid,
255
255
+
"did": s.did,
256
256
+
},
257
257
+
}
258
258
+
bugsnag.Notify(fmt.Errorf("writing blob to local storage: %w", err), metadata)
246
259
}
247
260
248
261
return b, nil
+34
-20
tangled_knot.go
···
17
17
)
18
18
19
19
func (s *service) backupTangledKnot(ctx context.Context) {
20
20
-
s.backupKnotDB(ctx)
21
21
-
s.backupKnotRepos(ctx)
20
20
+
err := s.backupKnotDB(ctx)
21
21
+
if err != nil {
22
22
+
bugsnag.Notify(fmt.Errorf("backup knot db: %w", err))
23
23
+
slog.Error("failed to backup knot db", "error", err)
24
24
+
}
25
25
+
err = s.backupKnotRepos(ctx)
26
26
+
if err != nil {
27
27
+
bugsnag.Notify(fmt.Errorf("backup knot db: %w", err))
28
28
+
slog.Error("failed to backup knot db", "error", err)
29
29
+
}
22
30
23
31
slog.Info("finished tangled knot backup")
24
32
}
25
33
26
26
-
func (s *service) backupKnotDB(ctx context.Context) {
34
34
+
func (s *service) backupKnotDB(ctx context.Context) error {
27
35
dir := os.Getenv("TANGLED_KNOT_DATABASE_DIRECTORY")
28
36
if dir == "" {
29
37
slog.Info("TANGLED_KNOT_DATABASE_DIRECTORY env not set - skipping knot DB backup")
···
32
40
filename := path.Join(s.blobDir, fmt.Sprintf("%d-knot.zip", time.Now().UnixMilli()))
33
41
f, err := os.Create(filename)
34
42
if err != nil {
35
35
-
slog.Error("creating temp file", "error", err)
36
36
-
return
43
43
+
return fmt.Errorf("creating temp file: %w", err)
37
44
}
38
45
defer func() {
39
46
f.Close()
···
41
48
err = os.Remove(filename)
42
49
if err != nil {
43
50
slog.Error("failed to delete knot db zip file after uploading", "error", err, "filename", f.Name())
51
51
+
metadata := bugsnag.MetaData{
52
52
+
"file": {
53
53
+
"filename": f.Name(),
54
54
+
},
55
55
+
}
56
56
+
bugsnag.Notify(fmt.Errorf("delete knot db zip file after uploading: %w", err), metadata)
44
57
}
45
58
}()
46
59
···
49
62
// reset the reader back to the start so that the minio upload can read the data that's been written.
50
63
_, err = f.Seek(0, 0)
51
64
if err != nil {
52
52
-
slog.Error("setting seek on written file", "error", err)
53
53
-
return
65
65
+
return fmt.Errorf("setting seek on written file: %w", err)
54
66
}
55
67
56
68
fi, err := f.Stat()
57
69
if err != nil {
58
58
-
slog.Error("failed to stat knot db zip file", "error", err)
59
59
-
return
70
70
+
return fmt.Errorf("stat written file: %w", err)
60
71
}
61
72
62
73
_, err = s.minioClient.PutObject(ctx, s.bucketName, "knot-db.zip", f, fi.Size(), minio.PutObjectOptions{})
63
74
if err != nil {
64
64
-
slog.Error("stream knot DB to bucket", "error", err)
65
65
-
bugsnag.Notify(err)
75
75
+
return fmt.Errorf("put knot db zip file to bucket: %w", err)
66
76
}
77
77
+
return nil
67
78
}
68
79
69
69
-
func (s *service) backupKnotRepos(ctx context.Context) {
80
80
+
func (s *service) backupKnotRepos(ctx context.Context) error {
70
81
dir := os.Getenv("TANGLED_KNOT_REPOSITORY_DIRECTORY")
71
82
if dir == "" {
72
83
slog.Info("TANGLED_KNOT_REPOSITORY_DIRECTORY env not set - skipping knot repo backup")
···
75
86
filename := path.Join(s.blobDir, fmt.Sprintf("%d-knot-repos.zip", time.Now().UnixMilli()))
76
87
f, err := os.Create(filename)
77
88
if err != nil {
78
78
-
slog.Error("creating temp file", "error", err)
79
79
-
return
89
89
+
return fmt.Errorf("creating temp file: %w", err)
80
90
}
81
91
defer func() {
82
92
f.Close()
···
84
94
err = os.Remove(filename)
85
95
if err != nil {
86
96
slog.Error("failed to delete knot repos zip file after uploading", "error", err, "filename", f.Name())
97
97
+
metadata := bugsnag.MetaData{
98
98
+
"file": {
99
99
+
"filename": f.Name(),
100
100
+
},
101
101
+
}
102
102
+
bugsnag.Notify(fmt.Errorf("delete knot repos zip file after uploading: %w", err), metadata)
87
103
}
88
104
}()
89
105
···
92
108
// reset the reader back to the start so that the minio upload can read the data that's been written.
93
109
_, err = f.Seek(0, 0)
94
110
if err != nil {
95
95
-
slog.Error("setting seek on written file", "error", err)
96
96
-
return
111
111
+
return fmt.Errorf("setting seek on written file: %w", err)
97
112
}
98
113
99
114
fi, err := f.Stat()
100
115
if err != nil {
101
101
-
slog.Error("failed to stat knot db zip file", "error", err)
102
102
-
return
116
116
+
return fmt.Errorf("stat written file: %w", err)
103
117
}
104
118
105
119
_, err = s.minioClient.PutObject(ctx, s.bucketName, "knot-repos.zip", f, fi.Size(), minio.PutObjectOptions{})
106
120
if err != nil {
107
107
-
slog.Error("write knot repos to bucket", "error", err)
108
108
-
bugsnag.Notify(err)
121
121
+
return fmt.Errorf("put knot repo file to bucket: %w", err)
109
122
}
123
123
+
return nil
110
124
}
111
125
112
126
func compress(src string, writer io.Writer) error {