Trading card city builder game?

implement the pubsub

+576 -21
+2
.env.example
··· 2 2 SHADOW_DATABASE_URL="postgres://postgres:postgres@localhost:5432/shadow" 3 3 ROOT_DATABASE_URL="postgres://postgres:postgres@localhost:5432/postgres" 4 4 5 + WEBSOCKET_PROTOCOLS="json" 6 + 5 7 PUBLIC_SERVER_URL=http://localhost:8000 6 8 PUBLIC_SERVER_WS_URL=ws://localhost:8000
+1
api/src/cartography_api/game_state.gleam
··· 17 17 } 18 18 19 19 pub type CardTypeId { 20 + CardTypeId(id: String) 20 21 TileTypeId(id: String) 21 22 SpeciesId(id: String) 22 23 }
+4 -4
api/src/cartography_api/request.gleam
··· 1 - import cartography_api/game_state.{type FieldId} 1 + import cartography_api/game_state.{type CardTypeId, type FieldId} 2 2 import cartography_api/internal/repr 3 3 import gleam/dynamic/decode 4 4 import gleam/json ··· 19 19 ListFields 20 20 WatchField(field_id: FieldId) 21 21 Unsubscribe 22 - DebugAddCard(card_id: String) 22 + DebugAddCard(card_id: CardTypeId) 23 23 } 24 24 25 25 pub fn to_json(message: Message) -> json.Json { ··· 34 34 |> repr.struct("WatchField") 35 35 Unsubscribe -> repr.struct(json.null(), "Unsubscribe") 36 36 DebugAddCard(card_id) -> 37 - json.string(card_id) 37 + json.string(card_id.id) 38 38 |> repr.struct("DebugAddCard") 39 39 } 40 40 json.object([#("id", json.string(uuid.to_string(id))), #("request", request)]) ··· 67 67 } 68 68 "DebugAddCard" -> { 69 69 use payload <- repr.struct_payload(decode.string) 70 - decode.success(DebugAddCard(payload)) 70 + decode.success(DebugAddCard(game_state.CardTypeId(payload))) 71 71 } 72 72 _ -> { 73 73 decode.failure(Unsubscribe, "valid #tag")
+34
server/src/bus.gleam
··· 1 + import cartography_api/game_state.{type CardId} 2 + import gleam/erlang/process 3 + import gleam/otp/static_supervisor 4 + import gleam/otp/supervision 5 + import pubsub 6 + 7 + pub opaque type Bus { 8 + Bus(card_accounts_channel: process.Name(pubsub.Message(String, CardId))) 9 + } 10 + 11 + pub fn supervised() { 12 + let card_accounts_channel = process.new_name("CardAccountsChannel") 13 + 14 + let child_spec = 15 + supervision.supervisor(fn() { 16 + static_supervisor.new(static_supervisor.OneForOne) 17 + |> static_supervisor.add( 18 + pubsub.supervised(pubsub.named(card_accounts_channel)), 19 + ) 20 + |> static_supervisor.start() 21 + }) 22 + 23 + #(child_spec, Bus(card_accounts_channel:)) 24 + } 25 + 26 + pub fn notify_card_account(bus: Bus, account_id: String, card_id: CardId) { 27 + process.named_subject(bus.card_accounts_channel) 28 + |> pubsub.broadcast(account_id, card_id) 29 + } 30 + 31 + pub fn on_card_account(bus: Bus, account_id: String) -> process.Subject(CardId) { 32 + process.named_subject(bus.card_accounts_channel) 33 + |> pubsub.subscribe(account_id) 34 + }
+187
server/src/db/sql.gleam
··· 46 46 |> pog.execute(db) 47 47 } 48 48 49 + /// A row you get from running the `create_citizen` query 50 + /// defined in `./src/db/sql/create_citizen.sql`. 51 + /// 52 + /// > 🐿️ This type definition was generated automatically using v4.6.0 of the 53 + /// > [squirrel package](https://github.com/giacomocavalieri/squirrel). 54 + /// 55 + pub type CreateCitizenRow { 56 + CreateCitizenRow(card_id: Int, account_id: String) 57 + } 58 + 59 + /// Runs the `create_citizen` query 60 + /// defined in `./src/db/sql/create_citizen.sql`. 61 + /// 62 + /// > 🐿️ This function was generated automatically using v4.6.0 of 63 + /// > the [squirrel package](https://github.com/giacomocavalieri/squirrel). 64 + /// 65 + pub fn create_citizen( 66 + db: pog.Connection, 67 + arg_1: String, 68 + arg_2: String, 69 + ) -> Result(pog.Returned(CreateCitizenRow), pog.QueryError) { 70 + let decoder = { 71 + use card_id <- decode.field(0, decode.int) 72 + use account_id <- decode.field(1, decode.string) 73 + decode.success(CreateCitizenRow(card_id:, account_id:)) 74 + } 75 + 76 + "WITH 77 + cards_inserted AS ( 78 + INSERT INTO 79 + cards (card_type_id) 80 + VALUES 81 + ($1) 82 + RETURNING 83 + * 84 + ), 85 + citizens_inserted AS ( 86 + INSERT INTO 87 + citizens (id, species_id, name) 88 + SELECT 89 + card.id, 90 + card.card_type_id, 91 + '' 92 + FROM 93 + cards_inserted card 94 + ) 95 + INSERT INTO 96 + card_accounts (card_id, account_id) 97 + SELECT 98 + card.id, 99 + $2 100 + FROM 101 + cards_inserted card 102 + RETURNING 103 + *; 104 + " 105 + |> pog.query 106 + |> pog.parameter(pog.text(arg_1)) 107 + |> pog.parameter(pog.text(arg_2)) 108 + |> pog.returning(decoder) 109 + |> pog.execute(db) 110 + } 111 + 112 + /// A row you get from running the `create_tile` query 113 + /// defined in `./src/db/sql/create_tile.sql`. 114 + /// 115 + /// > 🐿️ This type definition was generated automatically using v4.6.0 of the 116 + /// > [squirrel package](https://github.com/giacomocavalieri/squirrel). 117 + /// 118 + pub type CreateTileRow { 119 + CreateTileRow(card_id: Int, account_id: String) 120 + } 121 + 122 + /// Runs the `create_tile` query 123 + /// defined in `./src/db/sql/create_tile.sql`. 124 + /// 125 + /// > 🐿️ This function was generated automatically using v4.6.0 of 126 + /// > the [squirrel package](https://github.com/giacomocavalieri/squirrel). 127 + /// 128 + pub fn create_tile( 129 + db: pog.Connection, 130 + arg_1: String, 131 + arg_2: String, 132 + ) -> Result(pog.Returned(CreateTileRow), pog.QueryError) { 133 + let decoder = { 134 + use card_id <- decode.field(0, decode.int) 135 + use account_id <- decode.field(1, decode.string) 136 + decode.success(CreateTileRow(card_id:, account_id:)) 137 + } 138 + 139 + "WITH 140 + cards_inserted AS ( 141 + INSERT INTO 142 + cards (card_type_id) 143 + VALUES 144 + ($1) 145 + RETURNING 146 + * 147 + ), 148 + tiles_inserted AS ( 149 + INSERT INTO 150 + tiles (id, tile_type_id, name) 151 + SELECT 152 + card.id, 153 + card.card_type_id, 154 + '' 155 + FROM 156 + cards_inserted card 157 + ) 158 + INSERT INTO 159 + card_accounts (card_id, account_id) 160 + SELECT 161 + card.id, 162 + $2 163 + FROM 164 + cards_inserted card 165 + RETURNING 166 + *; 167 + " 168 + |> pog.query 169 + |> pog.parameter(pog.text(arg_1)) 170 + |> pog.parameter(pog.text(arg_2)) 171 + |> pog.returning(decoder) 172 + |> pog.execute(db) 173 + } 174 + 49 175 /// A row you get from running the `get_account` query 50 176 /// defined in `./src/db/sql/get_account.sql`. 51 177 /// ··· 77 203 accounts 78 204 WHERE 79 205 id = $1 206 + " 207 + |> pog.query 208 + |> pog.parameter(pog.text(arg_1)) 209 + |> pog.returning(decoder) 210 + |> pog.execute(db) 211 + } 212 + 213 + /// A row you get from running the `get_card_type` query 214 + /// defined in `./src/db/sql/get_card_type.sql`. 215 + /// 216 + /// > 🐿️ This type definition was generated automatically using v4.6.0 of the 217 + /// > [squirrel package](https://github.com/giacomocavalieri/squirrel). 218 + /// 219 + pub type GetCardTypeRow { 220 + GetCardTypeRow(id: String, card_set_id: String, class: CardClass) 221 + } 222 + 223 + /// Runs the `get_card_type` query 224 + /// defined in `./src/db/sql/get_card_type.sql`. 225 + /// 226 + /// > 🐿️ This function was generated automatically using v4.6.0 of 227 + /// > the [squirrel package](https://github.com/giacomocavalieri/squirrel). 228 + /// 229 + pub fn get_card_type( 230 + db: pog.Connection, 231 + arg_1: String, 232 + ) -> Result(pog.Returned(GetCardTypeRow), pog.QueryError) { 233 + let decoder = { 234 + use id <- decode.field(0, decode.string) 235 + use card_set_id <- decode.field(1, decode.string) 236 + use class <- decode.field(2, card_class_decoder()) 237 + decode.success(GetCardTypeRow(id:, card_set_id:, class:)) 238 + } 239 + 240 + "SELECT 241 + * 242 + FROM 243 + card_types 244 + WHERE 245 + id = $1; 80 246 " 81 247 |> pog.query 82 248 |> pog.parameter(pog.text(arg_1)) ··· 206 372 |> pog.returning(decoder) 207 373 |> pog.execute(db) 208 374 } 375 + 376 + // --- Enums ------------------------------------------------------------------- 377 + 378 + /// Corresponds to the Postgres `card_class` enum. 379 + /// 380 + /// > 🐿️ This type definition was generated automatically using v4.6.0 of the 381 + /// > [squirrel package](https://github.com/giacomocavalieri/squirrel). 382 + /// 383 + pub type CardClass { 384 + Citizen 385 + Tile 386 + } 387 + 388 + fn card_class_decoder() -> decode.Decoder(CardClass) { 389 + use card_class <- decode.then(decode.string) 390 + case card_class { 391 + "citizen" -> decode.success(Citizen) 392 + "tile" -> decode.success(Tile) 393 + _ -> decode.failure(Citizen, "CardClass") 394 + } 395 + }
+28
server/src/db/sql/create_citizen.sql
··· 1 + WITH 2 + cards_inserted AS ( 3 + INSERT INTO 4 + cards (card_type_id) 5 + VALUES 6 + ($1) 7 + RETURNING 8 + * 9 + ), 10 + citizens_inserted AS ( 11 + INSERT INTO 12 + citizens (id, species_id, name) 13 + SELECT 14 + card.id, 15 + card.card_type_id, 16 + '' 17 + FROM 18 + cards_inserted card 19 + ) 20 + INSERT INTO 21 + card_accounts (card_id, account_id) 22 + SELECT 23 + card.id, 24 + $2 25 + FROM 26 + cards_inserted card 27 + RETURNING 28 + *;
+28
server/src/db/sql/create_tile.sql
··· 1 + WITH 2 + cards_inserted AS ( 3 + INSERT INTO 4 + cards (card_type_id) 5 + VALUES 6 + ($1) 7 + RETURNING 8 + * 9 + ), 10 + tiles_inserted AS ( 11 + INSERT INTO 12 + tiles (id, tile_type_id, name) 13 + SELECT 14 + card.id, 15 + card.card_type_id, 16 + '' 17 + FROM 18 + cards_inserted card 19 + ) 20 + INSERT INTO 21 + card_accounts (card_id, account_id) 22 + SELECT 23 + card.id, 24 + $2 25 + FROM 26 + cards_inserted card 27 + RETURNING 28 + *;
+6
server/src/db/sql/get_card_type.sql
··· 1 + SELECT 2 + * 3 + FROM 4 + card_types 5 + WHERE 6 + id = $1;
+44
server/src/handlers/debug_add_card_handler.gleam
··· 1 + import bus 2 + import cartography_api/game_state 3 + import db/rows 4 + import db/sql 5 + import gleam/result 6 + import gleam/string 7 + import mist 8 + import websocket/state 9 + 10 + pub fn handle( 11 + st: state.State, 12 + card_type_id: game_state.CardTypeId, 13 + ) -> Result(mist.Next(state.State, _msg), String) { 14 + use account_id <- state.account_id(st) 15 + { 16 + let assert Ok(card_type) = 17 + state.db_connection(st) 18 + |> sql.get_card_type(card_type_id.id) 19 + use card_type <- rows.one(card_type) 20 + 21 + use card_id <- result.try(case card_type.class { 22 + sql.Citizen -> { 23 + let assert Ok(citizen) = 24 + state.db_connection(st) 25 + |> sql.create_citizen(account_id, card_type.id) 26 + use citizen <- rows.one(citizen) 27 + Ok(game_state.CitizenId(citizen.card_id)) 28 + } 29 + sql.Tile -> { 30 + let assert Ok(tile) = 31 + state.db_connection(st) 32 + |> sql.create_tile(account_id, card_type.id) 33 + use tile <- rows.one(tile) 34 + Ok(game_state.TileId(tile.card_id)) 35 + } 36 + }) 37 + 38 + state.bus(st) 39 + |> bus.notify_card_account(account_id, card_id) 40 + 41 + Ok(mist.continue(st)) 42 + } 43 + |> result.map_error(string.inspect) 44 + }
+177
server/src/pubsub.gleam
··· 1 + import gleam/dict.{type Dict} 2 + import gleam/erlang/process.{type Down, type Monitor, type Pid, type Subject} 3 + import gleam/otp/actor 4 + import gleam/otp/supervision 5 + import gleam/result 6 + import gleam/set.{type Set} 7 + 8 + pub opaque type Message(channel, message) { 9 + Subscribe(channel, Subject(message)) 10 + Unsubscribe(Subject(message)) 11 + Hangup(Down) 12 + Broadcast(channel, message) 13 + } 14 + 15 + pub type PubSub(channel, message) 16 + 17 + type State(channel, message) { 18 + State( 19 + channels: Dict(channel, Set(Subject(message))), 20 + monitors: Dict(Pid, #(Monitor, Set(Subject(message)))), 21 + ) 22 + } 23 + 24 + pub fn start(name: Name(channel, message)) { 25 + actor.new_with_initialiser(10, fn(sub) { 26 + let selector = 27 + process.new_selector() 28 + |> process.select(sub) 29 + |> process.select_monitors(Hangup) 30 + 31 + actor.initialised(State(channels: dict.new(), monitors: dict.new())) 32 + |> actor.selecting(selector) 33 + |> Ok() 34 + }) 35 + |> actor.named(name) 36 + |> actor.on_message(on_message) 37 + |> actor.start() 38 + } 39 + 40 + type Name(channel, message) = 41 + process.Name(Message(channel, message)) 42 + 43 + pub opaque type Config(channel, message) { 44 + Config(name: Name(channel, message)) 45 + } 46 + 47 + pub fn supervised(config: Config(channel, message)) { 48 + supervision.supervisor(fn() { start(config.name) }) 49 + } 50 + 51 + pub fn named(name: Name(channel, message)) { 52 + Config(name:) 53 + } 54 + 55 + fn on_message( 56 + state: State(channel, message), 57 + message: Message(channel, message), 58 + ) -> actor.Next(State(channel, message), Message(channel, message)) { 59 + case message { 60 + Broadcast(channel, message) -> { 61 + dict.get(state.channels, channel) 62 + |> result.lazy_unwrap(set.new) 63 + |> set.each(process.send(_, message)) 64 + actor.continue(state) 65 + } 66 + Subscribe(channel, subject) -> 67 + handle_subscribe(state, channel, subject) 68 + |> actor.continue() 69 + Unsubscribe(subject) -> 70 + handle_unsubscribe(state, subject) 71 + |> actor.continue() 72 + Hangup(down) -> 73 + handle_hangup(state, down) 74 + |> actor.continue() 75 + } 76 + } 77 + 78 + fn remove_subject(state: State(channel, message), subject: Subject(message)) { 79 + let assert Ok(pid) = process.subject_owner(subject) 80 + let assert Ok(#(monitor, subjects)) = dict.get(state.monitors, pid) 81 + let subjects = set.delete(subjects, subject) 82 + let monitors = case set.is_empty(subjects) { 83 + True -> { 84 + process.demonitor_process(monitor) 85 + dict.delete(state.monitors, pid) 86 + } 87 + False -> dict.insert(state.monitors, pid, #(monitor, subjects)) 88 + } 89 + State(..state, monitors:) 90 + } 91 + 92 + fn remove_listener(state: State(channel, message), subject: Subject(message)) { 93 + let channels = 94 + dict.map_values(state.channels, fn(_, subs) { set.delete(subs, subject) }) 95 + State(..state, channels:) 96 + } 97 + 98 + fn add_listener( 99 + state: State(channel, message), 100 + channel: channel, 101 + subject: Subject(message), 102 + ) { 103 + let channels = 104 + state.channels 105 + |> dict.get(channel) 106 + |> result.lazy_unwrap(set.new) 107 + |> set.insert(subject) 108 + |> dict.insert(state.channels, channel, _) 109 + State(..state, channels:) 110 + } 111 + 112 + fn add_monitor(state: State(channel, message), subject: Subject(message)) { 113 + let assert Ok(pid) = process.subject_owner(subject) 114 + let monitors = case dict.get(state.monitors, pid) { 115 + Ok(#(monitor, subjects)) -> 116 + dict.insert(state.monitors, pid, #(monitor, set.insert(subjects, subject))) 117 + Error(Nil) -> 118 + dict.insert(state.monitors, pid, #( 119 + process.monitor(pid), 120 + set.new() |> set.insert(subject), 121 + )) 122 + } 123 + State(..state, monitors:) 124 + } 125 + 126 + fn remove_monitor(state: State(channel, message), pid: Pid) { 127 + State(..state, monitors: dict.delete(state.monitors, pid)) 128 + } 129 + 130 + fn handle_subscribe( 131 + state: State(channel, message), 132 + channel: channel, 133 + subject: Subject(message), 134 + ) -> State(channel, message) { 135 + state 136 + |> add_listener(channel, subject) 137 + |> add_monitor(subject) 138 + } 139 + 140 + fn handle_unsubscribe( 141 + state: State(channel, message), 142 + subject: Subject(message), 143 + ) -> State(channel, message) { 144 + state 145 + |> remove_listener(subject) 146 + |> remove_subject(subject) 147 + } 148 + 149 + fn handle_hangup( 150 + state: State(channel, message), 151 + down: Down, 152 + ) -> State(channel, message) { 153 + case down { 154 + process.ProcessDown(monitor, pid, _reason) -> { 155 + process.demonitor_process(monitor) 156 + let assert Ok(#(_, subjects)) = dict.get(state.monitors, pid) 157 + subjects 158 + |> set.fold(state, remove_listener) 159 + |> remove_monitor(pid) 160 + } 161 + process.PortDown(..) -> panic as "unreachable" 162 + } 163 + } 164 + 165 + pub fn broadcast( 166 + pubsub: Subject(Message(channel, message)), 167 + channel: channel, 168 + message: message, 169 + ) { 170 + process.send(pubsub, Broadcast(channel, message)) 171 + } 172 + 173 + pub fn subscribe(pubsub: Subject(Message(channel, message)), channel: channel) { 174 + let subject = process.new_subject() 175 + process.send(pubsub, Subscribe(channel, subject)) 176 + subject 177 + }
+11 -7
server/src/server.gleam
··· 1 + import bus 1 2 import envoy 2 3 import gleam/erlang/process 3 4 import gleam/int 4 - import gleam/otp/static_supervisor as sup 5 + import gleam/otp/static_supervisor 5 6 import gleam/result 6 7 import mist 7 8 import palabres ··· 30 31 |> pog.pool_size(10) 31 32 |> pog.supervised() 32 33 33 - let context = context.Context(db_name) 34 + let #(bus_process, bus_handles) = bus.supervised() 35 + 36 + let context = context.Context(db_name, bus_handles) 34 37 let server = 35 - mist.new(fn(req) { router.handler(req, context) }) 38 + mist.new(router.handler(_, context)) 36 39 |> mist.port(port) 37 40 |> mist.supervised() 38 41 39 42 let assert Ok(_) = 40 - sup.new(sup.OneForOne) 41 - |> sup.add(database) 42 - |> sup.add(server) 43 - |> sup.start() 43 + static_supervisor.new(static_supervisor.OneForOne) 44 + |> static_supervisor.add(database) 45 + |> static_supervisor.add(server) 46 + |> static_supervisor.add(bus_process) 47 + |> static_supervisor.start() 44 48 45 49 process.sleep_forever() 46 50 }
+2 -1
server/src/server/context.gleam
··· 1 + import bus 1 2 import gleam/erlang/process.{type Name} 2 3 import pog 3 4 4 5 pub type Context { 5 - Context(db: Name(pog.Message)) 6 + Context(db: Name(pog.Message), bus: bus.Bus) 6 7 }
+8 -1
server/src/server/router.gleam
··· 7 7 8 8 pub fn handler(req: request.Request(mist.Connection), context: Context) { 9 9 case request.path_segments(req) { 10 - ["websocket"] -> handler.start(req, context) 10 + ["websocket"] -> { 11 + case handler.start(req, context) { 12 + Ok(response) -> response 13 + Error(_) -> 14 + response.new(400) 15 + |> response.set_body(mist.Bytes(bytes_tree.new())) 16 + } 17 + } 11 18 _ -> 12 19 response.new(404) 13 20 |> response.set_body(mist.Bytes(bytes_tree.new()))
+38 -7
server/src/websocket/handler.gleam
··· 1 1 import cartography_api/request 2 + import envoy 2 3 import gleam/http/request as http 4 + import gleam/http/response 5 + import gleam/list 6 + import gleam/result 7 + import gleam/string 3 8 import handlers/authenticate_handler 9 + import handlers/debug_add_card_handler 4 10 import handlers/list_fields_handler 5 11 import json_websocket 6 12 import mist.{type WebsocketConnection} ··· 21 27 request.WatchField(_) -> { 22 28 todo 23 29 } 24 - request.DebugAddCard(_) -> { 25 - todo 26 - } 30 + request.DebugAddCard(card_id) -> 31 + debug_add_card_handler.handle(state, card_id) 27 32 request.Unsubscribe -> { 28 33 todo 29 34 } ··· 41 46 } 42 47 43 48 pub fn start(request: http.Request(mist.Connection), context: Context) { 44 - state.new(context) 45 - |> json_websocket.new() 46 - |> json_websocket.message(request.decoder(), handle_message) 47 - |> json_websocket.start(request) 49 + use protocol <- result.try(http.get_header(request, "sec-websocket-protocol")) 50 + start_with_protocol(string.split(protocol, on: ","), request, context) 51 + } 52 + 53 + fn start_with_protocol( 54 + protocol: List(String), 55 + request: http.Request(mist.Connection), 56 + context: Context, 57 + ) { 58 + let supported = 59 + envoy.get("WEBSOCKET_PROTOCOLS") 60 + |> result.unwrap("json") 61 + |> string.split(on: ",") 62 + 63 + let allow_json = list.contains(supported, "json") 64 + 65 + case protocol { 66 + [] -> Error(Nil) 67 + ["v1-json.cartography.app", ..] if allow_json -> 68 + state.new(context) 69 + |> json_websocket.new() 70 + |> json_websocket.message(request.decoder(), handle_message) 71 + |> json_websocket.start(request) 72 + |> response.set_header( 73 + "sec-websocket-protocol", 74 + "v1-json.cartography.app", 75 + ) 76 + |> Ok() 77 + [_, ..rest] -> start_with_protocol(rest, request, context) 78 + } 48 79 }
+5
server/src/websocket/state.gleam
··· 1 + import bus 1 2 import gleam/dict 2 3 import gleam/option 3 4 import pog ··· 31 32 32 33 pub fn db_connection(state: State) -> pog.Connection { 33 34 pog.named_connection(state.context.db) 35 + } 36 + 37 + pub fn bus(state: State) -> bus.Bus { 38 + state.context.bus 34 39 } 35 40 36 41 pub fn add_listener(
+1 -1
src/lib/appserver/socket/SocketV1.svelte.ts
··· 24 24 } 25 25 26 26 export class SocketV1 extends ReactiveEventTarget<SocketV1EventMap> { 27 - static readonly PROTOCOL = ["v1.cartography.app"]; 27 + static readonly PROTOCOL = ["v1-json.cartography.app", "v1-messagepack.cartography.app"]; 28 28 29 29 #socket: WebSocket; 30 30 #url: string;