diff --git a/Cargo.lock b/Cargo.lock
index 2be427b..dbfd17a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -923,6 +923,7 @@ dependencies = [
  "cfgsync-artifacts",
  "cfgsync-core",
  "serde",
+ "serde_json",
  "thiserror 2.0.18",
 ]
 
@@ -959,7 +960,6 @@ dependencies = [
  "cfgsync-core",
  "clap",
  "serde",
- "serde_json",
  "serde_yaml",
  "tempfile",
  "thiserror 2.0.18",
diff --git a/cfgsync/adapter/Cargo.toml b/cfgsync/adapter/Cargo.toml
index bcc4da7..cba0480 100644
--- a/cfgsync/adapter/Cargo.toml
+++ b/cfgsync/adapter/Cargo.toml
@@ -16,4 +16,5 @@ workspace = true
 cfgsync-artifacts = { workspace = true }
 cfgsync-core = { workspace = true }
 serde = { workspace = true }
+serde_json = { workspace = true }
 thiserror = { workspace = true }
diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs
index c530f62..fd4e168 100644
--- a/cfgsync/adapter/src/lib.rs
+++ b/cfgsync/adapter/src/lib.rs
@@ -10,7 +10,8 @@ pub use deployment::{
     build_node_artifact_catalog,
 };
 pub use materializer::{
-    DynCfgsyncError, NodeArtifactsMaterializer, RegistrationSnapshotMaterializer,
+    CachedSnapshotMaterializer, DynCfgsyncError, NodeArtifactsMaterializer,
+    RegistrationSnapshotMaterializer,
 };
 pub use registrations::RegistrationSnapshot;
 pub use sources::{MaterializingConfigSource, SnapshotConfigSource};
diff --git a/cfgsync/adapter/src/materializer.rs b/cfgsync/adapter/src/materializer.rs
index 5680960..764bcc3 100644
--- a/cfgsync/adapter/src/materializer.rs
+++ b/cfgsync/adapter/src/materializer.rs
@@ -1,6 +1,7 @@
-use std::error::Error;
+use std::{error::Error, sync::Mutex};
 
 use cfgsync_core::NodeRegistration;
+use serde_json::to_string;
 
 use crate::{ArtifactSet, NodeArtifactsCatalog, RegistrationSnapshot};
@@ -24,3 +25,66 @@ pub trait RegistrationSnapshotMaterializer: Send + Sync {
         registrations: &RegistrationSnapshot,
     ) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError>;
 }
+
+/// Snapshot materializer wrapper that caches the last materialized result.
+pub struct CachedSnapshotMaterializer<M> {
+    inner: M,
+    cache: Mutex<Option<CachedSnapshot>>,
+}
+
+struct CachedSnapshot {
+    key: String,
+    catalog: Option<NodeArtifactsCatalog>,
+}
+
+impl<M> CachedSnapshotMaterializer<M> {
+    #[must_use]
+    pub fn new(inner: M) -> Self {
+        Self {
+            inner,
+            cache: Mutex::new(None),
+        }
+    }
+
+    fn snapshot_key(registrations: &RegistrationSnapshot) -> Result<String, DynCfgsyncError> {
+        Ok(to_string(registrations)?)
+    }
+}
+
+impl<M> RegistrationSnapshotMaterializer for CachedSnapshotMaterializer<M>
+where
+    M: RegistrationSnapshotMaterializer,
+{
+    fn materialize_snapshot(
+        &self,
+        registrations: &RegistrationSnapshot,
+    ) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
+        let key = Self::snapshot_key(registrations)?;
+
+        {
+            let cache = self
+                .cache
+                .lock()
+                .expect("cfgsync snapshot cache should not be poisoned");
+
+            if let Some(cached) = &*cache
+                && cached.key == key
+            {
+                return Ok(cached.catalog.clone());
+            }
+        }
+
+        let catalog = self.inner.materialize_snapshot(registrations)?;
+        let mut cache = self
+            .cache
+            .lock()
+            .expect("cfgsync snapshot cache should not be poisoned");
+
+        *cache = Some(CachedSnapshot {
+            key,
+            catalog: catalog.clone(),
+        });
+
+        Ok(catalog)
+    }
+}
diff --git a/cfgsync/adapter/src/registrations.rs b/cfgsync/adapter/src/registrations.rs
index 3926b57..47e365f 100644
--- a/cfgsync/adapter/src/registrations.rs
+++ b/cfgsync/adapter/src/registrations.rs
@@ -1,14 +1,17 @@
 use cfgsync_core::NodeRegistration;
