tangled
alpha
login
or
join now
eldridge.cam
/
cartography
0
fork
atom
Trading card city builder game?
0
fork
atom
overview
issues
pulls
pipelines
add some unit tests
eldridge.cam
1 month ago
9e456fbf
72cc018c
+127
-39
5 changed files
expand all
collapse all
unified
split
server
src
bus.gleam
pubsub.gleam
util.gleam
test
pubsub_test.gleam
server_test.gleam
+3
-1
server/src/bus.gleam
···
15
15
supervision.supervisor(fn() {
16
16
static_supervisor.new(static_supervisor.OneForOne)
17
17
|> static_supervisor.add(
18
18
-
pubsub.supervised(pubsub.named(card_accounts_channel)),
18
18
+
pubsub.new()
19
19
+
|> pubsub.named(card_accounts_channel)
20
20
+
|> pubsub.supervised(),
19
21
)
20
22
|> static_supervisor.start()
21
23
})
+53
-30
server/src/pubsub.gleam
···
1
1
import gleam/dict.{type Dict}
2
2
import gleam/erlang/process.{type Down, type Monitor, type Pid, type Subject}
3
3
+
import gleam/option
3
4
import gleam/otp/actor
4
5
import gleam/otp/supervision
5
6
import gleam/result
6
7
import gleam/set.{type Set}
8
8
+
import util
7
9
8
10
pub opaque type Message(channel, message) {
9
11
Subscribe(channel, Subject(message))
10
12
Unsubscribe(Subject(message))
11
13
Hangup(Down)
12
14
Broadcast(channel, message)
15
15
+
Stop
13
16
}
14
17
15
18
pub type PubSub(channel, message)
···
21
24
)
22
25
}
23
26
24
24
-
pub fn start(name: Name(channel, message)) {
27
27
+
pub fn start(config: Config(channel, message)) {
25
28
actor.new_with_initialiser(10, fn(sub) {
26
29
let selector =
27
30
process.new_selector()
···
30
33
31
34
actor.initialised(State(channels: dict.new(), monitors: dict.new()))
32
35
|> actor.selecting(selector)
36
36
+
|> actor.returning(sub)
33
37
|> Ok()
34
38
})
35
35
-
|> actor.named(name)
39
39
+
|> util.with_some(config.name, actor.named)
36
40
|> actor.on_message(on_message)
37
41
|> actor.start()
38
42
}
···
41
45
process.Name(Message(channel, message))
42
46
43
47
pub opaque type Config(channel, message) {
44
44
-
Config(name: Name(channel, message))
48
48
+
Config(name: option.Option(Name(channel, message)))
45
49
}
46
50
47
51
pub fn supervised(config: Config(channel, message)) {
48
48
-
supervision.supervisor(fn() { start(config.name) })
52
52
+
supervision.supervisor(fn() { start(config) })
53
53
+
}
54
54
+
55
55
+
pub fn new() {
56
56
+
Config(name: option.None)
49
57
}
50
58
51
51
-
pub fn named(name: Name(channel, message)) {
52
52
-
Config(name:)
59
59
+
pub fn named(_config: Config(_c, _m), name: Name(channel, message)) {
60
60
+
Config(name: option.Some(name))
53
61
}
54
62
55
63
fn on_message(
···
72
80
Hangup(down) ->
73
81
handle_hangup(state, down)
74
82
|> actor.continue()
75
75
-
}
76
76
-
}
77
77
-
78
78
-
fn remove_subject(state: State(channel, message), subject: Subject(message)) {
79
79
-
let assert Ok(pid) = process.subject_owner(subject)
80
80
-
let assert Ok(#(monitor, subjects)) = dict.get(state.monitors, pid)
81
81
-
let subjects = set.delete(subjects, subject)
82
82
-
let monitors = case set.is_empty(subjects) {
83
83
-
True -> {
84
84
-
process.demonitor_process(monitor)
85
85
-
dict.delete(state.monitors, pid)
86
86
-
}
87
87
-
False -> dict.insert(state.monitors, pid, #(monitor, subjects))
83
83
+
Stop -> actor.stop()
88
84
}
89
89
-
State(..state, monitors:)
90
90
-
}
91
91
-
92
92
-
fn remove_listener(state: State(channel, message), subject: Subject(message)) {
93
93
-
let channels =
94
94
-
dict.map_values(state.channels, fn(_, subs) { set.delete(subs, subject) })
95
95
-
State(..state, channels:)
96
85
}
97
86
98
87
fn add_listener(
···
123
112
State(..state, monitors:)
124
113
}
125
114
126
126
-
fn remove_monitor(state: State(channel, message), pid: Pid) {
127
127
-
State(..state, monitors: dict.delete(state.monitors, pid))
128
128
-
}
129
129
-
130
115
fn handle_subscribe(
131
116
state: State(channel, message),
132
117
channel: channel,
···
137
122
|> add_monitor(subject)
138
123
}
139
124
125
125
+
fn remove_listener(state: State(channel, message), subject: Subject(message)) {
126
126
+
let channels =
127
127
+
dict.map_values(state.channels, fn(_, subs) { set.delete(subs, subject) })
128
128
+
State(..state, channels:)
129
129
+
}
130
130
+
131
131
+
fn remove_subject(state: State(channel, message), subject: Subject(message)) {
132
132
+
let assert Ok(pid) = process.subject_owner(subject)
133
133
+
{
134
134
+
use #(monitor, subjects) <- result.map(dict.get(state.monitors, pid))
135
135
+
let subjects = set.delete(subjects, subject)
136
136
+
let monitors = case set.is_empty(subjects) {
137
137
+
True -> {
138
138
+
process.demonitor_process(monitor)
139
139
+
dict.delete(state.monitors, pid)
140
140
+
}
141
141
+
False -> dict.insert(state.monitors, pid, #(monitor, subjects))
142
142
+
}
143
143
+
State(..state, monitors:)
144
144
+
}
145
145
+
|> result.unwrap(state)
146
146
+
}
147
147
+
140
148
fn handle_unsubscribe(
141
149
state: State(channel, message),
142
150
subject: Subject(message),
···
144
152
state
145
153
|> remove_listener(subject)
146
154
|> remove_subject(subject)
155
155
+
}
156
156
+
157
157
+
fn remove_monitor(state: State(channel, message), pid: Pid) {
158
158
+
State(..state, monitors: dict.delete(state.monitors, pid))
147
159
}
148
160
149
161
fn handle_hangup(
···
175
187
process.send(pubsub, Subscribe(channel, subject))
176
188
subject
177
189
}
190
190
+
191
191
+
pub fn unsubscribe(
192
192
+
pubsub: Subject(Message(channel, message)),
193
193
+
subject: Subject(message),
194
194
+
) {
195
195
+
process.send(pubsub, Unsubscribe(subject))
196
196
+
}
197
197
+
198
198
+
pub fn stop(pubsub: Subject(Message(channel, message))) {
199
199
+
process.send(pubsub, Stop)
200
200
+
}
+8
server/src/util.gleam
···
1
1
+
import gleam/option
2
2
+
3
3
+
pub fn with_some(v: v, option: option.Option(t), do: fn(v, t) -> v) -> v {
4
4
+
case option {
5
5
+
option.Some(t) -> do(v, t)
6
6
+
option.None -> v
7
7
+
}
8
8
+
}
+63
server/test/pubsub_test.gleam
···
1
1
+
import gleam/erlang/process
2
2
+
import pubsub
3
3
+
4
4
+
type Channel {
5
5
+
One
6
6
+
Two
7
7
+
}
8
8
+
9
9
+
pub fn pubsub_received_test() {
10
10
+
let assert Ok(pubsub_actor) = pubsub.start(pubsub.new())
11
11
+
let pubsub_subject = pubsub_actor.data
12
12
+
13
13
+
let subscription = pubsub.subscribe(pubsub_subject, One)
14
14
+
15
15
+
pubsub.broadcast(pubsub_subject, One, 1)
16
16
+
pubsub.broadcast(pubsub_subject, Two, 2)
17
17
+
18
18
+
let assert Ok(1) = process.receive(subscription, 1)
19
19
+
let assert Error(_) = process.receive(subscription, 1)
20
20
+
21
21
+
pubsub.stop(pubsub_subject)
22
22
+
}
23
23
+
24
24
+
pub fn pubsub_multiple_test() {
25
25
+
let assert Ok(pubsub_actor) = pubsub.start(pubsub.new())
26
26
+
let pubsub_subject = pubsub_actor.data
27
27
+
28
28
+
let subscription_1 = pubsub.subscribe(pubsub_subject, One)
29
29
+
let subscription_2 = pubsub.subscribe(pubsub_subject, One)
30
30
+
31
31
+
pubsub.broadcast(pubsub_subject, One, 1)
32
32
+
33
33
+
let assert Ok(1) = process.receive(subscription_1, 1)
34
34
+
let assert Ok(1) = process.receive(subscription_2, 1)
35
35
+
36
36
+
pubsub.stop(pubsub_subject)
37
37
+
}
38
38
+
39
39
+
pub fn pubsub_unsubscribe_test() {
40
40
+
let assert Ok(pubsub_actor) = pubsub.start(pubsub.new())
41
41
+
let pubsub_subject = pubsub_actor.data
42
42
+
43
43
+
let subscription = pubsub.subscribe(pubsub_subject, One)
44
44
+
pubsub.broadcast(pubsub_subject, One, 1)
45
45
+
pubsub.unsubscribe(pubsub_subject, subscription)
46
46
+
pubsub.broadcast(pubsub_subject, One, 2)
47
47
+
48
48
+
let assert Ok(1) = process.receive(subscription, 1)
49
49
+
let assert Error(_) = process.receive(subscription, 1)
50
50
+
51
51
+
pubsub.stop(pubsub_subject)
52
52
+
}
53
53
+
54
54
+
pub fn pubsub_reunsubscribe_test() {
55
55
+
let assert Ok(pubsub_actor) = pubsub.start(pubsub.new())
56
56
+
let pubsub_subject = pubsub_actor.data
57
57
+
58
58
+
let subscription = pubsub.subscribe(pubsub_subject, One)
59
59
+
pubsub.unsubscribe(pubsub_subject, subscription)
60
60
+
pubsub.unsubscribe(pubsub_subject, subscription)
61
61
+
62
62
+
pubsub.stop(pubsub_subject)
63
63
+
}
-8
server/test/server_test.gleam
···
3
3
pub fn main() -> Nil {
4
4
gleeunit.main()
5
5
}
6
6
-
7
7
-
// gleeunit test functions end in `_test`
8
8
-
pub fn hello_world_test() {
9
9
-
let name = "Joe"
10
10
-
let greeting = "Hello, " <> name <> "!"
11
11
-
12
12
-
assert greeting == "Hello, Joe!"
13
13
-
}