refactor(core): remove legacy feed runtime

This commit is contained in:
andrussal 2026-04-11 10:10:36 +02:00
parent 23d4bf2d07
commit 7439f4799a
18 changed files with 42 additions and 436 deletions

View File

@ -3,10 +3,7 @@ use std::io;
use async_trait::async_trait;
use crate::{
scenario::{
DefaultFeed, DefaultFeedRuntime, DynError, ExternalNodeSource, FeedRuntime, NodeAccess,
NodeClients,
},
scenario::{DynError, ExternalNodeSource, NodeAccess},
topology::DeploymentDescriptor,
};
@ -19,8 +16,6 @@ pub trait Application: Send + Sync + 'static {
type NodeConfig: Clone + Send + Sync + 'static;
type FeedRuntime: FeedRuntime;
/// Build an application node client from a static external source.
///
/// Environments that support external nodes should override this.
@ -37,14 +32,4 @@ pub trait Application: Send + Sync + 'static {
fn node_readiness_path() -> &'static str {
"/"
}
async fn prepare_feed(
_node_clients: NodeClients<Self>,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError>
where
Self: Sized,
{
let _ = (DefaultFeed::default(), DefaultFeedRuntime::default());
Ok((Default::default(), Default::default()))
}
}

View File

@ -294,7 +294,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::scenario::{Application, DefaultFeed, DefaultFeedRuntime, NodeAccess, NodeClients};
use crate::scenario::{Application, NodeAccess};
struct DummyClusterApp;
@ -303,19 +303,12 @@ mod tests {
type Deployment = crate::topology::ClusterTopology;
type NodeClient = String;
type NodeConfig = String;
type FeedRuntime = DefaultFeedRuntime;
fn build_node_client(
_access: &NodeAccess,
) -> Result<Self::NodeClient, crate::scenario::DynError> {
Ok("client".to_owned())
}
async fn prepare_feed(
_node_clients: NodeClients<Self>,
) -> Result<(DefaultFeed, Self::FeedRuntime), crate::scenario::DynError> {
crate::scenario::default_feed_result()
}
}
impl ClusterNodeConfigApplication for DummyClusterApp {

View File

@ -7,7 +7,7 @@ pub use super::definition::{
#[doc(hidden)]
pub use super::runtime::{
ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, CleanupGuard,
FeedHandle, ManagedSource, RuntimeAssembly, SourceOrchestrationPlan, SourceProviders,
ManagedSource, RuntimeAssembly, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources,
orchestrate_sources_with_providers, resolve_sources,
};

View File

@ -13,7 +13,6 @@ mod definition;
mod deployment_policy;
mod expectation;
pub mod internal;
mod noop;
mod observability;
mod runtime;
mod sources;
@ -35,16 +34,15 @@ pub use control::{ClusterWaitHandle, NodeControlHandle};
pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder};
pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy};
pub use expectation::Expectation;
pub use noop::{DefaultFeed, DefaultFeedRuntime, default_feed_result};
pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
pub use runtime::{
Deployer, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext,
RunHandle, RunMetrics, Runner, ScenarioError, StabilizationConfig,
Deployer, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext, RunHandle,
RunMetrics, Runner, ScenarioError, StabilizationConfig,
metrics::{
CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError,
PrometheusEndpoint, PrometheusInstantSample,
},
spawn_feed, wait_for_http_ports, wait_for_http_ports_with_host,
wait_for_http_ports, wait_for_http_ports_with_host,
wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement,
wait_http_readiness, wait_until_stable,
};

View File

@ -1,26 +0,0 @@
use async_trait::async_trait;
use super::{DynError, Feed, FeedRuntime};
#[derive(Clone, Default)]
pub struct DefaultFeed;
impl Feed for DefaultFeed {
type Subscription = ();
fn subscribe(&self) -> Self::Subscription {}
}
#[derive(Default)]
pub struct DefaultFeedRuntime;
#[async_trait]
impl FeedRuntime for DefaultFeedRuntime {
type Feed = DefaultFeed;
async fn run(self: Box<Self>) {}
}
pub fn default_feed_result() -> Result<(DefaultFeed, DefaultFeedRuntime), DynError> {
Ok((DefaultFeed, DefaultFeedRuntime))
}

