Refine demo tooling and unify runner config

This commit is contained in:
andrussal 2025-12-09 06:30:18 +01:00
parent 773b474bf8
commit 3536c22db0
13 changed files with 400 additions and 551 deletions

View File

@ -29,9 +29,18 @@ async fn main() {
tracing_subscriber::fmt::init();
let validators = read_env("COMPOSE_DEMO_VALIDATORS", DEFAULT_VALIDATORS);
let executors = read_env("COMPOSE_DEMO_EXECUTORS", DEFAULT_EXECUTORS);
let run_secs = read_env("COMPOSE_DEMO_RUN_SECS", DEFAULT_RUN_SECS);
let validators = read_env_any(
&["NOMOS_DEMO_VALIDATORS", "COMPOSE_DEMO_VALIDATORS"],
DEFAULT_VALIDATORS,
);
let executors = read_env_any(
&["NOMOS_DEMO_EXECUTORS", "COMPOSE_DEMO_EXECUTORS"],
DEFAULT_EXECUTORS,
);
let run_secs = read_env_any(
&["NOMOS_DEMO_RUN_SECS", "COMPOSE_DEMO_RUN_SECS"],
DEFAULT_RUN_SECS,
);
info!(
validators,
executors, run_secs, "starting compose runner demo"
@ -101,12 +110,15 @@ async fn run_compose_case(
runner.run(&mut plan).await.map(|_| ()).map_err(Into::into)
}
fn read_env<T>(key: &str, default: T) -> T
fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: std::str::FromStr + Copy,
{
std::env::var(key)
.ok()
.and_then(|raw| raw.parse::<T>().ok())
keys.iter()
.find_map(|key| {
std::env::var(key)
.ok()
.and_then(|raw| raw.parse::<T>().ok())
})
.unwrap_or(default)
}

View File

@ -16,9 +16,18 @@ const TRANSACTION_WALLETS: usize = 8;
async fn main() {
tracing_subscriber::fmt::init();
let validators = read_env("K8S_DEMO_VALIDATORS", DEFAULT_VALIDATORS);
let executors = read_env("K8S_DEMO_EXECUTORS", DEFAULT_EXECUTORS);
let run_secs = read_env("K8S_DEMO_RUN_SECS", DEFAULT_RUN_SECS);
let validators = read_env_any(
&["NOMOS_DEMO_VALIDATORS", "K8S_DEMO_VALIDATORS"],
DEFAULT_VALIDATORS,
);
let executors = read_env_any(
&["NOMOS_DEMO_EXECUTORS", "K8S_DEMO_EXECUTORS"],
DEFAULT_EXECUTORS,
);
let run_secs = read_env_any(
&["NOMOS_DEMO_RUN_SECS", "K8S_DEMO_RUN_SECS"],
DEFAULT_RUN_SECS,
);
info!(validators, executors, run_secs, "starting k8s runner demo");
if let Err(err) = run_k8s_case(validators, executors, Duration::from_secs(run_secs)).await {
@ -102,12 +111,15 @@ async fn run_k8s_case(
Ok(())
}
fn read_env<T>(key: &str, default: T) -> T
fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: std::str::FromStr + Copy,
{
std::env::var(key)
.ok()
.and_then(|raw| raw.parse::<T>().ok())
keys.iter()
.find_map(|key| {
std::env::var(key)
.ok()
.and_then(|raw| raw.parse::<T>().ok())
})
.unwrap_or(default)
}

View File

@ -21,9 +21,19 @@ async fn main() {
std::process::exit(1);
}
let validators = read_env("LOCAL_DEMO_VALIDATORS", DEFAULT_VALIDATORS);
let executors = read_env("LOCAL_DEMO_EXECUTORS", DEFAULT_EXECUTORS);
let run_secs = read_env("LOCAL_DEMO_RUN_SECS", DEFAULT_RUN_SECS);
let validators = read_env_any(
&["NOMOS_DEMO_VALIDATORS", "LOCAL_DEMO_VALIDATORS"],
DEFAULT_VALIDATORS,
);
let executors = read_env_any(
&["NOMOS_DEMO_EXECUTORS", "LOCAL_DEMO_EXECUTORS"],
DEFAULT_EXECUTORS,
);
let run_secs = read_env_any(
&["NOMOS_DEMO_RUN_SECS", "LOCAL_DEMO_RUN_SECS"],
DEFAULT_RUN_SECS,
);
info!(
validators,
executors, run_secs, "starting local runner demo"
@ -35,7 +45,6 @@ async fn main() {
}
}
#[rustfmt::skip]
async fn run_local_case(
validators: usize,
executors: usize,
@ -47,39 +56,38 @@ async fn run_local_case(
duration_secs = run_duration.as_secs(),
"building scenario plan"
);
let mut plan = ScenarioBuilder::topology_with(|t| {
t.network_star()
.validators(validators)
.executors(executors)
t.network_star().validators(validators).executors(executors)
})
.wallets(TOTAL_WALLETS)
.transactions_with(|txs| {
txs.rate(MIXED_TXS_PER_BLOCK)
.users(TRANSACTION_WALLETS)
})
.da_with(|da| {
da.channel_rate(1)
.blob_rate(1)
})
.with_run_duration(run_duration)
.expect_consensus_liveness()
.build();
.wallets(TOTAL_WALLETS)
.transactions_with(|txs| txs.rate(MIXED_TXS_PER_BLOCK).users(TRANSACTION_WALLETS))
.da_with(|da| da.channel_rate(1).blob_rate(1))
.with_run_duration(run_duration)
.expect_consensus_liveness()
.build();
let deployer = LocalDeployer::default().with_membership_check(true);
info!("deploying local nodes");
let runner: Runner = deployer.deploy(&plan).await?;
info!("running scenario");
runner.run(&mut plan).await.map(|_| ())?;
info!("scenario complete");
Ok(())
}
fn read_env<T>(key: &str, default: T) -> T
fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: std::str::FromStr + Copy,
{
std::env::var(key)
.ok()
.and_then(|raw| raw.parse::<T>().ok())
keys.iter()
.find_map(|key| {
std::env::var(key)
.ok()
.and_then(|raw| raw.parse::<T>().ok())
})
.unwrap_or(default)
}

View File

@ -1,74 +0,0 @@
use std::{
env,
path::Path,
process::{Command, Stdio},
};
// Manually run the local runner binary as a smoke test.
// This spins up real nodes and should be invoked explicitly:
// POL_PROOF_DEV_MODE=true cargo test -p runner-examples --test
// local_runner_bin_smoke -- --ignored --nocapture
#[test]
#[ignore = "runs local_runner binary (~2min) and requires local assets/binaries"]
fn local_runner_bin_smoke() {
// Prefer a prebuilt local_runner binary (if provided), otherwise fall back to
// cargo run.
let runner_bin = env::var("LOCAL_RUNNER_BIN").ok();
let mut cmd = match runner_bin.as_deref() {
Some(path) => {
let mut c = Command::new(path);
c.args(["--nocapture"]);
c
}
None => {
let mut c = Command::new("cargo");
c.args([
"run",
"-p",
"runner-examples",
"--bin",
"local_runner",
"--",
"--nocapture",
]);
c
}
};
// Inherit stdio so the runner's logs stream into the test output, then wire
// the environment: caller-provided values win, defaults fill anything unset
// (120s run, 1 validator, 1 executor).
let status = cmd
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.env("POL_PROOF_DEV_MODE", "true")
.env(
"NOMOS_CIRCUITS",
// NOMOS_CIRCUITS must point at a circuits bundle; fall back to the
// conventional .tmp/nomos-circuits directory when it exists on disk.
env::var("NOMOS_CIRCUITS")
.or_else(|_| {
let default = ".tmp/nomos-circuits";
if Path::new(default).exists() {
Ok(default.to_string())
} else {
Err(env::VarError::NotPresent)
}
})
.expect("NOMOS_CIRCUITS must be set or .tmp/nomos-circuits must exist"),
)
.env(
"LOCAL_DEMO_RUN_SECS",
env::var("LOCAL_DEMO_RUN_SECS").unwrap_or_else(|_| "120".into()),
)
.env(
"LOCAL_DEMO_VALIDATORS",
env::var("LOCAL_DEMO_VALIDATORS").unwrap_or_else(|_| "1".into()),
)
.env(
"LOCAL_DEMO_EXECUTORS",
env::var("LOCAL_DEMO_EXECUTORS").unwrap_or_else(|_| "1".into()),
)
.env("RUST_BACKTRACE", "1")
.status()
.expect("failed to spawn local runner");
// Propagate a non-zero runner exit as a test failure.
if !status.success() {
panic!("local runner binary failed: status={status}");
}
}

View File

@ -4,11 +4,11 @@ set -euo pipefail
# All-in-one helper: prepare circuits (Linux + host), rebuild the image, and run
# the chosen runner binary.
#
# Usage: scripts/run-demo.sh [compose|local|k8s] [run-seconds]
# Usage: scripts/run-demo.sh [options] [compose|local|k8s]
# compose -> runs examples/src/bin/compose_runner.rs (default)
# local -> runs examples/src/bin/local_runner.rs
# k8s -> runs examples/src/bin/k8s_runner.rs
# run-seconds defaults to 60
# run-seconds must be provided via -t/--run-seconds
#
# Env overrides:
# VERSION - circuits version (default v0.3.1)
@ -17,13 +17,83 @@ set -euo pipefail
# NOMOS_CIRCUITS_REBUILD_RAPIDSNARK - set to 1 to force rapidsnark rebuild
# NOMOS_NODE_REV - nomos-node git rev for local binaries (default d2dd5a5084e1daef4032562c77d41de5e4d495f8)
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
MODE="${1:-compose}"
RUN_SECS="${2:-60}"
VERSION="${VERSION:-v0.3.1}"
usage() {
# Print the CLI help text to stdout. The heredoc is single-quoted ('EOF') so
# nothing inside is expanded; keep its content in sync with the option parser.
cat <<'EOF'
Usage: scripts/run-demo.sh [options] [compose|local|k8s]
Modes:
compose Run examples/src/bin/compose_runner.rs (default)
local Run examples/src/bin/local_runner.rs
k8s Run examples/src/bin/k8s_runner.rs
Options:
-t, --run-seconds N Duration to run the demo (required)
-v, --validators N Number of validators (required)
-e, --executors N Number of executors (required)
Environment:
VERSION Circuits version (default v0.3.1)
NOMOS_TESTNET_IMAGE Image tag (default nomos-testnet:local)
NOMOS_CIRCUITS_PLATFORM Override host platform detection
NOMOS_CIRCUITS_REBUILD_RAPIDSNARK Force rapidsnark rebuild
NOMOS_NODE_REV nomos-node git rev (default d2dd5a5084e1daef4032562c77d41de5e4d495f8)
NOMOS_BINARIES_TAR Path to prebuilt binaries/circuits tarball
NOMOS_SKIP_IMAGE_BUILD Set to 1 to skip rebuilding the compose/k8s image
EOF
}
# Print an error message to stderr, show the usage text, and exit with status 1.
# printf is used instead of echo so messages that begin with '-' or contain
# backslash sequences are printed verbatim.
fail_with_usage() {
    printf '%s\n' "$1" >&2
    usage
    exit 1
}
# Fast-path help: honor -h/--help as the first argument before any other
# parsing or validation runs.
if [ "${1:-}" = "-h" ] || [ "${1:-}" = "--help" ]; then
usage
exit 0
fi
readonly ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
readonly DEFAULT_VERSION="v0.3.1"
readonly DEFAULT_NODE_REV="d2dd5a5084e1daef4032562c77d41de5e4d495f8"
MODE="compose"
RUN_SECS_RAW=""
VERSION="${VERSION:-${DEFAULT_VERSION}}"
IMAGE="${NOMOS_TESTNET_IMAGE:-nomos-testnet:local}"
NOMOS_NODE_REV="${NOMOS_NODE_REV:-d2dd5a5084e1daef4032562c77d41de5e4d495f8}"
NOMOS_NODE_REV="${NOMOS_NODE_REV:-${DEFAULT_NODE_REV}}"
DEMO_VALIDATORS=""
DEMO_EXECUTORS=""
# Parse CLI arguments. Options may appear in any order; a bare mode word
# (compose|local|k8s) selects the runner, and a bare number is accepted as the
# legacy positional run-seconds value. Fixes over the previous version:
#  - RUN_SECS_RAW_SPECIFIED is now actually set, so an explicit -t value is
#    never silently overwritten by a later positional number.
#  - Options missing their value fail with a clear message instead of dying on
#    `shift 2` under `set -e`.
while [ "$#" -gt 0 ]; do
case "$1" in
    -h|--help)
        usage; exit 0 ;;
    -t|--run-seconds)
        [ "$#" -ge 2 ] || fail_with_usage "Missing value for $1"
        RUN_SECS_RAW="$2"
        RUN_SECS_RAW_SPECIFIED=1
        shift 2 ;;
    -v|--validators)
        [ "$#" -ge 2 ] || fail_with_usage "Missing value for $1"
        DEMO_VALIDATORS="$2"; shift 2 ;;
    -e|--executors)
        [ "$#" -ge 2 ] || fail_with_usage "Missing value for $1"
        DEMO_EXECUTORS="$2"; shift 2 ;;
    compose|local|k8s)
        MODE="$1"; shift ;;
    *)
        # Positional run-seconds fallback for legacy usage; only honored when
        # run-seconds has not already been given, otherwise it is an error.
        if [ -z "${RUN_SECS_RAW_SPECIFIED:-}" ] && [[ "$1" =~ ^[0-9]+$ ]]; then
            RUN_SECS_RAW="$1"
            RUN_SECS_RAW_SPECIFIED=1
            shift
        else
            fail_with_usage "Unknown argument: $1"
        fi
        ;;
