feat(examples): observe openraft scenario state

This commit is contained in:
andrussal 2026-04-12 11:40:35 +02:00
parent f3f069c321
commit fc2156265e
9 changed files with 355 additions and 113 deletions

1
Cargo.lock generated
View File

@ -1804,6 +1804,7 @@ dependencies = [
name = "openraft-kv-runtime-ext"
version = "0.1.0"
dependencies = [
"async-trait",
"openraft-kv-node",
"reqwest",
"testing-framework-core",

View File

@ -21,6 +21,7 @@ pub fn build_failover_scenario(
) -> anyhow::Result<Scenario<OpenRaftKvEnv, NodeControlCapability>> {
Ok(
OpenRaftKvScenarioBuilder::deployment_with(|deployment| deployment)
.with_cluster_observer()
.enable_node_control()
.with_run_duration(run_duration)
.with_workload(

View File

@ -5,6 +5,7 @@ name = "openraft-kv-runtime-ext"
version.workspace = true
[dependencies]
async-trait = { workspace = true }
openraft-kv-node = { path = "../../openraft-kv-node" }
reqwest = { workspace = true }
testing-framework-core = { workspace = true }

View File

@ -2,9 +2,11 @@ mod app;
mod compose_env;
mod k8s_env;
mod local_env;
mod observation;
pub mod scenario;
pub use app::*;
pub use observation::*;
pub use scenario::{OpenRaftKvBuilderExt, OpenRaftKvScenarioBuilder};
/// Local process deployer for the OpenRaft example app.

View File

@ -0,0 +1,219 @@
use std::{
collections::{BTreeMap, BTreeSet},
time::Duration,
};
use async_trait::async_trait;
use openraft_kv_node::{OpenRaftKvClient, OpenRaftKvState};
use testing_framework_core::{
observation::{
BoxedSourceProvider, ObservationConfig, ObservedSource, Observer, StaticSourceProvider,
},
scenario::{Application, DynError, NodeClients},
};
use crate::OpenRaftKvEnv;
/// Value used for `ObservationConfig::interval` by
/// `OpenRaftClusterObserver::config` (250 ms).
// NOTE(review): presumably the delay between two observer polls — confirm
// against the testing framework's `ObservationConfig` documentation.
const OBSERVATION_INTERVAL: Duration = Duration::from_millis(250);
/// Value used for `ObservationConfig::history_limit` by
/// `OpenRaftClusterObserver::config`.
// NOTE(review): presumably the number of past snapshots the framework
// retains — confirm against the testing framework.
const OBSERVATION_HISTORY_LIMIT: usize = 16;
/// Materialized OpenRaft cluster state built from the latest node polls.
///
/// A snapshot pairs the node states that were read successfully with the
/// failures recorded for the sources that could not be read in the same
/// capture cycle. The `Default` value has no states and no failures.
#[derive(Clone, Debug, Default)]
pub struct OpenRaftClusterSnapshot {
/// States returned by the sources that answered; `capture_cluster_snapshot`
/// sorts them by `node_id` before building the snapshot.
states: Vec<OpenRaftKvState>,
/// One entry per source whose state read returned an error.
failures: Vec<OpenRaftSourceFailure>,
}
impl OpenRaftClusterSnapshot {
    /// Returns the successfully observed node states.
    ///
    /// Snapshots built by `capture_cluster_snapshot` keep these sorted by
    /// node id.
    #[must_use]
    pub fn states(&self) -> &[OpenRaftKvState] {
        &self.states
    }

    /// Returns `true` when the snapshot contains no successful node states.
    ///
    /// Recorded source failures are not considered: a snapshot that holds
    /// only failures is still reported as empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.states.is_empty()
    }

    /// Returns the single leader id reported by the observed nodes.
    ///
    /// Nodes that currently report no leader are ignored, and failed sources
    /// do not influence the result. Yields `None` when no node reports a
    /// leader, when the nodes report more than one distinct leader, or when
    /// the agreed leader equals `different_from`.
    #[must_use]
    pub fn agreed_leader(&self, different_from: Option<u64>) -> Option<u64> {
        let observed = self
            .states
            .iter()
            .filter_map(|state| state.current_leader)
            .collect::<BTreeSet<_>>();

        let leader = observed.iter().next().copied()?;

        (observed.len() == 1 && different_from != Some(leader)).then_some(leader)
    }

    /// Returns `true` when every observed node reports the expected voter set.
    ///
    /// The check fails when the snapshot has no states or when any source
    /// failed to respond. Voters are compared as sets, so the order in which
    /// a node lists them is irrelevant.
    #[must_use]
    pub fn all_voters_match(&self, expected_voters: &BTreeSet<u64>) -> bool {
        !self.states.is_empty()
            && self.failures.is_empty()
            && self.states.iter().all(|state| {
                state.voters.iter().copied().collect::<BTreeSet<_>>() == *expected_voters
            })
    }

    /// Returns `true` when every observed node exposes the expected replicated
    /// key/value data.
    ///
    /// All of the following must hold:
    /// - at least one state was observed and no source failed;
    /// - every node reports the voters in `full_voter_set` (compared as a
    ///   set, consistently with [`Self::all_voters_match`]);
    /// - every node reports some current leader;
    /// - every `(key, value)` pair in `expected` is present in the node's
    ///   store (additional keys are tolerated).
    #[must_use]
    pub fn all_kv_match(
        &self,
        expected: &BTreeMap<String, String>,
        full_voter_set: &[u64],
    ) -> bool {
        // Membership is a set of node ids; an order-sensitive comparison
        // would reject a converged node that merely lists its voters in a
        // different order than the caller.
        let expected_voters = full_voter_set.iter().copied().collect::<BTreeSet<_>>();

        // `all_voters_match` already enforces "non-empty" and "no failures".
        self.all_voters_match(&expected_voters)
            && self.states.iter().all(|state| {
                state.current_leader.is_some()
                    && expected
                        .iter()
                        .all(|(key, value)| state.kv.get(key) == Some(value))
            })
    }

    /// Returns a concise summary for timeout and validation errors.
    ///
    /// One entry per observed node comes first, followed by one entry per
    /// failed source, joined with `"; "`. A snapshot with neither yields
    /// `"no state observed yet"`.
    #[must_use]
    pub fn summary(&self) -> String {
        let mut lines = self
            .states
            .iter()
            .map(|state| {
                format!(
                    "node={} leader={:?} voters={:?} keys={}",
                    state.node_id,
                    state.current_leader,
                    state.voters,
                    state.kv.len()
                )
            })
            .collect::<Vec<_>>();

        lines.extend(self.failures.iter().map(OpenRaftSourceFailure::summary));

        if lines.is_empty() {
            return "no state observed yet".to_owned();
        }

        lines.join("; ")
    }
}
/// A single source read that failed while a cluster snapshot was captured.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OpenRaftSourceFailure {
    /// Name of the source that could not be read.
    source_name: String,
    /// Textual form of the error returned by the read.
    message: String,
}

impl OpenRaftSourceFailure {
    /// Builds a failure record, copying both borrowed strings.
    fn new(source_name: &str, message: &str) -> Self {
        let source_name = String::from(source_name);
        let message = String::from(message);

        Self {
            source_name,
            message,
        }
    }

    /// Renders the failure as `source=<name> error=<message>`.
    fn summary(&self) -> String {
        let Self {
            source_name,
            message,
        } = self;

        format!("source={source_name} error={message}")
    }
}
/// Observer whose state is the most recently captured per-node OpenRaft
/// snapshot.
#[derive(Clone, Debug, Default)]
pub struct OpenRaftClusterObserver;

impl OpenRaftClusterObserver {
    /// Returns the observation settings used by the OpenRaft example:
    /// `OBSERVATION_INTERVAL` as the interval and `OBSERVATION_HISTORY_LIMIT`
    /// as the history limit.
    #[must_use]
    pub fn config() -> ObservationConfig {
        let interval = OBSERVATION_INTERVAL;
        let history_limit = OBSERVATION_HISTORY_LIMIT;

        ObservationConfig {
            interval,
            history_limit,
        }
    }
}
/// Takes a one-off, best-effort snapshot of the cluster through `clients`.
///
/// The clients are cloned and labelled `node-<index>` by their position in
/// the slice; a client whose read fails is recorded as a failure in the
/// returned snapshot instead of aborting the capture.
pub async fn capture_openraft_cluster_snapshot(
    clients: &[OpenRaftKvClient],
) -> OpenRaftClusterSnapshot {
    let owned_clients = clients.to_vec();
    let sources = named_sources(owned_clients);

    capture_cluster_snapshot(&sources).await
}
#[async_trait]
impl Observer for OpenRaftClusterObserver {
type Source = OpenRaftKvClient;
type State = OpenRaftClusterSnapshot;
type Snapshot = OpenRaftClusterSnapshot;
type Event = ();
async fn init(
&self,
sources: &[ObservedSource<Self::Source>],
) -> Result<Self::State, DynError> {
Ok(capture_cluster_snapshot(sources).await)
}
async fn poll(
&self,
sources: &[ObservedSource<Self::Source>],
state: &mut Self::State,
) -> Result<Vec<Self::Event>, DynError> {
*state = capture_cluster_snapshot(sources).await;
Ok(Vec::new())
}
fn snapshot(&self, state: &Self::State) -> Self::Snapshot {
state.clone()
}
}
/// Creates the static source provider used by the scenario-based OpenRaft
/// examples.
///
/// The provider is built from a snapshot of `node_clients` taken at call
/// time, each client labelled `node-<index>`. The deployment argument is
/// not used.
pub fn openraft_cluster_source_provider(
    _deployment: &<OpenRaftKvEnv as Application>::Deployment,
    node_clients: NodeClients<OpenRaftKvEnv>,
) -> Result<BoxedSourceProvider<OpenRaftKvClient>, DynError> {
    let sources = named_sources(node_clients.snapshot());
    let provider = StaticSourceProvider::new(sources);

    Ok(Box::new(provider))
}
/// Wraps each client in an `ObservedSource` named `node-<index>`, where
/// `<index>` is the client's zero-based position in `clients`.
fn named_sources(clients: Vec<OpenRaftKvClient>) -> Vec<ObservedSource<OpenRaftKvClient>> {
    let mut sources = Vec::with_capacity(clients.len());

    for (position, client) in clients.into_iter().enumerate() {
        let label = format!("node-{position}");
        sources.push(ObservedSource::new(&label, client));
    }

    sources
}
/// Reads the state of every source in order and assembles a snapshot.
///
/// Successful reads are collected and sorted by `node_id`; a failed read is
/// recorded under the source's name with the error's display text and does
/// not stop the remaining sources from being read.
async fn capture_cluster_snapshot(
    sources: &[ObservedSource<OpenRaftKvClient>],
) -> OpenRaftClusterSnapshot {
    let mut failures = Vec::new();
    let mut states = Vec::with_capacity(sources.len());

    for observed in sources {
        let polled = observed.source.state().await;

        let state = match polled {
            Ok(state) => state,
            Err(error) => {
                let message = error.to_string();
                failures.push(OpenRaftSourceFailure::new(&observed.name, &message));
                continue;
            }
        };

        states.push(state);
    }

    states.sort_by_key(|state| state.node_id);

    OpenRaftClusterSnapshot { states, failures }
}

View File

@ -1,6 +1,8 @@
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_core::scenario::{CoreBuilderExt, ScenarioBuilder};
use crate::{OpenRaftKvEnv, OpenRaftKvTopology};
use crate::{
OpenRaftClusterObserver, OpenRaftKvEnv, OpenRaftKvTopology, openraft_cluster_source_provider,
};
/// Scenario builder alias used by the OpenRaft example binaries.
pub type OpenRaftKvScenarioBuilder = ScenarioBuilder<OpenRaftKvEnv>;
@ -10,10 +12,21 @@ pub trait OpenRaftKvBuilderExt: Sized {
/// Starts from the default three-node deployment and lets callers adjust
/// it.
fn deployment_with(f: impl FnOnce(OpenRaftKvTopology) -> OpenRaftKvTopology) -> Self;
/// Attaches the default OpenRaft cluster observer to the scenario.
fn with_cluster_observer(self) -> Self;
}
impl OpenRaftKvBuilderExt for OpenRaftKvScenarioBuilder {
    /// Applies `f` to a three-node topology and starts a builder from the
    /// result.
    fn deployment_with(f: impl FnOnce(OpenRaftKvTopology) -> OpenRaftKvTopology) -> Self {
        let default_topology = OpenRaftKvTopology::new(3);
        let topology = f(default_topology);

        OpenRaftKvScenarioBuilder::with_deployment(topology)
    }

    /// Registers `OpenRaftClusterObserver` with its source provider and its
    /// default observation config.
    fn with_cluster_observer(self) -> Self {
        let observer = OpenRaftClusterObserver;

        self.with_observer(
            observer,
            openraft_cluster_source_provider,
            OpenRaftClusterObserver::config(),
        )
    }
}

View File

@ -1,10 +1,13 @@
use std::time::Duration;
use async_trait::async_trait;
use openraft_kv_runtime_ext::OpenRaftKvEnv;
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use openraft_kv_runtime_ext::{OpenRaftClusterObserver, OpenRaftKvEnv};
use testing_framework_core::{
observation::ObservationHandle,
scenario::{DynError, Expectation, RunContext},
};
use crate::support::{expected_kv, wait_for_replication};
use crate::support::{expected_kv, wait_for_observed_replication};
/// Expectation that waits for the full voter set and the writes from this run
/// to converge on every node.
@ -49,9 +52,9 @@ impl Expectation<OpenRaftKvEnv> for OpenRaftKvConverges {
async fn evaluate(&mut self, ctx: &RunContext<OpenRaftKvEnv>) -> Result<(), DynError> {
let expected = expected_kv(&self.key_prefix, self.total_writes);
let clients = ctx.node_clients().snapshot();
let observer = ctx.require_extension::<ObservationHandle<OpenRaftClusterObserver>>()?;
wait_for_replication(&clients, &expected, self.timeout).await?;
wait_for_observed_replication(&observer, &expected, self.timeout).await?;
Ok(())
}

View File

@ -2,13 +2,16 @@ use std::time::Duration;
use async_trait::async_trait;
use openraft_kv_node::OpenRaftKvClient;
use openraft_kv_runtime_ext::OpenRaftKvEnv;
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use openraft_kv_runtime_ext::{OpenRaftClusterObserver, OpenRaftKvEnv};
use testing_framework_core::{
observation::ObservationHandle,
scenario::{DynError, RunContext, Workload},
};
use tracing::info;
use crate::support::{
OpenRaftMembership, ensure_cluster_size, resolve_client_for_node, wait_for_leader,
wait_for_membership, write_batch,
OpenRaftMembership, ensure_cluster_size, resolve_client_for_node, wait_for_observed_leader,
wait_for_observed_membership, write_batch,
};
/// Workload that bootstraps the cluster, expands it to three voters, writes one
@ -77,19 +80,21 @@ impl Workload<OpenRaftKvEnv> for OpenRaftKvFailoverWorkload {
async fn start(&self, ctx: &RunContext<OpenRaftKvEnv>) -> Result<(), DynError> {
let clients = ctx.node_clients().snapshot();
let observer = ctx.require_extension::<ObservationHandle<OpenRaftClusterObserver>>()?;
ensure_cluster_size(&clients, 3)?;
self.bootstrap_cluster(&clients).await?;
let initial_leader = wait_for_leader(&clients, self.timeout, None).await?;
let initial_leader = wait_for_observed_leader(&observer, self.timeout, None).await?;
let membership = OpenRaftMembership::discover(&clients).await?;
self.promote_cluster(&clients, initial_leader, &membership)
self.promote_cluster(&observer, &clients, initial_leader, &membership)
.await?;
self.write_initial_batch(&clients, initial_leader).await?;
let new_leader = self
.restart_leader_and_wait_for_failover(ctx, &clients, initial_leader)
.restart_leader_and_wait_for_failover(ctx, &observer, initial_leader)
.await?;
self.write_second_batch(&clients, new_leader).await?;
@ -108,6 +113,7 @@ impl OpenRaftKvFailoverWorkload {
async fn promote_cluster(
&self,
observer: &ObservationHandle<OpenRaftClusterObserver>,
clients: &[OpenRaftKvClient],
leader_id: u64,
membership: &OpenRaftMembership,
@ -129,7 +135,7 @@ impl OpenRaftKvFailoverWorkload {
let voter_ids = membership.voter_ids();
leader.change_membership(voter_ids.iter().copied()).await?;
wait_for_membership(clients, &voter_ids, self.timeout).await?;
wait_for_observed_membership(observer, &voter_ids, self.timeout).await?;
Ok(())
}
@ -154,7 +160,7 @@ impl OpenRaftKvFailoverWorkload {
async fn restart_leader_and_wait_for_failover(
&self,
ctx: &RunContext<OpenRaftKvEnv>,
clients: &[OpenRaftKvClient],
observer: &ObservationHandle<OpenRaftClusterObserver>,
leader_id: u64,
) -> Result<u64, DynError> {
let Some(control) = ctx.node_control() else {
@ -166,7 +172,7 @@ impl OpenRaftKvFailoverWorkload {
control.restart_node(&leader_name).await?;
let new_leader = wait_for_leader(clients, self.timeout, Some(leader_id)).await?;
let new_leader = wait_for_observed_leader(observer, self.timeout, Some(leader_id)).await?;
info!(
old_leader = leader_id,

View File

@ -4,6 +4,10 @@ use std::{
};
use openraft_kv_node::{OpenRaftKvClient, OpenRaftKvState};
use openraft_kv_runtime_ext::{
OpenRaftClusterObserver, OpenRaftClusterSnapshot, capture_openraft_cluster_snapshot,
};
use testing_framework_core::observation::{ObservationHandle, ObservationSnapshot};
use thiserror::Error;
use tokio::time::{Instant, sleep};
@ -62,98 +66,6 @@ impl OpenRaftMembership {
}
}
/// One poll result across all known clients.
#[derive(Clone, Debug, Default)]
pub struct OpenRaftObservation {
states: Vec<OpenRaftKvState>,
failures: Vec<String>,
}
impl OpenRaftObservation {
/// Captures one best-effort view of the cluster.
pub async fn capture(clients: &[OpenRaftKvClient]) -> Self {
let mut states = Vec::with_capacity(clients.len());
let mut failures = Vec::new();
for (index, client) in clients.iter().enumerate() {
match client.state().await {
Ok(state) => states.push(state),
Err(error) => failures.push(format!("client_index={index} error={error}")),
}
}
states.sort_by_key(|state| state.node_id);
Self { states, failures }
}
/// Returns the unique observed leader when all responding nodes agree.
#[must_use]
pub fn agreed_leader(&self, different_from: Option<u64>) -> Option<u64> {
let observed = self
.states
.iter()
.filter_map(|state| state.current_leader)
.collect::<BTreeSet<_>>();
let leader = observed.iter().next().copied()?;
(observed.len() == 1 && different_from != Some(leader)).then_some(leader)
}
/// Returns `true` when every responding node reports the expected voter
/// set.
#[must_use]
pub fn all_voters_match(&self, expected_voters: &BTreeSet<u64>) -> bool {
!self.states.is_empty()
&& self.failures.is_empty()
&& self.states.iter().all(|state| {
state.voters.iter().copied().collect::<BTreeSet<_>>() == *expected_voters
})
}
/// Returns `true` when every responding node exposes the expected key/value
/// data.
#[must_use]
pub fn all_kv_match(&self, expected: &BTreeMap<String, String>) -> bool {
!self.states.is_empty()
&& self.failures.is_empty()
&& self.states.iter().all(|state| {
state.current_leader.is_some()
&& state.voters == FULL_VOTER_SET
&& expected
.iter()
.all(|(key, value)| state.kv.get(key) == Some(value))
})
}
/// Returns a concise summary for timeout errors.
#[must_use]
pub fn summary(&self) -> String {
let mut lines = self
.states
.iter()
.map(|state| {
format!(
"node={} leader={:?} voters={:?} keys={}",
state.node_id,
state.current_leader,
state.voters,
state.kv.len()
)
})
.collect::<Vec<_>>();
lines.extend(self.failures.iter().cloned());
if lines.is_empty() {
return "no state observed yet".to_owned();
}
lines.join("; ")
}
}
/// Errors raised by the OpenRaft example cluster helpers.
#[derive(Debug, Error)]
pub enum OpenRaftClusterError {
@ -161,6 +73,8 @@ pub enum OpenRaftClusterError {
InsufficientClients { expected: usize, actual: usize },
#[error("failed to query openraft node state: {0}")]
Client(#[source] anyhow::Error),
#[error("openraft cluster observation is not available yet")]
MissingObservation,
#[error(
"timed out waiting for {action} after {timeout:?}; last observation: {last_observation}"
)]
@ -197,7 +111,7 @@ pub async fn wait_for_leader(
let deadline = Instant::now() + timeout;
loop {
let last_observation = OpenRaftObservation::capture(clients).await;
let last_observation = capture_openraft_cluster_snapshot(clients).await;
if let Some(leader) = last_observation.agreed_leader(different_from) {
return Ok(leader);
@ -224,7 +138,7 @@ pub async fn wait_for_membership(
let deadline = Instant::now() + timeout;
loop {
let last_observation = OpenRaftObservation::capture(clients).await;
let last_observation = capture_openraft_cluster_snapshot(clients).await;
if last_observation.all_voters_match(expected_voters) {
return Ok(());
@ -251,9 +165,9 @@ pub async fn wait_for_replication(
let deadline = Instant::now() + timeout;
loop {
let last_observation = OpenRaftObservation::capture(clients).await;
let last_observation = capture_openraft_cluster_snapshot(clients).await;
if last_observation.all_kv_match(expected) {
if last_observation.all_kv_match(expected, &FULL_VOTER_SET) {
return Ok(());
}
@ -269,6 +183,58 @@ pub async fn wait_for_replication(
}
}
/// Blocks until the observer's latest snapshot shows one agreed leader and
/// returns that leader's id.
///
/// A leader equal to `different_from` does not count as agreement. Fails
/// with a timeout error when no qualifying snapshot appears within
/// `timeout`.
pub async fn wait_for_observed_leader(
    handle: &ObservationHandle<OpenRaftClusterObserver>,
    timeout: Duration,
    different_from: Option<u64>,
) -> Result<u64, OpenRaftClusterError> {
    let observed =
        wait_for_observed_snapshot(handle, timeout, "observed leader agreement", |candidate| {
            candidate.agreed_leader(different_from).is_some()
        })
        .await?;

    // The accepted snapshot is re-queried for the leader id itself.
    match observed.value.agreed_leader(different_from) {
        Some(leader) => Ok(leader),
        None => Err(OpenRaftClusterError::MissingObservation),
    }
}
/// Waits until the observer reports the expected voter set on every node.
pub async fn wait_for_observed_membership(
handle: &ObservationHandle<OpenRaftClusterObserver>,
expected_voters: &BTreeSet<u64>,
timeout: Duration,
) -> Result<(), OpenRaftClusterError> {
wait_for_observed_snapshot(
handle,
timeout,
"observed membership convergence",
|snapshot| snapshot.all_voters_match(expected_voters),
)
.await?;
Ok(())
}
/// Waits until the observer reports the full replicated key set.
pub async fn wait_for_observed_replication(
handle: &ObservationHandle<OpenRaftClusterObserver>,
expected: &BTreeMap<String, String>,
timeout: Duration,
) -> Result<(), OpenRaftClusterError> {
wait_for_observed_snapshot(
handle,
timeout,
"observed replicated state convergence",
|snapshot| snapshot.all_kv_match(expected, &FULL_VOTER_SET),
)
.await?;
Ok(())
}
/// Resolves the client handle that currently identifies as `node_id`.
pub async fn resolve_client_for_node(
clients: &[OpenRaftKvClient],
@ -323,3 +289,33 @@ pub fn expected_kv(prefix: &str, total_writes: usize) -> BTreeMap<String, String
.map(|index| (format!("{prefix}-{index}"), format!("value-{index}")))
.collect()
}
/// Repeatedly inspects the observer's latest snapshot until `matches`
/// accepts it.
///
/// The handle is checked first, then the deadline, then the task sleeps for
/// `POLL_INTERVAL`. On timeout the error carries `action`, `timeout`, and the
/// summary of the last rejected snapshot (`"no state observed yet"` when
/// none was seen).
async fn wait_for_observed_snapshot(
    handle: &ObservationHandle<OpenRaftClusterObserver>,
    timeout: Duration,
    action: &'static str,
    matches: impl Fn(&OpenRaftClusterSnapshot) -> bool,
) -> Result<ObservationSnapshot<OpenRaftClusterSnapshot>, OpenRaftClusterError> {
    let deadline = Instant::now() + timeout;
    let mut last_observation = String::from("no state observed yet");

    loop {
        match handle.latest_snapshot() {
            Some(candidate) if matches(&candidate.value) => return Ok(candidate),
            Some(candidate) => last_observation = candidate.value.summary(),
            None => {}
        }

        let expired = Instant::now() >= deadline;
        if expired {
            return Err(OpenRaftClusterError::Timeout {
                action,
                timeout,
                last_observation,
            });
        }

        sleep(POLL_INTERVAL).await;
    }
}