refactor(tf): simplify app seams and add k8s manual cluster

This commit is contained in:
andrussal 2026-03-29 14:09:36 +02:00
parent 909a56e3be
commit ea2e0e1d79
9 changed files with 740 additions and 67 deletions

42
Cargo.lock generated
View File

@ -1305,6 +1305,12 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981"
[[package]]
name = "linux-raw-sys"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
[[package]]
name = "linux-raw-sys"
version = "0.11.0"
@ -1805,6 +1811,19 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustix"
version = "0.38.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154"
dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys 0.4.15",
"windows-sys 0.52.0",
]
[[package]]
name = "rustix"
version = "1.1.3"
@ -1814,7 +1833,7 @@ dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys",
"linux-raw-sys 0.11.0",
"windows-sys 0.61.2",
]
@ -2176,7 +2195,7 @@ dependencies = [
"fastrand",
"getrandom 0.3.4",
"once_cell",
"rustix",
"rustix 1.1.3",
"windows-sys 0.61.2",
]
@ -2253,6 +2272,7 @@ dependencies = [
"kube",
"reqwest",
"serde",
"serde_json",
"serde_yaml",
"tempfile",
"testing-framework-core",
@ -2750,6 +2770,18 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "which"
version = "6.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ee928febd44d98f2f459a4a79bd4d928591333a494a10a868418ac1b39cf1f"
dependencies = [
"either",
"home",
"rustix 0.38.44",
"winsafe",
]
[[package]]
name = "winapi-util"
version = "0.1.11"
@ -2974,6 +3006,12 @@ version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]]
name = "winsafe"
version = "0.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904"
[[package]]
name = "wit-bindgen"
version = "0.51.0"

View File

