framework: make runtime mode contract explicit

This commit is contained in:
Andrus Salumets 2026-03-25 13:24:17 +07:00 committed by GitHub
commit 262c0443a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 677 additions and 176 deletions

View File

@ -1,7 +1,11 @@
use async_trait::async_trait;
use crate::scenario::{Application, ClusterWaitHandle, NodeControlHandle};
use crate::scenario::{Application, ClusterControlProfile, ClusterWaitHandle, NodeControlHandle};
/// Interface for imperative, deployer-backed manual clusters.
#[async_trait]
pub trait ManualClusterHandle<E: Application>: NodeControlHandle<E> + ClusterWaitHandle<E> {}
pub trait ManualClusterHandle<E: Application>: NodeControlHandle<E> + ClusterWaitHandle<E> {
    /// Reports the control profile this handle operates under.
    ///
    /// Defaults to [`ClusterControlProfile::ManualControlled`], marking the
    /// cluster as imperatively driven by the caller rather than owned by the
    /// framework; implementors normally keep this default.
    fn cluster_control_profile(&self) -> ClusterControlProfile {
        ClusterControlProfile::ManualControlled
    }
}

View File

@ -4,8 +4,9 @@ use thiserror::Error;
use tracing::{debug, info};
use super::{
Application, DeploymentPolicy, DynError, ExistingCluster, ExternalNodeSource,
HttpReadinessRequirement, NodeControlCapability, ObservabilityCapability,
Application, ClusterControlProfile, ClusterMode, DeploymentPolicy, DynError, ExistingCluster,
ExternalNodeSource, HttpReadinessRequirement, NodeControlCapability, ObservabilityCapability,
RequiresNodeControl,
builder_ops::CoreBuilderAccess,
expectation::Expectation,
runtime::{
@ -119,8 +120,13 @@ impl<E: Application, Caps> Scenario<E, Caps> {
}
#[must_use]
pub const fn uses_existing_cluster(&self) -> bool {
self.sources.uses_existing_cluster()
pub const fn cluster_mode(&self) -> ClusterMode {
self.sources.cluster_mode()
}
#[must_use]
pub const fn cluster_control_profile(&self) -> ClusterControlProfile {
self.sources.control_profile()
}
#[must_use]
@ -134,16 +140,6 @@ impl<E: Application, Caps> Scenario<E, Caps> {
self.sources.external_nodes()
}
#[must_use]
pub const fn is_managed(&self) -> bool {
self.sources.is_managed()
}
#[must_use]
pub const fn is_external_only(&self) -> bool {
self.sources.is_external_only()
}
#[must_use]
pub fn has_external_nodes(&self) -> bool {
!self.sources.external_nodes().is_empty()
@ -619,11 +615,17 @@ impl<E: Application, Caps> Builder<E, Caps> {
#[must_use]
/// Finalize the scenario, computing run metrics and initializing
/// components.
pub fn build(self) -> Result<Scenario<E, Caps>, ScenarioBuildError> {
pub fn build(self) -> Result<Scenario<E, Caps>, ScenarioBuildError>
where
Caps: RequiresNodeControl,
{
let mut parts = BuilderParts::from_builder(self);
let descriptors = parts.resolve_deployment()?;
let run_plan = parts.run_plan();
let run_metrics = RunMetrics::new(run_plan.duration);
validate_source_contract::<Caps>(parts.sources())?;
let source_orchestration_plan = build_source_orchestration_plan(parts.sources())?;
initialize_components(
@ -726,6 +728,17 @@ fn build_source_orchestration_plan(
SourceOrchestrationPlan::try_from_sources(sources).map_err(source_plan_error_to_build_error)
}
/// Runs every source-level contract check that must hold before a scenario
/// can be built with the capability set `Caps`.
///
/// # Errors
/// Returns [`ScenarioBuildError::SourceConfiguration`] when either the
/// external-only node check or the node-control profile check fails.
fn validate_source_contract<Caps>(sources: &ScenarioSources) -> Result<(), ScenarioBuildError>
where
    Caps: RequiresNodeControl,
{
    // Node-shape validation first, then capability/profile compatibility;
    // the last check's result doubles as the overall result.
    validate_external_only_sources(sources)?;
    validate_node_control_profile::<Caps>(sources)
}
fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> ScenarioBuildError {
match error {
SourceOrchestrationPlanError::SourceModeNotWiredYet { mode } => {
@ -734,6 +747,37 @@ fn source_plan_error_to_build_error(error: SourceOrchestrationPlanError) -> Scen
}
}
/// Rejects external-only scenarios that declare no external nodes, since the
/// runtime would otherwise have nothing to connect to.
///
/// # Errors
/// Returns [`ScenarioBuildError::SourceConfiguration`] for an empty
/// external-only node list; all other cluster modes pass unchecked.
fn validate_external_only_sources(sources: &ScenarioSources) -> Result<(), ScenarioBuildError> {
    // Only the external-only mode carries this requirement.
    if !matches!(sources.cluster_mode(), ClusterMode::ExternalOnly) {
        return Ok(());
    }
    if !sources.external_nodes().is_empty() {
        return Ok(());
    }
    // Error text is asserted verbatim by the builder tests.
    Err(ScenarioBuildError::SourceConfiguration {
        message: "external-only scenarios require at least one external node".to_owned(),
    })
}
/// Checks that the requested capability set is compatible with the sources'
/// control profile: node control cannot be satisfied by an uncontrolled
/// external cluster.
///
/// # Errors
/// Returns [`ScenarioBuildError::SourceConfiguration`] when `Caps` requires
/// node control but the profile is `ExternalUncontrolled`.
fn validate_node_control_profile<Caps>(sources: &ScenarioSources) -> Result<(), ScenarioBuildError>
where
    Caps: RequiresNodeControl,
{
    // Scenarios that never asked for node control are always acceptable.
    if !Caps::REQUIRED {
        return Ok(());
    }
    let profile = sources.control_profile();
    match profile {
        ClusterControlProfile::ExternalUncontrolled => {
            // Error text is asserted verbatim by the builder tests.
            Err(ScenarioBuildError::SourceConfiguration {
                message: format!(
                    "node control is not available for cluster mode '{}' with control profile '{}'",
                    sources.cluster_mode().as_str(),
                    profile.as_str(),
                ),
            })
        }
        _ => Ok(()),
    }
}
impl<E: Application> Builder<E, ()> {
#[must_use]
pub fn enable_node_control(self) -> Builder<E, NodeControlCapability> {
@ -818,3 +862,59 @@ fn expectation_cooldown_for(override_value: Option<Duration>) -> Duration {
fn min_run_duration() -> Duration {
Duration::from_secs(MIN_RUN_DURATION_SECS)
}
#[cfg(test)]
mod tests {
    use super::{
        ScenarioBuildError, validate_external_only_sources, validate_node_control_profile,
    };
    use crate::scenario::{
        ExistingCluster, ExternalNodeSource, NodeControlCapability, sources::ScenarioSources,
    };

    // External-only sources without any external node must be rejected with
    // the exact user-facing configuration message.
    #[test]
    fn external_only_requires_external_nodes() {
        let error =
            validate_external_only_sources(&ScenarioSources::default().into_external_only())
                .expect_err("external-only without nodes should fail");
        assert!(matches!(
            error,
            ScenarioBuildError::SourceConfiguration { .. }
        ));
        assert_eq!(
            error.to_string(),
            "invalid scenario source configuration: external-only scenarios require at least one external node"
        );
    }

    // Even with nodes present, external-only sources are 'uncontrolled' and
    // must refuse a capability set that requires node control.
    #[test]
    fn external_only_rejects_node_control_requirement() {
        let sources = ScenarioSources::default()
            .with_external_node(ExternalNodeSource::new(
                "node-0".to_owned(),
                "http://127.0.0.1:1".to_owned(),
            ))
            .into_external_only();
        let error = validate_node_control_profile::<NodeControlCapability>(&sources)
            .expect_err("external-only should reject node control");
        assert!(matches!(
            error,
            ScenarioBuildError::SourceConfiguration { .. }
        ));
        assert_eq!(
            error.to_string(),
            "invalid scenario source configuration: node control is not available for cluster mode 'external-only' with control profile 'external-uncontrolled'"
        );
    }

    // An attached (existing-cluster) source is considered controllable, so
    // requiring node control must succeed.
    #[test]
    fn existing_cluster_accepts_node_control_requirement() {
        let sources = ScenarioSources::default()
            .with_attach(ExistingCluster::for_compose_project("project".to_owned()));
        validate_node_control_profile::<NodeControlCapability>(&sources)
            .expect("existing cluster should be considered controllable");
    }
}

View File

@ -36,22 +36,25 @@ pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder};
pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy};
pub use expectation::Expectation;
pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
#[doc(hidden)]
pub use runtime::{
ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, CleanupGuard,
Deployer, Feed, FeedHandle, FeedRuntime, HttpReadinessRequirement, ManagedSource, NodeClients,
ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError,
SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider,
build_source_orchestration_plan,
FeedHandle, ManagedSource, RuntimeAssembly, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources,
orchestrate_sources_with_providers, resolve_sources,
};
pub use runtime::{
Deployer, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext,
RunHandle, RunMetrics, Runner, ScenarioError, StabilizationConfig,
metrics::{
CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError,
PrometheusEndpoint, PrometheusInstantSample,
},
orchestrate_sources, orchestrate_sources_with_providers, resolve_sources, spawn_feed,
wait_for_http_ports, wait_for_http_ports_with_host,
spawn_feed, wait_for_http_ports, wait_for_http_ports_with_host,
wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement,
wait_http_readiness, wait_until_stable,
};
pub use sources::{ExistingCluster, ExternalNodeSource};
pub use sources::{ClusterControlProfile, ClusterMode, ExistingCluster, ExternalNodeSource};
pub use workload::Workload;
pub use crate::env::Application;

View File

@ -1,7 +1,9 @@
use std::{sync::Arc, time::Duration};
use super::{metrics::Metrics, node_clients::ClusterClient};
use crate::scenario::{Application, ClusterWaitHandle, DynError, NodeClients, NodeControlHandle};
use crate::scenario::{
Application, ClusterControlProfile, ClusterWaitHandle, DynError, NodeClients, NodeControlHandle,
};
#[derive(Debug, thiserror::Error)]
enum RunContextCapabilityError {
@ -15,6 +17,21 @@ pub struct RunContext<E: Application> {
node_clients: NodeClients<E>,
metrics: RunMetrics,
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Option<Arc<dyn ClusterWaitHandle<E>>>,
}
/// Low-level runtime assembly input used by deployers to build a runnable
/// cluster context.
pub struct RuntimeAssembly<E: Application> {
descriptors: E::Deployment,
node_clients: NodeClients<E>,
run_duration: Duration,
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
@ -24,12 +41,12 @@ pub struct RunContext<E: Application> {
impl<E: Application> RunContext<E> {
/// Builds a run context from prepared deployment/runtime artifacts.
#[must_use]
#[doc(hidden)]
pub fn new(
pub(crate) fn new(
descriptors: E::Deployment,
node_clients: NodeClients<E>,
run_duration: Duration,
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
@ -41,6 +58,7 @@ impl<E: Application> RunContext<E> {
node_clients,
metrics,
expectation_cooldown,
cluster_control_profile,
telemetry,
feed,
node_control,
@ -49,8 +67,7 @@ impl<E: Application> RunContext<E> {
}
#[must_use]
#[doc(hidden)]
pub fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self {
pub(crate) fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self {
self.cluster_wait = Some(cluster_wait);
self
}
@ -86,13 +103,13 @@ impl<E: Application> RunContext<E> {
}
#[must_use]
pub const fn expectation_cooldown(&self) -> Duration {
pub(crate) const fn expectation_cooldown(&self) -> Duration {
self.expectation_cooldown
}
#[must_use]
pub const fn run_metrics(&self) -> RunMetrics {
self.metrics
pub const fn cluster_control_profile(&self) -> ClusterControlProfile {
self.cluster_control_profile
}
#[must_use]
@ -100,11 +117,6 @@ impl<E: Application> RunContext<E> {
self.node_control.clone()
}
#[must_use]
pub const fn controls_nodes(&self) -> bool {
self.node_control.is_some()
}
pub(crate) async fn wait_network_ready(&self) -> Result<(), DynError> {
self.require_cluster_wait()?.wait_network_ready().await
}
@ -122,6 +134,83 @@ impl<E: Application> RunContext<E> {
}
}
impl<E: Application> RuntimeAssembly<E> {
    /// Creates an assembly with no node-control or cluster-wait handles;
    /// attach those via [`Self::with_node_control`] / [`Self::with_cluster_wait`].
    #[must_use]
    pub fn new(
        descriptors: E::Deployment,
        node_clients: NodeClients<E>,
        run_duration: Duration,
        expectation_cooldown: Duration,
        cluster_control_profile: ClusterControlProfile,
        telemetry: Metrics,
        feed: <E::FeedRuntime as super::FeedRuntime>::Feed,
    ) -> Self {
        Self {
            descriptors,
            node_clients,
            run_duration,
            expectation_cooldown,
            cluster_control_profile,
            telemetry,
            feed,
            // Optional handles start unset; builders add them when available.
            node_control: None,
            cluster_wait: None,
        }
    }

    /// Installs the node-control handle used for lifecycle operations.
    #[must_use]
    pub fn with_node_control(mut self, node_control: Arc<dyn NodeControlHandle<E>>) -> Self {
        self.node_control = Some(node_control);
        self
    }

    /// Installs the cluster-wait handle used for readiness checks.
    #[must_use]
    pub fn with_cluster_wait(mut self, cluster_wait: Arc<dyn ClusterWaitHandle<E>>) -> Self {
        self.cluster_wait = Some(cluster_wait);
        self
    }

    /// Consumes the assembly and produces the runnable [`RunContext`].
    #[must_use]
    pub fn build_context(self) -> RunContext<E> {
        // Destructure once so each field is moved exactly where it belongs.
        let Self {
            descriptors,
            node_clients,
            run_duration,
            expectation_cooldown,
            cluster_control_profile,
            telemetry,
            feed,
            node_control,
            cluster_wait,
        } = self;
        let context = RunContext::new(
            descriptors,
            node_clients,
            run_duration,
            expectation_cooldown,
            cluster_control_profile,
            telemetry,
            feed,
            node_control,
        );
        // Cluster wait is optional; only wire it in when a handle was given.
        if let Some(cluster_wait) = cluster_wait {
            context.with_cluster_wait(cluster_wait)
        } else {
            context
        }
    }

    /// Convenience: builds the context and wraps it in a [`super::Runner`].
    #[must_use]
    pub fn build_runner(self, cleanup_guard: Option<Box<dyn CleanupGuard>>) -> super::Runner<E> {
        super::Runner::new(self.build_context(), cleanup_guard)
    }
}
// Inverse of `RuntimeAssembly::build_context`: decomposes an existing context
// back into an assembly so its parts can be re-wired or rebuilt.
impl<E: Application> From<RunContext<E>> for RuntimeAssembly<E> {
    fn from(context: RunContext<E>) -> Self {
        Self {
            descriptors: context.descriptors,
            node_clients: context.node_clients,
            // RunContext stores the duration inside its RunMetrics; recover it.
            run_duration: context.metrics.run_duration(),
            expectation_cooldown: context.expectation_cooldown,
            cluster_control_profile: context.cluster_control_profile,
            telemetry: context.telemetry,
            feed: context.feed,
            node_control: context.node_control,
            cluster_wait: context.cluster_wait,
        }
    }
}
/// Handle returned by the runner to control the lifecycle of the run.
pub struct RunHandle<E: Application> {
run_context: Arc<RunContext<E>>,
@ -137,16 +226,6 @@ impl<E: Application> Drop for RunHandle<E> {
}
impl<E: Application> RunHandle<E> {
#[must_use]
/// Build a handle from owned context and optional cleanup guard.
#[doc(hidden)]
pub fn new(context: RunContext<E>, cleanup_guard: Option<Box<dyn CleanupGuard>>) -> Self {
Self {
run_context: Arc::new(context),
cleanup_guard,
}
}
#[must_use]
/// Build a handle from a shared context reference.
pub(crate) fn from_shared(

View File

@ -9,7 +9,7 @@ pub mod readiness;
mod runner;
use async_trait::async_trait;
pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics};
pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics, RuntimeAssembly};
pub use deployer::{Deployer, ScenarioError};
pub use node_clients::NodeClients;
#[doc(hidden)]

View File

@ -1,4 +1,4 @@
use crate::scenario::{ExistingCluster, ExternalNodeSource, sources::ScenarioSources};
use crate::scenario::{ClusterMode, ExistingCluster, ExternalNodeSource, sources::ScenarioSources};
/// Explicit descriptor for managed node sourcing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@ -69,7 +69,19 @@ impl SourceOrchestrationPlan {
#[cfg(test)]
mod tests {
use super::{SourceOrchestrationMode, SourceOrchestrationPlan};
use crate::scenario::{ExistingCluster, sources::ScenarioSources};
use crate::scenario::{ExistingCluster, ExternalNodeSource, sources::ScenarioSources};
#[test]
fn managed_sources_are_planned() {
let plan = SourceOrchestrationPlan::try_from_sources(&ScenarioSources::default())
.expect("managed sources should build a source orchestration plan");
assert!(matches!(
plan.mode(),
SourceOrchestrationMode::Managed { .. }
));
assert!(plan.external_sources().is_empty());
}
#[test]
fn attached_sources_are_planned() {
@ -86,20 +98,75 @@ mod tests {
SourceOrchestrationMode::Attached { .. }
));
}
#[test]
fn attached_sources_keep_external_nodes() {
let sources = ScenarioSources::default()
.with_attach(ExistingCluster::for_compose_project(
"test-project".to_string(),
))
.with_external_node(ExternalNodeSource::new(
"external-0".to_owned(),
"http://127.0.0.1:1".to_owned(),
));
let plan = SourceOrchestrationPlan::try_from_sources(&sources)
.expect("attached sources with external nodes should build");
assert!(matches!(
plan.mode(),
SourceOrchestrationMode::Attached { .. }
));
assert_eq!(plan.external_sources().len(), 1);
assert_eq!(plan.external_sources()[0].label(), "external-0");
}
#[test]
fn external_only_sources_are_planned() {
let sources = ScenarioSources::default()
.with_external_node(ExternalNodeSource::new(
"external-0".to_owned(),
"http://127.0.0.1:1".to_owned(),
))
.into_external_only();
let plan = SourceOrchestrationPlan::try_from_sources(&sources)
.expect("external-only sources should build a source orchestration plan");
assert!(matches!(
plan.mode(),
SourceOrchestrationMode::ExternalOnly { .. }
));
assert_eq!(plan.external_sources().len(), 1);
assert_eq!(plan.external_sources()[0].label(), "external-0");
}
}
fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode {
match sources {
ScenarioSources::Managed { external } => SourceOrchestrationMode::Managed {
managed: ManagedSource::DeployerManaged,
external: external.clone(),
match sources.cluster_mode() {
ClusterMode::Managed => match sources {
ScenarioSources::Managed { external } => SourceOrchestrationMode::Managed {
managed: ManagedSource::DeployerManaged,
external: external.clone(),
},
ScenarioSources::Attached { .. } | ScenarioSources::ExternalOnly { .. } => {
unreachable!("cluster mode and source storage must stay aligned")
}
},
ScenarioSources::Attached { attach, external } => SourceOrchestrationMode::Attached {
attach: attach.clone(),
external: external.clone(),
ClusterMode::ExistingCluster => match sources {
ScenarioSources::Attached { attach, external } => SourceOrchestrationMode::Attached {
attach: attach.clone(),
external: external.clone(),
},
ScenarioSources::Managed { .. } | ScenarioSources::ExternalOnly { .. } => {
unreachable!("cluster mode and source storage must stay aligned")
}
},
ScenarioSources::ExternalOnly { external } => SourceOrchestrationMode::ExternalOnly {
external: external.clone(),
ClusterMode::ExternalOnly => match sources {
ScenarioSources::ExternalOnly { external } => SourceOrchestrationMode::ExternalOnly {
external: external.clone(),
},
ScenarioSources::Managed { .. } | ScenarioSources::Attached { .. } => {
unreachable!("cluster mode and source storage must stay aligned")
}
},
}
}

View File

@ -2,7 +2,7 @@ use async_trait::async_trait;
use crate::scenario::{Application, DynError, ExistingCluster};
/// Attached node discovered from an existing external cluster source.
/// Node discovered from an existing cluster descriptor.
#[derive(Clone, Debug)]
pub struct AttachedNode<E: Application> {
/// Optional stable identity hint used by runtime inventory dedup logic.
@ -14,7 +14,7 @@ pub struct AttachedNode<E: Application> {
/// Errors returned by attach providers while discovering attached nodes.
#[derive(Debug, thiserror::Error)]
pub enum AttachProviderError {
#[error("attach source is not supported by this provider: {attach_source:?}")]
#[error("existing cluster descriptor is not supported by this provider: {attach_source:?}")]
UnsupportedSource { attach_source: ExistingCluster },
#[error("attach discovery failed: {source}")]
Discovery {
@ -23,13 +23,13 @@ pub enum AttachProviderError {
},
}
/// Internal adapter interface for discovering pre-existing nodes.
/// Internal adapter interface for discovering nodes in an existing cluster.
///
/// This is scaffolding-only in phase 1 and is intentionally not wired into
/// deployer runtime orchestration yet.
#[async_trait]
pub trait AttachProvider<E: Application>: Send + Sync {
/// Discovers node clients for the requested attach source.
/// Discovers node clients for the requested existing cluster.
async fn discover(
&self,
source: &ExistingCluster,

View File

@ -34,10 +34,11 @@ impl<E: Application> Drop for Runner<E> {
}
impl<E: Application> Runner<E> {
/// Construct a runner from the run context and optional cleanup guard.
#[must_use]
#[doc(hidden)]
pub fn new(context: RunContext<E>, cleanup_guard: Option<Box<dyn CleanupGuard>>) -> Self {
pub(crate) fn new(
context: RunContext<E>,
cleanup_guard: Option<Box<dyn CleanupGuard>>,
) -> Self {
Self {
context: Arc::new(context),
cleanup_guard,
@ -191,7 +192,7 @@ impl<E: Application> Runner<E> {
}
fn settle_wait_duration(context: &RunContext<E>) -> Option<Duration> {
let has_node_control = context.controls_nodes();
let has_node_control = context.node_control().is_some();
let configured_wait = context.expectation_cooldown();
if configured_wait.is_zero() && !has_node_control {
@ -232,7 +233,7 @@ impl<E: Application> Runner<E> {
fn cooldown_duration(context: &RunContext<E>) -> Option<Duration> {
// Managed environments need a minimum cooldown so feed and expectations
// observe stabilized state.
let needs_stabilization = context.controls_nodes();
let needs_stabilization = context.cluster_control_profile().framework_owns_lifecycle();
let mut wait = context.expectation_cooldown();

View File

@ -2,4 +2,4 @@ mod model;
pub(crate) use model::ScenarioSources;
#[doc(hidden)]
pub use model::{ExistingCluster, ExternalNodeSource};
pub use model::{ClusterControlProfile, ClusterMode, ExistingCluster, ExternalNodeSource};

View File

@ -119,6 +119,51 @@ impl ExternalNodeSource {
}
}
/// High-level source mode of a scenario cluster.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ClusterMode {
    /// Cluster deployed and owned by the framework (`ScenarioSources::Managed`).
    Managed,
    /// Scenario attaches to a pre-existing cluster (`ScenarioSources::Attached`).
    ExistingCluster,
    /// Only externally supplied nodes are used (`ScenarioSources::ExternalOnly`).
    ExternalOnly,
}
impl ClusterMode {
    /// Returns the stable kebab-case name for this mode.
    ///
    /// These strings appear in user-facing error messages, so they are part
    /// of the observable contract and must not change casually.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Managed => "managed",
            Self::ExistingCluster => "existing-cluster",
            Self::ExternalOnly => "external-only",
        }
    }
}
/// High-level control/lifecycle expectation for a cluster surface.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ClusterControlProfile {
    /// The framework owns the cluster lifecycle end to end.
    FrameworkManaged,
    /// Attached to an existing cluster; node control is available.
    ExistingClusterAttached,
    /// External nodes only; no control over their lifecycle.
    ExternalUncontrolled,
    /// Imperatively driven by the caller (see `ManualClusterHandle`).
    ManualControlled,
}
impl ClusterControlProfile {
    /// Returns the stable kebab-case name for this profile.
    ///
    /// Used verbatim in error messages, so treat these strings as part of the
    /// observable contract.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::FrameworkManaged => "framework-managed",
            Self::ExistingClusterAttached => "existing-cluster-attached",
            Self::ExternalUncontrolled => "external-uncontrolled",
            Self::ManualControlled => "manual-controlled",
        }
    }

    /// True only when the framework itself owns the cluster lifecycle
    /// (used e.g. to decide whether a cooldown/stabilization wait is needed).
    #[must_use]
    pub const fn framework_owns_lifecycle(self) -> bool {
        matches!(self, Self::FrameworkManaged)
    }
}
/// Source model that makes invalid managed+attached combinations
/// unrepresentable by type.
#[derive(Clone, Debug, Eq, PartialEq)]
@ -187,17 +232,59 @@ impl ScenarioSources {
}
#[must_use]
pub(crate) const fn is_managed(&self) -> bool {
matches!(self, Self::Managed { .. })
    /// Maps this source-storage variant to its public [`ClusterMode`].
    ///
    /// The mapping is one-to-one by construction; other code relies on the
    /// mode and the variant staying aligned.
    pub(crate) const fn cluster_mode(&self) -> ClusterMode {
        match self {
            Self::Managed { .. } => ClusterMode::Managed,
            Self::Attached { .. } => ClusterMode::ExistingCluster,
            Self::ExternalOnly { .. } => ClusterMode::ExternalOnly,
        }
    }
#[must_use]
pub(crate) const fn uses_existing_cluster(&self) -> bool {
matches!(self, Self::Attached { .. })
}
#[must_use]
pub(crate) const fn is_external_only(&self) -> bool {
matches!(self, Self::ExternalOnly { .. })
    /// Derives the control profile implied by the cluster mode.
    ///
    /// Note: `ManualControlled` is never produced here — that profile comes
    /// only from manual cluster handles, not from source storage.
    pub(crate) const fn control_profile(&self) -> ClusterControlProfile {
        match self.cluster_mode() {
            ClusterMode::Managed => ClusterControlProfile::FrameworkManaged,
            ClusterMode::ExistingCluster => ClusterControlProfile::ExistingClusterAttached,
            ClusterMode::ExternalOnly => ClusterControlProfile::ExternalUncontrolled,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::{ClusterControlProfile, ExistingCluster, ExternalNodeSource, ScenarioSources};

    // Default (managed) sources imply the framework owns the lifecycle.
    #[test]
    fn managed_sources_map_to_framework_managed_control() {
        assert_eq!(
            ScenarioSources::default().control_profile(),
            ClusterControlProfile::FrameworkManaged,
        );
    }

    // Attaching to an existing cluster maps to the attached control profile.
    #[test]
    fn attached_sources_map_to_existing_cluster_control() {
        let sources = ScenarioSources::default()
            .with_attach(ExistingCluster::for_compose_project("project".to_owned()));
        assert_eq!(
            sources.control_profile(),
            ClusterControlProfile::ExistingClusterAttached,
        );
    }

    // External-only sources cannot be controlled by the framework.
    #[test]
    fn external_only_sources_map_to_uncontrolled_profile() {
        let sources = ScenarioSources::default()
            .with_external_node(ExternalNodeSource::new(
                "node".to_owned(),
                "http://node".to_owned(),
            ))
            .into_external_only();
        assert_eq!(
            sources.control_profile(),
            ClusterControlProfile::ExternalUncontrolled,
        );
    }
}

View File

@ -176,12 +176,12 @@ impl<E: ComposeDeployEnv> ClusterWaitHandle<E> for ComposeAttachedClusterWait<E>
}
fn compose_wait_request(source: &ExistingCluster) -> Result<ComposeAttachRequest<'_>, DynError> {
let project = source
.compose_project()
.ok_or_else(|| DynError::from("compose cluster wait requires a compose attach source"))?;
let services = source
.compose_services()
.ok_or_else(|| DynError::from("compose cluster wait requires a compose attach source"))?;
let project = source.compose_project().ok_or_else(|| {
DynError::from("compose cluster wait requires a compose existing-cluster descriptor")
})?;
let services = source.compose_services().ok_or_else(|| {
DynError::from("compose cluster wait requires a compose existing-cluster descriptor")
})?;
Ok(ComposeAttachRequest { project, services })
}

View File

@ -3,10 +3,11 @@ use std::{env, sync::Arc, time::Duration};
use reqwest::Url;
use testing_framework_core::{
scenario::{
ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, DeploymentPolicy, FeedHandle,
FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
Runner, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode,
ClusterWaitHandle, DeploymentPolicy, DynError, ExistingCluster, FeedHandle, FeedRuntime,
HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner,
RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
@ -63,6 +64,12 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
where
Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
validate_supported_cluster_mode(scenario).map_err(|source| {
ComposeRunnerError::SourceOrchestration {
source: source.into(),
}
})?;
// Source planning is currently resolved here before deployer-specific setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
ComposeRunnerError::SourceOrchestration {
@ -70,11 +77,11 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
}
})?;
if scenario.uses_existing_cluster() {
if matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) {
return self
.deploy_attached_only::<Caps>(scenario, source_plan)
.deploy_existing_cluster::<Caps>(scenario, source_plan)
.await
.map(|runner| (runner, attached_metadata(scenario)));
.map(|runner| (runner, existing_cluster_metadata(scenario)));
}
let deployment = scenario.deployment();
@ -137,7 +144,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
))
}
async fn deploy_attached_only<Caps>(
async fn deploy_existing_cluster<Caps>(
&self,
scenario: &Scenario<E, Caps>,
source_plan: SourceOrchestrationPlan,
@ -157,11 +164,12 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
let node_control = self.attached_node_control::<Caps>(scenario)?;
let cluster_wait = self.attached_cluster_wait(scenario)?;
let (feed, feed_task) = spawn_block_feed_with_retry::<E>(&node_clients).await?;
let context = build_run_context(
let assembly = build_runtime_assembly(
scenario.deployment().clone(),
node_clients,
scenario.duration(),
scenario.expectation_cooldown(),
scenario.cluster_control_profile(),
observability.telemetry_handle()?,
feed,
node_control,
@ -169,7 +177,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
);
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(feed_task);
Ok(Runner::new(context, Some(cleanup_guard)))
Ok(assembly.build_runner(Some(cleanup_guard)))
}
fn source_providers(&self, managed_clients: Vec<E::NodeClient>) -> SourceProviders<E> {
@ -216,7 +224,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
let attach = scenario
.existing_cluster()
.ok_or(ComposeRunnerError::InternalInvariant {
message: "attached node control requested outside attached source mode",
message: "existing-cluster node control requested outside existing-cluster mode",
})?;
let node_control = ComposeAttachedNodeControl::try_from_existing_cluster(attach)
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
@ -234,7 +242,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
let attach = scenario
.existing_cluster()
.ok_or(ComposeRunnerError::InternalInvariant {
message: "compose attached cluster wait requested outside attached source mode",
message: "compose cluster wait requested outside existing-cluster mode",
})?;
let cluster_wait = ComposeAttachedClusterWait::<E>::try_new(compose_runner_host(), attach)
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
@ -268,6 +276,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
descriptors: prepared.descriptors.clone(),
duration: scenario.duration(),
expectation_cooldown: scenario.expectation_cooldown(),
cluster_control_profile: scenario.cluster_control_profile(),
telemetry,
environment: &mut prepared.environment,
node_control,
@ -283,7 +292,7 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
"compose runtime prepared"
);
Ok(Runner::new(runtime.context, Some(cleanup_guard)))
Ok(runtime.assembly.build_runner(Some(cleanup_guard)))
}
fn maybe_node_control<Caps>(
@ -363,7 +372,57 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
}
}
fn attached_metadata<E, Caps>(scenario: &Scenario<E, Caps>) -> ComposeDeploymentMetadata
/// Ensures the compose deployer can actually serve the scenario's cluster
/// mode: existing-cluster scenarios must carry a compose descriptor.
///
/// # Errors
/// Fails when the scenario is in existing-cluster mode but has no existing
/// cluster, or when its descriptor is not a compose one.
fn validate_supported_cluster_mode<E: Application, Caps>(
    scenario: &Scenario<E, Caps>,
) -> Result<(), DynError> {
    match scenario.cluster_mode() {
        ClusterMode::ExistingCluster => {
            // The mode guarantees an attach descriptor should exist; its
            // absence is a configuration error surfaced to the caller.
            let cluster = scenario.existing_cluster().ok_or_else(|| {
                DynError::from("existing-cluster mode requires an existing cluster")
            })?;
            ensure_compose_existing_cluster(cluster)
        }
        // Managed and external-only scenarios need no compose descriptor.
        _ => Ok(()),
    }
}
/// Accepts only existing-cluster descriptors that expose both a compose
/// project and compose services; anything else (e.g. a k8s selector) is
/// rejected with a descriptive error.
fn ensure_compose_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> {
    let is_compose_descriptor =
        cluster.compose_project().is_some() && cluster.compose_services().is_some();
    if is_compose_descriptor {
        Ok(())
    } else {
        // Error text is asserted verbatim by the validator tests.
        Err("compose deployer requires a compose existing-cluster descriptor".into())
    }
}
#[cfg(test)]
mod tests {
    use testing_framework_core::scenario::ExistingCluster;

    use super::ensure_compose_existing_cluster;

    // A compose-project descriptor satisfies the compose deployer check.
    #[test]
    fn compose_cluster_validator_accepts_compose_descriptor() {
        ensure_compose_existing_cluster(&ExistingCluster::for_compose_project(
            "project".to_owned(),
        ))
        .expect("compose descriptor should be accepted");
    }

    // A k8s-selector descriptor must be rejected with the exact error text.
    #[test]
    fn compose_cluster_validator_rejects_k8s_descriptor() {
        let error = ensure_compose_existing_cluster(&ExistingCluster::for_k8s_selector(
            "app=node".to_owned(),
        ))
        .expect_err("k8s descriptor should be rejected");
        assert_eq!(
            error.to_string(),
            "compose deployer requires a compose existing-cluster descriptor"
        );
    }
}
fn existing_cluster_metadata<E, Caps>(scenario: &Scenario<E, Caps>) -> ComposeDeploymentMetadata
where
E: ComposeDeployEnv,
Caps: Send + Sync,
@ -379,7 +438,7 @@ struct DeployedNodes<E: ComposeDeployEnv> {
}
struct ComposeRuntime<E: ComposeDeployEnv> {
context: RunContext<E>,
assembly: RuntimeAssembly<E>,
feed_task: FeedHandle,
}
@ -388,6 +447,7 @@ struct RuntimeBuildInput<'a, E: ComposeDeployEnv> {
descriptors: E::Deployment,
duration: Duration,
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
environment: &'a mut StackEnvironment,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
@ -408,18 +468,22 @@ async fn build_compose_runtime<E: ComposeDeployEnv>(
.start_block_feed(&node_clients, input.environment)
.await?;
let context = build_run_context(
let assembly = build_runtime_assembly(
input.descriptors,
node_clients,
input.duration,
input.expectation_cooldown,
input.cluster_control_profile,
input.telemetry,
feed,
input.node_control,
input.cluster_wait,
);
Ok(ComposeRuntime { context, feed_task })
Ok(ComposeRuntime {
assembly,
feed_task,
})
}
async fn deploy_nodes<E: ComposeDeployEnv>(
@ -452,26 +516,33 @@ async fn deploy_nodes<E: ComposeDeployEnv>(
})
}
fn build_run_context<E: ComposeDeployEnv>(
fn build_runtime_assembly<E: ComposeDeployEnv>(
descriptors: E::Deployment,
node_clients: NodeClients<E>,
run_duration: Duration,
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: <E::FeedRuntime as FeedRuntime>::Feed,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RunContext<E> {
RunContext::new(
) -> RuntimeAssembly<E> {
let mut assembly = RuntimeAssembly::new(
descriptors,
node_clients,
run_duration,
expectation_cooldown,
cluster_control_profile,
telemetry,
feed,
node_control,
)
.with_cluster_wait(cluster_wait)
.with_cluster_wait(cluster_wait);
if let Some(node_control) = node_control {
assembly = assembly.with_node_control(node_control);
}
assembly
}
fn resolve_observability_inputs<E, Caps>(

View File

@ -155,7 +155,7 @@ impl<E: Application> NodeControlHandle<E> for ComposeNodeControl {
}
}
/// Node control handle for compose attached mode.
/// Node control handle for compose existing-cluster mode.
pub struct ComposeAttachedNodeControl {
pub(crate) project_name: String,
}

View File

@ -247,9 +247,9 @@ impl<E: K8sDeployEnv> ClusterWaitHandle<E> for K8sAttachedClusterWait<E> {
}
fn k8s_wait_request(source: &ExistingCluster) -> Result<K8sAttachRequest<'_>, DynError> {
let label_selector = source
.k8s_label_selector()
.ok_or_else(|| DynError::from("k8s cluster wait requires a k8s attach source"))?;
let label_selector = source.k8s_label_selector().ok_or_else(|| {
DynError::from("k8s cluster wait requires a k8s existing-cluster descriptor")
})?;
if label_selector.trim().is_empty() {
return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into());

View File

@ -5,11 +5,12 @@ use kube::Client;
use reqwest::Url;
use testing_framework_core::{
scenario::{
Application, ApplicationExternalProvider, CleanupGuard, ClusterWaitHandle, Deployer,
DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError,
NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl,
RunContext, Runner, Scenario, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
Application, ApplicationExternalProvider, CleanupGuard, ClusterControlProfile, ClusterMode,
ClusterWaitHandle, Deployer, DynError, ExistingCluster, FeedHandle, FeedRuntime,
HttpReadinessRequirement, Metrics, MetricsError, NodeClients,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, Runner,
RuntimeAssembly, Scenario, SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
@ -170,6 +171,9 @@ where
E: K8sDeployEnv,
Caps: ObservabilityCapabilityProvider + Send + Sync,
{
validate_supported_cluster_mode(scenario)
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
// Source planning is currently resolved here before deployer-specific setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
K8sRunnerError::SourceOrchestration {
@ -179,9 +183,10 @@ where
let observability = resolve_observability_inputs(scenario.capabilities())?;
if scenario.uses_existing_cluster() {
let runner = deploy_attached_only::<E, Caps>(scenario, source_plan, observability).await?;
return Ok((runner, attached_metadata(scenario)));
if matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) {
let runner =
deploy_existing_cluster::<E, Caps>(scenario, source_plan, observability).await?;
return Ok((runner, existing_cluster_metadata(scenario)));
}
let deployment = build_k8s_deployment::<E, Caps>(deployer, scenario, &observability).await?;
@ -204,15 +209,15 @@ where
runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?;
ensure_non_empty_node_clients(&runtime.node_clients)?;
let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait);
log_configured_observability(&observability);
maybe_print_endpoints::<E>(&observability, &parts.node_clients);
maybe_print_endpoints::<E>(&observability, &runtime.node_clients);
let parts = build_runner_parts(scenario, deployment.node_count, runtime, cluster_wait);
let runner = finalize_runner::<E>(&mut cluster, parts)?;
Ok((runner, metadata))
}
async fn deploy_attached_only<E, Caps>(
async fn deploy_existing_cluster<E, Caps>(
scenario: &Scenario<E, Caps>,
source_plan: SourceOrchestrationPlan,
observability: ObservabilityInputs,
@ -230,21 +235,21 @@ where
let telemetry = observability.telemetry_handle()?;
let (feed, feed_task) = spawn_block_feed_with::<E>(&node_clients).await?;
let cluster_wait = attached_cluster_wait::<E, Caps>(scenario, client)?;
let context = RunContext::new(
let context = RuntimeAssembly::new(
scenario.deployment().clone(),
node_clients,
scenario.duration(),
scenario.expectation_cooldown(),
scenario.cluster_control_profile(),
telemetry,
feed,
None,
)
.with_cluster_wait(cluster_wait);
Ok(Runner::new(context, Some(Box::new(feed_task))))
Ok(context.build_runner(Some(Box::new(feed_task))))
}
fn attached_metadata<E, Caps>(scenario: &Scenario<E, Caps>) -> K8sDeploymentMetadata
fn existing_cluster_metadata<E, Caps>(scenario: &Scenario<E, Caps>) -> K8sDeploymentMetadata
where
E: K8sDeployEnv,
Caps: Send + Sync,
@ -263,7 +268,7 @@ where
let attach = scenario
.existing_cluster()
.ok_or_else(|| K8sRunnerError::InternalInvariant {
message: "k8s attached cluster wait requested outside attached source mode".to_owned(),
message: "k8s cluster wait requested outside existing-cluster mode".to_owned(),
})?;
let cluster_wait = K8sAttachedClusterWait::<E>::try_new(client, attach)
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?;
@ -271,6 +276,54 @@ where
Ok(Arc::new(cluster_wait))
}
/// Rejects scenarios whose cluster mode this k8s deployer cannot serve.
///
/// Non-existing-cluster modes pass through untouched. Existing-cluster
/// mode is accepted only when the scenario actually carries a descriptor
/// and that descriptor is k8s-compatible (see `ensure_k8s_existing_cluster`).
fn validate_supported_cluster_mode<E: Application, Caps>(
    scenario: &Scenario<E, Caps>,
) -> Result<(), DynError> {
    if matches!(scenario.cluster_mode(), ClusterMode::ExistingCluster) {
        match scenario.existing_cluster() {
            Some(descriptor) => ensure_k8s_existing_cluster(descriptor),
            None => Err(DynError::from(
                "existing-cluster mode requires an existing cluster",
            )),
        }
    } else {
        Ok(())
    }
}
/// Confirms the existing-cluster descriptor targets kubernetes.
///
/// A descriptor counts as k8s-compatible exactly when it exposes a label
/// selector; anything else (e.g. a compose project) is rejected.
fn ensure_k8s_existing_cluster(cluster: &ExistingCluster) -> Result<(), DynError> {
    match cluster.k8s_label_selector() {
        Some(_) => Ok(()),
        None => Err("k8s deployer requires a k8s existing-cluster descriptor".into()),
    }
}
#[cfg(test)]
mod tests {
    use testing_framework_core::scenario::ExistingCluster;

    use super::ensure_k8s_existing_cluster;

    // A descriptor carrying a label selector is k8s-compatible.
    #[test]
    fn k8s_cluster_validator_accepts_k8s_descriptor() {
        let descriptor = ExistingCluster::for_k8s_selector("app=node".to_owned());
        assert!(ensure_k8s_existing_cluster(&descriptor).is_ok());
    }

    // Compose descriptors must be turned away with a descriptive error.
    #[test]
    fn k8s_cluster_validator_rejects_compose_descriptor() {
        let descriptor = ExistingCluster::for_compose_project("project".to_owned());
        let rejection = ensure_k8s_existing_cluster(&descriptor)
            .expect_err("compose descriptor should be rejected");
        assert_eq!(
            rejection.to_string(),
            "k8s deployer requires a k8s existing-cluster descriptor"
        );
    }
}
fn managed_cluster_wait<E: K8sDeployEnv>(
cluster: &Option<ClusterEnvironment>,
metadata: &K8sDeploymentMetadata,
@ -498,15 +551,19 @@ fn build_runner_parts<E: K8sDeployEnv, Caps>(
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> K8sRunnerParts<E> {
K8sRunnerParts {
descriptors: scenario.deployment().clone(),
node_clients: runtime.node_clients,
duration: scenario.duration(),
expectation_cooldown: scenario.expectation_cooldown(),
telemetry: runtime.telemetry,
feed: runtime.feed,
assembly: build_k8s_runtime_assembly(
scenario.deployment().clone(),
runtime.node_clients,
scenario.duration(),
scenario.expectation_cooldown(),
scenario.cluster_control_profile(),
runtime.telemetry,
runtime.feed,
cluster_wait,
),
feed_task: runtime.feed_task,
node_count,
cluster_wait,
duration_secs: scenario.duration().as_secs(),
}
}
@ -594,15 +651,10 @@ fn maybe_print_endpoints<E: K8sDeployEnv>(
}
struct K8sRunnerParts<E: K8sDeployEnv> {
descriptors: E::Deployment,
node_clients: NodeClients<E>,
duration: Duration,
expectation_cooldown: Duration,
telemetry: Metrics,
feed: Feed<E>,
assembly: RuntimeAssembly<E>,
feed_task: FeedHandle,
node_count: usize,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
duration_secs: u64,
}
fn finalize_runner<E: K8sDeployEnv>(
@ -613,36 +665,21 @@ fn finalize_runner<E: K8sDeployEnv>(
let (cleanup, port_forwards) = environment.into_cleanup()?;
let K8sRunnerParts {
descriptors,
node_clients,
duration,
expectation_cooldown,
telemetry,
feed,
assembly,
feed_task,
node_count,
cluster_wait,
duration_secs,
} = parts;
let duration_secs = duration.as_secs();
let cleanup_guard: Box<dyn CleanupGuard> =
Box::new(K8sCleanupGuard::new(cleanup, feed_task, port_forwards));
let context = build_k8s_run_context(
descriptors,
node_clients,
duration,
expectation_cooldown,
telemetry,
feed,
cluster_wait,
);
info!(
nodes = node_count,
duration_secs, "k8s deployment ready; handing control to scenario runner"
);
Ok(Runner::new(context, Some(cleanup_guard)))
Ok(assembly.build_runner(Some(cleanup_guard)))
}
fn take_ready_cluster(
@ -655,23 +692,24 @@ fn take_ready_cluster(
})
}
fn build_k8s_run_context<E: K8sDeployEnv>(
fn build_k8s_runtime_assembly<E: K8sDeployEnv>(
descriptors: E::Deployment,
node_clients: NodeClients<E>,
duration: Duration,
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
telemetry: Metrics,
feed: Feed<E>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RunContext<E> {
RunContext::new(
) -> RuntimeAssembly<E> {
RuntimeAssembly::new(
descriptors,
node_clients,
duration,
expectation_cooldown,
cluster_control_profile,
telemetry,
feed,
None,
)
.with_cluster_wait(cluster_wait)
}

View File

@ -10,10 +10,10 @@ use std::{
use async_trait::async_trait;
use testing_framework_core::{
scenario::{
Application, CleanupGuard, Deployer, DeploymentPolicy, DynError, FeedHandle, FeedRuntime,
HttpReadinessRequirement, Metrics, NodeClients, NodeControlCapability, NodeControlHandle,
RetryPolicy, RunContext, Runner, Scenario, ScenarioError, SourceOrchestrationPlan,
build_source_orchestration_plan, spawn_feed,
Application, CleanupGuard, ClusterControlProfile, ClusterMode, Deployer, DeploymentPolicy,
DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
NodeControlCapability, NodeControlHandle, RetryPolicy, Runner, RuntimeAssembly, Scenario,
ScenarioError, SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed,
},
topology::DeploymentDescriptor,
};
@ -187,6 +187,8 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
&self,
scenario: &Scenario<E, ()>,
) -> Result<Runner<E>, ProcessDeployerError> {
validate_supported_cluster_mode(scenario)?;
// Source planning is currently resolved here before node spawn/runtime setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
ProcessDeployerError::SourceOrchestration {
@ -211,6 +213,7 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
node_clients,
scenario.duration(),
scenario.expectation_cooldown(),
scenario.cluster_control_profile(),
None,
)
.await?;
@ -218,13 +221,15 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
let cleanup_guard: Box<dyn CleanupGuard> =
Box::new(LocalProcessGuard::<E>::new(nodes, runtime.feed_task));
Ok(Runner::new(runtime.context, Some(cleanup_guard)))
Ok(runtime.assembly.build_runner(Some(cleanup_guard)))
}
async fn deploy_with_node_control(
&self,
scenario: &Scenario<E, NodeControlCapability>,
) -> Result<Runner<E>, ProcessDeployerError> {
validate_supported_cluster_mode(scenario)?;
// Source planning is currently resolved here before node spawn/runtime setup.
let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
ProcessDeployerError::SourceOrchestration {
@ -248,14 +253,14 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
node_clients,
scenario.duration(),
scenario.expectation_cooldown(),
scenario.cluster_control_profile(),
Some(node_control),
)
.await?;
Ok(Runner::new(
runtime.context,
Some(Box::new(runtime.feed_task)),
))
Ok(runtime
.assembly
.build_runner(Some(Box::new(runtime.feed_task))))
}
fn node_control_from(
@ -312,6 +317,22 @@ impl<E: LocalDeployerEnv> ProcessDeployer<E> {
}
}
/// Checks up front that the scenario's cluster mode is one the local
/// process deployer can actually run; see `ensure_local_cluster_mode`.
fn validate_supported_cluster_mode<E: Application, Caps>(
    scenario: &Scenario<E, Caps>,
) -> Result<(), ProcessDeployerError> {
    let mode = scenario.cluster_mode();
    ensure_local_cluster_mode(mode)
}
/// The local deployer always spawns its own nodes, so attaching to an
/// already-running cluster is unsupported and surfaces as a source
/// orchestration error; every other mode is accepted.
fn ensure_local_cluster_mode(mode: ClusterMode) -> Result<(), ProcessDeployerError> {
    match mode {
        ClusterMode::ExistingCluster => Err(ProcessDeployerError::SourceOrchestration {
            source: DynError::from("local deployer does not support existing-cluster mode"),
        }),
        _ => Ok(()),
    }
}
fn merge_source_clients_for_local<E: LocalDeployerEnv>(
source_plan: &SourceOrchestrationPlan,
node_clients: NodeClients<E>,
@ -339,6 +360,29 @@ fn build_retry_execution_config(
(retry_policy, execution)
}
#[cfg(test)]
mod tests {
    use testing_framework_core::scenario::ClusterMode;

    use super::ensure_local_cluster_mode;

    // Managed mode is the deployer's native mode and must pass validation.
    #[test]
    fn local_cluster_validator_accepts_managed_mode() {
        assert!(ensure_local_cluster_mode(ClusterMode::Managed).is_ok());
    }

    // Existing-cluster mode is unsupported and must produce the wrapped
    // orchestration error message.
    #[test]
    fn local_cluster_validator_rejects_existing_cluster_mode() {
        let rejection = ensure_local_cluster_mode(ClusterMode::ExistingCluster)
            .expect_err("existing-cluster mode should be rejected");
        assert_eq!(
            rejection.to_string(),
            "source orchestration failed: local deployer does not support existing-cluster mode"
        );
    }
}
async fn run_retry_attempt<E: LocalDeployerEnv>(
descriptors: &E::Deployment,
execution: RetryExecutionConfig,
@ -475,7 +519,7 @@ fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_
}
struct RuntimeContext<E: Application> {
context: RunContext<E>,
assembly: RuntimeAssembly<E>,
feed_task: FeedHandle,
}
@ -484,6 +528,7 @@ async fn run_context_for<E: Application>(
node_clients: NodeClients<E>,
duration: Duration,
expectation_cooldown: Duration,
cluster_control_profile: ClusterControlProfile,
node_control: Option<Arc<dyn NodeControlHandle<E>>>,
) -> Result<RuntimeContext<E>, ProcessDeployerError> {
if node_clients.is_empty() {
@ -491,15 +536,21 @@ async fn run_context_for<E: Application>(
}
let (feed, feed_task) = spawn_feed_with::<E>(&node_clients).await?;
let context = RunContext::new(
let mut assembly = RuntimeAssembly::new(
descriptors,
node_clients,
duration,
expectation_cooldown,
cluster_control_profile,
Metrics::empty(),
feed,
node_control,
);
if let Some(node_control) = node_control {
assembly = assembly.with_node_control(node_control);
}
Ok(RuntimeContext { context, feed_task })
Ok(RuntimeContext {
assembly,
feed_task,
})
}