tangled
alpha
login
or
join now
parakeet.at
/
parakeet
62
fork
atom
Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview
atproto
bluesky
rust
appserver
62
fork
atom
overview
issues
12
pulls
pipelines
postgates: trust provided timestamp only when backfilling
mia.omg.lol
5 months ago
2103b7d9
1e908e61
verified
This commit was signed with the committer's
known signature
.
mia.omg.lol
SSH Key Fingerprint:
SHA256:eb+NhC0QEl+XKRuFP/97oH6LEz0TXTKPXGDIAI5y7CQ=
+44
-15
3 changed files
expand all
collapse all
unified
split
consumer
src
backfill
repo.rs
db
record.rs
indexer
mod.rs
+1
-1
consumer/src/backfill/repo.rs
reviewed
···
144
144
db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?;
145
145
}
146
146
if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) {
147
147
-
db::post_embed_insert(t, &at_uri, embed, rec.created_at).await?;
147
147
+
db::post_embed_insert(t, &at_uri, embed, rec.created_at, true).await?;
148
148
}
149
149
150
150
deltas.incr(did, AggregateType::ProfilePost).await;
+42
-13
consumer/src/db/record.rs
reviewed
···
5
5
use deadpool_postgres::GenericClient;
6
6
use ipld_core::cid::Cid;
7
7
use lexica::community_lexicon::bookmarks::Bookmark;
8
8
+
use std::collections::HashSet;
8
9
9
10
pub async fn record_upsert<C: GenericClient>(
10
11
conn: &mut C,
···
317
318
repo: &str,
318
319
cid: Cid,
319
320
rec: AppBskyFeedPost,
321
321
+
is_backfill: bool,
320
322
) -> PgExecResult {
321
323
let cid = cid.to_string();
322
324
let record = serde_json::to_value(&rec).unwrap();
···
350
352
.await?;
351
353
352
354
if let Some(embed) = rec.embed.and_then(|embed| embed.into_bsky()) {
353
353
-
post_embed_insert(conn, at_uri, embed, rec.created_at).await?;
355
355
+
post_embed_insert(conn, at_uri, embed, rec.created_at, is_backfill).await?;
354
356
}
355
357
356
358
Ok(count)
···
380
382
post: &str,
381
383
embed: AppBskyEmbed,
382
384
created_at: DateTime<Utc>,
385
385
+
is_backfill: bool,
383
386
) -> PgExecResult {
384
387
match embed {
385
388
AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await,
386
389
AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await,
387
390
AppBskyEmbed::External(embed) => post_embed_external_insert(conn, post, embed).await,
388
391
AppBskyEmbed::Record(embed) => {
389
389
-
post_embed_record_insert(conn, post, embed, created_at).await
392
392
+
post_embed_record_insert(conn, post, embed, created_at, is_backfill).await
390
393
}
391
394
AppBskyEmbed::RecordWithMedia(embed) => {
392
392
-
post_embed_record_insert(conn, post, embed.record, created_at).await?;
395
395
+
post_embed_record_insert(conn, post, embed.record, created_at, is_backfill).await?;
393
396
match *embed.media {
394
397
AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await,
395
398
AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await,
···
476
479
).await
477
480
}
478
481
482
482
+
const PG_DISABLE_RULE: &str = "app.bsky.feed.postgate#disableRule";
479
483
async fn post_embed_record_insert<C: GenericClient>(
480
484
conn: &mut C,
481
485
post: &str,
482
486
embed: AppBskyEmbedRecord,
483
487
post_created_at: DateTime<Utc>,
488
488
+
is_backfill: bool,
484
489
) -> PgExecResult {
485
490
// strip "at://" then break into parts by '/'
486
491
let parts = embed.record.uri[5..].split('/').collect::<Vec<_>>();
487
492
488
493
let detached = if parts[1] == "app.bsky.feed.post" {
489
489
-
let postgate_effective: Option<DateTime<Utc>> = conn
490
490
-
.query_opt(
491
491
-
"SELECT created_at FROM postgates WHERE post_uri=$1",
492
492
-
&[&post],
493
493
-
)
494
494
-
.await?
495
495
-
.map(|v| v.get(0));
494
494
+
let pg_data = postgate_get(conn, post).await?;
496
495
497
497
-
postgate_effective
498
498
-
.map(|v| Utc::now().min(post_created_at) > v)
499
499
-
.unwrap_or_default()
496
496
+
if let Some((effective, detached, rules)) = pg_data {
497
497
+
let detached: HashSet<String> = HashSet::from_iter(detached);
498
498
+
let rules: HashSet<String> = HashSet::from_iter(rules);
499
499
+
let compare_date = match is_backfill {
500
500
+
true => post_created_at,
501
501
+
false => Utc::now(),
502
502
+
};
503
503
+
504
504
+
if detached.contains(post) {
505
505
+
true
506
506
+
} else if rules.contains(PG_DISABLE_RULE) && compare_date > effective {
507
507
+
true
508
508
+
} else {
509
509
+
false
510
510
+
}
511
511
+
} else {
512
512
+
false
513
513
+
}
500
514
} else {
501
515
false
502
516
};
···
505
519
"INSERT INTO post_embed_record (post_uri, record_type, uri, cid, detached) VALUES ($1, $2, $3, $4, $5)",
506
520
&[&post, &parts[1], &embed.record.uri, &embed.record.cid.to_string(), &detached],
507
521
).await
522
522
+
}
523
523
+
524
524
+
async fn postgate_get<C: GenericClient>(
525
525
+
conn: &mut C,
526
526
+
post: &str,
527
527
+
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
528
528
+
let res = conn
529
529
+
.query_opt(
530
530
+
"SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1",
531
531
+
&[&post],
532
532
+
)
533
533
+
.await?
534
534
+
.map(|v| (v.get(0), v.get(1), v.get(2)));
535
535
+
536
536
+
Ok(res)
508
537
}
509
538
510
539
pub async fn postgate_upsert<C: GenericClient>(
+1
-1
consumer/src/indexer/mod.rs
reviewed
···
625
625
});
626
626
627
627
let labels = record.labels.clone();
628
628
-
db::post_insert(conn, at_uri, repo, cid, record).await?;
628
628
+
db::post_insert(conn, at_uri, repo, cid, record, false).await?;
629
629
if let Some(labels) = labels {
630
630
db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?;
631
631
}