High-performance implementation of plcbundle written in Rust

refactor(websocket): improve bundle tracking and operation counting

+47 -8
+28 -1
src/manager.rs
··· 1808 1808 1809 1809 // ALWAYS get boundaries from last bundle initially 1810 1810 let (mut after_time, mut prev_boundary) = if next_bundle_num > 1 { 1811 - let last = self.load_bundle(next_bundle_num - 1, LoadOptions::default())?; 1811 + let last = self.load_bundle( 1812 + next_bundle_num - 1, 1813 + LoadOptions { 1814 + cache: false, 1815 + decompress: true, 1816 + filter: None, 1817 + limit: None, 1818 + }, 1819 + )?; 1812 1820 let boundary = get_boundary_cids(&last.operations); 1813 1821 let cursor = last 1814 1822 .operations ··· 2921 2929 /// Get a copy of the current index 2922 2930 pub fn get_index(&self) -> Index { 2923 2931 self.index.read().unwrap().clone() 2932 + } 2933 + 2934 + pub fn bundle_count(&self) -> usize { 2935 + self.index.read().unwrap().bundles.len() 2936 + } 2937 + 2938 + pub fn get_mempool_operations_from(&self, start: usize) -> Result<Vec<Operation>> { 2939 + let mempool_guard = self.mempool.read().unwrap(); 2940 + match mempool_guard.as_ref() { 2941 + Some(mp) => { 2942 + let ops = mp.get_operations(); 2943 + if start >= ops.len() { 2944 + Ok(Vec::new()) 2945 + } else { 2946 + Ok(ops[start..].to_vec()) 2947 + } 2948 + } 2949 + None => Ok(Vec::new()), 2950 + } 2924 2951 } 2925 2952 2926 2953 // === Remote Access ===
+13 -1
src/plc_client.rs
··· 361 361 /// Prevents burst requests by starting with 0 permits and refilling at steady rate 362 362 struct RateLimiter { 363 363 semaphore: std::sync::Arc<tokio::sync::Semaphore>, 364 + shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>, 364 365 } 365 366 366 367 impl RateLimiter { ··· 369 370 // Start with 0 permits to prevent initial burst 370 371 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(0)); 371 372 let sem_clone = semaphore.clone(); 373 + let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); 374 + let shutdown_clone = shutdown.clone(); 372 375 373 376 let refill_rate = 374 377 Duration::from_secs_f64(period.as_secs_f64() / requests_per_period as f64); ··· 385 388 386 389 // Then refill at steady rate 387 390 loop { 391 + if shutdown_clone.load(std::sync::atomic::Ordering::Relaxed) { 392 + break; 393 + } 388 394 tokio::time::sleep(refill_rate_clone).await; 389 395 // Add one permit if under capacity (burst allowed up to capacity) 390 396 if sem_clone.available_permits() < capacity { ··· 393 399 } 394 400 }); 395 401 396 - Self { semaphore } 402 + Self { semaphore, shutdown } 397 403 } 398 404 399 405 async fn wait(&self) { ··· 409 415 410 416 fn available_permits(&self) -> usize { 411 417 self.semaphore.available_permits() 418 + } 419 + } 420 + 421 + impl Drop for RateLimiter { 422 + fn drop(&mut self) { 423 + self.shutdown.store(true, std::sync::atomic::Ordering::Relaxed); 412 424 } 413 425 } 414 426
+6 -6
src/server/websocket.rs
··· 100 100 } 101 101 102 102 // Stream mempool operations 103 - let bundle_record_base = crate::constants::total_operations_from_bundles(bundles.len() as u32); 103 + let mut bundle_record_base = crate::constants::total_operations_from_bundles(bundles.len() as u32); 104 104 let mut last_seen_mempool_count = 0; 105 105 106 106 stream_mempool( ··· 121 121 ticker.tick().await; 122 122 123 123 // Check for new bundles 124 - let index = state.manager.get_index(); 125 - let bundles = &index.bundles; 124 + let current_bundle_count = state.manager.bundle_count(); 126 125 127 - if bundles.len() > last_bundle_count { 128 - let new_bundle_count = bundles.len() - last_bundle_count; 126 + if current_bundle_count > last_bundle_count { 127 + let new_bundle_count = current_bundle_count - last_bundle_count; 129 128 current_record += 130 129 crate::constants::total_operations_from_bundles(new_bundle_count as u32); 131 - last_bundle_count = bundles.len(); 130 + last_bundle_count = current_bundle_count; 131 + bundle_record_base = crate::constants::total_operations_from_bundles(last_bundle_count as u32); 132 132 last_seen_mempool_count = 0; 133 133 } 134 134