use std::{
    marker::PhantomData,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    time::Duration,
};

use async_trait::async_trait;
use testing_framework_core::{
    scenario::{
        Application, CleanupGuard, ClusterControlProfile, Deployer, DeploymentPolicy, DynError,
        FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
        NodeControlCapability, NodeControlHandle, RetryPolicy, Runner, RuntimeAssembly, Scenario,
        ScenarioError, SourceOrchestrationPlan, build_source_orchestration_plan, spawn_feed,
    },
    topology::DeploymentDescriptor,
};
use thiserror::Error;
use tokio_retry::{
    RetryIf,
    strategy::{ExponentialBackoff, jitter},
};
use tracing::{debug, info, warn};

use crate::{
    env::{LocalDeployerEnv, Node, wait_local_http_readiness},
    external::build_external_client,
    keep_tempdir_from_env,
    manual::ManualCluster,
    node_control::{NodeManager, NodeManagerSeed},
};

// Defaults for the local retry policy; they are the three arguments passed to
// `RetryPolicy::new` in `default_local_retry_policy`, which is used only when
// the deployment policy carries no retry policy of its own.
// NOTE(review): presumably (max attempts, base delay, max delay) in that
// order -- confirm against `RetryPolicy::new`.
const READINESS_ATTEMPTS: usize = 3;
const READINESS_BACKOFF_BASE_MS: u64 = 250;
const READINESS_BACKOFF_MAX_SECS: u64 = 2;

// NOTE(review): the generic parameter lists (`<...>`) appear to have been
// stripped from this copy of the file (e.g. `Vec>`, `Option,`, `Box)`,
// `PhantomData,`). The code tokens below are left exactly as found; restore
// the type parameters from version control before building.

/// Cleanup guard for deployments without node control: holds the spawned local
/// nodes together with the feed task handle so both can be released when the
/// runner calls `cleanup`.
struct LocalProcessGuard {
    // Spawned local nodes; dropped at the end of `cleanup`.
    nodes: Vec>,
    // `Option` so that `cleanup` can move the handle out with `take()`.
    feed_task: Option,
}

impl LocalProcessGuard {
    /// Wraps the spawned nodes and the feed task handle into a guard.
    fn new(nodes: Vec>, feed_task: FeedHandle) -> Self {
        Self {
            nodes,
            feed_task: Some(feed_task),
        }
    }
}

impl CleanupGuard for LocalProcessGuard {
    /// Cleans up the feed task first (via its own `CleanupGuard::cleanup`),
    /// then drops the nodes.
    fn cleanup(mut self: Box) {
        if let Some(feed_task) = self.feed_task.take() {
            CleanupGuard::cleanup(Box::new(feed_task));
        }
        // Nodes own local processes; dropping them stops the processes.
        drop(self.nodes);
    }
}

/// Spawns nodes as local processes.
///
/// `membership_check` is AND-ed with the deployment policy's
/// `readiness_enabled` flag (see `build_retry_execution_config`) to decide
/// whether readiness probing runs after spawn; `Default` sets it to `true`.
#[derive(Clone)]
pub struct ProcessDeployer {
    membership_check: bool,
    // Marker only; no environment value is stored.
    _env: PhantomData,
}

/// Errors returned by the local deployer.
#[derive(Debug, Error)]
pub enum ProcessDeployerError {
    /// Spawning the initial nodes failed (converted from
    /// `RetryAttemptError::Spawn`).
    #[error("failed to spawn local topology: {source}")]
    Spawn {
        #[source]
        source: DynError,
    },
    /// The local HTTP readiness wait failed (converted from
    /// `RetryAttemptError::Readiness`).
    #[error("readiness probe failed: {source}")]
    ReadinessFailed {
        #[source]
        source: DynError,
    },
    /// NOTE(review): not constructed anywhere in the code visible here --
    /// confirm it is still produced elsewhere or is kept for API stability.
    #[error("scenario topology is not supported by the local deployer")]
    UnsupportedTopology,
    /// Converted from `ScenarioError::Workload`; also used when the feed
    /// cannot be started (see `spawn_feed_with` / `workload_error`).
    #[error("workload failed: {source}")]
    WorkloadFailed {
        #[source]
        source: DynError,
    },
    /// Returned by `run_context_for` when the node client set is empty.
    #[error("runtime preflight failed: no node clients available")]
    RuntimePreflight,
    /// Building the source orchestration plan, or merging external source
    /// clients into the node client set, failed.
    #[error("source orchestration failed: {source}")]
    SourceOrchestration {
        #[source]
        source: DynError,
    },
    /// Converted from `ScenarioError::ExpectationCapture`,
    /// `ScenarioError::ExpectationFailedDuringCapture` and
    /// `ScenarioError::Expectations`.
    #[error("expectations failed: {source}")]
    ExpectationsFailed {
        #[source]
        source: DynError,
    },
}

