diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs index 56d96bd..f5b24d5 100644 --- a/cfgsync/adapter/src/lib.rs +++ b/cfgsync/adapter/src/lib.rs @@ -125,6 +125,15 @@ pub trait CfgsyncMaterializer: Send + Sync { ) -> Result, DynCfgsyncError>; } +/// Adapter contract for materializing a whole registration snapshot into +/// per-node cfgsync artifacts. +pub trait CfgsyncSnapshotMaterializer: Send + Sync { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError>; +} + impl CfgsyncMaterializer for CfgsyncNodeCatalog { fn materialize( &self, @@ -139,6 +148,15 @@ impl CfgsyncMaterializer for CfgsyncNodeCatalog { } } +impl CfgsyncSnapshotMaterializer for CfgsyncNodeCatalog { + fn materialize_snapshot( + &self, + _registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError> { + Ok(Some(self.clone())) + } +} + /// Registration-aware provider backed by an adapter materializer. pub struct MaterializingConfigProvider { materializer: M, @@ -173,6 +191,88 @@ impl MaterializingConfigProvider { } } +/// Registration-aware provider backed by a snapshot materializer. +pub struct SnapshotMaterializingConfigProvider { + materializer: M, + registrations: Mutex>, +} + +impl SnapshotMaterializingConfigProvider { + #[must_use] + pub fn new(materializer: M) -> Self { + Self { + materializer, + registrations: Mutex::new(HashMap::new()), + } + } + + fn registration_for(&self, identifier: &str) -> Option { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + registrations.get(identifier).cloned() + } + + fn registration_snapshot(&self) -> RegistrationSnapshot { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + RegistrationSnapshot::new(registrations.values().cloned().collect()) + } +} + +impl ConfigProvider for SnapshotMaterializingConfigProvider +where + M: CfgsyncSnapshotMaterializer, +{ + fn register(&self, registration: NodeRegistration) -> RegistrationResponse { + let mut registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + registrations.insert(registration.identifier.clone(), registration); + + RegistrationResponse::Registered + } + + fn resolve(&self, registration: &NodeRegistration) -> RepoResponse { + let registration = match self.registration_for(®istration.identifier) { + Some(registration) => registration, + None => { + return RepoResponse::Error(CfgSyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + }; + + let registrations = self.registration_snapshot(); + let catalog = match self.materializer.materialize_snapshot(®istrations) { + Ok(Some(catalog)) => catalog, + Ok(None) => { + return RepoResponse::Error(CfgSyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + Err(error) => { + return RepoResponse::Error(CfgSyncErrorResponse::internal(format!( + "failed to materialize config snapshot: {error}" + ))); + } + }; + + match catalog.resolve(®istration.identifier) { + Some(config) => RepoResponse::Config(CfgSyncPayload::from_files(config.files.clone())), + None => RepoResponse::Error(CfgSyncErrorResponse::missing_config( + ®istration.identifier, + )), + } + } +} + impl ConfigProvider for MaterializingConfigProvider where M: CfgsyncMaterializer,