cfgsync: remove panics and blocking mutex

This commit is contained in:
andrussal 2025-12-18 22:59:16 +01:00
parent ecdbb3a171
commit 7dea766801
9 changed files with 137 additions and 56 deletions

1
Cargo.lock generated
View File

@ -1015,6 +1015,7 @@ dependencies = [
name = "cfgsync" name = "cfgsync"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"axum", "axum",
"clap", "clap",
"groth16", "groth16",

View File

@ -125,7 +125,8 @@ mod tests {
Some(topology.nodes().map(|node| node.da_port).collect()), Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()), Some(topology.nodes().map(|node| node.blend_port).collect()),
hosts, hosts,
); )
.expect("cfgsync config generation should succeed");
let configs_by_identifier: HashMap<_, _> = configs let configs_by_identifier: HashMap<_, _> = configs
.into_iter() .into_iter()
.map(|(host, config)| (host.identifier, config)) .map(|(host, config)| (host.identifier, config))
@ -179,7 +180,8 @@ mod tests {
Some(topology.nodes().map(|node| node.da_port).collect()), Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()), Some(topology.nodes().map(|node| node.blend_port).collect()),
hosts, hosts,
); )
.expect("cfgsync config generation should succeed");
let configs_by_identifier: HashMap<_, _> = configs let configs_by_identifier: HashMap<_, _> = configs
.into_iter() .into_iter()
.map(|(host, config)| (host.identifier, config)) .map(|(host, config)| (host.identifier, config))
@ -217,7 +219,8 @@ mod tests {
Some(topology.nodes().map(|node| node.da_port).collect()), Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()), Some(topology.nodes().map(|node| node.blend_port).collect()),
hosts, hosts,
); )
.expect("cfgsync config generation should succeed");
for (host, config) in configs { for (host, config) in configs {
let genesis = &config.consensus_config.genesis_tx; let genesis = &config.consensus_config.genesis_tx;
@ -250,7 +253,8 @@ mod tests {
Some(topology.nodes().map(|node| node.da_port).collect()), Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()), Some(topology.nodes().map(|node| node.blend_port).collect()),
hosts, hosts,
); )
.expect("cfgsync config generation should succeed");
let configs_by_identifier: HashMap<_, _> = configs let configs_by_identifier: HashMap<_, _> = configs
.into_iter() .into_iter()
.map(|(host, config)| (host.identifier, config)) .map(|(host, config)| (host.identifier, config))

View File

@ -13,6 +13,7 @@ version = { workspace = true }
workspace = true workspace = true
[dependencies] [dependencies]
anyhow = "1"
axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" } axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" }
clap = { default-features = false, version = "4" } clap = { default-features = false, version = "4" }
groth16 = { workspace = true } groth16 = { workspace = true }

View File

@ -1,5 +1,6 @@
use std::{path::PathBuf, process}; use std::path::PathBuf;
use anyhow::Context as _;
use cfgsync::server::{CfgSyncConfig, cfgsync_app}; use cfgsync::server::{CfgSyncConfig, cfgsync_app};
use clap::Parser; use clap::Parser;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -11,19 +12,29 @@ struct Args {
} }
#[tokio::main] #[tokio::main]
async fn main() { async fn main() -> anyhow::Result<()> {
let cli = Args::parse(); let cli = Args::parse();
let config = CfgSyncConfig::load_from_file(&cli.config).unwrap_or_else(|err| { let config = CfgSyncConfig::load_from_file(&cli.config)
eprintln!("{err}"); .map_err(anyhow::Error::msg)
process::exit(1); .with_context(|| {
}); format!(
"failed to load cfgsync config from {}",
cli.config.display()
)
})?;
let port = config.port; let port = config.port;
let app = cfgsync_app(config.into()); let app = cfgsync_app(config.into());
println!("Server running on http://0.0.0.0:{port}"); println!("Server running on http://0.0.0.0:{port}");
let listener = TcpListener::bind(&format!("0.0.0.0:{port}")).await.unwrap(); let listener = TcpListener::bind(&format!("0.0.0.0:{port}"))
.await
.with_context(|| format!("failed to bind cfgsync server on 0.0.0.0:{port}"))?;
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app)
.await
.context("cfgsync server terminated unexpectedly")?;
Ok(())
} }

