tangled
alpha
login
or
join now
willdot.net
/
feed-demo-go
4
fork
atom
A demo of a Bluesky feed generator in Go
4
fork
atom
overview
issues
pulls
pipelines
add the files
willdot.net
6 months ago
f0e8436e
f082dfea
+294
2 changed files
expand all
collapse all
unified
split
cmd
feed-generator
main.go
register-feed
main.go
+105
cmd/feed-generator/main.go
···
1
1
+
package main
2
2
+
3
3
+
import (
4
4
+
"context"
5
5
+
"errors"
6
6
+
"fmt"
7
7
+
"log"
8
8
+
"log/slog"
9
9
+
"os"
10
10
+
"os/signal"
11
11
+
"path"
12
12
+
"syscall"
13
13
+
14
14
+
"tangled.sh/willdot.net/feed-demo-go"
15
15
+
16
16
+
"github.com/avast/retry-go/v4"
17
17
+
"github.com/joho/godotenv"
18
18
+
)
19
19
+
20
20
+
const (
21
21
+
defaultJetstreamAddr = "wss://jetstream.atproto.tools/subscribe"
22
22
+
serverPort = 443 // this must be the port value used. See https://docs.bsky.app/docs/starter-templates/custom-feeds#deploying-your-feed
23
23
+
)
24
24
+
25
25
+
func main() {
26
26
+
err := run()
27
27
+
if err != nil {
28
28
+
log.Fatal(err)
29
29
+
}
30
30
+
}
31
31
+
32
32
+
func run() error {
33
33
+
err := godotenv.Load()
34
34
+
if err != nil && !os.IsNotExist(err) {
35
35
+
return fmt.Errorf("error loading .env file")
36
36
+
}
37
37
+
38
38
+
signals := make(chan os.Signal, 1)
39
39
+
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
40
40
+
41
41
+
feedHost := os.Getenv("FEED_HOST_NAME")
42
42
+
if feedHost == "" {
43
43
+
return fmt.Errorf("FEED_HOST_NAME not set")
44
44
+
}
45
45
+
feedName := os.Getenv("FEED_NAME")
46
46
+
if feedName == "" {
47
47
+
return fmt.Errorf("FEED_NAME not set")
48
48
+
}
49
49
+
50
50
+
dbPath := os.Getenv("DATABASE_PATH")
51
51
+
if dbPath == "" {
52
52
+
dbPath = "./"
53
53
+
}
54
54
+
55
55
+
dbFilename := path.Join(dbPath, "database.db")
56
56
+
database, err := feed.NewDatabase(dbFilename)
57
57
+
if err != nil {
58
58
+
return fmt.Errorf("create new store: %w", err)
59
59
+
}
60
60
+
defer database.Close()
61
61
+
62
62
+
ctx, cancel := context.WithCancel(context.Background())
63
63
+
defer cancel()
64
64
+
65
65
+
go consumeLoop(ctx, database)
66
66
+
67
67
+
server, err := feed.NewServer(serverPort, feedHost, feedName, database)
68
68
+
if err != nil {
69
69
+
return fmt.Errorf("create new server: %w", err)
70
70
+
}
71
71
+
go func() {
72
72
+
<-signals
73
73
+
cancel()
74
74
+
_ = server.Stop(context.Background())
75
75
+
}()
76
76
+
77
77
+
server.Run()
78
78
+
return nil
79
79
+
}
80
80
+
81
81
+
func consumeLoop(ctx context.Context, database *feed.Database) {
82
82
+
handler := feed.NewFeedHandler(database)
83
83
+
84
84
+
jsServerAddr := os.Getenv("JS_SERVER_ADDR")
85
85
+
if jsServerAddr == "" {
86
86
+
jsServerAddr = defaultJetstreamAddr
87
87
+
}
88
88
+
89
89
+
consumer := feed.NewJetstreamConsumer(jsServerAddr, slog.Default(), handler)
90
90
+
91
91
+
_ = retry.Do(func() error {
92
92
+
err := consumer.Consume(ctx)
93
93
+
if err != nil {
94
94
+
// if the context has been cancelled then it's time to exit
95
95
+
if errors.Is(err, context.Canceled) {
96
96
+
return nil
97
97
+
}
98
98
+
slog.Error("consume loop", "error", err)
99
99
+
return err
100
100
+
}
101
101
+
return nil
102
102
+
}, retry.Attempts(0)) // retry indefinitly until context canceled
103
103
+
104
104
+
slog.Warn("exiting consume loop")
105
105
+
}
+189
cmd/register-feed/main.go
···
1
1
+
package main
2
2
+
3
3
+
import (
4
4
+
"bytes"
5
5
+
"encoding/json"
6
6
+
"fmt"
7
7
+
"io"
8
8
+
"log/slog"
9
9
+
"net/http"
10
10
+
"os"
11
11
+
"time"
12
12
+
13
13
+
"github.com/joho/godotenv"
14
14
+
)
15
15
+
16
16
+
const (
17
17
+
baseurl = "https://bsky.social/xrpc"
18
18
+
19
19
+
httpClientTimeoutDuration = time.Second * 5
20
20
+
transportIdleConnTimeoutDuration = time.Second * 90
21
21
+
)
22
22
+
23
23
+
type auth struct {
24
24
+
AccessJwt string `json:"accessJwt"`
25
25
+
Did string `json:"did"`
26
26
+
}
27
27
+
28
28
+
type registerFeedGen struct {
29
29
+
Repo string `json:"repo"`
30
30
+
Collection string `json:"collection"`
31
31
+
Rkey string `json:"rkey"`
32
32
+
Record registerRecord `json:"record"`
33
33
+
}
34
34
+
35
35
+
type registerRecord struct {
36
36
+
Did string `json:"did"`
37
37
+
DisplayName string `json:"displayName"`
38
38
+
Description string `json:"description"`
39
39
+
CreatedAt time.Time `json:"createdAt"`
40
40
+
}
41
41
+
42
42
+
func main() {
43
43
+
err := run()
44
44
+
if err != nil {
45
45
+
slog.Error("error registering feed", "error", err)
46
46
+
os.Exit(1)
47
47
+
}
48
48
+
}
49
49
+
50
50
+
func run() error {
51
51
+
err := godotenv.Load()
52
52
+
if err != nil && !os.IsNotExist(err) {
53
53
+
return fmt.Errorf("error loading .env file")
54
54
+
}
55
55
+
56
56
+
httpClient := http.Client{
57
57
+
Timeout: httpClientTimeoutDuration,
58
58
+
Transport: &http.Transport{
59
59
+
IdleConnTimeout: transportIdleConnTimeoutDuration,
60
60
+
},
61
61
+
}
62
62
+
auth, err := login(httpClient)
63
63
+
if err != nil {
64
64
+
return fmt.Errorf("failed to login: %w", err)
65
65
+
}
66
66
+
67
67
+
err = Register(auth, httpClient)
68
68
+
if err != nil {
69
69
+
return err
70
70
+
}
71
71
+
72
72
+
return nil
73
73
+
}
74
74
+
75
75
+
func login(client http.Client) (*auth, error) {
76
76
+
handle := os.Getenv("BSKY_HANDLE")
77
77
+
appPass := os.Getenv("BSKY_PASS")
78
78
+
79
79
+
url := fmt.Sprintf("%s/com.atproto.server.createsession", baseurl)
80
80
+
81
81
+
requestData := map[string]interface{}{
82
82
+
"identifier": handle,
83
83
+
"password": appPass,
84
84
+
}
85
85
+
86
86
+
data, err := json.Marshal(requestData)
87
87
+
if err != nil {
88
88
+
return nil, fmt.Errorf("failed to marshal request: %w", err)
89
89
+
}
90
90
+
91
91
+
r := bytes.NewReader(data)
92
92
+
93
93
+
req, err := http.NewRequest("POST", url, r)
94
94
+
if err != nil {
95
95
+
return nil, fmt.Errorf("failed to create request: %w", err)
96
96
+
}
97
97
+
98
98
+
req.Header.Add("Content-Type", "application/json")
99
99
+
100
100
+
res, err := client.Do(req)
101
101
+
if err != nil {
102
102
+
return nil, fmt.Errorf("failed to make request: %w", err)
103
103
+
}
104
104
+
105
105
+
defer func() {
106
106
+
_ = res.Body.Close()
107
107
+
}()
108
108
+
109
109
+
resBody, err := io.ReadAll(res.Body)
110
110
+
if err != nil {
111
111
+
return nil, fmt.Errorf("failed to read response: %w", err)
112
112
+
}
113
113
+
114
114
+
var loginResp auth
115
115
+
err = json.Unmarshal(resBody, &loginResp)
116
116
+
if err != nil {
117
117
+
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
118
118
+
}
119
119
+
return &loginResp, nil
120
120
+
}
121
121
+
122
122
+
func Register(auth *auth, httpClient http.Client) error {
123
123
+
feedName := os.Getenv("FEED_NAME")
124
124
+
if feedName == "" {
125
125
+
return fmt.Errorf("FEED_NAME env not set")
126
126
+
}
127
127
+
feedDisplayName := os.Getenv("FEED_DISPLAY_NAME")
128
128
+
if feedDisplayName == "" {
129
129
+
return fmt.Errorf("FEED_DISPLAY_NAME env not set")
130
130
+
}
131
131
+
feedDescription := os.Getenv("FEED_DESCRIPTION")
132
132
+
if feedDescription == "" {
133
133
+
return fmt.Errorf("FEED_DESCRIPTION env not set")
134
134
+
}
135
135
+
feedDID := os.Getenv("FEED_DID")
136
136
+
if feedDID == "" {
137
137
+
return fmt.Errorf("FEED_DID environment not set")
138
138
+
}
139
139
+
140
140
+
reqData := registerFeedGen{
141
141
+
Repo: auth.Did,
142
142
+
Collection: "app.bsky.feed.generator",
143
143
+
Rkey: feedName,
144
144
+
Record: registerRecord{
145
145
+
Did: feedDID,
146
146
+
DisplayName: feedDisplayName,
147
147
+
Description: feedDescription,
148
148
+
CreatedAt: time.Now(),
149
149
+
},
150
150
+
}
151
151
+
152
152
+
data, err := json.Marshal(reqData)
153
153
+
if err != nil {
154
154
+
return fmt.Errorf("failed to marshal request: %w", err)
155
155
+
}
156
156
+
157
157
+
r := bytes.NewReader(data)
158
158
+
159
159
+
url := fmt.Sprintf("%s/com.atproto.repo.putRecord", baseurl)
160
160
+
req, err := http.NewRequest("POST", url, r)
161
161
+
if err != nil {
162
162
+
return fmt.Errorf("failed to create new post request: %w", err)
163
163
+
}
164
164
+
165
165
+
req.Header.Add("Content-Type", "application/json")
166
166
+
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", auth.AccessJwt))
167
167
+
168
168
+
res, err := httpClient.Do(req)
169
169
+
if err != nil {
170
170
+
return fmt.Errorf("failed to make create post request: %w", err)
171
171
+
}
172
172
+
173
173
+
defer func() {
174
174
+
_ = res.Body.Close()
175
175
+
}()
176
176
+
177
177
+
b, err := io.ReadAll(res.Body)
178
178
+
if err != nil {
179
179
+
fmt.Println(err)
180
180
+
} else {
181
181
+
fmt.Println(string(b))
182
182
+
}
183
183
+
184
184
+
if res.StatusCode != 200 {
185
185
+
return fmt.Errorf("failed to create post: %v", res.StatusCode)
186
186
+
}
187
187
+
188
188
+
return nil
189
189
+
}