use std::{collections::HashMap, net::Ipv4Addr, str::FromStr as _}; use groth16::fr_to_bytes; use hex; use key_management_system::{ backend::preload::PreloadKMSBackendSettings, keys::{Ed25519Key, Key, ZkKey}, }; use nomos_core::{ mantle::GenesisTx as _, sdp::{Locator, ServiceType}, }; use nomos_libp2p::{Multiaddr, PeerId, Protocol, ed25519}; use nomos_tracing_service::{LoggerLayer, MetricsLayer, TracingLayer, TracingSettings}; use nomos_utils::net::get_available_udp_port; use rand::{Rng as _, thread_rng}; use testing_framework_config::topology::configs::{ GeneralConfig, api::GeneralApiConfig, blend::{GeneralBlendConfig, create_blend_configs}, bootstrap::{SHORT_PROLONGED_BOOTSTRAP_PERIOD, create_bootstrap_configs}, consensus::{ ConsensusParams, GeneralConsensusConfig, ProviderInfo, create_consensus_configs, create_genesis_tx_with_declarations, }, da::{DaParams, GeneralDaConfig, create_da_configs}, network::{NetworkParams, create_network_configs}, time::default_time_config, tracing::GeneralTracingConfig, wallet::WalletConfig, }; const DEFAULT_LIBP2P_NETWORK_PORT: u16 = 3000; const DEFAULT_DA_NETWORK_PORT: u16 = 3300; const DEFAULT_BLEND_PORT: u16 = 3400; const DEFAULT_API_PORT: u16 = 18080; #[derive(Copy, Clone, Eq, PartialEq, Hash)] pub enum HostKind { Validator, Executor, } #[derive(Eq, PartialEq, Hash, Clone)] pub struct Host { pub kind: HostKind, pub ip: Ipv4Addr, pub identifier: String, pub network_port: u16, pub da_network_port: u16, pub blend_port: u16, pub api_port: u16, pub testing_http_port: u16, } #[derive(Clone, Copy)] pub struct PortOverrides { pub network_port: Option, pub da_network_port: Option, pub blend_port: Option, pub api_port: Option, pub testing_http_port: Option, } impl Host { fn from_parts(kind: HostKind, ip: Ipv4Addr, identifier: String, ports: PortOverrides) -> Self { Self { kind, ip, identifier, network_port: ports.network_port.unwrap_or(DEFAULT_LIBP2P_NETWORK_PORT), da_network_port: ports.da_network_port.unwrap_or(DEFAULT_DA_NETWORK_PORT), blend_port: ports.blend_port.unwrap_or(DEFAULT_BLEND_PORT), api_port: ports.api_port.unwrap_or(DEFAULT_API_PORT), testing_http_port: ports.testing_http_port.unwrap_or(DEFAULT_API_PORT + 1), } } #[must_use] pub fn validator_from_ip(ip: Ipv4Addr, identifier: String, ports: PortOverrides) -> Self { Self::from_parts(HostKind::Validator, ip, identifier, ports) } #[must_use] pub fn executor_from_ip(ip: Ipv4Addr, identifier: String, ports: PortOverrides) -> Self { Self::from_parts(HostKind::Executor, ip, identifier, ports) } } #[must_use] pub fn create_node_configs( consensus_params: &ConsensusParams, da_params: &DaParams, tracing_settings: &TracingSettings, wallet_config: &WalletConfig, ids: Option>, da_ports: Option>, blend_ports: Option>, hosts: Vec, ) -> HashMap { let mut hosts = hosts; hosts.sort_by_key(|host| { let index = host .identifier .rsplit('-') .next() .and_then(|raw| raw.parse::().ok()) .unwrap_or(0); let kind = match host.kind { HostKind::Validator => 0, HostKind::Executor => 1, }; (kind, index) }); assert_eq!( hosts.len(), consensus_params.n_participants, "host count must match consensus participants" ); let ids = ids.unwrap_or_else(|| { let mut generated = vec![[0; 32]; consensus_params.n_participants]; for id in &mut generated { thread_rng().fill(id); } generated }); assert_eq!( ids.len(), consensus_params.n_participants, "pre-generated ids must match participant count" ); let ports = da_ports.unwrap_or_else(|| { (0..consensus_params.n_participants) .map(|_| get_available_udp_port().unwrap()) .collect() }); assert_eq!( ports.len(), consensus_params.n_participants, "da port list must match participant count" ); let blend_ports = blend_ports.unwrap_or_else(|| hosts.iter().map(|h| h.blend_port).collect()); assert_eq!( blend_ports.len(), consensus_params.n_participants, "blend port list must match participant count" ); let mut consensus_configs = create_consensus_configs(&ids, consensus_params, wallet_config); let bootstrap_configs = create_bootstrap_configs(&ids, SHORT_PROLONGED_BOOTSTRAP_PERIOD); let da_configs = create_da_configs(&ids, da_params, &ports); let network_configs = create_network_configs(&ids, &NetworkParams::default()); let blend_configs = create_blend_configs(&ids, &blend_ports); let api_configs = hosts .iter() .map(|host| GeneralApiConfig { address: format!("0.0.0.0:{}", host.api_port).parse().unwrap(), testing_http_address: format!("0.0.0.0:{}", host.testing_http_port) .parse() .unwrap(), }) .collect::>(); let mut configured_hosts = HashMap::new(); let initial_peer_templates: Vec> = network_configs .iter() .map(|cfg| cfg.backend.initial_peers.clone()) .collect(); let original_network_ports: Vec = network_configs .iter() .map(|cfg| cfg.backend.inner.port) .collect(); let peer_ids: Vec = ids .iter() .map(|bytes| { let mut key_bytes = *bytes; let secret = ed25519::SecretKey::try_from_bytes(&mut key_bytes).expect("valid ed25519 key"); PeerId::from_public_key(&ed25519::Keypair::from(secret).public().into()) }) .collect(); let host_network_init_peers = rewrite_initial_peers( &initial_peer_templates, &original_network_ports, &hosts, &peer_ids, ); let providers = create_providers(&hosts, &consensus_configs, &blend_configs, &da_configs); // Update genesis TX to contain Blend and DA providers. let ledger_tx = consensus_configs[0] .genesis_tx .mantle_tx() .ledger_tx .clone(); let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers); for c in &mut consensus_configs { c.genesis_tx = genesis_tx.clone(); } // Set Blend and DA keys in KMS of each node config. let kms_configs = create_kms_configs(&blend_configs, &da_configs); for (i, host) in hosts.into_iter().enumerate() { let consensus_config = consensus_configs[i].clone(); let api_config = api_configs[i].clone(); // DA Libp2p network config. let mut da_config = da_configs[i].clone(); da_config.listening_address = Multiaddr::from_str(&format!( "/ip4/0.0.0.0/udp/{}/quic-v1", host.da_network_port, )) .unwrap(); if matches!(host.kind, HostKind::Validator) { da_config.policy_settings.min_dispersal_peers = 0; } // Libp2p network config. let mut network_config = network_configs[i].clone(); network_config.backend.inner.host = Ipv4Addr::from_str("0.0.0.0").unwrap(); network_config.backend.inner.port = host.network_port; network_config.backend.initial_peers = host_network_init_peers[i].clone(); network_config.backend.inner.nat_config = nomos_libp2p::NatSettings::Static { external_address: Multiaddr::from_str(&format!( "/ip4/{}/udp/{}/quic-v1", host.ip, host.network_port )) .unwrap(), }; // Blend network config. let mut blend_config = blend_configs[i].clone(); blend_config.backend_core.listening_address = Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.blend_port)).unwrap(); // Tracing config. let tracing_config = update_tracing_identifier(tracing_settings.clone(), host.identifier.clone()); // Time config let time_config = default_time_config(); configured_hosts.insert( host.clone(), GeneralConfig { consensus_config, bootstrapping_config: bootstrap_configs[i].clone(), da_config, network_config, blend_config, api_config, tracing_config, time_config, kms_config: kms_configs[i].clone(), }, ); } configured_hosts } fn create_providers( hosts: &[Host], consensus_configs: &[GeneralConsensusConfig], blend_configs: &[GeneralBlendConfig], da_configs: &[GeneralDaConfig], ) -> Vec { let mut providers: Vec<_> = da_configs .iter() .enumerate() .map(|(i, da_conf)| ProviderInfo { service_type: ServiceType::DataAvailability, provider_sk: da_conf.signer.clone(), zk_sk: da_conf.secret_zk_key.clone(), locator: Locator( Multiaddr::from_str(&format!( "/ip4/{}/udp/{}/quic-v1", hosts[i].ip, hosts[i].da_network_port )) .unwrap(), ), note: consensus_configs[0].da_notes[i].clone(), }) .collect(); providers.extend(blend_configs.iter().enumerate().map(|(i, blend_conf)| { ProviderInfo { service_type: ServiceType::BlendNetwork, provider_sk: blend_conf.signer.clone(), zk_sk: blend_conf.secret_zk_key.clone(), locator: Locator( Multiaddr::from_str(&format!( "/ip4/{}/udp/{}/quic-v1", hosts[i].ip, hosts[i].blend_port )) .unwrap(), ), note: consensus_configs[0].blend_notes[i].clone(), } })); providers } fn rewrite_initial_peers( templates: &[Vec], original_ports: &[u16], hosts: &[Host], peer_ids: &[PeerId], ) -> Vec> { templates .iter() .enumerate() .map(|(node_idx, peers)| { peers .iter() .filter_map(|addr| find_matching_host(addr, original_ports)) .filter(|&peer_idx| peer_idx != node_idx) .map(|peer_idx| { Multiaddr::from_str(&format!( "/ip4/{}/udp/{}/quic-v1/p2p/{}", hosts[peer_idx].ip, hosts[peer_idx].network_port, peer_ids[peer_idx] )) .expect("valid peer multiaddr") }) .collect() }) .collect() } fn find_matching_host(addr: &Multiaddr, original_ports: &[u16]) -> Option { extract_udp_port(addr).and_then(|port| { original_ports .iter() .position(|candidate| *candidate == port) }) } fn extract_udp_port(addr: &Multiaddr) -> Option { addr.iter().find_map(|protocol| { if let Protocol::Udp(port) = protocol { Some(port) } else { None } }) } fn update_tracing_identifier( settings: TracingSettings, identifier: String, ) -> GeneralTracingConfig { GeneralTracingConfig { tracing_settings: TracingSettings { logger: match settings.logger { LoggerLayer::Loki(mut config) => { config.host_identifier.clone_from(&identifier); LoggerLayer::Loki(config) } other => other, }, tracing: match settings.tracing { TracingLayer::Otlp(mut config) => { config.service_name.clone_from(&identifier); TracingLayer::Otlp(config) } other @ TracingLayer::None => other, }, filter: settings.filter, metrics: match settings.metrics { MetricsLayer::Otlp(mut config) => { config.host_identifier = identifier; MetricsLayer::Otlp(config) } other @ MetricsLayer::None => other, }, console: settings.console, level: settings.level, }, } } fn create_kms_configs( blend_configs: &[GeneralBlendConfig], da_configs: &[GeneralDaConfig], ) -> Vec { da_configs .iter() .zip(blend_configs.iter()) .map(|(da_conf, blend_conf)| PreloadKMSBackendSettings { keys: [ ( hex::encode(blend_conf.signer.verifying_key().as_bytes()), Key::Ed25519(Ed25519Key::new(blend_conf.signer.clone())), ), ( hex::encode(fr_to_bytes( &blend_conf.secret_zk_key.to_public_key().into_inner(), )), Key::Zk(ZkKey::new(blend_conf.secret_zk_key.clone())), ), ( hex::encode(da_conf.signer.verifying_key().as_bytes()), Key::Ed25519(Ed25519Key::new(da_conf.signer.clone())), ), ( hex::encode(fr_to_bytes( &da_conf.secret_zk_key.to_public_key().into_inner(), )), Key::Zk(ZkKey::new(da_conf.secret_zk_key.clone())), ), ] .into(), }) .collect() } #[cfg(test)] mod cfgsync_tests { use std::{net::Ipv4Addr, num::NonZero, str::FromStr as _, time::Duration}; use nomos_da_network_core::swarm::{ DAConnectionMonitorSettings, DAConnectionPolicySettings, ReplicationConfig, }; use nomos_libp2p::{Multiaddr, Protocol}; use nomos_tracing_service::{ ConsoleLayer, FilterLayer, LoggerLayer, MetricsLayer, TracingLayer, TracingSettings, }; use testing_framework_config::topology::configs::{ consensus::ConsensusParams, da::DaParams, wallet::WalletConfig, }; use tracing::Level; use super::{Host, HostKind, create_node_configs}; #[test] fn basic_ip_list() { let hosts = (0..10) .map(|i| Host { kind: HostKind::Validator, ip: Ipv4Addr::from_str(&format!("10.1.1.{i}")).unwrap(), identifier: "node".into(), network_port: 3000, da_network_port: 4044, blend_port: 5000, api_port: 18080, testing_http_port: 18081, }) .collect(); let configs = create_node_configs( &ConsensusParams { n_participants: 10, security_param: NonZero::new(10).unwrap(), active_slot_coeff: 0.9, }, &DaParams { subnetwork_size: 2, dispersal_factor: 1, num_samples: 1, num_subnets: 2, old_blobs_check_interval: Duration::from_secs(5), blobs_validity_duration: Duration::from_secs(u64::MAX), global_params_path: String::new(), policy_settings: DAConnectionPolicySettings::default(), monitor_settings: DAConnectionMonitorSettings::default(), balancer_interval: Duration::ZERO, redial_cooldown: Duration::ZERO, replication_settings: ReplicationConfig { seen_message_cache_size: 0, seen_message_ttl: Duration::ZERO, }, subnets_refresh_interval: Duration::from_secs(1), retry_shares_limit: 1, retry_commitments_limit: 1, }, &TracingSettings { logger: LoggerLayer::None, tracing: TracingLayer::None, filter: FilterLayer::None, metrics: MetricsLayer::None, console: ConsoleLayer::None, level: Level::DEBUG, }, &WalletConfig::default(), None, None, None, hosts, ); for (host, config) in &configs { let network_port = config.network_config.backend.inner.port; let da_network_port = extract_port(&config.da_config.listening_address); let blend_port = extract_port(&config.blend_config.backend_core.listening_address); assert_eq!(network_port, host.network_port); assert_eq!(da_network_port, host.da_network_port); assert_eq!(blend_port, host.blend_port); } } fn extract_port(multiaddr: &Multiaddr) -> u16 { multiaddr .iter() .find_map(|protocol| match protocol { Protocol::Udp(port) => Some(port), _ => None, }) .unwrap() } }