feat(core): add observation runtime foundation

andrussal 2026-04-12 11:22:24 +02:00
parent e4dbb8bb85
commit d3bb6348e5
10 changed files with 1291 additions and 14 deletions

View File

@@ -6,7 +6,7 @@ exclude-dev = true
no-default-features = true
[advisories]
ignore = []
ignore = ["RUSTSEC-2026-0097"]
yanked = "deny"
[bans]

View File

@@ -0,0 +1,314 @@
# Observation Runtime Plan
## Why this work exists
TF is good at deployment plumbing. It is weak at continuous observation.
Today, the same problems are solved repeatedly with custom loops:
- TF block feed logic in Logos
- Cucumber manual-cluster polling loops
- ad hoc catch-up scans for wallet and chain state
- app-local state polling in expectations
That is the gap this work should close.
The goal is not a generic "distributed systems DSL".
The goal is one reusable observation runtime that:
- continuously collects data from dynamic sources
- keeps typed materialized state
- exposes both current snapshot and delta/history views
- fits naturally in TF scenarios and Cucumber manual-cluster code
## Constraints
### TF constraints
- TF abstractions must stay universal and simple.
- TF must not know app semantics like blocks, wallets, leaders, jobs, or topics.
- TF must remain useful for simple apps such as `openraft_kv`, not only Logos.
### App constraints
- Apps must be able to build richer abstractions on top of TF.
- Logos must be able to support:
- current block-feed replacement
- fork-aware chain state
- public-peer sync targets
- multi-wallet UTXO tracking
- Apps must be able to adopt this incrementally.
### Migration constraints
- We do not want a flag-day rewrite.
- Existing loops can coexist with the new runtime until replacements are proven.
## Non-goals
This work should not:
- put feed back onto the base `Application` trait
- build app-specific semantics into TF core
- replace filesystem blockchain snapshots used for startup/restore
- force every app to use continuous observation
- introduce a large public abstraction stack that nobody can explain
## Core idea
Introduce one TF-level observation runtime.
That runtime owns:
- source refresh
- scheduling
- polling/ingestion
- bounded history
- latest snapshot caching
- delta publication
- freshness/error tracking
- lifecycle hooks for TF and Cucumber
Apps own:
- source types
- raw observation logic
- materialized state
- snapshot shape
- delta/event shape
- higher-level projections such as wallet state
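To make the split concrete, here is a minimal std-only sketch of the bounded-history behavior the runtime owns (names and types are illustrative, not the final API): non-empty event batches go into a ring that evicts the oldest batch once the limit is reached, and empty batches are never retained.

```rust
use std::collections::VecDeque;

/// Illustrative event batch for one observation cycle.
struct Batch<E> {
    cycle: u64,
    events: Vec<E>,
}

/// Retain at most `limit` non-empty batches, evicting the oldest first.
/// A zero limit disables history entirely.
fn push_bounded<E>(history: &mut VecDeque<Batch<E>>, batch: Batch<E>, limit: usize) {
    if limit == 0 || batch.events.is_empty() {
        return;
    }
    history.push_back(batch);
    while history.len() > limit {
        history.pop_front();
    }
}

fn main() {
    let mut history: VecDeque<Batch<u64>> = VecDeque::new();
    for cycle in 0..5 {
        push_bounded(&mut history, Batch { cycle, events: vec![cycle] }, 3);
    }
    // Only the three most recent non-empty batches survive.
    let cycles: Vec<u64> = history.iter().map(|batch| batch.cycle).collect();
    assert_eq!(cycles, vec![2, 3, 4]);
}
```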
## Public TF surface
The TF public surface should stay small.
### `ObservedSource<S>`
A named source instance.
Used for:
- local node clients
- public peer endpoints
- any other app-owned source type
### `SourceProvider<S>`
Returns the current source set.
This must support dynamic source lists because:
- manual cluster nodes come and go
- Cucumber worlds may attach public peers
- node control may restart or replace sources during a run
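A synchronous std-only sketch of one way to satisfy that (the real trait is async, and the names here are illustrative): a provider that re-reads a shared, mutable list on every cycle, so attaching or replacing sources mid-run needs no runtime restart.

```rust
use std::sync::{Arc, Mutex};

#[derive(Clone, Debug)]
struct ObservedSource<S> {
    name: String,
    source: S,
}

/// Synchronous analogue of `SourceProvider`: returns the current source set.
trait SourceProvider<S> {
    fn sources(&self) -> Vec<ObservedSource<S>>;
}

/// A provider over a shared list that node control or Cucumber steps mutate.
#[derive(Clone)]
struct SharedSourceProvider<S> {
    sources: Arc<Mutex<Vec<ObservedSource<S>>>>,
}

impl<S: Clone> SourceProvider<S> for SharedSourceProvider<S> {
    fn sources(&self) -> Vec<ObservedSource<S>> {
        // Each cycle observes whatever the set currently is.
        self.sources.lock().unwrap().clone()
    }
}

fn main() {
    let provider = SharedSourceProvider {
        sources: Arc::new(Mutex::new(vec![ObservedSource {
            name: "node-0".to_string(),
            source: 8080u16,
        }])),
    };
    assert_eq!(provider.sources().len(), 1);

    // Mid-run, a step attaches a public peer endpoint.
    provider.sources.lock().unwrap().push(ObservedSource {
        name: "public-0".to_string(),
        source: 9090,
    });
    assert_eq!(provider.sources().len(), 2);
}
```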
### `Observer`
App-owned observation logic.
It defines:
- `Source`
- `State`
- `Snapshot`
- `Event`
And it implements:
- `init(...)`
- `poll(...)`
- `snapshot(...)`
The important boundary is:
- TF owns the runtime
- app code owns materialization
### `ObservationRuntime`
The engine that:
- starts the loop
- refreshes sources
- calls `poll(...)`
- stores history
- publishes deltas
- updates latest snapshot
- tracks last error and freshness
### `ObservationHandle`
The read-side interface for workloads, expectations, and Cucumber steps.
It should expose at least:
- latest snapshot
- delta subscription
- bounded history
- last error
## Intended shape
```rust
pub struct ObservedSource<S> {
    pub name: String,
    pub source: S,
}

#[async_trait]
pub trait SourceProvider<S>: Send + Sync + 'static {
    async fn sources(&self) -> Result<Vec<ObservedSource<S>>, DynError>;
}

#[async_trait]
pub trait Observer: Send + Sync + 'static {
    type Source: Clone + Send + Sync + 'static;
    type State: Send + Sync + 'static;
    type Snapshot: Clone + Send + Sync + 'static;
    type Event: Clone + Send + Sync + 'static;

    async fn init(
        &self,
        sources: &[ObservedSource<Self::Source>],
    ) -> Result<Self::State, DynError>;

    async fn poll(
        &self,
        sources: &[ObservedSource<Self::Source>],
        state: &mut Self::State,
    ) -> Result<Vec<Self::Event>, DynError>;

    fn snapshot(&self, state: &Self::State) -> Self::Snapshot;
}
```
This is enough.
If more helper layers are needed, they should stay internal at first.
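To make the lifecycle concrete, a synchronous std-only analogue of how the runtime would drive the three methods (the real traits are async; the `Counter` observer and `u16` sources are illustrative only):

```rust
/// Synchronous analogue of the `Observer` lifecycle (the real trait is async).
trait Observer {
    type State;
    type Snapshot;
    type Event;
    fn init(&self, sources: &[u16]) -> Self::State;
    fn poll(&self, sources: &[u16], state: &mut Self::State) -> Vec<Self::Event>;
    fn snapshot(&self, state: &Self::State) -> Self::Snapshot;
}

/// Toy observer: counts how many sources each cycle saw.
struct Counter;

impl Observer for Counter {
    type State = u64;
    type Snapshot = u64;
    type Event = u64;

    fn init(&self, _sources: &[u16]) -> u64 {
        0
    }

    fn poll(&self, sources: &[u16], state: &mut u64) -> Vec<u64> {
        *state += sources.len() as u64;
        vec![*state]
    }

    fn snapshot(&self, state: &u64) -> u64 {
        *state
    }
}

fn main() {
    let observer = Counter;
    let sources = [8080u16, 8081];

    // Runtime side: init once, poll per cycle, snapshot on demand.
    let mut state = observer.init(&sources);
    for _ in 0..3 {
        let _events = observer.poll(&sources, &mut state);
    }
    assert_eq!(observer.snapshot(&state), 6);
}
```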
## How current use cases fit
### `openraft_kv`
Use one simple observer.
- sources: node clients
- state: latest per-node Raft state
- snapshot: sorted node-state view
- events: optional deltas, possibly empty at first
This is the simplest proving case.
It validates the runtime without dragging in Logos complexity.
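A std-only sketch of what such an observer's core could look like, with node reads stubbed out (the `RaftState` shape and names are illustrative, not the real `/state` payload):

```rust
use std::collections::BTreeMap;

/// Illustrative per-node Raft state, as read from a node's `/state` endpoint.
#[derive(Clone, Debug, PartialEq)]
struct RaftState {
    term: u64,
    is_leader: bool,
}

/// Latest observation per node; `BTreeMap` keeps the snapshot view sorted.
type State = BTreeMap<String, RaftState>;

/// One poll cycle over stubbed reads; a real observer would query node
/// clients. Emits a delta only when a node's state changed.
fn poll(reads: &[(String, RaftState)], state: &mut State) -> Vec<(String, RaftState)> {
    let mut events = Vec::new();
    for (name, raft) in reads {
        if state.get(name) != Some(raft) {
            events.push((name.clone(), raft.clone()));
        }
        state.insert(name.clone(), raft.clone());
    }
    events
}

fn main() {
    let mut state = State::new();
    let read = ("node-0".to_string(), RaftState { term: 1, is_leader: true });
    assert_eq!(poll(&[read.clone()], &mut state).len(), 1);
    // An unchanged read produces no delta; the snapshot view stays current.
    assert!(poll(&[read], &mut state).is_empty());
}
```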
### Logos block feed replacement
Use one shared chain observer.
- sources: local node clients
- state:
- node heads
- block graph
- heights
- seen headers
- recent history
- snapshot:
- current head/lib/graph summary
- events:
- newly discovered blocks
This covers both existing Logos feed use cases:
- current snapshot consumers
- delta/subscription consumers
### Cucumber manual-cluster sync
Use the same observer runtime with a different source set.
- sources:
- local manual-cluster node clients
- public peer endpoints
- state:
- local consensus views
- public consensus views
- derived majority public target
- snapshot:
- current local and public sync picture
This removes custom poll/sleep loops from steps.
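A blocking std-only sketch of the kind of shared wait helper that could replace those loops (the real handle is async and also offers delta subscription; `wait_until` and `SyncView` are illustrative):

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Wait until `predicate` holds for the latest snapshot, or time out.
/// `latest` stands in for a call like `ObservationHandle::latest_snapshot`.
fn wait_until<S>(
    latest: impl Fn() -> Option<S>,
    predicate: impl Fn(&S) -> bool,
    timeout: Duration,
) -> Result<S, String> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(snapshot) = latest() {
            if predicate(&snapshot) {
                return Ok(snapshot);
            }
        }
        if Instant::now() >= deadline {
            return Err("timed out waiting for condition".to_string());
        }
        sleep(Duration::from_millis(10));
    }
}

fn main() {
    // Illustrative sync picture: local height vs. derived public target.
    #[derive(Clone)]
    struct SyncView {
        local_height: u64,
        public_target: u64,
    }

    let view = SyncView { local_height: 42, public_target: 40 };
    let synced = wait_until(
        || Some(view.clone()),
        |v| v.local_height >= v.public_target,
        Duration::from_secs(1),
    );
    assert!(synced.is_ok());
}
```

Every step then expresses only its predicate; polling cadence and timeout handling live in one place.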
### Multi-wallet fork-aware tracking
This should not be a TF concept.
It should be a Logos projection built on top of the shared chain observer.
- input: chain observer state
- output: per-header wallet state cache keyed by block header
- property: naturally fork-aware because it follows actual ancestry
That replaces repeated backward scans from the tip with continuously maintained state.
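A toy std-only sketch of the ancestry-following idea (real headers, transactions, and UTXO handling are far richer; everything here is illustrative): each header's wallet state derives from its parent's cached state plus that block's delta, so competing forks keep separate cached entries.

```rust
use std::collections::HashMap;

/// Illustrative block: header id, parent id, and net wallet delta.
struct Block {
    header: &'static str,
    parent: Option<&'static str>,
    wallet_delta: i64,
}

/// Per-header wallet balance cache; forks coexist as separate keys.
/// Assumes blocks arrive parent-first, as chain ingestion guarantees.
fn project_wallet(blocks: &[Block]) -> HashMap<&'static str, i64> {
    let mut cache = HashMap::new();
    for block in blocks {
        let parent_balance = block
            .parent
            .map_or(0, |p| *cache.get(p).expect("parent projected first"));
        cache.insert(block.header, parent_balance + block.wallet_delta);
    }
    cache
}

fn main() {
    // genesis -> a1 -> a2, with b1 forking off genesis.
    let blocks = [
        Block { header: "genesis", parent: None, wallet_delta: 10 },
        Block { header: "a1", parent: Some("genesis"), wallet_delta: 5 },
        Block { header: "b1", parent: Some("genesis"), wallet_delta: -3 },
        Block { header: "a2", parent: Some("a1"), wallet_delta: 1 },
    ];
    let cache = project_wallet(&blocks);
    // Both forks stay queryable without rescanning from the tip.
    assert_eq!(cache["a2"], 16);
    assert_eq!(cache["b1"], 7);
}
```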
## Logos layering
Logos should not put every concern into one giant impl.
Recommended layering:
1. **Chain source adapter**
- local node reads
- public peer reads
2. **Shared chain observer**
- catch-up
- continuous ingestion
- graph/history materialization
3. **Logos projections**
- head view
- public sync target
- fork graph queries
- wallet state
- tx inclusion helpers
TF provides the runtime.
Logos provides the domain model built on top.
## Adoption plan
### Phase 1: add TF observation runtime
- add `ObservedSource`, `SourceProvider`, `Observer`, `ObservationRuntime`, `ObservationHandle`
- keep the public API small
- no app migrations yet
### Phase 2: prove it on `openraft_kv`
- add one simple observer over `/state`
- migrate one expectation to use the observation handle
- validate local, compose, and k8s
### Phase 3: add Logos shared chain observer
- implement it alongside current feed/loops
- do not remove existing consumers yet
- prove snapshot and delta outputs are useful
### Phase 4: migrate one Logos consumer at a time
Suggested order:
1. fork/head snapshot consumer
2. tx inclusion consumer
3. Cucumber sync-to-public-chain logic
4. wallet/UTXO tracking
### Phase 5: delete old loops and feed paths
- only after the new runtime has replaced real consumers cleanly
## Validation gates
Each phase should have clear checks.
### Runtime-level
- crate-level `cargo check`
- targeted tests for runtime lifecycle and history retention
- explicit tests for dynamic source refresh
### App-level
- `openraft_kv`:
- local failover
- compose failover
- k8s failover
- Logos:
- one snapshot consumer migrated
- one delta consumer migrated
- Cucumber:
- one manual-cluster sync path migrated
## Open questions
These should stay open until implementation forces a decision:
- whether `ObservationHandle` should expose full history directly or only cursor/subscription access
- how much error/freshness metadata belongs in the generic runtime vs app snapshot types
- whether multiple observers should share one scheduler/runtime instance or simply run independently first
## Design guardrails
When implementing this work:
- keep TF public abstractions minimal
- keep app semantics out of TF core
- do not chase a generic testing DSL
- build from reusable blocks, not one-off mega impls
- keep migration incremental
- prefer simple, explainable runtime behavior over clever abstraction

View File

@@ -29,5 +29,5 @@ reqwest = { features = ["json"], workspace = true }
serde = { workspace = true }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
tokio = { features = ["macros", "process", "rt-multi-thread", "time"], workspace = true }
tokio = { features = ["macros", "process", "rt-multi-thread", "sync", "time"], workspace = true }
tracing = { workspace = true }

View File

@@ -1,5 +1,6 @@
pub mod cfgsync;
pub mod env;
pub mod observation;
pub mod runtime;
pub mod scenario;
pub mod topology;

View File

@@ -0,0 +1,161 @@
use std::{marker::PhantomData, sync::Arc};
use async_trait::async_trait;
use super::{
ObservationConfig, ObservationHandle, ObservationRuntime, ObservedSource, Observer,
SourceProvider,
};
use crate::scenario::{
Application, DynError, NodeClients, PreparedRuntimeExtension, RuntimeExtensionFactory,
};
/// Boxed source provider used by observation factories.
pub type BoxedSourceProvider<S> = Box<dyn SourceProvider<S>>;
/// Builds an observation source provider once node clients are available.
pub trait SourceProviderFactory<E: Application, S>: Send + Sync + 'static {
/// Builds the source provider for one scenario run.
fn build_source_provider(
&self,
deployment: &E::Deployment,
node_clients: NodeClients<E>,
) -> Result<BoxedSourceProvider<S>, DynError>;
}
impl<E, S, F> SourceProviderFactory<E, S> for F
where
E: Application,
S: Clone + Send + Sync + 'static,
F: Fn(&E::Deployment, NodeClients<E>) -> Result<BoxedSourceProvider<S>, DynError>
+ Send
+ Sync
+ 'static,
{
fn build_source_provider(
&self,
deployment: &E::Deployment,
node_clients: NodeClients<E>,
) -> Result<BoxedSourceProvider<S>, DynError> {
self(deployment, node_clients)
}
}
/// Fixed source provider for scenario runs with a stable source set.
#[derive(Clone, Debug)]
pub struct StaticSourceProvider<S> {
sources: Vec<ObservedSource<S>>,
}
impl<S> StaticSourceProvider<S> {
/// Builds a provider from a fixed source list.
#[must_use]
pub fn new(sources: Vec<ObservedSource<S>>) -> Self {
Self { sources }
}
}
#[async_trait]
impl<S> SourceProvider<S> for StaticSourceProvider<S>
where
S: Clone + Send + Sync + 'static,
{
async fn sources(&self) -> Result<Vec<ObservedSource<S>>, DynError> {
Ok(self.sources.clone())
}
}
/// Runtime extension factory that starts one observer and stores its handle in
/// `RunContext`.
pub struct ObservationExtensionFactory<E: Application, O: Observer> {
observer_builder: Arc<dyn Fn() -> O + Send + Sync>,
source_provider_factory: Arc<dyn SourceProviderFactory<E, O::Source>>,
config: ObservationConfig,
env_marker: PhantomData<E>,
}
impl<E: Application, O: Observer> ObservationExtensionFactory<E, O> {
/// Builds an observation extension factory from builders.
#[must_use]
pub fn from_parts(
observer_builder: impl Fn() -> O + Send + Sync + 'static,
source_provider_factory: impl SourceProviderFactory<E, O::Source>,
config: ObservationConfig,
) -> Self {
Self {
observer_builder: Arc::new(observer_builder),
source_provider_factory: Arc::new(source_provider_factory),
config,
env_marker: PhantomData,
}
}
}
impl<E, O> ObservationExtensionFactory<E, O>
where
E: Application,
O: Observer + Clone,
{
/// Builds an observation extension factory from one clonable observer.
#[must_use]
pub fn new(
observer: O,
source_provider_factory: impl SourceProviderFactory<E, O::Source>,
config: ObservationConfig,
) -> Self {
Self::from_parts(move || observer.clone(), source_provider_factory, config)
}
}
#[async_trait]
impl<E, O> RuntimeExtensionFactory<E> for ObservationExtensionFactory<E, O>
where
E: Application,
O: Observer,
{
async fn prepare(
&self,
deployment: &E::Deployment,
node_clients: NodeClients<E>,
) -> Result<PreparedRuntimeExtension, DynError> {
let source_provider = self
.source_provider_factory
.build_source_provider(deployment, node_clients)?;
let observer = (self.observer_builder)();
let runtime =
ObservationRuntime::start(source_provider, observer, self.config.clone()).await?;
let (handle, task) = runtime.into_parts();
Ok(PreparedRuntimeExtension::from_task(handle, task))
}
}
#[async_trait]
impl<S, P> SourceProvider<S> for Box<P>
where
S: Clone + Send + Sync + 'static,
P: SourceProvider<S> + ?Sized,
{
async fn sources(&self) -> Result<Vec<ObservedSource<S>>, DynError> {
(**self).sources().await
}
}
#[async_trait]
impl<S, P> SourceProvider<S> for Arc<P>
where
S: Clone + Send + Sync + 'static,
P: SourceProvider<S> + ?Sized,
{
async fn sources(&self) -> Result<Vec<ObservedSource<S>>, DynError> {
(**self).sources().await
}
}
impl<O: Observer> From<ObservationHandle<O>> for PreparedRuntimeExtension {
fn from(handle: ObservationHandle<O>) -> Self {
PreparedRuntimeExtension::new(handle)
}
}

View File

@@ -0,0 +1,503 @@
//! Generic continuous observation runtime.
//!
//! This module provides the reusable runtime needed by both TF scenarios and
//! manual-cluster consumers such as Cucumber worlds. It does not know any app
//! semantics. Apps provide source types, observation logic, materialized state,
//! snapshots, and delta events.
mod factory;
use std::{
any::type_name,
collections::VecDeque,
sync::Arc,
time::{Duration, SystemTime},
};
use async_trait::async_trait;
pub use factory::{
BoxedSourceProvider, ObservationExtensionFactory, SourceProviderFactory, StaticSourceProvider,
};
use parking_lot::Mutex;
use tokio::{
sync::broadcast,
task::JoinHandle,
time::{MissedTickBehavior, interval},
};
use tracing::{debug, info, warn};
use crate::scenario::DynError;
/// Configuration for a background observation runtime.
#[derive(Clone, Debug)]
pub struct ObservationConfig {
/// Time between observation cycles.
pub interval: Duration,
/// Maximum number of non-empty event batches retained in memory.
pub history_limit: usize,
}
impl Default for ObservationConfig {
fn default() -> Self {
Self {
interval: Duration::from_secs(1),
history_limit: 64,
}
}
}
/// One named observation source.
#[derive(Clone, Debug)]
pub struct ObservedSource<S> {
/// Human-readable source name used in logs and app-level reporting.
pub name: String,
/// App-owned source handle.
pub source: S,
}
impl<S> ObservedSource<S> {
/// Builds one named observation source.
#[must_use]
pub fn new(name: &str, source: S) -> Self {
Self {
name: name.to_owned(),
source,
}
}
}
/// Supplies the current observation source set.
#[async_trait]
pub trait SourceProvider<S>: Send + Sync + 'static {
/// Returns the current source set for the next observation cycle.
async fn sources(&self) -> Result<Vec<ObservedSource<S>>, DynError>;
}
/// App-owned observation logic.
#[async_trait]
pub trait Observer: Send + Sync + 'static {
/// App-owned source type.
type Source: Clone + Send + Sync + 'static;
/// App-owned retained materialized state.
type State: Send + Sync + 'static;
/// App-owned current snapshot view.
type Snapshot: Clone + Send + Sync + 'static;
/// App-owned delta event type emitted per cycle.
type Event: Clone + Send + Sync + 'static;
/// Builds the initial retained state from the current source set.
async fn init(&self, sources: &[ObservedSource<Self::Source>])
-> Result<Self::State, DynError>;
/// Advances retained state by one cycle and returns any new delta events.
async fn poll(
&self,
sources: &[ObservedSource<Self::Source>],
state: &mut Self::State,
) -> Result<Vec<Self::Event>, DynError>;
/// Builds the current snapshot view from retained state.
fn snapshot(&self, state: &Self::State) -> Self::Snapshot;
}
/// One materialized snapshot emitted by the runtime.
#[derive(Clone, Debug)]
pub struct ObservationSnapshot<S> {
/// Monotonic cycle number.
pub cycle: u64,
/// Capture timestamp.
pub observed_at: SystemTime,
/// Number of sources used for this snapshot.
pub source_count: usize,
/// App-owned snapshot payload.
pub value: S,
}
/// One delta batch emitted by a successful observation cycle.
#[derive(Clone, Debug)]
pub struct ObservationBatch<E> {
/// Monotonic cycle number.
pub cycle: u64,
/// Capture timestamp.
pub observed_at: SystemTime,
/// Number of sources used for this batch.
pub source_count: usize,
/// App-owned delta events discovered in this cycle.
pub events: Vec<E>,
}
/// Observation runtime failure stage.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ObservationFailureStage {
/// Source refresh failed before a poll could run.
SourceRefresh,
/// Observer poll failed after sources were refreshed.
Poll,
}
/// Last failed observation cycle.
#[derive(Clone, Debug)]
pub struct ObservationFailure {
/// Monotonic cycle number.
pub cycle: u64,
/// Failure timestamp.
pub observed_at: SystemTime,
/// Number of sources involved in the failed cycle.
pub source_count: usize,
/// Runtime stage that failed.
pub stage: ObservationFailureStage,
/// Human-readable failure message.
pub message: String,
}
/// Errors returned while starting an observation runtime.
#[derive(Debug, thiserror::Error)]
pub enum ObservationRuntimeError {
/// The configured interval is invalid.
#[error("observation interval must be greater than zero")]
InvalidInterval,
/// Source discovery failed during runtime startup.
#[error("failed to refresh observation sources during startup: {source}")]
SourceRefresh {
#[source]
source: DynError,
},
/// Observer state initialization failed during runtime startup.
#[error("failed to initialize observation state: {source}")]
ObserverInit {
#[source]
source: DynError,
},
}
/// Read-side handle for one running observer.
pub struct ObservationHandle<O: Observer> {
shared: Arc<Mutex<SharedObservationState<O>>>,
batches: broadcast::Sender<Arc<ObservationBatch<O::Event>>>,
}
impl<O: Observer> Clone for ObservationHandle<O> {
fn clone(&self) -> Self {
Self {
shared: Arc::clone(&self.shared),
batches: self.batches.clone(),
}
}
}
impl<O: Observer> ObservationHandle<O> {
/// Returns the latest successful snapshot, if one has been produced.
#[must_use]
pub fn latest_snapshot(&self) -> Option<ObservationSnapshot<O::Snapshot>> {
self.shared.lock().latest_snapshot.clone()
}
/// Returns retained non-empty event batches.
#[must_use]
pub fn history(&self) -> Vec<Arc<ObservationBatch<O::Event>>> {
self.shared.lock().history.iter().cloned().collect()
}
/// Returns the most recent cycle failure, if any.
#[must_use]
pub fn last_error(&self) -> Option<ObservationFailure> {
self.shared.lock().last_error.clone()
}
/// Subscribes to future non-empty event batches.
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<Arc<ObservationBatch<O::Event>>> {
self.batches.subscribe()
}
}
/// Lifecycle owner for one background observation runtime.
pub struct ObservationRuntime<O: Observer> {
handle: ObservationHandle<O>,
task: Option<JoinHandle<()>>,
}
impl<O: Observer> ObservationRuntime<O> {
/// Starts one background observation runtime.
pub async fn start<P>(
provider: P,
observer: O,
config: ObservationConfig,
) -> Result<Self, ObservationRuntimeError>
where
P: SourceProvider<O::Source>,
{
ensure_positive_interval(config.interval)?;
let sources = provider
.sources()
.await
.map_err(|source| ObservationRuntimeError::SourceRefresh { source })?;
let source_count = sources.len();
let state = observer
.init(&sources)
.await
.map_err(|source| ObservationRuntimeError::ObserverInit { source })?;
let snapshot = build_snapshot(0, source_count, &observer, &state);
let batches = broadcast::channel(config.history_limit.max(1)).0;
let shared = Arc::new(Mutex::new(SharedObservationState::new(snapshot)));
let handle = ObservationHandle {
shared: Arc::clone(&shared),
batches,
};
info!(
observer = type_name::<O>(),
interval_ms = config.interval.as_millis(),
history_limit = config.history_limit,
source_count,
"starting observation runtime"
);
let runtime_handle = handle.clone();
let task = tokio::spawn(run_observation_loop(
provider,
observer,
config,
shared,
runtime_handle.batches.clone(),
state,
));
Ok(Self {
handle: runtime_handle,
task: Some(task),
})
}
/// Returns a read-side handle for the running observer.
#[must_use]
pub fn handle(&self) -> ObservationHandle<O> {
self.handle.clone()
}
/// Splits the runtime into its handle and background task.
#[must_use]
pub fn into_parts(mut self) -> (ObservationHandle<O>, JoinHandle<()>) {
let task = self
.task
.take()
.expect("observation runtime task is always present before into_parts");
(self.handle.clone(), task)
}
/// Aborts the background task.
pub fn abort(&mut self) {
if let Some(task) = self.task.take() {
task.abort();
}
}
}
impl<O: Observer> Drop for ObservationRuntime<O> {
fn drop(&mut self) {
self.abort();
}
}
struct SharedObservationState<O: Observer> {
latest_snapshot: Option<ObservationSnapshot<O::Snapshot>>,
history: VecDeque<Arc<ObservationBatch<O::Event>>>,
last_error: Option<ObservationFailure>,
}
impl<O: Observer> SharedObservationState<O> {
fn new(snapshot: ObservationSnapshot<O::Snapshot>) -> Self {
Self {
latest_snapshot: Some(snapshot),
history: VecDeque::new(),
last_error: None,
}
}
}
async fn run_observation_loop<O, P>(
provider: P,
observer: O,
config: ObservationConfig,
shared: Arc<Mutex<SharedObservationState<O>>>,
batches: broadcast::Sender<Arc<ObservationBatch<O::Event>>>,
mut state: O::State,
) where
O: Observer,
P: SourceProvider<O::Source>,
{
let mut ticker = build_interval(config.interval);
let mut cycle = 1u64;
ticker.tick().await;
loop {
ticker.tick().await;
let cycle_outcome = observe_cycle(&provider, &observer, cycle, &mut state).await;
match cycle_outcome {
Ok(success) => record_cycle_success(&shared, &batches, &config, success),
Err(failure) => record_cycle_failure(&shared, failure),
}
cycle += 1;
}
}
struct CycleSuccess<O: Observer> {
snapshot: ObservationSnapshot<O::Snapshot>,
batch: Option<Arc<ObservationBatch<O::Event>>>,
}
async fn observe_cycle<O, P>(
provider: &P,
observer: &O,
cycle: u64,
state: &mut O::State,
) -> Result<CycleSuccess<O>, ObservationFailure>
where
O: Observer,
P: SourceProvider<O::Source>,
{
let sources = provider.sources().await.map_err(|source| {
build_failure(cycle, 0, ObservationFailureStage::SourceRefresh, source)
})?;
let source_count = sources.len();
let events = observer.poll(&sources, state).await.map_err(|source| {
build_failure(cycle, source_count, ObservationFailureStage::Poll, source)
})?;
let snapshot = build_snapshot(cycle, source_count, observer, state);
let batch = build_batch(cycle, source_count, events);
Ok(CycleSuccess { snapshot, batch })
}
fn record_cycle_success<O: Observer>(
shared: &Arc<Mutex<SharedObservationState<O>>>,
batches: &broadcast::Sender<Arc<ObservationBatch<O::Event>>>,
config: &ObservationConfig,
success: CycleSuccess<O>,
) {
debug!(
observer = type_name::<O>(),
cycle = success.snapshot.cycle,
source_count = success.snapshot.source_count,
event_count = success.batch.as_ref().map_or(0, |batch| batch.events.len()),
"observation cycle completed"
);
let mut state = shared.lock();
state.latest_snapshot = Some(success.snapshot);
state.last_error = None;
let Some(batch) = success.batch else {
return;
};
push_history(&mut state.history, Arc::clone(&batch), config.history_limit);
drop(state);
let _ = batches.send(batch);
}
fn record_cycle_failure<O: Observer>(
shared: &Arc<Mutex<SharedObservationState<O>>>,
failure: ObservationFailure,
) {
warn!(
observer = type_name::<O>(),
cycle = failure.cycle,
source_count = failure.source_count,
stage = ?failure.stage,
message = %failure.message,
"observation cycle failed"
);
shared.lock().last_error = Some(failure);
}
fn ensure_positive_interval(interval: Duration) -> Result<(), ObservationRuntimeError> {
if interval.is_zero() {
return Err(ObservationRuntimeError::InvalidInterval);
}
Ok(())
}
fn build_interval(period: Duration) -> tokio::time::Interval {
let mut ticker = interval(period);
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
ticker
}
fn build_snapshot<O: Observer>(
cycle: u64,
source_count: usize,
observer: &O,
state: &O::State,
) -> ObservationSnapshot<O::Snapshot> {
ObservationSnapshot {
cycle,
observed_at: SystemTime::now(),
source_count,
value: observer.snapshot(state),
}
}
fn build_batch<E>(
cycle: u64,
source_count: usize,
events: Vec<E>,
) -> Option<Arc<ObservationBatch<E>>> {
if events.is_empty() {
return None;
}
Some(Arc::new(ObservationBatch {
cycle,
observed_at: SystemTime::now(),
source_count,
events,
}))
}
fn build_failure(
cycle: u64,
source_count: usize,
stage: ObservationFailureStage,
source: DynError,
) -> ObservationFailure {
ObservationFailure {
cycle,
observed_at: SystemTime::now(),
source_count,
stage,
message: source.to_string(),
}
}
fn push_history<E>(
history: &mut VecDeque<Arc<ObservationBatch<E>>>,
batch: Arc<ObservationBatch<E>>,
history_limit: usize,
) {
if history_limit == 0 {
return;
}
history.push_back(batch);
while history.len() > history_limit {
history.pop_front();
}
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,250 @@
use std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
use async_trait::async_trait;
use parking_lot::Mutex;
use tokio::time::{Instant, sleep};
use super::{
ObservationConfig, ObservationFailureStage, ObservationRuntime, ObservedSource, Observer,
SourceProvider,
};
use crate::scenario::DynError;
#[derive(Clone)]
struct TestSourceProvider {
sources: Arc<Mutex<Vec<ObservedSource<u64>>>>,
fail_refreshes: Arc<AtomicUsize>,
}
impl TestSourceProvider {
fn new(sources: Vec<ObservedSource<u64>>) -> Self {
Self {
sources: Arc::new(Mutex::new(sources)),
fail_refreshes: Arc::new(AtomicUsize::new(0)),
}
}
fn replace_sources(&self, sources: Vec<ObservedSource<u64>>) {
*self.sources.lock() = sources;
}
fn fail_next_refresh(&self) {
self.fail_refreshes.store(1, Ordering::SeqCst);
}
}
#[async_trait]
impl SourceProvider<u64> for TestSourceProvider {
async fn sources(&self) -> Result<Vec<ObservedSource<u64>>, DynError> {
if self.fail_refreshes.swap(0, Ordering::SeqCst) == 1 {
return Err("refresh failed".into());
}
Ok(self.sources.lock().clone())
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
struct TestSnapshot {
total_sources_seen: u64,
last_source_count: usize,
}
#[derive(Clone, Debug, Eq, PartialEq)]
struct TestEvent {
total_sources_seen: u64,
}
#[derive(Default)]
struct TestState {
total_sources_seen: u64,
last_source_count: usize,
}
struct CountingObserver;
#[async_trait]
impl Observer for CountingObserver {
type Source = u64;
type State = TestState;
type Snapshot = TestSnapshot;
type Event = TestEvent;
async fn init(
&self,
sources: &[ObservedSource<Self::Source>],
) -> Result<Self::State, DynError> {
Ok(TestState {
total_sources_seen: sources.iter().map(|source| source.source).sum(),
last_source_count: sources.len(),
})
}
async fn poll(
&self,
sources: &[ObservedSource<Self::Source>],
state: &mut Self::State,
) -> Result<Vec<Self::Event>, DynError> {
state.total_sources_seen += sources.iter().map(|source| source.source).sum::<u64>();
state.last_source_count = sources.len();
Ok(vec![TestEvent {
total_sources_seen: state.total_sources_seen,
}])
}
fn snapshot(&self, state: &Self::State) -> Self::Snapshot {
TestSnapshot {
total_sources_seen: state.total_sources_seen,
last_source_count: state.last_source_count,
}
}
}
#[tokio::test]
async fn runtime_updates_snapshot_and_history() {
let provider = TestSourceProvider::new(vec![ObservedSource::new("node-0", 2)]);
let runtime = ObservationRuntime::start(
provider,
CountingObserver,
ObservationConfig {
interval: Duration::from_millis(25),
history_limit: 2,
},
)
.await
.expect("runtime should start");
let handle = runtime.handle();
wait_for_cycle(&handle, 2).await;
let snapshot = handle.latest_snapshot().expect("snapshot should exist");
assert!(snapshot.cycle >= 2);
assert_eq!(snapshot.source_count, 1);
assert_eq!(snapshot.value.last_source_count, 1);
assert!(snapshot.value.total_sources_seen >= 6);
let history = handle.history();
assert_eq!(history.len(), 2);
assert!(history.iter().all(|batch| !batch.events.is_empty()));
}
#[tokio::test]
async fn runtime_refreshes_sources_each_cycle() {
let provider = TestSourceProvider::new(vec![ObservedSource::new("node-0", 1)]);
let runtime = ObservationRuntime::start(
provider.clone(),
CountingObserver,
ObservationConfig {
interval: Duration::from_millis(25),
history_limit: 4,
},
)
.await
.expect("runtime should start");
let handle = runtime.handle();
wait_for_cycle(&handle, 1).await;
provider.replace_sources(vec![
ObservedSource::new("node-0", 1),
ObservedSource::new("node-1", 3),
]);
wait_for_snapshot_source_count(&handle, 2).await;
let snapshot = handle.latest_snapshot().expect("snapshot should exist");
assert_eq!(snapshot.source_count, 2);
assert_eq!(snapshot.value.last_source_count, 2);
}
#[tokio::test]
async fn runtime_records_cycle_failures() {
let provider = TestSourceProvider::new(vec![ObservedSource::new("node-0", 1)]);
let runtime = ObservationRuntime::start(
provider.clone(),
CountingObserver,
ObservationConfig {
interval: Duration::from_millis(25),
history_limit: 2,
},
)
.await
.expect("runtime should start");
let handle = runtime.handle();
provider.fail_next_refresh();
wait_for_failure(&handle).await;
let failure = handle.last_error().expect("failure should exist");
assert_eq!(failure.stage, ObservationFailureStage::SourceRefresh);
assert_eq!(failure.message, "refresh failed");
}
async fn wait_for_cycle(handle: &super::ObservationHandle<CountingObserver>, cycle: u64) {
let deadline = Instant::now() + Duration::from_secs(2);
loop {
let Some(snapshot) = handle.latest_snapshot() else {
sleep(Duration::from_millis(10)).await;
continue;
};
if snapshot.cycle >= cycle {
return;
}
assert!(
Instant::now() < deadline,
"timed out waiting for cycle {cycle}"
);
sleep(Duration::from_millis(10)).await;
}
}
async fn wait_for_snapshot_source_count(
handle: &super::ObservationHandle<CountingObserver>,
source_count: usize,
) {
let deadline = Instant::now() + Duration::from_secs(2);
loop {
let Some(snapshot) = handle.latest_snapshot() else {
sleep(Duration::from_millis(10)).await;
continue;
};
if snapshot.source_count == source_count {
return;
}
assert!(
Instant::now() < deadline,
"timed out waiting for source_count {source_count}"
);
sleep(Duration::from_millis(10)).await;
}
}
async fn wait_for_failure(handle: &super::ObservationHandle<CountingObserver>) {
let deadline = Instant::now() + Duration::from_secs(2);
loop {
if handle.last_error().is_some() {
return;
}
assert!(Instant::now() < deadline, "timed out waiting for failure");
sleep(Duration::from_millis(10)).await;
}
}

View File

@@ -4,7 +4,12 @@ use super::{
Application, CleanupPolicy, DeploymentPolicy, Expectation, HttpReadinessRequirement,
RetryPolicy, RuntimeExtensionFactory, Workload, internal::CoreBuilderAccess,
};
use crate::topology::{DeploymentProvider, DeploymentSeed};
use crate::{
observation::{
ObservationConfig, ObservationExtensionFactory, Observer, SourceProviderFactory,
},
topology::{DeploymentProvider, DeploymentSeed},
};
type DeploymentProviderHandle<E> = Box<dyn DeploymentProvider<<E as Application>::Deployment>>;
@@ -60,6 +65,48 @@ pub trait CoreBuilderExt: CoreBuilderAccess + Sized {
self.map_core_builder(|builder| builder.with_runtime_extension_factory(extension))
}
/// Registers one clonable observer as a runtime extension.
#[must_use]
fn with_observer<O>(
self,
observer: O,
source_provider_factory: impl SourceProviderFactory<Self::Env, O::Source>,
config: ObservationConfig,
) -> Self
where
O: Observer + Clone,
Self::Env: Application,
{
let extension = ObservationExtensionFactory::<Self::Env, O>::new(
observer,
source_provider_factory,
config,
);
self.with_runtime_extension_factory(Box::new(extension))
}
/// Registers one observer built lazily per run as a runtime extension.
#[must_use]
fn with_observer_factory<O>(
self,
observer_builder: impl Fn() -> O + Send + Sync + 'static,
source_provider_factory: impl SourceProviderFactory<Self::Env, O::Source>,
config: ObservationConfig,
) -> Self
where
O: Observer,
Self::Env: Application,
{
let extension = ObservationExtensionFactory::<Self::Env, O>::from_parts(
observer_builder,
source_provider_factory,
config,
);
self.with_runtime_extension_factory(Box::new(extension))
}
#[must_use]
fn with_run_duration(self, duration: Duration) -> Self {
self.map_core_builder(|builder| builder.with_run_duration(duration))

View File

@@ -16,8 +16,8 @@ use crate::scenario::{
type WorkloadOutcome = Result<(), DynError>;
const MIN_NODE_CONTROL_COOLDOWN: Duration = Duration::from_secs(30);
const DEFAULT_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(1);
const MIN_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(2);
const DEFAULT_POST_WORKLOAD_SETTLE_WAIT: Duration = Duration::from_secs(1);
const MIN_POST_WORKLOAD_SETTLE_WAIT: Duration = Duration::from_secs(2);
const EXPECTATION_CAPTURE_CHECK_INTERVAL: Duration = Duration::from_secs(1);
const UNKNOWN_PANIC: &str = "<unknown panic>";
@@ -183,7 +183,8 @@ impl<E: Application> Runner<E> {
}
async fn settle_before_expectations(context: &RunContext<E>) {
// Give the feed a short catch-up window before evaluating expectations.
// Give runtime extensions a short catch-up window before evaluating
// expectations.
let Some(wait) = Self::settle_wait_duration(context) else {
return;
};
@@ -200,12 +201,12 @@
}
let wait = if configured_wait.is_zero() {
DEFAULT_BLOCK_FEED_SETTLE_WAIT
DEFAULT_POST_WORKLOAD_SETTLE_WAIT
} else {
configured_wait
};
Some(wait.max(MIN_BLOCK_FEED_SETTLE_WAIT))
Some(wait.max(MIN_POST_WORKLOAD_SETTLE_WAIT))
}
/// Evaluates every registered expectation, aggregating failures so callers
@@ -231,8 +232,8 @@
}
fn cooldown_duration(context: &RunContext<E>) -> Option<Duration> {
// Managed environments need a minimum cooldown so feed and expectations
// observe stabilized state.
// Managed environments need a minimum cooldown so runtime extensions and
// expectations observe stabilized state.
let needs_stabilization = context.cluster_control_profile().framework_owns_lifecycle();
let mut wait = context.expectation_cooldown();

View File

@@ -31,8 +31,8 @@ pub enum ComposeRunnerError {
NodeClients(#[from] NodeClientError),
#[error(transparent)]
Telemetry(#[from] MetricsError),
#[error("feed requires at least one node client")]
BlockFeedMissing,
#[error("observation runtime requires at least one node client")]
ObservationMissing,
#[error("runtime preflight failed: no node clients available")]
RuntimePreflight,
#[error("runtime extension setup failed: {source}")]
@@ -45,8 +45,8 @@ pub enum ComposeRunnerError {
#[source]
source: DynError,
},
#[error("failed to start feed: {source}")]
BlockFeed {
#[error("failed to start observation runtime: {source}")]
ObservationRuntime {
#[source]
source: DynError,
},