feat(cfgsync): as a generally usable framework

This commit is contained in:
Andrus Salumets 2026-03-18 15:30:21 +07:00 committed by GitHub
commit 4b44a962d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
52 changed files with 3325 additions and 746 deletions

40
Cargo.lock generated
View File

@ -916,14 +916,36 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "cfgsync-adapter"
version = "0.1.0"
dependencies = [
"cfgsync-artifacts",
"cfgsync-core",
"serde",
"serde_json",
]
[[package]]
name = "cfgsync-artifacts"
version = "0.1.0"
dependencies = [
"serde",
"thiserror 2.0.18",
]
[[package]]
name = "cfgsync-core"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"cfgsync-artifacts",
"reqwest",
"serde",
"serde_json",
"serde_yaml",
"tempfile",
"thiserror 2.0.18",
"tokio",
]
@ -933,12 +955,17 @@ name = "cfgsync-runtime"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"cfgsync-adapter",
"cfgsync-artifacts",
"cfgsync-core",
"clap",
"serde",
"serde_yaml",
"testing-framework-core",
"tempfile",
"thiserror 2.0.18",
"tokio",
"tracing",
]
[[package]]
@ -1299,7 +1326,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de"
dependencies = [
"data-encoding",
"syn 2.0.114",
"syn 1.0.109",
]
[[package]]
@ -2891,8 +2918,9 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"cfgsync-adapter",
"cfgsync-artifacts",
"cfgsync-core",
"cfgsync-runtime",
"kube",
"logos-blockchain-http-api-common",
"reqwest",
@ -5499,9 +5527,9 @@ dependencies = [
[[package]]
name = "quinn-proto"
version = "0.11.13"
version = "0.11.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31"
checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098"
dependencies = [
"bytes",
"getrandom 0.3.4",
@ -6528,6 +6556,8 @@ name = "testing-framework-core"
version = "0.1.0"
dependencies = [
"async-trait",
"cfgsync-adapter",
"cfgsync-artifacts",
"futures",
"parking_lot",
"prometheus-http-query",

View File

@ -1,5 +1,9 @@
[workspace]
members = [
"cfgsync/adapter",
"cfgsync/artifacts",
"cfgsync/core",
"cfgsync/runtime",
"logos/examples",
"logos/runtime/env",
"logos/runtime/ext",
@ -8,8 +12,6 @@ members = [
"testing-framework/deployers/compose",
"testing-framework/deployers/k8s",
"testing-framework/deployers/local",
"testing-framework/tools/cfgsync-core",
"testing-framework/tools/cfgsync-runtime",
]
resolver = "2"
@ -31,7 +33,9 @@ all = "allow"
[workspace.dependencies]
# Local testing framework crates
cfgsync-core = { default-features = false, path = "testing-framework/tools/cfgsync-core" }
cfgsync-adapter = { default-features = false, path = "cfgsync/adapter" }
cfgsync-artifacts = { default-features = false, path = "cfgsync/artifacts" }
cfgsync-core = { default-features = false, path = "cfgsync/core" }
lb-ext = { default-features = false, path = "logos/runtime/ext" }
lb-framework = { default-features = false, package = "testing_framework", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" }
lb-workloads = { default-features = false, path = "logos/runtime/workloads" }
@ -40,10 +44,11 @@ testing-framework-env = { default-features = false, path = "logos/run
testing-framework-runner-compose = { default-features = false, path = "testing-framework/deployers/compose" }
testing-framework-runner-k8s = { default-features = false, path = "testing-framework/deployers/k8s" }
testing-framework-runner-local = { default-features = false, path = "testing-framework/deployers/local" }
testing-framework-workflows = { default-features = false, package = "lb-workloads", path = "logos/runtime/workloads" }
# Logos dependencies (from logos-blockchain master @ deccbb2d2)
broadcast-service = { default-features = false, package = "logos-blockchain-chain-broadcast-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" }
cfgsync_runtime = { default-features = false, package = "cfgsync-runtime", path = "testing-framework/tools/cfgsync-runtime" }
cfgsync_runtime = { default-features = false, package = "cfgsync-runtime", path = "cfgsync/runtime" }
chain-leader = { default-features = false, features = [
"pol-dev-mode",
], package = "logos-blockchain-chain-leader-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" }

265
cfgsync/README.md Normal file
View File

@ -0,0 +1,265 @@
# cfgsync
`cfgsync` is a small library stack for bootstrap-time config delivery.
The library solves one problem: nodes need to identify themselves, wait until configuration is ready, fetch the files they need, write them locally, and then continue startup. `cfgsync` owns that transport and serving loop. The application using it still decides what “ready” means and what files should be generated.
That split is the point of the design:
- `cfgsync` owns registration, polling, payload transport, and file delivery.
- the application adapter owns readiness policy and artifact generation.
The result is a reusable library without application-specific bootstrap logic leaking into core crates.
## How it works
The normal flow is registration-backed serving.
Each node first sends a registration containing:
- a stable node identifier
- its IP address
- optional typed application metadata
The server stores registrations and builds a `RegistrationSnapshot`. The application provides a `RegistrationSnapshotMaterializer`, which receives that snapshot and decides whether configuration is ready yet.
If the materializer returns `NotReady`, the node keeps polling. If it returns `Ready`, cfgsync serves one payload containing:
- node-local files for the requesting node
- optional shared files that every node should receive
The node then writes those files locally and continues startup.
That is the main model. Everything else is a variation of it.
## Precomputed artifacts
Some systems already know the final artifacts before any node starts. That still fits the same model.
In that case the server simply starts with precomputed `MaterializedArtifacts`. Nodes still register and fetch through the same protocol, but the materializer already knows the final outputs. Registration becomes an identity and readiness gate, not a source of topology discovery.
This is why cfgsync no longer needs a separate “static mode” as a first-class concept. Precomputed serving is just registration-backed serving with an already-known result.
## Crate layout
### `cfgsync-artifacts`
This crate contains the file-level data model:
- `ArtifactFile` for a single file
- `ArtifactSet` for a group of files
If all you need is “what files exist and how are they grouped”, this is the crate to look at.
### `cfgsync-core`
This crate contains the protocol and the low-level HTTP implementation.
Important types here are:
- `NodeRegistration`
- `RegistrationPayload`
- `NodeArtifactsPayload`
- `Client`
- `NodeConfigSource`
It also defines the HTTP contract:
- `POST /register`
- `POST /node`
The server answers with either a payload, `NotReady`, or `Missing`.
### `cfgsync-adapter`
This crate defines the application-facing seam.
The key types are:
- `RegistrationSnapshot`
- `RegistrationSnapshotMaterializer`
- `MaterializedArtifacts`
- `MaterializationResult`
The adapter's job is simple: given the current registration snapshot, decide whether artifacts are ready, and if they are, return them.
The crate also contains reusable wrappers around that seam:
- `CachedSnapshotMaterializer`
- `PersistingSnapshotMaterializer`
- `RegistrationConfigSource`
These exist because caching and result persistence are generic orchestration concerns, not application-specific logic.
### `cfgsync-runtime`
This crate provides the operational entrypoints.
Use it when you want to run cfgsync rather than define its protocol:
- client-side fetch/write helpers
- server config loading
- direct serving helpers such as `serve(...)`
This is the crate that should feel like the normal “start here” path for users integrating cfgsync into a real system.
## Artifact model
The adapter usually thinks in full snapshots, but cfgsync serves one node at a time.
The materializer returns `MaterializedArtifacts`, which contain:
- node-local artifacts keyed by node identifier
- optional shared artifacts
When one node fetches config, cfgsync resolves that node's local files, merges in the shared files, and returns a single payload.
That is why applications usually do not need a second “shared config” endpoint. Shared files can travel in the same payload as node-local files.
## The adapter boundary
The adapter is where application semantics belong.
In practice, the adapter should define:
- the typed registration payload
- the readiness rule
- the conversion from registration snapshots into artifacts
- any shared artifact generation the application needs
Typical examples are:
- waiting for `n` initial nodes
- deriving peer lists from registrations
- generating one node-local config file per node
- generating one shared deployment file for all nodes
What does not belong in cfgsync core is equally important. Generic cfgsync should not understand:
- application-specific topology semantics
- genesis or deployment generation rules for one protocol
- application-specific command/state-machine logic
- domain-specific ideas of what a node “really is”
Those belong in the adapter or in the consuming application.
## Start here
Start with the examples in `cfgsync/runtime/examples/`.
- `minimal_cfgsync.rs` shows the smallest complete flow: serve cfgsync, register
one node, fetch artifacts, and write them locally.
- `precomputed_registration_cfgsync.rs` shows how precomputed artifacts still
use the same registration flow, including a later node that joins after the
server is already running.
- `wait_for_registrations_cfgsync.rs` shows the normal `NotReady` path: one node
waits until the materializer sees enough registrations, then both nodes
receive config.
Those three examples cover the full public model. The rest of this README just
names the pieces and explains where application-specific logic belongs.
## Minimal integration path
For a new application, the shortest sensible path is:
1. define a typed registration payload
2. implement `RegistrationSnapshotMaterializer`
3. return node-local and optional shared artifacts
4. serve them with `serve(...)`
5. use `Client` on the node side
That gives you the main value of the library without pushing application logic
into cfgsync itself.
## API sketch
Typed registration payload:
```rust
use cfgsync_core::NodeRegistration;
#[derive(serde::Serialize)]
struct MyNodeMetadata {
network_port: u16,
api_port: u16,
}
let registration = NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().unwrap())
.with_metadata(&MyNodeMetadata {
network_port: 3000,
api_port: 18080,
})?;
```
Snapshot materializer:
```rust
use cfgsync_adapter::{
DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot,
RegistrationSnapshotMaterializer,
};
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
struct MyMaterializer;
impl RegistrationSnapshotMaterializer for MyMaterializer {
fn materialize_snapshot(
&self,
registrations: &RegistrationSnapshot,
) -> Result<MaterializationResult, DynCfgsyncError> {
if registrations.len() < 2 {
return Ok(MaterializationResult::NotReady);
}
let nodes = registrations.iter().map(|registration| {
(
registration.identifier.clone(),
ArtifactSet::new(vec![ArtifactFile::new(
"/config.yaml",
format!("id: {}\n", registration.identifier),
)]),
)
});
Ok(MaterializationResult::ready(
MaterializedArtifacts::from_nodes(nodes),
))
}
}
```
Serving:
```rust
use cfgsync_runtime::serve;
# async fn run() -> anyhow::Result<()> {
serve(4400, MyMaterializer).await?;
# Ok(())
# }
```
Fetching and writing artifacts:
```rust
use cfgsync_runtime::{Client, OutputMap};
# async fn run(registration: cfgsync_core::NodeRegistration) -> anyhow::Result<()> {
let outputs = OutputMap::config_and_shared(
"/node-data/node-1/config.yaml",
"/node-data/shared",
);
Client::new("http://127.0.0.1:4400")
.fetch_and_write(&registration, &outputs)
.await?;
# Ok(())
# }
```
## Compatibility
The intended public API is what the crate roots reexport today.
Some older compatibility paths still exist internally to avoid breaking current in-repo consumers, but they are not the main model and should not be treated as the recommended public surface.

View File

@ -0,0 +1,19 @@
[package]
categories = { workspace = true }
description = { workspace = true }
edition = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
name = "cfgsync-adapter"
readme = { workspace = true }
repository = { workspace = true }
version = { workspace = true }
[lints]
workspace = true
[dependencies]
cfgsync-artifacts = { workspace = true }
cfgsync-core = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

View File

@ -0,0 +1,75 @@
use std::collections::HashMap;
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
use serde::{Deserialize, Serialize};
/// Fully materialized cfgsync artifacts for a registration set.
///
/// `nodes` holds the node-local files keyed by stable node identifier.
/// `shared` holds files that should be delivered alongside every node.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MaterializedArtifacts {
    // Node-local artifact sets, keyed by the node's stable identifier.
    nodes: HashMap<String, ArtifactSet>,
    // Files appended to every node's payload when it is resolved.
    shared: ArtifactSet,
}
impl MaterializedArtifacts {
/// Creates materialized artifacts from node-local artifact sets.
#[must_use]
pub fn from_nodes<I>(nodes: I) -> Self
where
I: IntoIterator<Item = (String, ArtifactSet)>,
{
Self {
nodes: nodes.into_iter().collect(),
shared: ArtifactSet::default(),
}
}
/// Attaches shared files delivered alongside every node.
#[must_use]
pub fn with_shared(mut self, shared: ArtifactSet) -> Self {
self.shared = shared;
self
}
/// Returns the node-local artifact set for one identifier.
#[must_use]
pub fn node(&self, identifier: &str) -> Option<&ArtifactSet> {
self.nodes.get(identifier)
}
/// Returns the shared artifact set.
#[must_use]
pub fn shared(&self) -> &ArtifactSet {
&self.shared
}
/// Returns the number of node-local artifact sets.
#[must_use]
pub fn len(&self) -> usize {
self.nodes.len()
}
/// Returns `true` when no node-local artifact sets are present.
#[must_use]
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
/// Resolves the full file set that should be written for one node.
#[must_use]
pub fn resolve(&self, identifier: &str) -> Option<ArtifactSet> {
let node = self.node(identifier)?;
let mut files: Vec<ArtifactFile> = node.files.clone();
files.extend(self.shared.files.iter().cloned());
Some(ArtifactSet::new(files))
}
/// Iterates node-local artifact sets by stable identifier.
pub fn iter(&self) -> impl Iterator<Item = (&str, &ArtifactSet)> {
self.nodes
.iter()
.map(|(identifier, artifacts)| (identifier.as_str(), artifacts))
}
}

View File

@ -0,0 +1,12 @@
//! Application-facing adapter seam for cfgsync.
//!
//! The application implements [`RegistrationSnapshotMaterializer`] to decide
//! when a registration snapshot is ready and which artifacts it yields.
//! [`CachedSnapshotMaterializer`], [`PersistingSnapshotMaterializer`], and
//! [`RegistrationConfigSource`] are reusable wrappers around that seam.
mod artifacts;
mod materializer;
mod registrations;
mod sources;
pub use artifacts::MaterializedArtifacts;
pub use materializer::{
    CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifactsSink,
    PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer,
};
pub use registrations::RegistrationSnapshot;
pub use sources::RegistrationConfigSource;

View File

@ -0,0 +1,279 @@
use std::{error::Error, sync::Mutex};
use serde_json::to_string;
use crate::{MaterializedArtifacts, RegistrationSnapshot};
/// Type-erased cfgsync adapter error used to preserve source context.
///
/// Boxing keeps the trait usable with any `Error + Send + Sync` type without
/// forcing a crate-wide error enum on adapter implementations.
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
/// Adapter contract for materializing a whole registration snapshot into
/// cfgsync artifacts.
pub trait RegistrationSnapshotMaterializer: Send + Sync {
    /// Materializes the current registration set.
    ///
    /// Implementations decide:
    /// - when the current snapshot is ready to serve
    /// - which per-node artifacts should be produced
    /// - which shared artifacts should accompany every node
    ///
    /// # Errors
    ///
    /// Returns an implementation-defined error when the snapshot cannot be
    /// turned into artifacts at all; "not yet ready" is not an error and is
    /// expressed via [`MaterializationResult::NotReady`] instead.
    fn materialize_snapshot(
        &self,
        registrations: &RegistrationSnapshot,
    ) -> Result<MaterializationResult, DynCfgsyncError>;
}
/// Optional hook for persisting or publishing materialized cfgsync artifacts.
pub trait MaterializedArtifactsSink: Send + Sync {
    /// Persists or publishes a ready materialization result.
    ///
    /// # Errors
    ///
    /// Returns an implementation-defined error when the artifacts cannot be
    /// written or published; callers propagate the error rather than marking
    /// the snapshot as persisted.
    fn persist(&self, artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError>;
}
/// Registration-driven materialization status.
#[derive(Debug, Clone, Default)]
pub enum MaterializationResult {
    /// The snapshot does not yet satisfy the adapter's readiness policy;
    /// nodes should keep polling.
    #[default]
    NotReady,
    /// Artifacts were produced and can be served to nodes.
    Ready(MaterializedArtifacts),
}
impl MaterializationResult {
    /// Creates a ready materialization result.
    #[must_use]
    pub fn ready(artifacts: MaterializedArtifacts) -> Self {
        Self::Ready(artifacts)
    }

    /// Returns the ready artifacts when materialization succeeded,
    /// or `None` for a not-ready result.
    #[must_use]
    pub fn artifacts(&self) -> Option<&MaterializedArtifacts> {
        if let Self::Ready(artifacts) = self {
            Some(artifacts)
        } else {
            None
        }
    }
}
/// Snapshot materializer wrapper that caches the last materialized result.
///
/// The cache key is the JSON serialization of the registration snapshot, so
/// a byte-identical snapshot is answered from cache without re-running the
/// inner materializer.
pub struct CachedSnapshotMaterializer<M> {
    // Wrapped materializer that does the real work on cache misses.
    inner: M,
    // Last (key, result) pair; `None` until the first materialization.
    cache: Mutex<Option<CachedSnapshot>>,
}
// One cached materialization outcome, keyed by the serialized snapshot.
struct CachedSnapshot {
    key: String,
    result: MaterializationResult,
}
impl<M> CachedSnapshotMaterializer<M> {
    /// Wraps a snapshot materializer with deterministic snapshot-result
    /// caching.
    #[must_use]
    pub fn new(inner: M) -> Self {
        Self {
            inner,
            cache: Mutex::new(None),
        }
    }
    // Serializes the snapshot into a stable cache key. Snapshots are sorted
    // by identifier (see `RegistrationSnapshot::new`), so equal registration
    // sets serialize identically.
    fn snapshot_key(registrations: &RegistrationSnapshot) -> Result<String, DynCfgsyncError> {
        Ok(to_string(registrations)?)
    }
}
impl<M> RegistrationSnapshotMaterializer for CachedSnapshotMaterializer<M>
where
    M: RegistrationSnapshotMaterializer,
{
    /// Serves the cached result when the snapshot is unchanged; otherwise
    /// delegates to the inner materializer and refreshes the cache.
    fn materialize_snapshot(
        &self,
        registrations: &RegistrationSnapshot,
    ) -> Result<MaterializationResult, DynCfgsyncError> {
        let key = Self::snapshot_key(registrations)?;

        // Fast path: an identical snapshot was already materialized.
        let hit = {
            let guard = self
                .cache
                .lock()
                .expect("cfgsync snapshot cache should not be poisoned");
            guard
                .as_ref()
                .filter(|cached| cached.key == key)
                .map(|cached| cached.result.clone())
        };
        if let Some(result) = hit {
            return Ok(result);
        }

        // Slow path: recompute outside the lock, then publish the new entry.
        let result = self.inner.materialize_snapshot(registrations)?;
        let entry = CachedSnapshot {
            key,
            result: result.clone(),
        };
        *self
            .cache
            .lock()
            .expect("cfgsync snapshot cache should not be poisoned") = Some(entry);
        Ok(result)
    }
}
/// Snapshot materializer wrapper that persists ready results through a generic
/// sink. It only persists once per distinct registration snapshot.
pub struct PersistingSnapshotMaterializer<M, S> {
    // Wrapped materializer producing the results to persist.
    inner: M,
    // Destination that receives each newly ready result.
    sink: S,
    // Serialized snapshot key of the last successfully persisted result;
    // `None` until the first persistence.
    persisted_key: Mutex<Option<String>>,
}
impl<M, S> PersistingSnapshotMaterializer<M, S> {
    /// Wraps a snapshot materializer with one-time persistence for each
    /// distinct registration snapshot.
    #[must_use]
    pub fn new(inner: M, sink: S) -> Self {
        Self {
            inner,
            sink,
            persisted_key: Mutex::new(None),
        }
    }
}
impl<M, S> RegistrationSnapshotMaterializer for PersistingSnapshotMaterializer<M, S>
where
    M: RegistrationSnapshotMaterializer,
    S: MaterializedArtifactsSink,
{
    /// Materializes via the inner materializer and persists ready results
    /// through the sink, at most once per distinct registration snapshot.
    ///
    /// # Errors
    ///
    /// Propagates inner materialization errors and sink persistence errors.
    /// On a sink error the snapshot is not recorded as persisted, so the
    /// next call retries persistence.
    fn materialize_snapshot(
        &self,
        registrations: &RegistrationSnapshot,
    ) -> Result<MaterializationResult, DynCfgsyncError> {
        let key = CachedSnapshotMaterializer::<M>::snapshot_key(registrations)?;
        let result = self.inner.materialize_snapshot(registrations)?;
        // Not-ready results pass through without touching the sink.
        let Some(artifacts) = result.artifacts() else {
            return Ok(result);
        };
        // Hold the lock across check + persist + record so that two
        // concurrent callers cannot both persist the same snapshot (the
        // previous check/unlock/persist sequence allowed duplicate writes).
        let mut persisted_key = self
            .persisted_key
            .lock()
            .expect("cfgsync persistence state should not be poisoned");
        if persisted_key.as_deref() != Some(key.as_str()) {
            self.sink.persist(artifacts)?;
            *persisted_key = Some(key);
        }
        Ok(result)
    }
}
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };
    use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
    use super::{
        CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts,
        MaterializedArtifactsSink, PersistingSnapshotMaterializer,
        RegistrationSnapshotMaterializer,
    };
    use crate::RegistrationSnapshot;

    /// Materializer that records how often it actually runs, so caching
    /// wrappers can be checked for real reuse rather than just identical
    /// output. (The previous version counted nothing despite its name.)
    struct CountingMaterializer {
        calls: Arc<AtomicUsize>,
    }

    impl RegistrationSnapshotMaterializer for CountingMaterializer {
        fn materialize_snapshot(
            &self,
            registrations: &RegistrationSnapshot,
        ) -> Result<MaterializationResult, DynCfgsyncError> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            if registrations.is_empty() {
                return Ok(MaterializationResult::NotReady);
            }
            let nodes = registrations.iter().map(|registration| {
                (
                    registration.identifier.clone(),
                    ArtifactSet::new(vec![ArtifactFile::new(
                        "/config.yaml".to_string(),
                        "ready: true".to_string(),
                    )]),
                )
            });
            Ok(MaterializationResult::ready(
                MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(vec![
                    ArtifactFile::new("/shared.yaml".to_string(), "cluster: ready".to_string()),
                ])),
            ))
        }
    }

    /// Sink that only counts how many times it is asked to persist.
    struct CountingSink {
        writes: Arc<AtomicUsize>,
    }

    impl MaterializedArtifactsSink for CountingSink {
        fn persist(&self, _artifacts: &MaterializedArtifacts) -> Result<(), DynCfgsyncError> {
            self.writes.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    // One-node snapshot used by both tests.
    fn single_node_snapshot() -> RegistrationSnapshot {
        RegistrationSnapshot::new(vec![cfgsync_core::NodeRegistration::new(
            "node-1".to_string(),
            "127.0.0.1".parse().expect("parse ip"),
        )])
    }

    #[test]
    fn cached_snapshot_materializer_reuses_previous_result() {
        let calls = Arc::new(AtomicUsize::new(0));
        let materializer = CachedSnapshotMaterializer::new(CountingMaterializer {
            calls: Arc::clone(&calls),
        });
        let snapshot = single_node_snapshot();
        let first = materializer
            .materialize_snapshot(&snapshot)
            .expect("first materialization");
        let second = materializer
            .materialize_snapshot(&snapshot)
            .expect("second materialization");
        assert!(matches!(first, MaterializationResult::Ready(_)));
        assert!(matches!(second, MaterializationResult::Ready(_)));
        // The second call must be served from cache, not recomputed.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn persisting_snapshot_materializer_writes_ready_snapshots_once() {
        let writes = Arc::new(AtomicUsize::new(0));
        let materializer = PersistingSnapshotMaterializer::new(
            CountingMaterializer {
                calls: Arc::new(AtomicUsize::new(0)),
            },
            CountingSink {
                writes: Arc::clone(&writes),
            },
        );
        let snapshot = single_node_snapshot();
        materializer
            .materialize_snapshot(&snapshot)
            .expect("first materialization");
        materializer
            .materialize_snapshot(&snapshot)
            .expect("second materialization");
        assert_eq!(writes.load(Ordering::SeqCst), 1);
    }
}

View File

@ -0,0 +1,44 @@
use cfgsync_core::NodeRegistration;
use serde::Serialize;
/// Immutable view of registrations currently known to cfgsync.
#[derive(Debug, Clone, Default, Serialize)]
pub struct RegistrationSnapshot {
    // Kept sorted by `identifier` (see `new`), so iteration order and the
    // serialized form are deterministic for identical registration sets.
    registrations: Vec<NodeRegistration>,
}
impl RegistrationSnapshot {
    /// Creates a stable registration snapshot sorted by node identifier.
    ///
    /// Sorting makes iteration order and the serialized form deterministic
    /// for identical registration sets, and enables binary search in
    /// [`Self::get`].
    #[must_use]
    pub fn new(mut registrations: Vec<NodeRegistration>) -> Self {
        registrations.sort_by(|left, right| left.identifier.cmp(&right.identifier));
        Self { registrations }
    }

    /// Returns the number of registrations in the snapshot.
    #[must_use]
    pub fn len(&self) -> usize {
        self.registrations.len()
    }

    /// Returns `true` when the snapshot contains no registrations.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.registrations.is_empty()
    }

    /// Iterates registrations in deterministic identifier order.
    #[must_use]
    pub fn iter(&self) -> impl Iterator<Item = &NodeRegistration> {
        self.registrations.iter()
    }

    /// Looks up a registration by its stable node identifier.
    ///
    /// Uses binary search (O(log n) instead of a linear scan), which is
    /// valid because `new` keeps the vector sorted by identifier.
    #[must_use]
    pub fn get(&self, identifier: &str) -> Option<&NodeRegistration> {
        self.registrations
            .binary_search_by(|registration| registration.identifier.as_str().cmp(identifier))
            .ok()
            .map(|index| &self.registrations[index])
    }
}

View File

@ -0,0 +1,260 @@
use std::{collections::HashMap, sync::Mutex};
use cfgsync_core::{
CfgsyncErrorResponse, ConfigResolveResponse, NodeArtifactsPayload, NodeConfigSource,
NodeRegistration, RegisterNodeResponse,
};
use crate::{
DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot,
RegistrationSnapshotMaterializer,
};
/// Precomputed serving: artifacts already known ahead of time are always
/// "ready", so registration acts as an identity gate rather than a source
/// of topology discovery.
impl RegistrationSnapshotMaterializer for MaterializedArtifacts {
    fn materialize_snapshot(
        &self,
        _registrations: &RegistrationSnapshot,
    ) -> Result<MaterializationResult, DynCfgsyncError> {
        Ok(MaterializationResult::ready(self.clone()))
    }
}
/// Registration-aware source backed by a snapshot materializer.
pub struct RegistrationConfigSource<M> {
    // Application adapter that turns the registration set into artifacts.
    materializer: M,
    // Registrations received so far, keyed by stable node identifier.
    registrations: Mutex<HashMap<String, NodeRegistration>>,
}
impl<M> RegistrationConfigSource<M> {
    /// Creates a source with an empty registration store.
    #[must_use]
    pub fn new(materializer: M) -> Self {
        Self {
            materializer,
            registrations: Mutex::new(HashMap::new()),
        }
    }

    /// Returns a clone of the stored registration for `identifier`, if any.
    fn registration_for(&self, identifier: &str) -> Option<NodeRegistration> {
        self.registrations
            .lock()
            .expect("cfgsync registration store should not be poisoned")
            .get(identifier)
            .cloned()
    }

    /// Builds a sorted snapshot of every registration seen so far.
    fn registration_snapshot(&self) -> RegistrationSnapshot {
        let all: Vec<NodeRegistration> = self
            .registrations
            .lock()
            .expect("cfgsync registration store should not be poisoned")
            .values()
            .cloned()
            .collect();
        RegistrationSnapshot::new(all)
    }
}
impl<M> NodeConfigSource for RegistrationConfigSource<M>
where
    M: RegistrationSnapshotMaterializer,
{
    /// Stores (or overwrites) the node's registration.
    fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
        self.registrations
            .lock()
            .expect("cfgsync registration store should not be poisoned")
            .insert(registration.identifier.clone(), registration);
        RegisterNodeResponse::Registered
    }

    /// Resolves one node's payload from the current registration snapshot.
    fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
        // A node must have registered before it can fetch configuration.
        let Some(registration) = self.registration_for(&registration.identifier) else {
            return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
                &registration.identifier,
            ));
        };
        let snapshot = self.registration_snapshot();
        let materialized = match self.materializer.materialize_snapshot(&snapshot) {
            Ok(MaterializationResult::Ready(materialized)) => materialized,
            Ok(MaterializationResult::NotReady) => {
                return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
                    &registration.identifier,
                ));
            }
            Err(error) => {
                return ConfigResolveResponse::Error(CfgsyncErrorResponse::internal(format!(
                    "failed to materialize config snapshot: {error}"
                )));
            }
        };
        match materialized.resolve(&registration.identifier) {
            Some(artifacts) => {
                ConfigResolveResponse::Config(NodeArtifactsPayload::from_files(artifacts.files))
            }
            None => ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config(
                &registration.identifier,
            )),
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };
    use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
    use cfgsync_core::{
        CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration,
    };
    use super::RegistrationConfigSource;
    use crate::{
        CachedSnapshotMaterializer, DynCfgsyncError, MaterializationResult, MaterializedArtifacts,
        RegistrationSnapshot, RegistrationSnapshotMaterializer,
    };

    // Precomputed artifacts with a single entry for `node-1`.
    fn single_node_artifacts() -> MaterializedArtifacts {
        MaterializedArtifacts::from_nodes([(
            "node-1".to_owned(),
            ArtifactSet::new(vec![ArtifactFile::new(
                "/config.yaml".to_string(),
                "a: 1".to_string(),
            )]),
        )])
    }

    fn node_1_registration() -> NodeRegistration {
        NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().expect("parse ip"))
    }

    #[test]
    fn registration_source_resolves_identifier() {
        let source = RegistrationConfigSource::new(single_node_artifacts());
        let registration = node_1_registration();
        let _ = source.register(registration.clone());
        match source.resolve(&registration) {
            ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
            ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
        }
    }

    #[test]
    fn registration_source_reports_not_ready_before_registration() {
        let source = RegistrationConfigSource::new(single_node_artifacts());
        match source.resolve(&node_1_registration()) {
            ConfigResolveResponse::Config(_) => panic!("expected not-ready"),
            ConfigResolveResponse::Error(error) => {
                assert!(matches!(error.code, CfgsyncErrorCode::NotReady));
            }
        }
    }

    /// Becomes ready only once two registrations are present.
    struct ThresholdSnapshotMaterializer;

    impl RegistrationSnapshotMaterializer for ThresholdSnapshotMaterializer {
        fn materialize_snapshot(
            &self,
            registrations: &RegistrationSnapshot,
        ) -> Result<MaterializationResult, DynCfgsyncError> {
            if registrations.len() < 2 {
                return Ok(MaterializationResult::NotReady);
            }
            let nodes = registrations.iter().map(|registration| {
                (
                    registration.identifier.clone(),
                    ArtifactSet::new(vec![ArtifactFile::new(
                        "/config.yaml".to_string(),
                        format!("id: {}", registration.identifier),
                    )]),
                )
            });
            Ok(MaterializationResult::ready(
                MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(vec![
                    ArtifactFile::new("/shared.yaml".to_string(), "cluster: ready".to_string()),
                ])),
            ))
        }
    }

    #[test]
    fn registration_source_materializes_from_registration_snapshot() {
        let source = RegistrationConfigSource::new(ThresholdSnapshotMaterializer);
        let node_1 = node_1_registration();
        let node_2 =
            NodeRegistration::new("node-2".to_string(), "127.0.0.2".parse().expect("parse ip"));
        let _ = source.register(node_1.clone());
        match source.resolve(&node_1) {
            ConfigResolveResponse::Config(_) => panic!("expected not-ready before threshold"),
            ConfigResolveResponse::Error(error) => {
                assert!(matches!(error.code, CfgsyncErrorCode::NotReady));
            }
        }
        let _ = source.register(node_2);
        match source.resolve(&node_1) {
            ConfigResolveResponse::Config(payload) => {
                assert_eq!(payload.files.len(), 2);
                assert_eq!(payload.files[0].path, "/config.yaml");
                assert_eq!(payload.files[1].path, "/shared.yaml");
            }
            ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
        }
    }

    /// Records how many times materialization actually runs. The counter is
    /// shared via `Arc` so the test can still read it after the materializer
    /// is moved into the source (the previous version counted into a field
    /// that became unreachable, so nothing was ever asserted).
    struct CountingSnapshotMaterializer {
        calls: Arc<AtomicUsize>,
    }

    impl RegistrationSnapshotMaterializer for CountingSnapshotMaterializer {
        fn materialize_snapshot(
            &self,
            registrations: &RegistrationSnapshot,
        ) -> Result<MaterializationResult, DynCfgsyncError> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Ok(MaterializationResult::ready(
                MaterializedArtifacts::from_nodes(registrations.iter().map(|registration| {
                    (
                        registration.identifier.clone(),
                        ArtifactSet::new(vec![ArtifactFile::new(
                            "/config.yaml".to_string(),
                            format!("id: {}", registration.identifier),
                        )]),
                    )
                })),
            ))
        }
    }

    #[test]
    fn cached_snapshot_materializer_reuses_previous_result() {
        let calls = Arc::new(AtomicUsize::new(0));
        let source = RegistrationConfigSource::new(CachedSnapshotMaterializer::new(
            CountingSnapshotMaterializer {
                calls: Arc::clone(&calls),
            },
        ));
        let registration = node_1_registration();
        let _ = source.register(registration.clone());
        let _ = source.resolve(&registration);
        let _ = source.resolve(&registration);
        // Identical snapshots must hit the cache: one real materialization.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }
}

