mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-02-23 14:43:07 +00:00
External provider
This commit is contained in:
commit
74b49481e3
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -2904,6 +2904,7 @@ dependencies = [
|
||||
"testing-framework-env",
|
||||
"testing-framework-runner-compose",
|
||||
"testing-framework-runner-k8s",
|
||||
"testing-framework-runner-local",
|
||||
"testing_framework",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
@ -5816,6 +5817,7 @@ dependencies = [
|
||||
"testing-framework-core",
|
||||
"testing-framework-runner-compose",
|
||||
"testing-framework-runner-k8s",
|
||||
"testing-framework-runner-local",
|
||||
"testing_framework",
|
||||
"tokio",
|
||||
"tracing",
|
||||
|
||||
@ -17,6 +17,7 @@ lb-workloads = { workspace = true }
|
||||
testing-framework-core = { workspace = true }
|
||||
testing-framework-runner-compose = { workspace = true }
|
||||
testing-framework-runner-k8s = { workspace = true }
|
||||
testing-framework-runner-local = { workspace = true }
|
||||
tokio = { features = ["macros", "net", "rt-multi-thread", "time"], workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { features = ["env-filter", "fmt"], version = "0.3" }
|
||||
|
||||
284
logos/examples/tests/external_sources_local.rs
Normal file
284
logos/examples/tests/external_sources_local.rs
Normal file
@ -0,0 +1,284 @@
|
||||
use std::{collections::HashSet, time::Duration};
|
||||
|
||||
use anyhow::Result;
|
||||
use lb_ext::{LbcExtEnv, ScenarioBuilder};
|
||||
use lb_framework::{
|
||||
DeploymentBuilder, LbcEnv, LbcLocalDeployer, LbcManualCluster, NodeHttpClient, TopologyConfig,
|
||||
configs::build_node_run_config,
|
||||
};
|
||||
use testing_framework_core::scenario::{
|
||||
Deployer as _, ExternalNodeSource, PeerSelection, StartNodeOptions,
|
||||
};
|
||||
use testing_framework_runner_local::ProcessDeployer;
|
||||
use tokio::time::sleep;
|
||||
|
||||
struct SeedCluster {
|
||||
_cluster: LbcManualCluster,
|
||||
node_a: NodeHttpClient,
|
||||
node_b: NodeHttpClient,
|
||||
bootstrap_peer_addresses: Vec<String>,
|
||||
}
|
||||
|
||||
impl SeedCluster {
|
||||
fn external_sources(&self) -> [ExternalNodeSource; 2] {
|
||||
[
|
||||
ExternalNodeSource::new("external-a".to_owned(), self.node_a.base_url().to_string()),
|
||||
ExternalNodeSource::new("external-b".to_owned(), self.node_b.base_url().to_string()),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "run manually with `cargo test -p runner-examples --test external_sources_local -- --ignored`"]
|
||||
async fn managed_local_plus_external_sources_are_orchestrated() -> Result<()> {
|
||||
let seed_cluster = start_seed_cluster().await?;
|
||||
let second_cluster_bootstrap_peers =
|
||||
parse_peer_addresses(&seed_cluster.bootstrap_peer_addresses)?;
|
||||
|
||||
let second_topology = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)).build()?;
|
||||
let second_cluster = LbcLocalDeployer::new().manual_cluster_from_descriptors(second_topology);
|
||||
let second_c = second_cluster
|
||||
.start_node_with(
|
||||
"c",
|
||||
StartNodeOptions::<LbcEnv>::default()
|
||||
.with_peers(PeerSelection::None)
|
||||
.create_patch({
|
||||
let peers = second_cluster_bootstrap_peers.clone();
|
||||
move |mut run_config| {
|
||||
run_config
|
||||
.user
|
||||
.network
|
||||
.backend
|
||||
.initial_peers
|
||||
.extend(peers.clone());
|
||||
Ok(run_config)
|
||||
}
|
||||
}),
|
||||
)
|
||||
.await?
|
||||
.client;
|
||||
|
||||
let second_d = second_cluster
|
||||
.start_node_with(
|
||||
"d",
|
||||
StartNodeOptions::<LbcEnv>::default()
|
||||
.with_peers(PeerSelection::Named(vec!["node-c".to_owned()]))
|
||||
.create_patch({
|
||||
let peers = second_cluster_bootstrap_peers.clone();
|
||||
move |mut run_config| {
|
||||
run_config
|
||||
.user
|
||||
.network
|
||||
.backend
|
||||
.initial_peers
|
||||
.extend(peers.clone());
|
||||
Ok(run_config)
|
||||
}
|
||||
}),
|
||||
)
|
||||
.await?
|
||||
.client;
|
||||
|
||||
second_cluster.wait_network_ready().await?;
|
||||
|
||||
wait_until_has_peers(&second_c, Duration::from_secs(30)).await?;
|
||||
wait_until_has_peers(&second_d, Duration::from_secs(30)).await?;
|
||||
|
||||
second_cluster.add_external_clients([seed_cluster.node_a.clone(), seed_cluster.node_b.clone()]);
|
||||
let orchestrated = second_cluster.node_clients();
|
||||
|
||||
assert_eq!(
|
||||
orchestrated.len(),
|
||||
4,
|
||||
"expected 2 managed + 2 external clients"
|
||||
);
|
||||
|
||||
let expected_endpoints: HashSet<String> = [
|
||||
seed_cluster.node_a.base_url().to_string(),
|
||||
seed_cluster.node_b.base_url().to_string(),
|
||||
second_c.base_url().to_string(),
|
||||
second_d.base_url().to_string(),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let actual_endpoints: HashSet<String> = orchestrated
|
||||
.snapshot()
|
||||
.into_iter()
|
||||
.map(|client| client.base_url().to_string())
|
||||
.collect();
|
||||
|
||||
assert_eq!(actual_endpoints, expected_endpoints);
|
||||
|
||||
for client in orchestrated.snapshot() {
|
||||
let _ = client.consensus_info().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "run manually with `cargo test -p runner-examples --test external_sources_local -- --ignored`"]
|
||||
async fn scenario_managed_plus_external_sources_are_orchestrated() -> Result<()> {
|
||||
let seed_cluster = start_seed_cluster().await?;
|
||||
|
||||
let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2));
|
||||
let base_descriptors = base_builder.clone().build()?;
|
||||
let mut deployment_builder = base_builder;
|
||||
let parsed_peers = parse_peer_addresses(&seed_cluster.bootstrap_peer_addresses)?;
|
||||
|
||||
for node in base_descriptors.nodes() {
|
||||
let mut run_config = build_node_run_config(
|
||||
&base_descriptors,
|
||||
node,
|
||||
base_descriptors.config().node_config_override(node.index()),
|
||||
)
|
||||
.map_err(|error| anyhow::anyhow!(error.to_string()))?;
|
||||
run_config
|
||||
.user
|
||||
.network
|
||||
.backend
|
||||
.initial_peers
|
||||
.extend(parsed_peers.clone());
|
||||
deployment_builder = deployment_builder.with_node_config_override(node.index(), run_config);
|
||||
}
|
||||
|
||||
let mut scenario = ScenarioBuilder::new(Box::new(deployment_builder))
|
||||
.with_run_duration(Duration::from_secs(5))
|
||||
.with_external_node(seed_cluster.external_sources()[0].clone())
|
||||
.with_external_node(seed_cluster.external_sources()[1].clone())
|
||||
.build()?;
|
||||
|
||||
let deployer = ProcessDeployer::<LbcExtEnv>::default();
|
||||
let runner = deployer.deploy(&scenario).await?;
|
||||
let run_handle = runner.run(&mut scenario).await?;
|
||||
|
||||
let clients = run_handle.context().node_clients().snapshot();
|
||||
|
||||
assert_eq!(clients.len(), 4, "expected 2 managed + 2 external clients");
|
||||
|
||||
let first_a_endpoint = seed_cluster.node_a.base_url().to_string();
|
||||
let first_b_endpoint = seed_cluster.node_b.base_url().to_string();
|
||||
|
||||
for client in clients.iter().filter(|client| {
|
||||
let endpoint = client.base_url().to_string();
|
||||
endpoint != first_a_endpoint && endpoint != first_b_endpoint
|
||||
}) {
|
||||
wait_until_has_peers(client, Duration::from_secs(30)).await?;
|
||||
}
|
||||
|
||||
let expected_endpoints: HashSet<String> = [
|
||||
seed_cluster.node_a.base_url().to_string(),
|
||||
seed_cluster.node_b.base_url().to_string(),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let actual_endpoints: HashSet<String> = clients
|
||||
.iter()
|
||||
.map(|client| client.base_url().to_string())
|
||||
.collect();
|
||||
|
||||
assert!(
|
||||
expected_endpoints.is_subset(&actual_endpoints),
|
||||
"scenario context should include external endpoints"
|
||||
);
|
||||
|
||||
for client in clients {
|
||||
let _ = client.consensus_info().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_seed_cluster() -> Result<SeedCluster> {
|
||||
let topology = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2)).build()?;
|
||||
let cluster = LbcLocalDeployer::new().manual_cluster_from_descriptors(topology);
|
||||
let node_a = cluster
|
||||
.start_node_with("a", node_start_options(PeerSelection::None))
|
||||
.await?
|
||||
.client;
|
||||
let node_b = cluster
|
||||
.start_node_with(
|
||||
"b",
|
||||
node_start_options(PeerSelection::Named(vec!["node-a".to_owned()])),
|
||||
)
|
||||
.await?
|
||||
.client;
|
||||
cluster.wait_network_ready().await?;
|
||||
let bootstrap_peer_addresses = collect_loopback_peer_addresses(&node_a, &node_b).await?;
|
||||
|
||||
Ok(SeedCluster {
|
||||
_cluster: cluster,
|
||||
node_a,
|
||||
node_b,
|
||||
bootstrap_peer_addresses,
|
||||
})
|
||||
}
|
||||
|
||||
fn node_start_options(peers: PeerSelection) -> StartNodeOptions<LbcEnv> {
|
||||
let mut options = StartNodeOptions::<LbcEnv>::default();
|
||||
options.peers = peers;
|
||||
options
|
||||
}
|
||||
|
||||
async fn collect_loopback_peer_addresses(
|
||||
node_a: &lb_framework::NodeHttpClient,
|
||||
node_b: &lb_framework::NodeHttpClient,
|
||||
) -> Result<Vec<String>> {
|
||||
let mut peers = Vec::new();
|
||||
|
||||
for info in [node_a.network_info().await?, node_b.network_info().await?] {
|
||||
let addresses: Vec<String> = info
|
||||
.listen_addresses
|
||||
.into_iter()
|
||||
.map(|addr| addr.to_string())
|
||||
.collect();
|
||||
|
||||
let mut loopback: Vec<String> = addresses
|
||||
.iter()
|
||||
.filter(|addr| addr.contains("/127.0.0.1/"))
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
if loopback.is_empty() {
|
||||
loopback = addresses;
|
||||
}
|
||||
|
||||
peers.extend(loopback);
|
||||
}
|
||||
|
||||
Ok(peers)
|
||||
}
|
||||
|
||||
fn parse_peer_addresses<T>(addresses: &[String]) -> Result<Vec<T>>
|
||||
where
|
||||
T: std::str::FromStr,
|
||||
T::Err: std::error::Error + Send + Sync + 'static,
|
||||
{
|
||||
addresses
|
||||
.iter()
|
||||
.map(|address| address.parse::<T>().map_err(Into::into))
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn wait_until_has_peers(client: &NodeHttpClient, timeout: Duration) -> Result<()> {
|
||||
let start = tokio::time::Instant::now();
|
||||
loop {
|
||||
if let Ok(network_info) = client.network_info().await {
|
||||
if network_info.n_peers > 0 {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if start.elapsed() >= timeout {
|
||||
anyhow::bail!(
|
||||
"node {} did not report non-zero peer count within {:?}",
|
||||
client.base_url(),
|
||||
timeout
|
||||
);
|
||||
}
|
||||
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
}
|
||||
@ -13,6 +13,7 @@ testing-framework-core = { workspace = true }
|
||||
testing-framework-env = { workspace = true }
|
||||
testing-framework-runner-compose = { workspace = true }
|
||||
testing-framework-runner-k8s = { workspace = true }
|
||||
testing-framework-runner-local = { workspace = true }
|
||||
|
||||
# Logos / Nomos deps
|
||||
lb_http_api_common = { workspace = true }
|
||||
|
||||
@ -1,9 +1,25 @@
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
pub use lb_framework::*;
|
||||
use testing_framework_core::scenario::{Application, DynError, FeedRuntime, RunContext};
|
||||
use reqwest::Url;
|
||||
pub use scenario::{
|
||||
CoreBuilderExt, ObservabilityBuilderExt, ScenarioBuilder, ScenarioBuilderExt,
|
||||
ScenarioBuilderWith,
|
||||
};
|
||||
use testing_framework_core::scenario::{
|
||||
Application, DynError, ExternalNodeSource, FeedRuntime, RunContext, StartNodeOptions,
|
||||
};
|
||||
use testing_framework_runner_local::{
|
||||
BuiltNodeConfig, LocalDeployerEnv, NodeConfigEntry,
|
||||
process::{LaunchSpec, NodeEndpoints, ProcessSpawnError},
|
||||
};
|
||||
use tokio::sync::broadcast;
|
||||
use workloads::{LbcBlockFeedEnv, LbcScenarioEnv};
|
||||
|
||||
pub mod cfgsync;
|
||||
mod compose_env;
|
||||
@ -11,36 +27,106 @@ pub mod constants;
|
||||
mod k8s_env;
|
||||
pub mod scenario;
|
||||
|
||||
pub type LbcComposeDeployer = testing_framework_runner_compose::ComposeDeployer<LbcExtEnv>;
|
||||
pub type LbcK8sDeployer = testing_framework_runner_k8s::K8sDeployer<LbcExtEnv>;
|
||||
|
||||
pub struct LbcExtEnv;
|
||||
|
||||
#[async_trait]
|
||||
impl Application for LbcExtEnv {
|
||||
type Deployment = <lb_framework::LbcEnv as Application>::Deployment;
|
||||
type NodeClient = <lb_framework::LbcEnv as Application>::NodeClient;
|
||||
type NodeConfig = <lb_framework::LbcEnv as Application>::NodeConfig;
|
||||
type FeedRuntime = <lb_framework::LbcEnv as Application>::FeedRuntime;
|
||||
type Deployment = <LbcEnv as Application>::Deployment;
|
||||
|
||||
type NodeClient = <LbcEnv as Application>::NodeClient;
|
||||
|
||||
type NodeConfig = <LbcEnv as Application>::NodeConfig;
|
||||
|
||||
type FeedRuntime = <LbcEnv as Application>::FeedRuntime;
|
||||
|
||||
fn external_node_client(source: &ExternalNodeSource) -> Result<Self::NodeClient, DynError> {
|
||||
let base_url = Url::parse(&source.endpoint)?;
|
||||
Ok(NodeHttpClient::from_urls(base_url, None))
|
||||
}
|
||||
|
||||
async fn prepare_feed(
|
||||
client: Self::NodeClient,
|
||||
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError> {
|
||||
<lb_framework::LbcEnv as Application>::prepare_feed(client).await
|
||||
<LbcEnv as Application>::prepare_feed(client).await
|
||||
}
|
||||
}
|
||||
|
||||
pub use scenario::{
|
||||
CoreBuilderExt, ObservabilityBuilderExt, ScenarioBuilder, ScenarioBuilderExt,
|
||||
ScenarioBuilderWith,
|
||||
};
|
||||
impl LbcScenarioEnv for LbcExtEnv {}
|
||||
|
||||
pub type LbcComposeDeployer = testing_framework_runner_compose::ComposeDeployer<LbcExtEnv>;
|
||||
pub type LbcK8sDeployer = testing_framework_runner_k8s::K8sDeployer<LbcExtEnv>;
|
||||
|
||||
impl lb_framework::workloads::LbcScenarioEnv for LbcExtEnv {}
|
||||
|
||||
impl lb_framework::workloads::LbcBlockFeedEnv for LbcExtEnv {
|
||||
fn block_feed_subscription(
|
||||
ctx: &RunContext<Self>,
|
||||
) -> broadcast::Receiver<Arc<lb_framework::BlockRecord>> {
|
||||
impl LbcBlockFeedEnv for LbcExtEnv {
|
||||
fn block_feed_subscription(ctx: &RunContext<Self>) -> broadcast::Receiver<Arc<BlockRecord>> {
|
||||
ctx.feed().subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LocalDeployerEnv for LbcExtEnv {
|
||||
fn build_node_config(
|
||||
topology: &Self::Deployment,
|
||||
index: usize,
|
||||
peer_ports_by_name: &HashMap<String, u16>,
|
||||
options: &StartNodeOptions<Self>,
|
||||
peer_ports: &[u16],
|
||||
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
|
||||
let mapped_options = map_start_options(options)?;
|
||||
<LbcEnv as LocalDeployerEnv>::build_node_config(
|
||||
topology,
|
||||
index,
|
||||
peer_ports_by_name,
|
||||
&mapped_options,
|
||||
peer_ports,
|
||||
)
|
||||
}
|
||||
|
||||
fn build_initial_node_configs(
|
||||
topology: &Self::Deployment,
|
||||
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
|
||||
<LbcEnv as LocalDeployerEnv>::build_initial_node_configs(topology)
|
||||
}
|
||||
|
||||
fn initial_persist_dir(
|
||||
topology: &Self::Deployment,
|
||||
node_name: &str,
|
||||
index: usize,
|
||||
) -> Option<PathBuf> {
|
||||
<LbcEnv as LocalDeployerEnv>::initial_persist_dir(topology, node_name, index)
|
||||
}
|
||||
|
||||
fn build_launch_spec(
|
||||
config: &<Self as Application>::NodeConfig,
|
||||
dir: &Path,
|
||||
label: &str,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
<LbcEnv as LocalDeployerEnv>::build_launch_spec(config, dir, label)
|
||||
}
|
||||
|
||||
fn node_endpoints(config: &<Self as Application>::NodeConfig) -> NodeEndpoints {
|
||||
<LbcEnv as LocalDeployerEnv>::node_endpoints(config)
|
||||
}
|
||||
|
||||
fn node_client(endpoints: &NodeEndpoints) -> Self::NodeClient {
|
||||
<LbcEnv as LocalDeployerEnv>::node_client(endpoints)
|
||||
}
|
||||
|
||||
fn readiness_endpoint_path() -> &'static str {
|
||||
<LbcEnv as LocalDeployerEnv>::readiness_endpoint_path()
|
||||
}
|
||||
}
|
||||
|
||||
fn map_start_options(
|
||||
options: &StartNodeOptions<LbcExtEnv>,
|
||||
) -> Result<StartNodeOptions<LbcEnv>, DynError> {
|
||||
if options.config_patch.is_some() {
|
||||
return Err("LbcExtEnv local deployer bridge does not support config_patch yet".into());
|
||||
}
|
||||
|
||||
let mut mapped = StartNodeOptions::<LbcEnv>::default();
|
||||
mapped.peers = options.peers.clone();
|
||||
mapped.config_override = options.config_override.clone();
|
||||
mapped.persist_dir = options.persist_dir.clone();
|
||||
|
||||
Ok(mapped)
|
||||
}
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
use std::io;
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::{
|
||||
scenario::{DynError, FeedRuntime},
|
||||
scenario::{DynError, ExternalNodeSource, FeedRuntime},
|
||||
topology::DeploymentDescriptor,
|
||||
};
|
||||
|
||||
@ -16,16 +18,11 @@ pub trait Application: Send + Sync + 'static {
|
||||
|
||||
type FeedRuntime: FeedRuntime;
|
||||
|
||||
/// Optional stable node identity (for example a peer id) used for
|
||||
/// deduplication when nodes are discovered from multiple sources.
|
||||
fn node_peer_identity(_client: &Self::NodeClient) -> Option<String> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Optional endpoint identity used as a dedup fallback when no peer id is
|
||||
/// available.
|
||||
fn node_endpoint_identity(_client: &Self::NodeClient) -> Option<String> {
|
||||
None
|
||||
/// Build an application node client from a static external source.
|
||||
///
|
||||
/// Environments that support external nodes should override this.
|
||||
fn external_node_client(_source: &ExternalNodeSource) -> Result<Self::NodeClient, DynError> {
|
||||
Err(io::Error::other("external node sources are not supported").into())
|
||||
}
|
||||
|
||||
async fn prepare_feed(
|
||||
|
||||
@ -734,7 +734,6 @@ fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> Scen
|
||||
const fn source_mode_name(mode: SourceModeName) -> &'static str {
|
||||
match mode {
|
||||
SourceModeName::Attached => "Attached",
|
||||
SourceModeName::ExternalOnly => "ExternalOnly",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -304,25 +304,18 @@ fn upsert_node<E: Application>(
|
||||
}
|
||||
|
||||
fn canonical_identity<E: Application>(
|
||||
client: &E::NodeClient,
|
||||
_client: &E::NodeClient,
|
||||
identity_hint: Option<String>,
|
||||
inner: &mut NodeInventoryInner<E>,
|
||||
) -> String {
|
||||
// Priority: explicit hint -> app-provided peer id -> endpoint -> synthetic.
|
||||
// Priority: explicit hint -> synthetic.
|
||||
if let Some(identity) = identity_hint.filter(|value| !value.trim().is_empty()) {
|
||||
return identity;
|
||||
}
|
||||
|
||||
if let Some(identity) = E::node_peer_identity(client) {
|
||||
return format!("peer:{identity}");
|
||||
}
|
||||
|
||||
if let Some(identity) = E::node_endpoint_identity(client) {
|
||||
return format!("endpoint:{identity}");
|
||||
}
|
||||
|
||||
let synthetic = format!("node:{}", inner.next_synthetic_id);
|
||||
inner.next_synthetic_id += 1;
|
||||
|
||||
synthetic
|
||||
}
|
||||
|
||||
|
||||
@ -41,14 +41,12 @@ pub struct SourceOrchestrationPlan {
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub enum SourceModeName {
|
||||
Attached,
|
||||
ExternalOnly,
|
||||
}
|
||||
|
||||
impl fmt::Display for SourceModeName {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Attached => f.write_str("Attached"),
|
||||
Self::ExternalOnly => f.write_str("ExternalOnly"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -87,9 +85,9 @@ impl SourceOrchestrationPlan {
|
||||
|
||||
fn ensure_currently_wired(&self) -> Result<(), SourceOrchestrationPlanError> {
|
||||
match self.mode {
|
||||
SourceOrchestrationMode::Managed { .. } => Ok(()),
|
||||
SourceOrchestrationMode::Managed { .. }
|
||||
| SourceOrchestrationMode::ExternalOnly { .. } => Ok(()),
|
||||
SourceOrchestrationMode::Attached { .. } => not_wired(SourceModeName::Attached),
|
||||
SourceOrchestrationMode::ExternalOnly { .. } => not_wired(SourceModeName::ExternalOnly),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,8 +7,9 @@ use crate::scenario::{
|
||||
SourceOrchestrationMode, SourceOrchestrationPlan, SourceOrchestrationPlanError,
|
||||
},
|
||||
providers::{
|
||||
AttachProviderError, AttachedNode, ExternalNode, ExternalProviderError,
|
||||
ManagedProviderError, ManagedProvisionedNode, SourceProviders, StaticManagedProvider,
|
||||
ApplicationExternalProvider, AttachProviderError, AttachedNode, ExternalNode,
|
||||
ExternalProviderError, ManagedProviderError, ManagedProvisionedNode, SourceProviders,
|
||||
StaticManagedProvider,
|
||||
},
|
||||
},
|
||||
};
|
||||
@ -47,9 +48,6 @@ pub fn build_source_orchestration_plan<E: Application, Caps>(
|
||||
}
|
||||
|
||||
/// Resolves runtime source nodes via unified providers from orchestration plan.
|
||||
///
|
||||
/// This currently returns managed nodes for managed mode and keeps external
|
||||
/// overlays for managed mode reserved until external wiring is enabled.
|
||||
pub async fn resolve_sources<E: Application>(
|
||||
plan: &SourceOrchestrationPlan,
|
||||
providers: &SourceProviders<E>,
|
||||
@ -57,15 +55,18 @@ pub async fn resolve_sources<E: Application>(
|
||||
match &plan.mode {
|
||||
SourceOrchestrationMode::Managed { managed, .. } => {
|
||||
let managed_nodes = providers.managed.provide(managed).await?;
|
||||
let external_nodes = providers.external.provide(plan.external_sources()).await?;
|
||||
|
||||
Ok(ResolvedSources {
|
||||
managed: managed_nodes,
|
||||
attached: Vec::new(),
|
||||
external: Vec::new(),
|
||||
external: external_nodes,
|
||||
})
|
||||
}
|
||||
SourceOrchestrationMode::Attached { attach, external } => {
|
||||
let attached_nodes = providers.attach.discover(attach).await?;
|
||||
let external_nodes = providers.external.provide(external).await?;
|
||||
|
||||
Ok(ResolvedSources {
|
||||
managed: Vec::new(),
|
||||
attached: attached_nodes,
|
||||
@ -74,6 +75,7 @@ pub async fn resolve_sources<E: Application>(
|
||||
}
|
||||
SourceOrchestrationMode::ExternalOnly { external } => {
|
||||
let external_nodes = providers.external.provide(external).await?;
|
||||
|
||||
Ok(ResolvedSources {
|
||||
managed: Vec::new(),
|
||||
attached: Vec::new(),
|
||||
@ -88,16 +90,18 @@ pub async fn resolve_sources<E: Application>(
|
||||
/// Current wiring is transitional:
|
||||
/// - Managed mode is backed by prebuilt deployer-managed clients via
|
||||
/// `StaticManagedProvider`.
|
||||
/// - Attached and external modes are represented in the orchestration plan and
|
||||
/// resolver, but provider wiring is still scaffolding-only until full runtime
|
||||
/// integration lands.
|
||||
/// - External nodes are resolved via `Application::external_node_client`.
|
||||
/// - Attached mode remains blocked at plan validation until attach providers
|
||||
/// are fully wired.
|
||||
pub async fn orchestrate_sources<E: Application>(
|
||||
plan: &SourceOrchestrationPlan,
|
||||
node_clients: NodeClients<E>,
|
||||
) -> Result<NodeClients<E>, DynError> {
|
||||
let providers: SourceProviders<E> = SourceProviders::default().with_managed(Arc::new(
|
||||
StaticManagedProvider::new(node_clients.snapshot()),
|
||||
));
|
||||
let providers: SourceProviders<E> = SourceProviders::default()
|
||||
.with_managed(Arc::new(StaticManagedProvider::new(
|
||||
node_clients.snapshot(),
|
||||
)))
|
||||
.with_external(Arc::new(ApplicationExternalProvider));
|
||||
|
||||
let resolved = resolve_sources(plan, &providers).await?;
|
||||
|
||||
@ -110,6 +114,8 @@ pub async fn orchestrate_sources<E: Application>(
|
||||
.managed
|
||||
.into_iter()
|
||||
.map(|node| node.client)
|
||||
.chain(resolved.attached.into_iter().map(|node| node.client))
|
||||
.chain(resolved.external.into_iter().map(|node| node.client))
|
||||
.collect(),
|
||||
))
|
||||
}
|
||||
|
||||
@ -26,9 +26,6 @@ pub enum ExternalProviderError {
|
||||
|
||||
/// Internal adapter interface for constructing node clients from static
|
||||
/// external endpoint sources.
|
||||
///
|
||||
/// This is scaffolding-only in phase 1 and is intentionally not wired into
|
||||
/// deployer runtime orchestration yet.
|
||||
#[async_trait]
|
||||
pub trait ExternalProvider<E: Application>: Send + Sync {
|
||||
/// Builds external node handles from external source descriptors.
|
||||
@ -38,8 +35,8 @@ pub trait ExternalProvider<E: Application>: Send + Sync {
|
||||
) -> Result<Vec<ExternalNode<E>>, ExternalProviderError>;
|
||||
}
|
||||
|
||||
/// Default external provider stub used while external wiring is not
|
||||
/// implemented.
|
||||
/// Default external provider stub used when external wiring is intentionally
|
||||
/// disabled.
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
pub struct NoopExternalProvider;
|
||||
|
||||
@ -58,3 +55,30 @@ impl<E: Application> ExternalProvider<E> for NoopExternalProvider {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// External provider backed by [`Application::external_node_client`].
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
pub struct ApplicationExternalProvider;
|
||||
|
||||
#[async_trait]
|
||||
impl<E: Application> ExternalProvider<E> for ApplicationExternalProvider {
|
||||
async fn provide(
|
||||
&self,
|
||||
sources: &[ExternalNodeSource],
|
||||
) -> Result<Vec<ExternalNode<E>>, ExternalProviderError> {
|
||||
sources
|
||||
.iter()
|
||||
.map(|source| {
|
||||
E::external_node_client(source)
|
||||
.map(|client| ExternalNode {
|
||||
identity_hint: Some(source.label.clone()),
|
||||
client,
|
||||
})
|
||||
.map_err(|build_error| ExternalProviderError::Build {
|
||||
source_label: source.label.clone(),
|
||||
source: build_error,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
@ -8,6 +8,6 @@ mod managed_provider;
|
||||
mod source_providers;
|
||||
|
||||
pub use attach_provider::{AttachProviderError, AttachedNode};
|
||||
pub use external_provider::{ExternalNode, ExternalProviderError};
|
||||
pub use external_provider::{ApplicationExternalProvider, ExternalNode, ExternalProviderError};
|
||||
pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider};
|
||||
pub use source_providers::SourceProviders;
|
||||
|
||||
@ -9,8 +9,8 @@ use crate::scenario::Application;
|
||||
|
||||
/// Unified provider set used by source orchestration.
|
||||
///
|
||||
/// This is scaffolding-only and is intentionally not wired into runtime
|
||||
/// deployer orchestration yet.
|
||||
/// This is wired through source orchestration, but defaults to no-op providers
|
||||
/// until deployers override specific source classes.
|
||||
pub struct SourceProviders<E: Application> {
|
||||
pub managed: Arc<dyn ManagedProvider<E>>,
|
||||
pub attach: Arc<dyn AttachProvider<E>>,
|
||||
|
||||
@ -12,8 +12,8 @@ use testing_framework_core::{
|
||||
scenario::{
|
||||
Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime,
|
||||
HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle,
|
||||
RetryPolicy, RunContext, Runner, Scenario, ScenarioError, build_source_orchestration_plan,
|
||||
orchestrate_sources, spawn_feed,
|
||||
RetryPolicy, RunContext, Runner, Scenario, ScenarioError, SourceOrchestrationPlan,
|
||||
build_source_orchestration_plan, spawn_feed,
|
||||
},
|
||||
topology::DeploymentDescriptor,
|
||||
};
|
||||
@ -26,6 +26,7 @@ use tracing::{debug, info, warn};
|
||||
|
||||
use crate::{
|
||||
env::{LocalDeployerEnv, Node, wait_local_http_readiness},
|
||||
external::build_external_client,
|
||||
keep_tempdir_from_env,
|
||||
manual::ManualCluster,
|
||||
node_control::{NodeManager, NodeManagerSeed},
|
||||
@ -202,9 +203,7 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
|
||||
let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?;
|
||||
let node_clients = NodeClients::<E>::new(nodes.iter().map(|node| node.client()).collect());
|
||||
|
||||
// Source orchestration currently runs here after managed clients are prepared.
|
||||
let node_clients = orchestrate_sources(&source_plan, node_clients)
|
||||
.await
|
||||
let node_clients = merge_source_clients_for_local::<E>(&source_plan, node_clients)
|
||||
.map_err(|source| ProcessDeployerError::SourceOrchestration { source })?;
|
||||
|
||||
let runtime = run_context_for(
|
||||
@ -241,10 +240,9 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
|
||||
|
||||
let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?;
|
||||
let node_control = self.node_control_from(scenario, nodes);
|
||||
// Source orchestration currently runs here after managed clients are prepared.
|
||||
let node_clients = orchestrate_sources(&source_plan, node_control.node_clients())
|
||||
.await
|
||||
.map_err(|source| ProcessDeployerError::SourceOrchestration { source })?;
|
||||
let node_clients =
|
||||
merge_source_clients_for_local::<E>(&source_plan, node_control.node_clients())
|
||||
.map_err(|source| ProcessDeployerError::SourceOrchestration { source })?;
|
||||
let runtime = run_context_for(
|
||||
scenario.deployment().clone(),
|
||||
node_clients,
|
||||
@ -314,6 +312,18 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
|
||||
}
|
||||
}
|
||||
|
||||
fn merge_source_clients_for_local<E: LocalDeployerEnv>(
|
||||
source_plan: &SourceOrchestrationPlan,
|
||||
node_clients: NodeClients<E>,
|
||||
) -> Result<NodeClients<E>, DynError> {
|
||||
for source in source_plan.external_sources() {
|
||||
let client =
|
||||
E::external_node_client(source).or_else(|_| build_external_client::<E>(source))?;
|
||||
node_clients.add_node(client);
|
||||
}
|
||||
Ok(node_clients)
|
||||
}
|
||||
|
||||
fn build_retry_execution_config(
|
||||
deployment_policy: DeploymentPolicy,
|
||||
membership_check: bool,
|
||||
|
||||
90
testing-framework/deployers/local/src/external.rs
Normal file
90
testing-framework/deployers/local/src/external.rs
Normal file
@ -0,0 +1,90 @@
|
||||
use std::net::ToSocketAddrs;
|
||||
|
||||
use testing_framework_core::scenario::{DynError, ExternalNodeSource};
|
||||
|
||||
use crate::{LocalDeployerEnv, NodeEndpoints};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ExternalClientBuildError {
|
||||
#[error("external source '{label}' endpoint is empty")]
|
||||
EmptyEndpoint { label: String },
|
||||
#[error("external source '{label}' endpoint '{endpoint}' has unsupported scheme")]
|
||||
UnsupportedScheme { label: String, endpoint: String },
|
||||
#[error("external source '{label}' endpoint '{endpoint}' is missing host")]
|
||||
MissingHost { label: String, endpoint: String },
|
||||
#[error("external source '{label}' endpoint '{endpoint}' is missing port")]
|
||||
MissingPort { label: String, endpoint: String },
|
||||
#[error("external source '{label}' endpoint '{endpoint}' failed to resolve: {source}")]
|
||||
Resolve {
|
||||
label: String,
|
||||
endpoint: String,
|
||||
#[source]
|
||||
source: std::io::Error,
|
||||
},
|
||||
#[error("external source '{label}' endpoint '{endpoint}' resolved to no socket addresses")]
|
||||
NoResolvedAddress { label: String, endpoint: String },
|
||||
}
|
||||
|
||||
pub fn build_external_client<E: LocalDeployerEnv>(
|
||||
source: &ExternalNodeSource,
|
||||
) -> Result<E::NodeClient, DynError> {
|
||||
let api = resolve_api_socket(source)?;
|
||||
let mut endpoints = NodeEndpoints::default();
|
||||
endpoints.api = api;
|
||||
Ok(E::node_client(&endpoints))
|
||||
}
|
||||
|
||||
fn resolve_api_socket(source: &ExternalNodeSource) -> Result<std::net::SocketAddr, DynError> {
|
||||
let source_label = source.label.clone();
|
||||
let endpoint = source.endpoint.trim();
|
||||
if endpoint.is_empty() {
|
||||
return Err(ExternalClientBuildError::EmptyEndpoint {
|
||||
label: source_label,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
let without_scheme = endpoint
|
||||
.strip_prefix("http://")
|
||||
.or_else(|| endpoint.strip_prefix("https://"))
|
||||
.ok_or_else(|| ExternalClientBuildError::UnsupportedScheme {
|
||||
label: source_label.clone(),
|
||||
endpoint: endpoint.to_owned(),
|
||||
})?;
|
||||
|
||||
let authority = without_scheme.trim_end_matches('/');
|
||||
let (host, port) =
|
||||
split_host_port(authority).ok_or_else(|| ExternalClientBuildError::MissingPort {
|
||||
label: source_label.clone(),
|
||||
endpoint: endpoint.to_owned(),
|
||||
})?;
|
||||
|
||||
if host.is_empty() {
|
||||
return Err(ExternalClientBuildError::MissingHost {
|
||||
label: source_label.clone(),
|
||||
endpoint: endpoint.to_owned(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
let resolved = (host, port)
|
||||
.to_socket_addrs()
|
||||
.map_err(|source| ExternalClientBuildError::Resolve {
|
||||
label: source_label.clone(),
|
||||
endpoint: endpoint.to_owned(),
|
||||
source,
|
||||
})?
|
||||
.next()
|
||||
.ok_or_else(|| ExternalClientBuildError::NoResolvedAddress {
|
||||
label: source_label,
|
||||
endpoint: endpoint.to_owned(),
|
||||
})?;
|
||||
|
||||
Ok(resolved)
|
||||
}
|
||||
|
||||
fn split_host_port(authority: &str) -> Option<(&str, u16)> {
|
||||
let (host, port) = authority.rsplit_once(':')?;
|
||||
let port = port.parse::<u16>().ok()?;
|
||||
Some((host.trim_matches(['[', ']']), port))
|
||||
}
|
||||
@ -1,6 +1,7 @@
|
||||
pub mod binary;
|
||||
mod deployer;
|
||||
pub mod env;
|
||||
mod external;
|
||||
mod manual;
|
||||
mod node_control;
|
||||
pub mod process;
|
||||
|
||||
@ -1,11 +1,15 @@
|
||||
use testing_framework_core::{
|
||||
manual::ManualClusterHandle,
|
||||
scenario::{DynError, NodeControlHandle, ReadinessError, StartNodeOptions, StartedNode},
|
||||
scenario::{
|
||||
DynError, ExternalNodeSource, NodeClients, NodeControlHandle, ReadinessError,
|
||||
StartNodeOptions, StartedNode,
|
||||
},
|
||||
};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::{
|
||||
env::LocalDeployerEnv,
|
||||
external::build_external_client,
|
||||
keep_tempdir_from_env,
|
||||
node_control::{NodeManager, NodeManagerError, NodeManagerSeed},
|
||||
};
|
||||
@ -78,6 +82,32 @@ impl<E: LocalDeployerEnv> ManualCluster<E> {
|
||||
self.nodes.wait_node_ready(name).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn node_clients(&self) -> NodeClients<E> {
|
||||
self.nodes.node_clients()
|
||||
}
|
||||
|
||||
pub fn add_external_sources(
|
||||
&self,
|
||||
external_sources: impl IntoIterator<Item = ExternalNodeSource>,
|
||||
) -> Result<(), DynError> {
|
||||
let node_clients = self.nodes.node_clients();
|
||||
for source in external_sources {
|
||||
let client = E::external_node_client(&source)
|
||||
.or_else(|_| build_external_client::<E>(&source))?;
|
||||
node_clients.add_node(client);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn add_external_clients(&self, clients: impl IntoIterator<Item = E::NodeClient>) {
|
||||
let node_clients = self.nodes.node_clients();
|
||||
for client in clients {
|
||||
node_clients.add_node(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: LocalDeployerEnv> Drop for ManualCluster<E> {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user