Fast and robust atproto CAR file processing in rust

drop empty-mst special-casing

yay we're back to the old api (doctests pass again!!)

+25 -38
+1 -2
benches/huge-car.rs
··· 33 33 let reader = tokio::io::BufReader::new(reader); 34 34 35 35 let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 36 - Driver::Memory(_, Some(mem_driver)) => mem_driver, 37 - Driver::Memory(_, None) => panic!("car was empty"), 36 + Driver::Memory(_, mem_driver) => mem_driver, 38 37 Driver::Disk(_) => panic!("not doing disk for benchmark"), 39 38 }; 40 39
+1 -2
benches/non-huge-cars.rs
··· 40 40 41 41 async fn drive_car(bytes: &[u8]) -> usize { 42 42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 43 - Driver::Memory(_, Some(mem_driver)) => mem_driver, 44 - Driver::Memory(_, None) => return 0, 43 + Driver::Memory(_, mem_driver) => mem_driver, 45 44 Driver::Disk(_) => panic!("not benching big cars here"), 46 45 }; 47 46
-4
examples/disk-read-file/main.rs
··· 65 65 } 66 66 }; 67 67 68 - let Some(driver) = driver else { 69 - panic!("big car but somehow empty MST: is the archive stuffed with garbage?"); 70 - }; 71 - 72 68 // collect some random stats about the blocks 73 69 let mut n = 0; 74 70 let mut zeros = 0;
+4 -6
examples/read-file/main.rs
··· 23 23 let reader = tokio::fs::File::open(file).await?; 24 24 let reader = tokio::io::BufReader::new(reader); 25 25 26 - let (commit, driver) = match DriverBuilder::new() 26 + let (commit, mut driver) = match DriverBuilder::new() 27 27 .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 28 28 .load_car(reader) 29 29 .await? ··· 35 35 log::info!("got commit: {commit:?}"); 36 36 37 37 let mut n = 0; 38 - if let Some(mut driver) = driver { 39 - while let Some(pairs) = driver.next_chunk(256).await? { 40 - n += pairs.len(); 41 - // log::info!("got {rkey:?}"); 42 - } 38 + while let Some(pairs) = driver.next_chunk(256).await? { 39 + n += pairs.len(); 40 + // log::info!("got {rkey:?}"); 43 41 } 44 42 log::info!("bye! total records={n}"); 45 43
+7 -7
readme.md
··· 74 74 - 450MiB CAR file (huge): `1.3s` 75 75 - 128MiB (huge): `350ms` 76 76 - 5.0MiB: `6.8ms` 77 - - 279KiB: `170us` 78 - - 3.4KiB: `5.2us` 79 - - empty: `670ns` 77 + - 279KiB: `160us` 78 + - 3.4KiB: `5.1us` 79 + - empty: `690ns` 80 80 81 81 it's a little faster with `mimalloc` 82 82 ··· 88 88 89 89 - 450MiB CAR file: `1.2s` (-8%) 90 90 - 128MiB: `300ms` (-14%) 91 - - 5.0MiB: `6.0ms` (-12%) 92 - - 279KiB: `140us` (-21%) 93 - - 3.4KiB: `4.7us` (-10%) 94 - - empty: `640ns` (-4%) 91 + - 5.0MiB: `6.0ms` (-11%) 92 + - 279KiB: `150us` (-7%) 93 + - 3.4KiB: `4.7us` (-8%) 94 + - empty: `670ns` (-4%) 95 95 96 96 processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!) 97 97
+8 -13
src/drive.rs
··· 107 107 /// 108 108 /// You probably want to check the commit's signature. You can go ahead and 109 109 /// walk the MST right away. 110 - Memory(Commit, Option<MemDriver>), 110 + Memory(Commit, MemDriver), 111 111 /// Blocks exceed the memory limit 112 112 /// 113 113 /// You'll need to provide a disk storage to continue. The commit will be ··· 233 233 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 234 234 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 235 235 }; 236 - let Some(walker) = Walker::new(root_node) else { 237 - // TODO: actually we still want the commit in this case 238 - return Ok(Driver::Memory(commit, None)); 239 - }; 236 + let walker = Walker::new(root_node); 240 237 241 238 Ok(Driver::Memory( 242 239 commit, 243 - Some(MemDriver { 240 + MemDriver { 244 241 blocks: mem_blocks, 245 242 walker, 246 243 process, 247 - }), 244 + }, 248 245 )) 249 246 } 250 247 } ··· 304 301 pub async fn finish_loading( 305 302 mut self, 306 303 mut store: DiskStore, 307 - ) -> Result<(Commit, Option<DiskDriver>), DriveError> { 304 + ) -> Result<(Commit, DiskDriver), DriveError> { 308 305 // move store in and back out so we can manage lifetimes 309 306 // dump mem blocks into the store 310 307 store = tokio::task::spawn(async move { ··· 386 383 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 387 384 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 388 385 }; 389 - let Some(walker) = Walker::new(node) else { 390 - return Ok((commit, None)); 391 - }; 386 + let walker = Walker::new(node); 392 387 393 388 Ok(( 394 389 commit, 395 - Some(DiskDriver { 390 + DiskDriver { 396 391 process: self.process, 397 392 state: Some(BigState { store, walker }), 398 - }), 393 + }, 399 394 )) 400 395 } 401 396 }
+4 -4
src/walk.rs
··· 52 52 } 53 53 54 54 impl Walker { 55 - pub fn new(root_node: MstNode) -> Option<Self> { 56 - Some(Self { 55 + pub fn new(root_node: MstNode) -> Self { 56 + Self { 57 57 prev_rkey: "".to_string(), 58 - root_depth: root_node.depth?, 58 + root_depth: root_node.depth.unwrap_or(0), // empty root node = empty mst 59 59 todo: vec![root_node.things], 60 - }) 60 + } 61 61 } 62 62 63 63 fn mpb_step(