Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver

store mentions and tags from facets in DB

mia.omg.lol cc0eb694 2103b7d9

verified
+56 -7
+8 -3
consumer/src/db/copy.rs
··· 1 1 use super::PgExecResult; 2 2 use crate::indexer::records; 3 - use crate::utils::strongref_to_parts; 3 + use crate::utils::{extract_mentions_and_tags, merge_tags, strongref_to_parts}; 4 4 use chrono::prelude::*; 5 5 use deadpool_postgres::Transaction; 6 6 use futures::pin_mut; ··· 119 119 .await 120 120 } 121 121 122 - const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, created_at) FROM STDIN (FORMAT binary)"; 122 + const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, mentions, created_at) FROM STDIN (FORMAT binary)"; 123 123 const POST_TYPES: &[Type] = &[ 124 124 Type::TEXT, 125 125 Type::TEXT, ··· 135 135 Type::TEXT, 136 136 Type::TEXT, 137 137 Type::TEXT, 138 + Type::TEXT_ARRAY, 138 139 Type::TIMESTAMP, 139 140 ]; 140 141 pub async fn copy_posts( ··· 159 160 160 161 for (at_uri, cid, post) in data { 161 162 let record = serde_json::to_value(&post).unwrap(); 163 + let (mentions, tags) = post.facets.as_ref().map(|v| extract_mentions_and_tags(&v)).unzip(); 162 164 let facets = post.facets.and_then(|v| serde_json::to_value(v).ok()); 163 165 let embed = post.embed.as_ref().map(|v| v.as_str()); 164 166 let embed_subtype = post.embed.as_ref().and_then(|v| v.subtype()); 165 167 let (parent_uri, parent_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.parent)); 166 168 let (root_uri, root_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.root)); 167 169 170 + let tags = merge_tags(tags, post.tags); 171 + 168 172 let writer = writer.as_mut(); 169 173 writer 170 174 .write(&[ ··· 175 179 &post.text, 176 180 &facets, 177 181 &post.langs.unwrap_or_default(), 178 - &post.tags.unwrap_or_default(), 182 + &tags, 179 183 &parent_uri, 180 184 &parent_cid, 181 185 &root_uri, 182 186 &root_cid, 183 187 &embed, 184 188 &embed_subtype, 189 + &mentions, 185 190 &post.created_at.naive_utc(), 186 191 ]) 187 192 .await?;
+10 -2
consumer/src/db/record.rs
··· 1 1 use super::{PgExecResult, PgOptResult, PgResult}; 2 2 use crate::indexer::records::*; 3 - use crate::utils::{blob_ref, strongref_to_parts}; 3 + use crate::utils::{blob_ref, extract_mentions_and_tags, merge_tags, strongref_to_parts}; 4 4 use chrono::prelude::*; 5 5 use deadpool_postgres::GenericClient; 6 6 use ipld_core::cid::Cid; ··· 322 322 ) -> PgExecResult { 323 323 let cid = cid.to_string(); 324 324 let record = serde_json::to_value(&rec).unwrap(); 325 + let (mentions, tags) = rec 326 + .facets 327 + .as_ref() 328 + .map(|v| extract_mentions_and_tags(&v)) 329 + .unzip(); 325 330 let facets = rec.facets.and_then(|v| serde_json::to_value(v).ok()); 326 331 let (parent_uri, parent_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.parent)); 327 332 let (root_uri, root_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.root)); 328 333 let embed = rec.embed.as_ref().map(|v| v.as_str()); 329 334 let embed_subtype = rec.embed.as_ref().and_then(|v| v.subtype()); 330 335 336 + let tags = merge_tags(tags, rec.tags); 337 + 331 338 let count = conn 332 339 .execute( 333 340 include_str!("sql/post_insert.sql"), ··· 339 346 &rec.text, 340 347 &facets, 341 348 &rec.langs.unwrap_or_default(), 342 - &rec.tags.unwrap_or_default(), 349 + &tags, 343 350 &parent_uri, 344 351 &parent_cid, 345 352 &root_uri, 346 353 &root_cid, 347 354 &embed, 348 355 &embed_subtype, 356 + &mentions, 349 357 &rec.created_at, 350 358 ], 351 359 )
+2 -2
consumer/src/db/sql/post_insert.sql
··· 1 1 INSERT INTO posts (at_uri, did, cid, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, 2 - root_cid, embed, embed_subtype, created_at) 3 - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) 2 + root_cid, embed, embed_subtype, mentions, created_at) 3 + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) 4 4 ON CONFLICT DO NOTHING
+31
consumer/src/utils.rs
··· 1 + use lexica::app_bsky::richtext::{Facet, FacetMain, FacetOuter}; 1 2 use lexica::{Blob, StrongRef}; 2 3 use serde::{Deserialize, Deserializer}; 3 4 ··· 39 40 40 41 did == split_aturi[2] 41 42 } 43 + 44 + pub fn extract_mentions_and_tags(from: &[FacetMain]) -> (Vec<String>, Vec<String>) { 45 + let (mentions, tags) = from 46 + .into_iter() 47 + .flat_map(|v| { 48 + v.features.iter().map(|facet| match facet { 49 + FacetOuter::Bsky(Facet::Mention { did }) => (Some(did), None), 50 + FacetOuter::Bsky(Facet::Tag { tag }) => (None, Some(tag)), 51 + _ => (None, None), 52 + }) 53 + }) 54 + .unzip::<_, _, Vec<_>, Vec<_>>(); 55 + 56 + let mentions = mentions.into_iter().flatten().cloned().collect(); 57 + let tags = tags.into_iter().flatten().cloned().collect(); 58 + 59 + (mentions, tags) 60 + } 61 + 62 + pub fn merge_tags<T>(t1: Option<Vec<T>>, t2: Option<Vec<T>>) -> Vec<T> { 63 + match (t1, t2) { 64 + (Some(t1), None) => t1, 65 + (None, Some(t2)) => t2, 66 + (Some(mut t1), Some(t2)) => { 67 + t1.extend(t2); 68 + t1 69 + } 70 + _ => Vec::default(), 71 + } 72 + }
+2
migrations/2025-09-27-171241_post-tweaks/down.sql
··· 1 + alter table posts 2 + drop column mentions;
+2
migrations/2025-09-27-171241_post-tweaks/up.sql
··· 1 + alter table posts 2 + add column mentions text[];
+1
parakeet-db/src/schema.rs
··· 284 284 embed_subtype -> Nullable<Text>, 285 285 created_at -> Timestamptz, 286 286 indexed_at -> Timestamp, 287 + mentions -> Nullable<Array<Nullable<Text>>>, 287 288 } 288 289 } 289 290