/// Failure of a single spawn-plus-readiness attempt inside the retry loop.
/// Converted into `ProcessDeployerError` once retrying stops.
#[derive(Debug, Error)]
enum RetryAttemptError {
    /// `NodeManager::spawn_initial_nodes` returned an error.
    #[error("failed to spawn local topology: {source}")]
    Spawn {
        #[source]
        source: DynError,
    },
    /// `wait_local_http_readiness` returned an error.
    #[error("readiness probe failed: {source}")]
    Readiness {
        #[source]
        source: DynError,
    },
}

// NOTE(review): the generic parameter lists (`<...>`) appear to have been
// stripped from this copy of the file (e.g. `impl From for ...`, `Vec>`,
// `NodeClients::::new`, `Arc,`). The code tokens below are left exactly as
// found; restore the type parameters from version control before building.

// Maps a per-attempt failure onto the public error type, variant for variant.
impl From for ProcessDeployerError {
    fn from(value: RetryAttemptError) -> Self {
        match value {
            RetryAttemptError::Spawn { source } => Self::Spawn { source },
            RetryAttemptError::Readiness { source } => Self::ReadinessFailed { source },
        }
    }
}

/// Settings shared by every retry attempt, derived once from the deployment
/// policy in `build_retry_execution_config`.
#[derive(Clone, Copy)]
struct RetryExecutionConfig {
    // Upper bound on attempts; clamped to at least 1 when built.
    max_attempts: usize,
    // Forwarded to `NodeManager::spawn_initial_nodes`.
    keep_tempdir: bool,
    // When false, `run_readiness_for_attempt` returns the nodes unprobed.
    readiness_enabled: bool,
    // Forwarded to `wait_local_http_readiness`.
    readiness_requirement: HttpReadinessRequirement,
}

// Maps scenario errors onto deployer errors: workload failures keep their own
// variant, all three expectation-related variants collapse into
// `ExpectationsFailed`.
impl From for ProcessDeployerError {
    fn from(value: ScenarioError) -> Self {
        match value {
            ScenarioError::Workload(source) => Self::WorkloadFailed { source },
            ScenarioError::ExpectationCapture(source)
            | ScenarioError::ExpectationFailedDuringCapture(source)
            | ScenarioError::Expectations(source) => Self::ExpectationsFailed { source },
        }
    }
}

// `Deployer` implementation that delegates to `deploy_without_node_control`.
// NOTE(review): the two `Deployer` impls look identical in this copy; they
// presumably differ in a capability type parameter that was stripped -- confirm.
#[async_trait]
impl Deployer for ProcessDeployer {
    type Error = ProcessDeployerError;

    async fn deploy(&self, scenario: &Scenario) -> Result, Self::Error> {
        self.deploy_without_node_control(scenario).await
    }
}

// `Deployer` implementation that delegates to `deploy_with_node_control`.
#[async_trait]
impl Deployer for ProcessDeployer {
    type Error = ProcessDeployerError;

    async fn deploy(
        &self,
        scenario: &Scenario,
    ) -> Result, Self::Error> {
        self.deploy_with_node_control(scenario).await
    }
}