esac
done
RESTORED_BINARIES=0
SETUP_OUT=""
# Remove the temporary setup log on exit, if one was ever created.
cleanup() {
    [ -z "${SETUP_OUT}" ] || rm -f "${SETUP_OUT}"
}
trap cleanup EXIT
case "$MODE" in
compose) BIN="compose_runner" ;;
@ -32,6 +102,20 @@ case "$MODE" in
*) echo "Unknown mode '$MODE' (use compose|local)" >&2; exit 1 ;;
esac
# Validate parsed inputs: run-seconds must be a positive integer, and both node
# counts must be supplied and numeric.
[[ "${RUN_SECS_RAW}" =~ ^[0-9]+$ ]] && [ "${RUN_SECS_RAW}" -gt 0 ] ||
    fail_with_usage "run-seconds must be a positive integer (pass -t/--run-seconds)"
readonly RUN_SECS="${RUN_SECS_RAW}"
if [[ -n "${DEMO_VALIDATORS}" && ! "${DEMO_VALIDATORS}" =~ ^[0-9]+$ ]]; then
    fail_with_usage "validators must be a non-negative integer (pass -v/--validators)"
fi
if [[ -n "${DEMO_EXECUTORS}" && ! "${DEMO_EXECUTORS}" =~ ^[0-9]+$ ]]; then
    fail_with_usage "executors must be a non-negative integer (pass -e/--executors)"
