diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index 27048d1..a365348 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -12,8 +12,8 @@ use testing_framework_core::{ scenario::{ Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, - RetryPolicy, RunContext, Runner, Scenario, ScenarioError, build_source_orchestration_plan, - orchestrate_sources, spawn_feed, + RetryPolicy, RunContext, Runner, Scenario, ScenarioError, SourceOrchestrationPlan, + build_source_orchestration_plan, spawn_feed, }, topology::DeploymentDescriptor, }; @@ -26,6 +26,7 @@ use tracing::{debug, info, warn}; use crate::{ env::{LocalDeployerEnv, Node, wait_local_http_readiness}, + external::build_external_client, keep_tempdir_from_env, manual::ManualCluster, node_control::{NodeManager, NodeManagerSeed}, @@ -202,9 +203,7 @@ impl ProcessDeployer { let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?; let node_clients = NodeClients::::new(nodes.iter().map(|node| node.client()).collect()); - // Source orchestration currently runs here after managed clients are prepared. - let node_clients = orchestrate_sources(&source_plan, node_clients) - .await + let node_clients = merge_source_clients_for_local::(&source_plan, node_clients) .map_err(|source| ProcessDeployerError::SourceOrchestration { source })?; let runtime = run_context_for( @@ -241,10 +240,9 @@ impl ProcessDeployer { let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?; let node_control = self.node_control_from(scenario, nodes); - // Source orchestration currently runs here after managed clients are prepared. - let node_clients = orchestrate_sources(&source_plan, node_control.node_clients()) - .await - .map_err(|source| ProcessDeployerError::SourceOrchestration { source })?; + let node_clients = + merge_source_clients_for_local::(&source_plan, node_control.node_clients()) + .map_err(|source| ProcessDeployerError::SourceOrchestration { source })?; let runtime = run_context_for( scenario.deployment().clone(), node_clients, @@ -314,6 +312,18 @@ impl ProcessDeployer { } } +fn merge_source_clients_for_local( + source_plan: &SourceOrchestrationPlan, + node_clients: NodeClients, +) -> Result, DynError> { + for source in source_plan.external_sources() { + let client = + E::external_node_client(source).or_else(|_| build_external_client::(source))?; + node_clients.add_node(client); + } + Ok(node_clients) +} + fn build_retry_execution_config( deployment_policy: DeploymentPolicy, membership_check: bool, diff --git a/testing-framework/deployers/local/src/external.rs b/testing-framework/deployers/local/src/external.rs new file mode 100644 index 0000000..713c5be --- /dev/null +++ b/testing-framework/deployers/local/src/external.rs @@ -0,0 +1,90 @@ +use std::net::ToSocketAddrs; + +use testing_framework_core::scenario::{DynError, ExternalNodeSource}; + +use crate::{LocalDeployerEnv, NodeEndpoints}; + +#[derive(Debug, thiserror::Error)] +pub enum ExternalClientBuildError { + #[error("external source '{label}' endpoint is empty")] + EmptyEndpoint { label: String }, + #[error("external source '{label}' endpoint '{endpoint}' has unsupported scheme")] + UnsupportedScheme { label: String, endpoint: String }, + #[error("external source '{label}' endpoint '{endpoint}' is missing host")] + MissingHost { label: String, endpoint: String }, + #[error("external source '{label}' endpoint '{endpoint}' is missing port")] + MissingPort { label: String, endpoint: String }, + #[error("external source '{label}' endpoint '{endpoint}' failed to resolve: {source}")] + Resolve { + label: String, + endpoint: String, + #[source] + source: std::io::Error, + }, + #[error("external source '{label}' endpoint '{endpoint}' resolved to no socket addresses")] + NoResolvedAddress { label: String, endpoint: String }, +} + +pub fn build_external_client( + source: &ExternalNodeSource, +) -> Result { + let api = resolve_api_socket(source)?; + let mut endpoints = NodeEndpoints::default(); + endpoints.api = api; + Ok(E::node_client(&endpoints)) +} + +fn resolve_api_socket(source: &ExternalNodeSource) -> Result { + let source_label = source.label.clone(); + let endpoint = source.endpoint.trim(); + if endpoint.is_empty() { + return Err(ExternalClientBuildError::EmptyEndpoint { + label: source_label, + } + .into()); + } + + let without_scheme = endpoint + .strip_prefix("http://") + .or_else(|| endpoint.strip_prefix("https://")) + .ok_or_else(|| ExternalClientBuildError::UnsupportedScheme { + label: source_label.clone(), + endpoint: endpoint.to_owned(), + })?; + + let authority = without_scheme.trim_end_matches('/'); + let (host, port) = + split_host_port(authority).ok_or_else(|| ExternalClientBuildError::MissingPort { + label: source_label.clone(), + endpoint: endpoint.to_owned(), + })?; + + if host.is_empty() { + return Err(ExternalClientBuildError::MissingHost { + label: source_label.clone(), + endpoint: endpoint.to_owned(), + } + .into()); + } + + let resolved = (host, port) + .to_socket_addrs() + .map_err(|source| ExternalClientBuildError::Resolve { + label: source_label.clone(), + endpoint: endpoint.to_owned(), + source, + })? + .next() + .ok_or_else(|| ExternalClientBuildError::NoResolvedAddress { + label: source_label, + endpoint: endpoint.to_owned(), + })?; + + Ok(resolved) +} + +fn split_host_port(authority: &str) -> Option<(&str, u16)> { + let (host, port) = authority.rsplit_once(':')?; + let port = port.parse::().ok()?; + Some((host.trim_matches(['[', ']']), port)) +} diff --git a/testing-framework/deployers/local/src/lib.rs b/testing-framework/deployers/local/src/lib.rs index df19be3..9531a3b 100644 --- a/testing-framework/deployers/local/src/lib.rs +++ b/testing-framework/deployers/local/src/lib.rs @@ -1,6 +1,7 @@ pub mod binary; mod deployer; pub mod env; +mod external; mod manual; mod node_control; pub mod process; diff --git a/testing-framework/deployers/local/src/manual/mod.rs b/testing-framework/deployers/local/src/manual/mod.rs index e880f1e..36a4019 100644 --- a/testing-framework/deployers/local/src/manual/mod.rs +++ b/testing-framework/deployers/local/src/manual/mod.rs @@ -9,6 +9,7 @@ use thiserror::Error; use crate::{ env::LocalDeployerEnv, + external::build_external_client, keep_tempdir_from_env, node_control::{NodeManager, NodeManagerError, NodeManagerSeed}, }; @@ -93,7 +94,8 @@ impl ManualCluster { ) -> Result<(), DynError> { let node_clients = self.nodes.node_clients(); for source in external_sources { - let client = E::external_node_client(&source)?; + let client = E::external_node_client(&source) + .or_else(|_| build_external_client::(&source))?; node_clients.add_node(client); }