mirror of
https://github.com/logos-blockchain/logos-execution-zone.git
synced 2026-06-29 18:39:30 +00:00
feat(cross-zone): add Option B indexer verifier and seed inbox config consistently
This commit is contained in:
parent
b83a3c7556
commit
0ddb7a9291
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -3824,13 +3824,17 @@ dependencies = [
|
||||
"authenticated_transfer_core",
|
||||
"borsh",
|
||||
"common",
|
||||
"cross_zone_inbox_core",
|
||||
"futures",
|
||||
"hex",
|
||||
"humantime-serde",
|
||||
"lee",
|
||||
"lee_core",
|
||||
"log",
|
||||
"logos-blockchain-core",
|
||||
"logos-blockchain-zone-sdk",
|
||||
"ping_core",
|
||||
"risc0-zkvm",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"storage",
|
||||
|
||||
@ -61,6 +61,7 @@ pub fn setup_indexer_ffi(
|
||||
bedrock_addr,
|
||||
temp_indexer_dir.path().to_owned(),
|
||||
integration_tests::config::bedrock_channel_id(),
|
||||
None,
|
||||
)
|
||||
.context("Failed to create Indexer config")?;
|
||||
|
||||
|
||||
@ -39,13 +39,13 @@ async fn two_zones_share_one_bedrock_and_both_advance() -> Result<()> {
|
||||
let (seq_a, _seq_a_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_a, None)
|
||||
.await
|
||||
.context("Failed to set up zone A sequencer")?;
|
||||
let (idx_a, _idx_a_home) = setup_indexer(bedrock_addr, channel_a)
|
||||
let (idx_a, _idx_a_home) = setup_indexer(bedrock_addr, channel_a, None)
|
||||
.await
|
||||
.context("Failed to set up zone A indexer")?;
|
||||
let (seq_b, _seq_b_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_b, None)
|
||||
.await
|
||||
.context("Failed to set up zone B sequencer")?;
|
||||
let (idx_b, _idx_b_home) = setup_indexer(bedrock_addr, channel_b)
|
||||
let (idx_b, _idx_b_home) = setup_indexer(bedrock_addr, channel_b, None)
|
||||
.await
|
||||
.context("Failed to set up zone B indexer")?;
|
||||
|
||||
|
||||
@ -12,6 +12,8 @@ common.workspace = true
|
||||
logos-blockchain-zone-sdk.workspace = true
|
||||
lee.workspace = true
|
||||
lee_core.workspace = true
|
||||
cross_zone_inbox_core = { workspace = true, features = ["host"] }
|
||||
ping_core.workspace = true
|
||||
storage.workspace = true
|
||||
testnet_initial_state.workspace = true
|
||||
|
||||
@ -26,6 +28,8 @@ logos-blockchain-core.workspace = true
|
||||
serde_json.workspace = true
|
||||
async-stream.workspace = true
|
||||
tokio.workspace = true
|
||||
risc0-zkvm.workspace = true
|
||||
hex.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile.workspace = true
|
||||
|
||||
@ -22,8 +22,13 @@ pub struct IndexerStore {
|
||||
impl IndexerStore {
|
||||
/// Starting database at the start of new chain.
|
||||
/// Creates files if necessary.
|
||||
pub fn open_db(location: &Path) -> Result<Self> {
|
||||
let initial_state = testnet_initial_state::initial_state();
|
||||
pub fn open_db(location: &Path, genesis_seed: Option<(AccountId, Account)>) -> Result<Self> {
|
||||
let mut initial_state = testnet_initial_state::initial_state();
|
||||
// Seed any zone-specific genesis accounts (e.g. the cross-zone inbox
|
||||
// config) so the indexer's replayed state matches the sequencer's.
|
||||
if let Some((account_id, account)) = genesis_seed {
|
||||
initial_state.insert_genesis_account(account_id, account);
|
||||
}
|
||||
let dbio = RocksDBIO::open_or_create(location, &initial_state)?;
|
||||
|
||||
let current_state = dbio.final_state()?;
|
||||
@ -215,7 +220,7 @@ mod tests {
|
||||
fn correct_startup() {
|
||||
let home = tempdir().unwrap();
|
||||
|
||||
let storage = IndexerStore::open_db(home.as_ref()).unwrap();
|
||||
let storage = IndexerStore::open_db(home.as_ref(), None).unwrap();
|
||||
|
||||
let final_id = storage.get_last_block_id().unwrap();
|
||||
|
||||
@ -226,7 +231,7 @@ mod tests {
|
||||
async fn state_transition() {
|
||||
let home = tempdir().unwrap();
|
||||
|
||||
let storage = IndexerStore::open_db(home.as_ref()).unwrap();
|
||||
let storage = IndexerStore::open_db(home.as_ref(), None).unwrap();
|
||||
|
||||
let initial_accounts = initial_pub_accounts_private_keys();
|
||||
let from = initial_accounts[0].account_id;
|
||||
@ -278,7 +283,7 @@ mod tests {
|
||||
async fn account_state_at_block() {
|
||||
let home = tempdir().unwrap();
|
||||
|
||||
let storage = IndexerStore::open_db(home.as_ref()).unwrap();
|
||||
let storage = IndexerStore::open_db(home.as_ref(), None).unwrap();
|
||||
|
||||
let mut prev_hash = None;
|
||||
|
||||
|
||||
@ -7,6 +7,7 @@ use std::{
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use common::config::BasicAuth;
|
||||
use cross_zone_inbox_core::CrossZoneConfig;
|
||||
use humantime_serde;
|
||||
pub use logos_blockchain_core::mantle::ops::channel::ChannelId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -27,6 +28,9 @@ pub struct IndexerConfig {
|
||||
pub consensus_info_polling_interval: Duration,
|
||||
pub bedrock_config: ClientConfig,
|
||||
pub channel_id: ChannelId,
|
||||
/// Cross-zone configuration. `None` disables the indexer's cross-zone handling.
|
||||
#[serde(default)]
|
||||
pub cross_zone: Option<CrossZoneConfig>,
|
||||
}
|
||||
|
||||
impl IndexerConfig {
|
||||
|
||||
369
lez/indexer/core/src/cross_zone_verifier.rs
Normal file
369
lez/indexer/core/src/cross_zone_verifier.rs
Normal file
@ -0,0 +1,369 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::{Context as _, Result, bail};
|
||||
use common::{block::Block, transaction::LeeTransaction};
|
||||
use cross_zone_inbox_core::{
|
||||
CrossZoneMessage, Instruction as InboxInstruction, MessageKey, ZoneId,
|
||||
build_dispatch_from_emission, message_key,
|
||||
};
|
||||
use futures::StreamExt as _;
|
||||
use lee::program::Program;
|
||||
use lee_core::program::ProgramId;
|
||||
use log::{error, info};
|
||||
use logos_blockchain_core::mantle::ops::channel::ChannelId;
|
||||
use logos_blockchain_zone_sdk::{
|
||||
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
|
||||
};
|
||||
use ping_core::SenderInstruction;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::config::IndexerConfig;
|
||||
|
||||
/// How long the verifier waits for a referenced peer block to finalize before
|
||||
/// rejecting the dispatch as referencing a nonexistent block.
|
||||
const PEER_BLOCK_WAIT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Cache of finalized peer-zone blocks, filled by per-peer reader tasks and read
|
||||
/// by the verifier to re-derive cross-zone dispatch transactions.
|
||||
#[derive(Clone, Default)]
|
||||
struct PeerBlocks {
|
||||
chains: Arc<RwLock<HashMap<ZoneId, HashMap<u64, Block>>>>,
|
||||
}
|
||||
|
||||
impl PeerBlocks {
|
||||
async fn insert(&self, zone: ZoneId, block: Block) {
|
||||
self.chains
|
||||
.write()
|
||||
.await
|
||||
.entry(zone)
|
||||
.or_default()
|
||||
.insert(block.header.block_id, block);
|
||||
}
|
||||
|
||||
async fn get(&self, zone: ZoneId, block_id: u64) -> Option<Block> {
|
||||
self.chains
|
||||
.read()
|
||||
.await
|
||||
.get(&zone)
|
||||
.and_then(|chain| chain.get(&block_id).cloned())
|
||||
}
|
||||
}
|
||||
|
||||
/// The indexer-side Option B verifier. For every cross-zone dispatch in a block
|
||||
/// it re-derives the transaction from the peer's finalized block and rejects it
|
||||
/// if the bytes differ (a forgery) or the message was already delivered (a
|
||||
/// replay), so delivery no longer relies on trusting the sequencer.
|
||||
#[derive(Clone)]
|
||||
pub struct CrossZoneVerifier {
|
||||
self_zone: ZoneId,
|
||||
inbox_id: ProgramId,
|
||||
emitter_id: ProgramId,
|
||||
peers: PeerBlocks,
|
||||
seen: Arc<RwLock<HashSet<MessageKey>>>,
|
||||
}
|
||||
|
||||
impl CrossZoneVerifier {
|
||||
/// Builds the verifier and spawns one peer reader per configured peer.
|
||||
/// Returns `None` when cross-zone messaging is disabled.
|
||||
pub fn start(config: &IndexerConfig) -> Option<Self> {
|
||||
let cross_zone = config.cross_zone.as_ref()?;
|
||||
let self_zone: ZoneId = *config.channel_id.as_ref();
|
||||
let peers = PeerBlocks::default();
|
||||
|
||||
for peer in &cross_zone.peers {
|
||||
let node = NodeHttpClient::new(
|
||||
CommonHttpClient::new(config.bedrock_config.auth.clone().map(Into::into)),
|
||||
config.bedrock_config.addr.clone(),
|
||||
);
|
||||
tokio::spawn(read_peer(
|
||||
ZoneIndexer::new(ChannelId::from(peer.channel_id), node),
|
||||
peer.channel_id,
|
||||
peers.clone(),
|
||||
config.consensus_info_polling_interval,
|
||||
));
|
||||
}
|
||||
|
||||
Some(Self {
|
||||
self_zone,
|
||||
inbox_id: Program::cross_zone_inbox().id(),
|
||||
emitter_id: Program::ping_sender().id(),
|
||||
peers,
|
||||
seen: Arc::new(RwLock::new(HashSet::new())),
|
||||
})
|
||||
}
|
||||
|
||||
/// Verifies every cross-zone dispatch in a block, returning `Err` on the
|
||||
/// first forged or replayed dispatch. The caller halts ingestion on error.
|
||||
pub async fn verify_block(&self, block: &Block) -> Result<()> {
|
||||
for tx in &block.body.transactions {
|
||||
let Some(msg) = self.decode_dispatch(tx) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let key = message_key(&msg.src_zone, msg.src_block_id, msg.src_tx_index);
|
||||
if self.seen.read().await.contains(&key) {
|
||||
bail!("cross-zone replay: message {} re-delivered", hex::encode(key));
|
||||
}
|
||||
|
||||
let expected = self.rederive(&msg).await?;
|
||||
if LeeTransaction::Public(expected) != *tx {
|
||||
bail!(
|
||||
"forged cross-zone dispatch from zone {} block {} tx {}: re-derivation mismatch",
|
||||
hex::encode(msg.src_zone),
|
||||
msg.src_block_id,
|
||||
msg.src_tx_index
|
||||
);
|
||||
}
|
||||
|
||||
self.seen.write().await.insert(key);
|
||||
info!(
|
||||
"Verified cross-zone dispatch from zone {} block {} tx {}",
|
||||
hex::encode(msg.src_zone),
|
||||
msg.src_block_id,
|
||||
msg.src_tx_index
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Decodes a transaction into the cross-zone message it dispatches, or `None`
|
||||
/// if it is not an inbox dispatch.
|
||||
fn decode_dispatch(&self, tx: &LeeTransaction) -> Option<CrossZoneMessage> {
|
||||
let LeeTransaction::Public(public_tx) = tx else {
|
||||
return None;
|
||||
};
|
||||
if public_tx.message().program_id != self.inbox_id {
|
||||
return None;
|
||||
}
|
||||
match risc0_zkvm::serde::from_slice::<InboxInstruction, _>(
|
||||
&public_tx.message().instruction_data,
|
||||
) {
|
||||
Ok(InboxInstruction::Dispatch(msg)) => Some(msg),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Re-derives the dispatch transaction the watcher should have injected for
|
||||
/// `msg`, reading the source emission from the peer's finalized block.
|
||||
async fn rederive(&self, msg: &CrossZoneMessage) -> Result<lee::PublicTransaction> {
|
||||
let peer_block = self
|
||||
.wait_for_peer_block(msg.src_zone, msg.src_block_id)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"no peer block {} from zone {} to verify against",
|
||||
msg.src_block_id,
|
||||
hex::encode(msg.src_zone)
|
||||
)
|
||||
})?;
|
||||
|
||||
let emission = peer_block
|
||||
.body
|
||||
.transactions
|
||||
.get(msg.src_tx_index as usize)
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!("src_tx_index {} out of range in peer block", msg.src_tx_index)
|
||||
})?;
|
||||
|
||||
let LeeTransaction::Public(emission) = emission else {
|
||||
bail!("peer emission transaction is not public");
|
||||
};
|
||||
if emission.message().program_id != self.emitter_id {
|
||||
bail!("peer transaction at src_tx_index is not an emitter transaction");
|
||||
}
|
||||
|
||||
let SenderInstruction::Send {
|
||||
target_zone,
|
||||
target_program_id,
|
||||
target_accounts,
|
||||
payload,
|
||||
..
|
||||
} = risc0_zkvm::serde::from_slice(&emission.message().instruction_data)
|
||||
.context("decode peer emission instruction")?;
|
||||
|
||||
if target_zone != self.self_zone {
|
||||
bail!("peer emission targets a different zone");
|
||||
}
|
||||
|
||||
Ok(build_dispatch_from_emission(
|
||||
self.inbox_id,
|
||||
msg.src_zone,
|
||||
msg.src_block_id,
|
||||
msg.src_tx_index,
|
||||
self.emitter_id,
|
||||
target_program_id,
|
||||
&target_accounts,
|
||||
payload,
|
||||
))
|
||||
}
|
||||
|
||||
/// Polls the peer cache until the referenced block finalizes. A forged
|
||||
/// reference to a never-finalized block times out and is rejected.
|
||||
async fn wait_for_peer_block(&self, zone: ZoneId, block_id: u64) -> Result<Block> {
|
||||
let mut waited = Duration::ZERO;
|
||||
loop {
|
||||
if let Some(block) = self.peers.get(zone, block_id).await {
|
||||
return Ok(block);
|
||||
}
|
||||
if waited >= PEER_BLOCK_WAIT {
|
||||
bail!(
|
||||
"peer block {} from zone {} did not finalize within {:?}",
|
||||
block_id,
|
||||
hex::encode(zone),
|
||||
PEER_BLOCK_WAIT
|
||||
);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
waited += Duration::from_secs(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads a peer zone's finalized blocks from Bedrock into the shared cache.
|
||||
async fn read_peer(
|
||||
zone_indexer: ZoneIndexer<NodeHttpClient>,
|
||||
peer_zone: ZoneId,
|
||||
peers: PeerBlocks,
|
||||
poll_interval: Duration,
|
||||
) {
|
||||
info!("Cross-zone peer reader started for {}", hex::encode(peer_zone));
|
||||
|
||||
let mut cursor = None;
|
||||
loop {
|
||||
let stream = match zone_indexer.next_messages(cursor).await {
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Peer reader next_messages failed for {}: {err}",
|
||||
hex::encode(peer_zone)
|
||||
);
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
while let Some((msg, slot)) = stream.next().await {
|
||||
if let ZoneMessage::Block(zone_block) = msg {
|
||||
match borsh::from_slice::<Block>(&zone_block.data) {
|
||||
Ok(block) => peers.insert(peer_zone, block).await,
|
||||
Err(err) => error!("Peer reader failed to deserialize block: {err}"),
|
||||
}
|
||||
}
|
||||
cursor = Some(slot);
|
||||
}
|
||||
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common::test_utils::produce_dummy_block;
|
||||
use lee::{
|
||||
PublicTransaction,
|
||||
program::Program,
|
||||
public_transaction::{Message, WitnessSet},
|
||||
};
|
||||
use ping_core::ping_record_pda;
|
||||
|
||||
use super::*;
|
||||
|
||||
const SELF_ZONE: ZoneId = [1; 32];
|
||||
const PEER_ZONE: ZoneId = [2; 32];
|
||||
const PEER_BLOCK_ID: u64 = 5;
|
||||
|
||||
fn verifier() -> CrossZoneVerifier {
|
||||
CrossZoneVerifier {
|
||||
self_zone: SELF_ZONE,
|
||||
inbox_id: Program::cross_zone_inbox().id(),
|
||||
emitter_id: Program::ping_sender().id(),
|
||||
peers: PeerBlocks::default(),
|
||||
seen: Arc::new(RwLock::new(HashSet::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// A ping_sender emission addressed to `SELF_ZONE` carrying `payload`.
|
||||
fn emission(payload: &[u8]) -> LeeTransaction {
|
||||
let receiver_id = Program::ping_receiver().id();
|
||||
let send = SenderInstruction::Send {
|
||||
outbox_program_id: Program::cross_zone_outbox().id(),
|
||||
target_zone: SELF_ZONE,
|
||||
target_program_id: receiver_id,
|
||||
target_accounts: vec![ping_record_pda(receiver_id).into_value()],
|
||||
payload: payload.to_vec(),
|
||||
ordinal: 0,
|
||||
};
|
||||
let message = Message::try_new(Program::ping_sender().id(), vec![], vec![], send)
|
||||
.expect("emission serializes");
|
||||
LeeTransaction::Public(PublicTransaction::new(
|
||||
message,
|
||||
WitnessSet::from_raw_parts(vec![]),
|
||||
))
|
||||
}
|
||||
|
||||
/// The dispatch a watcher would inject for a `PEER_BLOCK_ID` emission of `payload`.
|
||||
fn dispatch(payload: &[u8]) -> LeeTransaction {
|
||||
let receiver_id = Program::ping_receiver().id();
|
||||
LeeTransaction::Public(build_dispatch_from_emission(
|
||||
Program::cross_zone_inbox().id(),
|
||||
PEER_ZONE,
|
||||
PEER_BLOCK_ID,
|
||||
0,
|
||||
Program::ping_sender().id(),
|
||||
receiver_id,
|
||||
&[ping_record_pda(receiver_id).into_value()],
|
||||
payload.to_vec(),
|
||||
))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn verifies_dispatch_matching_a_peer_emission() {
|
||||
let verifier = verifier();
|
||||
verifier
|
||||
.peers
|
||||
.insert(PEER_ZONE, produce_dummy_block(PEER_BLOCK_ID, None, vec![emission(b"hi")]))
|
||||
.await;
|
||||
|
||||
let block = produce_dummy_block(9, None, vec![dispatch(b"hi")]);
|
||||
verifier
|
||||
.verify_block(&block)
|
||||
.await
|
||||
.expect("dispatch matching the peer emission verifies");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_dispatch_with_no_matching_emission() {
|
||||
let verifier = verifier();
|
||||
// The peer block carries the real emission, but the block claims a
|
||||
// different payload, so re-derivation does not reproduce it.
|
||||
verifier
|
||||
.peers
|
||||
.insert(PEER_ZONE, produce_dummy_block(PEER_BLOCK_ID, None, vec![emission(b"real")]))
|
||||
.await;
|
||||
|
||||
let block = produce_dummy_block(9, None, vec![dispatch(b"forged")]);
|
||||
let err = verifier.verify_block(&block).await.unwrap_err();
|
||||
assert!(err.to_string().contains("forged"), "unexpected error: {err}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_replayed_dispatch() {
|
||||
let verifier = verifier();
|
||||
verifier
|
||||
.peers
|
||||
.insert(PEER_ZONE, produce_dummy_block(PEER_BLOCK_ID, None, vec![emission(b"hi")]))
|
||||
.await;
|
||||
|
||||
let first = produce_dummy_block(9, None, vec![dispatch(b"hi")]);
|
||||
verifier.verify_block(&first).await.expect("first delivery verifies");
|
||||
|
||||
let replay = produce_dummy_block(10, None, vec![dispatch(b"hi")]);
|
||||
let err = verifier.verify_block(&replay).await.unwrap_err();
|
||||
assert!(err.to_string().contains("replay"), "unexpected error: {err}");
|
||||
}
|
||||
}
|
||||
@ -10,16 +10,20 @@ use logos_blockchain_zone_sdk::{
|
||||
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
|
||||
};
|
||||
|
||||
use crate::{block_store::IndexerStore, config::IndexerConfig};
|
||||
use crate::{
|
||||
block_store::IndexerStore, config::IndexerConfig, cross_zone_verifier::CrossZoneVerifier,
|
||||
};
|
||||
|
||||
pub mod block_store;
|
||||
pub mod config;
|
||||
pub mod cross_zone_verifier;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IndexerCore {
|
||||
pub zone_indexer: Arc<ZoneIndexer<NodeHttpClient>>,
|
||||
pub config: IndexerConfig,
|
||||
pub store: IndexerStore,
|
||||
verifier: Option<CrossZoneVerifier>,
|
||||
}
|
||||
|
||||
impl IndexerCore {
|
||||
@ -33,10 +37,22 @@ impl IndexerCore {
|
||||
);
|
||||
let zone_indexer = ZoneIndexer::new(config.channel_id, node);
|
||||
|
||||
// Seed the inbox config so the indexer can replay cross-zone dispatch
|
||||
// transactions, matching the account the sequencer seeds at genesis.
|
||||
let inbox_config_seed = config.cross_zone.as_ref().map(|cross_zone| {
|
||||
let self_zone: [u8; 32] = *config.channel_id.as_ref();
|
||||
cross_zone_inbox_core::build_inbox_config_account(self_zone, cross_zone)
|
||||
});
|
||||
|
||||
// Option B verifier: re-derives each cross-zone dispatch from the peer's
|
||||
// finalized blocks. `None` when cross-zone messaging is disabled.
|
||||
let verifier = CrossZoneVerifier::start(&config);
|
||||
|
||||
Ok(Self {
|
||||
zone_indexer: Arc::new(zone_indexer),
|
||||
config,
|
||||
store: IndexerStore::open_db(&home)?,
|
||||
store: IndexerStore::open_db(&home, inbox_config_seed)?,
|
||||
verifier,
|
||||
})
|
||||
}
|
||||
|
||||
@ -91,6 +107,19 @@ impl IndexerCore {
|
||||
|
||||
info!("Indexed L2 block {}", block.header.block_id);
|
||||
|
||||
// Option B: re-derive and verify every cross-zone dispatch
|
||||
// before applying the block. A forged or replayed dispatch
|
||||
// halts ingestion rather than persisting an invalid state.
|
||||
if let Some(verifier) = &self.verifier {
|
||||
if let Err(err) = verifier.verify_block(&block).await {
|
||||
error!(
|
||||
"Cross-zone verification failed for block {}: {err:#}. Halting indexer ingestion.",
|
||||
block.header.block_id
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Remove l1_header placeholder once storage layer
|
||||
// no longer requires it. Zone-sdk handles L1 tracking internally.
|
||||
let placeholder_l1_header = HeaderId::from([0_u8; 32]);
|
||||
|
||||
@ -10,7 +10,6 @@ use bytesize::ByteSize;
|
||||
use common::config::BasicAuth;
|
||||
use humantime_serde;
|
||||
use lee::AccountId;
|
||||
use lee_core::program::ProgramId;
|
||||
use logos_blockchain_core::mantle::ops::channel::ChannelId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
@ -28,21 +27,7 @@ pub enum GenesisAction {
|
||||
},
|
||||
}
|
||||
|
||||
/// A peer zone whose outbox this zone watches for inbound cross-zone messages.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct CrossZonePeer {
|
||||
/// The peer's Bedrock channel; its 32 bytes double as the peer's zone id.
|
||||
pub channel_id: [u8; 32],
|
||||
/// Programs on this zone a message from this peer is allowed to target.
|
||||
pub allowed_targets: Vec<ProgramId>,
|
||||
}
|
||||
|
||||
/// Cross-zone watcher configuration: the peers this zone reads from Bedrock and,
|
||||
/// per peer, the local programs they may deliver to. `None` disables the watcher.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct CrossZoneConfig {
|
||||
pub peers: Vec<CrossZonePeer>,
|
||||
}
|
||||
pub use cross_zone_inbox_core::{CrossZoneConfig, CrossZonePeer};
|
||||
|
||||
// TODO: Provide default values
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
|
||||
@ -1,12 +1,10 @@
|
||||
use std::{collections::BTreeMap, time::Duration};
|
||||
use std::time::Duration;
|
||||
|
||||
use common::{block::Block, transaction::LeeTransaction};
|
||||
use cross_zone_inbox_core::{
|
||||
CrossZoneMessage, InboxConfig, build_inbox_dispatch_tx, inbox_config_account_id,
|
||||
};
|
||||
use cross_zone_inbox_core::build_dispatch_from_emission;
|
||||
use futures::StreamExt as _;
|
||||
use lee::{AccountId, program::Program};
|
||||
use lee_core::{account::Account, program::ProgramId};
|
||||
use lee::program::Program;
|
||||
use lee_core::program::ProgramId;
|
||||
use log::{error, info, warn};
|
||||
use logos_blockchain_core::mantle::ops::channel::ChannelId;
|
||||
use logos_blockchain_zone_sdk::{
|
||||
@ -20,35 +18,6 @@ use crate::{
|
||||
config::{BedrockConfig, CrossZoneConfig},
|
||||
};
|
||||
|
||||
/// The inbox config account this zone seeds at startup so the inbox guest can
|
||||
/// authorize inbound peer messages. The config is zone-specific (self zone plus
|
||||
/// per-peer target allowlists), so it cannot live in the shared genesis state.
|
||||
#[must_use]
|
||||
pub fn inbox_config_account(self_zone: [u8; 32], cross_zone: &CrossZoneConfig) -> (AccountId, Account) {
|
||||
let inbox_id = Program::cross_zone_inbox().id();
|
||||
|
||||
let mut allowed_targets = BTreeMap::new();
|
||||
for peer in &cross_zone.peers {
|
||||
allowed_targets.insert(peer.channel_id, peer.allowed_targets.clone());
|
||||
}
|
||||
let config = InboxConfig {
|
||||
self_zone,
|
||||
allowed_peers: BTreeMap::new(),
|
||||
allowed_targets,
|
||||
};
|
||||
|
||||
let account = Account {
|
||||
program_owner: inbox_id,
|
||||
balance: 0,
|
||||
data: config
|
||||
.to_bytes()
|
||||
.try_into()
|
||||
.expect("inbox config fits in account data"),
|
||||
nonce: 0_u128.into(),
|
||||
};
|
||||
(inbox_config_account_id(inbox_id), account)
|
||||
}
|
||||
|
||||
/// Spawns one watcher task per configured peer. Each task reads the peer's
|
||||
/// finalized blocks from Bedrock, recognizes outbound messages addressed to this
|
||||
/// zone, and injects the matching inbox dispatch as a sequencer-origin
|
||||
@ -188,17 +157,16 @@ async fn deliver_block(
|
||||
continue;
|
||||
}
|
||||
|
||||
let cross_zone_message = CrossZoneMessage {
|
||||
src_zone: peer_zone,
|
||||
src_block_id: block.header.block_id,
|
||||
src_tx_index: u32::try_from(index).unwrap_or(u32::MAX),
|
||||
src_program_id: emitter_id,
|
||||
let dispatch = build_dispatch_from_emission(
|
||||
inbox_id,
|
||||
peer_zone,
|
||||
block.header.block_id,
|
||||
u32::try_from(index).unwrap_or(u32::MAX),
|
||||
emitter_id,
|
||||
target_program_id,
|
||||
&target_accounts,
|
||||
payload,
|
||||
l1_inclusion_witness: None,
|
||||
};
|
||||
let target_ids: Vec<AccountId> = target_accounts.into_iter().map(AccountId::new).collect();
|
||||
let dispatch = build_inbox_dispatch_tx(inbox_id, &cross_zone_message, target_ids);
|
||||
);
|
||||
|
||||
match mempool_handle
|
||||
.push((TransactionOrigin::Sequencer, LeeTransaction::Public(dispatch)))
|
||||
|
||||
@ -626,7 +626,7 @@ fn build_genesis_state(config: &SequencerConfig) -> (lee::V03State, Vec<LeeTrans
|
||||
if let Some(cross_zone) = &config.cross_zone {
|
||||
let self_zone = *config.bedrock_config.channel_id.as_ref();
|
||||
let (config_id, config_account) =
|
||||
cross_zone_watcher::inbox_config_account(self_zone, cross_zone);
|
||||
cross_zone_inbox_core::build_inbox_config_account(self_zone, cross_zone);
|
||||
state.insert_genesis_account(config_id, config_account);
|
||||
}
|
||||
|
||||
|
||||
@ -23,6 +23,23 @@ const MESSAGE_KEY_DOMAIN: [u8; 32] = *b"/LEZ/v0.3/CrossZoneMsgKey/00000/";
|
||||
const INBOX_CONFIG_SEED: [u8; 32] = *b"/LEZ/v0.3/CrossZoneInboxCfg/000/";
|
||||
const INBOX_SEEN_SEED_DOMAIN: [u8; 32] = *b"/LEZ/v0.3/CrossZoneInboxSeen/00/";
|
||||
|
||||
/// A peer zone whose outbox a zone watches for inbound cross-zone messages.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct CrossZonePeer {
|
||||
/// The peer's Bedrock channel; its 32 bytes double as the peer's zone id.
|
||||
pub channel_id: ZoneId,
|
||||
/// Programs on the local zone a message from this peer is allowed to target.
|
||||
pub allowed_targets: Vec<ProgramId>,
|
||||
}
|
||||
|
||||
/// Cross-zone configuration shared by a zone's sequencer (watcher) and indexer
|
||||
/// (verifier): the peers it reads from Bedrock and, per peer, the local programs
|
||||
/// they may deliver to.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct CrossZoneConfig {
|
||||
pub peers: Vec<CrossZonePeer>,
|
||||
}
|
||||
|
||||
/// A finalized outbound message observed on a peer zone, addressed to a program
|
||||
/// on this zone. The watcher fills it from the peer's block; it is never
|
||||
/// self-reported by a user.
|
||||
@ -183,6 +200,67 @@ pub fn build_inbox_dispatch_tx(
|
||||
)
|
||||
}
|
||||
|
||||
/// Builds the dispatch transaction for one peer emission. Both the sequencer's
|
||||
/// watcher and the indexer's verifier go through this so their transactions are
|
||||
/// byte-identical for the same emission (the basis of the Option B check).
|
||||
#[cfg(feature = "host")]
|
||||
#[must_use]
|
||||
pub fn build_dispatch_from_emission(
|
||||
inbox_id: ProgramId,
|
||||
src_zone: ZoneId,
|
||||
src_block_id: u64,
|
||||
src_tx_index: u32,
|
||||
src_program_id: ProgramId,
|
||||
target_program_id: ProgramId,
|
||||
target_accounts: &[[u8; 32]],
|
||||
payload: Vec<u8>,
|
||||
) -> lee::PublicTransaction {
|
||||
let msg = CrossZoneMessage {
|
||||
src_zone,
|
||||
src_block_id,
|
||||
src_tx_index,
|
||||
src_program_id,
|
||||
target_program_id,
|
||||
payload,
|
||||
l1_inclusion_witness: None,
|
||||
};
|
||||
let target_ids = target_accounts.iter().copied().map(AccountId::new).collect();
|
||||
build_inbox_dispatch_tx(inbox_id, &msg, target_ids)
|
||||
}
|
||||
|
||||
/// Builds the inbox config account a zone seeds into genesis state so the inbox
|
||||
/// guest can authorize inbound peer messages. The sequencer and indexer seed the
|
||||
/// same account from the same config, keeping their replayed state consistent.
|
||||
#[cfg(feature = "host")]
|
||||
#[must_use]
|
||||
pub fn build_inbox_config_account(
|
||||
self_zone: ZoneId,
|
||||
cross_zone: &CrossZoneConfig,
|
||||
) -> (AccountId, lee_core::account::Account) {
|
||||
let inbox_id = lee::program::Program::cross_zone_inbox().id();
|
||||
|
||||
let mut allowed_targets = BTreeMap::new();
|
||||
for peer in &cross_zone.peers {
|
||||
allowed_targets.insert(peer.channel_id, peer.allowed_targets.clone());
|
||||
}
|
||||
let config = InboxConfig {
|
||||
self_zone,
|
||||
allowed_peers: BTreeMap::new(),
|
||||
allowed_targets,
|
||||
};
|
||||
|
||||
let account = lee_core::account::Account {
|
||||
program_owner: inbox_id,
|
||||
balance: 0,
|
||||
data: config
|
||||
.to_bytes()
|
||||
.try_into()
|
||||
.expect("inbox config fits in account data"),
|
||||
nonce: 0_u128.into(),
|
||||
};
|
||||
(inbox_config_account_id(inbox_id), account)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@ -170,6 +170,7 @@ pub fn indexer_config(
|
||||
bedrock_addr: SocketAddr,
|
||||
home: PathBuf,
|
||||
channel_id: ChannelId,
|
||||
cross_zone: Option<CrossZoneConfig>,
|
||||
) -> Result<IndexerConfig> {
|
||||
Ok(IndexerConfig {
|
||||
home,
|
||||
@ -180,6 +181,7 @@ pub fn indexer_config(
|
||||
auth: None,
|
||||
},
|
||||
channel_id,
|
||||
cross_zone,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -308,7 +308,7 @@ impl TestContextBuilder {
|
||||
|
||||
let indexer_components = if enable_indexer {
|
||||
let (indexer_handle, temp_indexer_dir) =
|
||||
setup_indexer(bedrock_addr, config::bedrock_channel_id())
|
||||
setup_indexer(bedrock_addr, config::bedrock_channel_id(), None)
|
||||
.await
|
||||
.context("Failed to setup Indexer")?;
|
||||
let indexer_url = config::addr_to_url(config::UrlProtocol::Ws, indexer_handle.addr())
|
||||
|
||||
@ -92,6 +92,7 @@ pub async fn setup_bedrock_node() -> Result<(DockerCompose, SocketAddr)> {
|
||||
pub async fn setup_indexer(
|
||||
bedrock_addr: SocketAddr,
|
||||
channel_id: ChannelId,
|
||||
cross_zone: Option<sequencer_core::config::CrossZoneConfig>,
|
||||
) -> Result<(IndexerHandle, TempDir)> {
|
||||
let temp_indexer_dir =
|
||||
tempfile::tempdir().context("Failed to create temp dir for indexer home")?;
|
||||
@ -101,9 +102,13 @@ pub async fn setup_indexer(
|
||||
temp_indexer_dir.path().display()
|
||||
);
|
||||
|
||||
let indexer_config =
|
||||
config::indexer_config(bedrock_addr, temp_indexer_dir.path().to_owned(), channel_id)
|
||||
.context("Failed to create Indexer config")?;
|
||||
let indexer_config = config::indexer_config(
|
||||
bedrock_addr,
|
||||
temp_indexer_dir.path().to_owned(),
|
||||
channel_id,
|
||||
cross_zone,
|
||||
)
|
||||
.context("Failed to create Indexer config")?;
|
||||
|
||||
indexer_service::run_server(indexer_config, 0)
|
||||
.await
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user