High-performance implementation of plcbundle written in Rust

feat(sync): add graceful shutdown support to sync operations

Add shutdown_rx parameter to sync_next_bundle to allow cancellation of ongoing sync operations
Modify fetch_operations to support cancellation during retries
Improve rate limiter to handle semaphore closure gracefully

+84 -13
+14 -2
src/manager.rs
··· 1747 1747 pub async fn sync_next_bundle( 1748 1748 &self, 1749 1749 client: &crate::plc_client::PLCClient, 1750 + shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 1750 1751 ) -> Result<SyncResult> { 1751 1752 use crate::sync::{get_boundary_cids, strip_boundary_duplicates}; 1752 1753 use std::time::Instant; ··· 1868 1869 } 1869 1870 1870 1871 let fetch_op_start = Instant::now(); 1871 - let plc_ops = client.fetch_operations(&after_time, request_count).await?; 1872 + if let Some(ref rx) = shutdown_rx { 1873 + if *rx.borrow() { 1874 + anyhow::bail!("Shutdown requested"); 1875 + } 1876 + } 1877 + let plc_ops = if let Some(rx) = shutdown_rx.clone() { 1878 + client 1879 + .fetch_operations_cancelable(&after_time, request_count, Some(rx)) 1880 + .await? 1881 + } else { 1882 + client.fetch_operations(&after_time, request_count).await? 1883 + }; 1872 1884 1873 1885 let fetched_count = plc_ops.len(); 1874 1886 ··· 2146 2158 let mut synced = 0; 2147 2159 2148 2160 loop { 2149 - match self.sync_next_bundle(client).await { 2161 + match self.sync_next_bundle(client, None).await { 2150 2162 Ok(SyncResult::BundleCreated { .. }) => { 2151 2163 synced += 1; 2152 2164
+61 -9
src/plc_client.rs
··· 69 69 70 70 /// Fetch operations from PLC directory export endpoint 71 71 pub async fn fetch_operations(&self, after: &str, count: usize) -> Result<Vec<PLCOperation>> { 72 - self.fetch_operations_with_retry(after, count, 5).await 72 + self.fetch_operations_with_retry_cancelable(after, count, 5, None).await 73 + } 74 + 75 + pub async fn fetch_operations_cancelable( 76 + &self, 77 + after: &str, 78 + count: usize, 79 + shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 80 + ) -> Result<Vec<PLCOperation>> { 81 + self.fetch_operations_with_retry_cancelable(after, count, 5, shutdown_rx).await 73 82 } 74 83 75 - async fn fetch_operations_with_retry( 84 + async fn fetch_operations_with_retry_cancelable( 76 85 &self, 77 86 after: &str, 78 87 count: usize, 79 88 max_retries: usize, 89 + shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 80 90 ) -> Result<Vec<PLCOperation>> { 81 91 let mut backoff = Duration::from_secs(1); 82 92 let mut last_err = None; 83 93 84 94 for attempt in 1..=max_retries { 95 + if let Some(ref rx) = shutdown_rx { 96 + if *rx.borrow() { 97 + anyhow::bail!("Shutdown requested"); 98 + } 99 + } 85 100 let export_url = format!( 86 101 "{}/export?after={}&count={}", 87 102 self.base_url, after, count ··· 98 113 ); 99 114 100 115 let wait_start = Instant::now(); 101 - self.rate_limiter.wait().await; 116 + if let Some(mut rx) = shutdown_rx.clone() { 117 + tokio::select! { 118 + _ = self.rate_limiter.wait() => {} 119 + _ = rx.changed() => { 120 + if *rx.borrow() { anyhow::bail!("Shutdown requested"); } 121 + } 122 + } 123 + } else { 124 + self.rate_limiter.wait().await; 125 + } 102 126 let wait_elapsed = wait_start.elapsed(); 103 127 if wait_elapsed.as_nanos() > 0 { 104 128 log::debug!("[PLCClient] Rate limiter wait: {:?}", wait_elapsed); ··· 131 155 attempt, 132 156 max_retries 133 157 ); 134 - tokio::time::sleep(retry_after).await; 158 + if let Some(mut rx) = shutdown_rx.clone() { 159 + tokio::select! { 160 + _ = tokio::time::sleep(retry_after) => {} 161 + _ = rx.changed() => { 162 + if *rx.borrow() { anyhow::bail!("Shutdown requested"); } 163 + } 164 + } 165 + } else { 166 + tokio::time::sleep(retry_after).await; 167 + } 135 168 continue; 136 169 } 137 170 ··· 144 177 last_err.as_ref().unwrap(), 145 178 backoff 146 179 ); 147 - tokio::time::sleep(backoff).await; 180 + if let Some(mut rx) = shutdown_rx.clone() { 181 + tokio::select! { 182 + _ = tokio::time::sleep(backoff) => {} 183 + _ = rx.changed() => { 184 + if *rx.borrow() { anyhow::bail!("Shutdown requested"); } 185 + } 186 + } 187 + } else { 188 + tokio::time::sleep(backoff).await; 189 + } 148 190 backoff *= 2; // Exponential backoff 149 191 } 150 192 } ··· 318 360 // Spawn background task to refill permits at steady rate 319 361 // CRITICAL: Add first permit immediately, then refill at steady rate 320 362 let refill_rate_clone = refill_rate; 363 + let capacity = requests_per_period; 321 364 tokio::spawn(async move { 322 - if sem_clone.available_permits() < 1 { 365 + // Add first permit immediately so first request can proceed 366 + if sem_clone.available_permits() < capacity { 323 367 sem_clone.add_permits(1); 324 368 } 325 369 370 + // Then refill at steady rate 326 371 loop { 327 372 tokio::time::sleep(refill_rate_clone).await; 328 - if sem_clone.available_permits() < 1 { 373 + // Add one permit if under capacity (burst allowed up to capacity) 374 + if sem_clone.available_permits() < capacity { 329 375 sem_clone.add_permits(1); 330 376 } 331 377 } ··· 335 381 } 336 382 337 383 async fn wait(&self) { 338 - let permit = self.semaphore.acquire().await.expect("semaphore closed"); 339 - permit.forget(); 384 + match self.semaphore.acquire().await { 385 + Ok(permit) => permit.forget(), 386 + Err(_) => { 387 + log::warn!( 388 + "[PLCClient] Rate limiter disabled (semaphore closed), proceeding without delay" 389 + ); 390 + } 391 + } 340 392 } 341 393 342 394 fn available_permits(&self) -> usize {
+9 -2
src/sync.rs
··· 490 490 .and_then(|v| v.as_u64()) 491 491 .unwrap_or(0); 492 492 493 - match self.manager.sync_next_bundle(&self.client).await { 493 + match self 494 + .manager 495 + .sync_next_bundle(&self.client, None) 496 + .await 497 + { 494 498 Ok(crate::manager::SyncResult::BundleCreated { 495 499 bundle_num, 496 500 mempool_count: _, ··· 613 617 .and_then(|v| v.as_u64()) 614 618 .unwrap_or(0); 615 619 616 - let sync_result = self.manager.sync_next_bundle(&self.client).await; 620 + let sync_result = self 621 + .manager 622 + .sync_next_bundle(&self.client, self.config.shutdown_rx.clone()) 623 + .await; 617 624 618 625 match sync_result { 619 626 Ok(crate::manager::SyncResult::BundleCreated {