From 96dc957881f9ff89e9dc9024d2da30314c1c44bb Mon Sep 17 00:00:00 2001 From: andrussal Date: Thu, 12 Mar 2026 10:03:02 +0100 Subject: [PATCH] Add focused cfgsync examples --- cfgsync/README.md | 31 ++++--- cfgsync/runtime/examples/minimal_cfgsync.rs | 8 +- .../precomputed_registration_cfgsync.rs | 73 +++++++++++++++++ .../wait_for_registrations_cfgsync.rs | 81 +++++++++++++++++++ 4 files changed, 174 insertions(+), 19 deletions(-) create mode 100644 cfgsync/runtime/examples/precomputed_registration_cfgsync.rs create mode 100644 cfgsync/runtime/examples/wait_for_registrations_cfgsync.rs diff --git a/cfgsync/README.md b/cfgsync/README.md index 83ca201..abeaed4 100644 --- a/cfgsync/README.md +++ b/cfgsync/README.md @@ -145,21 +145,19 @@ Those belong in the adapter or in the consuming application. ## Start here -If you want the shortest path into the library, start with the end-to-end -runtime example: +Start with the examples in `cfgsync/runtime/examples/`. -- `cfgsync/runtime/examples/minimal_cfgsync.rs` +- `minimal_cfgsync.rs` shows the smallest complete flow: serve cfgsync, register + one node, fetch artifacts, and write them locally. +- `precomputed_registration_cfgsync.rs` shows how precomputed artifacts still + use the same registration flow, including a later node that joins after the + server is already running. +- `wait_for_registrations_cfgsync.rs` shows the normal `NotReady` path: one node + waits until the materializer sees enough registrations, then both nodes + receive config. -It shows the full loop: - -- define a snapshot materializer -- serve cfgsync -- register a node -- fetch artifacts -- write them locally - -After that, the only concepts you usually need to learn are the ones in the -next section. +Those three examples cover the full public model. The rest of this README just +names the pieces and explains where application-specific logic belongs. ## Minimal integration path @@ -169,11 +167,12 @@ For a new application, the shortest sensible path is: 2. implement `RegistrationSnapshotMaterializer` 3. return node-local and optional shared artifacts 4. serve them with `serve(...)` -5. use `CfgsyncClient` or the runtime helpers on the node side +5. use `Client` on the node side -That gives you the main value of the library without forcing extra application logic into cfgsync itself. +That gives you the main value of the library without pushing application logic +into cfgsync itself. -## Code sketch +## API sketch Typed registration payload: diff --git a/cfgsync/runtime/examples/minimal_cfgsync.rs b/cfgsync/runtime/examples/minimal_cfgsync.rs index d61f1d7..38db4f0 100644 --- a/cfgsync/runtime/examples/minimal_cfgsync.rs +++ b/cfgsync/runtime/examples/minimal_cfgsync.rs @@ -44,15 +44,17 @@ async fn main() -> anyhow::Result<()> { sleep(Duration::from_millis(100)).await; let tempdir = tempdir()?; - let config_path = tempdir.path().join("config.yaml"); - let outputs = OutputMap::new().route("/config.yaml", &config_path); + let outputs = OutputMap::under(tempdir.path()); let registration = NodeRegistration::new("node-1", "127.0.0.1".parse()?); Client::new("http://127.0.0.1:4400") .fetch_and_write(®istration, &outputs) .await?; - println!("{}", std::fs::read_to_string(&config_path)?); + println!( + "{}", + std::fs::read_to_string(tempdir.path().join("config.yaml"))? + ); server.abort(); Ok(()) diff --git a/cfgsync/runtime/examples/precomputed_registration_cfgsync.rs b/cfgsync/runtime/examples/precomputed_registration_cfgsync.rs new file mode 100644 index 0000000..dc5774b --- /dev/null +++ b/cfgsync/runtime/examples/precomputed_registration_cfgsync.rs @@ -0,0 +1,73 @@ +use cfgsync_adapter::MaterializedArtifacts; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; +use cfgsync_core::NodeRegistration; +use cfgsync_runtime::{Client, OutputMap, serve}; +use tempfile::tempdir; +use tokio::time::{Duration, sleep}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let port = 4401; + let artifacts = MaterializedArtifacts::from_nodes([ + ( + "node-1".to_owned(), + ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", "id: node-1\n")]), + ), + ( + "node-2".to_owned(), + ArtifactSet::new(vec![ArtifactFile::new("/config.yaml", "id: node-2\n")]), + ), + ]) + .with_shared(ArtifactSet::new(vec![ArtifactFile::new( + "/shared/cluster.yaml", + "cluster: demo\n", + )])); + + let server = tokio::spawn(async move { serve(port, artifacts).await }); + + // Give the server a moment to bind before clients register. + sleep(Duration::from_millis(100)).await; + + let node_1_dir = tempdir()?; + let node_1_outputs = OutputMap::config_and_shared( + node_1_dir.path().join("config.yaml"), + node_1_dir.path().join("shared"), + ); + let node_1 = NodeRegistration::new("node-1", "127.0.0.1".parse()?); + + Client::new("http://127.0.0.1:4401") + .fetch_and_write(&node_1, &node_1_outputs) + .await?; + + println!( + "node-1 config:\n{}", + std::fs::read_to_string(node_1_dir.path().join("config.yaml"))? + ); + + // A later node still uses the same registration/fetch flow. The artifacts + // were already known; registration only gates delivery. + sleep(Duration::from_millis(250)).await; + + let node_2_dir = tempdir()?; + let node_2_outputs = OutputMap::config_and_shared( + node_2_dir.path().join("config.yaml"), + node_2_dir.path().join("shared"), + ); + let node_2 = NodeRegistration::new("node-2", "127.0.0.2".parse()?); + + Client::new("http://127.0.0.1:4401") + .fetch_and_write(&node_2, &node_2_outputs) + .await?; + + println!( + "node-2 config:\n{}", + std::fs::read_to_string(node_2_dir.path().join("config.yaml"))? + ); + println!( + "shared artifact:\n{}", + std::fs::read_to_string(node_2_dir.path().join("shared/shared/cluster.yaml"))? + ); + + server.abort(); + Ok(()) +} diff --git a/cfgsync/runtime/examples/wait_for_registrations_cfgsync.rs b/cfgsync/runtime/examples/wait_for_registrations_cfgsync.rs new file mode 100644 index 0000000..6e25151 --- /dev/null +++ b/cfgsync/runtime/examples/wait_for_registrations_cfgsync.rs @@ -0,0 +1,81 @@ +use cfgsync_adapter::{ + DynCfgsyncError, MaterializationResult, MaterializedArtifacts, RegistrationSnapshot, + RegistrationSnapshotMaterializer, +}; +use cfgsync_artifacts::{ArtifactFile, ArtifactSet}; +use cfgsync_core::NodeRegistration; +use cfgsync_runtime::{Client, OutputMap, serve}; +use tempfile::tempdir; +use tokio::time::{Duration, sleep}; + +struct ThresholdMaterializer; + +impl RegistrationSnapshotMaterializer for ThresholdMaterializer { + fn materialize_snapshot( + &self, + registrations: &RegistrationSnapshot, + ) -> Result { + if registrations.len() < 2 { + return Ok(MaterializationResult::NotReady); + } + + let nodes = registrations.iter().map(|registration| { + ( + registration.identifier.clone(), + ArtifactSet::new(vec![ArtifactFile::new( + "/config.yaml", + format!("id: {}\ncluster_ready: true\n", registration.identifier), + )]), + ) + }); + + Ok(MaterializationResult::ready( + MaterializedArtifacts::from_nodes(nodes), + )) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let port = 4402; + let server = tokio::spawn(async move { serve(port, ThresholdMaterializer).await }); + + sleep(Duration::from_millis(100)).await; + + let waiting_dir = tempdir()?; + let waiting_outputs = OutputMap::under(waiting_dir.path()); + let waiting_node = NodeRegistration::new("node-1", "127.0.0.1".parse()?); + let waiting_client = Client::new("http://127.0.0.1:4402"); + + let waiting_task = tokio::spawn(async move { + waiting_client + .fetch_and_write(&waiting_node, &waiting_outputs) + .await + }); + + // node-1 is now polling. The materializer will keep returning NotReady + // until node-2 registers. + sleep(Duration::from_millis(400)).await; + + let second_dir = tempdir()?; + let second_outputs = OutputMap::under(second_dir.path()); + let second_node = NodeRegistration::new("node-2", "127.0.0.2".parse()?); + + Client::new("http://127.0.0.1:4402") + .fetch_and_write(&second_node, &second_outputs) + .await?; + + waiting_task.await??; + + println!( + "node-1 config after threshold reached:\n{}", + std::fs::read_to_string(waiting_dir.path().join("config.yaml"))? + ); + println!( + "node-2 config:\n{}", + std::fs::read_to_string(second_dir.path().join("config.yaml"))? + ); + + server.abort(); + Ok(()) +}