From 13084c3a36f65a84fde48358ffb97f75136ce40a Mon Sep 17 00:00:00 2001 From: andrussal Date: Tue, 10 Mar 2026 09:56:12 +0100 Subject: [PATCH] Use typed cfgsync registration payloads --- cfgsync/core/src/lib.rs | 2 +- cfgsync/core/src/repo.rs | 288 ++++++++++++++++++++++------------ cfgsync/runtime/src/client.rs | 64 ++++---- 3 files changed, 217 insertions(+), 137 deletions(-) diff --git a/cfgsync/core/src/lib.rs b/cfgsync/core/src/lib.rs index c33c32f..e0b3969 100644 --- a/cfgsync/core/src/lib.rs +++ b/cfgsync/core/src/lib.rs @@ -14,6 +14,6 @@ pub use render::{ pub use repo::{ CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, NodeRegistration, - RegistrationMetadata, RegistrationResponse, RepoResponse, + RegistrationPayload, RegistrationResponse, RepoResponse, }; pub use server::{CfgSyncState, RunCfgsyncError, cfgsync_app, run_cfgsync}; diff --git a/cfgsync/core/src/repo.rs b/cfgsync/core/src/repo.rs index e301aca..15d1332 100644 --- a/cfgsync/core/src/repo.rs +++ b/cfgsync/core/src/repo.rs @@ -1,8 +1,8 @@ use std::{collections::HashMap, fs, net::Ipv4Addr, path::Path, sync::Arc}; use cfgsync_artifacts::ArtifactFile; -use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value}; +use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned}; +use serde_json::Value; use thiserror::Error; use crate::{CfgSyncBundle, CfgSyncBundleNode}; @@ -23,15 +23,13 @@ pub struct CfgSyncPayload { pub files: Vec, } -/// Adapter-owned registration metadata stored alongside a generic node -/// identity. -#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] -#[serde(transparent)] -pub struct RegistrationMetadata { - values: Map, +/// Adapter-owned registration payload stored alongside a generic node identity. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct RegistrationPayload { + raw_json: Option, } -impl RegistrationMetadata { +impl RegistrationPayload { #[must_use] pub fn new() -> Self { Self::default() @@ -39,41 +37,69 @@ impl RegistrationMetadata { #[must_use] pub fn is_empty(&self) -> bool { - self.values.is_empty() + self.raw_json.is_none() } - #[must_use] - pub fn get(&self, key: &str) -> Option<&Value> { - self.values.get(key) - } - - pub fn insert_json_value(&mut self, key: impl Into, value: Value) { - self.values.insert(key.into(), value); - } - - pub fn insert_serialized( - &mut self, - key: impl Into, - value: T, - ) -> Result<(), serde_json::Error> + pub fn from_serializable(value: &T) -> Result where T: Serialize, { - let value = serde_json::to_value(value)?; - self.insert_json_value(key, value); + Ok(Self { + raw_json: Some(serde_json::to_string(value)?), + }) + } - Ok(()) + pub fn from_json_str(raw_json: &str) -> Result { + let value: Value = serde_json::from_str(raw_json)?; + + Ok(Self { + raw_json: Some(serde_json::to_string(&value)?), + }) + } + + pub fn deserialize(&self) -> Result, serde_json::Error> + where + T: DeserializeOwned, + { + self.raw_json + .as_ref() + .map(|raw_json| serde_json::from_str(raw_json)) + .transpose() } #[must_use] - pub fn values(&self) -> &Map { - &self.values + pub fn raw_json(&self) -> Option<&str> { + self.raw_json.as_deref() } } -impl From> for RegistrationMetadata { - fn from(values: Map) -> Self { - Self { values } +impl Serialize for RegistrationPayload { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self.raw_json.as_deref() { + Some(raw_json) => { + let value: Value = + serde_json::from_str(raw_json).map_err(serde::ser::Error::custom)?; + value.serialize(serializer) + } + None => serializer.serialize_none(), + } + } +} + +impl<'de> Deserialize<'de> for RegistrationPayload { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let value = Option::::deserialize(deserializer)?; + let raw_json = value + .map(|value| serde_json::to_string(&value).map_err(serde::de::Error::custom)) + .transpose()?; + + Ok(Self { raw_json }) } } @@ -82,8 +108,8 @@ impl From> for RegistrationMetadata { pub struct NodeRegistration { pub identifier: String, pub ip: Ipv4Addr, - #[serde(default, skip_serializing_if = "RegistrationMetadata::is_empty")] - pub metadata: RegistrationMetadata, + #[serde(default, skip_serializing_if = "RegistrationPayload::is_empty")] + pub metadata: RegistrationPayload, } impl NodeRegistration { @@ -92,13 +118,21 @@ impl NodeRegistration { Self { identifier: identifier.into(), ip, - metadata: RegistrationMetadata::default(), + metadata: RegistrationPayload::default(), } } + pub fn with_metadata(mut self, metadata: &T) -> Result + where + T: Serialize, + { + self.metadata = RegistrationPayload::from_serializable(metadata)?; + Ok(self) + } + #[must_use] - pub fn with_metadata(mut self, metadata: RegistrationMetadata) -> Self { - self.metadata = metadata; + pub fn with_payload(mut self, payload: RegistrationPayload) -> Self { + self.metadata = payload; self } } @@ -222,66 +256,45 @@ impl ConfigProvider for ConfigRepo { } } -/// Failures when loading a file-backed cfgsync provider. #[derive(Debug, Error)] -pub enum FileConfigProviderError { - #[error("failed to read cfgsync bundle at {path}: {source}")] - Read { +pub enum BundleLoadError { + #[error("reading cfgsync bundle {path}: {source}")] + ReadBundle { path: String, #[source] source: std::io::Error, }, - #[error("failed to parse cfgsync bundle at {path}: {source}")] - Parse { + #[error("parsing cfgsync bundle {path}: {source}")] + ParseBundle { path: String, #[source] source: serde_yaml::Error, }, } -/// YAML bundle-backed provider implementation. -pub struct FileConfigProvider { - inner: ConfigRepo, -} +#[must_use] +pub fn bundle_to_payload_map(bundle: CfgSyncBundle) -> HashMap { + bundle + .nodes + .into_iter() + .map(|node| { + let CfgSyncBundleNode { identifier, files } = node; -impl FileConfigProvider { - /// Loads provider state from a cfgsync bundle YAML file. - pub fn from_yaml_file(path: &Path) -> Result { - let raw = fs::read_to_string(path).map_err(|source| FileConfigProviderError::Read { - path: path.display().to_string(), - source, - })?; - - let bundle: CfgSyncBundle = - serde_yaml::from_str(&raw).map_err(|source| FileConfigProviderError::Parse { - path: path.display().to_string(), - source, - })?; - - let configs = bundle - .nodes - .into_iter() - .map(payload_from_bundle_node) - .collect(); - - Ok(Self { - inner: ConfigRepo { configs }, + (identifier, CfgSyncPayload::from_files(files)) }) - } + .collect() } -impl ConfigProvider for FileConfigProvider { - fn register(&self, registration: NodeRegistration) -> RegistrationResponse { - self.inner.register(registration) - } - - fn resolve(&self, registration: &NodeRegistration) -> RepoResponse { - self.inner.resolve(registration) - } -} - -fn payload_from_bundle_node(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) { - (node.identifier, CfgSyncPayload::from_files(node.files)) +pub fn load_bundle(path: &Path) -> Result { + let path_string = path.display().to_string(); + let raw = fs::read_to_string(path).map_err(|source| BundleLoadError::ReadBundle { + path: path_string.clone(), + source, + })?; + serde_yaml::from_str(&raw).map_err(|source| BundleLoadError::ParseBundle { + path: path_string, + source, + }) } #[cfg(test)] @@ -292,6 +305,38 @@ mod tests { use super::*; + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] + struct ExampleRegistration { + network_port: u16, + service: String, + } + + #[test] + fn registration_payload_round_trips_typed_value() { + let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")) + .with_metadata(&ExampleRegistration { + network_port: 3000, + service: "blend".to_owned(), + }) + .expect("serialize registration metadata"); + + let encoded = serde_json::to_value(®istration).expect("serialize registration"); + let metadata = encoded.get("metadata").expect("registration metadata"); + assert_eq!(metadata.get("network_port"), Some(&Value::from(3000u16))); + assert_eq!(metadata.get("service"), Some(&Value::from("blend"))); + + let decoded: NodeRegistration = + serde_json::from_value(encoded).expect("deserialize registration"); + let typed: ExampleRegistration = decoded + .metadata + .deserialize() + .expect("deserialize metadata") + .expect("registration metadata value"); + + assert_eq!(typed.network_port, 3000); + assert_eq!(typed.service, "blend"); + } + fn sample_payload() -> CfgSyncPayload { CfgSyncPayload::from_files(vec![CfgSyncFile::new("/config.yaml", "key: value")]) } @@ -378,31 +423,66 @@ nodes: RepoResponse::Error(error) => panic!("expected config, got {error}"), } } +} - #[test] - fn registration_metadata_serializes_as_object() { - let mut metadata = RegistrationMetadata::new(); - metadata - .insert_serialized("network_port", 3000_u16) - .expect("serialize metadata"); - metadata.insert_json_value("service", Value::String("blend".to_owned())); +/// Failures when loading a file-backed cfgsync provider. +#[derive(Debug, Error)] +pub enum FileConfigProviderError { + #[error("failed to read cfgsync bundle at {path}: {source}")] + Read { + path: String, + #[source] + source: std::io::Error, + }, + #[error("failed to parse cfgsync bundle at {path}: {source}")] + Parse { + path: String, + #[source] + source: serde_yaml::Error, + }, +} - let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")) - .with_metadata(metadata); +/// YAML bundle-backed provider implementation. +pub struct FileConfigProvider { + inner: ConfigRepo, +} - let encoded = serde_json::to_value(®istration).expect("serialize registration"); - let metadata = encoded - .get("metadata") - .and_then(Value::as_object) - .expect("registration metadata object"); +impl FileConfigProvider { + /// Loads provider state from a cfgsync bundle YAML file. + pub fn from_yaml_file(path: &Path) -> Result { + let raw = fs::read_to_string(path).map_err(|source| FileConfigProviderError::Read { + path: path.display().to_string(), + source, + })?; - assert_eq!( - metadata.get("network_port"), - Some(&Value::Number(3000_u16.into())) - ); - assert_eq!( - metadata.get("service"), - Some(&Value::String("blend".to_owned())) - ); + let bundle: CfgSyncBundle = + serde_yaml::from_str(&raw).map_err(|source| FileConfigProviderError::Parse { + path: path.display().to_string(), + source, + })?; + + let configs = bundle + .nodes + .into_iter() + .map(payload_from_bundle_node) + .collect(); + + Ok(Self { + inner: ConfigRepo { configs }, + }) } } + +impl ConfigProvider for FileConfigProvider { + fn register(&self, registration: NodeRegistration) -> RegistrationResponse { + self.inner.register(registration) + } + + fn resolve(&self, registration: &NodeRegistration) -> RepoResponse { + self.inner.resolve(registration) + } +} + +fn payload_from_bundle_node(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) { + (node.identifier, CfgSyncPayload::from_files(node.files)) +} diff --git a/cfgsync/runtime/src/client.rs b/cfgsync/runtime/src/client.rs index 7cc113c..00af686 100644 --- a/cfgsync/runtime/src/client.rs +++ b/cfgsync/runtime/src/client.rs @@ -7,9 +7,8 @@ use std::{ use anyhow::{Context as _, Result, bail}; use cfgsync_core::{ CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, NodeRegistration, - RegistrationMetadata, + RegistrationPayload, }; -use serde_json::Value; use thiserror::Error; use tokio::time::{Duration, sleep}; use tracing::info; @@ -21,8 +20,6 @@ const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250); enum ClientEnvError { #[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")] InvalidIp { value: String }, - #[error("CFG_REGISTRATION_METADATA_JSON must be a JSON object")] - InvalidRegistrationMetadataShape, } async fn fetch_with_retry(payload: &NodeRegistration, server_addr: &str) -> Result { @@ -149,10 +146,10 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> { let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?; let identifier = env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned()); - let metadata = parse_registration_metadata_env()?; + let metadata = parse_registration_payload_env()?; pull_config_files( - NodeRegistration::new(identifier, ip).with_metadata(metadata), + NodeRegistration::new(identifier, ip).with_payload(metadata), &server_addr, ) .await @@ -167,22 +164,16 @@ fn parse_ip_env(ip_str: &str) -> Result { .map_err(Into::into) } -fn parse_registration_metadata_env() -> Result { +fn parse_registration_payload_env() -> Result { let Ok(raw) = env::var("CFG_REGISTRATION_METADATA_JSON") else { - return Ok(RegistrationMetadata::default()); + return Ok(RegistrationPayload::default()); }; - parse_registration_metadata(&raw) + parse_registration_payload(&raw) } -fn parse_registration_metadata(raw: &str) -> Result { - let value: Value = - serde_json::from_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON")?; - let Some(metadata) = value.as_object() else { - return Err(ClientEnvError::InvalidRegistrationMetadataShape.into()); - }; - - Ok(RegistrationMetadata::from(metadata.clone())) +fn parse_registration_payload(raw: &str) -> Result { + RegistrationPayload::from_json_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON") } #[cfg(test)] @@ -256,28 +247,37 @@ mod tests { } #[test] - fn parses_registration_metadata_object() { - let metadata = parse_registration_metadata(r#"{"network_port":3000,"service":"blend"}"#) + fn parses_registration_payload_object() { + #[derive(Debug, serde::Deserialize, PartialEq, Eq)] + struct ExamplePayload { + network_port: u16, + service: String, + } + + let metadata = parse_registration_payload(r#"{"network_port":3000,"service":"blend"}"#) .expect("parse metadata"); + let payload: ExamplePayload = metadata + .deserialize() + .expect("deserialize payload") + .expect("payload value"); assert_eq!( - metadata.get("network_port"), - Some(&Value::Number(3000_u16.into())) - ); - assert_eq!( - metadata.get("service"), - Some(&Value::String("blend".to_owned())) + payload, + ExamplePayload { + network_port: 3000, + service: "blend".to_owned(), + } ); } #[test] - fn rejects_non_object_registration_metadata() { - let error = parse_registration_metadata(r#"[1,2,3]"#).expect_err("reject metadata array"); + fn parses_registration_payload_array() { + let metadata = parse_registration_payload(r#"[1,2,3]"#).expect("parse metadata array"); + let payload: Vec = metadata + .deserialize() + .expect("deserialize payload") + .expect("payload value"); - assert!( - error - .to_string() - .contains("CFG_REGISTRATION_METADATA_JSON must be a JSON object") - ); + assert_eq!(payload, vec![1, 2, 3]); } }