tangled
alpha
login
or
join now
zenfyr.dev
/
xpost
2
fork
atom
social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
2
fork
atom
overview
issues
1
pulls
pipelines
add bsky output skeleton
zenfyr.dev
1 month ago
93e2afea
19170b82
verified
This commit was signed with the committer's
known signature
.
zenfyr.dev
SSH Key Fingerprint:
SHA256:TtcIcnTnoAB5mqHofsaOxIgiMzfVBxej1AXT7DQdrTE=
+53
2 changed files
expand all
collapse all
unified
split
bluesky
output.py
registry_bootstrap.py
+50
bluesky/output.py
···
1
1
+
from dataclasses import dataclass
2
2
+
from typing import Any, override
3
3
+
4
4
+
from cross.post import Post, PostRef
5
5
+
from cross.service import OutputService
6
6
+
from database.connection import DatabasePool
7
7
+
from bluesky.info import SERVICE, BlueskyService, validate_and_transform
8
8
+
9
9
+
10
10
+
@dataclass(kw_only=True)
11
11
+
class BlueskyOutputOptions:
12
12
+
handle: str | None = None
13
13
+
did: str | None = None
14
14
+
pds: str | None = None
15
15
+
16
16
+
@classmethod
17
17
+
def from_dict(cls, data: dict[str, Any]) -> "BlueskyOutputOptions":
18
18
+
validate_and_transform(data)
19
19
+
return BlueskyOutputOptions(**data)
20
20
+
21
21
+
22
22
+
class BlueskyOutputService(BlueskyService, OutputService):
23
23
+
def __init__(self, db: DatabasePool, options: BlueskyOutputOptions) -> None:
24
24
+
super().__init__(SERVICE, db)
25
25
+
self.options: BlueskyOutputOptions = options
26
26
+
self._init_identity()
27
27
+
28
28
+
@override
29
29
+
def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
30
30
+
return (self.options.handle, self.options.did, self.options.pds)
31
31
+
32
32
+
@override
33
33
+
def accept_post(self, post: Post):
34
34
+
# TODO: post creation
35
35
+
self.log.warning("NOT IMPLEMENTED: accept_post %s", post.id)
36
36
+
37
37
+
@override
38
38
+
def delete_post(self, post: PostRef):
39
39
+
# TODO: post deletion
40
40
+
self.log.warning("NOT IMPLEMENTED: delete_post %s", post.id)
41
41
+
42
42
+
@override
43
43
+
def accept_repost(self, repost: PostRef, reposted: PostRef):
44
44
+
# TODO: reposts
45
45
+
self.log.warning("NOT IMPLEMENTED: accept_repost %s of %s", repost.id, reposted.id)
46
46
+
47
47
+
@override
48
48
+
def delete_repost(self, repost: PostRef):
49
49
+
# TODO: unreposts
50
50
+
self.log.warning("NOT IMPLEMENTED: delete_repost %s", repost.id)
+3
registry_bootstrap.py
···
31
31
output_factories['stderr'] = LazyFactory(
32
32
"util.dummy", "StderrOutputService", "DummyOptions"
33
33
)
34
34
+
output_factories['bluesky'] = LazyFactory(
35
35
+
"bluesky.output", "BlueskyOutputService", "BlueskyOutputOptions"
36
36
+
)