impl ProcessDeployer {
    /// Construct a local deployer.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable or disable membership readiness checks.
    #[must_use]
    pub fn with_membership_check(mut self, enabled: bool) -> Self {
        self.membership_check = enabled;
        self
    }

    /// Build a manual cluster from a prepared topology descriptor.
    #[must_use]
    pub fn manual_cluster_from_descriptors(&self, descriptors: E::Deployment) -> ManualCluster {
        ManualCluster::from_topology(descriptors)
    }

    /// Deploys the scenario without a node-control handle.
    ///
    /// Builds the source orchestration plan, spawns the nodes (with readiness
    /// retry), merges external source clients into the node client set, starts
    /// the runtime/feed, and returns a runner whose cleanup guard owns both the
    /// nodes and the feed task.
    ///
    /// # Errors
    /// `SourceOrchestration` if planning or client merging fails; otherwise
    /// whatever `spawn_nodes_for_scenario` or `run_context_for` return.
    async fn deploy_without_node_control(
        &self,
        scenario: &Scenario,
    ) -> Result, ProcessDeployerError> {
        // Source planning is currently resolved here before node spawn/runtime setup.
        let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
            ProcessDeployerError::SourceOrchestration {
                source: source.into(),
            }
        })?;

        log_local_deploy_start(
            scenario.deployment().node_count(),
            scenario.deployment_policy(),
            false,
        );

        let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?;
        // One client per spawned node, then extended with external sources.
        let node_clients = NodeClients::::new(nodes.iter().map(|node| node.client()).collect());

        let node_clients = merge_source_clients_for_local::(&source_plan, node_clients)
            .map_err(|source| ProcessDeployerError::SourceOrchestration { source })?;

        let runtime = run_context_for(
            scenario.deployment().clone(),
            node_clients,
            scenario.duration(),
            scenario.expectation_cooldown(),
            scenario.cluster_control_profile(),
            None,
        )
        .await?;

        // The guard takes ownership of the nodes so they live until the runner
        // invokes cleanup.
        let cleanup_guard: Box = Box::new(LocalProcessGuard::::new(nodes, runtime.feed_task));

        Ok(runtime.assembly.build_runner(Some(cleanup_guard)))
    }

    /// Deploys the scenario with a node-control handle attached to the runtime.
    ///
    /// Same flow as `deploy_without_node_control`, except the spawned nodes are
    /// handed to a `NodeManager` (see `node_control_from`) and the client set is
    /// taken from that manager.
    ///
    /// # Errors
    /// `SourceOrchestration` if planning or client merging fails; otherwise
    /// whatever `spawn_nodes_for_scenario` or `run_context_for` return.
    async fn deploy_with_node_control(
        &self,
        scenario: &Scenario,
    ) -> Result, ProcessDeployerError> {
        // Source planning is currently resolved here before node spawn/runtime setup.
        let source_plan = build_source_orchestration_plan(scenario).map_err(|source| {
            ProcessDeployerError::SourceOrchestration {
                source: source.into(),
            }
        })?;

        log_local_deploy_start(
            scenario.deployment().node_count(),
            scenario.deployment_policy(),
            true,
        );

        let nodes = Self::spawn_nodes_for_scenario(scenario, self.membership_check).await?;
        // `nodes` is moved into the manager here.
        let node_control = self.node_control_from(scenario, nodes);
        let node_clients =
            merge_source_clients_for_local::(&source_plan, node_control.node_clients())
                .map_err(|source| ProcessDeployerError::SourceOrchestration { source })?;
        let runtime = run_context_for(
            scenario.deployment().clone(),
            node_clients,
            scenario.duration(),
            scenario.expectation_cooldown(),
            scenario.cluster_control_profile(),
            Some(node_control),
        )
        .await?;

        // Only the feed task is registered as the cleanup guard on this path;
        // the nodes are held by the node manager attached to the assembly.
        // NOTE(review): presumably the manager stops the processes when it is
        // dropped -- confirm in `NodeManager`.
        Ok(runtime
            .assembly
            .build_runner(Some(Box::new(runtime.feed_task))))
    }

    /// Creates a `NodeManager` for the scenario's deployment and seeds it with
    /// the already-spawned `nodes`.
    ///
    /// The manager is constructed with a default (empty) `NodeClients` and a
    /// default seed, then populated through `initialize_with_nodes`.
    fn node_control_from(
        &self,
        scenario: &Scenario,
        nodes: Vec>,
    ) -> Arc> {
        let node_control = Arc::new(NodeManager::new_with_seed(
            scenario.deployment().clone(),
            NodeClients::default(),
            keep_tempdir(scenario.deployment_policy()),
            NodeManagerSeed::default(),
        ));
        node_control.initialize_with_nodes(nodes);
        node_control
    }

    /// Logs the node count and spawns the scenario's deployment through
    /// `spawn_with_readiness_retry`, using the scenario's deployment policy.
    async fn spawn_nodes_for_scenario(
        scenario: &Scenario,
        membership_check: bool,
    ) -> Result>, ProcessDeployerError> {
        info!(
            nodes = scenario.deployment().node_count(),
            "spawning local nodes"
        );

        Self::spawn_with_readiness_retry(
            scenario.deployment(),
            membership_check,
            scenario.deployment_policy(),
        )
        .await
    }

    /// Spawns the nodes and waits for readiness, retrying the whole
    /// spawn-plus-readiness attempt with backoff.
    ///
    /// A shared atomic counter records how many attempts have started; it is
    /// incremented inside `run_retry_attempt` and read by the `retry_decision`
    /// predicate. The backoff iterator is limited to `max_attempts - 1` delays
    /// and the predicate stops retrying once the counter reaches
    /// `max_attempts`.
    ///
    /// # Errors
    /// The last attempt's error, converted into `ProcessDeployerError` by `?`.
    async fn spawn_with_readiness_retry(
        descriptors: &E::Deployment,
        membership_check: bool,
        deployment_policy: DeploymentPolicy,
    ) -> Result>, ProcessDeployerError> {
        let (retry_policy, execution) =
            build_retry_execution_config(deployment_policy, membership_check);
        let attempts = Arc::new(AtomicUsize::new(0));
        let strategy = retry_backoff_strategy(retry_policy, execution.max_attempts);
        // The closure is invoked once per attempt, so it hands each produced
        // future its own clone of the counter.
        let operation = {
            let attempts = Arc::clone(&attempts);
            move || {
                let attempts = Arc::clone(&attempts);
                async move { run_retry_attempt::(descriptors, execution, attempts).await }
            }
        };
        let should_retry = retry_decision(Arc::clone(&attempts), execution.max_attempts);

        let nodes = RetryIf::spawn(strategy, operation, should_retry).await?;

        Ok(nodes)
    }
}

