From 03d7779276a692f7b5715e6fa0a56cc2316bae6f Mon Sep 17 00:00:00 2001 From: andrussal Date: Tue, 3 Mar 2026 08:02:30 +0100 Subject: [PATCH] fix(cfgsync): support generic multi-file payloads and compose deployment split --- Cargo.lock | 1 + logos/infra/assets/stack/scripts/run_logos.sh | 2 +- logos/runtime/ext/Cargo.toml | 1 + logos/runtime/ext/src/cfgsync/mod.rs | 51 +++++++++++++- scripts/build/build_test_image.sh | 8 --- .../tools/cfgsync-core/src/lib.rs | 4 +- .../tools/cfgsync-core/src/repo.rs | 33 ++++++++- .../tools/cfgsync-runtime/src/bundle.rs | 12 +++- .../tools/cfgsync-runtime/src/client.rs | 69 ++++++++++++++----- .../tools/cfgsync-runtime/src/server.rs | 49 ++++++++++--- 10 files changed, 185 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 844e836..0145d09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2891,6 +2891,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "cfgsync-core", "cfgsync-runtime", "kube", "logos-blockchain-http-api-common", diff --git a/logos/infra/assets/stack/scripts/run_logos.sh b/logos/infra/assets/stack/scripts/run_logos.sh index 06edebb..45b23f8 100755 --- a/logos/infra/assets/stack/scripts/run_logos.sh +++ b/logos/infra/assets/stack/scripts/run_logos.sh @@ -86,4 +86,4 @@ until "${cfgsync_bin}"; do sleep "$sleep_seconds" done -exec "${bin_path}" /config.yaml +exec "${bin_path}" /config.yaml --deployment /deployment.yaml diff --git a/logos/runtime/ext/Cargo.toml b/logos/runtime/ext/Cargo.toml index 16b4f1d..008bf4f 100644 --- a/logos/runtime/ext/Cargo.toml +++ b/logos/runtime/ext/Cargo.toml @@ -7,6 +7,7 @@ version = { workspace = true } [dependencies] # Workspace crates +cfgsync-core = { workspace = true } cfgsync_runtime = { workspace = true } lb-framework = { workspace = true } testing-framework-core = { workspace = true } diff --git a/logos/runtime/ext/src/cfgsync/mod.rs b/logos/runtime/ext/src/cfgsync/mod.rs index 47c2842..6d0b308 100644 --- a/logos/runtime/ext/src/cfgsync/mod.rs +++ b/logos/runtime/ext/src/cfgsync/mod.rs @@ -1,7 +1,7 @@ -use anyhow::Result; +use anyhow::{Result, anyhow}; pub(crate) use cfgsync_runtime::render::CfgsyncOutputPaths; use cfgsync_runtime::{ - bundle::build_cfgsync_bundle_with_hostnames, + bundle::{CfgSyncBundle, CfgSyncBundleNode, build_cfgsync_bundle_with_hostnames}, render::{ CfgsyncConfigOverrides, RenderedCfgsync, ensure_bundle_path, render_cfgsync_yaml_from_template, write_rendered_cfgsync, @@ -26,7 +26,8 @@ pub(crate) fn render_cfgsync_from_template( let cfg = build_cfgsync_server_config(); let overrides = build_overrides::(topology, options); let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?; - let bundle = build_cfgsync_bundle_with_hostnames::(topology, hostnames)?; + let mut bundle = build_cfgsync_bundle_with_hostnames::(topology, hostnames)?; + append_deployment_files(&mut bundle)?; let bundle_yaml = serde_yaml::to_string(&bundle)?; Ok(RenderedCfgsync { @@ -35,6 +36,50 @@ pub(crate) fn render_cfgsync_from_template( }) } +fn append_deployment_files(bundle: &mut CfgSyncBundle) -> Result<()> { + for node in &mut bundle.nodes { + if has_file_path(node, "/deployment.yaml") { + continue; + } + + let config_content = config_file_content(node) + .ok_or_else(|| anyhow!("cfgsync bundle node missing /config.yaml"))?; + let deployment_yaml = extract_yaml_key(&config_content, "deployment")?; + + node.files + .push(build_bundle_file("/deployment.yaml", deployment_yaml)); + } + + Ok(()) +} + +fn has_file_path(node: &CfgSyncBundleNode, path: &str) -> bool { + node.files.iter().any(|file| file.path == path) +} + +fn config_file_content(node: &CfgSyncBundleNode) -> Option { + node.files + .iter() + .find_map(|file| (file.path == "/config.yaml").then_some(file.content.clone())) +} + +fn build_bundle_file(path: &str, content: String) -> cfgsync_core::CfgSyncFile { + cfgsync_core::CfgSyncFile { + path: path.to_owned(), + content, + } +} + +fn extract_yaml_key(content: &str, key: &str) -> Result { + let document: Value = serde_yaml::from_str(content)?; + let value = document + .get(key) + .cloned() + .ok_or_else(|| anyhow!("config yaml missing `{key}`"))?; + + Ok(serde_yaml::to_string(&value)?) +} + fn build_cfgsync_server_config() -> Value { let mut root = Mapping::new(); root.insert( diff --git a/scripts/build/build_test_image.sh b/scripts/build/build_test_image.sh index 2763df5..e43c7be 100755 --- a/scripts/build/build_test_image.sh +++ b/scripts/build/build_test_image.sh @@ -96,14 +96,6 @@ build_test_image::parse_args() { TAR_PATH="${BUNDLE_TAR_PATH:-${DEFAULT_LINUX_TAR}}" LOGOS_BLOCKCHAIN_NODE_PATH="${LOGOS_BLOCKCHAIN_NODE_PATH:-}" - if [ -z "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then - # Prefer local checkout when available: this repo currently depends on - # lb-framework from nomos-node/tests/testing_framework. - local sibling_node_path="${ROOT_DIR}/../nomos-node" - if [ -d "${sibling_node_path}/tests/testing_framework" ]; then - LOGOS_BLOCKCHAIN_NODE_PATH="${sibling_node_path}" - fi - fi if [ -n "${LOGOS_BLOCKCHAIN_NODE_PATH}" ] && [ ! -d "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then build_test_image::fail "LOGOS_BLOCKCHAIN_NODE_PATH does not exist: ${LOGOS_BLOCKCHAIN_NODE_PATH}" fi diff --git a/testing-framework/tools/cfgsync-core/src/lib.rs b/testing-framework/tools/cfgsync-core/src/lib.rs index c437b8f..822ae69 100644 --- a/testing-framework/tools/cfgsync-core/src/lib.rs +++ b/testing-framework/tools/cfgsync-core/src/lib.rs @@ -4,7 +4,7 @@ pub mod server; pub use client::{CfgSyncClient, ClientError}; pub use repo::{ - CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncPayload, ConfigRepo, - RepoResponse, + CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, + ConfigRepo, RepoResponse, }; pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync}; diff --git a/testing-framework/tools/cfgsync-core/src/repo.rs b/testing-framework/tools/cfgsync-core/src/repo.rs index 0a22b4e..62e320c 100644 --- a/testing-framework/tools/cfgsync-core/src/repo.rs +++ b/testing-framework/tools/cfgsync-core/src/repo.rs @@ -6,20 +6,47 @@ use tokio::sync::oneshot::Sender; pub const CFGSYNC_SCHEMA_VERSION: u16 = 1; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CfgSyncFile { + pub path: String, + pub content: String, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CfgSyncPayload { pub schema_version: u16, - pub config_yaml: String, + #[serde(default)] + pub files: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub config_yaml: Option, } impl CfgSyncPayload { #[must_use] - pub fn new(config_yaml: String) -> Self { + pub fn from_files(files: Vec) -> Self { Self { schema_version: CFGSYNC_SCHEMA_VERSION, - config_yaml, + files, + config_yaml: None, } } + + #[must_use] + pub fn normalized_files(&self, default_config_path: &str) -> Vec { + if !self.files.is_empty() { + return self.files.clone(); + } + + self.config_yaml + .as_ref() + .map(|content| { + vec![CfgSyncFile { + path: default_config_path.to_owned(), + content: content.clone(), + }] + }) + .unwrap_or_default() + } } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/testing-framework/tools/cfgsync-runtime/src/bundle.rs b/testing-framework/tools/cfgsync-runtime/src/bundle.rs index 1453b55..8aa257d 100644 --- a/testing-framework/tools/cfgsync-runtime/src/bundle.rs +++ b/testing-framework/tools/cfgsync-runtime/src/bundle.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use cfgsync_core::CfgSyncFile; use serde::{Deserialize, Serialize}; use testing_framework_core::cfgsync::{CfgsyncEnv, build_cfgsync_node_configs}; @@ -10,7 +11,10 @@ pub struct CfgSyncBundle { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CfgSyncBundleNode { pub identifier: String, - pub config_yaml: String, + #[serde(default)] + pub files: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub config_yaml: Option, } pub fn build_cfgsync_bundle_with_hostnames( @@ -24,7 +28,11 @@ pub fn build_cfgsync_bundle_with_hostnames( .into_iter() .map(|node| CfgSyncBundleNode { identifier: node.identifier, - config_yaml: node.config_yaml, + files: vec![CfgSyncFile { + path: "/config.yaml".to_owned(), + content: node.config_yaml, + }], + config_yaml: None, }) .collect(), }) diff --git a/testing-framework/tools/cfgsync-runtime/src/client.rs b/testing-framework/tools/cfgsync-runtime/src/client.rs index 75f0836..364fdf6 100644 --- a/testing-framework/tools/cfgsync-runtime/src/client.rs +++ b/testing-framework/tools/cfgsync-runtime/src/client.rs @@ -1,7 +1,11 @@ -use std::{env, fs, net::Ipv4Addr}; +use std::{ + env, fs, + net::Ipv4Addr, + path::{Path, PathBuf}, +}; -use anyhow::{Context as _, Result}; -use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, ClientIp}; +use anyhow::{Context as _, Result, anyhow, bail}; +use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp}; use tokio::time::{Duration, sleep}; const FETCH_ATTEMPTS: usize = 5; @@ -11,10 +15,7 @@ fn parse_ip(ip_str: &str) -> Ipv4Addr { ip_str.parse().unwrap_or(Ipv4Addr::LOCALHOST) } -async fn fetch_with_retry( - payload: &ClientIp, - server_addr: &str, -) -> Result { +async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result { let client = CfgSyncClient::new(server_addr); let mut last_error: Option = None; @@ -33,29 +34,65 @@ async fn fetch_with_retry( match last_error { Some(error) => Err(error), - None => Err(anyhow::anyhow!( - "cfgsync client fetch failed without an error" - )), + None => Err(anyhow!("cfgsync client fetch failed without an error")), } } -async fn pull_to_file(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> { +async fn pull_config_files(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> { let config = fetch_with_retry(&payload, server_addr) .await .context("fetching cfgsync node config")?; + ensure_schema_version(&config)?; + let files = collect_payload_files(&config, config_file)?; + + for file in files { + write_cfgsync_file(&file)?; + } + + println!("Config files saved"); + Ok(()) +} + +fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> { if config.schema_version != CFGSYNC_SCHEMA_VERSION { - anyhow::bail!( + bail!( "unsupported cfgsync payload schema version {}, expected {}", config.schema_version, CFGSYNC_SCHEMA_VERSION ); } - fs::write(config_file, &config.config_yaml) - .with_context(|| format!("writing config to {}", config_file))?; + Ok(()) +} - println!("Config saved to {config_file}"); +fn collect_payload_files(config: &CfgSyncPayload, config_file: &str) -> Result> { + let files = config.normalized_files(config_file); + if files.is_empty() { + bail!("cfgsync payload contains no files"); + } + + Ok(files) +} + +fn write_cfgsync_file(file: &CfgSyncFile) -> Result<()> { + let path = PathBuf::from(&file.path); + + ensure_parent_dir(&path)?; + + fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?; + + println!("Config saved to {}", path.display()); + Ok(()) +} + +fn ensure_parent_dir(path: &Path) -> Result<()> { + if let Some(parent) = path.parent() { + if !parent.as_os_str().is_empty() { + fs::create_dir_all(parent) + .with_context(|| format!("creating parent directory {}", parent.display()))?; + } + } Ok(()) } @@ -67,5 +104,5 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> { let identifier = env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned()); - pull_to_file(ClientIp { ip, identifier }, &server_addr, &config_file_path).await + pull_config_files(ClientIp { ip, identifier }, &server_addr, &config_file_path).await } diff --git a/testing-framework/tools/cfgsync-runtime/src/server.rs b/testing-framework/tools/cfgsync-runtime/src/server.rs index a497515..4c3e59d 100644 --- a/testing-framework/tools/cfgsync-runtime/src/server.rs +++ b/testing-framework/tools/cfgsync-runtime/src/server.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, fs, path::Path, sync::Arc}; use anyhow::Context as _; -use cfgsync_core::{CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync}; +use cfgsync_core::{CfgSyncFile, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync}; use serde::Deserialize; #[derive(Debug, Deserialize, Clone)] @@ -27,26 +27,55 @@ struct CfgSyncBundle { #[derive(Debug, Deserialize)] struct CfgSyncBundleNode { identifier: String, - config_yaml: String, + #[serde(default)] + files: Vec, + #[serde(default)] + config_yaml: Option, } fn load_bundle(bundle_path: &Path) -> anyhow::Result> { + let bundle = read_cfgsync_bundle(bundle_path)?; + + let configs = bundle + .nodes + .into_iter() + .map(build_repo_entry) + .collect::>(); + + Ok(ConfigRepo::from_bundle(configs)) +} + +fn read_cfgsync_bundle(bundle_path: &Path) -> anyhow::Result { let bundle_content = fs::read_to_string(bundle_path).with_context(|| { format!( "failed to read cfgsync bundle file {}", bundle_path.display() ) })?; - let bundle: CfgSyncBundle = serde_yaml::from_str(&bundle_content) - .with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display()))?; - let configs = bundle - .nodes - .into_iter() - .map(|node| (node.identifier, CfgSyncPayload::new(node.config_yaml))) - .collect::>(); + serde_yaml::from_str(&bundle_content) + .with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display())) +} - Ok(ConfigRepo::from_bundle(configs)) +fn build_repo_entry(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) { + let files = if node.files.is_empty() { + build_legacy_files(node.config_yaml) + } else { + node.files + }; + + (node.identifier, CfgSyncPayload::from_files(files)) +} + +fn build_legacy_files(config_yaml: Option) -> Vec { + config_yaml + .map(|content| { + vec![CfgSyncFile { + path: "/config.yaml".to_owned(), + content, + }] + }) + .unwrap_or_default() } fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {