Simplify cfgsync adapter surface

This commit is contained in:
andrussal 2026-03-12 07:44:20 +01:00
parent fd154a9487
commit fb0129020c
11 changed files with 248 additions and 566 deletions

1
Cargo.lock generated
View File

@ -958,6 +958,7 @@ dependencies = [
"anyhow",
"axum",
"cfgsync-adapter",
"cfgsync-artifacts",
"cfgsync-core",
"clap",
"serde",

View File

@ -56,25 +56,19 @@ Typical flow:
Adapter-facing materialization layer.
Primary types:
- `NodeArtifacts`
- `NodeArtifactsCatalog`
- `MaterializedArtifacts`
- `RegistrationSnapshot`
- `NodeArtifactsMaterializer`
- `RegistrationSnapshotMaterializer`
- `CachedSnapshotMaterializer`
- `MaterializingConfigSource`
- `SnapshotConfigSource`
- `RegistrationConfigSource`
- `DeploymentAdapter`
This crate is where app-specific bootstrap logic plugs in.
Two useful patterns exist:
- single-node materialization
- `NodeArtifactsMaterializer`
- whole-snapshot materialization
- `RegistrationSnapshotMaterializer`
The main pattern is snapshot materialization:
- `RegistrationSnapshotMaterializer`
Use snapshot materialization when readiness depends on the full registered set.
Use it when readiness depends on the full registered set.
### `cfgsync-runtime`
Small runtime helpers and binaries.
@ -111,7 +105,7 @@ Config is produced from node registrations.
Use:
- `RegistrationSnapshotMaterializer`
- `CachedSnapshotMaterializer`
- `SnapshotConfigSource`
- `RegistrationConfigSource`
- `serve_snapshot_cfgsync(...)`
This is the right model when config readiness depends on the current registered set.
@ -151,10 +145,10 @@ let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().unwrap())
```rust
use cfgsync_adapter::{
DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog, RegistrationSnapshot,
DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot,
RegistrationSnapshotMaterializer,
};
use cfgsync_artifacts::ArtifactFile;
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
struct MyMaterializer;
@ -162,23 +156,24 @@ impl RegistrationSnapshotMaterializer for MyMaterializer {
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
) -> Result<MaterializationResult, DynCfgsyncError> {
if registrations.len() < 2 {
return Ok(None);
return Ok(MaterializationResult::NotReady);
}
let nodes = registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new(
.map(|registration| (
registration.identifier.clone(),
ArtifactSet::new(vec![ArtifactFile::new(
"/config.yaml",
format!("id: {}\n", registration.identifier),
)],
})
.collect();
)]),
));
Ok(Some(NodeArtifactsCatalog::new(nodes)))
Ok(MaterializationResult::ready(
MaterializedArtifacts::from_nodes(nodes),
))
}
}
```

View File

