2025-12-02 14:51:34 +01:00

173 lines
6.6 KiB
Rust

use std::{fs::File, num::NonZero, path::Path, time::Duration};
use anyhow::{Context as _, Result};
use nomos_da_network_core::swarm::ReplicationConfig;
use nomos_tracing_service::TracingSettings;
use nomos_utils::bounded_duration::{MinimalBoundedDuration, SECOND};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use crate::topology::{GeneratedTopology, configs::wallet::WalletConfig};
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CfgSyncConfig {
pub port: u16,
pub n_hosts: usize,
pub timeout: u64,
pub security_param: NonZero<u32>,
pub active_slot_coeff: f64,
#[serde(default)]
pub wallet: WalletConfig,
#[serde(default)]
pub ids: Option<Vec<[u8; 32]>>,
#[serde(default)]
pub da_ports: Option<Vec<u16>>,
#[serde(default)]
pub blend_ports: Option<Vec<u16>>,
pub subnetwork_size: usize,
pub dispersal_factor: usize,
pub num_samples: u16,
pub num_subnets: u16,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
pub old_blobs_check_interval: Duration,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
pub blobs_validity_duration: Duration,
pub global_params_path: String,
pub min_dispersal_peers: usize,
pub min_replication_peers: usize,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
pub monitor_failure_time_window: Duration,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
pub balancer_interval: Duration,
pub replication_settings: ReplicationConfig,
pub retry_shares_limit: usize,
pub retry_commitments_limit: usize,
pub tracing_settings: TracingSettings,
}
pub fn load_cfgsync_template(path: &Path) -> Result<CfgSyncConfig> {
let file = File::open(path)
.with_context(|| format!("opening cfgsync template at {}", path.display()))?;
serde_yaml::from_reader(file).context("parsing cfgsync template")
}
pub fn write_cfgsync_template(path: &Path, cfg: &CfgSyncConfig) -> Result<()> {
let file = File::create(path)
.with_context(|| format!("writing cfgsync template to {}", path.display()))?;
let serializable = SerializableCfgSyncConfig::from(cfg);
serde_yaml::to_writer(file, &serializable).context("serializing cfgsync template")
}
pub fn render_cfgsync_yaml(cfg: &CfgSyncConfig) -> Result<String> {
let serializable = SerializableCfgSyncConfig::from(cfg);
serde_yaml::to_string(&serializable).context("rendering cfgsync yaml")
}
pub fn apply_topology_overrides(
cfg: &mut CfgSyncConfig,
topology: &GeneratedTopology,
use_kzg_mount: bool,
) {
let hosts = topology.validators().len() + topology.executors().len();
cfg.n_hosts = hosts;
let consensus = &topology.config().consensus_params;
cfg.security_param = consensus.security_param;
cfg.active_slot_coeff = consensus.active_slot_coeff;
let config = topology.config();
cfg.wallet = config.wallet_config.clone();
cfg.ids = Some(topology.nodes().map(|node| node.id).collect());
cfg.da_ports = Some(topology.nodes().map(|node| node.da_port).collect());
cfg.blend_ports = Some(topology.nodes().map(|node| node.blend_port).collect());
let da = &config.da_params;
cfg.subnetwork_size = da.subnetwork_size;
cfg.dispersal_factor = da.dispersal_factor;
cfg.num_samples = da.num_samples;
cfg.num_subnets = da.num_subnets;
cfg.old_blobs_check_interval = da.old_blobs_check_interval;
cfg.blobs_validity_duration = da.blobs_validity_duration;
cfg.global_params_path = if use_kzg_mount {
"/kzgrs_test_params/pol/proving_key.zkey".into()
} else {
da.global_params_path.clone()
};
cfg.min_dispersal_peers = da.policy_settings.min_dispersal_peers;
cfg.min_replication_peers = da.policy_settings.min_replication_peers;
cfg.monitor_failure_time_window = da.monitor_settings.failure_time_window;
cfg.balancer_interval = da.balancer_interval;
cfg.replication_settings = da.replication_settings;
cfg.retry_shares_limit = da.retry_shares_limit;
cfg.retry_commitments_limit = da.retry_commitments_limit;
cfg.tracing_settings = TracingSettings::default();
}
#[serde_as]
#[derive(Serialize)]
struct SerializableCfgSyncConfig {
port: u16,
n_hosts: usize,
timeout: u64,
security_param: NonZero<u32>,
active_slot_coeff: f64,
wallet: WalletConfig,
#[serde(skip_serializing_if = "Option::is_none")]
ids: Option<Vec<[u8; 32]>>,
#[serde(skip_serializing_if = "Option::is_none")]
da_ports: Option<Vec<u16>>,
#[serde(skip_serializing_if = "Option::is_none")]
blend_ports: Option<Vec<u16>>,
subnetwork_size: usize,
dispersal_factor: usize,
num_samples: u16,
num_subnets: u16,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
old_blobs_check_interval: Duration,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
blobs_validity_duration: Duration,
global_params_path: String,
min_dispersal_peers: usize,
min_replication_peers: usize,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
monitor_failure_time_window: Duration,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
balancer_interval: Duration,
replication_settings: ReplicationConfig,
retry_shares_limit: usize,
retry_commitments_limit: usize,
tracing_settings: TracingSettings,
}
impl From<&CfgSyncConfig> for SerializableCfgSyncConfig {
fn from(cfg: &CfgSyncConfig) -> Self {
Self {
port: cfg.port,
n_hosts: cfg.n_hosts,
timeout: cfg.timeout,
security_param: cfg.security_param,
active_slot_coeff: cfg.active_slot_coeff,
wallet: cfg.wallet.clone(),
ids: cfg.ids.clone(),
da_ports: cfg.da_ports.clone(),
blend_ports: cfg.blend_ports.clone(),
subnetwork_size: cfg.subnetwork_size,
dispersal_factor: cfg.dispersal_factor,
num_samples: cfg.num_samples,
num_subnets: cfg.num_subnets,
old_blobs_check_interval: cfg.old_blobs_check_interval,
blobs_validity_duration: cfg.blobs_validity_duration,
global_params_path: cfg.global_params_path.clone(),
min_dispersal_peers: cfg.min_dispersal_peers,
min_replication_peers: cfg.min_replication_peers,
monitor_failure_time_window: cfg.monitor_failure_time_window,
balancer_interval: cfg.balancer_interval,
replication_settings: cfg.replication_settings,
retry_shares_limit: cfg.retry_shares_limit,
retry_commitments_limit: cfg.retry_commitments_limit,
tracing_settings: cfg.tracing_settings.clone(),
}
}
}