Fast and robust atproto CAR file processing in rust

bring back disk driver's reset -> disk store

+22 -7
+13 -7
src/disk.rs
··· 94 94 pub struct DiskStore { 95 95 #[allow(unused)] 96 96 db: Database, 97 - partition: Keyspace, 97 + keyspace: Keyspace, 98 98 max_stored: usize, 99 99 stored: usize, 100 100 } ··· 107 107 max_stored_mb: usize, 108 108 ) -> Result<Self, DiskError> { 109 109 let max_stored = max_stored_mb * 2_usize.pow(20); 110 - let (db, partition) = tokio::task::spawn_blocking(move || { 110 + let (db, keyspace) = tokio::task::spawn_blocking(move || { 111 111 let db = Database::builder(path) 112 112 .manual_journal_persist(true) 113 113 .worker_threads(1) ··· 117 117 let opts = KeyspaceCreateOptions::default() 118 118 .expect_point_read_hits(true) 119 119 .max_memtable_size(16 * 2_u64.pow(20)); 120 - let partition = db.keyspace("z", || opts)?; 120 + let keyspace = db.keyspace("z", || opts)?; 121 121 122 - Ok::<_, DiskError>((db, partition)) 122 + Ok::<_, DiskError>((db, keyspace)) 123 123 }) 124 124 .await??; 125 125 126 126 Ok(Self { 127 127 db, 128 - partition, 128 + keyspace, 129 129 max_stored, 130 130 stored: 0, 131 131 }) ··· 141 141 if self.stored > self.max_stored { 142 142 return Err(DiskError::MaxSizeExceeded.into()); 143 143 } 144 - batch.insert(&self.partition, k, v); 144 + batch.insert(&self.keyspace, k, v); 145 145 } 146 146 batch.commit().map_err(DiskError::DbError)?; 147 147 Ok(()) ··· 149 149 150 150 #[inline] 151 151 pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 152 - self.partition.get(key) 152 + self.keyspace.get(key) 153 + } 154 + 155 + /// Drop and recreate the kv table 156 + pub async fn reset(&self) -> Result<(), DiskError> { 157 + let keyspace = self.keyspace.clone(); 158 + Ok(tokio::task::spawn_blocking(move || keyspace.clear()).await??) 153 159 } 154 160 }
+9
src/drive.rs
··· 552 552 553 553 (rx, chan_task) 554 554 } 555 + 556 + /// Reset the disk storage so it can be reused. 557 + /// 558 + /// The store is returned, so it can be reused for another `DiskDriver`. 559 + pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 560 + let BigState { store, .. } = self.state.take().expect("valid state"); 561 + store.reset().await?; 562 + Ok(store) 563 + } 555 564 }