/// Adds one client per external source in `source_plan` to `node_clients` and
/// returns the extended set.
///
/// For each source, `E::external_node_client` is tried first; if it fails, its
/// error is discarded and `build_external_client` is used as the fallback.
///
/// # Errors
/// The fallback's error if both constructors fail for a source.
fn merge_source_clients_for_local(
    source_plan: &SourceOrchestrationPlan,
    node_clients: NodeClients,
) -> Result, DynError> {
    for source in source_plan.external_sources() {
        let client =
            E::external_node_client(source).or_else(|_| build_external_client::(source))?;
        // NOTE(review): `add_node` is called through a non-`mut` binding, so it
        // presumably takes `&self` -- confirm in `NodeClients`.
        node_clients.add_node(client);
    }

    Ok(node_clients)
}

/// Derives the retry policy and the per-attempt execution settings from the
/// deployment policy.
///
/// `max_attempts` is clamped to at least 1, and readiness probing is enabled
/// only when both the policy's `readiness_enabled` and `membership_check` are
/// true.
fn build_retry_execution_config(
    deployment_policy: DeploymentPolicy,
    membership_check: bool,
) -> (RetryPolicy, RetryExecutionConfig) {
    let retry_policy = retry_policy_from(deployment_policy);
    let execution = RetryExecutionConfig {
        max_attempts: retry_policy.max_attempts.max(1),
        keep_tempdir: keep_tempdir(deployment_policy),
        readiness_enabled: deployment_policy.readiness_enabled && membership_check,
        readiness_requirement: deployment_policy.readiness_requirement,
    };

    (retry_policy, execution)
}

