From 7da3df455f5620e53bed4dccce02027859cdbeaf Mon Sep 17 00:00:00 2001 From: andrussal Date: Mon, 9 Mar 2026 08:48:05 +0100 Subject: [PATCH] Extract cfgsync into standalone crates --- Cargo.lock | 26 +- Cargo.toml | 13 +- cfgsync/adapter/Cargo.toml | 16 ++ cfgsync/adapter/src/lib.rs | 119 +++++++++ cfgsync/artifacts/Cargo.toml | 17 ++ cfgsync/artifacts/src/lib.rs | 64 +++++ cfgsync/core/Cargo.toml | 27 ++ cfgsync/core/src/bundle.rs | 26 ++ .../core}/src/client.rs | 5 + cfgsync/core/src/lib.rs | 18 ++ .../core}/src/render.rs | 29 ++- cfgsync/core/src/repo.rs | 233 ++++++++++++++++++ cfgsync/core/src/server.rs | 172 +++++++++++++ cfgsync/runtime/Cargo.toml | 26 ++ .../runtime}/src/bin/cfgsync-client.rs | 0 .../runtime}/src/bin/cfgsync-server.rs | 0 cfgsync/runtime/src/client.rs | 205 +++++++++++++++ .../runtime}/src/lib.rs | 3 - cfgsync/runtime/src/server.rs | 59 +++++ .../stack/scripts/docker/build_cfgsync.sh | 4 +- logos/runtime/ext/Cargo.toml | 2 +- logos/runtime/ext/src/cfgsync/mod.rs | 47 +++- logos/runtime/ext/src/compose_env.rs | 1 - logos/runtime/ext/src/k8s_env.rs | 2 +- testing-framework/core/Cargo.toml | 1 + testing-framework/core/src/cfgsync/mod.rs | 85 +------ testing-framework/core/src/lib.rs | 4 + .../deployers/compose/Cargo.toml | 1 + .../compose/assets/docker-compose.yml.tera | 3 - .../deployers/compose/src/descriptor/node.rs | 8 - testing-framework/deployers/k8s/src/env.rs | 3 +- .../tools/cfgsync-core/Cargo.toml | 21 -- .../tools/cfgsync-core/src/lib.rs | 10 - .../tools/cfgsync-core/src/repo.rs | 107 -------- .../tools/cfgsync-core/src/server.rs | 95 ------- .../tools/cfgsync-runtime/Cargo.toml | 22 -- .../tools/cfgsync-runtime/src/bundle.rs | 39 --- .../tools/cfgsync-runtime/src/client.rs | 108 -------- .../tools/cfgsync-runtime/src/server.rs | 101 -------- 39 files changed, 1095 insertions(+), 627 deletions(-) create mode 100644 cfgsync/adapter/Cargo.toml create mode 100644 cfgsync/adapter/src/lib.rs create mode 100644 cfgsync/artifacts/Cargo.toml create mode 100644 cfgsync/artifacts/src/lib.rs create mode 100644 cfgsync/core/Cargo.toml create mode 100644 cfgsync/core/src/bundle.rs rename {testing-framework/tools/cfgsync-core => cfgsync/core}/src/client.rs (88%) create mode 100644 cfgsync/core/src/lib.rs rename {testing-framework/tools/cfgsync-runtime => cfgsync/core}/src/render.rs (77%) create mode 100644 cfgsync/core/src/repo.rs create mode 100644 cfgsync/core/src/server.rs create mode 100644 cfgsync/runtime/Cargo.toml rename {testing-framework/tools/cfgsync-runtime => cfgsync/runtime}/src/bin/cfgsync-client.rs (100%) rename {testing-framework/tools/cfgsync-runtime => cfgsync/runtime}/src/bin/cfgsync-server.rs (100%) create mode 100644 cfgsync/runtime/src/client.rs rename {testing-framework/tools/cfgsync-runtime => cfgsync/runtime}/src/lib.rs (82%) create mode 100644 cfgsync/runtime/src/server.rs delete mode 100644 testing-framework/tools/cfgsync-core/Cargo.toml delete mode 100644 testing-framework/tools/cfgsync-core/src/lib.rs delete mode 100644 testing-framework/tools/cfgsync-core/src/repo.rs delete mode 100644 testing-framework/tools/cfgsync-core/src/server.rs delete mode 100644 testing-framework/tools/cfgsync-runtime/Cargo.toml delete mode 100644 testing-framework/tools/cfgsync-runtime/src/bundle.rs delete mode 100644 testing-framework/tools/cfgsync-runtime/src/client.rs delete mode 100644 testing-framework/tools/cfgsync-runtime/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index 0145d09..0d00c93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -916,14 +916,33 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "cfgsync-adapter" +version = "0.1.0" +dependencies = [ + "thiserror 2.0.18", +] + +[[package]] +name = "cfgsync-artifacts" +version = "0.1.0" +dependencies = [ + "serde", + "thiserror 2.0.18", +] + [[package]] name = "cfgsync-core" version = "0.1.0" dependencies = [ + "anyhow", "axum", + "cfgsync-artifacts", "reqwest", "serde", "serde_json", + "serde_yaml", + "tempfile", "thiserror 2.0.18", "tokio", ] @@ -937,8 +956,10 @@ dependencies = [ "clap", "serde", "serde_yaml", - "testing-framework-core", + "tempfile", + "thiserror 2.0.18", "tokio", + "tracing", ] [[package]] @@ -2891,8 +2912,8 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "cfgsync-adapter", "cfgsync-core", - "cfgsync-runtime", "kube", "logos-blockchain-http-api-common", "reqwest", @@ -6528,6 +6549,7 @@ name = "testing-framework-core" version = "0.1.0" dependencies = [ "async-trait", + "cfgsync-adapter", "futures", "parking_lot", "prometheus-http-query", diff --git a/Cargo.toml b/Cargo.toml index 9e32eb1..a1f960d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,9 @@ [workspace] members = [ + "cfgsync/adapter", + "cfgsync/artifacts", + "cfgsync/core", + "cfgsync/runtime", "logos/examples", "logos/runtime/env", "logos/runtime/ext", @@ -8,8 +12,6 @@ members = [ "testing-framework/deployers/compose", "testing-framework/deployers/k8s", "testing-framework/deployers/local", - "testing-framework/tools/cfgsync-core", - "testing-framework/tools/cfgsync-runtime", ] resolver = "2" @@ -31,7 +33,9 @@ all = "allow" [workspace.dependencies] # Local testing framework crates -cfgsync-core = { default-features = false, path = "testing-framework/tools/cfgsync-core" } +cfgsync-adapter = { default-features = false, path = "cfgsync/adapter" } +cfgsync-artifacts = { default-features = false, path = "cfgsync/artifacts" } +cfgsync-core = { default-features = false, path = "cfgsync/core" } lb-ext = { default-features = false, path = "logos/runtime/ext" } lb-framework = { default-features = false, package = "testing_framework", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" } lb-workloads = { default-features = false, path = "logos/runtime/workloads" } @@ -40,10 +44,11 @@ testing-framework-env = { default-features = false, path = "logos/run testing-framework-runner-compose = { default-features = false, path = "testing-framework/deployers/compose" } testing-framework-runner-k8s = { default-features = false, path = "testing-framework/deployers/k8s" } testing-framework-runner-local = { default-features = false, path = "testing-framework/deployers/local" } +testing-framework-workflows = { default-features = false, package = "lb-workloads", path = "logos/runtime/workloads" } # Logos dependencies (from logos-blockchain master @ deccbb2d2) broadcast-service = { default-features = false, package = "logos-blockchain-chain-broadcast-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" } -cfgsync_runtime = { default-features = false, package = "cfgsync-runtime", path = "testing-framework/tools/cfgsync-runtime" } +cfgsync_runtime = { default-features = false, package = "cfgsync-runtime", path = "cfgsync/runtime" } chain-leader = { default-features = false, features = [ "pol-dev-mode", ], package = "logos-blockchain-chain-leader-service", git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "5ebe88a6e89ec6d7dd89e123c46f6b26dd1e4667" } diff --git a/cfgsync/adapter/Cargo.toml b/cfgsync/adapter/Cargo.toml new file mode 100644 index 0000000..034b349 --- /dev/null +++ b/cfgsync/adapter/Cargo.toml @@ -0,0 +1,16 @@ +[package] +categories = { workspace = true } +description = { workspace = true } +edition = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +name = "cfgsync-adapter" +readme = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +thiserror = { workspace = true } diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs new file mode 100644 index 0000000..467630d --- /dev/null +++ b/cfgsync/adapter/src/lib.rs @@ -0,0 +1,119 @@ +use std::error::Error; + +use thiserror::Error; + +/// Type-erased cfgsync adapter error used to preserve source context. +pub type DynCfgsyncError = Box; + +/// Per-node rendered config output used to build cfgsync bundles. +#[derive(Debug, Clone)] +pub struct CfgsyncNodeConfig { + /// Stable node identifier resolved by the adapter. + pub identifier: String, + /// Serialized config payload for the node. + pub config_yaml: String, +} + +/// Adapter contract for converting an application deployment model into +/// node-specific serialized config payloads. +pub trait CfgsyncEnv { + type Deployment; + type Node; + type NodeConfig; + type Error: Error + Send + Sync + 'static; + + fn nodes(deployment: &Self::Deployment) -> &[Self::Node]; + + fn node_identifier(index: usize, node: &Self::Node) -> String; + + fn build_node_config( + deployment: &Self::Deployment, + node: &Self::Node, + ) -> Result; + + fn rewrite_for_hostnames( + deployment: &Self::Deployment, + node_index: usize, + hostnames: &[String], + config: &mut Self::NodeConfig, + ) -> Result<(), Self::Error>; + + fn serialize_node_config(config: &Self::NodeConfig) -> Result; +} + +/// High-level failures while building adapter output for cfgsync. +#[derive(Debug, Error)] +pub enum BuildCfgsyncNodesError { + #[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")] + HostnameCountMismatch { nodes: usize, hostnames: usize }, + #[error("cfgsync adapter failed: {source}")] + Adapter { + #[source] + source: DynCfgsyncError, + }, +} + +fn adapter_error(source: E) -> BuildCfgsyncNodesError +where + E: Error + Send + Sync + 'static, +{ + BuildCfgsyncNodesError::Adapter { + source: Box::new(source), + } +} + +/// Builds cfgsync node configs for a deployment by: +/// 1) validating hostname count, +/// 2) building each node config, +/// 3) rewriting host references, +/// 4) serializing each node payload. +pub fn build_cfgsync_node_configs( + deployment: &E::Deployment, + hostnames: &[String], +) -> Result, BuildCfgsyncNodesError> { + let nodes = E::nodes(deployment); + ensure_hostname_count(nodes.len(), hostnames.len())?; + + let mut output = Vec::with_capacity(nodes.len()); + for (index, node) in nodes.iter().enumerate() { + output.push(build_node_entry::(deployment, node, index, hostnames)?); + } + + Ok(output) +} + +fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> { + if nodes != hostnames { + return Err(BuildCfgsyncNodesError::HostnameCountMismatch { nodes, hostnames }); + } + + Ok(()) +} + +fn build_node_entry( + deployment: &E::Deployment, + node: &E::Node, + index: usize, + hostnames: &[String], +) -> Result { + let node_config = build_rewritten_node_config::(deployment, node, index, hostnames)?; + let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?; + + Ok(CfgsyncNodeConfig { + identifier: E::node_identifier(index, node), + config_yaml, + }) +} + +fn build_rewritten_node_config( + deployment: &E::Deployment, + node: &E::Node, + index: usize, + hostnames: &[String], +) -> Result { + let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?; + E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config) + .map_err(adapter_error)?; + + Ok(node_config) +} diff --git a/cfgsync/artifacts/Cargo.toml b/cfgsync/artifacts/Cargo.toml new file mode 100644 index 0000000..7c2db1e --- /dev/null +++ b/cfgsync/artifacts/Cargo.toml @@ -0,0 +1,17 @@ +[package] +categories = { workspace = true } +description = "App-agnostic cfgsync artifact model" +edition = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +name = "cfgsync-artifacts" +readme = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +serde = { workspace = true } +thiserror = { workspace = true } diff --git a/cfgsync/artifacts/src/lib.rs b/cfgsync/artifacts/src/lib.rs new file mode 100644 index 0000000..2d1b54c --- /dev/null +++ b/cfgsync/artifacts/src/lib.rs @@ -0,0 +1,64 @@ +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +/// Single file artifact delivered to a node. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ArtifactFile { + /// Destination path where content should be written. + pub path: String, + /// Raw file contents. + pub content: String, +} + +impl ArtifactFile { + #[must_use] + pub fn new(path: impl Into, content: impl Into) -> Self { + Self { + path: path.into(), + content: content.into(), + } + } +} + +/// Collection of files delivered together for one node. +#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)] +pub struct ArtifactSet { + pub files: Vec, +} + +impl ArtifactSet { + #[must_use] + pub fn new(files: Vec) -> Self { + Self { files } + } + + #[must_use] + pub fn len(&self) -> usize { + self.files.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } + + /// Validates that no two files target the same output path. + pub fn ensure_unique_paths(&self) -> Result<(), ArtifactValidationError> { + let mut seen = std::collections::HashSet::new(); + + for file in &self.files { + if !seen.insert(file.path.clone()) { + return Err(ArtifactValidationError::DuplicatePath(file.path.clone())); + } + } + + Ok(()) + } +} + +/// Validation failures for [`ArtifactSet`]. +#[derive(Debug, Error)] +pub enum ArtifactValidationError { + #[error("duplicate artifact path `{0}`")] + DuplicatePath(String), +} diff --git a/cfgsync/core/Cargo.toml b/cfgsync/core/Cargo.toml new file mode 100644 index 0000000..fea2c39 --- /dev/null +++ b/cfgsync/core/Cargo.toml @@ -0,0 +1,27 @@ +[package] +categories = { workspace = true } +description = { workspace = true } +edition = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +name = "cfgsync-core" +readme = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +anyhow = "1" +axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" } +cfgsync-artifacts = { workspace = true } +reqwest = { features = ["json"], workspace = true } +serde = { default-features = false, features = ["derive"], version = "1" } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +thiserror = { workspace = true } +tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/cfgsync/core/src/bundle.rs b/cfgsync/core/src/bundle.rs new file mode 100644 index 0000000..003dd8a --- /dev/null +++ b/cfgsync/core/src/bundle.rs @@ -0,0 +1,26 @@ +use serde::{Deserialize, Serialize}; + +use crate::CfgSyncFile; + +/// Top-level cfgsync bundle containing per-node file payloads. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgSyncBundle { + pub nodes: Vec, +} + +impl CfgSyncBundle { + #[must_use] + pub fn new(nodes: Vec) -> Self { + Self { nodes } + } +} + +/// Artifact set for a single node resolved by identifier. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgSyncBundleNode { + /// Stable node identifier used by cfgsync lookup. + pub identifier: String, + /// Files that should be materialized for the node. + #[serde(default)] + pub files: Vec, +} diff --git a/testing-framework/tools/cfgsync-core/src/client.rs b/cfgsync/core/src/client.rs similarity index 88% rename from testing-framework/tools/cfgsync-core/src/client.rs rename to cfgsync/core/src/client.rs index 2df652c..28e59b5 100644 --- a/testing-framework/tools/cfgsync-core/src/client.rs +++ b/cfgsync/core/src/client.rs @@ -6,6 +6,7 @@ use crate::{ server::ClientIp, }; +/// cfgsync client-side request/response failures. #[derive(Debug, Error)] pub enum ClientError { #[error("request failed: {0}")] @@ -20,6 +21,7 @@ pub enum ClientError { Decode(serde_json::Error), } +/// Reusable HTTP client for cfgsync server endpoints. #[derive(Clone, Debug)] pub struct CfgSyncClient { base_url: String, @@ -44,6 +46,7 @@ impl CfgSyncClient { &self.base_url } + /// Fetches `/node` payload for a node identifier. pub async fn fetch_node_config( &self, payload: &ClientIp, @@ -51,6 +54,7 @@ impl CfgSyncClient { self.post_json("/node", payload).await } + /// Fetches `/init-with-node` payload for a node identifier. pub async fn fetch_init_with_node_config( &self, payload: &ClientIp, @@ -58,6 +62,7 @@ impl CfgSyncClient { self.post_json("/init-with-node", payload).await } + /// Posts JSON payload to a cfgsync endpoint and decodes cfgsync payload. pub async fn post_json( &self, path: &str, diff --git a/cfgsync/core/src/lib.rs b/cfgsync/core/src/lib.rs new file mode 100644 index 0000000..b6851e3 --- /dev/null +++ b/cfgsync/core/src/lib.rs @@ -0,0 +1,18 @@ +pub mod bundle; +pub mod client; +pub mod render; +pub mod repo; +pub mod server; + +pub use bundle::{CfgSyncBundle, CfgSyncBundleNode}; +pub use client::{CfgSyncClient, ClientError}; +pub use render::{ + CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides, + apply_timeout_floor, ensure_bundle_path, load_cfgsync_template_yaml, + render_cfgsync_yaml_from_template, write_rendered_cfgsync, +}; +pub use repo::{ + CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, + ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, RepoResponse, +}; +pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync}; diff --git a/testing-framework/tools/cfgsync-runtime/src/render.rs b/cfgsync/core/src/render.rs similarity index 77% rename from testing-framework/tools/cfgsync-runtime/src/render.rs rename to cfgsync/core/src/render.rs index 0f59b5c..2031986 100644 --- a/testing-framework/tools/cfgsync-runtime/src/render.rs +++ b/cfgsync/core/src/render.rs @@ -2,19 +2,25 @@ use std::{fs, path::Path}; use anyhow::{Context as _, Result}; use serde_yaml::{Mapping, Value}; +use thiserror::Error; +/// Rendered cfgsync outputs written for server startup. #[derive(Debug, Clone)] pub struct RenderedCfgsync { + /// Serialized cfgsync server config YAML. pub config_yaml: String, + /// Serialized node bundle YAML. pub bundle_yaml: String, } +/// Output paths used when materializing rendered cfgsync files. #[derive(Debug, Clone, Copy)] pub struct CfgsyncOutputPaths<'a> { pub config_path: &'a Path, pub bundle_path: &'a Path, } +/// Ensures bundle path override exists, defaulting to output bundle file name. pub fn ensure_bundle_path(bundle_path: &mut Option, output_bundle_path: &Path) { if bundle_path.is_some() { return; @@ -29,12 +35,14 @@ pub fn ensure_bundle_path(bundle_path: &mut Option, output_bundle_path: ); } +/// Applies a minimum timeout floor to an existing timeout value. pub fn apply_timeout_floor(timeout: &mut u64, min_timeout_secs: Option) { if let Some(min_timeout_secs) = min_timeout_secs { *timeout = (*timeout).max(min_timeout_secs); } } +/// Writes rendered cfgsync server and bundle YAML files. pub fn write_rendered_cfgsync( rendered: &RenderedCfgsync, output: CfgsyncOutputPaths<'_>, @@ -44,6 +52,7 @@ pub fn write_rendered_cfgsync( Ok(()) } +/// Optional overrides applied to a cfgsync template document. #[derive(Debug, Clone, Default)] pub struct CfgsyncConfigOverrides { pub port: Option, @@ -53,12 +62,20 @@ pub struct CfgsyncConfigOverrides { pub metrics_otlp_ingest_url: Option, } +#[derive(Debug, Error)] +enum RenderTemplateError { + #[error("cfgsync template key `{key}` must be a YAML map")] + NonMappingEntry { key: String }, +} + +/// Loads cfgsync template YAML from disk. pub fn load_cfgsync_template_yaml(path: &Path) -> Result { let file = fs::File::open(path) .with_context(|| format!("opening cfgsync template at {}", path.display()))?; serde_yaml::from_reader(file).context("parsing cfgsync template") } +/// Renders cfgsync config YAML by applying overrides to a template document. pub fn render_cfgsync_yaml_from_template( mut template: Value, overrides: &CfgsyncConfigOverrides, @@ -67,6 +84,7 @@ pub fn render_cfgsync_yaml_from_template( serde_yaml::to_string(&template).context("serializing rendered cfgsync config") } +/// Applies cfgsync-specific override fields to a mutable YAML document. pub fn apply_cfgsync_overrides( template: &mut Value, overrides: &CfgsyncConfigOverrides, @@ -105,7 +123,7 @@ pub fn apply_cfgsync_overrides( } if let Some(endpoint) = &overrides.metrics_otlp_ingest_url { - let tracing_settings = nested_mapping_mut(root, "tracing_settings"); + let tracing_settings = nested_mapping_mut(root, "tracing_settings")?; tracing_settings.insert( Value::String("metrics".to_string()), parse_otlp_metrics_layer(endpoint)?, @@ -121,19 +139,20 @@ fn mapping_mut(value: &mut Value) -> Result<&mut Mapping> { .context("cfgsync template root must be a YAML map") } -fn nested_mapping_mut<'a>(mapping: &'a mut Mapping, key: &str) -> &'a mut Mapping { - let key = Value::String(key.to_string()); +fn nested_mapping_mut<'a>(mapping: &'a mut Mapping, key: &str) -> Result<&'a mut Mapping> { + let key_name = key.to_owned(); + let key = Value::String(key_name.clone()); let entry = mapping .entry(key) .or_insert_with(|| Value::Mapping(Mapping::new())); if !entry.is_mapping() { - *entry = Value::Mapping(Mapping::new()); + return Err(RenderTemplateError::NonMappingEntry { key: key_name }).map_err(Into::into); } entry .as_mapping_mut() - .expect("mapping entry should always be a mapping") + .context("cfgsync template entry should be a YAML map") } fn parse_otlp_metrics_layer(endpoint: &str) -> Result { diff --git a/cfgsync/core/src/repo.rs b/cfgsync/core/src/repo.rs new file mode 100644 index 0000000..560e265 --- /dev/null +++ b/cfgsync/core/src/repo.rs @@ -0,0 +1,233 @@ +use std::{collections::HashMap, fs, path::Path, sync::Arc}; + +use cfgsync_artifacts::ArtifactFile; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +use crate::{CfgSyncBundle, CfgSyncBundleNode}; + +/// Schema version served by cfgsync payload responses. +pub const CFGSYNC_SCHEMA_VERSION: u16 = 1; + +/// Canonical cfgsync file type used in payloads and bundles. +pub type CfgSyncFile = ArtifactFile; + +/// Payload returned by cfgsync server for one node. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgSyncPayload { + /// Payload schema version for compatibility checks. + pub schema_version: u16, + /// Files that must be written on the target node. + #[serde(default)] + pub files: Vec, +} + +impl CfgSyncPayload { + #[must_use] + pub fn from_files(files: Vec) -> Self { + Self { + schema_version: CFGSYNC_SCHEMA_VERSION, + files, + } + } + + #[must_use] + pub fn files(&self) -> &[CfgSyncFile] { + &self.files + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.files.is_empty() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum CfgSyncErrorCode { + MissingConfig, + Internal, +} + +/// Structured error body returned by cfgsync server. +#[derive(Debug, Clone, Serialize, Deserialize, Error)] +#[error("{code:?}: {message}")] +pub struct CfgSyncErrorResponse { + pub code: CfgSyncErrorCode, + pub message: String, +} + +impl CfgSyncErrorResponse { + #[must_use] + pub fn missing_config(identifier: &str) -> Self { + Self { + code: CfgSyncErrorCode::MissingConfig, + message: format!("missing config for host {identifier}"), + } + } + + #[must_use] + pub fn internal(message: impl Into) -> Self { + Self { + code: CfgSyncErrorCode::Internal, + message: message.into(), + } + } +} + +/// Repository resolution outcome for a requested node identifier. +pub enum RepoResponse { + Config(CfgSyncPayload), + Error(CfgSyncErrorResponse), +} + +/// Read-only source for cfgsync node payloads. +pub trait ConfigProvider: Send + Sync { + fn resolve(&self, identifier: &str) -> RepoResponse; +} + +/// In-memory map-backed provider used by cfgsync server state. +pub struct ConfigRepo { + configs: HashMap, +} + +impl ConfigRepo { + #[must_use] + pub fn from_bundle(configs: HashMap) -> Arc { + Arc::new(Self { configs }) + } +} + +impl ConfigProvider for ConfigRepo { + fn resolve(&self, identifier: &str) -> RepoResponse { + self.configs.get(identifier).cloned().map_or_else( + || RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)), + RepoResponse::Config, + ) + } +} + +/// Failures when loading a file-backed cfgsync provider. +#[derive(Debug, Error)] +pub enum FileConfigProviderError { + #[error("failed to read cfgsync bundle at {path}: {source}")] + Read { + path: String, + #[source] + source: std::io::Error, + }, + #[error("failed to parse cfgsync bundle at {path}: {source}")] + Parse { + path: String, + #[source] + source: serde_yaml::Error, + }, +} + +/// YAML bundle-backed provider implementation. +pub struct FileConfigProvider { + inner: ConfigRepo, +} + +impl FileConfigProvider { + /// Loads provider state from a cfgsync bundle YAML file. + pub fn from_yaml_file(path: &Path) -> Result { + let raw = fs::read_to_string(path).map_err(|source| FileConfigProviderError::Read { + path: path.display().to_string(), + source, + })?; + + let bundle: CfgSyncBundle = + serde_yaml::from_str(&raw).map_err(|source| FileConfigProviderError::Parse { + path: path.display().to_string(), + source, + })?; + + let configs = bundle + .nodes + .into_iter() + .map(payload_from_bundle_node) + .collect(); + + Ok(Self { + inner: ConfigRepo { configs }, + }) + } +} + +impl ConfigProvider for FileConfigProvider { + fn resolve(&self, identifier: &str) -> RepoResponse { + self.inner.resolve(identifier) + } +} + +fn payload_from_bundle_node(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) { + (node.identifier, CfgSyncPayload::from_files(node.files)) +} + +#[cfg(test)] +mod tests { + use std::io::Write as _; + + use tempfile::NamedTempFile; + + use super::*; + + fn sample_payload() -> CfgSyncPayload { + CfgSyncPayload::from_files(vec![CfgSyncFile::new("/config.yaml", "key: value")]) + } + + #[test] + fn resolves_existing_identifier() { + let mut configs = HashMap::new(); + configs.insert("node-1".to_owned(), sample_payload()); + let repo = ConfigRepo { configs }; + + match repo.resolve("node-1") { + RepoResponse::Config(payload) => { + assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION); + assert_eq!(payload.files.len(), 1); + assert_eq!(payload.files[0].path, "/config.yaml"); + } + RepoResponse::Error(error) => panic!("expected config response, got {error}"), + } + } + + #[test] + fn reports_missing_identifier() { + let repo = ConfigRepo { + configs: HashMap::new(), + }; + + match repo.resolve("unknown-node") { + RepoResponse::Config(_) => panic!("expected missing-config error"), + RepoResponse::Error(error) => { + assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig)); + assert!(error.message.contains("unknown-node")); + } + } + } + + #[test] + fn loads_file_provider_bundle() { + let mut bundle_file = NamedTempFile::new().expect("create temp bundle"); + let yaml = r#" +nodes: + - identifier: node-1 + files: + - path: /config.yaml + content: "a: 1" +"#; + bundle_file + .write_all(yaml.as_bytes()) + .expect("write bundle yaml"); + + let provider = + FileConfigProvider::from_yaml_file(bundle_file.path()).expect("load file provider"); + + match provider.resolve("node-1") { + RepoResponse::Config(payload) => assert_eq!(payload.files.len(), 1), + RepoResponse::Error(error) => panic!("expected config, got {error}"), + } + } +} diff --git a/cfgsync/core/src/server.rs b/cfgsync/core/src/server.rs new file mode 100644 index 0000000..e841610 --- /dev/null +++ b/cfgsync/core/src/server.rs @@ -0,0 +1,172 @@ +use std::{io, net::Ipv4Addr, sync::Arc}; + +use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +use crate::repo::{CfgSyncErrorCode, ConfigProvider, RepoResponse}; + +/// Request payload used by cfgsync client for node config resolution. +#[derive(Serialize, Deserialize)] +pub struct ClientIp { + /// Node IP that can be used by clients for observability/logging. + pub ip: Ipv4Addr, + /// Stable node identifier used as key in cfgsync bundle lookup. + pub identifier: String, +} + +/// Runtime state shared across cfgsync HTTP handlers. +pub struct CfgSyncState { + repo: Arc, +} + +impl CfgSyncState { + #[must_use] + pub fn new(repo: Arc) -> Self { + Self { repo } + } +} + +/// Fatal runtime failures when serving cfgsync HTTP endpoints. +#[derive(Debug, Error)] +pub enum RunCfgsyncError { + #[error("failed to bind cfgsync server on {bind_addr}: {source}")] + Bind { + bind_addr: String, + #[source] + source: io::Error, + }, + #[error("cfgsync server terminated unexpectedly: {source}")] + Serve { + #[source] + source: io::Error, + }, +} + +async fn node_config( + State(state): State>, + Json(payload): Json, +) -> impl IntoResponse { + let response = resolve_node_config_response(&state, &payload.identifier); + + match response { + RepoResponse::Config(payload_data) => (StatusCode::OK, Json(payload_data)).into_response(), + RepoResponse::Error(error) => { + let status = error_status(&error.code); + + (status, Json(error)).into_response() + } + } +} + +fn resolve_node_config_response(state: &CfgSyncState, identifier: &str) -> RepoResponse { + state.repo.resolve(identifier) +} + +fn error_status(code: &CfgSyncErrorCode) -> StatusCode { + match code { + CfgSyncErrorCode::MissingConfig => StatusCode::NOT_FOUND, + CfgSyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR, + } +} + +pub fn cfgsync_app(state: CfgSyncState) -> Router { + Router::new() + .route("/node", post(node_config)) + .route("/init-with-node", post(node_config)) + .with_state(Arc::new(state)) +} + +/// Runs cfgsync HTTP server on the provided port until shutdown/error. +pub async fn run_cfgsync(port: u16, state: CfgSyncState) -> Result<(), RunCfgsyncError> { + let app = cfgsync_app(state); + println!("Server running on http://0.0.0.0:{port}"); + + let bind_addr = format!("0.0.0.0:{port}"); + let listener = tokio::net::TcpListener::bind(&bind_addr) + .await + .map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?; + + axum::serve(listener, app) + .await + .map_err(|source| RunCfgsyncError::Serve { source })?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc}; + + use axum::{Json, extract::State, http::StatusCode, response::IntoResponse}; + + use super::{CfgSyncState, ClientIp, node_config}; + use crate::repo::{ + CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, + CfgSyncPayload, ConfigProvider, RepoResponse, + }; + + struct StaticProvider { + data: HashMap, + } + + impl ConfigProvider for StaticProvider { + fn resolve(&self, identifier: &str) -> RepoResponse { + self.data.get(identifier).cloned().map_or_else( + || RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)), + RepoResponse::Config, + ) + } + } + + fn sample_payload() -> CfgSyncPayload { + CfgSyncPayload { + schema_version: CFGSYNC_SCHEMA_VERSION, + files: vec![CfgSyncFile::new("/app-config.yaml", "app: test")], + } + } + + #[tokio::test] + async fn node_config_resolves_from_non_tf_provider() { + let mut data = HashMap::new(); + data.insert("node-a".to_owned(), sample_payload()); + + let provider = Arc::new(StaticProvider { data }); + let state = Arc::new(CfgSyncState::new(provider)); + let payload = ClientIp { + ip: "127.0.0.1".parse().expect("valid ip"), + identifier: "node-a".to_owned(), + }; + + let response = node_config(State(state), Json(payload)) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::OK); + } + + #[tokio::test] + async fn node_config_returns_not_found_for_unknown_identifier() { + let provider = Arc::new(StaticProvider { + data: HashMap::new(), + }); + let state = Arc::new(CfgSyncState::new(provider)); + let payload = ClientIp { + ip: "127.0.0.1".parse().expect("valid ip"), + identifier: "missing-node".to_owned(), + }; + + let response = node_config(State(state), Json(payload)) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + } + + #[test] + fn missing_config_error_uses_expected_code() { + let error = CfgSyncErrorResponse::missing_config("missing-node"); + + assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig)); + } +} diff --git a/cfgsync/runtime/Cargo.toml b/cfgsync/runtime/Cargo.toml new file mode 100644 index 0000000..045bc73 --- /dev/null +++ b/cfgsync/runtime/Cargo.toml @@ -0,0 +1,26 @@ +[package] +categories = { workspace = true } +description = { workspace = true } +edition = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +name = "cfgsync-runtime" +readme = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +anyhow = "1" +cfgsync-core = { workspace = true } +clap = { version = "4", features = ["derive"] } +serde = { workspace = true } +serde_yaml = { workspace = true } +thiserror = { workspace = true } +tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } +tracing = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-client.rs b/cfgsync/runtime/src/bin/cfgsync-client.rs similarity index 100% rename from testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-client.rs rename to cfgsync/runtime/src/bin/cfgsync-client.rs diff --git a/testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-server.rs b/cfgsync/runtime/src/bin/cfgsync-server.rs similarity index 100% rename from testing-framework/tools/cfgsync-runtime/src/bin/cfgsync-server.rs rename to cfgsync/runtime/src/bin/cfgsync-server.rs diff --git a/cfgsync/runtime/src/client.rs b/cfgsync/runtime/src/client.rs new file mode 100644 index 0000000..aab3d1c --- /dev/null +++ b/cfgsync/runtime/src/client.rs @@ -0,0 +1,205 @@ +use std::{ + env, fs, + net::Ipv4Addr, + path::{Path, PathBuf}, +}; + +use anyhow::{Context as _, Result, bail}; +use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp}; +use thiserror::Error; +use tokio::time::{Duration, sleep}; +use tracing::info; + +const FETCH_ATTEMPTS: usize = 5; +const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250); + +#[derive(Debug, Error)] +enum ClientEnvError { + #[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")] + InvalidIp { value: String }, +} + +async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result { + let client = CfgSyncClient::new(server_addr); + + for attempt in 1..=FETCH_ATTEMPTS { + match fetch_once(&client, payload).await { + Ok(config) => return Ok(config), + Err(error) => { + if attempt == FETCH_ATTEMPTS { + return Err(error).with_context(|| { + format!("fetching cfgsync payload after {attempt} attempts") + }); + } + + sleep(FETCH_RETRY_DELAY).await; + } + } + } + + unreachable!("cfgsync fetch loop always returns before exhausting attempts"); +} + +async fn fetch_once(client: &CfgSyncClient, payload: &ClientIp) -> Result { + let response = client.fetch_node_config(payload).await?; + + Ok(response) +} + +async fn pull_config_files(payload: ClientIp, server_addr: &str) -> Result<()> { + let config = fetch_with_retry(&payload, server_addr) + .await + .context("fetching cfgsync node config")?; + ensure_schema_version(&config)?; + + let files = collect_payload_files(&config)?; + + for file in files { + write_cfgsync_file(file)?; + } + + info!(files = files.len(), "cfgsync files saved"); + + Ok(()) +} + +fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> { + if config.schema_version != CFGSYNC_SCHEMA_VERSION { + bail!( + "unsupported cfgsync payload schema version {}, expected {}", + config.schema_version, + CFGSYNC_SCHEMA_VERSION + ); + } + + Ok(()) +} + +fn collect_payload_files(config: &CfgSyncPayload) -> Result<&[CfgSyncFile]> { + if config.is_empty() { + bail!("cfgsync payload contains no files"); + } + + Ok(config.files()) +} + +fn write_cfgsync_file(file: &CfgSyncFile) -> Result<()> { + let path = PathBuf::from(&file.path); + + ensure_parent_dir(&path)?; + + fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?; + + info!(path = %path.display(), "cfgsync file saved"); + + Ok(()) +} + +fn ensure_parent_dir(path: &Path) -> Result<()> { + let Some(parent) = path.parent() else { + return Ok(()); + }; + + if parent.as_os_str().is_empty() { + return Ok(()); + } + + fs::create_dir_all(parent) + .with_context(|| format!("creating parent directory {}", parent.display()))?; + + Ok(()) +} + +/// Resolves cfgsync client inputs from environment and materializes node files. +pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> { + let server_addr = + env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}")); + let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?; + let identifier = + env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned()); + + pull_config_files(ClientIp { ip, identifier }, &server_addr).await +} + +fn parse_ip_env(ip_str: &str) -> Result { + ip_str + .parse() + .map_err(|_| ClientEnvError::InvalidIp { + value: ip_str.to_owned(), + }) + .map_err(Into::into) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use cfgsync_core::{ + CfgSyncBundle, CfgSyncBundleNode, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync, + }; + use tempfile::tempdir; + + use super::*; + + #[tokio::test] + async fn client_materializes_multi_file_payload_from_cfgsync_server() { + let dir = tempdir().expect("create temp dir"); + let app_config_path = dir.path().join("config.yaml"); + let deployment_path = dir.path().join("deployment.yaml"); + + let bundle = CfgSyncBundle::new(vec![CfgSyncBundleNode { + identifier: "node-1".to_owned(), + files: vec![ + CfgSyncFile::new(app_config_path.to_string_lossy(), "app_key: app_value"), + CfgSyncFile::new(deployment_path.to_string_lossy(), "mode: local"), + ], + }]); + + let repo = ConfigRepo::from_bundle(bundle_to_payload_map(bundle)); + let state = CfgSyncState::new(repo); + let port = allocate_test_port(); + let address = format!("http://127.0.0.1:{port}"); + let server = tokio::spawn(async move { + run_cfgsync(port, state).await.expect("run cfgsync server"); + }); + + pull_config_files( + ClientIp { + ip: "127.0.0.1".parse().expect("parse ip"), + identifier: "node-1".to_owned(), + }, + &address, + ) + .await + .expect("pull config files"); + + server.abort(); + let _ = server.await; + + let app_config = fs::read_to_string(&app_config_path).expect("read app config"); + let deployment = fs::read_to_string(&deployment_path).expect("read deployment config"); + + assert_eq!(app_config, "app_key: app_value"); + assert_eq!(deployment, "mode: local"); + } + + fn bundle_to_payload_map(bundle: CfgSyncBundle) -> HashMap { + bundle + .nodes + .into_iter() + .map(|node| { + let CfgSyncBundleNode { identifier, files } = node; + + (identifier, CfgSyncPayload::from_files(files)) + }) + .collect() + } + + fn allocate_test_port() -> u16 { + let listener = + std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port for test"); + let port = listener.local_addr().expect("read local addr").port(); + drop(listener); + port + } +} diff --git a/testing-framework/tools/cfgsync-runtime/src/lib.rs b/cfgsync/runtime/src/lib.rs similarity index 82% rename from testing-framework/tools/cfgsync-runtime/src/lib.rs rename to cfgsync/runtime/src/lib.rs index bc28a08..7c0a75b 100644 --- a/testing-framework/tools/cfgsync-runtime/src/lib.rs +++ b/cfgsync/runtime/src/lib.rs @@ -1,6 +1,3 @@ -pub mod bundle; -pub mod render; - pub use cfgsync_core as core; mod client; diff --git a/cfgsync/runtime/src/server.rs b/cfgsync/runtime/src/server.rs new file mode 100644 index 0000000..a038a43 --- /dev/null +++ b/cfgsync/runtime/src/server.rs @@ -0,0 +1,59 @@ +use std::{fs, path::Path, sync::Arc}; + +use anyhow::Context as _; +use cfgsync_core::{CfgSyncState, ConfigProvider, FileConfigProvider, run_cfgsync}; +use serde::Deserialize; + +/// Runtime cfgsync server config loaded from YAML. +#[derive(Debug, Deserialize, Clone)] +pub struct CfgSyncServerConfig { + pub port: u16, + pub bundle_path: String, +} + +impl CfgSyncServerConfig { + /// Loads cfgsync runtime server config from a YAML file. + pub fn load_from_file(path: &Path) -> anyhow::Result { + let config_content = fs::read_to_string(path) + .with_context(|| format!("failed to read cfgsync config file {}", path.display()))?; + + serde_yaml::from_str(&config_content) + .with_context(|| format!("failed to parse cfgsync config file {}", path.display())) + } +} + +fn load_bundle(bundle_path: &Path) -> anyhow::Result> { + let provider = FileConfigProvider::from_yaml_file(bundle_path) + .with_context(|| format!("loading cfgsync provider from {}", bundle_path.display()))?; + + Ok(Arc::new(provider)) +} + +fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf { + let path = Path::new(bundle_path); + if path.is_absolute() { + return path.to_path_buf(); + } + + config_path + .parent() + .unwrap_or_else(|| Path::new(".")) + .join(path) +} + +/// Loads runtime config and starts cfgsync HTTP server process. +pub async fn run_cfgsync_server(config_path: &Path) -> anyhow::Result<()> { + let config = CfgSyncServerConfig::load_from_file(config_path)?; + let bundle_path = resolve_bundle_path(config_path, &config.bundle_path); + + let state = build_server_state(&bundle_path)?; + run_cfgsync(config.port, state).await?; + + Ok(()) +} + +fn build_server_state(bundle_path: &Path) -> anyhow::Result { + let repo = load_bundle(bundle_path)?; + + Ok(CfgSyncState::new(repo)) +} diff --git a/logos/infra/assets/stack/scripts/docker/build_cfgsync.sh b/logos/infra/assets/stack/scripts/docker/build_cfgsync.sh index 61a4ad5..7921e8a 100755 --- a/logos/infra/assets/stack/scripts/docker/build_cfgsync.sh +++ b/logos/infra/assets/stack/scripts/docker/build_cfgsync.sh @@ -2,10 +2,10 @@ set -euo pipefail RUSTFLAGS='--cfg feature="pol-dev-mode"' \ - cargo build --manifest-path /workspace/testing-framework/tools/cfgsync-runtime/Cargo.toml --bin cfgsync-server + cargo build --manifest-path /workspace/cfgsync/runtime/Cargo.toml --bin cfgsync-server RUSTFLAGS='--cfg feature="pol-dev-mode"' \ - cargo build --manifest-path /workspace/testing-framework/tools/cfgsync-runtime/Cargo.toml --bin cfgsync-client + cargo build --manifest-path /workspace/cfgsync/runtime/Cargo.toml --bin cfgsync-client cp /workspace/target/debug/cfgsync-server /workspace/artifacts/cfgsync-server cp /workspace/target/debug/cfgsync-client /workspace/artifacts/cfgsync-client diff --git a/logos/runtime/ext/Cargo.toml b/logos/runtime/ext/Cargo.toml index 008bf4f..05dd243 100644 --- a/logos/runtime/ext/Cargo.toml +++ b/logos/runtime/ext/Cargo.toml @@ -7,8 +7,8 @@ version = { workspace = true } [dependencies] # Workspace crates +cfgsync-adapter = { workspace = true } cfgsync-core = { workspace = true } -cfgsync_runtime = { workspace = true } lb-framework = { workspace = true } testing-framework-core = { workspace = true } testing-framework-env = { workspace = true } diff --git a/logos/runtime/ext/src/cfgsync/mod.rs b/logos/runtime/ext/src/cfgsync/mod.rs index 6d0b308..7fe4e44 100644 --- a/logos/runtime/ext/src/cfgsync/mod.rs +++ b/logos/runtime/ext/src/cfgsync/mod.rs @@ -1,7 +1,8 @@ -use anyhow::{Result, anyhow}; -pub(crate) use cfgsync_runtime::render::CfgsyncOutputPaths; -use cfgsync_runtime::{ - bundle::{CfgSyncBundle, CfgSyncBundleNode, build_cfgsync_bundle_with_hostnames}, +use anyhow::Result; +use cfgsync_adapter::{CfgsyncEnv, build_cfgsync_node_configs}; +pub(crate) use cfgsync_core::render::CfgsyncOutputPaths; +use cfgsync_core::{ + CfgSyncBundle, CfgSyncBundleNode, render::{ CfgsyncConfigOverrides, RenderedCfgsync, ensure_bundle_path, render_cfgsync_yaml_from_template, write_rendered_cfgsync, @@ -9,7 +10,7 @@ use cfgsync_runtime::{ }; use reqwest::Url; use serde_yaml::{Mapping, Value}; -use testing_framework_core::cfgsync::CfgsyncEnv; +use thiserror::Error; pub(crate) struct CfgsyncRenderOptions { pub port: Option, @@ -18,6 +19,14 @@ pub(crate) struct CfgsyncRenderOptions { pub metrics_otlp_ingest_url: Option, } +#[derive(Debug, Error)] +enum BundleRenderError { + #[error("cfgsync bundle node `{identifier}` is missing `/config.yaml`")] + MissingConfigFile { identifier: String }, + #[error("cfgsync config file is missing `{key}`")] + MissingYamlKey { key: String }, +} + pub(crate) fn render_cfgsync_from_template( topology: &E::Deployment, hostnames: &[String], @@ -26,7 +35,7 @@ pub(crate) fn render_cfgsync_from_template( let cfg = build_cfgsync_server_config(); let overrides = build_overrides::(topology, options); let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?; - let mut bundle = build_cfgsync_bundle_with_hostnames::(topology, hostnames)?; + let mut bundle = build_cfgsync_bundle::(topology, hostnames)?; append_deployment_files(&mut bundle)?; let bundle_yaml = serde_yaml::to_string(&bundle)?; @@ -36,14 +45,32 @@ pub(crate) fn render_cfgsync_from_template( }) } +fn build_cfgsync_bundle( + topology: &E::Deployment, + hostnames: &[String], +) -> Result { + let nodes = build_cfgsync_node_configs::(topology, hostnames)?; + let nodes = nodes + .into_iter() + .map(|node| CfgSyncBundleNode { + identifier: node.identifier, + files: vec![build_bundle_file("/config.yaml", node.config_yaml)], + }) + .collect(); + + Ok(CfgSyncBundle::new(nodes)) +} + fn append_deployment_files(bundle: &mut CfgSyncBundle) -> Result<()> { for node in &mut bundle.nodes { if has_file_path(node, "/deployment.yaml") { continue; } - let config_content = config_file_content(node) - .ok_or_else(|| anyhow!("cfgsync bundle node missing /config.yaml"))?; + let config_content = + config_file_content(node).ok_or_else(|| BundleRenderError::MissingConfigFile { + identifier: node.identifier.clone(), + })?; let deployment_yaml = extract_yaml_key(&config_content, "deployment")?; node.files @@ -75,7 +102,9 @@ fn extract_yaml_key(content: &str, key: &str) -> Result { let value = document .get(key) .cloned() - .ok_or_else(|| anyhow!("config yaml missing `{key}`"))?; + .ok_or_else(|| BundleRenderError::MissingYamlKey { + key: key.to_owned(), + })?; Ok(serde_yaml::to_string(&value)?) } diff --git a/logos/runtime/ext/src/compose_env.rs b/logos/runtime/ext/src/compose_env.rs index 9803b12..4fa3018 100644 --- a/logos/runtime/ext/src/compose_env.rs +++ b/logos/runtime/ext/src/compose_env.rs @@ -254,7 +254,6 @@ fn build_compose_node_descriptor( base_volumes(), default_extra_hosts(), ports, - api_port, environment, platform, ) diff --git a/logos/runtime/ext/src/k8s_env.rs b/logos/runtime/ext/src/k8s_env.rs index 8c9a1e8..15b2dc9 100644 --- a/logos/runtime/ext/src/k8s_env.rs +++ b/logos/runtime/ext/src/k8s_env.rs @@ -31,7 +31,7 @@ use crate::{ const CFGSYNC_K8S_TIMEOUT_SECS: u64 = 300; const K8S_FULLNAME_OVERRIDE: &str = "logos-runner"; -const DEFAULT_K8S_TESTNET_IMAGE: &str = "logos-blockchain-testing:local"; +const DEFAULT_K8S_TESTNET_IMAGE: &str = "public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test"; /// Paths and image metadata required to deploy the Helm chart. pub struct K8sAssets { diff --git a/testing-framework/core/Cargo.toml b/testing-framework/core/Cargo.toml index 00e0658..cda1d37 100644 --- a/testing-framework/core/Cargo.toml +++ b/testing-framework/core/Cargo.toml @@ -17,6 +17,7 @@ default = [] [dependencies] async-trait = "0.1" +cfgsync-adapter = { workspace = true } futures = { default-features = false, features = ["std"], version = "0.3" } parking_lot = { workspace = true } prometheus-http-query = "0.8" diff --git a/testing-framework/core/src/cfgsync/mod.rs b/testing-framework/core/src/cfgsync/mod.rs index 8f0ad34..af45d4d 100644 --- a/testing-framework/core/src/cfgsync/mod.rs +++ b/testing-framework/core/src/cfgsync/mod.rs @@ -1,84 +1 @@ -use std::error::Error; - -use thiserror::Error; - -pub type DynCfgsyncError = Box; - -#[derive(Debug, Clone)] -pub struct CfgsyncNodeConfig { - pub identifier: String, - pub config_yaml: String, -} - -pub trait CfgsyncEnv { - type Deployment; - type Node; - type NodeConfig; - type Error: Error + Send + Sync + 'static; - - fn nodes(deployment: &Self::Deployment) -> &[Self::Node]; - - fn node_identifier(index: usize, node: &Self::Node) -> String; - - fn build_node_config( - deployment: &Self::Deployment, - node: &Self::Node, - ) -> Result; - - fn rewrite_for_hostnames( - deployment: &Self::Deployment, - node_index: usize, - hostnames: &[String], - config: &mut Self::NodeConfig, - ) -> Result<(), Self::Error>; - - fn serialize_node_config(config: &Self::NodeConfig) -> Result; -} - -#[derive(Debug, Error)] -pub enum BuildCfgsyncNodesError { - #[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")] - HostnameCountMismatch { nodes: usize, hostnames: usize }, - #[error("cfgsync adapter failed: {source}")] - Adapter { - #[source] - source: DynCfgsyncError, - }, -} - -fn adapter_error(source: E) -> BuildCfgsyncNodesError -where - E: Error + Send + Sync + 'static, -{ - BuildCfgsyncNodesError::Adapter { - source: Box::new(source), - } -} - -pub fn build_cfgsync_node_configs( - deployment: &E::Deployment, - hostnames: &[String], -) -> Result, BuildCfgsyncNodesError> { - let nodes = E::nodes(deployment); - if nodes.len() != hostnames.len() { - return Err(BuildCfgsyncNodesError::HostnameCountMismatch { - nodes: nodes.len(), - hostnames: hostnames.len(), - }); - } - - let mut output = Vec::with_capacity(nodes.len()); - for (index, node) in nodes.iter().enumerate() { - let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?; - E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config) - .map_err(adapter_error)?; - let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?; - - output.push(CfgsyncNodeConfig { - identifier: E::node_identifier(index, node), - config_yaml, - }); - } - - Ok(output) -} +pub use cfgsync_adapter::*; diff --git a/testing-framework/core/src/lib.rs b/testing-framework/core/src/lib.rs index 5cbdb97..3e76f81 100644 --- a/testing-framework/core/src/lib.rs +++ b/testing-framework/core/src/lib.rs @@ -1,3 +1,7 @@ +#[deprecated( + since = "0.1.0", + note = "testing-framework-core::cfgsync moved to cfgsync-adapter; update imports" +)] pub mod cfgsync; pub mod env; pub mod runtime; diff --git a/testing-framework/deployers/compose/Cargo.toml b/testing-framework/deployers/compose/Cargo.toml index 9ac8b7a..4df1f40 100644 --- a/testing-framework/deployers/compose/Cargo.toml +++ b/testing-framework/deployers/compose/Cargo.toml @@ -31,4 +31,5 @@ uuid = { features = ["v4"], version = "1" } [dev-dependencies] groth16 = { workspace = true } key-management-system-service = { workspace = true } +serde_json = { workspace = true } zksign = { workspace = true } diff --git a/testing-framework/deployers/compose/assets/docker-compose.yml.tera b/testing-framework/deployers/compose/assets/docker-compose.yml.tera index c6ecc29..ba21922 100644 --- a/testing-framework/deployers/compose/assets/docker-compose.yml.tera +++ b/testing-framework/deployers/compose/assets/docker-compose.yml.tera @@ -18,9 +18,6 @@ services: {% for port in node.ports %} - {{ port }} {% endfor %} - labels: - testing-framework.node: "true" - testing-framework.api-container-port: "{{ node.api_container_port }}" environment: {% for env in node.environment %} {{ env.key }}: "{{ env.value }}" diff --git a/testing-framework/deployers/compose/src/descriptor/node.rs b/testing-framework/deployers/compose/src/descriptor/node.rs index d35f8f5..c5f769b 100644 --- a/testing-framework/deployers/compose/src/descriptor/node.rs +++ b/testing-framework/deployers/compose/src/descriptor/node.rs @@ -9,7 +9,6 @@ pub struct NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, - api_container_port: u16, environment: Vec, #[serde(skip_serializing_if = "Option::is_none")] platform: Option, @@ -50,7 +49,6 @@ impl NodeDescriptor { volumes: Vec, extra_hosts: Vec, ports: Vec, - api_container_port: u16, environment: Vec, platform: Option, ) -> Self { @@ -61,7 +59,6 @@ impl NodeDescriptor { volumes, extra_hosts, ports, - api_container_port, environment, platform, } @@ -80,9 +77,4 @@ impl NodeDescriptor { pub fn environment(&self) -> &[EnvEntry] { &self.environment } - - #[cfg(test)] - pub fn api_container_port(&self) -> u16 { - self.api_container_port - } } diff --git a/testing-framework/deployers/k8s/src/env.rs b/testing-framework/deployers/k8s/src/env.rs index 4ec7fd2..353d7e9 100644 --- a/testing-framework/deployers/k8s/src/env.rs +++ b/testing-framework/deployers/k8s/src/env.rs @@ -106,7 +106,8 @@ pub trait K8sDeployEnv: Application { format!("{release}-node-{index}") } - /// Label selector used to discover managed node services in attached mode. + /// Label selector used to discover managed node services in + /// existing-cluster mode. fn attach_node_service_selector(release: &str) -> String { format!("app.kubernetes.io/instance={release}") } diff --git a/testing-framework/tools/cfgsync-core/Cargo.toml b/testing-framework/tools/cfgsync-core/Cargo.toml deleted file mode 100644 index 63f2d1f..0000000 --- a/testing-framework/tools/cfgsync-core/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -categories = { workspace = true } -description = { workspace = true } -edition = { workspace = true } -keywords = { workspace = true } -license = { workspace = true } -name = "cfgsync-core" -readme = { workspace = true } -repository = { workspace = true } -version = { workspace = true } - -[lints] -workspace = true - -[dependencies] -axum = { default-features = false, features = ["http1", "http2", "json", "tokio"], version = "0.7.5" } -reqwest = { features = ["json"], workspace = true } -serde = { default-features = false, features = ["derive"], version = "1" } -serde_json = { workspace = true } -thiserror = { workspace = true } -tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } diff --git a/testing-framework/tools/cfgsync-core/src/lib.rs b/testing-framework/tools/cfgsync-core/src/lib.rs deleted file mode 100644 index 822ae69..0000000 --- a/testing-framework/tools/cfgsync-core/src/lib.rs +++ /dev/null @@ -1,10 +0,0 @@ -pub mod client; -pub mod repo; -pub mod server; - -pub use client::{CfgSyncClient, ClientError}; -pub use repo::{ - CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, - ConfigRepo, RepoResponse, -}; -pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync}; diff --git a/testing-framework/tools/cfgsync-core/src/repo.rs b/testing-framework/tools/cfgsync-core/src/repo.rs deleted file mode 100644 index 62e320c..0000000 --- a/testing-framework/tools/cfgsync-core/src/repo.rs +++ /dev/null @@ -1,107 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use tokio::sync::oneshot::Sender; - -pub const CFGSYNC_SCHEMA_VERSION: u16 = 1; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CfgSyncFile { - pub path: String, - pub content: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CfgSyncPayload { - pub schema_version: u16, - #[serde(default)] - pub files: Vec, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub config_yaml: Option, -} - -impl CfgSyncPayload { - #[must_use] - pub fn from_files(files: Vec) -> Self { - Self { - schema_version: CFGSYNC_SCHEMA_VERSION, - files, - config_yaml: None, - } - } - - #[must_use] - pub fn normalized_files(&self, default_config_path: &str) -> Vec { - if !self.files.is_empty() { - return self.files.clone(); - } - - self.config_yaml - .as_ref() - .map(|content| { - vec![CfgSyncFile { - path: default_config_path.to_owned(), - content: content.clone(), - }] - }) - .unwrap_or_default() - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum CfgSyncErrorCode { - MissingConfig, - Internal, -} - -#[derive(Debug, Clone, Serialize, Deserialize, Error)] -#[error("{code:?}: {message}")] -pub struct CfgSyncErrorResponse { - pub code: CfgSyncErrorCode, - pub message: String, -} - -impl CfgSyncErrorResponse { - #[must_use] - pub fn missing_config(identifier: &str) -> Self { - Self { - code: CfgSyncErrorCode::MissingConfig, - message: format!("missing config for host {identifier}"), - } - } - - #[must_use] - pub fn internal(message: impl Into) -> Self { - Self { - code: CfgSyncErrorCode::Internal, - message: message.into(), - } - } -} - -pub enum RepoResponse { - Config(CfgSyncPayload), - Error(CfgSyncErrorResponse), -} - -pub struct ConfigRepo { - configs: HashMap, -} - -impl ConfigRepo { - #[must_use] - pub fn from_bundle(configs: HashMap) -> Arc { - Arc::new(Self { configs }) - } - - pub async fn register(&self, identifier: String, reply_tx: Sender) { - let response = self.configs.get(&identifier).cloned().map_or_else( - || RepoResponse::Error(CfgSyncErrorResponse::missing_config(&identifier)), - RepoResponse::Config, - ); - - let _ = reply_tx.send(response); - } -} diff --git a/testing-framework/tools/cfgsync-core/src/server.rs b/testing-framework/tools/cfgsync-core/src/server.rs deleted file mode 100644 index f519d53..0000000 --- a/testing-framework/tools/cfgsync-core/src/server.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::{io, net::Ipv4Addr, sync::Arc}; - -use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post}; -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use tokio::sync::oneshot::channel; - -use crate::repo::{CfgSyncErrorResponse, ConfigRepo, RepoResponse}; - -#[derive(Serialize, Deserialize)] -pub struct ClientIp { - /// Node IP that can be used by clients for observability/logging. - pub ip: Ipv4Addr, - /// Stable node identifier used as key in cfgsync bundle lookup. - pub identifier: String, -} - -pub struct CfgSyncState { - repo: Arc, -} - -impl CfgSyncState { - #[must_use] - pub fn new(repo: Arc) -> Self { - Self { repo } - } -} - -#[derive(Debug, Error)] -pub enum RunCfgsyncError { - #[error("failed to bind cfgsync server on {bind_addr}: {source}")] - Bind { - bind_addr: String, - #[source] - source: io::Error, - }, - #[error("cfgsync server terminated unexpectedly: {source}")] - Serve { - #[source] - source: io::Error, - }, -} - -async fn node_config( - State(state): State>, - Json(payload): Json, -) -> impl IntoResponse { - let identifier = payload.identifier.clone(); - let (reply_tx, reply_rx) = channel(); - state.repo.register(identifier, reply_tx).await; - - match reply_rx.await { - Err(_) => ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(CfgSyncErrorResponse::internal( - "error receiving config from repo", - )), - ) - .into_response(), - Ok(RepoResponse::Config(payload_data)) => { - (StatusCode::OK, Json(payload_data)).into_response() - } - - Ok(RepoResponse::Error(error)) => { - let status = match error.code { - crate::repo::CfgSyncErrorCode::MissingConfig => StatusCode::NOT_FOUND, - crate::repo::CfgSyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR, - }; - (status, Json(error)).into_response() - } - } -} - -pub fn cfgsync_app(state: CfgSyncState) -> Router { - Router::new() - .route("/node", post(node_config)) - .route("/init-with-node", post(node_config)) - .with_state(Arc::new(state)) -} - -pub async fn run_cfgsync(port: u16, state: CfgSyncState) -> Result<(), RunCfgsyncError> { - let app = cfgsync_app(state); - println!("Server running on http://0.0.0.0:{port}"); - - let bind_addr = format!("0.0.0.0:{port}"); - let listener = tokio::net::TcpListener::bind(&bind_addr) - .await - .map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?; - - axum::serve(listener, app) - .await - .map_err(|source| RunCfgsyncError::Serve { source })?; - - Ok(()) -} diff --git a/testing-framework/tools/cfgsync-runtime/Cargo.toml b/testing-framework/tools/cfgsync-runtime/Cargo.toml deleted file mode 100644 index f51d081..0000000 --- a/testing-framework/tools/cfgsync-runtime/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -categories = { workspace = true } -description = { workspace = true } -edition = { workspace = true } -keywords = { workspace = true } -license = { workspace = true } -name = "cfgsync-runtime" -readme = { workspace = true } -repository = { workspace = true } -version = { workspace = true } - -[lints] -workspace = true - -[dependencies] -anyhow = "1" -cfgsync-core = { workspace = true } -clap = { version = "4", features = ["derive"] } -serde = { workspace = true } -serde_yaml = { workspace = true } -testing-framework-core = { workspace = true } -tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } diff --git a/testing-framework/tools/cfgsync-runtime/src/bundle.rs b/testing-framework/tools/cfgsync-runtime/src/bundle.rs deleted file mode 100644 index 8aa257d..0000000 --- a/testing-framework/tools/cfgsync-runtime/src/bundle.rs +++ /dev/null @@ -1,39 +0,0 @@ -use anyhow::Result; -use cfgsync_core::CfgSyncFile; -use serde::{Deserialize, Serialize}; -use testing_framework_core::cfgsync::{CfgsyncEnv, build_cfgsync_node_configs}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CfgSyncBundle { - pub nodes: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CfgSyncBundleNode { - pub identifier: String, - #[serde(default)] - pub files: Vec, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub config_yaml: Option, -} - -pub fn build_cfgsync_bundle_with_hostnames( - deployment: &E::Deployment, - hostnames: &[String], -) -> Result { - let nodes = build_cfgsync_node_configs::(deployment, hostnames)?; - - Ok(CfgSyncBundle { - nodes: nodes - .into_iter() - .map(|node| CfgSyncBundleNode { - identifier: node.identifier, - files: vec![CfgSyncFile { - path: "/config.yaml".to_owned(), - content: node.config_yaml, - }], - config_yaml: None, - }) - .collect(), - }) -} diff --git a/testing-framework/tools/cfgsync-runtime/src/client.rs b/testing-framework/tools/cfgsync-runtime/src/client.rs deleted file mode 100644 index 364fdf6..0000000 --- a/testing-framework/tools/cfgsync-runtime/src/client.rs +++ /dev/null @@ -1,108 +0,0 @@ -use std::{ - env, fs, - net::Ipv4Addr, - path::{Path, PathBuf}, -}; - -use anyhow::{Context as _, Result, anyhow, bail}; -use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp}; -use tokio::time::{Duration, sleep}; - -const FETCH_ATTEMPTS: usize = 5; -const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250); - -fn parse_ip(ip_str: &str) -> Ipv4Addr { - ip_str.parse().unwrap_or(Ipv4Addr::LOCALHOST) -} - -async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result { - let client = CfgSyncClient::new(server_addr); - let mut last_error: Option = None; - - for attempt in 1..=FETCH_ATTEMPTS { - match client.fetch_node_config(payload).await { - Ok(config) => return Ok(config), - Err(error) => { - last_error = Some(error.into()); - - if attempt < FETCH_ATTEMPTS { - sleep(FETCH_RETRY_DELAY).await; - } - } - } - } - - match last_error { - Some(error) => Err(error), - None => Err(anyhow!("cfgsync client fetch failed without an error")), - } -} - -async fn pull_config_files(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> { - let config = fetch_with_retry(&payload, server_addr) - .await - .context("fetching cfgsync node config")?; - ensure_schema_version(&config)?; - - let files = collect_payload_files(&config, config_file)?; - - for file in files { - write_cfgsync_file(&file)?; - } - - println!("Config files saved"); - Ok(()) -} - -fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> { - if config.schema_version != CFGSYNC_SCHEMA_VERSION { - bail!( - "unsupported cfgsync payload schema version {}, expected {}", - config.schema_version, - CFGSYNC_SCHEMA_VERSION - ); - } - - Ok(()) -} - -fn collect_payload_files(config: &CfgSyncPayload, config_file: &str) -> Result> { - let files = config.normalized_files(config_file); - if files.is_empty() { - bail!("cfgsync payload contains no files"); - } - - Ok(files) -} - -fn write_cfgsync_file(file: &CfgSyncFile) -> Result<()> { - let path = PathBuf::from(&file.path); - - ensure_parent_dir(&path)?; - - fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?; - - println!("Config saved to {}", path.display()); - Ok(()) -} - -fn ensure_parent_dir(path: &Path) -> Result<()> { - if let Some(parent) = path.parent() { - if !parent.as_os_str().is_empty() { - fs::create_dir_all(parent) - .with_context(|| format!("creating parent directory {}", parent.display()))?; - } - } - Ok(()) -} - -pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> { - let config_file_path = env::var("CFG_FILE_PATH").unwrap_or_else(|_| "config.yaml".to_owned()); - let server_addr = - env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}")); - let ip = parse_ip(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned())); - let identifier = - env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned()); - - pull_config_files(ClientIp { ip, identifier }, &server_addr, &config_file_path).await -} diff --git a/testing-framework/tools/cfgsync-runtime/src/server.rs b/testing-framework/tools/cfgsync-runtime/src/server.rs deleted file mode 100644 index 4c3e59d..0000000 --- a/testing-framework/tools/cfgsync-runtime/src/server.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::{collections::HashMap, fs, path::Path, sync::Arc}; - -use anyhow::Context as _; -use cfgsync_core::{CfgSyncFile, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync}; -use serde::Deserialize; - -#[derive(Debug, Deserialize, Clone)] -pub struct CfgSyncServerConfig { - pub port: u16, - pub bundle_path: String, -} - -impl CfgSyncServerConfig { - pub fn load_from_file(path: &Path) -> anyhow::Result { - let config_content = fs::read_to_string(path) - .with_context(|| format!("failed to read cfgsync config file {}", path.display()))?; - serde_yaml::from_str(&config_content) - .with_context(|| format!("failed to parse cfgsync config file {}", path.display())) - } -} - -#[derive(Debug, Deserialize)] -struct CfgSyncBundle { - nodes: Vec, -} - -#[derive(Debug, Deserialize)] -struct CfgSyncBundleNode { - identifier: String, - #[serde(default)] - files: Vec, - #[serde(default)] - config_yaml: Option, -} - -fn load_bundle(bundle_path: &Path) -> anyhow::Result> { - let bundle = read_cfgsync_bundle(bundle_path)?; - - let configs = bundle - .nodes - .into_iter() - .map(build_repo_entry) - .collect::>(); - - Ok(ConfigRepo::from_bundle(configs)) -} - -fn read_cfgsync_bundle(bundle_path: &Path) -> anyhow::Result { - let bundle_content = fs::read_to_string(bundle_path).with_context(|| { - format!( - "failed to read cfgsync bundle file {}", - bundle_path.display() - ) - })?; - - serde_yaml::from_str(&bundle_content) - .with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display())) -} - -fn build_repo_entry(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) { - let files = if node.files.is_empty() { - build_legacy_files(node.config_yaml) - } else { - node.files - }; - - (node.identifier, CfgSyncPayload::from_files(files)) -} - -fn build_legacy_files(config_yaml: Option) -> Vec { - config_yaml - .map(|content| { - vec![CfgSyncFile { - path: "/config.yaml".to_owned(), - content, - }] - }) - .unwrap_or_default() -} - -fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf { - let path = Path::new(bundle_path); - if path.is_absolute() { - return path.to_path_buf(); - } - - config_path - .parent() - .unwrap_or_else(|| Path::new(".")) - .join(path) -} - -pub async fn run_cfgsync_server(config_path: &Path) -> anyhow::Result<()> { - let config = CfgSyncServerConfig::load_from_file(config_path)?; - let bundle_path = resolve_bundle_path(config_path, &config.bundle_path); - - let repo = load_bundle(&bundle_path)?; - let state = CfgSyncState::new(repo); - run_cfgsync(config.port, state).await?; - Ok(()) -}