mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-03-31 08:13:48 +00:00
Add direct cfgsync materializer serving
This commit is contained in:
parent
95d1a75116
commit
daadbcfa15
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -956,6 +956,7 @@ name = "cfgsync-runtime"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"cfgsync-adapter",
|
||||
"cfgsync-core",
|
||||
"clap",
|
||||
|
||||
@ -14,6 +14,7 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1"
|
||||
axum = { default-features = false, features = ["http1", "http2", "tokio"], version = "0.7.5" }
|
||||
cfgsync-adapter = { workspace = true }
|
||||
cfgsync-core = { workspace = true }
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
|
||||
@ -9,5 +9,6 @@ pub use client::{
|
||||
};
|
||||
pub use server::{
|
||||
CfgsyncServerConfig, CfgsyncServingMode, LoadCfgsyncServerConfigError,
|
||||
serve_cfgsync_from_config, serve_snapshot_cfgsync,
|
||||
build_persisted_snapshot_cfgsync_router, build_snapshot_cfgsync_router,
|
||||
serve_cfgsync_from_config, serve_persisted_snapshot_cfgsync, serve_snapshot_cfgsync,
|
||||
};
|
||||
|
||||
@ -1,13 +1,14 @@
|
||||
use std::{fs, path::Path, sync::Arc};
|
||||
|
||||
use anyhow::Context as _;
|
||||
use axum::Router;
|
||||
use cfgsync_adapter::{
|
||||
ArtifactSet, CachedSnapshotMaterializer, MaterializedArtifacts,
|
||||
RegistrationSnapshotMaterializer, SnapshotConfigSource,
|
||||
ArtifactSet, CachedSnapshotMaterializer, MaterializedArtifacts, MaterializedArtifactsSink,
|
||||
PersistingSnapshotMaterializer, RegistrationSnapshotMaterializer, SnapshotConfigSource,
|
||||
};
|
||||
use cfgsync_core::{
|
||||
BundleConfigSource, CfgsyncServerState, NodeArtifactsBundle, NodeConfigSource, RunCfgsyncError,
|
||||
serve_cfgsync,
|
||||
build_cfgsync_router, serve_cfgsync,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use thiserror::Error;
|
||||
@ -144,16 +145,66 @@ pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Builds a registration-backed cfgsync router directly from a snapshot
|
||||
/// materializer.
|
||||
pub fn build_snapshot_cfgsync_router<M>(materializer: M) -> Router
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
{
|
||||
let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(materializer));
|
||||
build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider)))
|
||||
}
|
||||
|
||||
/// Builds a registration-backed cfgsync router with a persistence hook for
|
||||
/// ready materialization results.
|
||||
pub fn build_persisted_snapshot_cfgsync_router<M, S>(materializer: M, sink: S) -> Router
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
S: MaterializedArtifactsSink + 'static,
|
||||
{
|
||||
let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(
|
||||
PersistingSnapshotMaterializer::new(materializer, sink),
|
||||
));
|
||||
|
||||
build_cfgsync_router(CfgsyncServerState::new(Arc::new(provider)))
|
||||
}
|
||||
|
||||
/// Runs a registration-backed cfgsync server directly from a snapshot
|
||||
/// materializer.
|
||||
pub async fn serve_snapshot_cfgsync<M>(port: u16, materializer: M) -> Result<(), RunCfgsyncError>
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
{
|
||||
let provider = SnapshotConfigSource::new(CachedSnapshotMaterializer::new(materializer));
|
||||
let state = CfgsyncServerState::new(Arc::new(provider));
|
||||
let router = build_snapshot_cfgsync_router(materializer);
|
||||
serve_router(port, router).await
|
||||
}
|
||||
|
||||
serve_cfgsync(port, state).await
|
||||
/// Runs a registration-backed cfgsync server with a persistence hook for ready
|
||||
/// materialization results.
|
||||
pub async fn serve_persisted_snapshot_cfgsync<M, S>(
|
||||
port: u16,
|
||||
materializer: M,
|
||||
sink: S,
|
||||
) -> Result<(), RunCfgsyncError>
|
||||
where
|
||||
M: RegistrationSnapshotMaterializer + 'static,
|
||||
S: MaterializedArtifactsSink + 'static,
|
||||
{
|
||||
let router = build_persisted_snapshot_cfgsync_router(materializer, sink);
|
||||
serve_router(port, router).await
|
||||
}
|
||||
|
||||
async fn serve_router(port: u16, router: Router) -> Result<(), RunCfgsyncError> {
|
||||
let bind_addr = format!("0.0.0.0:{port}");
|
||||
let listener = tokio::net::TcpListener::bind(&bind_addr)
|
||||
.await
|
||||
.map_err(|source| RunCfgsyncError::Bind { bind_addr, source })?;
|
||||
|
||||
axum::serve(listener, router)
|
||||
.await
|
||||
.map_err(|source| RunCfgsyncError::Serve { source })?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn build_server_state(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user