Refactor node spawn helpers and cleanup wrappers

This commit is contained in:
andrussal 2025-12-10 15:15:34 +01:00
parent 119a3cbfcd
commit e7c4bccaa6
25 changed files with 349 additions and 566 deletions

52
.github/workflows/deploy-pages.yml vendored Normal file
View File

@ -0,0 +1,52 @@
name: Deploy mdBook to GitHub Pages
on:
push:
branches: [ master ]
paths: [ 'book/**' ]
workflow_dispatch:
permissions:
contents: read
pages: write
id-token: write
concurrency:
group: "pages"
cancel-in-progress: false
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup mdBook
uses: peaceiris/actions-mdbook@v2
with:
mdbook-version: 'latest'
- name: Build book
run: |
cd book
mdbook build
- name: Setup Pages
uses: actions/configure-pages@v4
- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
path: 'book/book'
deploy:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
needs: build
steps:
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View File

@ -10,8 +10,6 @@ env:
permissions:
actions: read
contents: read
pages: write
id-token: write
concurrency:
group: lint-${{ github.ref }}
@ -490,74 +488,3 @@ jobs:
docker rm -f $ids
fi
book:
runs-on: ubuntu-latest
env:
RUSTUP_TOOLCHAIN: nightly-2025-09-14
steps:
- uses: actions/checkout@v4
- name: Load versions
run: |
set -euo pipefail
if [ ! -f versions.env ]; then
echo "versions.env missing; populate VERSION, NOMOS_NODE_REV, NOMOS_BUNDLE_VERSION" >&2
exit 1
fi
cat versions.env >> "$GITHUB_ENV"
: "${VERSION:?Missing VERSION}"
: "${NOMOS_NODE_REV:?Missing NOMOS_NODE_REV}"
: "${NOMOS_BUNDLE_VERSION:?Missing NOMOS_BUNDLE_VERSION}"
- uses: dtolnay/rust-toolchain@master
with:
toolchain: nightly-2025-09-14
- uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
restore-keys: ${{ runner.os }}-cargo-
- name: Install mdBook toolchain
run: |
MDBOOK_VERSION=0.4.40
LINKCHECK_VERSION=0.7.7
MERMAID_VERSION=0.14.0
cargo +nightly-2025-09-14 install --locked mdbook --version ${MDBOOK_VERSION}
cargo +nightly-2025-09-14 install mdbook-linkcheck --version ${LINKCHECK_VERSION}
cargo +nightly-2025-09-14 install --locked mdbook-mermaid --version ${MERMAID_VERSION}
cargo +nightly-2025-09-14 install --locked typos-cli --version 1.23.6
- name: Spell check (typos)
run: typos --format brief book/src
- name: Markdown lint
run: npx -y markdownlint-cli2 "book/src/**/*.md"
- name: Build book
run: mdbook build book
- name: Add .nojekyll
run: |
mkdir -p book/book
touch book/book/.nojekyll
- name: Check links
run: mdbook-linkcheck -s book
- name: Setup Pages
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
uses: actions/configure-pages@v4
- name: Upload book artifact for Pages
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
uses: actions/upload-pages-artifact@v3
with:
path: book/book
deploy_book:
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
needs: book
runs-on: ubuntu-latest
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
permissions:
pages: write
id-token: write
steps:
- name: Deploy to GitHub Pages
id: deployment
uses: actions/deploy-pages@v4

View File

