diff --git a/logos/runtime/ext/src/lib.rs b/logos/runtime/ext/src/lib.rs index c849ceb..4833f21 100644 --- a/logos/runtime/ext/src/lib.rs +++ b/logos/runtime/ext/src/lib.rs @@ -2,7 +2,9 @@ use std::sync::Arc; use async_trait::async_trait; pub use lb_framework::*; -use testing_framework_core::scenario::{Application, DynError, FeedRuntime, RunContext}; +use testing_framework_core::scenario::{ + Application, DynError, FeedRuntime, NodeClients, RunContext, +}; use tokio::sync::broadcast; pub mod cfgsync; @@ -21,9 +23,12 @@ impl Application for LbcExtEnv { type FeedRuntime = ::FeedRuntime; async fn prepare_feed( - client: Self::NodeClient, + node_clients: NodeClients, ) -> Result<(::Feed, Self::FeedRuntime), DynError> { - ::prepare_feed(client).await + let clients = node_clients.snapshot(); + let upstream_clients = NodeClients::::new(clients); + + ::prepare_feed(upstream_clients).await } } diff --git a/testing-framework/core/src/env.rs b/testing-framework/core/src/env.rs index e8b59e0..4689ad5 100644 --- a/testing-framework/core/src/env.rs +++ b/testing-framework/core/src/env.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use crate::{ - scenario::{DynError, FeedRuntime}, + scenario::{DynError, FeedRuntime, NodeClients}, topology::DeploymentDescriptor, }; @@ -17,6 +17,8 @@ pub trait Application: Send + Sync + 'static { type FeedRuntime: FeedRuntime; async fn prepare_feed( - client: Self::NodeClient, - ) -> Result<(::Feed, Self::FeedRuntime), DynError>; + node_clients: NodeClients, + ) -> Result<(::Feed, Self::FeedRuntime), DynError> + where + Self: Sized; } diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 6b969c7..7722561 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -53,9 +53,9 @@ impl CleanupGuard for FeedHandle { /// Spawn a background task that drives the environment-provided feed. pub async fn spawn_feed( - client: E::NodeClient, + node_clients: NodeClients, ) -> Result<(::Feed, FeedHandle), DynError> { - let (feed, worker) = E::prepare_feed(client).await?; + let (feed, worker) = E::prepare_feed(node_clients).await?; let handle = tokio::spawn(async move { Box::new(worker).run().await; diff --git a/testing-framework/deployers/compose/src/lifecycle/block_feed.rs b/testing-framework/deployers/compose/src/lifecycle/block_feed.rs index 3898d2b..ff72aae 100644 --- a/testing-framework/deployers/compose/src/lifecycle/block_feed.rs +++ b/testing-framework/deployers/compose/src/lifecycle/block_feed.rs @@ -20,16 +20,14 @@ async fn spawn_block_feed_with( ), ComposeRunnerError, > { - debug!( - nodes = node_clients.len(), - "selecting node client for block feed" - ); + let node_count = node_clients.len(); + debug!(nodes = node_count, "starting compose block feed"); - let block_source_client = node_clients - .random_client() - .ok_or(ComposeRunnerError::BlockFeedMissing)?; + if node_count == 0 { + return Err(ComposeRunnerError::BlockFeedMissing); + } - spawn_feed::(block_source_client) + spawn_feed::(node_clients.clone()) .await .map_err(|source| ComposeRunnerError::BlockFeed { source }) } diff --git a/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs b/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs index 554a7fd..d50351e 100644 --- a/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs +++ b/testing-framework/deployers/k8s/src/lifecycle/block_feed.rs @@ -14,17 +14,15 @@ pub async fn spawn_block_feed_with( ), K8sRunnerError, > { - debug!( - nodes = node_clients.len(), - "selecting node client for block feed" - ); + let node_count = node_clients.len(); + debug!(nodes = node_count, "starting k8s block feed"); - let block_source_client = node_clients - .random_client() - .ok_or(K8sRunnerError::BlockFeedMissing)?; + if node_count == 0 { + return Err(K8sRunnerError::BlockFeedMissing); + } info!("starting block feed"); - spawn_feed::(block_source_client) + spawn_feed::(node_clients.clone()) .await .map_err(|source| K8sRunnerError::BlockFeed { source }) } diff --git a/testing-framework/deployers/local/src/deployer.rs b/testing-framework/deployers/local/src/deployer.rs index 536e13e..fde1f9e 100644 --- a/testing-framework/deployers/local/src/deployer.rs +++ b/testing-framework/deployers/local/src/deployer.rs @@ -403,20 +403,18 @@ fn keep_tempdir(policy: DeploymentPolicy) -> bool { async fn spawn_feed_with( node_clients: &NodeClients, ) -> Result<(::Feed, FeedHandle), ProcessDeployerError> { - debug!( - nodes = node_clients.len(), - "selecting node client for local feed" - ); + let node_count = node_clients.len(); + debug!(nodes = node_count, "starting local feed"); - let Some(block_source_client) = node_clients.random_client() else { + if node_count == 0 { return Err(ProcessDeployerError::WorkloadFailed { source: "feed requires at least one node".into(), }); - }; + } info!("starting feed"); - spawn_feed::(block_source_client) + spawn_feed::(node_clients.clone()) .await .map_err(workload_error) } diff --git a/testing-framework/deployers/local/src/env.rs b/testing-framework/deployers/local/src/env.rs index 701cc23..2af7d06 100644 --- a/testing-framework/deployers/local/src/env.rs +++ b/testing-framework/deployers/local/src/env.rs @@ -112,7 +112,7 @@ mod tests { use async_trait::async_trait; use testing_framework_core::{ - scenario::{Application, DynError, Feed, FeedRuntime}, + scenario::{Application, DynError, Feed, FeedRuntime, NodeClients}, topology::DeploymentDescriptor, }; @@ -160,7 +160,7 @@ mod tests { type FeedRuntime = DummyFeedRuntime; async fn prepare_feed( - _client: Self::NodeClient, + _node_clients: NodeClients, ) -> Result<(::Feed, Self::FeedRuntime), DynError> { Ok((DummyFeed, DummyFeedRuntime))