diff --git a/Cargo.lock b/Cargo.lock index 199461bb..dee037a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2954,6 +2954,7 @@ dependencies = [ "env_logger", "futures", "hex", + "indexer", "key_protocol", "log", "nssa", @@ -5176,6 +5177,7 @@ dependencies = [ "chrono", "common", "futures", + "indexer", "log", "mempool", "nssa", @@ -5196,10 +5198,12 @@ dependencies = [ "anyhow", "base58", "base64", + "bedrock_client", "borsh", "common", "futures", "hex", + "indexer", "itertools 0.14.0", "log", "mempool", @@ -5218,9 +5222,11 @@ dependencies = [ "actix", "actix-web", "anyhow", + "bedrock_client", "clap", "common", "env_logger", + "indexer", "log", "sequencer_core", "sequencer_rpc", diff --git a/Cargo.toml b/Cargo.toml index 843e4f64..b010b08f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ members = [ "examples/program_deployment", "examples/program_deployment/methods", "examples/program_deployment/methods/guest", - "bedrock_client", + "bedrock_client", "indexer", ] diff --git a/common/src/block.rs b/common/src/block.rs index baba1e42..346c2126 100644 --- a/common/src/block.rs +++ b/common/src/block.rs @@ -69,6 +69,10 @@ impl HashableBlockData { }, } } + + pub fn block_hash(&self) -> BlockHash { + OwnHasher::hash(&borsh::to_vec(&self).unwrap()) + } } impl From for HashableBlockData { diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 2025a3b1..6d05bed5 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -30,4 +30,4 @@ async-stream = "0.3.6" indicatif = { version = "0.18.3", features = ["improved_unicode"] } risc0-zkvm.workspace = true url.workspace = true -nomos-core.workspace = true \ No newline at end of file +nomos-core.workspace = true diff --git a/indexer/src/config.rs b/indexer/src/config.rs index 8818d15e..3c7a75fc 100644 --- a/indexer/src/config.rs +++ b/indexer/src/config.rs @@ -1,6 +1,7 @@ use nomos_core::mantle::ops::channel::ChannelId; +use serde::{Deserialize, Serialize}; -#[derive(Debug)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct IndexerConfig { pub resubscribe_interval: u64, pub channel_id: ChannelId, diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 1474f85c..93c5094b 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -1,6 +1,6 @@ use anyhow::Result; use bedrock_client::{BasicAuthCredentials, BedrockClient}; -use common::block::HashableBlockData; +use common::block::{BlockHash, HashableBlockData}; use futures::StreamExt; use nomos_core::mantle::{ Op, SignedMantleTx, @@ -9,22 +9,24 @@ use nomos_core::mantle::{ use tokio::sync::mpsc::Sender; use url::Url; -use crate::config::IndexerConfig; +use crate::{config::IndexerConfig, state::IndexerState}; pub mod config; +pub mod state; pub struct IndexerCore { pub bedrock_client: BedrockClient, pub bedrock_url: Url, - pub channel_sender: Sender, + pub channel_sender: Sender, pub config: IndexerConfig, + pub state: IndexerState, } impl IndexerCore { pub fn new( addr: &str, auth: Option, - sender: Sender, + sender: Sender, config: IndexerConfig, ) -> Result { Ok(Self { @@ -32,6 +34,10 @@ impl IndexerCore { bedrock_url: Url::parse(addr)?, channel_sender: sender, config, + // No state setup for now, future task. + state: IndexerState { + latest_seen_block: 0, + }, }) } @@ -58,7 +64,8 @@ impl IndexerCore { ); for l2_block in l2_blocks_parsed { - self.channel_sender.send(l2_block).await?; + // Sending data into sequencer, may need to be expanded. + self.channel_sender.send(l2_block.block_hash()).await?; } } } diff --git a/indexer/src/state.rs b/indexer/src/state.rs new file mode 100644 index 00000000..74301688 --- /dev/null +++ b/indexer/src/state.rs @@ -0,0 +1,5 @@ +#[derive(Debug)] +pub struct IndexerState { + // Only one field for now, for testing. + pub latest_seen_block: u64, +} diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index b888c177..be29063c 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -11,6 +11,7 @@ sequencer_runner.workspace = true wallet.workspace = true common.workspace = true key_protocol.workspace = true +indexer.workspace = true anyhow.workspace = true env_logger.workspace = true diff --git a/integration_tests/tests/tps.rs b/integration_tests/tests/tps.rs index b62a3ce7..6c2de6d9 100644 --- a/integration_tests/tests/tps.rs +++ b/integration_tests/tests/tps.rs @@ -1,6 +1,7 @@ use std::time::{Duration, Instant}; use anyhow::Result; +use indexer::config::IndexerConfig; use integration_tests::TestContext; use key_protocol::key_management::ephemeral_key_holder::EphemeralKeyHolder; use log::info; @@ -185,6 +186,12 @@ impl TpsTestManager { initial_accounts: initial_public_accounts, initial_commitments: vec![initial_commitment], signing_key: [37; 32], + bedrock_addr: "0.0.0.0".to_string(), + bedrock_auth: ("".to_string(), "".to_string()), + indexer_config: IndexerConfig { + resubscribe_interval: 100, + channel_id: [42; 32].into(), + }, } } } diff --git a/sequencer_core/Cargo.toml b/sequencer_core/Cargo.toml index a844c524..71fe0e16 100644 --- a/sequencer_core/Cargo.toml +++ b/sequencer_core/Cargo.toml @@ -9,6 +9,7 @@ nssa_core.workspace = true common.workspace = true storage.workspace = true mempool.workspace = true +indexer.workspace = true base58.workspace = true anyhow.workspace = true @@ -17,11 +18,11 @@ serde_json.workspace = true tempfile.workspace = true chrono.workspace = true log.workspace = true +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } [features] default = [] testnet = [] [dev-dependencies] -tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } futures.workspace = true diff --git a/sequencer_core/src/config.rs b/sequencer_core/src/config.rs index 4ef08803..785c5328 100644 --- a/sequencer_core/src/config.rs +++ b/sequencer_core/src/config.rs @@ -5,6 +5,7 @@ use std::{ }; use anyhow::Result; +use indexer::config::IndexerConfig; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, Clone)] @@ -47,6 +48,12 @@ pub struct SequencerConfig { pub initial_commitments: Vec, /// Sequencer own signing key pub signing_key: [u8; 32], + /// Bedrock addr + pub bedrock_addr: String, + /// Bedrock auth + pub bedrock_auth: (String, String), + /// Indexer config + pub indexer_config: IndexerConfig, } impl SequencerConfig { diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index 8e193ff6..2c57a765 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -5,13 +5,14 @@ use anyhow::Result; use common::PINATA_BASE58; use common::{ HashType, - block::HashableBlockData, + block::{BlockHash, HashableBlockData}, transaction::{EncodedTransaction, NSSATransaction}, }; use config::SequencerConfig; use log::warn; use mempool::{MemPool, MemPoolHandle}; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc::Receiver; use crate::block_store::SequencerBlockStore; @@ -24,6 +25,9 @@ pub struct SequencerCore { mempool: MemPool, sequencer_config: SequencerConfig, chain_height: u64, + // No logic here for now + #[allow(unused)] + receiver: Receiver, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -42,7 +46,10 @@ impl std::error::Error for TransactionMalformationError {} impl SequencerCore { /// Start Sequencer from configuration and construct transaction sender - pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle) { + pub fn start_from_config( + config: SequencerConfig, + receiver: Receiver, + ) -> (Self, MemPoolHandle) { let hashable_data = HashableBlockData { block_id: config.genesis_id, transactions: vec![], @@ -93,6 +100,7 @@ impl SequencerCore { mempool, chain_height: config.genesis_id, sequencer_config: config, + receiver, }; this.sync_state_with_stored_blocks(); @@ -248,6 +256,7 @@ mod tests { use base58::{FromBase58, ToBase58}; use common::test_utils::sequencer_sign_key_for_testing; + use indexer::config::IndexerConfig; use nssa::PrivateKey; use super::*; @@ -277,6 +286,12 @@ mod tests { initial_accounts, initial_commitments: vec![], signing_key: *sequencer_sign_key_for_testing().value(), + bedrock_addr: "0.0.0.0".to_string(), + bedrock_auth: ("".to_string(), "".to_string()), + indexer_config: IndexerConfig { + resubscribe_interval: 100, + channel_id: [42; 32].into(), + }, } } @@ -322,7 +337,9 @@ mod tests { async fn common_setup_with_config( config: SequencerConfig, ) -> (SequencerCore, MemPoolHandle) { - let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config); + let (_, receiver) = tokio::sync::mpsc::channel(100); + + let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config, receiver); let tx = common::test_utils::produce_dummy_empty_transaction(); mempool_handle.push(tx).await.unwrap(); @@ -337,7 +354,9 @@ mod tests { #[test] fn test_start_from_config() { let config = setup_sequencer_config(); - let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); + let (_, receiver) = tokio::sync::mpsc::channel(100); + let (sequencer, _mempool_handle) = + SequencerCore::start_from_config(config.clone(), receiver); assert_eq!(sequencer.chain_height, config.genesis_id); assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10); @@ -396,7 +415,9 @@ mod tests { let initial_accounts = vec![initial_acc1, initial_acc2]; let config = setup_sequencer_config_variable_initial_accounts(initial_accounts); - let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); + let (_, receiver) = tokio::sync::mpsc::channel(100); + let (sequencer, _mempool_handle) = + SequencerCore::start_from_config(config.clone(), receiver); let acc1_account_id = config.initial_accounts[0] .account_id @@ -729,7 +750,9 @@ mod tests { // from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in // the temporary directory for the block storage of this test. { - let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config.clone()); + let (_, receiver) = tokio::sync::mpsc::channel(100); + let (mut sequencer, mempool_handle) = + SequencerCore::start_from_config(config.clone(), receiver); let signing_key = PrivateKey::try_new([1; 32]).unwrap(); let tx = common::test_utils::create_transaction_native_token_transfer( @@ -753,7 +776,9 @@ mod tests { // Instantiating a new sequencer from the same config. This should load the existing block // with the above transaction and update the state to reflect that. - let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); + let (_, receiver) = tokio::sync::mpsc::channel(100); + let (sequencer, _mempool_handle) = + SequencerCore::start_from_config(config.clone(), receiver); let balance_acc_1 = sequencer.state.get_account_by_id(&acc1_account_id).balance; let balance_acc_2 = sequencer.state.get_account_by_id(&acc2_account_id).balance; diff --git a/sequencer_rpc/Cargo.toml b/sequencer_rpc/Cargo.toml index 2abd5400..cf95b009 100644 --- a/sequencer_rpc/Cargo.toml +++ b/sequencer_rpc/Cargo.toml @@ -8,6 +8,8 @@ nssa.workspace = true common.workspace = true mempool.workspace = true sequencer_core.workspace = true +indexer.workspace = true +bedrock_client.workspace = true anyhow.workspace = true serde_json.workspace = true diff --git a/sequencer_rpc/src/lib.rs b/sequencer_rpc/src/lib.rs index 89b3e8cd..d41dbbd3 100644 --- a/sequencer_rpc/src/lib.rs +++ b/sequencer_rpc/src/lib.rs @@ -8,6 +8,7 @@ use common::{ rpc_primitives::errors::{RpcError, RpcErrorKind}, transaction::EncodedTransaction, }; +use indexer::IndexerCore; use mempool::MemPoolHandle; pub use net_utils::*; use sequencer_core::SequencerCore; @@ -20,6 +21,9 @@ use self::types::err_rpc::RpcErr; // ToDo: Add necessary fields pub struct JsonHandler { sequencer_state: Arc>, + // No functionality for now. + #[allow(unused)] + indexer_state: Arc>, mempool_handle: MemPoolHandle, } diff --git a/sequencer_rpc/src/net_utils.rs b/sequencer_rpc/src/net_utils.rs index 8b9b7e64..dc1f287b 100644 --- a/sequencer_rpc/src/net_utils.rs +++ b/sequencer_rpc/src/net_utils.rs @@ -7,6 +7,7 @@ use common::{ transaction::EncodedTransaction, }; use futures::{Future, FutureExt}; +use indexer::IndexerCore; use log::info; use mempool::MemPoolHandle; use sequencer_core::SequencerCore; @@ -46,6 +47,7 @@ pub fn new_http_server( config: RpcConfig, seuquencer_core: Arc>, mempool_handle: MemPoolHandle, + indexer_core: Arc>, ) -> io::Result<(actix_web::dev::Server, SocketAddr)> { let RpcConfig { addr, @@ -55,6 +57,7 @@ pub fn new_http_server( info!(target:NETWORK, "Starting HTTP server at {addr}"); let handler = web::Data::new(JsonHandler { sequencer_state: seuquencer_core.clone(), + indexer_state: indexer_core.clone(), mempool_handle, }); diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 387abf28..097fb47b 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -340,7 +340,9 @@ mod tests { use base58::ToBase58; use base64::{Engine, engine::general_purpose}; + use bedrock_client::BasicAuthCredentials; use common::{test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction}; + use indexer::{IndexerCore, config::IndexerConfig}; use sequencer_core::{ SequencerCore, config::{AccountInitialData, SequencerConfig}, @@ -388,12 +390,30 @@ mod tests { initial_accounts, initial_commitments: vec![], signing_key: *sequencer_sign_key_for_testing().value(), + bedrock_addr: "0.0.0.0".to_string(), + bedrock_auth: ("".to_string(), "".to_string()), + indexer_config: IndexerConfig { + resubscribe_interval: 100, + channel_id: [42; 32].into(), + }, } } async fn components_for_tests() -> (JsonHandler, Vec, EncodedTransaction) { let config = sequencer_config_for_tests(); - let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config); + let (sender, receiver) = tokio::sync::mpsc::channel(100); + let indexer_core = IndexerCore::new( + &config.bedrock_addr, + Some(BasicAuthCredentials::new( + config.bedrock_auth.0.clone(), + Some(config.bedrock_auth.1.clone()), + )), + sender, + config.indexer_config.clone(), + ) + .unwrap(); + let (mut sequencer_core, mempool_handle) = + SequencerCore::start_from_config(config, receiver); let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone(); let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap(); @@ -419,10 +439,12 @@ mod tests { .unwrap(); let sequencer_core = Arc::new(Mutex::new(sequencer_core)); + let indexer_core = Arc::new(Mutex::new(indexer_core)); ( JsonHandler { sequencer_state: sequencer_core, + indexer_state: indexer_core, mempool_handle, }, initial_accounts, diff --git a/sequencer_runner/Cargo.toml b/sequencer_runner/Cargo.toml index 55f56dec..d98ecf66 100644 --- a/sequencer_runner/Cargo.toml +++ b/sequencer_runner/Cargo.toml @@ -7,6 +7,8 @@ edition = "2024" common.workspace = true sequencer_core = { workspace = true, features = ["testnet"] } sequencer_rpc.workspace = true +indexer.workspace = true +bedrock_client.workspace = true clap = { workspace = true, features = ["derive", "env"] } anyhow.workspace = true diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index 5c1ab920..c1863a17 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -2,8 +2,10 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use actix_web::dev::ServerHandle; use anyhow::Result; +use bedrock_client::BasicAuthCredentials; use clap::Parser; use common::rpc_primitives::RpcConfig; +use indexer::IndexerCore; use log::info; use sequencer_core::{SequencerCore, config::SequencerConfig}; use sequencer_rpc::new_http_server; @@ -24,16 +26,33 @@ pub async fn startup_sequencer( let block_timeout = app_config.block_create_timeout_millis; let port = app_config.port; - let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config); + // ToDo: Maybe make buffer size configurable. + let (sender, receiver) = tokio::sync::mpsc::channel(100); + + let indexer_core = IndexerCore::new( + &app_config.bedrock_addr, + Some(BasicAuthCredentials::new( + app_config.bedrock_auth.0.clone(), + Some(app_config.bedrock_auth.1.clone()), + )), + sender, + app_config.indexer_config.clone(), + )?; + + info!("Indexer core set up"); + + let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config, receiver); info!("Sequencer core set up"); + let indexer_core_wrapped = Arc::new(Mutex::new(indexer_core)); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); let (http_server, addr) = new_http_server( RpcConfig::with_port(port), Arc::clone(&seq_core_wrapped), mempool_handle, + Arc::clone(&indexer_core_wrapped), )?; info!("HTTP server started"); let http_server_handle = http_server.handle();