diff --git a/Cargo.lock b/Cargo.lock index 0a047c72..7ad1d1e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -927,9 +927,14 @@ name = "bedrock_client" version = "0.1.0" dependencies = [ "anyhow", + "futures", + "log", + "logos-blockchain-chain-broadcast-service", "logos-blockchain-common-http-client", "logos-blockchain-core", "reqwest", + "serde", + "tokio-retry", ] [[package]] @@ -1335,6 +1340,7 @@ dependencies = [ "borsh", "hex", "log", + "logos-blockchain-common-http-client", "logos-blockchain-core", "nssa", "nssa_core", @@ -1343,6 +1349,7 @@ dependencies = [ "serde_json", "sha2", "thiserror 2.0.17", + "url", ] [[package]] @@ -2627,7 +2634,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.1", "system-configuration", "tokio", "tower-service", @@ -2773,6 +2780,23 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee796ad498c8d9a1d68e477df8f754ed784ef875de1414ebdaf169f70a6a784" +[[package]] +name = "indexer_core" +version = "0.1.0" +dependencies = [ + "anyhow", + "bedrock_client", + "borsh", + "common", + "futures", + "log", + "logos-blockchain-core", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "indexer_service" version = "0.1.0" @@ -2870,6 +2894,7 @@ dependencies = [ "env_logger", "futures", "hex", + "indexer_core", "key_protocol", "log", "nssa", @@ -2878,6 +2903,7 @@ dependencies = [ "sequencer_runner", "tempfile", "tokio", + "url", "wallet", ] @@ -4678,7 +4704,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.1", "tracing", "windows-sys 0.60.2", ] @@ -6199,6 +6225,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" @@ -6596,6 +6633,7 @@ dependencies = [ "serde_json", "sha2", "tokio", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 3b1c9f12..6d40c7fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "examples/program_deployment/methods", "examples/program_deployment/methods/guest", "bedrock_client", + "indexer_core", ] [workspace.dependencies] @@ -41,6 +42,7 @@ indexer_service_rpc = { path = "indexer_service/rpc" } wallet = { path = "wallet" } test_program_methods = { path = "test_program_methods" } bedrock_client = { path = "bedrock_client" } +indexer_core = { path = "indexer_core" } tokio = { version = "1.28.2", features = [ "net", @@ -85,12 +87,14 @@ chrono = "0.4.41" borsh = "1.5.7" base58 = "0.2.0" itertools = "0.14.0" -url = "2.5.4" +url = { version = "2.5.4", features = ["serde"] } +tokio-retry = "0.3.0" schemars = "1.2.0" logos-blockchain-common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } +logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } rocksdb = { version = "0.24.0", default-features = false, features = [ "snappy", diff --git a/bedrock_client/Cargo.toml b/bedrock_client/Cargo.toml index 50a54815..a250befe 100644 --- a/bedrock_client/Cargo.toml +++ b/bedrock_client/Cargo.toml @@ -6,5 +6,10 @@ edition = "2024" [dependencies] reqwest.workspace = true anyhow.workspace = true +tokio-retry.workspace = true +futures.workspace = true +log.workspace = true +serde.workspace = true logos-blockchain-common-http-client.workspace = true logos-blockchain-core.workspace = true +logos-blockchain-chain-broadcast-service.workspace = true diff --git a/bedrock_client/src/lib.rs b/bedrock_client/src/lib.rs index b16204c9..631216bd 100644 --- a/bedrock_client/src/lib.rs +++ b/bedrock_client/src/lib.rs @@ -1,7 +1,19 @@ use anyhow::Result; +use futures::{Stream, TryFutureExt}; +use log::warn; +pub use logos_blockchain_chain_broadcast_service::BlockInfo; pub use logos_blockchain_common_http_client::{BasicAuthCredentials, CommonHttpClient, Error}; -use logos_blockchain_core::mantle::SignedMantleTx; +pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx}; use reqwest::{Client, Url}; +use serde::{Deserialize, Serialize}; +use tokio_retry::Retry; + +/// Fibonacci backoff retry strategy configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BackoffConfig { + pub start_delay_millis: u64, + pub max_retries: usize, +} // Simple wrapper // maybe extend in the future for our purposes @@ -30,4 +42,25 @@ impl BedrockClient { .post_transaction(self.node_url.clone(), tx) .await } + + pub async fn get_lib_stream(&self) -> Result, Error> { + self.http_client.get_lib_stream(self.node_url.clone()).await + } + + pub async fn get_block_by_id( + &self, + header_id: HeaderId, + backoff: &BackoffConfig, + ) -> Result>, Error> { + let strategy = + tokio_retry::strategy::FibonacciBackoff::from_millis(backoff.start_delay_millis) + .take(backoff.max_retries); + + Retry::spawn(strategy, || { + self.http_client + .get_block_by_id(self.node_url.clone(), header_id) + .inspect_err(|err| warn!("Block fetching failed with err: {err:#?}")) + }) + .await + } } diff --git a/common/Cargo.toml b/common/Cargo.toml index 96f267df..def7f2fa 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -18,3 +18,5 @@ hex.workspace = true borsh.workspace = true base64.workspace = true logos-blockchain-core.workspace = true +url.workspace = true +logos-blockchain-common-http-client.workspace = true diff --git a/common/src/block.rs b/common/src/block.rs index e7659fbf..1eab90d5 100644 --- a/common/src/block.rs +++ b/common/src/block.rs @@ -89,6 +89,10 @@ impl HashableBlockData { bedrock_parent_id, } } + + pub fn block_hash(&self) -> BlockHash { + OwnHasher::hash(&borsh::to_vec(&self).unwrap()) + } } impl From for HashableBlockData { diff --git a/common/src/communication/indexer.rs b/common/src/communication/indexer.rs new file mode 100644 index 00000000..a0edc176 --- /dev/null +++ b/common/src/communication/indexer.rs @@ -0,0 +1,6 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Message { + L2BlockFinalized { l2_block_height: u64 }, +} diff --git a/common/src/communication/mod.rs b/common/src/communication/mod.rs new file mode 100644 index 00000000..d99eb481 --- /dev/null +++ b/common/src/communication/mod.rs @@ -0,0 +1 @@ +pub mod indexer; diff --git a/common/src/lib.rs b/common/src/lib.rs index b64e6ef9..68902811 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,4 +1,5 @@ pub mod block; +pub mod communication; pub mod error; pub mod rpc_primitives; pub mod sequencer_client; diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index 71641936..6191df44 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -73,6 +73,11 @@ pub struct GetProofForCommitmentRequest { #[derive(Serialize, Deserialize, Debug)] pub struct GetProgramIdsRequest {} +#[derive(Serialize, Deserialize, Debug)] +pub struct PostIndexerMessageRequest { + pub message: crate::communication::indexer::Message, +} + parse_request!(HelloRequest); parse_request!(RegisterAccountRequest); parse_request!(SendTxRequest); @@ -87,6 +92,7 @@ parse_request!(GetAccountsNoncesRequest); parse_request!(GetProofForCommitmentRequest); parse_request!(GetAccountRequest); parse_request!(GetProgramIdsRequest); +parse_request!(PostIndexerMessageRequest); #[derive(Serialize, Deserialize, Debug)] pub struct HelloResponse { @@ -216,3 +222,8 @@ pub struct GetInitialTestnetAccountsResponse { pub account_id: String, pub balance: u64, } + +#[derive(Serialize, Deserialize, Debug)] +pub struct PostIndexerMessageResponse { + pub status: String, +} diff --git a/common/src/sequencer_client.rs b/common/src/sequencer_client.rs index 0cb03f6f..7a14d425 100644 --- a/common/src/sequencer_client.rs +++ b/common/src/sequencer_client.rs @@ -1,10 +1,12 @@ -use std::{collections::HashMap, ops::RangeInclusive}; +use std::{collections::HashMap, ops::RangeInclusive, str::FromStr}; use anyhow::Result; +use logos_blockchain_common_http_client::BasicAuthCredentials; use nssa_core::program::ProgramId; use reqwest::Client; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_json::Value; +use url::Url; use super::rpc_primitives::requests::{ GetAccountBalanceRequest, GetAccountBalanceResponse, GetBlockDataRequest, GetBlockDataResponse, @@ -20,28 +22,75 @@ use crate::{ GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, SendTxRequest, SendTxResponse, + GetTransactionByHashResponse, PostIndexerMessageRequest, PostIndexerMessageResponse, + SendTxRequest, SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, }; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BasicAuth { + pub username: String, + pub password: Option, +} + +impl std::fmt::Display for BasicAuth { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.username)?; + if let Some(password) = &self.password { + write!(f, ":{password}")?; + } + + Ok(()) + } +} + +impl FromStr for BasicAuth { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let parse = || { + let mut parts = s.splitn(2, ':'); + let username = parts.next()?; + let password = parts.next().filter(|p| !p.is_empty()); + if parts.next().is_some() { + return None; + } + + Some((username, password)) + }; + + let (username, password) = parse().ok_or_else(|| { + anyhow::anyhow!("Invalid auth format. Expected 'user' or 'user:password'") + })?; + + Ok(Self { + username: username.to_string(), + password: password.map(|p| p.to_string()), + }) + } +} + +impl From for BasicAuthCredentials { + fn from(value: BasicAuth) -> Self { + BasicAuthCredentials::new(value.username, value.password) + } +} + #[derive(Clone)] pub struct SequencerClient { pub client: reqwest::Client, - pub sequencer_addr: String, - pub basic_auth: Option<(String, Option)>, + pub sequencer_addr: Url, + pub basic_auth: Option, } impl SequencerClient { - pub fn new(sequencer_addr: String) -> Result { + pub fn new(sequencer_addr: Url) -> Result { Self::new_with_auth(sequencer_addr, None) } - pub fn new_with_auth( - sequencer_addr: String, - basic_auth: Option<(String, Option)>, - ) -> Result { + pub fn new_with_auth(sequencer_addr: Url, basic_auth: Option) -> Result { Ok(Self { client: Client::builder() // Add more fields if needed @@ -66,9 +115,9 @@ impl SequencerClient { "Calling method {method} with payload {request:?} to sequencer at {}", self.sequencer_addr ); - let mut call_builder = self.client.post(&self.sequencer_addr); + let mut call_builder = self.client.post(self.sequencer_addr.clone()); - if let Some((username, password)) = &self.basic_auth { + if let Some(BasicAuth { username, password }) = &self.basic_auth { call_builder = call_builder.basic_auth(username, password.as_deref()); } @@ -347,4 +396,23 @@ impl SequencerClient { Ok(resp_deser) } + + /// Post indexer into sequencer + pub async fn post_indexer_message( + &self, + message: crate::communication::indexer::Message, + ) -> Result { + let last_req = PostIndexerMessageRequest { message }; + + let req = serde_json::to_value(last_req).unwrap(); + + let resp = self + .call_method_with_payload("post_indexer_message", req) + .await + .unwrap(); + + let resp_deser = serde_json::from_value(resp).unwrap(); + + Ok(resp_deser) + } } diff --git a/indexer_core/Cargo.toml b/indexer_core/Cargo.toml new file mode 100644 index 00000000..922f566c --- /dev/null +++ b/indexer_core/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "indexer_core" +version = "0.1.0" +edition = "2024" + +[dependencies] +common.workspace = true +bedrock_client.workspace = true + +anyhow.workspace = true +log.workspace = true +serde.workspace = true +tokio.workspace = true +borsh.workspace = true +futures.workspace = true +url.workspace = true +logos-blockchain-core.workspace = true +serde_json.workspace = true diff --git a/indexer_core/src/config.rs b/indexer_core/src/config.rs new file mode 100644 index 00000000..784f5840 --- /dev/null +++ b/indexer_core/src/config.rs @@ -0,0 +1,36 @@ +use std::{fs::File, io::BufReader, path::Path}; + +use anyhow::{Context, Result}; +use bedrock_client::BackoffConfig; +use common::sequencer_client::BasicAuth; +use logos_blockchain_core::mantle::ops::channel::ChannelId; +use serde::{Deserialize, Serialize}; +use url::Url; + +#[derive(Debug, Clone, Serialize, Deserialize)] +/// ToDo: Expand if necessary +pub struct ClientConfig { + pub addr: Url, + pub auth: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +/// Note: For individual RPC requests we use Fibonacci backoff retry strategy +pub struct IndexerConfig { + pub resubscribe_interval_millis: u64, + pub backoff: BackoffConfig, + pub bedrock_client_config: ClientConfig, + pub sequencer_client_config: ClientConfig, + pub channel_id: ChannelId, +} + +impl IndexerConfig { + pub fn from_path(config_home: &Path) -> Result { + let file = File::open(config_home) + .with_context(|| format!("Failed to open indexer config at {config_home:?}"))?; + let reader = BufReader::new(file); + + serde_json::from_reader(reader) + .with_context(|| format!("Failed to parse indexer config at {config_home:?}")) + } +} diff --git a/indexer_core/src/lib.rs b/indexer_core/src/lib.rs new file mode 100644 index 00000000..ca9ec22f --- /dev/null +++ b/indexer_core/src/lib.rs @@ -0,0 +1,124 @@ +use std::sync::Arc; + +use anyhow::Result; +use bedrock_client::BedrockClient; +use common::{ + block::HashableBlockData, communication::indexer::Message, + rpc_primitives::requests::PostIndexerMessageResponse, sequencer_client::SequencerClient, +}; +use futures::StreamExt; +use log::info; +use logos_blockchain_core::mantle::{ + Op, SignedMantleTx, + ops::channel::{ChannelId, inscribe::InscriptionOp}, +}; +use tokio::sync::RwLock; + +use crate::{config::IndexerConfig, state::IndexerState}; + +pub mod config; +pub mod state; + +pub struct IndexerCore { + pub bedrock_client: BedrockClient, + pub sequencer_client: SequencerClient, + pub config: IndexerConfig, + pub state: IndexerState, +} + +impl IndexerCore { + pub fn new(config: IndexerConfig) -> Result { + Ok(Self { + bedrock_client: BedrockClient::new( + config.bedrock_client_config.auth.clone().map(Into::into), + config.bedrock_client_config.addr.clone(), + )?, + sequencer_client: SequencerClient::new_with_auth( + config.sequencer_client_config.addr.clone(), + config.sequencer_client_config.auth.clone(), + )?, + config, + // No state setup for now, future task. + state: IndexerState { + latest_seen_block: Arc::new(RwLock::new(0)), + }, + }) + } + + pub async fn subscribe_parse_block_stream(&self) -> Result<()> { + loop { + let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); + + info!("Block stream joined"); + + while let Some(block_info) = stream_pinned.next().await { + let header_id = block_info.header_id; + + info!("Observed L1 block at height {}", block_info.height); + + if let Some(l1_block) = self + .bedrock_client + .get_block_by_id(header_id, &self.config.backoff) + .await? + { + info!("Extracted L1 block at height {}", block_info.height); + + let l2_blocks_parsed = parse_blocks( + l1_block.into_transactions().into_iter(), + &self.config.channel_id, + ); + + for l2_block in l2_blocks_parsed { + // State modification, will be updated in future + { + let mut guard = self.state.latest_seen_block.write().await; + if l2_block.block_id > *guard { + *guard = l2_block.block_id; + } + } + + // Sending data into sequencer, may need to be expanded. + let message = Message::L2BlockFinalized { + l2_block_height: l2_block.block_id, + }; + + let status = self.send_message_to_sequencer(message.clone()).await?; + + info!("Sent message {message:#?} to sequencer; status {status:#?}"); + } + } + } + + // Refetch stream after delay + tokio::time::sleep(std::time::Duration::from_millis( + self.config.resubscribe_interval_millis, + )) + .await; + } + } + + pub async fn send_message_to_sequencer( + &self, + message: Message, + ) -> Result { + Ok(self.sequencer_client.post_indexer_message(message).await?) + } +} + +fn parse_blocks( + block_txs: impl Iterator, + decoded_channel_id: &ChannelId, +) -> impl Iterator { + block_txs.flat_map(|tx| { + tx.mantle_tx.ops.into_iter().filter_map(|op| match op { + Op::ChannelInscribe(InscriptionOp { + channel_id, + inscription, + .. + }) if channel_id == *decoded_channel_id => { + borsh::from_slice::(&inscription).ok() + } + _ => None, + }) + }) +} diff --git a/indexer_core/src/state.rs b/indexer_core/src/state.rs new file mode 100644 index 00000000..bd05971f --- /dev/null +++ b/indexer_core/src/state.rs @@ -0,0 +1,9 @@ +use std::sync::Arc; + +use tokio::sync::RwLock; + +#[derive(Debug, Clone)] +pub struct IndexerState { + // Only one field for now, for testing. + pub latest_seen_block: Arc>, +} diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index b888c177..b7ca13dd 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -11,6 +11,8 @@ sequencer_runner.workspace = true wallet.workspace = true common.workspace = true key_protocol.workspace = true +indexer_core.workspace = true +url.workspace = true anyhow.workspace = true env_logger.workspace = true diff --git a/integration_tests/configs/indexer/indexer_config.json b/integration_tests/configs/indexer/indexer_config.json new file mode 100644 index 00000000..fd5309b2 --- /dev/null +++ b/integration_tests/configs/indexer/indexer_config.json @@ -0,0 +1,17 @@ +{ + "bedrock_client_config": { + "addr": "http://127.0.0.1:8080", + "auth": { + "username": "user" + } + }, + "channel_id": "0101010101010101010101010101010101010101010101010101010101010101", + "backoff": { + "max_retries": 10, + "start_delay_millis": 100 + }, + "resubscribe_interval_millis": 1000, + "sequencer_client_config": { + "addr": "will_be_replaced_in_runtime" + } +} \ No newline at end of file diff --git a/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json b/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json new file mode 100644 index 00000000..3253115b --- /dev/null +++ b/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json @@ -0,0 +1,165 @@ +{ + "home": "", + "override_rust_log": null, + "genesis_id": 1, + "is_genesis_random": true, + "max_num_tx_in_block": 20, + "mempool_max_size": 10000, + "block_create_timeout_millis": 10000, + "port": 0, + "initial_accounts": [ + { + "account_id": "BLgCRDXYdQPMMWVHYRFGQZbgeHx9frkipa8GtpG2Syqy", + "balance": 10000 + }, + { + "account_id": "Gj1mJy5W7J5pfmLRujmQaLfLMWidNxQ6uwnhb666ZwHw", + "balance": 20000 + } + ], + "initial_commitments": [ + { + "npk": [ + 63, + 202, + 178, + 231, + 183, + 82, + 237, + 212, + 216, + 221, + 215, + 255, + 153, + 101, + 177, + 161, + 254, + 210, + 128, + 122, + 54, + 190, + 230, + 151, + 183, + 64, + 225, + 229, + 113, + 1, + 228, + 97 + ], + "account": { + "program_owner": [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0 + ], + "balance": 10000, + "data": [], + "nonce": 0 + } + }, + { + "npk": [ + 192, + 251, + 166, + 243, + 167, + 236, + 84, + 249, + 35, + 136, + 130, + 172, + 219, + 225, + 161, + 139, + 229, + 89, + 243, + 125, + 194, + 213, + 209, + 30, + 23, + 174, + 100, + 244, + 124, + 74, + 140, + 47 + ], + "account": { + "program_owner": [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0 + ], + "balance": 20000, + "data": [], + "nonce": 0 + } + } + ], + "signing_key": [ + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37, + 37 + ], + "bedrock_config": { + "channel_id": "0101010101010101010101010101010101010101010101010101010101010101", + "node_url": "http://127.0.0.1:8080", + "auth": { + "username": "user" + } + } +} diff --git a/integration_tests/configs/sequencer/sequencer_config.json b/integration_tests/configs/sequencer/detached/sequencer_config.json similarity index 100% rename from integration_tests/configs/sequencer/sequencer_config.json rename to integration_tests/configs/sequencer/detached/sequencer_config.json diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 5cb7233e..524621ad 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -3,19 +3,21 @@ use std::{net::SocketAddr, path::PathBuf, sync::LazyLock}; use actix_web::dev::ServerHandle; -use anyhow::{Context as _, Result}; +use anyhow::{Context, Result}; use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; use common::{ sequencer_client::SequencerClient, transaction::{EncodedTransaction, NSSATransaction}, }; use futures::FutureExt as _; +use indexer_core::{IndexerCore, config::IndexerConfig}; use log::debug; use nssa::PrivacyPreservingTransaction; use nssa_core::Commitment; use sequencer_core::config::SequencerConfig; use tempfile::TempDir; use tokio::task::JoinHandle; +use url::Url; use wallet::{WalletCore, config::WalletConfigOverrides}; // TODO: Remove this and control time from tests @@ -38,6 +40,7 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init); pub struct TestContext { sequencer_server_handle: ServerHandle, sequencer_loop_handle: JoinHandle>, + indexer_loop_handle: Option>>, sequencer_client: SequencerClient, wallet: WalletCore, _temp_sequencer_dir: TempDir, @@ -45,24 +48,47 @@ pub struct TestContext { } impl TestContext { - /// Create new test context. + /// Create new test context in detached mode. Default. pub async fn new() -> Result { let manifest_dir = env!("CARGO_MANIFEST_DIR"); let sequencer_config_path = - PathBuf::from(manifest_dir).join("configs/sequencer/sequencer_config.json"); + PathBuf::from(manifest_dir).join("configs/sequencer/detached/sequencer_config.json"); let sequencer_config = SequencerConfig::from_path(&sequencer_config_path) .context("Failed to create sequencer config from file")?; - Self::new_with_sequencer_config(sequencer_config).await + Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, None).await } - /// Create new test context with custom sequencer config. + /// Create new test context in local bedrock node attached mode. + pub async fn new_bedrock_local_attached() -> Result { + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + + let sequencer_config_path = PathBuf::from(manifest_dir) + .join("configs/sequencer/bedrock_local_attached/sequencer_config.json"); + + let sequencer_config = SequencerConfig::from_path(&sequencer_config_path) + .context("Failed to create sequencer config from file")?; + + let indexer_config_path = + PathBuf::from(manifest_dir).join("configs/indexer/indexer_config.json"); + + let indexer_config = IndexerConfig::from_path(&indexer_config_path) + .context("Failed to create indexer config from file")?; + + Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, Some(indexer_config)) + .await + } + + /// Create new test context with custom sequencer config and maybe indexer config. /// /// `home` and `port` fields of the provided config will be overridden to meet tests parallelism /// requirements. - pub async fn new_with_sequencer_config(sequencer_config: SequencerConfig) -> Result { + pub async fn new_with_sequencer_and_maybe_indexer_configs( + sequencer_config: SequencerConfig, + indexer_config: Option, + ) -> Result { // Ensure logger is initialized only once *LOGGER; @@ -86,17 +112,41 @@ impl TestContext { .await .context("Failed to setup wallet")?; - let sequencer_client = - SequencerClient::new(sequencer_addr).context("Failed to create sequencer client")?; + let sequencer_client = SequencerClient::new( + Url::parse(&sequencer_addr).context("Failed to parse sequencer addr")?, + ) + .context("Failed to create sequencer client")?; - Ok(Self { - sequencer_server_handle, - sequencer_loop_handle, - sequencer_client, - wallet, - _temp_sequencer_dir: temp_sequencer_dir, - _temp_wallet_dir: temp_wallet_dir, - }) + if let Some(mut indexer_config) = indexer_config { + indexer_config.sequencer_client_config.addr = + Url::parse(&sequencer_addr).context("Failed to parse sequencer addr")?; + + let indexer_core = IndexerCore::new(indexer_config)?; + + let indexer_loop_handle = Some(tokio::spawn(async move { + indexer_core.subscribe_parse_block_stream().await + })); + + Ok(Self { + sequencer_server_handle, + sequencer_loop_handle, + indexer_loop_handle, + sequencer_client, + wallet, + _temp_sequencer_dir: temp_sequencer_dir, + _temp_wallet_dir: temp_wallet_dir, + }) + } else { + Ok(Self { + sequencer_server_handle, + sequencer_loop_handle, + indexer_loop_handle: None, + sequencer_client, + wallet, + _temp_sequencer_dir: temp_sequencer_dir, + _temp_wallet_dir: temp_wallet_dir, + }) + } } async fn setup_sequencer( @@ -180,6 +230,7 @@ impl Drop for TestContext { let Self { sequencer_server_handle, sequencer_loop_handle, + indexer_loop_handle, sequencer_client: _, wallet: _, _temp_sequencer_dir, @@ -187,6 +238,9 @@ impl Drop for TestContext { } = self; sequencer_loop_handle.abort(); + if let Some(indexer_loop_handle) = indexer_loop_handle { + indexer_loop_handle.abort(); + } // Can't wait here as Drop can't be async, but anyway stop signal should be sent sequencer_server_handle.stop(true).now_or_never(); diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs new file mode 100644 index 00000000..b25c887b --- /dev/null +++ b/integration_tests/tests/indexer.rs @@ -0,0 +1,23 @@ +use anyhow::Result; +use integration_tests::TestContext; +use log::info; +use tokio::test; + +#[ignore = "needs complicated setup"] +#[test] +// To run this test properly, you need nomos node running in the background. +// For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node). +// +// Recommended to run node locally from build binary. +async fn indexer_run_local_node() -> Result<()> { + let _ctx = TestContext::new_bedrock_local_attached().await?; + + info!("Let's observe behaviour"); + + tokio::time::sleep(std::time::Duration::from_secs(180)).await; + + // No way to check state of indexer now + // When it will be a service, then it will become possible. + + Ok(()) +} diff --git a/integration_tests/tests/tps.rs b/integration_tests/tests/tps.rs index 73b823cf..5fc09c4c 100644 --- a/integration_tests/tests/tps.rs +++ b/integration_tests/tests/tps.rs @@ -25,7 +25,11 @@ pub async fn tps_test() -> Result<()> { let target_tps = 12; let tps_test = TpsTestManager::new(target_tps, num_transactions); - let ctx = TestContext::new_with_sequencer_config(tps_test.generate_sequencer_config()).await?; + let ctx = TestContext::new_with_sequencer_and_maybe_indexer_configs( + tps_test.generate_sequencer_config(), + None, + ) + .await?; let target_time = tps_test.target_time(); info!( diff --git a/sequencer_core/Cargo.toml b/sequencer_core/Cargo.toml index 8d2886ce..528fa16f 100644 --- a/sequencer_core/Cargo.toml +++ b/sequencer_core/Cargo.toml @@ -17,6 +17,7 @@ serde_json.workspace = true tempfile.workspace = true chrono.workspace = true log.workspace = true +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } bedrock_client.workspace = true logos-blockchain-key-management-system-service.workspace = true logos-blockchain-core.workspace = true @@ -29,5 +30,4 @@ default = [] testnet = [] [dev-dependencies] -tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } futures.workspace = true diff --git a/sequencer_core/src/block_settlement_client.rs b/sequencer_core/src/block_settlement_client.rs index 5b323ca5..53839159 100644 --- a/sequencer_core/src/block_settlement_client.rs +++ b/sequencer_core/src/block_settlement_client.rs @@ -1,4 +1,4 @@ -use std::{fs, path::Path}; +use std::{fs, path::Path, str::FromStr}; use anyhow::{Context, Result, anyhow}; use bedrock_client::BedrockClient; @@ -10,6 +10,7 @@ use logos_blockchain_core::mantle::{ use logos_blockchain_key_management_system_service::keys::{ ED25519_SECRET_KEY_SIZE, Ed25519Key, Ed25519PublicKey, }; +use reqwest::Url; use crate::config::BedrockConfig; @@ -25,13 +26,14 @@ impl BlockSettlementClient { pub fn try_new(home: &Path, config: &BedrockConfig) -> Result { let bedrock_signing_key = load_or_create_signing_key(&home.join("bedrock_signing_key")) .context("Failed to load or create signing key")?; - let bedrock_channel_id = ChannelId::from(config.channel_id); - let bedrock_client = BedrockClient::new(None, config.node_url.clone()) - .context("Failed to initialize bedrock client")?; + let bedrock_url = Url::from_str(config.node_url.as_ref()) + .context("Bedrock node address is not a valid url")?; + let bedrock_client = + BedrockClient::new(None, bedrock_url).context("Failed to initialize bedrock client")?; Ok(Self { bedrock_client, bedrock_signing_key, - bedrock_channel_id, + bedrock_channel_id: config.channel_id, }) } diff --git a/sequencer_core/src/config.rs b/sequencer_core/src/config.rs index 74460931..3d69e8af 100644 --- a/sequencer_core/src/config.rs +++ b/sequencer_core/src/config.rs @@ -5,7 +5,8 @@ use std::{ }; use anyhow::Result; -use reqwest::Url; +use common::sequencer_client::BasicAuth; +use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, Clone)] @@ -57,9 +58,11 @@ pub struct SequencerConfig { #[derive(Clone, Serialize, Deserialize)] pub struct BedrockConfig { /// Bedrock channel ID - pub channel_id: [u8; 32], + pub channel_id: ChannelId, /// Bedrock Url - pub node_url: Url, + pub node_url: String, + /// Bedrock auth + pub auth: Option, } impl SequencerConfig { diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index 38671beb..9417439b 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -55,8 +55,8 @@ impl SequencerCore { }; let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap(); - let channel_genesis_msg = MsgId::from([0; 32]); - let genesis_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg); + let channel_genesis_msg_id = MsgId::from([0; 32]); + let genesis_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id); // Sequencer should panic if unable to open db, // as fixing this issue may require actions non-native to program scope @@ -110,7 +110,6 @@ impl SequencerCore { .expect("Block settlement client should be constructible") }); - let channel_genesis_msg_id = MsgId::from([0; 32]); let sequencer_core = Self { state, store, diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index eb19b620..8b4ec7a5 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -18,8 +18,8 @@ use common::{ GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, - SendTxResponse, + GetTransactionByHashResponse, HelloRequest, HelloResponse, PostIndexerMessageRequest, + PostIndexerMessageResponse, SendTxRequest, SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, @@ -44,6 +44,7 @@ pub const GET_ACCOUNTS_NONCES: &str = "get_accounts_nonces"; pub const GET_ACCOUNT: &str = "get_account"; pub const GET_PROOF_FOR_COMMITMENT: &str = "get_proof_for_commitment"; pub const GET_PROGRAM_IDS: &str = "get_program_ids"; +pub const POST_INDEXER_MESSAGE: &str = "post_indexer_message"; pub const HELLO_FROM_SEQUENCER: &str = "HELLO_FROM_SEQUENCER"; @@ -314,6 +315,18 @@ impl JsonHandler { respond(response) } + async fn process_indexer_message(&self, request: Request) -> Result { + let _indexer_post_req = PostIndexerMessageRequest::parse(Some(request.params))?; + + // ToDo: Add indexer messages handling + + let response = PostIndexerMessageResponse { + status: "Success".to_string(), + }; + + respond(response) + } + pub async fn process_request_internal(&self, request: Request) -> Result { match request.method.as_ref() { HELLO => self.process_temp_hello(request).await, @@ -329,6 +342,7 @@ impl JsonHandler { GET_TRANSACTION_BY_HASH => self.process_get_transaction_by_hash(request).await, GET_PROOF_FOR_COMMITMENT => self.process_get_proof_by_commitment(request).await, GET_PROGRAM_IDS => self.process_get_program_ids(request).await, + POST_INDEXER_MESSAGE => self.process_indexer_message(request).await, _ => Err(RpcErr(RpcError::method_not_found(request.method))), } } @@ -340,10 +354,13 @@ mod tests { use base58::ToBase58; use base64::{Engine, engine::general_purpose}; - use common::{test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction}; + use common::{ + sequencer_client::BasicAuth, test_utils::sequencer_sign_key_for_testing, + transaction::EncodedTransaction, + }; use sequencer_core::{ SequencerCore, - config::{AccountInitialData, SequencerConfig}, + config::{AccountInitialData, BedrockConfig, SequencerConfig}, }; use serde_json::Value; use tempfile::tempdir; @@ -388,13 +405,21 @@ mod tests { initial_accounts, initial_commitments: vec![], signing_key: *sequencer_sign_key_for_testing().value(), - bedrock_config: None, retry_pending_blocks_timeout_millis: 1000 * 60 * 4, + bedrock_config: Some(BedrockConfig { + channel_id: [42; 32].into(), + node_url: "http://localhost:8080".to_string(), + auth: Some(BasicAuth { + username: "user".to_string(), + password: None, + }), + }), } } async fn components_for_tests() -> (JsonHandler, Vec, EncodedTransaction) { let config = sequencer_config_for_tests(); + let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config); let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone(); diff --git a/sequencer_runner/configs/debug/sequencer_config.json b/sequencer_runner/configs/debug/sequencer_config.json index 5aeacb48..80bfe0a4 100644 --- a/sequencer_runner/configs/debug/sequencer_config.json +++ b/sequencer_runner/configs/debug/sequencer_config.json @@ -157,7 +157,10 @@ 37 ], "bedrock_config": { - "channel_id": [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1], - "node_url": "http://localhost:58072" + "channel_id": "0101010101010101010101010101010101010101010101010101010101010101", + "node_url": "http://localhost:8080", + "auth": { + "username": "user" + } } } diff --git a/wallet/Cargo.toml b/wallet/Cargo.toml index bef25007..292cebac 100644 --- a/wallet/Cargo.toml +++ b/wallet/Cargo.toml @@ -29,3 +29,4 @@ risc0-zkvm.workspace = true async-stream = "0.3.6" indicatif = { version = "0.18.3", features = ["improved_unicode"] } optfield = "0.4.0" +url.workspace = true diff --git a/wallet/src/config.rs b/wallet/src/config.rs index 45407b6d..8da28bce 100644 --- a/wallet/src/config.rs +++ b/wallet/src/config.rs @@ -1,10 +1,10 @@ use std::{ io::{BufReader, Write as _}, path::Path, - str::FromStr, }; use anyhow::{Context as _, Result}; +use common::sequencer_client::BasicAuth; use key_protocol::key_management::{ KeyChain, key_tree::{ @@ -14,49 +14,6 @@ use key_protocol::key_management::{ use log::warn; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct BasicAuth { - pub username: String, - pub password: Option, -} - -impl std::fmt::Display for BasicAuth { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.username)?; - if let Some(password) = &self.password { - write!(f, ":{password}")?; - } - - Ok(()) - } -} - -impl FromStr for BasicAuth { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - let parse = || { - let mut parts = s.splitn(2, ':'); - let username = parts.next()?; - let password = parts.next().filter(|p| !p.is_empty()); - if parts.next().is_some() { - return None; - } - - Some((username, password)) - }; - - let (username, password) = parse().ok_or_else(|| { - anyhow::anyhow!("Invalid auth format. Expected 'user' or 'user:password'") - })?; - - Ok(Self { - username: username.to_string(), - password: password.map(|p| p.to_string()), - }) - } -} - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct InitialAccountDataPublic { pub account_id: String, diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index 45709d05..09cd5c35 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -23,6 +23,7 @@ use nssa_core::{ }; pub use privacy_preserving_tx::PrivacyPreservingAccount; use tokio::io::AsyncWriteExt; +use url::Url; use crate::{ config::{PersistentStorage, WalletConfigOverrides}, @@ -188,13 +189,9 @@ impl WalletCore { config.apply_overrides(config_overrides); } - let basic_auth = config - .basic_auth - .as_ref() - .map(|auth| (auth.username.clone(), auth.password.clone())); let sequencer_client = Arc::new(SequencerClient::new_with_auth( - config.sequencer_addr.clone(), - basic_auth, + Url::parse(&config.sequencer_addr)?, + config.basic_auth.clone(), )?); let tx_poller = TxPoller::new(config.clone(), Arc::clone(&sequencer_client));