Serve precomputed cfgsync artifacts directly

This commit is contained in:
andrussal 2026-03-12 08:27:44 +01:00
parent ec4c42244a
commit cdcb475975
10 changed files with 116 additions and 119 deletions

2
Cargo.lock generated
View File

@ -958,7 +958,6 @@ dependencies = [
"anyhow",
"axum",
"cfgsync-adapter",
"cfgsync-artifacts",
"cfgsync-core",
"clap",
"serde",
@ -2920,6 +2919,7 @@ dependencies = [
"anyhow",
"async-trait",
"cfgsync-adapter",
"cfgsync-artifacts",
"cfgsync-core",
"kube",
"logos-blockchain-http-api-common",

View File

@ -1,12 +1,13 @@
use std::collections::HashMap;
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
use serde::{Deserialize, Serialize};
/// Fully materialized cfgsync artifacts for a registration set.
///
/// `nodes` holds the node-local files keyed by stable node identifier.
/// `shared` holds files that should be delivered alongside every node.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MaterializedArtifacts {
nodes: HashMap<String, ArtifactSet>,
shared: ArtifactSet,

View File

@ -16,7 +16,7 @@ pub use protocol::{
};
pub use render::{
CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides,
apply_timeout_floor, ensure_bundle_path, load_cfgsync_template_yaml,
apply_timeout_floor, ensure_artifacts_path, load_cfgsync_template_yaml,
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
};
pub use server::{CfgsyncServerState, RunCfgsyncError, build_cfgsync_router, serve_cfgsync};

View File

@ -9,8 +9,8 @@ use thiserror::Error;
pub struct RenderedCfgsync {
/// Serialized cfgsync server config YAML.
pub config_yaml: String,
/// Serialized node bundle YAML.
pub bundle_yaml: String,
/// Serialized precomputed artifact YAML used by cfgsync runtime.
pub artifacts_yaml: String,
}
/// Output paths used when materializing rendered cfgsync files.
@ -18,21 +18,22 @@ pub struct RenderedCfgsync {
pub struct CfgsyncOutputPaths<'a> {
/// Output path for the rendered server config YAML.
pub config_path: &'a Path,
/// Output path for the rendered static bundle YAML.
pub bundle_path: &'a Path,
/// Output path for the rendered precomputed artifacts YAML.
pub artifacts_path: &'a Path,
}
/// Ensures bundle path override exists, defaulting to output bundle file name.
pub fn ensure_bundle_path(bundle_path: &mut Option<String>, output_bundle_path: &Path) {
if bundle_path.is_some() {
/// Ensures artifacts path override exists, defaulting to the output artifacts
/// file name.
pub fn ensure_artifacts_path(artifacts_path: &mut Option<String>, output_artifacts_path: &Path) {
if artifacts_path.is_some() {
return;
}
*bundle_path = Some(
output_bundle_path
*artifacts_path = Some(
output_artifacts_path
.file_name()
.and_then(|name| name.to_str())
.unwrap_or("cfgsync.bundle.yaml")
.unwrap_or("cfgsync.artifacts.yaml")
.to_string(),
);
}
@ -50,7 +51,7 @@ pub fn write_rendered_cfgsync(
output: CfgsyncOutputPaths<'_>,
) -> Result<()> {
fs::write(output.config_path, &rendered.config_yaml)?;
fs::write(output.bundle_path, &rendered.bundle_yaml)?;
fs::write(output.artifacts_path, &rendered.artifacts_yaml)?;
Ok(())
}

View File

@ -13,17 +13,16 @@ version = { workspace = true }
workspace = true
[dependencies]
anyhow = "1"
axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" }
cfgsync-adapter = { workspace = true }
cfgsync-artifacts = { workspace = true }
cfgsync-core = { workspace = true }
clap = { version = "4", features = ["derive"] }
serde = { workspace = true }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
tracing = { workspace = true }
anyhow = "1"
axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" }
cfgsync-adapter = { workspace = true }
cfgsync-core = { workspace = true }
clap = { version = "4", features = ["derive"] }
serde = { workspace = true }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
tracing = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

View File

@ -6,9 +6,8 @@ use cfgsync_adapter::{
CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink,
PersistingSnapshotMaterializer, RegistrationConfigSource, RegistrationSnapshotMaterializer,
};
use cfgsync_artifacts::ArtifactSet;
use cfgsync_core::{
BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, RunCfgsyncError,
BundleConfigSource, CfgsyncServerState, NodeConfigSource, RunCfgsyncError,
build_cfgsync_router, serve_cfgsync,
};
use serde::{Deserialize, de::Error as _};
@ -27,7 +26,7 @@ pub struct CfgsyncServerConfig {
///
/// This type is intentionally runtime-oriented:
/// - `Bundle` serves a static precomputed bundle directly
/// - `RegistrationBundle` serves a precomputed bundle through the registration
/// - `Registration` serves precomputed artifacts through the registration
/// protocol, which is useful when the consumer wants clients to register
/// before receiving already-materialized artifacts
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
@ -35,8 +34,12 @@ pub struct CfgsyncServerConfig {
pub enum CfgsyncServerSource {
/// Serve a static precomputed artifact bundle directly.
Bundle { bundle_path: String },
/// Require node registration before serving artifacts from a static bundle.
RegistrationBundle { bundle_path: String },
/// Require node registration before serving precomputed artifacts.
#[serde(alias = "registration_bundle")]
Registration {
#[serde(alias = "bundle_path")]
artifacts_path: String,
},
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
@ -107,11 +110,11 @@ impl CfgsyncServerConfig {
/// Builds a config that serves a static bundle behind the registration
/// flow.
#[must_use]
pub fn for_registration_bundle(port: u16, bundle_path: impl Into<String>) -> Self {
pub fn for_registration(port: u16, artifacts_path: impl Into<String>) -> Self {
Self {
port,
source: CfgsyncServerSource::RegistrationBundle {
bundle_path: bundle_path.into(),
source: CfgsyncServerSource::Registration {
artifacts_path: artifacts_path.into(),
},
}
}
@ -120,7 +123,9 @@ impl CfgsyncServerConfig {
let source = match (raw.source, raw.bundle_path, raw.serving_mode) {
(Some(source), _, _) => source,
(None, Some(bundle_path), Some(LegacyServingMode::Registration)) => {
CfgsyncServerSource::RegistrationBundle { bundle_path }
CfgsyncServerSource::Registration {
artifacts_path: bundle_path,
}
}
(None, Some(bundle_path), None | Some(LegacyServingMode::Bundle)) => {
CfgsyncServerSource::Bundle { bundle_path }
@ -146,29 +151,29 @@ fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfig
Ok(Arc::new(provider))
}
fn load_registration_source(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
let bundle = load_bundle_yaml(bundle_path)?;
let materialized = build_materialized_artifacts(bundle);
fn load_registration_source(artifacts_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
let materialized = load_materialized_artifacts_yaml(artifacts_path)?;
let provider = RegistrationConfigSource::new(materialized);
Ok(Arc::new(provider))
}
fn load_bundle_yaml(bundle_path: &Path) -> anyhow::Result<NodeArtifactsBundle> {
let raw = fs::read_to_string(bundle_path)
.with_context(|| format!("reading cfgsync bundle from {}", bundle_path.display()))?;
fn load_materialized_artifacts_yaml(
artifacts_path: &Path,
) -> anyhow::Result<MaterializedArtifacts> {
let raw = fs::read_to_string(artifacts_path).with_context(|| {
format!(
"reading cfgsync materialized artifacts from {}",
artifacts_path.display()
)
})?;
serde_yaml::from_str(&raw)
.with_context(|| format!("parsing cfgsync bundle from {}", bundle_path.display()))
}
fn build_materialized_artifacts(bundle: NodeArtifactsBundle) -> MaterializedArtifacts {
let nodes = bundle
.nodes
.into_iter()
.map(|node| (node.identifier, ArtifactSet::new(node.files)));
MaterializedArtifacts::from_nodes(nodes).with_shared(ArtifactSet::new(bundle.shared_files))
serde_yaml::from_str(&raw).with_context(|| {
format!(
"parsing cfgsync materialized artifacts from {}",
artifacts_path.display()
)
})
}
fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {
@ -278,7 +283,7 @@ fn build_server_state(
) -> anyhow::Result<CfgsyncServerState> {
let repo = match &config.source {
CfgsyncServerSource::Bundle { .. } => load_bundle_provider(source_path)?,
CfgsyncServerSource::RegistrationBundle { .. } => load_registration_source(source_path)?,
CfgsyncServerSource::Registration { .. } => load_registration_source(source_path)?,
};
Ok(CfgsyncServerState::new(repo))
@ -286,9 +291,11 @@ fn build_server_state(
fn resolve_source_path(config_path: &Path, source: &CfgsyncServerSource) -> std::path::PathBuf {
match source {
CfgsyncServerSource::Bundle { bundle_path }
| CfgsyncServerSource::RegistrationBundle { bundle_path } => {
CfgsyncServerSource::Bundle { bundle_path } => {
resolve_bundle_path(config_path, bundle_path)
}
CfgsyncServerSource::Registration { artifacts_path } => {
resolve_bundle_path(config_path, artifacts_path)
}
}
}

View File

@ -8,6 +8,7 @@ version = { workspace = true }
[dependencies]
# Workspace crates
cfgsync-adapter = { workspace = true }
cfgsync-artifacts = { workspace = true }
cfgsync-core = { workspace = true }
lb-framework = { workspace = true }
testing-framework-core = { workspace = true }

View File

@ -1,12 +1,10 @@
use anyhow::Result;
use cfgsync_adapter::static_deployment::{DeploymentAdapter, build_materialized_artifacts};
use cfgsync_artifacts::ArtifactFile;
pub(crate) use cfgsync_core::render::CfgsyncOutputPaths;
use cfgsync_core::{
NodeArtifactsBundle, NodeArtifactsBundleEntry,
render::{
CfgsyncConfigOverrides, RenderedCfgsync, ensure_bundle_path,
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
},
use cfgsync_core::render::{
CfgsyncConfigOverrides, RenderedCfgsync, ensure_artifacts_path,
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
};
use reqwest::Url;
use serde_yaml::{Mapping, Value};
@ -14,7 +12,7 @@ use thiserror::Error;
pub(crate) struct CfgsyncRenderOptions {
pub port: Option<u16>,
pub bundle_path: Option<String>,
pub artifacts_path: Option<String>,
pub min_timeout_secs: Option<u64>,
pub metrics_otlp_ingest_url: Option<Url>,
}
@ -35,69 +33,59 @@ pub(crate) fn render_cfgsync_from_template<E: DeploymentAdapter>(
let cfg = build_cfgsync_server_config();
let overrides = build_overrides::<E>(topology, options);
let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?;
let mut bundle = build_cfgsync_bundle::<E>(topology, hostnames)?;
append_deployment_files(&mut bundle)?;
let bundle_yaml = serde_yaml::to_string(&bundle)?;
let mut materialized = build_materialized_artifacts::<E>(topology, hostnames)?;
append_deployment_files(&mut materialized)?;
let artifacts_yaml = serde_yaml::to_string(&materialized)?;
Ok(RenderedCfgsync {
config_yaml,
bundle_yaml,
artifacts_yaml,
})
}
fn build_cfgsync_bundle<E: DeploymentAdapter>(
topology: &E::Deployment,
hostnames: &[String],
) -> Result<NodeArtifactsBundle> {
let materialized = build_materialized_artifacts::<E>(topology, hostnames)?;
let nodes = materialized
.iter()
.map(|(identifier, artifacts)| NodeArtifactsBundleEntry {
identifier: identifier.to_owned(),
files: artifacts.files.clone(),
})
.collect();
Ok(NodeArtifactsBundle::new(nodes).with_shared_files(materialized.shared().files.clone()))
}
fn append_deployment_files(bundle: &mut NodeArtifactsBundle) -> Result<()> {
if has_shared_file_path(bundle, "/deployment.yaml") {
fn append_deployment_files(
materialized: &mut cfgsync_adapter::MaterializedArtifacts,
) -> Result<()> {
if has_shared_file_path(materialized, "/deployment.yaml") {
return Ok(());
}
let Some(node) = bundle.nodes.first() else {
let Some((identifier, artifacts)) = materialized.iter().next() else {
return Ok(());
};
let config_content =
config_file_content(node).ok_or_else(|| BundleRenderError::MissingConfigFile {
identifier: node.identifier.clone(),
config_file_content(artifacts).ok_or_else(|| BundleRenderError::MissingConfigFile {
identifier: identifier.to_owned(),
})?;
let deployment_yaml = extract_yaml_key(&config_content, "deployment")?;
bundle
.shared_files
.push(build_bundle_file("/deployment.yaml", deployment_yaml));
let mut shared = materialized.shared().clone();
shared
.files
.push(build_artifact_file("/deployment.yaml", deployment_yaml));
*materialized = materialized.clone().with_shared(shared);
Ok(())
}
fn has_shared_file_path(bundle: &NodeArtifactsBundle, path: &str) -> bool {
bundle.shared_files.iter().any(|file| file.path == path)
fn has_shared_file_path(materialized: &cfgsync_adapter::MaterializedArtifacts, path: &str) -> bool {
materialized
.shared()
.files
.iter()
.any(|file| file.path == path)
}
fn config_file_content(node: &NodeArtifactsBundleEntry) -> Option<String> {
node.files
fn config_file_content(artifacts: &cfgsync_artifacts::ArtifactSet) -> Option<String> {
artifacts
.files
.iter()
.find_map(|file| (file.path == "/config.yaml").then_some(file.content.clone()))
}
fn build_bundle_file(path: &str, content: String) -> cfgsync_core::NodeArtifactFile {
cfgsync_core::NodeArtifactFile {
path: path.to_owned(),
content,
}
fn build_artifact_file(path: &str, content: String) -> ArtifactFile {
ArtifactFile::new(path, content)
}
fn extract_yaml_key(content: &str, key: &str) -> Result<String> {
@ -122,11 +110,11 @@ fn build_cfgsync_server_config() -> Value {
let mut source = Mapping::new();
source.insert(
Value::String("kind".to_string()),
Value::String("registration_bundle".to_string()),
Value::String("registration".to_string()),
);
source.insert(
Value::String("bundle_path".to_string()),
Value::String("cfgsync.bundle.yaml".to_string()),
Value::String("artifacts_path".to_string()),
Value::String("cfgsync.artifacts.yaml".to_string()),
);
root.insert(Value::String("source".to_string()), Value::Mapping(source));
@ -140,7 +128,7 @@ pub(crate) fn render_and_write_cfgsync_from_template<E: DeploymentAdapter>(
mut options: CfgsyncRenderOptions,
output: CfgsyncOutputPaths<'_>,
) -> Result<RenderedCfgsync> {
ensure_bundle_path(&mut options.bundle_path, output.bundle_path);
ensure_artifacts_path(&mut options.artifacts_path, output.artifacts_path);
let rendered = render_cfgsync_from_template::<E>(topology, hostnames, options)?;
write_rendered_cfgsync(&rendered, output)?;
@ -154,7 +142,7 @@ fn build_overrides<E: DeploymentAdapter>(
) -> CfgsyncConfigOverrides {
let CfgsyncRenderOptions {
port,
bundle_path,
artifacts_path,
min_timeout_secs,
metrics_otlp_ingest_url,
} = options;
@ -163,7 +151,7 @@ fn build_overrides<E: DeploymentAdapter>(
port,
n_hosts: Some(E::nodes(topology).len()),
timeout_floor_secs: min_timeout_secs,
bundle_path,
bundle_path: artifacts_path,
metrics_otlp_ingest_url: metrics_otlp_ingest_url.map(|url| url.to_string()),
}
}

View File

@ -127,7 +127,7 @@ impl ComposeDeployEnv for LbcExtEnv {
options,
CfgsyncOutputPaths {
config_path: path,
bundle_path: &bundle_path,
artifacts_path: &bundle_path,
},
)?;
Ok(())
@ -190,7 +190,7 @@ fn cfgsync_bundle_path(config_path: &Path) -> PathBuf {
config_path
.parent()
.unwrap_or(config_path)
.join("cfgsync.bundle.yaml")
.join("cfgsync.artifacts.yaml")
}
fn topology_hostnames(topology: &DeploymentPlan) -> Vec<String> {
@ -207,7 +207,7 @@ fn cfgsync_render_options(
) -> CfgsyncRenderOptions {
CfgsyncRenderOptions {
port: Some(port),
bundle_path: None,
artifacts_path: None,
min_timeout_secs: None,
metrics_otlp_ingest_url: metrics_otlp_ingest_url.cloned(),
}

View File

@ -351,24 +351,24 @@ fn render_and_write_cfgsync(
tempdir: &TempDir,
) -> Result<(PathBuf, String, String), AssetsError> {
let cfgsync_file = tempdir.path().join("cfgsync.yaml");
let bundle_file = tempdir.path().join("cfgsync.bundle.yaml");
let (cfgsync_yaml, bundle_yaml) = render_cfgsync_config(
let artifacts_file = tempdir.path().join("cfgsync.artifacts.yaml");
let (cfgsync_yaml, artifacts_yaml) = render_cfgsync_config(
topology,
metrics_otlp_ingest_url,
&cfgsync_file,
&bundle_file,
&artifacts_file,
)?;
Ok((cfgsync_file, cfgsync_yaml, bundle_yaml))
Ok((cfgsync_file, cfgsync_yaml, artifacts_yaml))
}
fn render_and_write_values(
topology: &DeploymentPlan,
tempdir: &TempDir,
cfgsync_yaml: &str,
bundle_yaml: &str,
artifacts_yaml: &str,
) -> Result<PathBuf, AssetsError> {
let values_yaml = render_values_yaml(topology, cfgsync_yaml, bundle_yaml)?;
let values_yaml = render_values_yaml(topology, cfgsync_yaml, artifacts_yaml)?;
write_temp_file(tempdir.path(), "values.yaml", values_yaml)
}
@ -380,7 +380,7 @@ fn render_cfgsync_config(
topology: &DeploymentPlan,
metrics_otlp_ingest_url: Option<&Url>,
cfgsync_file: &Path,
bundle_file: &Path,
artifacts_file: &Path,
) -> Result<(String, String), AssetsError> {
let hostnames = k8s_node_hostnames(topology);
let rendered = render_and_write_cfgsync_from_template::<lb_framework::LbcEnv>(
@ -388,18 +388,18 @@ fn render_cfgsync_config(
&hostnames,
CfgsyncRenderOptions {
port: Some(cfgsync_port()),
bundle_path: Some("cfgsync.bundle.yaml".to_string()),
artifacts_path: Some("cfgsync.artifacts.yaml".to_string()),
min_timeout_secs: Some(CFGSYNC_K8S_TIMEOUT_SECS),
metrics_otlp_ingest_url: metrics_otlp_ingest_url.cloned(),
},
CfgsyncOutputPaths {
config_path: cfgsync_file,
bundle_path: bundle_file,
artifacts_path: artifacts_file,
},
)
.map_err(|source| AssetsError::Cfgsync { source })?;
Ok((rendered.config_yaml, rendered.bundle_yaml))
Ok((rendered.config_yaml, rendered.artifacts_yaml))
}
fn k8s_node_hostnames(topology: &DeploymentPlan) -> Vec<String> {
@ -459,9 +459,9 @@ fn helm_chart_path() -> Result<PathBuf, AssetsError> {
fn render_values_yaml(
topology: &DeploymentPlan,
cfgsync_yaml: &str,
bundle_yaml: &str,
artifacts_yaml: &str,
) -> Result<String, AssetsError> {
let values = build_values(topology, cfgsync_yaml, bundle_yaml);
let values = build_values(topology, cfgsync_yaml, artifacts_yaml);
serde_yaml::to_string(&values).map_err(|source| AssetsError::Values { source })
}