social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

split streams

zenfyr.dev 6f8993c0 d58e914e

verified
+98 -31
+52 -8
bluesky/input.py
··· 1 1 import asyncio 2 + import json 2 3 import re 3 4 from abc import ABC 4 5 from dataclasses import dataclass, field 5 - from typing import Any, Callable, override 6 + from typing import Any, cast, override 6 7 7 8 import websockets 8 9 9 10 from bluesky.info import SERVICE, BlueskyService, validate_and_transform 10 - from cross.service import InputService, OutputService 11 + from cross.service import InputService 11 12 from database.connection import DatabasePool 12 13 from util.util import LOGGER, normalize_service_url 13 14 ··· 48 49 def __init__(self, db: DatabasePool) -> None: 49 50 super().__init__(SERVICE, db) 50 51 52 + def _on_post(self, record: dict[str, Any]): 53 + LOGGER.info(record) # TODO 54 + 55 + def _on_repost(self, record: dict[str, Any]): 56 + LOGGER.info(record) # TODO 57 + 58 + def _on_delete_post(self, post_id: str, repost: bool): 59 + LOGGER.info("%s | %s", post_id, repost) # TODO 60 + 51 61 52 62 class BlueskyJetstreamInputService(BlueskyBaseInputService): 53 63 def __init__(self, db: DatabasePool, options: BlueskyJetstreamInputOptions) -> None: ··· 59 69 def get_identity_options(self) -> tuple[str | None, str | None, str | None]: 60 70 return (self.options.handle, self.options.did, self.options.pds) 61 71 72 + def _accept_msg(self, msg: websockets.Data) -> None: 73 + data: dict[str, Any] = cast(dict[str, Any], json.loads(msg)) 74 + if data.get("did") != self.did: 75 + return 76 + commit: dict[str, Any] | None = data.get("commit") 77 + if not commit: 78 + return 79 + 80 + commit_type: str = cast(str, commit["operation"]) 81 + match commit_type: 82 + case "create": 83 + record: dict[str, Any] = cast(dict[str, Any], commit["record"]) 84 + record["$xpost.strongRef"] = { 85 + "cid": commit["cid"], 86 + "uri": f"at://{self.did}/{commit['collection']}/{commit['rkey']}", 87 + } 88 + 89 + match cast(str, commit["collection"]): 90 + case "app.bsky.feed.post": 91 + self._on_post(record) 92 + case "app.bsky.feed.repost": 93 + self._on_repost(record) 94 + case _: 95 + pass 96 + case "delete": 97 + post_id: str = ( 98 + f"at://{self.did}/{commit['collection']}/{commit['rkey']}" 99 + ) 100 + match cast(str, commit["collection"]): 101 + case "app.bsky.feed.post": 102 + self._on_delete_post(post_id, False) 103 + case "app.bsky.feed.repost": 104 + self._on_delete_post(post_id, True) 105 + case _: 106 + pass 107 + case _: 108 + pass 109 + 62 110 @override 63 - async def listen( 64 - self, 65 - outputs: list[OutputService], 66 - submitter: Callable[[Callable[[], None]], None], 67 - ): 111 + async def listen(self): 68 112 url = self.options.jetstream + "?" 69 113 url += "wantedCollections=app.bsky.feed.post" 70 114 url += "&wantedCollections=app.bsky.feed.repost" ··· 76 120 77 121 async def listen_for_messages(): 78 122 async for msg in ws: 79 - LOGGER.info(msg) # TODO 123 + self.submitter(lambda: self._accept_msg(msg)) 80 124 81 125 listen = asyncio.create_task(listen_for_messages()) 82 126
+5 -6
cross/service.py
··· 11 11 def __init__(self, url: str, db: DatabasePool) -> None: 12 12 self.url: str = url 13 13 self.db: DatabasePool = db 14 - #self._lock: threading.Lock = threading.Lock() 14 + # self._lock: threading.Lock = threading.Lock() 15 15 16 16 def get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None: 17 17 cursor = self.db.get_conn().cursor() ··· 52 52 53 53 54 54 class InputService(ABC, Service): 55 + outputs: list[OutputService] 56 + submitter: Callable[[Callable[[], None]], None] 57 + 55 58 @abstractmethod 56 - async def listen( 57 - self, 58 - outputs: list[OutputService], 59 - submitter: Callable[[Callable[[], None]], None], 60 - ): 59 + async def listen(self): 61 60 pass
+3 -1
main.py
··· 76 76 thread.start() 77 77 78 78 LOGGER.info("Connecting to %s...", input.url) 79 + input.outputs = outputs 80 + input.submitter = lambda c: task_queue.put(c) 79 81 try: 80 - asyncio.run(input.listen(outputs, lambda c: task_queue.put(c))) 82 + asyncio.run(input.listen()) 81 83 except KeyboardInterrupt: 82 84 LOGGER.info("Stopping...") 83 85
+21 -8
mastodon/input.py
··· 1 1 import asyncio 2 + import json 2 3 import re 3 4 from dataclasses import dataclass, field 4 - from typing import Any, Callable, override 5 + from typing import Any, cast, override 5 6 6 7 import websockets 7 8 8 - from cross.service import InputService, OutputService 9 + from cross.service import InputService 9 10 from database.connection import DatabasePool 10 11 from mastodon.info import MastodonService, validate_and_transform 11 12 from util.util import LOGGER ··· 54 55 def _get_token(self) -> str: 55 56 return self.options.token 56 57 58 + def _on_create_post(self, status: dict[str, Any]): 59 + LOGGER.info(status) # TODO 60 + 61 + def _on_delete_post(self, status_id: str): 62 + LOGGER.info(status_id) # TODO 63 + 64 + def _accept_msg(self, msg: websockets.Data) -> None: 65 + data: dict[str, Any] = cast(dict[str, Any], json.loads(msg)) 66 + event: str = cast(str, data['event']) 67 + payload: str = cast(str, data['payload']) 68 + 69 + if event == "update": 70 + self._on_create_post(json.loads(payload)) 71 + elif event == "delete": 72 + self._on_delete_post(payload) 73 + 57 74 @override 58 - async def listen( 59 - self, 60 - outputs: list[OutputService], 61 - submitter: Callable[[Callable[[], None]], None], 62 - ): 75 + async def listen(self): 63 76 url = f"{self.streaming_url}/api/v1/streaming?stream=user" 64 77 65 78 async for ws in websockets.connect( ··· 70 83 71 84 async def listen_for_messages(): 72 85 async for msg in ws: 73 - LOGGER.info(msg) # TODO 86 + self.submitter(lambda: self._accept_msg(msg)) 74 87 75 88 listen = asyncio.create_task(listen_for_messages()) 76 89
+17 -8
misskey/input.py
··· 3 3 import re 4 4 import uuid 5 5 from dataclasses import dataclass, field 6 - from typing import Any, Callable, override 6 + from typing import Any, cast, override 7 7 8 8 import websockets 9 9 10 - from cross.service import InputService, OutputService 10 + from cross.service import InputService 11 11 from database.connection import DatabasePool 12 12 from misskey.info import MisskeyService 13 13 from util.util import LOGGER, normalize_service_url ··· 52 52 def _get_token(self) -> str: 53 53 return self.options.token 54 54 55 + def _on_note(self, note: dict[str, Any]): 56 + LOGGER.info(note) # TODO 57 + 58 + def _accept_msg(self, msg: websockets.Data) -> None: 59 + data: dict[str, Any] = cast(dict[str, Any], json.loads(msg)) 60 + 61 + if data["type"] == "channel": 62 + type: str = cast(str, data["body"]["type"]) 63 + if type == "note" or type == "reply": 64 + note_body = data["body"]["body"] 65 + self._on_note(note_body) 66 + return 67 + 55 68 async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None: 56 69 await ws.send( 57 70 json.dumps( ··· 64 77 LOGGER.info("Subscribed to 'homeTimeline' channel...") 65 78 66 79 @override 67 - async def listen( 68 - self, 69 - outputs: list[OutputService], 70 - submitter: Callable[[Callable[[], None]], None], 71 - ): 80 + async def listen(self): 72 81 streaming: str = f"{'wss' if self.url.startswith('https') else 'ws'}://{self.url.split('://', 1)[1]}" 73 82 url: str = f"{streaming}/streaming?i={self.options.token}" 74 83 ··· 79 88 80 89 async def listen_for_messages(): 81 90 async for msg in ws: 82 - LOGGER.info(msg) # TODO 91 + self.submitter(lambda: self._accept_msg(msg)) 83 92 84 93 listen = asyncio.create_task(listen_for_messages()) 85 94