diff --git a/Cargo.lock b/Cargo.lock index 03e40e3..a8c8bcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2268,6 +2268,8 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "cfgsync-artifacts", + "cfgsync-core", "k8s-openapi", "kube", "reqwest", diff --git a/cfgsync/adapter/src/artifacts.rs b/cfgsync/adapter/src/artifacts.rs index 2830b10..98f2fa2 100644 --- a/cfgsync/adapter/src/artifacts.rs +++ b/cfgsync/adapter/src/artifacts.rs @@ -39,6 +39,11 @@ impl MaterializedArtifacts { self.nodes.get(identifier) } + /// Inserts or replaces the node-local artifact set for one identifier. + pub fn set_node(&mut self, identifier: impl Into, artifacts: ArtifactSet) { + self.nodes.insert(identifier.into(), artifacts); + } + /// Returns the shared artifact set. #[must_use] pub fn shared(&self) -> &ArtifactSet { diff --git a/cfgsync/adapter/src/sources.rs b/cfgsync/adapter/src/sources.rs index 8e61acf..4a73afa 100644 --- a/cfgsync/adapter/src/sources.rs +++ b/cfgsync/adapter/src/sources.rs @@ -1,8 +1,9 @@ use std::{collections::HashMap, sync::Mutex}; +use cfgsync_artifacts::ArtifactSet; use cfgsync_core::{ CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactsPayload, NodeConfigSource, - NodeRegistration, RegisterNodeResponse, + NodeRegistration, RegisterNodeResponse, ReplaceNodeArtifactsRequest, }; use crate::{ @@ -23,6 +24,7 @@ impl RegistrationSnapshotMaterializer for MaterializedArtifacts { pub struct RegistrationConfigSource { materializer: M, registrations: Mutex>, + node_overrides: Mutex>, } impl RegistrationConfigSource { @@ -31,6 +33,7 @@ impl RegistrationConfigSource { Self { materializer, registrations: Mutex::new(HashMap::new()), + node_overrides: Mutex::new(HashMap::new()), } } @@ -51,6 +54,14 @@ impl RegistrationConfigSource { RegistrationSnapshot::new(registrations.values().cloned().collect()) } + + fn override_for(&self, identifier: &str) -> Option { + let overrides = self + .node_overrides + .lock() + .expect("cfgsync override store should not be poisoned"); + overrides.get(identifier).cloned() + } } impl NodeConfigSource for RegistrationConfigSource @@ -92,6 +103,12 @@ where } }; + if let Some(override_files) = self.override_for(®istration.identifier) { + let mut files = override_files.files; + files.extend(materialized.shared().files.iter().cloned()); + return ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(files)); + } + match materialized.resolve(®istration.identifier) { Some(config) => { ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(config.files)) @@ -101,6 +118,18 @@ where )), } } + + fn replace_node_artifacts( + &self, + request: ReplaceNodeArtifactsRequest, + ) -> Result<(), CfgsyncErrorResponse> { + let mut overrides = self + .node_overrides + .lock() + .expect("cfgsync override store should not be poisoned"); + overrides.insert(request.identifier, ArtifactSet::new(request.files)); + Ok(()) + } } #[cfg(test)] @@ -110,6 +139,7 @@ mod tests { use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; use cfgsync_core::{ CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration, + ReplaceNodeArtifactsRequest, }; use super::RegistrationConfigSource; @@ -257,4 +287,43 @@ mod tests { let _ = source.resolve(®istration); let _ = source.resolve(®istration); } + + #[test] + fn registration_source_replaces_node_local_files_while_preserving_shared() { + let source = RegistrationConfigSource::new( + MaterializedArtifacts::from_nodes([( + "node-1".to_owned(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + "old: 1".to_string(), + )]), + )]) + .with_shared(ArtifactSet::new(vec![ArtifactFile::new( + "/shared.yaml".to_string(), + "shared: true".to_string(), + )])), + ); + + let registration = + NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().expect("parse ip")); + let _ = source.register(registration.clone()); + source + .replace_node_artifacts(ReplaceNodeArtifactsRequest { + identifier: "node-1".to_owned(), + files: vec![ArtifactFile::new( + "/config.yaml".to_string(), + "new: 2".to_string(), + )], + }) + .expect("replace node artifacts"); + + match source.resolve(®istration) { + ConfigResolveResponse::Config(payload) => { + assert_eq!(payload.files.len(), 2); + assert_eq!(payload.files[0].content, "new: 2"); + assert_eq!(payload.files[1].path, "/shared.yaml"); + } + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } } diff --git a/cfgsync/core/src/client.rs b/cfgsync/core/src/client.rs index 52dd22b..14ab70f 100644 --- a/cfgsync/core/src/client.rs +++ b/cfgsync/core/src/client.rs @@ -1,7 +1,10 @@ use serde::Serialize; use thiserror::Error; -use crate::{CfgsyncErrorCode, CfgsyncErrorResponse, NodeArtifactsPayload, NodeRegistration}; +use crate::{ + CfgsyncErrorCode, CfgsyncErrorResponse, NodeArtifactFile, NodeArtifactsPayload, + NodeRegistration, ReplaceNodeArtifactsRequest, +}; /// cfgsync client-side request/response failures. #[derive(Debug, Error)] @@ -94,6 +97,22 @@ impl Client { } } + /// Replaces the served files for one node identifier. + pub async fn replace_node_artifacts( + &self, + identifier: impl Into, + files: Vec, + ) -> Result<(), ClientError> { + self.post_status_only( + "/node/replace", + &ReplaceNodeArtifactsRequest { + identifier: identifier.into(), + files, + }, + ) + .await + } + /// Posts JSON payload to a cfgsync endpoint and decodes cfgsync payload. pub async fn post_json( &self, diff --git a/cfgsync/core/src/lib.rs b/cfgsync/core/src/lib.rs index e855acb..1da09a4 100644 --- a/cfgsync/core/src/lib.rs +++ b/cfgsync/core/src/lib.rs @@ -12,7 +12,7 @@ pub use client::{Client, ClientError, ConfigFetchStatus}; pub use protocol::{ CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactFile, NodeArtifactsPayload, NodeRegistration, RegisterNodeResponse, - RegistrationPayload, + RegistrationPayload, ReplaceNodeArtifactsRequest, }; pub use render::{ CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides, diff --git a/cfgsync/core/src/protocol.rs b/cfgsync/core/src/protocol.rs index 2b81ca4..f3bd128 100644 --- a/cfgsync/core/src/protocol.rs +++ b/cfgsync/core/src/protocol.rs @@ -21,6 +21,16 @@ pub struct NodeArtifactsPayload { pub files: Vec, } +/// Administrative request to replace the served files for one node. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReplaceNodeArtifactsRequest { + /// Stable node identifier whose files should be replaced. + pub identifier: String, + /// Files that should be served for the node on subsequent resolves. + #[serde(default)] + pub files: Vec, +} + /// Adapter-owned registration payload stored alongside a generic node identity. #[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct RegistrationPayload { diff --git a/cfgsync/core/src/server.rs b/cfgsync/core/src/server.rs index 1c56f54..2b4b973 100644 --- a/cfgsync/core/src/server.rs +++ b/cfgsync/core/src/server.rs @@ -5,7 +5,7 @@ use thiserror::Error; use crate::{ CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration, - RegisterNodeResponse, + RegisterNodeResponse, ReplaceNodeArtifactsRequest, }; /// Runtime state shared across cfgsync HTTP handlers. @@ -69,6 +69,19 @@ async fn register_node( } } +async fn replace_node_artifacts( + State(state): State>, + Json(payload): Json, +) -> impl IntoResponse { + match state.repo.replace_node_artifacts(payload) { + Ok(()) => StatusCode::ACCEPTED.into_response(), + Err(error) => { + let status = error_status(&error.code); + (status, Json(error)).into_response() + } + } +} + fn resolve_node_config_response( state: &CfgsyncServerState, registration: &NodeRegistration, @@ -90,6 +103,7 @@ pub fn build_cfgsync_router(state: CfgsyncServerState) -> Router { Router::new() .route("/register", post(register_node)) .route("/node", post(node_config)) + .route("/node/replace", post(replace_node_artifacts)) .with_state(Arc::new(state)) } @@ -99,6 +113,7 @@ pub fn build_legacy_cfgsync_router(state: CfgsyncServerState) -> Router { Router::new() .route("/register", post(register_node)) .route("/node", post(node_config)) + .route("/node/replace", post(replace_node_artifacts)) .route("/init-with-node", post(node_config)) .with_state(Arc::new(state)) } @@ -126,10 +141,13 @@ mod tests { use axum::{Json, extract::State, http::StatusCode, response::IntoResponse}; - use super::{CfgsyncServerState, NodeRegistration, node_config, register_node}; + use super::{ + CfgsyncServerState, NodeRegistration, node_config, register_node, replace_node_artifacts, + }; use crate::{ CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactFile, NodeArtifactsPayload, NodeConfigSource, RegisterNodeResponse, + ReplaceNodeArtifactsRequest, }; struct StaticProvider { @@ -160,6 +178,13 @@ mod tests { ConfigResolveResponse::Config, ) } + + fn replace_node_artifacts( + &self, + _request: ReplaceNodeArtifactsRequest, + ) -> Result<(), CfgsyncErrorResponse> { + Ok(()) + } } struct RegistrationAwareProvider { @@ -288,4 +313,27 @@ mod tests { assert_eq!(response.status(), StatusCode::TOO_EARLY); } + + #[tokio::test] + async fn replace_node_artifacts_returns_accepted() { + let provider = Arc::new(StaticProvider { + data: HashMap::new(), + }); + let state = Arc::new(CfgsyncServerState::new(provider)); + + let response = replace_node_artifacts( + State(state), + Json(ReplaceNodeArtifactsRequest { + identifier: "node-a".to_string(), + files: vec![NodeArtifactFile::new( + "/config.yaml".to_string(), + "a: 1".to_string(), + )], + }), + ) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::ACCEPTED); + } } diff --git a/cfgsync/core/src/source.rs b/cfgsync/core/src/source.rs index 00a04a3..6b2006f 100644 --- a/cfgsync/core/src/source.rs +++ b/cfgsync/core/src/source.rs @@ -4,7 +4,7 @@ use thiserror::Error; use crate::{ NodeArtifactsBundle, NodeArtifactsBundleEntry, NodeArtifactsPayload, NodeRegistration, - RegisterNodeResponse, protocol::ConfigResolveResponse, + RegisterNodeResponse, ReplaceNodeArtifactsRequest, protocol::ConfigResolveResponse, }; /// Source of cfgsync node payloads. @@ -14,18 +14,30 @@ pub trait NodeConfigSource: Send + Sync { /// Resolves the current artifact payload for a previously registered node. fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse; + + /// Replaces the served files for one node identifier. + fn replace_node_artifacts( + &self, + _request: ReplaceNodeArtifactsRequest, + ) -> Result<(), crate::CfgsyncErrorResponse> { + Err(crate::CfgsyncErrorResponse::internal( + "node artifact replacement is not supported by this source".to_owned(), + )) + } } /// In-memory map-backed source used by cfgsync server state. pub struct StaticConfigSource { - configs: HashMap, + configs: std::sync::Mutex>, } impl StaticConfigSource { /// Builds an in-memory source from fully formed payloads. #[must_use] pub fn from_payloads(configs: HashMap) -> Arc { - Arc::new(Self { configs }) + Arc::new(Self { + configs: std::sync::Mutex::new(configs), + }) } /// Builds an in-memory source from a static bundle document. @@ -37,7 +49,11 @@ impl StaticConfigSource { impl NodeConfigSource for StaticConfigSource { fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { - if self.configs.contains_key(®istration.identifier) { + let configs = self + .configs + .lock() + .expect("cfgsync static source should not be poisoned"); + if configs.contains_key(®istration.identifier) { RegisterNodeResponse::Registered } else { RegisterNodeResponse::Error(crate::CfgsyncErrorResponse::missing_config( @@ -47,17 +63,33 @@ impl NodeConfigSource for StaticConfigSource { } fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { - self.configs - .get(®istration.identifier) - .cloned() - .map_or_else( - || { - ConfigResolveResponse::Error(crate::CfgsyncErrorResponse::missing_config( - ®istration.identifier, - )) - }, - ConfigResolveResponse::Config, - ) + let configs = self + .configs + .lock() + .expect("cfgsync static source should not be poisoned"); + configs.get(®istration.identifier).cloned().map_or_else( + || { + ConfigResolveResponse::Error(crate::CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )) + }, + ConfigResolveResponse::Config, + ) + } + + fn replace_node_artifacts( + &self, + request: ReplaceNodeArtifactsRequest, + ) -> Result<(), crate::CfgsyncErrorResponse> { + let mut configs = self + .configs + .lock() + .expect("cfgsync static source should not be poisoned"); + configs.insert( + request.identifier, + NodeArtifactsPayload::from_files(request.files), + ); + Ok(()) } } @@ -152,7 +184,9 @@ impl BundleConfigSource { .collect(); Ok(Self { - inner: StaticConfigSource { configs }, + inner: StaticConfigSource { + configs: std::sync::Mutex::new(configs), + }, }) } } @@ -165,6 +199,13 @@ impl NodeConfigSource for BundleConfigSource { fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { self.inner.resolve(registration) } + + fn replace_node_artifacts( + &self, + request: ReplaceNodeArtifactsRequest, + ) -> Result<(), crate::CfgsyncErrorResponse> { + self.inner.replace_node_artifacts(request) + } } fn payload_from_bundle_node(node: NodeArtifactsBundleEntry) -> (String, NodeArtifactsPayload) { @@ -183,7 +224,7 @@ mod tests { use super::{BundleConfigSource, StaticConfigSource}; use crate::{ CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, ConfigResolveResponse, NodeArtifactFile, - NodeArtifactsPayload, NodeConfigSource, NodeRegistration, + NodeArtifactsPayload, NodeConfigSource, NodeRegistration, ReplaceNodeArtifactsRequest, }; fn sample_payload() -> NodeArtifactsPayload { @@ -197,7 +238,9 @@ mod tests { fn resolves_existing_identifier() { let mut configs = HashMap::new(); configs.insert("node-1".to_owned(), sample_payload()); - let repo = StaticConfigSource { configs }; + let repo = StaticConfigSource { + configs: std::sync::Mutex::new(configs), + }; match repo.resolve(&NodeRegistration::new( "node-1".to_string(), @@ -215,7 +258,7 @@ mod tests { #[test] fn reports_missing_identifier() { let repo = StaticConfigSource { - configs: HashMap::new(), + configs: std::sync::Mutex::new(HashMap::new()), }; match repo.resolve(&NodeRegistration::new( @@ -265,7 +308,9 @@ nodes: fn resolve_accepts_known_registration_without_gating() { let mut configs = HashMap::new(); configs.insert("node-1".to_owned(), sample_payload()); - let repo = StaticConfigSource { configs }; + let repo = StaticConfigSource { + configs: std::sync::Mutex::new(configs), + }; match repo.resolve(&NodeRegistration::new( "node-1".to_string(), @@ -275,4 +320,33 @@ nodes: ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), } } + + #[test] + fn static_source_replaces_node_artifacts() { + let mut configs = HashMap::new(); + configs.insert("node-1".to_owned(), sample_payload()); + let repo = StaticConfigSource { + configs: std::sync::Mutex::new(configs), + }; + + repo.replace_node_artifacts(ReplaceNodeArtifactsRequest { + identifier: "node-1".to_owned(), + files: vec![NodeArtifactFile::new( + "/config.yaml".to_string(), + "updated: true".to_string(), + )], + }) + .expect("replace node artifacts"); + + match repo.resolve(&NodeRegistration::new( + "node-1".to_string(), + "127.0.0.1".parse().expect("parse ip"), + )) { + ConfigResolveResponse::Config(payload) => { + assert_eq!(payload.files.len(), 1); + assert_eq!(payload.files[0].content, "updated: true"); + } + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } } diff --git a/testing-framework/core/src/cfgsync/mod.rs b/testing-framework/core/src/cfgsync/mod.rs index b769e43..cb9452e 100644 --- a/testing-framework/core/src/cfgsync/mod.rs +++ b/testing-framework/core/src/cfgsync/mod.rs @@ -7,7 +7,10 @@ pub use cfgsync_core::render::{CfgsyncOutputPaths, RenderedCfgsync}; use serde::Serialize; use thiserror::Error; -use crate::{scenario::Application, topology::DeploymentDescriptor}; +use crate::{ + scenario::{Application, StartNodeOptions}, + topology::DeploymentDescriptor, +}; #[doc(hidden)] pub type DynCfgsyncError = Box; @@ -40,7 +43,7 @@ pub trait StaticArtifactRenderer { } #[doc(hidden)] -pub trait StaticNodeConfigProvider: Application { +pub trait StaticNodeConfigProvider: Application + Sized { type Error: Error + Send + Sync + 'static; fn build_node_config( @@ -58,6 +61,15 @@ pub trait StaticNodeConfigProvider: Application { } fn serialize_node_config(config: &Self::NodeConfig) -> Result; + + fn build_node_artifacts_for_options( + _deployment: &Self::Deployment, + _node_index: usize, + _hostnames: &[String], + _options: &StartNodeOptions, + ) -> Result, Self::Error> { + Ok(None) + } } impl StaticArtifactRenderer for T @@ -151,6 +163,16 @@ pub fn build_static_artifacts( Ok(cfgsync_adapter::MaterializedArtifacts::from_nodes(output)) } +pub fn build_node_artifact_override( + deployment: &E::Deployment, + node_index: usize, + hostnames: &[String], + options: &StartNodeOptions, +) -> Result, BuildStaticArtifactsError> { + E::build_node_artifacts_for_options(deployment, node_index, hostnames, options) + .map_err(adapter_error) +} + #[doc(hidden)] pub use build_static_artifacts as build_cfgsync_node_catalog; diff --git a/testing-framework/core/src/scenario/config.rs b/testing-framework/core/src/scenario/config.rs index c7d73bb..5d03ae3 100644 --- a/testing-framework/core/src/scenario/config.rs +++ b/testing-framework/core/src/scenario/config.rs @@ -1,6 +1,13 @@ -use std::{collections::HashMap, error::Error}; +use std::{collections::HashMap, error::Error, io}; -use crate::{cfgsync::StaticNodeConfigProvider, env::Application, topology::DeploymentDescriptor}; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; + +use crate::{ + cfgsync::StaticNodeConfigProvider, + env::Application, + scenario::{PeerSelection, StartNodeOptions}, + topology::DeploymentDescriptor, +}; #[derive(Clone, Debug)] pub struct ClusterPeerView { @@ -116,6 +123,7 @@ impl StaticNodeConfigProvider for T where T: ClusterNodeConfigApplication, T::Deployment: DeploymentDescriptor, + T::ConfigError: From, { type Error = T::ConfigError; @@ -139,6 +147,30 @@ where fn serialize_node_config(config: &Self::NodeConfig) -> Result { T::serialize_cluster_node_config(config) } + + fn build_node_artifacts_for_options( + deployment: &Self::Deployment, + node_index: usize, + hostnames: &[String], + options: &StartNodeOptions, + ) -> Result, Self::Error> { + match &options.peers { + PeerSelection::DefaultLayout => Ok(None), + PeerSelection::None => { + let config = + build_cluster_node_config_for_indices::(node_index, hostnames, &[])?; + let yaml = T::serialize_cluster_node_config(&config)?; + Ok(Some(single_config_artifact(yaml))) + } + PeerSelection::Named(names) => { + let indices = resolve_named_peer_indices::(deployment, node_index, names)?; + let config = + build_cluster_node_config_for_indices::(node_index, hostnames, &indices)?; + let yaml = T::serialize_cluster_node_config(&config)?; + Ok(Some(single_config_artifact(yaml))) + } + } + } } fn build_static_cluster_node_config( @@ -149,6 +181,7 @@ fn build_static_cluster_node_config( where T: ClusterNodeConfigApplication, T::Deployment: DeploymentDescriptor, + T::ConfigError: From, { let node = static_node_view::(node_index, hostnames); let peers = (0..deployment.node_count()) @@ -159,6 +192,63 @@ where T::build_cluster_node_config(&node, &peers) } +fn build_cluster_node_config_for_indices( + node_index: usize, + hostnames: &[String], + peer_indices: &[usize], +) -> Result +where + T: ClusterNodeConfigApplication, + T::ConfigError: From, +{ + let node = static_node_view::(node_index, Some(hostnames)); + let peers = peer_indices + .iter() + .copied() + .map(|index| static_peer_view::(index, Some(hostnames))) + .collect::>(); + + T::build_cluster_node_config(&node, &peers) +} + +fn resolve_named_peer_indices( + deployment: &T::Deployment, + node_index: usize, + names: &[String], +) -> Result, T::ConfigError> +where + T: ClusterNodeConfigApplication, + T::Deployment: DeploymentDescriptor, + T::ConfigError: From, +{ + let mut indices = Vec::with_capacity(names.len()); + + for name in names { + let Some(index) = parse_node_index(name) else { + return Err(io::Error::other(format!("unknown peer name '{name}'")).into()); + }; + if index >= deployment.node_count() { + return Err(io::Error::other(format!("peer index out of range for '{name}'")).into()); + } + if index != node_index { + indices.push(index); + } + } + + Ok(indices) +} + +fn parse_node_index(name: &str) -> Option { + name.strip_prefix("node-")?.parse().ok() +} + +fn single_config_artifact(config_yaml: String) -> ArtifactSet { + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + config_yaml, + )]) +} + fn static_node_view(node_index: usize, hostnames: Option<&[String]>) -> ClusterNodeView where T: ClusterNodeConfigApplication, @@ -182,3 +272,102 @@ where .unwrap_or_else(|| format!("node-{node_index}")); ClusterPeerView::new(node_index, host, T::static_network_port()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::scenario::{Application, DefaultFeed, DefaultFeedRuntime, NodeAccess, NodeClients}; + + struct DummyClusterApp; + + #[async_trait::async_trait] + impl Application for DummyClusterApp { + type Deployment = crate::topology::ClusterTopology; + type NodeClient = String; + type NodeConfig = String; + type FeedRuntime = DefaultFeedRuntime; + + fn build_node_client( + _access: &NodeAccess, + ) -> Result { + Ok("client".to_owned()) + } + + async fn prepare_feed( + _node_clients: NodeClients, + ) -> Result<(DefaultFeed, Self::FeedRuntime), crate::scenario::DynError> { + crate::scenario::default_feed_result() + } + } + + impl ClusterNodeConfigApplication for DummyClusterApp { + type ConfigError = io::Error; + + fn static_network_port() -> u16 { + 9000 + } + + fn build_cluster_node_config( + node: &ClusterNodeView, + peers: &[ClusterPeerView], + ) -> Result { + Ok(format!( + "node={};peers={}", + node.authority(), + peers + .iter() + .map(ClusterPeerView::authority) + .collect::>() + .join(",") + )) + } + + fn serialize_cluster_node_config( + config: &Self::NodeConfig, + ) -> Result { + Ok(config.clone()) + } + } + + #[test] + fn cluster_app_builds_named_peer_override_artifacts() { + let deployment = crate::topology::ClusterTopology::new(3); + let hostnames = vec![ + "node-0.svc".to_owned(), + "node-1.svc".to_owned(), + "node-2.svc".to_owned(), + ]; + let options = + StartNodeOptions::::default().with_peers(PeerSelection::Named(vec![ + "node-0".to_owned(), + "node-2".to_owned(), + ])); + + let artifacts = + DummyClusterApp::build_node_artifacts_for_options(&deployment, 1, &hostnames, &options) + .expect("override artifacts") + .expect("expected override"); + + assert_eq!(artifacts.files.len(), 1); + assert_eq!(artifacts.files[0].path, "/config.yaml"); + assert_eq!( + artifacts.files[0].content, + "node=node-1.svc:9000;peers=node-0.svc:9000,node-2.svc:9000" + ); + } + + #[test] + fn cluster_app_builds_empty_peer_override_artifacts() { + let deployment = crate::topology::ClusterTopology::new(2); + let hostnames = vec!["node-0.svc".to_owned(), "node-1.svc".to_owned()]; + let options = + StartNodeOptions::::default().with_peers(PeerSelection::None); + + let artifacts = + DummyClusterApp::build_node_artifacts_for_options(&deployment, 1, &hostnames, &options) + .expect("override artifacts") + .expect("expected override"); + + assert_eq!(artifacts.files[0].content, "node=node-1.svc:9000;peers="); + } +} diff --git a/testing-framework/deployers/k8s/Cargo.toml b/testing-framework/deployers/k8s/Cargo.toml index 5ecffb6..fadd9d3 100644 --- a/testing-framework/deployers/k8s/Cargo.toml +++ b/testing-framework/deployers/k8s/Cargo.toml @@ -15,6 +15,8 @@ workspace = true [dependencies] anyhow = "1" async-trait = { workspace = true } +cfgsync-artifacts = { workspace = true } +cfgsync-core = { workspace = true } k8s-openapi = { features = ["latest"], version = "0.20" } kube = { default-features = false, features = ["client", "runtime", "rustls-tls"], version = "0.87" } reqwest = { features = ["json"], workspace = true } diff --git a/testing-framework/deployers/k8s/src/env.rs b/testing-framework/deployers/k8s/src/env.rs index 5088820..103a442 100644 --- a/testing-framework/deployers/k8s/src/env.rs +++ b/testing-framework/deployers/k8s/src/env.rs @@ -6,6 +6,7 @@ use std::{ }; use async_trait::async_trait; +use cfgsync_artifacts::ArtifactSet; use kube::Client; use reqwest::Url; use tempfile::TempDir; @@ -91,7 +92,7 @@ pub async fn install_helm_release_with_cleanup( } #[async_trait] -pub trait K8sDeployEnv: Application { +pub trait K8sDeployEnv: Application + Sized { type Assets: Send + Sync; /// Collect container port specs from the topology. @@ -227,4 +228,34 @@ pub trait K8sDeployEnv: Application { fn node_base_url(_client: &Self::NodeClient) -> Option { None } + + /// Optional cfgsync/bootstrap service reachable from inside the cluster. + /// + /// Manual cluster uses this to update one node's served config before + /// start. + fn cfgsync_service(_release: &str) -> Option<(String, u16)> { + None + } + + /// Hostnames that should be rendered into cfgsync-served node configs. + fn cfgsync_hostnames(release: &str, node_count: usize) -> Vec { + (0..node_count) + .map(|index| Self::node_service_name(release, index)) + .collect() + } + + /// Optional node-local artifact override for manual cluster startup + /// options. + /// + /// Return `Some(..)` when options require a node-specific config + /// replacement before the node starts. Return `None` to keep the + /// original cfgsync artifact set. + fn build_cfgsync_override_artifacts( + _topology: &Self::Deployment, + _node_index: usize, + _hostnames: &[String], + _options: &testing_framework_core::scenario::StartNodeOptions, + ) -> Result, DynError> { + Ok(None) + } } diff --git a/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs b/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs index d13a709..f1fdf89 100644 --- a/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs +++ b/testing-framework/deployers/k8s/src/lifecycle/wait/mod.rs @@ -10,6 +10,7 @@ mod orchestrator; pub(crate) mod ports; pub use forwarding::PortForwardHandle; +pub(crate) use forwarding::port_forward_service; const DEFAULT_HTTP_POLL_INTERVAL: Duration = Duration::from_secs(1); const DEFAULT_NODE_HTTP_TIMEOUT: Duration = Duration::from_secs(240); const DEFAULT_NODE_HTTP_PROBE_TIMEOUT: Duration = Duration::from_secs(30); diff --git a/testing-framework/deployers/k8s/src/manual.rs b/testing-framework/deployers/k8s/src/manual.rs index 9d83a8d..9c4f643 100644 --- a/testing-framework/deployers/k8s/src/manual.rs +++ b/testing-framework/deployers/k8s/src/manual.rs @@ -1,8 +1,10 @@ use std::{ collections::HashSet, + net::Ipv4Addr, sync::{Arc, Mutex}, }; +use cfgsync_core::Client as CfgsyncClient; use k8s_openapi::api::apps::v1::Deployment; use kube::{ Api, Client, @@ -27,7 +29,7 @@ use crate::{ cleanup::RunnerCleanup, wait::{ ClusterWaitError, NodeConfigPorts, deployment::wait_for_deployment_ready, - ports::discover_node_ports, + port_forward_service, ports::discover_node_ports, }, }, }; @@ -51,6 +53,12 @@ pub enum ManualClusterError { #[source] source: DynError, }, + #[error("failed to update cfgsync artifacts for '{name}': {source}")] + CfgsyncUpdate { + name: String, + #[source] + source: DynError, + }, #[error(transparent)] NodePorts(#[from] ClusterWaitError), #[error("unsupported start options for k8s manual cluster: {message}")] @@ -107,6 +115,7 @@ where client: Client, namespace: String, release: String, + topology: E::Deployment, node_count: usize, node_host: String, node_allocations: Vec, @@ -154,6 +163,7 @@ where client, namespace, release, + topology, node_count: nodes, node_host: node_host(), node_allocations, @@ -208,6 +218,7 @@ where } } + self.apply_cfgsync_override(index, &options).await?; scale_node::(&self.client, &self.namespace, &self.release, index, 1).await?; self.wait_node_ready(name).await?; let client = self.build_client(index, name)?; @@ -387,6 +398,45 @@ where } Ok(index) } + + async fn apply_cfgsync_override( + &self, + index: usize, + options: &StartNodeOptions, + ) -> Result<(), ManualClusterError> { + let Some((service, port)) = E::cfgsync_service(&self.release) else { + return ensure_default_peer_selection(options); + }; + + let hostnames = E::cfgsync_hostnames(&self.release, self.node_count); + let artifacts = + E::build_cfgsync_override_artifacts(&self.topology, index, &hostnames, options) + .map_err(|source| ManualClusterError::CfgsyncUpdate { + name: canonical_node_name(index), + source, + })?; + + let Some(artifacts) = artifacts else { + return ensure_default_peer_selection(options); + }; + + let forward = port_forward_service(&self.namespace, &service, port)?; + let client = CfgsyncClient::new(format!( + "http://{}:{}", + Ipv4Addr::LOCALHOST, + forward.local_port + )); + + client + .replace_node_artifacts(canonical_node_name(index), artifacts.files) + .await + .map_err(|source| ManualClusterError::CfgsyncUpdate { + name: canonical_node_name(index), + source: source.into(), + })?; + + Ok(()) + } } impl Drop for ManualCluster @@ -564,14 +614,6 @@ async fn wait_for_replicas( fn validate_start_options( options: &StartNodeOptions, ) -> Result<(), ManualClusterError> { - if !matches!( - options.peers, - testing_framework_core::scenario::PeerSelection::DefaultLayout - ) { - return Err(ManualClusterError::UnsupportedStartOptions { - message: "custom peer selection is not supported".to_owned(), - }); - } if options.config_override.is_some() || options.config_patch.is_some() { return Err(ManualClusterError::UnsupportedStartOptions { message: "config overrides/patches are not supported".to_owned(), @@ -585,6 +627,21 @@ fn validate_start_options( Ok(()) } +fn ensure_default_peer_selection( + options: &StartNodeOptions, +) -> Result<(), ManualClusterError> { + if matches!( + options.peers, + testing_framework_core::scenario::PeerSelection::DefaultLayout + ) { + return Ok(()); + } + + Err(ManualClusterError::UnsupportedStartOptions { + message: "custom peer selection is not supported".to_owned(), + }) +} + fn parse_node_index(name: &str) -> Option { name.strip_prefix("node-")?.parse().ok() } @@ -611,9 +668,12 @@ fn block_on_best_effort(fut: impl std::future::Future Result { @@ -662,6 +722,54 @@ mod tests { ) -> Result { render_single_template_chart_assets("dummy", "dummy.yaml", "") } + + fn cfgsync_service(release: &str) -> Option<(String, u16)> { + Some((format!("{release}-cfgsync"), 4400)) + } + + fn build_cfgsync_override_artifacts( + topology: &Self::Deployment, + node_index: usize, + hostnames: &[String], + options: &StartNodeOptions, + ) -> Result, DynError> { + build_node_artifact_override::(topology, node_index, hostnames, options) + .map_err(Into::into) + } + } + + impl StaticNodeConfigProvider for DummyEnv { + type Error = std::io::Error; + + fn build_node_config( + _deployment: &Self::Deployment, + node_index: usize, + ) -> Result { + Ok(format!("node={node_index};peers=default")) + } + + fn serialize_node_config(config: &Self::NodeConfig) -> Result { + Ok(config.clone()) + } + + fn build_node_artifacts_for_options( + _deployment: &Self::Deployment, + node_index: usize, + _hostnames: &[String], + options: &StartNodeOptions, + ) -> Result, Self::Error> { + let peers = match &options.peers { + PeerSelection::DefaultLayout => return Ok(None), + PeerSelection::None => "none".to_owned(), + PeerSelection::Named(names) => names.join(","), + }; + Ok(Some(cfgsync_artifacts::ArtifactSet::new(vec![ + cfgsync_artifacts::ArtifactFile::new( + "/config.yaml".to_string(), + format!("node={node_index};peers={peers}"), + ), + ]))) + } } #[test] @@ -672,13 +780,7 @@ mod tests { } #[test] - fn validate_start_options_rejects_non_default_inputs() { - let peers = StartNodeOptions::::default().with_peers(PeerSelection::None); - assert!(matches!( - validate_start_options(&peers), - Err(ManualClusterError::UnsupportedStartOptions { .. }) - )); - + fn validate_start_options_rejects_non_peer_overrides() { let persist = StartNodeOptions::::default() .with_persist_dir(std::path::PathBuf::from("/tmp/demo")); assert!(matches!( @@ -686,4 +788,33 @@ mod tests { Err(ManualClusterError::UnsupportedStartOptions { .. }) )); } + + #[test] + fn ensure_default_peer_selection_rejects_named_peers() { + let peers = StartNodeOptions::::default() + .with_peers(PeerSelection::Named(vec!["node-0".to_owned()])); + assert!(matches!( + ensure_default_peer_selection(&peers), + Err(ManualClusterError::UnsupportedStartOptions { .. }) + )); + } + + #[test] + fn dummy_env_builds_cfgsync_override_artifacts() { + let topology = testing_framework_core::topology::ClusterTopology::new(2); + let options = StartNodeOptions::::default() + .with_peers(PeerSelection::Named(vec!["node-0".to_owned()])); + + let artifacts = DummyEnv::build_cfgsync_override_artifacts( + &topology, + 1, + &["node-0".to_owned(), "node-1".to_owned()], + &options, + ) + .expect("build override") + .expect("expected override"); + + assert_eq!(artifacts.files.len(), 1); + assert_eq!(artifacts.files[0].content, "node=1;peers=node-0"); + } }