mirror of
https://github.com/logos-blockchain/logos-execution-zone.git
synced 2026-06-29 18:39:30 +00:00
feat(cross-zone): add inbox watcher and seed inbox config at genesis
This commit is contained in:
parent
0631ffc481
commit
77cfff5256
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -4001,7 +4001,9 @@ dependencies = [
|
||||
"logos-blockchain-key-management-system-service",
|
||||
"logos-blockchain-zone-sdk",
|
||||
"num-bigint 0.4.6",
|
||||
"ping_core",
|
||||
"reqwest",
|
||||
"risc0-zkvm",
|
||||
"sequencer_core",
|
||||
"sequencer_service_rpc",
|
||||
"serde_json",
|
||||
|
||||
@ -36,13 +36,13 @@ async fn two_zones_share_one_bedrock_and_both_advance() -> Result<()> {
|
||||
let channel_b = config::bedrock_channel_id_b();
|
||||
|
||||
// Empty genesis is enough: the clock transaction drives block production.
|
||||
let (seq_a, _seq_a_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_a)
|
||||
let (seq_a, _seq_a_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_a, None)
|
||||
.await
|
||||
.context("Failed to set up zone A sequencer")?;
|
||||
let (idx_a, _idx_a_home) = setup_indexer(bedrock_addr, channel_a)
|
||||
.await
|
||||
.context("Failed to set up zone A indexer")?;
|
||||
let (seq_b, _seq_b_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_b)
|
||||
let (seq_b, _seq_b_home) = setup_sequencer(partial, bedrock_addr, vec![], channel_b, None)
|
||||
.await
|
||||
.context("Failed to set up zone B sequencer")?;
|
||||
let (idx_b, _idx_b_home) = setup_indexer(bedrock_addr, channel_b)
|
||||
|
||||
@ -199,6 +199,8 @@ impl V03State {
|
||||
this.insert_program(Program::bridge());
|
||||
this.insert_program(Program::cross_zone_outbox());
|
||||
this.insert_program(Program::cross_zone_inbox());
|
||||
this.insert_program(Program::ping_sender());
|
||||
this.insert_program(Program::ping_receiver());
|
||||
|
||||
this
|
||||
}
|
||||
@ -374,6 +376,13 @@ impl V03State {
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// Inserts an account directly into genesis state, bypassing execution.
|
||||
/// Genesis-only: used to seed configuration accounts that are not produced by
|
||||
/// any transaction. Must never be reachable from transaction processing.
|
||||
pub fn insert_genesis_account(&mut self, account_id: AccountId, account: Account) {
|
||||
self.public_state.insert(account_id, account);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test-utils"))]
|
||||
@ -716,6 +725,8 @@ pub mod tests {
|
||||
Program::cross_zone_outbox(),
|
||||
);
|
||||
this.insert(Program::cross_zone_inbox().id(), Program::cross_zone_inbox());
|
||||
this.insert(Program::ping_sender().id(), Program::ping_sender());
|
||||
this.insert(Program::ping_receiver().id(), Program::ping_receiver());
|
||||
this
|
||||
};
|
||||
|
||||
|
||||
@ -18,6 +18,8 @@ testnet_initial_state.workspace = true
|
||||
faucet_core.workspace = true
|
||||
bridge_core.workspace = true
|
||||
vault_core.workspace = true
|
||||
cross_zone_inbox_core = { workspace = true, features = ["host"] }
|
||||
ping_core.workspace = true
|
||||
|
||||
logos-blockchain-key-management-system-service.workspace = true
|
||||
logos-blockchain-core.workspace = true
|
||||
|
||||
@ -10,6 +10,7 @@ use bytesize::ByteSize;
|
||||
use common::config::BasicAuth;
|
||||
use humantime_serde;
|
||||
use lee::AccountId;
|
||||
use lee_core::program::ProgramId;
|
||||
use logos_blockchain_core::mantle::ops::channel::ChannelId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
@ -27,6 +28,22 @@ pub enum GenesisAction {
|
||||
},
|
||||
}
|
||||
|
||||
/// A peer zone whose outbox this zone watches for inbound cross-zone messages.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct CrossZonePeer {
|
||||
/// The peer's Bedrock channel; its 32 bytes double as the peer's zone id.
|
||||
pub channel_id: [u8; 32],
|
||||
/// Programs on this zone a message from this peer is allowed to target.
|
||||
pub allowed_targets: Vec<ProgramId>,
|
||||
}
|
||||
|
||||
/// Cross-zone watcher configuration: the peers this zone reads from Bedrock and,
|
||||
/// per peer, the local programs they may deliver to. `None` disables the watcher.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct CrossZoneConfig {
|
||||
pub peers: Vec<CrossZonePeer>,
|
||||
}
|
||||
|
||||
// TODO: Provide default values
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct SequencerConfig {
|
||||
@ -53,6 +70,9 @@ pub struct SequencerConfig {
|
||||
/// Genesis configuration.
|
||||
#[serde(default)]
|
||||
pub genesis: Vec<GenesisAction>,
|
||||
/// Cross-zone messaging configuration. `None` disables the watcher.
|
||||
#[serde(default)]
|
||||
pub cross_zone: Option<CrossZoneConfig>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
|
||||
216
lez/sequencer/core/src/cross_zone_watcher.rs
Normal file
216
lez/sequencer/core/src/cross_zone_watcher.rs
Normal file
@ -0,0 +1,216 @@
|
||||
use std::{collections::BTreeMap, time::Duration};
|
||||
|
||||
use common::{block::Block, transaction::LeeTransaction};
|
||||
use cross_zone_inbox_core::{
|
||||
CrossZoneMessage, InboxConfig, build_inbox_dispatch_tx, inbox_config_account_id,
|
||||
};
|
||||
use futures::StreamExt as _;
|
||||
use lee::{AccountId, program::Program};
|
||||
use lee_core::{account::Account, program::ProgramId};
|
||||
use log::{error, info, warn};
|
||||
use logos_blockchain_core::mantle::ops::channel::ChannelId;
|
||||
use logos_blockchain_zone_sdk::{
|
||||
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
|
||||
};
|
||||
use mempool::MemPoolHandle;
|
||||
use ping_core::SenderInstruction;
|
||||
|
||||
use crate::{
|
||||
TransactionOrigin,
|
||||
config::{BedrockConfig, CrossZoneConfig},
|
||||
};
|
||||
|
||||
/// The inbox config account this zone seeds at startup so the inbox guest can
|
||||
/// authorize inbound peer messages. The config is zone-specific (self zone plus
|
||||
/// per-peer target allowlists), so it cannot live in the shared genesis state.
|
||||
#[must_use]
|
||||
pub fn inbox_config_account(self_zone: [u8; 32], cross_zone: &CrossZoneConfig) -> (AccountId, Account) {
|
||||
let inbox_id = Program::cross_zone_inbox().id();
|
||||
|
||||
let mut allowed_targets = BTreeMap::new();
|
||||
for peer in &cross_zone.peers {
|
||||
allowed_targets.insert(peer.channel_id, peer.allowed_targets.clone());
|
||||
}
|
||||
let config = InboxConfig {
|
||||
self_zone,
|
||||
allowed_peers: BTreeMap::new(),
|
||||
allowed_targets,
|
||||
};
|
||||
|
||||
let account = Account {
|
||||
program_owner: inbox_id,
|
||||
balance: 0,
|
||||
data: config
|
||||
.to_bytes()
|
||||
.try_into()
|
||||
.expect("inbox config fits in account data"),
|
||||
nonce: 0_u128.into(),
|
||||
};
|
||||
(inbox_config_account_id(inbox_id), account)
|
||||
}
|
||||
|
||||
/// Spawns one watcher task per configured peer. Each task reads the peer's
|
||||
/// finalized blocks from Bedrock, recognizes outbound messages addressed to this
|
||||
/// zone, and injects the matching inbox dispatch as a sequencer-origin
|
||||
/// transaction into the local mempool.
|
||||
pub fn spawn_watchers(
|
||||
bedrock_config: &BedrockConfig,
|
||||
cross_zone: &CrossZoneConfig,
|
||||
poll_interval: Duration,
|
||||
mempool_handle: MemPoolHandle<(TransactionOrigin, LeeTransaction)>,
|
||||
) {
|
||||
let self_zone: [u8; 32] = *bedrock_config.channel_id.as_ref();
|
||||
let inbox_id = Program::cross_zone_inbox().id();
|
||||
let emitter_id = Program::ping_sender().id();
|
||||
|
||||
for peer in cross_zone.peers.clone() {
|
||||
let node = NodeHttpClient::new(
|
||||
CommonHttpClient::new(bedrock_config.auth.clone().map(Into::into)),
|
||||
bedrock_config.node_url.clone(),
|
||||
);
|
||||
tokio::spawn(watch_peer(
|
||||
ZoneIndexer::new(ChannelId::from(peer.channel_id), node),
|
||||
peer.channel_id,
|
||||
peer.allowed_targets,
|
||||
self_zone,
|
||||
inbox_id,
|
||||
emitter_id,
|
||||
poll_interval,
|
||||
mempool_handle.clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::too_many_arguments,
|
||||
reason = "Each parameter is an independent piece of per-peer watcher state"
|
||||
)]
|
||||
async fn watch_peer(
|
||||
zone_indexer: ZoneIndexer<NodeHttpClient>,
|
||||
peer_zone: [u8; 32],
|
||||
allowed_targets: Vec<ProgramId>,
|
||||
self_zone: [u8; 32],
|
||||
inbox_id: ProgramId,
|
||||
emitter_id: ProgramId,
|
||||
poll_interval: Duration,
|
||||
mempool_handle: MemPoolHandle<(TransactionOrigin, LeeTransaction)>,
|
||||
) {
|
||||
info!("Cross-zone watcher started for peer {}", hex::encode(peer_zone));
|
||||
|
||||
let mut cursor = None;
|
||||
loop {
|
||||
let stream = match zone_indexer.next_messages(cursor).await {
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Watcher next_messages failed for peer {}: {err}",
|
||||
hex::encode(peer_zone)
|
||||
);
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
while let Some((msg, slot)) = stream.next().await {
|
||||
let zone_block = match msg {
|
||||
ZoneMessage::Block(block) => block,
|
||||
ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue,
|
||||
};
|
||||
match borsh::from_slice::<Block>(&zone_block.data) {
|
||||
Ok(block) => {
|
||||
deliver_block(
|
||||
&block,
|
||||
peer_zone,
|
||||
self_zone,
|
||||
inbox_id,
|
||||
emitter_id,
|
||||
&allowed_targets,
|
||||
&mempool_handle,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => error!("Watcher failed to deserialize peer block: {err}"),
|
||||
}
|
||||
cursor = Some(slot);
|
||||
}
|
||||
|
||||
// Stream ended (caught up to the peer's last finalized block); poll again.
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Scans one peer block for outbound messages and injects a dispatch per match.
|
||||
///
|
||||
/// Option A (M3): the watcher recognizes the demo emitter and reads the outbound
|
||||
/// message straight off its instruction. M4 replaces this with re-derivation
|
||||
/// from the outbox PDA write, which removes the emitter-specific decoding.
|
||||
async fn deliver_block(
|
||||
block: &Block,
|
||||
peer_zone: [u8; 32],
|
||||
self_zone: [u8; 32],
|
||||
inbox_id: ProgramId,
|
||||
emitter_id: ProgramId,
|
||||
allowed_targets: &[ProgramId],
|
||||
mempool_handle: &MemPoolHandle<(TransactionOrigin, LeeTransaction)>,
|
||||
) {
|
||||
for (index, tx) in block.body.transactions.iter().enumerate() {
|
||||
let LeeTransaction::Public(public_tx) = tx else {
|
||||
continue;
|
||||
};
|
||||
let message = public_tx.message();
|
||||
if message.program_id != emitter_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let SenderInstruction::Send {
|
||||
target_zone,
|
||||
target_program_id,
|
||||
target_accounts,
|
||||
payload,
|
||||
..
|
||||
} = match risc0_zkvm::serde::from_slice(&message.instruction_data) {
|
||||
Ok(send) => send,
|
||||
Err(err) => {
|
||||
warn!("Watcher could not decode emitter instruction: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if target_zone != self_zone {
|
||||
continue;
|
||||
}
|
||||
if !allowed_targets.contains(&target_program_id) {
|
||||
warn!(
|
||||
"Watcher dropping message to disallowed target from peer {}",
|
||||
hex::encode(peer_zone)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let cross_zone_message = CrossZoneMessage {
|
||||
src_zone: peer_zone,
|
||||
src_block_id: block.header.block_id,
|
||||
src_tx_index: u32::try_from(index).unwrap_or(u32::MAX),
|
||||
src_program_id: emitter_id,
|
||||
target_program_id,
|
||||
payload,
|
||||
l1_inclusion_witness: None,
|
||||
};
|
||||
let target_ids: Vec<AccountId> = target_accounts.into_iter().map(AccountId::new).collect();
|
||||
let dispatch = build_inbox_dispatch_tx(inbox_id, &cross_zone_message, target_ids);
|
||||
|
||||
match mempool_handle
|
||||
.push((TransactionOrigin::Sequencer, LeeTransaction::Public(dispatch)))
|
||||
.await
|
||||
{
|
||||
Ok(()) => info!(
|
||||
"Watcher injected cross-zone dispatch from peer {} block {} tx {}",
|
||||
hex::encode(peer_zone),
|
||||
block.header.block_id,
|
||||
index
|
||||
),
|
||||
Err(err) => error!("Watcher failed to enqueue inbox dispatch: {err}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -31,6 +31,7 @@ use crate::{
|
||||
pub mod block_publisher;
|
||||
pub mod block_store;
|
||||
pub mod config;
|
||||
pub mod cross_zone_watcher;
|
||||
|
||||
#[cfg(feature = "mock")]
|
||||
pub mod mock;
|
||||
@ -158,6 +159,17 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
|
||||
.expect("Failed to publish genesis block");
|
||||
}
|
||||
|
||||
// Cross-zone messaging: start a watcher per configured peer. The inbox
|
||||
// config account is seeded into genesis state in `build_genesis_state`.
|
||||
if let Some(cross_zone) = &config.cross_zone {
|
||||
cross_zone_watcher::spawn_watchers(
|
||||
&config.bedrock_config,
|
||||
cross_zone,
|
||||
config.block_create_timeout,
|
||||
mempool_handle.clone(),
|
||||
);
|
||||
}
|
||||
|
||||
let sequencer_core = Self {
|
||||
state,
|
||||
store,
|
||||
@ -609,6 +621,15 @@ fn build_genesis_state(config: &SequencerConfig) -> (lee::V03State, Vec<LeeTrans
|
||||
.map(LeeTransaction::Public)
|
||||
.collect();
|
||||
|
||||
// Seed this zone's cross-zone inbox config so the inbox guest can authorize
|
||||
// inbound peer messages (zone-specific config, not produced by any tx).
|
||||
if let Some(cross_zone) = &config.cross_zone {
|
||||
let self_zone = *config.bedrock_config.channel_id.as_ref();
|
||||
let (config_id, config_account) =
|
||||
cross_zone_watcher::inbox_config_account(self_zone, cross_zone);
|
||||
state.insert_genesis_account(config_id, config_account);
|
||||
}
|
||||
|
||||
(state, genesis_txs)
|
||||
}
|
||||
|
||||
@ -868,6 +889,7 @@ mod tests {
|
||||
},
|
||||
retry_pending_blocks_timeout: Duration::from_mins(4),
|
||||
genesis: vec![],
|
||||
cross_zone: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -6,7 +6,7 @@ use indexer_service::{ChannelId, ClientConfig, IndexerConfig};
|
||||
use key_protocol::key_management::KeyChain;
|
||||
use lee::{AccountId, PrivateKey, PublicKey};
|
||||
use lee_core::Identifier;
|
||||
use sequencer_core::config::{BedrockConfig, GenesisAction, SequencerConfig};
|
||||
use sequencer_core::config::{BedrockConfig, CrossZoneConfig, GenesisAction, SequencerConfig};
|
||||
use url::Url;
|
||||
use wallet::config::WalletConfig;
|
||||
|
||||
@ -68,6 +68,7 @@ pub fn sequencer_config(
|
||||
bedrock_addr: SocketAddr,
|
||||
genesis_transactions: Vec<GenesisAction>,
|
||||
channel_id: ChannelId,
|
||||
cross_zone: Option<CrossZoneConfig>,
|
||||
) -> Result<SequencerConfig> {
|
||||
let SequencerPartialConfig {
|
||||
max_num_tx_in_block,
|
||||
@ -91,6 +92,7 @@ pub fn sequencer_config(
|
||||
.context("Failed to convert bedrock addr to URL")?,
|
||||
auth: None,
|
||||
},
|
||||
cross_zone,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -344,6 +344,7 @@ impl TestContextBuilder {
|
||||
bedrock_addr,
|
||||
genesis,
|
||||
config::bedrock_channel_id(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.context("Failed to setup Sequencer")?;
|
||||
|
||||
@ -116,6 +116,7 @@ pub async fn setup_sequencer(
|
||||
bedrock_addr: SocketAddr,
|
||||
genesis_transactions: Vec<GenesisAction>,
|
||||
channel_id: ChannelId,
|
||||
cross_zone: Option<sequencer_core::config::CrossZoneConfig>,
|
||||
) -> Result<(SequencerHandle, TempDir)> {
|
||||
let temp_sequencer_dir =
|
||||
tempfile::tempdir().context("Failed to create temp dir for sequencer home")?;
|
||||
@ -131,6 +132,7 @@ pub async fn setup_sequencer(
|
||||
bedrock_addr,
|
||||
genesis_transactions,
|
||||
channel_id,
|
||||
cross_zone,
|
||||
)
|
||||
.context("Failed to create Sequencer config")?;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user