tangled
alpha
login
or
join now
seoul.systems
/
microcosm-rs
forked from
microcosm.blue/microcosm-rs
0
fork
atom
Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork
atom
overview
issues
pulls
pipelines
Fix rocks-store to match mem-store composite cursor
seoul.systems
1 month ago
6d22f695
46f5fae4
+162
-106
2 changed files
expand all
collapse all
unified
split
constellation
src
storage
mod.rs
rocks_store.rs
+105
-23
constellation/src/storage/mod.rs
···
1768
1768
});
1769
1769
1770
1770
test_each_storage!(get_m2m_single, |storage| {
1771
1771
+
// One record linking to a.com (backward), with two forward links at
1772
1772
+
// the same path_to_other (.def.uri) pointing to b.com and c.com.
1773
1773
+
// Both forward targets must appear in the output.
1771
1774
storage.push(
1772
1775
&ActionableEvent::CreateLinks {
1773
1776
record_id: RecordId {
···
1785
1788
path: ".def.uri".into(),
1786
1789
},
1787
1790
CollectedLink {
1788
1788
-
target: Link::Uri("b.com".into()),
1789
1789
-
path: ".ghi.uri".into(),
1791
1791
+
target: Link::Uri("c.com".into()),
1792
1792
+
path: ".def.uri".into(),
1790
1793
},
1791
1794
],
1792
1795
},
1793
1796
0,
1794
1797
)?;
1798
1798
+
let result = storage.get_many_to_many(
1799
1799
+
"a.com",
1800
1800
+
"app.t.c",
1801
1801
+
".abc.uri",
1802
1802
+
".def.uri",
1803
1803
+
10,
1804
1804
+
None,
1805
1805
+
&HashSet::new(),
1806
1806
+
&HashSet::new(),
1807
1807
+
)?;
1795
1808
assert_eq!(
1796
1796
-
storage.get_many_to_many(
1797
1797
-
"a.com",
1798
1798
-
"app.t.c",
1799
1799
-
".abc.uri",
1800
1800
-
".def.uri",
1801
1801
-
10,
1802
1802
-
None,
1803
1803
-
&HashSet::new(),
1804
1804
-
&HashSet::new(),
1805
1805
-
)?,
1806
1806
-
PagedOrderedCollection {
1807
1807
-
items: vec![(
1808
1808
-
RecordId {
1809
1809
-
did: "did:plc:asdf".into(),
1810
1810
-
collection: "app.t.c".into(),
1811
1811
-
rkey: "asdf".into(),
1812
1812
-
},
1813
1813
-
"b.com".to_string(),
1814
1814
-
)],
1815
1815
-
next: None,
1816
1816
-
}
1809
1809
+
result.items.len(),
1810
1810
+
2,
1811
1811
+
"both forward links at path_to_other should be emitted"
1817
1812
);
1813
1813
+
let mut targets: Vec<_> = result.items.iter().map(|(_, t)| t.as_str()).collect();
1814
1814
+
targets.sort();
1815
1815
+
assert_eq!(targets, vec!["b.com", "c.com"]);
1816
1816
+
assert!(result
1817
1817
+
.items
1818
1818
+
.iter()
1819
1819
+
.all(|(r, _)| r.did.0 == "did:plc:asdf" && r.rkey == "asdf"));
1820
1820
+
assert_eq!(result.next, None);
1818
1821
});
1819
1822
1820
1823
test_each_storage!(get_m2m_filters, |storage| {
···
2048
2051
all_rkeys,
2049
2052
vec!["asdf", "asdf2", "fdsa", "fdsa2"],
2050
2053
"should have all 4 records across both pages"
2054
2054
+
);
2055
2055
+
});
2056
2056
+
2057
2057
+
// Pagination that splits across forward links within a single backlinker.
2058
2058
+
// The cursor should correctly resume mid-record on the next page.
2059
2059
+
test_each_storage!(get_m2m_paginate_within_forward_links, |storage| {
2060
2060
+
// Record with 1 backward link and 3 forward links at the same path
2061
2061
+
storage.push(
2062
2062
+
&ActionableEvent::CreateLinks {
2063
2063
+
record_id: RecordId {
2064
2064
+
did: "did:plc:lister".into(),
2065
2065
+
collection: "app.t.c".into(),
2066
2066
+
rkey: "list1".into(),
2067
2067
+
},
2068
2068
+
links: vec![
2069
2069
+
CollectedLink {
2070
2070
+
target: Link::Uri("a.com".into()),
2071
2071
+
path: ".subject.uri".into(),
2072
2072
+
},
2073
2073
+
CollectedLink {
2074
2074
+
target: Link::Uri("x.com".into()),
2075
2075
+
path: ".items[].uri".into(),
2076
2076
+
},
2077
2077
+
CollectedLink {
2078
2078
+
target: Link::Uri("y.com".into()),
2079
2079
+
path: ".items[].uri".into(),
2080
2080
+
},
2081
2081
+
CollectedLink {
2082
2082
+
target: Link::Uri("z.com".into()),
2083
2083
+
path: ".items[].uri".into(),
2084
2084
+
},
2085
2085
+
],
2086
2086
+
},
2087
2087
+
0,
2088
2088
+
)?;
2089
2089
+
2090
2090
+
// Page 1: limit=2, should get 2 of the 3 forward links
2091
2091
+
let page1 = storage.get_many_to_many(
2092
2092
+
"a.com",
2093
2093
+
"app.t.c",
2094
2094
+
".subject.uri",
2095
2095
+
".items[].uri",
2096
2096
+
2,
2097
2097
+
None,
2098
2098
+
&HashSet::new(),
2099
2099
+
&HashSet::new(),
2100
2100
+
)?;
2101
2101
+
assert_eq!(page1.items.len(), 2, "first page should have 2 items");
2102
2102
+
assert!(
2103
2103
+
page1.next.is_some(),
2104
2104
+
"should have a next cursor for remaining item"
2105
2105
+
);
2106
2106
+
2107
2107
+
// Page 2: should get the remaining 1 forward link
2108
2108
+
let page2 = storage.get_many_to_many(
2109
2109
+
"a.com",
2110
2110
+
"app.t.c",
2111
2111
+
".subject.uri",
2112
2112
+
".items[].uri",
2113
2113
+
2,
2114
2114
+
page1.next,
2115
2115
+
&HashSet::new(),
2116
2116
+
&HashSet::new(),
2117
2117
+
)?;
2118
2118
+
assert_eq!(page2.items.len(), 1, "second page should have 1 item");
2119
2119
+
assert_eq!(page2.next, None, "no more pages");
2120
2120
+
2121
2121
+
// Verify all 3 targets appear across pages with no duplicates
2122
2122
+
let mut all_targets: Vec<_> = page1
2123
2123
+
.items
2124
2124
+
.iter()
2125
2125
+
.chain(page2.items.iter())
2126
2126
+
.map(|(_, t)| t.clone())
2127
2127
+
.collect();
2128
2128
+
all_targets.sort();
2129
2129
+
assert_eq!(
2130
2130
+
all_targets,
2131
2131
+
vec!["x.com", "y.com", "z.com"],
2132
2132
+
"all forward targets should appear exactly once across pages"
2051
2133
);
2052
2134
});
2053
2135
}
+57
-83
constellation/src/storage/rocks_store.rs
···
1199
1199
eprintln!("Target not found for {target_key:?}");
1200
1200
return Ok(PagedOrderedCollection::empty());
1201
1201
};
1202
1202
-
1203
1202
let linkers = self.get_target_linkers(&target_id)?;
1204
1203
1205
1205
-
eprintln!("linkers: {:#?}", linkers);
1206
1206
-
1207
1207
-
let mut items: Vec<(usize, TargetId, RecordId)> = Vec::new();
1204
1204
+
let mut items: Vec<(usize, usize, RecordId, String)> = Vec::new();
1208
1205
1209
1206
// iterate backwards (who linked to the target?)
1210
1207
for (linker_idx, (did_id, rkey)) in
···
1215
1212
})
1216
1213
})
1217
1214
{
1218
1218
-
// filter target did
1219
1215
if did_id.is_empty()
1220
1220
-
|| (!filter_did_ids.is_empty() && filter_did_ids.get(&did_id).is_none())
1216
1216
+
|| (!filter_did_ids.is_empty() && !filter_did_ids.contains_key(did_id))
1221
1217
{
1222
1218
continue;
1223
1219
}
1224
1220
1225
1225
-
eprintln!("did_did: {:#?}", did_id);
1226
1226
-
1227
1227
-
let Some(targets) = self.get_record_link_targets(&RecordLinkKey(
1221
1221
+
let Some(links) = self.get_record_link_targets(&RecordLinkKey(
1228
1222
*did_id,
1229
1223
collection.clone(),
1230
1224
rkey.clone(),
···
1232
1226
else {
1233
1227
continue;
1234
1228
};
1235
1235
-
let Some(fwd_target_id) =
1236
1236
-
targets
1237
1237
-
.0
1238
1238
-
.into_iter()
1239
1239
-
.find_map(|RecordLinkTarget(rpath, target_id)| {
1240
1240
-
eprintln!("rpath.0: {} vs. path_to_other: {path_to_other}", rpath.0);
1241
1241
-
if rpath.0 == path_to_other
1242
1242
-
&& (filter_to_target_ids.is_empty()
1243
1243
-
|| filter_to_target_ids.contains(&target_id))
1244
1244
-
{
1245
1245
-
Some(target_id)
1246
1246
-
} else {
1247
1247
-
None
1248
1248
-
}
1249
1249
-
})
1250
1250
-
else {
1251
1251
-
continue;
1252
1252
-
};
1253
1229
1254
1254
-
if backward_idx.is_some_and(|bl_idx| {
1255
1255
-
linker_idx == bl_idx as usize
1256
1256
-
&& forward_idx.is_some_and(|fwd_idx| fwd_target_id.0 <= fwd_idx)
1257
1257
-
}) {
1258
1258
-
continue;
1259
1259
-
}
1260
1260
-
1261
1261
-
let page_is_full = items.len() as u64 >= limit;
1262
1262
-
1263
1263
-
eprintln!(
1264
1264
-
"page_is_full: {page_is_full} for items.len(): {}",
1265
1265
-
items.len()
1266
1266
-
);
1267
1267
-
1268
1268
-
if page_is_full {
1269
1269
-
let current_max = items.iter().next_back().unwrap().1;
1270
1270
-
if fwd_target_id > current_max {
1230
1230
+
// iterate forward (which of these links point to the __other__ target?)
1231
1231
+
for (link_idx, RecordLinkTarget(_, fwd_target_id)) in links
1232
1232
+
.0
1233
1233
+
.into_iter()
1234
1234
+
.enumerate()
1235
1235
+
.filter(|(_, RecordLinkTarget(rpath, target_id))| {
1236
1236
+
eprintln!("rpath.0: {} vs. path_to_other: {path_to_other}", rpath.0);
1237
1237
+
rpath.0 == path_to_other
1238
1238
+
&& (filter_to_target_ids.is_empty()
1239
1239
+
|| filter_to_target_ids.contains(target_id))
1240
1240
+
})
1241
1241
+
.skip_while(|(link_idx, _)| {
1242
1242
+
backward_idx.is_some_and(|bl_idx| {
1243
1243
+
linker_idx == bl_idx as usize
1244
1244
+
&& forward_idx.is_some_and(|fwd_idx| *link_idx <= fwd_idx as usize)
1245
1245
+
})
1246
1246
+
})
1247
1247
+
.take(limit as usize + 1 - items.len())
1248
1248
+
{
1249
1249
+
// extract forward target did (target that links to the __other__ target)
1250
1250
+
let Some(did) = resolve_active_did(did_id) else {
1271
1251
continue;
1272
1272
-
}
1273
1273
-
}
1274
1274
-
1275
1275
-
// extract forward target did (target that links to the __other__ target)
1276
1276
-
let Some(did) = resolve_active_did(did_id) else {
1277
1277
-
continue;
1278
1278
-
};
1279
1279
-
1280
1280
-
// link to be added
1281
1281
-
let record_id = RecordId {
1282
1282
-
did,
1283
1283
-
collection: collection.0.clone(),
1284
1284
-
rkey: rkey.0.clone(),
1285
1285
-
};
1286
1286
-
items.push((linker_idx, fwd_target_id, record_id));
1287
1287
-
}
1288
1288
-
1289
1289
-
let mut backward_idx = None;
1290
1290
-
let mut forward_idx = None;
1291
1291
-
let mut items: Vec<_> = items
1292
1292
-
.iter()
1293
1293
-
.filter_map(|(b_idx, fwd_target_id, record)| {
1294
1294
-
let Some(target_key) = self
1252
1252
+
};
1253
1253
+
// resolve to target string
1254
1254
+
let Some(fwd_target_key) = self
1295
1255
.target_id_table
1296
1256
.get_val_from_id(&self.db, fwd_target_id.0)
1297
1297
-
.ok()?
1257
1257
+
.ok()
1258
1258
+
.flatten()
1298
1259
else {
1299
1299
-
eprintln!("failed to look up target from target_id {fwd_target_id:?}");
1300
1300
-
return None;
1260
1260
+
continue;
1301
1261
};
1302
1262
1303
1303
-
backward_idx = Some(b_idx);
1304
1304
-
forward_idx = Some(fwd_target_id.0 - 1);
1263
1263
+
// link to be added
1264
1264
+
let record_id = RecordId {
1265
1265
+
did,
1266
1266
+
collection: collection.0.clone(),
1267
1267
+
rkey: rkey.0.clone(),
1268
1268
+
};
1269
1269
+
items.push((linker_idx, link_idx, record_id, fwd_target_key.0 .0));
1270
1270
+
}
1305
1271
1306
1306
-
Some((record.clone(), target_key.0 .0))
1307
1307
-
})
1308
1308
-
.collect();
1272
1272
+
// page full - eject
1273
1273
+
if items.len() > limit as usize {
1274
1274
+
break;
1275
1275
+
}
1276
1276
+
}
1309
1277
1310
1310
-
// Build new cursor from last the item, if needed
1278
1278
+
// We collect up to limit + 1 fully-resolved items. If we got more than
1279
1279
+
// limit, there are more results beyond this page. We truncate to limit
1280
1280
+
// items (the actual page) and build a composite cursor from the last
1281
1281
+
// item on the page — a base64-encoded pair of (backlink_vec_idx,
1282
1282
+
// forward_link_idx). On the next request, skip_while advances past
1283
1283
+
// this position: backlinks before backlink_vec_idx are skipped entirely,
1284
1284
+
// and at backlink_vec_idx itself, forward links at or before
1285
1285
+
// forward_link_idx are skipped. This correctly resumes mid-record when
1286
1286
+
// a single backlinker has multiple forward links at path_to_other.
1311
1287
let next = if items.len() as u64 > limit {
1312
1288
items.truncate(limit as usize);
1313
1313
-
items.last().and_then(|_| {
1314
1314
-
Some(b64::URL_SAFE.encode(format!(
1315
1315
-
"{},{}",
1316
1316
-
backward_idx?.to_string(),
1317
1317
-
forward_idx?.to_string()
1318
1318
-
)))
1319
1319
-
})
1289
1289
+
items
1290
1290
+
.last()
1291
1291
+
.map(|(l, f, _, _)| b64::URL_SAFE.encode(format!("{},{}", *l as u64, *f as u64)))
1320
1292
} else {
1321
1293
None
1322
1294
};
1295
1295
+
1296
1296
+
let items = items.into_iter().map(|(_, _, rid, t)| (rid, t)).collect();
1323
1297
1324
1298
Ok(PagedOrderedCollection { items, next })
1325
1299
}