tangled
alpha
login
or
join now
crashkeys.dev
/
audquotes
0
fork
atom
Source code for my personal quote bot project.
0
fork
atom
overview
issues
pulls
pipelines
Tidying up.
ironballista
6 months ago
622f279f
fcf53881
+55
-21
1 changed file
expand all
collapse all
unified
split
src
main.rs
+55
-21
src/main.rs
reviewed
···
20
20
const EVENT_QUEUE: &str = "queue:event";
21
21
22
22
// See https://cron.help for what these strings mean
23
23
-
const POSTING_INTERVAL_CRON: &str = "0 0,30 * * * *";
23
23
+
const POSTING_INTERVAL_CRON: &str = "0 0,30 * * * *";
24
24
+
const POSTING_INTERVAL_DEBUG: &str = "0,30 * * * * *";
24
25
const EVENT_UPDATE_INTERVAL: &str = "55 23 * * *";
26
26
+
27
27
+
const POSTING_RETRIES: i32 = 5;
25
28
26
29
fn prepare_post<I: Into<String>>(text: I) -> post::RecordData {
27
30
post::RecordData {
···
77
80
mut con: impl redis::aio::ConnectionLike + AsyncCommands,
78
81
output_queue: &str,
79
82
) -> Result<(), ()> {
80
80
-
let len: u64 = con.llen(output_queue).await.unwrap();
83
83
+
let len: u64 = con.llen(output_queue).await.map_err(|_| ())?;
81
84
// NOTE: The following assumes the queue hasn't been repopulated by any other client
82
85
// in-between the call to llen and the execution of the pipeline.
83
86
// Hopefully won't be a problem :)
···
93
96
for file_contents in file_contents.into_iter() {
94
97
pipeline.lpush(output_queue, file_contents.as_str());
95
98
}
96
96
-
let _: () = pipeline.query_async(&mut con).await.unwrap();
99
99
+
let _: () = pipeline.query_async(&mut con).await.map_err(|_| ())?;
97
100
}
98
101
99
102
Ok(())
···
120
123
redis::Client::open(std::env::var("REDIS_URL").unwrap_or("redis://localhost".to_string()))?;
121
124
let con = redis.get_multiplexed_async_connection().await?;
122
125
123
123
-
let agent = BskyAgent::builder().build().await?;
124
124
-
let _session = agent
125
125
-
.login(
126
126
-
std::env::var("BLUESKY_USERNAME").unwrap_or_default(),
127
127
-
std::env::var("BLUESKY_PASSWORD").unwrap_or_default(),
128
128
-
)
129
129
-
.await?;
126
126
+
let debug_mode = std::env::var("DEBUG").unwrap_or("0".to_string()) == "1";
127
127
+
128
128
+
let (agent, session) = if !debug_mode {
129
129
+
let agent = BskyAgent::builder().build().await?;
130
130
+
let session = agent
131
131
+
.login(
132
132
+
std::env::var("BLUESKY_USERNAME").unwrap_or_default(),
133
133
+
std::env::var("BLUESKY_PASSWORD").unwrap_or_default(),
134
134
+
)
135
135
+
.await?;
136
136
+
137
137
+
(Some(Arc::new(Mutex::new(agent))), Some(session))
138
138
+
} else {
139
139
+
(None, None) // Let's just simulate what the bot would post
140
140
+
};
130
141
131
142
let sched = JobScheduler::new().await?;
132
132
-
let agent = Arc::new(Mutex::new(agent));
133
133
-
143
143
+
134
144
/*
135
135
-
let event_filter = Arc::new(QuoteFilter {
136
136
-
content: r"\b(?i:mother|mommy|mama|mom)\b".to_string(),
137
137
-
path: "test/**/*.txt".to_string(),
145
145
+
let event_filter = Arc::new(QuoteFilter {
146
146
+
content: r"\b(?i:mother|mommy|mama|mom)\b".to_string(),
147
147
+
path: "test/**/
148
148
+
*.txt".to_string(),
138
149
dates: vec![],
139
150
});
140
151
*/
141
152
142
153
let regular_filter = Arc::new(QuoteFilter {
143
154
content: r".*".to_string(),
144
144
-
path: "quotes/**/*.txt".to_string(),
155
155
+
path: if !debug_mode { "quotes/**/*.txt".to_string() } else { "test/**/*.txt".to_string() },
145
156
dates: vec![],
146
157
});
147
158
148
159
let (con_poster, con_event_monitor) = (con.clone(), con.clone());
149
160
let (agent_poster, agent_event_monitor) = (agent.clone(), agent.clone());
150
161
162
162
+
let posting_interval = if !debug_mode {
163
163
+
POSTING_INTERVAL_CRON
164
164
+
} else {
165
165
+
POSTING_INTERVAL_DEBUG
166
166
+
};
167
167
+
151
168
// Add async job
152
169
sched
153
153
-
.add(Job::new_async(POSTING_INTERVAL_CRON, move |_uuid, _| {
170
170
+
.add(Job::new_async(posting_interval, move |_uuid, _| {
154
171
let filter = regular_filter.clone();
155
172
let con = con_poster.clone();
156
173
let agent = agent_poster.clone();
174
174
+
let session = session.clone();
157
175
158
176
Box::pin(async move {
159
177
let text: String = get_quote(&filter, con).await.unwrap();
160
160
-
let post = prepare_post(text.as_str());
161
161
-
let agent = agent.lock().await;
162
162
-
if let Err(e) = agent.create_record(post).await {
163
163
-
eprintln!("Could not post quote: {e}")
178
178
+
179
179
+
if let (Some(agent), Some(session)) = (agent, session) {
180
180
+
let post = prepare_post(text.as_str());
181
181
+
let agent = agent.lock().await;
182
182
+
183
183
+
for _ in 0..POSTING_RETRIES {
184
184
+
if let Err(e) = agent.create_record(post.clone()).await {
185
185
+
eprintln!("Could not post quote: `{e}`");
186
186
+
eprintln!("Attempting to refresh login...");
187
187
+
188
188
+
if let Err(e) = agent.resume_session(session.clone()).await {
189
189
+
eprintln!("Failed to resume sessions due to following error: {e}")
190
190
+
}
191
191
+
} else {
192
192
+
break;
193
193
+
}
194
194
+
}
195
195
+
} else {
196
196
+
// Let's just print the quote!
197
197
+
println!("{}\n", text);
164
198
}
165
199
})
166
200
})?)