A tool for backing up ATProto-related data to S3.

Run on a cron job. Store blobs locally as a cache.

+161 -65
+1
.env.example
··· 7 7 TANGLED_KNOT_DATABASE_DIRECTORY="/path/to/database/directory" 8 8 TANGLED_KNOT_REPOSITORY_DIRECTORY="/path/to/repository/directory" 9 9 BUGSNAG_API_KEY="enter-api-key-to-enable" 10 + BLOB_DIR="./blobs"
+1
.gitignore
··· 1 1 .env 2 + .DS_Store
+19
Dockerfile
··· 1 + FROM golang:alpine AS builder 2 + 3 + WORKDIR /app 4 + 5 + COPY . . 6 + RUN go mod download 7 + 8 + COPY . . 9 + 10 + RUN CGO_ENABLED=0 go build -o back-at-it . 11 + 12 + FROM alpine:latest 13 + 14 + RUN apk --no-cache add ca-certificates 15 + 16 + WORKDIR /app/ 17 + COPY --from=builder /app/back-at-it . 18 + 19 + ENTRYPOINT ["./back-at-it"]
+9
docker-compose.yaml
··· 1 + services: 2 + back-at-it: 3 + container_name: back-at-it 4 + image: willdot/back-at-it:latest 5 + environment: 6 + ENV_LOCATION: "/app/data/ back-at-it.env" 7 + volumes: 8 + - ./data:/app/data 9 + restart: always
+1
go.mod
··· 21 21 github.com/minio/md5-simd v1.1.2 // indirect 22 22 github.com/philhofer/fwd v1.2.0 // indirect 23 23 github.com/pkg/errors v0.9.1 // indirect 24 + github.com/robfig/cron v1.2.0 // indirect 24 25 github.com/rs/xid v1.6.0 // indirect 25 26 github.com/stretchr/testify v1.10.0 // indirect 26 27 github.com/tinylib/msgp v1.3.0 // indirect
+2
go.sum
··· 34 34 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= 35 35 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 36 36 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 37 + github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= 38 + github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= 37 39 github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= 38 40 github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= 39 41 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+43 -2
main.go
··· 3 3 import ( 4 4 "context" 5 5 "log/slog" 6 + "net/http" 6 7 "os" 8 + "time" 7 9 8 10 "github.com/bugsnag/bugsnag-go/v2" 9 11 "github.com/joho/godotenv" 10 12 "github.com/minio/minio-go/v7" 11 13 "github.com/minio/minio-go/v7/pkg/credentials" 14 + "github.com/robfig/cron" 12 15 ) 16 + 17 + type service struct { 18 + pdsHost string 19 + did string 20 + blobDir string 21 + bucketName string 22 + httpClient *http.Client 23 + minioClient *minio.Client 24 + } 13 25 14 26 func main() { 15 27 ctx := context.Background() ··· 40 52 return 41 53 } 42 54 43 - backupPDS(ctx, minioClient, bucketName) 44 - backupTangledKnot(ctx, minioClient, bucketName) 55 + pdsHost := os.Getenv("PDS_HOST") 56 + did := os.Getenv("DID") 57 + blobDir := os.Getenv("BLOB_DIR") 58 + 59 + service := service{ 60 + pdsHost: pdsHost, 61 + did: did, 62 + blobDir: blobDir, 63 + bucketName: bucketName, 64 + minioClient: minioClient, 65 + httpClient: &http.Client{ 66 + Timeout: time.Second * 5, 67 + Transport: &http.Transport{ 68 + IdleConnTimeout: time.Second * 90, 69 + }, 70 + }, 71 + } 72 + 73 + service.backupPDS(ctx) 74 + service.backupTangledKnot(ctx) 75 + 76 + c := cron.New() 77 + 78 + c.AddFunc("@hourly", func() { 79 + service.backupPDS(ctx) 80 + }) 81 + c.AddFunc("@hourly", func() { 82 + service.backupTangledKnot(ctx) 83 + }) 84 + 85 + c.Start() 45 86 } 46 87 47 88 func createMinioClient() (*minio.Client, error) {
+72 -53
pds.go
··· 9 9 "log/slog" 10 10 "net/http" 11 11 "os" 12 + "path/filepath" 13 + "time" 12 14 13 15 "github.com/bugsnag/bugsnag-go/v2" 14 16 "github.com/minio/minio-go/v7" 15 17 ) 16 18 17 - func backupPDS(ctx context.Context, minioClient *minio.Client, bucketName string) { 18 - if os.Getenv("PDS_HOST") == "" || os.Getenv("DID") == "" { 19 + func (s *service) backupPDS(ctx context.Context) { 20 + if s.pdsHost == "" || s.did == "" { 19 21 slog.Info("PDS_HOST or DID env not set - skipping PDS backup") 20 22 return 21 23 } 22 24 23 - err := backupRepo(ctx, minioClient, bucketName) 25 + err := s.backupRepo(ctx) 24 26 if err != nil { 25 27 slog.Error("backup repo", "error", err) 26 28 bugsnag.Notify(err) 27 29 return 28 30 } 29 31 30 - err = backupBlobs(ctx, minioClient, bucketName) 32 + err = s.backupBlobs(ctx) 31 33 if err != nil { 32 34 slog.Error("backup blobs", "error", err) 33 35 bugsnag.Notify(err) 34 36 return 35 37 } 36 - } 37 38 38 - func backupRepo(ctx context.Context, minioClient *minio.Client, bucketName string) error { 39 - pdsHost := os.Getenv("PDS_HOST") 40 - did := os.Getenv("DID") 39 + slog.Info("finished PDS backup") 40 + } 41 41 42 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", pdsHost, did) 42 + func (s *service) backupRepo(ctx context.Context) error { 43 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", s.pdsHost, s.did) 43 44 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 44 45 if err != nil { 45 46 return fmt.Errorf("create get repo request: %w", err) 46 47 } 47 48 48 49 req.Header.Add("ACCEPT", "application/vnd.ipld.car") 49 - resp, err := http.DefaultClient.Do(req) 50 + resp, err := s.httpClient.Do(req) 50 51 if err != nil { 51 52 return fmt.Errorf("get repo: %w", err) 52 53 } 53 54 54 55 defer resp.Body.Close() 55 56 56 - _, err = minioClient.PutObject(ctx, bucketName, "pds-repo", resp.Body, -1, minio.PutObjectOptions{}) 57 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-repo", resp.Body, -1, 
minio.PutObjectOptions{}) 57 58 if err != nil { 58 59 return fmt.Errorf("stream repo to bucket: %w", err) 59 60 } ··· 61 62 return nil 62 63 } 63 64 64 - func backupBlobs(ctx context.Context, minioClient *minio.Client, bucketName string) error { 65 - cids, err := getAllBlobCIDs(ctx) 65 + func (s *service) backupBlobs(ctx context.Context) error { 66 + cids, err := s.getAllBlobCIDs(ctx) 66 67 if err != nil { 67 68 return fmt.Errorf("get all blob CIDs: %w", err) 68 69 } 69 70 70 - reader, writer := io.Pipe() 71 - defer reader.Close() 71 + filename := fmt.Sprintf("%s-%s-blobs.zip", s.did, time.Now()) 72 + defer os.Remove(filename) 72 73 73 - zipWriter := zip.NewWriter(writer) 74 + f, err := os.Create(filename) 75 + if err != nil { 76 + return fmt.Errorf("creating zip file: %w", err) 77 + } 78 + defer f.Close() 74 79 75 - go func() { 76 - defer writer.Close() 77 - defer zipWriter.Close() 80 + zipWriter := zip.NewWriter(f) 81 + for _, cid := range cids { 82 + slog.Info("processing cid", "cid", cid) 83 + blob, err := s.getBlob(ctx, cid) 84 + if err != nil { 85 + slog.Error("failed to get blob", "cid", cid, "error", err) 86 + bugsnag.Notify(err) 87 + continue 88 + } 78 89 79 - for _, cid := range cids { 80 - slog.Info("processing cid", "cid", cid) 81 - blob, err := getBlob(ctx, cid) 82 - if err != nil { 83 - slog.Error("failed to get blob", "cid", cid, "error", err) 84 - bugsnag.Notify(err) 85 - continue 86 - } 90 + zipFile, err := zipWriter.Create(cid) 91 + if err != nil { 92 + slog.Error("create new file in zipwriter", "cid", cid, "error", err) 93 + bugsnag.Notify(err) 94 + continue 95 + } 87 96 88 - zipFile, err := zipWriter.Create(cid) 89 - if err != nil { 90 - slog.Error("create new file in zipwriter", "cid", cid, "error", err) 91 - bugsnag.Notify(err) 92 - blob.Close() 93 - continue 94 - } 97 + zipFile.Write(blob) 98 + } 99 + err = zipWriter.Close() 100 + if err != nil { 101 + return fmt.Errorf("close zip writer: %w", err) 102 + } 95 103 96 - io.Copy(zipFile, blob) 
97 - blob.Close() 98 - } 99 - }() 104 + fi, err := f.Stat() 105 + if err != nil { 106 + return fmt.Errorf("stat zip file: %w", err) 107 + } 100 108 101 - _, err = minioClient.PutObject(ctx, bucketName, "pds-blobs.zip", reader, -1, minio.PutObjectOptions{}) 109 + _, err = s.minioClient.PutObject(ctx, s.bucketName, "pds-blobs.zip", f, fi.Size(), minio.PutObjectOptions{}) 102 110 if err != nil { 103 111 return fmt.Errorf("stream blobs to bucket: %w", err) 104 112 } ··· 106 114 return nil 107 115 } 108 116 109 - func getAllBlobCIDs(ctx context.Context) ([]string, error) { 117 + func (s *service) getAllBlobCIDs(ctx context.Context) ([]string, error) { 110 118 cursor := "" 111 119 limit := 100 112 120 var cids []string 113 121 for { 114 - res, err := listBlobs(ctx, cursor, int64(limit)) 122 + res, err := s.listBlobs(ctx, cursor, int64(limit)) 115 123 if err != nil { 116 124 return nil, fmt.Errorf("list blobs: %w", err) 117 125 } ··· 134 142 CIDs []string `json:"cids"` 135 143 } 136 144 137 - func listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 138 - pdsHost := os.Getenv("PDS_HOST") 139 - did := os.Getenv("DID") 140 - 145 + func (s *service) listBlobs(ctx context.Context, cursor string, limit int64) (listBlobsResponse, error) { 141 146 // TODO: do proper url encoding of query params 142 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", pdsHost, did, cursor, limit) 147 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listBlobs?did=%s&cursor=%s&limit=%d", s.pdsHost, s.did, cursor, limit) 143 148 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 144 149 if err != nil { 145 150 return listBlobsResponse{}, fmt.Errorf("create list blobs request: %w", err) 146 151 } 147 152 148 - resp, err := http.DefaultClient.Do(req) 153 + resp, err := s.httpClient.Do(req) 149 154 if err != nil { 150 155 return listBlobsResponse{}, fmt.Errorf("list blobs: %w", err) 151 156 } ··· 166 171 return result, nil 167 
172 } 168 173 169 - func getBlob(ctx context.Context, cid string) (io.ReadCloser, error) { 170 - pdsHost := os.Getenv("PDS_HOST") 171 - did := os.Getenv("DID") 174 + func (s *service) getBlob(ctx context.Context, cid string) ([]byte, error) { 175 + filename := filepath.Join(s.blobDir, fmt.Sprintf("blob-%s-%s", s.did, cid)) 176 + existing, err := os.ReadFile(filename) 177 + if !os.IsNotExist(err) { 178 + return existing, nil 179 + } 172 180 173 181 // TODO: do proper url encoding of query params 174 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsHost, did, cid) 182 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", s.pdsHost, s.did, cid) 175 183 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 176 184 if err != nil { 177 185 return nil, fmt.Errorf("create get blob request: %w", err) 178 186 } 179 187 180 - resp, err := http.DefaultClient.Do(req) 188 + resp, err := s.httpClient.Do(req) 181 189 if err != nil { 182 190 return nil, fmt.Errorf("get blob: %w", err) 191 + } 192 + defer resp.Body.Close() 193 + 194 + b, err := io.ReadAll(resp.Body) 195 + if err != nil { 196 + return nil, fmt.Errorf("read blob response body: %w", err) 183 197 } 184 198 185 - return resp.Body, nil 199 + err = os.WriteFile(filename, b, os.ModePerm) 200 + if err != nil { 201 + slog.Error("writing blob", "error", err, "cid", cid) 202 + } 203 + 204 + return b, nil 186 205 }
+2 -1
readme.md
··· 24 24 25 25 ### Todo 26 26 27 - - [ ] - Turn this into a long running app using a cron library perhaps 27 + - [x] - Turn this into a long running app using a cron library perhaps 28 + - [ ] - Work out how to tar just the directory requested, not the full path (i.e. `/home/will/tangled/repo` should back up `/repo` only and not create the full path of empty directories) 28 29 - [ ] - Use query params properly when creating the URLs to fetch repo and blobs 29 30 - [ ] - Allow configuring the backup of knot repo data per user's DID maybe?
+11 -9
tangled_knot.go
··· 13 13 "github.com/minio/minio-go/v7" 14 14 ) 15 15 16 - func backupTangledKnot(ctx context.Context, minioClient *minio.Client, bucketName string) { 17 - backupKnotDB(ctx, minioClient, bucketName) 18 - backupKnotRepos(ctx, minioClient, bucketName) 16 + func (s *service) backupTangledKnot(ctx context.Context) { 17 + s.backupKnotDB(ctx) 18 + s.backupKnotRepos(ctx) 19 + 20 + slog.Info("finished tangled knot backup") 19 21 } 20 22 21 - func backupKnotDB(ctx context.Context, minioClient *minio.Client, bucketName string) { 23 + func (s *service) backupKnotDB(ctx context.Context) { 22 24 dir := os.Getenv("TANGLED_KNOT_DATABASE_DIRECTORY") 23 25 if dir == "" { 24 26 slog.Info("TANGLED_KNOT_DATABASE_DIRECTORY env not set - skipping knot DB backup") ··· 29 31 30 32 go compress(dir, pipeWriter) 31 33 32 - _, err := minioClient.PutObject(ctx, bucketName, "knot-db.zip", pipeReader, -1, minio.PutObjectOptions{}) 34 + _, err := s.minioClient.PutObject(ctx, s.bucketName, "knot-db.zip", pipeReader, -1, minio.PutObjectOptions{}) 33 35 if err != nil { 34 - slog.Error("stream knot DB to bucket: %w") 36 + slog.Error("stream knot DB to bucket", "error", err) 35 37 bugsnag.Notify(err) 36 38 } 37 39 } 38 40 39 - func backupKnotRepos(ctx context.Context, minioClient *minio.Client, bucketName string) { 41 + func (s *service) backupKnotRepos(ctx context.Context) { 40 42 dir := os.Getenv("TANGLED_KNOT_REPOSITORY_DIRECTORY") 41 43 if dir == "" { 42 44 slog.Info("TANGLED_KNOT_REPOSITORY_DIRECTORY env not set - skipping knot repo backup") ··· 47 49 48 50 go compress(dir, pipeWriter) 49 51 50 - _, err := minioClient.PutObject(ctx, bucketName, "knot-repos.zip", pipeReader, -1, minio.PutObjectOptions{}) 52 + _, err := s.minioClient.PutObject(ctx, s.bucketName, "knot-repos.zip", pipeReader, -1, minio.PutObjectOptions{}) 51 53 if err != nil { 52 - slog.Error("stream knot repos to bucket: %w") 54 + slog.Error("stream knot repos to bucket", "error", err) 53 55 bugsnag.Notify(err) 54 56 } 55 57 }