From 80e1fe6c6628ff8efd5d98f4a83241d2c8ab8385 Mon Sep 17 00:00:00 2001 From: andrussal Date: Tue, 10 Mar 2026 09:41:03 +0100 Subject: [PATCH] Make cfgsync registration metadata generic --- Cargo.lock | 1 + cfgsync/adapter/src/lib.rs | 10 +-- cfgsync/core/src/lib.rs | 2 +- cfgsync/core/src/repo.rs | 141 +++++++++++++++++++++++++++++----- cfgsync/core/src/server.rs | 15 +--- cfgsync/runtime/Cargo.toml | 1 + cfgsync/runtime/src/client.rs | 60 +++++++++++++-- 7 files changed, 184 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f9bf1be..2be427b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -959,6 +959,7 @@ dependencies = [ "cfgsync-core", "clap", "serde", + "serde_json", "serde_yaml", "tempfile", "thiserror 2.0.18", diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs index d4da5ed..226b98d 100644 --- a/cfgsync/adapter/src/lib.rs +++ b/cfgsync/adapter/src/lib.rs @@ -308,10 +308,7 @@ mod tests { files: vec![ArtifactFile::new("/config.yaml", "key: value")], }]); let provider = MaterializingConfigProvider::new(catalog); - let registration = NodeRegistration { - identifier: "node-1".to_owned(), - ip: "127.0.0.1".parse().expect("parse ip"), - }; + let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); let _ = provider.register(registration.clone()); @@ -328,10 +325,7 @@ mod tests { files: vec![ArtifactFile::new("/config.yaml", "key: value")], }]); let provider = MaterializingConfigProvider::new(catalog); - let registration = NodeRegistration { - identifier: "node-1".to_owned(), - ip: "127.0.0.1".parse().expect("parse ip"), - }; + let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")); match provider.resolve(®istration) { RepoResponse::Config(_) => panic!("expected not-ready error"), diff --git a/cfgsync/core/src/lib.rs b/cfgsync/core/src/lib.rs index 2807346..c33c32f 100644 --- a/cfgsync/core/src/lib.rs +++ b/cfgsync/core/src/lib.rs @@ -14,6 +14,6 @@ pub use render::{ pub use repo::{ CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, NodeRegistration, - RegistrationResponse, RepoResponse, + RegistrationMetadata, RegistrationResponse, RepoResponse, }; pub use server::{CfgSyncState, RunCfgsyncError, cfgsync_app, run_cfgsync}; diff --git a/cfgsync/core/src/repo.rs b/cfgsync/core/src/repo.rs index 69dd758..e301aca 100644 --- a/cfgsync/core/src/repo.rs +++ b/cfgsync/core/src/repo.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, fs, net::Ipv4Addr, path::Path, sync::Arc}; use cfgsync_artifacts::ArtifactFile; use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; use thiserror::Error; use crate::{CfgSyncBundle, CfgSyncBundleNode}; @@ -22,11 +23,84 @@ pub struct CfgSyncPayload { pub files: Vec, } +/// Adapter-owned registration metadata stored alongside a generic node +/// identity. +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +#[serde(transparent)] +pub struct RegistrationMetadata { + values: Map, +} + +impl RegistrationMetadata { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.values.is_empty() + } + + #[must_use] + pub fn get(&self, key: &str) -> Option<&Value> { + self.values.get(key) + } + + pub fn insert_json_value(&mut self, key: impl Into, value: Value) { + self.values.insert(key.into(), value); + } + + pub fn insert_serialized( + &mut self, + key: impl Into, + value: T, + ) -> Result<(), serde_json::Error> + where + T: Serialize, + { + let value = serde_json::to_value(value)?; + self.insert_json_value(key, value); + + Ok(()) + } + + #[must_use] + pub fn values(&self) -> &Map { + &self.values + } +} + +impl From> for RegistrationMetadata { + fn from(values: Map) -> Self { + Self { values } + } +} + /// Node metadata recorded before config materialization. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct NodeRegistration { pub identifier: String, pub ip: Ipv4Addr, + #[serde(default, skip_serializing_if = "RegistrationMetadata::is_empty")] + pub metadata: RegistrationMetadata, +} + +impl NodeRegistration { + #[must_use] + pub fn new(identifier: impl Into, ip: Ipv4Addr) -> Self { + Self { + identifier: identifier.into(), + ip, + metadata: RegistrationMetadata::default(), + } + } + + #[must_use] + pub fn with_metadata(mut self, metadata: RegistrationMetadata) -> Self { + self.metadata = metadata; + self + } } impl CfgSyncPayload { @@ -228,10 +302,10 @@ mod tests { configs.insert("node-1".to_owned(), sample_payload()); let repo = ConfigRepo { configs }; - match repo.resolve(&NodeRegistration { - identifier: "node-1".to_owned(), - ip: "127.0.0.1".parse().expect("parse ip"), - }) { + match repo.resolve(&NodeRegistration::new( + "node-1", + "127.0.0.1".parse().expect("parse ip"), + )) { RepoResponse::Config(payload) => { assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION); assert_eq!(payload.files.len(), 1); @@ -247,10 +321,10 @@ mod tests { configs: HashMap::new(), }; - match repo.resolve(&NodeRegistration { - identifier: "unknown-node".to_owned(), - ip: "127.0.0.1".parse().expect("parse ip"), - }) { + match repo.resolve(&NodeRegistration::new( + "unknown-node", + "127.0.0.1".parse().expect("parse ip"), + )) { RepoResponse::Config(_) => panic!("expected missing-config error"), RepoResponse::Error(error) => { assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig)); @@ -276,15 +350,15 @@ nodes: let provider = FileConfigProvider::from_yaml_file(bundle_file.path()).expect("load file provider"); - let _ = provider.register(NodeRegistration { - identifier: "node-1".to_owned(), - ip: "127.0.0.1".parse().expect("parse ip"), - }); + let _ = provider.register(NodeRegistration::new( + "node-1", + "127.0.0.1".parse().expect("parse ip"), + )); - match provider.resolve(&NodeRegistration { - identifier: "node-1".to_owned(), - ip: "127.0.0.1".parse().expect("parse ip"), - }) { + match provider.resolve(&NodeRegistration::new( + "node-1", + "127.0.0.1".parse().expect("parse ip"), + )) { RepoResponse::Config(payload) => assert_eq!(payload.files.len(), 1), RepoResponse::Error(error) => panic!("expected config, got {error}"), } @@ -296,12 +370,39 @@ nodes: configs.insert("node-1".to_owned(), sample_payload()); let repo = ConfigRepo { configs }; - match repo.resolve(&NodeRegistration { - identifier: "node-1".to_owned(), - ip: "127.0.0.1".parse().expect("parse ip"), - }) { + match repo.resolve(&NodeRegistration::new( + "node-1", + "127.0.0.1".parse().expect("parse ip"), + )) { RepoResponse::Config(_) => {} RepoResponse::Error(error) => panic!("expected config, got {error}"), } } + + #[test] + fn registration_metadata_serializes_as_object() { + let mut metadata = RegistrationMetadata::new(); + metadata + .insert_serialized("network_port", 3000_u16) + .expect("serialize metadata"); + metadata.insert_json_value("service", Value::String("blend".to_owned())); + + let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")) + .with_metadata(metadata); + + let encoded = serde_json::to_value(®istration).expect("serialize registration"); + let metadata = encoded + .get("metadata") + .and_then(Value::as_object) + .expect("registration metadata object"); + + assert_eq!( + metadata.get("network_port"), + Some(&Value::Number(3000_u16.into())) + ); + assert_eq!( + metadata.get("service"), + Some(&Value::String("blend".to_owned())) + ); + } } diff --git a/cfgsync/core/src/server.rs b/cfgsync/core/src/server.rs index 407f30a..794af79 100644 --- a/cfgsync/core/src/server.rs +++ b/cfgsync/core/src/server.rs @@ -212,10 +212,7 @@ mod tests { registrations: std::sync::Mutex::new(HashMap::new()), }); let state = Arc::new(CfgSyncState::new(provider)); - let payload = NodeRegistration { - ip: "127.0.0.1".parse().expect("valid ip"), - identifier: "node-a".to_owned(), - }; + let payload = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("valid ip")); let _ = register_node(State(state.clone()), Json(payload.clone())) .await @@ -234,10 +231,7 @@ mod tests { data: HashMap::new(), }); let state = Arc::new(CfgSyncState::new(provider)); - let payload = NodeRegistration { - ip: "127.0.0.1".parse().expect("valid ip"), - identifier: "missing-node".to_owned(), - }; + let payload = NodeRegistration::new("missing-node", "127.0.0.1".parse().expect("valid ip")); let response = node_config(State(state), Json(payload)) .await @@ -263,10 +257,7 @@ mod tests { registrations: std::sync::Mutex::new(HashMap::new()), }); let state = Arc::new(CfgSyncState::new(provider)); - let payload = NodeRegistration { - ip: "127.0.0.1".parse().expect("valid ip"), - identifier: "node-a".to_owned(), - }; + let payload = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("valid ip")); let response = node_config(State(state), Json(payload)) .await diff --git a/cfgsync/runtime/Cargo.toml b/cfgsync/runtime/Cargo.toml index f708ba1..b6e7a5f 100644 --- a/cfgsync/runtime/Cargo.toml +++ b/cfgsync/runtime/Cargo.toml @@ -18,6 +18,7 @@ cfgsync-adapter = { workspace = true } cfgsync-core = { workspace = true } clap = { version = "4", features = ["derive"] } serde = { workspace = true } +serde_json = { workspace = true } serde_yaml = { workspace = true } thiserror = { workspace = true } tokio = { default-features = false, features = ["macros", "net", "rt-multi-thread"], version = "1" } diff --git a/cfgsync/runtime/src/client.rs b/cfgsync/runtime/src/client.rs index 82dbe65..7cc113c 100644 --- a/cfgsync/runtime/src/client.rs +++ b/cfgsync/runtime/src/client.rs @@ -7,7 +7,9 @@ use std::{ use anyhow::{Context as _, Result, bail}; use cfgsync_core::{ CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, NodeRegistration, + RegistrationMetadata, }; +use serde_json::Value; use thiserror::Error; use tokio::time::{Duration, sleep}; use tracing::info; @@ -19,6 +21,8 @@ const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250); enum ClientEnvError { #[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")] InvalidIp { value: String }, + #[error("CFG_REGISTRATION_METADATA_JSON must be a JSON object")] + InvalidRegistrationMetadataShape, } async fn fetch_with_retry(payload: &NodeRegistration, server_addr: &str) -> Result { @@ -145,8 +149,13 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> { let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?; let identifier = env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned()); + let metadata = parse_registration_metadata_env()?; - pull_config_files(NodeRegistration { ip, identifier }, &server_addr).await + pull_config_files( + NodeRegistration::new(identifier, ip).with_metadata(metadata), + &server_addr, + ) + .await } fn parse_ip_env(ip_str: &str) -> Result { @@ -158,6 +167,24 @@ fn parse_ip_env(ip_str: &str) -> Result { .map_err(Into::into) } +fn parse_registration_metadata_env() -> Result { + let Ok(raw) = env::var("CFG_REGISTRATION_METADATA_JSON") else { + return Ok(RegistrationMetadata::default()); + }; + + parse_registration_metadata(&raw) +} + +fn parse_registration_metadata(raw: &str) -> Result { + let value: Value = + serde_json::from_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON")?; + let Some(metadata) = value.as_object() else { + return Err(ClientEnvError::InvalidRegistrationMetadataShape.into()); + }; + + Ok(RegistrationMetadata::from(metadata.clone())) +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -192,10 +219,7 @@ mod tests { }); pull_config_files( - NodeRegistration { - ip: "127.0.0.1".parse().expect("parse ip"), - identifier: "node-1".to_owned(), - }, + NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")), &address, ) .await @@ -230,4 +254,30 @@ mod tests { drop(listener); port } + + #[test] + fn parses_registration_metadata_object() { + let metadata = parse_registration_metadata(r#"{"network_port":3000,"service":"blend"}"#) + .expect("parse metadata"); + + assert_eq!( + metadata.get("network_port"), + Some(&Value::Number(3000_u16.into())) + ); + assert_eq!( + metadata.get("service"), + Some(&Value::String("blend".to_owned())) + ); + } + + #[test] + fn rejects_non_object_registration_metadata() { + let error = parse_registration_metadata(r#"[1,2,3]"#).expect_err("reject metadata array"); + + assert!( + error + .to_string() + .contains("CFG_REGISTRATION_METADATA_JSON must be a JSON object") + ); + } }