mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-03-31 16:23:08 +00:00
Unify cfgsync around registration materialization
This commit is contained in:
parent
cdcb475975
commit
320b089fbd
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -924,7 +924,6 @@ dependencies = [
|
||||
"cfgsync-core",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -958,6 +957,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"cfgsync-adapter",
|
||||
"cfgsync-artifacts",
|
||||
"cfgsync-core",
|
||||
"clap",
|
||||
"serde",
|
||||
@ -6557,6 +6557,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"cfgsync-adapter",
|
||||
"cfgsync-artifacts",
|
||||
"futures",
|
||||
"parking_lot",
|
||||
"prometheus-http-query",
|
||||
|
||||
@ -8,13 +8,9 @@ The boundary is simple. `cfgsync` owns transport, registration storage, polling,
|
||||
|
||||
## The model
|
||||
|
||||
There are two ways to use `cfgsync`.
|
||||
There is one main way to use `cfgsync`: nodes register, the server evaluates the current registration snapshot, and the application materializer decides whether artifacts are ready yet. Once ready, cfgsync serves a single payload containing both node-local and shared files.
|
||||
|
||||
The simpler path is static bundle serving. In that mode, all artifacts are known ahead of time and the server just serves a precomputed bundle.
|
||||
|
||||
The more general path is registration-backed serving. In that mode, nodes register first, the server builds a stable registration snapshot, and the application materializer decides when artifacts are ready and what should be served.
|
||||
|
||||
Both paths use the same client protocol and the same artifact payload shape. The difference is only where artifacts come from.
|
||||
Precomputed artifacts still fit this model. They are just a special case where the materializer already knows the final outputs and uses registration only as an identity and readiness gate.
|
||||
|
||||
## Crate roles
|
||||
|
||||
@ -34,11 +30,11 @@ This crate is the application-facing integration layer. The main concepts are `R
|
||||
|
||||
The adapter answers one question: given the current registration snapshot, are artifacts ready yet, and if so, what should be served?
|
||||
|
||||
The crate also includes reusable wrappers such as `CachedSnapshotMaterializer`, `PersistingSnapshotMaterializer`, and `RegistrationConfigSource`. Static deployment-driven rendering still exists, but it lives under `cfgsync_adapter::static_deployment` as a secondary helper path. The main cfgsync model is registration-backed materialization.
|
||||
The crate also includes reusable wrappers such as `CachedSnapshotMaterializer`, `PersistingSnapshotMaterializer`, and `RegistrationConfigSource`. Static deployment-driven rendering still exists for current testing-framework consumers, but it is intentionally a secondary helper path. The main cfgsync model is registration-backed materialization.
|
||||
|
||||
### `cfgsync-runtime`
|
||||
|
||||
This crate provides operational helpers and binaries. It includes client-side fetch/write helpers, server config loading, and direct server entrypoints for materializers. Use this crate when you want to run cfgsync rather than define its protocol or adapter contracts.
|
||||
This crate provides the operational entrypoints. It includes client-side fetch/write helpers, server config loading, and the default `serve_cfgsync(...)` path for snapshot materializers. Use this crate when you want to run cfgsync rather than define its protocol or adapter contracts.
|
||||
|
||||
## Artifact model
|
||||
|
||||
@ -58,13 +54,11 @@ The server stores registrations and builds a `RegistrationSnapshot`. The applica
|
||||
|
||||
If the materializer returns `NotReady`, cfgsync responds accordingly and the client can retry later. If it returns `Ready`, cfgsync serves the resolved artifact payload.
|
||||
|
||||
## Static bundle flow
|
||||
## Precomputed artifacts
|
||||
|
||||
Static bundle mode still exists because it is useful when artifacts are already known.
|
||||
Some consumers know the full artifact set ahead of time. That case still fits the same registration-backed model: the server starts with precomputed `MaterializedArtifacts`, nodes register, and cfgsync serves the right payload once the registration is acceptable.
|
||||
|
||||
That is appropriate for fully precomputed topologies, deterministic fixtures, and test setups where no runtime coordination is needed. In that mode, cfgsync serves from `NodeArtifactsBundle` through `BundleConfigSource`.
|
||||
|
||||
Bundle mode is useful, but it is not the defining idea of the library anymore. The primary model is registration-backed materialization, and the static helpers are intentionally kept off the main adapter surface.
|
||||
The important point is that precomputed artifacts are not a separate public workflow anymore. They are one way to back the same registration/materialization protocol.
|
||||
|
||||
## Example: typed registration metadata
|
||||
|
||||
@ -124,14 +118,16 @@ impl RegistrationSnapshotMaterializer for MyMaterializer {
|
||||
## Example: serving cfgsync
|
||||
|
||||
```rust
|
||||
use cfgsync_runtime::serve_snapshot_cfgsync;
|
||||
use cfgsync_runtime::serve_cfgsync;
|
||||
|
||||
# async fn run() -> anyhow::Result<()> {
|
||||
serve_snapshot_cfgsync(4400, MyMaterializer).await?;
|
||||
serve_cfgsync(4400, MyMaterializer).await?;
|
||||
# Ok(())
|
||||
# }
|
||||
```
|
||||
|
||||
A standalone version of this example lives in `cfgsync/runtime/examples/minimal_cfgsync.rs`.
|
||||
|
||||
## Example: fetching artifacts
|
||||
|
||||
```rust
|
||||
@ -157,7 +153,7 @@ Do not push application-specific topology semantics, genesis or deployment gener
|
||||
|
||||
## Recommended integration path
|
||||
|
||||
If you are integrating a new app, the shortest sensible path is to define a typed registration payload, implement `RegistrationSnapshotMaterializer`, return node-local and optional shared artifacts, serve them with `serve_snapshot_cfgsync(...)`, and use `CfgsyncClient` or the runtime helpers on the node side. That gives you the main library value without forcing extra application logic into cfgsync itself.
|
||||
If you are integrating a new app, the shortest sensible path is to define a typed registration payload, implement `RegistrationSnapshotMaterializer`, return node-local and optional shared artifacts, serve them with `serve_cfgsync(...)`, and use `CfgsyncClient` or the runtime helpers on the node side. That gives you the main library value without forcing extra application logic into cfgsync itself.
|
||||
|
||||
## Compatibility
|
||||
|
||||
|
||||
@ -17,4 +17,3 @@ cfgsync-artifacts = { workspace = true }
|
||||
cfgsync-core = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
||||
@ -1,124 +0,0 @@
|
||||
use std::{collections::HashMap, error::Error};
|
||||
|
||||
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::MaterializedArtifacts;
|
||||
|
||||
/// Adapter contract for converting an application deployment model into
|
||||
/// node-specific serialized config payloads.
|
||||
pub trait DeploymentAdapter {
|
||||
/// Application-specific deployment model that cfgsync renders from.
|
||||
type Deployment;
|
||||
/// One node entry inside the deployment model.
|
||||
type Node;
|
||||
/// In-memory node config type produced before serialization.
|
||||
type NodeConfig;
|
||||
/// Adapter-specific failure type raised while building or rewriting
|
||||
/// configs.
|
||||
type Error: Error + Send + Sync + 'static;
|
||||
|
||||
/// Returns the ordered node list that cfgsync should materialize.
|
||||
fn nodes(deployment: &Self::Deployment) -> &[Self::Node];
|
||||
|
||||
/// Returns the stable identifier cfgsync should use for this node.
|
||||
fn node_identifier(index: usize, node: &Self::Node) -> String;
|
||||
|
||||
/// Builds the initial in-memory config for one node before hostname
|
||||
/// rewriting is applied.
|
||||
fn build_node_config(
|
||||
deployment: &Self::Deployment,
|
||||
node: &Self::Node,
|
||||
) -> Result<Self::NodeConfig, Self::Error>;
|
||||
|
||||
/// Rewrites any inter-node references so the config can be served through
|
||||
/// cfgsync using the provided hostnames.
|
||||
fn rewrite_for_hostnames(
|
||||
deployment: &Self::Deployment,
|
||||
node_index: usize,
|
||||
hostnames: &[String],
|
||||
config: &mut Self::NodeConfig,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
/// Serializes the final node config into the file content cfgsync should
|
||||
/// deliver.
|
||||
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
|
||||
}
|
||||
|
||||
/// High-level failures while building adapter output for cfgsync.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum BuildCfgsyncNodesError {
|
||||
#[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")]
|
||||
HostnameCountMismatch { nodes: usize, hostnames: usize },
|
||||
#[error("cfgsync adapter failed: {source}")]
|
||||
Adapter {
|
||||
#[source]
|
||||
source: super::DynCfgsyncError,
|
||||
},
|
||||
}
|
||||
|
||||
fn adapter_error<E>(source: E) -> BuildCfgsyncNodesError
|
||||
where
|
||||
E: Error + Send + Sync + 'static,
|
||||
{
|
||||
BuildCfgsyncNodesError::Adapter {
|
||||
source: Box::new(source),
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds materialized cfgsync artifacts for a deployment by:
|
||||
/// 1) validating hostname count,
|
||||
/// 2) building each node config,
|
||||
/// 3) rewriting host references,
|
||||
/// 4) serializing each node payload.
|
||||
pub fn build_materialized_artifacts<E: DeploymentAdapter>(
|
||||
deployment: &E::Deployment,
|
||||
hostnames: &[String],
|
||||
) -> Result<MaterializedArtifacts, BuildCfgsyncNodesError> {
|
||||
let nodes = E::nodes(deployment);
|
||||
ensure_hostname_count(nodes.len(), hostnames.len())?;
|
||||
|
||||
let mut output = HashMap::with_capacity(nodes.len());
|
||||
for (index, node) in nodes.iter().enumerate() {
|
||||
let (identifier, artifacts) = build_node_entry::<E>(deployment, node, index, hostnames)?;
|
||||
output.insert(identifier, artifacts);
|
||||
}
|
||||
|
||||
Ok(MaterializedArtifacts::from_nodes(output))
|
||||
}
|
||||
|
||||
fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> {
|
||||
if nodes != hostnames {
|
||||
return Err(BuildCfgsyncNodesError::HostnameCountMismatch { nodes, hostnames });
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn build_node_entry<E: DeploymentAdapter>(
|
||||
deployment: &E::Deployment,
|
||||
node: &E::Node,
|
||||
index: usize,
|
||||
hostnames: &[String],
|
||||
) -> Result<(String, ArtifactSet), BuildCfgsyncNodesError> {
|
||||
let node_config = build_rewritten_node_config::<E>(deployment, node, index, hostnames)?;
|
||||
let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?;
|
||||
|
||||
Ok((
|
||||
E::node_identifier(index, node),
|
||||
ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", &config_yaml)]),
|
||||
))
|
||||
}
|
||||
|
||||
fn build_rewritten_node_config<E: DeploymentAdapter>(
|
||||
deployment: &E::Deployment,
|
||||
node: &E::Node,
|
||||
index: usize,
|
||||
hostnames: &[String],
|
||||
) -> Result<E::NodeConfig, BuildCfgsyncNodesError> {
|
||||
let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?;
|
||||
E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config)
|
||||
.map_err(adapter_error)?;
|
||||
|
||||
Ok(node_config)
|
||||
}
|
||||
@ -1,5 +1,4 @@
|
||||
mod artifacts;
|
||||
mod deployment;
|
||||
mod materializer;
|
||||
mod registrations;
|
||||
mod sources;
|
||||
@ -11,14 +10,3 @@ pub use materializer::{
|
||||
};
|
||||
pub use registrations::RegistrationSnapshot;
|
||||
pub use sources::RegistrationConfigSource;
|
||||
|
||||
/// Static deployment helpers for precomputed cfgsync artifact generation.
|
||||
///
|
||||
/// This module is intentionally secondary to the registration-backed
|
||||
/// materializer flow. Use it when artifacts are already determined by a
|
||||
/// deployment plan and do not need runtime registration to become available.
|
||||
pub mod static_deployment {
|
||||
pub use super::deployment::{
|
||||
BuildCfgsyncNodesError, DeploymentAdapter, build_materialized_artifacts,
|
||||
};
|
||||
}
|
||||
|
||||
@ -64,8 +64,8 @@ pub struct CfgsyncConfigOverrides {
|
||||
pub n_hosts: Option<usize>,
|
||||
/// Minimum timeout to enforce on the rendered template.
|
||||
pub timeout_floor_secs: Option<u64>,
|
||||
/// Override for the bundle path written into cfgsync config.
|
||||
pub bundle_path: Option<String>,
|
||||
/// Override for the precomputed artifacts path written into cfgsync config.
|
||||
pub artifacts_path: Option<String>,
|
||||
/// Optional OTLP metrics endpoint injected into tracing settings.
|
||||
pub metrics_otlp_ingest_url: Option<String>,
|
||||
}
|
||||
@ -113,10 +113,10 @@ pub fn apply_cfgsync_overrides(
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(bundle_path) = &overrides.bundle_path {
|
||||
if let Some(artifacts_path) = &overrides.artifacts_path {
|
||||
root.insert(
|
||||
Value::String("bundle_path".to_string()),
|
||||
Value::String(bundle_path.clone()),
|
||||
Value::String("artifacts_path".to_string()),
|
||||
Value::String(artifacts_path.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@ -13,16 +13,17 @@ version = { workspace = true }
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1"
|
||||
axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" }
|
||||
cfgsync-adapter = { workspace = true }
|
||||
cfgsync-core = { workspace = true }
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
serde = { workspace = true }
|
||||
serde_yaml = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
|
||||
tracing = { workspace = true }
|
||||
anyhow = "1"
|
||||
axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" }
|
||||
cfgsync-adapter = { workspace = true }
|
||||
cfgsync-artifacts = { workspace = true }
|
||||
cfgsync-core = { workspace = true }
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
serde = { workspace = true }
|
||||
serde_yaml = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = { workspace = true }
|
||||
|
||||
39
cfgsync/runtime/examples/minimal_cfgsync.rs
Normal file
39
cfgsync/runtime/examples/minimal_cfgsync.rs
Normal file
@ -0,0 +1,39 @@
|
||||
use cfgsync_adapter::{
|
||||
DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot,
|
||||
RegistrationSnapshotMaterializer,
|
||||
};
|
||||
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
|
||||
use cfgsync_runtime::serve_cfgsync;
|
||||
|
||||
struct ExampleMaterializer;
|
||||
|
||||
impl RegistrationSnapshotMaterializer for ExampleMaterializer {
|
||||
fn materialize_snapshot(
|
||||
&self,
|
||||
registrations: &RegistrationSnapshot,
|
||||
) -> Result<MaterializationResult, DynCfgsyncError> {
|
||||
if registrations.is_empty() {
|
||||
return Ok(MaterializationResult::NotReady);
|
||||
}
|
||||
|
||||
let nodes = registrations.iter().map(|registration| {
|
||||
(
|
||||
registration.identifier.clone(),
|
||||
ArtifactSet::new(vec![ArtifactFile::new(
|
||||
"/config.yaml",
|
||||
format!("id: {}\n", registration.identifier),
|
||||
)]),
|
||||
)
|
||||
});
|
||||
|
||||
Ok(MaterializationResult::ready(
|
||||
MaterializedArtifacts::from_nodes(nodes),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
serve_cfgsync(4400, ExampleMaterializer).await?;
|
||||
Ok(())
|
||||
}
|
||||
@ -8,7 +8,7 @@ pub use client::{
|
||||
run_cfgsync_client_from_env,
|
||||
};
|
||||
pub use server::{
|
||||
CfgsyncServerConfig, CfgsyncServerSource, LoadCfgsyncServerConfigError,
|
||||
build_persisted_snapshot_cfgsync_router, build_snapshot_cfgsync_router,
|
||||
serve_cfgsync_from_config, serve_persisted_snapshot_cfgsync, serve_snapshot_cfgsync,
|
||||
CfgsyncServerConfig, CfgsyncServerSource, LoadCfgsyncServerConfigError, build_cfgsync_router,
|
||||
build_persisted_cfgsync_router, serve_cfgsync, serve_cfgsync_from_config,
|
||||
serve_persisted_cfgsync,
|
||||
};
|
||||
|
||||
@ -8,13 +8,13 @@ use cfgsync_adapter::{
|
||||
};
|
||||
use cfgsync_core::{
|
||||
BundleConfigSource, CfgsyncServerState, NodeConfigSource, RunCfgsyncError,
|
||||
build_cfgsync_router, serve_cfgsync,
|
||||
serve_cfgsync as serve_cfgsync_state,
|
||||
};
|
||||
use serde::{Deserialize, de::Error as _};
|
||||
use serde::Deserialize;
|
||||
use thiserror::Error;
|
||||
|
||||
/// Runtime cfgsync server config loaded from YAML.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
|
||||
pub struct CfgsyncServerConfig {
|
||||
/// HTTP port to bind the cfgsync server on.
|
||||
pub port: u16,
|
||||
@ -35,26 +35,7 @@ pub enum CfgsyncServerSource {
|
||||
/// Serve a static precomputed artifact bundle directly.
|
||||
Bundle { bundle_path: String },
|
||||
/// Require node registration before serving precomputed artifacts.
|
||||
#[serde(alias = "registration_bundle")]
|
||||
Registration {
|
||||
#[serde(alias = "bundle_path")]
|
||||
artifacts_path: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
enum LegacyServingMode {
|
||||
Bundle,
|
||||
Registration,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RawCfgsyncServerConfig {
|
||||
port: u16,
|
||||
source: Option<CfgsyncServerSource>,
|
||||
bundle_path: Option<String>,
|
||||
serving_mode: Option<LegacyServingMode>,
|
||||
Registration { artifacts_path: String },
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
@ -83,7 +64,7 @@ impl CfgsyncServerConfig {
|
||||
source,
|
||||
})?;
|
||||
|
||||
let raw: RawCfgsyncServerConfig =
|
||||
let config: CfgsyncServerConfig =
|
||||
serde_yaml::from_str(&config_content).map_err(|source| {
|
||||
LoadCfgsyncServerConfigError::Parse {
|
||||
path: config_path,
|
||||
@ -91,10 +72,7 @@ impl CfgsyncServerConfig {
|
||||
}
|
||||
})?;
|
||||
|
||||
Self::from_raw(raw).map_err(|source| LoadCfgsyncServerConfigError::Parse {
|
||||
path: path.display().to_string(),
|
||||
source,
|
||||
})
|
||||
Ok(config)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@ -118,30 +96,6 @@ impl CfgsyncServerConfig {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn from_raw(raw: RawCfgsyncServerConfig) -> Result<Self, serde_yaml::Error> {
|
||||
let source = match (raw.source, raw.bundle_path, raw.serving_mode) {
|
||||
(Some(source), _, _) => source,
|
||||
(None, Some(bundle_path), Some(LegacyServingMode::Registration)) => {
|
||||
CfgsyncServerSource::Registration {
|
||||
artifacts_path: bundle_path,
|
||||
}
|
||||
}
|
||||
(None, Some(bundle_path), None | Some(LegacyServingMode::Bundle)) => {
|
||||
CfgsyncServerSource::Bundle { bundle_path }
|
||||
}
|
||||
(None, None, _) => {
|
||||
return Err(serde_yaml::Error::custom(
|
||||
"cfgsync server config requires source.kind or legacy bundle_path",
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
port: raw.port,
|
||||
source,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
|
||||
@ -194,12 +148,12 @@ pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()>
|
||||
let bundle_path = resolve_source_path(config_path, &config.source);
|
||||
|
||||
let state = build_server_state(&config, &bundle_path)?;
|
||||
serve_cfgsync(config.port, state).await?;
|
||||
serve_cfgsync_state(config.port, state).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Builds a registration-backed cfgsync router directly from a snapshot
|
||||
/// Builds the default registration-backed cfgsync router from a snapshot
|
||||
/// materializer.
|
||||
///
|
||||
/// This is the main code-driven entrypoint for apps that want cfgsync to own:
|
||||
@ -208,12 +162,12 @@ pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()>
|
||||
/// - artifact serving
|
||||
///
|
||||
/// while the app owns only snapshot materialization logic.
|
||||
pub fn build_snapshot_cfgsync_router<M>(materializer: M) -> Router
|
||||
pub fn build_cfgsync_router<M>(materializer: M) -> Router
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
{
|
||||
let provider = RegistrationConfigSource::new(CachedSnapshotMaterializer::new(materializer));
|
||||
build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider)))
|
||||
cfgsync_core::build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider)))
|
||||
}
|
||||
|
||||
/// Builds a registration-backed cfgsync router with a persistence hook for
|
||||
@ -221,7 +175,7 @@ where
|
||||
///
|
||||
/// Use this when the application wants cfgsync to persist or publish shared
|
||||
/// artifacts after a snapshot becomes ready.
|
||||
pub fn build_persisted_snapshot_cfgsync_router<M, S>(materializer: M, sink: S) -> Router
|
||||
pub fn build_persisted_cfgsync_router<M, S>(materializer: M, sink: S) -> Router
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
S: MaterializedArtifactsSink + 'static,
|
||||
@ -230,19 +184,19 @@ where
|
||||
PersistingSnapshotMaterializer::new(materializer, sink),
|
||||
));
|
||||
|
||||
build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider)))
|
||||
cfgsync_core::build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider)))
|
||||
}
|
||||
|
||||
/// Runs a registration-backed cfgsync server directly from a snapshot
|
||||
/// Runs the default registration-backed cfgsync server directly from a snapshot
|
||||
/// materializer.
|
||||
///
|
||||
/// This is the simplest runtime entrypoint when the application already has a
|
||||
/// materializer value and does not need to compose extra routes.
|
||||
pub async fn serve_snapshot_cfgsync<M>(port: u16, materializer: M) -> Result<(), RunCfgsyncError>
|
||||
pub async fn serve_cfgsync<M>(port: u16, materializer: M) -> Result<(), RunCfgsyncError>
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
{
|
||||
let router = build_snapshot_cfgsync_router(materializer);
|
||||
let router = build_cfgsync_router(materializer);
|
||||
serve_router(port, router).await
|
||||
}
|
||||
|
||||
@ -250,7 +204,50 @@ where
|
||||
/// materialization results.
|
||||
///
|
||||
/// This is the direct serving counterpart to
|
||||
/// [`build_persisted_snapshot_cfgsync_router`].
|
||||
/// [`build_persisted_cfgsync_router`].
|
||||
pub async fn serve_persisted_cfgsync<M, S>(
|
||||
port: u16,
|
||||
materializer: M,
|
||||
sink: S,
|
||||
) -> Result<(), RunCfgsyncError>
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
S: MaterializedArtifactsSink + 'static,
|
||||
{
|
||||
let router = build_persisted_cfgsync_router(materializer, sink);
|
||||
serve_router(port, router).await
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(dead_code)]
|
||||
pub fn build_snapshot_cfgsync_router<M>(materializer: M) -> Router
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
{
|
||||
build_cfgsync_router(materializer)
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(dead_code)]
|
||||
pub fn build_persisted_snapshot_cfgsync_router<M, S>(materializer: M, sink: S) -> Router
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
S: MaterializedArtifactsSink + 'static,
|
||||
{
|
||||
build_persisted_cfgsync_router(materializer, sink)
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(dead_code)]
|
||||
pub async fn serve_snapshot_cfgsync<M>(port: u16, materializer: M) -> Result<(), RunCfgsyncError>
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
{
|
||||
serve_cfgsync(port, materializer).await
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(dead_code)]
|
||||
pub async fn serve_persisted_snapshot_cfgsync<M, S>(
|
||||
port: u16,
|
||||
materializer: M,
|
||||
@ -260,8 +257,7 @@ where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
S: MaterializedArtifactsSink + 'static,
|
||||
{
|
||||
let router = build_persisted_snapshot_cfgsync_router(materializer, sink);
|
||||
serve_router(port, router).await
|
||||
serve_persisted_cfgsync(port, materializer, sink).await
|
||||
}
|
||||
|
||||
async fn serve_router(port: u16, router: Router) -> Result<(), RunCfgsyncError> {
|
||||
|
||||
@ -39,7 +39,7 @@ spec:
|
||||
items:
|
||||
- key: cfgsync.yaml
|
||||
path: cfgsync.yaml
|
||||
- key: cfgsync.bundle.yaml
|
||||
path: cfgsync.bundle.yaml
|
||||
- key: cfgsync.artifacts.yaml
|
||||
path: cfgsync.artifacts.yaml
|
||||
- key: run_cfgsync.sh
|
||||
path: scripts/run_cfgsync.sh
|
||||
|
||||
@ -11,9 +11,9 @@ data:
|
||||
{{- else }}
|
||||
{{ "" | indent 4 }}
|
||||
{{- end }}
|
||||
cfgsync.bundle.yaml: |
|
||||
{{- if .Values.cfgsync.bundle }}
|
||||
{{ .Values.cfgsync.bundle | indent 4 }}
|
||||
cfgsync.artifacts.yaml: |
|
||||
{{- if .Values.cfgsync.artifacts }}
|
||||
{{ .Values.cfgsync.artifacts | indent 4 }}
|
||||
{{- else }}
|
||||
{{ "" | indent 4 }}
|
||||
{{- end }}
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
use anyhow::Result;
|
||||
use cfgsync_adapter::static_deployment::{DeploymentAdapter, build_materialized_artifacts};
|
||||
use cfgsync_artifacts::ArtifactFile;
|
||||
pub(crate) use cfgsync_core::render::CfgsyncOutputPaths;
|
||||
use cfgsync_core::render::{
|
||||
@ -8,6 +7,7 @@ use cfgsync_core::render::{
|
||||
};
|
||||
use reqwest::Url;
|
||||
use serde_yaml::{Mapping, Value};
|
||||
use testing_framework_core::cfgsync::{StaticArtifactRenderer, build_static_artifacts};
|
||||
use thiserror::Error;
|
||||
|
||||
pub(crate) struct CfgsyncRenderOptions {
|
||||
@ -25,7 +25,7 @@ enum BundleRenderError {
|
||||
MissingYamlKey { key: String },
|
||||
}
|
||||
|
||||
pub(crate) fn render_cfgsync_from_template<E: DeploymentAdapter>(
|
||||
pub(crate) fn render_cfgsync_from_template<E: StaticArtifactRenderer>(
|
||||
topology: &E::Deployment,
|
||||
hostnames: &[String],
|
||||
options: CfgsyncRenderOptions,
|
||||
@ -33,7 +33,7 @@ pub(crate) fn render_cfgsync_from_template<E: DeploymentAdapter>(
|
||||
let cfg = build_cfgsync_server_config();
|
||||
let overrides = build_overrides::<E>(topology, options);
|
||||
let config_yaml = render_cfgsync_yaml_from_template(cfg, &overrides)?;
|
||||
let mut materialized = build_materialized_artifacts::<E>(topology, hostnames)?;
|
||||
let mut materialized = build_static_artifacts::<E>(topology, hostnames)?;
|
||||
append_deployment_files(&mut materialized)?;
|
||||
let artifacts_yaml = serde_yaml::to_string(&materialized)?;
|
||||
|
||||
@ -122,7 +122,7 @@ fn build_cfgsync_server_config() -> Value {
|
||||
Value::Mapping(root)
|
||||
}
|
||||
|
||||
pub(crate) fn render_and_write_cfgsync_from_template<E: DeploymentAdapter>(
|
||||
pub(crate) fn render_and_write_cfgsync_from_template<E: StaticArtifactRenderer>(
|
||||
topology: &E::Deployment,
|
||||
hostnames: &[String],
|
||||
mut options: CfgsyncRenderOptions,
|
||||
@ -136,7 +136,7 @@ pub(crate) fn render_and_write_cfgsync_from_template<E: DeploymentAdapter>(
|
||||
Ok(rendered)
|
||||
}
|
||||
|
||||
fn build_overrides<E: DeploymentAdapter>(
|
||||
fn build_overrides<E: StaticArtifactRenderer>(
|
||||
topology: &E::Deployment,
|
||||
options: CfgsyncRenderOptions,
|
||||
) -> CfgsyncConfigOverrides {
|
||||
@ -151,7 +151,7 @@ fn build_overrides<E: DeploymentAdapter>(
|
||||
port,
|
||||
n_hosts: Some(E::nodes(topology).len()),
|
||||
timeout_floor_secs: min_timeout_secs,
|
||||
bundle_path: artifacts_path,
|
||||
artifacts_path,
|
||||
metrics_otlp_ingest_url: metrics_otlp_ingest_url.map(|url| url.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
@ -117,7 +117,7 @@ impl ComposeDeployEnv for LbcExtEnv {
|
||||
nodes = topology.nodes().len(),
|
||||
"updating cfgsync template"
|
||||
);
|
||||
let bundle_path = cfgsync_bundle_path(path);
|
||||
let artifacts_path = cfgsync_artifacts_path(path);
|
||||
let hostnames = topology_hostnames(topology);
|
||||
let options = cfgsync_render_options(port, metrics_otlp_ingest_url);
|
||||
|
||||
@ -127,7 +127,7 @@ impl ComposeDeployEnv for LbcExtEnv {
|
||||
options,
|
||||
CfgsyncOutputPaths {
|
||||
config_path: path,
|
||||
artifacts_path: &bundle_path,
|
||||
artifacts_path: &artifacts_path,
|
||||
},
|
||||
)?;
|
||||
Ok(())
|
||||
@ -186,7 +186,7 @@ fn node_instance_name(index: usize) -> String {
|
||||
format!("node-{index}")
|
||||
}
|
||||
|
||||
fn cfgsync_bundle_path(config_path: &Path) -> PathBuf {
|
||||
fn cfgsync_artifacts_path(config_path: &Path) -> PathBuf {
|
||||
config_path
|
||||
.parent()
|
||||
.unwrap_or(config_path)
|
||||
|
||||
@ -182,11 +182,11 @@ pub fn prepare_assets(
|
||||
let root = workspace_root().map_err(|source| AssetsError::WorkspaceRoot { source })?;
|
||||
let tempdir = create_assets_tempdir()?;
|
||||
|
||||
let (cfgsync_file, cfgsync_yaml, bundle_yaml) =
|
||||
let (cfgsync_file, cfgsync_yaml, artifacts_yaml) =
|
||||
render_and_write_cfgsync(topology, metrics_otlp_ingest_url, &tempdir)?;
|
||||
let scripts = validate_scripts(&root)?;
|
||||
let chart_path = helm_chart_path()?;
|
||||
let values_file = render_and_write_values(topology, &tempdir, &cfgsync_yaml, &bundle_yaml)?;
|
||||
let values_file = render_and_write_values(topology, &tempdir, &cfgsync_yaml, &artifacts_yaml)?;
|
||||
let image = testnet_image();
|
||||
|
||||
log_assets_prepare_done(&cfgsync_file, &values_file, &chart_path, &image);
|
||||
@ -569,7 +569,7 @@ struct KzgValues {
|
||||
struct CfgsyncValues {
|
||||
port: u16,
|
||||
config: String,
|
||||
bundle: String,
|
||||
artifacts: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@ -589,11 +589,11 @@ struct NodeValues {
|
||||
env: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
fn build_values(topology: &DeploymentPlan, cfgsync_yaml: &str, bundle_yaml: &str) -> HelmValues {
|
||||
fn build_values(topology: &DeploymentPlan, cfgsync_yaml: &str, artifacts_yaml: &str) -> HelmValues {
|
||||
let cfgsync = CfgsyncValues {
|
||||
port: cfgsync_port(),
|
||||
config: cfgsync_yaml.to_string(),
|
||||
bundle: bundle_yaml.to_string(),
|
||||
artifacts: artifacts_yaml.to_string(),
|
||||
};
|
||||
let kzg = KzgValues::disabled();
|
||||
let image_pull_policy =
|
||||
|
||||
@ -18,6 +18,7 @@ default = []
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
cfgsync-adapter = { workspace = true }
|
||||
cfgsync-artifacts = { workspace = true }
|
||||
futures = { default-features = false, features = ["std"], version = "0.3" }
|
||||
parking_lot = { workspace = true }
|
||||
prometheus-http-query = "0.8"
|
||||
|
||||
@ -1,5 +1,90 @@
|
||||
#[doc(hidden)]
|
||||
pub use cfgsync_adapter::static_deployment::{
|
||||
DeploymentAdapter as CfgsyncEnv, build_materialized_artifacts as build_cfgsync_node_catalog,
|
||||
};
|
||||
use std::error::Error;
|
||||
|
||||
pub use cfgsync_adapter::*;
|
||||
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
|
||||
use thiserror::Error;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub trait StaticArtifactRenderer {
|
||||
type Deployment;
|
||||
type Node;
|
||||
type NodeConfig;
|
||||
type Error: Error + Send + Sync + 'static;
|
||||
|
||||
fn nodes(deployment: &Self::Deployment) -> &[Self::Node];
|
||||
|
||||
fn node_identifier(index: usize, node: &Self::Node) -> String;
|
||||
|
||||
fn build_node_config(
|
||||
deployment: &Self::Deployment,
|
||||
node: &Self::Node,
|
||||
) -> Result<Self::NodeConfig, Self::Error>;
|
||||
|
||||
fn rewrite_for_hostnames(
|
||||
deployment: &Self::Deployment,
|
||||
node_index: usize,
|
||||
hostnames: &[String],
|
||||
config: &mut Self::NodeConfig,
|
||||
) -> Result<(), Self::Error>;
|
||||
|
||||
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use StaticArtifactRenderer as CfgsyncEnv;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum BuildStaticArtifactsError {
|
||||
#[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")]
|
||||
HostnameCountMismatch { nodes: usize, hostnames: usize },
|
||||
#[error("cfgsync adapter failed: {source}")]
|
||||
Adapter {
|
||||
#[source]
|
||||
source: DynCfgsyncError,
|
||||
},
|
||||
}
|
||||
|
||||
fn adapter_error<E>(source: E) -> BuildStaticArtifactsError
|
||||
where
|
||||
E: Error + Send + Sync + 'static,
|
||||
{
|
||||
BuildStaticArtifactsError::Adapter {
|
||||
source: Box::new(source),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_static_artifacts<E: StaticArtifactRenderer>(
|
||||
deployment: &E::Deployment,
|
||||
hostnames: &[String],
|
||||
) -> Result<cfgsync_adapter::MaterializedArtifacts, BuildStaticArtifactsError> {
|
||||
let nodes = E::nodes(deployment);
|
||||
|
||||
if nodes.len() != hostnames.len() {
|
||||
return Err(BuildStaticArtifactsError::HostnameCountMismatch {
|
||||
nodes: nodes.len(),
|
||||
hostnames: hostnames.len(),
|
||||
});
|
||||
}
|
||||
|
||||
let mut output = std::collections::HashMap::with_capacity(nodes.len());
|
||||
|
||||
for (index, node) in nodes.iter().enumerate() {
|
||||
let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?;
|
||||
E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config)
|
||||
.map_err(adapter_error)?;
|
||||
let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?;
|
||||
|
||||
output.insert(
|
||||
E::node_identifier(index, node),
|
||||
ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", &config_yaml)]),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(cfgsync_adapter::MaterializedArtifacts::from_nodes(output))
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub use build_static_artifacts as build_cfgsync_node_catalog;
|
||||
|
||||
@ -1,7 +1,3 @@
|
||||
#[deprecated(
|
||||
since = "0.1.0",
|
||||
note = "testing-framework-core::cfgsync moved to cfgsync-adapter; update imports"
|
||||
)]
|
||||
pub mod cfgsync;
|
||||
pub mod env;
|
||||
pub mod runtime;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user