+use serde::Serialize;
 
 /// Immutable view of registrations currently known to cfgsync.
-#[derive(Debug, Clone, Default)]
+#[derive(Debug, Clone, Default, Serialize)]
 pub struct RegistrationSnapshot {
     registrations: Vec<NodeRegistration>,
 }
 
 impl RegistrationSnapshot {
     #[must_use]
-    pub fn new(registrations: Vec<NodeRegistration>) -> Self {
+    pub fn new(mut registrations: Vec<NodeRegistration>) -> Self {
+        registrations.sort_by(|left, right| left.identifier.cmp(&right.identifier));
+
         Self { registrations }
     }
 
diff --git a/cfgsync/adapter/src/sources.rs b/cfgsync/adapter/src/sources.rs
index 5008515..e655a13 100644
--- a/cfgsync/adapter/src/sources.rs
+++ b/cfgsync/adapter/src/sources.rs
@@ -204,8 +204,8 @@ mod tests {
 
     use super::{MaterializingConfigSource, SnapshotConfigSource};
     use crate::{
-        DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog, NodeArtifactsMaterializer,
-        RegistrationSnapshot, RegistrationSnapshotMaterializer,
+        CachedSnapshotMaterializer, DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog,
+        NodeArtifactsMaterializer, RegistrationSnapshot, RegistrationSnapshotMaterializer,
     };
 
     #[test]
@@ -362,4 +362,47 @@ mod tests {
             ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
         }
     }
+
+    struct CountingSnapshotMaterializer {
+        calls: std::sync::Arc<AtomicUsize>,
+    }
+
+    impl RegistrationSnapshotMaterializer for CountingSnapshotMaterializer {
+        fn materialize_snapshot(
+            &self,
+            registrations: &RegistrationSnapshot,
+        ) -> Result<Option<NodeArtifactsCatalog>, DynCfgsyncError> {
+            self.calls.fetch_add(1, Ordering::SeqCst);
+
+            Ok(Some(NodeArtifactsCatalog::new(
+                registrations
+                    .iter()
+                    .map(|registration| NodeArtifacts {
+                        identifier: registration.identifier.clone(),
+                        files: vec![ArtifactFile::new("/config.yaml", "cached: true")],
+                    })
+                    .collect(),
+            )))
+        }
+    }
+
+    #[test]
+    fn cached_snapshot_materializer_reuses_previous_result() {
+        let calls = std::sync::Arc::new(AtomicUsize::new(0));
+        let source = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(
+            CountingSnapshotMaterializer {
+                calls: std::sync::Arc::clone(&calls),
+            },
+        ));
+        let node_a = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("parse ip"));
+        let node_b = NodeRegistration::new("node-b", "127.0.0.2".parse().expect("parse ip"));
+
+        let _ = source.register(node_a.clone());
+        let _ = source.register(node_b.clone());
+
+        let _ = source.resolve(&node_a);
+        let _ = source.resolve(&node_b);
+
+        assert_eq!(calls.load(Ordering::SeqCst), 1);
+    }
 }
diff --git a/cfgsync/runtime/Cargo.toml b/cfgsync/runtime/Cargo.toml
index b6e7a5f..f708ba1 100644
--- a/cfgsync/runtime/Cargo.toml
+++ b/cfgsync/runtime/Cargo.toml
@@ -18,7 +18,6 @@ cfgsync-adapter = { workspace = true }
 cfgsync-core = { workspace = true }
 clap = { version = "4", features = ["derive"] }
 serde = { workspace = true }
-serde_json = { workspace = true }
 serde_yaml = { workspace = true }
 thiserror = { workspace = true }
 tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
