Trading card city builder game?

add message bus functionality

eldridge.cam 0aef7ed5 f716475b

verified
Waiting for spindle ...
+149 -13
-1
Cargo.lock
··· 1973 1973 source = "registry+https://github.com/rust-lang/crates.io-index" 1974 1974 checksum = "59559e1509172f6b26c1cdbc7247c4ddd1ac6560fe94b584f81ee489b141f719" 1975 1975 dependencies = [ 1976 - "axum", 1977 1976 "serde", 1978 1977 "serde_json", 1979 1978 "utoipa",
+1 -1
Cargo.toml
··· 26 26 tokio-stream = { version = "0.1.18", features = ["sync"] } 27 27 tracing = "0.1.44" 28 28 utoipa = "5.4.0" 29 - utoipa-scalar = { version = "0.3.0", features = ["axum"] } 29 + utoipa-scalar = { version = "0.3.0" } 30 30 uuid = { version = "1.20.0", features = ["serde", "v7"] }
+1 -1
src/actor/field_state/mod.rs
··· 1 1 use super::player_socket::Response; 2 2 use super::Unsubscribe; 3 - use kameo::{prelude::Message, Actor}; 3 + use kameo::prelude::*; 4 4 use serde::{Deserialize, Serialize}; 5 5 use sqlx::PgPool; 6 6 use tokio::sync::mpsc::UnboundedSender;
+134
src/bus.rs
··· 1 + use kameo::error::Infallible; 2 + use kameo::prelude::*; 3 + use std::any::{Any, TypeId}; 4 + use std::collections::HashMap; 5 + use std::ops::ControlFlow; 6 + 7 + pub trait AnyRecipient: Any { 8 + fn as_any(&self) -> &dyn Any; 9 + fn actor_id(&self) -> ActorId; 10 + } 11 + 12 + impl<T: Send + Sync> AnyRecipient for Recipient<T> { 13 + fn as_any(&self) -> &dyn Any { 14 + self 15 + } 16 + 17 + fn actor_id(&self) -> ActorId { 18 + self.id() 19 + } 20 + } 21 + 22 + pub struct Bus { 23 + listeners: HashMap<TypeId, Vec<Box<dyn AnyRecipient + Sync + Send>>>, 24 + } 25 + 26 + impl Bus { 27 + pub fn listen<T>(&mut self, recipient: Recipient<T>) 28 + where 29 + T: Any + Send + Sync, 30 + { 31 + let entry = self.listeners.entry(TypeId::of::<T>()).or_default(); 32 + entry.push(Box::new(recipient)) 33 + } 34 + 35 + pub async fn notify<T>(&mut self, notification: T) 36 + where 37 + T: Clone + Any + Send + Sync, 38 + { 39 + for recipient in self 40 + .listeners 41 + .get(&TypeId::of::<T>()) 42 + .into_iter() 43 + .flatten() 44 + .filter_map(|entry| entry.as_any().downcast_ref::<Recipient<T>>()) 45 + { 46 + if let Err(error) = recipient.tell(notification.clone()).await { 47 + tracing::error!("bus failed to notify: {}", error); 48 + } 49 + } 50 + } 51 + } 52 + 53 + impl Actor for Bus { 54 + type Args = (); 55 + type Error = (); 56 + 57 + async fn on_start( 58 + _args: Self::Args, 59 + _actor_ref: kameo::prelude::ActorRef<Self>, 60 + ) -> Result<Self, Self::Error> { 61 + Ok(Self { 62 + listeners: HashMap::new(), 63 + }) 64 + } 65 + 66 + async fn on_link_died( 67 + &mut self, 68 + _actor_ref: kameo::prelude::WeakActorRef<Self>, 69 + id: kameo::prelude::ActorId, 70 + _reason: kameo::prelude::ActorStopReason, 71 + ) -> Result<std::ops::ControlFlow<kameo::prelude::ActorStopReason>, Self::Error> { 72 + for (_, listeners) in self.listeners.iter_mut() { 73 + listeners.retain(|listener| listener.actor_id() != id); 74 + } 75 + Ok(ControlFlow::Continue(())) 76 + } 77 + } 78 + 79 + pub struct Listen<T: Send + Sync + 'static>(Recipient<T>); 80 + pub struct Notify<T: Send + Sync + Clone + 'static>(T); 81 + 82 + #[expect(dead_code)] 83 + pub trait BusExt { 84 + async fn listen<T: Send + Sync + Clone + 'static, A: Actor + Message<T>>( 85 + &self, 86 + actor_ref: &ActorRef<A>, 87 + ) -> Result<(), SendError<Listen<T>, Infallible>>; 88 + 89 + async fn notify<T: Send + Sync + Clone + 'static, A: Actor + Message<T>>( 90 + &self, 91 + notification: T, 92 + ) -> Result<(), SendError<Notify<T>, Infallible>>; 93 + } 94 + 95 + impl BusExt for ActorRef<Bus> { 96 + async fn listen<T: Send + Sync + Clone + 'static, A: Actor + Message<T>>( 97 + &self, 98 + actor_ref: &ActorRef<A>, 99 + ) -> Result<(), SendError<Listen<T>, Infallible>> { 100 + self.link(actor_ref).await; 101 + self.tell(Listen(actor_ref.clone().recipient())).await 102 + } 103 + 104 + async fn notify<T: Send + Sync + Clone + 'static, A: Actor + Message<T>>( 105 + &self, 106 + notification: T, 107 + ) -> Result<(), SendError<Notify<T>, Infallible>> { 108 + self.tell(Notify(notification)).await 109 + } 110 + } 111 + 112 + impl<T: Send + Sync + 'static> Message<Listen<T>> for Bus { 113 + type Reply = (); 114 + 115 + async fn handle( 116 + &mut self, 117 + msg: Listen<T>, 118 + _ctx: &mut kameo::prelude::Context<Self, Self::Reply>, 119 + ) -> Self::Reply { 120 + self.listen(msg.0); 121 + } 122 + } 123 + 124 + impl<T: Send + Sync + Clone + 'static> Message<Notify<T>> for Bus { 125 + type Reply = (); 126 + 127 + async fn handle( 128 + &mut self, 129 + msg: Notify<T>, 130 + _ctx: &mut kameo::prelude::Context<Self, Self::Reply>, 131 + ) -> Self::Reply { 132 + self.notify(msg.0).await; 133 + } 134 + }
+13 -10
src/main.rs
··· 1 1 mod actor; 2 2 mod api; 3 + mod bus; 3 4 mod db; 4 5 mod dto; 5 6 6 - use std::net::IpAddr; 7 - 8 - use axum::{response::Html, Extension, Json}; 9 - use utoipa::OpenApi; 10 - use utoipa_scalar::Scalar; 7 + use kameo::actor::Spawn as _; 8 + use utoipa::OpenApi as _; 11 9 12 - #[derive(OpenApi)] 10 + #[derive(utoipa::OpenApi)] 13 11 #[openapi( 14 12 paths(api::list_card_types::list_card_types), 15 13 tags((name = "Global", description = "Publicly available global data about the Cartography game.")), ··· 19 17 #[tokio::main] 20 18 async fn main() -> anyhow::Result<()> { 21 19 let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL is required"); 22 - let host: IpAddr = std::env::var("HOST") 20 + let host: std::net::IpAddr = std::env::var("HOST") 23 21 .as_deref() 24 22 .unwrap_or("0.0.0.0") 25 23 .parse() ··· 33 31 .max_connections(10) 34 32 .connect(&db_url) 35 33 .await?; 34 + 35 + let bus = bus::Bus::spawn(()); 36 36 let app = axum::Router::new() 37 37 .route( 38 38 "/api/v1/cardtypes", ··· 41 41 .route("/play/ws", axum::routing::any(api::ws::v1)) 42 42 .route( 43 43 "/api/openapi.json", 44 - axum::routing::get(Json(ApiDoc::openapi())), 44 + axum::routing::get(axum::response::Json(ApiDoc::openapi())), 45 45 ) 46 46 .route( 47 47 "/api", 48 - axum::routing::get(Html(Scalar::new(ApiDoc::openapi()).to_html())), 48 + axum::routing::get(axum::response::Html( 49 + utoipa_scalar::Scalar::new(ApiDoc::openapi()).to_html(), 50 + )), 49 51 ) 50 - .layer(Extension(pool)); 52 + .layer(axum::Extension(bus)) 53 + .layer(axum::Extension(pool)); 51 54 let listener = tokio::net::TcpListener::bind((host, port)).await?; 52 55 axum::serve(listener, app).await?; 53 56 Ok(())