Use typed cfgsync registration payloads

This commit is contained in:
andrussal 2026-03-10 09:56:12 +01:00
parent 80e1fe6c66
commit 13084c3a36
3 changed files with 217 additions and 137 deletions

View File

@ -14,6 +14,6 @@ pub use render::{
pub use repo::{ pub use repo::{
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload,
ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, NodeRegistration, ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, NodeRegistration,
RegistrationMetadata, RegistrationResponse, RepoResponse, RegistrationPayload, RegistrationResponse, RepoResponse,
}; };
pub use server::{CfgSyncState, RunCfgsyncError, cfgsync_app, run_cfgsync}; pub use server::{CfgSyncState, RunCfgsyncError, cfgsync_app, run_cfgsync};

View File

@ -1,8 +1,8 @@
use std::{collections::HashMap, fs, net::Ipv4Addr, path::Path, sync::Arc}; use std::{collections::HashMap, fs, net::Ipv4Addr, path::Path, sync::Arc};
use cfgsync_artifacts::ArtifactFile; use cfgsync_artifacts::ArtifactFile;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Deserializer, Serialize, Serializer, de::DeserializeOwned};
use serde_json::{Map, Value}; use serde_json::Value;
use thiserror::Error; use thiserror::Error;
use crate::{CfgSyncBundle, CfgSyncBundleNode}; use crate::{CfgSyncBundle, CfgSyncBundleNode};
@ -23,15 +23,13 @@ pub struct CfgSyncPayload {
pub files: Vec<CfgSyncFile>, pub files: Vec<CfgSyncFile>,
} }
/// Adapter-owned registration metadata stored alongside a generic node /// Adapter-owned registration payload stored alongside a generic node identity.
/// identity. #[derive(Debug, Clone, Default, PartialEq, Eq)]
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] pub struct RegistrationPayload {
#[serde(transparent)] raw_json: Option<String>,
pub struct RegistrationMetadata {
values: Map<String, Value>,
} }
impl RegistrationMetadata { impl RegistrationPayload {
#[must_use] #[must_use]
pub fn new() -> Self { pub fn new() -> Self {
Self::default() Self::default()
@ -39,41 +37,69 @@ impl RegistrationMetadata {
#[must_use] #[must_use]
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.values.is_empty() self.raw_json.is_none()
} }
#[must_use] pub fn from_serializable<T>(value: &T) -> Result<Self, serde_json::Error>
pub fn get(&self, key: &str) -> Option<&Value> {
self.values.get(key)
}
pub fn insert_json_value(&mut self, key: impl Into<String>, value: Value) {
self.values.insert(key.into(), value);
}
pub fn insert_serialized<T>(
&mut self,
key: impl Into<String>,
value: T,
) -> Result<(), serde_json::Error>
where where
T: Serialize, T: Serialize,
{ {
let value = serde_json::to_value(value)?; Ok(Self {
self.insert_json_value(key, value); raw_json: Some(serde_json::to_string(value)?),
})
}
Ok(()) pub fn from_json_str(raw_json: &str) -> Result<Self, serde_json::Error> {
let value: Value = serde_json::from_str(raw_json)?;
Ok(Self {
raw_json: Some(serde_json::to_string(&value)?),
})
}
pub fn deserialize<T>(&self) -> Result<Option<T>, serde_json::Error>
where
T: DeserializeOwned,
{
self.raw_json
.as_ref()
.map(|raw_json| serde_json::from_str(raw_json))
.transpose()
} }
#[must_use] #[must_use]
pub fn values(&self) -> &Map<String, Value> { pub fn raw_json(&self) -> Option<&str> {
&self.values self.raw_json.as_deref()
} }
} }
impl From<Map<String, Value>> for RegistrationMetadata { impl Serialize for RegistrationPayload {
fn from(values: Map<String, Value>) -> Self { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
Self { values } where
S: Serializer,
{
match self.raw_json.as_deref() {
Some(raw_json) => {
let value: Value =
serde_json::from_str(raw_json).map_err(serde::ser::Error::custom)?;
value.serialize(serializer)
}
None => serializer.serialize_none(),
}
}
}
impl<'de> Deserialize<'de> for RegistrationPayload {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let value = Option::<Value>::deserialize(deserializer)?;
let raw_json = value
.map(|value| serde_json::to_string(&value).map_err(serde::de::Error::custom))
.transpose()?;
Ok(Self { raw_json })
} }
} }
@ -82,8 +108,8 @@ impl From<Map<String, Value>> for RegistrationMetadata {
pub struct NodeRegistration { pub struct NodeRegistration {
pub identifier: String, pub identifier: String,
pub ip: Ipv4Addr, pub ip: Ipv4Addr,
#[serde(default, skip_serializing_if = "RegistrationMetadata::is_empty")] #[serde(default, skip_serializing_if = "RegistrationPayload::is_empty")]
pub metadata: RegistrationMetadata, pub metadata: RegistrationPayload,
} }
impl NodeRegistration { impl NodeRegistration {
@ -92,13 +118,21 @@ impl NodeRegistration {
Self { Self {
identifier: identifier.into(), identifier: identifier.into(),
ip, ip,
metadata: RegistrationMetadata::default(), metadata: RegistrationPayload::default(),
} }
} }
pub fn with_metadata<T>(mut self, metadata: &T) -> Result<Self, serde_json::Error>
where
T: Serialize,
{
self.metadata = RegistrationPayload::from_serializable(metadata)?;
Ok(self)
}
#[must_use] #[must_use]
pub fn with_metadata(mut self, metadata: RegistrationMetadata) -> Self { pub fn with_payload(mut self, payload: RegistrationPayload) -> Self {
self.metadata = metadata; self.metadata = payload;
self self
} }
} }
@ -222,66 +256,45 @@ impl ConfigProvider for ConfigRepo {
} }
} }
/// Failures when loading a file-backed cfgsync provider.
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum FileConfigProviderError { pub enum BundleLoadError {
#[error("failed to read cfgsync bundle at {path}: {source}")] #[error("reading cfgsync bundle {path}: {source}")]
Read { ReadBundle {
path: String, path: String,
#[source] #[source]
source: std::io::Error, source: std::io::Error,
}, },
#[error("failed to parse cfgsync bundle at {path}: {source}")] #[error("parsing cfgsync bundle {path}: {source}")]
Parse { ParseBundle {
path: String, path: String,
#[source] #[source]
source: serde_yaml::Error, source: serde_yaml::Error,
}, },
} }
/// YAML bundle-backed provider implementation. #[must_use]
pub struct FileConfigProvider { pub fn bundle_to_payload_map(bundle: CfgSyncBundle) -> HashMap<String, CfgSyncPayload> {
inner: ConfigRepo, bundle
} .nodes
.into_iter()
.map(|node| {
let CfgSyncBundleNode { identifier, files } = node;
impl FileConfigProvider { (identifier, CfgSyncPayload::from_files(files))
/// Loads provider state from a cfgsync bundle YAML file.
pub fn from_yaml_file(path: &Path) -> Result<Self, FileConfigProviderError> {
let raw = fs::read_to_string(path).map_err(|source| FileConfigProviderError::Read {
path: path.display().to_string(),
source,
})?;
let bundle: CfgSyncBundle =
serde_yaml::from_str(&raw).map_err(|source| FileConfigProviderError::Parse {
path: path.display().to_string(),
source,
})?;
let configs = bundle
.nodes
.into_iter()
.map(payload_from_bundle_node)
.collect();
Ok(Self {
inner: ConfigRepo { configs },
}) })
} .collect()
} }
impl ConfigProvider for FileConfigProvider { pub fn load_bundle(path: &Path) -> Result<CfgSyncBundle, BundleLoadError> {
fn register(&self, registration: NodeRegistration) -> RegistrationResponse { let path_string = path.display().to_string();
self.inner.register(registration) let raw = fs::read_to_string(path).map_err(|source| BundleLoadError::ReadBundle {
} path: path_string.clone(),
source,
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse { })?;
self.inner.resolve(registration) serde_yaml::from_str(&raw).map_err(|source| BundleLoadError::ParseBundle {
} path: path_string,
} source,
})
fn payload_from_bundle_node(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) {
(node.identifier, CfgSyncPayload::from_files(node.files))
} }
#[cfg(test)] #[cfg(test)]
@ -292,6 +305,38 @@ mod tests {
use super::*; use super::*;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct ExampleRegistration {
network_port: u16,
service: String,
}
#[test]
fn registration_payload_round_trips_typed_value() {
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"))
.with_metadata(&ExampleRegistration {
network_port: 3000,
service: "blend".to_owned(),
})
.expect("serialize registration metadata");
let encoded = serde_json::to_value(&registration).expect("serialize registration");
let metadata = encoded.get("metadata").expect("registration metadata");
assert_eq!(metadata.get("network_port"), Some(&Value::from(3000u16)));
assert_eq!(metadata.get("service"), Some(&Value::from("blend")));
let decoded: NodeRegistration =
serde_json::from_value(encoded).expect("deserialize registration");
let typed: ExampleRegistration = decoded
.metadata
.deserialize()
.expect("deserialize metadata")
.expect("registration metadata value");
assert_eq!(typed.network_port, 3000);
assert_eq!(typed.service, "blend");
}
fn sample_payload() -> CfgSyncPayload { fn sample_payload() -> CfgSyncPayload {
CfgSyncPayload::from_files(vec![CfgSyncFile::new("/config.yaml", "key: value")]) CfgSyncPayload::from_files(vec![CfgSyncFile::new("/config.yaml", "key: value")])
} }
@ -378,31 +423,66 @@ nodes:
RepoResponse::Error(error) => panic!("expected config, got {error}"), RepoResponse::Error(error) => panic!("expected config, got {error}"),
} }
} }
}
#[test] /// Failures when loading a file-backed cfgsync provider.
fn registration_metadata_serializes_as_object() { #[derive(Debug, Error)]
let mut metadata = RegistrationMetadata::new(); pub enum FileConfigProviderError {
metadata #[error("failed to read cfgsync bundle at {path}: {source}")]
.insert_serialized("network_port", 3000_u16) Read {
.expect("serialize metadata"); path: String,
metadata.insert_json_value("service", Value::String("blend".to_owned())); #[source]
source: std::io::Error,
},
#[error("failed to parse cfgsync bundle at {path}: {source}")]
Parse {
path: String,
#[source]
source: serde_yaml::Error,
},
}
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")) /// YAML bundle-backed provider implementation.
.with_metadata(metadata); pub struct FileConfigProvider {
inner: ConfigRepo,
}
let encoded = serde_json::to_value(&registration).expect("serialize registration"); impl FileConfigProvider {
let metadata = encoded /// Loads provider state from a cfgsync bundle YAML file.
.get("metadata") pub fn from_yaml_file(path: &Path) -> Result<Self, FileConfigProviderError> {
.and_then(Value::as_object) let raw = fs::read_to_string(path).map_err(|source| FileConfigProviderError::Read {
.expect("registration metadata object"); path: path.display().to_string(),
source,
})?;
assert_eq!( let bundle: CfgSyncBundle =
metadata.get("network_port"), serde_yaml::from_str(&raw).map_err(|source| FileConfigProviderError::Parse {
Some(&Value::Number(3000_u16.into())) path: path.display().to_string(),
); source,
assert_eq!( })?;
metadata.get("service"),
Some(&Value::String("blend".to_owned())) let configs = bundle
); .nodes
.into_iter()
.map(payload_from_bundle_node)
.collect();
Ok(Self {
inner: ConfigRepo { configs },
})
} }
} }
impl ConfigProvider for FileConfigProvider {
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
self.inner.register(registration)
}
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
self.inner.resolve(registration)
}
}
fn payload_from_bundle_node(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) {
(node.identifier, CfgSyncPayload::from_files(node.files))
}

