use std::{ collections::HashSet, sync::{Arc, Mutex}, }; use async_trait::async_trait; use nomos_core::mantle::{ AuthenticatedMantleTx as _, ops::{Op, channel::ChannelId}, }; use testing_framework_core::scenario::{BlockRecord, DynError, Expectation, RunContext}; use thiserror::Error; use tokio::sync::broadcast; #[derive(Debug)] pub struct DaWorkloadExpectation { planned_channels: Vec, capture_state: Option, } #[derive(Debug)] struct CaptureState { planned: Arc>, inscriptions: Arc>>, blobs: Arc>>, } const MIN_INCLUSION_RATIO: f64 = 0.8; #[derive(Debug, Error)] enum DaExpectationError { #[error("da workload expectation not started")] NotCaptured, #[error("missing inscriptions for {missing:?}")] MissingInscriptions { missing: Vec }, #[error("missing blobs for {missing:?}")] MissingBlobs { missing: Vec }, } impl DaWorkloadExpectation { /// Validates that inscriptions and blobs landed for the planned channels. pub const fn new(planned_channels: Vec) -> Self { Self { planned_channels, capture_state: None, } } } #[async_trait] impl Expectation for DaWorkloadExpectation { fn name(&self) -> &'static str { "da_workload_inclusions" } async fn start_capture(&mut self, ctx: &RunContext) -> Result<(), DynError> { if self.capture_state.is_some() { return Ok(()); } tracing::info!( planned_channels = self.planned_channels.len(), "DA inclusion expectation starting capture" ); let planned = Arc::new( self.planned_channels .iter() .copied() .collect::>(), ); let inscriptions = Arc::new(Mutex::new(HashSet::new())); let blobs = Arc::new(Mutex::new(HashSet::new())); let mut receiver = ctx.block_feed().subscribe(); let planned_for_task = Arc::clone(&planned); let inscriptions_for_task = Arc::clone(&inscriptions); let blobs_for_task = Arc::clone(&blobs); tokio::spawn(async move { loop { match receiver.recv().await { Ok(record) => capture_block( record.as_ref(), &planned_for_task, &inscriptions_for_task, &blobs_for_task, ), Err(broadcast::error::RecvError::Lagged(skipped)) => { tracing::debug!(skipped, "DA expectation: receiver lagged"); } Err(broadcast::error::RecvError::Closed) => { tracing::debug!("DA expectation: block feed closed"); break; } } } }); self.capture_state = Some(CaptureState { planned, inscriptions, blobs, }); Ok(()) } async fn evaluate(&mut self, _ctx: &RunContext) -> Result<(), DynError> { let state = self .capture_state .as_ref() .ok_or(DaExpectationError::NotCaptured) .map_err(DynError::from)?; let planned_total = state.planned.len(); let missing_inscriptions = { let inscriptions = state .inscriptions .lock() .expect("inscription lock poisoned"); missing_channels(&state.planned, &inscriptions) }; let required_inscriptions = minimum_required(planned_total, MIN_INCLUSION_RATIO); if planned_total.saturating_sub(missing_inscriptions.len()) < required_inscriptions { tracing::warn!( planned = planned_total, missing = missing_inscriptions.len(), required = required_inscriptions, "DA expectation missing inscriptions" ); return Err(DaExpectationError::MissingInscriptions { missing: missing_inscriptions, } .into()); } let missing_blobs = { let blobs = state.blobs.lock().expect("blob lock poisoned"); missing_channels(&state.planned, &blobs) }; let required_blobs = minimum_required(planned_total, MIN_INCLUSION_RATIO); if planned_total.saturating_sub(missing_blobs.len()) < required_blobs { tracing::warn!( planned = planned_total, missing = missing_blobs.len(), required = required_blobs, "DA expectation missing blobs" ); return Err(DaExpectationError::MissingBlobs { missing: missing_blobs, } .into()); } tracing::info!( planned = planned_total, inscriptions = planned_total - missing_inscriptions.len(), blobs = planned_total - missing_blobs.len(), "DA inclusion expectation satisfied" ); Ok(()) } } fn capture_block( block: &BlockRecord, planned: &HashSet, inscriptions: &Arc>>, blobs: &Arc>>, ) { let mut new_inscriptions = Vec::new(); let mut new_blobs = Vec::new(); for tx in block.block.transactions() { for op in &tx.mantle_tx().ops { match op { Op::ChannelInscribe(inscribe) if planned.contains(&inscribe.channel_id) => { new_inscriptions.push(inscribe.channel_id); } Op::ChannelBlob(blob) if planned.contains(&blob.channel) => { new_blobs.push(blob.channel); } _ => {} } } } if !new_inscriptions.is_empty() { let mut guard = inscriptions.lock().expect("inscription lock poisoned"); guard.extend(new_inscriptions); tracing::debug!(count = guard.len(), "DA expectation captured inscriptions"); } if !new_blobs.is_empty() { let mut guard = blobs.lock().expect("blob lock poisoned"); guard.extend(new_blobs); tracing::debug!(count = guard.len(), "DA expectation captured blobs"); } } fn missing_channels(planned: &HashSet, observed: &HashSet) -> Vec { planned.difference(observed).copied().collect() } fn minimum_required(total: usize, ratio: f64) -> usize { ((total as f64) * ratio).ceil() as usize }