social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

chore: clean-up logging

zenfyr.dev a7238685 71b11e27

verified
+149 -248
+3 -6
atproto/xrpc.py
··· 57 57 return None 58 58 if session.is_access_token_expired(): 59 59 if not session.is_refresh_token_expired(): 60 - LOGGER.info("refreshing session for %s", session.did) 60 + LOGGER.info("Refreshing session for '%s'", session.did) 61 61 return self.refresh_session(session) 62 - LOGGER.info("both tokens expired for %s, removing session", session.did) 63 62 self.store.remove_session(did) 64 63 raise ValueError( 65 64 "Both access and refresh tokens expired. Please login again." ··· 93 92 94 93 session = Session.from_dict(response.json(), self.pds_url) 95 94 self.store.set_session(session) 96 - LOGGER.info("Created session for %s (%s)", session.handle, session.did) 95 + LOGGER.info("Created session for '%s'", session.did) 97 96 return session 98 97 99 98 def refresh_session(self, session: Session) -> Session: ··· 115 114 116 115 new_session = Session.from_dict(response.json(), self.pds_url) 117 116 self.store.set_session(new_session) 118 - LOGGER.info( 119 - "Refreshed session for %s (%s)", new_session.handle, new_session.did 120 - ) 117 + LOGGER.info("Refreshed session for '%s'", new_session.did) 121 118 return new_session 122 119 123 120 def get_access_token(self, did: str) -> str | None:
+2 -2
bluesky/info.py
··· 40 40 if not did: 41 41 if not handle: 42 42 raise KeyError("No did: or atproto handle provided!") 43 - self.log.info("Resolving ATP identity for %s...", handle) 43 + self.log.info("Resolving ATP identity for '%s'...", handle) 44 44 identity = resolve_identity(handle, self._store) 45 45 self.did = identity.did 46 46 47 47 if not pds: 48 - self.log.info("Resolving PDS for %s...", self.did) 48 + self.log.info("Resolving PDS for '%s'...", self.did) 49 49 identity = resolve_identity(self.did, self._store) 50 50 self.pds = identity.pds 51 51
+24 -26
bluesky/input.py
··· 52 52 post_uri = cast(str, record["$xpost.strongRef"]["uri"]) 53 53 post_cid = cast(str, record["$xpost.strongRef"]["cid"]) 54 54 55 - self.log.info("Processing new post: %s", post_uri) 56 - 57 55 if self._is_post_crossposted(self.url, self.did, post_uri): 58 56 self.log.info( 59 - "Skipping %s, already crossposted", 57 + "Skipping '%s': already crossposted", 60 58 post_uri, 61 59 ) 62 60 return ··· 66 64 ) 67 65 parent = None 68 66 if parent_uri: 67 + did, _, _ = AtUri.record_uri(parent_uri) 68 + if did != self.did: 69 + self.log.info("Skipping '%s': reply to other user..", post_uri) 70 + return 71 + 69 72 parent = self._get_post(self.url, self.did, parent_uri) 70 73 if not parent: 71 74 self.log.info( 72 - "Skipping %s, parent %s not found in db", post_uri, parent_uri 75 + "Skipping '%s': parent '%s' not found in db", post_uri, parent_uri 73 76 ) 74 77 return 75 78 ··· 101 104 ) 102 105 did, collection, _ = AtUri.record_uri(rcrd["uri"]) 103 106 if collection != "app.bsky.feed.post": 104 - return f"Unhandled record collection {collection}" 107 + return f"unhandled record collection '{collection}'" 105 108 if did != self.did: 106 109 return "" 107 110 108 111 rquote = self._get_post(self.url, did, rcrd["uri"]) 109 112 if not rquote: 110 - return f"Quote {rcrd['uri']} not found in the db" 113 + return f"quote '{rcrd['uri']}' not found in db" 111 114 post.attachments.put( 112 115 QuoteAttachment(quoted_id=rcrd["uri"], quoted_user=did) 113 116 ) ··· 124 127 url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}" 125 128 blob_urls.append((url, blob_cid, embed.get("alt"))) 126 129 case _: 127 - self.log.warning(f"Unhandled embed type {embed['$type']}") 130 + self.log.warning(f"unhandled embed type '{embed['$type']}'") 128 131 return None 129 132 130 133 if embed: 131 134 fexit = handle_embeds(embed) 132 135 if fexit is not None: 133 - self.log.info("Skipping %s! %s", post_uri, fexit) 136 + self.log.info("Skipping '%s': %s", post_uri, fexit) 134 137 return 135 138 136 139 if blob_urls: 137 140 blobs: list[Blob] = [] 138 141 for url, cid, alt in blob_urls: 139 - self.log.info("Downloading %s...", cid) 142 + self.log.info("Downloading '%s'...", cid) 140 143 blob: Blob | None = download_blob(url, alt, client=self.http) 141 144 if not blob: 142 145 self.log.error( 143 - "Skipping %s! Failed to download blob %s.", post_uri, cid 146 + "Skipping '%s': failed to download blob '%s'", post_uri, cid 144 147 ) 145 148 return 146 149 blobs.append(blob) ··· 178 181 } 179 182 ) 180 183 181 - self.log.info("Post stored in DB: %s", post_uri) 182 - 184 + self.log.info("Crossposting: '%s'", post_uri) 183 185 for out in self.outputs: 184 186 self.submitter(lambda: out.accept_post(post)) 185 187 ··· 187 189 post_uri = cast(str, record["$xpost.strongRef"]["uri"]) 188 190 post_cid = cast(str, record["$xpost.strongRef"]["cid"]) 189 191 190 - self.log.info("Processing repost: %s", post_uri) 191 - 192 192 reposted_uri = cast(str, record["subject"]["uri"]) 193 193 reposted = self._get_post(self.url, self.did, reposted_uri) 194 194 if not reposted: 195 195 self.log.info( 196 - "Skipping repost '%s' as reposted post '%s' was not found in the db.", 196 + "Skipping repost '%s': reposted post '%s' not found in db", 197 197 post_uri, 198 198 reposted_uri, 199 199 ) ··· 209 209 } 210 210 ) 211 211 212 - self.log.info("Repost stored in DB: %s", post_uri) 213 - 214 212 repost_ref = PostRef(id=post_uri, author=self.did, service=self.url) 215 213 reposted_ref = PostRef(id=reposted_uri, author=self.did, service=self.url) 214 + 215 + self.log.info("Crossposting: '%s'", post_uri) 216 216 for out in self.outputs: 217 217 self.submitter(lambda: out.accept_repost(repost_ref, reposted_ref)) 218 218 219 219 def _on_delete_post(self, post_id: str, repost: bool): 220 - self.log.info("Processing delete for %s (repost: %s)...", post_id, repost) 221 220 post = self._get_post(self.url, self.did, post_id) 222 221 if not post: 223 - self.log.warning("Post not found in DB: %s", post_id) 222 + self.log.warning("Skipping delete '%s': post not found in db", post_id) 224 223 return 225 224 226 225 post_ref = PostRef(id=post_id, author=self.did, service=self.url) 227 226 if repost: 228 - self.log.info("Deleting repost: %s", post_id) 227 + self.log.info("Deleting repost: '%s'", post_id) 229 228 for output in self.outputs: 230 229 self.submitter(lambda: output.delete_repost(post_ref)) 231 230 else: 232 - self.log.info("Deleting post: %s", post_id) 231 + self.log.info("Deleting post: '%s'", post_id) 233 232 for output in self.outputs: 234 233 self.submitter(lambda: output.delete_post(post_ref)) 235 234 self.submitter(lambda: self._delete_post_by_id(post["id"])) 236 - self.log.info("Delete successful for %s", post_id) 237 235 238 236 239 237 class BlueskyJetstreamInputService(BlueskyBaseInputService): ··· 304 302 close_timeout=5, 305 303 ): 306 304 try: 307 - self.log.info("Listening to %s...", env.JETSTREAM_URL) 305 + self.log.info("Listening to '%s'...", env.JETSTREAM_URL) 308 306 309 307 async def listen_for_messages(): 310 308 async for msg in ws: ··· 315 313 _ = await asyncio.gather(listen) 316 314 except websockets.ConnectionClosedError as e: 317 315 self.log.error(e, stack_info=True, exc_info=True) 318 - self.log.info("Reconnecting to %s...", env.JETSTREAM_URL) 316 + self.log.info("Reconnecting to '%s'...", env.JETSTREAM_URL) 319 317 continue 320 318 except TimeoutError as e: 321 - self.log.error("Connection timeout: %s", e) 322 - self.log.info("Reconnecting to %s...", env.JETSTREAM_URL) 319 + self.log.error("Connection timeout: '%s'", e) 320 + self.log.info("Reconnecting to '%s'...", env.JETSTREAM_URL) 323 321 continue
+33 -68
bluesky/output.py
··· 56 56 validate_and_transform(data) 57 57 58 58 if "password" not in data: 59 - raise KeyError("password is required for bluesky") 59 + raise KeyError("'password' is required for bluesky") 60 60 61 61 if "quote_gate" in data: 62 62 data["quote_gate"] = bool(data["quote_gate"]) ··· 96 96 self.options.password, 97 97 ) 98 98 self.options.password = "" 99 - self.log.info("Logged in as %s", self.did) 99 + self.log.info("Logged in as '%s'", self.did) 100 100 101 101 @override 102 102 def get_identity_options(self) -> tuple[str | None, str | None, str | None]: ··· 184 184 185 185 @override 186 186 def accept_post(self, post: Post): 187 - self.log.info( 188 - "Accepting post %s (author: %s, service: %s)...", 189 - post.id, 190 - post.author, 191 - post.service, 192 - ) 187 + db_post = self._get_post(post.service, post.author, post.id) 188 + if not db_post: 189 + self.log.error("Skipping '%s': post not found in db") 190 + return 191 + 193 192 reply_to: ReplyRef | None = None 194 193 new_root_id: int | None = None 195 194 new_parent_id: int | None = None ··· 197 196 if post.parent_id: 198 197 parent = self._get_post(post.service, post.author, post.parent_id) 199 198 if not parent: 200 - self.log.error("Parent post not found in DB: %s", post.parent_id) 199 + self.log.error( 200 + "Skipping '%s': parent post not found in db", post.parent_id 201 + ) 201 202 return 202 203 203 204 thread = self._find_mapped_thread( ··· 209 210 ) 210 211 if not thread: 211 212 self.log.error( 212 - "Failed to find thread tuple in the database for parent: %s", 213 + "Skipping '%s': parent thread tuple not found in db", 213 214 post.parent_id, 214 215 ) 215 216 return ··· 220 221 reply_post = self._get_post(self.url, self.did, reply_uri) 221 222 222 223 if not root_post or not reply_post: 223 - self.log.error("Failed to fetch parent posts from database!") 224 + self.log.error("Skipping '%s': failed to fetch parent posts from db") 224 225 return 225 226 226 227 try: ··· 233 234 json.loads(reply_cid_data).get("cid", "") if reply_cid_data else "" 234 235 ) 235 236 except (json.JSONDecodeError, AttributeError, KeyError): 236 - self.log.error("Failed to parse CID from database!") 237 + self.log.error("Skipping '%s': failed to parse CID from db") 237 238 return 238 239 239 240 root_ref = StrongRef(uri=root_uri, cid=root_cid) ··· 309 310 quoted_uri: str | None = None 310 311 if quote_attachment: 311 312 if quote_attachment.quoted_user != post.author: 312 - self.log.info("Quoted other user, skipping quote!") 313 + self.log.info("Skipping '%s': quoted other user") 313 314 return 314 315 315 316 quoted_post = self._get_post( 316 317 post.service, post.author, quote_attachment.quoted_id 317 318 ) 318 319 if not quoted_post: 319 - self.log.error("Failed to find quoted post in the database!") 320 + self.log.error("Skipping '%s': quoted post not found in db!") 320 321 else: 321 322 quoted_mappings = self._get_mappings( 322 323 quoted_post["id"], self.url, self.did 323 324 ) 324 325 if not quoted_mappings: 325 - self.log.error("Failed to find mappings for quoted post!") 326 + self.log.error( 327 + "Skipping '%s': failed to find mappings for quoted post" 328 + ) 326 329 else: 327 330 bluesky_quoted_post = self._get_post( 328 331 self.url, self.did, quoted_mappings[0]["identifier"] 329 332 ) 330 333 if not bluesky_quoted_post: 331 - self.log.error("Failed to find Bluesky quoted post!") 334 + self.log.error( 335 + "Skipping '%s': Failed to find Bluesky quoted post!" 336 + ) 332 337 else: 333 338 quoted_cid_data = bluesky_quoted_post["extra_data"] 334 339 quoted_cid = ( ··· 342 347 token_blocks = splitter.split(tokens) 343 348 344 349 if token_blocks is None: 345 - self.log.error( 346 - "Skipping '%s' as it contains links/tags that are too long!", post.id 347 - ) 350 + self.log.error("Skipping '%s': links/tags are too long", post.id) 348 351 return 349 352 350 353 for blob in supported_media: 351 354 if blob.mime.startswith("image/") and len(blob.io) > 2_000_000: 352 355 self.log.error( 353 - "Skipping post '%s', image too large!", 356 + "Skipping '%s': image too large", 354 357 post.id, 355 358 ) 356 359 return 357 360 if blob.mime.startswith("video/"): 358 361 if blob.mime != "video/mp4" and not self.options.encode_videos: 359 362 self.log.info( 360 - "Video is not mp4, but encoding is disabled. Skipping '%s'...", 363 + "Skipping '%s': video is not mp4, but encoding is disabled", 361 364 post.id, 362 365 ) 363 366 return 364 367 if len(blob.io) > 100_000_000: 365 368 self.log.error( 366 - "Skipping post '%s', video too large!", 369 + "Skipping '%s': video too large", 367 370 post.id, 368 371 ) 369 372 return ··· 378 381 result = tokens_to_richtext(block) 379 382 if result is None: 380 383 self.log.error( 381 - "Skipping '%s' as it contains invalid rich text types!", 384 + "Skipping '%s': invalid rich text types", 382 385 post.id, 383 386 ) 384 387 return ··· 479 482 480 483 if duration and duration > 180: 481 484 self.log.info( 482 - "Skipping post '%s', video too long (%.1f > 180s)!", 485 + "Skipping '%s': video too long (%.1f > 180s)", 483 486 post.id, 484 487 duration, 485 488 ) ··· 509 512 self.options.quote_gate, 510 513 ) 511 514 512 - db_post = self._get_post(post.service, post.author, post.id) 513 - if not db_post: 514 - self.log.error("Post not found in database!") 515 - return 516 - 517 515 if new_root_id is None or new_parent_id is None: 518 516 self._insert_post( 519 517 { ··· 562 560 563 561 @override 564 562 def delete_post(self, post: PostRef): 565 - self.log.info( 566 - "Deleting post %s (author: %s, service: %s)...", 567 - post.id, 568 - post.author, 569 - post.service, 570 - ) 571 563 db_post = self._get_post(post.service, post.author, post.id) 572 564 if not db_post: 573 - self.log.warning( 574 - "Post not found in DB: %s (author: %s, service: %s)", 575 - post.id, 576 - post.author, 577 - post.service, 578 - ) 565 + self.log.warning("Skipping delete '%s': post not found in db", post.id) 579 566 return 580 567 581 568 mappings = self._get_mappings(db_post["id"], self.url, self.did) 582 569 for mapping in mappings[::-1]: 583 - self.log.info("Deleting '%s'...", mapping["identifier"]) 584 570 self._client.delete_post(mapping["identifier"]) 585 571 self._delete_post_by_id(mapping["id"]) 586 - self.log.info("Post deleted successfully: %s", post.id) 572 + self.log.info("Deleted '%s'", post.id) 587 573 588 574 @override 589 575 def accept_repost(self, repost: PostRef, reposted: PostRef): 590 - self.log.info( 591 - "Accepting repost %s of %s (author: %s, service: %s)...", 592 - repost.id, 593 - reposted.id, 594 - repost.author, 595 - repost.service, 596 - ) 597 576 db_repost = self._get_post(repost.service, repost.author, repost.id) 598 577 db_reposted = self._get_post(reposted.service, reposted.author, reposted.id) 599 578 if not db_repost or not db_reposted: 600 - self.log.info("Post not found in db, skipping repost..") 579 + self.log.info("Skipping repost '%s': post not found in db") 601 580 return 602 581 603 582 mappings = self._get_mappings(db_reposted["id"], self.url, self.did) ··· 607 586 try: 608 587 cid = json.loads(mappings[0]["extra_data"])["cid"] 609 588 except (json.JSONDecodeError, AttributeError, KeyError): 610 - self.log.exception("Failed to parse CID from extra_data!") 589 + self.log.exception("Skipping '%s': failed to parse CID from extra_data") 611 590 return 612 591 613 592 response = self._client.repost(mappings[0]["identifier"], cid) ··· 632 611 633 612 @override 634 613 def delete_repost(self, repost: PostRef): 635 - self.log.info( 636 - "Deleting repost %s (author: %s, service: %s)...", 637 - repost.id, 638 - repost.author, 639 - repost.service, 640 - ) 641 614 db_repost = self._get_post(repost.service, repost.author, repost.id) 642 615 if not db_repost: 643 - self.log.warning( 644 - "Repost not found in DB: %s (author: %s, service: %s)", 645 - repost.id, 646 - repost.author, 647 - repost.service, 648 - ) 616 + self.log.warning("Skipping delete '%s': repost not found in db", repost.id) 649 617 return 650 618 651 619 mappings = self._get_mappings(db_repost["id"], self.url, self.did) 652 620 if mappings: 653 - self.log.info("Deleting '%s'...", mappings[0]["identifier"]) 654 621 self._client.delete_repost(mappings[0]["identifier"]) 655 622 self._delete_post_by_id(mappings[0]["id"]) 656 - self.log.info("Repost deleted successfully: %s", repost.id) 657 - else: 658 - self.log.error([mappings]) 623 + self.log.info("Deleted %s", repost.id)
+3 -3
main.py
··· 101 101 read_env(settings) 102 102 103 103 if "services" not in settings: 104 - raise KeyError("No `services` specified in settings!") 104 + raise KeyError("No 'services' specified in settings!") 105 105 106 106 service_pairs: list[tuple[Any, list[Any]]] = [] 107 107 for svc in settings["services"]: 108 108 if "input" not in svc: 109 - raise KeyError("Each service must have an `input` field!") 109 + raise KeyError("Each service must have an 'input' field!") 110 110 if "outputs" not in svc: 111 - raise KeyError("Each service must have an `outputs` field!") 111 + raise KeyError("Each service must have an 'outputs' field!") 112 112 113 113 inp = create_input_service(db_pool, http_client, svc["input"]) 114 114 outs = [
+22 -26
mastodon/input.py
··· 56 56 super().__init__(options.instance, db, http) 57 57 self.options: MastodonInputOptions = options 58 58 59 - self.log.info("Verifying %s credentails...", self.url) 59 + self.log.info("Verifying '%s' credentails...", self.url) 60 60 response = self.verify_credentials() 61 61 self.user_id: str = response["id"] 62 62 63 - self.log.info("Getting %s configuration...", self.url) 63 + self.log.info("Getting '%s' configuration...", self.url) 64 64 response = self.fetch_instance_info() 65 65 self.streaming_url: str = response["urls"]["streaming_api"] 66 66 ··· 69 69 return self.options.token 70 70 71 71 def _on_create_post(self, status: dict[str, Any]): 72 - self.log.info("Processing new post: %s", status["id"]) 73 - 74 72 if status["account"]["id"] != self.user_id: 75 73 return 76 74 77 75 if status["visibility"] not in self.options.allowed_visibility: 78 76 self.log.info( 79 - "Skipping post with disallowed visibility: %s (%s)", 77 + "Skipping '%s': disallowed visibility (%s)", 80 78 status["id"], 81 79 status["visibility"], 82 80 ) ··· 84 82 85 83 if self._is_post_crossposted(self.url, self.user_id, status["id"]): 86 84 self.log.info( 87 - "Skipping %s, already crossposted", 85 + "Skipping '%s': already crossposted", 88 86 status["id"], 89 87 ) 90 88 return ··· 97 95 return 98 96 99 97 if status.get("poll"): 100 - self.log.info("Skipping '%s'! Contains a poll..", status["id"]) 98 + self.log.info("Skipping '%s': polls not supported", status["id"]) 101 99 return 102 100 103 101 quote: dict[str, Any] | None = status.get("quote") ··· 109 107 rquote = self._get_post(self.url, self.user_id, quote["id"]) 110 108 if not rquote: 111 109 self.log.info( 112 - "Skipping %s, parent %s not found in db", status["id"], quote["id"] 110 + "Skipping '%s': quoted post '%s' not found in db", 111 + status["id"], 112 + quote["id"], 113 113 ) 114 114 return 115 115 ··· 123 123 parent = self._get_post(self.url, self.user_id, in_reply) 124 124 if not parent: 125 125 self.log.info( 126 - "Skipping %s, parent %s not found in db", status["id"], in_reply 126 + "Skipping '%s': parent '%s' not found in db", status["id"], in_reply 127 127 ) 128 128 return 129 129 parser = StatusParser(status) ··· 154 154 155 155 blobs: list[Blob] = [] 156 156 for media in status.get("media_attachments", []): 157 - self.log.info("Downloading %s...", media["url"]) 157 + self.log.info("Downloading '%s'...", media["url"]) 158 158 blob: Blob | None = download_blob( 159 159 media["url"], media.get("alt"), client=self.http 160 160 ) 161 161 if not blob: 162 162 self.log.error( 163 - "Skipping %s! Failed to download media %s.", 163 + "Skipping '%s': failed to download attachment '%s'", 164 164 status["id"], 165 165 media["url"], 166 166 ) ··· 189 189 } 190 190 ) 191 191 192 - self.log.info("Post stored in DB: %s", status["id"]) 193 - 192 + self.log.info("Crossposting: '%s'", status["id"]) 194 193 for out in self.outputs: 195 194 self.submitter(lambda: out.accept_post(post)) 196 195 197 196 def _on_reblog(self, status: dict[str, Any], reblog: dict[str, Any]): 198 - self.log.info("Processing reblog: %s", status["id"]) 199 197 reposted = self._get_post(self.url, self.user_id, reblog["id"]) 200 198 if not reposted: 201 199 self.log.info( 202 - "Skipping repost '%s' as reposted post '%s' was not found in the db.", 200 + "Skipping repost '%s': reposted post '%s' not found in db", 203 201 status["id"], 204 202 reblog["id"], 205 203 ) ··· 213 211 "reposted": reposted["id"], 214 212 } 215 213 ) 216 - 217 - self.log.info("Reblog stored in DB: %s", status["id"]) 218 214 219 215 repost_ref = PostRef(id=status["id"], author=self.user_id, service=self.url) 220 216 reposted_ref = PostRef(id=reblog["id"], author=self.user_id, service=self.url) 217 + 218 + self.log.info("Crossposting: '%s'", status["id"]) 221 219 for out in self.outputs: 222 220 self.submitter(lambda: out.accept_repost(repost_ref, reposted_ref)) 223 221 224 222 def _on_delete_post(self, status_id: str): 225 - self.log.info("Processing delete for %s...", status_id) 226 223 post = self._get_post(self.url, self.user_id, status_id) 227 224 if not post: 228 - self.log.warning("Post not found in DB: %s", status_id) 225 + self.log.warning("Skipping delete '%s': post not found in db", status_id) 229 226 return 230 227 231 228 post_ref = PostRef(id=status_id, author=self.user_id, service=self.url) 232 229 if post["reposted"]: 233 - self.log.info("Deleting repost: %s", status_id) 230 + self.log.info("Deleting repost: '%s'", status_id) 234 231 for output in self.outputs: 235 232 self.submitter(lambda: output.delete_repost(post_ref)) 236 233 else: 237 - self.log.info("Deleting post: %s", status_id) 234 + self.log.info("Deleting post: '%s'", status_id) 238 235 for output in self.outputs: 239 236 self.submitter(lambda: output.delete_post(post_ref)) 240 237 self.submitter(lambda: self._delete_post_by_id(post["id"])) 241 - self.log.info("Delete processed successfully for %s", status_id) 242 238 243 239 def _accept_msg(self, msg: websockets.Data) -> None: 244 240 data: dict[str, Any] = cast(dict[str, Any], json.loads(msg)) ··· 262 258 close_timeout=5, 263 259 ): 264 260 try: 265 - self.log.info("Listening to %s...", self.streaming_url) 261 + self.log.info("Listening to '%s'...", self.streaming_url) 266 262 267 263 async def listen_for_messages(): 268 264 async for msg in ws: ··· 273 269 _ = await asyncio.gather(listen) 274 270 except websockets.ConnectionClosedError as e: 275 271 self.log.error(e, stack_info=True, exc_info=True) 276 - self.log.info("Reconnecting to %s...", self.streaming_url) 272 + self.log.info("Reconnecting to '%s'...", self.streaming_url) 277 273 continue 278 274 except TimeoutError as e: 279 - self.log.error("Connection timeout: %s", e) 280 - self.log.info("Reconnecting to %s...", self.streaming_url) 275 + self.log.error("Connection timeout: '%s'", e) 276 + self.log.info("Reconnecting to '%s'...", self.streaming_url) 281 277 continue
+37 -93
mastodon/output.py
··· 39 39 "visibility" in data 40 40 and data["visibility"] not in ALLOWED_POSTING_VISIBILITY 41 41 ): 42 - raise ValueError(f"Invalid visibility option {data['visibility']}!") 42 + raise ValueError(f"Invalid visibility option '{data['visibility']}'!") 43 43 44 44 return MastodonOutputOptions(**data) 45 45 ··· 57 57 super().__init__(options.instance, db, http) 58 58 self.options: MastodonOutputOptions = options 59 59 60 - self.log.info("Verifying %s credentails...", self.url) 60 + self.log.info("Verifying '%s' credentails...", self.url) 61 61 response = self.verify_credentials() 62 62 self.user_id: str = response["id"] 63 63 64 - self.log.info("Getting %s configuration...", self.url) 64 + self.log.info("Getting '%s' configuration...", self.url) 65 65 response = self.fetch_instance_info() 66 66 self.instance_info: InstanceInfo = InstanceInfo.from_api(response) 67 67 ··· 209 209 210 210 if response.status_code == 200: 211 211 self.log.info( 212 - "Uploaded %s! (%s)", blob.name or "unknown", response.json()["id"] 212 + "Uploaded '%s'! (%s)", blob.name or "unknown", response.json()["id"] 213 213 ) 214 214 uploads.append( 215 215 MediaUploadResult(id=response.json()["id"], processed=True) 216 216 ) 217 217 elif response.status_code == 202: 218 - self.log.info("Waiting for %s to process!", blob.name or "unknown") 218 + self.log.info("Waiting for '%s' to process...", blob.name or "unknown") 219 219 uploads.append( 220 220 MediaUploadResult(id=response.json()["id"], processed=False) 221 221 ) 222 222 else: 223 223 self.log.error( 224 - "Failed to upload %s! %s", 224 + "Failed to upload '%s'! %s", 225 225 blob.name or "unknown", 226 226 response.text, 227 227 ) ··· 248 248 249 249 @override 250 250 def accept_post(self, post: Post): 251 - self.log.info( 252 - "Accepting post %s (author: %s, service: %s)...", 253 - post.id, 254 - post.author, 255 - post.service, 256 - ) 251 + db_post = self._get_post(post.service, post.author, post.id) 252 + if not db_post: 253 + self.log.error("Skipping '%s': post not found in db") 254 + return 255 + 257 256 new_root_id: int | None = None 258 257 new_parent_id: int | None = None 259 258 ··· 263 262 post.parent_id, post.service, post.author, self.url, self.user_id 264 263 ) 265 264 if not thread: 266 - self.log.error("Failed to find thread tuple in the database!") 265 + self.log.error("Skipping '%s': parent thread tuple not found in db") 267 266 return 268 267 _, reply_ref, new_root_id, new_parent_id = thread 269 268 ··· 271 270 quote = post.attachments.get(QuoteAttachment) 272 271 if quote: 273 272 if quote.quoted_user != post.author: 274 - self.log.info("Quoted other user, skipping!") 273 + self.log.info("Skipping '%s': quote of other user") 275 274 return 276 275 277 276 quoted_post = self._get_post(post.service, post.author, quote.quoted_id) 278 277 if not quoted_post: 279 - self.log.error("Failed to find quoted post in the database!") 278 + self.log.error("Skipping '%s': quoted post not found in db") 280 279 return 281 280 282 281 quoted_mappings = self._get_mappings( 283 282 quoted_post["id"], self.url, self.user_id 284 283 ) 285 284 if not quoted_mappings: 286 - self.log.error("Failed to find mappings for quoted post!") 285 + self.log.error( 286 + "Skipping '%s': mappings for quoted post not found in db" 287 + ) 287 288 return 288 289 289 290 quoted_status_id = quoted_mappings[-1]["identifier"] ··· 315 316 316 317 raw_statuses = self._split_tokens_and_media(post_tokens, media_blobs) 317 318 if not raw_statuses: 318 - self.log.error("Failed to split post into statuses!") 319 + self.log.error("Skipping '%s': couldn't split post into statuses") 319 320 return 320 321 321 322 baked_statuses: list[tuple[str, list[str] | None]] = [] ··· 324 325 if raw_media: 325 326 media_ids = self._upload_media(raw_media) 326 327 if not media_ids: 327 - self.log.error("Failed to upload attachments!") 328 + self.log.error("Skipping '%s': failed to upload attachments") 328 329 return 329 330 baked_statuses.append((status_text, media_ids)) 330 331 ··· 360 361 }, 361 362 json=payload, 362 363 ) 363 - 364 - if response.status_code != 200: 365 - self.log.error( 366 - "Failed to post status! %s - %s", 367 - response.status_code, 368 - response.text, 369 - ) 370 - response.raise_for_status() 364 + response.raise_for_status() 371 365 372 366 status_id = response.json()["id"] 373 - self.log.info("Created new status %s!", status_id) 374 367 created_statuses.append(status_id) 375 368 376 369 if i == 0: 377 370 reply_ref = status_id 378 371 379 - db_post = self._get_post(post.service, post.author, post.id) 380 - if not db_post: 381 - self.log.error("Post not found in database!") 382 - return 383 - 384 372 if new_root_id is None or new_parent_id is None: 385 373 self._insert_post( 386 374 { ··· 425 413 426 414 @override 427 415 def delete_post(self, post: PostRef): 428 - self.log.info( 429 - "Deleting post %s (author: %s, service: %s)...", 430 - post.id, 431 - post.author, 432 - post.service, 433 - ) 434 416 db_post = self._get_post(post.service, post.author, post.id) 435 417 if not db_post: 436 - self.log.warning( 437 - "Post not found in DB: %s (author: %s, service: %s)", 438 - post.id, 439 - post.author, 440 - post.service, 441 - ) 418 + self.log.warning("Skipping delete '%s': post not found in db: %s", post.id) 442 419 return 443 420 444 421 mappings = self._get_mappings(db_post["id"], self.url, self.user_id) 445 422 446 423 for mapping in mappings[::-1]: 447 - self.log.info("Deleting '%s'...", mapping["identifier"]) 448 - self.http.delete( 424 + response = self.http.delete( 449 425 f"{self.url}/api/v1/statuses/{mapping['identifier']}", 450 426 headers={"Authorization": f"Bearer {self._get_token()}"}, 451 427 ) 428 + response.raise_for_status() 429 + 452 430 self._delete_post_by_id(mapping["id"]) 453 - self.log.info("Post deleted successfully: %s", post.id) 431 + self.log.info("Deleted '%s'", mapping["identifier"]) 454 432 455 433 @override 456 434 def accept_repost(self, repost: PostRef, reposted: PostRef): 457 - self.log.info( 458 - "Accepting repost %s of %s (author: %s, service: %s)...", 459 - repost.id, 460 - reposted.id, 461 - repost.author, 462 - repost.service, 463 - ) 464 435 original = self._get_post(reposted.service, reposted.author, reposted.id) 465 436 if not original: 466 - self.log.info("Post not found in db, skipping repost..") 437 + self.log.info("Skipping repost '%s': reposted post not found in db") 467 438 return 468 439 469 440 mappings = self._get_mappings(original["id"], self.url, self.user_id) 470 441 if not mappings: 471 - self.log.error("No mappings found for reposted post!") 442 + self.log.error("Skipping repost '%s': no mappings found for reposted post") 472 443 return 473 444 474 445 response = self.http.post( 475 446 f"{self.url}/api/v1/statuses/{mappings[0]['identifier']}/reblog", 476 447 headers={"Authorization": f"Bearer {self._get_token()}"}, 477 448 ) 478 - 479 - if response.status_code != 200: 480 - self.log.error( 481 - "Failed to boost status! status_code: %s, msg: %s", 482 - response.status_code, 483 - response.content, 484 - ) 485 - return 449 + response.raise_for_status() 486 450 487 451 self._insert_post( 488 452 { ··· 502 466 503 467 original_repost = self._get_post(repost.service, repost.author, repost.id) 504 468 if not original_repost: 505 - self.log.error("original repost not found in DB: %s", repost.id) 469 + self.log.error( 470 + "Skipping repost '%s': repost not found in db: %s", repost.id 471 + ) 506 472 return 507 473 508 474 self._insert_post_mapping(original_repost["id"], inserted["id"]) ··· 510 476 511 477 @override 512 478 def delete_repost(self, repost: PostRef): 513 - self.log.info( 514 - "Deleting repost %s (author: %s, service: %s)...", 515 - repost.id, 516 - repost.author, 517 - repost.service, 518 - ) 519 479 db_repost = self._get_post(repost.service, repost.author, repost.id) 520 480 if not db_repost: 521 - self.log.warning( 522 - "Repost not found in DB: %s (author: %s, service: %s)", 523 - repost.id, 524 - repost.author, 525 - repost.service, 526 - ) 481 + self.log.warning("Skipping delete '%s': repost not found in db", repost.id) 527 482 return 528 483 529 484 mappings = self._get_mappings(db_repost["id"], self.url, self.user_id) 530 485 rmappings = self._get_mappings(db_repost["reposted"], self.url, self.user_id) 531 486 532 487 if not mappings: 533 - self.log.warning("No mappings found for repost %s", repost.id) 488 + self.log.warning( 489 + "Skipping delete '%s': no mappings found for repost", repost.id 490 + ) 534 491 return 535 492 if not rmappings: 536 493 self.log.warning( 537 - "No mappings found for original post %s (reposted_id=%s)", 494 + "Skipping delete '%s': no mappings found for post", 538 495 repost.id, 539 496 db_repost["reposted"], 540 497 ) 541 498 return 542 499 543 - self.log.info( 544 - "Removing '%s' Repost of '%s'...", 545 - mappings[0]["identifier"], 546 - rmappings[0]["identifier"], 547 - ) 548 - 549 500 response = self.http.post( 550 501 f"{self.url}/api/v1/statuses/{rmappings[0]['identifier']}/unreblog", 551 502 headers={"Authorization": f"Bearer {self._get_token()}"}, 552 503 ) 553 - 554 - if response.status_code != 200: 555 - self.log.error( 556 - "Failed to unreblog! status_code: %s, msg: %s", 557 - response.status_code, 558 - response.text, 559 - ) 560 - return 504 + response.raise_for_status() 561 505 562 506 self._delete_post_by_id(mappings[0]["id"]) 563 507 self.log.info("Repost deleted successfully: %s", repost.id)
+21 -21
misskey/input.py
··· 58 58 super().__init__(options.instance, db, http) 59 59 self.options: MisskeyInputOptions = options 60 60 61 - self.log.info("Verifying %s credentails...", self.url) 61 + self.log.info("Verifying '%s' credentails...", self.url) 62 62 response = self.verify_credentials() 63 63 self.user_id: str = response["id"] 64 64 ··· 67 67 return self.options.token 68 68 69 69 def _on_note(self, note: dict[str, Any]): 70 - self.log.info("Processing new note: %s", note["id"]) 71 - 72 70 if note["userId"] != self.user_id: 73 71 return 74 72 75 73 if note["visibility"] not in self.options.allowed_visibility: 76 74 self.log.info( 77 - "Skipping note with disallowed visibility: %s (%s)", 75 + "Skipping '%s': disallowed visibility (%s)", 78 76 note["id"], 79 77 note["visibility"], 80 78 ) ··· 82 80 83 81 if self._is_post_crossposted(self.url, self.user_id, note["id"]): 84 82 self.log.info( 85 - "Skipping %s, already crossposted", 83 + "Skipping '%s': already crossposted", 86 84 note["id"], 87 85 ) 88 86 return 89 87 90 88 if note.get("poll"): 91 - self.log.info("Skipping '%s'! Contains a poll..", note["id"]) 89 + self.log.info("Skipping '%s': polls not supported", note["id"]) 92 90 return 93 91 94 92 renote: dict[str, Any] | None = note.get("renote") ··· 103 101 rrenote = self._get_post(self.url, self.user_id, renote["id"]) 104 102 if not rrenote: 105 103 self.log.info( 106 - "Skipping %s, quote %s not found in db", note["id"], renote["id"] 104 + "Skipping '%s': quoted post '%s' not found in db", 105 + note["id"], 106 + renote["id"], 107 107 ) 108 108 return 109 109 110 110 reply: dict[str, Any] | None = note.get("reply") 111 111 if reply and reply.get("userId") != self.user_id: 112 - self.log.info("Skipping '%s'! Reply to other user..", note["id"]) 112 + self.log.info("Skipping '%s': Reply to other user..", note["id"]) 113 113 return 114 114 115 115 parent = None ··· 117 117 parent = self._get_post(self.url, self.user_id, reply["id"]) 118 118 if not parent: 119 119 self.log.info( 120 - "Skipping %s, parent %s not found in db", note["id"], reply["id"] 120 + "Skipping '%s': parent '%s' not found in db", 121 + note["id"], 122 + reply["id"], 121 123 ) 122 124 return 123 125 ··· 151 153 152 154 blobs: list[Blob] = [] 153 155 for media in note.get("files", []): 154 - self.log.info("Downloading %s...", media["url"]) 156 + self.log.info("Downloading '%s'...", media["url"]) 155 157 blob: Blob | None = download_blob( 156 158 media["url"], media.get("comment", ""), client=self.http 157 159 ) 158 160 if not blob: 159 161 self.log.error( 160 - "Skipping %s! Failed to download media %s.", 162 + "Skipping '%s': failed to download media '%s'.", 161 163 note["id"], 162 164 media["url"], 163 165 ) ··· 186 188 } 187 189 ) 188 190 189 - self.log.info("Note stored in DB: %s", note["id"]) 190 - 191 + self.log.info("Crossposting: '%s'", note["id"]) 191 192 for out in self.outputs: 192 193 self.submitter(lambda: out.accept_post(post)) 193 194 194 195 def _on_renote(self, note: dict[str, Any], renote: dict[str, Any]): 195 - self.log.info("Processing renote: %s", note["id"]) 196 196 reposted = self._get_post(self.url, self.user_id, renote["id"]) 197 197 if not reposted: 198 198 self.log.info( 199 - "Skipping repost '%s' as reposted post '%s' was not found in the db.", 199 + "Skipping repost '%s': reposted post '%s' not found in db", 200 200 note["id"], 201 201 renote["id"], 202 202 ) ··· 210 210 "reposted": reposted["id"], 211 211 } 212 212 ) 213 - 214 - self.log.info("Renote stored in DB: %s", note["id"]) 215 213 216 214 repost_ref = PostRef(id=note["id"], author=self.user_id, service=self.url) 217 215 reposted_ref = PostRef(id=renote["id"], author=self.user_id, service=self.url) 216 + 217 + self.log.info("Crossposting: '%s'", note["id"]) 218 218 for out in self.outputs: 219 219 self.submitter(lambda: out.accept_repost(repost_ref, reposted_ref)) 220 220 ··· 250 250 close_timeout=5, 251 251 ): 252 252 try: 253 - self.log.info("Listening to %s...", streaming) 253 + self.log.info("Listening to '%s'...", streaming) 254 254 await self._subscribe_to_home(ws) 255 255 256 256 async def listen_for_messages(): ··· 262 262 _ = await asyncio.gather(listen) 263 263 except websockets.ConnectionClosedError as e: 264 264 self.log.error(e, stack_info=True, exc_info=True) 265 - self.log.info("Reconnecting to %s...", streaming) 265 + self.log.info("Reconnecting to '%s'...", streaming) 266 266 continue 267 267 except TimeoutError as e: 268 - self.log.error("Connection timeout: %s", e) 269 - self.log.info("Reconnecting to %s...", streaming) 268 + self.log.error("Connection timeout: '%s'", e) 269 + self.log.info("Reconnecting to '%s'...", streaming) 270 270 continue
+2 -2
registry.py
··· 19 19 db: DatabasePool, http: httpx.Client, data: dict[str, Any] 20 20 ) -> InputService: 21 21 if "type" not in data: 22 - raise ValueError("No `type` field in input data!") 22 + raise ValueError("No 'type' field in input data!") 23 23 type: str = str(data["type"]) 24 24 del data["type"] 25 25 ··· 33 33 db: DatabasePool, http: httpx.Client, data: dict[str, Any] 34 34 ) -> OutputService: 35 35 if "type" not in data: 36 - raise ValueError("No `type` field in input data!") 36 + raise ValueError("No 'type' field in input data!") 37 37 type: str = str(data["type"]) 38 38 del data["type"] 39 39
+2 -1
util/util.py
··· 17 17 18 18 def normalize_service_url(url: str) -> str: 19 19 if not url.startswith("https://") and not url.startswith("http://"): 20 - raise ValueError(f"Invalid service url {url}! Only http/https are supported.") 20 + raise ValueError(f"Invalid service url {url}! Must start with http/https!") 21 21 22 22 return url[:-1] if url.endswith("/") else url 23 + 23 24 24 25 def _read_env(data: Any) -> None: 25 26 match data: