From 4a503c24a8a52017ca6ec0b1c87e3b37a5224ed1 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 12 Apr 2026 11:50:13 +0200 Subject: [PATCH] feat(examples): observe openraft manual cluster state --- .../examples/src/bin/k8s_failover.rs | 127 +++++++++++------- .../testing/integration/src/observation.rs | 43 ++++++ .../openraft_kv/testing/workloads/src/lib.rs | 4 +- .../testing/workloads/src/support.rs | 13 +- 4 files changed, 133 insertions(+), 54 deletions(-) diff --git a/examples/openraft_kv/examples/src/bin/k8s_failover.rs b/examples/openraft_kv/examples/src/bin/k8s_failover.rs index 38ba53b..0c39fe5 100644 --- a/examples/openraft_kv/examples/src/bin/k8s_failover.rs +++ b/examples/openraft_kv/examples/src/bin/k8s_failover.rs @@ -1,15 +1,19 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use anyhow::{Context as _, Result, anyhow}; use openraft_kv_examples::{ INITIAL_WRITE_BATCH, RAFT_KEY_PREFIX, SECOND_WRITE_BATCH, TOTAL_WRITES, }; use openraft_kv_node::OpenRaftKvClient; -use openraft_kv_runtime_ext::{OpenRaftKvEnv, OpenRaftKvK8sDeployer, OpenRaftKvTopology}; -use openraft_kv_runtime_workloads::{ - OpenRaftMembership, resolve_client_for_node, wait_for_leader, wait_for_membership, - wait_for_replication, write_batch, +use openraft_kv_runtime_ext::{ + OpenRaftClusterObserver, OpenRaftKvEnv, OpenRaftKvK8sDeployer, OpenRaftKvTopology, + OpenRaftManualClusterSourceProvider, }; +use openraft_kv_runtime_workloads::{ + OpenRaftMembership, expected_kv, wait_for_observed_leader, wait_for_observed_membership, + wait_for_observed_replication, write_batch, +}; +use testing_framework_core::observation::{ObservationHandle, ObservationRuntime}; use testing_framework_runner_k8s::{ManualCluster, ManualClusterError}; use tracing::{info, warn}; @@ -42,51 +46,76 @@ async fn main() -> Result<()> { } }; - run_failover(cluster, Duration::from_secs(40)).await + run_failover(Arc::new(cluster), Duration::from_secs(40)).await } -async fn run_failover(cluster: ManualCluster, timeout: Duration) -> Result<()> { - let mut clients = start_cluster(&cluster).await?; +async fn run_failover(cluster: Arc>, timeout: Duration) -> Result<()> { + start_cluster(cluster.as_ref()).await?; - clients[0].init_self().await?; + let observation_runtime = start_observer(Arc::clone(&cluster)).await?; + let observer = observation_runtime.handle(); - let initial_leader = wait_for_leader(&clients, timeout, None).await?; - let membership = OpenRaftMembership::discover(&clients).await?; + client_for_node(cluster.as_ref(), 0)?.init_self().await?; - add_learners_and_promote(&clients, initial_leader, &membership, timeout).await?; - write_initial_batch(&clients, initial_leader, timeout).await?; + let initial_leader = wait_for_observed_leader(&observer, timeout, None).await?; + let membership = current_membership(&observer)?; - restart_leader(&cluster, initial_leader).await?; - refresh_clients(&cluster, &mut clients)?; + add_learners_and_promote( + cluster.as_ref(), + &observer, + initial_leader, + &membership, + timeout, + ) + .await?; + write_initial_batch(cluster.as_ref(), initial_leader).await?; - let new_leader = wait_for_leader(&clients, timeout, Some(initial_leader)).await?; - write_second_batch(&clients, new_leader, timeout).await?; + restart_leader(cluster.as_ref(), initial_leader).await?; - let expected = openraft_kv_runtime_workloads::expected_kv(RAFT_KEY_PREFIX, TOTAL_WRITES); - wait_for_replication(&clients, &expected, timeout).await?; + let new_leader = wait_for_observed_leader(&observer, timeout, Some(initial_leader)).await?; + write_second_batch(cluster.as_ref(), new_leader).await?; + + let expected = expected_kv(RAFT_KEY_PREFIX, TOTAL_WRITES); + wait_for_observed_replication(&observer, &expected, timeout).await?; cluster.stop_all(); Ok(()) } -async fn start_cluster(cluster: &ManualCluster) -> Result> { - let node0 = cluster.start_node("node-0").await?.client; - let node1 = cluster.start_node("node-1").await?.client; - let node2 = cluster.start_node("node-2").await?.client; +async fn start_cluster(cluster: &ManualCluster) -> Result<()> { + cluster.start_node("node-0").await?; + cluster.start_node("node-1").await?; + cluster.start_node("node-2").await?; cluster.wait_network_ready().await?; - Ok(vec![node0, node1, node2]) + Ok(()) +} + +async fn start_observer( + cluster: Arc>, +) -> Result> { + let provider = OpenRaftManualClusterSourceProvider::new(cluster, 3); + + ObservationRuntime::start( + provider, + OpenRaftClusterObserver, + OpenRaftClusterObserver::config(), + ) + .await + .map_err(anyhow::Error::new) + .context("starting openraft k8s observer") } async fn add_learners_and_promote( - clients: &[OpenRaftKvClient], + cluster: &ManualCluster, + observer: &ObservationHandle, leader_id: u64, membership: &OpenRaftMembership, timeout: Duration, ) -> Result<()> { - let leader = resolve_client_for_node(clients, leader_id, timeout).await?; + let leader = client_for_node(cluster, leader_id)?; for learner in membership.learner_targets(leader_id) { info!( @@ -103,28 +132,22 @@ async fn add_learners_and_promote( let voter_ids = membership.voter_ids(); leader.change_membership(voter_ids.iter().copied()).await?; - wait_for_membership(clients, &voter_ids, timeout).await?; + wait_for_observed_membership(observer, &voter_ids, timeout).await?; Ok(()) } -async fn write_initial_batch( - clients: &[OpenRaftKvClient], - leader_id: u64, - timeout: Duration, -) -> Result<()> { - let leader = resolve_client_for_node(clients, leader_id, timeout).await?; +async fn write_initial_batch(cluster: &ManualCluster, leader_id: u64) -> Result<()> { + let leader = client_for_node(cluster, leader_id)?; + write_batch(&leader, RAFT_KEY_PREFIX, 0, INITIAL_WRITE_BATCH).await?; Ok(()) } -async fn write_second_batch( - clients: &[OpenRaftKvClient], - leader_id: u64, - timeout: Duration, -) -> Result<()> { - let leader = resolve_client_for_node(clients, leader_id, timeout).await?; +async fn write_second_batch(cluster: &ManualCluster, leader_id: u64) -> Result<()> { + let leader = client_for_node(cluster, leader_id)?; + write_batch( &leader, RAFT_KEY_PREFIX, @@ -146,17 +169,23 @@ async fn restart_leader(cluster: &ManualCluster, leader_id: u64) Ok(()) } -fn refresh_clients( - cluster: &ManualCluster, - clients: &mut [OpenRaftKvClient], -) -> Result<()> { - for (index, slot) in clients.iter_mut().enumerate() { - *slot = cluster - .node_client(&format!("node-{index}")) - .ok_or_else(|| anyhow!("node-{index} client missing after restart"))?; - } +fn current_membership( + observer: &ObservationHandle, +) -> Result { + let snapshot = observer + .latest_snapshot() + .ok_or_else(|| anyhow!("openraft observer has not produced a snapshot yet"))?; - Ok(()) + Ok(OpenRaftMembership::from_states(snapshot.value.states())) +} + +fn client_for_node( + cluster: &ManualCluster, + node_id: u64, +) -> Result { + cluster + .node_client(&format!("node-{node_id}")) + .ok_or_else(|| anyhow!("node-{node_id} client missing")) } fn k8s_cluster_unavailable(message: &str) -> bool { diff --git a/examples/openraft_kv/testing/integration/src/observation.rs b/examples/openraft_kv/testing/integration/src/observation.rs index d97f74e..04e3373 100644 --- a/examples/openraft_kv/testing/integration/src/observation.rs +++ b/examples/openraft_kv/testing/integration/src/observation.rs @@ -1,5 +1,6 @@ use std::{ collections::{BTreeMap, BTreeSet}, + sync::Arc, time::Duration, }; @@ -11,6 +12,7 @@ use testing_framework_core::{ }, scenario::{Application, DynError, NodeClients}, }; +use testing_framework_runner_k8s::ManualCluster; use crate::OpenRaftKvEnv; @@ -190,6 +192,47 @@ pub fn openraft_cluster_source_provider( )))) } +/// Dynamic source provider backed by a manual cluster. +/// +/// This keeps observation aligned with the latest client handles after manual +/// node restarts. +#[derive(Clone)] +pub struct OpenRaftManualClusterSourceProvider { + cluster: Arc>, + node_names: Vec, +} + +impl OpenRaftManualClusterSourceProvider { + /// Builds a provider for the fixed node names used by the OpenRaft + /// examples. + #[must_use] + pub fn new(cluster: Arc>, node_count: usize) -> Self { + Self { + cluster, + node_names: (0..node_count) + .map(|index| format!("node-{index}")) + .collect(), + } + } +} + +#[async_trait] +impl testing_framework_core::observation::SourceProvider + for OpenRaftManualClusterSourceProvider +{ + async fn sources(&self) -> Result>, DynError> { + Ok(self + .node_names + .iter() + .filter_map(|name| { + self.cluster + .node_client(name) + .map(|client| ObservedSource::new(name, client)) + }) + .collect()) + } +} + fn named_sources(clients: Vec) -> Vec> { clients .into_iter() diff --git a/examples/openraft_kv/testing/workloads/src/lib.rs b/examples/openraft_kv/testing/workloads/src/lib.rs index f55a906..02bd5a8 100644 --- a/examples/openraft_kv/testing/workloads/src/lib.rs +++ b/examples/openraft_kv/testing/workloads/src/lib.rs @@ -9,6 +9,6 @@ pub use failover::OpenRaftKvFailoverWorkload; /// Shared cluster helpers used by the OpenRaft workload and manual k8s example. pub use support::{ FULL_VOTER_SET, OpenRaftClusterError, OpenRaftMembership, ensure_cluster_size, expected_kv, - resolve_client_for_node, wait_for_leader, wait_for_membership, wait_for_replication, - write_batch, + resolve_client_for_node, wait_for_leader, wait_for_membership, wait_for_observed_leader, + wait_for_observed_membership, wait_for_observed_replication, wait_for_replication, write_batch, }; diff --git a/examples/openraft_kv/testing/workloads/src/support.rs b/examples/openraft_kv/testing/workloads/src/support.rs index facade6..268a28f 100644 --- a/examples/openraft_kv/testing/workloads/src/support.rs +++ b/examples/openraft_kv/testing/workloads/src/support.rs @@ -33,6 +33,15 @@ pub struct OpenRaftMembership { } impl OpenRaftMembership { + /// Builds a membership view from already observed node states. + #[must_use] + pub fn from_states(states: &[OpenRaftKvState]) -> Self { + let mut states = states.to_vec(); + states.sort_by_key(|state| state.node_id); + + Self { states } + } + /// Reads and sorts the current node states by id. pub async fn discover(clients: &[OpenRaftKvClient]) -> Result { let mut states = Vec::with_capacity(clients.len()); @@ -41,9 +50,7 @@ impl OpenRaftMembership { states.push(client.state().await.map_err(OpenRaftClusterError::Client)?); } - states.sort_by_key(|state| state.node_id); - - Ok(Self { states }) + Ok(Self::from_states(&states)) } /// Returns the full voter set implied by the discovered nodes.