View File

@ -0,0 +1,17 @@
[package]
categories = { workspace = true }
description = "App-agnostic cfgsync artifact model"
edition = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
name = "cfgsync-artifacts"
readme = { workspace = true }
repository = { workspace = true }
version = { workspace = true }
[lints]
workspace = true
[dependencies]
serde = { workspace = true }
thiserror = { workspace = true }

View File

@ -0,0 +1,61 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Single file artifact delivered to a node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArtifactFile {
    /// Destination path where content should be written.
    pub path: String,
    /// Raw file contents.
    pub content: String,
}
impl ArtifactFile {
    /// Creates a file artifact from a destination path and its raw contents.
    ///
    /// Accepts anything convertible into `String` (e.g. `&str` literals),
    /// which matches how the README examples construct artifacts; existing
    /// callers passing `String` continue to work unchanged.
    #[must_use]
    pub fn new(path: impl Into<String>, content: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            content: content.into(),
        }
    }
}
/// Collection of files delivered together for one node.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct ArtifactSet {
    /// Files in delivery order.
    pub files: Vec<ArtifactFile>,
}
impl ArtifactSet {
    /// Creates an artifact set from a list of files.
    #[must_use]
    pub fn new(files: Vec<ArtifactFile>) -> Self {
        Self { files }
    }

    /// Returns the number of files in the set.
    #[must_use]
    pub fn len(&self) -> usize {
        self.files.len()
    }

    /// Returns `true` when the set contains no files.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.files.is_empty()
    }

    /// Validates that no two files target the same output path.
    ///
    /// # Errors
    ///
    /// Returns [`ArtifactValidationError::DuplicatePath`] carrying the first
    /// path that appears more than once.
    pub fn ensure_unique_paths(&self) -> Result<(), ArtifactValidationError> {
        // Borrow the paths for the membership test; only the offending path
        // is cloned (the previous version cloned every path into the set).
        let mut seen = std::collections::HashSet::with_capacity(self.files.len());
        for file in &self.files {
            if !seen.insert(file.path.as_str()) {
                return Err(ArtifactValidationError::DuplicatePath(file.path.clone()));
            }
        }
        Ok(())
    }
}
/// Validation failures for [`ArtifactSet`].
#[derive(Debug, Error)]
pub enum ArtifactValidationError {
    /// Two or more files in the set target the same output path; carries the
    /// first duplicated path encountered.
    #[error("duplicate artifact path `{0}`")]
    DuplicatePath(String),
}

27
cfgsync/core/Cargo.toml Normal file
View File

@ -0,0 +1,27 @@
# Manifest for `cfgsync-core`: protocol types, HTTP server/client, and
# config-source abstractions for cfgsync.
[package]
categories = { workspace = true }
description = { workspace = true }
edition = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
name = "cfgsync-core"
readme = { workspace = true }
repository = { workspace = true }
version = { workspace = true }
[lints]
workspace = true
[dependencies]
# NOTE(review): anyhow, axum, serde, and tokio are pinned inline here while
# the sibling crates inherit versions from the workspace — confirm this
# divergence is intentional.
anyhow = "1"
axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" }
cfgsync-artifacts = { workspace = true }
reqwest = { features = ["json"], workspace = true }
serde = { default-features = false, features = ["derive"], version = "1" }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
[dev-dependencies]
tempfile = { workspace = true }

View File

@ -0,0 +1,44 @@
use serde::{Deserialize, Serialize};
use crate::NodeArtifactFile;
/// Static cfgsync artifact bundle.
///
/// This is the bundle-oriented source format used when all artifacts are known
/// ahead of time and no registration-time materialization is required.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeArtifactsBundle {
    /// Per-node artifact entries keyed by identifier.
    pub nodes: Vec<NodeArtifactsBundleEntry>,
    /// Files that should be served alongside every node-specific entry.
    /// Optional in the serialized form; defaults to empty.
    #[serde(default)]
    pub shared_files: Vec<NodeArtifactFile>,
}
impl NodeArtifactsBundle {
    /// Creates a bundle with node-specific entries only.
    #[must_use]
    pub fn new(nodes: Vec<NodeArtifactsBundleEntry>) -> Self {
        let shared_files = Vec::new();
        Self {
            nodes,
            shared_files,
        }
    }

