Trading card city builder game?

rebuild websocket manually without dioxus integration

eldridge.cam 9cb02439 9a3d4dc1

Waiting for spindle ...
+130 -9
+23
Cargo.lock
··· 335 335 name = "cartography" 336 336 version = "0.1.0" 337 337 dependencies = [ 338 + "axum", 339 + "derive_more 2.1.1", 338 340 "dioxus", 339 341 "futures", 342 + "rmp-serde", 340 343 "serde", 344 + "serde_json", 341 345 "sqlx", 342 346 "tokio", 343 347 "tokio-stream", ··· 4285 4289 "libc", 4286 4290 "untrusted", 4287 4291 "windows-sys 0.52.0", 4292 + ] 4293 + 4294 + [[package]] 4295 + name = "rmp" 4296 + version = "0.8.15" 4297 + source = "registry+https://github.com/rust-lang/crates.io-index" 4298 + checksum = "4ba8be72d372b2c9b35542551678538b562e7cf86c3315773cae48dfbfe7790c" 4299 + dependencies = [ 4300 + "num-traits", 4301 + ] 4302 + 4303 + [[package]] 4304 + name = "rmp-serde" 4305 + version = "1.3.1" 4306 + source = "registry+https://github.com/rust-lang/crates.io-index" 4307 + checksum = "72f81bee8c8ef9b577d1681a70ebbc962c232461e397b22c208c43c04b67a155" 4308 + dependencies = [ 4309 + "rmp", 4310 + "serde", 4288 4311 ] 4289 4312 4290 4313 [[package]]
+5 -1
Cargo.toml
··· 7 7 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html 8 8 9 9 [dependencies] 10 + axum = { version = "0.8.8", features = ["ws"], optional = true } 11 + derive_more = "2.1.1" 10 12 dioxus = { version = "0.7.1", features = ["router", "fullstack"] } 11 13 futures = "0.3.31" 14 + rmp-serde = "1.3.1" 12 15 serde = { version = "1.0.228", features = ["derive"] } 16 + serde_json = "1.0.149" 13 17 sqlx = { version = "0.8.6", features = ["runtime-tokio", "postgres"], optional = true } 14 18 tokio = { version = "1.49.0", features = ["macros"] } 15 19 tokio-stream = "0.1.18" ··· 20 24 web = ["dioxus/web", "tokio/rt"] 21 25 desktop = ["dioxus/desktop", "tokio/rt"] 22 26 mobile = ["dioxus/mobile", "tokio/rt"] 23 - server = ["dioxus/server", "tokio/rt-multi-thread", "dep:sqlx"] 27 + server = ["dioxus/server", "tokio/rt-multi-thread", "dep:sqlx", "dep:axum"]
+1
src/api.rs
··· 1 1 use dioxus::prelude::*; 2 2 3 + #[cfg(feature = "server")] 3 4 pub mod ws; 4 5 5 6 /// Echo the user input on the server.
+98 -7
src/api/ws.rs
··· 1 - use dioxus::fullstack::{WebSocketOptions, Websocket}; 2 - use dioxus::prelude::*; 1 + use axum::extract::ws::{CloseFrame, Message, WebSocket, WebSocketUpgrade}; 2 + use futures::StreamExt; 3 3 use serde::{Deserialize, Serialize}; 4 + use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; 5 + use tokio_stream::wrappers::UnboundedReceiverStream; 6 + use tracing::Instrument; 4 7 5 8 #[derive(Serialize, Deserialize, Clone, Debug)] 9 + #[serde(tag = "#type", content = "#payload")] 6 10 pub enum Request { 7 11 Authenticate(String), 8 12 } 9 13 10 14 #[derive(Serialize, Deserialize, Clone, Debug)] 15 + #[serde(tag = "#type", content = "#payload")] 11 16 pub enum Response { 12 17 Authenticated(String), 13 18 } 14 19 15 - #[get("/api/ws")] 16 - pub async fn v1(options: WebSocketOptions) -> Result<Websocket<Request, Response>> { 17 - let socket = options.on_upgrade(move |socket| async move { 18 - }); 19 - Ok(socket) 20 + const JSON_PROTOCOL: &str = "v1-json.cartography.app"; 21 + const MESSAGEPACK_PROTOCOL: &str = "v1-messagepack.cartography.app"; 22 + 23 + impl std::error::Error for ProtocolV1Error { 24 + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { 25 + match self { 26 + Self::InvalidJson(error) => Some(error), 27 + Self::InvalidMessagepack(error) => Some(error), 28 + _ => None, 29 + } 30 + } 31 + } 32 + 33 + #[derive(Debug, derive_more::Display)] 34 + enum ProtocolV1Error { 35 + #[display("invalid JSON payload: {_0}")] 36 + InvalidJson(serde_json::Error), 37 + #[display("invalid MessagePack payload: {_0}")] 38 + InvalidMessagepack(rmp_serde::decode::Error), 39 + #[display("invalid websocket message type")] 40 + InvalidMessage, 41 + #[display("client disconnected")] 42 + Disconnected, 43 + #[display("client closed connection: {_0:?}")] 44 + Closed(CloseFrame), 45 + } 46 + 47 + async fn on_message(tx: UnboundedSender<Response>, message: Request) {} 48 + 49 + pub async fn v1(ws: WebSocketUpgrade) -> axum::response::Response { 50 + let ws = ws.protocols([JSON_PROTOCOL, MESSAGEPACK_PROTOCOL]); 51 + let protocol = ws 52 + .selected_protocol() 53 + .and_then(|hv| hv.to_str().ok()) 54 + .unwrap_or(JSON_PROTOCOL) 55 + .to_owned(); 56 + ws.on_upgrade(move |socket: WebSocket| async move { 57 + let _span = tracing::info_span!("websocket connection"); 58 + tracing::debug!("websocket connected"); 59 + let (ws_sender, ws_receiver) = socket.split(); 60 + futures::pin_mut!(ws_sender); 61 + let result = ws_receiver 62 + .filter_map(|msg| async move { msg.ok() }) 63 + .map(|msg| match msg { 64 + Message::Text(text) if protocol == JSON_PROTOCOL => Some( 65 + serde_json::from_str::<Request>(&text).map_err(ProtocolV1Error::InvalidJson), 66 + ) 67 + .transpose(), 68 + Message::Binary(binary) if protocol == MESSAGEPACK_PROTOCOL => { 69 + rmp_serde::from_read(binary.as_ref()) 70 + .map_err(ProtocolV1Error::InvalidMessagepack) 71 + } 72 + Message::Text(_) | Message::Binary(_) => Err(ProtocolV1Error::InvalidMessage), 73 + Message::Close(None) => Err(ProtocolV1Error::Disconnected), 74 + Message::Close(Some(frame)) => Err(ProtocolV1Error::Closed(frame)), 75 + _ => Ok(None), 76 + }) 77 + .filter_map(|msg| async move { 78 + match msg { 79 + Ok(None) => None, 80 + Ok(Some(msg)) => { 81 + tracing::trace!("websocket message: {:?}", msg); 82 + Some(msg) 83 + } 84 + Err(error) => { 85 + tracing::error!("websocket received error: {}", error); 86 + None 87 + } 88 + } 89 + }) 90 + .flat_map_unordered(None, |msg| { 91 + let (tx, rx) = unbounded_channel::<Response>(); 92 + tokio::spawn(on_message(tx, msg)); 93 + UnboundedReceiverStream::new(rx) 94 + }) 95 + .map(|response| match protocol.as_str() { 96 + MESSAGEPACK_PROTOCOL => rmp_serde::to_vec(&response) 97 + .map(Message::from) 98 + .map_err(axum::Error::new), 99 + _ => serde_json::to_string(&response) 100 + .map(Message::from) 101 + .map_err(axum::Error::new), 102 + }) 103 + .forward(ws_sender) 104 + .in_current_span() 105 + .await; 106 + if let Err(error) = result { 107 + tracing::error!("websocket ended in error: {}", error); 108 + } 109 + tracing::debug!("websocket disconnected"); 110 + }) 20 111 }
+3 -1
src/main.rs
··· 20 20 .connect(&db_url) 21 21 .await?; 22 22 23 - let router = dioxus::server::router(App).layer(Extension(pool)); 23 + let router = dioxus::server::router(App) 24 + .route("/api/ws", axum::routing::any(api::ws::v1)) 25 + .layer(Extension(pool)); 24 26 Ok(router) 25 27 }); 26 28 }