296 lines
9.6 KiB
Rust
Raw Normal View History

2026-03-09 08:48:05 +01:00
use std::{fs, path::Path, sync::Arc};
use anyhow::Context as _;
use axum::Router;
2026-03-10 13:56:27 +01:00
use cfgsync_adapter::{
ArtifactSet, CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink,
PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, SnapshotConfigSource,
2026-03-10 13:56:27 +01:00
};
use cfgsync_core::{
2026-03-10 13:56:27 +01:00
BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, RunCfgsyncError,
build_cfgsync_router, serve_cfgsync,
};
use serde::{Deserialize, de::Error as _};
2026-03-10 11:06:16 +01:00
use thiserror::Error;
2026-03-09 08:48:05 +01:00
/// Runtime cfgsync server config loaded from YAML.
#[derive(Debug, Clone, PartialEq, Eq)]
2026-03-10 11:06:16 +01:00
pub struct CfgsyncServerConfig {
2026-03-09 08:48:05 +01:00
pub port: u16,
2026-03-12 07:30:01 +01:00
/// Source used by the runtime-managed cfgsync server.
pub source: CfgsyncServerSource,
2026-03-09 08:48:05 +01:00
}
2026-03-12 07:30:01 +01:00
/// Runtime cfgsync source loaded from config.
///
/// This type is intentionally runtime-oriented:
/// - `Bundle` serves a static precomputed bundle directly
/// - `RegistrationBundle` serves a precomputed bundle through the registration
///   protocol, which is useful when the consumer wants clients to register
///   before receiving already-materialized artifacts
///
/// In serialized form the variant is selected by a `kind` field whose value is
/// the snake_case variant name (`bundle` or `registration_bundle`).
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CfgsyncServerSource {
    /// Serve the bundle file at `bundle_path` as-is.
    ///
    /// When started through `serve_cfgsync_from_config`, a relative
    /// `bundle_path` is resolved against the directory of the config file.
    Bundle { bundle_path: String },
    /// Serve the bundle file at `bundle_path` through the registration
    /// protocol.
    ///
    /// When started through `serve_cfgsync_from_config`, a relative
    /// `bundle_path` is resolved against the directory of the config file.
    RegistrationBundle { bundle_path: String },
}
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)]
2026-03-10 11:06:16 +01:00
#[serde(rename_all = "snake_case")]
enum LegacyServingMode {
2026-03-10 11:06:16 +01:00
Bundle,
Registration,
}
/// On-disk shape of the server config before it is normalized into
/// [`CfgsyncServerConfig`].
///
/// Both the current `source` block and the legacy flat `bundle_path` /
/// `serving_mode` fields are accepted here; `CfgsyncServerConfig::from_raw`
/// decides which of them takes effect.
#[derive(Debug, Deserialize)]
struct RawCfgsyncServerConfig {
    /// Port copied unchanged into [`CfgsyncServerConfig::port`].
    port: u16,
    /// Current-style source description; takes precedence when present.
    source: Option<CfgsyncServerSource>,
    /// Legacy bundle location, consulted only when `source` is absent.
    bundle_path: Option<String>,
    /// Legacy serving mode; a missing value is treated like `bundle`.
    serving_mode: Option<LegacyServingMode>,
}
2026-03-10 11:06:16 +01:00
/// Errors returned by [`CfgsyncServerConfig::load_from_file`].
#[derive(Debug, Error)]
pub enum LoadCfgsyncServerConfigError {
    /// The config file could not be read from disk.
    #[error("failed to read cfgsync config file {path}: {source}")]
    Read {
        /// Display form of the config path that was being read.
        path: String,
        /// Underlying I/O failure.
        #[source]
        source: std::io::Error,
    },
    /// The config file was read but could not be turned into a valid config.
    ///
    /// This covers YAML that fails to deserialize as well as a config that
    /// provides neither `source` nor the legacy `bundle_path`.
    #[error("failed to parse cfgsync config file {path}: {source}")]
    Parse {
        /// Display form of the config path that was being parsed.
        path: String,
        /// Underlying YAML / validation failure.
        #[source]
        source: serde_yaml::Error,
    },
}
impl CfgsyncServerConfig {
2026-03-09 08:48:05 +01:00
/// Loads cfgsync runtime server config from a YAML file.
2026-03-10 11:06:16 +01:00
pub fn load_from_file(path: &Path) -> Result<Self, LoadCfgsyncServerConfigError> {
let config_path = path.display().to_string();
let config_content =
fs::read_to_string(path).map_err(|source| LoadCfgsyncServerConfigError::Read {
path: config_path.clone(),
source,
})?;
let raw: RawCfgsyncServerConfig =
serde_yaml::from_str(&config_content).map_err(|source| {
LoadCfgsyncServerConfigError::Parse {
path: config_path,
source,
}
})?;
Self::from_raw(raw).map_err(|source| LoadCfgsyncServerConfigError::Parse {
path: path.display().to_string(),
source,
2026-03-10 11:06:16 +01:00
})
}
#[must_use]
pub fn for_bundle(port: u16, bundle_path: impl Into<String>) -> Self {
Self {
port,
source: CfgsyncServerSource::Bundle {
bundle_path: bundle_path.into(),
},
2026-03-10 11:06:16 +01:00
}
}
2026-03-09 08:48:05 +01:00
2026-03-10 11:06:16 +01:00
#[must_use]
pub fn for_registration_bundle(port: u16, bundle_path: impl Into<String>) -> Self {
2026-03-10 11:06:16 +01:00
Self {
port,
source: CfgsyncServerSource::RegistrationBundle {
bundle_path: bundle_path.into(),
},
2026-03-10 11:06:16 +01:00
}
}
fn from_raw(raw: RawCfgsyncServerConfig) -> Result<Self, serde_yaml::Error> {
let source = match (raw.source, raw.bundle_path, raw.serving_mode) {
(Some(source), _, _) => source,
(None, Some(bundle_path), Some(LegacyServingMode::Registration)) => {
CfgsyncServerSource::RegistrationBundle { bundle_path }
}
(None, Some(bundle_path), None | Some(LegacyServingMode::Bundle)) => {
CfgsyncServerSource::Bundle { bundle_path }
}
(None, None, _) => {
return Err(serde_yaml::Error::custom(
"cfgsync server config requires source.kind or legacy bundle_path",
));
}
};
Ok(Self {
port: raw.port,
source,
})
}
2026-03-10 11:06:16 +01:00
}
/// Loads the bundle at `bundle_path` as a directly served config source.
///
/// # Errors
///
/// Fails when `BundleConfigSource::from_yaml_file` rejects the file; the error
/// is annotated with the bundle path.
fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
    let provider = BundleConfigSource::from_yaml_file(bundle_path)
        .with_context(|| format!("loading cfgsync provider from {}", bundle_path.display()))?;

    Ok(Arc::new(provider))
}
/// Loads the bundle at `bundle_path` and wraps it in a snapshot-backed config
/// source, so the precomputed artifacts are served through
/// `SnapshotConfigSource`.
///
/// # Errors
///
/// Fails when the bundle file cannot be read or parsed.
fn load_registration_source(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
    let artifacts = build_materialized_artifacts(load_bundle_yaml(bundle_path)?);
    let source: Arc<dyn NodeConfigSource> = Arc::new(SnapshotConfigSource::new(artifacts));

    Ok(source)
}
/// Reads the YAML file at `bundle_path` and deserializes it into a
/// [`NodeArtifactsBundle`].
///
/// # Errors
///
/// Fails when the file cannot be read or its content does not deserialize;
/// each failure is annotated with the bundle path.
fn load_bundle_yaml(bundle_path: &Path) -> anyhow::Result<NodeArtifactsBundle> {
    let yaml = fs::read_to_string(bundle_path)
        .with_context(|| format!("reading cfgsync bundle from {}", bundle_path.display()))?;

    let bundle = serde_yaml::from_str::<NodeArtifactsBundle>(&yaml)
        .with_context(|| format!("parsing cfgsync bundle from {}", bundle_path.display()))?;

    Ok(bundle)
}
/// Converts a loaded bundle into the adapter's materialized form.
///
/// Every bundle node becomes a `cfgsync_adapter::NodeArtifacts` carrying the
/// same identifier and files, and the bundle's shared files become the shared
/// [`ArtifactSet`].
fn build_materialized_artifacts(bundle: NodeArtifactsBundle) -> MaterializedArtifacts {
    let catalog = cfgsync_adapter::NodeArtifactsCatalog::new(
        bundle
            .nodes
            .into_iter()
            .map(|node| cfgsync_adapter::NodeArtifacts {
                identifier: node.identifier,
                files: node.files,
            })
            .collect(),
    );
    let shared = ArtifactSet::new(bundle.shared_files);

    MaterializedArtifacts::new(catalog, shared)
}
2026-03-09 08:48:05 +01:00
/// Turns the `bundle_path` written in a config file into a usable path.
///
/// Absolute paths are returned unchanged. Relative paths are joined onto the
/// directory containing `config_path`, falling back to `.` when `config_path`
/// has no parent.
fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::PathBuf {
    let candidate = Path::new(bundle_path);

    if candidate.is_absolute() {
        candidate.to_path_buf()
    } else {
        let config_dir = match config_path.parent() {
            Some(dir) => dir,
            None => Path::new("."),
        };
        config_dir.join(candidate)
    }
}
/// Loads runtime config and starts cfgsync HTTP server process.
pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()> {
2026-03-10 11:06:16 +01:00
let config = CfgsyncServerConfig::load_from_file(config_path)?;
let bundle_path = resolve_source_path(config_path, &config.source);
2026-03-09 08:48:05 +01:00
let state = build_server_state(&config, &bundle_path)?;
serve_cfgsync(config.port, state).await?;
2026-03-09 08:48:05 +01:00
Ok(())
}
/// Builds a registration-backed cfgsync router directly from a snapshot
/// materializer.
2026-03-12 07:30:01 +01:00
///
/// This is the main code-driven entrypoint for apps that want cfgsync to own:
/// - node registration
/// - readiness polling
/// - artifact serving
///
/// while the app owns only snapshot materialization logic.
pub fn build_snapshot_cfgsync_router<M>(materializer: M) -> Router
where
M: RegistrationSnapshotMaterializer + 'static,
{
let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(materializer));
build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider)))
}
/// Builds a registration-backed cfgsync router with a persistence hook for
/// ready materialization results.
2026-03-12 07:30:01 +01:00
///
/// Use this when the application wants cfgsync to persist or publish shared
/// artifacts after a snapshot becomes ready.
pub fn build_persisted_snapshot_cfgsync_router<M, S>(materializer: M, sink: S) -> Router
where
M: RegistrationSnapshotMaterializer + 'static,
S: MaterializedArtifactsSink + 'static,
{
let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(
PersistingSnapshotMaterializer::new(materializer, sink),
));
build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider)))
}
2026-03-10 13:56:27 +01:00
/// Runs a registration-backed cfgsync server directly from a snapshot
/// materializer.
2026-03-12 07:30:01 +01:00
///
/// This is the simplest runtime entrypoint when the application already has a
/// materializer value and does not need to compose extra routes.
2026-03-10 13:56:27 +01:00
pub async fn serve_snapshot_cfgsync<M>(port: u16, materializer: M) -> Result<(), RunCfgsyncError>
where
M: RegistrationSnapshotMaterializer + 'static,
{
let router = build_snapshot_cfgsync_router(materializer);
serve_router(port, router).await
}
2026-03-10 13:56:27 +01:00
/// Runs a registration-backed cfgsync server with a persistence hook for ready
/// materialization results.
2026-03-12 07:30:01 +01:00
///
/// This is the direct serving counterpart to
/// [`build_persisted_snapshot_cfgsync_router`].
pub async fn serve_persisted_snapshot_cfgsync<M, S>(
port: u16,
materializer: M,
sink: S,
) -> Result<(), RunCfgsyncError>
where
M: RegistrationSnapshotMaterializer + 'static,
S: MaterializedArtifactsSink + 'static,
{
let router = build_persisted_snapshot_cfgsync_router(materializer, sink);
serve_router(port, router).await
}
/// Binds a TCP listener on `0.0.0.0:{port}` and serves `router` on it.
///
/// # Errors
///
/// Returns [`RunCfgsyncError::Bind`] (carrying the attempted bind address) when
/// the listener cannot be created, and [`RunCfgsyncError::Serve`] when
/// `axum::serve` fails.
async fn serve_router(port: u16, router: Router) -> Result<(), RunCfgsyncError> {
    let bind_addr = format!("0.0.0.0:{port}");

    let listener = tokio::net::TcpListener::bind(&bind_addr)
        .await
        .map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?;

    axum::serve(listener, router)
        .await
        .map_err(|source| RunCfgsyncError::Serve { source })?;

    Ok(())
}
fn build_server_state(
2026-03-10 11:06:16 +01:00
config: &CfgsyncServerConfig,
source_path: &Path,
) -> anyhow::Result<CfgsyncServerState> {
let repo = match &config.source {
CfgsyncServerSource::Bundle { .. } => load_bundle_provider(source_path)?,
CfgsyncServerSource::RegistrationBundle { .. } => load_registration_source(source_path)?,
};
2026-03-09 08:48:05 +01:00
Ok(CfgsyncServerState::new(repo))
2026-03-09 08:48:05 +01:00
}
/// Resolves the bundle path carried by `source` relative to the directory of
/// `config_path`.
///
/// Both source kinds carry a `bundle_path`, so they are resolved identically.
fn resolve_source_path(config_path: &Path, source: &CfgsyncServerSource) -> std::path::PathBuf {
    let (CfgsyncServerSource::Bundle { bundle_path }
    | CfgsyncServerSource::RegistrationBundle { bundle_path }) = source;

    resolve_bundle_path(config_path, bundle_path)
}