Merge pull request #282 from logos-blockchain/Pravdyvy/block-parsing-validation

Indexer block parsing
This commit is contained in:
Pravdyvy 2026-01-29 14:37:28 +02:00 committed by GitHub
commit d9839ea5a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
50 changed files with 932 additions and 310 deletions

473
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -23,6 +23,7 @@ members = [
"examples/program_deployment/methods", "examples/program_deployment/methods",
"examples/program_deployment/methods/guest", "examples/program_deployment/methods/guest",
"bedrock_client", "bedrock_client",
"indexer_core",
] ]
[workspace.dependencies] [workspace.dependencies]
@ -41,6 +42,7 @@ indexer_service_rpc = { path = "indexer_service/rpc" }
wallet = { path = "wallet" } wallet = { path = "wallet" }
test_program_methods = { path = "test_program_methods" } test_program_methods = { path = "test_program_methods" }
bedrock_client = { path = "bedrock_client" } bedrock_client = { path = "bedrock_client" }
indexer_core = { path = "indexer_core" }
tokio = { version = "1.28.2", features = [ tokio = { version = "1.28.2", features = [
"net", "net",
@ -85,12 +87,14 @@ chrono = "0.4.41"
borsh = "1.5.7" borsh = "1.5.7"
base58 = "0.2.0" base58 = "0.2.0"
itertools = "0.14.0" itertools = "0.14.0"
url = "2.5.4" url = { version = "2.5.4", features = ["serde"] }
tokio-retry = "0.3.0"
schemars = "1.2.0" schemars = "1.2.0"
logos-blockchain-common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
rocksdb = { version = "0.24.0", default-features = false, features = [ rocksdb = { version = "0.24.0", default-features = false, features = [
"snappy", "snappy",

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -6,5 +6,10 @@ edition = "2024"
[dependencies] [dependencies]
reqwest.workspace = true reqwest.workspace = true
anyhow.workspace = true anyhow.workspace = true
tokio-retry.workspace = true
futures.workspace = true
log.workspace = true
serde.workspace = true
logos-blockchain-common-http-client.workspace = true logos-blockchain-common-http-client.workspace = true
logos-blockchain-core.workspace = true logos-blockchain-core.workspace = true
logos-blockchain-chain-broadcast-service.workspace = true

View File

@ -1,7 +1,19 @@
use anyhow::Result; use anyhow::Result;
use futures::{Stream, TryFutureExt};
use log::warn;
pub use logos_blockchain_chain_broadcast_service::BlockInfo;
pub use logos_blockchain_common_http_client::{BasicAuthCredentials, CommonHttpClient, Error}; pub use logos_blockchain_common_http_client::{BasicAuthCredentials, CommonHttpClient, Error};
use logos_blockchain_core::mantle::SignedMantleTx; pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx};
use reqwest::{Client, Url}; use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use tokio_retry::Retry;
/// Fibonacci backoff retry strategy configuration.
///
/// Consumed by [`BedrockClient::get_block_by_id`] to build a
/// `tokio_retry::strategy::FibonacciBackoff`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackoffConfig {
    /// Initial delay of the backoff sequence, in milliseconds.
    pub start_delay_millis: u64,
    /// Maximum number of retry attempts before the last error is returned.
    pub max_retries: usize,
}
// Simple wrapper // Simple wrapper
// maybe extend in the future for our purposes // maybe extend in the future for our purposes
@ -29,4 +41,25 @@ impl BedrockClient {
.post_transaction(self.node_url.clone(), tx) .post_transaction(self.node_url.clone(), tx)
.await .await
} }
/// Subscribes to the node's stream of `BlockInfo` items ("lib" stream —
/// presumably last-irreversible-block notifications; confirm against the
/// upstream `CommonHttpClient` docs).
///
/// # Errors
/// Returns the underlying HTTP client [`Error`] when the subscription fails.
pub async fn get_lib_stream(&self) -> Result<impl Stream<Item = BlockInfo>, Error> {
    self.http_client.get_lib_stream(self.node_url.clone()).await
}
/// Fetches a block by its `header_id`, retrying failures with a Fibonacci
/// backoff: delays start at `backoff.start_delay_millis` and at most
/// `backoff.max_retries` attempts are made before the last error is
/// returned. Every failed attempt is logged at `warn` level.
///
/// `Ok(None)` presumably means the node does not know this block —
/// TODO(review): confirm against the `CommonHttpClient` contract.
pub async fn get_block_by_id(
    &self,
    header_id: HeaderId,
    backoff: &BackoffConfig,
) -> Result<Option<Block<SignedMantleTx>>, Error> {
    let strategy =
        tokio_retry::strategy::FibonacciBackoff::from_millis(backoff.start_delay_millis)
            .take(backoff.max_retries);
    Retry::spawn(strategy, || {
        self.http_client
            .get_block_by_id(self.node_url.clone(), header_id)
            .inspect_err(|err| warn!("Block fetching failed with err: {err:#?}"))
    })
    .await
}
} }

View File

@ -17,3 +17,5 @@ log.workspace = true
hex.workspace = true hex.workspace = true
borsh.workspace = true borsh.workspace = true
base64.workspace = true base64.workspace = true
url.workspace = true
logos-blockchain-common-http-client.workspace = true

View File

@ -78,6 +78,10 @@ impl HashableBlockData {
bedrock_status: BedrockStatus::Pending, bedrock_status: BedrockStatus::Pending,
} }
} }
/// Computes the block hash by running `OwnHasher` over the borsh
/// serialization of the whole struct.
///
/// # Panics
/// Panics if borsh serialization fails. NOTE(review): consider
/// `expect("...")` with the invariant stated, or returning a `Result`.
pub fn block_hash(&self) -> BlockHash {
    OwnHasher::hash(&borsh::to_vec(&self).unwrap())
}
} }
impl From<Block> for HashableBlockData { impl From<Block> for HashableBlockData {

View File

@ -0,0 +1,6 @@
use serde::{Deserialize, Serialize};

/// Messages the indexer posts to the sequencer's `post_indexer_message`
/// RPC endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
    /// The L2 block at `l2_block_height` was observed inside a finalized
    /// L1 block.
    L2BlockFinalized { l2_block_height: u64 },
}

View File

@ -0,0 +1 @@
pub mod indexer;

View File

@ -1,4 +1,5 @@
pub mod block; pub mod block;
pub mod communication;
pub mod error; pub mod error;
pub mod rpc_primitives; pub mod rpc_primitives;
pub mod sequencer_client; pub mod sequencer_client;

View File

@ -73,6 +73,11 @@ pub struct GetProofForCommitmentRequest {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct GetProgramIdsRequest {} pub struct GetProgramIdsRequest {}
/// RPC request wrapper for the `post_indexer_message` method.
#[derive(Serialize, Deserialize, Debug)]
pub struct PostIndexerMessageRequest {
    /// The indexer event being delivered to the sequencer.
    pub message: crate::communication::indexer::Message,
}
parse_request!(HelloRequest); parse_request!(HelloRequest);
parse_request!(RegisterAccountRequest); parse_request!(RegisterAccountRequest);
parse_request!(SendTxRequest); parse_request!(SendTxRequest);
@ -87,6 +92,7 @@ parse_request!(GetAccountsNoncesRequest);
parse_request!(GetProofForCommitmentRequest); parse_request!(GetProofForCommitmentRequest);
parse_request!(GetAccountRequest); parse_request!(GetAccountRequest);
parse_request!(GetProgramIdsRequest); parse_request!(GetProgramIdsRequest);
parse_request!(PostIndexerMessageRequest);
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct HelloResponse { pub struct HelloResponse {
@ -216,3 +222,8 @@ pub struct GetInitialTestnetAccountsResponse {
pub account_id: String, pub account_id: String,
pub balance: u64, pub balance: u64,
} }
/// RPC response for `post_indexer_message`.
#[derive(Serialize, Deserialize, Debug)]
pub struct PostIndexerMessageResponse {
    /// Human-readable outcome; the sequencer currently always sets "Success".
    pub status: String,
}

View File

@ -1,10 +1,12 @@
use std::{collections::HashMap, ops::RangeInclusive}; use std::{collections::HashMap, ops::RangeInclusive, str::FromStr};
use anyhow::Result; use anyhow::Result;
use logos_blockchain_common_http_client::BasicAuthCredentials;
use nssa_core::program::ProgramId; use nssa_core::program::ProgramId;
use reqwest::Client; use reqwest::Client;
use serde::Deserialize; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use url::Url;
use super::rpc_primitives::requests::{ use super::rpc_primitives::requests::{
GetAccountBalanceRequest, GetAccountBalanceResponse, GetBlockDataRequest, GetBlockDataResponse, GetAccountBalanceRequest, GetAccountBalanceResponse, GetBlockDataRequest, GetBlockDataResponse,
@ -20,28 +22,75 @@ use crate::{
GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, SendTxRequest, SendTxResponse, GetTransactionByHashResponse, PostIndexerMessageRequest, PostIndexerMessageResponse,
SendTxRequest, SendTxResponse,
}, },
}, },
transaction::{EncodedTransaction, NSSATransaction}, transaction::{EncodedTransaction, NSSATransaction},
}; };
/// HTTP basic-auth credentials used for sequencer/bedrock clients.
///
/// Round-trips through `Display`/`FromStr` in the `user` or `user:password`
/// form, and converts into the HTTP client's `BasicAuthCredentials`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BasicAuth {
    pub username: String,
    /// Optional password; `None` when only a username is configured.
    pub password: Option<String>,
}
impl std::fmt::Display for BasicAuth {
    /// Renders the credentials as `username` or `username:password` —
    /// the same shape the `FromStr` implementation accepts.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.password {
            Some(password) => write!(f, "{}:{}", self.username, password),
            None => write!(f, "{}", self.username),
        }
    }
}
impl FromStr for BasicAuth {
    type Err = anyhow::Error;

