diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index e8ae1eb..3025eaf 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -5,9 +5,9 @@ use testing_framework_core::{ scenario::{ ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, - ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, - Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, - StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, + ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, + RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; @@ -157,7 +157,7 @@ impl DeploymentOrchestrator { let node_control = self.attached_node_control::(scenario)?; let cluster_wait = self.attached_cluster_wait(scenario)?; let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; - let context = build_run_context( + let assembly = build_runtime_assembly( scenario.deployment().clone(), node_clients, scenario.duration(), @@ -169,7 +169,7 @@ impl DeploymentOrchestrator { ); let cleanup_guard: Box = Box::new(feed_task); - Ok(RuntimeAssembly::from(context).build_runner(Some(cleanup_guard))) + Ok(assembly.build_runner(Some(cleanup_guard))) } fn source_providers(&self, managed_clients: Vec) -> SourceProviders { @@ -283,7 +283,7 @@ impl DeploymentOrchestrator { "compose runtime prepared" ); - Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(cleanup_guard))) + Ok(runtime.assembly.build_runner(Some(cleanup_guard))) } fn maybe_node_control( @@ -379,7 +379,7 @@ struct DeployedNodes { } struct ComposeRuntime { - context: RunContext, + assembly: RuntimeAssembly, feed_task: FeedHandle, } @@ -408,7 +408,7 @@ async fn build_compose_runtime( .start_block_feed(&node_clients, input.environment) .await?; - let context = build_run_context( + let assembly = build_runtime_assembly( input.descriptors, node_clients, input.duration, @@ -419,7 +419,10 @@ async fn build_compose_runtime( input.cluster_wait, ); - Ok(ComposeRuntime { context, feed_task }) + Ok(ComposeRuntime { + assembly, + feed_task, + }) } async fn deploy_nodes( @@ -452,7 +455,7 @@ async fn deploy_nodes( }) } -fn build_run_context( +fn build_runtime_assembly( descriptors: E::Deployment, node_clients: NodeClients, run_duration: Duration, @@ -461,7 +464,7 @@ fn build_run_context( feed: ::Feed, node_control: Option>>, cluster_wait: Arc>, -) -> RunContext { +) -> RuntimeAssembly { let mut assembly = RuntimeAssembly::new( descriptors, node_clients, @@ -476,7 +479,7 @@ fn build_run_context( assembly = assembly.with_node_control(node_control); } - assembly.build_context() + assembly } fn resolve_observability_inputs( diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 23b9bc8..f685a7d 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -8,7 +8,7 @@ use testing_framework_core::{ Application, ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, - RunContext, Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, + Runner, RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, @@ -204,10 +204,10 @@ where runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?; ensure_non_empty_node_clients(&runtime.node_clients)?; - let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait); - log_configured_observability(&observability); - maybe_print_endpoints::(&observability, &parts.node_clients); + maybe_print_endpoints::(&observability, &runtime.node_clients); + + let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait); let runner = finalize_runner::(&mut cluster, parts)?; Ok((runner, metadata)) } @@ -497,15 +497,18 @@ fn build_runner_parts( cluster_wait: Arc>, ) -> K8sRunnerParts { K8sRunnerParts { - descriptors: scenario.deployment().clone(), - node_clients: runtime.node_clients, - duration: scenario.duration(), - expectation_cooldown: scenario.expectation_cooldown(), - telemetry: runtime.telemetry, - feed: runtime.feed, + assembly: build_k8s_runtime_assembly( + scenario.deployment().clone(), + runtime.node_clients, + scenario.duration(), + scenario.expectation_cooldown(), + runtime.telemetry, + runtime.feed, + cluster_wait, + ), feed_task: runtime.feed_task, node_count, - cluster_wait, + duration_secs: scenario.duration().as_secs(), } } @@ -593,15 +596,10 @@ fn maybe_print_endpoints( } struct K8sRunnerParts { - descriptors: E::Deployment, - node_clients: NodeClients, - duration: Duration, - expectation_cooldown: Duration, - telemetry: Metrics, - feed: Feed, + assembly: RuntimeAssembly, feed_task: FeedHandle, node_count: usize, - cluster_wait: Arc>, + duration_secs: u64, } fn finalize_runner( @@ -612,36 +610,21 @@ fn finalize_runner( let (cleanup, port_forwards) = environment.into_cleanup()?; let K8sRunnerParts { - descriptors, - node_clients, - duration, - expectation_cooldown, - telemetry, - feed, + assembly, feed_task, node_count, - cluster_wait, + duration_secs, } = parts; - let duration_secs = duration.as_secs(); let cleanup_guard: Box = Box::new(K8sCleanupGuard::new(cleanup, feed_task, port_forwards)); - let context = build_k8s_run_context( - descriptors, - node_clients, - duration, - expectation_cooldown, - telemetry, - feed, - cluster_wait, - ); info!( nodes = node_count, duration_secs, "k8s deployment ready; handing control to scenario runner" ); - Ok(RuntimeAssembly::from(context).build_runner(Some(cleanup_guard))) + Ok(assembly.build_runner(Some(cleanup_guard))) } fn take_ready_cluster( @@ -654,7 +637,7 @@ fn take_ready_cluster( }) } -fn build_k8s_run_context( +fn build_k8s_runtime_assembly( descriptors: E::Deployment, node_clients: NodeClients, duration: Duration, @@ -662,7 +645,7 @@ fn build_k8s_run_context( telemetry: Metrics, feed: Feed, cluster_wait: Arc>, -) -> RunContext { +) -> RuntimeAssembly { RuntimeAssembly::new( descriptors, node_clients, @@ -672,7 +655,6 @@ fn build_k8s_run_context( feed, ) .with_cluster_wait(cluster_wait) - .build_context() } fn endpoint_or_disabled(endpoint: Option<&Url>) -> String { diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index bf93859..f931e8b 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -12,8 +12,8 @@ use testing_framework_core::{ scenario::{ Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, - RetryPolicy, RunContext, Runner, RuntimeAssembly, Scenario, ScenarioError, - SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed, + RetryPolicy, Runner, RuntimeAssembly, Scenario, ScenarioError, SourceOrchestrationPlan, + build_source_orchestration_plan, spawn_feed, }, topology::DeploymentDescriptor, }; @@ -218,7 +218,7 @@ impl ProcessDeployer { let cleanup_guard: Box = Box::new(LocalProcessGuard::::new(nodes, runtime.feed_task)); - Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(cleanup_guard))) + Ok(runtime.assembly.build_runner(Some(cleanup_guard))) } async fn deploy_with_node_control( @@ -252,7 +252,9 @@ impl ProcessDeployer { ) .await?; - Ok(RuntimeAssembly::from(runtime.context).build_runner(Some(Box::new(runtime.feed_task)))) + Ok(runtime + .assembly + .build_runner(Some(Box::new(runtime.feed_task)))) } fn node_control_from( @@ -472,7 +474,7 @@ fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_ } struct RuntimeContext { - context: RunContext, + assembly: RuntimeAssembly, feed_task: FeedHandle, } @@ -500,7 +502,8 @@ async fn run_context_for( assembly = assembly.with_node_control(node_control); } - let context = assembly.build_context(); - - Ok(RuntimeContext { context, feed_task }) + Ok(RuntimeContext { + assembly, + feed_task, + }) }