···11use super::{
22 LinkReader, LinkStorage, Order, PagedAppendingCollection, PagedOrderedCollection, StorageStats,
33};
44-use crate::storage::{decode_m2m_cursor, encode_m2m_cursor};
54use crate::{ActionableEvent, CountsByCount, Did, RecordId};
66-use anyhow::Result;
55+66+use anyhow::{anyhow, Result};
77+use base64::engine::general_purpose as b64;
88+use base64::Engine as _;
79use links::CollectedLink;
1010+811use std::collections::{HashMap, HashSet};
912use std::sync::{Arc, Mutex};
1013···245248 limit: u64,
246249 after: Option<String>,
247250 filter_dids: &HashSet<Did>,
248248- filter_to_targets: &HashSet<String>,
251251+ filter_targets: &HashSet<String>,
249252 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> {
250250- let empty_res = Ok(PagedOrderedCollection {
251251- items: Vec::new(),
252252- next: None,
253253- });
253253+ // setup variables that we need later
254254+ let path_to_other = RecordPath(path_to_other.to_string());
255255+ let filter_targets: HashSet<Target> =
256256+ HashSet::from_iter(filter_targets.iter().map(|s| Target::new(s)));
257257+258258+ // extract parts from composite cursor
259259+ let (backward_idx, forward_idx) = match after {
260260+ Some(a) => {
261261+ let after_str = String::from_utf8(b64::URL_SAFE.decode(a)?)?;
262262+ let (b, f) = after_str
263263+ .split_once(',')
264264+ .ok_or_else(|| anyhow!("invalid cursor format"))?;
265265+ (
266266+ (!b.is_empty()).then(|| b.parse::<u64>()).transpose()?,
267267+ (!f.is_empty()).then(|| f.parse::<u64>()).transpose()?,
268268+ )
269269+ }
270270+ None => (None, None),
271271+ };
254272255273 let data = self.0.lock().unwrap();
256256-257274 let Some(sources) = data.targets.get(&Target::new(target)) else {
258258- return empty_res;
275275+ return Ok(PagedOrderedCollection::empty());
259276 };
260277 let Some(linkers) = sources.get(&Source::new(collection, path)) else {
261261- return empty_res;
278278+ return Ok(PagedOrderedCollection::empty());
262279 };
263263- let path_to_other = RecordPath::new(path_to_other);
264280265265- // Convert filter_to_targets to Target objects for comparison
266266- let filter_to_target_objs: HashSet<Target> =
267267- HashSet::from_iter(filter_to_targets.iter().map(|s| Target::new(s)));
281281+ let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new();
268282269269- let mut grouped_links: HashMap<Target, Vec<RecordId>> = HashMap::new();
270270- for (did, rkey) in linkers.iter().flatten().cloned() {
271271- // Filter by DID if filter is provided
272272- if !filter_dids.is_empty() && !filter_dids.contains(&did) {
273273- continue;
274274- }
275275- if let Some(fwd_target) = data
276276- .links
277277- .get(&did)
278278- .unwrap_or(&HashMap::new())
279279- .get(&RepoId {
283283+ // iterate backwards (who linked to the target?)
284284+ for (linker_idx, (did, rkey)) in linkers
285285+ .iter()
286286+ .enumerate()
287287+ .filter_map(|(i, opt)| opt.as_ref().map(|v| (i, v)))
288288+ .skip_while(|(linker_idx, _)| {
289289+ backward_idx.is_some_and(|idx| match forward_idx {
290290+ Some(_) => *linker_idx < idx as usize, // inclusive: depend on link idx for skipping
291291+ None => *linker_idx <= idx as usize, // exclusive: skip right here
292292+ })
293293+ })
294294+ .filter(|(_, (did, _))| filter_dids.is_empty() || filter_dids.contains(&did))
295295+ {
296296+ let Some(links) = data.links.get(&did).and_then(|m| {
297297+ m.get(&RepoId {
280298 collection: collection.to_string(),
281299 rkey: rkey.clone(),
282300 })
283283- .unwrap_or(&Vec::new())
301301+ }) else {
302302+ continue;
303303+ };
304304+305305+ // iterate forward (which of these links point to the __other__ target?)
306306+ for (link_idx, (_, fwd_target)) in links
284307 .iter()
285285- .find_map(|(path, target)| {
286286- if *path == path_to_other
287287- && (filter_to_target_objs.is_empty()
288288- || filter_to_target_objs.contains(target))
289289- {
290290- Some(target)
291291- } else {
292292- None
293293- }
308308+ .enumerate()
309309+ .filter(|(_, (p, t))| {
310310+ *p == path_to_other && (filter_targets.is_empty() || filter_targets.contains(t))
294311 })
312312+ .skip_while(|(link_idx, _)| {
313313+ backward_idx.is_some_and(|bl_idx| {
314314+ linker_idx == bl_idx as usize
315315+ && forward_idx.is_some_and(|fwd_idx| *link_idx <= fwd_idx as usize)
316316+ })
317317+ })
318318+ .take(limit as usize + 1 - items.len())
295319 {
296296- let record_ids = grouped_links.entry(fwd_target.clone()).or_default();
297297- record_ids.push(RecordId {
298298- did,
299299- collection: collection.to_string(),
300300- rkey: rkey.0,
301301- });
320320+ items.push((
321321+ linker_idx,
322322+ link_idx,
323323+ RecordId {
324324+ did: did.clone(),
325325+ collection: collection.to_string(),
326326+ rkey: rkey.0.clone(),
327327+ },
328328+ fwd_target.0.clone(),
329329+ ));
302330 }
303303- }
304331305305- let mut items = grouped_links
306306- .into_iter()
307307- .flat_map(|(target, records)| {
308308- records
309309- .iter()
310310- .map(move |r| (r.clone(), target.0.clone()))
311311- .collect::<Vec<_>>()
312312- })
313313- .collect::<Vec<_>>();
314314-315315- // first try to sort by subject, then by did, collection and finally rkey
316316- items.sort_by(|a, b| {
317317- if a.1 == b.1 {
318318- a.0.cmp(&b.0)
319319- } else {
320320- a.1.cmp(&b.1)
332332+ // page full - eject
333333+ if items.len() > limit as usize {
334334+ break;
321335 }
322322- });
323323-324324- // Parse cursor if provided (malformed cursor silently ignored)
325325- let after_cursor = after.and_then(|a| decode_m2m_cursor(&a).ok());
326326-327327- // Apply cursor: skip everything up to and including the cursor position
328328- items = items
329329- .into_iter()
330330- .skip_while(|item| {
331331- let Some((after_did, after_rkey, after_subject)) = &after_cursor else {
332332- return false;
333333- };
336336+ }
334337335335- if &item.1 == after_subject {
336336- // Same subject — compare by RecordId to find our position
337337- let cursor_id = RecordId {
338338- did: Did(after_did.clone()),
339339- collection: collection.to_string(),
340340- rkey: after_rkey.clone(),
341341- };
342342- item.0.cmp(&cursor_id).is_le()
343343- } else {
344344- // Different subject — compare subjects directly
345345- item.1.cmp(after_subject).is_le()
346346- }
347347- })
348348- .take(limit as usize + 1)
349349- .collect();
350350-351351- // Build the new cursor from last item, if needed
352338 let next = if items.len() as u64 > limit {
353339 items.truncate(limit as usize);
354340 items
355341 .last()
356356- .map(|item| encode_m2m_cursor(&item.0.did.0, &item.0.rkey, &item.1))
342342+ .map(|(l, f, _, _)| b64::URL_SAFE.encode(format!("{},{}", *l as u64, *f as u64)))
357343 } else {
358344 None
359345 };
360346347347+ let items = items.into_iter().map(|(_, _, rid, t)| (rid, t)).collect();
361348 Ok(PagedOrderedCollection { items, next })
362349 }
363350
-34
constellation/src/storage/mod.rs
···66pub mod mem_store;
77pub use mem_store::MemStorage;
8899-use anyhow::anyhow;
1010-1111-use base64::engine::general_purpose as b64;
1212-use base64::Engine as _;
1313-149#[cfg(feature = "rocks")]
1510pub mod rocks_store;
1611#[cfg(feature = "rocks")]
···159154160155 /// assume all stats are estimates, since exact counts are very challenging for LSMs
161156 fn get_stats(&self) -> Result<StorageStats>;
162162-}
163163-164164-// Shared helpers
165165-166166-/// Decode a base64 cursor into its component parts (did, rkey, subject).
167167-/// The subject is placed last because it may contain '|' characters.
168168-pub(crate) fn decode_m2m_cursor(cursor: &str) -> Result<(String, String, String)> {
169169- let decoded = String::from_utf8(b64::URL_SAFE.decode(cursor)?)?;
170170- let mut parts = decoded.splitn(3, '|').map(String::from);
171171-172172- // Using .next() to pull each part out of the iterator in order.
173173- // This avoids collecting into a Vec just to index and clone back out.
174174- let did = parts
175175- .next()
176176- .ok_or_else(|| anyhow!("missing did in cursor"))?;
177177- let rkey = parts
178178- .next()
179179- .ok_or_else(|| anyhow!("missing rkey in cursor"))?;
180180- let subject = parts
181181- .next()
182182- .ok_or_else(|| anyhow!("missing subject in cursor"))?;
183183-184184- Ok((did, rkey, subject))
185185-}
186186-187187-/// Encode cursor components into a base64 string.
188188-pub(crate) fn encode_m2m_cursor(did: &str, rkey: &str, subject: &str) -> String {
189189- let raw = format!("{did}|{rkey}|{subject}");
190190- b64::URL_SAFE.encode(&raw)
191157}
192158193159#[cfg(test)]
+129-112
constellation/src/storage/rocks_store.rs
···22 ActionableEvent, LinkReader, LinkStorage, Order, PagedAppendingCollection,
33 PagedOrderedCollection, StorageStats,
44};
55-use crate::storage::{decode_m2m_cursor, encode_m2m_cursor};
65use crate::{CountsByCount, Did, RecordId};
77-use anyhow::{bail, Result};
66+77+use anyhow::{anyhow, bail, Result};
88+use base64::engine::general_purpose as b64;
99+use base64::Engine as _;
810use bincode::Options as BincodeOptions;
911use links::CollectedLink;
1012use metrics::{counter, describe_counter, describe_histogram, histogram, Unit};
···1517 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch,
1618};
1719use serde::{Deserialize, Serialize};
2020+use tokio_util::sync::CancellationToken;
2121+1822use std::collections::{BTreeMap, HashMap, HashSet};
1923use std::io::Read;
2024use std::marker::PhantomData;
···2529};
2630use std::thread;
2731use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
2828-use tokio_util::sync::CancellationToken;
29323033static DID_IDS_CF: &str = "did_ids";
3134static TARGET_IDS_CF: &str = "target_ids";
···11361139 filter_dids: &HashSet<Did>,
11371140 filter_to_targets: &HashSet<String>,
11381141 ) -> Result<PagedOrderedCollection<(RecordId, String), String>> {
11421142+ // helper to resolve dids
11431143+ let resolve_active_did = |did_id: &DidId| -> Option<Did> {
11441144+ let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0).ok()? else {
11451145+ eprintln!("failed to look up did from did_id {did_id:?}");
11461146+ return None;
11471147+ };
11481148+ let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did).ok()?
11491149+ else {
11501150+ eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
11511151+ return None;
11521152+ };
11531153+ active.then_some(did)
11541154+ };
11551155+11561156+ // setup variables that we need later
11391157 let collection = Collection(collection.to_string());
11401158 let path = RPath(path.to_string());
1141115911421142- let target_key = TargetKey(Target(target.to_string()), collection.clone(), path);
11601160+ // extract parts from composite cursor
11611161+ let (backward_idx, forward_idx) = match after {
11621162+ Some(a) => {
11631163+ eprintln!("a: {:#?}", a);
1143116411441144- // Parse cursor if provided (malformed cursor silently ignored)
11451145- let after_cursor = after.and_then(|a| decode_m2m_cursor(&a).ok());
11651165+ let after_str = String::from_utf8(b64::URL_SAFE.decode(a)?)?;
1146116611471147- let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
11481148- eprintln!("Target not found for {target_key:?}");
11491149- return Ok(PagedOrderedCollection::empty());
11671167+ eprintln!("after_str: {:#?}", after_str);
11681168+ let (b, f) = after_str
11691169+ .split_once(',')
11701170+ .ok_or_else(|| anyhow!("invalid cursor format"))?;
11711171+ (
11721172+ (!b.is_empty()).then(|| b.parse::<u64>()).transpose()?,
11731173+ (!f.is_empty()).then(|| f.parse::<u64>()).transpose()?,
11741174+ )
11751175+ }
11761176+ None => (None, None),
11501177 };
1151117811791179+ eprintln!("backward_idx: {:#?}", backward_idx);
11801180+ eprintln!("forward_idx: {:#?}", forward_idx);
11811181+11821182+ // (__active__) did ids and filter targets
11521183 let filter_did_ids: HashMap<DidId, bool> = filter_dids
11531184 .iter()
11541185 .filter_map(|did| self.did_id_table.get_id_val(&self.db, did).transpose())
···11561187 .into_iter()
11571188 .map(|DidIdValue(id, active)| (id, active))
11581189 .collect();
11591159-11601190 let mut filter_to_target_ids: HashSet<TargetId> = HashSet::new();
11611191 for t in filter_to_targets {
11621192 for (_, target_id) in self.iter_targets_for_target(&Target(t.to_string())) {
···11641194 }
11651195 }
1166119611971197+ let target_key = TargetKey(Target(target.to_string()), collection.clone(), path);
11981198+ let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else {
11991199+ eprintln!("Target not found for {target_key:?}");
12001200+ return Ok(PagedOrderedCollection::empty());
12011201+ };
12021202+11671203 let linkers = self.get_target_linkers(&target_id)?;
1168120411691169- // we want to provide many to many which effectively means that we want to show a specific
11701170- // list of reords that is linked to by a specific number of linkers
11711171- let mut grouped_links: BTreeMap<TargetId, Vec<RecordId>> = BTreeMap::new();
11721172- for (did_id, rkey) in linkers.0 {
11731173- if did_id.is_empty() {
12051205+ eprintln!("linkers: {:#?}", linkers);
12061206+12071207+ let mut items: Vec<(usize, TargetId, RecordId)> = Vec::new();
12081208+12091209+ // iterate backwards (who linked to the target?)
12101210+ for (linker_idx, (did_id, rkey)) in
12111211+ linkers.0.iter().enumerate().skip_while(|(linker_idx, _)| {
12121212+ backward_idx.is_some_and(|idx| match forward_idx {
12131213+ Some(_) => *linker_idx < idx as usize, // inclusive: depend on link idx for skipping
12141214+ None => *linker_idx <= idx as usize, // exclusive: skip right here
12151215+ })
12161216+ })
12171217+ {
12181218+ // filter target did
12191219+ if did_id.is_empty()
12201220+ || (!filter_did_ids.is_empty() && filter_did_ids.get(&did_id).is_none())
12211221+ {
11741222 continue;
11751223 }
1176122411771177- if !filter_did_ids.is_empty() && filter_did_ids.get(&did_id) != Some(&true) {
11781178- continue;
11791179- }
12251225+ eprintln!("did_did: {:#?}", did_id);
1180122611811181- // Make sure the current did is active
11821182- let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? else {
11831183- eprintln!("failed to look up did from did_id {did_id:?}");
12271227+ let Some(targets) = self.get_record_link_targets(&RecordLinkKey(
12281228+ *did_id,
12291229+ collection.clone(),
12301230+ rkey.clone(),
12311231+ ))?
12321232+ else {
11841233 continue;
11851234 };
11861186- let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? else {
11871187- eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?");
12351235+ let Some(fwd_target_id) =
12361236+ targets
12371237+ .0
12381238+ .into_iter()
12391239+ .find_map(|RecordLinkTarget(rpath, target_id)| {
12401240+ eprintln!("rpath.0: {} vs. path_to_other: {path_to_other}", rpath.0);
12411241+ if rpath.0 == path_to_other
12421242+ && (filter_to_target_ids.is_empty()
12431243+ || filter_to_target_ids.contains(&target_id))
12441244+ {
12451245+ Some(target_id)
12461246+ } else {
12471247+ None
12481248+ }
12491249+ })
12501250+ else {
11881251 continue;
11891252 };
11901190- if !active {
12531253+12541254+ if backward_idx.is_some_and(|bl_idx| {
12551255+ linker_idx == bl_idx as usize
12561256+ && forward_idx.is_some_and(|fwd_idx| fwd_target_id.0 <= fwd_idx)
12571257+ }) {
11911258 continue;
11921259 }
1193126011941194- let record_link_key = RecordLinkKey(did_id, collection.clone(), rkey.clone());
11951195- let Some(targets) = self.get_record_link_targets(&record_link_key)? else {
11961196- continue;
11971197- };
12611261+ let page_is_full = items.len() as u64 >= limit;
1198126211991199- let Some(fwd_target) = targets
12001200- .0
12011201- .into_iter()
12021202- .filter_map(|RecordLinkTarget(rpath, target_id)| {
12031203- if rpath.0 == path_to_other
12041204- && (filter_to_target_ids.is_empty()
12051205- || filter_to_target_ids.contains(&target_id))
12061206- {
12071207- Some(target_id)
12081208- } else {
12091209- None
12101210- }
12111211- })
12121212- .take(1)
12131213- .next()
12141214- else {
12151215- eprintln!("no forward match found.");
12161216- continue;
12171217- };
12631263+ eprintln!(
12641264+ "page_is_full: {page_is_full} for items.len(): {}",
12651265+ items.len()
12661266+ );
1218126712191219- let page_is_full = grouped_links.len() as u64 >= limit;
12201268 if page_is_full {
12211221- let current_max = grouped_links.keys().next_back().unwrap();
12221222- if fwd_target > *current_max {
12691269+ let current_max = items.iter().next_back().unwrap().1;
12701270+ if fwd_target_id > current_max {
12231271 continue;
12241272 }
12251273 }
12741274+12751275+ // extract forward target did (target that links to the __other__ target)
12761276+ let Some(did) = resolve_active_did(did_id) else {
12771277+ continue;
12781278+ };
1226127912271280 // link to be added
12281281 let record_id = RecordId {
12291282 did,
12301283 collection: collection.0.clone(),
12311231- rkey: rkey.0,
12841284+ rkey: rkey.0.clone(),
12321285 };
12861286+ items.push((linker_idx, fwd_target_id, record_id));
12871287+ }
1233128812341234- // pagination:
12351235- if after_cursor.is_some() {
12361236- // extract composite-cursor parts
12371237- let Some((after_did, after_rkey, after_subject)) = &after_cursor else {
12381238- continue;
12391239- };
12401240-12411241- let Some(fwd_target_key) = self
12891289+ let mut backward_idx = None;
12901290+ let mut forward_idx = None;
12911291+ let mut items: Vec<_> = items
12921292+ .iter()
12931293+ .filter_map(|(b_idx, fwd_target_id, record)| {
12941294+ let Some(target_key) = self
12421295 .target_id_table
12431243- .get_val_from_id(&self.db, fwd_target.0)?
12961296+ .get_val_from_id(&self.db, fwd_target_id.0)
12971297+ .ok()?
12441298 else {
12451245- eprintln!("failed to look up target from target_id {fwd_target:?}");
12461246- continue;
12991299+ eprintln!("failed to look up target from target_id {fwd_target_id:?}");
13001300+ return None;
12471301 };
1248130212491249- // first try and compare by subject only
12501250- if &fwd_target_key.0 .0 != after_subject
12511251- && fwd_target_key.0 .0.cmp(after_subject).is_le()
12521252- {
12531253- continue;
12541254- }
13031303+ backward_idx = Some(b_idx);
13041304+ forward_idx = Some(fwd_target_id.0 - 1);
1255130512561256- // then, if needed, we compare by record id
12571257- let cursor_id = RecordId {
12581258- did: Did(after_did.clone()),
12591259- collection: collection.0.clone(),
12601260- rkey: after_rkey.clone(),
12611261- };
12621262- if record_id.cmp(&cursor_id).is_le() {
12631263- continue;
12641264- }
12651265- }
12661266-12671267- // pagination, continued
12681268- let mut should_evict = false;
12691269- let entry = grouped_links.entry(fwd_target.clone()).or_insert_with(|| {
12701270- should_evict = page_is_full;
12711271- Vec::default()
12721272- });
12731273- entry.push(record_id);
12741274-12751275- if should_evict {
12761276- grouped_links.pop_last();
12771277- }
12781278- }
12791279-12801280- let mut items: Vec<(RecordId, String)> = Vec::with_capacity(grouped_links.len());
12811281- for (fwd_target_id, records) in &grouped_links {
12821282- let Some(target_key) = self
12831283- .target_id_table
12841284- .get_val_from_id(&self.db, fwd_target_id.0)?
12851285- else {
12861286- eprintln!("failed to look up target from target_id {fwd_target_id:?}");
12871287- continue;
12881288- };
12891289-12901290- let target_string = target_key.0 .0;
12911291-12921292- records
12931293- .iter()
12941294- .for_each(|r| items.push((r.clone(), target_string.clone())));
12951295- }
13061306+ Some((record.clone(), target_key.0 .0))
13071307+ })
13081308+ .collect();
12971310 // Build new cursor from the last item, if needed
12981311 let next = if items.len() as u64 > limit {
12991312 items.truncate(limit as usize);
13001300- items
13011301- .last()
13021302- .map(|item| encode_m2m_cursor(&item.0.did.0, &item.0.rkey, &item.1))
13131313+ items.last().and_then(|_| {
13141314+ Some(b64::URL_SAFE.encode(format!(
13151315+ "{},{}",
13161316+ backward_idx?.to_string(),
13171317+ forward_idx?.to_string()
13181318+ )))
13191319+ })
13031320 } else {
13041321 None
13051322 };
···16451662}
1646166316471664// target ids
16481648-#[derive(Debug, Clone, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
16651665+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq, Hash)]
16491666struct TargetId(u64); // key
1650166716511668#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]