An encrypted personal cloud built on the AT Protocol.

Split client/xrpc.rs into xrpc/ module directory

Break the monolithic xrpc.rs (453 lines) into focused submodules:
xrpc/mod.rs (struct, types, check_response, private helpers),
auth.rs (login, refresh_session), blobs.rs (upload_blob, get_blob),
repo.rs (create/put/get/list/delete record). Test file moved into
module directory.

+478 -452
-452
crates/opake-core/src/client/xrpc.rs
··· 1 - // XRPC client for talking to a PDS. 2 - // 3 - // Protocol logic for authenticated XRPC calls, generic over transport. 4 - // Handles session management, token refresh, and the standard atproto 5 - // repo operations (create/get/list/delete records, upload/get blobs). 6 - 7 - use log::{debug, info, warn}; 8 - use serde::{Deserialize, Serialize}; 9 - 10 - use super::transport::*; 11 - use crate::atproto::BlobRef; 12 - use crate::error::Error; 13 - 14 - // --------------------------------------------------------------------------- 15 - // Types 16 - // --------------------------------------------------------------------------- 17 - 18 - /// An authenticated session with a PDS. 19 - #[derive(Debug, Clone, Serialize, Deserialize)] 20 - #[serde(rename_all = "camelCase")] 21 - pub struct Session { 22 - pub did: String, 23 - pub handle: String, 24 - pub access_jwt: String, 25 - pub refresh_jwt: String, 26 - } 27 - 28 - /// Reference to a created record. 29 - #[derive(Debug, Clone, Serialize, Deserialize)] 30 - pub struct RecordRef { 31 - pub uri: String, 32 - pub cid: String, 33 - } 34 - 35 - /// A page of records from `listRecords`. 36 - #[derive(Debug, Clone, Serialize, Deserialize)] 37 - pub struct RecordPage { 38 - pub records: Vec<RecordEntry>, 39 - pub cursor: Option<String>, 40 - } 41 - 42 - #[derive(Debug, Clone, Serialize, Deserialize)] 43 - pub struct RecordEntry { 44 - pub uri: String, 45 - pub cid: String, 46 - pub value: serde_json::Value, 47 - } 48 - 49 - // --------------------------------------------------------------------------- 50 - // Response checking — used by both XrpcClient and the DID/public functions 51 - // --------------------------------------------------------------------------- 52 - 53 - /// Check an XRPC response for errors. Non-2xx responses are parsed as 54 - /// XRPC error bodies (`{"error":"...", "message":"..."}`) when possible, 55 - /// falling back to the raw status code. 56 - pub fn check_response(response: &HttpResponse) -> Result<(), Error> { 57 - if (200..300).contains(&response.status) { 58 - return Ok(()); 59 - } 60 - 61 - #[derive(Deserialize)] 62 - struct XrpcError { 63 - error: Option<String>, 64 - message: Option<String>, 65 - } 66 - 67 - let message = serde_json::from_slice::<XrpcError>(&response.body) 68 - .ok() 69 - .and_then(|e| match (e.error, e.message) { 70 - (Some(code), Some(msg)) => Some(format!("{code}: {msg}")), 71 - (Some(code), None) => Some(code), 72 - (None, Some(msg)) => Some(msg), 73 - (None, None) => None, 74 - }) 75 - .unwrap_or_else(|| format!("HTTP {}", response.status)); 76 - 77 - if response.status == 404 { 78 - Err(Error::NotFound(message)) 79 - } else { 80 - Err(Error::Xrpc { 81 - status: response.status, 82 - message, 83 - }) 84 - } 85 - } 86 - 87 - // --------------------------------------------------------------------------- 88 - // XrpcClient 89 - // --------------------------------------------------------------------------- 90 - 91 - pub struct XrpcClient<T: Transport> { 92 - transport: T, 93 - base_url: String, 94 - session: Option<Session>, 95 - session_refreshed: bool, 96 - } 97 - 98 - impl<T: Transport> XrpcClient<T> { 99 - pub fn new(transport: T, base_url: String) -> Self { 100 - Self { 101 - transport, 102 - base_url, 103 - session: None, 104 - session_refreshed: false, 105 - } 106 - } 107 - 108 - pub fn with_session(transport: T, base_url: String, session: Session) -> Self { 109 - Self { 110 - transport, 111 - base_url, 112 - session: Some(session), 113 - session_refreshed: false, 114 - } 115 - } 116 - 117 - pub fn session(&self) -> Option<&Session> { 118 - self.session.as_ref() 119 - } 120 - 121 - /// Whether the session was refreshed during this client's lifetime. 122 - /// The CLI uses this to persist updated tokens to disk. 123 - pub fn session_refreshed(&self) -> bool { 124 - self.session_refreshed 125 - } 126 - 127 - /// Authenticate via `com.atproto.server.createSession`. 128 - pub async fn login(&mut self, identifier: &str, password: &str) -> Result<&Session, Error> { 129 - info!("authenticating as {} against {}", identifier, self.base_url); 130 - 131 - let body = serde_json::json!({ 132 - "identifier": identifier, 133 - "password": password, 134 - }); 135 - 136 - let response = self 137 - .transport 138 - .send(HttpRequest { 139 - method: HttpMethod::Post, 140 - url: format!("{}/xrpc/com.atproto.server.createSession", self.base_url), 141 - headers: vec![("Content-Type".into(), "application/json".into())], 142 - body: Some(RequestBody::Json(body)), 143 - }) 144 - .await?; 145 - 146 - if response.status != 200 { 147 - warn!("login failed with HTTP {}", response.status); 148 - return Err(Error::Auth(format!( 149 - "login failed (HTTP {})", 150 - response.status 151 - ))); 152 - } 153 - 154 - let session: Session = serde_json::from_slice(&response.body)?; 155 - info!("authenticated as {} ({})", session.handle, session.did); 156 - self.session = Some(session); 157 - Ok(self.session.as_ref().unwrap()) 158 - } 159 - 160 - fn auth_header(&self) -> Result<(String, String), Error> { 161 - let session = self 162 - .session 163 - .as_ref() 164 - .ok_or_else(|| Error::Auth("not logged in".into()))?; 165 - Ok(( 166 - "Authorization".into(), 167 - format!("Bearer {}", session.access_jwt), 168 - )) 169 - } 170 - 171 - fn did(&self) -> Result<&str, Error> { 172 - self.session 173 - .as_ref() 174 - .map(|s| s.did.as_str()) 175 - .ok_or_else(|| Error::Auth("not logged in".into())) 176 - } 177 - 178 - /// Replace the Authorization header in a request with the current access token. 179 - fn replace_auth_header(&self, mut request: HttpRequest) -> Result<HttpRequest, Error> { 180 - let (key, value) = self.auth_header()?; 181 - if let Some(h) = request.headers.iter_mut().find(|(k, _)| k == &key) { 182 - h.1 = value; 183 - } 184 - Ok(request) 185 - } 186 - 187 - /// Check whether a PDS response is an expired-token error. 188 - fn is_expired_token(response: &HttpResponse) -> bool { 189 - if response.status != 400 { 190 - return false; 191 - } 192 - 193 - #[derive(Deserialize)] 194 - struct Body { 195 - error: Option<String>, 196 - } 197 - 198 - serde_json::from_slice::<Body>(&response.body) 199 - .ok() 200 - .and_then(|b| b.error) 201 - .is_some_and(|e| e == "ExpiredToken") 202 - } 203 - 204 - /// Refresh the session using the stored refresh_jwt. 205 - async fn refresh_session(&mut self) -> Result<(), Error> { 206 - let refresh_jwt = self 207 - .session 208 - .as_ref() 209 - .map(|s| s.refresh_jwt.clone()) 210 - .ok_or_else(|| Error::Auth("not logged in".into()))?; 211 - 212 - info!("access token expired, refreshing session"); 213 - 214 - let response = self 215 - .transport 216 - .send(HttpRequest { 217 - method: HttpMethod::Post, 218 - url: format!("{}/xrpc/com.atproto.server.refreshSession", self.base_url), 219 - headers: vec![("Authorization".into(), format!("Bearer {}", refresh_jwt))], 220 - body: None, 221 - }) 222 - .await?; 223 - 224 - if response.status != 200 { 225 - warn!("session refresh failed with HTTP {}", response.status); 226 - return Err(Error::Auth(format!( 227 - "session refresh failed (HTTP {}) — run `opake login` again", 228 - response.status 229 - ))); 230 - } 231 - 232 - let new_session: Session = serde_json::from_slice(&response.body)?; 233 - info!("session refreshed for {}", new_session.handle); 234 - self.session = Some(new_session); 235 - self.session_refreshed = true; 236 - 237 - Ok(()) 238 - } 239 - 240 - /// Send a request and check the response status. Every XRPC method except 241 - /// `login` (which has custom error handling) goes through here. 242 - /// 243 - /// If the PDS returns `ExpiredToken`, the session is automatically refreshed 244 - /// and the request is retried once with the new access token. 245 - async fn send_checked(&mut self, request: HttpRequest) -> Result<HttpResponse, Error> { 246 - let mut response = self.transport.send(request.clone()).await?; 247 - 248 - if Self::is_expired_token(&response) { 249 - self.refresh_session().await?; 250 - let retried = self.replace_auth_header(request)?; 251 - response = self.transport.send(retried).await?; 252 - } 253 - 254 - check_response(&response)?; 255 - Ok(response) 256 - } 257 - 258 - /// Upload raw bytes as a blob via `com.atproto.repo.uploadBlob`. 259 - pub async fn upload_blob(&mut self, data: Vec<u8>, mime_type: &str) -> Result<BlobRef, Error> { 260 - debug!("uploading blob ({} bytes, {})", data.len(), mime_type); 261 - let auth = self.auth_header()?; 262 - 263 - let response = self 264 - .send_checked(HttpRequest { 265 - method: HttpMethod::Post, 266 - url: format!("{}/xrpc/com.atproto.repo.uploadBlob", self.base_url), 267 - headers: vec![auth, ("Content-Type".into(), mime_type.into())], 268 - body: Some(RequestBody::Bytes { 269 - data, 270 - content_type: mime_type.into(), 271 - }), 272 - }) 273 - .await?; 274 - 275 - #[derive(Deserialize)] 276 - struct UploadResponse { 277 - blob: BlobRef, 278 - } 279 - 280 - let parsed: UploadResponse = serde_json::from_slice(&response.body)?; 281 - Ok(parsed.blob) 282 - } 283 - 284 - /// Fetch a blob by DID + CID via `com.atproto.sync.getBlob`. 285 - pub async fn get_blob(&mut self, did: &str, cid: &str) -> Result<Vec<u8>, Error> { 286 - debug!("fetching blob did={} cid={}", did, cid); 287 - let auth = self.auth_header()?; 288 - let url = format!( 289 - "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}", 290 - self.base_url, did, cid, 291 - ); 292 - 293 - let response = self 294 - .send_checked(HttpRequest { 295 - method: HttpMethod::Get, 296 - url, 297 - headers: vec![auth], 298 - body: None, 299 - }) 300 - .await?; 301 - 302 - Ok(response.body) 303 - } 304 - 305 - /// Create a record via `com.atproto.repo.createRecord`. 306 - pub async fn create_record<R: Serialize>( 307 - &mut self, 308 - collection: &str, 309 - record: &R, 310 - ) -> Result<RecordRef, Error> { 311 - debug!("creating record in {}", collection); 312 - let auth = self.auth_header()?; 313 - let did = self.did()?; 314 - 315 - let body = serde_json::json!({ 316 - "repo": did, 317 - "collection": collection, 318 - "record": record, 319 - }); 320 - 321 - let response = self 322 - .send_checked(HttpRequest { 323 - method: HttpMethod::Post, 324 - url: format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url), 325 - headers: vec![auth, ("Content-Type".into(), "application/json".into())], 326 - body: Some(RequestBody::Json(body)), 327 - }) 328 - .await?; 329 - 330 - Ok(serde_json::from_slice(&response.body)?) 331 - } 332 - 333 - /// Upsert a record with an explicit rkey via `com.atproto.repo.putRecord`. 334 - /// 335 - /// Idempotent — creates or overwrites the record at `collection/rkey`. 336 - /// Used for singleton records like `app.opake.cloud.publicKey/self`. 337 - pub async fn put_record<R: Serialize>( 338 - &mut self, 339 - collection: &str, 340 - rkey: &str, 341 - record: &R, 342 - ) -> Result<RecordRef, Error> { 343 - debug!("putting record {}/{}", collection, rkey); 344 - let auth = self.auth_header()?; 345 - let did = self.did()?; 346 - 347 - let body = serde_json::json!({ 348 - "repo": did, 349 - "collection": collection, 350 - "rkey": rkey, 351 - "record": record, 352 - }); 353 - 354 - let response = self 355 - .send_checked(HttpRequest { 356 - method: HttpMethod::Post, 357 - url: format!("{}/xrpc/com.atproto.repo.putRecord", self.base_url), 358 - headers: vec![auth, ("Content-Type".into(), "application/json".into())], 359 - body: Some(RequestBody::Json(body)), 360 - }) 361 - .await?; 362 - 363 - Ok(serde_json::from_slice(&response.body)?) 364 - } 365 - 366 - /// Fetch a single record via `com.atproto.repo.getRecord`. 367 - pub async fn get_record( 368 - &mut self, 369 - did: &str, 370 - collection: &str, 371 - rkey: &str, 372 - ) -> Result<RecordEntry, Error> { 373 - debug!("getting record {}/{}/{}", did, collection, rkey); 374 - let auth = self.auth_header()?; 375 - let url = format!( 376 - "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 377 - self.base_url, did, collection, rkey, 378 - ); 379 - 380 - let response = self 381 - .send_checked(HttpRequest { 382 - method: HttpMethod::Get, 383 - url, 384 - headers: vec![auth], 385 - body: None, 386 - }) 387 - .await?; 388 - 389 - Ok(serde_json::from_slice(&response.body)?) 390 - } 391 - 392 - /// List records in a collection via `com.atproto.repo.listRecords`. 393 - pub async fn list_records( 394 - &mut self, 395 - collection: &str, 396 - limit: Option<u32>, 397 - cursor: Option<&str>, 398 - ) -> Result<RecordPage, Error> { 399 - debug!("listing records in {}", collection); 400 - let auth = self.auth_header()?; 401 - let did = self.did()?; 402 - 403 - let mut url = format!( 404 - "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection={}", 405 - self.base_url, did, collection, 406 - ); 407 - if let Some(limit) = limit { 408 - url.push_str(&format!("&limit={}", limit)); 409 - } 410 - if let Some(cursor) = cursor { 411 - url.push_str(&format!("&cursor={}", cursor)); 412 - } 413 - 414 - let response = self 415 - .send_checked(HttpRequest { 416 - method: HttpMethod::Get, 417 - url, 418 - headers: vec![auth], 419 - body: None, 420 - }) 421 - .await?; 422 - 423 - Ok(serde_json::from_slice(&response.body)?) 424 - } 425 - 426 - /// Delete a record via `com.atproto.repo.deleteRecord`. 427 - pub async fn delete_record(&mut self, collection: &str, rkey: &str) -> Result<(), Error> { 428 - debug!("deleting record {}/{}", collection, rkey); 429 - let auth = self.auth_header()?; 430 - let did = self.did()?; 431 - 432 - let body = serde_json::json!({ 433 - "repo": did, 434 - "collection": collection, 435 - "rkey": rkey, 436 - }); 437 - 438 - self.send_checked(HttpRequest { 439 - method: HttpMethod::Post, 440 - url: format!("{}/xrpc/com.atproto.repo.deleteRecord", self.base_url), 441 - headers: vec![auth, ("Content-Type".into(), "application/json".into())], 442 - body: Some(RequestBody::Json(body)), 443 - }) 444 - .await?; 445 - 446 - Ok(()) 447 - } 448 - } 449 - 450 - #[cfg(test)] 451 - #[path = "xrpc_tests.rs"] 452 - mod tests;
+76
crates/opake-core/src/client/xrpc/auth.rs
··· 1 + use log::{info, warn}; 2 + 3 + use super::{Session, Transport}; 4 + use crate::client::transport::*; 5 + use crate::error::Error; 6 + 7 + impl<T: Transport> super::XrpcClient<T> { 8 + /// Authenticate via `com.atproto.server.createSession`. 9 + pub async fn login(&mut self, identifier: &str, password: &str) -> Result<&Session, Error> { 10 + info!("authenticating as {} against {}", identifier, self.base_url); 11 + 12 + let body = serde_json::json!({ 13 + "identifier": identifier, 14 + "password": password, 15 + }); 16 + 17 + let response = self 18 + .transport 19 + .send(HttpRequest { 20 + method: HttpMethod::Post, 21 + url: format!("{}/xrpc/com.atproto.server.createSession", self.base_url), 22 + headers: vec![("Content-Type".into(), "application/json".into())], 23 + body: Some(RequestBody::Json(body)), 24 + }) 25 + .await?; 26 + 27 + if response.status != 200 { 28 + warn!("login failed with HTTP {}", response.status); 29 + return Err(Error::Auth(format!( 30 + "login failed (HTTP {})", 31 + response.status 32 + ))); 33 + } 34 + 35 + let session: Session = serde_json::from_slice(&response.body)?; 36 + info!("authenticated as {} ({})", session.handle, session.did); 37 + self.session = Some(session); 38 + Ok(self.session.as_ref().unwrap()) 39 + } 40 + 41 + /// Refresh the session using the stored refresh_jwt. 42 + pub(crate) async fn refresh_session(&mut self) -> Result<(), Error> { 43 + let refresh_jwt = self 44 + .session 45 + .as_ref() 46 + .map(|s| s.refresh_jwt.clone()) 47 + .ok_or_else(|| Error::Auth("not logged in".into()))?; 48 + 49 + info!("access token expired, refreshing session"); 50 + 51 + let response = self 52 + .transport 53 + .send(HttpRequest { 54 + method: HttpMethod::Post, 55 + url: format!("{}/xrpc/com.atproto.server.refreshSession", self.base_url), 56 + headers: vec![("Authorization".into(), format!("Bearer {}", refresh_jwt))], 57 + body: None, 58 + }) 59 + .await?; 60 + 61 + if response.status != 200 { 62 + warn!("session refresh failed with HTTP {}", response.status); 63 + return Err(Error::Auth(format!( 64 + "session refresh failed (HTTP {}) — run `opake login` again", 65 + response.status 66 + ))); 67 + } 68 + 69 + let new_session: Session = serde_json::from_slice(&response.body)?; 70 + info!("session refreshed for {}", new_session.handle); 71 + self.session = Some(new_session); 72 + self.session_refreshed = true; 73 + 74 + Ok(()) 75 + } 76 + }
+56
crates/opake-core/src/client/xrpc/blobs.rs
··· 1 + use log::debug; 2 + use serde::Deserialize; 3 + 4 + use super::Transport; 5 + use crate::atproto::BlobRef; 6 + use crate::client::transport::*; 7 + use crate::error::Error; 8 + 9 + impl<T: Transport> super::XrpcClient<T> { 10 + /// Upload raw bytes as a blob via `com.atproto.repo.uploadBlob`. 11 + pub async fn upload_blob(&mut self, data: Vec<u8>, mime_type: &str) -> Result<BlobRef, Error> { 12 + debug!("uploading blob ({} bytes, {})", data.len(), mime_type); 13 + let auth = self.auth_header()?; 14 + 15 + let response = self 16 + .send_checked(HttpRequest { 17 + method: HttpMethod::Post, 18 + url: format!("{}/xrpc/com.atproto.repo.uploadBlob", self.base_url), 19 + headers: vec![auth, ("Content-Type".into(), mime_type.into())], 20 + body: Some(RequestBody::Bytes { 21 + data, 22 + content_type: mime_type.into(), 23 + }), 24 + }) 25 + .await?; 26 + 27 + #[derive(Deserialize)] 28 + struct UploadResponse { 29 + blob: BlobRef, 30 + } 31 + 32 + let parsed: UploadResponse = serde_json::from_slice(&response.body)?; 33 + Ok(parsed.blob) 34 + } 35 + 36 + /// Fetch a blob by DID + CID via `com.atproto.sync.getBlob`. 37 + pub async fn get_blob(&mut self, did: &str, cid: &str) -> Result<Vec<u8>, Error> { 38 + debug!("fetching blob did={} cid={}", did, cid); 39 + let auth = self.auth_header()?; 40 + let url = format!( 41 + "{}/xrpc/com.atproto.sync.getBlob?did={}&cid={}", 42 + self.base_url, did, cid, 43 + ); 44 + 45 + let response = self 46 + .send_checked(HttpRequest { 47 + method: HttpMethod::Get, 48 + url, 49 + headers: vec![auth], 50 + body: None, 51 + }) 52 + .await?; 53 + 54 + Ok(response.body) 55 + } 56 + }
+194
crates/opake-core/src/client/xrpc/mod.rs
··· 1 + // XRPC client for talking to a PDS. 2 + // 3 + // Protocol logic for authenticated XRPC calls, generic over transport. 4 + // Handles session management, token refresh, and the standard atproto 5 + // repo operations (create/get/list/delete records, upload/get blobs). 6 + 7 + mod auth; 8 + mod blobs; 9 + mod repo; 10 + 11 + use serde::{Deserialize, Serialize}; 12 + 13 + use super::transport::*; 14 + use crate::error::Error; 15 + 16 + // --------------------------------------------------------------------------- 17 + // Types 18 + // --------------------------------------------------------------------------- 19 + 20 + /// An authenticated session with a PDS. 21 + #[derive(Debug, Clone, Serialize, Deserialize)] 22 + #[serde(rename_all = "camelCase")] 23 + pub struct Session { 24 + pub did: String, 25 + pub handle: String, 26 + pub access_jwt: String, 27 + pub refresh_jwt: String, 28 + } 29 + 30 + /// Reference to a created record. 31 + #[derive(Debug, Clone, Serialize, Deserialize)] 32 + pub struct RecordRef { 33 + pub uri: String, 34 + pub cid: String, 35 + } 36 + 37 + /// A page of records from `listRecords`. 38 + #[derive(Debug, Clone, Serialize, Deserialize)] 39 + pub struct RecordPage { 40 + pub records: Vec<RecordEntry>, 41 + pub cursor: Option<String>, 42 + } 43 + 44 + #[derive(Debug, Clone, Serialize, Deserialize)] 45 + pub struct RecordEntry { 46 + pub uri: String, 47 + pub cid: String, 48 + pub value: serde_json::Value, 49 + } 50 + 51 + // --------------------------------------------------------------------------- 52 + // Response checking — used by both XrpcClient and the DID/public functions 53 + // --------------------------------------------------------------------------- 54 + 55 + /// Check an XRPC response for errors. Non-2xx responses are parsed as 56 + /// XRPC error bodies (`{"error":"...", "message":"..."}`) when possible, 57 + /// falling back to the raw status code. 58 + pub fn check_response(response: &HttpResponse) -> Result<(), Error> { 59 + if (200..300).contains(&response.status) { 60 + return Ok(()); 61 + } 62 + 63 + #[derive(Deserialize)] 64 + struct XrpcError { 65 + error: Option<String>, 66 + message: Option<String>, 67 + } 68 + 69 + let message = serde_json::from_slice::<XrpcError>(&response.body) 70 + .ok() 71 + .and_then(|e| match (e.error, e.message) { 72 + (Some(code), Some(msg)) => Some(format!("{code}: {msg}")), 73 + (Some(code), None) => Some(code), 74 + (None, Some(msg)) => Some(msg), 75 + (None, None) => None, 76 + }) 77 + .unwrap_or_else(|| format!("HTTP {}", response.status)); 78 + 79 + if response.status == 404 { 80 + Err(Error::NotFound(message)) 81 + } else { 82 + Err(Error::Xrpc { 83 + status: response.status, 84 + message, 85 + }) 86 + } 87 + } 88 + 89 + // --------------------------------------------------------------------------- 90 + // XrpcClient 91 + // --------------------------------------------------------------------------- 92 + 93 + pub struct XrpcClient<T: Transport> { 94 + transport: T, 95 + base_url: String, 96 + session: Option<Session>, 97 + session_refreshed: bool, 98 + } 99 + 100 + impl<T: Transport> XrpcClient<T> { 101 + pub fn new(transport: T, base_url: String) -> Self { 102 + Self { 103 + transport, 104 + base_url, 105 + session: None, 106 + session_refreshed: false, 107 + } 108 + } 109 + 110 + pub fn with_session(transport: T, base_url: String, session: Session) -> Self { 111 + Self { 112 + transport, 113 + base_url, 114 + session: Some(session), 115 + session_refreshed: false, 116 + } 117 + } 118 + 119 + pub fn session(&self) -> Option<&Session> { 120 + self.session.as_ref() 121 + } 122 + 123 + /// Whether the session was refreshed during this client's lifetime. 124 + /// The CLI uses this to persist updated tokens to disk. 125 + pub fn session_refreshed(&self) -> bool { 126 + self.session_refreshed 127 + } 128 + 129 + fn auth_header(&self) -> Result<(String, String), Error> { 130 + let session = self 131 + .session 132 + .as_ref() 133 + .ok_or_else(|| Error::Auth("not logged in".into()))?; 134 + Ok(( 135 + "Authorization".into(), 136 + format!("Bearer {}", session.access_jwt), 137 + )) 138 + } 139 + 140 + fn did(&self) -> Result<&str, Error> { 141 + self.session 142 + .as_ref() 143 + .map(|s| s.did.as_str()) 144 + .ok_or_else(|| Error::Auth("not logged in".into())) 145 + } 146 + 147 + /// Replace the Authorization header in a request with the current access token. 148 + fn replace_auth_header(&self, mut request: HttpRequest) -> Result<HttpRequest, Error> { 149 + let (key, value) = self.auth_header()?; 150 + if let Some(h) = request.headers.iter_mut().find(|(k, _)| k == &key) { 151 + h.1 = value; 152 + } 153 + Ok(request) 154 + } 155 + 156 + /// Check whether a PDS response is an expired-token error. 157 + fn is_expired_token(response: &HttpResponse) -> bool { 158 + if response.status != 400 { 159 + return false; 160 + } 161 + 162 + #[derive(Deserialize)] 163 + struct Body { 164 + error: Option<String>, 165 + } 166 + 167 + serde_json::from_slice::<Body>(&response.body) 168 + .ok() 169 + .and_then(|b| b.error) 170 + .is_some_and(|e| e == "ExpiredToken") 171 + } 172 + 173 + /// Send a request and check the response status. Every XRPC method except 174 + /// `login` (which has custom error handling) goes through here. 175 + /// 176 + /// If the PDS returns `ExpiredToken`, the session is automatically refreshed 177 + /// and the request is retried once with the new access token. 178 + async fn send_checked(&mut self, request: HttpRequest) -> Result<HttpResponse, Error> { 179 + let mut response = self.transport.send(request.clone()).await?; 180 + 181 + if Self::is_expired_token(&response) { 182 + self.refresh_session().await?; 183 + let retried = self.replace_auth_header(request)?; 184 + response = self.transport.send(retried).await?; 185 + } 186 + 187 + check_response(&response)?; 188 + Ok(response) 189 + } 190 + } 191 + 192 + #[cfg(test)] 193 + #[path = "xrpc_tests.rs"] 194 + mod tests;
+152
crates/opake-core/src/client/xrpc/repo.rs
··· 1 + use log::debug; 2 + use serde::Serialize; 3 + 4 + use super::{RecordEntry, RecordPage, RecordRef, Transport}; 5 + use crate::client::transport::*; 6 + use crate::error::Error; 7 + 8 + impl<T: Transport> super::XrpcClient<T> { 9 + /// Create a record via `com.atproto.repo.createRecord`. 10 + pub async fn create_record<R: Serialize>( 11 + &mut self, 12 + collection: &str, 13 + record: &R, 14 + ) -> Result<RecordRef, Error> { 15 + debug!("creating record in {}", collection); 16 + let auth = self.auth_header()?; 17 + let did = self.did()?; 18 + 19 + let body = serde_json::json!({ 20 + "repo": did, 21 + "collection": collection, 22 + "record": record, 23 + }); 24 + 25 + let response = self 26 + .send_checked(HttpRequest { 27 + method: HttpMethod::Post, 28 + url: format!("{}/xrpc/com.atproto.repo.createRecord", self.base_url), 29 + headers: vec![auth, ("Content-Type".into(), "application/json".into())], 30 + body: Some(RequestBody::Json(body)), 31 + }) 32 + .await?; 33 + 34 + Ok(serde_json::from_slice(&response.body)?) 35 + } 36 + 37 + /// Upsert a record with an explicit rkey via `com.atproto.repo.putRecord`. 38 + /// 39 + /// Idempotent — creates or overwrites the record at `collection/rkey`. 40 + /// Used for singleton records like `app.opake.cloud.publicKey/self`. 41 + pub async fn put_record<R: Serialize>( 42 + &mut self, 43 + collection: &str, 44 + rkey: &str, 45 + record: &R, 46 + ) -> Result<RecordRef, Error> { 47 + debug!("putting record {}/{}", collection, rkey); 48 + let auth = self.auth_header()?; 49 + let did = self.did()?; 50 + 51 + let body = serde_json::json!({ 52 + "repo": did, 53 + "collection": collection, 54 + "rkey": rkey, 55 + "record": record, 56 + }); 57 + 58 + let response = self 59 + .send_checked(HttpRequest { 60 + method: HttpMethod::Post, 61 + url: format!("{}/xrpc/com.atproto.repo.putRecord", self.base_url), 62 + headers: vec![auth, ("Content-Type".into(), "application/json".into())], 63 + body: Some(RequestBody::Json(body)), 64 + }) 65 + .await?; 66 + 67 + Ok(serde_json::from_slice(&response.body)?) 68 + } 69 + 70 + /// Fetch a single record via `com.atproto.repo.getRecord`. 71 + pub async fn get_record( 72 + &mut self, 73 + did: &str, 74 + collection: &str, 75 + rkey: &str, 76 + ) -> Result<RecordEntry, Error> { 77 + debug!("getting record {}/{}/{}", did, collection, rkey); 78 + let auth = self.auth_header()?; 79 + let url = format!( 80 + "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", 81 + self.base_url, did, collection, rkey, 82 + ); 83 + 84 + let response = self 85 + .send_checked(HttpRequest { 86 + method: HttpMethod::Get, 87 + url, 88 + headers: vec![auth], 89 + body: None, 90 + }) 91 + .await?; 92 + 93 + Ok(serde_json::from_slice(&response.body)?) 94 + } 95 + 96 + /// List records in a collection via `com.atproto.repo.listRecords`. 97 + pub async fn list_records( 98 + &mut self, 99 + collection: &str, 100 + limit: Option<u32>, 101 + cursor: Option<&str>, 102 + ) -> Result<RecordPage, Error> { 103 + debug!("listing records in {}", collection); 104 + let auth = self.auth_header()?; 105 + let did = self.did()?; 106 + 107 + let mut url = format!( 108 + "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection={}", 109 + self.base_url, did, collection, 110 + ); 111 + if let Some(limit) = limit { 112 + url.push_str(&format!("&limit={}", limit)); 113 + } 114 + if let Some(cursor) = cursor { 115 + url.push_str(&format!("&cursor={}", cursor)); 116 + } 117 + 118 + let response = self 119 + .send_checked(HttpRequest { 120 + method: HttpMethod::Get, 121 + url, 122 + headers: vec![auth], 123 + body: None, 124 + }) 125 + .await?; 126 + 127 + Ok(serde_json::from_slice(&response.body)?) 128 + } 129 + 130 + /// Delete a record via `com.atproto.repo.deleteRecord`. 131 + pub async fn delete_record(&mut self, collection: &str, rkey: &str) -> Result<(), Error> { 132 + debug!("deleting record {}/{}", collection, rkey); 133 + let auth = self.auth_header()?; 134 + let did = self.did()?; 135 + 136 + let body = serde_json::json!({ 137 + "repo": did, 138 + "collection": collection, 139 + "rkey": rkey, 140 + }); 141 + 142 + self.send_checked(HttpRequest { 143 + method: HttpMethod::Post, 144 + url: format!("{}/xrpc/com.atproto.repo.deleteRecord", self.base_url), 145 + headers: vec![auth, ("Content-Type".into(), "application/json".into())], 146 + body: Some(RequestBody::Json(body)), 147 + }) 148 + .await?; 149 + 150 + Ok(()) 151 + } 152 + }
crates/opake-core/src/client/xrpc_tests.rs crates/opake-core/src/client/xrpc/xrpc_tests.rs