Refactor cfgsync around external-facing modules

This commit is contained in:
andrussal 2026-03-10 12:30:53 +01:00
parent ef1d7663c5
commit 728b90b770
20 changed files with 1227 additions and 1235 deletions

View File

@ -0,0 +1,74 @@
use std::collections::HashMap;
use cfgsync_artifacts::ArtifactFile;
use serde::{Deserialize, Serialize};
/// Per-node artifact payload served by cfgsync for one registered node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeArtifacts {
/// Stable node identifier resolved by the adapter.
pub identifier: String,
/// Files served to the node after cfgsync registration.
pub files: Vec<ArtifactFile>,
}
/// Materialized artifact files for a single registered node.
#[derive(Debug, Clone, Default)]
pub struct ArtifactSet {
files: Vec<ArtifactFile>,
}
impl ArtifactSet {
#[must_use]
pub fn new(files: Vec<ArtifactFile>) -> Self {
Self { files }
}
#[must_use]
pub fn files(&self) -> &[ArtifactFile] {
&self.files
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
}
/// Artifact payloads indexed by stable node identifier.
#[derive(Debug, Clone, Default)]
pub struct NodeArtifactsCatalog {
nodes: HashMap<String, NodeArtifacts>,
}
impl NodeArtifactsCatalog {
#[must_use]
pub fn new(nodes: Vec<NodeArtifacts>) -> Self {
let nodes = nodes
.into_iter()
.map(|node| (node.identifier.clone(), node))
.collect();
Self { nodes }
}
#[must_use]
pub fn resolve(&self, identifier: &str) -> Option<&NodeArtifacts> {
self.nodes.get(identifier)
}
#[must_use]
pub fn len(&self) -> usize {
self.nodes.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
#[must_use]
pub fn into_nodes(self) -> Vec<NodeArtifacts> {
self.nodes.into_values().collect()
}
}

View File

@ -0,0 +1,118 @@
use std::error::Error;
use cfgsync_artifacts::ArtifactFile;
use thiserror::Error;
use crate::{NodeArtifacts, NodeArtifactsCatalog};
/// Adapter contract for converting an application deployment model into
/// node-specific serialized config payloads.
pub trait DeploymentAdapter {
type Deployment;
type Node;
type NodeConfig;
type Error: Error + Send + Sync + 'static;
fn nodes(deployment: &Self::Deployment) -> &[Self::Node];
fn node_identifier(index: usize, node: &Self::Node) -> String;
fn build_node_config(
deployment: &Self::Deployment,
node: &Self::Node,
) -> Result<Self::NodeConfig, Self::Error>;
fn rewrite_for_hostnames(
deployment: &Self::Deployment,
node_index: usize,
hostnames: &[String],
config: &mut Self::NodeConfig,
) -> Result<(), Self::Error>;
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
}
/// High-level failures while building adapter output for cfgsync.
#[derive(Debug, Error)]
pub enum BuildCfgsyncNodesError {
#[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")]
HostnameCountMismatch { nodes: usize, hostnames: usize },
#[error("cfgsync adapter failed: {source}")]
Adapter {
#[source]
source: super::DynCfgsyncError,
},
}
fn adapter_error<E>(source: E) -> BuildCfgsyncNodesError
where
E: Error + Send + Sync + 'static,
{
BuildCfgsyncNodesError::Adapter {
source: Box::new(source),
}
}
/// Builds cfgsync node configs for a deployment by:
/// 1) validating hostname count,
/// 2) building each node config,
/// 3) rewriting host references,
/// 4) serializing each node payload.
pub fn build_cfgsync_node_configs<E: DeploymentAdapter>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<Vec<NodeArtifacts>, BuildCfgsyncNodesError> {
Ok(build_node_artifact_catalog::<E>(deployment, hostnames)?.into_nodes())
}
/// Builds cfgsync node configs and indexes them by stable identifier.
pub fn build_node_artifact_catalog<E: DeploymentAdapter>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<NodeArtifactsCatalog, BuildCfgsyncNodesError> {
let nodes = E::nodes(deployment);
ensure_hostname_count(nodes.len(), hostnames.len())?;
let mut output = Vec::with_capacity(nodes.len());
for (index, node) in nodes.iter().enumerate() {
output.push(build_node_entry::<E>(deployment, node, index, hostnames)?);
}
Ok(NodeArtifactsCatalog::new(output))
}
fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> {
if nodes != hostnames {
return Err(BuildCfgsyncNodesError::HostnameCountMismatch { nodes, hostnames });
}
Ok(())
}
fn build_node_entry<E: DeploymentAdapter>(
deployment: &E::Deployment,
node: &E::Node,
index: usize,
hostnames: &[String],
) -> Result<NodeArtifacts, BuildCfgsyncNodesError> {
let node_config = build_rewritten_node_config::<E>(deployment, node, index, hostnames)?;
let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?;
Ok(NodeArtifacts {
identifier: E::node_identifier(index, node),
files: vec![ArtifactFile::new("/config.yaml", &config_yaml)],
})
}
fn build_rewritten_node_config<E: DeploymentAdapter>(
deployment: &E::Deployment,
node: &E::Node,
index: usize,
hostnames: &[String],
) -> Result<E::NodeConfig, BuildCfgsyncNodesError> {
let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?;
E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config)
.map_err(adapter_error)?;
Ok(node_config)
}

View File

@ -1,596 +1,16 @@
use std::{collections::HashMap, error::Error, sync::Mutex};
mod artifacts;
mod deployment;
mod materializer;
mod registrations;
mod sources;
use cfgsync_artifacts::ArtifactFile;
use cfgsync_core::{
CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactsPayload, NodeConfigSource,
NodeRegistration, RegisterNodeResponse,
pub use artifacts::{ArtifactSet, NodeArtifacts, NodeArtifactsCatalog};
pub use deployment::{
BuildCfgsyncNodesError, DeploymentAdapter, build_cfgsync_node_configs,
build_node_artifact_catalog,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Type-erased cfgsync adapter error used to preserve source context.
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
/// Per-node artifact payload served by cfgsync for one registered node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeArtifacts {
/// Stable node identifier resolved by the adapter.
pub identifier: String,
/// Files served to the node after cfgsync registration.
pub files: Vec<ArtifactFile>,
}
/// Materialized artifact files for a single registered node.
#[derive(Debug, Clone, Default)]
pub struct ArtifactSet {
files: Vec<ArtifactFile>,
}
/// Immutable view of registrations currently known to cfgsync.
#[derive(Debug, Clone, Default)]
pub struct RegistrationSet {
registrations: Vec<NodeRegistration>,
}
impl RegistrationSet {
#[must_use]
pub fn new(registrations: Vec<NodeRegistration>) -> Self {
Self { registrations }
}
#[must_use]
pub fn len(&self) -> usize {
self.registrations.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.registrations.is_empty()
}
#[must_use]
pub fn iter(&self) -> impl Iterator<Item = &NodeRegistration> {
self.registrations.iter()
}
#[must_use]
pub fn get(&self, identifier: &str) -> Option<&NodeRegistration> {
self.registrations
.iter()
.find(|registration| registration.identifier == identifier)
}
}
impl ArtifactSet {
#[must_use]
pub fn new(files: Vec<ArtifactFile>) -> Self {
Self { files }
}
#[must_use]
pub fn files(&self) -> &[ArtifactFile] {
&self.files
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
}
/// Artifact payloads indexed by stable node identifier.
#[derive(Debug, Clone, Default)]
pub struct NodeArtifactsCatalog {
nodes: HashMap<String, NodeArtifacts>,
}
impl NodeArtifactsCatalog {
#[must_use]
pub fn new(nodes: Vec<NodeArtifacts>) -> Self {
let nodes = nodes
.into_iter()
.map(|node| (node.identifier.clone(), node))
.collect();
Self { nodes }
}
#[must_use]
pub fn resolve(&self, identifier: &str) -> Option<&NodeArtifacts> {
self.nodes.get(identifier)
}
#[must_use]
pub fn len(&self) -> usize {
self.nodes.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
#[must_use]
pub fn into_nodes(self) -> Vec<NodeArtifacts> {
self.nodes.into_values().collect()
}
#[doc(hidden)]
#[must_use]
pub fn into_configs(self) -> Vec<NodeArtifacts> {
self.into_nodes()
}
}
/// Adapter-side materialization contract for a single registered node.
pub trait NodeArtifactsMaterializer: Send + Sync {
fn materialize(
&self,
registration: &NodeRegistration,
registrations: &RegistrationSet,
) -> Result<Option<ArtifactSet>, DynCfgsyncError>;
}
/// Backward-compatible alias for the previous materializer trait name.
pub trait CfgsyncMaterializer: NodeArtifactsMaterializer {}
impl<T> CfgsyncMaterializer for T where T: NodeArtifactsMaterializer + ?Sized {}
/// Adapter contract for materializing a whole registration set into
/// per-node cfgsync artifacts.
pub trait RegistrationSetMaterializer: Send + Sync {
fn materialize_snapshot(
&self,
registrations: &RegistrationSet,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError>;
}
/// Backward-compatible alias for the previous snapshot materializer trait name.
pub trait CfgsyncSnapshotMaterializer: RegistrationSetMaterializer {}
impl<T> CfgsyncSnapshotMaterializer for T where T: RegistrationSetMaterializer + ?Sized {}
impl NodeArtifactsMaterializer for NodeArtifactsCatalog {
fn materialize(
&self,
registration: &NodeRegistration,
_registrations: &RegistrationSet,
) -> Result<Option<ArtifactSet>, DynCfgsyncError> {
let artifacts = self
.resolve(&registration.identifier)
.map(build_node_artifacts_from_config);
Ok(artifacts)
}
}
impl RegistrationSetMaterializer for NodeArtifactsCatalog {
fn materialize_snapshot(
&self,
_registrations: &RegistrationSet,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
Ok(Some(self.clone()))
}
}
/// Registration-aware provider backed by an adapter materializer.
pub struct RegistrationConfigProvider<M> {
materializer: M,
registrations: Mutex<HashMap<String, NodeRegistration>>,
}
impl<M> RegistrationConfigProvider<M> {
#[must_use]
pub fn new(materializer: M) -> Self {
Self {
materializer,
registrations: Mutex::new(HashMap::new()),
}
}
fn registration_for(&self, identifier: &str) -> Option<NodeRegistration> {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.get(identifier).cloned()
}
fn registration_set(&self) -> RegistrationSet {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
RegistrationSet::new(registrations.values().cloned().collect())
}
}
/// Registration-aware provider backed by a snapshot materializer.
pub struct SnapshotConfigProvider<M> {
materializer: M,
registrations: Mutex<HashMap<String, NodeRegistration>>,
}
impl<M> SnapshotConfigProvider<M> {
#[must_use]
pub fn new(materializer: M) -> Self {
Self {
materializer,
registrations: Mutex::new(HashMap::new()),
}
}
fn registration_for(&self, identifier: &str) -> Option<NodeRegistration> {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.get(identifier).cloned()
}
fn registration_set(&self) -> RegistrationSet {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
RegistrationSet::new(registrations.values().cloned().collect())
}
}
impl<M> NodeConfigSource for SnapshotConfigProvider<M>
where
M: RegistrationSetMaterializer,
{
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
let mut registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.insert(registration.identifier.clone(), registration);
RegisterNodeResponse::Registered
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
let registration = match self.registration_for(&registration.identifier) {
Some(registration) => registration,
None => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
));
}
};
let registrations = self.registration_set();
let catalog = match self.materializer.materialize_snapshot(&registrations) {
Ok(Some(catalog)) => catalog,
Ok(None) => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
));
}
Err(error) => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!(
"failed to materialize config snapshot: {error}"
)));
}
};
match catalog.resolve(&registration.identifier) {
Some(config) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(
config.files.clone(),
)),
None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
)),
}
}
}
impl<M> NodeConfigSource for RegistrationConfigProvider<M>
where
M: NodeArtifactsMaterializer,
{
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
let mut registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.insert(registration.identifier.clone(), registration);
RegisterNodeResponse::Registered
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
let registration = match self.registration_for(&registration.identifier) {
Some(registration) => registration,
None => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
));
}
};
let registrations = self.registration_set();
match self.materializer.materialize(&registration, &registrations) {
Ok(Some(artifacts)) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(
artifacts.files().to_vec(),
)),
Ok(None) => ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
)),
Err(error) => ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!(
"failed to materialize config for host {}: {error}",
registration.identifier
))),
}
}
}
/// Adapter contract for converting an application deployment model into
/// node-specific serialized config payloads.
pub trait CfgsyncEnv {
type Deployment;
type Node;
type NodeConfig;
type Error: Error + Send + Sync + 'static;
fn nodes(deployment: &Self::Deployment) -> &[Self::Node];
fn node_identifier(index: usize, node: &Self::Node) -> String;
fn build_node_config(
deployment: &Self::Deployment,
node: &Self::Node,
) -> Result<Self::NodeConfig, Self::Error>;
fn rewrite_for_hostnames(
deployment: &Self::Deployment,
node_index: usize,
hostnames: &[String],
config: &mut Self::NodeConfig,
) -> Result<(), Self::Error>;
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
}
/// Preferred public name for application-side cfgsync integration.
pub trait DeploymentAdapter: CfgsyncEnv {}
impl<T> DeploymentAdapter for T where T: CfgsyncEnv + ?Sized {}
/// High-level failures while building adapter output for cfgsync.
#[derive(Debug, Error)]
pub enum BuildCfgsyncNodesError {
#[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")]
HostnameCountMismatch { nodes: usize, hostnames: usize },
#[error("cfgsync adapter failed: {source}")]
Adapter {
#[source]
source: DynCfgsyncError,
},
}
fn adapter_error<E>(source: E) -> BuildCfgsyncNodesError
where
E: Error + Send + Sync + 'static,
{
BuildCfgsyncNodesError::Adapter {
source: Box::new(source),
}
}
/// Builds cfgsync node configs for a deployment by:
/// 1) validating hostname count,
/// 2) building each node config,
/// 3) rewriting host references,
/// 4) serializing each node payload.
pub fn build_cfgsync_node_configs<E: CfgsyncEnv>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<Vec<CfgsyncNodeConfig>, BuildCfgsyncNodesError> {
Ok(build_node_artifact_catalog::<E>(deployment, hostnames)?.into_nodes())
}
/// Builds cfgsync node configs and indexes them by stable identifier.
pub fn build_node_artifact_catalog<E: DeploymentAdapter>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<NodeArtifactsCatalog, BuildCfgsyncNodesError> {
let nodes = E::nodes(deployment);
ensure_hostname_count(nodes.len(), hostnames.len())?;
let mut output = Vec::with_capacity(nodes.len());
for (index, node) in nodes.iter().enumerate() {
output.push(build_node_entry::<E>(deployment, node, index, hostnames)?);
}
Ok(NodeArtifactsCatalog::new(output))
}
#[doc(hidden)]
pub fn build_cfgsync_node_catalog<E: CfgsyncEnv>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<NodeArtifactsCatalog, BuildCfgsyncNodesError> {
build_node_artifact_catalog::<E>(deployment, hostnames)
}
fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> {
if nodes != hostnames {
return Err(BuildCfgsyncNodesError::HostnameCountMismatch { nodes, hostnames });
}
Ok(())
}
fn build_node_entry<E: DeploymentAdapter>(
deployment: &E::Deployment,
node: &E::Node,
index: usize,
hostnames: &[String],
) -> Result<NodeArtifacts, BuildCfgsyncNodesError> {
let node_config = build_rewritten_node_config::<E>(deployment, node, index, hostnames)?;
let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?;
Ok(NodeArtifacts {
identifier: E::node_identifier(index, node),
files: vec![ArtifactFile::new("/config.yaml", &config_yaml)],
})
}
fn build_rewritten_node_config<E: DeploymentAdapter>(
deployment: &E::Deployment,
node: &E::Node,
index: usize,
hostnames: &[String],
) -> Result<E::NodeConfig, BuildCfgsyncNodesError> {
let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?;
E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config)
.map_err(adapter_error)?;
Ok(node_config)
}
fn build_node_artifacts_from_config(config: &NodeArtifacts) -> ArtifactSet {
ArtifactSet::new(config.files.clone())
}
#[doc(hidden)]
pub type CfgsyncNodeConfig = NodeArtifacts;
#[doc(hidden)]
pub type CfgsyncNodeArtifacts = ArtifactSet;
#[doc(hidden)]
pub type RegistrationSnapshot = RegistrationSet;
#[doc(hidden)]
pub type CfgsyncNodeCatalog = NodeArtifactsCatalog;
#[doc(hidden)]
pub type MaterializingConfigProvider<M> = RegistrationConfigProvider<M>;
#[doc(hidden)]
pub type SnapshotMaterializingConfigProvider<M> = SnapshotConfigProvider<M>;
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use cfgsync_artifacts::ArtifactFile;
use cfgsync_core::{
CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration,
};
use super::{
ArtifactSet, DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog,
NodeArtifactsMaterializer, RegistrationConfigProvider, RegistrationSet,
};
#[test]
fn catalog_resolves_identifier() {
let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts {
identifier: "node-1".to_owned(),
files: vec![ArtifactFile::new("/config.yaml", "key: value")],
}]);
let node = catalog.resolve("node-1").expect("resolve node config");
assert_eq!(node.files[0].content, "key: value");
}
#[test]
fn materializing_provider_resolves_registered_node() {
let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts {
identifier: "node-1".to_owned(),
files: vec![ArtifactFile::new("/config.yaml", "key: value")],
}]);
let provider = RegistrationConfigProvider::new(catalog);
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"));
let _ = provider.register(registration.clone());
match provider.resolve(&registration) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.files()[0].path, "/config.yaml")
}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
#[test]
fn materializing_provider_reports_not_ready_before_registration() {
let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts {
identifier: "node-1".to_owned(),
files: vec![ArtifactFile::new("/config.yaml", "key: value")],
}]);
let provider = RegistrationConfigProvider::new(catalog);
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"));
match provider.resolve(&registration) {
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::NotReady))
}
}
}
struct ThresholdMaterializer {
calls: AtomicUsize,
}
impl NodeArtifactsMaterializer for ThresholdMaterializer {
fn materialize(
&self,
registration: &NodeRegistration,
registrations: &RegistrationSet,
) -> Result<Option<ArtifactSet>, DynCfgsyncError> {
self.calls.fetch_add(1, Ordering::SeqCst);
if registrations.len() < 2 {
return Ok(None);
}
let peer_count = registrations.iter().count();
let files = vec![
ArtifactFile::new("/config.yaml", format!("id: {}", registration.identifier)),
ArtifactFile::new("/shared.yaml", format!("peers: {peer_count}")),
];
Ok(Some(ArtifactSet::new(files)))
}
}
#[test]
fn materializing_provider_uses_registration_snapshot_for_readiness() {
let provider = RegistrationConfigProvider::new(ThresholdMaterializer {
calls: AtomicUsize::new(0),
});
let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip"));
let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip"));
let _ = provider.register(node_a.clone());
match provider.resolve(&node_a) {
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::NotReady))
}
}
let _ = provider.register(node_b);
match provider.resolve(&node_a) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.files()[0].content, "id: node-a");
assert_eq!(payload.files()[1].content, "peers: 2");
}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
}
pub use materializer::{
DynCfgsyncError, NodeArtifactsMaterializer, RegistrationSnapshotMaterializer,
};
pub use registrations::RegistrationSnapshot;
pub use sources::{MaterializingConfigSource, SnapshotConfigSource};

