diff --git a/cfgsync/adapter/src/lib.rs b/cfgsync/adapter/src/lib.rs index 467630d..0e0c7fa 100644 --- a/cfgsync/adapter/src/lib.rs +++ b/cfgsync/adapter/src/lib.rs @@ -1,4 +1,4 @@ -use std::error::Error; +use std::{collections::HashMap, error::Error}; use thiserror::Error; @@ -14,6 +14,44 @@ pub struct CfgsyncNodeConfig { pub config_yaml: String, } +/// Precomputed node configs indexed by stable identifier. +#[derive(Debug, Clone, Default)] +pub struct CfgsyncNodeCatalog { + nodes: HashMap, +} + +impl CfgsyncNodeCatalog { + #[must_use] + pub fn new(nodes: Vec) -> Self { + let nodes = nodes + .into_iter() + .map(|node| (node.identifier.clone(), node)) + .collect(); + + Self { nodes } + } + + #[must_use] + pub fn resolve(&self, identifier: &str) -> Option<&CfgsyncNodeConfig> { + self.nodes.get(identifier) + } + + #[must_use] + pub fn len(&self) -> usize { + self.nodes.len() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.nodes.is_empty() + } + + #[must_use] + pub fn into_configs(self) -> Vec { + self.nodes.into_values().collect() + } +} + /// Adapter contract for converting an application deployment model into /// node-specific serialized config payloads. pub trait CfgsyncEnv { @@ -71,6 +109,14 @@ pub fn build_cfgsync_node_configs( deployment: &E::Deployment, hostnames: &[String], ) -> Result, BuildCfgsyncNodesError> { + Ok(build_cfgsync_node_catalog::(deployment, hostnames)?.into_configs()) +} + +/// Builds cfgsync node configs and indexes them by stable identifier. +pub fn build_cfgsync_node_catalog( + deployment: &E::Deployment, + hostnames: &[String], +) -> Result { let nodes = E::nodes(deployment); ensure_hostname_count(nodes.len(), hostnames.len())?; @@ -79,7 +125,7 @@ pub fn build_cfgsync_node_configs( output.push(build_node_entry::(deployment, node, index, hostnames)?); } - Ok(output) + Ok(CfgsyncNodeCatalog::new(output)) } fn ensure_hostname_count(nodes: usize, hostnames: usize) -> Result<(), BuildCfgsyncNodesError> { @@ -117,3 +163,20 @@ fn build_rewritten_node_config( Ok(node_config) } + +#[cfg(test)] +mod tests { + use super::{CfgsyncNodeCatalog, CfgsyncNodeConfig}; + + #[test] + fn catalog_resolves_identifier() { + let catalog = CfgsyncNodeCatalog::new(vec![CfgsyncNodeConfig { + identifier: "node-1".to_owned(), + config_yaml: "key: value".to_owned(), + }]); + + let node = catalog.resolve("node-1").expect("resolve node config"); + + assert_eq!(node.config_yaml, "key: value"); + } +} diff --git a/cfgsync/core/src/client.rs b/cfgsync/core/src/client.rs index 28e59b5..bfc07ec 100644 --- a/cfgsync/core/src/client.rs +++ b/cfgsync/core/src/client.rs @@ -1,10 +1,7 @@ use serde::Serialize; use thiserror::Error; -use crate::{ - repo::{CfgSyncErrorResponse, CfgSyncPayload}, - server::ClientIp, -}; +use crate::repo::{CfgSyncErrorResponse, CfgSyncPayload, NodeRegistration}; /// cfgsync client-side request/response failures. #[derive(Debug, Error)] @@ -21,6 +18,12 @@ pub enum ClientError { Decode(serde_json::Error), } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConfigFetchStatus { + Ready, + NotReady, +} + /// Reusable HTTP client for cfgsync server endpoints. #[derive(Clone, Debug)] pub struct CfgSyncClient { @@ -46,10 +49,15 @@ impl CfgSyncClient { &self.base_url } + /// Registers a node before requesting config. + pub async fn register_node(&self, payload: &NodeRegistration) -> Result<(), ClientError> { + self.post_status_only("/register", payload).await + } + /// Fetches `/node` payload for a node identifier. pub async fn fetch_node_config( &self, - payload: &ClientIp, + payload: &NodeRegistration, ) -> Result { self.post_json("/node", payload).await } @@ -57,11 +65,29 @@ impl CfgSyncClient { /// Fetches `/init-with-node` payload for a node identifier. pub async fn fetch_init_with_node_config( &self, - payload: &ClientIp, + payload: &NodeRegistration, ) -> Result { self.post_json("/init-with-node", payload).await } + pub async fn fetch_node_config_status( + &self, + payload: &NodeRegistration, + ) -> Result { + match self.fetch_node_config(payload).await { + Ok(_) => Ok(ConfigFetchStatus::Ready), + Err(ClientError::Status { + status, + error: Some(error), + .. + }) if status == reqwest::StatusCode::TOO_EARLY => { + let _ = error; + Ok(ConfigFetchStatus::NotReady) + } + Err(error) => Err(error), + } + } + /// Posts JSON payload to a cfgsync endpoint and decodes cfgsync payload. pub async fn post_json( &self, @@ -89,6 +115,32 @@ impl CfgSyncClient { serde_json::from_str(&body).map_err(ClientError::Decode) } + async fn post_status_only( + &self, + path: &str, + payload: &P, + ) -> Result<(), ClientError> { + let url = self.endpoint_url(path); + let response = self.http.post(url).json(payload).send().await?; + + let status = response.status(); + let body = response.text().await?; + if !status.is_success() { + let error = serde_json::from_str::(&body).ok(); + let message = error + .as_ref() + .map(|err| err.message.clone()) + .unwrap_or_else(|| body.clone()); + return Err(ClientError::Status { + status, + message, + error, + }); + } + + Ok(()) + } + fn endpoint_url(&self, path: &str) -> String { if path.starts_with('/') { format!("{}{}", self.base_url, path) diff --git a/cfgsync/core/src/lib.rs b/cfgsync/core/src/lib.rs index b6851e3..2807346 100644 --- a/cfgsync/core/src/lib.rs +++ b/cfgsync/core/src/lib.rs @@ -5,7 +5,7 @@ pub mod repo; pub mod server; pub use bundle::{CfgSyncBundle, CfgSyncBundleNode}; -pub use client::{CfgSyncClient, ClientError}; +pub use client::{CfgSyncClient, ClientError, ConfigFetchStatus}; pub use render::{ CfgsyncConfigOverrides, CfgsyncOutputPaths, RenderedCfgsync, apply_cfgsync_overrides, apply_timeout_floor, ensure_bundle_path, load_cfgsync_template_yaml, @@ -13,6 +13,7 @@ pub use render::{ }; pub use repo::{ CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, - ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, RepoResponse, + ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, NodeRegistration, + RegistrationResponse, RepoResponse, }; -pub use server::{CfgSyncState, ClientIp, RunCfgsyncError, cfgsync_app, run_cfgsync}; +pub use server::{CfgSyncState, RunCfgsyncError, cfgsync_app, run_cfgsync}; diff --git a/cfgsync/core/src/repo.rs b/cfgsync/core/src/repo.rs index 560e265..dbce99f 100644 --- a/cfgsync/core/src/repo.rs +++ b/cfgsync/core/src/repo.rs @@ -1,4 +1,10 @@ -use std::{collections::HashMap, fs, path::Path, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + fs, + net::Ipv4Addr, + path::Path, + sync::{Arc, Mutex}, +}; use cfgsync_artifacts::ArtifactFile; use serde::{Deserialize, Serialize}; @@ -22,6 +28,13 @@ pub struct CfgSyncPayload { pub files: Vec, } +/// Node metadata recorded before config materialization. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct NodeRegistration { + pub identifier: String, + pub ip: Ipv4Addr, +} + impl CfgSyncPayload { #[must_use] pub fn from_files(files: Vec) -> Self { @@ -46,6 +59,7 @@ impl CfgSyncPayload { #[serde(rename_all = "snake_case")] pub enum CfgSyncErrorCode { MissingConfig, + NotReady, Internal, } @@ -66,6 +80,14 @@ impl CfgSyncErrorResponse { } } + #[must_use] + pub fn not_ready(identifier: &str) -> Self { + Self { + code: CfgSyncErrorCode::NotReady, + message: format!("config for host {identifier} is not ready"), + } + } + #[must_use] pub fn internal(message: impl Into) -> Self { Self { @@ -81,25 +103,72 @@ pub enum RepoResponse { Error(CfgSyncErrorResponse), } +/// Repository outcome for a node registration request. +pub enum RegistrationResponse { + Registered, + Error(CfgSyncErrorResponse), +} + /// Read-only source for cfgsync node payloads. pub trait ConfigProvider: Send + Sync { + fn register(&self, registration: NodeRegistration) -> RegistrationResponse; + fn resolve(&self, identifier: &str) -> RepoResponse; } /// In-memory map-backed provider used by cfgsync server state. pub struct ConfigRepo { configs: HashMap, + registrations: Mutex>, } impl ConfigRepo { #[must_use] pub fn from_bundle(configs: HashMap) -> Arc { - Arc::new(Self { configs }) + Arc::new(Self { + configs, + registrations: Mutex::new(HashSet::new()), + }) + } + + fn register_identifier(&self, identifier: &str) -> RegistrationResponse { + if !self.configs.contains_key(identifier) { + return RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(identifier)); + } + + let mut registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + registrations.insert(identifier.to_owned()); + + RegistrationResponse::Registered + } + + fn is_registered(&self, identifier: &str) -> bool { + let registrations = self + .registrations + .lock() + .expect("cfgsync registration store should not be poisoned"); + + registrations.contains(identifier) } } impl ConfigProvider for ConfigRepo { + fn register(&self, registration: NodeRegistration) -> RegistrationResponse { + self.register_identifier(®istration.identifier) + } + fn resolve(&self, identifier: &str) -> RepoResponse { + if !self.configs.contains_key(identifier) { + return RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)); + } + + if !self.is_registered(identifier) { + return RepoResponse::Error(CfgSyncErrorResponse::not_ready(identifier)); + } + self.configs.get(identifier).cloned().map_or_else( || RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)), RepoResponse::Config, @@ -150,12 +219,19 @@ impl FileConfigProvider { .collect(); Ok(Self { - inner: ConfigRepo { configs }, + inner: ConfigRepo { + configs, + registrations: Mutex::new(HashSet::new()), + }, }) } } impl ConfigProvider for FileConfigProvider { + fn register(&self, registration: NodeRegistration) -> RegistrationResponse { + self.inner.register(registration) + } + fn resolve(&self, identifier: &str) -> RepoResponse { self.inner.resolve(identifier) } @@ -181,7 +257,10 @@ mod tests { fn resolves_existing_identifier() { let mut configs = HashMap::new(); configs.insert("node-1".to_owned(), sample_payload()); - let repo = ConfigRepo { configs }; + let repo = ConfigRepo { + configs, + registrations: Mutex::new(HashSet::from(["node-1".to_owned()])), + }; match repo.resolve("node-1") { RepoResponse::Config(payload) => { @@ -197,6 +276,7 @@ mod tests { fn reports_missing_identifier() { let repo = ConfigRepo { configs: HashMap::new(), + registrations: Mutex::new(HashSet::new()), }; match repo.resolve("unknown-node") { @@ -225,9 +305,31 @@ nodes: let provider = FileConfigProvider::from_yaml_file(bundle_file.path()).expect("load file provider"); + let _ = provider.register(NodeRegistration { + identifier: "node-1".to_owned(), + ip: "127.0.0.1".parse().expect("parse ip"), + }); + match provider.resolve("node-1") { RepoResponse::Config(payload) => assert_eq!(payload.files.len(), 1), RepoResponse::Error(error) => panic!("expected config, got {error}"), } } + + #[test] + fn resolve_requires_registration_first() { + let mut configs = HashMap::new(); + configs.insert("node-1".to_owned(), sample_payload()); + let repo = ConfigRepo { + configs, + registrations: Mutex::new(HashSet::new()), + }; + + match repo.resolve("node-1") { + RepoResponse::Config(_) => panic!("expected not-ready error"), + RepoResponse::Error(error) => { + assert!(matches!(error.code, CfgSyncErrorCode::NotReady)); + } + } + } } diff --git a/cfgsync/core/src/server.rs b/cfgsync/core/src/server.rs index e841610..330c80d 100644 --- a/cfgsync/core/src/server.rs +++ b/cfgsync/core/src/server.rs @@ -1,19 +1,11 @@ -use std::{io, net::Ipv4Addr, sync::Arc}; +use std::{io, sync::Arc}; use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post}; -use serde::{Deserialize, Serialize}; use thiserror::Error; -use crate::repo::{CfgSyncErrorCode, ConfigProvider, RepoResponse}; - -/// Request payload used by cfgsync client for node config resolution. -#[derive(Serialize, Deserialize)] -pub struct ClientIp { - /// Node IP that can be used by clients for observability/logging. - pub ip: Ipv4Addr, - /// Stable node identifier used as key in cfgsync bundle lookup. - pub identifier: String, -} +use crate::repo::{ + CfgSyncErrorCode, ConfigProvider, NodeRegistration, RegistrationResponse, RepoResponse, +}; /// Runtime state shared across cfgsync HTTP handlers. pub struct CfgSyncState { @@ -45,7 +37,7 @@ pub enum RunCfgsyncError { async fn node_config( State(state): State>, - Json(payload): Json, + Json(payload): Json, ) -> impl IntoResponse { let response = resolve_node_config_response(&state, &payload.identifier); @@ -59,6 +51,20 @@ async fn node_config( } } +async fn register_node( + State(state): State>, + Json(payload): Json, +) -> impl IntoResponse { + match state.repo.register(payload) { + RegistrationResponse::Registered => StatusCode::ACCEPTED.into_response(), + RegistrationResponse::Error(error) => { + let status = error_status(&error.code); + + (status, Json(error)).into_response() + } + } +} + fn resolve_node_config_response(state: &CfgSyncState, identifier: &str) -> RepoResponse { state.repo.resolve(identifier) } @@ -66,12 +72,14 @@ fn resolve_node_config_response(state: &CfgSyncState, identifier: &str) -> RepoR fn error_status(code: &CfgSyncErrorCode) -> StatusCode { match code { CfgSyncErrorCode::MissingConfig => StatusCode::NOT_FOUND, + CfgSyncErrorCode::NotReady => StatusCode::TOO_EARLY, CfgSyncErrorCode::Internal => StatusCode::INTERNAL_SERVER_ERROR, } } pub fn cfgsync_app(state: CfgSyncState) -> Router { Router::new() + .route("/register", post(register_node)) .route("/node", post(node_config)) .route("/init-with-node", post(node_config)) .with_state(Arc::new(state)) @@ -100,10 +108,10 @@ mod tests { use axum::{Json, extract::State, http::StatusCode, response::IntoResponse}; - use super::{CfgSyncState, ClientIp, node_config}; + use super::{CfgSyncState, NodeRegistration, node_config, register_node}; use crate::repo::{ CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, - CfgSyncPayload, ConfigProvider, RepoResponse, + CfgSyncPayload, ConfigProvider, RegistrationResponse, RepoResponse, }; struct StaticProvider { @@ -111,6 +119,16 @@ mod tests { } impl ConfigProvider for StaticProvider { + fn register(&self, registration: NodeRegistration) -> RegistrationResponse { + if self.data.contains_key(®istration.identifier) { + RegistrationResponse::Registered + } else { + RegistrationResponse::Error(CfgSyncErrorResponse::missing_config( + ®istration.identifier, + )) + } + } + fn resolve(&self, identifier: &str) -> RepoResponse { self.data.get(identifier).cloned().map_or_else( || RepoResponse::Error(CfgSyncErrorResponse::missing_config(identifier)), @@ -131,13 +149,17 @@ mod tests { let mut data = HashMap::new(); data.insert("node-a".to_owned(), sample_payload()); - let provider = Arc::new(StaticProvider { data }); + let provider = crate::repo::ConfigRepo::from_bundle(data); let state = Arc::new(CfgSyncState::new(provider)); - let payload = ClientIp { + let payload = NodeRegistration { ip: "127.0.0.1".parse().expect("valid ip"), identifier: "node-a".to_owned(), }; + let _ = register_node(State(state.clone()), Json(payload.clone())) + .await + .into_response(); + let response = node_config(State(state), Json(payload)) .await .into_response(); @@ -151,7 +173,7 @@ mod tests { data: HashMap::new(), }); let state = Arc::new(CfgSyncState::new(provider)); - let payload = ClientIp { + let payload = NodeRegistration { ip: "127.0.0.1".parse().expect("valid ip"), identifier: "missing-node".to_owned(), }; @@ -169,4 +191,23 @@ mod tests { assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig)); } + + #[tokio::test] + async fn node_config_returns_not_ready_before_registration() { + let mut data = HashMap::new(); + data.insert("node-a".to_owned(), sample_payload()); + + let provider = crate::repo::ConfigRepo::from_bundle(data); + let state = Arc::new(CfgSyncState::new(provider)); + let payload = NodeRegistration { + ip: "127.0.0.1".parse().expect("valid ip"), + identifier: "node-a".to_owned(), + }; + + let response = node_config(State(state), Json(payload)) + .await + .into_response(); + + assert_eq!(response.status(), StatusCode::TOO_EARLY); + } } diff --git a/cfgsync/runtime/src/client.rs b/cfgsync/runtime/src/client.rs index aab3d1c..82dbe65 100644 --- a/cfgsync/runtime/src/client.rs +++ b/cfgsync/runtime/src/client.rs @@ -5,7 +5,9 @@ use std::{ }; use anyhow::{Context as _, Result, bail}; -use cfgsync_core::{CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, ClientIp}; +use cfgsync_core::{ + CFGSYNC_SCHEMA_VERSION, CfgSyncClient, CfgSyncFile, CfgSyncPayload, NodeRegistration, +}; use thiserror::Error; use tokio::time::{Duration, sleep}; use tracing::info; @@ -19,7 +21,7 @@ enum ClientEnvError { InvalidIp { value: String }, } -async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result { +async fn fetch_with_retry(payload: &NodeRegistration, server_addr: &str) -> Result { let client = CfgSyncClient::new(server_addr); for attempt in 1..=FETCH_ATTEMPTS { @@ -40,13 +42,15 @@ async fn fetch_with_retry(payload: &ClientIp, server_addr: &str) -> Result Result { +async fn fetch_once(client: &CfgSyncClient, payload: &NodeRegistration) -> Result { let response = client.fetch_node_config(payload).await?; Ok(response) } -async fn pull_config_files(payload: ClientIp, server_addr: &str) -> Result<()> { +async fn pull_config_files(payload: NodeRegistration, server_addr: &str) -> Result<()> { + register_node(&payload, server_addr).await?; + let config = fetch_with_retry(&payload, server_addr) .await .context("fetching cfgsync node config")?; @@ -63,6 +67,30 @@ async fn pull_config_files(payload: ClientIp, server_addr: &str) -> Result<()> { Ok(()) } +async fn register_node(payload: &NodeRegistration, server_addr: &str) -> Result<()> { + let client = CfgSyncClient::new(server_addr); + + for attempt in 1..=FETCH_ATTEMPTS { + match client.register_node(payload).await { + Ok(()) => { + info!(identifier = %payload.identifier, "cfgsync node registered"); + return Ok(()); + } + Err(error) => { + if attempt == FETCH_ATTEMPTS { + return Err(error).with_context(|| { + format!("registering node with cfgsync after {attempt} attempts") + }); + } + + sleep(FETCH_RETRY_DELAY).await; + } + } + } + + unreachable!("cfgsync register loop always returns before exhausting attempts"); +} + fn ensure_schema_version(config: &CfgSyncPayload) -> Result<()> { if config.schema_version != CFGSYNC_SCHEMA_VERSION { bail!( @@ -118,7 +146,7 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> { let identifier = env::var("CFG_HOST_IDENTIFIER").unwrap_or_else(|_| "unidentified-node".to_owned()); - pull_config_files(ClientIp { ip, identifier }, &server_addr).await + pull_config_files(NodeRegistration { ip, identifier }, &server_addr).await } fn parse_ip_env(ip_str: &str) -> Result { @@ -164,7 +192,7 @@ mod tests { }); pull_config_files( - ClientIp { + NodeRegistration { ip: "127.0.0.1".parse().expect("parse ip"), identifier: "node-1".to_owned(), }, diff --git a/logos/runtime/ext/src/cfgsync/mod.rs b/logos/runtime/ext/src/cfgsync/mod.rs index 7fe4e44..074223d 100644 --- a/logos/runtime/ext/src/cfgsync/mod.rs +++ b/logos/runtime/ext/src/cfgsync/mod.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cfgsync_adapter::{CfgsyncEnv, build_cfgsync_node_configs}; +use cfgsync_adapter::{CfgsyncEnv, build_cfgsync_node_catalog}; pub(crate) use cfgsync_core::render::CfgsyncOutputPaths; use cfgsync_core::{ CfgSyncBundle, CfgSyncBundleNode, @@ -49,7 +49,7 @@ fn build_cfgsync_bundle( topology: &E::Deployment, hostnames: &[String], ) -> Result { - let nodes = build_cfgsync_node_configs::(topology, hostnames)?; + let nodes = build_cfgsync_node_catalog::(topology, hostnames)?.into_configs(); let nodes = nodes .into_iter() .map(|node| CfgSyncBundleNode {