A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go

some request crawl relay fixes

evan.jarrett.net cecf6d4b f340158a

verified
+223 -55
+56 -41
pkg/hold/admin/handlers_relays.go
··· 1 1 package admin 2 2 3 3 import ( 4 - "fmt" 5 4 "log/slog" 6 5 "net/http" 7 6 "net/url" ··· 85 84 ui.renderTemplate(w, "partials/relay_status.html", view) 86 85 } 87 86 88 - // handleRelayCrawl requests crawl from a single relay. 89 - func (ui *AdminUI) handleRelayCrawl(w http.ResponseWriter, r *http.Request) { 90 - if err := r.ParseForm(); err != nil { 91 - setFlash(w, r, "error", "Invalid form data") 92 - http.Redirect(w, r, "/admin#relays", http.StatusFound) 93 - return 94 - } 87 + // RelayCrawlResultView is the data for a single relay crawl result row. 88 + type RelayCrawlResultView struct { 89 + Name string 90 + URL string 91 + Success bool 92 + Error string 93 + } 95 94 96 - relayURL := r.FormValue("url") 95 + // handleRelayCrawl requests crawl from a single relay and returns an HTMX partial. 96 + func (ui *AdminUI) handleRelayCrawl(w http.ResponseWriter, r *http.Request) { 97 + relayURL := r.URL.Query().Get("url") 98 + relayName := r.URL.Query().Get("name") 97 99 if relayURL == "" { 98 - setFlash(w, r, "error", "Missing relay URL") 99 - http.Redirect(w, r, "/admin#relays", http.StatusFound) 100 + http.Error(w, "Missing relay URL", http.StatusBadRequest) 100 101 return 101 102 } 102 103 103 - if err := atproto.RequestCrawl(relayURL, ui.config.PublicURL); err != nil { 104 + err := atproto.RequestCrawl(relayURL, ui.config.PublicURL) 105 + 106 + view := RelayCrawlResultView{ 107 + Name: relayName, 108 + URL: relayURL, 109 + Success: err == nil, 110 + } 111 + if err != nil { 112 + view.Error = err.Error() 104 113 slog.Warn("Failed to request crawl from relay", "relay", relayURL, "error", err) 105 - setFlash(w, r, "error", "Crawl request failed: "+err.Error()) 106 114 } else { 107 115 slog.Info("Crawl requested via admin panel", "relay", relayURL) 108 - setFlash(w, r, "success", "Crawl requested from "+relayURL) 109 116 } 110 117 111 - http.Redirect(w, r, "/admin#relays", http.StatusFound) 118 + ui.renderTemplate(w, "partials/relay_crawl_result.html", view) 112 119 } 113 120 114 - // handleRelayCrawlAll requests crawl from all known relays. 121 + // handleRelayCrawlAll requests crawl from all known relays and returns HTMX partials. 115 122 func (ui *AdminUI) handleRelayCrawlAll(w http.ResponseWriter, r *http.Request) { 116 123 relays := atproto.KnownRelays 117 124 118 - var ( 119 - wg sync.WaitGroup 120 - mu sync.Mutex 121 - successes int 122 - failures int 123 - ) 125 + type result struct { 126 + relay atproto.KnownRelay 127 + err error 128 + } 124 129 125 - for _, relay := range relays { 130 + results := make([]result, len(relays)) 131 + var wg sync.WaitGroup 132 + 133 + for i, relay := range relays { 126 134 wg.Add(1) 127 - go func(relay atproto.KnownRelay) { 135 + go func(i int, relay atproto.KnownRelay) { 128 136 defer wg.Done() 129 - if err := atproto.RequestCrawl(relay.URL, ui.config.PublicURL); err != nil { 130 - slog.Warn("Failed to request crawl", "relay", relay.Name, "error", err) 131 - mu.Lock() 132 - failures++ 133 - mu.Unlock() 134 - } else { 135 - mu.Lock() 136 - successes++ 137 - mu.Unlock() 138 - } 139 - }(relay) 137 + err := atproto.RequestCrawl(relay.URL, ui.config.PublicURL) 138 + results[i] = result{relay: relay, err: err} 139 + }(i, relay) 140 140 } 141 141 142 142 wg.Wait() 143 143 144 144 session := getSessionFromContext(r.Context()) 145 + successes := 0 146 + for _, res := range results { 147 + if res.err == nil { 148 + successes++ 149 + } 150 + } 145 151 slog.Info("Crawl all requested via admin panel", 146 - "successes", successes, "failures", failures, "by", session.DID) 152 + "successes", successes, "failures", len(relays)-successes, "by", session.DID) 147 153 148 - if failures == 0 { 149 - setFlash(w, r, "success", fmt.Sprintf("Crawl requested from all %d relays", successes)) 150 - } else { 151 - setFlash(w, r, "warning", fmt.Sprintf("Crawl: %d succeeded, %d failed", successes, failures)) 154 + var views []RelayCrawlResultView 155 + for _, res := range results { 156 + v := RelayCrawlResultView{ 157 + Name: res.relay.Name, 158 + URL: res.relay.URL, 159 + Success: res.err == nil, 160 + } 161 + if res.err != nil { 162 + v.Error = res.err.Error() 163 + } 164 + views = append(views, v) 152 165 } 153 166 154 - http.Redirect(w, r, "/admin#relays", http.StatusFound) 167 + ui.renderTemplate(w, "partials/relay_crawl_results.html", struct { 168 + Results []RelayCrawlResultView 169 + }{Results: views}) 155 170 }
+48
pkg/hold/admin/templates/partials/relay_crawl_result.html
··· 1 + {{define "partials/relay_crawl_result.html"}} 2 + <tr hx-get="/admin/api/relay/status?url={{.URL}}&name={{.Name}}" 3 + hx-trigger="load delay:10s" 4 + hx-swap="outerHTML"> 5 + <td> 6 + {{if .Success}} 7 + <span class="badge badge-success badge-sm gap-1"> 8 + {{ icon "check-circle" "size-3" }} 9 + Sent 10 + </span> 11 + {{else}} 12 + <span class="badge badge-error badge-sm gap-1"> 13 + {{ icon "alert-circle" "size-3" }} 14 + Failed 15 + </span> 16 + {{end}} 17 + </td> 18 + <td> 19 + <div> 20 + <strong>{{.Name}}</strong><br> 21 + <code class="text-xs text-base-content/50 font-mono">{{.URL}}</code> 22 + </div> 23 + </td> 24 + <td> 25 + <span class="text-base-content/30 text-sm">-</span> 26 + </td> 27 + <td> 28 + {{if .Success}} 29 + <span class="text-sm text-info flex items-center gap-1"> 30 + <span class="loading loading-spinner loading-xs"></span> 31 + Crawl requested, refreshing... 32 + </span> 33 + {{else}} 34 + <span class="text-sm text-error">{{.Error}}</span> 35 + {{end}} 36 + </td> 37 + <td class="text-right"> 38 + <button class="btn btn-ghost btn-sm gap-1" 39 + hx-get="/admin/api/relay/status?url={{.URL}}&name={{.Name}}" 40 + hx-target="closest tr" 41 + hx-swap="outerHTML" 42 + title="Refresh Status"> 43 + {{ icon "refresh-ccw" "size-4" }} 44 + Refresh 45 + </button> 46 + </td> 47 + </tr> 48 + {{end}}
+50
pkg/hold/admin/templates/partials/relay_crawl_results.html
··· 1 + {{define "partials/relay_crawl_results.html"}} 2 + {{range .Results}} 3 + <tr hx-get="/admin/api/relay/status?url={{.URL}}&name={{.Name}}" 4 + hx-trigger="load delay:10s" 5 + hx-swap="outerHTML"> 6 + <td> 7 + {{if .Success}} 8 + <span class="badge badge-success badge-sm gap-1"> 9 + {{ icon "check-circle" "size-3" }} 10 + Sent 11 + </span> 12 + {{else}} 13 + <span class="badge badge-error badge-sm gap-1"> 14 + {{ icon "alert-circle" "size-3" }} 15 + Failed 16 + </span> 17 + {{end}} 18 + </td> 19 + <td> 20 + <div> 21 + <strong>{{.Name}}</strong><br> 22 + <code class="text-xs text-base-content/50 font-mono">{{.URL}}</code> 23 + </div> 24 + </td> 25 + <td> 26 + <span class="text-base-content/30 text-sm">-</span> 27 + </td> 28 + <td> 29 + {{if .Success}} 30 + <span class="text-sm text-info flex items-center gap-1"> 31 + <span class="loading loading-spinner loading-xs"></span> 32 + Crawl requested, refreshing... 33 + </span> 34 + {{else}} 35 + <span class="text-sm text-error">{{.Error}}</span> 36 + {{end}} 37 + </td> 38 + <td class="text-right"> 39 + <button class="btn btn-ghost btn-sm gap-1" 40 + hx-get="/admin/api/relay/status?url={{.URL}}&name={{.Name}}" 41 + hx-target="closest tr" 42 + hx-swap="outerHTML" 43 + title="Refresh Status"> 44 + {{ icon "refresh-ccw" "size-4" }} 45 + Refresh 46 + </button> 47 + </td> 48 + </tr> 49 + {{end}} 50 + {{end}}
+8 -7
pkg/hold/admin/templates/partials/relay_status.html
··· 48 48 </td> 49 49 <td class="text-right"> 50 50 {{if .Online}} 51 - <form action="/admin/relays/crawl" method="POST" class="inline"> 52 - <input type="hidden" name="url" value="{{.URL}}"> 53 - <button type="submit" class="btn btn-ghost btn-sm gap-1" title="Request Crawl"> 54 - {{ icon "refresh-ccw" "size-4" }} 55 - Crawl 56 - </button> 57 - </form> 51 + <button class="btn btn-ghost btn-sm gap-1" 52 + hx-post="/admin/relays/crawl?url={{.URL}}&name={{.Name}}" 53 + hx-target="closest tr" 54 + hx-swap="outerHTML" 55 + title="Request Crawl"> 56 + {{ icon "refresh-ccw" "size-4" }} 57 + Request Crawl 58 + </button> 58 59 {{end}} 59 60 </td> 60 61 </tr>
+19 -7
pkg/hold/admin/templates/partials/tab_relays.html
··· 1 1 {{define "partials/tab_relays.html"}} 2 2 <div class="flex justify-between items-center min-h-12 mb-6"> 3 3 <h1 class="text-2xl font-bold">Relays</h1> 4 - <form action="/admin/relays/crawl-all" method="POST"> 5 - <button type="submit" class="btn btn-primary gap-2"> 6 - {{ icon "refresh-ccw" "size-4" }} 7 - Crawl All 8 - </button> 9 - </form> 4 + <button class="btn btn-primary gap-2" 5 + hx-post="/admin/relays/crawl-all" 6 + hx-target="#relay-tbody" 7 + hx-swap="innerHTML" 8 + hx-indicator="#crawl-loading"> 9 + {{ icon "refresh-ccw" "size-4" }} 10 + Request Crawl All 11 + </button> 12 + </div> 13 + 14 + <div id="crawl-loading" class="htmx-indicator mb-4"> 15 + <div class="flex items-center gap-3 p-4 bg-base-200 rounded-lg"> 16 + <span class="loading loading-spinner loading-md text-primary"></span> 17 + <div> 18 + <p class="font-medium">Requesting crawl from all relays...</p> 19 + <p class="text-sm text-base-content/50">This may take a few seconds.</p> 20 + </div> 21 + </div> 10 22 </div> 11 23 12 24 <div class="card bg-base-100 shadow-sm"> ··· 21 33 <th class="text-right">Actions</th> 22 34 </tr> 23 35 </thead> 24 - <tbody> 36 + <tbody id="relay-tbody"> 25 37 {{range .Relays}} 26 38 <tr hx-get="/admin/api/relay/status?url={{.URL}}&name={{.Name}}" 27 39 hx-trigger="load"
+42
pkg/hold/pds/events.go
··· 387 387 388 388 slog.Info("New firehose subscriber", "remote", conn.RemoteAddr(), "cursor", cursor, "currentSeq", currentSeq, "userAgent", userAgent) 389 389 390 + // Send #account event first so relays know this DID is active. 391 + // This is written directly to the WebSocket before backfill/handleSubscriber 392 + // start, so there are no concurrent writers at this point. 393 + if err := b.sendAccountEvent(conn); err != nil { 394 + slog.Warn("Failed to send account event to subscriber", "error", err) 395 + } 396 + 390 397 // Handle cursor-based backfill: 391 398 // - cursor < 0: No backfill, stream new events only 392 399 // - cursor >= 0: Backfill events from cursor onwards ··· 412 419 go b.handleSubscriber(sub) 413 420 414 421 return sub 422 + } 423 + 424 + // sendAccountEvent writes an #account event directly to a WebSocket connection, 425 + // signaling that this DID is active on this host. This is critical for relays 426 + // that previously saw the DID deactivated on a different PDS. 427 + func (b *EventBroadcaster) sendAccountEvent(conn *websocket.Conn) error { 428 + header := events.EventHeader{ 429 + Op: events.EvtKindMessage, 430 + MsgType: "#account", 431 + } 432 + 433 + wc, err := conn.NextWriter(websocket.BinaryMessage) 434 + if err != nil { 435 + return fmt.Errorf("failed to get websocket writer: %w", err) 436 + } 437 + 438 + if err := header.MarshalCBOR(wc); err != nil { 439 + wc.Close() 440 + return fmt.Errorf("failed to write account event header: %w", err) 441 + } 442 + 443 + acctEvt := &atproto.SyncSubscribeRepos_Account{ 444 + Active: true, 445 + Did: b.holdDID, 446 + Seq: 0, // Not sequenced in the commit stream 447 + Time: time.Now().Format(time.RFC3339), 448 + } 449 + 450 + var obj lexutil.CBOR = acctEvt 451 + if err := obj.MarshalCBOR(wc); err != nil { 452 + wc.Close() 453 + return fmt.Errorf("failed to write account event body: %w", err) 454 + } 455 + 456 + return wc.Close() 415 457 } 416 458 417 459 // Unsubscribe removes a WebSocket subscriber