View File

@ -0,0 +1,26 @@
use std::error::Error;
use cfgsync_core::NodeRegistration;
use crate::{ArtifactSet, NodeArtifactsCatalog, RegistrationSnapshot};
/// Type-erased cfgsync adapter error used to preserve source context.
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
/// Adapter-side materialization contract for a single registered node.
pub trait NodeArtifactsMaterializer: Send + Sync {
fn materialize(
&self,
registration: &NodeRegistration,
registrations: &RegistrationSnapshot,
) -> Result<Option<ArtifactSet>, DynCfgsyncError>;
}
/// Adapter contract for materializing a whole registration snapshot into
/// per-node cfgsync artifacts.
pub trait RegistrationSnapshotMaterializer: Send + Sync {
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError>;
}

View File

@ -0,0 +1,36 @@
use cfgsync_core::NodeRegistration;
/// Immutable view of registrations currently known to cfgsync.
#[derive(Debug, Clone, Default)]
pub struct RegistrationSnapshot {
registrations: Vec<NodeRegistration>,
}
impl RegistrationSnapshot {
#[must_use]
pub fn new(registrations: Vec<NodeRegistration>) -> Self {
Self { registrations }
}
#[must_use]
pub fn len(&self) -> usize {
self.registrations.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.registrations.is_empty()
}
#[must_use]
pub fn iter(&self) -> impl Iterator<Item = &NodeRegistration> {
self.registrations.iter()
}
#[must_use]
pub fn get(&self, identifier: &str) -> Option<&NodeRegistration> {
self.registrations
.iter()
.find(|registration| registration.identifier == identifier)
}
}

View File

@ -0,0 +1,365 @@
use std::{collections::HashMap, sync::Mutex};
use cfgsync_core::{
CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactsPayload, NodeConfigSource,
NodeRegistration, RegisterNodeResponse,
};
use crate::{
ArtifactSet, DynCfgsyncError, NodeArtifactsCatalog, NodeArtifactsMaterializer,
RegistrationSnapshot, RegistrationSnapshotMaterializer,
};
impl NodeArtifactsMaterializer for NodeArtifactsCatalog {
fn materialize(
&self,
registration: &NodeRegistration,
_registrations: &RegistrationSnapshot,
) -> Result<Option<ArtifactSet>, DynCfgsyncError> {
Ok(self
.resolve(&registration.identifier)
.map(build_artifact_set_from_catalog_entry))
}
}
impl RegistrationSnapshotMaterializer for NodeArtifactsCatalog {
fn materialize_snapshot(
&self,
_registrations: &RegistrationSnapshot,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
Ok(Some(self.clone()))
}
}
/// Registration-aware source backed by an adapter materializer.
pub struct MaterializingConfigSource<M> {
materializer: M,
registrations: Mutex<HashMap<String, NodeRegistration>>,
}
impl<M> MaterializingConfigSource<M> {
#[must_use]
pub fn new(materializer: M) -> Self {
Self {
materializer,
registrations: Mutex::new(HashMap::new()),
}
}
fn registration_for(&self, identifier: &str) -> Option<NodeRegistration> {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.get(identifier).cloned()
}
fn registration_snapshot(&self) -> RegistrationSnapshot {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
RegistrationSnapshot::new(registrations.values().cloned().collect())
}
}
impl<M> NodeConfigSource for MaterializingConfigSource<M>
where
M: NodeArtifactsMaterializer,
{
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
let mut registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.insert(registration.identifier.clone(), registration);
RegisterNodeResponse::Registered
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
let registration = match self.registration_for(&registration.identifier) {
Some(registration) => registration,
None => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
));
}
};
let registrations = self.registration_snapshot();
match self.materializer.materialize(&registration, &registrations) {
Ok(Some(artifacts)) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(
artifacts.files().to_vec(),
)),
Ok(None) => ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
)),
Err(error) => ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!(
"failed to materialize config for host {}: {error}",
registration.identifier
))),
}
}
}
/// Registration-aware source backed by a snapshot materializer.
pub struct SnapshotConfigSource<M> {
materializer: M,
registrations: Mutex<HashMap<String, NodeRegistration>>,
}
impl<M> SnapshotConfigSource<M> {
#[must_use]
pub fn new(materializer: M) -> Self {
Self {
materializer,
registrations: Mutex::new(HashMap::new()),
}
}
fn registration_for(&self, identifier: &str) -> Option<NodeRegistration> {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.get(identifier).cloned()
}
fn registration_snapshot(&self) -> RegistrationSnapshot {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
RegistrationSnapshot::new(registrations.values().cloned().collect())
}
}
impl<M> NodeConfigSource for SnapshotConfigSource<M>
where
M: RegistrationSnapshotMaterializer,
{
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
let mut registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.insert(registration.identifier.clone(), registration);
RegisterNodeResponse::Registered
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
let registration = match self.registration_for(&registration.identifier) {
Some(registration) => registration,
None => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
));
}
};
let registrations = self.registration_snapshot();
let catalog = match self.materializer.materialize_snapshot(&registrations) {
Ok(Some(catalog)) => catalog,
Ok(None) => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
));
}
Err(error) => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!(
"failed to materialize config snapshot: {error}"
)));
}
};
match catalog.resolve(&registration.identifier) {
Some(config) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(
config.files.clone(),
)),
None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
)),
}
}
}
fn build_artifact_set_from_catalog_entry(config: &crate::NodeArtifacts) -> ArtifactSet {
ArtifactSet::new(config.files.clone())
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use cfgsync_artifacts::ArtifactFile;
use cfgsync_core::{
CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration,
};
use super::{MaterializingConfigSource, SnapshotConfigSource};
use crate::{
DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog, NodeArtifactsMaterializer,
RegistrationSnapshot, RegistrationSnapshotMaterializer,
};
#[test]
fn catalog_resolves_identifier() {
let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts {
identifier: "node-1".to_owned(),
files: vec![ArtifactFile::new("/config.yaml", "key: value")],
}]);
let node = catalog.resolve("node-1").expect("resolve node config");
assert_eq!(node.files[0].content, "key: value");
}
#[test]
fn materializing_source_resolves_registered_node() {
let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts {
identifier: "node-1".to_owned(),
files: vec![ArtifactFile::new("/config.yaml", "key: value")],
}]);
let source = MaterializingConfigSource::new(catalog);
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"));
let _ = source.register(registration.clone());
match source.resolve(&registration) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.files()[0].path, "/config.yaml")
}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
#[test]
fn materializing_source_reports_not_ready_before_registration() {
let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts {
identifier: "node-1".to_owned(),
files: vec![ArtifactFile::new("/config.yaml", "key: value")],
}]);
let source = MaterializingConfigSource::new(catalog);
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"));
match source.resolve(&registration) {
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::NotReady))
}
}
}
struct ThresholdMaterializer {
calls: AtomicUsize,
}
impl NodeArtifactsMaterializer for ThresholdMaterializer {
fn materialize(
&self,
registration: &NodeRegistration,
registrations: &RegistrationSnapshot,
) -> Result<Option<crate::ArtifactSet>, DynCfgsyncError> {
self.calls.fetch_add(1, Ordering::SeqCst);
if registrations.len() < 2 {
return Ok(None);
}
let peer_count = registrations.iter().count();
let files = vec![
ArtifactFile::new("/config.yaml", format!("id: {}", registration.identifier)),
ArtifactFile::new("/peers.txt", peer_count.to_string()),
];
Ok(Some(crate::ArtifactSet::new(files)))
}
}
#[test]
fn materializing_source_passes_registration_snapshot() {
let source = MaterializingConfigSource::new(ThresholdMaterializer {
calls: AtomicUsize::new(0),
});
let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip"));
let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip"));
let _ = source.register(node_a.clone());
match source.resolve(&node_a) {
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::NotReady))
}
}
let _ = source.register(node_b);
match source.resolve(&node_a) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.files()[0].content, "id: node-a");
assert_eq!(payload.files()[1].content, "2");
}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
assert_eq!(source.materializer.calls.load(Ordering::SeqCst), 2);
}
struct ThresholdSnapshotMaterializer;
impl RegistrationSnapshotMaterializer for ThresholdSnapshotMaterializer {
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
if registrations.len() < 2 {
return Ok(None);
}
Ok(Some(NodeArtifactsCatalog::new(
registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new(
"/config.yaml",
format!("peer_count: {}", registrations.len()),
)],
})
.collect(),
)))
}
}
#[test]
fn snapshot_source_materializes_from_registration_snapshot() {
let source = SnapshotConfigSource::new(ThresholdSnapshotMaterializer);
let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip"));
let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip"));
let _ = source.register(node_a.clone());
match source.resolve(&node_a) {
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::NotReady))
}
}
let _ = source.register(node_b);
match source.resolve(&node_a) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.files()[0].content, "peer_count: 2");
}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
}

View File

@ -24,9 +24,3 @@ pub struct NodeArtifactsBundleEntry {
#[serde(default)]
pub files: Vec<NodeArtifactFile>,
}
#[doc(hidden)]
pub type CfgSyncBundle = NodeArtifactsBundle;
#[doc(hidden)]
pub type CfgSyncBundleNode = NodeArtifactsBundleEntry;

View File

@ -1,7 +1,7 @@
use serde::Serialize;
use thiserror::Error;
use crate::repo::{CfgsyncErrorCode, CfgsyncErrorResponse, NodeArtifactsPayload, NodeRegistration};
use crate::{CfgsyncErrorCode, CfgsyncErrorResponse, NodeArtifactsPayload, NodeRegistration};
/// cfgsync client-side request/response failures.
#[derive(Debug, Error)]
@ -63,14 +63,6 @@ impl CfgsyncClient {
self.post_json("/node", payload).await
}
/// Fetches `/init-with-node` payload for a node identifier.
pub async fn fetch_init_with_node_config(
&self,
payload: &NodeRegistration,
) -> Result<NodeArtifactsPayload, ClientError> {
self.post_json("/init-with-node", payload).await
}
pub async fn fetch_node_config_status(
&self,
payload: &NodeRegistration,
@ -155,6 +147,3 @@ impl CfgsyncClient {
}
}
}
#[doc(hidden)]
pub type CfgSyncClient = CfgsyncClient;

View File

@ -0,0 +1,20 @@
#![doc(hidden)]
pub use crate::{
bundle::{NodeArtifactsBundle as CfgSyncBundle, NodeArtifactsBundleEntry as CfgSyncBundleNode},
client::CfgsyncClient as CfgSyncClient,
protocol::{
CfgsyncErrorCode as CfgSyncErrorCode, CfgsyncErrorResponse as CfgSyncErrorResponse,
ConfigResolveResponse as RepoResponse, NodeArtifactFile as CfgSyncFile,
NodeArtifactsPayload as CfgSyncPayload, RegisterNodeResponse as RegistrationResponse,
},
server::{
CfgsyncServerState as CfgSyncState, build_legacy_cfgsync_router as cfgsync_app,
serve_cfgsync as run_cfgsync,
},
source::{
BundleConfigSource as FileConfigProvider,
BundleConfigSourceError as FileConfigProviderError, NodeConfigSource as ConfigProvider,
StaticConfigSource as ConfigRepo,
},
};

View File

@ -1,33 +1,26 @@
pub mod bundle;
pub mod client;
#[doc(hidden)]
pub mod compat;
pub mod protocol;
pub mod render;
pub mod repo;
pub mod server;
pub mod source;
#[doc(hidden)]
pub use bundle::{CfgSyncBundle, CfgSyncBundleNode};
pub use bundle::{NodeArtifactsBundle, NodeArtifactsBundleEntry};
#[doc(hidden)]
pub use client::CfgSyncClient;
pub use client::{CfgsyncClient, ClientError, ConfigFetchStatus};
pub use protocol::{
CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse,
NodeArtifactFile, NodeArtifactsPayload, NodeRegistration, RegisterNodeResponse,
RegistrationPayload,
};
pub use render::{
CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides,
apply_timeout_floor, ensure_bundle_path, load_cfgsync_template_yaml,
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
};
pub use repo::{
BundleConfigSource, BundleConfigSourceError, CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode,
CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactFile, NodeArtifactsPayload,
NodeConfigSource, NodeRegistration, RegisterNodeResponse, RegistrationPayload,
StaticConfigSource,
};
#[doc(hidden)]
pub use repo::{
CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, ConfigProvider,
ConfigRepo, FileConfigProvider, FileConfigProviderError, RegistrationResponse, RepoResponse,
};
#[doc(hidden)]
pub use server::CfgSyncState;
pub use server::{CfgsyncServerState, RunCfgsyncError, build_cfgsync_router, serve_cfgsync};
#[doc(hidden)]
pub use server::{cfgsync_app, run_cfgsync};
pub use source::{
BundleConfigSource, BundleConfigSourceError, BundleLoadError, NodeConfigSource,
StaticConfigSource, bundle_to_payload_map, load_bundle,
};

View File

