feat(testing-framework): scaffold unified source orchestration providers

This commit is contained in:
andrussal 2026-02-19 10:36:52 +01:00
parent 5bbf2e72c9
commit 86362a3a78
14 changed files with 384 additions and 49 deletions

View File

@ -45,6 +45,7 @@ pub struct Scenario<E: Application, Caps = ()> {
deployment_policy: DeploymentPolicy,
sources: ScenarioSources,
source_readiness_policy: SourceReadinessPolicy,
source_orchestration_plan: SourceOrchestrationPlan,
capabilities: Caps,
}
@ -58,6 +59,7 @@ impl<E: Application, Caps> Scenario<E, Caps> {
deployment_policy: DeploymentPolicy,
sources: ScenarioSources,
source_readiness_policy: SourceReadinessPolicy,
source_orchestration_plan: SourceOrchestrationPlan,
capabilities: Caps,
) -> Self {
Self {
@ -69,6 +71,7 @@ impl<E: Application, Caps> Scenario<E, Caps> {
deployment_policy,
sources,
source_readiness_policy,
source_orchestration_plan,
capabilities,
}
}
@ -127,6 +130,11 @@ impl<E: Application, Caps> Scenario<E, Caps> {
&self.sources
}
#[must_use]
pub const fn source_orchestration_plan(&self) -> &SourceOrchestrationPlan {
&self.source_orchestration_plan
}
#[must_use]
pub const fn capabilities(&self) -> &Caps {
&self.capabilities
@ -604,11 +612,8 @@ impl<E: Application, Caps> Builder<E, Caps> {
let descriptors = parts.resolve_deployment()?;
let run_plan = parts.run_plan();
let run_metrics = RunMetrics::new(run_plan.duration);
let _source_plan = build_source_orchestration_plan(
parts.sources(),
descriptors.node_count(),
parts.source_readiness_policy,
)?;
let source_orchestration_plan =
build_source_orchestration_plan(parts.sources(), parts.source_readiness_policy)?;
initialize_components(
&descriptors,
@ -636,6 +641,7 @@ impl<E: Application, Caps> Builder<E, Caps> {
parts.deployment_policy,
parts.sources,
parts.source_readiness_policy,
source_orchestration_plan,
parts.capabilities,
))
}
@ -709,19 +715,14 @@ impl<E: Application, Caps> BuilderParts<E, Caps> {
fn build_source_orchestration_plan(
sources: &ScenarioSources,
managed_node_count: usize,
readiness_policy: SourceReadinessPolicy,
) -> Result<SourceOrchestrationPlan, ScenarioBuildError> {
SourceOrchestrationPlan::try_from_sources(sources, managed_node_count, readiness_policy)
SourceOrchestrationPlan::try_from_sources(sources, readiness_policy)
.map_err(source_plan_error_to_build_error)
}
fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> ScenarioBuildError {
match error {
SourceOrchestrationPlanError::ManagedNodesMissing => ScenarioBuildError::SourceConfiguration {
message:
"managed source selected but deployment produced 0 managed nodes; choose a deployment provider/configuration that yields managed nodes".to_string(),
},
SourceOrchestrationPlanError::SourceModeNotWiredYet { mode } => {
ScenarioBuildError::SourceModeNotWiredYet {
mode: source_mode_name(mode),

View File

@ -38,15 +38,17 @@ pub use expectation::Expectation;
pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
pub use runtime::{
BorrowedNode, BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime,
HttpReadinessRequirement, ManagedNode, NodeClients, NodeHandle, NodeInventory, ReadinessError,
RunContext, RunHandle, RunMetrics, Runner, ScenarioError, StabilizationConfig,
HttpReadinessRequirement, ManagedNode, ManagedSource, NodeClients, NodeHandle, NodeInventory,
ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError,
SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider,
build_source_orchestration_plan,
metrics::{
CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError,
PrometheusEndpoint, PrometheusInstantSample,
},
spawn_feed, wait_for_http_ports, wait_for_http_ports_with_host,
wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement,
wait_http_readiness, wait_until_stable,
orchestrate_sources, resolve_sources, spawn_feed, wait_for_http_ports,
wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement,
wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable,
};
pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy};
pub use workload::Workload;

View File

@ -3,8 +3,8 @@ mod deployer;
pub mod inventory;
pub mod metrics;
mod node_clients;
pub(crate) mod orchestration;
pub(crate) mod providers;
pub mod orchestration;
pub mod providers;
pub mod readiness;
mod runner;
@ -13,6 +13,13 @@ pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics};
pub use deployer::{Deployer, ScenarioError};
pub use inventory::{BorrowedNode, BorrowedOrigin, ManagedNode, NodeHandle, NodeInventory};
pub use node_clients::NodeClients;
#[doc(hidden)]
pub use orchestration::{
ManagedSource, SourceOrchestrationPlan, build_source_orchestration_plan, orchestrate_sources,
resolve_sources,
};
#[doc(hidden)]
pub use providers::{SourceProviders, StaticManagedProvider};
pub use readiness::{
HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports,
wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement,

View File

@ -1,6 +1,10 @@
#[allow(dead_code)]
mod source_orchestration_plan;
#[allow(dead_code)]
mod source_resolver;
pub(crate) use source_orchestration_plan::{
SourceModeName, SourceOrchestrationPlan, SourceOrchestrationPlanError,
pub use source_orchestration_plan::{
ManagedSource, SourceModeName, SourceOrchestrationMode, SourceOrchestrationPlan,
SourceOrchestrationPlanError,
};
pub use source_resolver::{build_source_orchestration_plan, orchestrate_sources, resolve_sources};

View File

@ -56,8 +56,6 @@ impl fmt::Display for SourceModeName {
/// Validation failure while building orchestration plan from sources.
#[derive(Debug, thiserror::Error)]
pub enum SourceOrchestrationPlanError {
#[error("managed source selected but deployment produced 0 managed nodes")]
ManagedNodesMissing,
#[error("source mode '{mode}' is not wired into deployers yet")]
SourceModeNotWiredYet { mode: SourceModeName },
}
@ -65,10 +63,8 @@ pub enum SourceOrchestrationPlanError {
impl SourceOrchestrationPlan {
pub fn try_from_sources(
sources: &ScenarioSources,
managed_node_count: usize,
readiness_policy: SourceReadinessPolicy,
) -> Result<Self, SourceOrchestrationPlanError> {
ensure_managed_sources_have_nodes(sources, managed_node_count)?;
let mode = mode_from_sources(sources);
let plan = Self {
@ -98,17 +94,6 @@ impl SourceOrchestrationPlan {
}
}
fn ensure_managed_sources_have_nodes(
sources: &ScenarioSources,
managed_node_count: usize,
) -> Result<(), SourceOrchestrationPlanError> {
if matches!(sources, ScenarioSources::Managed { .. }) && managed_node_count == 0 {
return Err(SourceOrchestrationPlanError::ManagedNodesMissing);
}
Ok(())
}
fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode {
match sources {
ScenarioSources::Managed { external } => SourceOrchestrationMode::Managed {

View File

@ -0,0 +1,115 @@
use std::sync::Arc;
use crate::scenario::{
Application, DynError, NodeClients, Scenario,
runtime::{
orchestration::{
SourceOrchestrationMode, SourceOrchestrationPlan, SourceOrchestrationPlanError,
},
providers::{
AttachProviderError, AttachedNode, ExternalNode, ExternalProviderError,
ManagedProviderError, ManagedProvisionedNode, SourceProviders, StaticManagedProvider,
},
},
};
/// Resolved source nodes grouped by source class.
#[derive(Clone, Default)]
pub struct ResolvedSources<E: Application> {
pub managed: Vec<ManagedProvisionedNode<E>>,
pub attached: Vec<AttachedNode<E>>,
pub external: Vec<ExternalNode<E>>,
}
/// Errors while resolving sources through unified providers.
#[derive(Debug, thiserror::Error)]
pub enum SourceResolveError {
#[error(transparent)]
Plan(#[from] SourceOrchestrationPlanError),
#[error("managed source selected but deployer produced 0 managed nodes")]
ManagedNodesMissing,
#[error(transparent)]
Managed(#[from] ManagedProviderError),
#[error(transparent)]
Attach(#[from] AttachProviderError),
#[error(transparent)]
External(#[from] ExternalProviderError),
}
/// Builds a source orchestration plan from scenario source configuration.
pub fn build_source_orchestration_plan<E: Application, Caps>(
scenario: &Scenario<E, Caps>,
) -> Result<SourceOrchestrationPlan, SourceOrchestrationPlanError> {
SourceOrchestrationPlan::try_from_sources(
scenario.sources(),
scenario.source_readiness_policy(),
)
}
/// Resolves runtime source nodes via unified providers from orchestration plan.
///
/// This currently returns managed nodes for managed mode and keeps external
/// overlays for managed mode reserved until external wiring is enabled.
pub async fn resolve_sources<E: Application>(
plan: &SourceOrchestrationPlan,
providers: &SourceProviders<E>,
) -> Result<ResolvedSources<E>, SourceResolveError> {
match &plan.mode {
SourceOrchestrationMode::Managed { managed, .. } => {
let managed_nodes = providers.managed.provide(managed).await?;
Ok(ResolvedSources {
managed: managed_nodes,
attached: Vec::new(),
external: Vec::new(),
})
}
SourceOrchestrationMode::Attached { attach, external } => {
let attached_nodes = providers.attach.discover(attach).await?;
let external_nodes = providers.external.provide(external).await?;
Ok(ResolvedSources {
managed: Vec::new(),
attached: attached_nodes,
external: external_nodes,
})
}
SourceOrchestrationMode::ExternalOnly { external } => {
let external_nodes = providers.external.provide(external).await?;
Ok(ResolvedSources {
managed: Vec::new(),
attached: Vec::new(),
external: external_nodes,
})
}
}
}
/// Orchestrates scenario sources through the unified source orchestration path.
///
/// Current wiring is transitional:
/// - Managed mode is backed by prebuilt deployer-managed clients via
/// `StaticManagedProvider`.
/// - Attached and external modes are represented in the orchestration plan and
/// resolver, but provider wiring is still scaffolding-only until full runtime
/// integration lands.
pub async fn orchestrate_sources<E: Application>(
plan: &SourceOrchestrationPlan,
node_clients: NodeClients<E>,
) -> Result<NodeClients<E>, DynError> {
let providers: SourceProviders<E> = SourceProviders::default().with_managed(Arc::new(
StaticManagedProvider::new(node_clients.snapshot()),
));
let resolved = resolve_sources(plan, &providers).await?;
if matches!(plan.mode, SourceOrchestrationMode::Managed { .. }) && resolved.managed.is_empty() {
return Err(SourceResolveError::ManagedNodesMissing.into());
}
Ok(NodeClients::new(
resolved
.managed
.into_iter()
.map(|node| node.client)
.collect(),
))
}

View File

@ -1,3 +1,5 @@
use async_trait::async_trait;
use crate::scenario::{Application, DynError, ExternalNodeSource};
/// External node client prepared from a static external source endpoint.
@ -27,12 +29,13 @@ pub enum ExternalProviderError {
///
/// This is scaffolding-only in phase 1 and is intentionally not wired into
/// deployer runtime orchestration yet.
#[async_trait]
pub trait ExternalProvider<E: Application>: Send + Sync {
/// Builds one external node handle from one external source descriptor.
fn build_node(
/// Builds external node handles from external source descriptors.
async fn provide(
&self,
source: &ExternalNodeSource,
) -> Result<ExternalNode<E>, ExternalProviderError>;
sources: &[ExternalNodeSource],
) -> Result<Vec<ExternalNode<E>>, ExternalProviderError>;
}
/// Default external provider stub used while external wiring is not
@ -40,13 +43,18 @@ pub trait ExternalProvider<E: Application>: Send + Sync {
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopExternalProvider;
#[async_trait]
impl<E: Application> ExternalProvider<E> for NoopExternalProvider {
fn build_node(
async fn provide(
&self,
source: &ExternalNodeSource,
) -> Result<ExternalNode<E>, ExternalProviderError> {
sources: &[ExternalNodeSource],
) -> Result<Vec<ExternalNode<E>>, ExternalProviderError> {
let Some(first) = sources.first() else {
return Ok(Vec::new());
};
Err(ExternalProviderError::UnsupportedSource {
external_source: source.clone(),
external_source: first.clone(),
})
}
}

View File

@ -0,0 +1,90 @@
use async_trait::async_trait;
use crate::scenario::{Application, DynError, runtime::orchestration::ManagedSource};
/// Managed node produced by the managed provider path.
#[derive(Clone, Debug)]
pub struct ManagedProvisionedNode<E: Application> {
/// Optional stable identity hint used by runtime inventory dedup logic.
pub identity_hint: Option<String>,
/// Application-specific client for the managed node.
pub client: E::NodeClient,
}
/// Errors returned by managed providers while provisioning managed nodes.
#[derive(Debug, thiserror::Error)]
pub enum ManagedProviderError {
#[error("managed source is not supported by this provider: {managed_source:?}")]
UnsupportedSource { managed_source: ManagedSource },
#[error("managed provisioning failed: {source}")]
Provisioning {
#[source]
source: DynError,
},
}
/// Internal adapter interface for managed node provisioning.
///
/// This is scaffolding-only in phase 1 and is intentionally not wired into
/// deployer runtime orchestration yet.
#[async_trait]
pub trait ManagedProvider<E: Application>: Send + Sync {
/// Provisions or resolves managed nodes for the requested managed source.
async fn provide(
&self,
source: &ManagedSource,
) -> Result<Vec<ManagedProvisionedNode<E>>, ManagedProviderError>;
}
/// Default managed provider stub used while unified provider wiring is not
/// implemented.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopManagedProvider;
#[async_trait]
impl<E: Application> ManagedProvider<E> for NoopManagedProvider {
async fn provide(
&self,
source: &ManagedSource,
) -> Result<Vec<ManagedProvisionedNode<E>>, ManagedProviderError> {
Err(ManagedProviderError::UnsupportedSource {
managed_source: *source,
})
}
}
/// Managed provider that returns a precomputed set of managed node clients.
///
/// Useful as an adapter while deployers are still the source of managed
/// provisioning.
#[derive(Clone, Debug)]
pub struct StaticManagedProvider<E: Application> {
clients: Vec<E::NodeClient>,
}
impl<E: Application> StaticManagedProvider<E> {
#[must_use]
pub fn new(clients: Vec<E::NodeClient>) -> Self {
Self { clients }
}
}
#[async_trait]
impl<E: Application> ManagedProvider<E> for StaticManagedProvider<E> {
async fn provide(
&self,
source: &ManagedSource,
) -> Result<Vec<ManagedProvisionedNode<E>>, ManagedProviderError> {
match source {
ManagedSource::DeployerManaged => Ok(self
.clients
.iter()
.cloned()
.map(|client| ManagedProvisionedNode {
identity_hint: None,
client,
})
.collect()),
}
}
}

View File

@ -2,3 +2,12 @@
mod attach_provider;
#[allow(dead_code)]
mod external_provider;
#[allow(dead_code)]
mod managed_provider;
#[allow(dead_code)]
mod source_providers;
pub use attach_provider::{AttachProviderError, AttachedNode};
pub use external_provider::{ExternalNode, ExternalProviderError};
pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider};
pub use source_providers::SourceProviders;

View File

@ -0,0 +1,48 @@
use std::sync::Arc;
use super::{
attach_provider::{AttachProvider, NoopAttachProvider},
external_provider::{ExternalProvider, NoopExternalProvider},
managed_provider::{ManagedProvider, NoopManagedProvider},
};
use crate::scenario::Application;
/// Unified provider set used by source orchestration.
///
/// This is scaffolding-only and is intentionally not wired into runtime
/// deployer orchestration yet.
pub struct SourceProviders<E: Application> {
pub managed: Arc<dyn ManagedProvider<E>>,
pub attach: Arc<dyn AttachProvider<E>>,
pub external: Arc<dyn ExternalProvider<E>>,
}
impl<E: Application> Default for SourceProviders<E> {
fn default() -> Self {
Self {
managed: Arc::new(NoopManagedProvider),
attach: Arc::new(NoopAttachProvider),
external: Arc::new(NoopExternalProvider),
}
}
}
impl<E: Application> SourceProviders<E> {
#[must_use]
pub fn with_managed(mut self, provider: Arc<dyn ManagedProvider<E>>) -> Self {
self.managed = provider;
self
}
#[must_use]
pub fn with_attach(mut self, provider: Arc<dyn AttachProvider<E>>) -> Self {
self.attach = provider;
self
}
#[must_use]
pub fn with_external(mut self, provider: Arc<dyn ExternalProvider<E>>) -> Self {
self.external = provider;
self
}
}

View File

@ -5,7 +5,8 @@ use testing_framework_core::{
scenario::{
DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs,
RequiresNodeControl, RunContext, Runner, Scenario,
RequiresNodeControl, RunContext, Runner, Scenario, build_source_orchestration_plan,
orchestrate_sources,
},
topology::DeploymentDescriptor,
};
@ -47,6 +48,13 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
where
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
// Source planning is currently resolved here before deployer-specific setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
ComposeRunnerError::SourceOrchestration {
source: source.into(),
}
})?;
let deployment = scenario.deployment();
let setup = DeploymentSetup::<E>::new(deployment);
setup.validate_environment().await?;
@ -64,7 +72,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
&observability,
);
let deployed = deploy_nodes::<E>(
let mut deployed = deploy_nodes::<E>(
&mut prepared.environment,
&prepared.descriptors,
readiness_enabled,
@ -72,6 +80,11 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
)
.await?;
// Source orchestration currently runs here after managed clients are prepared.
deployed.node_clients = orchestrate_sources(&source_plan, deployed.node_clients)
.await
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
let runner = self
.build_runner::<Caps>(
scenario,

View File

@ -35,6 +35,11 @@ pub enum ComposeRunnerError {
BlockFeedMissing,
#[error("runtime preflight failed: no node clients available")]
RuntimePreflight,
#[error("source orchestration failed: {source}")]
SourceOrchestration {
#[source]
source: DynError,
},
#[error("failed to start feed: {source}")]
BlockFeed {
#[source]

View File

@ -8,7 +8,7 @@ use testing_framework_core::{
Application, CleanupGuard, Deployer, DynError, FeedHandle, FeedRuntime,
HttpReadinessRequirement, Metrics, MetricsError, NodeClients,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
Runner, Scenario,
Runner, Scenario, build_source_orchestration_plan, orchestrate_sources,
},
topology::DeploymentDescriptor,
};
@ -94,6 +94,11 @@ pub enum K8sRunnerError {
BlockFeedMissing,
#[error("runtime preflight failed: no node clients available")]
RuntimePreflight,
#[error("source orchestration failed: {source}")]
SourceOrchestration {
#[source]
source: DynError,
},
#[error("failed to initialize feed: {source}")]
BlockFeed {
#[source]
@ -147,10 +152,24 @@ where
E: K8sDeployEnv,
Caps: ObservabilityCapabilityProvider,
{
// Source planning is currently resolved here before deployer-specific setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
K8sRunnerError::SourceOrchestration {
source: source.into(),
}
})?;
let observability = resolve_observability_inputs(scenario.capabilities())?;
let deployment = build_k8s_deployment::<E, Caps>(deployer, scenario, &observability).await?;
let mut cluster = Some(deployment.cluster);
let runtime = build_runtime_artifacts::<E>(&mut cluster, &observability).await?;
let mut runtime = build_runtime_artifacts::<E>(&mut cluster, &observability).await?;
// Source orchestration currently runs here after managed clients are prepared.
runtime.node_clients = orchestrate_sources(&source_plan, runtime.node_clients)
.await
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
let parts = build_runner_parts(scenario, deployment.node_count, runtime);
log_configured_observability(&observability);

View File

@ -12,7 +12,8 @@ use testing_framework_core::{
scenario::{
Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime,
HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle,
RetryPolicy, RunContext, Runner, Scenario, ScenarioError, spawn_feed,
RetryPolicy, RunContext, Runner, Scenario, ScenarioError, build_source_orchestration_plan,
orchestrate_sources, spawn_feed,
},
topology::DeploymentDescriptor,
};
@ -86,6 +87,11 @@ pub enum ProcessDeployerError {
},
#[error("runtime preflight failed: no node clients available")]
RuntimePreflight,
#[error("source orchestration failed: {source}")]
SourceOrchestration {
#[source]
source: DynError,
},
#[error("expectations failed: {source}")]
ExpectationsFailed {
#[source]
@ -180,6 +186,13 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
&self,
scenario: &Scenario<E, ()>,
) -> Result<Runner<E>, ProcessDeployerError> {
// Source planning is currently resolved here before node spawn/runtime setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
ProcessDeployerError::SourceOrchestration {
source: source.into(),
}
})?;
log_local_deploy_start(
scenario.deployment().node_count(),
scenario.deployment_policy(),
@ -188,6 +201,12 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?;
let node_clients = NodeClients::<E>::new(nodes.iter().map(|node| node.client()).collect());
// Source orchestration currently runs here after managed clients are prepared.
let node_clients = orchestrate_sources(&source_plan, node_clients)
.await
.map_err(|source| ProcessDeployerError::SourceOrchestration { source })?;
let runtime = run_context_for(
scenario.deployment().clone(),
node_clients,
@ -207,6 +226,13 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
&self,
scenario: &Scenario<E, NodeControlCapability>,
) -> Result<Runner<E>, ProcessDeployerError> {
// Source planning is currently resolved here before node spawn/runtime setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
ProcessDeployerError::SourceOrchestration {
source: source.into(),
}
})?;
log_local_deploy_start(
scenario.deployment().node_count(),
scenario.deployment_policy(),
@ -215,7 +241,10 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?;
let node_control = self.node_control_from(scenario, nodes);
let node_clients = node_control.node_clients();
// Source orchestration currently runs here after managed clients are prepared.
let node_clients = orchestrate_sources(&source_plan, node_control.node_clients())
.await
.map_err(|source| ProcessDeployerError::SourceOrchestration { source })?;
let runtime = run_context_for(
scenario.deployment().clone(),
node_clients,