From 315286ec0d92bcbff10a0a7640cfed9f32b12eb0 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sat, 7 Mar 2026 08:44:47 +0100 Subject: [PATCH] Bring k8s attach onto attached runner contract --- .../examples/tests/k8s_attach_node_control.rs | 5 ++ logos/runtime/ext/src/k8s_env.rs | 2 +- .../k8s/src/deployer/attach_provider.rs | 57 ++++++++++++++++- .../k8s/src/deployer/orchestrator.rs | 61 ++++++++++++++++--- 4 files changed, 112 insertions(+), 13 deletions(-) diff --git a/logos/examples/tests/k8s_attach_node_control.rs b/logos/examples/tests/k8s_attach_node_control.rs index 5281607..7bc1bc4 100644 --- a/logos/examples/tests/k8s_attach_node_control.rs +++ b/logos/examples/tests/k8s_attach_node_control.rs @@ -32,6 +32,11 @@ async fn k8s_attach_mode_queries_node_api_opt_in() -> Result<()> { Err(error) => return Err(Error::new(error)), }; + attached_runner + .wait_network_ready() + .await + .map_err(|err| anyhow!("k8s attached runner readiness failed: {err}"))?; + if attached_runner.context().node_clients().is_empty() { return Err(anyhow!("k8s attach resolved no node clients")); } diff --git a/logos/runtime/ext/src/k8s_env.rs b/logos/runtime/ext/src/k8s_env.rs index 15b2dc9..8c9a1e8 100644 --- a/logos/runtime/ext/src/k8s_env.rs +++ b/logos/runtime/ext/src/k8s_env.rs @@ -31,7 +31,7 @@ use crate::{ const CFGSYNC_K8S_TIMEOUT_SECS: u64 = 300; const K8S_FULLNAME_OVERRIDE: &str = "logos-runner"; -const DEFAULT_K8S_TESTNET_IMAGE: &str = "public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test"; +const DEFAULT_K8S_TESTNET_IMAGE: &str = "logos-blockchain-testing:local"; /// Paths and image metadata required to deploy the Helm chart. pub struct K8sAssets { diff --git a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs index e448b1f..7063992 100644 --- a/testing-framework/deployers/k8s/src/deployer/attach_provider.rs +++ b/testing-framework/deployers/k8s/src/deployer/attach_provider.rs @@ -7,8 +7,10 @@ use kube::{ api::{ListParams, ObjectList}, }; use testing_framework_core::scenario::{ - AttachProvider, AttachProviderError, AttachSource, AttachedNode, DynError, ExternalNodeSource, + AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError, + ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness, }; +use url::Url; use crate::{env::K8sDeployEnv, host::node_host}; @@ -31,6 +33,12 @@ pub(super) struct K8sAttachProvider { _env: PhantomData, } +pub(super) struct K8sAttachedClusterWait { + client: Client, + source: AttachSource, + _env: PhantomData, +} + impl K8sAttachProvider { pub(super) fn new(client: Client) -> Self { Self { @@ -40,6 +48,16 @@ impl K8sAttachProvider { } } +impl K8sAttachedClusterWait { + pub(super) fn new(client: Client, source: AttachSource) -> Self { + Self { + client, + source, + _env: PhantomData, + } + } +} + #[async_trait] impl AttachProvider for K8sAttachProvider { async fn discover( @@ -100,7 +118,7 @@ fn to_discovery_error(source: DynError) -> AttachProviderError { AttachProviderError::Discovery { source } } -async fn discover_services( +pub(super) async fn discover_services( client: &Client, namespace: &str, selector: &str, @@ -151,7 +169,7 @@ fn tcp_node_ports(service: &Service) -> Vec<(String, u16)> { .collect() } -fn extract_api_node_port(service: &Service) -> Result { +pub(super) fn extract_api_node_port(service: &Service) -> Result { let service_name = service .metadata .name @@ -197,6 +215,39 @@ fn api_port_candidates(ports: Vec<(String, u16)>) -> Vec { ports.into_iter().map(|(_, port)| port).collect() } +#[async_trait] +impl ClusterWaitHandle for K8sAttachedClusterWait { + async fn wait_network_ready(&self) -> Result<(), DynError> { + let AttachSource::K8s { + namespace, + label_selector, + } = &self.source + else { + return Err("k8s cluster wait requires a k8s attach source".into()); + }; + + if label_selector.trim().is_empty() { + return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into()); + } + + let namespace = namespace.as_deref().unwrap_or("default"); + let services = discover_services(&self.client, namespace, label_selector).await?; + let host = node_host(); + let mut endpoints = Vec::with_capacity(services.items.len()); + + for service in &services.items { + let api_port = extract_api_node_port(service)?; + let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?; + endpoint.set_path(E::readiness_path()); + endpoints.push(endpoint); + } + + wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?; + + Ok(()) + } +} + #[cfg(test)] mod tests { use k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec}; diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 4089ecf..72539a3 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -5,18 +5,22 @@ use kube::Client; use reqwest::Url; use testing_framework_core::{ scenario::{ - Application, ApplicationExternalProvider, AttachSource, CleanupGuard, Deployer, DynError, - FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, NodeClients, - ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext, - Runner, Scenario, ScenarioSources, SourceOrchestrationPlan, SourceProviders, - StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers, + Application, ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle, + Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, + MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs, + RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources, + SourceOrchestrationPlan, SourceProviders, StaticManagedProvider, + build_source_orchestration_plan, orchestrate_sources_with_providers, }, topology::DeploymentDescriptor, }; use tracing::{error, info}; use crate::{ - deployer::{K8sDeploymentMetadata, attach_provider::K8sAttachProvider}, + deployer::{ + K8sDeploymentMetadata, + attach_provider::{K8sAttachProvider, K8sAttachedClusterWait}, + }, env::K8sDeployEnv, infrastructure::cluster::{ ClusterEnvironment, ClusterEnvironmentError, NodeClientError, PortSpecs, @@ -200,7 +204,17 @@ where runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?; ensure_non_empty_node_clients(&runtime.node_clients)?; - let parts = build_runner_parts(scenario, deployment.node_count, runtime); + let parts = build_runner_parts( + scenario, + deployment.node_count, + runtime, + Arc::new(K8sAttachedClusterWait::::new( + client_from_cluster(&cluster)?, + metadata + .attach_source() + .map_err(|source| K8sRunnerError::SourceOrchestration { source })?, + )), + ); log_configured_observability(&observability); maybe_print_endpoints::(&observability, &parts.node_clients); @@ -218,13 +232,14 @@ where Caps: ObservabilityCapabilityProvider + Send + Sync, { let client = init_kube_client().await?; - let source_providers = source_providers::(client, Vec::new()); + let source_providers = source_providers::(client.clone(), Vec::new()); let node_clients = resolve_node_clients(&source_plan, source_providers).await?; ensure_non_empty_node_clients(&node_clients)?; let telemetry = observability.telemetry_handle()?; let (feed, feed_task) = spawn_block_feed_with::(&node_clients).await?; + let cluster_wait = attached_cluster_wait::(scenario, client)?; let context = RunContext::new( scenario.deployment().clone(), node_clients, @@ -233,7 +248,8 @@ where telemetry, feed, None, - ); + ) + .with_cluster_wait(cluster_wait); Ok(Runner::new(context, Some(Box::new(feed_task)))) } @@ -262,6 +278,26 @@ where } } +fn attached_cluster_wait( + scenario: &Scenario, + client: Client, +) -> Result>, K8sRunnerError> +where + E: K8sDeployEnv, + Caps: Send + Sync, +{ + let ScenarioSources::Attached { attach, .. } = scenario.sources() else { + return Err(K8sRunnerError::InternalInvariant { + message: "k8s attached cluster wait requested outside attached source mode".to_owned(), + }); + }; + + Ok(Arc::new(K8sAttachedClusterWait::::new( + client, + attach.clone(), + ))) +} + fn client_from_cluster(cluster: &Option) -> Result { let client = cluster .as_ref() @@ -472,6 +508,7 @@ fn build_runner_parts( scenario: &Scenario, node_count: usize, runtime: RuntimeArtifacts, + cluster_wait: Arc>, ) -> K8sRunnerParts { K8sRunnerParts { descriptors: scenario.deployment().clone(), @@ -482,6 +519,7 @@ fn build_runner_parts( feed: runtime.feed, feed_task: runtime.feed_task, node_count, + cluster_wait, } } @@ -577,6 +615,7 @@ struct K8sRunnerParts { feed: Feed, feed_task: FeedHandle, node_count: usize, + cluster_wait: Arc>, } fn finalize_runner( @@ -595,6 +634,7 @@ fn finalize_runner( feed, feed_task, node_count, + cluster_wait, } = parts; let duration_secs = duration.as_secs(); @@ -607,6 +647,7 @@ fn finalize_runner( expectation_cooldown, telemetry, feed, + cluster_wait, ); info!( @@ -634,6 +675,7 @@ fn build_k8s_run_context( expectation_cooldown: Duration, telemetry: Metrics, feed: Feed, + cluster_wait: Arc>, ) -> RunContext { RunContext::new( descriptors, @@ -644,6 +686,7 @@ fn build_k8s_run_context( feed, None, ) + .with_cluster_wait(cluster_wait) } fn endpoint_or_disabled(endpoint: Option<&Url>) -> String {