@ -1,159 +1,38 @@
use std::collections::HashMap;
use cfgsync_artifacts::ArtifactFile;
use serde::{Deserialize, Serialize};
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
/// Per-node artifact payload served by cfgsync for one registered node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeArtifacts {
/// Stable node identifier resolved by the adapter.
pub identifier: String,
/// Files served to the node after cfgsync registration.
pub files: Vec<ArtifactFile>,
}
/// Materialized artifact files for a single registered node.
#[derive(Debug, Clone, Default)]
pub struct ArtifactSet {
files: Vec<ArtifactFile>,
}
impl ArtifactSet {
/// Creates one logical artifact group.
///
/// The same type is used for:
/// - node-local files that belong to one node only
/// - shared files that should be delivered alongside every node
#[must_use]
pub fn new(files: Vec<ArtifactFile>) -> Self {
Self { files }
}
/// Returns the files carried by this artifact group.
#[must_use]
pub fn files(&self) -> &[ArtifactFile] {
&self.files
}
/// Returns `true` when the group contains no files.
#[must_use]
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
/// Consumes the group and returns its files.
#[must_use]
pub fn into_files(self) -> Vec<ArtifactFile> {
self.files
}
}
/// Resolved artifact payload for one node, including any shared files that
/// should be delivered alongside the node-local files.
#[derive(Debug, Clone, Default)]
pub struct ResolvedNodeArtifacts {
node: ArtifactSet,
shared: ArtifactSet,
}
impl ResolvedNodeArtifacts {
/// Creates the resolved file set for one node.
#[must_use]
pub fn new(node: ArtifactSet, shared: ArtifactSet) -> Self {
Self { node, shared }
}
/// Returns the node-local files.
#[must_use]
pub fn node(&self) -> &ArtifactSet {
&self.node
}
/// Returns the shared files delivered alongside every node.
#[must_use]
pub fn shared(&self) -> &ArtifactSet {
&self.shared
}
/// Returns the full file list that should be written for this node.
#[must_use]
pub fn files(&self) -> Vec<ArtifactFile> {
let mut files = self.node.files().to_vec();
files.extend_from_slice(self.shared.files());
files
}
}
/// Artifact payloads indexed by stable node identifier.
#[derive(Debug, Clone, Default)]
pub struct NodeArtifactsCatalog {
nodes: HashMap<String, NodeArtifacts>,
}
impl NodeArtifactsCatalog {
/// Creates a catalog indexed by stable node identifier.
#[must_use]
pub fn new(nodes: Vec<NodeArtifacts>) -> Self {
let nodes = nodes
.into_iter()
.map(|node| (node.identifier.clone(), node))
.collect();
Self { nodes }
}
/// Resolves one node's local artifacts by identifier.
#[must_use]
pub fn resolve(&self, identifier: &str) -> Option<&NodeArtifacts> {
self.nodes.get(identifier)
}
/// Returns the number of nodes in the catalog.
#[must_use]
pub fn len(&self) -> usize {
self.nodes.len()
}
/// Returns `true` when the catalog is empty.
#[must_use]
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
/// Consumes the catalog and returns its node entries.
#[must_use]
pub fn into_nodes(self) -> Vec<NodeArtifacts> {
self.nodes.into_values().collect()
}
}
/// Materialized cfgsync output for a whole registration set.
/// Fully materialized cfgsync artifacts for a registration set.
///
/// `nodes` holds the node-local files keyed by stable node identifier.
/// `shared` holds files that should be delivered alongside every node.
#[derive(Debug, Clone, Default)]
pub struct MaterializedArtifacts {
nodes: NodeArtifactsCatalog,
nodes: HashMap<String, ArtifactSet>,
shared: ArtifactSet,
}
impl MaterializedArtifacts {
/// Creates a fully materialized cfgsync result.
///
/// `nodes` contains node-specific files.
/// `shared` contains files that should accompany every node.
/// Creates materialized artifacts from node-local artifact sets.
#[must_use]
pub fn new(nodes: NodeArtifactsCatalog, shared: ArtifactSet) -> Self {
Self { nodes, shared }
pub fn from_nodes(nodes: impl IntoIterator<Item = (String, ArtifactSet)>) -> Self {
Self {
nodes: nodes.into_iter().collect(),
shared: ArtifactSet::default(),
}
}
/// Creates a materialized result without any shared files.
/// Attaches shared files delivered alongside every node.
#[must_use]
pub fn from_catalog(nodes: NodeArtifactsCatalog) -> Self {
Self::new(nodes, ArtifactSet::default())
pub fn with_shared(mut self, shared: ArtifactSet) -> Self {
self.shared = shared;
self
}
/// Returns the node-specific artifact catalog.
/// Returns the node-local artifact set for one identifier.
#[must_use]
pub fn nodes(&self) -> &NodeArtifactsCatalog {
&self.nodes
pub fn node(&self, identifier: &str) -> Option<&ArtifactSet> {
self.nodes.get(identifier)
}
/// Returns the shared artifact set.
@ -162,11 +41,31 @@ impl MaterializedArtifacts {
&self.shared
}
/// Resolves the full file set for one node.
/// Returns the number of node-local artifact sets.
#[must_use]
pub fn resolve(&self, identifier: &str) -> Option<ResolvedNodeArtifacts> {
self.nodes.resolve(identifier).map(|node| {
ResolvedNodeArtifacts::new(ArtifactSet::new(node.files.clone()), self.shared.clone())
})
pub fn len(&self) -> usize {
self.nodes.len()
}
/// Returns `true` when no node-local artifact sets are present.
#[must_use]
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
/// Resolves the full file set that should be written for one node.
#[must_use]
pub fn resolve(&self, identifier: &str) -> Option<ArtifactSet> {
let node = self.node(identifier)?;
let mut files: Vec<ArtifactFile> = node.files.clone();
files.extend(self.shared.files.iter().cloned());
Some(ArtifactSet::new(files))
}
/// Iterates node-local artifact sets by stable identifier.
pub fn iter(&self) -> impl Iterator<Item = (&str, &ArtifactSet)> {
self.nodes
.iter()
.map(|(identifier, artifacts)| (identifier.as_str(), artifacts))
}
}

View File

@ -1,9 +1,9 @@
use std::error::Error;
use std::{collections::HashMap, error::Error};
use cfgsync_artifacts::ArtifactFile;
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
use thiserror::Error;
use crate::{NodeArtifacts, NodeArtifactsCatalog};
use crate::MaterializedArtifacts;
/// Adapter contract for converting an application deployment model into
/// node-specific serialized config payloads.
@ -53,32 +53,25 @@ where
}
}
/// Builds cfgsync node configs for a deployment by:
/// Builds materialized cfgsync artifacts for a deployment by:
/// 1) validating hostname count,
/// 2) building each node config,
/// 3) rewriting host references,
/// 4) serializing each node payload.
pub fn build_cfgsync_node_configs<E: DeploymentAdapter>(
pub fn build_materialized_artifacts<E: DeploymentAdapter>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<Vec<NodeArtifacts>, BuildCfgsyncNodesError> {
Ok(build_node_artifact_catalog::<E>(deployment, hostnames)?.into_nodes())
}
/// Builds cfgsync node configs and indexes them by stable identifier.
pub fn build_node_artifact_catalog<E: DeploymentAdapter>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<NodeArtifactsCatalog, BuildCfgsyncNodesError> {
) -> Result<MaterializedArtifacts, BuildCfgsyncNodesError> {
let nodes = E::nodes(deployment);
ensure_hostname_count(nodes.len(), hostnames.len())?;
let mut output = Vec::with_capacity(nodes.len());
let mut output = HashMap::with_capacity(nodes.len());
for (index, node) in nodes.iter().enumerate() {
output.push(build_node_entry::<E>(deployment, node, index, hostnames)?);
let (identifier, artifacts) = build_node_entry::<E>(deployment, node, index, hostnames)?;
output.insert(identifier, artifacts);
}
Ok(NodeArtifactsCatalog::new(output))
Ok(MaterializedArtifacts::from_nodes(output))
}
fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> {
@ -94,14 +87,14 @@ fn build_node_entry<E: DeploymentAdapter>(
node: &E::Node,
index: usize,
hostnames: &[String],
) -> Result<NodeArtifacts, BuildCfgsyncNodesError> {
) -> Result<(String, ArtifactSet), BuildCfgsyncNodesError> {
let node_config = build_rewritten_node_config::<E>(deployment, node, index, hostnames)?;
let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?;
Ok(NodeArtifacts {
identifier: E::node_identifier(index, node),
files: vec![ArtifactFile::new("/config.yaml", &config_yaml)],
})
Ok((
E::node_identifier(index, node),
ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", &config_yaml)]),
))
}
fn build_rewritten_node_config<E: DeploymentAdapter>(

View File

@ -4,16 +4,11 @@ mod materializer;
mod registrations;
mod sources;
pub use artifacts::{
ArtifactSet, MaterializedArtifacts, NodeArtifacts, NodeArtifactsCatalog, ResolvedNodeArtifacts,
};
pub use deployment::{
BuildCfgsyncNodesError, DeploymentAdapter, build_cfgsync_node_configs,
build_node_artifact_catalog,
};
pub use artifacts::MaterializedArtifacts;
pub use deployment::{BuildCfgsyncNodesError, DeploymentAdapter, build_materialized_artifacts};
pub use materializer::{
CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifactsSink,
NodeArtifactsMaterializer, PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer,
PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer,
};
pub use registrations::RegistrationSnapshot;
pub use sources::{MaterializingConfigSource, SnapshotConfigSource};
pub use sources::RegistrationConfigSource;

View File

@ -1,32 +1,17 @@
use std::{error::Error, sync::Mutex};
use cfgsync_core::NodeRegistration;
use serde_json::to_string;
use crate::{MaterializedArtifacts, RegistrationSnapshot, ResolvedNodeArtifacts};
use crate::{MaterializedArtifacts, RegistrationSnapshot};
/// Type-erased cfgsync adapter error used to preserve source context.
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
/// Adapter-side materialization contract for a single registered node.
pub trait NodeArtifactsMaterializer: Send + Sync {
/// Resolves one node from the current registration set.
///
/// Returning `Ok(None)` means the node is known but its artifacts are not
/// ready yet.
fn materialize(
&self,
registration: &NodeRegistration,
registrations: &RegistrationSnapshot,
) -> Result<Option<ResolvedNodeArtifacts>, DynCfgsyncError>;
}
/// Adapter contract for materializing a whole registration snapshot into
/// per-node cfgsync artifacts.
/// cfgsync artifacts.
pub trait RegistrationSnapshotMaterializer: Send + Sync {
/// Materializes the current registration set.
///
/// This is the main registration-driven integration point for cfgsync.
/// Implementations decide:
/// - when the current snapshot is ready to serve
/// - which per-node artifacts should be produced
@ -54,8 +39,8 @@ pub enum MaterializationResult {
impl MaterializationResult {
/// Creates a ready materialization result.
#[must_use]
pub fn ready(nodes: MaterializedArtifacts) -> Self {
Self::Ready(nodes)
pub fn ready(artifacts: MaterializedArtifacts) -> Self {
Self::Ready(artifacts)
}
/// Returns the ready artifacts when materialization succeeded.
@ -200,14 +185,14 @@ mod tests {
atomic::{AtomicUsize, Ordering},
};
use cfgsync_artifacts::ArtifactFile;
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
use super::{
CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts,
MaterializedArtifactsSink, PersistingSnapshotMaterializer,
RegistrationSnapshotMaterializer,
};
use crate::{ArtifactSet, NodeArtifacts, NodeArtifactsCatalog, RegistrationSnapshot};
use crate::RegistrationSnapshot;
struct CountingMaterializer;
@ -220,18 +205,18 @@ mod tests {
return Ok(MaterializationResult::NotReady);
}
let nodes = registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new("/config.yaml", "ready: true")],
})
.collect();
let nodes = registrations.iter().map(|registration| {
(
registration.identifier.clone(),
ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", "ready: true")]),
)
});
Ok(MaterializationResult::ready(MaterializedArtifacts::new(
NodeArtifactsCatalog::new(nodes),
ArtifactSet::new(vec![ArtifactFile::new("/shared.yaml", "cluster: ready")]),
)))
Ok(MaterializationResult::ready(
MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(vec![
ArtifactFile::new("/shared.yaml", "cluster: ready"),
])),
))
}
}
@ -247,30 +232,44 @@ mod tests {
}
#[test]
fn persisting_snapshot_materializer_writes_ready_snapshots_once() {
let writes = Arc::new(AtomicUsize::new(0));
let materializer = CachedSnapshotMaterializer::new(PersistingSnapshotMaterializer::new(
CountingMaterializer,
CountingSink {
writes: Arc::clone(&writes),
},
));
let empty = RegistrationSnapshot::default();
let ready = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new(
"node-0",
fn cached_snapshot_materializer_reuses_previous_result() {
let materializer = CachedSnapshotMaterializer::new(CountingMaterializer);
let snapshot = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
)]);
let _ = materializer
.materialize_snapshot(&empty)
.expect("not-ready snapshot");
let _ = materializer
.materialize_snapshot(&ready)
.expect("ready snapshot");
let _ = materializer
.materialize_snapshot(&ready)
.expect("cached ready snapshot");
let first = materializer
.materialize_snapshot(&snapshot)
.expect("first materialization");
let second = materializer
.materialize_snapshot(&snapshot)
.expect("second materialization");
assert!(matches!(first, MaterializationResult::Ready(_)));
assert!(matches!(second, MaterializationResult::Ready(_)));
}
#[test]
fn persisting_snapshot_materializer_writes_ready_snapshots_once() {
let writes = Arc::new(AtomicUsize::new(0));
let materializer = PersistingSnapshotMaterializer::new(
CountingMaterializer,
CountingSink {
writes: writes.clone(),
},
);
let snapshot = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new(
"node-1",
"127.0.0.1".parse().expect("parse ip"),
)]);
materializer
.materialize_snapshot(&snapshot)
.expect("first materialization");
materializer
.materialize_snapshot(&snapshot)
.expect("second materialization");
assert_eq!(writes.load(Ordering::SeqCst), 1);
}