diff --git a/cfgsync/runtime/src/client.rs b/cfgsync/runtime/src/client.rs
index 340bdc1..b693ad9 100644
--- a/cfgsync/runtime/src/client.rs
+++ b/cfgsync/runtime/src/client.rs
@@ -1,4 +1,5 @@
 use std::{
+    collections::HashMap,
     env, fs,
     net::Ipv4Addr,
     path::{Path, PathBuf},
@@ -16,6 +17,36 @@ use tracing::info;
 const FETCH_ATTEMPTS: usize = 5;
 const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250);
 
+/// Output routing for fetched artifact files.
+#[derive(Debug, Clone, Default)]
+pub struct ArtifactOutputMap {
+    routes: HashMap<String, PathBuf>,
+}
+
+impl ArtifactOutputMap {
+    #[must_use]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    #[must_use]
+    pub fn route(
+        mut self,
+        artifact_path: impl Into<String>,
+        output_path: impl Into<PathBuf>,
+    ) -> Self {
+        self.routes.insert(artifact_path.into(), output_path.into());
+        self
+    }
+
+    fn resolve_path(&self, file: &NodeArtifactFile) -> PathBuf {
+        self.routes
+            .get(&file.path)
+            .cloned()
+            .unwrap_or_else(|| PathBuf::from(&file.path))
+    }
+}
+
 #[derive(Debug, Error)]
 enum ClientEnvError {
     #[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")]
@@ -55,25 +86,6 @@ async fn fetch_once(
     Ok(response)
 }
 
-async fn pull_config_files(payload: NodeRegistration, server_addr: &str) -> Result<()> {
-    register_node(&payload, server_addr).await?;
-
-    let config = fetch_with_retry(&payload, server_addr)
-        .await
-        .context("fetching cfgsync node config")?;
-    ensure_schema_version(&config)?;
-
-    let files = collect_payload_files(&config)?;
-
-    for file in files {
-        write_cfgsync_file(file)?;
-    }
-
-    info!(files = files.len(), "cfgsync files saved");
-
-    Ok(())
-}
-
 async fn register_node(payload: &NodeRegistration, server_addr: &str) -> Result<()> {
     let client = CfgsyncClient::new(server_addr);
 
@@ -98,6 +110,40 @@ async fn register_node(payload: &NodeRegistration, server_addr: &str) -> Result<()> {
     unreachable!("cfgsync register loop always returns before exhausting attempts");
 }
 
+/// Registers a node and fetches its artifact payload from cfgsync.
+pub async fn register_and_fetch_artifacts(
+    registration: &NodeRegistration,
+    server_addr: &str,
+) -> Result<NodeArtifactsPayload> {
+    register_node(registration, server_addr).await?;
+
+    let payload = fetch_with_retry(registration, server_addr)
+        .await
+        .context("fetching cfgsync node config")?;
+    ensure_schema_version(&payload)?;
+
+    Ok(payload)
+}
+
+/// Registers a node, fetches its artifact payload, and writes the files using
+/// the provided output routing policy.
+pub async fn fetch_and_write_artifacts(
+    registration: &NodeRegistration,
+    server_addr: &str,
+    outputs: &ArtifactOutputMap,
+) -> Result<()> {
+    let payload = register_and_fetch_artifacts(registration, server_addr).await?;
+    let files = collect_payload_files(&payload)?;
+
+    for file in files {
+        write_cfgsync_file(file, outputs)?;
+    }
+
+    info!(files = files.len(), "cfgsync files saved");
+
+    Ok(())
+}
+
 fn ensure_schema_version(config: &NodeArtifactsPayload) -> Result<()> {
     if config.schema_version != CFGSYNC_SCHEMA_VERSION {
         bail!(
@@ -118,8 +164,8 @@ fn collect_payload_files(config: &NodeArtifactsPayload) -> Result<&[NodeArtifactFile]> {
     Ok(config.files())
 }
 
-fn write_cfgsync_file(file: &NodeArtifactFile) -> Result<()> {
-    let path = PathBuf::from(&file.path);
+fn write_cfgsync_file(file: &NodeArtifactFile, outputs: &ArtifactOutputMap) -> Result<()> {
+    let path = outputs.resolve_path(file);
 
     ensure_parent_dir(&path)?;
 
@@ -153,10 +199,12 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
     let identifier =
         env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
     let metadata = parse_registration_payload_env()?;
+    let outputs = build_output_map();
 
-    pull_config_files(
-        NodeRegistration::new(identifier, ip).with_payload(metadata),
+    fetch_and_write_artifacts(
+        &NodeRegistration::new(identifier, ip).with_payload(metadata),
         &server_addr,
+        &outputs,
     )
     .await
 }
@@ -182,6 +230,25 @@ fn parse_registration_payload(raw: &str) -> Result<RegistrationPayload> {
     RegistrationPayload::from_json_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON")
 }
 
+fn build_output_map() -> ArtifactOutputMap {
+    let mut outputs = ArtifactOutputMap::default();
+
+    if let Ok(path) = env::var("CFG_FILE_PATH") {
+        outputs = outputs
+            .route("/config.yaml", path.clone())
+            .route("config.yaml", path);
+    }
+
+    if let Ok(path) = env::var("CFG_DEPLOYMENT_PATH") {
+        outputs = outputs
+            .route("/deployment.yaml", path.clone())
+            .route("deployment-settings.yaml", path.clone())
+            .route("/deployment-settings.yaml", path);
+    }
+
+    outputs
+}
+
 #[cfg(test)]
 mod tests {
     use cfgsync_core::{
@@ -216,9 +283,10 @@ mod tests {
             .expect("run cfgsync server");
         });
 
-        pull_config_files(
-            NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")),
+        fetch_and_write_artifacts(
+            &NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")),
             &address,
+            &ArtifactOutputMap::default(),
         )
         .await
         .expect("pull config files");
diff --git a/cfgsync/runtime/src/lib.rs b/cfgsync/runtime/src/lib.rs
index f74a8a4..8a7337f 100644
--- a/cfgsync/runtime/src/lib.rs
+++ b/cfgsync/runtime/src/lib.rs
@@ -3,8 +3,11 @@ pub use cfgsync_core as core;
 mod client;
 mod server;
 
-pub use client::run_cfgsync_client_from_env;
+pub use client::{
+    ArtifactOutputMap, fetch_and_write_artifacts, register_and_fetch_artifacts,
+    run_cfgsync_client_from_env,
+};
 pub use server::{
     CfgsyncServerConfig, CfgsyncServingMode, LoadCfgsyncServerConfigError,
-    serve_cfgsync_from_config,
+    serve_cfgsync_from_config, serve_snapshot_cfgsync,
 };
diff --git a/cfgsync/runtime/src/server.rs b/cfgsync/runtime/src/server.rs
index 651df58..077c075 100644
--- a/cfgsync/runtime/src/server.rs
+++ b/cfgsync/runtime/src/server.rs
@@ -1,9 +1,13 @@
 use std::{fs, path::Path, sync::Arc};
 
 use anyhow::Context as _;
-use cfgsync_adapter::{MaterializingConfigSource, NodeArtifacts, NodeArtifactsCatalog};
+use cfgsync_adapter::{
+    CachedSnapshotMaterializer, MaterializingConfigSource, NodeArtifacts, NodeArtifactsCatalog,
+    RegistrationSnapshotMaterializer, SnapshotConfigSource,
+};
 use cfgsync_core::{
-    BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, serve_cfgsync,
+    BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, RunCfgsyncError,
+    serve_cfgsync,
 };
 use serde::Deserialize;
 use thiserror::Error;
@@ -137,6 +141,18 @@ pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()> {
     Ok(())
 }
 
+/// Runs a registration-backed cfgsync server directly from a snapshot
+/// materializer.
+pub async fn serve_snapshot_cfgsync<M>(port: u16, materializer: M) -> Result<(), RunCfgsyncError>
+where
+    M: RegistrationSnapshotMaterializer + 'static,
+{
+    let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(materializer));
+    let state = CfgsyncServerState::new(Arc::new(provider));
+
+    serve_cfgsync(port, state).await
+}
+
 fn build_server_state(
     config: &CfgsyncServerConfig,
     bundle_path: &Path,