Add cfgsync persistence and shared artifact hooks

This commit is contained in:
andrussal 2026-03-10 14:26:00 +01:00
parent 8681117301
commit 95d1a75116
2 changed files with 156 additions and 2 deletions

View File

@ -12,8 +12,8 @@ pub use deployment::{
build_node_artifact_catalog,
};
pub use materializer::{
CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, NodeArtifactsMaterializer,
RegistrationSnapshotMaterializer,
CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifactsSink,
NodeArtifactsMaterializer, PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer,
};
pub use registrations::RegistrationSnapshot;
pub use sources::{MaterializingConfigSource, SnapshotConfigSource};

View File

@ -26,6 +26,11 @@ pub trait RegistrationSnapshotMaterializer: Send + Sync {
) -> Result<MaterializationResult, DynCfgsyncError>;
}
/// Optional hook for persisting or publishing materialized cfgsync artifacts.
pub trait MaterializedArtifactsSink: Send + Sync {
fn persist(&self, artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError>;
}
/// Registration-driven materialization status.
#[derive(Debug, Clone, Default)]
pub enum MaterializationResult {
@ -39,6 +44,14 @@ impl MaterializationResult {
pub fn ready(nodes: MaterializedArtifacts) -> Self {
Self::Ready(nodes)
}
#[must_use]
pub fn artifacts(&self) -> Option<&MaterializedArtifacts> {
match self {
Self::NotReady => None,
Self::Ready(artifacts) => Some(artifacts),
}
}
}
/// Snapshot materializer wrapper that caches the last materialized result.
@ -103,3 +116,144 @@ where
Ok(result)
}
}
/// Snapshot materializer wrapper that persists ready results through a generic
/// sink. It only persists once per distinct registration snapshot.
pub struct PersistingSnapshotMaterializer<M, S> {
inner: M,
sink: S,
persisted_key: Mutex<Option<String>>,
}
impl<M, S> PersistingSnapshotMaterializer<M, S> {
#[must_use]
pub fn new(inner: M, sink: S) -> Self {
Self {
inner,
sink,
persisted_key: Mutex::new(None),
}
}
}
impl<M, S> RegistrationSnapshotMaterializer for PersistingSnapshotMaterializer<M, S>
where
M: RegistrationSnapshotMaterializer,
S: MaterializedArtifactsSink,
{
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<MaterializationResult, DynCfgsyncError> {
let key = CachedSnapshotMaterializer::<M>::snapshot_key(registrations)?;
let result = self.inner.materialize_snapshot(registrations)?;
let Some(artifacts) = result.artifacts() else {
return Ok(result);
};
{
let persisted_key = self
.persisted_key
.lock()
.expect("cfgsync persistence state should not be poisoned");
if persisted_key.as_deref() == Some(&key) {
return Ok(result);
}
}
self.sink.persist(artifacts)?;
let mut persisted_key = self
.persisted_key
.lock()
.expect("cfgsync persistence state should not be poisoned");
*persisted_key = Some(key);
Ok(result)
}
}
#[cfg(test)]
mod tests {
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use cfgsync_artifacts::ArtifactFile;
use super::{
CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts,
MaterializedArtifactsSink, PersistingSnapshotMaterializer,
RegistrationSnapshotMaterializer,
};
use crate::{ArtifactSet, NodeArtifacts, NodeArtifactsCatalog, RegistrationSnapshot};
struct CountingMaterializer;
impl RegistrationSnapshotMaterializer for CountingMaterializer {
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<MaterializationResult, DynCfgsyncError> {
if registrations.is_empty() {
return Ok(MaterializationResult::NotReady);
}
let nodes = registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new("/config.yaml", "ready: true")],
})
.collect();
Ok(MaterializationResult::ready(MaterializedArtifacts::new(
NodeArtifactsCatalog::new(nodes),
ArtifactSet::new(vec![ArtifactFile::new("/shared.yaml", "cluster: ready")]),
)))
}
}
struct CountingSink {
writes: Arc<AtomicUsize>,
}
impl MaterializedArtifactsSink for CountingSink {
fn persist(&self, _artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError> {
self.writes.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
#[test]
fn persisting_snapshot_materializer_writes_ready_snapshots_once() {
let writes = Arc::new(AtomicUsize::new(0));
let materializer = CachedSnapshotMaterializer::new(PersistingSnapshotMaterializer::new(
CountingMaterializer,
CountingSink {
writes: Arc::clone(&writes),
},
));
let empty = RegistrationSnapshot::default();
let ready = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new(
"node-0",
"127.0.0.1".parse().expect("parse ip"),
)]);
let _ = materializer
.materialize_snapshot(&empty)
.expect("not-ready snapshot");
let _ = materializer
.materialize_snapshot(&ready)
.expect("ready snapshot");
let _ = materializer
.materialize_snapshot(&ready)
.expect("cached ready snapshot");
assert_eq!(writes.load(Ordering::SeqCst), 1);
}
}