Merge branch 'main' into marvin/bip-32-comp

This commit is contained in:
jonesmarvin8 2026-02-26 12:41:50 -05:00
commit b9324bf03b
89 changed files with 4490 additions and 1664 deletions

View File

@ -154,7 +154,35 @@ jobs:
env:
RISC0_DEV_MODE: "1"
RUST_LOG: "info"
run: cargo nextest run -p integration_tests -- --skip tps_test
run: cargo nextest run -p integration_tests -- --skip tps_test --skip indexer
integration-tests-indexer:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v5
with:
ref: ${{ github.head_ref }}
- uses: ./.github/actions/install-system-deps
- uses: ./.github/actions/install-risc0
- uses: ./.github/actions/install-logos-blockchain-circuits
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install active toolchain
run: rustup install
- name: Install nextest
run: cargo install --locked cargo-nextest
- name: Run tests
env:
RISC0_DEV_MODE: "1"
RUST_LOG: "info"
run: cargo nextest run -p integration_tests indexer -- --skip tps_test
valid-proof-test:
runs-on: ubuntu-latest

View File

@ -11,10 +11,18 @@ jobs:
include:
- name: sequencer_runner
dockerfile: ./sequencer_runner/Dockerfile
build_args: |
STANDALONE=false
- name: sequencer_runner-standalone
dockerfile: ./sequencer_runner/Dockerfile
build_args: |
STANDALONE=true
- name: indexer_service
dockerfile: ./indexer/service/Dockerfile
build_args: ""
- name: explorer_service
dockerfile: ./explorer_service/Dockerfile
build_args: ""
steps:
- uses: actions/checkout@v5
@ -49,5 +57,6 @@ jobs:
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-args: ${{ matrix.build_args }}
cache-from: type=gha
cache-to: type=gha,mode=max