/// Runs one attempt: bumps the shared attempt counter, spawns the nodes, then
/// runs the readiness check on them.
async fn run_retry_attempt(
    descriptors: &E::Deployment,
    execution: RetryExecutionConfig,
    attempts: Arc,
) -> Result>, RetryAttemptError> {
    // `fetch_add` returns the previous value, so `+ 1` gives the 1-based
    // number of the attempt that is starting now.
    let attempt = attempts.fetch_add(1, Ordering::Relaxed) + 1;
    let nodes = spawn_nodes_for_attempt::(descriptors, execution.keep_tempdir).await?;

    run_readiness_for_attempt::(attempt, nodes, execution).await
}

/// Returns the policy's own retry policy, or the local default when the policy
/// has none.
fn retry_policy_from(deployment_policy: DeploymentPolicy) -> RetryPolicy {
    deployment_policy
        .retry_policy
        .unwrap_or_else(default_local_retry_policy)
}

/// Builds the sequence of delays between attempts: an exponential backoff
/// capped at `retry_policy.max_delay`, jittered, and truncated to
/// `max_attempts - 1` entries (no delay is needed before the first attempt).
fn retry_backoff_strategy(
    retry_policy: RetryPolicy,
    max_attempts: usize,
) -> impl Iterator {
    // `as_millis()` yields a `u128`; the `as u64` cast truncates silently for
    // delays that do not fit in 64 bits.
    // NOTE(review): tokio-retry documents the `from_millis` argument as the
    // base that is raised to the n-th power, not a multiplier that doubles --
    // confirm the resulting delay sequence (before the `max_delay` cap) is the
    // one intended.
    ExponentialBackoff::from_millis(retry_policy.base_delay.as_millis() as u64)
        .max_delay(retry_policy.max_delay)
        .map(jitter)
        .take(max_attempts.saturating_sub(1))
}

/// Spawns the initial nodes for one attempt via
/// `NodeManager::spawn_initial_nodes`, mapping any failure to
/// `RetryAttemptError::Spawn`.
async fn spawn_nodes_for_attempt(
    descriptors: &E::Deployment,
    keep_tempdir: bool,
) -> Result>, RetryAttemptError> {
    NodeManager::::spawn_initial_nodes(descriptors, keep_tempdir)
        .await
        .map_err(|source| RetryAttemptError::Spawn {
            source: source.into(),
        })
}

/// Waits for the freshly spawned `nodes` to satisfy the readiness requirement.
///
/// Returns the nodes unchanged when readiness is disabled or the wait
/// succeeds. On failure the nodes are dropped before the error is returned, so
/// a retry starts from a fresh spawn.
///
/// # Errors
/// `RetryAttemptError::Readiness` wrapping the readiness failure.
async fn run_readiness_for_attempt(
    attempt: usize,
    nodes: Vec>,
    execution: RetryExecutionConfig,
) -> Result>, RetryAttemptError> {
    if !execution.readiness_enabled {
        info!("skipping local readiness checks");
        return Ok(nodes);
    }

    match wait_local_http_readiness::(&nodes, execution.readiness_requirement).await {
        Ok(()) => {
            info!(attempt, "local nodes are ready");
            Ok(nodes)
        }
        Err(source) => {
            let error: DynError = source.into();
            debug!(attempt, error = ?error, "local readiness failed");
            // Release this attempt's nodes explicitly before reporting failure.
            drop(nodes);
            Err(RetryAttemptError::Readiness { source: error })
        }
    }
}

/// Builds the retry predicate handed to `RetryIf::spawn`.
///
/// The predicate reads the shared counter (already incremented by the attempt
/// that just failed) and allows another try only while fewer than
/// `max_attempts` attempts have started, logging a warning each time it does.
fn retry_decision(
    attempts: Arc,
    max_attempts: usize,
) -> impl FnMut(&RetryAttemptError) -> bool {
    move |error: &RetryAttemptError| {
        let attempt = attempts.load(Ordering::Relaxed);
        if attempt < max_attempts {
            warn!(
                attempt,
                max_attempts,
                error = %error,
                "local spawn/readiness failed; retrying with backoff"
            );
            true
        } else {
            false
        }
    }
}

