diff --git a/testing-framework/core/src/env.rs b/testing-framework/core/src/env.rs index f3d3f5b..8b266ed 100644 --- a/testing-framework/core/src/env.rs +++ b/testing-framework/core/src/env.rs @@ -3,10 +3,7 @@ use std::io; use async_trait::async_trait; use crate::{ - scenario::{ - DefaultFeed, DefaultFeedRuntime, DynError, ExternalNodeSource, FeedRuntime, NodeAccess, - NodeClients, - }, + scenario::{DynError, ExternalNodeSource, NodeAccess}, topology::DeploymentDescriptor, }; @@ -19,8 +16,6 @@ pub trait Application: Send + Sync + 'static { type NodeConfig: Clone + Send + Sync + 'static; - type FeedRuntime: FeedRuntime; - /// Build an application node client from a static external source. /// /// Environments that support external nodes should override this. @@ -37,14 +32,4 @@ pub trait Application: Send + Sync + 'static { fn node_readiness_path() -> &'static str { "/" } - - async fn prepare_feed( - _node_clients: NodeClients, - ) -> Result<(::Feed, Self::FeedRuntime), DynError> - where - Self: Sized, - { - let _ = (DefaultFeed::default(), DefaultFeedRuntime::default()); - Ok((Default::default(), Default::default())) - } } diff --git a/testing-framework/core/src/scenario/config.rs b/testing-framework/core/src/scenario/config.rs index 813dd95..bd1aa7e 100644 --- a/testing-framework/core/src/scenario/config.rs +++ b/testing-framework/core/src/scenario/config.rs @@ -294,7 +294,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::scenario::{Application, DefaultFeed, DefaultFeedRuntime, NodeAccess, NodeClients}; + use crate::scenario::{Application, NodeAccess}; struct DummyClusterApp; @@ -303,19 +303,12 @@ mod tests { type Deployment = crate::topology::ClusterTopology; type NodeClient = String; type NodeConfig = String; - type FeedRuntime = DefaultFeedRuntime; fn build_node_client( _access: &NodeAccess, ) -> Result { Ok("client".to_owned()) } - - async fn prepare_feed( - _node_clients: NodeClients, - ) -> Result<(DefaultFeed, Self::FeedRuntime), crate::scenario::DynError> { - crate::scenario::default_feed_result() - } } impl ClusterNodeConfigApplication for DummyClusterApp { diff --git a/testing-framework/core/src/scenario/internal.rs b/testing-framework/core/src/scenario/internal.rs index deeb2dd..650ce31 100644 --- a/testing-framework/core/src/scenario/internal.rs +++ b/testing-framework/core/src/scenario/internal.rs @@ -7,7 +7,7 @@ pub use super::definition::{ #[doc(hidden)] pub use super::runtime::{ ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, CleanupGuard, - FeedHandle, ManagedSource, RuntimeAssembly, SourceOrchestrationPlan, SourceProviders, + ManagedSource, RuntimeAssembly, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources, orchestrate_sources_with_providers, resolve_sources, }; diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index a89380b..6df3bc2 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -13,7 +13,6 @@ mod definition; mod deployment_policy; mod expectation; pub mod internal; -mod noop; mod observability; mod runtime; mod sources; @@ -35,16 +34,15 @@ pub use control::{ClusterWaitHandle, NodeControlHandle}; pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder}; pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; -pub use noop::{DefaultFeed, DefaultFeedRuntime, default_feed_result}; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ - Deployer, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext, - RunHandle, RunMetrics, Runner, ScenarioError, StabilizationConfig, + Deployer, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext, RunHandle, + RunMetrics, Runner, ScenarioError, StabilizationConfig, metrics::{ CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError, PrometheusEndpoint, PrometheusInstantSample, }, - spawn_feed, wait_for_http_ports, wait_for_http_ports_with_host, + wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, }; diff --git a/testing-framework/core/src/scenario/noop.rs b/testing-framework/core/src/scenario/noop.rs deleted file mode 100644 index eafaf43..0000000 --- a/testing-framework/core/src/scenario/noop.rs +++ /dev/null @@ -1,26 +0,0 @@ -use async_trait::async_trait; - -use super::{DynError, Feed, FeedRuntime}; - -#[derive(Clone, Default)] -pub struct DefaultFeed; - -impl Feed for DefaultFeed { - type Subscription = (); - - fn subscribe(&self) -> Self::Subscription {} -} - -#[derive(Default)] -pub struct DefaultFeedRuntime; - -#[async_trait] -impl FeedRuntime for DefaultFeedRuntime { - type Feed = DefaultFeed; - - async fn run(self: Box) {} -} - -pub fn default_feed_result() -> Result<(DefaultFeed, DefaultFeedRuntime), DynError> { - Ok((DefaultFeed, DefaultFeedRuntime)) -} diff --git a/testing-framework/core/src/scenario/runtime/context.rs b/testing-framework/core/src/scenario/runtime/context.rs index 438cb57..89a0cf4 100644 --- a/testing-framework/core/src/scenario/runtime/context.rs +++ b/testing-framework/core/src/scenario/runtime/context.rs @@ -19,7 +19,6 @@ pub struct RunContext { expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, - feed: ::Feed, node_control: Option>>, cluster_wait: Option>>, } @@ -33,7 +32,6 @@ pub struct RuntimeAssembly { expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, - feed: ::Feed, node_control: Option>>, cluster_wait: Option>>, } @@ -48,7 +46,6 @@ impl RunContext { expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, - feed: ::Feed, node_control: Option>>, ) -> Self { let metrics = RunMetrics::new(run_duration); @@ -60,7 +57,6 @@ impl RunContext { expectation_cooldown, cluster_control_profile, telemetry, - feed, node_control, cluster_wait: None, } @@ -87,11 +83,6 @@ impl RunContext { self.node_clients.random_client() } - #[must_use] - pub fn feed(&self) -> ::Feed { - self.feed.clone() - } - #[must_use] pub const fn telemetry(&self) -> &Metrics { &self.telemetry @@ -143,7 +134,6 @@ impl RuntimeAssembly { expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, - feed: ::Feed, ) -> Self { Self { descriptors, @@ -152,7 +142,6 @@ impl RuntimeAssembly { expectation_cooldown, cluster_control_profile, telemetry, - feed, node_control: None, cluster_wait: None, } @@ -179,7 +168,6 @@ impl RuntimeAssembly { self.expectation_cooldown, self.cluster_control_profile, self.telemetry, - self.feed, self.node_control, ); @@ -204,7 +192,6 @@ impl From> for RuntimeAssembly { expectation_cooldown: context.expectation_cooldown, cluster_control_profile: context.cluster_control_profile, telemetry: context.telemetry, - feed: context.feed, node_control: context.node_control, cluster_wait: context.cluster_wait, } diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index bc524bd..c6c2ce7 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -7,7 +7,6 @@ mod node_clients; pub mod readiness; mod runner; -use async_trait::async_trait; pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics, RuntimeAssembly}; pub use deployer::{Deployer, ScenarioError}; #[doc(hidden)] @@ -24,51 +23,3 @@ pub use readiness::{ wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, }; pub use runner::Runner; -use tokio::task::JoinHandle; - -use crate::{env::Application, scenario::DynError}; - -/// Cloneable feed handle exposed to workloads and expectations. -pub trait Feed: Clone + Default + Send + Sync + 'static { - type Subscription: Send + 'static; - - fn subscribe(&self) -> Self::Subscription; -} - -/// Background worker driving a cluster feed. -#[async_trait] -pub trait FeedRuntime: Default + Send + 'static { - type Feed: Feed; - - async fn run(self: Box); -} - -/// Cleanup guard for a spawned feed worker. -pub struct FeedHandle { - handle: JoinHandle<()>, -} - -impl FeedHandle { - pub const fn new(handle: JoinHandle<()>) -> Self { - Self { handle } - } -} - -impl CleanupGuard for FeedHandle { - fn cleanup(self: Box) { - self.handle.abort(); - } -} - -/// Spawn a background task that drives the environment-provided feed. -pub async fn spawn_feed( - node_clients: NodeClients, -) -> Result<(::Feed, FeedHandle), DynError> { - let (feed, worker) = E::prepare_feed(node_clients).await?; - - let handle = tokio::spawn(async move { - Box::new(worker).run().await; - }); - - Ok((feed, FeedHandle::new(handle))) -} diff --git a/testing-framework/deployers/compose/src/deployer/clients.rs b/testing-framework/deployers/compose/src/deployer/clients.rs index 0574391..ead2942 100644 --- a/testing-framework/deployers/compose/src/deployer/clients.rs +++ b/testing-framework/deployers/compose/src/deployer/clients.rs @@ -1,17 +1,13 @@ use std::{fmt::Debug, marker::PhantomData}; -use testing_framework_core::scenario::{ - Application, FeedRuntime, NodeClients, internal::FeedHandle, -}; -use tracing::{info, warn}; +use testing_framework_core::scenario::NodeClients; +use tracing::warn; use crate::{ env::ComposeDeployEnv, errors::ComposeRunnerError, infrastructure::{environment::StackEnvironment, ports::HostPortMapping}, - lifecycle::{ - block_feed::spawn_block_feed_with_retry, readiness::build_node_clients_with_ports, - }, + lifecycle::readiness::build_node_clients_with_ports, }; pub struct ClientBuilder { @@ -39,29 +35,6 @@ impl ClientBuilder { ) .await } - - pub async fn start_block_feed( - &self, - node_clients: &NodeClients, - environment: &mut StackEnvironment, - ) -> Result< - ( - <::FeedRuntime as FeedRuntime>::Feed, - FeedHandle, - ), - ComposeRunnerError, - > { - let pair = ensure_step( - environment, - spawn_block_feed_with_retry::(node_clients).await, - "failed to initialize block feed", - "block feed initialization failed", - ) - .await?; - - info!("block feed connected to node"); - Ok(pair) - } } async fn ensure_step( diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index ad73efc..7a89aea 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -10,8 +10,7 @@ use std::marker::PhantomData; use async_trait::async_trait; use testing_framework_core::scenario::{ Deployer, DynError, ExistingCluster, IntoExistingCluster, ObservabilityCapabilityProvider, - RequiresNodeControl, Runner, Scenario, - internal::{CleanupGuard, FeedHandle}, + RequiresNodeControl, Runner, Scenario, internal::CleanupGuard, }; use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup}; @@ -176,32 +175,22 @@ where pub(super) struct ComposeCleanupGuard { environment: RunnerCleanup, - block_feed: Option, } impl ComposeCleanupGuard { - const fn new(environment: RunnerCleanup, block_feed: FeedHandle) -> Self { - Self { - environment, - block_feed: Some(block_feed), - } + const fn new(environment: RunnerCleanup) -> Self { + Self { environment } } } impl CleanupGuard for ComposeCleanupGuard { - fn cleanup(mut self: Box) { - if let Some(block_feed) = self.block_feed.take() { - CleanupGuard::cleanup(Box::new(block_feed)); - } + fn cleanup(self: Box) { CleanupGuard::cleanup(Box::new(self.environment)); } } -pub(super) fn make_cleanup_guard( - environment: RunnerCleanup, - block_feed: FeedHandle, -) -> Box { - Box::new(ComposeCleanupGuard::new(environment, block_feed)) +pub(super) fn make_cleanup_guard(environment: RunnerCleanup) -> Box { + Box::new(ComposeCleanupGuard::new(environment)) } #[cfg(test)] diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 15e7432..b77344a 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -4,13 +4,13 @@ use reqwest::Url; use testing_framework_core::{ scenario::{ Application, ClusterControlProfile, ClusterMode, ClusterWaitHandle, DeploymentPolicy, - DynError, ExistingCluster, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, + DynError, ExistingCluster, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, Scenario, internal::{ - ApplicationExternalProvider, CleanupGuard, FeedHandle, RuntimeAssembly, - SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, - build_source_orchestration_plan, orchestrate_sources_with_providers, + ApplicationExternalProvider, RuntimeAssembly, SourceOrchestrationPlan, SourceProviders, + StaticManagedProvider, build_source_orchestration_plan, + orchestrate_sources_with_providers, }, }, topology::DeploymentDescriptor, @@ -34,7 +34,6 @@ use crate::{ environment::StackEnvironment, ports::{HostPortMapping, compose_runner_host}, }, - lifecycle::block_feed::spawn_block_feed_with_retry, }; const PRINT_ENDPOINTS_ENV: &str = "TESTNET_PRINT_ENDPOINTS"; @@ -172,7 +171,6 @@ where let node_control = self.attached_node_control::(scenario)?; let cluster_wait = self.attached_cluster_wait(scenario)?; - let (feed, feed_task) = spawn_block_feed_with_retry::(&node_clients).await?; let assembly = build_runtime_assembly( scenario.deployment().clone(), node_clients, @@ -180,13 +178,11 @@ where scenario.expectation_cooldown(), scenario.cluster_control_profile(), observability.telemetry_handle()?, - feed, node_control, cluster_wait, ); - let cleanup_guard: Box = Box::new(feed_task); - Ok(assembly.build_runner(Some(cleanup_guard))) + Ok(assembly.build_runner(None)) } fn source_providers(&self, managed_clients: Vec) -> SourceProviders { @@ -262,7 +258,7 @@ where async fn build_runner( &self, scenario: &Scenario, - mut prepared: PreparedDeployment, + prepared: PreparedDeployment, deployed: DeployedNodes, observability: ObservabilityInputs, readiness_enabled: bool, @@ -287,13 +283,11 @@ where expectation_cooldown: scenario.expectation_cooldown(), cluster_control_profile: scenario.cluster_control_profile(), telemetry, - environment: &mut prepared.environment, node_control, cluster_wait, }; let runtime = build_compose_runtime::(input).await?; - let cleanup_guard = - make_cleanup_guard(prepared.environment.into_cleanup()?, runtime.feed_task); + let cleanup_guard = make_cleanup_guard(prepared.environment.into_cleanup()?); info!( effective_readiness = readiness_enabled, @@ -442,12 +436,10 @@ struct DeployedNodes { host_ports: HostPortMapping, host: String, node_clients: NodeClients, - client_builder: ClientBuilder, } struct ComposeRuntime { assembly: RuntimeAssembly, - feed_task: FeedHandle, } struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { @@ -457,7 +449,6 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> { expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, - environment: &'a mut StackEnvironment, node_control: Option>>, cluster_wait: Arc>, } @@ -470,12 +461,6 @@ async fn build_compose_runtime( return Err(ComposeRunnerError::RuntimePreflight); } - let (feed, feed_task) = input - .deployed - .client_builder - .start_block_feed(&node_clients, input.environment) - .await?; - let assembly = build_runtime_assembly( input.descriptors, node_clients, @@ -483,15 +468,11 @@ async fn build_compose_runtime( input.expectation_cooldown, input.cluster_control_profile, input.telemetry, - feed, input.node_control, input.cluster_wait, ); - Ok(ComposeRuntime { - assembly, - feed_task, - }) + Ok(ComposeRuntime { assembly }) } async fn deploy_nodes( @@ -520,7 +501,6 @@ async fn deploy_nodes( host_ports, host, node_clients, - client_builder, }) } @@ -531,7 +511,6 @@ fn build_runtime_assembly( expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, - feed: ::Feed, node_control: Option>>, cluster_wait: Arc>, ) -> RuntimeAssembly { @@ -542,7 +521,6 @@ fn build_runtime_assembly( expectation_cooldown, cluster_control_profile, telemetry, - feed, ) .with_cluster_wait(cluster_wait); diff --git a/testing-framework/deployers/compose/src/lifecycle/block_feed.rs b/testing-framework/deployers/compose/src/lifecycle/block_feed.rs deleted file mode 100644 index a16dd84..0000000 --- a/testing-framework/deployers/compose/src/lifecycle/block_feed.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::time::Duration; - -use testing_framework_core::scenario::{ - Application, FeedRuntime, NodeClients, internal::FeedHandle, spawn_feed, -}; -use tokio::time::sleep; -use tracing::{debug, info, warn}; - -use crate::errors::ComposeRunnerError; - -const BLOCK_FEED_MAX_ATTEMPTS: usize = 5; -const BLOCK_FEED_RETRY_DELAY: Duration = Duration::from_secs(1); - -async fn spawn_block_feed_with( - node_clients: &NodeClients, -) -> Result< - ( - <::FeedRuntime as FeedRuntime>::Feed, - FeedHandle, - ), - ComposeRunnerError, -> { - let node_count = node_clients.len(); - debug!(nodes = node_count, "starting compose block feed"); - - if node_count == 0 { - return Err(ComposeRunnerError::BlockFeedMissing); - } - - spawn_feed::(node_clients.clone()) - .await - .map_err(|source| ComposeRunnerError::BlockFeed { source }) -} - -pub async fn spawn_block_feed_with_retry( - node_clients: &NodeClients, -) -> Result< - ( - <::FeedRuntime as FeedRuntime>::Feed, - FeedHandle, - ), - ComposeRunnerError, -> { - for attempt in 1..=BLOCK_FEED_MAX_ATTEMPTS { - info!(attempt, "starting block feed"); - match spawn_block_feed_with(node_clients).await { - Ok(result) => { - info!(attempt, "block feed established"); - return Ok(result); - } - - Err(error) => { - if attempt == BLOCK_FEED_MAX_ATTEMPTS { - return Err(error); - } - - warn!(attempt, "block feed initialization failed; retrying"); - sleep(BLOCK_FEED_RETRY_DELAY).await; - } - } - } - - unreachable!("retry loop always returns on success or final failure") -} diff --git a/testing-framework/deployers/compose/src/lifecycle/mod.rs b/testing-framework/deployers/compose/src/lifecycle/mod.rs index d3cf980..da6e0f6 100644 --- a/testing-framework/deployers/compose/src/lifecycle/mod.rs +++ b/testing-framework/deployers/compose/src/lifecycle/mod.rs @@ -1,3 +1,2 @@ -pub mod block_feed; pub mod cleanup; pub mod readiness; diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index d7f76f0..8a4484b 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -6,13 +6,13 @@ use reqwest::Url; use testing_framework_core::{ scenario::{ Application, ClusterControlProfile, ClusterMode, ClusterWaitHandle, Deployer, DynError, - ExistingCluster, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, NodeClients, + ExistingCluster, HttpReadinessRequirement, Metrics, MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner, Scenario, internal::{ - ApplicationExternalProvider, CleanupGuard, FeedHandle, RuntimeAssembly, - SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, - build_source_orchestration_plan, orchestrate_sources_with_providers, + ApplicationExternalProvider, CleanupGuard, RuntimeAssembly, SourceOrchestrationPlan, + SourceProviders, StaticManagedProvider, build_source_orchestration_plan, + orchestrate_sources_with_providers, }, }, topology::DeploymentDescriptor, @@ -33,7 +33,7 @@ use crate::{ RemoteReadinessError, build_node_clients, collect_port_specs, ensure_cluster_readiness, kill_port_forwards, wait_for_ports_or_cleanup, }, - lifecycle::{block_feed::spawn_block_feed_with, cleanup::RunnerCleanup}, + lifecycle::cleanup::RunnerCleanup, wait::{ClusterReady, ClusterWaitError, PortForwardHandle}, }; @@ -113,8 +113,6 @@ pub enum K8sRunnerError { Telemetry(#[from] MetricsError), #[error("internal invariant violated: {message}")] InternalInvariant { message: String }, - #[error("k8s runner requires at least one node client for feed data")] - BlockFeedMissing, #[error("runtime preflight failed: no node clients available")] RuntimePreflight, #[error("source orchestration failed: {source}")] @@ -122,11 +120,6 @@ pub enum K8sRunnerError { #[source] source: DynError, }, - #[error("failed to initialize feed: {source}")] - BlockFeed { - #[source] - source: DynError, - }, } #[async_trait] @@ -156,8 +149,6 @@ impl From for K8sRunnerError { } } -type Feed = <::FeedRuntime as FeedRuntime>::Feed; - fn ensure_supported_topology( descriptors: &E::Deployment, ) -> Result<(), K8sRunnerError> { @@ -239,7 +230,6 @@ where ensure_non_empty_node_clients(&node_clients)?; let telemetry = observability.telemetry_handle()?; - let (feed, feed_task) = spawn_block_feed_with::(&node_clients).await?; let cluster_wait = attached_cluster_wait::(scenario, client)?; let context = RuntimeAssembly::new( scenario.deployment().clone(), @@ -248,11 +238,10 @@ where scenario.expectation_cooldown(), scenario.cluster_control_profile(), telemetry, - feed, ) .with_cluster_wait(cluster_wait); - Ok(context.build_runner(Some(Box::new(feed_task)))) + Ok(context.build_runner(None)) } fn existing_cluster_metadata(scenario: &Scenario) -> K8sDeploymentMetadata @@ -548,8 +537,6 @@ async fn build_node_clients_or_fail( struct RuntimeArtifacts { node_clients: NodeClients, telemetry: Metrics, - feed: Feed, - feed_task: FeedHandle, } fn build_runner_parts( @@ -566,10 +553,8 @@ fn build_runner_parts( scenario.expectation_cooldown(), scenario.cluster_control_profile(), runtime.telemetry, - runtime.feed, cluster_wait, ), - feed_task: runtime.feed_task, node_count, duration_secs: scenario.duration().as_secs(), } @@ -591,13 +576,9 @@ async fn build_runtime_artifacts( } let telemetry = build_telemetry_or_fail(cluster, observability).await?; - let (feed, feed_task) = spawn_block_feed_or_fail::(cluster, &node_clients).await?; - Ok(RuntimeArtifacts { node_clients, telemetry, - feed, - feed_task, }) } @@ -619,19 +600,6 @@ async fn build_telemetry_or_fail( } } -async fn spawn_block_feed_or_fail( - cluster: &mut Option, - node_clients: &NodeClients, -) -> Result<(Feed, FeedHandle), K8sRunnerError> { - match spawn_block_feed_with::(node_clients).await { - Ok(pair) => Ok(pair), - Err(err) => { - fail_cluster_with_log(cluster, "failed to initialize block feed", &err).await; - Err(err) - } - } -} - async fn fail_cluster_with_log( cluster: &mut Option, reason: &str, @@ -660,7 +628,6 @@ fn maybe_print_endpoints( struct K8sRunnerParts { assembly: RuntimeAssembly, - feed_task: FeedHandle, node_count: usize, duration_secs: u64, } @@ -674,13 +641,12 @@ fn finalize_runner( let K8sRunnerParts { assembly, - feed_task, node_count, duration_secs, } = parts; let cleanup_guard: Box = - Box::new(K8sCleanupGuard::new(cleanup, feed_task, port_forwards)); + Box::new(K8sCleanupGuard::new(cleanup, port_forwards)); info!( nodes = node_count, @@ -707,7 +673,6 @@ fn build_k8s_runtime_assembly( expectation_cooldown: Duration, cluster_control_profile: ClusterControlProfile, telemetry: Metrics, - feed: Feed, cluster_wait: Arc>, ) -> RuntimeAssembly { RuntimeAssembly::new( @@ -717,7 +682,6 @@ fn build_k8s_runtime_assembly( expectation_cooldown, cluster_control_profile, telemetry, - feed, ) .with_cluster_wait(cluster_wait) } @@ -781,19 +745,13 @@ fn log_k8s_deploy_start( struct K8sCleanupGuard { cleanup: RunnerCleanup, - feed_task: Option, port_forwards: Vec, } impl K8sCleanupGuard { - const fn new( - cleanup: RunnerCleanup, - feed_task: FeedHandle, - port_forwards: Vec, - ) -> Self { + const fn new(cleanup: RunnerCleanup, port_forwards: Vec) -> Self { Self { cleanup, - feed_task: Some(feed_task), port_forwards, } } @@ -801,9 +759,6 @@ impl K8sCleanupGuard { impl CleanupGuard for K8sCleanupGuard { fn cleanup(mut self: Box) { - if let Some(feed_task) = self.feed_task.take() { - CleanupGuard::cleanup(Box::new(feed_task)); - } kill_port_forwards(&mut self.port_forwards); CleanupGuard::cleanup(Box::new(self.cleanup)); } diff --git a/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs b/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs deleted file mode 100644 index 527a285..0000000 --- a/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs +++ /dev/null @@ -1,28 +0,0 @@ -use testing_framework_core::scenario::{ - Application, FeedRuntime, NodeClients, internal::FeedHandle, spawn_feed, -}; -use tracing::{debug, info}; - -use crate::deployer::K8sRunnerError; - -pub async fn spawn_block_feed_with( - node_clients: &NodeClients, -) -> Result< - ( - <::FeedRuntime as FeedRuntime>::Feed, - FeedHandle, - ), - K8sRunnerError, -> { - let node_count = node_clients.len(); - debug!(nodes = node_count, "starting k8s block feed"); - - if node_count == 0 { - return Err(K8sRunnerError::BlockFeedMissing); - } - - info!("starting block feed"); - spawn_feed::(node_clients.clone()) - .await - .map_err(|source| K8sRunnerError::BlockFeed { source }) -} diff --git a/testing-framework/deployers/k8s/src/lifecycle/mod.rs b/testing-framework/deployers/k8s/src/lifecycle/mod.rs index 4354505..cb18d52 100644 --- a/testing-framework/deployers/k8s/src/lifecycle/mod.rs +++ b/testing-framework/deployers/k8s/src/lifecycle/mod.rs @@ -1,4 +1,3 @@ -pub mod block_feed; pub mod cleanup; pub mod logs; pub mod wait; diff --git a/testing-framework/deployers/k8s/src/manual.rs b/testing-framework/deployers/k8s/src/manual.rs index 6791a6d..400fe0f 100644 --- a/testing-framework/deployers/k8s/src/manual.rs +++ b/testing-framework/deployers/k8s/src/manual.rs @@ -645,10 +645,7 @@ fn block_on_best_effort(fut: impl std::future::Future Result { Ok(access.api_base_url()?.to_string()) } - - async fn prepare_feed( - _node_clients: NodeClients, - ) -> Result< - ( - testing_framework_core::scenario::DefaultFeed, - Self::FeedRuntime, - ), - DynError, - > { - default_feed_result() - } } #[async_trait::async_trait] diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index c17f54f..446f198 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -11,13 +11,11 @@ use async_trait::async_trait; use testing_framework_core::{ scenario::{ Application, ClusterControlProfile, ClusterMode, Deployer, DeploymentPolicy, DynError, - FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, - NodeControlHandle, RetryPolicy, Runner, Scenario, ScenarioError, + HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, + RetryPolicy, Runner, Scenario, ScenarioError, internal::{ - CleanupGuard, FeedHandle, RuntimeAssembly, SourceOrchestrationPlan, - build_source_orchestration_plan, + CleanupGuard, RuntimeAssembly, SourceOrchestrationPlan, build_source_orchestration_plan, }, - spawn_feed, }, topology::DeploymentDescriptor, }; @@ -42,23 +40,16 @@ const READINESS_BACKOFF_MAX_SECS: u64 = 2; struct LocalProcessGuard { nodes: Vec>, - feed_task: Option, } impl LocalProcessGuard { - fn new(nodes: Vec>, feed_task: FeedHandle) -> Self { - Self { - nodes, - feed_task: Some(feed_task), - } + fn new(nodes: Vec>) -> Self { + Self { nodes } } } impl CleanupGuard for LocalProcessGuard { - fn cleanup(mut self: Box) { - if let Some(feed_task) = self.feed_task.take() { - CleanupGuard::cleanup(Box::new(feed_task)); - } + fn cleanup(self: Box) { // Nodes own local processes; dropping them stops the processes. drop(self.nodes); } @@ -222,8 +213,7 @@ impl ProcessDeployer { ) .await?; - let cleanup_guard: Box = - Box::new(LocalProcessGuard::::new(nodes, runtime.feed_task)); + let cleanup_guard: Box = Box::new(LocalProcessGuard::::new(nodes)); Ok(runtime.assembly.build_runner(Some(cleanup_guard))) } @@ -262,9 +252,7 @@ impl ProcessDeployer { ) .await?; - Ok(runtime - .assembly - .build_runner(Some(Box::new(runtime.feed_task)))) + Ok(runtime.assembly.build_runner(None)) } fn node_control_from( @@ -489,29 +477,6 @@ fn keep_tempdir(policy: DeploymentPolicy) -> bool { policy.cleanup_policy.preserve_artifacts || keep_tempdir_from_env() } -async fn spawn_feed_with( - node_clients: &NodeClients, -) -> Result<(::Feed, FeedHandle), ProcessDeployerError> { - let node_count = node_clients.len(); - debug!(nodes = node_count, "starting local feed"); - - if node_count == 0 { - return Err(ProcessDeployerError::WorkloadFailed { - source: "feed requires at least one node".into(), - }); - } - - info!("starting feed"); - - spawn_feed::(node_clients.clone()) - .await - .map_err(workload_error) -} - -fn workload_error(source: DynError) -> ProcessDeployerError { - ProcessDeployerError::WorkloadFailed { source } -} - fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_control: bool) { info!( nodes = node_count, @@ -524,7 +489,6 @@ fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_ struct RuntimeContext { assembly: RuntimeAssembly, - feed_task: FeedHandle, } async fn run_context_for( @@ -539,7 +503,6 @@ async fn run_context_for( return Err(ProcessDeployerError::RuntimePreflight); } - let (feed, feed_task) = spawn_feed_with::(&node_clients).await?; let mut assembly = RuntimeAssembly::new( descriptors, node_clients, @@ -547,14 +510,10 @@ async fn run_context_for( expectation_cooldown, cluster_control_profile, Metrics::empty(), - feed, ); if let Some(node_control) = node_control { assembly = assembly.with_node_control(node_control); } - Ok(RuntimeContext { - assembly, - feed_task, - }) + Ok(RuntimeContext { assembly }) } diff --git a/testing-framework/deployers/local/src/env/tests.rs b/testing-framework/deployers/local/src/env/tests.rs index 9d1769b..756a9d1 100644 --- a/testing-framework/deployers/local/src/env/tests.rs +++ b/testing-framework/deployers/local/src/env/tests.rs @@ -1,7 +1,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use testing_framework_core::{ - scenario::{Application, DynError, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients}, + scenario::{Application, DynError, HttpReadinessRequirement}, topology::DeploymentDescriptor, }; @@ -9,25 +9,6 @@ use super::*; static STABLE_CALLS: AtomicUsize = AtomicUsize::new(0); -#[derive(Clone, Default)] -struct DummyFeed; - -impl Feed for DummyFeed { - type Subscription = (); - - fn subscribe(&self) -> Self::Subscription {} -} - -#[derive(Default)] -struct DummyFeedRuntime; - -#[async_trait::async_trait] -impl FeedRuntime for DummyFeedRuntime { - type Feed = DummyFeed; - - async fn run(self: Box) {} -} - #[derive(Clone)] struct DummyConfig; @@ -47,13 +28,6 @@ impl Application for DummyEnv { type Deployment = DummyTopology; type NodeClient = (); type NodeConfig = DummyConfig; - type FeedRuntime = DummyFeedRuntime; - - async fn prepare_feed( - _node_clients: NodeClients, - ) -> Result<(::Feed, Self::FeedRuntime), DynError> { - Ok((DummyFeed, DummyFeedRuntime)) - } } #[async_trait::async_trait]