feat(testing-framework): wire external sources for local and scenario flows

This commit is contained in:
andrussal 2026-02-19 11:53:46 +01:00
parent 86362a3a78
commit b3ecc5acf7
14 changed files with 486 additions and 67 deletions

2
Cargo.lock generated
View File

@ -2904,6 +2904,7 @@ dependencies = [
"testing-framework-env",
"testing-framework-runner-compose",
"testing-framework-runner-k8s",
"testing-framework-runner-local",
"testing_framework",
"thiserror 2.0.18",
"tokio",
@ -5816,6 +5817,7 @@ dependencies = [
"testing-framework-core",
"testing-framework-runner-compose",
"testing-framework-runner-k8s",
"testing-framework-runner-local",
"testing_framework",
"tokio",
"tracing",

View File

@ -17,6 +17,7 @@ lb-workloads = { workspace = true }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-k8s = { workspace = true }
testing-framework-runner-local = { workspace = true }
tokio = { features = ["macros", "net", "rt-multi-thread", "time"], workspace = true }
tracing = { workspace = true }
tracing-subscriber = { features = ["env-filter", "fmt"], version = "0.3" }

View File

@ -0,0 +1,284 @@
use std::{collections::HashSet, time::Duration};
use anyhow::Result;
use lb_ext::{LbcExtEnv, ScenarioBuilder};
use lb_framework::{
DeploymentBuilder, LbcEnv, LbcLocalDeployer, LbcManualCluster, NodeHttpClient, TopologyConfig,
configs::build_node_run_config,
};
use testing_framework_core::scenario::{
Deployer as _, ExternalNodeSource, PeerSelection, StartNodeOptions,
};
use testing_framework_runner_local::ProcessDeployer;
use tokio::time::sleep;
struct SeedCluster {
_cluster: LbcManualCluster,
node_a: NodeHttpClient,
node_b: NodeHttpClient,
bootstrap_peer_addresses: Vec<String>,
}
impl SeedCluster {
fn external_sources(&self) -> [ExternalNodeSource; 2] {
[
ExternalNodeSource::new("external-a".to_owned(), self.node_a.base_url().to_string()),
ExternalNodeSource::new("external-b".to_owned(), self.node_b.base_url().to_string()),
]
}
}
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples --test external_sources_local -- --ignored`"]
async fn managed_local_plus_external_sources_are_orchestrated() -> Result<()> {
let seed_cluster = start_seed_cluster().await?;
let second_cluster_bootstrap_peers =
parse_peer_addresses(&seed_cluster.bootstrap_peer_addresses)?;
let second_topology = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)).build()?;
let second_cluster = LbcLocalDeployer::new().manual_cluster_from_descriptors(second_topology);
let second_c = second_cluster
.start_node_with(
"c",
StartNodeOptions::<LbcEnv>::default()
.with_peers(PeerSelection::None)
.create_patch({
let peers = second_cluster_bootstrap_peers.clone();
move |mut run_config| {
run_config
.user
.network
.backend
.initial_peers
.extend(peers.clone());
Ok(run_config)
}
}),
)
.await?
.client;
let second_d = second_cluster
.start_node_with(
"d",
StartNodeOptions::<LbcEnv>::default()
.with_peers(PeerSelection::Named(vec!["node-c".to_owned()]))
.create_patch({
let peers = second_cluster_bootstrap_peers.clone();
move |mut run_config| {
run_config
.user
.network
.backend
.initial_peers
.extend(peers.clone());
Ok(run_config)
}
}),
)
.await?
.client;
second_cluster.wait_network_ready().await?;
wait_until_has_peers(&second_c, Duration::from_secs(30)).await?;
wait_until_has_peers(&second_d, Duration::from_secs(30)).await?;
second_cluster.add_external_clients([seed_cluster.node_a.clone(), seed_cluster.node_b.clone()]);
let orchestrated = second_cluster.node_clients();
assert_eq!(
orchestrated.len(),
4,
"expected 2 managed + 2 external clients"
);
let expected_endpoints: HashSet<String> = [
seed_cluster.node_a.base_url().to_string(),
seed_cluster.node_b.base_url().to_string(),
second_c.base_url().to_string(),
second_d.base_url().to_string(),
]
.into_iter()
.collect();
let actual_endpoints: HashSet<String> = orchestrated
.snapshot()
.into_iter()
.map(|client| client.base_url().to_string())
.collect();
assert_eq!(actual_endpoints, expected_endpoints);
for client in orchestrated.snapshot() {
let _ = client.consensus_info().await?;
}
Ok(())
}
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples --test external_sources_local -- --ignored`"]
async fn scenario_managed_plus_external_sources_are_orchestrated() -> Result<()> {
let seed_cluster = start_seed_cluster().await?;
let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2));
let base_descriptors = base_builder.clone().build()?;
let mut deployment_builder = base_builder;
let parsed_peers = parse_peer_addresses(&seed_cluster.bootstrap_peer_addresses)?;
for node in base_descriptors.nodes() {
let mut run_config = build_node_run_config(
&base_descriptors,
node,
base_descriptors.config().node_config_override(node.index()),
)
.map_err(|error| anyhow::anyhow!(error.to_string()))?;
run_config
.user
.network
.backend
.initial_peers
.extend(parsed_peers.clone());
deployment_builder = deployment_builder.with_node_config_override(node.index(), run_config);
}
let mut scenario = ScenarioBuilder::new(Box::new(deployment_builder))
.with_run_duration(Duration::from_secs(5))
.with_external_node(seed_cluster.external_sources()[0].clone())
.with_external_node(seed_cluster.external_sources()[1].clone())
.build()?;
let deployer = ProcessDeployer::<LbcExtEnv>::default();
let runner = deployer.deploy(&scenario).await?;
let run_handle = runner.run(&mut scenario).await?;
let clients = run_handle.context().node_clients().snapshot();
assert_eq!(clients.len(), 4, "expected 2 managed + 2 external clients");
let first_a_endpoint = seed_cluster.node_a.base_url().to_string();
let first_b_endpoint = seed_cluster.node_b.base_url().to_string();
for client in clients.iter().filter(|client| {
let endpoint = client.base_url().to_string();
endpoint != first_a_endpoint && endpoint != first_b_endpoint
}) {
wait_until_has_peers(client, Duration::from_secs(30)).await?;
}
let expected_endpoints: HashSet<String> = [
seed_cluster.node_a.base_url().to_string(),
seed_cluster.node_b.base_url().to_string(),
]
.into_iter()
.collect();
let actual_endpoints: HashSet<String> = clients
.iter()
.map(|client| client.base_url().to_string())
.collect();
assert!(
expected_endpoints.is_subset(&actual_endpoints),
"scenario context should include external endpoints"
);
for client in clients {
let _ = client.consensus_info().await?;
}
Ok(())
}
async fn start_seed_cluster() -> Result<SeedCluster> {
let topology = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)).build()?;
let cluster = LbcLocalDeployer::new().manual_cluster_from_descriptors(topology);
let node_a = cluster
.start_node_with("a", node_start_options(PeerSelection::None))
.await?
.client;
let node_b = cluster
.start_node_with(
"b",
node_start_options(PeerSelection::Named(vec!["node-a".to_owned()])),
)
.await?
.client;
cluster.wait_network_ready().await?;
let bootstrap_peer_addresses = collect_loopback_peer_addresses(&node_a, &node_b).await?;
Ok(SeedCluster {
_cluster: cluster,
node_a,
node_b,
bootstrap_peer_addresses,
})
}
fn node_start_options(peers: PeerSelection) -> StartNodeOptions<LbcEnv> {
let mut options = StartNodeOptions::<LbcEnv>::default();
options.peers = peers;
options
}
async fn collect_loopback_peer_addresses(
node_a: &lb_framework::NodeHttpClient,
node_b: &lb_framework::NodeHttpClient,
) -> Result<Vec<String>> {
let mut peers = Vec::new();
for info in [node_a.network_info().await?, node_b.network_info().await?] {
let addresses: Vec<String> = info
.listen_addresses
.into_iter()
.map(|addr| addr.to_string())
.collect();
let mut loopback: Vec<String> = addresses
.iter()
.filter(|addr| addr.contains("/127.0.0.1/"))
.cloned()
.collect();
if loopback.is_empty() {
loopback = addresses;
}
peers.extend(loopback);
}
Ok(peers)
}
fn parse_peer_addresses<T>(addresses: &[String]) -> Result<Vec<T>>
where
T: std::str::FromStr,
T::Err: std::error::Error + Send + Sync + 'static,
{
addresses
.iter()
.map(|address| address.parse::<T>().map_err(Into::into))
.collect()
}
async fn wait_until_has_peers(client: &NodeHttpClient, timeout: Duration) -> Result<()> {
let start = tokio::time::Instant::now();
loop {
if let Ok(network_info) = client.network_info().await {
if network_info.n_peers > 0 {
return Ok(());
}
}
if start.elapsed() >= timeout {
anyhow::bail!(
"node {} did not report non-zero peer count within {:?}",
client.base_url(),
timeout
);
}
sleep(Duration::from_millis(500)).await;
}
}

