diff --git a/testing-framework/core/src/scenario/definition.rs b/testing-framework/core/src/scenario/definition.rs index f487eb2..810bdae 100644 --- a/testing-framework/core/src/scenario/definition.rs +++ b/testing-framework/core/src/scenario/definition.rs @@ -45,6 +45,7 @@ pub struct Scenario { deployment_policy: DeploymentPolicy, sources: ScenarioSources, source_readiness_policy: SourceReadinessPolicy, + source_orchestration_plan: SourceOrchestrationPlan, capabilities: Caps, } @@ -58,6 +59,7 @@ impl Scenario { deployment_policy: DeploymentPolicy, sources: ScenarioSources, source_readiness_policy: SourceReadinessPolicy, + source_orchestration_plan: SourceOrchestrationPlan, capabilities: Caps, ) -> Self { Self { @@ -69,6 +71,7 @@ impl Scenario { deployment_policy, sources, source_readiness_policy, + source_orchestration_plan, capabilities, } } @@ -127,6 +130,11 @@ impl Scenario { &self.sources } + #[must_use] + pub const fn source_orchestration_plan(&self) -> &SourceOrchestrationPlan { + &self.source_orchestration_plan + } + #[must_use] pub const fn capabilities(&self) -> &Caps { &self.capabilities @@ -604,11 +612,8 @@ impl Builder { let descriptors = parts.resolve_deployment()?; let run_plan = parts.run_plan(); let run_metrics = RunMetrics::new(run_plan.duration); - let _source_plan = build_source_orchestration_plan( - parts.sources(), - descriptors.node_count(), - parts.source_readiness_policy, - )?; + let source_orchestration_plan = + build_source_orchestration_plan(parts.sources(), parts.source_readiness_policy)?; initialize_components( &descriptors, @@ -636,6 +641,7 @@ impl Builder { parts.deployment_policy, parts.sources, parts.source_readiness_policy, + source_orchestration_plan, parts.capabilities, )) } @@ -709,19 +715,14 @@ impl BuilderParts { fn build_source_orchestration_plan( sources: &ScenarioSources, - managed_node_count: usize, readiness_policy: SourceReadinessPolicy, ) -> Result { - SourceOrchestrationPlan::try_from_sources(sources, managed_node_count, readiness_policy) + SourceOrchestrationPlan::try_from_sources(sources, readiness_policy) .map_err(source_plan_error_to_build_error) } fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> ScenarioBuildError { match error { - SourceOrchestrationPlanError::ManagedNodesMissing => ScenarioBuildError::SourceConfiguration { - message: - "managed source selected but deployment produced 0 managed nodes; choose a deployment provider/configuration that yields managed nodes".to_string(), - }, SourceOrchestrationPlanError::SourceModeNotWiredYet { mode } => { ScenarioBuildError::SourceModeNotWiredYet { mode: source_mode_name(mode), diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index 29db468..ffa3aa3 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -38,15 +38,17 @@ pub use expectation::Expectation; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ BorrowedNode, BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, - HttpReadinessRequirement, ManagedNode, NodeClients, NodeHandle, NodeInventory, ReadinessError, - RunContext, RunHandle, RunMetrics, Runner, ScenarioError, StabilizationConfig, + HttpReadinessRequirement, ManagedNode, ManagedSource, NodeClients, NodeHandle, NodeInventory, + ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError, + SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider, + build_source_orchestration_plan, metrics::{ CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError, PrometheusEndpoint, PrometheusInstantSample, }, - spawn_feed, wait_for_http_ports, wait_for_http_ports_with_host, - wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, - wait_http_readiness, wait_until_stable, + orchestrate_sources, resolve_sources, spawn_feed, wait_for_http_ports, + wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, + wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, }; pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy}; pub use workload::Workload; diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 18147de..9f5f415 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -3,8 +3,8 @@ mod deployer; pub mod inventory; pub mod metrics; mod node_clients; -pub(crate) mod orchestration; -pub(crate) mod providers; +pub mod orchestration; +pub mod providers; pub mod readiness; mod runner; @@ -13,6 +13,13 @@ pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics}; pub use deployer::{Deployer, ScenarioError}; pub use inventory::{BorrowedNode, BorrowedOrigin, ManagedNode, NodeHandle, NodeInventory}; pub use node_clients::NodeClients; +#[doc(hidden)] +pub use orchestration::{ + ManagedSource, SourceOrchestrationPlan, build_source_orchestration_plan, orchestrate_sources, + resolve_sources, +}; +#[doc(hidden)] +pub use providers::{SourceProviders, StaticManagedProvider}; pub use readiness::{ HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, diff --git a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs index a2da548..cb7d5b2 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/mod.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/mod.rs @@ -1,6 +1,10 @@ #[allow(dead_code)] mod source_orchestration_plan; +#[allow(dead_code)] +mod source_resolver; -pub(crate) use source_orchestration_plan::{ - SourceModeName, SourceOrchestrationPlan, SourceOrchestrationPlanError, +pub use source_orchestration_plan::{ + ManagedSource, SourceModeName, SourceOrchestrationMode, SourceOrchestrationPlan, + SourceOrchestrationPlanError, }; +pub use source_resolver::{build_source_orchestration_plan, orchestrate_sources, resolve_sources}; diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs index f11b831..2c4b068 100644 --- a/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_orchestration_plan.rs @@ -56,8 +56,6 @@ impl fmt::Display for SourceModeName { /// Validation failure while building orchestration plan from sources. #[derive(Debug, thiserror::Error)] pub enum SourceOrchestrationPlanError { - #[error("managed source selected but deployment produced 0 managed nodes")] - ManagedNodesMissing, #[error("source mode '{mode}' is not wired into deployers yet")] SourceModeNotWiredYet { mode: SourceModeName }, } @@ -65,10 +63,8 @@ pub enum SourceOrchestrationPlanError { impl SourceOrchestrationPlan { pub fn try_from_sources( sources: &ScenarioSources, - managed_node_count: usize, readiness_policy: SourceReadinessPolicy, ) -> Result { - ensure_managed_sources_have_nodes(sources, managed_node_count)?; let mode = mode_from_sources(sources); let plan = Self { @@ -98,17 +94,6 @@ impl SourceOrchestrationPlan { } } -fn ensure_managed_sources_have_nodes( - sources: &ScenarioSources, - managed_node_count: usize, -) -> Result<(), SourceOrchestrationPlanError> { - if matches!(sources, ScenarioSources::Managed { .. }) && managed_node_count == 0 { - return Err(SourceOrchestrationPlanError::ManagedNodesMissing); - } - - Ok(()) -} - fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode { match sources { ScenarioSources::Managed { external } => SourceOrchestrationMode::Managed { diff --git a/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs new file mode 100644 index 0000000..c5b63dd --- /dev/null +++ b/testing-framework/core/src/scenario/runtime/orchestration/source_resolver.rs @@ -0,0 +1,115 @@ +use std::sync::Arc; + +use crate::scenario::{ + Application, DynError, NodeClients, Scenario, + runtime::{ + orchestration::{ + SourceOrchestrationMode, SourceOrchestrationPlan, SourceOrchestrationPlanError, + }, + providers::{ + AttachProviderError, AttachedNode, ExternalNode, ExternalProviderError, + ManagedProviderError, ManagedProvisionedNode, SourceProviders, StaticManagedProvider, + }, + }, +}; + +/// Resolved source nodes grouped by source class. +#[derive(Clone, Default)] +pub struct ResolvedSources { + pub managed: Vec>, + pub attached: Vec>, + pub external: Vec>, +} + +/// Errors while resolving sources through unified providers. +#[derive(Debug, thiserror::Error)] +pub enum SourceResolveError { + #[error(transparent)] + Plan(#[from] SourceOrchestrationPlanError), + #[error("managed source selected but deployer produced 0 managed nodes")] + ManagedNodesMissing, + #[error(transparent)] + Managed(#[from] ManagedProviderError), + #[error(transparent)] + Attach(#[from] AttachProviderError), + #[error(transparent)] + External(#[from] ExternalProviderError), +} + +/// Builds a source orchestration plan from scenario source configuration. +pub fn build_source_orchestration_plan( + scenario: &Scenario, +) -> Result { + SourceOrchestrationPlan::try_from_sources( + scenario.sources(), + scenario.source_readiness_policy(), + ) +} + +/// Resolves runtime source nodes via unified providers from orchestration plan. +/// +/// This currently returns managed nodes for managed mode and keeps external +/// overlays for managed mode reserved until external wiring is enabled. +pub async fn resolve_sources( + plan: &SourceOrchestrationPlan, + providers: &SourceProviders, +) -> Result, SourceResolveError> { + match &plan.mode { + SourceOrchestrationMode::Managed { managed, .. } => { + let managed_nodes = providers.managed.provide(managed).await?; + Ok(ResolvedSources { + managed: managed_nodes, + attached: Vec::new(), + external: Vec::new(), + }) + } + SourceOrchestrationMode::Attached { attach, external } => { + let attached_nodes = providers.attach.discover(attach).await?; + let external_nodes = providers.external.provide(external).await?; + Ok(ResolvedSources { + managed: Vec::new(), + attached: attached_nodes, + external: external_nodes, + }) + } + SourceOrchestrationMode::ExternalOnly { external } => { + let external_nodes = providers.external.provide(external).await?; + Ok(ResolvedSources { + managed: Vec::new(), + attached: Vec::new(), + external: external_nodes, + }) + } + } +} + +/// Orchestrates scenario sources through the unified source orchestration path. +/// +/// Current wiring is transitional: +/// - Managed mode is backed by prebuilt deployer-managed clients via +/// `StaticManagedProvider`. +/// - Attached and external modes are represented in the orchestration plan and +/// resolver, but provider wiring is still scaffolding-only until full runtime +/// integration lands. +pub async fn orchestrate_sources( + plan: &SourceOrchestrationPlan, + node_clients: NodeClients, +) -> Result, DynError> { + let providers: SourceProviders = SourceProviders::default().with_managed(Arc::new( + StaticManagedProvider::new(node_clients.snapshot()), + )); + + let resolved = resolve_sources(plan, &providers).await?; + + if matches!(plan.mode, SourceOrchestrationMode::Managed { .. }) && resolved.managed.is_empty() { + return Err(SourceResolveError::ManagedNodesMissing.into()); + } + + Ok(NodeClients::new( + resolved + .managed + .into_iter() + .map(|node| node.client) + .collect(), + )) +} diff --git a/testing-framework/core/src/scenario/runtime/providers/external_provider.rs b/testing-framework/core/src/scenario/runtime/providers/external_provider.rs index 98578a3..697c2aa 100644 --- a/testing-framework/core/src/scenario/runtime/providers/external_provider.rs +++ b/testing-framework/core/src/scenario/runtime/providers/external_provider.rs @@ -1,3 +1,5 @@ +use async_trait::async_trait; + use crate::scenario::{Application, DynError, ExternalNodeSource}; /// External node client prepared from a static external source endpoint. @@ -27,12 +29,13 @@ pub enum ExternalProviderError { /// /// This is scaffolding-only in phase 1 and is intentionally not wired into /// deployer runtime orchestration yet. +#[async_trait] pub trait ExternalProvider: Send + Sync { - /// Builds one external node handle from one external source descriptor. - fn build_node( + /// Builds external node handles from external source descriptors. + async fn provide( &self, - source: &ExternalNodeSource, - ) -> Result, ExternalProviderError>; + sources: &[ExternalNodeSource], + ) -> Result>, ExternalProviderError>; } /// Default external provider stub used while external wiring is not @@ -40,13 +43,18 @@ pub trait ExternalProvider: Send + Sync { #[derive(Clone, Copy, Debug, Default)] pub struct NoopExternalProvider; +#[async_trait] impl ExternalProvider for NoopExternalProvider { - fn build_node( + async fn provide( &self, - source: &ExternalNodeSource, - ) -> Result, ExternalProviderError> { + sources: &[ExternalNodeSource], + ) -> Result>, ExternalProviderError> { + let Some(first) = sources.first() else { + return Ok(Vec::new()); + }; + Err(ExternalProviderError::UnsupportedSource { - external_source: source.clone(), + external_source: first.clone(), }) } } diff --git a/testing-framework/core/src/scenario/runtime/providers/managed_provider.rs b/testing-framework/core/src/scenario/runtime/providers/managed_provider.rs new file mode 100644 index 0000000..5eea666 --- /dev/null +++ b/testing-framework/core/src/scenario/runtime/providers/managed_provider.rs @@ -0,0 +1,90 @@ +use async_trait::async_trait; + +use crate::scenario::{Application, DynError, runtime::orchestration::ManagedSource}; + +/// Managed node produced by the managed provider path. +#[derive(Clone, Debug)] +pub struct ManagedProvisionedNode { + /// Optional stable identity hint used by runtime inventory dedup logic. + pub identity_hint: Option, + /// Application-specific client for the managed node. + pub client: E::NodeClient, +} + +/// Errors returned by managed providers while provisioning managed nodes. +#[derive(Debug, thiserror::Error)] +pub enum ManagedProviderError { + #[error("managed source is not supported by this provider: {managed_source:?}")] + UnsupportedSource { managed_source: ManagedSource }, + #[error("managed provisioning failed: {source}")] + Provisioning { + #[source] + source: DynError, + }, +} + +/// Internal adapter interface for managed node provisioning. +/// +/// This is scaffolding-only in phase 1 and is intentionally not wired into +/// deployer runtime orchestration yet. +#[async_trait] +pub trait ManagedProvider: Send + Sync { + /// Provisions or resolves managed nodes for the requested managed source. + async fn provide( + &self, + source: &ManagedSource, + ) -> Result>, ManagedProviderError>; +} + +/// Default managed provider stub used while unified provider wiring is not +/// implemented. +#[derive(Clone, Copy, Debug, Default)] +pub struct NoopManagedProvider; + +#[async_trait] +impl ManagedProvider for NoopManagedProvider { + async fn provide( + &self, + source: &ManagedSource, + ) -> Result>, ManagedProviderError> { + Err(ManagedProviderError::UnsupportedSource { + managed_source: *source, + }) + } +} + +/// Managed provider that returns a precomputed set of managed node clients. +/// +/// Useful as an adapter while deployers are still the source of managed +/// provisioning. +#[derive(Clone, Debug)] +pub struct StaticManagedProvider { + clients: Vec, +} + +impl StaticManagedProvider { + #[must_use] + pub fn new(clients: Vec) -> Self { + Self { clients } + } +} + +#[async_trait] +impl ManagedProvider for StaticManagedProvider { + async fn provide( + &self, + source: &ManagedSource, + ) -> Result>, ManagedProviderError> { + match source { + ManagedSource::DeployerManaged => Ok(self + .clients + .iter() + .cloned() + .map(|client| ManagedProvisionedNode { + identity_hint: None, + client, + }) + .collect()), + } + } +} diff --git a/testing-framework/core/src/scenario/runtime/providers/mod.rs b/testing-framework/core/src/scenario/runtime/providers/mod.rs index a47d7f7..277b25c 100644 --- a/testing-framework/core/src/scenario/runtime/providers/mod.rs +++ b/testing-framework/core/src/scenario/runtime/providers/mod.rs @@ -2,3 +2,12 @@ mod attach_provider; #[allow(dead_code)] mod external_provider; +#[allow(dead_code)] +mod managed_provider; +#[allow(dead_code)] +mod source_providers; + +pub use attach_provider::{AttachProviderError, AttachedNode}; +pub use external_provider::{ExternalNode, ExternalProviderError}; +pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider}; +pub use source_providers::SourceProviders; diff --git a/testing-framework/core/src/scenario/runtime/providers/source_providers.rs b/testing-framework/core/src/scenario/runtime/providers/source_providers.rs new file mode 100644 index 0000000..c25c89f --- /dev/null +++ b/testing-framework/core/src/scenario/runtime/providers/source_providers.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; + +use super::{ + attach_provider::{AttachProvider, NoopAttachProvider}, + external_provider::{ExternalProvider, NoopExternalProvider}, + managed_provider::{ManagedProvider, NoopManagedProvider}, +}; +use crate::scenario::Application; + +/// Unified provider set used by source orchestration. +/// +/// This is scaffolding-only and is intentionally not wired into runtime +/// deployer orchestration yet. +pub struct SourceProviders { + pub managed: Arc>, + pub attach: Arc>, + pub external: Arc>, +} + +impl Default for SourceProviders { + fn default() -> Self { + Self { + managed: Arc::new(NoopManagedProvider), + attach: Arc::new(NoopAttachProvider), + external: Arc::new(NoopExternalProvider), + } + } +} + +impl SourceProviders { + #[must_use] + pub fn with_managed(mut self, provider: Arc>) -> Self { + self.managed = provider; + self + } + + #[must_use] + pub fn with_attach(mut self, provider: Arc>) -> Self { + self.attach = provider; + self + } + + #[must_use] + pub fn with_external(mut self, provider: Arc>) -> Self { + self.external = provider; + self + } +} diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index 206df6f..6585136 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -5,7 +5,8 @@ use testing_framework_core::{ scenario::{ DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs, - RequiresNodeControl, RunContext, Runner, Scenario, + RequiresNodeControl, RunContext, Runner, Scenario, build_source_orchestration_plan, + orchestrate_sources, }, topology::DeploymentDescriptor, }; @@ -47,6 +48,13 @@ impl DeploymentOrchestrator { where Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync, { + // Source planning is currently resolved here before deployer-specific setup. + let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { + ComposeRunnerError::SourceOrchestration { + source: source.into(), + } + })?; + let deployment = scenario.deployment(); let setup = DeploymentSetup::::new(deployment); setup.validate_environment().await?; @@ -64,7 +72,7 @@ impl DeploymentOrchestrator { &observability, ); - let deployed = deploy_nodes::( + let mut deployed = deploy_nodes::( &mut prepared.environment, &prepared.descriptors, readiness_enabled, @@ -72,6 +80,11 @@ impl DeploymentOrchestrator { ) .await?; + // Source orchestration currently runs here after managed clients are prepared. + deployed.node_clients = orchestrate_sources(&source_plan, deployed.node_clients) + .await + .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?; + let runner = self .build_runner::( scenario, diff --git a/testing-framework/deployers/compose/src/errors.rs b/testing-framework/deployers/compose/src/errors.rs index 8383b33..f30fae7 100644 --- a/testing-framework/deployers/compose/src/errors.rs +++ b/testing-framework/deployers/compose/src/errors.rs @@ -35,6 +35,11 @@ pub enum ComposeRunnerError { BlockFeedMissing, #[error("runtime preflight failed: no node clients available")] RuntimePreflight, + #[error("source orchestration failed: {source}")] + SourceOrchestration { + #[source] + source: DynError, + }, #[error("failed to start feed: {source}")] BlockFeed { #[source] diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index b955a4c..c6172ee 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -8,7 +8,7 @@ use testing_framework_core::{ Application, CleanupGuard, Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, - Runner, Scenario, + Runner, Scenario, build_source_orchestration_plan, orchestrate_sources, }, topology::DeploymentDescriptor, }; @@ -94,6 +94,11 @@ pub enum K8sRunnerError { BlockFeedMissing, #[error("runtime preflight failed: no node clients available")] RuntimePreflight, + #[error("source orchestration failed: {source}")] + SourceOrchestration { + #[source] + source: DynError, + }, #[error("failed to initialize feed: {source}")] BlockFeed { #[source] @@ -147,10 +152,24 @@ where E: K8sDeployEnv, Caps: ObservabilityCapabilityProvider, { + // Source planning is currently resolved here before deployer-specific setup. + let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { + K8sRunnerError::SourceOrchestration { + source: source.into(), + } + })?; + let observability = resolve_observability_inputs(scenario.capabilities())?; let deployment = build_k8s_deployment::(deployer, scenario, &observability).await?; let mut cluster = Some(deployment.cluster); - let runtime = build_runtime_artifacts::(&mut cluster, &observability).await?; + + let mut runtime = build_runtime_artifacts::(&mut cluster, &observability).await?; + + // Source orchestration currently runs here after managed clients are prepared. + runtime.node_clients = orchestrate_sources(&source_plan, runtime.node_clients) + .await + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?; + let parts = build_runner_parts(scenario, deployment.node_count, runtime); log_configured_observability(&observability); diff --git a/testing-framework/deployers/local/src/deployer/orchestrator.rs b/testing-framework/deployers/local/src/deployer/orchestrator.rs index 45044cb..27048d1 100644 --- a/testing-framework/deployers/local/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/local/src/deployer/orchestrator.rs @@ -12,7 +12,8 @@ use testing_framework_core::{ scenario::{ Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle, - RetryPolicy, RunContext, Runner, Scenario, ScenarioError, spawn_feed, + RetryPolicy, RunContext, Runner, Scenario, ScenarioError, build_source_orchestration_plan, + orchestrate_sources, spawn_feed, }, topology::DeploymentDescriptor, }; @@ -86,6 +87,11 @@ pub enum ProcessDeployerError { }, #[error("runtime preflight failed: no node clients available")] RuntimePreflight, + #[error("source orchestration failed: {source}")] + SourceOrchestration { + #[source] + source: DynError, + }, #[error("expectations failed: {source}")] ExpectationsFailed { #[source] @@ -180,6 +186,13 @@ impl ProcessDeployer { &self, scenario: &Scenario, ) -> Result, ProcessDeployerError> { + // Source planning is currently resolved here before node spawn/runtime setup. + let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { + ProcessDeployerError::SourceOrchestration { + source: source.into(), + } + })?; + log_local_deploy_start( scenario.deployment().node_count(), scenario.deployment_policy(), @@ -188,6 +201,12 @@ impl ProcessDeployer { let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?; let node_clients = NodeClients::::new(nodes.iter().map(|node| node.client()).collect()); + + // Source orchestration currently runs here after managed clients are prepared. + let node_clients = orchestrate_sources(&source_plan, node_clients) + .await + .map_err(|source| ProcessDeployerError::SourceOrchestration { source })?; + let runtime = run_context_for( scenario.deployment().clone(), node_clients, @@ -207,6 +226,13 @@ impl ProcessDeployer { &self, scenario: &Scenario, ) -> Result, ProcessDeployerError> { + // Source planning is currently resolved here before node spawn/runtime setup. + let source_plan = build_source_orchestration_plan(scenario).map_err(|source| { + ProcessDeployerError::SourceOrchestration { + source: source.into(), + } + })?; + log_local_deploy_start( scenario.deployment().node_count(), scenario.deployment_policy(), @@ -215,7 +241,10 @@ impl ProcessDeployer { let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?; let node_control = self.node_control_from(scenario, nodes); - let node_clients = node_control.node_clients(); + // Source orchestration currently runs here after managed clients are prepared. + let node_clients = orchestrate_sources(&source_plan, node_control.node_clients()) + .await + .map_err(|source| ProcessDeployerError::SourceOrchestration { source })?; let runtime = run_context_for( scenario.deployment().clone(), node_clients,