From 95d1a751162774d059c1b0dab65ae168da99698a Mon Sep 17 00:00:00 2001 From: andrussal Date: Tue, 10 Mar 2026 14:26:00 +0100 Subject: [PATCH] Add cfgsync persistence and shared artifact hooks --- cfgsync/adapter/src/lib.rs | 4 +- cfgsync/adapter/src/materializer.rs | 154 ++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+), 2 deletions(-) diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs index 9c172a9..0ddf349 100644 --- a/cfgsync/adapter/src/lib.rs +++ b/cfgsync/adapter/src/lib.rs @@ -12,8 +12,8 @@ pub use deployment::{ build_node_artifact_catalog, }; pub use materializer::{ - CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, NodeArtifactsMaterializer, - RegistrationSnapshotMaterializer, + CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifactsSink, + NodeArtifactsMaterializer, PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, }; pub use registrations::RegistrationSnapshot; pub use sources::{MaterializingConfigSource, SnapshotConfigSource}; diff --git a/cfgsync/adapter/src/materializer.rs b/cfgsync/adapter/src/materializer.rs index 02d7448..b2d18de 100644 --- a/cfgsync/adapter/src/materializer.rs +++ b/cfgsync/adapter/src/materializer.rs @@ -26,6 +26,11 @@ pub trait RegistrationSnapshotMaterializer: Send + Sync { ) -> Result; } +/// Optional hook for persisting or publishing materialized cfgsync artifacts. +pub trait MaterializedArtifactsSink: Send + Sync { + fn persist(&self, artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError>; +} + /// Registration-driven materialization status. #[derive(Debug, Clone, Default)] pub enum MaterializationResult { @@ -39,6 +44,14 @@ impl MaterializationResult { pub fn ready(nodes: MaterializedArtifacts) -> Self { Self::Ready(nodes) } + + #[must_use] + pub fn artifacts(&self) -> Option<&MaterializedArtifacts> { + match self { + Self::NotReady => None, + Self::Ready(artifacts) => Some(artifacts), + } + } } /// Snapshot materializer wrapper that caches the last materialized result. @@ -103,3 +116,144 @@ where Ok(result) } } + +/// Snapshot materializer wrapper that persists ready results through a generic +/// sink. It only persists once per distinct registration snapshot. +pub struct PersistingSnapshotMaterializer { + inner: M, + sink: S, + persisted_key: Mutex>, +} + +impl PersistingSnapshotMaterializer { + #[must_use] + pub fn new(inner: M, sink: S) -> Self { + Self { + inner, + sink, + persisted_key: Mutex::new(None), + } + } +} + +impl RegistrationSnapshotMaterializer for PersistingSnapshotMaterializer +where + M: RegistrationSnapshotMaterializer, + S: MaterializedArtifactsSink, +{ + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + let key = CachedSnapshotMaterializer::::snapshot_key(registrations)?; + let result = self.inner.materialize_snapshot(registrations)?; + + let Some(artifacts) = result.artifacts() else { + return Ok(result); + }; + + { + let persisted_key = self + .persisted_key + .lock() + .expect("cfgsync persistence state should not be poisoned"); + + if persisted_key.as_deref() == Some(&key) { + return Ok(result); + } + } + + self.sink.persist(artifacts)?; + + let mut persisted_key = self + .persisted_key + .lock() + .expect("cfgsync persistence state should not be poisoned"); + *persisted_key = Some(key); + + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; + + use cfgsync_artifacts::ArtifactFile; + + use super::{ + CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts, + MaterializedArtifactsSink, PersistingSnapshotMaterializer, + RegistrationSnapshotMaterializer, + }; + use crate::{ArtifactSet, NodeArtifacts, NodeArtifactsCatalog, RegistrationSnapshot}; + + struct CountingMaterializer; + + impl RegistrationSnapshotMaterializer for CountingMaterializer { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + if registrations.is_empty() { + return Ok(MaterializationResult::NotReady); + } + + let nodes = registrations + .iter() + .map(|registration| NodeArtifacts { + identifier: registration.identifier.clone(), + files: vec![ArtifactFile::new("/config.yaml", "ready: true")], + }) + .collect(); + + Ok(MaterializationResult::ready(MaterializedArtifacts::new( + NodeArtifactsCatalog::new(nodes), + ArtifactSet::new(vec![ArtifactFile::new("/shared.yaml", "cluster: ready")]), + ))) + } + } + + struct CountingSink { + writes: Arc, + } + + impl MaterializedArtifactsSink for CountingSink { + fn persist(&self, _artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError> { + self.writes.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + } + + #[test] + fn persisting_snapshot_materializer_writes_ready_snapshots_once() { + let writes = Arc::new(AtomicUsize::new(0)); + let materializer = CachedSnapshotMaterializer::new(PersistingSnapshotMaterializer::new( + CountingMaterializer, + CountingSink { + writes: Arc::clone(&writes), + }, + )); + + let empty = RegistrationSnapshot::default(); + let ready = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new( + "node-0", + "127.0.0.1".parse().expect("parse ip"), + )]); + + let _ = materializer + .materialize_snapshot(&empty) + .expect("not-ready snapshot"); + let _ = materializer + .materialize_snapshot(&ready) + .expect("ready snapshot"); + let _ = materializer + .materialize_snapshot(&ready) + .expect("cached ready snapshot"); + + assert_eq!(writes.load(Ordering::SeqCst), 1); + } +}