    /// Parses `user` or `user:password`.
    ///
    /// A trailing `:` with an empty password segment (`"user:"`) is treated
    /// as no password. `splitn(2, ':')` yields at most two fragments, so a
    /// password may itself contain `:` characters; the previous
    /// `parts.next().is_some()` third-fragment check was unreachable dead
    /// code and has been removed.
    ///
    /// NOTE(review): an empty username (e.g. `":pw"` or `""`) is currently
    /// accepted; tighten if callers require a non-empty username.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut parts = s.splitn(2, ':');
        // `splitn` always yields at least one fragment, so this error path
        // is defensive only.
        let username = parts.next().ok_or_else(|| {
            anyhow::anyhow!("Invalid auth format. Expected 'user' or 'user:password'")
        })?;
        let password = parts.next().filter(|p| !p.is_empty());
        Ok(Self {
            username: username.to_string(),
            password: password.map(|p| p.to_string()),
        })
    }
}
// Bridges the config-level `BasicAuth` into the HTTP client's credential type.
impl From<BasicAuth> for BasicAuthCredentials {
    fn from(value: BasicAuth) -> Self {
        BasicAuthCredentials::new(value.username, value.password)
    }
}
#[derive(Clone)] #[derive(Clone)]
pub struct SequencerClient { pub struct SequencerClient {
pub client: reqwest::Client, pub client: reqwest::Client,
pub sequencer_addr: String, pub sequencer_addr: Url,
pub basic_auth: Option<(String, Option<String>)>, pub basic_auth: Option<BasicAuth>,
} }
impl SequencerClient { impl SequencerClient {
pub fn new(sequencer_addr: String) -> Result<Self> { pub fn new(sequencer_addr: Url) -> Result<Self> {
Self::new_with_auth(sequencer_addr, None) Self::new_with_auth(sequencer_addr, None)
} }
pub fn new_with_auth( pub fn new_with_auth(sequencer_addr: Url, basic_auth: Option<BasicAuth>) -> Result<Self> {
sequencer_addr: String,
basic_auth: Option<(String, Option<String>)>,
) -> Result<Self> {
Ok(Self { Ok(Self {
client: Client::builder() client: Client::builder()
// Add more fields if needed // Add more fields if needed
@ -66,9 +115,9 @@ impl SequencerClient {
"Calling method {method} with payload {request:?} to sequencer at {}", "Calling method {method} with payload {request:?} to sequencer at {}",
self.sequencer_addr self.sequencer_addr
); );
let mut call_builder = self.client.post(&self.sequencer_addr); let mut call_builder = self.client.post(self.sequencer_addr.clone());
if let Some((username, password)) = &self.basic_auth { if let Some(BasicAuth { username, password }) = &self.basic_auth {
call_builder = call_builder.basic_auth(username, password.as_deref()); call_builder = call_builder.basic_auth(username, password.as_deref());
} }
@ -347,4 +396,23 @@ impl SequencerClient {
Ok(resp_deser) Ok(resp_deser)
} }
/// Post indexer into sequencer
///
/// Sends an indexer `Message` via the `post_indexer_message` RPC method and
/// returns the sequencer's acknowledgement.
///
/// # Errors
/// Propagates transport/RPC failures from `call_method_with_payload`
/// (previously swallowed by an `.unwrap()`, which panicked instead of
/// returning the method's declared `SequencerClientError`).
///
/// # Panics
/// Panics if the sequencer's response does not deserialize into
/// `PostIndexerMessageResponse`. TODO(review): map that failure into
/// `SequencerClientError` once a suitable variant exists.
pub async fn post_indexer_message(
    &self,
    message: crate::communication::indexer::Message,
) -> Result<PostIndexerMessageResponse, SequencerClientError> {
    let last_req = PostIndexerMessageRequest { message };
    // A derived-`Serialize` struct is always representable as a JSON value.
    let req = serde_json::to_value(last_req)
        .expect("PostIndexerMessageRequest is always JSON-serializable");
    let resp = self
        .call_method_with_payload("post_indexer_message", req)
        .await?;
    let resp_deser = serde_json::from_value(resp)
        .expect("sequencer returned a malformed post_indexer_message response");
    Ok(resp_deser)
}
} }

18
indexer_core/Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "indexer_core"
version = "0.1.0"
edition = "2024"
[dependencies]
common.workspace = true
bedrock_client.workspace = true
anyhow.workspace = true
log.workspace = true
serde.workspace = true
tokio.workspace = true
borsh.workspace = true
futures.workspace = true
url.workspace = true
logos-blockchain-core.workspace = true
serde_json.workspace = true

View File

@ -0,0 +1,36 @@
use std::{fs::File, io::BufReader, path::Path};
use anyhow::{Context, Result};
use bedrock_client::BackoffConfig;
use common::sequencer_client::BasicAuth;
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize)]
/// ToDo: Expand if necessary
///
/// Connection settings for one RPC endpoint (used for both the bedrock node
/// and the sequencer).
pub struct ClientConfig {
    /// Base URL of the endpoint.
    pub addr: Url,
    /// Optional HTTP basic-auth credentials.
    pub auth: Option<BasicAuth>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Note: For individual RPC requests we use Fibonacci backoff retry strategy
pub struct IndexerConfig {
    /// Delay before re-subscribing to the block stream after it ends, in
    /// milliseconds.
    pub resubscribe_interval_millis: u64,
    /// Retry/backoff parameters for per-request retries.
    pub backoff: BackoffConfig,
    /// Bedrock (L1) node endpoint.
    pub bedrock_client_config: ClientConfig,
    /// Sequencer endpoint; tests overwrite `addr` at runtime.
    pub sequencer_client_config: ClientConfig,
    /// L1 channel whose inscriptions carry the L2 blocks to index.
    pub channel_id: ChannelId,
}
impl IndexerConfig {
    /// Reads and deserializes an [`IndexerConfig`] from the JSON file at
    /// `config_home`.
    ///
    /// # Errors
    /// Fails when the file cannot be opened or does not contain valid
    /// config JSON; both cases carry the offending path in the context.
    pub fn from_path(config_home: &Path) -> Result<IndexerConfig> {
        let file = File::open(config_home)
            .with_context(|| format!("Failed to open indexer config at {config_home:?}"))?;
        serde_json::from_reader(BufReader::new(file))
            .with_context(|| format!("Failed to parse indexer config at {config_home:?}"))
    }
}

124
indexer_core/src/lib.rs Normal file
View File

@ -0,0 +1,124 @@
use std::sync::Arc;
use anyhow::Result;
use bedrock_client::BedrockClient;
use common::{
block::HashableBlockData, communication::indexer::Message,
rpc_primitives::requests::PostIndexerMessageResponse, sequencer_client::SequencerClient,
};
use futures::StreamExt;
use log::info;
use logos_blockchain_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::RwLock;
use crate::{config::IndexerConfig, state::IndexerState};
pub mod config;
pub mod state;
/// Core of the indexer: follows the L1 block stream, extracts L2 blocks
/// inscribed on the configured channel, and reports them to the sequencer.
pub struct IndexerCore {
    /// Client for the bedrock (L1) node.
    pub bedrock_client: BedrockClient,
    /// Client for the sequencer RPC endpoint.
    pub sequencer_client: SequencerClient,
    pub config: IndexerConfig,
    /// Shared mutable state (currently only the highest L2 block id seen).
    pub state: IndexerState,
}
impl IndexerCore {
/// Builds an `IndexerCore` from `config`, constructing both RPC clients.
///
/// # Errors
/// Fails when either the bedrock or sequencer client cannot be created.
pub fn new(config: IndexerConfig) -> Result<Self> {
    Ok(Self {
        bedrock_client: BedrockClient::new(
            config.bedrock_client_config.auth.clone().map(Into::into),
            config.bedrock_client_config.addr.clone(),
        )?,
        sequencer_client: SequencerClient::new_with_auth(
            config.sequencer_client_config.addr.clone(),
            config.sequencer_client_config.auth.clone(),
        )?,
        config,
        // No state setup for now, future task.
        state: IndexerState {
            latest_seen_block: Arc::new(RwLock::new(0)),
        },
    })
}
/// Main indexer loop: subscribes to the L1 block stream, fetches each
/// announced block, extracts L2 blocks inscribed on the configured channel,
/// updates local state, and notifies the sequencer.
///
/// Runs forever unless an error escapes: any failure from subscribing,
/// block fetching (after retries), or posting to the sequencer exits via
/// `?`. When the stream ends normally it re-subscribes after
/// `resubscribe_interval_millis`.
pub async fn subscribe_parse_block_stream(&self) -> Result<()> {
    loop {
        let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);

        info!("Block stream joined");

        while let Some(block_info) = stream_pinned.next().await {
            let header_id = block_info.header_id;
            info!("Observed L1 block at height {}", block_info.height);
            // `None` means the node could not return the block; it is
            // skipped rather than treated as an error.
            if let Some(l1_block) = self
                .bedrock_client
                .get_block_by_id(header_id, &self.config.backoff)
                .await?
            {
                info!("Extracted L1 block at height {}", block_info.height);
                let l2_blocks_parsed = parse_blocks(
                    l1_block.into_transactions().into_iter(),
                    &self.config.channel_id,
                );

                for l2_block in l2_blocks_parsed {
                    // State modification, will be updated in future.
                    // The write lock is confined to this inner scope so it
                    // is released before the sequencer call below.
                    {
                        let mut guard = self.state.latest_seen_block.write().await;

                        if l2_block.block_id > *guard {
                            *guard = l2_block.block_id;
                        }
                    }

                    // Sending data into sequencer, may need to be expanded.
                    let message = Message::L2BlockFinalized {
                        l2_block_height: l2_block.block_id,
                    };
                    let status = self.send_message_to_sequencer(message.clone()).await?;

                    info!("Sent message {message:#?} to sequencer; status {status:#?}");
                }
            }
        }

        // Refetch stream after delay
        tokio::time::sleep(std::time::Duration::from_millis(
            self.config.resubscribe_interval_millis,
        ))
        .await;
    }
}
/// Forwards `message` to the sequencer's `post_indexer_message` endpoint
/// and returns its acknowledgement, converting the client error via `?`.
pub async fn send_message_to_sequencer(
    &self,
    message: Message,
) -> Result<PostIndexerMessageResponse> {
    let response = self
        .sequencer_client
        .post_indexer_message(message)
        .await?;
    Ok(response)
}
}
/// Extracts L2 blocks from a sequence of L1 transactions.
///
/// Scans every op of every transaction and keeps `ChannelInscribe` ops
/// whose `channel_id` equals `decoded_channel_id`; each matching
/// inscription is borsh-decoded into a `HashableBlockData`. Inscriptions
/// that fail to decode are silently dropped (`.ok()`) — NOTE(review):
/// consider logging decode failures for observability.
fn parse_blocks(
    block_txs: impl Iterator<Item = SignedMantleTx>,
    decoded_channel_id: &ChannelId,
) -> impl Iterator<Item = HashableBlockData> {
    block_txs.flat_map(|tx| {
        tx.mantle_tx.ops.into_iter().filter_map(|op| match op {
            Op::ChannelInscribe(InscriptionOp {
                channel_id,
                inscription,
                ..
            }) if channel_id == *decoded_channel_id => {
                borsh::from_slice::<HashableBlockData>(&inscription).ok()
            }
            _ => None,
        })
    })
}

View File

@ -0,0 +1,9 @@
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
pub struct IndexerState {
// Only one field for now, for testing.
pub latest_seen_block: Arc<RwLock<u64>>,
}

View File

@ -11,6 +11,8 @@ sequencer_runner.workspace = true
wallet.workspace = true wallet.workspace = true
common.workspace = true common.workspace = true
key_protocol.workspace = true key_protocol.workspace = true
indexer_core.workspace = true
url.workspace = true
anyhow.workspace = true anyhow.workspace = true
env_logger.workspace = true env_logger.workspace = true

View File

@ -0,0 +1,17 @@
{
"bedrock_client_config": {
"addr": "http://127.0.0.1:8080",
"auth": {
"username": "user"
}
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"backoff": {
"max_retries": 10,
"start_delay_millis": 100
},
"resubscribe_interval_millis": 1000,
"sequencer_client_config": {
"addr": "will_be_replaced_in_runtime"
}
}

View File

@ -0,0 +1,165 @@
{
"home": "",
"override_rust_log": null,
"genesis_id": 1,
"is_genesis_random": true,
"max_num_tx_in_block": 20,
"mempool_max_size": 10000,
"block_create_timeout_millis": 10000,
"port": 0,
"initial_accounts": [
{
"account_id": "BLgCRDXYdQPMMWVHYRFGQZbgeHx9frkipa8GtpG2Syqy",
"balance": 10000
},
{
"account_id": "Gj1mJy5W7J5pfmLRujmQaLfLMWidNxQ6uwnhb666ZwHw",
"balance": 20000
}
],
"initial_commitments": [
{
"npk": [
63,
202,
178,
231,
183,
82,
237,
212,
216,
221,
215,
255,
153,
101,
177,
161,
254,
210,
128,
122,
54,
190,
230,
151,
183,
64,
225,
229,
113,
1,
228,
97
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 10000,
"data": [],
"nonce": 0
}
},
{
"npk": [
192,
251,
166,
243,
167,
236,
84,
249,
35,
136,
130,
172,
219,
225,
161,
139,
229,
89,
243,
125,
194,
213,
209,
30,
23,
174,
100,
244,
124,
74,
140,
47
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 20000,
"data": [],
"nonce": 0
}
}
],
"signing_key": [
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37
],
"bedrock_config": {
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"node_url": "http://127.0.0.1:8080",
"auth": {
"username": "user"
}
}
}

View File

@ -3,19 +3,21 @@
use std::{net::SocketAddr, path::PathBuf, sync::LazyLock}; use std::{net::SocketAddr, path::PathBuf, sync::LazyLock};
use actix_web::dev::ServerHandle; use actix_web::dev::ServerHandle;
use anyhow::{Context as _, Result}; use anyhow::{Context, Result};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use common::{ use common::{
sequencer_client::SequencerClient, sequencer_client::SequencerClient,
transaction::{EncodedTransaction, NSSATransaction}, transaction::{EncodedTransaction, NSSATransaction},
}; };
use futures::FutureExt as _; use futures::FutureExt as _;
use indexer_core::{IndexerCore, config::IndexerConfig};
use log::debug; use log::debug;
use nssa::PrivacyPreservingTransaction; use nssa::PrivacyPreservingTransaction;
use nssa_core::Commitment; use nssa_core::Commitment;
use sequencer_core::config::SequencerConfig; use sequencer_core::config::SequencerConfig;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use url::Url;
use wallet::{WalletCore, config::WalletConfigOverrides}; use wallet::{WalletCore, config::WalletConfigOverrides};
// TODO: Remove this and control time from tests // TODO: Remove this and control time from tests
@ -38,6 +40,7 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init);
pub struct TestContext { pub struct TestContext {
sequencer_server_handle: ServerHandle, sequencer_server_handle: ServerHandle,
sequencer_loop_handle: JoinHandle<Result<()>>, sequencer_loop_handle: JoinHandle<Result<()>>,
indexer_loop_handle: Option<JoinHandle<Result<()>>>,
sequencer_client: SequencerClient, sequencer_client: SequencerClient,
wallet: WalletCore, wallet: WalletCore,
_temp_sequencer_dir: TempDir, _temp_sequencer_dir: TempDir,
@ -45,24 +48,47 @@ pub struct TestContext {
} }
impl TestContext { impl TestContext {
/// Create new test context. /// Create new test context in detached mode. Default.
pub async fn new() -> Result<Self> { pub async fn new() -> Result<Self> {
let manifest_dir = env!("CARGO_MANIFEST_DIR"); let manifest_dir = env!("CARGO_MANIFEST_DIR");
let sequencer_config_path = let sequencer_config_path =
PathBuf::from(manifest_dir).join("configs/sequencer/sequencer_config.json"); PathBuf::from(manifest_dir).join("configs/sequencer/detached/sequencer_config.json");
let sequencer_config = SequencerConfig::from_path(&sequencer_config_path) let sequencer_config = SequencerConfig::from_path(&sequencer_config_path)
.context("Failed to create sequencer config from file")?; .context("Failed to create sequencer config from file")?;
Self::new_with_sequencer_config(sequencer_config).await Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, None).await
} }
/// Create new test context with custom sequencer config. /// Create new test context in local bedrock node attached mode.
pub async fn new_bedrock_local_attached() -> Result<Self> {
    let manifest_dir = env!("CARGO_MANIFEST_DIR");
    // Sequencer config variant that points at a locally running bedrock node.
    let sequencer_config_path = PathBuf::from(manifest_dir)
        .join("configs/sequencer/bedrock_local_attached/sequencer_config.json");
    let sequencer_config = SequencerConfig::from_path(&sequencer_config_path)
        .context("Failed to create sequencer config from file")?;

    let indexer_config_path =
        PathBuf::from(manifest_dir).join("configs/indexer/indexer_config.json");
    let indexer_config = IndexerConfig::from_path(&indexer_config_path)
        .context("Failed to create indexer config from file")?;

    // Passing `Some(indexer_config)` makes the context spawn the indexer
    // loop alongside the sequencer.
    Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, Some(indexer_config))
        .await
}
/// Create new test context with custom sequencer config and maybe indexer config.
/// ///
/// `home` and `port` fields of the provided config will be overridden to meet tests parallelism /// `home` and `port` fields of the provided config will be overridden to meet tests parallelism
/// requirements. /// requirements.
pub async fn new_with_sequencer_config(sequencer_config: SequencerConfig) -> Result<Self> { pub async fn new_with_sequencer_and_maybe_indexer_configs(
sequencer_config: SequencerConfig,
indexer_config: Option<IndexerConfig>,
) -> Result<Self> {
// Ensure logger is initialized only once // Ensure logger is initialized only once
*LOGGER; *LOGGER;
@ -86,17 +112,41 @@ impl TestContext {
.await .await
.context("Failed to setup wallet")?; .context("Failed to setup wallet")?;
let sequencer_client = let sequencer_client = SequencerClient::new(
SequencerClient::new(sequencer_addr).context("Failed to create sequencer client")?; Url::parse(&sequencer_addr).context("Failed to parse sequencer addr")?,
)
.context("Failed to create sequencer client")?;
Ok(Self { if let Some(mut indexer_config) = indexer_config {
sequencer_server_handle, indexer_config.sequencer_client_config.addr =
sequencer_loop_handle, Url::parse(&sequencer_addr).context("Failed to parse sequencer addr")?;
sequencer_client,
wallet, let indexer_core = IndexerCore::new(indexer_config)?;
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir, let indexer_loop_handle = Some(tokio::spawn(async move {
}) indexer_core.subscribe_parse_block_stream().await
}));
Ok(Self {
sequencer_server_handle,
sequencer_loop_handle,
indexer_loop_handle,
sequencer_client,
wallet,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
} else {
Ok(Self {
sequencer_server_handle,
sequencer_loop_handle,
indexer_loop_handle: None,
sequencer_client,
wallet,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
}
} }
async fn setup_sequencer( async fn setup_sequencer(
@ -180,6 +230,7 @@ impl Drop for TestContext {
let Self { let Self {
sequencer_server_handle, sequencer_server_handle,
sequencer_loop_handle, sequencer_loop_handle,
indexer_loop_handle,
sequencer_client: _, sequencer_client: _,
wallet: _, wallet: _,
_temp_sequencer_dir, _temp_sequencer_dir,
@ -187,6 +238,9 @@ impl Drop for TestContext {
} = self; } = self;
sequencer_loop_handle.abort(); sequencer_loop_handle.abort();
if let Some(indexer_loop_handle) = indexer_loop_handle {
indexer_loop_handle.abort();
}
// Can't wait here as Drop can't be async, but anyway stop signal should be sent // Can't wait here as Drop can't be async, but anyway stop signal should be sent
sequencer_server_handle.stop(true).now_or_never(); sequencer_server_handle.stop(true).now_or_never();

View File

@ -0,0 +1,23 @@
use anyhow::Result;
use integration_tests::TestContext;
use log::info;
use tokio::test;

#[ignore = "needs complicated setup"]
#[test]
// To run this test properly, you need nomos node running in the background.
// For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node).
//
// Recommended to run node locally from build binary.
async fn indexer_run_local_node() -> Result<()> {
    let _ctx = TestContext::new_bedrock_local_attached().await?;

    info!("Let's observe behaviour");
    // Keep the context (and the indexer loop it spawned) alive for an
    // arbitrary 180 s observation window.
    tokio::time::sleep(std::time::Duration::from_secs(180)).await;

    // No way to check state of indexer now
    // When it will be a service, then it will become possible.
    Ok(())
}

View File

@ -25,7 +25,11 @@ pub async fn tps_test() -> Result<()> {
let target_tps = 12; let target_tps = 12;
let tps_test = TpsTestManager::new(target_tps, num_transactions); let tps_test = TpsTestManager::new(target_tps, num_transactions);
let ctx = TestContext::new_with_sequencer_config(tps_test.generate_sequencer_config()).await?; let ctx = TestContext::new_with_sequencer_and_maybe_indexer_configs(
tps_test.generate_sequencer_config(),
None,
)
.await?;
let target_time = tps_test.target_time(); let target_time = tps_test.target_time();
info!( info!(

View File

@ -17,6 +17,7 @@ serde_json.workspace = true
tempfile.workspace = true tempfile.workspace = true
chrono.workspace = true chrono.workspace = true
log.workspace = true log.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
bedrock_client.workspace = true bedrock_client.workspace = true
logos-blockchain-key-management-system-service.workspace = true logos-blockchain-key-management-system-service.workspace = true
logos-blockchain-core.workspace = true logos-blockchain-core.workspace = true
@ -29,5 +30,4 @@ default = []
testnet = [] testnet = []
[dev-dependencies] [dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
futures.workspace = true futures.workspace = true

View File

@ -1,4 +1,4 @@
use std::{fs, path::Path}; use std::{fs, path::Path, str::FromStr};
use anyhow::{Context, Result, anyhow}; use anyhow::{Context, Result, anyhow};
use bedrock_client::BedrockClient; use bedrock_client::BedrockClient;
@ -10,6 +10,7 @@ use logos_blockchain_core::mantle::{
use logos_blockchain_key_management_system_service::keys::{ use logos_blockchain_key_management_system_service::keys::{
ED25519_SECRET_KEY_SIZE, Ed25519Key, Ed25519PublicKey, ED25519_SECRET_KEY_SIZE, Ed25519Key, Ed25519PublicKey,
}; };
use reqwest::Url;
use crate::config::BedrockConfig; use crate::config::BedrockConfig;
@ -25,14 +26,15 @@ impl BlockSettlementClient {
pub fn try_new(home: &Path, config: &BedrockConfig) -> Result<Self> { pub fn try_new(home: &Path, config: &BedrockConfig) -> Result<Self> {
let bedrock_signing_key = load_or_create_signing_key(&home.join("bedrock_signing_key")) let bedrock_signing_key = load_or_create_signing_key(&home.join("bedrock_signing_key"))
.context("Failed to load or create signing key")?; .context("Failed to load or create signing key")?;
let bedrock_channel_id = ChannelId::from(config.channel_id); let bedrock_url = Url::from_str(config.node_url.as_ref())
let bedrock_client = BedrockClient::new(None, config.node_url.clone()) .context("Bedrock node address is not a valid url")?;
.context("Failed to initialize bedrock client")?; let bedrock_client =
BedrockClient::new(None, bedrock_url).context("Failed to initialize bedrock client")?;
let channel_genesis_msg = MsgId::from([0; 32]); let channel_genesis_msg = MsgId::from([0; 32]);
Ok(Self { Ok(Self {
bedrock_client, bedrock_client,
bedrock_signing_key, bedrock_signing_key,
bedrock_channel_id, bedrock_channel_id: config.channel_id,
last_message_id: channel_genesis_msg, last_message_id: channel_genesis_msg,
}) })
} }

View File

@ -5,7 +5,8 @@ use std::{
}; };
use anyhow::Result; use anyhow::Result;
use reqwest::Url; use common::sequencer_client::BasicAuth;
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@ -55,9 +56,11 @@ pub struct SequencerConfig {
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct BedrockConfig { pub struct BedrockConfig {
/// Bedrock channel ID /// Bedrock channel ID
pub channel_id: [u8; 32], pub channel_id: ChannelId,
/// Bedrock Url /// Bedrock Url
pub node_url: Url, pub node_url: String,
/// Bedrock auth
pub auth: Option<BasicAuth>,
} }
impl SequencerConfig { impl SequencerConfig {

View File

@ -18,8 +18,8 @@ use common::{
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, PostIndexerMessageRequest,
SendTxResponse, PostIndexerMessageResponse, SendTxRequest, SendTxResponse,
}, },
}, },
transaction::{EncodedTransaction, NSSATransaction}, transaction::{EncodedTransaction, NSSATransaction},
@ -44,6 +44,7 @@ pub const GET_ACCOUNTS_NONCES: &str = "get_accounts_nonces";
pub const GET_ACCOUNT: &str = "get_account"; pub const GET_ACCOUNT: &str = "get_account";
pub const GET_PROOF_FOR_COMMITMENT: &str = "get_proof_for_commitment"; pub const GET_PROOF_FOR_COMMITMENT: &str = "get_proof_for_commitment";
pub const GET_PROGRAM_IDS: &str = "get_program_ids"; pub const GET_PROGRAM_IDS: &str = "get_program_ids";
pub const POST_INDEXER_MESSAGE: &str = "post_indexer_message";
pub const HELLO_FROM_SEQUENCER: &str = "HELLO_FROM_SEQUENCER"; pub const HELLO_FROM_SEQUENCER: &str = "HELLO_FROM_SEQUENCER";
@ -314,6 +315,18 @@ impl JsonHandler {
respond(response) respond(response)
} }
/// Handles the `post_indexer_message` RPC: validates the request shape and
/// acknowledges it with a fixed "Success" status.
async fn process_indexer_message(&self, request: Request) -> Result<Value, RpcErr> {
    // Parse only to validate the payload; the message content is not yet used.
    let _indexer_post_req = PostIndexerMessageRequest::parse(Some(request.params))?;

    // ToDo: Add indexer messages handling
    let response = PostIndexerMessageResponse {
        status: "Success".to_string(),
    };

    respond(response)
}
pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> { pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> {
match request.method.as_ref() { match request.method.as_ref() {
HELLO => self.process_temp_hello(request).await, HELLO => self.process_temp_hello(request).await,
@ -329,6 +342,7 @@ impl JsonHandler {
GET_TRANSACTION_BY_HASH => self.process_get_transaction_by_hash(request).await, GET_TRANSACTION_BY_HASH => self.process_get_transaction_by_hash(request).await,
GET_PROOF_FOR_COMMITMENT => self.process_get_proof_by_commitment(request).await, GET_PROOF_FOR_COMMITMENT => self.process_get_proof_by_commitment(request).await,
GET_PROGRAM_IDS => self.process_get_program_ids(request).await, GET_PROGRAM_IDS => self.process_get_program_ids(request).await,
POST_INDEXER_MESSAGE => self.process_indexer_message(request).await,
_ => Err(RpcErr(RpcError::method_not_found(request.method))), _ => Err(RpcErr(RpcError::method_not_found(request.method))),
} }
} }
@ -340,10 +354,13 @@ mod tests {
use base58::ToBase58; use base58::ToBase58;
use base64::{Engine, engine::general_purpose}; use base64::{Engine, engine::general_purpose};
use common::{test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction}; use common::{
sequencer_client::BasicAuth, test_utils::sequencer_sign_key_for_testing,
transaction::EncodedTransaction,
};
use sequencer_core::{ use sequencer_core::{
SequencerCore, SequencerCore,
config::{AccountInitialData, SequencerConfig}, config::{AccountInitialData, BedrockConfig, SequencerConfig},
}; };
use serde_json::Value; use serde_json::Value;
use tempfile::tempdir; use tempfile::tempdir;
@ -388,12 +405,20 @@ mod tests {
initial_accounts, initial_accounts,
initial_commitments: vec![], initial_commitments: vec![],
signing_key: *sequencer_sign_key_for_testing().value(), signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_config: None, bedrock_config: Some(BedrockConfig {
channel_id: [42; 32].into(),
node_url: "http://localhost:8080".to_string(),
auth: Some(BasicAuth {
username: "user".to_string(),
password: None,
}),
}),
} }
} }
async fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) { async fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) {
let config = sequencer_config_for_tests(); let config = sequencer_config_for_tests();
let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config); let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config);
let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone(); let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone();

View File

@ -156,7 +156,10 @@
37 37
], ],
"bedrock_config": { "bedrock_config": {
"channel_id": [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1], "channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"node_url": "http://localhost:8080" "node_url": "http://localhost:8080",
"auth": {
"username": "user"
}
} }
} }

View File

@ -29,3 +29,4 @@ risc0-zkvm.workspace = true
async-stream = "0.3.6" async-stream = "0.3.6"
indicatif = { version = "0.18.3", features = ["improved_unicode"] } indicatif = { version = "0.18.3", features = ["improved_unicode"] }
optfield = "0.4.0" optfield = "0.4.0"
url.workspace = true

View File

@ -1,10 +1,10 @@
use std::{ use std::{
io::{BufReader, Write as _}, io::{BufReader, Write as _},
path::Path, path::Path,
str::FromStr,
}; };
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use common::sequencer_client::BasicAuth;
use key_protocol::key_management::{ use key_protocol::key_management::{
KeyChain, KeyChain,
key_tree::{ key_tree::{
@ -14,49 +14,6 @@ use key_protocol::key_management::{
use log::warn; use log::warn;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// HTTP basic-auth credentials with an optional password.
///
/// The canonical textual form is `"user"` or `"user:password"`; the
/// `Display` and `FromStr` impls below round-trip that form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BasicAuth {
    pub username: String,
    /// `None` means "no password"; an empty password parses to `None` too.
    pub password: Option<String>,
}

impl std::fmt::Display for BasicAuth {
    /// Renders as `user` when no password is set, `user:password` otherwise.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.username)?;
        if let Some(password) = &self.password {
            write!(f, ":{password}")?;
        }
        Ok(())
    }
}

impl FromStr for BasicAuth {
    type Err = anyhow::Error;

    /// Parses `"user"` or `"user:password"`.
    ///
    /// Everything after the FIRST `:` is the password, so passwords may
    /// themselves contain `:`. An empty password (`"user:"`) is normalized
    /// to `None`. Parsing never actually fails; the `Result` return type is
    /// kept for `FromStr` interface compatibility.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // The previous splitn(2, ':')-based code checked for a "third part",
        // which splitn(2, ..) can never yield — that unreachable error branch
        // is removed here. split_once expresses the same split directly.
        let (username, password) = match s.split_once(':') {
            Some((user, pass)) => (user, (!pass.is_empty()).then_some(pass)),
            None => (s, None),
        };
        Ok(Self {
            username: username.to_string(),
            password: password.map(str::to_string),
        })
    }
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InitialAccountDataPublic { pub struct InitialAccountDataPublic {
pub account_id: String, pub account_id: String,

View File

@ -23,6 +23,7 @@ use nssa_core::{
}; };
pub use privacy_preserving_tx::PrivacyPreservingAccount; pub use privacy_preserving_tx::PrivacyPreservingAccount;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use url::Url;
use crate::{ use crate::{
config::{PersistentStorage, WalletConfigOverrides}, config::{PersistentStorage, WalletConfigOverrides},
@ -188,13 +189,9 @@ impl WalletCore {
config.apply_overrides(config_overrides); config.apply_overrides(config_overrides);
} }
let basic_auth = config
.basic_auth
.as_ref()
.map(|auth| (auth.username.clone(), auth.password.clone()));
let sequencer_client = Arc::new(SequencerClient::new_with_auth( let sequencer_client = Arc::new(SequencerClient::new_with_auth(
config.sequencer_addr.clone(), Url::parse(&config.sequencer_addr)?,
basic_auth, config.basic_auth.clone(),
)?); )?);
let tx_poller = TxPoller::new(config.clone(), Arc::clone(&sequencer_client)); let tx_poller = TxPoller::new(config.clone(), Arc::clone(&sequencer_client));