@ -0,0 +1,258 @@
use std::net::Ipv4Addr;
use cfgsync_artifacts::ArtifactFile;
use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned};
use serde_json::Value;
use thiserror::Error;
/// Schema version served by cfgsync payload responses.
pub const CFGSYNC_SCHEMA_VERSION: u16 = 1;
/// Canonical cfgsync file type used in payloads and bundles.
pub type NodeArtifactFile = ArtifactFile;
/// Payload returned by cfgsync server for one node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeArtifactsPayload {
/// Payload schema version for compatibility checks.
pub schema_version: u16,
/// Files that must be written on the target node.
#[serde(default)]
pub files: Vec<NodeArtifactFile>,
}
/// Adapter-owned registration payload stored alongside a generic node identity.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RegistrationPayload {
raw_json: Option<String>,
}
impl RegistrationPayload {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.raw_json.is_none()
}
pub fn from_serializable<T>(value: &T) -> Result<Self, serde_json::Error>
where
T: Serialize,
{
Ok(Self {
raw_json: Some(serde_json::to_string(value)?),
})
}
pub fn from_json_str(raw_json: &str) -> Result<Self, serde_json::Error> {
let value: Value = serde_json::from_str(raw_json)?;
Ok(Self {
raw_json: Some(serde_json::to_string(&value)?),
})
}
pub fn deserialize<T>(&self) -> Result<Option<T>, serde_json::Error>
where
T: DeserializeOwned,
{
self.raw_json
.as_ref()
.map(|raw_json| serde_json::from_str(raw_json))
.transpose()
}
#[must_use]
pub fn raw_json(&self) -> Option<&str> {
self.raw_json.as_deref()
}
}
impl Serialize for RegistrationPayload {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self.raw_json.as_deref() {
Some(raw_json) => {
let value: Value =
serde_json::from_str(raw_json).map_err(serde::ser::Error::custom)?;
value.serialize(serializer)
}
None => serializer.serialize_none(),
}
}
}
impl<'de> Deserialize<'de> for RegistrationPayload {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let value = Option::<Value>::deserialize(deserializer)?;
let raw_json = value
.map(|value| serde_json::to_string(&value).map_err(serde::de::Error::custom))
.transpose()?;
Ok(Self { raw_json })
}
}
/// Node metadata recorded before config materialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeRegistration {
pub identifier: String,
pub ip: Ipv4Addr,
#[serde(default, skip_serializing_if = "RegistrationPayload::is_empty")]
pub metadata: RegistrationPayload,
}
impl NodeRegistration {
#[must_use]
pub fn new(identifier: impl Into<String>, ip: Ipv4Addr) -> Self {
Self {
identifier: identifier.into(),
ip,
metadata: RegistrationPayload::default(),
}
}
pub fn with_metadata<T>(mut self, metadata: &T) -> Result<Self, serde_json::Error>
where
T: Serialize,
{
self.metadata = RegistrationPayload::from_serializable(metadata)?;
Ok(self)
}
#[must_use]
pub fn with_payload(mut self, payload: RegistrationPayload) -> Self {
self.metadata = payload;
self
}
}
impl NodeArtifactsPayload {
#[must_use]
pub fn from_files(files: Vec<NodeArtifactFile>) -> Self {
Self {
schema_version: CFGSYNC_SCHEMA_VERSION,
files,
}
}
#[must_use]
pub fn files(&self) -> &[NodeArtifactFile] {
&self.files
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CfgsyncErrorCode {
MissingConfig,
NotReady,
Internal,
}
/// Structured error body returned by cfgsync server.
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
#[error("{code:?}: {message}")]
pub struct CfgsyncErrorResponse {
pub code: CfgsyncErrorCode,
pub message: String,
}
impl CfgsyncErrorResponse {
#[must_use]
pub fn missing_config(identifier: &str) -> Self {
Self {
code: CfgsyncErrorCode::MissingConfig,
message: format!("missing config for host {identifier}"),
}
}
#[must_use]
pub fn not_ready(identifier: &str) -> Self {
Self {
code: CfgsyncErrorCode::NotReady,
message: format!("config for host {identifier} is not ready"),
}
}
#[must_use]
pub fn internal(message: impl Into<String>) -> Self {
Self {
code: CfgsyncErrorCode::Internal,
message: message.into(),
}
}
}
/// Resolution outcome for a requested node identifier.
pub enum ConfigResolveResponse {
Config(NodeArtifactsPayload),
Error(CfgsyncErrorResponse),
}
/// Outcome for a node registration request.
pub enum RegisterNodeResponse {
Registered,
Error(CfgsyncErrorResponse),
}
#[cfg(test)]
mod tests {
use serde::{Deserialize, Serialize};
use serde_json::Value;
use super::{NodeRegistration, RegistrationPayload};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct ExampleRegistration {
network_port: u16,
service: String,
}
#[test]
fn registration_payload_round_trips_typed_value() {
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"))
.with_metadata(&ExampleRegistration {
network_port: 3000,
service: "blend".to_owned(),
})
.expect("serialize registration metadata");
let encoded = serde_json::to_value(&registration).expect("serialize registration");
let metadata = encoded.get("metadata").expect("registration metadata");
assert_eq!(metadata.get("network_port"), Some(&Value::from(3000u16)));
assert_eq!(metadata.get("service"), Some(&Value::from("blend")));
let decoded: NodeRegistration =
serde_json::from_value(encoded).expect("deserialize registration");
let typed: ExampleRegistration = decoded
.metadata
.deserialize()
.expect("deserialize metadata")
.expect("registration metadata value");
assert_eq!(typed.network_port, 3000);
assert_eq!(typed.service, "blend");
}
#[test]
fn registration_payload_accepts_raw_json() {
let payload =
RegistrationPayload::from_json_str(r#"{"network_port":3000}"#).expect("parse raw json");
assert_eq!(payload.raw_json(), Some(r#"{"network_port":3000}"#));
}
}

View File

@ -1,523 +0,0 @@
use std::{collections::HashMap, fs, net::Ipv4Addr, path::Path, sync::Arc};
use cfgsync_artifacts::ArtifactFile;
use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned};
use serde_json::Value;
use thiserror::Error;
use crate::{NodeArtifactsBundle, NodeArtifactsBundleEntry};
/// Schema version served by cfgsync payload responses.
pub const CFGSYNC_SCHEMA_VERSION: u16 = 1;
/// Canonical cfgsync file type used in payloads and bundles.
pub type NodeArtifactFile = ArtifactFile;
/// Payload returned by cfgsync server for one node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeArtifactsPayload {
/// Payload schema version for compatibility checks.
pub schema_version: u16,
/// Files that must be written on the target node.
#[serde(default)]
pub files: Vec<NodeArtifactFile>,
}
/// Adapter-owned registration payload stored alongside a generic node identity.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RegistrationPayload {
raw_json: Option<String>,
}
impl RegistrationPayload {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.raw_json.is_none()
}
pub fn from_serializable<T>(value: &T) -> Result<Self, serde_json::Error>
where
T: Serialize,
{
Ok(Self {
raw_json: Some(serde_json::to_string(value)?),
})
}
pub fn from_json_str(raw_json: &str) -> Result<Self, serde_json::Error> {
let value: Value = serde_json::from_str(raw_json)?;
Ok(Self {
raw_json: Some(serde_json::to_string(&value)?),
})
}
pub fn deserialize<T>(&self) -> Result<Option<T>, serde_json::Error>
where
T: DeserializeOwned,
{
self.raw_json
.as_ref()
.map(|raw_json| serde_json::from_str(raw_json))
.transpose()
}
#[must_use]
pub fn raw_json(&self) -> Option<&str> {
self.raw_json.as_deref()
}
}
impl Serialize for RegistrationPayload {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self.raw_json.as_deref() {
Some(raw_json) => {
let value: Value =
serde_json::from_str(raw_json).map_err(serde::ser::Error::custom)?;
value.serialize(serializer)
}
None => serializer.serialize_none(),
}
}
}
impl<'de> Deserialize<'de> for RegistrationPayload {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let value = Option::<Value>::deserialize(deserializer)?;
let raw_json = value
.map(|value| serde_json::to_string(&value).map_err(serde::de::Error::custom))
.transpose()?;
Ok(Self { raw_json })
}
}
/// Node metadata recorded before config materialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeRegistration {
pub identifier: String,
pub ip: Ipv4Addr,
#[serde(default, skip_serializing_if = "RegistrationPayload::is_empty")]
pub metadata: RegistrationPayload,
}
impl NodeRegistration {
#[must_use]
pub fn new(identifier: impl Into<String>, ip: Ipv4Addr) -> Self {
Self {
identifier: identifier.into(),
ip,
metadata: RegistrationPayload::default(),
}
}
pub fn with_metadata<T>(mut self, metadata: &T) -> Result<Self, serde_json::Error>
where
T: Serialize,
{
self.metadata = RegistrationPayload::from_serializable(metadata)?;
Ok(self)
}
#[must_use]
pub fn with_payload(mut self, payload: RegistrationPayload) -> Self {
self.metadata = payload;
self
}
}
impl NodeArtifactsPayload {
#[must_use]
pub fn from_files(files: Vec<NodeArtifactFile>) -> Self {
Self {
schema_version: CFGSYNC_SCHEMA_VERSION,
files,
}
}
#[must_use]
pub fn files(&self) -> &[NodeArtifactFile] {
&self.files
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CfgsyncErrorCode {
MissingConfig,
NotReady,
Internal,
}
/// Structured error body returned by cfgsync server.
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
#[error("{code:?}: {message}")]
pub struct CfgsyncErrorResponse {
pub code: CfgsyncErrorCode,
pub message: String,
}
impl CfgsyncErrorResponse {
#[must_use]
pub fn missing_config(identifier: &str) -> Self {
Self {
code: CfgsyncErrorCode::MissingConfig,
message: format!("missing config for host {identifier}"),
}
}
#[must_use]
pub fn not_ready(identifier: &str) -> Self {
Self {
code: CfgsyncErrorCode::NotReady,
message: format!("config for host {identifier} is not ready"),
}
}
#[must_use]
pub fn internal(message: impl Into<String>) -> Self {
Self {
code: CfgsyncErrorCode::Internal,
message: message.into(),
}
}
}
/// Resolution outcome for a requested node identifier.
pub enum ConfigResolveResponse {
Config(NodeArtifactsPayload),
Error(CfgsyncErrorResponse),
}
/// Outcome for a node registration request.
pub enum RegisterNodeResponse {
Registered,
Error(CfgsyncErrorResponse),
}
/// Source of cfgsync node payloads.
pub trait NodeConfigSource: Send + Sync {
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse;
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse;
}
/// In-memory map-backed source used by cfgsync server state.
pub struct StaticConfigSource {
configs: HashMap<String, NodeArtifactsPayload>,
}
impl StaticConfigSource {
#[must_use]
pub fn from_bundle(configs: HashMap<String, NodeArtifactsPayload>) -> Arc<Self> {
Arc::new(Self { configs })
}
}
impl NodeConfigSource for StaticConfigSource {
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
if self.configs.contains_key(&registration.identifier) {
RegisterNodeResponse::Registered
} else {
RegisterNodeResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
))
}
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
self.configs
.get(&registration.identifier)
.cloned()
.map_or_else(
|| {
ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
))
},
ConfigResolveResponse::Config,
)
}
}
#[derive(Debug, Error)]
pub enum BundleLoadError {
#[error("reading cfgsync bundle {path}: {source}")]
ReadBundle {
path: String,
#[source]
source: std::io::Error,
},
#[error("parsing cfgsync bundle {path}: {source}")]
ParseBundle {
path: String,
#[source]
source: serde_yaml::Error,
},
}
#[must_use]
pub fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap<String, NodeArtifactsPayload> {
bundle
.nodes
.into_iter()
.map(|node| {
let NodeArtifactsBundleEntry { identifier, files } = node;
(identifier, NodeArtifactsPayload::from_files(files))
})
.collect()
}
pub fn load_bundle(path: &Path) -> Result<NodeArtifactsBundle, BundleLoadError> {
let path_string = path.display().to_string();
let raw = fs::read_to_string(path).map_err(|source| BundleLoadError::ReadBundle {
path: path_string.clone(),
source,
})?;
serde_yaml::from_str(&raw).map_err(|source| BundleLoadError::ParseBundle {
path: path_string,
source,
})
}
#[cfg(test)]
mod tests {
use std::io::Write as _;
use tempfile::NamedTempFile;
use super::*;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct ExampleRegistration {
network_port: u16,
service: String,
}
#[test]
fn registration_payload_round_trips_typed_value() {
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"))
.with_metadata(&ExampleRegistration {
network_port: 3000,
service: "blend".to_owned(),
})
.expect("serialize registration metadata");
let encoded = serde_json::to_value(&registration).expect("serialize registration");
let metadata = encoded.get("metadata").expect("registration metadata");
assert_eq!(metadata.get("network_port"), Some(&Value::from(3000u16)));
assert_eq!(metadata.get("service"), Some(&Value::from("blend")));
let decoded: NodeRegistration =
serde_json::from_value(encoded).expect("deserialize registration");
let typed: ExampleRegistration = decoded
.metadata
.deserialize()
.expect("deserialize metadata")
.expect("registration metadata value");
assert_eq!(typed.network_port, 3000);
assert_eq!(typed.service, "blend");
}
fn sample_payload() -> NodeArtifactsPayload {
NodeArtifactsPayload::from_files(vec![NodeArtifactFile::new("/config.yaml", "key: value")])
}
#[test]
fn resolves_existing_identifier() {
let mut configs = HashMap::new();
configs.insert("node-1".to_owned(), sample_payload());
let repo = StaticConfigSource { configs };
match repo.resolve(&NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
)) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION);
assert_eq!(payload.files.len(), 1);
assert_eq!(payload.files[0].path, "/config.yaml");
}
ConfigResolveResponse::Error(error) => panic!("expected config response, got {error}"),
}
}
#[test]
fn reports_missing_identifier() {
let repo = StaticConfigSource {
configs: HashMap::new(),
};
match repo.resolve(&NodeRegistration::new(
"unknown-node",
"127.0.0.1".parse().expect("parse ip"),
)) {
ConfigResolveResponse::Config(_) => panic!("expected missing-config error"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::MissingConfig));
assert!(error.message.contains("unknown-node"));
}
}
}
#[test]
fn loads_file_provider_bundle() {
let mut bundle_file = NamedTempFile::new().expect("create temp bundle");
let yaml = r#"
nodes:
- identifier: node-1
files:
- path: /config.yaml
content: "a: 1"
"#;
bundle_file
.write_all(yaml.as_bytes())
.expect("write bundle yaml");
let provider =
BundleConfigSource::from_yaml_file(bundle_file.path()).expect("load file provider");
let _ = provider.register(NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
));
match provider.resolve(&NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
)) {
ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
#[test]
fn resolve_accepts_known_registration_without_gating() {
let mut configs = HashMap::new();
configs.insert("node-1".to_owned(), sample_payload());
let repo = StaticConfigSource { configs };
match repo.resolve(&NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
)) {
ConfigResolveResponse::Config(_) => {}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
}
/// Failures when loading a bundle-backed cfgsync source.
#[derive(Debug, Error)]
pub enum BundleConfigSourceError {
#[error("failed to read cfgsync bundle at {path}: {source}")]
Read {
path: String,
#[source]
source: std::io::Error,
},
#[error("failed to parse cfgsync bundle at {path}: {source}")]
Parse {
path: String,
#[source]
source: serde_yaml::Error,
},
}
/// YAML bundle-backed source implementation.
pub struct BundleConfigSource {
inner: StaticConfigSource,
}
impl BundleConfigSource {
/// Loads provider state from a cfgsync bundle YAML file.
pub fn from_yaml_file(path: &Path) -> Result<Self, BundleConfigSourceError> {
let raw = fs::read_to_string(path).map_err(|source| BundleConfigSourceError::Read {
path: path.display().to_string(),
source,
})?;
let bundle: NodeArtifactsBundle =
serde_yaml::from_str(&raw).map_err(|source| BundleConfigSourceError::Parse {
path: path.display().to_string(),
source,
})?;
let configs = bundle
.nodes
.into_iter()
.map(payload_from_bundle_node)
.collect();
Ok(Self {
inner: StaticConfigSource { configs },
})
}
}
impl NodeConfigSource for BundleConfigSource {
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
self.inner.register(registration)
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
self.inner.resolve(registration)
}
}
fn payload_from_bundle_node(node: NodeArtifactsBundleEntry) -> (String, NodeArtifactsPayload) {
(
node.identifier,
NodeArtifactsPayload::from_files(node.files),
)
}
#[doc(hidden)]
pub type RepoResponse = ConfigResolveResponse;
#[doc(hidden)]
pub type RegistrationResponse = RegisterNodeResponse;
#[doc(hidden)]
pub trait ConfigProvider: NodeConfigSource {}
impl<T: NodeConfigSource + ?Sized> ConfigProvider for T {}
#[doc(hidden)]
pub type ConfigRepo = StaticConfigSource;
#[doc(hidden)]
pub type FileConfigProvider = BundleConfigSource;
#[doc(hidden)]
pub type FileConfigProviderError = BundleConfigSourceError;
#[doc(hidden)]
pub type CfgSyncFile = NodeArtifactFile;
#[doc(hidden)]
pub type CfgSyncPayload = NodeArtifactsPayload;
#[doc(hidden)]
pub type CfgSyncErrorCode = CfgsyncErrorCode;
#[doc(hidden)]
pub type CfgSyncErrorResponse = CfgsyncErrorResponse;

