tangled
alpha
login
or
join now
eldridge.cam
/
cartography
0
fork
atom
Trading card city builder game?
0
fork
atom
overview
issues
pulls
pipelines
add endpoint for open pack
eldridge.cam
1 month ago
78a25e03
ad8d57d8
verified
This commit was signed with the committer's
known signature
.
eldridge.cam
SSH Key Fingerprint:
SHA256:MAgO4sya2MgvdgUjSGKAO0lQ9X2HQp1Jb+x/Tpeeims=
+441
-28
16 changed files
expand all
collapse all
unified
split
.sqlx
query-466970ff1fa5231a54281d75f8ec579655bf3d7c3a623c135017dede6d58dd9c.json
query-7ef26ea823ac7cf3b7a3b4f38f8bfc2e31de4a0b0530a6504c419556cdb91870.json
query-8b46ab91d3b7a3d5c498c37218cf4e2ba467a5ad2f4b26e3d6e1399aa81be5cd.json
query-a960d5aeb0ebb03cc407ce092b002bb7c339d2b038e217e8d795795f2f92f7ab.json
query-c0eab268baac021bdf7600aa1a88e14c85a1062b71ddd94d16b0e60d4b74b128.json
packages
cartography
src
actor
mod.rs
api
errors
mod.rs
pack_not_found_error.rs
operations
list_fields.rs
list_packs.rs
mod.rs
open_pack.rs
pull_banner.rs
app.rs
bus.rs
test.rs
+26
.sqlx/query-466970ff1fa5231a54281d75f8ec579655bf3d7c3a623c135017dede6d58dd9c.json
···
1
1
+
{
2
2
+
"db_name": "PostgreSQL",
3
3
+
"query": "SELECT id, opened_at FROM packs WHERE account_id = 'foxfriends' AND opened_at IS NOT NULL",
4
4
+
"describe": {
5
5
+
"columns": [
6
6
+
{
7
7
+
"ordinal": 0,
8
8
+
"name": "id",
9
9
+
"type_info": "Int8"
10
10
+
},
11
11
+
{
12
12
+
"ordinal": 1,
13
13
+
"name": "opened_at",
14
14
+
"type_info": "Timestamptz"
15
15
+
}
16
16
+
],
17
17
+
"parameters": {
18
18
+
"Left": []
19
19
+
},
20
20
+
"nullable": [
21
21
+
false,
22
22
+
true
23
23
+
]
24
24
+
},
25
25
+
"hash": "466970ff1fa5231a54281d75f8ec579655bf3d7c3a623c135017dede6d58dd9c"
26
26
+
}
+20
.sqlx/query-7ef26ea823ac7cf3b7a3b4f38f8bfc2e31de4a0b0530a6504c419556cdb91870.json
···
1
1
+
{
2
2
+
"db_name": "PostgreSQL",
3
3
+
"query": "SELECT id FROM packs WHERE account_id = 'foxfriends' AND opened_at IS NULL",
4
4
+
"describe": {
5
5
+
"columns": [
6
6
+
{
7
7
+
"ordinal": 0,
8
8
+
"name": "id",
9
9
+
"type_info": "Int8"
10
10
+
}
11
11
+
],
12
12
+
"parameters": {
13
13
+
"Left": []
14
14
+
},
15
15
+
"nullable": [
16
16
+
false
17
17
+
]
18
18
+
},
19
19
+
"hash": "7ef26ea823ac7cf3b7a3b4f38f8bfc2e31de4a0b0530a6504c419556cdb91870"
20
20
+
}
+28
.sqlx/query-8b46ab91d3b7a3d5c498c37218cf4e2ba467a5ad2f4b26e3d6e1399aa81be5cd.json
···
1
1
+
{
2
2
+
"db_name": "PostgreSQL",
3
3
+
"query": "\n SELECT id, card_type_id\n FROM cards\n INNER JOIN pack_contents ON pack_contents.card_id = cards.id\n WHERE pack_contents.pack_id = $1\n ",
4
4
+
"describe": {
5
5
+
"columns": [
6
6
+
{
7
7
+
"ordinal": 0,
8
8
+
"name": "id",
9
9
+
"type_info": "Int8"
10
10
+
},
11
11
+
{
12
12
+
"ordinal": 1,
13
13
+
"name": "card_type_id",
14
14
+
"type_info": "Text"
15
15
+
}
16
16
+
],
17
17
+
"parameters": {
18
18
+
"Left": [
19
19
+
"Int8"
20
20
+
]
21
21
+
},
22
22
+
"nullable": [
23
23
+
false,
24
24
+
false
25
25
+
]
26
26
+
},
27
27
+
"hash": "8b46ab91d3b7a3d5c498c37218cf4e2ba467a5ad2f4b26e3d6e1399aa81be5cd"
28
28
+
}
+22
.sqlx/query-a960d5aeb0ebb03cc407ce092b002bb7c339d2b038e217e8d795795f2f92f7ab.json
···
1
1
+
{
2
2
+
"db_name": "PostgreSQL",
3
3
+
"query": "UPDATE packs SET opened_at = now() WHERE id = $1 RETURNING opened_at",
4
4
+
"describe": {
5
5
+
"columns": [
6
6
+
{
7
7
+
"ordinal": 0,
8
8
+
"name": "opened_at",
9
9
+
"type_info": "Timestamptz"
10
10
+
}
11
11
+
],
12
12
+
"parameters": {
13
13
+
"Left": [
14
14
+
"Int8"
15
15
+
]
16
16
+
},
17
17
+
"nullable": [
18
18
+
true
19
19
+
]
20
20
+
},
21
21
+
"hash": "a960d5aeb0ebb03cc407ce092b002bb7c339d2b038e217e8d795795f2f92f7ab"
22
22
+
}
+40
.sqlx/query-c0eab268baac021bdf7600aa1a88e14c85a1062b71ddd94d16b0e60d4b74b128.json
···
1
1
+
{
2
2
+
"db_name": "PostgreSQL",
3
3
+
"query": "\n SELECT packs.id, packs.pack_banner_id, packs.opened_at\n FROM packs\n WHERE id = $1 AND account_id = $2\n ",
4
4
+
"describe": {
5
5
+
"columns": [
6
6
+
{
7
7
+
"ordinal": 0,
8
8
+
"name": "id",
9
9
+
"type_info": "Int8"
10
10
+
},
11
11
+
{
12
12
+
"ordinal": 1,
13
13
+
"name": "pack_banner_id",
14
14
+
"type_info": "Text"
15
15
+
},
16
16
+
{
17
17
+
"ordinal": 2,
18
18
+
"name": "opened_at",
19
19
+
"type_info": "Timestamptz"
20
20
+
}
21
21
+
],
22
22
+
"parameters": {
23
23
+
"Left": [
24
24
+
"Int8",
25
25
+
{
26
26
+
"Custom": {
27
27
+
"name": "citext",
28
28
+
"kind": "Simple"
29
29
+
}
30
30
+
}
31
31
+
]
32
32
+
},
33
33
+
"nullable": [
34
34
+
false,
35
35
+
false,
36
36
+
true
37
37
+
]
38
38
+
},
39
39
+
"hash": "c0eab268baac021bdf7600aa1a88e14c85a1062b71ddd94d16b0e60d4b74b128"
40
40
+
}
+8
packages/cartography/src/actor/mod.rs
···
1
1
pub mod field_state;
2
2
pub mod player_socket;
3
3
4
4
+
#[derive(Clone, Copy, Debug)]
4
5
pub struct Unsubscribe;
6
6
+
7
7
+
#[derive(Clone, Debug)]
8
8
+
#[expect(dead_code, reason = "stub for later")]
9
9
+
pub struct AddCardToDeck {
10
10
+
pub account_id: String,
11
11
+
pub card_id: i64,
12
12
+
}
+2
packages/cartography/src/api/errors/mod.rs
···
7
7
mod banner_not_found_error;
8
8
mod forbidden_error;
9
9
mod internal_server_error;
10
10
+
mod pack_not_found_error;
10
11
mod unauthorized_error;
11
12
12
13
pub use banner_not_found_error::BannerNotFoundError;
13
14
pub use forbidden_error::ForbiddenError;
14
15
#[allow(unused_imports)]
15
16
pub(crate) use internal_server_error::{internal_server_error, respond_internal_server_error};
17
17
+
pub use pack_not_found_error::PackNotFoundError;
16
18
pub use unauthorized_error::UnauthorizedError;
17
19
18
20
pub trait ApiError: Error {
+16
packages/cartography/src/api/errors/pack_not_found_error.rs
···
1
1
+
use super::ApiError;
2
2
+
use axum::http::StatusCode;
3
3
+
4
4
+
#[derive(Debug, derive_more::Display, derive_more::Error)]
5
5
+
#[display("a pack with id {pack_id} was not found")]
6
6
+
pub struct PackNotFoundError {
7
7
+
pub pack_id: i64,
8
8
+
}
9
9
+
10
10
+
impl ApiError for PackNotFoundError {
11
11
+
const STATUS: StatusCode = StatusCode::NOT_FOUND;
12
12
+
const CODE: &str = "PackNotFound";
13
13
+
type Detail = ();
14
14
+
15
15
+
fn detail(&self) -> Self::Detail {}
16
16
+
}
+2
-2
packages/cartography/src/api/operations/list_fields.rs
···
23
23
)
24
24
)]
25
25
pub async fn list_fields(
26
26
-
db: axum::Extension<sqlx::PgPool>,
27
27
-
Extension(authorization): Extension<Authorization>,
26
26
+
db: Extension<sqlx::PgPool>,
27
27
+
authorization: Extension<Authorization>,
28
28
Path(account_id): Path<AccountIdOrMe>,
29
29
) -> axum::response::Result<Json<ListFieldsResponse>> {
30
30
let account_id = authorization.resolve_account_id(&account_id)?;
+6
-6
packages/cartography/src/api/operations/list_packs.rs
···
44
44
request_body = Option<ListPacksRequest>,
45
45
security(("trust" = [])),
46
46
responses(
47
47
-
(status = OK, description = "Successfully listed all fields.", body = ListPacksResponse),
47
47
+
(status = OK, description = "Successfully listed all packs.", body = ListPacksResponse),
48
48
),
49
49
params(
50
50
-
("player_id" = AccountIdOrMe, Path, description = "The ID of the player whose fields to list.")
50
50
+
("player_id" = AccountIdOrMe, Path, description = "The ID of the player whose packs to list.")
51
51
)
52
52
)]
53
53
pub async fn list_packs(
54
54
-
db: axum::Extension<sqlx::PgPool>,
55
55
-
Extension(authorization): Extension<Authorization>,
54
54
+
db: Extension<sqlx::PgPool>,
55
55
+
authorization: Extension<Authorization>,
56
56
Path(account_id): Path<AccountIdOrMe>,
57
57
request: Option<Json<ListPacksRequest>>,
58
58
) -> axum::response::Result<Json<ListPacksResponse>> {
···
82
82
83
83
#[cfg(test)]
84
84
mod tests {
85
85
-
use crate::{api::operations::PackStatus, test::prelude::*};
85
85
+
use crate::test::prelude::*;
86
86
use axum::http::{Request, StatusCode};
87
87
use sqlx::PgPool;
88
88
89
89
-
use super::{ListPacksRequest, ListPacksResponse};
89
89
+
use super::{ListPacksRequest, ListPacksResponse, PackStatus};
90
90
91
91
#[sqlx::test(
92
92
migrator = "MIGRATOR",
+2
packages/cartography/src/api/operations/mod.rs
···
12
12
pub use list_fields::*;
13
13
14
14
mod list_packs;
15
15
+
mod open_pack;
15
16
pub use list_packs::*;
17
17
+
pub use open_pack::*;
+184
packages/cartography/src/api/operations/open_pack.rs
···
1
1
+
use crate::actor::AddCardToDeck;
2
2
+
use crate::api::errors::{JsonError, PackNotFoundError};
3
3
+
use crate::api::{errors::internal_server_error, middleware::authorization::Authorization};
4
4
+
use crate::bus::{Bus, BusExt};
5
5
+
use crate::dto::*;
6
6
+
use axum::{Extension, Json, extract::Path};
7
7
+
use kameo::actor::ActorRef;
8
8
+
9
9
+
#[derive(serde::Serialize, utoipa::ToSchema)]
10
10
+
#[cfg_attr(test, derive(serde::Deserialize))]
11
11
+
pub struct OpenPackResponse {
12
12
+
pack: Pack,
13
13
+
pack_cards: Vec<Card>,
14
14
+
}
15
15
+
16
16
+
#[utoipa::path(
17
17
+
post,
18
18
+
path = "/api/v1/packs/{pack_id}/open",
19
19
+
description = "Open a pack. This request is idempotent: opening a pack a second time does nothing, but returns the pack.",
20
20
+
tag = "Game",
21
21
+
security(("trust" = [])),
22
22
+
responses(
23
23
+
(status = OK, description = "Successfully opened pack.", body = OpenPackResponse),
24
24
+
),
25
25
+
params(
26
26
+
("pack_id" = i64, Path, description = "The ID of the pack to open.")
27
27
+
)
28
28
+
)]
29
29
+
pub async fn open_pack(
30
30
+
db: Extension<sqlx::PgPool>,
31
31
+
authorization: Extension<Authorization>,
32
32
+
bus: Extension<ActorRef<Bus>>,
33
33
+
Path(pack_id): Path<i64>,
34
34
+
) -> axum::response::Result<Json<OpenPackResponse>> {
35
35
+
let account_id = authorization.authorized_account_id()?;
36
36
+
let mut conn = db.begin().await.map_err(internal_server_error)?;
37
37
+
38
38
+
let mut pack = sqlx::query_as!(
39
39
+
Pack,
40
40
+
r#"
41
41
+
SELECT packs.id, packs.pack_banner_id, packs.opened_at
42
42
+
FROM packs
43
43
+
WHERE id = $1 AND account_id = $2
44
44
+
"#,
45
45
+
pack_id,
46
46
+
account_id,
47
47
+
)
48
48
+
.fetch_optional(&mut *conn)
49
49
+
.await
50
50
+
.map_err(internal_server_error)?
51
51
+
.ok_or(JsonError(PackNotFoundError { pack_id }))?;
52
52
+
53
53
+
let pack_cards = sqlx::query_as!(
54
54
+
Card,
55
55
+
r#"
56
56
+
SELECT id, card_type_id
57
57
+
FROM cards
58
58
+
INNER JOIN pack_contents ON pack_contents.card_id = cards.id
59
59
+
WHERE pack_contents.pack_id = $1
60
60
+
"#,
61
61
+
pack_id,
62
62
+
)
63
63
+
.fetch_all(&mut *conn)
64
64
+
.await
65
65
+
.map_err(internal_server_error)?;
66
66
+
67
67
+
let mut just_opened = true;
68
68
+
if pack.opened_at.is_none() {
69
69
+
let update = sqlx::query!(
70
70
+
"UPDATE packs SET opened_at = now() WHERE id = $1 RETURNING opened_at",
71
71
+
pack_id
72
72
+
)
73
73
+
.fetch_one(&mut *conn)
74
74
+
.await
75
75
+
.map_err(internal_server_error)?;
76
76
+
pack.opened_at = update.opened_at;
77
77
+
} else {
78
78
+
just_opened = false;
79
79
+
}
80
80
+
81
81
+
conn.commit().await.map_err(internal_server_error)?;
82
82
+
83
83
+
if just_opened {
84
84
+
for card in &pack_cards {
85
85
+
bus.notify(AddCardToDeck {
86
86
+
account_id: account_id.to_owned(),
87
87
+
card_id: card.id,
88
88
+
})
89
89
+
.await
90
90
+
.ok();
91
91
+
}
92
92
+
}
93
93
+
94
94
+
Ok(Json(OpenPackResponse { pack, pack_cards }))
95
95
+
}
96
96
+
97
97
+
#[cfg(test)]
98
98
+
mod tests {
99
99
+
use crate::actor::AddCardToDeck;
100
100
+
use crate::bus::{Bus, BusExt};
101
101
+
use crate::test::prelude::*;
102
102
+
use axum::http::Request;
103
103
+
use kameo::actor::Spawn;
104
104
+
use sqlx::PgPool;
105
105
+
106
106
+
use super::OpenPackResponse;
107
107
+
108
108
+
#[sqlx::test(
109
109
+
migrator = "MIGRATOR",
110
110
+
fixtures(path = "../../../fixtures", scripts("seed", "account", "packs"))
111
111
+
)]
112
112
+
pub fn open_pack_ok(pool: PgPool) {
113
113
+
let pack = sqlx::query!(
114
114
+
"SELECT id FROM packs WHERE account_id = 'foxfriends' AND opened_at IS NULL"
115
115
+
)
116
116
+
.fetch_one(&pool)
117
117
+
.await
118
118
+
.unwrap();
119
119
+
120
120
+
let collector = Collector::<AddCardToDeck>::spawn_default();
121
121
+
let bus = Bus::spawn(());
122
122
+
bus.listen::<AddCardToDeck, _>(&collector).await.unwrap();
123
123
+
124
124
+
let app = crate::app::Config::test(pool).with_bus(bus).into_router();
125
125
+
126
126
+
let request = Request::post(format!("/api/v1/packs/{}/open", pack.id))
127
127
+
.header("Authorization", "Trust foxfriends")
128
128
+
.empty()
129
129
+
.unwrap();
130
130
+
131
131
+
let Ok(response) = app.oneshot(request).await;
132
132
+
assert_success!(response);
133
133
+
134
134
+
let response: OpenPackResponse = response.json().await.unwrap();
135
135
+
assert!(response.pack.opened_at.is_some());
136
136
+
assert_eq!(response.pack_cards.len(), 5);
137
137
+
138
138
+
let received = collector.collect().await;
139
139
+
assert!(
140
140
+
response
141
141
+
.pack_cards
142
142
+
.iter()
143
143
+
.all(|item| received.iter().any(|msg| msg.card_id == item.id)),
144
144
+
"all opened cards should have been broadcast"
145
145
+
);
146
146
+
}
147
147
+
148
148
+
#[sqlx::test(
149
149
+
migrator = "MIGRATOR",
150
150
+
fixtures(path = "../../../fixtures", scripts("seed", "account", "packs"))
151
151
+
)]
152
152
+
pub fn open_pack_already_opened_ok(pool: PgPool) {
153
153
+
let pack = sqlx::query!(
154
154
+
"SELECT id, opened_at FROM packs WHERE account_id = 'foxfriends' AND opened_at IS NOT NULL"
155
155
+
)
156
156
+
.fetch_one(&pool)
157
157
+
.await
158
158
+
.unwrap();
159
159
+
160
160
+
let collector = Collector::<AddCardToDeck>::spawn_default();
161
161
+
let bus = Bus::spawn(());
162
162
+
bus.listen::<AddCardToDeck, _>(&collector).await.unwrap();
163
163
+
164
164
+
let app = crate::app::Config::test(pool).with_bus(bus).into_router();
165
165
+
166
166
+
let request = Request::post(format!("/api/v1/packs/{}/open", pack.id))
167
167
+
.header("Authorization", "Trust foxfriends")
168
168
+
.empty()
169
169
+
.unwrap();
170
170
+
171
171
+
let Ok(response) = app.oneshot(request).await;
172
172
+
assert_success!(response);
173
173
+
174
174
+
let response: OpenPackResponse = response.json().await.unwrap();
175
175
+
assert_eq!(response.pack.opened_at, pack.opened_at);
176
176
+
assert_eq!(response.pack_cards.len(), 5);
177
177
+
178
178
+
let received = collector.collect().await;
179
179
+
assert!(
180
180
+
received.is_empty(),
181
181
+
"previously opened cards do not get re-broadcast"
182
182
+
);
183
183
+
}
184
184
+
}
+2
-2
packages/cartography/src/api/operations/pull_banner.rs
···
32
32
),
33
33
)]
34
34
pub async fn pull_banner(
35
35
-
db: axum::Extension<sqlx::PgPool>,
35
35
+
db: Extension<sqlx::PgPool>,
36
36
+
authorization: Extension<Authorization>,
36
37
Path(banner_id): Path<String>,
37
37
-
Extension(authorization): Extension<Authorization>,
38
38
) -> axum::response::Result<Json<PullBannerResponse>> {
39
39
let account_id = authorization.authorized_account_id()?;
40
40
+18
-4
packages/cartography/src/app.rs
···
1
1
use crate::api::{middleware, operations, ws};
2
2
use crate::bus::Bus;
3
3
use axum::Router;
4
4
-
use kameo::actor::Spawn as _;
4
4
+
use kameo::actor::{ActorRef, Spawn as _};
5
5
use utoipa::Modify;
6
6
use utoipa::openapi::security::{ApiKey, ApiKeyValue, SecurityScheme};
7
7
···
17
17
operations::list_fields,
18
18
19
19
operations::list_packs,
20
20
+
operations::open_pack,
20
21
),
21
22
components(
22
23
schemas(
···
46
47
47
48
pub struct Config {
48
49
pool: sqlx::PgPool,
50
50
+
bus: Option<ActorRef<Bus>>,
49
51
}
50
52
51
53
impl Config {
···
57
59
.connect(&db_url)
58
60
.await?;
59
61
60
60
-
Ok(Self { pool })
62
62
+
Ok(Self { pool, bus: None })
61
63
}
62
64
63
65
#[cfg(test)]
64
66
pub fn test(pool: sqlx::PgPool) -> Self {
65
65
-
Self { pool }
67
67
+
Self { pool, bus: None }
68
68
+
}
69
69
+
70
70
+
#[cfg(test)]
71
71
+
pub fn with_bus(self, bus: ActorRef<Bus>) -> Self {
72
72
+
Self {
73
73
+
bus: Some(bus),
74
74
+
..self
75
75
+
}
66
76
}
67
77
68
78
pub fn into_router(self) -> Router {
69
69
-
let bus = Bus::spawn(());
79
79
+
let bus = self.bus.unwrap_or_else(|| Bus::spawn(()));
70
80
71
81
axum::Router::new()
72
82
.route(
···
92
102
.route(
93
103
"/api/v1/players/{player_id}/packs",
94
104
axum::routing::post(operations::list_packs),
105
105
+
)
106
106
+
.route(
107
107
+
"/api/v1/packs/{pack_id}/open",
108
108
+
axum::routing::post(operations::open_pack),
95
109
)
96
110
.route("/play/ws", axum::routing::any(ws::v1))
97
111
.layer(axum::middleware::from_fn(middleware::authorization::trust))
+14
-14
packages/cartography/src/bus.rs
···
24
24
}
25
25
26
26
impl Bus {
27
27
-
pub fn listen<T>(&mut self, recipient: Recipient<T>)
27
27
+
fn listen<T>(&mut self, recipient: Recipient<T>)
28
28
where
29
29
T: Any + Send + Sync,
30
30
{
···
32
32
entry.push(Box::new(recipient))
33
33
}
34
34
35
35
-
pub async fn notify<T>(&mut self, notification: T)
35
35
+
async fn notify<T>(&mut self, notification: T)
36
36
where
37
37
-
T: Clone + Any + Send + Sync,
37
37
+
T: Any + Send + Sync + Clone,
38
38
{
39
39
for recipient in self
40
40
.listeners
···
43
43
.flatten()
44
44
.filter_map(|entry| entry.as_any().downcast_ref::<Recipient<T>>())
45
45
{
46
46
-
if let Err(error) = recipient.tell(notification.clone()).await {
46
46
+
if let Err(error) = dbg!(recipient.tell(notification.clone()).await) {
47
47
tracing::error!("bus failed to notify: {}", error);
48
48
}
49
49
}
···
76
76
}
77
77
}
78
78
79
79
-
pub struct Listen<T: Send + Sync + 'static>(Recipient<T>);
80
80
-
pub struct Notify<T: Send + Sync + Clone + 'static>(T);
79
79
+
pub struct Listen<T: Any + Send + Sync>(Recipient<T>);
80
80
+
pub struct Notify<T: Any + Send + Sync + Clone>(T);
81
81
82
82
-
#[expect(dead_code)]
82
82
+
#[cfg_attr(not(test), expect(dead_code))]
83
83
pub trait BusExt {
84
84
-
async fn listen<T: Send + Sync + Clone + 'static, A: Actor + Message<T>>(
84
84
+
async fn listen<T: Any + Send + Sync, A: Actor + Message<T>>(
85
85
&self,
86
86
actor_ref: &ActorRef<A>,
87
87
) -> Result<(), SendError<Listen<T>, Infallible>>;
88
88
89
89
-
async fn notify<T: Send + Sync + Clone + 'static, A: Actor + Message<T>>(
89
89
+
async fn notify<T: Any + Send + Sync + Clone>(
90
90
&self,
91
91
notification: T,
92
92
) -> Result<(), SendError<Notify<T>, Infallible>>;
93
93
}
94
94
95
95
impl BusExt for ActorRef<Bus> {
96
96
-
async fn listen<T: Send + Sync + Clone + 'static, A: Actor + Message<T>>(
96
96
+
async fn listen<T: Any + Send + Sync, A: Actor + Message<T>>(
97
97
&self,
98
98
actor_ref: &ActorRef<A>,
99
99
) -> Result<(), SendError<Listen<T>, Infallible>> {
···
101
101
self.tell(Listen(actor_ref.clone().recipient())).await
102
102
}
103
103
104
104
-
async fn notify<T: Send + Sync + Clone + 'static, A: Actor + Message<T>>(
104
104
+
async fn notify<T: Any + Send + Sync + Clone>(
105
105
&self,
106
106
notification: T,
107
107
) -> Result<(), SendError<Notify<T>, Infallible>> {
108
108
-
self.tell(Notify(notification)).await
108
108
+
self.ask(Notify(notification)).await
109
109
}
110
110
}
111
111
112
112
-
impl<T: Send + Sync + 'static> Message<Listen<T>> for Bus {
112
112
+
impl<T: Any + Send + Sync> Message<Listen<T>> for Bus {
113
113
type Reply = ();
114
114
115
115
async fn handle(
···
121
121
}
122
122
}
123
123
124
124
-
impl<T: Send + Sync + Clone + 'static> Message<Notify<T>> for Bus {
124
124
+
impl<T: Any + Send + Sync + Clone> Message<Notify<T>> for Bus {
125
125
type Reply = ();
126
126
127
127
async fn handle(
+51
packages/cartography/src/test.rs
···
1
1
+
use std::any::Any;
2
2
+
1
3
use axum::{body::Body, http::Request};
2
4
use http_body_util::BodyExt;
5
5
+
use kameo::prelude::*;
3
6
4
7
pub trait ResponseExt {
5
8
async fn json<T: serde::de::DeserializeOwned>(self) -> anyhow::Result<T>;
···
53
56
54
57
pub(crate) use assert_success;
55
58
59
59
+
#[derive(Actor)]
60
60
+
pub struct Collector<T: Any + Clone + Sync + Send>(Vec<T>);
61
61
+
62
62
+
impl<T: Any + Clone + Sync + Send> Default for Collector<T> {
63
63
+
fn default() -> Self {
64
64
+
Self(vec![])
65
65
+
}
66
66
+
}
67
67
+
68
68
+
struct TakeCollection;
69
69
+
70
70
+
impl<T: Any + Clone + Sync + Send> Message<T> for Collector<T> {
71
71
+
type Reply = ();
72
72
+
73
73
+
async fn handle(
74
74
+
&mut self,
75
75
+
msg: T,
76
76
+
_ctx: &mut kameo::prelude::Context<Self, Self::Reply>,
77
77
+
) -> Self::Reply {
78
78
+
self.0.push(msg);
79
79
+
}
80
80
+
}
81
81
+
82
82
+
impl<T: Any + Clone + Sync + Send> Message<TakeCollection> for Collector<T> {
83
83
+
type Reply = Vec<T>;
84
84
+
85
85
+
async fn handle(
86
86
+
&mut self,
87
87
+
_msg: TakeCollection,
88
88
+
ctx: &mut kameo::prelude::Context<Self, Self::Reply>,
89
89
+
) -> Self::Reply {
90
90
+
ctx.stop();
91
91
+
std::mem::take(&mut self.0)
92
92
+
}
93
93
+
}
94
94
+
95
95
+
pub trait CollectorExt<T> {
96
96
+
async fn collect(self) -> Vec<T>;
97
97
+
}
98
98
+
99
99
+
impl<T: Any + Clone + Sync + Send> CollectorExt<T> for ActorRef<Collector<T>> {
100
100
+
async fn collect(self) -> Vec<T> {
101
101
+
self.ask(TakeCollection).await.unwrap()
102
102
+
}
103
103
+
}
104
104
+
56
105
pub mod prelude {
106
106
+
pub use super::Collector;
107
107
+
pub use super::CollectorExt as _;
57
108
pub use super::{RequestExt as _, ResponseExt as _};
58
109
pub use tower::ServiceExt as _;
59
110