mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-03-31 08:13:48 +00:00
fix(cfgsync): support generic multi-file payloads and compose deployment split
This commit is contained in:
parent
a099392fdd
commit
03d7779276
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -2891,6 +2891,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"cfgsync-core",
|
||||
"cfgsync-runtime",
|
||||
"kube",
|
||||
"logos-blockchain-http-api-common",
|
||||
|
||||
@ -86,4 +86,4 @@ until "${cfgsync_bin}"; do
|
||||
sleep "$sleep_seconds"
|
||||
done
|
||||
|
||||
exec "${bin_path}" /config.yaml
|
||||
exec "${bin_path}" /config.yaml --deployment /deployment.yaml
|
||||
|
||||
@ -7,6 +7,7 @@ version = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
# Workspace crates
|
||||
cfgsync-core = { workspace = true }
|
||||
cfgsync_runtime = { workspace = true }
|
||||
lb-framework = { workspace = true }
|
||||
testing-framework-core = { workspace = true }
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::{Result, anyhow};
|
||||
pub(crate) use cfgsync_runtime::render::CfgsyncOutputPaths;
|
||||
use cfgsync_runtime::{
|
||||
bundle::build_cfgsync_bundle_with_hostnames,
|
||||
bundle::{CfgSyncBundle, CfgSyncBundleNode, build_cfgsync_bundle_with_hostnames},
|
||||
render::{
|
||||
CfgsyncConfigOverrides, RenderedCfgsync, ensure_bundle_path,
|
||||
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
|
||||
@ -26,7 +26,8 @@ pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
|
||||
let cfg = build_cfgsync_server_config();
|
||||
let overrides = build_overrides::<E>(topology, options);
|
||||
let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?;
|
||||
let bundle = build_cfgsync_bundle_with_hostnames::<E>(topology, hostnames)?;
|
||||
let mut bundle = build_cfgsync_bundle_with_hostnames::<E>(topology, hostnames)?;
|
||||
append_deployment_files(&mut bundle)?;
|
||||
let bundle_yaml = serde_yaml::to_string(&bundle)?;
|
||||
|
||||
Ok(RenderedCfgsync {
|
||||
@ -35,6 +36,50 @@ pub(crate) fn render_cfgsync_from_template<E: CfgsyncEnv>(
|
||||
})
|
||||
}
|
||||
|
||||
fn append_deployment_files(bundle: &mut CfgSyncBundle) -> Result<()> {
|
||||
for node in &mut bundle.nodes {
|
||||
if has_file_path(node, "/deployment.yaml") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let config_content = config_file_content(node)
|
||||
.ok_or_else(|| anyhow!("cfgsync bundle node missing /config.yaml"))?;
|
||||
let deployment_yaml = extract_yaml_key(&config_content, "deployment")?;
|
||||
|
||||
node.files
|
||||
.push(build_bundle_file("/deployment.yaml", deployment_yaml));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn has_file_path(node: &CfgSyncBundleNode, path: &str) -> bool {
|
||||
node.files.iter().any(|file| file.path == path)
|
||||
}
|
||||
|
||||
fn config_file_content(node: &CfgSyncBundleNode) -> Option<String> {
|
||||
node.files
|
||||
.iter()
|
||||
.find_map(|file| (file.path == "/config.yaml").then_some(file.content.clone()))
|
||||
}
|
||||
|
||||
fn build_bundle_file(path: &str, content: String) -> cfgsync_core::CfgSyncFile {
|
||||
cfgsync_core::CfgSyncFile {
|
||||
path: path.to_owned(),
|
||||
content,
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_yaml_key(content: &str, key: &str) -> Result<String> {
|
||||
let document: Value = serde_yaml::from_str(content)?;
|
||||
let value = document
|
||||
.get(key)
|
||||
.cloned()
|
||||
.ok_or_else(|| anyhow!("config yaml missing `{key}`"))?;
|
||||
|
||||
Ok(serde_yaml::to_string(&value)?)
|
||||
}
|
||||
|
||||
fn build_cfgsync_server_config() -> Value {
|
||||
let mut root = Mapping::new();
|
||||
root.insert(
|
||||
|
||||
@ -96,14 +96,6 @@ build_test_image::parse_args() {
|
||||
TAR_PATH="${BUNDLE_TAR_PATH:-${DEFAULT_LINUX_TAR}}"
|
||||
|
||||
LOGOS_BLOCKCHAIN_NODE_PATH="${LOGOS_BLOCKCHAIN_NODE_PATH:-}"
|
||||
if [ -z "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then
|
||||
# Prefer local checkout when available: this repo currently depends on
|
||||
# lb-framework from nomos-node/tests/testing_framework.
|
||||
local sibling_node_path="${ROOT_DIR}/../nomos-node"
|
||||
if [ -d "${sibling_node_path}/tests/testing_framework" ]; then
|
||||
LOGOS_BLOCKCHAIN_NODE_PATH="${sibling_node_path}"
|
||||
fi
|
||||
fi
|
||||
if [ -n "${LOGOS_BLOCKCHAIN_NODE_PATH}" ] && [ ! -d "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then
|
||||
build_test_image::fail "LOGOS_BLOCKCHAIN_NODE_PATH does not exist: ${LOGOS_BLOCKCHAIN_NODE_PATH}"
|
||||
fi
|
||||
|
||||
@ -4,7 +4,7 @@ pub mod server;
|
||||
|
||||
pub use client::{CfgSyncClient, ClientError};
|
||||
pub use repo::{
|
||||
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncPayload, ConfigRepo,
|
||||
RepoResponse,
|
||||
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload,
|
||||
ConfigRepo, RepoResponse,
|
||||
};
|
||||
pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync};
|
||||
|
||||
@ -6,20 +6,47 @@ use tokio::sync::oneshot::Sender;
|
||||
|
||||
pub const CFGSYNC_SCHEMA_VERSION: u16 = 1;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CfgSyncFile {
|
||||
pub path: String,
|
||||
pub content: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CfgSyncPayload {
|
||||
pub schema_version: u16,
|
||||
pub config_yaml: String,
|
||||
#[serde(default)]
|
||||
pub files: Vec<CfgSyncFile>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub config_yaml: Option<String>,
|
||||
}
|
||||
|
||||
impl CfgSyncPayload {
|
||||
#[must_use]
|
||||
pub fn new(config_yaml: String) -> Self {
|
||||
pub fn from_files(files: Vec<CfgSyncFile>) -> Self {
|
||||
Self {
|
||||
schema_version: CFGSYNC_SCHEMA_VERSION,
|
||||
config_yaml,
|
||||
files,
|
||||
config_yaml: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn normalized_files(&self, default_config_path: &str) -> Vec<CfgSyncFile> {
|
||||
if !self.files.is_empty() {
|
||||
return self.files.clone();
|
||||
}
|
||||
|
||||
self.config_yaml
|
||||
.as_ref()
|
||||
.map(|content| {
|
||||
vec![CfgSyncFile {
|
||||
path: default_config_path.to_owned(),
|
||||
content: content.clone(),
|
||||
}]
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use cfgsync_core::CfgSyncFile;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use testing_framework_core::cfgsync::{CfgsyncEnv, build_cfgsync_node_configs};
|
||||
|
||||
@ -10,7 +11,10 @@ pub struct CfgSyncBundle {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CfgSyncBundleNode {
|
||||
pub identifier: String,
|
||||
pub config_yaml: String,
|
||||
#[serde(default)]
|
||||
pub files: Vec<CfgSyncFile>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub config_yaml: Option<String>,
|
||||
}
|
||||
|
||||
pub fn build_cfgsync_bundle_with_hostnames<E: CfgsyncEnv>(
|
||||
@ -24,7 +28,11 @@ pub fn build_cfgsync_bundle_with_hostnames<E: CfgsyncEnv>(
|
||||
.into_iter()
|
||||
.map(|node| CfgSyncBundleNode {
|
||||
identifier: node.identifier,
|
||||
config_yaml: node.config_yaml,
|
||||
files: vec![CfgSyncFile {
|
||||
path: "/config.yaml".to_owned(),
|
||||
content: node.config_yaml,
|
||||
}],
|
||||
config_yaml: None,
|
||||
})
|
||||
.collect(),
|
||||
})
|
||||
|
||||
@ -1,7 +1,11 @@
|
||||
use std::{env, fs, net::Ipv4Addr};
|
||||
use std::{
|
||||
env, fs,
|
||||
net::Ipv4Addr,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, ClientIp};
|
||||
use anyhow::{Context as _, Result, anyhow, bail};
|
||||
use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp};
|
||||
use tokio::time::{Duration, sleep};
|
||||
|
||||
const FETCH_ATTEMPTS: usize = 5;
|
||||
@ -11,10 +15,7 @@ fn parse_ip(ip_str: &str) -> Ipv4Addr {
|
||||
ip_str.parse().unwrap_or(Ipv4Addr::LOCALHOST)
|
||||
}
|
||||
|
||||
async fn fetch_with_retry(
|
||||
payload: &ClientIp,
|
||||
server_addr: &str,
|
||||
) -> Result<cfgsync_core::CfgSyncPayload> {
|
||||
async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result<CfgSyncPayload> {
|
||||
let client = CfgSyncClient::new(server_addr);
|
||||
let mut last_error: Option<anyhow::Error> = None;
|
||||
|
||||
@ -33,29 +34,65 @@ async fn fetch_with_retry(
|
||||
|
||||
match last_error {
|
||||
Some(error) => Err(error),
|
||||
None => Err(anyhow::anyhow!(
|
||||
"cfgsync client fetch failed without an error"
|
||||
)),
|
||||
None => Err(anyhow!("cfgsync client fetch failed without an error")),
|
||||
}
|
||||
}
|
||||
|
||||
async fn pull_to_file(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> {
|
||||
async fn pull_config_files(payload: ClientIp, server_addr: &str, config_file: &str) -> Result<()> {
|
||||
let config = fetch_with_retry(&payload, server_addr)
|
||||
.await
|
||||
.context("fetching cfgsync node config")?;
|
||||
ensure_schema_version(&config)?;
|
||||
|
||||
let files = collect_payload_files(&config, config_file)?;
|
||||
|
||||
for file in files {
|
||||
write_cfgsync_file(&file)?;
|
||||
}
|
||||
|
||||
println!("Config files saved");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> {
|
||||
if config.schema_version != CFGSYNC_SCHEMA_VERSION {
|
||||
anyhow::bail!(
|
||||
bail!(
|
||||
"unsupported cfgsync payload schema version {}, expected {}",
|
||||
config.schema_version,
|
||||
CFGSYNC_SCHEMA_VERSION
|
||||
);
|
||||
}
|
||||
|
||||
fs::write(config_file, &config.config_yaml)
|
||||
.with_context(|| format!("writing config to {}", config_file))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
println!("Config saved to {config_file}");
|
||||
fn collect_payload_files(config: &CfgSyncPayload, config_file: &str) -> Result<Vec<CfgSyncFile>> {
|
||||
let files = config.normalized_files(config_file);
|
||||
if files.is_empty() {
|
||||
bail!("cfgsync payload contains no files");
|
||||
}
|
||||
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
fn write_cfgsync_file(file: &CfgSyncFile) -> Result<()> {
|
||||
let path = PathBuf::from(&file.path);
|
||||
|
||||
ensure_parent_dir(&path)?;
|
||||
|
||||
fs::write(&path, &file.content).with_context(|| format!("writing {}", path.display()))?;
|
||||
|
||||
println!("Config saved to {}", path.display());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ensure_parent_dir(path: &Path) -> Result<()> {
|
||||
if let Some(parent) = path.parent() {
|
||||
if !parent.as_os_str().is_empty() {
|
||||
fs::create_dir_all(parent)
|
||||
.with_context(|| format!("creating parent directory {}", parent.display()))?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -67,5 +104,5 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
|
||||
let identifier =
|
||||
env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
|
||||
|
||||
pull_to_file(ClientIp { ip, identifier }, &server_addr, &config_file_path).await
|
||||
pull_config_files(ClientIp { ip, identifier }, &server_addr, &config_file_path).await
|
||||
}
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use std::{collections::HashMap, fs, path::Path, sync::Arc};
|
||||
|
||||
use anyhow::Context as _;
|
||||
use cfgsync_core::{CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync};
|
||||
use cfgsync_core::{CfgSyncFile, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync};
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
@ -27,26 +27,55 @@ struct CfgSyncBundle {
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct CfgSyncBundleNode {
|
||||
identifier: String,
|
||||
config_yaml: String,
|
||||
#[serde(default)]
|
||||
files: Vec<CfgSyncFile>,
|
||||
#[serde(default)]
|
||||
config_yaml: Option<String>,
|
||||
}
|
||||
|
||||
fn load_bundle(bundle_path: &Path) -> anyhow::Result<Arc<ConfigRepo>> {
|
||||
let bundle = read_cfgsync_bundle(bundle_path)?;
|
||||
|
||||
let configs = bundle
|
||||
.nodes
|
||||
.into_iter()
|
||||
.map(build_repo_entry)
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
Ok(ConfigRepo::from_bundle(configs))
|
||||
}
|
||||
|
||||
fn read_cfgsync_bundle(bundle_path: &Path) -> anyhow::Result<CfgSyncBundle> {
|
||||
let bundle_content = fs::read_to_string(bundle_path).with_context(|| {
|
||||
format!(
|
||||
"failed to read cfgsync bundle file {}",
|
||||
bundle_path.display()
|
||||
)
|
||||
})?;
|
||||
let bundle: CfgSyncBundle = serde_yaml::from_str(&bundle_content)
|
||||
.with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display()))?;
|
||||
|
||||
let configs = bundle
|
||||
.nodes
|
||||
.into_iter()
|
||||
.map(|node| (node.identifier, CfgSyncPayload::new(node.config_yaml)))
|
||||
.collect::<HashMap<_, _>>();
|
||||
serde_yaml::from_str(&bundle_content)
|
||||
.with_context(|| format!("failed to parse cfgsync bundle {}", bundle_path.display()))
|
||||
}
|
||||
|
||||
Ok(ConfigRepo::from_bundle(configs))
|
||||
fn build_repo_entry(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) {
|
||||
let files = if node.files.is_empty() {
|
||||
build_legacy_files(node.config_yaml)
|
||||
} else {
|
||||
node.files
|
||||
};
|
||||
|
||||
(node.identifier, CfgSyncPayload::from_files(files))
|
||||
}
|
||||
|
||||
fn build_legacy_files(config_yaml: Option<String>) -> Vec<CfgSyncFile> {
|
||||
config_yaml
|
||||
.map(|content| {
|
||||
vec![CfgSyncFile {
|
||||
path: "/config.yaml".to_owned(),
|
||||
content,
|
||||
}]
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user