High-performance implementation of plcbundle written in Rust

feat: add memory profiling and fetch logging support

- Add dhat heap profiler integration for memory profiling
- Implement detailed fetch logging for sync operations
- Refactor mempool to support CID collection
- Update CLI with new profiling and logging flags
- Add documentation for memory profiling usage

+434 -81
+3 -1
.gitignore
··· 7 7 .claude 8 8 .DS_Store 9 9 .trae 10 - .github 10 + .github 11 + jeprof* 12 + dhat*
+101
Cargo.lock
··· 3 3 version = 4 4 4 5 5 [[package]] 6 + name = "addr2line" 7 + version = "0.25.1" 8 + source = "registry+https://github.com/rust-lang/crates.io-index" 9 + checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" 10 + dependencies = [ 11 + "gimli", 12 + ] 13 + 14 + [[package]] 15 + name = "adler2" 16 + version = "2.0.1" 17 + source = "registry+https://github.com/rust-lang/crates.io-index" 18 + checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" 19 + 20 + [[package]] 6 21 name = "ahash" 7 22 version = "0.8.12" 8 23 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 198 213 ] 199 214 200 215 [[package]] 216 + name = "backtrace" 217 + version = "0.3.76" 218 + source = "registry+https://github.com/rust-lang/crates.io-index" 219 + checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" 220 + dependencies = [ 221 + "addr2line", 222 + "cfg-if", 223 + "libc", 224 + "miniz_oxide", 225 + "object", 226 + "rustc-demangle", 227 + "windows-link", 228 + ] 229 + 230 + [[package]] 201 231 name = "base-x" 202 232 version = "0.2.11" 203 233 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 649 679 checksum = "abd57806937c9cc163efc8ea3910e00a62e2aeb0b8119f1793a978088f8f6b04" 650 680 651 681 [[package]] 682 + name = "dhat" 683 + version = "0.3.3" 684 + source = "registry+https://github.com/rust-lang/crates.io-index" 685 + checksum = "98cd11d84628e233de0ce467de10b8633f4ddaecafadefc86e13b84b8739b827" 686 + dependencies = [ 687 + "backtrace", 688 + "lazy_static", 689 + "mintex", 690 + "parking_lot", 691 + "rustc-hash", 692 + "serde", 693 + "serde_json", 694 + "thousands", 695 + ] 696 + 697 + [[package]] 652 698 name = "dialoguer" 653 699 version = "0.12.0" 654 700 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 961 1007 ] 962 1008 963 1009 [[package]] 1010 + name = "gimli" 1011 + version = "0.32.3" 1012 + source = 
"registry+https://github.com/rust-lang/crates.io-index" 1013 + checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" 1014 + 1015 + [[package]] 964 1016 name = "group" 965 1017 version = "0.13.0" 966 1018 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1419 1471 ] 1420 1472 1421 1473 [[package]] 1474 + name = "lazy_static" 1475 + version = "1.5.0" 1476 + source = "registry+https://github.com/rust-lang/crates.io-index" 1477 + checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" 1478 + 1479 + [[package]] 1422 1480 name = "libc" 1423 1481 version = "0.2.177" 1424 1482 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1494 1552 version = "0.3.17" 1495 1553 source = "registry+https://github.com/rust-lang/crates.io-index" 1496 1554 checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" 1555 + 1556 + [[package]] 1557 + name = "miniz_oxide" 1558 + version = "0.8.9" 1559 + source = "registry+https://github.com/rust-lang/crates.io-index" 1560 + checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" 1561 + dependencies = [ 1562 + "adler2", 1563 + ] 1564 + 1565 + [[package]] 1566 + name = "mintex" 1567 + version = "0.1.4" 1568 + source = "registry+https://github.com/rust-lang/crates.io-index" 1569 + checksum = "c505b3e17ed6b70a7ed2e67fbb2c560ee327353556120d6e72f5232b6880d536" 1497 1570 1498 1571 [[package]] 1499 1572 name = "mio" ··· 1604 1677 checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" 1605 1678 1606 1679 [[package]] 1680 + name = "object" 1681 + version = "0.37.3" 1682 + source = "registry+https://github.com/rust-lang/crates.io-index" 1683 + checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" 1684 + dependencies = [ 1685 + "memchr", 1686 + ] 1687 + 1688 + [[package]] 1607 1689 name = "once_cell" 1608 1690 version = "1.21.3" 1609 1691 source = 
"registry+https://github.com/rust-lang/crates.io-index" ··· 1752 1834 "colored_json", 1753 1835 "crossterm", 1754 1836 "ctrlc", 1837 + "dhat", 1755 1838 "dialoguer", 1756 1839 "env_logger", 1757 1840 "fnv", ··· 2114 2197 ] 2115 2198 2116 2199 [[package]] 2200 + name = "rustc-demangle" 2201 + version = "0.1.26" 2202 + source = "registry+https://github.com/rust-lang/crates.io-index" 2203 + checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 2204 + 2205 + [[package]] 2206 + name = "rustc-hash" 2207 + version = "1.1.0" 2208 + source = "registry+https://github.com/rust-lang/crates.io-index" 2209 + checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" 2210 + 2211 + [[package]] 2117 2212 name = "rustix" 2118 2213 version = "1.1.2" 2119 2214 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2591 2686 "quote", 2592 2687 "syn 2.0.110", 2593 2688 ] 2689 + 2690 + [[package]] 2691 + name = "thousands" 2692 + version = "0.2.0" 2693 + source = "registry+https://github.com/rust-lang/crates.io-index" 2694 + checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820" 2594 2695 2595 2696 [[package]] 2596 2697 name = "tinystr"
+10
Cargo.toml
··· 50 50 atproto-plc = "0.2.0" 51 51 libc = "0.2" 52 52 53 + # Memory profiling (optional) 54 + dhat = { version = "0.3", optional = true } 55 + 53 56 # Server dependencies 54 57 axum = { version = "0.8", features = ["ws"], optional = true } 55 58 tower = { version = "0.5", optional = true } ··· 69 72 default = ["cli", "server"] 70 73 cli = ["clap", "clap_complete", "indicatif", "log", "env_logger", "dialoguer", "crossterm", "colored", "colored_json", "is-terminal", "similar", "ctrlc"] 71 74 server = ["axum", "tower", "tower-http", "tokio-tungstenite", "tokio-util", "futures-util", "crossterm"] 75 + 76 + # Enable with `--features dhat-heap` to collect heap profiling data 77 + dhat-heap = ["dhat"] 78 + 79 + [profile.release] 80 + # Enable line-level debug info for meaningful backtraces in the profiler 81 + debug = 1 72 82 73 83 [[test]] 74 84 name = "server"
+101
docs/profiling.md
··· 1 + # Memory Profiling with `dhat` 2 + 3 + This project integrates the [`dhat`](https://docs.rs/dhat/latest/dhat/) heap profiler to analyze memory allocations across CLI commands. It works on macOS/Linux/Windows and writes a JSON profile you can open in DHAT’s HTML viewer. 4 + 5 + ## Overview 6 + 7 + - Uses `dhat` as the global allocator to track allocations. 8 + - Starts the profiler at process startup and stops on exit. 9 + - Saves a profile file `dhat-heap.json` in the current working directory. 10 + - Best used in release mode with line-level debug info for clear backtraces. 11 + 12 + ## Prerequisites 13 + 14 + - Rust toolchain installed. 15 + - This repository already wires `dhat`: 16 + - Global allocator and profiler start: `src/cli/mod.rs:6-8`, `src/cli/mod.rs:154-155` 17 + - Feature flag and release debug info: `Cargo.toml:76-79`, `Cargo.toml:82-84` 18 + 19 + ## Enable Profiling 20 + 21 + Build and run with the profiling feature enabled: 22 + 23 + ```bash 24 + cargo build --release --features dhat-heap 25 + cargo run --release --features dhat-heap -- <COMMAND> [ARGS] 26 + ``` 27 + 28 + Examples: 29 + 30 + ```bash 31 + # Show repository status (quiet run) 32 + cargo run --release --features dhat-heap -- status --quiet 33 + 34 + # List bundles 35 + cargo run --release --features dhat-heap -- ls 36 + ``` 37 + 38 + Notes: 39 + 40 + - Use release builds (`--release`) for realistic performance and readable backtraces. 41 + - Avoid commands that immediately exit (`--help`, `--version`) if you want a profile; they may not exercise useful code paths. 42 + 43 + ## Output 44 + 45 + After the command finishes, `dhat` prints a summary to stderr and writes `dhat-heap.json` in the current directory. 
For example: 46 + 47 + ``` 48 + dhat: Total: 433,983 bytes in 1,186 blocks 49 + dhat: At t-gmax: 314,288 bytes in 986 blocks 50 + dhat: At t-end: 806 bytes in 7 blocks 51 + dhat: The data has been saved to dhat-heap.json, and is viewable with dhat/dh_view.html 52 + ``` 53 + 54 + ## Viewing the Profile 55 + 56 + Open DHAT’s viewer (`dh_view.html`) and load `dhat-heap.json`: 57 + 58 + - Option 1: Clone the viewer 59 + - `git clone https://github.com/nnethercote/dhat-rs` 60 + - Open `dhat-rs/dhat/dh_view.html` in your browser 61 + - Use the “Load” button to select `dhat-heap.json` 62 + 63 + - Option 2: Use the copy from crate docs 64 + - Refer to `dhat` crate documentation for viewer location and usage: https://docs.rs/dhat/latest/dhat/ 65 + 66 + The viewer provides allocation hot spots, call stacks, counts, sizes, and lifetimes. 67 + 68 + ## Customization 69 + 70 + If you need to customize the output file name or backtrace trimming, switch from `new_heap()` to the builder: 71 + 72 + ```rust 73 + // Replace the default initialization in main with: 74 + let _profiler = dhat::Profiler::builder() 75 + .file_name(format!("heap-{}.json", std::process::id())) 76 + .trim_backtraces(Some(16)) 77 + .build(); 78 + ``` 79 + 80 + Backtrace trimming defaults to a small depth to keep profiles readable and fast. Increase the frame count if you need deeper stacks, noting this increases overhead and file size. 81 + 82 + ## Disabling Profiling 83 + 84 + - Omit the feature flag: run without `--features dhat-heap`. 85 + - The instrumentation is guarded by `#[cfg(feature = "dhat-heap")]`, so the binary runs with the normal allocator when the feature is disabled. 86 + 87 + ## Tips 88 + 89 + - Profile realistic workloads (e.g., `sync`, `verify`, `server`) to see meaningful allocation behavior. 90 + - Consider ignoring generated profiles if you don’t want them in version control: 91 + - Add `dhat-*.json` to `.gitignore`. 92 + - `dhat` adds overhead; keep it off for normal runs. 
93 + 94 + ## References 95 + 96 + - Crate docs: https://docs.rs/dhat/latest/dhat/ 97 + - Profiler builder: https://docs.rs/dhat/latest/dhat/struct.ProfilerBuilder.html 98 + - Repository integration points: 99 + - Global allocator: `src/cli/mod.rs:6-8` 100 + - Profiler start: `src/cli/mod.rs:154-155` 101 + - Feature and release profile: `Cargo.toml:76-77`, `Cargo.toml:79-81`
+5
src/cli/cmd_server.rs
··· 84 84 #[arg(long, default_value = "0", help_heading = "Sync Options")] 85 85 pub max_bundles: u32, 86 86 87 + /// Enable extended per-request fetch logging 88 + #[arg(long, help_heading = "Sync Options")] 89 + pub fetch_log: bool, 90 + 87 91 /// Enable WebSocket endpoint for streaming 88 92 #[arg(long, help_heading = "Feature Options")] 89 93 pub websocket: bool, ··· 139 143 sync_interval: cmd.interval, 140 144 max_bundles: cmd.max_bundles, 141 145 enable_websocket: cmd.websocket, 146 + fetch_log: cmd.fetch_log, 142 147 }; 143 148 144 149 // Create progress callback factory for DID index building (CLI-specific)
+6
src/cli/cmd_sync.rs
··· 76 76 /// Maximum bundles to fetch (0 = all, only for one-time sync) 77 77 #[arg(long, default_value = "0", conflicts_with = "continuous")] 78 78 pub max_bundles: usize, 79 + 80 + /// Enable extended per-request fetch logging 81 + #[arg(long)] 82 + pub fetch_log: bool, 79 83 } 80 84 81 85 impl HasGlobalFlags for SyncCommand { ··· 113 117 verbose: global_verbose, 114 118 shutdown_rx: None, 115 119 shutdown_tx: None, 120 + fetch_log: cmd.fetch_log, 116 121 }; 117 122 118 123 let quiet = global_quiet; ··· 131 136 verbose: global_verbose, 132 137 shutdown_rx: Some(shutdown_signal), 133 138 shutdown_tx: Some(shutdown_sender), 139 + fetch_log: cmd.fetch_log, 134 140 }; 135 141 136 142 let logger = SyncLoggerImpl::new_server(global_verbose, cmd.interval);
+7
src/cli/mod.rs
··· 3 3 use plcbundle::*; 4 4 use std::path::PathBuf; 5 5 6 + #[cfg(feature = "dhat-heap")] 7 + #[global_allocator] 8 + static ALLOC: dhat::Alloc = dhat::Alloc; 9 + 6 10 // CLI Commands (cmd_ prefix) 7 11 mod cmd_bench; 8 12 mod cmd_clean; ··· 147 151 } 148 152 149 153 fn main() -> Result<()> { 154 + #[cfg(feature = "dhat-heap")] 155 + let _profiler = dhat::Profiler::new_heap(); 156 + 150 157 let cli = Cli::parse(); 151 158 152 159 // Initialize logger based on verbosity flags
-2
src/did_index.rs
··· 186 186 struct SegmentLayer { 187 187 meta: DeltaSegmentMeta, 188 188 mmap: Mmap, 189 - _file: File, 190 189 } 191 190 192 191 impl SegmentLayer { ··· 1757 1756 layers.push(SegmentLayer { 1758 1757 meta, 1759 1758 mmap, 1760 - _file: file, 1761 1759 }); 1762 1760 } 1763 1761
+96 -20
src/manager.rs
··· 1407 1407 /// Add operations to mempool, returning number added 1408 1408 /// 1409 1409 /// Mempool must be loaded first (call `load_mempool()`). 1410 - pub fn add_to_mempool(&self, ops: Vec<Operation>) -> Result<usize> { 1411 - self.get_mempool()?; // Check if loaded 1412 - 1410 + pub fn add_to_mempool(&self, ops: Vec<Operation>, collect_cids: bool) -> Result<(usize, Vec<String>)> { 1411 + self.get_mempool()?; 1413 1412 let mut mempool_guard = self.mempool.write().unwrap(); 1414 - 1415 1413 if let Some(mp) = mempool_guard.as_mut() { 1416 - let added = mp.add(ops)?; 1414 + let result = if collect_cids { mp.add_and_collect_cids(ops)? } else { (mp.add(ops)?, Vec::new()) }; 1417 1415 mp.save_if_needed()?; 1418 - Ok(added) 1416 + Ok(result) 1419 1417 } else { 1420 1418 anyhow::bail!("Mempool not initialized") 1421 1419 } ··· 1758 1756 client: &crate::plc_client::PLCClient, 1759 1757 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 1760 1758 update_did_index: bool, 1759 + fetch_log: bool, 1761 1760 ) -> Result<SyncResult> { 1762 1761 use crate::sync::{get_boundary_cids, strip_boundary_duplicates}; 1763 1762 use std::time::Instant; ··· 1894 1893 { 1895 1894 anyhow::bail!("Shutdown requested"); 1896 1895 } 1897 - let (plc_ops, wait_dur, http_dur) = if let Some(rx) = shutdown_rx.clone() { 1898 - client 1899 - .fetch_operations_cancelable(&after_time, request_count, Some(rx)) 1900 - .await? 1896 + let (plc_ops, wait_dur, http_dur, raw_capture_opt) = if fetch_log { 1897 + if let Some(rx) = shutdown_rx.clone() { 1898 + let (ops, w, h, capture_opt) = client 1899 + .fetch_operations(&after_time, request_count, Some(rx), true) 1900 + .await?; 1901 + (ops, w, h, capture_opt) 1902 + } else { 1903 + let (ops, w, h, capture_opt) = client 1904 + .fetch_operations(&after_time, request_count, None, true) 1905 + .await?; 1906 + (ops, w, h, capture_opt) 1907 + } 1901 1908 } else { 1902 - client.fetch_operations(&after_time, request_count).await? 
1909 + if let Some(rx) = shutdown_rx.clone() { 1910 + let (ops, w, h, _) = client 1911 + .fetch_operations(&after_time, request_count, Some(rx), false) 1912 + .await?; 1913 + (ops, w, h, None) 1914 + } else { 1915 + let (ops, w, h, _) = 1916 + client.fetch_operations(&after_time, request_count, None, false).await?; 1917 + (ops, w, h, None) 1918 + } 1903 1919 }; 1904 1920 total_wait += wait_dur; 1905 1921 total_http += http_dur; ··· 1921 1937 1922 1938 total_fetched += fetched_count; 1923 1939 1924 - // Convert and deduplicate 1925 - let mut ops: Vec<Operation> = plc_ops.into_iter().map(Into::into).collect(); 1926 - let before_dedup = ops.len(); 1927 - ops = strip_boundary_duplicates(ops, &prev_boundary); 1940 + // Convert to operations 1941 + let ops_pre: Vec<Operation> = plc_ops.into_iter().map(Into::into).collect(); 1942 + let mut all_cids_pre: Vec<String> = Vec::new(); 1943 + if fetch_log { 1944 + all_cids_pre = ops_pre 1945 + .iter() 1946 + .filter_map(|op| op.cid.clone()) 1947 + .collect(); 1948 + } 1949 + // Deduplicate against boundary 1950 + let before_dedup = ops_pre.len(); 1951 + let ops: Vec<Operation> = strip_boundary_duplicates(ops_pre.clone(), &prev_boundary); 1928 1952 let after_dedup = ops.len(); 1929 1953 1930 1954 let boundary_removed = before_dedup - after_dedup; ··· 1938 1962 } 1939 1963 } 1940 1964 1941 - // Add to mempool 1942 - let added = if !ops.is_empty() { 1943 - self.add_to_mempool(ops)? 1965 + let export_url = if fetch_log { 1966 + client.build_export_url(&after_time, request_count) 1944 1967 } else { 1945 - 0 1968 + String::new() 1969 + }; 1970 + 1971 + let mut all_cids: Vec<String> = Vec::new(); 1972 + if fetch_log { 1973 + all_cids = all_cids_pre; 1974 + } 1975 + 1976 + let (added, added_cids) = if !ops.is_empty() { 1977 + self.add_to_mempool(ops, fetch_log)? 
1978 + } else { 1979 + (0, Vec::new()) 1946 1980 }; 1981 + 1982 + if fetch_log { 1983 + use serde_json::json; 1984 + let log_dir = self.directory.join(constants::DID_INDEX_DIR).join("logs"); 1985 + let _ = std::fs::create_dir_all(&log_dir); 1986 + let log_path = log_dir.join(format!("{:06}.json", next_bundle_num)); 1987 + let added_set: std::collections::HashSet<String> = 1988 + added_cids.iter().cloned().collect(); 1989 + let skipped: Vec<String> = all_cids 1990 + .iter() 1991 + .filter(|c| !added_set.contains(*c)) 1992 + .cloned() 1993 + .collect(); 1994 + let entry = json!({ 1995 + "time": chrono::Utc::now().to_rfc3339(), 1996 + "url": export_url, 1997 + "count": fetched_count, 1998 + "cids": all_cids, 1999 + "skipped": skipped, 2000 + }); 2001 + let mut file = std::fs::OpenOptions::new() 2002 + .create(true) 2003 + .append(true) 2004 + .open(log_path)?; 2005 + use std::io::Write; 2006 + writeln!(file, "{}", entry.to_string())?; 2007 + 2008 + if let Some(capture) = raw_capture_opt.as_ref() { 2009 + let raw_path = log_dir.join(format!("{:06}-{}", next_bundle_num, after_time)); 2010 + let mut raw_file = std::fs::OpenOptions::new() 2011 + .create(true) 2012 + .write(true) 2013 + .truncate(true) 2014 + .open(raw_path)?; 2015 + writeln!(raw_file, "Status: {}", capture.status)?; 2016 + for (name, value) in &capture.headers { 2017 + writeln!(raw_file, "{}: {}", name, value)?; 2018 + } 2019 + writeln!(raw_file)?; 2020 + write!(raw_file, "{}", capture.body)?; 2021 + } 2022 + } 1947 2023 1948 2024 let dupes_in_fetch = after_dedup - added; 1949 2025 total_dupes += dupes_in_fetch; ··· 2191 2267 let mut synced = 0; 2192 2268 2193 2269 loop { 2194 - match self.sync_next_bundle(client, None, true).await { 2270 + match self.sync_next_bundle(client, None, true, false).await { 2195 2271 Ok(SyncResult::BundleCreated { .. }) => { 2196 2272 synced += 1; 2197 2273
+18 -13
src/mempool.rs
··· 69 69 Ok(mempool) 70 70 } 71 71 72 - /// Add operations to the mempool with strict validation 73 - pub fn add(&mut self, ops: Vec<Operation>) -> Result<usize> { 72 + fn add_internal(&mut self, ops: Vec<Operation>, collect_cids: bool) -> Result<(usize, Vec<String>)> { 74 73 if ops.is_empty() { 75 - return Ok(0); 74 + return Ok((0, Vec::new())); 76 75 } 77 76 78 - // Build existing CID set 79 77 let mut existing_cids: HashSet<String> = self 80 78 .operations 81 79 .iter() ··· 86 84 let total_in = ops.len(); 87 85 let mut skipped_no_cid = 0usize; 88 86 let mut skipped_dupe = 0usize; 87 + let mut added_cids = Vec::new(); 89 88 90 - // Start from last operation time if we have any 91 89 let mut last_time = if !self.operations.is_empty() { 92 90 self.parse_timestamp(&self.operations.last().unwrap().created_at)? 93 91 } else { ··· 98 96 let mut first_added_time: Option<DateTime<Utc>> = None; 99 97 let mut last_added_time: Option<DateTime<Utc>> = None; 100 98 for op in ops { 101 - // Skip if no CID 102 99 let cid = match &op.cid { 103 100 Some(c) => c, 104 101 None => { ··· 107 104 } 108 105 }; 109 106 110 - // Skip duplicates 111 107 if existing_cids.contains(cid) { 112 108 skipped_dupe += 1; 113 109 continue; 114 110 } 115 111 116 112 let op_time = self.parse_timestamp(&op.created_at)?; 117 - 118 - // CRITICAL: Validate chronological order 119 113 if op_time < last_time { 120 114 bail!( 121 115 "chronological violation: operation {} at {} is before {}", ··· 124 118 last_time.to_rfc3339() 125 119 ); 126 120 } 127 - 128 - // Validate operation is after minimum timestamp 129 121 if op_time < self.min_timestamp { 130 122 bail!( 131 123 "operation {} at {} is before minimum timestamp {} (belongs in earlier bundle)", ··· 136 128 } 137 129 138 130 new_ops.push(op.clone()); 131 + if collect_cids { 132 + added_cids.push(cid.clone()); 133 + } 139 134 existing_cids.insert(cid.clone()); 140 135 last_time = op_time; 141 136 if first_added_time.is_none() { ··· 146 141 147 142 let 
added = new_ops.len(); 148 143 149 - // Add new operations and update DID index 150 144 let start_idx = self.operations.len(); 151 145 self.operations.extend(new_ops); 152 146 153 - // Update DID index for new operations 154 147 for (offset, op) in self.operations[start_idx..].iter().enumerate() { 155 148 let idx = start_idx + offset; 156 149 self.did_index.entry(op.did.clone()).or_default().push(idx); ··· 182 175 } 183 176 } 184 177 178 + Ok((added, added_cids)) 179 + } 180 + 181 + /// Add operations to the mempool with strict validation 182 + pub fn add(&mut self, ops: Vec<Operation>) -> Result<usize> { 183 + let (added, _) = self.add_internal(ops, false)?; 185 184 Ok(added) 185 + } 186 + 187 + pub fn add_and_collect_cids(&mut self, ops: Vec<Operation>) -> Result<(usize, Vec<String>)> { 188 + self.add_internal(ops, true) 186 189 } 187 190 188 191 /// Validate performs a full chronological validation of all operations ··· 308 311 pub fn clear(&mut self) { 309 312 let prev = self.operations.len(); 310 313 self.operations.clear(); 314 + self.operations.shrink_to_fit(); 311 315 self.did_index.clear(); 316 + self.did_index.shrink_to_fit(); 312 317 self.validated = false; 313 318 self.dirty = true; 314 319 if self.verbose {
+80 -44
src/plc_client.rs
··· 20 20 rate_limit_period: Duration, 21 21 } 22 22 23 + pub struct RawExportResponse { 24 + pub status: u16, 25 + pub headers: Vec<(String, String)>, 26 + pub body: String, 27 + } 28 + 23 29 impl PLCClient { 24 30 pub fn new(base_url: impl Into<String>) -> Result<Self> { 25 31 let period = Duration::from_secs(constants::PLC_RATE_LIMIT_PERIOD); ··· 36 42 request_timestamps: Arc::new(std::sync::Mutex::new(VecDeque::new())), 37 43 rate_limit_period: period, 38 44 }) 45 + } 46 + 47 + pub fn build_export_url(&self, after: &str, count: usize) -> String { 48 + format!("{}/export?after={}&count={}", self.base_url, after, count) 39 49 } 40 50 41 51 /// Record a request timestamp and clean up old entries ··· 72 82 &self, 73 83 after: &str, 74 84 count: usize, 75 - ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 76 - self.fetch_operations_with_retry_cancelable(after, count, 5, None) 77 - .await 78 - } 79 - 80 - pub async fn fetch_operations_cancelable( 81 - &self, 82 - after: &str, 83 - count: usize, 84 85 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 85 - ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 86 - self.fetch_operations_with_retry_cancelable(after, count, 5, shutdown_rx) 86 + capture_raw: bool, 87 + ) -> Result<( 88 + Vec<PLCOperation>, 89 + Duration, 90 + Duration, 91 + Option<RawExportResponse>, 92 + )> { 93 + self.fetch_operations_unified(after, count, shutdown_rx, capture_raw) 87 94 .await 88 95 } 89 96 90 - async fn fetch_operations_with_retry_cancelable( 97 + // merged into `fetch_operations` 98 + 99 + // merged into `fetch_operations` 100 + 101 + async fn fetch_operations_unified( 91 102 &self, 92 103 after: &str, 93 104 count: usize, 94 - max_retries: usize, 95 105 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 96 - ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 106 + capture_raw: bool, 107 + ) -> Result<( 108 + Vec<PLCOperation>, 109 + Duration, 110 + Duration, 111 + Option<RawExportResponse>, 112 + )> { 97 113 
let mut backoff = Duration::from_secs(1); 98 114 let mut last_err = None; 99 115 let mut total_wait = Duration::from_secs(0); 100 116 let mut total_http = Duration::from_secs(0); 101 117 102 - for attempt in 1..=max_retries { 103 - if let Some(ref rx) = shutdown_rx 104 - && *rx.borrow() 105 - { 118 + for attempt in 1..=5 { 119 + if let Some(ref rx) = shutdown_rx && *rx.borrow() { 106 120 anyhow::bail!("Shutdown requested"); 107 121 } 108 122 let export_url = format!("{}/export?after={}&count={}", self.base_url, after, count); ··· 121 135 if let Some(mut rx) = shutdown_rx.clone() { 122 136 tokio::select! { 123 137 _ = self.rate_limiter.wait() => {} 124 - _ = rx.changed() => { 125 - if *rx.borrow() { anyhow::bail!("Shutdown requested"); } 126 - } 138 + _ = rx.changed() => { if *rx.borrow() { anyhow::bail!("Shutdown requested"); } } 127 139 } 128 140 } else { 129 141 self.rate_limiter.wait().await; ··· 134 146 } 135 147 total_wait += wait_elapsed; 136 148 137 - // Clear previous retry_after 138 149 *self.last_retry_after.lock().await = None; 139 - 140 - // Record this request attempt 141 150 self.record_request(); 142 151 143 - match self.do_fetch_operations(after, count).await { 144 - Ok((operations, http_duration)) => { 152 + let result = if capture_raw { 153 + self.do_fetch_operations(after, count, true).await 154 + } else { 155 + self.do_fetch_operations(after, count, false).await 156 + }; 157 + 158 + match result { 159 + Ok((operations, http_duration, capture)) => { 145 160 total_http += http_duration; 146 - return Ok((operations, total_wait, total_http)); 161 + return Ok((operations, total_wait, total_http, capture)); 147 162 } 148 163 Err(e) => { 149 164 last_err = Some(e); 150 - 151 - // Check if it's a rate limit error (429) 152 165 let retry_after = self.last_retry_after.lock().await.take(); 153 166 if let Some(retry_after) = retry_after { 154 167 let requests_in_period = self.count_requests_in_period(); ··· 162 175 rate_limit, 163 176 retry_after, 164 177 
attempt, 165 - max_retries 178 + 5 166 179 ); 167 180 if let Some(mut rx) = shutdown_rx.clone() { 168 181 tokio::select! { 169 182 _ = tokio::time::sleep(retry_after) => {} 170 - _ = rx.changed() => { 171 - if *rx.borrow() { anyhow::bail!("Shutdown requested"); } 172 - } 183 + _ = rx.changed() => { if *rx.borrow() { anyhow::bail!("Shutdown requested"); } } 173 184 } 174 185 } else { 175 186 tokio::time::sleep(retry_after).await; ··· 177 188 continue; 178 189 } 179 190 180 - // Other errors - exponential backoff 181 - if attempt < max_retries { 191 + if attempt < 5 { 182 192 eprintln!( 183 193 "[Sync] Request failed (attempt {}/{}): {}, retrying in {:?}", 184 194 attempt, 185 - max_retries, 195 + 5, 186 196 last_err.as_ref().unwrap(), 187 197 backoff 188 198 ); 189 199 if let Some(mut rx) = shutdown_rx.clone() { 190 200 tokio::select! { 191 201 _ = tokio::time::sleep(backoff) => {} 192 - _ = rx.changed() => { 193 - if *rx.borrow() { anyhow::bail!("Shutdown requested"); } 194 - } 202 + _ = rx.changed() => { if *rx.borrow() { anyhow::bail!("Shutdown requested"); } } 195 203 } 196 204 } else { 197 205 tokio::time::sleep(backoff).await; 198 206 } 199 - backoff *= 2; // Exponential backoff 207 + backoff *= 2; 200 208 } 201 209 } 202 210 } ··· 204 212 205 213 anyhow::bail!( 206 214 "Failed after {} attempts: {}", 207 - max_retries, 215 + 5, 208 216 last_err.unwrap_or_else(|| anyhow::anyhow!("Unknown error")) 209 217 ) 210 218 } 211 219 220 + // Removed legacy duplicate retry path; unified via `fetch_operations_unified` 221 + 212 222 async fn do_fetch_operations( 213 223 &self, 214 224 after: &str, 215 225 count: usize, 216 - ) -> Result<(Vec<PLCOperation>, Duration)> { 226 + capture_raw: bool, 227 + ) -> Result<( 228 + Vec<PLCOperation>, 229 + Duration, 230 + Option<RawExportResponse>, 231 + )> { 217 232 let url = format!("{}/export", self.base_url); 218 233 let request_start = Instant::now(); 219 234 let response = self ··· 224 239 .send() 225 240 .await?; 226 241 227 - 
// Handle rate limiting (429) 228 242 if response.status() == reqwest::StatusCode::TOO_MANY_REQUESTS { 229 243 let retry_after = parse_retry_after(&response); 230 244 *self.last_retry_after.lock().await = Some(retry_after); ··· 234 248 if !response.status().is_success() { 235 249 anyhow::bail!("PLC request failed: {}", response.status()); 236 250 } 251 + let status = if capture_raw { Some(response.status().as_u16()) } else { None }; 252 + let headers_vec: Option<Vec<(String, String)>> = if capture_raw { 253 + Some( 254 + response 255 + .headers() 256 + .iter() 257 + .filter_map(|(k, v)| Some((k.as_str().to_string(), v.to_str().ok()?.to_string()))) 258 + .collect(), 259 + ) 260 + } else { 261 + None 262 + }; 237 263 238 264 let body = response.text().await?; 239 265 let request_duration = request_start.elapsed(); ··· 256 282 } 257 283 } 258 284 259 - Ok((operations, request_duration)) 285 + let capture = if capture_raw { 286 + Some(RawExportResponse { 287 + status: status.unwrap(), 288 + headers: headers_vec.unwrap(), 289 + body, 290 + }) 291 + } else { 292 + None 293 + }; 294 + 295 + Ok((operations, request_duration, capture)) 260 296 } 261 297 262 298 /// Fetch DID document raw JSON from PLC directory
+3
src/server/startup.rs
··· 25 25 pub sync_interval: Duration, 26 26 pub max_bundles: u32, 27 27 pub enable_websocket: bool, 28 + pub fetch_log: bool, 28 29 } 29 30 30 31 /// Progress callback for DID index building ··· 453 454 let interval = config.sync_interval; 454 455 let max_bundles = config.max_bundles; 455 456 let verbose = config.verbose; 457 + let fetch_log = config.fetch_log; 456 458 let shutdown_signal = server_runtime.shutdown_signal(); 457 459 let sync_runtime = server_runtime.clone(); 458 460 ··· 477 479 verbose, 478 480 shutdown_rx: Some(shutdown_signal), 479 481 shutdown_tx: Some(sync_runtime.shutdown_sender()), 482 + fetch_log, 480 483 }; 481 484 482 485 use crate::sync::SyncLoggerImpl;
+4 -1
src/sync.rs
··· 131 131 pub verbose: bool, 132 132 pub shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 133 133 pub shutdown_tx: Option<tokio::sync::watch::Sender<bool>>, 134 + pub fetch_log: bool, 134 135 } 135 136 136 137 impl Default for SyncConfig { ··· 143 144 verbose: false, 144 145 shutdown_rx: None, 145 146 shutdown_tx: None, 147 + fetch_log: false, 146 148 } 147 149 } 148 150 } ··· 506 508 507 509 match self 508 510 .manager 509 - .sync_next_bundle(&self.client, None, true) 511 + .sync_next_bundle(&self.client, None, true, self.config.fetch_log) 510 512 .await 511 513 { 512 514 Ok(crate::manager::SyncResult::BundleCreated { ··· 643 645 &self.client, 644 646 self.config.shutdown_rx.clone(), 645 647 !is_initial_sync, 648 + self.config.fetch_log, 646 649 ) 647 650 .await; 648 651