View File

@ -19,7 +19,6 @@ pub struct RunContext<E: Application> {
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>,
}
@ -33,7 +32,6 @@ pub struct RuntimeAssembly<E: Application> {
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>,
}
@ -48,7 +46,6 @@ impl<E: Application> RunContext<E> {
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
) -> Self {
let metrics = RunMetrics::new(run_duration);
@ -60,7 +57,6 @@ impl<E: Application> RunContext<E> {
expectation_cooldown,
cluster_control_profile,
telemetry,
feed,
node_control,
cluster_wait: None,
}
@ -87,11 +83,6 @@ impl<E: Application> RunContext<E> {
self.node_clients.random_client()
}
#[must_use]
pub fn feed(&self) -> <E::FeedRuntime as super::FeedRuntime>::Feed {
self.feed.clone()
}
#[must_use]
pub const fn telemetry(&self) -> &Metrics {
&self.telemetry
@ -143,7 +134,6 @@ impl<E: Application> RuntimeAssembly<E> {
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
) -> Self {
Self {
descriptors,
@ -152,7 +142,6 @@ impl<E: Application> RuntimeAssembly<E> {
expectation_cooldown,
cluster_control_profile,
telemetry,
feed,
node_control: None,
cluster_wait: None,
}
@ -179,7 +168,6 @@ impl<E: Application> RuntimeAssembly<E> {
self.expectation_cooldown,
self.cluster_control_profile,
self.telemetry,
self.feed,
self.node_control,
);
@ -204,7 +192,6 @@ impl<E: Application> From<RunContext<E>> for RuntimeAssembly<E> {
expectation_cooldown: context.expectation_cooldown,
cluster_control_profile: context.cluster_control_profile,
telemetry: context.telemetry,
feed: context.feed,
node_control: context.node_control,
cluster_wait: context.cluster_wait,
}

View File

@ -7,7 +7,6 @@ mod node_clients;
pub mod readiness;
mod runner;
use async_trait::async_trait;
pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics, RuntimeAssembly};
pub use deployer::{Deployer, ScenarioError};
#[doc(hidden)]
@ -24,51 +23,3 @@ pub use readiness::{
wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable,
};
pub use runner::Runner;
use tokio::task::JoinHandle;
use crate::{env::Application, scenario::DynError};
/// Cloneable feed handle exposed to workloads and expectations.
pub trait Feed: Clone + Default + Send + Sync + 'static {
type Subscription: Send + 'static;
fn subscribe(&self) -> Self::Subscription;
}
/// Background worker driving a cluster feed.
#[async_trait]
pub trait FeedRuntime: Default + Send + 'static {
type Feed: Feed;
async fn run(self: Box<Self>);
}
/// Cleanup guard for a spawned feed worker.
pub struct FeedHandle {
handle: JoinHandle<()>,
}
impl FeedHandle {
pub const fn new(handle: JoinHandle<()>) -> Self {
Self { handle }
}
}
impl CleanupGuard for FeedHandle {
fn cleanup(self: Box<Self>) {
self.handle.abort();
}
}
/// Spawn a background task that drives the environment-provided feed.
pub async fn spawn_feed<E: Application>(
node_clients: NodeClients<E>,
) -> Result<(<E::FeedRuntime as FeedRuntime>::Feed, FeedHandle), DynError> {
let (feed, worker) = E::prepare_feed(node_clients).await?;
let handle = tokio::spawn(async move {
Box::new(worker).run().await;
});
Ok((feed, FeedHandle::new(handle)))
}

View File

@ -1,17 +1,13 @@
use std::{fmt::Debug, marker::PhantomData};
use testing_framework_core::scenario::{
Application, FeedRuntime, NodeClients, internal::FeedHandle,
};
use tracing::{info, warn};
use testing_framework_core::scenario::NodeClients;
use tracing::warn;
use crate::{
env::ComposeDeployEnv,
errors::ComposeRunnerError,
infrastructure::{environment::StackEnvironment, ports::HostPortMapping},
lifecycle::{
block_feed::spawn_block_feed_with_retry, readiness::build_node_clients_with_ports,
},
lifecycle::readiness::build_node_clients_with_ports,
};
pub struct ClientBuilder<E: ComposeDeployEnv> {
@ -39,29 +35,6 @@ impl<E: ComposeDeployEnv> ClientBuilder<E> {
)
.await
}
pub async fn start_block_feed(
&self,
node_clients: &NodeClients<E>,
environment: &mut StackEnvironment,
) -> Result<
(
<<E as Application>::FeedRuntime as FeedRuntime>::Feed,
FeedHandle,
),
ComposeRunnerError,
> {
let pair = ensure_step(
environment,
spawn_block_feed_with_retry::<E>(node_clients).await,
"failed to initialize block feed",
"block feed initialization failed",
)
.await?;
info!("block feed connected to node");
Ok(pair)
}
}
async fn ensure_step<T, E>(

View File

@ -10,8 +10,7 @@ use std::marker::PhantomData;
use async_trait::async_trait;
use testing_framework_core::scenario::{
Deployer, DynError, ExistingCluster, IntoExistingCluster, ObservabilityCapabilityProvider,
RequiresNodeControl, Runner, Scenario,
internal::{CleanupGuard, FeedHandle},
RequiresNodeControl, Runner, Scenario, internal::CleanupGuard,
};
use crate::{env::ComposeDeployEnv, errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup};
@ -176,32 +175,22 @@ where
pub(super) struct ComposeCleanupGuard {
environment: RunnerCleanup,
block_feed: Option<FeedHandle>,
}
impl ComposeCleanupGuard {
const fn new(environment: RunnerCleanup, block_feed: FeedHandle) -> Self {
Self {
environment,
block_feed: Some(block_feed),
}
const fn new(environment: RunnerCleanup) -> Self {
Self { environment }
}
}
impl CleanupGuard for ComposeCleanupGuard {
fn cleanup(mut self: Box<Self>) {
if let Some(block_feed) = self.block_feed.take() {
CleanupGuard::cleanup(Box::new(block_feed));
}
fn cleanup(self: Box<Self>) {
CleanupGuard::cleanup(Box::new(self.environment));
}
}
pub(super) fn make_cleanup_guard(
environment: RunnerCleanup,
block_feed: FeedHandle,
) -> Box<dyn CleanupGuard> {
Box::new(ComposeCleanupGuard::new(environment, block_feed))
pub(super) fn make_cleanup_guard(environment: RunnerCleanup) -> Box<dyn CleanupGuard> {
Box::new(ComposeCleanupGuard::new(environment))
}
#[cfg(test)]

View File

@ -4,13 +4,13 @@ use reqwest::Url;
use testing_framework_core::{
scenario::{
Application, ClusterControlProfile, ClusterMode, ClusterWaitHandle, DeploymentPolicy,
DynError, ExistingCluster, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
DynError, ExistingCluster, HttpReadinessRequirement, Metrics, NodeClients,
NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs,
RequiresNodeControl, Runner, Scenario,
internal::{
ApplicationExternalProvider, CleanupGuard, FeedHandle, RuntimeAssembly,
SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
ApplicationExternalProvider, RuntimeAssembly, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan,
orchestrate_sources_with_providers,
},
},
topology::DeploymentDescriptor,
@ -34,7 +34,6 @@ use crate::{
environment::StackEnvironment,
ports::{HostPortMapping, compose_runner_host},
},
lifecycle::block_feed::spawn_block_feed_with_retry,
};
const PRINT_ENDPOINTS_ENV: &str = "TESTNET_PRINT_ENDPOINTS";
@ -172,7 +171,6 @@ where
let node_control = self.attached_node_control::<Caps>(scenario)?;
let cluster_wait = self.attached_cluster_wait(scenario)?;
let (feed, feed_task) = spawn_block_feed_with_retry::<E>(&node_clients).await?;
let assembly = build_runtime_assembly(
scenario.deployment().clone(),
node_clients,
@ -180,13 +178,11 @@ where
scenario.expectation_cooldown(),
scenario.cluster_control_profile(),
observability.telemetry_handle()?,
feed,
node_control,
cluster_wait,
);
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(feed_task);
Ok(assembly.build_runner(Some(cleanup_guard)))
Ok(assembly.build_runner(None))
}
fn source_providers(&self, managed_clients: Vec<E::NodeClient>) -> SourceProviders<E> {
@ -262,7 +258,7 @@ where
async fn build_runner<Caps>(
&self,
scenario: &Scenario<E, Caps>,
mut prepared: PreparedDeployment<E>,
prepared: PreparedDeployment<E>,
deployed: DeployedNodes<E>,
observability: ObservabilityInputs,
readiness_enabled: bool,
@ -287,13 +283,11 @@ where
expectation_cooldown: scenario.expectation_cooldown(),
cluster_control_profile: scenario.cluster_control_profile(),
telemetry,
environment: &mut prepared.environment,
node_control,
cluster_wait,
};
let runtime = build_compose_runtime::<E>(input).await?;
let cleanup_guard =
make_cleanup_guard(prepared.environment.into_cleanup()?, runtime.feed_task);
let cleanup_guard = make_cleanup_guard(prepared.environment.into_cleanup()?);
info!(
effective_readiness = readiness_enabled,
@ -442,12 +436,10 @@ struct DeployedNodes<E: ComposeDeployEnv> {
host_ports: HostPortMapping,
host: String,
node_clients: NodeClients<E>,
client_builder: ClientBuilder<E>,
}
struct ComposeRuntime<E: ComposeDeployEnv> {
assembly: RuntimeAssembly<E>,
feed_task: FeedHandle,
}
struct RuntimeBuildInput<'a, E: ComposeDeployEnv> {
@ -457,7 +449,6 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> {
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
environment: &'a mut StackEnvironment,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
}
@ -470,12 +461,6 @@ async fn build_compose_runtime<E: ComposeDeployEnv>(
return Err(ComposeRunnerError::RuntimePreflight);
}
let (feed, feed_task) = input
.deployed
.client_builder
.start_block_feed(&node_clients, input.environment)
.await?;
let assembly = build_runtime_assembly(
input.descriptors,
node_clients,
@ -483,15 +468,11 @@ async fn build_compose_runtime<E: ComposeDeployEnv>(
input.expectation_cooldown,
input.cluster_control_profile,
input.telemetry,
feed,
input.node_control,
input.cluster_wait,
);
Ok(ComposeRuntime {
assembly,
feed_task,
})
Ok(ComposeRuntime { assembly })
}
async fn deploy_nodes<E: ComposeDeployEnv>(
@ -520,7 +501,6 @@ async fn deploy_nodes<E: ComposeDeployEnv>(
host_ports,
host,
node_clients,
client_builder,
})
}
@ -531,7 +511,6 @@ fn build_runtime_assembly<E: ComposeDeployEnv>(
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: <E::FeedRuntime as FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RuntimeAssembly<E> {
@ -542,7 +521,6 @@ fn build_runtime_assembly<E: ComposeDeployEnv>(
expectation_cooldown,
cluster_control_profile,
telemetry,
feed,
)
.with_cluster_wait(cluster_wait);

View File

@ -1,64 +0,0 @@
use std::time::Duration;
use testing_framework_core::scenario::{
Application, FeedRuntime, NodeClients, internal::FeedHandle, spawn_feed,
};
use tokio::time::sleep;
use tracing::{debug, info, warn};
use crate::errors::ComposeRunnerError;
const BLOCK_FEED_MAX_ATTEMPTS: usize = 5;
const BLOCK_FEED_RETRY_DELAY: Duration = Duration::from_secs(1);
async fn spawn_block_feed_with<E: Application>(
node_clients: &NodeClients<E>,
) -> Result<
(
<<E as Application>::FeedRuntime as FeedRuntime>::Feed,
FeedHandle,
),
ComposeRunnerError,
> {
let node_count = node_clients.len();
debug!(nodes = node_count, "starting compose block feed");
if node_count == 0 {
return Err(ComposeRunnerError::BlockFeedMissing);
}
spawn_feed::<E>(node_clients.clone())
.await
.map_err(|source| ComposeRunnerError::BlockFeed { source })
}
pub async fn spawn_block_feed_with_retry<E: Application>(
node_clients: &NodeClients<E>,
) -> Result<
(
<<E as Application>::FeedRuntime as FeedRuntime>::Feed,
FeedHandle,
),
ComposeRunnerError,
> {
for attempt in 1..=BLOCK_FEED_MAX_ATTEMPTS {
info!(attempt, "starting block feed");
match spawn_block_feed_with(node_clients).await {
Ok(result) => {
info!(attempt, "block feed established");
return Ok(result);
}
Err(error) => {
if attempt == BLOCK_FEED_MAX_ATTEMPTS {
return Err(error);
}
warn!(attempt, "block feed initialization failed; retrying");
sleep(BLOCK_FEED_RETRY_DELAY).await;
}
}
}
unreachable!("retry loop always returns on success or final failure")
}

View File

@ -1,3 +1,2 @@
pub mod block_feed;
pub mod cleanup;
pub mod readiness;

View File

@ -6,13 +6,13 @@ use reqwest::Url;
use testing_framework_core::{
scenario::{
Application, ClusterControlProfile, ClusterMode, ClusterWaitHandle, Deployer, DynError,
ExistingCluster, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, NodeClients,
ExistingCluster, HttpReadinessRequirement, Metrics, MetricsError, NodeClients,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner,
Scenario,
internal::{
ApplicationExternalProvider, CleanupGuard, FeedHandle, RuntimeAssembly,
SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
ApplicationExternalProvider, CleanupGuard, RuntimeAssembly, SourceOrchestrationPlan,
SourceProviders, StaticManagedProvider, build_source_orchestration_plan,
orchestrate_sources_with_providers,
},
},
topology::DeploymentDescriptor,
@ -33,7 +33,7 @@ use crate::{
RemoteReadinessError, build_node_clients, collect_port_specs, ensure_cluster_readiness,
kill_port_forwards, wait_for_ports_or_cleanup,
},
lifecycle::{block_feed::spawn_block_feed_with, cleanup::RunnerCleanup},
lifecycle::cleanup::RunnerCleanup,
wait::{ClusterReady, ClusterWaitError, PortForwardHandle},
};
@ -113,8 +113,6 @@ pub enum K8sRunnerError {
Telemetry(#[from] MetricsError),
#[error("internal invariant violated: {message}")]
InternalInvariant { message: String },
#[error("k8s runner requires at least one node client for feed data")]
BlockFeedMissing,
#[error("runtime preflight failed: no node clients available")]
RuntimePreflight,
#[error("source orchestration failed: {source}")]
@ -122,11 +120,6 @@ pub enum K8sRunnerError {
#[source]
source: DynError,
},
#[error("failed to initialize feed: {source}")]
BlockFeed {
#[source]
source: DynError,
},
}
#[async_trait]
@ -156,8 +149,6 @@ impl From<ClusterWaitError> for K8sRunnerError {
}
}
type Feed<E> = <<E as Application>::FeedRuntime as FeedRuntime>::Feed;
fn ensure_supported_topology<E: K8sDeployEnv>(
descriptors: &E::Deployment,
) -> Result<(), K8sRunnerError> {
@ -239,7 +230,6 @@ where
ensure_non_empty_node_clients(&node_clients)?;
let telemetry = observability.telemetry_handle()?;
let (feed, feed_task) = spawn_block_feed_with::<E>(&node_clients).await?;
let cluster_wait = attached_cluster_wait::<E, Caps>(scenario, client)?;
let context = RuntimeAssembly::new(
scenario.deployment().clone(),
@ -248,11 +238,10 @@ where
scenario.expectation_cooldown(),
scenario.cluster_control_profile(),
telemetry,
feed,
)
.with_cluster_wait(cluster_wait);
Ok(context.build_runner(Some(Box::new(feed_task))))
Ok(context.build_runner(None))
}
fn existing_cluster_metadata<E, Caps>(scenario: &Scenario<E, Caps>) -> K8sDeploymentMetadata
@ -548,8 +537,6 @@ async fn build_node_clients_or_fail<E: K8sDeployEnv>(
struct RuntimeArtifacts<E: K8sDeployEnv> {
node_clients: NodeClients<E>,
telemetry: Metrics,
feed: Feed<E>,
feed_task: FeedHandle,
}
fn build_runner_parts<E: K8sDeployEnv, Caps>(
@ -566,10 +553,8 @@ fn build_runner_parts<E: K8sDeployEnv, Caps>(
scenario.expectation_cooldown(),
scenario.cluster_control_profile(),
runtime.telemetry,
runtime.feed,
cluster_wait,
),
feed_task: runtime.feed_task,
node_count,
duration_secs: scenario.duration().as_secs(),
}
@ -591,13 +576,9 @@ async fn build_runtime_artifacts<E: K8sDeployEnv>(
}
let telemetry = build_telemetry_or_fail(cluster, observability).await?;
let (feed, feed_task) = spawn_block_feed_or_fail::<E>(cluster, &node_clients).await?;
Ok(RuntimeArtifacts {
node_clients,
telemetry,
feed,
feed_task,
})
}
@ -619,19 +600,6 @@ async fn build_telemetry_or_fail(
}
}
async fn spawn_block_feed_or_fail<E: K8sDeployEnv>(
cluster: &mut Option<ClusterEnvironment>,
node_clients: &NodeClients<E>,
) -> Result<(Feed<E>, FeedHandle), K8sRunnerError> {
match spawn_block_feed_with::<E>(node_clients).await {
Ok(pair) => Ok(pair),
Err(err) => {
fail_cluster_with_log(cluster, "failed to initialize block feed", &err).await;
Err(err)
}
}
}
async fn fail_cluster_with_log<ErrorValue: Debug>(
cluster: &mut Option<ClusterEnvironment>,
reason: &str,
@ -660,7 +628,6 @@ fn maybe_print_endpoints<E: K8sDeployEnv>(
struct K8sRunnerParts<E: K8sDeployEnv> {
assembly: RuntimeAssembly<E>,
feed_task: FeedHandle,
node_count: usize,
duration_secs: u64,
}
@ -674,13 +641,12 @@ fn finalize_runner<E: K8sDeployEnv>(
let K8sRunnerParts {
assembly,
feed_task,
node_count,
duration_secs,
} = parts;
let cleanup_guard: Box<dyn CleanupGuard> =
Box::new(K8sCleanupGuard::new(cleanup, feed_task, port_forwards));
Box::new(K8sCleanupGuard::new(cleanup, port_forwards));
info!(
nodes = node_count,
@ -707,7 +673,6 @@ fn build_k8s_runtime_assembly<E: K8sDeployEnv>(
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: Feed<E>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RuntimeAssembly<E> {
RuntimeAssembly::new(
@ -717,7 +682,6 @@ fn build_k8s_runtime_assembly<E: K8sDeployEnv>(
expectation_cooldown,
cluster_control_profile,
telemetry,
feed,
)
.with_cluster_wait(cluster_wait)
}
@ -781,19 +745,13 @@ fn log_k8s_deploy_start<E>(
struct K8sCleanupGuard {
cleanup: RunnerCleanup,
feed_task: Option<FeedHandle>,
port_forwards: Vec<PortForwardHandle>,
}
impl K8sCleanupGuard {
const fn new(
cleanup: RunnerCleanup,
feed_task: FeedHandle,
port_forwards: Vec<PortForwardHandle>,
) -> Self {
const fn new(cleanup: RunnerCleanup, port_forwards: Vec<PortForwardHandle>) -> Self {
Self {
cleanup,
feed_task: Some(feed_task),
port_forwards,
}
}
@ -801,9 +759,6 @@ impl K8sCleanupGuard {
impl CleanupGuard for K8sCleanupGuard {
fn cleanup(mut self: Box<Self>) {
if let Some(feed_task) = self.feed_task.take() {
CleanupGuard::cleanup(Box::new(feed_task));
}
kill_port_forwards(&mut self.port_forwards);
CleanupGuard::cleanup(Box::new(self.cleanup));
}

View File

@ -1,28 +0,0 @@
use testing_framework_core::scenario::{
Application, FeedRuntime, NodeClients, internal::FeedHandle, spawn_feed,
};
use tracing::{debug, info};
use crate::deployer::K8sRunnerError;
pub async fn spawn_block_feed_with<E: Application>(
node_clients: &NodeClients<E>,
) -> Result<
(
<<E as Application>::FeedRuntime as FeedRuntime>::Feed,
FeedHandle,
),
K8sRunnerError,
> {
let node_count = node_clients.len();
debug!(nodes = node_count, "starting k8s block feed");
if node_count == 0 {
return Err(K8sRunnerError::BlockFeedMissing);
}
info!("starting block feed");
spawn_feed::<E>(node_clients.clone())
.await
.map_err(|source| K8sRunnerError::BlockFeed { source })
}

View File

@ -1,4 +1,3 @@
pub mod block_feed;
pub mod cleanup;
pub mod logs;
pub mod wait;

View File

@ -645,10 +645,7 @@ fn block_on_best_effort(fut: impl std::future::Future<Output = Result<(), Manual
mod tests {
use testing_framework_core::{
cfgsync::{StaticNodeConfigProvider, build_node_artifact_override},
scenario::{
Application, DefaultFeedRuntime, NodeAccess, NodeClients, PeerSelection,
default_feed_result,
},
scenario::{Application, NodeAccess, PeerSelection},
};
use super::*;
@ -663,23 +660,10 @@ mod tests {
type Deployment = testing_framework_core::topology::ClusterTopology;
type NodeClient = String;
type NodeConfig = String;
type FeedRuntime = DefaultFeedRuntime;
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
Ok(access.api_base_url()?.to_string())
}
async fn prepare_feed(
_node_clients: NodeClients<Self>,
) -> Result<
(
testing_framework_core::scenario::DefaultFeed,
Self::FeedRuntime,
),
DynError,
> {
default_feed_result()
}
}
#[async_trait::async_trait]

View File

@ -11,13 +11,11 @@ use async_trait::async_trait;
use testing_framework_core::{
scenario::{
Application, ClusterControlProfile, ClusterMode, Deployer, DeploymentPolicy, DynError,
FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability,
NodeControlHandle, RetryPolicy, Runner, Scenario, ScenarioError,
HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle,
RetryPolicy, Runner, Scenario, ScenarioError,
internal::{
CleanupGuard, FeedHandle, RuntimeAssembly, SourceOrchestrationPlan,
build_source_orchestration_plan,
CleanupGuard, RuntimeAssembly, SourceOrchestrationPlan, build_source_orchestration_plan,
},
spawn_feed,
},
topology::DeploymentDescriptor,
};
@ -42,23 +40,16 @@ const READINESS_BACKOFF_MAX_SECS: u64 = 2;
struct LocalProcessGuard<E: LocalDeployerEnv> {
nodes: Vec<Node<E>>,
feed_task: Option<FeedHandle>,
}
impl<E: LocalDeployerEnv> LocalProcessGuard<E> {
fn new(nodes: Vec<Node<E>>, feed_task: FeedHandle) -> Self {
Self {
nodes,
feed_task: Some(feed_task),
}
fn new(nodes: Vec<Node<E>>) -> Self {
Self { nodes }
}
}
impl<E: LocalDeployerEnv> CleanupGuard for LocalProcessGuard<E> {
fn cleanup(mut self: Box<Self>) {
if let Some(feed_task) = self.feed_task.take() {
CleanupGuard::cleanup(Box::new(feed_task));
}
fn cleanup(self: Box<Self>) {
// Nodes own local processes; dropping them stops the processes.
drop(self.nodes);
}
@ -222,8 +213,7 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
)
.await?;
let cleanup_guard: Box<dyn CleanupGuard> =
Box::new(LocalProcessGuard::<E>::new(nodes, runtime.feed_task));
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(LocalProcessGuard::<E>::new(nodes));
Ok(runtime.assembly.build_runner(Some(cleanup_guard)))
}
@ -262,9 +252,7 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
)
.await?;
Ok(runtime
.assembly
.build_runner(Some(Box::new(runtime.feed_task))))
Ok(runtime.assembly.build_runner(None))
}
fn node_control_from(
@ -489,29 +477,6 @@ fn keep_tempdir(policy: DeploymentPolicy) -> bool {
policy.cleanup_policy.preserve_artifacts || keep_tempdir_from_env()
}
async fn spawn_feed_with<E: Application>(
node_clients: &NodeClients<E>,
) -> Result<(<E::FeedRuntime as FeedRuntime>::Feed, FeedHandle), ProcessDeployerError> {
let node_count = node_clients.len();
debug!(nodes = node_count, "starting local feed");
if node_count == 0 {
return Err(ProcessDeployerError::WorkloadFailed {
source: "feed requires at least one node".into(),
});
}
info!("starting feed");
spawn_feed::<E>(node_clients.clone())
.await
.map_err(workload_error)
}
fn workload_error(source: DynError) -> ProcessDeployerError {
ProcessDeployerError::WorkloadFailed { source }
}
fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_control: bool) {
info!(
nodes = node_count,
@ -524,7 +489,6 @@ fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_
struct RuntimeContext<E: Application> {
assembly: RuntimeAssembly<E>,
feed_task: FeedHandle,
}
async fn run_context_for<E: Application>(
@ -539,7 +503,6 @@ async fn run_context_for<E: Application>(
return Err(ProcessDeployerError::RuntimePreflight);
}
let (feed, feed_task) = spawn_feed_with::<E>(&node_clients).await?;
let mut assembly = RuntimeAssembly::new(
descriptors,
node_clients,
@ -547,14 +510,10 @@ async fn run_context_for<E: Application>(
expectation_cooldown,
cluster_control_profile,
Metrics::empty(),
feed,
);
if let Some(node_control) = node_control {
assembly = assembly.with_node_control(node_control);
}
Ok(RuntimeContext {
assembly,
feed_task,
})
Ok(RuntimeContext { assembly })
}

View File

@ -1,7 +1,7 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use testing_framework_core::{
scenario::{Application, DynError, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients},
scenario::{Application, DynError, HttpReadinessRequirement},
topology::DeploymentDescriptor,
};
@ -9,25 +9,6 @@ use super::*;
static STABLE_CALLS: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone, Default)]
struct DummyFeed;
impl Feed for DummyFeed {
type Subscription = ();
fn subscribe(&self) -> Self::Subscription {}
}
#[derive(Default)]
struct DummyFeedRuntime;
#[async_trait::async_trait]
impl FeedRuntime for DummyFeedRuntime {
type Feed = DummyFeed;
async fn run(self: Box<Self>) {}
}
#[derive(Clone)]
struct DummyConfig;
@ -47,13 +28,6 @@ impl Application for DummyEnv {
type Deployment = DummyTopology;
type NodeClient = ();
type NodeConfig = DummyConfig;
type FeedRuntime = DummyFeedRuntime;
async fn prepare_feed(
_node_clients: NodeClients<Self>,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError> {
Ok((DummyFeed, DummyFeedRuntime))
}
}
#[async_trait::async_trait]