Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

base_url, typed url parts, api fixups,

+341 -185
+6 -2
slingshot/src/identity.rs
··· 457 457 &self, 458 458 did: &Did, 459 459 ) -> Result<Option<MiniServiceDoc>, IdentityError> { 460 - let key = IdentityKey::Did(did.clone()); 460 + let key = IdentityKey::ServiceDid(did.clone()); 461 461 metrics::counter!("slingshot_get_service_did_doc").increment(1); 462 462 let entry = self 463 463 .cache ··· 659 659 log::warn!( 660 660 "refreshed did doc failed: wrong did doc id. dropping refresh." 661 661 ); 662 + self.complete_refresh(&task_key).await?; 662 663 continue; 663 664 } 664 665 let mini_doc = match did_doc.try_into() { ··· 666 667 Err(e) => { 667 668 metrics::counter!("identity_did_refresh", "success" => "false", "reason" => "bad doc").increment(1); 668 669 log::warn!( 669 - "converting mini doc failed: {e:?}. dropping refresh." 670 + "converting mini doc for {did:?} failed: {e:?}. dropping refresh." 670 671 ); 672 + self.complete_refresh(&task_key).await?; 671 673 continue; 672 674 } 673 675 }; ··· 706 708 log::warn!( 707 709 "refreshed did doc failed: wrong did doc id. dropping refresh." 708 710 ); 711 + self.complete_refresh(&task_key).await?; 709 712 continue; 710 713 } 711 714 let mini_service_doc = match did_doc.try_into() { ··· 715 718 log::warn!( 716 719 "converting mini service doc failed: {e:?}. dropping refresh." 717 720 ); 721 + self.complete_refresh(&task_key).await?; 718 722 continue; 719 723 } 720 724 };
+19 -5
slingshot/src/main.rs
··· 1 - // use foyer::HybridCache; 2 - // use foyer::{Engine, DirectFsDeviceOptions, HybridCacheBuilder}; 3 1 use metrics_exporter_prometheus::PrometheusBuilder; 4 2 use slingshot::{ 5 3 Identity, Proxy, Repo, consume, error::MainTaskError, firehose_cache, healthcheck, serve, ··· 9 7 10 8 use clap::Parser; 11 9 use tokio_util::sync::CancellationToken; 10 + use url::Url; 12 11 13 12 /// Slingshot record edge cache 14 13 #[derive(Parser, Debug, Clone)] ··· 48 47 #[arg(long, env = "SLINGSHOT_IDENTITY_CACHE_DISK_DB")] 49 48 #[clap(default_value_t = 1)] 50 49 identity_cache_disk_gb: usize, 50 + /// the address of this server 51 + /// 52 + /// used if --acme-domain is not set, defaulting to `--bind` 53 + #[arg(long, conflicts_with("acme_domain"), env = "SLINGSHOT_PUBLIC_HOST")] 54 + base_url: Option<Url>, 51 55 /// the domain pointing to this server 52 56 /// 53 57 /// if present: ··· 101 105 102 106 let args = Args::parse(); 103 107 108 + let base_url: Url = args 109 + .base_url 110 + .or_else(|| { 111 + args.acme_domain 112 + .as_ref() 113 + .map(|d| Url::parse(&format!("https://{d}")).unwrap()) 114 + }) 115 + .unwrap_or_else(|| Url::parse(&format!("http://{}", args.bind)).unwrap()); 116 + 104 117 if args.collect_metrics { 105 118 log::trace!("installing metrics server..."); 106 119 if let Err(e) = install_metrics_server(args.bind_metrics) { ··· 164 177 identity_for_server, 165 178 repo, 166 179 proxy, 180 + base_url, 167 181 args.acme_domain, 168 182 args.acme_contact, 169 183 args.acme_cache_path, ··· 236 250 ) -> Result<(), metrics_exporter_prometheus::BuildError> { 237 251 log::info!("installing metrics server..."); 238 252 PrometheusBuilder::new() 239 - .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 240 - .set_bucket_duration(std::time::Duration::from_secs(300))? 241 - .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 253 + .set_buckets(&[0.001, 0.006, 0.036, 0.216, 1.296, 7.776, 45.656])? 254 + .set_bucket_duration(std::time::Duration::from_secs(15))? 255 + .set_bucket_count(std::num::NonZero::new(4).unwrap()) // count * duration = bucket lifetime 242 256 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 243 257 .with_http_listener(bind_metrics) 244 258 .install()?;
+128 -64
slingshot/src/proxy.rs
··· 1 1 use crate::{Identity, error::ProxyError, server::HydrationSource}; 2 - use atrium_api::types::string::{Did, Nsid}; 2 + use atrium_api::types::string::{AtIdentifier, Cid, Did, Nsid, RecordKey}; 3 3 use reqwest::Client; 4 4 use serde_json::{Map, Value}; 5 5 use std::{collections::HashMap, time::Duration}; ··· 135 135 136 136 // TODO i mean maybe we should look for headers also in our headers but not obviously 137 137 let mut headers = reqwest::header::HeaderMap::new(); 138 + // TODO: check the jwt aud against the upstream!!! 138 139 if let Some(auth) = authorization { 139 140 headers.insert("Authorization", auth.try_into()?); 140 141 } ··· 252 253 } 253 254 254 255 #[derive(Debug, PartialEq)] 256 + pub struct FullAtUriParts { 257 + pub repo: AtIdentifier, 258 + pub collection: Nsid, 259 + pub rkey: RecordKey, 260 + pub cid: Option<Cid>, 261 + } 262 + 263 + impl FullAtUriParts { 264 + pub fn to_uri(&self) -> String { 265 + let repo: String = self.repo.clone().into(); // no as_str for AtIdentifier atrium??? 266 + let collection = self.collection.as_str(); 267 + let rkey = self.rkey.as_str(); 268 + format!("at://{repo}/{collection}/{rkey}") 269 + } 270 + } 271 + 272 + // TODO: move this to links 273 + pub fn split_uri(uri: &str) -> Option<(AtIdentifier, Nsid, RecordKey)> { 274 + let rest = uri.strip_prefix("at://")?; 275 + let (repo, rest) = rest.split_once("/")?; 276 + let repo = repo.parse().ok()?; 277 + let (collection, rkey) = rest.split_once("/")?; 278 + let collection = collection.parse().ok()?; 279 + let rkey = rkey.split_once('#').map(|(k, _)| k).unwrap_or(rkey); 280 + let rkey = rkey.split_once('?').map(|(k, _)| k).unwrap_or(rkey); 281 + let rkey = rkey.parse().ok()?; 282 + Some((repo, collection, rkey)) 283 + } 284 + 285 + #[derive(Debug, PartialEq)] 255 286 pub enum MatchedRef { 256 - AtUri { uri: String, cid: Option<String> }, 257 - Identifier(String), 287 + AtUri(FullAtUriParts), 288 + Identifier(AtIdentifier), 258 289 } 259 290 260 291 pub fn match_shape(shape: RefShape, val: &Value) -> Option<MatchedRef> { ··· 266 297 RefShape::StrongRef => { 267 298 let o = val.as_object()?; 268 299 let uri = o.get("uri")?.as_str()?.to_string(); 269 - let cid = o.get("cid")?.as_str()?.to_string(); 270 - Some(MatchedRef::AtUri { 271 - uri, 300 + let cid = o.get("cid")?.as_str()?.parse().ok()?; 301 + let (repo, collection, rkey) = split_uri(&uri)?; 302 + Some(MatchedRef::AtUri(FullAtUriParts { 303 + repo, 304 + collection, 305 + rkey, 272 306 cid: Some(cid), 273 - }) 307 + })) 274 308 } 275 309 RefShape::AtUri => { 276 310 let uri = val.as_str()?.to_string(); 277 - Some(MatchedRef::AtUri { uri, cid: None }) 311 + let (repo, collection, rkey) = split_uri(&uri)?; 312 + Some(MatchedRef::AtUri(FullAtUriParts { 313 + repo, 314 + collection, 315 + rkey, 316 + cid: None, 317 + })) 278 318 } 279 319 RefShape::AtUriParts => { 280 320 let o = val.as_object()?; 281 - let identifier = o.get("repo").or(o.get("did"))?.as_str()?.to_string(); 282 - let collection = o.get("collection")?.as_str()?.to_string(); 283 - let rkey = o.get("rkey")?.as_str()?.to_string(); 284 - let uri = format!("at://{identifier}/{collection}/{rkey}"); 285 - let cid = o.get("cid").and_then(|v| v.as_str()).map(str::to_string); 286 - Some(MatchedRef::AtUri { uri, cid }) 321 + let repo = o.get("repo").or(o.get("did"))?.as_str()?.parse().ok()?; 322 + let collection = o.get("collection")?.as_str()?.parse().ok()?; 323 + let rkey = o.get("rkey")?.as_str()?.parse().ok()?; 324 + let cid = o 325 + .get("cid") 326 + .and_then(|v| v.as_str()) 327 + .and_then(|s| s.parse().ok()); 328 + Some(MatchedRef::AtUri(FullAtUriParts { 329 + repo, 330 + collection, 331 + rkey, 332 + cid, 333 + })) 287 334 } 288 335 RefShape::Did => { 289 - let id = val.as_str()?; 290 - if !id.starts_with("did:") { 291 - return None; 292 - } 293 - Some(MatchedRef::Identifier(id.to_string())) 336 + let did = val.as_str()?.parse().ok()?; 337 + Some(MatchedRef::Identifier(AtIdentifier::Did(did))) 294 338 } 295 339 RefShape::Handle => { 296 - let id = val.as_str()?; 297 - if id.contains(':') { 298 - return None; 299 - } 300 - Some(MatchedRef::Identifier(id.to_string())) 340 + let handle = val.as_str()?.parse().ok()?; 341 + Some(MatchedRef::Identifier(AtIdentifier::Handle(handle))) 342 + } 343 + RefShape::AtIdentifier => { 344 + let identifier = val.as_str()?.parse().ok()?; 345 + Some(MatchedRef::Identifier(identifier)) 301 346 } 302 - RefShape::AtIdentifier => Some(MatchedRef::Identifier(val.as_str()?.to_string())), 303 347 } 304 348 } 305 349 ··· 385 429 mod tests { 386 430 use super::*; 387 431 use serde_json::json; 432 + 433 + static TEST_CID: &str = "bafyreidffwk5wvh5l76yy7zefiqrovv6yaaegb4wg4zaq35w7nt3quix5a"; 388 434 389 435 #[test] 390 436 fn test_parse_record_path() -> Result<(), Box<dyn std::error::Error>> { ··· 439 485 ("strong-ref", json!(""), None), 440 486 ("strong-ref", json!({}), None), 441 487 ("strong-ref", json!({ "uri": "abc" }), None), 442 - ("strong-ref", json!({ "cid": "def" }), None), 488 + ("strong-ref", json!({ "cid": TEST_CID }), None), 443 489 ( 444 490 "strong-ref", 445 - json!({ "uri": "abc", "cid": "def" }), 446 - Some(MatchedRef::AtUri { 447 - uri: "abc".to_string(), 448 - cid: Some("def".to_string()), 449 - }), 491 + json!({ "uri": "at://a.com/xx.yy.zz/1", "cid": TEST_CID }), 492 + Some(MatchedRef::AtUri(FullAtUriParts { 493 + repo: "a.com".parse().unwrap(), 494 + collection: "xx.yy.zz".parse().unwrap(), 495 + rkey: "1".parse().unwrap(), 496 + cid: Some(TEST_CID.parse().unwrap()), 497 + })), 450 498 ), 451 499 ("at-uri", json!({ "uri": "abc" }), None), 452 - ("at-uri", json!({ "uri": "abc", "cid": "def" }), None), 453 500 ( 454 501 "at-uri", 455 - json!("abc"), 456 - Some(MatchedRef::AtUri { 457 - uri: "abc".to_string(), 502 + json!({ "uri": "at://did:web:y.com/xx.yy.zz/1", "cid": TEST_CID }), 503 + None, 504 + ), 505 + ( 506 + "at-uri", 507 + json!("at://did:web:y.com/xx.yy.zz/1"), 508 + Some(MatchedRef::AtUri(FullAtUriParts { 509 + repo: "did:web:y.com".parse().unwrap(), 510 + collection: "xx.yy.zz".parse().unwrap(), 511 + rkey: "1".parse().unwrap(), 458 512 cid: None, 459 - }), 513 + })), 460 514 ), 461 515 ("at-uri-parts", json!("abc"), None), 462 516 ("at-uri-parts", json!({}), None), 463 517 ( 464 518 "at-uri-parts", 465 - json!({"repo": "a", "collection": "b", "rkey": "c"}), 466 - Some(MatchedRef::AtUri { 467 - uri: "at://a/b/c".to_string(), 468 - cid: None, 469 - }), 519 + json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}), 520 + Some(MatchedRef::AtUri(FullAtUriParts { 521 + repo: "a.com".parse().unwrap(), 522 + collection: "xx.yy.zz".parse().unwrap(), 523 + rkey: "1".parse().unwrap(), 524 + cid: Some(TEST_CID.parse().unwrap()), 525 + })), 470 526 ), 471 527 ( 472 528 "at-uri-parts", 473 - json!({"did": "a", "collection": "b", "rkey": "c"}), 474 - Some(MatchedRef::AtUri { 475 - uri: "at://a/b/c".to_string(), 529 + json!({"did": "a.com", "collection": "xx.yy.zz", "rkey": "1"}), 530 + Some(MatchedRef::AtUri(FullAtUriParts { 531 + repo: "a.com".parse().unwrap(), 532 + collection: "xx.yy.zz".parse().unwrap(), 533 + rkey: "1".parse().unwrap(), 476 534 cid: None, 477 - }), 535 + })), 478 536 ), 479 537 ( 480 538 "at-uri-parts", 481 539 // 'repo' takes precedence over 'did' 482 - json!({"did": "a", "repo": "z", "collection": "b", "rkey": "c"}), 483 - Some(MatchedRef::AtUri { 484 - uri: "at://z/b/c".to_string(), 540 + json!({"did": "did:web:a.com", "repo": "z.com", "collection": "xx.yy.zz", "rkey": "1"}), 541 + Some(MatchedRef::AtUri(FullAtUriParts { 542 + repo: "z.com".parse().unwrap(), 543 + collection: "xx.yy.zz".parse().unwrap(), 544 + rkey: "1".parse().unwrap(), 485 545 cid: None, 486 - }), 546 + })), 487 547 ), 488 548 ( 489 549 "at-uri-parts", 490 - json!({"repo": "a", "collection": "b", "rkey": "c", "cid": "def"}), 491 - Some(MatchedRef::AtUri { 492 - uri: "at://a/b/c".to_string(), 493 - cid: Some("def".to_string()), 494 - }), 550 + json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": TEST_CID}), 551 + Some(MatchedRef::AtUri(FullAtUriParts { 552 + repo: "a.com".parse().unwrap(), 553 + collection: "xx.yy.zz".parse().unwrap(), 554 + rkey: "1".parse().unwrap(), 555 + cid: Some(TEST_CID.parse().unwrap()), 556 + })), 495 557 ), 496 558 ( 497 559 "at-uri-parts", 498 - json!({"repo": "a", "collection": "b", "rkey": "c", "cid": {}}), 499 - Some(MatchedRef::AtUri { 500 - uri: "at://a/b/c".to_string(), 560 + json!({"repo": "a.com", "collection": "xx.yy.zz", "rkey": "1", "cid": {}}), 561 + Some(MatchedRef::AtUri(FullAtUriParts { 562 + repo: "a.com".parse().unwrap(), 563 + collection: "xx.yy.zz".parse().unwrap(), 564 + rkey: "1".parse().unwrap(), 501 565 cid: None, 502 - }), 566 + })), 503 567 ), 504 568 ("did", json!({}), None), 505 569 ("did", json!(""), None), ··· 507 571 ( 508 572 "did", 509 573 json!("did:plc:xyz"), 510 - Some(MatchedRef::Identifier("did:plc:xyz".to_string())), 574 + Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())), 511 575 ), 512 576 ("handle", json!({}), None), 513 577 ( 514 578 "handle", 515 579 json!("bad-example.com"), 516 - Some(MatchedRef::Identifier("bad-example.com".to_string())), 580 + Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())), 517 581 ), 518 582 ("handle", json!("did:plc:xyz"), None), 519 583 ("at-identifier", json!({}), None), 520 584 ( 521 585 "at-identifier", 522 586 json!("bad-example.com"), 523 - Some(MatchedRef::Identifier("bad-example.com".to_string())), 587 + Some(MatchedRef::Identifier("bad-example.com".parse().unwrap())), 524 588 ), 525 589 ( 526 590 "at-identifier", 527 591 json!("did:plc:xyz"), 528 - Some(MatchedRef::Identifier("did:plc:xyz".to_string())), 592 + Some(MatchedRef::Identifier("did:plc:xyz".parse().unwrap())), 529 593 ), 530 594 ]; 531 - for (shape, val, expected) in cases { 595 + for (i, (shape, val, expected)) in cases.into_iter().enumerate() { 532 596 let s = shape.try_into().unwrap(); 533 597 let matched = match_shape(s, &val); 534 - assert_eq!(matched, expected, "shape: {shape:?}, val: {val:?}"); 598 + assert_eq!(matched, expected, "{i}: shape: {shape:?}, val: {val:?}"); 535 599 } 536 600 } 537 601 }
+188 -114
slingshot/src/server.rs
··· 1 1 use crate::{ 2 2 CachedRecord, ErrorResponseObject, Identity, Proxy, Repo, 3 3 error::{RecordError, ServerError}, 4 - proxy::{MatchedRef, extract_links}, 4 + proxy::{FullAtUriParts, MatchedRef, extract_links, split_uri}, 5 5 record::RawRecord, 6 6 }; 7 - use atrium_api::types::string::{Cid, Did, Handle, Nsid, RecordKey}; 7 + use atrium_api::types::string::{AtIdentifier, Cid, Did, Handle, Nsid, RecordKey}; 8 8 use foyer::HybridCache; 9 9 use links::at_uri::parse_at_uri as normalize_at_uri; 10 10 use serde::Serialize; ··· 236 236 } 237 237 238 238 #[derive(Object)] 239 + #[oai(rename_all = "camelCase")] 239 240 struct ProxyHydrationError { 241 + /// Short description of why the hydration failed 240 242 reason: String, 243 + /// Whether or not it's recommended to retry requesting this item 244 + should_retry: bool, 245 + /// URL to follow up at if retrying 246 + follow_up: String, 241 247 } 242 248 243 249 #[derive(Object)] 250 + #[oai(rename_all = "camelCase")] 244 251 struct ProxyHydrationPending { 245 - url: String, 246 - } 247 - 248 - #[derive(Object)] 249 - struct ProxyHydrationRecordFound { 250 - record: serde_json::Value, 251 - } 252 - 253 - #[derive(Object)] 254 - struct ProxyHydrationIdentifierFound { 255 - mini_doc: MiniDocResponseObject, 252 + /// URL you can request to finish hydrating this item 253 + follow_up: String, 254 + /// Why this item couldn't be hydrated: 'deadline' or 'limit' 255 + /// 256 + /// - `deadline`: the item fetch didn't complete before the response was 257 + /// due, but will continue on slingshot in the background -- `followUp` 258 + /// requests are coalesced into the original item fetch to be available as 259 + /// early as possible. 260 + /// 261 + /// - `limit`: slingshot only attempts to hydrate the first 100 items found 262 + /// in a proxied response, with the remaining marked `pending`. You can 263 + /// request `followUp` to fetch them. 264 + /// 265 + /// In the future, Slingshot may put pending links after `limit` into a low- 266 + /// priority fetch queue, so that these items become available sooner on 267 + /// follow-up request as well. 268 + reason: String, 256 269 } 257 270 258 271 // todo: there's gotta be a supertrait that collects these? ··· 272 285 /// The original upstream response content 273 286 output: serde_json::Value, 274 287 /// Any hydrated records 275 - records: HashMap<String, Hydration<ProxyHydrationRecordFound>>, 288 + records: HashMap<String, Hydration<FoundRecordResponseObject>>, 276 289 /// Any hydrated identifiers 277 - identifiers: HashMap<String, Hydration<ProxyHydrationIdentifierFound>>, 290 + identifiers: HashMap<String, Hydration<MiniDocResponseObject>>, 278 291 } 279 292 impl Example for ProxyHydrateResponseObject { 280 293 fn example() -> Self { ··· 282 295 output: serde_json::json!({}), 283 296 records: HashMap::from([( 284 297 "asdf".into(), 285 - Hydration::Pending(ProxyHydrationPending { url: "todo".into() }), 298 + Hydration::Pending(ProxyHydrationPending { 299 + follow_up: "/xrpc/com.atproto.repo.getRecord?...".to_string(), 300 + reason: "deadline".to_string(), 301 + }), 286 302 )]), 287 303 identifiers: HashMap::new(), 288 304 } ··· 334 350 params: Option<serde_json::Value>, 335 351 /// Paths within the response to look for at-uris that can be hydrated 336 352 hydration_sources: Vec<HydrationSource>, 337 - // todo: deadline thing 353 + // todo: let clients pass a hydration deadline? 338 354 } 339 355 impl Example for ProxyQueryPayload { 340 356 fn example() -> Self { ··· 383 399 } 384 400 385 401 struct Xrpc { 402 + base_url: url::Url, 386 403 cache: HybridCache<String, CachedRecord>, 387 404 identity: Identity, 388 405 proxy: Arc<Proxy>, ··· 451 468 /// only retains the most recent version of a record. 452 469 Query(cid): Query<Option<String>>, 453 470 ) -> GetRecordResponse { 454 - self.get_record_impl(repo, collection, rkey, cid).await 471 + self.get_record_impl(&repo, &collection, &rkey, cid.as_deref()) 472 + .await 455 473 } 456 474 457 475 /// blue.microcosm.repo.getRecordByUri ··· 521 539 return bad_at_uri(); 522 540 }; 523 541 524 - // TODO: move this to links 525 - let Some(rest) = normalized.strip_prefix("at://") else { 526 - return bad_at_uri(); 527 - }; 528 - let Some((repo, rest)) = rest.split_once('/') else { 529 - return bad_at_uri(); 530 - }; 531 - let Some((collection, rest)) = rest.split_once('/') else { 542 + let Some((repo, collection, rkey)) = split_uri(&normalized) else { 532 543 return bad_at_uri(); 533 544 }; 534 - let rkey = if let Some((rkey, _rest)) = rest.split_once('?') { 535 - rkey 536 - } else { 537 - rest 538 - }; 539 545 540 546 self.get_record_impl( 541 - repo.to_string(), 542 - collection.to_string(), 543 - rkey.to_string(), 544 - cid, 547 + Into::<String>::into(repo).as_str(), 548 + collection.as_str(), 549 + rkey.as_str(), 550 + cid.as_deref(), 545 551 ) 546 552 .await 547 553 } ··· 795 801 &self, 796 802 Json(payload): Json<ProxyQueryPayload>, 797 803 ) -> ProxyHydrateResponse { 798 - // TODO: the Accept request header, if present, gotta be json 799 - // TODO: find any Authorization header and verify it. TBD about `aud`. 800 - 801 804 let params = if let Some(p) = payload.params { 802 805 let serde_json::Value::Object(map) = p else { 803 806 panic!("params have to be an object"); ··· 855 858 let mut identifiers = HashMap::new(); 856 859 857 860 enum GetThing { 858 - Record(String, Hydration<ProxyHydrationRecordFound>), 859 - Identifier(String, Hydration<ProxyHydrationIdentifierFound>), 861 + Record(String, Hydration<FoundRecordResponseObject>), 862 + Identifier(String, Hydration<MiniDocResponseObject>), 860 863 } 861 864 862 865 let (tx, mut rx) = mpsc::channel(1); ··· 865 868 866 869 for link in links { 867 870 match link { 868 - MatchedRef::AtUri { uri, cid } => { 869 - if records.contains_key(&uri) { 871 + MatchedRef::AtUri(parts) => { 872 + let non_canonical_url = parts.to_uri(); 873 + if records.contains_key(&non_canonical_url) { 870 874 log::warn!("skipping duplicate record without checking cid"); 871 875 continue; 872 876 } 873 - let mut u = url::Url::parse("https://example.com").unwrap(); 874 - u.query_pairs_mut().append_pair("at_uri", &uri); // BLEH todo 877 + let mut follow_up = self.base_url.clone(); 878 + follow_up.set_path("/xrpc/com.atproto.repo.getRecord"); 879 + follow_up 880 + .query_pairs_mut() 881 + .append_pair("repo", &Into::<String>::into(parts.repo.clone())) 882 + .append_pair("collection", parts.collection.as_str()) 883 + .append_pair("rkey", parts.rkey.as_str()); 884 + if let Some(ref cid) = parts.cid { 885 + follow_up 886 + .query_pairs_mut() 887 + .append_pair("cid", &cid.as_ref().to_string()); 888 + } 875 889 records.insert( 876 - uri.clone(), 890 + non_canonical_url.clone(), 877 891 Hydration::Pending(ProxyHydrationPending { 878 - url: format!( 879 - "/xrpc/blue.microcosm.repo.getRecordByUri?{}", 880 - u.query().unwrap() 881 - ), // TODO better; with cid, etc. 892 + reason: "deadline".to_string(), 893 + follow_up: follow_up.to_string(), 882 894 }), 883 895 ); 884 896 let tx = tx.clone(); 885 897 let identity = self.identity.clone(); 886 898 let repo = self.repo.clone(); 887 899 tokio::task::spawn(async move { 888 - let rest = uri.strip_prefix("at://").unwrap(); 889 - let (identifier, rest) = rest.split_once('/').unwrap(); 890 - let (collection, rkey) = rest.split_once('/').unwrap(); 891 - 892 - let did = if identifier.starts_with("did:") { 893 - Did::new(identifier.to_string()).unwrap() 894 - } else { 895 - let handle = Handle::new(identifier.to_string()).unwrap(); 896 - identity.handle_to_did(handle).await.unwrap().unwrap() 900 + let FullAtUriParts { 901 + repo: ident, 902 + collection, 903 + rkey, 904 + cid, 905 + } = parts; 906 + let did = match ident { 907 + AtIdentifier::Did(did) => did, 908 + AtIdentifier::Handle(handle) => { 909 + let Ok(Some(did)) = identity.handle_to_did(handle).await 910 + else { 911 + let res = Hydration::Error(ProxyHydrationError { 912 + reason: "could not resolve handle".to_string(), 913 + should_retry: true, 914 + follow_up: follow_up.to_string(), 915 + }); 916 + return if tx 917 + .send(GetThing::Record(non_canonical_url, res)) 918 + .await 919 + .is_ok() 920 + { 921 + metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1); 922 + } else { 923 + metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1); 924 + }; 925 + }; 926 + did 927 + } 897 928 }; 898 929 899 - let res = match repo 900 - .get_record( 901 - &did, 902 - &Nsid::new(collection.to_string()).unwrap(), 903 - &RecordKey::new(rkey.to_string()).unwrap(), 904 - &cid.as_ref().map(|s| Cid::from_str(s).unwrap()), 905 - ) 930 + let res = 931 + match repo.get_record(&did, &collection, &rkey, &cid).await { 932 + Ok(CachedRecord::Deleted) => { 933 + Hydration::Error(ProxyHydrationError { 934 + reason: "record deleted".to_string(), 935 + should_retry: false, 936 + follow_up: follow_up.to_string(), 937 + }) 938 + } 939 + Ok(CachedRecord::Found(RawRecord { 940 + cid: found_cid, 941 + record, 942 + })) => { 943 + if cid 944 + .as_ref() 945 + .map(|expected| *expected != found_cid) 946 + .unwrap_or(false) 947 + { 948 + Hydration::Error(ProxyHydrationError { 949 + reason: "not found".to_string(), 950 + should_retry: false, 951 + follow_up: follow_up.to_string(), 952 + }) 953 + } else if let Ok(value) = serde_json::from_str(&record) 954 + { 955 + let canonical_uri = FullAtUriParts { 956 + repo: AtIdentifier::Did(did), 957 + collection, 958 + rkey, 959 + cid: None, // not used for .to_uri 960 + } 961 + .to_uri(); 962 + Hydration::Found(FoundRecordResponseObject { 963 + cid: Some(found_cid.as_ref().to_string()), 964 + uri: canonical_uri, 965 + value, 966 + }) 967 + } else { 968 + Hydration::Error(ProxyHydrationError { 969 + reason: "could not parse upstream response" 970 + .to_string(), 971 + should_retry: false, 972 + follow_up: follow_up.to_string(), 973 + }) 974 + } 975 + } 976 + Err(e) => { 977 + log::warn!("finally oop {e:?}"); 978 + Hydration::Error(ProxyHydrationError { 979 + reason: "failed to fetch record".to_string(), 980 + should_retry: true, // TODO 981 + follow_up: follow_up.to_string(), 982 + }) 983 + } 984 + }; 985 + if tx 986 + .send(GetThing::Record(non_canonical_url, res)) 906 987 .await 988 + .is_ok() 907 989 { 908 - Ok(CachedRecord::Deleted) => { 909 - Hydration::Error(ProxyHydrationError { 910 - reason: "record deleted".to_string(), 911 - }) 912 - } 913 - Ok(CachedRecord::Found(RawRecord { 914 - cid: found_cid, 915 - record, 916 - })) => { 917 - if let Some(c) = cid 918 - && found_cid.as_ref().to_string() != c 919 - { 920 - log::warn!("ignoring cid mismatch"); 921 - } 922 - let value = serde_json::from_str(&record).unwrap(); 923 - Hydration::Found(ProxyHydrationRecordFound { 924 - record: value, 925 - }) 926 - } 927 - Err(e) => { 928 - log::warn!("finally oop {e:?}"); 929 - Hydration::Error(ProxyHydrationError { 930 - reason: "failed to fetch record".to_string(), 931 - }) 932 - } 933 - }; 934 - if tx.send(GetThing::Record(uri, res)).await.is_ok() { 935 990 metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1); 936 991 } else { 937 992 metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1); ··· 939 994 }); 940 995 } 941 996 MatchedRef::Identifier(id) => { 942 - if identifiers.contains_key(&id) { 997 + let identifier: String = id.clone().into(); 998 + if identifiers.contains_key(&identifier) { 943 999 continue; 944 1000 } 945 - let mut u = url::Url::parse("https://example.com").unwrap(); 946 - u.query_pairs_mut().append_pair("identifier", &id); 1001 + 1002 + let mut follow_up = self.base_url.clone(); 1003 + follow_up.set_path("/xrpc/blue.microcosm.identity.resolveMiniDoc"); 1004 + 1005 + follow_up 1006 + .query_pairs_mut() 1007 + .append_pair("identifier", &identifier); 1008 + 947 1009 identifiers.insert( 948 - id.clone(), 1010 + identifier.clone(), 949 1011 Hydration::Pending(ProxyHydrationPending { 950 - url: format!( 951 - "/xrpc/blue.microcosm.identity.resolveMiniDoc?{}", 952 - u.query().unwrap() 953 - ), // gross 1012 + reason: "deadline".to_string(), 1013 + follow_up: follow_up.to_string(), 954 1014 }), 955 1015 ); 956 1016 let tx = tx.clone(); 957 1017 let identity = self.identity.clone(); 958 1018 tokio::task::spawn(async move { 959 - let res = match Self::resolve_mini_doc_impl(&id, identity).await { 1019 + let res = match Self::resolve_mini_doc_impl(&identifier, identity) 1020 + .await 1021 + { 960 1022 ResolveMiniDocResponse::Ok(Json(mini_doc)) => { 961 - Hydration::Found(ProxyHydrationIdentifierFound { mini_doc }) 1023 + Hydration::Found(mini_doc) 962 1024 } 963 1025 ResolveMiniDocResponse::BadRequest(e) => { 964 1026 log::warn!("minidoc fail: {:?}", e.0); 965 1027 Hydration::Error(ProxyHydrationError { 966 1028 reason: "failed to resolve mini doc".to_string(), 1029 + should_retry: false, 1030 + follow_up: follow_up.to_string(), 967 1031 }) 968 1032 } 969 1033 }; 970 - if tx.send(GetThing::Identifier(id, res)).await.is_ok() { 1034 + if tx.send(GetThing::Identifier(identifier, res)).await.is_ok() { 971 1035 metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "true").increment(1); 972 1036 } else { 973 1037 metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "false").increment(1); ··· 985 1049 while let Some(hydration) = rx.recv().await { 986 1050 match hydration { 987 1051 GetThing::Record(uri, h) => { 988 - records.insert(uri, h); 1052 + if let Some(r) = records.get_mut(&uri) { 1053 + match (&r, &h) { 1054 + (_, Hydration::Found(_)) => *r = h, // always replace if found 1055 + (Hydration::Pending(_), _) => *r = h, // or if it was pending 1056 + _ => {} // else leave it 1057 + } 1058 + } else { 1059 + records.insert(uri, h); 1060 + } 989 1061 } 990 - GetThing::Identifier(uri, md) => { 991 - identifiers.insert(uri, md); 1062 + GetThing::Identifier(identifier, md) => { 1063 + identifiers.insert(identifier.to_string(), md); 992 1064 } 993 1065 }; 994 1066 } ··· 1016 1088 1017 1089 async fn get_record_impl( 1018 1090 &self, 1019 - repo: String, 1020 - collection: String, 1021 - rkey: String, 1022 - cid: Option<String>, 1091 + repo: &str, 1092 + collection: &str, 1093 + rkey: &str, 1094 + cid: Option<&str>, 1023 1095 ) -> GetRecordResponse { 1024 - let did = match Did::new(repo.clone()) { 1096 + let did = match Did::new(repo.to_string()) { 1025 1097 Ok(did) => did, 1026 1098 Err(_) => { 1027 1099 let Ok(handle) = Handle::new(repo.to_lowercase()) else { ··· 1052 1124 } 1053 1125 }; 1054 1126 1055 - let Ok(collection) = Nsid::new(collection) else { 1127 + let Ok(collection) = Nsid::new(collection.to_string()) else { 1056 1128 return GetRecordResponse::BadRequest(xrpc_error( 1057 1129 "InvalidRequest", 1058 1130 "Invalid NSID for collection", 1059 1131 )); 1060 1132 }; 1061 1133 1062 - let Ok(rkey) = RecordKey::new(rkey) else { 1134 + let Ok(rkey) = RecordKey::new(rkey.to_string()) else { 1063 1135 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid rkey")); 1064 1136 }; 1065 1137 1066 1138 let cid: Option<Cid> = if let Some(cid) = cid { 1067 - let Ok(cid) = Cid::from_str(&cid) else { 1139 + let Ok(cid) = Cid::from_str(cid) else { 1068 1140 return GetRecordResponse::BadRequest(xrpc_error("InvalidRequest", "Invalid CID")); 1069 1141 }; 1070 1142 Some(cid) ··· 1213 1285 identity: Identity, 1214 1286 repo: Repo, 1215 1287 proxy: Proxy, 1288 + base_url: url::Url, 1216 1289 acme_domain: Option<String>, 1217 1290 acme_contact: Option<String>, 1218 1291 acme_cache_path: Option<PathBuf>, ··· 1224 1297 let proxy = Arc::new(proxy); 1225 1298 let api_service = OpenApiService::new( 1226 1299 Xrpc { 1300 + base_url, 1227 1301 cache, 1228 1302 identity, 1229 1303 proxy,