Merge 1158e55140df6d474613c9351ecb996c807ddd5f into 27f31cf3d045506e3f0e887628057e15c7588463

This commit is contained in:
Pravdyvy 2026-02-18 11:42:08 +02:00 committed by GitHub
commit 365c432150
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
54 changed files with 3695 additions and 1449 deletions

1444
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -113,6 +113,7 @@ logos-blockchain-common-http-client = { git = "https://github.com/logos-blockcha
logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-chain-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
rocksdb = { version = "0.24.0", default-features = false, features = [
"snappy",

View File

@ -35,8 +35,12 @@ To our knowledge, this design is unique to LEZ. Other privacy-focused programmab
3. Transferring private to public (local / privacy-preserving execution)
- Bob executes the token program `Transfer` function locally, sending to Charlies public account.
- A ZKP of correct execution is generated.
- Bobs private balance stays hidden.
- Charlies public account is updated on-chain.
- Bobs private account and balance still remain hidden.
- Charlie's public account is modified with the new tokens added.
4. Transferring public to public (public execution):
- Alice submits a transaction to execute the token program `Transfer` function on-chain, specifying Charlie's public account as recipient.
- The execution is handled on-chain without ZKPs involved.
- Alice's and Charlie's accounts are modified according to the transaction.
4. Transfer from public to public (public execution)
- Alice submits an on-chain transaction to run `Transfer`, sending to Charlies public account.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -16,3 +16,4 @@ serde.workspace = true
logos-blockchain-common-http-client.workspace = true
logos-blockchain-core.workspace = true
logos-blockchain-chain-broadcast-service.workspace = true
logos-blockchain-chain-service.workspace = true

View File

@ -5,6 +5,7 @@ use common::config::BasicAuth;
use futures::{Stream, TryFutureExt};
use log::{info, warn};
pub use logos_blockchain_chain_broadcast_service::BlockInfo;
use logos_blockchain_chain_service::CryptarchiaInfo;
pub use logos_blockchain_common_http_client::{CommonHttpClient, Error};
pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx};
use reqwest::{Client, Url};
@ -82,6 +83,15 @@ impl BedrockClient {
.await
}
pub async fn get_consensus_info(&self) -> Result<CryptarchiaInfo, Error> {
Retry::spawn(self.backoff_strategy(), || {
self.http_client
.consensus_info(self.node_url.clone())
.inspect_err(|err| warn!("Block fetching failed with error: {err:#}"))
})
.await
}
fn backoff_strategy(&self) -> impl Iterator<Item = Duration> {
tokio_retry::strategy::FibonacciBackoff::from_millis(self.backoff.start_delay_millis)
.take(self.backoff.max_retries)

View File

@ -1,4 +1,6 @@
use borsh::{BorshDeserialize, BorshSerialize};
use nssa::AccountId;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256, digest::FixedOutput};
use crate::{HashType, transaction::NSSATransaction};
@ -50,7 +52,7 @@ pub enum BedrockStatus {
Finalized,
}
#[derive(Debug, BorshSerialize, BorshDeserialize)]
#[derive(Debug, BorshSerialize, BorshDeserialize, Clone)]
pub struct Block {
pub header: BlockHeader,
pub body: BlockBody,
@ -107,6 +109,20 @@ impl From<Block> for HashableBlockData {
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct for account serialization
pub struct AccountInitialData {
pub account_id: AccountId,
pub balance: u128,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct to initialize commitments
pub struct CommitmentsInitialData {
pub npk: nssa_core::NullifierPublicKey,
pub account: nssa_core::account::Account,
}
#[cfg(test)]
mod tests {
use crate::{HashType, block::HashableBlockData, test_utils};

View File

@ -1,4 +1,8 @@
use std::fmt::Display;
use borsh::{BorshDeserialize, BorshSerialize};
use log::warn;
use nssa::{AccountId, V02State};
use serde::{Deserialize, Serialize};
use crate::HashType;
@ -32,6 +36,16 @@ impl From<nssa::PrivacyPreservingTransaction> for NSSATransaction {
}
}
impl NSSATransaction {
pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
match self {
NSSATransaction::ProgramDeployment(tx) => tx.affected_public_account_ids(),
NSSATransaction::Public(tx) => tx.affected_public_account_ids(),
NSSATransaction::PrivacyPreserving(tx) => tx.affected_public_account_ids(),
}
}
}
impl From<nssa::ProgramDeploymentTransaction> for NSSATransaction {
fn from(value: nssa::ProgramDeploymentTransaction) -> Self {
Self::ProgramDeployment(value)
@ -46,3 +60,59 @@ pub enum TxKind {
PrivacyPreserving,
ProgramDeployment,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TransactionMalformationError {
InvalidSignature,
FailedToDecode { tx: HashType },
}
impl Display for TransactionMalformationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:#?}")
}
}
impl std::error::Error for TransactionMalformationError {}
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
pub fn transaction_pre_check(
tx: NSSATransaction,
) -> Result<NSSATransaction, TransactionMalformationError> {
// Stateless checks here
match tx {
NSSATransaction::Public(tx) => {
if tx.witness_set().is_valid_for(tx.message()) {
Ok(NSSATransaction::Public(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::PrivacyPreserving(tx) => {
if tx.witness_set().signatures_are_valid_for(tx.message()) {
Ok(NSSATransaction::PrivacyPreserving(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
}
}
pub fn execute_check_transaction_on_state(
state: &mut V02State,
tx: NSSATransaction,
) -> Result<NSSATransaction, nssa::error::NssaError> {
match &tx {
NSSATransaction::Public(tx) => state.transition_from_public_transaction(tx),
NSSATransaction::PrivacyPreserving(tx) => {
state.transition_from_privacy_preserving_transaction(tx)
}
NSSATransaction::ProgramDeployment(tx) => {
state.transition_from_program_deployment_transaction(tx)
}
}
.inspect_err(|err| warn!("Error at transition {err:#?}"))?;
Ok(tx)
}

View File

@ -1,4 +1,4 @@
use std::str::FromStr as _;
use std::str::FromStr;
use indexer_service_protocol::{Account, AccountId, Block, BlockId, HashType, Transaction};
use leptos::prelude::*;

View File

@ -7,6 +7,9 @@ license = { workspace = true }
[dependencies]
common.workspace = true
bedrock_client.workspace = true
nssa.workspace = true
nssa_core.workspace = true
storage.workspace = true
anyhow.workspace = true
log.workspace = true

View File

@ -0,0 +1,118 @@
use std::{path::Path, sync::Arc};
use anyhow::Result;
use bedrock_client::HeaderId;
use common::{
block::Block,
transaction::{NSSATransaction, execute_check_transaction_on_state, transaction_pre_check},
};
use nssa::{Account, AccountId, V02State};
use storage::indexer::RocksDBIO;
#[derive(Clone)]
pub struct IndexerStore {
dbio: Arc<RocksDBIO>,
}
impl IndexerStore {
/// Starting database at the start of new chain.
/// Creates files if necessary.
///
/// ATTENTION: Will overwrite genesis block.
pub fn open_db_with_genesis(
location: &Path,
start_data: Option<(Block, V02State)>,
) -> Result<Self> {
let dbio = RocksDBIO::open_or_create(location, start_data)?;
Ok(Self {
dbio: Arc::new(dbio),
})
}
/// Reopening existing database
pub fn open_db_restart(location: &Path) -> Result<Self> {
Self::open_db_with_genesis(location, None)
}
pub fn last_observed_l1_header(&self) -> Result<Option<HeaderId>> {
Ok(self
.dbio
.get_meta_last_observed_l1_block_in_db()?
.map(|raw_id| HeaderId::from(raw_id)))
}
pub fn get_last_block_id(&self) -> Result<u64> {
Ok(self.dbio.get_meta_last_block_in_db()?)
}
pub fn get_block_at_id(&self, id: u64) -> Result<Block> {
Ok(self.dbio.get_block(id)?)
}
pub fn get_block_batch(&self, offset: u64, limit: u64) -> Result<Vec<Block>> {
Ok(self.dbio.get_block_batch(offset, limit)?)
}
pub fn get_transaction_by_hash(&self, tx_hash: [u8; 32]) -> Result<NSSATransaction> {
let block = self.get_block_at_id(self.dbio.get_block_id_by_tx_hash(tx_hash)?)?;
let transaction = block
.body
.transactions
.iter()
.find(|enc_tx| enc_tx.hash().0 == tx_hash)
.ok_or_else(|| anyhow::anyhow!("Transaction not found in DB"))?;
Ok(transaction.clone())
}
pub fn get_block_by_hash(&self, hash: [u8; 32]) -> Result<Block> {
self.get_block_at_id(self.dbio.get_block_id_by_hash(hash)?)
}
pub fn get_transactions_by_account(
&self,
acc_id: [u8; 32],
offset: u64,
limit: u64,
) -> Result<Vec<NSSATransaction>> {
Ok(self.dbio.get_acc_transactions(acc_id, offset, limit)?)
}
pub fn genesis_id(&self) -> u64 {
self.dbio
.get_meta_first_block_in_db()
.expect("Must be set at the DB startup")
}
pub fn last_block(&self) -> u64 {
self.dbio
.get_meta_last_block_in_db()
.expect("Must be set at the DB startup")
}
pub fn get_state_at_block(&self, block_id: u64) -> Result<V02State> {
Ok(self.dbio.calculate_state_for_id(block_id)?)
}
pub fn final_state(&self) -> Result<V02State> {
Ok(self.dbio.final_state()?)
}
pub fn get_account_final(&self, account_id: &AccountId) -> Result<Account> {
Ok(self.final_state()?.get_account_by_id(*account_id))
}
pub fn put_block(&self, block: Block, l1_header: HeaderId) -> Result<()> {
let mut final_state = self.dbio.final_state()?;
for transaction in &block.body.transactions {
execute_check_transaction_on_state(
&mut final_state,
transaction_pre_check(transaction.clone())?,
)?;
}
Ok(self.dbio.put_block(block, l1_header.into())?)
}
}

View File

@ -1,14 +1,21 @@
use std::{fs::File, io::BufReader, path::Path};
use std::{
fs::File,
io::BufReader,
path::{Path, PathBuf},
};
use anyhow::{Context as _, Result};
pub use bedrock_client::BackoffConfig;
use common::config::BasicAuth;
use common::{
block::{AccountInitialData, CommitmentsInitialData},
config::BasicAuth,
};
pub use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BedrockClientConfig {
pub struct ClientConfig {
/// For individual RPC requests we use Fibonacci backoff retry strategy.
pub backoff: BackoffConfig,
pub addr: Url,
@ -18,8 +25,20 @@ pub struct BedrockClientConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexerConfig {
/// Home dir of sequencer storage
pub home: PathBuf,
/// List of initial accounts data
pub initial_accounts: Vec<AccountInitialData>,
/// List of initial commitments
pub initial_commitments: Vec<CommitmentsInitialData>,
/// Sequencers signing key
///
/// ToDo: Remove it after introducing bedrock block parsing.
/// Currently can not be removed, because indexer must start
/// chain BEFORE sequencer.
pub signing_key: [u8; 32],
pub resubscribe_interval_millis: u64,
pub bedrock_client_config: BedrockClientConfig,
pub bedrock_client_config: ClientConfig,
pub channel_id: ChannelId,
}

View File

@ -1,64 +1,102 @@
use std::sync::Arc;
use std::collections::VecDeque;
use anyhow::{Context as _, Result};
use bedrock_client::BedrockClient;
use common::block::Block;
use futures::StreamExt;
use log::{debug, info};
use anyhow::Result;
use bedrock_client::{BedrockClient, HeaderId};
use common::block::{Block, HashableBlockData};
// ToDo: Remove after testnet
use common::{HashType, PINATA_BASE58};
use log::info;
use logos_blockchain_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::RwLock;
use crate::{config::IndexerConfig, state::IndexerState};
use crate::{block_store::IndexerStore, config::IndexerConfig};
pub mod block_store;
pub mod config;
pub mod state;
#[derive(Clone)]
pub struct IndexerCore {
bedrock_client: BedrockClient,
config: IndexerConfig,
state: IndexerState,
pub bedrock_client: BedrockClient,
pub config: IndexerConfig,
pub store: IndexerStore,
}
impl IndexerCore {
pub fn new(config: IndexerConfig) -> Result<Self> {
// ToDo: replace with correct startup
let hashable_data = HashableBlockData {
block_id: 1,
transactions: vec![],
prev_block_hash: HashType([0; 32]),
timestamp: 0,
};
let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap();
let channel_genesis_msg_id = [0; 32];
let start_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id);
let initial_commitments: Vec<nssa_core::Commitment> = config
.initial_commitments
.iter()
.map(|init_comm_data| {
let npk = &init_comm_data.npk;
let mut acc = init_comm_data.account.clone();
acc.program_owner = nssa::program::Program::authenticated_transfer_program().id();
nssa_core::Commitment::new(npk, &acc)
})
.collect();
let init_accs: Vec<(nssa::AccountId, u128)> = config
.initial_accounts
.iter()
.map(|acc_data| (acc_data.account_id, acc_data.balance))
.collect();
let mut state = nssa::V02State::new_with_genesis_accounts(&init_accs, &initial_commitments);
// ToDo: Remove after testnet
state.add_pinata_program(PINATA_BASE58.parse().unwrap());
let home = config.home.join("rocksdb");
Ok(Self {
bedrock_client: BedrockClient::new(
config.bedrock_client_config.backoff,
config.bedrock_client_config.addr.clone(),
config.bedrock_client_config.auth.clone(),
)
.context("Failed to create Bedrock client")?,
)?,
config,
// No state setup for now, future task.
state: IndexerState {
latest_seen_block: Arc::new(RwLock::new(0)),
},
// ToDo: Implement restarts
store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?,
})
}
pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> {
debug!("Subscribing to Bedrock block stream");
async_stream::stream! {
loop {
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
let last_l1_header = self.store.last_observed_l1_header()?;
info!("Block stream joined");
let mut last_fin_header = match last_l1_header {
Some(last_l1_header) => {
info!("Last fin header found: {last_l1_header}");
last_l1_header
},
None => {
info!("Searching for the start of a channel");
while let Some(block_info) = stream_pinned.next().await {
let header_id = block_info.header_id;
let start_buff = self.search_for_channel_start().await?;
info!("Observed L1 block at height {}", block_info.height);
let last_l1_header = start_buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id();
if let Some(l1_block) = self
.bedrock_client
.get_block_by_id(header_id)
.await?
{
info!("Extracted L1 block at height {}", block_info.height);
for l1_block in start_buff {
info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner());
let curr_l1_header = l1_block.header().id();
let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(),
@ -68,27 +106,205 @@ impl IndexerCore {
info!("Parsed {} L2 blocks", l2_blocks_parsed.len());
for l2_block in l2_blocks_parsed {
// State modification, will be updated in future
{
let mut guard = self.state.latest_seen_block.write().await;
if l2_block.header.block_id > *guard {
*guard = l2_block.header.block_id;
}
}
self.store.put_block(l2_block.clone(), curr_l1_header)?;
yield Ok(l2_block);
}
}
}
// Refetch stream after delay
tokio::time::sleep(std::time::Duration::from_millis(
self.config.resubscribe_interval_millis,
))
.await;
last_l1_header
},
};
info!("INITIAL_SEARCH_END");
loop {
let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await
.inspect_err(|err| log::error!("Failed to roll back to last finalized l1 id with err {err:#?}"))?;
last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))
.inspect_err(|err| log::error!("Chain is empty {err:#?}"))?.header().id();
for l1_block in buff {
info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner());
let curr_l1_header = l1_block.header().id();
let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(),
&self.config.channel_id,
).collect::<Vec<_>>();
info!("Parsed {} L2 blocks", l2_blocks_parsed.len());
for l2_block in l2_blocks_parsed {
self.store.put_block(l2_block.clone(), curr_l1_header)?;
yield Ok(l2_block);
}
}
}
}
}
// async fn wait_last_finalized_block_header(&self) -> Result<HeaderId> {
// let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
// stream_pinned
// .next()
// .await
// .ok_or(anyhow::anyhow!("Stream failure"))
// .map(|info| info.header_id)
// }
async fn get_tip(&self) -> Result<HeaderId> {
Ok(self.bedrock_client.get_consensus_info().await?.tip)
}
async fn get_next_tip(&self, prev_tip: HeaderId) -> Result<HeaderId> {
loop {
let next_tip = self.get_tip().await?;
if next_tip != prev_tip {
break Ok(next_tip);
} else {
info!("Wait 10s to not spam the node");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}
}
/// WARNING: depending on chain behaviour,
/// may take indefinite amount of time
pub async fn search_for_channel_start(
&self,
) -> Result<VecDeque<bedrock_client::Block<SignedMantleTx>>> {
// let mut curr_last_header = self.wait_last_finalized_block_header().await?;
let mut curr_last_header = self.get_tip().await?;
// Storing start for future use
let mut rollback_start = curr_last_header;
// ToDo: How to get root?
let mut rollback_limit = HeaderId::from([0; 32]);
// ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory
// Don't want to complicate DB even more right now.
let mut block_buffer = VecDeque::new();
'outer: loop {
let mut cycle_header = curr_last_header;
loop {
let curr_last_block =
if let Some(block) = self.bedrock_client.get_block_by_id(cycle_header).await? {
block
} else {
break;
};
info!(
"INITIAL_SEARCH: Observed L1 block at slot {}",
curr_last_block.header().slot().into_inner()
);
info!(
"INITIAL_SEARCH: This block header is {}",
curr_last_block.header().id()
);
info!(
"INITIAL_SEARCH: This block parent is {}",
curr_last_block.header().parent()
);
block_buffer.push_front(curr_last_block.clone());
if let Some(_) = curr_last_block.transactions().find_map(|tx| {
tx.mantle_tx.ops.iter().find_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id, inscription, ..
}) => {
if channel_id == &self.config.channel_id
{
let l2_block = borsh::from_slice::<Block>(&inscription).
inspect_err(|err| log::error!("INITIAL_SEARCH: failed to deserialize with err: {err:#?}")).ok();
match l2_block {
Some(block) => {
info!("!!!!!!!!!!!!!!!INITIAL_SEARCH: Observed L2 block at id {}", block.header.block_id);
if block.header.block_id == 1 {
Some(curr_last_block.header().id())
} else {
None
}
},
None => None,
}
} else {
None
}
}
_ => None,
})
}) {
info!("INITIAL_SEARCH: Found channel start");
break 'outer;
} else {
// Step back to parent
let parent = curr_last_block.header().parent();
if parent == rollback_limit {
break;
}
cycle_header = parent;
};
}
info!("INITIAL_SEARCH: Reached rollback limit, refetching last block");
block_buffer.clear();
rollback_limit = rollback_start;
curr_last_header = loop {
let next_tip = self.get_tip().await?;
if next_tip != curr_last_header {
break next_tip;
} else {
info!("Wait 10s to not spam the node");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
};
rollback_start = curr_last_header;
}
Ok(block_buffer)
}
pub async fn rollback_to_last_known_finalized_l1_id(
&self,
last_fin_header: HeaderId,
) -> Result<VecDeque<bedrock_client::Block<SignedMantleTx>>> {
let mut curr_last_header = self.get_next_tip(last_fin_header).await?;
// ToDo: Not scalable, buffer should be stored in DB to not run out of memory
// Don't want to complicate DB even more right now.
let mut block_buffer = VecDeque::new();
loop {
let Some(curr_last_block) = self
.bedrock_client
.get_block_by_id(curr_last_header)
.await?
else {
return Err(anyhow::anyhow!("Chain inconsistency"));
};
if curr_last_block.header().id() == last_fin_header {
break;
} else {
// Step back to parent
curr_last_header = curr_last_block.header().parent();
}
block_buffer.push_front(curr_last_block.clone());
}
Ok(block_buffer)
}
}
fn parse_blocks(

View File

@ -1,4 +1,5 @@
{
"home": "./indexer/service",
"resubscribe_interval_millis": 1000,
"bedrock_client_config": {
"addr": "http://localhost:8080",
@ -7,5 +8,153 @@
"max_retries": 5
}
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101"
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"initial_accounts": [
{
"account_id": "BLgCRDXYdQPMMWVHYRFGQZbgeHx9frkipa8GtpG2Syqy",
"balance": 10000
},
{
"account_id": "Gj1mJy5W7J5pfmLRujmQaLfLMWidNxQ6uwnhb666ZwHw",
"balance": 20000
}
],
"initial_commitments": [
{
"npk": [
63,
202,
178,
231,
183,
82,
237,
212,
216,
221,
215,
255,
153,
101,
177,
161,
254,
210,
128,
122,
54,
190,
230,
151,
183,
64,
225,
229,
113,
1,
228,
97
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 10000,
"data": [],
"nonce": 0
}
},
{
"npk": [
192,
251,
166,
243,
167,
236,
84,
249,
35,
136,
130,
172,
219,
225,
161,
139,
229,
89,
243,
125,
194,
213,
209,
30,
23,
174,
100,
244,
124,
74,
140,
47
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 20000,
"data": [],
"nonce": 0
}
}
],
"signing_key": [
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37
]
}

View File

@ -26,6 +26,9 @@ pub trait Rpc {
#[subscription(name = "subscribeToFinalizedBlocks", item = BlockId)]
async fn subscribe_to_finalized_blocks(&self) -> SubscriptionResult;
#[method(name = "getLastFinalizedBlockId")]
async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned>;
#[method(name = "getBlockById")]
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned>;
@ -48,4 +51,8 @@ pub trait Rpc {
limit: u32,
offset: u32,
) -> Result<Vec<Transaction>, ErrorObjectOwned>;
// ToDo: expand healthcheck response into some kind of report
#[method(name = "checkHealth")]
async fn healthcheck(&self) -> Result<(), ErrorObjectOwned>;
}

View File

@ -180,6 +180,15 @@ impl indexer_service_rpc::RpcServer for MockIndexerService {
Ok(())
}
async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned> {
self.blocks
.last()
.map(|bl| bl.header.block_id)
.ok_or_else(|| {
ErrorObjectOwned::owned(-32001, "Last block not found".to_string(), None::<()>)
})
}
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
self.blocks
.iter()
@ -268,4 +277,8 @@ impl indexer_service_rpc::RpcServer for MockIndexerService {
.map(|(tx, _)| tx.clone())
.collect())
}
async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> {
Ok(())
}
}

View File

@ -15,11 +15,6 @@ use tokio::sync::mpsc::UnboundedSender;
pub struct IndexerService {
subscription_service: SubscriptionService,
#[expect(
dead_code,
reason = "Will be used in future implementations of RPC methods"
)]
indexer: IndexerCore,
}
@ -53,33 +48,104 @@ impl indexer_service_rpc::RpcServer for IndexerService {
Ok(())
}
async fn get_block_by_id(&self, _block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned> {
self.indexer.store.get_last_block_id().map_err(|err| {
ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
})
}
async fn get_block_by_hash(&self, _block_hash: HashType) -> Result<Block, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
Ok(self
.indexer
.store
.get_block_at_id(block_id)
.map_err(|err| {
ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
})?
.into())
}
async fn get_account(&self, _account_id: AccountId) -> Result<Account, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_block_by_hash(&self, block_hash: HashType) -> Result<Block, ErrorObjectOwned> {
Ok(self
.indexer
.store
.get_block_by_hash(block_hash.0)
.map_err(|err| {
ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
})?
.into())
}
async fn get_transaction(&self, _tx_hash: HashType) -> Result<Transaction, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_account(&self, account_id: AccountId) -> Result<Account, ErrorObjectOwned> {
Ok(self
.indexer
.store
.get_account_final(&account_id.into())
.map_err(|err| {
ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
})?
.into())
}
async fn get_blocks(&self, _offset: u32, _limit: u32) -> Result<Vec<Block>, ErrorObjectOwned> {
Err(not_yet_implemented_error())
async fn get_transaction(&self, tx_hash: HashType) -> Result<Transaction, ErrorObjectOwned> {
Ok(self
.indexer
.store
.get_transaction_by_hash(tx_hash.0)
.map_err(|err| {
ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
})?
.into())
}
async fn get_blocks(&self, offset: u32, limit: u32) -> Result<Vec<Block>, ErrorObjectOwned> {
let blocks = self
.indexer
.store
.get_block_batch(offset as u64, limit as u64)
.map_err(|err| {
ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
})?;
let mut block_res = vec![];
for block in blocks {
block_res.push(block.into())
}
Ok(block_res)
}
async fn get_transactions_by_account(
&self,
_account_id: AccountId,
_limit: u32,
_offset: u32,
account_id: AccountId,
limit: u32,
offset: u32,
) -> Result<Vec<Transaction>, ErrorObjectOwned> {
Err(not_yet_implemented_error())
let transactions = self
.indexer
.store
.get_transactions_by_account(account_id.value, offset as u64, limit as u64)
.map_err(|err| {
ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
})?;
let mut tx_res = vec![];
for tx in transactions {
tx_res.push(tx.into())
}
Ok(tx_res)
}
async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> {
// Checking, that indexer can calculate last state
let _ = self.indexer.store.final_state().map_err(|err| {
ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
})?;
Ok(())
}
}
@ -219,7 +285,7 @@ impl<T> Drop for Subscription<T> {
}
}
fn not_yet_implemented_error() -> ErrorObjectOwned {
pub fn not_yet_implemented_error() -> ErrorObjectOwned {
ErrorObject::owned(
ErrorCode::InternalError.code(),
"Not yet implemented",

View File

@ -12,15 +12,17 @@ sequencer_runner.workspace = true
wallet.workspace = true
common.workspace = true
key_protocol.workspace = true
wallet-ffi.workspace = true
token_core.workspace = true
indexer_service.workspace = true
serde_json.workspace = true
token_core.workspace = true
indexer_service_rpc.workspace = true
wallet-ffi.workspace = true
url.workspace = true
anyhow.workspace = true
env_logger.workspace = true
log.workspace = true
serde_json.workspace = true
base64.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
hex.workspace = true

View File

@ -1,22 +1,26 @@
use std::{net::SocketAddr, path::PathBuf};
use anyhow::{Context, Result};
use indexer_service::{BackoffConfig, BedrockClientConfig, ChannelId, IndexerConfig};
use common::block::{AccountInitialData, CommitmentsInitialData};
use indexer_service::{BackoffConfig, ChannelId, ClientConfig, IndexerConfig};
use key_protocol::key_management::KeyChain;
use nssa::{Account, AccountId, PrivateKey, PublicKey};
use nssa_core::{account::Data, program::DEFAULT_PROGRAM_ID};
use sequencer_core::config::{
AccountInitialData, BedrockConfig, CommitmentsInitialData, SequencerConfig,
};
use sequencer_core::config::{BedrockConfig, SequencerConfig};
use url::Url;
use wallet::config::{
InitialAccountData, InitialAccountDataPrivate, InitialAccountDataPublic, WalletConfig,
};
pub fn indexer_config(bedrock_addr: SocketAddr) -> Result<IndexerConfig> {
pub fn indexer_config(
bedrock_addr: SocketAddr,
home: PathBuf,
initial_data: &InitialData,
) -> Result<IndexerConfig> {
Ok(IndexerConfig {
home,
resubscribe_interval_millis: 1000,
bedrock_client_config: BedrockClientConfig {
bedrock_client_config: ClientConfig {
addr: addr_to_url(UrlProtocol::Http, bedrock_addr)
.context("Failed to convert bedrock addr to URL")?,
auth: None,
@ -25,6 +29,9 @@ pub fn indexer_config(bedrock_addr: SocketAddr) -> Result<IndexerConfig> {
max_retries: 10,
},
},
initial_accounts: initial_data.sequencer_initial_accounts(),
initial_commitments: initial_data.sequencer_initial_commitments(),
signing_key: [37; 32],
channel_id: bedrock_channel_id(),
})
}
@ -67,7 +74,7 @@ pub fn sequencer_config(
max_num_tx_in_block,
mempool_max_size,
block_create_timeout_millis,
retry_pending_blocks_timeout_millis: 240_000,
retry_pending_blocks_timeout_millis: 120_000,
port: 0,
initial_accounts: initial_data.sequencer_initial_accounts(),
initial_commitments: initial_data.sequencer_initial_commitments(),

View File

@ -10,6 +10,7 @@ use indexer_service::IndexerHandle;
use log::{debug, error, warn};
use nssa::{AccountId, PrivacyPreservingTransaction};
use nssa_core::Commitment;
use sequencer_core::indexer_client::{IndexerClient, IndexerClientTrait};
use sequencer_runner::SequencerHandle;
use tempfile::TempDir;
use testcontainers::compose::DockerCompose;
@ -33,11 +34,13 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init);
// NOTE: Order of fields is important for proper drop order.
pub struct TestContext {
sequencer_client: SequencerClient,
indexer_client: IndexerClient,
wallet: WalletCore,
wallet_password: String,
sequencer_handle: SequencerHandle,
indexer_handle: IndexerHandle,
bedrock_compose: DockerCompose,
_temp_indexer_dir: TempDir,
_temp_sequencer_dir: TempDir,
_temp_wallet_dir: TempDir,
}
@ -63,7 +66,7 @@ impl TestContext {
let (bedrock_compose, bedrock_addr) = Self::setup_bedrock_node().await?;
let indexer_handle = Self::setup_indexer(bedrock_addr)
let (indexer_handle, temp_indexer_dir) = Self::setup_indexer(bedrock_addr, &initial_data)
.await
.context("Failed to setup Indexer")?;
@ -83,16 +86,23 @@ impl TestContext {
let sequencer_url = config::addr_to_url(config::UrlProtocol::Http, sequencer_handle.addr())
.context("Failed to convert sequencer addr to URL")?;
let indexer_url = config::addr_to_url(config::UrlProtocol::Ws, indexer_handle.addr())
.context("Failed to convert indexer addr to URL")?;
let sequencer_client =
SequencerClient::new(sequencer_url).context("Failed to create sequencer client")?;
let indexer_client = IndexerClient::new(&indexer_url)
.await
.context("Failed to create indexer client")?;
Ok(Self {
sequencer_client,
indexer_client,
wallet,
wallet_password,
bedrock_compose,
sequencer_handle,
indexer_handle,
_temp_indexer_dir: temp_indexer_dir,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
@ -161,13 +171,26 @@ impl TestContext {
Ok((compose, addr))
}
async fn setup_indexer(bedrock_addr: SocketAddr) -> Result<IndexerHandle> {
let indexer_config =
config::indexer_config(bedrock_addr).context("Failed to create Indexer config")?;
async fn setup_indexer(
bedrock_addr: SocketAddr,
initial_data: &config::InitialData,
) -> Result<(IndexerHandle, TempDir)> {
let temp_indexer_dir =
tempfile::tempdir().context("Failed to create temp dir for indexer home")?;
debug!("Using temp indexer home at {:?}", temp_indexer_dir.path());
let indexer_config = config::indexer_config(
bedrock_addr,
temp_indexer_dir.path().to_owned(),
initial_data,
)
.context("Failed to create Indexer config")?;
indexer_service::run_server(indexer_config, 0)
.await
.context("Failed to run Indexer Service")
.map(|handle| (handle, temp_indexer_dir))
}
async fn setup_sequencer(
@ -252,6 +275,11 @@ impl TestContext {
&self.sequencer_client
}
/// Get reference to the indexer client.
pub fn indexer_client(&self) -> &IndexerClient {
&self.indexer_client
}
/// Get existing public account IDs in the wallet.
pub fn existing_public_accounts(&self) -> Vec<AccountId> {
self.wallet
@ -277,9 +305,11 @@ impl Drop for TestContext {
sequencer_handle,
indexer_handle,
bedrock_compose,
_temp_indexer_dir: _,
_temp_sequencer_dir: _,
_temp_wallet_dir: _,
sequencer_client: _,
indexer_client: _,
wallet: _,
wallet_password: _,
} = self;

View File

@ -0,0 +1,180 @@
use std::time::Duration;
use anyhow::{Context, Result};
use indexer_service_rpc::RpcClient;
use integration_tests::{
TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_private_account_id,
format_public_account_id, verify_commitment_is_in_state,
};
use log::info;
use nssa::AccountId;
use tokio::test;
use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand};
/// Timeout in milliseconds to reliably await for block finalization
const L2_TO_L1_TIMEOUT_MILLIS: u64 = 500000;
#[test]
//#[ignore = "Not reliable with current bedrock node"]
async fn indexer_test_run() -> Result<()> {
let ctx = TestContext::new().await?;
// RUN OBSERVATION
tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;
let last_block_seq = ctx
.sequencer_client()
.get_last_block()
.await
.unwrap()
.last_block;
info!("Last block on seq now is {last_block_seq}");
let last_block_indexer = ctx
.indexer_client()
.get_last_finalized_block_id()
.await
.unwrap();
info!("Last block on ind now is {last_block_indexer}");
assert!(last_block_indexer > 1);
Ok(())
}
#[test]
#[ignore = "Not reliable with current bedrock node"]
async fn indexer_block_batching() -> Result<()> {
let ctx = TestContext::new().await?;
// WAIT
info!("Waiting for indexer to parse blocks");
tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;
let last_block_indexer = ctx
.indexer_client()
.get_last_finalized_block_id()
.await
.unwrap();
info!("Last block on ind now is {last_block_indexer}");
assert!(last_block_indexer > 1);
// Getting wide batch to fit all blocks
let block_batch = ctx.indexer_client().get_blocks(1, 100).await.unwrap();
// Checking chain consistency
let mut prev_block_hash = block_batch.first().unwrap().header.hash;
for block in &block_batch[1..] {
assert_eq!(block.header.prev_block_hash, prev_block_hash);
info!("Block {} chain-consistent", block.header.block_id);
prev_block_hash = block.header.hash;
}
Ok(())
}
#[test]
#[ignore = "Not reliable with current bedrock node"]
async fn indexer_state_consistency() -> Result<()> {
let mut ctx = TestContext::new().await?;
let command = Command::AuthTransfer(AuthTransferSubcommand::Send {
from: format_public_account_id(ctx.existing_public_accounts()[0]),
to: Some(format_public_account_id(ctx.existing_public_accounts()[1])),
to_npk: None,
to_vpk: None,
amount: 100,
});
wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?;
info!("Waiting for next block creation");
tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
info!("Checking correct balance move");
let acc_1_balance = ctx
.sequencer_client()
.get_account_balance(ctx.existing_public_accounts()[0])
.await?;
let acc_2_balance = ctx
.sequencer_client()
.get_account_balance(ctx.existing_public_accounts()[1])
.await?;
info!("Balance of sender: {acc_1_balance:#?}");
info!("Balance of receiver: {acc_2_balance:#?}");
assert_eq!(acc_1_balance.balance, 9900);
assert_eq!(acc_2_balance.balance, 20100);
let from: AccountId = ctx.existing_private_accounts()[0];
let to: AccountId = ctx.existing_private_accounts()[1];
let command = Command::AuthTransfer(AuthTransferSubcommand::Send {
from: format_private_account_id(from),
to: Some(format_private_account_id(to)),
to_npk: None,
to_vpk: None,
amount: 100,
});
wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?;
info!("Waiting for next block creation");
tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
let new_commitment1 = ctx
.wallet()
.get_private_account_commitment(from)
.context("Failed to get private account commitment for sender")?;
assert!(verify_commitment_is_in_state(new_commitment1, ctx.sequencer_client()).await);
let new_commitment2 = ctx
.wallet()
.get_private_account_commitment(to)
.context("Failed to get private account commitment for receiver")?;
assert!(verify_commitment_is_in_state(new_commitment2, ctx.sequencer_client()).await);
info!("Successfully transferred privately to owned account");
// WAIT
info!("Waiting for indexer to parse blocks");
tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;
let acc1_ind_state = ctx
.indexer_client()
.get_account(ctx.existing_public_accounts()[0].into())
.await
.unwrap();
let acc2_ind_state = ctx
.indexer_client()
.get_account(ctx.existing_public_accounts()[1].into())
.await
.unwrap();
info!("Checking correct state transition");
let acc1_seq_state = ctx
.sequencer_client()
.get_account(ctx.existing_public_accounts()[0])
.await?
.account;
let acc2_seq_state = ctx
.sequencer_client()
.get_account(ctx.existing_public_accounts()[1])
.await?
.account;
assert_eq!(acc1_ind_state, acc1_seq_state.into());
assert_eq!(acc2_ind_state, acc2_seq_state.into());
// ToDo: Check private state transition
Ok(())
}

View File

@ -146,6 +146,16 @@ impl PrivacyPreservingTransaction {
.map(|(_, public_key)| AccountId::from(public_key))
.collect()
}
pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
let mut acc_set = self
.signer_account_ids()
.into_iter()
.collect::<HashSet<_>>();
acc_set.extend(&self.message.public_account_ids);
acc_set.into_iter().collect()
}
}
fn check_privacy_preserving_circuit_proof_is_valid(

View File

@ -1,4 +1,5 @@
use borsh::{BorshDeserialize, BorshSerialize};
use nssa_core::account::AccountId;
use sha2::{Digest as _, digest::FixedOutput as _};
use crate::{
@ -38,4 +39,8 @@ impl ProgramDeploymentTransaction {
hasher.update(&bytes);
hasher.finalize_fixed().into()
}
pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
vec![]
}
}

View File

@ -45,6 +45,16 @@ impl PublicTransaction {
.collect()
}
pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
let mut acc_set = self
.signer_account_ids()
.into_iter()
.collect::<HashSet<_>>();
acc_set.extend(&self.message.account_ids);
acc_set.into_iter().collect()
}
pub fn hash(&self) -> [u8; 32] {
let bytes = self.to_bytes();
let mut hasher = sha2::Sha256::new();

View File

@ -7,7 +7,7 @@ use common::{
transaction::NSSATransaction,
};
use nssa::V02State;
use storage::RocksDBIO;
use storage::sequencer::RocksDBIO;
pub struct SequencerStore {
dbio: RocksDBIO,

View File

@ -5,27 +5,15 @@ use std::{
};
use anyhow::Result;
pub use bedrock_client::BackoffConfig;
use common::config::BasicAuth;
use bedrock_client::BackoffConfig;
use common::{
block::{AccountInitialData, CommitmentsInitialData},
config::BasicAuth,
};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use nssa::AccountId;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct for account serialization
pub struct AccountInitialData {
pub account_id: AccountId,
pub balance: u128,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct to initialize commitments
pub struct CommitmentsInitialData {
pub npk: nssa_core::NullifierPublicKey,
pub account: nssa_core::account::Account,
}
// TODO: Provide default values
#[derive(Clone, Serialize, Deserialize)]
pub struct SequencerConfig {

View File

@ -314,30 +314,6 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
}
}
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
pub fn transaction_pre_check(
tx: NSSATransaction,
) -> Result<NSSATransaction, TransactionMalformationError> {
// Stateless checks here
match tx {
NSSATransaction::Public(tx) => {
if tx.witness_set().is_valid_for(tx.message()) {
Ok(NSSATransaction::Public(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::PrivacyPreserving(tx) => {
if tx.witness_set().signatures_are_valid_for(tx.message()) {
Ok(NSSATransaction::PrivacyPreserving(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
}
}
/// Load signing key from file or generate a new one if it doesn't exist
fn load_or_create_signing_key(path: &Path) -> Result<Ed25519Key> {
if path.exists() {
@ -364,15 +340,18 @@ mod tests {
use base58::ToBase58;
use bedrock_client::BackoffConfig;
use common::{test_utils::sequencer_sign_key_for_testing, transaction::NSSATransaction};
use common::{
block::AccountInitialData,
test_utils::sequencer_sign_key_for_testing,
transaction::{NSSATransaction, transaction_pre_check},
};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use mempool::MemPoolHandle;
use nssa::{AccountId, PrivateKey};
use crate::{
config::{AccountInitialData, BedrockConfig, SequencerConfig},
config::{BedrockConfig, SequencerConfig},
mock::SequencerCoreWithMockClients,
transaction_pre_check,
};
fn setup_sequencer_config_variable_initial_accounts(

View File

@ -8,7 +8,8 @@ license = { workspace = true }
nssa.workspace = true
common.workspace = true
mempool.workspace = true
sequencer_core.workspace = true
sequencer_core = { workspace = true }
bedrock_client.workspace = true
anyhow.workspace = true
serde_json.workspace = true

View File

@ -3,7 +3,7 @@ use std::collections::HashMap;
use actix_web::Error as HttpError;
use base64::{Engine, engine::general_purpose};
use common::{
block::HashableBlockData,
block::{AccountInitialData, HashableBlockData},
rpc_primitives::{
errors::RpcError,
message::{Message, Request},
@ -20,14 +20,13 @@ use common::{
SendTxResponse,
},
},
transaction::NSSATransaction,
transaction::{NSSATransaction, transaction_pre_check},
};
use itertools::Itertools as _;
use log::warn;
use nssa::{self, program::Program};
use sequencer_core::{
block_settlement_client::BlockSettlementClientTrait, config::AccountInitialData,
indexer_client::IndexerClientTrait,
block_settlement_client::BlockSettlementClientTrait, indexer_client::IndexerClientTrait,
};
use serde_json::Value;
@ -95,8 +94,8 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> JsonHandler<BC, IC>
let tx = borsh::from_slice::<NSSATransaction>(&send_tx_req.transaction).unwrap();
let tx_hash = tx.hash();
let authenticated_tx = sequencer_core::transaction_pre_check(tx)
.inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
let authenticated_tx =
transaction_pre_check(tx).inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
// TODO: Do we need a timeout here? It will be usable if we have too many transactions to
// process
@ -327,12 +326,14 @@ mod tests {
use base58::ToBase58;
use base64::{Engine, engine::general_purpose};
use bedrock_client::BackoffConfig;
use common::{
config::BasicAuth, test_utils::sequencer_sign_key_for_testing, transaction::NSSATransaction,
block::AccountInitialData, config::BasicAuth, test_utils::sequencer_sign_key_for_testing,
transaction::NSSATransaction,
};
use nssa::AccountId;
use sequencer_core::{
config::{AccountInitialData, BackoffConfig, BedrockConfig, SequencerConfig},
config::{BedrockConfig, SequencerConfig},
mock::{MockBlockSettlementClient, MockIndexerClient, SequencerCoreWithMockClients},
};
use serde_json::Value;

View File

@ -1,6 +1,8 @@
use common::rpc_primitives::errors::{RpcError, RpcParseError};
use common::{
rpc_primitives::errors::{RpcError, RpcParseError},
transaction::TransactionMalformationError,
};
use log::debug;
use sequencer_core::TransactionMalformationError;
pub struct RpcErr(pub RpcError);

View File

@ -1,5 +1,5 @@
{
"home": ".",
"home": "./sequencer_runner",
"override_rust_log": null,
"genesis_id": 1,
"is_genesis_random": true,

View File

@ -6,8 +6,9 @@ license = { workspace = true }
[dependencies]
common.workspace = true
nssa.workspace = true
thiserror.workspace = true
borsh.workspace = true
rocksdb.workspace = true
nssa.workspace = true
tempfile.workspace = true

1301
storage/src/indexer.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,601 +1,3 @@
use std::{path::Path, sync::Arc};
use common::block::{BedrockStatus, Block, BlockMeta, MantleMsgId};
use error::DbError;
use nssa::V02State;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
};
pub mod error;
/// Maximal size of stored blocks in base
///
/// Used to control db size
///
/// Currently effectively unbounded.
pub const BUFF_SIZE_ROCKSDB: usize = usize::MAX;
/// Size of stored blocks cache in memory
///
/// Keeping small to not run out of memory
pub const CACHE_SIZE: usize = 1000;
/// Key base for storing metainformation about id of first block in db
pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
/// Key base for storing metainformation about id of last current block in db
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
/// Key base for storing metainformation which describe if first block has been set
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
/// Key base for storing metainformation about the last finalized block on Bedrock
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
/// Key base for storing metainformation about the latest block meta
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
/// Key base for storing the NSSA state
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
/// Name of block column family
pub const CF_BLOCK_NAME: &str = "cf_block";
/// Name of meta column family
pub const CF_META_NAME: &str = "cf_meta";
/// Name of state column family
pub const CF_NSSA_STATE_NAME: &str = "cf_nssa_state";
pub type DbResult<T> = Result<T, DbError>;
pub struct RocksDBIO {
pub db: DBWithThreadMode<MultiThreaded>,
}
impl RocksDBIO {
pub fn open_or_create(
path: &Path,
start_block: Option<(&Block, MantleMsgId)>,
) -> DbResult<Self> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_opts,
path,
vec![cfb, cfmeta, cfstate],
);
let dbio = Self {
// There is no point in handling this from runner code
db: db.unwrap(),
};
let is_start_set = dbio.get_meta_is_first_block_set()?;
if is_start_set {
Ok(dbio)
} else if let Some((block, msg_id)) = start_block {
let block_id = block.header.block_id;
dbio.put_meta_first_block_in_db(block, msg_id)?;
dbio.put_meta_is_first_block_set()?;
dbio.put_meta_last_block_in_db(block_id)?;
dbio.put_meta_last_finalized_block_id(None)?;
dbio.put_meta_latest_block_meta(&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
})?;
Ok(dbio)
} else {
// Here we are trying to start a DB without a block, one should not do it.
unreachable!()
}
}
pub fn destroy(path: &Path) -> DbResult<()> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let _cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
DBWithThreadMode::<MultiThreaded>::destroy(&db_opts, path)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
pub fn meta_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_META_NAME).unwrap()
}
pub fn block_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_BLOCK_NAME).unwrap()
}
pub fn nssa_state_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_NSSA_STATE_NAME).unwrap()
}
pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize first block".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"First block not found".to_string(),
))
}
}
pub fn get_meta_last_block_in_db(&self) -> DbResult<u64> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize last block".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Last block not found".to_string(),
))
}
}
pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(res.is_some())
}
pub fn put_nssa_state_in_db(&self, state: &V02State, batch: &mut WriteBatch) -> DbResult<()> {
let cf_nssa_state = self.nssa_state_column();
batch.put_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_NSSA_STATE_KEY".to_string()),
)
})?,
borsh::to_vec(state).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize NSSA state".to_string()))
})?,
);
Ok(())
}
pub fn put_meta_first_block_in_db(&self, block: &Block, msg_id: MantleMsgId) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize first block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
let mut batch = WriteBatch::default();
self.put_block(block, msg_id, true, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to write first block in db".to_string()),
)
})?;
Ok(())
}
pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_last_block_in_db_batch(
&self,
block_id: u64,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
);
Ok(())
}
pub fn put_meta_last_finalized_block_id(&self, block_id: Option<u64>) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn put_meta_is_first_block_set(&self) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
)
})?,
[1u8; 1],
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_latest_block_meta(&self, block_meta: &BlockMeta) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_latest_block_meta_batch(
&self,
block_meta: &BlockMeta,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
);
Ok(())
}
pub fn latest_block_meta(&self) -> DbResult<BlockMeta> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<BlockMeta>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize latest block meta".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Latest block meta not found".to_string(),
))
}
}
pub fn put_block(
&self,
block: &Block,
msg_id: MantleMsgId,
first: bool,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_block = self.block_column();
if !first {
let last_curr_block = self.get_meta_last_block_in_db()?;
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db_batch(block.header.block_id, batch)?;
self.put_meta_latest_block_meta_batch(
&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
},
batch,
)?;
}
}
batch.put_cf(
&cf_block,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?,
borsh::to_vec(block).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_string()))
})?,
);
Ok(())
}
pub fn get_block(&self, block_id: u64) -> DbResult<Block> {
let cf_block = self.block_column();
let res = self
.db
.get_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<Block>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block data".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
))
}
}
pub fn get_nssa_state(&self) -> DbResult<V02State> {
let cf_nssa_state = self.nssa_state_column();
let res = self
.db
.get_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<V02State>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block data".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
))
}
}
pub fn delete_block(&self, block_id: u64) -> DbResult<()> {
let cf_block = self.block_column();
let key = borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?;
if self
.db
.get_cf(&cf_block, &key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?
.is_none()
{
return Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
));
}
self.db
.delete_cf(&cf_block, key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
let mut block = self.get_block(block_id)?;
block.bedrock_status = BedrockStatus::Finalized;
let cf_block = self.block_column();
self.db
.put_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
borsh::to_vec(&block).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block data".to_string()),
)
})?,
)
.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some(format!("Failed to mark block {block_id} as finalized")),
)
})?;
Ok(())
}
pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
let cf_block = self.block_column();
self.db
.iterator_cf(&cf_block, rocksdb::IteratorMode::Start)
.map(|res| {
let (_key, value) = res.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to get key value pair".to_string()),
)
})?;
borsh::from_slice::<Block>(&value).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize block data".to_string()),
)
})
})
}
pub fn atomic_update(
&self,
block: &Block,
msg_id: MantleMsgId,
state: &V02State,
) -> DbResult<()> {
let block_id = block.header.block_id;
let mut batch = WriteBatch::default();
self.put_block(block, msg_id, false, &mut batch)?;
self.put_nssa_state_in_db(state, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some(format!("Failed to udpate db with block {block_id}")),
)
})
}
}
pub mod indexer;
pub mod sequencer;

