feat: full integration try 1

This commit is contained in:
Pravdyvy 2026-01-30 12:51:18 +02:00
parent 17a1be5b0c
commit 0efc522837
16 changed files with 325 additions and 122 deletions

1
Cargo.lock generated
View File

@ -2791,6 +2791,7 @@ dependencies = [
"log",
"logos-blockchain-core",
"nssa",
"nssa_core",
"serde",
"serde_json",
"storage",

View File

@ -1,4 +1,5 @@
use borsh::{BorshDeserialize, BorshSerialize};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256, digest::FixedOutput};
use crate::transaction::EncodedTransaction;
@ -102,6 +103,21 @@ impl From<Block> for HashableBlockData {
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct for account serialization
pub struct AccountInitialData {
/// Hex encoded account id
pub account_id: String,
pub balance: u128,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct to initialize commitments
pub struct CommitmentsInitialData {
pub npk: nssa_core::NullifierPublicKey,
pub account: nssa_core::account::Account,
}
#[cfg(test)]
mod tests {
use crate::{block::HashableBlockData, test_utils};

View File

@ -39,6 +39,9 @@ pub struct GetBlockRangeDataRequest {
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdRequest {}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisBlockRequest {}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastBlockRequest {}
@ -93,6 +96,7 @@ parse_request!(GetProofForCommitmentRequest);
parse_request!(GetAccountRequest);
parse_request!(GetProgramIdsRequest);
parse_request!(PostIndexerMessageRequest);
parse_request!(GetGenesisBlockRequest);
#[derive(Serialize, Deserialize, Debug)]
pub struct HelloResponse {
@ -181,6 +185,11 @@ pub struct GetGenesisIdResponse {
pub genesis_id: u64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisBlockResponse {
pub genesis_block_borsh_ser: Vec<u8>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastBlockResponse {
pub last_block: u64,

View File

@ -13,17 +13,18 @@ use super::rpc_primitives::requests::{
GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest,
};
use crate::{
block::Block,
error::{SequencerClientError, SequencerRpcError},
rpc_primitives::{
self,
requests::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest,
GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse,
GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, PostIndexerMessageRequest, PostIndexerMessageResponse,
SendTxRequest, SendTxResponse,
GetGenesisBlockRequest, GetGenesisBlockResponse, GetInitialTestnetAccountsResponse,
GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, PostIndexerMessageRequest,
PostIndexerMessageResponse, SendTxRequest, SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction},
@ -319,6 +320,24 @@ impl SequencerClient {
Ok(resp_deser)
}
/// Get genesis block from sequencer
///
/// ToDo: Error handling
pub async fn get_genesis_block(&self) -> Result<Block, SequencerClientError> {
let genesis_req = GetGenesisBlockRequest {};
let req = serde_json::to_value(genesis_req).unwrap();
let resp = self
.call_method_with_payload("get_genesis_block", req)
.await
.unwrap();
let resp_deser = serde_json::from_value::<GetGenesisBlockResponse>(resp).unwrap();
Ok(borsh::from_slice(&resp_deser.genesis_block_borsh_ser).unwrap())
}
/// Get initial testnet accounts from sequencer
pub async fn get_initial_testnet_accounts(
&self,

View File

@ -7,6 +7,7 @@ edition = "2024"
common.workspace = true
bedrock_client.workspace = true
nssa.workspace = true
nssa_core.workspace = true
storage.workspace = true
anyhow.workspace = true
@ -18,3 +19,7 @@ futures.workspace = true
url.workspace = true
logos-blockchain-core.workspace = true
serde_json.workspace = true
[features]
default = []
testnet = []

View File

@ -1,7 +1,10 @@
use std::path::Path;
use anyhow::Result;
use common::{block::Block, transaction::{NSSATransaction, execute_check_transaction_on_state}};
use common::{
block::Block,
transaction::{NSSATransaction, execute_check_transaction_on_state, transaction_pre_check},
};
use nssa::V02State;
use storage::indexer::RocksDBIO;
@ -20,9 +23,7 @@ impl IndexerStore {
) -> Result<Self> {
let dbio = RocksDBIO::open_or_create(location, start_data)?;
Ok(Self {
dbio,
})
Ok(Self { dbio })
}
/// Reopening existing database
@ -35,66 +36,36 @@ impl IndexerStore {
}
pub fn genesis_id(&self) -> u64 {
self.dbio.get_meta_first_block_in_db().expect("Must be set at the DB startup")
self.dbio
.get_meta_first_block_in_db()
.expect("Must be set at the DB startup")
}
pub fn last_block(&self) -> u64 {
self.dbio.get_meta_last_block_in_db().expect("Must be set at the DB startup")
self.dbio
.get_meta_last_block_in_db()
.expect("Must be set at the DB startup")
}
pub fn get_state_at_block(&self, block_id: u64) -> Result<V02State> {
Ok(self.dbio.calculate_state_for_id(block_id)?)
}
pub fn final_state(&self) -> Result<V02State> {
Ok(self.dbio.final_state()?)
}
pub fn put_block(&self, block: Block) -> Result<()> {
let mut final_state = self.dbio.final_state()?;
for encoded_transaction in &block.body.transactions {
let transaction = NSSATransaction::try_from(encoded_transaction)?;
execute_check_transaction_on_state(&mut final_state, transaction)?;
execute_check_transaction_on_state(
&mut final_state,
transaction_pre_check(transaction)?,
)?;
}
Ok(self.dbio.put_block(block)?)
}
}
// #[cfg(test)]
// mod tests {
// use common::{block::HashableBlockData, test_utils::sequencer_sign_key_for_testing};
// use tempfile::tempdir;
// use super::*;
// #[test]
// fn test_get_transaction_by_hash() {
// let temp_dir = tempdir().unwrap();
// let path = temp_dir.path();
// let signing_key = sequencer_sign_key_for_testing();
// let genesis_block_hashable_data = HashableBlockData {
// block_id: 0,
// prev_block_hash: [0; 32],
// timestamp: 0,
// transactions: vec![],
// };
// let genesis_block = genesis_block_hashable_data.into_pending_block(&signing_key, [0; 32]);
// // Start an empty node store
// let mut node_store =
// SequencerStore::open_db_with_genesis(path, Some(genesis_block), signing_key).unwrap();
// let tx = common::test_utils::produce_dummy_empty_transaction();
// let block = common::test_utils::produce_dummy_block(1, None, vec![tx.clone()]);
// // Try retrieve a tx that's not in the chain yet.
// let retrieved_tx = node_store.get_transaction_by_hash(tx.hash());
// assert_eq!(None, retrieved_tx);
// // Add the block with the transaction
// let dummy_state = V02State::new_with_genesis_accounts(&[], &[]);
// node_store.update(block, &dummy_state).unwrap();
// // Try again
// let retrieved_tx = node_store.get_transaction_by_hash(tx.hash());
// assert_eq!(Some(tx), retrieved_tx);
// }
// }

View File

@ -1,8 +1,15 @@
use std::{fs::File, io::BufReader, path::Path};
use std::{
fs::File,
io::BufReader,
path::{Path, PathBuf},
};
use anyhow::{Context, Result};
use bedrock_client::BackoffConfig;
use common::sequencer_client::BasicAuth;
use common::{
block::{AccountInitialData, CommitmentsInitialData},
sequencer_client::BasicAuth,
};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;
@ -17,6 +24,12 @@ pub struct ClientConfig {
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Note: For individual RPC requests we use Fibonacci backoff retry strategy
pub struct IndexerConfig {
/// Home dir of sequencer storage
pub home: PathBuf,
/// List of initial accounts data
pub initial_accounts: Vec<AccountInitialData>,
/// List of initial commitments
pub initial_commitments: Vec<CommitmentsInitialData>,
pub resubscribe_interval_millis: u64,
pub backoff: BackoffConfig,
pub bedrock_client_config: ClientConfig,

View File

@ -1,9 +1,9 @@
use std::sync::Arc;
use anyhow::Result;
use bedrock_client::BedrockClient;
// ToDo: Remove after testnet
use common::PINATA_BASE58;
use common::{
block::HashableBlockData, communication::indexer::Message,
block::Block, communication::indexer::Message,
rpc_primitives::requests::PostIndexerMessageResponse, sequencer_client::SequencerClient,
};
use futures::StreamExt;
@ -12,37 +12,65 @@ use logos_blockchain_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::RwLock;
use crate::{config::IndexerConfig, state::IndexerState};
use crate::{block_store::IndexerStore, config::IndexerConfig};
pub mod block_store;
pub mod config;
pub mod state;
pub mod block_store;
pub struct IndexerCore {
pub bedrock_client: BedrockClient,
pub sequencer_client: SequencerClient,
pub config: IndexerConfig,
pub state: IndexerState,
pub store: IndexerStore,
}
impl IndexerCore {
pub fn new(config: IndexerConfig) -> Result<Self> {
pub async fn new(config: IndexerConfig) -> Result<Self> {
let sequencer_client = SequencerClient::new_with_auth(
config.sequencer_client_config.addr.clone(),
config.sequencer_client_config.auth.clone(),
)?;
let start_block = sequencer_client.get_genesis_block().await?;
let initial_commitments: Vec<nssa_core::Commitment> = config
.initial_commitments
.iter()
.map(|init_comm_data| {
let npk = &init_comm_data.npk;
let mut acc = init_comm_data.account.clone();
acc.program_owner = nssa::program::Program::authenticated_transfer_program().id();
nssa_core::Commitment::new(npk, &acc)
})
.collect();
let init_accs: Vec<(nssa::AccountId, u128)> = config
.initial_accounts
.iter()
.map(|acc_data| (acc_data.account_id.parse().unwrap(), acc_data.balance))
.collect();
let mut state = nssa::V02State::new_with_genesis_accounts(&init_accs, &initial_commitments);
// ToDo: Remove after testnet
state.add_pinata_program(PINATA_BASE58.parse().unwrap());
let home = config.home.clone();
Ok(Self {
bedrock_client: BedrockClient::new(
config.bedrock_client_config.auth.clone().map(Into::into),
config.bedrock_client_config.addr.clone(),
)?,
sequencer_client: SequencerClient::new_with_auth(
config.sequencer_client_config.addr.clone(),
config.sequencer_client_config.auth.clone(),
)?,
sequencer_client,
config,
// No state setup for now, future task.
state: IndexerState {
latest_seen_block: Arc::new(RwLock::new(0)),
},
// ToDo: Implement restarts
store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?,
})
}
@ -70,18 +98,13 @@ impl IndexerCore {
);
for l2_block in l2_blocks_parsed {
let l2_block_height = l2_block.header.block_id;
// State modification, will be updated in future
{
let mut guard = self.state.latest_seen_block.write().await;
if l2_block.block_id > *guard {
*guard = l2_block.block_id;
}
}
self.store.put_block(l2_block)?;
// Sending data into sequencer, may need to be expanded.
let message = Message::L2BlockFinalized {
l2_block_height: l2_block.block_id,
};
let message = Message::L2BlockFinalized { l2_block_height };
let status = self.send_message_to_sequencer(message.clone()).await?;
@ -109,7 +132,7 @@ impl IndexerCore {
fn parse_blocks(
block_txs: impl Iterator<Item = SignedMantleTx>,
decoded_channel_id: &ChannelId,
) -> impl Iterator<Item = HashableBlockData> {
) -> impl Iterator<Item = Block> {
block_txs.flat_map(|tx| {
tx.mantle_tx.ops.into_iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
@ -117,7 +140,7 @@ fn parse_blocks(
inscription,
..
}) if channel_id == *decoded_channel_id => {
borsh::from_slice::<HashableBlockData>(&inscription).ok()
borsh::from_slice::<Block>(&inscription).ok()
}
_ => None,
})

View File

@ -1,4 +1,119 @@
{
"home": "",
"initial_accounts": [
{
"account_id": "BLgCRDXYdQPMMWVHYRFGQZbgeHx9frkipa8GtpG2Syqy",
"balance": 10000
},
{
"account_id": "Gj1mJy5W7J5pfmLRujmQaLfLMWidNxQ6uwnhb666ZwHw",
"balance": 20000
}
],
"initial_commitments": [
{
"npk": [
63,
202,
178,
231,
183,
82,
237,
212,
216,
221,
215,
255,
153,
101,
177,
161,
254,
210,
128,
122,
54,
190,
230,
151,
183,
64,
225,
229,
113,
1,
228,
97
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 10000,
"data": [],
"nonce": 0
}
},
{
"npk": [
192,
251,
166,
243,
167,
236,
84,
249,
35,
136,
130,
172,
219,
225,
161,
139,
229,
89,
243,
125,
194,
213,
209,
30,
23,
174,
100,
244,
124,
74,
140,
47
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 20000,
"data": [],
"nonce": 0
}
}
],
"bedrock_client_config": {
"addr": "http://127.0.0.1:8080",
"auth": {

View File

@ -127,7 +127,7 @@ impl TestContext {
indexer_config.sequencer_client_config.addr =
Url::parse(&sequencer_addr).context("Failed to parse sequencer addr")?;
let indexer_core = IndexerCore::new(indexer_config)?;
let indexer_core = IndexerCore::new(indexer_config).await?;
let indexer_loop_handle = Some(tokio::spawn(async move {
indexer_core.subscribe_parse_block_stream().await

View File

@ -1,6 +1,7 @@
use std::time::{Duration, Instant};
use anyhow::Result;
use common::block::{AccountInitialData, CommitmentsInitialData};
use integration_tests::TestContext;
use key_protocol::key_management::ephemeral_key_holder::EphemeralKeyHolder;
use log::info;
@ -15,7 +16,7 @@ use nssa_core::{
account::{AccountWithMetadata, data::Data},
encryption::IncomingViewingPublicKey,
};
use sequencer_core::config::{AccountInitialData, CommitmentsInitialData, SequencerConfig};
use sequencer_core::config::SequencerConfig;
use tokio::test;
// TODO: Make a proper benchmark instead of an ad-hoc test

View File

@ -5,25 +5,13 @@ use std::{
};
use anyhow::Result;
use common::sequencer_client::BasicAuth;
use common::{
block::{AccountInitialData, CommitmentsInitialData},
sequencer_client::BasicAuth,
};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct for account serialization
pub struct AccountInitialData {
/// Hex encoded account id
pub account_id: String,
pub balance: u128,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct to initialize commitments
pub struct CommitmentsInitialData {
pub npk: nssa_core::NullifierPublicKey,
pub account: nssa_core::account::Account,
}
// TODO: Provide default values
#[derive(Clone, Serialize, Deserialize)]
pub struct SequencerConfig {

View File

@ -257,11 +257,13 @@ mod tests {
use std::pin::pin;
use base58::{FromBase58, ToBase58};
use common::{test_utils::sequencer_sign_key_for_testing, transaction::transaction_pre_check};
use common::{
block::AccountInitialData, test_utils::sequencer_sign_key_for_testing,
transaction::transaction_pre_check,
};
use nssa::PrivateKey;
use super::*;
use crate::config::AccountInitialData;
fn parse_unwrap_tx_body_into_nssa_tx(tx_body: EncodedTransaction) -> NSSATransaction {
NSSATransaction::try_from(&tx_body)

View File

@ -5,7 +5,7 @@ use base58::FromBase58;
use base64::{Engine, engine::general_purpose};
use common::{
HashType,
block::HashableBlockData,
block::{AccountInitialData, HashableBlockData},
rpc_primitives::{
errors::RpcError,
message::{Message, Request},
@ -14,20 +14,21 @@ use common::{
GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest,
GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse,
GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, HelloRequest, HelloResponse, PostIndexerMessageRequest,
PostIndexerMessageResponse, SendTxRequest, SendTxResponse,
GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse,
GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest,
GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse,
PostIndexerMessageRequest, PostIndexerMessageResponse, SendTxRequest, SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction, TransactionMalformationError, transaction_pre_check},
transaction::{
EncodedTransaction, NSSATransaction, TransactionMalformationError, transaction_pre_check,
},
};
use itertools::Itertools as _;
use log::warn;
use nssa::{self, program::Program};
use sequencer_core::config::AccountInitialData;
use serde_json::Value;
use super::{JsonHandler, respond, types::err_rpc::RpcErr};
@ -37,6 +38,7 @@ pub const SEND_TX: &str = "send_tx";
pub const GET_BLOCK: &str = "get_block";
pub const GET_BLOCK_RANGE: &str = "get_block_range";
pub const GET_GENESIS: &str = "get_genesis";
pub const GET_GENESIS_BLOCK: &str = "get_genesis_block";
pub const GET_LAST_BLOCK: &str = "get_last_block";
pub const GET_ACCOUNT_BALANCE: &str = "get_account_balance";
pub const GET_TRANSACTION_BY_HASH: &str = "get_transaction_by_hash";
@ -157,6 +159,25 @@ impl JsonHandler {
respond(response)
}
async fn process_get_genesis_block(&self, request: Request) -> Result<Value, RpcErr> {
let _get_genesis_req = GetGenesisBlockRequest::parse(Some(request.params))?;
let genesis_block = {
let state = self.sequencer_state.lock().await;
let gen_id = state.block_store().genesis_id();
state.block_store().get_block_at_id(gen_id)?
};
let response = GetGenesisBlockResponse {
genesis_block_borsh_ser: borsh::to_vec(&genesis_block)
.expect("Block must serialize correctly"),
};
respond(response)
}
async fn process_get_last_block(&self, request: Request) -> Result<Value, RpcErr> {
let _get_last_block_req = GetLastBlockRequest::parse(Some(request.params))?;
@ -334,6 +355,7 @@ impl JsonHandler {
GET_BLOCK => self.process_get_block_data(request).await,
GET_BLOCK_RANGE => self.process_get_block_range_data(request).await,
GET_GENESIS => self.process_get_genesis(request).await,
GET_GENESIS_BLOCK => self.process_get_genesis_block(request).await,
GET_LAST_BLOCK => self.process_get_last_block(request).await,
GET_INITIAL_TESTNET_ACCOUNTS => self.get_initial_testnet_accounts(request).await,
GET_ACCOUNT_BALANCE => self.process_get_account_balance(request).await,
@ -355,12 +377,12 @@ mod tests {
use base58::ToBase58;
use base64::{Engine, engine::general_purpose};
use common::{
sequencer_client::BasicAuth, test_utils::sequencer_sign_key_for_testing,
transaction::EncodedTransaction,
block::AccountInitialData, sequencer_client::BasicAuth,
test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction,
};
use sequencer_core::{
SequencerCore,
config::{AccountInitialData, BedrockConfig, SequencerConfig},
config::{BedrockConfig, SequencerConfig},
};
use serde_json::Value;
use tempfile::tempdir;

View File

@ -1,4 +1,7 @@
use common::{rpc_primitives::errors::{RpcError, RpcParseError}, transaction::TransactionMalformationError};
use common::{
rpc_primitives::errors::{RpcError, RpcParseError},
transaction::TransactionMalformationError,
};
use log::debug;
pub struct RpcErr(pub RpcError);

View File

@ -2,7 +2,7 @@ use std::{ops::Div, path::Path, sync::Arc};
use common::{
block::Block,
transaction::{NSSATransaction, execute_check_transaction_on_state},
transaction::{NSSATransaction, execute_check_transaction_on_state, transaction_pre_check},
};
use nssa::V02State;
use rocksdb::{
@ -53,10 +53,7 @@ pub struct RocksDBIO {
}
impl RocksDBIO {
pub fn open_or_create(
path: &Path,
start_data: Option<(Block, V02State)>,
) -> DbResult<Self> {
pub fn open_or_create(path: &Path, start_data: Option<(Block, V02State)>) -> DbResult<Self> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
@ -449,9 +446,27 @@ impl RocksDBIO {
let block = self.get_block(id)?;
for encoded_transaction in block.body.transactions {
let transaction = NSSATransaction::try_from(&encoded_transaction).unwrap();
let transaction =
NSSATransaction::try_from(&encoded_transaction).map_err(|err| {
DbError::db_interaction_error(format!(
"failed to decode transaction in block {} with err {err:?}",
block.header.block_id
))
})?;
execute_check_transaction_on_state(&mut breakpoint, transaction).unwrap();
execute_check_transaction_on_state(
&mut breakpoint,
transaction_pre_check(transaction).map_err(|err| {
DbError::db_interaction_error(format!(
"transaction pre check failed with err {err:?}"
))
})?,
)
.map_err(|err| {
DbError::db_interaction_error(format!(
"transaction execution failed with err {err:?}"
))
})?;
}
}