Add shared cfgsync artifact materialization

This commit is contained in:
andrussal 2026-03-10 14:24:00 +01:00
parent 592b4d6a4f
commit 8681117301
7 changed files with 212 additions and 64 deletions

View File

@ -33,6 +33,43 @@ impl ArtifactSet {
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
#[must_use]
pub fn into_files(self) -> Vec<ArtifactFile> {
self.files
}
}
/// Resolved artifact payload for one node, including any shared files that
/// should be delivered alongside the node-local files.
#[derive(Debug, Clone, Default)]
pub struct ResolvedNodeArtifacts {
node: ArtifactSet,
shared: ArtifactSet,
}
impl ResolvedNodeArtifacts {
#[must_use]
pub fn new(node: ArtifactSet, shared: ArtifactSet) -> Self {
Self { node, shared }
}
#[must_use]
pub fn node(&self) -> &ArtifactSet {
&self.node
}
#[must_use]
pub fn shared(&self) -> &ArtifactSet {
&self.shared
}
#[must_use]
pub fn files(&self) -> Vec<ArtifactFile> {
let mut files = self.node.files().to_vec();
files.extend_from_slice(self.shared.files());
files
}
}
/// Artifact payloads indexed by stable node identifier.
@ -72,3 +109,39 @@ impl NodeArtifactsCatalog {
self.nodes.into_values().collect()
}
}
/// Materialized cfgsync output for a whole registration set.
#[derive(Debug, Clone, Default)]
pub struct MaterializedArtifacts {
nodes: NodeArtifactsCatalog,
shared: ArtifactSet,
}
impl MaterializedArtifacts {
#[must_use]
pub fn new(nodes: NodeArtifactsCatalog, shared: ArtifactSet) -> Self {
Self { nodes, shared }
}
#[must_use]
pub fn from_catalog(nodes: NodeArtifactsCatalog) -> Self {
Self::new(nodes, ArtifactSet::default())
}
#[must_use]
pub fn nodes(&self) -> &NodeArtifactsCatalog {
&self.nodes
}
#[must_use]
pub fn shared(&self) -> &ArtifactSet {
&self.shared
}
#[must_use]
pub fn resolve(&self, identifier: &str) -> Option<ResolvedNodeArtifacts> {
self.nodes.resolve(identifier).map(|node| {
ResolvedNodeArtifacts::new(ArtifactSet::new(node.files.clone()), self.shared.clone())
})
}
}

View File

@ -4,13 +4,15 @@ mod materializer;
mod registrations;
mod sources;
pub use artifacts::{ArtifactSet, NodeArtifacts, NodeArtifactsCatalog};
pub use artifacts::{
ArtifactSet, MaterializedArtifacts, NodeArtifacts, NodeArtifactsCatalog, ResolvedNodeArtifacts,
};
pub use deployment::{
BuildCfgsyncNodesError, DeploymentAdapter, build_cfgsync_node_configs,
build_node_artifact_catalog,
};
pub use materializer::{
CachedSnapshotMaterializer, DynCfgsyncError, NodeArtifactsMaterializer,
CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, NodeArtifactsMaterializer,
RegistrationSnapshotMaterializer,
};
pub use registrations::RegistrationSnapshot;

View File

@ -3,7 +3,7 @@ use std::{error::Error, sync::Mutex};
use cfgsync_core::NodeRegistration;
use serde_json::to_string;
use crate::{ArtifactSet, NodeArtifactsCatalog, RegistrationSnapshot};
use crate::{MaterializedArtifacts, RegistrationSnapshot, ResolvedNodeArtifacts};
/// Type-erased cfgsync adapter error used to preserve source context.
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
@ -14,7 +14,7 @@ pub trait NodeArtifactsMaterializer: Send + Sync {
&self,
registration: &NodeRegistration,
registrations: &RegistrationSnapshot,
) -> Result<Option<ArtifactSet>, DynCfgsyncError>;
) -> Result<Option<ResolvedNodeArtifacts>, DynCfgsyncError>;
}
/// Adapter contract for materializing a whole registration snapshot into
@ -23,7 +23,22 @@ pub trait RegistrationSnapshotMaterializer: Send + Sync {
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError>;
) -> Result<MaterializationResult, DynCfgsyncError>;
}
/// Registration-driven materialization status.
#[derive(Debug, Clone, Default)]
pub enum MaterializationResult {
#[default]
NotReady,
Ready(MaterializedArtifacts),
}
impl MaterializationResult {
#[must_use]
pub fn ready(nodes: MaterializedArtifacts) -> Self {
Self::Ready(nodes)
}
}
/// Snapshot materializer wrapper that caches the last materialized result.
@ -34,7 +49,7 @@ pub struct CachedSnapshotMaterializer<M> {
struct CachedSnapshot {
key: String,
catalog: Option<NodeArtifactsCatalog>,
result: MaterializationResult,
}
impl<M> CachedSnapshotMaterializer<M> {
@ -58,7 +73,7 @@ where
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
) -> Result<MaterializationResult, DynCfgsyncError> {
let key = Self::snapshot_key(registrations)?;
{
@ -70,11 +85,11 @@ where
if let Some(cached) = &*cache
&& cached.key == key
{
return Ok(cached.catalog.clone());
return Ok(cached.result.clone());
}
}
let catalog = self.inner.materialize_snapshot(registrations)?;
let result = self.inner.materialize_snapshot(registrations)?;
let mut cache = self
.cache
.lock()
@ -82,9 +97,9 @@ where
*cache = Some(CachedSnapshot {
key,
catalog: catalog.clone(),
result: result.clone(),
});
Ok(catalog)
Ok(result)
}
}