    /// Attaches files that should be served alongside every node entry.
    #[must_use]
    pub fn with_shared_files(mut self, shared_files: Vec<NodeArtifactFile>) -> Self {
        self.shared_files = shared_files;
        self
    }
}
/// One node entry inside a static cfgsync bundle.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeArtifactsBundleEntry {
    /// Stable node identifier used by cfgsync lookup.
    pub identifier: String,
    /// Files that should be materialized for the node.
    /// Optional in the serialized form; defaults to empty.
    #[serde(default)]
    pub files: Vec<NodeArtifactFile>,
}

157
cfgsync/core/src/client.rs Normal file
View File

@ -0,0 +1,157 @@
use serde::Serialize;
use thiserror::Error;
use crate::{CfgsyncErrorCode, CfgsyncErrorResponse, NodeArtifactsPayload, NodeRegistration};
/// cfgsync client-side request/response failures.
#[derive(Debug, Error)]
pub enum ClientError {
    /// Transport-level failure (connect, timeout, body read).
    #[error("request failed: {0}")]
    Request(#[from] reqwest::Error),
    /// The server answered with a non-success HTTP status.
    #[error("cfgsync server error {status}: {message}")]
    Status {
        /// HTTP status code returned by the server.
        status: reqwest::StatusCode,
        /// Structured error message when decodable, else the raw body.
        message: String,
        /// Decoded structured error body, when the server sent one.
        error: Option<CfgsyncErrorResponse>,
    },
    /// The success body could not be decoded as a cfgsync payload.
    #[error("failed to parse cfgsync response: {0}")]
    Decode(serde_json::Error),
}
/// Result of probing cfgsync for a node's current artifact availability.
///
/// Returned by [`Client::fetch_node_config_status`]; the three variants map
/// the server's structured error codes onto a poll-friendly status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfigFetchStatus {
    /// The node payload is ready and can be fetched successfully.
    Ready,
    /// The node has registered but artifacts are not ready yet.
    NotReady,
    /// The server does not know how to materialize artifacts for this node.
    Missing,
}
/// Reusable HTTP client for cfgsync server endpoints.
#[derive(Clone, Debug)]
pub struct Client {
    // Server base URL with trailing slashes stripped (see `Client::new`).
    base_url: String,
    // Shared reqwest client; cheap to clone alongside `Client`.
    http: reqwest::Client,
}
impl Client {
/// Creates a cfgsync client pointed at the given server base URL.
#[must_use]
pub fn new(base_url: String) -> Self {
let mut base_url = base_url;
while base_url.ends_with('/') {
base_url.pop();
}
Self {
base_url,
http: reqwest::Client::new(),
}
}
/// Returns the normalized cfgsync server base URL used for requests.
#[must_use]
pub fn base_url(&self) -> &str {
&self.base_url
}
/// Registers a node before requesting config.
pub async fn register_node(&self, payload: &NodeRegistration) -> Result<(), ClientError> {
self.post_status_only("/register", payload).await
}
/// Fetches `/node` payload for a node identifier.
pub async fn fetch_node_config(
&self,
payload: &NodeRegistration,
) -> Result<NodeArtifactsPayload, ClientError> {
self.post_json("/node", payload).await
}
/// Probes whether artifacts for a node are ready, missing, or still
/// pending.
pub async fn fetch_node_config_status(
&self,
payload: &NodeRegistration,
) -> Result<ConfigFetchStatus, ClientError> {
match self.fetch_node_config(payload).await {
Ok(_) => Ok(ConfigFetchStatus::Ready),
Err(ClientError::Status {
status,
error: Some(error),
..
}) => match error.code {
CfgsyncErrorCode::NotReady => Ok(ConfigFetchStatus::NotReady),
CfgsyncErrorCode::MissingConfig => Ok(ConfigFetchStatus::Missing),
CfgsyncErrorCode::Internal => Err(ClientError::Status {
status,
message: error.message.clone(),
error: Some(error),
}),
},
Err(error) => Err(error),
}
}
/// Posts JSON payload to a cfgsync endpoint and decodes cfgsync payload.
pub async fn post_json<P: Serialize>(
&self,
path: &str,
payload: &P,
) -> Result<NodeArtifactsPayload, ClientError> {
let url = self.endpoint_url(path);
let response = self.http.post(url).json(payload).send().await?;
let status = response.status();
let body = response.text().await?;
if !status.is_success() {
let error = serde_json::from_str::<CfgsyncErrorResponse>(&body).ok();
let message = error
.as_ref()
.map(|err| err.message.clone())
.unwrap_or_else(|| body.clone());
return Err(ClientError::Status {
status,
message,
error,
});
}
serde_json::from_str(&body).map_err(ClientError::Decode)
}
async fn post_status_only<P: Serialize>(
&self,
path: &str,
payload: &P,
) -> Result<(), ClientError> {
let url = self.endpoint_url(path);
let response = self.http.post(url).json(payload).send().await?;
let status = response.status();
let body = response.text().await?;
if !status.is_success() {
let error = serde_json::from_str::<CfgsyncErrorResponse>(&body).ok();
let message = error
.as_ref()
.map(|err| err.message.clone())
.unwrap_or_else(|| body.clone());
return Err(ClientError::Status {
status,
message,
error,
});
}
Ok(())
}
fn endpoint_url(&self, path: &str) -> String {
if path.starts_with('/') {
format!("{}{}", self.base_url, path)
} else {
format!("{}/{}", self.base_url, path)
}
}
}

View File

@ -0,0 +1,20 @@
#![doc(hidden)]
pub use crate::{
bundle::{NodeArtifactsBundle as CfgSyncBundle, NodeArtifactsBundleEntry as CfgSyncBundleNode},
client::Client as CfgSyncClient,
protocol::{
CfgsyncErrorCode as CfgSyncErrorCode, CfgsyncErrorResponse as CfgSyncErrorResponse,
ConfigResolveResponse as RepoResponse, NodeArtifactFile as CfgSyncFile,
NodeArtifactsPayload as CfgSyncPayload, RegisterNodeResponse as RegistrationResponse,
},
server::{
CfgsyncServerState as CfgSyncState, build_legacy_cfgsync_router as cfgsync_app,
serve_cfgsync as run_cfgsync,
},
source::{
BundleConfigSource as FileConfigProvider,
BundleConfigSourceError as FileConfigProviderError, NodeConfigSource as ConfigProvider,
StaticConfigSource as ConfigRepo,
},
};

26
cfgsync/core/src/lib.rs Normal file
View File

@ -0,0 +1,26 @@
pub mod bundle;
pub mod client;
#[doc(hidden)]
pub mod compat;
pub mod protocol;
pub mod render;
pub mod server;
pub mod source;
pub use bundle::{NodeArtifactsBundle, NodeArtifactsBundleEntry};
pub use client::{Client, ClientError, ConfigFetchStatus};
pub use protocol::{
CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse,
NodeArtifactFile, NodeArtifactsPayload, NodeRegistration, RegisterNodeResponse,
RegistrationPayload,
};
pub use render::{
CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides,
apply_timeout_floor, ensure_artifacts_path, load_cfgsync_template_yaml,
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
};
pub use server::{CfgsyncServerState, RunCfgsyncError, build_cfgsync_router, serve_cfgsync};
pub use source::{
BundleConfigSource, BundleConfigSourceError, BundleLoadError, NodeConfigSource,
StaticConfigSource, bundle_to_payload_map, load_bundle,
};

View File

@ -0,0 +1,286 @@
use std::net::Ipv4Addr;
use cfgsync_artifacts::ArtifactFile;
use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned};
use serde_json::Value;
use thiserror::Error;
/// Schema version served by cfgsync payload responses.
pub const CFGSYNC_SCHEMA_VERSION: u16 = 1;
/// Canonical cfgsync file type used in payloads and bundles.
pub type NodeArtifactFile = ArtifactFile;
/// Payload returned by cfgsync server for one node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeArtifactsPayload {
    /// Payload schema version for compatibility checks.
    pub schema_version: u16,
    /// Files that must be written on the target node.
    /// Optional in the serialized form; defaults to empty.
    #[serde(default)]
    pub files: Vec<NodeArtifactFile>,
}
/// Adapter-owned registration payload stored alongside a generic node identity.
///
/// The payload is opaque to cfgsync-core: it is stored as validated JSON text
/// and only the app-specific materializer interprets it.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RegistrationPayload {
    // Normalized JSON text; `None` means no payload was attached.
    raw_json: Option<String>,
}
impl RegistrationPayload {
    /// Creates an empty adapter-owned payload.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` when no adapter-owned payload was attached.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.raw_json.is_none()
    }

    /// Stores one typed adapter payload as opaque JSON.
    ///
    /// # Errors
    /// Fails when `value` cannot be serialized to JSON.
    pub fn from_serializable<T>(value: &T) -> Result<Self, serde_json::Error>
    where
        T: Serialize,
    {
        let raw_json = serde_json::to_string(value)?;
        Ok(Self {
            raw_json: Some(raw_json),
        })
    }

    /// Stores a raw JSON payload after validating that it parses.
    ///
    /// The text is parsed and re-serialized, so the stored form is
    /// normalized JSON rather than the caller's exact bytes.
    pub fn from_json_str(raw_json: &str) -> Result<Self, serde_json::Error> {
        let value: Value = serde_json::from_str(raw_json)?;
        let normalized = serde_json::to_string(&value)?;
        Ok(Self {
            raw_json: Some(normalized),
        })
    }

    /// Deserializes the adapter-owned payload into the requested type.
    ///
    /// Returns `Ok(None)` when no payload is attached.
    pub fn deserialize<T>(&self) -> Result<Option<T>, serde_json::Error>
    where
        T: DeserializeOwned,
    {
        match self.raw_json.as_deref() {
            Some(raw_json) => serde_json::from_str(raw_json).map(Some),
            None => Ok(None),
        }
    }

    /// Returns the validated JSON representation stored in this payload.
    #[must_use]
    pub fn raw_json(&self) -> Option<&str> {
        self.raw_json.as_deref()
    }
}
// Serializes the stored JSON text as an inline JSON value (not as an escaped
// string), so the adapter payload appears as a plain object in the encoded
// registration. An empty payload serializes as `null`/absent.
impl Serialize for RegistrationPayload {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match self.raw_json.as_deref() {
            Some(raw_json) => {
                // Re-parse the stored text so the serializer receives a
                // structured value; the text was validated at construction,
                // so a failure here indicates internal corruption.
                let value: Value =
                    serde_json::from_str(raw_json).map_err(serde::ser::Error::custom)?;
                value.serialize(serializer)
            }
            None => serializer.serialize_none(),
        }
    }
}
// Accepts any JSON value (or null/absent) and stores it back as normalized
// JSON text — the inverse of the inline `Serialize` impl above.
impl<'de> Deserialize<'de> for RegistrationPayload {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        let value = Option::<Value>::deserialize(deserializer)?;
        // Re-serialize the structured value so the stored text is normalized.
        let raw_json = value
            .map(|value| serde_json::to_string(&value).map_err(serde::de::Error::custom))
            .transpose()?;
        Ok(Self { raw_json })
    }
}
/// Node metadata recorded before config materialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeRegistration {
    /// Stable node identifier used for registration and artifact lookup.
    pub identifier: String,
    /// IPv4 address advertised as part of registration.
    pub ip: Ipv4Addr,
    /// Adapter-owned payload interpreted only by the app materializer.
    /// Skipped entirely in the serialized form when empty.
    #[serde(default, skip_serializing_if = "RegistrationPayload::is_empty")]
    pub metadata: RegistrationPayload,
}
impl NodeRegistration {
    /// Creates a registration with the generic node identity fields only.
    #[must_use]
    pub fn new(identifier: String, ip: Ipv4Addr) -> Self {
        let metadata = RegistrationPayload::default();
        Self {
            identifier,
            ip,
            metadata,
        }
    }

    /// Attaches one typed adapter-owned payload to this registration.
    ///
    /// # Errors
    /// Fails when `metadata` cannot be serialized to JSON.
    pub fn with_metadata<T>(mut self, metadata: &T) -> Result<Self, serde_json::Error>
    where
        T: Serialize,
    {
        let payload = RegistrationPayload::from_serializable(metadata)?;
        self.metadata = payload;
        Ok(self)
    }

    /// Attaches a prebuilt registration payload to this registration.
    #[must_use]
    pub fn with_payload(mut self, payload: RegistrationPayload) -> Self {
        self.metadata = payload;
        self
    }
}
impl NodeArtifactsPayload {
    /// Creates a payload from the files that should be written for one node.
    ///
    /// The schema version is pinned to [`CFGSYNC_SCHEMA_VERSION`].
    #[must_use]
    pub fn from_files(files: Vec<NodeArtifactFile>) -> Self {
        let schema_version = CFGSYNC_SCHEMA_VERSION;
        Self {
            schema_version,
            files,
        }
    }

    /// Returns the files carried by this payload.
    #[must_use]
    pub fn files(&self) -> &[NodeArtifactFile] {
        self.files.as_slice()
    }

    /// Returns `true` when the payload carries no files.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.files.is_empty()
    }
}
/// Machine-readable cfgsync failure categories carried in error responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CfgsyncErrorCode {
    /// No artifact payload is available for the requested node.
    MissingConfig,
    /// The node is registered but artifacts are not ready yet.
    NotReady,
    /// An unexpected server-side failure occurred.
    Internal,
}
/// Structured error body returned by cfgsync server.
///
/// Doubles as an error type (`thiserror`) so it can be propagated directly.
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
#[error("{code:?}: {message}")]
pub struct CfgsyncErrorResponse {
    /// Machine-readable failure category.
    pub code: CfgsyncErrorCode,
    /// Human-readable error details.
    pub message: String,
}
impl CfgsyncErrorResponse {
    /// Builds a missing-config error for one identifier.
    #[must_use]
    pub fn missing_config(identifier: &str) -> Self {
        let message = format!("missing config for host {identifier}");
        Self {
            code: CfgsyncErrorCode::MissingConfig,
            message,
        }
    }

    /// Builds a not-ready error for one identifier.
    #[must_use]
    pub fn not_ready(identifier: &str) -> Self {
        let message = format!("config for host {identifier} is not ready");
        Self {
            code: CfgsyncErrorCode::NotReady,
            message,
        }
    }

    /// Builds an internal cfgsync error.
    #[must_use]
    pub fn internal(message: String) -> Self {
        let code = CfgsyncErrorCode::Internal;
        Self { code, message }
    }
}
/// Resolution outcome for a requested node identifier.
pub enum ConfigResolveResponse {
    /// Artifacts are ready for the requested node.
    Config(NodeArtifactsPayload),
    /// Artifacts could not be resolved for the requested node; the error
    /// code distinguishes missing, not-ready, and internal failures.
    Error(CfgsyncErrorResponse),
}
/// Outcome for a node registration request.
pub enum RegisterNodeResponse {
    /// Registration was accepted.
    Registered,
    /// Registration failed; the embedded error says why.
    Error(CfgsyncErrorResponse),
}
#[cfg(test)]
mod tests {
    use serde::{Deserialize, Serialize};
    use serde_json::Value;
    use super::{NodeRegistration, RegistrationPayload};

    /// Example adapter-owned metadata used to exercise the round trip.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
    struct ExampleRegistration {
        network_port: u16,
        service: String,
    }

