A decentralized music tracking and discovery platform built on AT Protocol 🎵 rocksky.app
spotify atproto lastfm musicbrainz scrobbling listenbrainz

feat(feed): implement feed service #8

closed opened by tsiry-sandratraina.com targeting main from feat/feed-generator
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:7vdlgi2bflelz7mmuxoqjfcr/sh.tangled.repo.pull/3lztmzurcri22
+30 -10
Interdiff #0 #1
Cargo.lock

This patch was likely rebased, as context lines do not match.

crates/feed/Cargo.toml

This file has not been changed.

crates/feed/src/config.rs

This file has not been changed.

+25 -6
crates/feed/src/feed.rs
··· 1 1 use crate::config::Config; 2 2 use crate::types::{DidDocument, FeedSkeletonParameters, Service}; 3 3 use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery}; 4 + use anyhow::Error; 5 + use sqlx::postgres::PgPoolOptions; 6 + use sqlx::{Pool, Postgres}; 7 + use std::env; 4 8 use std::fmt::Debug; 5 9 use std::net::SocketAddr; 10 + use std::sync::Arc; 6 11 use warp::Filter; 7 12 8 13 /// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods. ··· 21 26 &mut self, 22 27 name: impl AsRef<str>, 23 28 address: impl Into<SocketAddr> + Debug + Clone + Send, 24 - ) -> impl std::future::Future<Output = ()> + Send { 29 + ) -> impl std::future::Future<Output = Result<(), Error>> + Send { 25 30 self.start_with_config(name, Config::load_env_config(), address) 26 31 } 27 32 ··· 39 44 name: impl AsRef<str>, 40 45 config: Config, 41 46 address: impl Into<SocketAddr> + Debug + Clone + Send, 42 - ) -> impl std::future::Future<Output = ()> + Send { 47 + ) -> impl std::future::Future<Output = Result<(), Error>> + Send { 43 48 let handler = self.handler(); 44 49 let address = address.clone(); 45 50 let feed_name = name.as_ref().to_string(); 46 51 47 52 async move { 48 53 let config = config; 54 + let pool = PgPoolOptions::new() 55 + .max_connections(5) 56 + .connect(&env::var("XATA_POSTGRES_URL")?) 57 + .await?; 58 + let pool = Arc::new(pool); 59 + let db_filter = warp::any().map(move || pool.clone()); 49 60 50 61 let did_config = config.clone(); 51 62 let did_json = warp::path(".well-known") ··· 56 67 let describe_feed_generator = warp::path("xrpc") 57 68 .and(warp::path("app.rocksky.feed.describeFeedGenerator")) 58 69 .and(warp::get()) 59 - .and_then(move || describe_feed_generator(feed_name.clone())); 70 + .and(db_filter.clone()) 71 + .and_then(move |_pool: Arc<Pool<Postgres>>| { 72 + describe_feed_generator(feed_name.clone()) 73 + }); 60 74 61 75 let get_feed_handler = handler.clone(); 62 76 let get_feed_skeleton = warp::path("xrpc") 63 77 .and(warp::path("app.rocksky.feed.getFeedSkeleton")) 64 78 .and(warp::get()) 65 79 .and(warp::query::<FeedSkeletonParameters>()) 66 - .and_then(move |query: FeedSkeletonParameters| { 67 - get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone()) 68 - }); 80 + .and(db_filter.clone()) 81 + .and_then( 82 + move |query: FeedSkeletonParameters, _pool: Arc<Pool<Postgres>>| { 83 + get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone()) 84 + }, 85 + ); 69 86 70 87 let api = did_json.or(describe_feed_generator).or(get_feed_skeleton); 71 88 ··· 101 118 tokio::join!(feed_server.run(address), firehose_listener) 102 119 .1 103 120 .expect("Couldn't await tasks"); 121 + 122 + Ok::<(), Error>(()) 104 123 } 105 124 } 106 125 }
crates/feed/src/feed_handler.rs

This file has not been changed.

+4 -3
crates/feed/src/lib.rs
··· 1 + use anyhow::Error; 1 2 use std::{env, net::SocketAddr, sync::Arc}; 2 - 3 3 use tokio::sync::Mutex; 4 4 5 5 use crate::{ ··· 42 42 } 43 43 } 44 44 45 - pub async fn run() { 45 + pub async fn run() -> Result<(), Error> { 46 46 let mut feed = RecentlyPlayedFeed { 47 47 handler: RecentlyPlayedFeedHandler { 48 48 scrobbles: Arc::new(Mutex::new(Vec::new())), ··· 53 53 let addr_str = format!("{}:{}", host, port); 54 54 let addr: SocketAddr = addr_str.parse().expect("Invalid address format"); 55 55 56 - feed.start("RecentlyPlayed", addr).await; 56 + feed.start("RecentlyPlayed", addr).await?; 57 + Ok(()) 57 58 }
crates/feed/src/types.rs

This file has not been changed.

crates/rockskyd/Cargo.toml

This file has not been changed.

+1 -1
crates/rockskyd/src/cmd/feed.rs
··· 1 1 use anyhow::Error; 2 2 3 3 pub async fn serve() -> Result<(), Error> { 4 - rocksky_feed::run().await; 4 + rocksky_feed::run().await?; 5 5 Ok(()) 6 6 }
crates/rockskyd/src/cmd/mod.rs

This file has not been changed.

crates/rockskyd/src/main.rs

This file has not been changed.

History

2 rounds 0 comments
sign up or login to add to the discussion
4 commits
expand
feat(feed): implement feed service with configuration and handler
feat(feed): add feed command with serve subcommand and integrate feed module
fix: update feed and lib modules to return Result types for better error handling
feat: add new dependencies for moka, multer, multibase, multihash, nanorand, and native-tls
expand 0 comments
closed without merging
2 commits
expand
feat(feed): implement feed service with configuration and handler
feat(feed): add feed command with serve subcommand and integrate feed module
expand 0 comments