Fast and robust atproto CAR file processing in rust

well it kinda works again

+545 -340
+7
Cargo.lock
··· 705 705 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 706 706 707 707 [[package]] 708 + name = "hmac-sha256" 709 + version = "1.1.12" 710 + source = "registry+https://github.com/rust-lang/crates.io-index" 711 + checksum = "ad6880c8d4a9ebf39c6e8b77007ce223f646a4d21ce29d99f70cb16420545425" 712 + 713 + [[package]] 708 714 name = "interval-heap" 709 715 version = "0.0.5" 710 716 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1149 1155 "env_logger", 1150 1156 "fjall", 1151 1157 "hashbrown 0.16.1", 1158 + "hmac-sha256", 1152 1159 "iroh-car", 1153 1160 "log", 1154 1161 "mimalloc",
+5 -1
Cargo.toml
··· 15 15 serde = { version = "1.0.228", features = ["derive"] } 16 16 serde_bytes = "0.11.19" 17 17 serde_ipld_dagcbor = "0.6.4" 18 - sha2 = "0.10.9" 18 + sha2 = "0.10.9" # note: hmac-sha256 is simpler, smaller, benches ~15ns slower 19 19 thiserror = "2.0.17" 20 20 tokio = { version = "1.47.1", features = ["rt", "sync"] } 21 21 ··· 42 42 [[bench]] 43 43 name = "huge-car" 44 44 harness = false 45 + 46 + # [[bench]] 47 + # name = "leading" 48 + # harness = false
+1
benches/huge-car.rs
··· 26 26 match Driver::load_car(reader, |block| block.len().to_le_bytes().to_vec(), 1024) 27 27 .await 28 28 .unwrap() 29 + .unwrap() 29 30 { 30 31 Driver::Memory(_, mem_driver) => mem_driver, 31 32 Driver::Disk(_) => panic!("not doing disk for benchmark"),
+69
benches/leading.rs
··· 1 + use criterion::{Criterion, BatchSize, criterion_group, criterion_main}; 2 + use sha2::{Sha256, Digest}; 3 + use hmac_sha256::Hash; 4 + 5 + pub fn compute(bytes: [u8; 32]) -> u32 { 6 + let mut zeros = 0; 7 + for byte in bytes { 8 + if byte == 0 { 9 + zeros += 8 10 + } else { 11 + zeros += byte.leading_zeros(); 12 + break; 13 + } 14 + } 15 + zeros / 2 16 + } 17 + 18 + pub fn compute2(bytes: [u8; 32]) -> u32 { 19 + u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()) 20 + .leading_zeros() / 2 21 + } 22 + 23 + fn from_key_old(key: &[u8]) -> u32 { 24 + compute2(Sha256::digest(key).into()) 25 + } 26 + 27 + fn from_key_new(key: &[u8]) -> u32 { 28 + compute2(Hash::hash(key).into()) 29 + } 30 + 31 + pub fn criterion_benchmark(c: &mut Criterion) { 32 + for (name, case) in [ 33 + ("no zeros", [0xFF; 32]), 34 + ("two zeros", [0x3F; 32]), 35 + ("some zeros", [0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 36 + ("many zeros", [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 37 + ] { 38 + let mut g = c.benchmark_group(name); 39 + g.bench_function("old", |b| { 40 + b.iter_batched( 41 + || case.clone(), 42 + |c| compute(c), 43 + BatchSize::SmallInput, 44 + ) 45 + }); 46 + g.bench_function("new", |b| { 47 + b.iter_batched( 48 + || case.clone(), 49 + |c| compute2(c), 50 + BatchSize::SmallInput, 51 + ) 52 + }); 53 + } 54 + 55 + for case in [ 56 + "a", 57 + "aa", 58 + "aaa", 59 + "aaaa", 60 + ] { 61 + let mut g = c.benchmark_group(case); 62 + g.bench_function("old", |b| b.iter(|| from_key_old(case.as_bytes()))); 63 + g.bench_function("new", |b| b.iter(|| from_key_new(case.as_bytes()))); 64 + } 65 + } 66 + 67 + 68 + criterion_group!(benches, criterion_benchmark); 69 + criterion_main!(benches);
+3 -2
benches/non-huge-cars.rs
··· 33 33 .await 34 34 .unwrap() 35 35 { 36 - Driver::Memory(_, mem_driver) => mem_driver, 37 - Driver::Disk(_) => panic!("not benching big cars here"), 36 + None => return 0, 37 + Some(Driver::Memory(_, mem_driver)) => mem_driver, 38 + Some(Driver::Disk(_)) => panic!("not benching big cars here"), 38 39 }; 39 40 40 41 let mut n = 0;
+7 -2
examples/disk-read-file/main.rs
··· 43 43 .load_car(reader) 44 44 .await? 45 45 { 46 - Driver::Memory(_, _) => panic!("try this on a bigger car"), 47 - Driver::Disk(big_stuff) => { 46 + None => panic!("empty mst! try a bigger car"), 47 + Some(Driver::Memory(_, _)) => panic!("try this on a bigger car"), 48 + Some(Driver::Disk(big_stuff)) => { 48 49 // we reach here if the repo was too big and needs to be spilled to 49 50 // disk to continue 50 51 ··· 61 62 // pop the driver back out to get some code indentation relief 62 63 driver 63 64 } 65 + }; 66 + 67 + let Some(driver) = driver else { 68 + panic!("big car but somehow empty MST: is the archive stuffed with garbage?"); 64 69 }; 65 70 66 71 // collect some random stats about the blocks
+56 -32
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 + use crate::walk::Output; 3 4 use crate::Bytes; 4 5 use crate::HashMap; 5 6 use crate::disk::{DiskError, DiskStore}; 6 - use crate::mst::Node; 7 + use crate::mst::{Node, MstNode}; 7 8 use cid::Cid; 8 9 use iroh_car::CarReader; 9 10 use std::convert::Infallible; 10 11 use tokio::{io::AsyncRead, sync::mpsc}; 11 12 12 13 use crate::mst::Commit; 13 - use crate::walk::{Step, WalkError, Walker}; 14 + use crate::walk::{WalkError, Walker}; 14 15 15 16 /// Errors that can happen while consuming and emitting blocks and records 16 17 #[derive(Debug, thiserror::Error)] ··· 157 158 } 158 159 } 159 160 /// Begin processing an atproto MST from a CAR file 160 - pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 161 + pub async fn load_car<R: AsyncRead + Unpin>( 162 + &self, 163 + reader: R, 164 + ) -> Result<Option<Driver<R>>, DriveError> { 161 165 Driver::load_car(reader, noop, self.mem_limit_mb).await 162 166 } 163 167 } ··· 180 184 self 181 185 } 182 186 /// Begin processing an atproto MST from a CAR file 183 - pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 187 + pub async fn load_car<R: AsyncRead + Unpin>( 188 + &self, 189 + reader: R, 190 + ) -> Result<Option<Driver<R>>, DriveError> { 184 191 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 185 192 } 186 193 } ··· 199 206 reader: R, 200 207 process: fn(Bytes) -> Bytes, 201 208 mem_limit_mb: usize, 202 - ) -> Result<Driver<R>, DriveError> { 209 + ) -> Result<Option<Driver<R>>, DriveError> { 203 210 let max_size = mem_limit_mb * 2_usize.pow(20); 204 211 let mut mem_blocks = HashMap::new(); 205 212 ··· 225 232 continue; 226 233 } 227 234 228 - let data = Bytes::from(data); 229 - 230 235 // remaining possible types: node, record, other. optimistically process 231 236 let maybe_processed = MaybeProcessedBlock::maybe(process, data); 232 237 ··· 234 239 mem_size += maybe_processed.len(); 235 240 mem_blocks.insert(cid, maybe_processed); 236 241 if mem_size >= max_size { 237 - return Ok(Driver::Disk(NeedDisk { 242 + return Ok(Some(Driver::Disk(NeedDisk { 238 243 car, 239 244 root, 240 245 process, 241 246 max_size, 242 247 mem_blocks, 243 248 commit, 244 - })); 249 + }))); 245 250 } 246 251 } 247 252 248 253 // all blocks loaded and we fit in memory! hopefully we found the commit... 249 254 let commit = commit.ok_or(DriveError::MissingCommit)?; 250 255 251 - let walker = Walker::new(commit.data); 256 + // the commit always must point to a Node; empty node => empty MST special case 257 + let node: MstNode = match mem_blocks.get(&commit.data).ok_or(DriveError::MissingCommit)? { 258 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 259 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 260 + }; 261 + if node.is_empty() { 262 + // TODO: actually we still want the commit in this case 263 + return Ok(None); 264 + } 265 + let depth = node.depth.unwrap(); 266 + 267 + let walker = Walker::new(commit.data, depth); 252 268 253 - Ok(Driver::Memory( 269 + Ok(Some(Driver::Memory( 254 270 commit, 255 271 MemDriver { 256 272 blocks: mem_blocks, 257 273 walker, 258 274 process, 259 275 }, 260 - )) 276 + ))) 261 277 } 262 278 } 263 279 ··· 287 303 let mut out = Vec::with_capacity(n); 288 304 for _ in 0..n { 289 305 // walk as far as we can until we run out of blocks or find a record 290 - match self.walker.step(&mut self.blocks, self.process)? { 291 - Step::Finish => break, 292 - Step::Found { rkey, data } => { 293 - out.push((rkey, data)); 294 - continue; 295 - } 306 + let Some(Output { rkey, cid: _, data }) = self.walker.step(&mut self.blocks, self.process)? else { 307 + break; 296 308 }; 309 + out.push((rkey, data)); 297 310 } 298 - 299 311 if out.is_empty() { 300 312 Ok(None) 301 313 } else { ··· 318 330 pub async fn finish_loading( 319 331 mut self, 320 332 mut store: DiskStore, 321 - ) -> Result<(Commit, DiskDriver), DriveError> { 333 + ) -> Result<(Commit, Option<DiskDriver>), DriveError> { 322 334 // move store in and back out so we can manage lifetimes 323 335 // dump mem blocks into the store 324 336 store = tokio::task::spawn(async move { ··· 390 402 391 403 let commit = self.commit.ok_or(DriveError::MissingCommit)?; 392 404 393 - let walker = Walker::new(commit.data); 405 + // the commit always must point to a Node; empty node => empty MST special case 406 + let db_bytes = store 407 + .get(&commit.data.to_bytes()) 408 + .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? 409 + .ok_or(DriveError::MissingCommit)?; 410 + 411 + let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) { 412 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 413 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 414 + }; 415 + if node.is_empty() { 416 + return Ok((commit, None)); 417 + } 418 + let depth = node.depth.unwrap(); 419 + 420 + let walker = Walker::new(commit.data, depth); 394 421 395 422 Ok(( 396 423 commit, 397 - DiskDriver { 424 + Some(DiskDriver { 398 425 process: self.process, 399 426 state: Some(BigState { store, walker }), 400 - }, 427 + }), 401 428 )) 402 429 } 403 430 } ··· 459 486 return (state, Err(e.into())); 460 487 } 461 488 }; 462 - match step { 463 - Step::Finish => break, 464 - Step::Found { rkey, data } => out.push((rkey, data)), 489 + let Some(Output { rkey, cid: _, data }) = step else { 490 + break; 465 491 }; 492 + out.push((rkey, data)); 466 493 } 467 494 468 495 (state, Ok::<_, DriveError>(out)) ··· 499 526 Err(e) => return tx.blocking_send(Err(e.into())), 500 527 }; 501 528 502 - match step { 503 - Step::Finish => return Ok(()), 504 - Step::Found { rkey, data } => { 505 - out.push((rkey, data)); 506 - continue; 507 - } 529 + let Some(Output { rkey, cid: _, data }) = step else { 530 + break; 508 531 }; 532 + out.push((rkey, data)); 509 533 } 510 534 511 535 if out.is_empty() {
+74 -23
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 + use sha2::{Digest, Sha256}; 6 7 use cid::Cid; 7 8 use serde::Deserialize; 8 - use crate::walk::Depth; 9 9 10 10 /// The top-level data object in a repository's tree is a signed commit. 11 11 #[derive(Debug, Deserialize)] ··· 37 37 pub sig: serde_bytes::ByteBuf, 38 38 } 39 39 40 - use serde::{de, de::{Deserializer, Visitor, MapAccess, SeqAccess}}; 40 + use serde::de::{self, Deserializer, Visitor, MapAccess, SeqAccess, Unexpected}; 41 41 use std::fmt; 42 42 43 - pub(crate) enum NodeEntry { 44 - Value(Cid, Vec<u8>), // rkey 45 - Tree(Cid, u32), // depth 43 + pub type Depth = u32; 44 + 45 + #[inline(always)] 46 + pub fn atproto_mst_depth(key: &str) -> Depth { 47 + // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24 48 + u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2 46 49 } 47 50 51 + #[derive(Debug)] 48 52 pub(crate) struct MstNode { 49 - pub left: Option<Cid>, // a tree but we don't know the depth 50 - pub entries: Vec<NodeEntry>, 53 + pub depth: Option<Depth>, // known for nodes with entries (required for root) 54 + pub things: Vec<NodeThing>, 55 + } 56 + 57 + #[derive(Debug)] 58 + pub(crate) struct NodeThing { 59 + pub(crate) cid: Cid, 60 + pub(crate) kind: ThingKind, 61 + } 62 + 63 + #[derive(Debug)] 64 + pub(crate) enum ThingKind { 65 + Tree, 66 + Value { rkey: String }, 51 67 } 52 68 53 - pub(crate) struct Entries(pub(crate) Vec<NodeEntry>); 69 + pub(crate) struct Entries(Vec<NodeThing>, Option<Depth>); 54 70 55 71 impl<'de> Deserialize<'de> for Entries { 56 72 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> ··· 69 85 where 70 86 S: SeqAccess<'de>, 71 87 { 72 - let mut children: Vec<NodeEntry> = Vec::with_capacity(seq.size_hint().unwrap_or(5)); 88 + let mut children: Vec<NodeThing> = Vec::with_capacity(seq.size_hint().unwrap_or(5)); 73 89 let mut prefix: Vec<u8> = vec![]; 90 + let mut depth = None; 74 91 while let Some(entry) = seq.next_element::<Entry>()? { 75 92 let mut rkey: Vec<u8> = vec![]; 76 93 let pre_checked = prefix 77 94 .get(..entry.prefix_len) 78 - // .ok_or(MstError::EntryPrefixOutOfbounds)?; 79 - .ok_or_else(|| todo!()).unwrap(); 95 + .ok_or_else(|| de::Error::invalid_value( 96 + Unexpected::Bytes(&prefix), 97 + &"a prefix at least as long as the prefix_len", 98 + ))?; 80 99 81 100 rkey.extend_from_slice(pre_checked); 82 101 rkey.extend_from_slice(&entry.keysuffix); 83 - let depth = Depth::compute(&rkey); 84 102 85 - prefix = rkey.clone(); 103 + let rkey_s = String::from_utf8(rkey.clone()) 104 + .map_err(|_| de::Error::invalid_value( 105 + Unexpected::Bytes(&rkey), 106 + &"a valid utf-8 rkey", 107 + ))?; 108 + 109 + let key_depth = atproto_mst_depth(&rkey_s); 110 + if depth.is_none() { 111 + depth = Some(key_depth); 112 + } else if Some(key_depth) != depth { 113 + return Err(de::Error::invalid_value( 114 + Unexpected::Bytes(&prefix), 115 + &"all rkeys to have equal MST depth", 116 + )); 117 + } 86 118 87 - children.push(NodeEntry::Value(entry.value, rkey)); 119 + children.push(NodeThing { 120 + cid: entry.value, 121 + kind: ThingKind::Value { rkey: rkey_s }, 122 + }); 88 123 89 - if let Some(ref tree) = entry.tree { 90 - children.push(NodeEntry::Tree(*tree, depth)); 124 + if let Some(cid) = entry.tree { 125 + children.push(NodeThing { 126 + cid, 127 + kind: ThingKind::Tree, 128 + }); 91 129 } 130 + 131 + prefix = rkey; 92 132 } 93 - Ok(Entries(children)) 133 + 134 + Ok(Entries(children, depth)) 94 135 } 95 136 } 96 137 deserializer.deserialize_seq(EntriesVisitor) ··· 117 158 let mut found_left = false; 118 159 let mut left = None; 119 160 let mut found_entries = false; 120 - let mut entries = Vec::with_capacity(4); // "fanout of 4" so does this make sense???? 161 + let mut things = Vec::with_capacity(4); // "fanout of 4" so does this make sense???? 162 + let mut depth = None; 121 163 122 164 while let Some(key) = map.next_key()? { 123 165 match key { ··· 126 168 return Err(de::Error::duplicate_field("l")); 127 169 } 128 170 found_left = true; 129 - left = map.next_value()?; 171 + if let Some(cid) = map.next_value()? { 172 + left = Some(NodeThing { cid, kind: ThingKind::Tree }); 173 + } 130 174 } 131 175 "e" => { 132 176 if found_entries { 133 177 return Err(de::Error::duplicate_field("e")); 134 178 } 135 179 found_entries = true; 136 - let mut child_entries: Entries = map.next_value()?; 137 - entries.append(&mut child_entries.0); 180 + let Entries(mut child_entries, d) = map.next_value()?; 181 + things.append(&mut child_entries); 182 + depth = d; 138 183 }, 139 184 f => return Err(de::Error::unknown_field(f, NODE_FIELDS)) 140 185 } ··· 145 190 if !found_entries { 146 191 return Err(de::Error::missing_field("e")); 147 192 } 148 - Ok(MstNode { left, entries }) 193 + 194 + things.reverse(); 195 + if let Some(l) = left { 196 + things.push(l); 197 + } 198 + 199 + Ok(MstNode { depth, things }) 149 200 } 150 201 } 151 202 ··· 156 207 157 208 impl MstNode { 158 209 pub(crate) fn is_empty(&self) -> bool { 159 - self.left.is_none() && self.entries.is_empty() 210 + self.things.is_empty() 160 211 } 161 212 } 162 213
+107 -280
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::mst::NodeEntry; 3 + use crate::mst::NodeThing; 4 + use crate::mst::ThingKind; 4 5 use crate::mst::MstNode; 6 + use crate::mst::Depth; 5 7 use crate::Bytes; 6 8 use crate::HashMap; 7 9 use crate::disk::DiskStore; 8 10 use crate::drive::MaybeProcessedBlock; 9 11 use cid::Cid; 10 - use sha2::{Digest, Sha256}; 11 12 use std::convert::Infallible; 12 13 13 14 /// Errors that can happen while walking ··· 28 29 /// Errors from invalid Rkeys 29 30 #[derive(Debug, PartialEq, thiserror::Error)] 30 31 pub enum MstError { 31 - #[error("Failed to compute an rkey due to invalid prefix_len")] 32 - EntryPrefixOutOfbounds, 33 32 #[error("RKey was not utf-8")] 34 33 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 35 34 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 36 35 EmptyNode, 37 - #[error("Found an entry with rkey at the wrong depth")] 38 - WrongDepth, 39 - #[error("Lost track of our depth (possible bug?)")] 40 - LostDepth, 36 + #[error("Expected node to be at depth {expected}, but it was at {depth}")] 37 + WrongDepth { depth: Depth, expected: Depth }, 41 38 #[error("MST depth underflow: depth-0 node with child trees")] 42 39 DepthUnderflow, 43 - #[error("Encountered an rkey out of order while walking the MST")] 44 - RkeyOutOfOrder, 40 + #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 41 + RkeyOutOfOrder { prev: String, rkey: String }, 45 42 } 46 43 47 44 /// Walker outputs 48 - #[derive(Debug)] 49 - pub enum Step { 50 - /// Reached the end of the MST! yay! 51 - Finish, 52 - /// A record was found! 53 - Found { rkey: String, data: Bytes }, 54 - } 55 - 56 - #[derive(Debug, Clone, PartialEq)] 57 - enum Need { 58 - Node { depth: Depth, cid: Cid }, 59 - Record { rkey: String, cid: Cid }, 60 - } 61 - 62 - #[derive(Debug, Clone, Copy, PartialEq)] 63 - pub enum Depth { 64 - Root, 65 - Depth(u32), 66 - } 67 - 68 - impl Depth { 69 - fn from_key(key: &[u8]) -> Self { 70 - let mut zeros = 0; 71 - for byte in Sha256::digest(key) { 72 - let leading = byte.leading_zeros(); 73 - zeros += leading; 74 - if leading < 8 { 75 - break; 76 - } 77 - } 78 - Self::Depth(zeros / 2) // truncating divide (rounds down) 79 - } 80 - fn next_expected(&self) -> Result<Option<u32>, MstError> { 81 - match self { 82 - Self::Root => Ok(None), 83 - Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 84 - } 85 - } 86 - pub fn compute(key: &[u8]) -> u32 { 87 - let Depth::Depth(d) = Self::from_key(key) else { 88 - panic!("errr"); 89 - }; 90 - d 91 - } 92 - } 93 - 94 - fn push_from_node(stack: &mut Vec<Need>, node: &MstNode, parent_depth: Depth) -> Result<(), MstError> { 95 - // empty nodes are not allowed in the MST except in an empty MST 96 - if node.is_empty() { 97 - if parent_depth == Depth::Root { 98 - return Ok(()); // empty mst, nothing to push 99 - } else { 100 - return Err(MstError::EmptyNode); 101 - } 102 - } 103 - 104 - let mut this_depth = parent_depth.next_expected()?; 105 - 106 - for entry in node.entries.iter().rev() { 107 - // ok this loop sucks now esp with depth checking 108 - // should keep the entries together with a shared depth on the rkey 109 - // ...maybe. skipping the absent trees is nice? 110 - match entry { 111 - NodeEntry::Value(cid, rkey) => { 112 - stack.push(Need::Record { 113 - rkey: String::from_utf8(rkey.to_vec())?, 114 - cid: *cid, 115 - }); 116 - } 117 - NodeEntry::Tree(cid, depth) => { 118 - if let Some(expected) = this_depth { 119 - if *depth != expected { 120 - return Err(MstError::WrongDepth); 121 - } 122 - } else { 123 - // this_depth is `none` if we are the deepest child (directly below root) 124 - // in that case we accept whatever highest depth is claimed 125 - this_depth = Some(*depth); 126 - } 127 - stack.push(Need::Node { 128 - depth: Depth::Depth(*depth), 129 - cid: *cid, 130 - }); 131 - } 132 - } 133 - 134 - } 135 - 136 - let d = this_depth.ok_or(MstError::LostDepth)?; 137 - if let Some(tree) = node.left { 138 - stack.push(Need::Node { 139 - depth: Depth::Depth(d), 140 - cid: tree, 141 - }); 142 - } 143 - Ok(()) 45 + #[derive(Debug, PartialEq)] 46 + pub struct Output { 47 + pub rkey: String, 48 + pub cid: Cid, 49 + pub data: Bytes, 144 50 } 145 51 146 52 /// Traverser of an atproto MST ··· 148 54 /// Walks the tree from left-to-right in depth-first order 149 55 #[derive(Debug)] 150 56 pub struct Walker { 151 - stack: Vec<Need>, 152 - prev: String, 57 + prev_rkey: String, 58 + todo: Vec<(Depth, NodeThing)>, 153 59 } 154 60 155 61 impl Walker { 156 - pub fn new(tree_root_cid: Cid) -> Self { 62 + pub fn new( 63 + root_cid: Cid, 64 + depth: Depth, 65 + ) -> Self { 157 66 Self { 158 - stack: vec![Need::Node { 159 - depth: Depth::Root, 160 - cid: tree_root_cid, 161 - }], 162 - prev: "".to_string(), 67 + prev_rkey: "".to_string(), 68 + todo: vec![( 69 + depth + 1, // we're kind of inventing a fake root one above the real root 70 + // ... maybe we should just pass in the real root here??? 71 + NodeThing { 72 + cid: root_cid, 73 + kind: ThingKind::Tree, 74 + }, 75 + )], 163 76 } 164 77 } 165 78 166 - /// Advance through nodes until we find a record or can't go further 167 - pub fn step( 79 + fn mpb_step( 168 80 &mut self, 169 - blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 81 + depth: Depth, 82 + kind: ThingKind, 83 + cid: Cid, 84 + mpb: &MaybeProcessedBlock, 170 85 process: impl Fn(Bytes) -> Bytes, 171 - ) -> Result<Step, WalkError> { 172 - loop { 173 - let Some(need) = self.stack.last_mut() else { 174 - log::trace!("tried to walk but we're actually done."); 175 - return Ok(Step::Finish); 176 - }; 86 + ) -> Result<Option<Output>, WalkError> { 87 + match kind { 88 + ThingKind::Value { rkey } => { 89 + let data = match mpb { 90 + MaybeProcessedBlock::Raw(data) => process(data.clone()), 91 + MaybeProcessedBlock::Processed(t) => t.clone(), 92 + }; 177 93 178 - match need { 179 - &mut Need::Node { depth, cid } => { 180 - log::trace!("need node {cid:?}"); 181 - let Some(block) = blocks.remove(&cid) else { 182 - return Err(WalkError::MissingBlock(cid)); 183 - }; 94 + if rkey <= self.prev_rkey { 95 + return Err(WalkError::MstError(MstError::RkeyOutOfOrder { 96 + rkey, 97 + prev: self.prev_rkey.clone(), 98 + })); 99 + } 100 + self.prev_rkey = rkey.clone(); 184 101 185 - let MaybeProcessedBlock::Raw(data) = block else { 186 - return Err(WalkError::BadCommitFingerprint); 187 - }; 188 - let node = serde_ipld_dagcbor::from_slice::<crate::mst::MstNode>(&data) 189 - .map_err(WalkError::BadCommit)?; 102 + Ok(Some(Output { 103 + rkey, 104 + cid, 105 + data, 106 + })) 107 + } 108 + ThingKind::Tree => { 109 + let MaybeProcessedBlock::Raw(data) = mpb else { 110 + return Err(WalkError::BadCommitFingerprint); 111 + }; 190 112 191 - // found node, make sure we remember 192 - self.stack.pop(); 113 + let node: MstNode = serde_ipld_dagcbor::from_slice(&data) 114 + .map_err(WalkError::BadCommit)?; 193 115 194 - // queue up work on the found node next 195 - push_from_node(&mut self.stack, &node, depth)?; 116 + if node.is_empty() { 117 + return Err(WalkError::MstError(MstError::EmptyNode)); 196 118 } 197 - Need::Record { rkey, cid } => { 198 - log::trace!("need record {cid:?}"); 199 - // note that we cannot *remove* a record block, sadly, since 200 - // there can be multiple rkeys pointing to the same cid. 201 - let Some(data) = blocks.get(cid) else { 202 - return Err(WalkError::MissingBlock(*cid)); 203 - }; 204 - let rkey = rkey.clone(); 205 - let data = match data { 206 - MaybeProcessedBlock::Raw(data) => process(data.clone()), 207 - MaybeProcessedBlock::Processed(t) => t.clone(), 208 - }; 209 119 210 - // found node, make sure we remember 211 - self.stack.pop(); 212 - 213 - // rkeys *must* be in order or else the tree is invalid (or 214 - // we have a bug) 215 - if rkey <= self.prev { 216 - return Err(MstError::RkeyOutOfOrder)?; 120 + let next_depth = depth.checked_sub(1).ok_or(MstError::DepthUnderflow)?; 121 + if let Some(d) = node.depth { 122 + if d != next_depth { 123 + return Err(WalkError::MstError(MstError::WrongDepth { 124 + depth: d, 125 + expected: next_depth, 126 + })); 217 127 } 218 - self.prev = rkey.clone(); 128 + } 219 129 220 - return Ok(Step::Found { rkey, data }); 130 + for thing in node.things { 131 + self.todo.push((next_depth, thing)); 221 132 } 133 + 134 + Ok(None) 222 135 } 223 136 } 224 137 } 225 138 226 - /// blocking!!!!!! 227 - pub fn disk_step( 139 + /// Advance through nodes until we find a record or can't go further 140 + pub fn step( 228 141 &mut self, 229 - reader: &mut DiskStore, 142 + blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 230 143 process: impl Fn(Bytes) -> Bytes, 231 - ) -> Result<Step, WalkError> { 232 - loop { 233 - let Some(need) = self.stack.last_mut() else { 234 - log::trace!("tried to walk but we're actually done."); 235 - return Ok(Step::Finish); 144 + ) -> Result<Option<Output>, WalkError> { 145 + 146 + while let Some((depth, NodeThing { cid, kind })) = self.todo.pop() { 147 + let Some(mpb) = blocks.get(&cid) else { 148 + return Err(WalkError::MissingBlock(cid)); 236 149 }; 237 - 238 - match need { 239 - &mut Need::Node { depth, cid } => { 240 - let cid_bytes = cid.to_bytes(); 241 - log::trace!("need node {cid:?}"); 242 - let Some(block_slice) = reader.get(&cid_bytes)? else { 243 - return Err(WalkError::MissingBlock(cid)); 244 - }; 245 - 246 - let block = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 150 + if let Some(out) = self.mpb_step(depth, kind, cid, mpb, &process)? { 151 + return Ok(Some(out)); 152 + } 153 + } 247 154 248 - let MaybeProcessedBlock::Raw(data) = block else { 249 - return Err(WalkError::BadCommitFingerprint); 250 - }; 251 - let node = serde_ipld_dagcbor::from_slice::<MstNode>(&data) 252 - .map_err(WalkError::BadCommit)?; 155 + log::trace!("tried to walk but we're actually done."); 156 + Ok(None) 157 + } 253 158 254 - // found node, make sure we remember 255 - self.stack.pop(); 159 + /// blocking!!!!!! 160 + pub fn disk_step( 161 + &mut self, 162 + blocks: &mut DiskStore, 163 + process: impl Fn(Bytes) -> Bytes, 164 + ) -> Result<Option<Output>, WalkError> { 256 165 257 - // queue up work on the found node next 258 - push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 259 - } 260 - Need::Record { rkey, cid } => { 261 - log::trace!("need record {cid:?}"); 262 - let cid_bytes = cid.to_bytes(); 263 - let Some(data_slice) = reader.get(&cid_bytes)? else { 264 - return Err(WalkError::MissingBlock(*cid)); 265 - }; 266 - let data = MaybeProcessedBlock::from_bytes(data_slice.to_vec()); 267 - let rkey = rkey.clone(); 268 - let data = match data { 269 - MaybeProcessedBlock::Raw(data) => process(data), 270 - MaybeProcessedBlock::Processed(t) => t, 271 - }; 272 - 273 - // found node, make sure we remember 274 - self.stack.pop(); 275 - 276 - log::trace!("emitting a block as a step. depth={}", self.stack.len()); 277 - 278 - // rkeys *must* be in order or else the tree is invalid (or 279 - // we have a bug) 280 - if rkey <= self.prev { 281 - return Err(MstError::RkeyOutOfOrder)?; 282 - } 283 - self.prev = rkey.clone(); 284 - 285 - return Ok(Step::Found { rkey, data }); 286 - } 166 + while let Some((depth, NodeThing { cid, kind })) = self.todo.pop() { 167 + let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 168 + return Err(WalkError::MissingBlock(cid)); 169 + }; 170 + let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 171 + if let Some(out) = self.mpb_step(depth, kind, cid, &mpb, &process)? { 172 + return Ok(Some(out)); 287 173 } 288 174 } 175 + log::trace!("tried to walk but we're actually done."); 176 + Ok(None) 289 177 } 290 178 } 291 179 ··· 293 181 mod test { 294 182 use super::*; 295 183 296 - fn cid1() -> Cid { 297 - "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 298 - .parse() 299 - .unwrap() 300 - } 301 - 302 - #[test] 303 - fn test_depth_spec_0() { 304 - let d = Depth::from_key(b"2653ae71"); 305 - assert_eq!(d, Depth::Depth(0)) 306 - } 307 - 308 - #[test] 309 - fn test_depth_spec_1() { 310 - let d = Depth::from_key(b"blue"); 311 - assert_eq!(d, Depth::Depth(1)) 312 - } 313 - 314 - #[test] 315 - fn test_depth_spec_4() { 316 - let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 317 - assert_eq!(d, Depth::Depth(4)) 318 - } 319 - 320 - #[test] 321 - fn test_depth_spec_8() { 322 - let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 323 - assert_eq!(d, Depth::Depth(8)) 324 - } 325 - 326 - #[test] 327 - fn test_depth_ietf_draft_0() { 328 - let d = Depth::from_key(b"key1"); 329 - assert_eq!(d, Depth::Depth(0)) 330 - } 331 - 332 - #[test] 333 - fn test_depth_ietf_draft_1() { 334 - let d = Depth::from_key(b"key7"); 335 - assert_eq!(d, Depth::Depth(1)) 336 - } 337 - 338 - #[test] 339 - fn test_depth_ietf_draft_4() { 340 - let d = Depth::from_key(b"key515"); 341 - assert_eq!(d, Depth::Depth(4)) 342 - } 343 - 344 - #[test] 345 - fn test_depth_interop() { 346 - // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 347 - for (k, expected) in [ 348 - ("", 0), 349 - ("asdf", 0), 350 - ("blue", 1), 351 - ("2653ae71", 0), 352 - ("88bfafc7", 2), 353 - ("2a92d355", 4), 354 - ("884976f5", 6), 355 - ("app.bsky.feed.post/454397e440ec", 4), 356 - ("app.bsky.feed.post/9adeb165882c", 8), 357 - ] { 358 - let d = Depth::from_key(k.as_bytes()); 359 - assert_eq!(d, Depth::Depth(expected), "key: {}", k); 360 - } 361 - } 184 + // fn cid1() -> Cid { 185 + // "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 186 + // .parse() 187 + // .unwrap() 188 + // } 362 189 363 190 // #[test] 364 191 // fn test_push_empty_fails() {
+216
tests/mst-depth.rs
··· 1 + // use repo_stream::Driver; 2 + use repo_stream::mst::atproto_mst_depth; 3 + 4 + // https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/example_keys.txt 5 + const INTEROP_EXAMPLE_KEYS: &str = "\ 6 + A0/374913 7 + A1/076595 8 + A2/827942 9 + A3/578971 10 + A4/055903 11 + A5/518415 12 + B0/601692 13 + B1/986427 14 + B2/827649 15 + B3/095483 16 + B4/774183 17 + B5/116729 18 + C0/451630 19 + C1/438573 20 + C2/014073 21 + C3/564755 22 + C4/134079 23 + C5/141153 24 + D0/952776 25 + D1/834852 26 + D2/269196 27 + D3/038750 28 + D4/052059 29 + D5/563177 30 + E0/670489 31 + E1/091396 32 + E2/819540 33 + E3/391311 34 + E4/820614 35 + E5/512478 36 + F0/697858 37 + F1/085263 38 + F2/483591 39 + F3/409933 40 + F4/789697 41 + F5/271416 42 + G0/765327 43 + G1/209912 44 + G2/611528 45 + G3/649394 46 + G4/585887 47 + G5/298495 48 + H0/131238 49 + H1/566929 50 + H2/618272 51 + H3/500151 52 + H4/841548 53 + H5/642354 54 + I0/536928 55 + I1/525517 56 + I2/800680 57 + I3/818503 58 + I4/561177 59 + I5/010047 60 + J0/453243 61 + J1/217783 62 + J2/960389 63 + J3/501274 64 + J4/042054 65 + J5/743154 66 + K0/125271 67 + K1/317361 68 + K2/453868 69 + K3/214010 70 + K4/164720 71 + K5/177856 72 + L0/502889 73 + L1/574576 74 + L2/596333 75 + L3/683657 76 + L4/724989 77 + L5/093883 78 + M0/141744 79 + M1/643368 80 + M2/919782 81 + M3/836327 82 + M4/177463 83 + M5/563354 84 + N0/370604 85 + N1/563732 86 + N2/177587 87 + N3/678428 88 + N4/599183 89 + N5/567564 90 + O0/523870 91 + O1/052141 92 + O2/037651 93 + O3/773808 94 + O4/140952 95 + O5/318605 96 + P0/133157 97 + P1/394633 98 + P2/521462 99 + P3/493488 100 + P4/908754 101 + P5/109455 102 + Q0/835234 103 + Q1/131542 104 + Q2/680035 105 + Q3/253381 106 + Q4/019053 107 + Q5/658167 108 + R0/129386 109 + R1/363149 110 + R2/742766 111 + R3/039235 112 + R4/482275 113 + R5/817312 114 + S0/340283 115 + S1/561525 116 + S2/914574 117 + S3/909434 118 + S4/789708 119 + S5/803866 120 + T0/255204 121 + T1/716687 122 + T2/256231 123 + T3/054247 124 + T4/419247 125 + T5/509584 126 + U0/298296 127 + U1/851680 128 + U2/342856 129 + U3/597327 130 + U4/311686 131 + U5/030156 132 + V0/221100 133 + V1/741554 134 + V2/267990 135 + V3/674163 136 + V4/739931 137 + V5/573718 138 + W0/034202 139 + W1/697411 140 + W2/460313 141 + W3/189647 142 + W4/847299 143 + W5/648086 144 + X0/287498 145 + X1/044093 146 + X2/613770 147 + X3/577587 148 + X4/779391 149 + X5/339246 150 + Y0/986350 151 + Y1/044567 152 + Y2/478044 153 + Y3/757097 154 + Y4/396913 155 + Y5/802264 156 + Z0/425878 157 + Z1/127557 158 + Z2/441927 159 + Z3/064474 160 + Z4/888344 161 + Z5/977983"; 162 + 163 + #[test] 164 + fn test_interop_example_keys() { 165 + for key in INTEROP_EXAMPLE_KEYS.split('\n') { 166 + let expected: u32 = key.chars().nth(1).unwrap().to_digit(16).unwrap(); 167 + let computed: u32 = atproto_mst_depth(key); 168 + assert_eq!(computed, expected); 169 + } 170 + } 171 + 172 + #[test] 173 + fn test_iterop_key_heights() { 174 + // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 175 + for (key, expected) in [ 176 + ("", 0), 177 + ("asdf", 0), 178 + ("blue", 1), 179 + ("2653ae71", 0), 180 + ("88bfafc7", 2), 181 + ("2a92d355", 4), 182 + ("884976f5", 6), 183 + ("app.bsky.feed.post/454397e440ec", 4), 184 + ("app.bsky.feed.post/9adeb165882c", 8), 185 + ] { 186 + let computed = atproto_mst_depth(key); 187 + assert_eq!(computed, expected); 188 + } 189 + } 190 + 191 + #[test] 192 + fn test_spec_example_keys() { 193 + // https://atproto.com/specs/repository#mst-structure 194 + for (key, expected) in [ 195 + ("2653ae71", 0), 196 + ("blue", 1), 197 + ("app.bsky.feed.post/454397e440ec", 4), 198 + ("app.bsky.feed.post/9adeb165882c", 8), 199 + ] { 200 + let computed = atproto_mst_depth(key); 201 + assert_eq!(computed, expected); 202 + } 203 + } 204 + 205 + #[test] 206 + fn test_ietf_example_keys() { 207 + // https://atproto.com/specs/repository#mst-structure 208 + for (key, expected) in [ 209 + ("key1", 0), 210 + ("key7", 1), 211 + ("key515", 4), 212 + ] { 213 + let computed = atproto_mst_depth(key); 214 + assert_eq!(computed, expected); 215 + } 216 + }