mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-03-31 16:23:08 +00:00
Extract cfgsync into standalone crates
This commit is contained in:
parent
93161113db
commit
7da3df455f
26
Cargo.lock
generated
26
Cargo.lock
generated
@ -916,14 +916,33 @@ dependencies = [
|
|||||||
"syn 2.0.114",
|
"syn 2.0.114",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cfgsync-adapter"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"thiserror 2.0.18",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "cfgsync-artifacts"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
"thiserror 2.0.18",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cfgsync-core"
|
name = "cfgsync-core"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
"axum",
|
"axum",
|
||||||
|
"cfgsync-artifacts",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
"serde_yaml",
|
||||||
|
"tempfile",
|
||||||
"thiserror 2.0.18",
|
"thiserror 2.0.18",
|
||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
@ -937,8 +956,10 @@ dependencies = [
|
|||||||
"clap",
|
"clap",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
"testing-framework-core",
|
"tempfile",
|
||||||
|
"thiserror 2.0.18",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -2891,8 +2912,8 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"cfgsync-adapter",
|
||||||
"cfgsync-core",
|
"cfgsync-core",
|
||||||
"cfgsync-runtime",
|
|
||||||
"kube",
|
"kube",
|
||||||
"logos-blockchain-http-api-common",
|
"logos-blockchain-http-api-common",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
@ -6528,6 +6549,7 @@ name = "testing-framework-core"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
"cfgsync-adapter",
|
||||||
"futures",
|
"futures",
|
||||||
"parking_lot",
|
"parking_lot",
|
||||||
"prometheus-http-query",
|
"prometheus-http-query",
|
||||||
|
|||||||
13
Cargo.toml
13
Cargo.toml
@ -1,5 +1,9 @@
|
|||||||
[workspace]
|
[workspace]
|
||||||
members = [
|
members = [
|
||||||
|
"cfgsync/adapter",
|
||||||
|
"cfgsync/artifacts",
|
||||||
|
"cfgsync/core",
|
||||||
|
"cfgsync/runtime",
|
||||||
"logos/examples",
|
"logos/examples",
|
||||||
"logos/runtime/env",
|
"logos/runtime/env",
|
||||||
"logos/runtime/ext",
|
"logos/runtime/ext",
|
||||||
@ -8,8 +12,6 @@ members = [
|
|||||||
"testing-framework/deployers/compose",
|
"testing-framework/deployers/compose",
|
||||||
"testing-framework/deployers/k8s",
|
"testing-framework/deployers/k8s",
|
||||||
"testing-framework/deployers/local",
|
"testing-framework/deployers/local",
|
||||||
"testing-framework/tools/cfgsync-core",
|
|
||||||
"testing-framework/tools/cfgsync-runtime",
|
|
||||||
]
|
]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
@ -31,7 +33,9 @@ all = "allow"
|
|||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
# Local testing framework crates
|
# Local testing framework crates
|
||||||
cfgsync-core = { default-features = false, path = "testing-framework/tools/cfgsync-core" }
|
cfgsync-adapter = { default-features = false, path = "cfgsync/adapter" }
|
||||||
|
cfgsync-artifacts = { default-features = false, path = "cfgsync/artifacts" }
|
||||||
|
cfgsync-core = { default-features = false, path = "cfgsync/core" }
|
||||||
lb-ext = { default-features = false, path = "logos/runtime/ext" }
|
lb-ext = { default-features = false, path = "logos/runtime/ext" }
|
||||||
lb-framework = { default-features = false, package = "testing_framework", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" }
|
lb-framework = { default-features = false, package = "testing_framework", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" }
|
||||||
lb-workloads = { default-features = false, path = "logos/runtime/workloads" }
|
lb-workloads = { default-features = false, path = "logos/runtime/workloads" }
|
||||||
@ -40,10 +44,11 @@ testing-framework-env = { default-features = false, path = "logos/run
|
|||||||
testing-framework-runner-compose = { default-features = false, path = "testing-framework/deployers/compose" }
|
testing-framework-runner-compose = { default-features = false, path = "testing-framework/deployers/compose" }
|
||||||
testing-framework-runner-k8s = { default-features = false, path = "testing-framework/deployers/k8s" }
|
testing-framework-runner-k8s = { default-features = false, path = "testing-framework/deployers/k8s" }
|
||||||
testing-framework-runner-local = { default-features = false, path = "testing-framework/deployers/local" }
|
testing-framework-runner-local = { default-features = false, path = "testing-framework/deployers/local" }
|
||||||
|
testing-framework-workflows = { default-features = false, package = "lb-workloads", path = "logos/runtime/workloads" }
|
||||||
|
|
||||||
# Logos dependencies (from logos-blockchain master @ deccbb2d2)
|
# Logos dependencies (from logos-blockchain master @ deccbb2d2)
|
||||||
broadcast-service = { default-features = false, package = "logos-blockchain-chain-broadcast-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" }
|
broadcast-service = { default-features = false, package = "logos-blockchain-chain-broadcast-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" }
|
||||||
cfgsync_runtime = { default-features = false, package = "cfgsync-runtime", path = "testing-framework/tools/cfgsync-runtime" }
|
cfgsync_runtime = { default-features = false, package = "cfgsync-runtime", path = "cfgsync/runtime" }
|
||||||
chain-leader = { default-features = false, features = [
|
chain-leader = { default-features = false, features = [
|
||||||
"pol-dev-mode",
|
"pol-dev-mode",
|
||||||
], package = "logos-blockchain-chain-leader-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" }
|
], package = "logos-blockchain-chain-leader-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" }
|
||||||
|
|||||||
16
cfgsync/adapter/Cargo.toml
Normal file
16
cfgsync/adapter/Cargo.toml
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
[package]
|
||||||
|
categories = { workspace = true }
|
||||||
|
description = { workspace = true }
|
||||||
|
edition = { workspace = true }
|
||||||
|
keywords = { workspace = true }
|
||||||
|
license = { workspace = true }
|
||||||
|
name = "cfgsync-adapter"
|
||||||
|
readme = { workspace = true }
|
||||||
|
repository = { workspace = true }
|
||||||
|
version = { workspace = true }
|
||||||
|
|
||||||
|
[lints]
|
||||||
|
workspace = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
thiserror = { workspace = true }
|
||||||
119
cfgsync/adapter/src/lib.rs
Normal file
119
cfgsync/adapter/src/lib.rs
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
/// Type-erased cfgsync adapter error used to preserve source context.
|
||||||
|
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
|
||||||
|
|
||||||
|
/// Per-node rendered config output used to build cfgsync bundles.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct CfgsyncNodeConfig {
|
||||||
|
/// Stable node identifier resolved by the adapter.
|
||||||
|
pub identifier: String,
|
||||||
|
/// Serialized config payload for the node.
|
||||||
|
pub config_yaml: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Adapter contract for converting an application deployment model into
|
||||||
|
/// node-specific serialized config payloads.
|
||||||
|
pub trait CfgsyncEnv {
|
||||||
|
type Deployment;
|
||||||
|
type Node;
|
||||||
|
type NodeConfig;
|
||||||
|
type Error: Error + Send + Sync + 'static;
|
||||||
|
|
||||||
|
fn nodes(deployment: &Self::Deployment) -> &[Self::Node];
|
||||||
|
|
||||||
|
fn node_identifier(index: usize, node: &Self::Node) -> String;
|
||||||
|
|
||||||
|
fn build_node_config(
|
||||||
|
deployment: &Self::Deployment,
|
||||||
|
node: &Self::Node,
|
||||||
|
) -> Result<Self::NodeConfig, Self::Error>;
|
||||||
|
|
||||||
|
fn rewrite_for_hostnames(
|
||||||
|
deployment: &Self::Deployment,
|
||||||
|
node_index: usize,
|
||||||
|
hostnames: &[String],
|
||||||
|
config: &mut Self::NodeConfig,
|
||||||
|
) -> Result<(), Self::Error>;
|
||||||
|
|
||||||
|
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// High-level failures while building adapter output for cfgsync.
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum BuildCfgsyncNodesError {
|
||||||
|
#[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")]
|
||||||
|
HostnameCountMismatch { nodes: usize, hostnames: usize },
|
||||||
|
#[error("cfgsync adapter failed: {source}")]
|
||||||
|
Adapter {
|
||||||
|
#[source]
|
||||||
|
source: DynCfgsyncError,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
fn adapter_error<E>(source: E) -> BuildCfgsyncNodesError
|
||||||
|
where
|
||||||
|
E: Error + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
BuildCfgsyncNodesError::Adapter {
|
||||||
|
source: Box::new(source),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builds cfgsync node configs for a deployment by:
|
||||||
|
/// 1) validating hostname count,
|
||||||
|
/// 2) building each node config,
|
||||||
|
/// 3) rewriting host references,
|
||||||
|
/// 4) serializing each node payload.
|
||||||
|
pub fn build_cfgsync_node_configs<E: CfgsyncEnv>(
|
||||||
|
deployment: &E::Deployment,
|
||||||
|
hostnames: &[String],
|
||||||
|
) -> Result<Vec<CfgsyncNodeConfig>, BuildCfgsyncNodesError> {
|
||||||
|
let nodes = E::nodes(deployment);
|
||||||
|
ensure_hostname_count(nodes.len(), hostnames.len())?;
|
||||||
|
|
||||||
|
let mut output = Vec::with_capacity(nodes.len());
|
||||||
|
for (index, node) in nodes.iter().enumerate() {
|
||||||
|
output.push(build_node_entry::<E>(deployment, node, index, hostnames)?);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(output)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> {
|
||||||
|
if nodes != hostnames {
|
||||||
|
return Err(BuildCfgsyncNodesError::HostnameCountMismatch { nodes, hostnames });
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_node_entry<E: CfgsyncEnv>(
|
||||||
|
deployment: &E::Deployment,
|
||||||
|
node: &E::Node,
|
||||||
|
index: usize,
|
||||||
|
hostnames: &[String],
|
||||||
|
) -> Result<CfgsyncNodeConfig, BuildCfgsyncNodesError> {
|
||||||
|
let node_config = build_rewritten_node_config::<E>(deployment, node, index, hostnames)?;
|
||||||
|
let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?;
|
||||||
|
|
||||||
|
Ok(CfgsyncNodeConfig {
|
||||||
|
identifier: E::node_identifier(index, node),
|
||||||
|
config_yaml,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_rewritten_node_config<E: CfgsyncEnv>(
|
||||||
|
deployment: &E::Deployment,
|
||||||
|
node: &E::Node,
|
||||||
|
index: usize,
|
||||||
|
hostnames: &[String],
|
||||||
|
) -> Result<E::NodeConfig, BuildCfgsyncNodesError> {
|
||||||
|
let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?;
|
||||||
|
E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config)
|
||||||
|
.map_err(adapter_error)?;
|
||||||
|
|
||||||
|
Ok(node_config)
|
||||||
|
}
|
||||||
17
cfgsync/artifacts/Cargo.toml
Normal file
17
cfgsync/artifacts/Cargo.toml
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
[package]
|
||||||
|
categories = { workspace = true }
|
||||||
|
description = "App-agnostic cfgsync artifact model"
|
||||||
|
edition = { workspace = true }
|
||||||
|
keywords = { workspace = true }
|
||||||
|
license = { workspace = true }
|
||||||
|
name = "cfgsync-artifacts"
|
||||||
|
readme = { workspace = true }
|
||||||
|
repository = { workspace = true }
|
||||||
|
version = { workspace = true }
|
||||||
|
|
||||||
|
[lints]
|
||||||
|
workspace = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
serde = { workspace = true }
|
||||||
|
thiserror = { workspace = true }
|
||||||
64
cfgsync/artifacts/src/lib.rs
Normal file
64
cfgsync/artifacts/src/lib.rs
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
/// Single file artifact delivered to a node.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct ArtifactFile {
|
||||||
|
/// Destination path where content should be written.
|
||||||
|
pub path: String,
|
||||||
|
/// Raw file contents.
|
||||||
|
pub content: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ArtifactFile {
|
||||||
|
#[must_use]
|
||||||
|
pub fn new(path: impl Into<String>, content: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
path: path.into(),
|
||||||
|
content: content.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Collection of files delivered together for one node.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
|
||||||
|
pub struct ArtifactSet {
|
||||||
|
pub files: Vec<ArtifactFile>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ArtifactSet {
|
||||||
|
#[must_use]
|
||||||
|
pub fn new(files: Vec<ArtifactFile>) -> Self {
|
||||||
|
Self { files }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.files.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.files.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Validates that no two files target the same output path.
|
||||||
|
pub fn ensure_unique_paths(&self) -> Result<(), ArtifactValidationError> {
|
||||||
|
let mut seen = std::collections::HashSet::new();
|
||||||
|
|
||||||
|
for file in &self.files {
|
||||||
|
if !seen.insert(file.path.clone()) {
|
||||||
|
return Err(ArtifactValidationError::DuplicatePath(file.path.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Validation failures for [`ArtifactSet`].
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum ArtifactValidationError {
|
||||||
|
#[error("duplicate artifact path `{0}`")]
|
||||||
|
DuplicatePath(String),
|
||||||
|
}
|
||||||
27
cfgsync/core/Cargo.toml
Normal file
27
cfgsync/core/Cargo.toml
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
[package]
|
||||||
|
categories = { workspace = true }
|
||||||
|
description = { workspace = true }
|
||||||
|
edition = { workspace = true }
|
||||||
|
keywords = { workspace = true }
|
||||||
|
license = { workspace = true }
|
||||||
|
name = "cfgsync-core"
|
||||||
|
readme = { workspace = true }
|
||||||
|
repository = { workspace = true }
|
||||||
|
version = { workspace = true }
|
||||||
|
|
||||||
|
[lints]
|
||||||
|
workspace = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1"
|
||||||
|
axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" }
|
||||||
|
cfgsync-artifacts = { workspace = true }
|
||||||
|
reqwest = { features = ["json"], workspace = true }
|
||||||
|
serde = { default-features = false, features = ["derive"], version = "1" }
|
||||||
|
serde_json = { workspace = true }
|
||||||
|
serde_yaml = { workspace = true }
|
||||||
|
thiserror = { workspace = true }
|
||||||
|
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tempfile = { workspace = true }
|
||||||
26
cfgsync/core/src/bundle.rs
Normal file
26
cfgsync/core/src/bundle.rs
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use crate::CfgSyncFile;
|
||||||
|
|
||||||
|
/// Top-level cfgsync bundle containing per-node file payloads.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct CfgSyncBundle {
|
||||||
|
pub nodes: Vec<CfgSyncBundleNode>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CfgSyncBundle {
|
||||||
|
#[must_use]
|
||||||
|
pub fn new(nodes: Vec<CfgSyncBundleNode>) -> Self {
|
||||||
|
Self { nodes }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Artifact set for a single node resolved by identifier.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct CfgSyncBundleNode {
|
||||||
|
/// Stable node identifier used by cfgsync lookup.
|
||||||
|
pub identifier: String,
|
||||||
|
/// Files that should be materialized for the node.
|
||||||
|
#[serde(default)]
|
||||||
|
pub files: Vec<CfgSyncFile>,
|
||||||
|
}
|
||||||
@ -6,6 +6,7 @@ use crate::{
|
|||||||
server::ClientIp,
|
server::ClientIp,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// cfgsync client-side request/response failures.
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum ClientError {
|
pub enum ClientError {
|
||||||
#[error("request failed: {0}")]
|
#[error("request failed: {0}")]
|
||||||
@ -20,6 +21,7 @@ pub enum ClientError {
|
|||||||
Decode(serde_json::Error),
|
Decode(serde_json::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reusable HTTP client for cfgsync server endpoints.
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct CfgSyncClient {
|
pub struct CfgSyncClient {
|
||||||
base_url: String,
|
base_url: String,
|
||||||
@ -44,6 +46,7 @@ impl CfgSyncClient {
|
|||||||
&self.base_url
|
&self.base_url
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetches `/node` payload for a node identifier.
|
||||||
pub async fn fetch_node_config(
|
pub async fn fetch_node_config(
|
||||||
&self,
|
&self,
|
||||||
payload: &ClientIp,
|
payload: &ClientIp,
|
||||||
@ -51,6 +54,7 @@ impl CfgSyncClient {
|
|||||||
self.post_json("/node", payload).await
|
self.post_json("/node", payload).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetches `/init-with-node` payload for a node identifier.
|
||||||
pub async fn fetch_init_with_node_config(
|
pub async fn fetch_init_with_node_config(
|
||||||
&self,
|
&self,
|
||||||
payload: &ClientIp,
|
payload: &ClientIp,
|
||||||
@ -58,6 +62,7 @@ impl CfgSyncClient {
|
|||||||
self.post_json("/init-with-node", payload).await
|
self.post_json("/init-with-node", payload).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Posts JSON payload to a cfgsync endpoint and decodes cfgsync payload.
|
||||||
pub async fn post_json<P: Serialize>(
|
pub async fn post_json<P: Serialize>(
|
||||||
&self,
|
&self,
|
||||||
path: &str,
|
path: &str,
|
||||||
18
cfgsync/core/src/lib.rs
Normal file
18
cfgsync/core/src/lib.rs
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
pub mod bundle;
|
||||||
|
pub mod client;
|
||||||
|
pub mod render;
|
||||||
|
pub mod repo;
|
||||||
|
pub mod server;
|
||||||
|
|
||||||
|
pub use bundle::{CfgSyncBundle, CfgSyncBundleNode};
|
||||||
|
pub use client::{CfgSyncClient, ClientError};
|
||||||
|
pub use render::{
|
||||||
|
CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides,
|
||||||
|
apply_timeout_floor, ensure_bundle_path, load_cfgsync_template_yaml,
|
||||||
|
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
|
||||||
|
};
|
||||||
|
pub use repo::{
|
||||||
|
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload,
|
||||||
|
ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, RepoResponse,
|
||||||
|
};
|
||||||
|
pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync};
|
||||||
@ -2,19 +2,25 @@ use std::{fs, path::Path};
|
|||||||
|
|
||||||
use anyhow::{Context as _, Result};
|
use anyhow::{Context as _, Result};
|
||||||
use serde_yaml::{Mapping, Value};
|
use serde_yaml::{Mapping, Value};
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
/// Rendered cfgsync outputs written for server startup.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct RenderedCfgsync {
|
pub struct RenderedCfgsync {
|
||||||
|
/// Serialized cfgsync server config YAML.
|
||||||
pub config_yaml: String,
|
pub config_yaml: String,
|
||||||
|
/// Serialized node bundle YAML.
|
||||||
pub bundle_yaml: String,
|
pub bundle_yaml: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Output paths used when materializing rendered cfgsync files.
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub struct CfgsyncOutputPaths<'a> {
|
pub struct CfgsyncOutputPaths<'a> {
|
||||||
pub config_path: &'a Path,
|
pub config_path: &'a Path,
|
||||||
pub bundle_path: &'a Path,
|
pub bundle_path: &'a Path,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ensures bundle path override exists, defaulting to output bundle file name.
|
||||||
pub fn ensure_bundle_path(bundle_path: &mut Option<String>, output_bundle_path: &Path) {
|
pub fn ensure_bundle_path(bundle_path: &mut Option<String>, output_bundle_path: &Path) {
|
||||||
if bundle_path.is_some() {
|
if bundle_path.is_some() {
|
||||||
return;
|
return;
|
||||||
@ -29,12 +35,14 @@ pub fn ensure_bundle_path(bundle_path: &mut Option<String>, output_bundle_path:
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Applies a minimum timeout floor to an existing timeout value.
|
||||||
pub fn apply_timeout_floor(timeout: &mut u64, min_timeout_secs: Option<u64>) {
|
pub fn apply_timeout_floor(timeout: &mut u64, min_timeout_secs: Option<u64>) {
|
||||||
if let Some(min_timeout_secs) = min_timeout_secs {
|
if let Some(min_timeout_secs) = min_timeout_secs {
|
||||||
*timeout = (*timeout).max(min_timeout_secs);
|
*timeout = (*timeout).max(min_timeout_secs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Writes rendered cfgsync server and bundle YAML files.
|
||||||
pub fn write_rendered_cfgsync(
|
pub fn write_rendered_cfgsync(
|
||||||
rendered: &RenderedCfgsync,
|
rendered: &RenderedCfgsync,
|
||||||
output: CfgsyncOutputPaths<'_>,
|
output: CfgsyncOutputPaths<'_>,
|
||||||
@ -44,6 +52,7 @@ pub fn write_rendered_cfgsync(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Optional overrides applied to a cfgsync template document.
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct CfgsyncConfigOverrides {
|
pub struct CfgsyncConfigOverrides {
|
||||||
pub port: Option<u16>,
|
pub port: Option<u16>,
|
||||||
@ -53,12 +62,20 @@ pub struct CfgsyncConfigOverrides {
|
|||||||
pub metrics_otlp_ingest_url: Option<String>,
|
pub metrics_otlp_ingest_url: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
enum RenderTemplateError {
|
||||||
|
#[error("cfgsync template key `{key}` must be a YAML map")]
|
||||||
|
NonMappingEntry { key: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Loads cfgsync template YAML from disk.
|
||||||
pub fn load_cfgsync_template_yaml(path: &Path) -> Result<Value> {
|
pub fn load_cfgsync_template_yaml(path: &Path) -> Result<Value> {
|
||||||
let file = fs::File::open(path)
|
let file = fs::File::open(path)
|
||||||
.with_context(|| format!("opening cfgsync template at {}", path.display()))?;
|
.with_context(|| format!("opening cfgsync template at {}", path.display()))?;
|
||||||
serde_yaml::from_reader(file).context("parsing cfgsync template")
|
serde_yaml::from_reader(file).context("parsing cfgsync template")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Renders cfgsync config YAML by applying overrides to a template document.
|
||||||
pub fn render_cfgsync_yaml_from_template(
|
pub fn render_cfgsync_yaml_from_template(
|
||||||
mut template: Value,
|
mut template: Value,
|
||||||
overrides: &CfgsyncConfigOverrides,
|
overrides: &CfgsyncConfigOverrides,
|
||||||
@ -67,6 +84,7 @@ pub fn render_cfgsync_yaml_from_template(
|
|||||||
serde_yaml::to_string(&template).context("serializing rendered cfgsync config")
|
serde_yaml::to_string(&template).context("serializing rendered cfgsync config")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Applies cfgsync-specific override fields to a mutable YAML document.
|
||||||
pub fn apply_cfgsync_overrides(
|
pub fn apply_cfgsync_overrides(
|
||||||
template: &mut Value,
|
template: &mut Value,
|
||||||
overrides: &CfgsyncConfigOverrides,
|
overrides: &CfgsyncConfigOverrides,
|
||||||
@ -105,7 +123,7 @@ pub fn apply_cfgsync_overrides(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if let Some(endpoint) = &overrides.metrics_otlp_ingest_url {
|
if let Some(endpoint) = &overrides.metrics_otlp_ingest_url {
|
||||||
let tracing_settings = nested_mapping_mut(root, "tracing_settings");
|
let tracing_settings = nested_mapping_mut(root, "tracing_settings")?;
|
||||||
tracing_settings.insert(
|
tracing_settings.insert(
|
||||||
Value::String("metrics".to_string()),
|
Value::String("metrics".to_string()),
|
||||||
parse_otlp_metrics_layer(endpoint)?,
|
parse_otlp_metrics_layer(endpoint)?,
|
||||||
@ -121,19 +139,20 @@ fn mapping_mut(value: &mut Value) -> Result<&mut Mapping> {
|
|||||||
.context("cfgsync template root must be a YAML map")
|
.context("cfgsync template root must be a YAML map")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn nested_mapping_mut<'a>(mapping: &'a mut Mapping, key: &str) -> &'a mut Mapping {
|
fn nested_mapping_mut<'a>(mapping: &'a mut Mapping, key: &str) -> Result<&'a mut Mapping> {
|
||||||
let key = Value::String(key.to_string());
|
let key_name = key.to_owned();
|
||||||
|
let key = Value::String(key_name.clone());
|
||||||
let entry = mapping
|
let entry = mapping
|
||||||
.entry(key)
|
.entry(key)
|
||||||
.or_insert_with(|| Value::Mapping(Mapping::new()));
|
.or_insert_with(|| Value::Mapping(Mapping::new()));
|
||||||
|
|
||||||
if !entry.is_mapping() {
|
if !entry.is_mapping() {
|
||||||
*entry = Value::Mapping(Mapping::new());
|
return Err(RenderTemplateError::NonMappingEntry { key: key_name }).map_err(Into::into);
|
||||||
}
|
}
|
||||||
|
|
||||||
entry
|
entry
|
||||||
.as_mapping_mut()
|
.as_mapping_mut()
|
||||||
.expect("mapping entry should always be a mapping")
|
.context("cfgsync template entry should be a YAML map")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_otlp_metrics_layer(endpoint: &str) -> Result<Value> {
|
fn parse_otlp_metrics_layer(endpoint: &str) -> Result<Value> {
|
||||||
233
cfgsync/core/src/repo.rs
Normal file
233
cfgsync/core/src/repo.rs
Normal file
@ -0,0 +1,233 @@
|
|||||||
|
use std::{collections::HashMap, fs, path::Path, sync::Arc};
|
||||||
|
|
||||||
|
use cfgsync_artifacts::ArtifactFile;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
use crate::{CfgSyncBundle, CfgSyncBundleNode};
|
||||||
|
|
||||||
|
/// Schema version served by cfgsync payload responses.
|
||||||
|
pub const CFGSYNC_SCHEMA_VERSION: u16 = 1;
|
||||||
|
|
||||||
|
/// Canonical cfgsync file type used in payloads and bundles.
|
||||||
|
pub type CfgSyncFile = ArtifactFile;
|
||||||
|
|
||||||
|
/// Payload returned by cfgsync server for one node.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct CfgSyncPayload {
|
||||||
|
/// Payload schema version for compatibility checks.
|
||||||
|
pub schema_version: u16,
|
||||||
|
/// Files that must be written on the target node.
|
||||||
|
#[serde(default)]
|
||||||
|
pub files: Vec<CfgSyncFile>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CfgSyncPayload {
|
||||||
|
#[must_use]
|
||||||
|
pub fn from_files(files: Vec<CfgSyncFile>) -> Self {
|
||||||
|
Self {
|
||||||
|
schema_version: CFGSYNC_SCHEMA_VERSION,
|
||||||
|
files,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn files(&self) -> &[CfgSyncFile] {
|
||||||
|
&self.files
|
||||||
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.files.is_empty()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum CfgSyncErrorCode {
|
||||||
|
MissingConfig,
|
||||||
|
Internal,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Structured error body returned by cfgsync server.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
|
||||||
|
#[error("{code:?}: {message}")]
|
||||||
|
pub struct CfgSyncErrorResponse {
|
||||||
|
pub code: CfgSyncErrorCode,
|
||||||
|
pub message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CfgSyncErrorResponse {
|
||||||
|
#[must_use]
|
||||||
|
pub fn missing_config(identifier: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
code: CfgSyncErrorCode::MissingConfig,
|
||||||
|
message: format!("missing config for host {identifier}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[must_use]
|
||||||
|
pub fn internal(message: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
code: CfgSyncErrorCode::Internal,
|
||||||
|
message: message.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Repository resolution outcome for a requested node identifier.
|
||||||
|
pub enum RepoResponse {
|
||||||
|
Config(CfgSyncPayload),
|
||||||
|
Error(CfgSyncErrorResponse),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read-only source for cfgsync node payloads.
|
||||||
|
pub trait ConfigProvider: Send + Sync {
|
||||||
|
fn resolve(&self, identifier: &str) -> RepoResponse;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// In-memory map-backed provider used by cfgsync server state.
|
||||||
|
pub struct ConfigRepo {
|
||||||
|
configs: HashMap<String, CfgSyncPayload>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConfigRepo {
|
||||||
|
#[must_use]
|
||||||
|
pub fn from_bundle(configs: HashMap<String, CfgSyncPayload>) -> Arc<Self> {
|
||||||
|
Arc::new(Self { configs })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConfigProvider for ConfigRepo {
|
||||||
|
fn resolve(&self, identifier: &str) -> RepoResponse {
|
||||||
|
self.configs.get(identifier).cloned().map_or_else(
|
||||||
|
|| RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)),
|
||||||
|
RepoResponse::Config,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Failures when loading a file-backed cfgsync provider.
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum FileConfigProviderError {
|
||||||
|
#[error("failed to read cfgsync bundle at {path}: {source}")]
|
||||||
|
Read {
|
||||||
|
path: String,
|
||||||
|
#[source]
|
||||||
|
source: std::io::Error,
|
||||||
|
},
|
||||||
|
#[error("failed to parse cfgsync bundle at {path}: {source}")]
|
||||||
|
Parse {
|
||||||
|
path: String,
|
||||||
|
#[source]
|
||||||
|
source: serde_yaml::Error,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// YAML bundle-backed provider implementation.
|
||||||
|
pub struct FileConfigProvider {
|
||||||
|
inner: ConfigRepo,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FileConfigProvider {
|
||||||
|
/// Loads provider state from a cfgsync bundle YAML file.
|
||||||
|
pub fn from_yaml_file(path: &Path) -> Result<Self, FileConfigProviderError> {
|
||||||
|
let raw = fs::read_to_string(path).map_err(|source| FileConfigProviderError::Read {
|
||||||
|
path: path.display().to_string(),
|
||||||
|
source,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let bundle: CfgSyncBundle =
|
||||||
|
serde_yaml::from_str(&raw).map_err(|source| FileConfigProviderError::Parse {
|
||||||
|
path: path.display().to_string(),
|
||||||
|
source,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let configs = bundle
|
||||||
|
.nodes
|
||||||
|
.into_iter()
|
||||||
|
.map(payload_from_bundle_node)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
inner: ConfigRepo { configs },
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConfigProvider for FileConfigProvider {
|
||||||
|
fn resolve(&self, identifier: &str) -> RepoResponse {
|
||||||
|
self.inner.resolve(identifier)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn payload_from_bundle_node(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) {
|
||||||
|
(node.identifier, CfgSyncPayload::from_files(node.files))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::io::Write as _;
|
||||||
|
|
||||||
|
use tempfile::NamedTempFile;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
fn sample_payload() -> CfgSyncPayload {
|
||||||
|
CfgSyncPayload::from_files(vec![CfgSyncFile::new("/config.yaml", "key: value")])
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn resolves_existing_identifier() {
|
||||||
|
let mut configs = HashMap::new();
|
||||||
|
configs.insert("node-1".to_owned(), sample_payload());
|
||||||
|
let repo = ConfigRepo { configs };
|
||||||
|
|
||||||
|
match repo.resolve("node-1") {
|
||||||
|
RepoResponse::Config(payload) => {
|
||||||
|
assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION);
|
||||||
|
assert_eq!(payload.files.len(), 1);
|
||||||
|
assert_eq!(payload.files[0].path, "/config.yaml");
|
||||||
|
}
|
||||||
|
RepoResponse::Error(error) => panic!("expected config response, got {error}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn reports_missing_identifier() {
|
||||||
|
let repo = ConfigRepo {
|
||||||
|
configs: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
match repo.resolve("unknown-node") {
|
||||||
|
RepoResponse::Config(_) => panic!("expected missing-config error"),
|
||||||
|
RepoResponse::Error(error) => {
|
||||||
|
assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig));
|
||||||
|
assert!(error.message.contains("unknown-node"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn loads_file_provider_bundle() {
|
||||||
|
let mut bundle_file = NamedTempFile::new().expect("create temp bundle");
|
||||||
|
let yaml = r#"
|
||||||
|
nodes:
|
||||||
|
- identifier: node-1
|
||||||
|
files:
|
||||||
|
- path: /config.yaml
|
||||||
|
content: "a: 1"
|
||||||
|
"#;
|
||||||
|
bundle_file
|
||||||
|
.write_all(yaml.as_bytes())
|
||||||
|
.expect("write bundle yaml");
|
||||||
|
|
||||||
|
let provider =
|
||||||
|
FileConfigProvider::from_yaml_file(bundle_file.path()).expect("load file provider");
|
||||||
|
|
||||||
|
match provider.resolve("node-1") {
|
||||||
|
RepoResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
|
||||||
|
RepoResponse::Error(error) => panic!("expected config, got {error}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
172
cfgsync/core/src/server.rs
Normal file
172
cfgsync/core/src/server.rs
Normal file
@ -0,0 +1,172 @@
|
|||||||
|
use std::{io, net::Ipv4Addr, sync::Arc};
|
||||||
|
|
||||||
|
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
use crate::repo::{CfgSyncErrorCode, ConfigProvider, RepoResponse};
|
||||||
|
|
||||||
|
/// Request payload used by cfgsync client for node config resolution.
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct ClientIp {
|
||||||
|
/// Node IP that can be used by clients for observability/logging.
|
||||||
|
pub ip: Ipv4Addr,
|
||||||
|
/// Stable node identifier used as key in cfgsync bundle lookup.
|
||||||
|
pub identifier: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runtime state shared across cfgsync HTTP handlers.
|
||||||
|
pub struct CfgSyncState {
|
||||||
|
repo: Arc<dyn ConfigProvider>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CfgSyncState {
|
||||||
|
#[must_use]
|
||||||
|
pub fn new(repo: Arc<dyn ConfigProvider>) -> Self {
|
||||||
|
Self { repo }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fatal runtime failures when serving cfgsync HTTP endpoints.
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum RunCfgsyncError {
|
||||||
|
#[error("failed to bind cfgsync server on {bind_addr}: {source}")]
|
||||||
|
Bind {
|
||||||
|
bind_addr: String,
|
||||||
|
#[source]
|
||||||
|
source: io::Error,
|
||||||
|
},
|
||||||
|
#[error("cfgsync server terminated unexpectedly: {source}")]
|
||||||
|
Serve {
|
||||||
|
#[source]
|
||||||
|
source: io::Error,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn node_config(
|
||||||
|
State(state): State<Arc<CfgSyncState>>,
|
||||||
|
Json(payload): Json<ClientIp>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
let response = resolve_node_config_response(&state, &payload.identifier);
|
||||||
|
|
||||||
|
match response {
|
||||||
|
RepoResponse::Config(payload_data) => (StatusCode::OK, Json(payload_data)).into_response(),
|
||||||
|
RepoResponse::Error(error) => {
|
||||||
|
let status = error_status(&error.code);
|
||||||
|
|
||||||
|
(status, Json(error)).into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn resolve_node_config_response(state: &CfgSyncState, identifier: &str) -> RepoResponse {
|
||||||
|
state.repo.resolve(identifier)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn error_status(code: &CfgSyncErrorCode) -> StatusCode {
|
||||||
|
match code {
|
||||||
|
CfgSyncErrorCode::MissingConfig => StatusCode::NOT_FOUND,
|
||||||
|
CfgSyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cfgsync_app(state: CfgSyncState) -> Router {
|
||||||
|
Router::new()
|
||||||
|
.route("/node", post(node_config))
|
||||||
|
.route("/init-with-node", post(node_config))
|
||||||
|
.with_state(Arc::new(state))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs cfgsync HTTP server on the provided port until shutdown/error.
|
||||||
|
pub async fn run_cfgsync(port: u16, state: CfgSyncState) -> Result<(), RunCfgsyncError> {
|
||||||
|
let app = cfgsync_app(state);
|
||||||
|
println!("Server running on http://0.0.0.0:{port}");
|
||||||
|
|
||||||
|
let bind_addr = format!("0.0.0.0:{port}");
|
||||||
|
let listener = tokio::net::TcpListener::bind(&bind_addr)
|
||||||
|
.await
|
||||||
|
.map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?;
|
||||||
|
|
||||||
|
axum::serve(listener, app)
|
||||||
|
.await
|
||||||
|
.map_err(|source| RunCfgsyncError::Serve { source })?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
|
use axum::{Json, extract::State, http::StatusCode, response::IntoResponse};
|
||||||
|
|
||||||
|
use super::{CfgSyncState, ClientIp, node_config};
|
||||||
|
use crate::repo::{
|
||||||
|
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile,
|
||||||
|
CfgSyncPayload, ConfigProvider, RepoResponse,
|
||||||
|
};
|
||||||
|
|
||||||
|
struct StaticProvider {
|
||||||
|
data: HashMap<String, CfgSyncPayload>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConfigProvider for StaticProvider {
|
||||||
|
fn resolve(&self, identifier: &str) -> RepoResponse {
|
||||||
|
self.data.get(identifier).cloned().map_or_else(
|
||||||
|
|| RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)),
|
||||||
|
RepoResponse::Config,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sample_payload() -> CfgSyncPayload {
|
||||||
|
CfgSyncPayload {
|
||||||
|
schema_version: CFGSYNC_SCHEMA_VERSION,
|
||||||
|
files: vec![CfgSyncFile::new("/app-config.yaml", "app: test")],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn node_config_resolves_from_non_tf_provider() {
|
||||||
|
let mut data = HashMap::new();
|
||||||
|
data.insert("node-a".to_owned(), sample_payload());
|
||||||
|
|
||||||
|
let provider = Arc::new(StaticProvider { data });
|
||||||
|
let state = Arc::new(CfgSyncState::new(provider));
|
||||||
|
let payload = ClientIp {
|
||||||
|
ip: "127.0.0.1".parse().expect("valid ip"),
|
||||||
|
identifier: "node-a".to_owned(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let response = node_config(State(state), Json(payload))
|
||||||
|
.await
|
||||||
|
.into_response();
|
||||||
|
|
||||||
|
assert_eq!(response.status(), StatusCode::OK);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn node_config_returns_not_found_for_unknown_identifier() {
|
||||||
|
let provider = Arc::new(StaticProvider {
|
||||||
|
data: HashMap::new(),
|
||||||
|
});
|
||||||
|
let state = Arc::new(CfgSyncState::new(provider));
|
||||||
|
let payload = ClientIp {
|
||||||
|
ip: "127.0.0.1".parse().expect("valid ip"),
|
||||||
|
identifier: "missing-node".to_owned(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let response = node_config(State(state), Json(payload))
|
||||||
|
.await
|
||||||
|
.into_response();
|
||||||
|
|
||||||
|
assert_eq!(response.status(), StatusCode::NOT_FOUND);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn missing_config_error_uses_expected_code() {
|
||||||
|
let error = CfgSyncErrorResponse::missing_config("missing-node");
|
||||||
|
|
||||||
|
assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig));
|
||||||
|
}
|
||||||
|
}
|
||||||
26
cfgsync/runtime/Cargo.toml
Normal file
26
cfgsync/runtime/Cargo.toml
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
[package]
|
||||||
|
categories = { workspace = true }
|
||||||
|
description = { workspace = true }
|
||||||
|
edition = { workspace = true }
|
||||||
|
keywords = { workspace = true }
|
||||||
|
license = { workspace = true }
|
||||||
|
name = "cfgsync-runtime"
|
||||||
|
readme = { workspace = true }
|
||||||
|
repository = { workspace = true }
|
||||||
|
version = { workspace = true }
|
||||||
|
|
||||||
|
[lints]
|
||||||
|
workspace = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1"
|
||||||
|
cfgsync-core = { workspace = true }
|
||||||
|
clap = { version = "4", features = ["derive"] }
|
||||||
|
serde = { workspace = true }
|
||||||
|
serde_yaml = { workspace = true }
|
||||||
|
thiserror = { workspace = true }
|
||||||
|
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
|
||||||
|
tracing = { workspace = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tempfile = { workspace = true }
|
||||||
205
cfgsync/runtime/src/client.rs
Normal file
205
cfgsync/runtime/src/client.rs
Normal file
@ -0,0 +1,205 @@
|
|||||||
|
use std::{
|
||||||
|
env, fs,
|
||||||
|
net::Ipv4Addr,
|
||||||
|
path::{Path, PathBuf},
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::{Context as _, Result, bail};
|
||||||
|
use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp};
|
||||||
|
use thiserror::Error;
|
||||||
|
use tokio::time::{Duration, sleep};
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
const FETCH_ATTEMPTS: usize = 5;
|
||||||
|
const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250);
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
enum ClientEnvError {
|
||||||
|
#[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")]
|
||||||
|
InvalidIp { value: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result<CfgSyncPayload> {
|
||||||
|
let client = CfgSyncClient::new(server_addr);
|
||||||
|
|
||||||
|
for attempt in 1..=FETCH_ATTEMPTS {
|
||||||
|
match fetch_once(&client, payload).await {
|
||||||
|
Ok(config) => return Ok(config),
|
||||||
|
Err(error) => {
|
||||||
|
if attempt == FETCH_ATTEMPTS {
|
||||||
|
return Err(error).with_context(|| {
|
||||||
|
format!("fetching cfgsync payload after {attempt} attempts")
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(FETCH_RETRY_DELAY).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unreachable!("cfgsync fetch loop always returns before exhausting attempts");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_once(client: &CfgSyncClient, payload: &ClientIp) -> Result<CfgSyncPayload> {
|
||||||
|
let response = client.fetch_node_config(payload).await?;
|
||||||
|
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn pull_config_files(payload: ClientIp, server_addr: &str) -> Result<()> {
|
||||||
|
let config = fetch_with_retry(&payload, server_addr)
|
||||||
|
.await
|
||||||
|
.context("fetching cfgsync node config")?;
|
||||||
|
ensure_schema_version(&config)?;
|
||||||
|
|
||||||
|
let files = collect_payload_files(&config)?;
|
||||||
|
|
||||||
|
for file in files {
|
||||||
|
write_cfgsync_file(file)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(files = files.len(), "cfgsync files saved");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> {
|
||||||
|
if config.schema_version != CFGSYNC_SCHEMA_VERSION {
|
||||||
|
bail!(
|
||||||
|
"unsupported cfgsync payload schema version {}, expected {}",
|
||||||
|
config.schema_version,
|
||||||
|
CFGSYNC_SCHEMA_VERSION
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn collect_payload_files(config: &CfgSyncPayload) -> Result<&[CfgSyncFile]> {
|
||||||
|
if config.is_empty() {
|
||||||
|
bail!("cfgsync payload contains no files");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(config.files())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_cfgsync_file(file: &CfgSyncFile) -> Result<()> {
|
||||||
|
let path = PathBuf::from(&file.path);
|
||||||
|
|
||||||
|
ensure_parent_dir(&path)?;
|
||||||
|
|
||||||
|
fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?;
|
||||||
|
|
||||||
|
info!(path = %path.display(), "cfgsync file saved");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ensure_parent_dir(path: &Path) -> Result<()> {
|
||||||
|
let Some(parent) = path.parent() else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
if parent.as_os_str().is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
fs::create_dir_all(parent)
|
||||||
|
.with_context(|| format!("creating parent directory {}", parent.display()))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resolves cfgsync client inputs from environment and materializes node files.
|
||||||
|
pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
|
||||||
|
let server_addr =
|
||||||
|
env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}"));
|
||||||
|
let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?;
|
||||||
|
let identifier =
|
||||||
|
env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
|
||||||
|
|
||||||
|
pull_config_files(ClientIp { ip, identifier }, &server_addr).await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_ip_env(ip_str: &str) -> Result<Ipv4Addr> {
|
||||||
|
ip_str
|
||||||
|
.parse()
|
||||||
|
.map_err(|_| ClientEnvError::InvalidIp {
|
||||||
|
value: ip_str.to_owned(),
|
||||||
|
})
|
||||||
|
.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use cfgsync_core::{
|
||||||
|
CfgSyncBundle, CfgSyncBundleNode, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync,
|
||||||
|
};
|
||||||
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn client_materializes_multi_file_payload_from_cfgsync_server() {
|
||||||
|
let dir = tempdir().expect("create temp dir");
|
||||||
|
let app_config_path = dir.path().join("config.yaml");
|
||||||
|
let deployment_path = dir.path().join("deployment.yaml");
|
||||||
|
|
||||||
|
let bundle = CfgSyncBundle::new(vec![CfgSyncBundleNode {
|
||||||
|
identifier: "node-1".to_owned(),
|
||||||
|
files: vec![
|
||||||
|
CfgSyncFile::new(app_config_path.to_string_lossy(), "app_key: app_value"),
|
||||||
|
CfgSyncFile::new(deployment_path.to_string_lossy(), "mode: local"),
|
||||||
|
],
|
||||||
|
}]);
|
||||||
|
|
||||||
|
let repo = ConfigRepo::from_bundle(bundle_to_payload_map(bundle));
|
||||||
|
let state = CfgSyncState::new(repo);
|
||||||
|
let port = allocate_test_port();
|
||||||
|
let address = format!("http://127.0.0.1:{port}");
|
||||||
|
let server = tokio::spawn(async move {
|
||||||
|
run_cfgsync(port, state).await.expect("run cfgsync server");
|
||||||
|
});
|
||||||
|
|
||||||
|
pull_config_files(
|
||||||
|
ClientIp {
|
||||||
|
ip: "127.0.0.1".parse().expect("parse ip"),
|
||||||
|
identifier: "node-1".to_owned(),
|
||||||
|
},
|
||||||
|
&address,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.expect("pull config files");
|
||||||
|
|
||||||
|
server.abort();
|
||||||
|
let _ = server.await;
|
||||||
|
|
||||||
|
let app_config = fs::read_to_string(&app_config_path).expect("read app config");
|
||||||
|
let deployment = fs::read_to_string(&deployment_path).expect("read deployment config");
|
||||||
|
|
||||||
|
assert_eq!(app_config, "app_key: app_value");
|
||||||
|
assert_eq!(deployment, "mode: local");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn bundle_to_payload_map(bundle: CfgSyncBundle) -> HashMap<String, CfgSyncPayload> {
|
||||||
|
bundle
|
||||||
|
.nodes
|
||||||
|
.into_iter()
|
||||||
|
.map(|node| {
|
||||||
|
let CfgSyncBundleNode { identifier, files } = node;
|
||||||
|
|
||||||
|
(identifier, CfgSyncPayload::from_files(files))
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn allocate_test_port() -> u16 {
|
||||||
|
let listener =
|
||||||
|
std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port for test");
|
||||||
|
let port = listener.local_addr().expect("read local addr").port();
|
||||||
|
drop(listener);
|
||||||
|
port
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,6 +1,3 @@
|
|||||||
pub mod bundle;
|
|
||||||
pub mod render;
|
|
||||||
|
|
||||||
pub use cfgsync_core as core;
|
pub use cfgsync_core as core;
|
||||||
|
|
||||||
mod client;
|
mod client;
|
||||||
59
cfgsync/runtime/src/server.rs
Normal file
59
cfgsync/runtime/src/server.rs
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
use std::{fs, path::Path, sync::Arc};
|
||||||
|
|
||||||
|
use anyhow::Context as _;
|
||||||
|
use cfgsync_core::{CfgSyncState, ConfigProvider, FileConfigProvider, run_cfgsync};
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
/// Runtime cfgsync server config loaded from YAML.
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct CfgSyncServerConfig {
|
||||||
|
pub port: u16,
|
||||||
|
pub bundle_path: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CfgSyncServerConfig {
|
||||||
|
/// Loads cfgsync runtime server config from a YAML file.
|
||||||
|
pub fn load_from_file(path: &Path) -> anyhow::Result<Self> {
|
||||||
|
let config_content = fs::read_to_string(path)
|
||||||
|
.with_context(|| format!("failed to read cfgsync config file {}", path.display()))?;
|
||||||
|
|
||||||
|
serde_yaml::from_str(&config_content)
|
||||||
|
.with_context(|| format!("failed to parse cfgsync config file {}", path.display()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_bundle(bundle_path: &Path) -> anyhow::Result<Arc<dyn ConfigProvider>> {
|
||||||
|
let provider = FileConfigProvider::from_yaml_file(bundle_path)
|
||||||
|
.with_context(|| format!("loading cfgsync provider from {}", bundle_path.display()))?;
|
||||||
|
|
||||||
|
Ok(Arc::new(provider))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {
|
||||||
|
let path = Path::new(bundle_path);
|
||||||
|
if path.is_absolute() {
|
||||||
|
return path.to_path_buf();
|
||||||
|
}
|
||||||
|
|
||||||
|
config_path
|
||||||
|
.parent()
|
||||||
|
.unwrap_or_else(|| Path::new("."))
|
||||||
|
.join(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Loads runtime config and starts cfgsync HTTP server process.
|
||||||
|
pub async fn run_cfgsync_server(config_path: &Path) -> anyhow::Result<()> {
|
||||||
|
let config = CfgSyncServerConfig::load_from_file(config_path)?;
|
||||||
|
let bundle_path = resolve_bundle_path(config_path, &config.bundle_path);
|
||||||
|
|
||||||
|
let state = build_server_state(&bundle_path)?;
|
||||||
|
run_cfgsync(config.port, state).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_server_state(bundle_path: &Path) -> anyhow::Result<CfgSyncState> {
|
||||||
|
let repo = load_bundle(bundle_path)?;
|
||||||
|
|
||||||
|
Ok(CfgSyncState::new(repo))
|
||||||
|
}
|
||||||
@ -2,10 +2,10 @@
|
|||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
|
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
|
||||||
cargo build --manifest-path /workspace/testing-framework/tools/cfgsync-runtime/Cargo.toml --bin cfgsync-server
|
cargo build --manifest-path /workspace/cfgsync/runtime/Cargo.toml --bin cfgsync-server
|
||||||
|
|
||||||
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
|
RUSTFLAGS='--cfg feature="pol-dev-mode"' \
|
||||||
cargo build --manifest-path /workspace/testing-framework/tools/cfgsync-runtime/Cargo.toml --bin cfgsync-client
|
cargo build --manifest-path /workspace/cfgsync/runtime/Cargo.toml --bin cfgsync-client
|
||||||
|
|
||||||
cp /workspace/target/debug/cfgsync-server /workspace/artifacts/cfgsync-server
|
cp /workspace/target/debug/cfgsync-server /workspace/artifacts/cfgsync-server
|
||||||
cp /workspace/target/debug/cfgsync-client /workspace/artifacts/cfgsync-client
|
cp /workspace/target/debug/cfgsync-client /workspace/artifacts/cfgsync-client
|
||||||
|
|||||||
@ -7,8 +7,8 @@ version = { workspace = true }
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
# Workspace crates
|
# Workspace crates
|
||||||
|
cfgsync-adapter = { workspace = true }
|
||||||
cfgsync-core = { workspace = true }
|
cfgsync-core = { workspace = true }
|
||||||
cfgsync_runtime = { workspace = true }
|
|
||||||
lb-framework = { workspace = true }
|
lb-framework = { workspace = true }
|
||||||
testing-framework-core = { workspace = true }
|
testing-framework-core = { workspace = true }
|
||||||
testing-framework-env = { workspace = true }
|
testing-framework-env = { workspace = true }
|
||||||
|
|||||||
@ -1,7 +1,8 @@
|
|||||||
use anyhow::{Result, anyhow};
|
use anyhow::Result;
|
||||||
pub(crate) use cfgsync_runtime::render::CfgsyncOutputPaths;
|
use cfgsync_adapter::{CfgsyncEnv, build_cfgsync_node_configs};
|
||||||
use cfgsync_runtime::{
|
pub(crate) use cfgsync_core::render::CfgsyncOutputPaths;
|
||||||
bundle::{CfgSyncBundle, CfgSyncBundleNode, build_cfgsync_bundle_with_hostnames},
|
use cfgsync_core::{
|
||||||
|
CfgSyncBundle, CfgSyncBundleNode,
|
||||||
render::{
|
render::{
|
||||||
CfgsyncConfigOverrides, RenderedCfgsync, ensure_bundle_path,
|
CfgsyncConfigOverrides, RenderedCfgsync, ensure_bundle_path,
|
||||||
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
|
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
|
||||||
@ -9,7 +10,7 @@ use cfgsync_runtime::{
|
|||||||
};
|
};
|
||||||
use reqwest::Url;
|
use reqwest::Url;
|
||||||
use serde_yaml::{Mapping, Value};
|
use serde_yaml::{Mapping, Value};
|
||||||
use testing_framework_core::cfgsync::CfgsyncEnv;
|
use thiserror::Error;
|
||||||
|
|
||||||
pub(crate) struct CfgsyncRenderOptions {
|
pub(crate) struct CfgsyncRenderOptions {
|
||||||
pub port: Option<u16>,
|
pub port: Option<u16>,
|
||||||
@ -18,6 +19,14 @@ pub(crate) struct CfgsyncRenderOptions {
|
|||||||
pub metrics_otlp_ingest_url: Option<Url>,
|
pub metrics_otlp_ingest_url: Option<Url>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
enum BundleRenderError {
|
||||||
|
#[error("cfgsync bundle node `{identifier}` is missing `/config.yaml`")]
|
||||||
|
MissingConfigFile { identifier: String },
|
||||||
|
#[error("cfgsync config file is missing `{key}`")]
|
||||||
|
MissingYamlKey { key: String },
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
|
pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
|
||||||
topology: &E::Deployment,
|
topology: &E::Deployment,
|
||||||
hostnames: &[String],
|
hostnames: &[String],
|
||||||
@ -26,7 +35,7 @@ pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
|
|||||||
let cfg = build_cfgsync_server_config();
|
let cfg = build_cfgsync_server_config();
|
||||||
let overrides = build_overrides::<E>(topology, options);
|
let overrides = build_overrides::<E>(topology, options);
|
||||||
let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?;
|
let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?;
|
||||||
let mut bundle = build_cfgsync_bundle_with_hostnames::<E>(topology, hostnames)?;
|
let mut bundle = build_cfgsync_bundle::<E>(topology, hostnames)?;
|
||||||
append_deployment_files(&mut bundle)?;
|
append_deployment_files(&mut bundle)?;
|
||||||
let bundle_yaml = serde_yaml::to_string(&bundle)?;
|
let bundle_yaml = serde_yaml::to_string(&bundle)?;
|
||||||
|
|
||||||
@ -36,14 +45,32 @@ pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn build_cfgsync_bundle<E: CfgsyncEnv>(
|
||||||
|
topology: &E::Deployment,
|
||||||
|
hostnames: &[String],
|
||||||
|
) -> Result<CfgSyncBundle> {
|
||||||
|
let nodes = build_cfgsync_node_configs::<E>(topology, hostnames)?;
|
||||||
|
let nodes = nodes
|
||||||
|
.into_iter()
|
||||||
|
.map(|node| CfgSyncBundleNode {
|
||||||
|
identifier: node.identifier,
|
||||||
|
files: vec![build_bundle_file("/config.yaml", node.config_yaml)],
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(CfgSyncBundle::new(nodes))
|
||||||
|
}
|
||||||
|
|
||||||
fn append_deployment_files(bundle: &mut CfgSyncBundle) -> Result<()> {
|
fn append_deployment_files(bundle: &mut CfgSyncBundle) -> Result<()> {
|
||||||
for node in &mut bundle.nodes {
|
for node in &mut bundle.nodes {
|
||||||
if has_file_path(node, "/deployment.yaml") {
|
if has_file_path(node, "/deployment.yaml") {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let config_content = config_file_content(node)
|
let config_content =
|
||||||
.ok_or_else(|| anyhow!("cfgsync bundle node missing /config.yaml"))?;
|
config_file_content(node).ok_or_else(|| BundleRenderError::MissingConfigFile {
|
||||||
|
identifier: node.identifier.clone(),
|
||||||
|
})?;
|
||||||
let deployment_yaml = extract_yaml_key(&config_content, "deployment")?;
|
let deployment_yaml = extract_yaml_key(&config_content, "deployment")?;
|
||||||
|
|
||||||
node.files
|
node.files
|
||||||
@ -75,7 +102,9 @@ fn extract_yaml_key(content: &str, key: &str) -> Result<String> {
|
|||||||
let value = document
|
let value = document
|
||||||
.get(key)
|
.get(key)
|
||||||
.cloned()
|
.cloned()
|
||||||
.ok_or_else(|| anyhow!("config yaml missing `{key}`"))?;
|
.ok_or_else(|| BundleRenderError::MissingYamlKey {
|
||||||
|
key: key.to_owned(),
|
||||||
|
})?;
|
||||||
|
|
||||||
Ok(serde_yaml::to_string(&value)?)
|
Ok(serde_yaml::to_string(&value)?)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -254,7 +254,6 @@ fn build_compose_node_descriptor(
|
|||||||
base_volumes(),
|
base_volumes(),
|
||||||
default_extra_hosts(),
|
default_extra_hosts(),
|
||||||
ports,
|
ports,
|
||||||
api_port,
|
|
||||||
environment,
|
environment,
|
||||||
platform,
|
platform,
|
||||||
)
|
)
|
||||||
|
|||||||
@ -31,7 +31,7 @@ use crate::{
|
|||||||
|
|
||||||
const CFGSYNC_K8S_TIMEOUT_SECS: u64 = 300;
|
const CFGSYNC_K8S_TIMEOUT_SECS: u64 = 300;
|
||||||
const K8S_FULLNAME_OVERRIDE: &str = "logos-runner";
|
const K8S_FULLNAME_OVERRIDE: &str = "logos-runner";
|
||||||
const DEFAULT_K8S_TESTNET_IMAGE: &str = "logos-blockchain-testing:local";
|
const DEFAULT_K8S_TESTNET_IMAGE: &str = "public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test";
|
||||||
|
|
||||||
/// Paths and image metadata required to deploy the Helm chart.
|
/// Paths and image metadata required to deploy the Helm chart.
|
||||||
pub struct K8sAssets {
|
pub struct K8sAssets {
|
||||||
|
|||||||
@ -17,6 +17,7 @@ default = []
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
|
cfgsync-adapter = { workspace = true }
|
||||||
futures = { default-features = false, features = ["std"], version = "0.3" }
|
futures = { default-features = false, features = ["std"], version = "0.3" }
|
||||||
parking_lot = { workspace = true }
|
parking_lot = { workspace = true }
|
||||||
prometheus-http-query = "0.8"
|
prometheus-http-query = "0.8"
|
||||||
|
|||||||
@ -1,84 +1 @@
|
|||||||
use std::error::Error;
|
pub use cfgsync_adapter::*;
|
||||||
|
|
||||||
use thiserror::Error;
|
|
||||||
|
|
||||||
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct CfgsyncNodeConfig {
|
|
||||||
pub identifier: String,
|
|
||||||
pub config_yaml: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait CfgsyncEnv {
|
|
||||||
type Deployment;
|
|
||||||
type Node;
|
|
||||||
type NodeConfig;
|
|
||||||
type Error: Error + Send + Sync + 'static;
|
|
||||||
|
|
||||||
fn nodes(deployment: &Self::Deployment) -> &[Self::Node];
|
|
||||||
|
|
||||||
fn node_identifier(index: usize, node: &Self::Node) -> String;
|
|
||||||
|
|
||||||
fn build_node_config(
|
|
||||||
deployment: &Self::Deployment,
|
|
||||||
node: &Self::Node,
|
|
||||||
) -> Result<Self::NodeConfig, Self::Error>;
|
|
||||||
|
|
||||||
fn rewrite_for_hostnames(
|
|
||||||
deployment: &Self::Deployment,
|
|
||||||
node_index: usize,
|
|
||||||
hostnames: &[String],
|
|
||||||
config: &mut Self::NodeConfig,
|
|
||||||
) -> Result<(), Self::Error>;
|
|
||||||
|
|
||||||
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
pub enum BuildCfgsyncNodesError {
|
|
||||||
#[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")]
|
|
||||||
HostnameCountMismatch { nodes: usize, hostnames: usize },
|
|
||||||
#[error("cfgsync adapter failed: {source}")]
|
|
||||||
Adapter {
|
|
||||||
#[source]
|
|
||||||
source: DynCfgsyncError,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
fn adapter_error<E>(source: E) -> BuildCfgsyncNodesError
|
|
||||||
where
|
|
||||||
E: Error + Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
BuildCfgsyncNodesError::Adapter {
|
|
||||||
source: Box::new(source),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build_cfgsync_node_configs<E: CfgsyncEnv>(
|
|
||||||
deployment: &E::Deployment,
|
|
||||||
hostnames: &[String],
|
|
||||||
) -> Result<Vec<CfgsyncNodeConfig>, BuildCfgsyncNodesError> {
|
|
||||||
let nodes = E::nodes(deployment);
|
|
||||||
if nodes.len() != hostnames.len() {
|
|
||||||
return Err(BuildCfgsyncNodesError::HostnameCountMismatch {
|
|
||||||
nodes: nodes.len(),
|
|
||||||
hostnames: hostnames.len(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut output = Vec::with_capacity(nodes.len());
|
|
||||||
for (index, node) in nodes.iter().enumerate() {
|
|
||||||
let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?;
|
|
||||||
E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config)
|
|
||||||
.map_err(adapter_error)?;
|
|
||||||
let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?;
|
|
||||||
|
|
||||||
output.push(CfgsyncNodeConfig {
|
|
||||||
identifier: E::node_identifier(index, node),
|
|
||||||
config_yaml,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(output)
|
|
||||||
}
|
|
||||||
|
|||||||
@ -1,3 +1,7 @@
|
|||||||
|
#[deprecated(
|
||||||
|
since = "0.1.0",
|
||||||
|
note = "testing-framework-core::cfgsync moved to cfgsync-adapter; update imports"
|
||||||
|
)]
|
||||||
pub mod cfgsync;
|
pub mod cfgsync;
|
||||||
pub mod env;
|
pub mod env;
|
||||||
pub mod runtime;
|
pub mod runtime;
|
||||||
|
|||||||
@ -31,4 +31,5 @@ uuid = { features = ["v4"], version = "1" }
|
|||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
groth16 = { workspace = true }
|
groth16 = { workspace = true }
|
||||||
key-management-system-service = { workspace = true }
|
key-management-system-service = { workspace = true }
|
||||||
|
serde_json = { workspace = true }
|
||||||
zksign = { workspace = true }
|
zksign = { workspace = true }
|
||||||
|
|||||||
@ -18,9 +18,6 @@ services:
|
|||||||
{% for port in node.ports %}
|
{% for port in node.ports %}
|
||||||
- {{ port }}
|
- {{ port }}
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
labels:
|
|
||||||
testing-framework.node: "true"
|
|
||||||
testing-framework.api-container-port: "{{ node.api_container_port }}"
|
|
||||||
environment:
|
environment:
|
||||||
{% for env in node.environment %}
|
{% for env in node.environment %}
|
||||||
{{ env.key }}: "{{ env.value }}"
|
{{ env.key }}: "{{ env.value }}"
|
||||||
|
|||||||
@ -9,7 +9,6 @@ pub struct NodeDescriptor {
|
|||||||
volumes: Vec<String>,
|
volumes: Vec<String>,
|
||||||
extra_hosts: Vec<String>,
|
extra_hosts: Vec<String>,
|
||||||
ports: Vec<String>,
|
ports: Vec<String>,
|
||||||
api_container_port: u16,
|
|
||||||
environment: Vec<EnvEntry>,
|
environment: Vec<EnvEntry>,
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
platform: Option<String>,
|
platform: Option<String>,
|
||||||
@ -50,7 +49,6 @@ impl NodeDescriptor {
|
|||||||
volumes: Vec<String>,
|
volumes: Vec<String>,
|
||||||
extra_hosts: Vec<String>,
|
extra_hosts: Vec<String>,
|
||||||
ports: Vec<String>,
|
ports: Vec<String>,
|
||||||
api_container_port: u16,
|
|
||||||
environment: Vec<EnvEntry>,
|
environment: Vec<EnvEntry>,
|
||||||
platform: Option<String>,
|
platform: Option<String>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
@ -61,7 +59,6 @@ impl NodeDescriptor {
|
|||||||
volumes,
|
volumes,
|
||||||
extra_hosts,
|
extra_hosts,
|
||||||
ports,
|
ports,
|
||||||
api_container_port,
|
|
||||||
environment,
|
environment,
|
||||||
platform,
|
platform,
|
||||||
}
|
}
|
||||||
@ -80,9 +77,4 @@ impl NodeDescriptor {
|
|||||||
pub fn environment(&self) -> &[EnvEntry] {
|
pub fn environment(&self) -> &[EnvEntry] {
|
||||||
&self.environment
|
&self.environment
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
pub fn api_container_port(&self) -> u16 {
|
|
||||||
self.api_container_port
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -106,7 +106,8 @@ pub trait K8sDeployEnv: Application {
|
|||||||
format!("{release}-node-{index}")
|
format!("{release}-node-{index}")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Label selector used to discover managed node services in attached mode.
|
/// Label selector used to discover managed node services in
|
||||||
|
/// existing-cluster mode.
|
||||||
fn attach_node_service_selector(release: &str) -> String {
|
fn attach_node_service_selector(release: &str) -> String {
|
||||||
format!("app.kubernetes.io/instance={release}")
|
format!("app.kubernetes.io/instance={release}")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,21 +0,0 @@
|
|||||||
[package]
|
|
||||||
categories = { workspace = true }
|
|
||||||
description = { workspace = true }
|
|
||||||
edition = { workspace = true }
|
|
||||||
keywords = { workspace = true }
|
|
||||||
license = { workspace = true }
|
|
||||||
name = "cfgsync-core"
|
|
||||||
readme = { workspace = true }
|
|
||||||
repository = { workspace = true }
|
|
||||||
version = { workspace = true }
|
|
||||||
|
|
||||||
[lints]
|
|
||||||
workspace = true
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" }
|
|
||||||
reqwest = { features = ["json"], workspace = true }
|
|
||||||
serde = { default-features = false, features = ["derive"], version = "1" }
|
|
||||||
serde_json = { workspace = true }
|
|
||||||
thiserror = { workspace = true }
|
|
||||||
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
|
|
||||||
@ -1,10 +0,0 @@
|
|||||||
pub mod client;
|
|
||||||
pub mod repo;
|
|
||||||
pub mod server;
|
|
||||||
|
|
||||||
pub use client::{CfgSyncClient, ClientError};
|
|
||||||
pub use repo::{
|
|
||||||
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload,
|
|
||||||
ConfigRepo, RepoResponse,
|
|
||||||
};
|
|
||||||
pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync};
|
|
||||||
@ -1,107 +0,0 @@
|
|||||||
use std::{collections::HashMap, sync::Arc};
|
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use thiserror::Error;
|
|
||||||
use tokio::sync::oneshot::Sender;
|
|
||||||
|
|
||||||
pub const CFGSYNC_SCHEMA_VERSION: u16 = 1;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct CfgSyncFile {
|
|
||||||
pub path: String,
|
|
||||||
pub content: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct CfgSyncPayload {
|
|
||||||
pub schema_version: u16,
|
|
||||||
#[serde(default)]
|
|
||||||
pub files: Vec<CfgSyncFile>,
|
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
|
||||||
pub config_yaml: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CfgSyncPayload {
|
|
||||||
#[must_use]
|
|
||||||
pub fn from_files(files: Vec<CfgSyncFile>) -> Self {
|
|
||||||
Self {
|
|
||||||
schema_version: CFGSYNC_SCHEMA_VERSION,
|
|
||||||
files,
|
|
||||||
config_yaml: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
pub fn normalized_files(&self, default_config_path: &str) -> Vec<CfgSyncFile> {
|
|
||||||
if !self.files.is_empty() {
|
|
||||||
return self.files.clone();
|
|
||||||
}
|
|
||||||
|
|
||||||
self.config_yaml
|
|
||||||
.as_ref()
|
|
||||||
.map(|content| {
|
|
||||||
vec![CfgSyncFile {
|
|
||||||
path: default_config_path.to_owned(),
|
|
||||||
content: content.clone(),
|
|
||||||
}]
|
|
||||||
})
|
|
||||||
.unwrap_or_default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
#[serde(rename_all = "snake_case")]
|
|
||||||
pub enum CfgSyncErrorCode {
|
|
||||||
MissingConfig,
|
|
||||||
Internal,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, Error)]
|
|
||||||
#[error("{code:?}: {message}")]
|
|
||||||
pub struct CfgSyncErrorResponse {
|
|
||||||
pub code: CfgSyncErrorCode,
|
|
||||||
pub message: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CfgSyncErrorResponse {
|
|
||||||
#[must_use]
|
|
||||||
pub fn missing_config(identifier: &str) -> Self {
|
|
||||||
Self {
|
|
||||||
code: CfgSyncErrorCode::MissingConfig,
|
|
||||||
message: format!("missing config for host {identifier}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
pub fn internal(message: impl Into<String>) -> Self {
|
|
||||||
Self {
|
|
||||||
code: CfgSyncErrorCode::Internal,
|
|
||||||
message: message.into(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum RepoResponse {
|
|
||||||
Config(CfgSyncPayload),
|
|
||||||
Error(CfgSyncErrorResponse),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ConfigRepo {
|
|
||||||
configs: HashMap<String, CfgSyncPayload>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConfigRepo {
|
|
||||||
#[must_use]
|
|
||||||
pub fn from_bundle(configs: HashMap<String, CfgSyncPayload>) -> Arc<Self> {
|
|
||||||
Arc::new(Self { configs })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn register(&self, identifier: String, reply_tx: Sender<RepoResponse>) {
|
|
||||||
let response = self.configs.get(&identifier).cloned().map_or_else(
|
|
||||||
|| RepoResponse::Error(CfgSyncErrorResponse::missing_config(&identifier)),
|
|
||||||
RepoResponse::Config,
|
|
||||||
);
|
|
||||||
|
|
||||||
let _ = reply_tx.send(response);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,95 +0,0 @@
|
|||||||
use std::{io, net::Ipv4Addr, sync::Arc};
|
|
||||||
|
|
||||||
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use thiserror::Error;
|
|
||||||
use tokio::sync::oneshot::channel;
|
|
||||||
|
|
||||||
use crate::repo::{CfgSyncErrorResponse, ConfigRepo, RepoResponse};
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
pub struct ClientIp {
|
|
||||||
/// Node IP that can be used by clients for observability/logging.
|
|
||||||
pub ip: Ipv4Addr,
|
|
||||||
/// Stable node identifier used as key in cfgsync bundle lookup.
|
|
||||||
pub identifier: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct CfgSyncState {
|
|
||||||
repo: Arc<ConfigRepo>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CfgSyncState {
|
|
||||||
#[must_use]
|
|
||||||
pub fn new(repo: Arc<ConfigRepo>) -> Self {
|
|
||||||
Self { repo }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
pub enum RunCfgsyncError {
|
|
||||||
#[error("failed to bind cfgsync server on {bind_addr}: {source}")]
|
|
||||||
Bind {
|
|
||||||
bind_addr: String,
|
|
||||||
#[source]
|
|
||||||
source: io::Error,
|
|
||||||
},
|
|
||||||
#[error("cfgsync server terminated unexpectedly: {source}")]
|
|
||||||
Serve {
|
|
||||||
#[source]
|
|
||||||
source: io::Error,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn node_config(
|
|
||||||
State(state): State<Arc<CfgSyncState>>,
|
|
||||||
Json(payload): Json<ClientIp>,
|
|
||||||
) -> impl IntoResponse {
|
|
||||||
let identifier = payload.identifier.clone();
|
|
||||||
let (reply_tx, reply_rx) = channel();
|
|
||||||
state.repo.register(identifier, reply_tx).await;
|
|
||||||
|
|
||||||
match reply_rx.await {
|
|
||||||
Err(_) => (
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
Json(CfgSyncErrorResponse::internal(
|
|
||||||
"error receiving config from repo",
|
|
||||||
)),
|
|
||||||
)
|
|
||||||
.into_response(),
|
|
||||||
Ok(RepoResponse::Config(payload_data)) => {
|
|
||||||
(StatusCode::OK, Json(payload_data)).into_response()
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(RepoResponse::Error(error)) => {
|
|
||||||
let status = match error.code {
|
|
||||||
crate::repo::CfgSyncErrorCode::MissingConfig => StatusCode::NOT_FOUND,
|
|
||||||
crate::repo::CfgSyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
};
|
|
||||||
(status, Json(error)).into_response()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn cfgsync_app(state: CfgSyncState) -> Router {
|
|
||||||
Router::new()
|
|
||||||
.route("/node", post(node_config))
|
|
||||||
.route("/init-with-node", post(node_config))
|
|
||||||
.with_state(Arc::new(state))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run_cfgsync(port: u16, state: CfgSyncState) -> Result<(), RunCfgsyncError> {
|
|
||||||
let app = cfgsync_app(state);
|
|
||||||
println!("Server running on http://0.0.0.0:{port}");
|
|
||||||
|
|
||||||
let bind_addr = format!("0.0.0.0:{port}");
|
|
||||||
let listener = tokio::net::TcpListener::bind(&bind_addr)
|
|
||||||
.await
|
|
||||||
.map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?;
|
|
||||||
|
|
||||||
axum::serve(listener, app)
|
|
||||||
.await
|
|
||||||
.map_err(|source| RunCfgsyncError::Serve { source })?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
@ -1,22 +0,0 @@
|
|||||||
[package]
|
|
||||||
categories = { workspace = true }
|
|
||||||
description = { workspace = true }
|
|
||||||
edition = { workspace = true }
|
|
||||||
keywords = { workspace = true }
|
|
||||||
license = { workspace = true }
|
|
||||||
name = "cfgsync-runtime"
|
|
||||||
readme = { workspace = true }
|
|
||||||
repository = { workspace = true }
|
|
||||||
version = { workspace = true }
|
|
||||||
|
|
||||||
[lints]
|
|
||||||
workspace = true
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
anyhow = "1"
|
|
||||||
cfgsync-core = { workspace = true }
|
|
||||||
clap = { version = "4", features = ["derive"] }
|
|
||||||
serde = { workspace = true }
|
|
||||||
serde_yaml = { workspace = true }
|
|
||||||
testing-framework-core = { workspace = true }
|
|
||||||
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
|
|
||||||
@ -1,39 +0,0 @@
|
|||||||
use anyhow::Result;
|
|
||||||
use cfgsync_core::CfgSyncFile;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use testing_framework_core::cfgsync::{CfgsyncEnv, build_cfgsync_node_configs};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct CfgSyncBundle {
|
|
||||||
pub nodes: Vec<CfgSyncBundleNode>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct CfgSyncBundleNode {
|
|
||||||
pub identifier: String,
|
|
||||||
#[serde(default)]
|
|
||||||
pub files: Vec<CfgSyncFile>,
|
|
||||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
|
||||||
pub config_yaml: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build_cfgsync_bundle_with_hostnames<E: CfgsyncEnv>(
|
|
||||||
deployment: &E::Deployment,
|
|
||||||
hostnames: &[String],
|
|
||||||
) -> Result<CfgSyncBundle> {
|
|
||||||
let nodes = build_cfgsync_node_configs::<E>(deployment, hostnames)?;
|
|
||||||
|
|
||||||
Ok(CfgSyncBundle {
|
|
||||||
nodes: nodes
|
|
||||||
.into_iter()
|
|
||||||
.map(|node| CfgSyncBundleNode {
|
|
||||||
identifier: node.identifier,
|
|
||||||
files: vec![CfgSyncFile {
|
|
||||||
path: "/config.yaml".to_owned(),
|
|
||||||
content: node.config_yaml,
|
|
||||||
}],
|
|
||||||
config_yaml: None,
|
|
||||||
})
|
|
||||||
.collect(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
@ -1,108 +0,0 @@
|
|||||||
use std::{
|
|
||||||
env, fs,
|
|
||||||
net::Ipv4Addr,
|
|
||||||
path::{Path, PathBuf},
|
|
||||||
};
|
|
||||||
|
|
||||||
use anyhow::{Context as _, Result, anyhow, bail};
|
|
||||||
use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp};
|
|
||||||
use tokio::time::{Duration, sleep};
|
|
||||||
|
|
||||||
const FETCH_ATTEMPTS: usize = 5;
|
|
||||||
const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250);
|
|
||||||
|
|
||||||
fn parse_ip(ip_str: &str) -> Ipv4Addr {
|
|
||||||
ip_str.parse().unwrap_or(Ipv4Addr::LOCALHOST)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result<CfgSyncPayload> {
|
|
||||||
let client = CfgSyncClient::new(server_addr);
|
|
||||||
let mut last_error: Option<anyhow::Error> = None;
|
|
||||||
|
|
||||||
for attempt in 1..=FETCH_ATTEMPTS {
|
|
||||||
match client.fetch_node_config(payload).await {
|
|
||||||
Ok(config) => return Ok(config),
|
|
||||||
Err(error) => {
|
|
||||||
last_error = Some(error.into());
|
|
||||||
|
|
||||||
if attempt < FETCH_ATTEMPTS {
|
|
||||||
sleep(FETCH_RETRY_DELAY).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match last_error {
|
|
||||||
Some(error) => Err(error),
|
|
||||||
None => Err(anyhow!("cfgsync client fetch failed without an error")),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn pull_config_files(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> {
|
|
||||||
let config = fetch_with_retry(&payload, server_addr)
|
|
||||||
.await
|
|
||||||
.context("fetching cfgsync node config")?;
|
|
||||||
ensure_schema_version(&config)?;
|
|
||||||
|
|
||||||
let files = collect_payload_files(&config, config_file)?;
|
|
||||||
|
|
||||||
for file in files {
|
|
||||||
write_cfgsync_file(&file)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("Config files saved");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> {
|
|
||||||
if config.schema_version != CFGSYNC_SCHEMA_VERSION {
|
|
||||||
bail!(
|
|
||||||
"unsupported cfgsync payload schema version {}, expected {}",
|
|
||||||
config.schema_version,
|
|
||||||
CFGSYNC_SCHEMA_VERSION
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn collect_payload_files(config: &CfgSyncPayload, config_file: &str) -> Result<Vec<CfgSyncFile>> {
|
|
||||||
let files = config.normalized_files(config_file);
|
|
||||||
if files.is_empty() {
|
|
||||||
bail!("cfgsync payload contains no files");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(files)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write_cfgsync_file(file: &CfgSyncFile) -> Result<()> {
|
|
||||||
let path = PathBuf::from(&file.path);
|
|
||||||
|
|
||||||
ensure_parent_dir(&path)?;
|
|
||||||
|
|
||||||
fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?;
|
|
||||||
|
|
||||||
println!("Config saved to {}", path.display());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn ensure_parent_dir(path: &Path) -> Result<()> {
|
|
||||||
if let Some(parent) = path.parent() {
|
|
||||||
if !parent.as_os_str().is_empty() {
|
|
||||||
fs::create_dir_all(parent)
|
|
||||||
.with_context(|| format!("creating parent directory {}", parent.display()))?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
|
|
||||||
let config_file_path = env::var("CFG_FILE_PATH").unwrap_or_else(|_| "config.yaml".to_owned());
|
|
||||||
let server_addr =
|
|
||||||
env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}"));
|
|
||||||
let ip = parse_ip(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()));
|
|
||||||
let identifier =
|
|
||||||
env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
|
|
||||||
|
|
||||||
pull_config_files(ClientIp { ip, identifier }, &server_addr, &config_file_path).await
|
|
||||||
}
|
|
||||||
@ -1,101 +0,0 @@
|
|||||||
use std::{collections::HashMap, fs, path::Path, sync::Arc};
|
|
||||||
|
|
||||||
use anyhow::Context as _;
|
|
||||||
use cfgsync_core::{CfgSyncFile, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync};
|
|
||||||
use serde::Deserialize;
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Clone)]
|
|
||||||
pub struct CfgSyncServerConfig {
|
|
||||||
pub port: u16,
|
|
||||||
pub bundle_path: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CfgSyncServerConfig {
|
|
||||||
pub fn load_from_file(path: &Path) -> anyhow::Result<Self> {
|
|
||||||
let config_content = fs::read_to_string(path)
|
|
||||||
.with_context(|| format!("failed to read cfgsync config file {}", path.display()))?;
|
|
||||||
serde_yaml::from_str(&config_content)
|
|
||||||
.with_context(|| format!("failed to parse cfgsync config file {}", path.display()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
struct CfgSyncBundle {
|
|
||||||
nodes: Vec<CfgSyncBundleNode>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
struct CfgSyncBundleNode {
|
|
||||||
identifier: String,
|
|
||||||
#[serde(default)]
|
|
||||||
files: Vec<CfgSyncFile>,
|
|
||||||
#[serde(default)]
|
|
||||||
config_yaml: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn load_bundle(bundle_path: &Path) -> anyhow::Result<Arc<ConfigRepo>> {
|
|
||||||
let bundle = read_cfgsync_bundle(bundle_path)?;
|
|
||||||
|
|
||||||
let configs = bundle
|
|
||||||
.nodes
|
|
||||||
.into_iter()
|
|
||||||
.map(build_repo_entry)
|
|
||||||
.collect::<HashMap<_, _>>();
|
|
||||||
|
|
||||||
Ok(ConfigRepo::from_bundle(configs))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_cfgsync_bundle(bundle_path: &Path) -> anyhow::Result<CfgSyncBundle> {
|
|
||||||
let bundle_content = fs::read_to_string(bundle_path).with_context(|| {
|
|
||||||
format!(
|
|
||||||
"failed to read cfgsync bundle file {}",
|
|
||||||
bundle_path.display()
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
serde_yaml::from_str(&bundle_content)
|
|
||||||
.with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build_repo_entry(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) {
|
|
||||||
let files = if node.files.is_empty() {
|
|
||||||
build_legacy_files(node.config_yaml)
|
|
||||||
} else {
|
|
||||||
node.files
|
|
||||||
};
|
|
||||||
|
|
||||||
(node.identifier, CfgSyncPayload::from_files(files))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build_legacy_files(config_yaml: Option<String>) -> Vec<CfgSyncFile> {
|
|
||||||
config_yaml
|
|
||||||
.map(|content| {
|
|
||||||
vec![CfgSyncFile {
|
|
||||||
path: "/config.yaml".to_owned(),
|
|
||||||
content,
|
|
||||||
}]
|
|
||||||
})
|
|
||||||
.unwrap_or_default()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {
|
|
||||||
let path = Path::new(bundle_path);
|
|
||||||
if path.is_absolute() {
|
|
||||||
return path.to_path_buf();
|
|
||||||
}
|
|
||||||
|
|
||||||
config_path
|
|
||||||
.parent()
|
|
||||||
.unwrap_or_else(|| Path::new("."))
|
|
||||||
.join(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run_cfgsync_server(config_path: &Path) -> anyhow::Result<()> {
|
|
||||||
let config = CfgSyncServerConfig::load_from_file(config_path)?;
|
|
||||||
let bundle_path = resolve_bundle_path(config_path, &config.bundle_path);
|
|
||||||
|
|
||||||
let repo = load_bundle(&bundle_path)?;
|
|
||||||
let state = CfgSyncState::new(repo);
|
|
||||||
run_cfgsync(config.port, state).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user