Fast and robust atproto CAR file processing in rust

custom mst node deserialize

try to put the data into a more walk-friendly shape

oops it made things slower

benches:
huge: +0.36% (within noise margin)
midsize: +3.3%
little: +3.7%
tiny: +4.6%
empty: +6.4%

+201 -86
+137 -12
src/mst.rs
··· 5 5 6 6 use cid::Cid; 7 7 use serde::Deserialize; 8 + use crate::walk::Depth; 8 9 9 10 /// The top-level data object in a repository's tree is a signed commit. 10 11 #[derive(Debug, Deserialize)] ··· 33 34 pub prev: Option<Cid>, 34 35 /// cryptographic signature of this commit, as raw bytes 35 36 #[serde(with = "serde_bytes")] 36 - pub sig: Vec<u8>, 37 + pub sig: serde_bytes::ByteBuf, 38 + } 39 + 40 + use serde::{de, de::{Deserializer, Visitor, MapAccess, SeqAccess}}; 41 + use std::fmt; 42 + 43 + pub(crate) enum NodeEntry { 44 + Value(Cid, Vec<u8>), // rkey 45 + Tree(Cid, u32), // depth 46 + } 47 + 48 + pub(crate) struct MstNode { 49 + pub left: Option<Cid>, // a tree but we don't know the depth 50 + pub entries: Vec<NodeEntry>, 51 + } 52 + 53 + pub(crate) struct Entries(pub(crate) Vec<NodeEntry>); 54 + 55 + impl<'de> Deserialize<'de> for Entries { 56 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 57 + where 58 + D: Deserializer<'de>, 59 + { 60 + struct EntriesVisitor; 61 + impl<'de> Visitor<'de> for EntriesVisitor { 62 + type Value = Entries; 63 + 64 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 65 + formatter.write_str("seq MstEntries") 66 + } 67 + 68 + fn visit_seq<S>(self, mut seq: S) -> Result<Self::Value, S::Error> 69 + where 70 + S: SeqAccess<'de>, 71 + { 72 + let mut children: Vec<NodeEntry> = Vec::with_capacity(seq.size_hint().unwrap_or(5)); 73 + let mut prefix: Vec<u8> = vec![]; 74 + while let Some(entry) = seq.next_element::<Entry>()? { 75 + let mut rkey: Vec<u8> = vec![]; 76 + let pre_checked = prefix 77 + .get(..entry.prefix_len) 78 + // .ok_or(MstError::EntryPrefixOutOfbounds)?; 79 + .ok_or_else(|| todo!()).unwrap(); 80 + 81 + rkey.extend_from_slice(pre_checked); 82 + rkey.extend_from_slice(&entry.keysuffix); 83 + let depth = Depth::compute(&rkey); 84 + 85 + prefix = rkey.clone(); 86 + 87 + children.push(NodeEntry::Value(entry.value, rkey)); 88 + 89 + if let Some(ref tree) = entry.tree { 90 + children.push(NodeEntry::Tree(*tree, depth)); 91 + } 92 + } 93 + Ok(Entries(children)) 94 + } 95 + } 96 + deserializer.deserialize_seq(EntriesVisitor) 97 + } 98 + } 99 + 100 + impl<'de> Deserialize<'de> for MstNode { 101 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 102 + where 103 + D: Deserializer<'de>, 104 + { 105 + struct NodeVisitor; 106 + impl<'de> Visitor<'de> for NodeVisitor { 107 + type Value = MstNode; 108 + 109 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 110 + formatter.write_str("struct MstNode") 111 + } 112 + 113 + fn visit_map<V>(self, mut map: V) -> Result<MstNode, V::Error> 114 + where 115 + V: MapAccess<'de>, 116 + { 117 + let mut found_left = false; 118 + let mut left = None; 119 + let mut found_entries = false; 120 + let mut entries = Vec::with_capacity(4); // "fanout of 4" so does this make sense???? 121 + 122 + while let Some(key) = map.next_key()? { 123 + match key { 124 + "l" => { 125 + if found_left { 126 + return Err(de::Error::duplicate_field("l")); 127 + } 128 + found_left = true; 129 + left = map.next_value()?; 130 + } 131 + "e" => { 132 + if found_entries { 133 + return Err(de::Error::duplicate_field("e")); 134 + } 135 + found_entries = true; 136 + let mut child_entries: Entries = map.next_value()?; 137 + entries.append(&mut child_entries.0); 138 + }, 139 + f => return Err(de::Error::unknown_field(f, NODE_FIELDS)) 140 + } 141 + } 142 + if !found_left { 143 + return Err(de::Error::missing_field("l")); 144 + } 145 + if !found_entries { 146 + return Err(de::Error::missing_field("e")); 147 + } 148 + Ok(MstNode { left, entries }) 149 + } 150 + } 151 + 152 + const NODE_FIELDS: &[&str] = &["l", "e"]; 153 + deserializer.deserialize_struct("MstNode", NODE_FIELDS, NodeVisitor) 154 + } 155 + } 156 + 157 + impl MstNode { 158 + pub(crate) fn is_empty(&self) -> bool { 159 + self.left.is_none() && self.entries.is_empty() 160 + } 37 161 } 38 162 39 163 /// MST node data schema ··· 62 186 /// so if a block *could be* a node, any record converter must postpone 63 187 /// processing. if it turns out it happens to be a very node-looking record, 64 188 /// well, sorry, it just has to only be processed later when that's known. 189 + #[inline(always)] 65 190 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 191 const NODE_FINGERPRINT: [u8; 3] = [ 67 192 0xA2, // map length 2 (for "l" and "e" keys) ··· 77 202 .unwrap_or(false) 78 203 } 79 204 80 - /// Check if a node has any entries 81 - /// 82 - /// An empty repository with no records is represented as a single MST node 83 - /// with an empty array of entries. This is the only situation in which a 84 - /// tree may contain an empty leaf node which does not either contain keys 85 - /// ("entries") or point to a sub-tree containing entries. 86 - pub(crate) fn is_empty(&self) -> bool { 87 - self.left.is_none() && self.entries.is_empty() 88 - } 205 + // /// Check if a node has any entries 206 + // /// 207 + // /// An empty repository with no records is represented as a single MST node 208 + // /// with an empty array of entries. This is the only situation in which a 209 + // /// tree may contain an empty leaf node which does not either contain keys 210 + // /// ("entries") or point to a sub-tree containing entries. 211 + // pub(crate) fn is_empty(&self) -> bool { 212 + // self.left.is_none() && self.entries.is_empty() 213 + // } 89 214 } 90 215 91 216 /// TreeEntry object ··· 96 221 #[serde(rename = "p")] 97 222 pub prefix_len: usize, 98 223 /// remainder of key for this TreeEntry, after "prefixlen" have been removed 99 - #[serde(rename = "k", with = "serde_bytes")] 100 - pub keysuffix: Vec<u8>, // can we String this here? 224 + #[serde(rename = "k")] 225 + pub keysuffix: serde_bytes::ByteBuf, 101 226 /// link to the record data (CBOR) for this entry 102 227 #[serde(rename = "v")] 103 228 pub value: Cid,
+64 -74
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 + use crate::mst::NodeEntry; 4 + use crate::mst::MstNode; 3 5 use crate::Bytes; 4 6 use crate::HashMap; 5 7 use crate::disk::DiskStore; 6 8 use crate::drive::MaybeProcessedBlock; 7 - use crate::mst::Node; 8 9 use cid::Cid; 9 10 use sha2::{Digest, Sha256}; 10 11 use std::convert::Infallible; ··· 59 60 } 60 61 61 62 #[derive(Debug, Clone, Copy, PartialEq)] 62 - enum Depth { 63 + pub enum Depth { 63 64 Root, 64 65 Depth(u32), 65 66 } ··· 81 82 Self::Root => Ok(None), 82 83 Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 83 84 } 85 + } 86 + pub fn compute(key: &[u8]) -> u32 { 87 + let Depth::Depth(d) = Self::from_key(key) else { 88 + panic!("errr"); 89 + }; 90 + d 84 91 } 85 92 } 86 93 87 - fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 94 + fn push_from_node(stack: &mut Vec<Need>, node: &MstNode, parent_depth: Depth) -> Result<(), MstError> { 88 95 // empty nodes are not allowed in the MST except in an empty MST 89 96 if node.is_empty() { 90 97 if parent_depth == Depth::Root { ··· 94 101 } 95 102 } 96 103 97 - let mut entries = Vec::with_capacity(node.entries.len()); 98 - let mut prefix = vec![]; 99 104 let mut this_depth = parent_depth.next_expected()?; 100 105 101 - for entry in &node.entries { 102 - let mut rkey = vec![]; 103 - let pre_checked = prefix 104 - .get(..entry.prefix_len) 105 - .ok_or(MstError::EntryPrefixOutOfbounds)?; 106 - rkey.extend_from_slice(pre_checked); 107 - rkey.extend_from_slice(&entry.keysuffix); 108 - 109 - let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 110 - return Err(MstError::WrongDepth); 111 - }; 112 - 113 - // this_depth is `none` if we are the deepest child (directly below root) 114 - // in that case we accept whatever highest depth is claimed 115 - let expected_depth = match this_depth { 116 - Some(d) => d, 117 - None => { 118 - this_depth = Some(key_depth); 119 - key_depth 106 + for entry in node.entries.iter().rev() { 107 + // ok this loop sucks now esp with depth checking 108 + // should keep the entries together with a shared depth on the rkey 109 + // ...maybe. skipping the absent trees is nice? 110 + match entry { 111 + NodeEntry::Value(cid, rkey) => { 112 + stack.push(Need::Record { 113 + rkey: String::from_utf8(rkey.to_vec())?, 114 + cid: *cid, 115 + }); 116 + } 117 + NodeEntry::Tree(cid, depth) => { 118 + if let Some(expected) = this_depth { 119 + if *depth != expected { 120 + return Err(MstError::WrongDepth); 121 + } 122 + } else { 123 + // this_depth is `none` if we are the deepest child (directly below root) 124 + // in that case we accept whatever highest depth is claimed 125 + this_depth = Some(*depth); 126 + } 127 + stack.push(Need::Node { 128 + depth: Depth::Depth(*depth), 129 + cid: *cid, 130 + }); 120 131 } 121 - }; 122 - 123 - // all keys we find should be this depth 124 - if key_depth != expected_depth { 125 - return Err(MstError::DepthUnderflow); 126 132 } 127 133 128 - prefix = rkey.clone(); 129 - 130 - entries.push(Need::Record { 131 - rkey: String::from_utf8(rkey)?, 132 - cid: entry.value, 133 - }); 134 - if let Some(ref tree) = entry.tree { 135 - entries.push(Need::Node { 136 - depth: Depth::Depth(key_depth), 137 - cid: *tree, 138 - }); 139 - } 140 134 } 141 135 142 - entries.reverse(); 143 - stack.append(&mut entries); 144 - 145 136 let d = this_depth.ok_or(MstError::LostDepth)?; 146 - 147 137 if let Some(tree) = node.left { 148 138 stack.push(Need::Node { 149 139 depth: Depth::Depth(d), ··· 195 185 let MaybeProcessedBlock::Raw(data) = block else { 196 186 return Err(WalkError::BadCommitFingerprint); 197 187 }; 198 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 188 + let node = serde_ipld_dagcbor::from_slice::<crate::mst::MstNode>(&data) 199 189 .map_err(WalkError::BadCommit)?; 200 190 201 191 // found node, make sure we remember ··· 258 248 let MaybeProcessedBlock::Raw(data) = block else { 259 249 return Err(WalkError::BadCommitFingerprint); 260 250 }; 261 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 251 + let node = serde_ipld_dagcbor::from_slice::<MstNode>(&data) 262 252 .map_err(WalkError::BadCommit)?; 263 253 264 254 // found node, make sure we remember ··· 370 360 } 371 361 } 372 362 373 - #[test] 374 - fn test_push_empty_fails() { 375 - let empty_node = Node { 376 - left: None, 377 - entries: vec![], 378 - }; 379 - let mut stack = vec![]; 380 - let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 381 - assert_eq!(err, Err(MstError::EmptyNode)); 382 - } 363 + // #[test] 364 + // fn test_push_empty_fails() { 365 + // let empty_node = Node { 366 + // left: None, 367 + // entries: vec![], 368 + // }; 369 + // let mut stack = vec![]; 370 + // let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 371 + // assert_eq!(err, Err(MstError::EmptyNode)); 372 + // } 383 373 384 - #[test] 385 - fn test_push_one_node() { 386 - let node = Node { 387 - left: Some(cid1()), 388 - entries: vec![], 389 - }; 390 - let mut stack = vec![]; 391 - push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 392 - assert_eq!( 393 - stack.last(), 394 - Some(Need::Node { 395 - depth: Depth::Depth(3), 396 - cid: cid1() 397 - }) 398 - .as_ref() 399 - ); 400 - } 374 + // #[test] 375 + // fn test_push_one_node() { 376 + // let node = Node { 377 + // left: Some(cid1()), 378 + // entries: vec![], 379 + // }; 380 + // let mut stack = vec![]; 381 + // push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 382 + // assert_eq!( 383 + // stack.last(), 384 + // Some(Need::Node { 385 + // depth: Depth::Depth(3), 386 + // cid: cid1() 387 + // }) 388 + // .as_ref() 389 + // ); 390 + // } 401 391 }