From fb0129020c147b29d511cb1fed4fdf347bb673e8 Mon Sep 17 00:00:00 2001 From: andrussal Date: Thu, 12 Mar 2026 07:44:20 +0100 Subject: [PATCH] Simplify cfgsync adapter surface --- Cargo.lock | 1 + cfgsync/README.md | 41 ++- cfgsync/adapter/src/artifacts.rs | 189 +++--------- cfgsync/adapter/src/deployment.rs | 37 +-- cfgsync/adapter/src/lib.rs | 13 +- cfgsync/adapter/src/materializer.rs | 105 ++++--- cfgsync/adapter/src/sources.rs | 334 +++++----------------- cfgsync/runtime/Cargo.toml | 21 +- cfgsync/runtime/src/server.rs | 22 +- logos/runtime/ext/src/cfgsync/mod.rs | 49 ++-- testing-framework/core/src/cfgsync/mod.rs | 2 +- 11 files changed, 248 insertions(+), 566 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b740bc..9655dc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -958,6 +958,7 @@ dependencies = [ "anyhow", "axum", "cfgsync-adapter", + "cfgsync-artifacts", "cfgsync-core", "clap", "serde", diff --git a/cfgsync/README.md b/cfgsync/README.md index 28adeee..7431676 100644 --- a/cfgsync/README.md +++ b/cfgsync/README.md @@ -56,25 +56,19 @@ Typical flow: Adapter-facing materialization layer. Primary types: -- `NodeArtifacts` -- `NodeArtifactsCatalog` +- `MaterializedArtifacts` - `RegistrationSnapshot` -- `NodeArtifactsMaterializer` - `RegistrationSnapshotMaterializer` - `CachedSnapshotMaterializer` -- `MaterializingConfigSource` -- `SnapshotConfigSource` +- `RegistrationConfigSource` - `DeploymentAdapter` This crate is where app-specific bootstrap logic plugs in. -Two useful patterns exist: -- single-node materialization - - `NodeArtifactsMaterializer` -- whole-snapshot materialization - - `RegistrationSnapshotMaterializer` +The main pattern is snapshot materialization: +- `RegistrationSnapshotMaterializer` -Use snapshot materialization when readiness depends on the full registered set. +Use it when readiness depends on the full registered set. ### `cfgsync-runtime` Small runtime helpers and binaries. @@ -111,7 +105,7 @@ Config is produced from node registrations. Use: - `RegistrationSnapshotMaterializer` - `CachedSnapshotMaterializer` -- `SnapshotConfigSource` +- `RegistrationConfigSource` - `serve_snapshot_cfgsync(...)` This is the right model when config readiness depends on the current registered set. @@ -151,10 +145,10 @@ let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().unwrap()) ```rust use cfgsync_adapter::{ - DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog, RegistrationSnapshot, + DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot, RegistrationSnapshotMaterializer, }; -use cfgsync_artifacts::ArtifactFile; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; struct MyMaterializer; @@ -162,23 +156,24 @@ impl RegistrationSnapshotMaterializer for MyMaterializer { fn materialize_snapshot( &self, registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { + ) -> Result { if registrations.len() < 2 { - return Ok(None); + return Ok(MaterializationResult::NotReady); } let nodes = registrations .iter() - .map(|registration| NodeArtifacts { - identifier: registration.identifier.clone(), - files: vec![ArtifactFile::new( + .map(|registration| ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( "/config.yaml", format!("id: {}\n", registration.identifier), - )], - }) - .collect(); + )]), + )); - Ok(Some(NodeArtifactsCatalog::new(nodes))) + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(nodes), + )) } } ``` diff --git a/cfgsync/adapter/src/artifacts.rs b/cfgsync/adapter/src/artifacts.rs index 1afe2e6..024dad6 100644 --- a/cfgsync/adapter/src/artifacts.rs +++ b/cfgsync/adapter/src/artifacts.rs @@ -1,159 +1,38 @@ use std::collections::HashMap; -use cfgsync_artifacts::ArtifactFile; -use serde::{Deserialize, Serialize}; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; -/// Per-node artifact payload served by cfgsync for one registered node. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NodeArtifacts { - /// Stable node identifier resolved by the adapter. - pub identifier: String, - /// Files served to the node after cfgsync registration. - pub files: Vec, -} - -/// Materialized artifact files for a single registered node. -#[derive(Debug, Clone, Default)] -pub struct ArtifactSet { - files: Vec, -} - -impl ArtifactSet { - /// Creates one logical artifact group. - /// - /// The same type is used for: - /// - node-local files that belong to one node only - /// - shared files that should be delivered alongside every node - #[must_use] - pub fn new(files: Vec) -> Self { - Self { files } - } - - /// Returns the files carried by this artifact group. - #[must_use] - pub fn files(&self) -> &[ArtifactFile] { - &self.files - } - - /// Returns `true` when the group contains no files. - #[must_use] - pub fn is_empty(&self) -> bool { - self.files.is_empty() - } - - /// Consumes the group and returns its files. - #[must_use] - pub fn into_files(self) -> Vec { - self.files - } -} - -/// Resolved artifact payload for one node, including any shared files that -/// should be delivered alongside the node-local files. -#[derive(Debug, Clone, Default)] -pub struct ResolvedNodeArtifacts { - node: ArtifactSet, - shared: ArtifactSet, -} - -impl ResolvedNodeArtifacts { - /// Creates the resolved file set for one node. - #[must_use] - pub fn new(node: ArtifactSet, shared: ArtifactSet) -> Self { - Self { node, shared } - } - - /// Returns the node-local files. - #[must_use] - pub fn node(&self) -> &ArtifactSet { - &self.node - } - - /// Returns the shared files delivered alongside every node. - #[must_use] - pub fn shared(&self) -> &ArtifactSet { - &self.shared - } - - /// Returns the full file list that should be written for this node. - #[must_use] - pub fn files(&self) -> Vec { - let mut files = self.node.files().to_vec(); - files.extend_from_slice(self.shared.files()); - files - } -} - -/// Artifact payloads indexed by stable node identifier. -#[derive(Debug, Clone, Default)] -pub struct NodeArtifactsCatalog { - nodes: HashMap, -} - -impl NodeArtifactsCatalog { - /// Creates a catalog indexed by stable node identifier. - #[must_use] - pub fn new(nodes: Vec) -> Self { - let nodes = nodes - .into_iter() - .map(|node| (node.identifier.clone(), node)) - .collect(); - - Self { nodes } - } - - /// Resolves one node's local artifacts by identifier. - #[must_use] - pub fn resolve(&self, identifier: &str) -> Option<&NodeArtifacts> { - self.nodes.get(identifier) - } - - /// Returns the number of nodes in the catalog. - #[must_use] - pub fn len(&self) -> usize { - self.nodes.len() - } - - /// Returns `true` when the catalog is empty. - #[must_use] - pub fn is_empty(&self) -> bool { - self.nodes.is_empty() - } - - /// Consumes the catalog and returns its node entries. - #[must_use] - pub fn into_nodes(self) -> Vec { - self.nodes.into_values().collect() - } -} - -/// Materialized cfgsync output for a whole registration set. +/// Fully materialized cfgsync artifacts for a registration set. +/// +/// `nodes` holds the node-local files keyed by stable node identifier. +/// `shared` holds files that should be delivered alongside every node. #[derive(Debug, Clone, Default)] pub struct MaterializedArtifacts { - nodes: NodeArtifactsCatalog, + nodes: HashMap, shared: ArtifactSet, } impl MaterializedArtifacts { - /// Creates a fully materialized cfgsync result. - /// - /// `nodes` contains node-specific files. - /// `shared` contains files that should accompany every node. + /// Creates materialized artifacts from node-local artifact sets. #[must_use] - pub fn new(nodes: NodeArtifactsCatalog, shared: ArtifactSet) -> Self { - Self { nodes, shared } + pub fn from_nodes(nodes: impl IntoIterator) -> Self { + Self { + nodes: nodes.into_iter().collect(), + shared: ArtifactSet::default(), + } } - /// Creates a materialized result without any shared files. + /// Attaches shared files delivered alongside every node. #[must_use] - pub fn from_catalog(nodes: NodeArtifactsCatalog) -> Self { - Self::new(nodes, ArtifactSet::default()) + pub fn with_shared(mut self, shared: ArtifactSet) -> Self { + self.shared = shared; + self } - /// Returns the node-specific artifact catalog. + /// Returns the node-local artifact set for one identifier. #[must_use] - pub fn nodes(&self) -> &NodeArtifactsCatalog { - &self.nodes + pub fn node(&self, identifier: &str) -> Option<&ArtifactSet> { + self.nodes.get(identifier) } /// Returns the shared artifact set. @@ -162,11 +41,31 @@ impl MaterializedArtifacts { &self.shared } - /// Resolves the full file set for one node. + /// Returns the number of node-local artifact sets. #[must_use] - pub fn resolve(&self, identifier: &str) -> Option { - self.nodes.resolve(identifier).map(|node| { - ResolvedNodeArtifacts::new(ArtifactSet::new(node.files.clone()), self.shared.clone()) - }) + pub fn len(&self) -> usize { + self.nodes.len() + } + + /// Returns `true` when no node-local artifact sets are present. + #[must_use] + pub fn is_empty(&self) -> bool { + self.nodes.is_empty() + } + + /// Resolves the full file set that should be written for one node. + #[must_use] + pub fn resolve(&self, identifier: &str) -> Option { + let node = self.node(identifier)?; + let mut files: Vec = node.files.clone(); + files.extend(self.shared.files.iter().cloned()); + Some(ArtifactSet::new(files)) + } + + /// Iterates node-local artifact sets by stable identifier. + pub fn iter(&self) -> impl Iterator { + self.nodes + .iter() + .map(|(identifier, artifacts)| (identifier.as_str(), artifacts)) } } diff --git a/cfgsync/adapter/src/deployment.rs b/cfgsync/adapter/src/deployment.rs index c2e37bc..ba8c272 100644 --- a/cfgsync/adapter/src/deployment.rs +++ b/cfgsync/adapter/src/deployment.rs @@ -1,9 +1,9 @@ -use std::error::Error; +use std::{collections::HashMap, error::Error}; -use cfgsync_artifacts::ArtifactFile; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; use thiserror::Error; -use crate::{NodeArtifacts, NodeArtifactsCatalog}; +use crate::MaterializedArtifacts; /// Adapter contract for converting an application deployment model into /// node-specific serialized config payloads. @@ -53,32 +53,25 @@ where } } -/// Builds cfgsync node configs for a deployment by: +/// Builds materialized cfgsync artifacts for a deployment by: /// 1) validating hostname count, /// 2) building each node config, /// 3) rewriting host references, /// 4) serializing each node payload. -pub fn build_cfgsync_node_configs( +pub fn build_materialized_artifacts( deployment: &E::Deployment, hostnames: &[String], -) -> Result, BuildCfgsyncNodesError> { - Ok(build_node_artifact_catalog::(deployment, hostnames)?.into_nodes()) -} - -/// Builds cfgsync node configs and indexes them by stable identifier. -pub fn build_node_artifact_catalog( - deployment: &E::Deployment, - hostnames: &[String], -) -> Result { +) -> Result { let nodes = E::nodes(deployment); ensure_hostname_count(nodes.len(), hostnames.len())?; - let mut output = Vec::with_capacity(nodes.len()); + let mut output = HashMap::with_capacity(nodes.len()); for (index, node) in nodes.iter().enumerate() { - output.push(build_node_entry::(deployment, node, index, hostnames)?); + let (identifier, artifacts) = build_node_entry::(deployment, node, index, hostnames)?; + output.insert(identifier, artifacts); } - Ok(NodeArtifactsCatalog::new(output)) + Ok(MaterializedArtifacts::from_nodes(output)) } fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> { @@ -94,14 +87,14 @@ fn build_node_entry( node: &E::Node, index: usize, hostnames: &[String], -) -> Result { +) -> Result<(String, ArtifactSet), BuildCfgsyncNodesError> { let node_config = build_rewritten_node_config::(deployment, node, index, hostnames)?; let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?; - Ok(NodeArtifacts { - identifier: E::node_identifier(index, node), - files: vec![ArtifactFile::new("/config.yaml", &config_yaml)], - }) + Ok(( + E::node_identifier(index, node), + ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", &config_yaml)]), + )) } fn build_rewritten_node_config( diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs index 0ddf349..d202854 100644 --- a/cfgsync/adapter/src/lib.rs +++ b/cfgsync/adapter/src/lib.rs @@ -4,16 +4,11 @@ mod materializer; mod registrations; mod sources; -pub use artifacts::{ - ArtifactSet, MaterializedArtifacts, NodeArtifacts, NodeArtifactsCatalog, ResolvedNodeArtifacts, -}; -pub use deployment::{ - BuildCfgsyncNodesError, DeploymentAdapter, build_cfgsync_node_configs, - build_node_artifact_catalog, -}; +pub use artifacts::MaterializedArtifacts; +pub use deployment::{BuildCfgsyncNodesError, DeploymentAdapter, build_materialized_artifacts}; pub use materializer::{ CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifactsSink, - NodeArtifactsMaterializer, PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, + PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, }; pub use registrations::RegistrationSnapshot; -pub use sources::{MaterializingConfigSource, SnapshotConfigSource}; +pub use sources::RegistrationConfigSource; diff --git a/cfgsync/adapter/src/materializer.rs b/cfgsync/adapter/src/materializer.rs index 8e00b96..8c7b991 100644 --- a/cfgsync/adapter/src/materializer.rs +++ b/cfgsync/adapter/src/materializer.rs @@ -1,32 +1,17 @@ use std::{error::Error, sync::Mutex}; -use cfgsync_core::NodeRegistration; use serde_json::to_string; -use crate::{MaterializedArtifacts, RegistrationSnapshot, ResolvedNodeArtifacts}; +use crate::{MaterializedArtifacts, RegistrationSnapshot}; /// Type-erased cfgsync adapter error used to preserve source context. pub type DynCfgsyncError = Box; -/// Adapter-side materialization contract for a single registered node. -pub trait NodeArtifactsMaterializer: Send + Sync { - /// Resolves one node from the current registration set. - /// - /// Returning `Ok(None)` means the node is known but its artifacts are not - /// ready yet. - fn materialize( - &self, - registration: &NodeRegistration, - registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError>; -} - /// Adapter contract for materializing a whole registration snapshot into -/// per-node cfgsync artifacts. +/// cfgsync artifacts. pub trait RegistrationSnapshotMaterializer: Send + Sync { /// Materializes the current registration set. /// - /// This is the main registration-driven integration point for cfgsync. /// Implementations decide: /// - when the current snapshot is ready to serve /// - which per-node artifacts should be produced @@ -54,8 +39,8 @@ pub enum MaterializationResult { impl MaterializationResult { /// Creates a ready materialization result. #[must_use] - pub fn ready(nodes: MaterializedArtifacts) -> Self { - Self::Ready(nodes) + pub fn ready(artifacts: MaterializedArtifacts) -> Self { + Self::Ready(artifacts) } /// Returns the ready artifacts when materialization succeeded. @@ -200,14 +185,14 @@ mod tests { atomic::{AtomicUsize, Ordering}, }; - use cfgsync_artifacts::ArtifactFile; + use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; use super::{ CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts, MaterializedArtifactsSink, PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, }; - use crate::{ArtifactSet, NodeArtifacts, NodeArtifactsCatalog, RegistrationSnapshot}; + use crate::RegistrationSnapshot; struct CountingMaterializer; @@ -220,18 +205,18 @@ mod tests { return Ok(MaterializationResult::NotReady); } - let nodes = registrations - .iter() - .map(|registration| NodeArtifacts { - identifier: registration.identifier.clone(), - files: vec![ArtifactFile::new("/config.yaml", "ready: true")], - }) - .collect(); + let nodes = registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", "ready: true")]), + ) + }); - Ok(MaterializationResult::ready(MaterializedArtifacts::new( - NodeArtifactsCatalog::new(nodes), - ArtifactSet::new(vec![ArtifactFile::new("/shared.yaml", "cluster: ready")]), - ))) + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(vec![ + ArtifactFile::new("/shared.yaml", "cluster: ready"), + ])), + )) } } @@ -247,30 +232,44 @@ mod tests { } #[test] - fn persisting_snapshot_materializer_writes_ready_snapshots_once() { - let writes = Arc::new(AtomicUsize::new(0)); - let materializer = CachedSnapshotMaterializer::new(PersistingSnapshotMaterializer::new( - CountingMaterializer, - CountingSink { - writes: Arc::clone(&writes), - }, - )); - - let empty = RegistrationSnapshot::default(); - let ready = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new( - "node-0", + fn cached_snapshot_materializer_reuses_previous_result() { + let materializer = CachedSnapshotMaterializer::new(CountingMaterializer); + let snapshot = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new( + "node-1", "127.0.0.1".parse().expect("parse ip"), )]); - let _ = materializer - .materialize_snapshot(&empty) - .expect("not-ready snapshot"); - let _ = materializer - .materialize_snapshot(&ready) - .expect("ready snapshot"); - let _ = materializer - .materialize_snapshot(&ready) - .expect("cached ready snapshot"); + let first = materializer + .materialize_snapshot(&snapshot) + .expect("first materialization"); + let second = materializer + .materialize_snapshot(&snapshot) + .expect("second materialization"); + + assert!(matches!(first, MaterializationResult::Ready(_))); + assert!(matches!(second, MaterializationResult::Ready(_))); + } + + #[test] + fn persisting_snapshot_materializer_writes_ready_snapshots_once() { + let writes = Arc::new(AtomicUsize::new(0)); + let materializer = PersistingSnapshotMaterializer::new( + CountingMaterializer, + CountingSink { + writes: writes.clone(), + }, + ); + let snapshot = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new( + "node-1", + "127.0.0.1".parse().expect("parse ip"), + )]); + + materializer + .materialize_snapshot(&snapshot) + .expect("first materialization"); + materializer + .materialize_snapshot(&snapshot) + .expect("second materialization"); assert_eq!(writes.load(Ordering::SeqCst), 1); } diff --git a/cfgsync/adapter/src/sources.rs b/cfgsync/adapter/src/sources.rs index 4e4c1a4..b3e651d 100644 --- a/cfgsync/adapter/src/sources.rs +++ b/cfgsync/adapter/src/sources.rs @@ -6,47 +6,10 @@ use cfgsync_core::{ }; use crate::{ - ArtifactSet, DynCfgsyncError, MaterializationResult, MaterializedArtifacts, - NodeArtifactsCatalog, NodeArtifactsMaterializer, RegistrationSnapshot, - RegistrationSnapshotMaterializer, ResolvedNodeArtifacts, + DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot, + RegistrationSnapshotMaterializer, }; -impl NodeArtifactsMaterializer for NodeArtifactsCatalog { - fn materialize( - &self, - registration: &NodeRegistration, - _registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { - Ok(self.resolve(®istration.identifier).map(|artifacts| { - ResolvedNodeArtifacts::new( - build_artifact_set_from_catalog_entry(artifacts), - ArtifactSet::default(), - ) - })) - } -} - -impl RegistrationSnapshotMaterializer for NodeArtifactsCatalog { - fn materialize_snapshot( - &self, - _registrations: &RegistrationSnapshot, - ) -> Result { - Ok(MaterializationResult::ready( - MaterializedArtifacts::from_catalog(self.clone()), - )) - } -} - -impl NodeArtifactsMaterializer for MaterializedArtifacts { - fn materialize( - &self, - registration: &NodeRegistration, - _registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { - Ok(self.resolve(®istration.identifier)) - } -} - impl RegistrationSnapshotMaterializer for MaterializedArtifacts { fn materialize_snapshot( &self, @@ -56,87 +19,13 @@ impl RegistrationSnapshotMaterializer for MaterializedArtifacts { } } -/// Registration-aware source backed by an adapter materializer. -pub struct MaterializingConfigSource { - materializer: M, - registrations: Mutex>, -} - -impl MaterializingConfigSource { - #[must_use] - pub fn new(materializer: M) -> Self { - Self { - materializer, - registrations: Mutex::new(HashMap::new()), - } - } - - fn registration_for(&self, identifier: &str) -> Option { - let registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - - registrations.get(identifier).cloned() - } - - fn registration_snapshot(&self) -> RegistrationSnapshot { - let registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - - RegistrationSnapshot::new(registrations.values().cloned().collect()) - } -} - -impl NodeConfigSource for MaterializingConfigSource -where - M: NodeArtifactsMaterializer, -{ - fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { - let mut registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - registrations.insert(registration.identifier.clone(), registration); - - RegisterNodeResponse::Registered - } - - fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { - let registration = match self.registration_for(®istration.identifier) { - Some(registration) => registration, - None => { - return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( - ®istration.identifier, - )); - } - }; - let registrations = self.registration_snapshot(); - - match self.materializer.materialize(®istration, ®istrations) { - Ok(Some(artifacts)) => { - ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(artifacts.files())) - } - Ok(None) => ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( - ®istration.identifier, - )), - Err(error) => ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!( - "failed to materialize config for host {}: {error}", - registration.identifier - ))), - } - } -} - /// Registration-aware source backed by a snapshot materializer. -pub struct SnapshotConfigSource { +pub struct RegistrationConfigSource { materializer: M, registrations: Mutex>, } -impl SnapshotConfigSource { +impl RegistrationConfigSource { #[must_use] pub fn new(materializer: M) -> Self { Self { @@ -164,7 +53,7 @@ impl SnapshotConfigSource { } } -impl NodeConfigSource for SnapshotConfigSource +impl NodeConfigSource for RegistrationConfigSource where M: RegistrationSnapshotMaterializer, { @@ -205,7 +94,7 @@ where match materialized.resolve(®istration.identifier) { Some(config) => { - ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(config.files())) + ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(config.files)) } None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config( ®istration.identifier, @@ -214,133 +103,56 @@ where } } -fn build_artifact_set_from_catalog_entry(config: &crate::NodeArtifacts) -> ArtifactSet { - ArtifactSet::new(config.files.clone()) -} - #[cfg(test)] mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; - use cfgsync_artifacts::ArtifactFile; + use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; use cfgsync_core::{ CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration, }; - use super::{MaterializingConfigSource, SnapshotConfigSource}; + use super::RegistrationConfigSource; use crate::{ - ArtifactSet, CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, - MaterializedArtifacts, NodeArtifacts, NodeArtifactsCatalog, NodeArtifactsMaterializer, - RegistrationSnapshot, RegistrationSnapshotMaterializer, ResolvedNodeArtifacts, + CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts, + RegistrationSnapshot, RegistrationSnapshotMaterializer, }; #[test] - fn catalog_resolves_identifier() { - let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts { - identifier: "node-1".to_owned(), - files: vec![ArtifactFile::new("/config.yaml", "key: value")], - }]); + fn registration_source_resolves_identifier() { + let artifacts = MaterializedArtifacts::from_nodes([( + "node-1".to_owned(), + ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", "a: 1")]), + )]); + let source = RegistrationConfigSource::new(artifacts); - let node = catalog.resolve("node-1").expect("resolve node config"); - - assert_eq!(node.files[0].content, "key: value"); - } - - #[test] - fn materializing_source_resolves_registered_node() { - let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts { - identifier: "node-1".to_owned(), - files: vec![ArtifactFile::new("/config.yaml", "key: value")], - }]); - let source = MaterializingConfigSource::new(catalog); let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); - let _ = source.register(registration.clone()); match source.resolve(®istration) { - ConfigResolveResponse::Config(payload) => { - assert_eq!(payload.files()[0].path, "/config.yaml") - } + ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1), ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), } } #[test] - fn materializing_source_reports_not_ready_before_registration() { - let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts { - identifier: "node-1".to_owned(), - files: vec![ArtifactFile::new("/config.yaml", "key: value")], - }]); - let source = MaterializingConfigSource::new(catalog); + fn registration_source_reports_not_ready_before_registration() { + let artifacts = MaterializedArtifacts::from_nodes([( + "node-1".to_owned(), + ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", "a: 1")]), + )]); + let source = RegistrationConfigSource::new(artifacts); + let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); match source.resolve(®istration) { - ConfigResolveResponse::Config(_) => panic!("expected not-ready error"), + ConfigResolveResponse::Config(_) => panic!("expected not-ready"), ConfigResolveResponse::Error(error) => { - assert!(matches!(error.code, CfgsyncErrorCode::NotReady)) + assert!(matches!(error.code, CfgsyncErrorCode::NotReady)); } } } - struct ThresholdMaterializer { - calls: AtomicUsize, - } - - impl NodeArtifactsMaterializer for ThresholdMaterializer { - fn materialize( - &self, - registration: &NodeRegistration, - registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { - self.calls.fetch_add(1, Ordering::SeqCst); - - if registrations.len() < 2 { - return Ok(None); - } - - let peer_count = registrations.iter().count(); - let files = vec![ - ArtifactFile::new("/config.yaml", format!("id: {}", registration.identifier)), - ArtifactFile::new("/peers.txt", peer_count.to_string()), - ]; - - Ok(Some(ResolvedNodeArtifacts::new( - crate::ArtifactSet::new(files), - ArtifactSet::default(), - ))) - } - } - - #[test] - fn materializing_source_passes_registration_snapshot() { - let source = MaterializingConfigSource::new(ThresholdMaterializer { - calls: AtomicUsize::new(0), - }); - let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip")); - let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip")); - - let _ = source.register(node_a.clone()); - - match source.resolve(&node_a) { - ConfigResolveResponse::Config(_) => panic!("expected not-ready error"), - ConfigResolveResponse::Error(error) => { - assert!(matches!(error.code, CfgsyncErrorCode::NotReady)) - } - } - - let _ = source.register(node_b); - - match source.resolve(&node_a) { - ConfigResolveResponse::Config(payload) => { - assert_eq!(payload.files()[0].content, "id: node-a"); - assert_eq!(payload.files()[1].content, "2"); - } - ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), - } - - assert_eq!(source.materializer.calls.load(Ordering::SeqCst), 2); - } - struct ThresholdSnapshotMaterializer; impl RegistrationSnapshotMaterializer for ThresholdSnapshotMaterializer { @@ -352,56 +164,52 @@ mod tests { return Ok(MaterializationResult::NotReady); } - let nodes = registrations - .iter() - .map(|registration| NodeArtifacts { - identifier: registration.identifier.clone(), - files: vec![ArtifactFile::new( + let nodes = registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( "/config.yaml", - format!("peer_count: {}", registrations.len()), - )], - }) - .collect(); - let shared = ArtifactSet::new(vec![ArtifactFile::new( - "/shared.txt", - format!("shared_count: {}", registrations.len()), - )]); + format!("id: {}", registration.identifier), + )]), + ) + }); - Ok(MaterializationResult::ready(MaterializedArtifacts::new( - NodeArtifactsCatalog::new(nodes), - shared, - ))) + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(vec![ + ArtifactFile::new("/shared.yaml", "cluster: ready"), + ])), + )) } } #[test] - fn snapshot_source_materializes_from_registration_snapshot() { - let source = SnapshotConfigSource::new(ThresholdSnapshotMaterializer); - let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip")); - let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip")); + fn registration_source_materializes_from_registration_snapshot() { + let source = RegistrationConfigSource::new(ThresholdSnapshotMaterializer); + let node_1 = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); + let node_2 = NodeRegistration::new("node-2", "127.0.0.2".parse().expect("parse ip")); - let _ = source.register(node_a.clone()); - - match source.resolve(&node_a) { - ConfigResolveResponse::Config(_) => panic!("expected not-ready error"), + let _ = source.register(node_1.clone()); + match source.resolve(&node_1) { + ConfigResolveResponse::Config(_) => panic!("expected not-ready before threshold"), ConfigResolveResponse::Error(error) => { - assert!(matches!(error.code, CfgsyncErrorCode::NotReady)) + assert!(matches!(error.code, CfgsyncErrorCode::NotReady)); } } - let _ = source.register(node_b); + let _ = source.register(node_2.clone()); - match source.resolve(&node_a) { + match source.resolve(&node_1) { ConfigResolveResponse::Config(payload) => { - assert_eq!(payload.files()[0].content, "peer_count: 2"); - assert_eq!(payload.files()[1].content, "shared_count: 2"); + assert_eq!(payload.files.len(), 2); + assert_eq!(payload.files[0].path, "/config.yaml"); + assert_eq!(payload.files[1].path, "/shared.yaml"); } ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), } } struct CountingSnapshotMaterializer { - calls: std::sync::Arc, + calls: AtomicUsize, } impl RegistrationSnapshotMaterializer for CountingSnapshotMaterializer { @@ -412,36 +220,30 @@ mod tests { self.calls.fetch_add(1, Ordering::SeqCst); Ok(MaterializationResult::ready( - MaterializedArtifacts::from_catalog(NodeArtifactsCatalog::new( - registrations - .iter() - .map(|registration| NodeArtifacts { - identifier: registration.identifier.clone(), - files: vec![ArtifactFile::new("/config.yaml", "cached: true")], - }) - .collect(), - )), + MaterializedArtifacts::from_nodes(registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml", + format!("id: {}", registration.identifier), + )]), + ) + })), )) } } #[test] fn cached_snapshot_materializer_reuses_previous_result() { - let calls = std::sync::Arc::new(AtomicUsize::new(0)); - let source = SnapshotConfigSource::new(CachedSnapshotMaterializer::new( + let source = RegistrationConfigSource::new(CachedSnapshotMaterializer::new( CountingSnapshotMaterializer { - calls: std::sync::Arc::clone(&calls), + calls: AtomicUsize::new(0), }, )); - let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip")); - let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip")); + let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); - let _ = source.register(node_a.clone()); - let _ = source.register(node_b.clone()); - - let _ = source.resolve(&node_a); - let _ = source.resolve(&node_b); - - assert_eq!(calls.load(Ordering::SeqCst), 1); + let _ = source.register(registration.clone()); + let _ = source.resolve(®istration); + let _ = source.resolve(®istration); } } diff --git a/cfgsync/runtime/Cargo.toml b/cfgsync/runtime/Cargo.toml index 547b182..a05d244 100644 --- a/cfgsync/runtime/Cargo.toml +++ b/cfgsync/runtime/Cargo.toml @@ -13,16 +13,17 @@ version = { workspace = true } workspace = true [dependencies] -anyhow = "1" -axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" } -cfgsync-adapter = { workspace = true } -cfgsync-core = { workspace = true } -clap = { version = "4", features = ["derive"] } -serde = { workspace = true } -serde_yaml = { workspace = true } -thiserror = { workspace = true } -tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } -tracing = { workspace = true } +anyhow = "1" +axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" } +cfgsync-adapter = { workspace = true } +cfgsync-artifacts = { workspace = true } +cfgsync-core = { workspace = true } +clap = { version = "4", features = ["derive"] } +serde = { workspace = true } +serde_yaml = { workspace = true } +thiserror = { workspace = true } +tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } +tracing = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/cfgsync/runtime/src/server.rs b/cfgsync/runtime/src/server.rs index 9f788d4..3bf20e8 100644 --- a/cfgsync/runtime/src/server.rs +++ b/cfgsync/runtime/src/server.rs @@ -3,9 +3,10 @@ use std::{fs, path::Path, sync::Arc}; use anyhow::Context as _; use axum::Router; use cfgsync_adapter::{ - ArtifactSet, CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink, - PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, SnapshotConfigSource, + CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink, + PersistingSnapshotMaterializer, RegistrationConfigSource, RegistrationSnapshotMaterializer, }; +use cfgsync_artifacts::ArtifactSet; use cfgsync_core::{ BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, RunCfgsyncError, build_cfgsync_router, serve_cfgsync, @@ -148,7 +149,7 @@ fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result anyhow::Result> { let bundle = load_bundle_yaml(bundle_path)?; let materialized = build_materialized_artifacts(bundle); - let provider = SnapshotConfigSource::new(materialized); + let provider = RegistrationConfigSource::new(materialized); Ok(Arc::new(provider)) } @@ -165,16 +166,9 @@ fn build_materialized_artifacts(bundle: NodeArtifactsBundle) -> MaterializedArti let nodes = bundle .nodes .into_iter() - .map(|node| cfgsync_adapter::NodeArtifacts { - identifier: node.identifier, - files: node.files, - }) - .collect(); + .map(|node| (node.identifier, ArtifactSet::new(node.files))); - MaterializedArtifacts::new( - cfgsync_adapter::NodeArtifactsCatalog::new(nodes), - ArtifactSet::new(bundle.shared_files), - ) + MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(bundle.shared_files)) } fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf { @@ -213,7 +207,7 @@ pub fn build_snapshot_cfgsync_router(materializer: M) -> Router where M: RegistrationSnapshotMaterializer + 'static, { - let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(materializer)); + let provider = RegistrationConfigSource::new(CachedSnapshotMaterializer::new(materializer)); build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider))) } @@ -227,7 +221,7 @@ where M: RegistrationSnapshotMaterializer + 'static, S: MaterializedArtifactsSink + 'static, { - let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new( + let provider = RegistrationConfigSource::new(CachedSnapshotMaterializer::new( PersistingSnapshotMaterializer::new(materializer, sink), )); diff --git a/logos/runtime/ext/src/cfgsync/mod.rs b/logos/runtime/ext/src/cfgsync/mod.rs index 968cfc2..8bf2162 100644 --- a/logos/runtime/ext/src/cfgsync/mod.rs +++ b/logos/runtime/ext/src/cfgsync/mod.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cfgsync_adapter::{DeploymentAdapter, build_node_artifact_catalog}; +use cfgsync_adapter::{DeploymentAdapter, build_materialized_artifacts}; pub(crate) use cfgsync_core::render::CfgsyncOutputPaths; use cfgsync_core::{ NodeArtifactsBundle, NodeArtifactsBundleEntry, @@ -49,39 +49,42 @@ fn build_cfgsync_bundle( topology: &E::Deployment, hostnames: &[String], ) -> Result { - let nodes = build_node_artifact_catalog::(topology, hostnames)?.into_nodes(); - let nodes = nodes - .into_iter() - .map(|node| NodeArtifactsBundleEntry { - identifier: node.identifier, - files: node.files, + let materialized = build_materialized_artifacts::(topology, hostnames)?; + let nodes = materialized + .iter() + .map(|(identifier, artifacts)| NodeArtifactsBundleEntry { + identifier: identifier.to_owned(), + files: artifacts.files.clone(), }) .collect(); - Ok(NodeArtifactsBundle::new(nodes)) + Ok(NodeArtifactsBundle::new(nodes).with_shared_files(materialized.shared().files.clone())) } fn append_deployment_files(bundle: &mut NodeArtifactsBundle) -> Result<()> { - for node in &mut bundle.nodes { - if has_file_path(node, "/deployment.yaml") { - continue; - } - - let config_content = - config_file_content(node).ok_or_else(|| BundleRenderError::MissingConfigFile { - identifier: node.identifier.clone(), - })?; - let deployment_yaml = extract_yaml_key(&config_content, "deployment")?; - - node.files - .push(build_bundle_file("/deployment.yaml", deployment_yaml)); + if has_shared_file_path(bundle, "/deployment.yaml") { + return Ok(()); } + let Some(node) = bundle.nodes.first() else { + return Ok(()); + }; + + let config_content = + config_file_content(node).ok_or_else(|| BundleRenderError::MissingConfigFile { + identifier: node.identifier.clone(), + })?; + let deployment_yaml = extract_yaml_key(&config_content, "deployment")?; + + bundle + .shared_files + .push(build_bundle_file("/deployment.yaml", deployment_yaml)); + Ok(()) } -fn has_file_path(node: &NodeArtifactsBundleEntry, path: &str) -> bool { - node.files.iter().any(|file| file.path == path) +fn has_shared_file_path(bundle: &NodeArtifactsBundle, path: &str) -> bool { + bundle.shared_files.iter().any(|file| file.path == path) } fn config_file_content(node: &NodeArtifactsBundleEntry) -> Option { diff --git a/testing-framework/core/src/cfgsync/mod.rs b/testing-framework/core/src/cfgsync/mod.rs index 8ed5910..0d4168a 100644 --- a/testing-framework/core/src/cfgsync/mod.rs +++ b/testing-framework/core/src/cfgsync/mod.rs @@ -1,5 +1,5 @@ pub use cfgsync_adapter::*; #[doc(hidden)] pub use cfgsync_adapter::{ - DeploymentAdapter as CfgsyncEnv, build_node_artifact_catalog as build_cfgsync_node_catalog, + DeploymentAdapter as CfgsyncEnv, build_materialized_artifacts as build_cfgsync_node_catalog, };