@ -39,9 +39,6 @@ pub trait StaticArtifactRenderer {
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
}
#[doc(hidden)]
pub use StaticArtifactRenderer as CfgsyncEnv;
#[doc(hidden)]
pub trait StaticNodeConfigProvider: Application {
type Error: Error + Send + Sync + 'static;

View File

@ -1,7 +1,6 @@
use std::{collections::HashMap, error::Error};
use super::ScenarioApplication;
use crate::{cfgsync::StaticNodeConfigProvider, topology::DeploymentDescriptor};
use crate::{cfgsync::StaticNodeConfigProvider, env::Application, topology::DeploymentDescriptor};
#[derive(Clone, Debug)]
pub struct ClusterPeerView {
@ -94,7 +93,7 @@ impl ClusterNodeView {
}
}
pub trait ClusterNodeConfigApplication: ScenarioApplication {
pub trait ClusterNodeConfigApplication: Application {
type ConfigError: Error + Send + Sync + 'static;
fn static_network_port() -> u16;

View File

@ -33,7 +33,7 @@ pub use control::{ClusterWaitHandle, NodeControlHandle};
pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder};
pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy};
pub use expectation::Expectation;
pub use noop::ScenarioApplication;
pub use noop::{DefaultFeed, DefaultFeedRuntime, default_feed_result};
pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
pub use runtime::{
Deployer, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext,

View File

@ -1,6 +1,6 @@
use async_trait::async_trait;
use super::{Application, DynError, Feed, FeedRuntime, NodeAccess, NodeClients};
use super::{DynError, Feed, FeedRuntime};
#[derive(Clone)]
pub struct DefaultFeed;
@ -20,59 +20,6 @@ impl FeedRuntime for DefaultFeedRuntime {
async fn run(self: Box<Self>) {}
}
/// App surface for the common case where the framework default feed behavior is
/// sufficient and no custom feed runtime is needed.
#[async_trait]
pub trait ScenarioApplication: Send + Sync + 'static {
type Deployment: crate::topology::DeploymentDescriptor + Clone + 'static;
type NodeClient: Clone + Send + Sync + 'static;
type NodeConfig: Clone + Send + Sync + 'static;
fn external_node_client(
_source: &super::ExternalNodeSource,
) -> Result<Self::NodeClient, DynError> {
Err(std::io::Error::other("external node sources are not supported").into())
}
fn build_node_client(_access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
Err(std::io::Error::other("node access is not supported").into())
}
fn node_readiness_path() -> &'static str {
"/"
}
}
#[async_trait]
impl<T> Application for T
where
T: ScenarioApplication,
{
type Deployment = T::Deployment;
type NodeClient = T::NodeClient;
type NodeConfig = T::NodeConfig;
type FeedRuntime = DefaultFeedRuntime;
fn external_node_client(
source: &super::ExternalNodeSource,
) -> Result<Self::NodeClient, DynError> {
T::external_node_client(source)
}
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
T::build_node_client(access)
}
fn node_readiness_path() -> &'static str {
T::node_readiness_path()
}
async fn prepare_feed(
_node_clients: NodeClients<Self>,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError>
where
Self: Sized,
{
Ok((DefaultFeed, DefaultFeedRuntime))
}
pub fn default_feed_result() -> Result<(DefaultFeed, DefaultFeedRuntime), DynError> {
Ok((DefaultFeed, DefaultFeedRuntime))
}

View File

@ -19,6 +19,7 @@ k8s-openapi = { features = ["latest"], version = "0.20" }
kube = { default-features = false, features = ["client", "runtime", "rustls-tls"], version = "0.87" }
reqwest = { features = ["json"], workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
tempfile = { workspace = true }
testing-framework-core = { path = "../../core" }

View File

@ -3,6 +3,7 @@ mod env;
mod host;
mod infrastructure;
mod lifecycle;
mod manual;
mod workspace;
pub mod wait {
pub use crate::lifecycle::wait::*;
@ -27,6 +28,7 @@ pub use infrastructure::{
runtime_spec::{NodeRuntimeSpec, RuntimeSpecError, SharedServiceFileSpec, SharedServiceSpec},
};
pub use lifecycle::cleanup::RunnerCleanup;
pub use manual::{ManualCluster, ManualClusterError};
pub use workspace::{
RequiredPathError, bundled_runner_chart_path, create_temp_workspace, require_existing_paths,
resolve_optional_relative_dir, resolve_workspace_root, write_temp_file,

View File

@ -3,11 +3,11 @@ use std::{env, sync::LazyLock, time::Duration};
use kube::Error as KubeError;
use thiserror::Error;
mod deployment;
pub(crate) mod deployment;
mod forwarding;
mod http_probe;
mod orchestrator;
mod ports;
pub(crate) mod ports;
pub use forwarding::PortForwardHandle;
const DEFAULT_HTTP_POLL_INTERVAL: Duration = Duration::from_secs(1);

View File

@ -0,0 +1,689 @@
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};
use k8s_openapi::api::apps::v1::Deployment;
use kube::{
Api, Client,
api::{Patch, PatchParams},
};
use testing_framework_core::{
manual::ManualClusterHandle,
scenario::{
ClusterWaitHandle, DynError, ExternalNodeSource, HttpReadinessRequirement, NodeClients,
NodeControlHandle, StartNodeOptions, StartedNode,
},
};
use thiserror::Error;
use tokio_retry::{RetryIf, strategy::FixedInterval};
use crate::{
K8sDeployer,
env::{HelmReleaseAssets, K8sDeployEnv, discovered_node_access},
host::node_host,
infrastructure::helm::install_release,
lifecycle::{
cleanup::RunnerCleanup,
wait::{
ClusterWaitError, NodeConfigPorts, deployment::wait_for_deployment_ready,
ports::discover_node_ports,
},
},
};
#[derive(Debug, Error)]
pub enum ManualClusterError {
#[error("kubernetes runner requires at least one node (nodes={nodes})")]
UnsupportedTopology { nodes: usize },
#[error("failed to initialise kubernetes client: {source}")]
ClientInit {
#[source]
source: kube::Error,
},
#[error("failed to prepare k8s assets: {source}")]
Assets {
#[source]
source: DynError,
},
#[error("failed to install k8s stack: {source}")]
InstallStack {
#[source]
source: DynError,
},
#[error(transparent)]
NodePorts(#[from] ClusterWaitError),
#[error("unsupported start options for k8s manual cluster: {message}")]
UnsupportedStartOptions { message: String },
#[error("invalid node name '{name}'; expected node-<index>")]
InvalidNodeName { name: String },
#[error("node index {index} is out of range for topology with {nodes} nodes")]
NodeIndexOutOfRange { index: usize, nodes: usize },
#[error("node '{name}' is already running")]
NodeAlreadyRunning { name: String },
#[error("node '{name}' is not running")]
NodeNotRunning { name: String },
#[error("failed to patch deployment {name}: {source}")]
PatchDeployment {
name: String,
#[source]
source: kube::Error,
},
#[error("failed to delete pods for deployment {name}: {source}")]
DeletePods {
name: String,
#[source]
source: kube::Error,
},
#[error("failed to discover node client for '{name}': {source}")]
NodeClient {
name: String,
#[source]
source: DynError,
},
#[error("node readiness failed for '{name}': {source}")]
NodeReadiness {
name: String,
#[source]
source: DynError,
},
#[error("cluster network readiness failed: {source}")]
NetworkReadiness {
#[source]
source: DynError,
},
}
struct ManualClusterState<E: K8sDeployEnv> {
running: HashSet<usize>,
node_clients: NodeClients<E>,
known_clients: Vec<Option<E::NodeClient>>,
}
pub struct ManualCluster<E: K8sDeployEnv>
where
E::Assets: HelmReleaseAssets,
{
client: Client,
namespace: String,
release: String,
node_count: usize,
node_host: String,
node_allocations: Vec<crate::wait::NodePortAllocation>,
cleanup: Option<RunnerCleanup>,
state: Arc<Mutex<ManualClusterState<E>>>,
}
impl<E: K8sDeployEnv> ManualCluster<E>
where
E::Assets: HelmReleaseAssets,
{
pub async fn from_topology(topology: E::Deployment) -> Result<Self, ManualClusterError> {
let nodes = testing_framework_core::topology::DeploymentDescriptor::node_count(&topology);
if nodes == 0 {
return Err(ManualClusterError::UnsupportedTopology { nodes });
}
let client = Client::try_default()
.await
.map_err(|source| ManualClusterError::ClientInit { source })?;
let assets = E::prepare_assets(&topology, None)
.map_err(|source| ManualClusterError::Assets { source })?;
let (namespace, release) = E::cluster_identifiers();
let install_spec = assets
.release_bundle()
.install_spec(release.clone(), namespace.clone());
install_release(&install_spec).await.map_err(|source| {
ManualClusterError::InstallStack {
source: source.into(),
}
})?;
let cleanup = RunnerCleanup::new(
client.clone(),
namespace.clone(),
release.clone(),
std::env::var("K8S_RUNNER_PRESERVE").is_ok(),
);
let node_ports = E::collect_port_specs(&topology).nodes;
let node_allocations =
discover_all_node_ports::<E>(&client, &namespace, &release, &node_ports).await?;
scale_all_nodes::<E>(&client, &namespace, &release, nodes, 0).await?;
Ok(Self {
client,
namespace,
release,
node_count: nodes,
node_host: node_host(),
node_allocations,
cleanup: Some(cleanup),
state: Arc::new(Mutex::new(ManualClusterState {
running: HashSet::new(),
node_clients: NodeClients::default(),
known_clients: vec![None; nodes],
})),
})
}
#[must_use]
pub fn node_client(&self, name: &str) -> Option<E::NodeClient> {
let index = parse_node_index(name)?;
let state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state
.known_clients
.get(index)
.and_then(|client| client.clone())
}
#[must_use]
pub fn node_pid(&self, _name: &str) -> Option<u32> {
None
}
pub async fn start_node(&self, name: &str) -> Result<StartedNode<E>, ManualClusterError> {
self.start_node_with(name, StartNodeOptions::<E>::default())
.await
}
pub async fn start_node_with(
&self,
name: &str,
options: StartNodeOptions<E>,
) -> Result<StartedNode<E>, ManualClusterError> {
validate_start_options(&options)?;
let index = self.require_node_index(name)?;
{
let state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if state.running.contains(&index) {
return Err(ManualClusterError::NodeAlreadyRunning {
name: name.to_owned(),
});
}
}
scale_node::<E>(&self.client, &self.namespace, &self.release, index, 1).await?;
self.wait_node_ready(name).await?;
let client = self.build_client(index, name)?;
let mut state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.running.insert(index);
state.known_clients[index] = Some(client.clone());
state.node_clients.add_node(client.clone());
Ok(StartedNode {
name: canonical_node_name(index),
client,
})
}
pub fn stop_all(&self) {
block_on_best_effort(self.stop_all_async());
}
async fn stop_all_async(&self) -> Result<(), ManualClusterError> {
let indices = {
let state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.running.iter().copied().collect::<Vec<_>>()
};
for index in indices {
let name = canonical_node_name(index);
self.stop_node(&name).await?;
}
Ok(())
}
pub async fn restart_node(&self, name: &str) -> Result<(), ManualClusterError> {
let index = self.require_running_node_index(name)?;
scale_node::<E>(&self.client, &self.namespace, &self.release, index, 0).await?;
scale_node::<E>(&self.client, &self.namespace, &self.release, index, 1).await?;
self.wait_node_ready(name).await?;
let client = self.build_client(index, name)?;
let mut state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.known_clients[index] = Some(client.clone());
state.node_clients.add_node(client);
Ok(())
}
pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> {
let index = self.require_running_node_index(name)?;
scale_node::<E>(&self.client, &self.namespace, &self.release, index, 0).await?;
let mut state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.running.remove(&index);
Ok(())
}
pub async fn wait_network_ready(&self) -> Result<(), ManualClusterError> {
let running_ports = {
let state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state
.running
.iter()
.copied()
.map(|index| self.node_allocations[index].api)
.collect::<Vec<_>>()
};
if running_ports.is_empty() {
return Ok(());
}
let ports = running_ports;
testing_framework_core::scenario::wait_for_http_ports_with_host_and_requirement(
&ports,
&self.node_host,
<E as K8sDeployEnv>::node_readiness_path(),
HttpReadinessRequirement::AllNodesReady,
)
.await
.map_err(|source| ManualClusterError::NetworkReadiness {
source: source.into(),
})
}
pub async fn wait_node_ready(&self, name: &str) -> Result<(), ManualClusterError> {
let index = self.require_node_index(name)?;
let port = self.node_allocations[index].api;
testing_framework_core::scenario::wait_for_http_ports_with_host_and_requirement(
&[port],
&self.node_host,
<E as K8sDeployEnv>::node_readiness_path(),
HttpReadinessRequirement::AllNodesReady,
)
.await
.map_err(|source| ManualClusterError::NodeReadiness {
name: canonical_node_name(index),
source: source.into(),
})
}
#[must_use]
pub fn node_clients(&self) -> NodeClients<E> {
let state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.node_clients.clone()
}
pub fn add_external_sources(
&self,
external_sources: impl IntoIterator<Item = ExternalNodeSource>,
) -> Result<(), DynError> {
let node_clients = self.node_clients();
for source in external_sources {
node_clients.add_node(E::external_node_client(&source)?);
}
Ok(())
}
pub fn add_external_clients(&self, clients: impl IntoIterator<Item = E::NodeClient>) {
let node_clients = self.node_clients();
for client in clients {
node_clients.add_node(client);
}
}
fn build_client(&self, index: usize, name: &str) -> Result<E::NodeClient, ManualClusterError> {
let allocation = self.node_allocations[index];
E::build_node_client(&discovered_node_access(
&self.node_host,
allocation.api,
allocation.auxiliary,
))
.map_err(|source| ManualClusterError::NodeClient {
name: name.to_owned(),
source,
})
}
fn require_node_index(&self, name: &str) -> Result<usize, ManualClusterError> {
let index = parse_node_index(name).ok_or_else(|| ManualClusterError::InvalidNodeName {
name: name.to_owned(),
})?;
if index >= self.node_count {
return Err(ManualClusterError::NodeIndexOutOfRange {
index,
nodes: self.node_count,
});
}
Ok(index)
}
fn require_running_node_index(&self, name: &str) -> Result<usize, ManualClusterError> {
let index = self.require_node_index(name)?;
let state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if !state.running.contains(&index) {
return Err(ManualClusterError::NodeNotRunning {
name: canonical_node_name(index),
});
}
Ok(index)
}
}
impl<E> Drop for ManualCluster<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
fn drop(&mut self) {
self.stop_all();
if let Some(cleanup) = self.cleanup.take() {
testing_framework_core::scenario::internal::CleanupGuard::cleanup(Box::new(cleanup));
}
}
}
#[async_trait::async_trait]
impl<E> NodeControlHandle<E> for ManualCluster<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
async fn restart_node(&self, name: &str) -> Result<(), DynError> {
Self::restart_node(self, name).await.map_err(Into::into)
}
async fn start_node(&self, name: &str) -> Result<StartedNode<E>, DynError> {
Self::start_node(self, name).await.map_err(Into::into)
}
async fn start_node_with(
&self,
name: &str,
options: StartNodeOptions<E>,
) -> Result<StartedNode<E>, DynError> {
Self::start_node_with(self, name, options)
.await
.map_err(Into::into)
}
async fn stop_node(&self, name: &str) -> Result<(), DynError> {
Self::stop_node(self, name).await.map_err(Into::into)
}
fn node_client(&self, name: &str) -> Option<E::NodeClient> {
Self::node_client(self, name)
}
}
#[async_trait::async_trait]
impl<E> ClusterWaitHandle<E> for ManualCluster<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
async fn wait_network_ready(&self) -> Result<(), DynError> {
Self::wait_network_ready(self).await.map_err(Into::into)
}
}
#[async_trait::async_trait]
impl<E> ManualClusterHandle<E> for ManualCluster<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
}
impl<E> K8sDeployer<E>
where
E: K8sDeployEnv,
E::Assets: HelmReleaseAssets,
{
pub async fn manual_cluster_from_descriptors(
&self,
descriptors: E::Deployment,
) -> Result<ManualCluster<E>, ManualClusterError> {
let _ = self;
ManualCluster::from_topology(descriptors).await
}
}
async fn discover_all_node_ports<E: K8sDeployEnv>(
client: &Client,
namespace: &str,
release: &str,
node_ports: &[NodeConfigPorts],
) -> Result<Vec<crate::wait::NodePortAllocation>, ManualClusterError> {
let mut allocations = Vec::with_capacity(node_ports.len());
for (index, ports) in node_ports.iter().enumerate() {
let service_name = E::node_service_name(release, index);
allocations.push(discover_node_ports(client, namespace, &service_name, *ports).await?);
}
Ok(allocations)
}
async fn scale_all_nodes<E: K8sDeployEnv>(
client: &Client,
namespace: &str,
release: &str,
node_count: usize,
replicas: i32,
) -> Result<(), ManualClusterError> {
for index in 0..node_count {
scale_node::<E>(client, namespace, release, index, replicas).await?;
}
Ok(())
}
async fn scale_node<E: K8sDeployEnv>(
client: &Client,
namespace: &str,
release: &str,
index: usize,
replicas: i32,
) -> Result<(), ManualClusterError> {
let name = E::node_deployment_name(release, index);
let deployments = Api::<Deployment>::namespaced(client.clone(), namespace);
let patch = serde_json::json!({"spec": {"replicas": replicas}});
deployments
.patch(&name, &PatchParams::default(), &Patch::Merge(&patch))
.await
.map_err(|source| ManualClusterError::PatchDeployment {
name: name.clone(),
source,
})?;
wait_for_replicas(client, namespace, &name, replicas).await
}
async fn wait_for_replicas(
client: &Client,
namespace: &str,
name: &str,
replicas: i32,
) -> Result<(), ManualClusterError> {
if replicas > 0 {
return wait_for_deployment_ready(client, namespace, name)
.await
.map_err(Into::into);
}
let deployments = Api::<Deployment>::namespaced(client.clone(), namespace);
RetryIf::spawn(
FixedInterval::from_millis(500).take(240),
|| async {
let deployment = deployments.get(name).await.map_err(|source| {
ManualClusterError::PatchDeployment {
name: name.to_owned(),
source,
}
})?;
let ready = deployment
.status
.as_ref()
.and_then(|status| status.ready_replicas)
.unwrap_or(0);
let current = deployment
.spec
.as_ref()
.and_then(|spec| spec.replicas)
.unwrap_or(1);
if ready == 0 && current == 0 {
Ok(())
} else {
Err(ManualClusterError::NodeAlreadyRunning {
name: name.to_owned(),
})
}
},
|error: &ManualClusterError| matches!(error, ManualClusterError::NodeAlreadyRunning { .. }),
)
.await
}
fn validate_start_options<E: K8sDeployEnv>(
options: &StartNodeOptions<E>,
) -> Result<(), ManualClusterError> {
if !matches!(
options.peers,
testing_framework_core::scenario::PeerSelection::DefaultLayout
) {
return Err(ManualClusterError::UnsupportedStartOptions {
message: "custom peer selection is not supported".to_owned(),
});
}
if options.config_override.is_some() || options.config_patch.is_some() {
return Err(ManualClusterError::UnsupportedStartOptions {
message: "config overrides/patches are not supported".to_owned(),
});
}
if options.persist_dir.is_some() || options.snapshot_dir.is_some() {
return Err(ManualClusterError::UnsupportedStartOptions {
message: "persist/snapshot directories are not supported".to_owned(),
});
}
Ok(())
}
fn parse_node_index(name: &str) -> Option<usize> {
name.strip_prefix("node-")?.parse().ok()
}
fn canonical_node_name(index: usize) -> String {
format!("node-{index}")
}
fn block_on_best_effort(fut: impl std::future::Future<Output = Result<(), ManualClusterError>>) {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
tokio::task::block_in_place(|| {
let _ = handle.block_on(fut);
});
return;
}
if let Ok(runtime) = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
let _ = runtime.block_on(fut);
}
}
#[cfg(test)]
mod tests {
use testing_framework_core::scenario::{
Application, DefaultFeedRuntime, NodeAccess, NodeClients, PeerSelection,
default_feed_result,
};
use super::*;
use crate::{
PortSpecs, RenderedHelmChartAssets, render_single_template_chart_assets,
standard_port_specs,
};
struct DummyEnv;
#[async_trait::async_trait]
impl Application for DummyEnv {
type Deployment = testing_framework_core::topology::ClusterTopology;
type NodeClient = String;
type NodeConfig = ();
type FeedRuntime = DefaultFeedRuntime;
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
Ok(access.api_base_url()?.to_string())
}
async fn prepare_feed(
_node_clients: NodeClients<Self>,
) -> Result<
(
testing_framework_core::scenario::DefaultFeed,
Self::FeedRuntime,
),
DynError,
> {
default_feed_result()
}
}
#[async_trait::async_trait]
impl K8sDeployEnv for DummyEnv {
type Assets = RenderedHelmChartAssets;
fn collect_port_specs(_topology: &Self::Deployment) -> PortSpecs {
standard_port_specs(1, 8080, 8081)
}
fn prepare_assets(
_topology: &Self::Deployment,
_metrics_otlp_ingest_url: Option<&reqwest::Url>,
) -> Result<Self::Assets, DynError> {
render_single_template_chart_assets("dummy", "dummy.yaml", "")
}
}
#[test]
fn parse_node_index_accepts_node_labels() {
assert_eq!(parse_node_index("node-0"), Some(0));
assert_eq!(parse_node_index("node-12"), Some(12));
assert_eq!(parse_node_index("validator-0"), None);
}
#[test]
fn validate_start_options_rejects_non_default_inputs() {
let peers = StartNodeOptions::<DummyEnv>::default().with_peers(PeerSelection::None);
assert!(matches!(
validate_start_options(&peers),
Err(ManualClusterError::UnsupportedStartOptions { .. })
));
let persist = StartNodeOptions::<DummyEnv>::default()
.with_persist_dir(std::path::PathBuf::from("/tmp/demo"));
assert!(matches!(
validate_start_options(&persist),
Err(ManualClusterError::UnsupportedStartOptions { .. })
));
}
}