Merge pull request #389 from logos-blockchain/pg/zone-sdk-lez

feat(sequencer, indexer): Use zone-sdk instead of bedrock client
This commit is contained in:
Daniil Polyakov 2026-05-05 14:36:16 +03:00 committed by GitHub
commit 332bd29e93
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 2033 additions and 1337 deletions

1634
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -36,7 +36,6 @@ members = [
"examples/program_deployment",
"examples/program_deployment/methods",
"examples/program_deployment/methods/guest",
"bedrock_client",
"testnet_initial_state",
"indexer_ffi",
]
@ -67,7 +66,6 @@ amm_program = { path = "programs/amm" }
ata_core = { path = "programs/associated_token_account/core" }
ata_program = { path = "programs/associated_token_account" }
test_program_methods = { path = "test_program_methods" }
bedrock_client = { path = "bedrock_client" }
testnet_initial_state = { path = "testnet_initial_state" }
tokio = { version = "1.50", features = [
@ -122,11 +120,12 @@ tokio-retry = "0.3.0"
schemars = "1.2"
async-stream = "0.3.6"
logos-blockchain-common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "1da154c74b911318fb853d37261f8a05ffe513b4" }
logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "1da154c74b911318fb853d37261f8a05ffe513b4" }
logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "1da154c74b911318fb853d37261f8a05ffe513b4" }
logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "1da154c74b911318fb853d37261f8a05ffe513b4" }
logos-blockchain-chain-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "1da154c74b911318fb853d37261f8a05ffe513b4" }
logos-blockchain-common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "ee281a447d95a951752461ee0a6e88eb4a0f17cf" }
logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "ee281a447d95a951752461ee0a6e88eb4a0f17cf" }
logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "ee281a447d95a951752461ee0a6e88eb4a0f17cf" }
logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "ee281a447d95a951752461ee0a6e88eb4a0f17cf" }
logos-blockchain-chain-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "ee281a447d95a951752461ee0a6e88eb4a0f17cf" }
logos-blockchain-zone-sdk = { git = "https://github.com/logos-blockchain/logos-blockchain.git", rev = "ee281a447d95a951752461ee0a6e88eb4a0f17cf" }
rocksdb = { version = "0.24.0", default-features = false, features = [
"snappy",

View File

@ -39,42 +39,42 @@ cryptarchia:
threshold: 1
timestamp: 0
gossipsub_protocol: /integration/logos-blockchain/cryptarchia/proto/1.0.0
genesis_state:
mantle_tx:
ops:
genesis_block:
header:
version: Bedrock
parent_block: '0000000000000000000000000000000000000000000000000000000000000000'
slot: 0
block_root: b5f8787ac23674822414c70eea15d842da38f2e806ede1a73cf7b5cf0277da07
proof_of_leadership:
proof: '0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
entropy_contribution: '0000000000000000000000000000000000000000000000000000000000000000'
leader_key: '0000000000000000000000000000000000000000000000000000000000000000'
voucher_cm: '0000000000000000000000000000000000000000000000000000000000000000'
signature: '00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
transactions:
- mantle_tx:
ops:
- opcode: 0
payload:
inputs: [ ]
inputs: []
outputs:
- value: 1
pk: d204000000000000000000000000000000000000000000000000000000000000
- value: 100
pk: 2e03b2eff5a45478e7e79668d2a146cf2c5c7925bce927f2b1c67f2ab4fc0d26
- value: 1
pk: d204000000000000000000000000000000000000000000000000000000000000
- value: 100
pk: '2e03b2eff5a45478e7e79668d2a146cf2c5c7925bce927f2b1c67f2ab4fc0d26'
- value: 1
pk: ed266e6e887b9b97059dc1aa1b7b2e19b934291753c6336a163fe4ebaa28e717
- opcode: 17
payload:
channel_id: "0000000000000000000000000000000000000000000000000000000000000000"
inscription: [ 103, 101, 110, 101, 115, 105, 115 ] # "genesis" in bytes
parent: "0000000000000000000000000000000000000000000000000000000000000000"
signer: "0000000000000000000000000000000000000000000000000000000000000000"
execution_gas_price: 0
storage_gas_price: 0
ops_proofs:
- !ZkSig
pi_a: [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
]
pi_b: [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
]
pi_c: [
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
]
- NoProof
channel_id: '0000000000000000000000000000000000000000000000000000000000000000'
inscription: '67656e65736973'
parent: '0000000000000000000000000000000000000000000000000000000000000000'
signer: '0000000000000000000000000000000000000000000000000000000000000000'
execution_gas_price: 0
storage_gas_price: 0
ops_proofs:
- !Ed25519Sig '00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
- !Ed25519Sig '00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
time:
slot_duration: '1.0'
chain_start_time: PLACEHOLDER_CHAIN_START_TIME

View File

@ -1,7 +1,7 @@
services:
logos-blockchain-node-0:
image: ghcr.io/logos-blockchain/logos-blockchain@sha256:c5243681b353278cabb562a176f0a5cfbefc2056f18cebc47fe0e3720c29fb12
image: ghcr.io/logos-blockchain/logos-blockchain@sha256:9f1829dea335c56f6ff68ae37ea872ed5313b96b69e8ffe143c02b7217de85fc
ports:
- "${PORT:-8080}:18080/tcp"
volumes:

View File

@ -1,23 +0,0 @@
[package]
name = "bedrock_client"
version = "0.1.0"
edition = "2024"
license = { workspace = true }
[lints]
workspace = true
[dependencies]
common.workspace = true
reqwest.workspace = true
anyhow.workspace = true
tokio-retry.workspace = true
futures.workspace = true
log.workspace = true
serde.workspace = true
humantime-serde.workspace = true
logos-blockchain-common-http-client.workspace = true
logos-blockchain-core.workspace = true
logos-blockchain-chain-broadcast-service.workspace = true
logos-blockchain-chain-service.workspace = true

View File

@ -1,121 +0,0 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use common::config::BasicAuth;
use futures::{Stream, TryFutureExt as _};
#[expect(clippy::single_component_path_imports, reason = "Satisfy machete")]
use humantime_serde;
use log::{info, warn};
pub use logos_blockchain_chain_broadcast_service::BlockInfo;
use logos_blockchain_chain_service::CryptarchiaInfo;
pub use logos_blockchain_common_http_client::{CommonHttpClient, Error};
pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx};
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use tokio_retry::Retry;
/// Fibonacci backoff retry strategy configuration.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct BackoffConfig {
    /// Delay before the first retry; subsequent delays grow following a
    /// Fibonacci sequence. Serialized as a human-readable duration
    /// (e.g. "100ms") via `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub start_delay: Duration,
    /// Maximum number of retry attempts before giving up.
    pub max_retries: usize,
}
impl Default for BackoffConfig {
    /// Defaults: 100 ms initial delay, at most 5 retries.
    fn default() -> Self {
        let start_delay = Duration::from_millis(100);
        let max_retries = 5;
        Self {
            start_delay,
            max_retries,
        }
    }
}
/// Simple wrapper; maybe extend in the future for our purposes.
/// `Clone` is cheap because `CommonHttpClient` is internally reference counted (`Arc`).
#[derive(Clone)]
pub struct BedrockClient {
    // Shared HTTP client used for all node RPC calls.
    http_client: CommonHttpClient,
    // Base URL of the Bedrock node every request is sent to.
    node_url: Url,
    // Retry settings consumed by `backoff_strategy`.
    backoff: BackoffConfig,
}
impl BedrockClient {
pub fn new(backoff: BackoffConfig, node_url: Url, auth: Option<BasicAuth>) -> Result<Self> {
info!("Creating Bedrock client with node URL {node_url}");
let client = Client::builder()
//Add more fields if needed
.timeout(std::time::Duration::from_mins(1))
.build()
.context("Failed to build HTTP client")?;
let auth = auth.map(|a| {
logos_blockchain_common_http_client::BasicAuthCredentials::new(a.username, a.password)
});
let http_client = CommonHttpClient::new_with_client(client, auth);
Ok(Self {
http_client,
node_url,
backoff,
})
}
pub async fn post_transaction(&self, tx: SignedMantleTx) -> Result<Result<(), Error>, Error> {
Retry::spawn(self.backoff_strategy(), || async {
match self
.http_client
.post_transaction(self.node_url.clone(), tx.clone())
.await
{
Ok(()) => Ok(Ok(())),
Err(err) => match err {
// Retry arm.
// Retrying only reqwest errors: mainly connected to http.
Error::Request(_) => Err(err),
// Returning non-retryable error
Error::Server(_) | Error::Client(_) | Error::Url(_) => Ok(Err(err)),
},
}
})
.await
}
pub async fn get_lib_stream(&self) -> Result<impl Stream<Item = BlockInfo>, Error> {
self.http_client.get_lib_stream(self.node_url.clone()).await
}
pub async fn get_block_by_id(
&self,
header_id: HeaderId,
) -> Result<Option<Block<SignedMantleTx>>, Error> {
Retry::spawn(self.backoff_strategy(), || {
self.http_client
.get_block_by_id(self.node_url.clone(), header_id)
.inspect_err(|err| warn!("Block fetching failed with error: {err:#}"))
})
.await
}
pub async fn get_consensus_info(&self) -> Result<CryptarchiaInfo, Error> {
Retry::spawn(self.backoff_strategy(), || {
self.http_client
.consensus_info(self.node_url.clone())
.inspect_err(|err| warn!("Block fetching failed with error: {err:#}"))
})
.await
}
fn backoff_strategy(&self) -> impl Iterator<Item = Duration> {
let start_delay_millis = self
.backoff
.start_delay
.as_millis()
.try_into()
.expect("Start delay must be less than u64::MAX milliseconds");
tokio_retry::strategy::FibonacciBackoff::from_millis(start_delay_millis)
.take(self.backoff.max_retries)
}
}

View File

