Fix compose cfgsync determinism and wallet config

This commit is contained in:
andrussal 2025-11-26 04:42:08 +01:00
parent 7f13a12a3d
commit e7a67d09e5
38 changed files with 2000 additions and 116 deletions

9
.dockerignore Normal file
View File

@ -0,0 +1,9 @@
# General trim for runner images and CI builds
.git
target
.tmp
tests/workflows/.tmp*
book
scripts/build-rapidsnark.sh~
rust-project-all-in-one.txt
**/*.log

13
Cargo.lock generated
View File

@ -985,10 +985,13 @@ dependencies = [
[[package]]
name = "cfgsync"
version = "0.1.0"
source = "git+https://github.com/logos-co/nomos-node.git?branch=master#2f60a0372c228968c3526c341ebc7e58bbd178dd"
dependencies = [
"axum",
"clap",
"groth16",
"hex",
"integration-configs",
"key-management-system",
"nomos-core",
"nomos-da-network-core",
"nomos-executor",
@ -1003,7 +1006,7 @@ dependencies = [
"serde_path_to_error",
"serde_with",
"serde_yaml",
"tests",
"subnetworks-assignations",
"tokio",
"tracing",
]
@ -3125,6 +3128,7 @@ dependencies = [
"nomos-wallet",
"num-bigint",
"rand 0.8.5",
"serde",
"subnetworks-assignations",
"time",
"tracing",
@ -6778,15 +6782,18 @@ dependencies = [
[[package]]
name = "services-utils"
version = "0.1.0"
source = "git+https://github.com/logos-co/nomos-node.git?branch=master#2f60a0372c228968c3526c341ebc7e58bbd178dd"
dependencies = [
"async-trait",
"bincode",
"futures",
"hex",
"log",
"overwatch",
"overwatch-derive",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tracing",
]

View File

@ -1,11 +1,13 @@
[workspace]
members = [
"patches/services-utils",
"testing-framework/configs",
"testing-framework/core",
"testing-framework/runners/compose",
"testing-framework/runners/k8s",
"testing-framework/runners/local",
"testing-framework/workflows",
"testnet/cfgsync",
"tests/workflows",
]
resolver = "2"
@ -26,6 +28,10 @@ unsafe_code = "allow"
[workspace.lints.clippy]
all = "allow"
[patch."https://github.com/logos-co/nomos-node.git"]
cfgsync = { path = "testnet/cfgsync" }
services-utils = { path = "patches/services-utils" }
[workspace.dependencies]
# Local testing framework crates
integration-configs = { default-features = false, path = "testing-framework/configs" }
@ -86,6 +92,8 @@ async-trait = { default-features = false, version = "0.1" }
bytes = { default-features = false, version = "1.3" }
hex = { default-features = false, version = "0.4.3" }
libp2p = { default-features = false, version = "0.55" }
overwatch = { default-features = false, git = "https://github.com/logos-co/Overwatch", rev = "f5a9902" }
overwatch-derive = { default-features = false, git = "https://github.com/logos-co/Overwatch", rev = "f5a9902" }
rand = { default-features = false, version = "0.8" }
reqwest = { default-features = false, version = "0.12" }
serde = { default-features = true, version = "1.0", features = ["derive"] }

View File

@ -41,6 +41,7 @@ nomos-utils = { workspace = true }
nomos-wallet = { workspace = true }
num-bigint = { version = "0.4", default-features = false }
rand = { workspace = true }
serde = { workspace = true, features = ["derive"] }
subnetworks-assignations = { workspace = true }
time = { version = "0.3", default-features = true }
tracing = { workspace = true }

View File

@ -78,7 +78,9 @@ pub fn create_executor_config(config: GeneralConfig) -> ExecutorConfig {
starting_state: StartingState::Genesis {
genesis_tx: config.consensus_config.genesis_tx,
},
recovery_file: PathBuf::from("./recovery/cryptarchia.json"),
// Disable on-disk recovery in compose tests to avoid serde errors on
// non-string keys and keep services alive.
recovery_file: PathBuf::new(),
bootstrap: chain_service::BootstrapConfig {
prolonged_bootstrap_period: Duration::from_secs(3),
force_bootstrap: false,

View File

@ -76,7 +76,9 @@ pub fn create_validator_config(config: GeneralConfig) -> ValidatorConfig {
starting_state: StartingState::Genesis {
genesis_tx: config.consensus_config.genesis_tx,
},
recovery_file: PathBuf::from("./recovery/cryptarchia.json"),
// Disable on-disk recovery in compose tests to avoid serde errors on
// non-string keys and keep services alive.
recovery_file: PathBuf::new(),
bootstrap: chain_service::BootstrapConfig {
prolonged_bootstrap_period: config.bootstrapping_config.prolonged_bootstrap_period,
force_bootstrap: false,

View File

@ -4,7 +4,7 @@ use num_bigint::BigUint;
use zksign::{PublicKey, SecretKey};
/// Collection of wallet accounts that should be funded at genesis.
#[derive(Clone, Default, Debug)]
#[derive(Clone, Default, Debug, serde::Serialize, serde::Deserialize)]
pub struct WalletConfig {
pub accounts: Vec<WalletAccount>,
}
@ -44,7 +44,7 @@ impl WalletConfig {
}
/// Wallet account that holds funds in the genesis state.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct WalletAccount {
pub label: String,
pub secret_key: SecretKey,

View File

@ -58,7 +58,10 @@ fn inject_ibd_into_cryptarchia(yaml_value: &mut Value) {
Value::String("topic".into()),
Value::String(nomos_node::CONSENSUS_TOPIC.into()),
);
cryptarchia.insert(Value::String("network_adapter_settings".into()), Value::Mapping(network));
cryptarchia.insert(
Value::String("network_adapter_settings".into()),
Value::Mapping(network),
);
}
if !cryptarchia.contains_key(&Value::String("sync".into())) {
let mut orphan = Mapping::new();

View File

@ -62,7 +62,10 @@ fn inject_ibd_into_cryptarchia(yaml_value: &mut Value) {
Value::String("topic".into()),
Value::String(nomos_node::CONSENSUS_TOPIC.into()),
);
cryptarchia.insert(Value::String("network_adapter_settings".into()), Value::Mapping(network));
cryptarchia.insert(
Value::String("network_adapter_settings".into()),
Value::Mapping(network),
);
}
if !cryptarchia.contains_key(&Value::String("sync".into())) {
let mut orphan = Mapping::new();

View File

@ -2,14 +2,12 @@ use std::{fs::File, num::NonZero, path::Path, time::Duration};
use anyhow::{Context as _, Result};
use nomos_da_network_core::swarm::ReplicationConfig;
use nomos_tracing::metrics::otlp::OtlpMetricsConfig;
use nomos_tracing_service::{MetricsLayer, TracingSettings};
use nomos_tracing_service::TracingSettings;
use nomos_utils::bounded_duration::{MinimalBoundedDuration, SECOND};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use crate::topology::GeneratedTopology;
use crate::topology::{GeneratedTopology, configs::wallet::WalletConfig};
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -19,6 +17,14 @@ pub struct CfgSyncConfig {
pub timeout: u64,
pub security_param: NonZero<u32>,
pub active_slot_coeff: f64,
#[serde(default)]
pub wallet: WalletConfig,
#[serde(default)]
pub ids: Option<Vec<[u8; 32]>>,
#[serde(default)]
pub da_ports: Option<Vec<u16>>,
#[serde(default)]
pub blend_ports: Option<Vec<u16>>,
pub subnetwork_size: usize,
pub dispersal_factor: usize,
pub num_samples: u16,
@ -70,7 +76,13 @@ pub fn apply_topology_overrides(
cfg.security_param = consensus.security_param;
cfg.active_slot_coeff = consensus.active_slot_coeff;
let da = &topology.config().da_params;
let config = topology.config();
cfg.wallet = config.wallet_config.clone();
cfg.ids = Some(topology.nodes().map(|node| node.id).collect());
cfg.da_ports = Some(topology.nodes().map(|node| node.da_port).collect());
cfg.blend_ports = Some(topology.nodes().map(|node| node.blend_port).collect());
let da = &config.da_params;
cfg.subnetwork_size = da.subnetwork_size;
cfg.dispersal_factor = da.dispersal_factor;
cfg.num_samples = da.num_samples;
@ -89,11 +101,7 @@ pub fn apply_topology_overrides(
cfg.replication_settings = da.replication_settings;
cfg.retry_shares_limit = da.retry_shares_limit;
cfg.retry_commitments_limit = da.retry_commitments_limit;
cfg.tracing_settings.metrics = MetricsLayer::Otlp(OtlpMetricsConfig {
endpoint: Url::parse("http://prometheus:9090/api/v1/otlp/v1/metrics")
.expect("valid prometheus otlp endpoint"),
host_identifier: String::new(),
});
cfg.tracing_settings = TracingSettings::default();
}
#[serde_as]
@ -104,6 +112,13 @@ struct SerializableCfgSyncConfig {
timeout: u64,
security_param: NonZero<u32>,
active_slot_coeff: f64,
wallet: WalletConfig,
#[serde(skip_serializing_if = "Option::is_none")]
ids: Option<Vec<[u8; 32]>>,
#[serde(skip_serializing_if = "Option::is_none")]
da_ports: Option<Vec<u16>>,
#[serde(skip_serializing_if = "Option::is_none")]
blend_ports: Option<Vec<u16>>,
subnetwork_size: usize,
dispersal_factor: usize,
num_samples: u16,
@ -133,6 +148,10 @@ impl From<&CfgSyncConfig> for SerializableCfgSyncConfig {
timeout: cfg.timeout,
security_param: cfg.security_param,
active_slot_coeff: cfg.active_slot_coeff,
wallet: cfg.wallet.clone(),
ids: cfg.ids.clone(),
da_ports: cfg.da_ports.clone(),
blend_ports: cfg.blend_ports.clone(),
subnetwork_size: cfg.subnetwork_size,
dispersal_factor: cfg.dispersal_factor,
num_samples: cfg.num_samples,

View File

@ -45,10 +45,6 @@ impl CleanupGuard for RunnerCleanup {
let preserve = env::var("COMPOSE_RUNNER_PRESERVE").is_ok()
|| env::var("TESTNET_RUNNER_PRESERVE").is_ok();
if preserve {
if let Some(mut handle) = self.cfgsync.take() {
handle.shutdown();
}
if let Some(workspace) = self.workspace.take() {
let keep = workspace.into_inner().keep();
eprintln!(

View File

@ -573,8 +573,24 @@ async fn ensure_remote_readiness_with_ports(
.map(|ports| readiness_url(HttpNodeRole::Executor, ports.api))
.collect::<Result<Vec<_>, _>>()?;
let validator_membership_urls = mapping
.validators
.iter()
.map(|ports| readiness_url(HttpNodeRole::Validator, ports.testing))
.collect::<Result<Vec<_>, _>>()?;
let executor_membership_urls = mapping
.executors
.iter()
.map(|ports| readiness_url(HttpNodeRole::Executor, ports.testing))
.collect::<Result<Vec<_>, _>>()?;
descriptors
.wait_remote_readiness(&validator_urls, &executor_urls, None, None)
.wait_remote_readiness(
&validator_urls,
&executor_urls,
Some(&validator_membership_urls),
Some(&executor_membership_urls),
)
.await
.map_err(|source| StackReadinessError::Remote { source })
}
@ -996,7 +1012,7 @@ impl CleanupGuard for ComposeCleanupGuard {
mod tests {
use std::{collections::HashMap, net::Ipv4Addr};
use cfgsync::config::{Host, create_node_configs};
use cfgsync::config::{Host, PortOverrides, create_node_configs};
use groth16::Fr;
use nomos_core::{
mantle::{GenesisTx as GenesisTxTrait, ledger::NoteId},
@ -1006,10 +1022,7 @@ mod tests {
use nomos_tracing_service::TracingSettings;
use testing_framework_core::{
scenario::ScenarioBuilder,
topology::{
GeneratedNodeConfig, GeneratedTopology, NodeRole as TopologyNodeRole,
configs::{consensus, da},
},
topology::{GeneratedNodeConfig, GeneratedTopology, NodeRole as TopologyNodeRole},
};
use zksign::PublicKey;
@ -1021,9 +1034,13 @@ mod tests {
let tracing_settings = tracing_settings(&topology);
let configs = create_node_configs(
&to_tests_consensus(&topology.config().consensus_params),
&to_tests_da(&topology.config().da_params),
&topology.config().consensus_params,
&topology.config().da_params,
&tracing_settings,
&topology.config().wallet_config,
Some(topology.nodes().map(|node| node.id).collect()),
Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()),
hosts,
);
let configs_by_identifier: HashMap<_, _> = configs
@ -1069,9 +1086,13 @@ mod tests {
let tracing_settings = tracing_settings(&topology);
let configs = create_node_configs(
&to_tests_consensus(&topology.config().consensus_params),
&to_tests_da(&topology.config().da_params),
&topology.config().consensus_params,
&topology.config().da_params,
&tracing_settings,
&topology.config().wallet_config,
Some(topology.nodes().map(|node| node.id).collect()),
Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()),
hosts,
);
let configs_by_identifier: HashMap<_, _> = configs
@ -1101,9 +1122,13 @@ mod tests {
let hosts = docker_style_hosts(&topology);
let configs = create_node_configs(
&to_tests_consensus(&topology.config().consensus_params),
&to_tests_da(&topology.config().da_params),
&topology.config().consensus_params,
&topology.config().da_params,
&tracing_settings,
&topology.config().wallet_config,
Some(topology.nodes().map(|node| node.id).collect()),
Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()),
hosts,
);
@ -1134,10 +1159,7 @@ mod tests {
fn host_from_node(node: &GeneratedNodeConfig) -> Host {
let identifier = identifier_for(node.role(), node.index());
let ip = Ipv4Addr::LOCALHOST;
let mut host = match node.role() {
TopologyNodeRole::Validator => Host::default_validator_from_ip(ip, identifier),
TopologyNodeRole::Executor => Host::default_executor_from_ip(ip, identifier),
};
let mut host = make_host(node.role(), ip, identifier);
host.network_port = node.network_port();
host.da_network_port = node.da_port;
host.blend_port = node.blend_port;
@ -1147,10 +1169,7 @@ mod tests {
fn docker_host(node: &GeneratedNodeConfig, octet: u8) -> Host {
let identifier = identifier_for(node.role(), node.index());
let ip = Ipv4Addr::new(172, 23, 0, octet);
let mut host = match node.role() {
TopologyNodeRole::Validator => Host::default_validator_from_ip(ip, identifier),
TopologyNodeRole::Executor => Host::default_executor_from_ip(ip, identifier),
};
let mut host = make_host(node.role(), ip, identifier);
host.network_port = node.network_port() + 1000;
host.da_network_port = node.da_port + 1000;
host.blend_port = node.blend_port + 1000;
@ -1176,33 +1195,17 @@ mod tests {
}
}
fn to_tests_consensus(
params: &consensus::ConsensusParams,
) -> tests::topology::configs::consensus::ConsensusParams {
tests::topology::configs::consensus::ConsensusParams {
n_participants: params.n_participants,
security_param: params.security_param,
active_slot_coeff: params.active_slot_coeff,
}
}
fn to_tests_da(params: &da::DaParams) -> tests::topology::configs::da::DaParams {
tests::topology::configs::da::DaParams {
subnetwork_size: params.subnetwork_size,
dispersal_factor: params.dispersal_factor,
num_samples: params.num_samples,
num_subnets: params.num_subnets,
old_blobs_check_interval: params.old_blobs_check_interval,
blobs_validity_duration: params.blobs_validity_duration,
global_params_path: params.global_params_path.clone(),
policy_settings: params.policy_settings.clone(),
monitor_settings: params.monitor_settings.clone(),
balancer_interval: params.balancer_interval,
redial_cooldown: params.redial_cooldown,
replication_settings: params.replication_settings,
subnets_refresh_interval: params.subnets_refresh_interval,
retry_shares_limit: params.retry_shares_limit,
retry_commitments_limit: params.retry_commitments_limit,
fn make_host(role: TopologyNodeRole, ip: Ipv4Addr, identifier: String) -> Host {
let ports = PortOverrides {
network_port: None,
da_network_port: None,
blend_port: None,
api_port: None,
testing_http_port: None,
};
match role {
TopologyNodeRole::Validator => Host::validator_from_ip(ip, identifier, ports),
TopologyNodeRole::Executor => Host::executor_from_ip(ip, identifier, ports),
}
}

View File

@ -0,0 +1,9 @@
# Build context trim for runner image
.git
**/target
.tmp
tests/workflows/.tmp*
book
scripts/build-rapidsnark.sh~
rust-project-all-in-one.txt
**/*.log

View File

@ -39,7 +39,8 @@ ENV NOMOS_CIRCUITS=/opt/circuits
# Use debug builds to keep the linker memory footprint low; we only need
# binaries for integration testing, not optimized releases.
RUN cargo build --all-features
RUN cargo build --all-features --workspace && \
cargo build -p nomos-node -p nomos-executor
# ===========================
# NODE IMAGE
@ -63,7 +64,6 @@ COPY --from=builder /opt/circuits /opt/circuits
COPY --from=builder /nomos/target/debug/nomos-node /usr/bin/nomos-node
COPY --from=builder /nomos/target/debug/nomos-executor /usr/bin/nomos-executor
COPY --from=builder /nomos/target/debug/nomos-cli /usr/bin/nomos-cli
COPY --from=builder /nomos/target/debug/cfgsync-server /usr/bin/cfgsync-server
COPY --from=builder /nomos/target/debug/cfgsync-client /usr/bin/cfgsync-client

63
testnet/Dockerfile Normal file
View File

@ -0,0 +1,63 @@
# syntax=docker/dockerfile:1
# check=skip=SecretsUsedInArgOrEnv
# Ignore warnings about sensitive information as this is test data.
ARG VERSION=v0.2.0
# ===========================
# BUILD IMAGE
# ===========================
FROM rust:1.91.0-slim-bookworm AS builder
ARG VERSION
LABEL maintainer="augustinas@status.im" \
source="https://github.com/logos-co/nomos-node" \
description="Nomos testnet build image"
WORKDIR /nomos
COPY . .
# Install dependencies needed for building RocksDB.
RUN apt-get update && apt-get install -yq \
git gcc g++ clang libssl-dev pkg-config ca-certificates curl
RUN chmod +x scripts/setup-nomos-circuits.sh && \
scripts/setup-nomos-circuits.sh "$VERSION" "/opt/circuits"
ENV NOMOS_CIRCUITS=/opt/circuits
RUN cargo build --release --all-features
# ===========================
# NODE IMAGE
# ===========================
FROM debian:bookworm-slim
ARG VERSION
LABEL maintainer="augustinas@status.im" \
source="https://github.com/logos-co/nomos-node" \
description="Nomos node image"
RUN apt-get update && apt-get install -yq \
libstdc++6 \
libssl3 \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /opt/circuits /opt/circuits
COPY --from=builder /nomos/target/release/nomos-node /usr/bin/nomos-node
COPY --from=builder /nomos/target/release/nomos-executor /usr/bin/nomos-executor
COPY --from=builder /nomos/target/release/nomos-cli /usr/bin/nomos-cli
COPY --from=builder /nomos/target/release/cfgsync-server /usr/bin/cfgsync-server
COPY --from=builder /nomos/target/release/cfgsync-client /usr/bin/cfgsync-client
ENV NOMOS_CIRCUITS=/opt/circuits
EXPOSE 3000 8080 9000 60000
ENTRYPOINT ["/usr/bin/nomos-node"]

60
testnet/README.md Normal file
View File

@ -0,0 +1,60 @@
# Docker Compose Testnet for Nomos
The Nomos Docker Compose Testnet contains four distinct service types:
- **Nomos Node Services**: Multiple dynamically spawned Nomos nodes that synchronizes their configuration via cfgsync utility.
## Building
Upon making modifications to the codebase or the Dockerfile, the Nomos images must be rebuilt:
```bash
docker compose build
```
## Configuring
Configuration of the Docker testnet is accomplished using the `.env` file. An example configuration can be found in `.env.example`.
To adjust the count of Nomos nodes, modify the variable:
```bash
DOCKER_COMPOSE_LIBP2P_REPLICAS=100
```
## Running
Initiate the testnet by executing the following command:
```bash
docker compose up
```
This command will merge all output logs and display them in Stdout. For a more refined output, it's recommended to first run:
```bash
docker compose up -d
```
Followed by:
```bash
docker compose logs -f nomos-node
```
## Using testnet
Bootstrap node is accessible from the host via `3000` and `18080` ports. To expose other nomos nodes, please update `nomos-node` service in the `compose.yml` file with this configuration:
```bash
nomos-node-0:
ports:
- "3001-3010:3000" # Use range depending on the number of nomos node replicas.
- "18081-18190:18080"
```
After running `docker compose up`, the randomly assigned ports can be viewed with `ps` command:
```bash
docker compose ps
```

49
testnet/cfgsync.yaml Normal file
View File

@ -0,0 +1,49 @@
port: 4400
n_hosts: 4
timeout: 10
# ConsensusConfig related parameters
security_param: 10
active_slot_coeff: 0.9
# DaConfig related parameters
subnetwork_size: 2
dispersal_factor: 2
num_samples: 1
num_subnets: 2
old_blobs_check_interval: "5.0"
blobs_validity_duration: "60.0"
global_params_path: "/kzgrs_test_params"
min_dispersal_peers: 1
min_replication_peers: 1
monitor_failure_time_window: "5.0"
balancer_interval: "5.0"
# Dispersal mempool publish strategy
mempool_publish_strategy: !SampleSubnetworks
sample_threshold: 2
timeout: "2.0"
cooldown: "0.0001"
replication_settings:
seen_message_cache_size: 204800
seen_message_ttl: "900.0"
retry_shares_limit: 5
retry_commitments_limit: 5
# Tracing
tracing_settings:
logger: !Loki
endpoint: http://loki:3100/
host_identifier: node
tracing: !Otlp
endpoint: http://tempo:4317/
sample_ratio: 0.5
service_name: node
filter: !EnvFilter
filters:
nomos: debug
metrics: !Otlp
endpoint: http://prometheus:9090/api/v1/otlp/v1/metrics
host_identifier: node
console: None
level: INFO

View File

@ -0,0 +1,38 @@
[package]
categories = { workspace = true }
description = { workspace = true }
edition = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
name = "cfgsync"
readme = { workspace = true }
repository = { workspace = true }
version = { workspace = true }
[lints]
workspace = true
[dependencies]
axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" }
clap = { default-features = false, version = "4" }
groth16 = { workspace = true }
hex = { workspace = true }
integration-configs = { workspace = true }
key-management-system = { workspace = true }
nomos-core = { workspace = true }
nomos-da-network-core = { workspace = true }
nomos-executor = { workspace = true }
nomos-libp2p = { workspace = true }
nomos-node = { workspace = true }
nomos-tracing-service = { workspace = true }
nomos-utils = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
serde = { default-features = false, version = "1" }
serde_json = { default-features = false, version = "1.0" }
serde_path_to_error = "0.1"
serde_with = { workspace = true }
serde_yaml = "0.9"
subnetworks-assignations = { workspace = true }
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
tracing = { workspace = true }

View File

@ -0,0 +1,151 @@
use std::{
collections::{HashMap, HashSet},
env, fs,
net::Ipv4Addr,
process,
str::FromStr,
};
use cfgsync::{
client::{FetchedConfig, get_config},
server::ClientIp,
};
use nomos_executor::config::Config as ExecutorConfig;
use nomos_libp2p::PeerId;
use nomos_node::Config as ValidatorConfig;
use serde::{Serialize, de::DeserializeOwned};
use subnetworks_assignations::{MembershipCreator, MembershipHandler, SubnetworkId};
fn parse_ip(ip_str: &str) -> Ipv4Addr {
ip_str.parse().unwrap_or_else(|_| {
eprintln!("Invalid IP format, defaulting to 127.0.0.1");
Ipv4Addr::LOCALHOST
})
}
fn parse_assignations(raw: &serde_json::Value) -> Option<HashMap<SubnetworkId, HashSet<PeerId>>> {
let assignations = raw
.pointer("/da_network/membership/assignations")?
.as_object()?;
let mut result = HashMap::new();
for (subnetwork, peers) in assignations {
let subnetwork_id = SubnetworkId::from_str(subnetwork).ok()?;
let mut members = HashSet::new();
for peer in peers.as_array()? {
if let Some(peer) = peer.as_str().and_then(|p| PeerId::from_str(p).ok()) {
members.insert(peer);
}
}
result.insert(subnetwork_id, members);
}
Some(result)
}
fn apply_da_assignations<
Membership: MembershipCreator<Id = PeerId> + MembershipHandler<NetworkId = SubnetworkId>,
>(
membership: &Membership,
assignations: HashMap<SubnetworkId, HashSet<PeerId>>,
) -> Membership {
let session_id = membership.session_id();
membership.init(session_id, assignations)
}
async fn pull_to_file<Config, F>(
payload: ClientIp,
url: &str,
config_file: &str,
apply_membership: F,
) -> Result<(), String>
where
Config: Serialize + DeserializeOwned,
F: FnOnce(&mut Config, HashMap<SubnetworkId, HashSet<PeerId>>),
{
let FetchedConfig { mut config, raw } = get_config::<Config>(payload, url).await?;
if let Some(assignations) = parse_assignations(&raw) {
apply_membership(&mut config, assignations);
}
let yaml = serde_yaml::to_string(&config)
.map_err(|err| format!("Failed to serialize config to YAML: {err}"))?;
fs::write(config_file, yaml).map_err(|err| format!("Failed to write config to file: {err}"))?;
println!("Config saved to {config_file}");
Ok(())
}
#[tokio::main]
async fn main() {
let config_file_path = env::var("CFG_FILE_PATH").unwrap_or_else(|_| "config.yaml".to_owned());
let server_addr =
env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| "http://127.0.0.1:4400".to_owned());
let ip = parse_ip(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()));
let identifier =
env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
let host_kind = env::var("CFG_HOST_KIND").unwrap_or_else(|_| "validator".to_owned());
let network_port = env::var("CFG_NETWORK_PORT")
.ok()
.and_then(|v| v.parse().ok());
let da_port = env::var("CFG_DA_PORT").ok().and_then(|v| v.parse().ok());
let blend_port = env::var("CFG_BLEND_PORT").ok().and_then(|v| v.parse().ok());
let api_port = env::var("CFG_API_PORT").ok().and_then(|v| v.parse().ok());
let testing_http_port = env::var("CFG_TESTING_HTTP_PORT")
.ok()
.and_then(|v| v.parse().ok());
let payload = ClientIp {
ip,
identifier,
network_port,
da_port,
blend_port,
api_port,
testing_http_port,
};
let node_config_endpoint = match host_kind.as_str() {
"executor" => format!("{server_addr}/executor"),
_ => format!("{server_addr}/validator"),
};
let config_result = match host_kind.as_str() {
"executor" => {
pull_to_file::<ExecutorConfig, _>(
payload,
&node_config_endpoint,
&config_file_path,
|config, assignations| {
config.da_network.membership =
apply_da_assignations(&config.da_network.membership, assignations);
},
)
.await
}
_ => {
pull_to_file::<ValidatorConfig, _>(
payload,
&node_config_endpoint,
&config_file_path,
|config, assignations| {
config.da_network.membership =
apply_da_assignations(&config.da_network.membership, assignations);
},
)
.await
}
};
// Handle error if the config request fails
if let Err(err) = config_result {
eprintln!("Error: {err}");
process::exit(1);
}
}

View File

@ -0,0 +1,29 @@
use std::{path::PathBuf, process};
use cfgsync::server::{CfgSyncConfig, cfgsync_app};
use clap::Parser;
use tokio::net::TcpListener;
#[derive(Parser, Debug)]
#[command(about = "CfgSync")]
struct Args {
config: PathBuf,
}
#[tokio::main]
async fn main() {
let cli = Args::parse();
let config = CfgSyncConfig::load_from_file(&cli.config).unwrap_or_else(|err| {
eprintln!("{err}");
process::exit(1);
});
let port = config.port;
let app = cfgsync_app(config.into());
println!("Server running on http://0.0.0.0:{port}");
let listener = TcpListener::bind(&format!("0.0.0.0:{port}")).await.unwrap();
axum::serve(listener, app).await.unwrap();
}

View File

@ -0,0 +1,46 @@
use reqwest::{Client, Response};
use serde::de::DeserializeOwned;
use crate::server::ClientIp;
#[derive(Debug)]
pub struct FetchedConfig<Config> {
pub config: Config,
pub raw: serde_json::Value,
}
async fn deserialize_response<Config: DeserializeOwned>(
response: Response,
) -> Result<FetchedConfig<Config>, String> {
let body = response
.text()
.await
.map_err(|error| format!("Failed to read response body: {error}"))?;
let raw: serde_json::Value =
serde_json::from_str(&body).map_err(|error| format!("Failed to parse body: {error}"))?;
let mut json_deserializer = serde_json::Deserializer::from_str(&body);
let config = serde_path_to_error::deserialize(&mut json_deserializer)
.map_err(|error| format!("Failed to deserialize body: {error}, raw body: {body}"))?;
Ok(FetchedConfig { config, raw })
}
pub async fn get_config<Config: DeserializeOwned>(
payload: ClientIp,
url: &str,
) -> Result<FetchedConfig<Config>, String> {
let client = Client::new();
let response = client
.post(url)
.json(&payload)
.send()
.await
.map_err(|err| format!("Failed to send IP announcement: {err}"))?;
if !response.status().is_success() {
return Err(format!("Server error: {:?}", response.status()));
}
deserialize_response(response).await
}

View File

@ -0,0 +1,518 @@
use std::{collections::HashMap, net::Ipv4Addr, str::FromStr as _};
use groth16::fr_to_bytes;
use hex;
use integration_configs::topology::configs::{
GeneralConfig,
api::GeneralApiConfig,
blend::{GeneralBlendConfig, create_blend_configs},
bootstrap::{SHORT_PROLONGED_BOOTSTRAP_PERIOD, create_bootstrap_configs},
consensus::{
ConsensusParams, GeneralConsensusConfig, ProviderInfo, create_consensus_configs,
create_genesis_tx_with_declarations,
},
da::{DaParams, GeneralDaConfig, create_da_configs},
network::{NetworkParams, create_network_configs},
time::default_time_config,
tracing::GeneralTracingConfig,
wallet::WalletConfig,
};
use key_management_system::{
backend::preload::PreloadKMSBackendSettings,
keys::{Ed25519Key, Key, ZkKey},
};
use nomos_core::{
mantle::GenesisTx as _,
sdp::{Locator, ServiceType},
};
use nomos_libp2p::{Multiaddr, PeerId, Protocol, ed25519};
use nomos_tracing_service::{LoggerLayer, MetricsLayer, TracingLayer, TracingSettings};
use nomos_utils::net::get_available_udp_port;
use rand::{Rng as _, thread_rng};
const DEFAULT_LIBP2P_NETWORK_PORT: u16 = 3000;
const DEFAULT_DA_NETWORK_PORT: u16 = 3300;
const DEFAULT_BLEND_PORT: u16 = 3400;
const DEFAULT_API_PORT: u16 = 18080;
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub enum HostKind {
Validator,
Executor,
}
#[derive(Eq, PartialEq, Hash, Clone)]
pub struct Host {
pub kind: HostKind,
pub ip: Ipv4Addr,
pub identifier: String,
pub network_port: u16,
pub da_network_port: u16,
pub blend_port: u16,
pub api_port: u16,
pub testing_http_port: u16,
}
#[derive(Clone, Copy)]
pub struct PortOverrides {
pub network_port: Option<u16>,
pub da_network_port: Option<u16>,
pub blend_port: Option<u16>,
pub api_port: Option<u16>,
pub testing_http_port: Option<u16>,
}
impl Host {
fn from_parts(kind: HostKind, ip: Ipv4Addr, identifier: String, ports: PortOverrides) -> Self {
Self {
kind,
ip,
identifier,
network_port: ports.network_port.unwrap_or(DEFAULT_LIBP2P_NETWORK_PORT),
da_network_port: ports.da_network_port.unwrap_or(DEFAULT_DA_NETWORK_PORT),
blend_port: ports.blend_port.unwrap_or(DEFAULT_BLEND_PORT),
api_port: ports.api_port.unwrap_or(DEFAULT_API_PORT),
testing_http_port: ports.testing_http_port.unwrap_or(DEFAULT_API_PORT + 1),
}
}
#[must_use]
pub fn validator_from_ip(ip: Ipv4Addr, identifier: String, ports: PortOverrides) -> Self {
Self::from_parts(HostKind::Validator, ip, identifier, ports)
}
#[must_use]
pub fn executor_from_ip(ip: Ipv4Addr, identifier: String, ports: PortOverrides) -> Self {
Self::from_parts(HostKind::Executor, ip, identifier, ports)
}
}
#[must_use]
pub fn create_node_configs(
consensus_params: &ConsensusParams,
da_params: &DaParams,
tracing_settings: &TracingSettings,
wallet_config: &WalletConfig,
ids: Option<Vec<[u8; 32]>>,
da_ports: Option<Vec<u16>>,
blend_ports: Option<Vec<u16>>,
hosts: Vec<Host>,
) -> HashMap<Host, GeneralConfig> {
let mut hosts = hosts;
hosts.sort_by_key(|host| {
let index = host
.identifier
.rsplit('-')
.next()
.and_then(|raw| raw.parse::<usize>().ok())
.unwrap_or(0);
let kind = match host.kind {
HostKind::Validator => 0,
HostKind::Executor => 1,
};
(kind, index)
});
assert_eq!(
hosts.len(),
consensus_params.n_participants,
"host count must match consensus participants"
);
let ids = ids.unwrap_or_else(|| {
let mut generated = vec![[0; 32]; consensus_params.n_participants];
for id in &mut generated {
thread_rng().fill(id);
}
generated
});
assert_eq!(
ids.len(),
consensus_params.n_participants,
"pre-generated ids must match participant count"
);
let ports = da_ports.unwrap_or_else(|| {
(0..consensus_params.n_participants)
.map(|_| get_available_udp_port().unwrap())
.collect()
});
assert_eq!(
ports.len(),
consensus_params.n_participants,
"da port list must match participant count"
);
let blend_ports = blend_ports.unwrap_or_else(|| hosts.iter().map(|h| h.blend_port).collect());
assert_eq!(
blend_ports.len(),
consensus_params.n_participants,
"blend port list must match participant count"
);
let mut consensus_configs = create_consensus_configs(&ids, consensus_params, wallet_config);
let bootstrap_configs = create_bootstrap_configs(&ids, SHORT_PROLONGED_BOOTSTRAP_PERIOD);
let da_configs = create_da_configs(&ids, da_params, &ports);
let network_configs = create_network_configs(&ids, &NetworkParams::default());
let blend_configs = create_blend_configs(&ids, &blend_ports);
let api_configs = hosts
.iter()
.map(|host| GeneralApiConfig {
address: format!("0.0.0.0:{}", host.api_port).parse().unwrap(),
testing_http_address: format!("0.0.0.0:{}", host.testing_http_port)
.parse()
.unwrap(),
})
.collect::<Vec<_>>();
let mut configured_hosts = HashMap::new();
let initial_peer_templates: Vec<Vec<Multiaddr>> = network_configs
.iter()
.map(|cfg| cfg.backend.initial_peers.clone())
.collect();
let original_network_ports: Vec<u16> = network_configs
.iter()
.map(|cfg| cfg.backend.inner.port)
.collect();
let peer_ids: Vec<PeerId> = ids
.iter()
.map(|bytes| {
let mut key_bytes = *bytes;
let secret =
ed25519::SecretKey::try_from_bytes(&mut key_bytes).expect("valid ed25519 key");
PeerId::from_public_key(&ed25519::Keypair::from(secret).public().into())
})
.collect();
let host_network_init_peers = rewrite_initial_peers(
&initial_peer_templates,
&original_network_ports,
&hosts,
&peer_ids,
);
let providers = create_providers(&hosts, &consensus_configs, &blend_configs, &da_configs);
// Update genesis TX to contain Blend and DA providers.
let ledger_tx = consensus_configs[0]
.genesis_tx
.mantle_tx()
.ledger_tx
.clone();
let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers);
for c in &mut consensus_configs {
c.genesis_tx = genesis_tx.clone();
}
// Set Blend and DA keys in KMS of each node config.
let kms_configs = create_kms_configs(&blend_configs, &da_configs);
for (i, host) in hosts.into_iter().enumerate() {
let consensus_config = consensus_configs[i].clone();
let api_config = api_configs[i].clone();
// DA Libp2p network config.
let mut da_config = da_configs[i].clone();
da_config.listening_address = Multiaddr::from_str(&format!(
"/ip4/0.0.0.0/udp/{}/quic-v1",
host.da_network_port,
))
.unwrap();
if matches!(host.kind, HostKind::Validator) {
da_config.policy_settings.min_dispersal_peers = 0;
}
// Libp2p network config.
let mut network_config = network_configs[i].clone();
network_config.backend.inner.host = Ipv4Addr::from_str("0.0.0.0").unwrap();
network_config.backend.inner.port = host.network_port;
network_config.backend.initial_peers = host_network_init_peers[i].clone();
network_config.backend.inner.nat_config = nomos_libp2p::NatSettings::Static {
external_address: Multiaddr::from_str(&format!(
"/ip4/{}/udp/{}/quic-v1",
host.ip, host.network_port
))
.unwrap(),
};
// Blend network config.
let mut blend_config = blend_configs[i].clone();
blend_config.backend_core.listening_address =
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.blend_port)).unwrap();
// Tracing config.
let tracing_config =
update_tracing_identifier(tracing_settings.clone(), host.identifier.clone());
// Time config
let time_config = default_time_config();
configured_hosts.insert(
host.clone(),
GeneralConfig {
consensus_config,
bootstrapping_config: bootstrap_configs[i].clone(),
da_config,
network_config,
blend_config,
api_config,
tracing_config,
time_config,
kms_config: kms_configs[i].clone(),
},
);
}
configured_hosts
}
fn create_providers(
hosts: &[Host],
consensus_configs: &[GeneralConsensusConfig],
blend_configs: &[GeneralBlendConfig],
da_configs: &[GeneralDaConfig],
) -> Vec<ProviderInfo> {
let mut providers: Vec<_> = da_configs
.iter()
.enumerate()
.map(|(i, da_conf)| ProviderInfo {
service_type: ServiceType::DataAvailability,
provider_sk: da_conf.signer.clone(),
zk_sk: da_conf.secret_zk_key.clone(),
locator: Locator(
Multiaddr::from_str(&format!(
"/ip4/{}/udp/{}/quic-v1",
hosts[i].ip, hosts[i].da_network_port
))
.unwrap(),
),
note: consensus_configs[0].da_notes[i].clone(),
})
.collect();
providers.extend(blend_configs.iter().enumerate().map(|(i, blend_conf)| {
ProviderInfo {
service_type: ServiceType::BlendNetwork,
provider_sk: blend_conf.signer.clone(),
zk_sk: blend_conf.secret_zk_key.clone(),
locator: Locator(
Multiaddr::from_str(&format!(
"/ip4/{}/udp/{}/quic-v1",
hosts[i].ip, hosts[i].blend_port
))
.unwrap(),
),
note: consensus_configs[0].blend_notes[i].clone(),
}
}));
providers
}
fn rewrite_initial_peers(
templates: &[Vec<Multiaddr>],
original_ports: &[u16],
hosts: &[Host],
peer_ids: &[PeerId],
) -> Vec<Vec<Multiaddr>> {
templates
.iter()
.enumerate()
.map(|(node_idx, peers)| {
peers
.iter()
.filter_map(|addr| find_matching_host(addr, original_ports))
.filter(|&peer_idx| peer_idx != node_idx)
.map(|peer_idx| {
Multiaddr::from_str(&format!(
"/ip4/{}/udp/{}/quic-v1/p2p/{}",
hosts[peer_idx].ip, hosts[peer_idx].network_port, peer_ids[peer_idx]
))
.expect("valid peer multiaddr")
})
.collect()
})
.collect()
}
fn find_matching_host(addr: &Multiaddr, original_ports: &[u16]) -> Option<usize> {
extract_udp_port(addr).and_then(|port| {
original_ports
.iter()
.position(|candidate| *candidate == port)
})
}
fn extract_udp_port(addr: &Multiaddr) -> Option<u16> {
addr.iter().find_map(|protocol| {
if let Protocol::Udp(port) = protocol {
Some(port)
} else {
None
}
})
}
fn update_tracing_identifier(
settings: TracingSettings,
identifier: String,
) -> GeneralTracingConfig {
GeneralTracingConfig {
tracing_settings: TracingSettings {
logger: match settings.logger {
LoggerLayer::Loki(mut config) => {
config.host_identifier.clone_from(&identifier);
LoggerLayer::Loki(config)
}
other => other,
},
tracing: match settings.tracing {
TracingLayer::Otlp(mut config) => {
config.service_name.clone_from(&identifier);
TracingLayer::Otlp(config)
}
other @ TracingLayer::None => other,
},
filter: settings.filter,
metrics: match settings.metrics {
MetricsLayer::Otlp(mut config) => {
config.host_identifier = identifier;
MetricsLayer::Otlp(config)
}
other @ MetricsLayer::None => other,
},
console: settings.console,
level: settings.level,
},
}
}
fn create_kms_configs(
blend_configs: &[GeneralBlendConfig],
da_configs: &[GeneralDaConfig],
) -> Vec<PreloadKMSBackendSettings> {
da_configs
.iter()
.zip(blend_configs.iter())
.map(|(da_conf, blend_conf)| PreloadKMSBackendSettings {
keys: [
(
hex::encode(blend_conf.signer.verifying_key().as_bytes()),
Key::Ed25519(Ed25519Key::new(blend_conf.signer.clone())),
),
(
hex::encode(fr_to_bytes(
&blend_conf.secret_zk_key.to_public_key().into_inner(),
)),
Key::Zk(ZkKey::new(blend_conf.secret_zk_key.clone())),
),
(
hex::encode(da_conf.signer.verifying_key().as_bytes()),
Key::Ed25519(Ed25519Key::new(da_conf.signer.clone())),
),
(
hex::encode(fr_to_bytes(
&da_conf.secret_zk_key.to_public_key().into_inner(),
)),
Key::Zk(ZkKey::new(da_conf.secret_zk_key.clone())),
),
]
.into(),
})
.collect()
}
#[cfg(test)]
mod cfgsync_tests {
use std::{net::Ipv4Addr, num::NonZero, str::FromStr as _, time::Duration};
use integration_configs::topology::configs::{
consensus::ConsensusParams, da::DaParams, wallet::WalletConfig,
};
use nomos_da_network_core::swarm::{
DAConnectionMonitorSettings, DAConnectionPolicySettings, ReplicationConfig,
};
use nomos_libp2p::{Multiaddr, Protocol};
use nomos_tracing_service::{
ConsoleLayer, FilterLayer, LoggerLayer, MetricsLayer, TracingLayer, TracingSettings,
};
use tracing::Level;
use super::{Host, HostKind, create_node_configs};
#[test]
fn basic_ip_list() {
let hosts = (0..10)
.map(|i| Host {
kind: HostKind::Validator,
ip: Ipv4Addr::from_str(&format!("10.1.1.{i}")).unwrap(),
identifier: "node".into(),
network_port: 3000,
da_network_port: 4044,
blend_port: 5000,
api_port: 18080,
testing_http_port: 18081,
})
.collect();
let configs = create_node_configs(
&ConsensusParams {
n_participants: 10,
security_param: NonZero::new(10).unwrap(),
active_slot_coeff: 0.9,
},
&DaParams {
subnetwork_size: 2,
dispersal_factor: 1,
num_samples: 1,
num_subnets: 2,
old_blobs_check_interval: Duration::from_secs(5),
blobs_validity_duration: Duration::from_secs(u64::MAX),
global_params_path: String::new(),
policy_settings: DAConnectionPolicySettings::default(),
monitor_settings: DAConnectionMonitorSettings::default(),
balancer_interval: Duration::ZERO,
redial_cooldown: Duration::ZERO,
replication_settings: ReplicationConfig {
seen_message_cache_size: 0,
seen_message_ttl: Duration::ZERO,
},
subnets_refresh_interval: Duration::from_secs(1),
retry_shares_limit: 1,
retry_commitments_limit: 1,
},
&TracingSettings {
logger: LoggerLayer::None,
tracing: TracingLayer::None,
filter: FilterLayer::None,
metrics: MetricsLayer::None,
console: ConsoleLayer::None,
level: Level::DEBUG,
},
&WalletConfig::default(),
None,
None,
None,
hosts,
);
for (host, config) in &configs {
let network_port = config.network_config.backend.inner.port;
let da_network_port = extract_port(&config.da_config.listening_address);
let blend_port = extract_port(&config.blend_config.backend_core.listening_address);
assert_eq!(network_port, host.network_port);
assert_eq!(da_network_port, host.da_network_port);
assert_eq!(blend_port, host.blend_port);
}
}
fn extract_port(multiaddr: &Multiaddr) -> u16 {
multiaddr
.iter()
.find_map(|protocol| match protocol {
Protocol::Udp(port) => Some(port),
_ => None,
})
.unwrap()
}
}

View File

@ -0,0 +1,4 @@
pub mod client;
pub mod config;
pub mod repo;
pub mod server;

141
testnet/cfgsync/src/repo.rs Normal file
View File

@ -0,0 +1,141 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
use integration_configs::topology::configs::{
GeneralConfig, consensus::ConsensusParams, da::DaParams, wallet::WalletConfig,
};
use nomos_tracing_service::TracingSettings;
use tokio::{sync::oneshot::Sender, time::timeout};
use crate::{
config::{Host, create_node_configs},
server::CfgSyncConfig,
};
pub enum RepoResponse {
Config(Box<GeneralConfig>),
Timeout,
}
pub struct ConfigRepo {
waiting_hosts: Mutex<HashMap<Host, Sender<RepoResponse>>>,
n_hosts: usize,
consensus_params: ConsensusParams,
da_params: DaParams,
tracing_settings: TracingSettings,
wallet_config: WalletConfig,
timeout_duration: Duration,
ids: Option<Vec<[u8; 32]>>,
da_ports: Option<Vec<u16>>,
blend_ports: Option<Vec<u16>>,
}
impl From<CfgSyncConfig> for Arc<ConfigRepo> {
fn from(config: CfgSyncConfig) -> Self {
let consensus_params = config.to_consensus_params();
let da_params = config.to_da_params();
let tracing_settings = config.to_tracing_settings();
let wallet_config = config.wallet_config();
let ids = config.ids;
let da_ports = config.da_ports;
let blend_ports = config.blend_ports;
ConfigRepo::new(
config.n_hosts,
consensus_params,
da_params,
tracing_settings,
wallet_config,
ids,
da_ports,
blend_ports,
Duration::from_secs(config.timeout),
)
}
}
impl ConfigRepo {
#[must_use]
pub fn new(
n_hosts: usize,
consensus_params: ConsensusParams,
da_params: DaParams,
tracing_settings: TracingSettings,
wallet_config: WalletConfig,
ids: Option<Vec<[u8; 32]>>,
da_ports: Option<Vec<u16>>,
blend_ports: Option<Vec<u16>>,
timeout_duration: Duration,
) -> Arc<Self> {
let repo = Arc::new(Self {
waiting_hosts: Mutex::new(HashMap::new()),
n_hosts,
consensus_params,
da_params,
tracing_settings,
wallet_config,
ids,
da_ports,
blend_ports,
timeout_duration,
});
let repo_clone = Arc::clone(&repo);
tokio::spawn(async move {
repo_clone.run().await;
});
repo
}
pub fn register(&self, host: Host, reply_tx: Sender<RepoResponse>) {
let mut waiting_hosts = self.waiting_hosts.lock().unwrap();
waiting_hosts.insert(host, reply_tx);
}
async fn run(&self) {
let timeout_duration = self.timeout_duration;
if timeout(timeout_duration, self.wait_for_hosts()).await == Ok(()) {
println!("All hosts have announced their IPs");
let mut waiting_hosts = self.waiting_hosts.lock().unwrap();
let hosts = waiting_hosts.keys().cloned().collect();
let configs = create_node_configs(
&self.consensus_params,
&self.da_params,
&self.tracing_settings,
&self.wallet_config,
self.ids.clone(),
self.da_ports.clone(),
self.blend_ports.clone(),
hosts,
);
for (host, sender) in waiting_hosts.drain() {
let config = configs.get(&host).expect("host should have a config");
let _ = sender.send(RepoResponse::Config(Box::new(config.to_owned())));
}
} else {
println!("Timeout: Not all hosts announced within the time limit");
let mut waiting_hosts = self.waiting_hosts.lock().unwrap();
for (_, sender) in waiting_hosts.drain() {
let _ = sender.send(RepoResponse::Timeout);
}
}
}
async fn wait_for_hosts(&self) {
loop {
if self.waiting_hosts.lock().unwrap().len() >= self.n_hosts {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}

View File

@ -0,0 +1,297 @@
use std::{fs, net::Ipv4Addr, num::NonZero, path::PathBuf, sync::Arc, time::Duration};
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
use integration_configs::{
nodes::{executor::create_executor_config, validator::create_validator_config},
topology::configs::{consensus::ConsensusParams, da::DaParams, wallet::WalletConfig},
};
use nomos_da_network_core::swarm::{
DAConnectionMonitorSettings, DAConnectionPolicySettings, ReplicationConfig,
};
use nomos_tracing_service::TracingSettings;
use nomos_utils::bounded_duration::{MinimalBoundedDuration, SECOND};
use serde::{Deserialize, Serialize};
use serde_json::json;
use serde_with::serde_as;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::oneshot::channel;
use crate::{
config::{Host, PortOverrides},
repo::{ConfigRepo, RepoResponse},
};
#[serde_as]
#[derive(Debug, Deserialize)]
pub struct CfgSyncConfig {
pub port: u16,
pub n_hosts: usize,
pub timeout: u64,
// ConsensusConfig related parameters
pub security_param: NonZero<u32>,
pub active_slot_coeff: f64,
pub wallet: WalletConfig,
#[serde(default)]
pub ids: Option<Vec<[u8; 32]>>,
#[serde(default)]
pub da_ports: Option<Vec<u16>>,
#[serde(default)]
pub blend_ports: Option<Vec<u16>>,
// DaConfig related parameters
pub subnetwork_size: usize,
pub dispersal_factor: usize,
pub num_samples: u16,
pub num_subnets: u16,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
pub old_blobs_check_interval: Duration,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
pub blobs_validity_duration: Duration,
pub global_params_path: String,
pub min_dispersal_peers: usize,
pub min_replication_peers: usize,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
pub monitor_failure_time_window: Duration,
#[serde_as(as = "MinimalBoundedDuration<0, SECOND>")]
pub balancer_interval: Duration,
pub replication_settings: ReplicationConfig,
pub retry_shares_limit: usize,
pub retry_commitments_limit: usize,
// Tracing params
pub tracing_settings: TracingSettings,
}
impl CfgSyncConfig {
pub fn load_from_file(file_path: &PathBuf) -> Result<Self, String> {
let config_content = fs::read_to_string(file_path)
.map_err(|err| format!("Failed to read config file: {err}"))?;
serde_yaml::from_str(&config_content)
.map_err(|err| format!("Failed to parse config file: {err}"))
}
#[must_use]
pub const fn to_consensus_params(&self) -> ConsensusParams {
ConsensusParams {
n_participants: self.n_hosts,
security_param: self.security_param,
active_slot_coeff: self.active_slot_coeff,
}
}
#[must_use]
pub fn to_da_params(&self) -> DaParams {
DaParams {
subnetwork_size: self.subnetwork_size,
dispersal_factor: self.dispersal_factor,
num_samples: self.num_samples,
num_subnets: self.num_subnets,
old_blobs_check_interval: self.old_blobs_check_interval,
blobs_validity_duration: self.blobs_validity_duration,
global_params_path: self.global_params_path.clone(),
policy_settings: DAConnectionPolicySettings {
min_dispersal_peers: self.min_dispersal_peers,
min_replication_peers: self.min_replication_peers,
max_dispersal_failures: 3,
max_sampling_failures: 3,
max_replication_failures: 3,
malicious_threshold: 10,
},
monitor_settings: DAConnectionMonitorSettings {
failure_time_window: self.monitor_failure_time_window,
..Default::default()
},
balancer_interval: self.balancer_interval,
redial_cooldown: Duration::ZERO,
replication_settings: self.replication_settings,
subnets_refresh_interval: Duration::from_secs(30),
retry_shares_limit: self.retry_shares_limit,
retry_commitments_limit: self.retry_commitments_limit,
}
}
#[must_use]
pub fn to_tracing_settings(&self) -> TracingSettings {
self.tracing_settings.clone()
}
#[must_use]
pub fn wallet_config(&self) -> WalletConfig {
self.wallet.clone()
}
}
#[derive(Serialize, Deserialize)]
pub struct ClientIp {
pub ip: Ipv4Addr,
pub identifier: String,
#[serde(default)]
pub network_port: Option<u16>,
#[serde(default)]
pub da_port: Option<u16>,
#[serde(default)]
pub blend_port: Option<u16>,
#[serde(default)]
pub api_port: Option<u16>,
#[serde(default)]
pub testing_http_port: Option<u16>,
}
async fn validator_config(
State(config_repo): State<Arc<ConfigRepo>>,
Json(payload): Json<ClientIp>,
) -> impl IntoResponse {
let ClientIp {
ip,
identifier,
network_port,
da_port,
blend_port,
api_port,
testing_http_port,
} = payload;
let ports = PortOverrides {
network_port,
da_network_port: da_port,
blend_port,
api_port,
testing_http_port,
};
let (reply_tx, reply_rx) = channel();
config_repo.register(Host::validator_from_ip(ip, identifier, ports), reply_tx);
(reply_rx.await).map_or_else(
|_| (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(),
|config_response| match config_response {
RepoResponse::Config(config) => {
let config = create_validator_config(*config);
let mut value =
serde_json::to_value(&config).expect("validator config should serialize");
inject_defaults(&mut value);
override_api_ports(&mut value, &ports);
inject_da_assignations(&mut value, &config.da_network.membership);
override_min_session_members(&mut value);
(StatusCode::OK, Json(value)).into_response()
}
RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
},
)
}
async fn executor_config(
State(config_repo): State<Arc<ConfigRepo>>,
Json(payload): Json<ClientIp>,
) -> impl IntoResponse {
let ClientIp {
ip,
identifier,
network_port,
da_port,
blend_port,
api_port,
testing_http_port,
} = payload;
let ports = PortOverrides {
network_port,
da_network_port: da_port,
blend_port,
api_port,
testing_http_port,
};
let (reply_tx, reply_rx) = channel();
config_repo.register(Host::executor_from_ip(ip, identifier, ports), reply_tx);
(reply_rx.await).map_or_else(
|_| (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(),
|config_response| match config_response {
RepoResponse::Config(config) => {
let config = create_executor_config(*config);
let mut value =
serde_json::to_value(&config).expect("executor config should serialize");
inject_defaults(&mut value);
override_api_ports(&mut value, &ports);
inject_da_assignations(&mut value, &config.da_network.membership);
override_min_session_members(&mut value);
(StatusCode::OK, Json(value)).into_response()
}
RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
},
)
}
pub fn cfgsync_app(config_repo: Arc<ConfigRepo>) -> Router {
Router::new()
.route("/validator", post(validator_config))
.route("/executor", post(executor_config))
.with_state(config_repo)
}
fn override_api_ports(config: &mut serde_json::Value, ports: &PortOverrides) {
if let Some(api_port) = ports.api_port {
if let Some(address) = config.pointer_mut("/http/backend_settings/address") {
*address = json!(format!("0.0.0.0:{api_port}"));
}
}
if let Some(testing_port) = ports.testing_http_port {
if let Some(address) = config.pointer_mut("/testing_http/backend_settings/address") {
*address = json!(format!("0.0.0.0:{testing_port}"));
}
}
}
fn inject_da_assignations(
config: &mut serde_json::Value,
membership: &nomos_node::NomosDaMembership,
) {
let assignations: std::collections::HashMap<String, Vec<String>> = membership
.subnetworks()
.into_iter()
.map(|(subnet_id, members)| {
(
subnet_id.to_string(),
members.into_iter().map(|peer| peer.to_string()).collect(),
)
})
.collect();
if let Some(membership) = config.pointer_mut("/da_network/membership") {
if let Some(map) = membership.as_object_mut() {
map.insert("assignations".to_string(), serde_json::json!(assignations));
}
}
}
fn override_min_session_members(config: &mut serde_json::Value) {
if let Some(value) = config.pointer_mut("/da_network/min_session_members") {
*value = serde_json::json!(1);
}
}
fn inject_defaults(config: &mut serde_json::Value) {
if let Some(cryptarchia) = config
.get_mut("cryptarchia")
.and_then(|v| v.as_object_mut())
{
let bootstrap = cryptarchia
.entry("bootstrap")
.or_insert_with(|| serde_json::json!({}));
if let Some(bootstrap_map) = bootstrap.as_object_mut() {
bootstrap_map
.entry("ibd")
.or_insert_with(|| serde_json::json!({ "peers": [], "delay_before_new_download": { "secs": 10, "nanos": 0 } }));
}
cryptarchia
.entry("network_adapter_settings")
.or_insert_with(|| serde_json::json!({ "topic": "/cryptarchia/proto" }));
cryptarchia.entry("sync").or_insert_with(|| {
serde_json::json!({
"orphan": { "max_orphan_cache_size": 5 }
})
});
}
}

View File

@ -0,0 +1,8 @@
apiVersion: 1
providers:
- name: 'default'
orgId: 1
folder: ''
type: 'file'
options:
path: '/var/lib/grafana/dashboards'

View File

@ -0,0 +1,237 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": {
"type": "grafana",
"uid": "-- Grafana --"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"fiscalYearStartMonth": 0,
"graphTooltip": 0,
"id": 1,
"links": [],
"panels": [
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"id": 2,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.4.0",
"targets": [
{
"disableTextWrap": false,
"editorMode": "builder",
"expr": "da_mempool_pending_items",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Mempool: Pending DA blobs",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"barWidthFactor": 0.6,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"id": 1,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "11.4.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "consensus_processed_blocks",
"fullMetaSearch": false,
"includeNullMetadata": true,
"legendFormat": "__auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Consensus: Processed Blocks",
"type": "timeseries"
}
],
"preload": false,
"schemaVersion": 40,
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "browser",
"title": "Testnet Metrics",
"uid": "ce6ebepwk737kf",
"version": 5,
"weekStart": ""
}

View File

@ -0,0 +1,37 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
org_id: 1
url: http://prometheus:9090
is_default: true
version: 1
editable: true
- name: Tempo
type: tempo
access: proxy
org_id: 1
url: http://tempo:3200
is_default: false
version: 1
editable: true
uid: tempods
- name: Loki
type: loki
access: proxy
org_id: 1
url: http://loki:3100
is_default: false
version: 1
editable: true
jsonData:
derivedFields:
- name: trace_id
matcherRegex: "\"trace_id\":\"(\\w+)\""
url: "$${__value.raw}"
datasourceUid: tempods

View File

@ -0,0 +1,51 @@
instance_name = nomos dashboard
;[dashboards.json]
;enabled = true
;path = /home/git/grafana/grafana-dashboards/dashboards
#################################### Auth ##########################
[auth]
disable_login_form = false
#################################### Anonymous Auth ##########################
[auth.anonymous]
# enable anonymous access
enabled = true
# specify organization name that should be used for unauthenticated users
;org_name = Public
# specify role for unauthenticated users
; org_role = Admin
org_role = Viewer
;[security]
;admin_user = ocr
;admin_password = ocr
;[users]
# disable user signup / registration
;allow_sign_up = false
# Set to true to automatically assign new users to the default organization (id 1)
;auto_assign_org = true
# Default role new users will be automatically assigned (if disabled above is set to true)
;auto_assign_org_role = Viewer
#################################### SMTP / Emailing ##########################
;[smtp]
;enabled = false
;host = localhost:25
;user =
;password =
;cert_file =
;key_file =
;skip_verify = false
;from_address = admin@grafana.localhost
;[emails]
;welcome_email_on_sign_up = false

View File

@ -0,0 +1 @@
GF_INSTALL_PLUGINS=grafana-worldmap-panel,grafana-piechart-panel,yesoreyeram-boomtheme-panel,briangann-gauge-panel,pierosavi-imageit-panel,bessler-pictureit-panel,vonage-status-panel

View File

@ -0,0 +1,4 @@
global:
evaluation_interval: 15s
external_labels:
monitor: "Monitoring"

View File

@ -0,0 +1,53 @@
stream_over_http_enabled: true
server:
http_listen_port: 3200
log_level: info
query_frontend:
search:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 5s
distributor:
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
ingester:
max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally
compactor:
compaction:
block_retention: 24h
metrics_generator:
registry:
external_labels:
source: tempo
cluster: docker-compose
storage:
path: /var/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /var/tempo/generator/traces
storage:
trace:
backend: local # backend configuration to use
wal:
path: /var/tempo/wal # where to store the wal locally
local:
path: /var/tempo/blocks
overrides:
defaults:
metrics_generator:
processors: [service-graphs, span-metrics, local-blocks] # enables metrics generator
generate_native_histograms: both

5
testnet/scripts/run_cfgsync.sh Executable file
View File

@ -0,0 +1,5 @@
#!/bin/sh
set -e
exec /usr/bin/cfgsync-server /etc/nomos/cfgsync.yaml

View File

@ -0,0 +1,18 @@
#!/bin/sh
set -e
export CFG_FILE_PATH="/config.yaml" \
CFG_SERVER_ADDR="${CFG_SERVER_ADDR:-http://cfgsync:4400}" \
CFG_HOST_IP=$(hostname -i) \
CFG_HOST_KIND="${CFG_HOST_KIND:-executor}" \
CFG_HOST_IDENTIFIER="${CFG_HOST_IDENTIFIER:-executor-$(hostname -i)}" \
LOG_LEVEL="INFO" \
POL_PROOF_DEV_MODE=true
# Ensure recovery directory exists to avoid early crashes in services that
# persist state.
mkdir -p /recovery
/usr/bin/cfgsync-client && \
exec /usr/bin/nomos-executor /config.yaml

View File

@ -0,0 +1,18 @@
#!/bin/sh
set -e
export CFG_FILE_PATH="/config.yaml" \
CFG_SERVER_ADDR="${CFG_SERVER_ADDR:-http://cfgsync:4400}" \
CFG_HOST_IP=$(hostname -i) \
CFG_HOST_KIND="${CFG_HOST_KIND:-validator}" \
CFG_HOST_IDENTIFIER="${CFG_HOST_IDENTIFIER:-validator-$(hostname -i)}" \
LOG_LEVEL="INFO" \
POL_PROOF_DEV_MODE=true
# Ensure recovery directory exists to avoid early crashes in services that
# persist state.
mkdir -p /recovery
/usr/bin/cfgsync-client && \
exec /usr/bin/nomos-node /config.yaml

View File

@ -3,11 +3,9 @@ use std::{env, time::Duration};
use serial_test::serial;
use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
use testing_framework_runner_compose::{ComposeRunner, ComposeRunnerError};
use tests_workflows::{
ChaosBuilderExt as _, ScenarioBuilderExt as _, expectations::ConsensusLiveness,
};
use tests_workflows::{ChaosBuilderExt as _, ScenarioBuilderExt as _};
const RUN_DURATION: Duration = Duration::from_secs(120);
const RUN_DURATION: Duration = Duration::from_secs(60);
const MIXED_TXS_PER_BLOCK: u64 = 5;
const TOTAL_WALLETS: usize = 64;
const TRANSACTION_WALLETS: usize = 8;
@ -55,20 +53,19 @@ async fn run_compose_case(validators: usize, executors: usize) {
"running compose chaos test with {validators} validator(s) and {executors} executor(s)"
);
let topology = ScenarioBuilder::with_node_counts(validators, executors)
let mut plan = ScenarioBuilder::with_node_counts(validators, executors)
.enable_node_control()
.chaos_random_restart()
.min_delay(Duration::from_secs(45))
.max_delay(Duration::from_secs(75))
.target_cooldown(Duration::from_secs(120))
// Keep chaos restarts outside the test run window to avoid crash loops on restart.
.min_delay(Duration::from_secs(120))
.max_delay(Duration::from_secs(180))
.target_cooldown(Duration::from_secs(240))
.apply()
.topology()
.network_star()
.validators(validators)
.executors(executors)
.network_star()
.apply();
let workloads = topology
.apply()
.wallets(TOTAL_WALLETS)
.transactions()
.rate(MIXED_TXS_PER_BLOCK)
@ -77,15 +74,12 @@ async fn run_compose_case(validators: usize, executors: usize) {
.da()
.channel_rate(1)
.blob_rate(1)
.apply();
let lag_allowance = 2 + (validators + executors) as u64;
let mut plan = workloads
.with_expectation(ConsensusLiveness::default().with_lag_allowance(lag_allowance))
.apply()
.with_run_duration(RUN_DURATION)
.expect_consensus_liveness()
.build();
let deployer = ComposeRunner::new().with_readiness(false);
let deployer = ComposeRunner::new();
let runner: Runner = match deployer.deploy(&plan).await {
Ok(runner) => runner,
Err(ComposeRunnerError::DockerUnavailable) => {

View File

@ -8,31 +8,32 @@ use tests_workflows::ScenarioBuilderExt as _;
const RUN_DURATION: Duration = Duration::from_secs(60);
const VALIDATORS: usize = 1;
const EXECUTORS: usize = 1;
// Kubernetes has less throughput headroom than the local runner, so we use a
// lighter per-block rate while keeping the same mixed workload shape.
const MIXED_TXS_PER_BLOCK: u64 = 2;
const MIXED_TXS_PER_BLOCK: u64 = 5;
const TOTAL_WALLETS: usize = 64;
const TRANSACTION_WALLETS: usize = 8;
#[tokio::test]
#[ignore = "requires access to a Kubernetes cluster"]
#[serial]
async fn k8s_runner_tx_workload() {
let topology = ScenarioBuilder::with_node_counts(VALIDATORS, EXECUTORS)
let mut plan = ScenarioBuilder::with_node_counts(VALIDATORS, EXECUTORS)
.topology()
.network_star()
.validators(VALIDATORS)
.executors(EXECUTORS)
.network_star()
.apply();
let workloads = topology
.apply()
.wallets(TOTAL_WALLETS)
.transactions()
.rate(MIXED_TXS_PER_BLOCK)
.users(TRANSACTION_WALLETS)
.apply()
.da()
.channel_rate(1)
.blob_rate(1)
.apply();
let mut plan = workloads.with_run_duration(RUN_DURATION).build();
.apply()
.with_run_duration(RUN_DURATION)
.expect_consensus_liveness()
.build();
let deployer = K8sRunner::new();
let runner: Runner = match deployer.deploy(&plan).await {

View File

@ -12,19 +12,13 @@ const MIXED_TXS_PER_BLOCK: u64 = 5;
const TOTAL_WALLETS: usize = 64;
const TRANSACTION_WALLETS: usize = 8;
#[tokio::test]
#[serial]
/// Drives both workloads concurrently to mimic a user mixing transaction flow
/// with blob publishing on the same topology.
async fn local_runner_mixed_workloads() {
let topology = ScenarioBuilder::with_node_counts(VALIDATORS, EXECUTORS)
fn build_plan() -> testing_framework_core::scenario::Scenario {
ScenarioBuilder::with_node_counts(VALIDATORS, EXECUTORS)
.topology()
.network_star()
.validators(VALIDATORS)
.executors(EXECUTORS)
.network_star()
.apply();
let workloads = topology
.apply()
.wallets(TOTAL_WALLETS)
.transactions()
.rate(MIXED_TXS_PER_BLOCK)
@ -33,13 +27,18 @@ async fn local_runner_mixed_workloads() {
.da()
.channel_rate(1)
.blob_rate(1)
.apply();
let mut plan = workloads
.expect_consensus_liveness()
.apply()
.with_run_duration(RUN_DURATION)
.build();
.expect_consensus_liveness()
.build()
}
#[tokio::test]
#[serial]
/// Drives both workloads concurrently to mimic a user mixing transaction flow
/// with blob publishing on the same topology.
async fn local_runner_mixed_workloads() {
let mut plan = build_plan();
let deployer = LocalDeployer::default();
let runner: Runner = deployer.deploy(&plan).await.expect("scenario deployment");
let _handle = runner.run(&mut plan).await.expect("scenario executed");