tangled
alpha
login
or
join now
eldridge.cam
/
cartography
0
fork
atom
Trading card city builder game?
0
fork
atom
overview
issues
pulls
pipelines
remove notifications code
eldridge.cam
1 month ago
c1cf38da
8238baeb
0/2
gleam-ci.yaml
failed
2m 43s
svelte-ci.yaml
timeout
3m 25s
-219
1 changed file
expand all
collapse all
unified
split
server
src
db
notification_listener.gleam
-219
server/src/db/notification_listener.gleam
reviewed
···
1
1
-
import gleam/dynamic/decode
2
2
-
import gleam/erlang/process.{type Name}
3
3
-
import gleam/erlang/reference
4
4
-
import gleam/io
5
5
-
import gleam/json
6
6
-
import gleam/option.{type Option}
7
7
-
import gleam/otp/actor
8
8
-
import gleam/result
9
9
-
import gleam/string
10
10
-
import palabres
11
11
-
import pog
12
12
-
13
13
-
pub type State(st) {
14
14
-
State(
15
15
-
state: st,
16
16
-
reference: reference.Reference,
17
17
-
notifications: pog.NotificationsConnection,
18
18
-
)
19
19
-
}
20
20
-
21
21
-
pub type Message(msg) {
22
22
-
Notification(pog.Notification)
23
23
-
Unlisten
24
24
-
Msg(msg)
25
25
-
}
26
26
-
27
27
-
pub type Never
28
28
-
29
29
-
pub opaque type Next(state) {
30
30
-
Continue(state)
31
31
-
Stop
32
32
-
StopAbnormal(String)
33
33
-
}
34
34
-
35
35
-
pub fn continue(state: state) {
36
36
-
Continue(state)
37
37
-
}
38
38
-
39
39
-
pub fn stop() {
40
40
-
Stop
41
41
-
}
42
42
-
43
43
-
pub fn stop_abnormal(reason: String) {
44
44
-
StopAbnormal(reason)
45
45
-
}
46
46
-
47
47
-
pub opaque type Builder(state, event, msg) {
48
48
-
Builder(
49
49
-
initial_state: state,
50
50
-
channel: Option(#(pog.NotificationsConnection, String)),
51
51
-
handler: Option(fn(state, msg) -> Next(state)),
52
52
-
event_handler: Option(
53
53
-
#(decode.Decoder(event), fn(state, event) -> Next(state)),
54
54
-
),
55
55
-
name: Option(Name(Message(msg))),
56
56
-
)
57
57
-
}
58
58
-
59
59
-
pub fn new(state: state) -> Builder(state, Never, Never) {
60
60
-
Builder(
61
61
-
initial_state: state,
62
62
-
channel: option.None,
63
63
-
handler: option.None,
64
64
-
event_handler: option.None,
65
65
-
name: option.None,
66
66
-
)
67
67
-
}
68
68
-
69
69
-
pub fn named(
70
70
-
builder: Builder(state, event, msg),
71
71
-
name: Name(Message(msg)),
72
72
-
) -> Builder(state, event, msg) {
73
73
-
Builder(..builder, name: option.Some(name))
74
74
-
}
75
75
-
76
76
-
pub fn listen_to(
77
77
-
builder: Builder(state, event, msg),
78
78
-
notifications: pog.NotificationsConnection,
79
79
-
channel: String,
80
80
-
) -> Builder(state, event, msg) {
81
81
-
Builder(..builder, channel: option.Some(#(notifications, channel)))
82
82
-
}
83
83
-
84
84
-
pub fn on_message(
85
85
-
builder: Builder(state, event, _msg),
86
86
-
handler: fn(state, msg) -> Next(state),
87
87
-
) -> Builder(state, event, msg) {
88
88
-
Builder(..builder, handler: option.Some(handler))
89
89
-
}
90
90
-
91
91
-
pub fn on_notification(
92
92
-
builder: Builder(state, _event, msg),
93
93
-
decoder: decode.Decoder(event),
94
94
-
handler: fn(state, event) -> Next(state),
95
95
-
) -> Builder(state, event, msg) {
96
96
-
Builder(..builder, event_handler: option.Some(#(decoder, handler)))
97
97
-
}
98
98
-
99
99
-
fn with(state: t, option: Option(v), apply: fn(t, v) -> t) -> t {
100
100
-
case option {
101
101
-
option.None -> state
102
102
-
option.Some(v) -> apply(state, v)
103
103
-
}
104
104
-
}
105
105
-
106
106
-
fn shutdown(state: State(st)) {
107
107
-
pog.unlisten(state.notifications, state.reference)
108
108
-
}
109
109
-
110
110
-
pub fn unlisten(listener: process.Subject(Message(msg))) {
111
111
-
process.send(listener, Unlisten)
112
112
-
}
113
113
-
114
114
-
pub fn start(
115
115
-
builder: Builder(state, event, msg),
116
116
-
) -> Result(actor.Started(process.Subject(Message(msg))), actor.StartError) {
117
117
-
use #(notifications, channel) <- result.try(
118
118
-
option.to_result(builder.channel, "missing channel")
119
119
-
|> result.map_error(actor.InitFailed),
120
120
-
)
121
121
-
use #(event_decoder, event_handler) <- result.try(
122
122
-
option.to_result(builder.event_handler, "missing event handler")
123
123
-
|> result.map_error(actor.InitFailed),
124
124
-
)
125
125
-
126
126
-
actor.new_with_initialiser(100, fn(subject) -> Result(
127
127
-
actor.Initialised(State(state), Message(msg), process.Subject(Message(msg))),
128
128
-
String,
129
129
-
) {
130
130
-
palabres.info("starting database listener")
131
131
-
|> palabres.string("channel", channel)
132
132
-
|> palabres.log()
133
133
-
134
134
-
use reference <- result.try(
135
135
-
pog.listen(notifications, channel)
136
136
-
|> result.map_error(fn(_) { "failed to start listener" }),
137
137
-
)
138
138
-
139
139
-
let selector: process.Selector(Message(msg)) =
140
140
-
process.new_selector()
141
141
-
|> process.select(subject)
142
142
-
|> process.merge_selector(
143
143
-
pog.notification_selector()
144
144
-
|> process.map_selector(Notification),
145
145
-
)
146
146
-
|> process.select_other(fn(dyn) {
147
147
-
io.println(string.inspect(dyn))
148
148
-
Unlisten
149
149
-
})
150
150
-
151
151
-
Ok(
152
152
-
actor.initialised(State(
153
153
-
state: builder.initial_state,
154
154
-
reference:,
155
155
-
notifications:,
156
156
-
))
157
157
-
|> actor.returning(subject)
158
158
-
|> actor.selecting(selector),
159
159
-
)
160
160
-
})
161
161
-
|> with(builder.name, actor.named)
162
162
-
|> actor.on_message(fn(state, message) {
163
163
-
use delegate_message <- handle_generic(state, message, event_decoder)
164
164
-
let sub_next = case delegate_message {
165
165
-
Event(event) -> event_handler(state.state, event)
166
166
-
Message(msg) -> {
167
167
-
let assert option.Some(handler) = builder.handler
168
168
-
handler(state.state, msg)
169
169
-
}
170
170
-
}
171
171
-
case sub_next {
172
172
-
Stop -> {
173
173
-
shutdown(state)
174
174
-
actor.stop()
175
175
-
}
176
176
-
StopAbnormal(reason) -> {
177
177
-
shutdown(state)
178
178
-
actor.stop_abnormal(reason)
179
179
-
}
180
180
-
Continue(substate) -> actor.continue(State(..state, state: substate))
181
181
-
}
182
182
-
})
183
183
-
|> actor.start()
184
184
-
}
185
185
-
186
186
-
fn handle_generic(
187
187
-
state: State(state),
188
188
-
message: Message(msg),
189
189
-
decoder: decode.Decoder(event),
190
190
-
delegate_fn: fn(DelegateMessage(event, msg)) ->
191
191
-
actor.Next(State(state), Message(msg)),
192
192
-
) {
193
193
-
case message {
194
194
-
Notification(pog.Notify(_, _, channel, payload)) -> {
195
195
-
palabres.debug("database notification received")
196
196
-
|> palabres.string("channel", channel)
197
197
-
|> palabres.string("payload", payload)
198
198
-
|> palabres.log()
199
199
-
200
200
-
case json.parse(payload, using: decoder) {
201
201
-
Ok(event) -> delegate_fn(Event(event))
202
202
-
Error(_) -> {
203
203
-
shutdown(state)
204
204
-
actor.stop_abnormal("unexpected event")
205
205
-
}
206
206
-
}
207
207
-
}
208
208
-
Unlisten -> {
209
209
-
shutdown(state)
210
210
-
actor.stop()
211
211
-
}
212
212
-
Msg(msg) -> delegate_fn(Message(msg))
213
213
-
}
214
214
-
}
215
215
-
216
216
-
type DelegateMessage(event, message) {
217
217
-
Event(event)
218
218
-
Message(message)
219
219
-
}