1514
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -25,8 +25,6 @@ members = [
"indexer/service/protocol",
"indexer/service/rpc",
"explorer_service",
"programs/token/core",
"programs/token",
"program_methods",
"program_methods/guest",
"test_program_methods",
@ -52,7 +50,7 @@ indexer_service = { path = "indexer/service" }
indexer_service_protocol = { path = "indexer/service/protocol" }
indexer_service_rpc = { path = "indexer/service/rpc" }
wallet = { path = "wallet" }
wallet-ffi = { path = "wallet-ffi" }
wallet-ffi = { path = "wallet-ffi", default-features = false }
token_core = { path = "programs/token/core" }
token_program = { path = "programs/token" }
amm_core = { path = "programs/amm/core" }
@ -89,6 +87,9 @@ thiserror = "2.0.12"
sha2 = "0.10.8"
hex = "0.4.3"
bytemuck = "1.24.0"
bytesize = { version = "2.3.1", features = ["serde"] }
humantime-serde = "1.1"
humantime = "2.1"
aes-gcm = "0.10.3"
toml = "0.7.4"
bincode = "1.3.3"
@ -113,6 +114,7 @@ logos-blockchain-common-http-client = { git = "https://github.com/logos-blockcha
logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-chain-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
rocksdb = { version = "0.24.0", default-features = false, features = [
"snappy",

View File

@ -35,8 +35,12 @@ To our knowledge, this design is unique to LEZ. Other privacy-focused programmab
3. Transferring private to public (local / privacy-preserving execution)
- Bob executes the token program `Transfer` function locally, sending to Charlies public account.
- A ZKP of correct execution is generated.
- Bobs private balance stays hidden.
- Charlies public account is updated on-chain.
- Bobs private account and balance still remain hidden.
- Charlie's public account is modified with the new tokens added.
4. Transferring public to public (public execution):
- Alice submits a transaction to execute the token program `Transfer` function on-chain, specifying Charlie's public account as recipient.
- The execution is handled on-chain without ZKPs involved.
- Alice's and Charlie's accounts are modified according to the transaction.
4. Transfer from public to public (public execution)
- Alice submits an on-chain transaction to run `Transfer`, sending to Charlies public account.
@ -139,12 +143,67 @@ The sequencer and logos blockchain node can be run locally:
- `cargo build --all-features`
- `./target/debug/logos-blockchain-node --deployment nodes/node/standalone-deployment-config.yaml nodes/node/standalone-node-config.yaml`
2. On another terminal go to the `logos-blockchain/lssa` repo and run indexer service:
2. Alternatively (WARNING: This node is outdated) go to ``logos-blockchain/lssa/` repo and run the node from docker:
- `cd bedrock`
- Change line 14 of `docker-compose.yml` from `"0:18080/tcp"` into `"8080:18080/tcp"`
- `docker compose up`
3. On another terminal go to the `logos-blockchain/lssa` repo and run indexer service:
- `RUST_LOG=info cargo run -p indexer_service indexer/service/configs/indexer_config.json`
3. On another terminal go to the `logos-blockchain/lssa` repo and run the sequencer:
4. On another terminal go to the `logos-blockchain/lssa` repo and run the sequencer:
- `RUST_LOG=info cargo run -p sequencer_runner sequencer_runner/configs/debug`
### Notes on cleanup
After stopping services above you need to remove 3 folders to start cleanly:
1. In the `logos-blockchain/logos-blockchain` folder `state` (not needed in case of docker setup)
2. In the `lssa` folder `sequencer_runner/rocksdb`
3. In the `lssa` file `sequencer_runner/bedrock_signing_key`
4. In the `lssa` folder `indexer/service/rocksdb`
### Normal mode (`just` commands)
We provide a `Justfile` for developer and user needs, you can run the whole setup with it. The only difference will be that logos-blockchain (bedrock) will be started from docker.
#### 1'st Terminal
```bash
just run-bedrock
```
#### 2'nd Terminal
```bash
just run-indexer
```
#### 3'rd Terminal
```bash
just run-sequencer
```
#### 4'th Terminal
```bash
just run-explorer
```
#### 5'th Terminal
You can run any command our wallet support by passing it as an argument for `just run-wallet`, for example:
```bash
just run-wallet check-health
```
This will use a wallet binary built from this repo and not the one installed in your system if you have some. Also another wallet home directory will be used. This is done to not to mess up with your local wallet and to easily clean generated files (see next section).
#### Shutdown
1. Press `ctrl-c` in every terminal
2. Run `just clean` to clean runtime data
### Standalone mode
The sequencer can be run in standalone mode with:
```bash

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -13,6 +13,8 @@ tokio-retry.workspace = true
futures.workspace = true
log.workspace = true
serde.workspace = true
humantime-serde.workspace = true
logos-blockchain-common-http-client.workspace = true
logos-blockchain-core.workspace = true
logos-blockchain-chain-broadcast-service.workspace = true
logos-blockchain-chain-service.workspace = true

View File

@ -3,8 +3,11 @@ use std::time::Duration;
use anyhow::{Context as _, Result};
use common::config::BasicAuth;
use futures::{Stream, TryFutureExt};
#[expect(clippy::single_component_path_imports, reason = "Satisfy machete")]
use humantime_serde;
use log::{info, warn};
pub use logos_blockchain_chain_broadcast_service::BlockInfo;
use logos_blockchain_chain_service::CryptarchiaInfo;
pub use logos_blockchain_common_http_client::{CommonHttpClient, Error};
pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx};
use reqwest::{Client, Url};
@ -14,14 +17,15 @@ use tokio_retry::Retry;
/// Fibonacci backoff retry strategy configuration
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct BackoffConfig {
pub start_delay_millis: u64,
#[serde(with = "humantime_serde")]
pub start_delay: Duration,
pub max_retries: usize,
}
impl Default for BackoffConfig {
fn default() -> Self {
Self {
start_delay_millis: 100,
start_delay: Duration::from_millis(100),
max_retries: 5,
}
}
@ -82,8 +86,19 @@ impl BedrockClient {
.await
}
pub async fn get_consensus_info(&self) -> Result<CryptarchiaInfo, Error> {
Retry::spawn(self.backoff_strategy(), || {
self.http_client
.consensus_info(self.node_url.clone())
.inspect_err(|err| warn!("Block fetching failed with error: {err:#}"))
})
.await
}
fn backoff_strategy(&self) -> impl Iterator<Item = Duration> {
tokio_retry::strategy::FibonacciBackoff::from_millis(self.backoff.start_delay_millis)
.take(self.backoff.max_retries)
tokio_retry::strategy::FibonacciBackoff::from_millis(
self.backoff.start_delay.as_millis() as u64
)
.take(self.backoff.max_retries)
}
}

View File

@ -18,6 +18,7 @@ sha2.workspace = true
log.workspace = true
hex.workspace = true
borsh.workspace = true
bytesize.workspace = true
base64.workspace = true
url.workspace = true
logos-blockchain-common-http-client.workspace = true

View File

@ -1,4 +1,6 @@
use borsh::{BorshDeserialize, BorshSerialize};
use nssa::AccountId;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256, digest::FixedOutput};
use crate::{HashType, transaction::NSSATransaction};
@ -50,7 +52,7 @@ pub enum BedrockStatus {
Finalized,
}
#[derive(Debug, BorshSerialize, BorshDeserialize)]
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
pub struct Block {
pub header: BlockHeader,
pub body: BlockBody,
@ -107,6 +109,20 @@ impl From<Block> for HashableBlockData {
}
}
/// Helper struct for account (de-)serialization
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccountInitialData {
pub account_id: AccountId,
pub balance: u128,
}
/// Helper struct to (de-)serialize initial commitments
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommitmentsInitialData {
pub npk: nssa_core::NullifierPublicKey,
pub account: nssa_core::account::Account,
}
#[cfg(test)]
mod tests {
use crate::{HashType, block::HashableBlockData, test_utils};

View File

@ -1,3 +1,4 @@
use bytesize::ByteSize;
use serde::{Deserialize, Serialize};
pub mod errors;
@ -8,13 +9,13 @@ pub mod requests;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RpcLimitsConfig {
/// Maximum byte size of the json payload.
pub json_payload_max_size: usize,
pub json_payload_max_size: ByteSize,
}
impl Default for RpcLimitsConfig {
fn default() -> Self {
Self {
json_payload_max_size: 10 * 1024 * 1024,
json_payload_max_size: ByteSize::mib(10),
}
}
}

View File

@ -1,4 +1,6 @@
use borsh::{BorshDeserialize, BorshSerialize};
use log::warn;
use nssa::{AccountId, V02State};
use serde::{Deserialize, Serialize};
use crate::HashType;
@ -18,6 +20,54 @@ impl NSSATransaction {
NSSATransaction::ProgramDeployment(tx) => tx.hash(),
})
}
pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
match self {
NSSATransaction::ProgramDeployment(tx) => tx.affected_public_account_ids(),
NSSATransaction::Public(tx) => tx.affected_public_account_ids(),
NSSATransaction::PrivacyPreserving(tx) => tx.affected_public_account_ids(),
}
}
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
pub fn transaction_stateless_check(self) -> Result<Self, TransactionMalformationError> {
// Stateless checks here
match self {
NSSATransaction::Public(tx) => {
if tx.witness_set().is_valid_for(tx.message()) {
Ok(NSSATransaction::Public(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::PrivacyPreserving(tx) => {
if tx.witness_set().signatures_are_valid_for(tx.message()) {
Ok(NSSATransaction::PrivacyPreserving(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
}
}
pub fn execute_check_on_state(
self,
state: &mut V02State,
) -> Result<Self, nssa::error::NssaError> {
match &self {
NSSATransaction::Public(tx) => state.transition_from_public_transaction(tx),
NSSATransaction::PrivacyPreserving(tx) => {
state.transition_from_privacy_preserving_transaction(tx)
}
NSSATransaction::ProgramDeployment(tx) => {
state.transition_from_program_deployment_transaction(tx)
}
}
.inspect_err(|err| warn!("Error at transition {err:#?}"))?;
Ok(self)
}
}
impl From<nssa::PublicTransaction> for NSSATransaction {
@ -46,3 +96,13 @@ pub enum TxKind {
PrivacyPreserving,
ProgramDeployment,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, thiserror::Error)]
pub enum TransactionMalformationError {
#[error("Invalid signature(-s)")]
InvalidSignature,
#[error("Failed to decode transaction with hash: {tx:?}")]
FailedToDecode { tx: HashType },
#[error("Transaction size {size} exceeds maximum allowed size of {max} bytes")]
TransactionTooLarge { size: usize, max: usize },
}

View File

@ -346,7 +346,7 @@ _wallet_config() {
'all'
'override_rust_log'
'sequencer_addr'
'seq_poll_timeout_millis'
'seq_poll_timeout'
'seq_tx_poll_max_blocks'
'seq_poll_max_retries'
'seq_block_poll_max_amount'

View File

@ -1,9 +1,9 @@
{
"resubscribe_interval_millis": 1000,
"resubscribe_interval": "1s",
"bedrock_client_config": {
"addr": "http://logos-blockchain-node-0:18080",
"backoff": {
"start_delay_millis": 100,
"start_delay": "100ms",
"max_retries": 5
}
},

View File

@ -4,13 +4,14 @@
"genesis_id": 1,
"is_genesis_random": true,
"max_num_tx_in_block": 20,
"max_block_size": "1 MiB",
"mempool_max_size": 10000,
"block_create_timeout_millis": 10000,
"retry_pending_blocks_timeout_millis": 7000,
"block_create_timeout": "10s",
"retry_pending_blocks_timeout": "7s",
"port": 3040,
"bedrock_config": {
"backoff": {
"start_delay_millis": 100,
"start_delay": "100ms",
"max_retries": 5
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",

View File

@ -3,7 +3,7 @@
This guide walks you through running the sequencer, compiling example programs, deploying a Hello World program, and interacting with accounts.
You'll find:
- Programs: example NSSA programs under `methods/guest/src/bin`.
- Programs: example LEZ programs under `methods/guest/src/bin`.
- Runners: scripts to create and submit transactions to invoke these programs publicly and privately under `src/bin`.
# 0. Install the wallet
@ -13,16 +13,7 @@ cargo install --path wallet --force
```
# 1. Run the sequencer
From the projects root directory, start the sequencer:
```bash
cd sequencer_runner
RUST_LOG=info cargo run $(pwd)/configs/debug
```
Keep this terminal open. Well use it only to observe the node logs.
> [!NOTE]
> If you have already ran this before you'll see a `rocksdb` directory with stored blocks. Be sure to remove that directory to follow this tutorial.
From the projects root directory, start the sequencer by following [these instructions](https://github.com/logos-blockchain/lssa#run-the-sequencer-and-node).
## Checking and setting up the wallet
For sanity let's check that the wallet can connect to it.

View File

@ -1,5 +1,3 @@
use std::str::FromStr as _;
use indexer_service_protocol::{Account, AccountId, Block, BlockId, HashType, Transaction};
use leptos::prelude::*;
use serde::{Deserialize, Serialize};
@ -30,7 +28,10 @@ pub async fn get_account(account_id: AccountId) -> Result<Account, ServerFnError
/// Search for a block, transaction, or account by query string
#[server]
pub async fn search(query: String) -> Result<SearchResults, ServerFnError> {
use std::str::FromStr as _;
use indexer_service_rpc::RpcClient as _;
let client = expect_context::<IndexerRpcClient>();
let mut blocks = Vec::new();

View File

@ -7,10 +7,14 @@ license = { workspace = true }
[dependencies]
common.workspace = true
bedrock_client.workspace = true
nssa.workspace = true
nssa_core.workspace = true
storage.workspace = true
anyhow.workspace = true
log.workspace = true
serde.workspace = true
humantime-serde.workspace = true
tokio.workspace = true
borsh.workspace = true
futures.workspace = true

View File

@ -0,0 +1,115 @@
use std::{path::Path, sync::Arc};
use anyhow::Result;
use bedrock_client::HeaderId;
use common::{block::Block, transaction::NSSATransaction};
use nssa::{Account, AccountId, V02State};
use storage::indexer::RocksDBIO;
#[derive(Clone)]
pub struct IndexerStore {
dbio: Arc<RocksDBIO>,
}
impl IndexerStore {
/// Starting database at the start of new chain.
/// Creates files if necessary.
///
/// ATTENTION: Will overwrite genesis block.
pub fn open_db_with_genesis(
location: &Path,
start_data: Option<(Block, V02State)>,
) -> Result<Self> {
let dbio = RocksDBIO::open_or_create(location, start_data)?;
Ok(Self {
dbio: Arc::new(dbio),
})
}
/// Reopening existing database
pub fn open_db_restart(location: &Path) -> Result<Self> {
Self::open_db_with_genesis(location, None)
}
pub fn last_observed_l1_lib_header(&self) -> Result<Option<HeaderId>> {
Ok(self
.dbio
.get_meta_last_observed_l1_lib_header_in_db()?
.map(HeaderId::from))
}
pub fn get_last_block_id(&self) -> Result<u64> {
Ok(self.dbio.get_meta_last_block_in_db()?)
}
pub fn get_block_at_id(&self, id: u64) -> Result<Block> {
Ok(self.dbio.get_block(id)?)
}
pub fn get_block_batch(&self, offset: u64, limit: u64) -> Result<Vec<Block>> {
Ok(self.dbio.get_block_batch(offset, limit)?)
}
pub fn get_transaction_by_hash(&self, tx_hash: [u8; 32]) -> Result<NSSATransaction> {
let block = self.get_block_at_id(self.dbio.get_block_id_by_tx_hash(tx_hash)?)?;
let transaction = block
.body
.transactions
.iter()
.find(|enc_tx| enc_tx.hash().0 == tx_hash)
.ok_or_else(|| anyhow::anyhow!("Transaction not found in DB"))?;
Ok(transaction.clone())
}
pub fn get_block_by_hash(&self, hash: [u8; 32]) -> Result<Block> {
self.get_block_at_id(self.dbio.get_block_id_by_hash(hash)?)
}
pub fn get_transactions_by_account(
&self,
acc_id: [u8; 32],
offset: u64,
limit: u64,
) -> Result<Vec<NSSATransaction>> {
Ok(self.dbio.get_acc_transactions(acc_id, offset, limit)?)
}
pub fn genesis_id(&self) -> u64 {
self.dbio
.get_meta_first_block_in_db()
.expect("Must be set at the DB startup")
}
pub fn last_block(&self) -> u64 {
self.dbio
.get_meta_last_block_in_db()
.expect("Must be set at the DB startup")
}
pub fn get_state_at_block(&self, block_id: u64) -> Result<V02State> {
Ok(self.dbio.calculate_state_for_id(block_id)?)
}
pub fn final_state(&self) -> Result<V02State> {
Ok(self.dbio.final_state()?)
}
pub fn get_account_final(&self, account_id: &AccountId) -> Result<Account> {
Ok(self.final_state()?.get_account_by_id(*account_id))
}
pub fn put_block(&self, block: Block, l1_header: HeaderId) -> Result<()> {
let mut final_state = self.dbio.final_state()?;
for transaction in &block.body.transactions {
transaction
.clone()
.transaction_stateless_check()?
.execute_check_on_state(&mut final_state)?;
}
Ok(self.dbio.put_block(block, l1_header.into())?)
}
}

View File

@ -1,14 +1,23 @@
use std::{fs::File, io::BufReader, path::Path};
use std::{
fs::File,
io::BufReader,
path::{Path, PathBuf},
time::Duration,
};
use anyhow::{Context as _, Result};
pub use bedrock_client::BackoffConfig;
use common::config::BasicAuth;
use common::{
block::{AccountInitialData, CommitmentsInitialData},
config::BasicAuth,
};
use humantime_serde;
pub use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BedrockClientConfig {
pub struct ClientConfig {
/// For individual RPC requests we use Fibonacci backoff retry strategy.
pub backoff: BackoffConfig,
pub addr: Url,
@ -18,8 +27,17 @@ pub struct BedrockClientConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexerConfig {
pub resubscribe_interval_millis: u64,
pub bedrock_client_config: BedrockClientConfig,
/// Home dir of sequencer storage
pub home: PathBuf,
/// List of initial accounts data
pub initial_accounts: Vec<AccountInitialData>,
/// List of initial commitments
pub initial_commitments: Vec<CommitmentsInitialData>,
/// Sequencers signing key
pub signing_key: [u8; 32],
#[serde(with = "humantime_serde")]
pub consensus_info_polling_interval: Duration,
pub bedrock_client_config: ClientConfig,
pub channel_id: ChannelId,
}

View File

@ -1,110 +1,341 @@
use std::sync::Arc;
use std::collections::VecDeque;
use anyhow::{Context as _, Result};
use bedrock_client::BedrockClient;
use common::block::Block;
use futures::StreamExt;
use log::{debug, info};
use anyhow::Result;
use bedrock_client::{BedrockClient, HeaderId};
use common::block::{Block, HashableBlockData};
// ToDo: Remove after testnet
use common::{HashType, PINATA_BASE58};
use log::{debug, error, info};
use logos_blockchain_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::RwLock;
use crate::{config::IndexerConfig, state::IndexerState};
use crate::{block_store::IndexerStore, config::IndexerConfig};
pub mod block_store;
pub mod config;
pub mod state;
#[derive(Clone)]
pub struct IndexerCore {
bedrock_client: BedrockClient,
config: IndexerConfig,
state: IndexerState,
pub bedrock_client: BedrockClient,
pub config: IndexerConfig,
pub store: IndexerStore,
}
#[derive(Clone)]
/// This struct represents one L1 block data fetched from backfilling
pub struct BackfillBlockData {
l2_blocks: Vec<Block>,
l1_header: HeaderId,
}
#[derive(Clone)]
/// This struct represents data fetched fom backfilling in one iteration
pub struct BackfillData {
block_data: VecDeque<BackfillBlockData>,
curr_fin_l1_lib_header: HeaderId,
}
impl IndexerCore {
pub fn new(config: IndexerConfig) -> Result<Self> {
let hashable_data = HashableBlockData {
block_id: 1,
transactions: vec![],
prev_block_hash: HashType([0; 32]),
timestamp: 0,
};
// Genesis creation is fine as it is,
// because it will be overwritten by sequencer.
// Therefore:
// ToDo: remove key from indexer config, use some default.
let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap();
let channel_genesis_msg_id = [0; 32];
let start_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id);
// This is a troubling moment, because changes in key protocol can
// affect this. And indexer can not reliably ask this data from sequencer
// because indexer must be independent from it.
// ToDo: move initial state generation into common and use the same method
// for indexer and sequencer. This way both services buit at same version
// could be in sync.
let initial_commitments: Vec<nssa_core::Commitment> = config
.initial_commitments
.iter()
.map(|init_comm_data| {
let npk = &init_comm_data.npk;
let mut acc = init_comm_data.account.clone();
acc.program_owner = nssa::program::Program::authenticated_transfer_program().id();
nssa_core::Commitment::new(npk, &acc)
})
.collect();
let init_accs: Vec<(nssa::AccountId, u128)> = config
.initial_accounts
.iter()
.map(|acc_data| (acc_data.account_id, acc_data.balance))
.collect();
let mut state = nssa::V02State::new_with_genesis_accounts(&init_accs, &initial_commitments);
// ToDo: Remove after testnet
state.add_pinata_program(PINATA_BASE58.parse().unwrap());
let home = config.home.join("rocksdb");
Ok(Self {
bedrock_client: BedrockClient::new(
config.bedrock_client_config.backoff,
config.bedrock_client_config.addr.clone(),
config.bedrock_client_config.auth.clone(),
)
.context("Failed to create Bedrock client")?,
)?,
config,
// No state setup for now, future task.
state: IndexerState {
latest_seen_block: Arc::new(RwLock::new(0)),
},
store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?,
})
}
pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> {
debug!("Subscribing to Bedrock block stream");
async_stream::stream! {
loop {
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
info!("Searching for initial header");
info!("Block stream joined");
let last_l1_lib_header = self.store.last_observed_l1_lib_header()?;
while let Some(block_info) = stream_pinned.next().await {
let header_id = block_info.header_id;
let mut prev_last_l1_lib_header = match last_l1_lib_header {
Some(last_l1_lib_header) => {
info!("Last l1 lib header found: {last_l1_lib_header}");
last_l1_lib_header
},
None => {
info!("Last l1 lib header not found in DB");
info!("Searching for the start of a channel");
info!("Observed L1 block at height {}", block_info.height);
let BackfillData {
block_data: start_buff,
curr_fin_l1_lib_header: last_l1_lib_header,
} = self.search_for_channel_start().await?;
if let Some(l1_block) = self
.bedrock_client
.get_block_by_id(header_id)
.await?
{
info!("Extracted L1 block at height {}", block_info.height);
let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(),
&self.config.channel_id,
).collect::<Vec<_>>();
info!("Parsed {} L2 blocks", l2_blocks_parsed.len());
for l2_block in l2_blocks_parsed {
// State modification, will be updated in future
{
let mut guard = self.state.latest_seen_block.write().await;
if l2_block.header.block_id > *guard {
*guard = l2_block.header.block_id;
}
}
for BackfillBlockData {
l2_blocks: l2_block_vec,
l1_header,
} in start_buff {
for l2_block in l2_block_vec {
self.store.put_block(l2_block.clone(), l1_header)?;
yield Ok(l2_block);
}
}
last_l1_lib_header
},
};
info!("Searching for initial header finished");
info!("Starting backfilling from {prev_last_l1_lib_header}");
loop {
let BackfillData {
block_data: buff,
curr_fin_l1_lib_header,
} = self
.backfill_to_last_l1_lib_header_id(prev_last_l1_lib_header, &self.config.channel_id)
.await
.inspect_err(|err| error!("Failed to backfill to last l1 lib header id with err {err:#?}"))?;
prev_last_l1_lib_header = curr_fin_l1_lib_header;
for BackfillBlockData {
l2_blocks: l2_block_vec,
l1_header: header,
} in buff {
for l2_block in l2_block_vec {
self.store.put_block(l2_block.clone(), header)?;
yield Ok(l2_block);
}
}
}
}
}
async fn get_lib(&self) -> Result<HeaderId> {
Ok(self.bedrock_client.get_consensus_info().await?.lib)
}
async fn get_next_lib(&self, prev_lib: HeaderId) -> Result<HeaderId> {
loop {
let next_lib = self.get_lib().await?;
if next_lib != prev_lib {
break Ok(next_lib);
} else {
info!(
"Wait {:?} to not spam the node",
self.config.consensus_info_polling_interval
);
tokio::time::sleep(self.config.consensus_info_polling_interval).await;
}
}
}
/// WARNING: depending on channel state,
/// may take indefinite amount of time
pub async fn search_for_channel_start(&self) -> Result<BackfillData> {
let mut curr_last_l1_lib_header = self.get_lib().await?;
let mut backfill_start = curr_last_l1_lib_header;
// ToDo: How to get root?
let mut backfill_limit = HeaderId::from([0; 32]);
// ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory
// Don't want to complicate DB even more right now.
let mut block_buffer = VecDeque::new();
'outer: loop {
let mut cycle_header = curr_last_l1_lib_header;
loop {
let cycle_block =
if let Some(block) = self.bedrock_client.get_block_by_id(cycle_header).await? {
block
} else {
// First run can reach root easily
// so here we are optimistic about L1
// failing to get parent.
break;
};
// It would be better to have id, but block does not have it, so slot will do.
info!(
"INITIAL SEARCH: Observed L1 block at slot {}",
cycle_block.header().slot().into_inner()
);
debug!(
"INITIAL SEARCH: This block header is {}",
cycle_block.header().id()
);
debug!(
"INITIAL SEARCH: This block parent is {}",
cycle_block.header().parent()
);
let (l2_block_vec, l1_header) =
parse_block_owned(&cycle_block, &self.config.channel_id);
info!("Parsed {} L2 blocks", l2_block_vec.len());
if !l2_block_vec.is_empty() {
block_buffer.push_front(BackfillBlockData {
l2_blocks: l2_block_vec.clone(),
l1_header,
});
}
// Refetch stream after delay
tokio::time::sleep(std::time::Duration::from_millis(
self.config.resubscribe_interval_millis,
))
.await;
if let Some(first_l2_block) = l2_block_vec.first()
&& first_l2_block.header.block_id == 1
{
info!("INITIAL_SEARCH: Found channel start");
break 'outer;
}
// Step back to parent
let parent = cycle_block.header().parent();
if parent == backfill_limit {
break;
}
cycle_header = parent;
}
info!("INITIAL_SEARCH: Reached backfill limit, refetching last l1 lib header");
block_buffer.clear();
backfill_limit = backfill_start;
curr_last_l1_lib_header = self.get_next_lib(curr_last_l1_lib_header).await?;
backfill_start = curr_last_l1_lib_header;
}
Ok(BackfillData {
block_data: block_buffer,
curr_fin_l1_lib_header: backfill_limit,
})
}
pub async fn backfill_to_last_l1_lib_header_id(
&self,
last_fin_l1_lib_header: HeaderId,
channel_id: &ChannelId,
) -> Result<BackfillData> {
let curr_fin_l1_lib_header = self.get_next_lib(last_fin_l1_lib_header).await?;
// ToDo: Not scalable, buffer should be stored in DB to not run out of memory
// Don't want to complicate DB even more right now.
let mut block_buffer = VecDeque::new();
let mut cycle_header = curr_fin_l1_lib_header;
loop {
let Some(cycle_block) = self.bedrock_client.get_block_by_id(cycle_header).await? else {
return Err(anyhow::anyhow!("Parent not found"));
};
if cycle_block.header().id() == last_fin_l1_lib_header {
break;
} else {
// Step back to parent
cycle_header = cycle_block.header().parent();
}
// It would be better to have id, but block does not have it, so slot will do.
info!(
"Observed L1 block at slot {}",
cycle_block.header().slot().into_inner()
);
let (l2_block_vec, l1_header) = parse_block_owned(&cycle_block, channel_id);
info!("Parsed {} L2 blocks", l2_block_vec.len());
if !l2_block_vec.is_empty() {
block_buffer.push_front(BackfillBlockData {
l2_blocks: l2_block_vec,
l1_header,
});
}
}
Ok(BackfillData {
block_data: block_buffer,
curr_fin_l1_lib_header,
})
}
}
fn parse_blocks(
block_txs: impl Iterator<Item = SignedMantleTx>,
fn parse_block_owned(
l1_block: &bedrock_client::Block<SignedMantleTx>,
decoded_channel_id: &ChannelId,
) -> impl Iterator<Item = Block> {
block_txs.flat_map(|tx| {
tx.mantle_tx.ops.into_iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == *decoded_channel_id => {
borsh::from_slice::<Block>(&inscription).ok()
}
_ => None,
})
})
) -> (Vec<Block>, HeaderId) {
(
l1_block
.transactions()
.flat_map(|tx| {
tx.mantle_tx.ops.iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == decoded_channel_id => {
borsh::from_slice::<Block>(inscription)
.inspect_err(|err| {
error!("Failed to deserialize our inscription with err: {err:#?}")
})
.ok()
}
_ => None,
})
})
.collect(),
l1_block.header().id(),
)
}

View File

@ -35,6 +35,11 @@ RUN strip /indexer_service/target/release/indexer_service
# Runtime stage - minimal image
FROM debian:trixie-slim
# Install runtime dependencies
RUN apt-get update \
&& apt-get install -y gosu jq \
&& rm -rf /var/lib/apt/lists/*
# Create non-root user for security
RUN useradd -m -u 1000 -s /bin/bash indexer_service_user && \
mkdir -p /indexer_service /etc/indexer_service && \
@ -43,6 +48,10 @@ RUN useradd -m -u 1000 -s /bin/bash indexer_service_user && \
# Copy binary from builder
COPY --from=builder --chown=indexer_service_user:indexer_service_user /indexer_service/target/release/indexer_service /usr/local/bin/indexer_service
# Copy entrypoint script
COPY indexer/service/docker-entrypoint.sh /docker-entrypoint.sh
RUN chmod +x /docker-entrypoint.sh
# Expose default port
EXPOSE 8779
@ -60,7 +69,9 @@ HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
# Run the application
ENV RUST_LOG=info
USER indexer_service_user
USER root
ENTRYPOINT ["/docker-entrypoint.sh"]
WORKDIR /indexer_service
CMD ["indexer_service", "/etc/indexer_service/indexer_config.json"]

View File

@ -1,11 +1,160 @@
{
"resubscribe_interval_millis": 1000,
"home": "./indexer/service",
"consensus_info_polling_interval": "1s",
"bedrock_client_config": {
"addr": "http://localhost:8080",
"backoff": {
"start_delay_millis": 100,
"start_delay": "100ms",
"max_retries": 5
}
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101"
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"initial_accounts": [
{
"account_id": "BLgCRDXYdQPMMWVHYRFGQZbgeHx9frkipa8GtpG2Syqy",
"balance": 10000
},
{
"account_id": "Gj1mJy5W7J5pfmLRujmQaLfLMWidNxQ6uwnhb666ZwHw",
"balance": 20000
}
],
"initial_commitments": [
{
"npk": [
63,
202,
178,
231,
183,
82,
237,
212,
216,
221,
215,
255,
153,
101,
177,
161,
254,
210,
128,
122,
54,
190,
230,
151,
183,
64,
225,
229,
113,
1,
228,
97
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 10000,
"data": [],
"nonce": 0
}
},
{
"npk": [
192,
251,
166,
243,
167,
236,
84,
249,
35,
136,
130,
172,
219,
225,
161,
139,
229,
89,
243,
125,
194,
213,
209,
30,
23,
174,
100,
244,
124,
74,
140,
47
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 20000,
"data": [],
"nonce": 0
}
}
],
"signing_key": [
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37
]
}

View File

@ -10,3 +10,5 @@ services:
volumes:
# Mount configuration
- ./configs/indexer_config.json:/etc/indexer_service/indexer_config.json
# Mount data folder
- ./data:/var/lib/indexer_service

View File

@ -0,0 +1,29 @@
#!/bin/sh
# This is an entrypoint script for the indexer_service Docker container,
# it's not meant to be executed outside of the container.
set -e
CONFIG="/etc/indexer_service/indexer_service.json"
# Check config file exists
if [ ! -f "$CONFIG" ]; then
echo "Config file not found: $CONFIG" >&2
exit 1
fi
# Parse home dir
HOME_DIR=$(jq -r '.home' "$CONFIG")
if [ -z "$HOME_DIR" ] || [ "$HOME_DIR" = "null" ]; then
echo "'home' key missing in config" >&2
exit 1
fi
# Give permissions to the data directory and switch to non-root user
if [ "$(id -u)" = "0" ]; then
mkdir -p "$HOME_DIR"
chown -R indexer_service_user:indexer_service_user "$HOME_DIR"
exec gosu indexer_service_user "$@"
fi

View File

@ -26,6 +26,9 @@ pub trait Rpc {
#[subscription(name = "subscribeToFinalizedBlocks", item = BlockId)]
async fn subscribe_to_finalized_blocks(&self) -> SubscriptionResult;
#[method(name = "getLastFinalizedBlockId")]
async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned>;
#[method(name = "getBlockById")]
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned>;
@ -48,4 +51,8 @@ pub trait Rpc {
limit: u32,
offset: u32,
) -> Result<Vec<Transaction>, ErrorObjectOwned>;
// ToDo: expand healthcheck response into some kind of report
#[method(name = "checkHealth")]
async fn healthcheck(&self) -> Result<(), ErrorObjectOwned>;
}

View File

@ -180,6 +180,15 @@ impl indexer_service_rpc::RpcServer for MockIndexerService {
Ok(())
}
async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned> {
self.blocks
.last()
.map(|bl| bl.header.block_id)
.ok_or_else(|| {
ErrorObjectOwned::owned(-32001, "Last block not found".to_string(), None::<()>)
})
}
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
self.blocks
.iter()
@ -268,4 +277,8 @@ impl indexer_service_rpc::RpcServer for MockIndexerService {
.map(|(tx, _)| tx.clone())
.collect())
}
async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> {
Ok(())
}
}

View File

@ -15,11 +15,6 @@ use tokio::sync::mpsc::UnboundedSender;
pub struct IndexerService {
subscription_service: SubscriptionService,
#[expect(
dead_code,
reason = "Will be used in future implementations of RPC methods"
)]
indexer: IndexerCore,
}
@ -53,33 +48,88 @@ impl indexer_service_rpc::RpcServer for IndexerService {
Ok(())
}
async fn get_block_by_id(&self, _block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned> {
self.indexer.store.get_last_block_id().map_err(db_error)
}
async fn get_block_by_hash(&self, _block_hash: HashType) -> Result<Block, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
Ok(self
.indexer
.store
.get_block_at_id(block_id)
.map_err(db_error)?
.into())
}
async fn get_account(&self, _account_id: AccountId) -> Result<Account, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_block_by_hash(&self, block_hash: HashType) -> Result<Block, ErrorObjectOwned> {
Ok(self
.indexer
.store
.get_block_by_hash(block_hash.0)
.map_err(db_error)?
.into())
}
async fn get_transaction(&self, _tx_hash: HashType) -> Result<Transaction, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_account(&self, account_id: AccountId) -> Result<Account, ErrorObjectOwned> {
Ok(self
.indexer
.store
.get_account_final(&account_id.into())
.map_err(db_error)?
.into())
}
async fn get_blocks(&self, _offset: u32, _limit: u32) -> Result<Vec<Block>, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_transaction(&self, tx_hash: HashType) -> Result<Transaction, ErrorObjectOwned> {
Ok(self
.indexer
.store
.get_transaction_by_hash(tx_hash.0)
.map_err(db_error)?
.into())
}
async fn get_blocks(&self, offset: u32, limit: u32) -> Result<Vec<Block>, ErrorObjectOwned> {
let blocks = self
.indexer
.store
.get_block_batch(offset as u64, limit as u64)
.map_err(db_error)?;
let mut block_res = vec![];
for block in blocks {
block_res.push(block.into())
}
Ok(block_res)
}
async fn get_transactions_by_account(
&self,
_account_id: AccountId,
_limit: u32,
_offset: u32,
account_id: AccountId,
limit: u32,
offset: u32,
) -> Result<Vec<Transaction>, ErrorObjectOwned> {
Err(not_yet_implemented_error())
let transactions = self
.indexer
.store
.get_transactions_by_account(account_id.value, offset as u64, limit as u64)
.map_err(db_error)?;
let mut tx_res = vec![];
for tx in transactions {
tx_res.push(tx.into())
}
Ok(tx_res)
}
async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> {
// Checking, that indexer can calculate last state
let _ = self.indexer.store.final_state().map_err(db_error)?;
Ok(())
}
}
@ -219,10 +269,18 @@ impl<T> Drop for Subscription<T> {
}
}
fn not_yet_implemented_error() -> ErrorObjectOwned {
pub fn not_yet_implemented_error() -> ErrorObjectOwned {
ErrorObject::owned(
ErrorCode::InternalError.code(),
"Not yet implemented",
Option::<String>::None,
)
}
fn db_error(err: anyhow::Error) -> ErrorObjectOwned {
ErrorObjectOwned::owned(
ErrorCode::InternalError.code(),
"DBError".to_string(),
Some(format!("{err:#?}")),
)
}

View File

@ -12,19 +12,22 @@ sequencer_runner.workspace = true
wallet.workspace = true
common.workspace = true
key_protocol.workspace = true
wallet-ffi.workspace = true
token_core.workspace = true
indexer_service.workspace = true
serde_json.workspace = true
token_core.workspace = true
indexer_service_rpc.workspace = true
wallet-ffi.workspace = true
url.workspace = true
anyhow.workspace = true
env_logger.workspace = true
log.workspace = true
serde_json.workspace = true
base64.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
hex.workspace = true
tempfile.workspace = true
borsh.workspace = true
bytesize.workspace = true
futures.workspace = true
testcontainers = { version = "0.27.0", features = ["docker-compose"] }

View File

@ -1,30 +1,38 @@
use std::{net::SocketAddr, path::PathBuf};
use std::{net::SocketAddr, path::PathBuf, time::Duration};
use anyhow::{Context, Result};
use indexer_service::{BackoffConfig, BedrockClientConfig, ChannelId, IndexerConfig};
use bytesize::ByteSize;
use common::block::{AccountInitialData, CommitmentsInitialData};
use indexer_service::{BackoffConfig, ChannelId, ClientConfig, IndexerConfig};
use key_protocol::key_management::KeyChain;
use nssa::{Account, AccountId, PrivateKey, PublicKey};
use nssa_core::{account::Data, program::DEFAULT_PROGRAM_ID};
use sequencer_core::config::{
AccountInitialData, BedrockConfig, CommitmentsInitialData, SequencerConfig,
};
use sequencer_core::config::{BedrockConfig, SequencerConfig};
use url::Url;
use wallet::config::{
InitialAccountData, InitialAccountDataPrivate, InitialAccountDataPublic, WalletConfig,
};
pub fn indexer_config(bedrock_addr: SocketAddr) -> Result<IndexerConfig> {
pub fn indexer_config(
bedrock_addr: SocketAddr,
home: PathBuf,
initial_data: &InitialData,
) -> Result<IndexerConfig> {
Ok(IndexerConfig {
resubscribe_interval_millis: 1000,
bedrock_client_config: BedrockClientConfig {
home,
consensus_info_polling_interval: Duration::from_secs(1),
bedrock_client_config: ClientConfig {
addr: addr_to_url(UrlProtocol::Http, bedrock_addr)
.context("Failed to convert bedrock addr to URL")?,
auth: None,
backoff: BackoffConfig {
start_delay_millis: 100,
start_delay: Duration::from_millis(100),
max_retries: 10,
},
},
initial_accounts: initial_data.sequencer_initial_accounts(),
initial_commitments: initial_data.sequencer_initial_commitments(),
signing_key: [37; 32],
channel_id: bedrock_channel_id(),
})
}
@ -32,16 +40,18 @@ pub fn indexer_config(bedrock_addr: SocketAddr) -> Result<IndexerConfig> {
/// Sequencer config options available for custom changes in integration tests.
pub struct SequencerPartialConfig {
pub max_num_tx_in_block: usize,
pub max_block_size: ByteSize,
pub mempool_max_size: usize,
pub block_create_timeout_millis: u64,
pub block_create_timeout: Duration,
}
impl Default for SequencerPartialConfig {
fn default() -> Self {
Self {
max_num_tx_in_block: 20,
max_block_size: ByteSize::mib(1),
mempool_max_size: 10_000,
block_create_timeout_millis: 10_000,
block_create_timeout: Duration::from_secs(10),
}
}
}
@ -55,8 +65,9 @@ pub fn sequencer_config(
) -> Result<SequencerConfig> {
let SequencerPartialConfig {
max_num_tx_in_block,
max_block_size,
mempool_max_size,
block_create_timeout_millis,
block_create_timeout,
} = partial;
Ok(SequencerConfig {
@ -65,16 +76,17 @@ pub fn sequencer_config(
genesis_id: 1,
is_genesis_random: true,
max_num_tx_in_block,
max_block_size,
mempool_max_size,
block_create_timeout_millis,
retry_pending_blocks_timeout_millis: 240_000,
block_create_timeout,
retry_pending_blocks_timeout: Duration::from_secs(120),
port: 0,
initial_accounts: initial_data.sequencer_initial_accounts(),
initial_commitments: initial_data.sequencer_initial_commitments(),
signing_key: [37; 32],
bedrock_config: BedrockConfig {
backoff: BackoffConfig {
start_delay_millis: 100,
start_delay: Duration::from_millis(100),
max_retries: 5,
},
channel_id: bedrock_channel_id(),
@ -95,7 +107,7 @@ pub fn wallet_config(
override_rust_log: None,
sequencer_addr: addr_to_url(UrlProtocol::Http, sequencer_addr)
.context("Failed to convert sequencer addr to URL")?,
seq_poll_timeout_millis: 30_000,
seq_poll_timeout: Duration::from_secs(30),
seq_tx_poll_max_blocks: 15,
seq_poll_max_retries: 10,
seq_block_poll_max_amount: 100,

View File

@ -10,6 +10,7 @@ use indexer_service::IndexerHandle;
use log::{debug, error, warn};
use nssa::{AccountId, PrivacyPreservingTransaction};
use nssa_core::Commitment;
use sequencer_core::indexer_client::{IndexerClient, IndexerClientTrait};
use sequencer_runner::SequencerHandle;
use tempfile::TempDir;
use testcontainers::compose::DockerCompose;
@ -20,6 +21,7 @@ pub mod config;
// TODO: Remove this and control time from tests
pub const TIME_TO_WAIT_FOR_BLOCK_SECONDS: u64 = 12;
pub const NSSA_PROGRAM_FOR_TEST_DATA_CHANGER: &str = "data_changer.bin";
pub const NSSA_PROGRAM_FOR_TEST_NOOP: &str = "noop.bin";
const BEDROCK_SERVICE_WITH_OPEN_PORT: &str = "logos-blockchain-node-0";
const BEDROCK_SERVICE_PORT: u16 = 18080;
@ -33,11 +35,13 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init);
// NOTE: Order of fields is important for proper drop order.
pub struct TestContext {
sequencer_client: SequencerClient,
indexer_client: IndexerClient,
wallet: WalletCore,
wallet_password: String,
sequencer_handle: SequencerHandle,
indexer_handle: IndexerHandle,
bedrock_compose: DockerCompose,
_temp_indexer_dir: TempDir,
_temp_sequencer_dir: TempDir,
_temp_wallet_dir: TempDir,
}
@ -63,7 +67,7 @@ impl TestContext {
let (bedrock_compose, bedrock_addr) = Self::setup_bedrock_node().await?;
let indexer_handle = Self::setup_indexer(bedrock_addr)
let (indexer_handle, temp_indexer_dir) = Self::setup_indexer(bedrock_addr, &initial_data)
.await
.context("Failed to setup Indexer")?;
@ -83,16 +87,23 @@ impl TestContext {
let sequencer_url = config::addr_to_url(config::UrlProtocol::Http, sequencer_handle.addr())
.context("Failed to convert sequencer addr to URL")?;
let indexer_url = config::addr_to_url(config::UrlProtocol::Ws, indexer_handle.addr())
.context("Failed to convert indexer addr to URL")?;
let sequencer_client =
SequencerClient::new(sequencer_url).context("Failed to create sequencer client")?;
let indexer_client = IndexerClient::new(&indexer_url)
.await
.context("Failed to create indexer client")?;
Ok(Self {
sequencer_client,
indexer_client,
wallet,
wallet_password,
bedrock_compose,
sequencer_handle,
indexer_handle,
_temp_indexer_dir: temp_indexer_dir,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
@ -163,13 +174,26 @@ impl TestContext {
Ok((compose, addr))
}
async fn setup_indexer(bedrock_addr: SocketAddr) -> Result<IndexerHandle> {
let indexer_config =
config::indexer_config(bedrock_addr).context("Failed to create Indexer config")?;
async fn setup_indexer(
bedrock_addr: SocketAddr,
initial_data: &config::InitialData,
) -> Result<(IndexerHandle, TempDir)> {
let temp_indexer_dir =
tempfile::tempdir().context("Failed to create temp dir for indexer home")?;
debug!("Using temp indexer home at {:?}", temp_indexer_dir.path());
let indexer_config = config::indexer_config(
bedrock_addr,
temp_indexer_dir.path().to_owned(),
initial_data,
)
.context("Failed to create Indexer config")?;
indexer_service::run_server(indexer_config, 0)
.await
.context("Failed to run Indexer Service")
.map(|handle| (handle, temp_indexer_dir))
}
async fn setup_sequencer(
@ -254,6 +278,11 @@ impl TestContext {
&self.sequencer_client
}
/// Get reference to the indexer client.
pub fn indexer_client(&self) -> &IndexerClient {
&self.indexer_client
}
/// Get existing public account IDs in the wallet.
pub fn existing_public_accounts(&self) -> Vec<AccountId> {
self.wallet
@ -279,9 +308,11 @@ impl Drop for TestContext {
sequencer_handle,
indexer_handle,
bedrock_compose,
_temp_indexer_dir: _,
_temp_sequencer_dir: _,
_temp_wallet_dir: _,
sequencer_client: _,
indexer_client: _,
wallet: _,
wallet_password: _,
} = self;

View File

@ -0,0 +1,185 @@
use std::time::Duration;
use anyhow::Result;
use bytesize::ByteSize;
use common::{block::HashableBlockData, transaction::NSSATransaction};
use integration_tests::{
TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, config::SequencerPartialConfig,
};
use nssa::program::Program;
use tokio::test;
#[test]
async fn reject_oversized_transaction() -> Result<()> {
let ctx = TestContext::builder()
.with_sequencer_partial_config(SequencerPartialConfig {
max_num_tx_in_block: 100,
max_block_size: ByteSize::mib(1),
mempool_max_size: 1000,
block_create_timeout: Duration::from_secs(10),
})
.build()
.await?;
// Create a transaction that's definitely too large
// Block size is 1 MiB (1,048,576 bytes), minus ~200 bytes for header = ~1,048,376 bytes max tx
// Create a 1.1 MiB binary to ensure it exceeds the limit
let oversized_binary = vec![0u8; 1100 * 1024]; // 1.1 MiB binary
let message = nssa::program_deployment_transaction::Message::new(oversized_binary);
let tx = nssa::ProgramDeploymentTransaction::new(message);
// Try to submit the transaction and expect an error
let result = ctx.sequencer_client().send_tx_program(tx).await;
assert!(
result.is_err(),
"Expected error when submitting oversized transaction"
);
let err = result.unwrap_err();
let err_str = format!("{:?}", err);
// Check if the error contains information about transaction being too large
assert!(
err_str.contains("TransactionTooLarge") || err_str.contains("too large"),
"Expected TransactionTooLarge error, got: {}",
err_str
);
Ok(())
}
#[test]
async fn accept_transaction_within_limit() -> Result<()> {
let ctx = TestContext::builder()
.with_sequencer_partial_config(SequencerPartialConfig {
max_num_tx_in_block: 100,
max_block_size: ByteSize::mib(1),
mempool_max_size: 1000,
block_create_timeout: Duration::from_secs(10),
})
.build()
.await?;
// Create a small program deployment that should fit
let small_binary = vec![0u8; 1024]; // 1 KiB binary
let message = nssa::program_deployment_transaction::Message::new(small_binary);
let tx = nssa::ProgramDeploymentTransaction::new(message);
// This should succeed
let result = ctx.sequencer_client().send_tx_program(tx).await;
assert!(
result.is_ok(),
"Expected successful submission of small transaction, got error: {:?}",
result.as_ref().unwrap_err()
);
Ok(())
}
#[test]
async fn transaction_deferred_to_next_block_when_current_full() -> Result<()> {
let manifest_dir = env!("CARGO_MANIFEST_DIR");
let artifacts_dir =
std::path::PathBuf::from(manifest_dir).join("../artifacts/test_program_methods");
let burner_bytecode = std::fs::read(artifacts_dir.join("burner.bin"))?;
let chain_caller_bytecode = std::fs::read(artifacts_dir.join("chain_caller.bin"))?;
// Calculate block size to fit only one of the two transactions, leaving some room for headers
// (e.g., 10 KiB)
let max_program_size = burner_bytecode.len().max(chain_caller_bytecode.len());
let block_size = ByteSize::b((max_program_size + 10 * 1024) as u64);
let ctx = TestContext::builder()
.with_sequencer_partial_config(SequencerPartialConfig {
max_num_tx_in_block: 100,
max_block_size: block_size,
mempool_max_size: 1000,
block_create_timeout: Duration::from_secs(10),
})
.build()
.await?;
let burner_id = Program::new(burner_bytecode.clone())?.id();
let chain_caller_id = Program::new(chain_caller_bytecode.clone())?.id();
let initial_block_height = ctx.sequencer_client().get_last_block().await?.last_block;
// Submit both program deployments
ctx.sequencer_client()
.send_tx_program(nssa::ProgramDeploymentTransaction::new(
nssa::program_deployment_transaction::Message::new(burner_bytecode),
))
.await?;
ctx.sequencer_client()
.send_tx_program(nssa::ProgramDeploymentTransaction::new(
nssa::program_deployment_transaction::Message::new(chain_caller_bytecode),
))
.await?;
// Wait for first block
tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
let block1_response = ctx
.sequencer_client()
.get_block(initial_block_height + 1)
.await?;
let block1: HashableBlockData = borsh::from_slice(&block1_response.block)?;
// Check which program is in block 1
let get_program_ids = |block: &HashableBlockData| -> Vec<nssa::ProgramId> {
block
.transactions
.iter()
.filter_map(|tx| {
if let NSSATransaction::ProgramDeployment(deployment) = tx {
let bytecode = deployment.message.clone().into_bytecode();
Program::new(bytecode).ok().map(|p| p.id())
} else {
None
}
})
.collect()
};
let block1_program_ids = get_program_ids(&block1);
// First program should be in block 1, but not both due to block size limit
assert_eq!(
block1_program_ids.len(),
1,
"Expected exactly one program deployment in block 1"
);
assert_eq!(
block1_program_ids[0], burner_id,
"Expected burner program to be deployed in block 1"
);
// Wait for second block
tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
let block2_response = ctx
.sequencer_client()
.get_block(initial_block_height + 2)
.await?;
let block2: HashableBlockData = borsh::from_slice(&block2_response.block)?;
let block2_program_ids = get_program_ids(&block2);
// The other program should be in block 2
assert_eq!(
block2_program_ids.len(),
1,
"Expected exactly one program deployment in block 2"
);
assert_eq!(
block2_program_ids[0], chain_caller_id,
"Expected chain_caller program to be deployed in block 2"
);
Ok(())
}

View File

@ -8,22 +8,22 @@ use wallet::cli::{Command, config::ConfigSubcommand};
async fn modify_config_field() -> Result<()> {
let mut ctx = TestContext::new().await?;
let old_seq_poll_timeout_millis = ctx.wallet().config().seq_poll_timeout_millis;
let old_seq_poll_timeout = ctx.wallet().config().seq_poll_timeout;
// Change config field
let command = Command::Config(ConfigSubcommand::Set {
key: "seq_poll_timeout_millis".to_string(),
value: "1000".to_string(),
key: "seq_poll_timeout".to_string(),
value: "1s".to_string(),
});
wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?;
let new_seq_poll_timeout_millis = ctx.wallet().config().seq_poll_timeout_millis;
assert_eq!(new_seq_poll_timeout_millis, 1000);
let new_seq_poll_timeout = ctx.wallet().config().seq_poll_timeout;
assert_eq!(new_seq_poll_timeout, std::time::Duration::from_secs(1));
// Return how it was at the beginning
let command = Command::Config(ConfigSubcommand::Set {
key: "seq_poll_timeout_millis".to_string(),
value: old_seq_poll_timeout_millis.to_string(),
key: "seq_poll_timeout".to_string(),
value: format!("{:?}", old_seq_poll_timeout),
});
wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?;

View File

@ -0,0 +1,177 @@
use std::time::Duration;
use anyhow::{Context, Result};
use indexer_service_rpc::RpcClient;
use integration_tests::{
TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_private_account_id,
format_public_account_id, verify_commitment_is_in_state,
};
use log::info;
use nssa::AccountId;
use tokio::test;
use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand};
/// Timeout in milliseconds to reliably await for block finalization
const L2_TO_L1_TIMEOUT_MILLIS: u64 = 600000;
#[test]
async fn indexer_test_run() -> Result<()> {
let ctx = TestContext::new().await?;
// RUN OBSERVATION
tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;
let last_block_seq = ctx
.sequencer_client()
.get_last_block()
.await
.unwrap()
.last_block;
info!("Last block on seq now is {last_block_seq}");
let last_block_indexer = ctx
.indexer_client()
.get_last_finalized_block_id()
.await
.unwrap();
info!("Last block on ind now is {last_block_indexer}");
assert!(last_block_indexer > 1);
Ok(())
}
#[test]
async fn indexer_block_batching() -> Result<()> {
let ctx = TestContext::new().await?;
// WAIT
info!("Waiting for indexer to parse blocks");
tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;
let last_block_indexer = ctx
.indexer_client()
.get_last_finalized_block_id()
.await
.unwrap();
info!("Last block on ind now is {last_block_indexer}");
assert!(last_block_indexer > 1);
// Getting wide batch to fit all blocks
let block_batch = ctx.indexer_client().get_blocks(1, 100).await.unwrap();
// Checking chain consistency
let mut prev_block_hash = block_batch.first().unwrap().header.hash;
for block in &block_batch[1..] {
assert_eq!(block.header.prev_block_hash, prev_block_hash);
info!("Block {} chain-consistent", block.header.block_id);
prev_block_hash = block.header.hash;
}
Ok(())
}
#[test]
async fn indexer_state_consistency() -> Result<()> {
let mut ctx = TestContext::new().await?;
let command = Command::AuthTransfer(AuthTransferSubcommand::Send {
from: format_public_account_id(ctx.existing_public_accounts()[0]),
to: Some(format_public_account_id(ctx.existing_public_accounts()[1])),
to_npk: None,
to_vpk: None,
amount: 100,
});
wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?;
info!("Waiting for next block creation");
tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
info!("Checking correct balance move");
let acc_1_balance = ctx
.sequencer_client()
.get_account_balance(ctx.existing_public_accounts()[0])
.await?;
let acc_2_balance = ctx
.sequencer_client()
.get_account_balance(ctx.existing_public_accounts()[1])
.await?;
info!("Balance of sender: {acc_1_balance:#?}");
info!("Balance of receiver: {acc_2_balance:#?}");
assert_eq!(acc_1_balance.balance, 9900);
assert_eq!(acc_2_balance.balance, 20100);
let from: AccountId = ctx.existing_private_accounts()[0];
let to: AccountId = ctx.existing_private_accounts()[1];
let command = Command::AuthTransfer(AuthTransferSubcommand::Send {
from: format_private_account_id(from),
to: Some(format_private_account_id(to)),
to_npk: None,
to_vpk: None,
amount: 100,
});
wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?;
info!("Waiting for next block creation");
tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
let new_commitment1 = ctx
.wallet()
.get_private_account_commitment(from)
.context("Failed to get private account commitment for sender")?;
assert!(verify_commitment_is_in_state(new_commitment1, ctx.sequencer_client()).await);
let new_commitment2 = ctx
.wallet()
.get_private_account_commitment(to)
.context("Failed to get private account commitment for receiver")?;
assert!(verify_commitment_is_in_state(new_commitment2, ctx.sequencer_client()).await);
info!("Successfully transferred privately to owned account");
// WAIT
info!("Waiting for indexer to parse blocks");
tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;
let acc1_ind_state = ctx
.indexer_client()
.get_account(ctx.existing_public_accounts()[0].into())
.await
.unwrap();
let acc2_ind_state = ctx
.indexer_client()
.get_account(ctx.existing_public_accounts()[1].into())
.await
.unwrap();
info!("Checking correct state transition");
let acc1_seq_state = ctx
.sequencer_client()
.get_account(ctx.existing_public_accounts()[0])
.await?
.account;
let acc2_seq_state = ctx
.sequencer_client()
.get_account(ctx.existing_public_accounts()[1])
.await?
.account;
assert_eq!(acc1_ind_state, acc1_seq_state.into());
assert_eq!(acc2_ind_state, acc2_seq_state.into());
// ToDo: Check private state transition
Ok(())
}

View File

@ -1,6 +1,7 @@
use std::time::{Duration, Instant};
use anyhow::Result;
use bytesize::ByteSize;
use integration_tests::{
TestContext,
config::{InitialData, SequencerPartialConfig},
@ -178,8 +179,9 @@ impl TpsTestManager {
fn generate_sequencer_partial_config() -> SequencerPartialConfig {
SequencerPartialConfig {
max_num_tx_in_block: 300,
max_block_size: ByteSize::mb(500),
mempool_max_size: 10_000,
block_create_timeout_millis: 12_000,
block_create_timeout: Duration::from_secs(12),
}
}
}

View File

@ -2,13 +2,17 @@ use tokio::sync::mpsc::{Receiver, Sender};
pub struct MemPool<T> {
receiver: Receiver<T>,
front_buffer: Vec<T>,
}
impl<T> MemPool<T> {
pub fn new(max_size: usize) -> (Self, MemPoolHandle<T>) {
let (sender, receiver) = tokio::sync::mpsc::channel(max_size);
let mem_pool = Self { receiver };
let mem_pool = Self {
receiver,
front_buffer: Vec::new(),
};
let sender = MemPoolHandle::new(sender);
(mem_pool, sender)
}
@ -16,6 +20,13 @@ impl<T> MemPool<T> {
pub fn pop(&mut self) -> Option<T> {
use tokio::sync::mpsc::error::TryRecvError;
// First check if there are any items in the front buffer (LIFO)
if let Some(item) = self.front_buffer.pop() {
return Some(item);
}
// Otherwise, try to receive from the channel (FIFO)
match self.receiver.try_recv() {
Ok(item) => Some(item),
Err(TryRecvError::Empty) => None,
@ -24,6 +35,11 @@ impl<T> MemPool<T> {
}
}
}
/// Push an item to the front of the mempool (will be popped first)
pub fn push_front(&mut self, item: T) {
self.front_buffer.push(item);
}
}
pub struct MemPoolHandle<T> {
@ -96,4 +112,24 @@ mod tests {
assert_eq!(pool.pop(), Some(1));
assert_eq!(pool.pop(), Some(2));
}
#[test]
async fn test_push_front() {
let (mut pool, handle) = MemPool::new(10);
handle.push(1).await.unwrap();
handle.push(2).await.unwrap();
// Push items to the front - these should be popped first
pool.push_front(10);
pool.push_front(20);
// Items pushed to front are popped in LIFO order
assert_eq!(pool.pop(), Some(20));
assert_eq!(pool.pop(), Some(10));
// Original items are then popped in FIFO order
assert_eq!(pool.pop(), Some(1));
assert_eq!(pool.pop(), Some(2));
assert_eq!(pool.pop(), None);
}
}

View File

@ -11,6 +11,7 @@ serde.workspace = true
serde_with.workspace = true
thiserror.workspace = true
bytemuck.workspace = true
bytesize.workspace = true
base58.workspace = true
k256 = { workspace = true, optional = true }
chacha20 = { version = "0.9", default-features = false }

View File

@ -1,9 +1,10 @@
use std::ops::Deref;
use borsh::{BorshDeserialize, BorshSerialize};
use bytesize::ByteSize;
use serde::{Deserialize, Serialize};
pub const DATA_MAX_LENGTH_IN_BYTES: usize = 100 * 1024; // 100 KiB
pub const DATA_MAX_LENGTH: ByteSize = ByteSize::kib(100);
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, BorshSerialize)]
pub struct Data(Vec<u8>);
@ -22,7 +23,7 @@ impl Data {
let mut u32_bytes = [0u8; 4];
cursor.read_exact(&mut u32_bytes)?;
let data_length = u32::from_le_bytes(u32_bytes);
if data_length as usize > DATA_MAX_LENGTH_IN_BYTES {
if data_length as usize > DATA_MAX_LENGTH.as_u64() as usize {
return Err(
std::io::Error::new(std::io::ErrorKind::InvalidData, DataTooBigError).into(),
);
@ -35,7 +36,7 @@ impl Data {
}
#[derive(Debug, thiserror::Error, Clone, Copy, PartialEq, Eq)]
#[error("data length exceeds maximum allowed length of {DATA_MAX_LENGTH_IN_BYTES} bytes")]
#[error("data length exceeds maximum allowed length of {} bytes", DATA_MAX_LENGTH.as_u64())]
pub struct DataTooBigError;
impl From<Data> for Vec<u8> {
@ -48,7 +49,7 @@ impl TryFrom<Vec<u8>> for Data {
type Error = DataTooBigError;
fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
if value.len() > DATA_MAX_LENGTH_IN_BYTES {
if value.len() > DATA_MAX_LENGTH.as_u64() as usize {
Err(DataTooBigError)
} else {
Ok(Self(value))
@ -78,7 +79,7 @@ impl<'de> Deserialize<'de> for Data {
/// Data deserialization visitor.
///
/// Compared to a simple deserialization into a `Vec<u8>`, this visitor enforces
/// early length check defined by [`DATA_MAX_LENGTH_IN_BYTES`].
/// early length check defined by [`DATA_MAX_LENGTH`].
struct DataVisitor;
impl<'de> serde::de::Visitor<'de> for DataVisitor {
@ -88,7 +89,7 @@ impl<'de> Deserialize<'de> for Data {
write!(
formatter,
"a byte array with length not exceeding {} bytes",
DATA_MAX_LENGTH_IN_BYTES
DATA_MAX_LENGTH.as_u64()
)
}
@ -96,11 +97,14 @@ impl<'de> Deserialize<'de> for Data {
where
A: serde::de::SeqAccess<'de>,
{
let mut vec =
Vec::with_capacity(seq.size_hint().unwrap_or(0).min(DATA_MAX_LENGTH_IN_BYTES));
let mut vec = Vec::with_capacity(
seq.size_hint()
.unwrap_or(0)
.min(DATA_MAX_LENGTH.as_u64() as usize),
);
while let Some(value) = seq.next_element()? {
if vec.len() >= DATA_MAX_LENGTH_IN_BYTES {
if vec.len() >= DATA_MAX_LENGTH.as_u64() as usize {
return Err(serde::de::Error::custom(DataTooBigError));
}
vec.push(value);
@ -121,7 +125,7 @@ impl BorshDeserialize for Data {
let len = u32::deserialize_reader(reader)?;
match len {
0 => Ok(Self::default()),
len if len as usize > DATA_MAX_LENGTH_IN_BYTES => Err(std::io::Error::new(
len if len as usize > DATA_MAX_LENGTH.as_u64() as usize => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
DataTooBigError,
)),
@ -140,21 +144,21 @@ mod tests {
#[test]
fn test_data_max_length_allowed() {
let max_vec = vec![0u8; DATA_MAX_LENGTH_IN_BYTES];
let max_vec = vec![0u8; DATA_MAX_LENGTH.as_u64() as usize];
let result = Data::try_from(max_vec);
assert!(result.is_ok());
}
#[test]
fn test_data_too_big_error() {
let big_vec = vec![0u8; DATA_MAX_LENGTH_IN_BYTES + 1];
let big_vec = vec![0u8; DATA_MAX_LENGTH.as_u64() as usize + 1];
let result = Data::try_from(big_vec);
assert!(matches!(result, Err(DataTooBigError)));
}
#[test]
fn test_borsh_deserialize_exceeding_limit_error() {
let too_big_data = vec![0u8; DATA_MAX_LENGTH_IN_BYTES + 1];
let too_big_data = vec![0u8; DATA_MAX_LENGTH.as_u64() as usize + 1];
let mut serialized = Vec::new();
<_ as BorshSerialize>::serialize(&too_big_data, &mut serialized).unwrap();
@ -164,7 +168,7 @@ mod tests {
#[test]
fn test_json_deserialize_exceeding_limit_error() {
let data = vec![0u8; DATA_MAX_LENGTH_IN_BYTES + 1];
let data = vec![0u8; DATA_MAX_LENGTH.as_u64() as usize + 1];
let json = serde_json::to_string(&data).unwrap();
let result: Result<Data, _> = serde_json::from_str(&json);

View File

@ -21,7 +21,7 @@ fn hash_value(value: &Value) -> Node {
}
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
#[derive(BorshSerialize, BorshDeserialize)]
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub struct MerkleTree {
nodes: Vec<Node>,
capacity: usize,

View File

@ -7,7 +7,7 @@ use nssa_core::{
account::AccountWithMetadata,
program::{ChainedCall, InstructionData, ProgramId, ProgramOutput},
};
use risc0_zkvm::{ExecutorEnv, InnerReceipt, Receipt, default_prover};
use risc0_zkvm::{ExecutorEnv, InnerReceipt, ProverOpts, Receipt, default_prover};
use crate::{
error::NssaError,
@ -126,8 +126,9 @@ pub fn execute_and_prove(
env_builder.write(&circuit_input).unwrap();
let env = env_builder.build().unwrap();
let prover = default_prover();
let opts = ProverOpts::succinct();
let prove_info = prover
.prove(env, PRIVACY_PRESERVING_CIRCUIT_ELF)
.prove_with_opts(env, PRIVACY_PRESERVING_CIRCUIT_ELF, &opts)
.map_err(|e| NssaError::CircuitProvingError(e.to_string()))?;
let proof = Proof(borsh::to_vec(&prove_info.receipt.inner)?);

View File

@ -146,6 +146,16 @@ impl PrivacyPreservingTransaction {
.map(|(_, public_key)| AccountId::from(public_key))
.collect()
}
pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
let mut acc_set = self
.signer_account_ids()
.into_iter()
.collect::<HashSet<_>>();
acc_set.extend(&self.message.public_account_ids);
acc_set.into_iter().collect()
}
}
fn check_privacy_preserving_circuit_proof_is_valid(

View File

@ -1,4 +1,5 @@
use borsh::{BorshDeserialize, BorshSerialize};
use nssa_core::account::AccountId;
use sha2::{Digest as _, digest::FixedOutput as _};
use crate::{
@ -38,4 +39,8 @@ impl ProgramDeploymentTransaction {
hasher.update(&bytes);
hasher.finalize_fixed().into()
}
pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
vec![]
}
}

View File

@ -45,6 +45,16 @@ impl PublicTransaction {
.collect()
}
pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
let mut acc_set = self
.signer_account_ids()
.into_iter()
.collect::<HashSet<_>>();
acc_set.extend(&self.message.account_ids);
acc_set.into_iter().collect()
}
pub fn hash(&self) -> [u8; 32] {
let bytes = self.to_bytes();
let mut hasher = sha2::Sha256::new();

View File

@ -16,7 +16,7 @@ use crate::{
pub const MAX_NUMBER_CHAINED_CALLS: usize = 10;
#[derive(BorshSerialize, BorshDeserialize)]
#[derive(Clone, BorshSerialize, BorshDeserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
pub(crate) struct CommitmentSet {
merkle_tree: MerkleTree,
@ -64,6 +64,7 @@ impl CommitmentSet {
}
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
#[derive(Clone)]
struct NullifierSet(BTreeSet<Nullifier>);
impl NullifierSet {
@ -104,7 +105,7 @@ impl BorshDeserialize for NullifierSet {
}
}
#[derive(BorshSerialize, BorshDeserialize)]
#[derive(Clone, BorshSerialize, BorshDeserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
pub struct V02State {
public_state: HashMap<AccountId, Account>,
@ -1331,7 +1332,8 @@ pub mod tests {
AccountId::new([0; 32]),
);
let large_data: Vec<u8> = vec![0; nssa_core::account::data::DATA_MAX_LENGTH_IN_BYTES + 1];
let large_data: Vec<u8> =
vec![0; nssa_core::account::data::DATA_MAX_LENGTH.as_u64() as usize + 1];
let result = execute_and_prove(
vec![public_account],

View File

@ -16,6 +16,7 @@ base58.workspace = true
anyhow.workspace = true
serde.workspace = true
serde_json.workspace = true
humantime-serde.workspace = true
tempfile.workspace = true
chrono.workspace = true
log.workspace = true
@ -24,6 +25,7 @@ logos-blockchain-key-management-system-service.workspace = true
logos-blockchain-core.workspace = true
rand.workspace = true
borsh.workspace = true
bytesize.workspace = true
url.workspace = true
jsonrpsee = { workspace = true, features = ["ws-client"] }

View File

@ -28,6 +28,11 @@ pub trait BlockSettlementClientTrait: Clone {
/// Create and sign a transaction for inscribing data.
fn create_inscribe_tx(&self, block: &Block) -> Result<(SignedMantleTx, MsgId)> {
let inscription_data = borsh::to_vec(block)?;
log::info!(
"The size of the block {} is {} bytes",
block.header.block_id,
inscription_data.len()
);
let verifying_key_bytes = self.bedrock_signing_key().public_key().to_bytes();
let verifying_key =
Ed25519PublicKey::from_bytes(&verifying_key_bytes).expect("valid ed25519 public key");
@ -90,11 +95,17 @@ impl BlockSettlementClientTrait for BlockSettlementClient {
}
async fn submit_inscribe_tx_to_bedrock(&self, tx: SignedMantleTx) -> Result<()> {
let (parent_id, msg_id) = match tx.mantle_tx.ops.first() {
Some(Op::ChannelInscribe(inscribe)) => (inscribe.parent, inscribe.id()),
_ => panic!("Expected ChannelInscribe op"),
};
self.bedrock_client
.post_transaction(tx)
.await
.context("Failed to post transaction to Bedrock")?;
log::info!("Posted block to Bedrock with parent id {parent_id:?} and msg id: {msg_id:?}");
Ok(())
}

View File

@ -7,7 +7,7 @@ use common::{
transaction::NSSATransaction,
};
use nssa::V02State;
use storage::RocksDBIO;
use storage::sequencer::RocksDBIO;
pub struct SequencerStore {
dbio: RocksDBIO,

View File

@ -2,30 +2,21 @@ use std::{
fs::File,
io::BufReader,
path::{Path, PathBuf},
time::Duration,
};
use anyhow::Result;
pub use bedrock_client::BackoffConfig;
use common::config::BasicAuth;
use bedrock_client::BackoffConfig;
use bytesize::ByteSize;
use common::{
block::{AccountInitialData, CommitmentsInitialData},
config::BasicAuth,
};
use humantime_serde;
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use nssa::AccountId;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct for account serialization
pub struct AccountInitialData {
pub account_id: AccountId,
pub balance: u128,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct to initialize commitments
pub struct CommitmentsInitialData {
pub npk: nssa_core::NullifierPublicKey,
pub account: nssa_core::account::Account,
}
// TODO: Provide default values
#[derive(Clone, Serialize, Deserialize)]
pub struct SequencerConfig {
@ -39,12 +30,17 @@ pub struct SequencerConfig {
pub is_genesis_random: bool,
/// Maximum number of transactions in block
pub max_num_tx_in_block: usize,
/// Maximum block size (includes header and transactions)
#[serde(default = "default_max_block_size")]
pub max_block_size: ByteSize,
/// Mempool maximum size
pub mempool_max_size: usize,
/// Interval in which blocks produced
pub block_create_timeout_millis: u64,
#[serde(with = "humantime_serde")]
pub block_create_timeout: Duration,
/// Interval in which pending blocks are retried
pub retry_pending_blocks_timeout_millis: u64,
#[serde(with = "humantime_serde")]
pub retry_pending_blocks_timeout: Duration,
/// Port to listen
pub port: u16,
/// List of initial accounts data
@ -80,3 +76,7 @@ impl SequencerConfig {
Ok(serde_json::from_reader(reader)?)
}
}
fn default_max_block_size() -> ByteSize {
ByteSize::mib(1)
}

View File

@ -1,4 +1,4 @@
use std::{fmt::Display, path::Path, time::Instant};
use std::{path::Path, time::Instant};
use anyhow::{Context as _, Result, anyhow};
use bedrock_client::SignedMantleTx;
@ -13,7 +13,6 @@ use config::SequencerConfig;
use log::{error, info, warn};
use logos_blockchain_key_management_system_service::keys::{ED25519_SECRET_KEY_SIZE, Ed25519Key};
use mempool::{MemPool, MemPoolHandle};
use serde::{Deserialize, Serialize};
use crate::{
block_settlement_client::{BlockSettlementClient, BlockSettlementClientTrait, MsgId},
@ -44,20 +43,6 @@ pub struct SequencerCore<
indexer_client: IC,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TransactionMalformationError {
InvalidSignature,
FailedToDecode { tx: HashType },
}
impl Display for TransactionMalformationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:#?}")
}
}
impl std::error::Error for TransactionMalformationError {}
impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, IC> {
/// Starts the sequencer using the provided configuration.
/// If an existing database is found, the sequencer state is loaded from it and
@ -177,9 +162,19 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
}
pub async fn produce_new_block(&mut self) -> Result<u64> {
let (_tx, _msg_id) = self
let (tx, _msg_id) = self
.produce_new_block_with_mempool_transactions()
.context("Failed to produce new block with mempool transactions")?;
match self
.block_settlement_client
.submit_inscribe_tx_to_bedrock(tx)
.await
{
Ok(()) => {}
Err(err) => {
error!("Failed to post block data to Bedrock with error: {err:#}");
}
}
Ok(self.chain_height)
}
@ -194,13 +189,49 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
let mut valid_transactions = vec![];
let max_block_size = self.sequencer_config.max_block_size.as_u64() as usize;
let latest_block_meta = self
.store
.latest_block_meta()
.context("Failed to get latest block meta from store")?;
let curr_time = chrono::Utc::now().timestamp_millis() as u64;
while let Some(tx) = self.mempool.pop() {
let tx_hash = tx.hash();
// Check if block size exceeds limit
let temp_valid_transactions =
[valid_transactions.as_slice(), std::slice::from_ref(&tx)].concat();
let temp_hashable_data = HashableBlockData {
block_id: new_block_height,
transactions: temp_valid_transactions,
prev_block_hash: latest_block_meta.hash,
timestamp: curr_time,
};
let block_size = borsh::to_vec(&temp_hashable_data)
.context("Failed to serialize block for size check")?
.len();
if block_size > max_block_size {
// Block would exceed size limit, remove last transaction and push back
warn!(
"Transaction with hash {tx_hash} deferred to next block: \
block size {block_size} bytes would exceed limit of {max_block_size} bytes",
);
self.mempool.push_front(tx);
break;
}
match self.execute_check_transaction_on_state(tx) {
Ok(valid_tx) => {
info!("Validated transaction with hash {tx_hash}, including it in block",);
valid_transactions.push(valid_tx);
info!("Validated transaction with hash {tx_hash}, including it in block");
if valid_transactions.len() >= self.sequencer_config.max_num_tx_in_block {
break;
}
@ -214,13 +245,6 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
}
}
let latest_block_meta = self
.store
.latest_block_meta()
.context("Failed to get latest block meta from store")?;
let curr_time = chrono::Utc::now().timestamp_millis() as u64;
let hashable_data = HashableBlockData {
block_id: new_block_height,
transactions: valid_transactions,
@ -314,30 +338,6 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
}
}
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
pub fn transaction_pre_check(
tx: NSSATransaction,
) -> Result<NSSATransaction, TransactionMalformationError> {
// Stateless checks here
match tx {
NSSATransaction::Public(tx) => {
if tx.witness_set().is_valid_for(tx.message()) {
Ok(NSSATransaction::Public(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::PrivacyPreserving(tx) => {
if tx.witness_set().signatures_are_valid_for(tx.message()) {
Ok(NSSATransaction::PrivacyPreserving(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
}
}
/// Load signing key from file or generate a new one if it doesn't exist
fn load_or_create_signing_key(path: &Path) -> Result<Ed25519Key> {
if path.exists() {
@ -360,19 +360,21 @@ fn load_or_create_signing_key(path: &Path) -> Result<Ed25519Key> {
#[cfg(all(test, feature = "mock"))]
mod tests {
use std::{pin::pin, str::FromStr as _};
use std::{pin::pin, str::FromStr as _, time::Duration};
use base58::ToBase58;
use bedrock_client::BackoffConfig;
use common::{test_utils::sequencer_sign_key_for_testing, transaction::NSSATransaction};
use common::{
block::AccountInitialData, test_utils::sequencer_sign_key_for_testing,
transaction::NSSATransaction,
};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use mempool::MemPoolHandle;
use nssa::{AccountId, PrivateKey};
use crate::{
config::{AccountInitialData, BedrockConfig, SequencerConfig},
config::{BedrockConfig, SequencerConfig},
mock::SequencerCoreWithMockClients,
transaction_pre_check,
};
fn setup_sequencer_config_variable_initial_accounts(
@ -387,22 +389,23 @@ mod tests {
genesis_id: 1,
is_genesis_random: false,
max_num_tx_in_block: 10,
max_block_size: bytesize::ByteSize::mib(1),
mempool_max_size: 10000,
block_create_timeout_millis: 1000,
block_create_timeout: Duration::from_secs(1),
port: 8080,
initial_accounts,
initial_commitments: vec![],
signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_config: BedrockConfig {
backoff: BackoffConfig {
start_delay_millis: 100,
start_delay: Duration::from_millis(100),
max_retries: 5,
},
channel_id: ChannelId::from([0; 32]),
node_url: "http://not-used-in-unit-tests".parse().unwrap(),
auth: None,
},
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
retry_pending_blocks_timeout: Duration::from_secs(60 * 4),
indexer_rpc_url: "ws://localhost:8779".parse().unwrap(),
}
}
@ -526,7 +529,7 @@ mod tests {
#[test]
fn test_transaction_pre_check_pass() {
let tx = common::test_utils::produce_dummy_empty_transaction();
let result = transaction_pre_check(tx);
let result = tx.transaction_stateless_check();
assert!(result.is_ok());
}
@ -543,7 +546,7 @@ mod tests {
let tx = common::test_utils::create_transaction_native_token_transfer(
acc1, 0, acc2, 10, sign_key1,
);
let result = transaction_pre_check(tx);
let result = tx.transaction_stateless_check();
assert!(result.is_ok());
}
@ -562,7 +565,7 @@ mod tests {
);
// Signature is valid, stateless check pass
let tx = transaction_pre_check(tx).unwrap();
let tx = tx.transaction_stateless_check().unwrap();
// Signature is not from sender. Execution fails
let result = sequencer.execute_check_transaction_on_state(tx);
@ -586,7 +589,7 @@ mod tests {
acc1, 0, acc2, 10000000, sign_key1,
);
let result = transaction_pre_check(tx);
let result = tx.transaction_stateless_check();
// Passed pre-check
assert!(result.is_ok());

View File

@ -8,7 +8,8 @@ license = { workspace = true }
nssa.workspace = true
common.workspace = true
mempool.workspace = true
sequencer_core.workspace = true
sequencer_core = { workspace = true }
bedrock_client.workspace = true
anyhow.workspace = true
serde_json.workspace = true
@ -24,6 +25,7 @@ itertools.workspace = true
actix-web.workspace = true
tokio.workspace = true
borsh.workspace = true
bytesize.workspace = true
[dev-dependencies]
sequencer_core = { workspace = true, features = ["mock"] }

View File

@ -28,6 +28,7 @@ pub struct JsonHandler<
> {
sequencer_state: Arc<Mutex<SequencerCore<BC, IC>>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
max_block_size: usize,
}
fn respond<T: Serialize>(val: T) -> Result<Value, RpcErr> {

View File

@ -52,7 +52,7 @@ fn get_cors(cors_allowed_origins: &[String]) -> Cors {
.max_age(3600)
}
pub fn new_http_server(
pub async fn new_http_server(
config: RpcConfig,
seuquencer_core: Arc<Mutex<SequencerCore>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
@ -63,9 +63,16 @@ pub fn new_http_server(
limits_config,
} = config;
info!(target:NETWORK, "Starting HTTP server at {addr}");
let max_block_size = seuquencer_core
.lock()
.await
.sequencer_config()
.max_block_size
.as_u64() as usize;
let handler = web::Data::new(JsonHandler {
sequencer_state: seuquencer_core.clone(),
mempool_handle,
max_block_size,
});
// HTTP server
@ -73,7 +80,10 @@ pub fn new_http_server(
App::new()
.wrap(get_cors(&cors_allowed_origins))
.app_data(handler.clone())
.app_data(web::JsonConfig::default().limit(limits_config.json_payload_max_size))
.app_data(
web::JsonConfig::default()
.limit(limits_config.json_payload_max_size.as_u64() as usize),
)
.wrap(middleware::Logger::default())
.service(web::resource("/").route(web::post().to(rpc_handler::<JsonHandler>)))
})

View File

@ -3,7 +3,7 @@ use std::collections::HashMap;
use actix_web::Error as HttpError;
use base64::{Engine, engine::general_purpose};
use common::{
block::HashableBlockData,
block::{AccountInitialData, HashableBlockData},
rpc_primitives::{
errors::RpcError,
message::{Message, Request},
@ -20,14 +20,13 @@ use common::{
SendTxResponse,
},
},
transaction::NSSATransaction,
transaction::{NSSATransaction, TransactionMalformationError},
};
use itertools::Itertools as _;
use log::warn;
use nssa::{self, program::Program};
use sequencer_core::{
block_settlement_client::BlockSettlementClientTrait, config::AccountInitialData,
indexer_client::IndexerClientTrait,
block_settlement_client::BlockSettlementClientTrait, indexer_client::IndexerClientTrait,
};
use serde_json::Value;
@ -95,7 +94,25 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> JsonHandler<BC, IC>
let tx = borsh::from_slice::<NSSATransaction>(&send_tx_req.transaction).unwrap();
let tx_hash = tx.hash();
let authenticated_tx = sequencer_core::transaction_pre_check(tx)
// Check transaction size against block size limit
// Reserve ~200 bytes for block header overhead
const BLOCK_HEADER_OVERHEAD: usize = 200;
let tx_size = borsh::to_vec(&tx)
.map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx_hash })?
.len();
let max_tx_size = self.max_block_size.saturating_sub(BLOCK_HEADER_OVERHEAD);
if tx_size > max_tx_size {
return Err(TransactionMalformationError::TransactionTooLarge {
size: tx_size,
max: max_tx_size,
}
.into());
}
let authenticated_tx = tx
.transaction_stateless_check()
.inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
// TODO: Do we need a timeout here? It will be usable if we have too many transactions to
@ -323,16 +340,18 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> JsonHandler<BC, IC>
#[cfg(test)]
mod tests {
use std::{str::FromStr as _, sync::Arc};
use std::{str::FromStr as _, sync::Arc, time::Duration};
use base58::ToBase58;
use base64::{Engine, engine::general_purpose};
use bedrock_client::BackoffConfig;
use common::{
config::BasicAuth, test_utils::sequencer_sign_key_for_testing, transaction::NSSATransaction,
block::AccountInitialData, config::BasicAuth, test_utils::sequencer_sign_key_for_testing,
transaction::NSSATransaction,
};
use nssa::AccountId;
use sequencer_core::{
config::{AccountInitialData, BackoffConfig, BedrockConfig, SequencerConfig},
config::{BedrockConfig, SequencerConfig},
mock::{MockBlockSettlementClient, MockIndexerClient, SequencerCoreWithMockClients},
};
use serde_json::Value;
@ -375,16 +394,17 @@ mod tests {
genesis_id: 1,
is_genesis_random: false,
max_num_tx_in_block: 10,
max_block_size: bytesize::ByteSize::mib(1),
mempool_max_size: 1000,
block_create_timeout_millis: 1000,
block_create_timeout: Duration::from_secs(1),
port: 8080,
initial_accounts,
initial_commitments: vec![],
signing_key: *sequencer_sign_key_for_testing().value(),
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
retry_pending_blocks_timeout: Duration::from_secs(60 * 4),
bedrock_config: BedrockConfig {
backoff: BackoffConfig {
start_delay_millis: 100,
start_delay: Duration::from_millis(100),
max_retries: 5,
},
channel_id: [42; 32].into(),
@ -435,12 +455,14 @@ mod tests {
.produce_new_block_with_mempool_transactions()
.unwrap();
let max_block_size = sequencer_core.sequencer_config().max_block_size.as_u64() as usize;
let sequencer_core = Arc::new(Mutex::new(sequencer_core));
(
JsonHandlerWithMockClients {
sequencer_state: sequencer_core,
mempool_handle,
max_block_size,
},
initial_accounts,
tx,

View File

@ -1,6 +1,8 @@
use common::rpc_primitives::errors::{RpcError, RpcParseError};
use common::{
rpc_primitives::errors::{RpcError, RpcParseError},
transaction::TransactionMalformationError,
};
use log::debug;
use sequencer_core::TransactionMalformationError;
pub struct RpcErr(pub RpcError);
@ -42,10 +44,7 @@ impl RpcErrKind for RpcErrInternal {
impl RpcErrKind for TransactionMalformationError {
fn into_rpc_err(self) -> RpcError {
RpcError::new_internal_error(
Some(serde_json::to_value(self).unwrap()),
"transaction not accepted",
)
RpcError::invalid_params(Some(serde_json::to_value(self).unwrap()))
}
}

View File

@ -14,13 +14,24 @@ RUN apt-get update && apt-get install -y \
git \
&& rm -rf /var/lib/apt/lists/*
# Install r0vm (manual build as it's portable across different host platforms)
RUN git clone --depth 1 --branch release-3.0 https://github.com/risc0/risc0.git
RUN git clone --depth 1 --branch r0.1.91.1 https://github.com/risc0/rust.git
WORKDIR /risc0
RUN cargo install --path rzup
RUN rzup build --path /rust rust --verbose
RUN cargo install --path risc0/cargo-risczero
# Install r0vm
# Use quick install for x86-64 (risczero provides binaries only for this linux platform)
# Manual build for other platforms (including arm64 Linux)
RUN ARCH=$(uname -m); \
if [ "$ARCH" = "x86_64" ]; then \
echo "Using quick install for $ARCH"; \
curl -L https://risczero.com/install | bash; \
export PATH="/root/.cargo/bin:/root/.risc0/bin:${PATH}"; \
rzup install; \
else \
echo "Using manual build for $ARCH"; \
git clone --depth 1 --branch release-3.0 https://github.com/risc0/risc0.git; \
git clone --depth 1 --branch r0.1.91.1 https://github.com/risc0/rust.git; \
cd /risc0; \
cargo install --path rzup; \
rzup build --path /rust rust --verbose; \
cargo install --path risc0/cargo-risczero; \
fi
ENV PATH="/root/.cargo/bin:/root/.risc0/bin:${PATH}"
RUN cp "$(which r0vm)" /usr/local/bin/r0vm
RUN test -x /usr/local/bin/r0vm
@ -31,6 +42,9 @@ RUN curl -sSL https://raw.githubusercontent.com/logos-blockchain/logos-blockchai
WORKDIR /sequencer_runner
# Build argument to enable standalone feature (defaults to false)
ARG STANDALONE=false
# Planner stage - generates dependency recipe
FROM chef AS planner
COPY . .
@ -38,15 +52,24 @@ RUN cargo chef prepare --bin sequencer_runner --recipe-path recipe.json
# Builder stage - builds dependencies and application
FROM chef AS builder
ARG STANDALONE
COPY --from=planner /sequencer_runner/recipe.json recipe.json
# Build dependencies only (this layer will be cached)
RUN cargo chef cook --bin sequencer_runner --release --recipe-path recipe.json
RUN if [ "$STANDALONE" = "true" ]; then \
cargo chef cook --bin sequencer_runner --features standalone --release --recipe-path recipe.json; \
else \
cargo chef cook --bin sequencer_runner --release --recipe-path recipe.json; \
fi
# Copy source code
COPY . .
# Build the actual application
RUN cargo build --release --bin sequencer_runner
RUN if [ "$STANDALONE" = "true" ]; then \
cargo build --release --features standalone --bin sequencer_runner; \
else \
cargo build --release --bin sequencer_runner; \
fi
# Strip debug symbols to reduce binary size
RUN strip /sequencer_runner/target/release/sequencer_runner

View File

@ -1,16 +1,17 @@
{
"home": ".",
"home": "./sequencer_runner",
"override_rust_log": null,
"genesis_id": 1,
"is_genesis_random": true,
"max_num_tx_in_block": 20,
"max_block_size": "1 MiB",
"mempool_max_size": 1000,
"block_create_timeout_millis": 12000,
"retry_pending_blocks_timeout_millis": 6000,
"block_create_timeout": "15s",
"retry_pending_blocks_timeout": "5s",
"port": 3040,
"bedrock_config": {
"backoff": {
"start_delay_millis": 100,
"start_delay": "100ms",
"max_retries": 5
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",

View File

@ -4,13 +4,14 @@
"genesis_id": 1,
"is_genesis_random": true,
"max_num_tx_in_block": 20,
"max_block_size": "1 MiB",
"mempool_max_size": 10000,
"block_create_timeout_millis": 10000,
"block_create_timeout": "10s",
"port": 3040,
"retry_pending_blocks_timeout_millis": 7000,
"retry_pending_blocks_timeout": "7s",
"bedrock_config": {
"backoff": {
"start_delay_millis": 100,
"start_delay": "100ms",
"max_retries": 5
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",

View File

@ -99,9 +99,8 @@ impl Drop for SequencerHandle {
}
pub async fn startup_sequencer(app_config: SequencerConfig) -> Result<SequencerHandle> {
let block_timeout = Duration::from_millis(app_config.block_create_timeout_millis);
let retry_pending_blocks_timeout =
Duration::from_millis(app_config.retry_pending_blocks_timeout_millis);
let block_timeout = app_config.block_create_timeout;
let retry_pending_blocks_timeout = app_config.retry_pending_blocks_timeout;
let port = app_config.port;
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config).await;
@ -114,11 +113,20 @@ pub async fn startup_sequencer(app_config: SequencerConfig) -> Result<SequencerH
RpcConfig::with_port(port),
Arc::clone(&seq_core_wrapped),
mempool_handle,
)?;
)
.await?;
info!("HTTP server started");
let http_server_handle = http_server.handle();
tokio::spawn(http_server);
#[cfg(not(feature = "standalone"))]
{
info!("Submitting stored pending blocks");
retry_pending_blocks(&seq_core_wrapped)
.await
.expect("Failed to submit pending blocks on startup");
}
info!("Starting main sequencer loop");
let main_loop_handle = tokio::spawn(main_loop(Arc::clone(&seq_core_wrapped), block_timeout));
@ -159,6 +167,50 @@ async fn main_loop(seq_core: Arc<Mutex<SequencerCore>>, block_timeout: Duration)
}
}
#[cfg(not(feature = "standalone"))]
async fn retry_pending_blocks(seq_core: &Arc<Mutex<SequencerCore>>) -> Result<()> {
use std::time::Instant;
use log::debug;
let (pending_blocks, block_settlement_client) = {
let sequencer_core = seq_core.lock().await;
let client = sequencer_core.block_settlement_client();
let pending_blocks = sequencer_core
.get_pending_blocks()
.expect("Sequencer should be able to retrieve pending blocks");
(pending_blocks, client)
};
for block in pending_blocks.iter() {
info!(
"Resubmitting pending block with id {}",
block.header.block_id
);
// TODO: We could cache the inscribe tx for each pending block to avoid re-creating it
// on every retry.
let now = Instant::now();
let (tx, _msg_id) = block_settlement_client
.create_inscribe_tx(block)
.context("Failed to create inscribe tx for pending block")?;
debug!(">>>> Create inscribe: {:?}", now.elapsed());
let now = Instant::now();
if let Err(e) = block_settlement_client
.submit_inscribe_tx_to_bedrock(tx)
.await
{
warn!(
"Failed to resubmit block with id {} with error {e:#}",
block.header.block_id
);
}
debug!(">>>> Post: {:?}", now.elapsed());
}
Ok(())
}
#[cfg(not(feature = "standalone"))]
async fn retry_pending_blocks_loop(
seq_core: Arc<Mutex<SequencerCore>>,
@ -166,40 +218,7 @@ async fn retry_pending_blocks_loop(
) -> Result<Never> {
loop {
tokio::time::sleep(retry_pending_blocks_timeout).await;
let (pending_blocks, block_settlement_client) = {
let sequencer_core = seq_core.lock().await;
let client = sequencer_core.block_settlement_client();
let pending_blocks = sequencer_core
.get_pending_blocks()
.expect("Sequencer should be able to retrieve pending blocks");
(pending_blocks, client)
};
if let Some(block) = pending_blocks
.iter()
.min_by_key(|block| block.header.block_id)
{
info!(
"Resubmitting pending block with id {}",
block.header.block_id
);
// TODO: We could cache the inscribe tx for each pending block to avoid re-creating
// it on every retry.
let (tx, _msg_id) = block_settlement_client
.create_inscribe_tx(block)
.context("Failed to create inscribe tx for pending block")?;
if let Err(e) = block_settlement_client
.submit_inscribe_tx_to_bedrock(tx)
.await
{
warn!(
"Failed to resubmit block with id {} with error {e:#}",
block.header.block_id
);
}
}
retry_pending_blocks(&seq_core).await?;
}
}

View File

@ -6,8 +6,9 @@ license = { workspace = true }
[dependencies]
common.workspace = true
nssa.workspace = true
thiserror.workspace = true
borsh.workspace = true
rocksdb.workspace = true
nssa.workspace = true
tempfile.workspace = true

1306
storage/src/indexer.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,601 +1,3 @@
use std::{path::Path, sync::Arc};
use common::block::{BedrockStatus, Block, BlockMeta, MantleMsgId};
use error::DbError;
use nssa::V02State;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
};
pub mod error;
/// Maximal size of stored blocks in base
///
/// Used to control db size
///
/// Currently effectively unbounded.
pub const BUFF_SIZE_ROCKSDB: usize = usize::MAX;
/// Size of stored blocks cache in memory
///
/// Keeping small to not run out of memory
pub const CACHE_SIZE: usize = 1000;
/// Key base for storing metainformation about id of first block in db
pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
/// Key base for storing metainformation about id of last current block in db
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
/// Key base for storing metainformation which describe if first block has been set
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
/// Key base for storing metainformation about the last finalized block on Bedrock
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
/// Key base for storing metainformation about the latest block meta
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
/// Key base for storing the NSSA state
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
/// Name of block column family
pub const CF_BLOCK_NAME: &str = "cf_block";
/// Name of meta column family
pub const CF_META_NAME: &str = "cf_meta";
/// Name of state column family
pub const CF_NSSA_STATE_NAME: &str = "cf_nssa_state";
pub type DbResult<T> = Result<T, DbError>;
pub struct RocksDBIO {
pub db: DBWithThreadMode<MultiThreaded>,
}
impl RocksDBIO {
pub fn open_or_create(
path: &Path,
start_block: Option<(&Block, MantleMsgId)>,
) -> DbResult<Self> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_opts,
path,
vec![cfb, cfmeta, cfstate],
);
let dbio = Self {
// There is no point in handling this from runner code
db: db.unwrap(),
};
let is_start_set = dbio.get_meta_is_first_block_set()?;
if is_start_set {
Ok(dbio)
} else if let Some((block, msg_id)) = start_block {
let block_id = block.header.block_id;
dbio.put_meta_first_block_in_db(block, msg_id)?;
dbio.put_meta_is_first_block_set()?;
dbio.put_meta_last_block_in_db(block_id)?;
dbio.put_meta_last_finalized_block_id(None)?;
dbio.put_meta_latest_block_meta(&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
})?;
Ok(dbio)
} else {
// Here we are trying to start a DB without a block, one should not do it.
unreachable!()
}
}
pub fn destroy(path: &Path) -> DbResult<()> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let _cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
DBWithThreadMode::<MultiThreaded>::destroy(&db_opts, path)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
pub fn meta_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_META_NAME).unwrap()
}
pub fn block_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_BLOCK_NAME).unwrap()
}
pub fn nssa_state_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_NSSA_STATE_NAME).unwrap()
}
pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize first block".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"First block not found".to_string(),
))
}
}
pub fn get_meta_last_block_in_db(&self) -> DbResult<u64> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize last block".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Last block not found".to_string(),
))
}
}
pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(res.is_some())
}
pub fn put_nssa_state_in_db(&self, state: &V02State, batch: &mut WriteBatch) -> DbResult<()> {
let cf_nssa_state = self.nssa_state_column();
batch.put_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_NSSA_STATE_KEY".to_string()),
)
})?,
borsh::to_vec(state).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize NSSA state".to_string()))
})?,
);
Ok(())
}
pub fn put_meta_first_block_in_db(&self, block: &Block, msg_id: MantleMsgId) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize first block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
let mut batch = WriteBatch::default();
self.put_block(block, msg_id, true, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to write first block in db".to_string()),
)
})?;
Ok(())
}
pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_last_block_in_db_batch(
&self,
block_id: u64,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
);
Ok(())
}
pub fn put_meta_last_finalized_block_id(&self, block_id: Option<u64>) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn put_meta_is_first_block_set(&self) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
)
})?,
[1u8; 1],
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_latest_block_meta(&self, block_meta: &BlockMeta) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_latest_block_meta_batch(
&self,
block_meta: &BlockMeta,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
);
Ok(())
}
pub fn latest_block_meta(&self) -> DbResult<BlockMeta> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<BlockMeta>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize latest block meta".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Latest block meta not found".to_string(),
))
}
}
pub fn put_block(
&self,
block: &Block,
msg_id: MantleMsgId,
first: bool,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_block = self.block_column();
if !first {
let last_curr_block = self.get_meta_last_block_in_db()?;
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db_batch(block.header.block_id, batch)?;
self.put_meta_latest_block_meta_batch(
&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
},
batch,
)?;
}
}
batch.put_cf(
&cf_block,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?,
borsh::to_vec(block).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_string()))
})?,
);
Ok(())
}
pub fn get_block(&self, block_id: u64) -> DbResult<Block> {
let cf_block = self.block_column();
let res = self
.db
.get_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<Block>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block data".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
))
}
}
pub fn get_nssa_state(&self) -> DbResult<V02State> {
let cf_nssa_state = self.nssa_state_column();
let res = self
.db
.get_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<V02State>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block data".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
))
}
}
pub fn delete_block(&self, block_id: u64) -> DbResult<()> {
let cf_block = self.block_column();
let key = borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?;
if self
.db
.get_cf(&cf_block, &key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?
.is_none()
{
return Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
));
}
self.db
.delete_cf(&cf_block, key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
let mut block = self.get_block(block_id)?;
block.bedrock_status = BedrockStatus::Finalized;
let cf_block = self.block_column();
self.db
.put_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
borsh::to_vec(&block).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block data".to_string()),
)
})?,
)
.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some(format!("Failed to mark block {block_id} as finalized")),
)
})?;
Ok(())
}
pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
let cf_block = self.block_column();
self.db
.iterator_cf(&cf_block, rocksdb::IteratorMode::Start)
.map(|res| {
let (_key, value) = res.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to get key value pair".to_string()),
)
})?;
borsh::from_slice::<Block>(&value).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize block data".to_string()),
)
})
})
}
pub fn atomic_update(
&self,
block: &Block,
msg_id: MantleMsgId,
state: &V02State,
) -> DbResult<()> {
let block_id = block.header.block_id;
let mut batch = WriteBatch::default();
self.put_block(block, msg_id, false, &mut batch)?;
self.put_nssa_state_in_db(state, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some(format!("Failed to udpate db with block {block_id}")),
)
})
}
}
pub mod indexer;
pub mod sequencer;

600
storage/src/sequencer.rs Normal file
View File

@ -0,0 +1,600 @@
use std::{path::Path, sync::Arc};
use common::block::{BedrockStatus, Block, BlockMeta, MantleMsgId};
use nssa::V02State;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
};
use crate::error::DbError;
/// Maximal size of stored blocks in base
///
/// Used to control db size
///
/// Currently effectively unbounded.
pub const BUFF_SIZE_ROCKSDB: usize = usize::MAX;
/// Size of stored blocks cache in memory
///
/// Keeping small to not run out of memory
pub const CACHE_SIZE: usize = 1000;
/// Key base for storing metainformation about id of first block in db
pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
/// Key base for storing metainformation about id of last current block in db
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
/// Key base for storing metainformation which describe if first block has been set
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
/// Key base for storing metainformation about the last finalized block on Bedrock
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
/// Key base for storing metainformation about the latest block meta
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
/// Key base for storing the NSSA state
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
/// Name of block column family
pub const CF_BLOCK_NAME: &str = "cf_block";
/// Name of meta column family
pub const CF_META_NAME: &str = "cf_meta";
/// Name of state column family
pub const CF_NSSA_STATE_NAME: &str = "cf_nssa_state";
pub type DbResult<T> = Result<T, DbError>;
pub struct RocksDBIO {
pub db: DBWithThreadMode<MultiThreaded>,
}
impl RocksDBIO {
pub fn open_or_create(
path: &Path,
start_block: Option<(&Block, MantleMsgId)>,
) -> DbResult<Self> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_opts,
path,
vec![cfb, cfmeta, cfstate],
);
let dbio = Self {
// There is no point in handling this from runner code
db: db.unwrap(),
};
let is_start_set = dbio.get_meta_is_first_block_set()?;
if is_start_set {
Ok(dbio)
} else if let Some((block, msg_id)) = start_block {
let block_id = block.header.block_id;
dbio.put_meta_first_block_in_db(block, msg_id)?;
dbio.put_meta_is_first_block_set()?;
dbio.put_meta_last_block_in_db(block_id)?;
dbio.put_meta_last_finalized_block_id(None)?;
dbio.put_meta_latest_block_meta(&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
})?;
Ok(dbio)
} else {
// Here we are trying to start a DB without a block, one should not do it.
unreachable!()
}
}
pub fn destroy(path: &Path) -> DbResult<()> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let _cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
DBWithThreadMode::<MultiThreaded>::destroy(&db_opts, path)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
pub fn meta_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_META_NAME).unwrap()
}
pub fn block_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_BLOCK_NAME).unwrap()
}
pub fn nssa_state_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_NSSA_STATE_NAME).unwrap()
}
pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize first block".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"First block not found".to_string(),
))
}
}
pub fn get_meta_last_block_in_db(&self) -> DbResult<u64> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize last block".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Last block not found".to_string(),
))
}
}
pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(res.is_some())
}
pub fn put_nssa_state_in_db(&self, state: &V02State, batch: &mut WriteBatch) -> DbResult<()> {
let cf_nssa_state = self.nssa_state_column();
batch.put_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_NSSA_STATE_KEY".to_string()),
)
})?,
borsh::to_vec(state).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize NSSA state".to_string()))
})?,
);
Ok(())
}
pub fn put_meta_first_block_in_db(&self, block: &Block, msg_id: MantleMsgId) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize first block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
let mut batch = WriteBatch::default();
self.put_block(block, msg_id, true, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to write first block in db".to_string()),
)
})?;
Ok(())
}
pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_last_block_in_db_batch(
&self,
block_id: u64,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
);
Ok(())
}
pub fn put_meta_last_finalized_block_id(&self, block_id: Option<u64>) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn put_meta_is_first_block_set(&self) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
)
})?,
[1u8; 1],
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_latest_block_meta(&self, block_meta: &BlockMeta) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_latest_block_meta_batch(
&self,
block_meta: &BlockMeta,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
);
Ok(())
}
pub fn latest_block_meta(&self) -> DbResult<BlockMeta> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<BlockMeta>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize latest block meta".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Latest block meta not found".to_string(),
))
}
}
pub fn put_block(
&self,
block: &Block,
msg_id: MantleMsgId,
first: bool,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_block = self.block_column();
if !first {
let last_curr_block = self.get_meta_last_block_in_db()?;
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db_batch(block.header.block_id, batch)?;
self.put_meta_latest_block_meta_batch(
&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
},
batch,
)?;
}
}
batch.put_cf(
&cf_block,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?,
borsh::to_vec(block).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_string()))
})?,
);
Ok(())
}
pub fn get_block(&self, block_id: u64) -> DbResult<Block> {
let cf_block = self.block_column();
let res = self
.db
.get_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<Block>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block data".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
))
}
}
pub fn get_nssa_state(&self) -> DbResult<V02State> {
let cf_nssa_state = self.nssa_state_column();
let res = self
.db
.get_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<V02State>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block data".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
))
}
}
pub fn delete_block(&self, block_id: u64) -> DbResult<()> {
let cf_block = self.block_column();
let key = borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?;
if self
.db
.get_cf(&cf_block, &key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?
.is_none()
{
return Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
));
}
self.db
.delete_cf(&cf_block, key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
let mut block = self.get_block(block_id)?;
block.bedrock_status = BedrockStatus::Finalized;
let cf_block = self.block_column();
self.db
.put_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
borsh::to_vec(&block).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block data".to_string()),
)
})?,
)
.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some(format!("Failed to mark block {block_id} as finalized")),
)
})?;
Ok(())
}
pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
let cf_block = self.block_column();
self.db
.iterator_cf(&cf_block, rocksdb::IteratorMode::Start)
.map(|res| {
let (_key, value) = res.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to get key value pair".to_string()),
)
})?;
borsh::from_slice::<Block>(&value).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize block data".to_string()),
)
})
})
}
pub fn atomic_update(
&self,
block: &Block,
msg_id: MantleMsgId,
state: &V02State,
) -> DbResult<()> {
let block_id = block.header.block_id;
let mut batch = WriteBatch::default();
self.put_block(block, msg_id, false, &mut batch)?;
self.put_nssa_state_in_db(state, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some(format!("Failed to udpate db with block {block_id}")),
)
})
}
}

View File

@ -17,6 +17,8 @@ serde_json.workspace = true
env_logger.workspace = true
log.workspace = true
serde.workspace = true
humantime-serde.workspace = true
humantime.workspace = true
tokio = { workspace = true, features = ["macros"] }
clap.workspace = true
base64.workspace = true

View File

@ -1,7 +1,7 @@
{
"override_rust_log": null,
"sequencer_addr": "http://127.0.0.1:3040",
"seq_poll_timeout_millis": 30000,
"seq_poll_timeout": "30s",
"seq_tx_poll_max_blocks": 15,
"seq_poll_max_retries": 10,
"seq_block_poll_max_amount": 100,

View File

@ -262,7 +262,7 @@ mod tests {
WalletConfig {
override_rust_log: None,
sequencer_addr: "http://127.0.0.1".parse().unwrap(),
seq_poll_timeout_millis: 12000,
seq_poll_timeout: std::time::Duration::from_secs(12),
seq_tx_poll_max_blocks: 5,
seq_poll_max_retries: 10,
seq_block_poll_max_amount: 100,

View File

@ -49,11 +49,8 @@ impl WalletSubcommand for ConfigSubcommand {
"sequencer_addr" => {
println!("{}", wallet_core.storage.wallet_config.sequencer_addr);
}
"seq_poll_timeout_millis" => {
println!(
"{}",
wallet_core.storage.wallet_config.seq_poll_timeout_millis
);
"seq_poll_timeout" => {
println!("{:?}", wallet_core.storage.wallet_config.seq_poll_timeout);
}
"seq_tx_poll_max_blocks" => {
println!(
@ -97,9 +94,10 @@ impl WalletSubcommand for ConfigSubcommand {
"sequencer_addr" => {
wallet_core.storage.wallet_config.sequencer_addr = value.parse()?;
}
"seq_poll_timeout_millis" => {
wallet_core.storage.wallet_config.seq_poll_timeout_millis =
value.parse()?;
"seq_poll_timeout" => {
wallet_core.storage.wallet_config.seq_poll_timeout =
humantime::parse_duration(&value)
.map_err(|e| anyhow::anyhow!("Invalid duration: {}", e))?;
}
"seq_tx_poll_max_blocks" => {
wallet_core.storage.wallet_config.seq_tx_poll_max_blocks = value.parse()?;
@ -131,9 +129,9 @@ impl WalletSubcommand for ConfigSubcommand {
"sequencer_addr" => {
println!("HTTP V4 account_id of sequencer");
}
"seq_poll_timeout_millis" => {
"seq_poll_timeout" => {
println!(
"Sequencer client retry variable: how much time to wait between retries in milliseconds(can be zero)"
"Sequencer client retry variable: how much time to wait between retries (human readable duration)"
);
}
"seq_tx_poll_max_blocks" => {

View File

@ -173,7 +173,7 @@ pub async fn execute_subcommand(
.sequencer_client
.send_tx_program(transaction)
.await
.context("Transaction submission error");
.context("Transaction submission error")?;
SubcommandReturnValue::Empty
}
@ -191,10 +191,7 @@ pub async fn execute_continuous_run(wallet_core: &mut WalletCore) -> Result<()>
.last_block;
wallet_core.sync_to_block(latest_block_num).await?;
tokio::time::sleep(std::time::Duration::from_millis(
wallet_core.config().seq_poll_timeout_millis,
))
.await;
tokio::time::sleep(wallet_core.config().seq_poll_timeout).await;
}
}

View File

@ -2,10 +2,12 @@ use std::{
collections::HashMap,
io::{BufReader, Write as _},
path::Path,
time::Duration,
};
use anyhow::{Context as _, Result};
use common::config::BasicAuth;
use humantime_serde;
use key_protocol::key_management::{
KeyChain,
key_tree::{
@ -184,8 +186,9 @@ pub struct WalletConfig {
pub override_rust_log: Option<String>,
/// Sequencer URL
pub sequencer_addr: Url,
/// Sequencer polling duration for new blocks in milliseconds
pub seq_poll_timeout_millis: u64,
/// Sequencer polling duration for new blocks
#[serde(with = "humantime_serde")]
pub seq_poll_timeout: Duration,
/// Sequencer polling max number of blocks to find transaction
pub seq_tx_poll_max_blocks: usize,
/// Sequencer polling max number error retries
@ -204,7 +207,7 @@ impl Default for WalletConfig {
Self {
override_rust_log: None,
sequencer_addr: "http://127.0.0.1:3040".parse().unwrap(),
seq_poll_timeout_millis: 12000,
seq_poll_timeout: Duration::from_secs(12),
seq_tx_poll_max_blocks: 5,
seq_poll_max_retries: 5,
seq_block_poll_max_amount: 100,
@ -539,7 +542,7 @@ impl WalletConfig {
let WalletConfig {
override_rust_log,
sequencer_addr,
seq_poll_timeout_millis,
seq_poll_timeout,
seq_tx_poll_max_blocks,
seq_poll_max_retries,
seq_block_poll_max_amount,
@ -550,7 +553,7 @@ impl WalletConfig {
let WalletConfigOverrides {
override_rust_log: o_override_rust_log,
sequencer_addr: o_sequencer_addr,
seq_poll_timeout_millis: o_seq_poll_timeout_millis,
seq_poll_timeout: o_seq_poll_timeout,
seq_tx_poll_max_blocks: o_seq_tx_poll_max_blocks,
seq_poll_max_retries: o_seq_poll_max_retries,
seq_block_poll_max_amount: o_seq_block_poll_max_amount,
@ -566,9 +569,9 @@ impl WalletConfig {
warn!("Overriding wallet config 'sequencer_addr' to {v}");
*sequencer_addr = v;
}
if let Some(v) = o_seq_poll_timeout_millis {
warn!("Overriding wallet config 'seq_poll_timeout_millis' to {v}");
*seq_poll_timeout_millis = v;
if let Some(v) = o_seq_poll_timeout {
warn!("Overriding wallet config 'seq_poll_timeout' to {v:?}");
*seq_poll_timeout = v;
}
if let Some(v) = o_seq_tx_poll_max_blocks {
warn!("Overriding wallet config 'seq_tx_poll_max_blocks' to {v}");

View File

@ -156,6 +156,8 @@ impl WalletCore {
let mut storage_file = tokio::fs::File::create(&self.storage_path).await?;
storage_file.write_all(&storage).await?;
// Ensure data is flushed to disk before returning to prevent race conditions
storage_file.sync_all().await?;
println!("Stored persistent accounts at {:#?}", self.storage_path);
@ -168,6 +170,8 @@ impl WalletCore {
let mut config_file = tokio::fs::File::create(&self.config_path).await?;
config_file.write_all(&config).await?;
// Ensure data is flushed to disk before returning to prevent race conditions
config_file.sync_all().await?;
info!("Stored data at {:#?}", self.config_path);

View File

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use anyhow::Result;
use common::{HashType, block::HashableBlockData, sequencer_client::SequencerClient};
@ -11,8 +11,7 @@ use crate::config::WalletConfig;
pub struct TxPoller {
polling_max_blocks_to_query: usize,
polling_max_error_attempts: u64,
// TODO: This should be Duration
polling_delay_millis: u64,
polling_delay: Duration,
block_poll_max_amount: u64,
client: Arc<SequencerClient>,
}
@ -20,7 +19,7 @@ pub struct TxPoller {
impl TxPoller {
pub fn new(config: WalletConfig, client: Arc<SequencerClient>) -> Self {
Self {
polling_delay_millis: config.seq_poll_timeout_millis,
polling_delay: config.seq_poll_timeout,
polling_max_blocks_to_query: config.seq_tx_poll_max_blocks,
polling_max_error_attempts: config.seq_poll_max_retries,
block_poll_max_amount: config.seq_block_poll_max_amount,
@ -62,7 +61,7 @@ impl TxPoller {
return Ok(tx);
}
tokio::time::sleep(std::time::Duration::from_millis(self.polling_delay_millis)).await;
tokio::time::sleep(self.polling_delay).await;
}
anyhow::bail!("Transaction not found in preconfigured amount of blocks");