diff --git a/Cargo.lock b/Cargo.lock index dbfd17a..4b740bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -956,6 +956,7 @@ name = "cfgsync-runtime" version = "0.1.0" dependencies = [ "anyhow", + "axum", "cfgsync-adapter", "cfgsync-core", "clap", diff --git a/cfgsync/runtime/Cargo.toml b/cfgsync/runtime/Cargo.toml index f708ba1..547b182 100644 --- a/cfgsync/runtime/Cargo.toml +++ b/cfgsync/runtime/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] anyhow = "1" +axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" } cfgsync-adapter = { workspace = true } cfgsync-core = { workspace = true } clap = { version = "4", features = ["derive"] } diff --git a/cfgsync/runtime/src/lib.rs b/cfgsync/runtime/src/lib.rs index 8a7337f..92c0192 100644 --- a/cfgsync/runtime/src/lib.rs +++ b/cfgsync/runtime/src/lib.rs @@ -9,5 +9,6 @@ pub use client::{ }; pub use server::{ CfgsyncServerConfig, CfgsyncServingMode, LoadCfgsyncServerConfigError, - serve_cfgsync_from_config, serve_snapshot_cfgsync, + build_persisted_snapshot_cfgsync_router, build_snapshot_cfgsync_router, + serve_cfgsync_from_config, serve_persisted_snapshot_cfgsync, serve_snapshot_cfgsync, }; diff --git a/cfgsync/runtime/src/server.rs b/cfgsync/runtime/src/server.rs index 6bae042..d9beda2 100644 --- a/cfgsync/runtime/src/server.rs +++ b/cfgsync/runtime/src/server.rs @@ -1,13 +1,14 @@ use std::{fs, path::Path, sync::Arc}; use anyhow::Context as _; +use axum::Router; use cfgsync_adapter::{ - ArtifactSet, CachedSnapshotMaterializer, MaterializedArtifacts, - RegistrationSnapshotMaterializer, SnapshotConfigSource, + ArtifactSet, CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink, + PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, SnapshotConfigSource, }; use cfgsync_core::{ BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, RunCfgsyncError, - serve_cfgsync, + build_cfgsync_router, serve_cfgsync, }; use serde::Deserialize; use thiserror::Error; @@ -144,16 +145,66 @@ pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()> Ok(()) } +/// Builds a registration-backed cfgsync router directly from a snapshot +/// materializer. +pub fn build_snapshot_cfgsync_router(materializer: M) -> Router +where + M: RegistrationSnapshotMaterializer + 'static, +{ + let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(materializer)); + build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider))) +} + +/// Builds a registration-backed cfgsync router with a persistence hook for +/// ready materialization results. +pub fn build_persisted_snapshot_cfgsync_router(materializer: M, sink: S) -> Router +where + M: RegistrationSnapshotMaterializer + 'static, + S: MaterializedArtifactsSink + 'static, +{ + let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new( + PersistingSnapshotMaterializer::new(materializer, sink), + )); + + build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider))) +} + /// Runs a registration-backed cfgsync server directly from a snapshot /// materializer. pub async fn serve_snapshot_cfgsync(port: u16, materializer: M) -> Result<(), RunCfgsyncError> where M: RegistrationSnapshotMaterializer + 'static, { - let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(materializer)); - let state = CfgsyncServerState::new(Arc::new(provider)); + let router = build_snapshot_cfgsync_router(materializer); + serve_router(port, router).await +} - serve_cfgsync(port, state).await +/// Runs a registration-backed cfgsync server with a persistence hook for ready +/// materialization results. +pub async fn serve_persisted_snapshot_cfgsync( + port: u16, + materializer: M, + sink: S, +) -> Result<(), RunCfgsyncError> +where + M: RegistrationSnapshotMaterializer + 'static, + S: MaterializedArtifactsSink + 'static, +{ + let router = build_persisted_snapshot_cfgsync_router(materializer, sink); + serve_router(port, router).await +} + +async fn serve_router(port: u16, router: Router) -> Result<(), RunCfgsyncError> { + let bind_addr = format!("0.0.0.0:{port}"); + let listener = tokio::net::TcpListener::bind(&bind_addr) + .await + .map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?; + + axum::serve(listener, router) + .await + .map_err(|source| RunCfgsyncError::Serve { source })?; + + Ok(()) } fn build_server_state(