From 77cfff5256905ad96b445a4679611c562a1f9f42 Mon Sep 17 00:00:00 2001 From: moudyellaz Date: Tue, 23 Jun 2026 10:17:46 +0200 Subject: [PATCH] feat(cross-zone): add inbox watcher and seed inbox config at genesis --- Cargo.lock | 2 + integration_tests/tests/two_zone.rs | 4 +- lee/state_machine/src/state.rs | 11 + lez/sequencer/core/Cargo.toml | 2 + lez/sequencer/core/src/config.rs | 20 ++ lez/sequencer/core/src/cross_zone_watcher.rs | 216 +++++++++++++++++++ lez/sequencer/core/src/lib.rs | 22 ++ test_fixtures/src/config.rs | 4 +- test_fixtures/src/lib.rs | 1 + test_fixtures/src/setup.rs | 2 + 10 files changed, 281 insertions(+), 3 deletions(-) create mode 100644 lez/sequencer/core/src/cross_zone_watcher.rs diff --git a/Cargo.lock b/Cargo.lock index d0395448..e09f6a21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4001,7 +4001,9 @@ dependencies = [ "logos-blockchain-key-management-system-service", "logos-blockchain-zone-sdk", "num-bigint 0.4.6", + "ping_core", "reqwest", + "risc0-zkvm", "sequencer_core", "sequencer_service_rpc", "serde_json", diff --git a/integration_tests/tests/two_zone.rs b/integration_tests/tests/two_zone.rs index 1b1ddefc..90f994cb 100644 --- a/integration_tests/tests/two_zone.rs +++ b/integration_tests/tests/two_zone.rs @@ -36,13 +36,13 @@ async fn two_zones_share_one_bedrock_and_both_advance() -> Result<()> { let channel_b = config::bedrock_channel_id_b(); // Empty genesis is enough: the clock transaction drives block production. - let (seq_a, _seq_a_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_a) + let (seq_a, _seq_a_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_a, None) .await .context("Failed to set up zone A sequencer")?; let (idx_a, _idx_a_home) = setup_indexer(bedrock_addr, channel_a) .await .context("Failed to set up zone A indexer")?; - let (seq_b, _seq_b_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_b) + let (seq_b, _seq_b_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_b, None) .await .context("Failed to set up zone B sequencer")?; let (idx_b, _idx_b_home) = setup_indexer(bedrock_addr, channel_b) diff --git a/lee/state_machine/src/state.rs b/lee/state_machine/src/state.rs index 949c9920..cbcbad53 100644 --- a/lee/state_machine/src/state.rs +++ b/lee/state_machine/src/state.rs @@ -199,6 +199,8 @@ impl V03State { this.insert_program(Program::bridge()); this.insert_program(Program::cross_zone_outbox()); this.insert_program(Program::cross_zone_inbox()); + this.insert_program(Program::ping_sender()); + this.insert_program(Program::ping_receiver()); this } @@ -374,6 +376,13 @@ impl V03State { }, ); } + + /// Inserts an account directly into genesis state, bypassing execution. + /// Genesis-only: used to seed configuration accounts that are not produced by + /// any transaction. Must never be reachable from transaction processing. + pub fn insert_genesis_account(&mut self, account_id: AccountId, account: Account) { + self.public_state.insert(account_id, account); + } } #[cfg(any(test, feature = "test-utils"))] @@ -716,6 +725,8 @@ pub mod tests { Program::cross_zone_outbox(), ); this.insert(Program::cross_zone_inbox().id(), Program::cross_zone_inbox()); + this.insert(Program::ping_sender().id(), Program::ping_sender()); + this.insert(Program::ping_receiver().id(), Program::ping_receiver()); this }; diff --git a/lez/sequencer/core/Cargo.toml b/lez/sequencer/core/Cargo.toml index 9201d8b3..09fef127 100644 --- a/lez/sequencer/core/Cargo.toml +++ b/lez/sequencer/core/Cargo.toml @@ -18,6 +18,8 @@ testnet_initial_state.workspace = true faucet_core.workspace = true bridge_core.workspace = true vault_core.workspace = true +cross_zone_inbox_core = { workspace = true, features = ["host"] } +ping_core.workspace = true logos-blockchain-key-management-system-service.workspace = true logos-blockchain-core.workspace = true diff --git a/lez/sequencer/core/src/config.rs b/lez/sequencer/core/src/config.rs index b445bcd5..8e299152 100644 --- a/lez/sequencer/core/src/config.rs +++ b/lez/sequencer/core/src/config.rs @@ -10,6 +10,7 @@ use bytesize::ByteSize; use common::config::BasicAuth; use humantime_serde; use lee::AccountId; +use lee_core::program::ProgramId; use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; use url::Url; @@ -27,6 +28,22 @@ pub enum GenesisAction { }, } +/// A peer zone whose outbox this zone watches for inbound cross-zone messages. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CrossZonePeer { + /// The peer's Bedrock channel; its 32 bytes double as the peer's zone id. + pub channel_id: [u8; 32], + /// Programs on this zone a message from this peer is allowed to target. + pub allowed_targets: Vec, +} + +/// Cross-zone watcher configuration: the peers this zone reads from Bedrock and, +/// per peer, the local programs they may deliver to. `None` disables the watcher. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CrossZoneConfig { + pub peers: Vec, +} + // TODO: Provide default values #[derive(Clone, Serialize, Deserialize)] pub struct SequencerConfig { @@ -53,6 +70,9 @@ pub struct SequencerConfig { /// Genesis configuration. #[serde(default)] pub genesis: Vec, + /// Cross-zone messaging configuration. `None` disables the watcher. + #[serde(default)] + pub cross_zone: Option, } #[derive(Clone, Serialize, Deserialize)] diff --git a/lez/sequencer/core/src/cross_zone_watcher.rs b/lez/sequencer/core/src/cross_zone_watcher.rs new file mode 100644 index 00000000..a81967b4 --- /dev/null +++ b/lez/sequencer/core/src/cross_zone_watcher.rs @@ -0,0 +1,216 @@ +use std::{collections::BTreeMap, time::Duration}; + +use common::{block::Block, transaction::LeeTransaction}; +use cross_zone_inbox_core::{ + CrossZoneMessage, InboxConfig, build_inbox_dispatch_tx, inbox_config_account_id, +}; +use futures::StreamExt as _; +use lee::{AccountId, program::Program}; +use lee_core::{account::Account, program::ProgramId}; +use log::{error, info, warn}; +use logos_blockchain_core::mantle::ops::channel::ChannelId; +use logos_blockchain_zone_sdk::{ + CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, +}; +use mempool::MemPoolHandle; +use ping_core::SenderInstruction; + +use crate::{ + TransactionOrigin, + config::{BedrockConfig, CrossZoneConfig}, +}; + +/// The inbox config account this zone seeds at startup so the inbox guest can +/// authorize inbound peer messages. The config is zone-specific (self zone plus +/// per-peer target allowlists), so it cannot live in the shared genesis state. +#[must_use] +pub fn inbox_config_account(self_zone: [u8; 32], cross_zone: &CrossZoneConfig) -> (AccountId, Account) { + let inbox_id = Program::cross_zone_inbox().id(); + + let mut allowed_targets = BTreeMap::new(); + for peer in &cross_zone.peers { + allowed_targets.insert(peer.channel_id, peer.allowed_targets.clone()); + } + let config = InboxConfig { + self_zone, + allowed_peers: BTreeMap::new(), + allowed_targets, + }; + + let account = Account { + program_owner: inbox_id, + balance: 0, + data: config + .to_bytes() + .try_into() + .expect("inbox config fits in account data"), + nonce: 0_u128.into(), + }; + (inbox_config_account_id(inbox_id), account) +} + +/// Spawns one watcher task per configured peer. Each task reads the peer's +/// finalized blocks from Bedrock, recognizes outbound messages addressed to this +/// zone, and injects the matching inbox dispatch as a sequencer-origin +/// transaction into the local mempool. +pub fn spawn_watchers( + bedrock_config: &BedrockConfig, + cross_zone: &CrossZoneConfig, + poll_interval: Duration, + mempool_handle: MemPoolHandle<(TransactionOrigin, LeeTransaction)>, +) { + let self_zone: [u8; 32] = *bedrock_config.channel_id.as_ref(); + let inbox_id = Program::cross_zone_inbox().id(); + let emitter_id = Program::ping_sender().id(); + + for peer in cross_zone.peers.clone() { + let node = NodeHttpClient::new( + CommonHttpClient::new(bedrock_config.auth.clone().map(Into::into)), + bedrock_config.node_url.clone(), + ); + tokio::spawn(watch_peer( + ZoneIndexer::new(ChannelId::from(peer.channel_id), node), + peer.channel_id, + peer.allowed_targets, + self_zone, + inbox_id, + emitter_id, + poll_interval, + mempool_handle.clone(), + )); + } +} + +#[expect( + clippy::too_many_arguments, + reason = "Each parameter is an independent piece of per-peer watcher state" +)] +async fn watch_peer( + zone_indexer: ZoneIndexer, + peer_zone: [u8; 32], + allowed_targets: Vec, + self_zone: [u8; 32], + inbox_id: ProgramId, + emitter_id: ProgramId, + poll_interval: Duration, + mempool_handle: MemPoolHandle<(TransactionOrigin, LeeTransaction)>, +) { + info!("Cross-zone watcher started for peer {}", hex::encode(peer_zone)); + + let mut cursor = None; + loop { + let stream = match zone_indexer.next_messages(cursor).await { + Ok(stream) => stream, + Err(err) => { + error!( + "Watcher next_messages failed for peer {}: {err}", + hex::encode(peer_zone) + ); + tokio::time::sleep(poll_interval).await; + continue; + } + }; + let mut stream = std::pin::pin!(stream); + + while let Some((msg, slot)) = stream.next().await { + let zone_block = match msg { + ZoneMessage::Block(block) => block, + ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue, + }; + match borsh::from_slice::(&zone_block.data) { + Ok(block) => { + deliver_block( + &block, + peer_zone, + self_zone, + inbox_id, + emitter_id, + &allowed_targets, + &mempool_handle, + ) + .await; + } + Err(err) => error!("Watcher failed to deserialize peer block: {err}"), + } + cursor = Some(slot); + } + + // Stream ended (caught up to the peer's last finalized block); poll again. + tokio::time::sleep(poll_interval).await; + } +} + +/// Scans one peer block for outbound messages and injects a dispatch per match. +/// +/// Option A (M3): the watcher recognizes the demo emitter and reads the outbound +/// message straight off its instruction. M4 replaces this with re-derivation +/// from the outbox PDA write, which removes the emitter-specific decoding. +async fn deliver_block( + block: &Block, + peer_zone: [u8; 32], + self_zone: [u8; 32], + inbox_id: ProgramId, + emitter_id: ProgramId, + allowed_targets: &[ProgramId], + mempool_handle: &MemPoolHandle<(TransactionOrigin, LeeTransaction)>, +) { + for (index, tx) in block.body.transactions.iter().enumerate() { + let LeeTransaction::Public(public_tx) = tx else { + continue; + }; + let message = public_tx.message(); + if message.program_id != emitter_id { + continue; + } + + let SenderInstruction::Send { + target_zone, + target_program_id, + target_accounts, + payload, + .. + } = match risc0_zkvm::serde::from_slice(&message.instruction_data) { + Ok(send) => send, + Err(err) => { + warn!("Watcher could not decode emitter instruction: {err}"); + continue; + } + }; + + if target_zone != self_zone { + continue; + } + if !allowed_targets.contains(&target_program_id) { + warn!( + "Watcher dropping message to disallowed target from peer {}", + hex::encode(peer_zone) + ); + continue; + } + + let cross_zone_message = CrossZoneMessage { + src_zone: peer_zone, + src_block_id: block.header.block_id, + src_tx_index: u32::try_from(index).unwrap_or(u32::MAX), + src_program_id: emitter_id, + target_program_id, + payload, + l1_inclusion_witness: None, + }; + let target_ids: Vec = target_accounts.into_iter().map(AccountId::new).collect(); + let dispatch = build_inbox_dispatch_tx(inbox_id, &cross_zone_message, target_ids); + + match mempool_handle + .push((TransactionOrigin::Sequencer, LeeTransaction::Public(dispatch))) + .await + { + Ok(()) => info!( + "Watcher injected cross-zone dispatch from peer {} block {} tx {}", + hex::encode(peer_zone), + block.header.block_id, + index + ), + Err(err) => error!("Watcher failed to enqueue inbox dispatch: {err}"), + } + } +} diff --git a/lez/sequencer/core/src/lib.rs b/lez/sequencer/core/src/lib.rs index 651951d2..fec6f57d 100644 --- a/lez/sequencer/core/src/lib.rs +++ b/lez/sequencer/core/src/lib.rs @@ -31,6 +31,7 @@ use crate::{ pub mod block_publisher; pub mod block_store; pub mod config; +pub mod cross_zone_watcher; #[cfg(feature = "mock")] pub mod mock; @@ -158,6 +159,17 @@ impl SequencerCore { .expect("Failed to publish genesis block"); } + // Cross-zone messaging: start a watcher per configured peer. The inbox + // config account is seeded into genesis state in `build_genesis_state`. + if let Some(cross_zone) = &config.cross_zone { + cross_zone_watcher::spawn_watchers( + &config.bedrock_config, + cross_zone, + config.block_create_timeout, + mempool_handle.clone(), + ); + } + let sequencer_core = Self { state, store, @@ -609,6 +621,15 @@ fn build_genesis_state(config: &SequencerConfig) -> (lee::V03State, Vec, channel_id: ChannelId, + cross_zone: Option, ) -> Result { let SequencerPartialConfig { max_num_tx_in_block, @@ -91,6 +92,7 @@ pub fn sequencer_config( .context("Failed to convert bedrock addr to URL")?, auth: None, }, + cross_zone, }) } diff --git a/test_fixtures/src/lib.rs b/test_fixtures/src/lib.rs index 8ca97147..46ac0668 100644 --- a/test_fixtures/src/lib.rs +++ b/test_fixtures/src/lib.rs @@ -344,6 +344,7 @@ impl TestContextBuilder { bedrock_addr, genesis, config::bedrock_channel_id(), + None, ) .await .context("Failed to setup Sequencer")?; diff --git a/test_fixtures/src/setup.rs b/test_fixtures/src/setup.rs index 85652e2f..df26141a 100644 --- a/test_fixtures/src/setup.rs +++ b/test_fixtures/src/setup.rs @@ -116,6 +116,7 @@ pub async fn setup_sequencer( bedrock_addr: SocketAddr, genesis_transactions: Vec, channel_id: ChannelId, + cross_zone: Option, ) -> Result<(SequencerHandle, TempDir)> { let temp_sequencer_dir = tempfile::tempdir().context("Failed to create temp dir for sequencer home")?; @@ -131,6 +132,7 @@ pub async fn setup_sequencer( bedrock_addr, genesis_transactions, channel_id, + cross_zone, ) .context("Failed to create Sequencer config")?;