Move runtime assembly out of runner and context

This commit is contained in:
andrussal 2026-03-08 14:27:09 +01:00
parent ad288e7421
commit 120b8879a4
7 changed files with 122 additions and 42 deletions

View File

@ -39,7 +39,7 @@ pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
pub use runtime::{
ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, CleanupGuard,
Deployer, Feed, FeedHandle, FeedRuntime, HttpReadinessRequirement, ManagedSource, NodeClients,
ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError,
ReadinessError, RunContext, RunHandle, RunMetrics, Runner, RuntimeAssembly, ScenarioError,
SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider,
build_source_orchestration_plan,
metrics::{

View File

@ -21,11 +21,23 @@ pub struct RunContext<E: Application> {
cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>,
}
/// Low-level runtime assembly input used by deployers to build a runnable
/// cluster context.
pub struct RuntimeAssembly<E: Application> {
descriptors: E::Deployment,
node_clients: NodeClients<E>,
run_duration: Duration,
expectation_cooldown: Duration,
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>,
}
impl<E: Application> RunContext<E> {
/// Builds a run context from prepared deployment/runtime artifacts.
#[must_use]
#[doc(hidden)]
pub fn new(
pub(crate) fn new(
descriptors: E::Deployment,
node_clients: NodeClients<E>,
run_duration: Duration,
@ -49,8 +61,7 @@ impl<E: Application> RunContext<E> {
}
#[must_use]
#[doc(hidden)]
pub fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self {
pub(crate) fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self {
self.cluster_wait = Some(cluster_wait);
self
}
@ -122,6 +133,79 @@ impl<E: Application> RunContext<E> {
}
}
impl<E: Application> RuntimeAssembly<E> {
#[must_use]
pub fn new(
descriptors: E::Deployment,
node_clients: NodeClients<E>,
run_duration: Duration,
expectation_cooldown: Duration,
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
) -> Self {
Self {
descriptors,
node_clients,
run_duration,
expectation_cooldown,
telemetry,
feed,
node_control: None,
cluster_wait: None,
}
}
#[must_use]
pub fn with_node_control(mut self, node_control: Arc<dyn NodeControlHandle<E>>) -> Self {
self.node_control = Some(node_control);
self
}
#[must_use]
pub fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self {
self.cluster_wait = Some(cluster_wait);
self
}
#[must_use]
pub fn build_context(self) -> RunContext<E> {
let context = RunContext::new(
self.descriptors,
self.node_clients,
self.run_duration,
self.expectation_cooldown,
self.telemetry,
self.feed,
self.node_control,
);
match self.cluster_wait {
Some(cluster_wait) => context.with_cluster_wait(cluster_wait),
None => context,
}
}
#[must_use]
pub fn build_runner(self, cleanup_guard: Option<Box<dyn CleanupGuard>>) -> super::Runner<E> {
super::Runner::new(self.build_context(), cleanup_guard)
}
}
impl<E: Application> From<RunContext<E>> for RuntimeAssembly<E> {
fn from(context: RunContext<E>) -> Self {
Self {
descriptors: context.descriptors,
node_clients: context.node_clients,
run_duration: context.metrics.run_duration(),
expectation_cooldown: context.expectation_cooldown,
telemetry: context.telemetry,
feed: context.feed,
node_control: context.node_control,
cluster_wait: context.cluster_wait,
}
}
}
/// Handle returned by the runner to control the lifecycle of the run.
pub struct RunHandle<E: Application> {
run_context: Arc<RunContext<E>>,
@ -137,16 +221,6 @@ impl<E: Application> Drop for RunHandle<E> {
}
impl<E: Application> RunHandle<E> {
#[must_use]
/// Build a handle from owned context and optional cleanup guard.
#[doc(hidden)]
pub fn new(context: RunContext<E>, cleanup_guard: Option<Box<dyn CleanupGuard>>) -> Self {
Self {
run_context: Arc::new(context),
cleanup_guard,
}
}
#[must_use]
/// Build a handle from a shared context reference.
pub(crate) fn from_shared(

View File

@ -9,7 +9,7 @@ pub mod readiness;
mod runner;
use async_trait::async_trait;
pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics};
pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics, RuntimeAssembly};
pub use deployer::{Deployer, ScenarioError};
pub use node_clients::NodeClients;
#[doc(hidden)]

View File

@ -34,10 +34,11 @@ impl<E: Application> Drop for Runner<E> {
}
impl<E: Application> Runner<E> {
/// Construct a runner from the run context and optional cleanup guard.
#[must_use]
#[doc(hidden)]
pub fn new(context: RunContext<E>, cleanup_guard: Option<Box<dyn CleanupGuard>>) -> Self {
pub(crate) fn new(
context: RunContext<E>,
cleanup_guard: Option<Box<dyn CleanupGuard>>,
) -> Self {
Self {
context: Arc::new(context),
cleanup_guard,

View File

@ -6,8 +6,8 @@ use testing_framework_core::{
ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle,
FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
Runner, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
@ -169,7 +169,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
);
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(feed_task);
Ok(Runner::new(context, Some(cleanup_guard)))
Ok(RuntimeAssembly::from(context).build_runner(Some(cleanup_guard)))
}
fn source_providers(&self, managed_clients: Vec<E::NodeClient>) -> SourceProviders<E> {
@ -283,7 +283,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
"compose runtime prepared"
);
Ok(Runner::new(runtime.context, Some(cleanup_guard)))
Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(cleanup_guard)))
}
fn maybe_node_control<Caps>(
@ -462,16 +462,21 @@ fn build_run_context<E: ComposeDeployEnv>(
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RunContext<E> {
RunContext::new(
let mut assembly = RuntimeAssembly::new(
descriptors,
node_clients,
run_duration,
expectation_cooldown,
telemetry,
feed,
node_control,
)
.with_cluster_wait(cluster_wait)
.with_cluster_wait(cluster_wait);
if let Some(node_control) = node_control {
assembly = assembly.with_node_control(node_control);
}
assembly.build_context()
}
fn resolve_observability_inputs<E, Caps>(

View File

@ -8,7 +8,7 @@ use testing_framework_core::{
Application, ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, Deployer,
DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError,
NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl,
RunContext, Runner, Scenario, SourceOrchestrationPlan, SourceProviders,
RunContext, Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
@ -230,18 +230,17 @@ where
let telemetry = observability.telemetry_handle()?;
let (feed, feed_task) = spawn_block_feed_with::<E>(&node_clients).await?;
let cluster_wait = attached_cluster_wait::<E, Caps>(scenario, client)?;
let context = RunContext::new(
let context = RuntimeAssembly::new(
scenario.deployment().clone(),
node_clients,
scenario.duration(),
scenario.expectation_cooldown(),
telemetry,
feed,
None,
)
.with_cluster_wait(cluster_wait);
Ok(Runner::new(context, Some(Box::new(feed_task))))
Ok(context.build_runner(Some(Box::new(feed_task))))
}
fn attached_metadata<E, Caps>(scenario: &Scenario<E, Caps>) -> K8sDeploymentMetadata
@ -642,7 +641,7 @@ fn finalize_runner<E: K8sDeployEnv>(
duration_secs, "k8s deployment ready; handing control to scenario runner"
);
Ok(Runner::new(context, Some(cleanup_guard)))
Ok(RuntimeAssembly::from(context).build_runner(Some(cleanup_guard)))
}
fn take_ready_cluster(
@ -664,16 +663,16 @@ fn build_k8s_run_context<E: K8sDeployEnv>(
feed: Feed<E>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RunContext<E> {
RunContext::new(
RuntimeAssembly::new(
descriptors,
node_clients,
duration,
expectation_cooldown,
telemetry,
feed,
None,
)
.with_cluster_wait(cluster_wait)
.build_context()
}
fn endpoint_or_disabled(endpoint: Option<&Url>) -> String {

View File

@ -12,8 +12,8 @@ use testing_framework_core::{
scenario::{
Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime,
HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle,
RetryPolicy, RunContext, Runner, Scenario, ScenarioError, SourceOrchestrationPlan,
build_source_orchestration_plan, spawn_feed,
RetryPolicy, RunContext, Runner, RuntimeAssembly, Scenario, ScenarioError,
SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed,
},
topology::DeploymentDescriptor,
};
@ -218,7 +218,7 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
let cleanup_guard: Box<dyn CleanupGuard> =
Box::new(LocalProcessGuard::<E>::new(nodes, runtime.feed_task));
Ok(Runner::new(runtime.context, Some(cleanup_guard)))
Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(cleanup_guard)))
}
async fn deploy_with_node_control(
@ -252,10 +252,7 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
)
.await?;
Ok(Runner::new(
runtime.context,
Some(Box::new(runtime.feed_task)),
))
Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(Box::new(runtime.feed_task))))
}
fn node_control_from(
@ -491,15 +488,19 @@ async fn run_context_for<E: Application>(
}
let (feed, feed_task) = spawn_feed_with::<E>(&node_clients).await?;
let context = RunContext::new(
let mut assembly = RuntimeAssembly::new(
descriptors,
node_clients,
duration,
expectation_cooldown,
Metrics::empty(),
feed,
node_control,
);
if let Some(node_control) = node_control {
assembly = assembly.with_node_control(node_control);
}
let context = assembly.build_context();
Ok(RuntimeContext { context, feed_task })
}