From 2a2fe1347a83e9a731a74de65747aabad5b76a9e Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Thu, 22 Jan 2026 14:44:48 +0200 Subject: [PATCH] fix: new model rewrite --- Cargo.lock | 3 -- common/src/communication/indexer.rs | 6 +++ common/src/communication/mod.rs | 1 + common/src/lib.rs | 1 + common/src/rpc_primitives/requests.rs | 10 +++-- common/src/sequencer_client.rs | 17 ++++---- indexer/src/config.rs | 11 +++++ indexer/src/lib.rs | 58 ++++++++++++++---------- indexer/src/message.rs | 7 --- integration_tests/src/lib.rs | 30 +++---------- integration_tests/tests/indexer.rs | 53 +++++++++++----------- sequencer_core/Cargo.toml | 1 - sequencer_core/src/config.rs | 3 -- sequencer_core/src/lib.rs | 21 +++------ sequencer_rpc/Cargo.toml | 1 - sequencer_rpc/src/lib.rs | 4 -- sequencer_rpc/src/net_utils.rs | 3 -- sequencer_rpc/src/process.rs | 59 +++++-------------------- sequencer_runner/Cargo.toml | 1 - sequencer_runner/src/lib.rs | 63 +++------------------------ 20 files changed, 125 insertions(+), 228 deletions(-) create mode 100644 common/src/communication/indexer.rs create mode 100644 common/src/communication/mod.rs delete mode 100644 indexer/src/message.rs diff --git a/Cargo.lock b/Cargo.lock index 0506b1b1..48b32cf8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5183,7 +5183,6 @@ dependencies = [ "chrono", "common", "futures", - "indexer", "key-management-system-service", "log", "mempool", @@ -5213,7 +5212,6 @@ dependencies = [ "common", "futures", "hex", - "indexer", "itertools 0.14.0", "log", "mempool", @@ -5236,7 +5234,6 @@ dependencies = [ "clap", "common", "env_logger", - "indexer", "log", "sequencer_core", "sequencer_rpc", diff --git a/common/src/communication/indexer.rs b/common/src/communication/indexer.rs new file mode 100644 index 00000000..a0edc176 --- /dev/null +++ b/common/src/communication/indexer.rs @@ -0,0 +1,6 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Message { + L2BlockFinalized { l2_block_height: u64 }, +} diff --git a/common/src/communication/mod.rs b/common/src/communication/mod.rs new file mode 100644 index 00000000..d99eb481 --- /dev/null +++ b/common/src/communication/mod.rs @@ -0,0 +1 @@ +pub mod indexer; diff --git a/common/src/lib.rs b/common/src/lib.rs index b64e6ef9..68902811 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,4 +1,5 @@ pub mod block; +pub mod communication; pub mod error; pub mod rpc_primitives; pub mod sequencer_client; diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index 3c231f1c..6191df44 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -74,7 +74,9 @@ pub struct GetProofForCommitmentRequest { pub struct GetProgramIdsRequest {} #[derive(Serialize, Deserialize, Debug)] -pub struct GetLastSeenL2BlockAtIndexerRequest {} +pub struct PostIndexerMessageRequest { + pub message: crate::communication::indexer::Message, +} parse_request!(HelloRequest); parse_request!(RegisterAccountRequest); @@ -90,7 +92,7 @@ parse_request!(GetAccountsNoncesRequest); parse_request!(GetProofForCommitmentRequest); parse_request!(GetAccountRequest); parse_request!(GetProgramIdsRequest); -parse_request!(GetLastSeenL2BlockAtIndexerRequest); +parse_request!(PostIndexerMessageRequest); #[derive(Serialize, Deserialize, Debug)] pub struct HelloResponse { @@ -222,6 +224,6 @@ pub struct GetInitialTestnetAccountsResponse { } #[derive(Serialize, Deserialize, Debug)] -pub struct GetLastSeenL2BlockResponse { - pub last_block: Option, +pub struct PostIndexerMessageResponse { + pub status: String, } diff --git a/common/src/sequencer_client.rs b/common/src/sequencer_client.rs index 57b0822c..f76fc3e5 100644 --- a/common/src/sequencer_client.rs +++ b/common/src/sequencer_client.rs @@ -18,10 +18,10 @@ use crate::{ GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, - GetLastSeenL2BlockAtIndexerRequest, GetLastSeenL2BlockResponse, GetProgramIdsRequest, - GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, - GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest, - SendTxResponse, + GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, + GetProofForCommitmentResponse, GetTransactionByHashRequest, + GetTransactionByHashResponse, PostIndexerMessageRequest, PostIndexerMessageResponse, + SendTxRequest, SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, @@ -350,15 +350,16 @@ impl SequencerClient { } /// Get last seen l2 block at indexer - pub async fn get_last_seen_l2_block_at_indexer( + pub async fn post_indexer_message( &self, - ) -> Result { - let last_req = GetLastSeenL2BlockAtIndexerRequest {}; + message: crate::communication::indexer::Message, + ) -> Result { + let last_req = PostIndexerMessageRequest { message }; let req = serde_json::to_value(last_req).unwrap(); let resp = self - .call_method_with_payload("get_last_seen_l2_block_at_indexer", req) + .call_method_with_payload("post_indexer_message", req) .await .unwrap(); diff --git a/indexer/src/config.rs b/indexer/src/config.rs index 2a5cb0dd..4b717de6 100644 --- a/indexer/src/config.rs +++ b/indexer/src/config.rs @@ -1,9 +1,20 @@ +use nomos_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; +#[derive(Debug, Clone, Serialize, Deserialize)] +/// ToDo: Expand if necessary +pub struct ClientConfig { + pub addr: String, + pub auth: Option<(String, Option)>, +} + #[derive(Debug, Clone, Serialize, Deserialize)] /// Note: For individual RPC requests we use Fibonacci backoff retry strategy pub struct IndexerConfig { pub resubscribe_interval_millis: u64, pub start_delay_millis: u64, pub max_retries: usize, + pub bedrock_client_config: ClientConfig, + pub sequencer_client_config: ClientConfig, + pub channel_id: ChannelId, } diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 1b93ff20..2099d55c 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -2,45 +2,49 @@ use std::sync::Arc; use anyhow::Result; use bedrock_client::{BasicAuthCredentials, BedrockClient}; -use common::block::HashableBlockData; +use common::{ + block::HashableBlockData, communication::indexer::Message, + rpc_primitives::requests::PostIndexerMessageResponse, sequencer_client::SequencerClient, +}; use futures::StreamExt; use log::info; use nomos_core::mantle::{ Op, SignedMantleTx, ops::channel::{ChannelId, inscribe::InscriptionOp}, }; -use tokio::sync::{RwLock, mpsc::Sender}; +use tokio::sync::RwLock; use url::Url; -use crate::{config::IndexerConfig, message::Message, state::IndexerState}; +use crate::{config::IndexerConfig, state::IndexerState}; pub mod config; -pub mod message; pub mod state; pub struct IndexerCore { pub bedrock_client: BedrockClient, - pub channel_sender: Sender, + pub sequencer_client: SequencerClient, pub config: IndexerConfig, + // ToDo: Remove this duplication by unifying addr representation in all clients. pub bedrock_url: Url, - pub channel_id: ChannelId, pub state: IndexerState, } impl IndexerCore { - pub fn new( - addr: &str, - auth: Option, - sender: Sender, - config: IndexerConfig, - channel_id: ChannelId, - ) -> Result { + pub fn new(config: IndexerConfig) -> Result { Ok(Self { - bedrock_client: BedrockClient::new(auth)?, - bedrock_url: Url::parse(addr)?, - channel_sender: sender, + bedrock_client: BedrockClient::new( + config + .bedrock_client_config + .auth + .clone() + .map(|auth| BasicAuthCredentials::new(auth.0, auth.1)), + )?, + bedrock_url: Url::parse(&config.bedrock_client_config.addr)?, + sequencer_client: SequencerClient::new_with_auth( + config.sequencer_client_config.addr.clone(), + config.sequencer_client_config.auth.clone(), + )?, config, - channel_id, // No state setup for now, future task. state: IndexerState { latest_seen_block: Arc::new(RwLock::new(0)), @@ -75,8 +79,10 @@ impl IndexerCore { { info!("Extracted L1 block at height {}", block_info.height); - let l2_blocks_parsed = - parse_blocks(l1_block.into_transactions().into_iter(), &self.channel_id); + let l2_blocks_parsed = parse_blocks( + l1_block.into_transactions().into_iter(), + &self.config.channel_id, + ); for l2_block in l2_blocks_parsed { // State modification, will be updated in future @@ -88,14 +94,13 @@ impl IndexerCore { } // Sending data into sequencer, may need to be expanded. - let message = Message::BlockObserved { - l1_block_id: block_info.height, + let message = Message::L2BlockFinalized { l2_block_height: l2_block.block_id, }; - self.channel_sender.send(message.clone()).await?; + let status = self.send_message_to_sequencer(message.clone()).await?; - info!("Sent message {:#?} to sequencer", message); + info!("Sent message {message:#?} to sequencer; status {status:#?}"); } } } @@ -107,6 +112,13 @@ impl IndexerCore { .await; } } + + pub async fn send_message_to_sequencer( + &self, + message: Message, + ) -> Result { + Ok(self.sequencer_client.post_indexer_message(message).await?) + } } fn parse_blocks( diff --git a/indexer/src/message.rs b/indexer/src/message.rs deleted file mode 100644 index afc9cae0..00000000 --- a/indexer/src/message.rs +++ /dev/null @@ -1,7 +0,0 @@ -#[derive(Debug, Clone)] -pub enum Message { - BlockObserved { - l1_block_id: u64, - l2_block_height: u64, - }, -} diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index a2afb11b..074f9a50 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -38,7 +38,6 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init); pub struct TestContext { sequencer_server_handle: ServerHandle, sequencer_loop_handle: JoinHandle>, - indexer_loop_handle: Option>>, sequencer_client: SequencerClient, wallet: WalletCore, _temp_sequencer_dir: TempDir, @@ -82,15 +81,10 @@ impl TestContext { debug!("Test context setup"); - let ( - sequencer_server_handle, - sequencer_addr, - sequencer_loop_handle, - temp_sequencer_dir, - indexer_loop_handle, - ) = Self::setup_sequencer(sequencer_config) - .await - .context("Failed to setup sequencer")?; + let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, temp_sequencer_dir) = + Self::setup_sequencer(sequencer_config) + .await + .context("Failed to setup sequencer")?; // Convert 0.0.0.0 to 127.0.0.1 for client connections // When binding to port 0, the server binds to 0.0.0.0: @@ -111,7 +105,6 @@ impl TestContext { Ok(Self { sequencer_server_handle, sequencer_loop_handle, - indexer_loop_handle, sequencer_client, wallet, _temp_sequencer_dir: temp_sequencer_dir, @@ -121,13 +114,7 @@ impl TestContext { async fn setup_sequencer( mut config: SequencerConfig, - ) -> Result<( - ServerHandle, - SocketAddr, - JoinHandle>, - TempDir, - Option>>, - )> { + ) -> Result<(ServerHandle, SocketAddr, JoinHandle>, TempDir)> { let temp_sequencer_dir = tempfile::tempdir().context("Failed to create temp dir for sequencer home")?; @@ -139,7 +126,7 @@ impl TestContext { // Setting port to 0 lets the OS choose a free port for us config.port = 0; - let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, indexer_loop_handle) = + let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle) = sequencer_runner::startup_sequencer(config).await?; Ok(( @@ -147,7 +134,6 @@ impl TestContext { sequencer_addr, sequencer_loop_handle, temp_sequencer_dir, - indexer_loop_handle, )) } @@ -207,7 +193,6 @@ impl Drop for TestContext { let Self { sequencer_server_handle, sequencer_loop_handle, - indexer_loop_handle, sequencer_client: _, wallet: _, _temp_sequencer_dir, @@ -215,9 +200,6 @@ impl Drop for TestContext { } = self; sequencer_loop_handle.abort(); - if let Some(handle) = indexer_loop_handle { - handle.abort(); - } // Can't wait here as Drop can't be async, but anyway stop signal should be sent sequencer_server_handle.stop(true).now_or_never(); diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index a7f27611..6186e247 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -1,33 +1,34 @@ -use anyhow::Result; -use integration_tests::TestContext; -use log::info; -use tokio::test; +// use anyhow::Result; +// use integration_tests::TestContext; +// use log::info; +// use tokio::test; -#[ignore = "needs complicated setup"] -#[test] -/// To run this test properly, you need nomos node running in the background. -/// For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node). -/// -/// Recommended to run node locally from build binary. -async fn indexer_run_local_node() -> Result<()> { - let ctx = TestContext::new_bedrock_local_attached().await?; +// ToDo: Uncomment when indexer RPC is available +//#[ignore = "needs complicated setup"] +//#[test] +// To run this test properly, you need nomos node running in the background. +// For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node). +// +// Recommended to run node locally from build binary. +// async fn indexer_run_local_node() -> Result<()> { +// let ctx = TestContext::new_bedrock_local_attached().await?; - info!("Let's observe behaviour"); +// info!("Let's observe behaviour"); - tokio::time::sleep(std::time::Duration::from_secs(180)).await; +// tokio::time::sleep(std::time::Duration::from_secs(180)).await; - let gen_id = ctx - .sequencer_client() - .get_last_seen_l2_block_at_indexer() - .await - .unwrap() - .last_block - .unwrap(); +// let gen_id = ctx +// .sequencer_client() +// .get_last_seen_l2_block_at_indexer() +// .await +// .unwrap() +// .last_block +// .unwrap(); - // Checking, that some blocks are landed on bedrock - assert!(gen_id > 0); +// // Checking, that some blocks are landed on bedrock +// assert!(gen_id > 0); - info!("Last seen L2 block at indexer is {gen_id}"); +// info!("Last seen L2 block at indexer is {gen_id}"); - Ok(()) -} +// Ok(()) +// } diff --git a/sequencer_core/Cargo.toml b/sequencer_core/Cargo.toml index 4ef1e8f6..58d7a9b8 100644 --- a/sequencer_core/Cargo.toml +++ b/sequencer_core/Cargo.toml @@ -9,7 +9,6 @@ nssa_core.workspace = true common.workspace = true storage.workspace = true mempool.workspace = true -indexer.workspace = true base58.workspace = true anyhow.workspace = true diff --git a/sequencer_core/src/config.rs b/sequencer_core/src/config.rs index 3ef692d0..a7eecc6c 100644 --- a/sequencer_core/src/config.rs +++ b/sequencer_core/src/config.rs @@ -5,7 +5,6 @@ use std::{ }; use anyhow::Result; -use indexer::config::IndexerConfig; use nomos_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; @@ -63,8 +62,6 @@ pub struct BedrockConfig { pub user: String, /// Bedrock password(optional) pub password: Option, - /// Indexer config - pub indexer_config: IndexerConfig, } impl SequencerConfig { diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index ee658742..561f5ef6 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -9,11 +9,9 @@ use common::{ transaction::{EncodedTransaction, NSSATransaction}, }; use config::SequencerConfig; -use indexer::message::Message; use log::warn; use mempool::{MemPool, MemPoolHandle}; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::Receiver; use crate::{block_settlement_client::BlockSettlementClient, block_store::SequencerBlockStore}; @@ -27,8 +25,6 @@ pub struct SequencerCore { mempool: MemPool, sequencer_config: SequencerConfig, chain_height: u64, - #[expect(unused, reason = "No logic here for now")] - receiver: Option>, block_settlement_client: Option, } @@ -48,10 +44,7 @@ impl std::error::Error for TransactionMalformationError {} impl SequencerCore { /// Start Sequencer from configuration and construct transaction sender - pub fn start_from_config( - config: SequencerConfig, - receiver: Option>, - ) -> (Self, MemPoolHandle) { + pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle) { let hashable_data = HashableBlockData { block_id: config.genesis_id, transactions: vec![], @@ -107,7 +100,6 @@ impl SequencerCore { mempool, chain_height: config.genesis_id, sequencer_config: config, - receiver, block_settlement_client: block_settlement, }; @@ -350,7 +342,7 @@ mod tests { async fn common_setup_with_config( config: SequencerConfig, ) -> (SequencerCore, MemPoolHandle) { - let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config, None); + let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config); let tx = common::test_utils::produce_dummy_empty_transaction(); mempool_handle.push(tx).await.unwrap(); @@ -365,7 +357,7 @@ mod tests { #[test] fn test_start_from_config() { let config = setup_sequencer_config(); - let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone(), None); + let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); assert_eq!(sequencer.chain_height, config.genesis_id); assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10); @@ -424,7 +416,7 @@ mod tests { let initial_accounts = vec![initial_acc1, initial_acc2]; let config = setup_sequencer_config_variable_initial_accounts(initial_accounts); - let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone(), None); + let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); let acc1_account_id = config.initial_accounts[0] .account_id @@ -760,8 +752,7 @@ mod tests { // from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in // the temporary directory for the block storage of this test. { - let (mut sequencer, mempool_handle) = - SequencerCore::start_from_config(config.clone(), None); + let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config.clone()); let signing_key = PrivateKey::try_new([1; 32]).unwrap(); let tx = common::test_utils::create_transaction_native_token_transfer( @@ -786,7 +777,7 @@ mod tests { // Instantiating a new sequencer from the same config. This should load the existing block // with the above transaction and update the state to reflect that. - let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone(), None); + let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); let balance_acc_1 = sequencer.state.get_account_by_id(&acc1_account_id).balance; let balance_acc_2 = sequencer.state.get_account_by_id(&acc2_account_id).balance; diff --git a/sequencer_rpc/Cargo.toml b/sequencer_rpc/Cargo.toml index cf95b009..e971dca8 100644 --- a/sequencer_rpc/Cargo.toml +++ b/sequencer_rpc/Cargo.toml @@ -8,7 +8,6 @@ nssa.workspace = true common.workspace = true mempool.workspace = true sequencer_core.workspace = true -indexer.workspace = true bedrock_client.workspace = true anyhow.workspace = true diff --git a/sequencer_rpc/src/lib.rs b/sequencer_rpc/src/lib.rs index 4f39f957..89b3e8cd 100644 --- a/sequencer_rpc/src/lib.rs +++ b/sequencer_rpc/src/lib.rs @@ -8,7 +8,6 @@ use common::{ rpc_primitives::errors::{RpcError, RpcErrorKind}, transaction::EncodedTransaction, }; -use indexer::state::IndexerState; use mempool::MemPoolHandle; pub use net_utils::*; use sequencer_core::SequencerCore; @@ -21,9 +20,6 @@ use self::types::err_rpc::RpcErr; // ToDo: Add necessary fields pub struct JsonHandler { sequencer_state: Arc>, - // No meaningfull functionality for now. - #[allow(unused)] - indexer_state: Option, mempool_handle: MemPoolHandle, } diff --git a/sequencer_rpc/src/net_utils.rs b/sequencer_rpc/src/net_utils.rs index 0198a8dc..8b9b7e64 100644 --- a/sequencer_rpc/src/net_utils.rs +++ b/sequencer_rpc/src/net_utils.rs @@ -7,7 +7,6 @@ use common::{ transaction::EncodedTransaction, }; use futures::{Future, FutureExt}; -use indexer::state::IndexerState; use log::info; use mempool::MemPoolHandle; use sequencer_core::SequencerCore; @@ -47,7 +46,6 @@ pub fn new_http_server( config: RpcConfig, seuquencer_core: Arc>, mempool_handle: MemPoolHandle, - indexer_core: Option, ) -> io::Result<(actix_web::dev::Server, SocketAddr)> { let RpcConfig { addr, @@ -57,7 +55,6 @@ pub fn new_http_server( info!(target:NETWORK, "Starting HTTP server at {addr}"); let handler = web::Data::new(JsonHandler { sequencer_state: seuquencer_core.clone(), - indexer_state: indexer_core.clone(), mempool_handle, }); diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 99407215..aa1c71c2 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -16,10 +16,10 @@ use common::{ GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, - GetLastSeenL2BlockAtIndexerRequest, GetLastSeenL2BlockResponse, GetProgramIdsRequest, - GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, - GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, - SendTxRequest, SendTxResponse, + GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, + GetProofForCommitmentResponse, GetTransactionByHashRequest, + GetTransactionByHashResponse, HelloRequest, HelloResponse, PostIndexerMessageRequest, + PostIndexerMessageResponse, SendTxRequest, SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, @@ -44,7 +44,7 @@ pub const GET_ACCOUNTS_NONCES: &str = "get_accounts_nonces"; pub const GET_ACCOUNT: &str = "get_account"; pub const GET_PROOF_FOR_COMMITMENT: &str = "get_proof_for_commitment"; pub const GET_PROGRAM_IDS: &str = "get_program_ids"; -pub const GET_LAST_SEEN_L2_BLOCK_AT_INDEXER: &str = "get_last_seen_l2_block_at_indexer"; +pub const POST_INDEXER_MESSAGE: &str = "post_indexer_message"; pub const HELLO_FROM_SEQUENCER: &str = "HELLO_FROM_SEQUENCER"; @@ -315,24 +315,15 @@ impl JsonHandler { respond(response) } - async fn process_get_last_seen_l2_block_at_indexer( - &self, - request: Request, - ) -> Result { - let _get_last_req = GetLastSeenL2BlockAtIndexerRequest::parse(Some(request.params))?; + async fn process_indexer_message(&self, request: Request) -> Result { + let _indexer_post_req = PostIndexerMessageRequest::parse(Some(request.params))?; - let last_block = { - if let Some(indexer_state) = &self.indexer_state { - let last_seen_block = indexer_state.latest_seen_block.read().await; + // ToDo: Add indexer messages handling - Some(*last_seen_block) - } else { - None - } + let response = PostIndexerMessageResponse { + status: "Success".to_string(), }; - let response = GetLastSeenL2BlockResponse { last_block }; - respond(response) } @@ -351,10 +342,7 @@ impl JsonHandler { GET_TRANSACTION_BY_HASH => self.process_get_transaction_by_hash(request).await, GET_PROOF_FOR_COMMITMENT => self.process_get_proof_by_commitment(request).await, GET_PROGRAM_IDS => self.process_get_program_ids(request).await, - GET_LAST_SEEN_L2_BLOCK_AT_INDEXER => { - self.process_get_last_seen_l2_block_at_indexer(request) - .await - } + POST_INDEXER_MESSAGE => self.process_indexer_message(request).await, _ => Err(RpcErr(RpcError::method_not_found(request.method))), } } @@ -366,9 +354,7 @@ mod tests { use base58::ToBase58; use base64::{Engine, engine::general_purpose}; - use bedrock_client::BasicAuthCredentials; use common::{test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction}; - use indexer::{IndexerCore, config::IndexerConfig}; use sequencer_core::{ SequencerCore, config::{AccountInitialData, BedrockConfig, SequencerConfig}, @@ -421,34 +407,14 @@ mod tests { node_url: "http://localhost:8080".to_string(), user: "user".to_string(), password: None, - indexer_config: IndexerConfig { - resubscribe_interval_millis: 100, - start_delay_millis: 1000, - max_retries: 10, - }, }), } } async fn components_for_tests() -> (JsonHandler, Vec, EncodedTransaction) { let config = sequencer_config_for_tests(); - let bedrock_config = config.bedrock_config.clone().unwrap(); - let (sender, receiver) = tokio::sync::mpsc::channel(100); - let indexer_core = IndexerCore::new( - &bedrock_config.node_url, - Some(BasicAuthCredentials::new( - bedrock_config.user.clone(), - bedrock_config.password.clone(), - )), - sender, - bedrock_config.indexer_config.clone(), - bedrock_config.channel_id, - ) - .unwrap(); - - let (mut sequencer_core, mempool_handle) = - SequencerCore::start_from_config(config, Some(receiver)); + let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config); let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone(); let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap(); @@ -478,7 +444,6 @@ mod tests { ( JsonHandler { sequencer_state: sequencer_core, - indexer_state: Some(indexer_core.state.clone()), mempool_handle, }, initial_accounts, diff --git a/sequencer_runner/Cargo.toml b/sequencer_runner/Cargo.toml index d98ecf66..0e0b2b92 100644 --- a/sequencer_runner/Cargo.toml +++ b/sequencer_runner/Cargo.toml @@ -7,7 +7,6 @@ edition = "2024" common.workspace = true sequencer_core = { workspace = true, features = ["testnet"] } sequencer_rpc.workspace = true -indexer.workspace = true bedrock_client.workspace = true clap = { workspace = true, features = ["derive", "env"] } diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index fead0e3f..fd4a6c08 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -2,11 +2,9 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use actix_web::dev::ServerHandle; use anyhow::Result; -use bedrock_client::BasicAuthCredentials; use clap::Parser; use common::rpc_primitives::RpcConfig; -use indexer::IndexerCore; -use log::{error, info}; +use log::info; use sequencer_core::{SequencerCore, config::SequencerConfig}; use sequencer_rpc::new_http_server; use tokio::{sync::Mutex, task::JoinHandle}; @@ -22,51 +20,20 @@ struct Args { pub async fn startup_sequencer( app_config: SequencerConfig, -) -> Result<( - ServerHandle, - SocketAddr, - JoinHandle>, - Option>>, -)> { +) -> Result<(ServerHandle, SocketAddr, JoinHandle>)> { let block_timeout = app_config.block_create_timeout_millis; let port = app_config.port; - // ToDo: Maybe make buffer size configurable. - let (indexer_core, receiver) = if let Some(bedrock_config) = app_config.bedrock_config.clone() { - let (sender, receiver) = tokio::sync::mpsc::channel(100); - - let indexer_core = IndexerCore::new( - &bedrock_config.node_url, - Some(BasicAuthCredentials::new( - bedrock_config.user.clone(), - bedrock_config.password.clone(), - )), - sender, - bedrock_config.indexer_config.clone(), - bedrock_config.channel_id, - )?; - - info!("Indexer core set up"); - - (Some(indexer_core), Some(receiver)) - } else { - info!("Bedrock config not provided, skipping indexer setup"); - - (None, None) - }; - - let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config, receiver); + let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config); info!("Sequencer core set up"); - let indexer_state_wrapped = indexer_core.as_ref().map(|core| core.state.clone()); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); let (http_server, addr) = new_http_server( RpcConfig::with_port(port), Arc::clone(&seq_core_wrapped), mempool_handle, - indexer_state_wrapped, )?; info!("HTTP server started"); let http_server_handle = http_server.handle(); @@ -94,23 +61,7 @@ pub async fn startup_sequencer( } }); - let indexer_loop_handle = indexer_core.map(|indexer_core| { - tokio::spawn(async move { - match indexer_core.subscribe_parse_block_stream().await { - Ok(()) => unreachable!(), - Err(err) => error!("Indexer loop failed with error: {err:#?}"), - } - - Ok(()) - }) - }); - - Ok(( - http_server_handle, - addr, - main_loop_handle, - indexer_loop_handle, - )) + Ok((http_server_handle, addr, main_loop_handle)) } pub async fn main_runner() -> Result<()> { @@ -130,13 +81,9 @@ pub async fn main_runner() -> Result<()> { } // ToDo: Add restart on failures - let (_, _, main_loop_handle, indexer_loop_handle) = startup_sequencer(app_config).await?; + let (_, _, main_loop_handle) = startup_sequencer(app_config).await?; main_loop_handle.await??; - if let Some(indexer_loop_handle) = indexer_loop_handle { - indexer_loop_handle.await??; - } - Ok(()) }