Fast and robust atproto CAR file processing in rust

Reduce fjall memory usage

+9 -14
+3 -1
examples/disk-read-file/main.rs
··· 5 5 extern crate repo_stream; 6 6 7 7 use mimalloc::MiMalloc; 8 - 9 8 #[global_allocator] 10 9 static GLOBAL: MiMalloc = MiMalloc; 11 10 ··· 58 57 // at this point you might want to fetch the account's signing key 59 58 // via the DID from the commit, and then verify the signature. 60 59 log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit); 60 + 61 + // log::info!("now is good time to check mem usage..."); 62 + // tokio::time::sleep(std::time::Duration::from_secs(15)).await; 61 63 62 64 // pop the driver back out to get some code indentation relief 63 65 driver
+5 -12
src/disk.rs
··· 19 19 20 20 use crate::Bytes; 21 21 use crate::drive::DriveError; 22 - use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy}; 23 - use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 22 + use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 24 23 use std::path::PathBuf; 25 24 26 25 #[derive(Debug, thiserror::Error)] ··· 111 110 let max_stored = max_stored_mb * 2_usize.pow(20); 112 111 let (db, partition) = tokio::task::spawn_blocking(move || { 113 112 let db = Database::builder(path) 114 - // .manual_journal_persist(true) 115 - // .flush_workers(1) 116 - // .compaction_workers(1) 117 - .journal_compression(CompressionType::None) 118 - .cache_size(cache_mb as u64 * 2_u64.pow(20)) 113 + .manual_journal_persist(true) 114 + .worker_threads(1) 115 + .cache_size(cache_mb as u64 * 2_u64.pow(20) / 2) 119 116 .temporary(true) 120 117 .open()?; 121 118 let opts = KeyspaceCreateOptions::default() 122 - .data_block_restart_interval_policy(RestartIntervalPolicy::all(8)) 123 - .filter_block_pinning_policy(PinningPolicy::disabled()) 124 119 .expect_point_read_hits(true) 125 - .data_block_compression_policy(CompressionPolicy::disabled()) 126 - .manual_journal_persist(true) 127 - .max_memtable_size(32 * 2_u64.pow(20)); 120 + .max_memtable_size(16 * 2_u64.pow(20)); 128 121 let partition = db.keyspace("z", || opts)?; 129 122 130 123 Ok::<_, DiskError>((db, partition))
+1 -1
src/drive.rs
··· 376 376 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 377 377 mem_size += maybe_processed.len(); 378 378 chunk.push((cid, maybe_processed)); 379 - if mem_size >= self.max_size { 379 + if mem_size >= (self.max_size / 2) { 380 380 // soooooo if we're setting the db cache to max_size and then letting 381 381 // multiple chunks in the queue that are >= max_size, then at any time 382 382 // we might be using some multiple of max_size?