View File

@ -13,6 +13,7 @@ testing-framework-core = { workspace = true }
testing-framework-env = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-k8s = { workspace = true }
testing-framework-runner-local = { workspace = true }
# Logos / Nomos deps
lb_http_api_common = { workspace = true }

View File

@ -1,9 +1,25 @@
use std::sync::Arc;
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use async_trait::async_trait;
pub use lb_framework::*;
use testing_framework_core::scenario::{Application, DynError, FeedRuntime, RunContext};
use reqwest::Url;
pub use scenario::{
CoreBuilderExt, ObservabilityBuilderExt, ScenarioBuilder, ScenarioBuilderExt,
ScenarioBuilderWith,
};
use testing_framework_core::scenario::{
Application, DynError, ExternalNodeSource, FeedRuntime, RunContext, StartNodeOptions,
};
use testing_framework_runner_local::{
BuiltNodeConfig, LocalDeployerEnv, NodeConfigEntry,
process::{LaunchSpec, NodeEndpoints, ProcessSpawnError},
};
use tokio::sync::broadcast;
use workloads::{LbcBlockFeedEnv, LbcScenarioEnv};
pub mod cfgsync;
mod compose_env;
@ -11,36 +27,106 @@ pub mod constants;
mod k8s_env;
pub mod scenario;
pub type LbcComposeDeployer = testing_framework_runner_compose::ComposeDeployer<LbcExtEnv>;
pub type LbcK8sDeployer = testing_framework_runner_k8s::K8sDeployer<LbcExtEnv>;
pub struct LbcExtEnv;
#[async_trait]
impl Application for LbcExtEnv {
type Deployment = <lb_framework::LbcEnv as Application>::Deployment;
type NodeClient = <lb_framework::LbcEnv as Application>::NodeClient;
type NodeConfig = <lb_framework::LbcEnv as Application>::NodeConfig;
type FeedRuntime = <lb_framework::LbcEnv as Application>::FeedRuntime;
type Deployment = <LbcEnv as Application>::Deployment;
type NodeClient = <LbcEnv as Application>::NodeClient;
type NodeConfig = <LbcEnv as Application>::NodeConfig;
type FeedRuntime = <LbcEnv as Application>::FeedRuntime;
fn external_node_client(source: &ExternalNodeSource) -> Result<Self::NodeClient, DynError> {
let base_url = Url::parse(&source.endpoint)?;
Ok(NodeHttpClient::from_urls(base_url, None))
}
async fn prepare_feed(
client: Self::NodeClient,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError> {
<lb_framework::LbcEnv as Application>::prepare_feed(client).await
<LbcEnv as Application>::prepare_feed(client).await
}
}
pub use scenario::{
CoreBuilderExt, ObservabilityBuilderExt, ScenarioBuilder, ScenarioBuilderExt,
ScenarioBuilderWith,
};
impl LbcScenarioEnv for LbcExtEnv {}
pub type LbcComposeDeployer = testing_framework_runner_compose::ComposeDeployer<LbcExtEnv>;
pub type LbcK8sDeployer = testing_framework_runner_k8s::K8sDeployer<LbcExtEnv>;
impl lb_framework::workloads::LbcScenarioEnv for LbcExtEnv {}
impl lb_framework::workloads::LbcBlockFeedEnv for LbcExtEnv {
fn block_feed_subscription(
ctx: &RunContext<Self>,
) -> broadcast::Receiver<Arc<lb_framework::BlockRecord>> {
impl LbcBlockFeedEnv for LbcExtEnv {
fn block_feed_subscription(ctx: &RunContext<Self>) -> broadcast::Receiver<Arc<BlockRecord>> {
ctx.feed().subscribe()
}
}
#[async_trait]
impl LocalDeployerEnv for LbcExtEnv {
fn build_node_config(
topology: &Self::Deployment,
index: usize,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
let mapped_options = map_start_options(options)?;
<LbcEnv as LocalDeployerEnv>::build_node_config(
topology,
index,
peer_ports_by_name,
&mapped_options,
peer_ports,
)
}
fn build_initial_node_configs(
topology: &Self::Deployment,
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
<LbcEnv as LocalDeployerEnv>::build_initial_node_configs(topology)
}
fn initial_persist_dir(
topology: &Self::Deployment,
node_name: &str,
index: usize,
) -> Option<PathBuf> {
<LbcEnv as LocalDeployerEnv>::initial_persist_dir(topology, node_name, index)
}
fn build_launch_spec(
config: &<Self as Application>::NodeConfig,
dir: &Path,
label: &str,
) -> Result<LaunchSpec, DynError> {
<LbcEnv as LocalDeployerEnv>::build_launch_spec(config, dir, label)
}
fn node_endpoints(config: &<Self as Application>::NodeConfig) -> NodeEndpoints {
<LbcEnv as LocalDeployerEnv>::node_endpoints(config)
}
fn node_client(endpoints: &NodeEndpoints) -> Self::NodeClient {
<LbcEnv as LocalDeployerEnv>::node_client(endpoints)
}
fn readiness_endpoint_path() -> &'static str {
<LbcEnv as LocalDeployerEnv>::readiness_endpoint_path()
}
}
fn map_start_options(
options: &StartNodeOptions<LbcExtEnv>,
) -> Result<StartNodeOptions<LbcEnv>, DynError> {
if options.config_patch.is_some() {
return Err("LbcExtEnv local deployer bridge does not support config_patch yet".into());
}
let mut mapped = StartNodeOptions::<LbcEnv>::default();
mapped.peers = options.peers.clone();
mapped.config_override = options.config_override.clone();
mapped.persist_dir = options.persist_dir.clone();
Ok(mapped)
}

View File

@ -1,7 +1,9 @@
use std::io;
use async_trait::async_trait;
use crate::{
scenario::{DynError, FeedRuntime},
scenario::{DynError, ExternalNodeSource, FeedRuntime},
topology::DeploymentDescriptor,
};
@ -16,16 +18,11 @@ pub trait Application: Send + Sync + 'static {
type FeedRuntime: FeedRuntime;
/// Optional stable node identity (for example a peer id) used for
/// deduplication when nodes are discovered from multiple sources.
fn node_peer_identity(_client: &Self::NodeClient) -> Option<String> {
None
}
/// Optional endpoint identity used as a dedup fallback when no peer id is
/// available.
fn node_endpoint_identity(_client: &Self::NodeClient) -> Option<String> {
None
/// Build an application node client from a static external source.
///
/// Environments that support external nodes should override this.
fn external_node_client(_source: &ExternalNodeSource) -> Result<Self::NodeClient, DynError> {
Err(io::Error::other("external node sources are not supported").into())
}
async fn prepare_feed(

View File

@ -734,7 +734,6 @@ fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> Scen
const fn source_mode_name(mode: SourceModeName) -> &'static str {
match mode {
SourceModeName::Attached => "Attached",
SourceModeName::ExternalOnly => "ExternalOnly",
}
}

View File

@ -304,25 +304,18 @@ fn upsert_node<E: Application>(
}
fn canonical_identity<E: Application>(
client: &E::NodeClient,
_client: &E::NodeClient,
identity_hint: Option<String>,
inner: &mut NodeInventoryInner<E>,
) -> String {
// Priority: explicit hint -> app-provided peer id -> endpoint -> synthetic.
// Priority: explicit hint -> synthetic.
if let Some(identity) = identity_hint.filter(|value| !value.trim().is_empty()) {
return identity;
}
if let Some(identity) = E::node_peer_identity(client) {
return format!("peer:{identity}");
}
if let Some(identity) = E::node_endpoint_identity(client) {
return format!("endpoint:{identity}");
}
let synthetic = format!("node:{}", inner.next_synthetic_id);
inner.next_synthetic_id += 1;
synthetic
}

View File

@ -41,14 +41,12 @@ pub struct SourceOrchestrationPlan {
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SourceModeName {
Attached,
ExternalOnly,
}
impl fmt::Display for SourceModeName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Attached => f.write_str("Attached"),
Self::ExternalOnly => f.write_str("ExternalOnly"),
}
}
}
@ -87,9 +85,9 @@ impl SourceOrchestrationPlan {
fn ensure_currently_wired(&self) -> Result<(), SourceOrchestrationPlanError> {
match self.mode {
SourceOrchestrationMode::Managed { .. } => Ok(()),
SourceOrchestrationMode::Managed { .. }
| SourceOrchestrationMode::ExternalOnly { .. } => Ok(()),
SourceOrchestrationMode::Attached { .. } => not_wired(SourceModeName::Attached),
SourceOrchestrationMode::ExternalOnly { .. } => not_wired(SourceModeName::ExternalOnly),
}
}
}

View File

@ -7,8 +7,9 @@ use crate::scenario::{
SourceOrchestrationMode, SourceOrchestrationPlan, SourceOrchestrationPlanError,
},
providers::{
AttachProviderError, AttachedNode, ExternalNode, ExternalProviderError,
ManagedProviderError, ManagedProvisionedNode, SourceProviders, StaticManagedProvider,
ApplicationExternalProvider, AttachProviderError, AttachedNode, ExternalNode,
ExternalProviderError, ManagedProviderError, ManagedProvisionedNode, SourceProviders,
StaticManagedProvider,
},
},
};
@ -47,9 +48,6 @@ pub fn build_source_orchestration_plan<E: Application, Caps>(
}
/// Resolves runtime source nodes via unified providers from orchestration plan.
///
/// This currently returns managed nodes for managed mode and keeps external
/// overlays for managed mode reserved until external wiring is enabled.
pub async fn resolve_sources<E: Application>(
plan: &SourceOrchestrationPlan,
providers: &SourceProviders<E>,
@ -57,15 +55,18 @@ pub async fn resolve_sources<E: Application>(
match &plan.mode {
SourceOrchestrationMode::Managed { managed, .. } => {
let managed_nodes = providers.managed.provide(managed).await?;
let external_nodes = providers.external.provide(plan.external_sources()).await?;
Ok(ResolvedSources {
managed: managed_nodes,
attached: Vec::new(),
external: Vec::new(),
external: external_nodes,
})
}
SourceOrchestrationMode::Attached { attach, external } => {
let attached_nodes = providers.attach.discover(attach).await?;
let external_nodes = providers.external.provide(external).await?;
Ok(ResolvedSources {
managed: Vec::new(),
attached: attached_nodes,
@ -74,6 +75,7 @@ pub async fn resolve_sources<E: Application>(
}
SourceOrchestrationMode::ExternalOnly { external } => {
let external_nodes = providers.external.provide(external).await?;
Ok(ResolvedSources {
managed: Vec::new(),
attached: Vec::new(),
@ -88,16 +90,18 @@ pub async fn resolve_sources<E: Application>(
/// Current wiring is transitional:
/// - Managed mode is backed by prebuilt deployer-managed clients via
/// `StaticManagedProvider`.
/// - Attached and external modes are represented in the orchestration plan and
/// resolver, but provider wiring is still scaffolding-only until full runtime
/// integration lands.
/// - External nodes are resolved via `Application::external_node_client`.
/// - Attached mode remains blocked at plan validation until attach providers
/// are fully wired.
pub async fn orchestrate_sources<E: Application>(
plan: &SourceOrchestrationPlan,
node_clients: NodeClients<E>,
) -> Result<NodeClients<E>, DynError> {
let providers: SourceProviders<E> = SourceProviders::default().with_managed(Arc::new(
StaticManagedProvider::new(node_clients.snapshot()),
));
let providers: SourceProviders<E> = SourceProviders::default()
.with_managed(Arc::new(StaticManagedProvider::new(
node_clients.snapshot(),
)))
.with_external(Arc::new(ApplicationExternalProvider));
let resolved = resolve_sources(plan, &providers).await?;
@ -110,6 +114,8 @@ pub async fn orchestrate_sources<E: Application>(
.managed
.into_iter()
.map(|node| node.client)
.chain(resolved.attached.into_iter().map(|node| node.client))
.chain(resolved.external.into_iter().map(|node| node.client))
.collect(),
))
}

View File

@ -26,9 +26,6 @@ pub enum ExternalProviderError {
/// Internal adapter interface for constructing node clients from static
/// external endpoint sources.
///
/// This is scaffolding-only in phase 1 and is intentionally not wired into
/// deployer runtime orchestration yet.
#[async_trait]
pub trait ExternalProvider<E: Application>: Send + Sync {
/// Builds external node handles from external source descriptors.
@ -38,8 +35,8 @@ pub trait ExternalProvider<E: Application>: Send + Sync {
) -> Result<Vec<ExternalNode<E>>, ExternalProviderError>;
}
/// Default external provider stub used while external wiring is not
/// implemented.
/// Default external provider stub used when external wiring is intentionally
/// disabled.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopExternalProvider;
@ -58,3 +55,30 @@ impl<E: Application> ExternalProvider<E> for NoopExternalProvider {
})
}
}
/// External provider backed by [`Application::external_node_client`].
#[derive(Clone, Copy, Debug, Default)]
pub struct ApplicationExternalProvider;
#[async_trait]
impl<E: Application> ExternalProvider<E> for ApplicationExternalProvider {
async fn provide(
&self,
sources: &[ExternalNodeSource],
) -> Result<Vec<ExternalNode<E>>, ExternalProviderError> {
sources
.iter()
.map(|source| {
E::external_node_client(source)
.map(|client| ExternalNode {
identity_hint: Some(source.label.clone()),
client,
})
.map_err(|build_error| ExternalProviderError::Build {
source_label: source.label.clone(),
source: build_error,
})
})
.collect()
}
}

View File

@ -8,6 +8,6 @@ mod managed_provider;
mod source_providers;
pub use attach_provider::{AttachProviderError, AttachedNode};
pub use external_provider::{ExternalNode, ExternalProviderError};
pub use external_provider::{ApplicationExternalProvider, ExternalNode, ExternalProviderError};
pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider};
pub use source_providers::SourceProviders;

View File

@ -9,8 +9,8 @@ use crate::scenario::Application;
/// Unified provider set used by source orchestration.
///
/// This is scaffolding-only and is intentionally not wired into runtime
/// deployer orchestration yet.
/// This is wired through source orchestration, but defaults to no-op providers
/// until deployers override specific source classes.
pub struct SourceProviders<E: Application> {
pub managed: Arc<dyn ManagedProvider<E>>,
pub attach: Arc<dyn AttachProvider<E>>,

View File

@ -1,6 +1,9 @@
use testing_framework_core::{
manual::ManualClusterHandle,
scenario::{DynError, NodeControlHandle, ReadinessError, StartNodeOptions, StartedNode},
scenario::{
DynError, ExternalNodeSource, NodeClients, NodeControlHandle, ReadinessError,
StartNodeOptions, StartedNode,
},
};
use thiserror::Error;
@ -78,6 +81,31 @@ impl<E: LocalDeployerEnv> ManualCluster<E> {
self.nodes.wait_node_ready(name).await?;
Ok(())
}
#[must_use]
pub fn node_clients(&self) -> NodeClients<E> {
self.nodes.node_clients()
}
pub fn add_external_sources(
&self,
external_sources: impl IntoIterator<Item = ExternalNodeSource>,
) -> Result<(), DynError> {
let node_clients = self.nodes.node_clients();
for source in external_sources {
let client = E::external_node_client(&source)?;
node_clients.add_node(client);
}
Ok(())
}
pub fn add_external_clients(&self, clients: impl IntoIterator<Item = E::NodeClient>) {
let node_clients = self.nodes.node_clients();
for client in clients {
node_clients.add_node(client);
}
}
}
impl<E: LocalDeployerEnv> Drop for ManualCluster<E> {