feat(examples): add simple clustered demo apps

andrussal 2026-04-10 10:03:25 +02:00
parent a5dcc97763
commit 1ff6b8df6d
73 changed files with 4384 additions and 0 deletions

Cargo.lock (generated)

@ -177,11 +177,13 @@ dependencies = [
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@ -202,6 +204,7 @@ dependencies = [
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@ -1385,6 +1388,63 @@ dependencies = [
"tracing",
]
[[package]]
name = "kvstore-examples"
version = "0.1.0"
dependencies = [
"anyhow",
"kvstore-runtime-ext",
"kvstore-runtime-workloads",
"serde",
"testing-framework-core",
"testing-framework-runner-compose",
"testing-framework-runner-k8s",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "kvstore-node"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"clap",
"reqwest",
"serde",
"serde_yaml",
"tokio",
"tower-http",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "kvstore-runtime-ext"
version = "0.1.0"
dependencies = [
"async-trait",
"reqwest",
"serde",
"testing-framework-core",
"testing-framework-runner-compose",
"testing-framework-runner-k8s",
"testing-framework-runner-local",
]
[[package]]
name = "kvstore-runtime-workloads"
version = "0.1.0"
dependencies = [
"async-trait",
"kvstore-runtime-ext",
"serde",
"testing-framework-core",
"tokio",
"tracing",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@ -1436,6 +1496,15 @@ version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]]
name = "matchit"
version = "0.7.3"
@ -1482,6 +1551,15 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "num-conv"
version = "0.2.0"
@ -1803,6 +1881,61 @@ dependencies = [
"url",
]
[[package]]
name = "queue-examples"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"queue-runtime-ext",
"queue-runtime-workloads",
"testing-framework-core",
"testing-framework-runner-compose",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "queue-node"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"clap",
"reqwest",
"serde",
"serde_yaml",
"tokio",
"tower-http",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "queue-runtime-ext"
version = "0.1.0"
dependencies = [
"async-trait",
"reqwest",
"serde",
"testing-framework-core",
"testing-framework-runner-compose",
"testing-framework-runner-local",
]
[[package]]
name = "queue-runtime-workloads"
version = "0.1.0"
dependencies = [
"async-trait",
"queue-runtime-ext",
"serde",
"testing-framework-core",
"tokio",
"tracing",
]
[[package]]
name = "quote"
version = "1.0.44"
@ -2048,6 +2181,60 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "scheduler-examples"
version = "0.1.0"
dependencies = [
"anyhow",
"scheduler-runtime-ext",
"scheduler-runtime-workloads",
"testing-framework-core",
"testing-framework-runner-compose",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "scheduler-node"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"clap",
"reqwest",
"serde",
"serde_yaml",
"tokio",
"tower-http",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "scheduler-runtime-ext"
version = "0.1.0"
dependencies = [
"async-trait",
"reqwest",
"serde",
"testing-framework-core",
"testing-framework-runner-compose",
"testing-framework-runner-local",
]
[[package]]
name = "scheduler-runtime-workloads"
version = "0.1.0"
dependencies = [
"async-trait",
"scheduler-runtime-ext",
"serde",
"testing-framework-core",
"tokio",
"tracing",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -2205,6 +2392,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
@ -2465,6 +2661,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
]
[[package]]
name = "time"
version = "0.3.47"
@ -2514,6 +2719,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@ -2657,6 +2863,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@ -2737,6 +2973,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"


@ -4,6 +4,18 @@ members = [
"cfgsync/artifacts",
"cfgsync/core",
"cfgsync/runtime",
"examples/kvstore/examples",
"examples/kvstore/kvstore-node",
"examples/kvstore/testing/integration",
"examples/kvstore/testing/workloads",
"examples/queue/examples",
"examples/queue/queue-node",
"examples/queue/testing/integration",
"examples/queue/testing/workloads",
"examples/scheduler/examples",
"examples/scheduler/scheduler-node",
"examples/scheduler/testing/integration",
"examples/scheduler/testing/workloads",
"testing-framework/core",
"testing-framework/deployers/compose",
"testing-framework/deployers/k8s",


@ -0,0 +1,28 @@
# Build stage
FROM rustlang/rust:nightly-bookworm AS builder
WORKDIR /build
# Copy everything required to build the workspace.
COPY Cargo.toml Cargo.lock ./
COPY cfgsync/ ./cfgsync/
COPY examples/ ./examples/
COPY testing-framework/ ./testing-framework/
# Build kvstore-node in release mode.
RUN cargo build --release -p kvstore-node
# Runtime stage
FROM debian:bookworm-slim
RUN apt-get update && \
apt-get install -y ca-certificates && \
rm -rf /var/lib/apt/lists/*
COPY --from=builder /build/target/release/kvstore-node /usr/local/bin/kvstore-node
RUN mkdir -p /etc/kvstore
WORKDIR /app
ENTRYPOINT ["/usr/local/bin/kvstore-node"]
CMD ["--config", "/etc/kvstore/config.yaml"]


@ -0,0 +1,64 @@
# KV Store Example
This example runs a small replicated key-value store.
The usual scenario writes keys through one node and checks that the other nodes
eventually return the same values.
## How TF runs this
Each example follows the same pattern:
- TF starts a small deployment of kvstore nodes
- a workload writes keys through one node
- an expectation keeps reading from all nodes until they agree on the values
## Scenarios
- `basic_convergence` runs the convergence check locally
- `compose_convergence` runs the same check in Docker Compose
- `k8s_convergence` runs it on Kubernetes
- `k8s_manual_convergence` starts the nodes through the k8s manual cluster API, restarts one node, and checks convergence again
## API
Each node exposes a small HTTP API (see the curl sketch after this list):
- `PUT /kv/:key` to write a value
- `GET /kv/:key` to read a value
- `GET /internal/snapshot` to read the local replicated state
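For a quick manual check against a running node, the endpoints can be exercised with curl. This is an illustrative sketch: it assumes a node listening on `127.0.0.1:8080` and uses the request/response shapes defined in `server.rs`.
```bash
# Write a value; a null expected_version skips the compare-and-set check.
curl -s -X PUT http://127.0.0.1:8080/kv/demo-0 \
  -H 'Content-Type: application/json' \
  -d '{"value": "value-0", "expected_version": null}'
# => {"applied":true,"version":1}

# Read it back (from any node, once sync has propagated the write).
curl -s http://127.0.0.1:8080/kv/demo-0
# => {"key":"demo-0","record":{"value":"value-0","version":1,"origin":0}}
```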
## Run locally
```bash
cargo run -p kvstore-examples --bin basic_convergence
```
## Run with Docker Compose
```bash
cargo run -p kvstore-examples --bin compose_convergence
```
Set `KVSTORE_IMAGE` to override the default compose image tag.
## Run with Kubernetes
```bash
docker build -t kvstore-node:local -f examples/kvstore/Dockerfile .
cargo run -p kvstore-examples --bin k8s_convergence
```
Prerequisites:
- `kubectl` configured with a reachable cluster
- `helm` installed
Optional image override:
- `KVSTORE_K8S_IMAGE` (falls back to `KVSTORE_IMAGE`, then `kvstore-node:local`)
## Run with Kubernetes manual cluster
```bash
docker build -t kvstore-node:local -f examples/kvstore/Dockerfile .
cargo run -p kvstore-examples --bin k8s_manual_convergence
```


@ -0,0 +1,18 @@
[package]
edition.workspace = true
license.workspace = true
name = "kvstore-examples"
version.workspace = true
[dependencies]
kvstore-runtime-ext = { path = "../testing/integration" }
kvstore-runtime-workloads = { path = "../testing/workloads" }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-k8s = { workspace = true }
anyhow = "1.0"
serde = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }


@ -0,0 +1,31 @@
use std::time::Duration;
use kvstore_runtime_ext::KvLocalDeployer;
use kvstore_runtime_workloads::{
KvBuilderExt, KvConverges, KvScenarioBuilder, KvTopology, KvWriteWorkload,
};
use testing_framework_core::scenario::Deployer;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let mut scenario = KvScenarioBuilder::deployment_with(|_| KvTopology::new(3))
.with_run_duration(Duration::from_secs(30))
.with_workload(
KvWriteWorkload::new()
.operations(300)
.key_count(30)
.rate_per_sec(30)
.key_prefix("demo"),
)
.with_expectation(KvConverges::new("demo", 30).timeout(Duration::from_secs(25)))
.build()?;
let deployer = KvLocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
runner.run(&mut scenario).await?;
Ok(())
}


@ -0,0 +1,44 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use kvstore_runtime_workloads::{
KvBuilderExt, KvConverges, KvScenarioBuilder, KvTopology, KvWriteWorkload,
};
use testing_framework_core::scenario::Deployer;
use testing_framework_runner_compose::ComposeRunnerError;
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let mut scenario = KvScenarioBuilder::deployment_with(|_| KvTopology::new(3))
.with_run_duration(Duration::from_secs(30))
.with_workload(
KvWriteWorkload::new()
.operations(200)
.key_count(20)
.rate_per_sec(20),
)
.with_expectation(KvConverges::new("kv-demo", 20).timeout(Duration::from_secs(25)))
.build()?;
let deployer = kvstore_runtime_ext::KvComposeDeployer::new();
let runner = match deployer.deploy(&scenario).await {
Ok(runner) => runner,
Err(ComposeRunnerError::DockerUnavailable) => {
warn!("docker unavailable; skipping compose kv run");
return Ok(());
}
Err(error) => return Err(anyhow::Error::new(error)).context("deploying kv compose stack"),
};
info!("running kv compose convergence scenario");
runner
.run(&mut scenario)
.await
.context("running kv compose scenario")?;
Ok(())
}


@ -0,0 +1,58 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use kvstore_runtime_ext::KvK8sDeployer;
use kvstore_runtime_workloads::{
KvBuilderExt, KvConverges, KvScenarioBuilder, KvTopology, KvWriteWorkload,
};
use testing_framework_core::scenario::Deployer;
use testing_framework_runner_k8s::K8sRunnerError;
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let mut scenario = KvScenarioBuilder::deployment_with(|_| KvTopology::new(3))
.with_run_duration(Duration::from_secs(30))
.with_workload(
KvWriteWorkload::new()
.operations(200)
.key_count(20)
.rate_per_sec(20),
)
.with_expectation(KvConverges::new("kv-demo", 20).timeout(Duration::from_secs(25)))
.build()?;
let deployer = KvK8sDeployer::new();
let runner = match deployer.deploy(&scenario).await {
Ok(runner) => runner,
Err(K8sRunnerError::ClientInit { source }) => {
warn!("k8s unavailable ({source}); skipping kv k8s run");
return Ok(());
}
Err(K8sRunnerError::InstallStack { source })
if k8s_cluster_unavailable(&source.to_string()) =>
{
warn!("k8s unavailable ({source}); skipping kv k8s run");
return Ok(());
}
Err(error) => return Err(anyhow::Error::new(error)).context("deploying kv k8s stack"),
};
info!("running kv k8s convergence scenario");
runner
.run(&mut scenario)
.await
.context("running kv k8s scenario")?;
Ok(())
}
fn k8s_cluster_unavailable(message: &str) -> bool {
message.contains("Unable to connect to the server")
|| message.contains("TLS handshake timeout")
|| message.contains("connection refused")
}


@ -0,0 +1,154 @@
use std::time::Duration;
use anyhow::{Context as _, Result, anyhow};
use kvstore_runtime_ext::{KvHttpClient, KvK8sDeployer, KvTopology};
use serde::{Deserialize, Serialize};
use testing_framework_runner_k8s::ManualClusterError;
use tracing::{info, warn};
#[derive(Serialize)]
struct PutRequest {
value: String,
expected_version: Option<u64>,
}
#[derive(Deserialize)]
struct PutResponse {
applied: bool,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct ValueRecord {
value: String,
version: u64,
origin: u64,
}
#[derive(Deserialize)]
struct GetResponse {
record: Option<ValueRecord>,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let deployer = KvK8sDeployer::new();
let cluster = match deployer
.manual_cluster_from_descriptors(KvTopology::new(3))
.await
{
Ok(cluster) => cluster,
Err(ManualClusterError::ClientInit { source }) => {
warn!("k8s unavailable ({source}); skipping kv k8s manual run");
return Ok(());
}
Err(ManualClusterError::InstallStack { source })
if k8s_cluster_unavailable(&source.to_string()) =>
{
warn!("k8s unavailable ({source}); skipping kv k8s manual run");
return Ok(());
}
Err(error) => {
return Err(anyhow::Error::new(error)).context("creating kv k8s manual cluster");
}
};
let node0 = cluster.start_node("node-0").await?.client;
let node1 = cluster.start_node("node-1").await?.client;
let node2 = cluster.start_node("node-2").await?.client;
cluster.wait_network_ready().await?;
write_keys(&node0, "kv-manual", 12).await?;
wait_for_convergence(
&[node0.clone(), node1.clone(), node2.clone()],
"kv-manual",
12,
)
.await?;
info!("restarting node-2 in manual cluster");
cluster.restart_node("node-2").await?;
cluster.wait_network_ready().await?;
let node2 = cluster
.node_client("node-2")
.ok_or_else(|| anyhow!("node-2 client missing after restart"))?;
wait_for_convergence(&[node0, node1, node2], "kv-manual", 12).await?;
cluster.stop_all();
Ok(())
}
async fn write_keys(client: &KvHttpClient, prefix: &str, key_count: usize) -> Result<()> {
for index in 0..key_count {
let key = format!("{prefix}-{index}");
let response: PutResponse = client
.put(
&format!("/kv/{key}"),
&PutRequest {
value: format!("value-{index}"),
expected_version: None,
},
)
.await
.map_err(|error| anyhow!(error.to_string()))
.with_context(|| format!("writing key {key}"))?;
if !response.applied {
return Err(anyhow!("write rejected for key {key}"));
}
}
Ok(())
}
async fn wait_for_convergence(
clients: &[KvHttpClient],
prefix: &str,
key_count: usize,
) -> Result<()> {
let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
while tokio::time::Instant::now() < deadline {
if is_converged(clients, prefix, key_count).await? {
info!(key_count, "kv manual cluster converged");
return Ok(());
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
Err(anyhow!("kv manual cluster did not converge within timeout"))
}
async fn is_converged(clients: &[KvHttpClient], prefix: &str, key_count: usize) -> Result<bool> {
for index in 0..key_count {
let key = format!("{prefix}-{index}");
let first = read_key(&clients[0], &key).await?;
for client in &clients[1..] {
if read_key(client, &key).await? != first {
return Ok(false);
}
}
}
Ok(true)
}
async fn read_key(client: &KvHttpClient, key: &str) -> Result<Option<ValueRecord>> {
let response: GetResponse = client
.get(&format!("/kv/{key}"))
.await
.map_err(|error| anyhow!(error.to_string()))
.with_context(|| format!("reading key {key}"))?;
Ok(response.record)
}
fn k8s_cluster_unavailable(message: &str) -> bool {
message.contains("Unable to connect to the server")
|| message.contains("TLS handshake timeout")
|| message.contains("connection refused")
}


@ -0,0 +1,24 @@
[package]
edition.workspace = true
license.workspace = true
name = "kvstore-node"
version.workspace = true
[[bin]]
name = "kvstore-node"
path = "src/main.rs"
[dependencies]
axum = "0.7"
tower-http = { version = "0.6", features = ["trace"] }
serde = { workspace = true }
serde_yaml = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
anyhow = "1.0"
clap = { version = "4.0", features = ["derive"] }
reqwest = { workspace = true, features = ["json"] }


@ -0,0 +1,30 @@
use std::{fs, path::Path};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PeerInfo {
pub node_id: u64,
pub http_address: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KvConfig {
pub node_id: u64,
pub http_port: u16,
pub peers: Vec<PeerInfo>,
#[serde(default = "default_sync_interval_ms")]
pub sync_interval_ms: u64,
}
impl KvConfig {
pub fn load(path: &Path) -> anyhow::Result<Self> {
let raw = fs::read_to_string(path)?;
let config = serde_yaml::from_str(&raw)?;
Ok(config)
}
}
const fn default_sync_interval_ms() -> u64 {
1000
}
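This struct maps one-to-one onto the YAML file the node loads at startup. A minimal hand-written config is sketched below; the values are hypothetical, `sync_interval_ms` may be omitted thanks to the serde default of 1000, and a peer that is not actually running simply produces sync-failure logs.
```bash
cat > /tmp/kv-node0.yaml <<'EOF'
node_id: 0
http_port: 8080
peers:
  - node_id: 1
    http_address: "127.0.0.1:8081"
sync_interval_ms: 500
EOF
cargo run -p kvstore-node -- --config /tmp/kv-node0.yaml
```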


@ -0,0 +1,36 @@
mod config;
mod server;
mod state;
mod sync;
use std::path::PathBuf;
use clap::Parser;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use crate::{config::KvConfig, state::KvState, sync::SyncService};
#[derive(Parser, Debug)]
#[command(name = "kvstore-node")]
struct Args {
#[arg(short, long)]
config: PathBuf,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "kvstore_node=info,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let args = Args::parse();
let config = KvConfig::load(&args.config)?;
let state = KvState::new(config.node_id);
SyncService::new(config.clone(), state.clone()).start();
server::start_server(config, state).await
}


@ -0,0 +1,112 @@
use std::net::SocketAddr;
use axum::{
Router,
extract::{Path, State},
http::StatusCode,
response::Json,
routing::get,
};
use serde::{Deserialize, Serialize};
use tower_http::trace::TraceLayer;
use crate::{
config::KvConfig,
state::{KvState, Snapshot, ValueRecord},
};
#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
}
#[derive(Deserialize)]
struct PutRequest {
value: String,
expected_version: Option<u64>,
}
#[derive(Serialize)]
struct PutResponse {
applied: bool,
version: u64,
}
#[derive(Serialize)]
struct GetResponse {
key: String,
record: Option<ValueRecord>,
}
pub async fn start_server(config: KvConfig, state: KvState) -> anyhow::Result<()> {
let app = Router::new()
.route("/health/live", get(health_live))
.route("/health/ready", get(health_ready))
.route("/kv/:key", get(get_key).put(put_key))
.route("/internal/snapshot", get(get_snapshot))
.layer(TraceLayer::new_for_http())
.with_state(state.clone());
let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port));
let listener = tokio::net::TcpListener::bind(addr).await?;
state.set_ready(true).await;
tracing::info!(node_id = state.node_id(), %addr, "kv node ready");
axum::serve(listener, app).await?;
Ok(())
}
async fn health_live() -> (StatusCode, Json<HealthResponse>) {
(StatusCode::OK, Json(HealthResponse { status: "alive" }))
}
async fn health_ready(State(state): State<KvState>) -> (StatusCode, Json<HealthResponse>) {
if state.is_ready().await {
(StatusCode::OK, Json(HealthResponse { status: "ready" }))
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not-ready",
}),
)
}
}
async fn get_key(Path(key): Path<String>, State(state): State<KvState>) -> Json<GetResponse> {
let record = state.get(&key).await;
Json(GetResponse { key, record })
}
async fn put_key(
Path(key): Path<String>,
State(state): State<KvState>,
Json(request): Json<PutRequest>,
) -> (StatusCode, Json<PutResponse>) {
let outcome = state
.put_local(key, request.value, request.expected_version)
.await;
if outcome.applied {
(
StatusCode::OK,
Json(PutResponse {
applied: true,
version: outcome.current_version,
}),
)
} else {
(
StatusCode::CONFLICT,
Json(PutResponse {
applied: false,
version: outcome.current_version,
}),
)
}
}
async fn get_snapshot(State(state): State<KvState>) -> Json<Snapshot> {
Json(state.snapshot().await)
}


@ -0,0 +1,111 @@
use std::{collections::HashMap, sync::Arc};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct ValueRecord {
pub value: String,
pub version: u64,
pub origin: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Snapshot {
pub node_id: u64,
pub entries: HashMap<String, ValueRecord>,
}
#[derive(Clone, Debug)]
pub struct PutOutcome {
pub applied: bool,
pub current_version: u64,
}
#[derive(Clone)]
pub struct KvState {
node_id: u64,
ready: Arc<RwLock<bool>>,
entries: Arc<RwLock<HashMap<String, ValueRecord>>>,
}
impl KvState {
pub fn new(node_id: u64) -> Self {
Self {
node_id,
ready: Arc::new(RwLock::new(false)),
entries: Arc::new(RwLock::new(HashMap::new())),
}
}
pub const fn node_id(&self) -> u64 {
self.node_id
}
pub async fn set_ready(&self, value: bool) {
*self.ready.write().await = value;
}
pub async fn is_ready(&self) -> bool {
*self.ready.read().await
}
pub async fn get(&self, key: &str) -> Option<ValueRecord> {
self.entries.read().await.get(key).cloned()
}
pub async fn put_local(
&self,
key: String,
value: String,
expected_version: Option<u64>,
) -> PutOutcome {
let mut entries = self.entries.write().await;
let current_version = entries.get(&key).map_or(0, |record| record.version);
if expected_version.is_some_and(|expected| expected != current_version) {
return PutOutcome {
applied: false,
current_version,
};
}
let next_version = current_version.saturating_add(1);
entries.insert(
key,
ValueRecord {
value,
version: next_version,
origin: self.node_id,
},
);
PutOutcome {
applied: true,
current_version: next_version,
}
}
pub async fn merge_snapshot(&self, snapshot: Snapshot) {
let mut local = self.entries.write().await;
for (key, incoming) in snapshot.entries {
match local.get(&key) {
Some(existing) if !is_newer_record(&incoming, existing) => {}
_ => {
local.insert(key, incoming);
}
}
}
}
pub async fn snapshot(&self) -> Snapshot {
Snapshot {
node_id: self.node_id,
entries: self.entries.read().await.clone(),
}
}
}
/// Last-writer-wins: compare `(version, origin)` lexicographically so every
/// replica resolves conflicting records identically.
fn is_newer_record(candidate: &ValueRecord, existing: &ValueRecord) -> bool {
(candidate.version, candidate.origin) > (existing.version, existing.origin)
}
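The merge above implements last-writer-wins over the `(version, origin)` pair, compared lexicographically: a strictly higher version always wins, and version ties fall back to the higher origin node id so every replica breaks ties the same way. A standalone sketch of that ordering (not part of the crate):
```rust
fn main() {
    // (version, origin) tuples compare lexicographically, as in is_newer_record.
    let existing = (3u64, 1u64); // version 3, written by node 1
    let incoming = (3u64, 2u64); // version 3, written by node 2

    // Equal versions: the higher origin id wins the tie deterministically.
    assert!(incoming > existing);

    // A strictly newer version wins regardless of which node wrote it.
    assert!((4u64, 0u64) > existing);

    println!("last-writer-wins ordering holds");
}
```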


@ -0,0 +1,103 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use reqwest::Client;
use tokio::sync::Mutex;
use tracing::{debug, warn};
use crate::{
config::KvConfig,
state::{KvState, Snapshot},
};
const WARN_AFTER_CONSECUTIVE_FAILURES: u32 = 5;
#[derive(Clone)]
pub struct SyncService {
config: Arc<KvConfig>,
state: KvState,
client: Client,
failures_by_peer: Arc<Mutex<HashMap<String, u32>>>,
}
impl SyncService {
pub fn new(config: KvConfig, state: KvState) -> Self {
Self {
config: Arc::new(config),
state,
client: Client::new(),
failures_by_peer: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn start(&self) {
let service = self.clone();
tokio::spawn(async move {
service.run().await;
});
}
async fn run(self) {
let interval = Duration::from_millis(self.config.sync_interval_ms.max(100));
loop {
self.sync_once().await;
tokio::time::sleep(interval).await;
}
}
async fn sync_once(&self) {
for peer in &self.config.peers {
match self.fetch_snapshot(&peer.http_address).await {
Ok(snapshot) => {
self.state.merge_snapshot(snapshot).await;
self.clear_failure_counter(&peer.http_address).await;
}
Err(error) => {
self.record_sync_failure(&peer.http_address, &error).await;
}
}
}
}
async fn fetch_snapshot(&self, peer_address: &str) -> anyhow::Result<Snapshot> {
let url = format!("http://{peer_address}/internal/snapshot");
let snapshot = self
.client
.get(url)
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(snapshot)
}
async fn clear_failure_counter(&self, peer_address: &str) {
let mut failures = self.failures_by_peer.lock().await;
failures.remove(peer_address);
}
async fn record_sync_failure(&self, peer_address: &str, error: &anyhow::Error) {
let consecutive_failures = {
let mut failures = self.failures_by_peer.lock().await;
let entry = failures.entry(peer_address.to_owned()).or_insert(0);
*entry += 1;
*entry
};
if consecutive_failures >= WARN_AFTER_CONSECUTIVE_FAILURES {
warn!(
peer = %peer_address,
%error,
consecutive_failures,
"kv sync repeatedly failing"
);
} else {
debug!(
peer = %peer_address,
%error,
consecutive_failures,
"kv sync failed"
);
}
}
}
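Because the pull is plain HTTP against the snapshot endpoint, the data a node gossips can be inspected by hand. An illustrative check, assuming a node on `127.0.0.1:8080` that has seen one write:
```bash
curl -s http://127.0.0.1:8080/internal/snapshot
# => {"node_id":0,"entries":{"demo-0":{"value":"value-0","version":1,"origin":0}}}
```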


@ -0,0 +1,15 @@
[package]
edition.workspace = true
license.workspace = true
name = "kvstore-runtime-ext"
version.workspace = true
[dependencies]
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-k8s = { workspace = true }
testing-framework-runner-local = { workspace = true }
async-trait = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }


@ -0,0 +1,114 @@
use std::io::Error;
use async_trait::async_trait;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use testing_framework_core::scenario::{
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView,
DefaultFeedRuntime, DynError, NodeAccess, serialize_cluster_yaml_config,
};
pub type KvTopology = testing_framework_core::topology::ClusterTopology;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KvPeerInfo {
pub node_id: u64,
pub http_address: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KvNodeConfig {
pub node_id: u64,
pub http_port: u16,
pub peers: Vec<KvPeerInfo>,
pub sync_interval_ms: u64,
}
#[derive(Clone)]
pub struct KvHttpClient {
base_url: Url,
client: reqwest::Client,
}
impl KvHttpClient {
#[must_use]
pub fn new(base_url: Url) -> Self {
Self {
base_url,
client: reqwest::Client::new(),
}
}
pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T, DynError> {
let url = self.base_url.join(path)?;
let response = self.client.get(url).send().await?.error_for_status()?;
Ok(response.json().await?)
}
pub async fn put<B: Serialize, T: serde::de::DeserializeOwned>(
&self,
path: &str,
body: &B,
) -> Result<T, DynError> {
let url = self.base_url.join(path)?;
let response = self
.client
.put(url)
.json(body)
.send()
.await?
.error_for_status()?;
Ok(response.json().await?)
}
}
pub struct KvEnv;
#[async_trait]
impl Application for KvEnv {
type Deployment = KvTopology;
type NodeClient = KvHttpClient;
type NodeConfig = KvNodeConfig;
type FeedRuntime = DefaultFeedRuntime;
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
Ok(KvHttpClient::new(access.api_base_url()?))
}
fn node_readiness_path() -> &'static str {
"/health/ready"
}
}
impl ClusterNodeConfigApplication for KvEnv {
type ConfigError = Error;
fn static_network_port() -> u16 {
8080
}
fn build_cluster_node_config(
node: &ClusterNodeView,
peers: &[ClusterPeerView],
) -> Result<Self::NodeConfig, Self::ConfigError> {
let peers = peers
.iter()
.map(|peer| KvPeerInfo {
node_id: peer.index() as u64,
http_address: peer.authority(),
})
.collect::<Vec<_>>();
Ok(KvNodeConfig {
node_id: node.index() as u64,
http_port: node.network_port(),
peers,
sync_interval_ms: 500,
})
}
fn serialize_cluster_node_config(
config: &Self::NodeConfig,
) -> Result<String, Self::ConfigError> {
serialize_cluster_yaml_config(config).map_err(Error::other)
}
}


@ -0,0 +1,15 @@
use testing_framework_runner_compose::{BinaryConfigNodeSpec, ComposeBinaryApp};
use crate::KvEnv;
const NODE_CONFIG_PATH: &str = "/etc/kvstore/config.yaml";
impl ComposeBinaryApp for KvEnv {
fn compose_node_spec() -> BinaryConfigNodeSpec {
BinaryConfigNodeSpec::conventional(
"/usr/local/bin/kvstore-node",
NODE_CONFIG_PATH,
vec![8080, 8081],
)
}
}


@ -0,0 +1,21 @@
use testing_framework_runner_k8s::{BinaryConfigK8sSpec, K8sBinaryApp};
use crate::KvEnv;
const CONTAINER_CONFIG_PATH: &str = "/etc/kvstore/config.yaml";
const CONTAINER_HTTP_PORT: u16 = 8080;
const SERVICE_TESTING_PORT: u16 = 8081;
const NODE_NAME_PREFIX: &str = "kvstore-node";
impl K8sBinaryApp for KvEnv {
fn k8s_binary_spec() -> BinaryConfigK8sSpec {
BinaryConfigK8sSpec::conventional(
"kvstore",
NODE_NAME_PREFIX,
"/usr/local/bin/kvstore-node",
CONTAINER_CONFIG_PATH,
CONTAINER_HTTP_PORT,
SERVICE_TESTING_PORT,
)
}
}


@ -0,0 +1,12 @@
mod app;
mod compose_env;
mod k8s_env;
mod local_env;
pub mod scenario;
pub use app::*;
pub use scenario::{KvBuilderExt, KvScenarioBuilder};
pub type KvLocalDeployer = testing_framework_runner_local::ProcessDeployer<KvEnv>;
pub type KvComposeDeployer = testing_framework_runner_compose::ComposeDeployer<KvEnv>;
pub type KvK8sDeployer = testing_framework_runner_k8s::K8sDeployer<KvEnv>;


@ -0,0 +1,41 @@
use std::collections::HashMap;
use testing_framework_core::scenario::{DynError, StartNodeOptions};
use testing_framework_runner_local::{
LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec,
build_local_cluster_node_config, yaml_node_config,
};
use crate::{KvEnv, KvNodeConfig};
impl LocalBinaryApp for KvEnv {
fn initial_node_name_prefix() -> &'static str {
"kv-node"
}
fn build_local_node_config_with_peers(
_topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peers: &[LocalPeerNode],
_peer_ports_by_name: &HashMap<String, u16>,
_options: &StartNodeOptions<Self>,
_template_config: Option<
&<Self as testing_framework_core::scenario::Application>::NodeConfig,
>,
) -> Result<<Self as testing_framework_core::scenario::Application>::NodeConfig, DynError> {
build_local_cluster_node_config::<Self>(index, ports, peers)
}
fn local_process_spec() -> LocalProcessSpec {
LocalProcessSpec::new("KVSTORE_NODE_BIN", "kvstore-node").with_rust_log("kvstore_node=info")
}
fn render_local_config(config: &KvNodeConfig) -> Result<Vec<u8>, DynError> {
yaml_node_config(config)
}
fn http_api_port(config: &KvNodeConfig) -> u16 {
config.http_port
}
}


@ -0,0 +1,15 @@
use testing_framework_core::scenario::ScenarioBuilder;
use crate::{KvEnv, KvTopology};
pub type KvScenarioBuilder = ScenarioBuilder<KvEnv>;
pub trait KvBuilderExt: Sized {
fn deployment_with(f: impl FnOnce(KvTopology) -> KvTopology) -> Self;
}
impl KvBuilderExt for KvScenarioBuilder {
fn deployment_with(f: impl FnOnce(KvTopology) -> KvTopology) -> Self {
KvScenarioBuilder::with_deployment(f(KvTopology::new(3)))
}
}


@ -0,0 +1,14 @@
[package]
edition.workspace = true
license.workspace = true
name = "kvstore-runtime-workloads"
version.workspace = true
[dependencies]
kvstore-runtime-ext = { path = "../integration" }
testing-framework-core = { workspace = true }
async-trait = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }


@ -0,0 +1,103 @@
use std::time::Duration;
use async_trait::async_trait;
use kvstore_runtime_ext::KvEnv;
use serde::Deserialize;
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use tracing::info;
#[derive(Clone)]
pub struct KvConverges {
key_prefix: String,
key_count: usize,
timeout: Duration,
poll_interval: Duration,
}
#[derive(Deserialize, Clone, Debug, Eq, PartialEq)]
struct ValueRecord {
value: String,
version: u64,
origin: u64,
}
#[derive(Deserialize)]
struct GetResponse {
record: Option<ValueRecord>,
}
impl KvConverges {
#[must_use]
pub fn new(key_prefix: impl Into<String>, key_count: usize) -> Self {
Self {
key_prefix: key_prefix.into(),
key_count,
timeout: Duration::from_secs(20),
poll_interval: Duration::from_millis(500),
}
}
#[must_use]
pub const fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}
#[async_trait]
impl Expectation<KvEnv> for KvConverges {
fn name(&self) -> &str {
"kv_converges"
}
async fn evaluate(&mut self, ctx: &RunContext<KvEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
if clients.is_empty() {
return Err("no kv node clients available".into());
}
let deadline = tokio::time::Instant::now() + self.timeout;
while tokio::time::Instant::now() < deadline {
if self.is_converged(&clients).await? {
info!(key_count = self.key_count, "kv convergence reached");
return Ok(());
}
tokio::time::sleep(self.poll_interval).await;
}
Err(format!(
"kv convergence not reached within {:?} for {} keys",
self.timeout, self.key_count
)
.into())
}
}
impl KvConverges {
async fn is_converged(
&self,
clients: &[kvstore_runtime_ext::KvHttpClient],
) -> Result<bool, DynError> {
for key_idx in 0..self.key_count {
let key = format!("{}-{key_idx}", self.key_prefix);
let first = read_key(clients, &key, 0).await?;
for node_idx in 1..clients.len() {
let current = read_key(clients, &key, node_idx).await?;
if current != first {
return Ok(false);
}
}
}
Ok(true)
}
}
async fn read_key(
clients: &[kvstore_runtime_ext::KvHttpClient],
key: &str,
index: usize,
) -> Result<Option<ValueRecord>, DynError> {
let response: GetResponse = clients[index].get(&format!("/kv/{key}")).await?;
Ok(response.record)
}


@ -0,0 +1,6 @@
mod expectations;
mod write;
pub use expectations::KvConverges;
pub use kvstore_runtime_ext::{KvBuilderExt, KvEnv, KvScenarioBuilder, KvTopology};
pub use write::KvWriteWorkload;


@ -0,0 +1,135 @@
use std::time::Duration;
use async_trait::async_trait;
use kvstore_runtime_ext::KvEnv;
use serde::{Deserialize, Serialize};
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use tracing::info;
#[derive(Clone)]
pub struct KvWriteWorkload {
operations: usize,
key_count: usize,
rate_per_sec: Option<usize>,
key_prefix: String,
}
#[derive(Serialize)]
struct PutRequest {
value: String,
expected_version: Option<u64>,
}
#[derive(Deserialize)]
struct PutResponse {
applied: bool,
version: u64,
}
impl KvWriteWorkload {
#[must_use]
pub fn new() -> Self {
Self {
operations: 200,
key_count: 20,
rate_per_sec: Some(25),
key_prefix: "kv-demo".to_owned(),
}
}
#[must_use]
pub const fn operations(mut self, value: usize) -> Self {
self.operations = value;
self
}
#[must_use]
pub const fn key_count(mut self, value: usize) -> Self {
self.key_count = value;
self
}
#[must_use]
pub const fn rate_per_sec(mut self, value: usize) -> Self {
self.rate_per_sec = Some(value);
self
}
#[must_use]
pub fn key_prefix(mut self, value: impl Into<String>) -> Self {
self.key_prefix = value.into();
self
}
}
impl Default for KvWriteWorkload {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Workload<KvEnv> for KvWriteWorkload {
fn name(&self) -> &str {
"kv_write_workload"
}
async fn start(&self, ctx: &RunContext<KvEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
let Some(leader) = clients.first() else {
return Err("no kv node clients available".into());
};
if self.key_count == 0 {
return Err("kv workload key_count must be > 0".into());
}
let interval = self.rate_per_sec.and_then(compute_interval);
info!(
operations = self.operations,
key_count = self.key_count,
rate_per_sec = ?self.rate_per_sec,
"starting kv write workload"
);
for idx in 0..self.operations {
let key = format!("{}-{}", self.key_prefix, idx % self.key_count);
let value = format!("value-{idx}");
let response: PutResponse = leader
.put(
&format!("/kv/{key}"),
&PutRequest {
value,
expected_version: None,
},
)
.await?;
if !response.applied {
return Err(format!("leader rejected write for key {key}").into());
}
if (idx + 1) % 25 == 0 {
info!(
completed = idx + 1,
version = response.version,
"kv write progress"
);
}
if let Some(delay) = interval {
tokio::time::sleep(delay).await;
}
}
Ok(())
}
}
/// Convert a target rate into a per-operation delay, e.g. 25 ops/s -> 40 ms.
/// A rate of 0 means unthrottled; rates above 1000/s clamp to a 1 ms delay.
fn compute_interval(rate_per_sec: usize) -> Option<Duration> {
if rate_per_sec == 0 {
return None;
}
Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1)))
}

examples/queue/Dockerfile

@ -0,0 +1,24 @@
FROM rustlang/rust:nightly-bookworm AS builder
WORKDIR /build
COPY Cargo.toml Cargo.lock ./
COPY cfgsync/ ./cfgsync/
COPY examples/ ./examples/
COPY testing-framework/ ./testing-framework/
RUN cargo build --release -p queue-node
FROM debian:bookworm-slim
RUN apt-get update && \
apt-get install -y ca-certificates && \
rm -rf /var/lib/apt/lists/*
COPY --from=builder /build/target/release/queue-node /usr/local/bin/queue-node
RUN mkdir -p /etc/queue
WORKDIR /app
ENTRYPOINT ["/usr/local/bin/queue-node"]
CMD ["--config", "/etc/queue/config.yaml"]

examples/queue/README.md

@ -0,0 +1,47 @@
# Queue Example
This example runs a small replicated FIFO queue.
The scenarios enqueue messages, dequeue them again, and check that queue state
either converges or drains as expected.
## How TF runs this
Each example follows the same pattern:
- TF starts a small deployment of queue nodes
- a workload produces messages, or produces and consumes them
- an expectation checks either that queue state converges or that the queue drains
## Scenarios
- `basic_convergence` produces messages and checks that queue state converges locally
- `basic_roundtrip` produces and consumes messages locally until the queue drains
- `basic_restart_chaos` injects periodic, rotating local node restarts during the run
- `compose_convergence` and `compose_roundtrip` run the same checks in Docker Compose
## API
Each node exposes a small HTTP API (see the curl sketch after this list):
- `POST /queue/enqueue` to add a message
- `POST /queue/dequeue` to remove a message
- `GET /queue/state` to inspect the current queue state
- `GET /internal/snapshot` to read the local replicated state
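As with the kv store, these endpoints are plain JSON over HTTP and easy to poke manually. An illustrative sketch, assuming a node on `127.0.0.1:8080` and the request/response shapes from `server.rs`:
```bash
# Enqueue one message.
curl -s -X POST http://127.0.0.1:8080/queue/enqueue \
  -H 'Content-Type: application/json' \
  -d '{"payload": "hello"}'
# => {"accepted":true,"id":1,"queue_len":1,"revision":{"version":1,"origin":0}}

# Dequeue it again (no request body).
curl -s -X POST http://127.0.0.1:8080/queue/dequeue
# => {"message":{"id":1,"payload":"hello"},"queue_len":0,"revision":{"version":2,"origin":0}}
```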
## Run locally
```bash
cargo run -p queue-examples --bin basic_convergence
cargo run -p queue-examples --bin basic_roundtrip
cargo run -p queue-examples --bin basic_restart_chaos
```
## Run with Docker Compose
```bash
cargo run -p queue-examples --bin compose_convergence
cargo run -p queue-examples --bin compose_roundtrip
```
Set `QUEUE_IMAGE` to override the default compose image tag.


@ -0,0 +1,16 @@
[package]
edition.workspace = true
license.workspace = true
name = "queue-examples"
version.workspace = true
[dependencies]
anyhow = "1.0"
async-trait = { workspace = true }
queue-runtime-ext = { path = "../testing/integration" }
queue-runtime-workloads = { path = "../testing/workloads" }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }


@ -0,0 +1,32 @@
use std::time::Duration;
use queue_runtime_ext::QueueLocalDeployer;
use queue_runtime_workloads::{
QueueBuilderExt, QueueConverges, QueueProduceWorkload, QueueScenarioBuilder, QueueTopology,
};
use testing_framework_core::scenario::Deployer;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let operations = 300;
let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3))
.with_run_duration(Duration::from_secs(30))
.with_workload(
QueueProduceWorkload::new()
.operations(operations)
.rate_per_sec(30)
.payload_prefix("demo"),
)
.with_expectation(QueueConverges::new(operations).timeout(Duration::from_secs(25)))
.build()?;
let deployer = QueueLocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
runner.run(&mut scenario).await?;
Ok(())
}


@ -0,0 +1,84 @@
use std::time::Duration;
use async_trait::async_trait;
use queue_runtime_ext::QueueLocalDeployer;
use queue_runtime_workloads::{
QueueBuilderExt, QueueConverges, QueueProduceWorkload, QueueScenarioBuilder, QueueTopology,
};
use testing_framework_core::{
scenario::{Deployer, DynError, RunContext, Workload},
topology::DeploymentDescriptor,
};
use tracing::info;
#[derive(Clone)]
struct FixedRestartChaosWorkload {
restarts: usize,
delay: Duration,
}
impl FixedRestartChaosWorkload {
const fn new(restarts: usize, delay: Duration) -> Self {
Self { restarts, delay }
}
}
#[async_trait]
impl Workload<queue_runtime_workloads::QueueEnv> for FixedRestartChaosWorkload {
fn name(&self) -> &str {
"fixed_restart_chaos"
}
async fn start(
&self,
ctx: &RunContext<queue_runtime_workloads::QueueEnv>,
) -> Result<(), DynError> {
let Some(control) = ctx.node_control() else {
return Err("fixed restart chaos requires node control".into());
};
let node_count = ctx.descriptors().node_count();
if node_count == 0 {
return Err("fixed restart chaos requires at least one node".into());
}
for step in 0..self.restarts {
tokio::time::sleep(self.delay).await;
// Rotate restarts over node-1..node-(N-1); node-0, which the produce
// workload writes through, is spared unless it is the only node.
let target_index = if node_count > 1 {
(step % (node_count - 1)) + 1
} else {
0
};
let target = format!("node-{target_index}");
info!(step, %target, "triggering controlled chaos restart");
control.restart_node(&target).await?;
}
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3))
.enable_node_control()
.with_workload(FixedRestartChaosWorkload::new(3, Duration::from_secs(8)))
.with_run_duration(Duration::from_secs(30))
.with_workload(
QueueProduceWorkload::new()
.operations(400)
.rate_per_sec(40)
.payload_prefix("queue-chaos"),
)
.with_expectation(QueueConverges::new(200).timeout(Duration::from_secs(30)))
.build()?;
let deployer = QueueLocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
runner.run(&mut scenario).await?;
Ok(())
}


@ -0,0 +1,31 @@
use std::time::Duration;
use queue_runtime_ext::QueueLocalDeployer;
use queue_runtime_workloads::{
QueueBuilderExt, QueueDrained, QueueRoundTripWorkload, QueueScenarioBuilder, QueueTopology,
};
use testing_framework_core::scenario::Deployer;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let operations = 200;
let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3))
.with_run_duration(Duration::from_secs(30))
.with_workload(
QueueRoundTripWorkload::new()
.operations(operations)
.rate_per_sec(25),
)
.with_expectation(QueueDrained::new().timeout(Duration::from_secs(25)))
.build()?;
let deployer = QueueLocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
runner.run(&mut scenario).await?;
Ok(())
}


@ -0,0 +1,47 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use queue_runtime_workloads::{
QueueBuilderExt, QueueConverges, QueueProduceWorkload, QueueScenarioBuilder, QueueTopology,
};
use testing_framework_core::scenario::Deployer;
use testing_framework_runner_compose::ComposeRunnerError;
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let operations = 200;
let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3))
.with_run_duration(Duration::from_secs(30))
.with_workload(
QueueProduceWorkload::new()
.operations(operations)
.rate_per_sec(20),
)
.with_expectation(QueueConverges::new(operations).timeout(Duration::from_secs(25)))
.build()?;
let deployer = queue_runtime_ext::QueueComposeDeployer::new();
let runner = match deployer.deploy(&scenario).await {
Ok(runner) => runner,
Err(ComposeRunnerError::DockerUnavailable) => {
warn!("docker unavailable; skipping compose queue run");
return Ok(());
}
Err(error) => {
return Err(anyhow::Error::new(error)).context("deploying queue compose stack");
}
};
info!("running queue compose convergence scenario");
runner
.run(&mut scenario)
.await
.context("running queue compose scenario")?;
Ok(())
}


@ -0,0 +1,48 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use queue_runtime_workloads::{
QueueBuilderExt, QueueDrained, QueueRoundTripWorkload, QueueScenarioBuilder, QueueTopology,
};
use testing_framework_core::scenario::Deployer;
use testing_framework_runner_compose::ComposeRunnerError;
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let operations = 200;
let mut scenario = QueueScenarioBuilder::deployment_with(|_| QueueTopology::new(3))
.with_run_duration(Duration::from_secs(30))
.with_workload(
QueueRoundTripWorkload::new()
.operations(operations)
.rate_per_sec(20),
)
.with_expectation(QueueDrained::new().timeout(Duration::from_secs(25)))
.build()?;
let deployer = queue_runtime_ext::QueueComposeDeployer::new();
let runner = match deployer.deploy(&scenario).await {
Ok(runner) => runner,
Err(ComposeRunnerError::DockerUnavailable) => {
warn!("docker unavailable; skipping compose queue roundtrip run");
return Ok(());
}
Err(error) => {
return Err(anyhow::Error::new(error))
.context("deploying queue compose roundtrip stack");
}
};
info!("running queue compose roundtrip scenario");
runner
.run(&mut scenario)
.await
.context("running queue compose roundtrip scenario")?;
Ok(())
}


@ -0,0 +1,21 @@
[package]
edition.workspace = true
license.workspace = true
name = "queue-node"
version.workspace = true
[[bin]]
name = "queue-node"
path = "src/main.rs"
[dependencies]
anyhow = "1.0"
axum = "0.7"
clap = { version = "4.0", features = ["derive"] }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
serde_yaml = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tower-http = { version = "0.6", features = ["trace"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }


@ -0,0 +1,29 @@
use std::{fs, path::Path};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PeerInfo {
pub node_id: u64,
pub http_address: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QueueConfig {
pub node_id: u64,
pub http_port: u16,
pub peers: Vec<PeerInfo>,
#[serde(default = "default_sync_interval_ms")]
pub sync_interval_ms: u64,
}
impl QueueConfig {
pub fn load(path: &Path) -> anyhow::Result<Self> {
let raw = fs::read_to_string(path)?;
Ok(serde_yaml::from_str(&raw)?)
}
}
const fn default_sync_interval_ms() -> u64 {
1000
}


@ -0,0 +1,36 @@
mod config;
mod server;
mod state;
mod sync;
use std::path::PathBuf;
use clap::Parser;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use crate::{config::QueueConfig, state::QueueState, sync::SyncService};
#[derive(Parser, Debug)]
#[command(name = "queue-node")]
struct Args {
#[arg(short, long)]
config: PathBuf,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "queue_node=info,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let args = Args::parse();
let config = QueueConfig::load(&args.config)?;
let state = QueueState::new(config.node_id);
SyncService::new(config.clone(), state.clone()).start();
server::start_server(config, state).await
}


@ -0,0 +1,115 @@
use std::net::SocketAddr;
use axum::{
Router,
extract::State,
http::StatusCode,
response::Json,
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use tower_http::trace::TraceLayer;
use crate::{
config::QueueConfig,
state::{QueueMessage, QueueRevision, QueueState, QueueStateView, Snapshot},
};
#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
}
#[derive(Deserialize)]
struct EnqueueRequest {
payload: String,
}
#[derive(Serialize)]
struct EnqueueResponse {
accepted: bool,
id: u64,
queue_len: usize,
revision: QueueRevision,
}
#[derive(Serialize)]
struct DequeueResponse {
message: Option<QueueMessage>,
queue_len: usize,
revision: QueueRevision,
}
pub async fn start_server(config: QueueConfig, state: QueueState) -> anyhow::Result<()> {
let app = Router::new()
.route("/health/live", get(health_live))
.route("/health/ready", get(health_ready))
.route("/queue/enqueue", post(enqueue))
.route("/queue/dequeue", post(dequeue))
.route("/queue/state", get(queue_state))
.route("/internal/snapshot", get(get_snapshot))
.layer(TraceLayer::new_for_http())
.with_state(state.clone());
let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port));
let listener = tokio::net::TcpListener::bind(addr).await?;
state.set_ready(true).await;
tracing::info!(node_id = state.node_id(), %addr, "queue node ready");
axum::serve(listener, app).await?;
Ok(())
}
async fn health_live() -> (StatusCode, Json<HealthResponse>) {
(StatusCode::OK, Json(HealthResponse { status: "alive" }))
}
async fn health_ready(State(state): State<QueueState>) -> (StatusCode, Json<HealthResponse>) {
if state.is_ready().await {
(StatusCode::OK, Json(HealthResponse { status: "ready" }))
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not-ready",
}),
)
}
}
async fn enqueue(
State(state): State<QueueState>,
Json(request): Json<EnqueueRequest>,
) -> (StatusCode, Json<EnqueueResponse>) {
let outcome = state.enqueue_local(request.payload).await;
(
StatusCode::OK,
Json(EnqueueResponse {
accepted: outcome.accepted,
id: outcome.id,
queue_len: outcome.queue_len,
revision: outcome.revision,
}),
)
}
async fn dequeue(State(state): State<QueueState>) -> (StatusCode, Json<DequeueResponse>) {
let outcome = state.dequeue_local().await;
(
StatusCode::OK,
Json(DequeueResponse {
message: outcome.message,
queue_len: outcome.queue_len,
revision: outcome.revision,
}),
)
}
async fn queue_state(State(state): State<QueueState>) -> Json<QueueStateView> {
Json(state.queue_state().await)
}
async fn get_snapshot(State(state): State<QueueState>) -> Json<Snapshot> {
Json(state.snapshot().await)
}


@ -0,0 +1,151 @@
use std::{collections::VecDeque, sync::Arc};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct QueueRevision {
pub version: u64,
pub origin: u64,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct QueueMessage {
pub id: u64,
pub payload: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Snapshot {
pub node_id: u64,
pub revision: QueueRevision,
pub messages: Vec<QueueMessage>,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct QueueStateView {
pub revision: QueueRevision,
pub queue_len: usize,
pub head_id: Option<u64>,
pub tail_id: Option<u64>,
}
#[derive(Clone, Debug)]
pub struct EnqueueOutcome {
pub accepted: bool,
pub id: u64,
pub queue_len: usize,
pub revision: QueueRevision,
}
#[derive(Clone, Debug)]
pub struct DequeueOutcome {
pub message: Option<QueueMessage>,
pub queue_len: usize,
pub revision: QueueRevision,
}
#[derive(Debug, Default)]
struct QueueData {
revision: QueueRevision,
messages: VecDeque<QueueMessage>,
}
#[derive(Clone)]
pub struct QueueState {
node_id: u64,
ready: Arc<RwLock<bool>>,
data: Arc<RwLock<QueueData>>,
}
impl QueueState {
pub fn new(node_id: u64) -> Self {
Self {
node_id,
ready: Arc::new(RwLock::new(false)),
data: Arc::new(RwLock::new(QueueData::default())),
}
}
pub const fn node_id(&self) -> u64 {
self.node_id
}
pub async fn set_ready(&self, value: bool) {
*self.ready.write().await = value;
}
pub async fn is_ready(&self) -> bool {
*self.ready.read().await
}
pub async fn enqueue_local(&self, payload: String) -> EnqueueOutcome {
let mut data = self.data.write().await;
let id = next_message_id(&data.messages);
data.messages.push_back(QueueMessage { id, payload });
bump_revision(&mut data.revision, self.node_id);
EnqueueOutcome {
accepted: true,
id,
queue_len: data.messages.len(),
revision: data.revision,
}
}
pub async fn dequeue_local(&self) -> DequeueOutcome {
let mut data = self.data.write().await;
let message = data.messages.pop_front();
if message.is_some() {
bump_revision(&mut data.revision, self.node_id);
}
DequeueOutcome {
message,
queue_len: data.messages.len(),
revision: data.revision,
}
}
pub async fn queue_state(&self) -> QueueStateView {
let data = self.data.read().await;
QueueStateView {
revision: data.revision,
queue_len: data.messages.len(),
head_id: data.messages.front().map(|message| message.id),
tail_id: data.messages.back().map(|message| message.id),
}
}
pub async fn merge_snapshot(&self, snapshot: Snapshot) {
let mut data = self.data.write().await;
// Coarse-grained convergence: a snapshot with a newer revision replaces the
// entire local queue; there is no per-message merge (unlike the kv store).
if is_newer_revision(snapshot.revision, data.revision) {
data.revision = snapshot.revision;
data.messages = snapshot.messages.into();
}
}
pub async fn snapshot(&self) -> Snapshot {
let data = self.data.read().await;
Snapshot {
node_id: self.node_id,
revision: data.revision,
messages: data.messages.iter().cloned().collect(),
}
}
}
fn next_message_id(messages: &VecDeque<QueueMessage>) -> u64 {
messages
.back()
.map_or(1, |message| message.id.saturating_add(1))
}
fn bump_revision(revision: &mut QueueRevision, node_id: u64) {
revision.version = revision.version.saturating_add(1);
revision.origin = node_id;
}
fn is_newer_revision(candidate: QueueRevision, existing: QueueRevision) -> bool {
(candidate.version, candidate.origin) > (existing.version, existing.origin)
}


@ -0,0 +1,103 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use reqwest::Client;
use tokio::sync::Mutex;
use tracing::{debug, warn};
use crate::{
config::QueueConfig,
state::{QueueState, Snapshot},
};
const WARN_AFTER_CONSECUTIVE_FAILURES: u32 = 5;
#[derive(Clone)]
pub struct SyncService {
config: Arc<QueueConfig>,
state: QueueState,
client: Client,
failures_by_peer: Arc<Mutex<HashMap<String, u32>>>,
}
impl SyncService {
pub fn new(config: QueueConfig, state: QueueState) -> Self {
Self {
config: Arc::new(config),
state,
client: Client::new(),
failures_by_peer: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn start(&self) {
let service = self.clone();
tokio::spawn(async move {
service.run().await;
});
}
async fn run(self) {
let interval = Duration::from_millis(self.config.sync_interval_ms.max(100));
loop {
self.sync_once().await;
tokio::time::sleep(interval).await;
}
}
async fn sync_once(&self) {
for peer in &self.config.peers {
match self.fetch_snapshot(&peer.http_address).await {
Ok(snapshot) => {
self.state.merge_snapshot(snapshot).await;
self.clear_failure_counter(&peer.http_address).await;
}
Err(error) => {
self.record_sync_failure(&peer.http_address, &error).await;
}
}
}
}
async fn fetch_snapshot(&self, peer_address: &str) -> anyhow::Result<Snapshot> {
let url = format!("http://{peer_address}/internal/snapshot");
let snapshot = self
.client
.get(url)
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(snapshot)
}
async fn clear_failure_counter(&self, peer_address: &str) {
let mut failures = self.failures_by_peer.lock().await;
failures.remove(peer_address);
}
async fn record_sync_failure(&self, peer_address: &str, error: &anyhow::Error) {
let consecutive_failures = {
let mut failures = self.failures_by_peer.lock().await;
let entry = failures.entry(peer_address.to_owned()).or_insert(0);
*entry += 1;
*entry
};
if consecutive_failures >= WARN_AFTER_CONSECUTIVE_FAILURES {
warn!(
peer = %peer_address,
%error,
consecutive_failures,
"queue sync repeatedly failing"
);
} else {
debug!(
peer = %peer_address,
%error,
consecutive_failures,
"queue sync failed"
);
}
}
}


@ -0,0 +1,13 @@
[package]
edition.workspace = true
license.workspace = true
name = "queue-runtime-ext"
version.workspace = true
[dependencies]
async-trait = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-local = { workspace = true }

View File

@ -0,0 +1,114 @@
use std::io::Error;
use async_trait::async_trait;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use testing_framework_core::scenario::{
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView,
DefaultFeedRuntime, DynError, NodeAccess, serialize_cluster_yaml_config,
};
pub type QueueTopology = testing_framework_core::topology::ClusterTopology;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QueuePeerInfo {
pub node_id: u64,
pub http_address: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QueueNodeConfig {
pub node_id: u64,
pub http_port: u16,
pub peers: Vec<QueuePeerInfo>,
pub sync_interval_ms: u64,
}
#[derive(Clone)]
pub struct QueueHttpClient {
base_url: Url,
client: reqwest::Client,
}
impl QueueHttpClient {
#[must_use]
pub fn new(base_url: Url) -> Self {
Self {
base_url,
client: reqwest::Client::new(),
}
}
pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T, DynError> {
let url = self.base_url.join(path)?;
let response = self.client.get(url).send().await?.error_for_status()?;
Ok(response.json().await?)
}
pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
&self,
path: &str,
body: &B,
) -> Result<T, DynError> {
let url = self.base_url.join(path)?;
let response = self
.client
.post(url)
.json(body)
.send()
.await?
.error_for_status()?;
Ok(response.json().await?)
}
}
pub struct QueueEnv;
#[async_trait]
impl Application for QueueEnv {
type Deployment = QueueTopology;
type NodeClient = QueueHttpClient;
type NodeConfig = QueueNodeConfig;
type FeedRuntime = DefaultFeedRuntime;
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
Ok(QueueHttpClient::new(access.api_base_url()?))
}
fn node_readiness_path() -> &'static str {
"/health/ready"
}
}
impl ClusterNodeConfigApplication for QueueEnv {
type ConfigError = Error;
fn static_network_port() -> u16 {
8080
}
fn build_cluster_node_config(
node: &ClusterNodeView,
peers: &[ClusterPeerView],
) -> Result<Self::NodeConfig, Self::ConfigError> {
let peers = peers
.iter()
.map(|peer| QueuePeerInfo {
node_id: peer.index() as u64,
http_address: peer.authority(),
})
.collect::<Vec<_>>();
Ok(QueueNodeConfig {
node_id: node.index() as u64,
http_port: node.network_port(),
peers,
sync_interval_ms: 500,
})
}
fn serialize_cluster_node_config(
config: &Self::NodeConfig,
) -> Result<String, Self::ConfigError> {
serialize_cluster_yaml_config(config).map_err(Error::other)
}
}

View File

@ -0,0 +1,15 @@
use testing_framework_runner_compose::{BinaryConfigNodeSpec, ComposeBinaryApp};
use crate::QueueEnv;
const NODE_CONFIG_PATH: &str = "/etc/queue/config.yaml";
impl ComposeBinaryApp for QueueEnv {
fn compose_node_spec() -> BinaryConfigNodeSpec {
BinaryConfigNodeSpec::conventional(
"/usr/local/bin/queue-node",
NODE_CONFIG_PATH,
vec![8080, 8081],
)
}
}

View File

@ -0,0 +1,10 @@
mod app;
mod compose_env;
mod local_env;
pub mod scenario;
pub use app::*;
pub use scenario::{QueueBuilderExt, QueueScenarioBuilder};
pub type QueueLocalDeployer = testing_framework_runner_local::ProcessDeployer<QueueEnv>;
pub type QueueComposeDeployer = testing_framework_runner_compose::ComposeDeployer<QueueEnv>;

View File

@ -0,0 +1,41 @@
use std::collections::HashMap;
use testing_framework_core::scenario::{DynError, StartNodeOptions};
use testing_framework_runner_local::{
LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec,
build_local_cluster_node_config, yaml_node_config,
};
use crate::{QueueEnv, QueueNodeConfig};
impl LocalBinaryApp for QueueEnv {
fn initial_node_name_prefix() -> &'static str {
"queue-node"
}
fn build_local_node_config_with_peers(
_topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peers: &[LocalPeerNode],
_peer_ports_by_name: &HashMap<String, u16>,
_options: &StartNodeOptions<Self>,
_template_config: Option<
&<Self as testing_framework_core::scenario::Application>::NodeConfig,
>,
) -> Result<<Self as testing_framework_core::scenario::Application>::NodeConfig, DynError> {
build_local_cluster_node_config::<Self>(index, ports, peers)
}
fn local_process_spec() -> LocalProcessSpec {
LocalProcessSpec::new("QUEUE_NODE_BIN", "queue-node").with_rust_log("queue_node=info")
}
fn render_local_config(config: &QueueNodeConfig) -> Result<Vec<u8>, DynError> {
yaml_node_config(config)
}
fn http_api_port(config: &QueueNodeConfig) -> u16 {
config.http_port
}
}

View File

@ -0,0 +1,15 @@
use testing_framework_core::scenario::ScenarioBuilder;
use crate::{QueueEnv, QueueTopology};
pub type QueueScenarioBuilder = ScenarioBuilder<QueueEnv>;
pub trait QueueBuilderExt: Sized {
fn deployment_with(f: impl FnOnce(QueueTopology) -> QueueTopology) -> Self;
}
impl QueueBuilderExt for QueueScenarioBuilder {
fn deployment_with(f: impl FnOnce(QueueTopology) -> QueueTopology) -> Self {
QueueScenarioBuilder::with_deployment(f(QueueTopology::new(3)))
}
}

View File

@ -0,0 +1,13 @@
[package]
edition.workspace = true
license.workspace = true
name = "queue-runtime-workloads"
version.workspace = true
[dependencies]
async-trait = { workspace = true }
queue-runtime-ext = { path = "../integration" }
serde = { workspace = true }
testing-framework-core = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

View File

@ -0,0 +1,106 @@
use std::time::Duration;
use async_trait::async_trait;
use queue_runtime_ext::QueueEnv;
use serde::Deserialize;
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use tracing::info;
#[derive(Clone)]
pub struct QueueDrained {
timeout: Duration,
poll_interval: Duration,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct QueueRevision {
version: u64,
origin: u64,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct QueueStateResponse {
revision: QueueRevision,
queue_len: usize,
head_id: Option<u64>,
tail_id: Option<u64>,
}
impl QueueDrained {
#[must_use]
pub fn new() -> Self {
Self {
timeout: Duration::from_secs(20),
poll_interval: Duration::from_millis(500),
}
}
#[must_use]
pub const fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}
impl Default for QueueDrained {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Expectation<QueueEnv> for QueueDrained {
fn name(&self) -> &str {
"queue_drained"
}
async fn evaluate(&mut self, ctx: &RunContext<QueueEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
if clients.is_empty() {
return Err("no queue node clients available".into());
}
let deadline = tokio::time::Instant::now() + self.timeout;
while tokio::time::Instant::now() < deadline {
if is_drained_and_converged(&clients).await? {
info!("queue drained and converged");
return Ok(());
}
tokio::time::sleep(self.poll_interval).await;
}
Err(format!("queue not drained within {:?}", self.timeout).into())
}
}
async fn is_drained_and_converged(
clients: &[queue_runtime_ext::QueueHttpClient],
) -> Result<bool, DynError> {
let Some((first, rest)) = clients.split_first() else {
return Ok(false);
};
let baseline = read_state(first).await?;
if !is_drained(&baseline) {
return Ok(false);
}
for client in rest {
let current = read_state(client).await?;
if current != baseline {
return Ok(false);
}
}
Ok(true)
}
fn is_drained(state: &QueueStateResponse) -> bool {
state.queue_len == 0 && state.head_id.is_none() && state.tail_id.is_none()
}
async fn read_state(
client: &queue_runtime_ext::QueueHttpClient,
) -> Result<QueueStateResponse, DynError> {
client.get("/queue/state").await
}

View File

@ -0,0 +1,108 @@
use std::time::Duration;
use async_trait::async_trait;
use queue_runtime_ext::QueueEnv;
use serde::Deserialize;
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use tracing::info;
#[derive(Clone)]
pub struct QueueConverges {
min_queue_len: usize,
timeout: Duration,
poll_interval: Duration,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct QueueRevision {
version: u64,
origin: u64,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct QueueStateResponse {
revision: QueueRevision,
queue_len: usize,
head_id: Option<u64>,
tail_id: Option<u64>,
}
impl QueueConverges {
#[must_use]
pub fn new(min_queue_len: usize) -> Self {
Self {
min_queue_len,
timeout: Duration::from_secs(20),
poll_interval: Duration::from_millis(500),
}
}
#[must_use]
pub const fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}
#[async_trait]
impl Expectation<QueueEnv> for QueueConverges {
fn name(&self) -> &str {
"queue_converges"
}
async fn evaluate(&mut self, ctx: &RunContext<QueueEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
if clients.is_empty() {
return Err("no queue node clients available".into());
}
let deadline = tokio::time::Instant::now() + self.timeout;
while tokio::time::Instant::now() < deadline {
if self.is_converged(&clients).await? {
info!(
min_queue_len = self.min_queue_len,
"queue convergence reached"
);
return Ok(());
}
tokio::time::sleep(self.poll_interval).await;
}
Err(format!(
"queue convergence not reached within {:?} (min_queue_len={})",
self.timeout, self.min_queue_len
)
.into())
}
}
impl QueueConverges {
async fn is_converged(
&self,
clients: &[queue_runtime_ext::QueueHttpClient],
) -> Result<bool, DynError> {
let Some((first, rest)) = clients.split_first() else {
return Ok(false);
};
let baseline = read_state(first).await?;
if baseline.queue_len < self.min_queue_len {
return Ok(false);
}
for client in rest {
let current = read_state(client).await?;
if current != baseline {
return Ok(false);
}
}
Ok(true)
}
}
async fn read_state(
client: &queue_runtime_ext::QueueHttpClient,
) -> Result<QueueStateResponse, DynError> {
client.get("/queue/state").await
}

View File

@ -0,0 +1,10 @@
mod drained;
mod expectations;
mod produce;
mod roundtrip;
pub use drained::QueueDrained;
pub use expectations::QueueConverges;
pub use produce::QueueProduceWorkload;
pub use queue_runtime_ext::{QueueBuilderExt, QueueEnv, QueueScenarioBuilder, QueueTopology};
pub use roundtrip::QueueRoundTripWorkload;

View File

@ -0,0 +1,116 @@
use std::time::Duration;
use async_trait::async_trait;
use queue_runtime_ext::QueueEnv;
use serde::{Deserialize, Serialize};
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use tracing::info;
#[derive(Clone)]
pub struct QueueProduceWorkload {
operations: usize,
rate_per_sec: Option<usize>,
payload_prefix: String,
}
#[derive(Serialize)]
struct EnqueueRequest {
payload: String,
}
#[derive(Deserialize)]
struct EnqueueResponse {
accepted: bool,
id: u64,
queue_len: usize,
}
impl QueueProduceWorkload {
#[must_use]
pub fn new() -> Self {
Self {
operations: 200,
rate_per_sec: Some(25),
payload_prefix: "queue-demo".to_owned(),
}
}
#[must_use]
pub const fn operations(mut self, value: usize) -> Self {
self.operations = value;
self
}
#[must_use]
pub const fn rate_per_sec(mut self, value: usize) -> Self {
self.rate_per_sec = Some(value);
self
}
#[must_use]
pub fn payload_prefix(mut self, value: impl Into<String>) -> Self {
self.payload_prefix = value.into();
self
}
}
impl Default for QueueProduceWorkload {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Workload<QueueEnv> for QueueProduceWorkload {
fn name(&self) -> &str {
"queue_produce_workload"
}
async fn start(&self, ctx: &RunContext<QueueEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
let Some(producer) = clients.first() else {
return Err("no queue node clients available".into());
};
let interval = self.rate_per_sec.and_then(compute_interval);
info!(
operations = self.operations,
rate_per_sec = ?self.rate_per_sec,
"starting queue produce workload"
);
for idx in 0..self.operations {
let payload = format!("{}-{idx}", self.payload_prefix);
let response: EnqueueResponse = producer
.post("/queue/enqueue", &EnqueueRequest { payload })
.await?;
if !response.accepted {
return Err(format!("node rejected enqueue at operation {idx}").into());
}
if (idx + 1) % 25 == 0 {
info!(
completed = idx + 1,
last_id = response.id,
queue_len = response.queue_len,
"queue produce progress"
);
}
if let Some(delay) = interval {
tokio::time::sleep(delay).await;
}
}
Ok(())
}
}
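// Delay between enqueues for the requested rate; a rate of zero means unthrottled.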
fn compute_interval(rate_per_sec: usize) -> Option<Duration> {
if rate_per_sec == 0 {
return None;
}
Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1)))
}

View File

@ -0,0 +1,179 @@
use std::{collections::HashSet, time::Duration};
use async_trait::async_trait;
use queue_runtime_ext::QueueEnv;
use serde::{Deserialize, Serialize};
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use tokio::time::{Instant, sleep};
use tracing::info;
#[derive(Clone)]
pub struct QueueRoundTripWorkload {
operations: usize,
rate_per_sec: Option<usize>,
payload_prefix: String,
drain_timeout: Duration,
empty_poll_interval: Duration,
}
#[derive(Serialize)]
struct EnqueueRequest {
payload: String,
}
#[derive(Deserialize)]
struct EnqueueResponse {
accepted: bool,
id: u64,
}
#[derive(Serialize)]
struct DequeueRequest {}
#[derive(Deserialize)]
struct QueueMessage {
id: u64,
payload: String,
}
#[derive(Deserialize)]
struct DequeueResponse {
message: Option<QueueMessage>,
}
impl QueueRoundTripWorkload {
#[must_use]
pub fn new() -> Self {
Self {
operations: 200,
rate_per_sec: Some(25),
payload_prefix: "queue-roundtrip".to_owned(),
drain_timeout: Duration::from_secs(20),
empty_poll_interval: Duration::from_millis(100),
}
}
#[must_use]
pub const fn operations(mut self, value: usize) -> Self {
self.operations = value;
self
}
#[must_use]
pub const fn rate_per_sec(mut self, value: usize) -> Self {
self.rate_per_sec = Some(value);
self
}
#[must_use]
pub fn payload_prefix(mut self, value: impl Into<String>) -> Self {
self.payload_prefix = value.into();
self
}
#[must_use]
pub const fn drain_timeout(mut self, value: Duration) -> Self {
self.drain_timeout = value;
self
}
}
impl Default for QueueRoundTripWorkload {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Workload<QueueEnv> for QueueRoundTripWorkload {
fn name(&self) -> &str {
"queue_roundtrip_workload"
}
async fn start(&self, ctx: &RunContext<QueueEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
let Some(driver) = clients.first() else {
return Err("no queue node clients available".into());
};
let interval = self.rate_per_sec.and_then(compute_interval);
let mut produced_ids = HashSet::with_capacity(self.operations);
info!(
operations = self.operations,
"queue roundtrip: produce phase"
);
for idx in 0..self.operations {
let payload = format!("{}-{idx}", self.payload_prefix);
let response: EnqueueResponse = driver
.post("/queue/enqueue", &EnqueueRequest { payload })
.await?;
if !response.accepted {
return Err(format!("enqueue rejected at operation {idx}").into());
}
if !produced_ids.insert(response.id) {
return Err(format!("duplicate enqueue id observed: {}", response.id).into());
}
if let Some(delay) = interval {
sleep(delay).await;
}
}
info!(
operations = self.operations,
"queue roundtrip: consume phase"
);
let mut consumed = 0usize;
let deadline = Instant::now() + self.drain_timeout;
while consumed < self.operations && Instant::now() < deadline {
let response: DequeueResponse =
driver.post("/queue/dequeue", &DequeueRequest {}).await?;
match response.message {
Some(message) => {
if !message.payload.starts_with(&self.payload_prefix) {
return Err(format!("unexpected payload: {}", message.payload).into());
}
if !produced_ids.remove(&message.id) {
return Err(
format!("unknown or duplicate dequeue id: {}", message.id).into()
);
}
consumed += 1;
}
None => sleep(self.empty_poll_interval).await,
}
}
if consumed != self.operations {
return Err(format!(
"queue roundtrip timed out: consumed {consumed}/{} messages",
self.operations
)
.into());
}
if !produced_ids.is_empty() {
return Err(format!(
"queue roundtrip ended with {} undrained produced ids",
produced_ids.len()
)
.into());
}
info!(operations = self.operations, "queue roundtrip finished");
Ok(())
}
}
fn compute_interval(rate_per_sec: usize) -> Option<Duration> {
if rate_per_sec == 0 {
return None;
}
Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1)))
}

View File

@ -0,0 +1,24 @@
FROM rustlang/rust:nightly-bookworm AS builder
WORKDIR /build
COPY Cargo.toml Cargo.lock ./
COPY cfgsync/ ./cfgsync/
COPY examples/ ./examples/
COPY testing-framework/ ./testing-framework/
RUN cargo build --release -p scheduler-node
FROM debian:bookworm-slim
RUN apt-get update && \
apt-get install -y ca-certificates && \
rm -rf /var/lib/apt/lists/*
COPY --from=builder /build/target/release/scheduler-node /usr/local/bin/scheduler-node
RUN mkdir -p /etc/scheduler
WORKDIR /app
ENTRYPOINT ["/usr/local/bin/scheduler-node"]
CMD ["--config", "/etc/scheduler/config.yaml"]

View File

@ -0,0 +1,45 @@
# Scheduler Example
This example runs a small replicated job scheduler with worker leases.
The scenario enqueues jobs, lets one worker claim them and then stall without
acking, and checks that another worker can reclaim and complete them once the
leases expire.
## How TF runs this
Each example follows the same pattern:
- TF starts a small deployment of scheduler nodes
- the workload drives the worker flow through the HTTP API
- the expectation checks that jobs are eventually reclaimed and completed
## Scenarios
- `basic_failover` runs the failover flow locally
- `compose_failover` runs the same flow in Docker Compose
## API
Each node exposes the following endpoints; a request sketch follows the list:
- `POST /jobs/enqueue` to add jobs
- `POST /jobs/claim` to claim pending jobs with a lease
- `POST /jobs/heartbeat` to extend a lease
- `POST /jobs/ack` to mark a job complete
- `GET /jobs/state` to inspect scheduler state
- `GET /internal/snapshot` to read the local replicated state
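These endpoints can also be driven directly. A minimal sketch, not part of this
commit, assuming a node is already listening on `127.0.0.1:8080` and that
`tokio`, `reqwest` (with the `json` feature), and `serde_json` are available:
```rust
use serde_json::{Value, json};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let base = "http://127.0.0.1:8080";

    // Enqueue one job; the node replies with its assigned id.
    let enqueued: Value = client
        .post(format!("{base}/jobs/enqueue"))
        .json(&json!({ "payload": "demo-job" }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let job_id = enqueued["id"].as_u64().expect("enqueue returns an id");

    // Claim pending jobs with a lease for worker-a.
    let claimed: Value = client
        .post(format!("{base}/jobs/claim"))
        .json(&json!({ "worker_id": "worker-a", "max_jobs": 10 }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    println!("claimed: {claimed}");

    // Ack the job so the scheduler marks it done; `ok` comes back false if
    // the lease expired and another worker reclaimed the job in the meantime.
    let acked: Value = client
        .post(format!("{base}/jobs/ack"))
        .json(&json!({ "worker_id": "worker-a", "job_id": job_id }))
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    println!("acked: {acked}");
    Ok(())
}
```
These are the same request and response shapes that the lease-failover workload
in this commit drives through `SchedulerHttpClient`.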
## Run locally
```bash
cargo run -p scheduler-examples --bin basic_failover
```
## Run with Docker Compose
```bash
cargo run -p scheduler-examples --bin compose_failover
```
Set `SCHEDULER_IMAGE` to override the default compose image tag.

View File

@ -0,0 +1,15 @@
[package]
edition.workspace = true
license.workspace = true
name = "scheduler-examples"
version.workspace = true
[dependencies]
anyhow = "1.0"
scheduler-runtime-ext = { path = "../testing/integration" }
scheduler-runtime-workloads = { path = "../testing/workloads" }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -0,0 +1,33 @@
use std::time::Duration;
use scheduler_runtime_ext::SchedulerLocalDeployer;
use scheduler_runtime_workloads::{
SchedulerBuilderExt, SchedulerDrained, SchedulerLeaseFailoverWorkload,
SchedulerScenarioBuilder, SchedulerTopology,
};
use testing_framework_core::scenario::Deployer;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let jobs = 100;
let mut scenario = SchedulerScenarioBuilder::deployment_with(|_| SchedulerTopology::new(3))
.with_run_duration(Duration::from_secs(35))
.with_workload(
SchedulerLeaseFailoverWorkload::new()
.operations(jobs)
.lease_ttl(Duration::from_secs(3))
.rate_per_sec(25),
)
.with_expectation(SchedulerDrained::new(jobs).timeout(Duration::from_secs(30)))
.build()?;
let deployer = SchedulerLocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
runner.run(&mut scenario).await?;
Ok(())
}

View File

@ -0,0 +1,49 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use scheduler_runtime_workloads::{
SchedulerBuilderExt, SchedulerDrained, SchedulerLeaseFailoverWorkload,
SchedulerScenarioBuilder, SchedulerTopology,
};
use testing_framework_core::scenario::Deployer;
use testing_framework_runner_compose::ComposeRunnerError;
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let jobs = 100;
let mut scenario = SchedulerScenarioBuilder::deployment_with(|_| SchedulerTopology::new(3))
.with_run_duration(Duration::from_secs(35))
.with_workload(
SchedulerLeaseFailoverWorkload::new()
.operations(jobs)
.lease_ttl(Duration::from_secs(3))
.rate_per_sec(20),
)
.with_expectation(SchedulerDrained::new(jobs).timeout(Duration::from_secs(30)))
.build()?;
let deployer = scheduler_runtime_ext::SchedulerComposeDeployer::new();
let runner = match deployer.deploy(&scenario).await {
Ok(runner) => runner,
Err(ComposeRunnerError::DockerUnavailable) => {
warn!("docker unavailable; skipping scheduler compose run");
return Ok(());
}
Err(error) => {
return Err(anyhow::Error::new(error)).context("deploying scheduler compose stack");
}
};
info!("running scheduler compose failover scenario");
runner
.run(&mut scenario)
.await
.context("running scheduler compose scenario")?;
Ok(())
}

View File

@ -0,0 +1,21 @@
[package]
edition.workspace = true
license.workspace = true
name = "scheduler-node"
version.workspace = true
[[bin]]
name = "scheduler-node"
path = "src/main.rs"
[dependencies]
anyhow = "1.0"
axum = "0.7"
clap = { version = "4.0", features = ["derive"] }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
serde_yaml = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tower-http = { version = "0.6", features = ["trace"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -0,0 +1,35 @@
use std::{fs, path::Path};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PeerInfo {
pub node_id: u64,
pub http_address: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchedulerConfig {
pub node_id: u64,
pub http_port: u16,
pub peers: Vec<PeerInfo>,
#[serde(default = "default_sync_interval_ms")]
pub sync_interval_ms: u64,
#[serde(default = "default_lease_ttl_ms")]
pub lease_ttl_ms: u64,
}
impl SchedulerConfig {
pub fn load(path: &Path) -> anyhow::Result<Self> {
let raw = fs::read_to_string(path)?;
Ok(serde_yaml::from_str(&raw)?)
}
}
const fn default_sync_interval_ms() -> u64 {
1000
}
const fn default_lease_ttl_ms() -> u64 {
3000
}

View File

@ -0,0 +1,36 @@
mod config;
mod server;
mod state;
mod sync;
use std::path::PathBuf;
use clap::Parser;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use crate::{config::SchedulerConfig, state::SchedulerState, sync::SyncService};
#[derive(Parser, Debug)]
#[command(name = "scheduler-node")]
struct Args {
#[arg(short, long)]
config: PathBuf,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "scheduler_node=info,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let args = Args::parse();
let config = SchedulerConfig::load(&args.config)?;
let state = SchedulerState::new(config.node_id, config.lease_ttl_ms);
SyncService::new(config.clone(), state.clone()).start();
server::start_server(config, state).await
}

View File

@ -0,0 +1,156 @@
use std::net::SocketAddr;
use axum::{
Router,
extract::State,
http::StatusCode,
response::Json,
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use tower_http::trace::TraceLayer;
use crate::{
config::SchedulerConfig,
state::{SchedulerState, Snapshot, StateView},
};
#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
}
#[derive(Deserialize)]
struct EnqueueRequest {
payload: String,
}
#[derive(Serialize)]
struct EnqueueResponse {
id: u64,
}
#[derive(Deserialize)]
struct ClaimRequest {
worker_id: String,
max_jobs: usize,
}
#[derive(Serialize)]
struct ClaimResponse {
jobs: Vec<ClaimedJob>,
}
#[derive(Serialize)]
struct ClaimedJob {
id: u64,
payload: String,
attempt: u32,
}
#[derive(Deserialize)]
struct HeartbeatRequest {
worker_id: String,
job_id: u64,
}
#[derive(Deserialize)]
struct AckRequest {
worker_id: String,
job_id: u64,
}
#[derive(Serialize)]
struct OperationResponse {
ok: bool,
}
pub async fn start_server(config: SchedulerConfig, state: SchedulerState) -> anyhow::Result<()> {
let app = Router::new()
.route("/health/live", get(health_live))
.route("/health/ready", get(health_ready))
.route("/jobs/enqueue", post(enqueue))
.route("/jobs/claim", post(claim))
.route("/jobs/heartbeat", post(heartbeat))
.route("/jobs/ack", post(ack))
.route("/jobs/state", get(state_view))
.route("/internal/snapshot", get(snapshot))
.layer(TraceLayer::new_for_http())
.with_state(state.clone());
let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port));
let listener = tokio::net::TcpListener::bind(addr).await?;
state.set_ready(true).await;
tracing::info!(node_id = state.node_id(), %addr, "scheduler node ready");
axum::serve(listener, app).await?;
Ok(())
}
async fn health_live() -> (StatusCode, Json<HealthResponse>) {
(StatusCode::OK, Json(HealthResponse { status: "alive" }))
}
async fn health_ready(State(state): State<SchedulerState>) -> (StatusCode, Json<HealthResponse>) {
if state.is_ready().await {
(StatusCode::OK, Json(HealthResponse { status: "ready" }))
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not-ready",
}),
)
}
}
async fn enqueue(
State(state): State<SchedulerState>,
Json(request): Json<EnqueueRequest>,
) -> (StatusCode, Json<EnqueueResponse>) {
let id = state.enqueue(request.payload).await;
(StatusCode::OK, Json(EnqueueResponse { id }))
}
async fn claim(
State(state): State<SchedulerState>,
Json(request): Json<ClaimRequest>,
) -> (StatusCode, Json<ClaimResponse>) {
let result = state.claim(request.worker_id, request.max_jobs).await;
let jobs = result
.jobs
.into_iter()
.map(|job| ClaimedJob {
id: job.id,
payload: job.payload,
attempt: job.attempt,
})
.collect();
(StatusCode::OK, Json(ClaimResponse { jobs }))
}
async fn heartbeat(
State(state): State<SchedulerState>,
Json(request): Json<HeartbeatRequest>,
) -> (StatusCode, Json<OperationResponse>) {
let ok = state.heartbeat(&request.worker_id, request.job_id).await;
(StatusCode::OK, Json(OperationResponse { ok }))
}
async fn ack(
State(state): State<SchedulerState>,
Json(request): Json<AckRequest>,
) -> (StatusCode, Json<OperationResponse>) {
let ok = state.ack(&request.worker_id, request.job_id).await;
(StatusCode::OK, Json(OperationResponse { ok }))
}
async fn state_view(State(state): State<SchedulerState>) -> Json<StateView> {
Json(state.state_view().await)
}
async fn snapshot(State(state): State<SchedulerState>) -> Json<Snapshot> {
Json(state.snapshot().await)
}

View File

@ -0,0 +1,249 @@
use std::{
collections::BTreeMap,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Revision {
pub version: u64,
pub origin: u64,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct JobRecord {
pub id: u64,
pub payload: String,
pub attempt: u32,
pub owner: Option<String>,
pub lease_expires_at_ms: Option<u64>,
pub done: bool,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Snapshot {
pub node_id: u64,
pub revision: Revision,
pub next_id: u64,
pub jobs: BTreeMap<u64, JobRecord>,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct StateView {
pub revision: Revision,
pub next_id: u64,
pub pending: usize,
pub leased: usize,
pub done: usize,
}
#[derive(Clone, Debug, Serialize)]
pub struct ClaimResult {
pub jobs: Vec<JobRecord>,
}
#[derive(Debug, Default)]
struct Data {
revision: Revision,
next_id: u64,
jobs: BTreeMap<u64, JobRecord>,
}
#[derive(Clone)]
pub struct SchedulerState {
node_id: u64,
ready: Arc<RwLock<bool>>,
lease_ttl_ms: u64,
data: Arc<RwLock<Data>>,
}
impl SchedulerState {
pub fn new(node_id: u64, lease_ttl_ms: u64) -> Self {
Self {
node_id,
ready: Arc::new(RwLock::new(false)),
lease_ttl_ms,
data: Arc::new(RwLock::new(Data {
next_id: 1,
..Data::default()
})),
}
}
pub const fn node_id(&self) -> u64 {
self.node_id
}
pub async fn set_ready(&self, value: bool) {
*self.ready.write().await = value;
}
pub async fn is_ready(&self) -> bool {
*self.ready.read().await
}
pub async fn enqueue(&self, payload: String) -> u64 {
let mut data = self.data.write().await;
reap_expired_leases(&mut data.jobs);
let id = data.next_id;
data.next_id = data.next_id.saturating_add(1);
data.jobs.insert(
id,
JobRecord {
id,
payload,
attempt: 0,
owner: None,
lease_expires_at_ms: None,
done: false,
},
);
bump_revision(&mut data.revision, self.node_id);
id
}
pub async fn claim(&self, worker_id: String, max_jobs: usize) -> ClaimResult {
let mut data = self.data.write().await;
reap_expired_leases(&mut data.jobs);
let now = unix_ms();
let mut claimed = Vec::new();
for job in data.jobs.values_mut() {
if claimed.len() >= max_jobs {
break;
}
if job.done || job.owner.is_some() {
continue;
}
job.attempt = job.attempt.saturating_add(1);
job.owner = Some(worker_id.clone());
job.lease_expires_at_ms = Some(now.saturating_add(self.lease_ttl_ms));
claimed.push(job.clone());
}
if !claimed.is_empty() {
bump_revision(&mut data.revision, self.node_id);
}
ClaimResult { jobs: claimed }
}
pub async fn heartbeat(&self, worker_id: &str, job_id: u64) -> bool {
let mut data = self.data.write().await;
reap_expired_leases(&mut data.jobs);
let Some(job) = data.jobs.get_mut(&job_id) else {
return false;
};
if job.done || job.owner.as_deref() != Some(worker_id) {
return false;
}
job.lease_expires_at_ms = Some(unix_ms().saturating_add(self.lease_ttl_ms));
bump_revision(&mut data.revision, self.node_id);
true
}
pub async fn ack(&self, worker_id: &str, job_id: u64) -> bool {
let mut data = self.data.write().await;
reap_expired_leases(&mut data.jobs);
let Some(job) = data.jobs.get_mut(&job_id) else {
return false;
};
if job.done || job.owner.as_deref() != Some(worker_id) {
return false;
}
job.done = true;
job.owner = None;
job.lease_expires_at_ms = None;
bump_revision(&mut data.revision, self.node_id);
true
}
pub async fn state_view(&self) -> StateView {
let data = self.data.read().await;
let mut pending = 0;
let mut leased = 0;
let mut done = 0;
for job in data.jobs.values() {
if job.done {
done += 1;
} else if job.owner.is_some() {
leased += 1;
} else {
pending += 1;
}
}
StateView {
revision: data.revision,
next_id: data.next_id,
pending,
leased,
done,
}
}
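    // Coarse-grained replication: when the peer's revision wins, adopt its
    // whole job table and id counter rather than merging per job.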
pub async fn merge_snapshot(&self, snapshot: Snapshot) {
let mut data = self.data.write().await;
if is_newer_revision(snapshot.revision, data.revision) {
data.revision = snapshot.revision;
data.next_id = snapshot.next_id;
data.jobs = snapshot.jobs;
}
}
pub async fn snapshot(&self) -> Snapshot {
let data = self.data.read().await;
Snapshot {
node_id: self.node_id,
revision: data.revision,
next_id: data.next_id,
jobs: data.jobs.clone(),
}
}
}
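// Returns jobs with lapsed leases to the pending pool so another worker can claim them.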
fn reap_expired_leases(jobs: &mut BTreeMap<u64, JobRecord>) {
let now = unix_ms();
for job in jobs.values_mut() {
if job.done {
continue;
}
if let Some(expiry) = job.lease_expires_at_ms
&& expiry <= now
{
job.owner = None;
job.lease_expires_at_ms = None;
}
}
}
fn bump_revision(revision: &mut Revision, node_id: u64) {
revision.version = revision.version.saturating_add(1);
revision.origin = node_id;
}
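// Last-writer-wins ordering: compare versions first, then break ties toward the
// higher origin node id so every replica picks the same winner.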
fn is_newer_revision(candidate: Revision, existing: Revision) -> bool {
(candidate.version, candidate.origin) > (existing.version, existing.origin)
}
fn unix_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |duration| duration.as_millis() as u64)
}

View File

@ -0,0 +1,103 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use reqwest::Client;
use tokio::sync::Mutex;
use tracing::{debug, warn};
use crate::{
config::SchedulerConfig,
state::{SchedulerState, Snapshot},
};
const WARN_AFTER_CONSECUTIVE_FAILURES: u32 = 5;
#[derive(Clone)]
pub struct SyncService {
config: Arc<SchedulerConfig>,
state: SchedulerState,
client: Client,
failures_by_peer: Arc<Mutex<HashMap<String, u32>>>,
}
impl SyncService {
pub fn new(config: SchedulerConfig, state: SchedulerState) -> Self {
Self {
config: Arc::new(config),
state,
client: Client::new(),
failures_by_peer: Arc::new(Mutex::new(HashMap::new())),
}
}
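    // Spawns the periodic sync loop as a background task; clones share
    // config, state, and failure counters through `Arc`s.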
pub fn start(&self) {
let service = self.clone();
tokio::spawn(async move {
service.run().await;
});
}
async fn run(self) {
let interval = Duration::from_millis(self.config.sync_interval_ms.max(100));
loop {
self.sync_once().await;
tokio::time::sleep(interval).await;
}
}
async fn sync_once(&self) {
for peer in &self.config.peers {
match self.fetch_snapshot(&peer.http_address).await {
Ok(snapshot) => {
self.state.merge_snapshot(snapshot).await;
self.clear_failure_counter(&peer.http_address).await;
}
Err(error) => {
self.record_sync_failure(&peer.http_address, &error).await;
}
}
}
}
async fn fetch_snapshot(&self, peer_address: &str) -> anyhow::Result<Snapshot> {
let url = format!("http://{peer_address}/internal/snapshot");
let snapshot = self
.client
.get(url)
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(snapshot)
}
async fn clear_failure_counter(&self, peer_address: &str) {
let mut failures = self.failures_by_peer.lock().await;
failures.remove(peer_address);
}
async fn record_sync_failure(&self, peer_address: &str, error: &anyhow::Error) {
let consecutive_failures = {
let mut failures = self.failures_by_peer.lock().await;
let entry = failures.entry(peer_address.to_owned()).or_insert(0);
*entry += 1;
*entry
};
if consecutive_failures >= WARN_AFTER_CONSECUTIVE_FAILURES {
warn!(
peer = %peer_address,
%error,
consecutive_failures,
"scheduler sync repeatedly failing"
);
} else {
debug!(
peer = %peer_address,
%error,
consecutive_failures,
"scheduler sync failed"
);
}
}
}

View File

@ -0,0 +1,13 @@
[package]
edition.workspace = true
license.workspace = true
name = "scheduler-runtime-ext"
version.workspace = true
[dependencies]
async-trait = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-local = { workspace = true }

View File

@ -0,0 +1,116 @@
use std::io::Error;
use async_trait::async_trait;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use testing_framework_core::scenario::{
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView,
DefaultFeedRuntime, DynError, NodeAccess, serialize_cluster_yaml_config,
};
pub type SchedulerTopology = testing_framework_core::topology::ClusterTopology;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchedulerPeerInfo {
pub node_id: u64,
pub http_address: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchedulerNodeConfig {
pub node_id: u64,
pub http_port: u16,
pub peers: Vec<SchedulerPeerInfo>,
pub sync_interval_ms: u64,
pub lease_ttl_ms: u64,
}
#[derive(Clone)]
pub struct SchedulerHttpClient {
base_url: Url,
client: reqwest::Client,
}
impl SchedulerHttpClient {
#[must_use]
pub fn new(base_url: Url) -> Self {
Self {
base_url,
client: reqwest::Client::new(),
}
}
pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T, DynError> {
let url = self.base_url.join(path)?;
let response = self.client.get(url).send().await?.error_for_status()?;
Ok(response.json().await?)
}
pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
&self,
path: &str,
body: &B,
) -> Result<T, DynError> {
let url = self.base_url.join(path)?;
let response = self
.client
.post(url)
.json(body)
.send()
.await?
.error_for_status()?;
Ok(response.json().await?)
}
}
pub struct SchedulerEnv;
#[async_trait]
impl Application for SchedulerEnv {
type Deployment = SchedulerTopology;
type NodeClient = SchedulerHttpClient;
type NodeConfig = SchedulerNodeConfig;
type FeedRuntime = DefaultFeedRuntime;
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
Ok(SchedulerHttpClient::new(access.api_base_url()?))
}
fn node_readiness_path() -> &'static str {
"/health/ready"
}
}
impl ClusterNodeConfigApplication for SchedulerEnv {
type ConfigError = Error;
fn static_network_port() -> u16 {
8080
}
fn build_cluster_node_config(
node: &ClusterNodeView,
peers: &[ClusterPeerView],
) -> Result<Self::NodeConfig, Self::ConfigError> {
let peers = peers
.iter()
.map(|peer| SchedulerPeerInfo {
node_id: peer.index() as u64,
http_address: peer.authority(),
})
.collect::<Vec<_>>();
Ok(SchedulerNodeConfig {
node_id: node.index() as u64,
http_port: node.network_port(),
peers,
sync_interval_ms: 500,
lease_ttl_ms: 3000,
})
}
fn serialize_cluster_node_config(
config: &Self::NodeConfig,
) -> Result<String, Self::ConfigError> {
serialize_cluster_yaml_config(config).map_err(Error::other)
}
}

View File

@ -0,0 +1,15 @@
use testing_framework_runner_compose::{BinaryConfigNodeSpec, ComposeBinaryApp};
use crate::SchedulerEnv;
const NODE_CONFIG_PATH: &str = "/etc/scheduler/config.yaml";
impl ComposeBinaryApp for SchedulerEnv {
fn compose_node_spec() -> BinaryConfigNodeSpec {
BinaryConfigNodeSpec::conventional(
"/usr/local/bin/scheduler-node",
NODE_CONFIG_PATH,
vec![8080, 8081],
)
}
}

View File

@ -0,0 +1,10 @@
mod app;
mod compose_env;
mod local_env;
pub mod scenario;
pub use app::*;
pub use scenario::{SchedulerBuilderExt, SchedulerScenarioBuilder};
pub type SchedulerLocalDeployer = testing_framework_runner_local::ProcessDeployer<SchedulerEnv>;
pub type SchedulerComposeDeployer = testing_framework_runner_compose::ComposeDeployer<SchedulerEnv>;

View File

@ -0,0 +1,42 @@
use std::collections::HashMap;
use testing_framework_core::scenario::{DynError, StartNodeOptions};
use testing_framework_runner_local::{
LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec,
build_local_cluster_node_config, yaml_node_config,
};
use crate::{SchedulerEnv, SchedulerNodeConfig};
impl LocalBinaryApp for SchedulerEnv {
fn initial_node_name_prefix() -> &'static str {
"scheduler-node"
}
fn build_local_node_config_with_peers(
_topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peers: &[LocalPeerNode],
_peer_ports_by_name: &HashMap<String, u16>,
_options: &StartNodeOptions<Self>,
_template_config: Option<
&<Self as testing_framework_core::scenario::Application>::NodeConfig,
>,
) -> Result<<Self as testing_framework_core::scenario::Application>::NodeConfig, DynError> {
build_local_cluster_node_config::<Self>(index, ports, peers)
}
fn local_process_spec() -> LocalProcessSpec {
LocalProcessSpec::new("SCHEDULER_NODE_BIN", "scheduler-node")
.with_rust_log("scheduler_node=info")
}
fn render_local_config(config: &SchedulerNodeConfig) -> Result<Vec<u8>, DynError> {
yaml_node_config(config)
}
fn http_api_port(config: &SchedulerNodeConfig) -> u16 {
config.http_port
}
}

View File

@ -0,0 +1,15 @@
use testing_framework_core::scenario::ScenarioBuilder;
use crate::{SchedulerEnv, SchedulerTopology};
pub type SchedulerScenarioBuilder = ScenarioBuilder<SchedulerEnv>;
pub trait SchedulerBuilderExt: Sized {
fn deployment_with(f: impl FnOnce(SchedulerTopology) -> SchedulerTopology) -> Self;
}
impl SchedulerBuilderExt for SchedulerScenarioBuilder {
fn deployment_with(f: impl FnOnce(SchedulerTopology) -> SchedulerTopology) -> Self {
SchedulerScenarioBuilder::with_deployment(f(SchedulerTopology::new(3)))
}
}

View File

@ -0,0 +1,13 @@
[package]
edition.workspace = true
license.workspace = true
name = "scheduler-runtime-workloads"
version.workspace = true
[dependencies]
async-trait = { workspace = true }
scheduler-runtime-ext = { path = "../integration" }
serde = { workspace = true }
testing-framework-core = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

View File

@ -0,0 +1,99 @@
use std::time::Duration;
use async_trait::async_trait;
use scheduler_runtime_ext::SchedulerEnv;
use serde::Deserialize;
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use tracing::info;
#[derive(Clone)]
pub struct SchedulerDrained {
min_done: usize,
timeout: Duration,
poll_interval: Duration,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct Revision {
version: u64,
origin: u64,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct StateResponse {
revision: Revision,
pending: usize,
leased: usize,
done: usize,
}
impl SchedulerDrained {
#[must_use]
pub fn new(min_done: usize) -> Self {
Self {
min_done,
timeout: Duration::from_secs(30),
poll_interval: Duration::from_millis(500),
}
}
#[must_use]
pub const fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}
#[async_trait]
impl Expectation<SchedulerEnv> for SchedulerDrained {
fn name(&self) -> &str {
"scheduler_drained"
}
async fn evaluate(&mut self, ctx: &RunContext<SchedulerEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
if clients.is_empty() {
return Err("no scheduler node clients available".into());
}
let deadline = tokio::time::Instant::now() + self.timeout;
while tokio::time::Instant::now() < deadline {
if is_drained_and_converged(&clients, self.min_done).await? {
info!(min_done = self.min_done, "scheduler drained and converged");
return Ok(());
}
tokio::time::sleep(self.poll_interval).await;
}
Err(format!("scheduler not drained within {:?}", self.timeout).into())
}
}
async fn is_drained_and_converged(
clients: &[scheduler_runtime_ext::SchedulerHttpClient],
min_done: usize,
) -> Result<bool, DynError> {
let Some((first, rest)) = clients.split_first() else {
return Ok(false);
};
let baseline = read_state(first).await?;
if baseline.pending != 0 || baseline.leased != 0 || baseline.done < min_done {
return Ok(false);
}
for client in rest {
let current = read_state(client).await?;
if current != baseline {
return Ok(false);
}
}
Ok(true)
}
async fn read_state(
client: &scheduler_runtime_ext::SchedulerHttpClient,
) -> Result<StateResponse, DynError> {
client.get("/jobs/state").await
}

View File

@ -0,0 +1,205 @@
use std::{collections::HashSet, time::Duration};
use async_trait::async_trait;
use scheduler_runtime_ext::SchedulerEnv;
use serde::{Deserialize, Serialize};
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use tokio::time::{Instant, sleep};
use tracing::info;
#[derive(Clone)]
pub struct SchedulerLeaseFailoverWorkload {
operations: usize,
lease_ttl: Duration,
rate_per_sec: Option<usize>,
payload_prefix: String,
}
#[derive(Serialize)]
struct EnqueueRequest {
payload: String,
}
#[derive(Deserialize)]
struct EnqueueResponse {
id: u64,
}
#[derive(Serialize)]
struct ClaimRequest {
worker_id: String,
max_jobs: usize,
}
#[derive(Deserialize)]
struct ClaimedJob {
id: u64,
}
#[derive(Deserialize)]
struct ClaimResponse {
jobs: Vec<ClaimedJob>,
}
#[derive(Serialize)]
struct AckRequest {
worker_id: String,
job_id: u64,
}
#[derive(Deserialize)]
struct OperationResponse {
ok: bool,
}
impl SchedulerLeaseFailoverWorkload {
#[must_use]
pub fn new() -> Self {
Self {
operations: 100,
lease_ttl: Duration::from_secs(3),
rate_per_sec: Some(25),
payload_prefix: "scheduler-job".to_owned(),
}
}
#[must_use]
pub const fn operations(mut self, value: usize) -> Self {
self.operations = value;
self
}
#[must_use]
pub const fn lease_ttl(mut self, value: Duration) -> Self {
self.lease_ttl = value;
self
}
#[must_use]
pub const fn rate_per_sec(mut self, value: usize) -> Self {
self.rate_per_sec = Some(value);
self
}
}
impl Default for SchedulerLeaseFailoverWorkload {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Workload<SchedulerEnv> for SchedulerLeaseFailoverWorkload {
fn name(&self) -> &str {
"scheduler_lease_failover_workload"
}
async fn start(&self, ctx: &RunContext<SchedulerEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
let Some(node_a) = clients.first() else {
return Err("no scheduler node clients available".into());
};
let node_b = clients.get(1).unwrap_or(node_a);
let interval = self.rate_per_sec.and_then(compute_interval);
let mut enqueued_ids = Vec::with_capacity(self.operations);
info!(
operations = self.operations,
"scheduler failover: enqueue phase"
);
for index in 0..self.operations {
let response: EnqueueResponse = node_a
.post(
"/jobs/enqueue",
&EnqueueRequest {
payload: format!("{}-{index}", self.payload_prefix),
},
)
.await?;
enqueued_ids.push(response.id);
if let Some(delay) = interval {
sleep(delay).await;
}
}
info!("scheduler failover: worker-a claim without ack");
let first_claim: ClaimResponse = node_a
.post(
"/jobs/claim",
&ClaimRequest {
worker_id: "worker-a".to_owned(),
max_jobs: self.operations,
},
)
.await?;
if first_claim.jobs.len() != self.operations {
return Err(format!(
"worker-a claimed {} jobs, expected {}",
first_claim.jobs.len(),
self.operations
)
.into());
}
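        // Let worker-a's leases lapse (plus a little slack) so the jobs
        // become claimable again.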
sleep(self.lease_ttl + Duration::from_millis(500)).await;
info!("scheduler failover: worker-b reclaim and ack");
let mut pending_ids: HashSet<u64> = enqueued_ids.into_iter().collect();
let reclaim_deadline = Instant::now() + Duration::from_secs(20);
while !pending_ids.is_empty() && Instant::now() < reclaim_deadline {
let claim: ClaimResponse = node_b
.post(
"/jobs/claim",
&ClaimRequest {
worker_id: "worker-b".to_owned(),
max_jobs: pending_ids.len(),
},
)
.await?;
if claim.jobs.is_empty() {
sleep(Duration::from_millis(200)).await;
continue;
}
for job in claim.jobs {
if !pending_ids.remove(&job.id) {
return Err(format!("unexpected reclaimed job id {}", job.id).into());
}
let ack: OperationResponse = node_b
.post(
"/jobs/ack",
&AckRequest {
worker_id: "worker-b".to_owned(),
job_id: job.id,
},
)
.await?;
if !ack.ok {
return Err(format!("failed to ack reclaimed job {}", job.id).into());
}
}
}
if !pending_ids.is_empty() {
return Err(
format!("scheduler failover left {} unacked jobs", pending_ids.len()).into(),
);
}
Ok(())
}
}
fn compute_interval(rate_per_sec: usize) -> Option<Duration> {
if rate_per_sec == 0 {
return None;
}
Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1)))
}

View File

@ -0,0 +1,8 @@
mod drained;
mod lease_failover;
pub use drained::SchedulerDrained;
pub use lease_failover::SchedulerLeaseFailoverWorkload;
pub use scheduler_runtime_ext::{
SchedulerBuilderExt, SchedulerEnv, SchedulerScenarioBuilder, SchedulerTopology,
};