Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

move postgate and threadgate enforcement into their own file

mia.omg.lol 4f4aab6b 20e03838

verified
+109 -102
+105
consumer/src/db/gates.rs
··· 1 + use super::{PgExecResult, PgResult}; 2 + use chrono::prelude::*; 3 + use chrono::{DateTime, Utc}; 4 + use deadpool_postgres::GenericClient; 5 + use std::collections::HashSet; 6 + 7 + pub async fn post_enforce_threadgate<C: GenericClient>( 8 + conn: &mut C, 9 + root: &str, 10 + post_author: &str, 11 + post_created_at: DateTime<Utc>, 12 + is_backfill: bool, 13 + ) -> PgResult<bool> { 14 + // check if the root and the current post are the same author 15 + // strip "at://" then break into parts by '/' 16 + let parts = root[5..].split('/').collect::<Vec<_>>(); 17 + let root_author = parts[0]; 18 + if root_author == post_author { 19 + return Ok(false); 20 + } 21 + 22 + let tg_data = super::threadgate_get(conn, root).await?; 23 + 24 + let Some((created_at, allow, allow_lists)) = tg_data else { 25 + return Ok(false); 26 + }; 27 + 28 + // when backfilling, there's no point continuing if the record is dated before the threadgate 29 + if is_backfill && post_created_at < created_at { 30 + return Ok(false); 31 + } 32 + 33 + if allow.is_empty() { 34 + return Ok(true); 35 + } 36 + 37 + let allow: HashSet<String> = HashSet::from_iter(allow); 38 + 39 + if allow.contains("app.bsky.feed.threadgate#followerRule") 40 + || allow.contains("app.bsky.feed.threadgate#followingRule") 41 + { 42 + let profile_state: Option<(bool, bool)> = conn 43 + .query_opt( 44 + "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2", 45 + &[&root_author, &post_author], 46 + ) 47 + .await? 48 + .map(|v| (v.get(0), v.get(1))); 49 + 50 + if let Some((following, followed)) = profile_state { 51 + if allow.contains("app.bsky.feed.threadgate#followerRule") && followed { 52 + return Ok(false); 53 + } 54 + 55 + if allow.contains("app.bsky.feed.threadgate#followingRule") && following { 56 + return Ok(false); 57 + } 58 + } 59 + } 60 + 61 + // check mentions 62 + if allow.contains("app.bsky.feed.threadgate#mentionRule") { 63 + let mentions: Vec<String> = conn 64 + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 65 + .await? 66 + .map(|r| r.get(0)) 67 + .unwrap_or_default(); 68 + 69 + if mentions.contains(&post_author.to_owned()) { 70 + return Ok(false); 71 + } 72 + } 73 + 74 + if allow.contains("app.bsky.feed.threadgate#listRule") { 75 + if allow_lists.is_empty() { 76 + return Ok(true); 77 + } 78 + 79 + let count: i64 = conn 80 + .query_one( 81 + "SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2", 82 + &[&allow_lists, &post_author], 83 + ) 84 + .await? 85 + .get(0); 86 + if count == 0 { 87 + return Ok(true); 88 + } 89 + } 90 + 91 + Ok(false) 92 + } 93 + 94 + pub async fn postgate_maintain_detaches<C: GenericClient>( 95 + conn: &mut C, 96 + post: &str, 97 + detached: &[String], 98 + disable_effective: Option<NaiveDateTime>, 99 + ) -> PgExecResult { 100 + conn.execute( 101 + "SELECT maintain_postgates($1, $2, $3)", 102 + &[&post, &detached, &disable_effective], 103 + ) 104 + .await 105 + }
+2
consumer/src/db/mod.rs
··· 7 7 mod actor; 8 8 mod backfill; 9 9 pub mod copy; 10 + mod gates; 10 11 mod labels; 11 12 mod record; 12 13 13 14 pub use actor::*; 14 15 pub use backfill::*; 16 + pub use gates::*; 15 17 pub use labels::*; 16 18 pub use record::*;
+2 -102
consumer/src/db/record.rs
··· 336 336 // if there is a root, we need to check for the presence of a threadgate. 337 337 let violates_threadgate = match &root_uri { 338 338 Some(root) => { 339 - post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await? 339 + super::post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await? 340 340 } 341 341 None => false, 342 342 }; ··· 380 380 .await 381 381 } 382 382 383 - pub async fn post_enforce_threadgate<C: GenericClient>( 384 - conn: &mut C, 385 - root: &str, 386 - post_author: &str, 387 - post_created_at: DateTime<Utc>, 388 - is_backfill: bool, 389 - ) -> PgResult<bool> { 390 - // check if the root and the current post are the same author 391 - // strip "at://" then break into parts by '/' 392 - let parts = root[5..].split('/').collect::<Vec<_>>(); 393 - let root_author = parts[0]; 394 - if root_author == post_author { 395 - return Ok(false); 396 - } 397 - 398 - let tg_data = threadgate_get(conn, root).await?; 399 - 400 - let Some((created_at, allow, allow_lists)) = tg_data else { 401 - return Ok(false); 402 - }; 403 - 404 - // when backfilling, there's no point continuing if the record is dated before the threadgate 405 - if is_backfill && post_created_at < created_at { 406 - return Ok(false); 407 - } 408 - 409 - if allow.is_empty() { 410 - return Ok(true); 411 - } 412 - 413 - let allow: HashSet<String> = HashSet::from_iter(allow); 414 - 415 - if allow.contains("app.bsky.feed.threadgate#followerRule") 416 - || allow.contains("app.bsky.feed.threadgate#followingRule") 417 - { 418 - let profile_state: Option<(bool, bool)> = conn 419 - .query_opt( 420 - "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2", 421 - &[&root_author, &post_author], 422 - ) 423 - .await? 424 - .map(|v| (v.get(0), v.get(1))); 425 - 426 - if let Some((following, followed)) = profile_state { 427 - if allow.contains("app.bsky.feed.threadgate#followerRule") && followed { 428 - return Ok(false); 429 - } 430 - 431 - if allow.contains("app.bsky.feed.threadgate#followingRule") && following { 432 - return Ok(false); 433 - } 434 - } 435 - } 436 - 437 - // check mentions 438 - if allow.contains("app.bsky.feed.threadgate#mentionRule") { 439 - let mentions: Vec<String> = conn 440 - .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) 441 - .await? 442 - .map(|r| r.get(0)) 443 - .unwrap_or_default(); 444 - 445 - if mentions.contains(&post_author.to_owned()) { 446 - return Ok(false); 447 - } 448 - } 449 - 450 - if allow.contains("app.bsky.feed.threadgate#listRule") { 451 - if allow_lists.is_empty() { 452 - return Ok(true); 453 - } 454 - 455 - let count: i64 = conn 456 - .query_one( 457 - "SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2", 458 - &[&allow_lists, &post_author], 459 - ) 460 - .await? 461 - .get(0); 462 - if count == 0 { 463 - return Ok(true); 464 - } 465 - } 466 - 467 - Ok(false) 468 - } 469 - 470 383 pub async fn post_get_info_for_delete<C: GenericClient>( 471 384 conn: &mut C, 472 385 at_uri: &str, ··· 665 578 .await 666 579 } 667 580 668 - pub async fn postgate_maintain_detaches<C: GenericClient>( 669 - conn: &mut C, 670 - post: &str, 671 - detached: &[String], 672 - disable_effective: Option<NaiveDateTime>, 673 - ) -> PgExecResult { 674 - conn.execute( 675 - "SELECT maintain_postgates($1, $2, $3)", 676 - &[&post, &detached, &disable_effective], 677 - ) 678 - .await 679 - } 680 - 681 581 pub async fn profile_upsert<C: GenericClient>( 682 582 conn: &mut C, 683 583 repo: &str, ··· 827 727 .await 828 728 } 829 729 830 - async fn threadgate_get<C: GenericClient>( 730 + pub async fn threadgate_get<C: GenericClient>( 831 731 conn: &mut C, 832 732 post: &str, 833 733 ) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {