From 911d09e2c1c7f5e7d86f47fe391ebecd1e514ddc Mon Sep 17 00:00:00 2001 From: andrussal Date: Tue, 10 Mar 2026 08:57:41 +0100 Subject: [PATCH] Add adapter-backed cfgsync materialization --- Cargo.lock | 8 +- cfgsync/adapter/Cargo.toml | 4 +- cfgsync/adapter/src/lib.rs | 161 ++++++++++++++++++++++++++++++++++++- cfgsync/core/src/repo.rs | 120 +++++++++++---------------- cfgsync/core/src/server.rs | 84 ++++++++++++++++--- 5 files changed, 287 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0d00c93..37f4acb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -920,6 +920,8 @@ dependencies = [ name = "cfgsync-adapter" version = "0.1.0" dependencies = [ + "cfgsync-artifacts", + "cfgsync-core", "thiserror 2.0.18", ] @@ -1320,7 +1322,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 2.0.114", + "syn 1.0.109", ] [[package]] @@ -5520,9 +5522,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "bytes", "getrandom 0.3.4", diff --git a/cfgsync/adapter/Cargo.toml b/cfgsync/adapter/Cargo.toml index 034b349..7a15ad1 100644 --- a/cfgsync/adapter/Cargo.toml +++ b/cfgsync/adapter/Cargo.toml @@ -13,4 +13,6 @@ version = { workspace = true } workspace = true [dependencies] -thiserror = { workspace = true } +cfgsync-artifacts = { workspace = true } +cfgsync-core = { workspace = true } +thiserror = { workspace = true } diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs index 0e0c7fa..ff35b4e 100644 --- a/cfgsync/adapter/src/lib.rs +++ b/cfgsync/adapter/src/lib.rs @@ -1,5 +1,10 @@ -use std::{collections::HashMap, error::Error}; +use std::{collections::HashMap, error::Error, sync::Mutex}; +use cfgsync_artifacts::ArtifactFile; +use cfgsync_core::{ + CfgSyncErrorResponse, CfgSyncPayload, ConfigProvider, NodeRegistration, RegistrationResponse, + RepoResponse, +}; use thiserror::Error; /// Type-erased cfgsync adapter error used to preserve source context. @@ -14,6 +19,29 @@ pub struct CfgsyncNodeConfig { pub config_yaml: String, } +/// Node artifacts produced by a cfgsync materializer. +#[derive(Debug, Clone, Default)] +pub struct CfgsyncNodeArtifacts { + files: Vec, +} + +impl CfgsyncNodeArtifacts { + #[must_use] + pub fn new(files: Vec) -> Self { + Self { files } + } + + #[must_use] + pub fn files(&self) -> &[ArtifactFile] { + &self.files + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } +} + /// Precomputed node configs indexed by stable identifier. #[derive(Debug, Clone, Default)] pub struct CfgsyncNodeCatalog { @@ -52,6 +80,91 @@ impl CfgsyncNodeCatalog { } } +/// Adapter-side node config materialization contract used by cfgsync server. +pub trait CfgsyncMaterializer: Send + Sync { + fn materialize( + &self, + registration: &NodeRegistration, + ) -> Result, DynCfgsyncError>; +} + +impl CfgsyncMaterializer for CfgsyncNodeCatalog { + fn materialize( + &self, + registration: &NodeRegistration, + ) -> Result, DynCfgsyncError> { + let artifacts = self + .resolve(®istration.identifier) + .map(build_node_artifacts_from_config); + + Ok(artifacts) + } +} + +/// Registration-aware provider backed by an adapter materializer. +pub struct MaterializingConfigProvider { + materializer: M, + registrations: Mutex>, +} + +impl MaterializingConfigProvider { + #[must_use] + pub fn new(materializer: M) -> Self { + Self { + materializer, + registrations: Mutex::new(HashMap::new()), + } + } + + fn registration_for(&self, identifier: &str) -> Option { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + registrations.get(identifier).cloned() + } +} + +impl ConfigProvider for MaterializingConfigProvider +where + M: CfgsyncMaterializer, +{ + fn register(&self, registration: NodeRegistration) -> RegistrationResponse { + let mut registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + registrations.insert(registration.identifier.clone(), registration); + + RegistrationResponse::Registered + } + + fn resolve(&self, registration: &NodeRegistration) -> RepoResponse { + let registration = match self.registration_for(®istration.identifier) { + Some(registration) => registration, + None => { + return RepoResponse::Error(CfgSyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + }; + + match self.materializer.materialize(®istration) { + Ok(Some(artifacts)) => { + RepoResponse::Config(CfgSyncPayload::from_files(artifacts.files().to_vec())) + } + Ok(None) => { + RepoResponse::Error(CfgSyncErrorResponse::not_ready(®istration.identifier)) + } + Err(error) => RepoResponse::Error(CfgSyncErrorResponse::internal(format!( + "failed to materialize config for host {}: {error}", + registration.identifier + ))), + } + } +} + /// Adapter contract for converting an application deployment model into /// node-specific serialized config payloads. pub trait CfgsyncEnv { @@ -164,9 +277,15 @@ fn build_rewritten_node_config( Ok(node_config) } +fn build_node_artifacts_from_config(config: &CfgsyncNodeConfig) -> CfgsyncNodeArtifacts { + CfgsyncNodeArtifacts::new(vec![ArtifactFile::new("/config.yaml", &config.config_yaml)]) +} + #[cfg(test)] mod tests { - use super::{CfgsyncNodeCatalog, CfgsyncNodeConfig}; + use cfgsync_core::{CfgSyncErrorCode, ConfigProvider, NodeRegistration, RepoResponse}; + + use super::{CfgsyncNodeCatalog, CfgsyncNodeConfig, MaterializingConfigProvider}; #[test] fn catalog_resolves_identifier() { @@ -179,4 +298,42 @@ mod tests { assert_eq!(node.config_yaml, "key: value"); } + + #[test] + fn materializing_provider_resolves_registered_node() { + let catalog = CfgsyncNodeCatalog::new(vec![CfgsyncNodeConfig { + identifier: "node-1".to_owned(), + config_yaml: "key: value".to_owned(), + }]); + let provider = MaterializingConfigProvider::new(catalog); + let registration = NodeRegistration { + identifier: "node-1".to_owned(), + ip: "127.0.0.1".parse().expect("parse ip"), + }; + + let _ = provider.register(registration.clone()); + + match provider.resolve(®istration) { + RepoResponse::Config(payload) => assert_eq!(payload.files()[0].path, "/config.yaml"), + RepoResponse::Error(error) => panic!("expected config, got {error}"), + } + } + + #[test] + fn materializing_provider_reports_not_ready_before_registration() { + let catalog = CfgsyncNodeCatalog::new(vec![CfgsyncNodeConfig { + identifier: "node-1".to_owned(), + config_yaml: "key: value".to_owned(), + }]); + let provider = MaterializingConfigProvider::new(catalog); + let registration = NodeRegistration { + identifier: "node-1".to_owned(), + ip: "127.0.0.1".parse().expect("parse ip"), + }; + + match provider.resolve(®istration) { + RepoResponse::Config(_) => panic!("expected not-ready error"), + RepoResponse::Error(error) => assert!(matches!(error.code, CfgSyncErrorCode::NotReady)), + } + } } diff --git a/cfgsync/core/src/repo.rs b/cfgsync/core/src/repo.rs index dbce99f..69dd758 100644 --- a/cfgsync/core/src/repo.rs +++ b/cfgsync/core/src/repo.rs @@ -1,10 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - fs, - net::Ipv4Addr, - path::Path, - sync::{Arc, Mutex}, -}; +use std::{collections::HashMap, fs, net::Ipv4Addr, path::Path, sync::Arc}; use cfgsync_artifacts::ArtifactFile; use serde::{Deserialize, Serialize}; @@ -113,66 +107,44 @@ pub enum RegistrationResponse { pub trait ConfigProvider: Send + Sync { fn register(&self, registration: NodeRegistration) -> RegistrationResponse; - fn resolve(&self, identifier: &str) -> RepoResponse; + fn resolve(&self, registration: &NodeRegistration) -> RepoResponse; } /// In-memory map-backed provider used by cfgsync server state. pub struct ConfigRepo { configs: HashMap, - registrations: Mutex>, } impl ConfigRepo { #[must_use] pub fn from_bundle(configs: HashMap) -> Arc { - Arc::new(Self { - configs, - registrations: Mutex::new(HashSet::new()), - }) - } - - fn register_identifier(&self, identifier: &str) -> RegistrationResponse { - if !self.configs.contains_key(identifier) { - return RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(identifier)); - } - - let mut registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - registrations.insert(identifier.to_owned()); - - RegistrationResponse::Registered - } - - fn is_registered(&self, identifier: &str) -> bool { - let registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - - registrations.contains(identifier) + Arc::new(Self { configs }) } } impl ConfigProvider for ConfigRepo { fn register(&self, registration: NodeRegistration) -> RegistrationResponse { - self.register_identifier(®istration.identifier) + if self.configs.contains_key(®istration.identifier) { + RegistrationResponse::Registered + } else { + RegistrationResponse::Error(CfgSyncErrorResponse::missing_config( + ®istration.identifier, + )) + } } - fn resolve(&self, identifier: &str) -> RepoResponse { - if !self.configs.contains_key(identifier) { - return RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)); - } - - if !self.is_registered(identifier) { - return RepoResponse::Error(CfgSyncErrorResponse::not_ready(identifier)); - } - - self.configs.get(identifier).cloned().map_or_else( - || RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)), - RepoResponse::Config, - ) + fn resolve(&self, registration: &NodeRegistration) -> RepoResponse { + self.configs + .get(®istration.identifier) + .cloned() + .map_or_else( + || { + RepoResponse::Error(CfgSyncErrorResponse::missing_config( + ®istration.identifier, + )) + }, + RepoResponse::Config, + ) } } @@ -219,10 +191,7 @@ impl FileConfigProvider { .collect(); Ok(Self { - inner: ConfigRepo { - configs, - registrations: Mutex::new(HashSet::new()), - }, + inner: ConfigRepo { configs }, }) } } @@ -232,8 +201,8 @@ impl ConfigProvider for FileConfigProvider { self.inner.register(registration) } - fn resolve(&self, identifier: &str) -> RepoResponse { - self.inner.resolve(identifier) + fn resolve(&self, registration: &NodeRegistration) -> RepoResponse { + self.inner.resolve(registration) } } @@ -257,12 +226,12 @@ mod tests { fn resolves_existing_identifier() { let mut configs = HashMap::new(); configs.insert("node-1".to_owned(), sample_payload()); - let repo = ConfigRepo { - configs, - registrations: Mutex::new(HashSet::from(["node-1".to_owned()])), - }; + let repo = ConfigRepo { configs }; - match repo.resolve("node-1") { + match repo.resolve(&NodeRegistration { + identifier: "node-1".to_owned(), + ip: "127.0.0.1".parse().expect("parse ip"), + }) { RepoResponse::Config(payload) => { assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION); assert_eq!(payload.files.len(), 1); @@ -276,10 +245,12 @@ mod tests { fn reports_missing_identifier() { let repo = ConfigRepo { configs: HashMap::new(), - registrations: Mutex::new(HashSet::new()), }; - match repo.resolve("unknown-node") { + match repo.resolve(&NodeRegistration { + identifier: "unknown-node".to_owned(), + ip: "127.0.0.1".parse().expect("parse ip"), + }) { RepoResponse::Config(_) => panic!("expected missing-config error"), RepoResponse::Error(error) => { assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig)); @@ -310,26 +281,27 @@ nodes: ip: "127.0.0.1".parse().expect("parse ip"), }); - match provider.resolve("node-1") { + match provider.resolve(&NodeRegistration { + identifier: "node-1".to_owned(), + ip: "127.0.0.1".parse().expect("parse ip"), + }) { RepoResponse::Config(payload) => assert_eq!(payload.files.len(), 1), RepoResponse::Error(error) => panic!("expected config, got {error}"), } } #[test] - fn resolve_requires_registration_first() { + fn resolve_accepts_known_registration_without_gating() { let mut configs = HashMap::new(); configs.insert("node-1".to_owned(), sample_payload()); - let repo = ConfigRepo { - configs, - registrations: Mutex::new(HashSet::new()), - }; + let repo = ConfigRepo { configs }; - match repo.resolve("node-1") { - RepoResponse::Config(_) => panic!("expected not-ready error"), - RepoResponse::Error(error) => { - assert!(matches!(error.code, CfgSyncErrorCode::NotReady)); - } + match repo.resolve(&NodeRegistration { + identifier: "node-1".to_owned(), + ip: "127.0.0.1".parse().expect("parse ip"), + }) { + RepoResponse::Config(_) => {} + RepoResponse::Error(error) => panic!("expected config, got {error}"), } } } diff --git a/cfgsync/core/src/server.rs b/cfgsync/core/src/server.rs index 330c80d..407f30a 100644 --- a/cfgsync/core/src/server.rs +++ b/cfgsync/core/src/server.rs @@ -39,7 +39,7 @@ async fn node_config( State(state): State>, Json(payload): Json, ) -> impl IntoResponse { - let response = resolve_node_config_response(&state, &payload.identifier); + let response = resolve_node_config_response(&state, &payload); match response { RepoResponse::Config(payload_data) => (StatusCode::OK, Json(payload_data)).into_response(), @@ -65,8 +65,11 @@ async fn register_node( } } -fn resolve_node_config_response(state: &CfgSyncState, identifier: &str) -> RepoResponse { - state.repo.resolve(identifier) +fn resolve_node_config_response( + state: &CfgSyncState, + registration: &NodeRegistration, +) -> RepoResponse { + state.repo.resolve(registration) } fn error_status(code: &CfgSyncErrorCode) -> StatusCode { @@ -129,11 +132,66 @@ mod tests { } } - fn resolve(&self, identifier: &str) -> RepoResponse { - self.data.get(identifier).cloned().map_or_else( - || RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)), - RepoResponse::Config, - ) + fn resolve(&self, registration: &NodeRegistration) -> RepoResponse { + self.data + .get(®istration.identifier) + .cloned() + .map_or_else( + || { + RepoResponse::Error(CfgSyncErrorResponse::missing_config( + ®istration.identifier, + )) + }, + RepoResponse::Config, + ) + } + } + + struct RegistrationAwareProvider { + data: HashMap, + registrations: std::sync::Mutex>, + } + + impl ConfigProvider for RegistrationAwareProvider { + fn register(&self, registration: NodeRegistration) -> RegistrationResponse { + if !self.data.contains_key(®istration.identifier) { + return RegistrationResponse::Error(CfgSyncErrorResponse::missing_config( + ®istration.identifier, + )); + } + + let mut registrations = self + .registrations + .lock() + .expect("test registration store should not be poisoned"); + registrations.insert(registration.identifier.clone(), registration); + + RegistrationResponse::Registered + } + + fn resolve(&self, registration: &NodeRegistration) -> RepoResponse { + let registrations = self + .registrations + .lock() + .expect("test registration store should not be poisoned"); + + if !registrations.contains_key(®istration.identifier) { + return RepoResponse::Error(CfgSyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + + self.data + .get(®istration.identifier) + .cloned() + .map_or_else( + || { + RepoResponse::Error(CfgSyncErrorResponse::missing_config( + ®istration.identifier, + )) + }, + RepoResponse::Config, + ) } } @@ -149,7 +207,10 @@ mod tests { let mut data = HashMap::new(); data.insert("node-a".to_owned(), sample_payload()); - let provider = crate::repo::ConfigRepo::from_bundle(data); + let provider = Arc::new(RegistrationAwareProvider { + data, + registrations: std::sync::Mutex::new(HashMap::new()), + }); let state = Arc::new(CfgSyncState::new(provider)); let payload = NodeRegistration { ip: "127.0.0.1".parse().expect("valid ip"), @@ -197,7 +258,10 @@ mod tests { let mut data = HashMap::new(); data.insert("node-a".to_owned(), sample_payload()); - let provider = crate::repo::ConfigRepo::from_bundle(data); + let provider = Arc::new(RegistrationAwareProvider { + data, + registrations: std::sync::Mutex::new(HashMap::new()), + }); let state = Arc::new(CfgSyncState::new(provider)); let payload = NodeRegistration { ip: "127.0.0.1".parse().expect("valid ip"),