Fast and robust atproto CAR file processing in rust

just vec again

+40 -58
-2
Cargo.lock
··· 850 850 checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb" 851 851 dependencies = [ 852 852 "byteorder-lite", 853 - "bytes", 854 853 "byteview", 855 854 "crossbeam-skiplist", 856 855 "enum_dispatch", ··· 1144 1143 name = "repo-stream" 1145 1144 version = "0.2.2" 1146 1145 dependencies = [ 1147 - "bytes", 1148 1146 "cid", 1149 1147 "clap", 1150 1148 "criterion",
+1 -2
Cargo.toml
··· 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies] 10 - bytes = "1.11.0" 11 - fjall = { version = "3.0.1", default-features = false, features = ["bytes_1"] } 10 + fjall = { version = "3.0.1", default-features = false } 12 11 hashbrown = "0.16.1" 13 12 cid = { version = "0.11.1", features = ["serde"] } 14 13 iroh-car = "0.5.1"
+8 -11
benches/huge-car.rs
··· 22 22 let reader = tokio::fs::File::open(filename).await.unwrap(); 23 23 let reader = tokio::io::BufReader::new(reader); 24 24 25 - let mut driver = match Driver::load_car( 26 - reader, 27 - |block| block.len().to_le_bytes().to_vec().into(), 28 - 1024, 29 - ) 30 - .await 31 - .unwrap() 32 - { 33 - Driver::Memory(_, mem_driver) => mem_driver, 34 - Driver::Disk(_) => panic!("not doing disk for benchmark"), 35 - }; 25 + let mut driver = 26 + match Driver::load_car(reader, |block| block.len().to_le_bytes().to_vec(), 1024) 27 + .await 28 + .unwrap() 29 + { 30 + Driver::Memory(_, mem_driver) => mem_driver, 31 + Driver::Disk(_) => panic!("not doing disk for benchmark"), 32 + }; 36 33 37 34 let mut n = 0; 38 35 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+7 -8
benches/non-huge-cars.rs
··· 29 29 } 30 30 31 31 async fn drive_car(bytes: &[u8]) -> usize { 32 - let mut driver = 33 - match Driver::load_car(bytes, |block| block.len().to_le_bytes().to_vec().into(), 32) 34 - .await 35 - .unwrap() 36 - { 37 - Driver::Memory(_, mem_driver) => mem_driver, 38 - Driver::Disk(_) => panic!("not benching big cars here"), 39 - }; 32 + let mut driver = match Driver::load_car(bytes, |block| block.len().to_le_bytes().to_vec(), 32) 33 + .await 34 + .unwrap() 35 + { 36 + Driver::Memory(_, mem_driver) => mem_driver, 37 + Driver::Disk(_) => panic!("not benching big cars here"), 38 + }; 40 39 41 40 let mut n = 0; 42 41 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+1 -1
src/disk.rs
··· 17 17 ``` 18 18 */ 19 19 20 + use crate::Bytes; 20 21 use crate::drive::DriveError; 21 - use bytes::Bytes; 22 22 use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy}; 23 23 use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 24 24 use std::path::PathBuf;
+9 -20
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 + use crate::Bytes; 3 4 use crate::HashMap; 4 5 use crate::disk::{DiskError, DiskStore}; 5 6 use crate::mst::Node; 6 - use bytes::Bytes; 7 7 use cid::Cid; 8 8 use iroh_car::CarReader; 9 9 use std::convert::Infallible; ··· 21 21 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 22 22 #[error("The Commit block reference by the root was not found")] 23 23 MissingCommit, 24 - #[error("The MST block {0} could not be found")] 25 - MissingBlock(Cid), 26 24 #[error("Failed to walk the mst tree: {0}")] 27 25 WalkError(#[from] WalkError), 28 26 #[error("CAR file had no roots")] ··· 80 78 } 81 79 pub(crate) fn into_bytes(self) -> Bytes { 82 80 match self { 83 - MaybeProcessedBlock::Raw(b) => { 84 - let mut owned = b.try_into_mut().unwrap(); 85 - owned.extend_from_slice(&[0x00]); 86 - owned.into() 81 + MaybeProcessedBlock::Raw(mut b) => { 82 + b.push(0x00); 83 + b 87 84 } 88 - MaybeProcessedBlock::Processed(b) => { 89 - let mut owned = b.try_into_mut().unwrap(); 90 - owned.extend_from_slice(&[0x01]); 91 - owned.into() 85 + MaybeProcessedBlock::Processed(mut b) => { 86 + b.push(0x01); 87 + b 92 88 } 93 89 } 94 90 } 95 91 pub(crate) fn from_bytes(mut b: Bytes) -> Self { 96 92 // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc 97 - let suffix = b.split_off(b.len() - 1); 98 - if *suffix == [0x00] { 93 + let suffix = b.pop().unwrap(); 94 + if suffix == 0x00 { 99 95 MaybeProcessedBlock::Raw(b) 100 96 } else { 101 97 MaybeProcessedBlock::Processed(b) ··· 292 288 for _ in 0..n { 293 289 // walk as far as we can until we run out of blocks or find a record 294 290 match self.walker.step(&mut self.blocks, self.process)? { 295 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 296 291 Step::Finish => break, 297 292 Step::Found { rkey, data } => { 298 293 out.push((rkey, data)); ··· 465 460 } 466 461 }; 467 462 match step { 468 - Step::Missing(cid) => { 469 - return (state, Err(DriveError::MissingBlock(cid))); 470 - } 471 463 Step::Finish => break, 472 464 Step::Found { rkey, data } => out.push((rkey, data)), 473 465 }; ··· 508 500 }; 509 501 510 502 match step { 511 - Step::Missing(cid) => { 512 - return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 513 - } 514 503 Step::Finish => return Ok(()), 515 504 Step::Found { rkey, data } => { 516 505 out.push((rkey, data));
+3
src/lib.rs
··· 91 91 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 92 92 pub use mst::Commit; 93 93 94 + // pub use bytes::Bytes; 95 + pub type Bytes = Vec<u8>; 96 + 94 97 pub(crate) use hashbrown::HashMap;
+11 -14
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 + use crate::Bytes; 3 4 use crate::HashMap; 4 5 use crate::disk::DiskStore; 5 6 use crate::drive::MaybeProcessedBlock; 6 7 use crate::mst::Node; 7 - use bytes::Bytes; 8 8 use cid::Cid; 9 9 use sha2::{Digest, Sha256}; 10 10 use std::convert::Infallible; ··· 20 20 MstError(#[from] MstError), 21 21 #[error("storage error: {0}")] 22 22 StorageError(#[from] fjall::Error), 23 + #[error("block not found: {0}")] 24 + MissingBlock(Cid), 23 25 } 24 26 25 27 /// Errors from invalid Rkeys ··· 44 46 /// Walker outputs 45 47 #[derive(Debug)] 46 48 pub enum Step { 47 - /// We needed this CID but it's not in the block store 48 - Missing(Cid), 49 49 /// Reached the end of the MST! yay! 50 50 Finish, 51 51 /// A record was found! ··· 189 189 &mut Need::Node { depth, cid } => { 190 190 log::trace!("need node {cid:?}"); 191 191 let Some(block) = blocks.remove(&cid) else { 192 - log::trace!("node not found, resting"); 193 - return Ok(Step::Missing(cid)); 192 + return Err(WalkError::MissingBlock(cid)); 194 193 }; 195 194 196 195 let MaybeProcessedBlock::Raw(data) = block else { ··· 209 208 log::trace!("need record {cid:?}"); 210 209 // note that we cannot *remove* a record block, sadly, since 211 210 // there can be multiple rkeys pointing to the same cid. 212 - let Some(data) = blocks.get_mut(cid) else { 213 - return Ok(Step::Missing(*cid)); 211 + let Some(data) = blocks.get(cid) else { 212 + return Err(WalkError::MissingBlock(*cid)); 214 213 }; 215 214 let rkey = rkey.clone(); 216 215 let data = match data { ··· 251 250 let cid_bytes = cid.to_bytes(); 252 251 log::trace!("need node {cid:?}"); 253 252 let Some(block_slice) = reader.get(&cid_bytes)? else { 254 - log::trace!("node not found, resting"); 255 - return Ok(Step::Missing(cid)); 253 + return Err(WalkError::MissingBlock(cid)); 256 254 }; 257 255 258 - let block = MaybeProcessedBlock::from_bytes(block_slice.into()); // TODO shouldn't fjalls slice already be bytes 256 + let block = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 259 257 260 258 let MaybeProcessedBlock::Raw(data) = block else { 261 259 return Err(WalkError::BadCommitFingerprint); ··· 273 271 log::trace!("need record {cid:?}"); 274 272 let cid_bytes = cid.to_bytes(); 275 273 let Some(data_slice) = reader.get(&cid_bytes)? else { 276 - log::trace!("record block not found, resting"); 277 - return Ok(Step::Missing(*cid)); 274 + return Err(WalkError::MissingBlock(*cid)); 278 275 }; 279 - let data = MaybeProcessedBlock::from_bytes(data_slice.into()); 276 + let data = MaybeProcessedBlock::from_bytes(data_slice.to_vec()); 280 277 let rkey = rkey.clone(); 281 278 let data = match data { 282 279 MaybeProcessedBlock::Raw(data) => process(data), 283 - MaybeProcessedBlock::Processed(t) => t.clone(), 280 + MaybeProcessedBlock::Processed(t) => t, 284 281 }; 285 282 286 283 // found node, make sure we remember