chore: consolidate nodes

andrussal 2026-01-20 12:50:40 +01:00
parent 5304c3e808
commit 67aaab0a2c
132 changed files with 2091 additions and 4153 deletions

View File

@ -31,7 +31,6 @@ allow = [
"BSD-2-Clause",
"BSD-3-Clause",
"BSL-1.0",
"BlueOak-1.0.0",
"CC0-1.0",
"CDLA-Permissive-2.0",
"ISC",

Cargo.lock (generated, 3030 lines changed)

File diff suppressed because it is too large.

View File

@ -50,12 +50,10 @@ chain-service = { package = "logos-blockchain-chain-service", default-features =
common-http-client = { package = "logos-blockchain-common-http-client", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
cryptarchia-engine = { package = "logos-blockchain-cryptarchia-engine", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
cryptarchia-sync = { package = "logos-blockchain-cryptarchia-sync", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
executor-http-client = { package = "logos-blockchain-executor-http-client", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
groth16 = { package = "logos-blockchain-groth16", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
key-management-system-service = { package = "logos-blockchain-key-management-system-service", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
kzgrs = { package = "logos-blockchain-kzgrs", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
kzgrs-backend = { package = "logos-blockchain-kzgrs-backend", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
logos-blockchain-executor = { package = "logos-blockchain-executor", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
nomos-api = { package = "logos-blockchain-api-service", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
nomos-blend-message = { package = "logos-blockchain-blend-message", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }
nomos-blend-service = { package = "logos-blockchain-blend-service", default-features = false, git = "https://github.com/logos-co/nomos-node.git", rev = "97b411ed0ce269e72a6253c8cd48eea41db5ab71" }

View File

@ -101,7 +101,7 @@ cd book && mdbook serve
cargo test
# Run integration examples
scripts/run/run-examples.sh -t 60 -v 2 -e 1 host
scripts/run/run-examples.sh -t 60 -n 3 host
```
### Creating Prebuilt Bundles
@ -125,8 +125,7 @@ Key environment variables for customization:
|----------|---------|---------|
| `POL_PROOF_DEV_MODE=true` | **Required** — Disable expensive proof generation (set automatically by `scripts/run/run-examples.sh`) | (none) |
| `NOMOS_TESTNET_IMAGE` | Docker image tag for compose/k8s | `logos-blockchain-testing:local` |
| `NOMOS_DEMO_VALIDATORS` | Number of validator nodes | Varies by example |
| `NOMOS_DEMO_EXECUTORS` | Number of executor nodes | Varies by example |
| `NOMOS_DEMO_NODES` | Number of nodes | Varies by example |
| `NOMOS_LOG_DIR` | Directory for persistent log files | (temporary) |
| `NOMOS_LOG_LEVEL` | Logging verbosity | `info` |
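
For reference, a minimal invocation against the consolidated interface might look like this (node count, duration, and log level below are illustrative):

```bash
# Run the host-mode example with 3 nodes for 60 seconds.
# The script exports POL_PROOF_DEV_MODE=true and NOMOS_DEMO_NODES itself.
NOMOS_LOG_LEVEL=debug scripts/run/run-examples.sh -t 60 -n 3 host
```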

View File

@ -2,7 +2,7 @@ Feature: Testing Framework - Auto Local/Compose Deployer
Scenario: Run auto deployer smoke scenario (tx + liveness)
Given we have a CLI deployer specified
And topology has 1 validators and 1 executors
And topology has 2 nodes
And run duration is 60 seconds
And wallets total funds is 1000000000 split across 50 users
And transactions rate is 1 per block
@ -13,7 +13,7 @@ Feature: Testing Framework - Auto Local/Compose Deployer
# Note: This test may fail on slow computers
Scenario: Run auto deployer stress smoke scenario (tx + liveness)
Given we have a CLI deployer specified
And topology has 3 validators and 3 executors
And topology has 6 nodes
And run duration is 120 seconds
And wallets total funds is 1000000000 split across 500 users
And transactions rate is 10 per block
@ -23,7 +23,7 @@ Feature: Testing Framework - Auto Local/Compose Deployer
Scenario: Run auto deployer stress smoke scenario no liveness (tx)
Given we have a CLI deployer specified
And topology has 3 validators and 3 executors
And topology has 6 nodes
And run duration is 120 seconds
And wallets total funds is 1000000000 split across 500 users
And transactions rate is 10 per block

View File

@ -3,7 +3,7 @@ Feature: Testing Framework - Compose Runner
Scenario: Run a compose smoke scenario (tx + liveness)
Given deployer is "compose"
And topology has 1 validators and 1 executors
And topology has 2 nodes
And wallets total funds is 1000 split across 10 users
And run duration is 60 seconds
And transactions rate is 1 per block

View File

@ -3,7 +3,7 @@ Feature: Testing Framework - Local Runner
Scenario: Run a local smoke scenario (tx + liveness)
Given deployer is "local"
And topology has 1 validators and 1 executors
And topology has 2 nodes
And run duration is 60 seconds
And wallets total funds is 1000000000 split across 50 users
And transactions rate is 1 per block

View File

@ -6,7 +6,7 @@ use testing_framework_workflows::ScenarioBuilderExt;
use crate::SnippetResult;
pub fn scenario_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
ScenarioBuilder::topology_with(|t| t.network_star().nodes(5))
.wallets(50)
.transactions_with(|txs| txs.rate(5).users(20))
.expect_consensus_liveness()

View File

@ -6,14 +6,13 @@ use testing_framework_workflows::{ScenarioBuilderExt, workloads::chaos::RandomRe
use crate::SnippetResult;
pub fn random_restart_plan() -> SnippetResult<Scenario<NodeControlCapability>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(1))
ScenarioBuilder::topology_with(|t| t.network_star().nodes(3))
.enable_node_control()
.with_workload(RandomRestartWorkload::new(
Duration::from_secs(45), // min delay
Duration::from_secs(75), // max delay
Duration::from_secs(120), // target cooldown
true, // include validators
true, // include executors
true, // include nodes
))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(150))

View File

@ -18,8 +18,8 @@ impl Expectation for ReachabilityExpectation {
}
async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> {
let validators = ctx.node_clients().validator_clients();
let client = validators.get(self.target_idx).ok_or_else(|| {
let clients = ctx.node_clients().node_clients();
let client = clients.get(self.target_idx).ok_or_else(|| {
Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"missing target client",

View File

@ -33,18 +33,18 @@ impl Workload for ReachabilityWorkload {
topology: &GeneratedTopology,
_run_metrics: &RunMetrics,
) -> Result<(), DynError> {
if topology.validators().get(self.target_idx).is_none() {
if topology.nodes().get(self.target_idx).is_none() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"no validator at requested index",
"no node at requested index",
)));
}
Ok(())
}
async fn start(&self, ctx: &RunContext) -> Result<(), DynError> {
let validators = ctx.node_clients().validator_clients();
let client = validators.get(self.target_idx).ok_or_else(|| {
let clients = ctx.node_clients().node_clients();
let client = clients.get(self.target_idx).ok_or_else(|| {
Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"missing target client",

View File

@ -4,5 +4,5 @@ use testing_framework_workflows::ScenarioBuilderExt;
use crate::SnippetResult;
pub fn build_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0)).build() // Construct the final Scenario
ScenarioBuilder::topology_with(|t| t.network_star().nodes(1)).build() // Construct the final Scenario
}
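
A one-node plan like this can be handed straight to a deployer; a minimal sketch, assuming the `Deployer`/`LocalDeployer` APIs used elsewhere in this commit:

```rust
use anyhow::Result;
use testing_framework_core::scenario::{Deployer, ScenarioBuilder};
use testing_framework_runner_local::LocalDeployer;

pub async fn deploy_minimal() -> Result<()> {
    // Smallest possible plan: one node in a star topology.
    let plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(1)).build()?;
    // Provision local processes for the plan.
    let deployer = LocalDeployer::default();
    let _runner = deployer.deploy(&plan).await?;
    Ok(())
}
```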

View File

@ -6,7 +6,7 @@ use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
pub async fn run_test() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(5))
.wallets(50)
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block

View File

@ -4,7 +4,7 @@ use testing_framework_workflows::ScenarioBuilderExt;
use crate::SnippetResult;
pub fn expectations_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
ScenarioBuilder::topology_with(|t| t.network_star().nodes(1))
.expect_consensus_liveness() // Assert blocks are produced continuously
.build()
}

View File

@ -6,7 +6,7 @@ use testing_framework_workflows::ScenarioBuilderExt;
use crate::SnippetResult;
pub fn run_duration_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
ScenarioBuilder::topology_with(|t| t.network_star().nodes(1))
.with_run_duration(Duration::from_secs(120)) // Run for 120 seconds
.build()
}

View File

@ -3,7 +3,6 @@ use testing_framework_core::scenario::{Builder, ScenarioBuilder};
pub fn topology() -> Builder<()> {
ScenarioBuilder::topology_with(|t| {
t.network_star() // Star topology (all connect to seed node)
.validators(3) // Number of validator nodes
.executors(2) // Number of executor nodes
.nodes(5) // Number of nodes
})
}
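
The returned `Builder` can then be extended into a full plan; a sketch reusing the DSL methods exercised by the other snippets in this commit (`SnippetResult` is the crate-local alias those snippets share):

```rust
use std::time::Duration;

use testing_framework_core::scenario::Scenario;
use testing_framework_workflows::ScenarioBuilderExt;

use crate::SnippetResult;

pub fn topology_plan() -> SnippetResult<Scenario<()>> {
    topology() // the 5-node star defined above
        .wallets(50) // seed 50 funded wallet accounts
        .transactions_with(|txs| txs.rate(5).users(20)) // 5 tx/block from 20 users
        .expect_consensus_liveness() // blocks must keep being produced
        .with_run_duration(Duration::from_secs(60))
        .build()
}
```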

View File

@ -4,7 +4,7 @@ use testing_framework_workflows::ScenarioBuilderExt;
use crate::SnippetResult;
pub fn transactions_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
ScenarioBuilder::topology_with(|t| t.network_star().nodes(1))
.wallets(50)
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block

View File

@ -4,7 +4,7 @@ use testing_framework_workflows::ScenarioBuilderExt;
use crate::SnippetResult;
pub fn wallets_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
ScenarioBuilder::topology_with(|t| t.network_star().nodes(1))
.wallets(50) // Seed 50 funded wallet accounts
.build()
}

View File

@ -7,7 +7,7 @@ use crate::SnippetResult;
pub fn chaos_plan()
-> SnippetResult<testing_framework_core::scenario::Scenario<NodeControlCapability>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
ScenarioBuilder::topology_with(|t| t.network_star().nodes(5))
.enable_node_control() // Enable node control capability
.chaos_with(|c| {
c.restart() // Random restart chaos

View File

@ -4,7 +4,7 @@ use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
pub async fn execution() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(1))
.expect_consensus_liveness()
.build()?;

View File

@ -6,7 +6,7 @@ use testing_framework_runner_compose::ComposeDeployer;
use testing_framework_workflows::{ChaosBuilderExt, ScenarioBuilderExt};
pub async fn aggressive_chaos_test() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(4).executors(2))
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(6))
.enable_node_control()
.wallets(50)
.transactions_with(|txs| txs.rate(10).users(20))

View File

@ -9,13 +9,12 @@ pub async fn load_progression_test() -> Result<()> {
for rate in [5, 10, 20, 30] {
println!("Testing with rate: {}", rate);
let mut plan =
ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
.wallets(50)
.transactions_with(|txs| txs.rate(rate).users(20))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build()?;
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(5))
.wallets(50)
.transactions_with(|txs| txs.rate(rate).users(20))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build()?;
let deployer = ComposeDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -6,7 +6,7 @@ use testing_framework_runner_compose::ComposeDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
pub async fn sustained_load_test() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(4).executors(2))
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(6))
.wallets(100)
.transactions_with(|txs| txs.rate(15).users(50))
.expect_consensus_liveness()

View File

@ -6,7 +6,7 @@ use testing_framework_runner_compose::ComposeDeployer;
use testing_framework_workflows::{ChaosBuilderExt, ScenarioBuilderExt};
pub async fn chaos_resilience() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(4).executors(2))
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(6))
.enable_node_control()
.wallets(20)
.transactions_with(|txs| txs.rate(3).users(10))

View File

@ -6,7 +6,7 @@ use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
pub async fn transactions_multi_node() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(5))
.wallets(30)
.transactions_with(|txs| txs.rate(5).users(15))
.expect_consensus_liveness()

View File

@ -6,7 +6,7 @@ use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
pub async fn simple_consensus() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(0))
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(3))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(30))
.build()?;

View File

@ -6,7 +6,7 @@ use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
pub async fn transaction_workload() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(0))
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(2))
.wallets(20)
.transactions_with(|txs| txs.rate(5).users(10))
.expect_consensus_liveness()

View File

@ -13,7 +13,7 @@ impl<Caps> YourExpectationDslExt for testing_framework_core::scenario::Builder<C
}
pub fn use_in_examples() -> SnippetResult<()> {
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(0))
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(3))
.expect_your_condition()
.build()?;
Ok(())

View File

@ -27,7 +27,7 @@ impl<Caps> YourWorkloadDslExt for testing_framework_core::scenario::Builder<Caps
}
pub fn use_in_examples() -> SnippetResult<()> {
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(0))
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(3))
.your_workload_with(|w| w.some_config())
.build()?;
Ok(())

View File

@ -11,8 +11,8 @@ impl Workload for RestartWorkload {
async fn start(&self, ctx: &RunContext) -> Result<(), DynError> {
if let Some(control) = ctx.node_control() {
// Restart the first validator (index 0) if supported.
control.restart_validator(0).await?;
// Restart the first node (index 0) if supported.
control.restart_node(0).await?;
}
Ok(())
}

View File

@ -3,6 +3,5 @@ use testing_framework_core::scenario::DynError;
#[async_trait]
pub trait NodeControlHandle: Send + Sync {
async fn restart_validator(&self, index: usize) -> Result<(), DynError>;
async fn restart_executor(&self, index: usize) -> Result<(), DynError>;
async fn restart_node(&self, index: usize) -> Result<(), DynError>;
}
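
With the validator/executor split gone, a control backend now implements a single entry point; a minimal no-op sketch, assuming the `NodeControlHandle` trait above is in scope (the `NoopNodeControl` type is hypothetical, for illustration only):

```rust
use async_trait::async_trait;
use testing_framework_core::scenario::DynError;

/// Hypothetical stand-in for a deployer-backed implementation.
struct NoopNodeControl;

#[async_trait]
impl NodeControlHandle for NoopNodeControl {
    async fn restart_node(&self, index: usize) -> Result<(), DynError> {
        // A real backend would locate the node at `index` and restart it.
        tracing::info!(index, "restart_node requested (no-op)");
        Ok(())
    }
}
```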

View File

@ -4,7 +4,7 @@ use testing_framework_runner_local::LocalDeployer;
pub async fn run_with_env_overrides() -> Result<()> {
// Uses NOMOS_DEMO_* env vars (or legacy *_DEMO_* vars)
let mut plan = ScenarioBuilder::with_node_counts(3, 2)
let mut plan = ScenarioBuilder::with_node_count(5)
.with_run_duration(std::time::Duration::from_secs(120))
.build()?;

View File

@ -6,8 +6,8 @@ use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
pub async fn run_local_demo() -> Result<()> {
// Define the scenario (1 validator + 1 executor, tx workload)
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(1))
// Define the scenario (2 nodes, tx workload)
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(2))
.wallets(1_000)
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block

View File

@ -3,7 +3,6 @@ use testing_framework_core::scenario::ScenarioBuilder;
pub fn step_1_topology() -> testing_framework_core::scenario::Builder<()> {
ScenarioBuilder::topology_with(|t| {
t.network_star() // Star topology: all nodes connect to seed
.validators(1) // 1 validator node
.executors(1) // 1 executor node
.nodes(2) // 2 nodes
})
}

View File

@ -2,5 +2,5 @@ use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_workflows::ScenarioBuilderExt;
pub fn step_2_wallets() -> testing_framework_core::scenario::Builder<()> {
ScenarioBuilder::with_node_counts(1, 1).wallets(1_000) // Seed 1,000 funded wallet accounts
ScenarioBuilder::with_node_count(2).wallets(1_000) // Seed 1,000 funded wallet accounts
}

View File

@ -2,7 +2,7 @@ use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_workflows::ScenarioBuilderExt;
pub fn step_3_workloads() -> testing_framework_core::scenario::Builder<()> {
ScenarioBuilder::with_node_counts(1, 1)
ScenarioBuilder::with_node_count(2)
.wallets(1_000)
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block

View File

@ -2,5 +2,5 @@ use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_workflows::ScenarioBuilderExt;
pub fn step_4_expectation() -> testing_framework_core::scenario::Builder<()> {
ScenarioBuilder::with_node_counts(1, 1).expect_consensus_liveness() // This says what success means: blocks must be produced continuously.
ScenarioBuilder::with_node_count(2).expect_consensus_liveness() // This says what success means: blocks must be produced continuously.
}

View File

@ -3,5 +3,5 @@ use std::time::Duration;
use testing_framework_core::scenario::ScenarioBuilder;
pub fn step_5_run_duration() -> testing_framework_core::scenario::Builder<()> {
ScenarioBuilder::with_node_counts(1, 1).with_run_duration(Duration::from_secs(60))
ScenarioBuilder::with_node_count(2).with_run_duration(Duration::from_secs(60))
}
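
Chained together, the preceding steps collapse into a single builder expression; a sketch under the same assumptions as the individual step snippets:

```rust
use std::time::Duration;

use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_workflows::ScenarioBuilderExt;

pub fn steps_combined() -> testing_framework_core::scenario::Builder<()> {
    ScenarioBuilder::with_node_count(2) // step 1: topology (2 nodes)
        .wallets(1_000) // step 2: funded wallets
        .transactions_with(|txs| txs.rate(5)) // step 3: 5 transactions per block
        .expect_consensus_liveness() // step 4: success criterion
        .with_run_duration(Duration::from_secs(60)) // step 5: run window
}
```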

View File

@ -3,7 +3,7 @@ use testing_framework_core::scenario::{Deployer, ScenarioBuilder};
use testing_framework_runner_local::LocalDeployer;
pub async fn step_6_deploy_and_execute() -> Result<()> {
let mut plan = ScenarioBuilder::with_node_counts(1, 1).build()?;
let mut plan = ScenarioBuilder::with_node_count(2).build()?;
let deployer = LocalDeployer::default(); // Use local process deployer
let runner = deployer.deploy(&plan).await?; // Provision infrastructure

View File

@ -4,7 +4,7 @@ use testing_framework_runner_compose::ComposeDeployer;
pub async fn run_with_compose_deployer() -> Result<()> {
// ... same scenario definition ...
let mut plan = ScenarioBuilder::with_node_counts(1, 1).build()?;
let mut plan = ScenarioBuilder::with_node_count(2).build()?;
let deployer = ComposeDeployer::default(); // Use Docker Compose
let runner = deployer.deploy(&plan).await?;

View File

@ -5,7 +5,7 @@ use crate::SnippetResult;
pub fn declarative_over_imperative() -> SnippetResult<()> {
// Good: declarative
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(1))
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(3))
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block
})
@ -13,7 +13,7 @@ pub fn declarative_over_imperative() -> SnippetResult<()> {
.build()?;
// Bad: imperative (framework doesn't work this way)
// spawn_validator(); spawn_executor();
// spawn_node(); spawn_node();
// loop { submit_tx(); check_block(); }
Ok(())

View File

@ -7,7 +7,7 @@ use crate::SnippetResult;
pub fn determinism_first() -> SnippetResult<()> {
// Separate: functional test (deterministic)
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(1))
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(3))
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block
})
@ -15,20 +15,19 @@ pub fn determinism_first() -> SnippetResult<()> {
.build()?;
// Separate: chaos test (introduces randomness)
let _chaos_plan =
ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
.enable_node_control()
.chaos_with(|c| {
c.restart()
.min_delay(Duration::from_secs(30))
.max_delay(Duration::from_secs(60))
.target_cooldown(Duration::from_secs(45))
.apply()
})
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block
})
.expect_consensus_liveness()
.build()?;
let _chaos_plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(5))
.enable_node_control()
.chaos_with(|c| {
c.restart()
.min_delay(Duration::from_secs(30))
.max_delay(Duration::from_secs(60))
.target_cooldown(Duration::from_secs(45))
.apply()
})
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block
})
.expect_consensus_liveness()
.build()?;
Ok(())
}

View File

@ -7,14 +7,14 @@ use crate::SnippetResult;
pub fn minimum_run_windows() -> SnippetResult<()> {
// Bad: too short (~2 blocks with default 2s slots, 0.9 coeff)
let _too_short = ScenarioBuilder::with_node_counts(1, 0)
let _too_short = ScenarioBuilder::with_node_count(1)
.with_run_duration(Duration::from_secs(5))
.expect_consensus_liveness()
.build()?;
// Good: enough blocks for assertions (~27 blocks with default 2s slots, 0.9
// coeff)
let _good = ScenarioBuilder::with_node_counts(1, 0)
let _good = ScenarioBuilder::with_node_count(1)
.with_run_duration(Duration::from_secs(60))
.expect_consensus_liveness()
.build()?;
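
(The block estimates follow from the defaults: expected blocks ≈ run_seconds / slot_seconds × coeff, so 5 s yields 5 / 2 × 0.9 ≈ 2 blocks while 60 s yields 60 / 2 × 0.9 = 27.)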

View File

@ -7,7 +7,7 @@ use crate::SnippetResult;
pub fn protocol_time_not_wall_time() -> SnippetResult<()> {
// Good: protocol-oriented thinking
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(1))
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().nodes(3))
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block
})

View File

@ -24,39 +24,27 @@ async fn main() {
tracing_subscriber::fmt::init();
let validators = read_env_any(&["NOMOS_DEMO_VALIDATORS"], demo::DEFAULT_VALIDATORS);
let executors = read_env_any(&["NOMOS_DEMO_EXECUTORS"], demo::DEFAULT_EXECUTORS);
let nodes = read_env_any(&["NOMOS_DEMO_NODES"], demo::DEFAULT_NODES);
let run_secs = read_env_any(&["NOMOS_DEMO_RUN_SECS"], demo::DEFAULT_RUN_SECS);
info!(
validators,
executors, run_secs, "starting compose runner demo"
);
info!(nodes, run_secs, "starting compose runner demo");
if let Err(err) = run_compose_case(validators, executors, Duration::from_secs(run_secs)).await {
if let Err(err) = run_compose_case(nodes, Duration::from_secs(run_secs)).await {
warn!("compose runner demo failed: {err:#}");
process::exit(1);
}
}
async fn run_compose_case(
validators: usize,
executors: usize,
run_duration: Duration,
) -> Result<()> {
async fn run_compose_case(nodes: usize, run_duration: Duration) -> Result<()> {
info!(
validators,
executors,
nodes,
duration_secs = run_duration.as_secs(),
"building scenario plan"
);
let scenario = ScenarioBuilder::topology_with(|t| {
t.network_star().validators(validators).executors(executors)
})
.enable_node_control();
let scenario =
ScenarioBuilder::topology_with(|t| t.network_star().nodes(nodes)).enable_node_control();
let scenario = if let Some((chaos_min_delay, chaos_max_delay, chaos_target_cooldown)) =
chaos_timings(run_duration)

View File

@ -17,33 +17,29 @@ const TRANSACTION_WALLETS: usize = 50;
async fn main() {
tracing_subscriber::fmt::init();
let validators = read_env_any(&["NOMOS_DEMO_VALIDATORS"], demo::DEFAULT_VALIDATORS);
let executors = read_env_any(&["NOMOS_DEMO_EXECUTORS"], demo::DEFAULT_EXECUTORS);
let nodes = read_env_any(&["NOMOS_DEMO_NODES"], demo::DEFAULT_NODES);
let run_secs = read_env_any(&["NOMOS_DEMO_RUN_SECS"], demo::DEFAULT_RUN_SECS);
info!(validators, executors, run_secs, "starting k8s runner demo");
info!(nodes, run_secs, "starting k8s runner demo");
if let Err(err) = run_k8s_case(validators, executors, Duration::from_secs(run_secs)).await {
if let Err(err) = run_k8s_case(nodes, Duration::from_secs(run_secs)).await {
warn!("k8s runner demo failed: {err:#}");
process::exit(1);
}
}
async fn run_k8s_case(validators: usize, executors: usize, run_duration: Duration) -> Result<()> {
async fn run_k8s_case(nodes: usize, run_duration: Duration) -> Result<()> {
info!(
validators,
executors,
nodes,
duration_secs = run_duration.as_secs(),
"building scenario plan"
);
let mut scenario = ScenarioBuilder::topology_with(|t| {
t.network_star().validators(validators).executors(executors)
})
.with_capabilities(ObservabilityCapability::default())
.wallets(TOTAL_WALLETS)
.transactions_with(|txs| txs.rate(MIXED_TXS_PER_BLOCK).users(TRANSACTION_WALLETS))
.with_run_duration(run_duration)
.expect_consensus_liveness();
let mut scenario = ScenarioBuilder::topology_with(|t| t.network_star().nodes(nodes))
.with_capabilities(ObservabilityCapability::default())
.wallets(TOTAL_WALLETS)
.transactions_with(|txs| txs.rate(MIXED_TXS_PER_BLOCK).users(TRANSACTION_WALLETS))
.with_run_duration(run_duration)
.expect_consensus_liveness();
if let Ok(url) = env::var("NOMOS_METRICS_QUERY_URL") {
if !url.trim().is_empty() {

View File

@ -22,34 +22,27 @@ async fn main() {
process::exit(1);
}
let validators = read_env_any(&["NOMOS_DEMO_VALIDATORS"], demo::DEFAULT_VALIDATORS);
let executors = read_env_any(&["NOMOS_DEMO_EXECUTORS"], demo::DEFAULT_EXECUTORS);
let nodes = read_env_any(&["NOMOS_DEMO_NODES"], demo::DEFAULT_NODES);
let run_secs = read_env_any(&["NOMOS_DEMO_RUN_SECS"], demo::DEFAULT_RUN_SECS);
info!(
validators,
executors, run_secs, "starting local runner demo"
);
info!(nodes, run_secs, "starting local runner demo");
if let Err(err) = run_local_case(validators, executors, Duration::from_secs(run_secs)).await {
if let Err(err) = run_local_case(nodes, Duration::from_secs(run_secs)).await {
warn!("local runner demo failed: {err:#}");
process::exit(1);
}
}
async fn run_local_case(validators: usize, executors: usize, run_duration: Duration) -> Result<()> {
async fn run_local_case(nodes: usize, run_duration: Duration) -> Result<()> {
info!(
validators,
executors,
nodes,
duration_secs = run_duration.as_secs(),
"building scenario plan"
);
let scenario = ScenarioBuilder::topology_with(|t| {
t.network_star().validators(validators).executors(executors)
})
.wallets(TOTAL_WALLETS)
.with_run_duration(run_duration);
let scenario = ScenarioBuilder::topology_with(|t| t.network_star().nodes(nodes))
.wallets(TOTAL_WALLETS)
.with_run_duration(run_duration);
let scenario = if run_duration.as_secs() <= SMOKE_RUN_SECS_MAX {
scenario

View File

@ -1,3 +1,2 @@
pub const DEFAULT_VALIDATORS: usize = 2;
pub const DEFAULT_EXECUTORS: usize = 0;
pub const DEFAULT_NODES: usize = 2;
pub const DEFAULT_RUN_SECS: u64 = 60;
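
These defaults are the fallback used by the runner demos when no override is present; the lookup, as it appears in the demo binaries in this commit:

```rust
// Falls back to demo::DEFAULT_NODES (2) unless NOMOS_DEMO_NODES is set.
let nodes = read_env_any(&["NOMOS_DEMO_NODES"], demo::DEFAULT_NODES);
let run_secs = read_env_any(&["NOMOS_DEMO_RUN_SECS"], demo::DEFAULT_RUN_SECS);
```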

View File

@ -37,7 +37,7 @@ impl Workload for JoinNodeWorkload {
sleep(START_DELAY).await;
let node = handle.start_validator(&self.name).await?;
let node = handle.start_node(&self.name).await?;
let client = node.api;
timeout(READY_TIMEOUT, async {
@ -86,7 +86,7 @@ impl Workload for JoinNodeWithPeersWorkload {
let options = StartNodeOptions {
peers: PeerSelection::Named(self.peers.clone()),
};
let node = handle.start_validator_with(&self.name, options).await?;
let node = handle.start_node_with(&self.name, options).await?;
let client = node.api;
timeout(READY_TIMEOUT, async {
@ -110,13 +110,12 @@ impl Workload for JoinNodeWithPeersWorkload {
async fn dynamic_join_reaches_consensus_liveness() -> Result<()> {
let _ = try_init();
let mut scenario =
ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(0))
.enable_node_control()
.with_workload(JoinNodeWorkload::new("joiner"))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build()?;
let mut scenario = ScenarioBuilder::topology_with(|t| t.network_star().nodes(2))
.enable_node_control()
.with_workload(JoinNodeWorkload::new("joiner"))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
@ -128,16 +127,15 @@ async fn dynamic_join_reaches_consensus_liveness() -> Result<()> {
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples -- --ignored`"]
async fn dynamic_join_with_peers_reaches_consensus_liveness() -> Result<()> {
let mut scenario =
ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(0))
.enable_node_control()
.with_workload(JoinNodeWithPeersWorkload::new(
"joiner",
vec!["validator-0".to_string()],
))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build()?;
let mut scenario = ScenarioBuilder::topology_with(|t| t.network_star().nodes(2))
.enable_node_control()
.with_workload(JoinNodeWithPeersWorkload::new(
"joiner",
vec!["node-0".to_string()],
))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;

View File

@ -18,15 +18,15 @@ async fn manual_cluster_two_clusters_merge() -> Result<()> {
// Required env vars (set on the command line when running this test):
// - `POL_PROOF_DEV_MODE=true`
// - `RUST_LOG=info` (optional)
let config = TopologyConfig::with_node_numbers(2, 0);
let config = TopologyConfig::with_node_count(2);
let deployer = LocalDeployer::new();
let cluster = deployer.manual_cluster(config)?;
// Nodes are stopped automatically when the cluster is dropped.
println!("starting validator a");
println!("starting node a");
let validator_a = cluster
.start_validator_with(
let node_a = cluster
.start_node_with(
"a",
StartNodeOptions {
peers: PeerSelection::None,
@ -38,12 +38,12 @@ async fn manual_cluster_two_clusters_merge() -> Result<()> {
println!("waiting briefly before starting c");
sleep(Duration::from_secs(30)).await;
println!("starting validator c -> a");
let validator_c = cluster
.start_validator_with(
println!("starting node c -> a");
let node_c = cluster
.start_node_with(
"c",
StartNodeOptions {
peers: PeerSelection::Named(vec!["validator-a".to_owned()]),
peers: PeerSelection::Named(vec!["node-a".to_owned()]),
},
)
.await?
@ -54,12 +54,12 @@ async fn manual_cluster_two_clusters_merge() -> Result<()> {
sleep(Duration::from_secs(5)).await;
let a_info = validator_a.consensus_info().await?;
let c_info = validator_c.consensus_info().await?;
let a_info = node_a.consensus_info().await?;
let c_info = node_c.consensus_info().await?;
let height_diff = a_info.height.abs_diff(c_info.height);
println!(
"final heights: validator-a={}, validator-c={}, diff={}",
"final heights: node-a={}, node-c={}, diff={}",
a_info.height, c_info.height, height_diff
);

View File

@ -322,7 +322,6 @@ build_bundle::prepare_circuits() {
fi
NODE_BIN="${NODE_TARGET}/debug/logos-blockchain-node"
EXEC_BIN="${NODE_TARGET}/debug/logos-blockchain-executor"
CLI_BIN="${NODE_TARGET}/debug/logos-blockchain-cli"
}
@ -360,13 +359,13 @@ build_bundle::build_binaries() {
LOGOS_BLOCKCHAIN_CIRCUITS="${CIRCUITS_DIR}" \
RUSTUP_TOOLCHAIN="${BUNDLE_RUSTUP_TOOLCHAIN}" \
cargo build --all-features \
-p logos-blockchain-node -p logos-blockchain-executor -p logos-blockchain-cli \
-p logos-blockchain-node -p logos-blockchain-cli \
--target-dir "${NODE_TARGET}"
else
RUSTFLAGS='--cfg feature="pol-dev-mode"' NOMOS_CIRCUITS="${CIRCUITS_DIR}" \
LOGOS_BLOCKCHAIN_CIRCUITS="${CIRCUITS_DIR}" \
cargo build --all-features \
-p logos-blockchain-node -p logos-blockchain-executor -p logos-blockchain-cli \
-p logos-blockchain-node -p logos-blockchain-cli \
--target-dir "${NODE_TARGET}"
fi
)
@ -380,7 +379,6 @@ build_bundle::package_bundle() {
cp -a "${CIRCUITS_DIR}/." "${bundle_dir}/artifacts/circuits/"
mkdir -p "${bundle_dir}/artifacts"
cp "${NODE_BIN}" "${bundle_dir}/artifacts/logos-blockchain-node"
cp "${EXEC_BIN}" "${bundle_dir}/artifacts/logos-blockchain-executor"
cp "${CLI_BIN}" "${bundle_dir}/artifacts/logos-blockchain-cli"
{
echo "nomos_node_path=${NOMOS_NODE_PATH:-}"

View File

@ -126,7 +126,11 @@ build_linux_binaries::stage_from_bundle() {
local tar_path="$1"
local extract_dir
extract_dir="$(common::tmpdir nomos-linux-bundle.XXXXXX)"
cleanup() { rm -rf "${extract_dir}" 2>/dev/null || true; }
cleanup() {
if [ -n "${extract_dir:-}" ]; then
rm -rf "${extract_dir}" 2>/dev/null || true
fi
}
trap cleanup EXIT
echo "==> Extracting ${tar_path}"
@ -134,7 +138,6 @@ build_linux_binaries::stage_from_bundle() {
local artifacts="${extract_dir}/artifacts"
[ -f "${artifacts}/logos-blockchain-node" ] || common::die "Missing logos-blockchain-node in bundle: ${tar_path}"
[ -f "${artifacts}/logos-blockchain-executor" ] || common::die "Missing logos-blockchain-executor in bundle: ${tar_path}"
[ -f "${artifacts}/logos-blockchain-cli" ] || common::die "Missing logos-blockchain-cli in bundle: ${tar_path}"
[ -d "${artifacts}/circuits" ] || common::die "Missing circuits/ in bundle: ${tar_path}"
@ -144,7 +147,7 @@ build_linux_binaries::stage_from_bundle() {
echo "==> Staging binaries to ${bin_out}"
mkdir -p "${bin_out}"
cp "${artifacts}/logos-blockchain-node" "${artifacts}/logos-blockchain-executor" "${artifacts}/logos-blockchain-cli" "${bin_out}/"
cp "${artifacts}/logos-blockchain-node" "${artifacts}/logos-blockchain-cli" "${bin_out}/"
echo "==> Staging circuits to ${circuits_out}"
rm -rf "${circuits_out}"

View File

@ -137,9 +137,9 @@ build_test_image::print_config() {
}
build_test_image::have_host_binaries() {
# Preserve existing behavior: only require node+executor on the host.
# Preserve existing behavior: only require logos-blockchain-node on the host.
# If logos-blockchain-cli is missing, the Dockerfile can still build it from source.
[ -x "${BIN_DST}/logos-blockchain-node" ] && [ -x "${BIN_DST}/logos-blockchain-executor" ]
[ -x "${BIN_DST}/logos-blockchain-node" ]
}
build_test_image::restore_from_bundle() {
@ -153,13 +153,13 @@ build_test_image::restore_from_bundle() {
tar -xzf "${TAR_PATH}" -C "${tmp_extract}"
local artifacts="${tmp_extract}/artifacts"
for bin in logos-blockchain-node logos-blockchain-executor logos-blockchain-cli; do
for bin in logos-blockchain-node logos-blockchain-cli; do
[ -f "${artifacts}/${bin}" ] || build_test_image::fail "Bundle ${TAR_PATH} missing artifacts/${bin}"
done
mkdir -p "${BIN_DST}"
cp "${artifacts}/logos-blockchain-node" "${artifacts}/logos-blockchain-executor" "${artifacts}/logos-blockchain-cli" "${BIN_DST}/"
chmod +x "${BIN_DST}/logos-blockchain-node" "${BIN_DST}/logos-blockchain-executor" "${BIN_DST}/logos-blockchain-cli" || true
cp "${artifacts}/logos-blockchain-node" "${artifacts}/logos-blockchain-cli" "${BIN_DST}/"
chmod +x "${BIN_DST}/logos-blockchain-node" "${BIN_DST}/logos-blockchain-cli" || true
if [ -d "${artifacts}/circuits" ]; then
mkdir -p "${CIRCUITS_DIR_HOST}"

View File

@ -161,8 +161,6 @@ targets = [
"logos-blockchain-da-network-service",
"logos-blockchain-da-sampling-service",
"logos-blockchain-da-verifier-service",
"logos-blockchain-executor",
"logos-blockchain-executor-http-client",
"logos-blockchain-groth16",
"logos-blockchain-http-api-common",
"logos-blockchain-key-management-system-service",

View File

@ -44,8 +44,7 @@ Modes:
Options:
-t, --run-seconds N Duration to run the demo (required)
-v, --validators N Number of validators (required)
-e, --executors N Number of executors (required)
-n, --nodes N Number of nodes (required)
--bundle PATH Convenience alias for setting NOMOS_BINARIES_TAR=PATH
--metrics-query-url URL PromQL base URL the runner process can query (optional)
--metrics-otlp-ingest-url URL Full OTLP HTTP ingest URL for node metrics export (optional)
@ -116,8 +115,7 @@ run_examples::select_bin() {
run_examples::parse_args() {
MODE="compose"
RUN_SECS_RAW=""
DEMO_VALIDATORS=""
DEMO_EXECUTORS=""
DEMO_NODES=""
IMAGE_SELECTION_MODE="auto"
METRICS_QUERY_URL=""
METRICS_OTLP_INGEST_URL=""
@ -140,20 +138,12 @@ run_examples::parse_args() {
RUN_SECS_RAW="${1#*=}"
shift
;;
-v|--validators)
DEMO_VALIDATORS="${2:-}"
-n|--nodes)
DEMO_NODES="${2:-}"
shift 2
;;
--validators=*)
DEMO_VALIDATORS="${1#*=}"
shift
;;
-e|--executors)
DEMO_EXECUTORS="${2:-}"
shift 2
;;
--executors=*)
DEMO_EXECUTORS="${1#*=}"
--nodes=*)
DEMO_NODES="${1#*=}"
shift
;;
--bundle)
@ -232,14 +222,11 @@ run_examples::parse_args() {
fi
RUN_SECS="${RUN_SECS_RAW}"
if [ -z "${DEMO_VALIDATORS}" ] || [ -z "${DEMO_EXECUTORS}" ]; then
run_examples::fail_with_usage "validators and executors must be provided via -v/--validators and -e/--executors"
if [ -z "${DEMO_NODES}" ]; then
run_examples::fail_with_usage "nodes must be provided via -n/--nodes"
fi
if ! common::is_uint "${DEMO_VALIDATORS}" ; then
run_examples::fail_with_usage "validators must be a non-negative integer (pass -v/--validators)"
fi
if ! common::is_uint "${DEMO_EXECUTORS}" ; then
run_examples::fail_with_usage "executors must be a non-negative integer (pass -e/--executors)"
if ! common::is_uint "${DEMO_NODES}" ; then
run_examples::fail_with_usage "nodes must be a non-negative integer (pass -n/--nodes)"
fi
}
@ -388,7 +375,7 @@ run_examples::restore_binaries_from_tar() {
RESTORED_BIN_DIR="${src}"
export RESTORED_BIN_DIR
if [ ! -f "${src}/logos-blockchain-node" ] || [ ! -f "${src}/logos-blockchain-executor" ] || [ ! -f "${src}/logos-blockchain-cli" ]; then
if [ ! -f "${src}/logos-blockchain-node" ] || [ ! -f "${src}/logos-blockchain-cli" ]; then
echo "Binaries missing in ${tar_path}; provide a prebuilt binaries tarball." >&2
return 1
fi
@ -397,11 +384,12 @@ run_examples::restore_binaries_from_tar() {
if [ "${MODE}" != "host" ] && ! run_examples::host_bin_matches_arch "${src}/logos-blockchain-node"; then
echo "Bundled binaries do not match host arch; skipping copy so containers rebuild from source."
copy_bins=0
rm -f "${bin_dst}/logos-blockchain-node" "${bin_dst}/logos-blockchain-executor" "${bin_dst}/logos-blockchain-cli"
rm -f "${bin_dst}/logos-blockchain-node" "${bin_dst}/logos-blockchain-cli"
fi
if [ "${copy_bins}" -eq 1 ]; then
mkdir -p "${bin_dst}"
cp "${src}/logos-blockchain-node" "${src}/logos-blockchain-executor" "${src}/logos-blockchain-cli" "${bin_dst}/"
cp "${src}/logos-blockchain-node" "${src}/logos-blockchain-cli" "${bin_dst}/"
fi
fi
if [ -d "${circuits_src}" ] && [ -f "${circuits_src}/${KZG_FILE}" ]; then
@ -436,8 +424,8 @@ run_examples::prepare_bundles() {
HOST_TAR="${ROOT_DIR}/.tmp/nomos-binaries-host-${VERSION}.tar.gz"
LINUX_TAR="${ROOT_DIR}/.tmp/nomos-binaries-linux-${VERSION}.tar.gz"
if [ -n "${LOGOS_BLOCKCHAIN_NODE_BIN:-}" ] && [ -x "${LOGOS_BLOCKCHAIN_NODE_BIN}" ] && [ -n "${LOGOS_BLOCKCHAIN_EXECUTOR_BIN:-}" ] && [ -x "${LOGOS_BLOCKCHAIN_EXECUTOR_BIN}" ]; then
echo "==> Using pre-specified host binaries (LOGOS_BLOCKCHAIN_NODE_BIN/LOGOS_BLOCKCHAIN_EXECUTOR_BIN); skipping tarball restore"
if [ -n "${LOGOS_BLOCKCHAIN_NODE_BIN:-}" ] && [ -x "${LOGOS_BLOCKCHAIN_NODE_BIN}" ]; then
echo "==> Using pre-specified host binaries (LOGOS_BLOCKCHAIN_NODE_BIN); skipping tarball restore"
return 0
fi
@ -508,20 +496,18 @@ run_examples::validate_restored_bundle() {
common::die "KZG params missing at ${KZG_HOST_PATH}; ensure the tarball contains circuits."
fi
if [ "${MODE}" = "host" ] && ! { [ -n "${LOGOS_BLOCKCHAIN_NODE_BIN:-}" ] && [ -x "${LOGOS_BLOCKCHAIN_NODE_BIN:-}" ] && [ -n "${LOGOS_BLOCKCHAIN_EXECUTOR_BIN:-}" ] && [ -x "${LOGOS_BLOCKCHAIN_EXECUTOR_BIN:-}" ]; }; then
local tar_node tar_exec
if [ "${MODE}" = "host" ] && ! { [ -n "${LOGOS_BLOCKCHAIN_NODE_BIN:-}" ] && [ -x "${LOGOS_BLOCKCHAIN_NODE_BIN:-}" ]; }; then
local tar_node
tar_node="${RESTORED_BIN_DIR:-${ROOT_DIR}/testing-framework/assets/stack/bin}/logos-blockchain-node"
tar_exec="${RESTORED_BIN_DIR:-${ROOT_DIR}/testing-framework/assets/stack/bin}/logos-blockchain-executor"
[ -x "${tar_node}" ] && [ -x "${tar_exec}" ] || common::die \
[ -x "${tar_node}" ] || common::die \
"Restored tarball missing host executables; provide a host-compatible binaries tarball."
run_examples::host_bin_matches_arch "${tar_node}" && run_examples::host_bin_matches_arch "${tar_exec}" || common::die \
run_examples::host_bin_matches_arch "${tar_node}" || common::die \
"Restored executables do not match host architecture; provide a host-compatible binaries tarball."
echo "==> Using restored host binaries from tarball"
LOGOS_BLOCKCHAIN_NODE_BIN="${tar_node}"
LOGOS_BLOCKCHAIN_EXECUTOR_BIN="${tar_exec}"
export LOGOS_BLOCKCHAIN_NODE_BIN LOGOS_BLOCKCHAIN_EXECUTOR_BIN
export LOGOS_BLOCKCHAIN_NODE_BIN
fi
}
@ -571,8 +557,7 @@ run_examples::run() {
kzg_path="$(run_examples::kzg_path_for_mode)"
export NOMOS_DEMO_RUN_SECS="${RUN_SECS}"
export NOMOS_DEMO_VALIDATORS="${DEMO_VALIDATORS}"
export NOMOS_DEMO_EXECUTORS="${DEMO_EXECUTORS}"
export NOMOS_DEMO_NODES="${DEMO_NODES}"
if [ -n "${METRICS_QUERY_URL}" ]; then
export NOMOS_METRICS_QUERY_URL="${METRICS_QUERY_URL}"
@ -591,7 +576,6 @@ run_examples::run() {
LOGOS_BLOCKCHAIN_CIRCUITS="${HOST_BUNDLE_PATH}" \
LOGOS_BLOCKCHAIN_KZGRS_PARAMS_PATH="${kzg_path}" \
LOGOS_BLOCKCHAIN_NODE_BIN="${LOGOS_BLOCKCHAIN_NODE_BIN:-}" \
LOGOS_BLOCKCHAIN_EXECUTOR_BIN="${LOGOS_BLOCKCHAIN_EXECUTOR_BIN:-}" \
COMPOSE_CIRCUITS_PLATFORM="${COMPOSE_CIRCUITS_PLATFORM:-}" \
cargo run -p runner-examples --bin "${BIN}"
}

View File

@ -17,8 +17,7 @@ image rebuilds (where it makes sense), after cleaning and rebuilding bundles.
Options:
-t, --run-seconds N Demo duration for each run (default: 120)
-v, --validators N Validators (default: 1)
-e, --executors N Executors (default: 1)
-n, --nodes N Nodes (default: 2)
--modes LIST Comma-separated: host,compose,k8s (default: host,compose,k8s)
--no-clean Skip scripts/ops/clean.sh step
--no-bundles Skip scripts/build/build-bundle.sh (uses existing .tmp tarballs)
@ -45,8 +44,7 @@ matrix::have() { command -v "$1" >/dev/null 2>&1; }
matrix::parse_args() {
RUN_SECS=120
VALIDATORS=1
EXECUTORS=1
NODES=2
MODES_RAW="host,compose,k8s"
DO_CLEAN=1
DO_BUNDLES=1
@ -61,10 +59,8 @@ matrix::parse_args() {
-h|--help) matrix::usage; exit 0 ;;
-t|--run-seconds) RUN_SECS="${2:-}"; shift 2 ;;
--run-seconds=*) RUN_SECS="${1#*=}"; shift ;;
-v|--validators) VALIDATORS="${2:-}"; shift 2 ;;
--validators=*) VALIDATORS="${1#*=}"; shift ;;
-e|--executors) EXECUTORS="${2:-}"; shift 2 ;;
--executors=*) EXECUTORS="${1#*=}"; shift ;;
-n|--nodes) NODES="${2:-}"; shift 2 ;;
--nodes=*) NODES="${1#*=}"; shift ;;
--modes) MODES_RAW="${2:-}"; shift 2 ;;
--modes=*) MODES_RAW="${1#*=}"; shift ;;
--no-clean) DO_CLEAN=0; shift ;;
@ -82,8 +78,7 @@ matrix::parse_args() {
common::is_uint "${RUN_SECS}" || matrix::die "--run-seconds must be an integer"
[ "${RUN_SECS}" -gt 0 ] || matrix::die "--run-seconds must be > 0"
common::is_uint "${VALIDATORS}" || matrix::die "--validators must be an integer"
common::is_uint "${EXECUTORS}" || matrix::die "--executors must be an integer"
common::is_uint "${NODES}" || matrix::die "--nodes must be an integer"
}
matrix::split_modes() {
@ -220,7 +215,7 @@ matrix::main() {
host)
matrix::run_case "host" \
"${ROOT_DIR}/scripts/run/run-examples.sh" \
-t "${RUN_SECS}" -v "${VALIDATORS}" -e "${EXECUTORS}" \
-t "${RUN_SECS}" -n "${NODES}" \
"${forward[@]}" \
host
;;
@ -228,7 +223,7 @@ matrix::main() {
if [ "${SKIP_IMAGE_BUILD_VARIANTS}" -eq 0 ]; then
matrix::run_case "compose.image_build" \
"${ROOT_DIR}/scripts/run/run-examples.sh" \
-t "${RUN_SECS}" -v "${VALIDATORS}" -e "${EXECUTORS}" \
-t "${RUN_SECS}" -n "${NODES}" \
"${forward[@]}" \
compose
else
@ -238,7 +233,7 @@ matrix::main() {
matrix::run_case "compose.skip_image_build" \
"${ROOT_DIR}/scripts/run/run-examples.sh" \
--no-image-build \
-t "${RUN_SECS}" -v "${VALIDATORS}" -e "${EXECUTORS}" \
-t "${RUN_SECS}" -n "${NODES}" \
"${forward[@]}" \
compose
;;
@ -259,7 +254,7 @@ matrix::main() {
fi
matrix::run_case "k8s.image_build" \
"${ROOT_DIR}/scripts/run/run-examples.sh" \
-t "${RUN_SECS}" -v "${VALIDATORS}" -e "${EXECUTORS}" \
-t "${RUN_SECS}" -n "${NODES}" \
"${forward[@]}" \
k8s
unset NOMOS_FORCE_IMAGE_BUILD || true
@ -273,7 +268,7 @@ matrix::main() {
matrix::run_case "k8s.skip_image_build" \
"${ROOT_DIR}/scripts/run/run-examples.sh" \
--no-image-build \
-t "${RUN_SECS}" -v "${VALIDATORS}" -e "${EXECUTORS}" \
-t "${RUN_SECS}" -n "${NODES}" \
"${forward[@]}" \
k8s
;;

View File

@ -82,7 +82,6 @@ COPY --from=builder /opt/circuits /opt/circuits
COPY --from=builder /workspace/testing-framework/assets/stack/kzgrs_test_params/kzgrs_test_params /opt/nomos/kzg-params/kzgrs_test_params
COPY --from=builder /workspace/artifacts/logos-blockchain-node /usr/bin/logos-blockchain-node
COPY --from=builder /workspace/artifacts/logos-blockchain-executor /usr/bin/logos-blockchain-executor
COPY --from=builder /workspace/artifacts/logos-blockchain-cli /usr/bin/logos-blockchain-cli
COPY --from=builder /workspace/artifacts/cfgsync-server /usr/bin/cfgsync-server
COPY --from=builder /workspace/artifacts/cfgsync-client /usr/bin/cfgsync-client

View File

@ -9,7 +9,6 @@ TARGET_ARCH="$(uname -m)"
have_prebuilt() {
[ -f testing-framework/assets/stack/bin/logos-blockchain-node ] && \
[ -f testing-framework/assets/stack/bin/logos-blockchain-executor ] && \
[ -f testing-framework/assets/stack/bin/logos-blockchain-cli ]
}
@ -34,7 +33,6 @@ bin_matches_arch() {
if have_prebuilt && bin_matches_arch; then
echo "Using prebuilt logos-blockchain binaries from testing-framework/assets/stack/bin"
cp testing-framework/assets/stack/bin/logos-blockchain-node /workspace/artifacts/logos-blockchain-node
cp testing-framework/assets/stack/bin/logos-blockchain-executor /workspace/artifacts/logos-blockchain-executor
cp testing-framework/assets/stack/bin/logos-blockchain-cli /workspace/artifacts/logos-blockchain-cli
exit 0
fi
@ -67,10 +65,9 @@ fi
RUSTFLAGS='--cfg feature="pol-dev-mode"' NOMOS_CIRCUITS=/opt/circuits \
LOGOS_BLOCKCHAIN_CIRCUITS=/opt/circuits \
cargo build --features "testing" \
-p logos-blockchain-node -p logos-blockchain-executor -p logos-blockchain-cli
-p logos-blockchain-node -p logos-blockchain-cli
cp /tmp/nomos-node/target/debug/logos-blockchain-node /workspace/artifacts/logos-blockchain-node
cp /tmp/nomos-node/target/debug/logos-blockchain-executor /workspace/artifacts/logos-blockchain-executor
cp /tmp/nomos-node/target/debug/logos-blockchain-cli /workspace/artifacts/logos-blockchain-cli
rm -rf /tmp/nomos-node/target/debug/incremental

View File

@ -2,16 +2,6 @@
set -e
role="${1:-validator}"
bin_for_role() {
case "$1" in
validator) echo "/usr/bin/logos-blockchain-node" ;;
executor) echo "/usr/bin/logos-blockchain-executor" ;;
*) echo "Unknown role: $1" >&2; exit 2 ;;
esac
}
check_binary_arch() {
bin_path="$1"
label="$2"
@ -37,16 +27,15 @@ check_binary_arch() {
fi
}
bin_path="$(bin_for_role "$role")"
check_binary_arch "$bin_path" "logos-blockchain-${role}"
bin_path="/usr/bin/logos-blockchain-node"
check_binary_arch "$bin_path" "logos-blockchain-node"
KZG_CONTAINER_PATH="${NOMOS_KZG_CONTAINER_PATH:-/kzgrs_test_params/kzgrs_test_params}"
host_identifier_default="${role}-$(hostname -i)"
host_identifier_default="node-$(hostname -i)"
export CFG_FILE_PATH="/config.yaml" \
CFG_SERVER_ADDR="${CFG_SERVER_ADDR:-http://cfgsync:${NOMOS_CFGSYNC_PORT:-4400}}" \
CFG_HOST_IP=$(hostname -i) \
CFG_HOST_KIND="${CFG_HOST_KIND:-$role}" \
CFG_HOST_IDENTIFIER="${CFG_HOST_IDENTIFIER:-$host_identifier_default}" \
LOGOS_BLOCKCHAIN_KZGRS_PARAMS_PATH="${LOGOS_BLOCKCHAIN_KZGRS_PARAMS_PATH:-${KZG_CONTAINER_PATH}}" \
NOMOS_TIME_BACKEND="${NOMOS_TIME_BACKEND:-monotonic}" \

View File

@ -1,2 +0,0 @@
#!/bin/sh
exec /etc/nomos/scripts/run_nomos.sh executor

View File

@ -1,2 +1,2 @@
#!/bin/sh
exec /etc/nomos/scripts/run_nomos.sh validator
exec /etc/nomos/scripts/run_nomos.sh

View File

@ -19,11 +19,9 @@ cryptarchia-sync = { workspace = true }
groth16 = { workspace = true }
hex = { version = "0.4.3", default-features = false }
key-management-system-service = { workspace = true }
logos-blockchain-executor = { workspace = true, default-features = false, features = ["testing", "tracing"] }
nomos-api = { workspace = true }
nomos-blend-service = { workspace = true, features = ["libp2p"] }
nomos-core = { workspace = true }
nomos-da-dispersal = { workspace = true }
nomos-da-network-core = { workspace = true }
nomos-da-network-service = { workspace = true }
nomos-da-sampling = { workspace = true }

View File

@ -226,10 +226,6 @@ pub(crate) fn wallet_settings(config: &GeneralConfig) -> WalletServiceSettings {
wallet_settings_with_leader(config, true)
}
pub(crate) fn wallet_settings_for_executor(config: &GeneralConfig) -> WalletServiceSettings {
wallet_settings_with_leader(config, false)
}
fn wallet_settings_with_leader(
config: &GeneralConfig,
include_leader: bool,

View File

@ -1,127 +0,0 @@
use logos_blockchain_executor::config::Config as ExecutorConfig;
use nomos_da_dispersal::{
DispersalServiceSettings,
backend::kzgrs::{DispersalKZGRSBackendSettings, EncoderSettings},
};
use nomos_da_network_core::protocols::sampling::SubnetsConfig;
use nomos_da_network_service::{
NetworkConfig as DaNetworkConfig,
api::http::ApiAdapterSettings,
backends::libp2p::{
common::DaNetworkBackendSettings, executor::DaNetworkExecutorBackendSettings,
},
};
use nomos_node::{RocksBackendSettings, config::deployment::DeploymentSettings};
use nomos_sdp::SdpSettings;
use crate::{
nodes::{
blend::build_blend_service_config,
common::{
cryptarchia_config, cryptarchia_deployment, da_sampling_config, da_verifier_config,
http_config, mempool_config, mempool_deployment, testing_http_config, time_config,
time_deployment, tracing_settings, wallet_settings_for_executor,
},
},
timeouts,
topology::configs::GeneralConfig,
};
#[must_use]
pub fn create_executor_config(config: GeneralConfig) -> ExecutorConfig {
let network_config = config.network_config.clone();
let (blend_user_config, blend_deployment, network_deployment) =
build_blend_service_config(&config.blend_config);
let deployment_settings =
build_executor_deployment_settings(&config, blend_deployment, network_deployment);
ExecutorConfig {
network: network_config,
blend: blend_user_config,
deployment: deployment_settings,
cryptarchia: cryptarchia_config(&config),
da_network: DaNetworkConfig {
backend: build_executor_da_network_backend_settings(&config),
membership: config.da_config.membership.clone(),
api_adapter_settings: ApiAdapterSettings {
api_port: config.api_config.address.port(),
is_secure: false,
},
subnet_refresh_interval: config.da_config.subnets_refresh_interval,
subnet_threshold: config.da_config.num_samples as usize,
min_session_members: config.da_config.num_samples as usize,
},
da_verifier: da_verifier_config(&config),
tracing: tracing_settings(&config),
http: http_config(&config),
da_sampling: da_sampling_config(&config),
storage: rocks_storage_settings(),
da_dispersal: DispersalServiceSettings {
backend: build_dispersal_backend_settings(&config),
},
time: time_config(&config),
mempool: mempool_config(),
sdp: SdpSettings { declaration: None },
wallet: wallet_settings_for_executor(&config),
key_management: config.kms_config.clone(),
testing_http: testing_http_config(&config),
}
}
fn build_executor_deployment_settings(
config: &GeneralConfig,
blend_deployment: nomos_node::config::blend::deployment::Settings,
network_deployment: nomos_node::config::network::deployment::Settings,
) -> DeploymentSettings {
DeploymentSettings::new_custom(
blend_deployment,
network_deployment,
cryptarchia_deployment(config),
time_deployment(config),
mempool_deployment(),
)
}
fn build_executor_da_network_backend_settings(
config: &GeneralConfig,
) -> DaNetworkExecutorBackendSettings {
DaNetworkExecutorBackendSettings {
validator_settings: DaNetworkBackendSettings {
node_key: config.da_config.node_key.clone(),
listening_address: config.da_config.listening_address.clone(),
policy_settings: config.da_config.policy_settings.clone(),
monitor_settings: config.da_config.monitor_settings.clone(),
balancer_interval: config.da_config.balancer_interval,
redial_cooldown: config.da_config.redial_cooldown,
replication_settings: config.da_config.replication_settings,
subnets_settings: SubnetsConfig {
num_of_subnets: config.da_config.num_samples as usize,
shares_retry_limit: config.da_config.retry_shares_limit,
commitments_retry_limit: config.da_config.retry_commitments_limit,
},
},
num_subnets: config.da_config.num_subnets,
}
}
fn rocks_storage_settings() -> RocksBackendSettings {
RocksBackendSettings {
db_path: "./db".into(),
read_only: false,
column_family: Some("blocks".into()),
}
}
fn build_dispersal_backend_settings(config: &GeneralConfig) -> DispersalKZGRSBackendSettings {
DispersalKZGRSBackendSettings {
encoder_settings: EncoderSettings {
num_columns: config.da_config.num_subnets as usize,
with_cache: false,
global_params_path: config.da_config.global_params_path.clone(),
},
dispersal_timeout: timeouts::dispersal_timeout(),
retry_cooldown: timeouts::retry_cooldown(),
retry_limit: 2,
}
}

View File

@ -1,5 +1,4 @@
pub(crate) mod blend;
pub(crate) mod common;
pub mod executor;
pub mod kms;
pub mod validator;
pub mod node;

View File

@ -6,7 +6,7 @@ use nomos_da_network_service::{
backends::libp2p::common::DaNetworkBackendSettings,
};
use nomos_node::{
Config as ValidatorConfig, RocksBackendSettings, config::deployment::DeploymentSettings,
Config as NodeConfig, RocksBackendSettings, config::deployment::DeploymentSettings,
};
use nomos_sdp::SdpSettings;
@ -23,21 +23,21 @@ use crate::{
};
#[must_use]
pub fn create_validator_config(config: GeneralConfig) -> ValidatorConfig {
pub fn create_node_config(config: GeneralConfig) -> NodeConfig {
let network_config = config.network_config.clone();
let (blend_user_config, blend_deployment, network_deployment) =
build_blend_service_config(&config.blend_config);
let deployment_settings =
build_validator_deployment_settings(&config, blend_deployment, network_deployment);
build_node_deployment_settings(&config, blend_deployment, network_deployment);
ValidatorConfig {
NodeConfig {
network: network_config,
blend: blend_user_config,
deployment: deployment_settings,
cryptarchia: cryptarchia_config(&config),
da_network: DaNetworkConfig {
backend: build_validator_da_network_backend_settings(&config),
backend: build_node_da_network_backend_settings(&config),
membership: config.da_config.membership.clone(),
api_adapter_settings: ApiAdapterSettings {
api_port: config.api_config.address.port(),
@ -61,7 +61,7 @@ pub fn create_validator_config(config: GeneralConfig) -> ValidatorConfig {
}
}
fn build_validator_deployment_settings(
fn build_node_deployment_settings(
config: &GeneralConfig,
blend_deployment: nomos_node::config::blend::deployment::Settings,
network_deployment: nomos_node::config::network::deployment::Settings,
@ -75,7 +75,7 @@ fn build_validator_deployment_settings(
)
}
fn build_validator_da_network_backend_settings(config: &GeneralConfig) -> DaNetworkBackendSettings {
fn build_node_da_network_backend_settings(config: &GeneralConfig) -> DaNetworkBackendSettings {
let da_policy_settings = config.da_config.policy_settings.clone();
DaNetworkBackendSettings {

View File

@ -24,7 +24,6 @@ futures = { default-features = false, version = "0.3" }
groth16 = { workspace = true }
hex = { version = "0.4.3", default-features = false }
key-management-system-service = { workspace = true }
logos-blockchain-executor = { workspace = true, default-features = false, features = ["testing", "tracing"] }
nomos-core = { workspace = true }
nomos-da-network-core = { workspace = true }
nomos-da-network-service = { workspace = true }

View File

@ -5,13 +5,7 @@ use crate::scenario::{DynError, StartNodeOptions, StartedNode};
/// Interface for imperative, deployer-backed manual clusters.
#[async_trait]
pub trait ManualClusterHandle: Send + Sync {
async fn start_validator_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, DynError>;
async fn start_executor_with(
async fn start_node_with(
&self,
name: &str,
options: StartNodeOptions,

View File

@ -1,111 +0,0 @@
use std::{
ops::Deref,
path::{Path, PathBuf},
time::Duration,
};
use logos_blockchain_executor::config::Config;
use nomos_tracing_service::LoggerLayer;
pub use testing_framework_config::nodes::executor::create_executor_config;
use tracing::{debug, info};
use super::{persist_tempdir, should_persist_tempdir};
use crate::{
IS_DEBUG_TRACING,
nodes::{
LOGS_PREFIX,
common::{
binary::{BinaryConfig, BinaryResolver},
lifecycle::{kill::kill_child, monitor::is_running},
node::{NodeAddresses, NodeConfigCommon, NodeHandle, SpawnNodeError, spawn_node},
},
},
};
const BIN_PATH: &str = "target/debug/logos-blockchain-executor";
fn binary_path() -> PathBuf {
let cfg = BinaryConfig {
env_var: "LOGOS_BLOCKCHAIN_EXECUTOR_BIN",
binary_name: "logos-blockchain-executor",
fallback_path: BIN_PATH,
shared_bin_subpath: "../assets/stack/bin/logos-blockchain-executor",
};
BinaryResolver::resolve_path(&cfg)
}
pub struct Executor {
handle: NodeHandle<Config>,
}
impl Deref for Executor {
type Target = NodeHandle<Config>;
fn deref(&self) -> &Self::Target {
&self.handle
}
}
impl Drop for Executor {
fn drop(&mut self) {
if should_persist_tempdir()
&& let Err(e) = persist_tempdir(&mut self.handle.tempdir, "logos-blockchain-executor")
{
debug!(error = ?e, "failed to persist executor tempdir");
}
debug!("stopping executor process");
kill_child(&mut self.handle.child);
}
}
impl Executor {
pub async fn spawn(config: Config, label: &str) -> Result<Self, SpawnNodeError> {
let log_prefix = format!("{LOGS_PREFIX}-{label}");
let handle = spawn_node(
config,
&log_prefix,
"executor.yaml",
binary_path(),
!*IS_DEBUG_TRACING,
)
.await?;
info!("executor spawned and ready");
Ok(Self { handle })
}
/// Check if the executor process is still running
pub fn is_running(&mut self) -> bool {
is_running(&mut self.handle.child)
}
/// Wait for the executor process to exit, with a timeout.
pub async fn wait_for_exit(&mut self, timeout: Duration) -> bool {
self.handle.wait_for_exit(timeout).await
}
}
impl NodeConfigCommon for Config {
fn set_logger(&mut self, logger: LoggerLayer) {
self.tracing.logger = logger;
}
fn set_paths(&mut self, base: &Path) {
self.storage.db_path = base.join("db");
base.clone_into(
&mut self
.da_verifier
.storage_adapter_settings
.blob_storage_directory,
);
}
fn addresses(&self) -> NodeAddresses {
(
self.http.backend_settings.address,
Some(self.testing_http.backend_settings.address),
)
}
}

View File

@ -1,7 +1,6 @@
mod api_client;
pub mod common;
pub mod executor;
pub mod validator;
pub mod node;
use std::sync::LazyLock;

View File

@ -2,7 +2,7 @@ use std::{ops::Deref, path::PathBuf, time::Duration};
use nomos_node::Config;
use nomos_tracing_service::LoggerLayer;
pub use testing_framework_config::nodes::validator::create_validator_config;
pub use testing_framework_config::nodes::node::create_node_config;
use tracing::{debug, info};
use super::{persist_tempdir, should_persist_tempdir};
@ -30,16 +30,11 @@ fn binary_path() -> PathBuf {
BinaryResolver::resolve_path(&cfg)
}
pub enum Pool {
Da,
Mantle,
}
pub struct Validator {
pub struct Node {
handle: NodeHandle<Config>,
}
impl Deref for Validator {
impl Deref for Node {
type Target = NodeHandle<Config>;
fn deref(&self) -> &Self::Target {
@ -47,26 +42,26 @@ impl Deref for Validator {
}
}
impl Drop for Validator {
impl Drop for Node {
fn drop(&mut self) {
if should_persist_tempdir()
&& let Err(e) = persist_tempdir(&mut self.handle.tempdir, "logos-blockchain-node")
{
debug!(error = ?e, "failed to persist validator tempdir");
debug!(error = ?e, "failed to persist node tempdir");
}
debug!("stopping validator process");
debug!("stopping node process");
kill_child(&mut self.handle.child);
}
}
impl Validator {
/// Check if the validator process is still running
impl Node {
/// Check if the node process is still running
pub fn is_running(&mut self) -> bool {
is_running(&mut self.handle.child)
}
/// Wait for the validator process to exit, with a timeout
/// Wait for the node process to exit, with a timeout
/// Returns true if the process exited within the timeout, false otherwise
pub async fn wait_for_exit(&mut self, timeout: Duration) -> bool {
self.handle.wait_for_exit(timeout).await
@ -77,13 +72,13 @@ impl Validator {
let handle = spawn_node(
config,
&log_prefix,
"validator.yaml",
"node.yaml",
binary_path(),
!*IS_DEBUG_TRACING,
)
.await?;
info!("validator spawned and ready");
info!("node spawned and ready");
Ok(Self { handle })
}
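A usage sketch for the renamed wrapper, assuming a prebuilt `general: GeneralConfig` (construction not shown here):

    let config = create_node_config(general);
    let mut node = Node::spawn(config, "node-0").await?;
    assert!(node.is_running());
    // Returns false if the process is still alive after the timeout.
    let exited = node.wait_for_exit(Duration::from_secs(5)).await;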

View File

@ -2,7 +2,7 @@ use async_trait::async_trait;
use reqwest::Url;
use super::DynError;
use crate::{nodes::ApiClient, topology::generation::NodeRole};
use crate::nodes::ApiClient;
/// Marker type used by scenario builders to request node control support.
#[derive(Clone, Copy, Debug, Default)]
@ -66,35 +66,21 @@ impl RequiresNodeControl for ObservabilityCapability {
const REQUIRED: bool = false;
}
/// Interface exposed by runners that can restart nodes at runtime.
/// Interface exposed by runners that can restart/start nodes at runtime.
#[async_trait]
pub trait NodeControlHandle: Send + Sync {
async fn restart_validator(&self, index: usize) -> Result<(), DynError>;
async fn restart_node(&self, index: usize) -> Result<(), DynError>;
async fn restart_executor(&self, index: usize) -> Result<(), DynError>;
async fn start_validator(&self, _name: &str) -> Result<StartedNode, DynError> {
Err("start_validator not supported by this deployer".into())
async fn start_node(&self, _name: &str) -> Result<StartedNode, DynError> {
Err("start_node not supported by this deployer".into())
}
async fn start_executor(&self, _name: &str) -> Result<StartedNode, DynError> {
Err("start_executor not supported by this deployer".into())
}
async fn start_validator_with(
async fn start_node_with(
&self,
_name: &str,
_options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
Err("start_validator_with not supported by this deployer".into())
}
async fn start_executor_with(
&self,
_name: &str,
_options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
Err("start_executor_with not supported by this deployer".into())
Err("start_node_with not supported by this deployer".into())
}
fn node_client(&self, _name: &str) -> Option<ApiClient> {
@ -105,6 +91,5 @@ pub trait NodeControlHandle: Send + Sync {
#[derive(Clone)]
pub struct StartedNode {
pub name: String,
pub role: NodeRole,
pub api: ApiClient,
}
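Because `start_node` and `start_node_with` carry erroring default bodies, a deployer that only supports restarts can stay minimal. A sketch, assuming the remaining methods keep the defaults shown above and with hypothetical restart logic:

    struct RestartOnlyDeployer;

    #[async_trait]
    impl NodeControlHandle for RestartOnlyDeployer {
        async fn restart_node(&self, index: usize) -> Result<(), DynError> {
            // Hypothetical: signal or recreate the process behind `index`.
            let _ = index;
            Ok(())
        }
        // start_node / start_node_with fall back to the
        // "not supported by this deployer" defaults.
    }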

View File

@ -75,13 +75,10 @@ pub fn apply_topology_overrides(
use_kzg_mount: bool,
) {
debug!(
validators = topology.validators().len(),
executors = topology.executors().len(),
use_kzg_mount,
"applying topology overrides to cfgsync config"
nodes = topology.nodes().len(),
use_kzg_mount, "applying topology overrides to cfgsync config"
);
let hosts = topology.validators().len() + topology.executors().len();
cfg.n_hosts = hosts;
cfg.n_hosts = topology.nodes().len();
let consensus = &topology.config().consensus_params;
cfg.security_param = consensus.security_param;
@ -89,9 +86,15 @@ pub fn apply_topology_overrides(
let config = topology.config();
cfg.wallet = config.wallet_config.clone();
cfg.ids = Some(topology.nodes().map(|node| node.id).collect());
cfg.da_ports = Some(topology.nodes().map(|node| node.da_port).collect());
cfg.blend_ports = Some(topology.nodes().map(|node| node.blend_port).collect());
cfg.ids = Some(topology.nodes().iter().map(|node| node.id).collect());
cfg.da_ports = Some(topology.nodes().iter().map(|node| node.da_port).collect());
cfg.blend_ports = Some(
topology
.nodes()
.iter()
.map(|node| node.blend_port)
.collect(),
);
let da = &config.da_params;
cfg.subnetwork_size = da.subnetwork_size;

View File

@ -104,8 +104,7 @@ pub type ScenarioBuilder = Builder<()>;
/// Builder for shaping the scenario topology.
pub struct TopologyConfigurator<Caps> {
builder: Builder<Caps>,
validators: usize,
executors: usize,
nodes: usize,
network_star: bool,
}
@ -124,14 +123,12 @@ impl<Caps: Default> Builder<Caps> {
}
#[must_use]
pub fn with_node_counts(validators: usize, executors: usize) -> Self {
Self::new(TopologyBuilder::new(TopologyConfig::with_node_numbers(
validators, executors,
)))
pub fn with_node_count(nodes: usize) -> Self {
Self::new(TopologyBuilder::new(TopologyConfig::with_node_count(nodes)))
}
/// Convenience constructor that immediately enters topology configuration,
/// letting callers set counts via `validators`/`executors`.
/// letting callers set the node count via `nodes`.
pub fn topology() -> TopologyConfigurator<Caps> {
TopologyConfigurator::new(Self::new(TopologyBuilder::new(TopologyConfig::empty())))
}
@ -263,8 +260,7 @@ impl<Caps> Builder<Caps> {
let workloads: Vec<Arc<dyn Workload>> = workloads.into_iter().map(Arc::from).collect();
info!(
validators = generated.validators().len(),
executors = generated.executors().len(),
nodes = generated.nodes().len(),
duration_secs = duration.as_secs(),
workloads = workloads.len(),
expectations = expectations.len(),
@ -285,23 +281,15 @@ impl<Caps> TopologyConfigurator<Caps> {
const fn new(builder: Builder<Caps>) -> Self {
Self {
builder,
validators: 0,
executors: 0,
nodes: 0,
network_star: false,
}
}
/// Set the number of validator nodes.
/// Set the number of nodes.
#[must_use]
pub fn validators(mut self, count: usize) -> Self {
self.validators = count;
self
}
/// Set the number of executor nodes.
#[must_use]
pub fn executors(mut self, count: usize) -> Self {
self.executors = count;
pub fn nodes(mut self, count: usize) -> Self {
self.nodes = count;
self
}
@ -315,7 +303,7 @@ impl<Caps> TopologyConfigurator<Caps> {
/// Finalize and return the underlying scenario builder.
#[must_use]
pub fn apply(self) -> Builder<Caps> {
let mut config = TopologyConfig::with_node_numbers(self.validators, self.executors);
let mut config = TopologyConfig::with_node_count(self.nodes);
if self.network_star {
config.network_params.libp2p_network_layout = Libp2pNetworkLayout::Star;
}
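The fluent path after the rename, as a sketch; `network_star()` is the layout toggle consumed by `apply()` above:

    let with_layout = ScenarioBuilder::topology()
        .nodes(3)
        .network_star()
        .apply();
    // Equivalent count-only shortcut:
    let count_only = ScenarioBuilder::with_node_count(3);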

View File

@ -7,35 +7,18 @@ use thiserror::Error;
use tokio::time::{Instant, sleep};
use tracing::{debug, info};
/// Role used for labelling readiness probes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeRole {
Validator,
Executor,
}
impl NodeRole {
#[must_use]
pub const fn label(self) -> &'static str {
match self {
Self::Validator => "validator",
Self::Executor => "executor",
}
}
}
/// Error raised when HTTP readiness checks time out.
#[derive(Clone, Copy, Debug, Error)]
#[error("timeout waiting for {role} HTTP endpoint on port {port} after {timeout:?}", role = role.label())]
#[error("timeout waiting for {role} HTTP endpoint on port {port} after {timeout:?}")]
pub struct HttpReadinessError {
role: NodeRole,
role: &'static str,
port: u16,
timeout: Duration,
}
impl HttpReadinessError {
#[must_use]
pub const fn new(role: NodeRole, port: u16, timeout: Duration) -> Self {
pub const fn new(role: &'static str, port: u16, timeout: Duration) -> Self {
Self {
role,
port,
@ -44,7 +27,7 @@ impl HttpReadinessError {
}
#[must_use]
pub const fn role(&self) -> NodeRole {
pub const fn role(&self) -> &'static str {
self.role
}
@ -62,7 +45,7 @@ impl HttpReadinessError {
/// Wait for HTTP readiness on the provided ports against localhost.
pub async fn wait_for_http_ports(
ports: &[u16],
role: NodeRole,
role: &'static str,
timeout_duration: Duration,
poll_interval: Duration,
) -> Result<(), HttpReadinessError> {
@ -72,7 +55,7 @@ pub async fn wait_for_http_ports(
/// Wait for HTTP readiness on the provided ports against a specific host.
pub async fn wait_for_http_ports_with_host(
ports: &[u16],
role: NodeRole,
role: &'static str,
host: &str,
timeout_duration: Duration,
poll_interval: Duration,
@ -82,7 +65,7 @@ pub async fn wait_for_http_ports_with_host(
}
info!(
role = role.label(),
role,
?ports,
host,
timeout_secs = timeout_duration.as_secs_f32(),
@ -108,13 +91,13 @@ pub async fn wait_for_http_ports_with_host(
async fn wait_for_single_port(
client: ReqwestClient,
port: u16,
role: NodeRole,
role: &'static str,
host: &str,
timeout_duration: Duration,
poll_interval: Duration,
) -> Result<(), HttpReadinessError> {
let url = format!("http://{host}:{port}{}", paths::CRYPTARCHIA_INFO);
debug!(role = role.label(), %url, "probing HTTP endpoint");
debug!(role, %url, "probing HTTP endpoint");
let start = Instant::now();
let deadline = start + timeout_duration;
let mut attempts: u64 = 0;
@ -125,7 +108,7 @@ async fn wait_for_single_port(
let last_failure: Option<String> = match client.get(&url).send().await {
Ok(response) if response.status().is_success() => {
info!(
role = role.label(),
role,
port,
host,
%url,
@ -144,7 +127,7 @@ async fn wait_for_single_port(
if attempts == 1 || attempts % 10 == 0 {
debug!(
role = role.label(),
role,
port,
host,
%url,
@ -157,7 +140,7 @@ async fn wait_for_single_port(
if Instant::now() >= deadline {
info!(
role = role.label(),
role,
port,
host,
%url,
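With `NodeRole` gone, callers pass a plain label. A sketch against hypothetical local ports:

    wait_for_http_ports(
        &[18080, 18081],            // hypothetical API ports
        "node",                     // free-form label used in logs and errors
        Duration::from_secs(60),    // overall deadline
        Duration::from_millis(500), // poll interval
    )
    .await?;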

View File

@ -6,8 +6,8 @@ use tracing::warn;
pub const CONSENSUS_PROCESSED_BLOCKS: &str = "consensus_processed_blocks";
pub const CONSENSUS_TRANSACTIONS_TOTAL: &str = "consensus_transactions_total";
const CONSENSUS_TRANSACTIONS_VALIDATOR_QUERY: &str =
r#"sum(consensus_transactions_total{job=~"validator-.*"})"#;
const CONSENSUS_TRANSACTIONS_NODE_QUERY: &str =
r#"sum(consensus_transactions_total{job=~"node-.*"})"#;
/// Telemetry handles available during a run.
#[derive(Clone, Default)]
@ -71,21 +71,21 @@ impl Metrics {
.prometheus()
.ok_or_else(|| MetricsError::new("prometheus endpoint unavailable"))?;
match handle.instant_samples(CONSENSUS_TRANSACTIONS_VALIDATOR_QUERY) {
match handle.instant_samples(CONSENSUS_TRANSACTIONS_NODE_QUERY) {
Ok(samples) if !samples.is_empty() => {
return Ok(samples.into_iter().map(|sample| sample.value).sum());
}
Ok(_) => {
warn!(
query = CONSENSUS_TRANSACTIONS_VALIDATOR_QUERY,
"validator-specific consensus transaction metric returned no samples; falling back to aggregate counter"
query = CONSENSUS_TRANSACTIONS_NODE_QUERY,
"node-specific consensus transaction metric returned no samples; falling back to aggregate counter"
);
}
Err(err) => {
warn!(
query = CONSENSUS_TRANSACTIONS_VALIDATOR_QUERY,
query = CONSENSUS_TRANSACTIONS_NODE_QUERY,
error = %err,
"failed to query validator-specific consensus transaction metric; falling back to aggregate counter"
"failed to query node-specific consensus transaction metric; falling back to aggregate counter"
);
}
}

View File

@ -1,4 +1,5 @@
use std::{
future::Future,
pin::Pin,
sync::{Arc, RwLock},
};
@ -11,7 +12,7 @@ use crate::{
topology::{deployment::Topology, generation::GeneratedTopology},
};
/// Collection of API clients for the validator and executor set.
/// Collection of API clients for the node set.
#[derive(Clone, Default)]
pub struct NodeClients {
inner: Arc<RwLock<NodeClientsInner>>,
@ -19,118 +20,60 @@ pub struct NodeClients {
#[derive(Default)]
struct NodeClientsInner {
validators: Vec<ApiClient>,
executors: Vec<ApiClient>,
nodes: Vec<ApiClient>,
}
impl NodeClients {
#[must_use]
/// Build clients from preconstructed vectors.
pub fn new(validators: Vec<ApiClient>, executors: Vec<ApiClient>) -> Self {
/// Build clients from a preconstructed vector.
pub fn new(nodes: Vec<ApiClient>) -> Self {
Self {
inner: Arc::new(RwLock::new(NodeClientsInner {
validators,
executors,
})),
inner: Arc::new(RwLock::new(NodeClientsInner { nodes })),
}
}
#[must_use]
/// Derive clients from a spawned topology.
pub fn from_topology(_descriptors: &GeneratedTopology, topology: &Topology) -> Self {
let validator_clients = topology.validators().iter().map(|node| {
let node_clients = topology.nodes().iter().map(|node| {
let testing = node.testing_url();
ApiClient::from_urls(node.url(), testing)
});
let executor_clients = topology.executors().iter().map(|node| {
let testing = node.testing_url();
ApiClient::from_urls(node.url(), testing)
});
Self::new(validator_clients.collect(), executor_clients.collect())
Self::new(node_clients.collect())
}
#[must_use]
/// Validator API clients.
pub fn validator_clients(&self) -> Vec<ApiClient> {
/// Node API clients.
pub fn node_clients(&self) -> Vec<ApiClient> {
self.inner
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.validators
.nodes
.clone()
}
#[must_use]
/// Executor API clients.
pub fn executor_clients(&self) -> Vec<ApiClient> {
self.inner
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner())
.executors
.clone()
}
#[must_use]
/// Choose a random validator client if present.
pub fn random_validator(&self) -> Option<ApiClient> {
let validators = self.validator_clients();
if validators.is_empty() {
/// Choose a random node client if present.
pub fn random_node(&self) -> Option<ApiClient> {
let nodes = self.node_clients();
if nodes.is_empty() {
return None;
}
let mut rng = thread_rng();
let idx = rng.gen_range(0..validators.len());
validators.get(idx).cloned()
}
#[must_use]
/// Choose a random executor client if present.
pub fn random_executor(&self) -> Option<ApiClient> {
let executors = self.executor_clients();
if executors.is_empty() {
return None;
}
let mut rng = thread_rng();
let idx = rng.gen_range(0..executors.len());
executors.get(idx).cloned()
let idx = rng.gen_range(0..nodes.len());
nodes.get(idx).cloned()
}
/// All clients.
pub fn all_clients(&self) -> Vec<ApiClient> {
let guard = self
.inner
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner());
guard
.validators
.iter()
.chain(guard.executors.iter())
.cloned()
.collect()
self.node_clients()
}
#[must_use]
/// Choose any random client from validators+executors.
/// Choose any random client.
pub fn any_client(&self) -> Option<ApiClient> {
let guard = self
.inner
.read()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let validator_count = guard.validators.len();
let executor_count = guard.executors.len();
let total = validator_count + executor_count;
if total == 0 {
return None;
}
let mut rng = thread_rng();
let choice = rng.gen_range(0..total);
if choice < validator_count {
guard.validators.get(choice).cloned()
} else {
guard.executors.get(choice - validator_count).cloned()
}
self.random_node()
}
#[must_use]
@ -139,22 +82,13 @@ impl NodeClients {
ClusterClient::new(self)
}
pub fn add_validator(&self, client: ApiClient) {
pub fn add_node(&self, client: ApiClient) {
let mut guard = self
.inner
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
guard.validators.push(client);
}
pub fn add_executor(&self, client: ApiClient) {
let mut guard = self
.inner
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
guard.executors.push(client);
guard.nodes.push(client);
}
pub fn clear(&self) {
@ -163,8 +97,7 @@ impl NodeClients {
.write()
.unwrap_or_else(|poisoned| poisoned.into_inner());
guard.validators.clear();
guard.executors.clear();
guard.nodes.clear();
}
}
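A sketch of the flattened client set, assuming `generated`, `topology`, and `new_api_client` come from the surrounding harness:

    let clients = NodeClients::from_topology(&generated, &topology);
    // None when the node set is empty.
    if let Some(api) = clients.random_node() {
        let _ = api; // query any node ad hoc
    }
    // Nodes started at runtime are appended to the same list.
    clients.add_node(new_api_client);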

View File

@ -4,7 +4,6 @@ use nomos_core::{
mantle::GenesisTx as _,
sdp::{Locator, ServiceType},
};
use nomos_da_network_core::swarm::DAConnectionPolicySettings;
use testing_framework_config::topology::{
configs::{
api::{ApiConfigError, create_api_configs},
@ -24,12 +23,11 @@ use thiserror::Error;
use crate::topology::{
configs::{GeneralConfig, time::default_time_config},
generation::{GeneratedNodeConfig, GeneratedTopology, NodeRole},
generation::{GeneratedNodeConfig, GeneratedTopology},
utils::{TopologyResolveError, create_kms_configs, resolve_ids, resolve_ports},
};
const DEFAULT_DA_BALANCER_INTERVAL: Duration = Duration::from_secs(1);
const VALIDATOR_EXECUTOR_DA_BALANCER_INTERVAL: Duration = Duration::from_secs(5);
#[derive(Debug, Error)]
pub enum TopologyBuildError {
@ -58,8 +56,7 @@ pub enum TopologyBuildError {
/// High-level topology settings used to generate node configs for a scenario.
#[derive(Clone)]
pub struct TopologyConfig {
pub n_validators: usize,
pub n_executors: usize,
pub n_nodes: usize,
pub consensus_params: ConsensusParams,
pub da_params: DaParams,
pub network_params: NetworkParams,
@ -71,8 +68,7 @@ impl TopologyConfig {
#[must_use]
pub fn empty() -> Self {
Self {
n_validators: 0,
n_executors: 0,
n_nodes: 0,
consensus_params: ConsensusParams::default_for_participants(1),
da_params: DaParams::default(),
network_params: NetworkParams::default(),
@ -81,11 +77,10 @@ impl TopologyConfig {
}
#[must_use]
/// Convenience config with two validators for consensus-only scenarios.
pub fn two_validators() -> Self {
/// Convenience config with two nodes for consensus-only scenarios.
pub fn two_nodes() -> Self {
Self {
n_validators: 2,
n_executors: 0,
n_nodes: 2,
consensus_params: ConsensusParams::default_for_participants(2),
da_params: DaParams::default(),
network_params: NetworkParams::default(),
@ -94,36 +89,9 @@ impl TopologyConfig {
}
#[must_use]
/// Single validator + single executor config for minimal dual-role setups.
pub fn validator_and_executor() -> Self {
Self {
n_validators: 1,
n_executors: 1,
consensus_params: ConsensusParams::default_for_participants(2),
da_params: DaParams {
dispersal_factor: 2,
subnetwork_size: 2,
num_subnets: 2,
policy_settings: DAConnectionPolicySettings {
min_dispersal_peers: 1,
min_replication_peers: 1,
max_dispersal_failures: 0,
max_sampling_failures: 0,
max_replication_failures: 0,
malicious_threshold: 0,
},
balancer_interval: DEFAULT_DA_BALANCER_INTERVAL,
..Default::default()
},
network_params: NetworkParams::default(),
wallet_config: WalletConfig::default(),
}
}
#[must_use]
/// Build a topology with explicit validator and executor counts.
pub fn with_node_numbers(validators: usize, executors: usize) -> Self {
let participants = validators + executors;
/// Build a topology with explicit node count.
pub fn with_node_count(nodes: usize) -> Self {
let participants = nodes;
let mut da_params = DaParams::default();
let da_nodes = participants;
@ -145,8 +113,7 @@ impl TopologyConfig {
}
Self {
n_validators: validators,
n_executors: executors,
n_nodes: nodes,
consensus_params: ConsensusParams::default_for_participants(participants),
da_params,
network_params: NetworkParams::default(),
@ -154,37 +121,6 @@ impl TopologyConfig {
}
}
#[must_use]
/// Build a topology with one executor and a configurable validator set.
pub fn validators_and_executor(
num_validators: usize,
num_subnets: usize,
dispersal_factor: usize,
) -> Self {
Self {
n_validators: num_validators,
n_executors: 1,
consensus_params: ConsensusParams::default_for_participants(num_validators + 1),
da_params: DaParams {
dispersal_factor,
subnetwork_size: num_subnets,
num_subnets: num_subnets as u16,
policy_settings: DAConnectionPolicySettings {
min_dispersal_peers: num_subnets,
min_replication_peers: dispersal_factor - 1,
max_dispersal_failures: 0,
max_sampling_failures: 0,
max_replication_failures: 0,
malicious_threshold: 0,
},
balancer_interval: VALIDATOR_EXECUTOR_DA_BALANCER_INTERVAL,
..Default::default()
},
network_params: NetworkParams::default(),
wallet_config: WalletConfig::default(),
}
}
#[must_use]
pub const fn wallet(&self) -> &WalletConfig {
&self.wallet_config
@ -234,23 +170,9 @@ impl TopologyBuilder {
}
#[must_use]
pub const fn with_validator_count(mut self, validators: usize) -> Self {
self.config.n_validators = validators;
self
}
#[must_use]
/// Set executor count.
pub const fn with_executor_count(mut self, executors: usize) -> Self {
self.config.n_executors = executors;
self
}
#[must_use]
/// Set validator and executor counts together.
pub const fn with_node_counts(mut self, validators: usize, executors: usize) -> Self {
self.config.n_validators = validators;
self.config.n_executors = executors;
/// Set total node count.
pub const fn with_node_count(mut self, nodes: usize) -> Self {
self.config.n_nodes = nodes;
self
}
@ -312,7 +234,7 @@ impl TopologyBuilder {
let kms_configs =
create_kms_configs(&blend_configs, &da_configs, &config.wallet_config.accounts);
let (validators, executors) = build_node_descriptors(
let nodes = build_node_descriptors(
&config,
n_participants,
&ids,
@ -329,11 +251,7 @@ impl TopologyBuilder {
&time_config,
)?;
Ok(GeneratedTopology {
config,
validators,
executors,
})
Ok(GeneratedTopology { config, nodes })
}
#[must_use]
@ -343,7 +261,7 @@ impl TopologyBuilder {
}
fn participant_count(config: &TopologyConfig) -> Result<usize, TopologyBuildError> {
let n_participants = config.n_validators + config.n_executors;
let n_participants = config.n_nodes;
if n_participants == 0 {
return Err(TopologyBuildError::EmptyParticipants);
}
@ -436,9 +354,8 @@ fn build_node_descriptors(
tracing_configs: &[testing_framework_config::topology::configs::tracing::GeneralTracingConfig],
kms_configs: &[key_management_system_service::backend::preload::PreloadKMSBackendSettings],
time_config: &testing_framework_config::topology::configs::time::GeneralTimeConfig,
) -> Result<(Vec<GeneratedNodeConfig>, Vec<GeneratedNodeConfig>), TopologyBuildError> {
let mut validators = Vec::with_capacity(config.n_validators);
let mut executors = Vec::with_capacity(config.n_executors);
) -> Result<Vec<GeneratedNodeConfig>, TopologyBuildError> {
let mut nodes = Vec::with_capacity(config.n_nodes);
for i in 0..n_participants {
let consensus_config =
@ -468,31 +385,17 @@ fn build_node_descriptors(
kms_config,
};
let (role, index) = node_role_and_index(i, config.n_validators);
let descriptor = GeneratedNodeConfig {
role,
index,
index: i,
id,
general,
da_port,
blend_port,
};
match role {
NodeRole::Validator => validators.push(descriptor),
NodeRole::Executor => executors.push(descriptor),
}
nodes.push(descriptor);
}
Ok((validators, executors))
}
fn node_role_and_index(i: usize, n_validators: usize) -> (NodeRole, usize) {
if i < n_validators {
(NodeRole::Validator, i)
} else {
(NodeRole::Executor, i - n_validators)
}
Ok(nodes)
}
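Upstream of `build_node_descriptors`, the two surviving constructors line up as a sketch; note that `with_node_count` also derives DA parameters from the participant count, so only the node count is compared here:

    let a = TopologyConfig::two_nodes();
    let b = TopologyConfig::with_node_count(2);
    assert_eq!(a.n_nodes, b.n_nodes);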
fn get_cloned<T: Clone>(

View File

@ -6,8 +6,7 @@ use thiserror::Error;
use crate::{
nodes::{
common::node::SpawnNodeError,
executor::{Executor, create_executor_config},
validator::{Validator, create_validator_config},
node::{Node, create_node_config},
},
topology::{
config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
@ -23,11 +22,10 @@ use crate::{
/// Runtime representation of a spawned topology with running nodes.
pub struct Topology {
pub(crate) validators: Vec<Validator>,
pub(crate) executors: Vec<Executor>,
pub(crate) nodes: Vec<Node>,
}
pub type DeployedNodes = (Vec<Validator>, Vec<Executor>);
pub type DeployedNodes = Vec<Node>;
#[derive(Debug, Error)]
pub enum SpawnTopologyError {
@ -40,20 +38,15 @@ pub enum SpawnTopologyError {
impl Topology {
pub async fn spawn(config: TopologyConfig) -> Result<Self, SpawnTopologyError> {
let generated = TopologyBuilder::new(config.clone()).build()?;
let n_validators = config.n_validators;
let n_executors = config.n_executors;
let node_configs = generated
.nodes()
.iter()
.map(|node| node.general.clone())
.collect::<Vec<_>>();
let (validators, executors) =
Self::spawn_validators_executors(node_configs, n_validators, n_executors).await?;
let nodes = Self::spawn_nodes(node_configs).await?;
Ok(Self {
validators,
executors,
})
Ok(Self { nodes })
}
pub async fn spawn_with_empty_membership(
@ -70,49 +63,30 @@ impl Topology {
let node_configs = generated
.nodes()
.iter()
.map(|node| node.general.clone())
.collect::<Vec<_>>();
let (validators, executors) =
Self::spawn_validators_executors(node_configs, config.n_validators, config.n_executors)
.await?;
let nodes = Self::spawn_nodes(node_configs).await?;
Ok(Self {
validators,
executors,
})
Ok(Self { nodes })
}
pub(crate) async fn spawn_validators_executors(
config: Vec<GeneralConfig>,
n_validators: usize,
n_executors: usize,
pub(crate) async fn spawn_nodes(
configs: Vec<GeneralConfig>,
) -> Result<DeployedNodes, SpawnTopologyError> {
let mut validators = Vec::new();
for i in 0..n_validators {
let config = create_validator_config(config[i].clone());
let label = format!("validator-{i}");
validators.push(Validator::spawn(config, &label).await?);
let mut nodes = Vec::with_capacity(configs.len());
for (idx, config) in configs.into_iter().enumerate() {
let config = create_node_config(config);
let label = format!("node-{idx}");
nodes.push(Node::spawn(config, &label).await?);
}
let mut executors = Vec::new();
for i in 0..n_executors {
let config = create_executor_config(config[n_validators + i].clone());
let label = format!("executor-{i}");
executors.push(Executor::spawn(config, &label).await?);
}
Ok((validators, executors))
Ok(nodes)
}
#[must_use]
pub fn validators(&self) -> &[Validator] {
&self.validators
}
#[must_use]
pub fn executors(&self) -> &[Executor] {
&self.executors
pub fn nodes(&self) -> &[Node] {
&self.nodes
}
pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> {
@ -136,7 +110,7 @@ impl Topology {
}
pub async fn wait_da_balancer_ready(&self) -> Result<(), ReadinessError> {
if self.validators.is_empty() && self.executors.is_empty() {
if self.nodes.is_empty() {
return Ok(());
}
@ -174,9 +148,7 @@ impl Topology {
session: SessionNumber,
expect_non_empty: bool,
) -> Result<(), ReadinessError> {
let total_nodes = self.validators.len() + self.executors.len();
if total_nodes == 0 {
if self.nodes.is_empty() {
return Ok(());
}
@ -193,19 +165,14 @@ impl Topology {
}
fn node_listen_ports(&self) -> Vec<u16> {
self.validators
self.nodes
.iter()
.map(|node| node.config().network.backend.swarm.port)
.chain(
self.executors
.iter()
.map(|node| node.config().network.backend.swarm.port),
)
.collect()
}
fn node_initial_peer_ports(&self) -> Vec<HashSet<u16>> {
self.validators
self.nodes
.iter()
.map(|node| {
node.config()
@ -216,34 +183,14 @@ impl Topology {
.filter_map(multiaddr_port)
.collect::<HashSet<u16>>()
})
.chain(self.executors.iter().map(|node| {
node.config()
.network
.backend
.initial_peers
.iter()
.filter_map(multiaddr_port)
.collect::<HashSet<u16>>()
}))
.collect()
}
fn node_labels(&self) -> Vec<String> {
self.validators
self.nodes
.iter()
.enumerate()
.map(|(idx, node)| {
format!(
"validator#{idx}@{}",
node.config().network.backend.swarm.port
)
})
.chain(self.executors.iter().enumerate().map(|(idx, node)| {
format!(
"executor#{idx}@{}",
node.config().network.backend.swarm.port
)
}))
.map(|(idx, node)| format!("node#{idx}@{}", node.config().network.backend.swarm.port))
.collect()
}
}
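End-to-end local sketch with the consolidated type:

    let topology = Topology::spawn(TopologyConfig::two_nodes()).await?;
    topology.wait_network_ready().await?;
    assert_eq!(topology.nodes().len(), 2);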

View File

@ -9,17 +9,9 @@ use crate::topology::{
readiness::{HttpMembershipReadiness, HttpNetworkReadiness, ReadinessCheck, ReadinessError},
};
/// Node role within the generated topology.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeRole {
Validator,
Executor,
}
/// Fully generated configuration for an individual node.
#[derive(Clone)]
pub struct GeneratedNodeConfig {
pub role: NodeRole,
pub index: usize,
pub id: [u8; 32],
pub general: GeneralConfig,
@ -28,12 +20,6 @@ pub struct GeneratedNodeConfig {
}
impl GeneratedNodeConfig {
#[must_use]
/// Logical role of the node.
pub const fn role(&self) -> NodeRole {
self.role
}
#[must_use]
/// Zero-based index within the topology.
pub const fn index(&self) -> usize {
@ -61,8 +47,7 @@ impl GeneratedNodeConfig {
#[derive(Clone)]
pub struct GeneratedTopology {
pub(crate) config: TopologyConfig,
pub(crate) validators: Vec<GeneratedNodeConfig>,
pub(crate) executors: Vec<GeneratedNodeConfig>,
pub(crate) nodes: Vec<GeneratedNodeConfig>,
}
impl GeneratedTopology {
@ -73,26 +58,15 @@ impl GeneratedTopology {
}
#[must_use]
/// All validator configs.
pub fn validators(&self) -> &[GeneratedNodeConfig] {
&self.validators
}
#[must_use]
/// All executor configs.
pub fn executors(&self) -> &[GeneratedNodeConfig] {
&self.executors
}
/// Iterator over all node configs in role order.
pub fn nodes(&self) -> impl Iterator<Item = &GeneratedNodeConfig> {
self.validators.iter().chain(self.executors.iter())
/// All node configs.
pub fn nodes(&self) -> &[GeneratedNodeConfig] {
&self.nodes
}
#[must_use]
/// Slot duration from the first node (assumes homogeneous configs).
pub fn slot_duration(&self) -> Option<Duration> {
self.validators
self.nodes
.first()
.map(|node| node.general.time_config.slot_duration)
}
@ -106,55 +80,33 @@ impl GeneratedTopology {
pub async fn spawn_local(&self) -> Result<Topology, SpawnTopologyError> {
let configs = self
.nodes()
.iter()
.map(|node| node.general.clone())
.collect::<Vec<_>>();
let (validators, executors) = Topology::spawn_validators_executors(
configs,
self.config.n_validators,
self.config.n_executors,
)
.await?;
Ok(Topology {
validators,
executors,
})
let nodes = Topology::spawn_nodes(configs).await?;
Ok(Topology { nodes })
}
pub async fn wait_remote_readiness(
&self,
// Node endpoints
validator_endpoints: &[Url],
executor_endpoints: &[Url],
// Membership endpoints
validator_membership_endpoints: Option<&[Url]>,
executor_membership_endpoints: Option<&[Url]>,
node_endpoints: &[Url],
membership_endpoints: Option<&[Url]>,
) -> Result<(), ReadinessError> {
let total_nodes = self.validators.len() + self.executors.len();
if total_nodes == 0 {
if self.nodes.is_empty() {
return Ok(());
}
let labels = self.labels();
let client = Client::new();
let endpoints =
collect_node_endpoints(self, validator_endpoints, executor_endpoints, total_nodes);
let endpoints = collect_node_endpoints(self, node_endpoints);
wait_for_network_readiness(self, &client, &endpoints, &labels).await?;
if validator_membership_endpoints.is_none() && executor_membership_endpoints.is_none() {
let Some(membership_endpoints) = membership_endpoints else {
return Ok(());
}
let membership_endpoints = collect_membership_endpoints(
self,
total_nodes,
validator_membership_endpoints,
executor_membership_endpoints,
);
};
let membership_check = HttpMembershipReadiness {
client: &client,
@ -168,19 +120,14 @@ impl GeneratedTopology {
}
fn listen_ports(&self) -> Vec<u16> {
self.validators
self.nodes
.iter()
.map(|node| node.general.network_config.backend.swarm.port)
.chain(
self.executors
.iter()
.map(|node| node.general.network_config.backend.swarm.port),
)
.collect()
}
fn initial_peer_ports(&self) -> Vec<HashSet<u16>> {
self.validators
self.nodes
.iter()
.map(|node| {
node.general
@ -191,59 +138,30 @@ impl GeneratedTopology {
.filter_map(crate::topology::utils::multiaddr_port)
.collect::<HashSet<u16>>()
})
.chain(self.executors.iter().map(|node| {
node.general
.network_config
.backend
.initial_peers
.iter()
.filter_map(crate::topology::utils::multiaddr_port)
.collect::<HashSet<u16>>()
}))
.collect()
}
fn labels(&self) -> Vec<String> {
self.validators
self.nodes
.iter()
.enumerate()
.map(|(idx, node)| {
format!(
"validator#{idx}@{}",
"node#{idx}@{}",
node.general.network_config.backend.swarm.port
)
})
.chain(self.executors.iter().enumerate().map(|(idx, node)| {
format!(
"executor#{idx}@{}",
node.general.network_config.backend.swarm.port
)
}))
.collect()
}
}
fn collect_node_endpoints(
topology: &GeneratedTopology,
validator_endpoints: &[Url],
executor_endpoints: &[Url],
total_nodes: usize,
) -> Vec<Url> {
fn collect_node_endpoints(topology: &GeneratedTopology, node_endpoints: &[Url]) -> Vec<Url> {
assert_eq!(
topology.validators.len(),
validator_endpoints.len(),
"validator endpoints must match topology"
topology.nodes.len(),
node_endpoints.len(),
"node endpoints must match topology"
);
assert_eq!(
topology.executors.len(),
executor_endpoints.len(),
"executor endpoints must match topology"
);
let mut endpoints = Vec::with_capacity(total_nodes);
endpoints.extend_from_slice(validator_endpoints);
endpoints.extend_from_slice(executor_endpoints);
endpoints
node_endpoints.to_vec()
}
async fn wait_for_network_readiness(
@ -271,52 +189,6 @@ async fn wait_for_network_readiness(
network_check.wait().await
}
fn collect_membership_endpoints(
topology: &GeneratedTopology,
total_nodes: usize,
validator_membership_endpoints: Option<&[Url]>,
executor_membership_endpoints: Option<&[Url]>,
) -> Vec<Url> {
let mut membership_endpoints = Vec::with_capacity(total_nodes);
membership_endpoints.extend(collect_role_membership_endpoints(
&topology.validators,
validator_membership_endpoints,
"validator membership endpoints must match topology",
));
membership_endpoints.extend(collect_role_membership_endpoints(
&topology.executors,
executor_membership_endpoints,
"executor membership endpoints must match topology",
));
membership_endpoints
}
fn collect_role_membership_endpoints(
nodes: &[GeneratedNodeConfig],
membership_endpoints: Option<&[Url]>,
mismatch_message: &'static str,
) -> Vec<Url> {
match membership_endpoints {
Some(urls) => {
assert_eq!(nodes.len(), urls.len(), "{mismatch_message}");
urls.to_vec()
}
None => nodes
.iter()
.map(|node| testing_base_url(node.testing_http_port()))
.collect(),
}
}
fn testing_base_url(port: u16) -> Url {
Url::parse(&format!("http://127.0.0.1:{port}/")).unwrap_or_else(|_| unsafe {
// Safety: `port` is a valid u16 port.
std::hint::unreachable_unchecked()
})
}
pub fn find_expected_peer_counts(
listen_ports: &[u16],
initial_peer_ports: &[HashSet<u16>],
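Remote readiness collapses to one endpoint list plus optional membership URLs. A sketch, assuming `node_urls` has one entry per node (the assert in `collect_node_endpoints` enforces this):

    generated
        .wait_remote_readiness(&node_urls, None)
        .await?;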

View File

@ -23,33 +23,17 @@ impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> {
async fn collect(&'a self) -> Self::Data {
let mut data = Vec::new();
for (idx, validator) in self.topology.validators.iter().enumerate() {
for (idx, node) in self.topology.nodes.iter().enumerate() {
let label = self
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
.unwrap_or_else(|| format!("node#{idx}"));
data.push(
(
label,
validator.config().da_network.subnet_threshold,
validator.api().balancer_stats().await,
)
.into(),
);
}
for (offset, executor) in self.topology.executors.iter().enumerate() {
let label_index = self.topology.validators.len() + offset;
let label = self
.labels
.get(label_index)
.cloned()
.unwrap_or_else(|| format!("executor#{offset}"));
data.push(
(
label,
executor.config().da_network.subnet_threshold,
executor.api().balancer_stats().await,
node.config().da_network.subnet_threshold,
node.api().balancer_stats().await,
)
.into(),
);
@ -58,7 +42,7 @@ impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> {
}
fn is_ready(&self, data: &Self::Data) -> bool {
if self.topology.validators.len() + self.topology.executors.len() <= 1 {
if self.topology.nodes.len() <= 1 {
return true;
}
data.iter().all(|entry| {

View File

@ -38,14 +38,7 @@ impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> {
type Data = Vec<NodeMembershipStatus>;
async fn collect(&'a self) -> Self::Data {
let (validator_statuses, executor_statuses) = tokio::join!(
collect_validator_statuses(self),
collect_executor_statuses(self)
);
validator_statuses
.into_iter()
.chain(executor_statuses)
.collect()
collect_node_statuses(self).await
}
fn is_ready(&self, data: &Self::Data) -> bool {
@ -107,12 +100,10 @@ impl<'a> ReadinessCheck<'a> for HttpMembershipReadiness<'a> {
}
}
async fn collect_validator_statuses(
readiness: &MembershipReadiness<'_>,
) -> Vec<NodeMembershipStatus> {
let validator_futures = readiness
async fn collect_node_statuses(readiness: &MembershipReadiness<'_>) -> Vec<NodeMembershipStatus> {
let node_futures = readiness
.topology
.validators
.nodes
.iter()
.enumerate()
.map(|(idx, node)| {
@ -120,7 +111,7 @@ async fn collect_validator_statuses(
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
.unwrap_or_else(|| format!("node#{idx}"));
async move {
let result = node
.api()
@ -131,36 +122,7 @@ async fn collect_validator_statuses(
}
});
futures::future::join_all(validator_futures).await
}
async fn collect_executor_statuses(
readiness: &MembershipReadiness<'_>,
) -> Vec<NodeMembershipStatus> {
let offset = readiness.topology.validators.len();
let executor_futures = readiness
.topology
.executors
.iter()
.enumerate()
.map(|(idx, node)| {
let global_idx = offset + idx;
let label = readiness
.labels
.get(global_idx)
.cloned()
.unwrap_or_else(|| format!("executor#{idx}"));
async move {
let result = node
.api()
.da_get_membership_checked(&readiness.session)
.await
.map_err(MembershipError::from);
NodeMembershipStatus { label, result }
}
});
futures::future::join_all(executor_futures).await
futures::future::join_all(node_futures).await
}
pub async fn try_fetch_membership(

View File

@ -36,14 +36,7 @@ impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> {
type Data = Vec<NodeNetworkStatus>;
async fn collect(&'a self) -> Self::Data {
let (validator_statuses, executor_statuses) = tokio::join!(
collect_validator_statuses(self),
collect_executor_statuses(self)
);
validator_statuses
.into_iter()
.chain(executor_statuses)
.collect()
collect_node_statuses(self).await
}
fn is_ready(&self, data: &Self::Data) -> bool {
@ -107,10 +100,10 @@ impl<'a> ReadinessCheck<'a> for HttpNetworkReadiness<'a> {
}
}
async fn collect_validator_statuses(readiness: &NetworkReadiness<'_>) -> Vec<NodeNetworkStatus> {
let validator_futures = readiness
async fn collect_node_statuses(readiness: &NetworkReadiness<'_>) -> Vec<NodeNetworkStatus> {
let node_futures = readiness
.topology
.validators
.nodes
.iter()
.enumerate()
.map(|(idx, node)| {
@ -118,7 +111,7 @@ async fn collect_validator_statuses(readiness: &NetworkReadiness<'_>) -> Vec<Nod
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
.unwrap_or_else(|| format!("node#{idx}"));
let expected_peers = readiness.expected_peer_counts.get(idx).copied();
async move {
let result = node
@ -134,39 +127,7 @@ async fn collect_validator_statuses(readiness: &NetworkReadiness<'_>) -> Vec<Nod
}
});
futures::future::join_all(validator_futures).await
}
async fn collect_executor_statuses(readiness: &NetworkReadiness<'_>) -> Vec<NodeNetworkStatus> {
let offset = readiness.topology.validators.len();
let executor_futures = readiness
.topology
.executors
.iter()
.enumerate()
.map(|(idx, node)| {
let global_idx = offset + idx;
let label = readiness
.labels
.get(global_idx)
.cloned()
.unwrap_or_else(|| format!("executor#{idx}"));
let expected_peers = readiness.expected_peer_counts.get(global_idx).copied();
async move {
let result = node
.api()
.network_info()
.await
.map_err(NetworkInfoError::from);
NodeNetworkStatus {
label,
expected_peers,
result,
}
}
});
futures::future::join_all(executor_futures).await
futures::future::join_all(node_futures).await
}
pub async fn try_fetch_network_info(

View File

@ -35,8 +35,7 @@ pub struct ScenarioSpec {
#[derive(Debug, Clone, Copy)]
pub struct TopologySpec {
pub validators: usize,
pub executors: usize,
pub nodes: usize,
pub network: NetworkKind,
}
@ -101,15 +100,9 @@ impl TestingFrameworkWorld {
Ok(())
}
pub fn set_topology(
&mut self,
validators: usize,
executors: usize,
network: NetworkKind,
) -> StepResult {
pub fn set_topology(&mut self, nodes: usize, network: NetworkKind) -> StepResult {
self.spec.topology = Some(TopologySpec {
validators: positive_usize("validators", validators)?,
executors,
nodes: positive_usize("nodes", nodes)?,
network,
});
Ok(())
@ -209,23 +202,9 @@ impl TestingFrameworkWorld {
.is_some_and(|p| p.is_file())
|| shared_host_bin_path("nomos-node").is_file();
let requires_executor_bin = self
.spec
.topology
.is_some_and(|topology| topology.executors > 0);
let exec_ok = if requires_executor_bin {
env::var_os("NOMOS_EXECUTOR_BIN")
.map(PathBuf::from)
.is_some_and(|p| p.is_file())
|| shared_host_bin_path("nomos-executor").is_file()
} else {
true
};
if !(node_ok && exec_ok) {
if !node_ok {
return Err(StepError::Preflight {
message: "Missing Logos host binaries. Set NOMOS_NODE_BIN (and NOMOS_EXECUTOR_BIN if your scenario uses executors), or run `scripts/run/run-examples.sh host` to restore them into `testing-framework/assets/stack/bin`.".to_owned(),
message: "Missing Logos host binaries. Set NOMOS_NODE_BIN, or run `scripts/run/run-examples.sh host` to restore them into `testing-framework/assets/stack/bin`.".to_owned(),
});
}
}
@ -284,8 +263,7 @@ fn make_builder(topology: TopologySpec) -> Builder<()> {
let base = match topology.network {
NetworkKind::Star => t.network_star(),
};
base.validators(topology.validators)
.executors(topology.executors)
base.nodes(topology.nodes)
})
}

View File

@ -1,35 +1,5 @@
services:
{% for node in validators %}
{{ node.name }}:
image: {{ node.image }}
{% if node.platform %} platform: {{ node.platform }}
{% endif %} entrypoint: {{ node.entrypoint }}
volumes:
{% for volume in node.volumes %}
- {{ volume }}
{% endfor %}
{% if node.extra_hosts | length > 0 %}
extra_hosts:
{% for host in node.extra_hosts %}
- {{ host }}
{% endfor %}
{% endif %}
ports:
{% for port in node.ports %}
- {{ port }}
{% endfor %}
environment:
{% for env in node.environment %}
{{ env.key }}: "{{ env.value }}"
{% endfor %}
cap_add:
- SYS_ADMIN
- SYS_PTRACE
security_opt:
- seccomp=unconfined
restart: on-failure
{% endfor %}{% for node in executors %}
{% for node in nodes %}
{{ node.name }}:
image: {{ node.image }}
{% if node.platform %} platform: {{ node.platform }}

View File

@ -59,7 +59,7 @@ impl ClientBuilder {
.await);
}
};
info!("block feed connected to validator");
info!("block feed connected to node");
Ok(pair)
}
}

View File

@ -102,14 +102,14 @@ mod tests {
use testing_framework_core::{
scenario::ScenarioBuilder,
topology::{
generation::{GeneratedNodeConfig, GeneratedTopology, NodeRole as TopologyNodeRole},
generation::{GeneratedNodeConfig, GeneratedTopology},
utils::multiaddr_port,
},
};
#[test]
fn cfgsync_prebuilt_configs_preserve_genesis() {
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1))
let scenario = ScenarioBuilder::topology_with(|t| t.nodes(2))
.build()
.expect("scenario build should succeed");
let topology = scenario.topology().clone();
@ -121,9 +121,15 @@ mod tests {
&topology.config().da_params,
&tracing_settings,
&topology.config().wallet_config,
Some(topology.nodes().map(|node| node.id).collect()),
Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()),
Some(topology.nodes().iter().map(|node| node.id).collect()),
Some(topology.nodes().iter().map(|node| node.da_port).collect()),
Some(
topology
.nodes()
.iter()
.map(|node| node.blend_port)
.collect(),
),
hosts,
)
.expect("cfgsync config generation should succeed");
@ -133,7 +139,7 @@ mod tests {
.collect();
for node in topology.nodes() {
let identifier = identifier_for(node.role(), node.index());
let identifier = identifier_for(node.index());
let cfgsync_config = configs_by_identifier
.get(&identifier)
.unwrap_or_else(|| panic!("missing cfgsync config for {identifier}"));
@ -164,7 +170,7 @@ mod tests {
#[test]
fn cfgsync_genesis_proofs_verify_against_ledger() {
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1))
let scenario = ScenarioBuilder::topology_with(|t| t.nodes(2))
.build()
.expect("scenario build should succeed");
let topology = scenario.topology().clone();
@ -176,9 +182,15 @@ mod tests {
&topology.config().da_params,
&tracing_settings,
&topology.config().wallet_config,
Some(topology.nodes().map(|node| node.id).collect()),
Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()),
Some(topology.nodes().iter().map(|node| node.id).collect()),
Some(topology.nodes().iter().map(|node| node.da_port).collect()),
Some(
topology
.nodes()
.iter()
.map(|node| node.blend_port)
.collect(),
),
hosts,
)
.expect("cfgsync config generation should succeed");
@ -188,7 +200,7 @@ mod tests {
.collect();
for node in topology.nodes() {
let identifier = identifier_for(node.role(), node.index());
let identifier = identifier_for(node.index());
let cfgsync_config = configs_by_identifier
.get(&identifier)
.unwrap_or_else(|| panic!("missing cfgsync config for {identifier}"));
@ -203,7 +215,7 @@ mod tests {
#[test]
fn cfgsync_docker_overrides_produce_valid_genesis() {
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1))
let scenario = ScenarioBuilder::topology_with(|t| t.nodes(2))
.build()
.expect("scenario build should succeed");
let topology = scenario.topology().clone();
@ -215,9 +227,15 @@ mod tests {
&topology.config().da_params,
&tracing_settings,
&topology.config().wallet_config,
Some(topology.nodes().map(|node| node.id).collect()),
Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()),
Some(topology.nodes().iter().map(|node| node.id).collect()),
Some(topology.nodes().iter().map(|node| node.da_port).collect()),
Some(
topology
.nodes()
.iter()
.map(|node| node.blend_port)
.collect(),
),
hosts,
)
.expect("cfgsync config generation should succeed");
@ -237,7 +255,7 @@ mod tests {
#[test]
fn cfgsync_configs_match_topology_ports_and_genesis() {
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1))
let scenario = ScenarioBuilder::topology_with(|t| t.nodes(2))
.build()
.expect("scenario build should succeed");
let topology = scenario.topology().clone();
@ -249,9 +267,15 @@ mod tests {
&topology.config().da_params,
&tracing_settings,
&topology.config().wallet_config,
Some(topology.nodes().map(|node| node.id).collect()),
Some(topology.nodes().map(|node| node.da_port).collect()),
Some(topology.nodes().map(|node| node.blend_port).collect()),
Some(topology.nodes().iter().map(|node| node.id).collect()),
Some(topology.nodes().iter().map(|node| node.da_port).collect()),
Some(
topology
.nodes()
.iter()
.map(|node| node.blend_port)
.collect(),
),
hosts,
)
.expect("cfgsync config generation should succeed");
@ -261,7 +285,7 @@ mod tests {
.collect();
for node in topology.nodes() {
let identifier = identifier_for(node.role(), node.index());
let identifier = identifier_for(node.index());
let cfg = configs_by_identifier
.get(&identifier)
.unwrap_or_else(|| panic!("missing cfgsync config for {identifier}"));
@ -303,20 +327,21 @@ mod tests {
}
fn hosts_from_topology(topology: &GeneratedTopology) -> Vec<Host> {
topology.nodes().map(host_from_node).collect()
topology.nodes().iter().map(host_from_node).collect()
}
fn docker_style_hosts(topology: &GeneratedTopology) -> Vec<Host> {
topology
.nodes()
.iter()
.map(|node| docker_host(node, 10 + node.index() as u8))
.collect()
}
fn host_from_node(node: &GeneratedNodeConfig) -> Host {
let identifier = identifier_for(node.role(), node.index());
let identifier = identifier_for(node.index());
let ip = Ipv4Addr::LOCALHOST;
let mut host = make_host(node.role(), ip, identifier);
let mut host = make_host(ip, identifier);
host.network_port = node.network_port();
host.da_network_port = node.da_port;
host.blend_port = node.blend_port;
@ -324,9 +349,9 @@ mod tests {
}
fn docker_host(node: &GeneratedNodeConfig, octet: u8) -> Host {
let identifier = identifier_for(node.role(), node.index());
let identifier = identifier_for(node.index());
let ip = Ipv4Addr::new(172, 23, 0, octet);
let mut host = make_host(node.role(), ip, identifier);
let mut host = make_host(ip, identifier);
host.network_port = node.network_port().saturating_add(1000);
host.da_network_port = node.da_port.saturating_add(1000);
host.blend_port = node.blend_port.saturating_add(1000);
@ -335,9 +360,8 @@ mod tests {
fn tracing_settings(topology: &GeneratedTopology) -> TracingSettings {
topology
.validators()
.nodes()
.first()
.or_else(|| topology.executors().first())
.expect("topology must contain at least one node")
.general
.tracing_config
@ -345,14 +369,11 @@ mod tests {
.clone()
}
fn identifier_for(role: TopologyNodeRole, index: usize) -> String {
match role {
TopologyNodeRole::Validator => format!("validator-{index}"),
TopologyNodeRole::Executor => format!("executor-{index}"),
}
fn identifier_for(index: usize) -> String {
format!("node-{index}")
}
fn make_host(role: TopologyNodeRole, ip: Ipv4Addr, identifier: String) -> Host {
fn make_host(ip: Ipv4Addr, identifier: String) -> Host {
let ports = PortOverrides {
network_port: None,
da_network_port: None,
@ -360,10 +381,7 @@ mod tests {
api_port: None,
testing_http_port: None,
};
match role {
TopologyNodeRole::Validator => Host::validator_from_ip(ip, identifier, ports),
TopologyNodeRole::Executor => Host::executor_from_ip(ip, identifier, ports),
}
Host::node_from_ip(ip, identifier, ports)
}
fn declaration_fingerprint<G>(

View File

@ -50,8 +50,7 @@ impl DeploymentOrchestrator {
} = setup.prepare_workspace(&observability).await?;
tracing::info!(
validators = descriptors.validators().len(),
executors = descriptors.executors().len(),
nodes = descriptors.nodes().len(),
duration_secs = scenario.duration().as_secs(),
readiness_checks = self.deployer.readiness_checks,
metrics_query_url = observability.metrics_query_url.as_ref().map(|u| u.as_str()),
@ -63,8 +62,6 @@ impl DeploymentOrchestrator {
"compose deployment starting"
);
let validator_count = descriptors.validators().len();
let executor_count = descriptors.executors().len();
let host_ports = PortManager::prepare(&mut environment, &descriptors).await?;
wait_for_readiness_or_grace_period(
@ -104,8 +101,7 @@ impl DeploymentOrchestrator {
);
info!(
validators = validator_count,
executors = executor_count,
nodes = host_ports.nodes.len(),
duration_secs = scenario.duration().as_secs(),
readiness_checks = self.deployer.readiness_checks,
host,
@ -198,38 +194,22 @@ fn maybe_print_endpoints(observability: &ObservabilityInputs, host: &str, ports:
}
fn log_profiling_urls(host: &str, ports: &HostPortMapping) {
for (idx, node) in ports.validators.iter().enumerate() {
for (idx, node) in ports.nodes.iter().enumerate() {
tracing::info!(
validator = idx,
node = idx,
profiling_url = %format!(
"http://{}:{}/debug/pprof/profile?seconds=15&format=proto",
host, node.api
),
"validator profiling endpoint (profiling feature required)"
);
}
for (idx, node) in ports.executors.iter().enumerate() {
tracing::info!(
executor = idx,
profiling_url = %format!(
"http://{}:{}/debug/pprof/profile?seconds=15&format=proto",
host, node.api
),
"executor profiling endpoint (profiling feature required)"
"node profiling endpoint (profiling feature required)"
);
}
}
fn print_profiling_urls(host: &str, ports: &HostPortMapping) {
for (idx, node) in ports.validators.iter().enumerate() {
for (idx, node) in ports.nodes.iter().enumerate() {
println!(
"TESTNET_PPROF validator_{}=http://{}:{}/debug/pprof/profile?seconds=15&format=proto",
idx, host, node.api
);
}
for (idx, node) in ports.executors.iter().enumerate() {
println!(
"TESTNET_PPROF executor_{}=http://{}:{}/debug/pprof/profile?seconds=15&format=proto",
"TESTNET_PPROF node_{}=http://{}:{}/debug/pprof/profile?seconds=15&format=proto",
idx, host, node.api
);
}

View File

@ -17,15 +17,13 @@ impl PortManager {
descriptors: &GeneratedTopology,
) -> Result<HostPortMapping, ComposeRunnerError> {
debug!(
validators = descriptors.validators().len(),
executors = descriptors.executors().len(),
nodes = descriptors.nodes().len(),
"resolving host ports for compose services"
);
match discover_host_ports(environment, descriptors).await {
Ok(mapping) => {
info!(
validator_ports = ?mapping.validator_api_ports(),
executor_ports = ?mapping.executor_api_ports(),
node_ports = ?mapping.node_api_ports(),
"resolved container host ports"
);
Ok(mapping)

View File

@ -7,7 +7,7 @@ use crate::{
environment::StackEnvironment,
ports::{HostPortMapping, ensure_remote_readiness_with_ports},
},
lifecycle::readiness::{ensure_executors_ready_with_ports, ensure_validators_ready_with_ports},
lifecycle::readiness::ensure_nodes_ready_with_ports,
};
pub struct ReadinessChecker;
@ -18,25 +18,13 @@ impl ReadinessChecker {
host_ports: &HostPortMapping,
environment: &mut StackEnvironment,
) -> Result<(), ComposeRunnerError> {
let validator_ports = host_ports.validator_api_ports();
info!(ports = ?validator_ports, "waiting for validator HTTP endpoints");
if let Err(err) = ensure_validators_ready_with_ports(&validator_ports).await {
let node_ports = host_ports.node_api_ports();
info!(ports = ?node_ports, "waiting for node HTTP endpoints");
if let Err(err) = ensure_nodes_ready_with_ports(&node_ports).await {
return fail_readiness_step(
environment,
"validator readiness failed",
"validator readiness failed",
err,
)
.await;
}
let executor_ports = host_ports.executor_api_ports();
info!(ports = ?executor_ports, "waiting for executor HTTP endpoints");
if let Err(err) = ensure_executors_ready_with_ports(&executor_ports).await {
return fail_readiness_step(
environment,
"executor readiness failed",
"executor readiness failed",
"node readiness failed",
"node readiness failed",
err,
)
.await;

View File

@ -32,8 +32,7 @@ impl DeploymentSetup {
ensure_supported_topology(&self.descriptors)?;
info!(
validators = self.descriptors.validators().len(),
executors = self.descriptors.executors().len(),
nodes = self.descriptors.nodes().len(),
"starting compose deployment"
);

View File

@ -20,8 +20,7 @@ use testing_framework_config::constants::DEFAULT_CFGSYNC_PORT;
/// Top-level docker-compose descriptor built from a GeneratedTopology.
#[derive(Clone, Debug, Serialize)]
pub struct ComposeDescriptor {
validators: Vec<NodeDescriptor>,
executors: Vec<NodeDescriptor>,
nodes: Vec<NodeDescriptor>,
}
impl ComposeDescriptor {
@ -32,13 +31,8 @@ impl ComposeDescriptor {
}
#[cfg(test)]
pub fn validators(&self) -> &[NodeDescriptor] {
&self.validators
}
#[cfg(test)]
pub fn executors(&self) -> &[NodeDescriptor] {
&self.executors
pub fn nodes(&self) -> &[NodeDescriptor] {
&self.nodes
}
}
@ -80,56 +74,20 @@ impl<'a> ComposeDescriptorBuilder<'a> {
let (image, platform) = resolve_image();
let validators = build_nodes(
self.topology.validators(),
ComposeNodeKind::Validator,
let nodes = build_nodes(
self.topology.nodes(),
&image,
platform.as_deref(),
self.use_kzg_mount,
cfgsync_port,
);
let executors = build_nodes(
self.topology.executors(),
ComposeNodeKind::Executor,
&image,
platform.as_deref(),
self.use_kzg_mount,
cfgsync_port,
);
ComposeDescriptor {
validators,
executors,
}
}
}
#[derive(Clone, Copy)]
pub(crate) enum ComposeNodeKind {
Validator,
Executor,
}
impl ComposeNodeKind {
fn instance_name(self, index: usize) -> String {
match self {
Self::Validator => format!("validator-{index}"),
Self::Executor => format!("executor-{index}"),
}
}
const fn entrypoint(self) -> &'static str {
match self {
Self::Validator => "/etc/nomos/scripts/run_nomos_node.sh",
Self::Executor => "/etc/nomos/scripts/run_nomos_executor.sh",
}
ComposeDescriptor { nodes }
}
}
fn build_nodes(
nodes: &[GeneratedNodeConfig],
kind: ComposeNodeKind,
image: &str,
platform: Option<&str>,
use_kzg_mount: bool,
@@ -139,15 +97,7 @@ fn build_nodes(
.iter()
.enumerate()
.map(|(index, node)| {
NodeDescriptor::from_node(
kind,
index,
node,
image,
platform,
use_kzg_mount,
cfgsync_port,
)
NodeDescriptor::from_node(index, node, image, platform, use_kzg_mount, cfgsync_port)
})
.collect()
}
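With ComposeNodeKind removed, name and entrypoint no longer branch on role: every service is node-{index} running the single run_nomos_node.sh entrypoint. A trimmed illustration of that code path, with NodeDescriptor reduced to a stub (the real type also carries image, volumes, ports, and environment):

const ENTRYPOINT: &str = "/etc/nomos/scripts/run_nomos_node.sh";

#[derive(Debug)]
struct NodeDescriptor {
    name: String,
    entrypoint: String,
}

fn build_nodes(count: usize) -> Vec<NodeDescriptor> {
    (0..count)
        .map(|index| NodeDescriptor {
            name: format!("node-{index}"),
            entrypoint: ENTRYPOINT.to_owned(),
        })
        .collect()
}

fn main() {
    for node in build_nodes(3) {
        println!("{} -> {}", node.name, node.entrypoint);
    }
}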

View File

@@ -1,9 +1,9 @@
use serde::Serialize;
use testing_framework_core::topology::generation::GeneratedNodeConfig;
use super::{ComposeNodeKind, base_environment, base_volumes, default_extra_hosts};
use super::{base_environment, base_volumes, default_extra_hosts};
/// Describes a validator or executor container in the compose stack.
/// Describes a node container in the compose stack.
#[derive(Clone, Debug, Serialize)]
pub struct NodeDescriptor {
name: String,
@@ -45,7 +45,6 @@ impl EnvEntry {
impl NodeDescriptor {
pub(crate) fn from_node(
kind: ComposeNodeKind,
index: usize,
node: &GeneratedNodeConfig,
image: &str,
@@ -53,8 +52,9 @@ impl NodeDescriptor {
use_kzg_mount: bool,
cfgsync_port: u16,
) -> Self {
const ENTRYPOINT: &str = "/etc/nomos/scripts/run_nomos_node.sh";
let mut environment = base_environment(cfgsync_port, use_kzg_mount);
let identifier = kind.instance_name(index);
let identifier = format!("node-{index}");
let api_port = node.general.api_config.address.port();
let testing_port = node.general.api_config.testing_http_address.port();
environment.extend([
@@ -78,9 +78,9 @@ impl NodeDescriptor {
];
Self {
name: kind.instance_name(index),
name: format!("node-{index}"),
image: image.to_owned(),
entrypoint: kind.entrypoint().to_owned(),
entrypoint: ENTRYPOINT.to_owned(),
volumes: base_volumes(use_kzg_mount),
extra_hosts: default_extra_hosts(),
ports,

View File

@@ -45,23 +45,13 @@ pub struct ComposeNodeControl {
#[async_trait::async_trait]
impl NodeControlHandle for ComposeNodeControl {
async fn restart_validator(&self, index: usize) -> Result<(), DynError> {
async fn restart_node(&self, index: usize) -> Result<(), DynError> {
restart_compose_service(
&self.compose_file,
&self.project_name,
&format!("validator-{index}"),
&format!("node-{index}"),
)
.await
.map_err(|err| format!("validator restart failed: {err}").into())
}
async fn restart_executor(&self, index: usize) -> Result<(), DynError> {
restart_compose_service(
&self.compose_file,
&self.project_name,
&format!("executor-{index}"),
)
.await
.map_err(|err| format!("executor restart failed: {err}").into())
.map_err(|err| format!("node restart failed: {err}").into())
}
}
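restart_compose_service ultimately shells out to docker compose with the node-{index} service name. A self-contained sketch of that shape using tokio::process; the exact flag layout of the real helper is an assumption here:

use tokio::process::Command;

// Restart one "node-{index}" service in a compose project.
async fn restart_node(compose_file: &str, project: &str, index: usize) -> Result<(), String> {
    let service = format!("node-{index}");
    let status = Command::new("docker")
        .args(["compose", "-f", compose_file, "-p", project, "restart", &service])
        .status()
        .await
        .map_err(|err| err.to_string())?;
    if status.success() {
        Ok(())
    } else {
        Err(format!("node restart failed: {status}"))
    }
}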

View File

@@ -1,10 +1,7 @@
use std::path::PathBuf;
use testing_framework_core::{
scenario::{
MetricsError,
http_probe::{HttpReadinessError, NodeRole},
},
scenario::{MetricsError, http_probe::HttpReadinessError},
topology::readiness::ReadinessError,
};
use url::ParseError;
@@ -14,10 +11,8 @@ use crate::{docker::commands::ComposeCommandError, infrastructure::template::Tem
#[derive(Debug, thiserror::Error)]
/// Top-level compose runner errors.
pub enum ComposeRunnerError {
#[error(
"compose runner requires at least one validator (validators={validators}, executors={executors})"
)]
MissingValidator { validators: usize, executors: usize },
#[error("compose runner requires at least one node (nodes={nodes})")]
MissingNode { nodes: usize },
#[error("docker does not appear to be available on this host")]
DockerUnavailable,
#[error("failed to resolve host port for {service} container port {container_port}: {source}")]
@@ -39,7 +34,7 @@ pub enum ComposeRunnerError {
NodeClients(#[from] NodeClientError),
#[error(transparent)]
Telemetry(#[from] MetricsError),
#[error("block feed requires at least one validator client")]
#[error("block feed requires at least one node client")]
BlockFeedMissing,
#[error("failed to start block feed: {source}")]
BlockFeed {
@@ -105,9 +100,9 @@ pub enum ConfigError {
pub enum StackReadinessError {
#[error(transparent)]
Http(#[from] HttpReadinessError),
#[error("failed to build readiness URL for {role} port {port}: {source}", role = role.label())]
#[error("failed to build readiness URL for {role} port {port}: {source}")]
Endpoint {
role: NodeRole,
role: &'static str,
port: u16,
#[source]
source: ParseError,
@@ -122,12 +117,9 @@ pub enum StackReadinessError {
#[derive(Debug, thiserror::Error)]
/// Node client construction failures.
pub enum NodeClientError {
#[error(
"failed to build {endpoint} client URL for {role} port {port}: {source}",
role = role.label()
)]
#[error("failed to build {endpoint} client URL for {role} port {port}: {source}")]
Endpoint {
role: NodeRole,
role: &'static str,
endpoint: &'static str,
port: u16,
#[source]

View File

@@ -70,8 +70,7 @@ pub fn update_cfgsync_config(
path = %path.display(),
use_kzg_mount,
port,
validators = topology.validators().len(),
executors = topology.executors().len(),
nodes = topology.nodes().len(),
"updating cfgsync template"
);
let mut cfg = load_cfgsync_template(path)?;

View File

@@ -133,16 +133,13 @@ impl StackEnvironment {
}
}
/// Verifies the topology has at least one validator so compose can start.
/// Verifies the topology has at least one node so compose can start.
pub fn ensure_supported_topology(
descriptors: &GeneratedTopology,
) -> Result<(), ComposeRunnerError> {
let validators = descriptors.validators().len();
if validators == 0 {
return Err(ComposeRunnerError::MissingValidator {
validators,
executors: descriptors.executors().len(),
});
let nodes = descriptors.nodes().len();
if nodes == 0 {
return Err(ComposeRunnerError::MissingNode { nodes });
}
Ok(())
}
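Guard and error now collapse to one check and one variant. A condensed, runnable version using thiserror (which the errors module already derives), with GeneratedTopology stubbed down to the single accessor the guard reads:

struct GeneratedTopology {
    nodes: Vec<String>,
}

impl GeneratedTopology {
    fn nodes(&self) -> &[String] {
        &self.nodes
    }
}

#[derive(Debug, thiserror::Error)]
enum ComposeRunnerError {
    #[error("compose runner requires at least one node (nodes={nodes})")]
    MissingNode { nodes: usize },
}

fn ensure_supported_topology(t: &GeneratedTopology) -> Result<(), ComposeRunnerError> {
    let nodes = t.nodes().len();
    if nodes == 0 {
        return Err(ComposeRunnerError::MissingNode { nodes });
    }
    Ok(())
}

fn main() {
    let empty = GeneratedTopology { nodes: vec![] };
    assert_eq!(
        ensure_supported_topology(&empty).unwrap_err().to_string(),
        "compose runner requires at least one node (nodes=0)"
    );
}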

View File

@@ -2,11 +2,7 @@ use std::time::Duration;
use anyhow::{Context as _, anyhow};
use reqwest::Url;
use testing_framework_core::{
adjust_timeout,
scenario::http_probe::NodeRole as HttpNodeRole,
topology::generation::{GeneratedTopology, NodeRole as TopologyNodeRole},
};
use testing_framework_core::{adjust_timeout, topology::generation::GeneratedTopology};
use tokio::{process::Command, time::timeout};
use tracing::{debug, info};
use url::ParseError;
@@ -25,22 +21,16 @@ pub struct NodeHostPorts {
pub testing: u16,
}
/// All host port mappings for validators and executors.
/// All host port mappings for nodes.
#[derive(Clone, Debug)]
pub struct HostPortMapping {
pub validators: Vec<NodeHostPorts>,
pub executors: Vec<NodeHostPorts>,
pub nodes: Vec<NodeHostPorts>,
}
impl HostPortMapping {
/// Returns API ports for all validators.
pub fn validator_api_ports(&self) -> Vec<u16> {
self.validators.iter().map(|ports| ports.api).collect()
}
/// Returns API ports for all executors.
pub fn executor_api_ports(&self) -> Vec<u16> {
self.executors.iter().map(|ports| ports.api).collect()
/// Returns API ports for all nodes.
pub fn node_api_ports(&self) -> Vec<u16> {
self.nodes.iter().map(|ports| ports.api).collect()
}
}
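The mapping is now a single vector, so the accessor surface halves. A reduced, runnable sketch with the two types as they appear in the hunk and purely illustrative port numbers:

#[derive(Clone, Debug)]
pub struct NodeHostPorts {
    pub api: u16,
    pub testing: u16,
}

#[derive(Clone, Debug)]
pub struct HostPortMapping {
    pub nodes: Vec<NodeHostPorts>,
}

impl HostPortMapping {
    /// Returns API ports for all nodes.
    pub fn node_api_ports(&self) -> Vec<u16> {
        self.nodes.iter().map(|ports| ports.api).collect()
    }
}

fn main() {
    let mapping = HostPortMapping {
        nodes: vec![NodeHostPorts { api: 18080, testing: 18081 }],
    };
    assert_eq!(mapping.node_api_ports(), vec![18080]);
}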
@@ -52,34 +42,21 @@ pub async fn discover_host_ports(
debug!(
compose_file = %environment.compose_path().display(),
project = environment.project_name(),
validators = descriptors.validators().len(),
executors = descriptors.executors().len(),
nodes = descriptors.nodes().len(),
"resolving compose host ports"
);
let mut validators = Vec::new();
for node in descriptors.validators() {
let service = node_identifier(TopologyNodeRole::Validator, node.index());
let mut nodes = Vec::new();
for node in descriptors.nodes() {
let service = node_identifier(node.index());
let api = resolve_service_port(environment, &service, node.api_port()).await?;
let testing = resolve_service_port(environment, &service, node.testing_http_port()).await?;
validators.push(NodeHostPorts { api, testing });
nodes.push(NodeHostPorts { api, testing });
}
let mut executors = Vec::new();
for node in descriptors.executors() {
let service = node_identifier(TopologyNodeRole::Executor, node.index());
let api = resolve_service_port(environment, &service, node.api_port()).await?;
let testing = resolve_service_port(environment, &service, node.testing_http_port()).await?;
executors.push(NodeHostPorts { api, testing });
}
let mapping = HostPortMapping {
validators,
executors,
};
let mapping = HostPortMapping { nodes };
info!(
validator_ports = ?mapping.validators,
executor_ports = ?mapping.executors,
node_ports = ?mapping.nodes,
"compose host ports resolved"
);
@@ -149,36 +126,32 @@ pub async fn ensure_remote_readiness_with_ports(
descriptors: &GeneratedTopology,
mapping: &HostPortMapping,
) -> Result<(), StackReadinessError> {
let validator_urls = mapping
.validators
let node_urls = mapping
.nodes
.iter()
.map(|ports| readiness_url(HttpNodeRole::Validator, ports.api))
.collect::<Result<Vec<_>, _>>()?;
let executor_urls = mapping
.executors
.iter()
.map(|ports| readiness_url(HttpNodeRole::Executor, ports.api))
.map(|ports| readiness_url(ports.api))
.collect::<Result<Vec<_>, _>>()?;
descriptors
.wait_remote_readiness(&validator_urls, &executor_urls, None, None)
.wait_remote_readiness(&node_urls, None)
.await
.map_err(|source| StackReadinessError::Remote { source })
}
fn readiness_url(role: HttpNodeRole, port: u16) -> Result<Url, StackReadinessError> {
localhost_url(port).map_err(|source| StackReadinessError::Endpoint { role, port, source })
fn readiness_url(port: u16) -> Result<Url, StackReadinessError> {
localhost_url(port).map_err(|source| StackReadinessError::Endpoint {
role: "node",
port,
source,
})
}
fn localhost_url(port: u16) -> Result<Url, ParseError> {
Url::parse(&format!("http://{}:{port}/", compose_runner_host()))
}
fn node_identifier(role: TopologyNodeRole, index: usize) -> String {
match role {
TopologyNodeRole::Validator => format!("validator-{index}"),
TopologyNodeRole::Executor => format!("executor-{index}"),
}
fn node_identifier(index: usize) -> String {
format!("node-{index}")
}
pub(crate) fn compose_runner_host() -> String {

View File

@@ -13,13 +13,12 @@ async fn spawn_block_feed_with(
node_clients: &NodeClients,
) -> Result<(BlockFeed, BlockFeedTask), ComposeRunnerError> {
debug!(
validators = node_clients.validator_clients().len(),
executors = node_clients.executor_clients().len(),
"selecting validator client for block feed"
nodes = node_clients.node_clients().len(),
"selecting node client for block feed"
);
let block_source_client = node_clients
.random_validator()
.random_node()
.ok_or(ComposeRunnerError::BlockFeedMissing)?;
spawn_block_feed(block_source_client)
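random_node replaces random_validator as the block-feed source selector, returning None on an empty client list (surfaced above as BlockFeedMissing). A std-only approximation of that contract; the real NodeClients implementation may well use a proper RNG:

use std::time::{SystemTime, UNIX_EPOCH};

// Pick an arbitrary element; None when empty, mirroring random_node's contract.
fn random_node<T>(nodes: &[T]) -> Option<&T> {
    if nodes.is_empty() {
        return None;
    }
    let seed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.subsec_nanos() as usize)
        .unwrap_or(0);
    nodes.get(seed % nodes.len())
}

fn main() {
    let clients = ["node-0", "node-1", "node-2"];
    println!("block feed source: {:?}", random_node(&clients));
}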

View File

@@ -2,36 +2,25 @@ use std::time::Duration;
use reqwest::Url;
use testing_framework_core::{
nodes::ApiClient,
scenario::{NodeClients, http_probe::NodeRole as HttpNodeRole},
topology::generation::{GeneratedTopology, NodeRole as TopologyNodeRole},
nodes::ApiClient, scenario::NodeClients, topology::generation::GeneratedTopology,
};
use tokio::time::sleep;
use crate::{
errors::{NodeClientError, StackReadinessError},
infrastructure::ports::{HostPortMapping, NodeHostPorts},
lifecycle::wait::{wait_for_executors, wait_for_validators},
lifecycle::wait::wait_for_nodes,
};
const DISABLED_READINESS_SLEEP: Duration = Duration::from_secs(5);
/// Wait until all validators respond on their API ports.
pub async fn ensure_validators_ready_with_ports(ports: &[u16]) -> Result<(), StackReadinessError> {
/// Wait until all nodes respond on their API ports.
pub async fn ensure_nodes_ready_with_ports(ports: &[u16]) -> Result<(), StackReadinessError> {
if ports.is_empty() {
return Ok(());
}
wait_for_validators(ports).await.map_err(Into::into)
}
/// Wait until all executors respond on their API ports.
pub async fn ensure_executors_ready_with_ports(ports: &[u16]) -> Result<(), StackReadinessError> {
if ports.is_empty() {
return Ok(());
}
wait_for_executors(ports).await.map_err(Into::into)
wait_for_nodes(ports).await.map_err(Into::into)
}
/// Allow a brief pause when readiness probes are disabled.
@@ -47,29 +36,22 @@ pub fn build_node_clients_with_ports(
mapping: &HostPortMapping,
host: &str,
) -> Result<NodeClients, NodeClientError> {
let validators = descriptors
.validators()
let nodes = descriptors
.nodes()
.iter()
.zip(mapping.validators.iter())
.map(|(node, ports)| api_client_from_host_ports(to_http_role(node.role()), ports, host))
.collect::<Result<Vec<_>, _>>()?;
let executors = descriptors
.executors()
.iter()
.zip(mapping.executors.iter())
.map(|(node, ports)| api_client_from_host_ports(to_http_role(node.role()), ports, host))
.zip(mapping.nodes.iter())
.map(|(_, ports)| api_client_from_host_ports(ports, host))
.collect::<Result<Vec<_>, _>>()?;
Ok(NodeClients::new(validators, executors))
Ok(NodeClients::new(nodes))
}
fn api_client_from_host_ports(
role: HttpNodeRole,
ports: &NodeHostPorts,
host: &str,
) -> Result<ApiClient, NodeClientError> {
let base_url = localhost_url(ports.api, host).map_err(|source| NodeClientError::Endpoint {
role,
role: "node",
endpoint: "api",
port: ports.api,
source,
@@ -78,7 +60,7 @@ fn api_client_from_host_ports(
let testing_url =
Some(
localhost_url(ports.testing, host).map_err(|source| NodeClientError::Endpoint {
role,
role: "node",
endpoint: "testing",
port: ports.testing,
source,
@@ -88,13 +70,6 @@
Ok(ApiClient::from_urls(base_url, testing_url))
}
fn to_http_role(role: TopologyNodeRole) -> testing_framework_core::scenario::http_probe::NodeRole {
match role {
TopologyNodeRole::Validator => HttpNodeRole::Validator,
TopologyNodeRole::Executor => HttpNodeRole::Executor,
}
}
fn localhost_url(port: u16, host: &str) -> Result<Url, url::ParseError> {
Url::parse(&format!("http://{host}:{port}/"))
}
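With roles flattened to the literal "node", client construction varies only in host and port. A minimal, runnable demonstration of localhost_url exactly as defined above (the url crate is already in scope in this module); the ports are placeholders:

use url::Url;

fn localhost_url(port: u16, host: &str) -> Result<Url, url::ParseError> {
    Url::parse(&format!("http://{host}:{port}/"))
}

fn main() -> Result<(), url::ParseError> {
    let api = localhost_url(18080, "127.0.0.1")?;
    let testing = localhost_url(18081, "127.0.0.1")?;
    println!("api={api} testing={testing}");
    Ok(())
}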

Some files were not shown because too many files have changed in this diff