From 312dec6178a75289555849ba6016f886a530ce48 Mon Sep 17 00:00:00 2001 From: andrussal Date: Tue, 10 Mar 2026 10:20:30 +0100 Subject: [PATCH] Pass registration snapshots into cfgsync materializers --- cfgsync/adapter/src/lib.rs | 108 ++++++++++++++++++++++++++++++++++++- 1 file changed, 106 insertions(+), 2 deletions(-) diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs index 226b98d..56d96bd 100644 --- a/cfgsync/adapter/src/lib.rs +++ b/cfgsync/adapter/src/lib.rs @@ -26,6 +26,41 @@ pub struct CfgsyncNodeArtifacts { files: Vec, } +/// Immutable view of registrations currently known to cfgsync. +#[derive(Debug, Clone, Default)] +pub struct RegistrationSnapshot { + registrations: Vec, +} + +impl RegistrationSnapshot { + #[must_use] + pub fn new(registrations: Vec) -> Self { + Self { registrations } + } + + #[must_use] + pub fn len(&self) -> usize { + self.registrations.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.registrations.is_empty() + } + + #[must_use] + pub fn iter(&self) -> impl Iterator { + self.registrations.iter() + } + + #[must_use] + pub fn get(&self, identifier: &str) -> Option<&NodeRegistration> { + self.registrations + .iter() + .find(|registration| registration.identifier == identifier) + } +} + impl CfgsyncNodeArtifacts { #[must_use] pub fn new(files: Vec) -> Self { @@ -86,6 +121,7 @@ pub trait CfgsyncMaterializer: Send + Sync { fn materialize( &self, registration: &NodeRegistration, + registrations: &RegistrationSnapshot, ) -> Result, DynCfgsyncError>; } @@ -93,6 +129,7 @@ impl CfgsyncMaterializer for CfgsyncNodeCatalog { fn materialize( &self, registration: &NodeRegistration, + _registrations: &RegistrationSnapshot, ) -> Result, DynCfgsyncError> { let artifacts = self .resolve(®istration.identifier) @@ -125,6 +162,15 @@ impl MaterializingConfigProvider { registrations.get(identifier).cloned() } + + fn registration_snapshot(&self) -> RegistrationSnapshot { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + RegistrationSnapshot::new(registrations.values().cloned().collect()) + } } impl ConfigProvider for MaterializingConfigProvider @@ -150,8 +196,9 @@ where )); } }; + let registrations = self.registration_snapshot(); - match self.materializer.materialize(®istration) { + match self.materializer.materialize(®istration, ®istrations) { Ok(Some(artifacts)) => { RepoResponse::Config(CfgSyncPayload::from_files(artifacts.files().to_vec())) } @@ -284,10 +331,15 @@ fn build_node_artifacts_from_config(config: &CfgsyncNodeConfig) -> CfgsyncNodeAr #[cfg(test)] mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + use cfgsync_artifacts::ArtifactFile; use cfgsync_core::{CfgSyncErrorCode, ConfigProvider, NodeRegistration, RepoResponse}; - use super::{CfgsyncNodeCatalog, CfgsyncNodeConfig, MaterializingConfigProvider}; + use super::{ + CfgsyncMaterializer, CfgsyncNodeArtifacts, CfgsyncNodeCatalog, CfgsyncNodeConfig, + DynCfgsyncError, MaterializingConfigProvider, RegistrationSnapshot, + }; #[test] fn catalog_resolves_identifier() { @@ -332,4 +384,56 @@ mod tests { RepoResponse::Error(error) => assert!(matches!(error.code, CfgSyncErrorCode::NotReady)), } } + + struct ThresholdMaterializer { + calls: AtomicUsize, + } + + impl CfgsyncMaterializer for ThresholdMaterializer { + fn materialize( + &self, + registration: &NodeRegistration, + registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError> { + self.calls.fetch_add(1, Ordering::SeqCst); + + if registrations.len() < 2 { + return Ok(None); + } + + let peer_count = registrations.iter().count(); + let files = vec![ + ArtifactFile::new("/config.yaml", format!("id: {}", registration.identifier)), + ArtifactFile::new("/shared.yaml", format!("peers: {peer_count}")), + ]; + + Ok(Some(CfgsyncNodeArtifacts::new(files))) + } + } + + #[test] + fn materializing_provider_uses_registration_snapshot_for_readiness() { + let provider = MaterializingConfigProvider::new(ThresholdMaterializer { + calls: AtomicUsize::new(0), + }); + let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip")); + let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip")); + + let _ = provider.register(node_a.clone()); + + match provider.resolve(&node_a) { + RepoResponse::Config(_) => panic!("expected not-ready error"), + RepoResponse::Error(error) => assert!(matches!(error.code, CfgSyncErrorCode::NotReady)), + } + + let _ = provider.register(node_b); + + match provider.resolve(&node_a) { + RepoResponse::Config(payload) => { + assert_eq!(payload.files()[0].content, "id: node-a"); + assert_eq!(payload.files()[1].content, "peers: 2"); + } + RepoResponse::Error(error) => panic!("expected config, got {error}"), + } + } }