Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Fix get_many_to_many_counts pagination with fetch N+1

Same issue as the get_many_to_many cursor fix in the previous commit,
but simpler: the counts endpoint groups by subject so each subject
appears exactly once, meaning a subject-only cursor is sufficient.

The only problem was that when items.len() == limit, the code couldn't
distinguish "exactly limit items exist" from "more items exist but we
stopped at limit", causing a false cursor on the final page.

Fix: accumulate limit+1 items, only emit a cursor when more than limit
exist, then truncate. Applied to both MemStorage (take N+1 in the
iterator chain) and RocksDB (allow one extra group in the BTreeMap,
pop it before building results).

+82 -15
+3 -2
constellation/src/storage/mem_store.rs
··· 198 198 items = items 199 199 .into_iter() 200 200 .skip_while(|(t, _, _)| after.as_ref().map(|a| t <= a).unwrap_or(false)) 201 - .take(limit as usize) 201 + .take(limit as usize + 1) 202 202 .collect(); 203 - let next = if items.len() as u64 >= limit { 203 + let next = if items.len() as u64 > limit { 204 + items.truncate(limit as usize); 204 205 items.last().map(|(t, _, _)| t.clone()) 205 206 } else { 206 207 None
+65 -1
constellation/src/storage/mod.rs
··· 1716 1716 next: None, 1717 1717 } 1718 1718 ); 1719 + 1720 + // Pagination edge cases: we have 2 grouped results (b.com and c.com) 1721 + 1722 + // Case 1: limit > items (limit=10, items=2) -> next should be None 1723 + let result = storage.get_many_to_many_counts( 1724 + "a.com", 1725 + "app.t.c", 1726 + ".abc.uri", 1727 + ".def.uri", 1728 + 10, 1729 + None, 1730 + &HashSet::new(), 1731 + &HashSet::new(), 1732 + )?; 1733 + assert_eq!(result.items.len(), 2); 1734 + assert_eq!(result.next, None, "next should be None when items < limit"); 1735 + 1736 + // Case 2: limit == items (limit=2, items=2) -> next should be None 1737 + let result = storage.get_many_to_many_counts( 1738 + "a.com", 1739 + "app.t.c", 1740 + ".abc.uri", 1741 + ".def.uri", 1742 + 2, 1743 + None, 1744 + &HashSet::new(), 1745 + &HashSet::new(), 1746 + )?; 1747 + assert_eq!(result.items.len(), 2); 1748 + assert_eq!( 1749 + result.next, None, 1750 + "next should be None when items == limit (no more pages)" 1751 + ); 1752 + 1753 + // Case 3: limit < items (limit=1, items=2) -> next should be Some 1754 + let result = storage.get_many_to_many_counts( 1755 + "a.com", 1756 + "app.t.c", 1757 + ".abc.uri", 1758 + ".def.uri", 1759 + 1, 1760 + None, 1761 + &HashSet::new(), 1762 + &HashSet::new(), 1763 + )?; 1764 + assert_eq!(result.items.len(), 1); 1765 + assert!( 1766 + result.next.is_some(), 1767 + "next should be Some when items > limit" 1768 + ); 1769 + 1770 + // Verify second page returns remaining item with no cursor 1771 + let result2 = storage.get_many_to_many_counts( 1772 + "a.com", 1773 + "app.t.c", 1774 + ".abc.uri", 1775 + ".def.uri", 1776 + 1, 1777 + result.next, 1778 + &HashSet::new(), 1779 + &HashSet::new(), 1780 + )?; 1781 + assert_eq!(result2.items.len(), 1); 1782 + assert_eq!(result2.next, None, "next should be None on final page"); 1719 1783 }); 1720 1784 1721 1785 test_each_storage!(get_m2m_empty, |storage| { ··· 1787 1851 ); 1788 1852 }); 1789 1853 1790 - test_each_storage!(get_m2m_no_filters, |storage| { 1854 + test_each_storage!(get_m2m_filters, |storage| { 1791 1855 storage.push( 1792 1856 &ActionableEvent::CreateLinks { 1793 1857 record_id: RecordId {
+14 -12
constellation/src/storage/rocks_store.rs
··· 1033 1033 1034 1034 // aand we can skip target ids that must be on future pages 1035 1035 // (this check continues after the did-lookup, which we have to do) 1036 - let page_is_full = grouped_counts.len() as u64 >= limit; 1036 + let page_is_full = grouped_counts.len() as u64 > limit; 1037 1037 if page_is_full { 1038 - let current_max = grouped_counts.keys().next_back().unwrap(); // limit should be non-zero bleh 1038 + let current_max = grouped_counts.keys().next_back().unwrap(); 1039 1039 if fwd_target > *current_max { 1040 1040 continue; 1041 1041 } ··· 1071 1071 } 1072 1072 } 1073 1073 1074 + // If we accumulated more than limit groups, there's another page. 1075 + // Pop the extra before building items so it doesn't appear in results. 1076 + let next = if grouped_counts.len() as u64 > limit { 1077 + grouped_counts.pop_last(); 1078 + grouped_counts 1079 + .keys() 1080 + .next_back() 1081 + .map(|k| format!("{}", k.0)) 1082 + } else { 1083 + None 1084 + }; 1085 + 1074 1086 let mut items: Vec<(String, u64, u64)> = Vec::with_capacity(grouped_counts.len()); 1075 1087 for (target_id, (n, dids)) in &grouped_counts { 1076 1088 let Some(target) = self ··· 1082 1094 }; 1083 1095 items.push((target.0 .0, *n, dids.len() as u64)); 1084 1096 } 1085 - 1086 - let next = if grouped_counts.len() as u64 >= limit { 1087 - // yeah.... it's a number saved as a string......sorry 1088 - grouped_counts 1089 - .keys() 1090 - .next_back() 1091 - .map(|k| format!("{}", k.0)) 1092 - } else { 1093 - None 1094 - }; 1095 1097 1096 1098 Ok(PagedOrderedCollection { items, next }) 1097 1099 }