View File

@ -28,7 +28,6 @@ use crate::{
network::rewrite_initial_peers, network::rewrite_initial_peers,
}; };
#[must_use]
pub fn create_node_configs( pub fn create_node_configs(
consensus_params: &ConsensusParams, consensus_params: &ConsensusParams,
da_params: &DaParams, da_params: &DaParams,
@ -38,7 +37,7 @@ pub fn create_node_configs(
da_ports: Option<Vec<u16>>, da_ports: Option<Vec<u16>>,
blend_ports: Option<Vec<u16>>, blend_ports: Option<Vec<u16>>,
hosts: Vec<Host>, hosts: Vec<Host>,
) -> HashMap<Host, GeneralConfig> { ) -> Result<HashMap<Host, GeneralConfig>, NodeConfigBuildError> {
try_create_node_configs( try_create_node_configs(
consensus_params, consensus_params,
da_params, da_params,
@ -49,7 +48,6 @@ pub fn create_node_configs(
blend_ports, blend_ports,
hosts, hosts,
) )
.expect("failed to build cfgsync node configs")
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -77,6 +75,8 @@ pub enum NodeConfigBuildError {
MissingConsensusConfig, MissingConsensusConfig,
#[error("host/config length mismatch")] #[error("host/config length mismatch")]
HostConfigLenMismatch, HostConfigLenMismatch,
#[error(transparent)]
PeerRewrite(#[from] crate::network::peers::PeerRewriteError),
} }
pub fn try_create_node_configs( pub fn try_create_node_configs(
@ -137,7 +137,7 @@ pub fn try_create_node_configs(
&original_network_ports, &original_network_ports,
&hosts, &hosts,
&peer_ids, &peer_ids,
); )?;
let providers = try_create_providers(&hosts, &consensus_configs, &blend_configs, &da_configs)?; let providers = try_create_providers(&hosts, &consensus_configs, &blend_configs, &da_configs)?;
@ -182,7 +182,7 @@ pub fn try_create_node_configs(
} }
let mut network_config = network_configs[i].clone(); let mut network_config = network_configs[i].clone();
network_config.backend.swarm.host = Ipv4Addr::from_str("0.0.0.0").unwrap(); network_config.backend.swarm.host = Ipv4Addr::UNSPECIFIED;
network_config.backend.swarm.port = host.network_port; network_config.backend.swarm.port = host.network_port;
network_config.backend.initial_peers = host_network_init_peers[i].clone(); network_config.backend.initial_peers = host_network_init_peers[i].clone();
let nat_value = format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.network_port); let nat_value = format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.network_port);

View File

@ -98,7 +98,6 @@ pub fn create_providers(
consensus_configs: &[GeneralConsensusConfig], consensus_configs: &[GeneralConsensusConfig],
blend_configs: &[GeneralBlendConfig], blend_configs: &[GeneralBlendConfig],
da_configs: &[GeneralDaConfig], da_configs: &[GeneralDaConfig],
) -> Vec<ProviderInfo> { ) -> Result<Vec<ProviderInfo>, ProviderBuildError> {
try_create_providers(hosts, consensus_configs, blend_configs, da_configs) try_create_providers(hosts, consensus_configs, blend_configs, da_configs)
.expect("failed to build providers")
} }

View File

@ -1,32 +1,69 @@
use std::str::FromStr; use nomos_libp2p::{Multiaddr, PeerId, Protocol};
use thiserror::Error;
use nomos_libp2p::{Multiaddr, PeerId};
use super::address::find_matching_host; use super::address::find_matching_host;
use crate::host::Host; use crate::host::Host;
#[derive(Debug, Error)]
pub enum PeerRewriteError {
#[error("hosts and peer ids length mismatch (hosts={hosts}, peer_ids={peer_ids})")]
HostPeerLenMismatch { hosts: usize, peer_ids: usize },
#[error("peer index {peer_idx} out of bounds for hosts (len={hosts_len})")]
HostIndexOutOfBounds { peer_idx: usize, hosts_len: usize },
#[error("peer index {peer_idx} out of bounds for peer ids (len={peer_ids_len})")]
PeerIdIndexOutOfBounds {
peer_idx: usize,
peer_ids_len: usize,
},
}
pub fn rewrite_initial_peers( pub fn rewrite_initial_peers(
templates: &[Vec<Multiaddr>], templates: &[Vec<Multiaddr>],
original_ports: &[u16], original_ports: &[u16],
hosts: &[Host], hosts: &[Host],
peer_ids: &[PeerId], peer_ids: &[PeerId],
) -> Vec<Vec<Multiaddr>> { ) -> Result<Vec<Vec<Multiaddr>>, PeerRewriteError> {
templates if hosts.len() != peer_ids.len() {
.iter() return Err(PeerRewriteError::HostPeerLenMismatch {
.enumerate() hosts: hosts.len(),
.map(|(node_idx, peers)| { peer_ids: peer_ids.len(),
peers });
.iter() }
.filter_map(|addr| find_matching_host(addr, original_ports))
.filter(|&peer_idx| peer_idx != node_idx) let mut rewritten = Vec::with_capacity(templates.len());
.map(|peer_idx| { for (node_idx, peers) in templates.iter().enumerate() {
Multiaddr::from_str(&format!( let mut node_peers = Vec::new();
"/ip4/{}/udp/{}/quic-v1/p2p/{}", for addr in peers {
hosts[peer_idx].ip, hosts[peer_idx].network_port, peer_ids[peer_idx] let Some(peer_idx) = find_matching_host(addr, original_ports) else {
)) continue;
.expect("valid peer multiaddr") };
}) if peer_idx == node_idx {
.collect() continue;
}) }
.collect()
let host = hosts
.get(peer_idx)
.ok_or(PeerRewriteError::HostIndexOutOfBounds {
peer_idx,
hosts_len: hosts.len(),
})?;
let peer_id =
peer_ids
.get(peer_idx)
.ok_or(PeerRewriteError::PeerIdIndexOutOfBounds {
peer_idx,
peer_ids_len: peer_ids.len(),
})?;
let mut rewritten_addr = Multiaddr::empty();
rewritten_addr.push(Protocol::Ip4(host.ip));
rewritten_addr.push(Protocol::Udp(host.network_port));
rewritten_addr.push(Protocol::QuicV1);
rewritten_addr.push(Protocol::P2p((*peer_id).into()));
node_peers.push(rewritten_addr);
}
rewritten.push(node_peers);
}
Ok(rewritten)
} }

View File

@ -1,14 +1,13 @@
use std::{ use std::{collections::HashMap, sync::Arc, time::Duration};
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
use nomos_tracing_service::TracingSettings; use nomos_tracing_service::TracingSettings;
use testing_framework_config::topology::configs::{ use testing_framework_config::topology::configs::{
GeneralConfig, consensus::ConsensusParams, da::DaParams, wallet::WalletConfig, GeneralConfig, consensus::ConsensusParams, da::DaParams, wallet::WalletConfig,
}; };
use tokio::{sync::oneshot::Sender, time::timeout}; use tokio::{
sync::{Mutex, oneshot::Sender},
time::timeout,
};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use crate::{config::builder::try_create_node_configs, host::Host, server::CfgSyncConfig}; use crate::{config::builder::try_create_node_configs, host::Host, server::CfgSyncConfig};
@ -92,8 +91,8 @@ impl ConfigRepo {
repo repo
} }
pub fn register(&self, host: Host, reply_tx: Sender<RepoResponse>) { pub async fn register(&self, host: Host, reply_tx: Sender<RepoResponse>) {
let mut waiting_hosts = self.waiting_hosts.lock().unwrap(); let mut waiting_hosts = self.waiting_hosts.lock().await;
waiting_hosts.insert(host, reply_tx); waiting_hosts.insert(host, reply_tx);
} }
@ -106,7 +105,10 @@ impl ConfigRepo {
{ {
info!("all hosts have announced their IPs"); info!("all hosts have announced their IPs");
let mut waiting_hosts = self.waiting_hosts.lock().unwrap(); let mut waiting_hosts = {
let mut guard = self.waiting_hosts.lock().await;
std::mem::take(&mut *guard)
};
let hosts = waiting_hosts.keys().cloned().collect(); let hosts = waiting_hosts.keys().cloned().collect();
let configs = match try_create_node_configs( let configs = match try_create_node_configs(
@ -145,7 +147,10 @@ impl ConfigRepo {
} else { } else {
warn!("timeout: not all hosts announced within the time limit"); warn!("timeout: not all hosts announced within the time limit");
let mut waiting_hosts = self.waiting_hosts.lock().unwrap(); let mut waiting_hosts = {
let mut guard = self.waiting_hosts.lock().await;
std::mem::take(&mut *guard)
};
for (_, sender) in waiting_hosts.drain() { for (_, sender) in waiting_hosts.drain() {
let _ = sender.send(RepoResponse::Timeout); let _ = sender.send(RepoResponse::Timeout);
} }
@ -154,7 +159,8 @@ impl ConfigRepo {
async fn wait_for_hosts(&self) { async fn wait_for_hosts(&self) {
loop { loop {
if self.waiting_hosts.lock().unwrap().len() >= self.n_hosts { let len = { self.waiting_hosts.lock().await.len() };
if len >= self.n_hosts {
break; break;
} }
tokio::time::sleep(HOST_POLLING_INTERVAL).await; tokio::time::sleep(HOST_POLLING_INTERVAL).await;

View File

@ -173,14 +173,25 @@ async fn validator_config(
}; };
let (reply_tx, reply_rx) = channel(); let (reply_tx, reply_rx) = channel();
config_repo.register(Host::validator_from_ip(ip, identifier, ports), reply_tx); config_repo
.register(Host::validator_from_ip(ip, identifier, ports), reply_tx)
.await;
(reply_rx.await).map_or_else( (reply_rx.await).map_or_else(
|_| (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(), |_| (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(),
|config_response| match config_response { |config_response| match config_response {
RepoResponse::Config(config) => { RepoResponse::Config(config) => {
let config = create_validator_config(*config); let config = create_validator_config(*config);
let mut value = to_value(&config).expect("validator config should serialize"); let mut value = match to_value(&config) {
Ok(value) => value,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to serialize validator config: {err}"),
)
.into_response();
}
};
inject_defaults(&mut value); inject_defaults(&mut value);
override_api_ports(&mut value, &ports); override_api_ports(&mut value, &ports);
@ -219,14 +230,25 @@ async fn executor_config(
}; };
let (reply_tx, reply_rx) = channel(); let (reply_tx, reply_rx) = channel();
config_repo.register(Host::executor_from_ip(ip, identifier, ports), reply_tx); config_repo
.register(Host::executor_from_ip(ip, identifier, ports), reply_tx)
.await;
(reply_rx.await).map_or_else( (reply_rx.await).map_or_else(
|_| (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(), |_| (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(),
|config_response| match config_response { |config_response| match config_response {
RepoResponse::Config(config) => { RepoResponse::Config(config) => {
let config = create_executor_config(*config); let config = create_executor_config(*config);
let mut value = to_value(&config).expect("executor config should serialize"); let mut value = match to_value(&config) {
Ok(value) => value,
Err(err) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("failed to serialize executor config: {err}"),
)
.into_response();
}
};
inject_defaults(&mut value); inject_defaults(&mut value);
override_api_ports(&mut value, &ports); override_api_ports(&mut value, &ports);