feat(examples): replace scheduler with openraft kv example

andrussal 2026-04-12 10:55:19 +02:00
parent 5a33627aca
commit f3f069c321
50 changed files with 2707 additions and 1351 deletions

View File

@ -6,7 +6,11 @@ exclude-dev = true
no-default-features = true
[advisories]
ignore = ["RUSTSEC-2026-0097"]
ignore = [
# Existing workspace dependencies still resolve rand 0.8 via tera/tokio-retry.
# Track removal when those upstream edges move to a fixed release.
"RUSTSEC-2026-0097",
]
yanked = "deny"
[bans]

677
Cargo.lock generated

File diff suppressed because it is too large

View File

@ -8,14 +8,14 @@ members = [
"examples/kvstore/kvstore-node",
"examples/kvstore/testing/integration",
"examples/kvstore/testing/workloads",
"examples/openraft_kv/examples",
"examples/openraft_kv/openraft-kv-node",
"examples/openraft_kv/testing/integration",
"examples/openraft_kv/testing/workloads",
"examples/queue/examples",
"examples/queue/queue-node",
"examples/queue/testing/integration",
"examples/queue/testing/workloads",
"examples/scheduler/examples",
"examples/scheduler/scheduler-node",
"examples/scheduler/testing/integration",
"examples/scheduler/testing/workloads",
"testing-framework/core",
"testing-framework/deployers/compose",
"testing-framework/deployers/k8s",
@ -54,6 +54,8 @@ bytes = { default-features = false, version = "1.3" }
hex = { default-features = false, version = "0.4.3" }
libp2p = { default-features = false, version = "0.55" }
num-bigint = { default-features = false, version = "0.4" }
openraft = { default-features = true, features = ["serde", "type-alias"], version = "0.10.0-alpha.17" }
openraft-memstore = { default-features = true, version = "0.10.0-alpha.17" }
parking_lot = { default-features = false, version = "0.12" }
rand = { default-features = false, features = ["std", "std_rng"], version = "0.8" }
reqwest = { default-features = false, version = "0.12" }

View File

@ -1,3 +1,4 @@
# Build stage
FROM rustlang/rust:nightly-bookworm AS builder
WORKDIR /build
@ -7,7 +8,7 @@ COPY cfgsync/ ./cfgsync/
COPY examples/ ./examples/
COPY testing-framework/ ./testing-framework/
RUN cargo build --release -p scheduler-node
RUN cargo build --release -p openraft-kv-node
FROM debian:bookworm-slim
@ -15,10 +16,10 @@ RUN apt-get update && \
apt-get install -y ca-certificates && \
rm -rf /var/lib/apt/lists/*
COPY --from=builder /build/target/release/scheduler-node /usr/local/bin/scheduler-node
COPY --from=builder /build/target/release/openraft-kv-node /usr/local/bin/openraft-kv-node
RUN mkdir -p /etc/scheduler
RUN mkdir -p /etc/openraft-kv
WORKDIR /app
ENTRYPOINT ["/usr/local/bin/scheduler-node"]
CMD ["--config", "/etc/scheduler/config.yaml"]
ENTRYPOINT ["/usr/local/bin/openraft-kv-node"]
CMD ["--config", "/etc/openraft-kv/config.yaml"]

View File

@ -0,0 +1,87 @@
# OpenRaft KV Example
This example runs a small key-value service built on top of `OpenRaft`.
The main scenario does four things:
- bootstraps node 0 as a one-node cluster
- adds nodes 1 and 2 as learners and promotes them to voters
- writes one batch of keys through the current leader
- restarts that leader, waits for a new leader, writes again, and then checks
that all three nodes expose the same replicated state
## How TF runs this
- TF starts three OpenRaft nodes
- the workload bootstraps the cluster through the admin API
- the workload writes a first batch, restarts the current leader, waits for failover, and writes again
- the expectation checks that all three nodes converge on the same key/value state and membership
## Scenario
- `basic_failover` runs the leader-restart flow locally
- `compose_failover` runs the same flow in Docker Compose
- `k8s_failover` runs the same flow against a manual Kubernetes cluster deployment
## API
Each node exposes the following endpoints (a short client usage sketch follows the lists):
- `GET /healthz` for readiness
- `GET /state` for current Raft role, leader, membership, log progress, and replicated key/value data
- `POST /kv/write` to submit a write through the local Raft node
- `POST /kv/read` to read a key from the local state machine
- `POST /admin/init` to initialize a single-node cluster
- `POST /admin/add-learner` to add a new Raft learner
- `POST /admin/change-membership` to promote learners into the voting set
The node also exposes internal Raft RPC endpoints used only for replication:
- `POST /raft/vote`
- `POST /raft/append`
- `POST /raft/snapshot`
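For illustration, a rough sketch of driving the public endpoints with the `OpenRaftKvClient` shipped in this example; the address is a placeholder, and the write only succeeds when it lands on the current leader:
```rust
use openraft_kv_node::OpenRaftKvClient;
use reqwest::Url;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Point the client at one node's HTTP API (address is illustrative).
    let client = OpenRaftKvClient::new(Url::parse("http://127.0.0.1:8080/")?);

    // Replicate one write through this node (it must be the current leader),
    // then read the key back from the local state machine.
    client.write("raft-key-0", "value-0", 1).await?;
    let value = client.read("raft-key-0").await?;
    println!("read back: {value:?}");

    // Inspect the node's role, known leader, voter set, and replicated data.
    let state = client.state().await?;
    println!("leader={:?} voters={:?}", state.current_leader, state.voters);
    Ok(())
}
```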
## Run locally
```bash
OPENRAFT_KV_NODE_BIN="$(pwd)/target/debug/openraft-kv-node" \
cargo run -p openraft-kv-examples --bin basic_failover
```
Build the node first if you have not done that yet:
```bash
cargo build -p openraft-kv-node
```
## Run with Docker Compose
Build the image first:
```bash
docker build -t openraft-kv-node:local -f examples/openraft_kv/Dockerfile .
```
Then run:
```bash
cargo run -p openraft-kv-examples --bin compose_failover
```
Set `OPENRAFT_KV_IMAGE` to override the default compose image tag.
## Run on Kubernetes
Build the same image first:
```bash
docker build -t openraft-kv-node:local -f examples/openraft_kv/Dockerfile .
```
Then run:
```bash
cargo run -p openraft-kv-examples --bin k8s_failover
```
If no cluster is available, the example exits early and prints a skip message.

View File

@ -0,0 +1,16 @@
[package]
edition.workspace = true
license.workspace = true
name = "openraft-kv-examples"
version.workspace = true
[dependencies]
anyhow = "1.0"
openraft-kv-node = { path = "../openraft-kv-node" }
openraft-kv-runtime-ext = { path = "../testing/integration" }
openraft-kv-runtime-workloads = { path = "../testing/workloads" }
testing-framework-core = { workspace = true }
testing-framework-runner-k8s = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -0,0 +1,20 @@
use std::time::Duration;
use openraft_kv_examples::build_failover_scenario;
use openraft_kv_runtime_ext::OpenRaftKvLocalDeployer;
use testing_framework_core::scenario::Deployer;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let mut scenario = build_failover_scenario(Duration::from_secs(45), Duration::from_secs(30))?;
let deployer = OpenRaftKvLocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
runner.run(&mut scenario).await?;
Ok(())
}

View File

@ -0,0 +1,20 @@
use std::time::Duration;
use openraft_kv_examples::build_failover_scenario;
use openraft_kv_runtime_ext::OpenRaftKvComposeDeployer;
use testing_framework_core::scenario::Deployer;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let mut scenario = build_failover_scenario(Duration::from_secs(60), Duration::from_secs(40))?;
let deployer = OpenRaftKvComposeDeployer::new();
let runner = deployer.deploy(&scenario).await?;
runner.run(&mut scenario).await?;
Ok(())
}

View File

@ -0,0 +1,166 @@
use std::time::Duration;
use anyhow::{Context as _, Result, anyhow};
use openraft_kv_examples::{
INITIAL_WRITE_BATCH, RAFT_KEY_PREFIX, SECOND_WRITE_BATCH, TOTAL_WRITES,
};
use openraft_kv_node::OpenRaftKvClient;
use openraft_kv_runtime_ext::{OpenRaftKvEnv, OpenRaftKvK8sDeployer, OpenRaftKvTopology};
use openraft_kv_runtime_workloads::{
OpenRaftMembership, resolve_client_for_node, wait_for_leader, wait_for_membership,
wait_for_replication, write_batch,
};
use testing_framework_runner_k8s::{ManualCluster, ManualClusterError};
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let deployer = OpenRaftKvK8sDeployer::new();
let cluster = match deployer
.manual_cluster_from_descriptors(OpenRaftKvTopology::new(3))
.await
{
Ok(cluster) => cluster,
Err(ManualClusterError::ClientInit { source }) => {
warn!("k8s unavailable ({source}); skipping openraft k8s run");
return Ok(());
}
Err(ManualClusterError::InstallStack { source })
if k8s_cluster_unavailable(&source.to_string()) =>
{
warn!("k8s unavailable ({source}); skipping openraft k8s run");
return Ok(());
}
Err(error) => {
return Err(anyhow::Error::new(error)).context("creating openraft k8s cluster");
}
};
run_failover(cluster, Duration::from_secs(40)).await
}
async fn run_failover(cluster: ManualCluster<OpenRaftKvEnv>, timeout: Duration) -> Result<()> {
let mut clients = start_cluster(&cluster).await?;
clients[0].init_self().await?;
let initial_leader = wait_for_leader(&clients, timeout, None).await?;
let membership = OpenRaftMembership::discover(&clients).await?;
add_learners_and_promote(&clients, initial_leader, &membership, timeout).await?;
write_initial_batch(&clients, initial_leader, timeout).await?;
restart_leader(&cluster, initial_leader).await?;
refresh_clients(&cluster, &mut clients)?;
let new_leader = wait_for_leader(&clients, timeout, Some(initial_leader)).await?;
write_second_batch(&clients, new_leader, timeout).await?;
let expected = openraft_kv_runtime_workloads::expected_kv(RAFT_KEY_PREFIX, TOTAL_WRITES);
wait_for_replication(&clients, &expected, timeout).await?;
cluster.stop_all();
Ok(())
}
async fn start_cluster(cluster: &ManualCluster<OpenRaftKvEnv>) -> Result<Vec<OpenRaftKvClient>> {
let node0 = cluster.start_node("node-0").await?.client;
let node1 = cluster.start_node("node-1").await?.client;
let node2 = cluster.start_node("node-2").await?.client;
cluster.wait_network_ready().await?;
Ok(vec![node0, node1, node2])
}
async fn add_learners_and_promote(
clients: &[OpenRaftKvClient],
leader_id: u64,
membership: &OpenRaftMembership,
timeout: Duration,
) -> Result<()> {
let leader = resolve_client_for_node(clients, leader_id, timeout).await?;
for learner in membership.learner_targets(leader_id) {
info!(
target = learner.node_id,
addr = %learner.public_addr,
"adding learner"
);
leader
.add_learner(learner.node_id, &learner.public_addr)
.await?;
}
let voter_ids = membership.voter_ids();
leader.change_membership(voter_ids.iter().copied()).await?;
wait_for_membership(clients, &voter_ids, timeout).await?;
Ok(())
}
async fn write_initial_batch(
clients: &[OpenRaftKvClient],
leader_id: u64,
timeout: Duration,
) -> Result<()> {
let leader = resolve_client_for_node(clients, leader_id, timeout).await?;
write_batch(&leader, RAFT_KEY_PREFIX, 0, INITIAL_WRITE_BATCH).await?;
Ok(())
}
async fn write_second_batch(
clients: &[OpenRaftKvClient],
leader_id: u64,
timeout: Duration,
) -> Result<()> {
let leader = resolve_client_for_node(clients, leader_id, timeout).await?;
write_batch(
&leader,
RAFT_KEY_PREFIX,
INITIAL_WRITE_BATCH,
SECOND_WRITE_BATCH,
)
.await?;
Ok(())
}
async fn restart_leader(cluster: &ManualCluster<OpenRaftKvEnv>, leader_id: u64) -> Result<()> {
let leader_name = format!("node-{leader_id}");
info!(%leader_name, "restarting current leader");
cluster.restart_node(&leader_name).await?;
cluster.wait_network_ready().await?;
Ok(())
}
fn refresh_clients(
cluster: &ManualCluster<OpenRaftKvEnv>,
clients: &mut [OpenRaftKvClient],
) -> Result<()> {
for (index, slot) in clients.iter_mut().enumerate() {
*slot = cluster
.node_client(&format!("node-{index}"))
.ok_or_else(|| anyhow!("node-{index} client missing after restart"))?;
}
Ok(())
}
fn k8s_cluster_unavailable(message: &str) -> bool {
message.contains("Unable to connect to the server")
|| message.contains("TLS handshake timeout")
|| message.contains("connection refused")
}

View File

@ -0,0 +1,40 @@
use std::time::Duration;
use openraft_kv_runtime_ext::{OpenRaftKvBuilderExt, OpenRaftKvEnv, OpenRaftKvScenarioBuilder};
use openraft_kv_runtime_workloads::{OpenRaftKvConverges, OpenRaftKvFailoverWorkload};
use testing_framework_core::scenario::{NodeControlCapability, Scenario};
/// Number of writes issued before the leader restart.
pub const INITIAL_WRITE_BATCH: usize = 8;
/// Number of writes issued after the leader restart.
pub const SECOND_WRITE_BATCH: usize = 8;
/// Total write count expected after the scenario completes.
pub const TOTAL_WRITES: usize = INITIAL_WRITE_BATCH + SECOND_WRITE_BATCH;
/// Key prefix shared by the failover workload and convergence expectation.
pub const RAFT_KEY_PREFIX: &str = "raft-key";
/// Builds the standard failover scenario used by the local and compose
/// binaries.
pub fn build_failover_scenario(
run_duration: Duration,
workload_timeout: Duration,
) -> anyhow::Result<Scenario<OpenRaftKvEnv, NodeControlCapability>> {
Ok(
OpenRaftKvScenarioBuilder::deployment_with(|deployment| deployment)
.enable_node_control()
.with_run_duration(run_duration)
.with_workload(
OpenRaftKvFailoverWorkload::new()
.first_batch(INITIAL_WRITE_BATCH)
.second_batch(SECOND_WRITE_BATCH)
.timeout(workload_timeout)
.key_prefix(RAFT_KEY_PREFIX),
)
.with_expectation(
OpenRaftKvConverges::new(TOTAL_WRITES)
.timeout(run_duration)
.key_prefix(RAFT_KEY_PREFIX),
)
.build()?,
)
}

View File

@ -1,17 +1,19 @@
[package]
edition.workspace = true
license.workspace = true
name = "scheduler-node"
name = "openraft-kv-node"
version.workspace = true
[[bin]]
name = "scheduler-node"
name = "openraft-kv-node"
path = "src/main.rs"
[dependencies]
anyhow = "1.0"
axum = "0.7"
clap = { version = "4.0", features = ["derive"] }
openraft = { workspace = true }
openraft-memstore = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
serde_yaml = { workspace = true }

View File

@ -0,0 +1,136 @@
use std::{collections::BTreeSet, time::Duration};
use reqwest::Url;
use serde::{Serialize, de::DeserializeOwned};
use crate::types::{
AddLearnerRequest, AddLearnerResult, ChangeMembershipRequest, ChangeMembershipResult,
InitResult, OpenRaftKvReadRequest, OpenRaftKvReadResponse, OpenRaftKvState,
OpenRaftKvWriteRequest, OpenRaftKvWriteResponse,
};
/// Small HTTP client for the OpenRaft example node and its admin endpoints.
#[derive(Clone)]
pub struct OpenRaftKvClient {
base_url: Url,
client: reqwest::Client,
}
impl OpenRaftKvClient {
/// Builds a client for one node base URL.
#[must_use]
pub fn new(base_url: Url) -> Self {
Self {
base_url,
client: reqwest::Client::builder()
.timeout(Duration::from_secs(2))
.connect_timeout(Duration::from_secs(2))
.build()
.expect("openraft kv client timeout configuration is valid"),
}
}
/// Fetches the node's current Raft and application state.
pub async fn state(&self) -> anyhow::Result<OpenRaftKvState> {
self.get("state").await
}
/// Replicates one key/value write through the current leader.
pub async fn write(
&self,
key: &str,
value: &str,
serial: u64,
) -> anyhow::Result<OpenRaftKvWriteResponse> {
self.post_result(
"kv/write",
&OpenRaftKvWriteRequest {
key: key.to_owned(),
value: value.to_owned(),
serial,
},
)
.await
}
/// Reads one key from the replicated state machine.
pub async fn read(&self, key: &str) -> anyhow::Result<Option<String>> {
let response: OpenRaftKvReadResponse = self
.post_result(
"kv/read",
&OpenRaftKvReadRequest {
key: key.to_owned(),
},
)
.await?;
Ok(response.value)
}
/// Bootstraps a one-node cluster on this node.
pub async fn init_self(&self) -> anyhow::Result<()> {
let _: InitResult = self.post("admin/init", &()).await?;
Ok(())
}
/// Registers another node as a learner with the current leader.
pub async fn add_learner(&self, node_id: u64, addr: &str) -> anyhow::Result<()> {
let _: AddLearnerResult = self
.post(
"admin/add-learner",
&AddLearnerRequest {
node_id,
addr: addr.to_owned(),
},
)
.await?;
Ok(())
}
/// Promotes the cluster to the provided voter set.
pub async fn change_membership(
&self,
voters: impl IntoIterator<Item = u64>,
) -> anyhow::Result<()> {
let voters = normalize_voters(voters);
let request = ChangeMembershipRequest { voters };
let _: ChangeMembershipResult = self.post("admin/change-membership", &request).await?;
Ok(())
}
async fn get<T: DeserializeOwned>(&self, path: &str) -> anyhow::Result<T> {
let url = self.base_url.join(path)?;
let response = self.client.get(url).send().await?;
let response = response.error_for_status()?;
Ok(response.json().await?)
}
async fn post<B: Serialize, T: DeserializeOwned>(
&self,
path: &str,
body: &B,
) -> anyhow::Result<T> {
let url = self.base_url.join(path)?;
let response = self.client.post(url).json(body).send().await?;
let response = response.error_for_status()?;
Ok(response.json().await?)
}
async fn post_result<B: Serialize, T: DeserializeOwned>(
&self,
path: &str,
body: &B,
) -> anyhow::Result<T> {
let result: Result<T, String> = self.post(path, body).await?;
result.map_err(anyhow::Error::msg)
}
}
fn normalize_voters(voters: impl IntoIterator<Item = u64>) -> Vec<u64> {
let unique_voters = voters.into_iter().collect::<BTreeSet<_>>();
unique_voters.into_iter().collect()
}

View File

@ -0,0 +1,46 @@
use std::{collections::BTreeMap, fs, path::Path};
use serde::{Deserialize, Serialize};
/// Static node config written by TF for one OpenRaft node process.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenRaftKvNodeConfig {
/// Stable OpenRaft node identifier.
pub node_id: u64,
/// HTTP port bound by the node process.
pub http_port: u16,
/// Advertised Raft address for this node.
pub public_addr: String,
/// Advertised Raft addresses for the other known nodes.
#[serde(default)]
pub peer_addrs: BTreeMap<u64, String>,
/// Heartbeat interval passed to the OpenRaft config.
#[serde(default = "default_heartbeat_interval_ms")]
pub heartbeat_interval_ms: u64,
/// Lower election timeout bound passed to OpenRaft.
#[serde(default = "default_election_timeout_min_ms")]
pub election_timeout_min_ms: u64,
/// Upper election timeout bound passed to OpenRaft.
#[serde(default = "default_election_timeout_max_ms")]
pub election_timeout_max_ms: u64,
}
impl OpenRaftKvNodeConfig {
/// Loads one node config from YAML on disk.
pub fn load(path: &Path) -> anyhow::Result<Self> {
let raw = fs::read_to_string(path)?;
Ok(serde_yaml::from_str(&raw)?)
}
}
const fn default_heartbeat_interval_ms() -> u64 {
500
}
const fn default_election_timeout_min_ms() -> u64 {
1_500
}
const fn default_election_timeout_max_ms() -> u64 {
3_000
}
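// Illustrative sketch of the YAML shape this config accepts (values are
// placeholders, not part of any real deployment; omitted timing fields fall
// back to the serde defaults above). Assumes `serde_yaml`, as used by this crate.
#[cfg(test)]
mod config_shape_sketch {
    use super::OpenRaftKvNodeConfig;

    #[test]
    fn parses_minimal_yaml() {
        let raw = r#"
node_id: 0
http_port: 8080
public_addr: "127.0.0.1:8080"
peer_addrs:
  1: "127.0.0.1:8081"
  2: "127.0.0.1:8082"
"#;
        let config: OpenRaftKvNodeConfig =
            serde_yaml::from_str(raw).expect("sketch yaml parses");
        assert_eq!(config.node_id, 0);
        assert_eq!(config.heartbeat_interval_ms, 500);
        assert_eq!(config.election_timeout_max_ms, 3_000);
    }
}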

View File

@ -0,0 +1,25 @@
//! OpenRaft-backed key-value node used by the `examples-simple-clusters`
//! branch.
/// HTTP client for interacting with one OpenRaft node.
pub mod client;
/// YAML node configuration used by TF and the node binary.
pub mod config;
mod network;
/// Axum server bootstrap and request handlers for one node process.
pub mod server;
/// Shared request, response, and state payload types.
pub mod types;
/// Re-export of the node HTTP client.
pub use client::OpenRaftKvClient;
/// Re-export of the node YAML config type.
pub use config::OpenRaftKvNodeConfig;
/// Re-export of the public request and state payloads.
pub use types::{
AddLearnerRequest, ChangeMembershipRequest, OpenRaftKvReadRequest, OpenRaftKvReadResponse,
OpenRaftKvState, OpenRaftKvWriteRequest, OpenRaftKvWriteResponse,
};
/// OpenRaft type configuration shared by the in-memory log and state machine.
pub type TypeConfig = openraft_memstore::TypeConfig;

View File

@ -0,0 +1,24 @@
use std::path::PathBuf;
use clap::Parser;
use openraft_kv_node::{config::OpenRaftKvNodeConfig, server::run_server};
use tracing_subscriber::EnvFilter;
#[derive(Parser, Clone, Debug)]
#[command(author, version, about)]
struct Opt {
#[arg(long)]
config: PathBuf,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.with_ansi(false)
.init();
let options = Opt::parse();
let config = OpenRaftKvNodeConfig::load(&options.config)?;
run_server(config).await
}

View File

@ -0,0 +1,158 @@
//! HTTP transport used by OpenRaft to replicate between example nodes.
use std::{collections::BTreeMap, sync::Arc};
use openraft::{
RaftNetworkFactory, RaftNetworkV2,
alias::{SnapshotOf, VoteOf},
errors::{RPCError, StreamingError, Unreachable},
network::RPCOption,
};
use reqwest::Url;
use tokio::sync::RwLock;
use crate::{
TypeConfig,
types::{InstallFullSnapshotBody, SnapshotRpcResult},
};
/// Shared node-address book used by Raft RPC clients.
#[derive(Clone, Default)]
pub struct HttpNetworkFactory {
client: reqwest::Client,
known_nodes: Arc<RwLock<BTreeMap<u64, String>>>,
}
/// Per-target HTTP client used for Raft replication traffic.
pub struct HttpNetworkClient {
client: reqwest::Client,
target: u64,
target_addr: Option<String>,
}
impl HttpNetworkFactory {
/// Creates a network factory backed by one shared node-address map.
#[must_use]
pub fn new(known_nodes: Arc<RwLock<BTreeMap<u64, String>>>) -> Self {
Self {
client: reqwest::Client::new(),
known_nodes,
}
}
}
impl RaftNetworkFactory<TypeConfig> for HttpNetworkFactory {
type Network = HttpNetworkClient;
async fn new_client(&mut self, target: u64, _node: &()) -> Self::Network {
let target_addr = self.known_nodes.read().await.get(&target).cloned();
HttpNetworkClient {
client: self.client.clone(),
target,
target_addr,
}
}
}
impl RaftNetworkV2<TypeConfig> for HttpNetworkClient {
async fn append_entries(
&mut self,
rpc: openraft::raft::AppendEntriesRequest<TypeConfig>,
_option: RPCOption,
) -> Result<openraft::raft::AppendEntriesResponse<TypeConfig>, RPCError<TypeConfig>> {
self.post_rpc("raft/append", &rpc).await
}
async fn vote(
&mut self,
rpc: openraft::raft::VoteRequest<TypeConfig>,
_option: RPCOption,
) -> Result<openraft::raft::VoteResponse<TypeConfig>, RPCError<TypeConfig>> {
self.post_rpc("raft/vote", &rpc).await
}
async fn full_snapshot(
&mut self,
vote: VoteOf<TypeConfig>,
snapshot: SnapshotOf<TypeConfig>,
_cancel: impl std::future::Future<Output = openraft::errors::ReplicationClosed>
+ openraft::OptionalSend
+ 'static,
_option: RPCOption,
) -> Result<openraft::raft::SnapshotResponse<TypeConfig>, StreamingError<TypeConfig>> {
let body = InstallFullSnapshotBody {
vote,
meta: snapshot.meta,
data: snapshot.snapshot.into_inner(),
};
self.post_snapshot("raft/snapshot", &body).await
}
}
impl HttpNetworkClient {
async fn post_rpc<B, T>(&self, path: &str, body: &B) -> Result<T, RPCError<TypeConfig>>
where
B: serde::Serialize,
T: serde::de::DeserializeOwned,
{
let url = self.endpoint_url(path)?;
let response = self
.client
.post(url)
.json(body)
.send()
.await
.map_err(|err| RPCError::Unreachable(Unreachable::new(&err)))?
.error_for_status()
.map_err(|err| RPCError::Unreachable(Unreachable::new(&err)))?;
let result: Result<T, String> = response
.json()
.await
.map_err(|err| RPCError::Unreachable(Unreachable::new(&err)))?;
result.map_err(|err| RPCError::Unreachable(Unreachable::from_string(err)))
}
async fn post_snapshot(
&self,
path: &str,
body: &InstallFullSnapshotBody,
) -> Result<openraft::raft::SnapshotResponse<TypeConfig>, StreamingError<TypeConfig>> {
let url = self
.endpoint_url(path)
.map_err(|err| StreamingError::Unreachable(Unreachable::new(&err)))?;
let response = self
.client
.post(url)
.json(body)
.send()
.await
.map_err(|err| StreamingError::Unreachable(Unreachable::new(&err)))?
.error_for_status()
.map_err(|err| StreamingError::Unreachable(Unreachable::new(&err)))?;
let result: SnapshotRpcResult = response
.json()
.await
.map_err(|err| StreamingError::Unreachable(Unreachable::new(&err)))?;
result.map_err(|err| StreamingError::Unreachable(Unreachable::from_string(err)))
}
fn endpoint_url(&self, path: &str) -> Result<Url, Unreachable<TypeConfig>> {
let Some(addr) = &self.target_addr else {
return Err(Unreachable::from_string(format!(
"target {} has no known address",
self.target
)));
};
let mut url =
Url::parse(&format!("http://{addr}/")).map_err(|err| Unreachable::new(&err))?;
url.set_path(path);
Ok(url)
}
}

View File

@ -0,0 +1,276 @@
//! Axum server that exposes the OpenRaft example node and its admin endpoints.
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use axum::{
Json, Router,
extract::State,
http::StatusCode,
routing::{get, post},
};
use openraft::{Config, Raft, SnapshotPolicy, type_config::async_runtime::WatchReceiver};
use openraft_memstore::{ClientRequest, MemLogStore, MemStateMachine, new_mem_store};
use tokio::sync::RwLock;
use tower_http::trace::TraceLayer;
use tracing::info;
use crate::{
TypeConfig,
config::OpenRaftKvNodeConfig,
network::HttpNetworkFactory,
types::{
AddLearnerRequest, AppendRpcResult, ChangeMembershipRequest, InitResult,
InstallSnapshotBody, MetricsResult, OpenRaftKvReadRequest, OpenRaftKvReadResponse,
OpenRaftKvState, OpenRaftKvWriteRequest, OpenRaftKvWriteResponse, SnapshotRpcResult,
VoteRpcResult,
},
};
type KnownNodes = Arc<RwLock<BTreeMap<u64, String>>>;
/// Shared state used by the HTTP handlers exposed by one node.
#[derive(Clone)]
pub struct AppState {
config: OpenRaftKvNodeConfig,
raft: Raft<TypeConfig, Arc<MemStateMachine>>,
state_machine: Arc<MemStateMachine>,
known_nodes: KnownNodes,
}
impl AppState {
/// Builds the application state for one node process.
pub fn new(
config: OpenRaftKvNodeConfig,
raft: Raft<TypeConfig, Arc<MemStateMachine>>,
state_machine: Arc<MemStateMachine>,
known_nodes: KnownNodes,
) -> Self {
Self {
config,
raft,
state_machine,
known_nodes,
}
}
}
/// Starts one OpenRaft-backed HTTP node.
pub async fn run_server(config: OpenRaftKvNodeConfig) -> anyhow::Result<()> {
let raft_config = Arc::new(
Config {
cluster_name: "openraft-kv".to_owned(),
heartbeat_interval: config.heartbeat_interval_ms,
election_timeout_min: config.election_timeout_min_ms,
election_timeout_max: config.election_timeout_max_ms,
snapshot_policy: SnapshotPolicy::Never,
..Default::default()
}
.validate()?,
);
let known_nodes = Arc::new(RwLock::new(known_nodes(&config)));
let (log_store, state_machine): (Arc<MemLogStore>, Arc<MemStateMachine>) = new_mem_store();
let network = HttpNetworkFactory::new(known_nodes.clone());
let raft = Raft::new(
config.node_id,
raft_config,
network,
log_store,
state_machine.clone(),
)
.await?;
let app_state = AppState::new(config.clone(), raft, state_machine, known_nodes);
let app = router(app_state);
let address = std::net::SocketAddr::from(([0, 0, 0, 0], config.http_port));
info!(
node_id = config.node_id,
public_addr = %config.public_addr,
peers = ?config.peer_addrs,
%address,
"starting openraft kv node"
);
let listener = tokio::net::TcpListener::bind(address).await?;
axum::serve(listener, app).await?;
Ok(())
}
fn router(app_state: AppState) -> Router {
let app_routes = Router::new()
.route("/healthz", get(healthz))
.route("/state", get(cluster_state))
.route("/kv/write", post(write))
.route("/kv/read", post(read));
let admin_routes = Router::new()
.route("/admin/init", post(init))
.route("/admin/add-learner", post(add_learner))
.route("/admin/change-membership", post(change_membership))
.route("/admin/metrics", get(metrics));
let raft_routes = Router::new()
.route("/raft/vote", post(vote))
.route("/raft/append", post(append))
.route("/raft/snapshot", post(snapshot));
app_routes
.merge(admin_routes)
.merge(raft_routes)
.layer(TraceLayer::new_for_http())
.with_state(app_state)
}
async fn healthz() -> &'static str {
"ok"
}
async fn cluster_state(State(app): State<AppState>) -> Result<Json<OpenRaftKvState>, StatusCode> {
let metrics = app.raft.metrics().borrow_watched().clone();
let sm = app.state_machine.get_state_machine().await;
let voters = metrics
.membership_config
.membership()
.voter_ids()
.collect::<Vec<_>>();
let kv = sm.client_status.into_iter().collect::<BTreeMap<_, _>>();
Ok(Json(OpenRaftKvState {
node_id: app.config.node_id,
public_addr: app.config.public_addr.clone(),
role: format!("{:?}", metrics.state),
current_leader: metrics.current_leader,
current_term: metrics.current_term,
last_log_index: metrics.last_log_index,
last_applied_index: metrics.last_applied.as_ref().map(|log_id| log_id.index()),
voters,
kv,
}))
}
async fn metrics(State(app): State<AppState>) -> Json<MetricsResult> {
Json(Ok(app.raft.metrics().borrow_watched().clone()))
}
async fn init(State(app): State<AppState>) -> Json<InitResult> {
let members = BTreeSet::from([app.config.node_id]);
Json(
app.raft
.initialize(members)
.await
.map_err(|err| err.to_string()),
)
}
async fn add_learner(
State(app): State<AppState>,
Json(request): Json<AddLearnerRequest>,
) -> Json<InitResult> {
let mut known_nodes = app.known_nodes.write().await;
known_nodes.insert(request.node_id, request.addr.clone());
drop(known_nodes);
Json(
app.raft
.add_learner(request.node_id, (), true)
.await
.map(|_| ())
.map_err(|err| err.to_string()),
)
}
async fn change_membership(
State(app): State<AppState>,
Json(request): Json<ChangeMembershipRequest>,
) -> Json<InitResult> {
Json(
app.raft
.change_membership(request.voters.into_iter().collect::<BTreeSet<_>>(), false)
.await
.map(|_| ())
.map_err(|err| err.to_string()),
)
}
async fn write(
State(app): State<AppState>,
Json(request): Json<OpenRaftKvWriteRequest>,
) -> Json<Result<OpenRaftKvWriteResponse, String>> {
let result = app
.raft
.client_write(ClientRequest {
client: request.key,
serial: request.serial,
status: request.value,
})
.await
.map(|response| OpenRaftKvWriteResponse {
previous: response.response().0.clone(),
})
.map_err(|err| err.to_string());
Json(result)
}
async fn read(
State(app): State<AppState>,
Json(request): Json<OpenRaftKvReadRequest>,
) -> Json<Result<OpenRaftKvReadResponse, String>> {
let sm = app.state_machine.get_state_machine().await;
Json(Ok(OpenRaftKvReadResponse {
value: sm.client_status.get(&request.key).cloned(),
}))
}
async fn vote(
State(app): State<AppState>,
Json(request): Json<openraft::raft::VoteRequest<TypeConfig>>,
) -> Json<VoteRpcResult> {
Json(app.raft.vote(request).await.map_err(|err| err.to_string()))
}
async fn append(
State(app): State<AppState>,
Json(request): Json<openraft::raft::AppendEntriesRequest<TypeConfig>>,
) -> Json<AppendRpcResult> {
Json(
app.raft
.append_entries(request)
.await
.map_err(|err| err.to_string()),
)
}
async fn snapshot(
State(app): State<AppState>,
Json(request): Json<InstallSnapshotBody>,
) -> Json<SnapshotRpcResult> {
let snapshot = openraft::alias::SnapshotOf::<TypeConfig> {
meta: request.meta,
snapshot: std::io::Cursor::new(request.data),
};
Json(
app.raft
.install_full_snapshot(request.vote, snapshot)
.await
.map_err(|err| err.to_string()),
)
}
fn known_nodes(config: &OpenRaftKvNodeConfig) -> BTreeMap<u64, String> {
let mut known_nodes = config.peer_addrs.clone();
known_nodes.insert(config.node_id, config.public_addr.clone());
known_nodes
}

View File

@ -0,0 +1,112 @@
use std::collections::BTreeMap;
use openraft::{
RaftMetrics,
alias::{SnapshotMetaOf, VoteOf},
raft::InstallSnapshotRequest,
};
use serde::{Deserialize, Serialize};
use crate::TypeConfig;
/// Result shape used by the simple admin endpoints in this example.
pub type OpenRaftResult<T> = Result<T, String>;
/// Request body for a replicated write submitted through the leader.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenRaftKvWriteRequest {
/// Application key to write.
pub key: String,
/// Value stored for the key.
pub value: String,
/// Client-side serial used by OpenRaft's example state machine.
pub serial: u64,
}
/// Response body returned after a replicated write is committed.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenRaftKvWriteResponse {
/// Previous value stored under the key, if any.
pub previous: Option<String>,
}
/// Request body for a key lookup.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenRaftKvReadRequest {
/// Application key to look up.
pub key: String,
}
/// Response body returned by a key lookup.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenRaftKvReadResponse {
/// Current value stored under the key, if any.
pub value: Option<String>,
}
/// Admin request used to register a learner in the current cluster.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AddLearnerRequest {
/// OpenRaft node identifier for the learner.
pub node_id: u64,
/// Advertised Raft address for the learner.
pub addr: String,
}
/// Admin request used to promote the cluster to a concrete voter set.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChangeMembershipRequest {
/// Full voter set that should own the cluster after the change.
pub voters: Vec<u64>,
}
/// Snapshot of one node's externally visible Raft and application state.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OpenRaftKvState {
/// Stable OpenRaft node identifier.
pub node_id: u64,
/// Advertised Raft address for this node.
pub public_addr: String,
/// Current OpenRaft role rendered as text.
pub role: String,
/// Leader known by this node, if any.
pub current_leader: Option<u64>,
/// Current term reported by this node.
pub current_term: u64,
/// Highest log index stored locally.
pub last_log_index: Option<u64>,
/// Highest log index applied to the state machine.
pub last_applied_index: Option<u64>,
/// Current voter set reported by this node.
pub voters: Vec<u64>,
/// Application state machine contents.
pub kv: BTreeMap<String, String>,
}
/// JSON representation used for full-snapshot replication over HTTP.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InstallFullSnapshotBody {
/// Vote bundled with the snapshot transfer.
pub vote: VoteOf<TypeConfig>,
/// Snapshot metadata describing the transferred state.
pub meta: SnapshotMetaOf<TypeConfig>,
/// Serialized state machine bytes.
pub data: Vec<u8>,
}
/// Serialized result of a vote RPC.
pub type VoteRpcResult = Result<openraft::raft::VoteResponse<TypeConfig>, String>;
/// Serialized result of an append-entries RPC.
pub type AppendRpcResult = Result<openraft::raft::AppendEntriesResponse<TypeConfig>, String>;
/// Serialized result of a full-snapshot RPC.
pub type SnapshotRpcResult = Result<openraft::raft::SnapshotResponse<TypeConfig>, String>;
/// JSON payload returned by the metrics endpoint.
pub type MetricsResult = Result<RaftMetrics<TypeConfig>, String>;
/// JSON payload returned by `/admin/init`.
pub type InitResult = Result<(), String>;
/// JSON payload returned by `/admin/add-learner`.
pub type AddLearnerResult = Result<(), String>;
/// JSON payload returned by `/admin/change-membership`.
pub type ChangeMembershipResult = Result<(), String>;
/// Request type accepted by the snapshot endpoint.
pub type InstallSnapshotBody = InstallSnapshotRequest<TypeConfig>;
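// Illustrative sketch of the JSON body accepted by `POST /kv/write` (values
// are placeholders; assumes `serde_json` is available as a dev-dependency).
#[cfg(test)]
mod write_request_shape_sketch {
    use super::OpenRaftKvWriteRequest;

    #[test]
    fn serializes_expected_shape() {
        let request = OpenRaftKvWriteRequest {
            key: "raft-key-0".to_owned(),
            value: "value-0".to_owned(),
            serial: 1,
        };
        let body = serde_json::to_string(&request).expect("sketch request serializes");
        assert_eq!(body, r#"{"key":"raft-key-0","value":"value-0","serial":1}"#);
    }
}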

View File

@ -1,13 +1,13 @@
[package]
edition.workspace = true
license.workspace = true
name = "scheduler-runtime-ext"
name = "openraft-kv-runtime-ext"
version.workspace = true
[dependencies]
async-trait = { workspace = true }
scheduler-node = { path = "../../scheduler-node" }
serde = { workspace = true }
openraft-kv-node = { path = "../../openraft-kv-node" }
reqwest = { workspace = true }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-k8s = { workspace = true }
testing-framework-runner-local = { workspace = true }

View File

@ -0,0 +1,59 @@
use std::io::Error;
use openraft_kv_node::{OpenRaftKvClient, OpenRaftKvNodeConfig};
use testing_framework_core::scenario::{
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError,
NodeAccess, serialize_cluster_yaml_config,
};
/// Three-node topology used by the OpenRaft example scenarios.
pub type OpenRaftKvTopology = testing_framework_core::topology::ClusterTopology;
/// Application environment wiring for the OpenRaft-backed key-value example.
pub struct OpenRaftKvEnv;
impl Application for OpenRaftKvEnv {
type Deployment = OpenRaftKvTopology;
type NodeClient = OpenRaftKvClient;
type NodeConfig = OpenRaftKvNodeConfig;
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
Ok(OpenRaftKvClient::new(access.api_base_url()?))
}
fn node_readiness_path() -> &'static str {
"/healthz"
}
}
impl ClusterNodeConfigApplication for OpenRaftKvEnv {
type ConfigError = Error;
fn static_network_port() -> u16 {
8080
}
fn build_cluster_node_config(
node: &ClusterNodeView,
peers: &[ClusterPeerView],
) -> Result<Self::NodeConfig, Self::ConfigError> {
Ok(OpenRaftKvNodeConfig {
node_id: node.index() as u64,
http_port: node.network_port(),
public_addr: node.authority(),
peer_addrs: peers
.iter()
.map(|peer| (peer.index() as u64, peer.authority()))
.collect(),
heartbeat_interval_ms: 500,
election_timeout_min_ms: 1_500,
election_timeout_max_ms: 3_000,
})
}
fn serialize_cluster_node_config(
config: &Self::NodeConfig,
) -> Result<String, Self::ConfigError> {
serialize_cluster_yaml_config(config).map_err(Error::other)
}
}

View File

@ -0,0 +1,112 @@
use std::{fs, path::Path};
use testing_framework_core::{
cfgsync::StaticNodeConfigProvider,
scenario::{Application, DynError},
topology::DeploymentDescriptor,
};
use testing_framework_runner_compose::{
BinaryConfigNodeSpec, ComposeDeployEnv, ComposeDescriptor, NodeDescriptor,
binary_config_node_runtime_spec, node_identifier,
};
use crate::OpenRaftKvEnv;
const NODE_CONFIG_PATH: &str = "/etc/openraft-kv/config.yaml";
const COMPOSE_HTTP_PORT_BASE: u16 = 47_080;
fn compose_node_spec() -> BinaryConfigNodeSpec {
BinaryConfigNodeSpec::conventional(
"/usr/local/bin/openraft-kv-node",
NODE_CONFIG_PATH,
vec![8080],
)
}
fn fixed_loopback_port_binding(host_port: u16, container_port: u16) -> String {
format!("127.0.0.1:{host_port}:{container_port}")
}
impl ComposeDeployEnv for OpenRaftKvEnv {
fn prepare_compose_configs(
path: &Path,
topology: &<Self as Application>::Deployment,
_cfgsync_port: u16,
_metrics_otlp_ingest_url: Option<&reqwest::Url>,
) -> Result<(), DynError> {
let hostnames = Self::cfgsync_hostnames(topology);
let stack_dir = path
.parent()
.ok_or_else(|| std::io::Error::other("compose config path has no parent"))?;
let configs_dir = stack_dir.join("configs");
fs::create_dir_all(&configs_dir)?;
for index in 0..topology.node_count() {
let mut config = Self::build_node_config(topology, index)?;
Self::rewrite_for_hostnames(topology, index, &hostnames, &mut config)?;
let rendered = Self::serialize_node_config(&config)?;
fs::write(
configs_dir.join(Self::static_node_config_file_name(index)),
rendered,
)?;
}
Ok(())
}
fn static_node_config_file_name(index: usize) -> String {
format!("node-{index}.yaml")
}
fn binary_config_node_spec(
_topology: &<Self as Application>::Deployment,
_index: usize,
) -> Result<Option<BinaryConfigNodeSpec>, DynError> {
Ok(Some(compose_node_spec()))
}
fn compose_descriptor(
topology: &<Self as Application>::Deployment,
_cfgsync_port: u16,
) -> Result<ComposeDescriptor, DynError> {
let spec = compose_node_spec();
let nodes = (0..topology.node_count())
.map(|index| {
let runtime = binary_config_node_runtime_spec(index, &spec);
let file_name = Self::static_node_config_file_name(index);
let host_port = COMPOSE_HTTP_PORT_BASE + index as u16;
let ports = compose_node_ports(host_port, &runtime.container_ports);
NodeDescriptor::new(
node_identifier(index),
runtime.image,
runtime.entrypoint,
vec![format!(
"./stack/configs/{file_name}:{}:ro",
spec.config_container_path
)],
runtime.extra_hosts,
ports,
runtime.container_ports,
runtime.environment,
runtime.platform,
)
})
.collect();
Ok(ComposeDescriptor::new(nodes))
}
}
fn compose_node_ports(host_port: u16, container_ports: &[u16]) -> Vec<String> {
container_ports
.iter()
.map(|port| {
// OpenRaft failover restarts the leader. Fixed host ports keep TF
// clients stable across `docker compose restart`.
fixed_loopback_port_binding(host_port, *port)
})
.collect()
}

View File

@ -0,0 +1,21 @@
use testing_framework_runner_k8s::{BinaryConfigK8sSpec, K8sBinaryApp};
use crate::OpenRaftKvEnv;
const CONTAINER_CONFIG_PATH: &str = "/etc/openraft-kv/config.yaml";
const CONTAINER_HTTP_PORT: u16 = 8080;
const SERVICE_TESTING_PORT: u16 = 8081;
const NODE_NAME_PREFIX: &str = "openraft-kv-node";
impl K8sBinaryApp for OpenRaftKvEnv {
fn k8s_binary_spec() -> BinaryConfigK8sSpec {
BinaryConfigK8sSpec::conventional(
"openraft-kv",
NODE_NAME_PREFIX,
"/usr/local/bin/openraft-kv-node",
CONTAINER_CONFIG_PATH,
CONTAINER_HTTP_PORT,
SERVICE_TESTING_PORT,
)
}
}

View File

@ -0,0 +1,16 @@
mod app;
mod compose_env;
mod k8s_env;
mod local_env;
pub mod scenario;
pub use app::*;
pub use scenario::{OpenRaftKvBuilderExt, OpenRaftKvScenarioBuilder};
/// Local process deployer for the OpenRaft example app.
pub type OpenRaftKvLocalDeployer = testing_framework_runner_local::ProcessDeployer<OpenRaftKvEnv>;
/// Docker Compose deployer for the OpenRaft example app.
pub type OpenRaftKvComposeDeployer =
testing_framework_runner_compose::ComposeDeployer<OpenRaftKvEnv>;
/// Kubernetes deployer for the OpenRaft example app.
pub type OpenRaftKvK8sDeployer = testing_framework_runner_k8s::K8sDeployer<OpenRaftKvEnv>;

View File

@ -0,0 +1,125 @@
use std::collections::{BTreeMap, HashMap};
use openraft_kv_node::OpenRaftKvNodeConfig;
use testing_framework_core::{
scenario::{DynError, StartNodeOptions},
topology::DeploymentDescriptor,
};
use testing_framework_runner_local::{
BuiltNodeConfig, LocalDeployerEnv, LocalNodePorts, LocalProcessSpec, NodeConfigEntry,
reserve_local_node_ports, yaml_node_config,
};
use crate::OpenRaftKvEnv;
impl LocalDeployerEnv for OpenRaftKvEnv {
fn build_node_config_from_template(
_topology: &Self::Deployment,
index: usize,
_peer_ports_by_name: &HashMap<String, u16>,
_options: &StartNodeOptions<Self>,
peer_ports: &[u16],
template_config: Option<&OpenRaftKvNodeConfig>,
) -> Result<BuiltNodeConfig<OpenRaftKvNodeConfig>, DynError> {
let mut reserved = reserve_local_node_ports(1, &[], "node")
.map_err(|source| -> DynError { source.into() })?;
let ports = reserved
.pop()
.ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?;
let mut config = template_config
.cloned()
.unwrap_or_else(|| local_node_config(index, ports.network_port(), BTreeMap::new()));
// OpenRaft peer config is index-sensitive, so local restarts must rebuild
// the full peer map from the current reserved port set.
let network_port = ports.network_port();
config.node_id = index as u64;
config.http_port = network_port;
config.public_addr = local_addr(network_port);
config.peer_addrs = peer_addrs_from_ports(peer_ports, index);
Ok(BuiltNodeConfig {
config,
network_port,
})
}
fn build_initial_node_configs(
topology: &Self::Deployment,
) -> Result<
Vec<NodeConfigEntry<OpenRaftKvNodeConfig>>,
testing_framework_runner_local::process::ProcessSpawnError,
> {
let reserved_ports = reserve_local_node_ports(topology.node_count(), &[], "node")?;
let peer_ports = reserved_ports
.iter()
.map(LocalNodePorts::network_port)
.collect::<Vec<_>>();
// Build every node from the same reserved port view so the initial
// cluster starts with a consistent peer list on all nodes.
Ok(reserved_ports
.iter()
.enumerate()
.map(|(index, ports)| NodeConfigEntry {
name: format!("node-{index}"),
config: local_node_config(
index,
ports.network_port(),
peer_addrs_from_ports(&peer_ports, index),
),
})
.collect())
}
fn initial_node_name_prefix() -> &'static str {
"node"
}
fn local_process_spec() -> Option<LocalProcessSpec> {
Some(
LocalProcessSpec::new("OPENRAFT_KV_NODE_BIN", "openraft-kv-node").with_rust_log("info"),
)
}
fn render_local_config(config: &OpenRaftKvNodeConfig) -> Result<Vec<u8>, DynError> {
yaml_node_config(config)
}
fn http_api_port(config: &OpenRaftKvNodeConfig) -> Option<u16> {
Some(config.http_port)
}
}
fn local_node_config(
index: usize,
network_port: u16,
peer_addrs: BTreeMap<u64, String>,
) -> OpenRaftKvNodeConfig {
OpenRaftKvNodeConfig {
node_id: index as u64,
http_port: network_port,
public_addr: local_addr(network_port),
peer_addrs,
heartbeat_interval_ms: 500,
election_timeout_min_ms: 1_500,
election_timeout_max_ms: 3_000,
}
}
fn peer_addrs_from_ports(peer_ports: &[u16], local_index: usize) -> BTreeMap<u64, String> {
peer_ports
.iter()
.enumerate()
.filter(|(peer_index, _)| *peer_index != local_index)
.map(|(peer_index, peer_port)| (peer_index as u64, local_addr(*peer_port)))
.collect()
}
fn local_addr(port: u16) -> String {
format!("127.0.0.1:{port}")
}

View File

@ -0,0 +1,19 @@
use testing_framework_core::scenario::ScenarioBuilder;
use crate::{OpenRaftKvEnv, OpenRaftKvTopology};
/// Scenario builder alias used by the OpenRaft example binaries.
pub type OpenRaftKvScenarioBuilder = ScenarioBuilder<OpenRaftKvEnv>;
/// Convenience helpers for constructing the fixed three-node OpenRaft topology.
pub trait OpenRaftKvBuilderExt: Sized {
/// Starts from the default three-node deployment and lets callers adjust
/// it.
fn deployment_with(f: impl FnOnce(OpenRaftKvTopology) -> OpenRaftKvTopology) -> Self;
}
impl OpenRaftKvBuilderExt for OpenRaftKvScenarioBuilder {
fn deployment_with(f: impl FnOnce(OpenRaftKvTopology) -> OpenRaftKvTopology) -> Self {
OpenRaftKvScenarioBuilder::with_deployment(f(OpenRaftKvTopology::new(3)))
}
}

View File

@ -0,0 +1,15 @@
[package]
edition.workspace = true
license.workspace = true
name = "openraft-kv-runtime-workloads"
version.workspace = true
[dependencies]
anyhow = "1.0"
async-trait = { workspace = true }
openraft-kv-node = { path = "../../openraft-kv-node" }
openraft-kv-runtime-ext = { path = "../integration" }
testing-framework-core = { workspace = true }
thiserror = "2.0"
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

View File

@ -0,0 +1,58 @@
use std::time::Duration;
use async_trait::async_trait;
use openraft_kv_runtime_ext::OpenRaftKvEnv;
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use crate::support::{expected_kv, wait_for_replication};
/// Expectation that waits for the full voter set and the writes from this run
/// to converge on every node.
#[derive(Clone)]
pub struct OpenRaftKvConverges {
total_writes: usize,
timeout: Duration,
key_prefix: String,
}
impl OpenRaftKvConverges {
/// Creates a convergence check for the given number of replicated writes.
#[must_use]
pub fn new(total_writes: usize) -> Self {
Self {
total_writes,
timeout: Duration::from_secs(30),
key_prefix: "raft-key".to_owned(),
}
}
/// Overrides the key prefix used to derive expected writes.
#[must_use]
pub fn key_prefix(mut self, value: &str) -> Self {
self.key_prefix = value.to_owned();
self
}
/// Overrides the convergence timeout.
#[must_use]
pub const fn timeout(mut self, value: Duration) -> Self {
self.timeout = value;
self
}
}
#[async_trait]
impl Expectation<OpenRaftKvEnv> for OpenRaftKvConverges {
fn name(&self) -> &str {
"openraft_kv_converges"
}
async fn evaluate(&mut self, ctx: &RunContext<OpenRaftKvEnv>) -> Result<(), DynError> {
let expected = expected_kv(&self.key_prefix, self.total_writes);
let clients = ctx.node_clients().snapshot();
wait_for_replication(&clients, &expected, self.timeout).await?;
Ok(())
}
}

View File

@ -0,0 +1,201 @@
use std::time::Duration;
use async_trait::async_trait;
use openraft_kv_node::OpenRaftKvClient;
use openraft_kv_runtime_ext::OpenRaftKvEnv;
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use tracing::info;
use crate::support::{
OpenRaftMembership, ensure_cluster_size, resolve_client_for_node, wait_for_leader,
wait_for_membership, write_batch,
};
/// Workload that bootstraps the cluster, expands it to three voters, writes one
/// batch, restarts the leader, then writes a second batch through the new
/// leader.
#[derive(Clone)]
pub struct OpenRaftKvFailoverWorkload {
first_batch: usize,
second_batch: usize,
timeout: Duration,
key_prefix: String,
}
impl OpenRaftKvFailoverWorkload {
/// Creates the default failover workload configuration.
#[must_use]
pub fn new() -> Self {
Self {
first_batch: 8,
second_batch: 8,
timeout: Duration::from_secs(30),
key_prefix: "raft-key".to_owned(),
}
}
/// Sets the number of writes issued before the leader restart.
#[must_use]
pub const fn first_batch(mut self, value: usize) -> Self {
self.first_batch = value;
self
}
/// Sets the number of writes issued after the leader restart.
#[must_use]
pub const fn second_batch(mut self, value: usize) -> Self {
self.second_batch = value;
self
}
/// Overrides the key prefix used for generated writes.
#[must_use]
pub fn key_prefix(mut self, value: &str) -> Self {
self.key_prefix = value.to_owned();
self
}
/// Overrides the timeout used for leader and membership transitions.
#[must_use]
pub const fn timeout(mut self, value: Duration) -> Self {
self.timeout = value;
self
}
}
impl Default for OpenRaftKvFailoverWorkload {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Workload<OpenRaftKvEnv> for OpenRaftKvFailoverWorkload {
fn name(&self) -> &str {
"openraft_kv_failover_workload"
}
async fn start(&self, ctx: &RunContext<OpenRaftKvEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
ensure_cluster_size(&clients, 3)?;
self.bootstrap_cluster(&clients).await?;
let initial_leader = wait_for_leader(&clients, self.timeout, None).await?;
let membership = OpenRaftMembership::discover(&clients).await?;
self.promote_cluster(&clients, initial_leader, &membership)
.await?;
self.write_initial_batch(&clients, initial_leader).await?;
let new_leader = self
.restart_leader_and_wait_for_failover(ctx, &clients, initial_leader)
.await?;
self.write_second_batch(&clients, new_leader).await?;
Ok(())
}
}
impl OpenRaftKvFailoverWorkload {
async fn bootstrap_cluster(&self, clients: &[OpenRaftKvClient]) -> Result<(), DynError> {
info!("initializing openraft cluster");
clients[0].init_self().await?;
Ok(())
}
async fn promote_cluster(
&self,
clients: &[OpenRaftKvClient],
leader_id: u64,
membership: &OpenRaftMembership,
) -> Result<(), DynError> {
let leader = resolve_client_for_node(clients, leader_id, self.timeout).await?;
for learner in membership.learner_targets(leader_id) {
info!(
target = learner.node_id,
addr = %learner.public_addr,
"adding learner"
);
leader
.add_learner(learner.node_id, &learner.public_addr)
.await?;
}
let voter_ids = membership.voter_ids();
leader.change_membership(voter_ids.iter().copied()).await?;
wait_for_membership(clients, &voter_ids, self.timeout).await?;
Ok(())
}
async fn write_initial_batch(
&self,
clients: &[OpenRaftKvClient],
leader_id: u64,
) -> Result<(), DynError> {
info!(
leader = leader_id,
writes = self.first_batch,
"writing initial batch"
);
let leader = resolve_client_for_node(clients, leader_id, self.timeout).await?;
write_batch(&leader, &self.key_prefix, 0, self.first_batch).await?;
Ok(())
}
async fn restart_leader_and_wait_for_failover(
&self,
ctx: &RunContext<OpenRaftKvEnv>,
clients: &[OpenRaftKvClient],
leader_id: u64,
) -> Result<u64, DynError> {
let Some(control) = ctx.node_control() else {
return Err("openraft failover workload requires node control".into());
};
let leader_name = format!("node-{leader_id}");
info!(%leader_name, "restarting current leader");
control.restart_node(&leader_name).await?;
let new_leader = wait_for_leader(clients, self.timeout, Some(leader_id)).await?;
info!(
old_leader = leader_id,
new_leader, "leader changed after restart"
);
Ok(new_leader)
}
async fn write_second_batch(
&self,
clients: &[OpenRaftKvClient],
leader_id: u64,
) -> Result<(), DynError> {
info!(
leader = leader_id,
writes = self.second_batch,
"writing second batch"
);
let leader = resolve_client_for_node(clients, leader_id, self.timeout).await?;
write_batch(
&leader,
&self.key_prefix,
self.first_batch,
self.second_batch,
)
.await?;
Ok(())
}
}

View File

@ -0,0 +1,14 @@
mod convergence;
mod failover;
mod support;
/// Replication expectation used by the OpenRaft example binaries.
pub use convergence::OpenRaftKvConverges;
/// Failover workload used by the OpenRaft example binaries.
pub use failover::OpenRaftKvFailoverWorkload;
/// Shared cluster helpers used by the OpenRaft workload and manual k8s example.
pub use support::{
FULL_VOTER_SET, OpenRaftClusterError, OpenRaftMembership, ensure_cluster_size, expected_kv,
resolve_client_for_node, wait_for_leader, wait_for_membership, wait_for_replication,
write_batch,
};

View File

@ -0,0 +1,325 @@
use std::{
collections::{BTreeMap, BTreeSet},
time::Duration,
};
use openraft_kv_node::{OpenRaftKvClient, OpenRaftKvState};
use thiserror::Error;
use tokio::time::{Instant, sleep};
const POLL_INTERVAL: Duration = Duration::from_millis(250);
const CLIENT_RESOLUTION_INTERVAL: Duration = Duration::from_millis(200);
/// Fixed voter set used by the example cluster.
pub const FULL_VOTER_SET: [u64; 3] = [0, 1, 2];
/// One learner candidate discovered from cluster state.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct LearnerTarget {
/// Node identifier used by OpenRaft membership.
pub node_id: u64,
/// Public address advertised for Raft traffic.
pub public_addr: String,
}
/// Membership view captured from the current node states.
#[derive(Clone, Debug)]
pub struct OpenRaftMembership {
states: Vec<OpenRaftKvState>,
}
impl OpenRaftMembership {
/// Reads and sorts the current node states by id.
pub async fn discover(clients: &[OpenRaftKvClient]) -> Result<Self, OpenRaftClusterError> {
let mut states = Vec::with_capacity(clients.len());
for client in clients {
states.push(client.state().await.map_err(OpenRaftClusterError::Client)?);
}
states.sort_by_key(|state| state.node_id);
Ok(Self { states })
}
/// Returns the full voter set implied by the discovered nodes.
#[must_use]
pub fn voter_ids(&self) -> BTreeSet<u64> {
self.states.iter().map(|state| state.node_id).collect()
}
/// Returns every non-leader node as a learner target.
#[must_use]
pub fn learner_targets(&self, leader_id: u64) -> Vec<LearnerTarget> {
self.states
.iter()
.filter(|state| state.node_id != leader_id)
.map(|state| LearnerTarget {
node_id: state.node_id,
public_addr: state.public_addr.clone(),
})
.collect()
}
}
/// One poll result across all known clients.
#[derive(Clone, Debug, Default)]
pub struct OpenRaftObservation {
states: Vec<OpenRaftKvState>,
failures: Vec<String>,
}
impl OpenRaftObservation {
/// Captures one best-effort view of the cluster.
pub async fn capture(clients: &[OpenRaftKvClient]) -> Self {
let mut states = Vec::with_capacity(clients.len());
let mut failures = Vec::new();
for (index, client) in clients.iter().enumerate() {
match client.state().await {
Ok(state) => states.push(state),
Err(error) => failures.push(format!("client_index={index} error={error}")),
}
}
states.sort_by_key(|state| state.node_id);
Self { states, failures }
}
/// Returns the unique observed leader when all responding nodes agree.
#[must_use]
pub fn agreed_leader(&self, different_from: Option<u64>) -> Option<u64> {
let observed = self
.states
.iter()
.filter_map(|state| state.current_leader)
.collect::<BTreeSet<_>>();
let leader = observed.iter().next().copied()?;
(observed.len() == 1 && different_from != Some(leader)).then_some(leader)
}
/// Returns `true` when every responding node reports the expected voter
/// set.
#[must_use]
pub fn all_voters_match(&self, expected_voters: &BTreeSet<u64>) -> bool {
!self.states.is_empty()
&& self.failures.is_empty()
&& self.states.iter().all(|state| {
state.voters.iter().copied().collect::<BTreeSet<_>>() == *expected_voters
})
}
/// Returns `true` when every responding node exposes the expected key/value
/// data.
#[must_use]
pub fn all_kv_match(&self, expected: &BTreeMap<String, String>) -> bool {
!self.states.is_empty()
&& self.failures.is_empty()
&& self.states.iter().all(|state| {
state.current_leader.is_some()
&& state.voters == FULL_VOTER_SET
&& expected
.iter()
.all(|(key, value)| state.kv.get(key) == Some(value))
})
}
/// Returns a concise summary for timeout errors.
#[must_use]
pub fn summary(&self) -> String {
let mut lines = self
.states
.iter()
.map(|state| {
format!(
"node={} leader={:?} voters={:?} keys={}",
state.node_id,
state.current_leader,
state.voters,
state.kv.len()
)
})
.collect::<Vec<_>>();
lines.extend(self.failures.iter().cloned());
if lines.is_empty() {
return "no state observed yet".to_owned();
}
lines.join("; ")
}
}
/// Errors raised by the OpenRaft example cluster helpers.
#[derive(Debug, Error)]
pub enum OpenRaftClusterError {
#[error("openraft example requires at least {expected} node clients, got {actual}")]
InsufficientClients { expected: usize, actual: usize },
#[error("failed to query openraft node state: {0}")]
Client(#[source] anyhow::Error),
#[error(
"timed out waiting for {action} after {timeout:?}; last observation: {last_observation}"
)]
Timeout {
action: &'static str,
timeout: Duration,
last_observation: String,
},
#[error("timed out resolving node client for {node_id} after {timeout:?}")]
ClientResolution { node_id: u64, timeout: Duration },
}
/// Ensures the example cluster has the expected number of node clients.
pub fn ensure_cluster_size(
clients: &[OpenRaftKvClient],
expected: usize,
) -> Result<(), OpenRaftClusterError> {
if clients.len() < expected {
return Err(OpenRaftClusterError::InsufficientClients {
expected,
actual: clients.len(),
});
}
Ok(())
}
/// Waits until the cluster converges on one leader.
pub async fn wait_for_leader(
clients: &[OpenRaftKvClient],
timeout: Duration,
different_from: Option<u64>,
) -> Result<u64, OpenRaftClusterError> {
let deadline = Instant::now() + timeout;
loop {
let last_observation = OpenRaftObservation::capture(clients).await;
if let Some(leader) = last_observation.agreed_leader(different_from) {
return Ok(leader);
}
if Instant::now() >= deadline {
return Err(OpenRaftClusterError::Timeout {
action: "leader agreement",
timeout,
last_observation: last_observation.summary(),
});
}
sleep(POLL_INTERVAL).await;
}
}
/// Waits until every node reports the expected voter set.
pub async fn wait_for_membership(
clients: &[OpenRaftKvClient],
expected_voters: &BTreeSet<u64>,
timeout: Duration,
) -> Result<(), OpenRaftClusterError> {
let deadline = Instant::now() + timeout;
loop {
let last_observation = OpenRaftObservation::capture(clients).await;
if last_observation.all_voters_match(expected_voters) {
return Ok(());
}
if Instant::now() >= deadline {
return Err(OpenRaftClusterError::Timeout {
action: "membership convergence",
timeout,
last_observation: last_observation.summary(),
});
}
sleep(POLL_INTERVAL).await;
}
}
/// Waits until every node reports the full replicated key set.
pub async fn wait_for_replication(
clients: &[OpenRaftKvClient],
expected: &BTreeMap<String, String>,
timeout: Duration,
) -> Result<(), OpenRaftClusterError> {
let deadline = Instant::now() + timeout;
loop {
let last_observation = OpenRaftObservation::capture(clients).await;
if last_observation.all_kv_match(expected) {
return Ok(());
}
if Instant::now() >= deadline {
return Err(OpenRaftClusterError::Timeout {
action: "replicated state convergence",
timeout,
last_observation: last_observation.summary(),
});
}
sleep(POLL_INTERVAL).await;
}
}
/// Resolves the client handle that currently identifies as `node_id`.
pub async fn resolve_client_for_node(
clients: &[OpenRaftKvClient],
node_id: u64,
timeout: Duration,
) -> Result<OpenRaftKvClient, OpenRaftClusterError> {
let deadline = Instant::now() + timeout;
loop {
for client in clients {
let Ok(state) = client.state().await else {
continue;
};
if state.node_id == node_id {
return Ok(client.clone());
}
}
if Instant::now() >= deadline {
return Err(OpenRaftClusterError::ClientResolution { node_id, timeout });
}
sleep(CLIENT_RESOLUTION_INTERVAL).await;
}
}
/// Issues a contiguous batch of writes through the current leader.
pub async fn write_batch(
leader: &OpenRaftKvClient,
prefix: &str,
start: usize,
count: usize,
) -> Result<(), OpenRaftClusterError> {
for index in start..(start + count) {
let key = format!("{prefix}-{index}");
let value = format!("value-{index}");
leader
.write(&key, &value, index as u64 + 1)
.await
.map_err(OpenRaftClusterError::Client)?;
}
Ok(())
}
/// Builds the replicated key/value map expected after the workload completes.
#[must_use]
pub fn expected_kv(prefix: &str, total_writes: usize) -> BTreeMap<String, String> {
(0..total_writes)
.map(|index| (format!("{prefix}-{index}"), format!("value-{index}")))
.collect()
}
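// --- Usage sketch (added for illustration; not part of the original module). ---
// Shows how a workload is expected to compose the helpers above. The timeouts,
// the "kv" prefix, and the batch sizes are illustrative values, not the ones
// used by the real workload.
#[allow(dead_code)]
async fn example_failover_flow(
    clients: &[OpenRaftKvClient],
) -> Result<(), OpenRaftClusterError> {
    ensure_cluster_size(clients, 3)?;
    let voters: BTreeSet<u64> = [0, 1, 2].into_iter().collect();
    // Write a first batch through whichever node currently leads.
    let leader_id = wait_for_leader(clients, Duration::from_secs(30), None).await?;
    let leader = resolve_client_for_node(clients, leader_id, Duration::from_secs(10)).await?;
    write_batch(&leader, "kv", 0, 10).await?;
    // After the caller restarts `leader_id`, wait for a different leader and write again.
    let new_leader_id = wait_for_leader(clients, Duration::from_secs(30), Some(leader_id)).await?;
    let new_leader = resolve_client_for_node(clients, new_leader_id, Duration::from_secs(10)).await?;
    write_batch(&new_leader, "kv", 10, 10).await?;
    // All nodes must converge on the same membership and replicated data.
    wait_for_membership(clients, &voters, Duration::from_secs(60)).await?;
    wait_for_replication(clients, &expected_kv("kv", 20), Duration::from_secs(60)).await?;
    Ok(())
}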

View File

@ -1,45 +0,0 @@
# Scheduler Example
This example runs a small replicated job scheduler with worker leases.
The scenario enqueues jobs, lets one worker claim them and then stall without
heartbeating or acking, and then checks that another worker can reclaim and
complete them once the lease expires.
## How TF runs this
Each example follows the same pattern:
- TF starts a small deployment of scheduler nodes
- the workload drives the worker flow through the HTTP API
- the expectation checks that jobs are eventually reclaimed and completed
## Scenario
- `basic_failover` runs the failover flow locally
- `compose_failover` runs the same flow in Docker Compose
## API
Each node exposes the endpoints below (a small client sketch follows the list):
- `POST /jobs/enqueue` to add jobs
- `POST /jobs/claim` to claim pending jobs with a lease
- `POST /jobs/heartbeat` to extend a lease
- `POST /jobs/ack` to mark a job complete
- `GET /jobs/state` to inspect scheduler state
- `GET /internal/snapshot` to read the local replicated state
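A minimal sketch of driving one job end to end against a single node, assuming
the `SchedulerHttpClient` from `scheduler-node`; the request and response field
names mirror the node's handlers, and the address is illustrative:
```rust
use reqwest::Url;
use scheduler_node::SchedulerHttpClient;
use serde::{Deserialize, Serialize};

#[derive(Serialize)]
struct Enqueue<'a> { payload: &'a str }
#[derive(Serialize)]
struct Claim<'a> { worker_id: &'a str, max_jobs: usize }
#[derive(Serialize)]
struct Ack<'a> { worker_id: &'a str, job_id: u64 }
#[derive(Deserialize)]
struct Enqueued { id: u64 }
#[derive(Deserialize)]
struct ClaimedJob { id: u64 }
#[derive(Deserialize)]
struct Claimed { jobs: Vec<ClaimedJob> }
#[derive(Deserialize)]
struct Op { ok: bool }

async fn drive_one_job() -> anyhow::Result<()> {
    // The port is illustrative; use whatever address the deployer exposes.
    let node = SchedulerHttpClient::new(Url::parse("http://127.0.0.1:8080/")?);
    let job: Enqueued = node.post("/jobs/enqueue", &Enqueue { payload: "demo" }).await?;
    let claim: Claimed = node
        .post("/jobs/claim", &Claim { worker_id: "worker-a", max_jobs: 1 })
        .await?;
    anyhow::ensure!(claim.jobs.iter().any(|j| j.id == job.id), "job not claimed");
    let ack: Op = node
        .post("/jobs/ack", &Ack { worker_id: "worker-a", job_id: job.id })
        .await?;
    anyhow::ensure!(ack.ok, "ack rejected");
    Ok(())
}
```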
## Run locally
```bash
cargo run -p scheduler-examples --bin basic_failover
```
## Run with Docker Compose
```bash
cargo run -p scheduler-examples --bin compose_failover
```
Set `SCHEDULER_IMAGE` to override the default compose image tag.

View File

@ -1,15 +0,0 @@
[package]
edition.workspace = true
license.workspace = true
name = "scheduler-examples"
version.workspace = true
[dependencies]
anyhow = "1.0"
scheduler-runtime-ext = { path = "../testing/integration" }
scheduler-runtime-workloads = { path = "../testing/workloads" }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -1,33 +0,0 @@
use std::time::Duration;
use scheduler_runtime_ext::SchedulerLocalDeployer;
use scheduler_runtime_workloads::{
SchedulerBuilderExt, SchedulerDrained, SchedulerLeaseFailoverWorkload,
SchedulerScenarioBuilder, SchedulerTopology,
};
use testing_framework_core::scenario::Deployer;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let jobs = 100;
let mut scenario = SchedulerScenarioBuilder::deployment_with(|_| SchedulerTopology::new(3))
.with_run_duration(Duration::from_secs(35))
.with_workload(
SchedulerLeaseFailoverWorkload::new()
.operations(jobs)
.lease_ttl(Duration::from_secs(3))
.rate_per_sec(25),
)
.with_expectation(SchedulerDrained::new(jobs).timeout(Duration::from_secs(30)))
.build()?;
let deployer = SchedulerLocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
runner.run(&mut scenario).await?;
Ok(())
}

View File

@ -1,49 +0,0 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use scheduler_runtime_workloads::{
SchedulerBuilderExt, SchedulerDrained, SchedulerLeaseFailoverWorkload,
SchedulerScenarioBuilder, SchedulerTopology,
};
use testing_framework_core::scenario::Deployer;
use testing_framework_runner_compose::ComposeRunnerError;
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let jobs = 100;
let mut scenario = SchedulerScenarioBuilder::deployment_with(|_| SchedulerTopology::new(3))
.with_run_duration(Duration::from_secs(35))
.with_workload(
SchedulerLeaseFailoverWorkload::new()
.operations(jobs)
.lease_ttl(Duration::from_secs(3))
.rate_per_sec(20),
)
.with_expectation(SchedulerDrained::new(jobs).timeout(Duration::from_secs(30)))
.build()?;
let deployer = scheduler_runtime_ext::SchedulerComposeDeployer::new();
let runner = match deployer.deploy(&scenario).await {
Ok(runner) => runner,
Err(ComposeRunnerError::DockerUnavailable) => {
warn!("docker unavailable; skipping scheduler compose run");
return Ok(());
}
Err(error) => {
return Err(anyhow::Error::new(error)).context("deploying scheduler compose stack");
}
};
info!("running scheduler compose failover scenario");
runner
.run(&mut scenario)
.await
.context("running scheduler compose scenario")?;
Ok(())
}

View File

@ -1,40 +0,0 @@
use reqwest::Url;
use serde::Serialize;
#[derive(Clone)]
pub struct SchedulerHttpClient {
base_url: Url,
client: reqwest::Client,
}
impl SchedulerHttpClient {
#[must_use]
pub fn new(base_url: Url) -> Self {
Self {
base_url,
client: reqwest::Client::new(),
}
}
pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> anyhow::Result<T> {
let url = self.base_url.join(path)?;
let response = self.client.get(url).send().await?.error_for_status()?;
Ok(response.json().await?)
}
pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
&self,
path: &str,
body: &B,
) -> anyhow::Result<T> {
let url = self.base_url.join(path)?;
let response = self
.client
.post(url)
.json(body)
.send()
.await?
.error_for_status()?;
Ok(response.json().await?)
}
}

View File

@ -1,35 +0,0 @@
use std::{fs, path::Path};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PeerInfo {
pub node_id: u64,
pub http_address: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchedulerConfig {
pub node_id: u64,
pub http_port: u16,
pub peers: Vec<PeerInfo>,
#[serde(default = "default_sync_interval_ms")]
pub sync_interval_ms: u64,
#[serde(default = "default_lease_ttl_ms")]
pub lease_ttl_ms: u64,
}
impl SchedulerConfig {
pub fn load(path: &Path) -> anyhow::Result<Self> {
let raw = fs::read_to_string(path)?;
Ok(serde_yaml::from_str(&raw)?)
}
}
const fn default_sync_interval_ms() -> u64 {
1000
}
const fn default_lease_ttl_ms() -> u64 {
3000
}
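// Example of the YAML this loader accepts (added for illustration; values and
// the peer hostname are placeholders, field names come from the structs above):
//
// node_id: 0
// http_port: 8080
// peers:
//   - node_id: 1
//     http_address: "scheduler-node-1:8080"
// sync_interval_ms: 500
// lease_ttl_ms: 3000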

View File

@ -1,3 +0,0 @@
pub mod client;
pub use client::SchedulerHttpClient;

View File

@ -1,36 +0,0 @@
mod config;
mod server;
mod state;
mod sync;
use std::path::PathBuf;
use clap::Parser;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use crate::{config::SchedulerConfig, state::SchedulerState, sync::SyncService};
#[derive(Parser, Debug)]
#[command(name = "scheduler-node")]
struct Args {
#[arg(short, long)]
config: PathBuf,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "scheduler_node=info,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
let args = Args::parse();
let config = SchedulerConfig::load(&args.config)?;
let state = SchedulerState::new(config.node_id, config.lease_ttl_ms);
SyncService::new(config.clone(), state.clone()).start();
server::start_server(config, state).await
}

View File

@ -1,156 +0,0 @@
use std::net::SocketAddr;
use axum::{
Router,
extract::State,
http::StatusCode,
response::Json,
routing::{get, post},
};
use serde::{Deserialize, Serialize};
use tower_http::trace::TraceLayer;
use crate::{
config::SchedulerConfig,
state::{SchedulerState, Snapshot, StateView},
};
#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
}
#[derive(Deserialize)]
struct EnqueueRequest {
payload: String,
}
#[derive(Serialize)]
struct EnqueueResponse {
id: u64,
}
#[derive(Deserialize)]
struct ClaimRequest {
worker_id: String,
max_jobs: usize,
}
#[derive(Serialize)]
struct ClaimResponse {
jobs: Vec<ClaimedJob>,
}
#[derive(Serialize)]
struct ClaimedJob {
id: u64,
payload: String,
attempt: u32,
}
#[derive(Deserialize)]
struct HeartbeatRequest {
worker_id: String,
job_id: u64,
}
#[derive(Deserialize)]
struct AckRequest {
worker_id: String,
job_id: u64,
}
#[derive(Serialize)]
struct OperationResponse {
ok: bool,
}
pub async fn start_server(config: SchedulerConfig, state: SchedulerState) -> anyhow::Result<()> {
let app = Router::new()
.route("/health/live", get(health_live))
.route("/health/ready", get(health_ready))
.route("/jobs/enqueue", post(enqueue))
.route("/jobs/claim", post(claim))
.route("/jobs/heartbeat", post(heartbeat))
.route("/jobs/ack", post(ack))
.route("/jobs/state", get(state_view))
.route("/internal/snapshot", get(snapshot))
.layer(TraceLayer::new_for_http())
.with_state(state.clone());
let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port));
let listener = tokio::net::TcpListener::bind(addr).await?;
state.set_ready(true).await;
tracing::info!(node_id = state.node_id(), %addr, "scheduler node ready");
axum::serve(listener, app).await?;
Ok(())
}
async fn health_live() -> (StatusCode, Json<HealthResponse>) {
(StatusCode::OK, Json(HealthResponse { status: "alive" }))
}
async fn health_ready(State(state): State<SchedulerState>) -> (StatusCode, Json<HealthResponse>) {
if state.is_ready().await {
(StatusCode::OK, Json(HealthResponse { status: "ready" }))
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(HealthResponse {
status: "not-ready",
}),
)
}
}
async fn enqueue(
State(state): State<SchedulerState>,
Json(request): Json<EnqueueRequest>,
) -> (StatusCode, Json<EnqueueResponse>) {
let id = state.enqueue(request.payload).await;
(StatusCode::OK, Json(EnqueueResponse { id }))
}
async fn claim(
State(state): State<SchedulerState>,
Json(request): Json<ClaimRequest>,
) -> (StatusCode, Json<ClaimResponse>) {
let result = state.claim(request.worker_id, request.max_jobs).await;
let jobs = result
.jobs
.into_iter()
.map(|job| ClaimedJob {
id: job.id,
payload: job.payload,
attempt: job.attempt,
})
.collect();
(StatusCode::OK, Json(ClaimResponse { jobs }))
}
async fn heartbeat(
State(state): State<SchedulerState>,
Json(request): Json<HeartbeatRequest>,
) -> (StatusCode, Json<OperationResponse>) {
let ok = state.heartbeat(&request.worker_id, request.job_id).await;
(StatusCode::OK, Json(OperationResponse { ok }))
}
async fn ack(
State(state): State<SchedulerState>,
Json(request): Json<AckRequest>,
) -> (StatusCode, Json<OperationResponse>) {
let ok = state.ack(&request.worker_id, request.job_id).await;
(StatusCode::OK, Json(OperationResponse { ok }))
}
async fn state_view(State(state): State<SchedulerState>) -> Json<StateView> {
Json(state.state_view().await)
}
async fn snapshot(State(state): State<SchedulerState>) -> Json<Snapshot> {
Json(state.snapshot().await)
}

View File

@ -1,249 +0,0 @@
use std::{
collections::BTreeMap,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Revision {
pub version: u64,
pub origin: u64,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct JobRecord {
pub id: u64,
pub payload: String,
pub attempt: u32,
pub owner: Option<String>,
pub lease_expires_at_ms: Option<u64>,
pub done: bool,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Snapshot {
pub node_id: u64,
pub revision: Revision,
pub next_id: u64,
pub jobs: BTreeMap<u64, JobRecord>,
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct StateView {
pub revision: Revision,
pub next_id: u64,
pub pending: usize,
pub leased: usize,
pub done: usize,
}
#[derive(Clone, Debug, Serialize)]
pub struct ClaimResult {
pub jobs: Vec<JobRecord>,
}
#[derive(Debug, Default)]
struct Data {
revision: Revision,
next_id: u64,
jobs: BTreeMap<u64, JobRecord>,
}
#[derive(Clone)]
pub struct SchedulerState {
node_id: u64,
ready: Arc<RwLock<bool>>,
lease_ttl_ms: u64,
data: Arc<RwLock<Data>>,
}
impl SchedulerState {
pub fn new(node_id: u64, lease_ttl_ms: u64) -> Self {
Self {
node_id,
ready: Arc::new(RwLock::new(false)),
lease_ttl_ms,
data: Arc::new(RwLock::new(Data {
next_id: 1,
..Data::default()
})),
}
}
pub const fn node_id(&self) -> u64 {
self.node_id
}
pub async fn set_ready(&self, value: bool) {
*self.ready.write().await = value;
}
pub async fn is_ready(&self) -> bool {
*self.ready.read().await
}
pub async fn enqueue(&self, payload: String) -> u64 {
let mut data = self.data.write().await;
reap_expired_leases(&mut data.jobs);
let id = data.next_id;
data.next_id = data.next_id.saturating_add(1);
data.jobs.insert(
id,
JobRecord {
id,
payload,
attempt: 0,
owner: None,
lease_expires_at_ms: None,
done: false,
},
);
bump_revision(&mut data.revision, self.node_id);
id
}
pub async fn claim(&self, worker_id: String, max_jobs: usize) -> ClaimResult {
let mut data = self.data.write().await;
reap_expired_leases(&mut data.jobs);
let now = unix_ms();
let mut claimed = Vec::new();
for job in data.jobs.values_mut() {
if claimed.len() >= max_jobs {
break;
}
if job.done || job.owner.is_some() {
continue;
}
job.attempt = job.attempt.saturating_add(1);
job.owner = Some(worker_id.clone());
job.lease_expires_at_ms = Some(now.saturating_add(self.lease_ttl_ms));
claimed.push(job.clone());
}
if !claimed.is_empty() {
bump_revision(&mut data.revision, self.node_id);
}
ClaimResult { jobs: claimed }
}
pub async fn heartbeat(&self, worker_id: &str, job_id: u64) -> bool {
let mut data = self.data.write().await;
reap_expired_leases(&mut data.jobs);
let Some(job) = data.jobs.get_mut(&job_id) else {
return false;
};
if job.done || job.owner.as_deref() != Some(worker_id) {
return false;
}
job.lease_expires_at_ms = Some(unix_ms().saturating_add(self.lease_ttl_ms));
bump_revision(&mut data.revision, self.node_id);
true
}
pub async fn ack(&self, worker_id: &str, job_id: u64) -> bool {
let mut data = self.data.write().await;
reap_expired_leases(&mut data.jobs);
let Some(job) = data.jobs.get_mut(&job_id) else {
return false;
};
if job.done || job.owner.as_deref() != Some(worker_id) {
return false;
}
job.done = true;
job.owner = None;
job.lease_expires_at_ms = None;
bump_revision(&mut data.revision, self.node_id);
true
}
pub async fn state_view(&self) -> StateView {
let data = self.data.read().await;
let mut pending = 0;
let mut leased = 0;
let mut done = 0;
for job in data.jobs.values() {
if job.done {
done += 1;
} else if job.owner.is_some() {
leased += 1;
} else {
pending += 1;
}
}
StateView {
revision: data.revision,
next_id: data.next_id,
pending,
leased,
done,
}
}
pub async fn merge_snapshot(&self, snapshot: Snapshot) {
let mut data = self.data.write().await;
if is_newer_revision(snapshot.revision, data.revision) {
data.revision = snapshot.revision;
data.next_id = snapshot.next_id;
data.jobs = snapshot.jobs;
}
}
pub async fn snapshot(&self) -> Snapshot {
let data = self.data.read().await;
Snapshot {
node_id: self.node_id,
revision: data.revision,
next_id: data.next_id,
jobs: data.jobs.clone(),
}
}
}
fn reap_expired_leases(jobs: &mut BTreeMap<u64, JobRecord>) {
let now = unix_ms();
for job in jobs.values_mut() {
if job.done {
continue;
}
if let Some(expiry) = job.lease_expires_at_ms
&& expiry <= now
{
job.owner = None;
job.lease_expires_at_ms = None;
}
}
}
fn bump_revision(revision: &mut Revision, node_id: u64) {
revision.version = revision.version.saturating_add(1);
revision.origin = node_id;
}
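// Snapshot merge policy: a snapshot wins when its (version, origin) pair is
// lexicographically greater, so the higher origin node id breaks version ties.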
fn is_newer_revision(candidate: Revision, existing: Revision) -> bool {
(candidate.version, candidate.origin) > (existing.version, existing.origin)
}
fn unix_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |duration| duration.as_millis() as u64)
}

View File

@ -1,103 +0,0 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use reqwest::Client;
use tokio::sync::Mutex;
use tracing::{debug, warn};
use crate::{
config::SchedulerConfig,
state::{SchedulerState, Snapshot},
};
const WARN_AFTER_CONSECUTIVE_FAILURES: u32 = 5;
#[derive(Clone)]
pub struct SyncService {
config: Arc<SchedulerConfig>,
state: SchedulerState,
client: Client,
failures_by_peer: Arc<Mutex<HashMap<String, u32>>>,
}
impl SyncService {
pub fn new(config: SchedulerConfig, state: SchedulerState) -> Self {
Self {
config: Arc::new(config),
state,
client: Client::new(),
failures_by_peer: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn start(&self) {
let service = self.clone();
tokio::spawn(async move {
service.run().await;
});
}
async fn run(self) {
let interval = Duration::from_millis(self.config.sync_interval_ms.max(100));
loop {
self.sync_once().await;
tokio::time::sleep(interval).await;
}
}
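    // Pull-based anti-entropy: each tick fetches every peer's snapshot and
    // merges it into local state when its revision is newer.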
async fn sync_once(&self) {
for peer in &self.config.peers {
match self.fetch_snapshot(&peer.http_address).await {
Ok(snapshot) => {
self.state.merge_snapshot(snapshot).await;
self.clear_failure_counter(&peer.http_address).await;
}
Err(error) => {
self.record_sync_failure(&peer.http_address, &error).await;
}
}
}
}
async fn fetch_snapshot(&self, peer_address: &str) -> anyhow::Result<Snapshot> {
let url = format!("http://{peer_address}/internal/snapshot");
let snapshot = self
.client
.get(url)
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(snapshot)
}
async fn clear_failure_counter(&self, peer_address: &str) {
let mut failures = self.failures_by_peer.lock().await;
failures.remove(peer_address);
}
async fn record_sync_failure(&self, peer_address: &str, error: &anyhow::Error) {
let consecutive_failures = {
let mut failures = self.failures_by_peer.lock().await;
let entry = failures.entry(peer_address.to_owned()).or_insert(0);
*entry += 1;
*entry
};
if consecutive_failures >= WARN_AFTER_CONSECUTIVE_FAILURES {
warn!(
peer = %peer_address,
%error,
consecutive_failures,
"scheduler sync repeatedly failing"
);
} else {
debug!(
peer = %peer_address,
%error,
consecutive_failures,
"scheduler sync failed"
);
}
}
}

View File

@ -1,77 +0,0 @@
use std::io::Error;
use async_trait::async_trait;
use scheduler_node::SchedulerHttpClient;
use serde::{Deserialize, Serialize};
use testing_framework_core::scenario::{
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError,
NodeAccess, serialize_cluster_yaml_config,
};
pub type SchedulerTopology = testing_framework_core::topology::ClusterTopology;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchedulerPeerInfo {
pub node_id: u64,
pub http_address: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchedulerNodeConfig {
pub node_id: u64,
pub http_port: u16,
pub peers: Vec<SchedulerPeerInfo>,
pub sync_interval_ms: u64,
pub lease_ttl_ms: u64,
}
pub struct SchedulerEnv;
#[async_trait]
impl Application for SchedulerEnv {
type Deployment = SchedulerTopology;
type NodeClient = SchedulerHttpClient;
type NodeConfig = SchedulerNodeConfig;
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
Ok(SchedulerHttpClient::new(access.api_base_url()?))
}
fn node_readiness_path() -> &'static str {
"/health/ready"
}
}
impl ClusterNodeConfigApplication for SchedulerEnv {
type ConfigError = Error;
fn static_network_port() -> u16 {
8080
}
fn build_cluster_node_config(
node: &ClusterNodeView,
peers: &[ClusterPeerView],
) -> Result<Self::NodeConfig, Self::ConfigError> {
let peers = peers
.iter()
.map(|peer| SchedulerPeerInfo {
node_id: peer.index() as u64,
http_address: peer.authority(),
})
.collect::<Vec<_>>();
Ok(SchedulerNodeConfig {
node_id: node.index() as u64,
http_port: node.network_port(),
peers,
sync_interval_ms: 500,
lease_ttl_ms: 3000,
})
}
fn serialize_cluster_node_config(
config: &Self::NodeConfig,
) -> Result<String, Self::ConfigError> {
serialize_cluster_yaml_config(config).map_err(Error::other)
}
}

View File

@ -1,15 +0,0 @@
use testing_framework_runner_compose::{BinaryConfigNodeSpec, ComposeBinaryApp};
use crate::SchedulerEnv;
const NODE_CONFIG_PATH: &str = "/etc/scheduler/config.yaml";
impl ComposeBinaryApp for SchedulerEnv {
fn compose_node_spec() -> BinaryConfigNodeSpec {
BinaryConfigNodeSpec::conventional(
"/usr/local/bin/scheduler-node",
NODE_CONFIG_PATH,
vec![8080, 8081],
)
}
}

View File

@ -1,10 +0,0 @@
mod app;
mod compose_env;
mod local_env;
pub mod scenario;
pub use app::*;
pub use scenario::{SchedulerBuilderExt, SchedulerScenarioBuilder};
pub type SchedulerLocalDeployer = testing_framework_runner_local::ProcessDeployer<SchedulerEnv>;
pub type SchedulerComposeDeployer = testing_framework_runner_compose::ComposeDeployer<SchedulerEnv>;

View File

@ -1,42 +0,0 @@
use std::collections::HashMap;
use testing_framework_core::scenario::{DynError, StartNodeOptions};
use testing_framework_runner_local::{
LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec,
build_local_cluster_node_config, yaml_node_config,
};
use crate::{SchedulerEnv, SchedulerNodeConfig};
impl LocalBinaryApp for SchedulerEnv {
fn initial_node_name_prefix() -> &'static str {
"scheduler-node"
}
fn build_local_node_config_with_peers(
_topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peers: &[LocalPeerNode],
_peer_ports_by_name: &HashMap<String, u16>,
_options: &StartNodeOptions<Self>,
_template_config: Option<
&<Self as testing_framework_core::scenario::Application>::NodeConfig,
>,
) -> Result<<Self as testing_framework_core::scenario::Application>::NodeConfig, DynError> {
build_local_cluster_node_config::<Self>(index, ports, peers)
}
fn local_process_spec() -> LocalProcessSpec {
LocalProcessSpec::new("SCHEDULER_NODE_BIN", "scheduler-node")
.with_rust_log("scheduler_node=info")
}
fn render_local_config(config: &SchedulerNodeConfig) -> Result<Vec<u8>, DynError> {
yaml_node_config(config)
}
fn http_api_port(config: &SchedulerNodeConfig) -> u16 {
config.http_port
}
}

View File

@ -1,15 +0,0 @@
use testing_framework_core::scenario::ScenarioBuilder;
use crate::{SchedulerEnv, SchedulerTopology};
pub type SchedulerScenarioBuilder = ScenarioBuilder<SchedulerEnv>;
pub trait SchedulerBuilderExt: Sized {
fn deployment_with(f: impl FnOnce(SchedulerTopology) -> SchedulerTopology) -> Self;
}
impl SchedulerBuilderExt for SchedulerScenarioBuilder {
fn deployment_with(f: impl FnOnce(SchedulerTopology) -> SchedulerTopology) -> Self {
SchedulerScenarioBuilder::with_deployment(f(SchedulerTopology::new(3)))
}
}

View File

@ -1,14 +0,0 @@
[package]
edition.workspace = true
license.workspace = true
name = "scheduler-runtime-workloads"
version.workspace = true
[dependencies]
async-trait = { workspace = true }
scheduler-node = { path = "../../scheduler-node" }
scheduler-runtime-ext = { path = "../integration" }
serde = { workspace = true }
testing-framework-core = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

View File

@ -1,99 +0,0 @@
use std::time::Duration;
use async_trait::async_trait;
use scheduler_runtime_ext::SchedulerEnv;
use serde::Deserialize;
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use tracing::info;
#[derive(Clone)]
pub struct SchedulerDrained {
min_done: usize,
timeout: Duration,
poll_interval: Duration,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct Revision {
version: u64,
origin: u64,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct StateResponse {
revision: Revision,
pending: usize,
leased: usize,
done: usize,
}
impl SchedulerDrained {
#[must_use]
pub fn new(min_done: usize) -> Self {
Self {
min_done,
timeout: Duration::from_secs(30),
poll_interval: Duration::from_millis(500),
}
}
#[must_use]
pub const fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}
#[async_trait]
impl Expectation<SchedulerEnv> for SchedulerDrained {
fn name(&self) -> &str {
"scheduler_drained"
}
async fn evaluate(&mut self, ctx: &RunContext<SchedulerEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
if clients.is_empty() {
return Err("no scheduler node clients available".into());
}
let deadline = tokio::time::Instant::now() + self.timeout;
while tokio::time::Instant::now() < deadline {
if is_drained_and_converged(&clients, self.min_done).await? {
info!(min_done = self.min_done, "scheduler drained and converged");
return Ok(());
}
tokio::time::sleep(self.poll_interval).await;
}
Err(format!("scheduler not drained within {:?}", self.timeout).into())
}
}
async fn is_drained_and_converged(
clients: &[scheduler_node::SchedulerHttpClient],
min_done: usize,
) -> Result<bool, DynError> {
let Some((first, rest)) = clients.split_first() else {
return Ok(false);
};
let baseline = read_state(first).await?;
if baseline.pending != 0 || baseline.leased != 0 || baseline.done < min_done {
return Ok(false);
}
for client in rest {
let current = read_state(client).await?;
if current != baseline {
return Ok(false);
}
}
Ok(true)
}
async fn read_state(
client: &scheduler_node::SchedulerHttpClient,
) -> Result<StateResponse, DynError> {
Ok(client.get("/jobs/state").await?)
}

View File

@ -1,205 +0,0 @@
use std::{collections::HashSet, time::Duration};
use async_trait::async_trait;
use scheduler_runtime_ext::SchedulerEnv;
use serde::{Deserialize, Serialize};
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use tokio::time::{Instant, sleep};
use tracing::info;
#[derive(Clone)]
pub struct SchedulerLeaseFailoverWorkload {
operations: usize,
lease_ttl: Duration,
rate_per_sec: Option<usize>,
payload_prefix: String,
}
#[derive(Serialize)]
struct EnqueueRequest {
payload: String,
}
#[derive(Deserialize)]
struct EnqueueResponse {
id: u64,
}
#[derive(Serialize)]
struct ClaimRequest {
worker_id: String,
max_jobs: usize,
}
#[derive(Deserialize)]
struct ClaimedJob {
id: u64,
}
#[derive(Deserialize)]
struct ClaimResponse {
jobs: Vec<ClaimedJob>,
}
#[derive(Serialize)]
struct AckRequest {
worker_id: String,
job_id: u64,
}
#[derive(Deserialize)]
struct OperationResponse {
ok: bool,
}
impl SchedulerLeaseFailoverWorkload {
#[must_use]
pub fn new() -> Self {
Self {
operations: 100,
lease_ttl: Duration::from_secs(3),
rate_per_sec: Some(25),
payload_prefix: "scheduler-job".to_owned(),
}
}
#[must_use]
pub const fn operations(mut self, value: usize) -> Self {
self.operations = value;
self
}
#[must_use]
pub const fn lease_ttl(mut self, value: Duration) -> Self {
self.lease_ttl = value;
self
}
#[must_use]
pub const fn rate_per_sec(mut self, value: usize) -> Self {
self.rate_per_sec = Some(value);
self
}
}
impl Default for SchedulerLeaseFailoverWorkload {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Workload<SchedulerEnv> for SchedulerLeaseFailoverWorkload {
fn name(&self) -> &str {
"scheduler_lease_failover_workload"
}
async fn start(&self, ctx: &RunContext<SchedulerEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
let Some(node_a) = clients.first() else {
return Err("no scheduler node clients available".into());
};
let node_b = clients.get(1).unwrap_or(node_a);
let interval = self.rate_per_sec.and_then(compute_interval);
let mut enqueued_ids = Vec::with_capacity(self.operations);
info!(
operations = self.operations,
"scheduler failover: enqueue phase"
);
for index in 0..self.operations {
let response: EnqueueResponse = node_a
.post(
"/jobs/enqueue",
&EnqueueRequest {
payload: format!("{}-{index}", self.payload_prefix),
},
)
.await?;
enqueued_ids.push(response.id);
if let Some(delay) = interval {
sleep(delay).await;
}
}
info!("scheduler failover: worker-a claim without ack");
let first_claim: ClaimResponse = node_a
.post(
"/jobs/claim",
&ClaimRequest {
worker_id: "worker-a".to_owned(),
max_jobs: self.operations,
},
)
.await?;
if first_claim.jobs.len() != self.operations {
return Err(format!(
"worker-a claimed {} jobs, expected {}",
first_claim.jobs.len(),
self.operations
)
.into());
}
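        // Let worker-a's leases lapse (TTL plus slack) so the jobs become reclaimable.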
sleep(self.lease_ttl + Duration::from_millis(500)).await;
info!("scheduler failover: worker-b reclaim and ack");
let mut pending_ids: HashSet<u64> = enqueued_ids.into_iter().collect();
let reclaim_deadline = Instant::now() + Duration::from_secs(20);
while !pending_ids.is_empty() && Instant::now() < reclaim_deadline {
let claim: ClaimResponse = node_b
.post(
"/jobs/claim",
&ClaimRequest {
worker_id: "worker-b".to_owned(),
max_jobs: pending_ids.len(),
},
)
.await?;
if claim.jobs.is_empty() {
sleep(Duration::from_millis(200)).await;
continue;
}
for job in claim.jobs {
if !pending_ids.remove(&job.id) {
return Err(format!("unexpected reclaimed job id {}", job.id).into());
}
let ack: OperationResponse = node_b
.post(
"/jobs/ack",
&AckRequest {
worker_id: "worker-b".to_owned(),
job_id: job.id,
},
)
.await?;
if !ack.ok {
return Err(format!("failed to ack reclaimed job {}", job.id).into());
}
}
}
if !pending_ids.is_empty() {
return Err(
format!("scheduler failover left {} unacked jobs", pending_ids.len()).into(),
);
}
Ok(())
}
}
fn compute_interval(rate_per_sec: usize) -> Option<Duration> {
if rate_per_sec == 0 {
return None;
}
Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1)))
}

View File

@ -1,8 +0,0 @@
mod drained;
mod lease_failover;
pub use drained::SchedulerDrained;
pub use lease_failover::SchedulerLeaseFailoverWorkload;
pub use scheduler_runtime_ext::{
SchedulerBuilderExt, SchedulerEnv, SchedulerScenarioBuilder, SchedulerTopology,
};