Add cfgsync registration flow

This commit is contained in:
andrussal 2026-03-09 10:18:36 +01:00
parent 7da3df455f
commit 129099337f
7 changed files with 328 additions and 41 deletions

View File

@ -1,4 +1,4 @@
use std::error::Error;
use std::{collections::HashMap, error::Error};
use thiserror::Error;
@ -14,6 +14,44 @@ pub struct CfgsyncNodeConfig {
pub config_yaml: String,
}
/// Precomputed node configs indexed by stable identifier.
#[derive(Debug, Clone, Default)]
pub struct CfgsyncNodeCatalog {
nodes: HashMap<String, CfgsyncNodeConfig>,
}
impl CfgsyncNodeCatalog {
#[must_use]
pub fn new(nodes: Vec<CfgsyncNodeConfig>) -> Self {
let nodes = nodes
.into_iter()
.map(|node| (node.identifier.clone(), node))
.collect();
Self { nodes }
}
#[must_use]
pub fn resolve(&self, identifier: &str) -> Option<&CfgsyncNodeConfig> {
self.nodes.get(identifier)
}
#[must_use]
pub fn len(&self) -> usize {
self.nodes.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
#[must_use]
pub fn into_configs(self) -> Vec<CfgsyncNodeConfig> {
self.nodes.into_values().collect()
}
}
/// Adapter contract for converting an application deployment model into
/// node-specific serialized config payloads.
pub trait CfgsyncEnv {
@ -71,6 +109,14 @@ pub fn build_cfgsync_node_configs<E: CfgsyncEnv>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<Vec<CfgsyncNodeConfig>, BuildCfgsyncNodesError> {
Ok(build_cfgsync_node_catalog::<E>(deployment, hostnames)?.into_configs())
}
/// Builds cfgsync node configs and indexes them by stable identifier.
pub fn build_cfgsync_node_catalog<E: CfgsyncEnv>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<CfgsyncNodeCatalog, BuildCfgsyncNodesError> {
let nodes = E::nodes(deployment);
ensure_hostname_count(nodes.len(), hostnames.len())?;
@ -79,7 +125,7 @@ pub fn build_cfgsync_node_configs<E: CfgsyncEnv>(
output.push(build_node_entry::<E>(deployment, node, index, hostnames)?);
}
Ok(output)
Ok(CfgsyncNodeCatalog::new(output))
}
fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> {
@ -117,3 +163,20 @@ fn build_rewritten_node_config<E: CfgsyncEnv>(
Ok(node_config)
}
#[cfg(test)]
mod tests {
use super::{CfgsyncNodeCatalog, CfgsyncNodeConfig};
#[test]
fn catalog_resolves_identifier() {
let catalog = CfgsyncNodeCatalog::new(vec![CfgsyncNodeConfig {
identifier: "node-1".to_owned(),
config_yaml: "key: value".to_owned(),
}]);
let node = catalog.resolve("node-1").expect("resolve node config");
assert_eq!(node.config_yaml, "key: value");
}
}

View File

@ -1,10 +1,7 @@
use serde::Serialize;
use thiserror::Error;
use crate::{
repo::{CfgSyncErrorResponse, CfgSyncPayload},
server::ClientIp,
};
use crate::repo::{CfgSyncErrorResponse, CfgSyncPayload, NodeRegistration};
/// cfgsync client-side request/response failures.
#[derive(Debug, Error)]
@ -21,6 +18,12 @@ pub enum ClientError {
Decode(serde_json::Error),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfigFetchStatus {
Ready,
NotReady,
}
/// Reusable HTTP client for cfgsync server endpoints.
#[derive(Clone, Debug)]
pub struct CfgSyncClient {
@ -46,10 +49,15 @@ impl CfgSyncClient {
&self.base_url
}
/// Registers a node before requesting config.
pub async fn register_node(&self, payload: &NodeRegistration) -> Result<(), ClientError> {
self.post_status_only("/register", payload).await
}
/// Fetches `/node` payload for a node identifier.
pub async fn fetch_node_config(
&self,
payload: &ClientIp,
payload: &NodeRegistration,
) -> Result<CfgSyncPayload, ClientError> {
self.post_json("/node", payload).await
}
@ -57,11 +65,29 @@ impl CfgSyncClient {
/// Fetches `/init-with-node` payload for a node identifier.
pub async fn fetch_init_with_node_config(
&self,
payload: &ClientIp,
payload: &NodeRegistration,
) -> Result<CfgSyncPayload, ClientError> {
self.post_json("/init-with-node", payload).await
}
pub async fn fetch_node_config_status(
&self,
payload: &NodeRegistration,
) -> Result<ConfigFetchStatus, ClientError> {
match self.fetch_node_config(payload).await {
Ok(_) => Ok(ConfigFetchStatus::Ready),
Err(ClientError::Status {
status,
error: Some(error),
..
}) if status == reqwest::StatusCode::TOO_EARLY => {
let _ = error;
Ok(ConfigFetchStatus::NotReady)
}
Err(error) => Err(error),
}
}
/// Posts JSON payload to a cfgsync endpoint and decodes cfgsync payload.
pub async fn post_json<P: Serialize>(
&self,
@ -89,6 +115,32 @@ impl CfgSyncClient {
serde_json::from_str(&body).map_err(ClientError::Decode)
}
async fn post_status_only<P: Serialize>(
&self,
path: &str,
payload: &P,
) -> Result<(), ClientError> {
let url = self.endpoint_url(path);
let response = self.http.post(url).json(payload).send().await?;
let status = response.status();
let body = response.text().await?;
if !status.is_success() {
let error = serde_json::from_str::<CfgSyncErrorResponse>(&body).ok();
let message = error
.as_ref()
.map(|err| err.message.clone())
.unwrap_or_else(|| body.clone());
return Err(ClientError::Status {
status,
message,
error,
});
}
Ok(())
}
fn endpoint_url(&self, path: &str) -> String {
if path.starts_with('/') {
format!("{}{}", self.base_url, path)

View File

@ -5,7 +5,7 @@ pub mod repo;
pub mod server;
pub use bundle::{CfgSyncBundle, CfgSyncBundleNode};
pub use client::{CfgSyncClient, ClientError};
pub use client::{CfgSyncClient, ClientError, ConfigFetchStatus};
pub use render::{
CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides,
apply_timeout_floor, ensure_bundle_path, load_cfgsync_template_yaml,
@ -13,6 +13,7 @@ pub use render::{
};
pub use repo::{
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload,
ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, RepoResponse,
ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, NodeRegistration,
RegistrationResponse, RepoResponse,
};
pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync};
pub use server::{CfgSyncState, RunCfgsyncError, cfgsync_app, run_cfgsync};

View File

@ -1,4 +1,10 @@
use std::{collections::HashMap, fs, path::Path, sync::Arc};
use std::{
collections::{HashMap, HashSet},
fs,
net::Ipv4Addr,
path::Path,
sync::{Arc, Mutex},
};
use cfgsync_artifacts::ArtifactFile;
use serde::{Deserialize, Serialize};
@ -22,6 +28,13 @@ pub struct CfgSyncPayload {
pub files: Vec<CfgSyncFile>,
}
/// Node metadata recorded before config materialization.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NodeRegistration {
pub identifier: String,
pub ip: Ipv4Addr,
}
impl CfgSyncPayload {
#[must_use]
pub fn from_files(files: Vec<CfgSyncFile>) -> Self {
@ -46,6 +59,7 @@ impl CfgSyncPayload {
#[serde(rename_all = "snake_case")]
pub enum CfgSyncErrorCode {
MissingConfig,
NotReady,
Internal,
}
@ -66,6 +80,14 @@ impl CfgSyncErrorResponse {
}
}
#[must_use]
pub fn not_ready(identifier: &str) -> Self {
Self {
code: CfgSyncErrorCode::NotReady,
message: format!("config for host {identifier} is not ready"),
}
}
#[must_use]
pub fn internal(message: impl Into<String>) -> Self {
Self {
@ -81,25 +103,72 @@ pub enum RepoResponse {
Error(CfgSyncErrorResponse),
}
/// Repository outcome for a node registration request.
pub enum RegistrationResponse {
Registered,
Error(CfgSyncErrorResponse),
}
/// Read-only source for cfgsync node payloads.
pub trait ConfigProvider: Send + Sync {
fn register(&self, registration: NodeRegistration) -> RegistrationResponse;
fn resolve(&self, identifier: &str) -> RepoResponse;
}
/// In-memory map-backed provider used by cfgsync server state.
pub struct ConfigRepo {
configs: HashMap<String, CfgSyncPayload>,
registrations: Mutex<HashSet<String>>,
}
impl ConfigRepo {
#[must_use]
pub fn from_bundle(configs: HashMap<String, CfgSyncPayload>) -> Arc<Self> {
Arc::new(Self { configs })
Arc::new(Self {
configs,
registrations: Mutex::new(HashSet::new()),
})
}
fn register_identifier(&self, identifier: &str) -> RegistrationResponse {
if !self.configs.contains_key(identifier) {
return RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(identifier));
}
let mut registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.insert(identifier.to_owned());
RegistrationResponse::Registered
}
fn is_registered(&self, identifier: &str) -> bool {
let registrations = self
.registrations
.lock()
.expect("cfgsync registration store should not be poisoned");
registrations.contains(identifier)
}
}
impl ConfigProvider for ConfigRepo {
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
self.register_identifier(&registration.identifier)
}
fn resolve(&self, identifier: &str) -> RepoResponse {
if !self.configs.contains_key(identifier) {
return RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier));
}
if !self.is_registered(identifier) {
return RepoResponse::Error(CfgSyncErrorResponse::not_ready(identifier));
}
self.configs.get(identifier).cloned().map_or_else(
|| RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)),
RepoResponse::Config,
@ -150,12 +219,19 @@ impl FileConfigProvider {
.collect();
Ok(Self {
inner: ConfigRepo { configs },
inner: ConfigRepo {
configs,
registrations: Mutex::new(HashSet::new()),
},
})
}
}
impl ConfigProvider for FileConfigProvider {
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
self.inner.register(registration)
}
fn resolve(&self, identifier: &str) -> RepoResponse {
self.inner.resolve(identifier)
}
@ -181,7 +257,10 @@ mod tests {
fn resolves_existing_identifier() {
let mut configs = HashMap::new();
configs.insert("node-1".to_owned(), sample_payload());
let repo = ConfigRepo { configs };
let repo = ConfigRepo {
configs,
registrations: Mutex::new(HashSet::from(["node-1".to_owned()])),
};
match repo.resolve("node-1") {
RepoResponse::Config(payload) => {
@ -197,6 +276,7 @@ mod tests {
fn reports_missing_identifier() {
let repo = ConfigRepo {
configs: HashMap::new(),
registrations: Mutex::new(HashSet::new()),
};
match repo.resolve("unknown-node") {
@ -225,9 +305,31 @@ nodes:
let provider =
FileConfigProvider::from_yaml_file(bundle_file.path()).expect("load file provider");
let _ = provider.register(NodeRegistration {
identifier: "node-1".to_owned(),
ip: "127.0.0.1".parse().expect("parse ip"),
});
match provider.resolve("node-1") {
RepoResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
RepoResponse::Error(error) => panic!("expected config, got {error}"),
}
}
#[test]
fn resolve_requires_registration_first() {
let mut configs = HashMap::new();
configs.insert("node-1".to_owned(), sample_payload());
let repo = ConfigRepo {
configs,
registrations: Mutex::new(HashSet::new()),
};
match repo.resolve("node-1") {
RepoResponse::Config(_) => panic!("expected not-ready error"),
RepoResponse::Error(error) => {
assert!(matches!(error.code, CfgSyncErrorCode::NotReady));
}
}
}
}

View File

@ -1,19 +1,11 @@
use std::{io, net::Ipv4Addr, sync::Arc};
use std::{io, sync::Arc};
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::repo::{CfgSyncErrorCode, ConfigProvider, RepoResponse};
/// Request payload used by cfgsync client for node config resolution.
#[derive(Serialize, Deserialize)]
pub struct ClientIp {
/// Node IP that can be used by clients for observability/logging.
pub ip: Ipv4Addr,
/// Stable node identifier used as key in cfgsync bundle lookup.
pub identifier: String,
}
use crate::repo::{
CfgSyncErrorCode, ConfigProvider, NodeRegistration, RegistrationResponse, RepoResponse,
};
/// Runtime state shared across cfgsync HTTP handlers.
pub struct CfgSyncState {
@ -45,7 +37,7 @@ pub enum RunCfgsyncError {
async fn node_config(
State(state): State<Arc<CfgSyncState>>,
Json(payload): Json<ClientIp>,
Json(payload): Json<NodeRegistration>,
) -> impl IntoResponse {
let response = resolve_node_config_response(&state, &payload.identifier);
@ -59,6 +51,20 @@ async fn node_config(
}
}
async fn register_node(
State(state): State<Arc<CfgSyncState>>,
Json(payload): Json<NodeRegistration>,
) -> impl IntoResponse {
match state.repo.register(payload) {
RegistrationResponse::Registered => StatusCode::ACCEPTED.into_response(),
RegistrationResponse::Error(error) => {
let status = error_status(&error.code);
(status, Json(error)).into_response()
}
}
}
fn resolve_node_config_response(state: &CfgSyncState, identifier: &str) -> RepoResponse {
state.repo.resolve(identifier)
}
@ -66,12 +72,14 @@ fn resolve_node_config_response(state: &CfgSyncState, identifier: &str) -> RepoR
fn error_status(code: &CfgSyncErrorCode) -> StatusCode {
match code {
CfgSyncErrorCode::MissingConfig => StatusCode::NOT_FOUND,
CfgSyncErrorCode::NotReady => StatusCode::TOO_EARLY,
CfgSyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR,
}
}
pub fn cfgsync_app(state: CfgSyncState) -> Router {
Router::new()
.route("/register", post(register_node))
.route("/node", post(node_config))
.route("/init-with-node", post(node_config))
.with_state(Arc::new(state))
@ -100,10 +108,10 @@ mod tests {
use axum::{Json, extract::State, http::StatusCode, response::IntoResponse};
use super::{CfgSyncState, ClientIp, node_config};
use super::{CfgSyncState, NodeRegistration, node_config, register_node};
use crate::repo::{
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile,
CfgSyncPayload, ConfigProvider, RepoResponse,
CfgSyncPayload, ConfigProvider, RegistrationResponse, RepoResponse,
};
struct StaticProvider {
@ -111,6 +119,16 @@ mod tests {
}
impl ConfigProvider for StaticProvider {
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
if self.data.contains_key(&registration.identifier) {
RegistrationResponse::Registered
} else {
RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(
&registration.identifier,
))
}
}
fn resolve(&self, identifier: &str) -> RepoResponse {
self.data.get(identifier).cloned().map_or_else(
|| RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)),
@ -131,13 +149,17 @@ mod tests {
let mut data = HashMap::new();
data.insert("node-a".to_owned(), sample_payload());
let provider = Arc::new(StaticProvider { data });
let provider = crate::repo::ConfigRepo::from_bundle(data);
let state = Arc::new(CfgSyncState::new(provider));
let payload = ClientIp {
let payload = NodeRegistration {
ip: "127.0.0.1".parse().expect("valid ip"),
identifier: "node-a".to_owned(),
};
let _ = register_node(State(state.clone()), Json(payload.clone()))
.await
.into_response();
let response = node_config(State(state), Json(payload))
.await
.into_response();
@ -151,7 +173,7 @@ mod tests {
data: HashMap::new(),
});
let state = Arc::new(CfgSyncState::new(provider));
let payload = ClientIp {
let payload = NodeRegistration {
ip: "127.0.0.1".parse().expect("valid ip"),
identifier: "missing-node".to_owned(),
};
@ -169,4 +191,23 @@ mod tests {
assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig));
}
#[tokio::test]
async fn node_config_returns_not_ready_before_registration() {
let mut data = HashMap::new();
data.insert("node-a".to_owned(), sample_payload());
let provider = crate::repo::ConfigRepo::from_bundle(data);
let state = Arc::new(CfgSyncState::new(provider));
let payload = NodeRegistration {
ip: "127.0.0.1".parse().expect("valid ip"),
identifier: "node-a".to_owned(),
};
let response = node_config(State(state), Json(payload))
.await
.into_response();
assert_eq!(response.status(), StatusCode::TOO_EARLY);
}
}

View File

@ -5,7 +5,9 @@ use std::{
};
use anyhow::{Context as _, Result, bail};
use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp};
use cfgsync_core::{
CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, NodeRegistration,
};
use thiserror::Error;
use tokio::time::{Duration, sleep};
use tracing::info;
@ -19,7 +21,7 @@ enum ClientEnvError {
InvalidIp { value: String },
}
async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result<CfgSyncPayload> {
async fn fetch_with_retry(payload: &NodeRegistration, server_addr: &str) -> Result<CfgSyncPayload> {
let client = CfgSyncClient::new(server_addr);
for attempt in 1..=FETCH_ATTEMPTS {
@ -40,13 +42,15 @@ async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result<CfgSy
unreachable!("cfgsync fetch loop always returns before exhausting attempts");
}
async fn fetch_once(client: &CfgSyncClient, payload: &ClientIp) -> Result<CfgSyncPayload> {
async fn fetch_once(client: &CfgSyncClient, payload: &NodeRegistration) -> Result<CfgSyncPayload> {
let response = client.fetch_node_config(payload).await?;
Ok(response)
}
async fn pull_config_files(payload: ClientIp, server_addr: &str) -> Result<()> {
async fn pull_config_files(payload: NodeRegistration, server_addr: &str) -> Result<()> {
register_node(&payload, server_addr).await?;
let config = fetch_with_retry(&payload, server_addr)
.await
.context("fetching cfgsync node config")?;
@ -63,6 +67,30 @@ async fn pull_config_files(payload: ClientIp, server_addr: &str) -> Result<()> {
Ok(())
}
async fn register_node(payload: &NodeRegistration, server_addr: &str) -> Result<()> {
let client = CfgSyncClient::new(server_addr);
for attempt in 1..=FETCH_ATTEMPTS {
match client.register_node(payload).await {
Ok(()) => {
info!(identifier = %payload.identifier, "cfgsync node registered");
return Ok(());
}
Err(error) => {
if attempt == FETCH_ATTEMPTS {
return Err(error).with_context(|| {
format!("registering node with cfgsync after {attempt} attempts")
});
}
sleep(FETCH_RETRY_DELAY).await;
}
}
}
unreachable!("cfgsync register loop always returns before exhausting attempts");
}
fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> {
if config.schema_version != CFGSYNC_SCHEMA_VERSION {
bail!(
@ -118,7 +146,7 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
let identifier =
env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned());
pull_config_files(ClientIp { ip, identifier }, &server_addr).await
pull_config_files(NodeRegistration { ip, identifier }, &server_addr).await
}
fn parse_ip_env(ip_str: &str) -> Result<Ipv4Addr> {
@ -164,7 +192,7 @@ mod tests {
});
pull_config_files(
ClientIp {
NodeRegistration {
ip: "127.0.0.1".parse().expect("parse ip"),
identifier: "node-1".to_owned(),
},

View File

@ -1,5 +1,5 @@
use anyhow::Result;
use cfgsync_adapter::{CfgsyncEnv, build_cfgsync_node_configs};
use cfgsync_adapter::{CfgsyncEnv, build_cfgsync_node_catalog};
pub(crate) use cfgsync_core::render::CfgsyncOutputPaths;
use cfgsync_core::{
CfgSyncBundle, CfgSyncBundleNode,
@ -49,7 +49,7 @@ fn build_cfgsync_bundle<E: CfgsyncEnv>(
topology: &E::Deployment,
hostnames: &[String],
) -> Result<CfgSyncBundle> {
let nodes = build_cfgsync_node_configs::<E>(topology, hostnames)?;
let nodes = build_cfgsync_node_catalog::<E>(topology, hostnames)?.into_configs();
let nodes = nodes
.into_iter()
.map(|node| CfgSyncBundleNode {