From 0ddb7a92912aa8ecd0b5e9eef4c7ae6b54a8c345 Mon Sep 17 00:00:00 2001 From: moudyellaz Date: Tue, 23 Jun 2026 14:21:16 +0200 Subject: [PATCH] feat(cross-zone): add Option B indexer verifier and seed inbox config consistently --- Cargo.lock | 4 + .../tests/indexer_ffi_helpers/mod.rs | 1 + integration_tests/tests/two_zone.rs | 4 +- lez/indexer/core/Cargo.toml | 4 + lez/indexer/core/src/block_store.rs | 15 +- lez/indexer/core/src/config.rs | 4 + lez/indexer/core/src/cross_zone_verifier.rs | 369 ++++++++++++++++++ lez/indexer/core/src/lib.rs | 33 +- lez/sequencer/core/src/config.rs | 17 +- lez/sequencer/core/src/cross_zone_watcher.rs | 56 +-- lez/sequencer/core/src/lib.rs | 2 +- programs/cross_zone_inbox/core/src/lib.rs | 78 ++++ test_fixtures/src/config.rs | 2 + test_fixtures/src/lib.rs | 2 +- test_fixtures/src/setup.rs | 11 +- 15 files changed, 528 insertions(+), 74 deletions(-) create mode 100644 lez/indexer/core/src/cross_zone_verifier.rs diff --git a/Cargo.lock b/Cargo.lock index ca99945c..ebe1637b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3824,13 +3824,17 @@ dependencies = [ "authenticated_transfer_core", "borsh", "common", + "cross_zone_inbox_core", "futures", + "hex", "humantime-serde", "lee", "lee_core", "log", "logos-blockchain-core", "logos-blockchain-zone-sdk", + "ping_core", + "risc0-zkvm", "serde", "serde_json", "storage", diff --git a/integration_tests/tests/indexer_ffi_helpers/mod.rs b/integration_tests/tests/indexer_ffi_helpers/mod.rs index fe8f8989..51320193 100644 --- a/integration_tests/tests/indexer_ffi_helpers/mod.rs +++ b/integration_tests/tests/indexer_ffi_helpers/mod.rs @@ -61,6 +61,7 @@ pub fn setup_indexer_ffi( bedrock_addr, temp_indexer_dir.path().to_owned(), integration_tests::config::bedrock_channel_id(), + None, ) .context("Failed to create Indexer config")?; diff --git a/integration_tests/tests/two_zone.rs b/integration_tests/tests/two_zone.rs index 90f994cb..4770a9d7 100644 --- a/integration_tests/tests/two_zone.rs +++ b/integration_tests/tests/two_zone.rs @@ -39,13 +39,13 @@ async fn two_zones_share_one_bedrock_and_both_advance() -> Result<()> { let (seq_a, _seq_a_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_a, None) .await .context("Failed to set up zone A sequencer")?; - let (idx_a, _idx_a_home) = setup_indexer(bedrock_addr, channel_a) + let (idx_a, _idx_a_home) = setup_indexer(bedrock_addr, channel_a, None) .await .context("Failed to set up zone A indexer")?; let (seq_b, _seq_b_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_b, None) .await .context("Failed to set up zone B sequencer")?; - let (idx_b, _idx_b_home) = setup_indexer(bedrock_addr, channel_b) + let (idx_b, _idx_b_home) = setup_indexer(bedrock_addr, channel_b, None) .await .context("Failed to set up zone B indexer")?; diff --git a/lez/indexer/core/Cargo.toml b/lez/indexer/core/Cargo.toml index c6cc5fc6..249e6e49 100644 --- a/lez/indexer/core/Cargo.toml +++ b/lez/indexer/core/Cargo.toml @@ -12,6 +12,8 @@ common.workspace = true logos-blockchain-zone-sdk.workspace = true lee.workspace = true lee_core.workspace = true +cross_zone_inbox_core = { workspace = true, features = ["host"] } +ping_core.workspace = true storage.workspace = true testnet_initial_state.workspace = true @@ -26,6 +28,8 @@ logos-blockchain-core.workspace = true serde_json.workspace = true async-stream.workspace = true tokio.workspace = true +risc0-zkvm.workspace = true +hex.workspace = true [dev-dependencies] tempfile.workspace = true diff --git a/lez/indexer/core/src/block_store.rs b/lez/indexer/core/src/block_store.rs index f00c94c5..6b7c5c83 100644 --- a/lez/indexer/core/src/block_store.rs +++ b/lez/indexer/core/src/block_store.rs @@ -22,8 +22,13 @@ pub struct IndexerStore { impl IndexerStore { /// Starting database at the start of new chain. /// Creates files if necessary. - pub fn open_db(location: &Path) -> Result { - let initial_state = testnet_initial_state::initial_state(); + pub fn open_db(location: &Path, genesis_seed: Option<(AccountId, Account)>) -> Result { + let mut initial_state = testnet_initial_state::initial_state(); + // Seed any zone-specific genesis accounts (e.g. the cross-zone inbox + // config) so the indexer's replayed state matches the sequencer's. + if let Some((account_id, account)) = genesis_seed { + initial_state.insert_genesis_account(account_id, account); + } let dbio = RocksDBIO::open_or_create(location, &initial_state)?; let current_state = dbio.final_state()?; @@ -215,7 +220,7 @@ mod tests { fn correct_startup() { let home = tempdir().unwrap(); - let storage = IndexerStore::open_db(home.as_ref()).unwrap(); + let storage = IndexerStore::open_db(home.as_ref(), None).unwrap(); let final_id = storage.get_last_block_id().unwrap(); @@ -226,7 +231,7 @@ mod tests { async fn state_transition() { let home = tempdir().unwrap(); - let storage = IndexerStore::open_db(home.as_ref()).unwrap(); + let storage = IndexerStore::open_db(home.as_ref(), None).unwrap(); let initial_accounts = initial_pub_accounts_private_keys(); let from = initial_accounts[0].account_id; @@ -278,7 +283,7 @@ mod tests { async fn account_state_at_block() { let home = tempdir().unwrap(); - let storage = IndexerStore::open_db(home.as_ref()).unwrap(); + let storage = IndexerStore::open_db(home.as_ref(), None).unwrap(); let mut prev_hash = None; diff --git a/lez/indexer/core/src/config.rs b/lez/indexer/core/src/config.rs index 6a019828..bcf4a31a 100644 --- a/lez/indexer/core/src/config.rs +++ b/lez/indexer/core/src/config.rs @@ -7,6 +7,7 @@ use std::{ use anyhow::{Context as _, Result}; use common::config::BasicAuth; +use cross_zone_inbox_core::CrossZoneConfig; use humantime_serde; pub use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; @@ -27,6 +28,9 @@ pub struct IndexerConfig { pub consensus_info_polling_interval: Duration, pub bedrock_config: ClientConfig, pub channel_id: ChannelId, + /// Cross-zone configuration. `None` disables the indexer's cross-zone handling. + #[serde(default)] + pub cross_zone: Option, } impl IndexerConfig { diff --git a/lez/indexer/core/src/cross_zone_verifier.rs b/lez/indexer/core/src/cross_zone_verifier.rs new file mode 100644 index 00000000..6514ed87 --- /dev/null +++ b/lez/indexer/core/src/cross_zone_verifier.rs @@ -0,0 +1,369 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; + +use anyhow::{Context as _, Result, bail}; +use common::{block::Block, transaction::LeeTransaction}; +use cross_zone_inbox_core::{ + CrossZoneMessage, Instruction as InboxInstruction, MessageKey, ZoneId, + build_dispatch_from_emission, message_key, +}; +use futures::StreamExt as _; +use lee::program::Program; +use lee_core::program::ProgramId; +use log::{error, info}; +use logos_blockchain_core::mantle::ops::channel::ChannelId; +use logos_blockchain_zone_sdk::{ + CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, +}; +use ping_core::SenderInstruction; +use tokio::sync::RwLock; + +use crate::config::IndexerConfig; + +/// How long the verifier waits for a referenced peer block to finalize before +/// rejecting the dispatch as referencing a nonexistent block. +const PEER_BLOCK_WAIT: Duration = Duration::from_secs(60); + +/// Cache of finalized peer-zone blocks, filled by per-peer reader tasks and read +/// by the verifier to re-derive cross-zone dispatch transactions. +#[derive(Clone, Default)] +struct PeerBlocks { + chains: Arc>>>, +} + +impl PeerBlocks { + async fn insert(&self, zone: ZoneId, block: Block) { + self.chains + .write() + .await + .entry(zone) + .or_default() + .insert(block.header.block_id, block); + } + + async fn get(&self, zone: ZoneId, block_id: u64) -> Option { + self.chains + .read() + .await + .get(&zone) + .and_then(|chain| chain.get(&block_id).cloned()) + } +} + +/// The indexer-side Option B verifier. For every cross-zone dispatch in a block +/// it re-derives the transaction from the peer's finalized block and rejects it +/// if the bytes differ (a forgery) or the message was already delivered (a +/// replay), so delivery no longer relies on trusting the sequencer. +#[derive(Clone)] +pub struct CrossZoneVerifier { + self_zone: ZoneId, + inbox_id: ProgramId, + emitter_id: ProgramId, + peers: PeerBlocks, + seen: Arc>>, +} + +impl CrossZoneVerifier { + /// Builds the verifier and spawns one peer reader per configured peer. + /// Returns `None` when cross-zone messaging is disabled. + pub fn start(config: &IndexerConfig) -> Option { + let cross_zone = config.cross_zone.as_ref()?; + let self_zone: ZoneId = *config.channel_id.as_ref(); + let peers = PeerBlocks::default(); + + for peer in &cross_zone.peers { + let node = NodeHttpClient::new( + CommonHttpClient::new(config.bedrock_config.auth.clone().map(Into::into)), + config.bedrock_config.addr.clone(), + ); + tokio::spawn(read_peer( + ZoneIndexer::new(ChannelId::from(peer.channel_id), node), + peer.channel_id, + peers.clone(), + config.consensus_info_polling_interval, + )); + } + + Some(Self { + self_zone, + inbox_id: Program::cross_zone_inbox().id(), + emitter_id: Program::ping_sender().id(), + peers, + seen: Arc::new(RwLock::new(HashSet::new())), + }) + } + + /// Verifies every cross-zone dispatch in a block, returning `Err` on the + /// first forged or replayed dispatch. The caller halts ingestion on error. + pub async fn verify_block(&self, block: &Block) -> Result<()> { + for tx in &block.body.transactions { + let Some(msg) = self.decode_dispatch(tx) else { + continue; + }; + + let key = message_key(&msg.src_zone, msg.src_block_id, msg.src_tx_index); + if self.seen.read().await.contains(&key) { + bail!("cross-zone replay: message {} re-delivered", hex::encode(key)); + } + + let expected = self.rederive(&msg).await?; + if LeeTransaction::Public(expected) != *tx { + bail!( + "forged cross-zone dispatch from zone {} block {} tx {}: re-derivation mismatch", + hex::encode(msg.src_zone), + msg.src_block_id, + msg.src_tx_index + ); + } + + self.seen.write().await.insert(key); + info!( + "Verified cross-zone dispatch from zone {} block {} tx {}", + hex::encode(msg.src_zone), + msg.src_block_id, + msg.src_tx_index + ); + } + Ok(()) + } + + /// Decodes a transaction into the cross-zone message it dispatches, or `None` + /// if it is not an inbox dispatch. + fn decode_dispatch(&self, tx: &LeeTransaction) -> Option { + let LeeTransaction::Public(public_tx) = tx else { + return None; + }; + if public_tx.message().program_id != self.inbox_id { + return None; + } + match risc0_zkvm::serde::from_slice::( + &public_tx.message().instruction_data, + ) { + Ok(InboxInstruction::Dispatch(msg)) => Some(msg), + Err(_) => None, + } + } + + /// Re-derives the dispatch transaction the watcher should have injected for + /// `msg`, reading the source emission from the peer's finalized block. + async fn rederive(&self, msg: &CrossZoneMessage) -> Result { + let peer_block = self + .wait_for_peer_block(msg.src_zone, msg.src_block_id) + .await + .with_context(|| { + format!( + "no peer block {} from zone {} to verify against", + msg.src_block_id, + hex::encode(msg.src_zone) + ) + })?; + + let emission = peer_block + .body + .transactions + .get(msg.src_tx_index as usize) + .ok_or_else(|| { + anyhow::anyhow!("src_tx_index {} out of range in peer block", msg.src_tx_index) + })?; + + let LeeTransaction::Public(emission) = emission else { + bail!("peer emission transaction is not public"); + }; + if emission.message().program_id != self.emitter_id { + bail!("peer transaction at src_tx_index is not an emitter transaction"); + } + + let SenderInstruction::Send { + target_zone, + target_program_id, + target_accounts, + payload, + .. + } = risc0_zkvm::serde::from_slice(&emission.message().instruction_data) + .context("decode peer emission instruction")?; + + if target_zone != self.self_zone { + bail!("peer emission targets a different zone"); + } + + Ok(build_dispatch_from_emission( + self.inbox_id, + msg.src_zone, + msg.src_block_id, + msg.src_tx_index, + self.emitter_id, + target_program_id, + &target_accounts, + payload, + )) + } + + /// Polls the peer cache until the referenced block finalizes. A forged + /// reference to a never-finalized block times out and is rejected. + async fn wait_for_peer_block(&self, zone: ZoneId, block_id: u64) -> Result { + let mut waited = Duration::ZERO; + loop { + if let Some(block) = self.peers.get(zone, block_id).await { + return Ok(block); + } + if waited >= PEER_BLOCK_WAIT { + bail!( + "peer block {} from zone {} did not finalize within {:?}", + block_id, + hex::encode(zone), + PEER_BLOCK_WAIT + ); + } + tokio::time::sleep(Duration::from_secs(1)).await; + waited += Duration::from_secs(1); + } + } +} + +/// Reads a peer zone's finalized blocks from Bedrock into the shared cache. +async fn read_peer( + zone_indexer: ZoneIndexer, + peer_zone: ZoneId, + peers: PeerBlocks, + poll_interval: Duration, +) { + info!("Cross-zone peer reader started for {}", hex::encode(peer_zone)); + + let mut cursor = None; + loop { + let stream = match zone_indexer.next_messages(cursor).await { + Ok(stream) => stream, + Err(err) => { + error!( + "Peer reader next_messages failed for {}: {err}", + hex::encode(peer_zone) + ); + tokio::time::sleep(poll_interval).await; + continue; + } + }; + let mut stream = std::pin::pin!(stream); + + while let Some((msg, slot)) = stream.next().await { + if let ZoneMessage::Block(zone_block) = msg { + match borsh::from_slice::(&zone_block.data) { + Ok(block) => peers.insert(peer_zone, block).await, + Err(err) => error!("Peer reader failed to deserialize block: {err}"), + } + } + cursor = Some(slot); + } + + tokio::time::sleep(poll_interval).await; + } +} + +#[cfg(test)] +mod tests { + use common::test_utils::produce_dummy_block; + use lee::{ + PublicTransaction, + program::Program, + public_transaction::{Message, WitnessSet}, + }; + use ping_core::ping_record_pda; + + use super::*; + + const SELF_ZONE: ZoneId = [1; 32]; + const PEER_ZONE: ZoneId = [2; 32]; + const PEER_BLOCK_ID: u64 = 5; + + fn verifier() -> CrossZoneVerifier { + CrossZoneVerifier { + self_zone: SELF_ZONE, + inbox_id: Program::cross_zone_inbox().id(), + emitter_id: Program::ping_sender().id(), + peers: PeerBlocks::default(), + seen: Arc::new(RwLock::new(HashSet::new())), + } + } + + /// A ping_sender emission addressed to `SELF_ZONE` carrying `payload`. + fn emission(payload: &[u8]) -> LeeTransaction { + let receiver_id = Program::ping_receiver().id(); + let send = SenderInstruction::Send { + outbox_program_id: Program::cross_zone_outbox().id(), + target_zone: SELF_ZONE, + target_program_id: receiver_id, + target_accounts: vec![ping_record_pda(receiver_id).into_value()], + payload: payload.to_vec(), + ordinal: 0, + }; + let message = Message::try_new(Program::ping_sender().id(), vec![], vec![], send) + .expect("emission serializes"); + LeeTransaction::Public(PublicTransaction::new( + message, + WitnessSet::from_raw_parts(vec![]), + )) + } + + /// The dispatch a watcher would inject for a `PEER_BLOCK_ID` emission of `payload`. + fn dispatch(payload: &[u8]) -> LeeTransaction { + let receiver_id = Program::ping_receiver().id(); + LeeTransaction::Public(build_dispatch_from_emission( + Program::cross_zone_inbox().id(), + PEER_ZONE, + PEER_BLOCK_ID, + 0, + Program::ping_sender().id(), + receiver_id, + &[ping_record_pda(receiver_id).into_value()], + payload.to_vec(), + )) + } + + #[tokio::test] + async fn verifies_dispatch_matching_a_peer_emission() { + let verifier = verifier(); + verifier + .peers + .insert(PEER_ZONE, produce_dummy_block(PEER_BLOCK_ID, None, vec![emission(b"hi")])) + .await; + + let block = produce_dummy_block(9, None, vec![dispatch(b"hi")]); + verifier + .verify_block(&block) + .await + .expect("dispatch matching the peer emission verifies"); + } + + #[tokio::test] + async fn rejects_dispatch_with_no_matching_emission() { + let verifier = verifier(); + // The peer block carries the real emission, but the block claims a + // different payload, so re-derivation does not reproduce it. + verifier + .peers + .insert(PEER_ZONE, produce_dummy_block(PEER_BLOCK_ID, None, vec![emission(b"real")])) + .await; + + let block = produce_dummy_block(9, None, vec![dispatch(b"forged")]); + let err = verifier.verify_block(&block).await.unwrap_err(); + assert!(err.to_string().contains("forged"), "unexpected error: {err}"); + } + + #[tokio::test] + async fn rejects_replayed_dispatch() { + let verifier = verifier(); + verifier + .peers + .insert(PEER_ZONE, produce_dummy_block(PEER_BLOCK_ID, None, vec![emission(b"hi")])) + .await; + + let first = produce_dummy_block(9, None, vec![dispatch(b"hi")]); + verifier.verify_block(&first).await.expect("first delivery verifies"); + + let replay = produce_dummy_block(10, None, vec![dispatch(b"hi")]); + let err = verifier.verify_block(&replay).await.unwrap_err(); + assert!(err.to_string().contains("replay"), "unexpected error: {err}"); + } +} diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index b0416905..ee0e813d 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -10,16 +10,20 @@ use logos_blockchain_zone_sdk::{ CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, }; -use crate::{block_store::IndexerStore, config::IndexerConfig}; +use crate::{ + block_store::IndexerStore, config::IndexerConfig, cross_zone_verifier::CrossZoneVerifier, +}; pub mod block_store; pub mod config; +pub mod cross_zone_verifier; #[derive(Clone)] pub struct IndexerCore { pub zone_indexer: Arc>, pub config: IndexerConfig, pub store: IndexerStore, + verifier: Option, } impl IndexerCore { @@ -33,10 +37,22 @@ impl IndexerCore { ); let zone_indexer = ZoneIndexer::new(config.channel_id, node); + // Seed the inbox config so the indexer can replay cross-zone dispatch + // transactions, matching the account the sequencer seeds at genesis. + let inbox_config_seed = config.cross_zone.as_ref().map(|cross_zone| { + let self_zone: [u8; 32] = *config.channel_id.as_ref(); + cross_zone_inbox_core::build_inbox_config_account(self_zone, cross_zone) + }); + + // Option B verifier: re-derives each cross-zone dispatch from the peer's + // finalized blocks. `None` when cross-zone messaging is disabled. + let verifier = CrossZoneVerifier::start(&config); + Ok(Self { zone_indexer: Arc::new(zone_indexer), config, - store: IndexerStore::open_db(&home)?, + store: IndexerStore::open_db(&home, inbox_config_seed)?, + verifier, }) } @@ -91,6 +107,19 @@ impl IndexerCore { info!("Indexed L2 block {}", block.header.block_id); + // Option B: re-derive and verify every cross-zone dispatch + // before applying the block. A forged or replayed dispatch + // halts ingestion rather than persisting an invalid state. + if let Some(verifier) = &self.verifier { + if let Err(err) = verifier.verify_block(&block).await { + error!( + "Cross-zone verification failed for block {}: {err:#}. Halting indexer ingestion.", + block.header.block_id + ); + return; + } + } + // TODO: Remove l1_header placeholder once storage layer // no longer requires it. Zone-sdk handles L1 tracking internally. let placeholder_l1_header = HeaderId::from([0_u8; 32]); diff --git a/lez/sequencer/core/src/config.rs b/lez/sequencer/core/src/config.rs index 8e299152..ce686676 100644 --- a/lez/sequencer/core/src/config.rs +++ b/lez/sequencer/core/src/config.rs @@ -10,7 +10,6 @@ use bytesize::ByteSize; use common::config::BasicAuth; use humantime_serde; use lee::AccountId; -use lee_core::program::ProgramId; use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; use url::Url; @@ -28,21 +27,7 @@ pub enum GenesisAction { }, } -/// A peer zone whose outbox this zone watches for inbound cross-zone messages. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct CrossZonePeer { - /// The peer's Bedrock channel; its 32 bytes double as the peer's zone id. - pub channel_id: [u8; 32], - /// Programs on this zone a message from this peer is allowed to target. - pub allowed_targets: Vec, -} - -/// Cross-zone watcher configuration: the peers this zone reads from Bedrock and, -/// per peer, the local programs they may deliver to. `None` disables the watcher. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct CrossZoneConfig { - pub peers: Vec, -} +pub use cross_zone_inbox_core::{CrossZoneConfig, CrossZonePeer}; // TODO: Provide default values #[derive(Clone, Serialize, Deserialize)] diff --git a/lez/sequencer/core/src/cross_zone_watcher.rs b/lez/sequencer/core/src/cross_zone_watcher.rs index a81967b4..8a176f3b 100644 --- a/lez/sequencer/core/src/cross_zone_watcher.rs +++ b/lez/sequencer/core/src/cross_zone_watcher.rs @@ -1,12 +1,10 @@ -use std::{collections::BTreeMap, time::Duration}; +use std::time::Duration; use common::{block::Block, transaction::LeeTransaction}; -use cross_zone_inbox_core::{ - CrossZoneMessage, InboxConfig, build_inbox_dispatch_tx, inbox_config_account_id, -}; +use cross_zone_inbox_core::build_dispatch_from_emission; use futures::StreamExt as _; -use lee::{AccountId, program::Program}; -use lee_core::{account::Account, program::ProgramId}; +use lee::program::Program; +use lee_core::program::ProgramId; use log::{error, info, warn}; use logos_blockchain_core::mantle::ops::channel::ChannelId; use logos_blockchain_zone_sdk::{ @@ -20,35 +18,6 @@ use crate::{ config::{BedrockConfig, CrossZoneConfig}, }; -/// The inbox config account this zone seeds at startup so the inbox guest can -/// authorize inbound peer messages. The config is zone-specific (self zone plus -/// per-peer target allowlists), so it cannot live in the shared genesis state. -#[must_use] -pub fn inbox_config_account(self_zone: [u8; 32], cross_zone: &CrossZoneConfig) -> (AccountId, Account) { - let inbox_id = Program::cross_zone_inbox().id(); - - let mut allowed_targets = BTreeMap::new(); - for peer in &cross_zone.peers { - allowed_targets.insert(peer.channel_id, peer.allowed_targets.clone()); - } - let config = InboxConfig { - self_zone, - allowed_peers: BTreeMap::new(), - allowed_targets, - }; - - let account = Account { - program_owner: inbox_id, - balance: 0, - data: config - .to_bytes() - .try_into() - .expect("inbox config fits in account data"), - nonce: 0_u128.into(), - }; - (inbox_config_account_id(inbox_id), account) -} - /// Spawns one watcher task per configured peer. Each task reads the peer's /// finalized blocks from Bedrock, recognizes outbound messages addressed to this /// zone, and injects the matching inbox dispatch as a sequencer-origin @@ -188,17 +157,16 @@ async fn deliver_block( continue; } - let cross_zone_message = CrossZoneMessage { - src_zone: peer_zone, - src_block_id: block.header.block_id, - src_tx_index: u32::try_from(index).unwrap_or(u32::MAX), - src_program_id: emitter_id, + let dispatch = build_dispatch_from_emission( + inbox_id, + peer_zone, + block.header.block_id, + u32::try_from(index).unwrap_or(u32::MAX), + emitter_id, target_program_id, + &target_accounts, payload, - l1_inclusion_witness: None, - }; - let target_ids: Vec = target_accounts.into_iter().map(AccountId::new).collect(); - let dispatch = build_inbox_dispatch_tx(inbox_id, &cross_zone_message, target_ids); + ); match mempool_handle .push((TransactionOrigin::Sequencer, LeeTransaction::Public(dispatch))) diff --git a/lez/sequencer/core/src/lib.rs b/lez/sequencer/core/src/lib.rs index fec6f57d..69c596e3 100644 --- a/lez/sequencer/core/src/lib.rs +++ b/lez/sequencer/core/src/lib.rs @@ -626,7 +626,7 @@ fn build_genesis_state(config: &SequencerConfig) -> (lee::V03State, Vec, +} + +/// Cross-zone configuration shared by a zone's sequencer (watcher) and indexer +/// (verifier): the peers it reads from Bedrock and, per peer, the local programs +/// they may deliver to. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CrossZoneConfig { + pub peers: Vec, +} + /// A finalized outbound message observed on a peer zone, addressed to a program /// on this zone. The watcher fills it from the peer's block; it is never /// self-reported by a user. @@ -183,6 +200,67 @@ pub fn build_inbox_dispatch_tx( ) } +/// Builds the dispatch transaction for one peer emission. Both the sequencer's +/// watcher and the indexer's verifier go through this so their transactions are +/// byte-identical for the same emission (the basis of the Option B check). +#[cfg(feature = "host")] +#[must_use] +pub fn build_dispatch_from_emission( + inbox_id: ProgramId, + src_zone: ZoneId, + src_block_id: u64, + src_tx_index: u32, + src_program_id: ProgramId, + target_program_id: ProgramId, + target_accounts: &[[u8; 32]], + payload: Vec, +) -> lee::PublicTransaction { + let msg = CrossZoneMessage { + src_zone, + src_block_id, + src_tx_index, + src_program_id, + target_program_id, + payload, + l1_inclusion_witness: None, + }; + let target_ids = target_accounts.iter().copied().map(AccountId::new).collect(); + build_inbox_dispatch_tx(inbox_id, &msg, target_ids) +} + +/// Builds the inbox config account a zone seeds into genesis state so the inbox +/// guest can authorize inbound peer messages. The sequencer and indexer seed the +/// same account from the same config, keeping their replayed state consistent. +#[cfg(feature = "host")] +#[must_use] +pub fn build_inbox_config_account( + self_zone: ZoneId, + cross_zone: &CrossZoneConfig, +) -> (AccountId, lee_core::account::Account) { + let inbox_id = lee::program::Program::cross_zone_inbox().id(); + + let mut allowed_targets = BTreeMap::new(); + for peer in &cross_zone.peers { + allowed_targets.insert(peer.channel_id, peer.allowed_targets.clone()); + } + let config = InboxConfig { + self_zone, + allowed_peers: BTreeMap::new(), + allowed_targets, + }; + + let account = lee_core::account::Account { + program_owner: inbox_id, + balance: 0, + data: config + .to_bytes() + .try_into() + .expect("inbox config fits in account data"), + nonce: 0_u128.into(), + }; + (inbox_config_account_id(inbox_id), account) +} + #[cfg(test)] mod tests { use super::*; diff --git a/test_fixtures/src/config.rs b/test_fixtures/src/config.rs index 48a15a3a..a58be4fe 100644 --- a/test_fixtures/src/config.rs +++ b/test_fixtures/src/config.rs @@ -170,6 +170,7 @@ pub fn indexer_config( bedrock_addr: SocketAddr, home: PathBuf, channel_id: ChannelId, + cross_zone: Option, ) -> Result { Ok(IndexerConfig { home, @@ -180,6 +181,7 @@ pub fn indexer_config( auth: None, }, channel_id, + cross_zone, }) } diff --git a/test_fixtures/src/lib.rs b/test_fixtures/src/lib.rs index 46ac0668..283240d9 100644 --- a/test_fixtures/src/lib.rs +++ b/test_fixtures/src/lib.rs @@ -308,7 +308,7 @@ impl TestContextBuilder { let indexer_components = if enable_indexer { let (indexer_handle, temp_indexer_dir) = - setup_indexer(bedrock_addr, config::bedrock_channel_id()) + setup_indexer(bedrock_addr, config::bedrock_channel_id(), None) .await .context("Failed to setup Indexer")?; let indexer_url = config::addr_to_url(config::UrlProtocol::Ws, indexer_handle.addr()) diff --git a/test_fixtures/src/setup.rs b/test_fixtures/src/setup.rs index df26141a..dd8753f3 100644 --- a/test_fixtures/src/setup.rs +++ b/test_fixtures/src/setup.rs @@ -92,6 +92,7 @@ pub async fn setup_bedrock_node() -> Result<(DockerCompose, SocketAddr)> { pub async fn setup_indexer( bedrock_addr: SocketAddr, channel_id: ChannelId, + cross_zone: Option, ) -> Result<(IndexerHandle, TempDir)> { let temp_indexer_dir = tempfile::tempdir().context("Failed to create temp dir for indexer home")?; @@ -101,9 +102,13 @@ pub async fn setup_indexer( temp_indexer_dir.path().display() ); - let indexer_config = - config::indexer_config(bedrock_addr, temp_indexer_dir.path().to_owned(), channel_id) - .context("Failed to create Indexer config")?; + let indexer_config = config::indexer_config( + bedrock_addr, + temp_indexer_dir.path().to_owned(), + channel_id, + cross_zone, + ) + .context("Failed to create Indexer config")?; indexer_service::run_server(indexer_config, 0) .await