Add adapter-backed cfgsync materialization

This commit is contained in:
andrussal 2026-03-10 08:57:41 +01:00
parent 129099337f
commit 911d09e2c1
5 changed files with 287 additions and 90 deletions

8
Cargo.lock generated
View File

@ -920,6 +920,8 @@ dependencies = [
name = "cfgsync-adapter"
version = "0.1.0"
dependencies = [
"cfgsync-artifacts",
"cfgsync-core",
"thiserror 2.0.18",
]
@ -1320,7 +1322,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de"
dependencies = [
"data-encoding",
"syn 2.0.114",
"syn 1.0.109",
]
[[package]]
@ -5520,9 +5522,9 @@ dependencies = [
[[package]]
name = "quinn-proto"
version = "0.11.13"
version = "0.11.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31"
checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098"
dependencies = [
"bytes",
"getrandom 0.3.4",

View File

@ -13,4 +13,6 @@ version = { workspace = true }
workspace = true
[dependencies]
thiserror = { workspace = true }
cfgsync-artifacts = { workspace = true }
cfgsync-core = { workspace = true }
thiserror = { workspace = true }

View File

@ -1,5 +1,10 @@
use std::{collections::HashMap, error::Error};
use std::{collections::HashMap, error::Error, sync::Mutex};
use cfgsync_artifacts::ArtifactFile;
use cfgsync_core::{
CfgSyncErrorResponse, CfgSyncPayload, ConfigProvider, NodeRegistration, RegistrationResponse,
RepoResponse,
};
use thiserror::Error;
/// Type-erased cfgsync adapter error used to preserve source context.
@ -14,6 +19,29 @@ pub struct CfgsyncNodeConfig {
pub config_yaml: String,
}
/// Node artifacts produced by a cfgsync materializer.
#[derive(Debug, Clone, Default)]
pub struct CfgsyncNodeArtifacts {
files: Vec<ArtifactFile>,
}
impl CfgsyncNodeArtifacts {
#[must_use]
pub fn new(files: Vec<ArtifactFile>) -> Self {
Self { files }
}
#[must_use]
pub fn files(&self) -> &[ArtifactFile] {
&self.files
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
}
/// Precomputed node configs indexed by stable identifier.
#[derive(Debug, Clone, Default)]
pub struct CfgsyncNodeCatalog {
@ -52,6 +80,91 @@ impl CfgsyncNodeCatalog {
}
}
/// Adapter-side node config materialization contract used by cfgsync server.
pub trait CfgsyncMaterializer: Send + Sync {
fn materialize(
&self,
registration: &NodeRegistration,
) -> Result<Option<CfgsyncNodeArtifacts>, DynCfgsyncError>;
}
impl CfgsyncMaterializer for CfgsyncNodeCatalog {
fn materialize(
&self,
registration: &NodeRegistration,
) -> Result<Option<CfgsyncNodeArtifacts>, DynCfgsyncError> {
let artifacts = self
.resolve(&registration.identifier)
.map(build_node_artifacts_from_config);
Ok(artifacts)
}
}
/// Registration-aware provider backed by an adapter materializer.
pub struct MaterializingConfigProvider<M> {
materializer: M,
registrations: Mutex<HashMap<String, NodeRegistration>>,
}
impl<M> MaterializingConfigProvider<M> {
#[must_use]
pub fn new(materializer: M) -> Self {
Self {
materializer,
registrations: Mutex::new(HashMap::new()),
}
}
fn registration_for(&self, identifier: &str) -> Option<NodeRegistration> {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.get(identifier).cloned()
}
}
impl<M> ConfigProvider for MaterializingConfigProvider<M>
where
M: CfgsyncMaterializer,
{
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
let mut registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.insert(registration.identifier.clone(), registration);
RegistrationResponse::Registered
}
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
let registration = match self.registration_for(&registration.identifier) {
Some(registration) => registration,
None => {
return RepoResponse::Error(CfgSyncErrorResponse::not_ready(
&registration.identifier,
));
}
};
match self.materializer.materialize(&registration) {
Ok(Some(artifacts)) => {
RepoResponse::Config(CfgSyncPayload::from_files(artifacts.files().to_vec()))
}
Ok(None) => {
RepoResponse::Error(CfgSyncErrorResponse::not_ready(&registration.identifier))
}
Err(error) => RepoResponse::Error(CfgSyncErrorResponse::internal(format!(
"failed to materialize config for host {}: {error}",
registration.identifier
))),
}
}
}
/// Adapter contract for converting an application deployment model into
/// node-specific serialized config payloads.
pub trait CfgsyncEnv {
@ -164,9 +277,15 @@ fn build_rewritten_node_config<E: CfgsyncEnv>(
Ok(node_config)
}
fn build_node_artifacts_from_config(config: &CfgsyncNodeConfig) -> CfgsyncNodeArtifacts {
CfgsyncNodeArtifacts::new(vec![ArtifactFile::new("/config.yaml", &config.config_yaml)])
}
#[cfg(test)]
mod tests {
use super::{CfgsyncNodeCatalog, CfgsyncNodeConfig};
use cfgsync_core::{CfgSyncErrorCode, ConfigProvider, NodeRegistration, RepoResponse};
use super::{CfgsyncNodeCatalog, CfgsyncNodeConfig, MaterializingConfigProvider};
#[test]
fn catalog_resolves_identifier() {
@ -179,4 +298,42 @@ mod tests {
assert_eq!(node.config_yaml, "key: value");
}
#[test]
fn materializing_provider_resolves_registered_node() {
let catalog = CfgsyncNodeCatalog::new(vec![CfgsyncNodeConfig {
identifier: "node-1".to_owned(),
config_yaml: "key: value".to_owned(),
}]);
let provider = MaterializingConfigProvider::new(catalog);
let registration = NodeRegistration {
identifier: "node-1".to_owned(),
ip: "127.0.0.1".parse().expect("parse ip"),
};
let _ = provider.register(registration.clone());
match provider.resolve(&registration) {
RepoResponse::Config(payload) => assert_eq!(payload.files()[0].path, "/config.yaml"),
RepoResponse::Error(error) => panic!("expected config, got {error}"),
}
}
#[test]
fn materializing_provider_reports_not_ready_before_registration() {
let catalog = CfgsyncNodeCatalog::new(vec![CfgsyncNodeConfig {
identifier: "node-1".to_owned(),
config_yaml: "key: value".to_owned(),
}]);
let provider = MaterializingConfigProvider::new(catalog);
let registration = NodeRegistration {
identifier: "node-1".to_owned(),
ip: "127.0.0.1".parse().expect("parse ip"),
};
match provider.resolve(&registration) {
RepoResponse::Config(_) => panic!("expected not-ready error"),
RepoResponse::Error(error) => assert!(matches!(error.code, CfgSyncErrorCode::NotReady)),
}
}
}

View File

@ -1,10 +1,4 @@
use std::{
collections::{HashMap, HashSet},
fs,
net::Ipv4Addr,
path::Path,
sync::{Arc, Mutex},
};
use std::{collections::HashMap, fs, net::Ipv4Addr, path::Path, sync::Arc};
use cfgsync_artifacts::ArtifactFile;
use serde::{Deserialize, Serialize};
@ -113,66 +107,44 @@ pub enum RegistrationResponse {
pub trait ConfigProvider: Send + Sync {
fn register(&self, registration: NodeRegistration) -> RegistrationResponse;
fn resolve(&self, identifier: &str) -> RepoResponse;
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse;
}
/// In-memory map-backed provider used by cfgsync server state.
pub struct ConfigRepo {
configs: HashMap<String, CfgSyncPayload>,
registrations: Mutex<HashSet<String>>,
}
impl ConfigRepo {
#[must_use]
pub fn from_bundle(configs: HashMap<String, CfgSyncPayload>) -> Arc<Self> {
Arc::new(Self {
configs,
registrations: Mutex::new(HashSet::new()),
})
}
fn register_identifier(&self, identifier: &str) -> RegistrationResponse {
if !self.configs.contains_key(identifier) {
return RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(identifier));
}
let mut registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.insert(identifier.to_owned());
RegistrationResponse::Registered
}
fn is_registered(&self, identifier: &str) -> bool {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.contains(identifier)
Arc::new(Self { configs })
}
}
impl ConfigProvider for ConfigRepo {
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
self.register_identifier(&registration.identifier)
if self.configs.contains_key(&registration.identifier) {
RegistrationResponse::Registered
} else {
RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(
&registration.identifier,
))
}
}
fn resolve(&self, identifier: &str) -> RepoResponse {
if !self.configs.contains_key(identifier) {
return RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier));
}
if !self.is_registered(identifier) {
return RepoResponse::Error(CfgSyncErrorResponse::not_ready(identifier));
}
self.configs.get(identifier).cloned().map_or_else(
|| RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)),
RepoResponse::Config,
)
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
self.configs
.get(&registration.identifier)
.cloned()
.map_or_else(
|| {
RepoResponse::Error(CfgSyncErrorResponse::missing_config(
&registration.identifier,
))
},
RepoResponse::Config,
)
}
}
@ -219,10 +191,7 @@ impl FileConfigProvider {
.collect();
Ok(Self {
inner: ConfigRepo {
configs,
registrations: Mutex::new(HashSet::new()),
},
inner: ConfigRepo { configs },
})
}
}
@ -232,8 +201,8 @@ impl ConfigProvider for FileConfigProvider {
self.inner.register(registration)
}
fn resolve(&self, identifier: &str) -> RepoResponse {
self.inner.resolve(identifier)
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
self.inner.resolve(registration)
}
}
@ -257,12 +226,12 @@ mod tests {
fn resolves_existing_identifier() {
let mut configs = HashMap::new();
configs.insert("node-1".to_owned(), sample_payload());
let repo = ConfigRepo {
configs,
registrations: Mutex::new(HashSet::from(["node-1".to_owned()])),
};
let repo = ConfigRepo { configs };
match repo.resolve("node-1") {
match repo.resolve(&NodeRegistration {
identifier: "node-1".to_owned(),
ip: "127.0.0.1".parse().expect("parse ip"),
}) {
RepoResponse::Config(payload) => {
assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION);
assert_eq!(payload.files.len(), 1);
@ -276,10 +245,12 @@ mod tests {
fn reports_missing_identifier() {
let repo = ConfigRepo {
configs: HashMap::new(),
registrations: Mutex::new(HashSet::new()),
};
match repo.resolve("unknown-node") {
match repo.resolve(&NodeRegistration {
identifier: "unknown-node".to_owned(),
ip: "127.0.0.1".parse().expect("parse ip"),
}) {
RepoResponse::Config(_) => panic!("expected missing-config error"),
RepoResponse::Error(error) => {
assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig));
@ -310,26 +281,27 @@ nodes:
ip: "127.0.0.1".parse().expect("parse ip"),
});
match provider.resolve("node-1") {
match provider.resolve(&NodeRegistration {
identifier: "node-1".to_owned(),
ip: "127.0.0.1".parse().expect("parse ip"),
}) {
RepoResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
RepoResponse::Error(error) => panic!("expected config, got {error}"),
}
}
#[test]
fn resolve_requires_registration_first() {
fn resolve_accepts_known_registration_without_gating() {
let mut configs = HashMap::new();
configs.insert("node-1".to_owned(), sample_payload());
let repo = ConfigRepo {
configs,
registrations: Mutex::new(HashSet::new()),
};
let repo = ConfigRepo { configs };
match repo.resolve("node-1") {
RepoResponse::Config(_) => panic!("expected not-ready error"),
RepoResponse::Error(error) => {
assert!(matches!(error.code, CfgSyncErrorCode::NotReady));
}
match repo.resolve(&NodeRegistration {
identifier: "node-1".to_owned(),
ip: "127.0.0.1".parse().expect("parse ip"),
}) {
RepoResponse::Config(_) => {}
RepoResponse::Error(error) => panic!("expected config, got {error}"),
}
}
}

View File

@ -39,7 +39,7 @@ async fn node_config(
State(state): State<Arc<CfgSyncState>>,
Json(payload): Json<NodeRegistration>,
) -> impl IntoResponse {
let response = resolve_node_config_response(&state, &payload.identifier);
let response = resolve_node_config_response(&state, &payload);
match response {
RepoResponse::Config(payload_data) => (StatusCode::OK, Json(payload_data)).into_response(),
@ -65,8 +65,11 @@ async fn register_node(
}
}
fn resolve_node_config_response(state: &CfgSyncState, identifier: &str) -> RepoResponse {
state.repo.resolve(identifier)
fn resolve_node_config_response(
state: &CfgSyncState,
registration: &NodeRegistration,
) -> RepoResponse {
state.repo.resolve(registration)
}
fn error_status(code: &CfgSyncErrorCode) -> StatusCode {
@ -129,11 +132,66 @@ mod tests {
}
}
fn resolve(&self, identifier: &str) -> RepoResponse {
self.data.get(identifier).cloned().map_or_else(
|| RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)),
RepoResponse::Config,
)
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
self.data
.get(&registration.identifier)
.cloned()
.map_or_else(
|| {
RepoResponse::Error(CfgSyncErrorResponse::missing_config(
&registration.identifier,
))
},
RepoResponse::Config,
)
}
}
struct RegistrationAwareProvider {
data: HashMap<String, CfgSyncPayload>,
registrations: std::sync::Mutex<HashMap<String, NodeRegistration>>,
}
impl ConfigProvider for RegistrationAwareProvider {
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
if !self.data.contains_key(&registration.identifier) {
return RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(
&registration.identifier,
));
}
let mut registrations = self
.registrations
.lock()
.expect("test registration store should not be poisoned");
registrations.insert(registration.identifier.clone(), registration);
RegistrationResponse::Registered
}
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
let registrations = self
.registrations
.lock()
.expect("test registration store should not be poisoned");
if !registrations.contains_key(&registration.identifier) {
return RepoResponse::Error(CfgSyncErrorResponse::not_ready(
&registration.identifier,
));
}
self.data
.get(&registration.identifier)
.cloned()
.map_or_else(
|| {
RepoResponse::Error(CfgSyncErrorResponse::missing_config(
&registration.identifier,
))
},
RepoResponse::Config,
)
}
}
@ -149,7 +207,10 @@ mod tests {
let mut data = HashMap::new();
data.insert("node-a".to_owned(), sample_payload());
let provider = crate::repo::ConfigRepo::from_bundle(data);
let provider = Arc::new(RegistrationAwareProvider {
data,
registrations: std::sync::Mutex::new(HashMap::new()),
});
let state = Arc::new(CfgSyncState::new(provider));
let payload = NodeRegistration {
ip: "127.0.0.1".parse().expect("valid ip"),
@ -197,7 +258,10 @@ mod tests {
let mut data = HashMap::new();
data.insert("node-a".to_owned(), sample_payload());
let provider = crate::repo::ConfigRepo::from_bundle(data);
let provider = Arc::new(RegistrationAwareProvider {
data,
registrations: std::sync::Mutex::new(HashMap::new()),
});
let state = Arc::new(CfgSyncState::new(provider));
let payload = NodeRegistration {
ip: "127.0.0.1".parse().expect("valid ip"),