View File

@ -3,7 +3,7 @@ use std::{io, sync::Arc};
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
use thiserror::Error;
use crate::repo::{
use crate::{
CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration,
RegisterNodeResponse,
};
@ -84,6 +84,14 @@ fn error_status(code: &CfgsyncErrorCode) -> StatusCode {
}
pub fn build_cfgsync_router(state: CfgsyncServerState) -> Router {
Router::new()
.route("/register", post(register_node))
.route("/node", post(node_config))
.with_state(Arc::new(state))
}
#[doc(hidden)]
pub fn build_legacy_cfgsync_router(state: CfgsyncServerState) -> Router {
Router::new()
.route("/register", post(register_node))
.route("/node", post(node_config))
@ -108,14 +116,6 @@ pub async fn serve_cfgsync(port: u16, state: CfgsyncServerState) -> Result<(), R
Ok(())
}
#[doc(hidden)]
pub type CfgSyncState = CfgsyncServerState;
#[doc(hidden)]
pub use build_cfgsync_router as cfgsync_app;
#[doc(hidden)]
pub use serve_cfgsync as run_cfgsync;
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
@ -123,7 +123,7 @@ mod tests {
use axum::{Json, extract::State, http::StatusCode, response::IntoResponse};
use super::{CfgsyncServerState, NodeRegistration, node_config, register_node};
use crate::repo::{
use crate::{
CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse,
NodeArtifactFile, NodeArtifactsPayload, NodeConfigSource, RegisterNodeResponse,
};

264
cfgsync/core/src/source.rs Normal file
View File

@ -0,0 +1,264 @@
use std::{collections::HashMap, fs, path::Path, sync::Arc};
use thiserror::Error;
use crate::{
NodeArtifactsBundle, NodeArtifactsBundleEntry, NodeArtifactsPayload, NodeRegistration,
RegisterNodeResponse, protocol::ConfigResolveResponse,
};
/// Source of cfgsync node payloads.
pub trait NodeConfigSource: Send + Sync {
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse;
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse;
}
/// In-memory map-backed source used by cfgsync server state.
pub struct StaticConfigSource {
configs: HashMap<String, NodeArtifactsPayload>,
}
impl StaticConfigSource {
#[must_use]
pub fn from_payloads(configs: HashMap<String, NodeArtifactsPayload>) -> Arc<Self> {
Arc::new(Self { configs })
}
#[must_use]
pub fn from_bundle(bundle: NodeArtifactsBundle) -> Arc<Self> {
Self::from_payloads(bundle_to_payload_map(bundle))
}
}
impl NodeConfigSource for StaticConfigSource {
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
if self.configs.contains_key(&registration.identifier) {
RegisterNodeResponse::Registered
} else {
RegisterNodeResponse::Error(crate::CfgsyncErrorResponse::missing_config(
&registration.identifier,
))
}
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
self.configs
.get(&registration.identifier)
.cloned()
.map_or_else(
|| {
ConfigResolveResponse::Error(crate::CfgsyncErrorResponse::missing_config(
&registration.identifier,
))
},
ConfigResolveResponse::Config,
)
}
}
#[derive(Debug, Error)]
pub enum BundleLoadError {
#[error("reading cfgsync bundle {path}: {source}")]
ReadBundle {
path: String,
#[source]
source: std::io::Error,
},
#[error("parsing cfgsync bundle {path}: {source}")]
ParseBundle {
path: String,
#[source]
source: serde_yaml::Error,
},
}
#[must_use]
pub fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap<String, NodeArtifactsPayload> {
bundle
.nodes
.into_iter()
.map(|node| {
let NodeArtifactsBundleEntry { identifier, files } = node;
(identifier, NodeArtifactsPayload::from_files(files))
})
.collect()
}
pub fn load_bundle(path: &Path) -> Result<NodeArtifactsBundle, BundleLoadError> {
let path_string = path.display().to_string();
let raw = fs::read_to_string(path).map_err(|source| BundleLoadError::ReadBundle {
path: path_string.clone(),
source,
})?;
serde_yaml::from_str(&raw).map_err(|source| BundleLoadError::ParseBundle {
path: path_string,
source,
})
}
/// Failures when loading a bundle-backed cfgsync source.
#[derive(Debug, Error)]
pub enum BundleConfigSourceError {
#[error("failed to read cfgsync bundle at {path}: {source}")]
Read {
path: String,
#[source]
source: std::io::Error,
},
#[error("failed to parse cfgsync bundle at {path}: {source}")]
Parse {
path: String,
#[source]
source: serde_yaml::Error,
},
}
/// YAML bundle-backed source implementation.
pub struct BundleConfigSource {
inner: StaticConfigSource,
}
impl BundleConfigSource {
/// Loads source state from a cfgsync bundle YAML file.
pub fn from_yaml_file(path: &Path) -> Result<Self, BundleConfigSourceError> {
let raw = fs::read_to_string(path).map_err(|source| BundleConfigSourceError::Read {
path: path.display().to_string(),
source,
})?;
let bundle: NodeArtifactsBundle =
serde_yaml::from_str(&raw).map_err(|source| BundleConfigSourceError::Parse {
path: path.display().to_string(),
source,
})?;
let configs = bundle
.nodes
.into_iter()
.map(payload_from_bundle_node)
.collect();
Ok(Self {
inner: StaticConfigSource { configs },
})
}
}
impl NodeConfigSource for BundleConfigSource {
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
self.inner.register(registration)
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
self.inner.resolve(registration)
}
}
fn payload_from_bundle_node(node: NodeArtifactsBundleEntry) -> (String, NodeArtifactsPayload) {
(
node.identifier,
NodeArtifactsPayload::from_files(node.files),
)
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, io::Write as _};
use tempfile::NamedTempFile;
use super::{BundleConfigSource, StaticConfigSource};
use crate::{
CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, ConfigResolveResponse, NodeArtifactFile,
NodeArtifactsPayload, NodeConfigSource, NodeRegistration,
};
fn sample_payload() -> NodeArtifactsPayload {
NodeArtifactsPayload::from_files(vec![NodeArtifactFile::new("/config.yaml", "key: value")])
}
#[test]
fn resolves_existing_identifier() {
let mut configs = HashMap::new();
configs.insert("node-1".to_owned(), sample_payload());
let repo = StaticConfigSource { configs };
match repo.resolve(&NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
)) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION);
assert_eq!(payload.files.len(), 1);
assert_eq!(payload.files[0].path, "/config.yaml");
}
ConfigResolveResponse::Error(error) => panic!("expected config response, got {error}"),
}
}
#[test]
fn reports_missing_identifier() {
let repo = StaticConfigSource {
configs: HashMap::new(),
};
match repo.resolve(&NodeRegistration::new(
"unknown-node",
"127.0.0.1".parse().expect("parse ip"),
)) {
ConfigResolveResponse::Config(_) => panic!("expected missing-config error"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::MissingConfig));
assert!(error.message.contains("unknown-node"));
}
}
}
#[test]
fn loads_file_provider_bundle() {
let mut bundle_file = NamedTempFile::new().expect("create temp bundle");
let yaml = r#"
nodes:
- identifier: node-1
files:
- path: /config.yaml
content: "a: 1"
"#;
bundle_file
.write_all(yaml.as_bytes())
.expect("write bundle yaml");
let provider =
BundleConfigSource::from_yaml_file(bundle_file.path()).expect("load file provider");
let _ = provider.register(NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
));
match provider.resolve(&NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
)) {
ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
#[test]
fn resolve_accepts_known_registration_without_gating() {
let mut configs = HashMap::new();
configs.insert("node-1".to_owned(), sample_payload());
let repo = StaticConfigSource { configs };
match repo.resolve(&NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
)) {
ConfigResolveResponse::Config(_) => {}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
}

View File

@ -1,10 +1,10 @@
use std::path::PathBuf;
use cfgsync_runtime::run_cfgsync_server;
use cfgsync_runtime::serve_cfgsync_from_config;
use clap::Parser;
#[derive(Parser, Debug)]
#[command(about = "CfgSync")]
#[command(about = "Cfgsync server")]
struct Args {
config: PathBuf,
}
@ -12,5 +12,5 @@ struct Args {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
run_cfgsync_server(&args.config).await
serve_cfgsync_from_config(&args.config).await
}

View File

@ -184,11 +184,9 @@ fn parse_registration_payload(raw: &str) -> Result<RegistrationPayload> {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use cfgsync_core::{
CfgsyncServerState, NodeArtifactsBundle, NodeArtifactsBundleEntry, NodeArtifactsPayload,
StaticConfigSource, serve_cfgsync,
CfgsyncServerState, NodeArtifactsBundle, NodeArtifactsBundleEntry, StaticConfigSource,
serve_cfgsync,
};
use tempfile::tempdir;
@ -208,7 +206,7 @@ mod tests {
],
}]);
let repo = StaticConfigSource::from_bundle(bundle_to_payload_map(bundle));
let repo = StaticConfigSource::from_bundle(bundle);
let state = CfgsyncServerState::new(repo);
let port = allocate_test_port();
let address = format!("http://127.0.0.1:{port}");
@ -234,19 +232,6 @@ mod tests {
assert_eq!(app_config, "app_key: app_value");
assert_eq!(deployment, "mode: local");
}
fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap<String, NodeArtifactsPayload> {
bundle
.nodes
.into_iter()
.map(|node| {
let NodeArtifactsBundleEntry { identifier, files } = node;
(identifier, NodeArtifactsPayload::from_files(files))
})
.collect()
}
fn allocate_test_port() -> u16 {
let listener =
std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port for test");

View File

@ -4,8 +4,7 @@ mod client;
mod server;
pub use client::run_cfgsync_client_from_env;
#[doc(hidden)]
pub use server::CfgSyncServerConfig;
pub use server::{
CfgsyncServerConfig, CfgsyncServingMode, LoadCfgsyncServerConfigError, run_cfgsync_server,
CfgsyncServerConfig, CfgsyncServingMode, LoadCfgsyncServerConfigError,
serve_cfgsync_from_config,
};

View File

@ -1,7 +1,7 @@
use std::{fs, path::Path, sync::Arc};
use anyhow::Context as _;
use cfgsync_adapter::{NodeArtifacts, NodeArtifactsCatalog, RegistrationConfigProvider};
use cfgsync_adapter::{MaterializingConfigSource, NodeArtifacts, NodeArtifactsCatalog};
use cfgsync_core::{
BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, serve_cfgsync,
};
@ -25,16 +25,6 @@ pub enum CfgsyncServingMode {
Registration,
}
#[derive(Debug, Deserialize)]
struct RawCfgsyncServerConfig {
port: u16,
bundle_path: String,
#[serde(default)]
serving_mode: Option<CfgsyncServingMode>,
#[serde(default)]
registration_flow: Option<bool>,
}
#[derive(Debug, Error)]
pub enum LoadCfgsyncServerConfigError {
#[error("failed to read cfgsync config file {path}: {source}")]
@ -61,20 +51,11 @@ impl CfgsyncServerConfig {
source,
})?;
let raw: RawCfgsyncServerConfig =
serde_yaml::from_str(&config_content).map_err(|source| {
LoadCfgsyncServerConfigError::Parse {
path: config_path,
source,
}
})?;
Ok(Self {
port: raw.port,
bundle_path: raw.bundle_path,
serving_mode: raw
.serving_mode
.unwrap_or_else(|| mode_from_legacy_registration_flow(raw.registration_flow)),
serde_yaml::from_str(&config_content).map_err(|source| {
LoadCfgsyncServerConfigError::Parse {
path: config_path,
source,
}
})
}
@ -97,14 +78,6 @@ impl CfgsyncServerConfig {
}
}
fn mode_from_legacy_registration_flow(registration_flow: Option<bool>) -> CfgsyncServingMode {
if registration_flow.unwrap_or(false) {
CfgsyncServingMode::Registration
} else {
CfgsyncServingMode::Bundle
}
}
fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
let provider = BundleConfigSource::from_yaml_file(bundle_path)
.with_context(|| format!("loading cfgsync provider from {}", bundle_path.display()))?;
@ -112,10 +85,10 @@ fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfig
Ok(Arc::new(provider))
}
fn load_materializing_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
fn load_registration_source(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
let bundle = load_bundle_yaml(bundle_path)?;
let catalog = build_node_catalog(bundle);
let provider = RegistrationConfigProvider::new(catalog);
let provider = MaterializingConfigSource::new(catalog);
Ok(Arc::new(provider))
}
@ -154,7 +127,7 @@ fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::Path
}
/// Loads runtime config and starts cfgsync HTTP server process.
pub async fn run_cfgsync_server(config_path: &Path) -> anyhow::Result<()> {
pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()> {
let config = CfgsyncServerConfig::load_from_file(config_path)?;
let bundle_path = resolve_bundle_path(config_path, &config.bundle_path);
@ -170,11 +143,8 @@ fn build_server_state(
) -> anyhow::Result<CfgsyncServerState> {
let repo = match config.serving_mode {
CfgsyncServingMode::Bundle => load_bundle_provider(bundle_path)?,
CfgsyncServingMode::Registration => load_materializing_provider(bundle_path)?,
CfgsyncServingMode::Registration => load_registration_source(bundle_path)?,
};
Ok(CfgsyncServerState::new(repo))
}
#[doc(hidden)]
pub type CfgSyncServerConfig = CfgsyncServerConfig;

View File

@ -1,5 +1,5 @@
use anyhow::Result;
use cfgsync_adapter::{CfgsyncEnv, build_cfgsync_node_catalog};
use cfgsync_adapter::{DeploymentAdapter, build_node_artifact_catalog};
pub(crate) use cfgsync_core::render::CfgsyncOutputPaths;
use cfgsync_core::{
NodeArtifactsBundle, NodeArtifactsBundleEntry,
@ -27,7 +27,7 @@ enum BundleRenderError {
MissingYamlKey { key: String },
}
pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
pub(crate) fn render_cfgsync_from_template<E: DeploymentAdapter>(
topology: &E::Deployment,
hostnames: &[String],
options: CfgsyncRenderOptions,
@ -45,11 +45,11 @@ pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
})
}
fn build_cfgsync_bundle<E: CfgsyncEnv>(
fn build_cfgsync_bundle<E: DeploymentAdapter>(
topology: &E::Deployment,
hostnames: &[String],
) -> Result<NodeArtifactsBundle> {
let nodes = build_cfgsync_node_catalog::<E>(topology, hostnames)?.into_configs();
let nodes = build_node_artifact_catalog::<E>(topology, hostnames)?.into_nodes();
let nodes = nodes
.into_iter()
.map(|node| NodeArtifactsBundleEntry {
@ -129,7 +129,7 @@ fn build_cfgsync_server_config() -> Value {
Value::Mapping(root)
}
pub(crate) fn render_and_write_cfgsync_from_template<E: CfgsyncEnv>(
pub(crate) fn render_and_write_cfgsync_from_template<E: DeploymentAdapter>(
topology: &E::Deployment,
hostnames: &[String],
mut options: CfgsyncRenderOptions,
@ -143,7 +143,7 @@ pub(crate) fn render_and_write_cfgsync_from_template<E: CfgsyncEnv>(
Ok(rendered)
}
fn build_overrides<E: CfgsyncEnv>(
fn build_overrides<E: DeploymentAdapter>(
topology: &E::Deployment,
options: CfgsyncRenderOptions,
) -> CfgsyncConfigOverrides {

View File

@ -1 +1,5 @@
pub use cfgsync_adapter::*;
#[doc(hidden)]
pub use cfgsync_adapter::{
DeploymentAdapter as CfgsyncEnv, build_node_artifact_catalog as build_cfgsync_node_catalog,
};