use std::{error::Error, sync::Mutex}; use serde_json::to_string; use crate::{MaterializedArtifacts, RegistrationSnapshot}; /// Type-erased cfgsync adapter error used to preserve source context. pub type DynCfgsyncError = Box; /// Adapter contract for materializing a whole registration snapshot into /// cfgsync artifacts. pub trait RegistrationSnapshotMaterializer: Send + Sync { /// Materializes the current registration set. /// /// Implementations decide: /// - when the current snapshot is ready to serve /// - which per-node artifacts should be produced /// - which shared artifacts should accompany every node fn materialize_snapshot( &self, registrations: &RegistrationSnapshot, ) -> Result; } /// Optional hook for persisting or publishing materialized cfgsync artifacts. pub trait MaterializedArtifactsSink: Send + Sync { /// Persists or publishes a ready materialization result. fn persist(&self, artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError>; } /// Registration-driven materialization status. #[derive(Debug, Clone, Default)] pub enum MaterializationResult { #[default] NotReady, Ready(MaterializedArtifacts), } impl MaterializationResult { /// Creates a ready materialization result. #[must_use] pub fn ready(artifacts: MaterializedArtifacts) -> Self { Self::Ready(artifacts) } /// Returns the ready artifacts when materialization succeeded. #[must_use] pub fn artifacts(&self) -> Option<&MaterializedArtifacts> { match self { Self::NotReady => None, Self::Ready(artifacts) => Some(artifacts), } } } /// Snapshot materializer wrapper that caches the last materialized result. pub struct CachedSnapshotMaterializer { inner: M, cache: Mutex>, } struct CachedSnapshot { key: String, result: MaterializationResult, } impl CachedSnapshotMaterializer { /// Wraps a snapshot materializer with deterministic snapshot-result /// caching. #[must_use] pub fn new(inner: M) -> Self { Self { inner, cache: Mutex::new(None), } } fn snapshot_key(registrations: &RegistrationSnapshot) -> Result { Ok(to_string(registrations)?) } } impl RegistrationSnapshotMaterializer for CachedSnapshotMaterializer where M: RegistrationSnapshotMaterializer, { fn materialize_snapshot( &self, registrations: &RegistrationSnapshot, ) -> Result { let key = Self::snapshot_key(registrations)?; { let cache = self .cache .lock() .expect("cfgsync snapshot cache should not be poisoned"); if let Some(cached) = &*cache && cached.key == key { return Ok(cached.result.clone()); } } let result = self.inner.materialize_snapshot(registrations)?; let mut cache = self .cache .lock() .expect("cfgsync snapshot cache should not be poisoned"); *cache = Some(CachedSnapshot { key, result: result.clone(), }); Ok(result) } } /// Snapshot materializer wrapper that persists ready results through a generic /// sink. It only persists once per distinct registration snapshot. pub struct PersistingSnapshotMaterializer { inner: M, sink: S, persisted_key: Mutex>, } impl PersistingSnapshotMaterializer { /// Wraps a snapshot materializer with one-time persistence for each /// distinct registration snapshot. #[must_use] pub fn new(inner: M, sink: S) -> Self { Self { inner, sink, persisted_key: Mutex::new(None), } } } impl RegistrationSnapshotMaterializer for PersistingSnapshotMaterializer where M: RegistrationSnapshotMaterializer, S: MaterializedArtifactsSink, { fn materialize_snapshot( &self, registrations: &RegistrationSnapshot, ) -> Result { let key = CachedSnapshotMaterializer::::snapshot_key(registrations)?; let result = self.inner.materialize_snapshot(registrations)?; let Some(artifacts) = result.artifacts() else { return Ok(result); }; { let persisted_key = self .persisted_key .lock() .expect("cfgsync persistence state should not be poisoned"); if persisted_key.as_deref() == Some(&key) { return Ok(result); } } self.sink.persist(artifacts)?; let mut persisted_key = self .persisted_key .lock() .expect("cfgsync persistence state should not be poisoned"); *persisted_key = Some(key); Ok(result) } } #[cfg(test)] mod tests { use std::sync::{ Arc, atomic::{AtomicUsize, Ordering}, }; use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; use super::{ CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts, MaterializedArtifactsSink, PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, }; use crate::RegistrationSnapshot; struct CountingMaterializer; impl RegistrationSnapshotMaterializer for CountingMaterializer { fn materialize_snapshot( &self, registrations: &RegistrationSnapshot, ) -> Result { if registrations.is_empty() { return Ok(MaterializationResult::NotReady); } let nodes = registrations.iter().map(|registration| { ( registration.identifier.clone(), ArtifactSet::new(vec![ArtifactFile::new( "/config.yaml".to_string(), "ready: true".to_string(), )]), ) }); Ok(MaterializationResult::ready( MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(vec![ ArtifactFile::new("/shared.yaml".to_string(), "cluster: ready".to_string()), ])), )) } } struct CountingSink { writes: Arc, } impl MaterializedArtifactsSink for CountingSink { fn persist(&self, _artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError> { self.writes.fetch_add(1, Ordering::SeqCst); Ok(()) } } #[test] fn cached_snapshot_materializer_reuses_previous_result() { let materializer = CachedSnapshotMaterializer::new(CountingMaterializer); let snapshot = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new( "node-1".to_string(), "127.0.0.1".parse().expect("parse ip"), )]); let first = materializer .materialize_snapshot(&snapshot) .expect("first materialization"); let second = materializer .materialize_snapshot(&snapshot) .expect("second materialization"); assert!(matches!(first, MaterializationResult::Ready(_))); assert!(matches!(second, MaterializationResult::Ready(_))); } #[test] fn persisting_snapshot_materializer_writes_ready_snapshots_once() { let writes = Arc::new(AtomicUsize::new(0)); let materializer = PersistingSnapshotMaterializer::new( CountingMaterializer, CountingSink { writes: writes.clone(), }, ); let snapshot = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new( "node-1".to_string(), "127.0.0.1".parse().expect("parse ip"), )]); materializer .materialize_snapshot(&snapshot) .expect("first materialization"); materializer .materialize_snapshot(&snapshot) .expect("second materialization"); assert_eq!(writes.load(Ordering::SeqCst), 1); } }