feed: initialize from full node client set

This commit is contained in:
andrussal 2026-02-22 03:52:27 +01:00
parent 74e6ef5fd0
commit ef0f989be8
7 changed files with 34 additions and 33 deletions

View File

@ -2,7 +2,9 @@ use std::sync::Arc;
use async_trait::async_trait;
pub use lb_framework::*;
use testing_framework_core::scenario::{Application, DynError, FeedRuntime, RunContext};
use testing_framework_core::scenario::{
Application, DynError, FeedRuntime, NodeClients, RunContext,
};
use tokio::sync::broadcast;
pub mod cfgsync;
@ -21,9 +23,12 @@ impl Application for LbcExtEnv {
type FeedRuntime = <lb_framework::LbcEnv as Application>::FeedRuntime;
async fn prepare_feed(
client: Self::NodeClient,
node_clients: NodeClients<Self>,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError> {
<lb_framework::LbcEnv as Application>::prepare_feed(client).await
let clients = node_clients.snapshot();
let upstream_clients = NodeClients::<lb_framework::LbcEnv>::new(clients);
<LbcEnv as Application>::prepare_feed(upstream_clients).await
}
}

View File

@ -1,7 +1,7 @@
use async_trait::async_trait;
use crate::{
scenario::{DynError, FeedRuntime},
scenario::{DynError, FeedRuntime, NodeClients},
topology::DeploymentDescriptor,
};
@ -17,6 +17,8 @@ pub trait Application: Send + Sync + 'static {
type FeedRuntime: FeedRuntime;
async fn prepare_feed(
client: Self::NodeClient,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError>;
node_clients: NodeClients<Self>,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError>
where
Self: Sized;
}

View File

@ -53,9 +53,9 @@ impl CleanupGuard for FeedHandle {
/// Spawn a background task that drives the environment-provided feed.
pub async fn spawn_feed<E: Application>(
client: E::NodeClient,
node_clients: NodeClients<E>,
) -> Result<(<E::FeedRuntime as FeedRuntime>::Feed, FeedHandle), DynError> {
let (feed, worker) = E::prepare_feed(client).await?;
let (feed, worker) = E::prepare_feed(node_clients).await?;
let handle = tokio::spawn(async move {
Box::new(worker).run().await;

View File

@ -20,16 +20,14 @@ async fn spawn_block_feed_with<E: Application>(
),
ComposeRunnerError,
> {
debug!(
nodes = node_clients.len(),
"selecting node client for block feed"
);
let node_count = node_clients.len();
debug!(nodes = node_count, "starting compose block feed");
let block_source_client = node_clients
.random_client()
.ok_or(ComposeRunnerError::BlockFeedMissing)?;
if node_count == 0 {
return Err(ComposeRunnerError::BlockFeedMissing);
}
spawn_feed::<E>(block_source_client)
spawn_feed::<E>(node_clients.clone())
.await
.map_err(|source| ComposeRunnerError::BlockFeed { source })
}

View File

@ -14,17 +14,15 @@ pub async fn spawn_block_feed_with<E: Application>(
),
K8sRunnerError,
> {
debug!(
nodes = node_clients.len(),
"selecting node client for block feed"
);
let node_count = node_clients.len();
debug!(nodes = node_count, "starting k8s block feed");
let block_source_client = node_clients
.random_client()
.ok_or(K8sRunnerError::BlockFeedMissing)?;
if node_count == 0 {
return Err(K8sRunnerError::BlockFeedMissing);
}
info!("starting block feed");
spawn_feed::<E>(block_source_client)
spawn_feed::<E>(node_clients.clone())
.await
.map_err(|source| K8sRunnerError::BlockFeed { source })
}

View File

@ -403,20 +403,18 @@ fn keep_tempdir(policy: DeploymentPolicy) -> bool {
async fn spawn_feed_with<E: Application>(
node_clients: &NodeClients<E>,
) -> Result<(<E::FeedRuntime as FeedRuntime>::Feed, FeedHandle), ProcessDeployerError> {
debug!(
nodes = node_clients.len(),
"selecting node client for local feed"
);
let node_count = node_clients.len();
debug!(nodes = node_count, "starting local feed");
let Some(block_source_client) = node_clients.random_client() else {
if node_count == 0 {
return Err(ProcessDeployerError::WorkloadFailed {
source: "feed requires at least one node".into(),
});
};
}
info!("starting feed");
spawn_feed::<E>(block_source_client)
spawn_feed::<E>(node_clients.clone())
.await
.map_err(workload_error)
}

View File

@ -112,7 +112,7 @@ mod tests {
use async_trait::async_trait;
use testing_framework_core::{
scenario::{Application, DynError, Feed, FeedRuntime},
scenario::{Application, DynError, Feed, FeedRuntime, NodeClients},
topology::DeploymentDescriptor,
};
@ -160,7 +160,7 @@ mod tests {
type FeedRuntime = DummyFeedRuntime;
async fn prepare_feed(
_client: Self::NodeClient,
_node_clients: NodeClients<Self>,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError>
{
Ok((DummyFeed, DummyFeedRuntime))