    #[test]
    fn registration_payload_round_trips_typed_value() {
        let registration =
            NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse().expect("parse ip"))
                .with_metadata(&ExampleRegistration {
                    network_port: 3000,
                    service: "blend".to_owned(),
                })
                .expect("serialize registration metadata");
        // The metadata must serialize as an inline JSON object under the
        // `metadata` key, not as an escaped JSON string.
        let encoded = serde_json::to_value(&registration).expect("serialize registration");
        let metadata = encoded.get("metadata").expect("registration metadata");
        assert_eq!(metadata.get("network_port"), Some(&Value::from(3000u16)));
        assert_eq!(metadata.get("service"), Some(&Value::from("blend")));
        // And the typed value must come back intact after a full round trip.
        let decoded: NodeRegistration =
            serde_json::from_value(encoded).expect("deserialize registration");
        let typed: ExampleRegistration = decoded
            .metadata
            .deserialize()
            .expect("deserialize metadata")
            .expect("registration metadata value");
        assert_eq!(typed.network_port, 3000);
        assert_eq!(typed.service, "blend");
    }

    #[test]
    fn registration_payload_accepts_raw_json() {
        // Raw JSON with no whitespace is already normalized, so it is
        // stored verbatim.
        let payload =
            RegistrationPayload::from_json_str(r#"{"network_port":3000}"#).expect("parse raw json");
        assert_eq!(payload.raw_json(), Some(r#"{"network_port":3000}"#));
    }
}

View File

@ -2,63 +2,88 @@ use std::{fs, path::Path};
use anyhow::{Context as _, Result};
use serde_yaml::{Mapping, Value};
use thiserror::Error;
/// Rendered cfgsync outputs written for server startup.
#[derive(Debug, Clone)]
pub struct RenderedCfgsync {
/// Serialized cfgsync server config YAML.
pub config_yaml: String,
pub bundle_yaml: String,
/// Serialized precomputed artifact YAML used by cfgsync runtime.
pub artifacts_yaml: String,
}
/// Output paths used when materializing rendered cfgsync files.
#[derive(Debug, Clone, Copy)]
pub struct CfgsyncOutputPaths<'a> {
/// Output path for the rendered server config YAML.
pub config_path: &'a Path,
pub bundle_path: &'a Path,
/// Output path for the rendered precomputed artifacts YAML.
pub artifacts_path: &'a Path,
}
pub fn ensure_bundle_path(bundle_path: &mut Option<String>, output_bundle_path: &Path) {
if bundle_path.is_some() {
/// Ensures artifacts path override exists, defaulting to the output artifacts
/// file name.
pub fn ensure_artifacts_path(artifacts_path: &mut Option<String>, output_artifacts_path: &Path) {
if artifacts_path.is_some() {
return;
}
*bundle_path = Some(
output_bundle_path
*artifacts_path = Some(
output_artifacts_path
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("cfgsync.bundle.yaml")
.unwrap_or("cfgsync.artifacts.yaml")
.to_string(),
);
}
/// Applies a minimum timeout floor to an existing timeout value.
///
/// Leaves `timeout` untouched when no floor is configured or when the
/// current value already meets it.
pub fn apply_timeout_floor(timeout: &mut u64, min_timeout_secs: Option<u64>) {
    let Some(floor) = min_timeout_secs else {
        return;
    };
    if *timeout < floor {
        *timeout = floor;
    }
}
/// Writes rendered cfgsync server and bundle YAML files.
pub fn write_rendered_cfgsync(
rendered: &RenderedCfgsync,
output: CfgsyncOutputPaths<'_>,
) -> Result<()> {
fs::write(output.config_path, &rendered.config_yaml)?;
fs::write(output.bundle_path, &rendered.bundle_yaml)?;
fs::write(output.artifacts_path, &rendered.artifacts_yaml)?;
Ok(())
}
/// Optional overrides applied to a cfgsync template document.
#[derive(Debug, Clone, Default)]
pub struct CfgsyncConfigOverrides {
/// Override for the HTTP listen port.
pub port: Option<u16>,
/// Override for the expected initial host count.
pub n_hosts: Option<usize>,
/// Minimum timeout to enforce on the rendered template.
pub timeout_floor_secs: Option<u64>,
pub bundle_path: Option<String>,
/// Override for the precomputed artifacts path written into cfgsync config.
pub artifacts_path: Option<String>,
/// Optional OTLP metrics endpoint injected into tracing settings.
pub metrics_otlp_ingest_url: Option<String>,
}
/// Internal failures while applying overrides to a cfgsync template.
#[derive(Debug, Error)]
enum RenderTemplateError {
    /// An existing template entry that must hold nested settings is not a map.
    #[error("cfgsync template key `{key}` must be a YAML map")]
    NonMappingEntry { key: String },
}
/// Loads cfgsync template YAML from disk.
///
/// # Errors
/// Fails when the file cannot be opened or does not parse as YAML; both
/// failures carry context naming the template path or stage.
pub fn load_cfgsync_template_yaml(path: &Path) -> Result<Value> {
    let file = fs::File::open(path)
        .with_context(|| format!("opening cfgsync template at {}", path.display()))?;
    serde_yaml::from_reader(file).context("parsing cfgsync template")
}
/// Renders cfgsync config YAML by applying overrides to a template document.
pub fn render_cfgsync_yaml_from_template(
mut template: Value,
overrides: &CfgsyncConfigOverrides,
@ -67,6 +92,7 @@ pub fn render_cfgsync_yaml_from_template(
serde_yaml::to_string(&template).context("serializing rendered cfgsync config")
}
/// Applies cfgsync-specific override fields to a mutable YAML document.
pub fn apply_cfgsync_overrides(
template: &mut Value,
overrides: &CfgsyncConfigOverrides,
@ -87,10 +113,10 @@ pub fn apply_cfgsync_overrides(
);
}
if let Some(bundle_path) = &overrides.bundle_path {
if let Some(artifacts_path) = &overrides.artifacts_path {
root.insert(
Value::String("bundle_path".to_string()),
Value::String(bundle_path.clone()),
Value::String("artifacts_path".to_string()),
Value::String(artifacts_path.clone()),
);
}
@ -105,7 +131,7 @@ pub fn apply_cfgsync_overrides(
}
if let Some(endpoint) = &overrides.metrics_otlp_ingest_url {
let tracing_settings = nested_mapping_mut(root, "tracing_settings");
let tracing_settings = nested_mapping_mut(root, "tracing_settings")?;
tracing_settings.insert(
Value::String("metrics".to_string()),
parse_otlp_metrics_layer(endpoint)?,
@ -121,19 +147,20 @@ fn mapping_mut(value: &mut Value) -> Result<&mut Mapping> {
.context("cfgsync template root must be a YAML map")
}
fn nested_mapping_mut<'a>(mapping: &'a mut Mapping, key: &str) -> &'a mut Mapping {
let key = Value::String(key.to_string());
fn nested_mapping_mut<'a>(mapping: &'a mut Mapping, key: &str) -> Result<&'a mut Mapping> {
let key_name = key.to_owned();
let key = Value::String(key_name.clone());
let entry = mapping
.entry(key)
.or_insert_with(|| Value::Mapping(Mapping::new()));
if !entry.is_mapping() {
*entry = Value::Mapping(Mapping::new());
return Err(RenderTemplateError::NonMappingEntry { key: key_name }).map_err(Into::into);
}
entry
.as_mapping_mut()
.expect("mapping entry should always be a mapping")
.context("cfgsync template entry should be a YAML map")
}
fn parse_otlp_metrics_layer(endpoint: &str) -> Result<Value> {

291
cfgsync/core/src/server.rs Normal file
View File

@ -0,0 +1,291 @@
use std::{io, sync::Arc};
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
use thiserror::Error;
use crate::{
CfgsyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration,
RegisterNodeResponse,
};
/// Runtime state shared across cfgsync HTTP handlers.
pub struct CfgsyncServerState {
    // Trait object so any config source (static, bundle, registration-aware)
    // can back the HTTP handlers.
    repo: Arc<dyn NodeConfigSource>,
}

impl CfgsyncServerState {
    /// Wraps a node config source for use by cfgsync HTTP handlers.
    #[must_use]
    pub fn new(repo: Arc<dyn NodeConfigSource>) -> Self {
        Self { repo }
    }
}
/// Fatal runtime failures when serving cfgsync HTTP endpoints.
#[derive(Debug, Error)]
pub enum RunCfgsyncError {
    /// The TCP listener could not be bound on the requested address.
    #[error("failed to bind cfgsync server on {bind_addr}: {source}")]
    Bind {
        /// Address that failed to bind, e.g. `0.0.0.0:4400`.
        bind_addr: String,
        #[source]
        source: io::Error,
    },
    /// The HTTP server exited with an I/O error after startup.
    #[error("cfgsync server terminated unexpectedly: {source}")]
    Serve {
        #[source]
        source: io::Error,
    },
}
async fn node_config(
State(state): State<Arc<CfgsyncServerState>>,
Json(payload): Json<NodeRegistration>,
) -> impl IntoResponse {
let response = resolve_node_config_response(&state, &payload);
match response {
ConfigResolveResponse::Config(payload_data) => {
(StatusCode::OK, Json(payload_data)).into_response()
}
ConfigResolveResponse::Error(error) => {
let status = error_status(&error.code);
(status, Json(error)).into_response()
}
}
}
/// Axum handler for `POST /register`: records a node registration.
async fn register_node(
    State(state): State<Arc<CfgsyncServerState>>,
    Json(payload): Json<NodeRegistration>,
) -> impl IntoResponse {
    let outcome = state.repo.register(payload);
    match outcome {
        RegisterNodeResponse::Registered => StatusCode::ACCEPTED.into_response(),
        RegisterNodeResponse::Error(error) => {
            let status = error_status(&error.code);
            (status, Json(error)).into_response()
        }
    }
}
/// Delegates node resolution to the configured source; split out so the
/// handler's resolve step is independently callable in tests.
fn resolve_node_config_response(
    state: &CfgsyncServerState,
    registration: &NodeRegistration,
) -> ConfigResolveResponse {
    state.repo.resolve(registration)
}
/// Maps a cfgsync error code to the HTTP status served to the client:
/// 404 for missing config, 425 (Too Early) while pending, 500 otherwise.
fn error_status(code: &CfgsyncErrorCode) -> StatusCode {
    match code {
        CfgsyncErrorCode::MissingConfig => StatusCode::NOT_FOUND,
        CfgsyncErrorCode::NotReady => StatusCode::TOO_EARLY,
        CfgsyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
    }
}
/// Builds the primary cfgsync router with registration and node artifact
/// routes.
pub fn build_cfgsync_router(state: CfgsyncServerState) -> Router {
    let shared_state = Arc::new(state);
    Router::new()
        .route("/register", post(register_node))
        .route("/node", post(node_config))
        .with_state(shared_state)
}
#[doc(hidden)]
/// Builds the legacy cfgsync router that still serves `/init-with-node`.
pub fn build_legacy_cfgsync_router(state: CfgsyncServerState) -> Router {
    let shared_state = Arc::new(state);
    Router::new()
        .route("/register", post(register_node))
        .route("/node", post(node_config))
        // Legacy alias kept for older clients; same handler as `/node`.
        .route("/init-with-node", post(node_config))
        .with_state(shared_state)
}
/// Runs cfgsync HTTP server on the provided port until shutdown/error.
///
/// # Errors
/// Returns [`RunCfgsyncError::Bind`] when the listener cannot be bound and
/// [`RunCfgsyncError::Serve`] when the server terminates unexpectedly.
pub async fn serve_cfgsync(port: u16, state: CfgsyncServerState) -> Result<(), RunCfgsyncError> {
    let app = build_cfgsync_router(state);
    let bind_addr = format!("0.0.0.0:{port}");
    let listener = tokio::net::TcpListener::bind(&bind_addr)
        .await
        .map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?;
    // Announce only after the bind succeeded; previously the message was
    // printed even when the port was unavailable, which was misleading.
    println!("Server running on http://0.0.0.0:{port}");
    axum::serve(listener, app)
        .await
        .map_err(|source| RunCfgsyncError::Serve { source })?;
    Ok(())
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
use axum::{Json, extract::State, http::StatusCode, response::IntoResponse};
use super::{CfgsyncServerState, NodeRegistration, node_config, register_node};
use crate::{
CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, CfgsyncErrorResponse, ConfigResolveResponse,
NodeArtifactFile, NodeArtifactsPayload, NodeConfigSource, RegisterNodeResponse,
};
struct StaticProvider {
data: HashMap<String, NodeArtifactsPayload>,
}
impl NodeConfigSource for StaticProvider {
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
if self.data.contains_key(&registration.identifier) {
RegisterNodeResponse::Registered
} else {
RegisterNodeResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
))
}
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
self.data
.get(&registration.identifier)
.cloned()
.map_or_else(
|| {
ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
))
},
ConfigResolveResponse::Config,
)
}
}
struct RegistrationAwareProvider {
data: HashMap<String, NodeArtifactsPayload>,
registrations: std::sync::Mutex<HashMap<String, NodeRegistration>>,
}
impl NodeConfigSource for RegistrationAwareProvider {
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
if !self.data.contains_key(&registration.identifier) {
return RegisterNodeResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
));
}
let mut registrations = self
.registrations
.lock()
.expect("test registration store should not be poisoned");
registrations.insert(registration.identifier.clone(), registration);
RegisterNodeResponse::Registered
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
let registrations = self
.registrations
.lock()
.expect("test registration store should not be poisoned");
if !registrations.contains_key(&registration.identifier) {
return ConfigResolveResponse::Error(CfgsyncErrorResponse::not_ready(
&registration.identifier,
));
}
self.data
.get(&registration.identifier)
.cloned()
.map_or_else(
|| {
ConfigResolveResponse::Error(CfgsyncErrorResponse::missing_config(
&registration.identifier,
))
},
ConfigResolveResponse::Config,
)
}
}
fn sample_payload() -> NodeArtifactsPayload {
NodeArtifactsPayload {
schema_version: CFGSYNC_SCHEMA_VERSION,
files: vec![NodeArtifactFile::new(
"/app-config.yaml".to_string(),
"app: test".to_string(),
)],
}
}
#[tokio::test]
async fn node_config_resolves_from_non_tf_provider() {
let mut data = HashMap::new();
data.insert("node-a".to_owned(), sample_payload());
let provider = Arc::new(RegistrationAwareProvider {
data,
registrations: std::sync::Mutex::new(HashMap::new()),
});
let state = Arc::new(CfgsyncServerState::new(provider));
let payload =
NodeRegistration::new("node-a".to_string(), "127.0.0.1".parse().expect("valid ip"));
let _ = register_node(State(state.clone()), Json(payload.clone()))
.await
.into_response();
let response = node_config(State(state), Json(payload))
.await
.into_response();
assert_eq!(response.status(), StatusCode::OK);
}
#[tokio::test]
async fn node_config_returns_not_found_for_unknown_identifier() {
let provider = Arc::new(StaticProvider {
data: HashMap::new(),
});
let state = Arc::new(CfgsyncServerState::new(provider));
let payload = NodeRegistration::new(
"missing-node".to_string(),
"127.0.0.1".parse().expect("valid ip"),
);
let response = node_config(State(state), Json(payload))
.await
.into_response();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
#[test]
fn missing_config_error_uses_expected_code() {
let error = CfgsyncErrorResponse::missing_config("missing-node");
assert!(matches!(error.code, CfgsyncErrorCode::MissingConfig));
}
#[tokio::test]
async fn node_config_returns_not_ready_before_registration() {
let mut data = HashMap::new();
data.insert("node-a".to_owned(), sample_payload());
let provider = Arc::new(RegistrationAwareProvider {
data,
registrations: std::sync::Mutex::new(HashMap::new()),
});
let state = Arc::new(CfgsyncServerState::new(provider));
let payload =
NodeRegistration::new("node-a".to_string(), "127.0.0.1".parse().expect("valid ip"));
let response = node_config(State(state), Json(payload))
.await
.into_response();
assert_eq!(response.status(), StatusCode::TOO_EARLY);
}
}

278
cfgsync/core/src/source.rs Normal file
View File

@ -0,0 +1,278 @@
use std::{collections::HashMap, fs, path::Path, sync::Arc};
use thiserror::Error;
use crate::{
NodeArtifactsBundle, NodeArtifactsBundleEntry, NodeArtifactsPayload, NodeRegistration,
RegisterNodeResponse, protocol::ConfigResolveResponse,
};
/// Source of cfgsync node payloads.
///
/// Implementations must be shareable across HTTP handler tasks
/// (`Send + Sync`).
pub trait NodeConfigSource: Send + Sync {
    /// Records a node registration before config resolution.
    fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse;
    /// Resolves the current artifact payload for a previously registered node.
    fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse;
}
/// In-memory map-backed source used by cfgsync server state.
pub struct StaticConfigSource {
    // Fully formed payloads keyed by node identifier; fixed after creation.
    configs: HashMap<String, NodeArtifactsPayload>,
}
impl StaticConfigSource {
    /// Builds an in-memory source from fully formed payloads.
    #[must_use]
    pub fn from_payloads(configs: HashMap<String, NodeArtifactsPayload>) -> Arc<Self> {
        Arc::new(Self { configs })
    }

    /// Builds an in-memory source from a static bundle document.
    #[must_use]
    pub fn from_bundle(bundle: NodeArtifactsBundle) -> Arc<Self> {
        let configs = bundle_to_payload_map(bundle);
        Self::from_payloads(configs)
    }
}
impl NodeConfigSource for StaticConfigSource {
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
if self.configs.contains_key(&registration.identifier) {
RegisterNodeResponse::Registered
} else {
RegisterNodeResponse::Error(crate::CfgsyncErrorResponse::missing_config(
&registration.identifier,
))
}
}
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
self.configs
.get(&registration.identifier)
.cloned()
.map_or_else(
|| {
ConfigResolveResponse::Error(crate::CfgsyncErrorResponse::missing_config(
&registration.identifier,
))
},
ConfigResolveResponse::Config,
)
}
}
/// Failures when loading a cfgsync bundle YAML document from disk.
// NOTE(review): this largely duplicates `BundleConfigSourceError` below —
// consider consolidating the two error types.
#[derive(Debug, Error)]
pub enum BundleLoadError {
    /// The bundle file could not be read.
    #[error("reading cfgsync bundle {path}: {source}")]
    ReadBundle {
        path: String,
        #[source]
        source: std::io::Error,
    },
    /// The bundle file is not valid bundle YAML.
    #[error("parsing cfgsync bundle {path}: {source}")]
    ParseBundle {
        path: String,
        #[source]
        source: serde_yaml::Error,
    },
}
/// Converts a static bundle into the node payload map used by static sources.
#[must_use]
pub fn bundle_to_payload_map(bundle: NodeArtifactsBundle) -> HashMap<String, NodeArtifactsPayload> {
let shared_files = bundle.shared_files;
bundle
.nodes
.into_iter()
.map(|node| {
let NodeArtifactsBundleEntry { identifier, files } = node;
let mut payload_files = files;
payload_files.extend(shared_files.clone());
(identifier, NodeArtifactsPayload::from_files(payload_files))
})
.collect()
}
/// Loads a cfgsync bundle YAML file from disk.
///
/// # Errors
/// Returns [`BundleLoadError::ReadBundle`] when the file cannot be read and
/// [`BundleLoadError::ParseBundle`] when its contents are not valid YAML.
pub fn load_bundle(path: &Path) -> Result<NodeArtifactsBundle, BundleLoadError> {
    let path_string = path.display().to_string();
    let raw = match fs::read_to_string(path) {
        Ok(raw) => raw,
        Err(source) => {
            return Err(BundleLoadError::ReadBundle {
                path: path_string,
                source,
            });
        }
    };
    serde_yaml::from_str(&raw).map_err(|source| BundleLoadError::ParseBundle {
        path: path_string,
        source,
    })
}
/// Failures when loading a bundle-backed cfgsync source.
// NOTE(review): overlaps with `BundleLoadError` above (same two cases,
// different messages) — consider consolidating.
#[derive(Debug, Error)]
pub enum BundleConfigSourceError {
    /// The bundle file could not be read.
    #[error("failed to read cfgsync bundle at {path}: {source}")]
    Read {
        path: String,
        #[source]
        source: std::io::Error,
    },
    /// The bundle file is not valid bundle YAML.
    #[error("failed to parse cfgsync bundle at {path}: {source}")]
    Parse {
        path: String,
        #[source]
        source: serde_yaml::Error,
    },
}
/// YAML bundle-backed source implementation.
pub struct BundleConfigSource {
    // Delegates register/resolve to a static source built from the bundle.
    inner: StaticConfigSource,
}
impl BundleConfigSource {
/// Loads source state from a cfgsync bundle YAML file.
pub fn from_yaml_file(path: &Path) -> Result<Self, BundleConfigSourceError> {
let raw = fs::read_to_string(path).map_err(|source| BundleConfigSourceError::Read {
path: path.display().to_string(),
source,
})?;
let bundle: NodeArtifactsBundle =
serde_yaml::from_str(&raw).map_err(|source| BundleConfigSourceError::Parse {
path: path.display().to_string(),
source,
})?;
let configs = bundle
.nodes
.into_iter()
.map(payload_from_bundle_node)
.collect();
Ok(Self {
inner: StaticConfigSource { configs },
})
}
}
// Bundle-backed sources behave exactly like their underlying static source;
// both trait methods are plain delegation.
impl NodeConfigSource for BundleConfigSource {
    fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
        self.inner.register(registration)
    }
    fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
        self.inner.resolve(registration)
    }
}
fn payload_from_bundle_node(node: NodeArtifactsBundleEntry) -> (String, NodeArtifactsPayload) {
(
node.identifier,
NodeArtifactsPayload::from_files(node.files),
)
}
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, io::Write as _};
    use tempfile::NamedTempFile;
    use super::{BundleConfigSource, StaticConfigSource};
    use crate::{
        CFGSYNC_SCHEMA_VERSION, CfgsyncErrorCode, ConfigResolveResponse, NodeArtifactFile,
        NodeArtifactsPayload, NodeConfigSource, NodeRegistration,
    };
    // Minimal single-file payload fixture shared by the tests below.
    fn sample_payload() -> NodeArtifactsPayload {
        NodeArtifactsPayload::from_files(vec![NodeArtifactFile::new(
            "/config.yaml".to_string(),
            "key: value".to_string(),
        )])
    }
    // Resolving a known identifier returns its payload stamped with the
    // current schema version.
    #[test]
    fn resolves_existing_identifier() {
        let mut configs = HashMap::new();
        configs.insert("node-1".to_owned(), sample_payload());
        let repo = StaticConfigSource { configs };
        match repo.resolve(&NodeRegistration::new(
            "node-1".to_string(),
            "127.0.0.1".parse().expect("parse ip"),
        )) {
            ConfigResolveResponse::Config(payload) => {
                assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION);
                assert_eq!(payload.files.len(), 1);
                assert_eq!(payload.files[0].path, "/config.yaml");
            }
            ConfigResolveResponse::Error(error) => panic!("expected config response, got {error}"),
        }
    }
    // Unknown identifiers produce a MissingConfig error that names the node.
    #[test]
    fn reports_missing_identifier() {
        let repo = StaticConfigSource {
            configs: HashMap::new(),
        };
        match repo.resolve(&NodeRegistration::new(
            "unknown-node".to_string(),
            "127.0.0.1".parse().expect("parse ip"),
        )) {
            ConfigResolveResponse::Config(_) => panic!("expected missing-config error"),
            ConfigResolveResponse::Error(error) => {
                assert!(matches!(error.code, CfgsyncErrorCode::MissingConfig));
                assert!(error.message.contains("unknown-node"));
            }
        }
    }
    // A bundle YAML written to disk round-trips through BundleConfigSource.
    #[test]
    fn loads_file_provider_bundle() {
        let mut bundle_file = NamedTempFile::new().expect("create temp bundle");
        let yaml = r#"
nodes:
  - identifier: node-1
    files:
      - path: /config.yaml
        content: "a: 1"
"#;
        bundle_file
            .write_all(yaml.as_bytes())
            .expect("write bundle yaml");
        let provider =
            BundleConfigSource::from_yaml_file(bundle_file.path()).expect("load file provider");
        let _ = provider.register(NodeRegistration::new(
            "node-1".to_string(),
            "127.0.0.1".parse().expect("parse ip"),
        ));
        match provider.resolve(&NodeRegistration::new(
            "node-1".to_string(),
            "127.0.0.1".parse().expect("parse ip"),
        )) {
            ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
            ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
        }
    }
    // Static sources resolve without requiring a prior register() call.
    #[test]
    fn resolve_accepts_known_registration_without_gating() {
        let mut configs = HashMap::new();
        configs.insert("node-1".to_owned(), sample_payload());
        let repo = StaticConfigSource { configs };
        match repo.resolve(&NodeRegistration::new(
            "node-1".to_string(),
            "127.0.0.1".parse().expect("parse ip"),
        )) {
            ConfigResolveResponse::Config(_) => {}
            ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
        }
    }
}