View File

@ -6,8 +6,9 @@ use cfgsync_core::{
};
use crate::{
ArtifactSet, DynCfgsyncError, NodeArtifactsCatalog, NodeArtifactsMaterializer,
RegistrationSnapshot, RegistrationSnapshotMaterializer,
ArtifactSet, DynCfgsyncError, MaterializationResult, MaterializedArtifacts,
NodeArtifactsCatalog, NodeArtifactsMaterializer, RegistrationSnapshot,
RegistrationSnapshotMaterializer, ResolvedNodeArtifacts,
};
impl NodeArtifactsMaterializer for NodeArtifactsCatalog {
@ -15,10 +16,13 @@ impl NodeArtifactsMaterializer for NodeArtifactsCatalog {
&self,
registration: &NodeRegistration,
_registrations: &RegistrationSnapshot,
) -> Result<Option<ArtifactSet>, DynCfgsyncError> {
Ok(self
.resolve(&registration.identifier)
.map(build_artifact_set_from_catalog_entry))
) -> Result<Option<ResolvedNodeArtifacts>, DynCfgsyncError> {
Ok(self.resolve(&registration.identifier).map(|artifacts| {
ResolvedNodeArtifacts::new(
build_artifact_set_from_catalog_entry(artifacts),
ArtifactSet::default(),
)
}))
}
}
@ -26,8 +30,29 @@ impl RegistrationSnapshotMaterializer for NodeArtifactsCatalog {
fn materialize_snapshot(
&self,
_registrations: &RegistrationSnapshot,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
Ok(Some(self.clone()))
) -> Result<MaterializationResult, DynCfgsyncError> {
Ok(MaterializationResult::ready(
MaterializedArtifacts::from_catalog(self.clone()),
))
}
}
impl NodeArtifactsMaterializer for MaterializedArtifacts {
fn materialize(
&self,
registration: &NodeRegistration,
_registrations: &RegistrationSnapshot,
) -> Result<Option<ResolvedNodeArtifacts>, DynCfgsyncError> {
Ok(self.resolve(&registration.identifier))
}
}
impl RegistrationSnapshotMaterializer for MaterializedArtifacts {
fn materialize_snapshot(
&self,
_registrations: &RegistrationSnapshot,
) -> Result<MaterializationResult, DynCfgsyncError> {
Ok(MaterializationResult::ready(self.clone()))
}
}
@ -91,9 +116,9 @@ where
let registrations = self.registration_snapshot();
match self.materializer.materialize(&registration, &registrations) {
Ok(Some(artifacts)) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(
artifacts.files().to_vec(),
)),
Ok(Some(artifacts)) => {
ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(artifacts.files()))
}
Ok(None) => ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
)),
@ -164,9 +189,9 @@ where
};
let registrations = self.registration_snapshot();
let catalog = match self.materializer.materialize_snapshot(&registrations) {
Ok(Some(catalog)) => catalog,
Ok(None) => {
let materialized = match self.materializer.materialize_snapshot(&registrations) {
Ok(MaterializationResult::Ready(materialized)) => materialized,
Ok(MaterializationResult::NotReady) => {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
));
@ -178,10 +203,10 @@ where
}
};
match catalog.resolve(&registration.identifier) {
Some(config) => ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(
config.files.clone(),
)),
match materialized.resolve(&registration.identifier) {
Some(config) => {
ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(config.files()))
}
None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
)),
@ -204,8 +229,9 @@ mod tests {
use super::{MaterializingConfigSource, SnapshotConfigSource};
use crate::{
CachedSnapshotMaterializer, DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog,
NodeArtifactsMaterializer, RegistrationSnapshot, RegistrationSnapshotMaterializer,
ArtifactSet, CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult,
MaterializedArtifacts, NodeArtifacts, NodeArtifactsCatalog, NodeArtifactsMaterializer,
RegistrationSnapshot, RegistrationSnapshotMaterializer, ResolvedNodeArtifacts,
};
#[test]
@ -265,7 +291,7 @@ mod tests {
&self,
registration: &NodeRegistration,
registrations: &RegistrationSnapshot,
) -> Result<Option<crate::ArtifactSet>, DynCfgsyncError> {
) -> Result<Option<ResolvedNodeArtifacts>, DynCfgsyncError> {
self.calls.fetch_add(1, Ordering::SeqCst);
if registrations.len() < 2 {
@ -278,7 +304,10 @@ mod tests {
ArtifactFile::new("/peers.txt", peer_count.to_string()),
];
Ok(Some(crate::ArtifactSet::new(files)))
Ok(Some(ResolvedNodeArtifacts::new(
crate::ArtifactSet::new(files),
ArtifactSet::default(),
)))
}
}
@ -318,22 +347,29 @@ mod tests {
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
) -> Result<MaterializationResult, DynCfgsyncError> {
if registrations.len() < 2 {
return Ok(None);
return Ok(MaterializationResult::NotReady);
}
Ok(Some(NodeArtifactsCatalog::new(
registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new(
"/config.yaml",
format!("peer_count: {}", registrations.len()),
)],
})
.collect(),
let nodes = registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new(
"/config.yaml",
format!("peer_count: {}", registrations.len()),
)],
})
.collect();
let shared = ArtifactSet::new(vec![ArtifactFile::new(
"/shared.txt",
format!("shared_count: {}", registrations.len()),
)]);
Ok(MaterializationResult::ready(MaterializedArtifacts::new(
NodeArtifactsCatalog::new(nodes),
shared,
)))
}
}
@ -358,6 +394,7 @@ mod tests {
match source.resolve(&node_a) {
ConfigResolveResponse::Config(payload) => {
assert_eq!(payload.files()[0].content, "peer_count: 2");
assert_eq!(payload.files()[1].content, "shared_count: 2");
}
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
}
@ -371,18 +408,20 @@ mod tests {
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
) -> Result<MaterializationResult, DynCfgsyncError> {
self.calls.fetch_add(1, Ordering::SeqCst);
Ok(Some(NodeArtifactsCatalog::new(
registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new("/config.yaml", "cached: true")],
})
.collect(),
)))
Ok(MaterializationResult::ready(
MaterializedArtifacts::from_catalog(NodeArtifactsCatalog::new(
registrations
.iter()
.map(|registration| NodeArtifacts {
identifier: registration.identifier.clone(),
files: vec![ArtifactFile::new("/config.yaml", "cached: true")],
})
.collect(),
)),
))
}
}

