feat(sync): add safety lag to prevent race conditions in sync operations
Introduce a configurable safety lag duration to prevent race conditions in which operations committed just before a fetch, but not yet visible or indexed, might be missed. See https://github.com/did-method-plc/did-method-plc/issues/127
···7676/// Minimum time between bundle creation attempts (60 seconds)
7777pub const MIN_BUNDLE_CREATION_INTERVAL_SECS: i64 = 60;
78787979+/// Default safety lag for sync operations (in milliseconds)
8080+/// This buffer prevents race conditions where the indexer might miss operations
8181+/// that were committed just before the fetch but not yet visible or indexed.
8282+pub const DEFAULT_SAFETY_LAG_MS: u64 = 1000;
8383+7984// ============================================================================
8085// File and Directory Constants
8186// ============================================================================
+81-13
src/manager.rs
···17571757 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>,
17581758 update_did_index: bool,
17591759 fetch_log: bool,
17601760+ safety_lag: Option<std::time::Duration>,
17601761 ) -> Result<SyncResult> {
17611762 use crate::sync::{get_boundary_cids, strip_boundary_duplicates};
17621763 use std::time::Instant;
···18581859 let mut total_wait = std::time::Duration::from_secs(0);
18591860 let mut total_http = std::time::Duration::from_secs(0);
1860186118621862+ // The cutoff time is calculated per request, based on server time when available,
18631863+ // rather than once up front (the static cutoff calculation was removed).
18641864+18611865 while fetch_num < MAX_ATTEMPTS {
18621866 let stats = self.get_mempool_stats()?;
18631867···18931897 {
18941898 anyhow::bail!("Shutdown requested");
18951899 }
18961896- let (plc_ops, wait_dur, http_dur, raw_capture_opt) = if fetch_log {
19001900+ let (plc_ops, wait_dur, http_dur, raw_capture_opt, server_time) = if fetch_log {
18971901 if let Some(rx) = shutdown_rx.clone() {
18981898- let (ops, w, h, capture_opt) = client
19021902+ let (ops, w, h, capture_opt, st) = client
18991903 .fetch_operations(&after_time, request_count, Some(rx), true)
19001904 .await?;
19011901- (ops, w, h, capture_opt)
19051905+ (ops, w, h, capture_opt, st)
19021906 } else {
19031903- let (ops, w, h, capture_opt) = client
19071907+ let (ops, w, h, capture_opt, st) = client
19041908 .fetch_operations(&after_time, request_count, None, true)
19051909 .await?;
19061906- (ops, w, h, capture_opt)
19101910+ (ops, w, h, capture_opt, st)
19071911 }
19081912 } else {
19091913 if let Some(rx) = shutdown_rx.clone() {
19101910- let (ops, w, h, _) = client
19141914+ let (ops, w, h, _, st) = client
19111915 .fetch_operations(&after_time, request_count, Some(rx), false)
19121916 .await?;
19131913- (ops, w, h, None)
19171917+ (ops, w, h, None, st)
19141918 } else {
19151915- let (ops, w, h, _) =
19191919+ let (ops, w, h, _, st) =
19161920 client.fetch_operations(&after_time, request_count, None, false).await?;
19171917- (ops, w, h, None)
19211921+ (ops, w, h, None, st)
19181922 }
19191923 };
19201924 total_wait += wait_dur;
···1937194119381942 total_fetched += fetched_count;
1939194319441944+ // Calculate cutoff time based on server time if available, otherwise local time
19451945+ let cutoff_time = if let Some(lag) = safety_lag {
19461946+ let base_time = server_time.unwrap_or_else(chrono::Utc::now);
19471947+ let cutoff = base_time - chrono::Duration::from_std(lag).unwrap_or(chrono::Duration::seconds(0));
19481948+19491949+ // Only log the cutoff in verbose mode (to avoid spamming logs)
19501950+ if *self.verbose.lock().unwrap() {
19511951+ let source = if server_time.is_some() { "server" } else { "local" };
19521952+ log::debug!(
19531953+ "Safety lag cutoff: {} (source: {}, lag: {:?})",
19541954+ cutoff.to_rfc3339(),
19551955+ source,
19561956+ lag
19571957+ );
19581958+ }
19591959+ Some(cutoff)
19601960+ } else {
19611961+ None
19621962+ };
19631963+19401964 // Convert to operations
19411941- let ops_pre: Vec<Operation> = plc_ops.into_iter().map(Into::into).collect();
19651965+ let ops_pre_raw: Vec<Operation> = plc_ops.into_iter().map(Into::into).collect();
19661966+19671967+ // Apply safety lag filtering
19681968+ let (ops_pre, filtered_count) = if let Some(cutoff) = cutoff_time {
19691969+ let mut kept = Vec::with_capacity(ops_pre_raw.len());
19701970+ let mut filtered = 0;
19711971+ for op in ops_pre_raw {
19721972+ if let Ok(op_time) = chrono::DateTime::parse_from_rfc3339(&op.created_at) {
19731973+ if op_time <= cutoff {
19741974+ kept.push(op);
19751975+ } else {
19761976+ filtered += 1;
19771977+ }
19781978+ } else {
19791979+ // If the timestamp cannot be parsed, keep the operation and log a warning.
19801980+ // Keeping it favors data availability over consistency: the safety lag cannot
19811981+ // be enforced for this operation, which is risky given that the lag exists to
19821982+ // prevent race conditions. An unparseable timestamp is the bigger issue, though.
19831983+ log::warn!("Failed to parse timestamp for op {}, keeping it", op.did);
19841984+ kept.push(op);
19851985+ }
19861986+ }
19871987+ (kept, filtered)
19881988+ } else {
19891989+ (ops_pre_raw, 0)
19901990+ };
19911991+19921992+ if filtered_count > 0 {
19931993+ if *self.verbose.lock().unwrap() {
19941994+ log::info!(
19951995+ " Safety lag: filtered {} operations newer than cutoff",
19961996+ filtered_count
19971997+ );
19981998+ }
19991999+ // If we filtered any operations, we must consider ourselves "caught up"
20002000+ // because we can't proceed past the cutoff time safely.
20012001+ // We also stop fetching in this cycle.
20022002+ caught_up = true;
20032003+ }
20042004+19422005 let mut all_cids_pre: Vec<String> = Vec::new();
19432006 if fetch_log {
19442007 all_cids_pre = ops_pre
···20632126 }
2064212720652128 // Stop if we got an incomplete batch or made no progress
20662066- if got_incomplete_batch || added == 0 {
21292129+ // Also stop if we filtered operations due to safety lag (caught_up is set above)
21302130+ if got_incomplete_batch || added == 0 || (filtered_count > 0 && caught_up) {
20672131 caught_up = true;
20682132 if *self.verbose.lock().unwrap() {
20692069- log::debug!("Caught up to latest PLC data");
21332133+ if filtered_count > 0 {
21342134+ log::debug!("Caught up to safety lag cutoff");
21352135+ } else {
21362136+ log::debug!("Caught up to latest PLC data");
21372137+ }
20702138 }
20712139 break;
20722140 }
···22682336 let mut synced = 0;
2269233722702338 loop {
22712271- match self.sync_next_bundle(client, None, true, false).await {
23392339+ match self.sync_next_bundle(client, None, true, false, None).await {
22722340 Ok(SyncResult::BundleCreated { .. }) => {
22732341 synced += 1;
22742342
+14-3
src/plc_client.rs
···9090 Duration,
9191 Duration,
9292 Option<RawExportResponse>,
9393+ Option<chrono::DateTime<chrono::Utc>>,
9394 )> {
9495 self.fetch_operations_unified(after, count, shutdown_rx, capture_raw)
9596 .await
···110111 Duration,
111112 Duration,
112113 Option<RawExportResponse>,
114114+ Option<chrono::DateTime<chrono::Utc>>,
113115 )> {
114116 let mut backoff = Duration::from_secs(1);
115117 let mut last_err = None;
···157159 };
158160159161 match result {
160160- Ok((operations, http_duration, capture)) => {
162162+ Ok((operations, http_duration, capture, server_time)) => {
161163 total_http += http_duration;
162162- return Ok((operations, total_wait, total_http, capture));
164164+ return Ok((operations, total_wait, total_http, capture, server_time));
163165 }
164166 Err(e) => {
165167 last_err = Some(e);
···229231 Vec<PLCOperation>,
230232 Duration,
231233 Option<RawExportResponse>,
234234+ Option<chrono::DateTime<chrono::Utc>>,
232235 )> {
233236 let url = format!("{}/export", self.base_url);
234237 let request_start_wall = chrono::Utc::now();
···263266 None
264267 };
265268269269+ // Extract Date header for server time
270270+ let server_time = response
271271+ .headers()
272272+ .get("date")
273273+ .and_then(|v| v.to_str().ok())
274274+ .and_then(|s| httpdate::parse_http_date(s).ok())
275275+ .map(|t| chrono::DateTime::<chrono::Utc>::from(t));
276276+266277 let body = response.text().await?;
267278 let request_duration = request_start.elapsed();
268279 let mut operations = Vec::new();
···295306 None
296307 };
297308298298- Ok((operations, request_duration, capture))
309309+ Ok((operations, request_duration, capture, server_time))
299310 }
300311301312 /// Fetch DID document raw JSON from PLC directory