View File

@ -6,47 +6,10 @@ use cfgsync_core::{
};
use crate::{
ArtifactSet, DynCfgsyncError, MaterializationResult, MaterializedArtifacts,
NodeArtifactsCatalog, NodeArtifactsMaterializer, RegistrationSnapshot,
RegistrationSnapshotMaterializer, ResolvedNodeArtifacts,
DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot,
RegistrationSnapshotMaterializer,
};
impl NodeArtifactsMaterializer for NodeArtifactsCatalog {
fn materialize(
&self,
registration: &NodeRegistration,
_registrations: &RegistrationSnapshot,
) -> Result<Option<ResolvedNodeArtifacts>, DynCfgsyncError> {
Ok(self.resolve(&registration.identifier).map(|artifacts| {
ResolvedNodeArtifacts::new(
build_artifact_set_from_catalog_entry(artifacts),
ArtifactSet::default(),
)
}))
}
}
impl RegistrationSnapshotMaterializer for NodeArtifactsCatalog {
fn materialize_snapshot(
&self,
_registrations: &RegistrationSnapshot,
) -> Result<MaterializationResult, DynCfgsyncError> {
Ok(MaterializationResult::ready(
MaterializedArtifacts::from_catalog(self.clone()),
))
}
}
impl NodeArtifactsMaterializer for MaterializedArtifacts {
fn materialize(
&self,
registration: &NodeRegistration,
_registrations: &RegistrationSnapshot,
) -> Result<Option<ResolvedNodeArtifacts>, DynCfgsyncError> {
Ok(self.resolve(&registration.identifier))
}
}
impl RegistrationSnapshotMaterializer for MaterializedArtifacts {
fn materialize_snapshot(
&self,
@ -56,87 +19,13 @@ impl RegistrationSnapshotMaterializer for MaterializedArtifacts {
}
}
/// Registration-aware source backed by an adapter materializer.
pub struct MaterializingConfigSource<M> {
materializer: M,
registrations: Mutex<HashMap<String, NodeRegistration>>,
}
impl<M> MaterializingConfigSource<M> {
#[must_use]
pub fn new(materializer: M) -> Self {
Self {
materializer,
registrations: Mutex::new(HashMap::new()),
}
}
fn registration_for(&self, identifier: &str) -> Option<NodeRegistration> {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.get(identifier).cloned()
}
fn registration_snapshot(&self) -> RegistrationSnapshot {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
RegistrationSnapshot::new(registrations.values().cloned().collect())
}
}
impl<M> NodeConfigSource for MaterializingConfigSource<M>
where
M: NodeArtifactsMaterializer,
{
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
let mut registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.insert(registration.identifier.clone(), registration);
RegisterNodeResponse::Registered
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
let registration = match self.registration_for(&registration.identifier) {
Some(registration) => registration,
None => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
));
}
};
let registrations = self.registration_snapshot();
match self.materializer.materialize(&registration, &registrations) {
Ok(Some(artifacts)) => {
ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(artifacts.files()))
}
Ok(None) => ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
)),
Err(error) => ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!(
"failed to materialize config for host {}: {error}",
registration.identifier
))),
}
}
}
/// Registration-aware source backed by a snapshot materializer.
pub struct SnapshotConfigSource<M> {
pub struct RegistrationConfigSource<M> {
materializer: M,
registrations: Mutex<HashMap<String, NodeRegistration>>,
}
impl<M> SnapshotConfigSource<M> {
impl<M> RegistrationConfigSource<M> {
#[must_use]
pub fn new(materializer: M) -> Self {
Self {
@ -164,7 +53,7 @@ impl<M> SnapshotConfigSource<M> {
}
}
impl<M> NodeConfigSource for SnapshotConfigSource<M>
impl<M> NodeConfigSource for RegistrationConfigSource<M>
where
M: RegistrationSnapshotMaterializer,
{
@ -205,7 +94,7 @@ where
match materialized.resolve(&registration.identifier) {
Some(config) => {
ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(config.files()))
ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(config.files))
}
None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
@ -214,133 +103,56 @@ where
}
}
fn build_artifact_set_from_catalog_entry(config: &crate::NodeArtifacts) -> ArtifactSet {
ArtifactSet::new(config.files.clone())
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use cfgsync_artifacts::ArtifactFile;
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
use cfgsync_core::{
CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration,
};
use super::{MaterializingConfigSource, SnapshotConfigSource};
use super::RegistrationConfigSource;
use crate::{
ArtifactSet, CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult,
MaterializedArtifacts, NodeArtifacts, NodeArtifactsCatalog, NodeArtifactsMaterializer,
RegistrationSnapshot, RegistrationSnapshotMaterializer, ResolvedNodeArtifacts,
CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts,
RegistrationSnapshot, RegistrationSnapshotMaterializer,
};
#[test]
fn catalog_resolves_identifier() {
let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts {
identifier: "node-1".to_owned(),
files: vec![ArtifactFile::new("/config.yaml", "key: value")],
}]);
fn registration_source_resolves_identifier() {
let artifacts = MaterializedArtifacts::from_nodes([(
"node-1".to_owned(),
ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", "a: 1")]),
)]);
let source = RegistrationConfigSource::new(artifacts);
let node = catalog.resolve("node-1").expect("resolve node config");
assert_eq!(node.files[0].content, "key: value");
}
#[test]
fn materializing_source_resolves_registered_node() {
let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts {
identifier: "node-1".to_owned(),
files: vec![ArtifactFile::new("/config.yaml", "key: value")],
}]);
let source = MaterializingConfigSource::new(catalog);
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"));
let _ = source.register(registration.clone());
match source.resolve(&registration) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.files()[0].path, "/config.yaml")
}
ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
#[test]
fn materializing_source_reports_not_ready_before_registration() {
let catalog = NodeArtifactsCatalog::new(vec![NodeArtifacts {
identifier: "node-1".to_owned(),
files: vec![ArtifactFile::new("/config.yaml", "key: value")],
}]);
let source = MaterializingConfigSource::new(catalog);
fn registration_source_reports_not_ready_before_registration() {
let artifacts = MaterializedArtifacts::from_nodes([(
"node-1".to_owned(),
ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", "a: 1")]),
)]);
let source = RegistrationConfigSource::new(artifacts);
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"));
match source.resolve(&registration) {
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
ConfigResolveResponse::Config(_) => panic!("expected not-ready"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::NotReady))
assert!(matches!(error.code, CfgsyncErrorCode::NotReady));
}
}
}
struct ThresholdMaterializer {
calls: AtomicUsize,
}
impl NodeArtifactsMaterializer for ThresholdMaterializer {
fn materialize(
&self,
registration: &NodeRegistration,
registrations: &RegistrationSnapshot,
) -> Result<Option<ResolvedNodeArtifacts>, DynCfgsyncError> {
self.calls.fetch_add(1, Ordering::SeqCst);
if registrations.len() < 2 {
return Ok(None);
}
let peer_count = registrations.iter().count();
let files = vec![
ArtifactFile::new("/config.yaml", format!("id: {}", registration.identifier)),
ArtifactFile::new("/peers.txt", peer_count.to_string()),
];
Ok(Some(ResolvedNodeArtifacts::new(
crate::ArtifactSet::new(files),
ArtifactSet::default(),
)))
}
}
#[test]
fn materializing_source_passes_registration_snapshot() {
let source = MaterializingConfigSource::new(ThresholdMaterializer {
calls: AtomicUsize::new(0),
});
let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip"));
let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip"));
let _ = source.register(node_a.clone());
match source.resolve(&node_a) {
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::NotReady))
}
}
let _ = source.register(node_b);
match source.resolve(&node_a) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.files()[0].content, "id: node-a");
assert_eq!(payload.files()[1].content, "2");
}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
assert_eq!(source.materializer.calls.load(Ordering::SeqCst), 2);
}
struct ThresholdSnapshotMaterializer;
impl RegistrationSnapshotMaterializer for ThresholdSnapshotMaterializer {
@ -352,56 +164,52 @@ mod tests {
return Ok(MaterializationResult::NotReady);
}
let nodes = registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new(
let nodes = registrations.iter().map(|registration| {
(
registration.identifier.clone(),
ArtifactSet::new(vec![ArtifactFile::new(
"/config.yaml",
format!("peer_count: {}", registrations.len()),
)],
})
.collect();
let shared = ArtifactSet::new(vec![ArtifactFile::new(
"/shared.txt",
format!("shared_count: {}", registrations.len()),
)]);
format!("id: {}", registration.identifier),
)]),
)
});
Ok(MaterializationResult::ready(MaterializedArtifacts::new(
NodeArtifactsCatalog::new(nodes),
shared,
)))
Ok(MaterializationResult::ready(
MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(vec![
ArtifactFile::new("/shared.yaml", "cluster: ready"),
])),
))
}
}
#[test]
fn snapshot_source_materializes_from_registration_snapshot() {
let source = SnapshotConfigSource::new(ThresholdSnapshotMaterializer);
let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip"));
let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip"));
fn registration_source_materializes_from_registration_snapshot() {
let source = RegistrationConfigSource::new(ThresholdSnapshotMaterializer);
let node_1 = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"));
let node_2 = NodeRegistration::new("node-2", "127.0.0.2".parse().expect("parse ip"));
let _ = source.register(node_a.clone());
match source.resolve(&node_a) {
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
let _ = source.register(node_1.clone());
match source.resolve(&node_1) {
ConfigResolveResponse::Config(_) => panic!("expected not-ready before threshold"),
ConfigResolveResponse::Error(error) => {
assert!(matches!(error.code, CfgsyncErrorCode::NotReady))
assert!(matches!(error.code, CfgsyncErrorCode::NotReady));
}
}
let _ = source.register(node_b);
let _ = source.register(node_2.clone());
match source.resolve(&node_a) {
match source.resolve(&node_1) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.files()[0].content, "peer_count: 2");
assert_eq!(payload.files()[1].content, "shared_count: 2");
assert_eq!(payload.files.len(), 2);
assert_eq!(payload.files[0].path, "/config.yaml");
assert_eq!(payload.files[1].path, "/shared.yaml");
}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
}
struct CountingSnapshotMaterializer {
calls: std::sync::Arc<AtomicUsize>,
calls: AtomicUsize,
}
impl RegistrationSnapshotMaterializer for CountingSnapshotMaterializer {
@ -412,36 +220,30 @@ mod tests {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(MaterializationResult::ready(
MaterializedArtifacts::from_catalog(NodeArtifactsCatalog::new(
registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new("/config.yaml", "cached: true")],
})
.collect(),
)),
MaterializedArtifacts::from_nodes(registrations.iter().map(|registration| {
(
registration.identifier.clone(),
ArtifactSet::new(vec![ArtifactFile::new(
"/config.yaml",
format!("id: {}", registration.identifier),
)]),
)
})),
))
}
}
#[test]
fn cached_snapshot_materializer_reuses_previous_result() {
let calls = std::sync::Arc::new(AtomicUsize::new(0));
let source = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(
let source = RegistrationConfigSource::new(CachedSnapshotMaterializer::new(
CountingSnapshotMaterializer {
calls: std::sync::Arc::clone(&calls),
calls: AtomicUsize::new(0),
},
));
let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip"));
let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip"));
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"));
let _ = source.register(node_a.clone());
let _ = source.register(node_b.clone());
let _ = source.resolve(&node_a);
let _ = source.resolve(&node_b);
assert_eq!(calls.load(Ordering::SeqCst), 1);
let _ = source.register(registration.clone());
let _ = source.resolve(&registration);
let _ = source.resolve(&registration);
}
}