View File

@ -0,0 +1,29 @@
[package]
categories = { workspace = true }
description = { workspace = true }
edition = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
name = "cfgsync-runtime"
readme = { workspace = true }
repository = { workspace = true }
version = { workspace = true }
[lints]
workspace = true
[dependencies]
anyhow = "1"
axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" }
cfgsync-adapter = { workspace = true }
cfgsync-artifacts = { workspace = true }
cfgsync-core = { workspace = true }
clap = { version = "4", features = ["derive"] }
serde = { workspace = true }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
tracing = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

View File

@ -0,0 +1,61 @@
use cfgsync_adapter::{
DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot,
RegistrationSnapshotMaterializer,
};
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
use cfgsync_core::NodeRegistration;
use cfgsync_runtime::{Client, OutputMap, serve};
use tempfile::tempdir;
use tokio::time::{Duration, sleep};
/// Demo materializer: emits one `/config.yaml` per registered node.
struct ExampleMaterializer;
impl RegistrationSnapshotMaterializer for ExampleMaterializer {
    fn materialize_snapshot(
        &self,
        registrations: &RegistrationSnapshot,
    ) -> Result<MaterializationResult, DynCfgsyncError> {
        // An empty snapshot keeps clients polling until someone registers.
        if registrations.is_empty() {
            return Ok(MaterializationResult::NotReady);
        }
        let mut nodes = Vec::new();
        for registration in registrations.iter() {
            let config_file = ArtifactFile::new(
                "/config.yaml".to_string(),
                format!("id: {}\n", registration.identifier),
            );
            nodes.push((
                registration.identifier.clone(),
                ArtifactSet::new(vec![config_file]),
            ));
        }
        Ok(MaterializationResult::ready(
            MaterializedArtifacts::from_nodes(nodes),
        ))
    }
}
// Demo: run a cfgsync server backed by ExampleMaterializer, register one
// node, fetch its artifacts, and print the materialized config file.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let port = 4400;
    // Server runs in the background for the lifetime of the demo.
    let server = tokio::spawn(async move { serve(port, ExampleMaterializer).await });
    // Give the server a moment to bind before the client registers.
    sleep(Duration::from_millis(100)).await;
    let tempdir = tempdir()?;
    // Write fetched files under the temp dir, preserving artifact paths.
    let outputs = OutputMap::under(tempdir.path().to_path_buf());
    let registration = NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse()?);
    Client::new("http://127.0.0.1:4400")
        .fetch_and_write(&registration, &outputs)
        .await?;
    // `/config.yaml` resolves to `<tempdir>/config.yaml` under this OutputMap.
    println!(
        "{}",
        std::fs::read_to_string(tempdir.path().join("config.yaml"))?
    );
    server.abort();
    Ok(())
}

View File

@ -0,0 +1,79 @@
use cfgsync_adapter::MaterializedArtifacts;
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
use cfgsync_core::NodeRegistration;
use cfgsync_runtime::{Client, OutputMap, serve};
use tempfile::tempdir;
use tokio::time::{Duration, sleep};
// Demo: serve fully precomputed artifacts (per-node configs plus a shared
// set) and fetch them from two clients at different times.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let port = 4401;
    // Artifacts are built up front; `.with_shared` appends the shared set to
    // every node's payload.
    let artifacts = MaterializedArtifacts::from_nodes([
        (
            "node-1".to_owned(),
            ArtifactSet::new(vec![ArtifactFile::new(
                "/config.yaml".to_string(),
                "id: node-1\n".to_string(),
            )]),
        ),
        (
            "node-2".to_owned(),
            ArtifactSet::new(vec![ArtifactFile::new(
                "/config.yaml".to_string(),
                "id: node-2\n".to_string(),
            )]),
        ),
    ])
    .with_shared(ArtifactSet::new(vec![ArtifactFile::new(
        "/shared/cluster.yaml".to_string(),
        "cluster: demo\n".to_string(),
    )]));
    let server = tokio::spawn(async move { serve(port, artifacts).await });
    // Give the server a moment to bind before clients register.
    sleep(Duration::from_millis(100)).await;
    let node_1_dir = tempdir()?;
    // `/config.yaml` goes to the explicit config path; all other artifacts
    // (including shared ones) land under the shared directory.
    let node_1_outputs = OutputMap::config_and_shared(
        node_1_dir.path().join("config.yaml"),
        node_1_dir.path().join("shared"),
    );
    let node_1 = NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse()?);
    Client::new("http://127.0.0.1:4401")
        .fetch_and_write(&node_1, &node_1_outputs)
        .await?;
    println!(
        "node-1 config:\n{}",
        std::fs::read_to_string(node_1_dir.path().join("config.yaml"))?
    );
    // A later node still uses the same registration/fetch flow. The artifacts
    // were already known; registration only gates delivery.
    sleep(Duration::from_millis(250)).await;
    let node_2_dir = tempdir()?;
    let node_2_outputs = OutputMap::config_and_shared(
        node_2_dir.path().join("config.yaml"),
        node_2_dir.path().join("shared"),
    );
    let node_2 = NodeRegistration::new("node-2".to_string(), "127.0.0.2".parse()?);
    Client::new("http://127.0.0.1:4401")
        .fetch_and_write(&node_2, &node_2_outputs)
        .await?;
    println!(
        "node-2 config:\n{}",
        std::fs::read_to_string(node_2_dir.path().join("config.yaml"))?
    );
    // The artifact path `/shared/cluster.yaml` is joined under the `shared`
    // output dir, hence the doubled `shared/shared` segment here.
    println!(
        "shared artifact:\n{}",
        std::fs::read_to_string(node_2_dir.path().join("shared/shared/cluster.yaml"))?
    );
    server.abort();
    Ok(())
}

View File

@ -0,0 +1,81 @@
use cfgsync_adapter::{
DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot,
RegistrationSnapshotMaterializer,
};
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
use cfgsync_core::NodeRegistration;
use cfgsync_runtime::{Client, OutputMap, serve};
use tempfile::tempdir;
use tokio::time::{Duration, sleep};
/// Demo materializer that stays NotReady until at least two nodes register.
struct ThresholdMaterializer;
impl RegistrationSnapshotMaterializer for ThresholdMaterializer {
    fn materialize_snapshot(
        &self,
        registrations: &RegistrationSnapshot,
    ) -> Result<MaterializationResult, DynCfgsyncError> {
        // Below the threshold, keep clients polling.
        if registrations.len() < 2 {
            return Ok(MaterializationResult::NotReady);
        }
        let mut nodes = Vec::new();
        for registration in registrations.iter() {
            let config_file = ArtifactFile::new(
                "/config.yaml".to_string(),
                format!("id: {}\ncluster_ready: true\n", registration.identifier),
            );
            nodes.push((
                registration.identifier.clone(),
                ArtifactSet::new(vec![config_file]),
            ));
        }
        Ok(MaterializationResult::ready(
            MaterializedArtifacts::from_nodes(nodes),
        ))
    }
}
// Demo: the first client's fetch blocks (retrying) until a second node
// registers and tips the materializer's threshold.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let port = 4402;
    let server = tokio::spawn(async move { serve(port, ThresholdMaterializer).await });
    // Give the server a moment to bind before the first client registers.
    sleep(Duration::from_millis(100)).await;
    let waiting_dir = tempdir()?;
    let waiting_outputs = OutputMap::under(waiting_dir.path().to_path_buf());
    let waiting_node = NodeRegistration::new("node-1".to_string(), "127.0.0.1".parse()?);
    let waiting_client = Client::new("http://127.0.0.1:4402");
    // Run node-1's fetch in the background; it cannot complete yet.
    let waiting_task = tokio::spawn(async move {
        waiting_client
            .fetch_and_write(&waiting_node, &waiting_outputs)
            .await
    });
    // node-1 is now polling. The materializer will keep returning NotReady
    // until node-2 registers.
    sleep(Duration::from_millis(400)).await;
    let second_dir = tempdir()?;
    let second_outputs = OutputMap::under(second_dir.path().to_path_buf());
    let second_node = NodeRegistration::new("node-2".to_string(), "127.0.0.2".parse()?);
    Client::new("http://127.0.0.1:4402")
        .fetch_and_write(&second_node, &second_outputs)
        .await?;
    // The second registration unblocks node-1's pending fetch.
    waiting_task.await??;
    println!(
        "node-1 config after threshold reached:\n{}",
        std::fs::read_to_string(waiting_dir.path().join("config.yaml"))?
    );
    println!(
        "node-2 config:\n{}",
        std::fs::read_to_string(second_dir.path().join("config.yaml"))?
    );
    server.abort();
    Ok(())
}

View File

