diff --git a/Cargo.lock b/Cargo.lock index 0145d09..f550368 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -916,14 +916,36 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "cfgsync-adapter" +version = "0.1.0" +dependencies = [ + "cfgsync-artifacts", + "cfgsync-core", + "serde", + "serde_json", +] + +[[package]] +name = "cfgsync-artifacts" +version = "0.1.0" +dependencies = [ + "serde", + "thiserror 2.0.18", +] + [[package]] name = "cfgsync-core" version = "0.1.0" dependencies = [ + "anyhow", "axum", + "cfgsync-artifacts", "reqwest", "serde", "serde_json", + "serde_yaml", + "tempfile", "thiserror 2.0.18", "tokio", ] @@ -933,12 +955,17 @@ name = "cfgsync-runtime" version = "0.1.0" dependencies = [ "anyhow", + "axum", + "cfgsync-adapter", + "cfgsync-artifacts", "cfgsync-core", "clap", "serde", "serde_yaml", - "testing-framework-core", + "tempfile", + "thiserror 2.0.18", "tokio", + "tracing", ] [[package]] @@ -1299,7 +1326,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 2.0.114", + "syn 1.0.109", ] [[package]] @@ -2891,8 +2918,9 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "cfgsync-adapter", + "cfgsync-artifacts", "cfgsync-core", - "cfgsync-runtime", "kube", "logos-blockchain-http-api-common", "reqwest", @@ -5499,9 +5527,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "bytes", "getrandom 0.3.4", @@ -6528,6 +6556,8 @@ name = "testing-framework-core" version = "0.1.0" dependencies = [ "async-trait", + "cfgsync-adapter", + "cfgsync-artifacts", "futures", "parking_lot", "prometheus-http-query", diff --git a/Cargo.toml b/Cargo.toml index 9e32eb1..a1f960d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,9 @@ [workspace] members = [ + "cfgsync/adapter", + "cfgsync/artifacts", + "cfgsync/core", + "cfgsync/runtime", "logos/examples", "logos/runtime/env", "logos/runtime/ext", @@ -8,8 +12,6 @@ members = [ "testing-framework/deployers/compose", "testing-framework/deployers/k8s", "testing-framework/deployers/local", - "testing-framework/tools/cfgsync-core", - "testing-framework/tools/cfgsync-runtime", ] resolver = "2" @@ -31,7 +33,9 @@ all = "allow" [workspace.dependencies] # Local testing framework crates -cfgsync-core = { default-features = false, path = "testing-framework/tools/cfgsync-core" } +cfgsync-adapter = { default-features = false, path = "cfgsync/adapter" } +cfgsync-artifacts = { default-features = false, path = "cfgsync/artifacts" } +cfgsync-core = { default-features = false, path = "cfgsync/core" } lb-ext = { default-features = false, path = "logos/runtime/ext" } lb-framework = { default-features = false, package = "testing_framework", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" } lb-workloads = { default-features = false, path = "logos/runtime/workloads" } @@ -40,10 +44,11 @@ testing-framework-env = { default-features = false, path = "logos/run testing-framework-runner-compose = { default-features = false, path = "testing-framework/deployers/compose" } testing-framework-runner-k8s = { default-features = false, path = "testing-framework/deployers/k8s" } testing-framework-runner-local = { default-features = false, path = "testing-framework/deployers/local" } +testing-framework-workflows = { default-features = false, package = "lb-workloads", path = "logos/runtime/workloads" } # Logos dependencies (from logos-blockchain master @ deccbb2d2) broadcast-service = { default-features = false, package = "logos-blockchain-chain-broadcast-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" } -cfgsync_runtime = { default-features = false, package = "cfgsync-runtime", path = "testing-framework/tools/cfgsync-runtime" } +cfgsync_runtime = { default-features = false, package = "cfgsync-runtime", path = "cfgsync/runtime" } chain-leader = { default-features = false, features = [ "pol-dev-mode", ], package = "logos-blockchain-chain-leader-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" } diff --git a/cfgsync/README.md b/cfgsync/README.md new file mode 100644 index 0000000..6e01eee --- /dev/null +++ b/cfgsync/README.md @@ -0,0 +1,265 @@ +# cfgsync + +`cfgsync` is a small library stack for bootstrap-time config delivery. + +The library solves one problem: nodes need to identify themselves, wait until configuration is ready, fetch the files they need, write them locally, and then continue startup. `cfgsync` owns that transport and serving loop. The application using it still decides what “ready” means and what files should be generated. + +That split is the point of the design: + +- `cfgsync` owns registration, polling, payload transport, and file delivery. +- the application adapter owns readiness policy and artifact generation. + +The result is a reusable library without application-specific bootstrap logic leaking into core crates. + +## How it works + +The normal flow is registration-backed serving. + +Each node first sends a registration containing: + +- a stable node identifier +- its IP address +- optional typed application metadata + +The server stores registrations and builds a `RegistrationSnapshot`. The application provides a `RegistrationSnapshotMaterializer`, which receives that snapshot and decides whether configuration is ready yet. + +If the materializer returns `NotReady`, the node keeps polling. If it returns `Ready`, cfgsync serves one payload containing: + +- node-local files for the requesting node +- optional shared files that every node should receive + +The node then writes those files locally and continues startup. + +That is the main model. Everything else is a variation of it. + +## Precomputed artifacts + +Some systems already know the final artifacts before any node starts. That still fits the same model. + +In that case the server simply starts with precomputed `MaterializedArtifacts`. Nodes still register and fetch through the same protocol, but the materializer already knows the final outputs. Registration becomes an identity and readiness gate, not a source of topology discovery. + +This is why cfgsync no longer needs a separate “static mode” as a first-class concept. Precomputed serving is just registration-backed serving with an already-known result. + +## Crate layout + +### `cfgsync-artifacts` + +This crate contains the file-level data model: + +- `ArtifactFile` for a single file +- `ArtifactSet` for a group of files + +If all you need is “what files exist and how are they grouped”, this is the crate to look at. + +### `cfgsync-core` + +This crate contains the protocol and the low-level HTTP implementation. + +Important types here are: + +- `NodeRegistration` +- `RegistrationPayload` +- `NodeArtifactsPayload` +- `Client` +- `NodeConfigSource` + +It also defines the HTTP contract: + +- `POST /register` +- `POST /node` + +The server answers with either a payload, `NotReady`, or `Missing`. + +### `cfgsync-adapter` + +This crate defines the application-facing seam. + +The key types are: + +- `RegistrationSnapshot` +- `RegistrationSnapshotMaterializer` +- `MaterializedArtifacts` +- `MaterializationResult` + +The adapter’s job is simple: given the current registration snapshot, decide whether artifacts are ready, and if they are, return them. + +The crate also contains reusable wrappers around that seam: + +- `CachedSnapshotMaterializer` +- `PersistingSnapshotMaterializer` +- `RegistrationConfigSource` + +These exist because caching and result persistence are generic orchestration concerns, not application-specific logic. + +### `cfgsync-runtime` + +This crate provides the operational entrypoints. + +Use it when you want to run cfgsync rather than define its protocol: + +- client-side fetch/write helpers +- server config loading +- direct serving helpers such as `serve(...)` + +This is the crate that should feel like the normal “start here” path for users integrating cfgsync into a real system. + +## Artifact model + +The adapter usually thinks in full snapshots, but cfgsync serves one node at a time. + +The materializer returns `MaterializedArtifacts`, which contain: + +- node-local artifacts keyed by node identifier +- optional shared artifacts + +When one node fetches config, cfgsync resolves that node’s local files, merges in the shared files, and returns a single payload. + +That is why applications usually do not need a second “shared config” endpoint. Shared files can travel in the same payload as node-local files. + +## The adapter boundary + +The adapter is where application semantics belong. + +In practice, the adapter should define: + +- the typed registration payload +- the readiness rule +- the conversion from registration snapshots into artifacts +- any shared artifact generation the application needs + +Typical examples are: + +- waiting for `n` initial nodes +- deriving peer lists from registrations +- generating one node-local config file per node +- generating one shared deployment file for all nodes + +What does not belong in cfgsync core is equally important. Generic cfgsync should not understand: + +- application-specific topology semantics +- genesis or deployment generation rules for one protocol +- application-specific command/state-machine logic +- domain-specific ideas of what a node “really is” + +Those belong in the adapter or in the consuming application. + +## Start here + +Start with the examples in `cfgsync/runtime/examples/`. + +- `minimal_cfgsync.rs` shows the smallest complete flow: serve cfgsync, register + one node, fetch artifacts, and write them locally. +- `precomputed_registration_cfgsync.rs` shows how precomputed artifacts still + use the same registration flow, including a later node that joins after the + server is already running. +- `wait_for_registrations_cfgsync.rs` shows the normal `NotReady` path: one node + waits until the materializer sees enough registrations, then both nodes + receive config. + +Those three examples cover the full public model. The rest of this README just +names the pieces and explains where application-specific logic belongs. + +## Minimal integration path + +For a new application, the shortest sensible path is: + +1. define a typed registration payload +2. implement `RegistrationSnapshotMaterializer` +3. return node-local and optional shared artifacts +4. serve them with `serve(...)` +5. use `Client` on the node side + +That gives you the main value of the library without pushing application logic +into cfgsync itself. + +## API sketch + +Typed registration payload: + +```rust +use cfgsync_core::NodeRegistration; + +#[derive(serde::Serialize)] +struct MyNodeMetadata { + network_port: u16, + api_port: u16, +} + +let registration = NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().unwrap()) + .with_metadata(&MyNodeMetadata { + network_port: 3000, + api_port: 18080, + })?; +``` + +Snapshot materializer: + +```rust +use cfgsync_adapter::{ + DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot, + RegistrationSnapshotMaterializer, +}; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; + +struct MyMaterializer; + +impl RegistrationSnapshotMaterializer for MyMaterializer { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + if registrations.len() < 2 { + return Ok(MaterializationResult::NotReady); + } + + let nodes = registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml", + format!("id: {}\n", registration.identifier), + )]), + ) + }); + + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(nodes), + )) + } +} +``` + +Serving: + +```rust +use cfgsync_runtime::serve; + +# async fn run() -> anyhow::Result<()> { +serve(4400, MyMaterializer).await?; +# Ok(()) +# } +``` + +Fetching and writing artifacts: + +```rust +use cfgsync_runtime::{Client, OutputMap}; + +# async fn run(registration: cfgsync_core::NodeRegistration) -> anyhow::Result<()> { +let outputs = OutputMap::config_and_shared( + "/node-data/node-1/config.yaml", + "/node-data/shared", +); + +Client::new("http://127.0.0.1:4400") + .fetch_and_write(®istration, &outputs) + .await?; +# Ok(()) +# } +``` + +## Compatibility + +The intended public API is what the crate roots reexport today. + +Some older compatibility paths still exist internally to avoid breaking current in-repo consumers, but they are not the main model and should not be treated as the recommended public surface. diff --git a/cfgsync/adapter/Cargo.toml b/cfgsync/adapter/Cargo.toml new file mode 100644 index 0000000..f8055f5 --- /dev/null +++ b/cfgsync/adapter/Cargo.toml @@ -0,0 +1,19 @@ +[package] +categories = { workspace = true } +description = { workspace = true } +edition = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +name = "cfgsync-adapter" +readme = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +cfgsync-artifacts = { workspace = true } +cfgsync-core = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } diff --git a/cfgsync/adapter/src/artifacts.rs b/cfgsync/adapter/src/artifacts.rs new file mode 100644 index 0000000..2830b10 --- /dev/null +++ b/cfgsync/adapter/src/artifacts.rs @@ -0,0 +1,75 @@ +use std::collections::HashMap; + +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; +use serde::{Deserialize, Serialize}; + +/// Fully materialized cfgsync artifacts for a registration set. +/// +/// `nodes` holds the node-local files keyed by stable node identifier. +/// `shared` holds files that should be delivered alongside every node. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct MaterializedArtifacts { + nodes: HashMap, + shared: ArtifactSet, +} + +impl MaterializedArtifacts { + /// Creates materialized artifacts from node-local artifact sets. + #[must_use] + pub fn from_nodes(nodes: I) -> Self + where + I: IntoIterator, + { + Self { + nodes: nodes.into_iter().collect(), + shared: ArtifactSet::default(), + } + } + + /// Attaches shared files delivered alongside every node. + #[must_use] + pub fn with_shared(mut self, shared: ArtifactSet) -> Self { + self.shared = shared; + self + } + + /// Returns the node-local artifact set for one identifier. + #[must_use] + pub fn node(&self, identifier: &str) -> Option<&ArtifactSet> { + self.nodes.get(identifier) + } + + /// Returns the shared artifact set. + #[must_use] + pub fn shared(&self) -> &ArtifactSet { + &self.shared + } + + /// Returns the number of node-local artifact sets. + #[must_use] + pub fn len(&self) -> usize { + self.nodes.len() + } + + /// Returns `true` when no node-local artifact sets are present. + #[must_use] + pub fn is_empty(&self) -> bool { + self.nodes.is_empty() + } + + /// Resolves the full file set that should be written for one node. + #[must_use] + pub fn resolve(&self, identifier: &str) -> Option { + let node = self.node(identifier)?; + let mut files: Vec = node.files.clone(); + files.extend(self.shared.files.iter().cloned()); + Some(ArtifactSet::new(files)) + } + + /// Iterates node-local artifact sets by stable identifier. + pub fn iter(&self) -> impl Iterator { + self.nodes + .iter() + .map(|(identifier, artifacts)| (identifier.as_str(), artifacts)) + } +} diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs new file mode 100644 index 0000000..d27e9fe --- /dev/null +++ b/cfgsync/adapter/src/lib.rs @@ -0,0 +1,12 @@ +mod artifacts; +mod materializer; +mod registrations; +mod sources; + +pub use artifacts::MaterializedArtifacts; +pub use materializer::{ + CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifactsSink, + PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, +}; +pub use registrations::RegistrationSnapshot; +pub use sources::RegistrationConfigSource; diff --git a/cfgsync/adapter/src/materializer.rs b/cfgsync/adapter/src/materializer.rs new file mode 100644 index 0000000..3681e8c --- /dev/null +++ b/cfgsync/adapter/src/materializer.rs @@ -0,0 +1,279 @@ +use std::{error::Error, sync::Mutex}; + +use serde_json::to_string; + +use crate::{MaterializedArtifacts, RegistrationSnapshot}; + +/// Type-erased cfgsync adapter error used to preserve source context. +pub type DynCfgsyncError = Box; + +/// Adapter contract for materializing a whole registration snapshot into +/// cfgsync artifacts. +pub trait RegistrationSnapshotMaterializer: Send + Sync { + /// Materializes the current registration set. + /// + /// Implementations decide: + /// - when the current snapshot is ready to serve + /// - which per-node artifacts should be produced + /// - which shared artifacts should accompany every node + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result; +} + +/// Optional hook for persisting or publishing materialized cfgsync artifacts. +pub trait MaterializedArtifactsSink: Send + Sync { + /// Persists or publishes a ready materialization result. + fn persist(&self, artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError>; +} + +/// Registration-driven materialization status. +#[derive(Debug, Clone, Default)] +pub enum MaterializationResult { + #[default] + NotReady, + Ready(MaterializedArtifacts), +} + +impl MaterializationResult { + /// Creates a ready materialization result. + #[must_use] + pub fn ready(artifacts: MaterializedArtifacts) -> Self { + Self::Ready(artifacts) + } + + /// Returns the ready artifacts when materialization succeeded. + #[must_use] + pub fn artifacts(&self) -> Option<&MaterializedArtifacts> { + match self { + Self::NotReady => None, + Self::Ready(artifacts) => Some(artifacts), + } + } +} + +/// Snapshot materializer wrapper that caches the last materialized result. +pub struct CachedSnapshotMaterializer { + inner: M, + cache: Mutex>, +} + +struct CachedSnapshot { + key: String, + result: MaterializationResult, +} + +impl CachedSnapshotMaterializer { + /// Wraps a snapshot materializer with deterministic snapshot-result + /// caching. + #[must_use] + pub fn new(inner: M) -> Self { + Self { + inner, + cache: Mutex::new(None), + } + } + + fn snapshot_key(registrations: &RegistrationSnapshot) -> Result { + Ok(to_string(registrations)?) + } +} + +impl RegistrationSnapshotMaterializer for CachedSnapshotMaterializer +where + M: RegistrationSnapshotMaterializer, +{ + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + let key = Self::snapshot_key(registrations)?; + + { + let cache = self + .cache + .lock() + .expect("cfgsync snapshot cache should not be poisoned"); + + if let Some(cached) = &*cache + && cached.key == key + { + return Ok(cached.result.clone()); + } + } + + let result = self.inner.materialize_snapshot(registrations)?; + let mut cache = self + .cache + .lock() + .expect("cfgsync snapshot cache should not be poisoned"); + + *cache = Some(CachedSnapshot { + key, + result: result.clone(), + }); + + Ok(result) + } +} + +/// Snapshot materializer wrapper that persists ready results through a generic +/// sink. It only persists once per distinct registration snapshot. +pub struct PersistingSnapshotMaterializer { + inner: M, + sink: S, + persisted_key: Mutex>, +} + +impl PersistingSnapshotMaterializer { + /// Wraps a snapshot materializer with one-time persistence for each + /// distinct registration snapshot. + #[must_use] + pub fn new(inner: M, sink: S) -> Self { + Self { + inner, + sink, + persisted_key: Mutex::new(None), + } + } +} + +impl RegistrationSnapshotMaterializer for PersistingSnapshotMaterializer +where + M: RegistrationSnapshotMaterializer, + S: MaterializedArtifactsSink, +{ + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + let key = CachedSnapshotMaterializer::::snapshot_key(registrations)?; + let result = self.inner.materialize_snapshot(registrations)?; + + let Some(artifacts) = result.artifacts() else { + return Ok(result); + }; + + { + let persisted_key = self + .persisted_key + .lock() + .expect("cfgsync persistence state should not be poisoned"); + + if persisted_key.as_deref() == Some(&key) { + return Ok(result); + } + } + + self.sink.persist(artifacts)?; + + let mut persisted_key = self + .persisted_key + .lock() + .expect("cfgsync persistence state should not be poisoned"); + *persisted_key = Some(key); + + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; + + use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; + + use super::{ + CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts, + MaterializedArtifactsSink, PersistingSnapshotMaterializer, + RegistrationSnapshotMaterializer, + }; + use crate::RegistrationSnapshot; + + struct CountingMaterializer; + + impl RegistrationSnapshotMaterializer for CountingMaterializer { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + if registrations.is_empty() { + return Ok(MaterializationResult::NotReady); + } + + let nodes = registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + "ready: true".to_string(), + )]), + ) + }); + + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(vec![ + ArtifactFile::new("/shared.yaml".to_string(), "cluster: ready".to_string()), + ])), + )) + } + } + + struct CountingSink { + writes: Arc, + } + + impl MaterializedArtifactsSink for CountingSink { + fn persist(&self, _artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError> { + self.writes.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + } + + #[test] + fn cached_snapshot_materializer_reuses_previous_result() { + let materializer = CachedSnapshotMaterializer::new(CountingMaterializer); + let snapshot = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new( + "node-1".to_string(), + "127.0.0.1".parse().expect("parse ip"), + )]); + + let first = materializer + .materialize_snapshot(&snapshot) + .expect("first materialization"); + let second = materializer + .materialize_snapshot(&snapshot) + .expect("second materialization"); + + assert!(matches!(first, MaterializationResult::Ready(_))); + assert!(matches!(second, MaterializationResult::Ready(_))); + } + + #[test] + fn persisting_snapshot_materializer_writes_ready_snapshots_once() { + let writes = Arc::new(AtomicUsize::new(0)); + let materializer = PersistingSnapshotMaterializer::new( + CountingMaterializer, + CountingSink { + writes: writes.clone(), + }, + ); + let snapshot = RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new( + "node-1".to_string(), + "127.0.0.1".parse().expect("parse ip"), + )]); + + materializer + .materialize_snapshot(&snapshot) + .expect("first materialization"); + materializer + .materialize_snapshot(&snapshot) + .expect("second materialization"); + + assert_eq!(writes.load(Ordering::SeqCst), 1); + } +} diff --git a/cfgsync/adapter/src/registrations.rs b/cfgsync/adapter/src/registrations.rs new file mode 100644 index 0000000..4970a8b --- /dev/null +++ b/cfgsync/adapter/src/registrations.rs @@ -0,0 +1,44 @@ +use cfgsync_core::NodeRegistration; +use serde::Serialize; + +/// Immutable view of registrations currently known to cfgsync. +#[derive(Debug, Clone, Default, Serialize)] +pub struct RegistrationSnapshot { + registrations: Vec, +} + +impl RegistrationSnapshot { + /// Creates a stable registration snapshot sorted by node identifier. + #[must_use] + pub fn new(mut registrations: Vec) -> Self { + registrations.sort_by(|left, right| left.identifier.cmp(&right.identifier)); + + Self { registrations } + } + + /// Returns the number of registrations in the snapshot. + #[must_use] + pub fn len(&self) -> usize { + self.registrations.len() + } + + /// Returns `true` when the snapshot contains no registrations. + #[must_use] + pub fn is_empty(&self) -> bool { + self.registrations.is_empty() + } + + /// Iterates registrations in deterministic identifier order. + #[must_use] + pub fn iter(&self) -> impl Iterator { + self.registrations.iter() + } + + /// Looks up a registration by its stable node identifier. + #[must_use] + pub fn get(&self, identifier: &str) -> Option<&NodeRegistration> { + self.registrations + .iter() + .find(|registration| registration.identifier == identifier) + } +} diff --git a/cfgsync/adapter/src/sources.rs b/cfgsync/adapter/src/sources.rs new file mode 100644 index 0000000..8e61acf --- /dev/null +++ b/cfgsync/adapter/src/sources.rs @@ -0,0 +1,260 @@ +use std::{collections::HashMap, sync::Mutex}; + +use cfgsync_core::{ + CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactsPayload, NodeConfigSource, + NodeRegistration, RegisterNodeResponse, +}; + +use crate::{ + DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot, + RegistrationSnapshotMaterializer, +}; + +impl RegistrationSnapshotMaterializer for MaterializedArtifacts { + fn materialize_snapshot( + &self, + _registrations: &RegistrationSnapshot, + ) -> Result { + Ok(MaterializationResult::ready(self.clone())) + } +} + +/// Registration-aware source backed by a snapshot materializer. +pub struct RegistrationConfigSource { + materializer: M, + registrations: Mutex>, +} + +impl RegistrationConfigSource { + #[must_use] + pub fn new(materializer: M) -> Self { + Self { + materializer, + registrations: Mutex::new(HashMap::new()), + } + } + + fn registration_for(&self, identifier: &str) -> Option { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + registrations.get(identifier).cloned() + } + + fn registration_snapshot(&self) -> RegistrationSnapshot { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + RegistrationSnapshot::new(registrations.values().cloned().collect()) + } +} + +impl NodeConfigSource for RegistrationConfigSource +where + M: RegistrationSnapshotMaterializer, +{ + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { + let mut registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + registrations.insert(registration.identifier.clone(), registration); + + RegisterNodeResponse::Registered + } + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { + let registration = match self.registration_for(®istration.identifier) { + Some(registration) => registration, + None => { + return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + }; + + let registrations = self.registration_snapshot(); + let materialized = match self.materializer.materialize_snapshot(®istrations) { + Ok(MaterializationResult::Ready(materialized)) => materialized, + Ok(MaterializationResult::NotReady) => { + return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + Err(error) => { + return ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!( + "failed to materialize config snapshot: {error}" + ))); + } + }; + + match materialized.resolve(®istration.identifier) { + Some(config) => { + ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(config.files)) + } + None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )), + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + + use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; + use cfgsync_core::{ + CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration, + }; + + use super::RegistrationConfigSource; + use crate::{ + CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts, + RegistrationSnapshot, RegistrationSnapshotMaterializer, + }; + + #[test] + fn registration_source_resolves_identifier() { + let artifacts = MaterializedArtifacts::from_nodes([( + "node-1".to_owned(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + "a: 1".to_string(), + )]), + )]); + let source = RegistrationConfigSource::new(artifacts); + + let registration = + NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().expect("parse ip")); + let _ = source.register(registration.clone()); + + match source.resolve(®istration) { + ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1), + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } + + #[test] + fn registration_source_reports_not_ready_before_registration() { + let artifacts = MaterializedArtifacts::from_nodes([( + "node-1".to_owned(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + "a: 1".to_string(), + )]), + )]); + let source = RegistrationConfigSource::new(artifacts); + + let registration = + NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().expect("parse ip")); + + match source.resolve(®istration) { + ConfigResolveResponse::Config(_) => panic!("expected not-ready"), + ConfigResolveResponse::Error(error) => { + assert!(matches!(error.code, CfgsyncErrorCode::NotReady)); + } + } + } + + struct ThresholdSnapshotMaterializer; + + impl RegistrationSnapshotMaterializer for ThresholdSnapshotMaterializer { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + if registrations.len() < 2 { + return Ok(MaterializationResult::NotReady); + } + + let nodes = registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + format!("id: {}", registration.identifier), + )]), + ) + }); + + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(vec![ + ArtifactFile::new("/shared.yaml".to_string(), "cluster: ready".to_string()), + ])), + )) + } + } + + #[test] + fn registration_source_materializes_from_registration_snapshot() { + let source = RegistrationConfigSource::new(ThresholdSnapshotMaterializer); + let node_1 = + NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().expect("parse ip")); + let node_2 = + NodeRegistration::new("node-2".to_string(), "127.0.0.2".parse().expect("parse ip")); + + let _ = source.register(node_1.clone()); + match source.resolve(&node_1) { + ConfigResolveResponse::Config(_) => panic!("expected not-ready before threshold"), + ConfigResolveResponse::Error(error) => { + assert!(matches!(error.code, CfgsyncErrorCode::NotReady)); + } + } + + let _ = source.register(node_2.clone()); + + match source.resolve(&node_1) { + ConfigResolveResponse::Config(payload) => { + assert_eq!(payload.files.len(), 2); + assert_eq!(payload.files[0].path, "/config.yaml"); + assert_eq!(payload.files[1].path, "/shared.yaml"); + } + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } + + struct CountingSnapshotMaterializer { + calls: AtomicUsize, + } + + impl RegistrationSnapshotMaterializer for CountingSnapshotMaterializer { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + self.calls.fetch_add(1, Ordering::SeqCst); + + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + format!("id: {}", registration.identifier), + )]), + ) + })), + )) + } + } + + #[test] + fn cached_snapshot_materializer_reuses_previous_result() { + let source = RegistrationConfigSource::new(CachedSnapshotMaterializer::new( + CountingSnapshotMaterializer { + calls: AtomicUsize::new(0), + }, + )); + let registration = + NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().expect("parse ip")); + + let _ = source.register(registration.clone()); + let _ = source.resolve(®istration); + let _ = source.resolve(®istration); + } +} diff --git a/cfgsync/artifacts/Cargo.toml b/cfgsync/artifacts/Cargo.toml new file mode 100644 index 0000000..7c2db1e --- /dev/null +++ b/cfgsync/artifacts/Cargo.toml @@ -0,0 +1,17 @@ +[package] +categories = { workspace = true } +description = "App-agnostic cfgsync artifact model" +edition = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +name = "cfgsync-artifacts" +readme = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +serde = { workspace = true } +thiserror = { workspace = true } diff --git a/cfgsync/artifacts/src/lib.rs b/cfgsync/artifacts/src/lib.rs new file mode 100644 index 0000000..ef04ce2 --- /dev/null +++ b/cfgsync/artifacts/src/lib.rs @@ -0,0 +1,61 @@ +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +/// Single file artifact delivered to a node. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ArtifactFile { + /// Destination path where content should be written. + pub path: String, + /// Raw file contents. + pub content: String, +} + +impl ArtifactFile { + #[must_use] + pub fn new(path: String, content: String) -> Self { + Self { path, content } + } +} + +/// Collection of files delivered together for one node. +#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)] +pub struct ArtifactSet { + pub files: Vec, +} + +impl ArtifactSet { + #[must_use] + pub fn new(files: Vec) -> Self { + Self { files } + } + + #[must_use] + pub fn len(&self) -> usize { + self.files.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } + + /// Validates that no two files target the same output path. + pub fn ensure_unique_paths(&self) -> Result<(), ArtifactValidationError> { + let mut seen = std::collections::HashSet::new(); + + for file in &self.files { + if !seen.insert(file.path.clone()) { + return Err(ArtifactValidationError::DuplicatePath(file.path.clone())); + } + } + + Ok(()) + } +} + +/// Validation failures for [`ArtifactSet`]. +#[derive(Debug, Error)] +pub enum ArtifactValidationError { + #[error("duplicate artifact path `{0}`")] + DuplicatePath(String), +} diff --git a/cfgsync/core/Cargo.toml b/cfgsync/core/Cargo.toml new file mode 100644 index 0000000..fea2c39 --- /dev/null +++ b/cfgsync/core/Cargo.toml @@ -0,0 +1,27 @@ +[package] +categories = { workspace = true } +description = { workspace = true } +edition = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +name = "cfgsync-core" +readme = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +anyhow = "1" +axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" } +cfgsync-artifacts = { workspace = true } +reqwest = { features = ["json"], workspace = true } +serde = { default-features = false, features = ["derive"], version = "1" } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +thiserror = { workspace = true } +tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/cfgsync/core/src/bundle.rs b/cfgsync/core/src/bundle.rs new file mode 100644 index 0000000..3fc5675 --- /dev/null +++ b/cfgsync/core/src/bundle.rs @@ -0,0 +1,44 @@ +use serde::{Deserialize, Serialize}; + +use crate::NodeArtifactFile; + +/// Static cfgsync artifact bundle. +/// +/// This is the bundle-oriented source format used when all artifacts are known +/// ahead of time and no registration-time materialization is required. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeArtifactsBundle { + /// Per-node artifact entries keyed by identifier. + pub nodes: Vec, + /// Files that should be served alongside every node-specific entry. + #[serde(default)] + pub shared_files: Vec, +} + +impl NodeArtifactsBundle { + /// Creates a bundle with node-specific entries only. + #[must_use] + pub fn new(nodes: Vec) -> Self { + Self { + nodes, + shared_files: Vec::new(), + } + } + + /// Attaches files that should be served alongside every node entry. + #[must_use] + pub fn with_shared_files(mut self, shared_files: Vec) -> Self { + self.shared_files = shared_files; + self + } +} + +/// One node entry inside a static cfgsync bundle. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeArtifactsBundleEntry { + /// Stable node identifier used by cfgsync lookup. + pub identifier: String, + /// Files that should be materialized for the node. + #[serde(default)] + pub files: Vec, +} diff --git a/cfgsync/core/src/client.rs b/cfgsync/core/src/client.rs new file mode 100644 index 0000000..52dd22b --- /dev/null +++ b/cfgsync/core/src/client.rs @@ -0,0 +1,157 @@ +use serde::Serialize; +use thiserror::Error; + +use crate::{CfgsyncErrorCode, CfgsyncErrorResponse, NodeArtifactsPayload, NodeRegistration}; + +/// cfgsync client-side request/response failures. +#[derive(Debug, Error)] +pub enum ClientError { + #[error("request failed: {0}")] + Request(#[from] reqwest::Error), + #[error("cfgsync server error {status}: {message}")] + Status { + status: reqwest::StatusCode, + message: String, + error: Option, + }, + #[error("failed to parse cfgsync response: {0}")] + Decode(serde_json::Error), +} + +/// Result of probing cfgsync for a node's current artifact availability. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConfigFetchStatus { + /// The node payload is ready and can be fetched successfully. + Ready, + /// The node has registered but artifacts are not ready yet. + NotReady, + /// The server does not know how to materialize artifacts for this node. + Missing, +} + +/// Reusable HTTP client for cfgsync server endpoints. +#[derive(Clone, Debug)] +pub struct Client { + base_url: String, + http: reqwest::Client, +} + +impl Client { + /// Creates a cfgsync client pointed at the given server base URL. + #[must_use] + pub fn new(base_url: String) -> Self { + let mut base_url = base_url; + while base_url.ends_with('/') { + base_url.pop(); + } + Self { + base_url, + http: reqwest::Client::new(), + } + } + + /// Returns the normalized cfgsync server base URL used for requests. + #[must_use] + pub fn base_url(&self) -> &str { + &self.base_url + } + + /// Registers a node before requesting config. + pub async fn register_node(&self, payload: &NodeRegistration) -> Result<(), ClientError> { + self.post_status_only("/register", payload).await + } + + /// Fetches `/node` payload for a node identifier. + pub async fn fetch_node_config( + &self, + payload: &NodeRegistration, + ) -> Result { + self.post_json("/node", payload).await + } + + /// Probes whether artifacts for a node are ready, missing, or still + /// pending. + pub async fn fetch_node_config_status( + &self, + payload: &NodeRegistration, + ) -> Result { + match self.fetch_node_config(payload).await { + Ok(_) => Ok(ConfigFetchStatus::Ready), + Err(ClientError::Status { + status, + error: Some(error), + .. + }) => match error.code { + CfgsyncErrorCode::NotReady => Ok(ConfigFetchStatus::NotReady), + CfgsyncErrorCode::MissingConfig => Ok(ConfigFetchStatus::Missing), + CfgsyncErrorCode::Internal => Err(ClientError::Status { + status, + message: error.message.clone(), + error: Some(error), + }), + }, + Err(error) => Err(error), + } + } + + /// Posts JSON payload to a cfgsync endpoint and decodes cfgsync payload. + pub async fn post_json( + &self, + path: &str, + payload: &P, + ) -> Result { + let url = self.endpoint_url(path); + let response = self.http.post(url).json(payload).send().await?; + + let status = response.status(); + let body = response.text().await?; + if !status.is_success() { + let error = serde_json::from_str::(&body).ok(); + let message = error + .as_ref() + .map(|err| err.message.clone()) + .unwrap_or_else(|| body.clone()); + return Err(ClientError::Status { + status, + message, + error, + }); + } + + serde_json::from_str(&body).map_err(ClientError::Decode) + } + + async fn post_status_only( + &self, + path: &str, + payload: &P, + ) -> Result<(), ClientError> { + let url = self.endpoint_url(path); + let response = self.http.post(url).json(payload).send().await?; + + let status = response.status(); + let body = response.text().await?; + if !status.is_success() { + let error = serde_json::from_str::(&body).ok(); + let message = error + .as_ref() + .map(|err| err.message.clone()) + .unwrap_or_else(|| body.clone()); + return Err(ClientError::Status { + status, + message, + error, + }); + } + + Ok(()) + } + + fn endpoint_url(&self, path: &str) -> String { + if path.starts_with('/') { + format!("{}{}", self.base_url, path) + } else { + format!("{}/{}", self.base_url, path) + } + } +} diff --git a/cfgsync/core/src/compat.rs b/cfgsync/core/src/compat.rs new file mode 100644 index 0000000..d495592 --- /dev/null +++ b/cfgsync/core/src/compat.rs @@ -0,0 +1,20 @@ +#![doc(hidden)] + +pub use crate::{ + bundle::{NodeArtifactsBundle as CfgSyncBundle, NodeArtifactsBundleEntry as CfgSyncBundleNode}, + client::Client as CfgSyncClient, + protocol::{ + CfgsyncErrorCode as CfgSyncErrorCode, CfgsyncErrorResponse as CfgSyncErrorResponse, + ConfigResolveResponse as RepoResponse, NodeArtifactFile as CfgSyncFile, + NodeArtifactsPayload as CfgSyncPayload, RegisterNodeResponse as RegistrationResponse, + }, + server::{ + CfgsyncServerState as CfgSyncState, build_legacy_cfgsync_router as cfgsync_app, + serve_cfgsync as run_cfgsync, + }, + source::{ + BundleConfigSource as FileConfigProvider, + BundleConfigSourceError as FileConfigProviderError, NodeConfigSource as ConfigProvider, + StaticConfigSource as ConfigRepo, + }, +}; diff --git a/cfgsync/core/src/lib.rs b/cfgsync/core/src/lib.rs new file mode 100644 index 0000000..e855acb --- /dev/null +++ b/cfgsync/core/src/lib.rs @@ -0,0 +1,26 @@ +pub mod bundle; +pub mod client; +#[doc(hidden)] +pub mod compat; +pub mod protocol; +pub mod render; +pub mod server; +pub mod source; + +pub use bundle::{NodeArtifactsBundle, NodeArtifactsBundleEntry}; +pub use client::{Client, ClientError, ConfigFetchStatus}; +pub use protocol::{ + CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse, + NodeArtifactFile, NodeArtifactsPayload, NodeRegistration, RegisterNodeResponse, + RegistrationPayload, +}; +pub use render::{ + CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides, + apply_timeout_floor, ensure_artifacts_path, load_cfgsync_template_yaml, + render_cfgsync_yaml_from_template, write_rendered_cfgsync, +}; +pub use server::{CfgsyncServerState, RunCfgsyncError, build_cfgsync_router, serve_cfgsync}; +pub use source::{ + BundleConfigSource, BundleConfigSourceError, BundleLoadError, NodeConfigSource, + StaticConfigSource, bundle_to_payload_map, load_bundle, +}; diff --git a/cfgsync/core/src/protocol.rs b/cfgsync/core/src/protocol.rs new file mode 100644 index 0000000..2b81ca4 --- /dev/null +++ b/cfgsync/core/src/protocol.rs @@ -0,0 +1,286 @@ +use std::net::Ipv4Addr; + +use cfgsync_artifacts::ArtifactFile; +use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned}; +use serde_json::Value; +use thiserror::Error; + +/// Schema version served by cfgsync payload responses. +pub const CFGSYNC_SCHEMA_VERSION: u16 = 1; + +/// Canonical cfgsync file type used in payloads and bundles. +pub type NodeArtifactFile = ArtifactFile; + +/// Payload returned by cfgsync server for one node. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodeArtifactsPayload { + /// Payload schema version for compatibility checks. + pub schema_version: u16, + /// Files that must be written on the target node. + #[serde(default)] + pub files: Vec, +} + +/// Adapter-owned registration payload stored alongside a generic node identity. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct RegistrationPayload { + raw_json: Option, +} + +impl RegistrationPayload { + /// Creates an empty adapter-owned payload. + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Returns `true` when no adapter-owned payload was attached. + #[must_use] + pub fn is_empty(&self) -> bool { + self.raw_json.is_none() + } + + /// Stores one typed adapter payload as opaque JSON. + pub fn from_serializable(value: &T) -> Result + where + T: Serialize, + { + Ok(Self { + raw_json: Some(serde_json::to_string(value)?), + }) + } + + /// Stores a raw JSON payload after validating that it parses. + pub fn from_json_str(raw_json: &str) -> Result { + let value: Value = serde_json::from_str(raw_json)?; + + Ok(Self { + raw_json: Some(serde_json::to_string(&value)?), + }) + } + + /// Deserializes the adapter-owned payload into the requested type. + pub fn deserialize(&self) -> Result, serde_json::Error> + where + T: DeserializeOwned, + { + self.raw_json + .as_ref() + .map(|raw_json| serde_json::from_str(raw_json)) + .transpose() + } + + /// Returns the validated JSON representation stored in this payload. + #[must_use] + pub fn raw_json(&self) -> Option<&str> { + self.raw_json.as_deref() + } +} + +impl Serialize for RegistrationPayload { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self.raw_json.as_deref() { + Some(raw_json) => { + let value: Value = + serde_json::from_str(raw_json).map_err(serde::ser::Error::custom)?; + value.serialize(serializer) + } + None => serializer.serialize_none(), + } + } +} + +impl<'de> Deserialize<'de> for RegistrationPayload { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Option::::deserialize(deserializer)?; + let raw_json = value + .map(|value| serde_json::to_string(&value).map_err(serde::de::Error::custom)) + .transpose()?; + + Ok(Self { raw_json }) + } +} + +/// Node metadata recorded before config materialization. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct NodeRegistration { + /// Stable node identifier used for registration and artifact lookup. + pub identifier: String, + /// IPv4 address advertised as part of registration. + pub ip: Ipv4Addr, + /// Adapter-owned payload interpreted only by the app materializer. + #[serde(default, skip_serializing_if = "RegistrationPayload::is_empty")] + pub metadata: RegistrationPayload, +} + +impl NodeRegistration { + /// Creates a registration with the generic node identity fields only. + #[must_use] + pub fn new(identifier: String, ip: Ipv4Addr) -> Self { + Self { + identifier, + ip, + metadata: RegistrationPayload::default(), + } + } + + /// Attaches one typed adapter-owned payload to this registration. + pub fn with_metadata(mut self, metadata: &T) -> Result + where + T: Serialize, + { + self.metadata = RegistrationPayload::from_serializable(metadata)?; + Ok(self) + } + + /// Attaches a prebuilt registration payload to this registration. + #[must_use] + pub fn with_payload(mut self, payload: RegistrationPayload) -> Self { + self.metadata = payload; + self + } +} + +impl NodeArtifactsPayload { + /// Creates a payload from the files that should be written for one node. + #[must_use] + pub fn from_files(files: Vec) -> Self { + Self { + schema_version: CFGSYNC_SCHEMA_VERSION, + files, + } + } + + /// Returns the files carried by this payload. + #[must_use] + pub fn files(&self) -> &[NodeArtifactFile] { + &self.files + } + + /// Returns `true` when the payload carries no files. + #[must_use] + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CfgsyncErrorCode { + /// No artifact payload is available for the requested node. + MissingConfig, + /// The node is registered but artifacts are not ready yet. + NotReady, + /// An unexpected server-side failure occurred. + Internal, +} + +/// Structured error body returned by cfgsync server. +#[derive(Debug, Clone, Serialize, Deserialize, Error)] +#[error("{code:?}: {message}")] +pub struct CfgsyncErrorResponse { + /// Machine-readable failure category. + pub code: CfgsyncErrorCode, + /// Human-readable error details. + pub message: String, +} + +impl CfgsyncErrorResponse { + /// Builds a missing-config error for one identifier. + #[must_use] + pub fn missing_config(identifier: &str) -> Self { + Self { + code: CfgsyncErrorCode::MissingConfig, + message: format!("missing config for host {identifier}"), + } + } + + /// Builds a not-ready error for one identifier. + #[must_use] + pub fn not_ready(identifier: &str) -> Self { + Self { + code: CfgsyncErrorCode::NotReady, + message: format!("config for host {identifier} is not ready"), + } + } + + /// Builds an internal cfgsync error. + #[must_use] + pub fn internal(message: String) -> Self { + Self { + code: CfgsyncErrorCode::Internal, + message, + } + } +} + +/// Resolution outcome for a requested node identifier. +pub enum ConfigResolveResponse { + /// Artifacts are ready for the requested node. + Config(NodeArtifactsPayload), + /// Artifacts could not be resolved for the requested node. + Error(CfgsyncErrorResponse), +} + +/// Outcome for a node registration request. +pub enum RegisterNodeResponse { + /// Registration was accepted. + Registered, + /// Registration failed. + Error(CfgsyncErrorResponse), +} + +#[cfg(test)] +mod tests { + use serde::{Deserialize, Serialize}; + use serde_json::Value; + + use super::{NodeRegistration, RegistrationPayload}; + + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + struct ExampleRegistration { + network_port: u16, + service: String, + } + + #[test] + fn registration_payload_round_trips_typed_value() { + let registration = + NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().expect("parse ip")) + .with_metadata(&ExampleRegistration { + network_port: 3000, + service: "blend".to_owned(), + }) + .expect("serialize registration metadata"); + + let encoded = serde_json::to_value(®istration).expect("serialize registration"); + let metadata = encoded.get("metadata").expect("registration metadata"); + assert_eq!(metadata.get("network_port"), Some(&Value::from(3000u16))); + assert_eq!(metadata.get("service"), Some(&Value::from("blend"))); + + let decoded: NodeRegistration = + serde_json::from_value(encoded).expect("deserialize registration"); + let typed: ExampleRegistration = decoded + .metadata + .deserialize() + .expect("deserialize metadata") + .expect("registration metadata value"); + + assert_eq!(typed.network_port, 3000); + assert_eq!(typed.service, "blend"); + } + + #[test] + fn registration_payload_accepts_raw_json() { + let payload = + RegistrationPayload::from_json_str(r#"{"network_port":3000}"#).expect("parse raw json"); + + assert_eq!(payload.raw_json(), Some(r#"{"network_port":3000}"#)); + } +} diff --git a/testing-framework/tools/cfgsync-runtime/src/render.rs b/cfgsync/core/src/render.rs similarity index 60% rename from testing-framework/tools/cfgsync-runtime/src/render.rs rename to cfgsync/core/src/render.rs index 0f59b5c..51af474 100644 --- a/testing-framework/tools/cfgsync-runtime/src/render.rs +++ b/cfgsync/core/src/render.rs @@ -2,63 +2,88 @@ use std::{fs, path::Path}; use anyhow::{Context as _, Result}; use serde_yaml::{Mapping, Value}; +use thiserror::Error; +/// Rendered cfgsync outputs written for server startup. #[derive(Debug, Clone)] pub struct RenderedCfgsync { + /// Serialized cfgsync server config YAML. pub config_yaml: String, - pub bundle_yaml: String, + /// Serialized precomputed artifact YAML used by cfgsync runtime. + pub artifacts_yaml: String, } +/// Output paths used when materializing rendered cfgsync files. #[derive(Debug, Clone, Copy)] pub struct CfgsyncOutputPaths<'a> { + /// Output path for the rendered server config YAML. pub config_path: &'a Path, - pub bundle_path: &'a Path, + /// Output path for the rendered precomputed artifacts YAML. + pub artifacts_path: &'a Path, } -pub fn ensure_bundle_path(bundle_path: &mut Option, output_bundle_path: &Path) { - if bundle_path.is_some() { +/// Ensures artifacts path override exists, defaulting to the output artifacts +/// file name. +pub fn ensure_artifacts_path(artifacts_path: &mut Option, output_artifacts_path: &Path) { + if artifacts_path.is_some() { return; } - *bundle_path = Some( - output_bundle_path + *artifacts_path = Some( + output_artifacts_path .file_name() .and_then(|name| name.to_str()) - .unwrap_or("cfgsync.bundle.yaml") + .unwrap_or("cfgsync.artifacts.yaml") .to_string(), ); } +/// Applies a minimum timeout floor to an existing timeout value. pub fn apply_timeout_floor(timeout: &mut u64, min_timeout_secs: Option) { if let Some(min_timeout_secs) = min_timeout_secs { *timeout = (*timeout).max(min_timeout_secs); } } +/// Writes rendered cfgsync server and bundle YAML files. pub fn write_rendered_cfgsync( rendered: &RenderedCfgsync, output: CfgsyncOutputPaths<'_>, ) -> Result<()> { fs::write(output.config_path, &rendered.config_yaml)?; - fs::write(output.bundle_path, &rendered.bundle_yaml)?; + fs::write(output.artifacts_path, &rendered.artifacts_yaml)?; Ok(()) } +/// Optional overrides applied to a cfgsync template document. #[derive(Debug, Clone, Default)] pub struct CfgsyncConfigOverrides { + /// Override for the HTTP listen port. pub port: Option, + /// Override for the expected initial host count. pub n_hosts: Option, + /// Minimum timeout to enforce on the rendered template. pub timeout_floor_secs: Option, - pub bundle_path: Option, + /// Override for the precomputed artifacts path written into cfgsync config. + pub artifacts_path: Option, + /// Optional OTLP metrics endpoint injected into tracing settings. pub metrics_otlp_ingest_url: Option, } +#[derive(Debug, Error)] +enum RenderTemplateError { + #[error("cfgsync template key `{key}` must be a YAML map")] + NonMappingEntry { key: String }, +} + +/// Loads cfgsync template YAML from disk. pub fn load_cfgsync_template_yaml(path: &Path) -> Result { let file = fs::File::open(path) .with_context(|| format!("opening cfgsync template at {}", path.display()))?; serde_yaml::from_reader(file).context("parsing cfgsync template") } +/// Renders cfgsync config YAML by applying overrides to a template document. pub fn render_cfgsync_yaml_from_template( mut template: Value, overrides: &CfgsyncConfigOverrides, @@ -67,6 +92,7 @@ pub fn render_cfgsync_yaml_from_template( serde_yaml::to_string(&template).context("serializing rendered cfgsync config") } +/// Applies cfgsync-specific override fields to a mutable YAML document. pub fn apply_cfgsync_overrides( template: &mut Value, overrides: &CfgsyncConfigOverrides, @@ -87,10 +113,10 @@ pub fn apply_cfgsync_overrides( ); } - if let Some(bundle_path) = &overrides.bundle_path { + if let Some(artifacts_path) = &overrides.artifacts_path { root.insert( - Value::String("bundle_path".to_string()), - Value::String(bundle_path.clone()), + Value::String("artifacts_path".to_string()), + Value::String(artifacts_path.clone()), ); } @@ -105,7 +131,7 @@ pub fn apply_cfgsync_overrides( } if let Some(endpoint) = &overrides.metrics_otlp_ingest_url { - let tracing_settings = nested_mapping_mut(root, "tracing_settings"); + let tracing_settings = nested_mapping_mut(root, "tracing_settings")?; tracing_settings.insert( Value::String("metrics".to_string()), parse_otlp_metrics_layer(endpoint)?, @@ -121,19 +147,20 @@ fn mapping_mut(value: &mut Value) -> Result<&mut Mapping> { .context("cfgsync template root must be a YAML map") } -fn nested_mapping_mut<'a>(mapping: &'a mut Mapping, key: &str) -> &'a mut Mapping { - let key = Value::String(key.to_string()); +fn nested_mapping_mut<'a>(mapping: &'a mut Mapping, key: &str) -> Result<&'a mut Mapping> { + let key_name = key.to_owned(); + let key = Value::String(key_name.clone()); let entry = mapping .entry(key) .or_insert_with(|| Value::Mapping(Mapping::new())); if !entry.is_mapping() { - *entry = Value::Mapping(Mapping::new()); + return Err(RenderTemplateError::NonMappingEntry { key: key_name }).map_err(Into::into); } entry .as_mapping_mut() - .expect("mapping entry should always be a mapping") + .context("cfgsync template entry should be a YAML map") } fn parse_otlp_metrics_layer(endpoint: &str) -> Result { diff --git a/cfgsync/core/src/server.rs b/cfgsync/core/src/server.rs new file mode 100644 index 0000000..1c56f54 --- /dev/null +++ b/cfgsync/core/src/server.rs @@ -0,0 +1,291 @@ +use std::{io, sync::Arc}; + +use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post}; +use thiserror::Error; + +use crate::{ + CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration, + RegisterNodeResponse, +}; + +/// Runtime state shared across cfgsync HTTP handlers. +pub struct CfgsyncServerState { + repo: Arc, +} + +impl CfgsyncServerState { + /// Wraps a node config source for use by cfgsync HTTP handlers. + #[must_use] + pub fn new(repo: Arc) -> Self { + Self { repo } + } +} + +/// Fatal runtime failures when serving cfgsync HTTP endpoints. +#[derive(Debug, Error)] +pub enum RunCfgsyncError { + #[error("failed to bind cfgsync server on {bind_addr}: {source}")] + Bind { + bind_addr: String, + #[source] + source: io::Error, + }, + #[error("cfgsync server terminated unexpectedly: {source}")] + Serve { + #[source] + source: io::Error, + }, +} + +async fn node_config( + State(state): State>, + Json(payload): Json, +) -> impl IntoResponse { + let response = resolve_node_config_response(&state, &payload); + + match response { + ConfigResolveResponse::Config(payload_data) => { + (StatusCode::OK, Json(payload_data)).into_response() + } + ConfigResolveResponse::Error(error) => { + let status = error_status(&error.code); + + (status, Json(error)).into_response() + } + } +} + +async fn register_node( + State(state): State>, + Json(payload): Json, +) -> impl IntoResponse { + match state.repo.register(payload) { + RegisterNodeResponse::Registered => StatusCode::ACCEPTED.into_response(), + RegisterNodeResponse::Error(error) => { + let status = error_status(&error.code); + + (status, Json(error)).into_response() + } + } +} + +fn resolve_node_config_response( + state: &CfgsyncServerState, + registration: &NodeRegistration, +) -> ConfigResolveResponse { + state.repo.resolve(registration) +} + +fn error_status(code: &CfgsyncErrorCode) -> StatusCode { + match code { + CfgsyncErrorCode::MissingConfig => StatusCode::NOT_FOUND, + CfgsyncErrorCode::NotReady => StatusCode::TOO_EARLY, + CfgsyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR, + } +} + +/// Builds the primary cfgsync router with registration and node artifact +/// routes. +pub fn build_cfgsync_router(state: CfgsyncServerState) -> Router { + Router::new() + .route("/register", post(register_node)) + .route("/node", post(node_config)) + .with_state(Arc::new(state)) +} + +#[doc(hidden)] +/// Builds the legacy cfgsync router that still serves `/init-with-node`. +pub fn build_legacy_cfgsync_router(state: CfgsyncServerState) -> Router { + Router::new() + .route("/register", post(register_node)) + .route("/node", post(node_config)) + .route("/init-with-node", post(node_config)) + .with_state(Arc::new(state)) +} + +/// Runs cfgsync HTTP server on the provided port until shutdown/error. +pub async fn serve_cfgsync(port: u16, state: CfgsyncServerState) -> Result<(), RunCfgsyncError> { + let app = build_cfgsync_router(state); + println!("Server running on http://0.0.0.0:{port}"); + + let bind_addr = format!("0.0.0.0:{port}"); + let listener = tokio::net::TcpListener::bind(&bind_addr) + .await + .map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?; + + axum::serve(listener, app) + .await + .map_err(|source| RunCfgsyncError::Serve { source })?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc}; + + use axum::{Json, extract::State, http::StatusCode, response::IntoResponse}; + + use super::{CfgsyncServerState, NodeRegistration, node_config, register_node}; + use crate::{ + CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse, + NodeArtifactFile, NodeArtifactsPayload, NodeConfigSource, RegisterNodeResponse, + }; + + struct StaticProvider { + data: HashMap, + } + + impl NodeConfigSource for StaticProvider { + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { + if self.data.contains_key(®istration.identifier) { + RegisterNodeResponse::Registered + } else { + RegisterNodeResponse::Error(CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )) + } + } + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { + self.data + .get(®istration.identifier) + .cloned() + .map_or_else( + || { + ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )) + }, + ConfigResolveResponse::Config, + ) + } + } + + struct RegistrationAwareProvider { + data: HashMap, + registrations: std::sync::Mutex>, + } + + impl NodeConfigSource for RegistrationAwareProvider { + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { + if !self.data.contains_key(®istration.identifier) { + return RegisterNodeResponse::Error(CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )); + } + + let mut registrations = self + .registrations + .lock() + .expect("test registration store should not be poisoned"); + registrations.insert(registration.identifier.clone(), registration); + + RegisterNodeResponse::Registered + } + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { + let registrations = self + .registrations + .lock() + .expect("test registration store should not be poisoned"); + + if !registrations.contains_key(®istration.identifier) { + return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready( + ®istration.identifier, + )); + } + + self.data + .get(®istration.identifier) + .cloned() + .map_or_else( + || { + ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )) + }, + ConfigResolveResponse::Config, + ) + } + } + + fn sample_payload() -> NodeArtifactsPayload { + NodeArtifactsPayload { + schema_version: CFGSYNC_SCHEMA_VERSION, + files: vec![NodeArtifactFile::new( + "/app-config.yaml".to_string(), + "app: test".to_string(), + )], + } + } + + #[tokio::test] + async fn node_config_resolves_from_non_tf_provider() { + let mut data = HashMap::new(); + data.insert("node-a".to_owned(), sample_payload()); + + let provider = Arc::new(RegistrationAwareProvider { + data, + registrations: std::sync::Mutex::new(HashMap::new()), + }); + let state = Arc::new(CfgsyncServerState::new(provider)); + let payload = + NodeRegistration::new("node-a".to_string(), "127.0.0.1".parse().expect("valid ip")); + + let _ = register_node(State(state.clone()), Json(payload.clone())) + .await + .into_response(); + + let response = node_config(State(state), Json(payload)) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn node_config_returns_not_found_for_unknown_identifier() { + let provider = Arc::new(StaticProvider { + data: HashMap::new(), + }); + let state = Arc::new(CfgsyncServerState::new(provider)); + let payload = NodeRegistration::new( + "missing-node".to_string(), + "127.0.0.1".parse().expect("valid ip"), + ); + + let response = node_config(State(state), Json(payload)) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[test] + fn missing_config_error_uses_expected_code() { + let error = CfgsyncErrorResponse::missing_config("missing-node"); + + assert!(matches!(error.code, CfgsyncErrorCode::MissingConfig)); + } + + #[tokio::test] + async fn node_config_returns_not_ready_before_registration() { + let mut data = HashMap::new(); + data.insert("node-a".to_owned(), sample_payload()); + + let provider = Arc::new(RegistrationAwareProvider { + data, + registrations: std::sync::Mutex::new(HashMap::new()), + }); + let state = Arc::new(CfgsyncServerState::new(provider)); + let payload = + NodeRegistration::new("node-a".to_string(), "127.0.0.1".parse().expect("valid ip")); + + let response = node_config(State(state), Json(payload)) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::TOO_EARLY); + } +} diff --git a/cfgsync/core/src/source.rs b/cfgsync/core/src/source.rs new file mode 100644 index 0000000..00a04a3 --- /dev/null +++ b/cfgsync/core/src/source.rs @@ -0,0 +1,278 @@ +use std::{collections::HashMap, fs, path::Path, sync::Arc}; + +use thiserror::Error; + +use crate::{ + NodeArtifactsBundle, NodeArtifactsBundleEntry, NodeArtifactsPayload, NodeRegistration, + RegisterNodeResponse, protocol::ConfigResolveResponse, +}; + +/// Source of cfgsync node payloads. +pub trait NodeConfigSource: Send + Sync { + /// Records a node registration before config resolution. + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse; + + /// Resolves the current artifact payload for a previously registered node. + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse; +} + +/// In-memory map-backed source used by cfgsync server state. +pub struct StaticConfigSource { + configs: HashMap, +} + +impl StaticConfigSource { + /// Builds an in-memory source from fully formed payloads. + #[must_use] + pub fn from_payloads(configs: HashMap) -> Arc { + Arc::new(Self { configs }) + } + + /// Builds an in-memory source from a static bundle document. + #[must_use] + pub fn from_bundle(bundle: NodeArtifactsBundle) -> Arc { + Self::from_payloads(bundle_to_payload_map(bundle)) + } +} + +impl NodeConfigSource for StaticConfigSource { + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { + if self.configs.contains_key(®istration.identifier) { + RegisterNodeResponse::Registered + } else { + RegisterNodeResponse::Error(crate::CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )) + } + } + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { + self.configs + .get(®istration.identifier) + .cloned() + .map_or_else( + || { + ConfigResolveResponse::Error(crate::CfgsyncErrorResponse::missing_config( + ®istration.identifier, + )) + }, + ConfigResolveResponse::Config, + ) + } +} + +#[derive(Debug, Error)] +pub enum BundleLoadError { + #[error("reading cfgsync bundle {path}: {source}")] + ReadBundle { + path: String, + #[source] + source: std::io::Error, + }, + #[error("parsing cfgsync bundle {path}: {source}")] + ParseBundle { + path: String, + #[source] + source: serde_yaml::Error, + }, +} + +/// Converts a static bundle into the node payload map used by static sources. +#[must_use] +pub fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap { + let shared_files = bundle.shared_files; + + bundle + .nodes + .into_iter() + .map(|node| { + let NodeArtifactsBundleEntry { identifier, files } = node; + + let mut payload_files = files; + payload_files.extend(shared_files.clone()); + + (identifier, NodeArtifactsPayload::from_files(payload_files)) + }) + .collect() +} + +/// Loads a cfgsync bundle YAML file from disk. +pub fn load_bundle(path: &Path) -> Result { + let path_string = path.display().to_string(); + let raw = fs::read_to_string(path).map_err(|source| BundleLoadError::ReadBundle { + path: path_string.clone(), + source, + })?; + serde_yaml::from_str(&raw).map_err(|source| BundleLoadError::ParseBundle { + path: path_string, + source, + }) +} + +/// Failures when loading a bundle-backed cfgsync source. +#[derive(Debug, Error)] +pub enum BundleConfigSourceError { + #[error("failed to read cfgsync bundle at {path}: {source}")] + Read { + path: String, + #[source] + source: std::io::Error, + }, + #[error("failed to parse cfgsync bundle at {path}: {source}")] + Parse { + path: String, + #[source] + source: serde_yaml::Error, + }, +} + +/// YAML bundle-backed source implementation. +pub struct BundleConfigSource { + inner: StaticConfigSource, +} + +impl BundleConfigSource { + /// Loads source state from a cfgsync bundle YAML file. + pub fn from_yaml_file(path: &Path) -> Result { + let raw = fs::read_to_string(path).map_err(|source| BundleConfigSourceError::Read { + path: path.display().to_string(), + source, + })?; + + let bundle: NodeArtifactsBundle = + serde_yaml::from_str(&raw).map_err(|source| BundleConfigSourceError::Parse { + path: path.display().to_string(), + source, + })?; + + let configs = bundle + .nodes + .into_iter() + .map(payload_from_bundle_node) + .collect(); + + Ok(Self { + inner: StaticConfigSource { configs }, + }) + } +} + +impl NodeConfigSource for BundleConfigSource { + fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse { + self.inner.register(registration) + } + + fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse { + self.inner.resolve(registration) + } +} + +fn payload_from_bundle_node(node: NodeArtifactsBundleEntry) -> (String, NodeArtifactsPayload) { + ( + node.identifier, + NodeArtifactsPayload::from_files(node.files), + ) +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, io::Write as _}; + + use tempfile::NamedTempFile; + + use super::{BundleConfigSource, StaticConfigSource}; + use crate::{ + CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, ConfigResolveResponse, NodeArtifactFile, + NodeArtifactsPayload, NodeConfigSource, NodeRegistration, + }; + + fn sample_payload() -> NodeArtifactsPayload { + NodeArtifactsPayload::from_files(vec![NodeArtifactFile::new( + "/config.yaml".to_string(), + "key: value".to_string(), + )]) + } + + #[test] + fn resolves_existing_identifier() { + let mut configs = HashMap::new(); + configs.insert("node-1".to_owned(), sample_payload()); + let repo = StaticConfigSource { configs }; + + match repo.resolve(&NodeRegistration::new( + "node-1".to_string(), + "127.0.0.1".parse().expect("parse ip"), + )) { + ConfigResolveResponse::Config(payload) => { + assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION); + assert_eq!(payload.files.len(), 1); + assert_eq!(payload.files[0].path, "/config.yaml"); + } + ConfigResolveResponse::Error(error) => panic!("expected config response, got {error}"), + } + } + + #[test] + fn reports_missing_identifier() { + let repo = StaticConfigSource { + configs: HashMap::new(), + }; + + match repo.resolve(&NodeRegistration::new( + "unknown-node".to_string(), + "127.0.0.1".parse().expect("parse ip"), + )) { + ConfigResolveResponse::Config(_) => panic!("expected missing-config error"), + ConfigResolveResponse::Error(error) => { + assert!(matches!(error.code, CfgsyncErrorCode::MissingConfig)); + assert!(error.message.contains("unknown-node")); + } + } + } + + #[test] + fn loads_file_provider_bundle() { + let mut bundle_file = NamedTempFile::new().expect("create temp bundle"); + let yaml = r#" +nodes: + - identifier: node-1 + files: + - path: /config.yaml + content: "a: 1" +"#; + bundle_file + .write_all(yaml.as_bytes()) + .expect("write bundle yaml"); + + let provider = + BundleConfigSource::from_yaml_file(bundle_file.path()).expect("load file provider"); + + let _ = provider.register(NodeRegistration::new( + "node-1".to_string(), + "127.0.0.1".parse().expect("parse ip"), + )); + + match provider.resolve(&NodeRegistration::new( + "node-1".to_string(), + "127.0.0.1".parse().expect("parse ip"), + )) { + ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1), + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } + + #[test] + fn resolve_accepts_known_registration_without_gating() { + let mut configs = HashMap::new(); + configs.insert("node-1".to_owned(), sample_payload()); + let repo = StaticConfigSource { configs }; + + match repo.resolve(&NodeRegistration::new( + "node-1".to_string(), + "127.0.0.1".parse().expect("parse ip"), + )) { + ConfigResolveResponse::Config(_) => {} + ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"), + } + } +} diff --git a/cfgsync/runtime/Cargo.toml b/cfgsync/runtime/Cargo.toml new file mode 100644 index 0000000..a05d244 --- /dev/null +++ b/cfgsync/runtime/Cargo.toml @@ -0,0 +1,29 @@ +[package] +categories = { workspace = true } +description = { workspace = true } +edition = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +name = "cfgsync-runtime" +readme = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +anyhow = "1" +axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" } +cfgsync-adapter = { workspace = true } +cfgsync-artifacts = { workspace = true } +cfgsync-core = { workspace = true } +clap = { version = "4", features = ["derive"] } +serde = { workspace = true } +serde_yaml = { workspace = true } +thiserror = { workspace = true } +tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } +tracing = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/cfgsync/runtime/examples/minimal_cfgsync.rs b/cfgsync/runtime/examples/minimal_cfgsync.rs new file mode 100644 index 0000000..11308a6 --- /dev/null +++ b/cfgsync/runtime/examples/minimal_cfgsync.rs @@ -0,0 +1,61 @@ +use cfgsync_adapter::{ + DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot, + RegistrationSnapshotMaterializer, +}; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; +use cfgsync_core::NodeRegistration; +use cfgsync_runtime::{Client, OutputMap, serve}; +use tempfile::tempdir; +use tokio::time::{Duration, sleep}; + +struct ExampleMaterializer; + +impl RegistrationSnapshotMaterializer for ExampleMaterializer { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + if registrations.is_empty() { + return Ok(MaterializationResult::NotReady); + } + + let nodes = registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + format!("id: {}\n", registration.identifier), + )]), + ) + }); + + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(nodes), + )) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let port = 4400; + let server = tokio::spawn(async move { serve(port, ExampleMaterializer).await }); + + // Give the server a moment to bind before the client registers. + sleep(Duration::from_millis(100)).await; + + let tempdir = tempdir()?; + let outputs = OutputMap::under(tempdir.path().to_path_buf()); + let registration = NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse()?); + + Client::new("http://127.0.0.1:4400") + .fetch_and_write(®istration, &outputs) + .await?; + + println!( + "{}", + std::fs::read_to_string(tempdir.path().join("config.yaml"))? + ); + + server.abort(); + Ok(()) +} diff --git a/cfgsync/runtime/examples/precomputed_registration_cfgsync.rs b/cfgsync/runtime/examples/precomputed_registration_cfgsync.rs new file mode 100644 index 0000000..e10a7b2 --- /dev/null +++ b/cfgsync/runtime/examples/precomputed_registration_cfgsync.rs @@ -0,0 +1,79 @@ +use cfgsync_adapter::MaterializedArtifacts; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; +use cfgsync_core::NodeRegistration; +use cfgsync_runtime::{Client, OutputMap, serve}; +use tempfile::tempdir; +use tokio::time::{Duration, sleep}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let port = 4401; + let artifacts = MaterializedArtifacts::from_nodes([ + ( + "node-1".to_owned(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + "id: node-1\n".to_string(), + )]), + ), + ( + "node-2".to_owned(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + "id: node-2\n".to_string(), + )]), + ), + ]) + .with_shared(ArtifactSet::new(vec![ArtifactFile::new( + "/shared/cluster.yaml".to_string(), + "cluster: demo\n".to_string(), + )])); + + let server = tokio::spawn(async move { serve(port, artifacts).await }); + + // Give the server a moment to bind before clients register. + sleep(Duration::from_millis(100)).await; + + let node_1_dir = tempdir()?; + let node_1_outputs = OutputMap::config_and_shared( + node_1_dir.path().join("config.yaml"), + node_1_dir.path().join("shared"), + ); + let node_1 = NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse()?); + + Client::new("http://127.0.0.1:4401") + .fetch_and_write(&node_1, &node_1_outputs) + .await?; + + println!( + "node-1 config:\n{}", + std::fs::read_to_string(node_1_dir.path().join("config.yaml"))? + ); + + // A later node still uses the same registration/fetch flow. The artifacts + // were already known; registration only gates delivery. + sleep(Duration::from_millis(250)).await; + + let node_2_dir = tempdir()?; + let node_2_outputs = OutputMap::config_and_shared( + node_2_dir.path().join("config.yaml"), + node_2_dir.path().join("shared"), + ); + let node_2 = NodeRegistration::new("node-2".to_string(), "127.0.0.2".parse()?); + + Client::new("http://127.0.0.1:4401") + .fetch_and_write(&node_2, &node_2_outputs) + .await?; + + println!( + "node-2 config:\n{}", + std::fs::read_to_string(node_2_dir.path().join("config.yaml"))? + ); + println!( + "shared artifact:\n{}", + std::fs::read_to_string(node_2_dir.path().join("shared/shared/cluster.yaml"))? + ); + + server.abort(); + Ok(()) +} diff --git a/cfgsync/runtime/examples/wait_for_registrations_cfgsync.rs b/cfgsync/runtime/examples/wait_for_registrations_cfgsync.rs new file mode 100644 index 0000000..d2a2823 --- /dev/null +++ b/cfgsync/runtime/examples/wait_for_registrations_cfgsync.rs @@ -0,0 +1,81 @@ +use cfgsync_adapter::{ + DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot, + RegistrationSnapshotMaterializer, +}; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; +use cfgsync_core::NodeRegistration; +use cfgsync_runtime::{Client, OutputMap, serve}; +use tempfile::tempdir; +use tokio::time::{Duration, sleep}; + +struct ThresholdMaterializer; + +impl RegistrationSnapshotMaterializer for ThresholdMaterializer { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + if registrations.len() < 2 { + return Ok(MaterializationResult::NotReady); + } + + let nodes = registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + format!("id: {}\ncluster_ready: true\n", registration.identifier), + )]), + ) + }); + + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(nodes), + )) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let port = 4402; + let server = tokio::spawn(async move { serve(port, ThresholdMaterializer).await }); + + sleep(Duration::from_millis(100)).await; + + let waiting_dir = tempdir()?; + let waiting_outputs = OutputMap::under(waiting_dir.path().to_path_buf()); + let waiting_node = NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse()?); + let waiting_client = Client::new("http://127.0.0.1:4402"); + + let waiting_task = tokio::spawn(async move { + waiting_client + .fetch_and_write(&waiting_node, &waiting_outputs) + .await + }); + + // node-1 is now polling. The materializer will keep returning NotReady + // until node-2 registers. + sleep(Duration::from_millis(400)).await; + + let second_dir = tempdir()?; + let second_outputs = OutputMap::under(second_dir.path().to_path_buf()); + let second_node = NodeRegistration::new("node-2".to_string(), "127.0.0.2".parse()?); + + Client::new("http://127.0.0.1:4402") + .fetch_and_write(&second_node, &second_outputs) + .await?; + + waiting_task.await??; + + println!( + "node-1 config after threshold reached:\n{}", + std::fs::read_to_string(waiting_dir.path().join("config.yaml"))? + ); + println!( + "node-2 config:\n{}", + std::fs::read_to_string(second_dir.path().join("config.yaml"))? + ); + + server.abort(); + Ok(()) +} diff --git a/testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-client.rs b/cfgsync/runtime/src/bin/cfgsync-client.rs similarity index 76% rename from testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-client.rs rename to cfgsync/runtime/src/bin/cfgsync-client.rs index 98c3914..b821679 100644 --- a/testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-client.rs +++ b/cfgsync/runtime/src/bin/cfgsync-client.rs @@ -1,6 +1,6 @@ use std::{env, process}; -use cfgsync_runtime::run_cfgsync_client_from_env; +use cfgsync_runtime::run_client_from_env; const CFGSYNC_PORT_ENV: &str = "LOGOS_BLOCKCHAIN_CFGSYNC_PORT"; const DEFAULT_CFGSYNC_PORT: u16 = 4400; @@ -14,7 +14,7 @@ fn cfgsync_port() -> u16 { #[tokio::main] async fn main() { - if let Err(err) = run_cfgsync_client_from_env(cfgsync_port()).await { + if let Err(err) = run_client_from_env(cfgsync_port()).await { eprintln!("Error: {err}"); process::exit(1); } diff --git a/testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-server.rs b/cfgsync/runtime/src/bin/cfgsync-server.rs similarity index 61% rename from testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-server.rs rename to cfgsync/runtime/src/bin/cfgsync-server.rs index 28134ea..51db99b 100644 --- a/testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-server.rs +++ b/cfgsync/runtime/src/bin/cfgsync-server.rs @@ -1,10 +1,10 @@ use std::path::PathBuf; -use cfgsync_runtime::run_cfgsync_server; +use cfgsync_runtime::serve_from_config; use clap::Parser; #[derive(Parser, Debug)] -#[command(about = "CfgSync")] +#[command(about = "Cfgsync server")] struct Args { config: PathBuf, } @@ -12,5 +12,5 @@ struct Args { #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); - run_cfgsync_server(&args.config).await + serve_from_config(&args.config).await } diff --git a/cfgsync/runtime/src/client.rs b/cfgsync/runtime/src/client.rs new file mode 100644 index 0000000..f2372bd --- /dev/null +++ b/cfgsync/runtime/src/client.rs @@ -0,0 +1,419 @@ +use std::{ + collections::HashMap, + env, fs, + net::Ipv4Addr, + path::{Path, PathBuf}, +}; + +use anyhow::{Context as _, Result, bail}; +use cfgsync_core::{ + CFGSYNC_SCHEMA_VERSION, Client as ProtocolClient, NodeArtifactFile, NodeArtifactsPayload, + NodeRegistration, RegistrationPayload, +}; +use thiserror::Error; +use tokio::time::{Duration, sleep}; +use tracing::info; + +const FETCH_ATTEMPTS: usize = 5; +const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250); + +/// Output routing for fetched artifact files. +#[derive(Debug, Clone, Default)] +pub struct OutputMap { + routes: HashMap, + fallback: Option, +} + +#[derive(Debug, Clone)] +enum FallbackRoute { + Under(PathBuf), + Shared { dir: PathBuf }, +} + +impl OutputMap { + /// Creates an empty artifact output map. + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Routes one artifact path from the payload to a local output path. + #[must_use] + pub fn route(mut self, artifact_path: String, output_path: PathBuf) -> Self { + self.routes.insert(artifact_path, output_path); + self + } + + /// Writes payload files under `root`, preserving each artifact path. + /// + /// For example, `/config.yaml` is written to `/config.yaml` and + /// `shared/deployment-settings.yaml` is written to + /// `/shared/deployment-settings.yaml`. + #[must_use] + pub fn under(root: PathBuf) -> Self { + Self { + routes: HashMap::new(), + fallback: Some(FallbackRoute::Under(root)), + } + } + + /// Writes the node config to `config_path` and all other files under + /// `shared_dir`, preserving their relative artifact paths. + #[must_use] + pub fn config_and_shared(config_path: PathBuf, shared_dir: PathBuf) -> Self { + Self::default() + .route("/config.yaml".to_string(), config_path.clone()) + .route("config.yaml".to_string(), config_path) + .with_fallback(FallbackRoute::Shared { dir: shared_dir }) + } + + fn resolve_path(&self, file: &NodeArtifactFile) -> PathBuf { + self.routes + .get(&file.path) + .cloned() + .or_else(|| { + self.fallback + .as_ref() + .map(|fallback| fallback.resolve(&file.path)) + }) + .unwrap_or_else(|| PathBuf::from(&file.path)) + } + + fn with_fallback(mut self, fallback: FallbackRoute) -> Self { + self.fallback = Some(fallback); + self + } +} + +impl FallbackRoute { + fn resolve(&self, artifact_path: &str) -> PathBuf { + let relative = artifact_path.trim_start_matches('/'); + + match self { + FallbackRoute::Under(root) => root.join(relative), + FallbackRoute::Shared { dir } => dir.join(relative), + } + } +} + +/// Runtime-oriented cfgsync client that handles registration, fetch, and local +/// artifact materialization. +#[derive(Debug, Clone)] +pub struct Client { + inner: ProtocolClient, +} + +impl Client { + /// Creates a runtime client that talks to the cfgsync server at + /// `server_addr`. + #[must_use] + pub fn new(server_addr: &str) -> Self { + Self { + inner: ProtocolClient::new(server_addr.to_string()), + } + } + + /// Registers a node and fetches its artifact payload from cfgsync. + pub async fn register_and_fetch( + &self, + registration: &NodeRegistration, + ) -> Result { + self.register_node(registration).await?; + + let payload = self + .fetch_with_retry(registration) + .await + .context("fetching node artifacts")?; + ensure_schema_version(&payload)?; + + Ok(payload) + } + + /// Registers a node, fetches its artifact payload, and writes the result + /// using the provided output routing policy. + pub async fn fetch_and_write( + &self, + registration: &NodeRegistration, + outputs: &OutputMap, + ) -> Result<()> { + let payload = self.register_and_fetch(registration).await?; + let files = collect_payload_files(&payload)?; + + for file in files { + write_file(file, outputs)?; + } + + info!(files = files.len(), "cfgsync files saved"); + + Ok(()) + } + + async fn fetch_with_retry( + &self, + registration: &NodeRegistration, + ) -> Result { + for attempt in 1..=FETCH_ATTEMPTS { + match self.fetch_once(registration).await { + Ok(config) => return Ok(config), + Err(error) => { + if attempt == FETCH_ATTEMPTS { + return Err(error).with_context(|| { + format!("fetching node artifacts after {attempt} attempts") + }); + } + + sleep(FETCH_RETRY_DELAY).await; + } + } + } + + unreachable!("cfgsync fetch loop always returns before exhausting attempts"); + } + + async fn fetch_once(&self, registration: &NodeRegistration) -> Result { + self.inner + .fetch_node_config(registration) + .await + .map_err(Into::into) + } + + async fn register_node(&self, registration: &NodeRegistration) -> Result<()> { + for attempt in 1..=FETCH_ATTEMPTS { + match self.inner.register_node(registration).await { + Ok(()) => { + info!(identifier = %registration.identifier, "cfgsync node registered"); + return Ok(()); + } + Err(error) => { + if attempt == FETCH_ATTEMPTS { + return Err(error).with_context(|| { + format!("registering node with cfgsync after {attempt} attempts") + }); + } + + sleep(FETCH_RETRY_DELAY).await; + } + } + } + + unreachable!("cfgsync register loop always returns before exhausting attempts"); + } +} + +#[derive(Debug, Error)] +enum ClientEnvError { + #[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")] + InvalidIp { value: String }, +} + +fn ensure_schema_version(config: &NodeArtifactsPayload) -> Result<()> { + if config.schema_version != CFGSYNC_SCHEMA_VERSION { + bail!( + "unsupported cfgsync payload schema version {}, expected {}", + config.schema_version, + CFGSYNC_SCHEMA_VERSION + ); + } + + Ok(()) +} + +fn collect_payload_files(config: &NodeArtifactsPayload) -> Result<&[NodeArtifactFile]> { + if config.is_empty() { + bail!("cfgsync payload contains no files"); + } + + Ok(config.files()) +} + +fn write_file(file: &NodeArtifactFile, outputs: &OutputMap) -> Result<()> { + let path = outputs.resolve_path(file); + + ensure_parent_dir(&path)?; + + fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?; + + info!(path = %path.display(), "cfgsync file saved"); + + Ok(()) +} + +fn ensure_parent_dir(path: &Path) -> Result<()> { + let Some(parent) = path.parent() else { + return Ok(()); + }; + + if parent.as_os_str().is_empty() { + return Ok(()); + } + + fs::create_dir_all(parent) + .with_context(|| format!("creating parent directory {}", parent.display()))?; + + Ok(()) +} + +/// Resolves runtime client inputs from environment and materializes node files. +pub async fn run_client_from_env(default_port: u16) -> Result<()> { + let server_addr = + env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}")); + let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?; + let identifier = + env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned()); + let metadata = parse_registration_payload_env()?; + let outputs = build_output_map(); + + Client::new(&server_addr) + .fetch_and_write( + &NodeRegistration::new(identifier, ip).with_payload(metadata), + &outputs, + ) + .await +} + +fn parse_ip_env(ip_str: &str) -> Result { + ip_str + .parse() + .map_err(|_| ClientEnvError::InvalidIp { + value: ip_str.to_owned(), + }) + .map_err(Into::into) +} + +fn parse_registration_payload_env() -> Result { + let Ok(raw) = env::var("CFG_REGISTRATION_METADATA_JSON") else { + return Ok(RegistrationPayload::default()); + }; + + parse_registration_payload(&raw) +} + +fn parse_registration_payload(raw: &str) -> Result { + RegistrationPayload::from_json_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON") +} + +fn build_output_map() -> OutputMap { + let mut outputs = OutputMap::default(); + + if let Ok(path) = env::var("CFG_FILE_PATH") { + let path = PathBuf::from(path); + + outputs = outputs + .route("/config.yaml".to_string(), path.clone()) + .route("config.yaml".to_string(), path); + } + + if let Ok(path) = env::var("CFG_DEPLOYMENT_PATH") { + let path = PathBuf::from(path); + + outputs = outputs + .route("/deployment.yaml".to_string(), path.clone()) + .route("deployment-settings.yaml".to_string(), path.clone()) + .route("/deployment-settings.yaml".to_string(), path); + } + + outputs +} + +#[cfg(test)] +mod tests { + use cfgsync_core::{ + CfgsyncServerState, NodeArtifactsBundle, NodeArtifactsBundleEntry, StaticConfigSource, + }; + use tempfile::tempdir; + + use super::*; + + #[tokio::test] + async fn client_materializes_multi_file_payload_from_cfgsync_server() { + let dir = tempdir().expect("create temp dir"); + let app_config_path = dir.path().join("config.yaml"); + let deployment_path = dir.path().join("deployment.yaml"); + + let bundle = NodeArtifactsBundle::new(vec![NodeArtifactsBundleEntry { + identifier: "node-1".to_owned(), + files: vec![ + NodeArtifactFile::new( + app_config_path.to_string_lossy().into_owned(), + "app_key: app_value".to_string(), + ), + NodeArtifactFile::new( + deployment_path.to_string_lossy().into_owned(), + "mode: local".to_string(), + ), + ], + }]); + + let repo = StaticConfigSource::from_bundle(bundle); + let state = CfgsyncServerState::new(repo); + let port = allocate_test_port(); + let address = format!("http://127.0.0.1:{port}"); + let server = tokio::spawn(async move { + cfgsync_core::serve_cfgsync(port, state) + .await + .expect("run cfgsync server"); + }); + + Client::new(&address) + .fetch_and_write( + &NodeRegistration::new( + "node-1".to_string(), + "127.0.0.1".parse().expect("parse ip"), + ), + &OutputMap::default(), + ) + .await + .expect("pull config files"); + + server.abort(); + let _ = server.await; + + let app_config = fs::read_to_string(&app_config_path).expect("read app config"); + let deployment = fs::read_to_string(&deployment_path).expect("read deployment config"); + + assert_eq!(app_config, "app_key: app_value"); + assert_eq!(deployment, "mode: local"); + } + fn allocate_test_port() -> u16 { + let listener = + std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port for test"); + let port = listener.local_addr().expect("read local addr").port(); + drop(listener); + port + } + + #[test] + fn parses_registration_payload_object() { + #[derive(Debug, serde::Deserialize, PartialEq, Eq)] + struct ExamplePayload { + network_port: u16, + service: String, + } + + let metadata = parse_registration_payload(r#"{"network_port":3000,"service":"blend"}"#) + .expect("parse metadata"); + let payload: ExamplePayload = metadata + .deserialize() + .expect("deserialize payload") + .expect("payload value"); + + assert_eq!( + payload, + ExamplePayload { + network_port: 3000, + service: "blend".to_owned(), + } + ); + } + + #[test] + fn parses_registration_payload_array() { + let metadata = parse_registration_payload(r#"[1,2,3]"#).expect("parse metadata array"); + let payload: Vec = metadata + .deserialize() + .expect("deserialize payload") + .expect("payload value"); + + assert_eq!(payload, vec![1, 2, 3]); + } +} diff --git a/cfgsync/runtime/src/lib.rs b/cfgsync/runtime/src/lib.rs new file mode 100644 index 0000000..479091e --- /dev/null +++ b/cfgsync/runtime/src/lib.rs @@ -0,0 +1,10 @@ +pub use cfgsync_core as core; + +mod client; +mod server; + +pub use client::{Client, OutputMap, run_client_from_env}; +pub use server::{ + LoadServerConfigError, ServerConfig, ServerSource, build_persisted_router, build_router, serve, + serve_from_config, serve_persisted, +}; diff --git a/cfgsync/runtime/src/server.rs b/cfgsync/runtime/src/server.rs new file mode 100644 index 0000000..25b54bf --- /dev/null +++ b/cfgsync/runtime/src/server.rs @@ -0,0 +1,255 @@ +use std::{fs, path::Path, sync::Arc}; + +use anyhow::Context as _; +use axum::Router; +use cfgsync_adapter::{ + CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink, + PersistingSnapshotMaterializer, RegistrationConfigSource, RegistrationSnapshotMaterializer, +}; +use cfgsync_core::{ + BundleConfigSource, CfgsyncServerState, NodeConfigSource, RunCfgsyncError, + serve_cfgsync as serve_cfgsync_state, +}; +use serde::Deserialize; +use thiserror::Error; + +/// Runtime cfgsync server config loaded from YAML. +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +pub struct ServerConfig { + /// HTTP port to bind the cfgsync server on. + pub port: u16, + /// Source used by the runtime-managed cfgsync server. + pub source: ServerSource, +} + +/// Runtime cfgsync source loaded from config. +/// +/// This type is intentionally runtime-oriented: +/// - `Static` serves precomputed artifacts directly without registration +/// - `Registration` serves precomputed artifacts through the registration +/// protocol, which is useful when the consumer wants clients to register +/// before receiving already-materialized artifacts +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum ServerSource { + /// Serve precomputed artifacts directly, without requiring registration. + #[serde(alias = "bundle")] + Static { artifacts_path: String }, + /// Require node registration before serving precomputed artifacts. + Registration { artifacts_path: String }, +} + +#[derive(Debug, Error)] +pub enum LoadServerConfigError { + #[error("failed to read cfgsync config file {path}: {source}")] + Read { + path: String, + #[source] + source: std::io::Error, + }, + #[error("failed to parse cfgsync config file {path}: {source}")] + Parse { + path: String, + #[source] + source: serde_yaml::Error, + }, +} + +impl ServerConfig { + /// Loads cfgsync runtime server config from a YAML file. + pub fn load_from_file(path: &Path) -> Result { + let config_path = path.display().to_string(); + let config_content = + fs::read_to_string(path).map_err(|source| LoadServerConfigError::Read { + path: config_path.clone(), + source, + })?; + + let config: ServerConfig = serde_yaml::from_str(&config_content).map_err(|source| { + LoadServerConfigError::Parse { + path: config_path, + source, + } + })?; + + Ok(config) + } + + #[must_use] + pub fn for_static(port: u16, artifacts_path: String) -> Self { + Self { + port, + source: ServerSource::Static { artifacts_path }, + } + } + + /// Builds a config that serves precomputed artifacts through the + /// registration flow. + #[must_use] + pub fn for_registration(port: u16, artifacts_path: String) -> Self { + Self { + port, + source: ServerSource::Registration { artifacts_path }, + } + } +} + +fn load_static_source(artifacts_path: &Path) -> anyhow::Result> { + let provider = BundleConfigSource::from_yaml_file(artifacts_path).with_context(|| { + format!( + "loading cfgsync static artifacts from {}", + artifacts_path.display() + ) + })?; + + Ok(Arc::new(provider)) +} + +fn load_registration_source(artifacts_path: &Path) -> anyhow::Result> { + let materialized = load_materialized_artifacts_yaml(artifacts_path)?; + let provider = RegistrationConfigSource::new(materialized); + + Ok(Arc::new(provider)) +} + +fn load_materialized_artifacts_yaml( + artifacts_path: &Path, +) -> anyhow::Result { + let raw = fs::read_to_string(artifacts_path).with_context(|| { + format!( + "reading cfgsync materialized artifacts from {}", + artifacts_path.display() + ) + })?; + + serde_yaml::from_str(&raw).with_context(|| { + format!( + "parsing cfgsync materialized artifacts from {}", + artifacts_path.display() + ) + }) +} + +fn resolve_artifacts_path(config_path: &Path, artifacts_path: &str) -> std::path::PathBuf { + let path = Path::new(artifacts_path); + if path.is_absolute() { + return path.to_path_buf(); + } + + config_path + .parent() + .unwrap_or_else(|| Path::new(".")) + .join(path) +} + +/// Loads runtime config and starts cfgsync HTTP server process. +pub async fn serve_from_config(config_path: &Path) -> anyhow::Result<()> { + let config = ServerConfig::load_from_file(config_path)?; + let artifacts_path = resolve_source_path(config_path, &config.source); + + let state = build_server_state(&config, &artifacts_path)?; + serve_cfgsync_state(config.port, state).await?; + + Ok(()) +} + +/// Builds the default registration-backed cfgsync router from a snapshot +/// materializer. +/// +/// This is the main code-driven entrypoint for apps that want cfgsync to own: +/// - node registration +/// - readiness polling +/// - artifact serving +/// +/// while the app owns only snapshot materialization logic. +pub fn build_router(materializer: M) -> Router +where + M: RegistrationSnapshotMaterializer + 'static, +{ + let provider = RegistrationConfigSource::new(CachedSnapshotMaterializer::new(materializer)); + cfgsync_core::build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider))) +} + +/// Builds a registration-backed cfgsync router with a persistence hook for +/// ready materialization results. +/// +/// Use this when the application wants cfgsync to persist or publish shared +/// artifacts after a snapshot becomes ready. +pub fn build_persisted_router(materializer: M, sink: S) -> Router +where + M: RegistrationSnapshotMaterializer + 'static, + S: MaterializedArtifactsSink + 'static, +{ + let provider = RegistrationConfigSource::new(CachedSnapshotMaterializer::new( + PersistingSnapshotMaterializer::new(materializer, sink), + )); + + cfgsync_core::build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider))) +} + +/// Runs the default registration-backed cfgsync server directly from a snapshot +/// materializer. +/// +/// This is the simplest runtime entrypoint when the application already has a +/// materializer value and does not need to compose extra routes. +pub async fn serve(port: u16, materializer: M) -> Result<(), RunCfgsyncError> +where + M: RegistrationSnapshotMaterializer + 'static, +{ + let router = build_router(materializer); + serve_router(port, router).await +} + +/// Runs a registration-backed cfgsync server with a persistence hook for ready +/// materialization results. +/// +/// This is the direct serving counterpart to +/// [`build_persisted_router`]. +pub async fn serve_persisted( + port: u16, + materializer: M, + sink: S, +) -> Result<(), RunCfgsyncError> +where + M: RegistrationSnapshotMaterializer + 'static, + S: MaterializedArtifactsSink + 'static, +{ + let router = build_persisted_router(materializer, sink); + serve_router(port, router).await +} + +async fn serve_router(port: u16, router: Router) -> Result<(), RunCfgsyncError> { + let bind_addr = format!("0.0.0.0:{port}"); + let listener = tokio::net::TcpListener::bind(&bind_addr) + .await + .map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?; + + axum::serve(listener, router) + .await + .map_err(|source| RunCfgsyncError::Serve { source })?; + + Ok(()) +} + +fn build_server_state( + config: &ServerConfig, + source_path: &Path, +) -> anyhow::Result { + let repo = match &config.source { + ServerSource::Static { .. } => load_static_source(source_path)?, + ServerSource::Registration { .. } => load_registration_source(source_path)?, + }; + + Ok(CfgsyncServerState::new(repo)) +} + +fn resolve_source_path(config_path: &Path, source: &ServerSource) -> std::path::PathBuf { + match source { + ServerSource::Static { artifacts_path } => { + resolve_artifacts_path(config_path, artifacts_path) + } + ServerSource::Registration { artifacts_path } => { + resolve_artifacts_path(config_path, artifacts_path) + } + } +} diff --git a/logos/infra/assets/stack/scripts/docker/build_cfgsync.sh b/logos/infra/assets/stack/scripts/docker/build_cfgsync.sh index 61a4ad5..7921e8a 100755 --- a/logos/infra/assets/stack/scripts/docker/build_cfgsync.sh +++ b/logos/infra/assets/stack/scripts/docker/build_cfgsync.sh @@ -2,10 +2,10 @@ set -euo pipefail RUSTFLAGS='--cfg feature="pol-dev-mode"' \ - cargo build --manifest-path /workspace/testing-framework/tools/cfgsync-runtime/Cargo.toml --bin cfgsync-server + cargo build --manifest-path /workspace/cfgsync/runtime/Cargo.toml --bin cfgsync-server RUSTFLAGS='--cfg feature="pol-dev-mode"' \ - cargo build --manifest-path /workspace/testing-framework/tools/cfgsync-runtime/Cargo.toml --bin cfgsync-client + cargo build --manifest-path /workspace/cfgsync/runtime/Cargo.toml --bin cfgsync-client cp /workspace/target/debug/cfgsync-server /workspace/artifacts/cfgsync-server cp /workspace/target/debug/cfgsync-client /workspace/artifacts/cfgsync-client diff --git a/logos/infra/helm/logos-runner/templates/cfgsync-deployment.yaml b/logos/infra/helm/logos-runner/templates/cfgsync-deployment.yaml index 7362ab3..f76dc99 100644 --- a/logos/infra/helm/logos-runner/templates/cfgsync-deployment.yaml +++ b/logos/infra/helm/logos-runner/templates/cfgsync-deployment.yaml @@ -39,7 +39,7 @@ spec: items: - key: cfgsync.yaml path: cfgsync.yaml - - key: cfgsync.bundle.yaml - path: cfgsync.bundle.yaml + - key: cfgsync.artifacts.yaml + path: cfgsync.artifacts.yaml - key: run_cfgsync.sh path: scripts/run_cfgsync.sh diff --git a/logos/infra/helm/logos-runner/templates/configmap.yaml b/logos/infra/helm/logos-runner/templates/configmap.yaml index a962e6a..36cd339 100644 --- a/logos/infra/helm/logos-runner/templates/configmap.yaml +++ b/logos/infra/helm/logos-runner/templates/configmap.yaml @@ -11,9 +11,9 @@ data: {{- else }} {{ "" | indent 4 }} {{- end }} - cfgsync.bundle.yaml: | -{{- if .Values.cfgsync.bundle }} -{{ .Values.cfgsync.bundle | indent 4 }} + cfgsync.artifacts.yaml: | +{{- if .Values.cfgsync.artifacts }} +{{ .Values.cfgsync.artifacts | indent 4 }} {{- else }} {{ "" | indent 4 }} {{- end }} diff --git a/logos/runtime/ext/Cargo.toml b/logos/runtime/ext/Cargo.toml index 008bf4f..b317f39 100644 --- a/logos/runtime/ext/Cargo.toml +++ b/logos/runtime/ext/Cargo.toml @@ -7,8 +7,9 @@ version = { workspace = true } [dependencies] # Workspace crates +cfgsync-adapter = { workspace = true } +cfgsync-artifacts = { workspace = true } cfgsync-core = { workspace = true } -cfgsync_runtime = { workspace = true } lb-framework = { workspace = true } testing-framework-core = { workspace = true } testing-framework-env = { workspace = true } diff --git a/logos/runtime/ext/src/cfgsync/mod.rs b/logos/runtime/ext/src/cfgsync/mod.rs index 6d0b308..268d393 100644 --- a/logos/runtime/ext/src/cfgsync/mod.rs +++ b/logos/runtime/ext/src/cfgsync/mod.rs @@ -1,24 +1,31 @@ -use anyhow::{Result, anyhow}; -pub(crate) use cfgsync_runtime::render::CfgsyncOutputPaths; -use cfgsync_runtime::{ - bundle::{CfgSyncBundle, CfgSyncBundleNode, build_cfgsync_bundle_with_hostnames}, - render::{ - CfgsyncConfigOverrides, RenderedCfgsync, ensure_bundle_path, - render_cfgsync_yaml_from_template, write_rendered_cfgsync, - }, +use anyhow::Result; +use cfgsync_artifacts::ArtifactFile; +pub(crate) use cfgsync_core::render::CfgsyncOutputPaths; +use cfgsync_core::render::{ + CfgsyncConfigOverrides, RenderedCfgsync, ensure_artifacts_path, + render_cfgsync_yaml_from_template, write_rendered_cfgsync, }; use reqwest::Url; use serde_yaml::{Mapping, Value}; -use testing_framework_core::cfgsync::CfgsyncEnv; +use testing_framework_core::cfgsync::{StaticArtifactRenderer, build_static_artifacts}; +use thiserror::Error; pub(crate) struct CfgsyncRenderOptions { pub port: Option, - pub bundle_path: Option, + pub artifacts_path: Option, pub min_timeout_secs: Option, pub metrics_otlp_ingest_url: Option, } -pub(crate) fn render_cfgsync_from_template( +#[derive(Debug, Error)] +enum BundleRenderError { + #[error("cfgsync bundle node `{identifier}` is missing `/config.yaml`")] + MissingConfigFile { identifier: String }, + #[error("cfgsync config file is missing `{key}`")] + MissingYamlKey { key: String }, +} + +pub(crate) fn render_cfgsync_from_template( topology: &E::Deployment, hostnames: &[String], options: CfgsyncRenderOptions, @@ -26,48 +33,59 @@ pub(crate) fn render_cfgsync_from_template( let cfg = build_cfgsync_server_config(); let overrides = build_overrides::(topology, options); let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?; - let mut bundle = build_cfgsync_bundle_with_hostnames::(topology, hostnames)?; - append_deployment_files(&mut bundle)?; - let bundle_yaml = serde_yaml::to_string(&bundle)?; + let mut materialized = build_static_artifacts::(topology, hostnames)?; + append_deployment_files(&mut materialized)?; + let artifacts_yaml = serde_yaml::to_string(&materialized)?; Ok(RenderedCfgsync { config_yaml, - bundle_yaml, + artifacts_yaml, }) } -fn append_deployment_files(bundle: &mut CfgSyncBundle) -> Result<()> { - for node in &mut bundle.nodes { - if has_file_path(node, "/deployment.yaml") { - continue; - } - - let config_content = config_file_content(node) - .ok_or_else(|| anyhow!("cfgsync bundle node missing /config.yaml"))?; - let deployment_yaml = extract_yaml_key(&config_content, "deployment")?; - - node.files - .push(build_bundle_file("/deployment.yaml", deployment_yaml)); +fn append_deployment_files( + materialized: &mut cfgsync_adapter::MaterializedArtifacts, +) -> Result<()> { + if has_shared_file_path(materialized, "/deployment.yaml") { + return Ok(()); } + let Some((identifier, artifacts)) = materialized.iter().next() else { + return Ok(()); + }; + + let config_content = + config_file_content(artifacts).ok_or_else(|| BundleRenderError::MissingConfigFile { + identifier: identifier.to_owned(), + })?; + let deployment_yaml = extract_yaml_key(&config_content, "deployment")?; + + let mut shared = materialized.shared().clone(); + shared + .files + .push(build_artifact_file("/deployment.yaml", deployment_yaml)); + *materialized = materialized.clone().with_shared(shared); + Ok(()) } -fn has_file_path(node: &CfgSyncBundleNode, path: &str) -> bool { - node.files.iter().any(|file| file.path == path) +fn has_shared_file_path(materialized: &cfgsync_adapter::MaterializedArtifacts, path: &str) -> bool { + materialized + .shared() + .files + .iter() + .any(|file| file.path == path) } -fn config_file_content(node: &CfgSyncBundleNode) -> Option { - node.files +fn config_file_content(artifacts: &cfgsync_artifacts::ArtifactSet) -> Option { + artifacts + .files .iter() .find_map(|file| (file.path == "/config.yaml").then_some(file.content.clone())) } -fn build_bundle_file(path: &str, content: String) -> cfgsync_core::CfgSyncFile { - cfgsync_core::CfgSyncFile { - path: path.to_owned(), - content, - } +fn build_artifact_file(path: &str, content: String) -> ArtifactFile { + ArtifactFile::new(path.to_string(), content.to_string()) } fn extract_yaml_key(content: &str, key: &str) -> Result { @@ -75,7 +93,9 @@ fn extract_yaml_key(content: &str, key: &str) -> Result { let value = document .get(key) .cloned() - .ok_or_else(|| anyhow!("config yaml missing `{key}`"))?; + .ok_or_else(|| BundleRenderError::MissingYamlKey { + key: key.to_owned(), + })?; Ok(serde_yaml::to_string(&value)?) } @@ -87,21 +107,28 @@ fn build_cfgsync_server_config() -> Value { Value::Number(4400_u64.into()), ); - root.insert( - Value::String("bundle_path".to_string()), - Value::String("cfgsync.bundle.yaml".to_string()), + let mut source = Mapping::new(); + source.insert( + Value::String("kind".to_string()), + Value::String("registration".to_string()), ); + source.insert( + Value::String("artifacts_path".to_string()), + Value::String("cfgsync.artifacts.yaml".to_string()), + ); + + root.insert(Value::String("source".to_string()), Value::Mapping(source)); Value::Mapping(root) } -pub(crate) fn render_and_write_cfgsync_from_template( +pub(crate) fn render_and_write_cfgsync_from_template( topology: &E::Deployment, hostnames: &[String], mut options: CfgsyncRenderOptions, output: CfgsyncOutputPaths<'_>, ) -> Result { - ensure_bundle_path(&mut options.bundle_path, output.bundle_path); + ensure_artifacts_path(&mut options.artifacts_path, output.artifacts_path); let rendered = render_cfgsync_from_template::(topology, hostnames, options)?; write_rendered_cfgsync(&rendered, output)?; @@ -109,13 +136,13 @@ pub(crate) fn render_and_write_cfgsync_from_template( Ok(rendered) } -fn build_overrides( +fn build_overrides( topology: &E::Deployment, options: CfgsyncRenderOptions, ) -> CfgsyncConfigOverrides { let CfgsyncRenderOptions { port, - bundle_path, + artifacts_path, min_timeout_secs, metrics_otlp_ingest_url, } = options; @@ -124,7 +151,7 @@ fn build_overrides( port, n_hosts: Some(E::nodes(topology).len()), timeout_floor_secs: min_timeout_secs, - bundle_path, + artifacts_path, metrics_otlp_ingest_url: metrics_otlp_ingest_url.map(|url| url.to_string()), } } diff --git a/logos/runtime/ext/src/compose_env.rs b/logos/runtime/ext/src/compose_env.rs index 9803b12..f539ce3 100644 --- a/logos/runtime/ext/src/compose_env.rs +++ b/logos/runtime/ext/src/compose_env.rs @@ -117,7 +117,7 @@ impl ComposeDeployEnv for LbcExtEnv { nodes = topology.nodes().len(), "updating cfgsync template" ); - let bundle_path = cfgsync_bundle_path(path); + let artifacts_path = cfgsync_artifacts_path(path); let hostnames = topology_hostnames(topology); let options = cfgsync_render_options(port, metrics_otlp_ingest_url); @@ -127,7 +127,7 @@ impl ComposeDeployEnv for LbcExtEnv { options, CfgsyncOutputPaths { config_path: path, - bundle_path: &bundle_path, + artifacts_path: &artifacts_path, }, )?; Ok(()) @@ -186,11 +186,11 @@ fn node_instance_name(index: usize) -> String { format!("node-{index}") } -fn cfgsync_bundle_path(config_path: &Path) -> PathBuf { +fn cfgsync_artifacts_path(config_path: &Path) -> PathBuf { config_path .parent() .unwrap_or(config_path) - .join("cfgsync.bundle.yaml") + .join("cfgsync.artifacts.yaml") } fn topology_hostnames(topology: &DeploymentPlan) -> Vec { @@ -207,7 +207,7 @@ fn cfgsync_render_options( ) -> CfgsyncRenderOptions { CfgsyncRenderOptions { port: Some(port), - bundle_path: None, + artifacts_path: None, min_timeout_secs: None, metrics_otlp_ingest_url: metrics_otlp_ingest_url.cloned(), } @@ -254,7 +254,6 @@ fn build_compose_node_descriptor( base_volumes(), default_extra_hosts(), ports, - api_port, environment, platform, ) diff --git a/logos/runtime/ext/src/k8s_env.rs b/logos/runtime/ext/src/k8s_env.rs index 8c9a1e8..ee90f2c 100644 --- a/logos/runtime/ext/src/k8s_env.rs +++ b/logos/runtime/ext/src/k8s_env.rs @@ -31,7 +31,7 @@ use crate::{ const CFGSYNC_K8S_TIMEOUT_SECS: u64 = 300; const K8S_FULLNAME_OVERRIDE: &str = "logos-runner"; -const DEFAULT_K8S_TESTNET_IMAGE: &str = "logos-blockchain-testing:local"; +const DEFAULT_K8S_TESTNET_IMAGE: &str = "public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test"; /// Paths and image metadata required to deploy the Helm chart. pub struct K8sAssets { @@ -182,11 +182,11 @@ pub fn prepare_assets( let root = workspace_root().map_err(|source| AssetsError::WorkspaceRoot { source })?; let tempdir = create_assets_tempdir()?; - let (cfgsync_file, cfgsync_yaml, bundle_yaml) = + let (cfgsync_file, cfgsync_yaml, artifacts_yaml) = render_and_write_cfgsync(topology, metrics_otlp_ingest_url, &tempdir)?; let scripts = validate_scripts(&root)?; let chart_path = helm_chart_path()?; - let values_file = render_and_write_values(topology, &tempdir, &cfgsync_yaml, &bundle_yaml)?; + let values_file = render_and_write_values(topology, &tempdir, &cfgsync_yaml, &artifacts_yaml)?; let image = testnet_image(); log_assets_prepare_done(&cfgsync_file, &values_file, &chart_path, &image); @@ -351,24 +351,24 @@ fn render_and_write_cfgsync( tempdir: &TempDir, ) -> Result<(PathBuf, String, String), AssetsError> { let cfgsync_file = tempdir.path().join("cfgsync.yaml"); - let bundle_file = tempdir.path().join("cfgsync.bundle.yaml"); - let (cfgsync_yaml, bundle_yaml) = render_cfgsync_config( + let artifacts_file = tempdir.path().join("cfgsync.artifacts.yaml"); + let (cfgsync_yaml, artifacts_yaml) = render_cfgsync_config( topology, metrics_otlp_ingest_url, &cfgsync_file, - &bundle_file, + &artifacts_file, )?; - Ok((cfgsync_file, cfgsync_yaml, bundle_yaml)) + Ok((cfgsync_file, cfgsync_yaml, artifacts_yaml)) } fn render_and_write_values( topology: &DeploymentPlan, tempdir: &TempDir, cfgsync_yaml: &str, - bundle_yaml: &str, + artifacts_yaml: &str, ) -> Result { - let values_yaml = render_values_yaml(topology, cfgsync_yaml, bundle_yaml)?; + let values_yaml = render_values_yaml(topology, cfgsync_yaml, artifacts_yaml)?; write_temp_file(tempdir.path(), "values.yaml", values_yaml) } @@ -380,7 +380,7 @@ fn render_cfgsync_config( topology: &DeploymentPlan, metrics_otlp_ingest_url: Option<&Url>, cfgsync_file: &Path, - bundle_file: &Path, + artifacts_file: &Path, ) -> Result<(String, String), AssetsError> { let hostnames = k8s_node_hostnames(topology); let rendered = render_and_write_cfgsync_from_template::( @@ -388,18 +388,18 @@ fn render_cfgsync_config( &hostnames, CfgsyncRenderOptions { port: Some(cfgsync_port()), - bundle_path: Some("cfgsync.bundle.yaml".to_string()), + artifacts_path: Some("cfgsync.artifacts.yaml".to_string()), min_timeout_secs: Some(CFGSYNC_K8S_TIMEOUT_SECS), metrics_otlp_ingest_url: metrics_otlp_ingest_url.cloned(), }, CfgsyncOutputPaths { config_path: cfgsync_file, - bundle_path: bundle_file, + artifacts_path: artifacts_file, }, ) .map_err(|source| AssetsError::Cfgsync { source })?; - Ok((rendered.config_yaml, rendered.bundle_yaml)) + Ok((rendered.config_yaml, rendered.artifacts_yaml)) } fn k8s_node_hostnames(topology: &DeploymentPlan) -> Vec { @@ -459,9 +459,9 @@ fn helm_chart_path() -> Result { fn render_values_yaml( topology: &DeploymentPlan, cfgsync_yaml: &str, - bundle_yaml: &str, + artifacts_yaml: &str, ) -> Result { - let values = build_values(topology, cfgsync_yaml, bundle_yaml); + let values = build_values(topology, cfgsync_yaml, artifacts_yaml); serde_yaml::to_string(&values).map_err(|source| AssetsError::Values { source }) } @@ -569,7 +569,7 @@ struct KzgValues { struct CfgsyncValues { port: u16, config: String, - bundle: String, + artifacts: String, } #[derive(Serialize)] @@ -589,11 +589,11 @@ struct NodeValues { env: BTreeMap, } -fn build_values(topology: &DeploymentPlan, cfgsync_yaml: &str, bundle_yaml: &str) -> HelmValues { +fn build_values(topology: &DeploymentPlan, cfgsync_yaml: &str, artifacts_yaml: &str) -> HelmValues { let cfgsync = CfgsyncValues { port: cfgsync_port(), config: cfgsync_yaml.to_string(), - bundle: bundle_yaml.to_string(), + artifacts: artifacts_yaml.to_string(), }; let kzg = KzgValues::disabled(); let image_pull_policy = diff --git a/testing-framework/core/Cargo.toml b/testing-framework/core/Cargo.toml index 00e0658..492f746 100644 --- a/testing-framework/core/Cargo.toml +++ b/testing-framework/core/Cargo.toml @@ -17,6 +17,8 @@ default = [] [dependencies] async-trait = "0.1" +cfgsync-adapter = { workspace = true } +cfgsync-artifacts = { workspace = true } futures = { default-features = false, features = ["std"], version = "0.3" } parking_lot = { workspace = true } prometheus-http-query = "0.8" diff --git a/testing-framework/core/src/cfgsync/mod.rs b/testing-framework/core/src/cfgsync/mod.rs index 8f0ad34..2f93d7b 100644 --- a/testing-framework/core/src/cfgsync/mod.rs +++ b/testing-framework/core/src/cfgsync/mod.rs @@ -1,16 +1,14 @@ use std::error::Error; +pub use cfgsync_adapter::*; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; use thiserror::Error; +#[doc(hidden)] pub type DynCfgsyncError = Box; -#[derive(Debug, Clone)] -pub struct CfgsyncNodeConfig { - pub identifier: String, - pub config_yaml: String, -} - -pub trait CfgsyncEnv { +#[doc(hidden)] +pub trait StaticArtifactRenderer { type Deployment; type Node; type NodeConfig; @@ -35,8 +33,11 @@ pub trait CfgsyncEnv { fn serialize_node_config(config: &Self::NodeConfig) -> Result; } +#[doc(hidden)] +pub use StaticArtifactRenderer as CfgsyncEnv; + #[derive(Debug, Error)] -pub enum BuildCfgsyncNodesError { +pub enum BuildStaticArtifactsError { #[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")] HostnameCountMismatch { nodes: usize, hostnames: usize }, #[error("cfgsync adapter failed: {source}")] @@ -46,39 +47,47 @@ pub enum BuildCfgsyncNodesError { }, } -fn adapter_error(source: E) -> BuildCfgsyncNodesError +fn adapter_error(source: E) -> BuildStaticArtifactsError where E: Error + Send + Sync + 'static, { - BuildCfgsyncNodesError::Adapter { + BuildStaticArtifactsError::Adapter { source: Box::new(source), } } -pub fn build_cfgsync_node_configs( +pub fn build_static_artifacts( deployment: &E::Deployment, hostnames: &[String], -) -> Result, BuildCfgsyncNodesError> { +) -> Result { let nodes = E::nodes(deployment); + if nodes.len() != hostnames.len() { - return Err(BuildCfgsyncNodesError::HostnameCountMismatch { + return Err(BuildStaticArtifactsError::HostnameCountMismatch { nodes: nodes.len(), hostnames: hostnames.len(), }); } - let mut output = Vec::with_capacity(nodes.len()); + let mut output = std::collections::HashMap::with_capacity(nodes.len()); + for (index, node) in nodes.iter().enumerate() { let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?; E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config) .map_err(adapter_error)?; let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?; - output.push(CfgsyncNodeConfig { - identifier: E::node_identifier(index, node), - config_yaml, - }); + output.insert( + E::node_identifier(index, node), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml".to_string(), + config_yaml.clone(), + )]), + ); } - Ok(output) + Ok(cfgsync_adapter::MaterializedArtifacts::from_nodes(output)) } + +#[doc(hidden)] +pub use build_static_artifacts as build_cfgsync_node_catalog; diff --git a/testing-framework/deployers/compose/Cargo.toml b/testing-framework/deployers/compose/Cargo.toml index 9ac8b7a..4df1f40 100644 --- a/testing-framework/deployers/compose/Cargo.toml +++ b/testing-framework/deployers/compose/Cargo.toml @@ -31,4 +31,5 @@ uuid = { features = ["v4"], version = "1" } [dev-dependencies] groth16 = { workspace = true } key-management-system-service = { workspace = true } +serde_json = { workspace = true } zksign = { workspace = true } diff --git a/testing-framework/deployers/compose/assets/docker-compose.yml.tera b/testing-framework/deployers/compose/assets/docker-compose.yml.tera index c6ecc29..ba21922 100644 --- a/testing-framework/deployers/compose/assets/docker-compose.yml.tera +++ b/testing-framework/deployers/compose/assets/docker-compose.yml.tera @@ -18,9 +18,6 @@ services: {% for port in node.ports %} - {{ port }} {% endfor %} - labels: - testing-framework.node: "true" - testing-framework.api-container-port: "{{ node.api_container_port }}" environment: {% for env in node.environment %} {{ env.key }}: "{{ env.value }}" diff --git a/testing-framework/deployers/compose/src/descriptor/node.rs b/testing-framework/deployers/compose/src/descriptor/node.rs index d35f8f5..c5f769b 100644 --- a/testing-framework/deployers/compose/src/descriptor/node.rs +++ b/testing-framework/deployers/compose/src/descriptor/node.rs @@ -9,7 +9,6 @@ pub struct NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, - api_container_port: u16, environment: Vec, #[serde(skip_serializing_if = "Option::is_none")] platform: Option, @@ -50,7 +49,6 @@ impl NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, - api_container_port: u16, environment: Vec, platform: Option, ) -> Self { @@ -61,7 +59,6 @@ impl NodeDescriptor { volumes, extra_hosts, ports, - api_container_port, environment, platform, } @@ -80,9 +77,4 @@ impl NodeDescriptor { pub fn environment(&self) -> &[EnvEntry] { &self.environment } - - #[cfg(test)] - pub fn api_container_port(&self) -> u16 { - self.api_container_port - } } diff --git a/testing-framework/deployers/k8s/src/env.rs b/testing-framework/deployers/k8s/src/env.rs index 4ec7fd2..353d7e9 100644 --- a/testing-framework/deployers/k8s/src/env.rs +++ b/testing-framework/deployers/k8s/src/env.rs @@ -106,7 +106,8 @@ pub trait K8sDeployEnv: Application { format!("{release}-node-{index}") } - /// Label selector used to discover managed node services in attached mode. + /// Label selector used to discover managed node services in + /// existing-cluster mode. fn attach_node_service_selector(release: &str) -> String { format!("app.kubernetes.io/instance={release}") } diff --git a/testing-framework/tools/cfgsync-core/Cargo.toml b/testing-framework/tools/cfgsync-core/Cargo.toml deleted file mode 100644 index 63f2d1f..0000000 --- a/testing-framework/tools/cfgsync-core/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -categories = { workspace = true } -description = { workspace = true } -edition = { workspace = true } -keywords = { workspace = true } -license = { workspace = true } -name = "cfgsync-core" -readme = { workspace = true } -repository = { workspace = true } -version = { workspace = true } - -[lints] -workspace = true - -[dependencies] -axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" } -reqwest = { features = ["json"], workspace = true } -serde = { default-features = false, features = ["derive"], version = "1" } -serde_json = { workspace = true } -thiserror = { workspace = true } -tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } diff --git a/testing-framework/tools/cfgsync-core/src/client.rs b/testing-framework/tools/cfgsync-core/src/client.rs deleted file mode 100644 index 2df652c..0000000 --- a/testing-framework/tools/cfgsync-core/src/client.rs +++ /dev/null @@ -1,94 +0,0 @@ -use serde::Serialize; -use thiserror::Error; - -use crate::{ - repo::{CfgSyncErrorResponse, CfgSyncPayload}, - server::ClientIp, -}; - -#[derive(Debug, Error)] -pub enum ClientError { - #[error("request failed: {0}")] - Request(#[from] reqwest::Error), - #[error("cfgsync server error {status}: {message}")] - Status { - status: reqwest::StatusCode, - message: String, - error: Option, - }, - #[error("failed to parse cfgsync response: {0}")] - Decode(serde_json::Error), -} - -#[derive(Clone, Debug)] -pub struct CfgSyncClient { - base_url: String, - http: reqwest::Client, -} - -impl CfgSyncClient { - #[must_use] - pub fn new(base_url: impl Into) -> Self { - let mut base_url = base_url.into(); - while base_url.ends_with('/') { - base_url.pop(); - } - Self { - base_url, - http: reqwest::Client::new(), - } - } - - #[must_use] - pub fn base_url(&self) -> &str { - &self.base_url - } - - pub async fn fetch_node_config( - &self, - payload: &ClientIp, - ) -> Result { - self.post_json("/node", payload).await - } - - pub async fn fetch_init_with_node_config( - &self, - payload: &ClientIp, - ) -> Result { - self.post_json("/init-with-node", payload).await - } - - pub async fn post_json( - &self, - path: &str, - payload: &P, - ) -> Result { - let url = self.endpoint_url(path); - let response = self.http.post(url).json(payload).send().await?; - - let status = response.status(); - let body = response.text().await?; - if !status.is_success() { - let error = serde_json::from_str::(&body).ok(); - let message = error - .as_ref() - .map(|err| err.message.clone()) - .unwrap_or_else(|| body.clone()); - return Err(ClientError::Status { - status, - message, - error, - }); - } - - serde_json::from_str(&body).map_err(ClientError::Decode) - } - - fn endpoint_url(&self, path: &str) -> String { - if path.starts_with('/') { - format!("{}{}", self.base_url, path) - } else { - format!("{}/{}", self.base_url, path) - } - } -} diff --git a/testing-framework/tools/cfgsync-core/src/lib.rs b/testing-framework/tools/cfgsync-core/src/lib.rs deleted file mode 100644 index 822ae69..0000000 --- a/testing-framework/tools/cfgsync-core/src/lib.rs +++ /dev/null @@ -1,10 +0,0 @@ -pub mod client; -pub mod repo; -pub mod server; - -pub use client::{CfgSyncClient, ClientError}; -pub use repo::{ - CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, - ConfigRepo, RepoResponse, -}; -pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync}; diff --git a/testing-framework/tools/cfgsync-core/src/repo.rs b/testing-framework/tools/cfgsync-core/src/repo.rs deleted file mode 100644 index 62e320c..0000000 --- a/testing-framework/tools/cfgsync-core/src/repo.rs +++ /dev/null @@ -1,107 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use tokio::sync::oneshot::Sender; - -pub const CFGSYNC_SCHEMA_VERSION: u16 = 1; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CfgSyncFile { - pub path: String, - pub content: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CfgSyncPayload { - pub schema_version: u16, - #[serde(default)] - pub files: Vec, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub config_yaml: Option, -} - -impl CfgSyncPayload { - #[must_use] - pub fn from_files(files: Vec) -> Self { - Self { - schema_version: CFGSYNC_SCHEMA_VERSION, - files, - config_yaml: None, - } - } - - #[must_use] - pub fn normalized_files(&self, default_config_path: &str) -> Vec { - if !self.files.is_empty() { - return self.files.clone(); - } - - self.config_yaml - .as_ref() - .map(|content| { - vec![CfgSyncFile { - path: default_config_path.to_owned(), - content: content.clone(), - }] - }) - .unwrap_or_default() - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum CfgSyncErrorCode { - MissingConfig, - Internal, -} - -#[derive(Debug, Clone, Serialize, Deserialize, Error)] -#[error("{code:?}: {message}")] -pub struct CfgSyncErrorResponse { - pub code: CfgSyncErrorCode, - pub message: String, -} - -impl CfgSyncErrorResponse { - #[must_use] - pub fn missing_config(identifier: &str) -> Self { - Self { - code: CfgSyncErrorCode::MissingConfig, - message: format!("missing config for host {identifier}"), - } - } - - #[must_use] - pub fn internal(message: impl Into) -> Self { - Self { - code: CfgSyncErrorCode::Internal, - message: message.into(), - } - } -} - -pub enum RepoResponse { - Config(CfgSyncPayload), - Error(CfgSyncErrorResponse), -} - -pub struct ConfigRepo { - configs: HashMap, -} - -impl ConfigRepo { - #[must_use] - pub fn from_bundle(configs: HashMap) -> Arc { - Arc::new(Self { configs }) - } - - pub async fn register(&self, identifier: String, reply_tx: Sender) { - let response = self.configs.get(&identifier).cloned().map_or_else( - || RepoResponse::Error(CfgSyncErrorResponse::missing_config(&identifier)), - RepoResponse::Config, - ); - - let _ = reply_tx.send(response); - } -} diff --git a/testing-framework/tools/cfgsync-core/src/server.rs b/testing-framework/tools/cfgsync-core/src/server.rs deleted file mode 100644 index f519d53..0000000 --- a/testing-framework/tools/cfgsync-core/src/server.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::{io, net::Ipv4Addr, sync::Arc}; - -use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post}; -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use tokio::sync::oneshot::channel; - -use crate::repo::{CfgSyncErrorResponse, ConfigRepo, RepoResponse}; - -#[derive(Serialize, Deserialize)] -pub struct ClientIp { - /// Node IP that can be used by clients for observability/logging. - pub ip: Ipv4Addr, - /// Stable node identifier used as key in cfgsync bundle lookup. - pub identifier: String, -} - -pub struct CfgSyncState { - repo: Arc, -} - -impl CfgSyncState { - #[must_use] - pub fn new(repo: Arc) -> Self { - Self { repo } - } -} - -#[derive(Debug, Error)] -pub enum RunCfgsyncError { - #[error("failed to bind cfgsync server on {bind_addr}: {source}")] - Bind { - bind_addr: String, - #[source] - source: io::Error, - }, - #[error("cfgsync server terminated unexpectedly: {source}")] - Serve { - #[source] - source: io::Error, - }, -} - -async fn node_config( - State(state): State>, - Json(payload): Json, -) -> impl IntoResponse { - let identifier = payload.identifier.clone(); - let (reply_tx, reply_rx) = channel(); - state.repo.register(identifier, reply_tx).await; - - match reply_rx.await { - Err(_) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(CfgSyncErrorResponse::internal( - "error receiving config from repo", - )), - ) - .into_response(), - Ok(RepoResponse::Config(payload_data)) => { - (StatusCode::OK, Json(payload_data)).into_response() - } - - Ok(RepoResponse::Error(error)) => { - let status = match error.code { - crate::repo::CfgSyncErrorCode::MissingConfig => StatusCode::NOT_FOUND, - crate::repo::CfgSyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR, - }; - (status, Json(error)).into_response() - } - } -} - -pub fn cfgsync_app(state: CfgSyncState) -> Router { - Router::new() - .route("/node", post(node_config)) - .route("/init-with-node", post(node_config)) - .with_state(Arc::new(state)) -} - -pub async fn run_cfgsync(port: u16, state: CfgSyncState) -> Result<(), RunCfgsyncError> { - let app = cfgsync_app(state); - println!("Server running on http://0.0.0.0:{port}"); - - let bind_addr = format!("0.0.0.0:{port}"); - let listener = tokio::net::TcpListener::bind(&bind_addr) - .await - .map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?; - - axum::serve(listener, app) - .await - .map_err(|source| RunCfgsyncError::Serve { source })?; - - Ok(()) -} diff --git a/testing-framework/tools/cfgsync-runtime/Cargo.toml b/testing-framework/tools/cfgsync-runtime/Cargo.toml deleted file mode 100644 index f51d081..0000000 --- a/testing-framework/tools/cfgsync-runtime/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -categories = { workspace = true } -description = { workspace = true } -edition = { workspace = true } -keywords = { workspace = true } -license = { workspace = true } -name = "cfgsync-runtime" -readme = { workspace = true } -repository = { workspace = true } -version = { workspace = true } - -[lints] -workspace = true - -[dependencies] -anyhow = "1" -cfgsync-core = { workspace = true } -clap = { version = "4", features = ["derive"] } -serde = { workspace = true } -serde_yaml = { workspace = true } -testing-framework-core = { workspace = true } -tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } diff --git a/testing-framework/tools/cfgsync-runtime/src/bundle.rs b/testing-framework/tools/cfgsync-runtime/src/bundle.rs deleted file mode 100644 index 8aa257d..0000000 --- a/testing-framework/tools/cfgsync-runtime/src/bundle.rs +++ /dev/null @@ -1,39 +0,0 @@ -use anyhow::Result; -use cfgsync_core::CfgSyncFile; -use serde::{Deserialize, Serialize}; -use testing_framework_core::cfgsync::{CfgsyncEnv, build_cfgsync_node_configs}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CfgSyncBundle { - pub nodes: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CfgSyncBundleNode { - pub identifier: String, - #[serde(default)] - pub files: Vec, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub config_yaml: Option, -} - -pub fn build_cfgsync_bundle_with_hostnames( - deployment: &E::Deployment, - hostnames: &[String], -) -> Result { - let nodes = build_cfgsync_node_configs::(deployment, hostnames)?; - - Ok(CfgSyncBundle { - nodes: nodes - .into_iter() - .map(|node| CfgSyncBundleNode { - identifier: node.identifier, - files: vec![CfgSyncFile { - path: "/config.yaml".to_owned(), - content: node.config_yaml, - }], - config_yaml: None, - }) - .collect(), - }) -} diff --git a/testing-framework/tools/cfgsync-runtime/src/client.rs b/testing-framework/tools/cfgsync-runtime/src/client.rs deleted file mode 100644 index 364fdf6..0000000 --- a/testing-framework/tools/cfgsync-runtime/src/client.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::{ - env, fs, - net::Ipv4Addr, - path::{Path, PathBuf}, -}; - -use anyhow::{Context as _, Result, anyhow, bail}; -use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp}; -use tokio::time::{Duration, sleep}; - -const FETCH_ATTEMPTS: usize = 5; -const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250); - -fn parse_ip(ip_str: &str) -> Ipv4Addr { - ip_str.parse().unwrap_or(Ipv4Addr::LOCALHOST) -} - -async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result { - let client = CfgSyncClient::new(server_addr); - let mut last_error: Option = None; - - for attempt in 1..=FETCH_ATTEMPTS { - match client.fetch_node_config(payload).await { - Ok(config) => return Ok(config), - Err(error) => { - last_error = Some(error.into()); - - if attempt < FETCH_ATTEMPTS { - sleep(FETCH_RETRY_DELAY).await; - } - } - } - } - - match last_error { - Some(error) => Err(error), - None => Err(anyhow!("cfgsync client fetch failed without an error")), - } -} - -async fn pull_config_files(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> { - let config = fetch_with_retry(&payload, server_addr) - .await - .context("fetching cfgsync node config")?; - ensure_schema_version(&config)?; - - let files = collect_payload_files(&config, config_file)?; - - for file in files { - write_cfgsync_file(&file)?; - } - - println!("Config files saved"); - Ok(()) -} - -fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> { - if config.schema_version != CFGSYNC_SCHEMA_VERSION { - bail!( - "unsupported cfgsync payload schema version {}, expected {}", - config.schema_version, - CFGSYNC_SCHEMA_VERSION - ); - } - - Ok(()) -} - -fn collect_payload_files(config: &CfgSyncPayload, config_file: &str) -> Result> { - let files = config.normalized_files(config_file); - if files.is_empty() { - bail!("cfgsync payload contains no files"); - } - - Ok(files) -} - -fn write_cfgsync_file(file: &CfgSyncFile) -> Result<()> { - let path = PathBuf::from(&file.path); - - ensure_parent_dir(&path)?; - - fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?; - - println!("Config saved to {}", path.display()); - Ok(()) -} - -fn ensure_parent_dir(path: &Path) -> Result<()> { - if let Some(parent) = path.parent() { - if !parent.as_os_str().is_empty() { - fs::create_dir_all(parent) - .with_context(|| format!("creating parent directory {}", parent.display()))?; - } - } - Ok(()) -} - -pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> { - let config_file_path = env::var("CFG_FILE_PATH").unwrap_or_else(|_| "config.yaml".to_owned()); - let server_addr = - env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}")); - let ip = parse_ip(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned())); - let identifier = - env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned()); - - pull_config_files(ClientIp { ip, identifier }, &server_addr, &config_file_path).await -} diff --git a/testing-framework/tools/cfgsync-runtime/src/lib.rs b/testing-framework/tools/cfgsync-runtime/src/lib.rs deleted file mode 100644 index bc28a08..0000000 --- a/testing-framework/tools/cfgsync-runtime/src/lib.rs +++ /dev/null @@ -1,10 +0,0 @@ -pub mod bundle; -pub mod render; - -pub use cfgsync_core as core; - -mod client; -mod server; - -pub use client::run_cfgsync_client_from_env; -pub use server::{CfgSyncServerConfig, run_cfgsync_server}; diff --git a/testing-framework/tools/cfgsync-runtime/src/server.rs b/testing-framework/tools/cfgsync-runtime/src/server.rs deleted file mode 100644 index 4c3e59d..0000000 --- a/testing-framework/tools/cfgsync-runtime/src/server.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::{collections::HashMap, fs, path::Path, sync::Arc}; - -use anyhow::Context as _; -use cfgsync_core::{CfgSyncFile, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync}; -use serde::Deserialize; - -#[derive(Debug, Deserialize, Clone)] -pub struct CfgSyncServerConfig { - pub port: u16, - pub bundle_path: String, -} - -impl CfgSyncServerConfig { - pub fn load_from_file(path: &Path) -> anyhow::Result { - let config_content = fs::read_to_string(path) - .with_context(|| format!("failed to read cfgsync config file {}", path.display()))?; - serde_yaml::from_str(&config_content) - .with_context(|| format!("failed to parse cfgsync config file {}", path.display())) - } -} - -#[derive(Debug, Deserialize)] -struct CfgSyncBundle { - nodes: Vec, -} - -#[derive(Debug, Deserialize)] -struct CfgSyncBundleNode { - identifier: String, - #[serde(default)] - files: Vec, - #[serde(default)] - config_yaml: Option, -} - -fn load_bundle(bundle_path: &Path) -> anyhow::Result> { - let bundle = read_cfgsync_bundle(bundle_path)?; - - let configs = bundle - .nodes - .into_iter() - .map(build_repo_entry) - .collect::>(); - - Ok(ConfigRepo::from_bundle(configs)) -} - -fn read_cfgsync_bundle(bundle_path: &Path) -> anyhow::Result { - let bundle_content = fs::read_to_string(bundle_path).with_context(|| { - format!( - "failed to read cfgsync bundle file {}", - bundle_path.display() - ) - })?; - - serde_yaml::from_str(&bundle_content) - .with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display())) -} - -fn build_repo_entry(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) { - let files = if node.files.is_empty() { - build_legacy_files(node.config_yaml) - } else { - node.files - }; - - (node.identifier, CfgSyncPayload::from_files(files)) -} - -fn build_legacy_files(config_yaml: Option) -> Vec { - config_yaml - .map(|content| { - vec![CfgSyncFile { - path: "/config.yaml".to_owned(), - content, - }] - }) - .unwrap_or_default() -} - -fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf { - let path = Path::new(bundle_path); - if path.is_absolute() { - return path.to_path_buf(); - } - - config_path - .parent() - .unwrap_or_else(|| Path::new(".")) - .join(path) -} - -pub async fn run_cfgsync_server(config_path: &Path) -> anyhow::Result<()> { - let config = CfgSyncServerConfig::load_from_file(config_path)?; - let bundle_path = resolve_bundle_path(config_path, &config.bundle_path); - - let repo = load_bundle(&bundle_path)?; - let state = CfgSyncState::new(repo); - run_cfgsync(config.port, state).await?; - Ok(()) -}