View File

@ -13,16 +13,17 @@ version = { workspace = true }
workspace = true
[dependencies]
anyhow = "1"
axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" }
cfgsync-adapter = { workspace = true }
cfgsync-core = { workspace = true }
clap = { version = "4", features = ["derive"] }
serde = { workspace = true }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
tracing = { workspace = true }
anyhow = "1"
axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" }
cfgsync-adapter = { workspace = true }
cfgsync-artifacts = { workspace = true }
cfgsync-core = { workspace = true }
clap = { version = "4", features = ["derive"] }
serde = { workspace = true }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
tracing = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

View File

@ -3,9 +3,10 @@ use std::{fs, path::Path, sync::Arc};
use anyhow::Context as _;
use axum::Router;
use cfgsync_adapter::{
ArtifactSet, CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink,
PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, SnapshotConfigSource,
CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink,
PersistingSnapshotMaterializer, RegistrationConfigSource, RegistrationSnapshotMaterializer,
};
use cfgsync_artifacts::ArtifactSet;
use cfgsync_core::{
BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, RunCfgsyncError,
build_cfgsync_router, serve_cfgsync,
@@ -148,7 +149,7 @@ fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfig
fn load_registration_source(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
let bundle = load_bundle_yaml(bundle_path)?;
let materialized = build_materialized_artifacts(bundle);
let provider = SnapshotConfigSource::new(materialized);
let provider = RegistrationConfigSource::new(materialized);
Ok(Arc::new(provider))
}
@@ -165,16 +166,9 @@ fn build_materialized_artifacts(bundle: NodeArtifactsBundle) -> MaterializedArti
let nodes = bundle
.nodes
.into_iter()
.map(|node| cfgsync_adapter::NodeArtifacts {
identifier: node.identifier,
files: node.files,
})
.collect();
.map(|node| (node.identifier, ArtifactSet::new(node.files)));
MaterializedArtifacts::new(
cfgsync_adapter::NodeArtifactsCatalog::new(nodes),
ArtifactSet::new(bundle.shared_files),
)
MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(bundle.shared_files))
}
fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {
@@ -213,7 +207,7 @@ pub fn build_snapshot_cfgsync_router<M>(materializer: M) -> Router
where
M: RegistrationSnapshotMaterializer + 'static,
{
let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(materializer));
let provider = RegistrationConfigSource::new(CachedSnapshotMaterializer::new(materializer));
build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider)))
}
@@ -227,7 +221,7 @@ where
M: RegistrationSnapshotMaterializer + 'static,
S: MaterializedArtifactsSink + 'static,
{
let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(
let provider = RegistrationConfigSource::new(CachedSnapshotMaterializer::new(
PersistingSnapshotMaterializer::new(materializer, sink),
));

View File

@@ -1,5 +1,5 @@
use anyhow::Result;
use cfgsync_adapter::{DeploymentAdapter, build_node_artifact_catalog};
use cfgsync_adapter::{DeploymentAdapter, build_materialized_artifacts};
pub(crate) use cfgsync_core::render::CfgsyncOutputPaths;
use cfgsync_core::{
NodeArtifactsBundle, NodeArtifactsBundleEntry,
@@ -49,39 +49,42 @@ fn build_cfgsync_bundle<E: DeploymentAdapter>(
topology: &E::Deployment,
hostnames: &[String],
) -> Result<NodeArtifactsBundle> {
let nodes = build_node_artifact_catalog::<E>(topology, hostnames)?.into_nodes();
let nodes = nodes
.into_iter()
.map(|node| NodeArtifactsBundleEntry {
identifier: node.identifier,
files: node.files,
let materialized = build_materialized_artifacts::<E>(topology, hostnames)?;
let nodes = materialized
.iter()
.map(|(identifier, artifacts)| NodeArtifactsBundleEntry {
identifier: identifier.to_owned(),
files: artifacts.files.clone(),
})
.collect();
Ok(NodeArtifactsBundle::new(nodes))
Ok(NodeArtifactsBundle::new(nodes).with_shared_files(materialized.shared().files.clone()))
}
fn append_deployment_files(bundle: &mut NodeArtifactsBundle) -> Result<()> {
for node in &mut bundle.nodes {
if has_file_path(node, "/deployment.yaml") {
continue;
}
let config_content =
config_file_content(node).ok_or_else(|| BundleRenderError::MissingConfigFile {
identifier: node.identifier.clone(),
})?;
let deployment_yaml = extract_yaml_key(&config_content, "deployment")?;
node.files
.push(build_bundle_file("/deployment.yaml", deployment_yaml));
if has_shared_file_path(bundle, "/deployment.yaml") {
return Ok(());
}
let Some(node) = bundle.nodes.first() else {
return Ok(());
};
let config_content =
config_file_content(node).ok_or_else(|| BundleRenderError::MissingConfigFile {
identifier: node.identifier.clone(),
})?;
let deployment_yaml = extract_yaml_key(&config_content, "deployment")?;
bundle
.shared_files
.push(build_bundle_file("/deployment.yaml", deployment_yaml));
Ok(())
}
fn has_file_path(node: &NodeArtifactsBundleEntry, path: &str) -> bool {
node.files.iter().any(|file| file.path == path)
fn has_shared_file_path(bundle: &NodeArtifactsBundle, path: &str) -> bool {
bundle.shared_files.iter().any(|file| file.path == path)
}
fn config_file_content(node: &NodeArtifactsBundleEntry) -> Option<String> {

View File

@@ -1,5 +1,5 @@
pub use cfgsync_adapter::*;
#[doc(hidden)]
pub use cfgsync_adapter::{
DeploymentAdapter as CfgsyncEnv, build_node_artifact_catalog as build_cfgsync_node_catalog,
DeploymentAdapter as CfgsyncEnv, build_materialized_artifacts as build_cfgsync_node_catalog,
};