feat(examples): observe openraft manual cluster state

This commit is contained in:
andrussal 2026-04-12 11:50:13 +02:00
parent abb962b910
commit bc287c85e2
4 changed files with 133 additions and 54 deletions

View File

@ -1,15 +1,19 @@
use std::time::Duration; use std::{sync::Arc, time::Duration};
use anyhow::{Context as _, Result, anyhow}; use anyhow::{Context as _, Result, anyhow};
use openraft_kv_examples::{ use openraft_kv_examples::{
INITIAL_WRITE_BATCH, RAFT_KEY_PREFIX, SECOND_WRITE_BATCH, TOTAL_WRITES, INITIAL_WRITE_BATCH, RAFT_KEY_PREFIX, SECOND_WRITE_BATCH, TOTAL_WRITES,
}; };
use openraft_kv_node::OpenRaftKvClient; use openraft_kv_node::OpenRaftKvClient;
use openraft_kv_runtime_ext::{OpenRaftKvEnv, OpenRaftKvK8sDeployer, OpenRaftKvTopology}; use openraft_kv_runtime_ext::{
use openraft_kv_runtime_workloads::{ OpenRaftClusterObserver, OpenRaftKvEnv, OpenRaftKvK8sDeployer, OpenRaftKvTopology,
OpenRaftMembership, resolve_client_for_node, wait_for_leader, wait_for_membership, OpenRaftManualClusterSourceProvider,
wait_for_replication, write_batch,
}; };
use openraft_kv_runtime_workloads::{
OpenRaftMembership, expected_kv, wait_for_observed_leader, wait_for_observed_membership,
wait_for_observed_replication, write_batch,
};
use testing_framework_core::observation::{ObservationHandle, ObservationRuntime};
use testing_framework_runner_k8s::{ManualCluster, ManualClusterError}; use testing_framework_runner_k8s::{ManualCluster, ManualClusterError};
use tracing::{info, warn}; use tracing::{info, warn};
@ -42,51 +46,76 @@ async fn main() -> Result<()> {
} }
}; };
run_failover(cluster, Duration::from_secs(40)).await run_failover(Arc::new(cluster), Duration::from_secs(40)).await
} }
/// Runs the leader-failover scenario against a manual cluster.
///
/// Both columns of this diff follow the same outline: start the nodes,
/// initialise the first node via `init_self`, wait for a leader, add learners
/// and promote them, write the first batch, restart that leader, wait for a
/// leader again while passing the previous leader id (presumably so the old
/// leader is not accepted -- confirm against the wait helper), write the
/// second batch, wait until the `TOTAL_WRITES` expected entries are
/// replicated, and finally call `stop_all`. Every fallible step is propagated
/// with `?`.
async fn run_failover(cluster: ManualCluster<OpenRaftKvEnv>, timeout: Duration) -> Result<()> { async fn run_failover(cluster: Arc<ManualCluster<OpenRaftKvEnv>>, timeout: Duration) -> Result<()> {
let mut clients = start_cluster(&cluster).await?; start_cluster(cluster.as_ref()).await?;
clients[0].init_self().await?; let observation_runtime = start_observer(Arc::clone(&cluster)).await?;
let observer = observation_runtime.handle();
let initial_leader = wait_for_leader(&clients, timeout, None).await?; client_for_node(cluster.as_ref(), 0)?.init_self().await?;
let membership = OpenRaftMembership::discover(&clients).await?;
add_learners_and_promote(&clients, initial_leader, &membership, timeout).await?; let initial_leader = wait_for_observed_leader(&observer, timeout, None).await?;
write_initial_batch(&clients, initial_leader, timeout).await?; let membership = current_membership(&observer)?;
restart_leader(&cluster, initial_leader).await?; add_learners_and_promote(
refresh_clients(&cluster, &mut clients)?; cluster.as_ref(),
&observer,
initial_leader,
&membership,
timeout,
)
.await?;
write_initial_batch(cluster.as_ref(), initial_leader).await?;
let new_leader = wait_for_leader(&clients, timeout, Some(initial_leader)).await?; restart_leader(cluster.as_ref(), initial_leader).await?;
write_second_batch(&clients, new_leader, timeout).await?;
let expected = openraft_kv_runtime_workloads::expected_kv(RAFT_KEY_PREFIX, TOTAL_WRITES); let new_leader = wait_for_observed_leader(&observer, timeout, Some(initial_leader)).await?;
wait_for_replication(&clients, &expected, timeout).await?; write_second_batch(cluster.as_ref(), new_leader).await?;
let expected = expected_kv(RAFT_KEY_PREFIX, TOTAL_WRITES);
wait_for_observed_replication(&observer, &expected, timeout).await?;
cluster.stop_all(); cluster.stop_all();
Ok(()) Ok(())
} }
/// Starts `node-0`, `node-1` and `node-2` in that order, then waits for
/// `wait_network_ready` to succeed.
///
/// The removed version collected each started node's `client` into a `Vec`;
/// the added version returns `()`.
async fn start_cluster(cluster: &ManualCluster<OpenRaftKvEnv>) -> Result<Vec<OpenRaftKvClient>> { async fn start_cluster(cluster: &ManualCluster<OpenRaftKvEnv>) -> Result<()> {
let node0 = cluster.start_node("node-0").await?.client; cluster.start_node("node-0").await?;
let node1 = cluster.start_node("node-1").await?.client; cluster.start_node("node-1").await?;
let node2 = cluster.start_node("node-2").await?.client; cluster.start_node("node-2").await?;
cluster.wait_network_ready().await?; cluster.wait_network_ready().await?;
Ok(vec![node0, node1, node2]) Ok(())
}
/// Starts an observation runtime over the manual cluster.
///
/// Builds an `OpenRaftManualClusterSourceProvider` for 3 nodes and hands it to
/// `ObservationRuntime::start` together with `OpenRaftClusterObserver` and its
/// `config()`. A start failure is converted into an `anyhow::Error` carrying
/// the context "starting openraft k8s observer".
async fn start_observer(
cluster: Arc<ManualCluster<OpenRaftKvEnv>>,
) -> Result<ObservationRuntime<OpenRaftClusterObserver>> {
let provider = OpenRaftManualClusterSourceProvider::new(cluster, 3);
ObservationRuntime::start(
provider,
OpenRaftClusterObserver,
OpenRaftClusterObserver::config(),
)
.await
.map_err(anyhow::Error::new)
.context("starting openraft k8s observer")
} }
async fn add_learners_and_promote( async fn add_learners_and_promote(
clients: &[OpenRaftKvClient], cluster: &ManualCluster<OpenRaftKvEnv>,
observer: &ObservationHandle<OpenRaftClusterObserver>,
leader_id: u64, leader_id: u64,
membership: &OpenRaftMembership, membership: &OpenRaftMembership,
timeout: Duration, timeout: Duration,
) -> Result<()> { ) -> Result<()> {
let leader = resolve_client_for_node(clients, leader_id, timeout).await?; let leader = client_for_node(cluster, leader_id)?;
for learner in membership.learner_targets(leader_id) { for learner in membership.learner_targets(leader_id) {
info!( info!(
@ -103,28 +132,22 @@ async fn add_learners_and_promote(
let voter_ids = membership.voter_ids(); let voter_ids = membership.voter_ids();
leader.change_membership(voter_ids.iter().copied()).await?; leader.change_membership(voter_ids.iter().copied()).await?;
wait_for_membership(clients, &voter_ids, timeout).await?; wait_for_observed_membership(observer, &voter_ids, timeout).await?;
Ok(()) Ok(())
} }
/// Resolves the client for `leader_id` and calls `write_batch` on it with
/// `RAFT_KEY_PREFIX`, `0` and `INITIAL_WRITE_BATCH`.
///
/// The removed version resolved the client from a slice with a timeout; the
/// added version looks it up on the cluster through `client_for_node`.
async fn write_initial_batch( async fn write_initial_batch(cluster: &ManualCluster<OpenRaftKvEnv>, leader_id: u64) -> Result<()> {
clients: &[OpenRaftKvClient], let leader = client_for_node(cluster, leader_id)?;
leader_id: u64,
timeout: Duration,
) -> Result<()> {
let leader = resolve_client_for_node(clients, leader_id, timeout).await?;
write_batch(&leader, RAFT_KEY_PREFIX, 0, INITIAL_WRITE_BATCH).await?; write_batch(&leader, RAFT_KEY_PREFIX, 0, INITIAL_WRITE_BATCH).await?;
Ok(()) Ok(())
} }
async fn write_second_batch( async fn write_second_batch(cluster: &ManualCluster<OpenRaftKvEnv>, leader_id: u64) -> Result<()> {
clients: &[OpenRaftKvClient], let leader = client_for_node(cluster, leader_id)?;
leader_id: u64,
timeout: Duration,
) -> Result<()> {
let leader = resolve_client_for_node(clients, leader_id, timeout).await?;
write_batch( write_batch(
&leader, &leader,
RAFT_KEY_PREFIX, RAFT_KEY_PREFIX,
@ -146,17 +169,23 @@ async fn restart_leader(cluster: &ManualCluster<OpenRaftKvEnv>, leader_id: u64)
Ok(()) Ok(())
} }
fn refresh_clients( fn current_membership(
cluster: &ManualCluster<OpenRaftKvEnv>, observer: &ObservationHandle<OpenRaftClusterObserver>,
clients: &mut [OpenRaftKvClient], ) -> Result<OpenRaftMembership> {
) -> Result<()> { let snapshot = observer
for (index, slot) in clients.iter_mut().enumerate() { .latest_snapshot()
*slot = cluster .ok_or_else(|| anyhow!("openraft observer has not produced a snapshot yet"))?;
.node_client(&format!("node-{index}"))
.ok_or_else(|| anyhow!("node-{index} client missing after restart"))?;
}
Ok(()) Ok(OpenRaftMembership::from_states(snapshot.value.states()))
}
fn client_for_node(
cluster: &ManualCluster<OpenRaftKvEnv>,
node_id: u64,
) -> Result<OpenRaftKvClient> {
cluster
.node_client(&format!("node-{node_id}"))
.ok_or_else(|| anyhow!("node-{node_id} client missing"))
} }
fn k8s_cluster_unavailable(message: &str) -> bool { fn k8s_cluster_unavailable(message: &str) -> bool {

View File

@ -1,5 +1,6 @@
use std::{ use std::{
collections::{BTreeMap, BTreeSet}, collections::{BTreeMap, BTreeSet},
sync::Arc,
time::Duration, time::Duration,
}; };
@ -11,6 +12,7 @@ use testing_framework_core::{
}, },
scenario::{Application, DynError, NodeClients}, scenario::{Application, DynError, NodeClients},
}; };
use testing_framework_runner_k8s::ManualCluster;
use crate::OpenRaftKvEnv; use crate::OpenRaftKvEnv;
@ -190,6 +192,47 @@ pub fn openraft_cluster_source_provider(
)))) ))))
} }
/// Dynamic source provider backed by a manual cluster.
///
/// This keeps observation aligned with the latest client handles after manual
/// node restarts.
#[derive(Clone)]
pub struct OpenRaftManualClusterSourceProvider {
    /// Shared handle to the manual cluster that is asked for node clients.
    cluster: Arc<ManualCluster<OpenRaftKvEnv>>,
    /// Node names to look up, generated as `node-{index}` by `new`.
    node_names: Vec<String>,
}
impl OpenRaftManualClusterSourceProvider {
    /// Creates a provider for the given manual cluster.
    ///
    /// One node name of the form `node-{index}` is generated for every index
    /// in `0..node_count`, matching the fixed naming used by the OpenRaft
    /// examples.
    #[must_use]
    pub fn new(cluster: Arc<ManualCluster<OpenRaftKvEnv>>, node_count: usize) -> Self {
        let mut node_names = Vec::with_capacity(node_count);
        for index in 0..node_count {
            node_names.push(format!("node-{index}"));
        }

        Self {
            cluster,
            node_names,
        }
    }
}
#[async_trait]
impl testing_framework_core::observation::SourceProvider<OpenRaftKvClient>
    for OpenRaftManualClusterSourceProvider
{
    /// Returns one observed source per configured node name for which the
    /// cluster currently hands out a client.
    ///
    /// Clients are looked up again on every call, and names without a client
    /// are left out of the result instead of being reported as an error.
    async fn sources(&self) -> Result<Vec<ObservedSource<OpenRaftKvClient>>, DynError> {
        let mut available = Vec::new();

        for name in &self.node_names {
            if let Some(client) = self.cluster.node_client(name) {
                available.push(ObservedSource::new(name, client));
            }
        }

        Ok(available)
    }
}
fn named_sources(clients: Vec<OpenRaftKvClient>) -> Vec<ObservedSource<OpenRaftKvClient>> { fn named_sources(clients: Vec<OpenRaftKvClient>) -> Vec<ObservedSource<OpenRaftKvClient>> {
clients clients
.into_iter() .into_iter()

View File

@ -9,6 +9,6 @@ pub use failover::OpenRaftKvFailoverWorkload;
/// Shared cluster helpers used by the OpenRaft workload and manual k8s example. /// Shared cluster helpers used by the OpenRaft workload and manual k8s example.
pub use support::{ pub use support::{
FULL_VOTER_SET, OpenRaftClusterError, OpenRaftMembership, ensure_cluster_size, expected_kv, FULL_VOTER_SET, OpenRaftClusterError, OpenRaftMembership, ensure_cluster_size, expected_kv,
resolve_client_for_node, wait_for_leader, wait_for_membership, wait_for_replication, resolve_client_for_node, wait_for_leader, wait_for_membership, wait_for_observed_leader,
write_batch, wait_for_observed_membership, wait_for_observed_replication, wait_for_replication, write_batch,
}; };

View File

@ -33,6 +33,15 @@ pub struct OpenRaftMembership {
} }
impl OpenRaftMembership { impl OpenRaftMembership {
/// Creates a membership view from node states that were already observed.
///
/// The input slice is copied and the copy is ordered by `node_id`; the
/// caller's slice is left untouched.
#[must_use]
pub fn from_states(states: &[OpenRaftKvState]) -> Self {
    let mut ordered: Vec<_> = states.iter().cloned().collect();
    ordered.sort_by_key(|entry| entry.node_id);

    Self { states: ordered }
}
/// Reads and sorts the current node states by id. /// Reads and sorts the current node states by id.
pub async fn discover(clients: &[OpenRaftKvClient]) -> Result<Self, OpenRaftClusterError> { pub async fn discover(clients: &[OpenRaftKvClient]) -> Result<Self, OpenRaftClusterError> {
let mut states = Vec::with_capacity(clients.len()); let mut states = Vec::with_capacity(clients.len());
@ -41,9 +50,7 @@ impl OpenRaftMembership {
states.push(client.state().await.map_err(OpenRaftClusterError::Client)?); states.push(client.state().await.map_err(OpenRaftClusterError::Client)?);
} }
states.sort_by_key(|state| state.node_id); Ok(Self::from_states(&states))
Ok(Self { states })
} }
/// Returns the full voter set implied by the discovered nodes. /// Returns the full voter set implied by the discovered nodes.