@ -3,7 +3,7 @@
Directory structure with key paths annotated:
```
nomos-testing/
logos-blockchain-testing/
├─ testing-framework/ # Core library crates
│ ├─ configs/ # Node config builders, topology generation, tracing/logging config
│ ├─ core/ # Scenario model (ScenarioBuilder), runtime (Runner, Deployer), topology, node spawning

View File

@ -103,7 +103,7 @@ Three deployer implementations:
Built via `testing-framework/assets/stack/scripts/build_test_image.sh`:
- Embeds KZG circuit parameters and binaries from `testing-framework/assets/stack/kzgrs_test_params/kzgrs_test_params`
- Includes runner scripts: `run_nomos_node.sh`, `run_nomos_executor.sh`
- Tagged as `NOMOS_TESTNET_IMAGE` (default: `nomos-testnet:local`)
- Tagged as `NOMOS_TESTNET_IMAGE` (default: `logos-blockchain-testing:local`)
- **Recommended:** Use prebuilt bundle via `scripts/build-bundle.sh --platform linux` and set `NOMOS_BINARIES_TAR` before building image
### Circuit Assets

View File

@ -107,7 +107,7 @@ export NOMOS_BINARIES_TAR=.tmp/nomos-binaries-linux-v0.3.1.tar.gz
testing-framework/assets/stack/scripts/build_test_image.sh
# Run
NOMOS_TESTNET_IMAGE=nomos-testnet:local \
NOMOS_TESTNET_IMAGE=logos-blockchain-testing:local \
POL_PROOF_DEV_MODE=true \
cargo run -p runner-examples --bin compose_runner
```
@ -123,13 +123,13 @@ cp -r /tmp/nomos-circuits/* testing-framework/assets/stack/kzgrs_test_params/
testing-framework/assets/stack/scripts/build_test_image.sh
# Run
NOMOS_TESTNET_IMAGE=nomos-testnet:local \
NOMOS_TESTNET_IMAGE=logos-blockchain-testing:local \
POL_PROOF_DEV_MODE=true \
cargo run -p runner-examples --bin compose_runner
```
**Environment variables:**
- `NOMOS_TESTNET_IMAGE=nomos-testnet:local` — Image tag (required, must match built image)
- `NOMOS_TESTNET_IMAGE=logos-blockchain-testing:local` — Image tag (required, must match built image)
- `POL_PROOF_DEV_MODE=true`**Required** for all runners
- `NOMOS_DEMO_VALIDATORS=3` / `NOMOS_DEMO_EXECUTORS=2` / `NOMOS_DEMO_RUN_SECS=120` — Topology overrides
- `COMPOSE_NODE_PAIRS=1x1` — Alternative topology format: "validators×executors"
@ -163,15 +163,15 @@ export NOMOS_BINARIES_TAR=.tmp/nomos-binaries-linux-v0.3.1.tar.gz
testing-framework/assets/stack/scripts/build_test_image.sh
# Load into cluster
export NOMOS_TESTNET_IMAGE=nomos-testnet:local
kind load docker-image nomos-testnet:local # For kind
# OR: minikube image load nomos-testnet:local # For minikube
# OR: docker push your-registry/nomos-testnet:local # For remote
export NOMOS_TESTNET_IMAGE=logos-blockchain-testing:local
kind load docker-image logos-blockchain-testing:local # For kind
# OR: minikube image load logos-blockchain-testing:local # For minikube
# OR: docker push your-registry/logos-blockchain-testing:local # For remote
```
**Run the example:**
```bash
export NOMOS_TESTNET_IMAGE=nomos-testnet:local
export NOMOS_TESTNET_IMAGE=logos-blockchain-testing:local
export POL_PROOF_DEV_MODE=true
cargo run -p runner-examples --bin k8s_runner
```
@ -369,7 +369,7 @@ Setting `NOMOS_LOG_DIR` writes files **inside the container**. To access them, y
1. **Copy files out after the run:**
```bash
NOMOS_LOG_DIR=/logs \
NOMOS_TESTNET_IMAGE=nomos-testnet:local \
NOMOS_TESTNET_IMAGE=logos-blockchain-testing:local \
POL_PROOF_DEV_MODE=true \
cargo run -p runner-examples --bin compose_runner
@ -389,7 +389,7 @@ volumes:
**Keep containers for debugging:**
```bash
COMPOSE_RUNNER_PRESERVE=1 \
NOMOS_TESTNET_IMAGE=nomos-testnet:local \
NOMOS_TESTNET_IMAGE=logos-blockchain-testing:local \
cargo run -p runner-examples --bin compose_runner
# Containers remain running after test—inspect with docker logs or docker exec
```

View File

@ -21,7 +21,7 @@ The framework ships with runnable example binaries in `examples/src/bin/`.
**Recommended:** Use the convenience script:
```bash
# From the nomos-testing directory
# From the logos-blockchain-testing directory
scripts/run-examples.sh -t 60 -v 1 -e 1 host
```
@ -192,7 +192,7 @@ cp -r /tmp/nomos-circuits/* testing-framework/assets/stack/kzgrs_test_params/
testing-framework/assets/stack/scripts/build_test_image.sh
# Run with Compose
NOMOS_TESTNET_IMAGE=nomos-testnet:local \
NOMOS_TESTNET_IMAGE=logos-blockchain-testing:local \
POL_PROOF_DEV_MODE=true \
cargo run -p runner-examples --bin compose_runner
```

View File

@ -76,7 +76,7 @@ docker logs --tail 100 <container-id>
**Keep containers for post-mortem debugging:**
```bash
COMPOSE_RUNNER_PRESERVE=1 \
NOMOS_TESTNET_IMAGE=nomos-testnet:local \
NOMOS_TESTNET_IMAGE=logos-blockchain-testing:local \
POL_PROOF_DEV_MODE=true \
cargo run -p runner-examples --bin compose_runner
@ -277,7 +277,7 @@ Run a minimal baseline test (e.g., 2 validators, consensus liveness only). If it
- **Fix**: Kill orphaned processes (`pkill nomos-node`), wait for Docker cleanup
(`docker compose down`), or restart Docker.
### "Image not found: nomos-testnet:local"
### "Image not found: logos-blockchain-testing:local"
- **Cause**: Docker image not built for Compose/K8s runners, or KZG assets not
baked into the image.

View File

@ -12,7 +12,7 @@ set -euo pipefail
#
# Env overrides:
# VERSION - circuits version (default v0.3.1)
# NOMOS_TESTNET_IMAGE - image tag (default nomos-testnet:local)
# NOMOS_TESTNET_IMAGE - image tag (default logos-blockchain-testing:local)
# NOMOS_CIRCUITS_PLATFORM - override host platform detection
# NOMOS_CIRCUITS_REBUILD_RAPIDSNARK - set to 1 to force rapidsnark rebuild
# NOMOS_NODE_REV - nomos-node git rev for local binaries (default d2dd5a5084e1daef4032562c77d41de5e4d495f8)
@ -33,7 +33,7 @@ Options:
Environment:
VERSION Circuits version (default v0.3.1)
NOMOS_TESTNET_IMAGE Image tag (default nomos-testnet:local)
NOMOS_TESTNET_IMAGE Image tag (default logos-blockchain-testing:local)
NOMOS_CIRCUITS_PLATFORM Override host platform detection
NOMOS_CIRCUITS_REBUILD_RAPIDSNARK Force rapidsnark rebuild
NOMOS_NODE_REV nomos-node git rev (default d2dd5a5084e1daef4032562c77d41de5e4d495f8)
@ -81,7 +81,7 @@ readonly LINUX_CIRCUITS_DIR="${ROOT_DIR}/${NOMOS_CIRCUITS_LINUX_DIR_REL:-.tmp/no
MODE="compose"
RUN_SECS_RAW=""
VERSION="${DEFAULT_VERSION}"
IMAGE="${NOMOS_TESTNET_IMAGE:-nomos-testnet:local}"
IMAGE="${NOMOS_TESTNET_IMAGE:-logos-blockchain-testing:local}"
NOMOS_NODE_REV="${DEFAULT_NODE_REV}"
DEMO_VALIDATORS=""
DEMO_EXECUTORS=""
@ -358,21 +358,12 @@ if [ "$MODE" = "compose" ] && [ -z "${COMPOSE_CIRCUITS_PLATFORM:-}" ]; then
fi
export NOMOS_DEMO_RUN_SECS="${RUN_SECS}"
export COMPOSE_DEMO_RUN_SECS="${RUN_SECS}"
export LOCAL_DEMO_RUN_SECS="${RUN_SECS}"
export K8S_DEMO_RUN_SECS="${RUN_SECS}"
if [ -n "${DEMO_VALIDATORS}" ]; then
export NOMOS_DEMO_VALIDATORS="${DEMO_VALIDATORS}"
export COMPOSE_DEMO_VALIDATORS="${DEMO_VALIDATORS}"
export LOCAL_DEMO_VALIDATORS="${DEMO_VALIDATORS}"
export K8S_DEMO_VALIDATORS="${DEMO_VALIDATORS}"
fi
if [ -n "${DEMO_EXECUTORS}" ]; then
export NOMOS_DEMO_EXECUTORS="${DEMO_EXECUTORS}"
export COMPOSE_DEMO_EXECUTORS="${DEMO_EXECUTORS}"
export LOCAL_DEMO_EXECUTORS="${DEMO_EXECUTORS}"
export K8S_DEMO_EXECUTORS="${DEMO_EXECUTORS}"
fi
POL_PROOF_DEV_MODE=true \
NOMOS_TESTNET_IMAGE="${IMAGE}" \

View File

@ -15,7 +15,7 @@ if [ -f "${ROOT_DIR}/paths.env" ]; then
. "${ROOT_DIR}/paths.env"
fi
DOCKERFILE_PATH="${ROOT_DIR}/testing-framework/assets/stack/Dockerfile"
IMAGE_TAG="${IMAGE_TAG:-nomos-testnet:local}"
IMAGE_TAG="${IMAGE_TAG:-logos-blockchain-testing:local}"
VERSION="${VERSION:-v0.3.1}"
KZG_DIR_REL="${NOMOS_KZG_DIR_REL:-testing-framework/assets/stack/kzgrs_test_params}"
CIRCUITS_OVERRIDE="${CIRCUITS_OVERRIDE:-${KZG_DIR_REL}}"

View File

@ -1,8 +1,10 @@
#![allow(dead_code)]
use std::path::Path;
use std::{fs::File, io, path::Path};
use nomos_tracing::logging::local::FileConfig;
use serde::Serialize;
use serde_yaml::Value;
/// Configure tracing logger to write into `NOMOS_LOG_DIR` if set, else into the
/// provided base dir.
@ -24,3 +26,18 @@ where
});
}
}
/// Write a YAML config file, allowing a caller-provided injection hook to
/// mutate the serialized value before it is written.
pub fn write_config_with_injection<T, F>(config: &T, path: &Path, inject: F) -> io::Result<()>
where
T: Serialize,
F: FnOnce(&mut Value),
{
let mut yaml_value =
serde_yaml::to_value(config).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
inject(&mut yaml_value);
let file = File::create(path)?;
serde_yaml::to_writer(file, &yaml_value)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}

View File

@ -2,3 +2,4 @@ pub mod api;
pub mod binary;
pub mod config;
pub mod lifecycle;
pub mod node;

View File

@ -0,0 +1,147 @@
use std::{
net::SocketAddr,
path::{Path, PathBuf},
process::{Child, Command, Stdio},
time::Duration,
};
use nomos_tracing_service::LoggerLayer;
use reqwest::Url;
use serde::Serialize;
use tempfile::TempDir;
use tokio::time;
use super::lifecycle::monitor::is_running;
use crate::nodes::{
ApiClient,
common::{config::paths::ensure_recovery_paths, lifecycle::spawn::configure_logging},
create_tempdir,
};
/// Minimal interface to apply common node setup.
pub trait NodeConfigCommon {
fn set_logger(&mut self, logger: LoggerLayer);
fn set_paths(&mut self, base: &Path);
fn addresses(&self) -> (SocketAddr, Option<SocketAddr>);
}
/// Shared handle for spawned nodes that exposes common operations.
pub struct NodeHandle<T> {
pub(crate) child: Child,
pub(crate) tempdir: TempDir,
pub(crate) config: T,
pub(crate) api: ApiClient,
}
impl<T> NodeHandle<T> {
pub fn new(child: Child, tempdir: TempDir, config: T, api: ApiClient) -> Self {
Self {
child,
tempdir,
config,
api,
}
}
#[must_use]
pub fn url(&self) -> Url {
self.api.base_url().clone()
}
#[must_use]
pub fn testing_url(&self) -> Option<Url> {
self.api.testing_url()
}
#[must_use]
pub fn api(&self) -> &ApiClient {
&self.api
}
#[must_use]
pub const fn config(&self) -> &T {
&self.config
}
/// Returns true if the process exited within the timeout, false otherwise.
pub async fn wait_for_exit(&mut self, timeout: Duration) -> bool {
time::timeout(timeout, async {
loop {
if !is_running(&mut self.child) {
return;
}
time::sleep(Duration::from_millis(100)).await;
}
})
.await
.is_ok()
}
}
/// Apply common setup (recovery paths, logging, data dirs) and return a ready
/// config plus API addrs.
pub fn prepare_node_config<T: NodeConfigCommon>(
mut config: T,
log_prefix: &str,
enable_logging: bool,
) -> (TempDir, T, SocketAddr, Option<SocketAddr>) {
let dir = create_tempdir().expect("tempdir");
// Ensure recovery files/dirs exist so services that persist state do not fail
// on startup.
let _ = ensure_recovery_paths(dir.path());
if enable_logging {
configure_logging(dir.path(), log_prefix, |file_cfg| {
config.set_logger(LoggerLayer::File(file_cfg));
});
}
config.set_paths(dir.path());
let (addr, testing_addr) = config.addresses();
(dir, config, addr, testing_addr)
}
/// Spawn a node with shared setup, config writing, and readiness wait.
pub async fn spawn_node<C>(
config: C,
log_prefix: &str,
config_filename: &str,
binary_path: PathBuf,
enable_logging: bool,
) -> Result<NodeHandle<C>, tokio::time::error::Elapsed>
where
C: NodeConfigCommon + Serialize,
{
let (dir, config, addr, testing_addr) = prepare_node_config(config, log_prefix, enable_logging);
let config_path = dir.path().join(config_filename);
super::lifecycle::spawn::write_config_with_injection(&config, &config_path, |yaml| {
crate::nodes::common::config::injection::inject_ibd_into_cryptarchia(yaml)
})
.expect("failed to write node config");
let child = Command::new(binary_path)
.arg(&config_path)
.current_dir(dir.path())
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.expect("failed to spawn node process");
let handle = NodeHandle::new(child, dir, config, ApiClient::new(addr, testing_addr));
// Wait for readiness via consensus_info
time::timeout(Duration::from_secs(60), async {
loop {
if handle.api.consensus_info().await.is_ok() {
break;
}
time::sleep(Duration::from_millis(100)).await;
}
})
.await?;
Ok(handle)
}

View File

@ -1,36 +1,18 @@
use std::{
collections::HashSet,
path::PathBuf,
process::{Child, Command, Stdio},
time::Duration,
};
use std::{ops::Deref, path::PathBuf};
use broadcast_service::BlockInfo;
use chain_service::CryptarchiaInfo;
use futures::Stream;
use kzgrs_backend::common::share::{DaLightShare, DaShare, DaSharesCommitments};
use nomos_core::{
block::Block, da::BlobId, header::HeaderId, mantle::SignedMantleTx, sdp::SessionNumber,
};
use nomos_da_network_core::swarm::{BalancerStats, MonitorStats};
use nomos_da_network_service::MembershipResponse;
use nomos_executor::config::Config;
use nomos_http_api_common::paths::{DA_GET_SHARES_COMMITMENTS, MANTLE_METRICS, MEMPOOL_ADD_TX};
use nomos_network::backends::libp2p::Libp2pInfo;
use nomos_node::api::testing::handlers::HistoricSamplingRequest;
use nomos_tracing_service::LoggerLayer;
use reqwest::Url;
pub use testing_framework_config::nodes::executor::create_executor_config;
use super::{ApiClient, create_tempdir, persist_tempdir, should_persist_tempdir};
use super::{persist_tempdir, should_persist_tempdir};
use crate::{
IS_DEBUG_TRACING, adjust_timeout,
IS_DEBUG_TRACING,
nodes::{
LOGS_PREFIX,
common::{
binary::{BinaryConfig, BinaryResolver},
config::{injection::inject_ibd_into_cryptarchia, paths::ensure_recovery_paths},
lifecycle::{kill::kill_child, spawn::configure_logging},
lifecycle::kill::kill_child,
node::{NodeConfigCommon, NodeHandle, spawn_node},
},
},
};
@ -48,201 +30,64 @@ fn binary_path() -> PathBuf {
}
pub struct Executor {
tempdir: tempfile::TempDir,
child: Child,
config: Config,
api: ApiClient,
handle: NodeHandle<Config>,
}
impl Deref for Executor {
type Target = NodeHandle<Config>;
fn deref(&self) -> &Self::Target {
&self.handle
}
}
impl Drop for Executor {
fn drop(&mut self) {
if should_persist_tempdir()
&& let Err(e) = persist_tempdir(&mut self.tempdir, "nomos-executor")
&& let Err(e) = persist_tempdir(&mut self.handle.tempdir, "nomos-executor")
{
println!("failed to persist tempdir: {e}");
}
kill_child(&mut self.child);
kill_child(&mut self.handle.child);
}
}
impl Executor {
pub async fn spawn(mut config: Config) -> Self {
let dir = create_tempdir().unwrap();
let config_path = dir.path().join("executor.yaml");
let file = std::fs::File::create(&config_path).unwrap();
pub async fn spawn(config: Config) -> Self {
let handle = spawn_node(
config,
LOGS_PREFIX,
"executor.yaml",
binary_path(),
!*IS_DEBUG_TRACING,
)
.await
.expect("executor did not become ready");
// Ensure recovery files/dirs exist so services that persist state do not fail
// on startup.
let _ = ensure_recovery_paths(dir.path());
Self { handle }
}
}
if !*IS_DEBUG_TRACING {
configure_logging(dir.path(), LOGS_PREFIX, |cfg| {
config.tracing.logger = LoggerLayer::File(cfg);
});
}
impl NodeConfigCommon for Config {
fn set_logger(&mut self, logger: LoggerLayer) {
self.tracing.logger = logger;
}
config.storage.db_path = dir.path().join("db");
dir.path().clone_into(
&mut config
fn set_paths(&mut self, base: &std::path::Path) {
self.storage.db_path = base.join("db");
base.clone_into(
&mut self
.da_verifier
.storage_adapter_settings
.blob_storage_directory,
);
let addr = config.http.backend_settings.address;
let testing_addr = config.testing_http.backend_settings.address;
let mut yaml_value = serde_yaml::to_value(&config).unwrap();
inject_ibd_into_cryptarchia(&mut yaml_value);
serde_yaml::to_writer(file, &yaml_value).unwrap();
let child = Command::new(binary_path())
.arg(&config_path)
.current_dir(dir.path())
.stdout(Stdio::inherit())
.spawn()
.unwrap();
let node = Self {
child,
tempdir: dir,
config,
api: ApiClient::new(addr, Some(testing_addr)),
};
tokio::time::timeout(adjust_timeout(Duration::from_secs(60)), async {
node.wait_online().await;
})
.await
.unwrap();
node
}
pub async fn block_peer(&self, peer_id: String) -> bool {
self.api.block_peer(&peer_id).await.unwrap()
}
pub async fn unblock_peer(&self, peer_id: String) -> bool {
self.api.unblock_peer(&peer_id).await.unwrap()
}
pub async fn blacklisted_peers(&self) -> Vec<String> {
self.api.blacklisted_peers().await.unwrap()
}
async fn wait_online(&self) {
loop {
let res = self.api.get_response(MANTLE_METRICS).await;
if res.is_ok() && res.unwrap().status().is_success() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
#[must_use]
pub const fn config(&self) -> &Config {
&self.config
}
#[must_use]
pub fn url(&self) -> Url {
self.api.base_url().clone()
}
#[must_use]
pub fn testing_url(&self) -> Option<Url> {
self.api.testing_url()
}
pub async fn balancer_stats(&self) -> BalancerStats {
self.api.balancer_stats().await.unwrap()
}
pub async fn monitor_stats(&self) -> MonitorStats {
self.api.monitor_stats().await.unwrap()
}
pub async fn network_info(&self) -> Libp2pInfo {
self.api.network_info().await.unwrap()
}
pub async fn consensus_info(&self) -> CryptarchiaInfo {
self.api.consensus_info().await.unwrap()
}
pub async fn get_block(&self, id: HeaderId) -> Option<Block<SignedMantleTx>> {
self.api.storage_block(&id).await.unwrap()
}
pub async fn get_shares(
&self,
blob_id: BlobId,
requested_shares: HashSet<[u8; 2]>,
filter_shares: HashSet<[u8; 2]>,
return_available: bool,
) -> Result<impl Stream<Item = DaLightShare>, common_http_client::Error> {
self.api
.http_client()
.get_shares::<DaShare>(
self.api.base_url().clone(),
blob_id,
requested_shares,
filter_shares,
return_available,
)
.await
}
pub async fn get_commitments(&self, blob_id: BlobId) -> Option<DaSharesCommitments> {
self.api
.post_json_decode(DA_GET_SHARES_COMMITMENTS, &blob_id)
.await
.unwrap()
}
pub async fn get_storage_commitments(
&self,
blob_id: BlobId,
) -> Result<Option<DaSharesCommitments>, common_http_client::Error> {
self.api
.http_client()
.get_storage_commitments::<DaShare>(self.api.base_url().clone(), blob_id)
.await
}
pub async fn da_get_membership(
&self,
session_id: SessionNumber,
) -> Result<MembershipResponse, reqwest::Error> {
self.api.da_get_membership(&session_id).await
}
pub async fn da_historic_sampling<I>(
&self,
block_id: HeaderId,
blob_ids: I,
) -> Result<bool, reqwest::Error>
where
I: IntoIterator<Item = (BlobId, SessionNumber)>,
{
let request = HistoricSamplingRequest {
block_id,
blob_ids: blob_ids.into_iter().collect(),
};
self.api.da_historic_sampling(&request).await
}
pub async fn get_lib_stream(
&self,
) -> Result<impl Stream<Item = BlockInfo>, common_http_client::Error> {
self.api
.http_client()
.get_lib_stream(self.api.base_url().clone())
.await
}
pub async fn add_tx(&self, tx: SignedMantleTx) -> Result<(), reqwest::Error> {
self.api.post_json_unit(MEMPOOL_ADD_TX, &tx).await
fn addresses(&self) -> (std::net::SocketAddr, Option<std::net::SocketAddr>) {
(
self.http.backend_settings.address,
Some(self.testing_http.backend_settings.address),
)
}
}

View File

@ -12,7 +12,7 @@ pub(crate) const LOGS_PREFIX: &str = "__logs";
static KEEP_NODE_TEMPDIRS: LazyLock<bool> =
LazyLock::new(|| std::env::var("NOMOS_TESTS_KEEP_LOGS").is_ok());
fn create_tempdir() -> std::io::Result<TempDir> {
pub(crate) fn create_tempdir() -> std::io::Result<TempDir> {
// It's easier to use the current location instead of OS-default tempfile
// location because Github Actions can easily access files in the current
// location using wildcard to upload them as artifacts.

View File

@ -1,35 +1,19 @@
use std::{
collections::HashSet,
path::PathBuf,
process::{Child, Command, Stdio},
time::Duration,
};
use std::{ops::Deref, path::PathBuf, time::Duration};
use broadcast_service::BlockInfo;
use chain_service::CryptarchiaInfo;
use futures::Stream;
use kzgrs_backend::common::share::{DaLightShare, DaShare, DaSharesCommitments};
use nomos_core::{block::Block, da::BlobId, mantle::SignedMantleTx, sdp::SessionNumber};
use nomos_da_network_core::swarm::{BalancerStats, MonitorStats};
use nomos_da_network_service::MembershipResponse;
use nomos_http_api_common::paths::{CRYPTARCHIA_HEADERS, DA_GET_SHARES_COMMITMENTS};
use nomos_network::backends::libp2p::Libp2pInfo;
use nomos_node::{Config, HeaderId, api::testing::handlers::HistoricSamplingRequest};
use nomos_node::Config;
use nomos_tracing_service::LoggerLayer;
use reqwest::Url;
pub use testing_framework_config::nodes::validator::create_validator_config;
use tokio::time::error::Elapsed;
use tx_service::MempoolMetrics;
use super::{ApiClient, create_tempdir, persist_tempdir, should_persist_tempdir};
use super::{persist_tempdir, should_persist_tempdir};
use crate::{
IS_DEBUG_TRACING, adjust_timeout,
IS_DEBUG_TRACING,
nodes::{
LOGS_PREFIX,
common::{
binary::{BinaryConfig, BinaryResolver},
config::{injection::inject_ibd_into_cryptarchia, paths::ensure_recovery_paths},
lifecycle::{kill::kill_child, spawn::configure_logging},
lifecycle::kill::kill_child,
node::{NodeConfigCommon, NodeHandle, spawn_node},
},
},
};
@ -52,256 +36,74 @@ pub enum Pool {
}
pub struct Validator {
tempdir: tempfile::TempDir,
child: Child,
config: Config,
api: ApiClient,
handle: NodeHandle<Config>,
}
impl Deref for Validator {
type Target = NodeHandle<Config>;
fn deref(&self) -> &Self::Target {
&self.handle
}
}
impl Drop for Validator {
fn drop(&mut self) {
if should_persist_tempdir()
&& let Err(e) = persist_tempdir(&mut self.tempdir, "nomos-node")
&& let Err(e) = persist_tempdir(&mut self.handle.tempdir, "nomos-node")
{
println!("failed to persist tempdir: {e}");
}
kill_child(&mut self.child);
kill_child(&mut self.handle.child);
}
}
impl Validator {
/// Check if the validator process is still running
pub fn is_running(&mut self) -> bool {
crate::nodes::common::lifecycle::monitor::is_running(&mut self.child)
crate::nodes::common::lifecycle::monitor::is_running(&mut self.handle.child)
}
/// Wait for the validator process to exit, with a timeout
/// Returns true if the process exited within the timeout, false otherwise
pub async fn wait_for_exit(&mut self, timeout: Duration) -> bool {
tokio::time::timeout(timeout, async {
loop {
if !self.is_running() {
return;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await
.is_ok()
self.handle.wait_for_exit(timeout).await
}
pub async fn spawn(mut config: Config) -> Result<Self, Elapsed> {
let dir = create_tempdir().unwrap();
let config_path = dir.path().join("validator.yaml");
let file = std::fs::File::create(&config_path).unwrap();
pub async fn spawn(config: Config) -> Result<Self, Elapsed> {
let handle = spawn_node(
config,
LOGS_PREFIX,
"validator.yaml",
binary_path(),
!*IS_DEBUG_TRACING,
)
.await?;
// Ensure recovery files/dirs exist so services that persist state do not fail
// on startup.
let _ = ensure_recovery_paths(dir.path());
Ok(Self { handle })
}
}
if !*IS_DEBUG_TRACING {
configure_logging(dir.path(), LOGS_PREFIX, |cfg| {
config.tracing.logger = LoggerLayer::File(cfg);
});
}
impl NodeConfigCommon for Config {
fn set_logger(&mut self, logger: LoggerLayer) {
self.tracing.logger = logger;
}
config.storage.db_path = dir.path().join("db");
dir.path().clone_into(
&mut config
fn set_paths(&mut self, base: &std::path::Path) {
self.storage.db_path = base.join("db");
base.clone_into(
&mut self
.da_verifier
.storage_adapter_settings
.blob_storage_directory,
);
let addr = config.http.backend_settings.address;
let testing_addr = config.testing_http.backend_settings.address;
let mut yaml_value = serde_yaml::to_value(&config).unwrap();
inject_ibd_into_cryptarchia(&mut yaml_value);
serde_yaml::to_writer(file, &yaml_value).unwrap();
let child = Command::new(binary_path())
.arg(&config_path)
.current_dir(dir.path())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.unwrap();
let node = Self {
child,
tempdir: dir,
config,
api: ApiClient::new(addr, Some(testing_addr)),
};
tokio::time::timeout(adjust_timeout(Duration::from_secs(60)), async {
node.wait_online().await;
})
.await?;
Ok(node)
}
#[must_use]
pub fn url(&self) -> Url {
self.api.base_url().clone()
}
#[must_use]
pub fn testing_url(&self) -> Option<Url> {
self.api.testing_url()
}
async fn wait_online(&self) {
loop {
if self.api.consensus_info().await.is_ok() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub async fn get_block(&self, id: HeaderId) -> Option<Block<SignedMantleTx>> {
self.api.storage_block(&id).await.unwrap()
}
pub async fn get_commitments(&self, blob_id: BlobId) -> Option<DaSharesCommitments> {
self.api
.post_json_decode(DA_GET_SHARES_COMMITMENTS, &blob_id)
.await
.unwrap()
}
pub async fn get_mempoool_metrics(&self, pool: Pool) -> MempoolMetrics {
let discr = match pool {
Pool::Mantle => "mantle",
Pool::Da => "da",
};
let res = self.api.mempool_metrics(discr).await.unwrap();
MempoolMetrics {
pending_items: res["pending_items"].as_u64().unwrap() as usize,
last_item_timestamp: res["last_item_timestamp"].as_u64().unwrap(),
}
}
pub async fn da_historic_sampling<I>(
&self,
block_id: HeaderId,
blob_ids: I,
) -> Result<bool, reqwest::Error>
where
I: IntoIterator<Item = (BlobId, SessionNumber)>,
{
let request = HistoricSamplingRequest {
block_id,
blob_ids: blob_ids.into_iter().collect(),
};
self.api.da_historic_sampling(&request).await
}
// not async so that we can use this in `Drop`
#[must_use]
pub fn get_logs_from_file(&self) -> String {
println!(
"fetching logs from dir {}...",
self.tempdir.path().display()
);
// std::thread::sleep(std::time::Duration::from_secs(50));
std::fs::read_dir(self.tempdir.path())
.unwrap()
.filter_map(|entry| {
let entry = entry.unwrap();
let path = entry.path();
(path.is_file() && path.to_str().unwrap().contains(LOGS_PREFIX)).then_some(path)
})
.map(|f| std::fs::read_to_string(f).unwrap())
.collect::<String>()
}
#[must_use]
pub const fn config(&self) -> &Config {
&self.config
}
pub async fn get_headers(&self, from: Option<HeaderId>, to: Option<HeaderId>) -> Vec<HeaderId> {
let mut req = self.api.get_builder(CRYPTARCHIA_HEADERS);
if let Some(from) = from {
req = req.query(&[("from", from)]);
}
if let Some(to) = to {
req = req.query(&[("to", to)]);
}
let res = self.api.get_headers_raw(req).await;
println!("res: {res:?}");
res.unwrap().json::<Vec<HeaderId>>().await.unwrap()
}
pub async fn consensus_info(&self) -> CryptarchiaInfo {
let info = self.api.consensus_info().await.unwrap();
println!("{info:?}");
info
}
pub async fn balancer_stats(&self) -> BalancerStats {
self.api.balancer_stats().await.unwrap()
}
pub async fn monitor_stats(&self) -> MonitorStats {
self.api.monitor_stats().await.unwrap()
}
pub async fn da_get_membership(
&self,
session_id: SessionNumber,
) -> Result<MembershipResponse, reqwest::Error> {
self.api.da_get_membership(&session_id).await
}
pub async fn network_info(&self) -> Libp2pInfo {
self.api.network_info().await.unwrap()
}
pub async fn get_shares(
&self,
blob_id: BlobId,
requested_shares: HashSet<[u8; 2]>,
filter_shares: HashSet<[u8; 2]>,
return_available: bool,
) -> Result<impl Stream<Item = DaLightShare>, common_http_client::Error> {
self.api
.http_client()
.get_shares::<DaShare>(
self.api.base_url().clone(),
blob_id,
requested_shares,
filter_shares,
return_available,
)
.await
}
pub async fn get_storage_commitments(
&self,
blob_id: BlobId,
) -> Result<Option<DaSharesCommitments>, common_http_client::Error> {
self.api
.http_client()
.get_storage_commitments::<DaShare>(self.api.base_url().clone(), blob_id)
.await
}
pub async fn get_lib_stream(
&self,
) -> Result<impl Stream<Item = BlockInfo>, common_http_client::Error> {
self.api
.http_client()
.get_lib_stream(self.api.base_url().clone())
.await
fn addresses(&self) -> (std::net::SocketAddr, Option<std::net::SocketAddr>) {
(
self.http.backend_settings.address,
Some(self.testing_http.backend_settings.address),
)
}
}

View File

@ -18,7 +18,7 @@ impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> {
data.push((
self.labels[idx].clone(),
validator.config().da_network.subnet_threshold,
validator.balancer_stats().await,
validator.api().balancer_stats().await.unwrap(),
));
}
for (offset, executor) in self.topology.executors.iter().enumerate() {
@ -26,7 +26,7 @@ impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> {
data.push((
self.labels[label_index].clone(),
executor.config().da_network.subnet_threshold,
executor.balancer_stats().await,
executor.api().balancer_stats().await.unwrap(),
));
}
data

View File

@ -22,13 +22,13 @@ impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> {
self.topology
.validators
.iter()
.map(|node| node.da_get_membership(self.session)),
.map(|node| node.api().da_get_membership(&self.session)),
),
futures::future::join_all(
self.topology
.executors
.iter()
.map(|node| node.da_get_membership(self.session)),
.map(|node| node.api().da_get_membership(&self.session)),
)
);

View File

@ -21,13 +21,13 @@ impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> {
self.topology
.validators
.iter()
.map(crate::nodes::validator::Validator::network_info)
.map(|node| async { node.api().network_info().await.unwrap() })
),
futures::future::join_all(
self.topology
.executors
.iter()
.map(crate::nodes::executor::Executor::network_info)
.map(|node| async { node.api().network_info().await.unwrap() })
)
);

View File

@ -53,7 +53,7 @@ pub async fn ensure_image_present(
return Ok(());
}
if image != "nomos-testnet:local" {
if image != "logos-blockchain-testing:local" {
return Err(ComposeRunnerError::MissingImage {
image: image.to_owned(),
});

View File

@ -3,8 +3,8 @@ use std::env;
/// Select the compose image and optional platform, honoring
/// NOMOS_TESTNET_IMAGE.
pub fn resolve_image() -> (String, Option<String>) {
let image =
env::var("NOMOS_TESTNET_IMAGE").unwrap_or_else(|_| String::from("nomos-testnet:local"));
let image = env::var("NOMOS_TESTNET_IMAGE")
.unwrap_or_else(|_| String::from("logos-blockchain-testing:local"));
let platform = (image == "ghcr.io/logos-co/nomos:testnet").then(|| "linux/amd64".to_owned());
(image, platform)
}

View File

@ -39,12 +39,3 @@ app.kubernetes.io/name: {{ include "nomos-runner.chart" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
nomos/logical-role: prometheus
{{- end -}}
{{- define "nomos-runner.cfgsyncPort" -}}
{{- $env := env "NOMOS_CFGSYNC_PORT" -}}
{{- if $env -}}
{{ $env | int }}
{{- else -}}
{{ .Values.cfgsync.port | int }}
{{- end -}}
{{- end -}}

View File

@ -23,7 +23,7 @@ spec:
command: ["/etc/nomos/scripts/run_cfgsync.sh"]
ports:
- name: http
containerPort: {{ include "nomos-runner.cfgsyncPort" . }}
containerPort: {{ .Values.cfgsync.port }}
env:
- name: RUST_LOG
value: debug

View File

@ -11,5 +11,5 @@ spec:
nomos/component: cfgsync
ports:
- name: http
port: {{ include "nomos-runner.cfgsyncPort" . }}
port: {{ .Values.cfgsync.port }}
targetPort: http

View File

@ -1,4 +1,4 @@
image: "nomos-testnet:local"
image: "logos-blockchain-testing:local"
imagePullPolicy: IfNotPresent
cfgsync:

View File

@ -86,8 +86,8 @@ pub fn prepare_assets(topology: &GeneratedTopology) -> Result<RunnerAssets, Asse
let chart_path = helm_chart_path()?;
let values_yaml = render_values_yaml(topology)?;
let values_file = write_temp_file(tempdir.path(), "values.yaml", values_yaml)?;
let image =
env::var("NOMOS_TESTNET_IMAGE").unwrap_or_else(|_| String::from("nomos-testnet:local"));
let image = env::var("NOMOS_TESTNET_IMAGE")
.unwrap_or_else(|_| String::from("logos-blockchain-testing:local"));
Ok(RunnerAssets {
image,
@ -215,10 +215,16 @@ fn stack_scripts_root(root: &Path) -> PathBuf {
#[derive(Serialize)]
struct HelmValues {
cfgsync: CfgsyncValues,
validators: NodeGroup,
executors: NodeGroup,
}
#[derive(Serialize)]
struct CfgsyncValues {
port: u16,
}
#[derive(Serialize)]
struct NodeGroup {
count: usize,
@ -235,6 +241,9 @@ struct NodeValues {
}
fn build_values(topology: &GeneratedTopology) -> HelmValues {
let cfgsync = CfgsyncValues {
port: cfgsync_port(),
};
let pol_mode = pol_proof_mode();
let validators = topology
.validators()
@ -311,6 +320,7 @@ fn build_values(topology: &GeneratedTopology) -> HelmValues {
.collect();
HelmValues {
cfgsync,
validators: NodeGroup {
count: topology.validators().len(),
nodes: validators,