diff --git a/.cargo-deny.toml b/.cargo-deny.toml index 687e5e2..13fb949 100644 --- a/.cargo-deny.toml +++ b/.cargo-deny.toml @@ -6,7 +6,7 @@ exclude-dev = true no-default-features = true [advisories] -ignore = [] +ignore = ["RUSTSEC-2026-0097"] yanked = "deny" [bans] diff --git a/docs/observation-runtime-plan.md b/docs/observation-runtime-plan.md new file mode 100644 index 0000000..1ebc166 --- /dev/null +++ b/docs/observation-runtime-plan.md @@ -0,0 +1,314 @@ +# Observation Runtime Plan + +## Why this work exists + +TF is good at deployment plumbing. It is weak at continuous observation. + +Today, the same problems are solved repeatedly with custom loops: +- TF block feed logic in Logos +- Cucumber manual-cluster polling loops +- ad hoc catch-up scans for wallet and chain state +- app-local state polling in expectations + +That is the gap this work should close. + +The goal is not a generic "distributed systems DSL". +The goal is one reusable observation runtime that: +- continuously collects data from dynamic sources +- keeps typed materialized state +- exposes both current snapshot and delta/history views +- fits naturally in TF scenarios and Cucumber manual-cluster code + +## Constraints + +### TF constraints +- TF abstractions must stay universal and simple. +- TF must not know app semantics like blocks, wallets, leaders, jobs, or topics. +- TF must remain useful for simple apps such as `openraft_kv`, not only Logos. + +### App constraints +- Apps must be able to build richer abstractions on top of TF. +- Logos must be able to support: + - current block-feed replacement + - fork-aware chain state + - public-peer sync targets + - multi-wallet UTXO tracking +- Apps must be able to adopt this incrementally. + +### Migration constraints +- We do not want a flag-day rewrite. +- Existing loops can coexist with the new runtime until replacements are proven. 
+ +## Non-goals + +This work should not: +- put feed back onto the base `Application` trait +- build app-specific semantics into TF core +- replace filesystem blockchain snapshots used for startup/restore +- force every app to use continuous observation +- introduce a large public abstraction stack that nobody can explain + +## Core idea + +Introduce one TF-level observation runtime. + +That runtime owns: +- source refresh +- scheduling +- polling/ingestion +- bounded history +- latest snapshot caching +- delta publication +- freshness/error tracking +- lifecycle hooks for TF and Cucumber + +Apps own: +- source types +- raw observation logic +- materialized state +- snapshot shape +- delta/event shape +- higher-level projections such as wallet state + +## Public TF surface + +The TF public surface should stay small. + +### `ObservedSource` +A named source instance. + +Used for: +- local node clients +- public peer endpoints +- any other app-owned source type + +### `SourceProvider` +Returns the current source set. + +This must support dynamic source lists because: +- manual cluster nodes come and go +- Cucumber worlds may attach public peers +- node control may restart or replace sources during a run + +### `Observer` +App-owned observation logic. + +It defines: +- `Source` +- `State` +- `Snapshot` +- `Event` + +And it implements: +- `init(...)` +- `poll(...)` +- `snapshot(...)` + +The important boundary is: +- TF owns the runtime +- app code owns materialization + +### `ObservationRuntime` +The engine that: +- starts the loop +- refreshes sources +- calls `poll(...)` +- stores history +- publishes deltas +- updates latest snapshot +- tracks last error and freshness + +### `ObservationHandle` +The read-side interface for workloads, expectations, and Cucumber steps. 
+
+It should expose at least:
+- latest snapshot
+- delta subscription
+- bounded history
+- last error
+
+## Intended shape
+
+```rust
+pub struct ObservedSource<S> {
+    pub name: String,
+    pub source: S,
+}
+
+#[async_trait]
+pub trait SourceProvider<S>: Send + Sync + 'static {
+    async fn sources(&self) -> Vec<ObservedSource<S>>;
+}
+
+#[async_trait]
+pub trait Observer: Send + Sync + 'static {
+    type Source: Clone + Send + Sync + 'static;
+    type State: Send + Sync + 'static;
+    type Snapshot: Clone + Send + Sync + 'static;
+    type Event: Clone + Send + Sync + 'static;
+
+    async fn init(
+        &self,
+        sources: &[ObservedSource<Self::Source>],
+    ) -> Result<Self::State, DynError>;
+
+    async fn poll(
+        &self,
+        sources: &[ObservedSource<Self::Source>],
+        state: &mut Self::State,
+    ) -> Result<Vec<Self::Event>, DynError>;
+
+    fn snapshot(&self, state: &Self::State) -> Self::Snapshot;
+}
+```
+
+This is enough.
+
+If more helper layers are needed, they should stay internal first.
+
+## How current use cases fit
+
+### `openraft_kv`
+Use one simple observer.
+
+- sources: node clients
+- state: latest per-node Raft state
+- snapshot: sorted node-state view
+- events: optional deltas, possibly empty at first
+
+This is the simplest proving case.
+It validates the runtime without dragging in Logos complexity.
+
+### Logos block feed replacement
+Use one shared chain observer.
+
+- sources: local node clients
+- state:
+  - node heads
+  - block graph
+  - heights
+  - seen headers
+  - recent history
+- snapshot:
+  - current head/lib/graph summary
+- events:
+  - newly discovered blocks
+
+This covers both existing Logos feed use cases:
+- current snapshot consumers
+- delta/subscription consumers
+
+### Cucumber manual-cluster sync
+Use the same observer runtime with a different source set.
+
+- sources:
+  - local manual-cluster node clients
+  - public peer endpoints
+- state:
+  - local consensus views
+  - public consensus views
+  - derived majority public target
+- snapshot:
+  - current local and public sync picture
+
+This removes custom poll/sleep loops from steps.
+ +### Multi-wallet fork-aware tracking +This should not be a TF concept. + +It should be a Logos projection built on top of the shared chain observer. + +- input: chain observer state +- output: per-header wallet state cache keyed by block header +- property: naturally fork-aware because it follows actual ancestry + +That replaces repeated backward scans from tip with continuous maintained state. + +## Logos layering + +Logos should not put every concern into one giant impl. + +Recommended layering: + +1. **Chain source adapter** + - local node reads + - public peer reads + +2. **Shared chain observer** + - catch-up + - continuous ingestion + - graph/history materialization + +3. **Logos projections** + - head view + - public sync target + - fork graph queries + - wallet state + - tx inclusion helpers + +TF provides the runtime. +Logos provides the domain model built on top. + +## Adoption plan + +### Phase 1: add TF observation runtime +- add `ObservedSource`, `SourceProvider`, `Observer`, `ObservationRuntime`, `ObservationHandle` +- keep the public API small +- no app migrations yet + +### Phase 2: prove it on `openraft_kv` +- add one simple observer over `/state` +- migrate one expectation to use the observation handle +- validate local, compose, and k8s + +### Phase 3: add Logos shared chain observer +- implement it alongside current feed/loops +- do not remove existing consumers yet +- prove snapshot and delta outputs are useful + +### Phase 4: migrate one Logos consumer at a time +Suggested order: +1. fork/head snapshot consumer +2. tx inclusion consumer +3. Cucumber sync-to-public-chain logic +4. wallet/UTXO tracking + +### Phase 5: delete old loops and feed paths +- only after the new runtime has replaced real consumers cleanly + +## Validation gates + +Each phase should have clear checks. 
+ +### Runtime-level +- crate-level `cargo check` +- targeted tests for runtime lifecycle and history retention +- explicit tests for dynamic source refresh + +### App-level +- `openraft_kv`: + - local failover + - compose failover + - k8s failover +- Logos: + - one snapshot consumer migrated + - one delta consumer migrated +- Cucumber: + - one manual-cluster sync path migrated + +## Open questions + +These should stay open until implementation forces a decision: +- whether `ObservationHandle` should expose full history directly or only cursor/subscription access +- how much error/freshness metadata belongs in the generic runtime vs app snapshot types +- whether multiple observers should share one scheduler/runtime instance or simply run independently first + +## Design guardrails + +When implementing this work: +- keep TF public abstractions minimal +- keep app semantics out of TF core +- do not chase a generic testing DSL +- build from reusable blocks, not one-off mega impls +- keep migration incremental +- prefer simple, explainable runtime behavior over clever abstraction diff --git a/testing-framework/core/Cargo.toml b/testing-framework/core/Cargo.toml index 478c6a9..008c5a5 100644 --- a/testing-framework/core/Cargo.toml +++ b/testing-framework/core/Cargo.toml @@ -29,5 +29,5 @@ reqwest = { features = ["json"], workspace = true } serde = { workspace = true } serde_yaml = { workspace = true } thiserror = { workspace = true } -tokio = { features = ["macros", "process", "rt-multi-thread", "time"], workspace = true } +tokio = { features = ["macros", "process", "rt-multi-thread", "sync", "time"], workspace = true } tracing = { workspace = true } diff --git a/testing-framework/core/src/lib.rs b/testing-framework/core/src/lib.rs index 5cbdb97..1085e89 100644 --- a/testing-framework/core/src/lib.rs +++ b/testing-framework/core/src/lib.rs @@ -1,5 +1,6 @@ pub mod cfgsync; pub mod env; +pub mod observation; pub mod runtime; pub mod scenario; pub mod topology; diff --git 
a/testing-framework/core/src/observation/factory.rs b/testing-framework/core/src/observation/factory.rs new file mode 100644 index 0000000..d50b45d --- /dev/null +++ b/testing-framework/core/src/observation/factory.rs @@ -0,0 +1,161 @@ +use std::{marker::PhantomData, sync::Arc}; + +use async_trait::async_trait; + +use super::{ + ObservationConfig, ObservationHandle, ObservationRuntime, ObservedSource, Observer, + SourceProvider, +}; +use crate::scenario::{ + Application, DynError, NodeClients, PreparedRuntimeExtension, RuntimeExtensionFactory, +}; + +/// Boxed source provider used by observation factories. +pub type BoxedSourceProvider = Box>; + +/// Builds an observation source provider once node clients are available. +pub trait SourceProviderFactory: Send + Sync + 'static { + /// Builds the source provider for one scenario run. + fn build_source_provider( + &self, + deployment: &E::Deployment, + node_clients: NodeClients, + ) -> Result, DynError>; +} + +impl SourceProviderFactory for F +where + E: Application, + S: Clone + Send + Sync + 'static, + F: Fn(&E::Deployment, NodeClients) -> Result, DynError> + + Send + + Sync + + 'static, +{ + fn build_source_provider( + &self, + deployment: &E::Deployment, + node_clients: NodeClients, + ) -> Result, DynError> { + self(deployment, node_clients) + } +} + +/// Fixed source provider for scenario runs with a stable source set. +#[derive(Clone, Debug)] +pub struct StaticSourceProvider { + sources: Vec>, +} + +impl StaticSourceProvider { + /// Builds a provider from a fixed source list. + #[must_use] + pub fn new(sources: Vec>) -> Self { + Self { sources } + } +} + +#[async_trait] +impl SourceProvider for StaticSourceProvider +where + S: Clone + Send + Sync + 'static, +{ + async fn sources(&self) -> Result>, DynError> { + Ok(self.sources.clone()) + } +} + +/// Runtime extension factory that starts one observer and stores its handle in +/// `RunContext`. 
+pub struct ObservationExtensionFactory { + observer_builder: Arc O + Send + Sync>, + source_provider_factory: Arc>, + config: ObservationConfig, + env_marker: PhantomData, +} + +impl ObservationExtensionFactory { + /// Builds an observation extension factory from builders. + #[must_use] + pub fn from_parts( + observer_builder: impl Fn() -> O + Send + Sync + 'static, + source_provider_factory: impl SourceProviderFactory, + config: ObservationConfig, + ) -> Self { + Self { + observer_builder: Arc::new(observer_builder), + source_provider_factory: Arc::new(source_provider_factory), + config, + env_marker: PhantomData, + } + } +} + +impl ObservationExtensionFactory +where + E: Application, + O: Observer + Clone, +{ + /// Builds an observation extension factory from one clonable observer. + #[must_use] + pub fn new( + observer: O, + source_provider_factory: impl SourceProviderFactory, + config: ObservationConfig, + ) -> Self { + Self::from_parts(move || observer.clone(), source_provider_factory, config) + } +} + +#[async_trait] +impl RuntimeExtensionFactory for ObservationExtensionFactory +where + E: Application, + O: Observer, +{ + async fn prepare( + &self, + deployment: &E::Deployment, + node_clients: NodeClients, + ) -> Result { + let source_provider = self + .source_provider_factory + .build_source_provider(deployment, node_clients)?; + + let observer = (self.observer_builder)(); + let runtime = + ObservationRuntime::start(source_provider, observer, self.config.clone()).await?; + + let (handle, task) = runtime.into_parts(); + + Ok(PreparedRuntimeExtension::from_task(handle, task)) + } +} + +#[async_trait] +impl SourceProvider for Box

+where + S: Clone + Send + Sync + 'static, + P: SourceProvider + ?Sized, +{ + async fn sources(&self) -> Result>, DynError> { + (**self).sources().await + } +} + +#[async_trait] +impl SourceProvider for Arc

+where + S: Clone + Send + Sync + 'static, + P: SourceProvider + ?Sized, +{ + async fn sources(&self) -> Result>, DynError> { + (**self).sources().await + } +} + +impl From> for PreparedRuntimeExtension { + fn from(handle: ObservationHandle) -> Self { + PreparedRuntimeExtension::new(handle) + } +} diff --git a/testing-framework/core/src/observation/mod.rs b/testing-framework/core/src/observation/mod.rs new file mode 100644 index 0000000..de21228 --- /dev/null +++ b/testing-framework/core/src/observation/mod.rs @@ -0,0 +1,503 @@ +//! Generic continuous observation runtime. +//! +//! This module provides the reusable runtime needed by both TF scenarios and +//! manual-cluster consumers such as Cucumber worlds. It does not know any app +//! semantics. Apps provide source types, observation logic, materialized state, +//! snapshots, and delta events. + +mod factory; + +use std::{ + any::type_name, + collections::VecDeque, + sync::Arc, + time::{Duration, SystemTime}, +}; + +use async_trait::async_trait; +pub use factory::{ + BoxedSourceProvider, ObservationExtensionFactory, SourceProviderFactory, StaticSourceProvider, +}; +use parking_lot::Mutex; +use tokio::{ + sync::broadcast, + task::JoinHandle, + time::{MissedTickBehavior, interval}, +}; +use tracing::{debug, info, warn}; + +use crate::scenario::DynError; + +/// Configuration for a background observation runtime. +#[derive(Clone, Debug)] +pub struct ObservationConfig { + /// Time between observation cycles. + pub interval: Duration, + /// Maximum number of non-empty event batches retained in memory. + pub history_limit: usize, +} + +impl Default for ObservationConfig { + fn default() -> Self { + Self { + interval: Duration::from_secs(1), + history_limit: 64, + } + } +} + +/// One named observation source. +#[derive(Clone, Debug)] +pub struct ObservedSource { + /// Human-readable source name used in logs and app-level reporting. + pub name: String, + /// App-owned source handle. 
+ pub source: S, +} + +impl ObservedSource { + /// Builds one named observation source. + #[must_use] + pub fn new(name: &str, source: S) -> Self { + Self { + name: name.to_owned(), + source, + } + } +} + +/// Supplies the current observation source set. +#[async_trait] +pub trait SourceProvider: Send + Sync + 'static { + /// Returns the current source set for the next observation cycle. + async fn sources(&self) -> Result>, DynError>; +} + +/// App-owned observation logic. +#[async_trait] +pub trait Observer: Send + Sync + 'static { + /// App-owned source type. + type Source: Clone + Send + Sync + 'static; + /// App-owned retained materialized state. + type State: Send + Sync + 'static; + /// App-owned current snapshot view. + type Snapshot: Clone + Send + Sync + 'static; + /// App-owned delta event type emitted per cycle. + type Event: Clone + Send + Sync + 'static; + + /// Builds the initial retained state from the current source set. + async fn init(&self, sources: &[ObservedSource]) + -> Result; + + /// Advances retained state by one cycle and returns any new delta events. + async fn poll( + &self, + sources: &[ObservedSource], + state: &mut Self::State, + ) -> Result, DynError>; + + /// Builds the current snapshot view from retained state. + fn snapshot(&self, state: &Self::State) -> Self::Snapshot; +} + +/// One materialized snapshot emitted by the runtime. +#[derive(Clone, Debug)] +pub struct ObservationSnapshot { + /// Monotonic cycle number. + pub cycle: u64, + /// Capture timestamp. + pub observed_at: SystemTime, + /// Number of sources used for this snapshot. + pub source_count: usize, + /// App-owned snapshot payload. + pub value: S, +} + +/// One delta batch emitted by a successful observation cycle. +#[derive(Clone, Debug)] +pub struct ObservationBatch { + /// Monotonic cycle number. + pub cycle: u64, + /// Capture timestamp. + pub observed_at: SystemTime, + /// Number of sources used for this batch. 
+ pub source_count: usize, + /// App-owned delta events discovered in this cycle. + pub events: Vec, +} + +/// Observation runtime failure stage. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ObservationFailureStage { + /// Source refresh failed before a poll could run. + SourceRefresh, + /// Observer poll failed after sources were refreshed. + Poll, +} + +/// Last failed observation cycle. +#[derive(Clone, Debug)] +pub struct ObservationFailure { + /// Monotonic cycle number. + pub cycle: u64, + /// Failure timestamp. + pub observed_at: SystemTime, + /// Number of sources involved in the failed cycle. + pub source_count: usize, + /// Runtime stage that failed. + pub stage: ObservationFailureStage, + /// Human-readable failure message. + pub message: String, +} + +/// Errors returned while starting an observation runtime. +#[derive(Debug, thiserror::Error)] +pub enum ObservationRuntimeError { + /// The configured interval is invalid. + #[error("observation interval must be greater than zero")] + InvalidInterval, + /// Source discovery failed during runtime startup. + #[error("failed to refresh observation sources during startup: {source}")] + SourceRefresh { + #[source] + source: DynError, + }, + /// Observer state initialization failed during runtime startup. + #[error("failed to initialize observation state: {source}")] + ObserverInit { + #[source] + source: DynError, + }, +} + +/// Read-side handle for one running observer. +pub struct ObservationHandle { + shared: Arc>>, + batches: broadcast::Sender>>, +} + +impl Clone for ObservationHandle { + fn clone(&self) -> Self { + Self { + shared: Arc::clone(&self.shared), + batches: self.batches.clone(), + } + } +} + +impl ObservationHandle { + /// Returns the latest successful snapshot, if one has been produced. + #[must_use] + pub fn latest_snapshot(&self) -> Option> { + self.shared.lock().latest_snapshot.clone() + } + + /// Returns retained non-empty event batches. 
+ #[must_use] + pub fn history(&self) -> Vec>> { + self.shared.lock().history.iter().cloned().collect() + } + + /// Returns the most recent cycle failure, if any. + #[must_use] + pub fn last_error(&self) -> Option { + self.shared.lock().last_error.clone() + } + + /// Subscribes to future non-empty event batches. + #[must_use] + pub fn subscribe(&self) -> broadcast::Receiver>> { + self.batches.subscribe() + } +} + +/// Lifecycle owner for one background observation runtime. +pub struct ObservationRuntime { + handle: ObservationHandle, + task: Option>, +} + +impl ObservationRuntime { + /// Starts one background observation runtime. + pub async fn start

( + provider: P, + observer: O, + config: ObservationConfig, + ) -> Result + where + P: SourceProvider, + { + ensure_positive_interval(config.interval)?; + + let sources = provider + .sources() + .await + .map_err(|source| ObservationRuntimeError::SourceRefresh { source })?; + + let source_count = sources.len(); + let state = observer + .init(&sources) + .await + .map_err(|source| ObservationRuntimeError::ObserverInit { source })?; + + let snapshot = build_snapshot(0, source_count, &observer, &state); + let batches = broadcast::channel(config.history_limit.max(1)).0; + let shared = Arc::new(Mutex::new(SharedObservationState::new(snapshot))); + let handle = ObservationHandle { + shared: Arc::clone(&shared), + batches, + }; + + info!( + observer = type_name::(), + interval_ms = config.interval.as_millis(), + history_limit = config.history_limit, + source_count, + "starting observation runtime" + ); + + let runtime_handle = handle.clone(); + let task = tokio::spawn(run_observation_loop( + provider, + observer, + config, + shared, + runtime_handle.batches.clone(), + state, + )); + + Ok(Self { + handle: runtime_handle, + task: Some(task), + }) + } + + /// Returns a read-side handle for the running observer. + #[must_use] + pub fn handle(&self) -> ObservationHandle { + self.handle.clone() + } + + /// Splits the runtime into its handle and background task. + #[must_use] + pub fn into_parts(mut self) -> (ObservationHandle, JoinHandle<()>) { + let task = self + .task + .take() + .expect("observation runtime task is always present before into_parts"); + + (self.handle.clone(), task) + } + + /// Aborts the background task. 
+ pub fn abort(&mut self) { + if let Some(task) = self.task.take() { + task.abort(); + } + } +} + +impl Drop for ObservationRuntime { + fn drop(&mut self) { + self.abort(); + } +} + +struct SharedObservationState { + latest_snapshot: Option>, + history: VecDeque>>, + last_error: Option, +} + +impl SharedObservationState { + fn new(snapshot: ObservationSnapshot) -> Self { + Self { + latest_snapshot: Some(snapshot), + history: VecDeque::new(), + last_error: None, + } + } +} + +async fn run_observation_loop( + provider: P, + observer: O, + config: ObservationConfig, + shared: Arc>>, + batches: broadcast::Sender>>, + mut state: O::State, +) where + O: Observer, + P: SourceProvider, +{ + let mut ticker = build_interval(config.interval); + let mut cycle = 1u64; + + ticker.tick().await; + + loop { + ticker.tick().await; + + let cycle_outcome = observe_cycle(&provider, &observer, cycle, &mut state).await; + + match cycle_outcome { + Ok(success) => record_cycle_success(&shared, &batches, &config, success), + Err(failure) => record_cycle_failure(&shared, failure), + } + + cycle += 1; + } +} + +struct CycleSuccess { + snapshot: ObservationSnapshot, + batch: Option>>, +} + +async fn observe_cycle( + provider: &P, + observer: &O, + cycle: u64, + state: &mut O::State, +) -> Result, ObservationFailure> +where + O: Observer, + P: SourceProvider, +{ + let sources = provider.sources().await.map_err(|source| { + build_failure(cycle, 0, ObservationFailureStage::SourceRefresh, source) + })?; + + let source_count = sources.len(); + let events = observer.poll(&sources, state).await.map_err(|source| { + build_failure(cycle, source_count, ObservationFailureStage::Poll, source) + })?; + + let snapshot = build_snapshot(cycle, source_count, observer, state); + let batch = build_batch(cycle, source_count, events); + + Ok(CycleSuccess { snapshot, batch }) +} + +fn record_cycle_success( + shared: &Arc>>, + batches: &broadcast::Sender>>, + config: &ObservationConfig, + success: CycleSuccess, +) { 
+ debug!( + observer = type_name::(), + cycle = success.snapshot.cycle, + source_count = success.snapshot.source_count, + event_count = success.batch.as_ref().map_or(0, |batch| batch.events.len()), + "observation cycle completed" + ); + + let mut state = shared.lock(); + state.latest_snapshot = Some(success.snapshot); + state.last_error = None; + + let Some(batch) = success.batch else { + return; + }; + + push_history(&mut state.history, Arc::clone(&batch), config.history_limit); + drop(state); + + let _ = batches.send(batch); +} + +fn record_cycle_failure( + shared: &Arc>>, + failure: ObservationFailure, +) { + warn!( + observer = type_name::(), + cycle = failure.cycle, + source_count = failure.source_count, + stage = ?failure.stage, + message = %failure.message, + "observation cycle failed" + ); + + shared.lock().last_error = Some(failure); +} + +fn ensure_positive_interval(interval: Duration) -> Result<(), ObservationRuntimeError> { + if interval.is_zero() { + return Err(ObservationRuntimeError::InvalidInterval); + } + + Ok(()) +} + +fn build_interval(period: Duration) -> tokio::time::Interval { + let mut ticker = interval(period); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + ticker +} + +fn build_snapshot( + cycle: u64, + source_count: usize, + observer: &O, + state: &O::State, +) -> ObservationSnapshot { + ObservationSnapshot { + cycle, + observed_at: SystemTime::now(), + source_count, + value: observer.snapshot(state), + } +} + +fn build_batch( + cycle: u64, + source_count: usize, + events: Vec, +) -> Option>> { + if events.is_empty() { + return None; + } + + Some(Arc::new(ObservationBatch { + cycle, + observed_at: SystemTime::now(), + source_count, + events, + })) +} + +fn build_failure( + cycle: u64, + source_count: usize, + stage: ObservationFailureStage, + source: DynError, +) -> ObservationFailure { + ObservationFailure { + cycle, + observed_at: SystemTime::now(), + source_count, + stage, + message: source.to_string(), + } +} + +fn 
push_history( + history: &mut VecDeque>>, + batch: Arc>, + history_limit: usize, +) { + if history_limit == 0 { + return; + } + + history.push_back(batch); + + while history.len() > history_limit { + history.pop_front(); + } +} + +#[cfg(test)] +mod tests; diff --git a/testing-framework/core/src/observation/tests.rs b/testing-framework/core/src/observation/tests.rs new file mode 100644 index 0000000..f394631 --- /dev/null +++ b/testing-framework/core/src/observation/tests.rs @@ -0,0 +1,250 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use async_trait::async_trait; +use parking_lot::Mutex; +use tokio::time::{Instant, sleep}; + +use super::{ + ObservationConfig, ObservationFailureStage, ObservationRuntime, ObservedSource, Observer, + SourceProvider, +}; +use crate::scenario::DynError; + +#[derive(Clone)] +struct TestSourceProvider { + sources: Arc>>>, + fail_refreshes: Arc, +} + +impl TestSourceProvider { + fn new(sources: Vec>) -> Self { + Self { + sources: Arc::new(Mutex::new(sources)), + fail_refreshes: Arc::new(AtomicUsize::new(0)), + } + } + + fn replace_sources(&self, sources: Vec>) { + *self.sources.lock() = sources; + } + + fn fail_next_refresh(&self) { + self.fail_refreshes.store(1, Ordering::SeqCst); + } +} + +#[async_trait] +impl SourceProvider for TestSourceProvider { + async fn sources(&self) -> Result>, DynError> { + if self.fail_refreshes.swap(0, Ordering::SeqCst) == 1 { + return Err("refresh failed".into()); + } + + Ok(self.sources.lock().clone()) + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +struct TestSnapshot { + total_sources_seen: u64, + last_source_count: usize, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +struct TestEvent { + total_sources_seen: u64, +} + +#[derive(Default)] +struct TestState { + total_sources_seen: u64, + last_source_count: usize, +} + +struct CountingObserver; + +#[async_trait] +impl Observer for CountingObserver { + type Source = u64; + type State = TestState; + type 
Snapshot = TestSnapshot; + type Event = TestEvent; + + async fn init( + &self, + sources: &[ObservedSource], + ) -> Result { + Ok(TestState { + total_sources_seen: sources.iter().map(|source| source.source).sum(), + last_source_count: sources.len(), + }) + } + + async fn poll( + &self, + sources: &[ObservedSource], + state: &mut Self::State, + ) -> Result, DynError> { + state.total_sources_seen += sources.iter().map(|source| source.source).sum::(); + state.last_source_count = sources.len(); + + Ok(vec![TestEvent { + total_sources_seen: state.total_sources_seen, + }]) + } + + fn snapshot(&self, state: &Self::State) -> Self::Snapshot { + TestSnapshot { + total_sources_seen: state.total_sources_seen, + last_source_count: state.last_source_count, + } + } +} + +#[tokio::test] +async fn runtime_updates_snapshot_and_history() { + let provider = TestSourceProvider::new(vec![ObservedSource::new("node-0", 2)]); + let runtime = ObservationRuntime::start( + provider, + CountingObserver, + ObservationConfig { + interval: Duration::from_millis(25), + history_limit: 2, + }, + ) + .await + .expect("runtime should start"); + + let handle = runtime.handle(); + wait_for_cycle(&handle, 2).await; + + let snapshot = handle.latest_snapshot().expect("snapshot should exist"); + assert!(snapshot.cycle >= 2); + assert_eq!(snapshot.source_count, 1); + assert_eq!(snapshot.value.last_source_count, 1); + assert!(snapshot.value.total_sources_seen >= 6); + + let history = handle.history(); + assert_eq!(history.len(), 2); + assert!(history.iter().all(|batch| !batch.events.is_empty())); +} + +#[tokio::test] +async fn runtime_refreshes_sources_each_cycle() { + let provider = TestSourceProvider::new(vec![ObservedSource::new("node-0", 1)]); + let runtime = ObservationRuntime::start( + provider.clone(), + CountingObserver, + ObservationConfig { + interval: Duration::from_millis(25), + history_limit: 4, + }, + ) + .await + .expect("runtime should start"); + + let handle = runtime.handle(); + 
wait_for_cycle(&handle, 1).await; + + provider.replace_sources(vec![ + ObservedSource::new("node-0", 1), + ObservedSource::new("node-1", 3), + ]); + + wait_for_snapshot_source_count(&handle, 2).await; + + let snapshot = handle.latest_snapshot().expect("snapshot should exist"); + assert_eq!(snapshot.source_count, 2); + assert_eq!(snapshot.value.last_source_count, 2); +} + +#[tokio::test] +async fn runtime_records_cycle_failures() { + let provider = TestSourceProvider::new(vec![ObservedSource::new("node-0", 1)]); + let runtime = ObservationRuntime::start( + provider.clone(), + CountingObserver, + ObservationConfig { + interval: Duration::from_millis(25), + history_limit: 2, + }, + ) + .await + .expect("runtime should start"); + + let handle = runtime.handle(); + provider.fail_next_refresh(); + + wait_for_failure(&handle).await; + + let failure = handle.last_error().expect("failure should exist"); + assert_eq!(failure.stage, ObservationFailureStage::SourceRefresh); + assert_eq!(failure.message, "refresh failed"); +} + +async fn wait_for_cycle(handle: &super::ObservationHandle, cycle: u64) { + let deadline = Instant::now() + Duration::from_secs(2); + + loop { + let Some(snapshot) = handle.latest_snapshot() else { + sleep(Duration::from_millis(10)).await; + continue; + }; + + if snapshot.cycle >= cycle { + return; + } + + assert!( + Instant::now() < deadline, + "timed out waiting for cycle {cycle}" + ); + + sleep(Duration::from_millis(10)).await; + } +} + +async fn wait_for_snapshot_source_count( + handle: &super::ObservationHandle, + source_count: usize, +) { + let deadline = Instant::now() + Duration::from_secs(2); + + loop { + let Some(snapshot) = handle.latest_snapshot() else { + sleep(Duration::from_millis(10)).await; + continue; + }; + + if snapshot.source_count == source_count { + return; + } + + assert!( + Instant::now() < deadline, + "timed out waiting for source_count {source_count}" + ); + + sleep(Duration::from_millis(10)).await; + } +} + +async fn 
wait_for_failure(handle: &super::ObservationHandle) { + let deadline = Instant::now() + Duration::from_secs(2); + + loop { + if handle.last_error().is_some() { + return; + } + + assert!(Instant::now() < deadline, "timed out waiting for failure"); + + sleep(Duration::from_millis(10)).await; + } +} diff --git a/testing-framework/core/src/scenario/common_builder_ext.rs b/testing-framework/core/src/scenario/common_builder_ext.rs index 947b3bf..ea9056a 100644 --- a/testing-framework/core/src/scenario/common_builder_ext.rs +++ b/testing-framework/core/src/scenario/common_builder_ext.rs @@ -4,7 +4,12 @@ use super::{ Application, CleanupPolicy, DeploymentPolicy, Expectation, HttpReadinessRequirement, RetryPolicy, RuntimeExtensionFactory, Workload, internal::CoreBuilderAccess, }; -use crate::topology::{DeploymentProvider, DeploymentSeed}; +use crate::{ + observation::{ + ObservationConfig, ObservationExtensionFactory, Observer, SourceProviderFactory, + }, + topology::{DeploymentProvider, DeploymentSeed}, +}; type DeploymentProviderHandle = Box::Deployment>>; @@ -60,6 +65,48 @@ pub trait CoreBuilderExt: CoreBuilderAccess + Sized { self.map_core_builder(|builder| builder.with_runtime_extension_factory(extension)) } + /// Registers one clonable observer as a runtime extension. + #[must_use] + fn with_observer( + self, + observer: O, + source_provider_factory: impl SourceProviderFactory, + config: ObservationConfig, + ) -> Self + where + O: Observer + Clone, + Self::Env: Application, + { + let extension = ObservationExtensionFactory::::new( + observer, + source_provider_factory, + config, + ); + + self.with_runtime_extension_factory(Box::new(extension)) + } + + /// Registers one observer built lazily per run as a runtime extension. 
+ #[must_use] + fn with_observer_factory( + self, + observer_builder: impl Fn() -> O + Send + Sync + 'static, + source_provider_factory: impl SourceProviderFactory, + config: ObservationConfig, + ) -> Self + where + O: Observer, + Self::Env: Application, + { + let extension = ObservationExtensionFactory::::from_parts( + observer_builder, + source_provider_factory, + config, + ); + + self.with_runtime_extension_factory(Box::new(extension)) + } + #[must_use] fn with_run_duration(self, duration: Duration) -> Self { self.map_core_builder(|builder| builder.with_run_duration(duration)) diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index 216cd97..8d51b9a 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -16,8 +16,8 @@ use crate::scenario::{ type WorkloadOutcome = Result<(), DynError>; const MIN_NODE_CONTROL_COOLDOWN: Duration = Duration::from_secs(30); -const DEFAULT_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(1); -const MIN_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(2); +const DEFAULT_POST_WORKLOAD_SETTLE_WAIT: Duration = Duration::from_secs(1); +const MIN_POST_WORKLOAD_SETTLE_WAIT: Duration = Duration::from_secs(2); const EXPECTATION_CAPTURE_CHECK_INTERVAL: Duration = Duration::from_secs(1); const UNKNOWN_PANIC: &str = ""; @@ -183,7 +183,8 @@ impl Runner { } async fn settle_before_expectations(context: &RunContext) { - // Give the feed a short catch-up window before evaluating expectations. + // Give runtime extensions a short catch-up window before evaluating + // expectations. 
let Some(wait) = Self::settle_wait_duration(context) else { return; }; @@ -200,12 +201,12 @@ impl Runner { } let wait = if configured_wait.is_zero() { - DEFAULT_BLOCK_FEED_SETTLE_WAIT + DEFAULT_POST_WORKLOAD_SETTLE_WAIT } else { configured_wait }; - Some(wait.max(MIN_BLOCK_FEED_SETTLE_WAIT)) + Some(wait.max(MIN_POST_WORKLOAD_SETTLE_WAIT)) } /// Evaluates every registered expectation, aggregating failures so callers @@ -231,8 +232,8 @@ impl Runner { } fn cooldown_duration(context: &RunContext) -> Option { - // Managed environments need a minimum cooldown so feed and expectations - // observe stabilized state. + // Managed environments need a minimum cooldown so runtime extensions and + // expectations observe stabilized state. let needs_stabilization = context.cluster_control_profile().framework_owns_lifecycle(); let mut wait = context.expectation_cooldown(); diff --git a/testing-framework/deployers/compose/src/errors.rs b/testing-framework/deployers/compose/src/errors.rs index 1844bf1..365754c 100644 --- a/testing-framework/deployers/compose/src/errors.rs +++ b/testing-framework/deployers/compose/src/errors.rs @@ -31,8 +31,8 @@ pub enum ComposeRunnerError { NodeClients(#[from] NodeClientError), #[error(transparent)] Telemetry(#[from] MetricsError), - #[error("feed requires at least one node client")] - BlockFeedMissing, + #[error("observation runtime requires at least one node client")] + ObservationMissing, #[error("runtime preflight failed: no node clients available")] RuntimePreflight, #[error("runtime extension setup failed: {source}")] @@ -45,8 +45,8 @@ pub enum ComposeRunnerError { #[source] source: DynError, }, - #[error("failed to start feed: {source}")] - BlockFeed { + #[error("failed to start observation runtime: {source}")] + ObservationRuntime { #[source] source: DynError, },