Mirror of https://github.com/logos-blockchain/logos-blockchain-testing.git
Synced 2026-02-17 03:33:08 +00:00
Local deployer allows stopping and restarting nodes (#16)
* Unify local node control and restart support
* Add local stop-node support
* Use node names for restart/stop control
* merge

---------

Co-authored-by: hansieodendaal <hansie.odendaal@gmail.com>
This commit is contained in:
parent e2df69b0d5
commit 062be51a4f
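At a glance: workloads and tests now address nodes by name ("node-0") instead of index, and can stop as well as restart them. A minimal sketch of driving the new control surface from a workload, mirroring the RestartWorkload and restart.rs diffs below (the surrounding RunContext wiring is assumed, not shown):

    use testing_framework_core::scenario::{DynError, RunContext};

    // Sketch only: restart then stop "node-0" via the name-based NodeControlHandle.
    async fn exercise_control(ctx: &RunContext) -> Result<(), DynError> {
        if let Some(control) = ctx.node_control() {
            let old_pid = control.node_pid("node-0"); // None if unsupported or stopped
            control.restart_node("node-0").await?;    // names replace indices in this commit
            assert_ne!(old_pid, control.node_pid("node-0"), "restart yields a new pid");
            control.stop_node("node-0").await?;       // stop support is new here
        }
        Ok(())
    }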
Cargo.lock (generated): 1 line changed
@@ -6569,6 +6569,7 @@ dependencies = [
  "thiserror 2.0.18",
  "tokio",
  "tracing",
+ "tracing-subscriber 0.3.22",
 ]

 [[package]]
@@ -11,8 +11,8 @@ impl Workload for RestartWorkload {

     async fn start(&self, ctx: &RunContext) -> Result<(), DynError> {
         if let Some(control) = ctx.node_control() {
-            // Restart the first node (index 0) if supported.
-            control.restart_node(0).await?;
+            // Restart the first node by name if supported.
+            control.restart_node("node-0").await?;
         }
         Ok(())
     }
@@ -3,5 +3,5 @@ use testing_framework_core::scenario::DynError;

 #[async_trait]
 pub trait NodeControlHandle: Send + Sync {
-    async fn restart_node(&self, index: usize) -> Result<(), DynError>;
+    async fn restart_node(&self, name: &str) -> Result<(), DynError>;
 }
@@ -309,11 +309,14 @@ build_bundle::prepare_circuits() {
 }

 build_bundle::build_binaries() {
-  BUILD_FEATURES_LABEL="all"
+  BUILD_FEATURES_LABEL="all,pol-dev-mode,verification-keys"
   echo "==> Building binaries (platform=${PLATFORM})"
   mkdir -p "${NODE_SRC}"
   (
     cd "${NODE_SRC}"
+    if [ -d "${NODE_TARGET}" ]; then
+      rm -rf "${NODE_TARGET}"
+    fi
     if [ -n "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then
       echo "Using local logos-blockchain-node checkout at ${NODE_SRC} (no fetch/checkout)"
     else
@@ -326,18 +329,16 @@ build_bundle::build_binaries() {
       git clean -fdx
     fi

-    if [ -z "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then
-      build_bundle::apply_nomos_node_patches "${NODE_SRC}"
-    fi
-    unset CARGO_FEATURE_BUILD_VERIFICATION_KEY
     if [ -n "${BUNDLE_RUSTUP_TOOLCHAIN}" ]; then
-      RUSTFLAGS='--cfg feature="pol-dev-mode"' \
+      RUSTFLAGS='--cfg feature="pol-dev-mode" --cfg feature="build-verification-key"' \
+      CARGO_FEATURE_BUILD_VERIFICATION_KEY=1 \
       RUSTUP_TOOLCHAIN="${BUNDLE_RUSTUP_TOOLCHAIN}" \
       cargo build --all-features \
         -p logos-blockchain-node \
         --target-dir "${NODE_TARGET}"
     else
-      RUSTFLAGS='--cfg feature="pol-dev-mode"' \
+      RUSTFLAGS='--cfg feature="pol-dev-mode" --cfg feature="build-verification-key"' \
+      CARGO_FEATURE_BUILD_VERIFICATION_KEY=1 \
       cargo build --all-features \
         -p logos-blockchain-node \
         --target-dir "${NODE_TARGET}"
@@ -60,6 +60,7 @@ Environment:
   LOGOS_BLOCKCHAIN_TESTNET_IMAGE_PULL_POLICY  K8s imagePullPolicy (default ${DEFAULT_PULL_POLICY_LOCAL}; set to ${DEFAULT_PULL_POLICY_ECR} for --ecr)
   LOGOS_BLOCKCHAIN_BINARIES_TAR               Path to prebuilt binaries tarball (default .tmp/nomos-binaries-<platform>-<version>.tar.gz)
   LOGOS_BLOCKCHAIN_CIRCUITS                   Directory containing circuits assets (defaults to ~/.logos-blockchain-circuits)
+  CARGO_FEATURE_BUILD_VERIFICATION_KEY        Build flag to embed Groth16 verification keys in node binaries (recommended for host)
   LOGOS_BLOCKCHAIN_SKIP_IMAGE_BUILD           Set to 1 to skip rebuilding the compose/k8s image
   LOGOS_BLOCKCHAIN_FORCE_IMAGE_BUILD          Set to 1 to force image rebuild even for k8s ECR mode
   LOGOS_BLOCKCHAIN_METRICS_QUERY_URL          PromQL base URL for the runner process (optional)
@@ -301,8 +302,9 @@ run_examples::bundle_matches_expected() {
   local tar_path="$1"
   [ -f "${tar_path}" ] || return 1
   [ -z "${LOGOS_BLOCKCHAIN_NODE_REV:-}" ] && return 0
+  local expected_features="${RUN_EXAMPLES_EXPECTED_BUNDLE_FEATURES:-all,pol-dev-mode,verification-keys}"

-  local meta tar_rev tar_head
+  local meta tar_rev tar_head tar_features
   meta="$(tar -xOzf "${tar_path}" artifacts/nomos-bundle-meta.env 2>/dev/null || true)"
   if [ -z "${meta}" ]; then
     echo "Bundle meta missing in ${tar_path}; treating as stale and rebuilding." >&2
@@ -310,6 +312,11 @@ run_examples::bundle_matches_expected() {
   fi
   tar_rev="$(echo "${meta}" | sed -n 's/^nomos_node_rev=//p' | head -n 1)"
   tar_head="$(echo "${meta}" | sed -n 's/^nomos_node_git_head=//p' | head -n 1)"
+  tar_features="$(echo "${meta}" | sed -n 's/^features=//p' | head -n 1)"
+  if [ -n "${expected_features}" ] && [ "${tar_features}" != "${expected_features}" ]; then
+    echo "Bundle ${tar_path} features '${tar_features}' do not match expected '${expected_features}'; rebuilding." >&2
+    return 1
+  fi
   if [ -n "${tar_rev}" ] && [ "${tar_rev}" != "${LOGOS_BLOCKCHAIN_NODE_REV}" ]; then
     echo "Bundle ${tar_path} is for logos-blockchain-node rev ${tar_rev}, expected ${LOGOS_BLOCKCHAIN_NODE_REV}; rebuilding." >&2
     return 1
@@ -501,6 +508,8 @@ run_examples::run() {

   if [ "${MODE}" = "host" ]; then
     run_examples::ensure_circuits
+    # Ensure Groth16 verification keys are embedded when building local node binaries.
+    export CARGO_FEATURE_BUILD_VERIFICATION_KEY=1
   fi

   echo "==> Running ${BIN} for ${RUN_SECS}s (mode=${MODE}, image=${IMAGE})"
@@ -1,10 +1,10 @@
 use async_trait::async_trait;

-use crate::scenario::{DynError, StartNodeOptions, StartedNode};
+use crate::scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode};

 /// Interface for imperative, deployer-backed manual clusters.
 #[async_trait]
-pub trait ManualClusterHandle: Send + Sync {
+pub trait ManualClusterHandle: NodeControlHandle {
     async fn start_node_with(
         &self,
         name: &str,
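Because ManualClusterHandle now extends NodeControlHandle, a manual cluster can be handed to any code written against the plain control trait. A hedged sketch (the helper below is illustrative, not part of the commit; DynError and NodeControlHandle come from testing_framework_core::scenario):

    // Restart several nodes through any control handle, e.g. a LocalManualCluster.
    async fn bounce_all<H: NodeControlHandle + ?Sized>(
        control: &H,
        names: &[&str],
    ) -> Result<(), DynError> {
        for name in names {
            control.restart_node(name).await?;
        }
        Ok(())
    }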
@@ -195,7 +195,7 @@ fn write_node_config<C: Serialize>(config: &C, config_path: &Path) -> Result<(),
     })
 }

-fn spawn_node_process(
+pub(crate) fn spawn_node_process(
     binary_path: &Path,
     config_path: &Path,
     workdir: &Path,
@@ -213,7 +213,9 @@ fn spawn_node_process(
     })
 }

-async fn wait_for_consensus_readiness(api: &ApiClient) -> Result<(), time::error::Elapsed> {
+pub(crate) async fn wait_for_consensus_readiness(
+    api: &ApiClient,
+) -> Result<(), time::error::Elapsed> {
     time::timeout(STARTUP_TIMEOUT, async {
         loop {
             if api.consensus_info().await.is_ok() {
@@ -13,7 +13,10 @@ use crate::{
     common::{
         binary::{BinaryConfig, BinaryResolver},
         lifecycle::{kill::kill_child, monitor::is_running},
-        node::{NodeAddresses, NodeConfigCommon, NodeHandle, SpawnNodeError, spawn_node},
+        node::{
+            NodeAddresses, NodeConfigCommon, NodeHandle, SpawnNodeError, spawn_node,
+            spawn_node_process, wait_for_consensus_readiness,
+        },
     },
     scenario::DynError,
@@ -21,6 +24,7 @@ use crate::{
 };

 const BIN_PATH: &str = "target/debug/logos-blockchain-node";
+const RESTART_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);

 fn binary_path() -> PathBuf {
     let cfg = BinaryConfig {
@@ -75,6 +79,12 @@ impl Drop for Node {
 }

 impl Node {
+    /// Return the current process id for the running node.
+    #[must_use]
+    pub fn pid(&self) -> u32 {
+        self.handle.child.id()
+    }
+
     /// Check if the node process is still running
     pub fn is_running(&mut self) -> bool {
         is_running(&mut self.handle.child)
@@ -101,6 +111,40 @@ impl Node {

         Ok(Self { handle })
     }

+    /// Restart the node process using the existing config and data directory.
+    pub async fn restart(&mut self) -> Result<(), SpawnNodeError> {
+        let old_pid = self.pid();
+        debug!(old_pid, "restarting node process");
+
+        kill_child(&mut self.handle.child);
+        let _ = self.wait_for_exit(RESTART_SHUTDOWN_TIMEOUT).await;
+
+        let config_path = self.handle.tempdir.path().join("node.yaml");
+        let child = spawn_node_process(&binary_path(), &config_path, self.handle.tempdir.path())?;
+        self.handle.child = child;
+
+        let new_pid = self.pid();
+        wait_for_consensus_readiness(&self.handle.api)
+            .await
+            .map_err(|source| SpawnNodeError::Readiness { source })?;
+
+        info!(
+            old_pid,
+            new_pid, "node restart readiness confirmed via consensus_info"
+        );
+
+        Ok(())
+    }
+
+    /// Stop the node process without restarting it.
+    pub async fn stop(&mut self) {
+        let pid = self.pid();
+        debug!(pid, "stopping node process");
+
+        kill_child(&mut self.handle.child);
+        let _ = self.wait_for_exit(RESTART_SHUTDOWN_TIMEOUT).await;
+    }
 }

 impl NodeConfigCommon for RunConfig {
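A short usage sketch for the new lifecycle methods above (assumed context: a Node already spawned by this harness):

    // Restart reuses the node.yaml and data directory in the node's tempdir,
    // then blocks until consensus_info responds; stop kills without respawning.
    async fn bounce(node: &mut Node) -> Result<(), SpawnNodeError> {
        let before = node.pid();
        node.restart().await?;          // kill, respawn, wait for readiness
        assert_ne!(before, node.pid()); // a fresh child process is running
        node.stop().await;              // terminate without replacement
        Ok(())
    }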
@@ -1,6 +1,5 @@
-use std::sync::Arc;

 use async_trait::async_trait;
 use reqwest::Url;

 use super::DynError;
@@ -84,28 +83,6 @@ impl RequiresNodeControl for ObservabilityCapability {
     const REQUIRED: bool = false;
 }

-/// Interface exposed by runners that can restart nodes at runtime.
-#[async_trait]
-pub trait NodeControlHandle: Send + Sync {
-    async fn restart_node(&self, index: usize) -> Result<(), DynError>;
-
-    async fn start_node(&self, _name: &str) -> Result<StartedNode, DynError> {
-        Err("start_node not supported by this deployer".into())
-    }
-
-    async fn start_node_with(
-        &self,
-        _name: &str,
-        _options: StartNodeOptions,
-    ) -> Result<StartedNode, DynError> {
-        Err("start_node_with not supported by this deployer".into())
-    }
-
-    fn node_client(&self, _name: &str) -> Option<ApiClient> {
-        None
-    }
-}
-
 #[derive(Clone)]
 pub struct StartedNode {
     pub name: String,
testing-framework/core/src/scenario/control.rs (new file): 38 lines
@@ -0,0 +1,38 @@
+use async_trait::async_trait;
+
+use crate::{
+    nodes::ApiClient,
+    scenario::{DynError, StartNodeOptions, StartedNode},
+};
+
+/// Deployer-agnostic control surface for runtime node operations.
+#[async_trait]
+pub trait NodeControlHandle: Send + Sync {
+    async fn restart_node(&self, _name: &str) -> Result<(), DynError> {
+        Err("restart_node not supported by this deployer".into())
+    }
+
+    async fn start_node(&self, _name: &str) -> Result<StartedNode, DynError> {
+        Err("start_node not supported by this deployer".into())
+    }
+
+    async fn start_node_with(
+        &self,
+        _name: &str,
+        _options: StartNodeOptions,
+    ) -> Result<StartedNode, DynError> {
+        Err("start_node_with not supported by this deployer".into())
+    }
+
+    async fn stop_node(&self, _name: &str) -> Result<(), DynError> {
+        Err("stop_node not supported by this deployer".into())
+    }
+
+    fn node_client(&self, _name: &str) -> Option<ApiClient> {
+        None
+    }
+
+    fn node_pid(&self, _name: &str) -> Option<u32> {
+        None
+    }
+}
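Every method on the new trait has a "not supported" default, so a deployer only overrides what it can actually do. A hedged sketch of a minimal third-party implementation (SshNodeControl and ssh_restart are hypothetical; DynError and NodeControlHandle as above):

    struct SshNodeControl;

    #[async_trait::async_trait]
    impl NodeControlHandle for SshNodeControl {
        // Only restart is supported; stop_node, start_node, node_pid, etc.
        // fall back to the default "not supported by this deployer" errors.
        async fn restart_node(&self, name: &str) -> Result<(), DynError> {
            ssh_restart(name)
                .await
                .map_err(|err| format!("node restart failed: {err}").into())
        }
    }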
@@ -2,6 +2,7 @@

 mod capabilities;
 pub mod cfgsync;
+mod control;
 mod definition;
 mod expectation;
 pub mod http_probe;
@@ -12,9 +13,10 @@ mod workload;
 pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;

 pub use capabilities::{
-    NodeControlCapability, NodeControlHandle, ObservabilityCapability, PeerSelection,
-    RequiresNodeControl, StartNodeOptions, StartedNode,
+    NodeControlCapability, ObservabilityCapability, PeerSelection, RequiresNodeControl,
+    StartNodeOptions, StartedNode,
 };
+pub use control::NodeControlHandle;
 pub use definition::{
     Builder, Scenario, ScenarioBuildError, ScenarioBuilder, TopologyConfigurator,
 };
@@ -1,16 +1,9 @@
 use std::collections::HashSet;

-use thiserror::Error;
-
 use crate::{
-    nodes::{
-        common::node::SpawnNodeError,
-        node::{Node, apply_node_config_patch, create_node_config},
-    },
-    scenario,
+    nodes::node::Node,
     topology::{
         config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
-        generation::{GeneratedNodeConfig, find_expected_peer_counts},
+        generation::find_expected_peer_counts,
         readiness::{NetworkReadiness, ReadinessCheck, ReadinessError},
         utils::multiaddr_port,
     },
@@ -21,65 +14,11 @@ pub struct Topology {
     pub(crate) nodes: Vec<Node>,
 }

-pub type DeployedNodes = Vec<Node>;
-
-#[derive(Debug, Error)]
-pub enum SpawnTopologyError {
-    #[error(transparent)]
-    Build(#[from] TopologyBuildError),
-    #[error(transparent)]
-    Node(#[from] SpawnNodeError),
-    #[error("node config patch failed for node-{index}: {source}")]
-    ConfigPatch {
-        index: usize,
-        source: scenario::DynError,
-    },
-}
-
 impl Topology {
-    pub async fn spawn(config: TopologyConfig) -> Result<Self, SpawnTopologyError> {
-        let generated = TopologyBuilder::new(config.clone()).build()?;
-        let nodes = Self::spawn_nodes(generated.nodes()).await?;
-
-        Ok(Self { nodes })
-    }
-
-    pub async fn spawn_with_empty_membership(
-        config: TopologyConfig,
-        ids: &[[u8; 32]],
-        blend_ports: &[u16],
-    ) -> Result<Self, SpawnTopologyError> {
-        let generated = TopologyBuilder::new(config.clone())
-            .with_ids(ids.to_vec())
-            .with_blend_ports(blend_ports.to_vec())
-            .build()?;
-
-        let nodes = Self::spawn_nodes(generated.nodes()).await?;
-
-        Ok(Self { nodes })
-    }
-
-    pub(crate) async fn spawn_nodes(
-        nodes: &[GeneratedNodeConfig],
-    ) -> Result<DeployedNodes, SpawnTopologyError> {
-        let mut spawned = Vec::new();
-        for node in nodes {
-            let mut config = create_node_config(node.general.clone());
-
-            if let Some(patch) = node.config_patch.as_ref() {
-                config = apply_node_config_patch(config, patch).map_err(|source| {
-                    SpawnTopologyError::ConfigPatch {
-                        index: node.index,
-                        source,
-                    }
-                })?;
-            }
-
-            let label = format!("node-{}", node.index);
-            spawned.push(Node::spawn(config, &label).await?);
-        }
-
-        Ok(spawned)
+    /// Construct a topology from already-spawned nodes.
+    #[must_use]
+    pub fn from_nodes(nodes: Vec<Node>) -> Self {
+        Self { nodes }
     }

     #[must_use]
@@ -87,6 +26,11 @@ impl Topology {
         &self.nodes
     }

+    #[must_use]
+    pub fn into_nodes(self) -> Vec<Node> {
+        self.nodes
+    }
+
     pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> {
         let listen_ports = self.node_listen_ports();
         if listen_ports.len() <= 1 {
@@ -5,7 +5,6 @@ use reqwest::{Client, Url};
 use crate::topology::{
     config::{NodeConfigPatch, TopologyConfig},
     configs::{GeneralConfig, wallet::WalletAccount},
-    deployment::{SpawnTopologyError, Topology},
     readiness::{HttpNetworkReadiness, ReadinessCheck, ReadinessError},
 };
@@ -82,12 +81,6 @@ impl GeneratedTopology {
         &self.config.wallet_config.accounts
     }

-    pub async fn spawn_local(&self) -> Result<Topology, SpawnTopologyError> {
-        let nodes = Topology::spawn_nodes(self.nodes()).await?;
-
-        Ok(Topology { nodes })
-    }
-
     pub async fn wait_remote_readiness(
         &self,
         // Node endpoints
@@ -45,13 +45,9 @@ pub struct ComposeNodeControl {

 #[async_trait::async_trait]
 impl NodeControlHandle for ComposeNodeControl {
-    async fn restart_node(&self, index: usize) -> Result<(), DynError> {
-        restart_compose_service(
-            &self.compose_file,
-            &self.project_name,
-            &format!("node-{index}"),
-        )
-        .await
-        .map_err(|err| format!("node restart failed: {err}").into())
+    async fn restart_node(&self, name: &str) -> Result<(), DynError> {
+        restart_compose_service(&self.compose_file, &self.project_name, name)
+            .await
+            .map_err(|err| format!("node restart failed: {err}").into())
     }
 }
@@ -24,3 +24,6 @@ testing-framework-core = { path = "../../core" }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
+
+[dev-dependencies]
+tracing-subscriber = "0.3"
@@ -3,5 +3,5 @@ mod node_control;
 mod runner;

 pub use manual::{LocalManualCluster, ManualClusterError};
-pub use node_control::{LocalDynamicError, LocalDynamicNodes, LocalDynamicSeed};
+pub use node_control::{LocalNodeManager, LocalNodeManagerError, LocalNodeManagerSeed};
 pub use runner::{LocalDeployer, LocalDeployerError};
@@ -1,7 +1,7 @@
 use testing_framework_core::{
     manual::ManualClusterHandle,
     nodes::ApiClient,
-    scenario::{DynError, StartNodeOptions, StartedNode},
+    scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode},
     topology::{
         config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
         readiness::{ReadinessCheck, ReadinessError},
@@ -9,7 +9,7 @@ use testing_framework_core::{
 };
 use thiserror::Error;

-use crate::node_control::{LocalDynamicError, LocalDynamicNodes, ReadinessNode};
+use crate::node_control::{LocalNodeManager, LocalNodeManagerError, ReadinessNode};

 mod readiness;
@@ -23,12 +23,12 @@ pub enum ManualClusterError {
         source: TopologyBuildError,
     },
     #[error(transparent)]
-    Dynamic(#[from] LocalDynamicError),
+    Dynamic(#[from] LocalNodeManagerError),
 }

 /// Imperative, in-process cluster that can start nodes on demand.
 pub struct LocalManualCluster {
-    nodes: LocalDynamicNodes,
+    nodes: LocalNodeManager,
 }

 impl LocalManualCluster {
@@ -37,7 +37,7 @@ impl LocalManualCluster {
         let descriptors = builder
             .build()
             .map_err(|source| ManualClusterError::Build { source })?;
-        let nodes = LocalDynamicNodes::new(
+        let nodes = LocalNodeManager::new(
            descriptors,
            testing_framework_core::scenario::NodeClients::default(),
        );
@@ -49,6 +49,11 @@ impl LocalManualCluster {
         self.nodes.node_client(name)
     }

+    #[must_use]
+    pub fn node_pid(&self, name: &str) -> Option<u32> {
+        self.nodes.node_pid(name)
+    }
+
     pub async fn start_node(&self, name: &str) -> Result<StartedNode, ManualClusterError> {
         Ok(self
             .nodes
@@ -68,6 +73,14 @@ impl LocalManualCluster {
         self.nodes.stop_all();
     }

+    pub async fn restart_node(&self, name: &str) -> Result<(), ManualClusterError> {
+        Ok(self.nodes.restart_node(name).await?)
+    }
+
+    pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> {
+        Ok(self.nodes.stop_node(name).await?)
+    }
+
     pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> {
         let nodes = self.nodes.readiness_nodes();
         if self.is_singleton(&nodes) {
@@ -92,6 +105,44 @@ impl Drop for LocalManualCluster {
     }
 }

+#[async_trait::async_trait]
+impl NodeControlHandle for LocalManualCluster {
+    async fn restart_node(&self, name: &str) -> Result<(), DynError> {
+        self.nodes
+            .restart_node(name)
+            .await
+            .map_err(|err| err.into())
+    }
+
+    async fn stop_node(&self, name: &str) -> Result<(), DynError> {
+        self.nodes.stop_node(name).await.map_err(|err| err.into())
+    }
+
+    async fn start_node(&self, name: &str) -> Result<StartedNode, DynError> {
+        self.start_node_with(name, StartNodeOptions::default())
+            .await
+            .map_err(|err| err.into())
+    }
+
+    async fn start_node_with(
+        &self,
+        name: &str,
+        options: StartNodeOptions,
+    ) -> Result<StartedNode, DynError> {
+        self.start_node_with(name, options)
+            .await
+            .map_err(|err| err.into())
+    }
+
+    fn node_client(&self, name: &str) -> Option<ApiClient> {
+        self.node_client(name)
+    }
+
+    fn node_pid(&self, name: &str) -> Option<u32> {
+        self.node_pid(name)
+    }
+}
+
 #[async_trait::async_trait]
 impl ManualClusterHandle for LocalManualCluster {
     async fn start_node_with(
@@ -8,7 +8,7 @@ use testing_framework_config::topology::configs::{
     runtime::{build_general_config_for_node, build_initial_peers},
     time::GeneralTimeConfig,
 };
-use testing_framework_core::{
+pub(crate) use testing_framework_core::{
     scenario::{PeerSelection, StartNodeOptions},
     topology::{
         config::{NodeConfigPatch, TopologyConfig},
@@ -17,7 +17,7 @@ use testing_framework_core::{
     },
 };

-use super::LocalDynamicError;
+use super::LocalNodeManagerError;

 pub(super) fn build_general_config_for(
     descriptors: &GeneratedTopology,
@@ -27,7 +27,7 @@ pub(super) fn build_general_config_for(
     peer_ports_by_name: &HashMap<String, u16>,
     options: &StartNodeOptions,
     peer_ports: &[u16],
-) -> Result<(GeneralConfig, u16, Option<NodeConfigPatch>), LocalDynamicError> {
+) -> Result<(GeneralConfig, u16, Option<NodeConfigPatch>), LocalNodeManagerError> {
     if let Some(node) = descriptor_for(descriptors, index) {
         let mut config = node.general.clone();
         let initial_peers = resolve_initial_peers(
|
||||
base_consensus,
|
||||
base_time,
|
||||
)
|
||||
.map_err(|source| LocalDynamicError::Config { source })?;
|
||||
.map_err(|source| LocalNodeManagerError::Config { source })?;
|
||||
|
||||
Ok((general_config, network_port, None))
|
||||
}
|
||||
@@ -71,13 +71,13 @@ fn descriptor_for(descriptors: &GeneratedTopology, index: usize) -> Option<&Gene
 fn resolve_peer_names(
     peer_ports_by_name: &HashMap<String, u16>,
     peer_names: &[String],
-) -> Result<Vec<Multiaddr>, LocalDynamicError> {
+) -> Result<Vec<Multiaddr>, LocalNodeManagerError> {
     let mut peers = Vec::with_capacity(peer_names.len());
     for name in peer_names {
         let port =
             peer_ports_by_name
                 .get(name)
-                .ok_or_else(|| LocalDynamicError::InvalidArgument {
+                .ok_or_else(|| LocalNodeManagerError::InvalidArgument {
                     message: format!("unknown peer name '{name}'"),
                 })?;
         peers.push(testing_framework_config::node_address_from_port(*port));
@@ -91,7 +91,7 @@ fn resolve_initial_peers(
     default_peers: &[Multiaddr],
     descriptors: &GeneratedTopology,
     peer_ports: &[u16],
-) -> Result<Vec<Multiaddr>, LocalDynamicError> {
+) -> Result<Vec<Multiaddr>, LocalNodeManagerError> {
     match &options.peers {
         PeerSelection::Named(names) => resolve_peer_names(peer_ports_by_name, names),
         PeerSelection::DefaultLayout => {
@@ -112,8 +112,8 @@ fn random_node_id() -> [u8; 32] {
     id
 }

-fn allocate_udp_port(label: &'static str) -> Result<u16, LocalDynamicError> {
-    get_available_udp_port().ok_or_else(|| LocalDynamicError::PortAllocation {
+fn allocate_udp_port(label: &'static str) -> Result<u16, LocalNodeManagerError> {
+    get_available_udp_port().ok_or_else(|| LocalNodeManagerError::PortAllocation {
         message: format!("failed to allocate free UDP port for {label}"),
     })
 }
@@ -12,6 +12,7 @@ use testing_framework_core::{
     },
     scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode},
     topology::{
+        deployment::Topology,
         generation::{GeneratedTopology, find_expected_peer_counts},
         utils::multiaddr_port,
     },
@@ -22,11 +23,11 @@ mod config;
 mod state;

 use config::build_general_config_for;
-use state::LocalDynamicState;
+use state::LocalNodeManagerState;
 use testing_framework_core::scenario::NodeClients;

 #[derive(Debug, Error)]
-pub enum LocalDynamicError {
+pub enum LocalNodeManagerError {
     #[error("failed to generate node config: {source}")]
     Config {
         #[source]
@@ -43,25 +44,32 @@ pub enum LocalDynamicError {
     PortAllocation { message: String },
     #[error("node config patch failed: {message}")]
     ConfigPatch { message: String },
+    #[error("node name '{name}' is unknown")]
+    NodeName { name: String },
+    #[error("failed to restart node: {source}")]
+    Restart {
+        #[source]
+        source: testing_framework_core::nodes::common::node::SpawnNodeError,
+    },
 }

-pub struct LocalDynamicNodes {
+pub struct LocalNodeManager {
     descriptors: GeneratedTopology,
     base_consensus: consensus::GeneralConsensusConfig,
     base_time: time::GeneralTimeConfig,
     node_clients: NodeClients,
-    seed: LocalDynamicSeed,
-    state: Mutex<LocalDynamicState>,
+    seed: LocalNodeManagerSeed,
+    state: Mutex<LocalNodeManagerState>,
 }

 #[derive(Clone, Default)]
-pub struct LocalDynamicSeed {
+pub struct LocalNodeManagerSeed {
     pub node_count: usize,
     pub peer_ports: Vec<u16>,
     pub peer_ports_by_name: HashMap<String, u16>,
 }

-impl LocalDynamicSeed {
+impl LocalNodeManagerSeed {
     #[must_use]
     pub fn from_topology(descriptors: &GeneratedTopology) -> Self {
         let peer_ports = descriptors
@@ -90,15 +98,39 @@ pub(crate) struct ReadinessNode {
     pub(crate) api: ApiClient,
 }

-impl LocalDynamicNodes {
+impl LocalNodeManager {
+    fn default_label(index: usize) -> String {
+        format!("node-{index}")
+    }
+
+    pub async fn spawn_initial_nodes(
+        descriptors: &GeneratedTopology,
+    ) -> Result<Vec<Node>, testing_framework_core::nodes::common::node::SpawnNodeError> {
+        let mut nodes = Vec::with_capacity(descriptors.nodes().len());
+        for node in descriptors.nodes() {
+            let label = Self::default_label(node.index());
+            let config = create_node_config(node.general.clone());
+            let spawned = Node::spawn(config, &label).await?;
+            nodes.push(spawned);
+        }
+
+        Ok(nodes)
+    }
+
+    pub async fn spawn_initial_topology(
+        descriptors: &GeneratedTopology,
+    ) -> Result<Topology, testing_framework_core::nodes::common::node::SpawnNodeError> {
+        let nodes = Self::spawn_initial_nodes(descriptors).await?;
+        Ok(Topology::from_nodes(nodes))
+    }
+
     pub fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self {
-        Self::new_with_seed(descriptors, node_clients, LocalDynamicSeed::default())
+        Self::new_with_seed(descriptors, node_clients, LocalNodeManagerSeed::default())
     }

     pub fn new_with_seed(
         descriptors: GeneratedTopology,
         node_clients: NodeClients,
-        seed: LocalDynamicSeed,
+        seed: LocalNodeManagerSeed,
     ) -> Self {
         let base_node = descriptors
             .nodes()
@@ -108,11 +140,12 @@ impl LocalDynamicNodes {
         let base_consensus = base_node.general.consensus_config.clone();
         let base_time = base_node.general.time_config.clone();

-        let state = LocalDynamicState {
+        let state = LocalNodeManagerState {
             node_count: seed.node_count,
             peer_ports: seed.peer_ports.clone(),
             peer_ports_by_name: seed.peer_ports_by_name.clone(),
             clients_by_name: HashMap::new(),
+            indices_by_name: HashMap::new(),
             nodes: Vec::new(),
         };
@@ -136,6 +169,22 @@ impl LocalDynamicNodes {
         state.clients_by_name.get(name).cloned()
     }

+    #[must_use]
+    pub fn node_pid(&self, name: &str) -> Option<u32> {
+        let mut state = self
+            .state
+            .lock()
+            .unwrap_or_else(|poisoned| poisoned.into_inner());
+
+        let index = *state.indices_by_name.get(name)?;
+        let node = state.nodes.get_mut(index)?;
+        if node.is_running() {
+            Some(node.pid())
+        } else {
+            None
+        }
+    }
+
     pub fn stop_all(&self) {
         let mut state = self
             .state
@@ -148,15 +197,46 @@ impl LocalDynamicNodes {
             .peer_ports_by_name
             .clone_from(&self.seed.peer_ports_by_name);
         state.clients_by_name.clear();
+        state.indices_by_name.clear();
         state.node_count = self.seed.node_count;
         self.node_clients.clear();
     }

+    pub fn initialize_with_nodes(&self, nodes: Vec<Node>) {
+        self.node_clients.clear();
+
+        let mut state = self
+            .state
+            .lock()
+            .unwrap_or_else(|poisoned| poisoned.into_inner());
+
+        state.nodes.clear();
+        state.peer_ports.clear();
+        state.peer_ports_by_name.clear();
+        state.clients_by_name.clear();
+        state.indices_by_name.clear();
+        state.node_count = 0;
+
+        for (idx, node) in nodes.into_iter().enumerate() {
+            let name = Self::default_label(idx);
+            let port = node.config().user.network.backend.swarm.port;
+            let client = node.api().clone();
+
+            self.node_clients.add_node(client.clone());
+            state.register_node(&name, port, client, node);
+        }
+    }
+
+    #[must_use]
+    pub fn node_clients(&self) -> NodeClients {
+        self.node_clients.clone()
+    }
+
     pub async fn start_node_with(
         &self,
         name: &str,
         options: StartNodeOptions,
-    ) -> Result<StartedNode, LocalDynamicError> {
+    ) -> Result<StartedNode, LocalNodeManagerError> {
         self.start_node(name, options).await
     }
@@ -208,7 +288,7 @@ impl LocalDynamicNodes {
         &self,
         name: &str,
         options: StartNodeOptions,
-    ) -> Result<StartedNode, LocalDynamicError> {
+    ) -> Result<StartedNode, LocalNodeManagerError> {
         let (peer_ports, peer_ports_by_name, node_name, index) = {
             let state = self
                 .state
@@ -217,13 +297,15 @@ impl LocalDynamicNodes {

         let index = state.node_count;
         let label = if name.trim().is_empty() {
-            format!("node-{index}")
+            Self::default_label(index)
+        } else if name.starts_with("node-") {
+            name.to_string()
         } else {
             format!("node-{name}")
         };

         if state.peer_ports_by_name.contains_key(&label) {
-            return Err(LocalDynamicError::InvalidArgument {
+            return Err(LocalNodeManagerError::InvalidArgument {
                 message: format!("node name '{label}' already exists"),
             });
         }
@@ -262,15 +344,94 @@ impl LocalDynamicNodes {
         })
     }

+    pub async fn restart_node(&self, name: &str) -> Result<(), LocalNodeManagerError> {
+        let (index, mut node) = {
+            let mut state = self
+                .state
+                .lock()
+                .unwrap_or_else(|poisoned| poisoned.into_inner());
+
+            let Some(index) = state.indices_by_name.get(name).copied() else {
+                return Err(LocalNodeManagerError::NodeName {
+                    name: name.to_string(),
+                });
+            };
+
+            if index >= state.nodes.len() {
+                return Err(LocalNodeManagerError::NodeName {
+                    name: name.to_string(),
+                });
+            }
+
+            let node = state.nodes.remove(index);
+            (index, node)
+        };
+
+        node.restart()
+            .await
+            .map_err(|source| LocalNodeManagerError::Restart { source })?;
+
+        let mut state = self
+            .state
+            .lock()
+            .unwrap_or_else(|poisoned| poisoned.into_inner());
+
+        if index <= state.nodes.len() {
+            state.nodes.insert(index, node);
+        } else {
+            state.nodes.push(node);
+        }
+
+        Ok(())
+    }
+
+    pub async fn stop_node(&self, name: &str) -> Result<(), LocalNodeManagerError> {
+        let (index, mut node) = {
+            let mut state = self
+                .state
+                .lock()
+                .unwrap_or_else(|poisoned| poisoned.into_inner());
+
+            let Some(index) = state.indices_by_name.get(name).copied() else {
+                return Err(LocalNodeManagerError::NodeName {
+                    name: name.to_string(),
+                });
+            };
+
+            if index >= state.nodes.len() {
+                return Err(LocalNodeManagerError::NodeName {
+                    name: name.to_string(),
+                });
+            }
+
+            let node = state.nodes.remove(index);
+            (index, node)
+        };
+
+        node.stop().await;
+
+        let mut state = self
+            .state
+            .lock()
+            .unwrap_or_else(|poisoned| poisoned.into_inner());
+
+        if index <= state.nodes.len() {
+            state.nodes.insert(index, node);
+        } else {
+            state.nodes.push(node);
+        }
+        Ok(())
+    }
+
     async fn spawn_and_register_node(
         &self,
         node_name: &str,
         network_port: u16,
         config: RunConfig,
-    ) -> Result<ApiClient, LocalDynamicError> {
+    ) -> Result<ApiClient, LocalNodeManagerError> {
         let node = Node::spawn(config, node_name)
             .await
-            .map_err(|source| LocalDynamicError::Spawn { source })?;
+            .map_err(|source| LocalNodeManagerError::Spawn { source })?;
         let client = node.api().clone();

         self.node_clients.add_node(client.clone());
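Design note on restart_node/stop_node above: state is a std::sync::Mutex, and its guard must not be held across an .await, so the node is removed under one lock, awaited on outside it, then reinserted under a second lock. A stripped-down illustration of the pattern (types below are hypothetical stand-ins, not this crate's API):

    use std::sync::Mutex;

    struct Manager {
        nodes: Mutex<Vec<String>>, // stand-in for Mutex<LocalNodeManagerState>
    }

    async fn do_restart(_node: &str) {} // stand-in for Node::restart

    impl Manager {
        async fn restart(&self, index: usize) {
            // Take the entry out while briefly holding the lock...
            let node = { self.nodes.lock().unwrap().remove(index) };
            // ...await with no guard alive (a sync MutexGuard is !Send)...
            do_restart(&node).await;
            // ...then reinsert under a fresh lock.
            self.nodes.lock().unwrap().insert(index, node);
        }
    }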
@@ -288,9 +449,9 @@

 fn build_node_config(
     general_config: testing_framework_config::topology::configs::GeneralConfig,
-    descriptor_patch: Option<&testing_framework_core::topology::config::NodeConfigPatch>,
-    options_patch: Option<&testing_framework_core::topology::config::NodeConfigPatch>,
-) -> Result<RunConfig, LocalDynamicError> {
+    descriptor_patch: Option<&config::NodeConfigPatch>,
+    options_patch: Option<&config::NodeConfigPatch>,
+) -> Result<RunConfig, LocalNodeManagerError> {
     let mut config = create_node_config(general_config);
     config = apply_patch_if_needed(config, descriptor_patch)?;
     config = apply_patch_if_needed(config, options_patch)?;
@@ -300,21 +461,25 @@ fn build_node_config(

 fn apply_patch_if_needed(
     config: RunConfig,
-    patch: Option<&testing_framework_core::topology::config::NodeConfigPatch>,
-) -> Result<RunConfig, LocalDynamicError> {
+    patch: Option<&config::NodeConfigPatch>,
+) -> Result<RunConfig, LocalNodeManagerError> {
     let Some(patch) = patch else {
         return Ok(config);
     };

-    apply_node_config_patch(config, patch).map_err(|err| LocalDynamicError::ConfigPatch {
+    apply_node_config_patch(config, patch).map_err(|err| LocalNodeManagerError::ConfigPatch {
         message: err.to_string(),
     })
 }

 #[async_trait::async_trait]
-impl NodeControlHandle for LocalDynamicNodes {
-    async fn restart_node(&self, _index: usize) -> Result<(), DynError> {
-        Err("local deployer does not support restart_node".into())
+impl NodeControlHandle for LocalNodeManager {
+    async fn restart_node(&self, name: &str) -> Result<(), DynError> {
+        self.restart_node(name).await.map_err(|err| err.into())
     }
+
+    async fn stop_node(&self, name: &str) -> Result<(), DynError> {
+        self.stop_node(name).await.map_err(|err| err.into())
+    }

     async fn start_node(&self, name: &str) -> Result<StartedNode, DynError> {
@@ -336,4 +501,8 @@ impl NodeControlHandle for LocalDynamicNodes {
     fn node_client(&self, name: &str) -> Option<ApiClient> {
         self.node_client(name)
     }
+
+    fn node_pid(&self, name: &str) -> Option<u32> {
+        self.node_pid(name)
+    }
 }
@@ -2,15 +2,16 @@ use std::collections::HashMap;

 use testing_framework_core::nodes::{ApiClient, node::Node};

-pub(crate) struct LocalDynamicState {
+pub(crate) struct LocalNodeManagerState {
     pub(crate) node_count: usize,
     pub(crate) peer_ports: Vec<u16>,
     pub(crate) peer_ports_by_name: HashMap<String, u16>,
     pub(crate) clients_by_name: HashMap<String, ApiClient>,
+    pub(crate) indices_by_name: HashMap<String, usize>,
     pub(crate) nodes: Vec<Node>,
 }

-impl LocalDynamicState {
+impl LocalNodeManagerState {
     fn register_common(&mut self, node_name: &str, network_port: u16, client: ApiClient) {
         self.peer_ports.push(network_port);
         self.peer_ports_by_name
@@ -26,6 +27,8 @@ impl LocalDynamicState {
         node: Node,
     ) {
         self.register_common(node_name, network_port, client);
+        let index = self.nodes.len();
+        self.indices_by_name.insert(node_name.to_string(), index);
         self.node_count += 1;
         self.nodes.push(node);
     }
@@ -2,22 +2,19 @@ use std::sync::Arc;

 use async_trait::async_trait;
 use testing_framework_core::{
+    nodes::common::node::SpawnNodeError,
     scenario::{
         BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability,
         RunContext, Runner, Scenario, ScenarioError, spawn_block_feed,
     },
-    topology::{
-        config::TopologyConfig,
-        deployment::{SpawnTopologyError, Topology},
-        readiness::ReadinessError,
-    },
+    topology::{config::TopologyConfig, deployment::Topology, readiness::ReadinessError},
 };
 use thiserror::Error;
 use tracing::{debug, info};

 use crate::{
     manual::{LocalManualCluster, ManualClusterError},
-    node_control::{LocalDynamicNodes, LocalDynamicSeed},
+    node_control::{LocalNodeManager, LocalNodeManagerSeed},
 };
 /// Spawns nodes as local processes, reusing the existing
 /// integration harness.
@@ -32,7 +29,7 @@ pub enum LocalDeployerError {
     #[error("failed to spawn local topology: {source}")]
     Spawn {
         #[source]
-        source: SpawnTopologyError,
+        source: SpawnNodeError,
     },
     #[error("readiness probe failed: {source}")]
     ReadinessFailed {
@@ -103,19 +100,39 @@ impl Deployer<NodeControlCapability> for LocalDeployer {
             "starting local deployment with node control"
         );

-        let topology = Self::prepare_topology(scenario, self.membership_check).await?;
-        let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
-        let node_control = Arc::new(LocalDynamicNodes::new_with_seed(
+        let mut nodes = LocalNodeManager::spawn_initial_nodes(scenario.topology())
+            .await
+            .map_err(|source| LocalDeployerError::Spawn { source })?;
+
+        if self.membership_check {
+            let topology = Topology::from_nodes(nodes);
+
+            wait_for_readiness(&topology).await.map_err(|source| {
+                debug!(error = ?source, "local readiness failed");
+                LocalDeployerError::ReadinessFailed { source }
+            })?;
+
+            nodes = topology.into_nodes();
+
+            info!("local nodes are ready");
+        } else {
+            info!("skipping local membership readiness checks");
+        }
+
+        let node_control = Arc::new(LocalNodeManager::new_with_seed(
             scenario.topology().clone(),
-            node_clients.clone(),
-            LocalDynamicSeed::from_topology(scenario.topology()),
+            NodeClients::default(),
+            LocalNodeManagerSeed::from_topology(scenario.topology()),
         ));

+        node_control.initialize_with_nodes(nodes);
+        let node_clients = node_control.node_clients();
+
         let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;

         let context = RunContext::new(
             scenario.topology().clone(),
-            Some(topology),
+            None,
             node_clients,
             scenario.duration(),
             Metrics::empty(),
@@ -158,9 +175,7 @@ impl LocalDeployer {

         info!(nodes = descriptors.nodes().len(), "spawning local nodes");

-        let topology = descriptors
-            .clone()
-            .spawn_local()
+        let topology = LocalNodeManager::spawn_initial_topology(descriptors)
             .await
             .map_err(|source| LocalDeployerError::Spawn { source })?;
testing-framework/deployers/local/tests/restart.rs (new file): 67 lines
@@ -0,0 +1,67 @@
+use std::time::Duration;
+
+use testing_framework_core::{
+    scenario::{Deployer, ScenarioBuilder},
+    topology::config::TopologyConfig,
+};
+use testing_framework_runner_local::LocalDeployer;
+use tracing_subscriber::fmt::try_init;
+
+#[tokio::test]
+#[ignore = "requires local node binary and open ports"]
+async fn local_restart_node() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    let _ = try_init();
+    let mut scenario = ScenarioBuilder::topology_with(|t| t.nodes(1))
+        .enable_node_control()
+        .with_run_duration(Duration::from_secs(1))
+        .build()?;
+
+    let deployer = LocalDeployer::default();
+    let runner = deployer.deploy(&scenario).await?;
+    let context = runner.context();
+
+    let control = context.node_control().ok_or("node control not available")?;
+
+    let node_name = "node-0";
+    let old_pid = control.node_pid(node_name).ok_or("missing node pid")?;
+
+    control.restart_node(node_name).await?;
+
+    let new_pid = control.node_pid(node_name).ok_or("missing node pid")?;
+    assert_ne!(old_pid, new_pid, "expected a new process after restart");
+
+    control.stop_node(node_name).await?;
+    assert!(
+        control.node_pid(node_name).is_none(),
+        "expected node pid to be absent after stop"
+    );
+
+    let _handle = runner.run(&mut scenario).await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+#[ignore = "requires local node binary and open ports"]
+async fn manual_cluster_restart_node() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    let _ = try_init();
+    let deployer = LocalDeployer::default();
+    let cluster = deployer.manual_cluster(TopologyConfig::with_node_numbers(1))?;
+
+    let node_name = cluster.start_node("a").await?.name;
+
+    let old_pid = cluster.node_pid(&node_name).ok_or("missing node pid")?;
+
+    cluster.restart_node(&node_name).await?;
+
+    let new_pid = cluster.node_pid(&node_name).ok_or("missing node pid")?;
+    assert_ne!(old_pid, new_pid, "expected a new process after restart");
+
+    cluster.stop_node(&node_name).await?;
+    assert!(
+        cluster.node_pid(&node_name).is_none(),
+        "expected node pid to be absent after stop"
+    );
+
+    Ok(())
+}
@@ -44,7 +44,7 @@ impl RandomRestartWorkload {
         if self.include_nodes {
             if node_count > 1 {
                 for index in 0..node_count {
-                    targets.push(Target::Node(index));
+                    targets.push(Target::Node(format!("node-{index}")));
                 }
             } else if node_count == 1 {
                 info!("chaos restart skipping nodes: only one node configured");
@@ -76,7 +76,7 @@ impl RandomRestartWorkload {
         let ready = now.checked_sub(self.target_cooldown).unwrap_or(now);
         targets
             .iter()
-            .copied()
+            .cloned()
             .map(|target| (target, ready))
             .collect()
     }
@@ -111,16 +111,16 @@ impl RandomRestartWorkload {

         let available: Vec<Target> = targets
             .iter()
-            .copied()
+            .cloned()
             .filter(|target| cooldowns.get(target).is_none_or(|ready| *ready <= now))
             .collect();

-        if let Some(choice) = available.choose(&mut thread_rng()).copied() {
+        if let Some(choice) = available.choose(&mut thread_rng()).cloned() {
             tracing::debug!(?choice, "chaos restart picked target");
             return Ok(choice);
         }

-        if let Some(choice) = targets.choose(&mut thread_rng()).copied() {
+        if let Some(choice) = targets.choose(&mut thread_rng()).cloned() {
             return Ok(choice);
         }
         return Err("chaos restart workload has no eligible targets".into());
@@ -158,10 +158,10 @@ impl Workload for RandomRestartWorkload {
         let target = self.pick_target(&targets, &cooldowns).await?;

         match target {
-            Target::Node(index) => {
-                tracing::info!(index, "chaos restarting node");
+            Target::Node(ref name) => {
+                tracing::info!(name, "chaos restarting node");
                 handle
-                    .restart_node(index)
+                    .restart_node(name)
                     .await
                     .map_err(|err| format!("node restart failed: {err}"))?
             }
@@ -172,7 +172,7 @@ impl Workload for RandomRestartWorkload {
     }
 }

-#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
 enum Target {
-    Node(usize),
+    Node(String),
 }
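The .copied() to .cloned() churn above follows directly from the Target change: usize is Copy but String is not, so Target can no longer derive Copy and iterator items must be cloned instead. A one-line reminder of the distinction:

    let xs = vec![1_usize];
    let _ = xs.iter().copied(); // fine: usize is Copy
    let ys = vec![String::from("node-0")];
    let _ = ys.iter().cloned(); // String is not Copy, so clone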