Compose manual cluster

andrussal 2026-01-29 12:12:47 +01:00
parent 3977a90682
commit 74659780fe
25 changed files with 907 additions and 129 deletions

Cargo.lock (generated)
View File

@ -6513,6 +6513,7 @@ dependencies = [
"logos-blockchain-groth16",
"logos-blockchain-key-management-system-service",
"logos-blockchain-ledger",
"logos-blockchain-network-service",
"logos-blockchain-tracing",
"logos-blockchain-tracing-service",
"logos-blockchain-zksign",

View File

@ -132,6 +132,23 @@ build_bundle::default_docker_platform() {
esac
}
build_bundle::ensure_circuits() {
if [ -n "${LOGOS_BLOCKCHAIN_CIRCUITS:-}" ]; then
[ -d "${LOGOS_BLOCKCHAIN_CIRCUITS}" ] || build_bundle::fail \
"LOGOS_BLOCKCHAIN_CIRCUITS is set but missing: ${LOGOS_BLOCKCHAIN_CIRCUITS}"
return 0
fi
local default_dir="${HOME}/.logos-blockchain-circuits"
if [ ! -d "${default_dir}" ]; then
echo "==> Circuits not found; installing to ${default_dir}"
bash "${ROOT_DIR}/scripts/setup/setup-logos-blockchain-circuits.sh" "${VERSION}" "${default_dir}"
fi
LOGOS_BLOCKCHAIN_CIRCUITS="${default_dir}"
export LOGOS_BLOCKCHAIN_CIRCUITS
}
build_bundle::parse_args() {
PLATFORM="host"
OUTPUT=""
@ -239,6 +256,9 @@ build_bundle::maybe_run_linux_build_in_docker() {
;;
esac
fi
if [ -n "${LOGOS_BLOCKCHAIN_CIRCUITS:-}" ] && [ -d "${LOGOS_BLOCKCHAIN_CIRCUITS}" ]; then
extra_mounts+=("-v" "${LOGOS_BLOCKCHAIN_CIRCUITS}:/root/.logos-blockchain-circuits:ro")
fi
echo "==> Building Linux bundle inside Docker"
local container_output="/workspace${OUTPUT#"${ROOT_DIR}"}"
@ -263,6 +283,7 @@ build_bundle::maybe_run_linux_build_in_docker() {
-e VERSION="${VERSION}" \
-e LOGOS_BLOCKCHAIN_NODE_REV="${LOGOS_BLOCKCHAIN_NODE_REV}" \
-e LOGOS_BLOCKCHAIN_NODE_PATH="${node_path_env}" \
-e LOGOS_BLOCKCHAIN_CIRCUITS="/root/.logos-blockchain-circuits" \
-e LOGOS_BLOCKCHAIN_BUNDLE_DOCKER_PLATFORM="${DOCKER_PLATFORM}" \
-e LOGOS_BLOCKCHAIN_EXTRA_FEATURES="${LOGOS_BLOCKCHAIN_EXTRA_FEATURES:-}" \
-e BUNDLE_IN_CONTAINER=1 \
@ -329,14 +350,15 @@ build_bundle::build_binaries() {
if [ -z "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then
build_bundle::apply_nomos_node_patches "${NODE_SRC}"
fi
unset CARGO_FEATURE_BUILD_VERIFICATION_KEY
if [ -n "${BUNDLE_RUSTUP_TOOLCHAIN}" ]; then
CARGO_FEATURE_BUILD_VERIFICATION_KEY=1 \
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
RUSTUP_TOOLCHAIN="${BUNDLE_RUSTUP_TOOLCHAIN}" \
cargo build --all-features \
-p logos-blockchain-node \
--target-dir "${NODE_TARGET}"
else
CARGO_FEATURE_BUILD_VERIFICATION_KEY=1 \
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
cargo build --all-features \
-p logos-blockchain-node \
@ -385,6 +407,7 @@ build_bundle::main() {
build_bundle::clean_cargo_linux_cache
build_bundle::parse_args "$@"
build_bundle::validate_and_finalize
build_bundle::ensure_circuits
build_bundle::maybe_run_linux_build_in_docker
build_bundle::prepare_circuits
build_bundle::build_binaries

View File

@ -164,6 +164,10 @@ build_test_image::docker_build() {
linux-aarch64) target_platform="linux/arm64" ;;
esac
fi
if [ -z "${target_platform}" ] && [ "${host_platform}" = "linux/arm64" ]; then
# Default to amd64 so circuits download matches available release artifacts.
target_platform="linux/amd64"
fi
local -a base_build_args=(
-f "${BASE_DOCKERFILE_PATH}"

View File

@ -40,6 +40,8 @@ RUN /workspace/testing-framework/assets/stack/scripts/setup-logos-blockchain-cir
ENV LOGOS_BLOCKCHAIN_CIRCUITS=/opt/circuits
RUN /workspace/scripts/build/build-rapidsnark.sh /opt/circuits
RUN /workspace/testing-framework/assets/stack/scripts/docker/prepare_binaries.sh
# Strip local path patches so container builds use git sources.
@ -68,5 +70,8 @@ RUN apt-get update && apt-get install -yq \
COPY --from=builder /workspace/artifacts/logos-blockchain-node /usr/bin/logos-blockchain-node
COPY --from=builder /workspace/artifacts/cfgsync-server /usr/bin/cfgsync-server
COPY --from=builder /workspace/artifacts/cfgsync-client /usr/bin/cfgsync-client
COPY --from=builder /opt/circuits /opt/circuits
ENV LOGOS_BLOCKCHAIN_CIRCUITS=/opt/circuits
EXPOSE 3000 8080 9000 60000

View File

@ -51,6 +51,7 @@ git reset --hard
git clean -fdx
# Enable pol-dev-mode via cfg to let POL_PROOF_DEV_MODE short-circuit proofs in tests.
# Build with default features disabled for proof crates (dummy verification keys).
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
cargo build --features "testing" -p logos-blockchain-node

View File

@ -170,8 +170,7 @@ async fn wait_for_network_readiness(
let listen_ports = topology.listen_ports();
let initial_peer_ports = topology.initial_peer_ports();
let expected_peer_counts =
crate::topology::generation::find_expected_peer_counts(&listen_ports, &initial_peer_ports);
let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports);
let network_check = HttpNetworkReadiness {
client,

View File

@ -1,23 +1,61 @@
use std::time::Duration;
use std::{collections::HashSet, time::Duration};
use nomos_network::backends::libp2p::Libp2pInfo;
use testing_framework_core::topology::readiness::ReadinessCheck;
use tokio::time::timeout;
use crate::node_control::ReadinessNode;
use super::ReadinessCheck;
use crate::{nodes::ApiClient, topology::generation::find_expected_peer_counts};
const NETWORK_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
pub(super) struct ManualNetworkReadiness {
#[derive(Clone)]
pub struct ReadinessNode {
pub label: String,
pub expected_peers: Option<usize>,
pub api: ApiClient,
}
pub struct ManualNetworkReadiness {
nodes: Vec<ReadinessNode>,
}
impl ManualNetworkReadiness {
pub(super) fn new(nodes: Vec<ReadinessNode>) -> Self {
pub fn new(nodes: Vec<ReadinessNode>) -> Self {
Self { nodes }
}
}
pub struct ManualNetworkStatus {
label: String,
expected_peers: Option<usize>,
result: Result<Libp2pInfo, String>,
}
pub fn build_readiness_nodes<I>(iter: I) -> Vec<ReadinessNode>
where
I: IntoIterator<Item = (String, ApiClient, u16, HashSet<u16>)>,
{
let entries = iter.into_iter().collect::<Vec<_>>();
let listen_ports = entries.iter().map(|entry| entry.2).collect::<Vec<_>>();
let initial_peer_ports = entries
.iter()
.map(|entry| entry.3.clone())
.collect::<Vec<_>>();
let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports);
entries
.into_iter()
.enumerate()
.map(|(idx, (label, api, _, _))| ReadinessNode {
label,
expected_peers: expected_peer_counts.get(idx).copied(),
api,
})
.collect()
}
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for ManualNetworkReadiness {
type Data = Vec<ManualNetworkStatus>;
@ -66,9 +104,3 @@ impl<'a> ReadinessCheck<'a> for ManualNetworkReadiness {
format!("timed out waiting for network readiness: {summary}")
}
}
pub(super) struct ManualNetworkStatus {
label: String,
expected_peers: Option<usize>,
result: Result<Libp2pInfo, String>,
}
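
Note: build_readiness_nodes centralizes the expected-peer derivation that the local and compose runners previously duplicated. A minimal sketch of a caller, assuming two illustrative nodes (node0_api/node1_api stand in for real ApiClient handles, ports are made up):

    use std::collections::HashSet;

    // Hypothetical two-node mesh where each node dials the other; the helper
    // derives each node's expected peer count from these listen/dial port sets.
    let nodes = build_readiness_nodes(vec![
        ("node-0".to_string(), node0_api, 3000, HashSet::from([3001])),
        ("node-1".to_string(), node1_api, 3001, HashSet::from([3000])),
    ]);
    ManualNetworkReadiness::new(nodes).wait().await?;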

View File

@ -1,7 +1,11 @@
pub mod manual_network;
pub mod network;
use std::time::Duration;
pub use manual_network::{
ManualNetworkReadiness, ManualNetworkStatus, ReadinessNode, build_readiness_nodes,
};
pub use network::{HttpNetworkReadiness, NetworkReadiness};
use thiserror::Error;
use tokio::time::{sleep, timeout};

View File

@ -16,6 +16,7 @@ workspace = true
anyhow = "1"
async-trait = { workspace = true }
cfgsync_tf = { workspace = true }
nomos-network = { workspace = true }
nomos-tracing = { workspace = true }
nomos-tracing-service = { workspace = true }
reqwest = { features = ["json"], workspace = true }

View File

@ -29,31 +29,4 @@ services:
- seccomp=unconfined
restart: on-failure
{% if node.platform %} platform: {{ node.platform }}
{% endif %} entrypoint: {{ node.entrypoint }}
volumes:
{% for volume in node.volumes %}
- {{ volume }}
{% endfor %}
{% if node.extra_hosts | length > 0 %}
extra_hosts:
{% for host in node.extra_hosts %}
- {{ host }}
{% endfor %}
{% endif %}
ports:
{% for port in node.ports %}
- {{ port }}
{% endfor %}
environment:
{% for env in node.environment %}
{{ env.key }}: "{{ env.value }}"
{% endfor %}
cap_add:
- SYS_ADMIN
- SYS_PTRACE
security_opt:
- seccomp=unconfined
restart: on-failure
{% endfor %}

View File

@ -5,12 +5,19 @@ pub mod readiness;
pub mod setup;
use async_trait::async_trait;
use testing_framework_core::scenario::{
BlockFeedTask, CleanupGuard, Deployer, ObservabilityCapabilityProvider, RequiresNodeControl,
Runner, Scenario,
use testing_framework_core::{
scenario::{
BlockFeedTask, CleanupGuard, Deployer, ObservabilityCapabilityProvider,
RequiresNodeControl, Runner, Scenario,
},
topology::config::TopologyConfig,
};
use crate::{errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup};
use crate::{
errors::ComposeRunnerError,
lifecycle::cleanup::RunnerCleanup,
manual::{ComposeManualCluster, ManualClusterError},
};
/// Docker Compose-based deployer for Logos test scenarios.
#[derive(Clone, Copy)]
@ -37,6 +44,14 @@ impl ComposeDeployer {
self.readiness_checks = enabled;
self
}
/// Build a manual cluster using this deployer's compose implementation.
pub async fn manual_cluster(
&self,
config: TopologyConfig,
) -> Result<ComposeManualCluster, ManualClusterError> {
ComposeManualCluster::from_config(config).await
}
}
#[async_trait]
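
Note: a sketch of the intended call pattern for the new entry point (node count and names are illustrative; the integration test later in this commit shows a complete single-node run):

    let config = TopologyConfig::with_node_numbers(2);
    let cluster = ComposeDeployer::new().manual_cluster(config).await?;
    cluster.start_node("seed").await?;      // claims compose slot node-0
    cluster.start_node("follower").await?;  // claims compose slot node-1
    cluster.wait_network_ready().await?;    // polls until both report their peers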

View File

@ -123,6 +123,8 @@ fn base_environment(cfgsync_port: u16) -> Vec<EnvEntry> {
let rust_log = tf_env::rust_log().unwrap_or_else(|| "info".to_string());
let nomos_log_level = tf_env::nomos_log_level().unwrap_or_else(|| "info".to_string());
let time_backend = tf_env::nomos_time_backend().unwrap_or_else(|| "monotonic".into());
let cfgsync_host =
env::var("LOGOS_BLOCKCHAIN_CFGSYNC_HOST").unwrap_or_else(|_| "cfgsync".to_string());
vec![
EnvEntry::new("POL_PROOF_DEV_MODE", pol_mode),
EnvEntry::new("RUST_LOG", rust_log),
@ -130,7 +132,7 @@ fn base_environment(cfgsync_port: u16) -> Vec<EnvEntry> {
EnvEntry::new("LOGOS_BLOCKCHAIN_TIME_BACKEND", time_backend),
EnvEntry::new(
"CFG_SERVER_ADDR",
format!("http://host.docker.internal:{cfgsync_port}"),
format!("http://{cfgsync_host}:{cfgsync_port}"),
),
EnvEntry::new("OTEL_METRIC_EXPORT_INTERVAL", "5000"),
]

View File

@ -47,15 +47,8 @@ pub async fn compose_up(
project_name: &str,
root: &Path,
) -> Result<(), ComposeCommandError> {
let mut cmd = Command::new("docker");
cmd.arg("compose")
.arg("-f")
.arg(compose_path)
.arg("-p")
.arg(project_name)
.arg("up")
.arg("-d")
.current_dir(root);
let mut cmd = compose_command(compose_path, project_name, root);
cmd.arg("up").arg("-d");
info!(
compose_file = %compose_path.display(),
@ -67,21 +60,64 @@ pub async fn compose_up(
run_compose_command(cmd, adjust_timeout(COMPOSE_UP_TIMEOUT), "docker compose up").await
}
/// Runs `docker compose up --no-start` for the generated stack.
pub async fn compose_create(
compose_path: &Path,
project_name: &str,
root: &Path,
) -> Result<(), ComposeCommandError> {
let mut cmd = compose_command(compose_path, project_name, root);
cmd.arg("up").arg("--no-start");
info!(
compose_file = %compose_path.display(),
project = project_name,
root = %root.display(),
"running docker compose create"
);
run_compose_command(
cmd,
adjust_timeout(COMPOSE_UP_TIMEOUT),
"docker compose create",
)
.await
}
/// Runs `docker compose up -d --no-deps <service>` for a single service.
pub async fn compose_up_service(
compose_path: &Path,
project_name: &str,
root: &Path,
service: &str,
) -> Result<(), ComposeCommandError> {
let mut cmd = compose_command(compose_path, project_name, root);
cmd.arg("up").arg("-d").arg("--no-deps").arg(service);
info!(
compose_file = %compose_path.display(),
project = project_name,
root = %root.display(),
service,
"running docker compose up for service"
);
run_compose_command(
cmd,
adjust_timeout(COMPOSE_UP_TIMEOUT),
"docker compose up service",
)
.await
}
/// Runs `docker compose down --volumes` for the generated stack.
pub async fn compose_down(
compose_path: &Path,
project_name: &str,
root: &Path,
) -> Result<(), ComposeCommandError> {
let mut cmd = Command::new("docker");
cmd.arg("compose")
.arg("-f")
.arg(compose_path)
.arg("-p")
.arg(project_name)
.arg("down")
.arg("--volumes")
.current_dir(root);
let mut cmd = compose_command(compose_path, project_name, root);
cmd.arg("down").arg("--volumes");
info!(
compose_file = %compose_path.display(),
@ -100,15 +136,8 @@ pub async fn compose_down(
/// Dump docker compose logs to stderr for debugging failures.
pub async fn dump_compose_logs(compose_file: &Path, project: &str, root: &Path) {
let mut cmd = Command::new("docker");
cmd.arg("compose")
.arg("-f")
.arg(compose_file)
.arg("-p")
.arg(project)
.arg("logs")
.arg("--no-color")
.current_dir(root);
let mut cmd = compose_command(compose_file, project, root);
cmd.arg("logs").arg("--no-color");
match cmd.output().await {
Ok(output) => print_logs(&output.stdout, &output.stderr),
@ -146,6 +175,17 @@ async fn run_compose_command(
}
}
fn compose_command(compose_path: &Path, project_name: &str, root: &Path) -> Command {
let mut cmd = Command::new("docker");
cmd.arg("compose")
.arg("-f")
.arg(compose_path)
.arg("-p")
.arg(project_name)
.current_dir(root);
cmd
}
fn handle_compose_status(
status: std::io::Result<std::process::ExitStatus>,
description: &str,
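
Note: with the shared compose_command factory each wrapper only appends its subcommand. A hypothetical `ps` helper under the same pattern (not part of this commit) would look like:

    /// Hypothetical: list stack services via the shared factory.
    pub async fn compose_ps(
        compose_path: &Path,
        project_name: &str,
        root: &Path,
    ) -> Result<(), ComposeCommandError> {
        let mut cmd = compose_command(compose_path, project_name, root);
        cmd.arg("ps");
        run_compose_command(cmd, adjust_timeout(COMPOSE_UP_TIMEOUT), "docker compose ps").await
    }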

View File

@ -27,6 +27,15 @@ impl CfgsyncServerHandle {
_ => {}
}
}
/// Prevent automatic shutdown for preserved runs.
pub fn mark_preserved(&mut self) {
match self {
Self::Container { stopped, .. } => {
*stopped = true;
}
}
}
}
fn remove_container(name: &str) {

View File

@ -9,14 +9,18 @@ use reqwest::Url;
use testing_framework_core::{
adjust_timeout, scenario::CleanupGuard, topology::generation::GeneratedTopology,
};
use tokio::process::Command;
use tracing::{debug, error, info};
use tokio::{
net::TcpStream,
process::Command,
time::{Instant, sleep},
};
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::{
descriptor::ComposeDescriptor,
docker::{
commands::{compose_up, dump_compose_logs, run_docker_command},
commands::{compose_create, compose_up, dump_compose_logs, run_docker_command},
ensure_compose_image,
platform::resolve_image,
workspace::ComposeWorkspace,
@ -30,6 +34,8 @@ use crate::{
};
const CFGSYNC_START_TIMEOUT: Duration = Duration::from_secs(180);
const CFGSYNC_READY_TIMEOUT: Duration = Duration::from_secs(60);
const CFGSYNC_READY_POLL: Duration = Duration::from_secs(2);
/// Paths and flags describing the prepared compose workspace.
pub struct WorkspaceState {
@ -136,9 +142,11 @@ pub fn ensure_supported_topology(
descriptors: &GeneratedTopology,
) -> Result<(), ComposeRunnerError> {
let nodes = descriptors.nodes().len();
if nodes == 0 {
return Err(ComposeRunnerError::MissingNode { nodes });
}
Ok(())
}
@ -192,10 +200,17 @@ pub fn update_cfgsync_logged(
pub async fn start_cfgsync_stage(
workspace: &WorkspaceState,
cfgsync_port: u16,
project_name: &str,
) -> Result<CfgsyncServerHandle, ComposeRunnerError> {
info!(cfgsync_port = cfgsync_port, "launching cfgsync server");
let handle = launch_cfgsync(&workspace.cfgsync_path, cfgsync_port).await?;
let network = compose_network_name(project_name);
let handle = launch_cfgsync(&workspace.cfgsync_path, cfgsync_port, &network).await?;
wait_for_cfgsync_ready(cfgsync_port, Some(&handle)).await?;
debug!(container = ?handle, "cfgsync server launched");
Ok(handle)
}
@ -231,7 +246,9 @@ pub fn allocate_cfgsync_port() -> Result<u16, ConfigError> {
source: source.into(),
})?
.port();
debug!(port, "allocated cfgsync port");
Ok(port)
}
@ -239,6 +256,7 @@ pub fn allocate_cfgsync_port() -> Result<u16, ConfigError> {
pub async fn launch_cfgsync(
cfgsync_path: &Path,
port: u16,
network: &str,
) -> Result<CfgsyncServerHandle, ConfigError> {
let testnet_dir = cfgsync_path
.parent()
@ -246,8 +264,10 @@ pub async fn launch_cfgsync(
port,
source: anyhow!("cfgsync path {cfgsync_path:?} has no parent directory"),
})?;
let (image, _) = resolve_image();
let container_name = format!("nomos-cfgsync-{}", Uuid::new_v4());
debug!(
container = %container_name,
image,
@ -262,6 +282,10 @@ pub async fn launch_cfgsync(
.arg("-d")
.arg("--name")
.arg(&container_name)
.arg("--network")
.arg(network)
.arg("--network-alias")
.arg("cfgsync")
.arg("--entrypoint")
.arg("cfgsync-server")
.arg("-p")
@ -273,9 +297,31 @@ pub async fn launch_cfgsync(
.canonicalize()
.unwrap_or_else(|_| testnet_dir.to_path_buf())
.display()
))
.arg(&image)
.arg("/etc/nomos/cfgsync.yaml");
));
let circuits_dir = std::env::var("LOGOS_BLOCKCHAIN_CIRCUITS_DOCKER")
.ok()
.or_else(|| std::env::var("LOGOS_BLOCKCHAIN_CIRCUITS").ok());
if let Some(circuits_dir) = circuits_dir {
let host_path = PathBuf::from(&circuits_dir);
if host_path.exists() {
command
.arg("-e")
.arg(format!("LOGOS_BLOCKCHAIN_CIRCUITS={circuits_dir}"))
.arg("-v")
.arg(format!(
"{}:{circuits_dir}:ro",
host_path.canonicalize().unwrap_or(host_path).display()
));
}
}
if let Ok(pol_mode) = std::env::var("POL_PROOF_DEV_MODE") {
command
.arg("-e")
.arg(format!("POL_PROOF_DEV_MODE={pol_mode}"));
}
command.arg(&image).arg("/etc/nomos/cfgsync.yaml");
run_docker_command(
command,
@ -372,7 +418,8 @@ pub async fn prepare_environment(
let compose_path = render_compose_logged(&workspace, descriptors, cfgsync_port)?;
let project_name = format!("nomos-compose-{}", Uuid::new_v4());
let mut cfgsync_handle = start_cfgsync_stage(&workspace, cfgsync_port).await?;
compose_create(&compose_path, &project_name, &workspace.root).await?;
let mut cfgsync_handle = start_cfgsync_stage(&workspace, cfgsync_port, &project_name).await?;
if let Err(err) = bring_up_stack_logged(
&compose_path,
@ -401,3 +448,106 @@ pub async fn prepare_environment(
Some(cfgsync_handle),
))
}
/// Prepare workspace, cfgsync, and compose artifacts without starting services.
pub async fn prepare_environment_manual(
descriptors: &GeneratedTopology,
metrics_otlp_ingest_url: Option<&Url>,
) -> Result<StackEnvironment, ComposeRunnerError> {
let workspace = prepare_workspace_logged()?;
let cfgsync_port = allocate_cfgsync_port()?;
update_cfgsync_logged(
&workspace,
descriptors,
cfgsync_port,
metrics_otlp_ingest_url,
)?;
ensure_compose_image().await?;
let compose_path = render_compose_logged(&workspace, descriptors, cfgsync_port)?;
let project_name = format!("nomos-compose-{}", Uuid::new_v4());
compose_create(&compose_path, &project_name, &workspace.root).await?;
let cfgsync_handle = start_cfgsync_stage(&workspace, cfgsync_port, &project_name).await?;
info!(
project = %project_name,
compose_file = %compose_path.display(),
cfgsync_port,
"compose manual environment prepared"
);
Ok(StackEnvironment::from_workspace(
workspace,
compose_path,
project_name,
Some(cfgsync_handle),
))
}
async fn wait_for_cfgsync_ready(
port: u16,
handle: Option<&CfgsyncServerHandle>,
) -> Result<(), ComposeRunnerError> {
let deadline = Instant::now() + CFGSYNC_READY_TIMEOUT;
let addr = format!("127.0.0.1:{port}");
loop {
match TcpStream::connect(&addr).await {
Ok(_) => {
info!(port, "cfgsync server is reachable");
return Ok(());
}
Err(err) => {
if Instant::now() >= deadline {
dump_cfgsync_logs(handle).await;
return Err(ComposeRunnerError::Config(ConfigError::CfgsyncStart {
port,
source: anyhow!("cfgsync not reachable: {err}"),
}));
}
}
}
sleep(CFGSYNC_READY_POLL).await;
}
}
async fn dump_cfgsync_logs(handle: Option<&CfgsyncServerHandle>) {
let Some(name) = handle.and_then(cfgsync_container_name) else {
return;
};
let mut cmd = Command::new("docker");
cmd.arg("logs").arg(name);
match cmd.output().await {
Ok(output) => {
if !output.stdout.is_empty() {
warn!(
logs = %String::from_utf8_lossy(&output.stdout),
container = name,
"cfgsync stdout"
);
}
if !output.stderr.is_empty() {
warn!(
logs = %String::from_utf8_lossy(&output.stderr),
container = name,
"cfgsync stderr"
);
}
}
Err(err) => warn!(error = ?err, container = name, "failed to collect cfgsync logs"),
}
}
fn cfgsync_container_name(handle: &CfgsyncServerHandle) -> Option<&str> {
match handle {
CfgsyncServerHandle::Container { name, .. } => Some(name.as_str()),
}
}
fn compose_network_name(project_name: &str) -> String {
format!("{project_name}_default")
}
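
Note: both prepare paths run compose_create before start_cfgsync_stage so the project's default network already exists when the cfgsync container joins it under the `cfgsync` alias, which is the hostname baked into CFG_SERVER_ADDR. The naming follows Docker Compose's convention (project id illustrative):

    // docker compose names a project's default network "<project>_default".
    assert_eq!(
        compose_network_name("nomos-compose-demo"),
        "nomos-compose-demo_default"
    );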

View File

@ -1,4 +1,4 @@
use std::time::Duration;
use std::{path::Path, time::Duration};
use anyhow::{Context as _, anyhow};
use reqwest::Url;
@ -69,17 +69,34 @@ async fn resolve_service_port(
environment: &StackEnvironment,
service: &str,
container_port: u16,
) -> Result<u16, ComposeRunnerError> {
resolve_service_port_with(
environment.compose_path(),
environment.project_name(),
environment.root(),
service,
container_port,
)
.await
}
pub(crate) async fn resolve_service_port_with(
compose_path: &Path,
project_name: &str,
root: &Path,
service: &str,
container_port: u16,
) -> Result<u16, ComposeRunnerError> {
let mut cmd = Command::new("docker");
cmd.arg("compose")
.arg("-f")
.arg(environment.compose_path())
.arg(compose_path)
.arg("-p")
.arg(environment.project_name())
.arg(project_name)
.arg("port")
.arg(service)
.arg(container_port.to_string())
.current_dir(environment.root());
.current_dir(root);
let output = timeout(adjust_timeout(COMPOSE_PORT_DISCOVERY_TIMEOUT), cmd.output())
.await
@ -148,7 +165,7 @@ fn localhost_url(port: u16) -> Result<Url, ParseError> {
Url::parse(&format!("http://{}:{port}/", compose_runner_host()))
}
fn node_identifier(index: usize) -> String {
pub(crate) fn node_identifier(index: usize) -> String {
format!("node-{index}")
}
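
Note: the `_with` variant threads explicit paths so the manual cluster can resolve host ports before a full StackEnvironment exists. A sketch (service name and container port are illustrative):

    // compose_path, project_name, and root come from the prepared workspace.
    let host_port = resolve_service_port_with(
        &compose_path,
        &project_name,
        &root,
        "node-0",
        18080, // container-side API port for this illustration
    )
    .await?;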

View File

@ -4,6 +4,7 @@ pub mod docker;
pub mod errors;
pub mod infrastructure;
pub mod lifecycle;
pub mod manual;
pub use deployer::ComposeDeployer;
pub use descriptor::{ComposeDescriptor, ComposeDescriptorBuilder, EnvEntry, NodeDescriptor};
@ -16,3 +17,4 @@ pub use infrastructure::{
ports::{HostPortMapping, NodeHostPorts},
template::{TemplateError, repository_root, write_compose_file},
};
pub use manual::{ComposeManualCluster, ManualClusterError};

View File

@ -110,6 +110,11 @@ impl RunnerCleanup {
info!(path = %keep.display(), "preserving docker state");
}
if let Some(mut cfgsync) = self.cfgsync.take() {
cfgsync.mark_preserved();
self.cfgsync = Some(cfgsync);
}
info!("compose preserve flag set; skipping docker compose down");
}

View File

@ -0,0 +1,299 @@
use std::{
    path::{Path, PathBuf},
    sync::Mutex,
};
use async_trait::async_trait;
use testing_framework_core::{
manual::ManualClusterHandle,
nodes::ApiClient,
scenario::{CleanupGuard, DynError, PeerSelection, StartNodeOptions, StartedNode},
topology::{
config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
generation::GeneratedTopology,
readiness::{ManualNetworkReadiness, ReadinessCheck, ReadinessError, ReadinessNode},
},
};
use thiserror::Error;
use crate::{
docker::{commands::compose_up_service, ensure_docker_available},
errors::ComposeRunnerError,
infrastructure::{
environment::{StackEnvironment, ensure_supported_topology, prepare_environment_manual},
ports::{NodeHostPorts, compose_runner_host, node_identifier, resolve_service_port_with},
},
};
mod network;
mod readiness;
mod state;
use network::api_client_from_host_ports;
use readiness::readiness_nodes;
use state::{ManualClusterState, next_available_index, normalize_label};
#[derive(Debug, Error)]
pub enum ManualClusterError {
#[error("failed to build topology: {source}")]
Build {
#[source]
source: TopologyBuildError,
},
#[error(transparent)]
Compose(#[from] ComposeRunnerError),
#[error("manual compose cluster only supports default peer selection")]
UnsupportedPeers,
#[error("manual compose cluster does not support config patches")]
UnsupportedConfigPatch,
#[error("cluster has already been stopped")]
Stopped,
#[error("node name '{name}' already exists")]
NameExists { name: String },
#[error("node index {index} is out of range (max {max})")]
IndexOutOfRange { index: usize, max: usize },
#[error("node index {index} already started")]
AlreadyStarted { index: usize },
#[error("no available nodes to start")]
NoAvailableNodes,
#[error("node name cannot be empty")]
EmptyName,
}
struct EnvironmentSnapshot {
compose_path: PathBuf,
project_name: String,
root: PathBuf,
}
/// Imperative, compose-backed cluster that can start nodes on demand.
pub struct ComposeManualCluster {
descriptors: GeneratedTopology,
environment: Mutex<Option<StackEnvironment>>,
state: Mutex<ManualClusterState>,
host: String,
}
impl ComposeManualCluster {
pub(crate) async fn from_config(config: TopologyConfig) -> Result<Self, ManualClusterError> {
let builder = TopologyBuilder::new(config);
let descriptors = builder
.build()
.map_err(|source| ManualClusterError::Build { source })?;
ensure_supported_topology(&descriptors)?;
ensure_docker_available().await?;
let environment = prepare_environment_manual(&descriptors, None).await?;
let total_nodes = descriptors.nodes().len();
Ok(Self {
descriptors,
environment: Mutex::new(Some(environment)),
state: Mutex::new(ManualClusterState::new(total_nodes)),
host: compose_runner_host(),
})
}
fn lock_state(&self) -> std::sync::MutexGuard<'_, ManualClusterState> {
self.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}
fn resolve_label_and_index(
&self,
name: &str,
total_nodes: usize,
) -> Result<(usize, String), ManualClusterError> {
let state = self.lock_state();
let label = normalize_label(name).ok_or(ManualClusterError::EmptyName)?;
let index = match state.name_to_index.get(&label).copied() {
Some(index) => index,
None => next_available_index(&state.started_indices, total_nodes)
.ok_or(ManualClusterError::NoAvailableNodes)?,
};
let node_label = label;
if state.clients_by_name.contains_key(&node_label) {
return Err(ManualClusterError::NameExists { name: node_label });
}
if state.started_indices.contains(&index) {
return Err(ManualClusterError::AlreadyStarted { index });
}
Ok((index, node_label))
}
#[must_use]
pub fn node_client(&self, name: &str) -> Option<ApiClient> {
if name.trim().is_empty() {
return None;
}
let state = self.lock_state();
normalize_label(name).and_then(|label| state.clients_by_name.get(&label).cloned())
}
pub async fn start_node(&self, name: &str) -> Result<StartedNode, ManualClusterError> {
self.start_node_with(name, StartNodeOptions::default())
.await
}
pub async fn start_node_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, ManualClusterError> {
if !matches!(options.peers, PeerSelection::DefaultLayout) {
return Err(ManualClusterError::UnsupportedPeers);
}
if options.config_patch.is_some() {
return Err(ManualClusterError::UnsupportedConfigPatch);
}
let total_nodes = self.descriptors.nodes().len();
let (index, node_label) = self.resolve_label_and_index(name, total_nodes)?;
let snapshot = self.environment_snapshot()?;
let service = node_identifier(index);
compose_up_service(
&snapshot.compose_path,
&snapshot.project_name,
&snapshot.root,
&service,
)
.await
.map_err(ComposeRunnerError::Compose)?;
let ports = discover_node_ports(
&snapshot.compose_path,
&snapshot.project_name,
&snapshot.root,
&service,
&self.descriptors,
index,
)
.await?;
let client = api_client_from_host_ports(&ports, &self.host)?;
let mut state = self.lock_state();
state.register_node(index, node_label.clone(), client.clone(), ports)?;
Ok(StartedNode {
name: node_label,
api: client,
})
}
pub fn stop_all(&self) {
let mut env = self
.environment
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let Some(environment) = env.take() else {
return;
};
if let Ok(cleanup) = environment.into_cleanup() {
Box::new(cleanup).cleanup();
}
let mut state = self.lock_state();
state.reset();
}
pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> {
let nodes = self.readiness_nodes();
if nodes.len() <= 1 {
return Ok(());
}
ManualNetworkReadiness::new(nodes).wait().await
}
fn readiness_nodes(&self) -> Vec<ReadinessNode> {
let state = self.lock_state();
readiness_nodes(&state, &self.descriptors)
}
fn environment_snapshot(&self) -> Result<EnvironmentSnapshot, ManualClusterError> {
let env = self
.environment
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let Some(environment) = env.as_ref() else {
return Err(ManualClusterError::Stopped);
};
Ok(EnvironmentSnapshot {
compose_path: environment.compose_path().to_path_buf(),
project_name: environment.project_name().to_owned(),
root: environment.root().to_path_buf(),
})
}
}
impl Drop for ComposeManualCluster {
fn drop(&mut self) {
self.stop_all();
}
}
#[async_trait]
impl ManualClusterHandle for ComposeManualCluster {
async fn start_node_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
self.start_node_with(name, options)
.await
.map_err(|err| err.into())
}
async fn wait_network_ready(&self) -> Result<(), DynError> {
self.wait_network_ready().await.map_err(|err| err.into())
}
}
async fn discover_node_ports(
compose_path: &Path,
project_name: &str,
root: &Path,
service: &str,
descriptors: &GeneratedTopology,
index: usize,
) -> Result<NodeHostPorts, ManualClusterError> {
let node =
descriptors
.nodes()
.get(index)
.ok_or_else(|| ManualClusterError::IndexOutOfRange {
index,
max: descriptors.nodes().len().saturating_sub(1),
})?;
let api = resolve_service_port_with(compose_path, project_name, root, service, node.api_port())
.await?;
let testing = resolve_service_port_with(
compose_path,
project_name,
root,
service,
node.testing_http_port(),
)
.await?;
Ok(NodeHostPorts { api, testing })
}
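
Note: the manual lifecycle is create-without-start, then per-service `up`, then teardown. A sketch against a prepared cluster (node name illustrative):

    cluster.start_node("a").await?;              // brings up compose service node-0
    let api = cluster.node_client("a");          // key is normalized to "node-a"
    assert!(api.is_some());
    cluster.stop_all();                          // compose down; Drop also calls this
    assert!(cluster.node_client("a").is_none()); // state has been reset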

View File

@ -0,0 +1,29 @@
use reqwest::Url;
use testing_framework_core::{nodes::ApiClient, scenario::http_probe};
use super::ManualClusterError;
use crate::{
errors::{ComposeRunnerError, NodeClientError},
infrastructure::ports::NodeHostPorts,
};
pub fn api_client_from_host_ports(
ports: &NodeHostPorts,
host: &str,
) -> Result<ApiClient, ManualClusterError> {
let api_url = endpoint_url(host, "api", ports.api)?;
let testing_url = endpoint_url(host, "testing", ports.testing)?;
Ok(ApiClient::from_urls(api_url, Some(testing_url)))
}
fn endpoint_url(host: &str, endpoint: &'static str, port: u16) -> Result<Url, ManualClusterError> {
Url::parse(&format!("http://{host}:{port}/")).map_err(|source| {
ManualClusterError::Compose(ComposeRunnerError::NodeClients(NodeClientError::Endpoint {
role: http_probe::NODE_ROLE,
endpoint,
port,
source,
}))
})
}
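
Note: endpoint_url mirrors the runner's localhost_url but takes the compose host explicitly, so remote Docker contexts resolve correctly. Illustrative output (port made up):

    let url = endpoint_url("127.0.0.1", "api", 18080)?;
    assert_eq!(url.as_str(), "http://127.0.0.1:18080/");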

View File

@ -0,0 +1,39 @@
use std::collections::HashSet;
use testing_framework_core::topology::{
generation::GeneratedTopology,
readiness::{ReadinessNode, build_readiness_nodes},
utils::multiaddr_port,
};
use super::state::ManualClusterState;
pub fn readiness_nodes(
state: &ManualClusterState,
descriptors: &GeneratedTopology,
) -> Vec<ReadinessNode> {
let mut indices = state.started_indices.iter().copied().collect::<Vec<_>>();
indices.sort_unstable();
let iter = indices.into_iter().filter_map(|index| {
let api = state.clients_by_index.get(index).and_then(|c| c.clone())?;
let node = descriptors.nodes().get(index)?;
let initial_peers = node
.general
.network_config
.backend
.initial_peers
.iter()
.filter_map(multiaddr_port)
.collect::<HashSet<_>>();
Some((
format!("node#{index}@{}", node.network_port()),
api,
node.network_port(),
initial_peers,
))
});
build_readiness_nodes(iter)
}

View File

@ -0,0 +1,83 @@
use std::collections::{HashMap, HashSet};
use testing_framework_core::nodes::ApiClient;
use super::ManualClusterError;
use crate::infrastructure::ports::NodeHostPorts;
pub struct ManualClusterState {
pub started_indices: HashSet<usize>,
pub clients_by_name: HashMap<String, ApiClient>,
pub name_to_index: HashMap<String, usize>,
pub clients_by_index: Vec<Option<ApiClient>>,
pub ports_by_index: Vec<Option<NodeHostPorts>>,
}
impl ManualClusterState {
pub fn new(total_nodes: usize) -> Self {
Self {
started_indices: HashSet::new(),
clients_by_name: HashMap::new(),
name_to_index: HashMap::new(),
clients_by_index: vec![None; total_nodes],
ports_by_index: vec![None; total_nodes],
}
}
pub fn reset(&mut self) {
self.started_indices.clear();
self.clients_by_name.clear();
self.name_to_index.clear();
for slot in &mut self.clients_by_index {
*slot = None;
}
for slot in &mut self.ports_by_index {
*slot = None;
}
}
pub fn register_node(
&mut self,
index: usize,
label: String,
client: ApiClient,
ports: NodeHostPorts,
) -> Result<(), ManualClusterError> {
if self.clients_by_name.contains_key(&label) {
return Err(ManualClusterError::NameExists { name: label });
}
if self.started_indices.contains(&index) {
return Err(ManualClusterError::AlreadyStarted { index });
}
self.started_indices.insert(index);
self.name_to_index.insert(label.clone(), index);
self.clients_by_name.insert(label, client.clone());
if let Some(slot) = self.clients_by_index.get_mut(index) {
*slot = Some(client);
}
if let Some(slot) = self.ports_by_index.get_mut(index) {
*slot = Some(ports);
}
Ok(())
}
}
pub fn normalize_label(name: &str) -> Option<String> {
let trimmed = name.trim();
if trimmed.is_empty() {
return None;
}
if trimmed.starts_with("node-") {
Some(trimmed.to_string())
} else {
Some(format!("node-{trimmed}"))
}
}
pub fn next_available_index(started: &HashSet<usize>, total_nodes: usize) -> Option<usize> {
(0..total_nodes).find(|index| !started.contains(index))
}
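
Note: normalization keeps start_node and node_client agreeing on the map key whether or not callers pass the `node-` prefix. Illustrative cases:

    assert_eq!(normalize_label("seed"), Some("node-seed".to_string()));
    assert_eq!(normalize_label("node-3"), Some("node-3".to_string()));
    assert_eq!(normalize_label("   "), None); // blank names are rejected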

View File

@ -0,0 +1,66 @@
use std::{path::Path, time::Duration};
use anyhow::Result;
use testing_framework_core::topology::config::TopologyConfig;
use testing_framework_runner_compose::ComposeDeployer;
use tokio::time::{sleep, timeout};
const STARTUP_TIMEOUT: Duration = Duration::from_secs(180);
const CONSENSUS_POLL_TIMEOUT: Duration = Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread")]
async fn manual_cluster_compose_single_node() -> Result<()> {
// Note: Prefer letting the image use its bundled /opt/circuits.
// If you need to override circuits, set:
// LOGOS_BLOCKCHAIN_CIRCUITS=/path/to/host/circuits
// LOGOS_BLOCKCHAIN_CIRCUITS_DOCKER=/path/to/linux/circuits
// and ensure they match the node/cfgsync versions.
unsafe {
std::env::set_var("POL_PROOF_DEV_MODE", "true");
}
unsafe {
std::env::set_var(
"LOGOS_BLOCKCHAIN_TESTNET_IMAGE",
"logos-blockchain-testing:local",
);
}
if let Ok(host_circuits) = std::env::var("LOGOS_BLOCKCHAIN_CIRCUITS") {
if !Path::new(&host_circuits).exists() {
return Err(anyhow::anyhow!(
"host circuits directory not found at {host_circuits}"
));
}
}
if let Ok(docker_circuits) = std::env::var("LOGOS_BLOCKCHAIN_CIRCUITS_DOCKER") {
if !Path::new(&docker_circuits).exists() {
return Err(anyhow::anyhow!(
"docker circuits directory not found at {docker_circuits}"
));
}
}
let config = TopologyConfig::with_node_numbers(1);
let deployer = ComposeDeployer::new();
let cluster = deployer.manual_cluster(config).await?;
let node = cluster.start_node("seed").await?.api;
let start = tokio::time::Instant::now();
loop {
match timeout(CONSENSUS_POLL_TIMEOUT, node.consensus_info()).await {
Ok(Ok(_)) => break,
Ok(Err(err)) => {
if start.elapsed() >= STARTUP_TIMEOUT {
return Err(err.into());
}
}
Err(_) => {
if start.elapsed() >= STARTUP_TIMEOUT {
return Err(anyhow::anyhow!("consensus_info timed out"));
}
}
}
sleep(Duration::from_secs(2)).await;
}
Ok(())
}

View File

@ -4,16 +4,12 @@ use testing_framework_core::{
scenario::{DynError, StartNodeOptions, StartedNode},
topology::{
config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
readiness::{ReadinessCheck, ReadinessError},
readiness::{ManualNetworkReadiness, ReadinessCheck, ReadinessError, ReadinessNode},
},
};
use thiserror::Error;
use crate::node_control::{LocalDynamicError, LocalDynamicNodes, ReadinessNode};
mod readiness;
use readiness::ManualNetworkReadiness;
use crate::node_control::{LocalDynamicError, LocalDynamicNodes};
#[derive(Debug, Error)]
pub enum ManualClusterError {

View File

@ -12,7 +12,8 @@ use testing_framework_core::{
},
scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode},
topology::{
generation::{GeneratedTopology, find_expected_peer_counts},
generation::GeneratedTopology,
readiness::{ReadinessNode, build_readiness_nodes},
utils::multiaddr_port,
},
};
@ -84,12 +85,6 @@ impl LocalDynamicSeed {
}
}
pub(crate) struct ReadinessNode {
pub(crate) label: String,
pub(crate) expected_peers: Option<usize>,
pub(crate) api: ApiClient,
}
impl LocalDynamicNodes {
pub fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self {
Self::new_with_seed(descriptors, node_clients, LocalDynamicSeed::default())
@ -166,38 +161,26 @@ impl LocalDynamicNodes {
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
let listen_ports = state
.nodes
.iter()
.map(|node| node.config().network.backend.swarm.port)
.collect::<Vec<_>>();
let iter = state.nodes.iter().enumerate().map(|(idx, node)| {
let config = node.config();
let port = config.network.backend.swarm.port;
let initial_peers = config
.network
.backend
.initial_peers
.iter()
.filter_map(multiaddr_port)
.collect::<HashSet<u16>>();
let initial_peer_ports = state
.nodes
.iter()
.map(|node| {
node.config()
.network
.backend
.initial_peers
.iter()
.filter_map(multiaddr_port)
.collect::<HashSet<u16>>()
})
.collect::<Vec<_>>();
(
format!("node#{idx}@{port}"),
node.api().clone(),
port,
initial_peers,
)
});
let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports);
state
.nodes
.iter()
.enumerate()
.map(|(idx, node)| ReadinessNode {
label: format!("node#{idx}@{}", node.config().network.backend.swarm.port),
expected_peers: expected_peer_counts.get(idx).copied(),
api: node.api().clone(),
})
.collect::<Vec<_>>()
build_readiness_nodes(iter)
}
async fn start_node(