600
storage/src/sequencer.rs Normal file
View File

@ -0,0 +1,600 @@
use std::{path::Path, sync::Arc};
use common::block::{BedrockStatus, Block, BlockMeta, MantleMsgId};
use nssa::V02State;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
};
use crate::error::DbError;
/// Maximal size of stored blocks in base
///
/// Used to control db size
///
/// Currently effectively unbounded.
pub const BUFF_SIZE_ROCKSDB: usize = usize::MAX;
/// Size of stored blocks cache in memory
///
/// Keeping small to not run out of memory
pub const CACHE_SIZE: usize = 1000;
/// Key base for storing metainformation about id of first block in db
pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
/// Key base for storing metainformation about id of last current block in db
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
/// Key base for storing metainformation which describe if first block has been set
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
/// Key base for storing metainformation about the last finalized block on Bedrock
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
/// Key base for storing metainformation about the latest block meta
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
/// Key base for storing the NSSA state
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
/// Name of block column family
pub const CF_BLOCK_NAME: &str = "cf_block";
/// Name of meta column family
pub const CF_META_NAME: &str = "cf_meta";
/// Name of state column family
pub const CF_NSSA_STATE_NAME: &str = "cf_nssa_state";
pub type DbResult<T> = Result<T, DbError>;
pub struct RocksDBIO {
pub db: DBWithThreadMode<MultiThreaded>,
}
impl RocksDBIO {
pub fn open_or_create(
path: &Path,
start_block: Option<(&Block, MantleMsgId)>,
) -> DbResult<Self> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_opts,
path,
vec![cfb, cfmeta, cfstate],
);
let dbio = Self {
// There is no point in handling this from runner code
db: db.unwrap(),
};
let is_start_set = dbio.get_meta_is_first_block_set()?;
if is_start_set {
Ok(dbio)
} else if let Some((block, msg_id)) = start_block {
let block_id = block.header.block_id;
dbio.put_meta_first_block_in_db(block, msg_id)?;
dbio.put_meta_is_first_block_set()?;
dbio.put_meta_last_block_in_db(block_id)?;
dbio.put_meta_last_finalized_block_id(None)?;
dbio.put_meta_latest_block_meta(&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
})?;
Ok(dbio)
} else {
// Here we are trying to start a DB without a block, one should not do it.
unreachable!()
}
}
pub fn destroy(path: &Path) -> DbResult<()> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let _cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
DBWithThreadMode::<MultiThreaded>::destroy(&db_opts, path)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
pub fn meta_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_META_NAME).unwrap()
}
pub fn block_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_BLOCK_NAME).unwrap()
}
pub fn nssa_state_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_NSSA_STATE_NAME).unwrap()
}
pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize first block".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"First block not found".to_string(),
))
}
}
pub fn get_meta_last_block_in_db(&self) -> DbResult<u64> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize last block".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Last block not found".to_string(),
))
}
}
pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(res.is_some())
}
pub fn put_nssa_state_in_db(&self, state: &V02State, batch: &mut WriteBatch) -> DbResult<()> {
let cf_nssa_state = self.nssa_state_column();
batch.put_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_NSSA_STATE_KEY".to_string()),
)
})?,
borsh::to_vec(state).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize NSSA state".to_string()))
})?,
);
Ok(())
}
pub fn put_meta_first_block_in_db(&self, block: &Block, msg_id: MantleMsgId) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize first block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
let mut batch = WriteBatch::default();
self.put_block(block, msg_id, true, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to write first block in db".to_string()),
)
})?;
Ok(())
}
pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_last_block_in_db_batch(
&self,
block_id: u64,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
);
Ok(())
}
pub fn put_meta_last_finalized_block_id(&self, block_id: Option<u64>) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn put_meta_is_first_block_set(&self) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
)
})?,
[1u8; 1],
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_latest_block_meta(&self, block_meta: &BlockMeta) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_latest_block_meta_batch(
&self,
block_meta: &BlockMeta,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
);
Ok(())
}
pub fn latest_block_meta(&self) -> DbResult<BlockMeta> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<BlockMeta>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize latest block meta".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Latest block meta not found".to_string(),
))
}
}
pub fn put_block(
&self,
block: &Block,
msg_id: MantleMsgId,
first: bool,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_block = self.block_column();
if !first {
let last_curr_block = self.get_meta_last_block_in_db()?;
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db_batch(block.header.block_id, batch)?;
self.put_meta_latest_block_meta_batch(
&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
},
batch,
)?;
}
}
batch.put_cf(
&cf_block,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?,
borsh::to_vec(block).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_string()))
})?,
);
Ok(())
}
pub fn get_block(&self, block_id: u64) -> DbResult<Block> {
let cf_block = self.block_column();
let res = self
.db
.get_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<Block>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block data".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
))
}
}
pub fn get_nssa_state(&self) -> DbResult<V02State> {
let cf_nssa_state = self.nssa_state_column();
let res = self
.db
.get_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<V02State>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block data".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
))
}
}
pub fn delete_block(&self, block_id: u64) -> DbResult<()> {
let cf_block = self.block_column();
let key = borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?;
if self
.db
.get_cf(&cf_block, &key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?
.is_none()
{
return Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
));
}
self.db
.delete_cf(&cf_block, key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
let mut block = self.get_block(block_id)?;
block.bedrock_status = BedrockStatus::Finalized;
let cf_block = self.block_column();
self.db
.put_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
borsh::to_vec(&block).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block data".to_string()),
)
})?,
)
.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some(format!("Failed to mark block {block_id} as finalized")),
)
})?;
Ok(())
}
pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
let cf_block = self.block_column();
self.db
.iterator_cf(&cf_block, rocksdb::IteratorMode::Start)
.map(|res| {
let (_key, value) = res.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to get key value pair".to_string()),
)
})?;
borsh::from_slice::<Block>(&value).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize block data".to_string()),
)
})
})
}
pub fn atomic_update(
&self,
block: &Block,
msg_id: MantleMsgId,
state: &V02State,
) -> DbResult<()> {
let block_id = block.header.block_id;
let mut batch = WriteBatch::default();
self.put_block(block, msg_id, false, &mut batch)?;
self.put_nssa_state_in_db(state, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some(format!("Failed to udpate db with block {block_id}")),
)
})
}
}