diff --git a/cfgsync/adapter/src/artifacts.rs b/cfgsync/adapter/src/artifacts.rs new file mode 100644 index 0000000..4ad2693 --- /dev/null +++ b/cfgsync/adapter/src/artifacts.rs @@ -0,0 +1,74 @@ +use std::collections::HashMap; + +use cfgsync_artifacts::ArtifactFile; +use serde::{Deserialize, Serialize}; + +/// Per-node artifact payload served by cfgsync for one registered node. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeArtifacts { + /// Stable node identifier resolved by the adapter. + pub identifier: String, + /// Files served to the node after cfgsync registration. + pub files: Vec, +} + +/// Materialized artifact files for a single registered node. +#[derive(Debug, Clone, Default)] +pub struct ArtifactSet { + files: Vec, +} + +impl ArtifactSet { + #[must_use] + pub fn new(files: Vec) -> Self { + Self { files } + } + + #[must_use] + pub fn files(&self) -> &[ArtifactFile] { + &self.files + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } +} + +/// Artifact payloads indexed by stable node identifier. +#[derive(Debug, Clone, Default)] +pub struct NodeArtifactsCatalog { + nodes: HashMap, +} + +impl NodeArtifactsCatalog { + #[must_use] + pub fn new(nodes: Vec) -> Self { + let nodes = nodes + .into_iter() + .map(|node| (node.identifier.clone(), node)) + .collect(); + + Self { nodes } + } + + #[must_use] + pub fn resolve(&self, identifier: &str) -> Option<&NodeArtifacts> { + self.nodes.get(identifier) + } + + #[must_use] + pub fn len(&self) -> usize { + self.nodes.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.nodes.is_empty() + } + + #[must_use] + pub fn into_nodes(self) -> Vec { + self.nodes.into_values().collect() + } +} diff --git a/cfgsync/adapter/src/deployment.rs b/cfgsync/adapter/src/deployment.rs new file mode 100644 index 0000000..c2e37bc --- /dev/null +++ b/cfgsync/adapter/src/deployment.rs @@ -0,0 +1,118 @@ +use std::error::Error; + +use cfgsync_artifacts::ArtifactFile; +use thiserror::Error; + +use crate::{NodeArtifacts, NodeArtifactsCatalog}; + +/// Adapter contract for converting an application deployment model into +/// node-specific serialized config payloads. +pub trait DeploymentAdapter { + type Deployment; + type Node; + type NodeConfig; + type Error: Error + Send + Sync + 'static; + + fn nodes(deployment: &Self::Deployment) -> &[Self::Node]; + + fn node_identifier(index: usize, node: &Self::Node) -> String; + + fn build_node_config( + deployment: &Self::Deployment, + node: &Self::Node, + ) -> Result; + + fn rewrite_for_hostnames( + deployment: &Self::Deployment, + node_index: usize, + hostnames: &[String], + config: &mut Self::NodeConfig, + ) -> Result<(), Self::Error>; + + fn serialize_node_config(config: &Self::NodeConfig) -> Result; +} + +/// High-level failures while building adapter output for cfgsync. +#[derive(Debug, Error)] +pub enum BuildCfgsyncNodesError { + #[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")] + HostnameCountMismatch { nodes: usize, hostnames: usize }, + #[error("cfgsync adapter failed: {source}")] + Adapter { + #[source] + source: super::DynCfgsyncError, + }, +} + +fn adapter_error(source: E) -> BuildCfgsyncNodesError +where + E: Error + Send + Sync + 'static, +{ + BuildCfgsyncNodesError::Adapter { + source: Box::new(source), + } +} + +/// Builds cfgsync node configs for a deployment by: +/// 1) validating hostname count, +/// 2) building each node config, +/// 3) rewriting host references, +/// 4) serializing each node payload. +pub fn build_cfgsync_node_configs( + deployment: &E::Deployment, + hostnames: &[String], +) -> Result, BuildCfgsyncNodesError> { + Ok(build_node_artifact_catalog::(deployment, hostnames)?.into_nodes()) +} + +/// Builds cfgsync node configs and indexes them by stable identifier. +pub fn build_node_artifact_catalog( + deployment: &E::Deployment, + hostnames: &[String], +) -> Result { + let nodes = E::nodes(deployment); + ensure_hostname_count(nodes.len(), hostnames.len())?; + + let mut output = Vec::with_capacity(nodes.len()); + for (index, node) in nodes.iter().enumerate() { + output.push(build_node_entry::(deployment, node, index, hostnames)?); + } + + Ok(NodeArtifactsCatalog::new(output)) +} + +fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> { + if nodes != hostnames { + return Err(BuildCfgsyncNodesError::HostnameCountMismatch { nodes, hostnames }); + } + + Ok(()) +} + +fn build_node_entry( + deployment: &E::Deployment, + node: &E::Node, + index: usize, + hostnames: &[String], +) -> Result { + let node_config = build_rewritten_node_config::(deployment, node, index, hostnames)?; + let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?; + + Ok(NodeArtifacts { + identifier: E::node_identifier(index, node), + files: vec![ArtifactFile::new("/config.yaml", &config_yaml)], + }) +} + +fn build_rewritten_node_config( + deployment: &E::Deployment, + node: &E::Node, + index: usize, + hostnames: &[String], +) -> Result { + let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?; + E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config) + .map_err(adapter_error)?; + + Ok(node_config) +} diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs index aee1864..c530f62 100644 --- a/cfgsync/adapter/src/lib.rs +++ b/cfgsync/adapter/src/lib.rs @@ -1,596 +1,16 @@ -use std::{collections::HashMap, error::Error, sync::Mutex}; +mod artifacts; +mod deployment; +mod materializer; +mod registrations; +mod sources; -use cfgsync_artifacts::ArtifactFile; -use cfgsync_core::{ - CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactsPayload, NodeConfigSource, - NodeRegistration, RegisterNodeResponse, +pub use artifacts::{ArtifactSet, NodeArtifacts, NodeArtifactsCatalog}; +pub use deployment::{ + BuildCfgsyncNodesError, DeploymentAdapter, build_cfgsync_node_configs, + build_node_artifact_catalog, }; -use serde::{Deserialize, Serialize}; -use thiserror::Error; - -/// Type-erased cfgsync adapter error used to preserve source context. -pub type DynCfgsyncError = Box; - -/// Per-node artifact payload served by cfgsync for one registered node. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NodeArtifacts { - /// Stable node identifier resolved by the adapter. - pub identifier: String, - /// Files served to the node after cfgsync registration. - pub files: Vec, -} - -/// Materialized artifact files for a single registered node. -#[derive(Debug, Clone, Default)] -pub struct ArtifactSet { - files: Vec, -} - -/// Immutable view of registrations currently known to cfgsync. -#[derive(Debug, Clone, Default)] -pub struct RegistrationSet { - registrations: Vec, -} - -impl RegistrationSet { - #[must_use] - pub fn new(registrations: Vec) -> Self { - Self { registrations } - } - - #[must_use] - pub fn len(&self) -> usize { - self.registrations.len() - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.registrations.is_empty() - } - - #[must_use] - pub fn iter(&self) -> impl Iterator { - self.registrations.iter() - } - - #[must_use] - pub fn get(&self, identifier: &str) -> Option<&NodeRegistration> { - self.registrations - .iter() - .find(|registration| registration.identifier == identifier) - } -} - -impl ArtifactSet { - #[must_use] - pub fn new(files: Vec) -> Self { - Self { files } - } - - #[must_use] - pub fn files(&self) -> &[ArtifactFile] { - &self.files - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.files.is_empty() - } -} - -/// Artifact payloads indexed by stable node identifier. -#[derive(Debug, Clone, Default)] -pub struct NodeArtifactsCatalog { - nodes: HashMap, -} - -impl NodeArtifactsCatalog { - #[must_use] - pub fn new(nodes: Vec) -> Self { - let nodes = nodes - .into_iter() - .map(|node| (node.identifier.clone(), node)) - .collect(); - - Self { nodes } - } - - #[must_use] - pub fn resolve(&self, identifier: &str) -> Option<&NodeArtifacts> { - self.nodes.get(identifier) - } - - #[must_use] - pub fn len(&self) -> usize { - self.nodes.len() - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.nodes.is_empty() - } - - #[must_use] - pub fn into_nodes(self) -> Vec { - self.nodes.into_values().collect() - } - - #[doc(hidden)] - #[must_use] - pub fn into_configs(self) -> Vec { - self.into_nodes() - } -} - -/// Adapter-side materialization contract for a single registered node. -pub trait NodeArtifactsMaterializer: Send + Sync { - fn materialize( - &self, - registration: &NodeRegistration, - registrations: &RegistrationSet, - ) -> Result, DynCfgsyncError>; -} - -/// Backward-compatible alias for the previous materializer trait name. -pub trait CfgsyncMaterializer: NodeArtifactsMaterializer {} - -impl CfgsyncMaterializer for T where T: NodeArtifactsMaterializer + ?Sized {} - -/// Adapter contract for materializing a whole registration set into -/// per-node cfgsync artifacts. -pub trait RegistrationSetMaterializer: Send + Sync { - fn materialize_snapshot( - &self, - registrations: &RegistrationSet, - ) -> Result, DynCfgsyncError>; -} - -/// Backward-compatible alias for the previous snapshot materializer trait name. -pub trait CfgsyncSnapshotMaterializer: RegistrationSetMaterializer {} - -impl CfgsyncSnapshotMaterializer for T where T: RegistrationSetMaterializer + ?Sized {} - -impl NodeArtifactsMaterializer for NodeArtifactsCatalog { - fn materialize( - &self, - registration: &NodeRegistration, - _registrations: &RegistrationSet, - ) -> Result, DynCfgsyncError> { - let artifacts = self - .resolve(®istration.identifier) - .map(build_node_artifacts_from_config); - - Ok(artifacts) - } -} - -impl RegistrationSetMaterializer for NodeArtifactsCatalog { - fn materialize_snapshot( - &self, - _registrations: &RegistrationSet, - ) -> Result, DynCfgsyncError> { - Ok(Some(self.clone())) - } -} - -/// Registration-aware provider backed by an adapter materializer. -pub struct RegistrationConfigProvider { - materializer: M, - registrations: Mutex>, -} - -impl RegistrationConfigProvider { - #[must_use] - pub fn new(materializer: M) -> Self { - Self { - materializer, - registrations: Mutex::new(HashMap::new()), - } - } - - fn registration_for(&self, identifier: &str) -> Option { - let registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - - registrations.get(identifier).cloned() - } - - fn registration_set(&self) -> RegistrationSet { - let registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - - RegistrationSet::new(registrations.values().cloned().collect()) - } -} - -/// Registration-aware provider backed by a snapshot materializer. -pub struct SnapshotConfigProvider { - materializer: M, - registrations: Mutex>, -} - -impl SnapshotConfigProvider { - #[must_use] - pub fn new(materializer: M) -> Self { - Self { - materializer, - registrations: Mutex::new(HashMap::new()), - } - } - - fn registration_for(&self, identifier: &str) -> Option { - let registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - - registrations.get(identifier).cloned() - } - - fn registration_set(&self) -> RegistrationSet { - let registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - - RegistrationSet::new(registrations.values().cloned().collect()) - } -} - -impl NodeConfigSource for SnapshotConfigProvider -where - M: RegistrationSetMaterializer, -{ - fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { - let mut registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - registrations.insert(registration.identifier.clone(), registration); - - RegisterNodeResponse::Registered - } - - fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { - let registration = match self.registration_for(®istration.identifier) { - Some(registration) => registration, - None => { - return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( - ®istration.identifier, - )); - } - }; - - let registrations = self.registration_set(); - let catalog = match self.materializer.materialize_snapshot(®istrations) { - Ok(Some(catalog)) => catalog, - Ok(None) => { - return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( - ®istration.identifier, - )); - } - Err(error) => { - return ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!( - "failed to materialize config snapshot: {error}" - ))); - } - }; - - match catalog.resolve(®istration.identifier) { - Some(config) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files( - config.files.clone(), - )), - None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config( - ®istration.identifier, - )), - } - } -} - -impl NodeConfigSource for RegistrationConfigProvider -where - M: NodeArtifactsMaterializer, -{ - fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { - let mut registrations = self - .registrations - .lock() - .expect("cfgsync registration store should not be poisoned"); - registrations.insert(registration.identifier.clone(), registration); - - RegisterNodeResponse::Registered - } - - fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { - let registration = match self.registration_for(®istration.identifier) { - Some(registration) => registration, - None => { - return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( - ®istration.identifier, - )); - } - }; - let registrations = self.registration_set(); - - match self.materializer.materialize(®istration, ®istrations) { - Ok(Some(artifacts)) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files( - artifacts.files().to_vec(), - )), - Ok(None) => ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( - ®istration.identifier, - )), - Err(error) => ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!( - "failed to materialize config for host {}: {error}", - registration.identifier - ))), - } - } -} - -/// Adapter contract for converting an application deployment model into -/// node-specific serialized config payloads. -pub trait CfgsyncEnv { - type Deployment; - type Node; - type NodeConfig; - type Error: Error + Send + Sync + 'static; - - fn nodes(deployment: &Self::Deployment) -> &[Self::Node]; - - fn node_identifier(index: usize, node: &Self::Node) -> String; - - fn build_node_config( - deployment: &Self::Deployment, - node: &Self::Node, - ) -> Result; - - fn rewrite_for_hostnames( - deployment: &Self::Deployment, - node_index: usize, - hostnames: &[String], - config: &mut Self::NodeConfig, - ) -> Result<(), Self::Error>; - - fn serialize_node_config(config: &Self::NodeConfig) -> Result; -} - -/// Preferred public name for application-side cfgsync integration. -pub trait DeploymentAdapter: CfgsyncEnv {} - -impl DeploymentAdapter for T where T: CfgsyncEnv + ?Sized {} - -/// High-level failures while building adapter output for cfgsync. -#[derive(Debug, Error)] -pub enum BuildCfgsyncNodesError { - #[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")] - HostnameCountMismatch { nodes: usize, hostnames: usize }, - #[error("cfgsync adapter failed: {source}")] - Adapter { - #[source] - source: DynCfgsyncError, - }, -} - -fn adapter_error(source: E) -> BuildCfgsyncNodesError -where - E: Error + Send + Sync + 'static, -{ - BuildCfgsyncNodesError::Adapter { - source: Box::new(source), - } -} - -/// Builds cfgsync node configs for a deployment by: -/// 1) validating hostname count, -/// 2) building each node config, -/// 3) rewriting host references, -/// 4) serializing each node payload. -pub fn build_cfgsync_node_configs( - deployment: &E::Deployment, - hostnames: &[String], -) -> Result, BuildCfgsyncNodesError> { - Ok(build_node_artifact_catalog::(deployment, hostnames)?.into_nodes()) -} - -/// Builds cfgsync node configs and indexes them by stable identifier. -pub fn build_node_artifact_catalog( - deployment: &E::Deployment, - hostnames: &[String], -) -> Result { - let nodes = E::nodes(deployment); - ensure_hostname_count(nodes.len(), hostnames.len())?; - - let mut output = Vec::with_capacity(nodes.len()); - for (index, node) in nodes.iter().enumerate() { - output.push(build_node_entry::(deployment, node, index, hostnames)?); - } - - Ok(NodeArtifactsCatalog::new(output)) -} - -#[doc(hidden)] -pub fn build_cfgsync_node_catalog( - deployment: &E::Deployment, - hostnames: &[String], -) -> Result { - build_node_artifact_catalog::(deployment, hostnames) -} - -fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> { - if nodes != hostnames { - return Err(BuildCfgsyncNodesError::HostnameCountMismatch { nodes, hostnames }); - } - - Ok(()) -} - -fn build_node_entry( - deployment: &E::Deployment, - node: &E::Node, - index: usize, - hostnames: &[String], -) -> Result { - let node_config = build_rewritten_node_config::(deployment, node, index, hostnames)?; - let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?; - - Ok(NodeArtifacts { - identifier: E::node_identifier(index, node), - files: vec![ArtifactFile::new("/config.yaml", &config_yaml)], - }) -} - -fn build_rewritten_node_config( - deployment: &E::Deployment, - node: &E::Node, - index: usize, - hostnames: &[String], -) -> Result { - let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?; - E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config) - .map_err(adapter_error)?; - - Ok(node_config) -} - -fn build_node_artifacts_from_config(config: &NodeArtifacts) -> ArtifactSet { - ArtifactSet::new(config.files.clone()) -} - -#[doc(hidden)] -pub type CfgsyncNodeConfig = NodeArtifacts; - -#[doc(hidden)] -pub type CfgsyncNodeArtifacts = ArtifactSet; - -#[doc(hidden)] -pub type RegistrationSnapshot = RegistrationSet; - -#[doc(hidden)] -pub type CfgsyncNodeCatalog = NodeArtifactsCatalog; - -#[doc(hidden)] -pub type MaterializingConfigProvider = RegistrationConfigProvider; - -#[doc(hidden)] -pub type SnapshotMaterializingConfigProvider = SnapshotConfigProvider; - -#[cfg(test)] -mod tests { - use std::sync::atomic::{AtomicUsize, Ordering}; - - use cfgsync_artifacts::ArtifactFile; - use cfgsync_core::{ - CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration, - }; - - use super::{ - ArtifactSet, DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog, - NodeArtifactsMaterializer, RegistrationConfigProvider, RegistrationSet, - }; - - #[test] - fn catalog_resolves_identifier() { - let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts { - identifier: "node-1".to_owned(), - files: vec![ArtifactFile::new("/config.yaml", "key: value")], - }]); - - let node = catalog.resolve("node-1").expect("resolve node config"); - - assert_eq!(node.files[0].content, "key: value"); - } - - #[test] - fn materializing_provider_resolves_registered_node() { - let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts { - identifier: "node-1".to_owned(), - files: vec![ArtifactFile::new("/config.yaml", "key: value")], - }]); - let provider = RegistrationConfigProvider::new(catalog); - let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); - - let _ = provider.register(registration.clone()); - - match provider.resolve(®istration) { - ConfigResolveResponse::Config(payload) => { - assert_eq!(payload.files()[0].path, "/config.yaml") - } - ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), - } - } - - #[test] - fn materializing_provider_reports_not_ready_before_registration() { - let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts { - identifier: "node-1".to_owned(), - files: vec![ArtifactFile::new("/config.yaml", "key: value")], - }]); - let provider = RegistrationConfigProvider::new(catalog); - let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); - - match provider.resolve(®istration) { - ConfigResolveResponse::Config(_) => panic!("expected not-ready error"), - ConfigResolveResponse::Error(error) => { - assert!(matches!(error.code, CfgsyncErrorCode::NotReady)) - } - } - } - - struct ThresholdMaterializer { - calls: AtomicUsize, - } - - impl NodeArtifactsMaterializer for ThresholdMaterializer { - fn materialize( - &self, - registration: &NodeRegistration, - registrations: &RegistrationSet, - ) -> Result, DynCfgsyncError> { - self.calls.fetch_add(1, Ordering::SeqCst); - - if registrations.len() < 2 { - return Ok(None); - } - - let peer_count = registrations.iter().count(); - let files = vec![ - ArtifactFile::new("/config.yaml", format!("id: {}", registration.identifier)), - ArtifactFile::new("/shared.yaml", format!("peers: {peer_count}")), - ]; - - Ok(Some(ArtifactSet::new(files))) - } - } - - #[test] - fn materializing_provider_uses_registration_snapshot_for_readiness() { - let provider = RegistrationConfigProvider::new(ThresholdMaterializer { - calls: AtomicUsize::new(0), - }); - let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip")); - let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip")); - - let _ = provider.register(node_a.clone()); - - match provider.resolve(&node_a) { - ConfigResolveResponse::Config(_) => panic!("expected not-ready error"), - ConfigResolveResponse::Error(error) => { - assert!(matches!(error.code, CfgsyncErrorCode::NotReady)) - } - } - - let _ = provider.register(node_b); - - match provider.resolve(&node_a) { - ConfigResolveResponse::Config(payload) => { - assert_eq!(payload.files()[0].content, "id: node-a"); - assert_eq!(payload.files()[1].content, "peers: 2"); - } - ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), - } - } -} +pub use materializer::{ + DynCfgsyncError, NodeArtifactsMaterializer, RegistrationSnapshotMaterializer, +}; +pub use registrations::RegistrationSnapshot; +pub use sources::{MaterializingConfigSource, SnapshotConfigSource}; diff --git a/cfgsync/adapter/src/materializer.rs b/cfgsync/adapter/src/materializer.rs new file mode 100644 index 0000000..5680960 --- /dev/null +++ b/cfgsync/adapter/src/materializer.rs @@ -0,0 +1,26 @@ +use std::error::Error; + +use cfgsync_core::NodeRegistration; + +use crate::{ArtifactSet, NodeArtifactsCatalog, RegistrationSnapshot}; + +/// Type-erased cfgsync adapter error used to preserve source context. +pub type DynCfgsyncError = Box; + +/// Adapter-side materialization contract for a single registered node. +pub trait NodeArtifactsMaterializer: Send + Sync { + fn materialize( + &self, + registration: &NodeRegistration, + registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError>; +} + +/// Adapter contract for materializing a whole registration snapshot into +/// per-node cfgsync artifacts. +pub trait RegistrationSnapshotMaterializer: Send + Sync { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError>; +} diff --git a/cfgsync/adapter/src/registrations.rs b/cfgsync/adapter/src/registrations.rs new file mode 100644 index 0000000..3926b57 --- /dev/null +++ b/cfgsync/adapter/src/registrations.rs @@ -0,0 +1,36 @@ +use cfgsync_core::NodeRegistration; + +/// Immutable view of registrations currently known to cfgsync. +#[derive(Debug, Clone, Default)] +pub struct RegistrationSnapshot { + registrations: Vec, +} + +impl RegistrationSnapshot { + #[must_use] + pub fn new(registrations: Vec) -> Self { + Self { registrations } + } + + #[must_use] + pub fn len(&self) -> usize { + self.registrations.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.registrations.is_empty() + } + + #[must_use] + pub fn iter(&self) -> impl Iterator { + self.registrations.iter() + } + + #[must_use] + pub fn get(&self, identifier: &str) -> Option<&NodeRegistration> { + self.registrations + .iter() + .find(|registration| registration.identifier == identifier) + } +} diff --git a/cfgsync/adapter/src/sources.rs b/cfgsync/adapter/src/sources.rs new file mode 100644 index 0000000..5008515 --- /dev/null +++ b/cfgsync/adapter/src/sources.rs @@ -0,0 +1,365 @@ +use std::{collections::HashMap, sync::Mutex}; + +use cfgsync_core::{ + CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactsPayload, NodeConfigSource, + NodeRegistration, RegisterNodeResponse, +}; + +use crate::{ + ArtifactSet, DynCfgsyncError, NodeArtifactsCatalog, NodeArtifactsMaterializer, + RegistrationSnapshot, RegistrationSnapshotMaterializer, +}; + +impl NodeArtifactsMaterializer for NodeArtifactsCatalog { + fn materialize( + &self, + registration: &NodeRegistration, + _registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError> { + Ok(self + .resolve(®istration.identifier) + .map(build_artifact_set_from_catalog_entry)) + } +} + +impl RegistrationSnapshotMaterializer for NodeArtifactsCatalog { + fn materialize_snapshot( + &self, + _registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError> { + Ok(Some(self.clone())) + } +} + +/// Registration-aware source backed by an adapter materializer. +pub struct MaterializingConfigSource { + materializer: M, + registrations: Mutex>, +} + +impl MaterializingConfigSource { + #[must_use] + pub fn new(materializer: M) -> Self { + Self { + materializer, + registrations: Mutex::new(HashMap::new()), + } + } + + fn registration_for(&self, identifier: &str) -> Option { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + registrations.get(identifier).cloned() + } + + fn registration_snapshot(&self) -> RegistrationSnapshot { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + RegistrationSnapshot::new(registrations.values().cloned().collect()) + } +} + +impl NodeConfigSource for MaterializingConfigSource +where + M: NodeArtifactsMaterializer, +{ + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { + let mut registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + registrations.insert(registration.identifier.clone(), registration); + + RegisterNodeResponse::Registered + } + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { + let registration = match self.registration_for(®istration.identifier) { + Some(registration) => registration, + None => { + return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + }; + let registrations = self.registration_snapshot(); + + match self.materializer.materialize(®istration, ®istrations) { + Ok(Some(artifacts)) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files( + artifacts.files().to_vec(), + )), + Ok(None) => ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( + ®istration.identifier, + )), + Err(error) => ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!( + "failed to materialize config for host {}: {error}", + registration.identifier + ))), + } + } +} + +/// Registration-aware source backed by a snapshot materializer. +pub struct SnapshotConfigSource { + materializer: M, + registrations: Mutex>, +} + +impl SnapshotConfigSource { + #[must_use] + pub fn new(materializer: M) -> Self { + Self { + materializer, + registrations: Mutex::new(HashMap::new()), + } + } + + fn registration_for(&self, identifier: &str) -> Option { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + registrations.get(identifier).cloned() + } + + fn registration_snapshot(&self) -> RegistrationSnapshot { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + RegistrationSnapshot::new(registrations.values().cloned().collect()) + } +} + +impl NodeConfigSource for SnapshotConfigSource +where + M: RegistrationSnapshotMaterializer, +{ + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { + let mut registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + registrations.insert(registration.identifier.clone(), registration); + + RegisterNodeResponse::Registered + } + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { + let registration = match self.registration_for(®istration.identifier) { + Some(registration) => registration, + None => { + return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + }; + + let registrations = self.registration_snapshot(); + let catalog = match self.materializer.materialize_snapshot(®istrations) { + Ok(Some(catalog)) => catalog, + Ok(None) => { + return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + Err(error) => { + return ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!( + "failed to materialize config snapshot: {error}" + ))); + } + }; + + match catalog.resolve(®istration.identifier) { + Some(config) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files( + config.files.clone(), + )), + None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )), + } + } +} + +fn build_artifact_set_from_catalog_entry(config: &crate::NodeArtifacts) -> ArtifactSet { + ArtifactSet::new(config.files.clone()) +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + + use cfgsync_artifacts::ArtifactFile; + use cfgsync_core::{ + CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration, + }; + + use super::{MaterializingConfigSource, SnapshotConfigSource}; + use crate::{ + DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog, NodeArtifactsMaterializer, + RegistrationSnapshot, RegistrationSnapshotMaterializer, + }; + + #[test] + fn catalog_resolves_identifier() { + let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts { + identifier: "node-1".to_owned(), + files: vec![ArtifactFile::new("/config.yaml", "key: value")], + }]); + + let node = catalog.resolve("node-1").expect("resolve node config"); + + assert_eq!(node.files[0].content, "key: value"); + } + + #[test] + fn materializing_source_resolves_registered_node() { + let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts { + identifier: "node-1".to_owned(), + files: vec![ArtifactFile::new("/config.yaml", "key: value")], + }]); + let source = MaterializingConfigSource::new(catalog); + let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); + + let _ = source.register(registration.clone()); + + match source.resolve(®istration) { + ConfigResolveResponse::Config(payload) => { + assert_eq!(payload.files()[0].path, "/config.yaml") + } + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } + + #[test] + fn materializing_source_reports_not_ready_before_registration() { + let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts { + identifier: "node-1".to_owned(), + files: vec![ArtifactFile::new("/config.yaml", "key: value")], + }]); + let source = MaterializingConfigSource::new(catalog); + let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); + + match source.resolve(®istration) { + ConfigResolveResponse::Config(_) => panic!("expected not-ready error"), + ConfigResolveResponse::Error(error) => { + assert!(matches!(error.code, CfgsyncErrorCode::NotReady)) + } + } + } + + struct ThresholdMaterializer { + calls: AtomicUsize, + } + + impl NodeArtifactsMaterializer for ThresholdMaterializer { + fn materialize( + &self, + registration: &NodeRegistration, + registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError> { + self.calls.fetch_add(1, Ordering::SeqCst); + + if registrations.len() < 2 { + return Ok(None); + } + + let peer_count = registrations.iter().count(); + let files = vec![ + ArtifactFile::new("/config.yaml", format!("id: {}", registration.identifier)), + ArtifactFile::new("/peers.txt", peer_count.to_string()), + ]; + + Ok(Some(crate::ArtifactSet::new(files))) + } + } + + #[test] + fn materializing_source_passes_registration_snapshot() { + let source = MaterializingConfigSource::new(ThresholdMaterializer { + calls: AtomicUsize::new(0), + }); + let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip")); + let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip")); + + let _ = source.register(node_a.clone()); + + match source.resolve(&node_a) { + ConfigResolveResponse::Config(_) => panic!("expected not-ready error"), + ConfigResolveResponse::Error(error) => { + assert!(matches!(error.code, CfgsyncErrorCode::NotReady)) + } + } + + let _ = source.register(node_b); + + match source.resolve(&node_a) { + ConfigResolveResponse::Config(payload) => { + assert_eq!(payload.files()[0].content, "id: node-a"); + assert_eq!(payload.files()[1].content, "2"); + } + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + + assert_eq!(source.materializer.calls.load(Ordering::SeqCst), 2); + } + + struct ThresholdSnapshotMaterializer; + + impl RegistrationSnapshotMaterializer for ThresholdSnapshotMaterializer { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result, DynCfgsyncError> { + if registrations.len() < 2 { + return Ok(None); + } + + Ok(Some(NodeArtifactsCatalog::new( + registrations + .iter() + .map(|registration| NodeArtifacts { + identifier: registration.identifier.clone(), + files: vec![ArtifactFile::new( + "/config.yaml", + format!("peer_count: {}", registrations.len()), + )], + }) + .collect(), + ))) + } + } + + #[test] + fn snapshot_source_materializes_from_registration_snapshot() { + let source = SnapshotConfigSource::new(ThresholdSnapshotMaterializer); + let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip")); + let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip")); + + let _ = source.register(node_a.clone()); + + match source.resolve(&node_a) { + ConfigResolveResponse::Config(_) => panic!("expected not-ready error"), + ConfigResolveResponse::Error(error) => { + assert!(matches!(error.code, CfgsyncErrorCode::NotReady)) + } + } + + let _ = source.register(node_b); + + match source.resolve(&node_a) { + ConfigResolveResponse::Config(payload) => { + assert_eq!(payload.files()[0].content, "peer_count: 2"); + } + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } +} diff --git a/cfgsync/core/src/bundle.rs b/cfgsync/core/src/bundle.rs index d8f6cab..5281b13 100644 --- a/cfgsync/core/src/bundle.rs +++ b/cfgsync/core/src/bundle.rs @@ -24,9 +24,3 @@ pub struct NodeArtifactsBundleEntry { #[serde(default)] pub files: Vec, } - -#[doc(hidden)] -pub type CfgSyncBundle = NodeArtifactsBundle; - -#[doc(hidden)] -pub type CfgSyncBundleNode = NodeArtifactsBundleEntry; diff --git a/cfgsync/core/src/client.rs b/cfgsync/core/src/client.rs index e599c35..4032c35 100644 --- a/cfgsync/core/src/client.rs +++ b/cfgsync/core/src/client.rs @@ -1,7 +1,7 @@ use serde::Serialize; use thiserror::Error; -use crate::repo::{CfgsyncErrorCode, CfgsyncErrorResponse, NodeArtifactsPayload, NodeRegistration}; +use crate::{CfgsyncErrorCode, CfgsyncErrorResponse, NodeArtifactsPayload, NodeRegistration}; /// cfgsync client-side request/response failures. #[derive(Debug, Error)] @@ -63,14 +63,6 @@ impl CfgsyncClient { self.post_json("/node", payload).await } - /// Fetches `/init-with-node` payload for a node identifier. - pub async fn fetch_init_with_node_config( - &self, - payload: &NodeRegistration, - ) -> Result { - self.post_json("/init-with-node", payload).await - } - pub async fn fetch_node_config_status( &self, payload: &NodeRegistration, @@ -155,6 +147,3 @@ impl CfgsyncClient { } } } - -#[doc(hidden)] -pub type CfgSyncClient = CfgsyncClient; diff --git a/cfgsync/core/src/compat.rs b/cfgsync/core/src/compat.rs new file mode 100644 index 0000000..ea1cb17 --- /dev/null +++ b/cfgsync/core/src/compat.rs @@ -0,0 +1,20 @@ +#![doc(hidden)] + +pub use crate::{ + bundle::{NodeArtifactsBundle as CfgSyncBundle, NodeArtifactsBundleEntry as CfgSyncBundleNode}, + client::CfgsyncClient as CfgSyncClient, + protocol::{ + CfgsyncErrorCode as CfgSyncErrorCode, CfgsyncErrorResponse as CfgSyncErrorResponse, + ConfigResolveResponse as RepoResponse, NodeArtifactFile as CfgSyncFile, + NodeArtifactsPayload as CfgSyncPayload, RegisterNodeResponse as RegistrationResponse, + }, + server::{ + CfgsyncServerState as CfgSyncState, build_legacy_cfgsync_router as cfgsync_app, + serve_cfgsync as run_cfgsync, + }, + source::{ + BundleConfigSource as FileConfigProvider, + BundleConfigSourceError as FileConfigProviderError, NodeConfigSource as ConfigProvider, + StaticConfigSource as ConfigRepo, + }, +}; diff --git a/cfgsync/core/src/lib.rs b/cfgsync/core/src/lib.rs index aaac4c7..b7664bb 100644 --- a/cfgsync/core/src/lib.rs +++ b/cfgsync/core/src/lib.rs @@ -1,33 +1,26 @@ pub mod bundle; pub mod client; +#[doc(hidden)] +pub mod compat; +pub mod protocol; pub mod render; -pub mod repo; pub mod server; +pub mod source; -#[doc(hidden)] -pub use bundle::{CfgSyncBundle, CfgSyncBundleNode}; pub use bundle::{NodeArtifactsBundle, NodeArtifactsBundleEntry}; -#[doc(hidden)] -pub use client::CfgSyncClient; pub use client::{CfgsyncClient, ClientError, ConfigFetchStatus}; +pub use protocol::{ + CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse, + NodeArtifactFile, NodeArtifactsPayload, NodeRegistration, RegisterNodeResponse, + RegistrationPayload, +}; pub use render::{ CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides, apply_timeout_floor, ensure_bundle_path, load_cfgsync_template_yaml, render_cfgsync_yaml_from_template, write_rendered_cfgsync, }; -pub use repo::{ - BundleConfigSource, BundleConfigSourceError, CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, - CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactFile, NodeArtifactsPayload, - NodeConfigSource, NodeRegistration, RegisterNodeResponse, RegistrationPayload, - StaticConfigSource, -}; -#[doc(hidden)] -pub use repo::{ - CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, ConfigProvider, - ConfigRepo, FileConfigProvider, FileConfigProviderError, RegistrationResponse, RepoResponse, -}; -#[doc(hidden)] -pub use server::CfgSyncState; pub use server::{CfgsyncServerState, RunCfgsyncError, build_cfgsync_router, serve_cfgsync}; -#[doc(hidden)] -pub use server::{cfgsync_app, run_cfgsync}; +pub use source::{ + BundleConfigSource, BundleConfigSourceError, BundleLoadError, NodeConfigSource, + StaticConfigSource, bundle_to_payload_map, load_bundle, +}; diff --git a/cfgsync/core/src/protocol.rs b/cfgsync/core/src/protocol.rs new file mode 100644 index 0000000..ca8f205 --- /dev/null +++ b/cfgsync/core/src/protocol.rs @@ -0,0 +1,258 @@ +use std::net::Ipv4Addr; + +use cfgsync_artifacts::ArtifactFile; +use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned}; +use serde_json::Value; +use thiserror::Error; + +/// Schema version served by cfgsync payload responses. +pub const CFGSYNC_SCHEMA_VERSION: u16 = 1; + +/// Canonical cfgsync file type used in payloads and bundles. +pub type NodeArtifactFile = ArtifactFile; + +/// Payload returned by cfgsync server for one node. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeArtifactsPayload { + /// Payload schema version for compatibility checks. + pub schema_version: u16, + /// Files that must be written on the target node. + #[serde(default)] + pub files: Vec, +} + +/// Adapter-owned registration payload stored alongside a generic node identity. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct RegistrationPayload { + raw_json: Option, +} + +impl RegistrationPayload { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.raw_json.is_none() + } + + pub fn from_serializable(value: &T) -> Result + where + T: Serialize, + { + Ok(Self { + raw_json: Some(serde_json::to_string(value)?), + }) + } + + pub fn from_json_str(raw_json: &str) -> Result { + let value: Value = serde_json::from_str(raw_json)?; + + Ok(Self { + raw_json: Some(serde_json::to_string(&value)?), + }) + } + + pub fn deserialize(&self) -> Result, serde_json::Error> + where + T: DeserializeOwned, + { + self.raw_json + .as_ref() + .map(|raw_json| serde_json::from_str(raw_json)) + .transpose() + } + + #[must_use] + pub fn raw_json(&self) -> Option<&str> { + self.raw_json.as_deref() + } +} + +impl Serialize for RegistrationPayload { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self.raw_json.as_deref() { + Some(raw_json) => { + let value: Value = + serde_json::from_str(raw_json).map_err(serde::ser::Error::custom)?; + value.serialize(serializer) + } + None => serializer.serialize_none(), + } + } +} + +impl<'de> Deserialize<'de> for RegistrationPayload { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Option::::deserialize(deserializer)?; + let raw_json = value + .map(|value| serde_json::to_string(&value).map_err(serde::de::Error::custom)) + .transpose()?; + + Ok(Self { raw_json }) + } +} + +/// Node metadata recorded before config materialization. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct NodeRegistration { + pub identifier: String, + pub ip: Ipv4Addr, + #[serde(default, skip_serializing_if = "RegistrationPayload::is_empty")] + pub metadata: RegistrationPayload, +} + +impl NodeRegistration { + #[must_use] + pub fn new(identifier: impl Into, ip: Ipv4Addr) -> Self { + Self { + identifier: identifier.into(), + ip, + metadata: RegistrationPayload::default(), + } + } + + pub fn with_metadata(mut self, metadata: &T) -> Result + where + T: Serialize, + { + self.metadata = RegistrationPayload::from_serializable(metadata)?; + Ok(self) + } + + #[must_use] + pub fn with_payload(mut self, payload: RegistrationPayload) -> Self { + self.metadata = payload; + self + } +} + +impl NodeArtifactsPayload { + #[must_use] + pub fn from_files(files: Vec) -> Self { + Self { + schema_version: CFGSYNC_SCHEMA_VERSION, + files, + } + } + + #[must_use] + pub fn files(&self) -> &[NodeArtifactFile] { + &self.files + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CfgsyncErrorCode { + MissingConfig, + NotReady, + Internal, +} + +/// Structured error body returned by cfgsync server. +#[derive(Debug, Clone, Serialize, Deserialize, Error)] +#[error("{code:?}: {message}")] +pub struct CfgsyncErrorResponse { + pub code: CfgsyncErrorCode, + pub message: String, +} + +impl CfgsyncErrorResponse { + #[must_use] + pub fn missing_config(identifier: &str) -> Self { + Self { + code: CfgsyncErrorCode::MissingConfig, + message: format!("missing config for host {identifier}"), + } + } + + #[must_use] + pub fn not_ready(identifier: &str) -> Self { + Self { + code: CfgsyncErrorCode::NotReady, + message: format!("config for host {identifier} is not ready"), + } + } + + #[must_use] + pub fn internal(message: impl Into) -> Self { + Self { + code: CfgsyncErrorCode::Internal, + message: message.into(), + } + } +} + +/// Resolution outcome for a requested node identifier. +pub enum ConfigResolveResponse { + Config(NodeArtifactsPayload), + Error(CfgsyncErrorResponse), +} + +/// Outcome for a node registration request. +pub enum RegisterNodeResponse { + Registered, + Error(CfgsyncErrorResponse), +} + +#[cfg(test)] +mod tests { + use serde::{Deserialize, Serialize}; + use serde_json::Value; + + use super::{NodeRegistration, RegistrationPayload}; + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + struct ExampleRegistration { + network_port: u16, + service: String, + } + + #[test] + fn registration_payload_round_trips_typed_value() { + let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")) + .with_metadata(&ExampleRegistration { + network_port: 3000, + service: "blend".to_owned(), + }) + .expect("serialize registration metadata"); + + let encoded = serde_json::to_value(®istration).expect("serialize registration"); + let metadata = encoded.get("metadata").expect("registration metadata"); + assert_eq!(metadata.get("network_port"), Some(&Value::from(3000u16))); + assert_eq!(metadata.get("service"), Some(&Value::from("blend"))); + + let decoded: NodeRegistration = + serde_json::from_value(encoded).expect("deserialize registration"); + let typed: ExampleRegistration = decoded + .metadata + .deserialize() + .expect("deserialize metadata") + .expect("registration metadata value"); + + assert_eq!(typed.network_port, 3000); + assert_eq!(typed.service, "blend"); + } + + #[test] + fn registration_payload_accepts_raw_json() { + let payload = + RegistrationPayload::from_json_str(r#"{"network_port":3000}"#).expect("parse raw json"); + + assert_eq!(payload.raw_json(), Some(r#"{"network_port":3000}"#)); + } +} diff --git a/cfgsync/core/src/repo.rs b/cfgsync/core/src/repo.rs deleted file mode 100644 index 6652367..0000000 --- a/cfgsync/core/src/repo.rs +++ /dev/null @@ -1,523 +0,0 @@ -use std::{collections::HashMap, fs, net::Ipv4Addr, path::Path, sync::Arc}; - -use cfgsync_artifacts::ArtifactFile; -use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned}; -use serde_json::Value; -use thiserror::Error; - -use crate::{NodeArtifactsBundle, NodeArtifactsBundleEntry}; - -/// Schema version served by cfgsync payload responses. -pub const CFGSYNC_SCHEMA_VERSION: u16 = 1; - -/// Canonical cfgsync file type used in payloads and bundles. -pub type NodeArtifactFile = ArtifactFile; - -/// Payload returned by cfgsync server for one node. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NodeArtifactsPayload { - /// Payload schema version for compatibility checks. - pub schema_version: u16, - /// Files that must be written on the target node. - #[serde(default)] - pub files: Vec, -} - -/// Adapter-owned registration payload stored alongside a generic node identity. -#[derive(Debug, Clone, Default, PartialEq, Eq)] -pub struct RegistrationPayload { - raw_json: Option, -} - -impl RegistrationPayload { - #[must_use] - pub fn new() -> Self { - Self::default() - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.raw_json.is_none() - } - - pub fn from_serializable(value: &T) -> Result - where - T: Serialize, - { - Ok(Self { - raw_json: Some(serde_json::to_string(value)?), - }) - } - - pub fn from_json_str(raw_json: &str) -> Result { - let value: Value = serde_json::from_str(raw_json)?; - - Ok(Self { - raw_json: Some(serde_json::to_string(&value)?), - }) - } - - pub fn deserialize(&self) -> Result, serde_json::Error> - where - T: DeserializeOwned, - { - self.raw_json - .as_ref() - .map(|raw_json| serde_json::from_str(raw_json)) - .transpose() - } - - #[must_use] - pub fn raw_json(&self) -> Option<&str> { - self.raw_json.as_deref() - } -} - -impl Serialize for RegistrationPayload { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match self.raw_json.as_deref() { - Some(raw_json) => { - let value: Value = - serde_json::from_str(raw_json).map_err(serde::ser::Error::custom)?; - value.serialize(serializer) - } - None => serializer.serialize_none(), - } - } -} - -impl<'de> Deserialize<'de> for RegistrationPayload { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let value = Option::::deserialize(deserializer)?; - let raw_json = value - .map(|value| serde_json::to_string(&value).map_err(serde::de::Error::custom)) - .transpose()?; - - Ok(Self { raw_json }) - } -} - -/// Node metadata recorded before config materialization. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct NodeRegistration { - pub identifier: String, - pub ip: Ipv4Addr, - #[serde(default, skip_serializing_if = "RegistrationPayload::is_empty")] - pub metadata: RegistrationPayload, -} - -impl NodeRegistration { - #[must_use] - pub fn new(identifier: impl Into, ip: Ipv4Addr) -> Self { - Self { - identifier: identifier.into(), - ip, - metadata: RegistrationPayload::default(), - } - } - - pub fn with_metadata(mut self, metadata: &T) -> Result - where - T: Serialize, - { - self.metadata = RegistrationPayload::from_serializable(metadata)?; - Ok(self) - } - - #[must_use] - pub fn with_payload(mut self, payload: RegistrationPayload) -> Self { - self.metadata = payload; - self - } -} - -impl NodeArtifactsPayload { - #[must_use] - pub fn from_files(files: Vec) -> Self { - Self { - schema_version: CFGSYNC_SCHEMA_VERSION, - files, - } - } - - #[must_use] - pub fn files(&self) -> &[NodeArtifactFile] { - &self.files - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.files.is_empty() - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum CfgsyncErrorCode { - MissingConfig, - NotReady, - Internal, -} - -/// Structured error body returned by cfgsync server. -#[derive(Debug, Clone, Serialize, Deserialize, Error)] -#[error("{code:?}: {message}")] -pub struct CfgsyncErrorResponse { - pub code: CfgsyncErrorCode, - pub message: String, -} - -impl CfgsyncErrorResponse { - #[must_use] - pub fn missing_config(identifier: &str) -> Self { - Self { - code: CfgsyncErrorCode::MissingConfig, - message: format!("missing config for host {identifier}"), - } - } - - #[must_use] - pub fn not_ready(identifier: &str) -> Self { - Self { - code: CfgsyncErrorCode::NotReady, - message: format!("config for host {identifier} is not ready"), - } - } - - #[must_use] - pub fn internal(message: impl Into) -> Self { - Self { - code: CfgsyncErrorCode::Internal, - message: message.into(), - } - } -} - -/// Resolution outcome for a requested node identifier. -pub enum ConfigResolveResponse { - Config(NodeArtifactsPayload), - Error(CfgsyncErrorResponse), -} - -/// Outcome for a node registration request. -pub enum RegisterNodeResponse { - Registered, - Error(CfgsyncErrorResponse), -} - -/// Source of cfgsync node payloads. -pub trait NodeConfigSource: Send + Sync { - fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse; - - fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse; -} - -/// In-memory map-backed source used by cfgsync server state. -pub struct StaticConfigSource { - configs: HashMap, -} - -impl StaticConfigSource { - #[must_use] - pub fn from_bundle(configs: HashMap) -> Arc { - Arc::new(Self { configs }) - } -} - -impl NodeConfigSource for StaticConfigSource { - fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { - if self.configs.contains_key(®istration.identifier) { - RegisterNodeResponse::Registered - } else { - RegisterNodeResponse::Error(CfgsyncErrorResponse::missing_config( - ®istration.identifier, - )) - } - } - - fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { - self.configs - .get(®istration.identifier) - .cloned() - .map_or_else( - || { - ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config( - ®istration.identifier, - )) - }, - ConfigResolveResponse::Config, - ) - } -} - -#[derive(Debug, Error)] -pub enum BundleLoadError { - #[error("reading cfgsync bundle {path}: {source}")] - ReadBundle { - path: String, - #[source] - source: std::io::Error, - }, - #[error("parsing cfgsync bundle {path}: {source}")] - ParseBundle { - path: String, - #[source] - source: serde_yaml::Error, - }, -} - -#[must_use] -pub fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap { - bundle - .nodes - .into_iter() - .map(|node| { - let NodeArtifactsBundleEntry { identifier, files } = node; - - (identifier, NodeArtifactsPayload::from_files(files)) - }) - .collect() -} - -pub fn load_bundle(path: &Path) -> Result { - let path_string = path.display().to_string(); - let raw = fs::read_to_string(path).map_err(|source| BundleLoadError::ReadBundle { - path: path_string.clone(), - source, - })?; - serde_yaml::from_str(&raw).map_err(|source| BundleLoadError::ParseBundle { - path: path_string, - source, - }) -} - -#[cfg(test)] -mod tests { - use std::io::Write as _; - - use tempfile::NamedTempFile; - - use super::*; - - #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] - struct ExampleRegistration { - network_port: u16, - service: String, - } - - #[test] - fn registration_payload_round_trips_typed_value() { - let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")) - .with_metadata(&ExampleRegistration { - network_port: 3000, - service: "blend".to_owned(), - }) - .expect("serialize registration metadata"); - - let encoded = serde_json::to_value(®istration).expect("serialize registration"); - let metadata = encoded.get("metadata").expect("registration metadata"); - assert_eq!(metadata.get("network_port"), Some(&Value::from(3000u16))); - assert_eq!(metadata.get("service"), Some(&Value::from("blend"))); - - let decoded: NodeRegistration = - serde_json::from_value(encoded).expect("deserialize registration"); - let typed: ExampleRegistration = decoded - .metadata - .deserialize() - .expect("deserialize metadata") - .expect("registration metadata value"); - - assert_eq!(typed.network_port, 3000); - assert_eq!(typed.service, "blend"); - } - - fn sample_payload() -> NodeArtifactsPayload { - NodeArtifactsPayload::from_files(vec![NodeArtifactFile::new("/config.yaml", "key: value")]) - } - - #[test] - fn resolves_existing_identifier() { - let mut configs = HashMap::new(); - configs.insert("node-1".to_owned(), sample_payload()); - let repo = StaticConfigSource { configs }; - - match repo.resolve(&NodeRegistration::new( - "node-1", - "127.0.0.1".parse().expect("parse ip"), - )) { - ConfigResolveResponse::Config(payload) => { - assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION); - assert_eq!(payload.files.len(), 1); - assert_eq!(payload.files[0].path, "/config.yaml"); - } - ConfigResolveResponse::Error(error) => panic!("expected config response, got {error}"), - } - } - - #[test] - fn reports_missing_identifier() { - let repo = StaticConfigSource { - configs: HashMap::new(), - }; - - match repo.resolve(&NodeRegistration::new( - "unknown-node", - "127.0.0.1".parse().expect("parse ip"), - )) { - ConfigResolveResponse::Config(_) => panic!("expected missing-config error"), - ConfigResolveResponse::Error(error) => { - assert!(matches!(error.code, CfgsyncErrorCode::MissingConfig)); - assert!(error.message.contains("unknown-node")); - } - } - } - - #[test] - fn loads_file_provider_bundle() { - let mut bundle_file = NamedTempFile::new().expect("create temp bundle"); - let yaml = r#" -nodes: - - identifier: node-1 - files: - - path: /config.yaml - content: "a: 1" -"#; - bundle_file - .write_all(yaml.as_bytes()) - .expect("write bundle yaml"); - - let provider = - BundleConfigSource::from_yaml_file(bundle_file.path()).expect("load file provider"); - - let _ = provider.register(NodeRegistration::new( - "node-1", - "127.0.0.1".parse().expect("parse ip"), - )); - - match provider.resolve(&NodeRegistration::new( - "node-1", - "127.0.0.1".parse().expect("parse ip"), - )) { - ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1), - ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), - } - } - - #[test] - fn resolve_accepts_known_registration_without_gating() { - let mut configs = HashMap::new(); - configs.insert("node-1".to_owned(), sample_payload()); - let repo = StaticConfigSource { configs }; - - match repo.resolve(&NodeRegistration::new( - "node-1", - "127.0.0.1".parse().expect("parse ip"), - )) { - ConfigResolveResponse::Config(_) => {} - ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), - } - } -} - -/// Failures when loading a bundle-backed cfgsync source. -#[derive(Debug, Error)] -pub enum BundleConfigSourceError { - #[error("failed to read cfgsync bundle at {path}: {source}")] - Read { - path: String, - #[source] - source: std::io::Error, - }, - #[error("failed to parse cfgsync bundle at {path}: {source}")] - Parse { - path: String, - #[source] - source: serde_yaml::Error, - }, -} - -/// YAML bundle-backed source implementation. -pub struct BundleConfigSource { - inner: StaticConfigSource, -} - -impl BundleConfigSource { - /// Loads provider state from a cfgsync bundle YAML file. - pub fn from_yaml_file(path: &Path) -> Result { - let raw = fs::read_to_string(path).map_err(|source| BundleConfigSourceError::Read { - path: path.display().to_string(), - source, - })?; - - let bundle: NodeArtifactsBundle = - serde_yaml::from_str(&raw).map_err(|source| BundleConfigSourceError::Parse { - path: path.display().to_string(), - source, - })?; - - let configs = bundle - .nodes - .into_iter() - .map(payload_from_bundle_node) - .collect(); - - Ok(Self { - inner: StaticConfigSource { configs }, - }) - } -} - -impl NodeConfigSource for BundleConfigSource { - fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { - self.inner.register(registration) - } - - fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { - self.inner.resolve(registration) - } -} - -fn payload_from_bundle_node(node: NodeArtifactsBundleEntry) -> (String, NodeArtifactsPayload) { - ( - node.identifier, - NodeArtifactsPayload::from_files(node.files), - ) -} - -#[doc(hidden)] -pub type RepoResponse = ConfigResolveResponse; - -#[doc(hidden)] -pub type RegistrationResponse = RegisterNodeResponse; - -#[doc(hidden)] -pub trait ConfigProvider: NodeConfigSource {} - -impl ConfigProvider for T {} - -#[doc(hidden)] -pub type ConfigRepo = StaticConfigSource; - -#[doc(hidden)] -pub type FileConfigProvider = BundleConfigSource; - -#[doc(hidden)] -pub type FileConfigProviderError = BundleConfigSourceError; - -#[doc(hidden)] -pub type CfgSyncFile = NodeArtifactFile; - -#[doc(hidden)] -pub type CfgSyncPayload = NodeArtifactsPayload; - -#[doc(hidden)] -pub type CfgSyncErrorCode = CfgsyncErrorCode; - -#[doc(hidden)] -pub type CfgSyncErrorResponse = CfgsyncErrorResponse; diff --git a/cfgsync/core/src/server.rs b/cfgsync/core/src/server.rs index 59970e1..3a82a17 100644 --- a/cfgsync/core/src/server.rs +++ b/cfgsync/core/src/server.rs @@ -3,7 +3,7 @@ use std::{io, sync::Arc}; use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post}; use thiserror::Error; -use crate::repo::{ +use crate::{ CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration, RegisterNodeResponse, }; @@ -84,6 +84,14 @@ fn error_status(code: &CfgsyncErrorCode) -> StatusCode { } pub fn build_cfgsync_router(state: CfgsyncServerState) -> Router { + Router::new() + .route("/register", post(register_node)) + .route("/node", post(node_config)) + .with_state(Arc::new(state)) +} + +#[doc(hidden)] +pub fn build_legacy_cfgsync_router(state: CfgsyncServerState) -> Router { Router::new() .route("/register", post(register_node)) .route("/node", post(node_config)) @@ -108,14 +116,6 @@ pub async fn serve_cfgsync(port: u16, state: CfgsyncServerState) -> Result<(), R Ok(()) } -#[doc(hidden)] -pub type CfgSyncState = CfgsyncServerState; - -#[doc(hidden)] -pub use build_cfgsync_router as cfgsync_app; -#[doc(hidden)] -pub use serve_cfgsync as run_cfgsync; - #[cfg(test)] mod tests { use std::{collections::HashMap, sync::Arc}; @@ -123,7 +123,7 @@ mod tests { use axum::{Json, extract::State, http::StatusCode, response::IntoResponse}; use super::{CfgsyncServerState, NodeRegistration, node_config, register_node}; - use crate::repo::{ + use crate::{ CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactFile, NodeArtifactsPayload, NodeConfigSource, RegisterNodeResponse, }; diff --git a/cfgsync/core/src/source.rs b/cfgsync/core/src/source.rs new file mode 100644 index 0000000..7981343 --- /dev/null +++ b/cfgsync/core/src/source.rs @@ -0,0 +1,264 @@ +use std::{collections::HashMap, fs, path::Path, sync::Arc}; + +use thiserror::Error; + +use crate::{ + NodeArtifactsBundle, NodeArtifactsBundleEntry, NodeArtifactsPayload, NodeRegistration, + RegisterNodeResponse, protocol::ConfigResolveResponse, +}; + +/// Source of cfgsync node payloads. +pub trait NodeConfigSource: Send + Sync { + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse; + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse; +} + +/// In-memory map-backed source used by cfgsync server state. +pub struct StaticConfigSource { + configs: HashMap, +} + +impl StaticConfigSource { + #[must_use] + pub fn from_payloads(configs: HashMap) -> Arc { + Arc::new(Self { configs }) + } + + #[must_use] + pub fn from_bundle(bundle: NodeArtifactsBundle) -> Arc { + Self::from_payloads(bundle_to_payload_map(bundle)) + } +} + +impl NodeConfigSource for StaticConfigSource { + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { + if self.configs.contains_key(®istration.identifier) { + RegisterNodeResponse::Registered + } else { + RegisterNodeResponse::Error(crate::CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )) + } + } + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { + self.configs + .get(®istration.identifier) + .cloned() + .map_or_else( + || { + ConfigResolveResponse::Error(crate::CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )) + }, + ConfigResolveResponse::Config, + ) + } +} + +#[derive(Debug, Error)] +pub enum BundleLoadError { + #[error("reading cfgsync bundle {path}: {source}")] + ReadBundle { + path: String, + #[source] + source: std::io::Error, + }, + #[error("parsing cfgsync bundle {path}: {source}")] + ParseBundle { + path: String, + #[source] + source: serde_yaml::Error, + }, +} + +#[must_use] +pub fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap { + bundle + .nodes + .into_iter() + .map(|node| { + let NodeArtifactsBundleEntry { identifier, files } = node; + + (identifier, NodeArtifactsPayload::from_files(files)) + }) + .collect() +} + +pub fn load_bundle(path: &Path) -> Result { + let path_string = path.display().to_string(); + let raw = fs::read_to_string(path).map_err(|source| BundleLoadError::ReadBundle { + path: path_string.clone(), + source, + })?; + serde_yaml::from_str(&raw).map_err(|source| BundleLoadError::ParseBundle { + path: path_string, + source, + }) +} + +/// Failures when loading a bundle-backed cfgsync source. +#[derive(Debug, Error)] +pub enum BundleConfigSourceError { + #[error("failed to read cfgsync bundle at {path}: {source}")] + Read { + path: String, + #[source] + source: std::io::Error, + }, + #[error("failed to parse cfgsync bundle at {path}: {source}")] + Parse { + path: String, + #[source] + source: serde_yaml::Error, + }, +} + +/// YAML bundle-backed source implementation. +pub struct BundleConfigSource { + inner: StaticConfigSource, +} + +impl BundleConfigSource { + /// Loads source state from a cfgsync bundle YAML file. + pub fn from_yaml_file(path: &Path) -> Result { + let raw = fs::read_to_string(path).map_err(|source| BundleConfigSourceError::Read { + path: path.display().to_string(), + source, + })?; + + let bundle: NodeArtifactsBundle = + serde_yaml::from_str(&raw).map_err(|source| BundleConfigSourceError::Parse { + path: path.display().to_string(), + source, + })?; + + let configs = bundle + .nodes + .into_iter() + .map(payload_from_bundle_node) + .collect(); + + Ok(Self { + inner: StaticConfigSource { configs }, + }) + } +} + +impl NodeConfigSource for BundleConfigSource { + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { + self.inner.register(registration) + } + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { + self.inner.resolve(registration) + } +} + +fn payload_from_bundle_node(node: NodeArtifactsBundleEntry) -> (String, NodeArtifactsPayload) { + ( + node.identifier, + NodeArtifactsPayload::from_files(node.files), + ) +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, io::Write as _}; + + use tempfile::NamedTempFile; + + use super::{BundleConfigSource, StaticConfigSource}; + use crate::{ + CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, ConfigResolveResponse, NodeArtifactFile, + NodeArtifactsPayload, NodeConfigSource, NodeRegistration, + }; + + fn sample_payload() -> NodeArtifactsPayload { + NodeArtifactsPayload::from_files(vec![NodeArtifactFile::new("/config.yaml", "key: value")]) + } + + #[test] + fn resolves_existing_identifier() { + let mut configs = HashMap::new(); + configs.insert("node-1".to_owned(), sample_payload()); + let repo = StaticConfigSource { configs }; + + match repo.resolve(&NodeRegistration::new( + "node-1", + "127.0.0.1".parse().expect("parse ip"), + )) { + ConfigResolveResponse::Config(payload) => { + assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION); + assert_eq!(payload.files.len(), 1); + assert_eq!(payload.files[0].path, "/config.yaml"); + } + ConfigResolveResponse::Error(error) => panic!("expected config response, got {error}"), + } + } + + #[test] + fn reports_missing_identifier() { + let repo = StaticConfigSource { + configs: HashMap::new(), + }; + + match repo.resolve(&NodeRegistration::new( + "unknown-node", + "127.0.0.1".parse().expect("parse ip"), + )) { + ConfigResolveResponse::Config(_) => panic!("expected missing-config error"), + ConfigResolveResponse::Error(error) => { + assert!(matches!(error.code, CfgsyncErrorCode::MissingConfig)); + assert!(error.message.contains("unknown-node")); + } + } + } + + #[test] + fn loads_file_provider_bundle() { + let mut bundle_file = NamedTempFile::new().expect("create temp bundle"); + let yaml = r#" +nodes: + - identifier: node-1 + files: + - path: /config.yaml + content: "a: 1" +"#; + bundle_file + .write_all(yaml.as_bytes()) + .expect("write bundle yaml"); + + let provider = + BundleConfigSource::from_yaml_file(bundle_file.path()).expect("load file provider"); + + let _ = provider.register(NodeRegistration::new( + "node-1", + "127.0.0.1".parse().expect("parse ip"), + )); + + match provider.resolve(&NodeRegistration::new( + "node-1", + "127.0.0.1".parse().expect("parse ip"), + )) { + ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1), + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } + + #[test] + fn resolve_accepts_known_registration_without_gating() { + let mut configs = HashMap::new(); + configs.insert("node-1".to_owned(), sample_payload()); + let repo = StaticConfigSource { configs }; + + match repo.resolve(&NodeRegistration::new( + "node-1", + "127.0.0.1".parse().expect("parse ip"), + )) { + ConfigResolveResponse::Config(_) => {} + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } +} diff --git a/cfgsync/runtime/src/bin/cfgsync-server.rs b/cfgsync/runtime/src/bin/cfgsync-server.rs index 28134ea..a719044 100644 --- a/cfgsync/runtime/src/bin/cfgsync-server.rs +++ b/cfgsync/runtime/src/bin/cfgsync-server.rs @@ -1,10 +1,10 @@ use std::path::PathBuf; -use cfgsync_runtime::run_cfgsync_server; +use cfgsync_runtime::serve_cfgsync_from_config; use clap::Parser; #[derive(Parser, Debug)] -#[command(about = "CfgSync")] +#[command(about = "Cfgsync server")] struct Args { config: PathBuf, } @@ -12,5 +12,5 @@ struct Args { #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); - run_cfgsync_server(&args.config).await + serve_cfgsync_from_config(&args.config).await } diff --git a/cfgsync/runtime/src/client.rs b/cfgsync/runtime/src/client.rs index 9951e3f..340bdc1 100644 --- a/cfgsync/runtime/src/client.rs +++ b/cfgsync/runtime/src/client.rs @@ -184,11 +184,9 @@ fn parse_registration_payload(raw: &str) -> Result { #[cfg(test)] mod tests { - use std::collections::HashMap; - use cfgsync_core::{ - CfgsyncServerState, NodeArtifactsBundle, NodeArtifactsBundleEntry, NodeArtifactsPayload, - StaticConfigSource, serve_cfgsync, + CfgsyncServerState, NodeArtifactsBundle, NodeArtifactsBundleEntry, StaticConfigSource, + serve_cfgsync, }; use tempfile::tempdir; @@ -208,7 +206,7 @@ mod tests { ], }]); - let repo = StaticConfigSource::from_bundle(bundle_to_payload_map(bundle)); + let repo = StaticConfigSource::from_bundle(bundle); let state = CfgsyncServerState::new(repo); let port = allocate_test_port(); let address = format!("http://127.0.0.1:{port}"); @@ -234,19 +232,6 @@ mod tests { assert_eq!(app_config, "app_key: app_value"); assert_eq!(deployment, "mode: local"); } - - fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap { - bundle - .nodes - .into_iter() - .map(|node| { - let NodeArtifactsBundleEntry { identifier, files } = node; - - (identifier, NodeArtifactsPayload::from_files(files)) - }) - .collect() - } - fn allocate_test_port() -> u16 { let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port for test"); diff --git a/cfgsync/runtime/src/lib.rs b/cfgsync/runtime/src/lib.rs index f9b225f..f74a8a4 100644 --- a/cfgsync/runtime/src/lib.rs +++ b/cfgsync/runtime/src/lib.rs @@ -4,8 +4,7 @@ mod client; mod server; pub use client::run_cfgsync_client_from_env; -#[doc(hidden)] -pub use server::CfgSyncServerConfig; pub use server::{ - CfgsyncServerConfig, CfgsyncServingMode, LoadCfgsyncServerConfigError, run_cfgsync_server, + CfgsyncServerConfig, CfgsyncServingMode, LoadCfgsyncServerConfigError, + serve_cfgsync_from_config, }; diff --git a/cfgsync/runtime/src/server.rs b/cfgsync/runtime/src/server.rs index c77b05d..651df58 100644 --- a/cfgsync/runtime/src/server.rs +++ b/cfgsync/runtime/src/server.rs @@ -1,7 +1,7 @@ use std::{fs, path::Path, sync::Arc}; use anyhow::Context as _; -use cfgsync_adapter::{NodeArtifacts, NodeArtifactsCatalog, RegistrationConfigProvider}; +use cfgsync_adapter::{MaterializingConfigSource, NodeArtifacts, NodeArtifactsCatalog}; use cfgsync_core::{ BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, serve_cfgsync, }; @@ -25,16 +25,6 @@ pub enum CfgsyncServingMode { Registration, } -#[derive(Debug, Deserialize)] -struct RawCfgsyncServerConfig { - port: u16, - bundle_path: String, - #[serde(default)] - serving_mode: Option, - #[serde(default)] - registration_flow: Option, -} - #[derive(Debug, Error)] pub enum LoadCfgsyncServerConfigError { #[error("failed to read cfgsync config file {path}: {source}")] @@ -61,20 +51,11 @@ impl CfgsyncServerConfig { source, })?; - let raw: RawCfgsyncServerConfig = - serde_yaml::from_str(&config_content).map_err(|source| { - LoadCfgsyncServerConfigError::Parse { - path: config_path, - source, - } - })?; - - Ok(Self { - port: raw.port, - bundle_path: raw.bundle_path, - serving_mode: raw - .serving_mode - .unwrap_or_else(|| mode_from_legacy_registration_flow(raw.registration_flow)), + serde_yaml::from_str(&config_content).map_err(|source| { + LoadCfgsyncServerConfigError::Parse { + path: config_path, + source, + } }) } @@ -97,14 +78,6 @@ impl CfgsyncServerConfig { } } -fn mode_from_legacy_registration_flow(registration_flow: Option) -> CfgsyncServingMode { - if registration_flow.unwrap_or(false) { - CfgsyncServingMode::Registration - } else { - CfgsyncServingMode::Bundle - } -} - fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result> { let provider = BundleConfigSource::from_yaml_file(bundle_path) .with_context(|| format!("loading cfgsync provider from {}", bundle_path.display()))?; @@ -112,10 +85,10 @@ fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result anyhow::Result> { +fn load_registration_source(bundle_path: &Path) -> anyhow::Result> { let bundle = load_bundle_yaml(bundle_path)?; let catalog = build_node_catalog(bundle); - let provider = RegistrationConfigProvider::new(catalog); + let provider = MaterializingConfigSource::new(catalog); Ok(Arc::new(provider)) } @@ -154,7 +127,7 @@ fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::Path } /// Loads runtime config and starts cfgsync HTTP server process. -pub async fn run_cfgsync_server(config_path: &Path) -> anyhow::Result<()> { +pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()> { let config = CfgsyncServerConfig::load_from_file(config_path)?; let bundle_path = resolve_bundle_path(config_path, &config.bundle_path); @@ -170,11 +143,8 @@ fn build_server_state( ) -> anyhow::Result { let repo = match config.serving_mode { CfgsyncServingMode::Bundle => load_bundle_provider(bundle_path)?, - CfgsyncServingMode::Registration => load_materializing_provider(bundle_path)?, + CfgsyncServingMode::Registration => load_registration_source(bundle_path)?, }; Ok(CfgsyncServerState::new(repo)) } - -#[doc(hidden)] -pub type CfgSyncServerConfig = CfgsyncServerConfig; diff --git a/logos/runtime/ext/src/cfgsync/mod.rs b/logos/runtime/ext/src/cfgsync/mod.rs index 4d06cd6..9833e57 100644 --- a/logos/runtime/ext/src/cfgsync/mod.rs +++ b/logos/runtime/ext/src/cfgsync/mod.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cfgsync_adapter::{CfgsyncEnv, build_cfgsync_node_catalog}; +use cfgsync_adapter::{DeploymentAdapter, build_node_artifact_catalog}; pub(crate) use cfgsync_core::render::CfgsyncOutputPaths; use cfgsync_core::{ NodeArtifactsBundle, NodeArtifactsBundleEntry, @@ -27,7 +27,7 @@ enum BundleRenderError { MissingYamlKey { key: String }, } -pub(crate) fn render_cfgsync_from_template( +pub(crate) fn render_cfgsync_from_template( topology: &E::Deployment, hostnames: &[String], options: CfgsyncRenderOptions, @@ -45,11 +45,11 @@ pub(crate) fn render_cfgsync_from_template( }) } -fn build_cfgsync_bundle( +fn build_cfgsync_bundle( topology: &E::Deployment, hostnames: &[String], ) -> Result { - let nodes = build_cfgsync_node_catalog::(topology, hostnames)?.into_configs(); + let nodes = build_node_artifact_catalog::(topology, hostnames)?.into_nodes(); let nodes = nodes .into_iter() .map(|node| NodeArtifactsBundleEntry { @@ -129,7 +129,7 @@ fn build_cfgsync_server_config() -> Value { Value::Mapping(root) } -pub(crate) fn render_and_write_cfgsync_from_template( +pub(crate) fn render_and_write_cfgsync_from_template( topology: &E::Deployment, hostnames: &[String], mut options: CfgsyncRenderOptions, @@ -143,7 +143,7 @@ pub(crate) fn render_and_write_cfgsync_from_template( Ok(rendered) } -fn build_overrides( +fn build_overrides( topology: &E::Deployment, options: CfgsyncRenderOptions, ) -> CfgsyncConfigOverrides { diff --git a/testing-framework/core/src/cfgsync/mod.rs b/testing-framework/core/src/cfgsync/mod.rs index af45d4d..8ed5910 100644 --- a/testing-framework/core/src/cfgsync/mod.rs +++ b/testing-framework/core/src/cfgsync/mod.rs @@ -1 +1,5 @@ pub use cfgsync_adapter::*; +#[doc(hidden)] +pub use cfgsync_adapter::{ + DeploymentAdapter as CfgsyncEnv, build_node_artifact_catalog as build_cfgsync_node_catalog, +};