@ -1,6 +1,6 @@
use std::{env, process};
use cfgsync_runtime::run_cfgsync_client_from_env;
use cfgsync_runtime::run_client_from_env;
const CFGSYNC_PORT_ENV: &str = "LOGOS_BLOCKCHAIN_CFGSYNC_PORT";
const DEFAULT_CFGSYNC_PORT: u16 = 4400;
@ -14,7 +14,7 @@ fn cfgsync_port() -> u16 {
#[tokio::main]
async fn main() {
if let Err(err) = run_cfgsync_client_from_env(cfgsync_port()).await {
if let Err(err) = run_client_from_env(cfgsync_port()).await {
eprintln!("Error: {err}");
process::exit(1);
}

View File

@ -1,10 +1,10 @@
use std::path::PathBuf;
use cfgsync_runtime::run_cfgsync_server;
use cfgsync_runtime::serve_from_config;
use clap::Parser;
#[derive(Parser, Debug)]
#[command(about = "CfgSync")]
#[command(about = "Cfgsync server")]
struct Args {
config: PathBuf,
}
@ -12,5 +12,5 @@ struct Args {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
run_cfgsync_server(&args.config).await
serve_from_config(&args.config).await
}

View File

@ -0,0 +1,419 @@
use std::{
collections::HashMap,
env, fs,
net::Ipv4Addr,
path::{Path, PathBuf},
};
use anyhow::{Context as _, Result, bail};
use cfgsync_core::{
CFGSYNC_SCHEMA_VERSION, Client as ProtocolClient, NodeArtifactFile, NodeArtifactsPayload,
NodeRegistration, RegistrationPayload,
};
use thiserror::Error;
use tokio::time::{Duration, sleep};
use tracing::info;
// Maximum register/fetch attempts before the client gives up.
const FETCH_ATTEMPTS: usize = 5;
// Pause between consecutive retry attempts.
const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250);
/// Output routing for fetched artifact files.
#[derive(Debug, Clone, Default)]
pub struct OutputMap {
    // Exact artifact-path -> local-destination overrides; consulted first.
    routes: HashMap<String, PathBuf>,
    // Used when no explicit route matches; `None` means the artifact path is
    // taken verbatim as the output path.
    fallback: Option<FallbackRoute>,
}
#[derive(Debug, Clone)]
enum FallbackRoute {
    // Join the artifact path (minus any leading `/`) under this root.
    Under(PathBuf),
    // Same joining behavior as `Under`; named separately to mark the shared
    // directory used by `config_and_shared`.
    Shared { dir: PathBuf },
}
impl OutputMap {
    /// Creates an empty artifact output map.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Routes one artifact path from the payload to a local output path.
    #[must_use]
    pub fn route(mut self, artifact_path: String, output_path: PathBuf) -> Self {
        self.routes.insert(artifact_path, output_path);
        self
    }
    /// Writes payload files under `root`, preserving each artifact path.
    ///
    /// For example, `/config.yaml` is written to `<root>/config.yaml` and
    /// `shared/deployment-settings.yaml` is written to
    /// `<root>/shared/deployment-settings.yaml`.
    #[must_use]
    pub fn under(root: PathBuf) -> Self {
        Self {
            routes: HashMap::new(),
            fallback: Some(FallbackRoute::Under(root)),
        }
    }
    /// Writes the node config to `config_path` and all other files under
    /// `shared_dir`, preserving their relative artifact paths.
    #[must_use]
    pub fn config_and_shared(config_path: PathBuf, shared_dir: PathBuf) -> Self {
        // Route both spellings of the config path so absolute and relative
        // payload entries land in the same place.
        Self::default()
            .route("/config.yaml".to_string(), config_path.clone())
            .route("config.yaml".to_string(), config_path)
            .with_fallback(FallbackRoute::Shared { dir: shared_dir })
    }
    /// Picks the destination for `file`: explicit route first, then the
    /// fallback, then the artifact path taken verbatim.
    fn resolve_path(&self, file: &NodeArtifactFile) -> PathBuf {
        if let Some(explicit) = self.routes.get(&file.path) {
            return explicit.clone();
        }
        if let Some(fallback) = self.fallback.as_ref() {
            return fallback.resolve(&file.path);
        }
        PathBuf::from(&file.path)
    }
    fn with_fallback(mut self, fallback: FallbackRoute) -> Self {
        self.fallback = Some(fallback);
        self
    }
}
impl FallbackRoute {
    /// Joins the artifact path under the fallback root, dropping a leading `/`
    /// so absolute artifact paths stay inside the root.
    fn resolve(&self, artifact_path: &str) -> PathBuf {
        let relative = artifact_path.trim_start_matches('/');
        let root = match self {
            FallbackRoute::Under(root) => root,
            FallbackRoute::Shared { dir } => dir,
        };
        root.join(relative)
    }
}
/// Runtime-oriented cfgsync client that handles registration, fetch, and local
/// artifact materialization.
#[derive(Debug, Clone)]
pub struct Client {
    // Thin protocol-level client; this type adds retries and file writing.
    inner: ProtocolClient,
}
impl Client {
/// Creates a runtime client that talks to the cfgsync server at
/// `server_addr`.
#[must_use]
pub fn new(server_addr: &str) -> Self {
Self {
inner: ProtocolClient::new(server_addr.to_string()),
}
}
/// Registers a node and fetches its artifact payload from cfgsync.
pub async fn register_and_fetch(
&self,
registration: &NodeRegistration,
) -> Result<NodeArtifactsPayload> {
self.register_node(registration).await?;
let payload = self
.fetch_with_retry(registration)
.await
.context("fetching node artifacts")?;
ensure_schema_version(&payload)?;
Ok(payload)
}
/// Registers a node, fetches its artifact payload, and writes the result
/// using the provided output routing policy.
pub async fn fetch_and_write(
&self,
registration: &NodeRegistration,
outputs: &OutputMap,
) -> Result<()> {
let payload = self.register_and_fetch(registration).await?;
let files = collect_payload_files(&payload)?;
for file in files {
write_file(file, outputs)?;
}
info!(files = files.len(), "cfgsync files saved");
Ok(())
}
async fn fetch_with_retry(
&self,
registration: &NodeRegistration,
) -> Result<NodeArtifactsPayload> {
for attempt in 1..=FETCH_ATTEMPTS {
match self.fetch_once(registration).await {
Ok(config) => return Ok(config),
Err(error) => {
if attempt == FETCH_ATTEMPTS {
return Err(error).with_context(|| {
format!("fetching node artifacts after {attempt} attempts")
});
}
sleep(FETCH_RETRY_DELAY).await;
}
}
}
unreachable!("cfgsync fetch loop always returns before exhausting attempts");
}
async fn fetch_once(&self, registration: &NodeRegistration) -> Result<NodeArtifactsPayload> {
self.inner
.fetch_node_config(registration)
.await
.map_err(Into::into)
}
async fn register_node(&self, registration: &NodeRegistration) -> Result<()> {
for attempt in 1..=FETCH_ATTEMPTS {
match self.inner.register_node(registration).await {
Ok(()) => {
info!(identifier = %registration.identifier, "cfgsync node registered");
return Ok(());
}
Err(error) => {
if attempt == FETCH_ATTEMPTS {
return Err(error).with_context(|| {
format!("registering node with cfgsync after {attempt} attempts")
});
}
sleep(FETCH_RETRY_DELAY).await;
}
}
}
unreachable!("cfgsync register loop always returns before exhausting attempts");
}
}
/// Errors raised when resolving client inputs from environment variables.
#[derive(Debug, Error)]
enum ClientEnvError {
    /// `CFG_HOST_IP` was set but did not parse as an IPv4 address.
    #[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")]
    InvalidIp { value: String },
}
/// Rejects payloads whose schema version differs from the supported one.
fn ensure_schema_version(config: &NodeArtifactsPayload) -> Result<()> {
    if config.schema_version == CFGSYNC_SCHEMA_VERSION {
        return Ok(());
    }
    bail!(
        "unsupported cfgsync payload schema version {}, expected {}",
        config.schema_version,
        CFGSYNC_SCHEMA_VERSION
    )
}
/// Returns the payload's files, rejecting an empty payload.
fn collect_payload_files(config: &NodeArtifactsPayload) -> Result<&[NodeArtifactFile]> {
    if !config.is_empty() {
        return Ok(config.files());
    }
    bail!("cfgsync payload contains no files")
}
fn write_file(file: &NodeArtifactFile, outputs: &OutputMap) -> Result<()> {
let path = outputs.resolve_path(file);
ensure_parent_dir(&path)?;
fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?;
info!(path = %path.display(), "cfgsync file saved");
Ok(())
}
/// Creates the parent directory of `path` when it has a non-empty one.
fn ensure_parent_dir(path: &Path) -> Result<()> {
    match path.parent() {
        // Bare filenames yield an empty parent; nothing to create there.
        Some(parent) if !parent.as_os_str().is_empty() => fs::create_dir_all(parent)
            .with_context(|| format!("creating parent directory {}", parent.display())),
        _ => Ok(()),
    }
}
/// Resolves runtime client inputs from environment and materializes node files.
pub async fn run_client_from_env(default_port: u16) -> Result<()> {
    // Server address defaults to loopback on `default_port` when unset.
    let server_addr =
        env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}"));
    // Host IP defaults to loopback; an invalid value is a hard error.
    let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?;
    let identifier =
        env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
    // Optional JSON metadata forwarded to the server with the registration.
    let metadata = parse_registration_payload_env()?;
    let outputs = build_output_map();
    Client::new(&server_addr)
        .fetch_and_write(
            &NodeRegistration::new(identifier, ip).with_payload(metadata),
            &outputs,
        )
        .await
}
/// Parses `CFG_HOST_IP` text into an IPv4 address.
fn parse_ip_env(ip_str: &str) -> Result<Ipv4Addr> {
    match ip_str.parse::<Ipv4Addr>() {
        Ok(ip) => Ok(ip),
        Err(_) => Err(ClientEnvError::InvalidIp {
            value: ip_str.to_owned(),
        }
        .into()),
    }
}
/// Reads optional JSON metadata from `CFG_REGISTRATION_METADATA_JSON`;
/// an unset variable yields the default (empty) payload.
fn parse_registration_payload_env() -> Result<RegistrationPayload> {
    match env::var("CFG_REGISTRATION_METADATA_JSON") {
        Ok(raw) => parse_registration_payload(&raw),
        Err(_) => Ok(RegistrationPayload::default()),
    }
}
/// Parses raw JSON text into a registration payload, tagging errors with the
/// environment variable the text came from.
fn parse_registration_payload(raw: &str) -> Result<RegistrationPayload> {
    RegistrationPayload::from_json_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON")
}
/// Builds the output routing from `CFG_FILE_PATH` / `CFG_DEPLOYMENT_PATH`.
///
/// Both the absolute and relative spellings of each artifact path are routed
/// so either form in the payload lands at the configured destination.
fn build_output_map() -> OutputMap {
    let mut outputs = OutputMap::default();
    if let Ok(raw) = env::var("CFG_FILE_PATH") {
        let destination = PathBuf::from(raw);
        outputs = outputs
            .route("/config.yaml".to_string(), destination.clone())
            .route("config.yaml".to_string(), destination);
    }
    if let Ok(raw) = env::var("CFG_DEPLOYMENT_PATH") {
        let destination = PathBuf::from(raw);
        outputs = outputs
            .route("/deployment.yaml".to_string(), destination.clone())
            .route("deployment-settings.yaml".to_string(), destination.clone())
            .route("/deployment-settings.yaml".to_string(), destination);
    }
    outputs
}
#[cfg(test)]
mod tests {
    use cfgsync_core::{
        CfgsyncServerState, NodeArtifactsBundle, NodeArtifactsBundleEntry, StaticConfigSource,
    };
    use tempfile::tempdir;
    use super::*;
    // End-to-end: run a real cfgsync server, register, fetch, and check that
    // both payload files are materialized on disk.
    #[tokio::test]
    async fn client_materializes_multi_file_payload_from_cfgsync_server() {
        let dir = tempdir().expect("create temp dir");
        let app_config_path = dir.path().join("config.yaml");
        let deployment_path = dir.path().join("deployment.yaml");
        // Artifact paths are absolute temp-dir paths, so the route-less
        // default OutputMap writes them verbatim.
        let bundle = NodeArtifactsBundle::new(vec![NodeArtifactsBundleEntry {
            identifier: "node-1".to_owned(),
            files: vec![
                NodeArtifactFile::new(
                    app_config_path.to_string_lossy().into_owned(),
                    "app_key: app_value".to_string(),
                ),
                NodeArtifactFile::new(
                    deployment_path.to_string_lossy().into_owned(),
                    "mode: local".to_string(),
                ),
            ],
        }]);
        let repo = StaticConfigSource::from_bundle(bundle);
        let state = CfgsyncServerState::new(repo);
        let port = allocate_test_port();
        let address = format!("http://127.0.0.1:{port}");
        let server = tokio::spawn(async move {
            cfgsync_core::serve_cfgsync(port, state)
                .await
                .expect("run cfgsync server");
        });
        Client::new(&address)
            .fetch_and_write(
                &NodeRegistration::new(
                    "node-1".to_string(),
                    "127.0.0.1".parse().expect("parse ip"),
                ),
                &OutputMap::default(),
            )
            .await
            .expect("pull config files");
        server.abort();
        let _ = server.await;
        let app_config = fs::read_to_string(&app_config_path).expect("read app config");
        let deployment = fs::read_to_string(&deployment_path).expect("read deployment config");
        assert_eq!(app_config, "app_key: app_value");
        assert_eq!(deployment, "mode: local");
    }
    // Binds port 0 so the OS picks a free port, then releases it for the
    // server to rebind. NOTE(review): tiny race window between drop and bind.
    fn allocate_test_port() -> u16 {
        let listener =
            std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port for test");
        let port = listener.local_addr().expect("read local addr").port();
        drop(listener);
        port
    }
    // JSON object metadata round-trips through the registration payload.
    #[test]
    fn parses_registration_payload_object() {
        #[derive(Debug, serde::Deserialize, PartialEq, Eq)]
        struct ExamplePayload {
            network_port: u16,
            service: String,
        }
        let metadata = parse_registration_payload(r#"{"network_port":3000,"service":"blend"}"#)
            .expect("parse metadata");
        let payload: ExamplePayload = metadata
            .deserialize()
            .expect("deserialize payload")
            .expect("payload value");
        assert_eq!(
            payload,
            ExamplePayload {
                network_port: 3000,
                service: "blend".to_owned(),
            }
        );
    }
    // Non-object JSON (an array) is also accepted as metadata.
    #[test]
    fn parses_registration_payload_array() {
        let metadata = parse_registration_payload(r#"[1,2,3]"#).expect("parse metadata array");
        let payload: Vec<u8> = metadata
            .deserialize()
            .expect("deserialize payload")
            .expect("payload value");
        assert_eq!(payload, vec![1, 2, 3]);
    }
}

View File

@ -0,0 +1,10 @@
// Re-export the protocol layer so dependents can reach core types via
// `cfgsync_runtime::core`.
pub use cfgsync_core as core;
mod client;
mod server;
// Client-side surface: runtime client, output routing, env-driven entrypoint.
pub use client::{Client, OutputMap, run_client_from_env};
// Server-side surface: config-driven and code-driven router/serve entrypoints.
pub use server::{
    LoadServerConfigError, ServerConfig, ServerSource, build_persisted_router, build_router, serve,
    serve_from_config, serve_persisted,
};

View File

@ -0,0 +1,255 @@
use std::{fs, path::Path, sync::Arc};
use anyhow::Context as _;
use axum::Router;
use cfgsync_adapter::{
CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink,
PersistingSnapshotMaterializer, RegistrationConfigSource, RegistrationSnapshotMaterializer,
};
use cfgsync_core::{
BundleConfigSource, CfgsyncServerState, NodeConfigSource, RunCfgsyncError,
serve_cfgsync as serve_cfgsync_state,
};
use serde::Deserialize;
use thiserror::Error;
/// Runtime cfgsync server config loaded from YAML.
///
/// Constructed via [`ServerConfig::load_from_file`] or the `for_static` /
/// `for_registration` helpers.
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct ServerConfig {
    /// HTTP port to bind the cfgsync server on.
    pub port: u16,
    /// Source used by the runtime-managed cfgsync server.
    pub source: ServerSource,
}
/// Runtime cfgsync source loaded from config.
///
/// This type is intentionally runtime-oriented:
/// - `Static` serves precomputed artifacts directly without registration
/// - `Registration` serves precomputed artifacts through the registration
///   protocol, which is useful when the consumer wants clients to register
///   before receiving already-materialized artifacts
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ServerSource {
    /// Serve precomputed artifacts directly, without requiring registration.
    /// Accepts the legacy `kind: bundle` spelling via the serde alias.
    #[serde(alias = "bundle")]
    Static { artifacts_path: String },
    /// Require node registration before serving precomputed artifacts.
    /// Relative `artifacts_path` values are resolved against the config file's
    /// directory (see `resolve_artifacts_path`).
    Registration { artifacts_path: String },
}
/// Failures when loading [`ServerConfig`] from a YAML file.
#[derive(Debug, Error)]
pub enum LoadServerConfigError {
    /// The config file could not be read from disk.
    #[error("failed to read cfgsync config file {path}: {source}")]
    Read {
        path: String,
        #[source]
        source: std::io::Error,
    },
    /// The config file contents were not valid YAML for [`ServerConfig`].
    #[error("failed to parse cfgsync config file {path}: {source}")]
    Parse {
        path: String,
        #[source]
        source: serde_yaml::Error,
    },
}
impl ServerConfig {
    /// Loads cfgsync runtime server config from a YAML file.
    ///
    /// # Errors
    /// Returns [`LoadServerConfigError::Read`] on I/O failure and
    /// [`LoadServerConfigError::Parse`] on invalid YAML.
    pub fn load_from_file(path: &Path) -> Result<Self, LoadServerConfigError> {
        let display = path.display().to_string();
        let contents = fs::read_to_string(path).map_err(|source| LoadServerConfigError::Read {
            path: display.clone(),
            source,
        })?;
        serde_yaml::from_str(&contents).map_err(|source| LoadServerConfigError::Parse {
            path: display,
            source,
        })
    }
    /// Builds a config that serves precomputed artifacts without registration.
    #[must_use]
    pub fn for_static(port: u16, artifacts_path: String) -> Self {
        let source = ServerSource::Static { artifacts_path };
        Self { port, source }
    }
    /// Builds a config that serves precomputed artifacts through the
    /// registration flow.
    #[must_use]
    pub fn for_registration(port: u16, artifacts_path: String) -> Self {
        let source = ServerSource::Registration { artifacts_path };
        Self { port, source }
    }
}
/// Builds a registration-free source from a bundle YAML file on disk.
fn load_static_source(artifacts_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
    let source = BundleConfigSource::from_yaml_file(artifacts_path).with_context(|| {
        format!(
            "loading cfgsync static artifacts from {}",
            artifacts_path.display()
        )
    })?;
    Ok(Arc::new(source))
}
/// Builds a registration-gated source from materialized artifacts on disk.
fn load_registration_source(artifacts_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
    let artifacts = load_materialized_artifacts_yaml(artifacts_path)?;
    Ok(Arc::new(RegistrationConfigSource::new(artifacts)))
}
/// Reads and parses a materialized-artifacts YAML file.
fn load_materialized_artifacts_yaml(
    artifacts_path: &Path,
) -> anyhow::Result<MaterializedArtifacts> {
    let raw = fs::read_to_string(artifacts_path).with_context(|| {
        format!(
            "reading cfgsync materialized artifacts from {}",
            artifacts_path.display()
        )
    })?;
    let parsed = serde_yaml::from_str(&raw).with_context(|| {
        format!(
            "parsing cfgsync materialized artifacts from {}",
            artifacts_path.display()
        )
    })?;
    Ok(parsed)
}
/// Resolves an artifacts path: absolute paths pass through; relative paths
/// are joined onto the config file's directory (or `.` when it has none).
fn resolve_artifacts_path(config_path: &Path, artifacts_path: &str) -> std::path::PathBuf {
    let candidate = Path::new(artifacts_path);
    if candidate.is_absolute() {
        candidate.to_path_buf()
    } else {
        let base = config_path.parent().unwrap_or_else(|| Path::new("."));
        base.join(candidate)
    }
}
/// Loads runtime config and starts cfgsync HTTP server process.
///
/// # Errors
/// Fails on config load/parse errors, source loading errors, or server
/// bind/serve failures.
pub async fn serve_from_config(config_path: &Path) -> anyhow::Result<()> {
    let config = ServerConfig::load_from_file(config_path)?;
    let source_path = resolve_source_path(config_path, &config.source);
    let state = build_server_state(&config, &source_path)?;
    Ok(serve_cfgsync_state(config.port, state).await?)
}
/// Builds the default registration-backed cfgsync router from a snapshot
/// materializer.
///
/// This is the main code-driven entrypoint for apps that want cfgsync to own:
/// - node registration
/// - readiness polling
/// - artifact serving
///
/// while the app owns only snapshot materialization logic.
pub fn build_router<M>(materializer: M) -> Router
where
    M: RegistrationSnapshotMaterializer + 'static,
{
    // Cache wraps the materializer; the registration source drives it.
    let cached = CachedSnapshotMaterializer::new(materializer);
    let source = RegistrationConfigSource::new(cached);
    let state = CfgsyncServerState::new(Arc::new(source));
    cfgsync_core::build_cfgsync_router(state)
}
/// Builds a registration-backed cfgsync router with a persistence hook for
/// ready materialization results.
///
/// Use this when the application wants cfgsync to persist or publish shared
/// artifacts after a snapshot becomes ready.
pub fn build_persisted_router<M, S>(materializer: M, sink: S) -> Router
where
    M: RegistrationSnapshotMaterializer + 'static,
    S: MaterializedArtifactsSink + 'static,
{
    // Persisting wrapper fires the sink; the cache then memoizes results.
    let persisting = PersistingSnapshotMaterializer::new(materializer, sink);
    let cached = CachedSnapshotMaterializer::new(persisting);
    let source = RegistrationConfigSource::new(cached);
    cfgsync_core::build_cfgsync_router(CfgsyncServerState::new(Arc::new(source)))
}
/// Runs the default registration-backed cfgsync server directly from a snapshot
/// materializer.
///
/// This is the simplest runtime entrypoint when the application already has a
/// materializer value and does not need to compose extra routes.
pub async fn serve<M>(port: u16, materializer: M) -> Result<(), RunCfgsyncError>
where
    M: RegistrationSnapshotMaterializer + 'static,
{
    serve_router(port, build_router(materializer)).await
}
/// Runs a registration-backed cfgsync server with a persistence hook for ready
/// materialization results.
///
/// This is the direct serving counterpart to
/// [`build_persisted_router`].
pub async fn serve_persisted<M, S>(
    port: u16,
    materializer: M,
    sink: S,
) -> Result<(), RunCfgsyncError>
where
    M: RegistrationSnapshotMaterializer + 'static,
    S: MaterializedArtifactsSink + 'static,
{
    serve_router(port, build_persisted_router(materializer, sink)).await
}
/// Binds `0.0.0.0:<port>` and serves the router until the task is aborted or
/// the server errors out.
async fn serve_router(port: u16, router: Router) -> Result<(), RunCfgsyncError> {
    let bind_addr = format!("0.0.0.0:{port}");
    let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
        Ok(listener) => listener,
        Err(source) => return Err(RunCfgsyncError::Bind { bind_addr, source }),
    };
    axum::serve(listener, router)
        .await
        .map_err(|source| RunCfgsyncError::Serve { source })
}
/// Builds the server state by loading whichever source kind the config names.
fn build_server_state(
    config: &ServerConfig,
    source_path: &Path,
) -> anyhow::Result<CfgsyncServerState> {
    let source = match &config.source {
        ServerSource::Static { .. } => load_static_source(source_path),
        ServerSource::Registration { .. } => load_registration_source(source_path),
    }?;
    Ok(CfgsyncServerState::new(source))
}
/// Resolves the artifacts path for either source kind relative to the config
/// file's location.
///
/// Both variants carry an `artifacts_path` field with identical resolution
/// rules, so the previously duplicated match arms are collapsed into a single
/// or-pattern.
fn resolve_source_path(config_path: &Path, source: &ServerSource) -> std::path::PathBuf {
    match source {
        ServerSource::Static { artifacts_path }
        | ServerSource::Registration { artifacts_path } => {
            resolve_artifacts_path(config_path, artifacts_path)
        }
    }
}

View File

@ -2,10 +2,10 @@
set -euo pipefail
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
cargo build --manifest-path /workspace/testing-framework/tools/cfgsync-runtime/Cargo.toml --bin cfgsync-server
cargo build --manifest-path /workspace/cfgsync/runtime/Cargo.toml --bin cfgsync-server
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
cargo build --manifest-path /workspace/testing-framework/tools/cfgsync-runtime/Cargo.toml --bin cfgsync-client
cargo build --manifest-path /workspace/cfgsync/runtime/Cargo.toml --bin cfgsync-client
cp /workspace/target/debug/cfgsync-server /workspace/artifacts/cfgsync-server
cp /workspace/target/debug/cfgsync-client /workspace/artifacts/cfgsync-client

View File

@ -39,7 +39,7 @@ spec:
items:
- key: cfgsync.yaml
path: cfgsync.yaml
- key: cfgsync.bundle.yaml
path: cfgsync.bundle.yaml
- key: cfgsync.artifacts.yaml
path: cfgsync.artifacts.yaml
- key: run_cfgsync.sh
path: scripts/run_cfgsync.sh

View File

@ -11,9 +11,9 @@ data:
{{- else }}
{{ "" | indent 4 }}
{{- end }}
cfgsync.bundle.yaml: |
{{- if .Values.cfgsync.bundle }}
{{ .Values.cfgsync.bundle | indent 4 }}
cfgsync.artifacts.yaml: |
{{- if .Values.cfgsync.artifacts }}
{{ .Values.cfgsync.artifacts | indent 4 }}
{{- else }}
{{ "" | indent 4 }}
{{- end }}

View File

@ -7,8 +7,9 @@ version = { workspace = true }
[dependencies]
# Workspace crates
cfgsync-adapter = { workspace = true }
cfgsync-artifacts = { workspace = true }
cfgsync-core = { workspace = true }
cfgsync_runtime = { workspace = true }
lb-framework = { workspace = true }
testing-framework-core = { workspace = true }
testing-framework-env = { workspace = true }

View File

@ -1,24 +1,31 @@
use anyhow::{Result, anyhow};
pub(crate) use cfgsync_runtime::render::CfgsyncOutputPaths;
use cfgsync_runtime::{
bundle::{CfgSyncBundle, CfgSyncBundleNode, build_cfgsync_bundle_with_hostnames},
render::{
CfgsyncConfigOverrides, RenderedCfgsync, ensure_bundle_path,
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
},
use anyhow::Result;
use cfgsync_artifacts::ArtifactFile;
pub(crate) use cfgsync_core::render::CfgsyncOutputPaths;
use cfgsync_core::render::{
CfgsyncConfigOverrides, RenderedCfgsync, ensure_artifacts_path,
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
};
use reqwest::Url;
use serde_yaml::{Mapping, Value};
use testing_framework_core::cfgsync::CfgsyncEnv;
use testing_framework_core::cfgsync::{StaticArtifactRenderer, build_static_artifacts};
use thiserror::Error;
pub(crate) struct CfgsyncRenderOptions {
pub port: Option<u16>,
pub bundle_path: Option<String>,
pub artifacts_path: Option<String>,
pub min_timeout_secs: Option<u64>,
pub metrics_otlp_ingest_url: Option<Url>,
}
pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
#[derive(Debug, Error)]
enum BundleRenderError {
#[error("cfgsync bundle node `{identifier}` is missing `/config.yaml`")]
MissingConfigFile { identifier: String },
#[error("cfgsync config file is missing `{key}`")]
MissingYamlKey { key: String },
}
pub(crate) fn render_cfgsync_from_template<E: StaticArtifactRenderer>(
topology: &E::Deployment,
hostnames: &[String],
options: CfgsyncRenderOptions,
@ -26,48 +33,59 @@ pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
let cfg = build_cfgsync_server_config();
let overrides = build_overrides::<E>(topology, options);
let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?;
let mut bundle = build_cfgsync_bundle_with_hostnames::<E>(topology, hostnames)?;
append_deployment_files(&mut bundle)?;
let bundle_yaml = serde_yaml::to_string(&bundle)?;
let mut materialized = build_static_artifacts::<E>(topology, hostnames)?;
append_deployment_files(&mut materialized)?;
let artifacts_yaml = serde_yaml::to_string(&materialized)?;
Ok(RenderedCfgsync {
config_yaml,
bundle_yaml,
artifacts_yaml,
})
}
fn append_deployment_files(bundle: &mut CfgSyncBundle) -> Result<()> {
for node in &mut bundle.nodes {
if has_file_path(node, "/deployment.yaml") {
continue;
}
let config_content = config_file_content(node)
.ok_or_else(|| anyhow!("cfgsync bundle node missing /config.yaml"))?;
let deployment_yaml = extract_yaml_key(&config_content, "deployment")?;
node.files
.push(build_bundle_file("/deployment.yaml", deployment_yaml));
fn append_deployment_files(
materialized: &mut cfgsync_adapter::MaterializedArtifacts,
) -> Result<()> {
if has_shared_file_path(materialized, "/deployment.yaml") {
return Ok(());
}
let Some((identifier, artifacts)) = materialized.iter().next() else {
return Ok(());
};
let config_content =
config_file_content(artifacts).ok_or_else(|| BundleRenderError::MissingConfigFile {
identifier: identifier.to_owned(),
})?;
let deployment_yaml = extract_yaml_key(&config_content, "deployment")?;
let mut shared = materialized.shared().clone();
shared
.files
.push(build_artifact_file("/deployment.yaml", deployment_yaml));
*materialized = materialized.clone().with_shared(shared);
Ok(())
}
fn has_file_path(node: &CfgSyncBundleNode, path: &str) -> bool {
node.files.iter().any(|file| file.path == path)
fn has_shared_file_path(materialized: &cfgsync_adapter::MaterializedArtifacts, path: &str) -> bool {
materialized
.shared()
.files
.iter()
.any(|file| file.path == path)
}
fn config_file_content(node: &CfgSyncBundleNode) -> Option<String> {
node.files
fn config_file_content(artifacts: &cfgsync_artifacts::ArtifactSet) -> Option<String> {
artifacts
.files
.iter()
.find_map(|file| (file.path == "/config.yaml").then_some(file.content.clone()))
}
fn build_bundle_file(path: &str, content: String) -> cfgsync_core::CfgSyncFile {
cfgsync_core::CfgSyncFile {
path: path.to_owned(),
content,
}
fn build_artifact_file(path: &str, content: String) -> ArtifactFile {
ArtifactFile::new(path.to_string(), content.to_string())
}
fn extract_yaml_key(content: &str, key: &str) -> Result<String> {
@ -75,7 +93,9 @@ fn extract_yaml_key(content: &str, key: &str) -> Result<String> {
let value = document
.get(key)
.cloned()
.ok_or_else(|| anyhow!("config yaml missing `{key}`"))?;
.ok_or_else(|| BundleRenderError::MissingYamlKey {
key: key.to_owned(),
})?;
Ok(serde_yaml::to_string(&value)?)
}
@ -87,21 +107,28 @@ fn build_cfgsync_server_config() -> Value {
Value::Number(4400_u64.into()),
);
root.insert(
Value::String("bundle_path".to_string()),
Value::String("cfgsync.bundle.yaml".to_string()),
let mut source = Mapping::new();
source.insert(
Value::String("kind".to_string()),
Value::String("registration".to_string()),
);
source.insert(
Value::String("artifacts_path".to_string()),
Value::String("cfgsync.artifacts.yaml".to_string()),
);
root.insert(Value::String("source".to_string()), Value::Mapping(source));
Value::Mapping(root)
}
pub(crate) fn render_and_write_cfgsync_from_template<E: CfgsyncEnv>(
pub(crate) fn render_and_write_cfgsync_from_template<E: StaticArtifactRenderer>(
topology: &E::Deployment,
hostnames: &[String],
mut options: CfgsyncRenderOptions,
output: CfgsyncOutputPaths<'_>,
) -> Result<RenderedCfgsync> {
ensure_bundle_path(&mut options.bundle_path, output.bundle_path);
ensure_artifacts_path(&mut options.artifacts_path, output.artifacts_path);
let rendered = render_cfgsync_from_template::<E>(topology, hostnames, options)?;
write_rendered_cfgsync(&rendered, output)?;
@ -109,13 +136,13 @@ pub(crate) fn render_and_write_cfgsync_from_template<E: CfgsyncEnv>(
Ok(rendered)
}
fn build_overrides<E: CfgsyncEnv>(
fn build_overrides<E: StaticArtifactRenderer>(
topology: &E::Deployment,
options: CfgsyncRenderOptions,
) -> CfgsyncConfigOverrides {
let CfgsyncRenderOptions {
port,
bundle_path,
artifacts_path,
min_timeout_secs,
metrics_otlp_ingest_url,
} = options;
@ -124,7 +151,7 @@ fn build_overrides<E: CfgsyncEnv>(
port,
n_hosts: Some(E::nodes(topology).len()),
timeout_floor_secs: min_timeout_secs,
bundle_path,
artifacts_path,
metrics_otlp_ingest_url: metrics_otlp_ingest_url.map(|url| url.to_string()),
}
}

View File

@ -117,7 +117,7 @@ impl ComposeDeployEnv for LbcExtEnv {
nodes = topology.nodes().len(),
"updating cfgsync template"
);
let bundle_path = cfgsync_bundle_path(path);
let artifacts_path = cfgsync_artifacts_path(path);
let hostnames = topology_hostnames(topology);
let options = cfgsync_render_options(port, metrics_otlp_ingest_url);
@ -127,7 +127,7 @@ impl ComposeDeployEnv for LbcExtEnv {
options,
CfgsyncOutputPaths {
config_path: path,
bundle_path: &bundle_path,
artifacts_path: &artifacts_path,
},
)?;
Ok(())
@ -186,11 +186,11 @@ fn node_instance_name(index: usize) -> String {
format!("node-{index}")
}
fn cfgsync_bundle_path(config_path: &Path) -> PathBuf {
fn cfgsync_artifacts_path(config_path: &Path) -> PathBuf {
config_path
.parent()
.unwrap_or(config_path)
.join("cfgsync.bundle.yaml")
.join("cfgsync.artifacts.yaml")
}
fn topology_hostnames(topology: &DeploymentPlan) -> Vec<String> {
@ -207,7 +207,7 @@ fn cfgsync_render_options(
) -> CfgsyncRenderOptions {
CfgsyncRenderOptions {
port: Some(port),
bundle_path: None,
artifacts_path: None,
min_timeout_secs: None,
metrics_otlp_ingest_url: metrics_otlp_ingest_url.cloned(),
}
@ -254,7 +254,6 @@ fn build_compose_node_descriptor(
base_volumes(),
default_extra_hosts(),
ports,
api_port,
environment,
platform,
)

View File

@ -31,7 +31,7 @@ use crate::{
const CFGSYNC_K8S_TIMEOUT_SECS: u64 = 300;
const K8S_FULLNAME_OVERRIDE: &str = "logos-runner";
const DEFAULT_K8S_TESTNET_IMAGE: &str = "logos-blockchain-testing:local";
const DEFAULT_K8S_TESTNET_IMAGE: &str = "public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test";
/// Paths and image metadata required to deploy the Helm chart.
pub struct K8sAssets {
@ -182,11 +182,11 @@ pub fn prepare_assets(
let root = workspace_root().map_err(|source| AssetsError::WorkspaceRoot { source })?;
let tempdir = create_assets_tempdir()?;
let (cfgsync_file, cfgsync_yaml, bundle_yaml) =
let (cfgsync_file, cfgsync_yaml, artifacts_yaml) =
render_and_write_cfgsync(topology, metrics_otlp_ingest_url, &tempdir)?;
let scripts = validate_scripts(&root)?;
let chart_path = helm_chart_path()?;
let values_file = render_and_write_values(topology, &tempdir, &cfgsync_yaml, &bundle_yaml)?;
let values_file = render_and_write_values(topology, &tempdir, &cfgsync_yaml, &artifacts_yaml)?;
let image = testnet_image();
log_assets_prepare_done(&cfgsync_file, &values_file, &chart_path, &image);
@ -351,24 +351,24 @@ fn render_and_write_cfgsync(
tempdir: &TempDir,
) -> Result<(PathBuf, String, String), AssetsError> {
let cfgsync_file = tempdir.path().join("cfgsync.yaml");
let bundle_file = tempdir.path().join("cfgsync.bundle.yaml");
let (cfgsync_yaml, bundle_yaml) = render_cfgsync_config(
let artifacts_file = tempdir.path().join("cfgsync.artifacts.yaml");
let (cfgsync_yaml, artifacts_yaml) = render_cfgsync_config(
topology,
metrics_otlp_ingest_url,
&cfgsync_file,
&bundle_file,
&artifacts_file,
)?;
Ok((cfgsync_file, cfgsync_yaml, bundle_yaml))
Ok((cfgsync_file, cfgsync_yaml, artifacts_yaml))
}
fn render_and_write_values(
topology: &DeploymentPlan,
tempdir: &TempDir,
cfgsync_yaml: &str,
bundle_yaml: &str,
artifacts_yaml: &str,
) -> Result<PathBuf, AssetsError> {
let values_yaml = render_values_yaml(topology, cfgsync_yaml, bundle_yaml)?;
let values_yaml = render_values_yaml(topology, cfgsync_yaml, artifacts_yaml)?;
write_temp_file(tempdir.path(), "values.yaml", values_yaml)
}
@ -380,7 +380,7 @@ fn render_cfgsync_config(
topology: &DeploymentPlan,
metrics_otlp_ingest_url: Option<&Url>,
cfgsync_file: &Path,
bundle_file: &Path,
artifacts_file: &Path,
) -> Result<(String, String), AssetsError> {
let hostnames = k8s_node_hostnames(topology);
let rendered = render_and_write_cfgsync_from_template::<lb_framework::LbcEnv>(
@ -388,18 +388,18 @@ fn render_cfgsync_config(
&hostnames,
CfgsyncRenderOptions {
port: Some(cfgsync_port()),
bundle_path: Some("cfgsync.bundle.yaml".to_string()),
artifacts_path: Some("cfgsync.artifacts.yaml".to_string()),
min_timeout_secs: Some(CFGSYNC_K8S_TIMEOUT_SECS),
metrics_otlp_ingest_url: metrics_otlp_ingest_url.cloned(),
},
CfgsyncOutputPaths {
config_path: cfgsync_file,
bundle_path: bundle_file,
artifacts_path: artifacts_file,
},
)
.map_err(|source| AssetsError::Cfgsync { source })?;
Ok((rendered.config_yaml, rendered.bundle_yaml))
Ok((rendered.config_yaml, rendered.artifacts_yaml))
}
fn k8s_node_hostnames(topology: &DeploymentPlan) -> Vec<String> {
@ -459,9 +459,9 @@ fn helm_chart_path() -> Result<PathBuf, AssetsError> {
fn render_values_yaml(
topology: &DeploymentPlan,
cfgsync_yaml: &str,
bundle_yaml: &str,
artifacts_yaml: &str,
) -> Result<String, AssetsError> {
let values = build_values(topology, cfgsync_yaml, bundle_yaml);
let values = build_values(topology, cfgsync_yaml, artifacts_yaml);
serde_yaml::to_string(&values).map_err(|source| AssetsError::Values { source })
}
@ -569,7 +569,7 @@ struct KzgValues {
struct CfgsyncValues {
port: u16,
config: String,
bundle: String,
artifacts: String,
}
#[derive(Serialize)]
@ -589,11 +589,11 @@ struct NodeValues {
env: BTreeMap<String, String>,
}
fn build_values(topology: &DeploymentPlan, cfgsync_yaml: &str, bundle_yaml: &str) -> HelmValues {
fn build_values(topology: &DeploymentPlan, cfgsync_yaml: &str, artifacts_yaml: &str) -> HelmValues {
let cfgsync = CfgsyncValues {
port: cfgsync_port(),
config: cfgsync_yaml.to_string(),
bundle: bundle_yaml.to_string(),
artifacts: artifacts_yaml.to_string(),
};
let kzg = KzgValues::disabled();
let image_pull_policy =

View File

@ -17,6 +17,8 @@ default = []
[dependencies]
async-trait = "0.1"
cfgsync-adapter = { workspace = true }
cfgsync-artifacts = { workspace = true }
futures = { default-features = false, features = ["std"], version = "0.3" }
parking_lot = { workspace = true }
prometheus-http-query = "0.8"

View File

@ -1,16 +1,14 @@
use std::error::Error;
pub use cfgsync_adapter::*;
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
use thiserror::Error;
#[doc(hidden)]
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
#[derive(Debug, Clone)]
pub struct CfgsyncNodeConfig {
pub identifier: String,
pub config_yaml: String,
}
pub trait CfgsyncEnv {
#[doc(hidden)]
pub trait StaticArtifactRenderer {
type Deployment;
type Node;
type NodeConfig;
@ -35,8 +33,11 @@ pub trait CfgsyncEnv {
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
}
#[doc(hidden)]
pub use StaticArtifactRenderer as CfgsyncEnv;
#[derive(Debug, Error)]
pub enum BuildCfgsyncNodesError {
pub enum BuildStaticArtifactsError {
#[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")]
HostnameCountMismatch { nodes: usize, hostnames: usize },
#[error("cfgsync adapter failed: {source}")]
@ -46,39 +47,47 @@ pub enum BuildCfgsyncNodesError {
},
}
fn adapter_error<E>(source: E) -> BuildCfgsyncNodesError
fn adapter_error<E>(source: E) -> BuildStaticArtifactsError
where
E: Error + Send + Sync + 'static,
{
BuildCfgsyncNodesError::Adapter {
BuildStaticArtifactsError::Adapter {
source: Box::new(source),
}
}
pub fn build_cfgsync_node_configs<E: CfgsyncEnv>(
pub fn build_static_artifacts<E: StaticArtifactRenderer>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<Vec<CfgsyncNodeConfig>, BuildCfgsyncNodesError> {
) -> Result<cfgsync_adapter::MaterializedArtifacts, BuildStaticArtifactsError> {
let nodes = E::nodes(deployment);
if nodes.len() != hostnames.len() {
return Err(BuildCfgsyncNodesError::HostnameCountMismatch {
return Err(BuildStaticArtifactsError::HostnameCountMismatch {
nodes: nodes.len(),
hostnames: hostnames.len(),
});
}
let mut output = Vec::with_capacity(nodes.len());
let mut output = std::collections::HashMap::with_capacity(nodes.len());
for (index, node) in nodes.iter().enumerate() {
let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?;
E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config)
.map_err(adapter_error)?;
let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?;
output.push(CfgsyncNodeConfig {
identifier: E::node_identifier(index, node),
config_yaml,
});
output.insert(
E::node_identifier(index, node),
ArtifactSet::new(vec![ArtifactFile::new(
"/config.yaml".to_string(),
config_yaml.clone(),
)]),
);
}
Ok(output)
Ok(cfgsync_adapter::MaterializedArtifacts::from_nodes(output))
}
#[doc(hidden)]
pub use build_static_artifacts as build_cfgsync_node_catalog;

View File

@ -31,4 +31,5 @@ uuid = { features = ["v4"], version = "1" }
[dev-dependencies]
groth16 = { workspace = true }
key-management-system-service = { workspace = true }
serde_json = { workspace = true }
zksign = { workspace = true }

View File

@ -18,9 +18,6 @@ services:
{% for port in node.ports %}
- {{ port }}
{% endfor %}
labels:
testing-framework.node: "true"
testing-framework.api-container-port: "{{ node.api_container_port }}"
environment:
{% for env in node.environment %}
{{ env.key }}: "{{ env.value }}"

View File

@ -9,7 +9,6 @@ pub struct NodeDescriptor {
volumes: Vec<String>,
extra_hosts: Vec<String>,
ports: Vec<String>,
api_container_port: u16,
environment: Vec<EnvEntry>,
#[serde(skip_serializing_if = "Option::is_none")]
platform: Option<String>,
@ -50,7 +49,6 @@ impl NodeDescriptor {
volumes: Vec<String>,
extra_hosts: Vec<String>,
ports: Vec<String>,
api_container_port: u16,
environment: Vec<EnvEntry>,
platform: Option<String>,
) -> Self {
@ -61,7 +59,6 @@ impl NodeDescriptor {
volumes,
extra_hosts,
ports,
api_container_port,
environment,
platform,
}
@ -80,9 +77,4 @@ impl NodeDescriptor {
pub fn environment(&self) -> &[EnvEntry] {
&self.environment
}
#[cfg(test)]
pub fn api_container_port(&self) -> u16 {
self.api_container_port
}
}

View File

@ -106,7 +106,8 @@ pub trait K8sDeployEnv: Application {
format!("{release}-node-{index}")
}
/// Label selector used to discover managed node services in attached mode.
/// Label selector used to discover managed node services in
/// existing-cluster mode.
fn attach_node_service_selector(release: &str) -> String {
format!("app.kubernetes.io/instance={release}")
}

View File

@ -1,21 +0,0 @@
[package]
categories = { workspace = true }
description = { workspace = true }
edition = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
name = "cfgsync-core"
readme = { workspace = true }
repository = { workspace = true }
version = { workspace = true }
[lints]
workspace = true
[dependencies]
axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" }
reqwest = { features = ["json"], workspace = true }
serde = { default-features = false, features = ["derive"], version = "1" }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }

View File

@ -1,94 +0,0 @@
use serde::Serialize;
use thiserror::Error;
use crate::{
repo::{CfgSyncErrorResponse, CfgSyncPayload},
server::ClientIp,
};
/// Errors produced while talking to the cfgsync HTTP server.
#[derive(Debug, Error)]
pub enum ClientError {
    /// Transport-level failure (connect, timeout, body read) from `reqwest`.
    #[error("request failed: {0}")]
    Request(#[from] reqwest::Error),
    /// The server answered with a non-success HTTP status.
    #[error("cfgsync server error {status}: {message}")]
    Status {
        status: reqwest::StatusCode,
        // Human-readable text; falls back to the raw body when the
        // structured error payload could not be parsed (see `post_json`).
        message: String,
        // Structured error payload, present when the body was valid JSON.
        error: Option<CfgSyncErrorResponse>,
    },
    /// The success body could not be decoded as a `CfgSyncPayload`.
    #[error("failed to parse cfgsync response: {0}")]
    Decode(serde_json::Error),
}
/// Thin HTTP client for the cfgsync server endpoints.
#[derive(Clone, Debug)]
pub struct CfgSyncClient {
    // Base URL with trailing '/' characters stripped (normalized in `new`).
    base_url: String,
    http: reqwest::Client,
}
impl CfgSyncClient {
    /// Builds a client for `base_url`, trimming trailing slashes so endpoint
    /// paths can be appended without producing `//`.
    #[must_use]
    pub fn new(base_url: impl Into<String>) -> Self {
        let mut base_url = base_url.into();
        while base_url.ends_with('/') {
            base_url.pop();
        }
        Self {
            base_url,
            http: reqwest::Client::new(),
        }
    }

    /// Returns the normalized base URL this client targets.
    #[must_use]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }

    /// Fetches the node configuration for the identity in `payload`.
    pub async fn fetch_node_config(
        &self,
        payload: &ClientIp,
    ) -> Result<CfgSyncPayload, ClientError> {
        self.post_json("/node", payload).await
    }

    /// Fetches the combined init-with-node configuration.
    pub async fn fetch_init_with_node_config(
        &self,
        payload: &ClientIp,
    ) -> Result<CfgSyncPayload, ClientError> {
        self.post_json("/init-with-node", payload).await
    }

    /// POSTs `payload` as JSON to `path` and decodes a `CfgSyncPayload`.
    ///
    /// # Errors
    /// Non-success statuses become `ClientError::Status`: the body is parsed
    /// as a structured `CfgSyncErrorResponse` when possible, otherwise the
    /// raw body text is used as the message. Undecodable success bodies
    /// become `ClientError::Decode`.
    pub async fn post_json<P: Serialize>(
        &self,
        path: &str,
        payload: &P,
    ) -> Result<CfgSyncPayload, ClientError> {
        let url = self.endpoint_url(path);
        let response = self.http.post(url).json(payload).send().await?;
        let status = response.status();
        let body = response.text().await?;
        if !status.is_success() {
            let error = serde_json::from_str::<CfgSyncErrorResponse>(&body).ok();
            let message = error
                .as_ref()
                .map(|err| err.message.clone())
                .unwrap_or_else(|| body.clone());
            return Err(ClientError::Status {
                status,
                message,
                error,
            });
        }
        serde_json::from_str(&body).map_err(ClientError::Decode)
    }

    // Joins `path` onto the base URL, inserting a '/' only when needed.
    fn endpoint_url(&self, path: &str) -> String {
        if path.starts_with('/') {
            format!("{}{}", self.base_url, path)
        } else {
            format!("{}/{}", self.base_url, path)
        }
    }
}

View File

@ -1,10 +0,0 @@
pub mod client;
pub mod repo;
pub mod server;
pub use client::{CfgSyncClient, ClientError};
pub use repo::{
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload,
ConfigRepo, RepoResponse,
};
pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync};

View File

@ -1,107 +0,0 @@
use std::{collections::HashMap, sync::Arc};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::oneshot::Sender;
pub const CFGSYNC_SCHEMA_VERSION: u16 = 1;
/// One file to materialize on a node: target `path` plus its `content`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CfgSyncFile {
    pub path: String,
    pub content: String,
}
/// Wire payload served to cfgsync clients for a single node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CfgSyncPayload {
    // Payload layout version; compared against CFGSYNC_SCHEMA_VERSION.
    pub schema_version: u16,
    #[serde(default)]
    pub files: Vec<CfgSyncFile>,
    // Legacy single-config form; consulted only when `files` is empty
    // (see `normalized_files`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config_yaml: Option<String>,
}
impl CfgSyncPayload {
    /// Creates a payload at the current schema version from explicit files.
    #[must_use]
    pub fn from_files(files: Vec<CfgSyncFile>) -> Self {
        Self {
            schema_version: CFGSYNC_SCHEMA_VERSION,
            files,
            config_yaml: None,
        }
    }

    /// Returns the file list to write, falling back to the legacy
    /// `config_yaml` (targeted at `default_config_path`) when `files`
    /// is empty. An empty payload yields an empty list.
    #[must_use]
    pub fn normalized_files(&self, default_config_path: &str) -> Vec<CfgSyncFile> {
        if self.files.is_empty() {
            match &self.config_yaml {
                Some(content) => vec![CfgSyncFile {
                    path: default_config_path.to_owned(),
                    content: content.clone(),
                }],
                None => Vec::new(),
            }
        } else {
            self.files.clone()
        }
    }
}
/// Machine-readable category for cfgsync server error responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CfgSyncErrorCode {
    /// No configuration was registered for the requested identifier.
    MissingConfig,
    /// Unexpected server-side failure.
    Internal,
}
/// Structured error body exchanged between cfgsync server and client.
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
#[error("{code:?}: {message}")]
pub struct CfgSyncErrorResponse {
    pub code: CfgSyncErrorCode,
    pub message: String,
}
impl CfgSyncErrorResponse {
    /// Error for a host identifier with no registered configuration.
    #[must_use]
    pub fn missing_config(identifier: &str) -> Self {
        let message = format!("missing config for host {identifier}");
        Self {
            code: CfgSyncErrorCode::MissingConfig,
            message,
        }
    }

    /// Wraps an arbitrary message as an internal server error.
    #[must_use]
    pub fn internal(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            code: CfgSyncErrorCode::Internal,
        }
    }
}
/// Outcome of a repo lookup: a servable payload or a structured error.
pub enum RepoResponse {
    Config(CfgSyncPayload),
    Error(CfgSyncErrorResponse),
}
/// Immutable registry of per-node payloads, keyed by node identifier.
pub struct ConfigRepo {
    configs: HashMap<String, CfgSyncPayload>,
}
impl ConfigRepo {
    /// Builds a shared repo from a pre-materialized identifier -> payload map.
    #[must_use]
    pub fn from_bundle(configs: HashMap<String, CfgSyncPayload>) -> Arc<Self> {
        Arc::new(Self { configs })
    }

    /// Looks up `identifier` and delivers the result over `reply_tx`.
    ///
    /// A missing identifier yields a `missing_config` error response.
    /// A dropped receiver is deliberately ignored (the requester went away).
    pub async fn register(&self, identifier: String, reply_tx: Sender<RepoResponse>) {
        let response = self.configs.get(&identifier).cloned().map_or_else(
            || RepoResponse::Error(CfgSyncErrorResponse::missing_config(&identifier)),
            RepoResponse::Config,
        );
        let _ = reply_tx.send(response);
    }
}

View File

@ -1,95 +0,0 @@
use std::{io, net::Ipv4Addr, sync::Arc};
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::oneshot::channel;
use crate::repo::{CfgSyncErrorResponse, ConfigRepo, RepoResponse};
/// Request body sent by cfgsync clients to identify themselves.
#[derive(Serialize, Deserialize)]
pub struct ClientIp {
    /// Node IP that can be used by clients for observability/logging.
    pub ip: Ipv4Addr,
    /// Stable node identifier used as key in cfgsync bundle lookup.
    pub identifier: String,
}
/// Shared axum handler state: the config repository to serve from.
pub struct CfgSyncState {
    repo: Arc<ConfigRepo>,
}

impl CfgSyncState {
    /// Wraps a repository as server state.
    #[must_use]
    pub fn new(repo: Arc<ConfigRepo>) -> Self {
        Self { repo }
    }
}
/// Failure modes of the cfgsync HTTP server lifecycle.
#[derive(Debug, Error)]
pub enum RunCfgsyncError {
    /// The listener socket could not be bound.
    #[error("failed to bind cfgsync server on {bind_addr}: {source}")]
    Bind {
        bind_addr: String,
        #[source]
        source: io::Error,
    },
    /// The serve loop exited with an I/O error.
    #[error("cfgsync server terminated unexpectedly: {source}")]
    Serve {
        #[source]
        source: io::Error,
    },
}
/// Axum handler: resolves the caller's identifier against the repo and
/// answers with its payload, a structured error, or a 500 when the repo
/// reply channel is dropped without an answer.
async fn node_config(
    State(state): State<Arc<CfgSyncState>>,
    Json(payload): Json<ClientIp>,
) -> impl IntoResponse {
    let identifier = payload.identifier.clone();
    // One-shot channel: the repo sends back exactly one RepoResponse.
    let (reply_tx, reply_rx) = channel();
    state.repo.register(identifier, reply_tx).await;
    match reply_rx.await {
        // Sender dropped without replying — surface as an internal error.
        Err(_) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(CfgSyncErrorResponse::internal(
                "error receiving config from repo",
            )),
        )
            .into_response(),
        Ok(RepoResponse::Config(payload_data)) => {
            (StatusCode::OK, Json(payload_data)).into_response()
        }
        Ok(RepoResponse::Error(error)) => {
            // Map the structured error code onto an HTTP status.
            let status = match error.code {
                crate::repo::CfgSyncErrorCode::MissingConfig => StatusCode::NOT_FOUND,
                crate::repo::CfgSyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
            };
            (status, Json(error)).into_response()
        }
    }
}
/// Builds the cfgsync router; both endpoints share the same handler.
pub fn cfgsync_app(state: CfgSyncState) -> Router {
    let shared = Arc::new(state);
    let router = Router::new()
        .route("/init-with-node", post(node_config))
        .route("/node", post(node_config));
    router.with_state(shared)
}
/// Binds `0.0.0.0:{port}` and serves the cfgsync app until it terminates.
///
/// # Errors
/// `RunCfgsyncError::Bind` when the listener cannot be created,
/// `RunCfgsyncError::Serve` when the serve loop fails.
pub async fn run_cfgsync(port: u16, state: CfgSyncState) -> Result<(), RunCfgsyncError> {
    let app = cfgsync_app(state);
    println!("Server running on http://0.0.0.0:{port}");
    let bind_addr = format!("0.0.0.0:{port}");
    let listener = tokio::net::TcpListener::bind(&bind_addr)
        .await
        .map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?;
    axum::serve(listener, app)
        .await
        .map_err(|source| RunCfgsyncError::Serve { source })?;
    Ok(())
}

View File

@ -1,22 +0,0 @@
[package]
categories = { workspace = true }
description = { workspace = true }
edition = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
name = "cfgsync-runtime"
readme = { workspace = true }
repository = { workspace = true }
version = { workspace = true }
[lints]
workspace = true
[dependencies]
anyhow = "1"
cfgsync-core = { workspace = true }
clap = { version = "4", features = ["derive"] }
serde = { workspace = true }
serde_yaml = { workspace = true }
testing-framework-core = { workspace = true }
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }

View File

@ -1,39 +0,0 @@
use anyhow::Result;
use cfgsync_core::CfgSyncFile;
use serde::{Deserialize, Serialize};
use testing_framework_core::cfgsync::{CfgsyncEnv, build_cfgsync_node_configs};
/// Serialized bundle: one entry per node in the deployment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CfgSyncBundle {
    pub nodes: Vec<CfgSyncBundleNode>,
}
/// Per-node bundle entry: identifier plus the files to push to that node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CfgSyncBundleNode {
    pub identifier: String,
    #[serde(default)]
    pub files: Vec<CfgSyncFile>,
    // Legacy single-config field kept for backward-compatible parsing;
    // never written by `build_cfgsync_bundle_with_hostnames`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config_yaml: Option<String>,
}
/// Renders every node config through the `CfgsyncEnv` adapter and packs the
/// results into a bundle, one `/config.yaml` file per node.
pub fn build_cfgsync_bundle_with_hostnames<E: CfgsyncEnv>(
    deployment: &E::Deployment,
    hostnames: &[String],
) -> Result<CfgSyncBundle> {
    let configs = build_cfgsync_node_configs::<E>(deployment, hostnames)?;
    let mut nodes = Vec::with_capacity(configs.len());
    for config in configs {
        let file = CfgSyncFile {
            path: "/config.yaml".to_owned(),
            content: config.config_yaml,
        };
        nodes.push(CfgSyncBundleNode {
            identifier: config.identifier,
            files: vec![file],
            config_yaml: None,
        });
    }
    Ok(CfgSyncBundle { nodes })
}

View File

@ -1,108 +0,0 @@
use std::{
env, fs,
net::Ipv4Addr,
path::{Path, PathBuf},
};
use anyhow::{Context as _, Result, anyhow, bail};
use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp};
use tokio::time::{Duration, sleep};
const FETCH_ATTEMPTS: usize = 5;
const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250);
/// Parses an IPv4 address, falling back to 127.0.0.1 on any parse failure.
fn parse_ip(ip_str: &str) -> Ipv4Addr {
    match ip_str.parse::<Ipv4Addr>() {
        Ok(ip) => ip,
        Err(_) => Ipv4Addr::LOCALHOST,
    }
}
/// Fetches the node config from `server_addr`, retrying up to
/// `FETCH_ATTEMPTS` times with a fixed `FETCH_RETRY_DELAY` between attempts.
/// Returns the last observed error when every attempt fails.
async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result<CfgSyncPayload> {
    let client = CfgSyncClient::new(server_addr);
    let mut last_error: Option<anyhow::Error> = None;
    for attempt in 1..=FETCH_ATTEMPTS {
        match client.fetch_node_config(payload).await {
            Ok(config) => return Ok(config),
            Err(error) => {
                last_error = Some(error.into());
                // No delay after the final attempt.
                if attempt < FETCH_ATTEMPTS {
                    sleep(FETCH_RETRY_DELAY).await;
                }
            }
        }
    }
    // The loop always stores an error before falling through, so the None
    // arm is defensive only.
    match last_error {
        Some(error) => Err(error),
        None => Err(anyhow!("cfgsync client fetch failed without an error")),
    }
}
/// Fetches this node's cfgsync payload and writes every contained file to
/// disk, using `config_file` as the target for legacy single-config payloads.
async fn pull_config_files(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> {
    let config = fetch_with_retry(&payload, server_addr)
        .await
        .context("fetching cfgsync node config")?;
    ensure_schema_version(&config)?;
    let files = collect_payload_files(&config, config_file)?;
    for file in files {
        write_cfgsync_file(&file)?;
    }
    println!("Config files saved");
    Ok(())
}
fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> {
if config.schema_version != CFGSYNC_SCHEMA_VERSION {
bail!(
"unsupported cfgsync payload schema version {}, expected {}",
config.schema_version,
CFGSYNC_SCHEMA_VERSION
);
}
Ok(())
}
fn collect_payload_files(config: &CfgSyncPayload, config_file: &str) -> Result<Vec<CfgSyncFile>> {
let files = config.normalized_files(config_file);
if files.is_empty() {
bail!("cfgsync payload contains no files");
}
Ok(files)
}
/// Writes one payload file to its target path, creating parent directories
/// first when needed.
fn write_cfgsync_file(file: &CfgSyncFile) -> Result<()> {
    let path = PathBuf::from(&file.path);
    ensure_parent_dir(&path)?;
    fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?;
    println!("Config saved to {}", path.display());
    Ok(())
}
/// Creates the parent directory of `path` when it has a non-empty one;
/// a bare filename (empty parent) is a no-op.
fn ensure_parent_dir(path: &Path) -> Result<()> {
    let parent = match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        _ => return Ok(()),
    };
    fs::create_dir_all(parent)
        .with_context(|| format!("creating parent directory {}", parent.display()))
}
/// Client entry point: reads identity and server address from environment
/// variables (with local defaults) and pulls this node's config files.
///
/// Variables: `CFG_FILE_PATH` (default `config.yaml`), `CFG_SERVER_ADDR`
/// (default `http://127.0.0.1:{default_port}`), `CFG_HOST_IP` (default
/// `127.0.0.1`), `CFG_HOST_IDENTIFIER` (default `unidentified-node`).
pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
    let config_file_path = env::var("CFG_FILE_PATH").unwrap_or_else(|_| "config.yaml".to_owned());
    let server_addr =
        env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}"));
    let ip = parse_ip(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()));
    let identifier =
        env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
    pull_config_files(ClientIp { ip, identifier }, &server_addr, &config_file_path).await
}

View File

@ -1,10 +0,0 @@
pub mod bundle;
pub mod render;
pub use cfgsync_core as core;
mod client;
mod server;
pub use client::run_cfgsync_client_from_env;
pub use server::{CfgSyncServerConfig, run_cfgsync_server};

View File

@ -1,101 +0,0 @@
use std::{collections::HashMap, fs, path::Path, sync::Arc};
use anyhow::Context as _;
use cfgsync_core::{CfgSyncFile, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync};
use serde::Deserialize;
/// On-disk server configuration: listen port plus the bundle file location.
#[derive(Debug, Deserialize, Clone)]
pub struct CfgSyncServerConfig {
    pub port: u16,
    // Relative paths are resolved against the config file's own directory
    // (see `resolve_bundle_path`).
    pub bundle_path: String,
}

impl CfgSyncServerConfig {
    /// Reads and parses the YAML config at `path`.
    pub fn load_from_file(path: &Path) -> anyhow::Result<Self> {
        let config_content = fs::read_to_string(path)
            .with_context(|| format!("failed to read cfgsync config file {}", path.display()))?;
        serde_yaml::from_str(&config_content)
            .with_context(|| format!("failed to parse cfgsync config file {}", path.display()))
    }
}
/// Deserialization mirror of the bundle file: the list of node entries.
#[derive(Debug, Deserialize)]
struct CfgSyncBundle {
    nodes: Vec<CfgSyncBundleNode>,
}
/// One node entry in the bundle file; `config_yaml` is the legacy
/// single-config form used when `files` is absent.
#[derive(Debug, Deserialize)]
struct CfgSyncBundleNode {
    identifier: String,
    #[serde(default)]
    files: Vec<CfgSyncFile>,
    #[serde(default)]
    config_yaml: Option<String>,
}
fn load_bundle(bundle_path: &Path) -> anyhow::Result<Arc<ConfigRepo>> {
let bundle = read_cfgsync_bundle(bundle_path)?;
let configs = bundle
.nodes
.into_iter()
.map(build_repo_entry)
.collect::<HashMap<_, _>>();
Ok(ConfigRepo::from_bundle(configs))
}
/// Reads and parses the bundle YAML file, attaching the path to any error.
fn read_cfgsync_bundle(bundle_path: &Path) -> anyhow::Result<CfgSyncBundle> {
    let bundle_content = fs::read_to_string(bundle_path).with_context(|| {
        format!(
            "failed to read cfgsync bundle file {}",
            bundle_path.display()
        )
    })?;
    serde_yaml::from_str(&bundle_content)
        .with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display()))
}
fn build_repo_entry(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) {
let files = if node.files.is_empty() {
build_legacy_files(node.config_yaml)
} else {
node.files
};
(node.identifier, CfgSyncPayload::from_files(files))
}
/// Expands a legacy `config_yaml` value into a one-file list targeting
/// `/config.yaml`; `None` yields an empty list.
fn build_legacy_files(config_yaml: Option<String>) -> Vec<CfgSyncFile> {
    match config_yaml {
        Some(content) => vec![CfgSyncFile {
            path: "/config.yaml".to_owned(),
            content,
        }],
        None => Vec::new(),
    }
}
/// Resolves `bundle_path` relative to the directory containing the config
/// file; absolute paths are returned unchanged.
fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {
    let candidate = Path::new(bundle_path);
    if candidate.is_absolute() {
        candidate.to_path_buf()
    } else {
        let base = config_path.parent().unwrap_or_else(|| Path::new("."));
        base.join(candidate)
    }
}
/// Server entry point: loads the config file, resolves and loads the bundle,
/// then serves cfgsync until the server terminates.
pub async fn run_cfgsync_server(config_path: &Path) -> anyhow::Result<()> {
    let config = CfgSyncServerConfig::load_from_file(config_path)?;
    let bundle_path = resolve_bundle_path(config_path, &config.bundle_path);
    let repo = load_bundle(&bundle_path)?;
    let state = CfgSyncState::new(repo);
    run_cfgsync(config.port, state).await?;
    Ok(())
}