High-performance implementation of plcbundle written in Rust

Revert last 3 commits

+14 -27
+1 -1
src/format.rs
··· 315 315 assert_eq!(format_bytes_per_sec(1536.0), "1.5 KB/sec"); 316 316 assert_eq!(format_bytes_per_sec(1024.0 * 1024.0), "1.0 MB/sec"); 317 317 } 318 - } 318 + }
-5
src/manager.rs
··· 1726 1726 std::time::Duration, 1727 1727 std::time::Duration, 1728 1728 std::time::Duration, 1729 - std::time::Duration, 1730 1729 bool, 1731 1730 )> { 1732 1731 use anyhow::Context; ··· 1944 1943 let compressed_frames_clone = compressed_frames.clone(); 1945 1944 1946 1945 // Write file first (metadata frame doesn't contain compressed_hash, so we can write it) 1947 - let file_write_start = Instant::now(); 1948 1946 tokio::task::spawn_blocking({ 1949 1947 let bundle_path_clone = bundle_path_clone.clone(); 1950 1948 let bundle_metadata_frame_clone = bundle_metadata_frame_clone.clone(); ··· 1987 1985 }) 1988 1986 .await 1989 1987 .context("Bundle file write task failed")??; 1990 - 1991 - let file_write_time = file_write_start.elapsed(); 1992 1988 1993 1989 // Now calculate compressed_hash from the entire file (as verification does) 1994 1990 let compressed_hash = tokio::task::spawn_blocking({ ··· 2084 2080 Ok(( 2085 2081 serialize_time, 2086 2082 compress_time, 2087 - file_write_time, 2088 2083 hash_time, 2089 2084 did_index_time, 2090 2085 index_write_time,
+13 -21
src/sync.rs
··· 279 279 format!("{:.0}KB", size_kb) 280 280 }; 281 281 282 - let short_hash = &result.hash[..std::cmp::min(7, result.hash.len())]; 283 282 eprintln!( 284 283 "[INFO] → Bundle {:06} | {} | {} dids | {} | fetch: {:.2}s ({} reqs, {:.1}s wait) | save: {}ms | index: {}ms | {}", 285 284 result.bundle_num, 286 - short_hash, 285 + result.hash, 287 286 result.unique_dids, 288 287 size_str, 289 288 fetch_secs, ··· 539 538 let op: Operation = plc_op.into(); 540 539 541 540 // CRITICAL: Check against previous boundary CIDs 542 - if op 543 - .cid 544 - .as_ref() 545 - .is_some_and(|cid| prev_boundary.contains(cid)) 546 - { 541 + if op.cid.as_ref().is_some_and(|cid| prev_boundary.contains(cid)) { 547 542 batch_boundary_dupes += 1; 548 543 continue; 549 544 } ··· 743 738 744 739 // Save bundle 745 740 let ( 746 - serialize_time, 747 - compress_time, 748 - save_time, 741 + _serialize_time, 742 + _compress_time, 749 743 _hash_time, 750 - did_index_time, 751 - index_write_time, 744 + _did_index_time, 745 + index_ms, 752 746 did_index_compacted, 753 747 ) = self 754 748 .manager ··· 761 755 self.manager.add_to_mempool(remaining_ops_vec)?; 762 756 } 763 757 764 - let total_duration = fetch_stats.fetch_duration 765 - + serialize_time 766 - + compress_time 767 - + save_time 768 - + did_index_time 769 - + index_write_time; 758 + let total_duration = fetch_stats.fetch_duration + index_ms; 770 759 let total_duration_ms = total_duration.as_secs_f64() * 1000.0; 771 760 772 761 // Count unique DIDs for stats ··· 796 785 mempool_count: remaining_ops.len(), 797 786 total_duration_ms: total_duration_ms as u64, 798 787 fetch_duration_ms: fetch_duration_ms as u64, 799 - bundle_save_ms: (serialize_time + compress_time + save_time).as_millis() as u64, 800 - index_ms: (did_index_time + index_write_time).as_millis() as u64, 788 + bundle_save_ms: 0, // Included in index_ms/total for now or need to separate 789 + index_ms: index_ms.as_millis() as u64, 801 790 fetch_requests: fetch_stats.fetch_num, 802 791 hash: self 803 792 .manager ··· 890 879 } 891 880 892 881 pub async fn run_continuous(&self) -> Result<()> { 882 + 893 883 let mut total_synced = 0u32; 894 884 let mut is_initial_sync = true; 895 885 ··· 928 918 .and_then(|v| v.as_u64()) 929 919 .unwrap_or(0); 930 920 931 - let sync_result = self.sync_next_bundle(self.config.shutdown_rx.clone()).await; 921 + let sync_result = self 922 + .sync_next_bundle(self.config.shutdown_rx.clone()) 923 + .await; 932 924 933 925 match sync_result { 934 926 Ok(SyncResult::BundleCreated(result)) => {