Fast and robust atproto CAR file processing in Rust

get the API closer to what it was before

+27 -32
+3 -2
benches/huge-car.rs
··· 32 32 let reader = tokio::fs::File::open(filename).await.unwrap(); 33 33 let reader = tokio::io::BufReader::new(reader); 34 34 35 - let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap().unwrap() { 36 - Driver::Memory(_, mem_driver) => mem_driver, 35 + let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 36 + Driver::Memory(_, Some(mem_driver)) => mem_driver, 37 + Driver::Memory(_, None) => panic!("car was empty"), 37 38 Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 39 }; 39 40
+3 -3
benches/non-huge-cars.rs
··· 40 40 41 41 async fn drive_car(bytes: &[u8]) -> usize { 42 42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 43 - None => return 0, 44 - Some(Driver::Memory(_, mem_driver)) => mem_driver, 45 - Some(Driver::Disk(_)) => panic!("not benching big cars here"), 43 + Driver::Memory(_, Some(mem_driver)) => mem_driver, 44 + Driver::Memory(_, None) => return 0, 45 + Driver::Disk(_) => panic!("not benching big cars here"), 46 46 }; 47 47 48 48 let mut n = 0;
+2 -3
examples/disk-read-file/main.rs
··· 42 42 .load_car(reader) 43 43 .await? 44 44 { 45 - None => panic!("empty mst! try a bigger car"), 46 - Some(Driver::Memory(_, _)) => panic!("try this on a bigger car"), 47 - Some(Driver::Disk(big_stuff)) => { 45 + Driver::Memory(_, _) => panic!("try this on a bigger car"), 46 + Driver::Disk(big_stuff) => { 48 47 // we reach here if the repo was too big and needs to be spilled to 49 48 // disk to continue 50 49
+8 -7
examples/read-file/main.rs
··· 23 23 let reader = tokio::fs::File::open(file).await?; 24 24 let reader = tokio::io::BufReader::new(reader); 25 25 26 - let (commit, mut driver) = match DriverBuilder::new() 26 + let (commit, driver) = match DriverBuilder::new() 27 27 .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 28 28 .load_car(reader) 29 29 .await? 30 30 { 31 - None => todo!(), 32 - Some(Driver::Memory(commit, mem_driver)) => (commit, mem_driver), 33 - Some(Driver::Disk(_)) => panic!("this example doesn't handle big CARs"), 31 + Driver::Memory(commit, mem_driver) => (commit, mem_driver), 32 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 34 33 }; 35 34 36 35 log::info!("got commit: {commit:?}"); 37 36 38 37 let mut n = 0; 39 - while let Some(pairs) = driver.next_chunk(256).await? { 40 - n += pairs.len(); 41 - // log::info!("got {rkey:?}"); 38 + if let Some(mut driver) = driver { 39 + while let Some(pairs) = driver.next_chunk(256).await? { 40 + n += pairs.len(); 41 + // log::info!("got {rkey:?}"); 42 + } 42 43 } 43 44 log::info!("bye! total records={n}"); 44 45
+11 -17
src/drive.rs
··· 107 107 /// 108 108 /// You probably want to check the commit's signature. You can go ahead and 109 109 /// walk the MST right away. 110 - Memory(Commit, MemDriver), 110 + Memory(Commit, Option<MemDriver>), 111 111 /// Blocks exceed the memory limit 112 112 /// 113 113 /// You'll need to provide a disk storage to continue. The commit will be ··· 159 159 } 160 160 } 161 161 /// Begin processing an atproto MST from a CAR file 162 - pub async fn load_car<R: AsyncRead + Unpin>( 163 - &self, 164 - reader: R, 165 - ) -> Result<Option<Driver<R>>, DriveError> { 162 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 166 163 Driver::load_car(reader, noop, self.mem_limit_mb).await 167 164 } 168 165 } ··· 185 182 self 186 183 } 187 184 /// Begin processing an atproto MST from a CAR file 188 - pub async fn load_car<R: AsyncRead + Unpin>( 189 - &self, 190 - reader: R, 191 - ) -> Result<Option<Driver<R>>, DriveError> { 185 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 192 186 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 193 187 } 194 188 } ··· 207 201 reader: R, 208 202 process: fn(Bytes) -> Bytes, 209 203 mem_limit_mb: usize, 210 - ) -> Result<Option<Driver<R>>, DriveError> { 204 + ) -> Result<Driver<R>, DriveError> { 211 205 let max_size = mem_limit_mb * 2_usize.pow(20); 212 206 let mut mem_blocks = HashMap::new(); 213 207 ··· 240 234 mem_size += maybe_processed.len(); 241 235 mem_blocks.insert(cid, maybe_processed); 242 236 if mem_size >= max_size { 243 - return Ok(Some(Driver::Disk(NeedDisk { 237 + return Ok(Driver::Disk(NeedDisk { 244 238 car, 245 239 root, 246 240 process, 247 241 max_size, 248 242 mem_blocks, 249 243 commit, 250 - }))); 244 + })); 251 245 } 252 246 } 253 247 ··· 264 258 }; 265 259 let Some(walker) = Walker::new(root_node) else { 266 260 // TODO: actually we still want the commit in this case 267 - return Ok(None); 261 + return 
Ok(Driver::Memory(commit, None)); 268 262 }; 269 263 270 - Ok(Some(Driver::Memory( 264 + Ok(Driver::Memory( 271 265 commit, 272 - MemDriver { 266 + Some(MemDriver { 273 267 blocks: mem_blocks, 274 268 walker, 275 269 process, 276 - }, 277 - ))) 270 + }), 271 + )) 278 272 } 279 273 } 280 274