workflows: remove panics from workloads

andrussal 2025-12-18 23:10:22 +01:00
parent 14148221eb
commit a1e0dddea3
5 changed files with 36 additions and 30 deletions
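The changes below all follow the same pattern: helpers that used to call .expect(...) now return Result<_, DynError> and convert failures with map_err/.into(), so a misbehaving workload surfaces an error instead of panicking the test runner. A minimal sketch of the pattern, assuming DynError (imported from testing_framework_core::scenario in the first file) resolves to a boxed error alias; parse_rate is a hypothetical stand-in for the real builders, not code from this repository:

use std::error::Error;

// Assumption: testing_framework_core::scenario::DynError is a boxed error
// alias roughly like this; the exact definition is not shown in this diff.
type DynError = Box<dyn Error + Send + Sync>;

// Before: any failure aborts the whole workload task with a panic.
fn parse_rate_panicking(raw: &str) -> u64 {
    raw.parse().expect("rate must be a number")
}

// After: the failure becomes a DynError; callers propagate it with `?`,
// so the scenario can report it instead of unwinding.
fn parse_rate(raw: &str) -> Result<u64, DynError> {
    raw.parse()
        .map_err(|err| format!("invalid rate {raw:?}: {err}").into())
}

fn main() -> Result<(), DynError> {
    assert_eq!(parse_rate("3")?, 3);
    assert!(parse_rate("not-a-number").is_err());
    Ok(())
}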

View File

@@ -4,11 +4,11 @@ use nomos_core::mantle::{
     ledger::Tx as LedgerTx,
     ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp},
 };
+use testing_framework_core::scenario::DynError;
 
 /// Builds a signed inscription transaction with deterministic payload for
 /// testing.
-#[must_use]
-pub fn create_inscription_transaction_with_id(id: ChannelId) -> SignedMantleTx {
+pub fn create_inscription_transaction_with_id(id: ChannelId) -> Result<SignedMantleTx, DynError> {
     let signing_key = Ed25519Key::from_bytes(&[0u8; 32]);
     let signer = signing_key.public_key();
@@ -31,10 +31,13 @@ pub fn create_inscription_transaction_with_id(id: ChannelId) -> SignedMantleTx {
     let zk_key = ZkKey::zero();
     tracing::debug!(channel = ?id, tx_hash = ?tx_hash, "building inscription transaction");
+    let zk_signature = ZkKey::multi_sign(&[zk_key], tx_hash.as_ref())
+        .map_err(|err| format!("zk signature generation failed: {err}"))?;
     SignedMantleTx::new(
         mantle_tx,
         vec![OpProof::Ed25519Sig(signature)],
-        ZkKey::multi_sign(&[zk_key], tx_hash.as_ref()).expect("zk signature generation"),
+        zk_signature,
     )
-    .expect("valid transaction")
+    .map_err(|err| format!("failed to build signed mantle transaction: {err}").into())
 }

View File

@@ -93,7 +93,11 @@ impl RandomRestartWorkload {
         &self,
         targets: &[Target],
         cooldowns: &HashMap<Target, Instant>,
-    ) -> Target {
+    ) -> Result<Target, DynError> {
+        if targets.is_empty() {
+            return Err("chaos restart workload has no eligible targets".into());
+        }
+
         loop {
             let now = Instant::now();
             if let Some(next_ready) = cooldowns
@@ -121,13 +125,13 @@ impl RandomRestartWorkload {
             if let Some(choice) = available.choose(&mut thread_rng()).copied() {
                 tracing::debug!(?choice, "chaos restart picked target");
-                return choice;
+                return Ok(choice);
             }
-            return targets
-                .choose(&mut thread_rng())
-                .copied()
-                .expect("chaos restart workload has targets");
+            if let Some(choice) = targets.choose(&mut thread_rng()).copied() {
+                return Ok(choice);
+            }
+            return Err("chaos restart workload has no eligible targets".into());
         }
     }
 }
@@ -160,7 +164,7 @@ impl Workload for RandomRestartWorkload {
         loop {
             sleep(self.random_delay()).await;
-            let target = self.pick_target(&targets, &cooldowns).await;
+            let target = self.pick_target(&targets, &cooldowns).await?;
             match target {
                 Target::Validator(index) => {

View File

@@ -19,6 +19,16 @@ use tokio::{pin, select, spawn, sync::broadcast, time::sleep};
 use super::workload::{planned_channel_count, planned_channel_ids};
 
+fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>, name: &'static str) -> std::sync::MutexGuard<'a, T> {
+    match mutex.lock() {
+        Ok(guard) => guard,
+        Err(poisoned) => {
+            tracing::warn!(lock = name, "mutex poisoned; recovering inner value");
+            poisoned.into_inner()
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct DaWorkloadExpectation {
     blob_rate_per_block: NonZeroU64,
@@ -277,15 +287,12 @@ impl DaWorkloadExpectation {
     }
 
     fn missing_inscriptions(&self, state: &CaptureState) -> Vec<ChannelId> {
-        let inscriptions = state
-            .inscriptions
-            .lock()
-            .expect("inscription lock poisoned");
+        let inscriptions = lock_or_recover(&state.inscriptions, "da_inscriptions");
         missing_channels(&state.planned, &inscriptions)
     }
 
     fn observe_blobs(&self, state: &CaptureState) -> BlobObservation {
-        let blobs = state.blobs.lock().expect("blob lock poisoned");
+        let blobs = lock_or_recover(&state.blobs, "da_blobs");
         let observed_total_blobs = blobs.values().sum::<u64>();
         let channels_with_blobs = blobs
             .iter()
@@ -355,13 +362,13 @@ fn capture_block(
     }
 
     if !new_inscriptions.is_empty() {
-        let mut guard = inscriptions.lock().expect("inscription lock poisoned");
+        let mut guard = lock_or_recover(inscriptions, "da_inscriptions");
         guard.extend(new_inscriptions);
         tracing::debug!(count = guard.len(), "DA expectation captured inscriptions");
     }
 
     if !new_blobs.is_empty() {
-        let mut guard = blobs.lock().expect("blob lock poisoned");
+        let mut guard = lock_or_recover(blobs, "da_blobs");
         for channel in new_blobs {
             let entry = guard.entry(channel).or_insert(0);
             *entry += 1;
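The lock_or_recover helper trades strict poisoning semantics for liveness: a poisoned mutex only means some thread panicked while holding the guard, and for these capture counters recovering the inner value is preferable to cascading the panic into the expectation task. A standalone sketch of the same pattern (tracing::warn! swapped for eprintln! so it runs without a tracing subscriber; the counter and lock name are made up for illustration):

use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;

// Same shape as the helper added in this diff, minus the tracing dependency.
fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>, name: &'static str) -> MutexGuard<'a, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => {
            eprintln!("lock {name} poisoned; recovering inner value");
            poisoned.into_inner()
        }
    }
}

fn main() {
    let counter = Arc::new(Mutex::new(0u64));

    // Poison the mutex by panicking while the guard is held.
    let poisoner = Arc::clone(&counter);
    let _ = thread::spawn(move || {
        let _guard = poisoner.lock().unwrap();
        panic!("simulated panic while holding the lock");
    })
    .join();

    // A plain .lock().expect(...) would panic here; the helper recovers instead.
    *lock_or_recover(&counter, "counter") += 1;
    assert_eq!(*lock_or_recover(&counter, "counter"), 1);
}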

View File

@@ -28,8 +28,6 @@ use crate::{
 };
 
 const TEST_KEY_BYTES: [u8; 32] = [0u8; 32];
-const DEFAULT_BLOB_RATE_PER_BLOCK: u64 = 1;
-const DEFAULT_CHANNEL_RATE_PER_BLOCK: u64 = 1;
 const BLOB_CHUNK_OPTIONS: &[usize] = &[1, 2, 4, 8];
 const PUBLISH_RETRIES: usize = 5;
 const PUBLISH_RETRY_DELAY: Duration = Duration::from_secs(2);
@@ -44,11 +42,7 @@ pub struct Workload {
 impl Default for Workload {
     fn default() -> Self {
-        Self::with_rate(
-            NonZeroU64::new(DEFAULT_BLOB_RATE_PER_BLOCK).expect("non-zero"),
-            NonZeroU64::new(DEFAULT_CHANNEL_RATE_PER_BLOCK).expect("non-zero"),
-            DEFAULT_HEADROOM_PERCENT,
-        )
+        Self::with_rate(NonZeroU64::MIN, NonZeroU64::MIN, DEFAULT_HEADROOM_PERCENT)
     }
 }
@@ -139,7 +133,7 @@ async fn run_channel_flow(
     target_blobs: u64,
 ) -> Result<(), DynError> {
     tracing::debug!(channel_id = ?channel_id, "DA: submitting inscription tx");
-    let inscription_tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id));
+    let inscription_tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id)?);
     submit_transaction_via_cluster(ctx, Arc::clone(&inscription_tx)).await?;
 
     let mut receiver = ctx.block_feed().subscribe();
@@ -280,9 +274,7 @@ fn random_blob_payload() -> Vec<u8> {
     let mut rng = thread_rng();
     // KZGRS encoder expects the polynomial degree to be a power of two, which
     // effectively constrains the blob chunk count.
-    let chunks = *BLOB_CHUNK_OPTIONS
-        .choose(&mut rng)
-        .expect("non-empty chunk options");
+    let chunks = BLOB_CHUNK_OPTIONS.choose(&mut rng).copied().unwrap_or(1);
     let mut data = vec![0u8; 31 * chunks];
     rng.fill_bytes(&mut data);
     data

View File

@@ -157,7 +157,7 @@ impl Workload {
 impl Default for Workload {
     fn default() -> Self {
-        Self::new(NonZeroU64::new(1).expect("non-zero"))
+        Self::new(NonZeroU64::MIN)
     }
 }
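NonZeroU64::MIN is an associated constant equal to one, so the defaults above no longer need the NonZeroU64::new(1).expect("non-zero") construction at runtime. A quick check of the equivalence:

use std::num::NonZeroU64;

fn main() {
    // The constant replaces the fallible constructor + expect for the value 1.
    assert_eq!(Some(NonZeroU64::MIN), NonZeroU64::new(1));
    assert_eq!(NonZeroU64::MIN.get(), 1);
}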