cfgsync: split repo/provider phases

Refactors long cfgsync helpers into smaller steps.

- ConfigRepo::run
  - Before: wait/timeout + lock draining + config generation + per-host response sending in one block.
  - After: wait_for_hosts_with_timeout / take_waiting_hosts / generate_node_configs /
    send_error_to_all / send_timeout_to_all / send_configs_to_all_hosts.

- try_create_providers
  - Before: mixed validation + duplicated locator parsing for DA/Blend.
  - After: validate_provider_inputs / build_da_providers / build_blend_providers /
    locator_for_host (both refactored bodies are sketched below the commit metadata).
andrussal 2025-12-19 01:48:50 +01:00
parent 938d782f8d
commit 870280e354
2 changed files with 154 additions and 95 deletions
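
For orientation, the two refactored bodies read roughly as follows once the helpers
introduced below are in place. This is a sketch reconstructed from the hunks in this
commit; the exact try_create_providers signature and the binding of `first` are inferred
from the surrounding context lines rather than shown verbatim in the diff.

pub fn try_create_providers(
    hosts: &[Host],
    consensus_configs: &[GeneralConsensusConfig],
    blend_configs: &[GeneralBlendConfig],
    da_configs: &[GeneralDaConfig],
) -> Result<Vec<ProviderInfo>, ProviderBuildError> {
    let first = consensus_configs
        .first()
        .ok_or(ProviderBuildError::MissingConsensusConfigs)?;
    // Length checks that used to be inlined now live in one helper.
    validate_provider_inputs(hosts, first, da_configs, blend_configs)?;
    let mut providers = Vec::with_capacity(da_configs.len() + blend_configs.len());
    providers.extend(build_da_providers(hosts, first, da_configs)?);
    providers.extend(build_blend_providers(hosts, first, blend_configs)?);
    Ok(providers)
}

impl ConfigRepo {
    async fn run(&self) {
        let timeout_duration = self.timeout_duration;
        if wait_for_hosts_with_timeout(self, timeout_duration).await {
            info!("all hosts have announced their IPs");
            let mut waiting_hosts = take_waiting_hosts(self).await;
            let hosts = waiting_hosts.keys().cloned().collect();
            let configs = match generate_node_configs(self, hosts) {
                Ok(configs) => configs,
                Err(message) => {
                    // Config generation failed: fan the error out to every waiting host.
                    send_error_to_all(&mut waiting_hosts, &message);
                    return;
                }
            };
            send_configs_to_all_hosts(&mut waiting_hosts, &configs);
            return;
        }
        warn!("timeout: not all hosts announced within the time limit");
        let mut waiting_hosts = take_waiting_hosts(self).await;
        send_timeout_to_all(&mut waiting_hosts);
    }
}

The behavioural shape is unchanged: the happy path sends each host its config, a
config-generation failure fans an error out to every waiting host, and the timeout path
now falls through after an early return instead of sitting in an else branch.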


@@ -39,57 +39,11 @@ pub fn try_create_providers(
.first()
.ok_or(ProviderBuildError::MissingConsensusConfigs)?;
if hosts.len() != da_configs.len() || hosts.len() != blend_configs.len() {
return Err(ProviderBuildError::HostConfigLenMismatch {
hosts: hosts.len(),
da_configs: da_configs.len(),
blend_configs: blend_configs.len(),
});
}
if first.da_notes.len() < da_configs.len() || first.blend_notes.len() < blend_configs.len() {
return Err(ProviderBuildError::NoteLenMismatch {
da_notes: first.da_notes.len(),
blend_notes: first.blend_notes.len(),
});
}
validate_provider_inputs(hosts, first, da_configs, blend_configs)?;
let mut providers = Vec::with_capacity(da_configs.len() + blend_configs.len());
for (i, da_conf) in da_configs.iter().enumerate() {
let value = format!(
"/ip4/{}/udp/{}/quic-v1",
hosts[i].ip, hosts[i].da_network_port
);
let locator =
Multiaddr::from_str(&value).map_err(|source| ProviderBuildError::InvalidMultiaddr {
value,
message: source.to_string(),
})?;
providers.push(ProviderInfo {
service_type: ServiceType::DataAvailability,
provider_sk: da_conf.signer.clone(),
zk_sk: da_conf.secret_zk_key.clone(),
locator: Locator(locator),
note: first.da_notes[i].clone(),
});
}
for (i, blend_conf) in blend_configs.iter().enumerate() {
let value = format!("/ip4/{}/udp/{}/quic-v1", hosts[i].ip, hosts[i].blend_port);
let locator =
Multiaddr::from_str(&value).map_err(|source| ProviderBuildError::InvalidMultiaddr {
value,
message: source.to_string(),
})?;
providers.push(ProviderInfo {
service_type: ServiceType::BlendNetwork,
provider_sk: blend_conf.signer.clone(),
zk_sk: blend_conf.secret_zk_key.clone(),
locator: Locator(locator),
note: first.blend_notes[i].clone(),
});
}
providers.extend(build_da_providers(hosts, first, da_configs)?);
providers.extend(build_blend_providers(hosts, first, blend_configs)?);
Ok(providers)
}
@@ -101,3 +55,83 @@ pub fn create_providers(
) -> Result<Vec<ProviderInfo>, ProviderBuildError> {
try_create_providers(hosts, consensus_configs, blend_configs, da_configs)
}
fn validate_provider_inputs(
    hosts: &[Host],
    first: &GeneralConsensusConfig,
    da_configs: &[GeneralDaConfig],
    blend_configs: &[GeneralBlendConfig],
) -> Result<(), ProviderBuildError> {
    if hosts.len() != da_configs.len() || hosts.len() != blend_configs.len() {
        return Err(ProviderBuildError::HostConfigLenMismatch {
            hosts: hosts.len(),
            da_configs: da_configs.len(),
            blend_configs: blend_configs.len(),
        });
    }
    if first.da_notes.len() < da_configs.len() || first.blend_notes.len() < blend_configs.len() {
        return Err(ProviderBuildError::NoteLenMismatch {
            da_notes: first.da_notes.len(),
            blend_notes: first.blend_notes.len(),
        });
    }
    Ok(())
}

fn build_da_providers(
    hosts: &[Host],
    first: &GeneralConsensusConfig,
    da_configs: &[GeneralDaConfig],
) -> Result<Vec<ProviderInfo>, ProviderBuildError> {
    da_configs
        .iter()
        .enumerate()
        .map(|(i, da_conf)| {
            let locator = locator_for_host(hosts, i, hosts[i].da_network_port)?;
            Ok(ProviderInfo {
                service_type: ServiceType::DataAvailability,
                provider_sk: da_conf.signer.clone(),
                zk_sk: da_conf.secret_zk_key.clone(),
                locator,
                note: first.da_notes[i].clone(),
            })
        })
        .collect()
}

fn build_blend_providers(
    hosts: &[Host],
    first: &GeneralConsensusConfig,
    blend_configs: &[GeneralBlendConfig],
) -> Result<Vec<ProviderInfo>, ProviderBuildError> {
    blend_configs
        .iter()
        .enumerate()
        .map(|(i, blend_conf)| {
            let locator = locator_for_host(hosts, i, hosts[i].blend_port)?;
            Ok(ProviderInfo {
                service_type: ServiceType::BlendNetwork,
                provider_sk: blend_conf.signer.clone(),
                zk_sk: blend_conf.secret_zk_key.clone(),
                locator,
                note: first.blend_notes[i].clone(),
            })
        })
        .collect()
}

fn locator_for_host(
    hosts: &[Host],
    index: usize,
    port: u16,
) -> Result<Locator, ProviderBuildError> {
    let value = format!("/ip4/{}/udp/{port}/quic-v1", hosts[index].ip);
    let locator =
        Multiaddr::from_str(&value).map_err(|source| ProviderBuildError::InvalidMultiaddr {
            value,
            message: source.to_string(),
        })?;
    Ok(Locator(locator))
}
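
All DA and Blend locators now go through locator_for_host, which only varies the UDP
port. As a quick illustration (the IP and port below are made-up example values, not
taken from the commit), a DA locator is built from a string of this shape:

// Example only: 10.0.0.5 and 3300 are illustrative values, not from the commit.
let value = "/ip4/10.0.0.5/udp/3300/quic-v1";
let locator = Locator(Multiaddr::from_str(value).expect("well-formed multiaddr"));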


@@ -99,62 +99,27 @@ impl ConfigRepo {
async fn run(&self) {
let timeout_duration = self.timeout_duration;
if timeout(timeout_duration, self.wait_for_hosts())
.await
.is_ok()
{
if wait_for_hosts_with_timeout(self, timeout_duration).await {
info!("all hosts have announced their IPs");
let mut waiting_hosts = {
let mut guard = self.waiting_hosts.lock().await;
std::mem::take(&mut *guard)
};
let mut waiting_hosts = take_waiting_hosts(self).await;
let hosts = waiting_hosts.keys().cloned().collect();
let configs = match try_create_node_configs(
&self.consensus_params,
&self.da_params,
&self.tracing_settings,
&self.wallet_config,
self.ids.clone(),
self.da_ports.clone(),
self.blend_ports.clone(),
hosts,
) {
let configs = match generate_node_configs(self, hosts) {
Ok(configs) => configs,
Err(err) => {
error!(error = %err, "failed to generate node configs");
let message = err.to_string();
for (_, sender) in waiting_hosts.drain() {
let _ = sender.send(RepoResponse::Error(message.clone()));
}
Err(message) => {
send_error_to_all(&mut waiting_hosts, &message);
return;
}
};
for (host, sender) in waiting_hosts.drain() {
match configs.get(&host) {
Some(config) => {
let _ = sender.send(RepoResponse::Config(Box::new(config.to_owned())));
}
None => {
warn!(identifier = %host.identifier, "missing config for host");
let _ =
sender.send(RepoResponse::Error("missing config for host".to_string()));
}
}
}
} else {
warn!("timeout: not all hosts announced within the time limit");
let mut waiting_hosts = {
let mut guard = self.waiting_hosts.lock().await;
std::mem::take(&mut *guard)
};
for (_, sender) in waiting_hosts.drain() {
let _ = sender.send(RepoResponse::Timeout);
}
send_configs_to_all_hosts(&mut waiting_hosts, &configs);
return;
}
warn!("timeout: not all hosts announced within the time limit");
let mut waiting_hosts = take_waiting_hosts(self).await;
send_timeout_to_all(&mut waiting_hosts);
}
async fn wait_for_hosts(&self) {
@@ -167,3 +132,63 @@
}
}
}
async fn wait_for_hosts_with_timeout(repo: &ConfigRepo, timeout_duration: Duration) -> bool {
    timeout(timeout_duration, repo.wait_for_hosts())
        .await
        .is_ok()
}

async fn take_waiting_hosts(repo: &ConfigRepo) -> HashMap<Host, Sender<RepoResponse>> {
    let mut guard = repo.waiting_hosts.lock().await;
    std::mem::take(&mut *guard)
}

fn generate_node_configs(
    repo: &ConfigRepo,
    hosts: Vec<Host>,
) -> Result<HashMap<Host, GeneralConfig>, String> {
    try_create_node_configs(
        &repo.consensus_params,
        &repo.da_params,
        &repo.tracing_settings,
        &repo.wallet_config,
        repo.ids.clone(),
        repo.da_ports.clone(),
        repo.blend_ports.clone(),
        hosts,
    )
    .map_err(|err| {
        error!(error = %err, "failed to generate node configs");
        err.to_string()
    })
}

fn send_error_to_all(waiting_hosts: &mut HashMap<Host, Sender<RepoResponse>>, message: &str) {
    for (_, sender) in waiting_hosts.drain() {
        let _ = sender.send(RepoResponse::Error(message.to_string()));
    }
}

fn send_timeout_to_all(waiting_hosts: &mut HashMap<Host, Sender<RepoResponse>>) {
    for (_, sender) in waiting_hosts.drain() {
        let _ = sender.send(RepoResponse::Timeout);
    }
}

fn send_configs_to_all_hosts(
    waiting_hosts: &mut HashMap<Host, Sender<RepoResponse>>,
    configs: &HashMap<Host, GeneralConfig>,
) {
    for (host, sender) in waiting_hosts.drain() {
        match configs.get(&host) {
            Some(config) => {
                let _ = sender.send(RepoResponse::Config(Box::new(config.to_owned())));
            }
            None => {
                warn!(identifier = %host.identifier, "missing config for host");
                let _ = sender.send(RepoResponse::Error("missing config for host".to_string()));
            }
        }
    }
}