diff --git a/Cargo.lock b/Cargo.lock index 471e158..395a13c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -177,11 +177,13 @@ dependencies = [ "serde", "serde_json", "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -202,6 +204,7 @@ dependencies = [ "sync_wrapper", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -1385,6 +1388,65 @@ dependencies = [ "tracing", ] +[[package]] +name = "kvstore-examples" +version = "0.1.0" +dependencies = [ + "anyhow", + "kvstore-node", + "kvstore-runtime-ext", + "kvstore-runtime-workloads", + "serde", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-k8s", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "kvstore-node" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "clap", + "reqwest", + "serde", + "serde_yaml", + "tokio", + "tower-http", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "kvstore-runtime-ext" +version = "0.1.0" +dependencies = [ + "async-trait", + "kvstore-node", + "serde", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-k8s", + "testing-framework-runner-local", +] + +[[package]] +name = "kvstore-runtime-workloads" +version = "0.1.0" +dependencies = [ + "async-trait", + "kvstore-node", + "kvstore-runtime-ext", + "serde", + "testing-framework-core", + "tokio", + "tracing", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1436,6 +1498,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version 
= "0.7.3" @@ -1482,6 +1553,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-conv" version = "0.2.0" @@ -1803,6 +1883,62 @@ dependencies = [ "url", ] +[[package]] +name = "queue-examples" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "queue-runtime-ext", + "queue-runtime-workloads", + "testing-framework-core", + "testing-framework-runner-compose", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "queue-node" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "clap", + "reqwest", + "serde", + "serde_yaml", + "tokio", + "tower-http", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "queue-runtime-ext" +version = "0.1.0" +dependencies = [ + "async-trait", + "queue-node", + "serde", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-local", +] + +[[package]] +name = "queue-runtime-workloads" +version = "0.1.0" +dependencies = [ + "async-trait", + "queue-node", + "queue-runtime-ext", + "serde", + "testing-framework-core", + "tokio", + "tracing", +] + [[package]] name = "quote" version = "1.0.44" @@ -2048,6 +2184,61 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "scheduler-examples" +version = "0.1.0" +dependencies = [ + "anyhow", + "scheduler-runtime-ext", + "scheduler-runtime-workloads", + "testing-framework-core", + "testing-framework-runner-compose", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "scheduler-node" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "clap", + "reqwest", + "serde", + "serde_yaml", + "tokio", + "tower-http", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "scheduler-runtime-ext" +version = "0.1.0" 
+dependencies = [ + "async-trait", + "scheduler-node", + "serde", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-local", +] + +[[package]] +name = "scheduler-runtime-workloads" +version = "0.1.0" +dependencies = [ + "async-trait", + "scheduler-node", + "scheduler-runtime-ext", + "serde", + "testing-framework-core", + "tokio", + "tracing", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -2205,6 +2396,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2465,6 +2665,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "time" version = "0.3.47" @@ -2514,6 +2723,7 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -2657,6 +2867,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + 
"regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -2737,6 +2977,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index d24851f..4bba838 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,18 @@ members = [ "cfgsync/artifacts", "cfgsync/core", "cfgsync/runtime", + "examples/kvstore/examples", + "examples/kvstore/kvstore-node", + "examples/kvstore/testing/integration", + "examples/kvstore/testing/workloads", + "examples/queue/examples", + "examples/queue/queue-node", + "examples/queue/testing/integration", + "examples/queue/testing/workloads", + "examples/scheduler/examples", + "examples/scheduler/scheduler-node", + "examples/scheduler/testing/integration", + "examples/scheduler/testing/workloads", "testing-framework/core", "testing-framework/deployers/compose", "testing-framework/deployers/k8s", diff --git a/examples/kvstore/Dockerfile b/examples/kvstore/Dockerfile new file mode 100644 index 0000000..2bcd4cb --- /dev/null +++ b/examples/kvstore/Dockerfile @@ -0,0 +1,28 @@ +# Build stage +FROM rustlang/rust:nightly-bookworm AS builder + +WORKDIR /build + +# Copy all workspace files required for workspace build. +COPY Cargo.toml Cargo.lock ./ +COPY cfgsync/ ./cfgsync/ +COPY examples/ ./examples/ +COPY testing-framework/ ./testing-framework/ + +# Build kvstore-node in release mode. 
+RUN cargo build --release -p kvstore-node + +# Runtime stage +FROM debian:bookworm-slim + +RUN apt-get update && \ + apt-get install -y ca-certificates && \ + rm -rf /var/lib/apt/lists/* + +COPY --from=builder /build/target/release/kvstore-node /usr/local/bin/kvstore-node + +RUN mkdir -p /etc/kvstore +WORKDIR /app + +ENTRYPOINT ["/usr/local/bin/kvstore-node"] +CMD ["--config", "/etc/kvstore/config.yaml"] diff --git a/examples/kvstore/README.md b/examples/kvstore/README.md new file mode 100644 index 0000000..9fdb86b --- /dev/null +++ b/examples/kvstore/README.md @@ -0,0 +1,64 @@ +# KV Store Example + +This example runs a small replicated key-value store. + +The usual scenario writes keys through one node and checks that the other nodes +eventually return the same values. + +## How TF runs this + +Each example follows the same pattern: + +- TF starts a small deployment of kvstore nodes +- a workload writes keys through one node +- an expectation keeps reading from all nodes until they agree on the values + +## Scenarios + +- `basic_convergence` runs the convergence check locally +- `compose_convergence` runs the same check in Docker Compose +- `k8s_convergence` runs it on Kubernetes +- `k8s_manual_convergence` starts the nodes through the k8s manual cluster API, restarts one node, and checks convergence again + +## API + +Each node exposes: + +- `PUT /kv/:key` to write a value +- `GET /kv/:key` to read a value +- `GET /internal/snapshot` to read the local replicated state + +## Run locally + +```bash +cargo run -p kvstore-examples --bin basic_convergence +``` + +## Run with Docker Compose + +```bash +cargo run -p kvstore-examples --bin compose_convergence +``` + +Set `KVSTORE_IMAGE` to override the default compose image tag. + +## Run with Kubernetes + +```bash +docker build -t kvstore-node:local -f examples/kvstore/Dockerfile . 
+cargo run -p kvstore-examples --bin k8s_convergence +``` + +Prerequisites: +- `kubectl` configured with a reachable cluster +- `helm` installed + +Optional image override: +- `KVSTORE_K8S_IMAGE` (falls back to `KVSTORE_IMAGE`, then `kvstore-node:local`) + +## Run with Kubernetes manual cluster + +```bash +docker build -t kvstore-node:local -f examples/kvstore/Dockerfile . +cargo run -p kvstore-examples --bin k8s_manual_convergence +``` diff --git a/examples/kvstore/examples/Cargo.toml b/examples/kvstore/examples/Cargo.toml new file mode 100644 index 0000000..7a5dcac --- /dev/null +++ b/examples/kvstore/examples/Cargo.toml @@ -0,0 +1,19 @@ +[package] +edition.workspace = true +license.workspace = true +name = "kvstore-examples" +version.workspace = true + +[dependencies] +kvstore-node = { path = "../kvstore-node" } +kvstore-runtime-ext = { path = "../testing/integration" } +kvstore-runtime-workloads = { path = "../testing/workloads" } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +testing-framework-runner-k8s = { workspace = true } + +anyhow = "1.0" +serde = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/kvstore/examples/src/bin/basic_convergence.rs b/examples/kvstore/examples/src/bin/basic_convergence.rs new file mode 100644 index 0000000..6ebc130 --- /dev/null +++ b/examples/kvstore/examples/src/bin/basic_convergence.rs @@ -0,0 +1,31 @@ +use std::time::Duration; + +use kvstore_runtime_ext::KvLocalDeployer; +use kvstore_runtime_workloads::{ + KvBuilderExt, KvConverges, KvScenarioBuilder, KvTopology, KvWriteWorkload, +}; +use testing_framework_core::scenario::Deployer; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let mut scenario = 
KvScenarioBuilder::deployment_with(|_| KvTopology::new(3)) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + KvWriteWorkload::new() + .operations(300) + .key_count(30) + .rate_per_sec(30) + .key_prefix("demo"), + ) + .with_expectation(KvConverges::new("demo", 30).timeout(Duration::from_secs(25))) + .build()?; + + let deployer = KvLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + Ok(()) +} diff --git a/examples/kvstore/examples/src/bin/compose_convergence.rs b/examples/kvstore/examples/src/bin/compose_convergence.rs new file mode 100644 index 0000000..03984da --- /dev/null +++ b/examples/kvstore/examples/src/bin/compose_convergence.rs @@ -0,0 +1,44 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use kvstore_runtime_workloads::{ + KvBuilderExt, KvConverges, KvScenarioBuilder, KvTopology, KvWriteWorkload, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let mut scenario = KvScenarioBuilder::deployment_with(|_| KvTopology::new(3)) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + KvWriteWorkload::new() + .operations(200) + .key_count(20) + .rate_per_sec(20), + ) + .with_expectation(KvConverges::new("kv-demo", 20).timeout(Duration::from_secs(25))) + .build()?; + + let deployer = kvstore_runtime_ext::KvComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping compose kv run"); + return Ok(()); + } + Err(error) => return Err(anyhow::Error::new(error)).context("deploying kv compose stack"), + }; + + info!("running kv compose convergence scenario"); + runner + .run(&mut 
scenario) + .await + .context("running kv compose scenario")?; + Ok(()) +} diff --git a/examples/kvstore/examples/src/bin/k8s_convergence.rs b/examples/kvstore/examples/src/bin/k8s_convergence.rs new file mode 100644 index 0000000..715ae7e --- /dev/null +++ b/examples/kvstore/examples/src/bin/k8s_convergence.rs @@ -0,0 +1,58 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use kvstore_runtime_ext::KvK8sDeployer; +use kvstore_runtime_workloads::{ + KvBuilderExt, KvConverges, KvScenarioBuilder, KvTopology, KvWriteWorkload, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_k8s::K8sRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let mut scenario = KvScenarioBuilder::deployment_with(|_| KvTopology::new(3)) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + KvWriteWorkload::new() + .operations(200) + .key_count(20) + .rate_per_sec(20), + ) + .with_expectation(KvConverges::new("kv-demo", 20).timeout(Duration::from_secs(25))) + .build()?; + + let deployer = KvK8sDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(K8sRunnerError::ClientInit { source }) => { + warn!("k8s unavailable ({source}); skipping kv k8s run"); + return Ok(()); + } + Err(K8sRunnerError::InstallStack { source }) + if k8s_cluster_unavailable(&source.to_string()) => + { + warn!("k8s unavailable ({source}); skipping kv k8s run"); + return Ok(()); + } + Err(error) => return Err(anyhow::Error::new(error)).context("deploying kv k8s stack"), + }; + + info!("running kv k8s convergence scenario"); + runner + .run(&mut scenario) + .await + .context("running kv k8s scenario")?; + + Ok(()) +} + +fn k8s_cluster_unavailable(message: &str) -> bool { + message.contains("Unable to connect to the server") + || message.contains("TLS handshake 
timeout") + || message.contains("connection refused") +} diff --git a/examples/kvstore/examples/src/bin/k8s_manual_convergence.rs b/examples/kvstore/examples/src/bin/k8s_manual_convergence.rs new file mode 100644 index 0000000..755e92c --- /dev/null +++ b/examples/kvstore/examples/src/bin/k8s_manual_convergence.rs @@ -0,0 +1,155 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result, anyhow}; +use kvstore_node::KvHttpClient; +use kvstore_runtime_ext::{KvK8sDeployer, KvTopology}; +use serde::{Deserialize, Serialize}; +use testing_framework_runner_k8s::ManualClusterError; +use tracing::{info, warn}; + +#[derive(Serialize)] +struct PutRequest { + value: String, + expected_version: Option, +} + +#[derive(Deserialize)] +struct PutResponse { + applied: bool, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +struct ValueRecord { + value: String, + version: u64, + origin: u64, +} + +#[derive(Deserialize)] +struct GetResponse { + record: Option, +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let deployer = KvK8sDeployer::new(); + let cluster = match deployer + .manual_cluster_from_descriptors(KvTopology::new(3)) + .await + { + Ok(cluster) => cluster, + Err(ManualClusterError::ClientInit { source }) => { + warn!("k8s unavailable ({source}); skipping kv k8s manual run"); + return Ok(()); + } + Err(ManualClusterError::InstallStack { source }) + if k8s_cluster_unavailable(&source.to_string()) => + { + warn!("k8s unavailable ({source}); skipping kv k8s manual run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("creating kv k8s manual cluster"); + } + }; + + let node0 = cluster.start_node("node-0").await?.client; + let node1 = cluster.start_node("node-1").await?.client; + let node2 = cluster.start_node("node-2").await?.client; + + cluster.wait_network_ready().await?; + + write_keys(&node0, 
"kv-manual", 12).await?; + wait_for_convergence( + &[node0.clone(), node1.clone(), node2.clone()], + "kv-manual", + 12, + ) + .await?; + + info!("restarting node-2 in manual cluster"); + cluster.restart_node("node-2").await?; + cluster.wait_network_ready().await?; + + let node2 = cluster + .node_client("node-2") + .ok_or_else(|| anyhow!("node-2 client missing after restart"))?; + wait_for_convergence(&[node0, node1, node2], "kv-manual", 12).await?; + + cluster.stop_all(); + Ok(()) +} + +async fn write_keys(client: &KvHttpClient, prefix: &str, key_count: usize) -> Result<()> { + for index in 0..key_count { + let key = format!("{prefix}-{index}"); + let response: PutResponse = client + .put( + &format!("/kv/{key}"), + &PutRequest { + value: format!("value-{index}"), + expected_version: None, + }, + ) + .await + .map_err(|error| anyhow!(error.to_string())) + .with_context(|| format!("writing key {key}"))?; + + if !response.applied { + return Err(anyhow!("write rejected for key {key}")); + } + } + + Ok(()) +} + +async fn wait_for_convergence( + clients: &[KvHttpClient], + prefix: &str, + key_count: usize, +) -> Result<()> { + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + + while tokio::time::Instant::now() < deadline { + if is_converged(clients, prefix, key_count).await? { + info!(key_count, "kv manual cluster converged"); + return Ok(()); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + + Err(anyhow!("kv manual cluster did not converge within timeout")) +} + +async fn is_converged(clients: &[KvHttpClient], prefix: &str, key_count: usize) -> Result { + for index in 0..key_count { + let key = format!("{prefix}-{index}"); + let first = read_key(&clients[0], &key).await?; + for client in &clients[1..] { + if read_key(client, &key).await? 
!= first { + return Ok(false); + } + } + } + + Ok(true) +} + +async fn read_key(client: &KvHttpClient, key: &str) -> Result> { + let response: GetResponse = client + .get(&format!("/kv/{key}")) + .await + .map_err(|error| anyhow!(error.to_string())) + .with_context(|| format!("reading key {key}"))?; + Ok(response.record) +} + +fn k8s_cluster_unavailable(message: &str) -> bool { + message.contains("Unable to connect to the server") + || message.contains("TLS handshake timeout") + || message.contains("connection refused") +} diff --git a/examples/kvstore/kvstore-node/Cargo.toml b/examples/kvstore/kvstore-node/Cargo.toml new file mode 100644 index 0000000..d117f67 --- /dev/null +++ b/examples/kvstore/kvstore-node/Cargo.toml @@ -0,0 +1,24 @@ +[package] +edition.workspace = true +license.workspace = true +name = "kvstore-node" +version.workspace = true + +[[bin]] +name = "kvstore-node" +path = "src/main.rs" + +[dependencies] +axum = "0.7" +tower-http = { version = "0.6", features = ["trace"] } + +serde = { workspace = true } +serde_yaml = { workspace = true } + +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +anyhow = "1.0" +clap = { version = "4.0", features = ["derive"] } +reqwest = { workspace = true, features = ["json"] } diff --git a/examples/kvstore/kvstore-node/src/client.rs b/examples/kvstore/kvstore-node/src/client.rs new file mode 100644 index 0000000..8276795 --- /dev/null +++ b/examples/kvstore/kvstore-node/src/client.rs @@ -0,0 +1,40 @@ +use reqwest::Url; +use serde::Serialize; + +#[derive(Clone)] +pub struct KvHttpClient { + base_url: Url, + client: reqwest::Client, +} + +impl KvHttpClient { + #[must_use] + pub fn new(base_url: Url) -> Self { + Self { + base_url, + client: reqwest::Client::new(), + } + } + + pub async fn get(&self, path: &str) -> anyhow::Result { + let url = self.base_url.join(path)?; + let response = 
self.client.get(url).send().await?.error_for_status()?; + Ok(response.json().await?) + } + + pub async fn put( + &self, + path: &str, + body: &B, + ) -> anyhow::Result { + let url = self.base_url.join(path)?; + let response = self + .client + .put(url) + .json(body) + .send() + .await? + .error_for_status()?; + Ok(response.json().await?) + } +} diff --git a/examples/kvstore/kvstore-node/src/config.rs b/examples/kvstore/kvstore-node/src/config.rs new file mode 100644 index 0000000..de5149c --- /dev/null +++ b/examples/kvstore/kvstore-node/src/config.rs @@ -0,0 +1,30 @@ +use std::{fs, path::Path}; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PeerInfo { + pub node_id: u64, + pub http_address: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct KvConfig { + pub node_id: u64, + pub http_port: u16, + pub peers: Vec, + #[serde(default = "default_sync_interval_ms")] + pub sync_interval_ms: u64, +} + +impl KvConfig { + pub fn load(path: &Path) -> anyhow::Result { + let raw = fs::read_to_string(path)?; + let config = serde_yaml::from_str(&raw)?; + Ok(config) + } +} + +const fn default_sync_interval_ms() -> u64 { + 1000 +} diff --git a/examples/kvstore/kvstore-node/src/lib.rs b/examples/kvstore/kvstore-node/src/lib.rs new file mode 100644 index 0000000..2292ad1 --- /dev/null +++ b/examples/kvstore/kvstore-node/src/lib.rs @@ -0,0 +1,3 @@ +pub mod client; + +pub use client::KvHttpClient; diff --git a/examples/kvstore/kvstore-node/src/main.rs b/examples/kvstore/kvstore-node/src/main.rs new file mode 100644 index 0000000..2301c2a --- /dev/null +++ b/examples/kvstore/kvstore-node/src/main.rs @@ -0,0 +1,36 @@ +mod config; +mod server; +mod state; +mod sync; + +use std::path::PathBuf; + +use clap::Parser; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use crate::{config::KvConfig, state::KvState, sync::SyncService}; + +#[derive(Parser, Debug)] +#[command(name = 
"kvstore-node")] +struct Args { + #[arg(short, long)] + config: PathBuf, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "kvstore_node=info,tower_http=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let args = Args::parse(); + let config = KvConfig::load(&args.config)?; + + let state = KvState::new(config.node_id); + SyncService::new(config.clone(), state.clone()).start(); + server::start_server(config, state).await +} diff --git a/examples/kvstore/kvstore-node/src/server.rs b/examples/kvstore/kvstore-node/src/server.rs new file mode 100644 index 0000000..94fcd82 --- /dev/null +++ b/examples/kvstore/kvstore-node/src/server.rs @@ -0,0 +1,112 @@ +use std::net::SocketAddr; + +use axum::{ + Router, + extract::{Path, State}, + http::StatusCode, + response::Json, + routing::get, +}; +use serde::{Deserialize, Serialize}; +use tower_http::trace::TraceLayer; + +use crate::{ + config::KvConfig, + state::{KvState, Snapshot, ValueRecord}, +}; + +#[derive(Serialize)] +struct HealthResponse { + status: &'static str, +} + +#[derive(Deserialize)] +struct PutRequest { + value: String, + expected_version: Option, +} + +#[derive(Serialize)] +struct PutResponse { + applied: bool, + version: u64, +} + +#[derive(Serialize)] +struct GetResponse { + key: String, + record: Option, +} + +pub async fn start_server(config: KvConfig, state: KvState) -> anyhow::Result<()> { + let app = Router::new() + .route("/health/live", get(health_live)) + .route("/health/ready", get(health_ready)) + .route("/kv/:key", get(get_key).put(put_key)) + .route("/internal/snapshot", get(get_snapshot)) + .layer(TraceLayer::new_for_http()) + .with_state(state.clone()); + + let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port)); + let listener = tokio::net::TcpListener::bind(addr).await?; + + state.set_ready(true).await; + tracing::info!(node_id = 
state.node_id(), %addr, "kv node ready"); + + axum::serve(listener, app).await?; + Ok(()) +} + +async fn health_live() -> (StatusCode, Json) { + (StatusCode::OK, Json(HealthResponse { status: "alive" })) +} + +async fn health_ready(State(state): State) -> (StatusCode, Json) { + if state.is_ready().await { + (StatusCode::OK, Json(HealthResponse { status: "ready" })) + } else { + ( + StatusCode::SERVICE_UNAVAILABLE, + Json(HealthResponse { + status: "not-ready", + }), + ) + } +} + +async fn get_key(Path(key): Path, State(state): State) -> Json { + let record = state.get(&key).await; + Json(GetResponse { key, record }) +} + +async fn put_key( + Path(key): Path, + State(state): State, + Json(request): Json, +) -> (StatusCode, Json) { + let outcome = state + .put_local(key, request.value, request.expected_version) + .await; + + if outcome.applied { + ( + StatusCode::OK, + Json(PutResponse { + applied: true, + version: outcome.current_version, + }), + ) + } else { + ( + StatusCode::CONFLICT, + Json(PutResponse { + applied: false, + version: outcome.current_version, + }), + ) + } +} + +async fn get_snapshot(State(state): State) -> Json { + Json(state.snapshot().await) +} diff --git a/examples/kvstore/kvstore-node/src/state.rs b/examples/kvstore/kvstore-node/src/state.rs new file mode 100644 index 0000000..6d23120 --- /dev/null +++ b/examples/kvstore/kvstore-node/src/state.rs @@ -0,0 +1,111 @@ +use std::{collections::HashMap, sync::Arc}; + +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct ValueRecord { + pub value: String, + pub version: u64, + pub origin: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Snapshot { + pub node_id: u64, + pub entries: HashMap, +} + +#[derive(Clone, Debug)] +pub struct PutOutcome { + pub applied: bool, + pub current_version: u64, +} + +#[derive(Clone)] +pub struct KvState { + node_id: u64, + ready: Arc>, + entries: Arc>>, +} + 
+impl KvState { + pub fn new(node_id: u64) -> Self { + Self { + node_id, + ready: Arc::new(RwLock::new(false)), + entries: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub const fn node_id(&self) -> u64 { + self.node_id + } + + pub async fn set_ready(&self, value: bool) { + *self.ready.write().await = value; + } + + pub async fn is_ready(&self) -> bool { + *self.ready.read().await + } + + pub async fn get(&self, key: &str) -> Option { + self.entries.read().await.get(key).cloned() + } + + pub async fn put_local( + &self, + key: String, + value: String, + expected_version: Option, + ) -> PutOutcome { + let mut entries = self.entries.write().await; + let current_version = entries.get(&key).map_or(0, |record| record.version); + + if expected_version.is_some_and(|expected| expected != current_version) { + return PutOutcome { + applied: false, + current_version, + }; + } + + let next_version = current_version.saturating_add(1); + entries.insert( + key, + ValueRecord { + value, + version: next_version, + origin: self.node_id, + }, + ); + + PutOutcome { + applied: true, + current_version: next_version, + } + } + + pub async fn merge_snapshot(&self, snapshot: Snapshot) { + let mut local = self.entries.write().await; + for (key, incoming) in snapshot.entries { + match local.get(&key) { + Some(existing) if !is_newer_record(&incoming, existing) => {} + _ => { + local.insert(key, incoming); + } + } + } + } + + pub async fn snapshot(&self) -> Snapshot { + Snapshot { + node_id: self.node_id, + entries: self.entries.read().await.clone(), + } + } +} + +fn is_newer_record(candidate: &ValueRecord, existing: &ValueRecord) -> bool { + (candidate.version, candidate.origin) > (existing.version, existing.origin) +} diff --git a/examples/kvstore/kvstore-node/src/sync.rs b/examples/kvstore/kvstore-node/src/sync.rs new file mode 100644 index 0000000..875342d --- /dev/null +++ b/examples/kvstore/kvstore-node/src/sync.rs @@ -0,0 +1,103 @@ +use std::{collections::HashMap, sync::Arc, 
time::Duration}; + +use reqwest::Client; +use tokio::sync::Mutex; +use tracing::{debug, warn}; + +use crate::{ + config::KvConfig, + state::{KvState, Snapshot}, +}; + +const WARN_AFTER_CONSECUTIVE_FAILURES: u32 = 5; + +#[derive(Clone)] +pub struct SyncService { + config: Arc, + state: KvState, + client: Client, + failures_by_peer: Arc>>, +} + +impl SyncService { + pub fn new(config: KvConfig, state: KvState) -> Self { + Self { + config: Arc::new(config), + state, + client: Client::new(), + failures_by_peer: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn start(&self) { + let service = self.clone(); + tokio::spawn(async move { + service.run().await; + }); + } + + async fn run(self) { + let interval = Duration::from_millis(self.config.sync_interval_ms.max(100)); + loop { + self.sync_once().await; + tokio::time::sleep(interval).await; + } + } + + async fn sync_once(&self) { + for peer in &self.config.peers { + match self.fetch_snapshot(&peer.http_address).await { + Ok(snapshot) => { + self.state.merge_snapshot(snapshot).await; + self.clear_failure_counter(&peer.http_address).await; + } + Err(error) => { + self.record_sync_failure(&peer.http_address, &error).await; + } + } + } + } + + async fn fetch_snapshot(&self, peer_address: &str) -> anyhow::Result { + let url = format!("http://{peer_address}/internal/snapshot"); + let snapshot = self + .client + .get(url) + .send() + .await? + .error_for_status()? 
+ .json() + .await?; + Ok(snapshot) + } + + async fn clear_failure_counter(&self, peer_address: &str) { + let mut failures = self.failures_by_peer.lock().await; + failures.remove(peer_address); + } + + async fn record_sync_failure(&self, peer_address: &str, error: &anyhow::Error) { + let consecutive_failures = { + let mut failures = self.failures_by_peer.lock().await; + let entry = failures.entry(peer_address.to_owned()).or_insert(0); + *entry += 1; + *entry + }; + + if consecutive_failures >= WARN_AFTER_CONSECUTIVE_FAILURES { + warn!( + peer = %peer_address, + %error, + consecutive_failures, + "kv sync repeatedly failing" + ); + } else { + debug!( + peer = %peer_address, + %error, + consecutive_failures, + "kv sync failed" + ); + } + } +} diff --git a/examples/kvstore/testing/integration/Cargo.toml b/examples/kvstore/testing/integration/Cargo.toml new file mode 100644 index 0000000..9773cac --- /dev/null +++ b/examples/kvstore/testing/integration/Cargo.toml @@ -0,0 +1,15 @@ +[package] +edition.workspace = true +license.workspace = true +name = "kvstore-runtime-ext" +version.workspace = true + +[dependencies] +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +testing-framework-runner-k8s = { workspace = true } +testing-framework-runner-local = { workspace = true } + +async-trait = { workspace = true } +kvstore-node = { path = "../../kvstore-node" } +serde = { workspace = true } diff --git a/examples/kvstore/testing/integration/src/app.rs b/examples/kvstore/testing/integration/src/app.rs new file mode 100644 index 0000000..bd0dffe --- /dev/null +++ b/examples/kvstore/testing/integration/src/app.rs @@ -0,0 +1,75 @@ +use std::io::Error; + +use async_trait::async_trait; +use kvstore_node::KvHttpClient; +use serde::{Deserialize, Serialize}; +use testing_framework_core::scenario::{ + Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, + NodeAccess, serialize_cluster_yaml_config, +}; 
+ +pub type KvTopology = testing_framework_core::topology::ClusterTopology; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct KvPeerInfo { + pub node_id: u64, + pub http_address: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct KvNodeConfig { + pub node_id: u64, + pub http_port: u16, + pub peers: Vec, + pub sync_interval_ms: u64, +} + +pub struct KvEnv; + +#[async_trait] +impl Application for KvEnv { + type Deployment = KvTopology; + type NodeClient = KvHttpClient; + type NodeConfig = KvNodeConfig; + fn build_node_client(access: &NodeAccess) -> Result { + Ok(KvHttpClient::new(access.api_base_url()?)) + } + + fn node_readiness_path() -> &'static str { + "/health/ready" + } +} + +impl ClusterNodeConfigApplication for KvEnv { + type ConfigError = Error; + + fn static_network_port() -> u16 { + 8080 + } + + fn build_cluster_node_config( + node: &ClusterNodeView, + peers: &[ClusterPeerView], + ) -> Result { + let peers = peers + .iter() + .map(|peer| KvPeerInfo { + node_id: peer.index() as u64, + http_address: peer.authority(), + }) + .collect::>(); + + Ok(KvNodeConfig { + node_id: node.index() as u64, + http_port: node.network_port(), + peers, + sync_interval_ms: 500, + }) + } + + fn serialize_cluster_node_config( + config: &Self::NodeConfig, + ) -> Result { + serialize_cluster_yaml_config(config).map_err(Error::other) + } +} diff --git a/examples/kvstore/testing/integration/src/compose_env.rs b/examples/kvstore/testing/integration/src/compose_env.rs new file mode 100644 index 0000000..27fe638 --- /dev/null +++ b/examples/kvstore/testing/integration/src/compose_env.rs @@ -0,0 +1,15 @@ +use testing_framework_runner_compose::{BinaryConfigNodeSpec, ComposeBinaryApp}; + +use crate::KvEnv; + +const NODE_CONFIG_PATH: &str = "/etc/kvstore/config.yaml"; + +impl ComposeBinaryApp for KvEnv { + fn compose_node_spec() -> BinaryConfigNodeSpec { + BinaryConfigNodeSpec::conventional( + "/usr/local/bin/kvstore-node", + NODE_CONFIG_PATH, + 
vec![8080, 8081], + ) + } +} diff --git a/examples/kvstore/testing/integration/src/k8s_env.rs b/examples/kvstore/testing/integration/src/k8s_env.rs new file mode 100644 index 0000000..f7e5d27 --- /dev/null +++ b/examples/kvstore/testing/integration/src/k8s_env.rs @@ -0,0 +1,21 @@ +use testing_framework_runner_k8s::{BinaryConfigK8sSpec, K8sBinaryApp}; + +use crate::KvEnv; + +const CONTAINER_CONFIG_PATH: &str = "/etc/kvstore/config.yaml"; +const CONTAINER_HTTP_PORT: u16 = 8080; +const SERVICE_TESTING_PORT: u16 = 8081; +const NODE_NAME_PREFIX: &str = "kvstore-node"; + +impl K8sBinaryApp for KvEnv { + fn k8s_binary_spec() -> BinaryConfigK8sSpec { + BinaryConfigK8sSpec::conventional( + "kvstore", + NODE_NAME_PREFIX, + "/usr/local/bin/kvstore-node", + CONTAINER_CONFIG_PATH, + CONTAINER_HTTP_PORT, + SERVICE_TESTING_PORT, + ) + } +} diff --git a/examples/kvstore/testing/integration/src/lib.rs b/examples/kvstore/testing/integration/src/lib.rs new file mode 100644 index 0000000..ec8e387 --- /dev/null +++ b/examples/kvstore/testing/integration/src/lib.rs @@ -0,0 +1,12 @@ +mod app; +mod compose_env; +mod k8s_env; +mod local_env; +pub mod scenario; + +pub use app::*; +pub use scenario::{KvBuilderExt, KvScenarioBuilder}; + +pub type KvLocalDeployer = testing_framework_runner_local::ProcessDeployer; +pub type KvComposeDeployer = testing_framework_runner_compose::ComposeDeployer; +pub type KvK8sDeployer = testing_framework_runner_k8s::K8sDeployer; diff --git a/examples/kvstore/testing/integration/src/local_env.rs b/examples/kvstore/testing/integration/src/local_env.rs new file mode 100644 index 0000000..0450439 --- /dev/null +++ b/examples/kvstore/testing/integration/src/local_env.rs @@ -0,0 +1,41 @@ +use std::collections::HashMap; + +use testing_framework_core::scenario::{DynError, StartNodeOptions}; +use testing_framework_runner_local::{ + LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec, + build_local_cluster_node_config, yaml_node_config, +}; + +use 
crate::{KvEnv, KvNodeConfig}; + +impl LocalBinaryApp for KvEnv { + fn initial_node_name_prefix() -> &'static str { + "kv-node" + } + + fn build_local_node_config_with_peers( + _topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + _template_config: Option< + &::NodeConfig, + >, + ) -> Result<::NodeConfig, DynError> { + build_local_cluster_node_config::(index, ports, peers) + } + + fn local_process_spec() -> LocalProcessSpec { + LocalProcessSpec::new("KVSTORE_NODE_BIN", "kvstore-node").with_rust_log("kvstore_node=info") + } + + fn render_local_config(config: &KvNodeConfig) -> Result, DynError> { + yaml_node_config(config) + } + + fn http_api_port(config: &KvNodeConfig) -> u16 { + config.http_port + } +} diff --git a/examples/kvstore/testing/integration/src/scenario.rs b/examples/kvstore/testing/integration/src/scenario.rs new file mode 100644 index 0000000..5a0e770 --- /dev/null +++ b/examples/kvstore/testing/integration/src/scenario.rs @@ -0,0 +1,15 @@ +use testing_framework_core::scenario::ScenarioBuilder; + +use crate::{KvEnv, KvTopology}; + +pub type KvScenarioBuilder = ScenarioBuilder; + +pub trait KvBuilderExt: Sized { + fn deployment_with(f: impl FnOnce(KvTopology) -> KvTopology) -> Self; +} + +impl KvBuilderExt for KvScenarioBuilder { + fn deployment_with(f: impl FnOnce(KvTopology) -> KvTopology) -> Self { + KvScenarioBuilder::with_deployment(f(KvTopology::new(3))) + } +} diff --git a/examples/kvstore/testing/workloads/Cargo.toml b/examples/kvstore/testing/workloads/Cargo.toml new file mode 100644 index 0000000..e400c2a --- /dev/null +++ b/examples/kvstore/testing/workloads/Cargo.toml @@ -0,0 +1,15 @@ +[package] +edition.workspace = true +license.workspace = true +name = "kvstore-runtime-workloads" +version.workspace = true + +[dependencies] +kvstore-node = { path = "../../kvstore-node" } +kvstore-runtime-ext = { path = "../integration" } 
+testing-framework-core = { workspace = true } + +async-trait = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } diff --git a/examples/kvstore/testing/workloads/src/expectations.rs b/examples/kvstore/testing/workloads/src/expectations.rs new file mode 100644 index 0000000..612731d --- /dev/null +++ b/examples/kvstore/testing/workloads/src/expectations.rs @@ -0,0 +1,100 @@ +use std::time::Duration; + +use async_trait::async_trait; +use kvstore_runtime_ext::KvEnv; +use serde::Deserialize; +use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use tracing::info; + +#[derive(Clone)] +pub struct KvConverges { + key_prefix: String, + key_count: usize, + timeout: Duration, + poll_interval: Duration, +} + +#[derive(Deserialize, Clone, Debug, Eq, PartialEq)] +struct ValueRecord { + value: String, + version: u64, + origin: u64, +} + +#[derive(Deserialize)] +struct GetResponse { + record: Option, +} + +impl KvConverges { + #[must_use] + pub fn new(key_prefix: impl Into, key_count: usize) -> Self { + Self { + key_prefix: key_prefix.into(), + key_count, + timeout: Duration::from_secs(20), + poll_interval: Duration::from_millis(500), + } + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +#[async_trait] +impl Expectation for KvConverges { + fn name(&self) -> &str { + "kv_converges" + } + + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("no kv node clients available".into()); + } + + let deadline = tokio::time::Instant::now() + self.timeout; + while tokio::time::Instant::now() < deadline { + if self.is_converged(&clients).await? 
{ + info!(key_count = self.key_count, "kv convergence reached"); + return Ok(()); + } + tokio::time::sleep(self.poll_interval).await; + } + + Err(format!( + "kv convergence not reached within {:?} for {} keys", + self.timeout, self.key_count + ) + .into()) + } +} + +impl KvConverges { + async fn is_converged(&self, clients: &[kvstore_node::KvHttpClient]) -> Result { + for key_idx in 0..self.key_count { + let key = format!("{}-{key_idx}", self.key_prefix); + let first = read_key(clients, &key, 0).await?; + for node_idx in 1..clients.len() { + let current = read_key(clients, &key, node_idx).await?; + if current != first { + return Ok(false); + } + } + } + + Ok(true) + } +} + +async fn read_key( + clients: &[kvstore_node::KvHttpClient], + key: &str, + index: usize, +) -> Result, DynError> { + let response: GetResponse = clients[index].get(&format!("/kv/{key}")).await?; + Ok(response.record) +} diff --git a/examples/kvstore/testing/workloads/src/lib.rs b/examples/kvstore/testing/workloads/src/lib.rs new file mode 100644 index 0000000..23ac375 --- /dev/null +++ b/examples/kvstore/testing/workloads/src/lib.rs @@ -0,0 +1,6 @@ +mod expectations; +mod write; + +pub use expectations::KvConverges; +pub use kvstore_runtime_ext::{KvBuilderExt, KvEnv, KvScenarioBuilder, KvTopology}; +pub use write::KvWriteWorkload; diff --git a/examples/kvstore/testing/workloads/src/write.rs b/examples/kvstore/testing/workloads/src/write.rs new file mode 100644 index 0000000..ec65a05 --- /dev/null +++ b/examples/kvstore/testing/workloads/src/write.rs @@ -0,0 +1,135 @@ +use std::time::Duration; + +use async_trait::async_trait; +use kvstore_runtime_ext::KvEnv; +use serde::{Deserialize, Serialize}; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tracing::info; + +#[derive(Clone)] +pub struct KvWriteWorkload { + operations: usize, + key_count: usize, + rate_per_sec: Option, + key_prefix: String, +} + +#[derive(Serialize)] +struct PutRequest { + value: String, + 
expected_version: Option, +} + +#[derive(Deserialize)] +struct PutResponse { + applied: bool, + version: u64, +} + +impl KvWriteWorkload { + #[must_use] + pub fn new() -> Self { + Self { + operations: 200, + key_count: 20, + rate_per_sec: Some(25), + key_prefix: "kv-demo".to_owned(), + } + } + + #[must_use] + pub const fn operations(mut self, value: usize) -> Self { + self.operations = value; + self + } + + #[must_use] + pub const fn key_count(mut self, value: usize) -> Self { + self.key_count = value; + self + } + + #[must_use] + pub const fn rate_per_sec(mut self, value: usize) -> Self { + self.rate_per_sec = Some(value); + self + } + + #[must_use] + pub fn key_prefix(mut self, value: impl Into) -> Self { + self.key_prefix = value.into(); + self + } +} + +impl Default for KvWriteWorkload { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Workload for KvWriteWorkload { + fn name(&self) -> &str { + "kv_write_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + let Some(leader) = clients.first() else { + return Err("no kv node clients available".into()); + }; + + if self.key_count == 0 { + return Err("kv workload key_count must be > 0".into()); + } + + let interval = self.rate_per_sec.and_then(compute_interval); + info!( + operations = self.operations, + key_count = self.key_count, + rate_per_sec = ?self.rate_per_sec, + "starting kv write workload" + ); + + for idx in 0..self.operations { + let key = format!("{}-{}", self.key_prefix, idx % self.key_count); + let value = format!("value-{idx}"); + let response: PutResponse = leader + .put( + &format!("/kv/{key}"), + &PutRequest { + value, + expected_version: None, + }, + ) + .await?; + + if !response.applied { + return Err(format!("leader rejected write for key {key}").into()); + } + + if (idx + 1) % 25 == 0 { + info!( + completed = idx + 1, + version = response.version, + "kv write progress" + ); + } + + if let 
Some(delay) = interval { + tokio::time::sleep(delay).await; + } + } + + Ok(()) + } +} + +fn compute_interval(rate_per_sec: usize) -> Option { + if rate_per_sec == 0 { + return None; + } + + Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1))) +} diff --git a/examples/queue/Dockerfile b/examples/queue/Dockerfile new file mode 100644 index 0000000..266fbdd --- /dev/null +++ b/examples/queue/Dockerfile @@ -0,0 +1,24 @@ +FROM rustlang/rust:nightly-bookworm AS builder + +WORKDIR /build + +COPY Cargo.toml Cargo.lock ./ +COPY cfgsync/ ./cfgsync/ +COPY examples/ ./examples/ +COPY testing-framework/ ./testing-framework/ + +RUN cargo build --release -p queue-node + +FROM debian:bookworm-slim + +RUN apt-get update && \ + apt-get install -y ca-certificates && \ + rm -rf /var/lib/apt/lists/* + +COPY --from=builder /build/target/release/queue-node /usr/local/bin/queue-node + +RUN mkdir -p /etc/queue +WORKDIR /app + +ENTRYPOINT ["/usr/local/bin/queue-node"] +CMD ["--config", "/etc/queue/config.yaml"] diff --git a/examples/queue/README.md b/examples/queue/README.md new file mode 100644 index 0000000..998f119 --- /dev/null +++ b/examples/queue/README.md @@ -0,0 +1,47 @@ +# Queue Example + +This example runs a small replicated FIFO queue. + +The scenarios enqueue messages, dequeue them again, and check that queue state +either converges or drains as expected. 
+ +## How TF runs this + +Each example follows the same pattern: + +- TF starts a small deployment of queue nodes +- a workload produces messages, or produces and consumes them +- an expectation checks either that queue state converges or that the queue drains + +## Scenarios + +- `basic_convergence` produces messages and checks that queue state converges locally +- `basic_roundtrip` produces and consumes messages locally until the queue drains +- `basic_restart_chaos` injects random local node restarts during the run +- `compose_convergence` and `compose_roundtrip` run the same checks in Docker Compose + +## API + +Each node exposes: + +- `POST /queue/enqueue` to add a message +- `POST /queue/dequeue` to remove a message +- `GET /queue/state` to inspect the current queue state +- `GET /internal/snapshot` to read the local replicated state + +## Run locally + +```bash +cargo run -p queue-examples --bin basic_convergence +cargo run -p queue-examples --bin basic_roundtrip +cargo run -p queue-examples --bin basic_restart_chaos +``` + +## Run with Docker Compose + +```bash +cargo run -p queue-examples --bin compose_convergence +cargo run -p queue-examples --bin compose_roundtrip +``` + +Set `QUEUE_IMAGE` to override the default compose image tag. 
diff --git a/examples/queue/examples/Cargo.toml b/examples/queue/examples/Cargo.toml new file mode 100644 index 0000000..d71caab --- /dev/null +++ b/examples/queue/examples/Cargo.toml @@ -0,0 +1,16 @@ +[package] +edition.workspace = true +license.workspace = true +name = "queue-examples" +version.workspace = true + +[dependencies] +anyhow = "1.0" +async-trait = { workspace = true } +queue-runtime-ext = { path = "../testing/integration" } +queue-runtime-workloads = { path = "../testing/workloads" } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/queue/examples/src/bin/basic_convergence.rs b/examples/queue/examples/src/bin/basic_convergence.rs new file mode 100644 index 0000000..e7bceec --- /dev/null +++ b/examples/queue/examples/src/bin/basic_convergence.rs @@ -0,0 +1,32 @@ +use std::time::Duration; + +use queue_runtime_ext::QueueLocalDeployer; +use queue_runtime_workloads::{ + QueueBuilderExt, QueueConverges, QueueProduceWorkload, QueueScenarioBuilder, QueueTopology, +}; +use testing_framework_core::scenario::Deployer; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let operations = 300; + + let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3)) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + QueueProduceWorkload::new() + .operations(operations) + .rate_per_sec(30) + .payload_prefix("demo"), + ) + .with_expectation(QueueConverges::new(operations).timeout(Duration::from_secs(25))) + .build()?; + + let deployer = QueueLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + Ok(()) +} diff --git 
a/examples/queue/examples/src/bin/basic_restart_chaos.rs b/examples/queue/examples/src/bin/basic_restart_chaos.rs new file mode 100644 index 0000000..cca911e --- /dev/null +++ b/examples/queue/examples/src/bin/basic_restart_chaos.rs @@ -0,0 +1,84 @@ +use std::time::Duration; + +use async_trait::async_trait; +use queue_runtime_ext::QueueLocalDeployer; +use queue_runtime_workloads::{ + QueueBuilderExt, QueueConverges, QueueProduceWorkload, QueueScenarioBuilder, QueueTopology, +}; +use testing_framework_core::{ + scenario::{Deployer, DynError, RunContext, Workload}, + topology::DeploymentDescriptor, +}; +use tracing::info; + +#[derive(Clone)] +struct FixedRestartChaosWorkload { + restarts: usize, + delay: Duration, +} + +impl FixedRestartChaosWorkload { + const fn new(restarts: usize, delay: Duration) -> Self { + Self { restarts, delay } + } +} + +#[async_trait] +impl Workload for FixedRestartChaosWorkload { + fn name(&self) -> &str { + "fixed_restart_chaos" + } + + async fn start( + &self, + ctx: &RunContext, + ) -> Result<(), DynError> { + let Some(control) = ctx.node_control() else { + return Err("fixed restart chaos requires node control".into()); + }; + + let node_count = ctx.descriptors().node_count(); + if node_count == 0 { + return Err("fixed restart chaos requires at least one node".into()); + } + + for step in 0..self.restarts { + tokio::time::sleep(self.delay).await; + let target_index = if node_count > 1 { + (step % (node_count - 1)) + 1 + } else { + 0 + }; + let target = format!("node-{target_index}"); + info!(step, %target, "triggering controlled chaos restart"); + control.restart_node(&target).await?; + } + + Ok(()) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3)) + .enable_node_control() + .with_workload(FixedRestartChaosWorkload::new(3, 
Duration::from_secs(8))) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + QueueProduceWorkload::new() + .operations(400) + .rate_per_sec(40) + .payload_prefix("queue-chaos"), + ) + .with_expectation(QueueConverges::new(200).timeout(Duration::from_secs(30))) + .build()?; + + let deployer = QueueLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + Ok(()) +} diff --git a/examples/queue/examples/src/bin/basic_roundtrip.rs b/examples/queue/examples/src/bin/basic_roundtrip.rs new file mode 100644 index 0000000..17ce4c1 --- /dev/null +++ b/examples/queue/examples/src/bin/basic_roundtrip.rs @@ -0,0 +1,31 @@ +use std::time::Duration; + +use queue_runtime_ext::QueueLocalDeployer; +use queue_runtime_workloads::{ + QueueBuilderExt, QueueDrained, QueueRoundTripWorkload, QueueScenarioBuilder, QueueTopology, +}; +use testing_framework_core::scenario::Deployer; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let operations = 200; + + let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3)) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + QueueRoundTripWorkload::new() + .operations(operations) + .rate_per_sec(25), + ) + .with_expectation(QueueDrained::new().timeout(Duration::from_secs(25))) + .build()?; + + let deployer = QueueLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + Ok(()) +} diff --git a/examples/queue/examples/src/bin/compose_convergence.rs b/examples/queue/examples/src/bin/compose_convergence.rs new file mode 100644 index 0000000..adec78c --- /dev/null +++ b/examples/queue/examples/src/bin/compose_convergence.rs @@ -0,0 +1,47 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use queue_runtime_workloads::{ + QueueBuilderExt, QueueConverges, 
QueueProduceWorkload, QueueScenarioBuilder, QueueTopology, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let operations = 200; + + let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3)) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + QueueProduceWorkload::new() + .operations(operations) + .rate_per_sec(20), + ) + .with_expectation(QueueConverges::new(operations).timeout(Duration::from_secs(25))) + .build()?; + + let deployer = queue_runtime_ext::QueueComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping compose queue run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("deploying queue compose stack"); + } + }; + + info!("running queue compose convergence scenario"); + runner + .run(&mut scenario) + .await + .context("running queue compose scenario")?; + Ok(()) +} diff --git a/examples/queue/examples/src/bin/compose_roundtrip.rs b/examples/queue/examples/src/bin/compose_roundtrip.rs new file mode 100644 index 0000000..59683f2 --- /dev/null +++ b/examples/queue/examples/src/bin/compose_roundtrip.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use queue_runtime_workloads::{ + QueueBuilderExt, QueueDrained, QueueRoundTripWorkload, QueueScenarioBuilder, QueueTopology, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + 
.init(); + + let operations = 200; + + let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3)) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + QueueRoundTripWorkload::new() + .operations(operations) + .rate_per_sec(20), + ) + .with_expectation(QueueDrained::new().timeout(Duration::from_secs(25))) + .build()?; + + let deployer = queue_runtime_ext::QueueComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping compose queue roundtrip run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)) + .context("deploying queue compose roundtrip stack"); + } + }; + + info!("running queue compose roundtrip scenario"); + runner + .run(&mut scenario) + .await + .context("running queue compose roundtrip scenario")?; + Ok(()) +} diff --git a/examples/queue/queue-node/Cargo.toml b/examples/queue/queue-node/Cargo.toml new file mode 100644 index 0000000..2d37a6a --- /dev/null +++ b/examples/queue/queue-node/Cargo.toml @@ -0,0 +1,21 @@ +[package] +edition.workspace = true +license.workspace = true +name = "queue-node" +version.workspace = true + +[[bin]] +name = "queue-node" +path = "src/main.rs" + +[dependencies] +anyhow = "1.0" +axum = "0.7" +clap = { version = "4.0", features = ["derive"] } +reqwest = { workspace = true, features = ["json"] } +serde = { workspace = true } +serde_yaml = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tower-http = { version = "0.6", features = ["trace"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/queue/queue-node/src/client.rs b/examples/queue/queue-node/src/client.rs new file mode 100644 index 0000000..1b6b4e9 --- /dev/null +++ b/examples/queue/queue-node/src/client.rs @@ -0,0 +1,40 @@ +use reqwest::Url; +use serde::Serialize; + 
+#[derive(Clone)] +pub struct QueueHttpClient { + base_url: Url, + client: reqwest::Client, +} + +impl QueueHttpClient { + #[must_use] + pub fn new(base_url: Url) -> Self { + Self { + base_url, + client: reqwest::Client::new(), + } + } + + pub async fn get(&self, path: &str) -> anyhow::Result { + let url = self.base_url.join(path)?; + let response = self.client.get(url).send().await?.error_for_status()?; + Ok(response.json().await?) + } + + pub async fn post( + &self, + path: &str, + body: &B, + ) -> anyhow::Result { + let url = self.base_url.join(path)?; + let response = self + .client + .post(url) + .json(body) + .send() + .await? + .error_for_status()?; + Ok(response.json().await?) + } +} diff --git a/examples/queue/queue-node/src/config.rs b/examples/queue/queue-node/src/config.rs new file mode 100644 index 0000000..ab63ae5 --- /dev/null +++ b/examples/queue/queue-node/src/config.rs @@ -0,0 +1,29 @@ +use std::{fs, path::Path}; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PeerInfo { + pub node_id: u64, + pub http_address: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct QueueConfig { + pub node_id: u64, + pub http_port: u16, + pub peers: Vec, + #[serde(default = "default_sync_interval_ms")] + pub sync_interval_ms: u64, +} + +impl QueueConfig { + pub fn load(path: &Path) -> anyhow::Result { + let raw = fs::read_to_string(path)?; + Ok(serde_yaml::from_str(&raw)?) 
+ } +} + +const fn default_sync_interval_ms() -> u64 { + 1000 +} diff --git a/examples/queue/queue-node/src/lib.rs b/examples/queue/queue-node/src/lib.rs new file mode 100644 index 0000000..90997fe --- /dev/null +++ b/examples/queue/queue-node/src/lib.rs @@ -0,0 +1,3 @@ +pub mod client; + +pub use client::QueueHttpClient; diff --git a/examples/queue/queue-node/src/main.rs b/examples/queue/queue-node/src/main.rs new file mode 100644 index 0000000..30c5f63 --- /dev/null +++ b/examples/queue/queue-node/src/main.rs @@ -0,0 +1,36 @@ +mod config; +mod server; +mod state; +mod sync; + +use std::path::PathBuf; + +use clap::Parser; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use crate::{config::QueueConfig, state::QueueState, sync::SyncService}; + +#[derive(Parser, Debug)] +#[command(name = "queue-node")] +struct Args { + #[arg(short, long)] + config: PathBuf, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "queue_node=info,tower_http=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let args = Args::parse(); + let config = QueueConfig::load(&args.config)?; + + let state = QueueState::new(config.node_id); + SyncService::new(config.clone(), state.clone()).start(); + server::start_server(config, state).await +} diff --git a/examples/queue/queue-node/src/server.rs b/examples/queue/queue-node/src/server.rs new file mode 100644 index 0000000..85227f1 --- /dev/null +++ b/examples/queue/queue-node/src/server.rs @@ -0,0 +1,115 @@ +use std::net::SocketAddr; + +use axum::{ + Router, + extract::State, + http::StatusCode, + response::Json, + routing::{get, post}, +}; +use serde::{Deserialize, Serialize}; +use tower_http::trace::TraceLayer; + +use crate::{ + config::QueueConfig, + state::{QueueMessage, QueueRevision, QueueState, QueueStateView, Snapshot}, +}; + +#[derive(Serialize)] +struct 
HealthResponse { + status: &'static str, +} + +#[derive(Deserialize)] +struct EnqueueRequest { + payload: String, +} + +#[derive(Serialize)] +struct EnqueueResponse { + accepted: bool, + id: u64, + queue_len: usize, + revision: QueueRevision, +} + +#[derive(Serialize)] +struct DequeueResponse { + message: Option, + queue_len: usize, + revision: QueueRevision, +} + +pub async fn start_server(config: QueueConfig, state: QueueState) -> anyhow::Result<()> { + let app = Router::new() + .route("/health/live", get(health_live)) + .route("/health/ready", get(health_ready)) + .route("/queue/enqueue", post(enqueue)) + .route("/queue/dequeue", post(dequeue)) + .route("/queue/state", get(queue_state)) + .route("/internal/snapshot", get(get_snapshot)) + .layer(TraceLayer::new_for_http()) + .with_state(state.clone()); + + let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port)); + let listener = tokio::net::TcpListener::bind(addr).await?; + + state.set_ready(true).await; + tracing::info!(node_id = state.node_id(), %addr, "queue node ready"); + + axum::serve(listener, app).await?; + Ok(()) +} + +async fn health_live() -> (StatusCode, Json) { + (StatusCode::OK, Json(HealthResponse { status: "alive" })) +} + +async fn health_ready(State(state): State) -> (StatusCode, Json) { + if state.is_ready().await { + (StatusCode::OK, Json(HealthResponse { status: "ready" })) + } else { + ( + StatusCode::SERVICE_UNAVAILABLE, + Json(HealthResponse { + status: "not-ready", + }), + ) + } +} + +async fn enqueue( + State(state): State, + Json(request): Json, +) -> (StatusCode, Json) { + let outcome = state.enqueue_local(request.payload).await; + ( + StatusCode::OK, + Json(EnqueueResponse { + accepted: outcome.accepted, + id: outcome.id, + queue_len: outcome.queue_len, + revision: outcome.revision, + }), + ) +} + +async fn dequeue(State(state): State) -> (StatusCode, Json) { + let outcome = state.dequeue_local().await; + ( + StatusCode::OK, + Json(DequeueResponse { + message: outcome.message, + 
queue_len: outcome.queue_len, + revision: outcome.revision, + }), + ) +} + +async fn queue_state(State(state): State) -> Json { + Json(state.queue_state().await) +} + +async fn get_snapshot(State(state): State) -> Json { + Json(state.snapshot().await) +} diff --git a/examples/queue/queue-node/src/state.rs b/examples/queue/queue-node/src/state.rs new file mode 100644 index 0000000..4bd59c7 --- /dev/null +++ b/examples/queue/queue-node/src/state.rs @@ -0,0 +1,151 @@ +use std::{collections::VecDeque, sync::Arc}; + +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct QueueRevision { + pub version: u64, + pub origin: u64, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct QueueMessage { + pub id: u64, + pub payload: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Snapshot { + pub node_id: u64, + pub revision: QueueRevision, + pub messages: Vec, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +pub struct QueueStateView { + pub revision: QueueRevision, + pub queue_len: usize, + pub head_id: Option, + pub tail_id: Option, +} + +#[derive(Clone, Debug)] +pub struct EnqueueOutcome { + pub accepted: bool, + pub id: u64, + pub queue_len: usize, + pub revision: QueueRevision, +} + +#[derive(Clone, Debug)] +pub struct DequeueOutcome { + pub message: Option, + pub queue_len: usize, + pub revision: QueueRevision, +} + +#[derive(Debug, Default)] +struct QueueData { + revision: QueueRevision, + messages: VecDeque, +} + +#[derive(Clone)] +pub struct QueueState { + node_id: u64, + ready: Arc>, + data: Arc>, +} + +impl QueueState { + pub fn new(node_id: u64) -> Self { + Self { + node_id, + ready: Arc::new(RwLock::new(false)), + data: Arc::new(RwLock::new(QueueData::default())), + } + } + + pub const fn node_id(&self) -> u64 { + self.node_id + } + + pub async fn set_ready(&self, value: bool) { + *self.ready.write().await = 
value; + } + + pub async fn is_ready(&self) -> bool { + *self.ready.read().await + } + + pub async fn enqueue_local(&self, payload: String) -> EnqueueOutcome { + let mut data = self.data.write().await; + let id = next_message_id(&data.messages); + data.messages.push_back(QueueMessage { id, payload }); + bump_revision(&mut data.revision, self.node_id); + + EnqueueOutcome { + accepted: true, + id, + queue_len: data.messages.len(), + revision: data.revision, + } + } + + pub async fn dequeue_local(&self) -> DequeueOutcome { + let mut data = self.data.write().await; + let message = data.messages.pop_front(); + if message.is_some() { + bump_revision(&mut data.revision, self.node_id); + } + + DequeueOutcome { + message, + queue_len: data.messages.len(), + revision: data.revision, + } + } + + pub async fn queue_state(&self) -> QueueStateView { + let data = self.data.read().await; + QueueStateView { + revision: data.revision, + queue_len: data.messages.len(), + head_id: data.messages.front().map(|message| message.id), + tail_id: data.messages.back().map(|message| message.id), + } + } + + pub async fn merge_snapshot(&self, snapshot: Snapshot) { + let mut data = self.data.write().await; + if is_newer_revision(snapshot.revision, data.revision) { + data.revision = snapshot.revision; + data.messages = snapshot.messages.into(); + } + } + + pub async fn snapshot(&self) -> Snapshot { + let data = self.data.read().await; + Snapshot { + node_id: self.node_id, + revision: data.revision, + messages: data.messages.iter().cloned().collect(), + } + } +} + +fn next_message_id(messages: &VecDeque) -> u64 { + messages + .back() + .map_or(1, |message| message.id.saturating_add(1)) +} + +fn bump_revision(revision: &mut QueueRevision, node_id: u64) { + revision.version = revision.version.saturating_add(1); + revision.origin = node_id; +} + +fn is_newer_revision(candidate: QueueRevision, existing: QueueRevision) -> bool { + (candidate.version, candidate.origin) > (existing.version, 
existing.origin) +} diff --git a/examples/queue/queue-node/src/sync.rs b/examples/queue/queue-node/src/sync.rs new file mode 100644 index 0000000..3018dc0 --- /dev/null +++ b/examples/queue/queue-node/src/sync.rs @@ -0,0 +1,103 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use reqwest::Client; +use tokio::sync::Mutex; +use tracing::{debug, warn}; + +use crate::{ + config::QueueConfig, + state::{QueueState, Snapshot}, +}; + +const WARN_AFTER_CONSECUTIVE_FAILURES: u32 = 5; + +#[derive(Clone)] +pub struct SyncService { + config: Arc, + state: QueueState, + client: Client, + failures_by_peer: Arc>>, +} + +impl SyncService { + pub fn new(config: QueueConfig, state: QueueState) -> Self { + Self { + config: Arc::new(config), + state, + client: Client::new(), + failures_by_peer: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn start(&self) { + let service = self.clone(); + tokio::spawn(async move { + service.run().await; + }); + } + + async fn run(self) { + let interval = Duration::from_millis(self.config.sync_interval_ms.max(100)); + loop { + self.sync_once().await; + tokio::time::sleep(interval).await; + } + } + + async fn sync_once(&self) { + for peer in &self.config.peers { + match self.fetch_snapshot(&peer.http_address).await { + Ok(snapshot) => { + self.state.merge_snapshot(snapshot).await; + self.clear_failure_counter(&peer.http_address).await; + } + Err(error) => { + self.record_sync_failure(&peer.http_address, &error).await; + } + } + } + } + + async fn fetch_snapshot(&self, peer_address: &str) -> anyhow::Result { + let url = format!("http://{peer_address}/internal/snapshot"); + let snapshot = self + .client + .get(url) + .send() + .await? + .error_for_status()? 
+ .json() + .await?; + Ok(snapshot) + } + + async fn clear_failure_counter(&self, peer_address: &str) { + let mut failures = self.failures_by_peer.lock().await; + failures.remove(peer_address); + } + + async fn record_sync_failure(&self, peer_address: &str, error: &anyhow::Error) { + let consecutive_failures = { + let mut failures = self.failures_by_peer.lock().await; + let entry = failures.entry(peer_address.to_owned()).or_insert(0); + *entry += 1; + *entry + }; + + if consecutive_failures >= WARN_AFTER_CONSECUTIVE_FAILURES { + warn!( + peer = %peer_address, + %error, + consecutive_failures, + "queue sync repeatedly failing" + ); + } else { + debug!( + peer = %peer_address, + %error, + consecutive_failures, + "queue sync failed" + ); + } + } +} diff --git a/examples/queue/testing/integration/Cargo.toml b/examples/queue/testing/integration/Cargo.toml new file mode 100644 index 0000000..c85823e --- /dev/null +++ b/examples/queue/testing/integration/Cargo.toml @@ -0,0 +1,13 @@ +[package] +edition.workspace = true +license.workspace = true +name = "queue-runtime-ext" +version.workspace = true + +[dependencies] +async-trait = { workspace = true } +queue-node = { path = "../../queue-node" } +serde = { workspace = true } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +testing-framework-runner-local = { workspace = true } diff --git a/examples/queue/testing/integration/src/app.rs b/examples/queue/testing/integration/src/app.rs new file mode 100644 index 0000000..5d42181 --- /dev/null +++ b/examples/queue/testing/integration/src/app.rs @@ -0,0 +1,75 @@ +use std::io::Error; + +use async_trait::async_trait; +use queue_node::QueueHttpClient; +use serde::{Deserialize, Serialize}; +use testing_framework_core::scenario::{ + Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, + NodeAccess, serialize_cluster_yaml_config, +}; + +pub type QueueTopology = 
testing_framework_core::topology::ClusterTopology; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct QueuePeerInfo { + pub node_id: u64, + pub http_address: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct QueueNodeConfig { + pub node_id: u64, + pub http_port: u16, + pub peers: Vec, + pub sync_interval_ms: u64, +} + +pub struct QueueEnv; + +#[async_trait] +impl Application for QueueEnv { + type Deployment = QueueTopology; + type NodeClient = QueueHttpClient; + type NodeConfig = QueueNodeConfig; + fn build_node_client(access: &NodeAccess) -> Result { + Ok(QueueHttpClient::new(access.api_base_url()?)) + } + + fn node_readiness_path() -> &'static str { + "/health/ready" + } +} + +impl ClusterNodeConfigApplication for QueueEnv { + type ConfigError = Error; + + fn static_network_port() -> u16 { + 8080 + } + + fn build_cluster_node_config( + node: &ClusterNodeView, + peers: &[ClusterPeerView], + ) -> Result { + let peers = peers + .iter() + .map(|peer| QueuePeerInfo { + node_id: peer.index() as u64, + http_address: peer.authority(), + }) + .collect::>(); + + Ok(QueueNodeConfig { + node_id: node.index() as u64, + http_port: node.network_port(), + peers, + sync_interval_ms: 500, + }) + } + + fn serialize_cluster_node_config( + config: &Self::NodeConfig, + ) -> Result { + serialize_cluster_yaml_config(config).map_err(Error::other) + } +} diff --git a/examples/queue/testing/integration/src/compose_env.rs b/examples/queue/testing/integration/src/compose_env.rs new file mode 100644 index 0000000..04722ff --- /dev/null +++ b/examples/queue/testing/integration/src/compose_env.rs @@ -0,0 +1,15 @@ +use testing_framework_runner_compose::{BinaryConfigNodeSpec, ComposeBinaryApp}; + +use crate::QueueEnv; + +const NODE_CONFIG_PATH: &str = "/etc/queue/config.yaml"; + +impl ComposeBinaryApp for QueueEnv { + fn compose_node_spec() -> BinaryConfigNodeSpec { + BinaryConfigNodeSpec::conventional( + "/usr/local/bin/queue-node", + NODE_CONFIG_PATH, + 
vec![8080, 8081], + ) + } +} diff --git a/examples/queue/testing/integration/src/lib.rs b/examples/queue/testing/integration/src/lib.rs new file mode 100644 index 0000000..184f71a --- /dev/null +++ b/examples/queue/testing/integration/src/lib.rs @@ -0,0 +1,10 @@ +mod app; +mod compose_env; +mod local_env; +pub mod scenario; + +pub use app::*; +pub use scenario::{QueueBuilderExt, QueueScenarioBuilder}; + +pub type QueueLocalDeployer = testing_framework_runner_local::ProcessDeployer; +pub type QueueComposeDeployer = testing_framework_runner_compose::ComposeDeployer; diff --git a/examples/queue/testing/integration/src/local_env.rs b/examples/queue/testing/integration/src/local_env.rs new file mode 100644 index 0000000..90649f8 --- /dev/null +++ b/examples/queue/testing/integration/src/local_env.rs @@ -0,0 +1,41 @@ +use std::collections::HashMap; + +use testing_framework_core::scenario::{DynError, StartNodeOptions}; +use testing_framework_runner_local::{ + LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec, + build_local_cluster_node_config, yaml_node_config, +}; + +use crate::{QueueEnv, QueueNodeConfig}; + +impl LocalBinaryApp for QueueEnv { + fn initial_node_name_prefix() -> &'static str { + "queue-node" + } + + fn build_local_node_config_with_peers( + _topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + _template_config: Option< + &::NodeConfig, + >, + ) -> Result<::NodeConfig, DynError> { + build_local_cluster_node_config::(index, ports, peers) + } + + fn local_process_spec() -> LocalProcessSpec { + LocalProcessSpec::new("QUEUE_NODE_BIN", "queue-node").with_rust_log("queue_node=info") + } + + fn render_local_config(config: &QueueNodeConfig) -> Result, DynError> { + yaml_node_config(config) + } + + fn http_api_port(config: &QueueNodeConfig) -> u16 { + config.http_port + } +} diff --git a/examples/queue/testing/integration/src/scenario.rs 
b/examples/queue/testing/integration/src/scenario.rs new file mode 100644 index 0000000..4ae9904 --- /dev/null +++ b/examples/queue/testing/integration/src/scenario.rs @@ -0,0 +1,15 @@ +use testing_framework_core::scenario::ScenarioBuilder; + +use crate::{QueueEnv, QueueTopology}; + +pub type QueueScenarioBuilder = ScenarioBuilder; + +pub trait QueueBuilderExt: Sized { + fn deployment_with(f: impl FnOnce(QueueTopology) -> QueueTopology) -> Self; +} + +impl QueueBuilderExt for QueueScenarioBuilder { + fn deployment_with(f: impl FnOnce(QueueTopology) -> QueueTopology) -> Self { + QueueScenarioBuilder::with_deployment(f(QueueTopology::new(3))) + } +} diff --git a/examples/queue/testing/workloads/Cargo.toml b/examples/queue/testing/workloads/Cargo.toml new file mode 100644 index 0000000..c3e52ec --- /dev/null +++ b/examples/queue/testing/workloads/Cargo.toml @@ -0,0 +1,14 @@ +[package] +edition.workspace = true +license.workspace = true +name = "queue-runtime-workloads" +version.workspace = true + +[dependencies] +async-trait = { workspace = true } +queue-node = { path = "../../queue-node" } +queue-runtime-ext = { path = "../integration" } +serde = { workspace = true } +testing-framework-core = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } diff --git a/examples/queue/testing/workloads/src/drained.rs b/examples/queue/testing/workloads/src/drained.rs new file mode 100644 index 0000000..9ff3eeb --- /dev/null +++ b/examples/queue/testing/workloads/src/drained.rs @@ -0,0 +1,104 @@ +use std::time::Duration; + +use async_trait::async_trait; +use queue_runtime_ext::QueueEnv; +use serde::Deserialize; +use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use tracing::info; + +#[derive(Clone)] +pub struct QueueDrained { + timeout: Duration, + poll_interval: Duration, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +struct QueueRevision { + version: u64, + origin: u64, +} + +#[derive(Clone, 
Debug, Deserialize, Eq, PartialEq)] +struct QueueStateResponse { + revision: QueueRevision, + queue_len: usize, + head_id: Option, + tail_id: Option, +} + +impl QueueDrained { + #[must_use] + pub fn new() -> Self { + Self { + timeout: Duration::from_secs(20), + poll_interval: Duration::from_millis(500), + } + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +impl Default for QueueDrained { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Expectation for QueueDrained { + fn name(&self) -> &str { + "queue_drained" + } + + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("no queue node clients available".into()); + } + + let deadline = tokio::time::Instant::now() + self.timeout; + while tokio::time::Instant::now() < deadline { + if is_drained_and_converged(&clients).await? { + info!("queue drained and converged"); + return Ok(()); + } + tokio::time::sleep(self.poll_interval).await; + } + + Err(format!("queue not drained within {:?}", self.timeout).into()) + } +} + +async fn is_drained_and_converged( + clients: &[queue_node::QueueHttpClient], +) -> Result { + let Some((first, rest)) = clients.split_first() else { + return Ok(false); + }; + + let baseline = read_state(first).await?; + if !is_drained(&baseline) { + return Ok(false); + } + + for client in rest { + let current = read_state(client).await?; + if current != baseline { + return Ok(false); + } + } + + Ok(true) +} + +fn is_drained(state: &QueueStateResponse) -> bool { + state.queue_len == 0 && state.head_id.is_none() && state.tail_id.is_none() +} + +async fn read_state(client: &queue_node::QueueHttpClient) -> Result { + Ok(client.get("/queue/state").await?) 
+} diff --git a/examples/queue/testing/workloads/src/expectations.rs b/examples/queue/testing/workloads/src/expectations.rs new file mode 100644 index 0000000..a06486b --- /dev/null +++ b/examples/queue/testing/workloads/src/expectations.rs @@ -0,0 +1,106 @@ +use std::time::Duration; + +use async_trait::async_trait; +use queue_runtime_ext::QueueEnv; +use serde::Deserialize; +use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use tracing::info; + +#[derive(Clone)] +pub struct QueueConverges { + min_queue_len: usize, + timeout: Duration, + poll_interval: Duration, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +struct QueueRevision { + version: u64, + origin: u64, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +struct QueueStateResponse { + revision: QueueRevision, + queue_len: usize, + head_id: Option, + tail_id: Option, +} + +impl QueueConverges { + #[must_use] + pub fn new(min_queue_len: usize) -> Self { + Self { + min_queue_len, + timeout: Duration::from_secs(20), + poll_interval: Duration::from_millis(500), + } + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +#[async_trait] +impl Expectation for QueueConverges { + fn name(&self) -> &str { + "queue_converges" + } + + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("no queue node clients available".into()); + } + + let deadline = tokio::time::Instant::now() + self.timeout; + while tokio::time::Instant::now() < deadline { + if self.is_converged(&clients).await? 
{ + info!( + min_queue_len = self.min_queue_len, + "queue convergence reached" + ); + return Ok(()); + } + tokio::time::sleep(self.poll_interval).await; + } + + Err(format!( + "queue convergence not reached within {:?} (min_queue_len={})", + self.timeout, self.min_queue_len + ) + .into()) + } +} + +impl QueueConverges { + async fn is_converged( + &self, + clients: &[queue_node::QueueHttpClient], + ) -> Result { + let Some((first, rest)) = clients.split_first() else { + return Ok(false); + }; + + let baseline = read_state(first).await?; + if baseline.queue_len < self.min_queue_len { + return Ok(false); + } + + for client in rest { + let current = read_state(client).await?; + if current != baseline { + return Ok(false); + } + } + + Ok(true) + } +} + +async fn read_state(client: &queue_node::QueueHttpClient) -> Result { + Ok(client.get("/queue/state").await?) +} diff --git a/examples/queue/testing/workloads/src/lib.rs b/examples/queue/testing/workloads/src/lib.rs new file mode 100644 index 0000000..91b2670 --- /dev/null +++ b/examples/queue/testing/workloads/src/lib.rs @@ -0,0 +1,10 @@ +mod drained; +mod expectations; +mod produce; +mod roundtrip; + +pub use drained::QueueDrained; +pub use expectations::QueueConverges; +pub use produce::QueueProduceWorkload; +pub use queue_runtime_ext::{QueueBuilderExt, QueueEnv, QueueScenarioBuilder, QueueTopology}; +pub use roundtrip::QueueRoundTripWorkload; diff --git a/examples/queue/testing/workloads/src/produce.rs b/examples/queue/testing/workloads/src/produce.rs new file mode 100644 index 0000000..c2e3794 --- /dev/null +++ b/examples/queue/testing/workloads/src/produce.rs @@ -0,0 +1,116 @@ +use std::time::Duration; + +use async_trait::async_trait; +use queue_runtime_ext::QueueEnv; +use serde::{Deserialize, Serialize}; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tracing::info; + +#[derive(Clone)] +pub struct QueueProduceWorkload { + operations: usize, + rate_per_sec: Option, + payload_prefix: 
String, +} + +#[derive(Serialize)] +struct EnqueueRequest { + payload: String, +} + +#[derive(Deserialize)] +struct EnqueueResponse { + accepted: bool, + id: u64, + queue_len: usize, +} + +impl QueueProduceWorkload { + #[must_use] + pub fn new() -> Self { + Self { + operations: 200, + rate_per_sec: Some(25), + payload_prefix: "queue-demo".to_owned(), + } + } + + #[must_use] + pub const fn operations(mut self, value: usize) -> Self { + self.operations = value; + self + } + + #[must_use] + pub const fn rate_per_sec(mut self, value: usize) -> Self { + self.rate_per_sec = Some(value); + self + } + + #[must_use] + pub fn payload_prefix(mut self, value: impl Into) -> Self { + self.payload_prefix = value.into(); + self + } +} + +impl Default for QueueProduceWorkload { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Workload for QueueProduceWorkload { + fn name(&self) -> &str { + "queue_produce_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + let Some(producer) = clients.first() else { + return Err("no queue node clients available".into()); + }; + + let interval = self.rate_per_sec.and_then(compute_interval); + info!( + operations = self.operations, + rate_per_sec = ?self.rate_per_sec, + "starting queue produce workload" + ); + + for idx in 0..self.operations { + let payload = format!("{}-{idx}", self.payload_prefix); + let response: EnqueueResponse = producer + .post("/queue/enqueue", &EnqueueRequest { payload }) + .await?; + + if !response.accepted { + return Err(format!("node rejected enqueue at operation {idx}").into()); + } + + if (idx + 1) % 25 == 0 { + info!( + completed = idx + 1, + last_id = response.id, + queue_len = response.queue_len, + "queue produce progress" + ); + } + + if let Some(delay) = interval { + tokio::time::sleep(delay).await; + } + } + + Ok(()) + } +} + +fn compute_interval(rate_per_sec: usize) -> Option { + if rate_per_sec == 0 { + return 
None; + } + + Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1))) +} diff --git a/examples/queue/testing/workloads/src/roundtrip.rs b/examples/queue/testing/workloads/src/roundtrip.rs new file mode 100644 index 0000000..a87a84c --- /dev/null +++ b/examples/queue/testing/workloads/src/roundtrip.rs @@ -0,0 +1,179 @@ +use std::{collections::HashSet, time::Duration}; + +use async_trait::async_trait; +use queue_runtime_ext::QueueEnv; +use serde::{Deserialize, Serialize}; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tokio::time::{Instant, sleep}; +use tracing::info; + +#[derive(Clone)] +pub struct QueueRoundTripWorkload { + operations: usize, + rate_per_sec: Option, + payload_prefix: String, + drain_timeout: Duration, + empty_poll_interval: Duration, +} + +#[derive(Serialize)] +struct EnqueueRequest { + payload: String, +} + +#[derive(Deserialize)] +struct EnqueueResponse { + accepted: bool, + id: u64, +} + +#[derive(Serialize)] +struct DequeueRequest {} + +#[derive(Deserialize)] +struct QueueMessage { + id: u64, + payload: String, +} + +#[derive(Deserialize)] +struct DequeueResponse { + message: Option, +} + +impl QueueRoundTripWorkload { + #[must_use] + pub fn new() -> Self { + Self { + operations: 200, + rate_per_sec: Some(25), + payload_prefix: "queue-roundtrip".to_owned(), + drain_timeout: Duration::from_secs(20), + empty_poll_interval: Duration::from_millis(100), + } + } + + #[must_use] + pub const fn operations(mut self, value: usize) -> Self { + self.operations = value; + self + } + + #[must_use] + pub const fn rate_per_sec(mut self, value: usize) -> Self { + self.rate_per_sec = Some(value); + self + } + + #[must_use] + pub fn payload_prefix(mut self, value: impl Into) -> Self { + self.payload_prefix = value.into(); + self + } + + #[must_use] + pub const fn drain_timeout(mut self, value: Duration) -> Self { + self.drain_timeout = value; + self + } +} + +impl Default for QueueRoundTripWorkload { + fn default() -> Self 
{ + Self::new() + } +} + +#[async_trait] +impl Workload for QueueRoundTripWorkload { + fn name(&self) -> &str { + "queue_roundtrip_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + let Some(driver) = clients.first() else { + return Err("no queue node clients available".into()); + }; + + let interval = self.rate_per_sec.and_then(compute_interval); + let mut produced_ids = HashSet::with_capacity(self.operations); + + info!( + operations = self.operations, + "queue roundtrip: produce phase" + ); + for idx in 0..self.operations { + let payload = format!("{}-{idx}", self.payload_prefix); + let response: EnqueueResponse = driver + .post("/queue/enqueue", &EnqueueRequest { payload }) + .await?; + + if !response.accepted { + return Err(format!("enqueue rejected at operation {idx}").into()); + } + + if !produced_ids.insert(response.id) { + return Err(format!("duplicate enqueue id observed: {}", response.id).into()); + } + + if let Some(delay) = interval { + sleep(delay).await; + } + } + + info!( + operations = self.operations, + "queue roundtrip: consume phase" + ); + let mut consumed = 0usize; + let deadline = Instant::now() + self.drain_timeout; + + while consumed < self.operations && Instant::now() < deadline { + let response: DequeueResponse = + driver.post("/queue/dequeue", &DequeueRequest {}).await?; + + match response.message { + Some(message) => { + if !message.payload.starts_with(&self.payload_prefix) { + return Err(format!("unexpected payload: {}", message.payload).into()); + } + if !produced_ids.remove(&message.id) { + return Err( + format!("unknown or duplicate dequeue id: {}", message.id).into() + ); + } + consumed += 1; + } + None => sleep(self.empty_poll_interval).await, + } + } + + if consumed != self.operations { + return Err(format!( + "queue roundtrip timed out: consumed {consumed}/{} messages", + self.operations + ) + .into()); + } + + if !produced_ids.is_empty() { + 
return Err(format!( + "queue roundtrip ended with {} undrained produced ids", + produced_ids.len() + ) + .into()); + } + + info!(operations = self.operations, "queue roundtrip finished"); + Ok(()) + } +} + +fn compute_interval(rate_per_sec: usize) -> Option { + if rate_per_sec == 0 { + return None; + } + + Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1))) +} diff --git a/examples/scheduler/Dockerfile b/examples/scheduler/Dockerfile new file mode 100644 index 0000000..625ef34 --- /dev/null +++ b/examples/scheduler/Dockerfile @@ -0,0 +1,24 @@ +FROM rustlang/rust:nightly-bookworm AS builder + +WORKDIR /build + +COPY Cargo.toml Cargo.lock ./ +COPY cfgsync/ ./cfgsync/ +COPY examples/ ./examples/ +COPY testing-framework/ ./testing-framework/ + +RUN cargo build --release -p scheduler-node + +FROM debian:bookworm-slim + +RUN apt-get update && \ + apt-get install -y ca-certificates && \ + rm -rf /var/lib/apt/lists/* + +COPY --from=builder /build/target/release/scheduler-node /usr/local/bin/scheduler-node + +RUN mkdir -p /etc/scheduler +WORKDIR /app + +ENTRYPOINT ["/usr/local/bin/scheduler-node"] +CMD ["--config", "/etc/scheduler/config.yaml"] diff --git a/examples/scheduler/README.md b/examples/scheduler/README.md new file mode 100644 index 0000000..9a97079 --- /dev/null +++ b/examples/scheduler/README.md @@ -0,0 +1,45 @@ +# Scheduler Example + +This example runs a small replicated job scheduler with worker leases. + +The scenario enqueues jobs, lets one worker claim them, stops making progress, +and then checks that another worker can reclaim and complete them after the +lease expires. 
+ +## How TF runs this + +Each example follows the same pattern: + +- TF starts a small deployment of scheduler nodes +- the workload drives the worker flow through the HTTP API +- the expectation checks that jobs are eventually reclaimed and completed + +## Scenario + +- `basic_failover` runs the failover flow locally +- `compose_failover` runs the same flow in Docker Compose + +## API + +Each node exposes: + +- `POST /jobs/enqueue` to add jobs +- `POST /jobs/claim` to claim pending jobs with a lease +- `POST /jobs/heartbeat` to extend a lease +- `POST /jobs/ack` to mark a job complete +- `GET /jobs/state` to inspect scheduler state +- `GET /internal/snapshot` to read the local replicated state + +## Run locally + +```bash +cargo run -p scheduler-examples --bin basic_failover +``` + +## Run with Docker Compose + +```bash +cargo run -p scheduler-examples --bin compose_failover +``` + +Set `SCHEDULER_IMAGE` to override the default compose image tag. diff --git a/examples/scheduler/examples/Cargo.toml b/examples/scheduler/examples/Cargo.toml new file mode 100644 index 0000000..32c3756 --- /dev/null +++ b/examples/scheduler/examples/Cargo.toml @@ -0,0 +1,15 @@ +[package] +edition.workspace = true +license.workspace = true +name = "scheduler-examples" +version.workspace = true + +[dependencies] +anyhow = "1.0" +scheduler-runtime-ext = { path = "../testing/integration" } +scheduler-runtime-workloads = { path = "../testing/workloads" } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/scheduler/examples/src/bin/basic_failover.rs b/examples/scheduler/examples/src/bin/basic_failover.rs new file mode 100644 index 0000000..f7c4577 --- /dev/null +++ b/examples/scheduler/examples/src/bin/basic_failover.rs @@ -0,0 +1,33 @@ +use std::time::Duration; + 
+use scheduler_runtime_ext::SchedulerLocalDeployer; +use scheduler_runtime_workloads::{ + SchedulerBuilderExt, SchedulerDrained, SchedulerLeaseFailoverWorkload, + SchedulerScenarioBuilder, SchedulerTopology, +}; +use testing_framework_core::scenario::Deployer; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let jobs = 100; + + let mut scenario = SchedulerScenarioBuilder::deployment_with(|_| SchedulerTopology::new(3)) + .with_run_duration(Duration::from_secs(35)) + .with_workload( + SchedulerLeaseFailoverWorkload::new() + .operations(jobs) + .lease_ttl(Duration::from_secs(3)) + .rate_per_sec(25), + ) + .with_expectation(SchedulerDrained::new(jobs).timeout(Duration::from_secs(30))) + .build()?; + + let deployer = SchedulerLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + Ok(()) +} diff --git a/examples/scheduler/examples/src/bin/compose_failover.rs b/examples/scheduler/examples/src/bin/compose_failover.rs new file mode 100644 index 0000000..8bbdb01 --- /dev/null +++ b/examples/scheduler/examples/src/bin/compose_failover.rs @@ -0,0 +1,49 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use scheduler_runtime_workloads::{ + SchedulerBuilderExt, SchedulerDrained, SchedulerLeaseFailoverWorkload, + SchedulerScenarioBuilder, SchedulerTopology, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let jobs = 100; + + let mut scenario = SchedulerScenarioBuilder::deployment_with(|_| SchedulerTopology::new(3)) + .with_run_duration(Duration::from_secs(35)) + .with_workload( + SchedulerLeaseFailoverWorkload::new() + 
.operations(jobs) + .lease_ttl(Duration::from_secs(3)) + .rate_per_sec(20), + ) + .with_expectation(SchedulerDrained::new(jobs).timeout(Duration::from_secs(30))) + .build()?; + + let deployer = scheduler_runtime_ext::SchedulerComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping scheduler compose run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("deploying scheduler compose stack"); + } + }; + + info!("running scheduler compose failover scenario"); + runner + .run(&mut scenario) + .await + .context("running scheduler compose scenario")?; + Ok(()) +} diff --git a/examples/scheduler/scheduler-node/Cargo.toml b/examples/scheduler/scheduler-node/Cargo.toml new file mode 100644 index 0000000..6f2e682 --- /dev/null +++ b/examples/scheduler/scheduler-node/Cargo.toml @@ -0,0 +1,21 @@ +[package] +edition.workspace = true +license.workspace = true +name = "scheduler-node" +version.workspace = true + +[[bin]] +name = "scheduler-node" +path = "src/main.rs" + +[dependencies] +anyhow = "1.0" +axum = "0.7" +clap = { version = "4.0", features = ["derive"] } +reqwest = { workspace = true, features = ["json"] } +serde = { workspace = true } +serde_yaml = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tower-http = { version = "0.6", features = ["trace"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/scheduler/scheduler-node/src/client.rs b/examples/scheduler/scheduler-node/src/client.rs new file mode 100644 index 0000000..4c2e0ee --- /dev/null +++ b/examples/scheduler/scheduler-node/src/client.rs @@ -0,0 +1,40 @@ +use reqwest::Url; +use serde::Serialize; + +#[derive(Clone)] +pub struct SchedulerHttpClient { + base_url: Url, + client: reqwest::Client, +} + +impl SchedulerHttpClient { + #[must_use] + pub 
fn new(base_url: Url) -> Self { + Self { + base_url, + client: reqwest::Client::new(), + } + } + + pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> anyhow::Result<T> { + let url = self.base_url.join(path)?; + let response = self.client.get(url).send().await?.error_for_status()?; + Ok(response.json().await?) + } + + pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>( + &self, + path: &str, + body: &B, + ) -> anyhow::Result<T> { + let url = self.base_url.join(path)?; + let response = self + .client + .post(url) + .json(body) + .send() + .await? + .error_for_status()?; + Ok(response.json().await?) + } +} diff --git a/examples/scheduler/scheduler-node/src/config.rs b/examples/scheduler/scheduler-node/src/config.rs new file mode 100644 index 0000000..fca2eb7 --- /dev/null +++ b/examples/scheduler/scheduler-node/src/config.rs @@ -0,0 +1,35 @@ +use std::{fs, path::Path}; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PeerInfo { + pub node_id: u64, + pub http_address: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SchedulerConfig { + pub node_id: u64, + pub http_port: u16, + pub peers: Vec<PeerInfo>, + #[serde(default = "default_sync_interval_ms")] + pub sync_interval_ms: u64, + #[serde(default = "default_lease_ttl_ms")] + pub lease_ttl_ms: u64, +} + +impl SchedulerConfig { + pub fn load(path: &Path) -> anyhow::Result<Self> { + let raw = fs::read_to_string(path)?; + Ok(serde_yaml::from_str(&raw)?)
+ } +} + +const fn default_sync_interval_ms() -> u64 { + 1000 +} + +const fn default_lease_ttl_ms() -> u64 { + 3000 +} diff --git a/examples/scheduler/scheduler-node/src/lib.rs b/examples/scheduler/scheduler-node/src/lib.rs new file mode 100644 index 0000000..3f17002 --- /dev/null +++ b/examples/scheduler/scheduler-node/src/lib.rs @@ -0,0 +1,3 @@ +pub mod client; + +pub use client::SchedulerHttpClient; diff --git a/examples/scheduler/scheduler-node/src/main.rs b/examples/scheduler/scheduler-node/src/main.rs new file mode 100644 index 0000000..803c212 --- /dev/null +++ b/examples/scheduler/scheduler-node/src/main.rs @@ -0,0 +1,36 @@ +mod config; +mod server; +mod state; +mod sync; + +use std::path::PathBuf; + +use clap::Parser; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use crate::{config::SchedulerConfig, state::SchedulerState, sync::SyncService}; + +#[derive(Parser, Debug)] +#[command(name = "scheduler-node")] +struct Args { + #[arg(short, long)] + config: PathBuf, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "scheduler_node=info,tower_http=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let args = Args::parse(); + let config = SchedulerConfig::load(&args.config)?; + + let state = SchedulerState::new(config.node_id, config.lease_ttl_ms); + SyncService::new(config.clone(), state.clone()).start(); + server::start_server(config, state).await +} diff --git a/examples/scheduler/scheduler-node/src/server.rs b/examples/scheduler/scheduler-node/src/server.rs new file mode 100644 index 0000000..662be80 --- /dev/null +++ b/examples/scheduler/scheduler-node/src/server.rs @@ -0,0 +1,156 @@ +use std::net::SocketAddr; + +use axum::{ + Router, + extract::State, + http::StatusCode, + response::Json, + routing::{get, post}, +}; +use serde::{Deserialize, Serialize}; +use 
tower_http::trace::TraceLayer; + +use crate::{ + config::SchedulerConfig, + state::{SchedulerState, Snapshot, StateView}, +}; + +#[derive(Serialize)] +struct HealthResponse { + status: &'static str, +} + +#[derive(Deserialize)] +struct EnqueueRequest { + payload: String, +} + +#[derive(Serialize)] +struct EnqueueResponse { + id: u64, +} + +#[derive(Deserialize)] +struct ClaimRequest { + worker_id: String, + max_jobs: usize, +} + +#[derive(Serialize)] +struct ClaimResponse { + jobs: Vec<ClaimedJob>, +} + +#[derive(Serialize)] +struct ClaimedJob { + id: u64, + payload: String, + attempt: u32, +} + +#[derive(Deserialize)] +struct HeartbeatRequest { + worker_id: String, + job_id: u64, +} + +#[derive(Deserialize)] +struct AckRequest { + worker_id: String, + job_id: u64, +} + +#[derive(Serialize)] +struct OperationResponse { + ok: bool, +} + +pub async fn start_server(config: SchedulerConfig, state: SchedulerState) -> anyhow::Result<()> { + let app = Router::new() + .route("/health/live", get(health_live)) + .route("/health/ready", get(health_ready)) + .route("/jobs/enqueue", post(enqueue)) + .route("/jobs/claim", post(claim)) + .route("/jobs/heartbeat", post(heartbeat)) + .route("/jobs/ack", post(ack)) + .route("/jobs/state", get(state_view)) + .route("/internal/snapshot", get(snapshot)) + .layer(TraceLayer::new_for_http()) + .with_state(state.clone()); + + let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port)); + let listener = tokio::net::TcpListener::bind(addr).await?; + + state.set_ready(true).await; + tracing::info!(node_id = state.node_id(), %addr, "scheduler node ready"); + + axum::serve(listener, app).await?; + Ok(()) +} + +async fn health_live() -> (StatusCode, Json<HealthResponse>) { + (StatusCode::OK, Json(HealthResponse { status: "alive" })) +} + +async fn health_ready(State(state): State<SchedulerState>) -> (StatusCode, Json<HealthResponse>) { + if state.is_ready().await { + (StatusCode::OK, Json(HealthResponse { status: "ready" })) + } else { + ( + StatusCode::SERVICE_UNAVAILABLE, + Json(HealthResponse {
status: "not-ready", + }), + ) + } +} + +async fn enqueue( + State(state): State, + Json(request): Json, +) -> (StatusCode, Json) { + let id = state.enqueue(request.payload).await; + (StatusCode::OK, Json(EnqueueResponse { id })) +} + +async fn claim( + State(state): State, + Json(request): Json, +) -> (StatusCode, Json) { + let result = state.claim(request.worker_id, request.max_jobs).await; + let jobs = result + .jobs + .into_iter() + .map(|job| ClaimedJob { + id: job.id, + payload: job.payload, + attempt: job.attempt, + }) + .collect(); + + (StatusCode::OK, Json(ClaimResponse { jobs })) +} + +async fn heartbeat( + State(state): State, + Json(request): Json, +) -> (StatusCode, Json) { + let ok = state.heartbeat(&request.worker_id, request.job_id).await; + (StatusCode::OK, Json(OperationResponse { ok })) +} + +async fn ack( + State(state): State, + Json(request): Json, +) -> (StatusCode, Json) { + let ok = state.ack(&request.worker_id, request.job_id).await; + (StatusCode::OK, Json(OperationResponse { ok })) +} + +async fn state_view(State(state): State) -> Json { + Json(state.state_view().await) +} + +async fn snapshot(State(state): State) -> Json { + Json(state.snapshot().await) +} diff --git a/examples/scheduler/scheduler-node/src/state.rs b/examples/scheduler/scheduler-node/src/state.rs new file mode 100644 index 0000000..d3e8ada --- /dev/null +++ b/examples/scheduler/scheduler-node/src/state.rs @@ -0,0 +1,249 @@ +use std::{ + collections::BTreeMap, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct Revision { + pub version: u64, + pub origin: u64, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct JobRecord { + pub id: u64, + pub payload: String, + pub attempt: u32, + pub owner: Option, + pub lease_expires_at_ms: Option, + pub done: bool, +} + +#[derive(Clone, Debug, 
Serialize, Deserialize)] +pub struct Snapshot { + pub node_id: u64, + pub revision: Revision, + pub next_id: u64, + pub jobs: BTreeMap<u64, JobRecord>, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +pub struct StateView { + pub revision: Revision, + pub next_id: u64, + pub pending: usize, + pub leased: usize, + pub done: usize, +} + +#[derive(Clone, Debug, Serialize)] +pub struct ClaimResult { + pub jobs: Vec<JobRecord>, +} + +#[derive(Debug, Default)] +struct Data { + revision: Revision, + next_id: u64, + jobs: BTreeMap<u64, JobRecord>, +} + +#[derive(Clone)] +pub struct SchedulerState { + node_id: u64, + ready: Arc<RwLock<bool>>, + lease_ttl_ms: u64, + data: Arc<RwLock<Data>>, +} + +impl SchedulerState { + pub fn new(node_id: u64, lease_ttl_ms: u64) -> Self { + Self { + node_id, + ready: Arc::new(RwLock::new(false)), + lease_ttl_ms, + data: Arc::new(RwLock::new(Data { + next_id: 1, + ..Data::default() + })), + } + } + + pub const fn node_id(&self) -> u64 { + self.node_id + } + + pub async fn set_ready(&self, value: bool) { + *self.ready.write().await = value; + } + + pub async fn is_ready(&self) -> bool { + *self.ready.read().await + } + + pub async fn enqueue(&self, payload: String) -> u64 { + let mut data = self.data.write().await; + reap_expired_leases(&mut data.jobs); + + let id = data.next_id; + data.next_id = data.next_id.saturating_add(1); + + data.jobs.insert( + id, + JobRecord { + id, + payload, + attempt: 0, + owner: None, + lease_expires_at_ms: None, + done: false, + }, + ); + + bump_revision(&mut data.revision, self.node_id); + id + } + + pub async fn claim(&self, worker_id: String, max_jobs: usize) -> ClaimResult { + let mut data = self.data.write().await; + reap_expired_leases(&mut data.jobs); + + let now = unix_ms(); + let mut claimed = Vec::new(); + + for job in data.jobs.values_mut() { + if claimed.len() >= max_jobs { + break; + } + if job.done || job.owner.is_some() { + continue; + } + + job.attempt = job.attempt.saturating_add(1); + job.owner = Some(worker_id.clone()); + job.lease_expires_at_ms =
Some(now.saturating_add(self.lease_ttl_ms)); + claimed.push(job.clone()); + } + + if !claimed.is_empty() { + bump_revision(&mut data.revision, self.node_id); + } + + ClaimResult { jobs: claimed } + } + + pub async fn heartbeat(&self, worker_id: &str, job_id: u64) -> bool { + let mut data = self.data.write().await; + reap_expired_leases(&mut data.jobs); + + let Some(job) = data.jobs.get_mut(&job_id) else { + return false; + }; + + if job.done || job.owner.as_deref() != Some(worker_id) { + return false; + } + + job.lease_expires_at_ms = Some(unix_ms().saturating_add(self.lease_ttl_ms)); + bump_revision(&mut data.revision, self.node_id); + true + } + + pub async fn ack(&self, worker_id: &str, job_id: u64) -> bool { + let mut data = self.data.write().await; + reap_expired_leases(&mut data.jobs); + + let Some(job) = data.jobs.get_mut(&job_id) else { + return false; + }; + + if job.done || job.owner.as_deref() != Some(worker_id) { + return false; + } + + job.done = true; + job.owner = None; + job.lease_expires_at_ms = None; + bump_revision(&mut data.revision, self.node_id); + true + } + + pub async fn state_view(&self) -> StateView { + let data = self.data.read().await; + let mut pending = 0; + let mut leased = 0; + let mut done = 0; + + for job in data.jobs.values() { + if job.done { + done += 1; + } else if job.owner.is_some() { + leased += 1; + } else { + pending += 1; + } + } + + StateView { + revision: data.revision, + next_id: data.next_id, + pending, + leased, + done, + } + } + + pub async fn merge_snapshot(&self, snapshot: Snapshot) { + let mut data = self.data.write().await; + if is_newer_revision(snapshot.revision, data.revision) { + data.revision = snapshot.revision; + data.next_id = snapshot.next_id; + data.jobs = snapshot.jobs; + } + } + + pub async fn snapshot(&self) -> Snapshot { + let data = self.data.read().await; + Snapshot { + node_id: self.node_id, + revision: data.revision, + next_id: data.next_id, + jobs: data.jobs.clone(), + } + } +} + +fn 
reap_expired_leases(jobs: &mut BTreeMap<u64, JobRecord>) { + let now = unix_ms(); + for job in jobs.values_mut() { + if job.done { + continue; + } + + if let Some(expiry) = job.lease_expires_at_ms + && expiry <= now + { + job.owner = None; + job.lease_expires_at_ms = None; + } + } +} + +fn bump_revision(revision: &mut Revision, node_id: u64) { + revision.version = revision.version.saturating_add(1); + revision.origin = node_id; +} + +fn is_newer_revision(candidate: Revision, existing: Revision) -> bool { + (candidate.version, candidate.origin) > (existing.version, existing.origin) +} + +fn unix_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_or(0, |duration| duration.as_millis() as u64) +} diff --git a/examples/scheduler/scheduler-node/src/sync.rs b/examples/scheduler/scheduler-node/src/sync.rs new file mode 100644 index 0000000..2f25802 --- /dev/null +++ b/examples/scheduler/scheduler-node/src/sync.rs @@ -0,0 +1,103 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use reqwest::Client; + +use tokio::sync::Mutex; +use tracing::{debug, warn}; + +use crate::{ + config::SchedulerConfig, + state::{SchedulerState, Snapshot}, +}; + +const WARN_AFTER_CONSECUTIVE_FAILURES: u32 = 5; + +#[derive(Clone)] +pub struct SyncService { + config: Arc<SchedulerConfig>, + state: SchedulerState, + client: Client, + failures_by_peer: Arc<Mutex<HashMap<String, u32>>>, +} + +impl SyncService { + pub fn new(config: SchedulerConfig, state: SchedulerState) -> Self { + Self { + config: Arc::new(config), + state, + client: Client::new(), + failures_by_peer: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn start(&self) { + let service = self.clone(); + tokio::spawn(async move { + service.run().await; + }); + } + + async fn run(self) { + let interval = Duration::from_millis(self.config.sync_interval_ms.max(100)); + loop { + self.sync_once().await; + tokio::time::sleep(interval).await; + } + } + + async fn sync_once(&self) { + for peer in &self.config.peers { + match
self.fetch_snapshot(&peer.http_address).await { + Ok(snapshot) => { + self.state.merge_snapshot(snapshot).await; + self.clear_failure_counter(&peer.http_address).await; + } + Err(error) => { + self.record_sync_failure(&peer.http_address, &error).await; + } + } + } + } + + async fn fetch_snapshot(&self, peer_address: &str) -> anyhow::Result { + let url = format!("http://{peer_address}/internal/snapshot"); + let snapshot = self + .client + .get(url) + .send() + .await? + .error_for_status()? + .json() + .await?; + Ok(snapshot) + } + + async fn clear_failure_counter(&self, peer_address: &str) { + let mut failures = self.failures_by_peer.lock().await; + failures.remove(peer_address); + } + + async fn record_sync_failure(&self, peer_address: &str, error: &anyhow::Error) { + let consecutive_failures = { + let mut failures = self.failures_by_peer.lock().await; + let entry = failures.entry(peer_address.to_owned()).or_insert(0); + *entry += 1; + *entry + }; + + if consecutive_failures >= WARN_AFTER_CONSECUTIVE_FAILURES { + warn!( + peer = %peer_address, + %error, + consecutive_failures, + "scheduler sync repeatedly failing" + ); + } else { + debug!( + peer = %peer_address, + %error, + consecutive_failures, + "scheduler sync failed" + ); + } + } +} diff --git a/examples/scheduler/testing/integration/Cargo.toml b/examples/scheduler/testing/integration/Cargo.toml new file mode 100644 index 0000000..819b60b --- /dev/null +++ b/examples/scheduler/testing/integration/Cargo.toml @@ -0,0 +1,13 @@ +[package] +edition.workspace = true +license.workspace = true +name = "scheduler-runtime-ext" +version.workspace = true + +[dependencies] +async-trait = { workspace = true } +scheduler-node = { path = "../../scheduler-node" } +serde = { workspace = true } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +testing-framework-runner-local = { workspace = true } diff --git a/examples/scheduler/testing/integration/src/app.rs 
b/examples/scheduler/testing/integration/src/app.rs new file mode 100644 index 0000000..af1db60 --- /dev/null +++ b/examples/scheduler/testing/integration/src/app.rs @@ -0,0 +1,77 @@ +use std::io::Error; + +use async_trait::async_trait; +use scheduler_node::SchedulerHttpClient; +use serde::{Deserialize, Serialize}; +use testing_framework_core::scenario::{ + Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, + NodeAccess, serialize_cluster_yaml_config, +}; + +pub type SchedulerTopology = testing_framework_core::topology::ClusterTopology; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SchedulerPeerInfo { + pub node_id: u64, + pub http_address: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SchedulerNodeConfig { + pub node_id: u64, + pub http_port: u16, + pub peers: Vec, + pub sync_interval_ms: u64, + pub lease_ttl_ms: u64, +} + +pub struct SchedulerEnv; + +#[async_trait] +impl Application for SchedulerEnv { + type Deployment = SchedulerTopology; + type NodeClient = SchedulerHttpClient; + type NodeConfig = SchedulerNodeConfig; + fn build_node_client(access: &NodeAccess) -> Result { + Ok(SchedulerHttpClient::new(access.api_base_url()?)) + } + + fn node_readiness_path() -> &'static str { + "/health/ready" + } +} + +impl ClusterNodeConfigApplication for SchedulerEnv { + type ConfigError = Error; + + fn static_network_port() -> u16 { + 8080 + } + + fn build_cluster_node_config( + node: &ClusterNodeView, + peers: &[ClusterPeerView], + ) -> Result { + let peers = peers + .iter() + .map(|peer| SchedulerPeerInfo { + node_id: peer.index() as u64, + http_address: peer.authority(), + }) + .collect::>(); + + Ok(SchedulerNodeConfig { + node_id: node.index() as u64, + http_port: node.network_port(), + peers, + sync_interval_ms: 500, + lease_ttl_ms: 3000, + }) + } + + fn serialize_cluster_node_config( + config: &Self::NodeConfig, + ) -> Result { + 
serialize_cluster_yaml_config(config).map_err(Error::other) + } +} diff --git a/examples/scheduler/testing/integration/src/compose_env.rs b/examples/scheduler/testing/integration/src/compose_env.rs new file mode 100644 index 0000000..f844c08 --- /dev/null +++ b/examples/scheduler/testing/integration/src/compose_env.rs @@ -0,0 +1,15 @@ +use testing_framework_runner_compose::{BinaryConfigNodeSpec, ComposeBinaryApp}; + +use crate::SchedulerEnv; + +const NODE_CONFIG_PATH: &str = "/etc/scheduler/config.yaml"; + +impl ComposeBinaryApp for SchedulerEnv { + fn compose_node_spec() -> BinaryConfigNodeSpec { + BinaryConfigNodeSpec::conventional( + "/usr/local/bin/scheduler-node", + NODE_CONFIG_PATH, + vec![8080, 8081], + ) + } +} diff --git a/examples/scheduler/testing/integration/src/lib.rs b/examples/scheduler/testing/integration/src/lib.rs new file mode 100644 index 0000000..d855502 --- /dev/null +++ b/examples/scheduler/testing/integration/src/lib.rs @@ -0,0 +1,10 @@ +mod app; +mod compose_env; +mod local_env; +pub mod scenario; + +pub use app::*; +pub use scenario::{SchedulerBuilderExt, SchedulerScenarioBuilder}; + +pub type SchedulerLocalDeployer = testing_framework_runner_local::ProcessDeployer; +pub type SchedulerComposeDeployer = testing_framework_runner_compose::ComposeDeployer; diff --git a/examples/scheduler/testing/integration/src/local_env.rs b/examples/scheduler/testing/integration/src/local_env.rs new file mode 100644 index 0000000..e6560b3 --- /dev/null +++ b/examples/scheduler/testing/integration/src/local_env.rs @@ -0,0 +1,42 @@ +use std::collections::HashMap; + +use testing_framework_core::scenario::{DynError, StartNodeOptions}; +use testing_framework_runner_local::{ + LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec, + build_local_cluster_node_config, yaml_node_config, +}; + +use crate::{SchedulerEnv, SchedulerNodeConfig}; + +impl LocalBinaryApp for SchedulerEnv { + fn initial_node_name_prefix() -> &'static str { + "scheduler-node" + } + + 
fn build_local_node_config_with_peers( + _topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + _template_config: Option< + &::NodeConfig, + >, + ) -> Result<::NodeConfig, DynError> { + build_local_cluster_node_config::(index, ports, peers) + } + + fn local_process_spec() -> LocalProcessSpec { + LocalProcessSpec::new("SCHEDULER_NODE_BIN", "scheduler-node") + .with_rust_log("scheduler_node=info") + } + + fn render_local_config(config: &SchedulerNodeConfig) -> Result, DynError> { + yaml_node_config(config) + } + + fn http_api_port(config: &SchedulerNodeConfig) -> u16 { + config.http_port + } +} diff --git a/examples/scheduler/testing/integration/src/scenario.rs b/examples/scheduler/testing/integration/src/scenario.rs new file mode 100644 index 0000000..b6e54eb --- /dev/null +++ b/examples/scheduler/testing/integration/src/scenario.rs @@ -0,0 +1,15 @@ +use testing_framework_core::scenario::ScenarioBuilder; + +use crate::{SchedulerEnv, SchedulerTopology}; + +pub type SchedulerScenarioBuilder = ScenarioBuilder; + +pub trait SchedulerBuilderExt: Sized { + fn deployment_with(f: impl FnOnce(SchedulerTopology) -> SchedulerTopology) -> Self; +} + +impl SchedulerBuilderExt for SchedulerScenarioBuilder { + fn deployment_with(f: impl FnOnce(SchedulerTopology) -> SchedulerTopology) -> Self { + SchedulerScenarioBuilder::with_deployment(f(SchedulerTopology::new(3))) + } +} diff --git a/examples/scheduler/testing/workloads/Cargo.toml b/examples/scheduler/testing/workloads/Cargo.toml new file mode 100644 index 0000000..657653a --- /dev/null +++ b/examples/scheduler/testing/workloads/Cargo.toml @@ -0,0 +1,14 @@ +[package] +edition.workspace = true +license.workspace = true +name = "scheduler-runtime-workloads" +version.workspace = true + +[dependencies] +async-trait = { workspace = true } +scheduler-node = { path = "../../scheduler-node" } +scheduler-runtime-ext = { path = 
"../integration" } +serde = { workspace = true } +testing-framework-core = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } diff --git a/examples/scheduler/testing/workloads/src/drained.rs b/examples/scheduler/testing/workloads/src/drained.rs new file mode 100644 index 0000000..62366f3 --- /dev/null +++ b/examples/scheduler/testing/workloads/src/drained.rs @@ -0,0 +1,99 @@ +use std::time::Duration; + +use async_trait::async_trait; +use scheduler_runtime_ext::SchedulerEnv; +use serde::Deserialize; +use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use tracing::info; + +#[derive(Clone)] +pub struct SchedulerDrained { + min_done: usize, + timeout: Duration, + poll_interval: Duration, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +struct Revision { + version: u64, + origin: u64, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +struct StateResponse { + revision: Revision, + pending: usize, + leased: usize, + done: usize, +} + +impl SchedulerDrained { + #[must_use] + pub fn new(min_done: usize) -> Self { + Self { + min_done, + timeout: Duration::from_secs(30), + poll_interval: Duration::from_millis(500), + } + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +#[async_trait] +impl Expectation for SchedulerDrained { + fn name(&self) -> &str { + "scheduler_drained" + } + + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("no scheduler node clients available".into()); + } + + let deadline = tokio::time::Instant::now() + self.timeout; + while tokio::time::Instant::now() < deadline { + if is_drained_and_converged(&clients, self.min_done).await? 
{ + info!(min_done = self.min_done, "scheduler drained and converged"); + return Ok(()); + } + tokio::time::sleep(self.poll_interval).await; + } + + Err(format!("scheduler not drained within {:?}", self.timeout).into()) + } +} + +async fn is_drained_and_converged( + clients: &[scheduler_node::SchedulerHttpClient], + min_done: usize, +) -> Result { + let Some((first, rest)) = clients.split_first() else { + return Ok(false); + }; + + let baseline = read_state(first).await?; + if baseline.pending != 0 || baseline.leased != 0 || baseline.done < min_done { + return Ok(false); + } + + for client in rest { + let current = read_state(client).await?; + if current != baseline { + return Ok(false); + } + } + + Ok(true) +} + +async fn read_state( + client: &scheduler_node::SchedulerHttpClient, +) -> Result { + Ok(client.get("/jobs/state").await?) +} diff --git a/examples/scheduler/testing/workloads/src/lease_failover.rs b/examples/scheduler/testing/workloads/src/lease_failover.rs new file mode 100644 index 0000000..15eb0c5 --- /dev/null +++ b/examples/scheduler/testing/workloads/src/lease_failover.rs @@ -0,0 +1,205 @@ +use std::{collections::HashSet, time::Duration}; + +use async_trait::async_trait; +use scheduler_runtime_ext::SchedulerEnv; +use serde::{Deserialize, Serialize}; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tokio::time::{Instant, sleep}; +use tracing::info; + +#[derive(Clone)] +pub struct SchedulerLeaseFailoverWorkload { + operations: usize, + lease_ttl: Duration, + rate_per_sec: Option, + payload_prefix: String, +} + +#[derive(Serialize)] +struct EnqueueRequest { + payload: String, +} + +#[derive(Deserialize)] +struct EnqueueResponse { + id: u64, +} + +#[derive(Serialize)] +struct ClaimRequest { + worker_id: String, + max_jobs: usize, +} + +#[derive(Deserialize)] +struct ClaimedJob { + id: u64, +} + +#[derive(Deserialize)] +struct ClaimResponse { + jobs: Vec, +} + +#[derive(Serialize)] +struct AckRequest { + worker_id: String, + 
job_id: u64, +} + +#[derive(Deserialize)] +struct OperationResponse { + ok: bool, +} + +impl SchedulerLeaseFailoverWorkload { + #[must_use] + pub fn new() -> Self { + Self { + operations: 100, + lease_ttl: Duration::from_secs(3), + rate_per_sec: Some(25), + payload_prefix: "scheduler-job".to_owned(), + } + } + + #[must_use] + pub const fn operations(mut self, value: usize) -> Self { + self.operations = value; + self + } + + #[must_use] + pub const fn lease_ttl(mut self, value: Duration) -> Self { + self.lease_ttl = value; + self + } + + #[must_use] + pub const fn rate_per_sec(mut self, value: usize) -> Self { + self.rate_per_sec = Some(value); + self + } +} + +impl Default for SchedulerLeaseFailoverWorkload { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Workload for SchedulerLeaseFailoverWorkload { + fn name(&self) -> &str { + "scheduler_lease_failover_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + let Some(node_a) = clients.first() else { + return Err("no scheduler node clients available".into()); + }; + let node_b = clients.get(1).unwrap_or(node_a); + + let interval = self.rate_per_sec.and_then(compute_interval); + let mut enqueued_ids = Vec::with_capacity(self.operations); + + info!( + operations = self.operations, + "scheduler failover: enqueue phase" + ); + for index in 0..self.operations { + let response: EnqueueResponse = node_a + .post( + "/jobs/enqueue", + &EnqueueRequest { + payload: format!("{}-{index}", self.payload_prefix), + }, + ) + .await?; + enqueued_ids.push(response.id); + if let Some(delay) = interval { + sleep(delay).await; + } + } + + info!("scheduler failover: worker-a claim without ack"); + let first_claim: ClaimResponse = node_a + .post( + "/jobs/claim", + &ClaimRequest { + worker_id: "worker-a".to_owned(), + max_jobs: self.operations, + }, + ) + .await?; + + if first_claim.jobs.len() != self.operations { + return Err(format!( + 
"worker-a claimed {} jobs, expected {}", + first_claim.jobs.len(), + self.operations + ) + .into()); + } + + sleep(self.lease_ttl + Duration::from_millis(500)).await; + + info!("scheduler failover: worker-b reclaim and ack"); + let mut pending_ids: HashSet = enqueued_ids.into_iter().collect(); + let reclaim_deadline = Instant::now() + Duration::from_secs(20); + + while !pending_ids.is_empty() && Instant::now() < reclaim_deadline { + let claim: ClaimResponse = node_b + .post( + "/jobs/claim", + &ClaimRequest { + worker_id: "worker-b".to_owned(), + max_jobs: pending_ids.len(), + }, + ) + .await?; + + if claim.jobs.is_empty() { + sleep(Duration::from_millis(200)).await; + continue; + } + + for job in claim.jobs { + if !pending_ids.remove(&job.id) { + return Err(format!("unexpected reclaimed job id {}", job.id).into()); + } + + let ack: OperationResponse = node_b + .post( + "/jobs/ack", + &AckRequest { + worker_id: "worker-b".to_owned(), + job_id: job.id, + }, + ) + .await?; + + if !ack.ok { + return Err(format!("failed to ack reclaimed job {}", job.id).into()); + } + } + } + + if !pending_ids.is_empty() { + return Err( + format!("scheduler failover left {} unacked jobs", pending_ids.len()).into(), + ); + } + + Ok(()) + } +} + +fn compute_interval(rate_per_sec: usize) -> Option { + if rate_per_sec == 0 { + return None; + } + + Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1))) +} diff --git a/examples/scheduler/testing/workloads/src/lib.rs b/examples/scheduler/testing/workloads/src/lib.rs new file mode 100644 index 0000000..6df35b7 --- /dev/null +++ b/examples/scheduler/testing/workloads/src/lib.rs @@ -0,0 +1,8 @@ +mod drained; +mod lease_failover; + +pub use drained::SchedulerDrained; +pub use lease_failover::SchedulerLeaseFailoverWorkload; +pub use scheduler_runtime_ext::{ + SchedulerBuilderExt, SchedulerEnv, SchedulerScenarioBuilder, SchedulerTopology, +};