Mirror of https://github.com/logos-blockchain/lssa.git, synced 2026-02-18 20:33:13 +00:00

Commit 16eca57e31: Merge 992fd26bcd768343455e222b582bf72f71128f96 into 27f31cf3d045506e3f0e887628057e15c7588463

Cargo.lock (generated, 1443 lines changed): diff suppressed because it is too large.
@@ -35,8 +35,12 @@ To our knowledge, this design is unique to LEZ. Other privacy-focused programmab
 3. Transferring private to public (local / privacy-preserving execution)
    - Bob executes the token program `Transfer` function locally, sending to Charlie's public account.
    - A ZKP of correct execution is generated.
-   - Bob's private balance stays hidden.
-   - Charlie's public account is updated on-chain.
+   - Bob's private account and balance still remain hidden.
+   - Charlie's public account is modified with the new tokens added.
-4. Transferring public to public (public execution):
-   - Alice submits a transaction to execute the token program `Transfer` function on-chain, specifying Charlie's public account as recipient.
-   - The execution is handled on-chain without ZKPs involved.
-   - Alice's and Charlie's accounts are modified according to the transaction.
+4. Transfer from public to public (public execution)
+   - Alice submits an on-chain transaction to run `Transfer`, sending to Charlie's public account.
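The four transfer modes described in this document reduce to one rule: a ZKP is needed exactly when a private account takes part, otherwise execution is purely on-chain. A minimal illustrative sketch of that rule follows; `AccountKind` and `needs_zk_proof` are hypothetical names for illustration only, not part of this repository.

// Illustrative sketch only: hypothetical types, not the LEZ/NSSA API.
#[derive(Clone, Copy, PartialEq)]
enum AccountKind {
    Private, // balance hidden, state changes proven with a ZKP
    Public,  // balance visible, state changes applied on-chain
}

/// True when any private account takes part, i.e. the sender must
/// execute `Transfer` locally and attach a proof of correct execution.
fn needs_zk_proof(from: AccountKind, to: AccountKind) -> bool {
    from == AccountKind::Private || to == AccountKind::Private
}

fn main() {
    // Case 3 above: private -> public (Bob -> Charlie) needs a proof.
    assert!(needs_zk_proof(AccountKind::Private, AccountKind::Public));
    // Case 4: public -> public (Alice -> Charlie), no ZKP involved.
    assert!(!needs_zk_proof(AccountKind::Public, AccountKind::Public));
}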
Binary files not shown (20 files).
@@ -1,4 +1,6 @@
 use borsh::{BorshDeserialize, BorshSerialize};
+use nssa::AccountId;
+use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256, digest::FixedOutput};

 use crate::{HashType, transaction::NSSATransaction};
@@ -50,7 +52,7 @@ pub enum BedrockStatus {
     Finalized,
 }

-#[derive(Debug, BorshSerialize, BorshDeserialize)]
+#[derive(Debug, BorshSerialize, BorshDeserialize, Clone)]
 pub struct Block {
     pub header: BlockHeader,
     pub body: BlockBody,
@@ -107,6 +109,20 @@ impl From<Block> for HashableBlockData {
     }
 }

+#[derive(Debug, Serialize, Deserialize, Clone)]
+/// Helper struct for account serialization
+pub struct AccountInitialData {
+    pub account_id: AccountId,
+    pub balance: u128,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+/// Helper struct to initialize commitments
+pub struct CommitmentsInitialData {
+    pub npk: nssa_core::NullifierPublicKey,
+    pub account: nssa_core::account::Account,
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{HashType, block::HashableBlockData, test_utils};
@@ -1,4 +1,8 @@
+use std::fmt::Display;
+
 use borsh::{BorshDeserialize, BorshSerialize};
+use log::warn;
 use nssa::{AccountId, V02State};
 use serde::{Deserialize, Serialize};

 use crate::HashType;
@@ -32,6 +36,16 @@ impl From<nssa::PrivacyPreservingTransaction> for NSSATransaction {
     }
 }

+impl NSSATransaction {
+    pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
+        match self {
+            NSSATransaction::ProgramDeployment(tx) => tx.affected_public_account_ids(),
+            NSSATransaction::Public(tx) => tx.affected_public_account_ids(),
+            NSSATransaction::PrivacyPreserving(tx) => tx.affected_public_account_ids(),
+        }
+    }
+}
+
 impl From<nssa::ProgramDeploymentTransaction> for NSSATransaction {
     fn from(value: nssa::ProgramDeploymentTransaction) -> Self {
         Self::ProgramDeployment(value)
@@ -46,3 +60,59 @@ pub enum TxKind {
     PrivacyPreserving,
     ProgramDeployment,
 }
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub enum TransactionMalformationError {
+    InvalidSignature,
+    FailedToDecode { tx: HashType },
+}
+
+impl Display for TransactionMalformationError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{self:#?}")
+    }
+}
+
+impl std::error::Error for TransactionMalformationError {}
+
+// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
+pub fn transaction_pre_check(
+    tx: NSSATransaction,
+) -> Result<NSSATransaction, TransactionMalformationError> {
+    // Stateless checks here
+    match tx {
+        NSSATransaction::Public(tx) => {
+            if tx.witness_set().is_valid_for(tx.message()) {
+                Ok(NSSATransaction::Public(tx))
+            } else {
+                Err(TransactionMalformationError::InvalidSignature)
+            }
+        }
+        NSSATransaction::PrivacyPreserving(tx) => {
+            if tx.witness_set().signatures_are_valid_for(tx.message()) {
+                Ok(NSSATransaction::PrivacyPreserving(tx))
+            } else {
+                Err(TransactionMalformationError::InvalidSignature)
+            }
+        }
+        NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
+    }
+}
+
+pub fn execute_check_transaction_on_state(
+    state: &mut V02State,
+    tx: NSSATransaction,
+) -> Result<NSSATransaction, nssa::error::NssaError> {
+    match &tx {
+        NSSATransaction::Public(tx) => state.transition_from_public_transaction(tx),
+        NSSATransaction::PrivacyPreserving(tx) => {
+            state.transition_from_privacy_preserving_transaction(tx)
+        }
+        NSSATransaction::ProgramDeployment(tx) => {
+            state.transition_from_program_deployment_transaction(tx)
+        }
+    }
+    .inspect_err(|err| warn!("Error at transition {err:#?}"))?;
+
+    Ok(tx)
+}
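The two helpers added above form a two-stage pipeline: `transaction_pre_check` does stateless signature validation, and `execute_check_transaction_on_state` applies the stateful transition. A hedged usage sketch follows (the `apply` wrapper is hypothetical; the indexer's `put_block` later in this diff uses exactly this call order):

// Sketch only: assumes `common` and `nssa` crates as used elsewhere in this diff.
use common::transaction::{
    NSSATransaction, execute_check_transaction_on_state, transaction_pre_check,
};

fn apply(state: &mut nssa::V02State, tx: NSSATransaction) -> anyhow::Result<()> {
    // Stage 1: stateless checks (signature validity), no state access.
    let checked = transaction_pre_check(tx)?;
    // Stage 2: stateful transition; mutates `state` only on success.
    execute_check_transaction_on_state(state, checked)?;
    Ok(())
}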
@@ -1,4 +1,4 @@
-use std::str::FromStr as _;
+use std::str::FromStr;

 use indexer_service_protocol::{Account, AccountId, Block, BlockId, HashType, Transaction};
 use leptos::prelude::*;
@@ -7,6 +7,9 @@ license = { workspace = true }

 [dependencies]
 common.workspace = true
+bedrock_client.workspace = true
 nssa.workspace = true
+nssa_core.workspace = true
+storage.workspace = true

 anyhow.workspace = true
 log.workspace = true
indexer/core/src/block_store.rs (new file, 110 lines)
@@ -0,0 +1,110 @@
use std::{path::Path, sync::Arc};

use anyhow::Result;
use common::{
    block::Block,
    transaction::{NSSATransaction, execute_check_transaction_on_state, transaction_pre_check},
};
use nssa::{Account, AccountId, V02State};
use storage::indexer::RocksDBIO;

#[derive(Clone)]
pub struct IndexerStore {
    dbio: Arc<RocksDBIO>,
}

impl IndexerStore {
    /// Starts the database at the beginning of a new chain.
    /// Creates files if necessary.
    ///
    /// ATTENTION: Will overwrite the genesis block.
    pub fn open_db_with_genesis(
        location: &Path,
        start_data: Option<(Block, V02State)>,
    ) -> Result<Self> {
        let dbio = RocksDBIO::open_or_create(location, start_data)?;

        Ok(Self {
            dbio: Arc::new(dbio),
        })
    }

    /// Reopens an existing database.
    pub fn open_db_restart(location: &Path) -> Result<Self> {
        Self::open_db_with_genesis(location, None)
    }

    pub fn get_last_block_id(&self) -> Result<u64> {
        Ok(self.dbio.get_meta_last_block_in_db()?)
    }

    pub fn get_block_at_id(&self, id: u64) -> Result<Block> {
        Ok(self.dbio.get_block(id)?)
    }

    pub fn get_block_batch(&self, offset: u64, limit: u64) -> Result<Vec<Block>> {
        Ok(self.dbio.get_block_batch(offset, limit)?)
    }

    pub fn get_transaction_by_hash(&self, tx_hash: [u8; 32]) -> Result<NSSATransaction> {
        let block = self.get_block_at_id(self.dbio.get_block_id_by_tx_hash(tx_hash)?)?;
        let transaction = block
            .body
            .transactions
            .iter()
            .find(|enc_tx| enc_tx.hash().0 == tx_hash)
            .ok_or_else(|| anyhow::anyhow!("Transaction not found in DB"))?;

        Ok(transaction.clone())
    }

    pub fn get_block_by_hash(&self, hash: [u8; 32]) -> Result<Block> {
        self.get_block_at_id(self.dbio.get_block_id_by_hash(hash)?)
    }

    pub fn get_transactions_by_account(
        &self,
        acc_id: [u8; 32],
        offset: u64,
        limit: u64,
    ) -> Result<Vec<NSSATransaction>> {
        Ok(self.dbio.get_acc_transactions(acc_id, offset, limit)?)
    }

    pub fn genesis_id(&self) -> u64 {
        self.dbio
            .get_meta_first_block_in_db()
            .expect("Must be set at the DB startup")
    }

    pub fn last_block(&self) -> u64 {
        self.dbio
            .get_meta_last_block_in_db()
            .expect("Must be set at the DB startup")
    }

    pub fn get_state_at_block(&self, block_id: u64) -> Result<V02State> {
        Ok(self.dbio.calculate_state_for_id(block_id)?)
    }

    pub fn final_state(&self) -> Result<V02State> {
        Ok(self.dbio.final_state()?)
    }

    pub fn get_account_final(&self, account_id: &AccountId) -> Result<Account> {
        Ok(self.final_state()?.get_account_by_id(*account_id))
    }

    pub fn put_block(&self, block: Block) -> Result<()> {
        let mut final_state = self.dbio.final_state()?;

        for transaction in &block.body.transactions {
            execute_check_transaction_on_state(
                &mut final_state,
                transaction_pre_check(transaction.clone())?,
            )?;
        }

        Ok(self.dbio.put_block(block)?)
    }
}
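A hedged sketch of how this store is used, mirroring `IndexerCore::new` below; the crate name `indexer_core`, the path, and the genesis values are assumptions for illustration:

use std::path::Path;

use indexer_core::block_store::IndexerStore; // assumed crate path

fn demo(genesis: (common::block::Block, nssa::V02State)) -> anyhow::Result<()> {
    // First start: seed the DB with a genesis block and initial state.
    let store =
        IndexerStore::open_db_with_genesis(Path::new("./indexer/rocksdb"), Some(genesis))?;

    // Every `put_block` re-runs the pre-check + state-transition pipeline,
    // so only internally consistent blocks are persisted.
    let last = store.get_last_block_id()?;
    let tip = store.get_block_at_id(last)?;
    println!("tip block id: {}", tip.header.block_id);
    Ok(())
}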
@@ -1,14 +1,21 @@
-use std::{fs::File, io::BufReader, path::Path};
+use std::{
+    fs::File,
+    io::BufReader,
+    path::{Path, PathBuf},
+};

 use anyhow::{Context as _, Result};
 pub use bedrock_client::BackoffConfig;
-use common::config::BasicAuth;
+use common::{
+    block::{AccountInitialData, CommitmentsInitialData},
+    config::BasicAuth,
+};
 pub use logos_blockchain_core::mantle::ops::channel::ChannelId;
 use serde::{Deserialize, Serialize};
 use url::Url;

 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct BedrockClientConfig {
+pub struct ClientConfig {
     /// For individual RPC requests we use Fibonacci backoff retry strategy.
     pub backoff: BackoffConfig,
     pub addr: Url,
@@ -18,8 +25,20 @@ pub struct BedrockClientConfig {

 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct IndexerConfig {
+    /// Home dir of indexer storage
+    pub home: PathBuf,
+    /// List of initial accounts data
+    pub initial_accounts: Vec<AccountInitialData>,
+    /// List of initial commitments
+    pub initial_commitments: Vec<CommitmentsInitialData>,
+    /// Sequencer's signing key
+    ///
+    /// ToDo: Remove it after introducing bedrock block parsing.
+    /// Currently it cannot be removed, because the indexer must start
+    /// the chain BEFORE the sequencer.
+    pub signing_key: [u8; 32],
     pub resubscribe_interval_millis: u64,
-    pub bedrock_client_config: BedrockClientConfig,
+    pub bedrock_client_config: ClientConfig,
     pub channel_id: ChannelId,
 }
@@ -1,92 +1,121 @@
 use std::sync::Arc;

-use anyhow::{Context as _, Result};
+use anyhow::Result;
 use bedrock_client::BedrockClient;
-use common::block::Block;
+use common::block::{Block, HashableBlockData};
+// ToDo: Remove after testnet
+use common::{HashType, PINATA_BASE58};
 use futures::StreamExt;
-use log::{debug, info};
+use log::info;
 use logos_blockchain_core::mantle::{
     Op, SignedMantleTx,
     ops::channel::{ChannelId, inscribe::InscriptionOp},
 };
-use tokio::sync::RwLock;

-use crate::{config::IndexerConfig, state::IndexerState};
+use crate::{block_store::IndexerStore, config::IndexerConfig};

+pub mod block_store;
 pub mod config;
-pub mod state;

 #[derive(Clone)]
 pub struct IndexerCore {
-    bedrock_client: BedrockClient,
-    config: IndexerConfig,
-    state: IndexerState,
+    pub bedrock_client: BedrockClient,
+    pub config: IndexerConfig,
+    pub store: IndexerStore,
 }

 impl IndexerCore {
     pub fn new(config: IndexerConfig) -> Result<Self> {
+        // ToDo: replace with correct startup
+        let hashable_data = HashableBlockData {
+            block_id: 1,
+            transactions: vec![],
+            prev_block_hash: HashType([0; 32]),
+            timestamp: 0,
+        };
+
+        let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap();
+        let channel_genesis_msg_id = [0; 32];
+        let start_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id);
+
+        let initial_commitments: Vec<nssa_core::Commitment> = config
+            .initial_commitments
+            .iter()
+            .map(|init_comm_data| {
+                let npk = &init_comm_data.npk;
+
+                let mut acc = init_comm_data.account.clone();
+
+                acc.program_owner = nssa::program::Program::authenticated_transfer_program().id();
+
+                nssa_core::Commitment::new(npk, &acc)
+            })
+            .collect();
+
+        let init_accs: Vec<(nssa::AccountId, u128)> = config
+            .initial_accounts
+            .iter()
+            .map(|acc_data| (acc_data.account_id, acc_data.balance))
+            .collect();
+
+        let mut state = nssa::V02State::new_with_genesis_accounts(&init_accs, &initial_commitments);
+
+        // ToDo: Remove after testnet
+        state.add_pinata_program(PINATA_BASE58.parse().unwrap());
+
+        let home = config.home.join("rocksdb");
+
         Ok(Self {
             bedrock_client: BedrockClient::new(
                 config.bedrock_client_config.backoff,
                 config.bedrock_client_config.addr.clone(),
                 config.bedrock_client_config.auth.clone(),
-            )
-            .context("Failed to create Bedrock client")?,
+            )?,
             config,
-            // No state setup for now, future task.
-            state: IndexerState {
-                latest_seen_block: Arc::new(RwLock::new(0)),
-            },
+            // ToDo: Implement restarts
+            store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?,
         })
     }

     pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> {
-        debug!("Subscribing to Bedrock block stream");
         async_stream::stream! {
             loop {
                 let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);

                 info!("Block stream joined");

                 while let Some(block_info) = stream_pinned.next().await {
                     let header_id = block_info.header_id;

                     info!("Observed L1 block at height {}", block_info.height);

                     if let Some(l1_block) = self
                         .bedrock_client
                         .get_block_by_id(header_id)
                         .await?
                     {
                         info!("Extracted L1 block at height {}", block_info.height);

                         let l2_blocks_parsed = parse_blocks(
                             l1_block.into_transactions().into_iter(),
                             &self.config.channel_id,
                         ).collect::<Vec<_>>();

                         info!("Parsed {} L2 blocks", l2_blocks_parsed.len());

                         for l2_block in l2_blocks_parsed {
-                            // State modification, will be updated in future
-                            {
-                                let mut guard = self.state.latest_seen_block.write().await;
-                                if l2_block.header.block_id > *guard {
-                                    *guard = l2_block.header.block_id;
-                                }
-                            }
+                            self.store.put_block(l2_block.clone())?;

                             yield Ok(l2_block);
                         }
                     }
                 }

                 // Refetch stream after delay
                 tokio::time::sleep(std::time::Duration::from_millis(
                     self.config.resubscribe_interval_millis,
                 ))
                 .await;
             }
         }
     }
 }
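A hedged consumer-side sketch of how the stream above might be driven; `indexer_core` as the crate name is an assumption, while `futures::pin_mut!` and `StreamExt::next` are standard futures-crate APIs:

use futures::StreamExt;

async fn run(core: indexer_core::IndexerCore) -> anyhow::Result<()> {
    let stream = core.subscribe_parse_block_stream().await;
    futures::pin_mut!(stream);

    while let Some(block) = stream.next().await {
        // Blocks are already persisted via `put_block` inside the stream;
        // here we only observe them.
        println!("indexed L2 block {}", block?.header.block_id);
    }
    Ok(())
}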
@@ -1,4 +1,5 @@
 {
+    "home": "./indexer/service",
     "resubscribe_interval_millis": 1000,
     "bedrock_client_config": {
         "addr": "http://localhost:8080",
@@ -7,5 +8,153 @@
             "max_retries": 5
         }
     },
-    "channel_id": "0101010101010101010101010101010101010101010101010101010101010101"
+    "channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
+    "initial_accounts": [
+        { "account_id": "BLgCRDXYdQPMMWVHYRFGQZbgeHx9frkipa8GtpG2Syqy", "balance": 10000 },
+        { "account_id": "Gj1mJy5W7J5pfmLRujmQaLfLMWidNxQ6uwnhb666ZwHw", "balance": 20000 }
+    ],
+    "initial_commitments": [
+        {
+            "npk": [63, 202, 178, 231, 183, 82, 237, 212, 216, 221, 215, 255, 153, 101, 177, 161,
+                    254, 210, 128, 122, 54, 190, 230, 151, 183, 64, 225, 229, 113, 1, 228, 97],
+            "account": {
+                "program_owner": [0, 0, 0, 0, 0, 0, 0, 0],
+                "balance": 10000,
+                "data": [],
+                "nonce": 0
+            }
+        },
+        {
+            "npk": [192, 251, 166, 243, 167, 236, 84, 249, 35, 136, 130, 172, 219, 225, 161, 139,
+                    229, 89, 243, 125, 194, 213, 209, 30, 23, 174, 100, 244, 124, 74, 140, 47],
+            "account": {
+                "program_owner": [0, 0, 0, 0, 0, 0, 0, 0],
+                "balance": 20000,
+                "data": [],
+                "nonce": 0
+            }
+        }
+    ],
+    "signing_key": [37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37,
+                    37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37, 37]
+}
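Since `IndexerConfig` derives `Deserialize` (see the config.rs diff above), this JSON can be loaded with plain serde_json. A hedged sketch, with the path illustrative and `indexer_service` re-exporting `IndexerConfig` as the test config later in this diff suggests:

use std::{fs::File, io::BufReader};

fn load(path: &str) -> anyhow::Result<indexer_service::IndexerConfig> {
    // Byte-array fields like "signing_key" map onto [u8; 32] via serde.
    let file = File::open(path)?;
    Ok(serde_json::from_reader(BufReader::new(file))?)
}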
@@ -26,6 +26,9 @@ pub trait Rpc {
     #[subscription(name = "subscribeToFinalizedBlocks", item = BlockId)]
     async fn subscribe_to_finalized_blocks(&self) -> SubscriptionResult;

+    #[method(name = "getLastFinalizedBlockId")]
+    async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned>;
+
     #[method(name = "getBlockById")]
     async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned>;

@@ -48,4 +51,8 @@ pub trait Rpc {
         limit: u32,
         offset: u32,
     ) -> Result<Vec<Transaction>, ErrorObjectOwned>;
+
+    // ToDo: expand healthcheck response into some kind of report
+    #[method(name = "checkHealth")]
+    async fn healthcheck(&self) -> Result<(), ErrorObjectOwned>;
 }
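The attribute macros and types (`SubscriptionResult`, `ErrorObjectOwned`) look like jsonrpsee's proc-macro API. If so, the new method can be exercised over raw JSON-RPC roughly as sketched below; the endpoint URL is illustrative, and deserializing `BlockId` as `u64` is an assumption based on the store's `u64` block ids:

use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};

async fn last_finalized(url: &str) -> anyhow::Result<u64> {
    let client = HttpClientBuilder::default().build(url)?;
    // Method name matches #[method(name = "getLastFinalizedBlockId")].
    let id: u64 = client.request("getLastFinalizedBlockId", rpc_params![]).await?;
    Ok(id)
}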
@@ -180,6 +180,15 @@ impl indexer_service_rpc::RpcServer for MockIndexerService {
         Ok(())
     }

+    async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned> {
+        self.blocks
+            .last()
+            .map(|bl| bl.header.block_id)
+            .ok_or_else(|| {
+                ErrorObjectOwned::owned(-32001, "Last block not found".to_string(), None::<()>)
+            })
+    }
+
     async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
         self.blocks
             .iter()
@@ -268,4 +277,8 @@ impl indexer_service_rpc::RpcServer for MockIndexerService {
             .map(|(tx, _)| tx.clone())
             .collect())
     }
+
+    async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> {
+        Ok(())
+    }
 }
@@ -15,11 +15,6 @@ use tokio::sync::mpsc::UnboundedSender;

 pub struct IndexerService {
     subscription_service: SubscriptionService,
-
-    #[expect(
-        dead_code,
-        reason = "Will be used in future implementations of RPC methods"
-    )]
     indexer: IndexerCore,
 }

@@ -53,33 +48,104 @@ impl indexer_service_rpc::RpcServer for IndexerService {
         Ok(())
     }

-    async fn get_block_by_id(&self, _block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
-        Err(not_yet_implemented_error())
-    }
+    async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned> {
+        self.indexer.store.get_last_block_id().map_err(|err| {
+            ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
+        })
+    }

-    async fn get_block_by_hash(&self, _block_hash: HashType) -> Result<Block, ErrorObjectOwned> {
-        Err(not_yet_implemented_error())
-    }
+    async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
+        Ok(self
+            .indexer
+            .store
+            .get_block_at_id(block_id)
+            .map_err(|err| {
+                ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
+            })?
+            .into())
+    }

-    async fn get_account(&self, _account_id: AccountId) -> Result<Account, ErrorObjectOwned> {
-        Err(not_yet_implemented_error())
-    }
+    async fn get_block_by_hash(&self, block_hash: HashType) -> Result<Block, ErrorObjectOwned> {
+        Ok(self
+            .indexer
+            .store
+            .get_block_by_hash(block_hash.0)
+            .map_err(|err| {
+                ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
+            })?
+            .into())
+    }

-    async fn get_transaction(&self, _tx_hash: HashType) -> Result<Transaction, ErrorObjectOwned> {
-        Err(not_yet_implemented_error())
-    }
+    async fn get_account(&self, account_id: AccountId) -> Result<Account, ErrorObjectOwned> {
+        Ok(self
+            .indexer
+            .store
+            .get_account_final(&account_id.into())
+            .map_err(|err| {
+                ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
+            })?
+            .into())
+    }

-    async fn get_blocks(&self, _offset: u32, _limit: u32) -> Result<Vec<Block>, ErrorObjectOwned> {
-        Err(not_yet_implemented_error())
-    }
+    async fn get_transaction(&self, tx_hash: HashType) -> Result<Transaction, ErrorObjectOwned> {
+        Ok(self
+            .indexer
+            .store
+            .get_transaction_by_hash(tx_hash.0)
+            .map_err(|err| {
+                ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
+            })?
+            .into())
+    }
+
+    async fn get_blocks(&self, offset: u32, limit: u32) -> Result<Vec<Block>, ErrorObjectOwned> {
+        let blocks = self
+            .indexer
+            .store
+            .get_block_batch(offset as u64, limit as u64)
+            .map_err(|err| {
+                ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
+            })?;
+
+        let mut block_res = vec![];
+
+        for block in blocks {
+            block_res.push(block.into())
+        }
+
+        Ok(block_res)
+    }

     async fn get_transactions_by_account(
         &self,
-        _account_id: AccountId,
-        _limit: u32,
-        _offset: u32,
+        account_id: AccountId,
+        limit: u32,
+        offset: u32,
     ) -> Result<Vec<Transaction>, ErrorObjectOwned> {
-        Err(not_yet_implemented_error())
+        let transactions = self
+            .indexer
+            .store
+            .get_transactions_by_account(account_id.value, offset as u64, limit as u64)
+            .map_err(|err| {
+                ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
+            })?;
+
+        let mut tx_res = vec![];
+
+        for tx in transactions {
+            tx_res.push(tx.into())
+        }
+
+        Ok(tx_res)
+    }
+
+    async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> {
+        // Check that the indexer can calculate the latest state
+        let _ = self.indexer.store.final_state().map_err(|err| {
+            ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
+        })?;
+
+        Ok(())
     }
 }

@@ -219,7 +285,7 @@ impl<T> Drop for Subscription<T> {
     }
 }

-fn not_yet_implemented_error() -> ErrorObjectOwned {
+pub fn not_yet_implemented_error() -> ErrorObjectOwned {
     ErrorObject::owned(
         ErrorCode::InternalError.code(),
         "Not yet implemented",
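Every method above repeats the same `map_err` closure. A hedged sketch of how it could be factored out; `db_error` is a hypothetical helper name, not part of the PR:

use jsonrpsee::types::ErrorObjectOwned;

// Collapse the repeated mapping from storage errors to a JSON-RPC error.
fn db_error(err: anyhow::Error) -> ErrorObjectOwned {
    ErrorObjectOwned::owned(-32001, "DBError".to_string(), Some(format!("{err:#?}")))
}

With such a helper, each method body reduces to `.map_err(db_error)?`.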
@@ -12,15 +12,17 @@ sequencer_runner.workspace = true
 wallet.workspace = true
 common.workspace = true
 key_protocol.workspace = true
-wallet-ffi.workspace = true
-token_core.workspace = true
 indexer_service.workspace = true
-serde_json.workspace = true
+token_core.workspace = true
+indexer_service_rpc.workspace = true
+wallet-ffi.workspace = true

 url.workspace = true

 anyhow.workspace = true
 env_logger.workspace = true
 log.workspace = true
+serde_json.workspace = true
 base64.workspace = true
 tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
 hex.workspace = true
@@ -1,22 +1,26 @@
 use std::{net::SocketAddr, path::PathBuf};

 use anyhow::{Context, Result};
-use indexer_service::{BackoffConfig, BedrockClientConfig, ChannelId, IndexerConfig};
+use common::block::{AccountInitialData, CommitmentsInitialData};
+use indexer_service::{BackoffConfig, ChannelId, ClientConfig, IndexerConfig};
 use key_protocol::key_management::KeyChain;
 use nssa::{Account, AccountId, PrivateKey, PublicKey};
 use nssa_core::{account::Data, program::DEFAULT_PROGRAM_ID};
-use sequencer_core::config::{
-    AccountInitialData, BedrockConfig, CommitmentsInitialData, SequencerConfig,
-};
+use sequencer_core::config::{BedrockConfig, SequencerConfig};
 use url::Url;
 use wallet::config::{
     InitialAccountData, InitialAccountDataPrivate, InitialAccountDataPublic, WalletConfig,
 };

-pub fn indexer_config(bedrock_addr: SocketAddr) -> Result<IndexerConfig> {
+pub fn indexer_config(
+    bedrock_addr: SocketAddr,
+    home: PathBuf,
+    initial_data: &InitialData,
+) -> Result<IndexerConfig> {
     Ok(IndexerConfig {
+        home,
         resubscribe_interval_millis: 1000,
-        bedrock_client_config: BedrockClientConfig {
+        bedrock_client_config: ClientConfig {
             addr: addr_to_url(UrlProtocol::Http, bedrock_addr)
                 .context("Failed to convert bedrock addr to URL")?,
             auth: None,
@@ -25,6 +29,9 @@ pub fn indexer_config(bedrock_addr: SocketAddr) -> Result<IndexerConfig> {
                 max_retries: 10,
             },
         },
+        initial_accounts: initial_data.sequencer_initial_accounts(),
+        initial_commitments: initial_data.sequencer_initial_commitments(),
+        signing_key: [37; 32],
         channel_id: bedrock_channel_id(),
     })
 }
@@ -10,6 +10,7 @@ use indexer_service::IndexerHandle;
 use log::{debug, error, warn};
 use nssa::{AccountId, PrivacyPreservingTransaction};
 use nssa_core::Commitment;
+use sequencer_core::indexer_client::{IndexerClient, IndexerClientTrait};
 use sequencer_runner::SequencerHandle;
 use tempfile::TempDir;
 use testcontainers::compose::DockerCompose;
@@ -33,11 +34,13 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init);
 // NOTE: Order of fields is important for proper drop order.
 pub struct TestContext {
     sequencer_client: SequencerClient,
+    indexer_client: IndexerClient,
     wallet: WalletCore,
     wallet_password: String,
     sequencer_handle: SequencerHandle,
     indexer_handle: IndexerHandle,
     bedrock_compose: DockerCompose,
+    _temp_indexer_dir: TempDir,
     _temp_sequencer_dir: TempDir,
     _temp_wallet_dir: TempDir,
 }
@@ -63,7 +66,7 @@ impl TestContext {

         let (bedrock_compose, bedrock_addr) = Self::setup_bedrock_node().await?;

-        let indexer_handle = Self::setup_indexer(bedrock_addr)
+        let (indexer_handle, temp_indexer_dir) = Self::setup_indexer(bedrock_addr, &initial_data)
             .await
             .context("Failed to setup Indexer")?;

@@ -83,16 +86,23 @@ impl TestContext {

         let sequencer_url = config::addr_to_url(config::UrlProtocol::Http, sequencer_handle.addr())
             .context("Failed to convert sequencer addr to URL")?;
+        let indexer_url = config::addr_to_url(config::UrlProtocol::Ws, indexer_handle.addr())
+            .context("Failed to convert indexer addr to URL")?;
         let sequencer_client =
             SequencerClient::new(sequencer_url).context("Failed to create sequencer client")?;
+        let indexer_client = IndexerClient::new(&indexer_url)
+            .await
+            .context("Failed to create indexer client")?;

         Ok(Self {
             sequencer_client,
+            indexer_client,
             wallet,
             wallet_password,
             bedrock_compose,
             sequencer_handle,
             indexer_handle,
+            _temp_indexer_dir: temp_indexer_dir,
             _temp_sequencer_dir: temp_sequencer_dir,
             _temp_wallet_dir: temp_wallet_dir,
         })
@@ -161,13 +171,26 @@ impl TestContext {
         Ok((compose, addr))
     }

-    async fn setup_indexer(bedrock_addr: SocketAddr) -> Result<IndexerHandle> {
-        let indexer_config =
-            config::indexer_config(bedrock_addr).context("Failed to create Indexer config")?;
+    async fn setup_indexer(
+        bedrock_addr: SocketAddr,
+        initial_data: &config::InitialData,
+    ) -> Result<(IndexerHandle, TempDir)> {
+        let temp_indexer_dir =
+            tempfile::tempdir().context("Failed to create temp dir for indexer home")?;
+
+        debug!("Using temp indexer home at {:?}", temp_indexer_dir.path());
+
+        let indexer_config = config::indexer_config(
+            bedrock_addr,
+            temp_indexer_dir.path().to_owned(),
+            initial_data,
+        )
+        .context("Failed to create Indexer config")?;

         indexer_service::run_server(indexer_config, 0)
             .await
             .context("Failed to run Indexer Service")
+            .map(|handle| (handle, temp_indexer_dir))
     }

     async fn setup_sequencer(
@@ -252,6 +275,11 @@ impl TestContext {
         &self.sequencer_client
     }

+    /// Get reference to the indexer client.
+    pub fn indexer_client(&self) -> &IndexerClient {
+        &self.indexer_client
+    }
+
     /// Get existing public account IDs in the wallet.
     pub fn existing_public_accounts(&self) -> Vec<AccountId> {
         self.wallet
@@ -277,9 +305,11 @@ impl Drop for TestContext {
             sequencer_handle,
             indexer_handle,
             bedrock_compose,
+            _temp_indexer_dir: _,
             _temp_sequencer_dir: _,
             _temp_wallet_dir: _,
             sequencer_client: _,
+            indexer_client: _,
             wallet: _,
             wallet_password: _,
         } = self;
integration_tests/tests/indexer.rs (new file, 180 lines)
@@ -0,0 +1,180 @@
use std::time::Duration;

use anyhow::{Context, Result};
use indexer_service_rpc::RpcClient;
use integration_tests::{
    TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_private_account_id,
    format_public_account_id, verify_commitment_is_in_state,
};
use log::info;
use nssa::AccountId;
use tokio::test;
use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand};

/// Timeout in milliseconds to reliably await block finalization
const L2_TO_L1_TIMEOUT_MILLIS: u64 = 300000;

#[test]
#[ignore = "Not reliable with current bedrock node"]
async fn indexer_test_run() -> Result<()> {
    let ctx = TestContext::new().await?;

    // RUN OBSERVATION
    tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;

    let last_block_seq = ctx
        .sequencer_client()
        .get_last_block()
        .await
        .unwrap()
        .last_block;

    info!("Last block on seq now is {last_block_seq}");

    let last_block_indexer = ctx
        .indexer_client()
        .get_last_finalized_block_id()
        .await
        .unwrap();

    info!("Last block on ind now is {last_block_indexer}");

    assert!(last_block_indexer > 1);

    Ok(())
}

#[test]
#[ignore = "Not reliable with current bedrock node"]
async fn indexer_block_batching() -> Result<()> {
    let ctx = TestContext::new().await?;

    // WAIT
    info!("Waiting for indexer to parse blocks");
    tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;

    let last_block_indexer = ctx
        .indexer_client()
        .get_last_finalized_block_id()
        .await
        .unwrap();

    info!("Last block on ind now is {last_block_indexer}");

    assert!(last_block_indexer > 1);

    // Getting a batch wide enough to fit all blocks
    let block_batch = ctx.indexer_client().get_blocks(1, 100).await.unwrap();

    // Checking chain consistency
    let mut prev_block_hash = block_batch.first().unwrap().header.hash;

    for block in &block_batch[1..] {
        assert_eq!(block.header.prev_block_hash, prev_block_hash);

        info!("Block {} chain-consistent", block.header.block_id);

        prev_block_hash = block.header.hash;
    }

    Ok(())
}

#[test]
#[ignore = "Not reliable with current bedrock node"]
async fn indexer_state_consistency() -> Result<()> {
    let mut ctx = TestContext::new().await?;

    let command = Command::AuthTransfer(AuthTransferSubcommand::Send {
        from: format_public_account_id(ctx.existing_public_accounts()[0]),
        to: Some(format_public_account_id(ctx.existing_public_accounts()[1])),
        to_npk: None,
        to_vpk: None,
        amount: 100,
    });

    wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?;

    info!("Waiting for next block creation");
    tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;

    info!("Checking correct balance move");
    let acc_1_balance = ctx
        .sequencer_client()
        .get_account_balance(ctx.existing_public_accounts()[0])
        .await?;
    let acc_2_balance = ctx
        .sequencer_client()
        .get_account_balance(ctx.existing_public_accounts()[1])
        .await?;

    info!("Balance of sender: {acc_1_balance:#?}");
    info!("Balance of receiver: {acc_2_balance:#?}");

    assert_eq!(acc_1_balance.balance, 9900);
    assert_eq!(acc_2_balance.balance, 20100);

    let from: AccountId = ctx.existing_private_accounts()[0];
    let to: AccountId = ctx.existing_private_accounts()[1];

    let command = Command::AuthTransfer(AuthTransferSubcommand::Send {
        from: format_private_account_id(from),
        to: Some(format_private_account_id(to)),
        to_npk: None,
        to_vpk: None,
        amount: 100,
    });

    wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?;

    info!("Waiting for next block creation");
    tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;

    let new_commitment1 = ctx
        .wallet()
        .get_private_account_commitment(from)
        .context("Failed to get private account commitment for sender")?;
    assert!(verify_commitment_is_in_state(new_commitment1, ctx.sequencer_client()).await);

    let new_commitment2 = ctx
        .wallet()
        .get_private_account_commitment(to)
        .context("Failed to get private account commitment for receiver")?;
    assert!(verify_commitment_is_in_state(new_commitment2, ctx.sequencer_client()).await);

    info!("Successfully transferred privately to owned account");

    // WAIT
    info!("Waiting for indexer to parse blocks");
    tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;

    let acc1_ind_state = ctx
        .indexer_client()
        .get_account(ctx.existing_public_accounts()[0].into())
        .await
        .unwrap();
    let acc2_ind_state = ctx
        .indexer_client()
        .get_account(ctx.existing_public_accounts()[1].into())
        .await
        .unwrap();

    info!("Checking correct state transition");
    let acc1_seq_state = ctx
        .sequencer_client()
        .get_account(ctx.existing_public_accounts()[0])
        .await?
        .account;
    let acc2_seq_state = ctx
        .sequencer_client()
        .get_account(ctx.existing_public_accounts()[1])
        .await?
        .account;

    assert_eq!(acc1_ind_state, acc1_seq_state.into());
    assert_eq!(acc2_ind_state, acc2_seq_state.into());

    // ToDo: Check private state transition

    Ok(())
}
@@ -146,6 +146,16 @@ impl PrivacyPreservingTransaction {
             .map(|(_, public_key)| AccountId::from(public_key))
             .collect()
     }

+    pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
+        let mut acc_set = self
+            .signer_account_ids()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        acc_set.extend(&self.message.public_account_ids);
+
+        acc_set.into_iter().collect()
+    }
 }

 fn check_privacy_preserving_circuit_proof_is_valid(
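The method returns the union of signer accounts and referenced public accounts with duplicates removed (and in unspecified order, since it goes through a HashSet). A tiny self-contained illustration of that set-union behavior, with plain `u8` ids standing in for `AccountId`:

use std::collections::HashSet;

fn affected(signers: Vec<u8>, referenced: Vec<u8>) -> Vec<u8> {
    // Same shape as affected_public_account_ids: signer ids seed the set,
    // referenced public ids are merged in, duplicates collapse.
    let mut set: HashSet<u8> = signers.into_iter().collect();
    set.extend(referenced);
    set.into_iter().collect()
}

fn main() {
    let mut ids = affected(vec![1, 2], vec![2, 3]);
    ids.sort();
    assert_eq!(ids, vec![1, 2, 3]); // account 2 appears only once
}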
@@ -1,4 +1,5 @@
 use borsh::{BorshDeserialize, BorshSerialize};
+use nssa_core::account::AccountId;
 use sha2::{Digest as _, digest::FixedOutput as _};

 use crate::{
@@ -38,4 +39,8 @@ impl ProgramDeploymentTransaction {
         hasher.update(&bytes);
         hasher.finalize_fixed().into()
     }

+    pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
+        vec![]
+    }
 }
@@ -45,6 +45,16 @@ impl PublicTransaction {
             .collect()
     }

+    pub fn affected_public_account_ids(&self) -> Vec<AccountId> {
+        let mut acc_set = self
+            .signer_account_ids()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        acc_set.extend(&self.message.account_ids);
+
+        acc_set.into_iter().collect()
+    }
+
     pub fn hash(&self) -> [u8; 32] {
         let bytes = self.to_bytes();
         let mut hasher = sha2::Sha256::new();
@@ -7,7 +7,7 @@ use common::{
     transaction::NSSATransaction,
 };
 use nssa::V02State;
-use storage::RocksDBIO;
+use storage::sequencer::RocksDBIO;

 pub struct SequencerStore {
     dbio: RocksDBIO,
@@ -5,27 +5,15 @@ use std::{
 };

 use anyhow::Result;
-pub use bedrock_client::BackoffConfig;
-use common::config::BasicAuth;
+use bedrock_client::BackoffConfig;
+use common::{
+    block::{AccountInitialData, CommitmentsInitialData},
+    config::BasicAuth,
+};
 use logos_blockchain_core::mantle::ops::channel::ChannelId;
-use nssa::AccountId;
 use serde::{Deserialize, Serialize};
 use url::Url;

-#[derive(Debug, Serialize, Deserialize, Clone)]
-/// Helper struct for account serialization
-pub struct AccountInitialData {
-    pub account_id: AccountId,
-    pub balance: u128,
-}
-
-#[derive(Debug, Serialize, Deserialize, Clone)]
-/// Helper struct to initialize commitments
-pub struct CommitmentsInitialData {
-    pub npk: nssa_core::NullifierPublicKey,
-    pub account: nssa_core::account::Account,
-}
-
 // TODO: Provide default values
 #[derive(Clone, Serialize, Deserialize)]
 pub struct SequencerConfig {
@@ -314,30 +314,6 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
     }
 }

-// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
-pub fn transaction_pre_check(
-    tx: NSSATransaction,
-) -> Result<NSSATransaction, TransactionMalformationError> {
-    // Stateless checks here
-    match tx {
-        NSSATransaction::Public(tx) => {
-            if tx.witness_set().is_valid_for(tx.message()) {
-                Ok(NSSATransaction::Public(tx))
-            } else {
-                Err(TransactionMalformationError::InvalidSignature)
-            }
-        }
-        NSSATransaction::PrivacyPreserving(tx) => {
-            if tx.witness_set().signatures_are_valid_for(tx.message()) {
-                Ok(NSSATransaction::PrivacyPreserving(tx))
-            } else {
-                Err(TransactionMalformationError::InvalidSignature)
-            }
-        }
-        NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
-    }
-}
-
 /// Load signing key from file or generate a new one if it doesn't exist
 fn load_or_create_signing_key(path: &Path) -> Result<Ed25519Key> {
     if path.exists() {
@@ -364,15 +340,18 @@ mod tests {

     use base58::ToBase58;
     use bedrock_client::BackoffConfig;
-    use common::{test_utils::sequencer_sign_key_for_testing, transaction::NSSATransaction};
+    use common::{
+        block::AccountInitialData,
+        test_utils::sequencer_sign_key_for_testing,
+        transaction::{NSSATransaction, transaction_pre_check},
+    };
     use logos_blockchain_core::mantle::ops::channel::ChannelId;
     use mempool::MemPoolHandle;
     use nssa::{AccountId, PrivateKey};

     use crate::{
-        config::{AccountInitialData, BedrockConfig, SequencerConfig},
+        config::{BedrockConfig, SequencerConfig},
         mock::SequencerCoreWithMockClients,
-        transaction_pre_check,
     };

     fn setup_sequencer_config_variable_initial_accounts(
@@ -8,7 +8,8 @@ license = { workspace = true }
 nssa.workspace = true
 common.workspace = true
 mempool.workspace = true
-sequencer_core.workspace = true
+sequencer_core = { workspace = true }
+bedrock_client.workspace = true

 anyhow.workspace = true
 serde_json.workspace = true
@@ -3,7 +3,7 @@ use std::collections::HashMap;
 use actix_web::Error as HttpError;
 use base64::{Engine, engine::general_purpose};
 use common::{
-    block::HashableBlockData,
+    block::{AccountInitialData, HashableBlockData},
     rpc_primitives::{
         errors::RpcError,
         message::{Message, Request},
@@ -20,14 +20,13 @@ use common::{
             SendTxResponse,
         },
     },
-    transaction::NSSATransaction,
+    transaction::{NSSATransaction, transaction_pre_check},
 };
 use itertools::Itertools as _;
 use log::warn;
 use nssa::{self, program::Program};
 use sequencer_core::{
-    block_settlement_client::BlockSettlementClientTrait, config::AccountInitialData,
-    indexer_client::IndexerClientTrait,
+    block_settlement_client::BlockSettlementClientTrait, indexer_client::IndexerClientTrait,
 };
 use serde_json::Value;

@@ -95,8 +94,8 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> JsonHandler<BC, IC>
         let tx = borsh::from_slice::<NSSATransaction>(&send_tx_req.transaction).unwrap();
         let tx_hash = tx.hash();

-        let authenticated_tx = sequencer_core::transaction_pre_check(tx)
-            .inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
+        let authenticated_tx =
+            transaction_pre_check(tx).inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;

         // TODO: Do we need a timeout here? It will be usable if we have too many transactions to
         // process
@@ -327,12 +326,14 @@ mod tests {

     use base58::ToBase58;
     use base64::{Engine, engine::general_purpose};
+    use bedrock_client::BackoffConfig;
     use common::{
-        config::BasicAuth, test_utils::sequencer_sign_key_for_testing, transaction::NSSATransaction,
+        block::AccountInitialData, config::BasicAuth, test_utils::sequencer_sign_key_for_testing,
+        transaction::NSSATransaction,
     };
     use nssa::AccountId;
     use sequencer_core::{
-        config::{AccountInitialData, BackoffConfig, BedrockConfig, SequencerConfig},
+        config::{BedrockConfig, SequencerConfig},
         mock::{MockBlockSettlementClient, MockIndexerClient, SequencerCoreWithMockClients},
     };
     use serde_json::Value;
@@ -1,6 +1,8 @@
-use common::rpc_primitives::errors::{RpcError, RpcParseError};
+use common::{
+    rpc_primitives::errors::{RpcError, RpcParseError},
+    transaction::TransactionMalformationError,
+};
 use log::debug;
-use sequencer_core::TransactionMalformationError;

 pub struct RpcErr(pub RpcError);
@@ -1,5 +1,5 @@
 {
-    "home": ".",
+    "home": "./sequencer_runner",
     "override_rust_log": null,
     "genesis_id": 1,
     "is_genesis_random": true,
@@ -6,8 +6,9 @@ license = { workspace = true }

 [dependencies]
 common.workspace = true
+nssa.workspace = true

 thiserror.workspace = true
 borsh.workspace = true
 rocksdb.workspace = true
-nssa.workspace = true
 tempfile.workspace = true

storage/src/indexer.rs (new file, 1244 lines): diff suppressed because it is too large.
@ -1,601 +1,3 @@
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
use common::block::{BedrockStatus, Block, BlockMeta, MantleMsgId};
|
||||
use error::DbError;
|
||||
use nssa::V02State;
|
||||
use rocksdb::{
|
||||
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
|
||||
};
|
||||
|
||||
pub mod error;
|
||||
|
||||
/// Maximal size of stored blocks in base
|
||||
///
|
||||
/// Used to control db size
|
||||
///
|
||||
/// Currently effectively unbounded.
|
||||
pub const BUFF_SIZE_ROCKSDB: usize = usize::MAX;
|
||||
|
||||
/// Size of stored blocks cache in memory
|
||||
///
|
||||
/// Keeping small to not run out of memory
|
||||
pub const CACHE_SIZE: usize = 1000;
|
||||
|
||||
/// Key base for storing metainformation about id of first block in db
|
||||
pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
|
||||
/// Key base for storing metainformation about id of last current block in db
|
||||
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
|
||||
/// Key base for storing metainformation which describe if first block has been set
|
||||
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
|
||||
/// Key base for storing metainformation about the last finalized block on Bedrock
|
||||
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
|
||||
/// Key base for storing metainformation about the latest block meta
|
||||
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
|
||||
|
||||
/// Key base for storing the NSSA state
|
||||
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
|
||||
|
||||
/// Name of block column family
|
||||
pub const CF_BLOCK_NAME: &str = "cf_block";
|
||||
/// Name of meta column family
|
||||
pub const CF_META_NAME: &str = "cf_meta";
|
||||
/// Name of state column family
|
||||
pub const CF_NSSA_STATE_NAME: &str = "cf_nssa_state";
|
||||
|
||||
pub type DbResult<T> = Result<T, DbError>;
|
||||
|
||||
pub struct RocksDBIO {
|
||||
pub db: DBWithThreadMode<MultiThreaded>,
|
||||
}
|
||||
|
||||
impl RocksDBIO {
|
||||
pub fn open_or_create(
|
||||
path: &Path,
|
||||
start_block: Option<(&Block, MantleMsgId)>,
|
||||
) -> DbResult<Self> {
|
||||
let mut cf_opts = Options::default();
|
||||
cf_opts.set_max_write_buffer_number(16);
|
||||
// ToDo: Add more column families for different data
|
||||
let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
|
||||
let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
|
||||
let cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
|
||||
|
||||
let mut db_opts = Options::default();
|
||||
db_opts.create_missing_column_families(true);
|
||||
db_opts.create_if_missing(true);
|
||||
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
|
||||
&db_opts,
|
||||
path,
|
||||
vec![cfb, cfmeta, cfstate],
|
||||
);
|
||||
|
||||
let dbio = Self {
|
||||
// There is no point in handling this from runner code
|
||||
db: db.unwrap(),
|
||||
};
|
||||
|
||||
let is_start_set = dbio.get_meta_is_first_block_set()?;
|
||||
|
||||
if is_start_set {
|
||||
Ok(dbio)
|
||||
} else if let Some((block, msg_id)) = start_block {
|
||||
let block_id = block.header.block_id;
|
||||
dbio.put_meta_first_block_in_db(block, msg_id)?;
|
||||
dbio.put_meta_is_first_block_set()?;
|
||||
dbio.put_meta_last_block_in_db(block_id)?;
|
||||
dbio.put_meta_last_finalized_block_id(None)?;
|
||||
dbio.put_meta_latest_block_meta(&BlockMeta {
|
||||
id: block.header.block_id,
|
||||
hash: block.header.hash,
|
||||
msg_id,
|
||||
})?;
|
||||
|
||||
Ok(dbio)
|
||||
} else {
|
||||
// Here we are trying to start a DB without a block, one should not do it.
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn destroy(path: &Path) -> DbResult<()> {
|
||||
let mut cf_opts = Options::default();
|
||||
cf_opts.set_max_write_buffer_number(16);
|
||||
// ToDo: Add more column families for different data
|
||||
let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
|
||||
let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
|
||||
let _cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
|
||||
|
||||
let mut db_opts = Options::default();
|
||||
db_opts.create_missing_column_families(true);
|
||||
db_opts.create_if_missing(true);
|
||||
DBWithThreadMode::<MultiThreaded>::destroy(&db_opts, path)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
|
||||
}
|
||||
|
||||
pub fn meta_column(&self) -> Arc<BoundColumnFamily<'_>> {
|
||||
self.db.cf_handle(CF_META_NAME).unwrap()
|
||||
}
|
||||
|
||||
pub fn block_column(&self) -> Arc<BoundColumnFamily<'_>> {
|
||||
self.db.cf_handle(CF_BLOCK_NAME).unwrap()
|
||||
}
|
||||
|
||||
pub fn nssa_state_column(&self) -> Arc<BoundColumnFamily<'_>> {
|
||||
self.db.cf_handle(CF_NSSA_STATE_NAME).unwrap()
|
||||
}
|
||||
|
||||
pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
|
||||
let cf_meta = self.meta_column();
|
||||
let res = self
|
||||
.db
|
||||
.get_cf(
|
||||
&cf_meta,
|
||||
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
|
||||
)
|
||||
})?,
|
||||
)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
|
||||
if let Some(data) = res {
|
||||
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to deserialize first block".to_string()),
|
||||
)
|
||||
})?)
|
||||
} else {
|
||||
Err(DbError::db_interaction_error(
|
||||
"First block not found".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_meta_last_block_in_db(&self) -> DbResult<u64> {
|
||||
let cf_meta = self.meta_column();
|
||||
let res = self
|
||||
.db
|
||||
.get_cf(
|
||||
&cf_meta,
|
||||
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
|
||||
)
|
||||
})?,
|
||||
)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
|
||||
if let Some(data) = res {
|
||||
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to deserialize last block".to_string()),
|
||||
)
|
||||
})?)
|
||||
} else {
|
||||
Err(DbError::db_interaction_error(
|
||||
"Last block not found".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
|
||||
let cf_meta = self.meta_column();
|
||||
let res = self
|
||||
.db
|
||||
.get_cf(
|
||||
&cf_meta,
|
||||
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
|
||||
)
|
||||
})?,
|
||||
)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
|
||||
Ok(res.is_some())
|
||||
}
|
||||
|
||||
    pub fn put_nssa_state_in_db(&self, state: &V02State, batch: &mut WriteBatch) -> DbResult<()> {
        let cf_nssa_state = self.nssa_state_column();
        batch.put_cf(
            &cf_nssa_state,
            borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize DB_NSSA_STATE_KEY".to_string()),
                )
            })?,
            borsh::to_vec(state).map_err(|err| {
                DbError::borsh_cast_message(err, Some("Failed to serialize NSSA state".to_string()))
            })?,
        );

        Ok(())
    }

    pub fn put_meta_first_block_in_db(&self, block: &Block, msg_id: MantleMsgId) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
                    )
                })?,
                borsh::to_vec(&block.header.block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize first block id".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        let mut batch = WriteBatch::default();
        self.put_block(block, msg_id, true, &mut batch)?;
        self.db.write(batch).map_err(|rerr| {
            DbError::rocksdb_cast_message(
                rerr,
                Some("Failed to write first block in db".to_string()),
            )
        })?;

        Ok(())
    }

    pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
                    )
                })?,
                borsh::to_vec(&block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize last block id".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    fn put_meta_last_block_in_db_batch(
        &self,
        block_id: u64,
        batch: &mut WriteBatch,
    ) -> DbResult<()> {
        let cf_meta = self.meta_column();
        batch.put_cf(
            &cf_meta,
            borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
                )
            })?,
            borsh::to_vec(&block_id).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize last block id".to_string()),
                )
            })?,
        );
        Ok(())
    }

    pub fn put_meta_last_finalized_block_id(&self, block_id: Option<u64>) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_string()),
                    )
                })?,
                borsh::to_vec(&block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize last finalized block id".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    pub fn put_meta_is_first_block_set(&self) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
                    )
                })?,
                [1u8; 1],
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    fn put_meta_latest_block_meta(&self, block_meta: &BlockMeta) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
                    )
                })?,
                borsh::to_vec(&block_meta).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize latest block meta".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    fn put_meta_latest_block_meta_batch(
        &self,
        block_meta: &BlockMeta,
        batch: &mut WriteBatch,
    ) -> DbResult<()> {
        let cf_meta = self.meta_column();
        batch.put_cf(
            &cf_meta,
            borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
                )
            })?,
            borsh::to_vec(&block_meta).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize latest block meta".to_string()),
                )
            })?,
        );
        Ok(())
    }

    pub fn latest_block_meta(&self) -> DbResult<BlockMeta> {
        let cf_meta = self.meta_column();
        let res = self
            .db
            .get_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        if let Some(data) = res {
            Ok(borsh::from_slice::<BlockMeta>(&data).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to deserialize latest block meta".to_string()),
                )
            })?)
        } else {
            Err(DbError::db_interaction_error(
                "Latest block meta not found".to_string(),
            ))
        }
    }

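    // Note on the `_batch` variants above (an illustrative summary, not new
    // API): the `pub` putters write through `self.db.put_cf` immediately,
    // while the private `*_batch` versions only stage the same key/value pair
    // into a caller-supplied `WriteBatch`. `put_block` below relies on this to
    // keep the block record and the last-block/latest-meta pointers in one
    // atomic write:
    //
    //     let mut batch = WriteBatch::default();
    //     dbio.put_block(&block, msg_id, false, &mut batch)?; // stages block + meta
    //     dbio.db.write(batch)?;                              // single atomic commit
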
    pub fn put_block(
        &self,
        block: &Block,
        msg_id: MantleMsgId,
        first: bool,
        batch: &mut WriteBatch,
    ) -> DbResult<()> {
        let cf_block = self.block_column();

        if !first {
            let last_curr_block = self.get_meta_last_block_in_db()?;

            if block.header.block_id > last_curr_block {
                self.put_meta_last_block_in_db_batch(block.header.block_id, batch)?;
                self.put_meta_latest_block_meta_batch(
                    &BlockMeta {
                        id: block.header.block_id,
                        hash: block.header.hash,
                        msg_id,
                    },
                    batch,
                )?;
            }
        }

        batch.put_cf(
            &cf_block,
            borsh::to_vec(&block.header.block_id).map_err(|err| {
                DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
            })?,
            borsh::to_vec(block).map_err(|err| {
                DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_string()))
            })?,
        );
        Ok(())
    }

    pub fn get_block(&self, block_id: u64) -> DbResult<Block> {
        let cf_block = self.block_column();
        let res = self
            .db
            .get_cf(
                &cf_block,
                borsh::to_vec(&block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize block id".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        if let Some(data) = res {
            Ok(borsh::from_slice::<Block>(&data).map_err(|serr| {
                DbError::borsh_cast_message(
                    serr,
                    Some("Failed to deserialize block data".to_string()),
                )
            })?)
        } else {
            Err(DbError::db_interaction_error(
                "Block with this id not found".to_string(),
            ))
        }
    }

    pub fn get_nssa_state(&self) -> DbResult<V02State> {
        let cf_nssa_state = self.nssa_state_column();
        let res = self
            .db
            .get_cf(
                &cf_nssa_state,
                borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_NSSA_STATE_KEY".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        if let Some(data) = res {
            Ok(borsh::from_slice::<V02State>(&data).map_err(|serr| {
                DbError::borsh_cast_message(
                    serr,
                    Some("Failed to deserialize NSSA state".to_string()),
                )
            })?)
        } else {
            Err(DbError::db_interaction_error(
                "NSSA state not found".to_string(),
            ))
        }
    }

    pub fn delete_block(&self, block_id: u64) -> DbResult<()> {
        let cf_block = self.block_column();
        let key = borsh::to_vec(&block_id).map_err(|err| {
            DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
        })?;

        if self
            .db
            .get_cf(&cf_block, &key)
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?
            .is_none()
        {
            return Err(DbError::db_interaction_error(
                "Block with this id not found".to_string(),
            ));
        }

        self.db
            .delete_cf(&cf_block, key)
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        Ok(())
    }

    pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
        let mut block = self.get_block(block_id)?;
        block.bedrock_status = BedrockStatus::Finalized;

        let cf_block = self.block_column();
        self.db
            .put_cf(
                &cf_block,
                borsh::to_vec(&block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize block id".to_string()),
                    )
                })?,
                borsh::to_vec(&block).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize block data".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| {
                DbError::rocksdb_cast_message(
                    rerr,
                    Some(format!("Failed to mark block {block_id} as finalized")),
                )
            })?;

        Ok(())
    }

    pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
        let cf_block = self.block_column();
        self.db
            .iterator_cf(&cf_block, rocksdb::IteratorMode::Start)
            .map(|res| {
                let (_key, value) = res.map_err(|rerr| {
                    DbError::rocksdb_cast_message(
                        rerr,
                        Some("Failed to get key value pair".to_string()),
                    )
                })?;

                borsh::from_slice::<Block>(&value).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to deserialize block data".to_string()),
                    )
                })
            })
    }

    pub fn atomic_update(
        &self,
        block: &Block,
        msg_id: MantleMsgId,
        state: &V02State,
    ) -> DbResult<()> {
        let block_id = block.header.block_id;
        let mut batch = WriteBatch::default();
        self.put_block(block, msg_id, false, &mut batch)?;
        self.put_nssa_state_in_db(state, &mut batch)?;
        self.db.write(batch).map_err(|rerr| {
            DbError::rocksdb_cast_message(
                rerr,
                Some(format!("Failed to update db with block {block_id}")),
            )
        })
    }
}

pub mod indexer;
pub mod sequencer;

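For orientation, a minimal usage sketch of the RocksDBIO API above. The function name and the `genesis`, `msg_id`, and `state` parameters are illustrative assumptions, not part of this diff:

use std::path::Path;

fn storage_roundtrip(
    path: &Path,
    genesis: &Block,
    msg_id: MantleMsgId,
    state: &V02State,
) -> DbResult<()> {
    // The first open seeds the meta column (first/last block ids, latest meta).
    let dbio = RocksDBIO::open_or_create(path, Some((genesis, msg_id)))?;

    // The meta getters read those pointers back.
    let first = dbio.get_meta_first_block_in_db()?;
    let last = dbio.get_meta_last_block_in_db()?;
    assert!(first <= last);

    // A block and the NSSA state are committed together in one WriteBatch.
    let block = dbio.get_block(last)?;
    let meta = dbio.latest_block_meta()?;
    dbio.atomic_update(&block, meta.msg_id, state)?;
    Ok(())
}
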
600
storage/src/sequencer.rs
Normal file
@ -0,0 +1,600 @@
use std::{path::Path, sync::Arc};

use common::block::{BedrockStatus, Block, BlockMeta, MantleMsgId};
use nssa::V02State;
use rocksdb::{
    BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
};

use crate::error::DbError;

/// Maximum number of blocks stored in the database
///
/// Used to bound the database size
///
/// Currently effectively unbounded.
pub const BUFF_SIZE_ROCKSDB: usize = usize::MAX;

/// Number of blocks cached in memory
///
/// Kept small to avoid running out of memory
pub const CACHE_SIZE: usize = 1000;

/// Key base for storing meta information about the id of the first block in the db
pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
/// Key base for storing meta information about the id of the last block currently in the db
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
/// Key base for storing meta information describing whether the first block has been set
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
/// Key base for storing meta information about the last finalized block on Bedrock
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
/// Key base for storing meta information about the latest block meta
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";

/// Key base for storing the NSSA state
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";

/// Name of the block column family
pub const CF_BLOCK_NAME: &str = "cf_block";
/// Name of the meta column family
pub const CF_META_NAME: &str = "cf_meta";
/// Name of the state column family
pub const CF_NSSA_STATE_NAME: &str = "cf_nssa_state";

pub type DbResult<T> = Result<T, DbError>;

pub struct RocksDBIO {
    pub db: DBWithThreadMode<MultiThreaded>,
}

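// A small sketch of how the string key bases above become RocksDB keys
// (illustrative only; borsh prefixes a &str with its u32 little-endian length):
//
//     let key: Vec<u8> = borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).unwrap();
//     assert_eq!(
//         &key[..4],
//         &(DB_META_LAST_BLOCK_IN_DB_KEY.len() as u32).to_le_bytes()
//     );
//     assert_eq!(&key[4..], DB_META_LAST_BLOCK_IN_DB_KEY.as_bytes());
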
impl RocksDBIO {
    pub fn open_or_create(
        path: &Path,
        start_block: Option<(&Block, MantleMsgId)>,
    ) -> DbResult<Self> {
        let mut cf_opts = Options::default();
        cf_opts.set_max_write_buffer_number(16);
        // ToDo: Add more column families for different data
        let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
        let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
        let cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());

        let mut db_opts = Options::default();
        db_opts.create_missing_column_families(true);
        db_opts.create_if_missing(true);
        let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
            &db_opts,
            path,
            vec![cfb, cfmeta, cfstate],
        );

        let dbio = Self {
            // There is no point in handling this error from runner code
            db: db.unwrap(),
        };

        let is_start_set = dbio.get_meta_is_first_block_set()?;

        if is_start_set {
            Ok(dbio)
        } else if let Some((block, msg_id)) = start_block {
            let block_id = block.header.block_id;
            dbio.put_meta_first_block_in_db(block, msg_id)?;
            dbio.put_meta_is_first_block_set()?;
            dbio.put_meta_last_block_in_db(block_id)?;
            dbio.put_meta_last_finalized_block_id(None)?;
            dbio.put_meta_latest_block_meta(&BlockMeta {
                id: block.header.block_id,
                hash: block.header.hash,
                msg_id,
            })?;

            Ok(dbio)
        } else {
            // Starting a DB without an initial block is a programming error.
            unreachable!()
        }
    }

    pub fn destroy(path: &Path) -> DbResult<()> {
        let mut cf_opts = Options::default();
        cf_opts.set_max_write_buffer_number(16);
        // ToDo: Add more column families for different data
        let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
        let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
        let _cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());

        let mut db_opts = Options::default();
        db_opts.create_missing_column_families(true);
        db_opts.create_if_missing(true);
        DBWithThreadMode::<MultiThreaded>::destroy(&db_opts, path)
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
    }

    pub fn meta_column(&self) -> Arc<BoundColumnFamily<'_>> {
        self.db.cf_handle(CF_META_NAME).unwrap()
    }

    pub fn block_column(&self) -> Arc<BoundColumnFamily<'_>> {
        self.db.cf_handle(CF_BLOCK_NAME).unwrap()
    }

    pub fn nssa_state_column(&self) -> Arc<BoundColumnFamily<'_>> {
        self.db.cf_handle(CF_NSSA_STATE_NAME).unwrap()
    }

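    // Open semantics in brief (illustrative, matching `open_or_create` above):
    // the very first open must supply a start block; any later open may pass
    // `None`, because the persisted `first_block_set` flag short-circuits the
    // initialization branch.
    //
    //     let db = RocksDBIO::open_or_create(path, Some((&genesis, msg_id)))?; // first run
    //     drop(db);
    //     let db = RocksDBIO::open_or_create(path, None)?; // any later run
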
    pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
        let cf_meta = self.meta_column();
        let res = self
            .db
            .get_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        if let Some(data) = res {
            Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to deserialize first block".to_string()),
                )
            })?)
        } else {
            Err(DbError::db_interaction_error(
                "First block not found".to_string(),
            ))
        }
    }

    pub fn get_meta_last_block_in_db(&self) -> DbResult<u64> {
        let cf_meta = self.meta_column();
        let res = self
            .db
            .get_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        if let Some(data) = res {
            Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to deserialize last block".to_string()),
                )
            })?)
        } else {
            Err(DbError::db_interaction_error(
                "Last block not found".to_string(),
            ))
        }
    }

    pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
        let cf_meta = self.meta_column();
        let res = self
            .db
            .get_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        Ok(res.is_some())
    }

    pub fn put_nssa_state_in_db(&self, state: &V02State, batch: &mut WriteBatch) -> DbResult<()> {
        let cf_nssa_state = self.nssa_state_column();
        batch.put_cf(
            &cf_nssa_state,
            borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize DB_NSSA_STATE_KEY".to_string()),
                )
            })?,
            borsh::to_vec(state).map_err(|err| {
                DbError::borsh_cast_message(err, Some("Failed to serialize NSSA state".to_string()))
            })?,
        );

        Ok(())
    }

    pub fn put_meta_first_block_in_db(&self, block: &Block, msg_id: MantleMsgId) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
                    )
                })?,
                borsh::to_vec(&block.header.block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize first block id".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        let mut batch = WriteBatch::default();
        self.put_block(block, msg_id, true, &mut batch)?;
        self.db.write(batch).map_err(|rerr| {
            DbError::rocksdb_cast_message(
                rerr,
                Some("Failed to write first block in db".to_string()),
            )
        })?;

        Ok(())
    }

    pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
                    )
                })?,
                borsh::to_vec(&block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize last block id".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    fn put_meta_last_block_in_db_batch(
        &self,
        block_id: u64,
        batch: &mut WriteBatch,
    ) -> DbResult<()> {
        let cf_meta = self.meta_column();
        batch.put_cf(
            &cf_meta,
            borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
                )
            })?,
            borsh::to_vec(&block_id).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize last block id".to_string()),
                )
            })?,
        );
        Ok(())
    }

    pub fn put_meta_last_finalized_block_id(&self, block_id: Option<u64>) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_string()),
                    )
                })?,
                borsh::to_vec(&block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize last finalized block id".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    pub fn put_meta_is_first_block_set(&self) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
                    )
                })?,
                [1u8; 1],
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    fn put_meta_latest_block_meta(&self, block_meta: &BlockMeta) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
                    )
                })?,
                borsh::to_vec(&block_meta).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize latest block meta".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    fn put_meta_latest_block_meta_batch(
        &self,
        block_meta: &BlockMeta,
        batch: &mut WriteBatch,
    ) -> DbResult<()> {
        let cf_meta = self.meta_column();
        batch.put_cf(
            &cf_meta,
            borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
                )
            })?,
            borsh::to_vec(&block_meta).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to serialize latest block meta".to_string()),
                )
            })?,
        );
        Ok(())
    }

    pub fn latest_block_meta(&self) -> DbResult<BlockMeta> {
        let cf_meta = self.meta_column();
        let res = self
            .db
            .get_cf(
                &cf_meta,
                borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        if let Some(data) = res {
            Ok(borsh::from_slice::<BlockMeta>(&data).map_err(|err| {
                DbError::borsh_cast_message(
                    err,
                    Some("Failed to deserialize latest block meta".to_string()),
                )
            })?)
        } else {
            Err(DbError::db_interaction_error(
                "Latest block meta not found".to_string(),
            ))
        }
    }

    pub fn put_block(
        &self,
        block: &Block,
        msg_id: MantleMsgId,
        first: bool,
        batch: &mut WriteBatch,
    ) -> DbResult<()> {
        let cf_block = self.block_column();

        if !first {
            let last_curr_block = self.get_meta_last_block_in_db()?;

            if block.header.block_id > last_curr_block {
                self.put_meta_last_block_in_db_batch(block.header.block_id, batch)?;
                self.put_meta_latest_block_meta_batch(
                    &BlockMeta {
                        id: block.header.block_id,
                        hash: block.header.hash,
                        msg_id,
                    },
                    batch,
                )?;
            }
        }

        batch.put_cf(
            &cf_block,
            borsh::to_vec(&block.header.block_id).map_err(|err| {
                DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
            })?,
            borsh::to_vec(block).map_err(|err| {
                DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_string()))
            })?,
        );
        Ok(())
    }

    pub fn get_block(&self, block_id: u64) -> DbResult<Block> {
        let cf_block = self.block_column();
        let res = self
            .db
            .get_cf(
                &cf_block,
                borsh::to_vec(&block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize block id".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        if let Some(data) = res {
            Ok(borsh::from_slice::<Block>(&data).map_err(|serr| {
                DbError::borsh_cast_message(
                    serr,
                    Some("Failed to deserialize block data".to_string()),
                )
            })?)
        } else {
            Err(DbError::db_interaction_error(
                "Block with this id not found".to_string(),
            ))
        }
    }

    pub fn get_nssa_state(&self) -> DbResult<V02State> {
        let cf_nssa_state = self.nssa_state_column();
        let res = self
            .db
            .get_cf(
                &cf_nssa_state,
                borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize DB_NSSA_STATE_KEY".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        if let Some(data) = res {
            Ok(borsh::from_slice::<V02State>(&data).map_err(|serr| {
                DbError::borsh_cast_message(
                    serr,
                    Some("Failed to deserialize NSSA state".to_string()),
                )
            })?)
        } else {
            Err(DbError::db_interaction_error(
                "NSSA state not found".to_string(),
            ))
        }
    }

    pub fn delete_block(&self, block_id: u64) -> DbResult<()> {
        let cf_block = self.block_column();
        let key = borsh::to_vec(&block_id).map_err(|err| {
            DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
        })?;

        if self
            .db
            .get_cf(&cf_block, &key)
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?
            .is_none()
        {
            return Err(DbError::db_interaction_error(
                "Block with this id not found".to_string(),
            ));
        }

        self.db
            .delete_cf(&cf_block, key)
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

        Ok(())
    }

    pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
        let mut block = self.get_block(block_id)?;
        block.bedrock_status = BedrockStatus::Finalized;

        let cf_block = self.block_column();
        self.db
            .put_cf(
                &cf_block,
                borsh::to_vec(&block_id).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize block id".to_string()),
                    )
                })?,
                borsh::to_vec(&block).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to serialize block data".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| {
                DbError::rocksdb_cast_message(
                    rerr,
                    Some(format!("Failed to mark block {block_id} as finalized")),
                )
            })?;

        Ok(())
    }

    pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
        let cf_block = self.block_column();
        self.db
            .iterator_cf(&cf_block, rocksdb::IteratorMode::Start)
            .map(|res| {
                let (_key, value) = res.map_err(|rerr| {
                    DbError::rocksdb_cast_message(
                        rerr,
                        Some("Failed to get key value pair".to_string()),
                    )
                })?;

                borsh::from_slice::<Block>(&value).map_err(|err| {
                    DbError::borsh_cast_message(
                        err,
                        Some("Failed to deserialize block data".to_string()),
                    )
                })
            })
    }

    pub fn atomic_update(
        &self,
        block: &Block,
        msg_id: MantleMsgId,
        state: &V02State,
    ) -> DbResult<()> {
        let block_id = block.header.block_id;
        let mut batch = WriteBatch::default();
        self.put_block(block, msg_id, false, &mut batch)?;
        self.put_nssa_state_in_db(state, &mut batch)?;
        self.db.write(batch).map_err(|rerr| {
            DbError::rocksdb_cast_message(
                rerr,
                Some(format!("Failed to update db with block {block_id}")),
            )
        })
    }
}
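Finally, a hedged sketch of the finalization flow built on the methods above, assuming a `db: RocksDBIO` that already contains the block with id `block_id`; the function name is illustrative, not part of this diff:

fn finalize_and_check(db: &RocksDBIO, block_id: u64) -> DbResult<()> {
    // Rewrites the stored block with bedrock_status set to Finalized.
    db.mark_block_as_finalized(block_id)?;

    // Read the block back and confirm the status change.
    let block = db.get_block(block_id)?;
    assert!(matches!(block.bedrock_status, BedrockStatus::Finalized));

    // Full scans go through the block column family iterator,
    // which yields DbResult<Block> items.
    for block in db.get_all_blocks() {
        let _id = block?.header.block_id;
    }
    Ok(())
}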