feat(cfgsync): support generic multi-file payloads

This commit is contained in:
Andrus Salumets 2026-03-06 19:03:42 +07:00 committed by GitHub
commit cd285484a7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 185 additions and 45 deletions

1
Cargo.lock generated
View File

@ -2891,6 +2891,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"cfgsync-core",
"cfgsync-runtime",
"kube",
"logos-blockchain-http-api-common",

View File

@ -86,4 +86,4 @@ until "${cfgsync_bin}"; do
sleep "$sleep_seconds"
done
exec "${bin_path}" /config.yaml
exec "${bin_path}" /config.yaml --deployment /deployment.yaml

View File

@ -7,6 +7,7 @@ version = { workspace = true }
[dependencies]
# Workspace crates
cfgsync-core = { workspace = true }
cfgsync_runtime = { workspace = true }
lb-framework = { workspace = true }
testing-framework-core = { workspace = true }

View File

@ -1,7 +1,7 @@
use anyhow::Result;
use anyhow::{Result, anyhow};
pub(crate) use cfgsync_runtime::render::CfgsyncOutputPaths;
use cfgsync_runtime::{
bundle::build_cfgsync_bundle_with_hostnames,
bundle::{CfgSyncBundle, CfgSyncBundleNode, build_cfgsync_bundle_with_hostnames},
render::{
CfgsyncConfigOverrides, RenderedCfgsync, ensure_bundle_path,
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
@ -26,7 +26,8 @@ pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
let cfg = build_cfgsync_server_config();
let overrides = build_overrides::<E>(topology, options);
let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?;
let bundle = build_cfgsync_bundle_with_hostnames::<E>(topology, hostnames)?;
let mut bundle = build_cfgsync_bundle_with_hostnames::<E>(topology, hostnames)?;
append_deployment_files(&mut bundle)?;
let bundle_yaml = serde_yaml::to_string(&bundle)?;
Ok(RenderedCfgsync {
@ -35,6 +36,50 @@ pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
})
}
fn append_deployment_files(bundle: &mut CfgSyncBundle) -> Result<()> {
for node in &mut bundle.nodes {
if has_file_path(node, "/deployment.yaml") {
continue;
}
let config_content = config_file_content(node)
.ok_or_else(|| anyhow!("cfgsync bundle node missing /config.yaml"))?;
let deployment_yaml = extract_yaml_key(&config_content, "deployment")?;
node.files
.push(build_bundle_file("/deployment.yaml", deployment_yaml));
}
Ok(())
}
fn has_file_path(node: &CfgSyncBundleNode, path: &str) -> bool {
node.files.iter().any(|file| file.path == path)
}
fn config_file_content(node: &CfgSyncBundleNode) -> Option<String> {
node.files
.iter()
.find_map(|file| (file.path == "/config.yaml").then_some(file.content.clone()))
}
fn build_bundle_file(path: &str, content: String) -> cfgsync_core::CfgSyncFile {
cfgsync_core::CfgSyncFile {
path: path.to_owned(),
content,
}
}
fn extract_yaml_key(content: &str, key: &str) -> Result<String> {
let document: Value = serde_yaml::from_str(content)?;
let value = document
.get(key)
.cloned()
.ok_or_else(|| anyhow!("config yaml missing `{key}`"))?;
Ok(serde_yaml::to_string(&value)?)
}
fn build_cfgsync_server_config() -> Value {
let mut root = Mapping::new();
root.insert(

View File

@ -96,14 +96,6 @@ build_test_image::parse_args() {
TAR_PATH="${BUNDLE_TAR_PATH:-${DEFAULT_LINUX_TAR}}"
LOGOS_BLOCKCHAIN_NODE_PATH="${LOGOS_BLOCKCHAIN_NODE_PATH:-}"
if [ -z "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then
# Prefer local checkout when available: this repo currently depends on
# lb-framework from nomos-node/tests/testing_framework.
local sibling_node_path="${ROOT_DIR}/../nomos-node"
if [ -d "${sibling_node_path}/tests/testing_framework" ]; then
LOGOS_BLOCKCHAIN_NODE_PATH="${sibling_node_path}"
fi
fi
if [ -n "${LOGOS_BLOCKCHAIN_NODE_PATH}" ] && [ ! -d "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then
build_test_image::fail "LOGOS_BLOCKCHAIN_NODE_PATH does not exist: ${LOGOS_BLOCKCHAIN_NODE_PATH}"
fi

View File

@ -4,7 +4,7 @@ pub mod server;
pub use client::{CfgSyncClient, ClientError};
pub use repo::{
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncPayload, ConfigRepo,
RepoResponse,
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload,
ConfigRepo, RepoResponse,
};
pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync};

View File

@ -6,20 +6,47 @@ use tokio::sync::oneshot::Sender;
pub const CFGSYNC_SCHEMA_VERSION: u16 = 1;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CfgSyncFile {
pub path: String,
pub content: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CfgSyncPayload {
pub schema_version: u16,
pub config_yaml: String,
#[serde(default)]
pub files: Vec<CfgSyncFile>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_yaml: Option<String>,
}
impl CfgSyncPayload {
#[must_use]
pub fn new(config_yaml: String) -> Self {
pub fn from_files(files: Vec<CfgSyncFile>) -> Self {
Self {
schema_version: CFGSYNC_SCHEMA_VERSION,
config_yaml,
files,
config_yaml: None,
}
}
#[must_use]
pub fn normalized_files(&self, default_config_path: &str) -> Vec<CfgSyncFile> {
if !self.files.is_empty() {
return self.files.clone();
}
self.config_yaml
.as_ref()
.map(|content| {
vec![CfgSyncFile {
path: default_config_path.to_owned(),
content: content.clone(),
}]
})
.unwrap_or_default()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -1,4 +1,5 @@
use anyhow::Result;
use cfgsync_core::CfgSyncFile;
use serde::{Deserialize, Serialize};
use testing_framework_core::cfgsync::{CfgsyncEnv, build_cfgsync_node_configs};
@ -10,7 +11,10 @@ pub struct CfgSyncBundle {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CfgSyncBundleNode {
pub identifier: String,
pub config_yaml: String,
#[serde(default)]
pub files: Vec<CfgSyncFile>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config_yaml: Option<String>,
}
pub fn build_cfgsync_bundle_with_hostnames<E: CfgsyncEnv>(
@ -24,7 +28,11 @@ pub fn build_cfgsync_bundle_with_hostnames<E: CfgsyncEnv>(
.into_iter()
.map(|node| CfgSyncBundleNode {
identifier: node.identifier,
config_yaml: node.config_yaml,
files: vec![CfgSyncFile {
path: "/config.yaml".to_owned(),
content: node.config_yaml,
}],
config_yaml: None,
})
.collect(),
})

View File

@ -1,7 +1,11 @@
use std::{env, fs, net::Ipv4Addr};
use std::{
env, fs,
net::Ipv4Addr,
path::{Path, PathBuf},
};
use anyhow::{Context as _, Result};
use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, ClientIp};
use anyhow::{Context as _, Result, anyhow, bail};
use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp};
use tokio::time::{Duration, sleep};
const FETCH_ATTEMPTS: usize = 5;
@ -11,10 +15,7 @@ fn parse_ip(ip_str: &str) -> Ipv4Addr {
ip_str.parse().unwrap_or(Ipv4Addr::LOCALHOST)
}
async fn fetch_with_retry(
payload: &ClientIp,
server_addr: &str,
) -> Result<cfgsync_core::CfgSyncPayload> {
async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result<CfgSyncPayload> {
let client = CfgSyncClient::new(server_addr);
let mut last_error: Option<anyhow::Error> = None;
@ -33,29 +34,65 @@ async fn fetch_with_retry(
match last_error {
Some(error) => Err(error),
None => Err(anyhow::anyhow!(
"cfgsync client fetch failed without an error"
)),
None => Err(anyhow!("cfgsync client fetch failed without an error")),
}
}
async fn pull_to_file(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> {
async fn pull_config_files(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> {
let config = fetch_with_retry(&payload, server_addr)
.await
.context("fetching cfgsync node config")?;
ensure_schema_version(&config)?;
let files = collect_payload_files(&config, config_file)?;
for file in files {
write_cfgsync_file(&file)?;
}
println!("Config files saved");
Ok(())
}
fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> {
if config.schema_version != CFGSYNC_SCHEMA_VERSION {
anyhow::bail!(
bail!(
"unsupported cfgsync payload schema version {}, expected {}",
config.schema_version,
CFGSYNC_SCHEMA_VERSION
);
}
fs::write(config_file, &config.config_yaml)
.with_context(|| format!("writing config to {}", config_file))?;
Ok(())
}
println!("Config saved to {config_file}");
fn collect_payload_files(config: &CfgSyncPayload, config_file: &str) -> Result<Vec<CfgSyncFile>> {
let files = config.normalized_files(config_file);
if files.is_empty() {
bail!("cfgsync payload contains no files");
}
Ok(files)
}
fn write_cfgsync_file(file: &CfgSyncFile) -> Result<()> {
let path = PathBuf::from(&file.path);
ensure_parent_dir(&path)?;
fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?;
println!("Config saved to {}", path.display());
Ok(())
}
fn ensure_parent_dir(path: &Path) -> Result<()> {
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
fs::create_dir_all(parent)
.with_context(|| format!("creating parent directory {}", parent.display()))?;
}
}
Ok(())
}
@ -67,5 +104,5 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
let identifier =
env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
pull_to_file(ClientIp { ip, identifier }, &server_addr, &config_file_path).await
pull_config_files(ClientIp { ip, identifier }, &server_addr, &config_file_path).await
}

View File

@ -1,7 +1,7 @@
use std::{collections::HashMap, fs, path::Path, sync::Arc};
use anyhow::Context as _;
use cfgsync_core::{CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync};
use cfgsync_core::{CfgSyncFile, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync};
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]
@ -27,26 +27,55 @@ struct CfgSyncBundle {
#[derive(Debug, Deserialize)]
struct CfgSyncBundleNode {
identifier: String,
config_yaml: String,
#[serde(default)]
files: Vec<CfgSyncFile>,
#[serde(default)]
config_yaml: Option<String>,
}
fn load_bundle(bundle_path: &Path) -> anyhow::Result<Arc<ConfigRepo>> {
let bundle = read_cfgsync_bundle(bundle_path)?;
let configs = bundle
.nodes
.into_iter()
.map(build_repo_entry)
.collect::<HashMap<_, _>>();
Ok(ConfigRepo::from_bundle(configs))
}
fn read_cfgsync_bundle(bundle_path: &Path) -> anyhow::Result<CfgSyncBundle> {
let bundle_content = fs::read_to_string(bundle_path).with_context(|| {
format!(
"failed to read cfgsync bundle file {}",
bundle_path.display()
)
})?;
let bundle: CfgSyncBundle = serde_yaml::from_str(&bundle_content)
.with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display()))?;
let configs = bundle
.nodes
.into_iter()
.map(|node| (node.identifier, CfgSyncPayload::new(node.config_yaml)))
.collect::<HashMap<_, _>>();
serde_yaml::from_str(&bundle_content)
.with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display()))
}
Ok(ConfigRepo::from_bundle(configs))
fn build_repo_entry(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) {
let files = if node.files.is_empty() {
build_legacy_files(node.config_yaml)
} else {
node.files
};
(node.identifier, CfgSyncPayload::from_files(files))
}
fn build_legacy_files(config_yaml: Option<String>) -> Vec<CfgSyncFile> {
config_yaml
.map(|content| {
vec![CfgSyncFile {
path: "/config.yaml".to_owned(),
content,
}]
})
.unwrap_or_default()
}
fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {