From fc2156265e6835e660c92104ca8097c30d6cd7bc Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 12 Apr 2026 11:40:35 +0200 Subject: [PATCH] feat(examples): observe openraft scenario state --- Cargo.lock | 1 + examples/openraft_kv/examples/src/lib.rs | 1 + .../testing/integration/Cargo.toml | 1 + .../testing/integration/src/lib.rs | 2 + .../testing/integration/src/observation.rs | 219 ++++++++++++++++++ .../testing/integration/src/scenario.rs | 17 +- .../testing/workloads/src/convergence.rs | 13 +- .../testing/workloads/src/failover.rs | 26 ++- .../testing/workloads/src/support.rs | 188 ++++++++------- 9 files changed, 355 insertions(+), 113 deletions(-) create mode 100644 examples/openraft_kv/testing/integration/src/observation.rs diff --git a/Cargo.lock b/Cargo.lock index c7f4a48..bebd639 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1804,6 +1804,7 @@ dependencies = [ name = "openraft-kv-runtime-ext" version = "0.1.0" dependencies = [ + "async-trait", "openraft-kv-node", "reqwest", "testing-framework-core", diff --git a/examples/openraft_kv/examples/src/lib.rs b/examples/openraft_kv/examples/src/lib.rs index 4ab036c..8e19a67 100644 --- a/examples/openraft_kv/examples/src/lib.rs +++ b/examples/openraft_kv/examples/src/lib.rs @@ -21,6 +21,7 @@ pub fn build_failover_scenario( ) -> anyhow::Result> { Ok( OpenRaftKvScenarioBuilder::deployment_with(|deployment| deployment) + .with_cluster_observer() .enable_node_control() .with_run_duration(run_duration) .with_workload( diff --git a/examples/openraft_kv/testing/integration/Cargo.toml b/examples/openraft_kv/testing/integration/Cargo.toml index d5b90e1..ef40e34 100644 --- a/examples/openraft_kv/testing/integration/Cargo.toml +++ b/examples/openraft_kv/testing/integration/Cargo.toml @@ -5,6 +5,7 @@ name = "openraft-kv-runtime-ext" version.workspace = true [dependencies] +async-trait = { workspace = true } openraft-kv-node = { path = "../../openraft-kv-node" } reqwest = { workspace = true } testing-framework-core = { workspace = true } diff --git a/examples/openraft_kv/testing/integration/src/lib.rs b/examples/openraft_kv/testing/integration/src/lib.rs index 27ed847..b68514d 100644 --- a/examples/openraft_kv/testing/integration/src/lib.rs +++ b/examples/openraft_kv/testing/integration/src/lib.rs @@ -2,9 +2,11 @@ mod app; mod compose_env; mod k8s_env; mod local_env; +mod observation; pub mod scenario; pub use app::*; +pub use observation::*; pub use scenario::{OpenRaftKvBuilderExt, OpenRaftKvScenarioBuilder}; /// Local process deployer for the OpenRaft example app. diff --git a/examples/openraft_kv/testing/integration/src/observation.rs b/examples/openraft_kv/testing/integration/src/observation.rs new file mode 100644 index 0000000..d97f74e --- /dev/null +++ b/examples/openraft_kv/testing/integration/src/observation.rs @@ -0,0 +1,219 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + time::Duration, +}; + +use async_trait::async_trait; +use openraft_kv_node::{OpenRaftKvClient, OpenRaftKvState}; +use testing_framework_core::{ + observation::{ + BoxedSourceProvider, ObservationConfig, ObservedSource, Observer, StaticSourceProvider, + }, + scenario::{Application, DynError, NodeClients}, +}; + +use crate::OpenRaftKvEnv; + +const OBSERVATION_INTERVAL: Duration = Duration::from_millis(250); +const OBSERVATION_HISTORY_LIMIT: usize = 16; + +/// Materialized OpenRaft cluster state built from the latest node polls. +#[derive(Clone, Debug, Default)] +pub struct OpenRaftClusterSnapshot { + states: Vec, + failures: Vec, +} + +impl OpenRaftClusterSnapshot { + /// Returns the successfully observed node states sorted by node id. + #[must_use] + pub fn states(&self) -> &[OpenRaftKvState] { + &self.states + } + + /// Returns `true` when the snapshot contains no successful node states. + #[must_use] + pub fn is_empty(&self) -> bool { + self.states.is_empty() + } + + /// Returns the unique observed leader when all responding nodes agree. + #[must_use] + pub fn agreed_leader(&self, different_from: Option) -> Option { + let observed = self + .states + .iter() + .filter_map(|state| state.current_leader) + .collect::>(); + + let leader = observed.iter().next().copied()?; + + (observed.len() == 1 && different_from != Some(leader)).then_some(leader) + } + + /// Returns `true` when every observed node reports the expected voter set. + #[must_use] + pub fn all_voters_match(&self, expected_voters: &BTreeSet) -> bool { + !self.states.is_empty() + && self.failures.is_empty() + && self.states.iter().all(|state| { + state.voters.iter().copied().collect::>() == *expected_voters + }) + } + + /// Returns `true` when every observed node exposes the expected replicated + /// key/value data. + #[must_use] + pub fn all_kv_match( + &self, + expected: &BTreeMap, + full_voter_set: &[u64], + ) -> bool { + !self.states.is_empty() + && self.failures.is_empty() + && self.states.iter().all(|state| { + state.current_leader.is_some() + && state.voters == full_voter_set + && expected + .iter() + .all(|(key, value)| state.kv.get(key) == Some(value)) + }) + } + + /// Returns a concise summary for timeout and validation errors. + #[must_use] + pub fn summary(&self) -> String { + let mut lines = self + .states + .iter() + .map(|state| { + format!( + "node={} leader={:?} voters={:?} keys={}", + state.node_id, + state.current_leader, + state.voters, + state.kv.len() + ) + }) + .collect::>(); + + lines.extend(self.failures.iter().map(OpenRaftSourceFailure::summary)); + + if lines.is_empty() { + return "no state observed yet".to_owned(); + } + + lines.join("; ") + } +} + +/// One failed source read captured during an observation cycle. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct OpenRaftSourceFailure { + source_name: String, + message: String, +} + +impl OpenRaftSourceFailure { + fn new(source_name: &str, message: &str) -> Self { + Self { + source_name: source_name.to_owned(), + message: message.to_owned(), + } + } + + fn summary(&self) -> String { + format!("source={} error={}", self.source_name, self.message) + } +} + +/// Observer that keeps the latest per-node OpenRaft state. +#[derive(Clone, Debug, Default)] +pub struct OpenRaftClusterObserver; + +impl OpenRaftClusterObserver { + /// Default runtime configuration for the OpenRaft example observer. + #[must_use] + pub fn config() -> ObservationConfig { + ObservationConfig { + interval: OBSERVATION_INTERVAL, + history_limit: OBSERVATION_HISTORY_LIMIT, + } + } +} + +/// Captures one best-effort OpenRaft cluster snapshot from the provided node +/// clients. +pub async fn capture_openraft_cluster_snapshot( + clients: &[OpenRaftKvClient], +) -> OpenRaftClusterSnapshot { + capture_cluster_snapshot(&named_sources(clients.to_vec())).await +} + +#[async_trait] +impl Observer for OpenRaftClusterObserver { + type Source = OpenRaftKvClient; + type State = OpenRaftClusterSnapshot; + type Snapshot = OpenRaftClusterSnapshot; + type Event = (); + + async fn init( + &self, + sources: &[ObservedSource], + ) -> Result { + Ok(capture_cluster_snapshot(sources).await) + } + + async fn poll( + &self, + sources: &[ObservedSource], + state: &mut Self::State, + ) -> Result, DynError> { + *state = capture_cluster_snapshot(sources).await; + + Ok(Vec::new()) + } + + fn snapshot(&self, state: &Self::State) -> Self::Snapshot { + state.clone() + } +} + +/// Builds the fixed source provider used by the scenario-based OpenRaft +/// examples. +pub fn openraft_cluster_source_provider( + _deployment: &::Deployment, + node_clients: NodeClients, +) -> Result, DynError> { + Ok(Box::new(StaticSourceProvider::new(named_sources( + node_clients.snapshot(), + )))) +} + +fn named_sources(clients: Vec) -> Vec> { + clients + .into_iter() + .enumerate() + .map(|(index, client)| ObservedSource::new(&format!("node-{index}"), client)) + .collect() +} + +async fn capture_cluster_snapshot( + sources: &[ObservedSource], +) -> OpenRaftClusterSnapshot { + let mut states = Vec::with_capacity(sources.len()); + let mut failures = Vec::new(); + + for source in sources { + match source.source.state().await { + Ok(state) => states.push(state), + Err(error) => { + failures.push(OpenRaftSourceFailure::new(&source.name, &error.to_string())) + } + } + } + + states.sort_by_key(|state| state.node_id); + + OpenRaftClusterSnapshot { states, failures } +} diff --git a/examples/openraft_kv/testing/integration/src/scenario.rs b/examples/openraft_kv/testing/integration/src/scenario.rs index eca27b7..d2de604 100644 --- a/examples/openraft_kv/testing/integration/src/scenario.rs +++ b/examples/openraft_kv/testing/integration/src/scenario.rs @@ -1,6 +1,8 @@ -use testing_framework_core::scenario::ScenarioBuilder; +use testing_framework_core::scenario::{CoreBuilderExt, ScenarioBuilder}; -use crate::{OpenRaftKvEnv, OpenRaftKvTopology}; +use crate::{ + OpenRaftClusterObserver, OpenRaftKvEnv, OpenRaftKvTopology, openraft_cluster_source_provider, +}; /// Scenario builder alias used by the OpenRaft example binaries. pub type OpenRaftKvScenarioBuilder = ScenarioBuilder; @@ -10,10 +12,21 @@ pub trait OpenRaftKvBuilderExt: Sized { /// Starts from the default three-node deployment and lets callers adjust /// it. fn deployment_with(f: impl FnOnce(OpenRaftKvTopology) -> OpenRaftKvTopology) -> Self; + + /// Attaches the default OpenRaft cluster observer to the scenario. + fn with_cluster_observer(self) -> Self; } impl OpenRaftKvBuilderExt for OpenRaftKvScenarioBuilder { fn deployment_with(f: impl FnOnce(OpenRaftKvTopology) -> OpenRaftKvTopology) -> Self { OpenRaftKvScenarioBuilder::with_deployment(f(OpenRaftKvTopology::new(3))) } + + fn with_cluster_observer(self) -> Self { + self.with_observer( + OpenRaftClusterObserver, + openraft_cluster_source_provider, + OpenRaftClusterObserver::config(), + ) + } } diff --git a/examples/openraft_kv/testing/workloads/src/convergence.rs b/examples/openraft_kv/testing/workloads/src/convergence.rs index 9e88870..abd9ec1 100644 --- a/examples/openraft_kv/testing/workloads/src/convergence.rs +++ b/examples/openraft_kv/testing/workloads/src/convergence.rs @@ -1,10 +1,13 @@ use std::time::Duration; use async_trait::async_trait; -use openraft_kv_runtime_ext::OpenRaftKvEnv; -use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use openraft_kv_runtime_ext::{OpenRaftClusterObserver, OpenRaftKvEnv}; +use testing_framework_core::{ + observation::ObservationHandle, + scenario::{DynError, Expectation, RunContext}, +}; -use crate::support::{expected_kv, wait_for_replication}; +use crate::support::{expected_kv, wait_for_observed_replication}; /// Expectation that waits for the full voter set and the writes from this run /// to converge on every node. @@ -49,9 +52,9 @@ impl Expectation for OpenRaftKvConverges { async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { let expected = expected_kv(&self.key_prefix, self.total_writes); - let clients = ctx.node_clients().snapshot(); + let observer = ctx.require_extension::>()?; - wait_for_replication(&clients, &expected, self.timeout).await?; + wait_for_observed_replication(&observer, &expected, self.timeout).await?; Ok(()) } diff --git a/examples/openraft_kv/testing/workloads/src/failover.rs b/examples/openraft_kv/testing/workloads/src/failover.rs index 89482c7..c9570bd 100644 --- a/examples/openraft_kv/testing/workloads/src/failover.rs +++ b/examples/openraft_kv/testing/workloads/src/failover.rs @@ -2,13 +2,16 @@ use std::time::Duration; use async_trait::async_trait; use openraft_kv_node::OpenRaftKvClient; -use openraft_kv_runtime_ext::OpenRaftKvEnv; -use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use openraft_kv_runtime_ext::{OpenRaftClusterObserver, OpenRaftKvEnv}; +use testing_framework_core::{ + observation::ObservationHandle, + scenario::{DynError, RunContext, Workload}, +}; use tracing::info; use crate::support::{ - OpenRaftMembership, ensure_cluster_size, resolve_client_for_node, wait_for_leader, - wait_for_membership, write_batch, + OpenRaftMembership, ensure_cluster_size, resolve_client_for_node, wait_for_observed_leader, + wait_for_observed_membership, write_batch, }; /// Workload that bootstraps the cluster, expands it to three voters, writes one @@ -77,19 +80,21 @@ impl Workload for OpenRaftKvFailoverWorkload { async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { let clients = ctx.node_clients().snapshot(); + let observer = ctx.require_extension::>()?; + ensure_cluster_size(&clients, 3)?; self.bootstrap_cluster(&clients).await?; - let initial_leader = wait_for_leader(&clients, self.timeout, None).await?; + let initial_leader = wait_for_observed_leader(&observer, self.timeout, None).await?; let membership = OpenRaftMembership::discover(&clients).await?; - self.promote_cluster(&clients, initial_leader, &membership) + self.promote_cluster(&observer, &clients, initial_leader, &membership) .await?; self.write_initial_batch(&clients, initial_leader).await?; let new_leader = self - .restart_leader_and_wait_for_failover(ctx, &clients, initial_leader) + .restart_leader_and_wait_for_failover(ctx, &observer, initial_leader) .await?; self.write_second_batch(&clients, new_leader).await?; @@ -108,6 +113,7 @@ impl OpenRaftKvFailoverWorkload { async fn promote_cluster( &self, + observer: &ObservationHandle, clients: &[OpenRaftKvClient], leader_id: u64, membership: &OpenRaftMembership, @@ -129,7 +135,7 @@ impl OpenRaftKvFailoverWorkload { let voter_ids = membership.voter_ids(); leader.change_membership(voter_ids.iter().copied()).await?; - wait_for_membership(clients, &voter_ids, self.timeout).await?; + wait_for_observed_membership(observer, &voter_ids, self.timeout).await?; Ok(()) } @@ -154,7 +160,7 @@ impl OpenRaftKvFailoverWorkload { async fn restart_leader_and_wait_for_failover( &self, ctx: &RunContext, - clients: &[OpenRaftKvClient], + observer: &ObservationHandle, leader_id: u64, ) -> Result { let Some(control) = ctx.node_control() else { @@ -166,7 +172,7 @@ impl OpenRaftKvFailoverWorkload { control.restart_node(&leader_name).await?; - let new_leader = wait_for_leader(clients, self.timeout, Some(leader_id)).await?; + let new_leader = wait_for_observed_leader(observer, self.timeout, Some(leader_id)).await?; info!( old_leader = leader_id, diff --git a/examples/openraft_kv/testing/workloads/src/support.rs b/examples/openraft_kv/testing/workloads/src/support.rs index 489575a..facade6 100644 --- a/examples/openraft_kv/testing/workloads/src/support.rs +++ b/examples/openraft_kv/testing/workloads/src/support.rs @@ -4,6 +4,10 @@ use std::{ }; use openraft_kv_node::{OpenRaftKvClient, OpenRaftKvState}; +use openraft_kv_runtime_ext::{ + OpenRaftClusterObserver, OpenRaftClusterSnapshot, capture_openraft_cluster_snapshot, +}; +use testing_framework_core::observation::{ObservationHandle, ObservationSnapshot}; use thiserror::Error; use tokio::time::{Instant, sleep}; @@ -62,98 +66,6 @@ impl OpenRaftMembership { } } -/// One poll result across all known clients. -#[derive(Clone, Debug, Default)] -pub struct OpenRaftObservation { - states: Vec, - failures: Vec, -} - -impl OpenRaftObservation { - /// Captures one best-effort view of the cluster. - pub async fn capture(clients: &[OpenRaftKvClient]) -> Self { - let mut states = Vec::with_capacity(clients.len()); - let mut failures = Vec::new(); - - for (index, client) in clients.iter().enumerate() { - match client.state().await { - Ok(state) => states.push(state), - Err(error) => failures.push(format!("client_index={index} error={error}")), - } - } - - states.sort_by_key(|state| state.node_id); - - Self { states, failures } - } - - /// Returns the unique observed leader when all responding nodes agree. - #[must_use] - pub fn agreed_leader(&self, different_from: Option) -> Option { - let observed = self - .states - .iter() - .filter_map(|state| state.current_leader) - .collect::>(); - - let leader = observed.iter().next().copied()?; - - (observed.len() == 1 && different_from != Some(leader)).then_some(leader) - } - - /// Returns `true` when every responding node reports the expected voter - /// set. - #[must_use] - pub fn all_voters_match(&self, expected_voters: &BTreeSet) -> bool { - !self.states.is_empty() - && self.failures.is_empty() - && self.states.iter().all(|state| { - state.voters.iter().copied().collect::>() == *expected_voters - }) - } - - /// Returns `true` when every responding node exposes the expected key/value - /// data. - #[must_use] - pub fn all_kv_match(&self, expected: &BTreeMap) -> bool { - !self.states.is_empty() - && self.failures.is_empty() - && self.states.iter().all(|state| { - state.current_leader.is_some() - && state.voters == FULL_VOTER_SET - && expected - .iter() - .all(|(key, value)| state.kv.get(key) == Some(value)) - }) - } - - /// Returns a concise summary for timeout errors. - #[must_use] - pub fn summary(&self) -> String { - let mut lines = self - .states - .iter() - .map(|state| { - format!( - "node={} leader={:?} voters={:?} keys={}", - state.node_id, - state.current_leader, - state.voters, - state.kv.len() - ) - }) - .collect::>(); - - lines.extend(self.failures.iter().cloned()); - - if lines.is_empty() { - return "no state observed yet".to_owned(); - } - - lines.join("; ") - } -} - /// Errors raised by the OpenRaft example cluster helpers. #[derive(Debug, Error)] pub enum OpenRaftClusterError { @@ -161,6 +73,8 @@ pub enum OpenRaftClusterError { InsufficientClients { expected: usize, actual: usize }, #[error("failed to query openraft node state: {0}")] Client(#[source] anyhow::Error), + #[error("openraft cluster observation is not available yet")] + MissingObservation, #[error( "timed out waiting for {action} after {timeout:?}; last observation: {last_observation}" )] @@ -197,7 +111,7 @@ pub async fn wait_for_leader( let deadline = Instant::now() + timeout; loop { - let last_observation = OpenRaftObservation::capture(clients).await; + let last_observation = capture_openraft_cluster_snapshot(clients).await; if let Some(leader) = last_observation.agreed_leader(different_from) { return Ok(leader); @@ -224,7 +138,7 @@ pub async fn wait_for_membership( let deadline = Instant::now() + timeout; loop { - let last_observation = OpenRaftObservation::capture(clients).await; + let last_observation = capture_openraft_cluster_snapshot(clients).await; if last_observation.all_voters_match(expected_voters) { return Ok(()); @@ -251,9 +165,9 @@ pub async fn wait_for_replication( let deadline = Instant::now() + timeout; loop { - let last_observation = OpenRaftObservation::capture(clients).await; + let last_observation = capture_openraft_cluster_snapshot(clients).await; - if last_observation.all_kv_match(expected) { + if last_observation.all_kv_match(expected, &FULL_VOTER_SET) { return Ok(()); } @@ -269,6 +183,58 @@ pub async fn wait_for_replication( } } +/// Waits until the observer reports one agreed leader. +pub async fn wait_for_observed_leader( + handle: &ObservationHandle, + timeout: Duration, + different_from: Option, +) -> Result { + let snapshot = + wait_for_observed_snapshot(handle, timeout, "observed leader agreement", |snapshot| { + snapshot.agreed_leader(different_from).is_some() + }) + .await?; + + snapshot + .value + .agreed_leader(different_from) + .ok_or(OpenRaftClusterError::MissingObservation) +} + +/// Waits until the observer reports the expected voter set on every node. +pub async fn wait_for_observed_membership( + handle: &ObservationHandle, + expected_voters: &BTreeSet, + timeout: Duration, +) -> Result<(), OpenRaftClusterError> { + wait_for_observed_snapshot( + handle, + timeout, + "observed membership convergence", + |snapshot| snapshot.all_voters_match(expected_voters), + ) + .await?; + + Ok(()) +} + +/// Waits until the observer reports the full replicated key set. +pub async fn wait_for_observed_replication( + handle: &ObservationHandle, + expected: &BTreeMap, + timeout: Duration, +) -> Result<(), OpenRaftClusterError> { + wait_for_observed_snapshot( + handle, + timeout, + "observed replicated state convergence", + |snapshot| snapshot.all_kv_match(expected, &FULL_VOTER_SET), + ) + .await?; + + Ok(()) +} + /// Resolves the client handle that currently identifies as `node_id`. pub async fn resolve_client_for_node( clients: &[OpenRaftKvClient], @@ -323,3 +289,33 @@ pub fn expected_kv(prefix: &str, total_writes: usize) -> BTreeMap, + timeout: Duration, + action: &'static str, + matches: impl Fn(&OpenRaftClusterSnapshot) -> bool, +) -> Result, OpenRaftClusterError> { + let deadline = Instant::now() + timeout; + let mut last_summary = "no state observed yet".to_owned(); + + loop { + if let Some(snapshot) = handle.latest_snapshot() { + last_summary = snapshot.value.summary(); + + if matches(&snapshot.value) { + return Ok(snapshot); + } + } + + if Instant::now() >= deadline { + return Err(OpenRaftClusterError::Timeout { + action, + timeout, + last_observation: last_summary, + }); + } + + sleep(POLL_INTERVAL).await; + } +}