diff --git a/Cargo.lock b/Cargo.lock index 65a7623..a98949a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1015,6 +1015,7 @@ dependencies = [ name = "cfgsync" version = "0.1.0" dependencies = [ + "anyhow", "axum", "clap", "groth16", diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index 605821a..22c6acb 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -125,7 +125,8 @@ mod tests { Some(topology.nodes().map(|node| node.da_port).collect()), Some(topology.nodes().map(|node| node.blend_port).collect()), hosts, - ); + ) + .expect("cfgsync config generation should succeed"); let configs_by_identifier: HashMap<_, _> = configs .into_iter() .map(|(host, config)| (host.identifier, config)) @@ -179,7 +180,8 @@ mod tests { Some(topology.nodes().map(|node| node.da_port).collect()), Some(topology.nodes().map(|node| node.blend_port).collect()), hosts, - ); + ) + .expect("cfgsync config generation should succeed"); let configs_by_identifier: HashMap<_, _> = configs .into_iter() .map(|(host, config)| (host.identifier, config)) @@ -217,7 +219,8 @@ mod tests { Some(topology.nodes().map(|node| node.da_port).collect()), Some(topology.nodes().map(|node| node.blend_port).collect()), hosts, - ); + ) + .expect("cfgsync config generation should succeed"); for (host, config) in configs { let genesis = &config.consensus_config.genesis_tx; @@ -250,7 +253,8 @@ mod tests { Some(topology.nodes().map(|node| node.da_port).collect()), Some(topology.nodes().map(|node| node.blend_port).collect()), hosts, - ); + ) + .expect("cfgsync config generation should succeed"); let configs_by_identifier: HashMap<_, _> = configs .into_iter() .map(|(host, config)| (host.identifier, config)) diff --git a/testing-framework/tools/cfgsync/Cargo.toml b/testing-framework/tools/cfgsync/Cargo.toml index 1d68716..d672fd1 100644 --- a/testing-framework/tools/cfgsync/Cargo.toml +++ b/testing-framework/tools/cfgsync/Cargo.toml @@ -13,6 +13,7 @@ version = { workspace = true } workspace = true [dependencies] +anyhow = "1" axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" } clap = { default-features = false, version = "4" } groth16 = { workspace = true } diff --git a/testing-framework/tools/cfgsync/src/bin/cfgsync-server.rs b/testing-framework/tools/cfgsync/src/bin/cfgsync-server.rs index 0602e76..8b2aa75 100644 --- a/testing-framework/tools/cfgsync/src/bin/cfgsync-server.rs +++ b/testing-framework/tools/cfgsync/src/bin/cfgsync-server.rs @@ -1,5 +1,6 @@ -use std::{path::PathBuf, process}; +use std::path::PathBuf; +use anyhow::Context as _; use cfgsync::server::{CfgSyncConfig, cfgsync_app}; use clap::Parser; use tokio::net::TcpListener; @@ -11,19 +12,29 @@ struct Args { } #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { let cli = Args::parse(); - let config = CfgSyncConfig::load_from_file(&cli.config).unwrap_or_else(|err| { - eprintln!("{err}"); - process::exit(1); - }); + let config = CfgSyncConfig::load_from_file(&cli.config) + .map_err(anyhow::Error::msg) + .with_context(|| { + format!( + "failed to load cfgsync config from {}", + cli.config.display() + ) + })?; let port = config.port; let app = cfgsync_app(config.into()); println!("Server running on http://0.0.0.0:{port}"); - let listener = TcpListener::bind(&format!("0.0.0.0:{port}")).await.unwrap(); + let listener = TcpListener::bind(&format!("0.0.0.0:{port}")) + .await + .with_context(|| format!("failed to bind cfgsync server on 0.0.0.0:{port}"))?; - axum::serve(listener, app).await.unwrap(); + axum::serve(listener, app) + .await + .context("cfgsync server terminated unexpectedly")?; + + Ok(()) } diff --git a/testing-framework/tools/cfgsync/src/config/builder.rs b/testing-framework/tools/cfgsync/src/config/builder.rs index a3f0069..1e50cb2 100644 --- a/testing-framework/tools/cfgsync/src/config/builder.rs +++ b/testing-framework/tools/cfgsync/src/config/builder.rs @@ -28,7 +28,6 @@ use crate::{ network::rewrite_initial_peers, }; -#[must_use] pub fn create_node_configs( consensus_params: &ConsensusParams, da_params: &DaParams, @@ -38,7 +37,7 @@ pub fn create_node_configs( da_ports: Option>, blend_ports: Option>, hosts: Vec, -) -> HashMap { +) -> Result, NodeConfigBuildError> { try_create_node_configs( consensus_params, da_params, @@ -49,7 +48,6 @@ pub fn create_node_configs( blend_ports, hosts, ) - .expect("failed to build cfgsync node configs") } #[derive(Debug, Error)] @@ -77,6 +75,8 @@ pub enum NodeConfigBuildError { MissingConsensusConfig, #[error("host/config length mismatch")] HostConfigLenMismatch, + #[error(transparent)] + PeerRewrite(#[from] crate::network::peers::PeerRewriteError), } pub fn try_create_node_configs( @@ -137,7 +137,7 @@ pub fn try_create_node_configs( &original_network_ports, &hosts, &peer_ids, - ); + )?; let providers = try_create_providers(&hosts, &consensus_configs, &blend_configs, &da_configs)?; @@ -182,7 +182,7 @@ pub fn try_create_node_configs( } let mut network_config = network_configs[i].clone(); - network_config.backend.swarm.host = Ipv4Addr::from_str("0.0.0.0").unwrap(); + network_config.backend.swarm.host = Ipv4Addr::UNSPECIFIED; network_config.backend.swarm.port = host.network_port; network_config.backend.initial_peers = host_network_init_peers[i].clone(); let nat_value = format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.network_port); diff --git a/testing-framework/tools/cfgsync/src/config/providers.rs b/testing-framework/tools/cfgsync/src/config/providers.rs index 40ae3c2..d084424 100644 --- a/testing-framework/tools/cfgsync/src/config/providers.rs +++ b/testing-framework/tools/cfgsync/src/config/providers.rs @@ -98,7 +98,6 @@ pub fn create_providers( consensus_configs: &[GeneralConsensusConfig], blend_configs: &[GeneralBlendConfig], da_configs: &[GeneralDaConfig], -) -> Vec { +) -> Result, ProviderBuildError> { try_create_providers(hosts, consensus_configs, blend_configs, da_configs) - .expect("failed to build providers") } diff --git a/testing-framework/tools/cfgsync/src/network/peers.rs b/testing-framework/tools/cfgsync/src/network/peers.rs index 2cbbfa2..1e0ebfe 100644 --- a/testing-framework/tools/cfgsync/src/network/peers.rs +++ b/testing-framework/tools/cfgsync/src/network/peers.rs @@ -1,32 +1,69 @@ -use std::str::FromStr; - -use nomos_libp2p::{Multiaddr, PeerId}; +use nomos_libp2p::{Multiaddr, PeerId, Protocol}; +use thiserror::Error; use super::address::find_matching_host; use crate::host::Host; +#[derive(Debug, Error)] +pub enum PeerRewriteError { + #[error("hosts and peer ids length mismatch (hosts={hosts}, peer_ids={peer_ids})")] + HostPeerLenMismatch { hosts: usize, peer_ids: usize }, + #[error("peer index {peer_idx} out of bounds for hosts (len={hosts_len})")] + HostIndexOutOfBounds { peer_idx: usize, hosts_len: usize }, + #[error("peer index {peer_idx} out of bounds for peer ids (len={peer_ids_len})")] + PeerIdIndexOutOfBounds { + peer_idx: usize, + peer_ids_len: usize, + }, +} + pub fn rewrite_initial_peers( templates: &[Vec], original_ports: &[u16], hosts: &[Host], peer_ids: &[PeerId], -) -> Vec> { - templates - .iter() - .enumerate() - .map(|(node_idx, peers)| { - peers - .iter() - .filter_map(|addr| find_matching_host(addr, original_ports)) - .filter(|&peer_idx| peer_idx != node_idx) - .map(|peer_idx| { - Multiaddr::from_str(&format!( - "/ip4/{}/udp/{}/quic-v1/p2p/{}", - hosts[peer_idx].ip, hosts[peer_idx].network_port, peer_ids[peer_idx] - )) - .expect("valid peer multiaddr") - }) - .collect() - }) - .collect() +) -> Result>, PeerRewriteError> { + if hosts.len() != peer_ids.len() { + return Err(PeerRewriteError::HostPeerLenMismatch { + hosts: hosts.len(), + peer_ids: peer_ids.len(), + }); + } + + let mut rewritten = Vec::with_capacity(templates.len()); + for (node_idx, peers) in templates.iter().enumerate() { + let mut node_peers = Vec::new(); + for addr in peers { + let Some(peer_idx) = find_matching_host(addr, original_ports) else { + continue; + }; + if peer_idx == node_idx { + continue; + } + + let host = hosts + .get(peer_idx) + .ok_or(PeerRewriteError::HostIndexOutOfBounds { + peer_idx, + hosts_len: hosts.len(), + })?; + let peer_id = + peer_ids + .get(peer_idx) + .ok_or(PeerRewriteError::PeerIdIndexOutOfBounds { + peer_idx, + peer_ids_len: peer_ids.len(), + })?; + + let mut rewritten_addr = Multiaddr::empty(); + rewritten_addr.push(Protocol::Ip4(host.ip)); + rewritten_addr.push(Protocol::Udp(host.network_port)); + rewritten_addr.push(Protocol::QuicV1); + rewritten_addr.push(Protocol::P2p((*peer_id).into())); + node_peers.push(rewritten_addr); + } + rewritten.push(node_peers); + } + + Ok(rewritten) } diff --git a/testing-framework/tools/cfgsync/src/repo.rs b/testing-framework/tools/cfgsync/src/repo.rs index d59295b..c2628c4 100644 --- a/testing-framework/tools/cfgsync/src/repo.rs +++ b/testing-framework/tools/cfgsync/src/repo.rs @@ -1,14 +1,13 @@ -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use nomos_tracing_service::TracingSettings; use testing_framework_config::topology::configs::{ GeneralConfig, consensus::ConsensusParams, da::DaParams, wallet::WalletConfig, }; -use tokio::{sync::oneshot::Sender, time::timeout}; +use tokio::{ + sync::{Mutex, oneshot::Sender}, + time::timeout, +}; use tracing::{error, info, warn}; use crate::{config::builder::try_create_node_configs, host::Host, server::CfgSyncConfig}; @@ -92,8 +91,8 @@ impl ConfigRepo { repo } - pub fn register(&self, host: Host, reply_tx: Sender) { - let mut waiting_hosts = self.waiting_hosts.lock().unwrap(); + pub async fn register(&self, host: Host, reply_tx: Sender) { + let mut waiting_hosts = self.waiting_hosts.lock().await; waiting_hosts.insert(host, reply_tx); } @@ -106,7 +105,10 @@ impl ConfigRepo { { info!("all hosts have announced their IPs"); - let mut waiting_hosts = self.waiting_hosts.lock().unwrap(); + let mut waiting_hosts = { + let mut guard = self.waiting_hosts.lock().await; + std::mem::take(&mut *guard) + }; let hosts = waiting_hosts.keys().cloned().collect(); let configs = match try_create_node_configs( @@ -145,7 +147,10 @@ impl ConfigRepo { } else { warn!("timeout: not all hosts announced within the time limit"); - let mut waiting_hosts = self.waiting_hosts.lock().unwrap(); + let mut waiting_hosts = { + let mut guard = self.waiting_hosts.lock().await; + std::mem::take(&mut *guard) + }; for (_, sender) in waiting_hosts.drain() { let _ = sender.send(RepoResponse::Timeout); } @@ -154,7 +159,8 @@ impl ConfigRepo { async fn wait_for_hosts(&self) { loop { - if self.waiting_hosts.lock().unwrap().len() >= self.n_hosts { + let len = { self.waiting_hosts.lock().await.len() }; + if len >= self.n_hosts { break; } tokio::time::sleep(HOST_POLLING_INTERVAL).await; diff --git a/testing-framework/tools/cfgsync/src/server.rs b/testing-framework/tools/cfgsync/src/server.rs index 358cf38..0bb5a88 100644 --- a/testing-framework/tools/cfgsync/src/server.rs +++ b/testing-framework/tools/cfgsync/src/server.rs @@ -173,14 +173,25 @@ async fn validator_config( }; let (reply_tx, reply_rx) = channel(); - config_repo.register(Host::validator_from_ip(ip, identifier, ports), reply_tx); + config_repo + .register(Host::validator_from_ip(ip, identifier, ports), reply_tx) + .await; (reply_rx.await).map_or_else( |_| (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(), |config_response| match config_response { RepoResponse::Config(config) => { let config = create_validator_config(*config); - let mut value = to_value(&config).expect("validator config should serialize"); + let mut value = match to_value(&config) { + Ok(value) => value, + Err(err) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to serialize validator config: {err}"), + ) + .into_response(); + } + }; inject_defaults(&mut value); override_api_ports(&mut value, &ports); @@ -219,14 +230,25 @@ async fn executor_config( }; let (reply_tx, reply_rx) = channel(); - config_repo.register(Host::executor_from_ip(ip, identifier, ports), reply_tx); + config_repo + .register(Host::executor_from_ip(ip, identifier, ports), reply_tx) + .await; (reply_rx.await).map_or_else( |_| (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(), |config_response| match config_response { RepoResponse::Config(config) => { let config = create_executor_config(*config); - let mut value = to_value(&config).expect("executor config should serialize"); + let mut value = match to_value(&config) { + Ok(value) => value, + Err(err) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to serialize executor config: {err}"), + ) + .into_response(); + } + }; inject_defaults(&mut value); override_api_ports(&mut value, &ports);