diff --git a/Cargo.lock b/Cargo.lock index e53c899..49db93e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2904,6 +2904,7 @@ dependencies = [ "testing-framework-env", "testing-framework-runner-compose", "testing-framework-runner-k8s", + "testing-framework-runner-local", "testing_framework", "thiserror 2.0.18", "tokio", @@ -5816,6 +5817,7 @@ dependencies = [ "testing-framework-core", "testing-framework-runner-compose", "testing-framework-runner-k8s", + "testing-framework-runner-local", "testing_framework", "tokio", "tracing", diff --git a/logos/examples/Cargo.toml b/logos/examples/Cargo.toml index 68b9a18..94769a6 100644 --- a/logos/examples/Cargo.toml +++ b/logos/examples/Cargo.toml @@ -17,6 +17,7 @@ lb-workloads = { workspace = true } testing-framework-core = { workspace = true } testing-framework-runner-compose = { workspace = true } testing-framework-runner-k8s = { workspace = true } +testing-framework-runner-local = { workspace = true } tokio = { features = ["macros", "net", "rt-multi-thread", "time"], workspace = true } tracing = { workspace = true } tracing-subscriber = { features = ["env-filter", "fmt"], version = "0.3" } diff --git a/logos/examples/tests/external_sources_local.rs b/logos/examples/tests/external_sources_local.rs new file mode 100644 index 0000000..af16b9d --- /dev/null +++ b/logos/examples/tests/external_sources_local.rs @@ -0,0 +1,284 @@ +use std::{collections::HashSet, time::Duration}; + +use anyhow::Result; +use lb_ext::{LbcExtEnv, ScenarioBuilder}; +use lb_framework::{ + DeploymentBuilder, LbcEnv, LbcLocalDeployer, LbcManualCluster, NodeHttpClient, TopologyConfig, + configs::build_node_run_config, +}; +use testing_framework_core::scenario::{ + Deployer as _, ExternalNodeSource, PeerSelection, StartNodeOptions, +}; +use testing_framework_runner_local::ProcessDeployer; +use tokio::time::sleep; + +struct SeedCluster { + _cluster: LbcManualCluster, + node_a: NodeHttpClient, + node_b: NodeHttpClient, + bootstrap_peer_addresses: Vec, +} + +impl SeedCluster { + fn external_sources(&self) -> [ExternalNodeSource; 2] { + [ + ExternalNodeSource::new("external-a".to_owned(), self.node_a.base_url().to_string()), + ExternalNodeSource::new("external-b".to_owned(), self.node_b.base_url().to_string()), + ] + } +} + +#[tokio::test] +#[ignore = "run manually with `cargo test -p runner-examples --test external_sources_local -- --ignored`"] +async fn managed_local_plus_external_sources_are_orchestrated() -> Result<()> { + let seed_cluster = start_seed_cluster().await?; + let second_cluster_bootstrap_peers = + parse_peer_addresses(&seed_cluster.bootstrap_peer_addresses)?; + + let second_topology = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)).build()?; + let second_cluster = LbcLocalDeployer::new().manual_cluster_from_descriptors(second_topology); + let second_c = second_cluster + .start_node_with( + "c", + StartNodeOptions::::default() + .with_peers(PeerSelection::None) + .create_patch({ + let peers = second_cluster_bootstrap_peers.clone(); + move |mut run_config| { + run_config + .user + .network + .backend + .initial_peers + .extend(peers.clone()); + Ok(run_config) + } + }), + ) + .await? + .client; + + let second_d = second_cluster + .start_node_with( + "d", + StartNodeOptions::::default() + .with_peers(PeerSelection::Named(vec!["node-c".to_owned()])) + .create_patch({ + let peers = second_cluster_bootstrap_peers.clone(); + move |mut run_config| { + run_config + .user + .network + .backend + .initial_peers + .extend(peers.clone()); + Ok(run_config) + } + }), + ) + .await? + .client; + + second_cluster.wait_network_ready().await?; + + wait_until_has_peers(&second_c, Duration::from_secs(30)).await?; + wait_until_has_peers(&second_d, Duration::from_secs(30)).await?; + + second_cluster.add_external_clients([seed_cluster.node_a.clone(), seed_cluster.node_b.clone()]); + let orchestrated = second_cluster.node_clients(); + + assert_eq!( + orchestrated.len(), + 4, + "expected 2 managed + 2 external clients" + ); + + let expected_endpoints: HashSet = [ + seed_cluster.node_a.base_url().to_string(), + seed_cluster.node_b.base_url().to_string(), + second_c.base_url().to_string(), + second_d.base_url().to_string(), + ] + .into_iter() + .collect(); + + let actual_endpoints: HashSet = orchestrated + .snapshot() + .into_iter() + .map(|client| client.base_url().to_string()) + .collect(); + + assert_eq!(actual_endpoints, expected_endpoints); + + for client in orchestrated.snapshot() { + let _ = client.consensus_info().await?; + } + + Ok(()) +} + +#[tokio::test] +#[ignore = "run manually with `cargo test -p runner-examples --test external_sources_local -- --ignored`"] +async fn scenario_managed_plus_external_sources_are_orchestrated() -> Result<()> { + let seed_cluster = start_seed_cluster().await?; + + let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)); + let base_descriptors = base_builder.clone().build()?; + let mut deployment_builder = base_builder; + let parsed_peers = parse_peer_addresses(&seed_cluster.bootstrap_peer_addresses)?; + + for node in base_descriptors.nodes() { + let mut run_config = build_node_run_config( + &base_descriptors, + node, + base_descriptors.config().node_config_override(node.index()), + ) + .map_err(|error| anyhow::anyhow!(error.to_string()))?; + run_config + .user + .network + .backend + .initial_peers + .extend(parsed_peers.clone()); + deployment_builder = deployment_builder.with_node_config_override(node.index(), run_config); + } + + let mut scenario = ScenarioBuilder::new(Box::new(deployment_builder)) + .with_run_duration(Duration::from_secs(5)) + .with_external_node(seed_cluster.external_sources()[0].clone()) + .with_external_node(seed_cluster.external_sources()[1].clone()) + .build()?; + + let deployer = ProcessDeployer::::default(); + let runner = deployer.deploy(&scenario).await?; + let run_handle = runner.run(&mut scenario).await?; + + let clients = run_handle.context().node_clients().snapshot(); + + assert_eq!(clients.len(), 4, "expected 2 managed + 2 external clients"); + + let first_a_endpoint = seed_cluster.node_a.base_url().to_string(); + let first_b_endpoint = seed_cluster.node_b.base_url().to_string(); + + for client in clients.iter().filter(|client| { + let endpoint = client.base_url().to_string(); + endpoint != first_a_endpoint && endpoint != first_b_endpoint + }) { + wait_until_has_peers(client, Duration::from_secs(30)).await?; + } + + let expected_endpoints: HashSet = [ + seed_cluster.node_a.base_url().to_string(), + seed_cluster.node_b.base_url().to_string(), + ] + .into_iter() + .collect(); + + let actual_endpoints: HashSet = clients + .iter() + .map(|client| client.base_url().to_string()) + .collect(); + + assert!( + expected_endpoints.is_subset(&actual_endpoints), + "scenario context should include external endpoints" + ); + + for client in clients { + let _ = client.consensus_info().await?; + } + + Ok(()) +} + +async fn start_seed_cluster() -> Result { + let topology = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)).build()?; + let cluster = LbcLocalDeployer::new().manual_cluster_from_descriptors(topology); + let node_a = cluster + .start_node_with("a", node_start_options(PeerSelection::None)) + .await? + .client; + let node_b = cluster + .start_node_with( + "b", + node_start_options(PeerSelection::Named(vec!["node-a".to_owned()])), + ) + .await? + .client; + cluster.wait_network_ready().await?; + let bootstrap_peer_addresses = collect_loopback_peer_addresses(&node_a, &node_b).await?; + + Ok(SeedCluster { + _cluster: cluster, + node_a, + node_b, + bootstrap_peer_addresses, + }) +} + +fn node_start_options(peers: PeerSelection) -> StartNodeOptions { + let mut options = StartNodeOptions::::default(); + options.peers = peers; + options +} + +async fn collect_loopback_peer_addresses( + node_a: &lb_framework::NodeHttpClient, + node_b: &lb_framework::NodeHttpClient, +) -> Result> { + let mut peers = Vec::new(); + + for info in [node_a.network_info().await?, node_b.network_info().await?] { + let addresses: Vec = info + .listen_addresses + .into_iter() + .map(|addr| addr.to_string()) + .collect(); + + let mut loopback: Vec = addresses + .iter() + .filter(|addr| addr.contains("/127.0.0.1/")) + .cloned() + .collect(); + + if loopback.is_empty() { + loopback = addresses; + } + + peers.extend(loopback); + } + + Ok(peers) +} + +fn parse_peer_addresses(addresses: &[String]) -> Result> +where + T: std::str::FromStr, + T::Err: std::error::Error + Send + Sync + 'static, +{ + addresses + .iter() + .map(|address| address.parse::().map_err(Into::into)) + .collect() +} + +async fn wait_until_has_peers(client: &NodeHttpClient, timeout: Duration) -> Result<()> { + let start = tokio::time::Instant::now(); + loop { + if let Ok(network_info) = client.network_info().await { + if network_info.n_peers > 0 { + return Ok(()); + } + } + + if start.elapsed() >= timeout { + anyhow::bail!( + "node {} did not report non-zero peer count within {:?}", + client.base_url(), + timeout + ); + } + + sleep(Duration::from_millis(500)).await; + } +} diff --git a/logos/runtime/ext/Cargo.toml b/logos/runtime/ext/Cargo.toml index e70629a..585e5d9 100644 --- a/logos/runtime/ext/Cargo.toml +++ b/logos/runtime/ext/Cargo.toml @@ -13,6 +13,7 @@ testing-framework-core = { workspace = true } testing-framework-env = { workspace = true } testing-framework-runner-compose = { workspace = true } testing-framework-runner-k8s = { workspace = true } +testing-framework-runner-local = { workspace = true } # Logos / Nomos deps lb_http_api_common = { workspace = true } diff --git a/logos/runtime/ext/src/lib.rs b/logos/runtime/ext/src/lib.rs index c849ceb..49a7b8c 100644 --- a/logos/runtime/ext/src/lib.rs +++ b/logos/runtime/ext/src/lib.rs @@ -1,9 +1,25 @@ -use std::sync::Arc; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, +}; use async_trait::async_trait; pub use lb_framework::*; -use testing_framework_core::scenario::{Application, DynError, FeedRuntime, RunContext}; +use reqwest::Url; +pub use scenario::{ + CoreBuilderExt, ObservabilityBuilderExt, ScenarioBuilder, ScenarioBuilderExt, + ScenarioBuilderWith, +}; +use testing_framework_core::scenario::{ + Application, DynError, ExternalNodeSource, FeedRuntime, RunContext, StartNodeOptions, +}; +use testing_framework_runner_local::{ + BuiltNodeConfig, LocalDeployerEnv, NodeConfigEntry, + process::{LaunchSpec, NodeEndpoints, ProcessSpawnError}, +}; use tokio::sync::broadcast; +use workloads::{LbcBlockFeedEnv, LbcScenarioEnv}; pub mod cfgsync; mod compose_env; @@ -11,36 +27,106 @@ pub mod constants; mod k8s_env; pub mod scenario; +pub type LbcComposeDeployer = testing_framework_runner_compose::ComposeDeployer; +pub type LbcK8sDeployer = testing_framework_runner_k8s::K8sDeployer; + pub struct LbcExtEnv; #[async_trait] impl Application for LbcExtEnv { - type Deployment = ::Deployment; - type NodeClient = ::NodeClient; - type NodeConfig = ::NodeConfig; - type FeedRuntime = ::FeedRuntime; + type Deployment = ::Deployment; + + type NodeClient = ::NodeClient; + + type NodeConfig = ::NodeConfig; + + type FeedRuntime = ::FeedRuntime; + + fn external_node_client(source: &ExternalNodeSource) -> Result { + let base_url = Url::parse(&source.endpoint)?; + Ok(NodeHttpClient::from_urls(base_url, None)) + } async fn prepare_feed( client: Self::NodeClient, ) -> Result<(::Feed, Self::FeedRuntime), DynError> { - ::prepare_feed(client).await + ::prepare_feed(client).await } } -pub use scenario::{ - CoreBuilderExt, ObservabilityBuilderExt, ScenarioBuilder, ScenarioBuilderExt, - ScenarioBuilderWith, -}; +impl LbcScenarioEnv for LbcExtEnv {} -pub type LbcComposeDeployer = testing_framework_runner_compose::ComposeDeployer; -pub type LbcK8sDeployer = testing_framework_runner_k8s::K8sDeployer; - -impl lb_framework::workloads::LbcScenarioEnv for LbcExtEnv {} - -impl lb_framework::workloads::LbcBlockFeedEnv for LbcExtEnv { - fn block_feed_subscription( - ctx: &RunContext, - ) -> broadcast::Receiver> { +impl LbcBlockFeedEnv for LbcExtEnv { + fn block_feed_subscription(ctx: &RunContext) -> broadcast::Receiver> { ctx.feed().subscribe() } } + +#[async_trait] +impl LocalDeployerEnv for LbcExtEnv { + fn build_node_config( + topology: &Self::Deployment, + index: usize, + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + peer_ports: &[u16], + ) -> Result::NodeConfig>, DynError> { + let mapped_options = map_start_options(options)?; + ::build_node_config( + topology, + index, + peer_ports_by_name, + &mapped_options, + peer_ports, + ) + } + + fn build_initial_node_configs( + topology: &Self::Deployment, + ) -> Result::NodeConfig>>, ProcessSpawnError> { + ::build_initial_node_configs(topology) + } + + fn initial_persist_dir( + topology: &Self::Deployment, + node_name: &str, + index: usize, + ) -> Option { + ::initial_persist_dir(topology, node_name, index) + } + + fn build_launch_spec( + config: &::NodeConfig, + dir: &Path, + label: &str, + ) -> Result { + ::build_launch_spec(config, dir, label) + } + + fn node_endpoints(config: &::NodeConfig) -> NodeEndpoints { + ::node_endpoints(config) + } + + fn node_client(endpoints: &NodeEndpoints) -> Self::NodeClient { + ::node_client(endpoints) + } + + fn readiness_endpoint_path() -> &'static str { + ::readiness_endpoint_path() + } +} + +fn map_start_options( + options: &StartNodeOptions, +) -> Result, DynError> { + if options.config_patch.is_some() { + return Err("LbcExtEnv local deployer bridge does not support config_patch yet".into()); + } + + let mut mapped = StartNodeOptions::::default(); + mapped.peers = options.peers.clone(); + mapped.config_override = options.config_override.clone(); + mapped.persist_dir = options.persist_dir.clone(); + + Ok(mapped) +} diff --git a/testing-framework/core/src/env.rs b/testing-framework/core/src/env.rs index 82cea91..e7ad9d6 100644 --- a/testing-framework/core/src/env.rs +++ b/testing-framework/core/src/env.rs @@ -1,7 +1,9 @@ +use std::io; + use async_trait::async_trait; use crate::{ - scenario::{DynError, FeedRuntime}, + scenario::{DynError, ExternalNodeSource, FeedRuntime}, topology::DeploymentDescriptor, }; @@ -16,16 +18,11 @@ pub trait Application: Send + Sync + 'static { type FeedRuntime: FeedRuntime; - /// Optional stable node identity (for example a peer id) used for - /// deduplication when nodes are discovered from multiple sources. - fn node_peer_identity(_client: &Self::NodeClient) -> Option { - None - } - - /// Optional endpoint identity used as a dedup fallback when no peer id is - /// available. - fn node_endpoint_identity(_client: &Self::NodeClient) -> Option { - None + /// Build an application node client from a static external source. + /// + /// Environments that support external nodes should override this. + fn external_node_client(_source: &ExternalNodeSource) -> Result { + Err(io::Error::other("external node sources are not supported").into()) } async fn prepare_feed( diff --git a/testing-framework/core/src/scenario/definition.rs b/testing-framework/core/src/scenario/definition.rs index 810bdae..e8b3a4e 100644 --- a/testing-framework/core/src/scenario/definition.rs +++ b/testing-framework/core/src/scenario/definition.rs @@ -734,7 +734,6 @@ fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> Scen const fn source_mode_name(mode: SourceModeName) -> &'static str { match mode { SourceModeName::Attached => "Attached", - SourceModeName::ExternalOnly => "ExternalOnly", } } diff --git a/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs b/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs index 18215e7..c45385e 100644 --- a/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs +++ b/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs @@ -304,25 +304,18 @@ fn upsert_node( } fn canonical_identity( - client: &E::NodeClient, + _client: &E::NodeClient, identity_hint: Option, inner: &mut NodeInventoryInner, ) -> String { - // Priority: explicit hint -> app-provided peer id -> endpoint -> synthetic. + // Priority: explicit hint -> synthetic. if let Some(identity) = identity_hint.filter(|value| !value.trim().is_empty()) { return identity; } - if let Some(identity) = E::node_peer_identity(client) { - return format!("peer:{identity}"); - } - - if let Some(identity) = E::node_endpoint_identity(client) { - return format!("endpoint:{identity}"); - } - let synthetic = format!("node:{}", inner.next_synthetic_id); inner.next_synthetic_id += 1; + synthetic } diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs index 2c4b068..e88857b 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs @@ -41,14 +41,12 @@ pub struct SourceOrchestrationPlan { #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum SourceModeName { Attached, - ExternalOnly, } impl fmt::Display for SourceModeName { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Attached => f.write_str("Attached"), - Self::ExternalOnly => f.write_str("ExternalOnly"), } } } @@ -87,9 +85,9 @@ impl SourceOrchestrationPlan { fn ensure_currently_wired(&self) -> Result<(), SourceOrchestrationPlanError> { match self.mode { - SourceOrchestrationMode::Managed { .. } => Ok(()), + SourceOrchestrationMode::Managed { .. } + | SourceOrchestrationMode::ExternalOnly { .. } => Ok(()), SourceOrchestrationMode::Attached { .. } => not_wired(SourceModeName::Attached), - SourceOrchestrationMode::ExternalOnly { .. } => not_wired(SourceModeName::ExternalOnly), } } } diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs index c5b63dd..80fce0b 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs @@ -7,8 +7,9 @@ use crate::scenario::{ SourceOrchestrationMode, SourceOrchestrationPlan, SourceOrchestrationPlanError, }, providers::{ - AttachProviderError, AttachedNode, ExternalNode, ExternalProviderError, - ManagedProviderError, ManagedProvisionedNode, SourceProviders, StaticManagedProvider, + ApplicationExternalProvider, AttachProviderError, AttachedNode, ExternalNode, + ExternalProviderError, ManagedProviderError, ManagedProvisionedNode, SourceProviders, + StaticManagedProvider, }, }, }; @@ -47,9 +48,6 @@ pub fn build_source_orchestration_plan( } /// Resolves runtime source nodes via unified providers from orchestration plan. -/// -/// This currently returns managed nodes for managed mode and keeps external -/// overlays for managed mode reserved until external wiring is enabled. pub async fn resolve_sources( plan: &SourceOrchestrationPlan, providers: &SourceProviders, @@ -57,15 +55,18 @@ pub async fn resolve_sources( match &plan.mode { SourceOrchestrationMode::Managed { managed, .. } => { let managed_nodes = providers.managed.provide(managed).await?; + let external_nodes = providers.external.provide(plan.external_sources()).await?; + Ok(ResolvedSources { managed: managed_nodes, attached: Vec::new(), - external: Vec::new(), + external: external_nodes, }) } SourceOrchestrationMode::Attached { attach, external } => { let attached_nodes = providers.attach.discover(attach).await?; let external_nodes = providers.external.provide(external).await?; + Ok(ResolvedSources { managed: Vec::new(), attached: attached_nodes, @@ -74,6 +75,7 @@ pub async fn resolve_sources( } SourceOrchestrationMode::ExternalOnly { external } => { let external_nodes = providers.external.provide(external).await?; + Ok(ResolvedSources { managed: Vec::new(), attached: Vec::new(), @@ -88,16 +90,18 @@ pub async fn resolve_sources( /// Current wiring is transitional: /// - Managed mode is backed by prebuilt deployer-managed clients via /// `StaticManagedProvider`. -/// - Attached and external modes are represented in the orchestration plan and -/// resolver, but provider wiring is still scaffolding-only until full runtime -/// integration lands. +/// - External nodes are resolved via `Application::external_node_client`. +/// - Attached mode remains blocked at plan validation until attach providers +/// are fully wired. pub async fn orchestrate_sources( plan: &SourceOrchestrationPlan, node_clients: NodeClients, ) -> Result, DynError> { - let providers: SourceProviders = SourceProviders::default().with_managed(Arc::new( - StaticManagedProvider::new(node_clients.snapshot()), - )); + let providers: SourceProviders = SourceProviders::default() + .with_managed(Arc::new(StaticManagedProvider::new( + node_clients.snapshot(), + ))) + .with_external(Arc::new(ApplicationExternalProvider)); let resolved = resolve_sources(plan, &providers).await?; @@ -110,6 +114,8 @@ pub async fn orchestrate_sources( .managed .into_iter() .map(|node| node.client) + .chain(resolved.attached.into_iter().map(|node| node.client)) + .chain(resolved.external.into_iter().map(|node| node.client)) .collect(), )) } diff --git a/testing-framework/core/src/scenario/runtime/providers/external_provider.rs b/testing-framework/core/src/scenario/runtime/providers/external_provider.rs index 697c2aa..343a35c 100644 --- a/testing-framework/core/src/scenario/runtime/providers/external_provider.rs +++ b/testing-framework/core/src/scenario/runtime/providers/external_provider.rs @@ -26,9 +26,6 @@ pub enum ExternalProviderError { /// Internal adapter interface for constructing node clients from static /// external endpoint sources. -/// -/// This is scaffolding-only in phase 1 and is intentionally not wired into -/// deployer runtime orchestration yet. #[async_trait] pub trait ExternalProvider: Send + Sync { /// Builds external node handles from external source descriptors. @@ -38,8 +35,8 @@ pub trait ExternalProvider: Send + Sync { ) -> Result>, ExternalProviderError>; } -/// Default external provider stub used while external wiring is not -/// implemented. +/// Default external provider stub used when external wiring is intentionally +/// disabled. #[derive(Clone, Copy, Debug, Default)] pub struct NoopExternalProvider; @@ -58,3 +55,30 @@ impl ExternalProvider for NoopExternalProvider { }) } } + +/// External provider backed by [`Application::external_node_client`]. +#[derive(Clone, Copy, Debug, Default)] +pub struct ApplicationExternalProvider; + +#[async_trait] +impl ExternalProvider for ApplicationExternalProvider { + async fn provide( + &self, + sources: &[ExternalNodeSource], + ) -> Result>, ExternalProviderError> { + sources + .iter() + .map(|source| { + E::external_node_client(source) + .map(|client| ExternalNode { + identity_hint: Some(source.label.clone()), + client, + }) + .map_err(|build_error| ExternalProviderError::Build { + source_label: source.label.clone(), + source: build_error, + }) + }) + .collect() + } +} diff --git a/testing-framework/core/src/scenario/runtime/providers/mod.rs b/testing-framework/core/src/scenario/runtime/providers/mod.rs index 277b25c..60edaae 100644 --- a/testing-framework/core/src/scenario/runtime/providers/mod.rs +++ b/testing-framework/core/src/scenario/runtime/providers/mod.rs @@ -8,6 +8,6 @@ mod managed_provider; mod source_providers; pub use attach_provider::{AttachProviderError, AttachedNode}; -pub use external_provider::{ExternalNode, ExternalProviderError}; +pub use external_provider::{ApplicationExternalProvider, ExternalNode, ExternalProviderError}; pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider}; pub use source_providers::SourceProviders; diff --git a/testing-framework/core/src/scenario/runtime/providers/source_providers.rs b/testing-framework/core/src/scenario/runtime/providers/source_providers.rs index c25c89f..9c35858 100644 --- a/testing-framework/core/src/scenario/runtime/providers/source_providers.rs +++ b/testing-framework/core/src/scenario/runtime/providers/source_providers.rs @@ -9,8 +9,8 @@ use crate::scenario::Application; /// Unified provider set used by source orchestration. /// -/// This is scaffolding-only and is intentionally not wired into runtime -/// deployer orchestration yet. +/// This is wired through source orchestration, but defaults to no-op providers +/// until deployers override specific source classes. pub struct SourceProviders { pub managed: Arc>, pub attach: Arc>, diff --git a/testing-framework/deployers/local/src/manual/mod.rs b/testing-framework/deployers/local/src/manual/mod.rs index 74b2195..e880f1e 100644 --- a/testing-framework/deployers/local/src/manual/mod.rs +++ b/testing-framework/deployers/local/src/manual/mod.rs @@ -1,6 +1,9 @@ use testing_framework_core::{ manual::ManualClusterHandle, - scenario::{DynError, NodeControlHandle, ReadinessError, StartNodeOptions, StartedNode}, + scenario::{ + DynError, ExternalNodeSource, NodeClients, NodeControlHandle, ReadinessError, + StartNodeOptions, StartedNode, + }, }; use thiserror::Error; @@ -78,6 +81,31 @@ impl ManualCluster { self.nodes.wait_node_ready(name).await?; Ok(()) } + + #[must_use] + pub fn node_clients(&self) -> NodeClients { + self.nodes.node_clients() + } + + pub fn add_external_sources( + &self, + external_sources: impl IntoIterator, + ) -> Result<(), DynError> { + let node_clients = self.nodes.node_clients(); + for source in external_sources { + let client = E::external_node_client(&source)?; + node_clients.add_node(client); + } + + Ok(()) + } + + pub fn add_external_clients(&self, clients: impl IntoIterator) { + let node_clients = self.nodes.node_clients(); + for client in clients { + node_clients.add_node(client); + } + } } impl Drop for ManualCluster {