this repo has no description

nix,knotserver,knotmirror/xrpc: `sync.requestCrawl` support

knotmirror ingests repo creation events from the atproto relay through tap,
not from knotstream. Therefore, knotserver will call
`sync.requestCrawl` with optional new-repo information to notify
knotmirror of the repo creation event.

Knot will call `sync.requestCrawl` in the following cases:
- on startup
- when the `/event` stream has failed
- on repo creation (mandatory)

Signed-off-by: Seongmin Lee <git@boltless.me>

authored by boltless.me and committed by tangled.org a15e0f65 4b8fa95d

+261 -6
+56
knotmirror/hostutil/hostutil.go
··· 1 + package hostutil 2 + 3 + import ( 4 + "fmt" 5 + "net/url" 6 + "strings" 7 + 8 + "github.com/bluesky-social/indigo/atproto/syntax" 9 + ) 10 + 11 + func ParseHostname(raw string) (hostname string, noSSL bool, err error) { 12 + // handle case of bare hostname 13 + if !strings.Contains(raw, "://") { 14 + if strings.HasPrefix(raw, "localhost:") { 15 + raw = "http://" + raw 16 + } else { 17 + raw = "https://" + raw 18 + } 19 + } 20 + 21 + u, err := url.Parse(raw) 22 + if err != nil { 23 + return "", false, fmt.Errorf("not a valid host URL: %w", err) 24 + } 25 + 26 + switch u.Scheme { 27 + case "https", "wss": 28 + noSSL = false 29 + case "http", "ws": 30 + noSSL = true 31 + default: 32 + return "", false, fmt.Errorf("unsupported URL scheme: %s", u.Scheme) 33 + } 34 + 35 + // 'localhost' (exact string) is allowed *with* a required port number; SSL is optional 36 + if u.Hostname() == "localhost" { 37 + if u.Port() == "" || !strings.HasPrefix(u.Host, "localhost:") { 38 + return "", false, fmt.Errorf("port number is required for localhost") 39 + } 40 + return u.Host, noSSL, nil 41 + } 42 + 43 + // port numbers not allowed otherwise 44 + if u.Port() != "" { 45 + return "", false, fmt.Errorf("port number not allowed for non-local names") 46 + } 47 + 48 + // check it is a real hostname (eg, not IP address or single-word alias) 49 + h, err := syntax.ParseHandle(u.Host) 50 + if err != nil { 51 + return "", false, fmt.Errorf("not a public hostname") 52 + } 53 + 54 + // lower-case in response 55 + return h.Normalize().String(), noSSL, nil 56 + }
+1 -1
knotmirror/knotmirror.go
··· 49 49 } 50 50 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 51 51 52 - xrpc := xrpc.New(logger, cfg, db, resolver) 53 52 knotstream := knotstream.NewKnotStream(logger, db, cfg) 54 53 crawler := NewCrawler(logger, db) 55 54 resyncer := NewResyncer(logger, db, gitm, cfg) 56 55 adminpage := NewAdminServer(logger, db, resyncer) 56 + xrpc := xrpc.New(logger, cfg, db, resolver, knotstream) 57 57 58 58 // maintain repository list with tap 59 59 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
+2
knotmirror/resyncer.go
··· 281 281 282 282 repoUrl += "/info/refs?service=git-upload-pack" 283 283 284 + r.logger.Debug("checking knot reachability", "url", repoUrl) 285 + 284 286 client := http.Client{ 285 287 Timeout: 30 * time.Second, 286 288 }
+104
knotmirror/xrpc/sync_requestCrawl.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + "net/http" 7 + "strings" 8 + 9 + "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/bluesky-social/indigo/atproto/atclient" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "github.com/bluesky-social/indigo/xrpc" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/knotmirror/db" 15 + "tangled.org/core/knotmirror/hostutil" 16 + "tangled.org/core/knotmirror/models" 17 + ) 18 + 19 + func (x *Xrpc) RequestCrawl(w http.ResponseWriter, r *http.Request) { 20 + var input tangled.SyncRequestCrawl_Input 21 + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { 22 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: "failed to decode json body"}) 23 + return 24 + } 25 + 26 + ctx := r.Context() 27 + 28 + l := x.logger.With("input", input) 29 + 30 + hostname, noSSL, err := hostutil.ParseHostname(input.Hostname) 31 + if err != nil { 32 + l.Error("invalid hostname", "err", err) 33 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("hostname field empty or invalid: %s", input.Hostname)}) 34 + return 35 + } 36 + 37 + // TODO: check if host is Knot with knot.describeServer 38 + 39 + // store given repoAt to db 40 + // this will allow knotmirror to ingest repo creation event bypassing tap. 
41 + // this step won't be needed once we introduce did-for-repo 42 + // TODO(boltless): remove this section 43 + if input.EnsureRepo != nil { 44 + repoAt, err := syntax.ParseATURI(*input.EnsureRepo) 45 + if err != nil { 46 + l.Error("invalid repo at-uri", "err", err) 47 + writeJson(w, http.StatusBadRequest, atclient.ErrorBody{Name: "BadRequest", Message: fmt.Sprintf("repo parameter invalid: %s", *input.EnsureRepo)}) 48 + return 49 + } 50 + owner, err := x.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 51 + if err != nil || owner.Handle.IsInvalidHandle() { 52 + l.Error("failed to resolve ident", "err", err, "owner", repoAt.Authority().String()) 53 + writeErr(w, fmt.Errorf("failed to resolve repo owner")) 54 + return 55 + } 56 + xrpcc := xrpc.Client{Host: owner.PDSEndpoint()} 57 + out, err := atproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 58 + if err != nil { 59 + l.Error("failed to get repo record", "err", err, "repo", repoAt) 60 + writeErr(w, fmt.Errorf("failed to get repo record")) 61 + return 62 + } 63 + record := out.Value.Val.(*tangled.Repo) 64 + 65 + knotUrl := record.Knot 66 + if !strings.Contains(record.Knot, "://") { 67 + if noSSL { 68 + knotUrl = "http://" + knotUrl 69 + } else { 70 + knotUrl = "https://" + knotUrl 71 + } 72 + } 73 + 74 + repo := &models.Repo{ 75 + Did: owner.DID, 76 + Rkey: repoAt.RecordKey(), 77 + Cid: (*syntax.CID)(out.Cid), 78 + Name: record.Name, 79 + KnotDomain: knotUrl, 80 + State: models.RepoStatePending, 81 + ErrorMsg: "", 82 + RetryAfter: 0, 83 + RetryCount: 0, 84 + } 85 + 86 + if err := db.UpsertRepo(ctx, x.db, repo); err != nil { 87 + l.Error("failed to upsert repo", "err", err) 88 + writeErr(w, err) 89 + return 90 + } 91 + } 92 + 93 + // subscribe to requested host 94 + if !x.ks.CheckIfSubscribed(hostname) { 95 + if err := x.ks.SubscribeHost(ctx, hostname, noSSL); err != nil { 96 + // TODO(boltless): return HostBanned on banned hosts 97 + 
l.Error("failed to subscribe host", "err", err) 98 + writeErr(w, err) 99 + return 100 + } 101 + } 102 + 103 + w.WriteHeader(http.StatusOK) 104 + }
+5 -1
knotmirror/xrpc/xrpc.go
··· 12 12 "tangled.org/core/api/tangled" 13 13 "tangled.org/core/idresolver" 14 14 "tangled.org/core/knotmirror/config" 15 + "tangled.org/core/knotmirror/knotstream" 15 16 "tangled.org/core/log" 16 17 ) 17 18 ··· 19 20 cfg *config.Config 20 21 db *sql.DB 21 22 resolver *idresolver.Resolver 23 + ks *knotstream.KnotStream 22 24 logger *slog.Logger 23 25 } 24 26 25 - func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, resolver *idresolver.Resolver) *Xrpc { 27 + func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, resolver *idresolver.Resolver, ks *knotstream.KnotStream) *Xrpc { 26 28 return &Xrpc{ 27 29 cfg, 28 30 db, 29 31 resolver, 32 + ks, 30 33 log.SubLogger(logger, "xrpc"), 31 34 } 32 35 } ··· 47 50 r.Get("/"+tangled.GitTempListCommitsNSID, x.ListCommits) 48 51 r.Get("/"+tangled.GitTempListLanguagesNSID, x.ListLanguages) 49 52 r.Get("/"+tangled.GitTempListTagsNSID, x.ListTags) 53 + r.Post("/"+tangled.SyncRequestCrawlNSID, x.RequestCrawl) 50 54 51 55 return r 52 56 }
+5 -4
knotserver/config/config.go
··· 39 39 } 40 40 41 41 type Config struct { 42 - Repo Repo `env:",prefix=KNOT_REPO_"` 43 - Server Server `env:",prefix=KNOT_SERVER_"` 44 - Git Git `env:",prefix=KNOT_GIT_"` 45 - AppViewEndpoint string `env:"APPVIEW_ENDPOINT, default=https://tangled.org"` 42 + Repo Repo `env:",prefix=KNOT_REPO_"` 43 + Server Server `env:",prefix=KNOT_SERVER_"` 44 + Git Git `env:",prefix=KNOT_GIT_"` 45 + AppViewEndpoint string `env:"APPVIEW_ENDPOINT, default=https://tangled.org"` 46 + KnotMirrors []string `env:"KNOT_MIRRORS, default=https://mirror.tangled.network"` 46 47 } 47 48 48 49 func Load(ctx context.Context) (*Config, error) {
+29
knotserver/events.go
··· 7 7 "strconv" 8 8 "time" 9 9 10 + "github.com/bluesky-social/indigo/xrpc" 10 11 "github.com/gorilla/websocket" 12 + "tangled.org/core/api/tangled" 11 13 "tangled.org/core/log" 12 14 ) 13 15 ··· 61 63 return 62 64 } 63 65 66 + // try request crawl when connection closed 67 + defer func() { 68 + go func() { 69 + retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 70 + defer retryCancel() 71 + if err := h.requestCrawl(retryCtx); err != nil { 72 + l.Error("error requesting crawls", "err", err) 73 + } 74 + }() 75 + }() 76 + 64 77 for { 65 78 // wait for new data or timeout 66 79 select { ··· 118 131 119 132 return nil 120 133 } 134 + 135 + func (h *Knot) requestCrawl(ctx context.Context) error { 136 + h.l.Info("requesting crawl", "mirrors", h.c.KnotMirrors) 137 + input := &tangled.SyncRequestCrawl_Input{ 138 + Hostname: h.c.Server.Hostname, 139 + } 140 + for _, knotmirror := range h.c.KnotMirrors { 141 + xrpcc := xrpc.Client{Host: knotmirror} 142 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 143 + h.l.Error("error requesting crawl", "err", err) 144 + } else { 145 + h.l.Info("crawl requested successfully") 146 + } 147 + } 148 + return nil 149 + }
+16
knotserver/server.go
··· 5 5 "fmt" 6 6 "net/http" 7 7 8 + "github.com/bluesky-social/indigo/xrpc" 8 9 "github.com/urfave/cli/v3" 9 10 "tangled.org/core/api/tangled" 10 11 "tangled.org/core/hook" ··· 97 98 98 99 logger.Info("starting internal server", "address", c.Server.InternalListenAddr) 99 100 go http.ListenAndServe(c.Server.InternalListenAddr, imux) 101 + 102 + // TODO(boltless): too lazy here. should clear this up 103 + go func() { 104 + input := &tangled.SyncRequestCrawl_Input{ 105 + Hostname: c.Server.Hostname, 106 + } 107 + for _, knotmirror := range c.KnotMirrors { 108 + xrpcc := xrpc.Client{Host: knotmirror} 109 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 110 + logger.Error("error requesting crawl", "err", err) 111 + } else { 112 + logger.Info("crawl requested successfully") 113 + } 114 + } 115 + }() 100 116 101 117 logger.Info("starting main server", "address", c.Server.ListenAddr) 102 118 logger.Error("server error", "error", http.ListenAndServe(c.Server.ListenAddr, mux))
+30
knotserver/xrpc/create_repo.go
··· 1 1 package xrpc 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 6 "errors" 6 7 "fmt" 7 8 "net/http" 8 9 "path/filepath" 9 10 "strings" 11 + "time" 10 12 11 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 14 "github.com/bluesky-social/indigo/atproto/syntax" ··· 120 122 repoPath, 121 123 ) 122 124 125 + // HACK: request crawl for this repository 126 + // Users won't want to sync entire network from their local knotmirror. 127 + // Therefore, to bypass the local tap, requestCrawl directly to the knotmirror. 128 + go func() { 129 + if h.Config.Server.Dev { 130 + repoAt := fmt.Sprintf("at://%s/%s/%s", actorDid, tangled.RepoNSID, rkey) 131 + rCtx, rCancel := context.WithTimeout(context.Background(), 10*time.Second) 132 + defer rCancel() 133 + h.requestCrawl(rCtx, &tangled.SyncRequestCrawl_Input{ 134 + Hostname: h.Config.Server.Hostname, 135 + EnsureRepo: &repoAt, 136 + }) 137 + } 138 + }() 139 + 123 140 w.WriteHeader(http.StatusOK) 141 + } 142 + 143 + func (h *Xrpc) requestCrawl(ctx context.Context, input *tangled.SyncRequestCrawl_Input) error { 144 + h.Logger.Info("requesting crawl", "mirrors", h.Config.KnotMirrors) 145 + for _, knotmirror := range h.Config.KnotMirrors { 146 + xrpcc := xrpc.Client{Host: knotmirror} 147 + if err := tangled.SyncRequestCrawl(ctx, &xrpcc, input); err != nil { 148 + h.Logger.Error("error requesting crawl", "err", err) 149 + } else { 150 + h.Logger.Info("crawl requested successfully") 151 + } 152 + } 153 + return nil 124 154 } 125 155 126 156 func validateRepoName(name string) error {
+9
nix/modules/knot.nix
··· 115 115 ''; 116 116 }; 117 117 118 + knotmirrors = mkOption { 119 + type = types.listOf types.str; 120 + default = [ 121 + "https://mirror.tangled.network" 122 + ]; 123 + description = "List of knotmirror hosts to request crawl"; 124 + }; 125 + 118 126 server = { 119 127 listenAddr = mkOption { 120 128 type = types.str; ··· 263 271 "KNOT_SERVER_PLC_URL=${cfg.server.plcUrl}" 264 272 "KNOT_SERVER_JETSTREAM_ENDPOINT=${cfg.server.jetstreamEndpoint}" 265 273 "KNOT_SERVER_OWNER=${cfg.server.owner}" 274 + "KNOT_MIRRORS=${concatStringsSep "," cfg.knotmirrors}" 266 275 "KNOT_SERVER_LOG_DIDS=${ 267 276 if cfg.server.logDids 268 277 then "true"
+4
nix/vm.nix
··· 110 110 plcUrl = plcUrl; 111 111 jetstreamEndpoint = jetstream; 112 112 listenAddr = "0.0.0.0:6444"; 113 + dev = true; 113 114 }; 115 + knotmirrors = [ 116 + "http://localhost:7000" 117 + ]; 114 118 }; 115 119 services.tangled.spindle = { 116 120 enable = true;