From 870280e35469f1ac80cb507a65f0c94e473149d2 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 19 Dec 2025 01:48:50 +0100 Subject: [PATCH] cfgsync: split repo/provider phases Refactors long cfgsync helpers into smaller steps. - ConfigRepo::run - Before: wait/timeout + lock draining + config generation + per-host response sending in one block. - After: wait_for_hosts_with_timeout/take_waiting_hosts/generate_node_configs/ send_error_to_all/send_timeout_to_all/send_configs_to_all_hosts. - try_create_providers - Before: mixed validation + duplicated locator parsing for DA/Blend. - After: validate_provider_inputs/build_da_providers/build_blend_providers/locator_for_host. --- .../tools/cfgsync/src/config/providers.rs | 132 +++++++++++------- testing-framework/tools/cfgsync/src/repo.rs | 117 ++++++++++------ 2 files changed, 154 insertions(+), 95 deletions(-) diff --git a/testing-framework/tools/cfgsync/src/config/providers.rs b/testing-framework/tools/cfgsync/src/config/providers.rs index d084424..eda8535 100644 --- a/testing-framework/tools/cfgsync/src/config/providers.rs +++ b/testing-framework/tools/cfgsync/src/config/providers.rs @@ -39,57 +39,11 @@ pub fn try_create_providers( .first() .ok_or(ProviderBuildError::MissingConsensusConfigs)?; - if hosts.len() != da_configs.len() || hosts.len() != blend_configs.len() { - return Err(ProviderBuildError::HostConfigLenMismatch { - hosts: hosts.len(), - da_configs: da_configs.len(), - blend_configs: blend_configs.len(), - }); - } - if first.da_notes.len() < da_configs.len() || first.blend_notes.len() < blend_configs.len() { - return Err(ProviderBuildError::NoteLenMismatch { - da_notes: first.da_notes.len(), - blend_notes: first.blend_notes.len(), - }); - } + validate_provider_inputs(hosts, first, da_configs, blend_configs)?; let mut providers = Vec::with_capacity(da_configs.len() + blend_configs.len()); - - for (i, da_conf) in da_configs.iter().enumerate() { - let value = format!( - "/ip4/{}/udp/{}/quic-v1", - hosts[i].ip, hosts[i].da_network_port - ); - let locator = - Multiaddr::from_str(&value).map_err(|source| ProviderBuildError::InvalidMultiaddr { - value, - message: source.to_string(), - })?; - providers.push(ProviderInfo { - service_type: ServiceType::DataAvailability, - provider_sk: da_conf.signer.clone(), - zk_sk: da_conf.secret_zk_key.clone(), - locator: Locator(locator), - note: first.da_notes[i].clone(), - }); - } - - for (i, blend_conf) in blend_configs.iter().enumerate() { - let value = format!("/ip4/{}/udp/{}/quic-v1", hosts[i].ip, hosts[i].blend_port); - let locator = - Multiaddr::from_str(&value).map_err(|source| ProviderBuildError::InvalidMultiaddr { - value, - message: source.to_string(), - })?; - providers.push(ProviderInfo { - service_type: ServiceType::BlendNetwork, - provider_sk: blend_conf.signer.clone(), - zk_sk: blend_conf.secret_zk_key.clone(), - locator: Locator(locator), - note: first.blend_notes[i].clone(), - }); - } - + providers.extend(build_da_providers(hosts, first, da_configs)?); + providers.extend(build_blend_providers(hosts, first, blend_configs)?); Ok(providers) } @@ -101,3 +55,83 @@ pub fn create_providers( ) -> Result, ProviderBuildError> { try_create_providers(hosts, consensus_configs, blend_configs, da_configs) } + +fn validate_provider_inputs( + hosts: &[Host], + first: &GeneralConsensusConfig, + da_configs: &[GeneralDaConfig], + blend_configs: &[GeneralBlendConfig], +) -> Result<(), ProviderBuildError> { + if hosts.len() != da_configs.len() || hosts.len() != blend_configs.len() { + return Err(ProviderBuildError::HostConfigLenMismatch { + hosts: hosts.len(), + da_configs: da_configs.len(), + blend_configs: blend_configs.len(), + }); + } + + if first.da_notes.len() < da_configs.len() || first.blend_notes.len() < blend_configs.len() { + return Err(ProviderBuildError::NoteLenMismatch { + da_notes: first.da_notes.len(), + blend_notes: first.blend_notes.len(), + }); + } + + Ok(()) +} + +fn build_da_providers( + hosts: &[Host], + first: &GeneralConsensusConfig, + da_configs: &[GeneralDaConfig], +) -> Result, ProviderBuildError> { + da_configs + .iter() + .enumerate() + .map(|(i, da_conf)| { + let locator = locator_for_host(hosts, i, hosts[i].da_network_port)?; + Ok(ProviderInfo { + service_type: ServiceType::DataAvailability, + provider_sk: da_conf.signer.clone(), + zk_sk: da_conf.secret_zk_key.clone(), + locator, + note: first.da_notes[i].clone(), + }) + }) + .collect() +} + +fn build_blend_providers( + hosts: &[Host], + first: &GeneralConsensusConfig, + blend_configs: &[GeneralBlendConfig], +) -> Result, ProviderBuildError> { + blend_configs + .iter() + .enumerate() + .map(|(i, blend_conf)| { + let locator = locator_for_host(hosts, i, hosts[i].blend_port)?; + Ok(ProviderInfo { + service_type: ServiceType::BlendNetwork, + provider_sk: blend_conf.signer.clone(), + zk_sk: blend_conf.secret_zk_key.clone(), + locator, + note: first.blend_notes[i].clone(), + }) + }) + .collect() +} + +fn locator_for_host( + hosts: &[Host], + index: usize, + port: u16, +) -> Result { + let value = format!("/ip4/{}/udp/{port}/quic-v1", hosts[index].ip); + let locator = + Multiaddr::from_str(&value).map_err(|source| ProviderBuildError::InvalidMultiaddr { + value, + message: source.to_string(), + })?; + Ok(Locator(locator)) +} diff --git a/testing-framework/tools/cfgsync/src/repo.rs b/testing-framework/tools/cfgsync/src/repo.rs index c2628c4..e7b2c46 100644 --- a/testing-framework/tools/cfgsync/src/repo.rs +++ b/testing-framework/tools/cfgsync/src/repo.rs @@ -99,62 +99,27 @@ impl ConfigRepo { async fn run(&self) { let timeout_duration = self.timeout_duration; - if timeout(timeout_duration, self.wait_for_hosts()) - .await - .is_ok() - { + if wait_for_hosts_with_timeout(self, timeout_duration).await { info!("all hosts have announced their IPs"); - let mut waiting_hosts = { - let mut guard = self.waiting_hosts.lock().await; - std::mem::take(&mut *guard) - }; + let mut waiting_hosts = take_waiting_hosts(self).await; let hosts = waiting_hosts.keys().cloned().collect(); - let configs = match try_create_node_configs( - &self.consensus_params, - &self.da_params, - &self.tracing_settings, - &self.wallet_config, - self.ids.clone(), - self.da_ports.clone(), - self.blend_ports.clone(), - hosts, - ) { + let configs = match generate_node_configs(self, hosts) { Ok(configs) => configs, - Err(err) => { - error!(error = %err, "failed to generate node configs"); - let message = err.to_string(); - for (_, sender) in waiting_hosts.drain() { - let _ = sender.send(RepoResponse::Error(message.clone())); - } + Err(message) => { + send_error_to_all(&mut waiting_hosts, &message); return; } }; - for (host, sender) in waiting_hosts.drain() { - match configs.get(&host) { - Some(config) => { - let _ = sender.send(RepoResponse::Config(Box::new(config.to_owned()))); - } - None => { - warn!(identifier = %host.identifier, "missing config for host"); - let _ = - sender.send(RepoResponse::Error("missing config for host".to_string())); - } - } - } - } else { - warn!("timeout: not all hosts announced within the time limit"); - - let mut waiting_hosts = { - let mut guard = self.waiting_hosts.lock().await; - std::mem::take(&mut *guard) - }; - for (_, sender) in waiting_hosts.drain() { - let _ = sender.send(RepoResponse::Timeout); - } + send_configs_to_all_hosts(&mut waiting_hosts, &configs); + return; } + + warn!("timeout: not all hosts announced within the time limit"); + let mut waiting_hosts = take_waiting_hosts(self).await; + send_timeout_to_all(&mut waiting_hosts); } async fn wait_for_hosts(&self) { @@ -167,3 +132,63 @@ impl ConfigRepo { } } } + +async fn wait_for_hosts_with_timeout(repo: &ConfigRepo, timeout_duration: Duration) -> bool { + timeout(timeout_duration, repo.wait_for_hosts()) + .await + .is_ok() +} + +async fn take_waiting_hosts(repo: &ConfigRepo) -> HashMap> { + let mut guard = repo.waiting_hosts.lock().await; + std::mem::take(&mut *guard) +} + +fn generate_node_configs( + repo: &ConfigRepo, + hosts: Vec, +) -> Result, String> { + try_create_node_configs( + &repo.consensus_params, + &repo.da_params, + &repo.tracing_settings, + &repo.wallet_config, + repo.ids.clone(), + repo.da_ports.clone(), + repo.blend_ports.clone(), + hosts, + ) + .map_err(|err| { + error!(error = %err, "failed to generate node configs"); + err.to_string() + }) +} + +fn send_error_to_all(waiting_hosts: &mut HashMap>, message: &str) { + for (_, sender) in waiting_hosts.drain() { + let _ = sender.send(RepoResponse::Error(message.to_string())); + } +} + +fn send_timeout_to_all(waiting_hosts: &mut HashMap>) { + for (_, sender) in waiting_hosts.drain() { + let _ = sender.send(RepoResponse::Timeout); + } +} + +fn send_configs_to_all_hosts( + waiting_hosts: &mut HashMap>, + configs: &HashMap, +) { + for (host, sender) in waiting_hosts.drain() { + match configs.get(&host) { + Some(config) => { + let _ = sender.send(RepoResponse::Config(Box::new(config.to_owned()))); + } + None => { + warn!(identifier = %host.identifier, "missing config for host"); + let _ = sender.send(RepoResponse::Error("missing config for host".to_string())); + } + } + } +}