Bring k8s attach onto attached runner contract

This commit is contained in:
andrussal 2026-03-07 08:44:47 +01:00
parent 739c7a2e86
commit 315286ec0d
4 changed files with 112 additions and 13 deletions

View File

@ -32,6 +32,11 @@ async fn k8s_attach_mode_queries_node_api_opt_in() -> Result<()> {
Err(error) => return Err(Error::new(error)),
};
attached_runner
.wait_network_ready()
.await
.map_err(|err| anyhow!("k8s attached runner readiness failed: {err}"))?;
if attached_runner.context().node_clients().is_empty() {
return Err(anyhow!("k8s attach resolved no node clients"));
}

View File

@ -31,7 +31,7 @@ use crate::{
const CFGSYNC_K8S_TIMEOUT_SECS: u64 = 300;
const K8S_FULLNAME_OVERRIDE: &str = "logos-runner";
const DEFAULT_K8S_TESTNET_IMAGE: &str = "public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test";
const DEFAULT_K8S_TESTNET_IMAGE: &str = "logos-blockchain-testing:local";
/// Paths and image metadata required to deploy the Helm chart.
pub struct K8sAssets {

View File

@ -7,8 +7,10 @@ use kube::{
api::{ListParams, ObjectList},
};
use testing_framework_core::scenario::{
AttachProvider, AttachProviderError, AttachSource, AttachedNode, DynError, ExternalNodeSource,
AttachProvider, AttachProviderError, AttachSource, AttachedNode, ClusterWaitHandle, DynError,
ExternalNodeSource, HttpReadinessRequirement, wait_http_readiness,
};
use url::Url;
use crate::{env::K8sDeployEnv, host::node_host};
@ -31,6 +33,12 @@ pub(super) struct K8sAttachProvider<E: K8sDeployEnv> {
_env: PhantomData<E>,
}
/// Cluster-wait handle for a Kubernetes cluster the runner attached to.
///
/// Holds what `wait_network_ready` needs: a Kubernetes client and the attach
/// source describing which services to look up.
pub(super) struct K8sAttachedClusterWait<E: K8sDeployEnv> {
/// Kubernetes API client passed to service discovery when waiting.
client: Client,
/// Attach source given at construction; waiting only succeeds for the
/// `AttachSource::K8s` variant.
source: AttachSource,
/// Zero-sized marker binding this handle to the deploy environment `E`.
_env: PhantomData<E>,
}
impl<E: K8sDeployEnv> K8sAttachProvider<E> {
pub(super) fn new(client: Client) -> Self {
Self {
@ -40,6 +48,16 @@ impl<E: K8sDeployEnv> K8sAttachProvider<E> {
}
}
impl<E: K8sDeployEnv> K8sAttachedClusterWait<E> {
    /// Builds a wait handle from a Kubernetes client and an attach source.
    ///
    /// Both arguments are stored as given; no validation is performed here.
    pub(super) fn new(client: Client, source: AttachSource) -> Self {
        // Marker only: carries the environment type without storing a value.
        let _env = PhantomData;
        Self {
            client,
            source,
            _env,
        }
    }
}
#[async_trait]
impl<E: K8sDeployEnv> AttachProvider<E> for K8sAttachProvider<E> {
async fn discover(
@ -100,7 +118,7 @@ fn to_discovery_error(source: DynError) -> AttachProviderError {
AttachProviderError::Discovery { source }
}
async fn discover_services(
pub(super) async fn discover_services(
client: &Client,
namespace: &str,
selector: &str,
@ -151,7 +169,7 @@ fn tcp_node_ports(service: &Service) -> Vec<(String, u16)> {
.collect()
}
fn extract_api_node_port(service: &Service) -> Result<u16, DynError> {
pub(super) fn extract_api_node_port(service: &Service) -> Result<u16, DynError> {
let service_name = service
.metadata
.name
@ -197,6 +215,39 @@ fn api_port_candidates(ports: Vec<(String, u16)>) -> Vec<u16> {
ports.into_iter().map(|(_, port)| port).collect()
}
#[async_trait]
impl<E: K8sDeployEnv> ClusterWaitHandle<E> for K8sAttachedClusterWait<E> {
async fn wait_network_ready(&self) -> Result<(), DynError> {
let AttachSource::K8s {
namespace,
label_selector,
} = &self.source
else {
return Err("k8s cluster wait requires a k8s attach source".into());
};
if label_selector.trim().is_empty() {
return Err(K8sAttachDiscoveryError::EmptyLabelSelector.into());
}
let namespace = namespace.as_deref().unwrap_or("default");
let services = discover_services(&self.client, namespace, label_selector).await?;
let host = node_host();
let mut endpoints = Vec::with_capacity(services.items.len());
for service in &services.items {
let api_port = extract_api_node_port(service)?;
let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?;
endpoint.set_path(E::readiness_path());
endpoints.push(endpoint);
}
wait_http_readiness(&endpoints, HttpReadinessRequirement::AllNodesReady).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use k8s_openapi::api::core::v1::{Service, ServicePort, ServiceSpec};

View File

@ -5,18 +5,22 @@ use kube::Client;
use reqwest::Url;
use testing_framework_core::{
scenario::{
Application, ApplicationExternalProvider, AttachSource, CleanupGuard, Deployer, DynError,
FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, MetricsError, NodeClients,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
Runner, Scenario, ScenarioSources, SourceOrchestrationPlan, SourceProviders,
StaticManagedProvider, build_source_orchestration_plan, orchestrate_sources_with_providers,
Application, ApplicationExternalProvider, AttachSource, CleanupGuard, ClusterWaitHandle,
Deployer, DynError, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics,
MetricsError, NodeClients, ObservabilityCapabilityProvider, ObservabilityInputs,
RequiresNodeControl, RunContext, Runner, Scenario, ScenarioSources,
SourceOrchestrationPlan, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
use tracing::{error, info};
use crate::{
deployer::{K8sDeploymentMetadata, attach_provider::K8sAttachProvider},
deployer::{
K8sDeploymentMetadata,
attach_provider::{K8sAttachProvider, K8sAttachedClusterWait},
},
env::K8sDeployEnv,
infrastructure::cluster::{
ClusterEnvironment, ClusterEnvironmentError, NodeClientError, PortSpecs,
@ -200,7 +204,17 @@ where
runtime.node_clients = resolve_node_clients(&source_plan, source_providers).await?;
ensure_non_empty_node_clients(&runtime.node_clients)?;
let parts = build_runner_parts(scenario, deployment.node_count, runtime);
let parts = build_runner_parts(
scenario,
deployment.node_count,
runtime,
Arc::new(K8sAttachedClusterWait::<E>::new(
client_from_cluster(&cluster)?,
metadata
.attach_source()
.map_err(|source| K8sRunnerError::SourceOrchestration { source })?,
)),
);
log_configured_observability(&observability);
maybe_print_endpoints::<E>(&observability, &parts.node_clients);
@ -218,13 +232,14 @@ where
Caps: ObservabilityCapabilityProvider + Send + Sync,
{
let client = init_kube_client().await?;
let source_providers = source_providers::<E>(client, Vec::new());
let source_providers = source_providers::<E>(client.clone(), Vec::new());
let node_clients = resolve_node_clients(&source_plan, source_providers).await?;
ensure_non_empty_node_clients(&node_clients)?;
let telemetry = observability.telemetry_handle()?;
let (feed, feed_task) = spawn_block_feed_with::<E>(&node_clients).await?;
let cluster_wait = attached_cluster_wait::<E, Caps>(scenario, client)?;
let context = RunContext::new(
scenario.deployment().clone(),
node_clients,
@ -233,7 +248,8 @@ where
telemetry,
feed,
None,
);
)
.with_cluster_wait(cluster_wait);
Ok(Runner::new(context, Some(Box::new(feed_task))))
}
@ -262,6 +278,26 @@ where
}
}
/// Creates the cluster-wait handle for a scenario running in attached mode.
///
/// Clones the scenario's attach source into a new `K8sAttachedClusterWait`
/// and returns it as a shared `ClusterWaitHandle` trait object.
///
/// # Errors
///
/// Returns `K8sRunnerError::InternalInvariant` when the scenario's sources
/// are not `ScenarioSources::Attached`.
fn attached_cluster_wait<E, Caps>(
    scenario: &Scenario<E, Caps>,
    client: Client,
) -> Result<Arc<dyn ClusterWaitHandle<E>>, K8sRunnerError>
where
    E: K8sDeployEnv,
    Caps: Send + Sync,
{
    match scenario.sources() {
        ScenarioSources::Attached { attach, .. } => {
            let handle: Arc<dyn ClusterWaitHandle<E>> =
                Arc::new(K8sAttachedClusterWait::<E>::new(client, attach.clone()));
            Ok(handle)
        }
        _ => Err(K8sRunnerError::InternalInvariant {
            message: "k8s attached cluster wait requested outside attached source mode".to_owned(),
        }),
    }
}
fn client_from_cluster(cluster: &Option<ClusterEnvironment>) -> Result<Client, K8sRunnerError> {
let client = cluster
.as_ref()
@ -472,6 +508,7 @@ fn build_runner_parts<E: K8sDeployEnv, Caps>(
scenario: &Scenario<E, Caps>,
node_count: usize,
runtime: RuntimeArtifacts<E>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> K8sRunnerParts<E> {
K8sRunnerParts {
descriptors: scenario.deployment().clone(),
@ -482,6 +519,7 @@ fn build_runner_parts<E: K8sDeployEnv, Caps>(
feed: runtime.feed,
feed_task: runtime.feed_task,
node_count,
cluster_wait,
}
}
@ -577,6 +615,7 @@ struct K8sRunnerParts<E: K8sDeployEnv> {
feed: Feed<E>,
feed_task: FeedHandle,
node_count: usize,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
}
fn finalize_runner<E: K8sDeployEnv>(
@ -595,6 +634,7 @@ fn finalize_runner<E: K8sDeployEnv>(
feed,
feed_task,
node_count,
cluster_wait,
} = parts;
let duration_secs = duration.as_secs();
@ -607,6 +647,7 @@ fn finalize_runner<E: K8sDeployEnv>(
expectation_cooldown,
telemetry,
feed,
cluster_wait,
);
info!(
@ -634,6 +675,7 @@ fn build_k8s_run_context<E: K8sDeployEnv>(
expectation_cooldown: Duration,
telemetry: Metrics,
feed: Feed<E>,
cluster_wait: Arc<dyn ClusterWaitHandle<E>>,
) -> RunContext<E> {
RunContext::new(
descriptors,
@ -644,6 +686,7 @@ fn build_k8s_run_context<E: K8sDeployEnv>(
feed,
None,
)
.with_cluster_wait(cluster_wait)
}
fn endpoint_or_disabled(endpoint: Option<&Url>) -> String {