mirror of
https://github.com/logos-blockchain/logos-execution-zone.git
synced 2026-06-29 18:39:30 +00:00
fix(cross-zone): halt the indexer on a failed block and add the verified round-trip test
This commit is contained in:
parent
0ddb7a9291
commit
60741a7191
152
integration_tests/tests/cross_zone_verified.rs
Normal file
152
integration_tests/tests/cross_zone_verified.rs
Normal file
@ -0,0 +1,152 @@
|
||||
#![expect(
|
||||
clippy::tests_outside_test_module,
|
||||
clippy::arithmetic_side_effects,
|
||||
reason = "We don't care about these in tests"
|
||||
)]
|
||||
|
||||
//! Cross-zone round trip with the indexer in the loop (Option B). A ping on zone
|
||||
//! A is delivered to zone B, and zone B's indexer independently re-derives the
|
||||
//! injected dispatch from zone A's finalized blocks before applying it. The
|
||||
//! payload landing in the indexer's state proves verification passed; a forgery
|
||||
//! would have halted the indexer instead.
|
||||
|
||||
use std::{net::SocketAddr, time::Duration};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use common::transaction::LeeTransaction;
|
||||
use cross_zone_outbox_core::outbox_pda;
|
||||
use integration_tests::{
|
||||
config::{self, SequencerPartialConfig},
|
||||
indexer_client::IndexerClient,
|
||||
setup::{setup_bedrock_node, setup_indexer, setup_sequencer},
|
||||
};
|
||||
use lee::{AccountId, PublicTransaction, program::Program, public_transaction::Message};
|
||||
use lee_core::program::ProgramId;
|
||||
use ping_core::{ReceiverInstruction, SenderInstruction, ping_record_pda};
|
||||
use sequencer_core::config::{CrossZoneConfig, CrossZonePeer};
|
||||
use sequencer_service_rpc::{RpcClient as _, SequencerClient, SequencerClientBuilder};
|
||||
use tokio::test;
|
||||
|
||||
const DELIVERY_TIMEOUT: Duration = Duration::from_secs(600);
|
||||
const PING_PAYLOAD: &[u8] = b"hello-verified-zone";
|
||||
|
||||
#[test]
|
||||
async fn indexer_verifies_and_delivers_cross_zone_ping() -> Result<()> {
|
||||
// Declared first so it outlives both zones (drops run in reverse order).
|
||||
let (_bedrock, bedrock_addr) = setup_bedrock_node()
|
||||
.await
|
||||
.context("Failed to set up shared Bedrock node")?;
|
||||
|
||||
let partial = SequencerPartialConfig::default();
|
||||
let channel_a = config::bedrock_channel_id();
|
||||
let channel_b = config::bedrock_channel_id_b();
|
||||
let zone_a: [u8; 32] = *channel_a.as_ref();
|
||||
let zone_b: [u8; 32] = *channel_b.as_ref();
|
||||
|
||||
let receiver_id = Program::ping_receiver().id();
|
||||
let cross_zone = CrossZoneConfig {
|
||||
peers: vec![CrossZonePeer {
|
||||
channel_id: zone_a,
|
||||
allowed_targets: vec![receiver_id],
|
||||
}],
|
||||
};
|
||||
|
||||
// Zone A: source. Zone B: destination, with the watcher on its sequencer and
|
||||
// the verifier on its indexer.
|
||||
let (seq_a, _seq_a_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_a, None)
|
||||
.await
|
||||
.context("Failed to set up zone A sequencer")?;
|
||||
let (_idx_a, _idx_a_home) = setup_indexer(bedrock_addr, channel_a, None)
|
||||
.await
|
||||
.context("Failed to set up zone A indexer")?;
|
||||
let (_seq_b, _seq_b_home) =
|
||||
setup_sequencer(partial, bedrock_addr, vec![], channel_b, Some(cross_zone.clone()))
|
||||
.await
|
||||
.context("Failed to set up zone B sequencer")?;
|
||||
let (idx_b, _idx_b_home) = setup_indexer(bedrock_addr, channel_b, Some(cross_zone))
|
||||
.await
|
||||
.context("Failed to set up zone B indexer")?;
|
||||
|
||||
// Submit the ping on zone A, addressed to ping_receiver on zone B.
|
||||
let ping = build_ping_tx(zone_b, receiver_id);
|
||||
sequencer_client(seq_a.addr())?
|
||||
.send_transaction(ping)
|
||||
.await
|
||||
.context("Failed to submit ping on zone A")?;
|
||||
|
||||
// Wait until zone B's indexer records the delivered payload. The indexer only
|
||||
// applies the dispatch after re-deriving and verifying it.
|
||||
let record_id = ping_record_pda(receiver_id);
|
||||
let indexer_url = config::addr_to_url(config::UrlProtocol::Ws, idx_b.addr())
|
||||
.context("Failed to build indexer URL")?;
|
||||
let indexer = IndexerClient::new(&indexer_url)
|
||||
.await
|
||||
.context("Failed to build indexer client")?;
|
||||
|
||||
let delivered = wait_for_indexer_delivery(&indexer, record_id).await?;
|
||||
assert_eq!(
|
||||
delivered, PING_PAYLOAD,
|
||||
"Zone B's indexer must record the verified cross-zone payload"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn build_ping_tx(target_zone: [u8; 32], receiver_id: ProgramId) -> LeeTransaction {
|
||||
let outbox_id = Program::cross_zone_outbox().id();
|
||||
let ordinal = 0;
|
||||
|
||||
let words = risc0_zkvm::serde::to_vec(&ReceiverInstruction::Record {
|
||||
payload: PING_PAYLOAD.to_vec(),
|
||||
})
|
||||
.expect("serialize ping instruction");
|
||||
let payload: Vec<u8> = words.iter().flat_map(|word| word.to_le_bytes()).collect();
|
||||
|
||||
let send = SenderInstruction::Send {
|
||||
outbox_program_id: outbox_id,
|
||||
target_zone,
|
||||
target_program_id: receiver_id,
|
||||
target_accounts: vec![ping_record_pda(receiver_id).into_value()],
|
||||
payload,
|
||||
ordinal,
|
||||
};
|
||||
|
||||
let outbox_account = outbox_pda(outbox_id, &target_zone, ordinal);
|
||||
let message = Message::try_new(Program::ping_sender().id(), vec![outbox_account], vec![], send)
|
||||
.expect("build ping message");
|
||||
LeeTransaction::Public(PublicTransaction::new(
|
||||
message,
|
||||
lee::public_transaction::WitnessSet::from_raw_parts(vec![]),
|
||||
))
|
||||
}
|
||||
|
||||
fn sequencer_client(addr: SocketAddr) -> Result<SequencerClient> {
|
||||
let url = config::addr_to_url(config::UrlProtocol::Http, addr)
|
||||
.context("Failed to build sequencer URL")?;
|
||||
SequencerClientBuilder::default()
|
||||
.build(url)
|
||||
.context("Failed to build sequencer client")
|
||||
}
|
||||
|
||||
/// Polls zone B's indexer until the ping record PDA holds a payload.
|
||||
async fn wait_for_indexer_delivery(
|
||||
indexer: &IndexerClient,
|
||||
record_id: AccountId,
|
||||
) -> Result<Vec<u8>> {
|
||||
let account_id = indexer_service_protocol::AccountId {
|
||||
value: record_id.into_value(),
|
||||
};
|
||||
let wait = async {
|
||||
loop {
|
||||
let account =
|
||||
indexer_service_rpc::RpcClient::get_account(&**indexer, account_id).await?;
|
||||
let data = account.data.0;
|
||||
if !data.is_empty() {
|
||||
return Ok::<Vec<u8>, anyhow::Error>(data);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
}
|
||||
};
|
||||
tokio::time::timeout(DELIVERY_TIMEOUT, wait)
|
||||
.await
|
||||
.context("Zone B's indexer did not record the payload in time")?
|
||||
}
|
||||
@ -124,7 +124,13 @@ impl IndexerCore {
|
||||
// no longer requires it. Zone-sdk handles L1 tracking internally.
|
||||
let placeholder_l1_header = HeaderId::from([0_u8; 32]);
|
||||
if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await {
|
||||
error!("Failed to store block {}: {err:#}", block.header.block_id);
|
||||
// Do not advance the cursor past a block we failed to
|
||||
// apply: halt ingestion instead of silently desyncing.
|
||||
error!(
|
||||
"Failed to store block {}: {err:#}. Halting indexer ingestion.",
|
||||
block.header.block_id
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
cursor = Some(slot);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user