A high-performance implementation of plcbundle, written in Rust

feat: add duration age formatting and refactor sync operations

+703 -880
+27 -1
src/format.rs
··· 175 175 } 176 176 } 177 177 178 + /// Format a duration as a relative age string (e.g. "2 days ago", "just now"). 179 + pub fn format_age(duration: chrono::Duration) -> String { 180 + let days = duration.num_days(); 181 + if days >= 365 { 182 + let years = days as f64 / 365.25; 183 + format!("{:.1} years ago", years) 184 + } else if days >= 30 { 185 + let months = days as f64 / 30.0; 186 + format!("{:.1} months ago", months) 187 + } else if days > 0 { 188 + format!("{} days ago", days) 189 + } else { 190 + let hours = duration.num_hours(); 191 + if hours > 0 { 192 + format!("{} hours ago", hours) 193 + } else { 194 + let mins = duration.num_minutes(); 195 + if mins > 0 { 196 + format!("{} minutes ago", mins) 197 + } else { 198 + "just now".to_string() 199 + } 200 + } 201 + } 202 + } 203 + 178 204 #[cfg(test)] 179 205 mod tests { 180 206 use super::*; ··· 289 315 assert_eq!(format_bytes_per_sec(1536.0), "1.5 KB/sec"); 290 316 assert_eq!(format_bytes_per_sec(1024.0 * 1024.0), "1.0 MB/sec"); 291 317 } 292 - } 318 + }
+2 -1
src/lib.rs
··· 60 60 CleanPreviewFile, CleanResult, DIDIndexStats, ExportFormat, ExportSpec, InfoFlags, 61 61 IntoManagerOptions, LoadOptions, LoadResult, ManagerOptions, ManagerStats, OperationResult, 62 62 QuerySpec, RebuildStats, ResolveResult, RollbackFileStats, RollbackPlan, RollbackResult, 63 - RollbackSpec, SyncResult, VerifyResult, VerifySpec, WarmUpSpec, WarmUpStrategy, 63 + RollbackSpec, VerifyResult, VerifySpec, WarmUpSpec, WarmUpStrategy, 64 64 }; 65 65 pub use operations::{Operation, OperationFilter, OperationRequest, OperationWithLocation}; 66 66 pub use options::{Options, OptionsBuilder, QueryMode}; 67 + pub use sync::SyncResult;
+3 -502
src/manager.rs
··· 8 8 use crate::{cache, did_index, handle_resolver, mempool, verification}; 9 9 use anyhow::{Context, Result}; 10 10 use chrono::{DateTime, Utc}; 11 - use std::collections::{HashMap, HashSet}; 11 + use std::collections::HashMap; 12 12 use std::io::Write; 13 13 use std::path::PathBuf; 14 14 use std::sync::{Arc, Mutex, RwLock}; 15 - 16 - /// Result of a sync_next_bundle operation 17 - #[derive(Debug, Clone)] 18 - pub enum SyncResult { 19 - /// Successfully created a bundle 20 - BundleCreated { 21 - bundle_num: u32, 22 - mempool_count: usize, 23 - duration_ms: u64, 24 - fetch_duration_ms: u64, 25 - bundle_save_ms: u64, 26 - index_ms: u64, 27 - fetch_requests: usize, 28 - hash: String, 29 - age: String, 30 - did_index_compacted: bool, 31 - unique_dids: u32, 32 - size_bytes: u64, 33 - fetch_wait_ms: u64, 34 - fetch_http_ms: u64, 35 - }, 36 - /// Caught up to latest PLC data, mempool has partial operations 37 - CaughtUp { 38 - next_bundle: u32, 39 - mempool_count: usize, 40 - new_ops: usize, 41 - fetch_duration_ms: u64, 42 - }, 43 - } 44 15 45 16 /// High-level manager for PLC bundle repositories 46 17 /// ··· 1484 1455 // === Sync Operations === 1485 1456 1486 1457 /// Validate and clean repository state before sync 1487 - fn validate_sync_state(&self) -> Result<()> { 1458 + pub(crate) fn validate_sync_state(&self) -> Result<()> { 1488 1459 let last_bundle = self.get_last_bundle(); 1489 1460 let next_bundle_num = last_bundle + 1; 1490 1461 ··· 1744 1715 .map_err(|e| anyhow::anyhow!("Batch DID index update task failed: {}", e))? 
1745 1716 } 1746 1717 1747 - /// Fetch and save next bundle from PLC directory 1748 - /// DID index is updated on every bundle (fast with delta segments) 1749 - pub async fn sync_next_bundle( 1750 - &self, 1751 - client: &crate::plc_client::PLCClient, 1752 - shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 1753 - ) -> Result<SyncResult> { 1754 - use crate::sync::{get_boundary_cids, strip_boundary_duplicates}; 1755 - use std::time::Instant; 1756 - 1757 - // Validate repository state before starting 1758 - self.validate_sync_state()?; 1759 - 1760 - let next_bundle_num = self.get_last_bundle() + 1; 1761 - 1762 - // ALWAYS get boundaries from last bundle initially 1763 - let (mut after_time, mut prev_boundary) = if next_bundle_num > 1 { 1764 - let last = self.load_bundle(next_bundle_num - 1, LoadOptions::default())?; 1765 - let boundary = get_boundary_cids(&last.operations); 1766 - let cursor = last 1767 - .operations 1768 - .last() 1769 - .map(|op| op.created_at.clone()) 1770 - .unwrap_or_default(); 1771 - 1772 - if *self.verbose.lock().unwrap() { 1773 - log::info!( 1774 - "Loaded {} boundary CIDs from bundle {:06} (at {})", 1775 - boundary.len(), 1776 - next_bundle_num - 1, 1777 - cursor 1778 - ); 1779 - } 1780 - 1781 - (cursor, boundary) 1782 - } else { 1783 - ("1970-01-01T00:00:00Z".to_string(), HashSet::new()) 1784 - }; 1785 - 1786 - // If mempool has operations, update cursor AND boundaries from mempool 1787 - // (mempool operations already had boundary dedup applied when they were added) 1788 - let mempool_stats = self.get_mempool_stats()?; 1789 - if mempool_stats.count > 0 1790 - && let Some(last_time) = mempool_stats.last_time 1791 - { 1792 - if *self.verbose.lock().unwrap() { 1793 - log::debug!( 1794 - "Mempool has {} ops, resuming from {}", 1795 - mempool_stats.count, 1796 - last_time.format("%Y-%m-%dT%H:%M:%S") 1797 - ); 1798 - } 1799 - after_time = last_time.to_rfc3339(); 1800 - 1801 - // Calculate boundaries from MEMPOOL for next fetch 1802 - let 
mempool_ops = self.get_mempool_operations()?; 1803 - if !mempool_ops.is_empty() { 1804 - prev_boundary = get_boundary_cids(&mempool_ops); 1805 - if *self.verbose.lock().unwrap() { 1806 - log::info!("Using {} boundary CIDs from mempool", prev_boundary.len()); 1807 - } 1808 - } 1809 - } 1810 - 1811 - log::debug!( 1812 - "Preparing bundle {:06} (mempool: {} ops)...", 1813 - next_bundle_num, 1814 - mempool_stats.count 1815 - ); 1816 - log::debug!( 1817 - "Starting cursor: {}", 1818 - if after_time.is_empty() || after_time == "1970-01-01T00:00:00Z" { 1819 - "" 1820 - } else { 1821 - &after_time 1822 - } 1823 - ); 1824 - 1825 - if !prev_boundary.is_empty() && *self.verbose.lock().unwrap() && mempool_stats.count == 0 { 1826 - log::info!( 1827 - " Starting with {} boundary CIDs from previous bundle", 1828 - prev_boundary.len() 1829 - ); 1830 - } 1831 - 1832 - // Ensure mempool is loaded (load if needed) 1833 - self.load_mempool()?; 1834 - 1835 - // Fetch until we have 10,000 operations 1836 - let mut fetch_num = 0; 1837 - let mut total_fetched = 0; 1838 - let mut total_dupes = 0; 1839 - let mut total_boundary_dupes = 0; 1840 - let fetch_start = Instant::now(); 1841 - let mut caught_up = false; 1842 - const MAX_ATTEMPTS: usize = 50; 1843 - let mut total_wait = std::time::Duration::from_secs(0); 1844 - let mut total_http = std::time::Duration::from_secs(0); 1845 - 1846 - while fetch_num < MAX_ATTEMPTS { 1847 - let stats = self.get_mempool_stats()?; 1848 - 1849 - if stats.count >= constants::BUNDLE_SIZE { 1850 - break; 1851 - } 1852 - 1853 - fetch_num += 1; 1854 - let needed = constants::BUNDLE_SIZE - stats.count; 1855 - 1856 - // Smart batch sizing - request more than exact amount to account for duplicates 1857 - let request_count = match needed { 1858 - n if n <= 50 => 50, 1859 - n if n <= 100 => 100, 1860 - n if n <= 500 => 200, 1861 - _ => 1000, 1862 - }; 1863 - 1864 - if *self.verbose.lock().unwrap() { 1865 - log::info!( 1866 - " Fetch #{}: requesting {} (need {} more, 
have {}/{})", 1867 - fetch_num, 1868 - request_count, 1869 - needed, 1870 - stats.count, 1871 - constants::BUNDLE_SIZE 1872 - ); 1873 - } 1874 - 1875 - let fetch_op_start = Instant::now(); 1876 - if let Some(ref rx) = shutdown_rx && *rx.borrow() { 1877 - anyhow::bail!("Shutdown requested"); 1878 - } 1879 - let (plc_ops, wait_dur, http_dur) = if let Some(rx) = shutdown_rx.clone() { 1880 - client 1881 - .fetch_operations_cancelable(&after_time, request_count, Some(rx)) 1882 - .await? 1883 - } else { 1884 - client.fetch_operations(&after_time, request_count).await? 1885 - }; 1886 - total_wait += wait_dur; 1887 - total_http += http_dur; 1888 - 1889 - let fetched_count = plc_ops.len(); 1890 - 1891 - // Check for incomplete batch (indicates caught up) 1892 - let got_incomplete_batch = fetched_count > 0 && fetched_count < request_count; 1893 - 1894 - if plc_ops.is_empty() || got_incomplete_batch { 1895 - caught_up = true; 1896 - if *self.verbose.lock().unwrap() && fetch_num > 0 { 1897 - log::debug!("Caught up to latest PLC data"); 1898 - } 1899 - if plc_ops.is_empty() { 1900 - break; 1901 - } 1902 - } 1903 - 1904 - total_fetched += fetched_count; 1905 - 1906 - // Convert and deduplicate 1907 - let mut ops: Vec<Operation> = plc_ops.into_iter().map(Into::into).collect(); 1908 - let before_dedup = ops.len(); 1909 - ops = strip_boundary_duplicates(ops, &prev_boundary); 1910 - let after_dedup = ops.len(); 1911 - 1912 - let boundary_removed = before_dedup - after_dedup; 1913 - if boundary_removed > 0 { 1914 - total_boundary_dupes += boundary_removed; 1915 - if *self.verbose.lock().unwrap() { 1916 - log::info!( 1917 - " Stripped {} boundary duplicates from fetch", 1918 - boundary_removed 1919 - ); 1920 - } 1921 - } 1922 - 1923 - // Add to mempool 1924 - let added = if !ops.is_empty() { 1925 - self.add_to_mempool(ops)? 
1926 - } else { 1927 - 0 1928 - }; 1929 - 1930 - let dupes_in_fetch = after_dedup - added; 1931 - total_dupes += dupes_in_fetch; 1932 - 1933 - let fetch_duration = fetch_op_start.elapsed(); 1934 - let new_stats = self.get_mempool_stats()?; 1935 - let ops_per_sec = if fetch_duration.as_secs_f64() > 0.0 { 1936 - added as f64 / fetch_duration.as_secs_f64() 1937 - } else { 1938 - 0.0 1939 - }; 1940 - 1941 - if *self.verbose.lock().unwrap() { 1942 - if boundary_removed > 0 || dupes_in_fetch > 0 { 1943 - log::info!( 1944 - " → +{} unique ({} dupes, {} boundary) in {:.9}s • Running: {}/{} ({:.0} ops/sec)", 1945 - added, 1946 - dupes_in_fetch, 1947 - boundary_removed, 1948 - fetch_duration.as_secs_f64(), 1949 - new_stats.count, 1950 - constants::BUNDLE_SIZE, 1951 - ops_per_sec 1952 - ); 1953 - } else { 1954 - log::info!( 1955 - " → +{} unique in {:.9}s • Running: {}/{} ({:.0} ops/sec)", 1956 - added, 1957 - fetch_duration.as_secs_f64(), 1958 - new_stats.count, 1959 - constants::BUNDLE_SIZE, 1960 - ops_per_sec 1961 - ); 1962 - } 1963 - } 1964 - 1965 - // Update cursor 1966 - if let Some(last_time) = new_stats.last_time { 1967 - after_time = last_time.to_rfc3339(); 1968 - } 1969 - 1970 - // Stop if we got an incomplete batch or made no progress 1971 - if got_incomplete_batch || added == 0 { 1972 - caught_up = true; 1973 - if *self.verbose.lock().unwrap() { 1974 - log::debug!("Caught up to latest PLC data"); 1975 - } 1976 - break; 1977 - } 1978 - } 1979 - 1980 - let fetch_total_duration = fetch_start.elapsed(); 1981 - let dedup_pct = if total_fetched > 0 { 1982 - (total_dupes + total_boundary_dupes) as f64 / total_fetched as f64 * 100.0 1983 - } else { 1984 - 0.0 1985 - }; 1986 - 1987 - let final_stats = self.get_mempool_stats()?; 1988 - 1989 - // Bundles must contain exactly BUNDLE_SIZE operations (no partial bundles allowed) 1990 - if final_stats.count < constants::BUNDLE_SIZE { 1991 - if caught_up { 1992 - // Caught up to latest PLC data without enough ops for a full 
bundle 1993 - // Return CaughtUp result instead of error 1994 - return Ok(SyncResult::CaughtUp { 1995 - next_bundle: next_bundle_num, 1996 - mempool_count: final_stats.count, 1997 - new_ops: total_fetched - total_dupes - total_boundary_dupes, 1998 - fetch_duration_ms: fetch_total_duration.as_millis() as u64, 1999 - }); 2000 - } else { 2001 - anyhow::bail!( 2002 - "Insufficient operations: have {}, need exactly {} (max attempts reached)", 2003 - final_stats.count, 2004 - constants::BUNDLE_SIZE 2005 - ); 2006 - } 2007 - } 2008 - 2009 - if *self.verbose.lock().unwrap() { 2010 - log::info!( 2011 - " ✓ Collected {} unique ops from {} fetches ({:.1}% dedup)", 2012 - final_stats.count, 2013 - fetch_num, 2014 - dedup_pct 2015 - ); 2016 - } 2017 - 2018 - // Take operations and create bundle 2019 - log::debug!( 2020 - "Calling operations.SaveBundle with bundle={}", 2021 - next_bundle_num 2022 - ); 2023 - 2024 - let operations = { 2025 - let mut mempool = self.mempool.write().unwrap(); 2026 - let mem = mempool 2027 - .as_mut() 2028 - .ok_or_else(|| anyhow::anyhow!("Mempool not initialized"))?; 2029 - // Take up to BUNDLE_SIZE operations (or all if less) 2030 - let count = mem.count().min(constants::BUNDLE_SIZE); 2031 - mem.take(count)? 2032 - }; 2033 - 2034 - if operations.is_empty() { 2035 - anyhow::bail!("No operations to create bundle"); 2036 - } 2037 - 2038 - // Bundles must contain exactly BUNDLE_SIZE operations 2039 - if operations.len() != constants::BUNDLE_SIZE { 2040 - anyhow::bail!( 2041 - "Invalid operation count: expected exactly {}, got {}", 2042 - constants::BUNDLE_SIZE, 2043 - operations.len() 2044 - ); 2045 - } 2046 - 2047 - log::debug!("SaveBundle SUCCESS, setting bundle fields"); 2048 - 2049 - // CRITICAL: Clear mempool BEFORE saving to ensure atomicity 2050 - // If interrupted after this point, the operations are no longer in mempool 2051 - // and won't be re-fetched on restart, preventing duplicate/inconsistent bundles. 
2052 - // If save fails after clearing, we bail out and the operations are lost, 2053 - // but this is better than creating bundles with inconsistent content. 2054 - self.clear_mempool()?; 2055 - 2056 - // Save bundle to disk with timing breakdown 2057 - // Save bundle and update DID index (now fast with delta segments) 2058 - let save_start = Instant::now(); 2059 - let ( 2060 - serialize_time, 2061 - compress_time, 2062 - hash_time, 2063 - did_index_time, 2064 - index_write_time, 2065 - did_index_compacted, 2066 - ) = self 2067 - .save_bundle_with_timing(next_bundle_num, operations) 2068 - .await?; 2069 - let save_duration = save_start.elapsed(); 2070 - 2071 - // Show timing breakdown in verbose mode only 2072 - if *self.verbose.lock().unwrap() { 2073 - log::debug!( 2074 - " Save timing: serialize={:.3}ms, compress={:.3}ms, hash={:.3}ms, did_index={:.3}ms, index_write={:.3}ms, total={:.1}ms", 2075 - serialize_time.as_secs_f64() * 1000.0, 2076 - compress_time.as_secs_f64() * 1000.0, 2077 - hash_time.as_secs_f64() * 1000.0, 2078 - did_index_time.as_secs_f64() * 1000.0, 2079 - index_write_time.as_secs_f64() * 1000.0, 2080 - save_duration.as_secs_f64() * 1000.0 2081 - ); 2082 - } 2083 - 2084 - log::debug!("Adding bundle {} to index", next_bundle_num); 2085 - log::debug!("Index now has {} bundles", next_bundle_num); 2086 - log::debug!("Index saved, last bundle = {}", next_bundle_num); 2087 - 2088 - // Get bundle info for display 2089 - let (short_hash, age_str, unique_dids, size_bytes) = { 2090 - let index = self.index.read().unwrap(); 2091 - let bundle_meta = index.get_bundle(next_bundle_num).unwrap(); 2092 - // Use chain hash (first 7 chars) for display 2093 - let hash = bundle_meta.hash[..7].to_string(); 2094 - 2095 - // Calculate age 2096 - let created_time = chrono::DateTime::parse_from_rfc3339(&bundle_meta.start_time) 2097 - .unwrap() 2098 - .with_timezone(&chrono::Utc); 2099 - let now = chrono::Utc::now(); 2100 - let age = 
now.signed_duration_since(created_time); 2101 - let age_str = format_age(age); 2102 - 2103 - ( 2104 - hash, 2105 - age_str, 2106 - bundle_meta.did_count, 2107 - bundle_meta.compressed_size, 2108 - ) 2109 - }; 2110 - 2111 - // Get mempool count after clearing (should be 0, but check anyway) 2112 - let mempool_count = self.get_mempool_stats().map(|s| s.count).unwrap_or(0); 2113 - let total_duration_ms = (fetch_total_duration + save_duration).as_millis() as u64; 2114 - let fetch_duration_ms = fetch_total_duration.as_millis() as u64; 2115 - 2116 - // Calculate separate timings: bundle save (serialize + compress + hash) vs index (did_index + index_write) 2117 - let bundle_save_ms = (serialize_time + compress_time + hash_time).as_millis() as u64; 2118 - let index_ms = (did_index_time + index_write_time).as_millis() as u64; 2119 - 2120 - // Only log detailed info in verbose mode 2121 - if *self.verbose.lock().unwrap() { 2122 - log::info!( 2123 - "→ Bundle {:06} | {} | fetch: {:.3}s ({} reqs) | {}", 2124 - next_bundle_num, 2125 - short_hash, 2126 - fetch_total_duration.as_secs_f64(), 2127 - fetch_num, 2128 - age_str 2129 - ); 2130 - log::debug!( 2131 - "Bundle done = {}, finish duration = {:.3}ms", 2132 - next_bundle_num, 2133 - save_duration.as_secs_f64() * 1000.0 2134 - ); 2135 - } 2136 - 2137 - Ok(SyncResult::BundleCreated { 2138 - bundle_num: next_bundle_num, 2139 - mempool_count, 2140 - duration_ms: total_duration_ms, 2141 - fetch_duration_ms, 2142 - bundle_save_ms, 2143 - index_ms, 2144 - fetch_requests: fetch_num, 2145 - hash: short_hash, 2146 - age: age_str, 2147 - did_index_compacted, 2148 - unique_dids, 2149 - size_bytes, 2150 - fetch_wait_ms: total_wait.as_millis() as u64, 2151 - fetch_http_ms: total_http.as_millis() as u64, 2152 - }) 2153 - } 2154 - 2155 - /// Run single sync cycle 2156 - /// 2157 - /// If max_bundles is Some(n), stop after syncing n bundles 2158 - /// If max_bundles is None, sync until caught up 2159 - pub async fn sync_once( 2160 - &self, 
2161 - client: &crate::plc_client::PLCClient, 2162 - max_bundles: Option<usize>, 2163 - ) -> Result<usize> { 2164 - let mut synced = 0; 2165 - 2166 - loop { 2167 - match self.sync_next_bundle(client, None).await { 2168 - Ok(SyncResult::BundleCreated { .. }) => { 2169 - synced += 1; 2170 - 2171 - // Check if we've reached the limit 2172 - if let Some(max) = max_bundles 2173 - && synced >= max 2174 - { 2175 - break; 2176 - } 2177 - } 2178 - Ok(SyncResult::CaughtUp { .. }) => { 2179 - // Caught up to latest PLC data 2180 - break; 2181 - } 2182 - Err(e) => return Err(e), 2183 - } 2184 - 2185 - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; 2186 - } 2187 - 2188 - Ok(synced) 2189 - } 2190 - 2191 1718 /// Save bundle to disk with compression and index updates (with timing) 2192 - async fn save_bundle_with_timing( 1719 + pub(crate) async fn save_bundle_with_timing( 2193 1720 &self, 2194 1721 bundle_num: u32, 2195 1722 operations: Vec<Operation>, ··· 3766 3293 pub filter: Option<OperationFilter>, 3767 3294 pub query: String, 3768 3295 pub mode: QueryMode, 3769 - } 3770 - 3771 - // Helper function to format age duration 3772 - fn format_age(duration: chrono::Duration) -> String { 3773 - let days = duration.num_days(); 3774 - if days >= 365 { 3775 - let years = days as f64 / 365.25; 3776 - format!("{:.1} years ago", years) 3777 - } else if days >= 30 { 3778 - let months = days as f64 / 30.0; 3779 - format!("{:.1} months ago", months) 3780 - } else if days > 0 { 3781 - format!("{} days ago", days) 3782 - } else { 3783 - let hours = duration.num_hours(); 3784 - if hours > 0 { 3785 - format!("{} hours ago", hours) 3786 - } else { 3787 - let mins = duration.num_minutes(); 3788 - if mins > 0 { 3789 - format!("{} minutes ago", mins) 3790 - } else { 3791 - "just now".to_string() 3792 - } 3793 - } 3794 - } 3795 3296 } 3796 3297 3797 3298 /// Bundle selection for queries, exports, and verification
+10 -8
src/plc_client.rs
··· 73 73 after: &str, 74 74 count: usize, 75 75 ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 76 - self.fetch_operations_with_retry_cancelable(after, count, 5, None).await 76 + self.fetch_operations_with_retry_cancelable(after, count, 5, None) 77 + .await 77 78 } 78 79 79 80 pub async fn fetch_operations_cancelable( ··· 82 83 count: usize, 83 84 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 84 85 ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 85 - self.fetch_operations_with_retry_cancelable(after, count, 5, shutdown_rx).await 86 + self.fetch_operations_with_retry_cancelable(after, count, 5, shutdown_rx) 87 + .await 86 88 } 87 89 88 90 async fn fetch_operations_with_retry_cancelable( ··· 98 100 let mut total_http = Duration::from_secs(0); 99 101 100 102 for attempt in 1..=max_retries { 101 - if let Some(ref rx) = shutdown_rx && *rx.borrow() { 103 + if let Some(ref rx) = shutdown_rx 104 + && *rx.borrow() 105 + { 102 106 anyhow::bail!("Shutdown requested"); 103 107 } 104 - let export_url = format!( 105 - "{}/export?after={}&count={}", 106 - self.base_url, after, count 107 - ); 108 + let export_url = format!("{}/export?after={}&count={}", self.base_url, after, count); 108 109 109 110 let permits = self.rate_limiter.available_permits(); 110 111 let requests_in_period = self.count_requests_in_period(); ··· 369 370 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(0)); 370 371 let sem_clone = semaphore.clone(); 371 372 372 - let refill_rate = Duration::from_secs_f64(period.as_secs_f64() / requests_per_period as f64); 373 + let refill_rate = 374 + Duration::from_secs_f64(period.as_secs_f64() / requests_per_period as f64); 373 375 374 376 // Spawn background task to refill permits at steady rate 375 377 // CRITICAL: Add first permit immediately, then refill at steady rate
+661 -368
src/sync.rs
··· 85 85 // Sync Events 86 86 // ============================================================================ 87 87 88 + // ============================================================================ 89 + // Sync Results & Events 90 + // ============================================================================ 91 + 92 + #[derive(Debug, Clone)] 93 + pub struct BundleCreatedResult { 94 + pub bundle_num: u32, 95 + pub hash: String, 96 + pub age: String, 97 + pub fetch_duration_ms: u64, 98 + pub bundle_save_ms: u64, 99 + pub index_ms: u64, 100 + pub total_duration_ms: u64, 101 + pub fetch_requests: usize, 102 + pub did_index_compacted: bool, 103 + pub unique_dids: u32, 104 + pub size_bytes: u64, 105 + pub fetch_wait_ms: u64, 106 + pub fetch_http_ms: u64, 107 + pub mempool_count: usize, 108 + } 109 + 110 + #[derive(Debug, Clone)] 111 + pub struct CaughtUpResult { 112 + pub next_bundle: u32, 113 + pub mempool_count: usize, 114 + pub new_ops: usize, 115 + pub fetch_duration_ms: u64, 116 + } 117 + 88 118 #[derive(Debug, Clone)] 89 119 pub enum SyncEvent { 90 - BundleCreated { 91 - bundle_num: u32, 92 - hash: String, 93 - age: String, 94 - fetch_duration_ms: u64, 95 - bundle_save_ms: u64, 96 - index_ms: u64, 97 - total_duration_ms: u64, 98 - fetch_requests: usize, 99 - did_index_compacted: bool, 100 - 101 - unique_dids: u32, 102 - size_bytes: u64, 103 - fetch_wait_ms: u64, 104 - fetch_http_ms: u64, 105 - }, 106 - CaughtUp { 107 - next_bundle: u32, 108 - mempool_count: usize, 109 - new_ops: usize, 110 - fetch_duration_ms: u64, 111 - }, 120 + BundleCreated(BundleCreatedResult), 121 + CaughtUp(CaughtUpResult), 112 122 InitialSyncComplete { 113 123 total_bundles: u32, 114 124 mempool_count: usize, ··· 116 126 Error { 117 127 error: String, 118 128 }, 129 + } 130 + 131 + /// Result of a sync operation 132 + #[derive(Debug, Clone)] 133 + pub enum SyncResult { 134 + /// Successfully created a bundle 135 + BundleCreated(BundleCreatedResult), 136 + /// Caught up to latest PLC 
data, mempool has partial operations 137 + CaughtUp(CaughtUpResult), 119 138 } 120 139 121 140 // ============================================================================ ··· 154 173 pub total_duration: Duration, 155 174 } 156 175 176 + #[derive(Debug, Default)] 177 + struct FetchStats { 178 + fetch_num: usize, 179 + total_fetched: usize, 180 + total_dupes: usize, 181 + total_boundary_dupes: usize, 182 + fetch_duration: Duration, 183 + total_wait: Duration, 184 + total_http: Duration, 185 + } 186 + 157 187 // ============================================================================ 158 188 // Sync Logger Trait 159 189 // ============================================================================ ··· 162 192 pub trait SyncLogger: Send + Sync { 163 193 fn on_sync_start(&self, interval: Duration); 164 194 165 - #[allow(clippy::too_many_arguments)] 166 - fn on_bundle_created( 167 - &self, 168 - bundle_num: u32, 169 - hash: &str, 170 - age: &str, 171 - fetch_duration_ms: u64, 172 - bundle_save_ms: u64, 173 - index_ms: u64, 174 - total_duration_ms: u64, 175 - fetch_requests: usize, 176 - did_index_compacted: bool, 177 - unique_dids: u32, 178 - size_bytes: u64, 179 - fetch_wait_ms: u64, 180 - fetch_http_ms: u64, 181 - ); 195 + fn on_bundle_created(&self, result: &BundleCreatedResult); 182 196 183 - // Allow the sync logger to accept multiple arguments for detailed bundle info 184 - // (Removed workaround method; use allow attribute on trait method instead) 185 - 186 - fn on_caught_up( 187 - &self, 188 - next_bundle: u32, 189 - mempool_count: usize, 190 - new_ops: usize, 191 - fetch_duration_ms: u64, 192 - ); 197 + fn on_caught_up(&self, result: &CaughtUpResult); 193 198 194 199 fn on_initial_sync_complete( 195 200 &self, ··· 264 269 } 265 270 } 266 271 267 - #[allow(clippy::too_many_arguments)] 268 - fn on_bundle_created( 269 - &self, 270 - bundle_num: u32, 271 - hash: &str, 272 - age: &str, 273 - _fetch_duration_ms: u64, 274 - bundle_save_ms: u64, 275 - index_ms: 
u64, 276 - _total_duration_ms: u64, 277 - fetch_requests: usize, 278 - _did_index_compacted: bool, 279 - unique_dids: u32, 280 - size_bytes: u64, 281 - fetch_wait_ms: u64, 282 - fetch_http_ms: u64, 283 - ) { 284 - let fetch_secs = fetch_http_ms as f64 / 1000.0; 285 - let wait_secs = fetch_wait_ms as f64 / 1000.0; 286 - let size_kb = size_bytes as f64 / 1024.0; 272 + fn on_bundle_created(&self, result: &BundleCreatedResult) { 273 + let fetch_secs = result.fetch_http_ms as f64 / 1000.0; 274 + let wait_secs = result.fetch_wait_ms as f64 / 1000.0; 275 + let size_kb = result.size_bytes as f64 / 1024.0; 287 276 let size_str = if size_kb >= 1024.0 { 288 277 format!("{:.1}MB", size_kb / 1024.0) 289 278 } else { ··· 292 281 293 282 eprintln!( 294 283 "[INFO] → Bundle {:06} | {} | {} dids | {} | fetch: {:.2}s ({} reqs, {:.1}s wait) | save: {}ms | index: {}ms | {}", 295 - bundle_num, 296 - hash, 297 - unique_dids, 284 + result.bundle_num, 285 + result.hash, 286 + result.unique_dids, 298 287 size_str, 299 288 fetch_secs, 300 - fetch_requests, 289 + result.fetch_requests, 301 290 wait_secs, 302 - bundle_save_ms, 303 - index_ms, 304 - age 291 + result.bundle_save_ms, 292 + result.index_ms, 293 + result.age 305 294 ); 306 295 } 307 296 308 - fn on_caught_up( 309 - &self, 310 - next_bundle: u32, 311 - mempool_count: usize, 312 - new_ops: usize, 313 - fetch_duration_ms: u64, 314 - ) { 315 - if new_ops > 0 { 297 + fn on_caught_up(&self, result: &CaughtUpResult) { 298 + if result.new_ops > 0 { 316 299 eprintln!( 317 300 "[Sync] ✓ Bundle {:06} (upcoming) | mempool: {} ({:+}) | fetch: {}ms", 318 - next_bundle, mempool_count, new_ops as i32, fetch_duration_ms 301 + result.next_bundle, 302 + result.mempool_count, 303 + result.new_ops as i32, 304 + result.fetch_duration_ms 319 305 ); 320 306 } else { 321 307 eprintln!( 322 308 "[Sync] ✓ Bundle {:06} (upcoming) | mempool: {} | fetch: {}ms", 323 - next_bundle, mempool_count, fetch_duration_ms 309 + result.next_bundle, result.mempool_count, 
result.fetch_duration_ms 324 310 ); 325 311 } 326 312 } ··· 407 393 // Then, call logger if provided 408 394 if let Some(logger) = &self.logger { 409 395 match event { 410 - SyncEvent::BundleCreated { 411 - bundle_num, 412 - hash, 413 - age, 414 - fetch_duration_ms, 415 - bundle_save_ms, 416 - index_ms, 417 - total_duration_ms, 418 - fetch_requests, 419 - did_index_compacted, 420 - unique_dids, 421 - size_bytes, 422 - fetch_wait_ms, 423 - fetch_http_ms, 424 - } => { 425 - logger.on_bundle_created( 426 - *bundle_num, 427 - hash, 428 - age, 429 - *fetch_duration_ms, 430 - *bundle_save_ms, 431 - *index_ms, 432 - *total_duration_ms, 433 - *fetch_requests, 434 - *did_index_compacted, 435 - *unique_dids, 436 - *size_bytes, 437 - *fetch_wait_ms, 438 - *fetch_http_ms, 439 - ); 396 + SyncEvent::BundleCreated(result) => { 397 + logger.on_bundle_created(result); 440 398 } 441 - SyncEvent::CaughtUp { 442 - next_bundle, 443 - mempool_count, 444 - new_ops, 445 - fetch_duration_ms, 446 - } => { 447 - logger.on_caught_up(*next_bundle, *mempool_count, *new_ops, *fetch_duration_ms); 399 + SyncEvent::CaughtUp(result) => { 400 + logger.on_caught_up(result); 448 401 } 449 402 SyncEvent::InitialSyncComplete { 450 403 total_bundles, ··· 484 437 } 485 438 } 486 439 440 + /// Fetch operations loop 441 + /// Returns (caught_up, fetch_stats) 442 + async fn fetch_operations_loop( 443 + &self, 444 + mut after_time: String, 445 + prev_boundary: &HashSet<String>, 446 + shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 447 + ) -> Result<(bool, FetchStats)> { 448 + use std::time::Instant; 449 + 450 + let mut fetch_num = 0; 451 + let mut total_fetched = 0; 452 + let mut total_dupes = 0; 453 + let mut total_boundary_dupes = 0; 454 + let fetch_start = Instant::now(); 455 + let mut caught_up = false; 456 + const MAX_ATTEMPTS: usize = 50; 457 + let mut total_wait = std::time::Duration::from_secs(0); 458 + let mut total_http = std::time::Duration::from_secs(0); 459 + 460 + while fetch_num < 
MAX_ATTEMPTS { 461 + let stats = self.manager.get_mempool_stats()?; 462 + 463 + if stats.count >= constants::BUNDLE_SIZE { 464 + break; 465 + } 466 + 467 + fetch_num += 1; 468 + let needed = constants::BUNDLE_SIZE - stats.count; 469 + 470 + // Smart batch sizing - request more than exact amount to account for duplicates 471 + let request_count = match needed { 472 + n if n <= 50 => 50, 473 + n if n <= 100 => 100, 474 + n if n <= 500 => 200, 475 + _ => 1000, 476 + }; 477 + 478 + if self.config.verbose { 479 + log::info!( 480 + " Fetch #{}: requesting {} (need {} more, have {}/{})", 481 + fetch_num, 482 + request_count, 483 + needed, 484 + stats.count, 485 + constants::BUNDLE_SIZE 486 + ); 487 + } 488 + 489 + let fetch_op_start = Instant::now(); 490 + if let Some(ref rx) = shutdown_rx 491 + && *rx.borrow() 492 + { 493 + anyhow::bail!("Shutdown requested"); 494 + } 495 + let (plc_ops, wait_dur, http_dur) = if let Some(rx) = shutdown_rx.clone() { 496 + self.client 497 + .fetch_operations_cancelable(&after_time, request_count, Some(rx)) 498 + .await? 499 + } else { 500 + self.client 501 + .fetch_operations(&after_time, request_count) 502 + .await? 
503 + }; 504 + total_wait += wait_dur; 505 + total_http += http_dur; 506 + 507 + let fetched_count = plc_ops.len(); 508 + 509 + // Check for incomplete batch (indicates caught up) 510 + let got_incomplete_batch = fetched_count > 0 && fetched_count < request_count; 511 + 512 + if plc_ops.is_empty() || got_incomplete_batch { 513 + caught_up = true; 514 + if self.config.verbose { 515 + log::info!( 516 + " Caught up (fetched {} items, requested {})", 517 + fetched_count, 518 + request_count 519 + ); 520 + } 521 + } 522 + 523 + if plc_ops.is_empty() { 524 + break; 525 + } 526 + 527 + // Update after_time for next fetch 528 + if let Some(last) = plc_ops.last() { 529 + after_time = last.created_at.clone(); 530 + } 531 + 532 + // Convert and deduplicate 533 + let mut new_ops = Vec::with_capacity(plc_ops.len()); 534 + let mut batch_dupes = 0; 535 + let mut batch_boundary_dupes = 0; 536 + 537 + for plc_op in plc_ops { 538 + let op: Operation = plc_op.into(); 539 + 540 + // CRITICAL: Check against previous boundary CIDs 541 + if op.cid.as_ref().is_some_and(|cid| prev_boundary.contains(cid)) { 542 + batch_boundary_dupes += 1; 543 + continue; 544 + } 545 + 546 + new_ops.push(op); 547 + } 548 + 549 + // Add to mempool (handles internal deduplication) 550 + let added = self.manager.add_to_mempool(new_ops)?; 551 + batch_dupes += fetched_count - batch_boundary_dupes - added; 552 + 553 + total_fetched += fetched_count; 554 + total_dupes += batch_dupes; 555 + total_boundary_dupes += batch_boundary_dupes; 556 + 557 + let fetch_op_ms = fetch_op_start.elapsed().as_secs_f64() * 1000.0; 558 + 559 + if self.config.verbose { 560 + log::debug!( 561 + " Fetch #{}: +{} ops ({} dupes, {} boundary dupes) in {:.1}ms", 562 + fetch_num, 563 + added, 564 + batch_dupes, 565 + batch_boundary_dupes, 566 + fetch_op_ms 567 + ); 568 + } 569 + 570 + if caught_up { 571 + break; 572 + } 573 + } 574 + 575 + Ok(( 576 + caught_up, 577 + FetchStats { 578 + fetch_num, 579 + total_fetched, 580 + total_dupes, 581 + 
total_boundary_dupes, 582 + fetch_duration: fetch_start.elapsed(), 583 + total_wait, 584 + total_http, 585 + }, 586 + )) 587 + } 588 + 589 + /// Fetch and save next bundle from PLC directory 590 + /// DID index is updated on every bundle (fast with delta segments) 591 + pub async fn sync_next_bundle( 592 + &self, 593 + shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 594 + ) -> Result<SyncResult> { 595 + use crate::manager::LoadOptions; 596 + 597 + // Validate repository state before starting 598 + self.manager.validate_sync_state()?; 599 + 600 + let next_bundle_num = self.manager.get_last_bundle() + 1; 601 + 602 + // ALWAYS get boundaries from last bundle initially 603 + let (mut after_time, mut prev_boundary) = if next_bundle_num > 1 { 604 + let last = self 605 + .manager 606 + .load_bundle(next_bundle_num - 1, LoadOptions::default())?; 607 + let boundary = get_boundary_cids(&last.operations); 608 + let cursor = last 609 + .operations 610 + .last() 611 + .map(|op| op.created_at.clone()) 612 + .unwrap_or_default(); 613 + 614 + if self.config.verbose { 615 + log::info!( 616 + "Loaded {} boundary CIDs from bundle {:06} (at {})", 617 + boundary.len(), 618 + next_bundle_num - 1, 619 + cursor 620 + ); 621 + } 622 + 623 + (cursor, boundary) 624 + } else { 625 + ("1970-01-01T00:00:00Z".to_string(), HashSet::new()) 626 + }; 627 + 628 + // If mempool has operations, update cursor AND boundaries from mempool 629 + // (mempool operations already had boundary dedup applied when they were added) 630 + let mempool_stats = self.manager.get_mempool_stats()?; 631 + if mempool_stats.count > 0 632 + && let Some(last_time) = mempool_stats.last_time 633 + { 634 + if self.config.verbose { 635 + log::debug!( 636 + "Mempool has {} ops, resuming from {}", 637 + mempool_stats.count, 638 + last_time.format("%Y-%m-%dT%H:%M:%S") 639 + ); 640 + } 641 + after_time = last_time.to_rfc3339(); 642 + 643 + // Calculate boundaries from MEMPOOL for next fetch 644 + let mempool_ops = 
self.manager.get_mempool_operations()?; 645 + if !mempool_ops.is_empty() { 646 + prev_boundary = get_boundary_cids(&mempool_ops); 647 + if self.config.verbose { 648 + log::info!("Using {} boundary CIDs from mempool", prev_boundary.len()); 649 + } 650 + } 651 + } 652 + 653 + log::debug!( 654 + "Preparing bundle {:06} (mempool: {} ops)...", 655 + next_bundle_num, 656 + mempool_stats.count 657 + ); 658 + log::debug!( 659 + "Starting cursor: {}", 660 + if after_time.is_empty() || after_time == "1970-01-01T00:00:00Z" { 661 + "" 662 + } else { 663 + &after_time 664 + } 665 + ); 666 + 667 + if !prev_boundary.is_empty() && self.config.verbose && mempool_stats.count == 0 { 668 + log::info!( 669 + " Starting with {} boundary CIDs from previous bundle", 670 + prev_boundary.len() 671 + ); 672 + } 673 + 674 + // Ensure mempool is loaded (load if needed) 675 + self.manager.load_mempool()?; 676 + 677 + // Run fetch loop 678 + let (_caught_up, fetch_stats) = self 679 + .fetch_operations_loop(after_time, &prev_boundary, shutdown_rx) 680 + .await?; 681 + 682 + let _fetch_duration_ms = fetch_stats.fetch_duration.as_secs_f64() * 1000.0; 683 + let _fetch_wait_ms = fetch_stats.total_wait.as_secs_f64() * 1000.0; 684 + let _fetch_http_ms = fetch_stats.total_http.as_secs_f64() * 1000.0; 685 + 686 + let stats = self.manager.get_mempool_stats()?; 687 + 688 + self.save_bundle_if_ready(next_bundle_num, &stats, &fetch_stats) 689 + .await 690 + } 691 + 692 + /// Check if we have enough operations for a bundle and save it if so 693 + async fn save_bundle_if_ready( 694 + &self, 695 + next_bundle_num: u32, 696 + stats: &crate::mempool::MempoolStats, 697 + fetch_stats: &FetchStats, 698 + ) -> Result<SyncResult> { 699 + let fetch_duration_ms = fetch_stats.fetch_duration.as_secs_f64() * 1000.0; 700 + let fetch_wait_ms = fetch_stats.total_wait.as_secs_f64() * 1000.0; 701 + let fetch_http_ms = fetch_stats.total_http.as_secs_f64() * 1000.0; 702 + 703 + // If we have a full bundle, save it 704 + if 
stats.count >= constants::BUNDLE_SIZE {
            let bundle_ops = self.manager.get_mempool_operations()?;

            // Double check: mempool stats can disagree with the actual
            // operation list. If we really don't have a full bundle, report
            // caught-up instead of cutting a short bundle. (The previous
            // version re-tested the same condition in a nested `if` that was
            // always true when reached; flattened to a single check.)
            if bundle_ops.len() < constants::BUNDLE_SIZE {
                log::warn!(
                    "Mempool stats say {} but got {} ops",
                    stats.count,
                    bundle_ops.len()
                );
                return Ok(SyncResult::CaughtUp(CaughtUpResult {
                    next_bundle: next_bundle_num,
                    mempool_count: bundle_ops.len(),
                    new_ops: fetch_stats.total_fetched
                        - fetch_stats.total_dupes
                        - fetch_stats.total_boundary_dupes,
                    fetch_duration_ms: fetch_duration_ms as u64,
                }));
            }

            // Take exactly BUNDLE_SIZE operations; the remainder stays in the mempool.
            let (bundle_ops, remaining_ops) = bundle_ops.split_at(constants::BUNDLE_SIZE);
            let bundle_ops_vec = bundle_ops.to_vec();
            let remaining_ops_vec = remaining_ops.to_vec();

            // Calculate size for logging
            let size_bytes: u64 = bundle_ops_vec
                .iter()
                .map(|op| op.raw_json.as_ref().map(|s| s.len()).unwrap_or(0) as u64)
                .sum();

            // Save bundle
            let (
                _serialize_time,
                _compress_time,
                _hash_time,
                _did_index_time,
                index_ms,
                did_index_compacted,
            ) = self
                .manager
                .save_bundle_with_timing(next_bundle_num, bundle_ops_vec)
                .await?;

            // Update mempool with remaining operations
            self.manager.clear_mempool()?;
            if !remaining_ops_vec.is_empty() {
                self.manager.add_to_mempool(remaining_ops_vec)?;
            }

            let total_duration = fetch_stats.fetch_duration + index_ms;
            let total_duration_ms = total_duration.as_secs_f64() * 1000.0;

            // Count unique DIDs for stats
            let unique_dids = bundle_ops
                .iter()
                .map(|op| op.did.clone())
                .collect::<HashSet<_>>()
.len() as u32; 767 + 768 + // Calculate age 769 + let age = if let Some(last_op) = bundle_ops.last() { 770 + if let Ok(created_time) = chrono::DateTime::parse_from_rfc3339(&last_op.created_at) 771 + { 772 + let now = chrono::Utc::now(); 773 + let duration = 774 + now.signed_duration_since(created_time.with_timezone(&chrono::Utc)); 775 + crate::format::format_age(duration) 776 + } else { 777 + "unknown".to_string() 778 + } 779 + } else { 780 + "unknown".to_string() 781 + }; 782 + 783 + Ok(SyncResult::BundleCreated(BundleCreatedResult { 784 + bundle_num: next_bundle_num, 785 + mempool_count: remaining_ops.len(), 786 + total_duration_ms: total_duration_ms as u64, 787 + fetch_duration_ms: fetch_duration_ms as u64, 788 + bundle_save_ms: 0, // Included in index_ms/total for now or need to separate 789 + index_ms: index_ms.as_millis() as u64, 790 + fetch_requests: fetch_stats.fetch_num, 791 + hash: self 792 + .manager 793 + .get_bundle_metadata(next_bundle_num)? 794 + .map(|b| b.hash) 795 + .unwrap_or_default(), 796 + age, 797 + did_index_compacted, 798 + unique_dids, 799 + size_bytes, 800 + fetch_wait_ms: fetch_wait_ms as u64, 801 + fetch_http_ms: fetch_http_ms as u64, 802 + })) 803 + } else { 804 + // Not enough for a bundle, we are caught up 805 + Ok(SyncResult::CaughtUp(CaughtUpResult { 806 + next_bundle: next_bundle_num, 807 + mempool_count: stats.count, 808 + new_ops: fetch_stats.total_fetched 809 + - fetch_stats.total_dupes 810 + - fetch_stats.total_boundary_dupes, 811 + fetch_duration_ms: fetch_duration_ms as u64, 812 + })) 813 + } 814 + } 815 + 487 816 pub async fn run_once(&self, max_bundles: Option<usize>) -> Result<usize> { 488 817 let mut synced = 0; 489 818 ··· 502 831 .and_then(|v| v.as_u64()) 503 832 .unwrap_or(0); 504 833 505 - match self 506 - .manager 507 - .sync_next_bundle(&self.client, None) 508 - .await 509 - { 510 - Ok(crate::manager::SyncResult::BundleCreated { 511 - bundle_num, 512 - mempool_count: _, 513 - duration_ms, 514 - fetch_duration_ms, 
515 - bundle_save_ms, 516 - index_ms, 517 - fetch_requests, 518 - hash, 519 - age, 520 - did_index_compacted, 521 - unique_dids, 522 - size_bytes, 523 - fetch_wait_ms, 524 - fetch_http_ms, 525 - }) => { 834 + match self.sync_next_bundle(None).await { 835 + Ok(SyncResult::BundleCreated(result)) => { 526 836 synced += 1; 527 837 528 - self.handle_event(&SyncEvent::BundleCreated { 529 - bundle_num, 530 - hash, 531 - age, 532 - fetch_duration_ms, 533 - bundle_save_ms, 534 - index_ms, 535 - total_duration_ms: duration_ms, 536 - fetch_requests, 537 - did_index_compacted, 538 - unique_dids, 539 - size_bytes, 540 - fetch_wait_ms, 541 - fetch_http_ms, 542 - }); 838 + self.handle_event(&SyncEvent::BundleCreated(result.clone())); 543 839 544 840 // Show compaction message if index was compacted 545 841 self.show_compaction_if_needed( 546 - did_index_compacted, 842 + result.did_index_compacted, 547 843 delta_segments_before, 548 - index_ms, 844 + result.index_ms, 549 845 ); 550 846 551 847 // Check if we've reached the limit ··· 555 851 break; 556 852 } 557 853 } 558 - Ok(crate::manager::SyncResult::CaughtUp { 559 - next_bundle, 560 - mempool_count, 561 - new_ops, 562 - fetch_duration_ms, 563 - }) => { 564 - self.handle_event(&SyncEvent::CaughtUp { 565 - next_bundle, 566 - mempool_count, 567 - new_ops, 568 - fetch_duration_ms, 569 - }); 854 + Ok(SyncResult::CaughtUp(result)) => { 855 + self.handle_event(&SyncEvent::CaughtUp(result)); 570 856 break; 571 857 } 572 858 Err(e) => { ··· 593 879 } 594 880 595 881 pub async fn run_continuous(&self) -> Result<()> { 596 - use tokio::time::sleep; 597 882 598 883 let mut total_synced = 0u32; 599 884 let mut is_initial_sync = true; ··· 634 919 .unwrap_or(0); 635 920 636 921 let sync_result = self 637 - .manager 638 - .sync_next_bundle(&self.client, self.config.shutdown_rx.clone()) 922 + .sync_next_bundle(self.config.shutdown_rx.clone()) 639 923 .await; 640 924 641 925 match sync_result { 642 - Ok(crate::manager::SyncResult::BundleCreated 
{ 643 - bundle_num, 644 - mempool_count: _, 645 - duration_ms, 646 - fetch_duration_ms, 647 - bundle_save_ms, 648 - index_ms, 649 - fetch_requests, 650 - hash, 651 - age, 652 - did_index_compacted, 653 - unique_dids, 654 - size_bytes, 655 - fetch_wait_ms, 656 - fetch_http_ms, 657 - }) => { 658 - total_synced += 1; 659 - 660 - // Reset error counter on successful sync 661 - use std::sync::atomic::{AtomicU32, Ordering}; 662 - static CONSECUTIVE_ERRORS: AtomicU32 = AtomicU32::new(0); 663 - CONSECUTIVE_ERRORS.store(0, Ordering::Relaxed); 664 - 665 - self.handle_event(&SyncEvent::BundleCreated { 666 - bundle_num, 667 - hash, 668 - age, 669 - fetch_duration_ms, 670 - bundle_save_ms, 671 - index_ms, 672 - total_duration_ms: duration_ms, 673 - fetch_requests, 674 - did_index_compacted, 675 - unique_dids, 676 - size_bytes, 677 - fetch_wait_ms, 678 - fetch_http_ms, 679 - }); 680 - 681 - // Show compaction message if index was compacted 682 - self.show_compaction_if_needed( 683 - did_index_compacted, 684 - delta_segments_before, 685 - index_ms, 686 - ); 687 - 688 - // Check max bundles limit 689 - if self.config.max_bundles > 0 690 - && total_synced as usize >= self.config.max_bundles 926 + Ok(SyncResult::BundleCreated(result)) => { 927 + if self 928 + .handle_bundle_created( 929 + result, 930 + &mut total_synced, 931 + is_initial_sync, 932 + delta_segments_before, 933 + ) 934 + .await? 691 935 { 692 - if self.config.verbose { 693 - eprintln!( 694 - "[Sync] Reached max bundles limit ({})", 695 - self.config.max_bundles 696 - ); 697 - } 698 936 break; 699 937 } 700 - 701 - // Check for shutdown before sleeping 702 - if let Some(ref shutdown_rx) = self.config.shutdown_rx 703 - && *shutdown_rx.borrow() 938 + } 939 + Ok(SyncResult::CaughtUp(result)) => { 940 + if self 941 + .handle_caught_up(result, total_synced, &mut is_initial_sync) 942 + .await? 
704 943 { 705 - if self.config.verbose { 706 - eprintln!("[Sync] Shutdown requested, stopping..."); 707 - } 708 944 break; 709 945 } 946 + } 947 + Err(e) => { 948 + self.handle_sync_error(e).await?; 949 + } 950 + } 951 + } 710 952 711 - // During initial sync, sleep briefly (500ms) to avoid hammering the API 712 - // After initial sync, use the full interval 713 - // Use select to allow cancellation during sleep 714 - let sleep_duration = if is_initial_sync { 715 - Duration::from_millis(500) 716 - } else { 717 - self.config.interval 718 - }; 953 + Ok(()) 954 + } 719 955 720 - if let Some(ref shutdown_rx) = self.config.shutdown_rx { 721 - let mut shutdown_rx = shutdown_rx.clone(); 722 - tokio::select! { 723 - _ = sleep(sleep_duration) => {} 724 - _ = shutdown_rx.changed() => { 725 - if *shutdown_rx.borrow() { 726 - break; 727 - } 728 - } 729 - } 730 - } else { 731 - sleep(sleep_duration).await; 956 + async fn handle_bundle_created( 957 + &self, 958 + result: BundleCreatedResult, 959 + total_synced: &mut u32, 960 + is_initial_sync: bool, 961 + delta_segments_before: u64, 962 + ) -> Result<bool> { 963 + use tokio::time::sleep; 964 + 965 + *total_synced += 1; 966 + 967 + // Reset error counter on successful sync 968 + use std::sync::atomic::{AtomicU32, Ordering}; 969 + static CONSECUTIVE_ERRORS: AtomicU32 = AtomicU32::new(0); 970 + CONSECUTIVE_ERRORS.store(0, Ordering::Relaxed); 971 + 972 + self.handle_event(&SyncEvent::BundleCreated(result.clone())); 973 + 974 + // Show compaction message if index was compacted 975 + self.show_compaction_if_needed( 976 + result.did_index_compacted, 977 + delta_segments_before, 978 + result.index_ms, 979 + ); 980 + 981 + // Check max bundles limit 982 + if self.config.max_bundles > 0 && *total_synced as usize >= self.config.max_bundles { 983 + if self.config.verbose { 984 + eprintln!( 985 + "[Sync] Reached max bundles limit ({})", 986 + self.config.max_bundles 987 + ); 988 + } 989 + return Ok(true); // Stop sync 990 + } 991 + 992 + // 
Check for shutdown before sleeping 993 + if let Some(ref shutdown_rx) = self.config.shutdown_rx 994 + && *shutdown_rx.borrow() 995 + { 996 + if self.config.verbose { 997 + eprintln!("[Sync] Shutdown requested, stopping..."); 998 + } 999 + return Ok(true); // Stop sync 1000 + } 1001 + 1002 + // During initial sync, sleep briefly (500ms) to avoid hammering the API 1003 + // After initial sync, use the full interval 1004 + // Use select to allow cancellation during sleep 1005 + let sleep_duration = if is_initial_sync { 1006 + Duration::from_millis(500) 1007 + } else { 1008 + self.config.interval 1009 + }; 1010 + 1011 + if let Some(ref shutdown_rx) = self.config.shutdown_rx { 1012 + let mut shutdown_rx = shutdown_rx.clone(); 1013 + tokio::select! { 1014 + _ = sleep(sleep_duration) => {} 1015 + _ = shutdown_rx.changed() => { 1016 + if *shutdown_rx.borrow() { 1017 + return Ok(true); // Stop sync 732 1018 } 733 1019 } 734 - Ok(crate::manager::SyncResult::CaughtUp { 735 - next_bundle, 736 - mempool_count, 737 - new_ops, 738 - fetch_duration_ms, 739 - }) => { 740 - // Check for shutdown 741 - if let Some(ref shutdown_rx) = self.config.shutdown_rx 742 - && *shutdown_rx.borrow() 743 - { 744 - if self.config.verbose { 745 - eprintln!("[Sync] Shutdown requested, stopping..."); 746 - } 747 - break; 748 - } 1020 + } 1021 + } else { 1022 + sleep(sleep_duration).await; 1023 + } 1024 + 1025 + Ok(false) // Continue sync 1026 + } 1027 + 1028 + async fn handle_caught_up( 1029 + &self, 1030 + result: CaughtUpResult, 1031 + total_synced: u32, 1032 + is_initial_sync: &mut bool, 1033 + ) -> Result<bool> { 1034 + use tokio::time::sleep; 1035 + 1036 + // Check for shutdown 1037 + if let Some(ref shutdown_rx) = self.config.shutdown_rx 1038 + && *shutdown_rx.borrow() 1039 + { 1040 + if self.config.verbose { 1041 + eprintln!("[Sync] Shutdown requested, stopping..."); 1042 + } 1043 + return Ok(true); // Stop sync 1044 + } 749 1045 750 - // Caught up to the end of the chain 751 - // Mark initial 
sync as complete ONLY if we actually synced at least one bundle. 752 - // This prevents premature "initial sync complete" when we just have a full 753 - // mempool from a previous run but still have thousands of bundles to sync. 754 - if is_initial_sync && total_synced > 0 { 755 - is_initial_sync = false; 1046 + // Caught up to the end of the chain 1047 + // Mark initial sync as complete ONLY if we actually synced at least one bundle. 1048 + // This prevents premature "initial sync complete" when we just have a full 1049 + // mempool from a previous run but still have thousands of bundles to sync. 1050 + if *is_initial_sync && total_synced > 0 { 1051 + *is_initial_sync = false; 756 1052 757 - self.handle_event(&SyncEvent::InitialSyncComplete { 758 - total_bundles: total_synced, 759 - mempool_count, 760 - }); 761 - } 1053 + self.handle_event(&SyncEvent::InitialSyncComplete { 1054 + total_bundles: total_synced, 1055 + mempool_count: result.mempool_count, 1056 + }); 1057 + } 762 1058 763 - self.handle_event(&SyncEvent::CaughtUp { 764 - next_bundle, 765 - mempool_count, 766 - new_ops, 767 - fetch_duration_ms, 768 - }); 1059 + self.handle_event(&SyncEvent::CaughtUp(result.clone())); 769 1060 770 - // Always sleep for the full interval when caught up (monitoring mode) 771 - // Use select to allow cancellation during sleep 772 - if let Some(ref shutdown_rx) = self.config.shutdown_rx { 773 - let mut shutdown_rx = shutdown_rx.clone(); 774 - tokio::select! { 775 - _ = sleep(self.config.interval) => {} 776 - _ = shutdown_rx.changed() => { 777 - if *shutdown_rx.borrow() { 778 - break; 779 - } 780 - } 781 - } 782 - } else { 783 - sleep(self.config.interval).await; 1061 + // Always sleep for the full interval when caught up (monitoring mode) 1062 + // Use select to allow cancellation during sleep 1063 + if let Some(ref shutdown_rx) = self.config.shutdown_rx { 1064 + let mut shutdown_rx = shutdown_rx.clone(); 1065 + tokio::select! 
{ 1066 + _ = sleep(self.config.interval) => {} 1067 + _ = shutdown_rx.changed() => { 1068 + if *shutdown_rx.borrow() { 1069 + return Ok(true); // Stop sync 784 1070 } 785 1071 } 786 - Err(e) => { 787 - let error_msg = e.to_string(); 788 - self.handle_event(&SyncEvent::Error { 789 - error: error_msg.clone(), 790 - }); 1072 + } 1073 + } else { 1074 + sleep(self.config.interval).await; 1075 + } 1076 + 1077 + Ok(false) // Continue sync 1078 + } 791 1079 792 - // Determine if error is retryable 793 - let is_retryable = is_retryable_error(&error_msg); 1080 + async fn handle_sync_error(&self, e: anyhow::Error) -> Result<()> { 1081 + use tokio::time::sleep; 794 1082 795 - if is_retryable { 796 - // Retry transient errors with exponential backoff 797 - use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; 798 - static CONSECUTIVE_ERRORS: AtomicU32 = AtomicU32::new(0); 799 - static LAST_ERROR_TIME_SECS: AtomicU64 = AtomicU64::new(0); 1083 + let error_msg = e.to_string(); 1084 + self.handle_event(&SyncEvent::Error { 1085 + error: error_msg.clone(), 1086 + }); 800 1087 801 - let now = std::time::SystemTime::now() 802 - .duration_since(std::time::UNIX_EPOCH) 803 - .unwrap() 804 - .as_secs(); 1088 + // Determine if error is retryable 1089 + let is_retryable = is_retryable_error(&error_msg); 805 1090 806 - let last_error_secs = LAST_ERROR_TIME_SECS.load(Ordering::Relaxed); 1091 + if is_retryable { 1092 + // Retry transient errors with exponential backoff 1093 + use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; 1094 + static CONSECUTIVE_ERRORS: AtomicU32 = AtomicU32::new(0); 1095 + static LAST_ERROR_TIME_SECS: AtomicU64 = AtomicU64::new(0); 807 1096 808 - // Reset error count if last error was more than 5 minutes ago 809 - if last_error_secs > 0 && now - last_error_secs > 300 { 810 - CONSECUTIVE_ERRORS.store(0, Ordering::Relaxed); 811 - } 1097 + let now = std::time::SystemTime::now() 1098 + .duration_since(std::time::UNIX_EPOCH) 1099 + .unwrap() 1100 + .as_secs(); 812 
1101 813 - let error_count = CONSECUTIVE_ERRORS.fetch_add(1, Ordering::Relaxed) + 1; 814 - LAST_ERROR_TIME_SECS.store(now, Ordering::Relaxed); 1102 + let last_error_secs = LAST_ERROR_TIME_SECS.load(Ordering::Relaxed); 815 1103 816 - // Calculate backoff with exponential increase (cap at 5 minutes) 817 - let backoff_secs = std::cmp::min(2u64.pow((error_count - 1).min(8)), 300); 1104 + // Reset error count if last error was more than 5 minutes ago 1105 + if last_error_secs > 0 && now - last_error_secs > 300 { 1106 + CONSECUTIVE_ERRORS.store(0, Ordering::Relaxed); 1107 + } 818 1108 819 - if self.config.verbose || error_count == 1 { 820 - eprintln!( 821 - "[Sync] Retryable error (attempt {}): {}", 822 - error_count, error_msg 823 - ); 824 - eprintln!("[Sync] Retrying in {} seconds...", backoff_secs); 825 - } 1109 + let error_count = CONSECUTIVE_ERRORS.fetch_add(1, Ordering::Relaxed) + 1; 1110 + LAST_ERROR_TIME_SECS.store(now, Ordering::Relaxed); 826 1111 827 - // Too many consecutive errors - give up 828 - if error_count >= 10 { 829 - eprintln!( 830 - "[Sync] Too many consecutive errors ({}) - shutting down", 831 - error_count 832 - ); 1112 + // Calculate backoff with exponential increase (cap at 5 minutes) 1113 + let backoff_secs = std::cmp::min(2u64.pow((error_count - 1).min(8)), 300); 833 1114 834 - if let Some(ref shutdown_tx) = self.config.shutdown_tx { 835 - let _ = shutdown_tx.send(true); 836 - } 837 - return Err(e); 838 - } 1115 + if self.config.verbose || error_count == 1 { 1116 + eprintln!( 1117 + "[Sync] Retryable error (attempt {}): {}", 1118 + error_count, error_msg 1119 + ); 1120 + eprintln!("[Sync] Retrying in {} seconds...", backoff_secs); 1121 + } 839 1122 840 - // Wait with backoff, checking for shutdown 841 - let backoff_duration = Duration::from_secs(backoff_secs); 842 - if let Some(ref shutdown_rx) = self.config.shutdown_rx { 843 - let mut shutdown_rx = shutdown_rx.clone(); 844 - tokio::select! 
{ 845 - _ = sleep(backoff_duration) => {} 846 - _ = shutdown_rx.changed() => { 847 - if *shutdown_rx.borrow() { 848 - return Ok(()); 849 - } 850 - } 851 - } 852 - } else { 853 - sleep(backoff_duration).await; 854 - } 855 - } else { 856 - // Fatal error - shutdown immediately 857 - eprintln!("[Sync] Fatal error - shutting down: {}", error_msg); 1123 + // Too many consecutive errors - give up 1124 + if error_count >= 10 { 1125 + eprintln!( 1126 + "[Sync] Too many consecutive errors ({}) - shutting down", 1127 + error_count 1128 + ); 858 1129 859 - if let Some(ref shutdown_tx) = self.config.shutdown_tx { 860 - let _ = shutdown_tx.send(true); 1130 + if let Some(ref shutdown_tx) = self.config.shutdown_tx { 1131 + let _ = shutdown_tx.send(true); 1132 + } 1133 + return Err(e); 1134 + } 1135 + 1136 + // Wait with backoff, checking for shutdown 1137 + let backoff_duration = Duration::from_secs(backoff_secs); 1138 + if let Some(ref shutdown_rx) = self.config.shutdown_rx { 1139 + let mut shutdown_rx = shutdown_rx.clone(); 1140 + tokio::select! { 1141 + _ = sleep(backoff_duration) => {} 1142 + _ = shutdown_rx.changed() => { 1143 + if *shutdown_rx.borrow() { 1144 + return Ok(()); 861 1145 } 862 - return Err(e); 863 1146 } 864 1147 } 1148 + } else { 1149 + sleep(backoff_duration).await; 865 1150 } 1151 + } else { 1152 + // Fatal error - shutdown immediately 1153 + eprintln!("[Sync] Fatal error - shutting down: {}", error_msg); 1154 + 1155 + if let Some(ref shutdown_tx) = self.config.shutdown_tx { 1156 + let _ = shutdown_tx.send(true); 1157 + } 1158 + return Err(e); 866 1159 } 867 1160 868 1161 Ok(())