Pass registration snapshots into cfgsync materializers

This commit is contained in:
andrussal 2026-03-10 10:20:30 +01:00
parent 13084c3a36
commit 312dec6178

View File

@ -26,6 +26,41 @@ pub struct CfgsyncNodeArtifacts {
files: Vec<ArtifactFile>,
}
/// Immutable view of registrations currently known to cfgsync.
#[derive(Debug, Clone, Default)]
pub struct RegistrationSnapshot {
registrations: Vec<NodeRegistration>,
}
impl RegistrationSnapshot {
#[must_use]
pub fn new(registrations: Vec<NodeRegistration>) -> Self {
Self { registrations }
}
#[must_use]
pub fn len(&self) -> usize {
self.registrations.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.registrations.is_empty()
}
#[must_use]
pub fn iter(&self) -> impl Iterator<Item = &NodeRegistration> {
self.registrations.iter()
}
#[must_use]
pub fn get(&self, identifier: &str) -> Option<&NodeRegistration> {
self.registrations
.iter()
.find(|registration| registration.identifier == identifier)
}
}
impl CfgsyncNodeArtifacts {
#[must_use]
pub fn new(files: Vec<ArtifactFile>) -> Self {
@ -86,6 +121,7 @@ pub trait CfgsyncMaterializer: Send + Sync {
fn materialize(
&self,
registration: &NodeRegistration,
registrations: &RegistrationSnapshot,
) -> Result<Option<CfgsyncNodeArtifacts>, DynCfgsyncError>;
}
@ -93,6 +129,7 @@ impl CfgsyncMaterializer for CfgsyncNodeCatalog {
fn materialize(
&self,
registration: &NodeRegistration,
_registrations: &RegistrationSnapshot,
) -> Result<Option<CfgsyncNodeArtifacts>, DynCfgsyncError> {
let artifacts = self
.resolve(&registration.identifier)
@ -125,6 +162,15 @@ impl<M> MaterializingConfigProvider<M> {
registrations.get(identifier).cloned()
}
fn registration_snapshot(&self) -> RegistrationSnapshot {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
RegistrationSnapshot::new(registrations.values().cloned().collect())
}
}
impl<M> ConfigProvider for MaterializingConfigProvider<M>
@ -150,8 +196,9 @@ where
));
}
};
let registrations = self.registration_snapshot();
match self.materializer.materialize(&registration) {
match self.materializer.materialize(&registration, &registrations) {
Ok(Some(artifacts)) => {
RepoResponse::Config(CfgSyncPayload::from_files(artifacts.files().to_vec()))
}
@ -284,10 +331,15 @@ fn build_node_artifacts_from_config(config: &CfgsyncNodeConfig) -> CfgsyncNodeAr
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use cfgsync_artifacts::ArtifactFile;
use cfgsync_core::{CfgSyncErrorCode, ConfigProvider, NodeRegistration, RepoResponse};
use super::{CfgsyncNodeCatalog, CfgsyncNodeConfig, MaterializingConfigProvider};
use super::{
CfgsyncMaterializer, CfgsyncNodeArtifacts, CfgsyncNodeCatalog, CfgsyncNodeConfig,
DynCfgsyncError, MaterializingConfigProvider, RegistrationSnapshot,
};
#[test]
fn catalog_resolves_identifier() {
@ -332,4 +384,56 @@ mod tests {
RepoResponse::Error(error) => assert!(matches!(error.code, CfgSyncErrorCode::NotReady)),
}
}
struct ThresholdMaterializer {
calls: AtomicUsize,
}
impl CfgsyncMaterializer for ThresholdMaterializer {
fn materialize(
&self,
registration: &NodeRegistration,
registrations: &RegistrationSnapshot,
) -> Result<Option<CfgsyncNodeArtifacts>, DynCfgsyncError> {
self.calls.fetch_add(1, Ordering::SeqCst);
if registrations.len() < 2 {
return Ok(None);
}
let peer_count = registrations.iter().count();
let files = vec![
ArtifactFile::new("/config.yaml", format!("id: {}", registration.identifier)),
ArtifactFile::new("/shared.yaml", format!("peers: {peer_count}")),
];
Ok(Some(CfgsyncNodeArtifacts::new(files)))
}
}
#[test]
fn materializing_provider_uses_registration_snapshot_for_readiness() {
let provider = MaterializingConfigProvider::new(ThresholdMaterializer {
calls: AtomicUsize::new(0),
});
let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip"));
let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip"));
let _ = provider.register(node_a.clone());
match provider.resolve(&node_a) {
RepoResponse::Config(_) => panic!("expected not-ready error"),
RepoResponse::Error(error) => assert!(matches!(error.code, CfgSyncErrorCode::NotReady)),
}
let _ = provider.register(node_b);
match provider.resolve(&node_a) {
RepoResponse::Config(payload) => {
assert_eq!(payload.files()[0].content, "id: node-a");
assert_eq!(payload.files()[1].content, "peers: 2");
}
RepoResponse::Error(error) => panic!("expected config, got {error}"),
}
}
}