Fast and robust atproto CAR file processing in rust

fmt

+77 -92
+4 -9
benches/huge-car.rs
··· 32 32 let reader = tokio::fs::File::open(filename).await.unwrap(); 33 33 let reader = tokio::io::BufReader::new(reader); 34 34 35 - let mut driver = 36 - match Driver::load_car(reader, ser, 1024) 37 - .await 38 - .unwrap() 39 - .unwrap() 40 - { 41 - Driver::Memory(_, mem_driver) => mem_driver, 42 - Driver::Disk(_) => panic!("not doing disk for benchmark"), 43 - }; 35 + let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap().unwrap() { 36 + Driver::Memory(_, mem_driver) => mem_driver, 37 + Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 + }; 44 39 45 40 let mut n = 0; 46 41 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+22 -25
benches/leading.rs
··· 1 - use criterion::{Criterion, BatchSize, criterion_group, criterion_main}; 2 - use sha2::{Sha256, Digest}; 1 + use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; 3 2 use hmac_sha256::Hash; 3 + use sha2::{Digest, Sha256}; 4 4 5 5 pub fn compute(bytes: [u8; 32]) -> u32 { 6 6 let mut zeros = 0; ··· 16 16 } 17 17 18 18 pub fn compute2(bytes: [u8; 32]) -> u32 { 19 - u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()) 20 - .leading_zeros() / 2 19 + u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()).leading_zeros() / 2 21 20 } 22 21 23 22 fn from_key_old(key: &[u8]) -> u32 { ··· 30 29 31 30 pub fn criterion_benchmark(c: &mut Criterion) { 32 31 for (name, case) in [ 33 - ("no zeros", [0xFF; 32]), 34 - ("two zeros", [0x3F; 32]), 35 - ("some zeros", [0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 36 - ("many zeros", [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 32 + ("no zeros", [0xFF; 32]), 33 + ("two zeros", [0x3F; 32]), 34 + ( 35 + "some zeros", 36 + [ 37 + 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 38 + 1, 1, 1, 1, 39 + ], 40 + ), 41 + ( 42 + "many zeros", 43 + [ 44 + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 45 + 1, 1, 1, 1, 46 + ], 47 + ), 37 48 ] { 38 49 let mut g = c.benchmark_group(name); 39 50 g.bench_function("old", |b| { 40 - b.iter_batched( 41 - || case.clone(), 42 - |c| compute(c), 43 - BatchSize::SmallInput, 44 - ) 51 + b.iter_batched(|| case.clone(), |c| compute(c), BatchSize::SmallInput) 45 52 }); 46 53 g.bench_function("new", |b| { 47 - b.iter_batched( 48 - || case.clone(), 49 - |c| compute2(c), 50 - BatchSize::SmallInput, 51 - ) 54 + b.iter_batched(|| case.clone(), |c| compute2(c), BatchSize::SmallInput) 52 55 }); 53 56 } 54 57 55 - for case in [ 56 - "a", 57 - "aa", 58 - "aaa", 59 - "aaaa", 60 - ] { 58 + for case in ["a", "aa", "aaa", "aaaa"] { 61 59 let mut g = c.benchmark_group(case); 62 60 g.bench_function("old", |b| b.iter(|| from_key_old(case.as_bytes()))); 63 61 g.bench_function("new", |b| b.iter(|| from_key_new(case.as_bytes()))); 64 62 } 65 63 } 66 - 67 64 68 65 criterion_group!(benches, criterion_benchmark); 69 66 criterion_main!(benches);
+1 -4
benches/non-huge-cars.rs
··· 39 39 } 40 40 41 41 async fn drive_car(bytes: &[u8]) -> usize { 42 - let mut driver = match Driver::load_car(bytes, ser, 32) 43 - .await 44 - .unwrap() 45 - { 42 + let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 46 43 None => return 0, 47 44 Some(Driver::Memory(_, mem_driver)) => mem_driver, 48 45 Some(Driver::Disk(_)) => panic!("not benching big cars here"),
+1 -2
src/disk.rs
··· 17 17 ``` 18 18 */ 19 19 20 - use crate::Bytes; 21 - use crate::drive::DriveError; 20 + use crate::{Bytes, drive::DriveError}; 22 21 use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 23 22 use std::path::PathBuf; 24 23
+13 -7
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use crate::walk::Output; 4 - use crate::Bytes; 5 - use crate::HashMap; 6 - use crate::disk::{DiskError, DiskStore}; 7 - use crate::mst::MstNode; 3 + use crate::{ 4 + Bytes, HashMap, 5 + disk::{DiskError, DiskStore}, 6 + mst::MstNode, 7 + walk::Output, 8 + }; 8 9 use cid::Cid; 9 10 use iroh_car::CarReader; 10 11 use std::convert::Infallible; ··· 254 255 let commit = commit.ok_or(DriveError::MissingCommit)?; 255 256 256 257 // the commit always must point to a Node; empty node => empty MST special case 257 - let root_node: MstNode = match mem_blocks.get(&commit.data).ok_or(DriveError::MissingCommit)? { 258 + let root_node: MstNode = match mem_blocks 259 + .get(&commit.data) 260 + .ok_or(DriveError::MissingCommit)? 261 + { 258 262 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 259 263 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 260 264 }; ··· 300 304 let mut out = Vec::with_capacity(n); 301 305 for _ in 0..n { 302 306 // walk as far as we can until we run out of blocks or find a record 303 - let Some(Output { rkey, cid: _, data }) = self.walker.step(&mut self.blocks, self.process)? else { 307 + let Some(Output { rkey, cid: _, data }) = 308 + self.walker.step(&mut self.blocks, self.process)? 309 + else { 304 310 break; 305 311 }; 306 312 out.push((rkey, data));
+19 -14
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 - use sha2::{Digest, Sha256}; 7 6 use cid::Cid; 8 7 use serde::Deserialize; 8 + use sha2::{Digest, Sha256}; 9 9 10 10 /// The top-level data object in a repository's tree is a signed commit. 11 11 #[derive(Debug, Deserialize)] ··· 37 37 pub sig: serde_bytes::ByteBuf, 38 38 } 39 39 40 - use serde::de::{self, Deserializer, Visitor, MapAccess, Unexpected}; 40 + use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 41 41 use std::fmt; 42 42 43 43 pub type Depth = u32; ··· 97 97 } 98 98 found_left = true; 99 99 if let Some(cid) = map.next_value()? { 100 - left = Some(NodeThing { cid, kind: ThingKind::Tree }); 100 + left = Some(NodeThing { 101 + cid, 102 + kind: ThingKind::Tree, 103 + }); 101 104 } 102 105 } 103 106 "e" => { ··· 110 113 111 114 for entry in map.next_value::<Vec<Entry>>()? { 112 115 let mut rkey: Vec<u8> = vec![]; 113 - let pre_checked = prefix 114 - .get(..entry.prefix_len) 115 - .ok_or_else(|| de::Error::invalid_value( 116 - Unexpected::Bytes(&prefix), 117 - &"a prefix at least as long as the prefix_len", 118 - ))?; 116 + let pre_checked = 117 + prefix.get(..entry.prefix_len).ok_or_else(|| { 118 + de::Error::invalid_value( 119 + Unexpected::Bytes(&prefix), 120 + &"a prefix at least as long as the prefix_len", 121 + ) 122 + })?; 119 123 120 124 rkey.extend_from_slice(pre_checked); 121 125 rkey.extend_from_slice(&entry.keysuffix); 122 126 123 - let rkey_s = String::from_utf8(rkey.clone()) 124 - .map_err(|_| de::Error::invalid_value( 127 + let rkey_s = String::from_utf8(rkey.clone()).map_err(|_| { 128 + de::Error::invalid_value( 125 129 Unexpected::Bytes(&rkey), 126 130 &"a valid utf-8 rkey", 127 - ))?; 131 + ) 132 + })?; 128 133 129 134 let key_depth = atproto_mst_depth(&rkey_s); 130 135 if depth.is_none() { ··· 150 155 151 156 prefix = rkey; 152 157 } 153 - }, 154 - f => return Err(de::Error::unknown_field(f, NODE_FIELDS)) 158 + } 159 + f => return Err(de::Error::unknown_field(f, NODE_FIELDS)), 155 160 } 156 161 } 157 162 if !found_left {
+16 -26
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::mst::NodeThing; 4 - use crate::mst::ThingKind; 5 - use crate::mst::MstNode; 6 - use crate::mst::Depth; 7 - use crate::Bytes; 8 - use crate::HashMap; 9 - use crate::disk::DiskStore; 10 - use crate::drive::MaybeProcessedBlock; 3 + use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 + use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock}; 11 5 use cid::Cid; 12 6 use std::convert::Infallible; 13 7 ··· 60 54 } 61 55 62 56 impl Walker { 63 - pub fn new( 64 - root_node: MstNode, 65 - ) -> Option<Self> { 57 + pub fn new(root_node: MstNode) -> Option<Self> { 66 58 Some(Self { 67 59 prev_rkey: "".to_string(), 68 60 root_depth: root_node.depth?, ··· 93 85 self.prev_rkey = rkey.clone(); 94 86 95 87 log::trace!("val @ {rkey}"); 96 - Ok(Some(Output { 97 - rkey, 98 - cid, 99 - data, 100 - })) 88 + Ok(Some(Output { rkey, cid, data })) 101 89 } 102 90 ThingKind::Tree => { 103 91 let MaybeProcessedBlock::Raw(data) = mpb else { 104 92 return Err(WalkError::BadCommitFingerprint); 105 93 }; 106 94 107 - let node: MstNode = serde_ipld_dagcbor::from_slice(&data) 108 - .map_err(WalkError::BadCommit)?; 95 + let node: MstNode = 96 + serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?; 109 97 110 98 if node.is_empty() { 111 99 return Err(WalkError::MstError(MstError::EmptyNode)); 112 100 } 113 101 114 102 let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 115 - let next_depth = current_depth.checked_sub(1).ok_or(MstError::DepthUnderflow)?; 116 - if let Some(d) = node.depth { 117 - if d != next_depth { 118 - return Err(WalkError::MstError(MstError::WrongDepth { 119 - depth: d, 120 - expected: next_depth, 121 - })); 122 - } 103 + let next_depth = current_depth 104 + .checked_sub(1) 105 + .ok_or(MstError::DepthUnderflow)?; 106 + if let Some(d) = node.depth 107 + && d != next_depth 108 + { 109 + return Err(WalkError::MstError(MstError::WrongDepth { 110 + depth: d, 111 + expected: next_depth, 112 + })); 123 113 } 124 114 125 115 log::trace!("node into depth {next_depth}");
+1 -5
tests/mst-depth.rs
··· 205 205 #[test] 206 206 fn test_ietf_example_keys() { 207 207 // https://atproto.com/specs/repository#mst-structure 208 - for (key, expected) in [ 209 - ("key1", 0), 210 - ("key7", 1), 211 - ("key515", 4), 212 - ] { 208 + for (key, expected) in [("key1", 0), ("key7", 1), ("key515", 4)] { 213 209 let computed = atproto_mst_depth(key); 214 210 assert_eq!(computed, expected); 215 211 }