···11use super::PgExecResult;
22use crate::indexer::records;
33-use crate::utils::strongref_to_parts;
33+use crate::utils::{extract_mentions_and_tags, merge_tags, strongref_to_parts};
44use chrono::prelude::*;
55use deadpool_postgres::Transaction;
66use futures::pin_mut;
···119119 .await
120120}
121121122122-const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, created_at) FROM STDIN (FORMAT binary)";
122122+const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, mentions, created_at) FROM STDIN (FORMAT binary)";
123123const POST_TYPES: &[Type] = &[
124124 Type::TEXT,
125125 Type::TEXT,
···135135 Type::TEXT,
136136 Type::TEXT,
137137 Type::TEXT,
138138+ Type::TEXT_ARRAY,
138139 Type::TIMESTAMP,
139140];
140141pub async fn copy_posts(
···159160160161 for (at_uri, cid, post) in data {
161162 let record = serde_json::to_value(&post).unwrap();
163163+ let (mentions, tags) = post
164164+ .facets
165165+ .as_ref()
166166+ .map(|v| extract_mentions_and_tags(v))
167167+ .unzip();
162168 let facets = post.facets.and_then(|v| serde_json::to_value(v).ok());
163169 let embed = post.embed.as_ref().map(|v| v.as_str());
164170 let embed_subtype = post.embed.as_ref().and_then(|v| v.subtype());
165171 let (parent_uri, parent_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.parent));
166172 let (root_uri, root_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.root));
173173+174174+ let tags = merge_tags(tags, post.tags);
167175168176 let writer = writer.as_mut();
169177 writer
···175183 &post.text,
176184 &facets,
177185 &post.langs.unwrap_or_default(),
178178- &post.tags.unwrap_or_default(),
186186+ &tags,
179187 &parent_uri,
180188 &parent_cid,
181189 &root_uri,
182190 &root_cid,
183191 &embed,
184192 &embed_subtype,
193193+ &mentions,
185194 &post.created_at.naive_utc(),
186195 ])
187196 .await?;
188197 }
189198190199 writer.finish().await?;
200200+201201+ let threadgated: Vec<(String, String, DateTime<Utc>)> = conn
202202+ .query(
203203+ "SELECT root_uri, p.at_uri, p.created_at FROM posts_tmp p INNER JOIN threadgates t ON root_uri = post_uri WHERE t.allow IS NOT NULL",
204204+ &[],
205205+ )
206206+ .await?
207207+ .into_iter()
208208+ .map(|v| (v.get(0), v.get(1), v.get(2))).collect();
209209+210210+ for (root, post, created_at) in threadgated {
211211+ match super::post_enforce_threadgate(conn, &root, did, created_at, true).await {
212212+ Ok(true) => {
213213+ conn.execute(
214214+ "UPDATE posts_tmp SET violates_threadgate=TRUE WHERE at_uri=$1",
215215+ &[&post],
216216+ )
217217+ .await?;
218218+ }
219219+ Ok(false) => continue,
220220+ Err(e) => {
221221+ tracing::error!("failed to check threadgate enforcement: {e}");
222222+ continue;
223223+ }
224224+ }
225225+ }
191226192227 conn.execute("INSERT INTO posts (SELECT * FROM posts_tmp)", &[])
193228 .await
+208
consumer/src/db/gates.rs
···11+use super::{PgExecResult, PgResult};
22+use crate::indexer::records::{
33+ AppBskyFeedThreadgate, ThreadgateRule, THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING,
44+ THREADGATE_RULE_LIST, THREADGATE_RULE_MENTION,
55+};
66+use chrono::prelude::*;
77+use chrono::{DateTime, Utc};
88+use deadpool_postgres::GenericClient;
99+use std::collections::HashSet;
1010+1111+pub async fn post_enforce_threadgate<C: GenericClient>(
1212+ conn: &mut C,
1313+ root: &str,
1414+ post_author: &str,
1515+ post_created_at: DateTime<Utc>,
1616+ is_backfill: bool,
1717+) -> PgResult<bool> {
1818+ // check if the root and the current post are the same author
1919+ // strip "at://" then break into parts by '/'
2020+ let parts = root[5..].split('/').collect::<Vec<_>>();
2121+ let root_author = parts[0];
2222+ if root_author == post_author {
2323+ return Ok(false);
2424+ }
2525+2626+ let tg_data = super::threadgate_get(conn, root).await?;
2727+2828+ let Some((created_at, allow, allow_lists)) = tg_data else {
2929+ return Ok(false);
3030+ };
3131+3232+ // when backfilling, there's no point continuing if the record is dated before the threadgate
3333+ if is_backfill && post_created_at < created_at {
3434+ return Ok(false);
3535+ }
3636+3737+ if allow.is_empty() {
3838+ return Ok(true);
3939+ }
4040+4141+ let allow: HashSet<String> = HashSet::from_iter(allow);
4242+4343+ if allow.contains(THREADGATE_RULE_FOLLOWER) || allow.contains(THREADGATE_RULE_FOLLOWING) {
4444+ let profile_state: Option<(bool, bool)> = conn
4545+ .query_opt(
4646+ "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2",
4747+ &[&root_author, &post_author],
4848+ )
4949+ .await?
5050+ .map(|v| (v.get(0), v.get(1)));
5151+5252+ if let Some((following, followed)) = profile_state {
5353+ if allow.contains(THREADGATE_RULE_FOLLOWER) && followed {
5454+ return Ok(false);
5555+ }
5656+5757+ if allow.contains(THREADGATE_RULE_FOLLOWING) && following {
5858+ return Ok(false);
5959+ }
6060+ }
6161+ }
6262+6363+ // check mentions
6464+ if allow.contains(THREADGATE_RULE_MENTION) {
6565+ let mentions: Vec<String> = conn
6666+ .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
6767+ .await?
6868+ .map(|r| r.get(0))
6969+ .unwrap_or_default();
7070+7171+ if mentions.contains(&post_author.to_owned()) {
7272+ return Ok(false);
7373+ }
7474+ }
7575+7676+ if allow.contains(THREADGATE_RULE_LIST) {
7777+ if allow_lists.is_empty() {
7878+ return Ok(true);
7979+ }
8080+8181+ let count: i64 = conn
8282+ .query_one(
8383+ "SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2",
8484+ &[&allow_lists, &post_author],
8585+ )
8686+ .await?
8787+ .get(0);
8888+ if count != 0 {
8989+ return Ok(false);
9090+ }
9191+ }
9292+9393+ Ok(true)
9494+}
9595+9696+pub async fn postgate_maintain_detaches<C: GenericClient>(
9797+ conn: &mut C,
9898+ post: &str,
9999+ detached: &[String],
100100+ disable_effective: Option<NaiveDateTime>,
101101+) -> PgExecResult {
102102+ conn.execute(
103103+ "SELECT maintain_postgates($1, $2, $3)",
104104+ &[&post, &detached, &disable_effective],
105105+ )
106106+ .await
107107+}
108108+109109+// variant of post_enforce_threadgate that runs when backfilling to clean up any posts already in DB
110110+pub async fn threadgate_enforce_backfill<C: GenericClient>(
111111+ conn: &mut C,
112112+ root_author: &str,
113113+ threadgate: &AppBskyFeedThreadgate,
114114+) -> PgExecResult {
115115+ // pull out allow - if it's None we can skip this gate.
116116+ let Some(allow) = threadgate.allow.as_ref() else {
117117+ return Ok(0);
118118+ };
119119+120120+ let root = &threadgate.post;
121121+122122+ if allow.is_empty() {
123123+ // blind update everything
124124+ return conn.execute(
125125+ "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri=$1 AND did != $2 AND created_at >= $3",
126126+ &[&root, &root_author, &threadgate.created_at],
127127+ ).await;
128128+ }
129129+130130+ // pull authors with our root_uri where the author is not the root author and are dated after created_at
131131+ // this is mutable because we'll remove ALLOWED dids
132132+ let mut dids: HashSet<String> = conn
133133+ .query(
134134+ "SELECT DISTINCT did FROM posts WHERE root_uri=$1 AND did != $2 AND created_at >= $3",
135135+ &[&root, &root_author, &threadgate.created_at],
136136+ )
137137+ .await?
138138+ .into_iter()
139139+ .map(|row| row.get(0))
140140+ .collect();
141141+142142+ // this will be empty if there are no replies.
143143+ if dids.is_empty() {
144144+ return Ok(0);
145145+ }
146146+147147+ let allowed_lists = allow
148148+ .iter()
149149+ .filter_map(|rule| match rule {
150150+ ThreadgateRule::List { list } => Some(list),
151151+ _ => None,
152152+ })
153153+ .collect::<Vec<_>>();
154154+155155+ let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str()));
156156+157157+ if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() {
158158+ let current_dids: Vec<_> = dids.iter().collect();
159159+160160+ let res = conn.query(
161161+ "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL",
162162+ &[&root_author, ¤t_dids]
163163+ ).await?;
164164+165165+ dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
166166+ }
167167+168168+ if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() {
169169+ let current_dids: Vec<_> = dids.iter().collect();
170170+171171+ let res = conn.query(
172172+ "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL",
173173+ &[&root_author, ¤t_dids]
174174+ ).await?;
175175+176176+ dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
177177+ }
178178+179179+ if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() {
180180+ let mentions: Vec<String> = conn
181181+ .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
182182+ .await?
183183+ .map(|r| r.get(0))
184184+ .unwrap_or_default();
185185+186186+ dids = &dids - &HashSet::from_iter(mentions);
187187+ }
188188+189189+ if allow.contains(THREADGATE_RULE_LIST) && !dids.is_empty() {
190190+ let current_dids: Vec<_> = dids.iter().collect();
191191+192192+ let res = conn
193193+ .query(
194194+ "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)",
195195+ &[&allowed_lists, ¤t_dids],
196196+ )
197197+ .await?;
198198+199199+ dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
200200+ }
201201+202202+ let dids = dids.into_iter().collect::<Vec<_>>();
203203+204204+ conn.execute(
205205+ "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri = $1 AND did = ANY($2) AND created_at >= $3",
206206+ &[&threadgate.post, &dids, &threadgate.created_at]
207207+ ).await
208208+}
+2
consumer/src/db/mod.rs
···77mod actor;
88mod backfill;
99pub mod copy;
1010+mod gates;
1011mod labels;
1112mod record;
12131314pub use actor::*;
1415pub use backfill::*;
1616+pub use gates::*;
1517pub use labels::*;
1618pub use record::*;
+70-28
consumer/src/db/record.rs
···11use super::{PgExecResult, PgOptResult, PgResult};
22use crate::indexer::records::*;
33-use crate::utils::{blob_ref, strongref_to_parts};
33+use crate::utils::{blob_ref, extract_mentions_and_tags, merge_tags, strongref_to_parts};
44use chrono::prelude::*;
55use deadpool_postgres::GenericClient;
66use ipld_core::cid::Cid;
77use lexica::community_lexicon::bookmarks::Bookmark;
88+use std::collections::HashSet;
89910pub async fn record_upsert<C: GenericClient>(
1011 conn: &mut C,
···317318 repo: &str,
318319 cid: Cid,
319320 rec: AppBskyFeedPost,
321321+ is_backfill: bool,
320322) -> PgExecResult {
321323 let cid = cid.to_string();
322324 let record = serde_json::to_value(&rec).unwrap();
325325+ let (mentions, tags) = rec
326326+ .facets
327327+ .as_ref()
328328+ .map(|v| extract_mentions_and_tags(v))
329329+ .unzip();
323330 let facets = rec.facets.and_then(|v| serde_json::to_value(v).ok());
324331 let (parent_uri, parent_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.parent));
325332 let (root_uri, root_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.root));
326333 let embed = rec.embed.as_ref().map(|v| v.as_str());
327334 let embed_subtype = rec.embed.as_ref().and_then(|v| v.subtype());
328335336336+ // if there is a root, we need to check for the presence of a threadgate.
337337+ let violates_threadgate = match &root_uri {
338338+ Some(root) => {
339339+ super::post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await?
340340+ }
341341+ None => false,
342342+ };
343343+344344+ let tags = merge_tags(tags, rec.tags);
345345+329346 let count = conn
330347 .execute(
331348 include_str!("sql/post_insert.sql"),
···337354 &rec.text,
338355 &facets,
339356 &rec.langs.unwrap_or_default(),
340340- &rec.tags.unwrap_or_default(),
357357+ &tags,
341358 &parent_uri,
342359 &parent_cid,
343360 &root_uri,
344361 &root_cid,
345362 &embed,
346363 &embed_subtype,
364364+ &mentions,
365365+ &violates_threadgate,
347366 &rec.created_at,
348367 ],
349368 )
350369 .await?;
351370352371 if let Some(embed) = rec.embed.and_then(|embed| embed.into_bsky()) {
353353- post_embed_insert(conn, at_uri, embed, rec.created_at).await?;
372372+ post_embed_insert(conn, at_uri, embed, rec.created_at, is_backfill).await?;
354373 }
355374356375 Ok(count)
···380399 post: &str,
381400 embed: AppBskyEmbed,
382401 created_at: DateTime<Utc>,
402402+ is_backfill: bool,
383403) -> PgExecResult {
384404 match embed {
385405 AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await,
386406 AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await,
387407 AppBskyEmbed::External(embed) => post_embed_external_insert(conn, post, embed).await,
388408 AppBskyEmbed::Record(embed) => {
389389- post_embed_record_insert(conn, post, embed, created_at).await
409409+ post_embed_record_insert(conn, post, embed, created_at, is_backfill).await
390410 }
391411 AppBskyEmbed::RecordWithMedia(embed) => {
392392- post_embed_record_insert(conn, post, embed.record, created_at).await?;
412412+ post_embed_record_insert(conn, post, embed.record, created_at, is_backfill).await?;
393413 match *embed.media {
394414 AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await,
395415 AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await,
···476496 ).await
477497}
478498499499+const PG_DISABLE_RULE: &str = "app.bsky.feed.postgate#disableRule";
479500async fn post_embed_record_insert<C: GenericClient>(
480501 conn: &mut C,
481502 post: &str,
482503 embed: AppBskyEmbedRecord,
483504 post_created_at: DateTime<Utc>,
505505+ is_backfill: bool,
484506) -> PgExecResult {
485507 // strip "at://" then break into parts by '/'
486508 let parts = embed.record.uri[5..].split('/').collect::<Vec<_>>();
487509488510 let detached = if parts[1] == "app.bsky.feed.post" {
489489- let postgate_effective: Option<DateTime<Utc>> = conn
490490- .query_opt(
491491- "SELECT created_at FROM postgates WHERE post_uri=$1",
492492- &[&post],
493493- )
494494- .await?
495495- .map(|v| v.get(0));
511511+ let pg_data = postgate_get(conn, post).await?;
496512497497- postgate_effective
498498- .map(|v| Utc::now().min(post_created_at) > v)
499499- .unwrap_or_default()
513513+ if let Some((effective, detached, rules)) = pg_data {
514514+ let detached: HashSet<String> = HashSet::from_iter(detached);
515515+ let rules: HashSet<String> = HashSet::from_iter(rules);
516516+ let compare_date = match is_backfill {
517517+ true => post_created_at,
518518+ false => Utc::now(),
519519+ };
520520+521521+ detached.contains(post) || (rules.contains(PG_DISABLE_RULE) && compare_date > effective)
522522+ } else {
523523+ false
524524+ }
500525 } else {
501526 false
502527 };
···505530 "INSERT INTO post_embed_record (post_uri, record_type, uri, cid, detached) VALUES ($1, $2, $3, $4, $5)",
506531 &[&post, &parts[1], &embed.record.uri, &embed.record.cid.to_string(), &detached],
507532 ).await
533533+}
534534+535535+async fn postgate_get<C: GenericClient>(
536536+ conn: &mut C,
537537+ post: &str,
538538+) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
539539+ let res = conn
540540+ .query_opt(
541541+ "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1",
542542+ &[&post],
543543+ )
544544+ .await?
545545+ .map(|v| (v.get(0), v.get(1), v.get(2)));
546546+547547+ Ok(res)
508548}
509549510550pub async fn postgate_upsert<C: GenericClient>(
···536576pub async fn postgate_delete<C: GenericClient>(conn: &mut C, at_uri: &str) -> PgExecResult {
537577 conn.execute("DELETE FROM postgates WHERE at_uri=$1", &[&at_uri])
538578 .await
539539-}
540540-541541-pub async fn postgate_maintain_detaches<C: GenericClient>(
542542- conn: &mut C,
543543- post: &str,
544544- detached: &[String],
545545- disable_effective: Option<NaiveDateTime>,
546546-) -> PgExecResult {
547547- conn.execute(
548548- "SELECT maintain_postgates($1, $2, $3)",
549549- &[&post, &detached, &disable_effective],
550550- )
551551- .await
552579}
553580554581pub async fn profile_upsert<C: GenericClient>(
···698725pub async fn status_delete<C: GenericClient>(conn: &mut C, did: &str) -> PgExecResult {
699726 conn.execute("DELETE FROM statuses WHERE did=$1", &[&did])
700727 .await
728728+}
729729+730730+pub async fn threadgate_get<C: GenericClient>(
731731+ conn: &mut C,
732732+ post: &str,
733733+) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
734734+ let res = conn
735735+ .query_opt(
736736+ "SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL",
737737+ &[&post],
738738+ )
739739+ .await?
740740+ .map(|v| (v.get(0), v.get(1), v.get(2)));
741741+742742+ Ok(res)
701743}
702744703745pub async fn threadgate_upsert<C: GenericClient>(
···11+alter table posts
22+ drop column mentions,
33+ drop column violates_threadgate;
44+55+drop trigger t_author_feed_ins_post on posts;
66+drop trigger t_author_feed_del_post on posts;
77+drop trigger t_author_feed_ins_repost on reposts;
88+drop trigger t_author_feed_del_repost on reposts;
99+1010+drop function f_author_feed_ins_post;
1111+drop function f_author_feed_del_post;
1212+drop function f_author_feed_ins_repost;
1313+drop function f_author_feed_del_repost;
1414+1515+drop table author_feeds;
+79
migrations/2025-09-27-171241_post-tweaks/up.sql
···11+alter table posts
22+ add column mentions text[],
33+ add column violates_threadgate bool not null default false;
44+55+create table author_feeds
66+(
77+ uri text primary key,
88+ cid text not null,
99+ post text not null,
1010+ did text not null,
1111+ typ text not null,
1212+ sort_at timestamptz not null
1313+);
1414+1515+-- author_feeds post triggers
1616+create function f_author_feed_ins_post() returns trigger
1717+ language plpgsql as
1818+$$
1919+begin
2020+ insert into author_feeds (uri, cid, post, did, typ, sort_at)
2121+ VALUES (NEW.at_uri, NEW.cid, NEW.at_uri, NEW.did, 'post', NEW.created_at)
2222+ on conflict do nothing;
2323+ return NEW;
2424+end;
2525+$$;
2626+2727+create trigger t_author_feed_ins_post
2828+ before insert
2929+ on posts
3030+ for each row
3131+execute procedure f_author_feed_ins_post();
3232+3333+create function f_author_feed_del_post() returns trigger
3434+ language plpgsql as
3535+$$
3636+begin
3737+ delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post';
3838+ return OLD;
3939+end;
4040+$$;
4141+4242+create trigger t_author_feed_del_post
4343+ before delete
4444+ on posts
4545+ for each row
4646+execute procedure f_author_feed_del_post();
4747+4848+-- author_feeds repost triggers
4949+create function f_author_feed_ins_repost() returns trigger
5050+ language plpgsql as
5151+$$
5252+begin
5353+ insert into author_feeds (uri, cid, post, did, typ, sort_at)
5454+ VALUES ('at://' || NEW.did || 'app.bsky.feed.repost' || NEW.rkey, NEW.post_cid, NEW.post, NEW.did, 'repost', NEW.created_at)
5555+ on conflict do nothing;
5656+ return NEW;
5757+end;
5858+$$;
5959+6060+create trigger t_author_feed_ins_repost
6161+ before insert
6262+ on reposts
6363+ for each row
6464+execute procedure f_author_feed_ins_repost();
6565+6666+create function f_author_feed_del_repost() returns trigger
6767+ language plpgsql as
6868+$$
6969+begin
7070+ delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost';
7171+ return OLD;
7272+end;
7373+$$;
7474+7575+create trigger t_author_feed_del_repost
7676+ before delete
7777+ on reposts
7878+ for each row
7979+execute procedure f_author_feed_del_repost();
···11with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth
22 from posts
33- where parent_uri = $1
33+ where parent_uri = $1 and violates_threadgate=FALSE
44 union all
55 select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
66 from posts p
77 join thread on p.parent_uri = thread.at_uri
88- where thread.depth <= $2)
88+ where thread.depth <= $2 and p.violates_threadgate=FALSE)
99select *
1010from thread
1111order by depth desc;
+4-2
parakeet/src/sql/thread_parent.sql
···11with recursive parents as (select at_uri, cid, parent_uri, root_uri, 0 as depth
22 from posts
33- where at_uri = (select parent_uri from posts where at_uri = $1)
33+ where
44+ at_uri = (select parent_uri from posts where at_uri = $1 and violates_threadgate = FALSE)
45 union all
56 select p.at_uri, p.cid, p.parent_uri, p.root_uri, parents.depth + 1
67 from posts p
78 join parents on p.at_uri = parents.parent_uri
88- where parents.depth <= $2)
99+ where parents.depth <= $2
1010+ and p.violates_threadgate = FALSE)
911select *
1012from parents
1113order by depth desc;