Fast and robust atproto CAR file processing in rust

simpler depth handling

+64 -44
+1
Cargo.toml
··· 27 27 tempfile = "3.23.0" 28 28 tokio = { version = "1.47.1", features = ["full"] } 29 29 mimalloc = "0.1.48" 30 + hmac-sha256 = "1.1.12" 30 31 31 32 [profile.profiling] 32 33 inherits = "release"
+4
benches/huge-car.rs
··· 4 4 5 5 use criterion::{Criterion, criterion_group, criterion_main}; 6 6 7 + // use mimalloc::MiMalloc; 8 + // #[global_allocator] 9 + // static GLOBAL: MiMalloc = MiMalloc; 10 + 7 11 pub fn criterion_benchmark(c: &mut Criterion) { 8 12 let rt = tokio::runtime::Builder::new_multi_thread() 9 13 .enable_all()
+4
benches/non-huge-cars.rs
··· 3 3 4 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 5 6 + // use mimalloc::MiMalloc; 7 + // #[global_allocator] 8 + // static GLOBAL: MiMalloc = MiMalloc; 9 + 6 10 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 7 11 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 8 12 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
+22 -6
readme.md
··· 58 58 ``` 59 59 60 60 more recent todo 61 - 62 - - [ ] get an *emtpy* car for the test suite 61 + - [ ] repo car slices 62 + - [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling) 63 + - [x] get an *emtpy* car for the test suite 63 64 - [x] implement a max size on disk limit 64 65 65 66 ··· 70 71 71 72 current car processing times (records processed into their length usize, phil's dev machine): 72 73 73 - - 128MiB CAR file: `347ms` 74 - - 5.0MiB: `6.1ms` 75 - - 279KiB: `139us` 76 - - 3.4KiB: `4.9us` 74 + - 128MiB CAR file: `350ms` 75 + - 5.0MiB: `6.8ms` 76 + - 279KiB: `170us` 77 + - 3.4KiB: `5.2us` 78 + - empty: `710ns` 79 + 80 + it's a little faster with `mimalloc` 81 + 82 + ```rust 83 + use mimalloc::MiMalloc; 84 + #[global_allocator] 85 + static GLOBAL: MiMalloc = MiMalloc; 86 + ``` 87 + 88 + - 128MiB CAR file: `310ms` (-13%) 89 + - 5.0MiB: `6.1ms` (-10%) 90 + - 279KiB: `160us` (-5%) 91 + - 3.4KiB: `5.7us` (-9%) 92 + - empty: `660ns` (-7%) 77 93 78 94 79 95 running the huge-car benchmark
+5 -11
src/drive.rs
··· 254 254 let commit = commit.ok_or(DriveError::MissingCommit)?; 255 255 256 256 // the commit always must point to a Node; empty node => empty MST special case 257 - let node: MstNode = match mem_blocks.get(&commit.data).ok_or(DriveError::MissingCommit)? { 257 + let root_node: MstNode = match mem_blocks.get(&commit.data).ok_or(DriveError::MissingCommit)? { 258 258 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 259 259 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 260 260 }; 261 - if node.is_empty() { 261 + let Some(walker) = Walker::new(root_node) else { 262 262 // TODO: actually we still want the commit in this case 263 263 return Ok(None); 264 - } 265 - let depth = node.depth.unwrap(); 266 - 267 - let walker = Walker::new(commit.data, depth); 264 + }; 268 265 269 266 Ok(Some(Driver::Memory( 270 267 commit, ··· 412 409 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 413 410 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 414 411 }; 415 - if node.is_empty() { 412 + let Some(walker) = Walker::new(node) else { 416 413 return Ok((commit, None)); 417 - } 418 - let depth = node.depth.unwrap(); 419 - 420 - let walker = Walker::new(commit.data, depth); 414 + }; 421 415 422 416 Ok(( 423 417 commit,
+28 -27
src/walk.rs
··· 55 55 #[derive(Debug)] 56 56 pub struct Walker { 57 57 prev_rkey: String, 58 - todo: Vec<(Depth, NodeThing)>, 58 + root_depth: Depth, 59 + todo: Vec<Vec<NodeThing>>, 59 60 } 60 61 61 62 impl Walker { 62 63 pub fn new( 63 - root_cid: Cid, 64 - depth: Depth, 65 - ) -> Self { 66 - Self { 64 + root_node: MstNode, 65 + ) -> Option<Self> { 66 + Some(Self { 67 67 prev_rkey: "".to_string(), 68 - todo: vec![( 69 - depth + 1, // we're kind of inventing a fake root one above the real root 70 - // ... maybe we should just pass in the real root here??? 71 - NodeThing { 72 - cid: root_cid, 73 - kind: ThingKind::Tree, 74 - }, 75 - )], 68 + root_depth: root_node.depth?, 69 + todo: vec![root_node.things], 70 + }) 71 + } 72 + 73 + fn next_todo(&mut self) -> Option<NodeThing> { 74 + while let Some(last) = self.todo.last_mut() { 75 + let Some(thing) = last.pop() else { 76 + self.todo.pop(); 77 + continue; 78 + }; 79 + return Some(thing); 76 80 } 81 + None 77 82 } 78 83 79 84 fn mpb_step( 80 85 &mut self, 81 - depth: Depth, 82 86 kind: ThingKind, 83 87 cid: Cid, 84 88 mpb: &MaybeProcessedBlock, ··· 99 103 } 100 104 self.prev_rkey = rkey.clone(); 101 105 106 + log::trace!("val @ {rkey}"); 102 107 Ok(Some(Output { 103 108 rkey, 104 109 cid, ··· 117 122 return Err(WalkError::MstError(MstError::EmptyNode)); 118 123 } 119 124 120 - let next_depth = depth.checked_sub(1).ok_or(MstError::DepthUnderflow)?; 125 + let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 126 + let next_depth = current_depth.checked_sub(1).ok_or(MstError::DepthUnderflow)?; 121 127 if let Some(d) = node.depth { 122 128 if d != next_depth { 123 129 return Err(WalkError::MstError(MstError::WrongDepth { ··· 127 133 } 128 134 } 129 135 130 - for thing in node.things { 131 - self.todo.push((next_depth, thing)); 132 - } 133 - 136 + log::trace!("node into depth {next_depth}"); 137 + self.todo.push(node.things); 134 138 Ok(None) 135 139 } 136 140 } ··· 143 147 process: impl Fn(Bytes) -> Bytes, 144 148 ) -> Result<Option<Output>, WalkError> { 145 149 146 - while let Some((depth, NodeThing { cid, kind })) = self.todo.pop() { 150 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 147 151 let Some(mpb) = blocks.get(&cid) else { 148 152 return Err(WalkError::MissingBlock(cid)); 149 153 }; 150 - if let Some(out) = self.mpb_step(depth, kind, cid, mpb, &process)? { 154 + 155 + if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? { 151 156 return Ok(Some(out)); 152 157 } 153 158 } 154 - 155 - log::trace!("tried to walk but we're actually done."); 156 159 Ok(None) 157 160 } 158 161 ··· 162 165 blocks: &mut DiskStore, 163 166 process: impl Fn(Bytes) -> Bytes, 164 167 ) -> Result<Option<Output>, WalkError> { 165 - 166 - while let Some((depth, NodeThing { cid, kind })) = self.todo.pop() { 168 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 167 169 let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 168 170 return Err(WalkError::MissingBlock(cid)); 169 171 }; 170 172 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 171 - if let Some(out) = self.mpb_step(depth, kind, cid, &mpb, &process)? { 173 + if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 172 174 return Ok(Some(out)); 173 175 } 174 176 } 175 - log::trace!("tried to walk but we're actually done."); 176 177 Ok(None) 177 178 } 178 179 }