Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm

Merge pull request #52 from at-microcosm/jv-bind

bind options and general app configs

authored by bad-example.com and committed by

GitHub d3d84a6e e0f43fe7

+187 -140
+7 -7
Cargo.lock
··· 992 992 993 993 [[package]] 994 994 name = "clap" 995 - version = "4.5.48" 995 + version = "4.5.56" 996 996 source = "registry+https://github.com/rust-lang/crates.io-index" 997 - checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae" 997 + checksum = "a75ca66430e33a14957acc24c5077b503e7d374151b2b4b3a10c83b4ceb4be0e" 998 998 dependencies = [ 999 999 "clap_builder", 1000 1000 "clap_derive", ··· 1002 1002 1003 1003 [[package]] 1004 1004 name = "clap_builder" 1005 - version = "4.5.48" 1005 + version = "4.5.56" 1006 1006 source = "registry+https://github.com/rust-lang/crates.io-index" 1007 - checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9" 1007 + checksum = "793207c7fa6300a0608d1080b858e5fdbe713cdc1c8db9fb17777d8a13e63df0" 1008 1008 dependencies = [ 1009 1009 "anstream", 1010 1010 "anstyle", ··· 1014 1014 1015 1015 [[package]] 1016 1016 name = "clap_derive" 1017 - version = "4.5.47" 1017 + version = "4.5.55" 1018 1018 source = "registry+https://github.com/rust-lang/crates.io-index" 1019 - checksum = "bbfd7eae0b0f1a6e63d4b13c9c478de77c2eb546fba158ad50b4203dc24b9f9c" 1019 + checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" 1020 1020 dependencies = [ 1021 1021 "heck", 1022 1022 "proc-macro2", ··· 1375 1375 checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f" 1376 1376 dependencies = [ 1377 1377 "data-encoding", 1378 - "syn 1.0.109", 1378 + "syn 2.0.106", 1379 1379 ] 1380 1380 1381 1381 [[package]]
+3
Cargo.toml
··· 13 13 "pocket", 14 14 "reflector", 15 15 ] 16 + 17 + [workspace.dependencies] 18 + clap = { version = "4.5.56", features = ["derive", "env"] }
+1 -1
constellation/Cargo.toml
··· 11 11 axum-extra = { version = "0.10.0", features = ["query", "typed-header"] } 12 12 axum-metrics = "0.2" 13 13 bincode = "1.3.3" 14 - clap = { version = "4.5.26", features = ["derive"] } 14 + clap = { workspace = true } 15 15 ctrlc = "3.4.5" 16 16 flume = { version = "0.11.1", default-features = false } 17 17 fs4 = { version = "0.12.0", features = ["sync"] }
+47 -48
constellation/src/bin/main.rs
··· 26 26 #[arg(long)] 27 27 #[clap(default_value = "0.0.0.0:6789")] 28 28 bind: SocketAddr, 29 - /// optionally disable the metrics server 30 - #[arg(long)] 31 - #[clap(default_value_t = false)] 29 + /// enable metrics collection and serving 30 + #[arg(long, action)] 32 31 collect_metrics: bool, 33 32 /// metrics server's listen address 34 - #[arg(long)] 33 + #[arg(long, requires("collect_metrics"))] 35 34 #[clap(default_value = "0.0.0.0:8765")] 36 35 bind_metrics: SocketAddr, 37 36 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: ··· 222 221 // only spawn monitoring thread if the metrics server is running 223 222 if collect_metrics { 224 223 s.spawn(move || { // monitor thread 225 - let stay_alive = stay_alive.clone(); 226 - let check_alive = stay_alive.clone(); 227 - 228 - let process_collector = metrics_process::Collector::default(); 229 - process_collector.describe(); 230 - metrics::describe_gauge!( 231 - "storage_available", 232 - metrics::Unit::Bytes, 233 - "available to be allocated" 234 - ); 235 - metrics::describe_gauge!( 236 - "storage_free", 237 - metrics::Unit::Bytes, 238 - "unused bytes in filesystem" 239 - ); 240 - if let Some(ref p) = data_dir { 241 - if let Err(e) = fs4::available_space(p) { 242 - eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); 243 - } else { 244 - println!("disk space monitoring should work, watching at {p:?}"); 245 - } 246 - } 224 + let stay_alive = stay_alive.clone(); 225 + let check_alive = stay_alive.clone(); 247 226 248 - 'monitor: loop { 249 - match readable.get_stats() { 250 - Ok(StorageStats { dids, targetables, linking_records, .. }) => { 251 - metrics::gauge!("storage.stats.dids").set(dids as f64); 252 - metrics::gauge!("storage.stats.targetables").set(targetables as f64); 253 - metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); 227 + let process_collector = metrics_process::Collector::default(); 228 + process_collector.describe(); 229 + metrics::describe_gauge!( 230 + "storage_available", 231 + metrics::Unit::Bytes, 232 + "available to be allocated" 233 + ); 234 + metrics::describe_gauge!( 235 + "storage_free", 236 + metrics::Unit::Bytes, 237 + "unused bytes in filesystem" 238 + ); 239 + if let Some(ref p) = data_dir { 240 + if let Err(e) = fs4::available_space(p) { 241 + eprintln!("fs4 failed to get available space. may not be supported here? space metrics may be absent. e: {e:?}"); 242 + } else { 243 + println!("disk space monitoring should work, watching at {p:?}"); 254 244 } 255 - Err(e) => eprintln!("failed to get stats: {e:?}"), 256 245 } 257 246 258 - process_collector.collect(); 259 - if let Some(ref p) = data_dir { 260 - if let Ok(avail) = fs4::available_space(p) { 261 - metrics::gauge!("storage.available").set(avail as f64); 247 + 'monitor: loop { 248 + match readable.get_stats() { 249 + Ok(StorageStats { dids, targetables, linking_records, .. }) => { 250 + metrics::gauge!("storage.stats.dids").set(dids as f64); 251 + metrics::gauge!("storage.stats.targetables").set(targetables as f64); 252 + metrics::gauge!("storage.stats.linking_records").set(linking_records as f64); 253 + } 254 + Err(e) => eprintln!("failed to get stats: {e:?}"), 262 255 } 263 - if let Ok(free) = fs4::free_space(p) { 264 - metrics::gauge!("storage.free").set(free as f64); 256 + 257 + process_collector.collect(); 258 + if let Some(ref p) = data_dir { 259 + if let Ok(avail) = fs4::available_space(p) { 260 + metrics::gauge!("storage.available").set(avail as f64); 261 + } 262 + if let Ok(free) = fs4::free_space(p) { 263 + metrics::gauge!("storage.free").set(free as f64); 264 + } 265 265 } 266 - } 267 - let wait = time::Instant::now(); 268 - while wait.elapsed() < MONITOR_INTERVAL { 269 - thread::sleep(time::Duration::from_millis(100)); 270 - if check_alive.is_cancelled() { 271 - break 'monitor 266 + let wait = time::Instant::now(); 267 + while wait.elapsed() < MONITOR_INTERVAL { 268 + thread::sleep(time::Duration::from_millis(100)); 269 + if check_alive.is_cancelled() { 270 + break 'monitor 271 + } 272 272 } 273 273 } 274 - } 275 - stay_alive.drop_guard(); 276 - }); 274 + stay_alive.drop_guard(); 275 + }); 277 276 } 278 277 }); 279 278
-2
constellation/src/server/mod.rs
··· 355 355 #[serde(skip_serializing)] 356 356 query: GetLinksCountQuery, 357 357 } 358 - #[deprecated] 359 358 fn count_links( 360 359 accept: ExtractAccept, 361 360 query: Query<GetLinksCountQuery>, ··· 715 714 #[serde(skip_serializing)] 716 715 query: GetAllLinksQuery, 717 716 } 718 - #[deprecated] 719 717 fn count_all_links( 720 718 accept: ExtractAccept, 721 719 query: Query<GetAllLinksQuery>,
+1 -1
pocket/Cargo.toml
··· 5 5 6 6 [dependencies] 7 7 atrium-crypto = "0.1.2" 8 - clap = { version = "4.5.41", features = ["derive"] } 8 + clap = { workspace = true } 9 9 jwt-compact = { git = "https://github.com/fatfingers23/jwt-compact.git", features = ["es256k"] } 10 10 log = "0.4.27" 11 11 poem = { version = "3.1.12", features = ["acme", "static-files"] }
+1 -1
quasar/Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 - clap = { version = "4.5.46", features = ["derive"] } 7 + clap = { workspace = true } 8 8 fjall = "2.11.2"
+1 -1
reflector/Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [dependencies] 7 - clap = { version = "4.5.47", features = ["derive"] } 7 + clap = { workspace = true } 8 8 log = "0.4.28" 9 9 poem = "3.1.12" 10 10 serde = { version = "1.0.219", features = ["derive"] }
+1 -1
slingshot/Cargo.toml
··· 8 8 atrium-common = { git = "https://github.com/uniphil/atrium.git", branch = "fix/resolve-handle-https-accept-whitespace" } 9 9 atrium-identity = { git = "https://github.com/uniphil/atrium.git", branch = "fix/resolve-handle-https-accept-whitespace" } 10 10 atrium-oauth = { git = "https://github.com/uniphil/atrium.git", branch = "fix/resolve-handle-https-accept-whitespace" } 11 - clap = { version = "4.5.41", features = ["derive"] } 11 + clap = { workspace = true } 12 12 ctrlc = "3.4.7" 13 13 foyer = { version = "0.18.0", features = ["serde"] } 14 14 hickory-resolver = "0.25.2"
+51 -26
slingshot/src/main.rs
··· 15 15 struct Args { 16 16 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 17 17 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 18 - #[arg(long)] 18 + #[arg(long, env = "SLINGSHOT_JETSTREAM")] 19 19 jetstream: String, 20 20 /// don't request zstd-compressed jetstream events 21 21 /// 22 22 /// reduces CPU at the expense of more ingress bandwidth 23 - #[arg(long, action)] 23 + #[arg(long, action, env = "SLINGSHOT_JETSTREAM_NO_ZSTD")] 24 24 jetstream_no_zstd: bool, 25 25 /// where to keep disk caches 26 - #[arg(long)] 26 + #[arg(long, env = "SLINGSHOT_CACHE_DIR")] 27 27 cache_dir: PathBuf, 28 + /// where to listen for incomming requests 29 + /// 30 + /// cannot be used with acme -- if you need ipv6 see --acme-ipv6 31 + #[arg(long, env = "SLINGSHOT_BIND")] 32 + #[clap(default_value = "0.0.0.0:8080")] 33 + bind: std::net::SocketAddr, 28 34 /// the domain pointing to this server 29 35 /// 30 36 /// if present: 31 37 /// - a did:web document will be served at /.well-known/did.json 32 38 /// - an HTTPS certs will be automatically configured with Acme/letsencrypt 33 39 /// - TODO: a rate-limiter will be installed 34 - #[arg(long)] 35 - domain: Option<String>, 40 + #[arg( 41 + long, 42 + conflicts_with("bind"), 43 + requires("acme_cache_path"), 44 + env = "SLINGSHOT_ACME_DOMAIN" 45 + )] 46 + acme_domain: Option<String>, 36 47 /// email address for letsencrypt contact 37 48 /// 38 49 /// recommended in production, i guess? 39 - #[arg(long)] 50 + #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CONTACT")] 40 51 acme_contact: Option<String>, 41 52 /// a location to cache acme https certs 42 53 /// 43 - /// only used if --host is specified. omitting requires re-requesting certs 44 - /// on every restart, and letsencrypt has rate limits that are easy to hit. 54 + /// required when (and only used when) --acme-domain is specified. 45 55 /// 46 56 /// recommended in production, but mind the file permissions. 47 - #[arg(long)] 48 - certs: Option<PathBuf>, 57 + #[arg(long, requires("acme_domain"), env = "SLINGSHOT_ACME_CACHE_PATH")] 58 + acme_cache_path: Option<PathBuf>, 59 + /// listen for ipv6 when using acme 60 + /// 61 + /// you must also configure the relevant DNS records for this to work 62 + #[arg(long, action, requires("acme_domain"), env = "SLINGSHOT_ACME_IPV6")] 63 + acme_ipv6: bool, 49 64 /// an web address to send healtcheck pings to every ~51s or so 50 - #[arg(long)] 65 + #[arg(long, env = "SLINGSHOT_HEALTHCHECK")] 51 66 healthcheck: Option<String>, 67 + /// enable metrics collection and serving 68 + #[arg(long, action, env = "SLINGSHOT_COLLECT_METRICS")] 69 + collect_metrics: bool, 70 + /// metrics server's listen address 71 + #[arg(long, requires("collect_metrics"), env = "SLINGSHOT_BIND_METRICS")] 72 + #[clap(default_value = "[::]:8765")] 73 + bind_metrics: std::net::SocketAddr, 52 74 } 53 75 54 76 #[tokio::main] ··· 62 84 63 85 let args = Args::parse(); 64 86 65 - if let Err(e) = install_metrics_server() { 66 - log::error!("failed to install metrics server: {e:?}"); 67 - } else { 68 - log::info!("metrics listening at http://0.0.0.0:8765"); 87 + if args.collect_metrics { 88 + log::trace!("installing metrics server..."); 89 + if let Err(e) = install_metrics_server(args.bind_metrics) { 90 + log::error!("failed to install metrics server: {e:?}"); 91 + } else { 92 + log::info!("metrics listening at http://{}", args.bind_metrics); 93 + } 69 94 } 70 95 71 96 std::fs::create_dir_all(&args.cache_dir).map_err(|e| { ··· 104 129 105 130 let server_shutdown = shutdown.clone(); 106 131 let server_cache_handle = cache.clone(); 132 + let bind = args.bind; 107 133 tasks.spawn(async move { 108 134 serve( 109 135 server_cache_handle, 110 136 identity, 111 137 repo, 112 - args.domain, 138 + args.acme_domain, 113 139 args.acme_contact, 114 - args.certs, 140 + args.acme_cache_path, 141 + args.acme_ipv6, 115 142 server_shutdown, 143 + bind, 116 144 ) 117 145 .await?; 118 146 Ok(()) ··· 172 200 Ok(()) 173 201 } 174 202 175 - fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> { 203 + fn install_metrics_server( 204 + bind_metrics: std::net::SocketAddr, 205 + ) -> Result<(), metrics_exporter_prometheus::BuildError> { 176 206 log::info!("installing metrics server..."); 177 - let host = [0, 0, 0, 0]; 178 - let port = 8765; 179 207 PrometheusBuilder::new() 180 208 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 181 209 .set_bucket_duration(std::time::Duration::from_secs(300))? 182 210 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 183 211 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 184 - .with_http_listener((host, port)) 212 + .with_http_listener(bind_metrics) 185 213 .install()?; 186 214 log::info!( 187 - "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 188 - host[0], 189 - host[1], 190 - host[2], 191 - host[3] 215 + "metrics server installed! listening on http://{}", 216 + bind_metrics 192 217 ); 193 218 Ok(()) 194 219 }
+11 -8
slingshot/src/server.rs
··· 687 687 make_sync(move |_| doc.clone()) 688 688 } 689 689 690 + #[allow(clippy::too_many_arguments)] 690 691 pub async fn serve( 691 692 cache: HybridCache<String, CachedRecord>, 692 693 identity: Identity, 693 694 repo: Repo, 694 - domain: Option<String>, 695 + acme_domain: Option<String>, 695 696 acme_contact: Option<String>, 696 - certs: Option<PathBuf>, 697 + acme_cache_path: Option<PathBuf>, 698 + acme_ipv6: bool, 697 699 shutdown: CancellationToken, 700 + bind: std::net::SocketAddr, 698 701 ) -> Result<(), ServerError> { 699 702 let repo = Arc::new(repo); 700 703 let api_service = OpenApiService::new( ··· 706 709 "Slingshot", 707 710 env!("CARGO_PKG_VERSION"), 708 711 ) 709 - .server(if let Some(ref h) = domain { 712 + .server(if let Some(ref h) = acme_domain { 710 713 format!("https://{h}") 711 714 } else { 712 715 "http://localhost:3000".to_string() ··· 727 730 .nest("/openapi", api_service.spec_endpoint()) 728 731 .nest("/xrpc/", api_service); 729 732 730 - if let Some(domain) = domain { 733 + if let Some(domain) = acme_domain { 731 734 rustls::crypto::aws_lc_rs::default_provider() 732 735 .install_default() 733 736 .expect("alskfjalksdjf"); ··· 740 743 if let Some(contact) = acme_contact { 741 744 auto_cert = auto_cert.contact(contact); 742 745 } 743 - if let Some(certs) = certs { 744 - auto_cert = auto_cert.cache_path(certs); 746 + if let Some(cache_path) = acme_cache_path { 747 + auto_cert = auto_cert.cache_path(cache_path); 745 748 } 746 749 let auto_cert = auto_cert.build().map_err(ServerError::AcmeBuildError)?; 747 750 748 751 run( 749 - TcpListener::bind("0.0.0.0:443").acme(auto_cert), 752 + TcpListener::bind(if acme_ipv6 { "[::]:443" } else { "0.0.0.0:443" }).acme(auto_cert), 750 753 app, 751 754 shutdown, 752 755 ) 753 756 .await 754 757 } else { 755 - run(TcpListener::bind("127.0.0.1:3000"), app, shutdown).await 758 + run(TcpListener::bind(bind), app, shutdown).await 756 759 } 757 760 } 758 761
+1 -1
spacedust/Cargo.toml
··· 5 5 6 6 [dependencies] 7 7 async-trait = "0.1.88" 8 - clap = { version = "4.5.40", features = ["derive"] } 8 + clap = { workspace = true } 9 9 ctrlc = "3.4.7" 10 10 dropshot = "0.16.2" 11 11 env_logger = "0.11.8"
+26 -17
spacedust/src/main.rs
··· 16 16 struct Args { 17 17 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 18 18 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 19 - #[arg(long)] 19 + #[arg(long, env = "SPACEDUST_JETSTREAM")] 20 20 jetstream: String, 21 21 /// don't request zstd-compressed jetstream events 22 22 /// 23 23 /// reduces CPU at the expense of more ingress bandwidth 24 - #[arg(long, action)] 24 + #[arg(long, action, env = "SPACEDUST_JETSTREAM_NO_ZSTD")] 25 25 jetstream_no_zstd: bool, 26 + /// spacedust server's listen address 27 + #[arg(long, env = "SPACEDUST_BIND")] 28 + #[clap(default_value = "[::]:8080")] 29 + bind: std::net::SocketAddr, 30 + /// enable metrics collection and serving 31 + #[arg(long, action, env = "SPACEDUST_COLLECT_METRICS")] 32 + collect_metrics: bool, 33 + /// metrics server's listen address 34 + #[arg(long, requires("collect_metrics"), env = "SPACEDUST_BIND_METRICS")] 35 + #[clap(default_value = "[::]:8765")] 36 + bind_metrics: std::net::SocketAddr, 26 37 } 27 38 28 39 #[tokio::main] ··· 60 71 61 72 let args = Args::parse(); 62 73 63 - if let Err(e) = install_metrics_server() { 64 - log::error!("failed to install metrics server: {e:?}"); 65 - }; 74 + if args.collect_metrics { 75 + log::trace!("installing metrics server..."); 76 + if let Err(e) = install_metrics_server(args.bind_metrics) { 77 + log::error!("failed to install metrics server: {e:?}"); 78 + }; 79 + } 66 80 67 81 let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new(); 68 82 69 83 let server_shutdown = shutdown.clone(); 84 + let bind = args.bind; 70 85 tasks.spawn(async move { 71 - server::serve(b, d, server_shutdown).await?; 86 + server::serve(b, d, server_shutdown, bind).await?; 72 87 Ok(()) 73 88 }); 74 89 ··· 122 137 Ok(()) 123 138 } 124 139 125 - fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> { 140 + fn install_metrics_server( 141 + bind: std::net::SocketAddr, 142 + ) -> Result<(), metrics_exporter_prometheus::BuildError> { 126 143 log::info!("installing metrics server..."); 127 - let host = [0, 0, 0, 0]; 128 - let port = 8765; 129 144 PrometheusBuilder::new() 130 145 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 131 146 .set_bucket_duration(std::time::Duration::from_secs(300))? 132 147 .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here. 133 148 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 134 - .with_http_listener((host, port)) 149 + .with_http_listener(bind) 135 150 .install()?; 136 - log::info!( 137 - "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 138 - host[0], 139 - host[1], 140 - host[2], 141 - host[3] 142 - ); 151 + log::info!("metrics server installed! listening on {bind}"); 143 152 Ok(()) 144 153 }
+2 -1
spacedust/src/server.rs
··· 29 29 b: broadcast::Sender<Arc<ClientMessage>>, 30 30 d: broadcast::Sender<Arc<ClientMessage>>, 31 31 shutdown: CancellationToken, 32 + bind: std::net::SocketAddr, 32 33 ) -> Result<(), ServerError> { 33 34 let config_logging = ConfigLogging::StderrTerminal { 34 35 level: ConfigLoggingLevel::Info, ··· 72 73 73 74 let server = ServerBuilder::new(api, ctx, log) 74 75 .config(ConfigDropshot { 75 - bind_address: "0.0.0.0:9998".parse().unwrap(), 76 + bind_address: bind, 76 77 ..Default::default() 77 78 }) 78 79 .start()?;
+1 -1
ufos/Cargo.toml
··· 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 11 cardinality-estimator-safe = { version = "4.0.2", features = ["with_serde", "with_digest"] } 12 12 chrono = { version = "0.4.41", features = ["serde"] } 13 - clap = { version = "4.5.31", features = ["derive"] } 13 + clap = { workspace = true } 14 14 dropshot = "0.16.0" 15 15 env_logger = "0.11.7" 16 16 fjall = { git = "https://github.com/fjall-rs/fjall.git", rev = "fb229572bb7d1d6966a596994dc1708e47ec57d8", features = ["lz4"] }
+27 -21
ufos/src/main.rs
··· 26 26 struct Args { 27 27 /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value: 28 28 /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2' 29 - #[arg(long)] 29 + #[arg(long, env = "UFOS_JETSTREAM")] 30 30 jetstream: String, 31 31 /// allow changing jetstream endpoints 32 - #[arg(long, action)] 32 + #[arg(long, action, env = "UFOS_JETSTREAM_FORCE")] 33 33 jetstream_force: bool, 34 34 /// don't request zstd-compressed jetstream events 35 35 /// 36 36 /// reduces CPU at the expense of more ingress bandwidth 37 - #[arg(long, action)] 37 + #[arg(long, action, env = "UFOS_JETSTREAM_NO_ZSTD")] 38 38 jetstream_no_zstd: bool, 39 + /// ufos server's listen address 40 + #[arg(long, env = "UFOS_BIND")] 41 + #[clap(default_value = "0.0.0.0:9990")] 42 + bind: std::net::SocketAddr, 39 43 /// Location to store persist data to disk 40 - #[arg(long)] 44 + #[arg(long, env = "UFOS_DATA")] 41 45 data: PathBuf, 42 46 /// DEBUG: don't start the jetstream consumer or its write loop 43 - #[arg(long, action)] 47 + #[arg(long, action, env = "UFOS_PAUSE_WRITER")] 44 48 pause_writer: bool, 45 49 /// Adjust runtime settings like background task intervals for efficient backfill 46 - #[arg(long, action)] 50 + #[arg(long, action, env = "UFOS_BACKFILL_MODE")] 47 51 backfill: bool, 48 52 /// DEBUG: force the rw loop to fall behind by pausing it 49 53 /// todo: restore this 50 54 #[arg(long, action)] 51 55 pause_rw: bool, 52 56 /// reset the rollup cursor, scrape through missed things in the past (backfill) 53 - #[arg(long, action)] 57 + #[arg(long, action, env = "UFOS_REROLL")] 54 58 reroll: bool, 55 59 /// DEBUG: interpret jetstream as a file fixture 56 - #[arg(long, action)] 60 + #[arg(long, action, env = "UFOS_JETSTREAM_FIXTURE")] 57 61 jetstream_fixture: bool, 62 + /// enable metrics collection and serving 63 + #[arg(long, action, env = "UFOS_COLLECT_METRICS")] 64 + collect_metrics: bool, 65 + /// metrics server's listen address 66 + #[arg(long, env = "UFOS_BIND_METRICS")] 67 + #[clap(default_value = "0.0.0.0:8765")] 68 + bind_metrics: std::net::SocketAddr, 58 69 } 59 70 60 71 #[tokio::main] ··· 84 95 let mut consumer_tasks: JoinSet<anyhow::Result<()>> = JoinSet::new(); 85 96 86 97 println!("starting server with storage..."); 87 - let serving = server::serve(read_store.clone()); 98 + let serving = server::serve(read_store.clone(), args.bind); 88 99 whatever_tasks.spawn(async move { 89 100 serving.await.map_err(|e| { 90 101 log::warn!("server ended: {e}"); ··· 137 148 Ok(()) 138 149 }); 139 150 140 - install_metrics_server()?; 151 + if args.collect_metrics { 152 + log::trace!("installing metrics server..."); 153 + install_metrics_server(args.bind_metrics)?; 154 + } 141 155 142 156 for (i, t) in consumer_tasks.join_all().await.iter().enumerate() { 143 157 log::warn!("task {i} done: {t:?}"); ··· 151 165 Ok(()) 152 166 } 153 167 154 - fn install_metrics_server() -> anyhow::Result<()> { 168 + fn install_metrics_server(bind: std::net::SocketAddr) -> anyhow::Result<()> { 155 169 log::info!("installing metrics server..."); 156 - let host = [0, 0, 0, 0]; 157 - let port = 8765; 158 170 PrometheusBuilder::new() 159 171 .set_quantiles(&[0.5, 0.9, 0.99, 1.0])? 160 172 .set_bucket_duration(Duration::from_secs(60))? 161 173 .set_bucket_count(std::num::NonZero::new(10).unwrap()) // count * duration = 10 mins. stuff doesn't happen that fast here. 162 174 .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage) 163 - .with_http_listener((host, port)) 175 + .with_http_listener(bind) 164 176 .install()?; 165 - log::info!( 166 - "metrics server installed! listening on http://{}.{}.{}.{}:{port}", 167 - host[0], 168 - host[1], 169 - host[2], 170 - host[3] 171 - ); 177 + log::info!("metrics server installed! listening on {bind}"); 172 178 Ok(()) 173 179 } 174 180
+5 -2
ufos/src/server/mod.rs
··· 716 716 .await 717 717 } 718 718 719 - pub async fn serve(storage: impl StoreReader + 'static) -> Result<(), String> { 719 + pub async fn serve( 720 + storage: impl StoreReader + 'static, 721 + bind: std::net::SocketAddr, 722 + ) -> Result<(), String> { 720 723 describe_metrics(); 721 724 let log = ConfigLogging::StderrTerminal { 722 725 level: ConfigLoggingLevel::Warn, ··· 758 761 759 762 ServerBuilder::new(api, context, log) 760 763 .config(ConfigDropshot { 761 - bind_address: "0.0.0.0:9999".parse().unwrap(), 764 + bind_address: bind, 762 765 ..Default::default() 763 766 }) 764 767 .start()
+1 -1
who-am-i/Cargo.toml
··· 11 11 axum = "0.8.4" 12 12 axum-extra = { version = "0.10.1", features = ["cookie-signed", "typed-header"] } 13 13 axum-template = { version = "3.0.0", features = ["handlebars"] } 14 - clap = { version = "4.5.40", features = ["derive", "env"] } 14 + clap = { workspace = true } 15 15 ctrlc = "3.4.7" 16 16 dashmap = "6.1.0" 17 17 elliptic-curve = "0.13.8"