this repo has no description

tapc: add tap client package

Signed-off-by: Seongmin Lee <git@boltless.me>

authored by boltless.me and committed by tangled.org b1c60fc4 f8a24a33

+259
+3
tapc/readme.md
··· 1 + basic tap client package 2 + 3 + Replace this to official indigo package when <https://github.com/bluesky-social/indigo/pull/1241> gets merged.
+24
tapc/simpleIndexer.go
··· 1 + package tapc 2 + 3 + import "context" 4 + 5 + type SimpleIndexer struct { 6 + EventHandler func(ctx context.Context, evt Event) error 7 + ErrorHandler func(ctx context.Context, err error) 8 + } 9 + 10 + var _ Handler = (*SimpleIndexer)(nil) 11 + 12 + func (i *SimpleIndexer) OnEvent(ctx context.Context, evt Event) error { 13 + if i.EventHandler == nil { 14 + return nil 15 + } 16 + return i.EventHandler(ctx, evt) 17 + } 18 + 19 + func (i *SimpleIndexer) OnError(ctx context.Context, err error) { 20 + if i.ErrorHandler == nil { 21 + return 22 + } 23 + i.ErrorHandler(ctx, err) 24 + }
+170
tapc/tap.go
··· 1 + /// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 + 3 + package tapc 4 + 5 + import ( 6 + "bytes" 7 + "context" 8 + "encoding/json" 9 + "fmt" 10 + "net/http" 11 + "net/url" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/gorilla/websocket" 16 + "tangled.org/core/log" 17 + ) 18 + 19 + type Handler interface { 20 + OnEvent(ctx context.Context, evt Event) error 21 + OnError(ctx context.Context, err error) 22 + } 23 + 24 + type Client struct { 25 + Url string 26 + AdminPassword string 27 + HTTPClient *http.Client 28 + } 29 + 30 + func NewClient(url, adminPassword string) Client { 31 + return Client{ 32 + Url: url, 33 + AdminPassword: adminPassword, 34 + HTTPClient: &http.Client{}, 35 + } 36 + } 37 + 38 + func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 39 + body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 40 + if err != nil { 41 + return err 42 + } 43 + req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 44 + if err != nil { 45 + return err 46 + } 47 + req.SetBasicAuth("admin", c.AdminPassword) 48 + req.Header.Set("Content-Type", "application/json") 49 + 50 + resp, err := c.HTTPClient.Do(req) 51 + if err != nil { 52 + return err 53 + } 54 + defer resp.Body.Close() 55 + if resp.StatusCode != http.StatusOK { 56 + return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 57 + } 58 + return nil 59 + } 60 + 61 + func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 62 + body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 63 + if err != nil { 64 + return err 65 + } 66 + req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 67 + if err != nil { 68 + return err 69 + } 70 + req.SetBasicAuth("admin", c.AdminPassword) 71 + req.Header.Set("Content-Type", "application/json") 72 + 73 + resp, err := c.HTTPClient.Do(req) 74 + if err != nil { 75 + return err 76 + } 77 + defer resp.Body.Close() 78 + if resp.StatusCode != http.StatusOK { 79 + return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 80 + } 81 + return nil 82 + } 83 + 84 + func (c *Client) Connect(ctx context.Context, handler Handler) error { 85 + l := log.FromContext(ctx) 86 + 87 + u, err := url.Parse(c.Url) 88 + if err != nil { 89 + return err 90 + } 91 + if u.Scheme == "https" { 92 + u.Scheme = "wss" 93 + } else { 94 + u.Scheme = "ws" 95 + } 96 + u.Path = "/channel" 97 + 98 + // TODO: set auth on dial 99 + 100 + url := u.String() 101 + 102 + var backoff int 103 + for { 104 + select { 105 + case <-ctx.Done(): 106 + return ctx.Err() 107 + default: 108 + } 109 + 110 + header := http.Header{ 111 + "Authorization": []string{""}, 112 + } 113 + conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 114 + if err != nil { 115 + l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 116 + time.Sleep(time.Duration(5+backoff) * time.Second) 117 + backoff++ 118 + 119 + continue 120 + } 121 + l.Info("connected to tap service") 122 + 123 + l.Info("tap event subscription response", "code", res.StatusCode) 124 + 125 + if err = c.handleConnection(ctx, conn, handler); err != nil { 126 + l.Warn("tap connection failed", "err", err, "backoff", backoff) 127 + } 128 + } 129 + } 130 + 131 + func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error { 132 + l := log.FromContext(ctx) 133 + 134 + defer func() { 135 + conn.Close() 136 + l.Warn("closed tap conection") 137 + }() 138 + l.Info("established tap conection") 139 + 140 + for { 141 + select { 142 + case <-ctx.Done(): 143 + return ctx.Err() 144 + default: 145 + } 146 + _, message, err := conn.ReadMessage() 147 + if err != nil { 148 + return err 149 + } 150 + 151 + var ev Event 152 + if err := json.Unmarshal(message, &ev); err != nil { 153 + handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 154 + continue 155 + } 156 + if err := handler.OnEvent(ctx, ev); err != nil { 157 + handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err)) 158 + continue 159 + } 160 + 161 + ack := map[string]any{ 162 + "type": "ack", 163 + "id": ev.ID, 164 + } 165 + if err := conn.WriteJSON(ack); err != nil { 166 + l.Warn("failed to send ack", "err", err) 167 + continue 168 + } 169 + } 170 + }
+62
tapc/types.go
··· 1 + package tapc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + ) 9 + 10 + type EventType string 11 + 12 + const ( 13 + EvtRecord EventType = "record" 14 + EvtIdentity EventType = "identity" 15 + ) 16 + 17 + type Event struct { 18 + ID int64 `json:"id"` 19 + Type EventType `json:"type"` 20 + Record *RecordEventData `json:"record,omitempty"` 21 + Identity *IdentityEventData `json:"identity,omitempty"` 22 + } 23 + 24 + type RecordEventData struct { 25 + Live bool `json:"live"` 26 + Did syntax.DID `json:"did"` 27 + Rev string `json:"rev"` 28 + Collection syntax.NSID `json:"collection"` 29 + Rkey syntax.RecordKey `json:"rkey"` 30 + Action RecordAction `json:"action"` 31 + Record json.RawMessage `json:"record,omitempty"` 32 + CID *syntax.CID `json:"cid,omitempty"` 33 + } 34 + 35 + func (r *RecordEventData) AtUri() syntax.ATURI { 36 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, r.Collection, r.Rkey)) 37 + } 38 + 39 + type RecordAction string 40 + 41 + const ( 42 + RecordCreateAction RecordAction = "create" 43 + RecordUpdateAction RecordAction = "update" 44 + RecordDeleteAction RecordAction = "delete" 45 + ) 46 + 47 + type IdentityEventData struct { 48 + DID syntax.DID `json:"did"` 49 + Handle string `json:"handle"` 50 + IsActive bool `json:"is_active"` 51 + Status RepoStatus `json:"status"` 52 + } 53 + 54 + type RepoStatus string 55 + 56 + const ( 57 + RepoStatusActive RepoStatus = "active" 58 + RepoStatusTakendown RepoStatus = "takendown" 59 + RepoStatusSuspended RepoStatus = "suspended" 60 + RepoStatusDeactivated RepoStatus = "deactivated" 61 + RepoStatusDeleted RepoStatus = "deleted" 62 + )