Source code for my personal quote bot project.

Rewrote most of the internals.

This should hopefully fix the occasional, yet highly annoying crashes. maybe.

+226 -105
+11
Cargo.lock
··· 163 163 checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" 164 164 165 165 [[package]] 166 + name = "backon" 167 + version = "1.5.2" 168 + source = "registry+https://github.com/rust-lang/crates.io-index" 169 + checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" 170 + dependencies = [ 171 + "fastrand", 172 + ] 173 + 174 + [[package]] 166 175 name = "backtrace" 167 176 version = "0.3.74" 168 177 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1467 1476 checksum = "8034fb926579ff49d3fe58d288d5dcb580bf11e9bccd33224b45adebf0fd0c23" 1468 1477 dependencies = [ 1469 1478 "arc-swap", 1479 + "backon", 1470 1480 "bytes", 1471 1481 "combine", 1482 + "futures-channel", 1472 1483 "futures-util", 1473 1484 "itoa", 1474 1485 "num-bigint",
+1 -1
Cargo.toml
··· 11 11 glob = "0.3.2" 12 12 grep = "0.3.2" 13 13 rand = "0.9.0" 14 - redis = { version = "0.29.1", features = ["aio", "tokio-comp"] } 14 + redis = { version = "0.29.1", features = ["aio", "connection-manager", "tokio-comp"] } 15 15 tokio = { version = "1.44.0", features = ["full"] } 16 16 tokio-cron-scheduler = "0.13.0"
+14
config.toml
··· 1 + [quotes] 2 + default = { path = "quotes/**/*.txt", content = ".*", dates = [] } 3 + 4 + [quotes.father] 5 + path = "quotes/**/*.txt" 6 + content = "\b(?i:father|dad|daddy|papa|pops)\b" 7 + dates = [ 8 + # whichever date is Father's day 9 + ] 10 + 11 + [quotes.mother] 12 + path = "quotes/**/*.txt" 13 + content = "\b(?i:mother|mommy|mama|mom)\b" 14 + dates = []
+200 -104
src/main.rs
··· 1 - use bsky_sdk::BskyAgent; 1 + use bsky_sdk::api::app; 2 2 use bsky_sdk::api::app::bsky::feed::post; 3 3 use bsky_sdk::api::types::string::Datetime; 4 + use bsky_sdk::{BskyAgent, api::types::Object}; 4 5 5 6 use glob::glob; 6 7 use grep::{matcher::Matcher, regex, searcher::sinks}; 7 8 use rand::random_range; 8 9 use rand::seq::SliceRandom; 10 + use redis::aio::ConnectionManagerConfig; 9 11 10 12 use std::{sync::Arc, time::Duration}; 11 13 use tokio::sync::Mutex; ··· 21 23 22 24 // See https://cron.help for what these strings mean 23 25 const POSTING_INTERVAL_CRON: &str = "0 0,30 * * * *"; 24 - const POSTING_INTERVAL_DEBUG: &str = "0,30 * * * * *"; 26 + const POSTING_INTERVAL_DEBUG: &str = "1/10 * * * * *"; 25 27 const EVENT_UPDATE_INTERVAL: &str = "55 23 * * *"; 26 28 27 29 const POSTING_RETRIES: i32 = 5; ··· 40 42 } 41 43 } 42 44 45 + #[derive(Clone, Debug)] 43 46 struct QuoteFilter { 44 47 path: String, 45 48 content: String, 46 49 dates: Vec<String>, 47 50 } 48 51 49 - fn read_files(filter: &QuoteFilter) -> Vec<String> { 50 - let matcher = regex::RegexMatcher::new(&filter.content).unwrap(); 51 - let mut searcher = grep::searcher::Searcher::new(); 52 - let mut results = Vec::new(); 52 + impl QuoteFilter { 53 + pub async fn get_quote( 54 + &self, 55 + mut con: impl redis::aio::ConnectionLike + AsyncCommands + Clone, 56 + ) -> Result<String, ()> { 57 + // 1: Attempt to read from the event (priority) queue 58 + let event_quote: Option<String> = con.lpop(EVENT_QUEUE, None).await.ok(); 59 + if let Some(quote) = event_quote { 60 + return Ok(quote); 61 + } 53 62 54 - for file in glob(&filter.path).unwrap() { 55 - let file = match file { 56 - Ok(file) => file, 57 - Err(_) => continue, 58 - }; 63 + // 2: Otherwise, we read from the regular queue, repopulating it if it's empty 64 + self.reshuffle_quotes(con.clone(), DEFAULT_QUEUE).await?; 65 + con.lpop(DEFAULT_QUEUE, None).await.map_err(|_| ()) 66 + } 59 67 60 - let mut matched = false; 61 - let sink = sinks::Lossy(|_lnum, _line| { 62 - matched = true; 63 - Ok(false) 64 - }); 68 + async fn reshuffle_quotes( 69 + &self, 70 + mut con: impl redis::aio::ConnectionLike + AsyncCommands, 71 + output_queue: &str, 72 + ) -> Result<(), ()> { 73 + let len: u64 = con.llen(output_queue).await.map_err(|_| ())?; 74 + // NOTE: The following assumes the queue hasn't been repopulated by any other client 75 + // in-between the call to llen and the execution of the pipeline. 76 + // Hopefully won't be a problem :) 77 + if len == 0 { 78 + let mut file_contents = self.read_files(); 65 79 66 - let search_result = searcher.search_path(&matcher, &file, sink); 67 - if !matched || search_result.is_err() { 68 - continue; 80 + { 81 + let mut rand = rand::rng(); 82 + file_contents.shuffle(&mut rand); 83 + } 84 + 85 + let mut pipeline = redis::pipe(); 86 + for file_contents in file_contents.into_iter() { 87 + pipeline.lpush(output_queue, file_contents.as_str()); 88 + } 89 + let _: () = pipeline.query_async(&mut con).await.map_err(|_| ())?; 69 90 } 70 91 71 - let contents = std::fs::read_to_string(file).unwrap(); 72 - results.push(contents.trim().to_string()); 92 + Ok(()) 73 93 } 74 94 75 - results 95 + fn read_files(&self) -> Vec<String> { 96 + let matcher = regex::RegexMatcher::new(&self.content).unwrap(); 97 + let mut searcher = grep::searcher::Searcher::new(); 98 + let mut results = Vec::new(); 99 + 100 + for file in glob(&self.path).unwrap() { 101 + let file = match file { 102 + Ok(file) => file, 103 + Err(_) => continue, 104 + }; 105 + 106 + let mut matched = false; 107 + let sink = sinks::Lossy(|_lnum, _line| { 108 + matched = true; 109 + Ok(false) 110 + }); 111 + 112 + let search_result = searcher.search_path(&matcher, &file, sink); 113 + if !matched || search_result.is_err() { 114 + continue; 115 + } 116 + 117 + let contents = std::fs::read_to_string(file).unwrap(); 118 + results.push(contents.trim().to_string()); 119 + } 120 + 121 + results 122 + } 76 123 } 77 124 78 - async fn reshuffle_quotes( 79 - filter: &QuoteFilter, 80 - mut con: impl redis::aio::ConnectionLike + AsyncCommands, 81 - output_queue: &str, 82 - ) -> Result<(), ()> { 83 - let len: u64 = con.llen(output_queue).await.map_err(|_| ())?; 84 - // NOTE: The following assumes the queue hasn't been repopulated by any other client 85 - // in-between the call to llen and the execution of the pipeline. 86 - // Hopefully won't be a problem :) 87 - if len == 0 { 88 - let mut file_contents = read_files(filter); 125 + #[derive(Clone)] 126 + struct RedisState { 127 + con_manager: redis::aio::ConnectionManager, 128 + } 89 129 90 - { 91 - let mut rand = rand::rng(); 92 - file_contents.shuffle(&mut rand); 130 + impl RedisState { 131 + pub async fn new(url: String) -> Result<Self, ()> { 132 + let redis = redis::Client::open(url).map_err(|_| ())?; 133 + let config = ConnectionManagerConfig::new() 134 + .set_response_timeout(std::time::Duration::from_secs(10)) 135 + .set_number_of_retries(3); 136 + let con_manager = redis::aio::ConnectionManager::new_with_config(redis, config) 137 + .await 138 + .map_err(|_| ())?; 139 + 140 + Ok(RedisState { con_manager }) 141 + } 142 + 143 + pub async fn fetch_quote(&self, filter: &QuoteFilter) -> Result<String, ()> { 144 + loop { 145 + match filter.get_quote(self.con_manager.clone()).await { 146 + Ok(text) => return Ok(text), 147 + Err(_) => eprintln!("Error fetching quote from redis storage. Retrying..."), 148 + }; 93 149 } 150 + } 151 + } 94 152 95 - let mut pipeline = redis::pipe(); 96 - for file_contents in file_contents.into_iter() { 97 - pipeline.lpush(output_queue, file_contents.as_str()); 153 + #[derive(Clone)] 154 + struct BlueskyState { 155 + bsky_agent: BskyAgent, 156 + bsky_session: Object<bsky_sdk::api::com::atproto::server::create_session::OutputData>, 157 + } 158 + 159 + impl BlueskyState { 160 + pub async fn new_session(username: String, password: String) -> Result<Self, ()> { 161 + let agent = BskyAgent::builder().build().await.map_err(|_| ())?; 162 + let session = agent.login(username, password).await.map_err(|_| ())?; 163 + 164 + Ok(Self { 165 + bsky_agent: agent, 166 + bsky_session: session, 167 + }) 168 + } 169 + 170 + pub async fn submit_post(self, post: String) -> Result<(), ()> { 171 + let post = prepare_post(post.as_str()); 172 + 173 + for current_try in 0..POSTING_RETRIES { 174 + if let Err(e) = self.bsky_agent.create_record(post.clone()).await { 175 + eprintln!("Could not post quote: `{e}`"); 176 + eprintln!("Attempting to refresh login..."); 177 + 178 + if let Err(e) = self 179 + .bsky_agent 180 + .resume_session(self.bsky_session.clone()) 181 + .await 182 + { 183 + eprintln!("Failed to resume sessions due to following error: {e}") 184 + } 185 + } else { 186 + if current_try > 0 { 187 + eprintln!("Successfully posted quote on retry #{current_try}"); 188 + } 189 + return Ok(()); 190 + } 98 191 } 99 - let _: () = pipeline.query_async(&mut con).await.map_err(|_| ())?; 192 + 193 + Err(()) 100 194 } 195 + } 101 196 102 - Ok(()) 197 + #[derive(Clone)] 198 + struct State { 199 + redis: RedisState, 200 + bsky_session: Option<BlueskyState>, 103 201 } 104 202 105 - async fn get_quote( 106 - filter: &QuoteFilter, 107 - mut con: impl redis::aio::ConnectionLike + AsyncCommands + Clone, 108 - ) -> Result<String, ()> { 109 - // 1: Attempt to read from the event (priority) queue 110 - let event_quote: Option<String> = con.lpop(EVENT_QUEUE, None).await.ok(); 111 - if let Some(quote) = event_quote { 112 - return Ok(quote); 203 + impl State { 204 + pub fn redis(&self) -> &RedisState { 205 + &self.redis 113 206 } 114 207 115 - // 2: Otherwise, we read from the regular queue, repopulating it if it's empty 116 - reshuffle_quotes(filter, con.clone(), DEFAULT_QUEUE).await?; 117 - con.lpop(DEFAULT_QUEUE, None).await.map_err(|_| ()) 208 + pub fn bsky(&self) -> Option<&BlueskyState> { 209 + self.bsky_session.as_ref() 210 + } 118 211 } 119 212 120 213 #[tokio::main] 121 214 async fn main() -> Result<(), Box<dyn std::error::Error>> { 122 - let redis = 123 - redis::Client::open(std::env::var("REDIS_URL").unwrap_or("redis://localhost".to_string()))?; 124 - let con = redis.get_multiplexed_async_connection().await?; 125 - 126 215 let debug_mode = std::env::var("DEBUG").unwrap_or("0".to_string()) == "1"; 216 + let use_bsky = std::env::var("USE_BLUESKY").unwrap_or("0".to_string()) == "1"; 127 217 128 - let (agent, session) = if !debug_mode { 129 - let agent = BskyAgent::builder().build().await?; 130 - let session = agent 131 - .login( 132 - std::env::var("BLUESKY_USERNAME").unwrap_or_default(), 133 - std::env::var("BLUESKY_PASSWORD").unwrap_or_default(), 218 + let redis_state = 219 + RedisState::new(std::env::var("REDIS_URL").unwrap_or("redis://localhost".to_string())) 220 + .await 221 + .expect("Initial redis connection failure"); 222 + let bsky_state = if use_bsky { 223 + Some( 224 + BlueskyState::new_session( 225 + std::env::var("BLUESKY_USERNAME").expect("Bluesky username not supplied"), 226 + std::env::var("BLUESKY_PASSWORD") 227 + .expect("Bluesky application password not supplied"), 134 228 ) 135 - .await?; 136 - 137 - (Some(Arc::new(Mutex::new(agent))), Some(session)) 229 + .await 230 + .expect("Could not connect to Bluesky with supplied credentials"), 231 + ) 138 232 } else { 139 - (None, None) // Let's just simulate what the bot would post 233 + None 140 234 }; 141 235 236 + let app_state = Arc::new(State { 237 + redis: redis_state, 238 + bsky_session: bsky_state, 239 + }); 240 + 142 241 let sched = JobScheduler::new().await?; 143 242 144 243 /* 145 244 let event_filter = Arc::new(QuoteFilter { 146 245 content: r"\b(?i:mother|mommy|mama|mom)\b".to_string(), 147 - path: "test/**/ 148 - *.txt".to_string(), 246 + path: "test/**/*.txt".to_string(), 149 247 dates: vec![], 150 248 }); 151 249 */ 152 250 153 251 let regular_filter = Arc::new(QuoteFilter { 154 252 content: r".*".to_string(), 155 - path: if !debug_mode { "quotes/**/*.txt".to_string() } else { "test/**/*.txt".to_string() }, 253 + path: if !debug_mode { 254 + "quotes/**/*.txt".to_string() 255 + } else { 256 + "test/**/*.txt".to_string() 257 + }, 156 258 dates: vec![], 157 259 }); 158 - 159 - let (con_poster, con_event_monitor) = (con.clone(), con.clone()); 160 - let (agent_poster, agent_event_monitor) = (agent.clone(), agent.clone()); 161 260 162 261 let posting_interval = if !debug_mode { 163 262 POSTING_INTERVAL_CRON ··· 165 264 POSTING_INTERVAL_DEBUG 166 265 }; 167 266 168 - // Add async job 169 - sched 170 - .add(Job::new_async(posting_interval, move |_uuid, _| { 171 - let filter = regular_filter.clone(); 172 - let con = con_poster.clone(); 173 - let agent = agent_poster.clone(); 174 - let session = session.clone(); 267 + let post_job = Job::new_async(posting_interval, move |_uuid, _| { 268 + let filter = regular_filter.clone(); 269 + let app_state = app_state.clone(); 175 270 176 - Box::pin(async move { 177 - let text: String = get_quote(&filter, con).await.unwrap(); 178 - 179 - if let (Some(agent), Some(session)) = (agent, session) { 180 - let post = prepare_post(text.as_str()); 181 - let agent = agent.lock().await; 182 - 183 - for _ in 0..POSTING_RETRIES { 184 - if let Err(e) = agent.create_record(post.clone()).await { 185 - eprintln!("Could not post quote: `{e}`"); 186 - eprintln!("Attempting to refresh login..."); 271 + Box::pin(async move { 272 + // We try fetching a new quote from our redis storage until we succeed 273 + let text = match app_state.redis().fetch_quote(&filter).await { 274 + Ok(text) => text, 275 + Err(_) => { 276 + eprintln!("Error fetching quote from redis storage."); 277 + return; 278 + } 279 + }; 187 280 188 - if let Err(e) = agent.resume_session(session.clone()).await { 189 - eprintln!("Failed to resume sessions due to following error: {e}") 190 - } 191 - } else { 192 - break; 193 - } 194 - } 195 - } else { 196 - // Let's just print the quote! 197 - println!("{}\n", text); 281 + if let Some(bsky) = app_state.bsky() { 282 + if let Err(_) = bsky.clone().submit_post(text).await { 283 + eprintln!("Error posting to bluesky."); 284 + return; 198 285 } 199 - }) 200 - })?) 201 - .await?; 286 + } else { 287 + // Let's just print the quote! 288 + println!("{}\n", text); 289 + } 290 + }) 291 + })?; 292 + 293 + // Add async job 294 + sched.add(post_job).await?; 202 295 203 296 // sched 204 297 // .add(Job::new_async(EVENT_UPDATE_INTERVAL, move |_uuid, _| { ··· 215 308 // })?) 216 309 // .await?; 217 310 218 - sched.start().await.unwrap(); 311 + sched 312 + .start() 313 + .await 314 + .expect("Error starting tokio scheduler. Shutting down..."); 219 315 loop { 220 316 tokio::time::sleep(Duration::from_secs(10)).await; 221 317 }