View File

@ -6,12 +6,23 @@ use crate::NodeArtifactFile;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeArtifactsBundle {
pub nodes: Vec<NodeArtifactsBundleEntry>,
#[serde(default)]
pub shared_files: Vec<NodeArtifactFile>,
}
impl NodeArtifactsBundle {
#[must_use]
pub fn new(nodes: Vec<NodeArtifactsBundleEntry>) -> Self {
Self { nodes }
Self {
nodes,
shared_files: Vec::new(),
}
}
#[must_use]
pub fn with_shared_files(mut self, shared_files: Vec<NodeArtifactFile>) -> Self {
self.shared_files = shared_files;
self
}
}

View File

@ -75,13 +75,18 @@ pub enum BundleLoadError {
#[must_use]
pub fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap<String, NodeArtifactsPayload> {
let shared_files = bundle.shared_files;
bundle
.nodes
.into_iter()
.map(|node| {
let NodeArtifactsBundleEntry { identifier, files } = node;
(identifier, NodeArtifactsPayload::from_files(files))
let mut payload_files = files;
payload_files.extend(shared_files.clone());
(identifier, NodeArtifactsPayload::from_files(payload_files))
})
.collect()
}

View File

@ -2,7 +2,7 @@ use std::{fs, path::Path, sync::Arc};
use anyhow::Context as _;
use cfgsync_adapter::{
CachedSnapshotMaterializer, MaterializingConfigSource, NodeArtifacts, NodeArtifactsCatalog,
ArtifactSet, CachedSnapshotMaterializer, MaterializedArtifacts,
RegistrationSnapshotMaterializer, SnapshotConfigSource,
};
use cfgsync_core::{
@ -91,8 +91,8 @@ fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfig
fn load_registration_source(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
let bundle = load_bundle_yaml(bundle_path)?;
let catalog = build_node_catalog(bundle);
let provider = MaterializingConfigSource::new(catalog);
let materialized = build_materialized_artifacts(bundle);
let provider = SnapshotConfigSource::new(materialized);
Ok(Arc::new(provider))
}
@ -105,17 +105,20 @@ fn load_bundle_yaml(bundle_path: &Path) -> anyhow::Result<NodeArtifactsBundle> {
.with_context(|| format!("parsing cfgsync bundle from {}", bundle_path.display()))
}
fn build_node_catalog(bundle: NodeArtifactsBundle) -> NodeArtifactsCatalog {
fn build_materialized_artifacts(bundle: NodeArtifactsBundle) -> MaterializedArtifacts {
let nodes = bundle
.nodes
.into_iter()
.map(|node| NodeArtifacts {
.map(|node| cfgsync_adapter::NodeArtifacts {
identifier: node.identifier,
files: node.files,
})
.collect();
NodeArtifactsCatalog::new(nodes)
MaterializedArtifacts::new(
cfgsync_adapter::NodeArtifactsCatalog::new(nodes),
ArtifactSet::new(bundle.shared_files),
)
}
fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {