mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-04-02 09:13:16 +00:00
Simplify cfgsync runtime naming
This commit is contained in:
parent
566a69af4c
commit
ff658e322d
@ -99,7 +99,7 @@ Use it when you want to run cfgsync rather than define its protocol:
|
|||||||
|
|
||||||
- client-side fetch/write helpers
|
- client-side fetch/write helpers
|
||||||
- server config loading
|
- server config loading
|
||||||
- direct serving helpers such as `serve_cfgsync(...)`
|
- direct serving helpers such as `serve(...)`
|
||||||
|
|
||||||
This is the crate that should feel like the normal “start here” path for users integrating cfgsync into a real system.
|
This is the crate that should feel like the normal “start here” path for users integrating cfgsync into a real system.
|
||||||
|
|
||||||
@ -150,7 +150,7 @@ For a new application, the shortest sensible path is:
|
|||||||
1. define a typed registration payload
|
1. define a typed registration payload
|
||||||
2. implement `RegistrationSnapshotMaterializer`
|
2. implement `RegistrationSnapshotMaterializer`
|
||||||
3. return node-local and optional shared artifacts
|
3. return node-local and optional shared artifacts
|
||||||
4. serve them with `serve_cfgsync(...)`
|
4. serve them with `serve(...)`
|
||||||
5. use `CfgsyncClient` or the runtime helpers on the node side
|
5. use `CfgsyncClient` or the runtime helpers on the node side
|
||||||
|
|
||||||
That gives you the main value of the library without forcing extra application logic into cfgsync itself.
|
That gives you the main value of the library without forcing extra application logic into cfgsync itself.
|
||||||
@ -227,10 +227,10 @@ impl RegistrationSnapshotMaterializer for MyMaterializer {
|
|||||||
Serving:
|
Serving:
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
use cfgsync_runtime::serve_cfgsync;
|
use cfgsync_runtime::serve;
|
||||||
|
|
||||||
# async fn run() -> anyhow::Result<()> {
|
# async fn run() -> anyhow::Result<()> {
|
||||||
serve_cfgsync(4400, MyMaterializer).await?;
|
serve(4400, MyMaterializer).await?;
|
||||||
# Ok(())
|
# Ok(())
|
||||||
# }
|
# }
|
||||||
```
|
```
|
||||||
@ -238,14 +238,14 @@ serve_cfgsync(4400, MyMaterializer).await?;
|
|||||||
Fetching and writing artifacts:
|
Fetching and writing artifacts:
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
use cfgsync_runtime::{ArtifactOutputMap, fetch_and_write_artifacts};
|
use cfgsync_runtime::{OutputMap, fetch_and_write};
|
||||||
|
|
||||||
# async fn run(registration: cfgsync_core::NodeRegistration) -> anyhow::Result<()> {
|
# async fn run(registration: cfgsync_core::NodeRegistration) -> anyhow::Result<()> {
|
||||||
let outputs = ArtifactOutputMap::new()
|
let outputs = OutputMap::new()
|
||||||
.route("/config.yaml", "/node-data/node-1/config.yaml")
|
.route("/config.yaml", "/node-data/node-1/config.yaml")
|
||||||
.route("deployment-settings.yaml", "/node-data/shared/deployment-settings.yaml");
|
.route("deployment-settings.yaml", "/node-data/shared/deployment-settings.yaml");
|
||||||
|
|
||||||
fetch_and_write_artifacts(®istration, "http://127.0.0.1:4400", &outputs).await?;
|
fetch_and_write(®istration, "http://127.0.0.1:4400", &outputs).await?;
|
||||||
# Ok(())
|
# Ok(())
|
||||||
# }
|
# }
|
||||||
```
|
```
|
||||||
|
|||||||
@ -4,7 +4,7 @@ use cfgsync_adapter::{
|
|||||||
};
|
};
|
||||||
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
|
use cfgsync_artifacts::{ArtifactFile, ArtifactSet};
|
||||||
use cfgsync_core::NodeRegistration;
|
use cfgsync_core::NodeRegistration;
|
||||||
use cfgsync_runtime::{ArtifactOutputMap, fetch_and_write_artifacts, serve_cfgsync};
|
use cfgsync_runtime::{OutputMap, fetch_and_write, serve};
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
use tokio::time::{Duration, sleep};
|
use tokio::time::{Duration, sleep};
|
||||||
|
|
||||||
@ -38,17 +38,17 @@ impl RegistrationSnapshotMaterializer for ExampleMaterializer {
|
|||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let port = 4400;
|
let port = 4400;
|
||||||
let server = tokio::spawn(async move { serve_cfgsync(port, ExampleMaterializer).await });
|
let server = tokio::spawn(async move { serve(port, ExampleMaterializer).await });
|
||||||
|
|
||||||
// Give the server a moment to bind before the client registers.
|
// Give the server a moment to bind before the client registers.
|
||||||
sleep(Duration::from_millis(100)).await;
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
let tempdir = tempdir()?;
|
let tempdir = tempdir()?;
|
||||||
let config_path = tempdir.path().join("config.yaml");
|
let config_path = tempdir.path().join("config.yaml");
|
||||||
let outputs = ArtifactOutputMap::new().route("/config.yaml", &config_path);
|
let outputs = OutputMap::new().route("/config.yaml", &config_path);
|
||||||
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse()?);
|
let registration = NodeRegistration::new("node-1", "127.0.0.1".parse()?);
|
||||||
|
|
||||||
fetch_and_write_artifacts(®istration, "http://127.0.0.1:4400", &outputs).await?;
|
fetch_and_write(®istration, "http://127.0.0.1:4400", &outputs).await?;
|
||||||
|
|
||||||
println!("{}", std::fs::read_to_string(&config_path)?);
|
println!("{}", std::fs::read_to_string(&config_path)?);
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
use std::{env, process};
|
use std::{env, process};
|
||||||
|
|
||||||
use cfgsync_runtime::run_cfgsync_client_from_env;
|
use cfgsync_runtime::run_client_from_env;
|
||||||
|
|
||||||
const CFGSYNC_PORT_ENV: &str = "LOGOS_BLOCKCHAIN_CFGSYNC_PORT";
|
const CFGSYNC_PORT_ENV: &str = "LOGOS_BLOCKCHAIN_CFGSYNC_PORT";
|
||||||
const DEFAULT_CFGSYNC_PORT: u16 = 4400;
|
const DEFAULT_CFGSYNC_PORT: u16 = 4400;
|
||||||
@ -14,7 +14,7 @@ fn cfgsync_port() -> u16 {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
if let Err(err) = run_cfgsync_client_from_env(cfgsync_port()).await {
|
if let Err(err) = run_client_from_env(cfgsync_port()).await {
|
||||||
eprintln!("Error: {err}");
|
eprintln!("Error: {err}");
|
||||||
process::exit(1);
|
process::exit(1);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use cfgsync_runtime::serve_cfgsync_from_config;
|
use cfgsync_runtime::serve_from_config;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
@ -12,5 +12,5 @@ struct Args {
|
|||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
serve_cfgsync_from_config(&args.config).await
|
serve_from_config(&args.config).await
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,11 +19,11 @@ const FETCH_RETRY_DELAY: Duration = Duration::from_millis(250);
|
|||||||
|
|
||||||
/// Output routing for fetched artifact files.
|
/// Output routing for fetched artifact files.
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct ArtifactOutputMap {
|
pub struct OutputMap {
|
||||||
routes: HashMap<String, PathBuf>,
|
routes: HashMap<String, PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ArtifactOutputMap {
|
impl OutputMap {
|
||||||
/// Creates an empty artifact output map.
|
/// Creates an empty artifact output map.
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
@ -49,101 +49,142 @@ impl ArtifactOutputMap {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Runtime-oriented cfgsync client that handles registration, fetch, and local
|
||||||
|
/// artifact materialization.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Client {
|
||||||
|
inner: CfgsyncClient,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
/// Creates a runtime client that talks to the cfgsync server at
|
||||||
|
/// `server_addr`.
|
||||||
|
#[must_use]
|
||||||
|
pub fn new(server_addr: &str) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: CfgsyncClient::new(server_addr),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Registers a node and fetches its artifact payload from cfgsync.
|
||||||
|
pub async fn register_and_fetch(
|
||||||
|
&self,
|
||||||
|
registration: &NodeRegistration,
|
||||||
|
) -> Result<NodeArtifactsPayload> {
|
||||||
|
self.register_node(registration).await?;
|
||||||
|
|
||||||
|
let payload = self
|
||||||
|
.fetch_with_retry(registration)
|
||||||
|
.await
|
||||||
|
.context("fetching node artifacts")?;
|
||||||
|
ensure_schema_version(&payload)?;
|
||||||
|
|
||||||
|
Ok(payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Registers a node, fetches its artifact payload, and writes the result
|
||||||
|
/// using the provided output routing policy.
|
||||||
|
pub async fn fetch_and_write(
|
||||||
|
&self,
|
||||||
|
registration: &NodeRegistration,
|
||||||
|
outputs: &OutputMap,
|
||||||
|
) -> Result<()> {
|
||||||
|
let payload = self.register_and_fetch(registration).await?;
|
||||||
|
let files = collect_payload_files(&payload)?;
|
||||||
|
|
||||||
|
for file in files {
|
||||||
|
write_file(file, outputs)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(files = files.len(), "cfgsync files saved");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_with_retry(
|
||||||
|
&self,
|
||||||
|
registration: &NodeRegistration,
|
||||||
|
) -> Result<NodeArtifactsPayload> {
|
||||||
|
for attempt in 1..=FETCH_ATTEMPTS {
|
||||||
|
match self.fetch_once(registration).await {
|
||||||
|
Ok(config) => return Ok(config),
|
||||||
|
Err(error) => {
|
||||||
|
if attempt == FETCH_ATTEMPTS {
|
||||||
|
return Err(error).with_context(|| {
|
||||||
|
format!("fetching node artifacts after {attempt} attempts")
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(FETCH_RETRY_DELAY).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unreachable!("cfgsync fetch loop always returns before exhausting attempts");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_once(&self, registration: &NodeRegistration) -> Result<NodeArtifactsPayload> {
|
||||||
|
self.inner
|
||||||
|
.fetch_node_config(registration)
|
||||||
|
.await
|
||||||
|
.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn register_node(&self, registration: &NodeRegistration) -> Result<()> {
|
||||||
|
for attempt in 1..=FETCH_ATTEMPTS {
|
||||||
|
match self.inner.register_node(registration).await {
|
||||||
|
Ok(()) => {
|
||||||
|
info!(identifier = %registration.identifier, "cfgsync node registered");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
if attempt == FETCH_ATTEMPTS {
|
||||||
|
return Err(error).with_context(|| {
|
||||||
|
format!("registering node with cfgsync after {attempt} attempts")
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(FETCH_RETRY_DELAY).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unreachable!("cfgsync register loop always returns before exhausting attempts");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
enum ClientEnvError {
|
enum ClientEnvError {
|
||||||
#[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")]
|
#[error("CFG_HOST_IP `{value}` is not a valid IPv4 address")]
|
||||||
InvalidIp { value: String },
|
InvalidIp { value: String },
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_with_retry(
|
|
||||||
payload: &NodeRegistration,
|
|
||||||
server_addr: &str,
|
|
||||||
) -> Result<NodeArtifactsPayload> {
|
|
||||||
let client = CfgsyncClient::new(server_addr);
|
|
||||||
|
|
||||||
for attempt in 1..=FETCH_ATTEMPTS {
|
|
||||||
match fetch_once(&client, payload).await {
|
|
||||||
Ok(config) => return Ok(config),
|
|
||||||
Err(error) => {
|
|
||||||
if attempt == FETCH_ATTEMPTS {
|
|
||||||
return Err(error).with_context(|| {
|
|
||||||
format!("fetching cfgsync payload after {attempt} attempts")
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
sleep(FETCH_RETRY_DELAY).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unreachable!("cfgsync fetch loop always returns before exhausting attempts");
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn fetch_once(
|
|
||||||
client: &CfgsyncClient,
|
|
||||||
payload: &NodeRegistration,
|
|
||||||
) -> Result<NodeArtifactsPayload> {
|
|
||||||
let response = client.fetch_node_config(payload).await?;
|
|
||||||
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn register_node(payload: &NodeRegistration, server_addr: &str) -> Result<()> {
|
|
||||||
let client = CfgsyncClient::new(server_addr);
|
|
||||||
|
|
||||||
for attempt in 1..=FETCH_ATTEMPTS {
|
|
||||||
match client.register_node(payload).await {
|
|
||||||
Ok(()) => {
|
|
||||||
info!(identifier = %payload.identifier, "cfgsync node registered");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
if attempt == FETCH_ATTEMPTS {
|
|
||||||
return Err(error).with_context(|| {
|
|
||||||
format!("registering node with cfgsync after {attempt} attempts")
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
sleep(FETCH_RETRY_DELAY).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unreachable!("cfgsync register loop always returns before exhausting attempts");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Registers a node and fetches its artifact payload from cfgsync.
|
/// Registers a node and fetches its artifact payload from cfgsync.
|
||||||
pub async fn register_and_fetch_artifacts(
|
///
|
||||||
|
/// Prefer [`Client::register_and_fetch`] when you already hold a runtime
|
||||||
|
/// client value.
|
||||||
|
pub async fn register_and_fetch(
|
||||||
registration: &NodeRegistration,
|
registration: &NodeRegistration,
|
||||||
server_addr: &str,
|
server_addr: &str,
|
||||||
) -> Result<NodeArtifactsPayload> {
|
) -> Result<NodeArtifactsPayload> {
|
||||||
register_node(registration, server_addr).await?;
|
Client::new(server_addr)
|
||||||
|
.register_and_fetch(registration)
|
||||||
let payload = fetch_with_retry(registration, server_addr)
|
|
||||||
.await
|
.await
|
||||||
.context("fetching cfgsync node config")?;
|
|
||||||
ensure_schema_version(&payload)?;
|
|
||||||
|
|
||||||
Ok(payload)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Registers a node, fetches its artifact payload, and writes the files using
|
/// Registers a node, fetches its artifact payload, and writes the files using
|
||||||
/// the provided output routing policy.
|
/// the provided output routing policy.
|
||||||
pub async fn fetch_and_write_artifacts(
|
///
|
||||||
|
/// Prefer [`Client::fetch_and_write`] when you already hold a runtime client
|
||||||
|
/// value.
|
||||||
|
pub async fn fetch_and_write(
|
||||||
registration: &NodeRegistration,
|
registration: &NodeRegistration,
|
||||||
server_addr: &str,
|
server_addr: &str,
|
||||||
outputs: &ArtifactOutputMap,
|
outputs: &OutputMap,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let payload = register_and_fetch_artifacts(registration, server_addr).await?;
|
Client::new(server_addr)
|
||||||
let files = collect_payload_files(&payload)?;
|
.fetch_and_write(registration, outputs)
|
||||||
|
.await
|
||||||
for file in files {
|
|
||||||
write_cfgsync_file(file, outputs)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
info!(files = files.len(), "cfgsync files saved");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ensure_schema_version(config: &NodeArtifactsPayload) -> Result<()> {
|
fn ensure_schema_version(config: &NodeArtifactsPayload) -> Result<()> {
|
||||||
@ -166,7 +207,7 @@ fn collect_payload_files(config: &NodeArtifactsPayload) -> Result<&[NodeArtifact
|
|||||||
Ok(config.files())
|
Ok(config.files())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_cfgsync_file(file: &NodeArtifactFile, outputs: &ArtifactOutputMap) -> Result<()> {
|
fn write_file(file: &NodeArtifactFile, outputs: &OutputMap) -> Result<()> {
|
||||||
let path = outputs.resolve_path(file);
|
let path = outputs.resolve_path(file);
|
||||||
|
|
||||||
ensure_parent_dir(&path)?;
|
ensure_parent_dir(&path)?;
|
||||||
@ -193,8 +234,8 @@ fn ensure_parent_dir(path: &Path) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Resolves cfgsync client inputs from environment and materializes node files.
|
/// Resolves runtime client inputs from environment and materializes node files.
|
||||||
pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
|
pub async fn run_client_from_env(default_port: u16) -> Result<()> {
|
||||||
let server_addr =
|
let server_addr =
|
||||||
env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}"));
|
env::var("CFG_SERVER_ADDR").unwrap_or_else(|_| format!("http://127.0.0.1:{default_port}"));
|
||||||
let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?;
|
let ip = parse_ip_env(&env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_owned()))?;
|
||||||
@ -203,7 +244,7 @@ pub async fn run_cfgsync_client_from_env(default_port: u16) -> Result<()> {
|
|||||||
let metadata = parse_registration_payload_env()?;
|
let metadata = parse_registration_payload_env()?;
|
||||||
let outputs = build_output_map();
|
let outputs = build_output_map();
|
||||||
|
|
||||||
fetch_and_write_artifacts(
|
fetch_and_write(
|
||||||
&NodeRegistration::new(identifier, ip).with_payload(metadata),
|
&NodeRegistration::new(identifier, ip).with_payload(metadata),
|
||||||
&server_addr,
|
&server_addr,
|
||||||
&outputs,
|
&outputs,
|
||||||
@ -232,8 +273,8 @@ fn parse_registration_payload(raw: &str) -> Result<RegistrationPayload> {
|
|||||||
RegistrationPayload::from_json_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON")
|
RegistrationPayload::from_json_str(raw).context("parsing CFG_REGISTRATION_METADATA_JSON")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_output_map() -> ArtifactOutputMap {
|
fn build_output_map() -> OutputMap {
|
||||||
let mut outputs = ArtifactOutputMap::default();
|
let mut outputs = OutputMap::default();
|
||||||
|
|
||||||
if let Ok(path) = env::var("CFG_FILE_PATH") {
|
if let Ok(path) = env::var("CFG_FILE_PATH") {
|
||||||
outputs = outputs
|
outputs = outputs
|
||||||
@ -255,7 +296,6 @@ fn build_output_map() -> ArtifactOutputMap {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use cfgsync_core::{
|
use cfgsync_core::{
|
||||||
CfgsyncServerState, NodeArtifactsBundle, NodeArtifactsBundleEntry, StaticConfigSource,
|
CfgsyncServerState, NodeArtifactsBundle, NodeArtifactsBundleEntry, StaticConfigSource,
|
||||||
serve_cfgsync,
|
|
||||||
};
|
};
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
@ -280,15 +320,15 @@ mod tests {
|
|||||||
let port = allocate_test_port();
|
let port = allocate_test_port();
|
||||||
let address = format!("http://127.0.0.1:{port}");
|
let address = format!("http://127.0.0.1:{port}");
|
||||||
let server = tokio::spawn(async move {
|
let server = tokio::spawn(async move {
|
||||||
serve_cfgsync(port, state)
|
cfgsync_core::serve_cfgsync(port, state)
|
||||||
.await
|
.await
|
||||||
.expect("run cfgsync server");
|
.expect("run cfgsync server");
|
||||||
});
|
});
|
||||||
|
|
||||||
fetch_and_write_artifacts(
|
fetch_and_write(
|
||||||
&NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")),
|
&NodeRegistration::new("node-1", "127.0.0.1".parse().expect("parse ip")),
|
||||||
&address,
|
&address,
|
||||||
&ArtifactOutputMap::default(),
|
&OutputMap::default(),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.expect("pull config files");
|
.expect("pull config files");
|
||||||
|
|||||||
@ -3,12 +3,8 @@ pub use cfgsync_core as core;
|
|||||||
mod client;
|
mod client;
|
||||||
mod server;
|
mod server;
|
||||||
|
|
||||||
pub use client::{
|
pub use client::{Client, OutputMap, fetch_and_write, register_and_fetch, run_client_from_env};
|
||||||
ArtifactOutputMap, fetch_and_write_artifacts, register_and_fetch_artifacts,
|
|
||||||
run_cfgsync_client_from_env,
|
|
||||||
};
|
|
||||||
pub use server::{
|
pub use server::{
|
||||||
CfgsyncServerConfig, CfgsyncServerSource, LoadCfgsyncServerConfigError, build_cfgsync_router,
|
LoadServerConfigError, ServerConfig, ServerSource, build_persisted_router, build_router, serve,
|
||||||
build_persisted_cfgsync_router, serve_cfgsync, serve_cfgsync_from_config,
|
serve_from_config, serve_persisted,
|
||||||
serve_persisted_cfgsync,
|
|
||||||
};
|
};
|
||||||
|
|||||||
@ -15,11 +15,11 @@ use thiserror::Error;
|
|||||||
|
|
||||||
/// Runtime cfgsync server config loaded from YAML.
|
/// Runtime cfgsync server config loaded from YAML.
|
||||||
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
|
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
|
||||||
pub struct CfgsyncServerConfig {
|
pub struct ServerConfig {
|
||||||
/// HTTP port to bind the cfgsync server on.
|
/// HTTP port to bind the cfgsync server on.
|
||||||
pub port: u16,
|
pub port: u16,
|
||||||
/// Source used by the runtime-managed cfgsync server.
|
/// Source used by the runtime-managed cfgsync server.
|
||||||
pub source: CfgsyncServerSource,
|
pub source: ServerSource,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Runtime cfgsync source loaded from config.
|
/// Runtime cfgsync source loaded from config.
|
||||||
@ -31,7 +31,7 @@ pub struct CfgsyncServerConfig {
|
|||||||
/// before receiving already-materialized artifacts
|
/// before receiving already-materialized artifacts
|
||||||
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
|
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
|
||||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||||
pub enum CfgsyncServerSource {
|
pub enum ServerSource {
|
||||||
/// Serve a static precomputed artifact bundle directly.
|
/// Serve a static precomputed artifact bundle directly.
|
||||||
Bundle { bundle_path: String },
|
Bundle { bundle_path: String },
|
||||||
/// Require node registration before serving precomputed artifacts.
|
/// Require node registration before serving precomputed artifacts.
|
||||||
@ -39,7 +39,7 @@ pub enum CfgsyncServerSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum LoadCfgsyncServerConfigError {
|
pub enum LoadServerConfigError {
|
||||||
#[error("failed to read cfgsync config file {path}: {source}")]
|
#[error("failed to read cfgsync config file {path}: {source}")]
|
||||||
Read {
|
Read {
|
||||||
path: String,
|
path: String,
|
||||||
@ -54,23 +54,22 @@ pub enum LoadCfgsyncServerConfigError {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CfgsyncServerConfig {
|
impl ServerConfig {
|
||||||
/// Loads cfgsync runtime server config from a YAML file.
|
/// Loads cfgsync runtime server config from a YAML file.
|
||||||
pub fn load_from_file(path: &Path) -> Result<Self, LoadCfgsyncServerConfigError> {
|
pub fn load_from_file(path: &Path) -> Result<Self, LoadServerConfigError> {
|
||||||
let config_path = path.display().to_string();
|
let config_path = path.display().to_string();
|
||||||
let config_content =
|
let config_content =
|
||||||
fs::read_to_string(path).map_err(|source| LoadCfgsyncServerConfigError::Read {
|
fs::read_to_string(path).map_err(|source| LoadServerConfigError::Read {
|
||||||
path: config_path.clone(),
|
path: config_path.clone(),
|
||||||
source,
|
source,
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let config: CfgsyncServerConfig =
|
let config: ServerConfig = serde_yaml::from_str(&config_content).map_err(|source| {
|
||||||
serde_yaml::from_str(&config_content).map_err(|source| {
|
LoadServerConfigError::Parse {
|
||||||
LoadCfgsyncServerConfigError::Parse {
|
path: config_path,
|
||||||
path: config_path,
|
source,
|
||||||
source,
|
}
|
||||||
}
|
})?;
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(config)
|
Ok(config)
|
||||||
}
|
}
|
||||||
@ -79,7 +78,7 @@ impl CfgsyncServerConfig {
|
|||||||
pub fn for_bundle(port: u16, bundle_path: impl Into<String>) -> Self {
|
pub fn for_bundle(port: u16, bundle_path: impl Into<String>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
port,
|
port,
|
||||||
source: CfgsyncServerSource::Bundle {
|
source: ServerSource::Bundle {
|
||||||
bundle_path: bundle_path.into(),
|
bundle_path: bundle_path.into(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -91,7 +90,7 @@ impl CfgsyncServerConfig {
|
|||||||
pub fn for_registration(port: u16, artifacts_path: impl Into<String>) -> Self {
|
pub fn for_registration(port: u16, artifacts_path: impl Into<String>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
port,
|
port,
|
||||||
source: CfgsyncServerSource::Registration {
|
source: ServerSource::Registration {
|
||||||
artifacts_path: artifacts_path.into(),
|
artifacts_path: artifacts_path.into(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -143,8 +142,8 @@ fn resolve_bundle_path(config_path: &Path, bundle_path: &str) -> std::path::Path
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Loads runtime config and starts cfgsync HTTP server process.
|
/// Loads runtime config and starts cfgsync HTTP server process.
|
||||||
pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()> {
|
pub async fn serve_from_config(config_path: &Path) -> anyhow::Result<()> {
|
||||||
let config = CfgsyncServerConfig::load_from_file(config_path)?;
|
let config = ServerConfig::load_from_file(config_path)?;
|
||||||
let bundle_path = resolve_source_path(config_path, &config.source);
|
let bundle_path = resolve_source_path(config_path, &config.source);
|
||||||
|
|
||||||
let state = build_server_state(&config, &bundle_path)?;
|
let state = build_server_state(&config, &bundle_path)?;
|
||||||
@ -162,7 +161,7 @@ pub async fn serve_cfgsync_from_config(config_path: &Path) -> anyhow::Result<()>
|
|||||||
/// - artifact serving
|
/// - artifact serving
|
||||||
///
|
///
|
||||||
/// while the app owns only snapshot materialization logic.
|
/// while the app owns only snapshot materialization logic.
|
||||||
pub fn build_cfgsync_router<M>(materializer: M) -> Router
|
pub fn build_router<M>(materializer: M) -> Router
|
||||||
where
|
where
|
||||||
M: RegistrationSnapshotMaterializer + 'static,
|
M: RegistrationSnapshotMaterializer + 'static,
|
||||||
{
|
{
|
||||||
@ -175,7 +174,7 @@ where
|
|||||||
///
|
///
|
||||||
/// Use this when the application wants cfgsync to persist or publish shared
|
/// Use this when the application wants cfgsync to persist or publish shared
|
||||||
/// artifacts after a snapshot becomes ready.
|
/// artifacts after a snapshot becomes ready.
|
||||||
pub fn build_persisted_cfgsync_router<M, S>(materializer: M, sink: S) -> Router
|
pub fn build_persisted_router<M, S>(materializer: M, sink: S) -> Router
|
||||||
where
|
where
|
||||||
M: RegistrationSnapshotMaterializer + 'static,
|
M: RegistrationSnapshotMaterializer + 'static,
|
||||||
S: MaterializedArtifactsSink + 'static,
|
S: MaterializedArtifactsSink + 'static,
|
||||||
@ -192,11 +191,11 @@ where
|
|||||||
///
|
///
|
||||||
/// This is the simplest runtime entrypoint when the application already has a
|
/// This is the simplest runtime entrypoint when the application already has a
|
||||||
/// materializer value and does not need to compose extra routes.
|
/// materializer value and does not need to compose extra routes.
|
||||||
pub async fn serve_cfgsync<M>(port: u16, materializer: M) -> Result<(), RunCfgsyncError>
|
pub async fn serve<M>(port: u16, materializer: M) -> Result<(), RunCfgsyncError>
|
||||||
where
|
where
|
||||||
M: RegistrationSnapshotMaterializer + 'static,
|
M: RegistrationSnapshotMaterializer + 'static,
|
||||||
{
|
{
|
||||||
let router = build_cfgsync_router(materializer);
|
let router = build_router(materializer);
|
||||||
serve_router(port, router).await
|
serve_router(port, router).await
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,8 +203,8 @@ where
|
|||||||
/// materialization results.
|
/// materialization results.
|
||||||
///
|
///
|
||||||
/// This is the direct serving counterpart to
|
/// This is the direct serving counterpart to
|
||||||
/// [`build_persisted_cfgsync_router`].
|
/// [`build_persisted_router`].
|
||||||
pub async fn serve_persisted_cfgsync<M, S>(
|
pub async fn serve_persisted<M, S>(
|
||||||
port: u16,
|
port: u16,
|
||||||
materializer: M,
|
materializer: M,
|
||||||
sink: S,
|
sink: S,
|
||||||
@ -214,7 +213,7 @@ where
|
|||||||
M: RegistrationSnapshotMaterializer + 'static,
|
M: RegistrationSnapshotMaterializer + 'static,
|
||||||
S: MaterializedArtifactsSink + 'static,
|
S: MaterializedArtifactsSink + 'static,
|
||||||
{
|
{
|
||||||
let router = build_persisted_cfgsync_router(materializer, sink);
|
let router = build_persisted_router(materializer, sink);
|
||||||
serve_router(port, router).await
|
serve_router(port, router).await
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,23 +231,21 @@ async fn serve_router(port: u16, router: Router) -> Result<(), RunCfgsyncError>
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn build_server_state(
|
fn build_server_state(
|
||||||
config: &CfgsyncServerConfig,
|
config: &ServerConfig,
|
||||||
source_path: &Path,
|
source_path: &Path,
|
||||||
) -> anyhow::Result<CfgsyncServerState> {
|
) -> anyhow::Result<CfgsyncServerState> {
|
||||||
let repo = match &config.source {
|
let repo = match &config.source {
|
||||||
CfgsyncServerSource::Bundle { .. } => load_bundle_provider(source_path)?,
|
ServerSource::Bundle { .. } => load_bundle_provider(source_path)?,
|
||||||
CfgsyncServerSource::Registration { .. } => load_registration_source(source_path)?,
|
ServerSource::Registration { .. } => load_registration_source(source_path)?,
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(CfgsyncServerState::new(repo))
|
Ok(CfgsyncServerState::new(repo))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn resolve_source_path(config_path: &Path, source: &CfgsyncServerSource) -> std::path::PathBuf {
|
fn resolve_source_path(config_path: &Path, source: &ServerSource) -> std::path::PathBuf {
|
||||||
match source {
|
match source {
|
||||||
CfgsyncServerSource::Bundle { bundle_path } => {
|
ServerSource::Bundle { bundle_path } => resolve_bundle_path(config_path, bundle_path),
|
||||||
resolve_bundle_path(config_path, bundle_path)
|
ServerSource::Registration { artifacts_path } => {
|
||||||
}
|
|
||||||
CfgsyncServerSource::Registration { artifacts_path } => {
|
|
||||||
resolve_bundle_path(config_path, artifacts_path)
|
resolve_bundle_path(config_path, artifacts_path)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user