mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-04-01 00:33:40 +00:00
Refine cfgsync core naming for external use
This commit is contained in:
parent
a751d819ea
commit
65fb8da5a5
@ -2,8 +2,8 @@ use std::{collections::HashMap, error::Error, sync::Mutex};
|
||||
|
||||
use cfgsync_artifacts::ArtifactFile;
|
||||
use cfgsync_core::{
|
||||
CfgSyncErrorResponse, CfgSyncPayload, ConfigProvider, NodeRegistration, RegistrationResponse,
|
||||
RepoResponse,
|
||||
CfgSyncErrorResponse, CfgSyncPayload, ConfigResolveResponse, NodeConfigSource,
|
||||
NodeRegistration, RegisterNodeResponse,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
@ -241,25 +241,25 @@ impl<M> SnapshotConfigProvider<M> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> ConfigProvider for SnapshotConfigProvider<M>
|
||||
impl<M> NodeConfigSource for SnapshotConfigProvider<M>
|
||||
where
|
||||
M: RegistrationSetMaterializer,
|
||||
{
|
||||
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
|
||||
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
|
||||
let mut registrations = self
|
||||
.registrations
|
||||
.lock()
|
||||
.expect("cfgsync registration store should not be poisoned");
|
||||
registrations.insert(registration.identifier.clone(), registration);
|
||||
|
||||
RegistrationResponse::Registered
|
||||
RegisterNodeResponse::Registered
|
||||
}
|
||||
|
||||
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
|
||||
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
|
||||
let registration = match self.registration_for(®istration.identifier) {
|
||||
Some(registration) => registration,
|
||||
None => {
|
||||
return RepoResponse::Error(CfgSyncErrorResponse::not_ready(
|
||||
return ConfigResolveResponse::Error(CfgSyncErrorResponse::not_ready(
|
||||
®istration.identifier,
|
||||
));
|
||||
}
|
||||
@ -269,45 +269,47 @@ where
|
||||
let catalog = match self.materializer.materialize_snapshot(®istrations) {
|
||||
Ok(Some(catalog)) => catalog,
|
||||
Ok(None) => {
|
||||
return RepoResponse::Error(CfgSyncErrorResponse::not_ready(
|
||||
return ConfigResolveResponse::Error(CfgSyncErrorResponse::not_ready(
|
||||
®istration.identifier,
|
||||
));
|
||||
}
|
||||
Err(error) => {
|
||||
return RepoResponse::Error(CfgSyncErrorResponse::internal(format!(
|
||||
return ConfigResolveResponse::Error(CfgSyncErrorResponse::internal(format!(
|
||||
"failed to materialize config snapshot: {error}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
match catalog.resolve(®istration.identifier) {
|
||||
Some(config) => RepoResponse::Config(CfgSyncPayload::from_files(config.files.clone())),
|
||||
None => RepoResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
Some(config) => {
|
||||
ConfigResolveResponse::Config(CfgSyncPayload::from_files(config.files.clone()))
|
||||
}
|
||||
None => ConfigResolveResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
®istration.identifier,
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> ConfigProvider for RegistrationConfigProvider<M>
|
||||
impl<M> NodeConfigSource for RegistrationConfigProvider<M>
|
||||
where
|
||||
M: NodeArtifactsMaterializer,
|
||||
{
|
||||
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
|
||||
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
|
||||
let mut registrations = self
|
||||
.registrations
|
||||
.lock()
|
||||
.expect("cfgsync registration store should not be poisoned");
|
||||
registrations.insert(registration.identifier.clone(), registration);
|
||||
|
||||
RegistrationResponse::Registered
|
||||
RegisterNodeResponse::Registered
|
||||
}
|
||||
|
||||
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
|
||||
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
|
||||
let registration = match self.registration_for(®istration.identifier) {
|
||||
Some(registration) => registration,
|
||||
None => {
|
||||
return RepoResponse::Error(CfgSyncErrorResponse::not_ready(
|
||||
return ConfigResolveResponse::Error(CfgSyncErrorResponse::not_ready(
|
||||
®istration.identifier,
|
||||
));
|
||||
}
|
||||
@ -315,13 +317,13 @@ where
|
||||
let registrations = self.registration_set();
|
||||
|
||||
match self.materializer.materialize(®istration, ®istrations) {
|
||||
Ok(Some(artifacts)) => {
|
||||
RepoResponse::Config(CfgSyncPayload::from_files(artifacts.files().to_vec()))
|
||||
}
|
||||
Ok(None) => {
|
||||
RepoResponse::Error(CfgSyncErrorResponse::not_ready(®istration.identifier))
|
||||
}
|
||||
Err(error) => RepoResponse::Error(CfgSyncErrorResponse::internal(format!(
|
||||
Ok(Some(artifacts)) => ConfigResolveResponse::Config(CfgSyncPayload::from_files(
|
||||
artifacts.files().to_vec(),
|
||||
)),
|
||||
Ok(None) => ConfigResolveResponse::Error(CfgSyncErrorResponse::not_ready(
|
||||
®istration.identifier,
|
||||
)),
|
||||
Err(error) => ConfigResolveResponse::Error(CfgSyncErrorResponse::internal(format!(
|
||||
"failed to materialize config for host {}: {error}",
|
||||
registration.identifier
|
||||
))),
|
||||
@ -481,7 +483,9 @@ mod tests {
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use cfgsync_artifacts::ArtifactFile;
|
||||
use cfgsync_core::{CfgSyncErrorCode, ConfigProvider, NodeRegistration, RepoResponse};
|
||||
use cfgsync_core::{
|
||||
CfgSyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration,
|
||||
};
|
||||
|
||||
use super::{
|
||||
ArtifactSet, DynCfgsyncError, NodeArtifacts, NodeArtifactsCatalog,
|
||||
@ -512,8 +516,10 @@ mod tests {
|
||||
let _ = provider.register(registration.clone());
|
||||
|
||||
match provider.resolve(®istration) {
|
||||
RepoResponse::Config(payload) => assert_eq!(payload.files()[0].path, "/config.yaml"),
|
||||
RepoResponse::Error(error) => panic!("expected config, got {error}"),
|
||||
ConfigResolveResponse::Config(payload) => {
|
||||
assert_eq!(payload.files()[0].path, "/config.yaml")
|
||||
}
|
||||
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
@ -527,8 +533,10 @@ mod tests {
|
||||
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip"));
|
||||
|
||||
match provider.resolve(®istration) {
|
||||
RepoResponse::Config(_) => panic!("expected not-ready error"),
|
||||
RepoResponse::Error(error) => assert!(matches!(error.code, CfgSyncErrorCode::NotReady)),
|
||||
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
|
||||
ConfigResolveResponse::Error(error) => {
|
||||
assert!(matches!(error.code, CfgSyncErrorCode::NotReady))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -569,18 +577,20 @@ mod tests {
|
||||
let _ = provider.register(node_a.clone());
|
||||
|
||||
match provider.resolve(&node_a) {
|
||||
RepoResponse::Config(_) => panic!("expected not-ready error"),
|
||||
RepoResponse::Error(error) => assert!(matches!(error.code, CfgSyncErrorCode::NotReady)),
|
||||
ConfigResolveResponse::Config(_) => panic!("expected not-ready error"),
|
||||
ConfigResolveResponse::Error(error) => {
|
||||
assert!(matches!(error.code, CfgSyncErrorCode::NotReady))
|
||||
}
|
||||
}
|
||||
|
||||
let _ = provider.register(node_b);
|
||||
|
||||
match provider.resolve(&node_a) {
|
||||
RepoResponse::Config(payload) => {
|
||||
ConfigResolveResponse::Config(payload) => {
|
||||
assert_eq!(payload.files()[0].content, "id: node-a");
|
||||
assert_eq!(payload.files()[1].content, "peers: 2");
|
||||
}
|
||||
RepoResponse::Error(error) => panic!("expected config, got {error}"),
|
||||
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,8 +12,15 @@ pub use render::{
|
||||
render_cfgsync_yaml_from_template, write_rendered_cfgsync,
|
||||
};
|
||||
pub use repo::{
|
||||
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload,
|
||||
ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, NodeRegistration,
|
||||
RegistrationPayload, RegistrationResponse, RepoResponse,
|
||||
BundleConfigSource, BundleConfigSourceError, CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode,
|
||||
CfgSyncErrorResponse, CfgSyncFile, CfgSyncPayload, ConfigResolveResponse, NodeConfigSource,
|
||||
NodeRegistration, RegisterNodeResponse, RegistrationPayload, StaticConfigSource,
|
||||
};
|
||||
pub use server::{CfgSyncState, RunCfgsyncError, cfgsync_app, run_cfgsync};
|
||||
#[doc(hidden)]
|
||||
pub use repo::{
|
||||
ConfigProvider, ConfigRepo, FileConfigProvider, FileConfigProviderError, RegistrationResponse,
|
||||
RepoResponse,
|
||||
};
|
||||
#[doc(hidden)]
|
||||
pub use server::CfgSyncState;
|
||||
pub use server::{CfgsyncServerState, RunCfgsyncError, cfgsync_app, run_cfgsync};
|
||||
|
||||
@ -199,59 +199,59 @@ impl CfgSyncErrorResponse {
|
||||
}
|
||||
}
|
||||
|
||||
/// Repository resolution outcome for a requested node identifier.
|
||||
pub enum RepoResponse {
|
||||
/// Resolution outcome for a requested node identifier.
|
||||
pub enum ConfigResolveResponse {
|
||||
Config(CfgSyncPayload),
|
||||
Error(CfgSyncErrorResponse),
|
||||
}
|
||||
|
||||
/// Repository outcome for a node registration request.
|
||||
pub enum RegistrationResponse {
|
||||
/// Outcome for a node registration request.
|
||||
pub enum RegisterNodeResponse {
|
||||
Registered,
|
||||
Error(CfgSyncErrorResponse),
|
||||
}
|
||||
|
||||
/// Read-only source for cfgsync node payloads.
|
||||
pub trait ConfigProvider: Send + Sync {
|
||||
fn register(&self, registration: NodeRegistration) -> RegistrationResponse;
|
||||
/// Source of cfgsync node payloads.
|
||||
pub trait NodeConfigSource: Send + Sync {
|
||||
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse;
|
||||
|
||||
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse;
|
||||
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse;
|
||||
}
|
||||
|
||||
/// In-memory map-backed provider used by cfgsync server state.
|
||||
pub struct ConfigRepo {
|
||||
/// In-memory map-backed source used by cfgsync server state.
|
||||
pub struct StaticConfigSource {
|
||||
configs: HashMap<String, CfgSyncPayload>,
|
||||
}
|
||||
|
||||
impl ConfigRepo {
|
||||
impl StaticConfigSource {
|
||||
#[must_use]
|
||||
pub fn from_bundle(configs: HashMap<String, CfgSyncPayload>) -> Arc<Self> {
|
||||
Arc::new(Self { configs })
|
||||
}
|
||||
}
|
||||
|
||||
impl ConfigProvider for ConfigRepo {
|
||||
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
|
||||
impl NodeConfigSource for StaticConfigSource {
|
||||
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
|
||||
if self.configs.contains_key(®istration.identifier) {
|
||||
RegistrationResponse::Registered
|
||||
RegisterNodeResponse::Registered
|
||||
} else {
|
||||
RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
RegisterNodeResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
®istration.identifier,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
|
||||
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
|
||||
self.configs
|
||||
.get(®istration.identifier)
|
||||
.cloned()
|
||||
.map_or_else(
|
||||
|| {
|
||||
RepoResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
ConfigResolveResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
®istration.identifier,
|
||||
))
|
||||
},
|
||||
RepoResponse::Config,
|
||||
ConfigResolveResponse::Config,
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -345,24 +345,24 @@ mod tests {
|
||||
fn resolves_existing_identifier() {
|
||||
let mut configs = HashMap::new();
|
||||
configs.insert("node-1".to_owned(), sample_payload());
|
||||
let repo = ConfigRepo { configs };
|
||||
let repo = StaticConfigSource { configs };
|
||||
|
||||
match repo.resolve(&NodeRegistration::new(
|
||||
"node-1",
|
||||
"127.0.0.1".parse().expect("parse ip"),
|
||||
)) {
|
||||
RepoResponse::Config(payload) => {
|
||||
ConfigResolveResponse::Config(payload) => {
|
||||
assert_eq!(payload.schema_version, CFGSYNC_SCHEMA_VERSION);
|
||||
assert_eq!(payload.files.len(), 1);
|
||||
assert_eq!(payload.files[0].path, "/config.yaml");
|
||||
}
|
||||
RepoResponse::Error(error) => panic!("expected config response, got {error}"),
|
||||
ConfigResolveResponse::Error(error) => panic!("expected config response, got {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reports_missing_identifier() {
|
||||
let repo = ConfigRepo {
|
||||
let repo = StaticConfigSource {
|
||||
configs: HashMap::new(),
|
||||
};
|
||||
|
||||
@ -370,8 +370,8 @@ mod tests {
|
||||
"unknown-node",
|
||||
"127.0.0.1".parse().expect("parse ip"),
|
||||
)) {
|
||||
RepoResponse::Config(_) => panic!("expected missing-config error"),
|
||||
RepoResponse::Error(error) => {
|
||||
ConfigResolveResponse::Config(_) => panic!("expected missing-config error"),
|
||||
ConfigResolveResponse::Error(error) => {
|
||||
assert!(matches!(error.code, CfgSyncErrorCode::MissingConfig));
|
||||
assert!(error.message.contains("unknown-node"));
|
||||
}
|
||||
@ -393,7 +393,7 @@ nodes:
|
||||
.expect("write bundle yaml");
|
||||
|
||||
let provider =
|
||||
FileConfigProvider::from_yaml_file(bundle_file.path()).expect("load file provider");
|
||||
BundleConfigSource::from_yaml_file(bundle_file.path()).expect("load file provider");
|
||||
|
||||
let _ = provider.register(NodeRegistration::new(
|
||||
"node-1",
|
||||
@ -404,8 +404,8 @@ nodes:
|
||||
"node-1",
|
||||
"127.0.0.1".parse().expect("parse ip"),
|
||||
)) {
|
||||
RepoResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
|
||||
RepoResponse::Error(error) => panic!("expected config, got {error}"),
|
||||
ConfigResolveResponse::Config(payload) => assert_eq!(payload.files.len(), 1),
|
||||
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
|
||||
}
|
||||
}
|
||||
|
||||
@ -413,21 +413,21 @@ nodes:
|
||||
fn resolve_accepts_known_registration_without_gating() {
|
||||
let mut configs = HashMap::new();
|
||||
configs.insert("node-1".to_owned(), sample_payload());
|
||||
let repo = ConfigRepo { configs };
|
||||
let repo = StaticConfigSource { configs };
|
||||
|
||||
match repo.resolve(&NodeRegistration::new(
|
||||
"node-1",
|
||||
"127.0.0.1".parse().expect("parse ip"),
|
||||
)) {
|
||||
RepoResponse::Config(_) => {}
|
||||
RepoResponse::Error(error) => panic!("expected config, got {error}"),
|
||||
ConfigResolveResponse::Config(_) => {}
|
||||
ConfigResolveResponse::Error(error) => panic!("expected config, got {error}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Failures when loading a file-backed cfgsync provider.
|
||||
/// Failures when loading a bundle-backed cfgsync source.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum FileConfigProviderError {
|
||||
pub enum BundleConfigSourceError {
|
||||
#[error("failed to read cfgsync bundle at {path}: {source}")]
|
||||
Read {
|
||||
path: String,
|
||||
@ -442,21 +442,21 @@ pub enum FileConfigProviderError {
|
||||
},
|
||||
}
|
||||
|
||||
/// YAML bundle-backed provider implementation.
|
||||
pub struct FileConfigProvider {
|
||||
inner: ConfigRepo,
|
||||
/// YAML bundle-backed source implementation.
|
||||
pub struct BundleConfigSource {
|
||||
inner: StaticConfigSource,
|
||||
}
|
||||
|
||||
impl FileConfigProvider {
|
||||
impl BundleConfigSource {
|
||||
/// Loads provider state from a cfgsync bundle YAML file.
|
||||
pub fn from_yaml_file(path: &Path) -> Result<Self, FileConfigProviderError> {
|
||||
let raw = fs::read_to_string(path).map_err(|source| FileConfigProviderError::Read {
|
||||
pub fn from_yaml_file(path: &Path) -> Result<Self, BundleConfigSourceError> {
|
||||
let raw = fs::read_to_string(path).map_err(|source| BundleConfigSourceError::Read {
|
||||
path: path.display().to_string(),
|
||||
source,
|
||||
})?;
|
||||
|
||||
let bundle: CfgSyncBundle =
|
||||
serde_yaml::from_str(&raw).map_err(|source| FileConfigProviderError::Parse {
|
||||
serde_yaml::from_str(&raw).map_err(|source| BundleConfigSourceError::Parse {
|
||||
path: path.display().to_string(),
|
||||
source,
|
||||
})?;
|
||||
@ -468,17 +468,17 @@ impl FileConfigProvider {
|
||||
.collect();
|
||||
|
||||
Ok(Self {
|
||||
inner: ConfigRepo { configs },
|
||||
inner: StaticConfigSource { configs },
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl ConfigProvider for FileConfigProvider {
|
||||
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
|
||||
impl NodeConfigSource for BundleConfigSource {
|
||||
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
|
||||
self.inner.register(registration)
|
||||
}
|
||||
|
||||
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
|
||||
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
|
||||
self.inner.resolve(registration)
|
||||
}
|
||||
}
|
||||
@ -486,3 +486,23 @@ impl ConfigProvider for FileConfigProvider {
|
||||
fn payload_from_bundle_node(node: CfgSyncBundleNode) -> (String, CfgSyncPayload) {
|
||||
(node.identifier, CfgSyncPayload::from_files(node.files))
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub type RepoResponse = ConfigResolveResponse;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub type RegistrationResponse = RegisterNodeResponse;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub trait ConfigProvider: NodeConfigSource {}
|
||||
|
||||
impl<T: NodeConfigSource + ?Sized> ConfigProvider for T {}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub type ConfigRepo = StaticConfigSource;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub type FileConfigProvider = BundleConfigSource;
|
||||
|
||||
#[doc(hidden)]
|
||||
pub type FileConfigProviderError = BundleConfigSourceError;
|
||||
|
||||
@ -4,17 +4,18 @@ use axum::{Json, Router, extract::State, http::StatusCode, response::IntoRespons
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::repo::{
|
||||
CfgSyncErrorCode, ConfigProvider, NodeRegistration, RegistrationResponse, RepoResponse,
|
||||
CfgSyncErrorCode, ConfigResolveResponse, NodeConfigSource, NodeRegistration,
|
||||
RegisterNodeResponse,
|
||||
};
|
||||
|
||||
/// Runtime state shared across cfgsync HTTP handlers.
|
||||
pub struct CfgSyncState {
|
||||
repo: Arc<dyn ConfigProvider>,
|
||||
pub struct CfgsyncServerState {
|
||||
repo: Arc<dyn NodeConfigSource>,
|
||||
}
|
||||
|
||||
impl CfgSyncState {
|
||||
impl CfgsyncServerState {
|
||||
#[must_use]
|
||||
pub fn new(repo: Arc<dyn ConfigProvider>) -> Self {
|
||||
pub fn new(repo: Arc<dyn NodeConfigSource>) -> Self {
|
||||
Self { repo }
|
||||
}
|
||||
}
|
||||
@ -36,14 +37,16 @@ pub enum RunCfgsyncError {
|
||||
}
|
||||
|
||||
async fn node_config(
|
||||
State(state): State<Arc<CfgSyncState>>,
|
||||
State(state): State<Arc<CfgsyncServerState>>,
|
||||
Json(payload): Json<NodeRegistration>,
|
||||
) -> impl IntoResponse {
|
||||
let response = resolve_node_config_response(&state, &payload);
|
||||
|
||||
match response {
|
||||
RepoResponse::Config(payload_data) => (StatusCode::OK, Json(payload_data)).into_response(),
|
||||
RepoResponse::Error(error) => {
|
||||
ConfigResolveResponse::Config(payload_data) => {
|
||||
(StatusCode::OK, Json(payload_data)).into_response()
|
||||
}
|
||||
ConfigResolveResponse::Error(error) => {
|
||||
let status = error_status(&error.code);
|
||||
|
||||
(status, Json(error)).into_response()
|
||||
@ -52,12 +55,12 @@ async fn node_config(
|
||||
}
|
||||
|
||||
async fn register_node(
|
||||
State(state): State<Arc<CfgSyncState>>,
|
||||
State(state): State<Arc<CfgsyncServerState>>,
|
||||
Json(payload): Json<NodeRegistration>,
|
||||
) -> impl IntoResponse {
|
||||
match state.repo.register(payload) {
|
||||
RegistrationResponse::Registered => StatusCode::ACCEPTED.into_response(),
|
||||
RegistrationResponse::Error(error) => {
|
||||
RegisterNodeResponse::Registered => StatusCode::ACCEPTED.into_response(),
|
||||
RegisterNodeResponse::Error(error) => {
|
||||
let status = error_status(&error.code);
|
||||
|
||||
(status, Json(error)).into_response()
|
||||
@ -66,9 +69,9 @@ async fn register_node(
|
||||
}
|
||||
|
||||
fn resolve_node_config_response(
|
||||
state: &CfgSyncState,
|
||||
state: &CfgsyncServerState,
|
||||
registration: &NodeRegistration,
|
||||
) -> RepoResponse {
|
||||
) -> ConfigResolveResponse {
|
||||
state.repo.resolve(registration)
|
||||
}
|
||||
|
||||
@ -80,7 +83,7 @@ fn error_status(code: &CfgSyncErrorCode) -> StatusCode {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cfgsync_app(state: CfgSyncState) -> Router {
|
||||
pub fn cfgsync_app(state: CfgsyncServerState) -> Router {
|
||||
Router::new()
|
||||
.route("/register", post(register_node))
|
||||
.route("/node", post(node_config))
|
||||
@ -89,7 +92,7 @@ pub fn cfgsync_app(state: CfgSyncState) -> Router {
|
||||
}
|
||||
|
||||
/// Runs cfgsync HTTP server on the provided port until shutdown/error.
|
||||
pub async fn run_cfgsync(port: u16, state: CfgSyncState) -> Result<(), RunCfgsyncError> {
|
||||
pub async fn run_cfgsync(port: u16, state: CfgsyncServerState) -> Result<(), RunCfgsyncError> {
|
||||
let app = cfgsync_app(state);
|
||||
println!("Server running on http://0.0.0.0:{port}");
|
||||
|
||||
@ -105,44 +108,47 @@ pub async fn run_cfgsync(port: u16, state: CfgSyncState) -> Result<(), RunCfgsyn
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
pub type CfgSyncState = CfgsyncServerState;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use axum::{Json, extract::State, http::StatusCode, response::IntoResponse};
|
||||
|
||||
use super::{CfgSyncState, NodeRegistration, node_config, register_node};
|
||||
use super::{CfgsyncServerState, NodeRegistration, node_config, register_node};
|
||||
use crate::repo::{
|
||||
CFGSYNC_SCHEMA_VERSION, CfgSyncErrorCode, CfgSyncErrorResponse, CfgSyncFile,
|
||||
CfgSyncPayload, ConfigProvider, RegistrationResponse, RepoResponse,
|
||||
CfgSyncPayload, ConfigResolveResponse, NodeConfigSource, RegisterNodeResponse,
|
||||
};
|
||||
|
||||
struct StaticProvider {
|
||||
data: HashMap<String, CfgSyncPayload>,
|
||||
}
|
||||
|
||||
impl ConfigProvider for StaticProvider {
|
||||
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
|
||||
impl NodeConfigSource for StaticProvider {
|
||||
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
|
||||
if self.data.contains_key(®istration.identifier) {
|
||||
RegistrationResponse::Registered
|
||||
RegisterNodeResponse::Registered
|
||||
} else {
|
||||
RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
RegisterNodeResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
®istration.identifier,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
|
||||
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
|
||||
self.data
|
||||
.get(®istration.identifier)
|
||||
.cloned()
|
||||
.map_or_else(
|
||||
|| {
|
||||
RepoResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
ConfigResolveResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
®istration.identifier,
|
||||
))
|
||||
},
|
||||
RepoResponse::Config,
|
||||
ConfigResolveResponse::Config,
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -152,10 +158,10 @@ mod tests {
|
||||
registrations: std::sync::Mutex<HashMap<String, NodeRegistration>>,
|
||||
}
|
||||
|
||||
impl ConfigProvider for RegistrationAwareProvider {
|
||||
fn register(&self, registration: NodeRegistration) -> RegistrationResponse {
|
||||
impl NodeConfigSource for RegistrationAwareProvider {
|
||||
fn register(&self, registration: NodeRegistration) -> RegisterNodeResponse {
|
||||
if !self.data.contains_key(®istration.identifier) {
|
||||
return RegistrationResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
return RegisterNodeResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
®istration.identifier,
|
||||
));
|
||||
}
|
||||
@ -166,17 +172,17 @@ mod tests {
|
||||
.expect("test registration store should not be poisoned");
|
||||
registrations.insert(registration.identifier.clone(), registration);
|
||||
|
||||
RegistrationResponse::Registered
|
||||
RegisterNodeResponse::Registered
|
||||
}
|
||||
|
||||
fn resolve(&self, registration: &NodeRegistration) -> RepoResponse {
|
||||
fn resolve(&self, registration: &NodeRegistration) -> ConfigResolveResponse {
|
||||
let registrations = self
|
||||
.registrations
|
||||
.lock()
|
||||
.expect("test registration store should not be poisoned");
|
||||
|
||||
if !registrations.contains_key(®istration.identifier) {
|
||||
return RepoResponse::Error(CfgSyncErrorResponse::not_ready(
|
||||
return ConfigResolveResponse::Error(CfgSyncErrorResponse::not_ready(
|
||||
®istration.identifier,
|
||||
));
|
||||
}
|
||||
@ -186,11 +192,11 @@ mod tests {
|
||||
.cloned()
|
||||
.map_or_else(
|
||||
|| {
|
||||
RepoResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
ConfigResolveResponse::Error(CfgSyncErrorResponse::missing_config(
|
||||
®istration.identifier,
|
||||
))
|
||||
},
|
||||
RepoResponse::Config,
|
||||
ConfigResolveResponse::Config,
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -211,7 +217,7 @@ mod tests {
|
||||
data,
|
||||
registrations: std::sync::Mutex::new(HashMap::new()),
|
||||
});
|
||||
let state = Arc::new(CfgSyncState::new(provider));
|
||||
let state = Arc::new(CfgsyncServerState::new(provider));
|
||||
let payload = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("valid ip"));
|
||||
|
||||
let _ = register_node(State(state.clone()), Json(payload.clone()))
|
||||
@ -230,7 +236,7 @@ mod tests {
|
||||
let provider = Arc::new(StaticProvider {
|
||||
data: HashMap::new(),
|
||||
});
|
||||
let state = Arc::new(CfgSyncState::new(provider));
|
||||
let state = Arc::new(CfgsyncServerState::new(provider));
|
||||
let payload = NodeRegistration::new("missing-node", "127.0.0.1".parse().expect("valid ip"));
|
||||
|
||||
let response = node_config(State(state), Json(payload))
|
||||
@ -256,7 +262,7 @@ mod tests {
|
||||
data,
|
||||
registrations: std::sync::Mutex::new(HashMap::new()),
|
||||
});
|
||||
let state = Arc::new(CfgSyncState::new(provider));
|
||||
let state = Arc::new(CfgsyncServerState::new(provider));
|
||||
let payload = NodeRegistration::new("node-a", "127.0.0.1".parse().expect("valid ip"));
|
||||
|
||||
let response = node_config(State(state), Json(payload))
|
||||
|
||||
@ -181,7 +181,8 @@ mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
use cfgsync_core::{
|
||||
CfgSyncBundle, CfgSyncBundleNode, CfgSyncPayload, CfgSyncState, ConfigRepo, run_cfgsync,
|
||||
CfgSyncBundle, CfgSyncBundleNode, CfgSyncPayload, CfgsyncServerState, StaticConfigSource,
|
||||
run_cfgsync,
|
||||
};
|
||||
use tempfile::tempdir;
|
||||
|
||||
@ -201,8 +202,8 @@ mod tests {
|
||||
],
|
||||
}]);
|
||||
|
||||
let repo = ConfigRepo::from_bundle(bundle_to_payload_map(bundle));
|
||||
let state = CfgSyncState::new(repo);
|
||||
let repo = StaticConfigSource::from_bundle(bundle_to_payload_map(bundle));
|
||||
let state = CfgsyncServerState::new(repo);
|
||||
let port = allocate_test_port();
|
||||
let address = format!("http://127.0.0.1:{port}");
|
||||
let server = tokio::spawn(async move {
|
||||
|
||||
@ -2,7 +2,9 @@ use std::{fs, path::Path, sync::Arc};
|
||||
|
||||
use anyhow::Context as _;
|
||||
use cfgsync_adapter::{NodeArtifacts, NodeArtifactsCatalog, RegistrationConfigProvider};
|
||||
use cfgsync_core::{CfgSyncBundle, CfgSyncState, ConfigProvider, FileConfigProvider, run_cfgsync};
|
||||
use cfgsync_core::{
|
||||
BundleConfigSource, CfgSyncBundle, CfgsyncServerState, NodeConfigSource, run_cfgsync,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
|
||||
/// Runtime cfgsync server config loaded from YAML.
|
||||
@ -25,14 +27,14 @@ impl CfgSyncServerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn ConfigProvider>> {
|
||||
let provider = FileConfigProvider::from_yaml_file(bundle_path)
|
||||
fn load_bundle_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
|
||||
let provider = BundleConfigSource::from_yaml_file(bundle_path)
|
||||
.with_context(|| format!("loading cfgsync provider from {}", bundle_path.display()))?;
|
||||
|
||||
Ok(Arc::new(provider))
|
||||
}
|
||||
|
||||
fn load_materializing_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn ConfigProvider>> {
|
||||
fn load_materializing_provider(bundle_path: &Path) -> anyhow::Result<Arc<dyn NodeConfigSource>> {
|
||||
let bundle = load_bundle_yaml(bundle_path)?;
|
||||
let catalog = build_node_catalog(bundle);
|
||||
let provider = RegistrationConfigProvider::new(catalog);
|
||||
@ -87,12 +89,12 @@ pub async fn run_cfgsync_server(config_path: &Path) -> anyhow::Result<()> {
|
||||
fn build_server_state(
|
||||
config: &CfgSyncServerConfig,
|
||||
bundle_path: &Path,
|
||||
) -> anyhow::Result<CfgSyncState> {
|
||||
) -> anyhow::Result<CfgsyncServerState> {
|
||||
let repo = if config.registration_flow {
|
||||
load_materializing_provider(bundle_path)?
|
||||
} else {
|
||||
load_bundle_provider(bundle_path)?
|
||||
};
|
||||
|
||||
Ok(CfgSyncState::new(repo))
|
||||
Ok(CfgsyncServerState::new(repo))
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user