Trading card city builder game?

add actor for websocket handling

eldridge.cam 711e2e00 9cb02439

Waiting for spindle ...
+170 -28
+62
Cargo.lock
··· 339 339 "derive_more 2.1.1", 340 340 "dioxus", 341 341 "futures", 342 + "futures-rx", 343 + "kameo", 342 344 "rmp-serde", 343 345 "serde", 344 346 "serde_json", ··· 346 348 "tokio", 347 349 "tokio-stream", 348 350 "tracing", 351 + "uuid", 349 352 ] 350 353 351 354 [[package]] ··· 1643 1646 checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" 1644 1647 1645 1648 [[package]] 1649 + name = "downcast-rs" 1650 + version = "2.0.2" 1651 + source = "registry+https://github.com/rust-lang/crates.io-index" 1652 + checksum = "117240f60069e65410b3ae1bb213295bd828f707b5bec6596a1afc8793ce0cbc" 1653 + 1654 + [[package]] 1646 1655 name = "dpi" 1647 1656 version = "0.1.2" 1648 1657 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1668 1677 version = "1.0.5" 1669 1678 source = "registry+https://github.com/rust-lang/crates.io-index" 1670 1679 checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" 1680 + 1681 + [[package]] 1682 + name = "dyn-clone" 1683 + version = "1.0.20" 1684 + source = "registry+https://github.com/rust-lang/crates.io-index" 1685 + checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" 1671 1686 1672 1687 [[package]] 1673 1688 name = "either" ··· 1955 1970 "proc-macro2", 1956 1971 "quote", 1957 1972 "syn 2.0.114", 1973 + ] 1974 + 1975 + [[package]] 1976 + name = "futures-rx" 1977 + version = "0.2.1" 1978 + source = "registry+https://github.com/rust-lang/crates.io-index" 1979 + checksum = "40104b43c730b967d36dd35903dcac26dcc32ef3ae623d1e7e8467461765d9c9" 1980 + dependencies = [ 1981 + "futures", 1982 + "paste", 1983 + "pin-project-lite", 1958 1984 ] 1959 1985 1960 1986 [[package]] ··· 2857 2883 ] 2858 2884 2859 2885 [[package]] 2886 + name = "kameo" 2887 + version = "0.19.2" 2888 + source = "registry+https://github.com/rust-lang/crates.io-index" 2889 + checksum = "9c4af7638c67029fd6821d02813c3913c803784648725d4df4082c9b91d7cbb1" 2890 + dependencies = [ 2891 + "downcast-rs", 2892 + "dyn-clone", 2893 + "futures", 2894 + "kameo_macros", 2895 + "serde", 2896 + "tokio", 2897 + "tracing", 2898 + ] 2899 + 2900 + [[package]] 2901 + name = "kameo_macros" 2902 + version = "0.19.0" 2903 + source = "registry+https://github.com/rust-lang/crates.io-index" 2904 + checksum = "a13c324e2d8c8e126e63e66087448b4267e263e6cb8770c56d10a9d0d279d9e2" 2905 + dependencies = [ 2906 + "heck 0.5.0", 2907 + "proc-macro2", 2908 + "quote", 2909 + "syn 2.0.114", 2910 + ] 2911 + 2912 + [[package]] 2860 2913 name = "keyboard-types" 2861 2914 version = "0.7.0" 2862 2915 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3631 3684 "smallvec", 3632 3685 "windows-link 0.2.1", 3633 3686 ] 3687 + 3688 + [[package]] 3689 + name = "paste" 3690 + version = "1.0.15" 3691 + source = "registry+https://github.com/rust-lang/crates.io-index" 3692 + checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" 3634 3693 3635 3694 [[package]] 3636 3695 name = "pem-rfc7468" ··· 5342 5401 "pin-project-lite", 5343 5402 "socket2", 5344 5403 "tokio-macros", 5404 + "tracing", 5345 5405 "windows-sys 0.61.2", 5346 5406 ] 5347 5407 ··· 5766 5826 source = "registry+https://github.com/rust-lang/crates.io-index" 5767 5827 checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f" 5768 5828 dependencies = [ 5829 + "getrandom 0.3.4", 5769 5830 "js-sys", 5831 + "serde_core", 5770 5832 "wasm-bindgen", 5771 5833 ] 5772 5834
+6 -3
Cargo.toml
··· 11 11 derive_more = "2.1.1" 12 12 dioxus = { version = "0.7.1", features = ["router", "fullstack"] } 13 13 futures = "0.3.31" 14 + futures-rx = "0.2.1" 15 + kameo = "0.19.2" 14 16 rmp-serde = "1.3.1" 15 17 serde = { version = "1.0.228", features = ["derive"] } 16 18 serde_json = "1.0.149" ··· 18 20 tokio = { version = "1.49.0", features = ["macros"] } 19 21 tokio-stream = "0.1.18" 20 22 tracing = "0.1.44" 23 + uuid = { version = "1.20.0", features = ["serde", "v7"] } 21 24 22 25 [features] 23 26 default = ["web", "server"] 24 - web = ["dioxus/web", "tokio/rt"] 25 - desktop = ["dioxus/desktop", "tokio/rt"] 26 - mobile = ["dioxus/mobile", "tokio/rt"] 27 + web = ["dioxus/web", "tokio/rt", "uuid/js"] 28 + desktop = ["dioxus/desktop", "tokio/rt", "uuid/js"] 29 + mobile = ["dioxus/mobile", "tokio/rt", "uuid/js"] 27 30 server = ["dioxus/server", "tokio/rt-multi-thread", "dep:sqlx", "dep:axum"]
+1
src/actor/mod.rs
··· 1 + pub mod player_socket;
+61
src/actor/player_socket.rs
··· 1 + use futures::Stream; 2 + use serde::{Deserialize, Serialize}; 3 + use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; 4 + use tokio_stream::wrappers::UnboundedReceiverStream; 5 + 6 + #[cfg(feature = "server")] 7 + use kameo::prelude::*; 8 + 9 + #[cfg(feature = "server")] 10 + #[derive(Actor, Default)] 11 + pub struct PlayerSocket { 12 + account_id: Option<String>, 13 + } 14 + 15 + #[cfg(feature = "server")] 16 + impl PlayerSocket { 17 + pub async fn push( 18 + actor: ActorRef<Self>, 19 + request: Request, 20 + ) -> Result<impl Stream<Item = Response>, SendError<PlayerSocketMessage>> { 21 + let (tx, rx) = unbounded_channel(); 22 + actor.tell(PlayerSocketMessage { tx, request }).await?; 23 + Ok(UnboundedReceiverStream::new(rx)) 24 + } 25 + } 26 + 27 + #[cfg(feature = "server")] 28 + pub struct PlayerSocketMessage { 29 + tx: UnboundedSender<Response>, 30 + request: Request, 31 + } 32 + 33 + #[derive(Serialize, Deserialize, Clone, Debug)] 34 + #[serde(tag = "type", content = "data")] 35 + pub enum Request { 36 + Authenticate(String), 37 + } 38 + 39 + #[derive(Serialize, Deserialize, Clone, Debug)] 40 + #[serde(tag = "type", content = "data")] 41 + pub enum Response { 42 + Authenticated(String), 43 + } 44 + 45 + #[cfg(feature = "server")] 46 + impl Message<PlayerSocketMessage> for PlayerSocket { 47 + type Reply = (); 48 + 49 + async fn handle( 50 + &mut self, 51 + PlayerSocketMessage { tx, request }: PlayerSocketMessage, 52 + _ctx: &mut Context<Self, Self::Reply>, 53 + ) -> Self::Reply { 54 + match request { 55 + Request::Authenticate(name) => { 56 + self.account_id = Some(name.clone()); 57 + let _ = tx.send(Response::Authenticated(name)); 58 + } 59 + } 60 + } 61 + }
+39 -25
src/api/ws.rs
··· 1 1 use axum::extract::ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade}; 2 2 use futures::StreamExt; 3 + use kameo::prelude::*; 3 4 use serde::{Deserialize, Serialize}; 4 - use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; 5 - use tokio_stream::wrappers::UnboundedReceiverStream; 6 5 use tracing::Instrument; 6 + use uuid::Uuid; 7 7 8 - #[derive(Serialize, Deserialize, Clone, Debug)] 9 - #[serde(tag = "#type", content = "#payload")] 10 - pub enum Request { 11 - Authenticate(String), 12 - } 8 + use crate::actor::player_socket::{PlayerSocket, Request, Response}; 13 9 14 10 #[derive(Serialize, Deserialize, Clone, Debug)] 15 - #[serde(tag = "#type", content = "#payload")] 16 - pub enum Response { 17 - Authenticated(String), 11 + pub struct ProtocolV1Message<T> { 12 + id: Uuid, 13 + #[serde(flatten)] 14 + data: T, 18 15 } 19 16 20 17 const JSON_PROTOCOL: &str = "v1-json.cartography.app"; ··· 44 41 Closed(CloseFrame), 45 42 } 46 43 47 - async fn on_message(tx: UnboundedSender<Response>, message: Request) {} 48 - 49 44 pub async fn v1(ws: WebSocketUpgrade) -> axum::response::Response { 50 45 let ws = ws.protocols([JSON_PROTOCOL, MESSAGEPACK_PROTOCOL]); 51 46 let protocol = ws ··· 58 53 tracing::debug!("websocket connected"); 59 54 let (ws_sender, ws_receiver) = socket.split(); 60 55 futures::pin_mut!(ws_sender); 56 + 57 + let actor = PlayerSocket::spawn_default(); 61 58 let result = ws_receiver 62 59 .filter_map(|msg| async move { msg.ok() }) 63 60 .map(|msg| match msg { 64 61 Message::Text(text) if protocol == JSON_PROTOCOL => Some( 65 - serde_json::from_str::<Request>(&text).map_err(ProtocolV1Error::InvalidJson), 62 + serde_json::from_str::<ProtocolV1Message<Request>>(&text) 63 + .map_err(ProtocolV1Error::InvalidJson), 66 64 ) 67 65 .transpose(), 68 66 Message::Binary(binary) if protocol == MESSAGEPACK_PROTOCOL => { ··· 87 85 } 88 86 } 89 87 }) 90 - .flat_map_unordered(None, |msg| { 91 - let (tx, rx) = unbounded_channel::<Response>(); 92 - tokio::spawn(on_message(tx, msg)); 93 - UnboundedReceiverStream::new(rx) 94 - }) 95 - .map(|response| match protocol.as_str() { 96 - MESSAGEPACK_PROTOCOL => rmp_serde::to_vec(&response) 97 - .map(Message::from) 98 - .map_err(axum::Error::new), 99 - _ => serde_json::to_string(&response) 100 - .map(Message::from) 101 - .map_err(axum::Error::new), 88 + .filter_map({ 89 + let actor = actor.clone(); 90 + move |ProtocolV1Message { id, data }| { 91 + let actor = actor.clone(); 92 + async move { 93 + Some( 94 + PlayerSocket::push(actor, data) 95 + .await 96 + .ok()? 97 + .map(move |data| ProtocolV1Message { id, data }), 98 + ) 99 + } 100 + } 102 101 }) 102 + .flatten_unordered(None) 103 + .map( 104 + |response: ProtocolV1Message<Response>| match protocol.as_str() { 105 + MESSAGEPACK_PROTOCOL => rmp_serde::to_vec(&response) 106 + .map(Message::from) 107 + .map_err(axum::Error::new), 108 + _ => serde_json::to_string(&response) 109 + .map(Message::from) 110 + .map_err(axum::Error::new), 111 + }, 112 + ) 103 113 .forward(ws_sender) 104 114 .in_current_span() 105 115 .await; 106 116 if let Err(error) = result { 107 117 tracing::error!("websocket ended in error: {}", error); 108 118 } 119 + if let Err(error) = actor.stop_gracefully().await { 120 + tracing::error!("failed to signal player socket to stop: {}", error); 121 + } 122 + actor.wait_for_shutdown().await; 109 123 tracing::debug!("websocket disconnected"); 110 124 }) 111 125 }
+1
src/main.rs
··· 1 + mod actor; 1 2 mod api; 2 3 mod app; 3 4