mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-04-11 21:53:45 +00:00
feat(core): add scenario runtime extensions
This commit is contained in:
parent
7439f4799a
commit
d79712dd16
@ -2,7 +2,7 @@ use std::time::Duration;
|
||||
|
||||
use super::{
|
||||
Application, CleanupPolicy, DeploymentPolicy, Expectation, HttpReadinessRequirement,
|
||||
RetryPolicy, Workload, internal::CoreBuilderAccess,
|
||||
RetryPolicy, RuntimeExtensionFactory, Workload, internal::CoreBuilderAccess,
|
||||
};
|
||||
use crate::topology::{DeploymentProvider, DeploymentSeed};
|
||||
|
||||
@ -52,6 +52,14 @@ pub trait CoreBuilderExt: CoreBuilderAccess + Sized {
|
||||
self.map_core_builder(|builder| builder.with_expectation_boxed(expectation))
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
fn with_runtime_extension_factory(
|
||||
self,
|
||||
extension: Box<dyn RuntimeExtensionFactory<Self::Env>>,
|
||||
) -> Self {
|
||||
self.map_core_builder(|builder| builder.with_runtime_extension_factory(extension))
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
fn with_run_duration(self, duration: Duration) -> Self {
|
||||
self.map_core_builder(|builder| builder.with_run_duration(duration))
|
||||
|
||||
@ -13,9 +13,9 @@ use crate::{
|
||||
scenario::{
|
||||
Application, DeploymentPolicy, DynError, ExistingCluster, ExternalNodeSource,
|
||||
HttpReadinessRequirement, IntoExistingCluster, NodeControlCapability,
|
||||
ObservabilityCapability, RequiresNodeControl, builder_ops::CoreBuilderAccess,
|
||||
expectation::Expectation, runtime::context::RunMetrics, sources::ScenarioSources,
|
||||
workload::Workload,
|
||||
ObservabilityCapability, RequiresNodeControl, RuntimeExtensionFactory,
|
||||
builder_ops::CoreBuilderAccess, expectation::Expectation, runtime::context::RunMetrics,
|
||||
sources::ScenarioSources, workload::Workload,
|
||||
},
|
||||
topology::{DeploymentDescriptor, DeploymentProvider, DeploymentSeed, FixedDeploymentProvider},
|
||||
};
|
||||
@ -26,6 +26,7 @@ pub struct Builder<E: Application, Caps = ()> {
|
||||
pub(super) topology_seed: Option<DeploymentSeed>,
|
||||
pub(super) workloads: Vec<Box<dyn Workload<E>>>,
|
||||
pub(super) expectations: Vec<Box<dyn Expectation<E>>>,
|
||||
pub(super) runtime_extensions: Vec<Box<dyn RuntimeExtensionFactory<E>>>,
|
||||
pub(super) duration: Duration,
|
||||
pub(super) expectation_cooldown: Option<Duration>,
|
||||
pub(super) deployment_policy: DeploymentPolicy,
|
||||
@ -99,6 +100,14 @@ macro_rules! impl_common_builder_methods {
|
||||
self.map_core_builder(|builder| builder.with_expectation_boxed(expectation))
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_runtime_extension_factory(
|
||||
self,
|
||||
extension: Box<dyn RuntimeExtensionFactory<E>>,
|
||||
) -> Self {
|
||||
self.map_core_builder(|builder| builder.with_runtime_extension_factory(extension))
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_run_duration(self, duration: Duration) -> Self {
|
||||
self.map_core_builder(|builder| builder.with_run_duration(duration))
|
||||
@ -251,6 +260,7 @@ impl<E: Application, Caps: Default> Builder<E, Caps> {
|
||||
topology_seed: None,
|
||||
workloads: Vec::new(),
|
||||
expectations: Vec::new(),
|
||||
runtime_extensions: Vec::new(),
|
||||
duration: Duration::ZERO,
|
||||
expectation_cooldown: None,
|
||||
deployment_policy: DeploymentPolicy::default(),
|
||||
@ -357,6 +367,7 @@ impl<E: Application, Caps> Builder<E, Caps> {
|
||||
topology_seed,
|
||||
workloads,
|
||||
expectations,
|
||||
runtime_extensions,
|
||||
duration,
|
||||
expectation_cooldown,
|
||||
deployment_policy,
|
||||
@ -369,6 +380,7 @@ impl<E: Application, Caps> Builder<E, Caps> {
|
||||
topology_seed,
|
||||
workloads,
|
||||
expectations,
|
||||
runtime_extensions,
|
||||
duration,
|
||||
expectation_cooldown,
|
||||
deployment_policy,
|
||||
@ -414,6 +426,15 @@ impl<E: Application, Caps> Builder<E, Caps> {
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_runtime_extension_factory(
|
||||
mut self,
|
||||
extension: Box<dyn RuntimeExtensionFactory<E>>,
|
||||
) -> Self {
|
||||
self.runtime_extensions.push(extension);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
/// Configure the intended run duration.
|
||||
pub const fn with_run_duration(mut self, duration: Duration) -> Self {
|
||||
@ -549,6 +570,7 @@ impl<E: Application, Caps> Builder<E, Caps> {
|
||||
descriptors,
|
||||
workloads,
|
||||
parts.expectations,
|
||||
parts.runtime_extensions,
|
||||
run_plan.duration,
|
||||
run_plan.expectation_cooldown,
|
||||
parts.deployment_policy,
|
||||
@ -569,6 +591,7 @@ struct BuilderParts<E: Application, Caps> {
|
||||
topology_seed: Option<DeploymentSeed>,
|
||||
workloads: Vec<Box<dyn Workload<E>>>,
|
||||
expectations: Vec<Box<dyn Expectation<E>>>,
|
||||
runtime_extensions: Vec<Box<dyn RuntimeExtensionFactory<E>>>,
|
||||
duration: Duration,
|
||||
expectation_cooldown: Option<Duration>,
|
||||
deployment_policy: DeploymentPolicy,
|
||||
@ -583,6 +606,7 @@ impl<E: Application, Caps> BuilderParts<E, Caps> {
|
||||
topology_seed,
|
||||
workloads,
|
||||
expectations,
|
||||
runtime_extensions,
|
||||
duration,
|
||||
expectation_cooldown,
|
||||
deployment_policy,
|
||||
@ -596,6 +620,7 @@ impl<E: Application, Caps> BuilderParts<E, Caps> {
|
||||
topology_seed,
|
||||
workloads,
|
||||
expectations,
|
||||
runtime_extensions,
|
||||
duration,
|
||||
expectation_cooldown,
|
||||
deployment_policy,
|
||||
|
||||
@ -6,8 +6,14 @@ use super::builder::Builder;
|
||||
use crate::{
|
||||
scenario::{
|
||||
Application, ClusterControlProfile, ClusterMode, DeploymentPolicy, DynError,
|
||||
ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, expectation::Expectation,
|
||||
runtime::SourceOrchestrationPlan, sources::ScenarioSources, workload::Workload,
|
||||
ExistingCluster, ExternalNodeSource, HttpReadinessRequirement, NodeClients,
|
||||
expectation::Expectation,
|
||||
runtime::{
|
||||
CleanupGuard, RuntimeExtensionFactory, RuntimeExtensions, SourceOrchestrationPlan,
|
||||
prepare_runtime_extensions,
|
||||
},
|
||||
sources::ScenarioSources,
|
||||
workload::Workload,
|
||||
},
|
||||
topology::DynTopologyError,
|
||||
};
|
||||
@ -32,6 +38,7 @@ pub struct Scenario<E: Application, Caps = ()> {
|
||||
deployment: E::Deployment,
|
||||
workloads: Vec<Arc<dyn Workload<E>>>,
|
||||
expectations: Vec<Box<dyn Expectation<E>>>,
|
||||
runtime_extensions: Vec<Box<dyn RuntimeExtensionFactory<E>>>,
|
||||
duration: Duration,
|
||||
expectation_cooldown: Duration,
|
||||
deployment_policy: DeploymentPolicy,
|
||||
@ -45,6 +52,7 @@ impl<E: Application, Caps> Scenario<E, Caps> {
|
||||
deployment: E::Deployment,
|
||||
workloads: Vec<Arc<dyn Workload<E>>>,
|
||||
expectations: Vec<Box<dyn Expectation<E>>>,
|
||||
runtime_extensions: Vec<Box<dyn RuntimeExtensionFactory<E>>>,
|
||||
duration: Duration,
|
||||
expectation_cooldown: Duration,
|
||||
deployment_policy: DeploymentPolicy,
|
||||
@ -56,6 +64,7 @@ impl<E: Application, Caps> Scenario<E, Caps> {
|
||||
deployment,
|
||||
workloads,
|
||||
expectations,
|
||||
runtime_extensions,
|
||||
duration,
|
||||
expectation_cooldown,
|
||||
deployment_policy,
|
||||
@ -145,6 +154,18 @@ impl<E: Application, Caps> Scenario<E, Caps> {
|
||||
pub const fn capabilities(&self) -> &Caps {
|
||||
&self.capabilities
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub async fn prepare_runtime_extensions(
|
||||
&self,
|
||||
node_clients: NodeClients<E>,
|
||||
) -> Result<(RuntimeExtensions, Option<Box<dyn CleanupGuard>>), DynError> {
|
||||
Ok(
|
||||
prepare_runtime_extensions(&self.runtime_extensions, &self.deployment, node_clients)
|
||||
.await?
|
||||
.into_parts(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: Application> super::builder::ScenarioBuilder<E> {
|
||||
|
||||
@ -36,8 +36,9 @@ pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy};
|
||||
pub use expectation::Expectation;
|
||||
pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
|
||||
pub use runtime::{
|
||||
Deployer, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext, RunHandle,
|
||||
RunMetrics, Runner, ScenarioError, StabilizationConfig,
|
||||
Deployer, HttpReadinessRequirement, NodeClients, PreparedRuntimeExtension, ReadinessError,
|
||||
RunContext, RunHandle, RunMetrics, Runner, RuntimeExtensionFactory, RuntimeExtensions,
|
||||
ScenarioError, StabilizationConfig,
|
||||
metrics::{
|
||||
CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError,
|
||||
PrometheusEndpoint, PrometheusInstantSample,
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use super::{metrics::Metrics, node_clients::ClusterClient};
|
||||
use super::{CleanupChain, RuntimeExtensions, metrics::Metrics, node_clients::ClusterClient};
|
||||
use crate::scenario::{
|
||||
Application, ClusterControlProfile, ClusterWaitHandle, DynError, NodeClients, NodeControlHandle,
|
||||
};
|
||||
@ -19,6 +19,7 @@ pub struct RunContext<E: Application> {
|
||||
expectation_cooldown: Duration,
|
||||
cluster_control_profile: ClusterControlProfile,
|
||||
telemetry: Metrics,
|
||||
runtime_extensions: RuntimeExtensions,
|
||||
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
|
||||
cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>,
|
||||
}
|
||||
@ -32,6 +33,8 @@ pub struct RuntimeAssembly<E: Application> {
|
||||
expectation_cooldown: Duration,
|
||||
cluster_control_profile: ClusterControlProfile,
|
||||
telemetry: Metrics,
|
||||
runtime_extensions: RuntimeExtensions,
|
||||
cleanup_guard: Option<Box<dyn CleanupGuard>>,
|
||||
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
|
||||
cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>,
|
||||
}
|
||||
@ -46,6 +49,7 @@ impl<E: Application> RunContext<E> {
|
||||
expectation_cooldown: Duration,
|
||||
cluster_control_profile: ClusterControlProfile,
|
||||
telemetry: Metrics,
|
||||
runtime_extensions: RuntimeExtensions,
|
||||
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
|
||||
) -> Self {
|
||||
let metrics = RunMetrics::new(run_duration);
|
||||
@ -57,6 +61,7 @@ impl<E: Application> RunContext<E> {
|
||||
expectation_cooldown,
|
||||
cluster_control_profile,
|
||||
telemetry,
|
||||
runtime_extensions,
|
||||
node_control,
|
||||
cluster_wait: None,
|
||||
}
|
||||
@ -103,6 +108,29 @@ impl<E: Application> RunContext<E> {
|
||||
self.cluster_control_profile
|
||||
}
|
||||
|
||||
/// Returns a cloned runtime extension value by type.
|
||||
#[must_use]
|
||||
pub fn extension<T>(&self) -> Option<T>
|
||||
where
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
self.runtime_extensions.get::<T>()
|
||||
}
|
||||
|
||||
/// Returns a runtime extension value by type or an error if it is missing.
|
||||
pub fn require_extension<T>(&self) -> Result<T, DynError>
|
||||
where
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
self.extension::<T>().ok_or_else(|| {
|
||||
format!(
|
||||
"runtime extension is not available: {}",
|
||||
std::any::type_name::<T>()
|
||||
)
|
||||
.into()
|
||||
})
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn node_control(&self) -> Option<Arc<dyn NodeControlHandle<E>>> {
|
||||
self.node_control.clone()
|
||||
@ -142,6 +170,8 @@ impl<E: Application> RuntimeAssembly<E> {
|
||||
expectation_cooldown,
|
||||
cluster_control_profile,
|
||||
telemetry,
|
||||
runtime_extensions: RuntimeExtensions::default(),
|
||||
cleanup_guard: None,
|
||||
node_control: None,
|
||||
cluster_wait: None,
|
||||
}
|
||||
@ -153,6 +183,19 @@ impl<E: Application> RuntimeAssembly<E> {
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_runtime_extensions(mut self, runtime_extensions: RuntimeExtensions) -> Self {
|
||||
self.runtime_extensions = runtime_extensions;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
#[doc(hidden)]
|
||||
pub fn with_cleanup_guard(mut self, cleanup_guard: Option<Box<dyn CleanupGuard>>) -> Self {
|
||||
self.cleanup_guard = chain_cleanup_guards(self.cleanup_guard.take(), cleanup_guard);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self {
|
||||
self.cluster_wait = Some(cluster_wait);
|
||||
@ -168,6 +211,7 @@ impl<E: Application> RuntimeAssembly<E> {
|
||||
self.expectation_cooldown,
|
||||
self.cluster_control_profile,
|
||||
self.telemetry,
|
||||
self.runtime_extensions,
|
||||
self.node_control,
|
||||
);
|
||||
|
||||
@ -178,7 +222,11 @@ impl<E: Application> RuntimeAssembly<E> {
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn build_runner(self, cleanup_guard: Option<Box<dyn CleanupGuard>>) -> super::Runner<E> {
|
||||
pub fn build_runner(
|
||||
mut self,
|
||||
cleanup_guard: Option<Box<dyn CleanupGuard>>,
|
||||
) -> super::Runner<E> {
|
||||
let cleanup_guard = chain_cleanup_guards(self.cleanup_guard.take(), cleanup_guard);
|
||||
super::Runner::new(self.build_context(), cleanup_guard)
|
||||
}
|
||||
}
|
||||
@ -192,12 +240,30 @@ impl<E: Application> From<RunContext<E>> for RuntimeAssembly<E> {
|
||||
expectation_cooldown: context.expectation_cooldown,
|
||||
cluster_control_profile: context.cluster_control_profile,
|
||||
telemetry: context.telemetry,
|
||||
runtime_extensions: context.runtime_extensions,
|
||||
cleanup_guard: None,
|
||||
node_control: context.node_control,
|
||||
cluster_wait: context.cluster_wait,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn chain_cleanup_guards(
|
||||
left: Option<Box<dyn CleanupGuard>>,
|
||||
right: Option<Box<dyn CleanupGuard>>,
|
||||
) -> Option<Box<dyn CleanupGuard>> {
|
||||
match (left, right) {
|
||||
(None, None) => None,
|
||||
(Some(guard), None) | (None, Some(guard)) => Some(guard),
|
||||
(Some(left), Some(right)) => {
|
||||
let mut chain = CleanupChain::default();
|
||||
chain.push(left);
|
||||
chain.push(right);
|
||||
Some(Box::new(chain))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle returned by the runner to control the lifecycle of the run.
|
||||
pub struct RunHandle<E: Application> {
|
||||
run_context: Arc<RunContext<E>>,
|
||||
|
||||
179
testing-framework/core/src/scenario/runtime/extensions.rs
Normal file
179
testing-framework/core/src/scenario/runtime/extensions.rs
Normal file
@ -0,0 +1,179 @@
|
||||
use std::{
|
||||
any::{Any, TypeId, type_name},
|
||||
collections::HashMap,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use super::context::CleanupGuard;
|
||||
use crate::scenario::{Application, DynError, NodeClients};
|
||||
|
||||
/// Prepared runtime extension value plus optional cleanup.
|
||||
pub struct PreparedRuntimeExtension {
|
||||
type_id: TypeId,
|
||||
type_name: &'static str,
|
||||
value: Box<dyn Any + Send + Sync>,
|
||||
cleanup: Option<Box<dyn CleanupGuard>>,
|
||||
}
|
||||
|
||||
impl PreparedRuntimeExtension {
|
||||
/// Builds a runtime extension value with no extra cleanup.
|
||||
#[must_use]
|
||||
pub fn new<T>(value: T) -> Self
|
||||
where
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
Self {
|
||||
type_id: TypeId::of::<T>(),
|
||||
type_name: type_name::<T>(),
|
||||
value: Box::new(value),
|
||||
cleanup: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a runtime extension value with a custom cleanup guard.
|
||||
#[must_use]
|
||||
pub fn with_cleanup<T>(value: T, cleanup: Box<dyn CleanupGuard>) -> Self
|
||||
where
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
Self {
|
||||
cleanup: Some(cleanup),
|
||||
..Self::new(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a runtime extension value backed by a background task.
|
||||
#[must_use]
|
||||
pub fn from_task<T>(value: T, task: JoinHandle<()>) -> Self
|
||||
where
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
Self::with_cleanup(value, Box::new(TaskCleanupGuard::new(task)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Factory that prepares a scenario runtime extension once node clients are
|
||||
/// available.
|
||||
#[async_trait]
|
||||
pub trait RuntimeExtensionFactory<E: Application>: Send + Sync {
|
||||
/// Prepares one extension value for this scenario run.
|
||||
async fn prepare(
|
||||
&self,
|
||||
deployment: &E::Deployment,
|
||||
node_clients: NodeClients<E>,
|
||||
) -> Result<PreparedRuntimeExtension, DynError>;
|
||||
}
|
||||
|
||||
/// Type-indexed runtime extension store exposed through `RunContext`.
|
||||
#[derive(Default)]
|
||||
pub struct RuntimeExtensions {
|
||||
values: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
|
||||
}
|
||||
|
||||
impl RuntimeExtensions {
|
||||
/// Returns a cloned extension value by type.
|
||||
#[must_use]
|
||||
pub fn get<T>(&self) -> Option<T>
|
||||
where
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
self.values
|
||||
.get(&TypeId::of::<T>())
|
||||
.and_then(|value| value.downcast_ref::<T>())
|
||||
.cloned()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct CleanupChain {
|
||||
guards: Vec<Box<dyn CleanupGuard>>,
|
||||
}
|
||||
|
||||
impl CleanupChain {
|
||||
pub(crate) fn push(&mut self, guard: Box<dyn CleanupGuard>) {
|
||||
self.guards.push(guard);
|
||||
}
|
||||
|
||||
pub(crate) fn push_optional(&mut self, guard: Option<Box<dyn CleanupGuard>>) {
|
||||
if let Some(guard) = guard {
|
||||
self.guards.push(guard);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_guard(self) -> Option<Box<dyn CleanupGuard>> {
|
||||
if self.guards.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Box::new(self))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CleanupGuard for CleanupChain {
|
||||
fn cleanup(mut self: Box<Self>) {
|
||||
while let Some(guard) = self.guards.pop() {
|
||||
guard.cleanup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct PreparedRuntimeExtensions {
|
||||
values: RuntimeExtensions,
|
||||
cleanup: CleanupChain,
|
||||
}
|
||||
|
||||
impl PreparedRuntimeExtensions {
|
||||
pub(crate) fn into_parts(self) -> (RuntimeExtensions, Option<Box<dyn CleanupGuard>>) {
|
||||
(self.values, self.cleanup.into_guard())
|
||||
}
|
||||
|
||||
fn insert(&mut self, extension: PreparedRuntimeExtension) -> Result<(), DynError> {
|
||||
let PreparedRuntimeExtension {
|
||||
type_id,
|
||||
type_name,
|
||||
value,
|
||||
cleanup,
|
||||
} = extension;
|
||||
|
||||
if self.values.values.contains_key(&type_id) {
|
||||
return Err(format!("duplicate runtime extension type registered: {type_name}").into());
|
||||
}
|
||||
|
||||
self.values.values.insert(type_id, value);
|
||||
self.cleanup.push_optional(cleanup);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn prepare_runtime_extensions<E: Application>(
|
||||
factories: &[Box<dyn RuntimeExtensionFactory<E>>],
|
||||
deployment: &E::Deployment,
|
||||
node_clients: NodeClients<E>,
|
||||
) -> Result<PreparedRuntimeExtensions, DynError> {
|
||||
let mut prepared = PreparedRuntimeExtensions::default();
|
||||
|
||||
for factory in factories {
|
||||
prepared.insert(factory.prepare(deployment, node_clients.clone()).await?)?;
|
||||
}
|
||||
|
||||
Ok(prepared)
|
||||
}
|
||||
|
||||
struct TaskCleanupGuard {
|
||||
handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl TaskCleanupGuard {
|
||||
const fn new(handle: JoinHandle<()>) -> Self {
|
||||
Self { handle }
|
||||
}
|
||||
}
|
||||
|
||||
impl CleanupGuard for TaskCleanupGuard {
|
||||
fn cleanup(self: Box<Self>) {
|
||||
self.handle.abort();
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,6 @@
|
||||
pub mod context;
|
||||
mod deployer;
|
||||
mod extensions;
|
||||
mod internal;
|
||||
mod inventory;
|
||||
pub mod metrics;
|
||||
@ -9,6 +10,8 @@ mod runner;
|
||||
|
||||
pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics, RuntimeAssembly};
|
||||
pub use deployer::{Deployer, ScenarioError};
|
||||
pub(crate) use extensions::{CleanupChain, prepare_runtime_extensions};
|
||||
pub use extensions::{PreparedRuntimeExtension, RuntimeExtensionFactory, RuntimeExtensions};
|
||||
#[doc(hidden)]
|
||||
pub use internal::{
|
||||
ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, ManagedSource,
|
||||
|
||||
@ -168,6 +168,10 @@ where
|
||||
.await?;
|
||||
|
||||
self.ensure_non_empty_node_clients(&node_clients)?;
|
||||
let (runtime_extensions, runtime_cleanup) = scenario
|
||||
.prepare_runtime_extensions(node_clients.clone())
|
||||
.await
|
||||
.map_err(|source| ComposeRunnerError::RuntimeExtensions { source })?;
|
||||
|
||||
let node_control = self.attached_node_control::<Caps>(scenario)?;
|
||||
let cluster_wait = self.attached_cluster_wait(scenario)?;
|
||||
@ -180,7 +184,9 @@ where
|
||||
observability.telemetry_handle()?,
|
||||
node_control,
|
||||
cluster_wait,
|
||||
);
|
||||
)
|
||||
.with_runtime_extensions(runtime_extensions)
|
||||
.with_cleanup_guard(runtime_cleanup);
|
||||
|
||||
Ok(assembly.build_runner(None))
|
||||
}
|
||||
@ -277,6 +283,7 @@ where
|
||||
maybe_print_endpoints(&observability, &deployed.host, &deployed.host_ports);
|
||||
|
||||
let input = RuntimeBuildInput {
|
||||
scenario,
|
||||
deployed: &deployed,
|
||||
descriptors: prepared.descriptors.clone(),
|
||||
duration: scenario.duration(),
|
||||
@ -286,7 +293,7 @@ where
|
||||
node_control,
|
||||
cluster_wait,
|
||||
};
|
||||
let runtime = build_compose_runtime::<E>(input).await?;
|
||||
let runtime = build_compose_runtime::<E, Caps>(input).await?;
|
||||
let cleanup_guard = make_cleanup_guard(prepared.environment.into_cleanup()?);
|
||||
|
||||
info!(
|
||||
@ -442,7 +449,8 @@ struct ComposeRuntime<E: ComposeDeployEnv> {
|
||||
assembly: RuntimeAssembly<E>,
|
||||
}
|
||||
|
||||
struct RuntimeBuildInput<'a, E: ComposeDeployEnv> {
|
||||
struct RuntimeBuildInput<'a, E: ComposeDeployEnv, Caps> {
|
||||
scenario: &'a Scenario<E, Caps>,
|
||||
deployed: &'a DeployedNodes<E>,
|
||||
descriptors: E::Deployment,
|
||||
duration: Duration,
|
||||
@ -453,14 +461,20 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> {
|
||||
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
|
||||
}
|
||||
|
||||
async fn build_compose_runtime<E: ComposeDeployEnv>(
|
||||
input: RuntimeBuildInput<'_, E>,
|
||||
async fn build_compose_runtime<E: ComposeDeployEnv, Caps>(
|
||||
input: RuntimeBuildInput<'_, E, Caps>,
|
||||
) -> Result<ComposeRuntime<E>, ComposeRunnerError> {
|
||||
let node_clients = input.deployed.node_clients.clone();
|
||||
if node_clients.is_empty() {
|
||||
return Err(ComposeRunnerError::RuntimePreflight);
|
||||
}
|
||||
|
||||
let (runtime_extensions, runtime_cleanup) = input
|
||||
.scenario
|
||||
.prepare_runtime_extensions(node_clients.clone())
|
||||
.await
|
||||
.map_err(|source| ComposeRunnerError::RuntimeExtensions { source })?;
|
||||
|
||||
let assembly = build_runtime_assembly(
|
||||
input.descriptors,
|
||||
node_clients,
|
||||
@ -470,7 +484,9 @@ async fn build_compose_runtime<E: ComposeDeployEnv>(
|
||||
input.telemetry,
|
||||
input.node_control,
|
||||
input.cluster_wait,
|
||||
);
|
||||
)
|
||||
.with_runtime_extensions(runtime_extensions)
|
||||
.with_cleanup_guard(runtime_cleanup);
|
||||
|
||||
Ok(ComposeRuntime { assembly })
|
||||
}
|
||||
|
||||
@ -35,6 +35,11 @@ pub enum ComposeRunnerError {
|
||||
BlockFeedMissing,
|
||||
#[error("runtime preflight failed: no node clients available")]
|
||||
RuntimePreflight,
|
||||
#[error("runtime extension setup failed: {source}")]
|
||||
RuntimeExtensions {
|
||||
#[source]
|
||||
source: DynError,
|
||||
},
|
||||
#[error("source orchestration failed: {source}")]
|
||||
SourceOrchestration {
|
||||
#[source]
|
||||
|
||||
@ -115,6 +115,11 @@ pub enum K8sRunnerError {
|
||||
InternalInvariant { message: String },
|
||||
#[error("runtime preflight failed: no node clients available")]
|
||||
RuntimePreflight,
|
||||
#[error("runtime extension setup failed: {source}")]
|
||||
RuntimeExtensions {
|
||||
#[source]
|
||||
source: DynError,
|
||||
},
|
||||
#[error("source orchestration failed: {source}")]
|
||||
SourceOrchestration {
|
||||
#[source]
|
||||
@ -209,7 +214,7 @@ where
|
||||
log_configured_observability(&observability);
|
||||
maybe_print_endpoints::<E>(&observability, &runtime.node_clients);
|
||||
|
||||
let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait);
|
||||
let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait).await?;
|
||||
let runner = finalize_runner::<E>(&mut cluster, parts)?;
|
||||
Ok((runner, metadata))
|
||||
}
|
||||
@ -228,6 +233,10 @@ where
|
||||
let node_clients = resolve_node_clients(&source_plan, source_providers).await?;
|
||||
|
||||
ensure_non_empty_node_clients(&node_clients)?;
|
||||
let (runtime_extensions, runtime_cleanup) = scenario
|
||||
.prepare_runtime_extensions(node_clients.clone())
|
||||
.await
|
||||
.map_err(|source| K8sRunnerError::RuntimeExtensions { source })?;
|
||||
|
||||
let telemetry = observability.telemetry_handle()?;
|
||||
let cluster_wait = attached_cluster_wait::<E, Caps>(scenario, client)?;
|
||||
@ -239,6 +248,8 @@ where
|
||||
scenario.cluster_control_profile(),
|
||||
telemetry,
|
||||
)
|
||||
.with_runtime_extensions(runtime_extensions)
|
||||
.with_cleanup_guard(runtime_cleanup)
|
||||
.with_cluster_wait(cluster_wait);
|
||||
|
||||
Ok(context.build_runner(None))
|
||||
@ -539,13 +550,18 @@ struct RuntimeArtifacts<E: K8sDeployEnv> {
|
||||
telemetry: Metrics,
|
||||
}
|
||||
|
||||
fn build_runner_parts<E: K8sDeployEnv, Caps>(
|
||||
async fn build_runner_parts<E: K8sDeployEnv, Caps>(
|
||||
scenario: &Scenario<E, Caps>,
|
||||
node_count: usize,
|
||||
runtime: RuntimeArtifacts<E>,
|
||||
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
|
||||
) -> K8sRunnerParts<E> {
|
||||
K8sRunnerParts {
|
||||
) -> Result<K8sRunnerParts<E>, K8sRunnerError> {
|
||||
let (runtime_extensions, runtime_cleanup) = scenario
|
||||
.prepare_runtime_extensions(runtime.node_clients.clone())
|
||||
.await
|
||||
.map_err(|source| K8sRunnerError::RuntimeExtensions { source })?;
|
||||
|
||||
Ok(K8sRunnerParts {
|
||||
assembly: build_k8s_runtime_assembly(
|
||||
scenario.deployment().clone(),
|
||||
runtime.node_clients,
|
||||
@ -554,10 +570,12 @@ fn build_runner_parts<E: K8sDeployEnv, Caps>(
|
||||
scenario.cluster_control_profile(),
|
||||
runtime.telemetry,
|
||||
cluster_wait,
|
||||
),
|
||||
)
|
||||
.with_runtime_extensions(runtime_extensions)
|
||||
.with_cleanup_guard(runtime_cleanup),
|
||||
node_count,
|
||||
duration_secs: scenario.duration().as_secs(),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn build_runtime_artifacts<E: K8sDeployEnv>(
|
||||
|
||||
@ -12,7 +12,7 @@ use testing_framework_core::{
|
||||
scenario::{
|
||||
Application, ClusterControlProfile, ClusterMode, Deployer, DeploymentPolicy, DynError,
|
||||
HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle,
|
||||
RetryPolicy, Runner, Scenario, ScenarioError,
|
||||
RetryPolicy, Runner, RuntimeExtensions, Scenario, ScenarioError,
|
||||
internal::{
|
||||
CleanupGuard, RuntimeAssembly, SourceOrchestrationPlan, build_source_orchestration_plan,
|
||||
},
|
||||
@ -83,6 +83,11 @@ pub enum ProcessDeployerError {
|
||||
},
|
||||
#[error("runtime preflight failed: no node clients available")]
|
||||
RuntimePreflight,
|
||||
#[error("runtime extension setup failed: {source}")]
|
||||
RuntimeExtensions {
|
||||
#[source]
|
||||
source: DynError,
|
||||
},
|
||||
#[error("source orchestration failed: {source}")]
|
||||
SourceOrchestration {
|
||||
#[source]
|
||||
@ -203,12 +208,19 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
|
||||
let node_clients = merge_source_clients_for_local::<E>(&source_plan, node_clients)
|
||||
.map_err(|source| ProcessDeployerError::SourceOrchestration { source })?;
|
||||
|
||||
let (runtime_extensions, runtime_cleanup) = scenario
|
||||
.prepare_runtime_extensions(node_clients.clone())
|
||||
.await
|
||||
.map_err(|source| ProcessDeployerError::RuntimeExtensions { source })?;
|
||||
|
||||
let runtime = run_context_for(
|
||||
scenario.deployment().clone(),
|
||||
node_clients,
|
||||
scenario.duration(),
|
||||
scenario.expectation_cooldown(),
|
||||
scenario.cluster_control_profile(),
|
||||
runtime_extensions,
|
||||
runtime_cleanup,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
@ -242,12 +254,18 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
|
||||
let node_clients =
|
||||
merge_source_clients_for_local::<E>(&source_plan, node_control.node_clients())
|
||||
.map_err(|source| ProcessDeployerError::SourceOrchestration { source })?;
|
||||
let (runtime_extensions, runtime_cleanup) = scenario
|
||||
.prepare_runtime_extensions(node_clients.clone())
|
||||
.await
|
||||
.map_err(|source| ProcessDeployerError::RuntimeExtensions { source })?;
|
||||
let runtime = run_context_for(
|
||||
scenario.deployment().clone(),
|
||||
node_clients,
|
||||
scenario.duration(),
|
||||
scenario.expectation_cooldown(),
|
||||
scenario.cluster_control_profile(),
|
||||
runtime_extensions,
|
||||
runtime_cleanup,
|
||||
Some(node_control),
|
||||
)
|
||||
.await?;
|
||||
@ -497,6 +515,8 @@ async fn run_context_for<E: Application>(
|
||||
duration: Duration,
|
||||
expectation_cooldown: Duration,
|
||||
cluster_control_profile: ClusterControlProfile,
|
||||
runtime_extensions: RuntimeExtensions,
|
||||
runtime_cleanup: Option<Box<dyn CleanupGuard>>,
|
||||
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
|
||||
) -> Result<RuntimeContext<E>, ProcessDeployerError> {
|
||||
if node_clients.is_empty() {
|
||||
@ -510,7 +530,9 @@ async fn run_context_for<E: Application>(
|
||||
expectation_cooldown,
|
||||
cluster_control_profile,
|
||||
Metrics::empty(),
|
||||
);
|
||||
)
|
||||
.with_runtime_extensions(runtime_extensions)
|
||||
.with_cleanup_guard(runtime_cleanup);
|
||||
if let Some(node_control) = node_control {
|
||||
assembly = assembly.with_node_control(node_control);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user