fi
if [[ -z "${DEMO_VALIDATORS}" || -z "${DEMO_EXECUTORS}" ]]; then
    fail_with_usage "validators and executors must be provided via -v/--validators and -e/--executors"
fi
restore_binaries_from_tar() {
local tar_path="${NOMOS_BINARIES_TAR:-${ROOT_DIR}/.tmp/nomos-binaries.tar.gz}"
local extract_dir="${ROOT_DIR}/.tmp/nomos-binaries"
@ -48,19 +132,22 @@ restore_binaries_from_tar() {
local circuits_dst="${ROOT_DIR}/testing-framework/assets/stack/kzgrs_test_params"
if [ -f "${src}/nomos-node" ] && [ -f "${src}/nomos-executor" ] && [ -f "${src}/nomos-cli" ]; then
mkdir -p "${bin_dst}"
cp "${src}/nomos-node" "${bin_dst}/"
cp "${src}/nomos-executor" "${bin_dst}/"
cp "${src}/nomos-cli" "${bin_dst}/"
cp "${src}/nomos-node" "${src}/nomos-executor" "${src}/nomos-cli" "${bin_dst}/"
else
echo "Binaries missing in ${tar_path}; fallback to build-from-source path" >&2
echo "Binaries missing in ${tar_path}; fallback to build-from-source path (run build-binaries workflow to populate)" >&2
return 1
fi
if [ -d "${circuits_src}" ] && [ -f "${circuits_src}/kzgrs_test_params" ]; then
rm -rf "${circuits_dst}"
mkdir -p "${circuits_dst}"
rsync -a --delete "${circuits_src}/" "${circuits_dst}/"
if command -v rsync >/dev/null 2>&1; then
rsync -a --delete "${circuits_src}/" "${circuits_dst}/"
else
rm -rf "${circuits_dst:?}/"*
cp -a "${circuits_src}/." "${circuits_dst}/"
fi
else
echo "Circuits missing in ${tar_path}; fallback to download/build path" >&2
echo "Circuits missing in ${tar_path}; fallback to download/build path (run build-binaries workflow to populate)" >&2
return 1
fi
RESTORED_BINARIES=1
@ -100,6 +187,7 @@ ensure_host_binaries() {
git checkout "${NOMOS_NODE_REV}"
git reset --hard
git clean -fdx
echo "-> Compiling host binaries (may take a few minutes)..."
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
NOMOS_CIRCUITS="${HOST_BUNDLE_PATH}" \
cargo build --features "testing" \
@ -114,7 +202,7 @@ ensure_host_binaries() {
restore_binaries_from_tar || true
echo "==> Preparing circuits (version ${VERSION})"
SETUP_OUT="/tmp/nomos-setup-output.$$"
SETUP_OUT="$(mktemp -t nomos-setup-output.XXXXXX)"
if [ "${RESTORED_BINARIES}" -ne 1 ]; then
"${ROOT_DIR}/scripts/setup-circuits-stack.sh" "${VERSION}" </dev/null | tee "$SETUP_OUT"
else
@ -127,7 +215,6 @@ if [ -d "${ROOT_DIR}/.tmp/nomos-circuits-host" ]; then
else
HOST_BUNDLE_PATH="${ROOT_DIR}/testing-framework/assets/stack/kzgrs_test_params"
fi
rm -f "$SETUP_OUT"
# If the host bundle was somehow pruned, repair it once more.
if [ ! -x "${HOST_BUNDLE_PATH}/zksign/witness_generator" ]; then
@ -164,6 +251,8 @@ POL_PROOF_DEV_MODE=true \
NOMOS_TESTNET_IMAGE="${IMAGE}" \
NOMOS_CIRCUITS="${HOST_BUNDLE_PATH}" \
NOMOS_KZGRS_PARAMS_PATH="${KZG_PATH}" \
${DEMO_VALIDATORS:+NOMOS_DEMO_VALIDATORS="${DEMO_VALIDATORS}"} \
${DEMO_EXECUTORS:+NOMOS_DEMO_EXECUTORS="${DEMO_EXECUTORS}"} \
COMPOSE_DEMO_RUN_SECS="${RUN_SECS}" \
LOCAL_DEMO_RUN_SECS="${RUN_SECS}" \
K8S_DEMO_RUN_SECS="${RUN_SECS}" \

View File

@ -1,60 +0,0 @@
# Docker Compose Testnet for Nomos
The Nomos Docker Compose Testnet contains four distinct service types:
- **Nomos Node Services**: Multiple dynamically spawned Nomos nodes that synchronize their configuration via the cfgsync utility.
## Building
Upon making modifications to the codebase or the Dockerfile, the Nomos images must be rebuilt:
```bash
docker compose build
```
## Configuring
Configuration of the Docker testnet is accomplished using the `.env` file. An example configuration can be found in `.env.example`.
To adjust the count of Nomos nodes, modify the variable:
```bash
DOCKER_COMPOSE_LIBP2P_REPLICAS=100
```
## Running
Initiate the testnet by executing the following command:
```bash
docker compose up
```
This command merges all output logs and displays them on stdout. For a more refined output, it's recommended to first run:
```bash
docker compose up -d
```
Followed by:
```bash
docker compose logs -f nomos-node
```
## Using testnet
The bootstrap node is accessible from the host via ports `3000` and `18080`. To expose other Nomos nodes, please update the `nomos-node` service in the `compose.yml` file with this configuration:
```bash
nomos-node-0:
ports:
- "3001-3010:3000" # Use range depending on the number of nomos node replicas.
- "18081-18190:18080"
```
After running `docker compose up`, the randomly assigned ports can be viewed with the `ps` command:
```bash
docker compose ps
```

View File

@ -0,0 +1,102 @@
use std::{num::NonZeroU64, path::PathBuf, time::Duration};
use blend_serde::Config as BlendUserConfig;
use key_management_system_service::keys::{Key, ZkKey};
use nomos_blend_service::{
core::settings::{CoverTrafficSettings, MessageDelayerSettings, SchedulerSettings, ZkSettings},
settings::TimingSettings,
};
use nomos_node::config::{
blend::{
deployment::{self as blend_deployment, Settings as BlendDeploymentSettings},
serde as blend_serde,
},
network::deployment::Settings as NetworkDeploymentSettings,
};
use nomos_utils::math::NonNegativeF64;
use crate::{
common::kms::key_id_for_preload_backend,
topology::configs::blend::GeneralBlendConfig as TopologyBlendConfig,
};
/// Builds the blend service configuration for a test node from the topology's
/// blend settings.
///
/// Returns, in order: the user-facing serde config, the blend deployment
/// settings, and the network deployment settings (identify/kademlia protocol
/// names). The fixed numeric values are test-deployment defaults.
pub(crate) fn build_blend_service_config(
config: &TopologyBlendConfig,
) -> (
BlendUserConfig,
BlendDeploymentSettings,
NetworkDeploymentSettings,
) {
// Register the node's ZK secret key with the preload KMS backend and keep
// only its key id in the config.
let zk_key_id =
key_id_for_preload_backend(&Key::from(ZkKey::new(config.secret_zk_key.clone())));
let backend_core = &config.backend_core;
let backend_edge = &config.backend_edge;
let user = BlendUserConfig {
common: blend_serde::common::Config {
non_ephemeral_signing_key: config.private_key.clone(),
// Disable on-disk recovery in tests to avoid serde issues on replays.
recovery_path_prefix: PathBuf::new(),
},
core: blend_serde::core::Config {
backend: blend_serde::core::BackendConfig {
listening_address: backend_core.listening_address.clone(),
core_peering_degree: backend_core.core_peering_degree.clone(),
edge_node_connection_timeout: backend_core.edge_node_connection_timeout,
max_edge_node_incoming_connections: backend_core.max_edge_node_incoming_connections,
max_dial_attempts_per_peer: backend_core.max_dial_attempts_per_peer,
},
zk: ZkSettings {
secret_key_kms_id: zk_key_id,
},
},
edge: blend_serde::edge::Config {
backend: blend_serde::edge::BackendConfig {
max_dial_attempts_per_peer_per_message: backend_edge
.max_dial_attempts_per_peer_per_message,
replication_factor: backend_edge.replication_factor,
},
},
};
// Hard-coded layer counts and timing values are single-node-friendly test
// defaults; production values presumably differ — confirm before reuse.
let deployment_settings = BlendDeploymentSettings {
common: blend_deployment::CommonSettings {
num_blend_layers: NonZeroU64::try_from(1).unwrap(),
minimum_network_size: NonZeroU64::try_from(1).unwrap(),
timing: TimingSettings {
round_duration: Duration::from_secs(1),
rounds_per_interval: NonZeroU64::try_from(30u64).unwrap(),
rounds_per_session: NonZeroU64::try_from(648_000u64).unwrap(),
rounds_per_observation_window: NonZeroU64::try_from(30u64).unwrap(),
rounds_per_session_transition_period: NonZeroU64::try_from(30u64).unwrap(),
epoch_transition_period_in_slots: NonZeroU64::try_from(2_600).unwrap(),
},
protocol_name: backend_core.protocol_name.clone(),
},
core: blend_deployment::CoreSettings {
scheduler: SchedulerSettings {
cover: CoverTrafficSettings {
intervals_for_safety_buffer: 100,
message_frequency_per_round: NonNegativeF64::try_from(1f64).unwrap(),
},
delayer: MessageDelayerSettings {
maximum_release_delay_in_rounds: NonZeroU64::try_from(3u64).unwrap(),
},
},
minimum_messages_coefficient: backend_core.minimum_messages_coefficient,
normalization_constant: backend_core.normalization_constant,
},
};
// Integration-test-specific libp2p protocol names shared by every node.
let network_deployment = NetworkDeploymentSettings {
identify_protocol_name: nomos_libp2p::protocol_name::StreamProtocol::new(
"/integration/nomos/identify/1.0.0",
),
kademlia_protocol_name: nomos_libp2p::protocol_name::StreamProtocol::new(
"/integration/nomos/kad/1.0.0",
),
};
(user, deployment_settings, network_deployment)
}

View File

@ -0,0 +1,88 @@
use std::{collections::HashSet, num::NonZeroUsize, path::PathBuf, time::Duration};
use chain_leader::LeaderConfig as ChainLeaderConfig;
use chain_network::{BootstrapConfig as ChainBootstrapConfig, OrphanConfig, SyncConfig};
use chain_service::StartingState;
use nomos_node::config::{
cryptarchia::{
deployment::{SdpConfig as DeploymentSdpConfig, Settings as CryptarchiaDeploymentSettings},
serde::{
Config as CryptarchiaConfig, LeaderConfig as CryptarchiaLeaderConfig,
NetworkConfig as CryptarchiaNetworkConfig, ServiceConfig as CryptarchiaServiceConfig,
},
},
mempool::deployment::Settings as MempoolDeploymentSettings,
time::deployment::Settings as TimeDeploymentSettings,
};
use crate::topology::configs::GeneralConfig;
/// Derives the cryptarchia deployment settings from the topology's general
/// config: epoch and consensus parameters plus the SDP service params and
/// minimum stake, with a fixed gossipsub protocol id.
pub(crate) fn cryptarchia_deployment(config: &GeneralConfig) -> CryptarchiaDeploymentSettings {
CryptarchiaDeploymentSettings {
epoch_config: config.consensus_config.ledger_config.epoch_config,
consensus_config: config.consensus_config.ledger_config.consensus_config,
sdp_config: DeploymentSdpConfig {
service_params: config
.consensus_config
.ledger_config
.sdp_config
.service_params
.clone(),
min_stake: config.consensus_config.ledger_config.sdp_config.min_stake,
},
// Gossipsub protocol id shared by every node in the deployment.
gossipsub_protocol: "/cryptarchia/proto".to_owned(),
}
}
/// Builds the time-service deployment settings, carrying over the slot
/// duration from the general config.
pub(crate) fn time_deployment(config: &GeneralConfig) -> TimeDeploymentSettings {
    let slot_duration = config.time_config.slot_duration;
    TimeDeploymentSettings { slot_duration }
}
/// Builds the mempool deployment settings; every test node publishes on the
/// "mantle" pubsub topic.
pub(crate) fn mempool_deployment() -> MempoolDeploymentSettings {
    let pubsub_topic = String::from("mantle");
    MempoolDeploymentSettings { pubsub_topic }
}
/// Builds the per-node cryptarchia service config (service, network, and
/// leader sections) from the topology's general config. The fixed durations
/// and cache sizes below are test-deployment defaults.
pub(crate) fn cryptarchia_config(config: &GeneralConfig) -> CryptarchiaConfig {
CryptarchiaConfig {
service: CryptarchiaServiceConfig {
starting_state: StartingState::Genesis {
genesis_tx: config.consensus_config.genesis_tx.clone(),
},
// Disable on-disk recovery in compose tests to avoid serde errors on
// non-string keys and keep services alive.
recovery_file: PathBuf::new(),
bootstrap: chain_service::BootstrapConfig {
prolonged_bootstrap_period: config.bootstrapping_config.prolonged_bootstrap_period,
force_bootstrap: false,
offline_grace_period: chain_service::OfflineGracePeriodConfig {
grace_period: Duration::from_secs(20 * 60),
state_recording_interval: Duration::from_secs(60),
},
},
},
network: CryptarchiaNetworkConfig {
bootstrap: ChainBootstrapConfig {
// No initial-block-download peers: test nodes start from genesis.
ibd: chain_network::IbdConfig {
peers: HashSet::new(),
delay_before_new_download: Duration::from_secs(10),
},
},
sync: SyncConfig {
orphan: OrphanConfig {
max_orphan_cache_size: NonZeroUsize::new(5)
.expect("Max orphan cache size must be non-zero"),
},
},
},
leader: CryptarchiaLeaderConfig {
leader: ChainLeaderConfig {
pk: config.consensus_config.leader_config.pk,
sk: config.consensus_config.leader_config.sk.clone(),
},
},
}
}

View File

@ -1,18 +1,5 @@
use std::{
collections::HashSet,
num::{NonZeroU64, NonZeroUsize},
path::PathBuf,
time::Duration,
};
use std::{collections::HashSet, path::PathBuf, time::Duration};
use chain_leader::LeaderConfig as ChainLeaderConfig;
use chain_network::{BootstrapConfig as ChainBootstrapConfig, OrphanConfig, SyncConfig};
use chain_service::StartingState;
use key_management_system_service::keys::{Key, ZkKey};
use nomos_blend_service::{
core::settings::{CoverTrafficSettings, MessageDelayerSettings, SchedulerSettings, ZkSettings},
settings::TimingSettings,
};
use nomos_da_dispersal::{
DispersalServiceSettings,
backend::kzgrs::{DispersalKZGRSBackendSettings, EncoderSettings},
@ -39,26 +26,8 @@ use nomos_node::{
RocksBackendSettings,
api::backend::AxumBackendSettings as NodeAxumBackendSettings,
config::{
blend::{
deployment::{self as blend_deployment},
serde as blend_serde,
},
cryptarchia::{
deployment::{
SdpConfig as DeploymentSdpConfig, Settings as CryptarchiaDeploymentSettings,
},
serde::{
Config as CryptarchiaConfig, LeaderConfig as CryptarchiaLeaderConfig,
NetworkConfig as CryptarchiaNetworkConfig,
ServiceConfig as CryptarchiaServiceConfig,
},
},
deployment::DeploymentSettings,
mempool::{
deployment::Settings as MempoolDeploymentSettings, serde::Config as MempoolConfig,
},
network::deployment::Settings as NetworkDeploymentSettings,
time::{deployment::Settings as TimeDeploymentSettings, serde::Config as TimeConfig},
deployment::DeploymentSettings, mempool::serde::Config as MempoolConfig,
time::serde::Config as TimeConfig,
},
};
use nomos_sdp::SdpSettings;
@ -68,88 +37,33 @@ use nomos_wallet::WalletServiceSettings;
use crate::{
adjust_timeout,
common::kms::key_id_for_preload_backend,
topology::configs::{
GeneralConfig, blend::GeneralBlendConfig as TopologyBlendConfig, wallet::WalletAccount,
nodes::{
blend::build_blend_service_config,
common::{cryptarchia_config, cryptarchia_deployment, mempool_deployment, time_deployment},
},
topology::configs::{GeneralConfig, wallet::WalletAccount},
};
#[must_use]
#[expect(clippy::too_many_lines, reason = "TODO: Address this at some point.")]
pub fn create_executor_config(config: GeneralConfig) -> ExecutorConfig {
let network_config = config.network_config.clone();
let (blend_user_config, blend_deployment, network_deployment) =
build_blend_service_config(&config.blend_config);
let cryptarchia_deployment = CryptarchiaDeploymentSettings {
epoch_config: config.consensus_config.ledger_config.epoch_config,
consensus_config: config.consensus_config.ledger_config.consensus_config,
sdp_config: DeploymentSdpConfig {
service_params: config
.consensus_config
.ledger_config
.sdp_config
.service_params
.clone(),
min_stake: config.consensus_config.ledger_config.sdp_config.min_stake,
},
gossipsub_protocol: "/cryptarchia/proto".to_owned(),
};
let time_deployment = TimeDeploymentSettings {
slot_duration: config.time_config.slot_duration,
};
let mempool_deployment = MempoolDeploymentSettings {
pubsub_topic: "mantle".to_owned(),
};
let deployment_settings = DeploymentSettings::new_custom(
blend_deployment,
network_deployment,
cryptarchia_deployment,
time_deployment,
mempool_deployment,
cryptarchia_deployment(&config),
time_deployment(&config),
mempool_deployment(),
);
ExecutorConfig {
network: config.network_config,
network: network_config,
blend: blend_user_config,
deployment: deployment_settings,
cryptarchia: CryptarchiaConfig {
service: CryptarchiaServiceConfig {
starting_state: StartingState::Genesis {
genesis_tx: config.consensus_config.genesis_tx,
},
// Disable on-disk recovery in compose tests to avoid serde errors on
// non-string keys and keep services alive.
recovery_file: PathBuf::new(),
bootstrap: chain_service::BootstrapConfig {
prolonged_bootstrap_period: config
.bootstrapping_config
.prolonged_bootstrap_period,
force_bootstrap: false,
offline_grace_period: chain_service::OfflineGracePeriodConfig {
grace_period: Duration::from_secs(20 * 60),
state_recording_interval: Duration::from_secs(60),
},
},
},
network: CryptarchiaNetworkConfig {
bootstrap: ChainBootstrapConfig {
ibd: chain_network::IbdConfig {
peers: HashSet::new(),
delay_before_new_download: Duration::from_secs(10),
},
},
sync: SyncConfig {
orphan: OrphanConfig {
max_orphan_cache_size: NonZeroUsize::new(5)
.expect("Max orphan cache size must be non-zero"),
},
},
},
leader: CryptarchiaLeaderConfig {
leader: ChainLeaderConfig {
pk: config.consensus_config.leader_config.pk,
sk: config.consensus_config.leader_config.sk.clone(),
},
},
},
cryptarchia: cryptarchia_config(&config),
da_network: DaNetworkConfig {
backend: DaNetworkExecutorBackendSettings {
validator_settings: DaNetworkBackendSettings {
@ -277,84 +191,3 @@ pub fn create_executor_config(config: GeneralConfig) -> ExecutorConfig {
},
}
}
/// Builds the blend service configuration for a test node: the user-facing
/// serde config, the blend deployment settings, and the network deployment
/// settings (identify/kademlia protocol names), returned as a tuple in that
/// order. Fixed numeric values are test-deployment defaults.
fn build_blend_service_config(
config: &TopologyBlendConfig,
) -> (
blend_serde::Config,
blend_deployment::Settings,
NetworkDeploymentSettings,
) {
// Register the node's ZK secret key with the preload KMS backend and keep
// only its key id in the config.
let zk_key_id =
key_id_for_preload_backend(&Key::from(ZkKey::new(config.secret_zk_key.clone())));
let backend_core = &config.backend_core;
let backend_edge = &config.backend_edge;
let user = blend_serde::Config {
common: blend_serde::common::Config {
non_ephemeral_signing_key: config.private_key.clone(),
// Disable on-disk recovery in tests to avoid serde issues on replays.
recovery_path_prefix: PathBuf::new(),
},
core: blend_serde::core::Config {
backend: blend_serde::core::BackendConfig {
listening_address: backend_core.listening_address.clone(),
core_peering_degree: backend_core.core_peering_degree.clone(),
edge_node_connection_timeout: backend_core.edge_node_connection_timeout,
max_edge_node_incoming_connections: backend_core.max_edge_node_incoming_connections,
max_dial_attempts_per_peer: backend_core.max_dial_attempts_per_peer,
},
zk: ZkSettings {
secret_key_kms_id: zk_key_id,
},
},
edge: blend_serde::edge::Config {
backend: blend_serde::edge::BackendConfig {
max_dial_attempts_per_peer_per_message: backend_edge
.max_dial_attempts_per_peer_per_message,
replication_factor: backend_edge.replication_factor,
},
},
};
// Hard-coded layer counts and timing values are single-node-friendly test
// defaults.
let deployment_settings = blend_deployment::Settings {
common: blend_deployment::CommonSettings {
num_blend_layers: NonZeroU64::try_from(1).unwrap(),
minimum_network_size: NonZeroU64::try_from(1).unwrap(),
timing: TimingSettings {
round_duration: Duration::from_secs(1),
rounds_per_interval: NonZeroU64::try_from(30u64).unwrap(),
rounds_per_session: NonZeroU64::try_from(648_000u64).unwrap(),
rounds_per_observation_window: NonZeroU64::try_from(30u64).unwrap(),
rounds_per_session_transition_period: NonZeroU64::try_from(30u64).unwrap(),
epoch_transition_period_in_slots: NonZeroU64::try_from(2_600).unwrap(),
},
protocol_name: backend_core.protocol_name.clone(),
},
core: blend_deployment::CoreSettings {
scheduler: SchedulerSettings {
cover: CoverTrafficSettings {
intervals_for_safety_buffer: 100,
message_frequency_per_round: NonNegativeF64::try_from(1f64).unwrap(),
},
delayer: MessageDelayerSettings {
maximum_release_delay_in_rounds: NonZeroU64::try_from(3u64).unwrap(),
},
},
minimum_messages_coefficient: backend_core.minimum_messages_coefficient,
normalization_constant: backend_core.normalization_constant,
},
};
// Integration-test-specific libp2p protocol names shared by every node.
let network_deployment = NetworkDeploymentSettings {
identify_protocol_name: nomos_libp2p::protocol_name::StreamProtocol::new(
"/integration/nomos/identify/1.0.0",
),
kademlia_protocol_name: nomos_libp2p::protocol_name::StreamProtocol::new(
"/integration/nomos/kad/1.0.0",
),
};
(user, deployment_settings, network_deployment)
}

View File

@ -1,2 +1,4 @@
pub(crate) mod blend;
pub(crate) mod common;
pub mod executor;
pub mod validator;

View File

@ -1,18 +1,5 @@
use std::{
collections::HashSet,
num::{NonZeroU64, NonZeroUsize},
path::PathBuf,
time::Duration,
};
use std::{collections::HashSet, path::PathBuf, time::Duration};
use chain_leader::LeaderConfig as ChainLeaderConfig;
use chain_network::{BootstrapConfig as ChainBootstrapConfig, OrphanConfig, SyncConfig};
use chain_service::StartingState;
use key_management_system_service::keys::{Key, ZkKey};
use nomos_blend_service::{
core::settings::{CoverTrafficSettings, MessageDelayerSettings, SchedulerSettings, ZkSettings},
settings::TimingSettings,
};
use nomos_da_network_core::{
protocols::sampling::SubnetsConfig, swarm::DAConnectionPolicySettings,
};
@ -33,26 +20,8 @@ use nomos_node::{
Config as ValidatorConfig, RocksBackendSettings,
api::backend::AxumBackendSettings as NodeAxumBackendSettings,
config::{
blend::{
deployment::{self as blend_deployment},
serde as blend_serde,
},
cryptarchia::{
deployment::{
SdpConfig as DeploymentSdpConfig, Settings as CryptarchiaDeploymentSettings,
},
serde::{
Config as CryptarchiaConfig, LeaderConfig as CryptarchiaLeaderConfig,
NetworkConfig as CryptarchiaNetworkConfig,
ServiceConfig as CryptarchiaServiceConfig,
},
},
deployment::DeploymentSettings,
mempool::{
deployment::Settings as MempoolDeploymentSettings, serde::Config as MempoolConfig,
},
network::deployment::Settings as NetworkDeploymentSettings,
time::{deployment::Settings as TimeDeploymentSettings, serde::Config as TimeConfig},
deployment::DeploymentSettings, mempool::serde::Config as MempoolConfig,
time::serde::Config as TimeConfig,
},
};
use nomos_sdp::SdpSettings;
@ -62,10 +31,11 @@ use nomos_wallet::WalletServiceSettings;
use crate::{
adjust_timeout,
common::kms::key_id_for_preload_backend,
topology::configs::{
GeneralConfig, blend::GeneralBlendConfig as TopologyBlendConfig, wallet::WalletAccount,
nodes::{
blend::build_blend_service_config,
common::{cryptarchia_config, cryptarchia_deployment, mempool_deployment, time_deployment},
},
topology::configs::{GeneralConfig, wallet::WalletAccount},
};
#[must_use]
@ -74,80 +44,22 @@ use crate::{
reason = "Validator config wiring aggregates many service settings"
)]
pub fn create_validator_config(config: GeneralConfig) -> ValidatorConfig {
let da_policy_settings = config.da_config.policy_settings;
let da_policy_settings = config.da_config.policy_settings.clone();
let network_config = config.network_config.clone();
let (blend_user_config, blend_deployment, network_deployment) =
build_blend_service_config(&config.blend_config);
let cryptarchia_deployment = CryptarchiaDeploymentSettings {
epoch_config: config.consensus_config.ledger_config.epoch_config,
consensus_config: config.consensus_config.ledger_config.consensus_config,
sdp_config: DeploymentSdpConfig {
service_params: config
.consensus_config
.ledger_config
.sdp_config
.service_params
.clone(),
min_stake: config.consensus_config.ledger_config.sdp_config.min_stake,
},
gossipsub_protocol: "/cryptarchia/proto".to_owned(),
};
let time_deployment = TimeDeploymentSettings {
slot_duration: config.time_config.slot_duration,
};
let mempool_deployment = MempoolDeploymentSettings {
pubsub_topic: "mantle".to_owned(),
};
let deployment_settings = DeploymentSettings::new_custom(
blend_deployment,
network_deployment,
cryptarchia_deployment,
time_deployment,
mempool_deployment,
cryptarchia_deployment(&config),
time_deployment(&config),
mempool_deployment(),
);
ValidatorConfig {
network: config.network_config,
network: network_config,
blend: blend_user_config,
deployment: deployment_settings,
cryptarchia: CryptarchiaConfig {
service: CryptarchiaServiceConfig {
starting_state: StartingState::Genesis {
genesis_tx: config.consensus_config.genesis_tx,
},
// Disable on-disk recovery in compose tests to avoid serde errors on
// non-string keys and keep services alive.
recovery_file: PathBuf::new(),
bootstrap: chain_service::BootstrapConfig {
prolonged_bootstrap_period: config
.bootstrapping_config
.prolonged_bootstrap_period,
force_bootstrap: false,
offline_grace_period: chain_service::OfflineGracePeriodConfig {
grace_period: Duration::from_secs(20 * 60),
state_recording_interval: Duration::from_secs(60),
},
},
},
network: CryptarchiaNetworkConfig {
bootstrap: ChainBootstrapConfig {
ibd: chain_network::IbdConfig {
peers: HashSet::new(),
delay_before_new_download: Duration::from_secs(10),
},
},
sync: SyncConfig {
orphan: OrphanConfig {
max_orphan_cache_size: NonZeroUsize::new(5)
.expect("Max orphan cache size must be non-zero"),
},
},
},
leader: CryptarchiaLeaderConfig {
leader: ChainLeaderConfig {
pk: config.consensus_config.leader_config.pk,
sk: config.consensus_config.leader_config.sk.clone(),
},
},
},
cryptarchia: cryptarchia_config(&config),
da_network: DaNetworkConfig {
backend: DaNetworkBackendSettings {
node_key: config.da_config.node_key,
@ -266,84 +178,3 @@ pub fn create_validator_config(config: GeneralConfig) -> ValidatorConfig {
},
}
}
/// Assemble the Blend service configuration triple for a test topology.
///
/// From the topology-level blend settings this derives:
/// 1. the serde-level user config (`blend_serde::Config`),
/// 2. the blend deployment settings (`blend_deployment::Settings`),
/// 3. the network deployment settings (identify/kademlia protocol names).
fn build_blend_service_config(
    config: &TopologyBlendConfig,
) -> (
    blend_serde::Config,
    blend_deployment::Settings,
    NetworkDeploymentSettings,
) {
    let core = &config.backend_core;
    let edge = &config.backend_edge;

    // Register the ZK secret key with the preload KMS backend and keep its id
    // for the core ZK settings below.
    let zk_secret_key_id =
        key_id_for_preload_backend(&Key::from(ZkKey::new(config.secret_zk_key.clone())));

    let common = blend_serde::common::Config {
        non_ephemeral_signing_key: config.private_key.clone(),
        // Disable on-disk recovery in tests to avoid serde issues on replays.
        recovery_path_prefix: PathBuf::new(),
    };

    let core_backend = blend_serde::core::BackendConfig {
        listening_address: core.listening_address.clone(),
        core_peering_degree: core.core_peering_degree.clone(),
        edge_node_connection_timeout: core.edge_node_connection_timeout,
        max_edge_node_incoming_connections: core.max_edge_node_incoming_connections,
        max_dial_attempts_per_peer: core.max_dial_attempts_per_peer,
    };

    let user_config = blend_serde::Config {
        common,
        core: blend_serde::core::Config {
            backend: core_backend,
            zk: ZkSettings {
                secret_key_kms_id: zk_secret_key_id,
            },
        },
        edge: blend_serde::edge::Config {
            backend: blend_serde::edge::BackendConfig {
                max_dial_attempts_per_peer_per_message: edge
                    .max_dial_attempts_per_peer_per_message,
                replication_factor: edge.replication_factor,
            },
        },
    };

    // Fixed test-topology timing: 1s rounds, session/window sizes as used by
    // the integration setup.
    let timing = TimingSettings {
        round_duration: Duration::from_secs(1),
        rounds_per_interval: NonZeroU64::try_from(30u64).unwrap(),
        rounds_per_session: NonZeroU64::try_from(648_000u64).unwrap(),
        rounds_per_observation_window: NonZeroU64::try_from(30u64).unwrap(),
        rounds_per_session_transition_period: NonZeroU64::try_from(30u64).unwrap(),
        epoch_transition_period_in_slots: NonZeroU64::try_from(2_600).unwrap(),
    };

    let deployment = blend_deployment::Settings {
        common: blend_deployment::CommonSettings {
            num_blend_layers: NonZeroU64::try_from(1).unwrap(),
            minimum_network_size: NonZeroU64::try_from(1).unwrap(),
            timing,
            protocol_name: core.protocol_name.clone(),
        },
        core: blend_deployment::CoreSettings {
            scheduler: SchedulerSettings {
                cover: CoverTrafficSettings {
                    intervals_for_safety_buffer: 100,
                    message_frequency_per_round: NonNegativeF64::try_from(1f64).unwrap(),
                },
                delayer: MessageDelayerSettings {
                    maximum_release_delay_in_rounds: NonZeroU64::try_from(3u64).unwrap(),
                },
            },
            minimum_messages_coefficient: core.minimum_messages_coefficient,
            normalization_constant: core.normalization_constant,
        },
    };

    // Integration-suite protocol names for identify and kademlia.
    let network = NetworkDeploymentSettings {
        identify_protocol_name: nomos_libp2p::protocol_name::StreamProtocol::new(
            "/integration/nomos/identify/1.0.0",
        ),
        kademlia_protocol_name: nomos_libp2p::protocol_name::StreamProtocol::new(
            "/integration/nomos/kad/1.0.0",
        ),
    };

    (user_config, deployment, network)
}

View File

@ -24,5 +24,6 @@ impl RequiresNodeControl for NodeControlCapability {
/// Async handle for controlling individual nodes of a running topology.
///
/// NOTE(review): implementations are runner-specific (e.g. compose, k8s);
/// whether "restart" means in-place restart or full recreation depends on
/// the backend — confirm per implementation.
#[async_trait]
pub trait NodeControlHandle: Send + Sync {
    /// Restart the validator identified by `index`.
    ///
    /// # Errors
    /// Returns an error if the underlying runner fails to restart the node.
    async fn restart_validator(&self, index: usize) -> Result<(), DynError>;

    /// Restart the executor identified by `index`.
    ///
    /// # Errors
    /// Returns an error if the underlying runner fails to restart the node.
    async fn restart_executor(&self, index: usize) -> Result<(), DynError>;
}

View File

@ -35,13 +35,16 @@ non_zero_rate_fn!(blob_rate_checked, "blob rate must be non-zero");
pub trait ScenarioBuilderExt<Caps>: Sized {
/// Configure a transaction flow workload.
fn transactions(self) -> TransactionFlowBuilder<Caps>;
/// Configure a transaction flow workload via closure.
fn transactions_with(
self,
f: impl FnOnce(TransactionFlowBuilder<Caps>) -> TransactionFlowBuilder<Caps>,
) -> CoreScenarioBuilder<Caps>;
/// Configure a data-availability workload.
fn da(self) -> DataAvailabilityFlowBuilder<Caps>;
/// Configure a data-availability workload via closure.
fn da_with(
self,
@ -50,6 +53,7 @@ pub trait ScenarioBuilderExt<Caps>: Sized {
#[must_use]
/// Attach a consensus liveness expectation.
fn expect_consensus_liveness(self) -> Self;
#[must_use]
/// Seed deterministic wallets with total funds split across `users`.
fn initialize_wallet(self, total_funds: u64, users: usize) -> Self;
@ -209,6 +213,7 @@ impl<Caps> DataAvailabilityFlowBuilder<Caps> {
pub trait ChaosBuilderExt: Sized {
/// Entry point into chaos workloads.
fn chaos(self) -> ChaosBuilder;
/// Configure chaos via closure.
fn chaos_with(
self,