This commit is contained in:
Petar Radovic 2026-06-15 12:28:05 +02:00
parent 13f15530e4
commit c2f93aff59

View File

@ -19,6 +19,11 @@ use tokio::{sync::mpsc, task::JoinHandle};
use crate::config::BedrockConfig;
/// Channel capacity for the publish inbox. One publish per produced block, drained
/// in microseconds by the drive task — 32 is huge headroom and just provides
/// backpressure if the drive task stalls (reconnect, long backfill).
const PUBLISH_INBOX_CAPACITY: usize = 32;
/// Sink for `Event::Published` checkpoints emitted by the drive task.
/// Caller is responsible for persistence (e.g. writing to rocksdb).
pub type CheckpointSink = Box<dyn Fn(SequencerCheckpoint) + Send + 'static>;
@ -32,11 +37,6 @@ pub type FinalizedBlockSink = Box<dyn Fn(u64) + Send + 'static>;
pub type OnDepositEventSink =
Box<dyn Fn(DepositInfo) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
/// Channel capacity for the publish inbox. One publish per produced block, drained
/// in microseconds by the drive task — 32 is huge headroom and just provides
/// backpressure if the drive task stalls (reconnect, long backfill).
const PUBLISH_INBOX_CAPACITY: usize = 32;
#[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")]
pub trait BlockPublisherTrait: Clone {
async fn new(
@ -111,40 +111,44 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
let drive_task = tokio::spawn(async move {
loop {
tokio::select! {
// Drain external publish requests by calling the borrowing
// handle — `&mut sequencer` is only available here.
Some(data) = publish_rx.recv() => {
if let Err(e) = sequencer.handle().publish(data) {
warn!("zone-sdk publish failed: {e:?}");
#[allow(clippy::integer_division_remainder_used)]
{
tokio::select! {
// Drain external publish requests by calling the
// borrowing handle — `&mut sequencer` is only
// available here.
Some(data) = publish_rx.recv() => {
if let Err(e) = sequencer.handle().publish(data) {
warn!("zone-sdk publish failed: {e:?}");
}
}
}
event = sequencer.next_event() => {
let Some(event) = event else { continue };
match event {
Event::BlocksProcessed {
checkpoint,
channel_update: _,
finalized,
} => {
on_checkpoint(checkpoint);
for op in finalized.into_iter().flat_map(|item| item.ops) {
match op {
FinalizedOp::Inscription(inscription) => {
if let Some(block_id) =
block_id_from_inscription(&inscription)
{
on_finalized_block(block_id);
event = sequencer.next_event() => {
let Some(event) = event else { continue };
match event {
Event::BlocksProcessed {
checkpoint,
channel_update: _,
finalized,
} => {
on_checkpoint(checkpoint);
for op in finalized.into_iter().flat_map(|item| item.ops) {
match op {
FinalizedOp::Inscription(inscription) => {
if let Some(block_id) =
block_id_from_inscription(&inscription)
{
on_finalized_block(block_id);
}
}
FinalizedOp::Deposit(deposit) => {
on_deposit_event(deposit).await;
}
FinalizedOp::Withdraw(_) => {}
}
FinalizedOp::Deposit(deposit) => {
on_deposit_event(deposit).await;
}
FinalizedOp::Withdraw(_) => {}
}
}
Event::Ready | Event::TurnNotification { .. } => {}
}
Event::Ready | Event::TurnNotification { .. } => {}
}
}
}
@ -173,7 +177,7 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
self.publish_tx
.send(data_bounded)
.await
.map_err(|_| anyhow!("Drive task is no longer running"))?;
.map_err(|_closed| anyhow!("Drive task is no longer running"))?;
Ok(())
}