From 8681117301a1133c94093e2138768e0cc9be821f Mon Sep 17 00:00:00 2001 From: andrussal Date: Tue, 10 Mar 2026 14:24:00 +0100 Subject: [PATCH] Add shared cfgsync artifact materialization --- cfgsync/adapter/src/artifacts.rs | 73 ++++++++++++++++ cfgsync/adapter/src/lib.rs | 6 +- cfgsync/adapter/src/materializer.rs | 33 +++++-- cfgsync/adapter/src/sources.rs | 129 ++++++++++++++++++---------- cfgsync/core/src/bundle.rs | 13 ++- cfgsync/core/src/source.rs | 7 +- cfgsync/runtime/src/server.rs | 15 ++-- 7 files changed, 212 insertions(+), 64 deletions(-) diff --git a/cfgsync/adapter/src/artifacts.rs b/cfgsync/adapter/src/artifacts.rs index 4ad2693..2418ade 100644 --- a/cfgsync/adapter/src/artifacts.rs +++ b/cfgsync/adapter/src/artifacts.rs @@ -33,6 +33,43 @@ impl ArtifactSet { pub fn is_empty(&self) -> bool { self.files.is_empty() } + + #[must_use] + pub fn into_files(self) -> Vec { + self.files + } +} + +/// Resolved artifact payload for one node, including any shared files that +/// should be delivered alongside the node-local files. +#[derive(Debug, Clone, Default)] +pub struct ResolvedNodeArtifacts { + node: ArtifactSet, + shared: ArtifactSet, +} + +impl ResolvedNodeArtifacts { + #[must_use] + pub fn new(node: ArtifactSet, shared: ArtifactSet) -> Self { + Self { node, shared } + } + + #[must_use] + pub fn node(&self) -> &ArtifactSet { + &self.node + } + + #[must_use] + pub fn shared(&self) -> &ArtifactSet { + &self.shared + } + + #[must_use] + pub fn files(&self) -> Vec { + let mut files = self.node.files().to_vec(); + files.extend_from_slice(self.shared.files()); + files + } } /// Artifact payloads indexed by stable node identifier. @@ -72,3 +109,39 @@ impl NodeArtifactsCatalog { self.nodes.into_values().collect() } } + +/// Materialized cfgsync output for a whole registration set. +#[derive(Debug, Clone, Default)] +pub struct MaterializedArtifacts { + nodes: NodeArtifactsCatalog, + shared: ArtifactSet, +} + +impl MaterializedArtifacts { + #[must_use] + pub fn new(nodes: NodeArtifactsCatalog, shared: ArtifactSet) -> Self { + Self { nodes, shared } + } + + #[must_use] + pub fn from_catalog(nodes: NodeArtifactsCatalog) -> Self { + Self::new(nodes, ArtifactSet::default()) + } + + #[must_use] + pub fn nodes(&self) -> &NodeArtifactsCatalog { + &self.nodes + } + + #[must_use] + pub fn shared(&self) -> &ArtifactSet { + &self.shared + } + + #[must_use] + pub fn resolve(&self, identifier: &str) -> Option { + self.nodes.resolve(identifier).map(|node| { + ResolvedNodeArtifacts::new(ArtifactSet::new(node.files.clone()), self.shared.clone()) + }) + } +} diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs index fd4e168..9c172a9 100644 --- a/cfgsync/adapter/src/lib.rs +++ b/cfgsync/adapter/src/lib.rs @@ -4,13 +4,15 @@ mod materializer; mod registrations; mod sources; -pub use artifacts::{ArtifactSet, NodeArtifacts, NodeArtifactsCatalog}; +pub use artifacts::{ + ArtifactSet, MaterializedArtifacts, NodeArtifacts, NodeArtifactsCatalog, ResolvedNodeArtifacts, +}; pub use deployment::{ BuildCfgsyncNodesError, DeploymentAdapter, build_cfgsync_node_configs, build_node_artifact_catalog, }; pub use materializer::{ - CachedSnapshotMaterializer, DynCfgsyncError, NodeArtifactsMaterializer, + CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, NodeArtifactsMaterializer, RegistrationSnapshotMaterializer, }; pub use registrations::RegistrationSnapshot; diff --git a/cfgsync/adapter/src/materializer.rs b/cfgsync/adapter/src/materializer.rs index 764bcc3..02d7448 100644 --- a/cfgsync/adapter/src/materializer.rs +++ b/cfgsync/adapter/src/materializer.rs @@ -3,7 +3,7 @@ use std::{error::Error, sync::Mutex}; use cfgsync_core::NodeRegistration; use serde_json::to_string; -use crate::{ArtifactSet, NodeArtifactsCatalog, RegistrationSnapshot}; +use crate::{MaterializedArtifacts, RegistrationSnapshot, ResolvedNodeArtifacts}; /// Type-erased cfgsync adapter error used to preserve source context. pub type DynCfgsyncError = Box; @@ -14,7 +14,7 @@ pub trait NodeArtifactsMaterializer: Send + Sync { &self, registration: &NodeRegistration, registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError>; + ) -> Result, DynCfgsyncError>; } /// Adapter contract for materializing a whole registration snapshot into @@ -23,7 +23,22 @@ pub trait RegistrationSnapshotMaterializer: Send + Sync { fn materialize_snapshot( &self, registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError>; + ) -> Result; +} + +/// Registration-driven materialization status. +#[derive(Debug, Clone, Default)] +pub enum MaterializationResult { + #[default] + NotReady, + Ready(MaterializedArtifacts), +} + +impl MaterializationResult { + #[must_use] + pub fn ready(nodes: MaterializedArtifacts) -> Self { + Self::Ready(nodes) + } } /// Snapshot materializer wrapper that caches the last materialized result. @@ -34,7 +49,7 @@ pub struct CachedSnapshotMaterializer { struct CachedSnapshot { key: String, - catalog: Option, + result: MaterializationResult, } impl CachedSnapshotMaterializer { @@ -58,7 +73,7 @@ where fn materialize_snapshot( &self, registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { + ) -> Result { let key = Self::snapshot_key(registrations)?; { @@ -70,11 +85,11 @@ where if let Some(cached) = &*cache && cached.key == key { - return Ok(cached.catalog.clone()); + return Ok(cached.result.clone()); } } - let catalog = self.inner.materialize_snapshot(registrations)?; + let result = self.inner.materialize_snapshot(registrations)?; let mut cache = self .cache .lock() @@ -82,9 +97,9 @@ where *cache = Some(CachedSnapshot { key, - catalog: catalog.clone(), + result: result.clone(), }); - Ok(catalog) + Ok(result) } } diff --git a/cfgsync/adapter/src/sources.rs b/cfgsync/adapter/src/sources.rs index e655a13..4e4c1a4 100644 --- a/cfgsync/adapter/src/sources.rs +++ b/cfgsync/adapter/src/sources.rs @@ -6,8 +6,9 @@ use cfgsync_core::{ }; use crate::{ - ArtifactSet, DynCfgsyncError, NodeArtifactsCatalog, NodeArtifactsMaterializer, - RegistrationSnapshot, RegistrationSnapshotMaterializer, + ArtifactSet, DynCfgsyncError, MaterializationResult, MaterializedArtifacts, + NodeArtifactsCatalog, NodeArtifactsMaterializer, RegistrationSnapshot, + RegistrationSnapshotMaterializer, ResolvedNodeArtifacts, }; impl NodeArtifactsMaterializer for NodeArtifactsCatalog { @@ -15,10 +16,13 @@ impl NodeArtifactsMaterializer for NodeArtifactsCatalog { &self, registration: &NodeRegistration, _registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { - Ok(self - .resolve(®istration.identifier) - .map(build_artifact_set_from_catalog_entry)) + ) -> Result, DynCfgsyncError> { + Ok(self.resolve(®istration.identifier).map(|artifacts| { + ResolvedNodeArtifacts::new( + build_artifact_set_from_catalog_entry(artifacts), + ArtifactSet::default(), + ) + })) } } @@ -26,8 +30,29 @@ impl RegistrationSnapshotMaterializer for NodeArtifactsCatalog { fn materialize_snapshot( &self, _registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { - Ok(Some(self.clone())) + ) -> Result { + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_catalog(self.clone()), + )) + } +} + +impl NodeArtifactsMaterializer for MaterializedArtifacts { + fn materialize( + &self, + registration: &NodeRegistration, + _registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError> { + Ok(self.resolve(®istration.identifier)) + } +} + +impl RegistrationSnapshotMaterializer for MaterializedArtifacts { + fn materialize_snapshot( + &self, + _registrations: &RegistrationSnapshot, + ) -> Result { + Ok(MaterializationResult::ready(self.clone())) } } @@ -91,9 +116,9 @@ where let registrations = self.registration_snapshot(); match self.materializer.materialize(®istration, ®istrations) { - Ok(Some(artifacts)) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files( - artifacts.files().to_vec(), - )), + Ok(Some(artifacts)) => { + ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(artifacts.files())) + } Ok(None) => ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( ®istration.identifier, )), @@ -164,9 +189,9 @@ where }; let registrations = self.registration_snapshot(); - let catalog = match self.materializer.materialize_snapshot(®istrations) { - Ok(Some(catalog)) => catalog, - Ok(None) => { + let materialized = match self.materializer.materialize_snapshot(®istrations) { + Ok(MaterializationResult::Ready(materialized)) => materialized, + Ok(MaterializationResult::NotReady) => { return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( ®istration.identifier, )); @@ -178,10 +203,10 @@ where } }; - match catalog.resolve(®istration.identifier) { - Some(config) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files( - config.files.clone(), - )), + match materialized.resolve(®istration.identifier) { + Some(config) => { + ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(config.files())) + } None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config( ®istration.identifier, )), @@ -204,8 +229,9 @@ mod tests { use super::{MaterializingConfigSource, SnapshotConfigSource}; use crate::{ - CachedSnapshotMaterializer, DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog, - NodeArtifactsMaterializer, RegistrationSnapshot, RegistrationSnapshotMaterializer, + ArtifactSet, CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, + MaterializedArtifacts, NodeArtifacts, NodeArtifactsCatalog, NodeArtifactsMaterializer, + RegistrationSnapshot, RegistrationSnapshotMaterializer, ResolvedNodeArtifacts, }; #[test] @@ -265,7 +291,7 @@ mod tests { &self, registration: &NodeRegistration, registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { + ) -> Result, DynCfgsyncError> { self.calls.fetch_add(1, Ordering::SeqCst); if registrations.len() < 2 { @@ -278,7 +304,10 @@ mod tests { ArtifactFile::new("/peers.txt", peer_count.to_string()), ]; - Ok(Some(crate::ArtifactSet::new(files))) + Ok(Some(ResolvedNodeArtifacts::new( + crate::ArtifactSet::new(files), + ArtifactSet::default(), + ))) } } @@ -318,22 +347,29 @@ mod tests { fn materialize_snapshot( &self, registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { + ) -> Result { if registrations.len() < 2 { - return Ok(None); + return Ok(MaterializationResult::NotReady); } - Ok(Some(NodeArtifactsCatalog::new( - registrations - .iter() - .map(|registration| NodeArtifacts { - identifier: registration.identifier.clone(), - files: vec![ArtifactFile::new( - "/config.yaml", - format!("peer_count: {}", registrations.len()), - )], - }) - .collect(), + let nodes = registrations + .iter() + .map(|registration| NodeArtifacts { + identifier: registration.identifier.clone(), + files: vec![ArtifactFile::new( + "/config.yaml", + format!("peer_count: {}", registrations.len()), + )], + }) + .collect(); + let shared = ArtifactSet::new(vec![ArtifactFile::new( + "/shared.txt", + format!("shared_count: {}", registrations.len()), + )]); + + Ok(MaterializationResult::ready(MaterializedArtifacts::new( + NodeArtifactsCatalog::new(nodes), + shared, ))) } } @@ -358,6 +394,7 @@ mod tests { match source.resolve(&node_a) { ConfigResolveResponse::Config(payload) => { assert_eq!(payload.files()[0].content, "peer_count: 2"); + assert_eq!(payload.files()[1].content, "shared_count: 2"); } ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), } @@ -371,18 +408,20 @@ mod tests { fn materialize_snapshot( &self, registrations: &RegistrationSnapshot, - ) -> Result, DynCfgsyncError> { + ) -> Result { self.calls.fetch_add(1, Ordering::SeqCst); - Ok(Some(NodeArtifactsCatalog::new( - registrations - .iter() - .map(|registration| NodeArtifacts { - identifier: registration.identifier.clone(), - files: vec![ArtifactFile::new("/config.yaml", "cached: true")], - }) - .collect(), - ))) + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_catalog(NodeArtifactsCatalog::new( + registrations + .iter() + .map(|registration| NodeArtifacts { + identifier: registration.identifier.clone(), + files: vec![ArtifactFile::new("/config.yaml", "cached: true")], + }) + .collect(), + )), + )) } } diff --git a/cfgsync/core/src/bundle.rs b/cfgsync/core/src/bundle.rs index 5281b13..31f6723 100644 --- a/cfgsync/core/src/bundle.rs +++ b/cfgsync/core/src/bundle.rs @@ -6,12 +6,23 @@ use crate::NodeArtifactFile; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeArtifactsBundle { pub nodes: Vec, + #[serde(default)] + pub shared_files: Vec, } impl NodeArtifactsBundle { #[must_use] pub fn new(nodes: Vec) -> Self { - Self { nodes } + Self { + nodes, + shared_files: Vec::new(), + } + } + + #[must_use] + pub fn with_shared_files(mut self, shared_files: Vec) -> Self { + self.shared_files = shared_files; + self } } diff --git a/cfgsync/core/src/source.rs b/cfgsync/core/src/source.rs index 7981343..9d00d9e 100644 --- a/cfgsync/core/src/source.rs +++ b/cfgsync/core/src/source.rs @@ -75,13 +75,18 @@ pub enum BundleLoadError { #[must_use] pub fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap { + let shared_files = bundle.shared_files; + bundle .nodes .into_iter() .map(|node| { let NodeArtifactsBundleEntry { identifier, files } = node; - (identifier, NodeArtifactsPayload::from_files(files)) + let mut payload_files = files; + payload_files.extend(shared_files.clone()); + + (identifier, NodeArtifactsPayload::from_files(payload_files)) }) .collect() } diff --git a/cfgsync/runtime/src/server.rs b/cfgsync/runtime/src/server.rs index 077c075..6bae042 100644 --- a/cfgsync/runtime/src/server.rs +++ b/cfgsync/runtime/src/server.rs @@ -2,7 +2,7 @@ use std::{fs, path::Path, sync::Arc}; use anyhow::Context as _; use cfgsync_adapter::{ - CachedSnapshotMaterializer, MaterializingConfigSource, NodeArtifacts, NodeArtifactsCatalog, + ArtifactSet, CachedSnapshotMaterializer, MaterializedArtifacts, RegistrationSnapshotMaterializer, SnapshotConfigSource, }; use cfgsync_core::{ @@ -91,8 +91,8 @@ fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result anyhow::Result> { let bundle = load_bundle_yaml(bundle_path)?; - let catalog = build_node_catalog(bundle); - let provider = MaterializingConfigSource::new(catalog); + let materialized = build_materialized_artifacts(bundle); + let provider = SnapshotConfigSource::new(materialized); Ok(Arc::new(provider)) } @@ -105,17 +105,20 @@ fn load_bundle_yaml(bundle_path: &Path) -> anyhow::Result { .with_context(|| format!("parsing cfgsync bundle from {}", bundle_path.display())) } -fn build_node_catalog(bundle: NodeArtifactsBundle) -> NodeArtifactsCatalog { +fn build_materialized_artifacts(bundle: NodeArtifactsBundle) -> MaterializedArtifacts { let nodes = bundle .nodes .into_iter() - .map(|node| NodeArtifacts { + .map(|node| cfgsync_adapter::NodeArtifacts { identifier: node.identifier, files: node.files, }) .collect(); - NodeArtifactsCatalog::new(nodes) + MaterializedArtifacts::new( + cfgsync_adapter::NodeArtifactsCatalog::new(nodes), + ArtifactSet::new(bundle.shared_files), + ) } fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {