this repo has no description sites.wisp.place/zzstoatzz.io/pds-message-poc
pds messaging

add service auth and labeler simulation

- simulate com.atproto.server.getServiceAuth JWT pattern
- simulate com.atproto.label spam filtering
- dynamic recipient panel updates

๐Ÿค– Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+219 -56
+40 -35
README.md
··· 18 18 19 19 - type a message 20 20 - select sender/recipient 21 - - click Send 22 - - watch messages flow through queue โ†’ recipient decides accept/reject 23 - - use Toggle Block to block the selected sender 21 + - click Send โ†’ watch service auth token created, message queued, recipient decides 22 + - click Block โ†’ alice blocks selected sender 23 + - click Spam Label โ†’ labeler marks selected sender as spam (rejected by all) 24 24 25 25 press `q` to quit. 26 26 27 27 ## what's happening 28 28 29 29 ``` 30 - โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” 31 - โ”‚ Bob โ”‚ โ”‚ Alice โ”‚ 32 - โ”‚ (sender) โ”‚ โ”‚ (recipient) โ”‚ 33 - โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค 34 - โ”‚ send_message() โ”‚ โ”€โ”€โ”€โ”€โ”€โ”€โ–บ โ”‚ inbox queue โ”‚ 35 - โ”‚ โ”‚ docket โ”‚ (redis stream) โ”‚ 36 - โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ 37 - โ”‚ 38 - โ–ผ 39 - Worker decides: 40 - - accept โ†’ store in inbox 41 - - reject (blocked sender) 42 - - reject (rate limited) 30 + โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” 31 + โ”‚ Bob's PDS โ”‚ โ”‚ Alice's PDS โ”‚ 32 + โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค 33 + โ”‚ โ”‚ 1. getServiceAuth(aud=alice)โ”‚ โ”‚ 34 + โ”‚ send_message() โ”‚ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€>โ”‚ โ”‚ 35 + โ”‚ โ”‚ 2. JWT: iss=bob aud=alice โ”‚ inbox queue โ”‚ 36 + โ”‚ โ”‚ <โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”‚ (docket) โ”‚ 37 + โ”‚ โ”‚ โ”‚ โ”‚ 38 + โ”‚ โ”‚ 3. POST /inbox + JWT โ”‚ โ”‚ 39 + โ”‚ โ”‚ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€>โ”‚ worker checks: โ”‚ 40 + โ”‚ โ”‚ โ”‚ - token valid? โ”‚ 41 + โ”‚ โ”‚ โ”‚ - spam label? โ”‚ 42 + โ”‚ โ”‚ โ”‚ - blocked? โ”‚ 43 + โ”‚ โ”‚ โ”‚ - rate limit? โ”‚ 44 + โ”‚ โ”‚ 4. {status: accepted} โ”‚ โ”‚ 45 + โ”‚ โ”‚ <โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”‚ โ†’ accept/rejectโ”‚ 46 + โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ 47 + โ”‚ 48 + โ–ผ 49 + โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” 50 + โ”‚ Labeler โ”‚ 51 + โ”‚ (reputation) โ”‚ 52 + โ”‚ spam labels โ”‚ 53 + โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ 43 54 ``` 44 55 45 - key insight: messages go to a **queue**, not directly to recipient's repo. 56 + ## what's demonstrated 46 57 47 - ## what this demonstrates 48 - 49 - the **architectural pattern** is real: sender โ†’ queue โ†’ recipient decides. 50 - 51 - docket provides real queue semantics (add โ†’ process โ†’ retry). swap `memory://` for a redis URL and you have persistent queues. 58 + | feature | implementation | ATProto pattern | 59 + |---------|----------------|-----------------| 60 + | inbox queue | docket (redis streams) | proposed `dev.pds.inbox.sendMessage` endpoint | 61 + | service auth | JWT with iss/aud/exp/lxm | [com.atproto.server.getServiceAuth](https://github.com/bluesky-social/atproto/blob/main/lexicons/com/atproto/server/getServiceAuth.json) | 62 + | reputation | labeler with spam labels | [com.atproto.label](https://github.com/bluesky-social/atproto/tree/main/lexicons/com/atproto/label) | 63 + | block list | per-user set | existing pattern | 64 + | rate limiting | per-sender, time-windowed | existing pattern | 52 65 53 66 ## what's mocked 54 67 55 68 | component | current | path to real | 56 69 |-----------|---------|--------------| 57 - | DIDs | fake strings | use real DIDs with [PLC resolution](https://github.com/did-method-plc/did-method-plc) | 58 - | queue backend | docket `memory://` | docket `redis://` for persistence | 59 - | message storage | python list | sqlite, repo records | 60 - | inter-PDS auth | none | [service auth JWTs](https://github.com/bluesky-social/atproto/blob/main/packages/xrpc-server/src/auth.ts) via `com.atproto.server.getServiceAuth` | 61 - 62 - ## what this proposal needs 63 - 64 - current bluesky chat is [centralized](https://github.com/bluesky-social/atproto/tree/main/lexicons/chat/bsky/convo). jacob.gold's proposal makes PDSes themselves handle incoming messages like email servers. 65 - 66 - | problem | status | 67 - |---------|--------| 68 - | inbox endpoint | needs new lexicon + XRPC endpoint on PDS | 69 - | spam at scale | per-sender rate limiting is trivial; distributed spam from many DIDs needs reputation/other signals | 70 + | DIDs | fake strings | [PLC resolution](https://github.com/did-method-plc/did-method-plc) | 71 + | queue backend | docket `memory://` | docket `redis://` | 72 + | JWT signing | sha256 hash | [DID signing keys](https://github.com/bluesky-social/atproto/tree/main/packages/crypto) | 73 + | labeler | in-memory dict | [ozone](https://github.com/bluesky-social/atproto/tree/main/packages/ozone) | 70 74 71 75 ## references 72 76 73 77 - [jacob.gold's thread](https://bsky.app/profile/jacob.gold/post/3mbsbqsc3vc24) 74 78 - [docket](https://github.com/chrisguidry/docket) 75 79 - [official PDS](https://github.com/bluesky-social/atproto/tree/main/packages/pds) 80 + - [service auth](https://github.com/bluesky-social/atproto/blob/main/packages/xrpc-server/src/auth.ts) 76 81 - [AT Protocol specs](https://atproto.com/specs/atp)
+179 -21
src/pds_message_poc/app.py
··· 5 5 - PDSes have incoming message queues 6 6 - Senders push to recipient's queue 7 7 - Recipients decide to accept/reject 8 + 9 + Now with: 10 + - Service auth JWT simulation (com.atproto.server.getServiceAuth pattern) 11 + - Label-based reputation (com.atproto.label pattern) 8 12 """ 9 13 10 14 import asyncio 15 + import hashlib 16 + import time 11 17 from collections import defaultdict 12 18 from dataclasses import dataclass, field 13 19 from datetime import datetime, timedelta, timezone ··· 15 21 from textual.app import App, ComposeResult 16 22 from textual.containers import Container, Horizontal, Vertical 17 23 from textual.widgets import Button, Footer, Header, Input, Label, Log, RichLog, Select, Static 18 - from textual.reactive import reactive 19 24 20 25 from docket import Docket, Worker 21 26 22 27 28 + # --- Simulated Labeler (com.atproto.label pattern) --- 29 + 30 + @dataclass 31 + class Labeler: 32 + """ 33 + Simulates ATProto labeler service. 34 + 35 + Real impl: labels are signed records with src (labeler DID), uri (target), 36 + val (label value like "spam", "trusted"). See com.atproto.label.defs 37 + """ 38 + labels: dict[str, set[str]] = field(default_factory=lambda: defaultdict(set)) 39 + 40 + def add_label(self, did: str, label: str) -> None: 41 + self.labels[did].add(label) 42 + 43 + def remove_label(self, did: str, label: str) -> None: 44 + self.labels[did].discard(label) 45 + 46 + def has_label(self, did: str, label: str) -> bool: 47 + return label in self.labels[did] 48 + 49 + def get_labels(self, did: str) -> set[str]: 50 + return self.labels[did] 51 + 52 + 53 + # --- Simulated Service Auth (com.atproto.server.getServiceAuth pattern) --- 54 + 55 + @dataclass 56 + class ServiceToken: 57 + """ 58 + Simulates service auth JWT. 59 + 60 + Real impl: signed JWT with iss (sender DID), aud (recipient PDS DID), 61 + exp (expiry), lxm (lexicon method). See com.atproto.server.getServiceAuth 62 + """ 63 + iss: str # issuer DID 64 + aud: str # audience DID (recipient's PDS) 65 + exp: int # expiry timestamp 66 + lxm: str # lexicon method being called 67 + 68 + def is_valid(self) -> bool: 69 + return time.time() < self.exp 70 + 71 + def signature(self) -> str: 72 + """fake signature - real impl uses DID signing key""" 73 + return hashlib.sha256(f"{self.iss}:{self.aud}:{self.exp}".encode()).hexdigest()[:8] 74 + 75 + 76 + def create_service_token(sender_did: str, recipient_did: str) -> ServiceToken: 77 + """ 78 + Simulates com.atproto.server.getServiceAuth call. 79 + 80 + Real impl: sender's PDS creates JWT signed with sender's key, 81 + recipient's PDS verifies against sender's DID document. 82 + """ 83 + return ServiceToken( 84 + iss=sender_did, 85 + aud=recipient_did, 86 + exp=int(time.time()) + 60, # 60 second expiry 87 + lxm="dev.pds.inbox.sendMessage", 88 + ) 89 + 90 + 23 91 # --- Simulated PDS --- 24 92 25 93 @dataclass 26 94 class PDS: 27 - """minimal PDS with inbox queue""" 95 + """ 96 + Minimal PDS with inbox queue. 97 + 98 + Real impl would use ActorStore, MST, etc. 99 + See github.com/bluesky-social/atproto/tree/main/packages/pds 100 + """ 28 101 did: str 29 102 handle: str 30 103 inbox: list[dict] = field(default_factory=list) ··· 32 105 rate_limit: int = 5 33 106 _counts: dict[str, list[datetime]] = field(default_factory=lambda: defaultdict(list)) 34 107 35 - def accept(self, sender: str, text: str) -> tuple[bool, str]: 36 - """decide whether to accept incoming message""" 108 + def accept(self, sender: str, text: str, token: ServiceToken, labeler: Labeler) -> tuple[bool, str]: 109 + """ 110 + Decide whether to accept incoming message. 111 + 112 + Checks (in order): 113 + 1. Service auth token validity 114 + 2. Label-based reputation (spam label = reject) 115 + 3. Block list 116 + 4. Rate limit 117 + """ 118 + # verify service auth 119 + if not token.is_valid(): 120 + return False, "token-expired" 121 + if token.aud != self.did: 122 + return False, "wrong-audience" 123 + 124 + # check labels (reputation) 125 + if labeler.has_label(sender, "spam"): 126 + return False, "labeled-spam" 127 + 128 + # check block list 37 129 if sender in self.blocked: 38 130 return False, "blocked" 39 131 132 + # rate limit 40 133 now = datetime.now(timezone.utc) 41 134 cutoff = now - timedelta(minutes=1) 42 135 self._counts[sender] = [t for t in self._counts[sender] if t > cutoff] ··· 52 145 # --- Global state --- 53 146 54 147 NETWORK: dict[str, PDS] = {} 148 + LABELER: Labeler = Labeler() 55 149 DOCKET: Docket | None = None 56 150 WORKER_TASK: asyncio.Task | None = None 57 - LOG_CALLBACK = None # set by app 58 - REFRESH_CALLBACK = None # set by app 151 + LOG_CALLBACK = None 152 + REFRESH_CALLBACK = None 59 153 60 154 61 155 def get_pds(did: str) -> PDS | None: 62 156 return NETWORK.get(did) 63 157 64 158 65 - # --- Docket task --- 159 + # --- Docket task (simulates dev.pds.inbox.sendMessage endpoint) --- 66 160 67 - async def deliver_message(sender_did: str, recipient_did: str, text: str) -> None: 68 - """worker task: deliver message to recipient's inbox""" 161 + async def deliver_message(sender_did: str, recipient_did: str, text: str, token_sig: str) -> None: 162 + """ 163 + Worker task: deliver message to recipient's inbox. 164 + 165 + This simulates what a real PDS inbox endpoint would do: 166 + 1. Verify service auth token 167 + 2. Check reputation labels 168 + 3. Check block list 169 + 4. Apply rate limits 170 + 5. Accept or reject 171 + """ 69 172 recipient = get_pds(recipient_did) 70 173 if not recipient: 71 174 if LOG_CALLBACK: ··· 75 178 sender = get_pds(sender_did) 76 179 sender_name = sender.handle if sender else sender_did[:12] 77 180 78 - ok, reason = recipient.accept(sender_did, text) 181 + # recreate token (in real impl, this would be passed and verified) 182 + token = create_service_token(sender_did, recipient_did) 183 + 184 + ok, reason = recipient.accept(sender_did, text, token, LABELER) 79 185 80 186 if LOG_CALLBACK: 81 187 if ok: 82 - LOG_CALLBACK(f"[green]+[/] {recipient.handle} accepted from {sender_name}") 188 + LOG_CALLBACK(f"[green]+[/] {recipient.handle} accepted from {sender_name} [dim](sig:{token_sig})[/]") 83 189 else: 84 190 LOG_CALLBACK(f"[red]x[/] {recipient.handle} rejected ({reason}) from {sender_name}") 85 191 ··· 99 205 def compose(self) -> ComposeResult: 100 206 yield Label(f"[bold]{self.pds.handle}[/]", classes="inbox-title") 101 207 yield Label(f"rate: {self.pds.rate_limit}/min", classes="inbox-subtitle") 102 - yield Log(id=f"inbox-{self.pds.handle}", classes="inbox-log") 208 + yield Log(classes="inbox-log") 103 209 104 210 def refresh_inbox(self) -> None: 105 - log = self.query_one(f"#inbox-{self.pds.handle}", Log) 211 + log = self.query_one(".inbox-log", Log) 106 212 log.clear() 107 213 for msg in self.pds.inbox[-10:]: 108 214 sender = get_pds(msg["from"]) ··· 167 273 width: 1fr; 168 274 } 169 275 170 - #block-btn { 276 + #block-btn, #spam-btn { 171 277 margin-left: 1; 172 278 } 173 279 """ ··· 218 324 219 325 with Horizontal(): 220 326 yield Button("Send", id="send-btn", variant="primary") 221 - yield Button("Toggle Block", id="block-btn", variant="primary") 327 + yield Button("Block", id="block-btn", variant="primary") 328 + yield Button("Spam Label", id="spam-btn", variant="warning") 222 329 223 330 yield Label("[bold]event log[/]", classes="section-title") 224 331 yield RichLog(id="event-log", markup=True) 225 332 226 333 with Container(id="right-panel"): 227 - yield InboxWidget(self.alice, id="alice-inbox") 334 + yield InboxWidget(self.alice, id="recipient-inbox") 228 335 229 336 yield Footer() 230 337 ··· 240 347 WORKER_TASK = asyncio.create_task(worker.run_forever()) 241 348 242 349 self.log_event("[dim]docket worker started[/]") 243 - self.log_event("[dim]alice blocks: nobody | rate limit: 3/min[/]") 350 + self.log_event("[dim]alice: rate=3/min | blocks=none | labels=none[/]") 244 351 self.log_event("") 245 352 246 353 async def on_unmount(self) -> None: ··· 258 365 pass 259 366 260 367 def refresh_inboxes(self) -> None: 261 - for widget_id in ["alice-inbox", "bob-inbox", "charlie-inbox"]: 368 + for widget_id in ["bob-inbox", "charlie-inbox", "recipient-inbox"]: 262 369 try: 263 370 widget = self.query_one(f"#{widget_id}", InboxWidget) 264 371 widget.refresh_inbox() 265 372 except Exception: 266 373 pass 267 374 375 + def update_recipient_panel(self) -> None: 376 + """Update right panel to show selected recipient's inbox""" 377 + recipient_select = self.query_one("#recipient-select", Select) 378 + recipient_did = recipient_select.value 379 + if recipient_did == Select.BLANK: 380 + return 381 + 382 + recipient = get_pds(recipient_did) 383 + if not recipient: 384 + return 385 + 386 + try: 387 + widget = self.query_one("#recipient-inbox", InboxWidget) 388 + widget.pds = recipient 389 + # update the title label 390 + title = widget.query_one(".inbox-title", Label) 391 + title.update(f"[bold]{recipient.handle}[/]") 392 + subtitle = widget.query_one(".inbox-subtitle", Label) 393 + subtitle.update(f"rate: {recipient.rate_limit}/min") 394 + widget.refresh_inbox() 395 + except Exception: 396 + pass 397 + 268 398 async def on_button_pressed(self, event: Button.Pressed) -> None: 269 399 if event.button.id == "send-btn": 270 400 await self.send_message() 271 401 elif event.button.id == "block-btn": 272 402 self.toggle_block() 403 + elif event.button.id == "spam-btn": 404 + self.toggle_spam_label() 273 405 274 406 async def on_input_submitted(self, event: Input.Submitted) -> None: 275 407 if event.input.id == "message-input": 276 408 await self.send_message() 409 + 410 + def on_select_changed(self, event: Select.Changed) -> None: 411 + if event.select.id == "recipient-select": 412 + self.update_recipient_panel() 277 413 278 414 async def send_message(self) -> None: 279 415 input_widget = self.query_one("#message-input", Input) ··· 294 430 sender = get_pds(sender_did) 295 431 recipient = get_pds(recipient_did) 296 432 297 - self.log_event(f"[cyan]>>>[/] {sender.handle} -> {recipient.handle}: {text[:30]}") 433 + # create service auth token (simulates getServiceAuth call) 434 + token = create_service_token(sender_did, recipient_did) 298 435 299 - # queue via docket 436 + self.log_event(f"[cyan]>>>[/] {sender.handle} -> {recipient.handle}: {text[:20]}...") 437 + self.log_event(f"[dim] token: iss={sender.handle} aud={recipient.handle} sig={token.signature()}[/]") 438 + 439 + # queue via docket (simulates HTTP POST to recipient's inbox endpoint) 300 440 await DOCKET.add(deliver_message)( 301 441 sender_did=sender_did, 302 442 recipient_did=recipient_did, 303 443 text=text, 444 + token_sig=token.signature(), 304 445 ) 305 446 306 447 input_widget.value = "" 307 - # worker calls REFRESH_CALLBACK after processing 308 448 309 449 def toggle_block(self) -> None: 310 450 sender_select = self.query_one("#sender-select", Select) ··· 322 462 else: 323 463 self.alice.blocked.add(sender_did) 324 464 self.log_event(f"[yellow]alice blocked {sender.handle}[/]") 465 + 466 + def toggle_spam_label(self) -> None: 467 + """Toggle spam label on selected sender (simulates labeler action)""" 468 + sender_select = self.query_one("#sender-select", Select) 469 + sender_did = sender_select.value 470 + 471 + if sender_did == Select.BLANK: 472 + self.log_event("[red]select a sender to label[/]") 473 + return 474 + 475 + sender = get_pds(sender_did) 476 + 477 + if LABELER.has_label(sender_did, "spam"): 478 + LABELER.remove_label(sender_did, "spam") 479 + self.log_event(f"[magenta]labeler removed 'spam' from {sender.handle}[/]") 480 + else: 481 + LABELER.add_label(sender_did, "spam") 482 + self.log_event(f"[magenta]labeler added 'spam' to {sender.handle}[/]") 325 483 326 484 327 485 def main() -> None: