Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

service doc cache

+297 -25
+193 -15
slingshot/src/identity.rs
··· 17 17 18 18 use crate::error::IdentityError; 19 19 use atrium_api::{ 20 - did_doc::DidDocument, 20 + did_doc::{DidDocument, Service as DidDocServic}, 21 21 types::string::{Did, Handle}, 22 22 }; 23 23 use atrium_common::resolver::Resolver; ··· 41 41 pub enum IdentityKey { 42 42 Handle(Handle), 43 43 Did(Did), 44 + ServiceDid(Did), 44 45 } 45 46 46 47 impl IdentityKey { ··· 48 49 let s = match self { 49 50 IdentityKey::Handle(h) => h.as_str(), 50 51 IdentityKey::Did(d) => d.as_str(), 52 + IdentityKey::ServiceDid(d) => d.as_str(), 51 53 }; 52 54 std::mem::size_of::<Self>() + std::mem::size_of_val(s) 53 55 } ··· 59 61 #[derive(Debug, Serialize, Deserialize)] 60 62 enum IdentityData { 61 63 NotFound, 62 - Did(Did), 63 - Doc(PartialMiniDoc), 64 + Did(Did), // from handle 65 + Doc(PartialMiniDoc), // from did 66 + ServiceDoc(MiniServiceDoc), // from service did 64 67 } 65 68 66 69 impl IdentityVal { ··· 71 74 IdentityData::Did(d) => std::mem::size_of_val(d.as_str()), 72 75 IdentityData::Doc(d) => { 73 76 std::mem::size_of_val(d.unverified_handle.as_str()) 74 - + std::mem::size_of_val(d.pds.as_str()) 75 - + std::mem::size_of_val(d.signing_key.as_str()) 77 + + std::mem::size_of_val(&d.pds) 78 + + std::mem::size_of_val(&d.signing_key) 76 79 } 80 + IdentityData::ServiceDoc(d) => { 81 + let mut s = std::mem::size_of::<MiniServiceDoc>(); 82 + s += std::mem::size_of_val(&d.services); 83 + for sv in &d.services { 84 + s += std::mem::size_of_val(&sv.full_id); 85 + s += std::mem::size_of_val(&sv.r#type); 86 + s += std::mem::size_of_val(&sv.endpoint); 87 + } 88 + s 89 + }, 77 90 }; 78 91 wrapping + inner 79 92 } ··· 168 181 } 169 182 } 170 183 184 + /// Simplified info from service DID docs 185 + #[derive(Debug, Clone, Serialize, Deserialize)] 186 + pub struct MiniServiceDoc { 187 + services: Vec<MiniService>, 188 + } 189 + 190 + impl MiniServiceDoc { 191 + pub fn get(&self, id_fragment: &str, service_type: Option<&str>) -> Option<&MiniService> { 192 + self.services 193 + .iter() 194 + .find(|MiniService { full_id, r#type, .. }| 195 + full_id.ends_with(id_fragment) 196 + && service_type 197 + .map(|t| t == *r#type) 198 + .unwrap_or(true)) 199 + } 200 + } 201 + 202 + /// The corresponding service info 203 + #[derive(Debug, Clone, Serialize, Deserialize)] 204 + pub struct MiniService { 205 + /// The full id 206 + /// 207 + /// for informational purposes only -- services are deduplicated by id fragment 208 + full_id: String, 209 + r#type: String, 210 + /// HTTP endpoint for the actual service 211 + pub endpoint: String, 212 + } 213 + 214 + impl TryFrom<DidDocument> for MiniServiceDoc { 215 + type Error = String; 216 + fn try_from(did_doc: DidDocument) -> Result<Self, Self::Error> { 217 + let mut services = Vec::new(); 218 + let mut seen = HashSet::new(); 219 + 220 + for DidDocServic { id, r#type, service_endpoint } in did_doc.service.unwrap_or(vec![]) { 221 + let Some((_, id_fragment)) = id.rsplit_once('#') else { 222 + continue; 223 + }; 224 + if !seen.insert((id_fragment.to_string(), r#type.clone())) { 225 + continue; 226 + } 227 + services.push(MiniService { 228 + full_id: id, 229 + r#type, 230 + endpoint: service_endpoint, 231 + }); 232 + } 233 + 234 + Ok(Self { services }) 235 + } 236 + } 237 + 171 238 /// multi-producer *single-consumer* queue structures (wrap in arc-mutex plz) 172 239 /// 173 240 /// the hashset allows testing for presense of items in the queue. ··· 296 363 let now = UtcDateTime::now(); 297 364 let IdentityVal(last_fetch, data) = entry.value(); 298 365 match data { 299 - IdentityData::Doc(_) => { 300 - log::error!("identity value mixup: got a doc from a handle key (should be a did)"); 301 - Err(IdentityError::IdentityValTypeMixup(handle.to_string())) 302 - } 303 366 IdentityData::NotFound => { 304 367 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 305 368 metrics::counter!("identity_handle_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); ··· 313 376 self.queue_refresh(key).await; 314 377 } 315 378 Ok(Some(did.clone())) 379 + } 380 + _ => { 381 + log::error!("identity value mixup: got a doc from a handle key (should be a did)"); 382 + Err(IdentityError::IdentityValTypeMixup(handle.to_string())) 316 383 } 317 384 } 318 385 } ··· 362 429 let now = UtcDateTime::now(); 363 430 let IdentityVal(last_fetch, data) = entry.value(); 364 431 match data { 365 - IdentityData::Did(_) => { 366 - log::error!("identity value mixup: got a did from a did key (should be a doc)"); 367 - Err(IdentityError::IdentityValTypeMixup(did.to_string())) 368 - } 369 432 IdentityData::NotFound => { 370 433 if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 371 434 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); ··· 373 436 } 374 437 Ok(None) 375 438 } 376 - IdentityData::Doc(mini_did) => { 439 + IdentityData::Doc(mini_doc) => { 377 440 if (now - *last_fetch) >= MIN_TTL { 378 441 metrics::counter!("identity_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 379 442 self.queue_refresh(key).await; 380 443 } 381 - Ok(Some(mini_did.clone())) 444 + Ok(Some(mini_doc.clone())) 445 + } 446 + _ => { 447 + log::error!("identity value mixup: got a doc from a handle key (should be a did)"); 448 + Err(IdentityError::IdentityValTypeMixup(did.to_string())) 449 + } 450 + } 451 + } 452 + 453 + /// Fetch (and cache) a service mini doc from a did 454 + pub async fn did_to_mini_service_doc( 455 + &self, 456 + did: &Did, 457 + ) -> Result<Option<MiniServiceDoc>, IdentityError> { 458 + let key = IdentityKey::Did(did.clone()); 459 + metrics::counter!("slingshot_get_service_did_doc").increment(1); 460 + let entry = self 461 + .cache 462 + .get_or_fetch(&key, { 463 + let did = did.clone(); 464 + let resolver = self.did_resolver.clone(); 465 + || async move { 466 + let t0 = Instant::now(); 467 + let (res, success) = match resolver.resolve(&did).await { 468 + Ok(did_doc) if did_doc.id != did.to_string() => ( 469 + // TODO: fix in atrium: should verify id is did 470 + Err(IdentityError::BadDidDoc( 471 + "did doc's id did not match did".to_string(), 472 + )), 473 + "false", 474 + ), 475 + Ok(did_doc) => match did_doc.try_into() { 476 + Ok(mini_service_doc) => ( 477 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::ServiceDoc(mini_service_doc))), 478 + "true", 479 + ), 480 + Err(e) => (Err(IdentityError::BadDidDoc(e)), "false"), 481 + }, 482 + Err(atrium_identity::Error::NotFound) => ( 483 + Ok(IdentityVal(UtcDateTime::now(), IdentityData::NotFound)), 484 + "false", 485 + ), 486 + Err(other) => (Err(IdentityError::ResolutionFailed(other)), "false"), 487 + }; 488 + metrics::histogram!("slingshot_fetch_service_did_doc", "success" => success) 489 + .record(t0.elapsed()); 490 + res 491 + } 492 + }) 493 + .await?; 494 + 495 + let now = UtcDateTime::now(); 496 + let IdentityVal(last_fetch, data) = entry.value(); 497 + match data { 498 + IdentityData::NotFound => { 499 + if (now - *last_fetch) >= MIN_NOT_FOUND_TTL { 500 + metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "false").increment(1); 501 + self.queue_refresh(key).await; 502 + } 503 + Ok(None) 504 + } 505 + IdentityData::ServiceDoc(mini_service_doc) => { 506 + if (now - *last_fetch) >= MIN_TTL { 507 + metrics::counter!("identity_service_did_refresh_queued", "reason" => "ttl", "found" => "true").increment(1); 508 + self.queue_refresh(key).await; 509 + } 510 + Ok(Some(mini_service_doc.clone())) 511 + } 512 + _ => { 513 + log::error!("identity value mixup: got a doc from a different key type (should be a service did)"); 514 + Err(IdentityError::IdentityValTypeMixup(did.to_string())) 382 515 } 383 516 } 384 517 } ··· 554 687 } 555 688 556 689 self.complete_refresh(&task_key).await?; // failures are bugs, so break loop 690 + } 691 + IdentityKey::ServiceDid(ref did) => { 692 + log::trace!("refreshing service did doc: {did:?}"); 693 + 694 + match self.did_resolver.resolve(did).await { 695 + Ok(did_doc) => { 696 + // TODO: fix in atrium: should verify id is did 697 + if did_doc.id != did.to_string() { 698 + metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "wrong did").increment(1); 699 + log::warn!( 700 + "refreshed did doc failed: wrong did doc id. dropping refresh." 701 + ); 702 + continue; 703 + } 704 + let mini_service_doc = match did_doc.try_into() { 705 + Ok(md) => md, 706 + Err(e) => { 707 + metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "bad doc").increment(1); 708 + log::warn!( 709 + "converting mini service doc failed: {e:?}. dropping refresh." 710 + ); 711 + continue; 712 + } 713 + }; 714 + metrics::counter!("identity_service_did_refresh", "success" => "true") 715 + .increment(1); 716 + self.cache.insert( 717 + task_key.clone(), 718 + IdentityVal(UtcDateTime::now(), IdentityData::ServiceDoc(mini_service_doc)), 719 + ); 720 + } 721 + Err(atrium_identity::Error::NotFound) => { 722 + metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "not found").increment(1); 723 + self.cache.insert( 724 + task_key.clone(), 725 + IdentityVal(UtcDateTime::now(), IdentityData::NotFound), 726 + ); 727 + } 728 + Err(err) => { 729 + metrics::counter!("identity_service_did_refresh", "success" => "false", "reason" => "other").increment(1); 730 + log::warn!( 731 + "failed to refresh did doc: {err:?}. leaving stale (should we eventually do something?)" 732 + ); 733 + } 734 + } 557 735 } 558 736 } 559 737 }
+104 -10
slingshot/src/server.rs
··· 34 34 fn example_did() -> String { 35 35 "did:plc:hdhoaan3xa3jiuq4fg4mefid".to_string() 36 36 } 37 + fn example_service_did() -> String { 38 + "did:web:constellation.microcosm.blue".to_string() 39 + } 37 40 fn example_collection() -> String { 38 41 "app.bsky.feed.like".to_string() 39 42 } 40 43 fn example_rkey() -> String { 41 44 "3lv4ouczo2b2a".to_string() 45 + } 46 + fn example_id_fragment() -> String { 47 + "#constellation".to_string() 42 48 } 43 49 fn example_uri() -> String { 44 50 format!( ··· 86 92 })) 87 93 } 88 94 89 - fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniIDResponse { 90 - ResolveMiniIDResponse::BadRequest(Json(XrpcErrorResponseObject { 95 + fn bad_request_handler_resolve_mini(err: poem::Error) -> ResolveMiniDocResponse { 96 + ResolveMiniDocResponse::BadRequest(Json(XrpcErrorResponseObject { 97 + error: "InvalidRequest".to_string(), 98 + message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 99 + })) 100 + } 101 + 102 + fn bad_request_handler_resolve_service(err: poem::Error) -> ResolveServiceResponse { 103 + ResolveServiceResponse::BadRequest(Json(XrpcErrorResponseObject { 91 104 error: "InvalidRequest".to_string(), 92 105 message: format!("Bad request, here's some info that maybe should not be exposed: {err}"), 93 106 })) ··· 189 202 190 203 #[derive(ApiResponse)] 191 204 #[oai(bad_request_handler = "bad_request_handler_resolve_mini")] 192 - enum ResolveMiniIDResponse { 205 + enum ResolveMiniDocResponse { 193 206 /// Identity resolved 194 207 #[oai(status = 200)] 195 208 Ok(Json<MiniDocResponseObject>), 196 209 /// Bad request or identity not resolved 197 210 #[oai(status = 400)] 198 211 BadRequest(XrpcError), 212 + } 213 + 214 + #[derive(Object)] 215 + #[oai(example = true)] 216 + struct ServiceResponseObject { 217 + /// The service endpoint URL, if found 218 + endpoint: String, 219 + } 220 + impl Example for ServiceResponseObject { 221 + fn example() -> Self { 222 + Self { 223 + endpoint: "https://example.com".to_string(), 224 + } 225 + } 226 + } 227 + 228 + #[derive(ApiResponse)] 229 + #[oai(bad_request_handler = "bad_request_handler_resolve_service")] 230 + enum ResolveServiceResponse { 231 + /// Service resolved 232 + #[oai(status = 200)] 233 + Ok(Json<ServiceResponseObject>), 234 + /// Bad request or service not resolved 235 + #[oai(status = 400)] 236 + BadRequest(XrpcError) 199 237 } 200 238 201 239 #[derive(Object)] ··· 578 616 /// Handle or DID to resolve 579 617 #[oai(example = "example_handle")] 580 618 Query(identifier): Query<String>, 581 - ) -> ResolveMiniIDResponse { 619 + ) -> ResolveMiniDocResponse { 582 620 self.resolve_mini_id(Query(identifier)).await 583 621 } 584 622 ··· 596 634 /// Handle or DID to resolve 597 635 #[oai(example = "example_handle")] 598 636 Query(identifier): Query<String>, 599 - ) -> ResolveMiniIDResponse { 637 + ) -> ResolveMiniDocResponse { 600 638 Self::resolve_mini_doc_impl(&identifier, self.identity.clone()).await 601 639 } 602 640 603 - async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniIDResponse { 641 + async fn resolve_mini_doc_impl(identifier: &str, identity: Identity) -> ResolveMiniDocResponse { 604 642 let invalid = |reason: &'static str| { 605 - ResolveMiniIDResponse::BadRequest(xrpc_error("InvalidRequest", reason)) 643 + ResolveMiniDocResponse::BadRequest(xrpc_error("InvalidRequest", reason)) 606 644 }; 607 645 608 646 let mut unverified_handle = None; ··· 667 705 } 668 706 }; 669 707 670 - ResolveMiniIDResponse::Ok(Json(MiniDocResponseObject { 708 + ResolveMiniDocResponse::Ok(Json(MiniDocResponseObject { 671 709 did: did.to_string(), 672 710 handle, 673 711 pds: partial_doc.pds, ··· 675 713 })) 676 714 } 677 715 716 + /// com.bad-example.identity.resolveService 717 + /// 718 + /// resolve an atproto service did + id to its http endpoint 719 + #[oai( 720 + path = "/com.bad-example.identity.resolveService", 721 + method = "get", 722 + tag = "ApiTags::Custom" 723 + )] 724 + async fn resolve_service( 725 + &self, 726 + /// the service's did 727 + #[oai(example = "example_service_did")] 728 + Query(did): Query<String>, 729 + /// id fragment, starting with '#' 730 + /// 731 + /// must be url-encoded! 732 + #[oai(example = "example_id_fragment")] 733 + Query(id): Query<String>, 734 + /// optionally, the exact service type to filter 735 + /// 736 + /// resolving a pds requires matching the type as well as id. service 737 + /// proxying ignores the type. 738 + Query(r#type): Query<Option<String>>, 739 + ) -> ResolveServiceResponse { 740 + let Ok(did) = Did::new(did) else { 741 + return ResolveServiceResponse::BadRequest(xrpc_error("InvalidRequest", "could not parse 'did' into a DID")); 742 + }; 743 + let identity = self.identity.clone(); 744 + Self::resolve_service_impl(&did, &id, r#type.as_deref(), identity).await 745 + } 746 + 747 + async fn resolve_service_impl( 748 + did: &Did, 749 + id_fragment: &str, 750 + service_type: Option<&str>, 751 + identity: Identity 752 + ) -> ResolveServiceResponse { 753 + let invalid = |reason: &'static str| { 754 + ResolveServiceResponse::BadRequest(xrpc_error("InvalidRequest", reason)) 755 + }; 756 + let Ok(service_mini_doc) = identity.did_to_mini_service_doc(&did).await else { 757 + return invalid("Failed to get DID doc"); 758 + }; 759 + let Some(service_mini_doc) = service_mini_doc else { 760 + return invalid("Failed to find DID doc"); 761 + }; 762 + 763 + let Some(matching) = service_mini_doc.get(id_fragment, service_type) else { 764 + return invalid("failed to match identity (and maybe type)"); 765 + }; 766 + 767 + ResolveServiceResponse::Ok(Json(ServiceResponseObject { 768 + endpoint: matching.endpoint.clone(), 769 + })) 770 + } 771 + 678 772 /// com.bad-example.proxy.hydrateQueryResponse 679 773 /// 680 774 /// > [!important] ··· 792 886 let identity = self.identity.clone(); 793 887 tokio::task::spawn(async move { 794 888 let res = match Self::resolve_mini_doc_impl(&id, identity).await { 795 - ResolveMiniIDResponse::Ok(Json(mini_doc)) => Hydration::Found(ProxyHydrationIdentifierFound { 889 + ResolveMiniDocResponse::Ok(Json(mini_doc)) => Hydration::Found(ProxyHydrationIdentifierFound { 796 890 mini_doc 797 891 }), 798 - ResolveMiniIDResponse::BadRequest(e) => { 892 + ResolveMiniDocResponse::BadRequest(e) => { 799 893 log::warn!("minidoc fail: {:?}", e.0); 800 894 Hydration::Error(ProxyHydrationError { 801 895 reason: "failed to resolve mini doc".to_string(),