Use runtime assembly directly in deployers

This commit is contained in:
andrussal 2026-03-08 14:37:31 +01:00
parent a14d616ee6
commit 19a0c904c1
3 changed files with 47 additions and 59 deletions

View File

@ -5,9 +5,9 @@ use testing_framework_core::{
scenario::{
ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle,
FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner,
RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
@ -157,7 +157,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
let node_control = self.attached_node_control::<Caps>(scenario)?;
let cluster_wait = self.attached_cluster_wait(scenario)?;
let (feed, feed_task) = spawn_block_feed_with_retry::<E>(&node_clients).await?;
let context = build_run_context(
let assembly = build_runtime_assembly(
scenario.deployment().clone(),
node_clients,
scenario.duration(),
@ -169,7 +169,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
);
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(feed_task);
Ok(RuntimeAssembly::from(context).build_runner(Some(cleanup_guard)))
Ok(assembly.build_runner(Some(cleanup_guard)))
}
fn source_providers(&self, managed_clients: Vec<E::NodeClient>) -> SourceProviders<E> {
@ -283,7 +283,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
"compose runtime prepared"
);
Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(cleanup_guard)))
Ok(runtime.assembly.build_runner(Some(cleanup_guard)))
}
fn maybe_node_control<Caps>(
@ -379,7 +379,7 @@ struct DeployedNodes<E: ComposeDeployEnv> {
}
struct ComposeRuntime<E: ComposeDeployEnv> {
context: RunContext<E>,
assembly: RuntimeAssembly<E>,
feed_task: FeedHandle,
}
@ -408,7 +408,7 @@ async fn build_compose_runtime<E: ComposeDeployEnv>(
.start_block_feed(&node_clients, input.environment)
.await?;
let context = build_run_context(
let assembly = build_runtime_assembly(
input.descriptors,
node_clients,
input.duration,
@ -419,7 +419,10 @@ async fn build_compose_runtime<E: ComposeDeployEnv>(
input.cluster_wait,
);
Ok(ComposeRuntime { context, feed_task })
Ok(ComposeRuntime {
assembly,
feed_task,
})
}
async fn deploy_nodes<E: ComposeDeployEnv>(
@ -452,7 +455,7 @@ async fn deploy_nodes<E: ComposeDeployEnv>(
})
}
fn build_run_context<E: ComposeDeployEnv>(
fn build_runtime_assembly<E: ComposeDeployEnv>(
descriptors: E::Deployment,
node_clients: NodeClients<E>,
run_duration: Duration,
@ -461,7 +464,7 @@ fn build_run_context<E: ComposeDeployEnv>(
feed: <E::FeedRuntime as FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RunContext<E> {
) -> RuntimeAssembly<E> {
let mut assembly = RuntimeAssembly::new(
descriptors,
node_clients,
@ -476,7 +479,7 @@ fn build_run_context<E: ComposeDeployEnv>(
assembly = assembly.with_node_control(node_control);
}
assembly.build_context()
assembly
}
fn resolve_observability_inputs<E, Caps>(

View File

@ -8,7 +8,7 @@ use testing_framework_core::{
Application, ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, Deployer,
DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError,
NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl,
RunContext, Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders,
Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
@ -204,10 +204,10 @@ where
runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?;
ensure_non_empty_node_clients(&runtime.node_clients)?;
let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait);
log_configured_observability(&observability);
maybe_print_endpoints::<E>(&observability, &parts.node_clients);
maybe_print_endpoints::<E>(&observability, &runtime.node_clients);
let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait);
let runner = finalize_runner::<E>(&mut cluster, parts)?;
Ok((runner, metadata))
}
@ -497,15 +497,18 @@ fn build_runner_parts<E: K8sDeployEnv, Caps>(
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> K8sRunnerParts<E> {
K8sRunnerParts {
descriptors: scenario.deployment().clone(),
node_clients: runtime.node_clients,
duration: scenario.duration(),
expectation_cooldown: scenario.expectation_cooldown(),
telemetry: runtime.telemetry,
feed: runtime.feed,
assembly: build_k8s_runtime_assembly(
scenario.deployment().clone(),
runtime.node_clients,
scenario.duration(),
scenario.expectation_cooldown(),
runtime.telemetry,
runtime.feed,
cluster_wait,
),
feed_task: runtime.feed_task,
node_count,
cluster_wait,
duration_secs: scenario.duration().as_secs(),
}
}
@ -593,15 +596,10 @@ fn maybe_print_endpoints<E: K8sDeployEnv>(
}
struct K8sRunnerParts<E: K8sDeployEnv> {
descriptors: E::Deployment,
node_clients: NodeClients<E>,
duration: Duration,
expectation_cooldown: Duration,
telemetry: Metrics,
feed: Feed<E>,
assembly: RuntimeAssembly<E>,
feed_task: FeedHandle,
node_count: usize,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
duration_secs: u64,
}
fn finalize_runner<E: K8sDeployEnv>(
@ -612,36 +610,21 @@ fn finalize_runner<E: K8sDeployEnv>(
let (cleanup, port_forwards) = environment.into_cleanup()?;
let K8sRunnerParts {
descriptors,
node_clients,
duration,
expectation_cooldown,
telemetry,
feed,
assembly,
feed_task,
node_count,
cluster_wait,
duration_secs,
} = parts;
let duration_secs = duration.as_secs();
let cleanup_guard: Box<dyn CleanupGuard> =
Box::new(K8sCleanupGuard::new(cleanup, feed_task, port_forwards));
let context = build_k8s_run_context(
descriptors,
node_clients,
duration,
expectation_cooldown,
telemetry,
feed,
cluster_wait,
);
info!(
nodes = node_count,
duration_secs, "k8s deployment ready; handing control to scenario runner"
);
Ok(RuntimeAssembly::from(context).build_runner(Some(cleanup_guard)))
Ok(assembly.build_runner(Some(cleanup_guard)))
}
fn take_ready_cluster(
@ -654,7 +637,7 @@ fn take_ready_cluster(
})
}
fn build_k8s_run_context<E: K8sDeployEnv>(
fn build_k8s_runtime_assembly<E: K8sDeployEnv>(
descriptors: E::Deployment,
node_clients: NodeClients<E>,
duration: Duration,
@ -662,7 +645,7 @@ fn build_k8s_run_context<E: K8sDeployEnv>(
telemetry: Metrics,
feed: Feed<E>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RunContext<E> {
) -> RuntimeAssembly<E> {
RuntimeAssembly::new(
descriptors,
node_clients,
@ -672,7 +655,6 @@ fn build_k8s_run_context<E: K8sDeployEnv>(
feed,
)
.with_cluster_wait(cluster_wait)
.build_context()
}
fn endpoint_or_disabled(endpoint: Option<&Url>) -> String {

View File

@ -12,8 +12,8 @@ use testing_framework_core::{
scenario::{
Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime,
HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle,
RetryPolicy, RunContext, Runner, RuntimeAssembly, Scenario, ScenarioError,
SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed,
RetryPolicy, Runner, RuntimeAssembly, Scenario, ScenarioError, SourceOrchestrationPlan,
build_source_orchestration_plan, spawn_feed,
},
topology::DeploymentDescriptor,
};
@ -218,7 +218,7 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
let cleanup_guard: Box<dyn CleanupGuard> =
Box::new(LocalProcessGuard::<E>::new(nodes, runtime.feed_task));
Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(cleanup_guard)))
Ok(runtime.assembly.build_runner(Some(cleanup_guard)))
}
async fn deploy_with_node_control(
@ -252,7 +252,9 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
)
.await?;
Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(Box::new(runtime.feed_task))))
Ok(runtime
.assembly
.build_runner(Some(Box::new(runtime.feed_task))))
}
fn node_control_from(
@ -472,7 +474,7 @@ fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_
}
struct RuntimeContext<E: Application> {
context: RunContext<E>,
assembly: RuntimeAssembly<E>,
feed_task: FeedHandle,
}
@ -500,7 +502,8 @@ async fn run_context_for<E: Application>(
assembly = assembly.with_node_control(node_control);
}
let context = assembly.build_context();
Ok(RuntimeContext { context, feed_task })
Ok(RuntimeContext {
assembly,
feed_task,
})
}