diff --git a/lez/sequencer/core/src/block_publisher.rs b/lez/sequencer/core/src/block_publisher.rs index 6c1eb2bf..48b481fa 100644 --- a/lez/sequencer/core/src/block_publisher.rs +++ b/lez/sequencer/core/src/block_publisher.rs @@ -19,6 +19,11 @@ use tokio::{sync::mpsc, task::JoinHandle}; use crate::config::BedrockConfig; +/// Channel capacity for the publish inbox. One publish per produced block, drained +/// in microseconds by the drive task — 32 is huge headroom and just provides +/// backpressure if the drive task stalls (reconnect, long backfill). +const PUBLISH_INBOX_CAPACITY: usize = 32; + /// Sink for `Event::Published` checkpoints emitted by the drive task. /// Caller is responsible for persistence (e.g. writing to rocksdb). pub type CheckpointSink = Box; @@ -32,11 +37,6 @@ pub type FinalizedBlockSink = Box; pub type OnDepositEventSink = Box Pin + Send>> + Send + 'static>; -/// Channel capacity for the publish inbox. One publish per produced block, drained -/// in microseconds by the drive task — 32 is huge headroom and just provides -/// backpressure if the drive task stalls (reconnect, long backfill). -const PUBLISH_INBOX_CAPACITY: usize = 32; - #[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")] pub trait BlockPublisherTrait: Clone { async fn new( @@ -111,40 +111,44 @@ impl BlockPublisherTrait for ZoneSdkPublisher { let drive_task = tokio::spawn(async move { loop { - tokio::select! { - // Drain external publish requests by calling the borrowing - // handle — `&mut sequencer` is only available here. - Some(data) = publish_rx.recv() => { - if let Err(e) = sequencer.handle().publish(data) { - warn!("zone-sdk publish failed: {e:?}"); + #[allow(clippy::integer_division_remainder_used)] + { + tokio::select! { + // Drain external publish requests by calling the + // borrowing handle — `&mut sequencer` is only + // available here. + Some(data) = publish_rx.recv() => { + if let Err(e) = sequencer.handle().publish(data) { + warn!("zone-sdk publish failed: {e:?}"); + } } - } - event = sequencer.next_event() => { - let Some(event) = event else { continue }; - match event { - Event::BlocksProcessed { - checkpoint, - channel_update: _, - finalized, - } => { - on_checkpoint(checkpoint); - for op in finalized.into_iter().flat_map(|item| item.ops) { - match op { - FinalizedOp::Inscription(inscription) => { - if let Some(block_id) = - block_id_from_inscription(&inscription) - { - on_finalized_block(block_id); + event = sequencer.next_event() => { + let Some(event) = event else { continue }; + match event { + Event::BlocksProcessed { + checkpoint, + channel_update: _, + finalized, + } => { + on_checkpoint(checkpoint); + for op in finalized.into_iter().flat_map(|item| item.ops) { + match op { + FinalizedOp::Inscription(inscription) => { + if let Some(block_id) = + block_id_from_inscription(&inscription) + { + on_finalized_block(block_id); + } } + FinalizedOp::Deposit(deposit) => { + on_deposit_event(deposit).await; + } + FinalizedOp::Withdraw(_) => {} } - FinalizedOp::Deposit(deposit) => { - on_deposit_event(deposit).await; - } - FinalizedOp::Withdraw(_) => {} } } + Event::Ready | Event::TurnNotification { .. } => {} } - Event::Ready | Event::TurnNotification { .. } => {} } } } @@ -173,7 +177,7 @@ impl BlockPublisherTrait for ZoneSdkPublisher { self.publish_tx .send(data_bounded) .await - .map_err(|_| anyhow!("Drive task is no longer running"))?; + .map_err(|_closed| anyhow!("Drive task is no longer running"))?; Ok(()) }