social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

fix mypy errors

zenfyr.dev aa2d8e3b a9e5150e

verified
+28 -18
+1
.tangled/workflows/run-tests.yml
··· 7 7 dependencies: 8 8 nixpkgs: 9 9 - uv 10 + - ruff 10 11 - python312 11 12 12 13 steps:
+8 -2
atproto/identity.py
··· 51 51 return None # tombstone or not registered 52 52 else: 53 53 response.raise_for_status() 54 + return None 54 55 55 56 def try_resolve_web(self, did: str) -> DidDocument | None: 56 57 url = f"http://{did[len('did:web:') :]}/.well-known/did.json" ··· 62 63 return None # tombstone or gone 63 64 else: 64 65 response.raise_for_status() 66 + return None 65 67 66 68 def resolve_did(self, did: str) -> DidDocument: 67 69 cached = self.__cache.get(did) ··· 79 81 self.__cache.set(did, from_web) 80 82 return from_web 81 83 raise Exception(f"Failed to resolve {did}!") 84 + return None # mypy 82 85 83 86 @override 84 87 def dump_cache(self, path: Path): ··· 101 104 for txt_data in rdata.strings: 102 105 did = txt_data.decode("utf-8").strip() 103 106 if did.startswith("did="): 104 - return did[4:] 107 + return str(did[4:]) 105 108 except dns.resolver.NXDOMAIN: 106 109 LOGGER.debug(f"DNS record not found for _atproto.{handle}") 107 110 return None 108 111 except dns.resolver.NoAnswer: 109 112 LOGGER.debug(f"No TXT records found for _atproto.{handle}") 110 113 return None 114 + return None 111 115 112 116 def try_resolve_http(self, handle: str) -> str | None: 113 117 url = f"http://{handle}/.well-known/atproto-did" 114 118 response = requests.get(url, timeout=10, allow_redirects=True) 115 119 116 120 if response.status_code == 200: 117 - did = response.text.strip() 121 + did = str(response.text.strip()) 118 122 if did.startswith("did:"): 119 123 return did 120 124 else: 121 125 raise ValueError(f"Got invalid did: from {url} = {did}!") 122 126 else: 123 127 response.raise_for_status() 128 + return None 124 129 125 130 def resolve_handle(self, handle: str) -> str: 126 131 cached = self.__cache.get(handle) ··· 138 143 return from_http 139 144 140 145 raise Exception(f"Failed to resolve handle {handle}!") 146 + return None # mypy 141 147 142 148 @override 143 149 def dump_cache(self, path: Path):
+2
bluesky/input.py
··· 86 86 87 87 embed: dict[str, Any] = record.get("embed", {}) 88 88 blob_urls: list[tuple[str, str, str | None]] = [] 89 + 89 90 def handle_embeds(embed: dict[str, Any]) -> str | None: 90 91 nonlocal blob_urls, post 91 92 match cast(str, embed["$type"]): ··· 115 116 blob_urls.append((url, blob_cid, embed.get("alt"))) 116 117 case _: 117 118 self.log.warning(f"Unhandled embed type {embed['$type']}") 119 + return None 118 120 119 121 if embed: 120 122 fexit = handle_embeds(embed)
+4 -4
cross/media.py
··· 34 34 mime = MAGIC.from_buffer(io) 35 35 if not mime: 36 36 mime = "application/octet-stream" 37 - return mime 37 + return str(mime) 38 38 39 39 def download_blob(url: str, alt: str | None = None, max_bytes: int = 100_000_000) -> Blob | None: 40 40 name = get_filename_from_url(url) ··· 72 72 if disposition: 73 73 filename = FILENAME.findall(disposition) 74 74 if filename: 75 - return filename[0] 75 + return str(filename[0]) 76 76 except requests.RequestException: 77 77 pass 78 78 ··· 83 83 if base_name == "com.atproto.sync.getBlob": 84 84 qs = urllib.parse.parse_qs(parsed_url.query) 85 85 if qs and qs.get("cid"): 86 - return qs["cid"][0] 86 + return str(qs["cid"][0]) 87 87 88 88 return base_name 89 89 ··· 155 155 if proc.returncode != 0: 156 156 raise RuntimeError(f"ffprobe failed: {proc.stderr.decode()}") 157 157 158 - return json.loads(proc.stdout) 158 + return json.loads(proc.stdout) # type: ignore[no-any-return] 159 159 160 160 161 161 def get_media_meta(bytes: bytes) -> MediaInfo:
+3 -2
database/connection.py
··· 13 13 if getattr(self._local, 'conn', None) is None: 14 14 self._local.conn = get_conn(self.db) 15 15 self._conns.append(self._local.conn) 16 - return self._local.conn 16 + return self._local.conn # type: ignore[no-any-return] 17 17 18 - def close(self): 18 + def close(self) -> None: 19 19 for c in self._conns: 20 20 c.close() 21 + 21 22 22 23 def get_conn(db: Path) -> sqlite3.Connection: 23 24 conn = sqlite3.connect(db, autocommit=True, check_same_thread=False)
+2 -2
database/migrations.py
··· 27 27 _ = cursor.execute(f"PRAGMA user_version = {version}") 28 28 self.conn.commit() 29 29 30 - def apply_migration(self, version: int, filename: str, migration: Callable[[sqlite3.Connection], None]): 30 + def apply_migration(self, version: int, filename: str, migration: Callable[[sqlite3.Connection], None]) -> None: 31 31 try: 32 - _ = migration(self.conn) 32 + migration(self.conn) 33 33 self.set_version(version) 34 34 self.conn.commit() 35 35 LOGGER.info("Applied migration: %s..", filename)
+1 -1
mastodon/output.py
··· 91 91 # TODO stip mfm 92 92 pass 93 93 94 - raw_statuses = [] # TODO split tokens and media across posts 94 + raw_statuses: list[dict[str, Any]] = [] # TODO split tokens and media across posts 95 95 if not raw_statuses: 96 96 self.log.error("Failed to split post into statuses!") 97 97 return
+2 -2
tests/util/util_test.py
··· 41 41 42 42 43 43 def test_read_env_mixed_types(): 44 - data = { 44 + data: dict[str, object] = { 45 45 "string": "env:TOKEN", 46 46 "number": 42, 47 47 "list": [1, 2, 3], ··· 58 58 59 59 60 60 def test_read_env_empty_dict(): 61 - data = {} 61 + data: dict[str, object] = {} 62 62 u.read_env(data) 63 63 assert data == {}
+5 -5
util/splitter.py
··· 26 26 max_chars: int, 27 27 max_link_len: int = 35, 28 28 ) -> list[list[Token]]: 29 + blocks: list[list[Token]] = [] 30 + block: list[Token] = [] 31 + length: int = 0 32 + 29 33 def new_block() -> None: 30 - nonlocal blocks, block, length 34 + nonlocal block, length 31 35 if block: 32 36 blocks.append(block) 33 37 block, length = [], 0 ··· 38 42 block[-1] = replace(block[-1], text=block[-1].text + text) 39 43 else: 40 44 block.append(TextToken(text=text)) 41 - 42 - blocks: list[list[Token]] = [] 43 - block: list[Token] = [] 44 - length: int = 0 45 45 46 46 for tk in tokens: 47 47 if isinstance(tk, TagToken):