View File

@ -7,9 +7,8 @@ use std::{
use anyhow::{Context as _, Result, bail}; use anyhow::{Context as _, Result, bail};
use cfgsync_core::{ use cfgsync_core::{
CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, NodeRegistration, CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, NodeRegistration,
RegistrationMetadata, RegistrationPayload,
}; };
use serde_json::Value;
use thiserror::Error; use thiserror::Error;
use tokio::time::{Duration, sleep}; use tokio::time::{Duration, sleep};
use tracing::info; use tracing::info;
@ -21,8 +20,6 @@ const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250);
enum ClientEnvError { enum ClientEnvError {
#[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")] #[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")]
InvalidIp { value: String }, InvalidIp { value: String },
#[error("CFG_REGISTRATION_METADATA_JSON must be a JSON object")]
InvalidRegistrationMetadataShape,
} }
async fn fetch_with_retry(payload: &NodeRegistration, server_addr: &str) -> Result<CfgSyncPayload> { async fn fetch_with_retry(payload: &NodeRegistration, server_addr: &str) -> Result<CfgSyncPayload> {
@ -149,10 +146,10 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?; let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?;
let identifier = let identifier =
env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned()); env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
let metadata = parse_registration_metadata_env()?; let metadata = parse_registration_payload_env()?;
pull_config_files( pull_config_files(
NodeRegistration::new(identifier, ip).with_metadata(metadata), NodeRegistration::new(identifier, ip).with_payload(metadata),
&server_addr, &server_addr,
) )
.await .await
@ -167,22 +164,16 @@ fn parse_ip_env(ip_str: &str) -> Result<Ipv4Addr> {
.map_err(Into::into) .map_err(Into::into)
} }
fn parse_registration_metadata_env() -> Result<RegistrationMetadata> { fn parse_registration_payload_env() -> Result<RegistrationPayload> {
let Ok(raw) = env::var("CFG_REGISTRATION_METADATA_JSON") else { let Ok(raw) = env::var("CFG_REGISTRATION_METADATA_JSON") else {
return Ok(RegistrationMetadata::default()); return Ok(RegistrationPayload::default());
}; };
parse_registration_metadata(&raw) parse_registration_payload(&raw)
} }
fn parse_registration_metadata(raw: &str) -> Result<RegistrationMetadata> { fn parse_registration_payload(raw: &str) -> Result<RegistrationPayload> {
let value: Value = RegistrationPayload::from_json_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON")
serde_json::from_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON")?;
let Some(metadata) = value.as_object() else {
return Err(ClientEnvError::InvalidRegistrationMetadataShape.into());
};
Ok(RegistrationMetadata::from(metadata.clone()))
} }
#[cfg(test)] #[cfg(test)]
@ -256,28 +247,37 @@ mod tests {
} }
#[test] #[test]
fn parses_registration_metadata_object() { fn parses_registration_payload_object() {
let metadata = parse_registration_metadata(r#"{"network_port":3000,"service":"blend"}"#) #[derive(Debug, serde::Deserialize, PartialEq, Eq)]
struct ExamplePayload {
network_port: u16,
service: String,
}
let metadata = parse_registration_payload(r#"{"network_port":3000,"service":"blend"}"#)
.expect("parse metadata"); .expect("parse metadata");
let payload: ExamplePayload = metadata
.deserialize()
.expect("deserialize payload")
.expect("payload value");
assert_eq!( assert_eq!(
metadata.get("network_port"), payload,
Some(&Value::Number(3000_u16.into())) ExamplePayload {
); network_port: 3000,
assert_eq!( service: "blend".to_owned(),
metadata.get("service"), }
Some(&Value::String("blend".to_owned()))
); );
} }
#[test] #[test]
fn rejects_non_object_registration_metadata() { fn parses_registration_payload_array() {
let error = parse_registration_metadata(r#"[1,2,3]"#).expect_err("reject metadata array"); let metadata = parse_registration_payload(r#"[1,2,3]"#).expect("parse metadata array");
let payload: Vec<u8> = metadata
.deserialize()
.expect("deserialize payload")
.expect("payload value");
assert!( assert_eq!(payload, vec![1, 2, 3]);
error
.to_string()
.contains("CFG_REGISTRATION_METADATA_JSON must be a JSON object")
);
} }
} }