social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

refactor: use a result class to wrap and pass errors

zenfyr.dev ad62d8ff 46d54aea

verified
+153 -99
+8 -6
atproto/models.py
··· 3 3 from dataclasses import dataclass, field 4 4 from typing import Any 5 5 6 + from util.util import Result 7 + 6 8 7 9 URI = "at://" 8 10 URI_LEN = len(URI) 9 11 10 12 11 - def cid_from_json(data: str | None) -> str | None: 12 - if not data: 13 - return None 13 + def cid_from_json(data: str | None) -> Result[str, str]: 14 + if data is None: 15 + return Result.err("Expected json, got None") 14 16 15 17 try: 16 - return str(json.loads(data)["cid"]) 17 - except (json.JSONDecodeError, AttributeError, KeyError): 18 - return None 18 + return Result.ok(str(json.loads(data)["cid"])) 19 + except (json.JSONDecodeError, AttributeError, KeyError) as e: 20 + return Result.err(str(e)) 19 21 20 22 21 23 class AtUri:
+28 -18
bluesky/input.py
··· 14 14 from bluesky.info import SERVICE, BlueskyService, validate_and_transform 15 15 from bluesky.richtext import richtext_to_tokens 16 16 from cross.attachments import ( 17 + Attachment, 17 18 LabelsAttachment, 18 19 LanguagesAttachment, 19 20 MediaAttachment, ··· 24 25 from cross.post import Post, PostRef 25 26 from cross.service import InputService 26 27 from database.connection import DatabasePool 28 + from util.util import Result 27 29 28 30 29 31 @dataclass(kw_only=True) ··· 88 90 ) 89 91 90 92 embed: dict[str, Any] = record.get("embed", {}) 91 - blob_urls: list[tuple[str, str, str | None]] = [] 93 + 94 + def handle_embeds( 95 + embed: dict[str, Any], 96 + ) -> Result[tuple[list[tuple[str, str, str | None]], list[Attachment]], str]: 97 + attachments: list[Attachment] = [] 98 + blob_urls: list[tuple[str, str, str | None]] = [] 92 99 93 - def handle_embeds(embed: dict[str, Any]) -> str | None: 94 - nonlocal blob_urls, post 95 100 match cast(str, embed["$type"]): 96 101 case "app.bsky.embed.record" | "app.bsky.embed.recordWithMedia": 97 102 rcrd = ( ··· 101 106 ) 102 107 did, collection, _ = AtUri.record_uri(rcrd["uri"]) 103 108 if collection != "app.bsky.feed.post": 104 - return f"unhandled record collection '{collection}'" 109 + return Result.err(f"unhandled record collection '{collection}'") 105 110 if did != self.did: 106 - return "" 111 + return Result.err(f"quote of other user '{did}'") 107 112 108 113 rquote = self._get_post(self.url, did, rcrd["uri"]) 109 114 if not rquote: 110 - return f"quote '{rcrd['uri']}' not found in db" 111 - post.attachments.put( 115 + return Result.err(f"quote '{rcrd['uri']}' not found in db") 116 + 117 + attachments.append( 112 118 QuoteAttachment(quoted_id=rcrd["uri"], quoted_user=did) 113 119 ) 114 - 115 120 if embed.get("media"): 116 121 return handle_embeds(embed["media"]) 117 122 case "app.bsky.embed.images": ··· 125 130 blob_urls.append((url, blob_cid, embed.get("alt"))) 126 131 case _: 127 132 self.log.warning(f"unhandled embed type '{embed['$type']}'") 128 - return None 133 + return Result.ok((blob_urls, attachments)) 129 134 130 - if embed: 131 - fexit = handle_embeds(embed) 132 - if fexit is not None: 133 - self.log.info("Skipping '%s': %s", post_uri, fexit) 134 - return 135 + embeds = handle_embeds(embed) 136 + if not embeds.is_ok(): 137 + self.log.info("Skipping '%s': %s", post_uri, embeds.error()) 138 + return 139 + 140 + blob_urls, attachments = embeds.value() 141 + for a in attachments: 142 + post.attachments.put(a) 135 143 136 144 if blob_urls: 137 145 blobs: list[Blob] = [] 138 146 for url, cid, alt in blob_urls: 139 147 self.log.info("Downloading '%s'...", cid) 140 - blob: Blob | None = download_blob(url, alt, client=self.http) 141 - if not blob: 148 + blob = download_blob(url, alt, client=self.http) 149 + if not blob.is_ok(): 142 150 self.log.error( 143 - "Skipping '%s': failed to download blob '%s'", post_uri, cid 151 + "Skipping '%s': failed to download blob. %s", 152 + post_uri, 153 + blob.error(), 144 154 ) 145 155 return 146 - blobs.append(blob) 156 + blobs.append(blob.value()) 147 157 post.attachments.put(MediaAttachment(blobs=blobs)) 148 158 149 159 if "langs" in record:
+25 -13
bluesky/output.py
··· 222 222 root_cid = cid_from_json(root_post["extra_data"]) 223 223 reply_cid = cid_from_json(reply_post["extra_data"]) 224 224 225 - if not root_cid or not reply_cid: 226 - self.log.error("Skipping '%s': failed to parse CID from db", post.id) 225 + if not root_cid.is_ok(): 226 + self.log.error( 227 + "Skipping '%s': failed to parse CID. %s", post.id, root_cid.error() 228 + ) 229 + return 230 + if not reply_cid.is_ok(): 231 + self.log.error( 232 + "Skipping '%s': failed to parse CID. %s", post.id, reply_cid.error() 233 + ) 227 234 return 228 235 229 - root_ref = StrongRef(uri=root_uri, cid=root_cid) 230 - reply_ref = StrongRef(uri=reply_uri, cid=reply_cid) 236 + root_ref = StrongRef(uri=root_uri, cid=root_cid.value()) 237 + reply_ref = StrongRef(uri=reply_uri, cid=reply_cid.value()) 231 238 reply_to = ReplyRef(root=root_ref, parent=reply_ref) 232 239 233 240 labels_attachment = post.attachments.get(LabelsAttachment) ··· 314 321 ) 315 322 return 316 323 317 - quoted_cid = cid_from_json(quoted_mappings[0]["extra_data"]) 318 - if not quoted_cid: 319 - self.log.error("Skipping '%s': failed to parse CID from db", post.id) 324 + quoted_result = cid_from_json(quoted_mappings[0]["extra_data"]) 325 + if not quoted_result.is_ok(): 326 + self.log.error( 327 + "Skipping '%s': failed to parse CID. %s", 328 + post.id, 329 + quoted_result.error(), 330 + ) 320 331 return 321 332 333 + quoted_cid = quoted_result.value() 322 334 quoted_uri = quoted_mappings[0]["identifier"] 323 335 324 336 splitter = TokenSplitter(max_chars=300, max_link_len=30) 325 337 token_blocks = splitter.split(tokens) 326 338 327 - if token_blocks is None: 328 - self.log.error("Skipping '%s': links/tags are too long", post.id) 339 + if not token_blocks.is_ok(): 340 + self.log.error("Skipping '%s': %s", post.id, token_blocks.error()) 329 341 return 330 342 331 343 for blob in supported_media: ··· 350 362 return 351 363 352 364 baked_media = self._split_media_per_post( 353 - [list(block) for block in token_blocks], 365 + [list(block) for block in token_blocks.value()], 354 366 supported_media, 355 367 ) 356 368 357 369 precomputed_richtexts: list[tuple[str, list[Facet]]] = [] 358 - for block in token_blocks: 370 + for block in token_blocks.value(): 359 371 result = tokens_to_richtext(block) 360 372 if result is None: 361 373 self.log.error( ··· 555 567 return 556 568 557 569 cid = cid_from_json(mappings[0]["extra_data"]) 558 - if not cid: 570 + if not cid.is_ok(): 559 571 self.log.exception( 560 572 "Skipping repost '%s': failed to parse CID from extra_data", repost.id 561 573 ) 562 574 return 563 575 564 - response = self._client.repost(mappings[0]["identifier"], cid) 576 + response = self._client.repost(mappings[0]["identifier"], cid.value()) 565 577 566 578 self._insert_post( 567 579 {
+10 -8
cross/media.py
··· 9 9 import httpx 10 10 import magic 11 11 12 + from util.util import Result 13 + 12 14 13 15 FILENAME = re.compile(r'filename="?([^\";]*)"?') 14 16 MAGIC = magic.Magic(mime=True) ··· 42 44 alt: str | None = None, 43 45 max_bytes: int = 100_000_000, 44 46 client: httpx.Client | None = None, 45 - ) -> Blob | None: 47 + ) -> Result[Blob, str]: 46 48 name = get_filename_from_url(url, client) 47 49 io = download_chuncked(url, max_bytes, client) 48 - if not io: 49 - return None 50 - return Blob(url, mime_from_bytes(io), io, name, alt) 50 + if not io.is_ok(): 51 + return Result.err(io.error()) 52 + return Result.ok(Blob(url, mime_from_bytes(io.value()), io.value(), name, alt)) 51 53 52 54 53 55 def download_chuncked( 54 56 url: str, max_bytes: int = 100_000_000, client: httpx.Client | None = None 55 - ) -> bytes | None: 57 + ) -> Result[bytes, str]: 56 58 if client is None: 57 59 client = httpx.Client() 58 60 with client.stream("GET", url, timeout=20) as response: 59 61 if response.status_code != 200: 60 - return None 62 + return Result.err(f"HTTP {response.status_code}: {response.text}") 61 63 62 64 downloaded_bytes = b"" 63 65 current_size = 0 ··· 68 70 69 71 current_size += len(chunk) 70 72 if current_size > max_bytes: 71 - return None 73 + return Result.err(f"'{url}' larger than max_bytes ({max_bytes})") 72 74 73 75 downloaded_bytes += chunk 74 76 75 - return downloaded_bytes 77 + return Result.ok(downloaded_bytes) 76 78 77 79 78 80 def get_filename_from_url(url: str, client: httpx.Client | None = None) -> str:
+5 -7
mastodon/input.py
··· 155 155 blobs: list[Blob] = [] 156 156 for media in status.get("media_attachments", []): 157 157 self.log.info("Downloading '%s'...", media["url"]) 158 - blob: Blob | None = download_blob( 159 - media["url"], media.get("alt"), client=self.http 160 - ) 161 - if not blob: 158 + blob = download_blob(media["url"], media.get("alt"), client=self.http) 159 + if not blob.is_ok(): 162 160 self.log.error( 163 - "Skipping '%s': failed to download attachment '%s'", 161 + "Skipping '%s': failed to download attachment. %s", 164 162 status["id"], 165 - media["url"], 163 + blob.value(), 166 164 ) 167 165 return 168 - blobs.append(blob) 166 + blobs.append(blob.value()) 169 167 170 168 if blobs: 171 169 post.attachments.put(MediaAttachment(blobs=blobs))
+39 -38
mastodon/output.py
··· 19 19 from database.connection import DatabasePool 20 20 from mastodon.info import InstanceInfo, MastodonService, validate_and_transform 21 21 from util.splitter import TokenSplitter, canonical_label 22 + from util.util import Result 22 23 23 24 24 25 ALLOWED_POSTING_VISIBILITY: list[str] = ["public", "unlisted", "private"] ··· 65 66 response = self.fetch_instance_info() 66 67 self.instance_info: InstanceInfo = InstanceInfo.from_api(response) 67 68 68 - def _token_to_string(self, tokens: list[Token]) -> str | None: 69 + def _token_to_string(self, tokens: list[Token]) -> Result[str, str]: 69 70 text: str = "" 70 71 for token in tokens: 71 72 match token: ··· 91 92 else: 92 93 text += token.href 93 94 else: 94 - return None 95 - return text 95 + return Result.err( 96 + f"unsupported instance text format '{self.instance_info.text_format}'" 97 + ) 98 + case _: 99 + return Result.err( 100 + f"unsupported token type '{type(token).__name__}'" 101 + ) 102 + 103 + return Result.ok(text) 96 104 97 105 def _split_tokens_and_media( 98 106 self, 99 107 tokens: list[Token], 100 108 media: list[Blob], 101 - ) -> list[tuple[str, list[Blob]]] | None: 109 + ) -> Result[list[tuple[str, list[Blob]]], str]: 102 110 splitter = TokenSplitter( 103 111 max_chars=self.instance_info.max_characters, 104 112 max_link_len=self.instance_info.characters_reserved_per_url, 105 113 ) 106 114 split_token_blocks = splitter.split(tokens) 107 115 108 - if split_token_blocks is None: 109 - return None 116 + if not split_token_blocks.is_ok(): 117 + return Result.err(split_token_blocks.error()) 110 118 111 119 post_texts: list[str] = [] 112 - for block in split_token_blocks: 120 + for block in split_token_blocks.value(): 113 121 baked_text = self._token_to_string(block) 114 - if baked_text is None: 115 - return None 116 - post_texts.append(baked_text) 122 + if not baked_text.is_ok(): 123 + return Result.err(baked_text.error()) 124 + post_texts.append(baked_text.value()) 117 125 118 126 if not post_texts: 119 127 post_texts = [""] ··· 123 131 ] 124 132 available_indices: list[int] = list(range(len(posts))) 125 133 current_image_post_idx: int | None = None 126 - # video_post_idx: int | None = None 127 134 128 135 def make_blank_post() -> dict[str, Any]: 129 136 return {"text": "", "attachments": []} ··· 155 162 result: list[tuple[str, list[Blob]]] = [] 156 163 for p in posts: 157 164 result.append((p["text"], p["attachments"])) 158 - return result 165 + return Result.ok(result) 159 166 160 - def _upload_media(self, attachments: list[Blob]) -> list[str] | None: 167 + def _upload_media(self, attachments: list[Blob]) -> Result[list[str], str]: 161 168 for blob in attachments: 162 169 if ( 163 170 blob.mime.startswith("image/") 164 171 and len(blob.io) > self.instance_info.image_size_limit 165 172 ): 166 - self.log.error( 167 - "Image too large: %s bytes (limit: %s)", 168 - len(blob.io), 169 - self.instance_info.image_size_limit, 173 + return Result.err( 174 + f"image too large: {len(blob.io)} bytes (limit: {self.instance_info.image_size_limit})" 170 175 ) 171 - return None 172 176 if ( 173 177 blob.mime.startswith("video/") 174 178 and len(blob.io) > self.instance_info.video_size_limit 175 179 ): 176 - self.log.error( 177 - "Video too large: %s bytes (limit: %s)", 178 - len(blob.io), 179 - self.instance_info.video_size_limit, 180 + return Result.err( 181 + f"video too large: {len(blob.io)} bytes (limit: {self.instance_info.video_size_limit})" 180 182 ) 181 - return None 182 183 if ( 183 184 not blob.mime.startswith(("image/", "video/")) 184 185 and len(blob.io) > 7_000_000 185 186 ): 186 - self.log.error("File too large: %s bytes", len(blob.io)) 187 - return None 187 + return Result.err( 188 + f"file too large: {len(blob.io)} bytes (limit: 7000000)" 189 + ) 188 190 189 191 uploads: list[MediaUploadResult] = [] 190 192 ··· 244 246 continue 245 247 response.raise_for_status() 246 248 247 - return [result.id for result in uploads] 249 + return Result.ok([result.id for result in uploads]) 248 250 249 251 @override 250 252 def accept_post(self, post: Post): ··· 321 323 media_blobs = media_attachment.blobs if media_attachment else [] 322 324 323 325 raw_statuses = self._split_tokens_and_media(post_tokens, media_blobs) 324 - if not raw_statuses: 325 - self.log.error("Skipping '%s': couldn't split post into statuses", post.id) 326 + if not raw_statuses.is_ok(): 327 + self.log.error("Skipping '%s': %s", post.id, raw_statuses.error()) 326 328 return 327 329 328 - baked_statuses: list[tuple[str, list[str] | None]] = [] 329 - for status_text, raw_media in raw_statuses: 330 - media_ids: list[str] | None = None 330 + baked_statuses: list[tuple[str, list[str]]] = [] 331 + for status_text, raw_media in raw_statuses.value(): 331 332 if raw_media: 332 - media_ids = self._upload_media(raw_media) 333 - if not media_ids: 334 - self.log.error( 335 - "Skipping '%s': failed to upload attachments", post.id 336 - ) 333 + baked_media = self._upload_media(raw_media) 334 + if not baked_media.is_ok(): 335 + self.log.error("Skipping '%s': %s", post.id, baked_media.error()) 337 336 return 338 - baked_statuses.append((status_text, media_ids)) 337 + baked_statuses.append((status_text, baked_media.value())) 338 + else: 339 + baked_statuses.append((status_text, [])) 339 340 340 341 created_statuses: list[str] = [] 341 342 payload_sensitive = sensitive.sensitive if sensitive else False ··· 343 344 for i, (status_text, media_ids) in enumerate(baked_statuses): 344 345 payload: dict[str, Any] = { 345 346 "status": status_text or "", 346 - "media_ids": media_ids or [], 347 + "media_ids": media_ids, 347 348 "visibility": self.options.visibility, 348 349 "content_type": self.instance_info.text_format, 349 350 "language": lang,
+5 -5
misskey/input.py
··· 154 154 blobs: list[Blob] = [] 155 155 for media in note.get("files", []): 156 156 self.log.info("Downloading '%s'...", media["url"]) 157 - blob: Blob | None = download_blob( 157 + blob = download_blob( 158 158 media["url"], media.get("comment", ""), client=self.http 159 159 ) 160 - if not blob: 160 + if not blob.is_ok(): 161 161 self.log.error( 162 - "Skipping '%s': failed to download media '%s'.", 162 + "Skipping '%s': failed to download media. %s", 163 163 note["id"], 164 - media["url"], 164 + blob.error(), 165 165 ) 166 166 return 167 - blobs.append(blob) 167 + blobs.append(blob.value()) 168 168 169 169 if blobs: 170 170 post.attachments.put(MediaAttachment(blobs=blobs))
+4 -3
util/splitter.py
··· 4 4 import grapheme 5 5 6 6 from cross.tokens import LinkToken, TagToken, TextToken, Token 7 + from util.util import Result 7 8 8 9 9 10 def canonical_label(label: str | None, href: str): ··· 178 179 self.best_split_idx = (len(self.current_block) - 1, self.current_length, 0) 179 180 return True 180 181 181 - def split(self, tokens: list[Token]) -> list[list[Token]] | None: 182 + def split(self, tokens: list[Token]) -> Result[list[list[Token]], str]: 182 183 for token in tokens: 183 184 if not self._process_token(token): 184 - return None 185 + return Result.err("token larger than character limit") 185 186 186 187 self._save_block() 187 - return self.blocks 188 + return Result.ok(self.blocks)
+29 -1
util/util.py
··· 2 2 import os 3 3 import sys 4 4 from collections.abc import Callable 5 - from typing import Any 5 + from typing import Any, cast 6 6 7 7 import env 8 8 ··· 13 13 logging.getLogger("httpx").setLevel(logging.WARNING) 14 14 logging.getLogger("httpcore").setLevel(logging.WARNING) 15 15 LOGGER = logging.getLogger("XPost") 16 + 17 + 18 + class Result[V, E]: 19 + _value: V 20 + _error: E | None 21 + 22 + def __init__(self, value: V, err: E) -> None: 23 + self._value = value 24 + self._error = err 25 + 26 + def error(self) -> E: 27 + if self._error is None: 28 + raise ValueError("self._error not set!") 29 + return self._error 30 + 31 + def value(self) -> V: 32 + return self._value 33 + 34 + def is_ok(self) -> bool: 35 + return self._error is None 36 + 37 + @classmethod 38 + def err(cls, err: E) -> "Result[V, E]": 39 + return cast("Result[V, E]", Result(None, err)) 40 + 41 + @classmethod 42 + def ok(cls, val: V) -> "Result[V, E]": 43 + return cast("Result[V, E]", Result(val, None)) 16 44 17 45 18 46 def normalize_service_url(url: str) -> str: