Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

include headers, timeout hydration, measure times

+79 -27
+2
slingshot/src/error.rs
··· 101 101 #[error(transparent)] 102 102 ReqwestError(#[from] reqwest::Error), 103 103 #[error(transparent)] 104 + InvalidHeader(#[from] reqwest::header::InvalidHeaderValue), 105 + #[error(transparent)] 104 106 IdentityError(#[from] IdentityError), 105 107 #[error("upstream service could not be resolved")] 106 108 ServiceNotFound,
+1 -1
slingshot/src/main.rs
··· 152 152 log::info!("identity service ready."); 153 153 154 154 let repo = Repo::new(identity.clone()); 155 - let proxy = Proxy::new(repo.clone(), identity.clone()); 155 + let proxy = Proxy::new(identity.clone()); 156 156 157 157 let identity_for_server = identity.clone(); 158 158 let server_shutdown = shutdown.clone();
+29 -14
slingshot/src/proxy.rs
··· 1 - use crate::{Identity, Repo, error::ProxyError, server::HydrationSource}; 1 + use crate::{Identity, error::ProxyError, server::HydrationSource}; 2 2 use atrium_api::types::string::{Did, Nsid}; 3 3 use reqwest::Client; 4 4 use serde_json::{Map, Value}; ··· 70 70 71 71 #[derive(Clone)] 72 72 pub struct Proxy { 73 - repo: Repo, 74 73 identity: Identity, 75 74 client: Client, 76 75 } 77 76 78 77 impl Proxy { 79 - pub fn new(repo: Repo, identity: Identity) -> Self { 78 + pub fn new(identity: Identity) -> Self { 80 79 let client = Client::builder() 81 80 .user_agent(format!( 82 81 "microcosm slingshot v{} (contact: @bad-example.com)", ··· 86 85 .timeout(Duration::from_secs(6)) 87 86 .build() 88 87 .unwrap(); 89 - Self { 90 - repo, 91 - client, 92 - identity, 93 - } 88 + Self { client, identity } 94 89 } 95 90 96 91 pub async fn proxy( ··· 98 93 service_did: &Did, 99 94 service_id: &str, 100 95 xrpc: &Nsid, 96 + authorization: Option<&str>, 97 + atproto_accept_labelers: Option<&str>, 101 98 params: Option<Map<String, Value>>, 102 99 ) -> Result<Value, ProxyError> { 103 100 let mut upstream: Url = self ··· 136 133 } 137 134 } 138 135 139 - // TODO: other headers to proxy 140 - Ok(self 136 + // TODO i mean maybe we should look for headers also in our headers but not obviously 137 + let mut headers = reqwest::header::HeaderMap::new(); 138 + if let Some(auth) = authorization { 139 + headers.insert("Authorization", auth.try_into()?); 140 + } 141 + if let Some(aal) = atproto_accept_labelers { 142 + headers.insert("atproto-accept-labelers", aal.try_into()?); 143 + } 144 + 145 + let t0 = std::time::Instant::now(); 146 + let res = self 141 147 .client 142 148 .get(upstream) 149 + .headers(headers) 143 150 .send() 144 - .await? 145 - .error_for_status()? 146 - .json() 147 - .await?) 151 + .await 152 + .and_then(|r| r.error_for_status()); 153 + 154 + if res.is_ok() { 155 + metrics::histogram!("slingshot_proxy_upstream_request", "success" => "true") 156 + .record(t0.elapsed()); 157 + } else { 158 + metrics::histogram!("slingshot_proxy_upstream_request", "success" => "false") 159 + .record(t0.elapsed()); 160 + } 161 + 162 + Ok(res?.json().await?) 148 163 } 149 164 } 150 165
+47 -12
slingshot/src/server.rs
··· 320 320 xrpc: String, 321 321 /// The destination service the request will be forwarded to 322 322 atproto_proxy: String, 323 + /// An optional auth token to pass on 324 + /// 325 + /// the `aud` field must match the upstream atproto_proxy service 326 + authorization: Option<String>, 327 + /// An optional set of labelers to request be applied by the upstream 328 + atproto_accept_labelers: Option<String>, 323 329 /// The `params` for the destination service XRPC endpoint 324 330 /// 325 331 /// Currently this will be passed along unchecked, but a future version of ··· 335 341 Self { 336 342 xrpc: "app.bsky.feed.getFeedSkeleton".to_string(), 337 343 atproto_proxy: "did:web:blue.mackuba.eu#bsky_fg".to_string(), 344 + authorization: None, 345 + atproto_accept_labelers: None, 338 346 params: Some(serde_json::json!({ 339 347 "feed": "at://did:plc:oio4hkxaop4ao4wz2pp3f4cr/app.bsky.feed.generator/atproto", 340 348 })), ··· 822 830 823 831 match self 824 832 .proxy 825 - .proxy(&service_did, &format!("#{id_fragment}"), &xrpc, params) 833 + .proxy( 834 + &service_did, 835 + &format!("#{id_fragment}"), 836 + &xrpc, 837 + payload.authorization.as_deref(), 838 + payload.atproto_accept_labelers.as_deref(), 839 + params, 840 + ) 826 841 .await 827 842 { 828 843 Ok(skeleton) => { ··· 845 860 } 846 861 847 862 let (tx, mut rx) = mpsc::channel(1); 863 + 864 + let t0 = Instant::now(); 848 865 849 866 for link in links { 850 867 match link { ··· 914 931 }) 915 932 } 916 933 }; 917 - tx.send(GetThing::Record(uri, res)).await 934 + if tx.send(GetThing::Record(uri, res)).await.is_ok() { 935 + metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "true").increment(1); 936 + } else { 937 + metrics::counter!("slingshot_hydrated_one", "type" => "record", "ontime" => "false").increment(1); 938 + } 918 939 }); 919 940 } 920 941 MatchedRef::Identifier(id) => { ··· 946 967 }) 947 968 } 948 969 }; 949 - tx.send(GetThing::Identifier(id, res)).await 970 + if tx.send(GetThing::Identifier(id, res)).await.is_ok() { 971 + metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "true").increment(1); 972 + } else { 973 + metrics::counter!("slingshot_hydrated_one", "type" => "identity", "ontime" => "false").increment(1); 974 + } 950 975 }); 951 976 } 952 977 } ··· 955 980 // (we shoudl be doing a timeout...) 956 981 drop(tx); 957 982 958 - while let Some(hydration) = rx.recv().await { 959 - match hydration { 960 - GetThing::Record(uri, h) => { 961 - records.insert(uri, h); 962 - } 963 - GetThing::Identifier(uri, md) => { 964 - identifiers.insert(uri, md); 965 - } 966 - }; 983 + let deadline = t0 + std::time::Duration::from_secs_f64(1.6); 984 + let res = tokio::time::timeout_at(deadline.into(), async { 985 + while let Some(hydration) = rx.recv().await { 986 + match hydration { 987 + GetThing::Record(uri, h) => { 988 + records.insert(uri, h); 989 + } 990 + GetThing::Identifier(uri, md) => { 991 + identifiers.insert(uri, md); 992 + } 993 + }; 994 + } 995 + }) 996 + .await; 997 + 998 + if res.is_ok() { 999 + metrics::histogram!("slingshot_hydration_all_completed").record(t0.elapsed()); 1000 + } else { 1001 + metrics::counter!("slingshot_hydration_cut_off").increment(1); 967 1002 } 968 1003 969 1004 ProxyHydrateResponse::Ok(Json(ProxyHydrateResponseObject {