diff --git a/integration_tests/tests/cross_zone_verified.rs b/integration_tests/tests/cross_zone_verified.rs new file mode 100644 index 00000000..e25312eb --- /dev/null +++ b/integration_tests/tests/cross_zone_verified.rs @@ -0,0 +1,152 @@ +#![expect( + clippy::tests_outside_test_module, + clippy::arithmetic_side_effects, + reason = "We don't care about these in tests" +)] + +//! Cross-zone round trip with the indexer in the loop (Option B). A ping on zone +//! A is delivered to zone B, and zone B's indexer independently re-derives the +//! injected dispatch from zone A's finalized blocks before applying it. The +//! payload landing in the indexer's state proves verification passed; a forgery +//! would have halted the indexer instead. + +use std::{net::SocketAddr, time::Duration}; + +use anyhow::{Context as _, Result}; +use common::transaction::LeeTransaction; +use cross_zone_outbox_core::outbox_pda; +use integration_tests::{ + config::{self, SequencerPartialConfig}, + indexer_client::IndexerClient, + setup::{setup_bedrock_node, setup_indexer, setup_sequencer}, +}; +use lee::{AccountId, PublicTransaction, program::Program, public_transaction::Message}; +use lee_core::program::ProgramId; +use ping_core::{ReceiverInstruction, SenderInstruction, ping_record_pda}; +use sequencer_core::config::{CrossZoneConfig, CrossZonePeer}; +use sequencer_service_rpc::{RpcClient as _, SequencerClient, SequencerClientBuilder}; +use tokio::test; + +const DELIVERY_TIMEOUT: Duration = Duration::from_secs(600); +const PING_PAYLOAD: &[u8] = b"hello-verified-zone"; + +#[test] +async fn indexer_verifies_and_delivers_cross_zone_ping() -> Result<()> { + // Declared first so it outlives both zones (drops run in reverse order). + let (_bedrock, bedrock_addr) = setup_bedrock_node() + .await + .context("Failed to set up shared Bedrock node")?; + + let partial = SequencerPartialConfig::default(); + let channel_a = config::bedrock_channel_id(); + let channel_b = config::bedrock_channel_id_b(); + let zone_a: [u8; 32] = *channel_a.as_ref(); + let zone_b: [u8; 32] = *channel_b.as_ref(); + + let receiver_id = Program::ping_receiver().id(); + let cross_zone = CrossZoneConfig { + peers: vec![CrossZonePeer { + channel_id: zone_a, + allowed_targets: vec![receiver_id], + }], + }; + + // Zone A: source. Zone B: destination, with the watcher on its sequencer and + // the verifier on its indexer. + let (seq_a, _seq_a_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_a, None) + .await + .context("Failed to set up zone A sequencer")?; + let (_idx_a, _idx_a_home) = setup_indexer(bedrock_addr, channel_a, None) + .await + .context("Failed to set up zone A indexer")?; + let (_seq_b, _seq_b_home) = + setup_sequencer(partial, bedrock_addr, vec![], channel_b, Some(cross_zone.clone())) + .await + .context("Failed to set up zone B sequencer")?; + let (idx_b, _idx_b_home) = setup_indexer(bedrock_addr, channel_b, Some(cross_zone)) + .await + .context("Failed to set up zone B indexer")?; + + // Submit the ping on zone A, addressed to ping_receiver on zone B. + let ping = build_ping_tx(zone_b, receiver_id); + sequencer_client(seq_a.addr())? + .send_transaction(ping) + .await + .context("Failed to submit ping on zone A")?; + + // Wait until zone B's indexer records the delivered payload. The indexer only + // applies the dispatch after re-deriving and verifying it. + let record_id = ping_record_pda(receiver_id); + let indexer_url = config::addr_to_url(config::UrlProtocol::Ws, idx_b.addr()) + .context("Failed to build indexer URL")?; + let indexer = IndexerClient::new(&indexer_url) + .await + .context("Failed to build indexer client")?; + + let delivered = wait_for_indexer_delivery(&indexer, record_id).await?; + assert_eq!( + delivered, PING_PAYLOAD, + "Zone B's indexer must record the verified cross-zone payload" + ); + Ok(()) +} + +fn build_ping_tx(target_zone: [u8; 32], receiver_id: ProgramId) -> LeeTransaction { + let outbox_id = Program::cross_zone_outbox().id(); + let ordinal = 0; + + let words = risc0_zkvm::serde::to_vec(&ReceiverInstruction::Record { + payload: PING_PAYLOAD.to_vec(), + }) + .expect("serialize ping instruction"); + let payload: Vec = words.iter().flat_map(|word| word.to_le_bytes()).collect(); + + let send = SenderInstruction::Send { + outbox_program_id: outbox_id, + target_zone, + target_program_id: receiver_id, + target_accounts: vec![ping_record_pda(receiver_id).into_value()], + payload, + ordinal, + }; + + let outbox_account = outbox_pda(outbox_id, &target_zone, ordinal); + let message = Message::try_new(Program::ping_sender().id(), vec![outbox_account], vec![], send) + .expect("build ping message"); + LeeTransaction::Public(PublicTransaction::new( + message, + lee::public_transaction::WitnessSet::from_raw_parts(vec![]), + )) +} + +fn sequencer_client(addr: SocketAddr) -> Result { + let url = config::addr_to_url(config::UrlProtocol::Http, addr) + .context("Failed to build sequencer URL")?; + SequencerClientBuilder::default() + .build(url) + .context("Failed to build sequencer client") +} + +/// Polls zone B's indexer until the ping record PDA holds a payload. +async fn wait_for_indexer_delivery( + indexer: &IndexerClient, + record_id: AccountId, +) -> Result> { + let account_id = indexer_service_protocol::AccountId { + value: record_id.into_value(), + }; + let wait = async { + loop { + let account = + indexer_service_rpc::RpcClient::get_account(&**indexer, account_id).await?; + let data = account.data.0; + if !data.is_empty() { + return Ok::, anyhow::Error>(data); + } + tokio::time::sleep(Duration::from_secs(3)).await; + } + }; + tokio::time::timeout(DELIVERY_TIMEOUT, wait) + .await + .context("Zone B's indexer did not record the payload in time")? +} diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index ee0e813d..46d2cbb6 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -124,7 +124,13 @@ impl IndexerCore { // no longer requires it. Zone-sdk handles L1 tracking internally. let placeholder_l1_header = HeaderId::from([0_u8; 32]); if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await { - error!("Failed to store block {}: {err:#}", block.header.block_id); + // Do not advance the cursor past a block we failed to + // apply: halt ingestion instead of silently desyncing. + error!( + "Failed to store block {}: {err:#}. Halting indexer ingestion.", + block.header.block_id + ); + return; } cursor = Some(slot);