diff --git a/testing-framework/core/src/scenario/common_builder_ext.rs b/testing-framework/core/src/scenario/common_builder_ext.rs index f0842c3..947b3bf 100644 --- a/testing-framework/core/src/scenario/common_builder_ext.rs +++ b/testing-framework/core/src/scenario/common_builder_ext.rs @@ -2,7 +2,7 @@ use std::time::Duration; use super::{ Application, CleanupPolicy, DeploymentPolicy, Expectation, HttpReadinessRequirement, - RetryPolicy, Workload, internal::CoreBuilderAccess, + RetryPolicy, RuntimeExtensionFactory, Workload, internal::CoreBuilderAccess, }; use crate::topology::{DeploymentProvider, DeploymentSeed}; @@ -52,6 +52,14 @@ pub trait CoreBuilderExt: CoreBuilderAccess + Sized { self.map_core_builder(|builder| builder.with_expectation_boxed(expectation)) } + #[must_use] + fn with_runtime_extension_factory( + self, + extension: Box>, + ) -> Self { + self.map_core_builder(|builder| builder.with_runtime_extension_factory(extension)) + } + #[must_use] fn with_run_duration(self, duration: Duration) -> Self { self.map_core_builder(|builder| builder.with_run_duration(duration)) diff --git a/testing-framework/core/src/scenario/definition/builder.rs b/testing-framework/core/src/scenario/definition/builder.rs index 7e86df0..48673fb 100644 --- a/testing-framework/core/src/scenario/definition/builder.rs +++ b/testing-framework/core/src/scenario/definition/builder.rs @@ -13,9 +13,9 @@ use crate::{ scenario::{ Application, DeploymentPolicy, DynError, ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, IntoExistingCluster, NodeControlCapability, - ObservabilityCapability, RequiresNodeControl, builder_ops::CoreBuilderAccess, - expectation::Expectation, runtime::context::RunMetrics, sources::ScenarioSources, - workload::Workload, + ObservabilityCapability, RequiresNodeControl, RuntimeExtensionFactory, + builder_ops::CoreBuilderAccess, expectation::Expectation, runtime::context::RunMetrics, + sources::ScenarioSources, workload::Workload, }, topology::{DeploymentDescriptor, DeploymentProvider, DeploymentSeed, FixedDeploymentProvider}, }; @@ -26,6 +26,7 @@ pub struct Builder { pub(super) topology_seed: Option, pub(super) workloads: Vec>>, pub(super) expectations: Vec>>, + pub(super) runtime_extensions: Vec>>, pub(super) duration: Duration, pub(super) expectation_cooldown: Option, pub(super) deployment_policy: DeploymentPolicy, @@ -99,6 +100,14 @@ macro_rules! impl_common_builder_methods { self.map_core_builder(|builder| builder.with_expectation_boxed(expectation)) } + #[must_use] + pub fn with_runtime_extension_factory( + self, + extension: Box>, + ) -> Self { + self.map_core_builder(|builder| builder.with_runtime_extension_factory(extension)) + } + #[must_use] pub fn with_run_duration(self, duration: Duration) -> Self { self.map_core_builder(|builder| builder.with_run_duration(duration)) @@ -251,6 +260,7 @@ impl Builder { topology_seed: None, workloads: Vec::new(), expectations: Vec::new(), + runtime_extensions: Vec::new(), duration: Duration::ZERO, expectation_cooldown: None, deployment_policy: DeploymentPolicy::default(), @@ -357,6 +367,7 @@ impl Builder { topology_seed, workloads, expectations, + runtime_extensions, duration, expectation_cooldown, deployment_policy, @@ -369,6 +380,7 @@ impl Builder { topology_seed, workloads, expectations, + runtime_extensions, duration, expectation_cooldown, deployment_policy, @@ -414,6 +426,15 @@ impl Builder { self } + #[must_use] + pub fn with_runtime_extension_factory( + mut self, + extension: Box>, + ) -> Self { + self.runtime_extensions.push(extension); + self + } + #[must_use] /// Configure the intended run duration. pub const fn with_run_duration(mut self, duration: Duration) -> Self { @@ -549,6 +570,7 @@ impl Builder { descriptors, workloads, parts.expectations, + parts.runtime_extensions, run_plan.duration, run_plan.expectation_cooldown, parts.deployment_policy, @@ -569,6 +591,7 @@ struct BuilderParts { topology_seed: Option, workloads: Vec>>, expectations: Vec>>, + runtime_extensions: Vec>>, duration: Duration, expectation_cooldown: Option, deployment_policy: DeploymentPolicy, @@ -583,6 +606,7 @@ impl BuilderParts { topology_seed, workloads, expectations, + runtime_extensions, duration, expectation_cooldown, deployment_policy, @@ -596,6 +620,7 @@ impl BuilderParts { topology_seed, workloads, expectations, + runtime_extensions, duration, expectation_cooldown, deployment_policy, diff --git a/testing-framework/core/src/scenario/definition/model.rs b/testing-framework/core/src/scenario/definition/model.rs index ca6ecc6..281de30 100644 --- a/testing-framework/core/src/scenario/definition/model.rs +++ b/testing-framework/core/src/scenario/definition/model.rs @@ -6,8 +6,14 @@ use super::builder::Builder; use crate::{ scenario::{ Application, ClusterControlProfile, ClusterMode, DeploymentPolicy, DynError, - ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, expectation::Expectation, - runtime::SourceOrchestrationPlan, sources::ScenarioSources, workload::Workload, + ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, NodeClients, + expectation::Expectation, + runtime::{ + CleanupGuard, RuntimeExtensionFactory, RuntimeExtensions, SourceOrchestrationPlan, + prepare_runtime_extensions, + }, + sources::ScenarioSources, + workload::Workload, }, topology::DynTopologyError, }; @@ -32,6 +38,7 @@ pub struct Scenario { deployment: E::Deployment, workloads: Vec>>, expectations: Vec>>, + runtime_extensions: Vec>>, duration: Duration, expectation_cooldown: Duration, deployment_policy: DeploymentPolicy, @@ -45,6 +52,7 @@ impl Scenario { deployment: E::Deployment, workloads: Vec>>, expectations: Vec>>, + runtime_extensions: Vec>>, duration: Duration, expectation_cooldown: Duration, deployment_policy: DeploymentPolicy, @@ -56,6 +64,7 @@ impl Scenario { deployment, workloads, expectations, + runtime_extensions, duration, expectation_cooldown, deployment_policy, @@ -145,6 +154,18 @@ impl Scenario { pub const fn capabilities(&self) -> &Caps { &self.capabilities } + + #[doc(hidden)] + pub async fn prepare_runtime_extensions( + &self, + node_clients: NodeClients, + ) -> Result<(RuntimeExtensions, Option>), DynError> { + Ok( + prepare_runtime_extensions(&self.runtime_extensions, &self.deployment, node_clients) + .await? + .into_parts(), + ) + } } impl super::builder::ScenarioBuilder { diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index 6df3bc2..29eb104 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -36,8 +36,9 @@ pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ - Deployer, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext, RunHandle, - RunMetrics, Runner, ScenarioError, StabilizationConfig, + Deployer, HttpReadinessRequirement, NodeClients, PreparedRuntimeExtension, ReadinessError, + RunContext, RunHandle, RunMetrics, Runner, RuntimeExtensionFactory, RuntimeExtensions, + ScenarioError, StabilizationConfig, metrics::{ CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError, PrometheusEndpoint, PrometheusInstantSample, diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 89a0cf4..592387c 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, time::Duration}; -use super::{metrics::Metrics, node_clients::ClusterClient}; +use super::{CleanupChain, RuntimeExtensions, metrics::Metrics, node_clients::ClusterClient}; use crate::scenario::{ Application, ClusterControlProfile, ClusterWaitHandle, DynError, NodeClients, NodeControlHandle, }; @@ -19,6 +19,7 @@ pub struct RunContext { expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, + runtime_extensions: RuntimeExtensions, node_control: Option>>, cluster_wait: Option>>, } @@ -32,6 +33,8 @@ pub struct RuntimeAssembly { expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, + runtime_extensions: RuntimeExtensions, + cleanup_guard: Option>, node_control: Option>>, cluster_wait: Option>>, } @@ -46,6 +49,7 @@ impl RunContext { expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, + runtime_extensions: RuntimeExtensions, node_control: Option>>, ) -> Self { let metrics = RunMetrics::new(run_duration); @@ -57,6 +61,7 @@ impl RunContext { expectation_cooldown, cluster_control_profile, telemetry, + runtime_extensions, node_control, cluster_wait: None, } @@ -103,6 +108,29 @@ impl RunContext { self.cluster_control_profile } + /// Returns a cloned runtime extension value by type. + #[must_use] + pub fn extension(&self) -> Option + where + T: Clone + Send + Sync + 'static, + { + self.runtime_extensions.get::() + } + + /// Returns a runtime extension value by type or an error if it is missing. + pub fn require_extension(&self) -> Result + where + T: Clone + Send + Sync + 'static, + { + self.extension::().ok_or_else(|| { + format!( + "runtime extension is not available: {}", + std::any::type_name::() + ) + .into() + }) + } + #[must_use] pub fn node_control(&self) -> Option>> { self.node_control.clone() @@ -142,6 +170,8 @@ impl RuntimeAssembly { expectation_cooldown, cluster_control_profile, telemetry, + runtime_extensions: RuntimeExtensions::default(), + cleanup_guard: None, node_control: None, cluster_wait: None, } @@ -153,6 +183,19 @@ impl RuntimeAssembly { self } + #[must_use] + pub fn with_runtime_extensions(mut self, runtime_extensions: RuntimeExtensions) -> Self { + self.runtime_extensions = runtime_extensions; + self + } + + #[must_use] + #[doc(hidden)] + pub fn with_cleanup_guard(mut self, cleanup_guard: Option>) -> Self { + self.cleanup_guard = chain_cleanup_guards(self.cleanup_guard.take(), cleanup_guard); + self + } + #[must_use] pub fn with_cluster_wait(mut self, cluster_wait: Arc>) -> Self { self.cluster_wait = Some(cluster_wait); @@ -168,6 +211,7 @@ impl RuntimeAssembly { self.expectation_cooldown, self.cluster_control_profile, self.telemetry, + self.runtime_extensions, self.node_control, ); @@ -178,7 +222,11 @@ impl RuntimeAssembly { } #[must_use] - pub fn build_runner(self, cleanup_guard: Option>) -> super::Runner { + pub fn build_runner( + mut self, + cleanup_guard: Option>, + ) -> super::Runner { + let cleanup_guard = chain_cleanup_guards(self.cleanup_guard.take(), cleanup_guard); super::Runner::new(self.build_context(), cleanup_guard) } } @@ -192,12 +240,30 @@ impl From> for RuntimeAssembly { expectation_cooldown: context.expectation_cooldown, cluster_control_profile: context.cluster_control_profile, telemetry: context.telemetry, + runtime_extensions: context.runtime_extensions, + cleanup_guard: None, node_control: context.node_control, cluster_wait: context.cluster_wait, } } } +fn chain_cleanup_guards( + left: Option>, + right: Option>, +) -> Option> { + match (left, right) { + (None, None) => None, + (Some(guard), None) | (None, Some(guard)) => Some(guard), + (Some(left), Some(right)) => { + let mut chain = CleanupChain::default(); + chain.push(left); + chain.push(right); + Some(Box::new(chain)) + } + } +} + /// Handle returned by the runner to control the lifecycle of the run. pub struct RunHandle { run_context: Arc>, diff --git a/testing-framework/core/src/scenario/runtime/extensions.rs b/testing-framework/core/src/scenario/runtime/extensions.rs new file mode 100644 index 0000000..1b9f9f4 --- /dev/null +++ b/testing-framework/core/src/scenario/runtime/extensions.rs @@ -0,0 +1,179 @@ +use std::{ + any::{Any, TypeId, type_name}, + collections::HashMap, +}; + +use async_trait::async_trait; +use tokio::task::JoinHandle; + +use super::context::CleanupGuard; +use crate::scenario::{Application, DynError, NodeClients}; + +/// Prepared runtime extension value plus optional cleanup. +pub struct PreparedRuntimeExtension { + type_id: TypeId, + type_name: &'static str, + value: Box, + cleanup: Option>, +} + +impl PreparedRuntimeExtension { + /// Builds a runtime extension value with no extra cleanup. + #[must_use] + pub fn new(value: T) -> Self + where + T: Clone + Send + Sync + 'static, + { + Self { + type_id: TypeId::of::(), + type_name: type_name::(), + value: Box::new(value), + cleanup: None, + } + } + + /// Builds a runtime extension value with a custom cleanup guard. + #[must_use] + pub fn with_cleanup(value: T, cleanup: Box) -> Self + where + T: Clone + Send + Sync + 'static, + { + Self { + cleanup: Some(cleanup), + ..Self::new(value) + } + } + + /// Builds a runtime extension value backed by a background task. + #[must_use] + pub fn from_task(value: T, task: JoinHandle<()>) -> Self + where + T: Clone + Send + Sync + 'static, + { + Self::with_cleanup(value, Box::new(TaskCleanupGuard::new(task))) + } +} + +/// Factory that prepares a scenario runtime extension once node clients are +/// available. +#[async_trait] +pub trait RuntimeExtensionFactory: Send + Sync { + /// Prepares one extension value for this scenario run. + async fn prepare( + &self, + deployment: &E::Deployment, + node_clients: NodeClients, + ) -> Result; +} + +/// Type-indexed runtime extension store exposed through `RunContext`. +#[derive(Default)] +pub struct RuntimeExtensions { + values: HashMap>, +} + +impl RuntimeExtensions { + /// Returns a cloned extension value by type. + #[must_use] + pub fn get(&self) -> Option + where + T: Clone + Send + Sync + 'static, + { + self.values + .get(&TypeId::of::()) + .and_then(|value| value.downcast_ref::()) + .cloned() + } +} + +#[derive(Default)] +pub(crate) struct CleanupChain { + guards: Vec>, +} + +impl CleanupChain { + pub(crate) fn push(&mut self, guard: Box) { + self.guards.push(guard); + } + + pub(crate) fn push_optional(&mut self, guard: Option>) { + if let Some(guard) = guard { + self.guards.push(guard); + } + } + + pub(crate) fn into_guard(self) -> Option> { + if self.guards.is_empty() { + None + } else { + Some(Box::new(self)) + } + } +} + +impl CleanupGuard for CleanupChain { + fn cleanup(mut self: Box) { + while let Some(guard) = self.guards.pop() { + guard.cleanup(); + } + } +} + +#[derive(Default)] +pub(crate) struct PreparedRuntimeExtensions { + values: RuntimeExtensions, + cleanup: CleanupChain, +} + +impl PreparedRuntimeExtensions { + pub(crate) fn into_parts(self) -> (RuntimeExtensions, Option>) { + (self.values, self.cleanup.into_guard()) + } + + fn insert(&mut self, extension: PreparedRuntimeExtension) -> Result<(), DynError> { + let PreparedRuntimeExtension { + type_id, + type_name, + value, + cleanup, + } = extension; + + if self.values.values.contains_key(&type_id) { + return Err(format!("duplicate runtime extension type registered: {type_name}").into()); + } + + self.values.values.insert(type_id, value); + self.cleanup.push_optional(cleanup); + Ok(()) + } +} + +pub(crate) async fn prepare_runtime_extensions( + factories: &[Box>], + deployment: &E::Deployment, + node_clients: NodeClients, +) -> Result { + let mut prepared = PreparedRuntimeExtensions::default(); + + for factory in factories { + prepared.insert(factory.prepare(deployment, node_clients.clone()).await?)?; + } + + Ok(prepared) +} + +struct TaskCleanupGuard { + handle: JoinHandle<()>, +} + +impl TaskCleanupGuard { + const fn new(handle: JoinHandle<()>) -> Self { + Self { handle } + } +} + +impl CleanupGuard for TaskCleanupGuard { + fn cleanup(self: Box) { + self.handle.abort(); + } +} diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index c6c2ce7..1c614fa 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -1,5 +1,6 @@ pub mod context; mod deployer; +mod extensions; mod internal; mod inventory; pub mod metrics; @@ -9,6 +10,8 @@ mod runner; pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics, RuntimeAssembly}; pub use deployer::{Deployer, ScenarioError}; +pub(crate) use extensions::{CleanupChain, prepare_runtime_extensions}; +pub use extensions::{PreparedRuntimeExtension, RuntimeExtensionFactory, RuntimeExtensions}; #[doc(hidden)] pub use internal::{ ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, ManagedSource, diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index b77344a..0171293 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -168,6 +168,10 @@ where .await?; self.ensure_non_empty_node_clients(&node_clients)?; + let (runtime_extensions, runtime_cleanup) = scenario + .prepare_runtime_extensions(node_clients.clone()) + .await + .map_err(|source| ComposeRunnerError::RuntimeExtensions { source })?; let node_control = self.attached_node_control::(scenario)?; let cluster_wait = self.attached_cluster_wait(scenario)?; @@ -180,7 +184,9 @@ where observability.telemetry_handle()?, node_control, cluster_wait, - ); + ) + .with_runtime_extensions(runtime_extensions) + .with_cleanup_guard(runtime_cleanup); Ok(assembly.build_runner(None)) } @@ -277,6 +283,7 @@ where maybe_print_endpoints(&observability, &deployed.host, &deployed.host_ports); let input = RuntimeBuildInput { + scenario, deployed: &deployed, descriptors: prepared.descriptors.clone(), duration: scenario.duration(), @@ -286,7 +293,7 @@ where node_control, cluster_wait, }; - let runtime = build_compose_runtime::(input).await?; + let runtime = build_compose_runtime::(input).await?; let cleanup_guard = make_cleanup_guard(prepared.environment.into_cleanup()?); info!( @@ -442,7 +449,8 @@ struct ComposeRuntime { assembly: RuntimeAssembly, } -struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { +struct RuntimeBuildInput<'a, E: ComposeDeployEnv, Caps> { + scenario: &'a Scenario, deployed: &'a DeployedNodes, descriptors: E::Deployment, duration: Duration, @@ -453,14 +461,20 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { cluster_wait: Arc>, } -async fn build_compose_runtime( - input: RuntimeBuildInput<'_, E>, +async fn build_compose_runtime( + input: RuntimeBuildInput<'_, E, Caps>, ) -> Result, ComposeRunnerError> { let node_clients = input.deployed.node_clients.clone(); if node_clients.is_empty() { return Err(ComposeRunnerError::RuntimePreflight); } + let (runtime_extensions, runtime_cleanup) = input + .scenario + .prepare_runtime_extensions(node_clients.clone()) + .await + .map_err(|source| ComposeRunnerError::RuntimeExtensions { source })?; + let assembly = build_runtime_assembly( input.descriptors, node_clients, @@ -470,7 +484,9 @@ async fn build_compose_runtime( input.telemetry, input.node_control, input.cluster_wait, - ); + ) + .with_runtime_extensions(runtime_extensions) + .with_cleanup_guard(runtime_cleanup); Ok(ComposeRuntime { assembly }) } diff --git a/testing-framework/deployers/compose/src/errors.rs b/testing-framework/deployers/compose/src/errors.rs index de3ccbd..1844bf1 100644 --- a/testing-framework/deployers/compose/src/errors.rs +++ b/testing-framework/deployers/compose/src/errors.rs @@ -35,6 +35,11 @@ pub enum ComposeRunnerError { BlockFeedMissing, #[error("runtime preflight failed: no node clients available")] RuntimePreflight, + #[error("runtime extension setup failed: {source}")] + RuntimeExtensions { + #[source] + source: DynError, + }, #[error("source orchestration failed: {source}")] SourceOrchestration { #[source] diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 8a4484b..a6572ee 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -115,6 +115,11 @@ pub enum K8sRunnerError { InternalInvariant { message: String }, #[error("runtime preflight failed: no node clients available")] RuntimePreflight, + #[error("runtime extension setup failed: {source}")] + RuntimeExtensions { + #[source] + source: DynError, + }, #[error("source orchestration failed: {source}")] SourceOrchestration { #[source] @@ -209,7 +214,7 @@ where log_configured_observability(&observability); maybe_print_endpoints::(&observability, &runtime.node_clients); - let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait); + let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait).await?; let runner = finalize_runner::(&mut cluster, parts)?; Ok((runner, metadata)) } @@ -228,6 +233,10 @@ where let node_clients = resolve_node_clients(&source_plan, source_providers).await?; ensure_non_empty_node_clients(&node_clients)?; + let (runtime_extensions, runtime_cleanup) = scenario + .prepare_runtime_extensions(node_clients.clone()) + .await + .map_err(|source| K8sRunnerError::RuntimeExtensions { source })?; let telemetry = observability.telemetry_handle()?; let cluster_wait = attached_cluster_wait::(scenario, client)?; @@ -239,6 +248,8 @@ where scenario.cluster_control_profile(), telemetry, ) + .with_runtime_extensions(runtime_extensions) + .with_cleanup_guard(runtime_cleanup) .with_cluster_wait(cluster_wait); Ok(context.build_runner(None)) @@ -539,13 +550,18 @@ struct RuntimeArtifacts { telemetry: Metrics, } -fn build_runner_parts( +async fn build_runner_parts( scenario: &Scenario, node_count: usize, runtime: RuntimeArtifacts, cluster_wait: Arc>, -) -> K8sRunnerParts { - K8sRunnerParts { +) -> Result, K8sRunnerError> { + let (runtime_extensions, runtime_cleanup) = scenario + .prepare_runtime_extensions(runtime.node_clients.clone()) + .await + .map_err(|source| K8sRunnerError::RuntimeExtensions { source })?; + + Ok(K8sRunnerParts { assembly: build_k8s_runtime_assembly( scenario.deployment().clone(), runtime.node_clients, @@ -554,10 +570,12 @@ fn build_runner_parts( scenario.cluster_control_profile(), runtime.telemetry, cluster_wait, - ), + ) + .with_runtime_extensions(runtime_extensions) + .with_cleanup_guard(runtime_cleanup), node_count, duration_secs: scenario.duration().as_secs(), - } + }) } async fn build_runtime_artifacts( diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index 446f198..18d7d23 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -12,7 +12,7 @@ use testing_framework_core::{ scenario::{ Application, ClusterControlProfile, ClusterMode, Deployer, DeploymentPolicy, DynError, HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, - RetryPolicy, Runner, Scenario, ScenarioError, + RetryPolicy, Runner, RuntimeExtensions, Scenario, ScenarioError, internal::{ CleanupGuard, RuntimeAssembly, SourceOrchestrationPlan, build_source_orchestration_plan, }, @@ -83,6 +83,11 @@ pub enum ProcessDeployerError { }, #[error("runtime preflight failed: no node clients available")] RuntimePreflight, + #[error("runtime extension setup failed: {source}")] + RuntimeExtensions { + #[source] + source: DynError, + }, #[error("source orchestration failed: {source}")] SourceOrchestration { #[source] @@ -203,12 +208,19 @@ impl ProcessDeployer { let node_clients = merge_source_clients_for_local::(&source_plan, node_clients) .map_err(|source| ProcessDeployerError::SourceOrchestration { source })?; + let (runtime_extensions, runtime_cleanup) = scenario + .prepare_runtime_extensions(node_clients.clone()) + .await + .map_err(|source| ProcessDeployerError::RuntimeExtensions { source })?; + let runtime = run_context_for( scenario.deployment().clone(), node_clients, scenario.duration(), scenario.expectation_cooldown(), scenario.cluster_control_profile(), + runtime_extensions, + runtime_cleanup, None, ) .await?; @@ -242,12 +254,18 @@ impl ProcessDeployer { let node_clients = merge_source_clients_for_local::(&source_plan, node_control.node_clients()) .map_err(|source| ProcessDeployerError::SourceOrchestration { source })?; + let (runtime_extensions, runtime_cleanup) = scenario + .prepare_runtime_extensions(node_clients.clone()) + .await + .map_err(|source| ProcessDeployerError::RuntimeExtensions { source })?; let runtime = run_context_for( scenario.deployment().clone(), node_clients, scenario.duration(), scenario.expectation_cooldown(), scenario.cluster_control_profile(), + runtime_extensions, + runtime_cleanup, Some(node_control), ) .await?; @@ -497,6 +515,8 @@ async fn run_context_for( duration: Duration, expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, + runtime_extensions: RuntimeExtensions, + runtime_cleanup: Option>, node_control: Option>>, ) -> Result, ProcessDeployerError> { if node_clients.is_empty() { @@ -510,7 +530,9 @@ async fn run_context_for( expectation_cooldown, cluster_control_profile, Metrics::empty(), - ); + ) + .with_runtime_extensions(runtime_extensions) + .with_cleanup_guard(runtime_cleanup); if let Some(node_control) = node_control { assembly = assembly.with_node_control(node_control); }