From ff658e322d519bb6ecf22c68dacb59f6f83afe32 Mon Sep 17 00:00:00 2001 From: andrussal Date: Thu, 12 Mar 2026 09:51:03 +0100 Subject: [PATCH] Simplify cfgsync runtime naming --- cfgsync/README.md | 14 +- cfgsync/runtime/examples/minimal_cfgsync.rs | 8 +- cfgsync/runtime/src/bin/cfgsync-client.rs | 4 +- cfgsync/runtime/src/bin/cfgsync-server.rs | 4 +- cfgsync/runtime/src/client.rs | 218 ++++++++++++-------- cfgsync/runtime/src/lib.rs | 10 +- cfgsync/runtime/src/server.rs | 63 +++--- 7 files changed, 177 insertions(+), 144 deletions(-) diff --git a/cfgsync/README.md b/cfgsync/README.md index 759cc0f..c113019 100644 --- a/cfgsync/README.md +++ b/cfgsync/README.md @@ -99,7 +99,7 @@ Use it when you want to run cfgsync rather than define its protocol: - client-side fetch/write helpers - server config loading -- direct serving helpers such as `serve_cfgsync(...)` +- direct serving helpers such as `serve(...)` This is the crate that should feel like the normal “start here” path for users integrating cfgsync into a real system. @@ -150,7 +150,7 @@ For a new application, the shortest sensible path is: 1. define a typed registration payload 2. implement `RegistrationSnapshotMaterializer` 3. return node-local and optional shared artifacts -4. serve them with `serve_cfgsync(...)` +4. serve them with `serve(...)` 5. use `CfgsyncClient` or the runtime helpers on the node side That gives you the main value of the library without forcing extra application logic into cfgsync itself. @@ -227,10 +227,10 @@ impl RegistrationSnapshotMaterializer for MyMaterializer { Serving: ```rust -use cfgsync_runtime::serve_cfgsync; +use cfgsync_runtime::serve; # async fn run() -> anyhow::Result<()> { -serve_cfgsync(4400, MyMaterializer).await?; +serve(4400, MyMaterializer).await?; # Ok(()) # } ``` @@ -238,14 +238,14 @@ serve_cfgsync(4400, MyMaterializer).await?; Fetching and writing artifacts: ```rust -use cfgsync_runtime::{ArtifactOutputMap, fetch_and_write_artifacts}; +use cfgsync_runtime::{OutputMap, fetch_and_write}; # async fn run(registration: cfgsync_core::NodeRegistration) -> anyhow::Result<()> { -let outputs = ArtifactOutputMap::new() +let outputs = OutputMap::new() .route("/config.yaml", "/node-data/node-1/config.yaml") .route("deployment-settings.yaml", "/node-data/shared/deployment-settings.yaml"); -fetch_and_write_artifacts(®istration, "http://127.0.0.1:4400", &outputs).await?; +fetch_and_write(®istration, "http://127.0.0.1:4400", &outputs).await?; # Ok(()) # } ``` diff --git a/cfgsync/runtime/examples/minimal_cfgsync.rs b/cfgsync/runtime/examples/minimal_cfgsync.rs index 2235705..b2aabda 100644 --- a/cfgsync/runtime/examples/minimal_cfgsync.rs +++ b/cfgsync/runtime/examples/minimal_cfgsync.rs @@ -4,7 +4,7 @@ use cfgsync_adapter::{ }; use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; use cfgsync_core::NodeRegistration; -use cfgsync_runtime::{ArtifactOutputMap, fetch_and_write_artifacts, serve_cfgsync}; +use cfgsync_runtime::{OutputMap, fetch_and_write, serve}; use tempfile::tempdir; use tokio::time::{Duration, sleep}; @@ -38,17 +38,17 @@ impl RegistrationSnapshotMaterializer for ExampleMaterializer { #[tokio::main] async fn main() -> anyhow::Result<()> { let port = 4400; - let server = tokio::spawn(async move { serve_cfgsync(port, ExampleMaterializer).await }); + let server = tokio::spawn(async move { serve(port, ExampleMaterializer).await }); // Give the server a moment to bind before the client registers. sleep(Duration::from_millis(100)).await; let tempdir = tempdir()?; let config_path = tempdir.path().join("config.yaml"); - let outputs = ArtifactOutputMap::new().route("/config.yaml", &config_path); + let outputs = OutputMap::new().route("/config.yaml", &config_path); let registration = NodeRegistration::new("node-1", "127.0.0.1".parse()?); - fetch_and_write_artifacts(®istration, "http://127.0.0.1:4400", &outputs).await?; + fetch_and_write(®istration, "http://127.0.0.1:4400", &outputs).await?; println!("{}", std::fs::read_to_string(&config_path)?); diff --git a/cfgsync/runtime/src/bin/cfgsync-client.rs b/cfgsync/runtime/src/bin/cfgsync-client.rs index 98c3914..b821679 100644 --- a/cfgsync/runtime/src/bin/cfgsync-client.rs +++ b/cfgsync/runtime/src/bin/cfgsync-client.rs @@ -1,6 +1,6 @@ use std::{env, process}; -use cfgsync_runtime::run_cfgsync_client_from_env; +use cfgsync_runtime::run_client_from_env; const CFGSYNC_PORT_ENV: &str = "LOGOS_BLOCKCHAIN_CFGSYNC_PORT"; const DEFAULT_CFGSYNC_PORT: u16 = 4400; @@ -14,7 +14,7 @@ fn cfgsync_port() -> u16 { #[tokio::main] async fn main() { - if let Err(err) = run_cfgsync_client_from_env(cfgsync_port()).await { + if let Err(err) = run_client_from_env(cfgsync_port()).await { eprintln!("Error: {err}"); process::exit(1); } diff --git a/cfgsync/runtime/src/bin/cfgsync-server.rs b/cfgsync/runtime/src/bin/cfgsync-server.rs index a719044..51db99b 100644 --- a/cfgsync/runtime/src/bin/cfgsync-server.rs +++ b/cfgsync/runtime/src/bin/cfgsync-server.rs @@ -1,6 +1,6 @@ use std::path::PathBuf; -use cfgsync_runtime::serve_cfgsync_from_config; +use cfgsync_runtime::serve_from_config; use clap::Parser; #[derive(Parser, Debug)] @@ -12,5 +12,5 @@ struct Args { #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); - serve_cfgsync_from_config(&args.config).await + serve_from_config(&args.config).await } diff --git a/cfgsync/runtime/src/client.rs b/cfgsync/runtime/src/client.rs index 510da04..e9a19da 100644 --- a/cfgsync/runtime/src/client.rs +++ b/cfgsync/runtime/src/client.rs @@ -19,11 +19,11 @@ const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250); /// Output routing for fetched artifact files. #[derive(Debug, Clone, Default)] -pub struct ArtifactOutputMap { +pub struct OutputMap { routes: HashMap, } -impl ArtifactOutputMap { +impl OutputMap { /// Creates an empty artifact output map. #[must_use] pub fn new() -> Self { @@ -49,101 +49,142 @@ impl ArtifactOutputMap { } } +/// Runtime-oriented cfgsync client that handles registration, fetch, and local +/// artifact materialization. +#[derive(Debug, Clone)] +pub struct Client { + inner: CfgsyncClient, +} + +impl Client { + /// Creates a runtime client that talks to the cfgsync server at + /// `server_addr`. + #[must_use] + pub fn new(server_addr: &str) -> Self { + Self { + inner: CfgsyncClient::new(server_addr), + } + } + + /// Registers a node and fetches its artifact payload from cfgsync. + pub async fn register_and_fetch( + &self, + registration: &NodeRegistration, + ) -> Result { + self.register_node(registration).await?; + + let payload = self + .fetch_with_retry(registration) + .await + .context("fetching node artifacts")?; + ensure_schema_version(&payload)?; + + Ok(payload) + } + + /// Registers a node, fetches its artifact payload, and writes the result + /// using the provided output routing policy. + pub async fn fetch_and_write( + &self, + registration: &NodeRegistration, + outputs: &OutputMap, + ) -> Result<()> { + let payload = self.register_and_fetch(registration).await?; + let files = collect_payload_files(&payload)?; + + for file in files { + write_file(file, outputs)?; + } + + info!(files = files.len(), "cfgsync files saved"); + + Ok(()) + } + + async fn fetch_with_retry( + &self, + registration: &NodeRegistration, + ) -> Result { + for attempt in 1..=FETCH_ATTEMPTS { + match self.fetch_once(registration).await { + Ok(config) => return Ok(config), + Err(error) => { + if attempt == FETCH_ATTEMPTS { + return Err(error).with_context(|| { + format!("fetching node artifacts after {attempt} attempts") + }); + } + + sleep(FETCH_RETRY_DELAY).await; + } + } + } + + unreachable!("cfgsync fetch loop always returns before exhausting attempts"); + } + + async fn fetch_once(&self, registration: &NodeRegistration) -> Result { + self.inner + .fetch_node_config(registration) + .await + .map_err(Into::into) + } + + async fn register_node(&self, registration: &NodeRegistration) -> Result<()> { + for attempt in 1..=FETCH_ATTEMPTS { + match self.inner.register_node(registration).await { + Ok(()) => { + info!(identifier = %registration.identifier, "cfgsync node registered"); + return Ok(()); + } + Err(error) => { + if attempt == FETCH_ATTEMPTS { + return Err(error).with_context(|| { + format!("registering node with cfgsync after {attempt} attempts") + }); + } + + sleep(FETCH_RETRY_DELAY).await; + } + } + } + + unreachable!("cfgsync register loop always returns before exhausting attempts"); + } +} + #[derive(Debug, Error)] enum ClientEnvError { #[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")] InvalidIp { value: String }, } -async fn fetch_with_retry( - payload: &NodeRegistration, - server_addr: &str, -) -> Result { - let client = CfgsyncClient::new(server_addr); - - for attempt in 1..=FETCH_ATTEMPTS { - match fetch_once(&client, payload).await { - Ok(config) => return Ok(config), - Err(error) => { - if attempt == FETCH_ATTEMPTS { - return Err(error).with_context(|| { - format!("fetching cfgsync payload after {attempt} attempts") - }); - } - - sleep(FETCH_RETRY_DELAY).await; - } - } - } - - unreachable!("cfgsync fetch loop always returns before exhausting attempts"); -} - -async fn fetch_once( - client: &CfgsyncClient, - payload: &NodeRegistration, -) -> Result { - let response = client.fetch_node_config(payload).await?; - - Ok(response) -} - -async fn register_node(payload: &NodeRegistration, server_addr: &str) -> Result<()> { - let client = CfgsyncClient::new(server_addr); - - for attempt in 1..=FETCH_ATTEMPTS { - match client.register_node(payload).await { - Ok(()) => { - info!(identifier = %payload.identifier, "cfgsync node registered"); - return Ok(()); - } - Err(error) => { - if attempt == FETCH_ATTEMPTS { - return Err(error).with_context(|| { - format!("registering node with cfgsync after {attempt} attempts") - }); - } - - sleep(FETCH_RETRY_DELAY).await; - } - } - } - - unreachable!("cfgsync register loop always returns before exhausting attempts"); -} - /// Registers a node and fetches its artifact payload from cfgsync. -pub async fn register_and_fetch_artifacts( +/// +/// Prefer [`Client::register_and_fetch`] when you already hold a runtime +/// client value. +pub async fn register_and_fetch( registration: &NodeRegistration, server_addr: &str, ) -> Result { - register_node(registration, server_addr).await?; - - let payload = fetch_with_retry(registration, server_addr) + Client::new(server_addr) + .register_and_fetch(registration) .await - .context("fetching cfgsync node config")?; - ensure_schema_version(&payload)?; - - Ok(payload) } /// Registers a node, fetches its artifact payload, and writes the files using /// the provided output routing policy. -pub async fn fetch_and_write_artifacts( +/// +/// Prefer [`Client::fetch_and_write`] when you already hold a runtime client +/// value. +pub async fn fetch_and_write( registration: &NodeRegistration, server_addr: &str, - outputs: &ArtifactOutputMap, + outputs: &OutputMap, ) -> Result<()> { - let payload = register_and_fetch_artifacts(registration, server_addr).await?; - let files = collect_payload_files(&payload)?; - - for file in files { - write_cfgsync_file(file, outputs)?; - } - - info!(files = files.len(), "cfgsync files saved"); - - Ok(()) + Client::new(server_addr) + .fetch_and_write(registration, outputs) + .await } fn ensure_schema_version(config: &NodeArtifactsPayload) -> Result<()> { @@ -166,7 +207,7 @@ fn collect_payload_files(config: &NodeArtifactsPayload) -> Result<&[NodeArtifact Ok(config.files()) } -fn write_cfgsync_file(file: &NodeArtifactFile, outputs: &ArtifactOutputMap) -> Result<()> { +fn write_file(file: &NodeArtifactFile, outputs: &OutputMap) -> Result<()> { let path = outputs.resolve_path(file); ensure_parent_dir(&path)?; @@ -193,8 +234,8 @@ fn ensure_parent_dir(path: &Path) -> Result<()> { Ok(()) } -/// Resolves cfgsync client inputs from environment and materializes node files. -pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> { +/// Resolves runtime client inputs from environment and materializes node files. +pub async fn run_client_from_env(default_port: u16) -> Result<()> { let server_addr = env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}")); let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?; @@ -203,7 +244,7 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> { let metadata = parse_registration_payload_env()?; let outputs = build_output_map(); - fetch_and_write_artifacts( + fetch_and_write( &NodeRegistration::new(identifier, ip).with_payload(metadata), &server_addr, &outputs, @@ -232,8 +273,8 @@ fn parse_registration_payload(raw: &str) -> Result { RegistrationPayload::from_json_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON") } -fn build_output_map() -> ArtifactOutputMap { - let mut outputs = ArtifactOutputMap::default(); +fn build_output_map() -> OutputMap { + let mut outputs = OutputMap::default(); if let Ok(path) = env::var("CFG_FILE_PATH") { outputs = outputs @@ -255,7 +296,6 @@ fn build_output_map() -> ArtifactOutputMap { mod tests { use cfgsync_core::{ CfgsyncServerState, NodeArtifactsBundle, NodeArtifactsBundleEntry, StaticConfigSource, - serve_cfgsync, }; use tempfile::tempdir; @@ -280,15 +320,15 @@ mod tests { let port = allocate_test_port(); let address = format!("http://127.0.0.1:{port}"); let server = tokio::spawn(async move { - serve_cfgsync(port, state) + cfgsync_core::serve_cfgsync(port, state) .await .expect("run cfgsync server"); }); - fetch_and_write_artifacts( + fetch_and_write( &NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")), &address, - &ArtifactOutputMap::default(), + &OutputMap::default(), ) .await .expect("pull config files"); diff --git a/cfgsync/runtime/src/lib.rs b/cfgsync/runtime/src/lib.rs index fcca208..ab6032b 100644 --- a/cfgsync/runtime/src/lib.rs +++ b/cfgsync/runtime/src/lib.rs @@ -3,12 +3,8 @@ pub use cfgsync_core as core; mod client; mod server; -pub use client::{ - ArtifactOutputMap, fetch_and_write_artifacts, register_and_fetch_artifacts, - run_cfgsync_client_from_env, -}; +pub use client::{Client, OutputMap, fetch_and_write, register_and_fetch, run_client_from_env}; pub use server::{ - CfgsyncServerConfig, CfgsyncServerSource, LoadCfgsyncServerConfigError, build_cfgsync_router, - build_persisted_cfgsync_router, serve_cfgsync, serve_cfgsync_from_config, - serve_persisted_cfgsync, + LoadServerConfigError, ServerConfig, ServerSource, build_persisted_router, build_router, serve, + serve_from_config, serve_persisted, }; diff --git a/cfgsync/runtime/src/server.rs b/cfgsync/runtime/src/server.rs index 27e9a27..931a088 100644 --- a/cfgsync/runtime/src/server.rs +++ b/cfgsync/runtime/src/server.rs @@ -15,11 +15,11 @@ use thiserror::Error; /// Runtime cfgsync server config loaded from YAML. #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -pub struct CfgsyncServerConfig { +pub struct ServerConfig { /// HTTP port to bind the cfgsync server on. pub port: u16, /// Source used by the runtime-managed cfgsync server. - pub source: CfgsyncServerSource, + pub source: ServerSource, } /// Runtime cfgsync source loaded from config. @@ -31,7 +31,7 @@ pub struct CfgsyncServerConfig { /// before receiving already-materialized artifacts #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] #[serde(tag = "kind", rename_all = "snake_case")] -pub enum CfgsyncServerSource { +pub enum ServerSource { /// Serve a static precomputed artifact bundle directly. Bundle { bundle_path: String }, /// Require node registration before serving precomputed artifacts. @@ -39,7 +39,7 @@ pub enum CfgsyncServerSource { } #[derive(Debug, Error)] -pub enum LoadCfgsyncServerConfigError { +pub enum LoadServerConfigError { #[error("failed to read cfgsync config file {path}: {source}")] Read { path: String, @@ -54,23 +54,22 @@ pub enum LoadCfgsyncServerConfigError { }, } -impl CfgsyncServerConfig { +impl ServerConfig { /// Loads cfgsync runtime server config from a YAML file. - pub fn load_from_file(path: &Path) -> Result { + pub fn load_from_file(path: &Path) -> Result { let config_path = path.display().to_string(); let config_content = - fs::read_to_string(path).map_err(|source| LoadCfgsyncServerConfigError::Read { + fs::read_to_string(path).map_err(|source| LoadServerConfigError::Read { path: config_path.clone(), source, })?; - let config: CfgsyncServerConfig = - serde_yaml::from_str(&config_content).map_err(|source| { - LoadCfgsyncServerConfigError::Parse { - path: config_path, - source, - } - })?; + let config: ServerConfig = serde_yaml::from_str(&config_content).map_err(|source| { + LoadServerConfigError::Parse { + path: config_path, + source, + } + })?; Ok(config) } @@ -79,7 +78,7 @@ impl CfgsyncServerConfig { pub fn for_bundle(port: u16, bundle_path: impl Into) -> Self { Self { port, - source: CfgsyncServerSource::Bundle { + source: ServerSource::Bundle { bundle_path: bundle_path.into(), }, } @@ -91,7 +90,7 @@ impl CfgsyncServerConfig { pub fn for_registration(port: u16, artifacts_path: impl Into) -> Self { Self { port, - source: CfgsyncServerSource::Registration { + source: ServerSource::Registration { artifacts_path: artifacts_path.into(), }, } @@ -143,8 +142,8 @@ fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::Path } /// Loads runtime config and starts cfgsync HTTP server process. -pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()> { - let config = CfgsyncServerConfig::load_from_file(config_path)?; +pub async fn serve_from_config(config_path: &Path) -> anyhow::Result<()> { + let config = ServerConfig::load_from_file(config_path)?; let bundle_path = resolve_source_path(config_path, &config.source); let state = build_server_state(&config, &bundle_path)?; @@ -162,7 +161,7 @@ pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()> /// - artifact serving /// /// while the app owns only snapshot materialization logic. -pub fn build_cfgsync_router(materializer: M) -> Router +pub fn build_router(materializer: M) -> Router where M: RegistrationSnapshotMaterializer + 'static, { @@ -175,7 +174,7 @@ where /// /// Use this when the application wants cfgsync to persist or publish shared /// artifacts after a snapshot becomes ready. -pub fn build_persisted_cfgsync_router(materializer: M, sink: S) -> Router +pub fn build_persisted_router(materializer: M, sink: S) -> Router where M: RegistrationSnapshotMaterializer + 'static, S: MaterializedArtifactsSink + 'static, @@ -192,11 +191,11 @@ where /// /// This is the simplest runtime entrypoint when the application already has a /// materializer value and does not need to compose extra routes. -pub async fn serve_cfgsync(port: u16, materializer: M) -> Result<(), RunCfgsyncError> +pub async fn serve(port: u16, materializer: M) -> Result<(), RunCfgsyncError> where M: RegistrationSnapshotMaterializer + 'static, { - let router = build_cfgsync_router(materializer); + let router = build_router(materializer); serve_router(port, router).await } @@ -204,8 +203,8 @@ where /// materialization results. /// /// This is the direct serving counterpart to -/// [`build_persisted_cfgsync_router`]. -pub async fn serve_persisted_cfgsync( +/// [`build_persisted_router`]. +pub async fn serve_persisted( port: u16, materializer: M, sink: S, @@ -214,7 +213,7 @@ where M: RegistrationSnapshotMaterializer + 'static, S: MaterializedArtifactsSink + 'static, { - let router = build_persisted_cfgsync_router(materializer, sink); + let router = build_persisted_router(materializer, sink); serve_router(port, router).await } @@ -232,23 +231,21 @@ async fn serve_router(port: u16, router: Router) -> Result<(), RunCfgsyncError> } fn build_server_state( - config: &CfgsyncServerConfig, + config: &ServerConfig, source_path: &Path, ) -> anyhow::Result { let repo = match &config.source { - CfgsyncServerSource::Bundle { .. } => load_bundle_provider(source_path)?, - CfgsyncServerSource::Registration { .. } => load_registration_source(source_path)?, + ServerSource::Bundle { .. } => load_bundle_provider(source_path)?, + ServerSource::Registration { .. } => load_registration_source(source_path)?, }; Ok(CfgsyncServerState::new(repo)) } -fn resolve_source_path(config_path: &Path, source: &CfgsyncServerSource) -> std::path::PathBuf { +fn resolve_source_path(config_path: &Path, source: &ServerSource) -> std::path::PathBuf { match source { - CfgsyncServerSource::Bundle { bundle_path } => { - resolve_bundle_path(config_path, bundle_path) - } - CfgsyncServerSource::Registration { artifacts_path } => { + ServerSource::Bundle { bundle_path } => resolve_bundle_path(config_path, bundle_path), + ServerSource::Registration { artifacts_path } => { resolve_bundle_path(config_path, artifacts_path) } }