@ -1,12 +1,8 @@
{
"home": "./indexer/service",
"consensus_info_polling_interval": "1s",
"bedrock_client_config": {
"addr": "http://logos-blockchain-node-0:18080",
"backoff": {
"start_delay": "100ms",
"max_retries": 5
}
"bedrock_config": {
"addr": "http://logos-blockchain-node-0:18080"
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"initial_accounts": [

View File

@ -9,7 +9,7 @@ workspace = true
[dependencies]
common.workspace = true
bedrock_client.workspace = true
logos-blockchain-zone-sdk.workspace = true
nssa.workspace = true
nssa_core.workspace = true
storage.workspace = true
@ -19,13 +19,13 @@ anyhow.workspace = true
log.workspace = true
serde.workspace = true
humantime-serde.workspace = true
tokio.workspace = true
borsh.workspace = true
futures.workspace = true
url.workspace = true
logos-blockchain-core.workspace = true
serde_json.workspace = true
async-stream.workspace = true
tokio.workspace = true
[dev-dependencies]
tempfile.workspace = true

View File

@ -1,11 +1,12 @@
use std::{path::Path, sync::Arc};
use anyhow::Result;
use bedrock_client::HeaderId;
use anyhow::{Context as _, Result};
use common::{
block::{BedrockStatus, Block},
transaction::{NSSATransaction, clock_invocation},
};
use logos_blockchain_core::{header::HeaderId, mantle::ops::channel::MsgId};
use logos_blockchain_zone_sdk::Slot;
use nssa::{Account, AccountId, V03State};
use nssa_core::BlockId;
use storage::indexer::RocksDBIO;
@ -103,6 +104,22 @@ impl IndexerStore {
Ok(self.dbio.calculate_state_for_id(block_id)?)
}
pub fn get_zone_cursor(&self) -> Result<Option<(MsgId, Slot)>> {
    // Read the persisted cursor, if any, and decode it from JSON.
    match self.dbio.get_zone_sdk_indexer_cursor_bytes()? {
        None => Ok(None),
        Some(bytes) => {
            let cursor = serde_json::from_slice::<(MsgId, Slot)>(&bytes)
                .context("Failed to deserialize stored zone-sdk indexer cursor")?;
            Ok(Some(cursor))
        }
    }
}
pub fn set_zone_cursor(&self, cursor: &(MsgId, Slot)) -> Result<()> {
    // Encode to JSON and persist; the stored bytes are read back by
    // `get_zone_cursor`.
    let encoded =
        serde_json::to_vec(cursor).context("Failed to serialize zone-sdk indexer cursor")?;
    self.dbio.put_zone_sdk_indexer_cursor_bytes(&encoded)?;
    Ok(())
}
/// Recalculation of final state directly from DB.
///
/// Used for indexer healthcheck.

View File

@ -6,7 +6,6 @@ use std::{
};
use anyhow::{Context as _, Result};
pub use bedrock_client::BackoffConfig;
use common::config::BasicAuth;
use humantime_serde;
pub use logos_blockchain_core::mantle::ops::channel::ChannelId;
@ -16,8 +15,6 @@ use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientConfig {
/// For individual RPC requests we use Fibonacci backoff retry strategy.
pub backoff: BackoffConfig,
pub addr: Url,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub auth: Option<BasicAuth>,
@ -31,7 +28,7 @@ pub struct IndexerConfig {
pub signing_key: [u8; 32],
#[serde(with = "humantime_serde")]
pub consensus_info_polling_interval: Duration,
pub bedrock_client_config: ClientConfig,
pub bedrock_config: ClientConfig,
pub channel_id: ChannelId,
#[serde(skip_serializing_if = "Option::is_none")]
pub initial_public_accounts: Option<Vec<PublicAccountPublicInitialData>>,

View File

@ -1,15 +1,14 @@
use std::collections::VecDeque;
use std::sync::Arc;
use anyhow::Result;
use bedrock_client::{BedrockClient, HeaderId};
use common::{
HashType, PINATA_BASE58,
block::{Block, HashableBlockData},
};
use log::{debug, error, info};
use logos_blockchain_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
use common::block::{Block, HashableBlockData};
// ToDo: Remove after testnet
use common::{HashType, PINATA_BASE58};
use futures::StreamExt as _;
use log::{error, info, warn};
use logos_blockchain_core::header::HeaderId;
use logos_blockchain_zone_sdk::{
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
};
use nssa::V03State;
use testnet_initial_state::initial_state_testnet;
@ -21,25 +20,11 @@ pub mod config;
#[derive(Clone)]
pub struct IndexerCore {
pub bedrock_client: BedrockClient,
pub zone_indexer: Arc<ZoneIndexer<NodeHttpClient>>,
pub config: IndexerConfig,
pub store: IndexerStore,
}
#[derive(Clone)]
/// This struct represents one L1 block data fetched from backfilling.
pub struct BackfillBlockData {
l2_blocks: Vec<Block>,
l1_header: HeaderId,
}
#[derive(Clone)]
/// This struct represents data fetched from backfilling in one iteration.
pub struct BackfillData {
block_data: VecDeque<BackfillBlockData>,
curr_fin_l1_lib_header: HeaderId,
}
impl IndexerCore {
pub fn new(config: IndexerConfig) -> Result<Self> {
let hashable_data = HashableBlockData {
@ -107,279 +92,88 @@ impl IndexerCore {
let home = config.home.join("rocksdb");
let basic_auth = config.bedrock_config.auth.clone().map(Into::into);
let node = NodeHttpClient::new(
CommonHttpClient::new(basic_auth),
config.bedrock_config.addr.clone(),
);
let zone_indexer = ZoneIndexer::new(config.channel_id, node);
Ok(Self {
bedrock_client: BedrockClient::new(
config.bedrock_client_config.backoff,
config.bedrock_client_config.addr.clone(),
config.bedrock_client_config.auth.clone(),
)?,
zone_indexer: Arc::new(zone_indexer),
config,
store: IndexerStore::open_db_with_genesis(&home, &genesis_block, &state)?,
})
}
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> {
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> + '_ {
let poll_interval = self.config.consensus_info_polling_interval;
let initial_cursor = self
.store
.get_zone_cursor()
.expect("Failed to load zone-sdk indexer cursor");
async_stream::stream! {
info!("Searching for initial header");
let mut cursor = initial_cursor;
let last_stored_l1_lib_header = self.store.last_observed_l1_lib_header()?;
let mut prev_last_l1_lib_header = if let Some(last_l1_lib_header) = last_stored_l1_lib_header {
info!("Last l1 lib header found: {last_l1_lib_header}");
last_l1_lib_header
if cursor.is_some() {
info!("Resuming indexer from cursor {cursor:?}");
} else {
info!("Last l1 lib header not found in DB");
info!("Searching for the start of a channel");
let BackfillData {
block_data: start_buff,
curr_fin_l1_lib_header: last_l1_lib_header,
} = self.search_for_channel_start().await?;
for BackfillBlockData {
l2_blocks: l2_block_vec,
l1_header,
} in start_buff {
let mut l2_blocks_parsed_ids: Vec<_> = l2_block_vec.iter().map(|block| block.header.block_id).collect();
l2_blocks_parsed_ids.sort_unstable();
info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids);
for l2_block in l2_block_vec {
// TODO: proper fix is to make the sequencer's genesis include a
// trailing `clock_invocation(0)` (and have the indexer's
// `open_db_with_genesis` not pre-apply state transitions) so the
// inscribed genesis can flow through `put_block` like any other
// block. For now we skip re-applying it.
//
// The channel-start (block_id == 1) is the sequencer's genesis
// inscription that we re-discover during initial search. The
// indexer already has its own locally-constructed genesis in
// the store from `open_db_with_genesis`, so re-applying the
// inscribed copy is both redundant and would fail the strict
// block validation in `put_block` (the inscribed genesis lacks
// the trailing clock invocation).
if l2_block.header.block_id != 1 {
self
.store
.put_block(l2_block.clone(), l1_header)
.await
.inspect_err(|err| error!("Failed to put block with err {err:?}"))?;
}
yield Ok(l2_block);
}
}
last_l1_lib_header
};
info!("Searching for initial header finished");
info!("Starting backfilling from {prev_last_l1_lib_header}");
info!("Starting indexer from beginning of channel");
}
loop {
let BackfillData {
block_data: buff,
curr_fin_l1_lib_header,
} = self
.backfill_to_last_l1_lib_header_id(prev_last_l1_lib_header, &self.config.channel_id)
.await
.inspect_err(|err| error!("Failed to backfill to last l1 lib header id with err {err:#?}"))?;
prev_last_l1_lib_header = curr_fin_l1_lib_header;
for BackfillBlockData {
l2_blocks: l2_block_vec,
l1_header: header,
} in buff {
let mut l2_blocks_parsed_ids: Vec<_> = l2_block_vec.iter().map(|block| block.header.block_id).collect();
l2_blocks_parsed_ids.sort_unstable();
info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids);
for l2_block in l2_block_vec {
self.store.put_block(l2_block.clone(), header).await?;
yield Ok(l2_block);
let stream = match self.zone_indexer.next_messages(cursor).await {
Ok(s) => s,
Err(err) => {
error!("Failed to start zone-sdk next_messages stream: {err}");
tokio::time::sleep(poll_interval).await;
continue;
}
}
}
}
}
async fn get_lib(&self) -> Result<HeaderId> {
Ok(self.bedrock_client.get_consensus_info().await?.lib)
}
async fn get_next_lib(&self, prev_lib: HeaderId) -> Result<HeaderId> {
loop {
let next_lib = self.get_lib().await?;
if next_lib == prev_lib {
info!(
"Wait {:?} to not spam the node",
self.config.consensus_info_polling_interval
);
tokio::time::sleep(self.config.consensus_info_polling_interval).await;
} else {
break Ok(next_lib);
}
}
}
/// WARNING: depending on channel state,
/// may take indefinite amount of time.
pub async fn search_for_channel_start(&self) -> Result<BackfillData> {
let mut curr_last_l1_lib_header = self.get_lib().await?;
let mut backfill_start = curr_last_l1_lib_header;
// ToDo: How to get root?
let mut backfill_limit = HeaderId::from([0; 32]);
// ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory
// Don't want to complicate DB even more right now.
let mut block_buffer = VecDeque::new();
'outer: loop {
let mut cycle_header = curr_last_l1_lib_header;
loop {
let Some(cycle_block) = self.bedrock_client.get_block_by_id(cycle_header).await?
else {
// First run can reach root easily
// so here we are optimistic about L1
// failing to get parent.
break;
};
let mut stream = std::pin::pin!(stream);
// It would be better to have id, but block does not have it, so slot will do.
info!(
"INITIAL SEARCH: Observed L1 block at slot {}",
cycle_block.header().slot().into_inner()
);
debug!(
"INITIAL SEARCH: This block header is {}",
cycle_block.header().id()
);
debug!(
"INITIAL SEARCH: This block parent is {}",
cycle_block.header().parent()
);
while let Some((msg, slot)) = stream.next().await {
let zone_block = match msg {
ZoneMessage::Block(b) => b,
// Non-block messages don't carry a cursor position; the
// next ZoneBlock advances past them implicitly.
ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue,
};
let (l2_block_vec, l1_header) =
parse_block_owned(&cycle_block, &self.config.channel_id);
let block: Block = match borsh::from_slice(&zone_block.data) {
Ok(b) => b,
Err(e) => {
error!("Failed to deserialize L2 block from zone-sdk: {e}");
// Advance past the broken inscription so we don't
// re-process it on restart.
cursor = Some((zone_block.id, slot));
if let Err(err) = self.store.set_zone_cursor(&(zone_block.id, slot)) {
warn!("Failed to persist indexer cursor: {err:#}");
}
continue;
}
};
info!("Parsed {} L2 blocks", l2_block_vec.len());
info!("Indexed L2 block {}", block.header.block_id);
if !l2_block_vec.is_empty() {
block_buffer.push_front(BackfillBlockData {
l2_blocks: l2_block_vec.clone(),
l1_header,
});
}
if let Some(first_l2_block) = l2_block_vec.first()
&& first_l2_block.header.block_id == 1
{
info!("INITIAL_SEARCH: Found channel start");
break 'outer;
}
// Step back to parent
let parent = cycle_block.header().parent();
if parent == backfill_limit {
break;
}
cycle_header = parent;
}
info!("INITIAL_SEARCH: Reached backfill limit, refetching last l1 lib header");
block_buffer.clear();
backfill_limit = backfill_start;
curr_last_l1_lib_header = self.get_next_lib(curr_last_l1_lib_header).await?;
backfill_start = curr_last_l1_lib_header;
}
Ok(BackfillData {
block_data: block_buffer,
curr_fin_l1_lib_header: curr_last_l1_lib_header,
})
}
pub async fn backfill_to_last_l1_lib_header_id(
&self,
last_fin_l1_lib_header: HeaderId,
channel_id: &ChannelId,
) -> Result<BackfillData> {
let curr_fin_l1_lib_header = self.get_next_lib(last_fin_l1_lib_header).await?;
// ToDo: Not scalable, buffer should be stored in DB to not run out of memory
// Don't want to complicate DB even more right now.
let mut block_buffer = VecDeque::new();
let mut cycle_header = curr_fin_l1_lib_header;
loop {
let Some(cycle_block) = self.bedrock_client.get_block_by_id(cycle_header).await? else {
return Err(anyhow::anyhow!("Parent not found"));
};
if cycle_block.header().id() == last_fin_l1_lib_header {
break;
}
// Step back to parent
cycle_header = cycle_block.header().parent();
// It would be better to have id, but block does not have it, so slot will do.
info!(
"Observed L1 block at slot {}",
cycle_block.header().slot().into_inner()
);
let (l2_block_vec, l1_header) = parse_block_owned(&cycle_block, channel_id);
info!("Parsed {} L2 blocks", l2_block_vec.len());
if !l2_block_vec.is_empty() {
block_buffer.push_front(BackfillBlockData {
l2_blocks: l2_block_vec,
l1_header,
});
}
}
Ok(BackfillData {
block_data: block_buffer,
curr_fin_l1_lib_header,
})
}
}
fn parse_block_owned(
l1_block: &bedrock_client::Block<SignedMantleTx>,
decoded_channel_id: &ChannelId,
) -> (Vec<Block>, HeaderId) {
(
#[expect(
clippy::wildcard_enum_match_arm,
reason = "We are only interested in channel inscription ops, so it's fine to ignore the rest"
)]
l1_block
.transactions()
.flat_map(|tx| {
tx.mantle_tx.ops.iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == decoded_channel_id => {
borsh::from_slice::<Block>(inscription)
.inspect_err(|err| {
error!("Failed to deserialize our inscription with err: {err:#?}");
})
.ok()
// TODO: Remove l1_header placeholder once storage layer
// no longer requires it. Zone-sdk handles L1 tracking internally.
let placeholder_l1_header = HeaderId::from([0_u8; 32]);
if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await {
error!("Failed to store block {}: {err:#}", block.header.block_id);
}
_ => None,
})
})
.collect(),
l1_block.header().id(),
)
cursor = Some((zone_block.id, slot));
if let Err(err) = self.store.set_zone_cursor(&(zone_block.id, slot)) {
warn!("Failed to persist indexer cursor: {err:#}");
}
yield Ok(block);
}
// Stream ended (caught up to LIB). Sleep then poll again.
tokio::time::sleep(poll_interval).await;
}
}
}
}

View File

@ -1,12 +1,8 @@
{
"home": ".",
"consensus_info_polling_interval": "1s",
"bedrock_client_config": {
"addr": "http://localhost:8080",
"backoff": {
"start_delay": "100ms",
"max_retries": 5
}
"bedrock_config": {
"addr": "http://localhost:8080"
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"initial_accounts": [

View File

@ -19,8 +19,9 @@ indexer_service.workspace = true
serde_json.workspace = true
token_core.workspace = true
ata_core.workspace = true
indexer_service_rpc.workspace = true
indexer_service_rpc = { workspace = true, features = ["client"] }
sequencer_service_rpc = { workspace = true, features = ["client"] }
jsonrpsee = { workspace = true, features = ["ws-client"] }
wallet-ffi.workspace = true
indexer_ffi.workspace = true
testnet_initial_state.workspace = true

View File

@ -2,7 +2,7 @@ use std::{net::SocketAddr, path::PathBuf, time::Duration};
use anyhow::{Context as _, Result};
use bytesize::ByteSize;
use indexer_service::{BackoffConfig, ChannelId, ClientConfig, IndexerConfig};
use indexer_service::{ChannelId, ClientConfig, IndexerConfig};
use key_protocol::key_management::KeyChain;
use nssa::{Account, AccountId, PrivateKey, PublicKey};
use nssa_core::{account::Data, program::DEFAULT_PROGRAM_ID};
@ -164,35 +164,10 @@ impl std::fmt::Display for UrlProtocol {
}
}
pub fn indexer_config(
bedrock_addr: SocketAddr,
home: PathBuf,
initial_data: &InitialData,
) -> Result<IndexerConfig> {
Ok(IndexerConfig {
home,
consensus_info_polling_interval: Duration::from_secs(1),
bedrock_client_config: ClientConfig {
addr: addr_to_url(UrlProtocol::Http, bedrock_addr)
.context("Failed to convert bedrock addr to URL")?,
auth: None,
backoff: BackoffConfig {
start_delay: Duration::from_millis(100),
max_retries: 10,
},
},
initial_public_accounts: Some(initial_data.sequencer_initial_public_accounts()),
initial_private_accounts: Some(initial_data.sequencer_initial_private_accounts()),
signing_key: [37; 32],
channel_id: bedrock_channel_id(),
})
}
pub fn sequencer_config(
partial: SequencerPartialConfig,
home: PathBuf,
bedrock_addr: SocketAddr,
indexer_addr: SocketAddr,
initial_data: &InitialData,
) -> Result<SequencerConfig> {
let SequencerPartialConfig {
@ -215,17 +190,11 @@ pub fn sequencer_config(
initial_private_accounts: Some(initial_data.sequencer_initial_private_accounts()),
signing_key: [37; 32],
bedrock_config: BedrockConfig {
backoff: BackoffConfig {
start_delay: Duration::from_millis(100),
max_retries: 5,
},
channel_id: bedrock_channel_id(),
node_url: addr_to_url(UrlProtocol::Http, bedrock_addr)
.context("Failed to convert bedrock addr to URL")?,
auth: None,
},
indexer_rpc_url: addr_to_url(UrlProtocol::Ws, indexer_addr)
.context("Failed to convert indexer addr to URL")?,
})
}
@ -245,6 +214,26 @@ pub fn wallet_config(
})
}
pub fn indexer_config(
    bedrock_addr: SocketAddr,
    home: PathBuf,
    initial_data: &InitialData,
) -> Result<IndexerConfig> {
    // The URL conversion is the only fallible step; do it up front so the
    // struct literal below is pure assembly.
    let addr = addr_to_url(UrlProtocol::Http, bedrock_addr)
        .context("Failed to convert bedrock addr to URL")?;
    Ok(IndexerConfig {
        home,
        consensus_info_polling_interval: Duration::from_secs(1),
        bedrock_config: ClientConfig { addr, auth: None },
        initial_public_accounts: Some(initial_data.sequencer_initial_public_accounts()),
        initial_private_accounts: Some(initial_data.sequencer_initial_private_accounts()),
        signing_key: [37; 32],
        channel_id: bedrock_channel_id(),
    })
}
pub fn addr_to_url(protocol: UrlProtocol, addr: SocketAddr) -> Result<Url> {
// Convert 0.0.0.0 to 127.0.0.1 for client connections
// When binding to port 0, the server binds to 0.0.0.0:<random_port>

View File

@ -0,0 +1,34 @@
//! Thin client wrapper for querying the indexer's JSON-RPC API in tests.
//!
//! The sequencer doesn't depend on the indexer at runtime — finalization comes
//! from zone-sdk events. This wrapper exists purely for test ergonomics so
//! integration tests can construct a single connection and call
//! `indexer_service_rpc::RpcClient` methods directly via `Deref`.
use std::ops::Deref;
use anyhow::{Context as _, Result};
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use log::info;
use url::Url;
pub struct IndexerClient(WsClient);
impl IndexerClient {
    /// Opens a websocket connection to the indexer's JSON-RPC endpoint.
    ///
    /// # Errors
    /// Fails if the websocket connection cannot be established.
    pub async fn new(indexer_url: &Url) -> Result<Self> {
        info!("Connecting to Indexer at {indexer_url}");
        let ws = WsClientBuilder::default()
            .build(indexer_url)
            .await
            .context("Failed to create websocket client")?;
        Ok(Self(ws))
    }
}
// Deref to the inner `WsClient` so callers can invoke
// `indexer_service_rpc::RpcClient` methods directly on `IndexerClient`.
impl Deref for IndexerClient {
    type Target = WsClient;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

View File

@ -9,16 +9,19 @@ use indexer_service::IndexerHandle;
use log::{debug, error};
use nssa::{AccountId, PrivacyPreservingTransaction};
use nssa_core::Commitment;
use sequencer_core::indexer_client::{IndexerClient, IndexerClientTrait as _};
use sequencer_service::SequencerHandle;
use sequencer_service_rpc::{RpcClient as _, SequencerClient, SequencerClientBuilder};
use tempfile::TempDir;
use testcontainers::compose::DockerCompose;
use wallet::WalletCore;
use crate::setup::{setup_bedrock_node, setup_indexer, setup_sequencer, setup_wallet};
use crate::{
indexer_client::IndexerClient,
setup::{setup_bedrock_node, setup_indexer, setup_sequencer, setup_wallet},
};
pub mod config;
pub mod indexer_client;
pub mod setup;
pub mod test_context_ffi;
@ -77,14 +80,10 @@ impl TestContext {
.await
.context("Failed to setup Indexer")?;
let (sequencer_handle, temp_sequencer_dir) = setup_sequencer(
sequencer_partial_config,
bedrock_addr,
indexer_handle.addr(),
&initial_data,
)
.await
.context("Failed to setup Sequencer")?;
let (sequencer_handle, temp_sequencer_dir) =
setup_sequencer(sequencer_partial_config, bedrock_addr, &initial_data)
.await
.context("Failed to setup Sequencer")?;
let (wallet, temp_wallet_dir, wallet_password) =
setup_wallet(sequencer_handle.addr(), &initial_data)

View File

@ -119,7 +119,6 @@ pub(crate) async fn setup_indexer(
pub(crate) async fn setup_sequencer(
partial: config::SequencerPartialConfig,
bedrock_addr: SocketAddr,
indexer_addr: SocketAddr,
initial_data: &config::InitialData,
) -> Result<(SequencerHandle, TempDir)> {
let temp_sequencer_dir =
@ -134,7 +133,6 @@ pub(crate) async fn setup_sequencer(
partial,
temp_sequencer_dir.path().to_owned(),
bedrock_addr,
indexer_addr,
initial_data,
)
.context("Failed to create Sequencer config")?;

View File

@ -6,7 +6,6 @@ use indexer_ffi::IndexerServiceFFI;
use indexer_service_rpc::RpcClient as _;
use log::{debug, error};
use nssa::AccountId;
use sequencer_core::indexer_client::{IndexerClient, IndexerClientTrait as _};
use sequencer_service::SequencerHandle;
use sequencer_service_rpc::{RpcClient as _, SequencerClient, SequencerClientBuilder};
use tempfile::TempDir;
@ -15,6 +14,7 @@ use wallet::WalletCore;
use crate::{
BEDROCK_SERVICE_WITH_OPEN_PORT, LOGGER, TestContextBuilder, config,
indexer_client::IndexerClient,
setup::{setup_bedrock_node, setup_indexer_ffi, setup_sequencer, setup_wallet},
};
@ -85,8 +85,6 @@ impl TestContextFFI {
.block_on(setup_sequencer(
sequencer_partial_config,
bedrock_addr,
// SAFETY: addr is valid if indexer_ffi is valid.
unsafe { indexer_ffi.addr() },
initial_data,
))
.context("Failed to setup Sequencer")?;

View File

@ -13,7 +13,7 @@ nssa_core.workspace = true
common.workspace = true
storage.workspace = true
mempool.workspace = true
bedrock_client.workspace = true
logos-blockchain-zone-sdk.workspace = true
testnet_initial_state.workspace = true
anyhow.workspace = true
@ -30,7 +30,6 @@ rand.workspace = true
borsh.workspace = true
bytesize.workspace = true
url.workspace = true
jsonrpsee = { workspace = true, features = ["ws-client"] }
[features]
default = []

View File

@ -0,0 +1,136 @@
use std::{sync::Arc, time::Duration};
use anyhow::{Context as _, Result, anyhow};
use common::block::Block;
use log::warn;
pub use logos_blockchain_core::mantle::ops::channel::MsgId;
pub use logos_blockchain_key_management_system_service::keys::Ed25519Key;
pub use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
use logos_blockchain_zone_sdk::{
CommonHttpClient,
adapter::NodeHttpClient,
sequencer::{Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, ZoneSequencer},
state::InscriptionInfo,
};
use tokio::task::JoinHandle;
use crate::config::BedrockConfig;
/// Sink for `Event::Published` checkpoints emitted by the drive task.
/// Invoked from the spawned drive task, hence the `Send + Sync` bounds.
/// Caller is responsible for persistence (e.g. writing to rocksdb).
pub type CheckpointSink = Box<dyn Fn(SequencerCheckpoint) + Send + Sync + 'static>;
/// Sink for finalized L2 block ids derived from `Event::TxsFinalized` and
/// `Event::FinalizedInscriptions`. Receives the highest finalized block id
/// found in a batch of inscriptions. Invoked from the spawned drive task.
/// Caller is responsible for cleanup
/// (e.g. marking pending blocks as finalized in storage).
pub type FinalizedBlockSink = Box<dyn Fn(u64) + Send + Sync + 'static>;
#[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")]
pub trait BlockPublisherTrait: Clone {
    /// Create a publisher for the channel described by `config`.
    ///
    /// * `bedrock_signing_key` — key used to sign L1 inscriptions.
    /// * `resubmit_interval` — how often unconfirmed payloads are resubmitted.
    /// * `initial_checkpoint` — last persisted zone-sdk checkpoint, if any
    ///   (`None` on a fresh start).
    /// * `on_checkpoint` / `on_finalized_block` — sinks driven by the
    ///   publisher's background task; see the sink type aliases above.
    async fn new(
        config: &BedrockConfig,
        bedrock_signing_key: Ed25519Key,
        resubmit_interval: Duration,
        initial_checkpoint: Option<SequencerCheckpoint>,
        on_checkpoint: CheckpointSink,
        on_finalized_block: FinalizedBlockSink,
    ) -> Result<Self>;
    /// Fire-and-forget publish. Zone-sdk drives the actual submission and
    /// retries internally; this just hands the payload off.
    async fn publish_block(&self, block: &Block) -> Result<()>;
}
/// Real block publisher backed by zone-sdk's `ZoneSequencer`.
///
/// Cloning is cheap: clones share the same `SequencerHandle` and the same
/// `Arc`-held drive-task guard.
#[derive(Clone)]
pub struct ZoneSdkPublisher {
    // Handle used by `publish_block` to hand payloads to zone-sdk.
    handle: SequencerHandle<NodeHttpClient>,
    // Aborts the drive task when the last clone is dropped.
    _drive_task: Arc<DriveTaskGuard>,
}
// RAII wrapper: dropping the guard aborts the wrapped tokio task, so the
// event-pumping loop cannot outlive the last `ZoneSdkPublisher` clone.
struct DriveTaskGuard(JoinHandle<()>);
impl Drop for DriveTaskGuard {
    fn drop(&mut self) {
        self.0.abort();
    }
}
impl BlockPublisherTrait for ZoneSdkPublisher {
    /// Builds the zone-sdk sequencer, spawns the background drive task that
    /// pumps its events into the provided sinks, and waits until the handle
    /// reports readiness before returning.
    async fn new(
        config: &BedrockConfig,
        bedrock_signing_key: Ed25519Key,
        resubmit_interval: Duration,
        initial_checkpoint: Option<SequencerCheckpoint>,
        on_checkpoint: CheckpointSink,
        on_finalized_block: FinalizedBlockSink,
    ) -> Result<Self> {
        // HTTP client towards the L1 node; basic auth is optional.
        let auth = config.auth.clone().map(Into::into);
        let node_client =
            NodeHttpClient::new(CommonHttpClient::new(auth), config.node_url.clone());

        // Only the resubmit interval deviates from the sdk defaults.
        let sdk_config = ZoneSdkSequencerConfig {
            resubmit_interval,
            ..ZoneSdkSequencerConfig::default()
        };

        let (mut sequencer, mut handle) = ZoneSequencer::init_with_config(
            config.channel_id,
            bedrock_signing_key,
            node_client,
            sdk_config,
            initial_checkpoint,
        );

        // Drive task: owns the sequencer and forwards its events to the sinks.
        // NOTE(review): a `None` from `next_event()` is retried immediately;
        // if `None` means "stream closed" this busy-spins until the guard
        // aborts the task — confirm against zone-sdk's `next_event` contract.
        let event_loop = tokio::spawn(async move {
            loop {
                match sequencer.next_event().await {
                    None => continue,
                    Some(Event::Published { checkpoint, .. }) => on_checkpoint(checkpoint),
                    Some(
                        Event::TxsFinalized { inscriptions, .. }
                        | Event::FinalizedInscriptions { inscriptions },
                    ) => {
                        if let Some(highest) = max_block_id_from_inscriptions(&inscriptions) {
                            on_finalized_block(highest);
                        }
                    }
                    // Nothing to persist or clean up for these.
                    Some(Event::ChannelUpdate { .. } | Event::Ready) => {}
                }
            }
        });

        handle.wait_ready().await;

        Ok(Self {
            handle,
            _drive_task: Arc::new(DriveTaskGuard(event_loop)),
        })
    }

    /// Serializes the block with borsh and hands it to zone-sdk, which
    /// performs the actual L1 submission (and retries) internally.
    async fn publish_block(&self, block: &Block) -> Result<()> {
        let payload = borsh::to_vec(block).context("Failed to serialize block")?;
        self.handle
            .publish_message(payload)
            .await
            .map_err(|e| anyhow!("zone-sdk publish failed: {e}"))?;
        Ok(())
    }
}
/// Deserialize each inscription payload as a `Block` and return the highest
/// `block_id` seen; `None` if no payload decodes. Bad payloads are logged
/// and skipped rather than aborting the scan.
fn max_block_id_from_inscriptions(inscriptions: &[InscriptionInfo]) -> Option<u64> {
    let mut highest: Option<u64> = None;
    for inscription in inscriptions {
        match borsh::from_slice::<Block>(&inscription.payload) {
            Ok(block) => {
                let id = block.header.block_id;
                // Keep the running maximum across all decodable payloads.
                highest = Some(highest.map_or(id, |current| current.max(id)));
            }
            Err(err) => {
                warn!("Failed to deserialize finalized inscription as Block: {err:#}");
            }
        }
    }
    highest
}

View File

@ -1,116 +0,0 @@
use anyhow::{Context as _, Result};
use bedrock_client::BedrockClient;
pub use common::block::Block;
pub use logos_blockchain_core::mantle::{MantleTx, SignedMantleTx, ops::channel::MsgId};
use logos_blockchain_core::mantle::{
Op, OpProof, Transaction as _,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
pub use logos_blockchain_key_management_system_service::keys::Ed25519Key;
use logos_blockchain_key_management_system_service::keys::Ed25519PublicKey;
use crate::config::BedrockConfig;
#[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")]
pub trait BlockSettlementClientTrait: Clone {
//// Create a new client.
fn new(config: &BedrockConfig, signing_key: Ed25519Key) -> Result<Self>;
/// Get the bedrock channel ID used by this client.
fn bedrock_channel_id(&self) -> ChannelId;
/// Get the bedrock signing key used by this client.
fn bedrock_signing_key(&self) -> &Ed25519Key;
/// Post a transaction to the node.
async fn submit_inscribe_tx_to_bedrock(&self, tx: SignedMantleTx) -> Result<()>;
/// Create and sign a transaction for inscribing data.
fn create_inscribe_tx(&self, block: &Block) -> Result<(SignedMantleTx, MsgId)> {
let inscription_data = borsh::to_vec(block)?;
log::debug!(
"The size of the block {} is {} bytes",
block.header.block_id,
inscription_data.len()
);
let verifying_key_bytes = self.bedrock_signing_key().public_key().to_bytes();
let verifying_key =
Ed25519PublicKey::from_bytes(&verifying_key_bytes).expect("valid ed25519 public key");
let inscribe_op = InscriptionOp {
channel_id: self.bedrock_channel_id(),
inscription: inscription_data,
parent: block.bedrock_parent_id.into(),
signer: verifying_key,
};
let inscribe_op_id = inscribe_op.id();
let inscribe_tx = MantleTx {
ops: vec![Op::ChannelInscribe(inscribe_op)],
// Altruistic test config
storage_gas_price: 0.into(),
execution_gas_price: 0.into(),
};
let tx_hash = inscribe_tx.hash();
let signature_bytes = self
.bedrock_signing_key()
.sign_payload(tx_hash.as_signing_bytes().as_ref())
.to_bytes();
let signature =
logos_blockchain_key_management_system_service::keys::Ed25519Signature::from_bytes(
&signature_bytes,
);
let signed_mantle_tx = SignedMantleTx {
ops_proofs: vec![OpProof::Ed25519Sig(signature)],
mantle_tx: inscribe_tx,
};
Ok((signed_mantle_tx, inscribe_op_id))
}
}
/// A component that posts block data to logos blockchain.
#[derive(Clone)]
pub struct BlockSettlementClient {
client: BedrockClient,
signing_key: Ed25519Key,
channel_id: ChannelId,
}
impl BlockSettlementClientTrait for BlockSettlementClient {
fn new(config: &BedrockConfig, signing_key: Ed25519Key) -> Result<Self> {
let client =
BedrockClient::new(config.backoff, config.node_url.clone(), config.auth.clone())
.context("Failed to initialize bedrock client")?;
Ok(Self {
client,
signing_key,
channel_id: config.channel_id,
})
}
async fn submit_inscribe_tx_to_bedrock(&self, tx: SignedMantleTx) -> Result<()> {
let (parent_id, msg_id) = match tx.mantle_tx.ops.first() {
Some(Op::ChannelInscribe(inscribe)) => (inscribe.parent, inscribe.id()),
_ => panic!("Expected ChannelInscribe op"),
};
self.client
.post_transaction(tx)
.await
.context("Failed to post transaction to Bedrock after retries")?
.context("Failed to post transaction to Bedrock with non-retryable error")?;
log::debug!("Posted block to Bedrock with parent id {parent_id:?} and msg id: {msg_id:?}");
Ok(())
}
fn bedrock_channel_id(&self) -> ChannelId {
self.channel_id
}
fn bedrock_signing_key(&self) -> &Ed25519Key {
&self.signing_key
}
}

View File

@ -1,16 +1,17 @@
use std::{collections::HashMap, path::Path};
use std::{collections::HashMap, path::Path, sync::Arc};
use anyhow::Result;
use anyhow::{Context as _, Result};
use common::{
HashType,
block::{Block, BlockMeta, MantleMsgId},
transaction::NSSATransaction,
};
use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
use nssa::V03State;
use storage::{error::DbError, sequencer::RocksDBIO};
pub struct SequencerStore {
dbio: RocksDBIO,
dbio: Arc<RocksDBIO>,
// TODO: Consider adding the hashmap to the database for faster recovery.
tx_hash_to_block_map: HashMap<HashType, u64>,
genesis_id: u64,
@ -30,7 +31,11 @@ impl SequencerStore {
) -> Result<Self> {
let tx_hash_to_block_map = block_to_transactions_map(genesis_block);
let dbio = RocksDBIO::open_or_create(location, genesis_block, genesis_msg_id)?;
let dbio = Arc::new(RocksDBIO::open_or_create(
location,
genesis_block,
genesis_msg_id,
)?);
let genesis_id = dbio.get_meta_first_block_in_db()?;
@ -42,6 +47,14 @@ impl SequencerStore {
})
}
/// Shared handle to the underlying rocksdb. Used to persist the zone-sdk
/// checkpoint from the sequencer's drive task without needing &mut to the
/// store.
#[must_use]
pub fn dbio(&self) -> Arc<RocksDBIO> {
Arc::clone(&self.dbio)
}
pub fn get_block_at_id(&self, id: u64) -> Result<Option<Block>, DbError> {
self.dbio.get_block(id)
}
@ -55,6 +68,7 @@ impl SequencerStore {
}
/// Returns the transaction corresponding to the given hash, if it exists in the blockchain.
#[must_use]
pub fn get_transaction_by_hash(&self, hash: HashType) -> Option<NSSATransaction> {
let block_id = *self.tx_hash_to_block_map.get(&hash)?;
let block = self
@ -76,10 +90,12 @@ impl SequencerStore {
Ok(self.dbio.latest_block_meta()?)
}
#[must_use]
pub const fn genesis_id(&self) -> u64 {
self.genesis_id
}
#[must_use]
pub const fn signing_key(&self) -> &nssa::PrivateKey {
&self.signing_key
}
@ -100,9 +116,26 @@ impl SequencerStore {
Ok(())
}
#[must_use]
pub fn get_nssa_state(&self) -> Option<V03State> {
self.dbio.get_nssa_state().ok()
}
pub fn get_zone_checkpoint(&self) -> Result<Option<SequencerCheckpoint>> {
let Some(bytes) = self.dbio.get_zone_sdk_checkpoint_bytes()? else {
return Ok(None);
};
let checkpoint: SequencerCheckpoint = serde_json::from_slice(&bytes)
.context("Failed to deserialize stored zone-sdk checkpoint")?;
Ok(Some(checkpoint))
}
pub fn set_zone_checkpoint(&self, checkpoint: &SequencerCheckpoint) -> Result<()> {
let bytes =
serde_json::to_vec(checkpoint).context("Failed to serialize zone-sdk checkpoint")?;
self.dbio.put_zone_sdk_checkpoint_bytes(&bytes)?;
Ok(())
}
}
pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> {

View File

@ -6,7 +6,6 @@ use std::{
};
use anyhow::Result;
use bedrock_client::BackoffConfig;
use bytesize::ByteSize;
use common::config::BasicAuth;
use humantime_serde;
@ -42,8 +41,6 @@ pub struct SequencerConfig {
pub signing_key: [u8; 32],
/// Bedrock configuration options.
pub bedrock_config: BedrockConfig,
/// Indexer RPC URL.
pub indexer_rpc_url: Url,
#[serde(skip_serializing_if = "Option::is_none")]
pub initial_public_accounts: Option<Vec<PublicAccountPublicInitialData>>,
#[serde(skip_serializing_if = "Option::is_none")]
@ -52,9 +49,6 @@ pub struct SequencerConfig {
#[derive(Clone, Serialize, Deserialize)]
pub struct BedrockConfig {
/// Fibonacci backoff retry strategy configuration.
#[serde(default)]
pub backoff: BackoffConfig,
/// Bedrock channel ID.
pub channel_id: ChannelId,
/// Bedrock Url.

View File

@ -1,34 +0,0 @@
use std::{ops::Deref, sync::Arc};
use anyhow::{Context as _, Result};
use log::info;
pub use url::Url;
#[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")]
pub trait IndexerClientTrait: Clone {
async fn new(indexer_url: &Url) -> Result<Self>;
}
#[derive(Clone)]
pub struct IndexerClient(Arc<jsonrpsee::ws_client::WsClient>);
impl IndexerClientTrait for IndexerClient {
async fn new(indexer_url: &Url) -> Result<Self> {
info!("Connecting to Indexer at {indexer_url}");
let client = jsonrpsee::ws_client::WsClientBuilder::default()
.build(indexer_url)
.await
.context("Failed to create websocket client")?;
Ok(Self(Arc::new(client)))
}
}
impl Deref for IndexerClient {
type Target = jsonrpsee::ws_client::WsClient;
fn deref(&self) -> &Self::Target {
&self.0
}
}

View File

@ -1,7 +1,6 @@
use std::{path::Path, time::Instant};
use anyhow::{Context as _, Result, anyhow};
use bedrock_client::SignedMantleTx;
#[cfg(feature = "testnet")]
use common::PINATA_BASE58;
use common::{
@ -20,33 +19,27 @@ pub use storage::error::DbError;
use testnet_initial_state::initial_state;
use crate::{
block_settlement_client::{BlockSettlementClient, BlockSettlementClientTrait, MsgId},
block_publisher::{BlockPublisherTrait, ZoneSdkPublisher},
block_store::SequencerStore,
indexer_client::{IndexerClient, IndexerClientTrait},
};
pub mod block_settlement_client;
pub mod block_publisher;
pub mod block_store;
pub mod config;
pub mod indexer_client;
#[cfg(feature = "mock")]
pub mod mock;
pub struct SequencerCore<
BC: BlockSettlementClientTrait = BlockSettlementClient,
IC: IndexerClientTrait = IndexerClient,
> {
pub struct SequencerCore<BP: BlockPublisherTrait = ZoneSdkPublisher> {
state: nssa::V03State,
store: SequencerStore,
mempool: MemPool<NSSATransaction>,
sequencer_config: SequencerConfig,
chain_height: u64,
block_settlement_client: BC,
indexer_client: IC,
block_publisher: BP,
}
impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, IC> {
impl<BP: BlockPublisherTrait> SequencerCore<BP> {
/// Starts the sequencer using the provided configuration.
/// If an existing database is found, the sequencer state is loaded from it and
/// assumed to represent the correct latest state consistent with Bedrock-finalized data.
@ -70,23 +63,16 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
load_or_create_signing_key(&config.home.join("bedrock_signing_key"))
.expect("Failed to load or create bedrock signing key");
let block_settlement_client = BC::new(&config.bedrock_config, bedrock_signing_key)
.expect("Failed to initialize Block Settlement Client");
let indexer_client = IC::new(&config.indexer_rpc_url)
.await
.expect("Failed to create Indexer Client");
let (_tx, genesis_msg_id) = block_settlement_client
.create_inscribe_tx(&genesis_block)
.expect("Failed to create inscribe tx for genesis block");
// TODO: Remove msg_id from BlockMeta — it is no longer needed now that
// zone-sdk manages L1 settlement state via its own checkpoint.
let genesis_msg_id = [0_u8; 32];
// Sequencer should panic if unable to open db,
// as fixing this issue may require actions non-native to program scope
let store = SequencerStore::open_db_with_genesis(
&config.home.join("rocksdb"),
&genesis_block,
genesis_msg_id.into(),
genesis_msg_id,
signing_key,
)
.unwrap();
@ -94,6 +80,51 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
.latest_block_meta()
.expect("Failed to read latest block meta from store");
let initial_checkpoint = store
.get_zone_checkpoint()
.expect("Failed to load zone-sdk checkpoint");
let is_fresh_start = initial_checkpoint.is_none();
let dbio_for_checkpoint = store.dbio();
let on_checkpoint: block_publisher::CheckpointSink = Box::new(move |cp| {
let bytes = match serde_json::to_vec(&cp) {
Ok(b) => b,
Err(err) => {
error!("Failed to serialize zone-sdk checkpoint: {err:#}");
return;
}
};
if let Err(err) = dbio_for_checkpoint.put_zone_sdk_checkpoint_bytes(&bytes) {
error!("Failed to persist zone-sdk checkpoint: {err:#}");
}
});
let dbio_for_finalized = store.dbio();
let on_finalized_block: block_publisher::FinalizedBlockSink = Box::new(move |block_id| {
if let Err(err) = dbio_for_finalized.clean_pending_blocks_up_to(block_id) {
error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}");
}
});
let block_publisher = BP::new(
&config.bedrock_config,
bedrock_signing_key,
config.retry_pending_blocks_timeout,
initial_checkpoint,
on_checkpoint,
on_finalized_block,
)
.await
.expect("Failed to initialize Block Publisher");
// On a truly fresh start (no checkpoint persisted yet), publish the
// genesis block so the indexer can find the channel start. After the
// first publish, zone-sdk's checkpoint persistence covers further
// restarts.
if is_fresh_start && let Err(err) = block_publisher.publish_block(&genesis_block).await {
error!("Failed to publish genesis block: {err:#}");
}
#[cfg_attr(not(feature = "testnet"), allow(unused_mut))]
let mut state = if let Some(state) = store.get_nssa_state() {
info!("Found local database. Loading state and pending blocks from it.");
@ -159,35 +190,33 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
mempool,
chain_height: latest_block_meta.id,
sequencer_config: config,
block_settlement_client,
indexer_client,
block_publisher,
};
(sequencer_core, mempool_handle)
}
/// Produces a new block from mempool transactions and publishes it via zone-sdk.
pub async fn produce_new_block(&mut self) -> Result<u64> {
let (tx, _msg_id) = self
.produce_new_block_with_mempool_transactions()
.context("Failed to produce new block with mempool transactions")?;
match self
.block_settlement_client
.submit_inscribe_tx_to_bedrock(tx)
.await
{
Ok(()) => {}
Err(err) => {
error!("Failed to post block data to Bedrock with error: {err:#}");
}
let block = self
.build_block_from_mempool()
.context("Failed to build block from mempool transactions")?;
// TODO: Remove msg_id from store.update — it is no longer needed now that
// zone-sdk manages L1 settlement state via its own checkpoint.
let placeholder_msg_id = [0_u8; 32];
if let Err(err) = self.block_publisher.publish_block(&block).await {
error!("Failed to publish block to Bedrock with error: {err:#}");
}
self.store.update(&block, placeholder_msg_id, &self.state)?;
Ok(self.chain_height)
}
/// Produces new block from transactions in mempool and packs it into a `SignedMantleTx`.
pub fn produce_new_block_with_mempool_transactions(
&mut self,
) -> Result<(SignedMantleTx, MsgId)> {
/// Builds a new block from transactions in the mempool.
/// Does NOT publish or store the block — the caller is responsible for that.
pub fn build_block_from_mempool(&mut self) -> Result<Block> {
let now = Instant::now();
let new_block_height = self.next_block_id();
@ -277,21 +306,12 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
timestamp: new_block_timestamp,
};
// TODO: Remove bedrock_parent_id from Block — it is no longer needed now
// that zone-sdk manages the inscription parent chain internally.
let placeholder_parent_id = [0_u8; 32];
let block = hashable_data
.clone()
.into_pending_block(self.store.signing_key(), latest_block_meta.msg_id);
let (tx, msg_id) = self
.block_settlement_client
.create_inscribe_tx(&block)
.with_context(|| {
format!(
"Failed to create inscribe transaction for block with id {}",
block.header.block_id
)
})?;
self.store.update(&block, msg_id.into(), &self.state)?;
.into_pending_block(self.store.signing_key(), placeholder_parent_id);
self.chain_height = new_block_height;
@ -300,7 +320,7 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
hashable_data.transactions.len(),
now.elapsed().as_secs()
);
Ok((tx, msg_id))
Ok(block)
}
pub const fn state(&self) -> &nssa::V03State {
@ -319,22 +339,19 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
&self.sequencer_config
}
/// Deletes finalized blocks from the sequencer's pending block list.
/// This method must be called when new blocks are finalized on Bedrock.
/// All pending blocks with an ID less than or equal to `last_finalized_block_id`
/// are removed from the database.
pub fn clean_finalized_blocks_from_db(&mut self, last_finalized_block_id: u64) -> Result<()> {
self.get_pending_blocks()?
.iter()
.map(|block| block.header.block_id)
.min()
.map_or(Ok(()), |first_pending_block_id| {
info!("Clearing pending blocks up to id: {last_finalized_block_id}");
// TODO: Delete blocks instead of marking them as finalized.
// Current approach is used because we still have `GetBlockDataRequest`.
(first_pending_block_id..=last_finalized_block_id)
.try_for_each(|id| self.store.mark_block_as_finalized(id))
})
/// Marks all pending blocks with `block_id <= last_finalized_block_id` as
/// finalized. Idempotent. Production callers don't invoke this directly —
/// it's wired up in `start_from_config` to the publisher's
/// `on_finalized_block` sink, which fires on `Event::TxsFinalized` /
/// `Event::FinalizedInscriptions`. Kept on the type for tests.
// TODO: Delete blocks instead of marking them as finalized. Current
// approach is used because we still have `GetBlockDataRequest`.
pub fn clean_finalized_blocks_from_db(&self, last_finalized_block_id: u64) -> Result<()> {
info!("Clearing pending blocks up to id: {last_finalized_block_id}");
self.store
.dbio()
.clean_pending_blocks_up_to(last_finalized_block_id)?;
Ok(())
}
/// Returns the list of stored pending blocks.
@ -348,12 +365,8 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
.collect())
}
pub fn block_settlement_client(&self) -> BC {
self.block_settlement_client.clone()
}
pub fn indexer_client(&self) -> IC {
self.indexer_client.clone()
pub fn block_publisher(&self) -> BP {
self.block_publisher.clone()
}
fn next_block_id(&self) -> u64 {
@ -392,7 +405,6 @@ mod tests {
use std::{pin::pin, time::Duration};
use bedrock_client::BackoffConfig;
use common::{
test_utils::sequencer_sign_key_for_testing,
transaction::{NSSATransaction, clock_invocation},
@ -420,16 +432,11 @@ mod tests {
block_create_timeout: Duration::from_secs(1),
signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_config: BedrockConfig {
backoff: BackoffConfig {
start_delay: Duration::from_millis(100),
max_retries: 5,
},
channel_id: ChannelId::from([0; 32]),
node_url: "http://not-used-in-unit-tests".parse().unwrap(),
auth: None,
},
retry_pending_blocks_timeout: Duration::from_mins(4),
indexer_rpc_url: "ws://localhost:8779".parse().unwrap(),
initial_public_accounts: None,
initial_private_accounts: None,
}
@ -457,9 +464,7 @@ mod tests {
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
(sequencer, mempool_handle)
}
@ -604,23 +609,21 @@ mod tests {
assert!(poll.is_pending());
// Empty the mempool by producing a block
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
// Resolve the pending push
assert!(push_fut.await.is_ok());
}
#[tokio::test]
async fn produce_new_block_with_mempool_transactions() {
async fn build_block_from_mempool() {
let (mut sequencer, mempool_handle) = common_setup().await;
let genesis_height = sequencer.chain_height;
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
let result = sequencer.produce_new_block_with_mempool_transactions();
let result = sequencer.build_block_from_mempool();
assert!(result.is_ok());
assert_eq!(sequencer.chain_height, genesis_height + 1);
}
@ -645,9 +648,7 @@ mod tests {
mempool_handle.push(tx_replay).await.unwrap();
// Create block
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
.store
.get_block_at_id(sequencer.chain_height)
@ -679,9 +680,7 @@ mod tests {
// The transaction should be included the first time
mempool_handle.push(tx.clone()).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
.store
.get_block_at_id(sequencer.chain_height)
@ -697,9 +696,7 @@ mod tests {
// Add same transaction should fail
mempool_handle.push(tx.clone()).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
.store
.get_block_at_id(sequencer.chain_height)
@ -738,9 +735,7 @@ mod tests {
);
mempool_handle.push(tx.clone()).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
.store
.get_block_at_id(sequencer.chain_height)
@ -778,15 +773,9 @@ mod tests {
let config = setup_sequencer_config();
let (mut sequencer, _mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config).await;
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
sequencer.produce_new_block().await.unwrap();
sequencer.produce_new_block().await.unwrap();
assert_eq!(sequencer.get_pending_blocks().unwrap().len(), 4);
}
@ -795,15 +784,9 @@ mod tests {
let config = setup_sequencer_config();
let (mut sequencer, _mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config).await;
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
sequencer.produce_new_block().await.unwrap();
sequencer.produce_new_block().await.unwrap();
let last_finalized_block = 3;
sequencer
@ -836,9 +819,7 @@ mod tests {
);
mempool_handle.push(tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
// Get the metadata of the last block produced
sequencer.store.latest_block_meta().unwrap()
@ -861,9 +842,7 @@ mod tests {
mempool_handle.push(tx.clone()).await.unwrap();
// Step 4: Produce new block
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
// Step 5: Verify the new block has correct previous block metadata
let new_block = sequencer
@ -876,10 +855,6 @@ mod tests {
new_block.header.prev_block_hash, expected_prev_meta.hash,
"New block's prev_block_hash should match the stored metadata hash"
);
assert_eq!(
new_block.bedrock_parent_id, expected_prev_meta.msg_id,
"New block's bedrock_parent_id should match the stored metadata msg_id"
);
assert_eq!(
new_block.body.transactions,
vec![
@ -914,9 +889,7 @@ mod tests {
.await
.unwrap();
mempool_handle.push(crafted_clock_tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
.store
@ -949,15 +922,11 @@ mod tests {
// Produce multiple blocks to advance chain height
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
// Return the current chain height (should be genesis_id + 2)
sequencer.chain_height
@ -994,9 +963,7 @@ mod tests {
),
));
mempool_handle.push(deploy_tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
// Build a user transaction that invokes clock_chain_caller, which in turn chain-calls the
// clock program with the clock accounts. The sequencer should detect that the resulting
@ -1021,9 +988,7 @@ mod tests {
));
mempool_handle.push(user_tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
.store
@ -1057,7 +1022,7 @@ mod tests {
mempool_handle.push(tx).await.unwrap();
// Block production must fail because the appended clock tx cannot execute.
let result = sequencer.produce_new_block_with_mempool_transactions();
let result = sequencer.produce_new_block().await;
assert!(
result.is_err(),
"Block production should abort when clock account data is corrupted"

View File

@ -1,76 +1,34 @@
use anyhow::{Result, anyhow};
use bedrock_client::SignedMantleTx;
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use std::time::Duration;
use anyhow::Result;
use common::block::Block;
use logos_blockchain_key_management_system_service::keys::Ed25519Key;
use url::Url;
use crate::{
block_settlement_client::BlockSettlementClientTrait, config::BedrockConfig,
indexer_client::IndexerClientTrait,
block_publisher::{
BlockPublisherTrait, CheckpointSink, FinalizedBlockSink, SequencerCheckpoint,
},
config::BedrockConfig,
};
pub type SequencerCoreWithMockClients =
crate::SequencerCore<MockBlockSettlementClient, MockIndexerClient>;
pub type SequencerCoreWithMockClients = crate::SequencerCore<MockBlockPublisher>;
#[derive(Clone)]
pub struct MockBlockSettlementClient {
bedrock_channel_id: ChannelId,
bedrock_signing_key: Ed25519Key,
}
pub struct MockBlockPublisher;
impl BlockSettlementClientTrait for MockBlockSettlementClient {
fn new(config: &BedrockConfig, signing_key: Ed25519Key) -> Result<Self> {
Ok(Self {
bedrock_channel_id: config.channel_id,
bedrock_signing_key: signing_key,
})
impl BlockPublisherTrait for MockBlockPublisher {
async fn new(
_config: &BedrockConfig,
_bedrock_signing_key: Ed25519Key,
_resubmit_interval: Duration,
_initial_checkpoint: Option<SequencerCheckpoint>,
_on_checkpoint: CheckpointSink,
_on_finalized_block: FinalizedBlockSink,
) -> Result<Self> {
Ok(Self)
}
fn bedrock_channel_id(&self) -> ChannelId {
self.bedrock_channel_id
}
fn bedrock_signing_key(&self) -> &Ed25519Key {
&self.bedrock_signing_key
}
async fn submit_inscribe_tx_to_bedrock(&self, _tx: SignedMantleTx) -> Result<()> {
async fn publish_block(&self, _block: &Block) -> Result<()> {
Ok(())
}
}
#[derive(Clone)]
pub struct MockBlockSettlementClientWithError {
bedrock_channel_id: ChannelId,
bedrock_signing_key: Ed25519Key,
}
impl BlockSettlementClientTrait for MockBlockSettlementClientWithError {
fn new(config: &BedrockConfig, signing_key: Ed25519Key) -> Result<Self> {
Ok(Self {
bedrock_channel_id: config.channel_id,
bedrock_signing_key: signing_key,
})
}
fn bedrock_channel_id(&self) -> ChannelId {
self.bedrock_channel_id
}
fn bedrock_signing_key(&self) -> &Ed25519Key {
&self.bedrock_signing_key
}
async fn submit_inscribe_tx_to_bedrock(&self, _tx: SignedMantleTx) -> Result<()> {
Err(anyhow!("Mock error"))
}
}
#[derive(Copy, Clone)]
pub struct MockIndexerClient;
impl IndexerClientTrait for MockIndexerClient {
async fn new(_indexer_url: &Url) -> Result<Self> {
Ok(Self)
}
}

View File

@ -14,7 +14,6 @@ mempool.workspace = true
sequencer_core = { workspace = true, features = ["testnet"] }
sequencer_service_protocol.workspace = true
sequencer_service_rpc = { workspace = true, features = ["server"] }
indexer_service_rpc = { workspace = true, features = ["client"] }
clap = { workspace = true, features = ["derive", "env"] }
anyhow.workspace = true

View File

@ -5,15 +5,13 @@ use bytesize::ByteSize;
use common::transaction::NSSATransaction;
use futures::never::Never;
use jsonrpsee::server::ServerHandle;
#[cfg(not(feature = "standalone"))]
use log::warn;
use log::{error, info};
use mempool::MemPoolHandle;
#[cfg(not(feature = "standalone"))]
use sequencer_core::SequencerCore;
#[cfg(feature = "standalone")]
use sequencer_core::SequencerCoreWithMockClients as SequencerCore;
pub use sequencer_core::config::*;
#[cfg(not(feature = "standalone"))]
use sequencer_core::{SequencerCore, block_settlement_client::BlockSettlementClientTrait as _};
use sequencer_service_rpc::RpcServer as _;
use tokio::{sync::Mutex, task::JoinHandle};
@ -29,8 +27,6 @@ pub struct SequencerHandle {
/// Option because of `Drop` which forbids to simply move out of `self` in `stopped()`.
server_handle: Option<ServerHandle>,
main_loop_handle: JoinHandle<Result<Never>>,
retry_pending_blocks_loop_handle: JoinHandle<Result<Never>>,
listen_for_bedrock_blocks_loop_handle: JoinHandle<Result<Never>>,
}
impl SequencerHandle {
@ -38,15 +34,11 @@ impl SequencerHandle {
addr: SocketAddr,
server_handle: ServerHandle,
main_loop_handle: JoinHandle<Result<Never>>,
retry_pending_blocks_loop_handle: JoinHandle<Result<Never>>,
listen_for_bedrock_blocks_loop_handle: JoinHandle<Result<Never>>,
) -> Self {
Self {
addr,
server_handle: Some(server_handle),
main_loop_handle,
retry_pending_blocks_loop_handle,
listen_for_bedrock_blocks_loop_handle,
}
}
@ -60,8 +52,6 @@ impl SequencerHandle {
addr: _,
server_handle,
main_loop_handle,
retry_pending_blocks_loop_handle,
listen_for_bedrock_blocks_loop_handle,
} = &mut self;
let server_handle = server_handle.take().expect("Server handle is set");
@ -75,16 +65,6 @@ impl SequencerHandle {
.context("Main loop task panicked")?
.context("Main loop exited unexpectedly")
}
res = retry_pending_blocks_loop_handle => {
res
.context("Retry pending blocks loop task panicked")?
.context("Retry pending blocks loop exited unexpectedly")
}
res = listen_for_bedrock_blocks_loop_handle => {
res
.context("Listen for bedrock blocks loop task panicked")?
.context("Listen for bedrock blocks loop exited unexpectedly")
}
}
}
@ -98,14 +78,10 @@ impl SequencerHandle {
addr: _,
server_handle,
main_loop_handle,
retry_pending_blocks_loop_handle,
listen_for_bedrock_blocks_loop_handle,
} = self;
let stopped = server_handle.as_ref().is_none_or(ServerHandle::is_stopped)
|| main_loop_handle.is_finished()
|| retry_pending_blocks_loop_handle.is_finished()
|| listen_for_bedrock_blocks_loop_handle.is_finished();
|| main_loop_handle.is_finished();
!stopped
}
@ -121,13 +97,9 @@ impl Drop for SequencerHandle {
addr: _,
server_handle,
main_loop_handle,
retry_pending_blocks_loop_handle,
listen_for_bedrock_blocks_loop_handle,
} = self;
main_loop_handle.abort();
retry_pending_blocks_loop_handle.abort();
listen_for_bedrock_blocks_loop_handle.abort();
let Some(handle) = server_handle else {
return;
@ -141,7 +113,6 @@ impl Drop for SequencerHandle {
pub async fn run(config: SequencerConfig, port: u16) -> Result<SequencerHandle> {
let block_timeout = config.block_create_timeout;
let retry_pending_blocks_timeout = config.retry_pending_blocks_timeout;
let max_block_size = config.max_block_size;
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(config).await;
@ -159,34 +130,10 @@ pub async fn run(config: SequencerConfig, port: u16) -> Result<SequencerHandle>
.await?;
info!("RPC server started");
#[cfg(not(feature = "standalone"))]
{
info!("Submitting stored pending blocks");
retry_pending_blocks(&seq_core_wrapped)
.await
.expect("Failed to submit pending blocks on startup");
}
info!("Starting main sequencer loop");
let main_loop_handle = tokio::spawn(main_loop(Arc::clone(&seq_core_wrapped), block_timeout));
let main_loop_handle = tokio::spawn(main_loop(seq_core_wrapped, block_timeout));
info!("Starting pending block retry loop");
let retry_pending_blocks_loop_handle = tokio::spawn(retry_pending_blocks_loop(
Arc::clone(&seq_core_wrapped),
retry_pending_blocks_timeout,
));
info!("Starting bedrock block listening loop");
let listen_for_bedrock_blocks_loop_handle =
tokio::spawn(listen_for_bedrock_blocks_loop(seq_core_wrapped));
Ok(SequencerHandle::new(
addr,
server_handle,
main_loop_handle,
retry_pending_blocks_loop_handle,
listen_for_bedrock_blocks_loop_handle,
))
Ok(SequencerHandle::new(addr, server_handle, main_loop_handle))
}
async fn run_server(
@ -235,118 +182,3 @@ async fn main_loop(seq_core: Arc<Mutex<SequencerCore>>, block_timeout: Duration)
info!("Waiting for new transactions");
}
}
#[cfg(not(feature = "standalone"))]
/// Resubmits every block still marked pending to bedrock, in ascending
/// block-id order. Submission failures are logged and left for the next
/// retry pass; tx-creation or DB failures abort with an error.
async fn retry_pending_blocks(seq_core: &Arc<Mutex<SequencerCore>>) -> Result<()> {
    use std::time::Instant;
    use log::debug;
    // Snapshot pending blocks and grab a client handle while holding the
    // lock, then drop it before the (potentially slow) network submissions.
    let (mut pending_blocks, block_settlement_client) = {
        let sequencer_core = seq_core.lock().await;
        let client = sequencer_core.block_settlement_client();
        let pending_blocks = sequencer_core
            .get_pending_blocks()
            // Propagate instead of panicking (`expect`): the caller already
            // handles errors, and a DB hiccup shouldn't abort the process.
            .context("Sequencer should be able to retrieve pending blocks")?;
        (pending_blocks, client)
    };
    // Idiomatic key-based sort replaces the manual comparator.
    pending_blocks.sort_unstable_by_key(|block| block.header.block_id);
    // `if let` over `!is_empty()` + `unwrap()` removes the panic path.
    if let (Some(first), Some(last)) = (pending_blocks.first(), pending_blocks.last()) {
        info!(
            "Resubmitting blocks from {} to {}",
            first.header.block_id, last.header.block_id
        );
    }
    for block in &pending_blocks {
        debug!(
            "Resubmitting pending block with id {}",
            block.header.block_id
        );
        // TODO: We could cache the inscribe tx for each pending block to avoid re-creating it
        // on every retry.
        let now = Instant::now();
        let (tx, _msg_id) = block_settlement_client
            .create_inscribe_tx(block)
            .context("Failed to create inscribe tx for pending block")?;
        debug!("Create inscribe: {:?}", now.elapsed());
        let now = Instant::now();
        // Best-effort: a failed submission is retried on the next pass
        // rather than aborting the remaining blocks.
        if let Err(e) = block_settlement_client
            .submit_inscribe_tx_to_bedrock(tx)
            .await
        {
            warn!(
                "Failed to resubmit block with id {} with error {e:#}",
                block.header.block_id
            );
        }
        debug!("Post: {:?}", now.elapsed());
    }
    Ok(())
}
#[cfg(not(feature = "standalone"))]
/// Background task: periodically resubmits pending blocks to bedrock.
/// Sleeps `retry_pending_blocks_timeout` between passes. Never returns
/// `Ok`; an `Err` from a retry pass ends the task.
async fn retry_pending_blocks_loop(
    seq_core: Arc<Mutex<SequencerCore>>,
    retry_pending_blocks_timeout: Duration,
) -> Result<Never> {
    loop {
        // Sleep first: startup already performs an immediate retry pass
        // before this task is spawned.
        tokio::time::sleep(retry_pending_blocks_timeout).await;
        retry_pending_blocks(&seq_core).await?;
    }
}
#[cfg(not(feature = "standalone"))]
/// Background task: subscribes to the indexer's finalized-block stream and,
/// for each finalized L2 block id received, cleans the corresponding
/// finalized blocks out of the sequencer's local DB.
///
/// If the subscription closes without error, it is re-established after a
/// fixed delay; the loop never returns `Ok`, and an `Err` means either the
/// (re)subscription or the DB cleanup failed.
async fn listen_for_bedrock_blocks_loop(seq_core: Arc<Mutex<SequencerCore>>) -> Result<Never> {
    use indexer_service_rpc::RpcClient as _;
    // Fetch the client once up front; the core lock is released right away
    // and only re-taken per received block below.
    let indexer_client = seq_core.lock().await.indexer_client();
    let retry_delay = Duration::from_secs(5);
    loop {
        // TODO: Subscribe from the first pending block ID?
        let mut subscription = indexer_client
            .subscribe_to_finalized_blocks()
            .await
            .context("Failed to subscribe to finalized blocks")?;
        while let Some(block_id) = subscription.next().await {
            // A stream item can itself be an error; that one is fatal.
            let block_id = block_id.context("Failed to get next block from subscription")?;
            info!("Received new L2 block with ID {block_id}");
            seq_core
                .lock()
                .await
                .clean_finalized_blocks_from_db(block_id)
                .with_context(|| {
                    format!("Failed to clean finalized blocks from DB for block ID {block_id}")
                })?;
        }
        // Stream ended without error: log why (if known) and resubscribe
        // after a pause instead of tearing the task down.
        warn!(
            "Block subscription closed unexpectedly, reason: {:?}, retrying after {retry_delay:?}",
            subscription.close_reason()
        );
        tokio::time::sleep(retry_delay).await;
    }
}
#[cfg(feature = "standalone")]
/// Standalone builds have no bedrock connection: park this task forever so
/// its join handle never completes.
async fn listen_for_bedrock_blocks_loop(_seq_core: Arc<Mutex<SequencerCore>>) -> Result<Never> {
    // `pending()` is a future that is never ready, so this awaits
    // indefinitely without busy-waiting; the output type is inferred
    // from the function's return type.
    std::future::pending().await
}
#[cfg(feature = "standalone")]
/// Standalone builds never settle blocks on bedrock, so there is nothing to
/// retry: park the task forever.
async fn retry_pending_blocks_loop(
    _seq_core: Arc<Mutex<SequencerCore>>,
    _retry_pending_blocks_timeout: Duration,
) -> Result<Never> {
    // Await a never-ready future; the output type is inferred from the
    // function's return type.
    std::future::pending().await
}

View File

@ -8,10 +8,7 @@ use jsonrpsee::{
use log::warn;
use mempool::MemPoolHandle;
use nssa::{self, program::Program};
use sequencer_core::{
DbError, SequencerCore, block_settlement_client::BlockSettlementClientTrait,
indexer_client::IndexerClientTrait,
};
use sequencer_core::{DbError, SequencerCore, block_publisher::BlockPublisherTrait};
use sequencer_service_protocol::{
Account, AccountId, Block, BlockId, Commitment, HashType, MembershipProof, Nonce, ProgramId,
};
@ -19,15 +16,15 @@ use tokio::sync::Mutex;
const NOT_FOUND_ERROR_CODE: i32 = -31999;
pub struct SequencerService<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> {
sequencer: Arc<Mutex<SequencerCore<BC, IC>>>,
pub struct SequencerService<BC: BlockPublisherTrait> {
sequencer: Arc<Mutex<SequencerCore<BC>>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
max_block_size: u64,
}
impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerService<BC, IC> {
impl<BC: BlockPublisherTrait> SequencerService<BC> {
pub const fn new(
sequencer: Arc<Mutex<SequencerCore<BC, IC>>>,
sequencer: Arc<Mutex<SequencerCore<BC>>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
max_block_size: u64,
) -> Self {
@ -40,8 +37,8 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerService<BC
}
#[async_trait]
impl<BC: BlockSettlementClientTrait + Send + 'static, IC: IndexerClientTrait + Send + 'static>
sequencer_service_rpc::RpcServer for SequencerService<BC, IC>
impl<BC: BlockPublisherTrait + Send + 'static> sequencer_service_rpc::RpcServer
for SequencerService<BC>
{
async fn send_transaction(&self, tx: NSSATransaction) -> Result<HashType, ErrorObjectOwned> {
// Reserve ~200 bytes for block header overhead

View File

@ -8,7 +8,8 @@ use crate::{
indexer::{
ACC_NUM_CELL_NAME, BLOCK_HASH_CELL_NAME, BREAKPOINT_CELL_NAME, CF_ACC_META,
CF_BREAKPOINT_NAME, CF_HASH_TO_ID, CF_TX_TO_ID, DB_META_LAST_BREAKPOINT_ID,
DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY, TX_HASH_CELL_NAME,
DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY, DB_META_ZONE_SDK_INDEXER_CURSOR_KEY,
TX_HASH_CELL_NAME,
},
};
@ -211,6 +212,41 @@ impl SimpleWritableCell for AccNumTxCell {
}
}
/// Opaque bytes for the zone-sdk indexer cursor `Option<(MsgId, Slot)>`.
/// The caller serializes via `serde_json` (neither type derives borsh).
///
/// Read-side cell: the stored value is borsh-decoded back into the raw
/// `Vec<u8>`, which is still serde_json-encoded for the caller.
#[derive(BorshDeserialize)]
pub struct ZoneSdkIndexerCursorCellOwned(pub Vec<u8>);

impl SimpleStorableCell for ZoneSdkIndexerCursorCellOwned {
    // Singleton cell: no key parameters, addressed by name alone.
    type KeyParams = ();
    const CELL_NAME: &'static str = DB_META_ZONE_SDK_INDEXER_CURSOR_KEY;
    const CF_NAME: &'static str = CF_META_NAME;
}

impl SimpleReadableCell for ZoneSdkIndexerCursorCellOwned {}
/// Write-side counterpart of the zone-sdk indexer cursor cell: borrows the
/// caller-encoded bytes instead of owning them, so a write needs no copy.
#[derive(BorshSerialize)]
pub struct ZoneSdkIndexerCursorCellRef<'bytes>(pub &'bytes [u8]);

impl SimpleStorableCell for ZoneSdkIndexerCursorCellRef<'_> {
    // Singleton cell addressed purely by name.
    type KeyParams = ();
    const CELL_NAME: &'static str = DB_META_ZONE_SDK_INDEXER_CURSOR_KEY;
    const CF_NAME: &'static str = CF_META_NAME;
}

impl SimpleWritableCell for ZoneSdkIndexerCursorCellRef<'_> {
    /// Borsh-wraps the borrowed bytes for storage.
    fn value_constructor(&self) -> DbResult<Vec<u8>> {
        let encoded = borsh::to_vec(&self);
        encoded.map_err(|err| {
            let message = "Failed to serialize zone-sdk indexer cursor cell".to_owned();
            DbError::borsh_cast_message(err, Some(message))
        })
    }
}
#[cfg(test)]
mod uniform_tests {
use crate::{

View File

@ -22,6 +22,8 @@ pub const DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY: &str =
"last_observed_l1_lib_header_in_db";
/// Key base for storing metainformation about the last breakpoint.
pub const DB_META_LAST_BREAKPOINT_ID: &str = "last_breakpoint_id";
/// Key base for storing the zone-sdk indexer cursor (opaque bytes).
pub const DB_META_ZONE_SDK_INDEXER_CURSOR_KEY: &str = "zone_sdk_indexer_cursor";
/// Cell name for a breakpoint.
pub const BREAKPOINT_CELL_NAME: &str = "breakpoint";

View File

@ -4,7 +4,7 @@ use crate::{
cells::shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell},
indexer::indexer_cells::{
AccNumTxCell, BlockHashToBlockIdMapCell, BreakpointCellOwned, LastBreakpointIdCell,
LastObservedL1LibHeaderCell, TxHashToBlockIdMapCell,
LastObservedL1LibHeaderCell, TxHashToBlockIdMapCell, ZoneSdkIndexerCursorCellOwned,
},
};
@ -64,4 +64,10 @@ impl RocksDBIO {
self.get_opt::<AccNumTxCell>(acc_id)
.map(|opt| opt.map(|cell| cell.0))
}
/// Read the raw zone-sdk indexer cursor bytes, or `None` if no cursor was
/// ever stored. Decoding of the bytes is the caller's responsibility.
pub fn get_zone_sdk_indexer_cursor_bytes(&self) -> DbResult<Option<Vec<u8>>> {
    let cell = self.get_opt::<ZoneSdkIndexerCursorCellOwned>(())?;
    Ok(cell.map(|owned| owned.0))
}
}

View File

@ -4,6 +4,7 @@ use crate::{
cells::shared_cells::{FirstBlockSetCell, LastBlockCell},
indexer::indexer_cells::{
BreakpointCellRef, LastBreakpointIdCell, LastObservedL1LibHeaderCell,
ZoneSdkIndexerCursorCellRef,
},
};
@ -30,6 +31,10 @@ impl RocksDBIO {
self.put(&FirstBlockSetCell(true), ())
}
/// Persist the caller-encoded zone-sdk indexer cursor bytes.
pub fn put_zone_sdk_indexer_cursor_bytes(&self, bytes: &[u8]) -> DbResult<()> {
    let cell = ZoneSdkIndexerCursorCellRef(bytes);
    self.put(&cell, ())
}
// State
pub fn put_breakpoint(&self, br_id: u64, breakpoint: &V03State) -> DbResult<()> {

View File

@ -12,7 +12,7 @@ use crate::{
error::DbError,
sequencer::sequencer_cells::{
LastFinalizedBlockIdCell, LatestBlockMetaCellOwned, LatestBlockMetaCellRef,
NSSAStateCellOwned, NSSAStateCellRef,
NSSAStateCellOwned, NSSAStateCellRef, ZoneSdkCheckpointCellOwned, ZoneSdkCheckpointCellRef,
},
};
@ -22,6 +22,8 @@ pub mod sequencer_cells;
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
/// Key base for storing metainformation about the latest block meta.
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
/// Key base for storing the zone-sdk sequencer checkpoint (opaque bytes).
pub const DB_META_ZONE_SDK_CHECKPOINT_KEY: &str = "zone_sdk_checkpoint";
/// Key base for storing the NSSA state.
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
@ -205,6 +207,16 @@ impl RocksDBIO {
self.get::<LatestBlockMetaCellOwned>(()).map(|val| val.0)
}
/// Read the raw zone-sdk checkpoint bytes, or `None` if no checkpoint was
/// ever stored. Decoding of the bytes is the caller's responsibility.
pub fn get_zone_sdk_checkpoint_bytes(&self) -> DbResult<Option<Vec<u8>>> {
    let cell = self.get_opt::<ZoneSdkCheckpointCellOwned>(())?;
    Ok(cell.map(|owned| owned.0))
}
/// Persist the caller-encoded zone-sdk checkpoint bytes.
pub fn put_zone_sdk_checkpoint_bytes(&self, bytes: &[u8]) -> DbResult<()> {
    let cell = ZoneSdkCheckpointCellRef(bytes);
    self.put(&cell, ())
}
pub fn put_block(
&self,
block: &Block,
@ -275,6 +287,22 @@ impl RocksDBIO {
Ok(())
}
/// Mark every pending block with `block_id <= last_finalized` as finalized.
/// Idempotent — already-finalized blocks are skipped.
///
/// # Errors
/// Fails if any block cannot be read from or written back to the DB.
pub fn clean_pending_blocks_up_to(&self, last_finalized: u64) -> DbResult<()> {
    // Collect the ids first so we don't write to the DB while iterating it.
    let mut pending_ids = Vec::new();
    for block in self.get_all_blocks() {
        // Propagate read errors instead of silently dropping them (the
        // previous `filter_map(Result::ok)` could leave a block stuck in
        // `Pending` forever if its entry failed to decode).
        let block = block?;
        if matches!(block.bedrock_status, BedrockStatus::Pending)
            && block.header.block_id <= last_finalized
        {
            pending_ids.push(block.header.block_id);
        }
    }
    for id in pending_ids {
        self.mark_block_as_finalized(id)?;
    }
    Ok(())
}
pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
let mut block = self.get_block(block_id)?.ok_or_else(|| {
DbError::db_interaction_error(format!("Block with id {block_id} not found"))

View File

@ -8,7 +8,7 @@ use crate::{
error::DbError,
sequencer::{
CF_NSSA_STATE_NAME, DB_META_LAST_FINALIZED_BLOCK_ID, DB_META_LATEST_BLOCK_META_KEY,
DB_NSSA_STATE_KEY,
DB_META_ZONE_SDK_CHECKPOINT_KEY, DB_NSSA_STATE_KEY,
},
};
@ -95,6 +95,42 @@ impl SimpleWritableCell for LatestBlockMetaCellRef<'_> {
}
}
/// Opaque bytes for the zone-sdk sequencer checkpoint. The caller is
/// responsible for the actual encoding (we use `serde_json` since
/// `SequencerCheckpoint` only derives serde, not borsh).
///
/// Read-side cell: the stored value is borsh-decoded back into the raw
/// `Vec<u8>` and handed to the caller for serde_json decoding.
#[derive(BorshDeserialize)]
pub struct ZoneSdkCheckpointCellOwned(pub Vec<u8>);

impl SimpleStorableCell for ZoneSdkCheckpointCellOwned {
    // Singleton cell: no key parameters, addressed by name alone.
    type KeyParams = ();
    const CELL_NAME: &'static str = DB_META_ZONE_SDK_CHECKPOINT_KEY;
    const CF_NAME: &'static str = CF_META_NAME;
}

impl SimpleReadableCell for ZoneSdkCheckpointCellOwned {}
/// Write-side counterpart of the zone-sdk checkpoint cell: borrows the
/// caller-encoded bytes so a write avoids an extra copy.
#[derive(BorshSerialize)]
pub struct ZoneSdkCheckpointCellRef<'bytes>(pub &'bytes [u8]);

impl SimpleStorableCell for ZoneSdkCheckpointCellRef<'_> {
    // Singleton cell addressed purely by name.
    type KeyParams = ();
    const CELL_NAME: &'static str = DB_META_ZONE_SDK_CHECKPOINT_KEY;
    const CF_NAME: &'static str = CF_META_NAME;
}

impl SimpleWritableCell for ZoneSdkCheckpointCellRef<'_> {
    /// Borsh-wraps the borrowed bytes for storage.
    fn value_constructor(&self) -> DbResult<Vec<u8>> {
        let encoded = borsh::to_vec(&self);
        encoded.map_err(|err| {
            let message = "Failed to serialize zone-sdk checkpoint cell".to_owned();
            DbError::borsh_cast_message(err, Some(message))
        })
    }
}
#[cfg(test)]
mod uniform_tests {
use crate::{