tangled
alpha
login
or
join now
danabra.mov
/
pdsfs
forked from
oppi.li/pdsfs
1
fork
atom
mount an atproto PDS repository as a FUSE filesystem
1
fork
atom
overview
issues
pulls
pipelines
watch firehose
danabra.mov
5 months ago
15b22fe2
c9898260
+584
-75
5 changed files
expand all
collapse all
unified
split
Cargo.lock
Cargo.toml
src
firehose.rs
fs.rs
main.rs
+101
-7
Cargo.lock
reviewed
···
98
98
]
99
99
100
100
[[package]]
101
101
+
name = "anyhow"
102
102
+
version = "1.0.100"
103
103
+
source = "registry+https://github.com/rust-lang/crates.io-index"
104
104
+
checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
105
105
+
106
106
+
[[package]]
101
107
name = "async-channel"
102
108
version = "1.9.0"
103
109
source = "registry+https://github.com/rust-lang/crates.io-index"
···
327
333
version = "3.19.0"
328
334
source = "registry+https://github.com/rust-lang/crates.io-index"
329
335
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
336
336
+
337
337
+
[[package]]
338
338
+
name = "byteorder"
339
339
+
version = "1.5.0"
340
340
+
source = "registry+https://github.com/rust-lang/crates.io-index"
341
341
+
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
330
342
331
343
[[package]]
332
344
name = "bytes"
···
669
681
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
670
682
dependencies = [
671
683
"libc",
672
672
-
"windows-sys 0.59.0",
684
684
+
"windows-sys 0.60.2",
673
685
]
674
686
675
687
[[package]]
···
991
1003
"idna",
992
1004
"ipnet",
993
1005
"once_cell",
994
994
-
"rand",
1006
1006
+
"rand 0.9.2",
995
1007
"ring",
996
1008
"thiserror 2.0.12",
997
1009
"tinyvec",
···
1013
1025
"moka",
1014
1026
"once_cell",
1015
1027
"parking_lot",
1016
1016
-
"rand",
1028
1028
+
"rand 0.9.2",
1017
1029
"resolv-conf",
1018
1030
"smallvec",
1019
1031
"thiserror 2.0.12",
···
1757
1769
name = "pdsfs"
1758
1770
version = "0.1.0"
1759
1771
dependencies = [
1772
1772
+
"anyhow",
1760
1773
"atrium-api",
1761
1774
"atrium-common",
1762
1775
"atrium-identity",
···
1776
1789
"serde_json",
1777
1790
"thiserror 2.0.12",
1778
1791
"tokio",
1792
1792
+
"tokio-tungstenite",
1779
1793
"xdg",
1780
1794
]
1781
1795
···
1889
1903
1890
1904
[[package]]
1891
1905
name = "rand"
1906
1906
+
version = "0.8.5"
1907
1907
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1908
1908
+
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
1909
1909
+
dependencies = [
1910
1910
+
"libc",
1911
1911
+
"rand_chacha 0.3.1",
1912
1912
+
"rand_core 0.6.4",
1913
1913
+
]
1914
1914
+
1915
1915
+
[[package]]
1916
1916
+
name = "rand"
1892
1917
version = "0.9.2"
1893
1918
source = "registry+https://github.com/rust-lang/crates.io-index"
1894
1919
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
1895
1920
dependencies = [
1896
1896
-
"rand_chacha",
1897
1897
-
"rand_core",
1921
1921
+
"rand_chacha 0.9.0",
1922
1922
+
"rand_core 0.9.3",
1923
1923
+
]
1924
1924
+
1925
1925
+
[[package]]
1926
1926
+
name = "rand_chacha"
1927
1927
+
version = "0.3.1"
1928
1928
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1929
1929
+
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
1930
1930
+
dependencies = [
1931
1931
+
"ppv-lite86",
1932
1932
+
"rand_core 0.6.4",
1898
1933
]
1899
1934
1900
1935
[[package]]
···
1904
1939
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
1905
1940
dependencies = [
1906
1941
"ppv-lite86",
1907
1907
-
"rand_core",
1942
1942
+
"rand_core 0.9.3",
1943
1943
+
]
1944
1944
+
1945
1945
+
[[package]]
1946
1946
+
name = "rand_core"
1947
1947
+
version = "0.6.4"
1948
1948
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1949
1949
+
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
1950
1950
+
dependencies = [
1951
1951
+
"getrandom 0.2.16",
1908
1952
]
1909
1953
1910
1954
[[package]]
···
2057
2101
"errno",
2058
2102
"libc",
2059
2103
"linux-raw-sys",
2060
2060
-
"windows-sys 0.59.0",
2104
2104
+
"windows-sys 0.60.2",
2061
2105
]
2062
2106
2063
2107
[[package]]
···
2234
2278
]
2235
2279
2236
2280
[[package]]
2281
2281
+
name = "sha1"
2282
2282
+
version = "0.10.6"
2283
2283
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2284
2284
+
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
2285
2285
+
dependencies = [
2286
2286
+
"cfg-if",
2287
2287
+
"cpufeatures",
2288
2288
+
"digest",
2289
2289
+
]
2290
2290
+
2291
2291
+
[[package]]
2237
2292
name = "sha2"
2238
2293
version = "0.10.9"
2239
2294
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2515
2570
]
2516
2571
2517
2572
[[package]]
2573
2573
+
name = "tokio-tungstenite"
2574
2574
+
version = "0.24.0"
2575
2575
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2576
2576
+
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
2577
2577
+
dependencies = [
2578
2578
+
"futures-util",
2579
2579
+
"log",
2580
2580
+
"native-tls",
2581
2581
+
"tokio",
2582
2582
+
"tokio-native-tls",
2583
2583
+
"tungstenite",
2584
2584
+
]
2585
2585
+
2586
2586
+
[[package]]
2518
2587
name = "tokio-util"
2519
2588
version = "0.7.15"
2520
2589
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2663
2732
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
2664
2733
2665
2734
[[package]]
2735
2735
+
name = "tungstenite"
2736
2736
+
version = "0.24.0"
2737
2737
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2738
2738
+
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
2739
2739
+
dependencies = [
2740
2740
+
"byteorder",
2741
2741
+
"bytes",
2742
2742
+
"data-encoding",
2743
2743
+
"http 1.3.1",
2744
2744
+
"httparse",
2745
2745
+
"log",
2746
2746
+
"native-tls",
2747
2747
+
"rand 0.8.5",
2748
2748
+
"sha1",
2749
2749
+
"thiserror 1.0.69",
2750
2750
+
"utf-8",
2751
2751
+
]
2752
2752
+
2753
2753
+
[[package]]
2666
2754
name = "typenum"
2667
2755
version = "1.18.0"
2668
2756
source = "registry+https://github.com/rust-lang/crates.io-index"
···
2712
2800
"idna",
2713
2801
"percent-encoding",
2714
2802
]
2803
2803
+
2804
2804
+
[[package]]
2805
2805
+
name = "utf-8"
2806
2806
+
version = "0.7.6"
2807
2807
+
source = "registry+https://github.com/rust-lang/crates.io-index"
2808
2808
+
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
2715
2809
2716
2810
[[package]]
2717
2811
name = "utf8_iter"
+4
-2
Cargo.toml
reviewed
···
4
4
edition = "2024"
5
5
6
6
[dependencies]
7
7
+
anyhow = "1.0"
7
8
atrium-api = "0.25.4"
8
9
atrium-common = "0.1.2"
9
10
atrium-identity = "0.1.5"
···
11
12
atrium-xrpc = "0.12.3"
12
13
atrium-xrpc-client = { version = "0.5.14", features=["isahc"] }
13
14
clap = { version = "4.5.41", features = ["cargo"] }
14
14
-
fuser = "0.15.1"
15
15
+
fuser = { version = "0.15.1", features = ["abi-7-18"] }
15
16
futures = "0.3.31"
16
17
hickory-resolver = "0.25.2"
17
18
indexmap = "2.10.0"
···
22
23
serde_ipld_dagcbor = "0.6.3"
23
24
serde_json = "1.0.141"
24
25
thiserror = "2.0.12"
25
25
-
tokio = { version = "1.46.1", features = ["fs"] }
26
26
+
tokio = { version = "1.46.1", features = ["fs", "sync", "rt-multi-thread"] }
27
27
+
tokio-tungstenite = { version = "0.24", features = ["native-tls"] }
26
28
xdg = "3.0.0"
+318
src/firehose.rs
reviewed
···
1
1
+
use anyhow::{anyhow, Result};
2
2
+
use atrium_api::com::atproto::sync::subscribe_repos::{Commit, NSID};
3
3
+
use atrium_api::client::AtpServiceClient;
4
4
+
use atrium_api::com;
5
5
+
use atrium_api::types;
6
6
+
use atrium_xrpc_client::isahc::IsahcClient;
7
7
+
use futures::StreamExt;
8
8
+
use ipld_core::ipld::Ipld;
9
9
+
use std::io::Cursor;
10
10
+
use std::sync::{Arc, Mutex};
11
11
+
use tokio_tungstenite::connect_async;
12
12
+
use tokio_tungstenite::tungstenite::Message;
13
13
+
14
14
+
use crate::fs::{PdsFsCollection, PdsFsEntry, PdsFsRecord};
15
15
+
use indexmap::{IndexMap, IndexSet};
16
16
+
17
17
+
/// Frame header types for WebSocket messages
18
18
+
#[derive(Debug, Clone, PartialEq, Eq)]
19
19
+
enum FrameHeader {
20
20
+
Message(Option<String>),
21
21
+
Error,
22
22
+
}
23
23
+
24
24
+
impl TryFrom<Ipld> for FrameHeader {
25
25
+
type Error = anyhow::Error;
26
26
+
27
27
+
fn try_from(value: Ipld) -> Result<Self> {
28
28
+
if let Ipld::Map(map) = value {
29
29
+
if let Some(Ipld::Integer(i)) = map.get("op") {
30
30
+
match i {
31
31
+
1 => {
32
32
+
let t = if let Some(Ipld::String(s)) = map.get("t") {
33
33
+
Some(s.clone())
34
34
+
} else {
35
35
+
None
36
36
+
};
37
37
+
return Ok(FrameHeader::Message(t));
38
38
+
}
39
39
+
-1 => return Ok(FrameHeader::Error),
40
40
+
_ => {}
41
41
+
}
42
42
+
}
43
43
+
}
44
44
+
Err(anyhow!("invalid frame type"))
45
45
+
}
46
46
+
}
47
47
+
48
48
+
/// Frame types for parsed WebSocket messages
49
49
+
#[derive(Debug, Clone, PartialEq, Eq)]
50
50
+
pub enum Frame {
51
51
+
Message(Option<String>, MessageFrame),
52
52
+
Error(ErrorFrame),
53
53
+
}
54
54
+
55
55
+
#[derive(Debug, Clone, PartialEq, Eq)]
56
56
+
pub struct MessageFrame {
57
57
+
pub body: Vec<u8>,
58
58
+
}
59
59
+
60
60
+
#[derive(Debug, Clone, PartialEq, Eq)]
61
61
+
pub struct ErrorFrame {}
62
62
+
63
63
+
impl TryFrom<&[u8]> for Frame {
64
64
+
type Error = anyhow::Error;
65
65
+
66
66
+
fn try_from(value: &[u8]) -> Result<Self> {
67
67
+
let mut cursor = Cursor::new(value);
68
68
+
let (left, right) = match serde_ipld_dagcbor::from_reader::<Ipld, _>(&mut cursor) {
69
69
+
Err(serde_ipld_dagcbor::DecodeError::TrailingData) => {
70
70
+
value.split_at(cursor.position() as usize)
71
71
+
}
72
72
+
_ => {
73
73
+
return Err(anyhow!("invalid frame type"));
74
74
+
}
75
75
+
};
76
76
+
let header = FrameHeader::try_from(serde_ipld_dagcbor::from_slice::<Ipld>(left)?)?;
77
77
+
if let FrameHeader::Message(t) = &header {
78
78
+
Ok(Frame::Message(t.clone(), MessageFrame { body: right.to_vec() }))
79
79
+
} else {
80
80
+
Ok(Frame::Error(ErrorFrame {}))
81
81
+
}
82
82
+
}
83
83
+
}
84
84
+
85
85
+
/// Subscribe to a repo's firehose and update inodes on changes
86
86
+
pub async fn subscribe_to_repo<R>(
87
87
+
did: String,
88
88
+
pds: String,
89
89
+
inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
90
90
+
sizes: Arc<Mutex<IndexMap<usize, u64>>>,
91
91
+
content_cache: Arc<Mutex<IndexMap<String, String>>>,
92
92
+
notifier: fuser::Notifier,
93
93
+
) -> Result<()>
94
94
+
where
95
95
+
R: atrium_repo::blockstore::AsyncBlockStoreRead,
96
96
+
{
97
97
+
// Strip https:// or http:// prefix from PDS URL if present
98
98
+
let pds_host = pds.trim_start_matches("https://").trim_start_matches("http://");
99
99
+
let url = format!("wss://{}/xrpc/{}", pds_host, NSID);
100
100
+
println!("Connecting to firehose: {}", url);
101
101
+
102
102
+
let (mut stream, _) = connect_async(url).await?;
103
103
+
println!("Connected to firehose for {}", did);
104
104
+
105
105
+
loop {
106
106
+
match stream.next().await {
107
107
+
Some(Ok(Message::Binary(data))) => {
108
108
+
if let Ok(Frame::Message(Some(t), msg)) = Frame::try_from(data.as_slice()) {
109
109
+
if t.as_str() == "#commit" {
110
110
+
if let Ok(commit) = serde_ipld_dagcbor::from_reader::<Commit, _>(msg.body.as_slice()) {
111
111
+
// Only process commits for our DID
112
112
+
if commit.repo.as_str() == did {
113
113
+
if let Err(e) = handle_commit(&commit, &inodes, &sizes, &content_cache, &did, &pds, &notifier).await {
114
114
+
eprintln!("Error handling commit: {:?}", e);
115
115
+
}
116
116
+
}
117
117
+
}
118
118
+
}
119
119
+
}
120
120
+
}
121
121
+
Some(Ok(_)) => {} // Ignore other message types
122
122
+
Some(Err(e)) => {
123
123
+
eprintln!("WebSocket error: {}", e);
124
124
+
break;
125
125
+
}
126
126
+
None => {
127
127
+
eprintln!("WebSocket closed");
128
128
+
break;
129
129
+
}
130
130
+
}
131
131
+
}
132
132
+
133
133
+
Ok(())
134
134
+
}
135
135
+
136
136
+
/// Handle a commit by updating the inode tree and notifying Finder
137
137
+
async fn handle_commit(
138
138
+
commit: &Commit,
139
139
+
inodes: &Arc<Mutex<IndexSet<PdsFsEntry>>>,
140
140
+
sizes: &Arc<Mutex<IndexMap<usize, u64>>>,
141
141
+
content_cache: &Arc<Mutex<IndexMap<String, String>>>,
142
142
+
did: &str,
143
143
+
pds: &str,
144
144
+
notifier: &fuser::Notifier,
145
145
+
) -> Result<()> {
146
146
+
// Find the DID inode
147
147
+
let did_entry = PdsFsEntry::Did(did.to_string());
148
148
+
let did_inode = {
149
149
+
let inodes_lock = inodes.lock().unwrap();
150
150
+
inodes_lock.get_index_of(&did_entry)
151
151
+
};
152
152
+
153
153
+
let Some(did_inode) = did_inode else {
154
154
+
return Err(anyhow!("DID not found in inodes"));
155
155
+
};
156
156
+
157
157
+
for op in &commit.ops {
158
158
+
let Some((collection, rkey)) = op.path.split_once('/') else {
159
159
+
continue;
160
160
+
};
161
161
+
162
162
+
match op.action.as_str() {
163
163
+
"create" => {
164
164
+
// Fetch the record from PDS
165
165
+
let record_key = format!("{}/{}", collection, rkey);
166
166
+
let cache_key = format!("{}/{}", did, record_key);
167
167
+
168
168
+
// Fetch record content from PDS
169
169
+
match fetch_record(pds, did, collection, rkey).await {
170
170
+
Ok(content) => {
171
171
+
let content_len = content.len() as u64;
172
172
+
173
173
+
// Add the record to inodes
174
174
+
let (collection_inode, record_inode) = {
175
175
+
let mut inodes_lock = inodes.lock().unwrap();
176
176
+
177
177
+
// Ensure collection exists
178
178
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
179
179
+
parent: did_inode,
180
180
+
nsid: collection.to_string(),
181
181
+
});
182
182
+
let (collection_inode, _) = inodes_lock.insert_full(collection_entry);
183
183
+
184
184
+
// Add the record
185
185
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
186
186
+
parent: collection_inode,
187
187
+
rkey: rkey.to_string(),
188
188
+
});
189
189
+
let (record_inode, _) = inodes_lock.insert_full(record_entry);
190
190
+
(collection_inode, record_inode)
191
191
+
};
192
192
+
193
193
+
// Cache the content and size
194
194
+
content_cache.lock().unwrap().insert(cache_key, content);
195
195
+
sizes.lock().unwrap().insert(record_inode, content_len);
196
196
+
197
197
+
// Notify Finder about the new file (release lock first)
198
198
+
let filename = format!("{}.json", rkey);
199
199
+
if let Err(e) = notifier.inval_entry(collection_inode as u64, filename.as_ref()) {
200
200
+
eprintln!("Failed to invalidate entry for {}: {}", filename, e);
201
201
+
}
202
202
+
203
203
+
println!("Created: {}/{}", collection, rkey);
204
204
+
}
205
205
+
Err(e) => {
206
206
+
eprintln!("Failed to fetch record {}/{}: {}", collection, rkey, e);
207
207
+
}
208
208
+
}
209
209
+
}
210
210
+
"delete" => {
211
211
+
// Get inodes before removing
212
212
+
let (collection_inode_opt, child_inode_opt) = {
213
213
+
let mut inodes_lock = inodes.lock().unwrap();
214
214
+
215
215
+
// Find the collection
216
216
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
217
217
+
parent: did_inode,
218
218
+
nsid: collection.to_string(),
219
219
+
});
220
220
+
let collection_inode = inodes_lock.get_index_of(&collection_entry);
221
221
+
222
222
+
// Find and remove the record
223
223
+
let child_inode = if let Some(coll_ino) = collection_inode {
224
224
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
225
225
+
parent: coll_ino,
226
226
+
rkey: rkey.to_string(),
227
227
+
});
228
228
+
let child_ino = inodes_lock.get_index_of(&record_entry);
229
229
+
inodes_lock.shift_remove(&record_entry);
230
230
+
child_ino
231
231
+
} else {
232
232
+
None
233
233
+
};
234
234
+
235
235
+
(collection_inode, child_inode)
236
236
+
};
237
237
+
238
238
+
// Notify Finder about the deletion (release lock first)
239
239
+
if let (Some(coll_ino), Some(child_ino)) = (collection_inode_opt, child_inode_opt) {
240
240
+
// Remove from caches
241
241
+
sizes.lock().unwrap().shift_remove(&child_ino);
242
242
+
let cache_key = format!("{}/{}/{}", did, collection, rkey);
243
243
+
content_cache.lock().unwrap().shift_remove(&cache_key);
244
244
+
245
245
+
let filename = format!("{}.json", rkey);
246
246
+
if let Err(e) = notifier.delete(coll_ino as u64, child_ino as u64, filename.as_ref()) {
247
247
+
eprintln!("Failed to notify deletion for {}: {}", filename, e);
248
248
+
}
249
249
+
}
250
250
+
251
251
+
println!("Deleted: {}/{}", collection, rkey);
252
252
+
}
253
253
+
"update" => {
254
254
+
// For updates, invalidate the inode so content is re-fetched
255
255
+
let record_inode_opt = {
256
256
+
let inodes_lock = inodes.lock().unwrap();
257
257
+
let collection_entry = PdsFsEntry::Collection(PdsFsCollection {
258
258
+
parent: did_inode,
259
259
+
nsid: collection.to_string(),
260
260
+
});
261
261
+
262
262
+
if let Some(collection_inode) = inodes_lock.get_index_of(&collection_entry) {
263
263
+
let record_entry = PdsFsEntry::Record(PdsFsRecord {
264
264
+
parent: collection_inode,
265
265
+
rkey: rkey.to_string(),
266
266
+
});
267
267
+
inodes_lock.get_index_of(&record_entry)
268
268
+
} else {
269
269
+
None
270
270
+
}
271
271
+
};
272
272
+
273
273
+
// Notify Finder to invalidate the inode (release lock first)
274
274
+
if let Some(record_ino) = record_inode_opt {
275
275
+
// Clear caches so content is recalculated
276
276
+
sizes.lock().unwrap().shift_remove(&record_ino);
277
277
+
let cache_key = format!("{}/{}/{}", did, collection, rkey);
278
278
+
content_cache.lock().unwrap().shift_remove(&cache_key);
279
279
+
280
280
+
// Invalidate the entire inode (metadata and all data)
281
281
+
if let Err(e) = notifier.inval_inode(record_ino as u64, 0, 0) {
282
282
+
eprintln!("Failed to invalidate inode for {}/{}: {}", collection, rkey, e);
283
283
+
}
284
284
+
}
285
285
+
286
286
+
println!("Updated: {}/{}", collection, rkey);
287
287
+
}
288
288
+
_ => {}
289
289
+
}
290
290
+
}
291
291
+
292
292
+
Ok(())
293
293
+
}
294
294
+
295
295
+
/// Fetch a record from the PDS
296
296
+
async fn fetch_record(pds: &str, did: &str, collection: &str, rkey: &str) -> Result<String> {
297
297
+
let client = AtpServiceClient::new(IsahcClient::new(pds));
298
298
+
let did = types::string::Did::new(did.to_string()).map_err(|e| anyhow!(e))?;
299
299
+
let collection_nsid = types::string::Nsid::new(collection.to_string()).map_err(|e| anyhow!(e))?;
300
300
+
let record_key = types::string::RecordKey::new(rkey.to_string()).map_err(|e| anyhow!(e))?;
301
301
+
302
302
+
let response = client
303
303
+
.service
304
304
+
.com
305
305
+
.atproto
306
306
+
.repo
307
307
+
.get_record(com::atproto::repo::get_record::Parameters::from(
308
308
+
com::atproto::repo::get_record::ParametersData {
309
309
+
cid: None,
310
310
+
collection: collection_nsid,
311
311
+
repo: types::string::AtIdentifier::Did(did),
312
312
+
rkey: record_key,
313
313
+
}
314
314
+
))
315
315
+
.await?;
316
316
+
317
317
+
Ok(serde_json::to_string_pretty(&response.value)?)
318
318
+
}
+119
-57
src/fs.rs
reviewed
···
1
1
+
use std::sync::{Arc, Mutex};
1
2
use std::time::{self, SystemTime, UNIX_EPOCH, Duration};
2
3
3
4
use atrium_repo::{Repository, blockstore::AsyncBlockStoreRead};
···
28
29
}
29
30
30
31
pub struct PdsFs<R> {
31
31
-
repos: IndexMap<String, Repository<R>>,
32
32
-
inodes: IndexSet<PdsFsEntry>,
32
32
+
repos: Arc<Mutex<IndexMap<String, Repository<R>>>>,
33
33
+
inodes: Arc<Mutex<IndexSet<PdsFsEntry>>>,
34
34
+
sizes: Arc<Mutex<IndexMap<Inode, u64>>>,
35
35
+
content_cache: Arc<Mutex<IndexMap<String, String>>>,
33
36
rt: tokio::runtime::Runtime,
34
37
}
35
38
···
68
71
69
72
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
70
73
pub struct PdsFsCollection {
71
71
-
parent: Inode,
72
72
-
nsid: String,
74
74
+
pub parent: Inode,
75
75
+
pub nsid: String,
73
76
}
74
77
75
78
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
76
79
pub struct PdsFsRecord {
77
77
-
parent: Inode,
78
78
-
rkey: String,
80
80
+
pub parent: Inode,
81
81
+
pub rkey: String,
79
82
}
80
83
81
84
// impl PdsFsRecord {
···
111
114
{
112
115
pub fn new() -> Self {
113
116
PdsFs {
114
114
-
repos: Default::default(),
115
115
-
inodes: IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]),
117
117
+
repos: Arc::new(Mutex::new(Default::default())),
118
118
+
inodes: Arc::new(Mutex::new(IndexSet::from([PdsFsEntry::Zero, PdsFsEntry::Root]))),
119
119
+
sizes: Arc::new(Mutex::new(Default::default())),
120
120
+
content_cache: Arc::new(Mutex::new(Default::default())),
116
121
rt: tokio::runtime::Runtime::new().unwrap(),
117
122
}
118
123
}
119
124
125
125
+
pub fn get_shared_state(&self) -> (Arc<Mutex<IndexMap<String, Repository<R>>>>, Arc<Mutex<IndexSet<PdsFsEntry>>>, Arc<Mutex<IndexMap<Inode, u64>>>, Arc<Mutex<IndexMap<String, String>>>) {
126
126
+
(Arc::clone(&self.repos), Arc::clone(&self.inodes), Arc::clone(&self.sizes), Arc::clone(&self.content_cache))
127
127
+
}
128
128
+
120
129
pub async fn add(&mut self, did: String, mut repo: Repository<R>) {
121
130
let mut mst = repo.tree();
122
131
123
123
-
let (did_inode, _) = self.inodes.insert_full(PdsFsEntry::Did(did.clone()));
132
132
+
let did_inode = {
133
133
+
let mut inodes = self.inodes.lock().unwrap();
134
134
+
let (did_inode, _) = inodes.insert_full(PdsFsEntry::Did(did.clone()));
135
135
+
did_inode
136
136
+
};
124
137
125
138
let mut keys = Box::pin(mst.keys());
126
139
while let Some(Ok(key)) = keys.next().await {
127
140
if let Some((collection_name, rkey)) = key.split_once("/") {
128
128
-
let (collection_inode, _) =
129
129
-
self.inodes
130
130
-
.insert_full(PdsFsEntry::Collection(PdsFsCollection {
131
131
-
parent: did_inode,
132
132
-
nsid: collection_name.to_owned(),
133
133
-
}));
141
141
+
let mut inodes = self.inodes.lock().unwrap();
142
142
+
let (collection_inode, _) = inodes.insert_full(PdsFsEntry::Collection(PdsFsCollection {
143
143
+
parent: did_inode,
144
144
+
nsid: collection_name.to_owned(),
145
145
+
}));
134
146
135
135
-
self.inodes.insert(PdsFsEntry::Record(PdsFsRecord {
147
147
+
inodes.insert(PdsFsEntry::Record(PdsFsRecord {
136
148
parent: collection_inode,
137
149
rkey: rkey.to_owned(),
138
150
}));
···
142
154
drop(keys);
143
155
drop(mst);
144
156
145
145
-
self.repos.insert(did, repo);
157
157
+
self.repos.lock().unwrap().insert(did, repo);
146
158
}
147
159
148
160
fn attr(&mut self, ino: u64) -> fuser::FileAttr {
149
149
-
match self.inodes.get_index(ino as usize) {
161
161
+
let inodes = self.inodes.lock().unwrap();
162
162
+
match inodes.get_index(ino as usize) {
150
163
Some(PdsFsEntry::Root) => ROOTDIR_ATTR,
151
164
Some(PdsFsEntry::Collection(_)) => fuser::FileAttr {
152
165
ino,
···
183
196
blksize: BLKSIZE,
184
197
},
185
198
Some(PdsFsEntry::Record(r)) => {
186
186
-
let col = self.inodes[r.parent].unwrap_collection();
187
187
-
let did = self.inodes[col.parent].unwrap_did();
188
188
-
let repo = &mut self.repos[did];
189
189
-
let key = format!("{}/{}", col.nsid, r.rkey);
190
190
-
let size = self
191
191
-
.rt
192
192
-
.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
193
193
-
.ok()
194
194
-
.flatten()
195
195
-
.map_or(0, |v| serde_json::to_string_pretty(&v).unwrap().len())
196
196
-
as u64;
199
199
+
let col = inodes[r.parent].unwrap_collection();
200
200
+
let did = inodes[col.parent].unwrap_did().clone();
201
201
+
let rkey = r.rkey.clone();
202
202
+
let collection_nsid = col.nsid.clone();
203
203
+
drop(inodes);
204
204
+
205
205
+
// Check cache first
206
206
+
let size = {
207
207
+
let sizes = self.sizes.lock().unwrap();
208
208
+
if let Some(&cached_size) = sizes.get(&(ino as usize)) {
209
209
+
cached_size
210
210
+
} else {
211
211
+
drop(sizes);
212
212
+
// Not in cache, try to fetch from repo
213
213
+
let mut repos = self.repos.lock().unwrap();
214
214
+
let repo = &mut repos[&did];
215
215
+
let key = format!("{}/{}", collection_nsid, rkey);
216
216
+
let size = self
217
217
+
.rt
218
218
+
.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key))
219
219
+
.ok()
220
220
+
.flatten()
221
221
+
.map_or(500, |v| serde_json::to_string_pretty(&v).unwrap().len())
222
222
+
as u64;
223
223
+
// Cache it for next time
224
224
+
self.sizes.lock().unwrap().insert(ino as usize, size);
225
225
+
size
226
226
+
}
227
227
+
};
197
228
let blocks = ((size as u32 + BLKSIZE - 1) / BLKSIZE) as u64;
198
229
199
230
// Decode TID to get creation timestamp
200
200
-
let timestamp = tid_to_timestamp(&r.rkey).unwrap_or(time::UNIX_EPOCH);
231
231
+
let timestamp = tid_to_timestamp(&rkey).unwrap_or(time::UNIX_EPOCH);
201
232
202
233
fuser::FileAttr {
203
234
ino,
···
214
245
gid: 20,
215
246
rdev: 0,
216
247
flags: 0,
217
217
-
blksize: 512,
248
248
+
blksize: BLKSIZE,
218
249
}
219
250
}
220
251
_ => panic!("zero"),
···
233
264
_fh: Option<u64>,
234
265
reply: fuser::ReplyAttr,
235
266
) {
236
236
-
if (ino as usize) < self.inodes.len() {
267
267
+
let len = self.inodes.lock().unwrap().len();
268
268
+
if (ino as usize) < len {
237
269
reply.attr(&TTL, &self.attr(ino as u64))
238
270
} else {
239
271
reply.error(libc::ENOENT)
···
248
280
offset: i64,
249
281
mut reply: fuser::ReplyDirectory,
250
282
) {
251
251
-
match self.inodes.get_index(ino as usize) {
283
283
+
let inodes = self.inodes.lock().unwrap();
284
284
+
match inodes.get_index(ino as usize) {
252
285
Some(PdsFsEntry::Root) => {
253
286
let entries: Vec<_> = vec![(ino, ".".to_string()), (ino, "..".to_string())]
254
287
.into_iter()
255
255
-
.chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
288
288
+
.chain(inodes.iter().enumerate().filter_map(|(i, e)| {
256
289
if let PdsFsEntry::Did(did) = e {
257
290
Some((i as u64, did.clone()))
258
291
} else {
···
260
293
}
261
294
}))
262
295
.collect();
296
296
+
drop(inodes);
263
297
264
298
for (index, (inode_num, name)) in
265
299
entries.into_iter().enumerate().skip(offset as usize)
···
276
310
reply.ok()
277
311
}
278
312
Some(PdsFsEntry::Did(_)) => {
279
279
-
let entries = vec![(ino, ".".to_string()), (1, "..".to_string())]
313
313
+
let entries: Vec<_> = vec![(ino, ".".to_string()), (1, "..".to_string())]
280
314
.into_iter()
281
281
-
.chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
315
315
+
.chain(inodes.iter().enumerate().filter_map(|(i, e)| {
282
316
if let PdsFsEntry::Collection(col) = e {
283
317
if col.parent == ino as usize {
284
318
Some((i as u64, col.nsid.clone()))
···
289
323
None
290
324
}
291
325
}))
292
292
-
.into_iter()
293
293
-
.enumerate()
294
294
-
.skip(offset as usize);
326
326
+
.collect();
327
327
+
drop(inodes);
295
328
296
296
-
for (index, (inode_num, name)) in entries {
329
329
+
for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
297
330
let full = reply.add(
298
331
inode_num,
299
332
(index + 1) as i64,
···
312
345
reply.ok();
313
346
}
314
347
Some(PdsFsEntry::Collection(c)) => {
315
315
-
let entries = [(ino, ".".to_string()), (c.parent as u64, "..".to_string())]
348
348
+
let parent_ino = c.parent;
349
349
+
let entries: Vec<_> = [(ino, ".".to_string()), (parent_ino as u64, "..".to_string())]
316
350
.into_iter()
317
317
-
.chain(self.inodes.iter().enumerate().filter_map(|(i, e)| {
351
351
+
.chain(inodes.iter().enumerate().filter_map(|(i, e)| {
318
352
if let PdsFsEntry::Record(record) = e {
319
353
if record.parent == ino as usize {
320
354
Some((i as u64, format!("{}.json", record.rkey)))
···
325
359
None
326
360
}
327
361
}))
328
328
-
.into_iter()
329
329
-
.enumerate()
330
330
-
.skip(offset as usize);
362
362
+
.collect();
363
363
+
drop(inodes);
331
364
332
332
-
for (index, (inode_num, name)) in entries {
365
365
+
for (index, (inode_num, name)) in entries.into_iter().enumerate().skip(offset as usize) {
333
366
let full = reply.add(
334
367
inode_num,
335
368
(index + 1) as i64,
···
347
380
348
381
reply.ok()
349
382
}
350
350
-
_ => reply.error(libc::ENOENT),
383
383
+
_ => {
384
384
+
drop(inodes);
385
385
+
reply.error(libc::ENOENT)
386
386
+
}
351
387
}
352
388
}
353
389
···
358
394
name: &std::ffi::OsStr,
359
395
reply: fuser::ReplyEntry,
360
396
) {
361
361
-
match self.inodes.get_index(parent as usize) {
397
397
+
let inodes = self.inodes.lock().unwrap();
398
398
+
match inodes.get_index(parent as usize) {
362
399
Some(PdsFsEntry::Root) => {
363
400
let did = PdsFsEntry::Did(name.to_string_lossy().to_string());
364
364
-
if let Some(ino) = self.inodes.get_index_of(&did) {
401
401
+
if let Some(ino) = inodes.get_index_of(&did) {
402
402
+
drop(inodes);
365
403
reply.entry(&TTL, &self.attr(ino as u64), 0);
366
404
} else {
405
405
+
drop(inodes);
367
406
reply.error(libc::ENOENT)
368
407
}
369
408
}
···
372
411
parent: parent as usize,
373
412
nsid: name.to_string_lossy().to_string(),
374
413
});
375
375
-
if let Some(ino) = self.inodes.get_index_of(&col) {
414
414
+
if let Some(ino) = inodes.get_index_of(&col) {
415
415
+
drop(inodes);
376
416
reply.entry(&TTL, &self.attr(ino as u64), 0);
377
417
} else {
418
418
+
drop(inodes);
378
419
reply.error(libc::ENOENT)
379
420
}
380
421
}
···
385
426
parent: parent as usize,
386
427
rkey,
387
428
});
388
388
-
if let Some(ino) = self.inodes.get_index_of(&record) {
429
429
+
if let Some(ino) = inodes.get_index_of(&record) {
430
430
+
drop(inodes);
389
431
reply.entry(&TTL, &self.attr(ino as u64), 0);
390
432
} else {
433
433
+
drop(inodes);
391
434
reply.error(libc::ENOENT)
392
435
}
393
436
}
394
394
-
_ => reply.error(libc::ENOENT),
437
437
+
_ => {
438
438
+
drop(inodes);
439
439
+
reply.error(libc::ENOENT)
440
440
+
}
395
441
}
396
442
}
397
443
···
406
452
_lock: Option<u64>,
407
453
reply: fuser::ReplyData,
408
454
) {
409
409
-
if let Some(PdsFsEntry::Record(r)) = self.inodes.get_index(ino as usize) {
410
410
-
let col = self.inodes[r.parent].unwrap_collection();
411
411
-
let did = self.inodes[col.parent].unwrap_did();
412
412
-
let repo = &mut self.repos[did];
455
455
+
let inodes = self.inodes.lock().unwrap();
456
456
+
if let Some(PdsFsEntry::Record(r)) = inodes.get_index(ino as usize) {
457
457
+
let col = inodes[r.parent].unwrap_collection();
458
458
+
let did = inodes[col.parent].unwrap_did().clone();
413
459
let key = format!("{}/{}", col.nsid, r.rkey);
460
460
+
let cache_key = format!("{}/{}", did, key);
461
461
+
drop(inodes);
414
462
463
463
+
// Check content cache first (for new records from firehose)
464
464
+
{
465
465
+
let cache = self.content_cache.lock().unwrap();
466
466
+
if let Some(content) = cache.get(&cache_key) {
467
467
+
reply.data(&content.as_bytes()[offset as usize..]);
468
468
+
return;
469
469
+
}
470
470
+
}
471
471
+
472
472
+
// Fall back to repo
473
473
+
let mut repos = self.repos.lock().unwrap();
474
474
+
let repo = &mut repos[&did];
415
475
if let Ok(Some(val)) = self.rt.block_on(repo.get_raw::<ipld_core::ipld::Ipld>(&key)) {
416
416
-
reply.data(&serde_json::to_string_pretty(&val).unwrap().as_bytes()[offset as usize..]);
476
476
+
reply.data(&serde_json::to_string(&val).unwrap().as_bytes()[offset as usize..]);
417
477
return;
418
478
}
479
479
+
} else {
480
480
+
drop(inodes);
419
481
}
420
482
reply.error(libc::ENOENT);
421
483
}
+42
-9
src/main.rs
reviewed
···
1
1
mod client;
2
2
mod error;
3
3
+
mod firehose;
3
4
mod fs;
4
5
mod resolver;
5
6
···
12
13
use futures::{StreamExt, stream};
13
14
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
14
15
use std::{
15
15
-
collections::HashMap,
16
16
io::{Cursor, Write},
17
17
path::PathBuf,
18
18
sync::Arc,
···
58
58
let id = r.resolve(&h).await?;
59
59
let bytes = cached_download(&id, &b).await?;
60
60
let repo = build_repo(bytes).await?;
61
61
-
Ok::<_, error::Error>((id.did, repo))
61
61
+
Ok::<_, error::Error>((id.did.clone(), id.pds.clone(), repo))
62
62
}
63
63
})
64
64
.collect::<Vec<_>>(),
···
67
67
for e in errors {
68
68
eprintln!("{:?}", e.as_ref().unwrap_err());
69
69
}
70
70
-
let repos = success
70
70
+
let repos_with_pds: Vec<_> = success
71
71
.into_iter()
72
72
.map(|s| s.unwrap())
73
73
-
.collect::<HashMap<_, _>>();
73
73
+
.collect();
74
74
75
75
// construct the fs
76
76
let mut fs = fs::PdsFs::new();
77
77
-
for (did, repo) in repos {
77
77
+
78
78
+
// Extract (did, pds) pairs for WebSocket tasks before consuming repos
79
79
+
let did_pds_pairs: Vec<_> = repos_with_pds.iter()
80
80
+
.map(|(did, pds, _)| (did.clone(), pds.clone()))
81
81
+
.collect();
82
82
+
83
83
+
// Consume repos_with_pds to add repos to filesystem
84
84
+
for (did, _, repo) in repos_with_pds {
78
85
rt.block_on(fs.add(did, repo))
79
86
}
87
87
+
88
88
+
// get shared state for WebSocket tasks
89
89
+
let (_repos_arc, inodes_arc, sizes_arc, content_cache_arc) = fs.get_shared_state();
80
90
81
91
// mount
82
92
let options = vec![
···
86
96
MountOption::CUSTOM("local".to_string()),
87
97
MountOption::CUSTOM("volname=pdsfs".to_string()),
88
98
];
89
89
-
let join_handle = fuser::spawn_mount2(fs, &mountpoint, &options).unwrap();
99
99
+
100
100
+
// Create session and get notifier for Finder refresh
101
101
+
let session = fuser::Session::new(fs, &mountpoint, &options).unwrap();
102
102
+
let notifier = session.notifier();
103
103
+
let _bg = session.spawn().unwrap();
104
104
+
105
105
+
// spawn WebSocket subscription tasks for each DID using the runtime handle
106
106
+
let rt_handle = rt.handle().clone();
107
107
+
for (did, pds) in did_pds_pairs {
108
108
+
let inodes_clone = Arc::clone(&inodes_arc);
109
109
+
let sizes_clone = Arc::clone(&sizes_arc);
110
110
+
let content_cache_clone = Arc::clone(&content_cache_arc);
111
111
+
let notifier_clone = notifier.clone();
112
112
+
113
113
+
rt_handle.spawn(async move {
114
114
+
if let Err(e) = firehose::subscribe_to_repo::<atrium_repo::blockstore::CarStore<std::io::Cursor<Vec<u8>>>>(
115
115
+
did,
116
116
+
pds,
117
117
+
inodes_clone,
118
118
+
sizes_clone,
119
119
+
content_cache_clone,
120
120
+
notifier_clone,
121
121
+
).await {
122
122
+
eprintln!("WebSocket error: {:?}", e);
123
123
+
}
124
124
+
});
125
125
+
}
90
126
91
127
println!("mounted at {mountpoint:?}");
92
128
print!("hit enter to unmount and exit...");
···
95
131
// Wait for user input
96
132
let mut input = String::new();
97
133
std::io::stdin().read_line(&mut input).unwrap();
98
98
-
99
99
-
join_handle.join();
100
100
-
std::fs::remove_dir(&mountpoint).unwrap();
101
134
102
135
println!("unmounted {mountpoint:?}");
103
136
}