// Default deployer: membership readiness checks enabled.
impl Default for ProcessDeployer {
    fn default() -> Self {
        Self {
            membership_check: true,
            _env: PhantomData,
        }
    }
}

/// Retry policy used when the deployment policy does not provide one, built
/// from the `READINESS_*` constants.
const fn default_local_retry_policy() -> RetryPolicy {
    RetryPolicy::new(
        READINESS_ATTEMPTS,
        Duration::from_millis(READINESS_BACKOFF_BASE_MS),
        Duration::from_secs(READINESS_BACKOFF_MAX_SECS),
    )
}

/// Returns true when the policy asks to preserve artifacts or when
/// `keep_tempdir_from_env()` returns true.
fn keep_tempdir(policy: DeploymentPolicy) -> bool {
    policy.cleanup_policy.preserve_artifacts || keep_tempdir_from_env()
}

/// Starts the feed for the given node clients and returns the feed together
/// with its task handle.
///
/// # Errors
/// `WorkloadFailed` when `node_clients` is empty or when `spawn_feed` fails.
async fn spawn_feed_with(
    node_clients: &NodeClients,
) -> Result<(::Feed, FeedHandle), ProcessDeployerError> {
    let node_count = node_clients.len();
    debug!(nodes = node_count, "starting local feed");

    // `run_context_for`, the only caller visible in this file, already rejects
    // an empty client set with `RuntimePreflight` before calling this function.
    if node_count == 0 {
        return Err(ProcessDeployerError::WorkloadFailed {
            source: "feed requires at least one node".into(),
        });
    }

    info!("starting feed");

    spawn_feed::(node_clients.clone())
        .await
        .map_err(workload_error)
}

/// Wraps a dynamic error as `ProcessDeployerError::WorkloadFailed`.
fn workload_error(source: DynError) -> ProcessDeployerError {
    ProcessDeployerError::WorkloadFailed { source }
}

/// Emits the "starting local deployment" info log with the node count, whether
/// node control is attached, and the policy's readiness settings.
fn log_local_deploy_start(node_count: usize, policy: DeploymentPolicy, has_node_control: bool) {
    info!(
        nodes = node_count,
        node_control = has_node_control,
        readiness_enabled = policy.readiness_enabled,
        readiness_requirement = ?policy.readiness_requirement,
        "starting local deployment"
    );
}

/// Result of `run_context_for`: the runtime assembly plus the handle of the
/// feed task that was started for it.
struct RuntimeContext {
    assembly: RuntimeAssembly,
    feed_task: FeedHandle,
}

/// Starts the feed and assembles the runtime for a deployment.
///
/// The assembly is created with `Metrics::empty()`; a node-control handle is
/// attached only when `node_control` is `Some`.
///
/// # Errors
/// `RuntimePreflight` when `node_clients` is empty; otherwise whatever
/// `spawn_feed_with` returns.
async fn run_context_for(
    descriptors: E::Deployment,
    node_clients: NodeClients,
    duration: Duration,
    expectation_cooldown: Duration,
    cluster_control_profile: ClusterControlProfile,
    node_control: Option>>,
) -> Result, ProcessDeployerError> {
    if node_clients.is_empty() {
        return Err(ProcessDeployerError::RuntimePreflight);
    }

    let (feed, feed_task) = spawn_feed_with::(&node_clients).await?;
    let mut assembly = RuntimeAssembly::new(
        descriptors,
        node_clients,
        duration,
        expectation_cooldown,
        cluster_control_profile,
        Metrics::empty(),
        feed,
    );
    if let Some(node_control) = node_control {
        assembly = assembly.with_node_control(node_control);
    }

    Ok(RuntimeContext {
        assembly,
        feed_task,
    })
}