fix: new model rewrite

This commit is contained in:
Pravdyvy 2026-01-22 14:44:48 +02:00
parent eb77217318
commit 2a2fe1347a
20 changed files with 125 additions and 228 deletions

3
Cargo.lock generated
View File

@ -5183,7 +5183,6 @@ dependencies = [
"chrono",
"common",
"futures",
"indexer",
"key-management-system-service",
"log",
"mempool",
@ -5213,7 +5212,6 @@ dependencies = [
"common",
"futures",
"hex",
"indexer",
"itertools 0.14.0",
"log",
"mempool",
@ -5236,7 +5234,6 @@ dependencies = [
"clap",
"common",
"env_logger",
"indexer",
"log",
"sequencer_core",
"sequencer_rpc",

View File

@ -0,0 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message {
L2BlockFinalized { l2_block_height: u64 },
}

View File

@ -0,0 +1 @@
pub mod indexer;

View File

@ -1,4 +1,5 @@
pub mod block;
pub mod communication;
pub mod error;
pub mod rpc_primitives;
pub mod sequencer_client;

View File

@ -74,7 +74,9 @@ pub struct GetProofForCommitmentRequest {
pub struct GetProgramIdsRequest {}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastSeenL2BlockAtIndexerRequest {}
pub struct PostIndexerMessageRequest {
pub message: crate::communication::indexer::Message,
}
parse_request!(HelloRequest);
parse_request!(RegisterAccountRequest);
@ -90,7 +92,7 @@ parse_request!(GetAccountsNoncesRequest);
parse_request!(GetProofForCommitmentRequest);
parse_request!(GetAccountRequest);
parse_request!(GetProgramIdsRequest);
parse_request!(GetLastSeenL2BlockAtIndexerRequest);
parse_request!(PostIndexerMessageRequest);
#[derive(Serialize, Deserialize, Debug)]
pub struct HelloResponse {
@ -222,6 +224,6 @@ pub struct GetInitialTestnetAccountsResponse {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastSeenL2BlockResponse {
pub last_block: Option<u64>,
pub struct PostIndexerMessageResponse {
pub status: String,
}

View File

@ -18,10 +18,10 @@ use crate::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest,
GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse,
GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse,
GetLastSeenL2BlockAtIndexerRequest, GetLastSeenL2BlockResponse, GetProgramIdsRequest,
GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest,
SendTxResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, PostIndexerMessageRequest, PostIndexerMessageResponse,
SendTxRequest, SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction},
@ -350,15 +350,16 @@ impl SequencerClient {
}
/// Get last seen l2 block at indexer
pub async fn get_last_seen_l2_block_at_indexer(
pub async fn post_indexer_message(
&self,
) -> Result<GetLastSeenL2BlockResponse, SequencerClientError> {
let last_req = GetLastSeenL2BlockAtIndexerRequest {};
message: crate::communication::indexer::Message,
) -> Result<PostIndexerMessageResponse, SequencerClientError> {
let last_req = PostIndexerMessageRequest { message };
let req = serde_json::to_value(last_req).unwrap();
let resp = self
.call_method_with_payload("get_last_seen_l2_block_at_indexer", req)
.call_method_with_payload("post_indexer_message", req)
.await
.unwrap();

View File

@ -1,9 +1,20 @@
use nomos_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
/// ToDo: Expand if necessary
pub struct ClientConfig {
pub addr: String,
pub auth: Option<(String, Option<String>)>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Note: For individual RPC requests we use Fibonacci backoff retry strategy
pub struct IndexerConfig {
pub resubscribe_interval_millis: u64,
pub start_delay_millis: u64,
pub max_retries: usize,
pub bedrock_client_config: ClientConfig,
pub sequencer_client_config: ClientConfig,
pub channel_id: ChannelId,
}

View File

@ -2,45 +2,49 @@ use std::sync::Arc;
use anyhow::Result;
use bedrock_client::{BasicAuthCredentials, BedrockClient};
use common::block::HashableBlockData;
use common::{
block::HashableBlockData, communication::indexer::Message,
rpc_primitives::requests::PostIndexerMessageResponse, sequencer_client::SequencerClient,
};
use futures::StreamExt;
use log::info;
use nomos_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::{RwLock, mpsc::Sender};
use tokio::sync::RwLock;
use url::Url;
use crate::{config::IndexerConfig, message::Message, state::IndexerState};
use crate::{config::IndexerConfig, state::IndexerState};
pub mod config;
pub mod message;
pub mod state;
pub struct IndexerCore {
pub bedrock_client: BedrockClient,
pub channel_sender: Sender<Message>,
pub sequencer_client: SequencerClient,
pub config: IndexerConfig,
// ToDo: Remove this duplication by unifying addr representation in all clients.
pub bedrock_url: Url,
pub channel_id: ChannelId,
pub state: IndexerState,
}
impl IndexerCore {
pub fn new(
addr: &str,
auth: Option<BasicAuthCredentials>,
sender: Sender<Message>,
config: IndexerConfig,
channel_id: ChannelId,
) -> Result<Self> {
pub fn new(config: IndexerConfig) -> Result<Self> {
Ok(Self {
bedrock_client: BedrockClient::new(auth)?,
bedrock_url: Url::parse(addr)?,
channel_sender: sender,
bedrock_client: BedrockClient::new(
config
.bedrock_client_config
.auth
.clone()
.map(|auth| BasicAuthCredentials::new(auth.0, auth.1)),
)?,
bedrock_url: Url::parse(&config.bedrock_client_config.addr)?,
sequencer_client: SequencerClient::new_with_auth(
config.sequencer_client_config.addr.clone(),
config.sequencer_client_config.auth.clone(),
)?,
config,
channel_id,
// No state setup for now, future task.
state: IndexerState {
latest_seen_block: Arc::new(RwLock::new(0)),
@ -75,8 +79,10 @@ impl IndexerCore {
{
info!("Extracted L1 block at height {}", block_info.height);
let l2_blocks_parsed =
parse_blocks(l1_block.into_transactions().into_iter(), &self.channel_id);
let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(),
&self.config.channel_id,
);
for l2_block in l2_blocks_parsed {
// State modification, will be updated in future
@ -88,14 +94,13 @@ impl IndexerCore {
}
// Sending data into sequencer, may need to be expanded.
let message = Message::BlockObserved {
l1_block_id: block_info.height,
let message = Message::L2BlockFinalized {
l2_block_height: l2_block.block_id,
};
self.channel_sender.send(message.clone()).await?;
let status = self.send_message_to_sequencer(message.clone()).await?;
info!("Sent message {:#?} to sequencer", message);
info!("Sent message {message:#?} to sequencer; status {status:#?}");
}
}
}
@ -107,6 +112,13 @@ impl IndexerCore {
.await;
}
}
pub async fn send_message_to_sequencer(
&self,
message: Message,
) -> Result<PostIndexerMessageResponse> {
Ok(self.sequencer_client.post_indexer_message(message).await?)
}
}
fn parse_blocks(

View File

@ -1,7 +0,0 @@
#[derive(Debug, Clone)]
pub enum Message {
BlockObserved {
l1_block_id: u64,
l2_block_height: u64,
},
}

View File

@ -38,7 +38,6 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init);
pub struct TestContext {
sequencer_server_handle: ServerHandle,
sequencer_loop_handle: JoinHandle<Result<()>>,
indexer_loop_handle: Option<JoinHandle<Result<()>>>,
sequencer_client: SequencerClient,
wallet: WalletCore,
_temp_sequencer_dir: TempDir,
@ -82,15 +81,10 @@ impl TestContext {
debug!("Test context setup");
let (
sequencer_server_handle,
sequencer_addr,
sequencer_loop_handle,
temp_sequencer_dir,
indexer_loop_handle,
) = Self::setup_sequencer(sequencer_config)
.await
.context("Failed to setup sequencer")?;
let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, temp_sequencer_dir) =
Self::setup_sequencer(sequencer_config)
.await
.context("Failed to setup sequencer")?;
// Convert 0.0.0.0 to 127.0.0.1 for client connections
// When binding to port 0, the server binds to 0.0.0.0:<random_port>
@ -111,7 +105,6 @@ impl TestContext {
Ok(Self {
sequencer_server_handle,
sequencer_loop_handle,
indexer_loop_handle,
sequencer_client,
wallet,
_temp_sequencer_dir: temp_sequencer_dir,
@ -121,13 +114,7 @@ impl TestContext {
async fn setup_sequencer(
mut config: SequencerConfig,
) -> Result<(
ServerHandle,
SocketAddr,
JoinHandle<Result<()>>,
TempDir,
Option<JoinHandle<Result<()>>>,
)> {
) -> Result<(ServerHandle, SocketAddr, JoinHandle<Result<()>>, TempDir)> {
let temp_sequencer_dir =
tempfile::tempdir().context("Failed to create temp dir for sequencer home")?;
@ -139,7 +126,7 @@ impl TestContext {
// Setting port to 0 lets the OS choose a free port for us
config.port = 0;
let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, indexer_loop_handle) =
let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle) =
sequencer_runner::startup_sequencer(config).await?;
Ok((
@ -147,7 +134,6 @@ impl TestContext {
sequencer_addr,
sequencer_loop_handle,
temp_sequencer_dir,
indexer_loop_handle,
))
}
@ -207,7 +193,6 @@ impl Drop for TestContext {
let Self {
sequencer_server_handle,
sequencer_loop_handle,
indexer_loop_handle,
sequencer_client: _,
wallet: _,
_temp_sequencer_dir,
@ -215,9 +200,6 @@ impl Drop for TestContext {
} = self;
sequencer_loop_handle.abort();
if let Some(handle) = indexer_loop_handle {
handle.abort();
}
// Can't wait here as Drop can't be async, but anyway stop signal should be sent
sequencer_server_handle.stop(true).now_or_never();

View File

@ -1,33 +1,34 @@
use anyhow::Result;
use integration_tests::TestContext;
use log::info;
use tokio::test;
// use anyhow::Result;
// use integration_tests::TestContext;
// use log::info;
// use tokio::test;
#[ignore = "needs complicated setup"]
#[test]
/// To run this test properly, you need nomos node running in the background.
/// For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node).
///
/// Recommended to run node locally from build binary.
async fn indexer_run_local_node() -> Result<()> {
let ctx = TestContext::new_bedrock_local_attached().await?;
// ToDo: Uncomment when indexer RPC is available
//#[ignore = "needs complicated setup"]
//#[test]
// To run this test properly, you need nomos node running in the background.
// For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node).
//
// Recommended to run node locally from build binary.
// async fn indexer_run_local_node() -> Result<()> {
// let ctx = TestContext::new_bedrock_local_attached().await?;
info!("Let's observe behaviour");
// info!("Let's observe behaviour");
tokio::time::sleep(std::time::Duration::from_secs(180)).await;
// tokio::time::sleep(std::time::Duration::from_secs(180)).await;
let gen_id = ctx
.sequencer_client()
.get_last_seen_l2_block_at_indexer()
.await
.unwrap()
.last_block
.unwrap();
// let gen_id = ctx
// .sequencer_client()
// .get_last_seen_l2_block_at_indexer()
// .await
// .unwrap()
// .last_block
// .unwrap();
// Checking, that some blocks are landed on bedrock
assert!(gen_id > 0);
// // Checking, that some blocks are landed on bedrock
// assert!(gen_id > 0);
info!("Last seen L2 block at indexer is {gen_id}");
// info!("Last seen L2 block at indexer is {gen_id}");
Ok(())
}
// Ok(())
// }

View File

@ -9,7 +9,6 @@ nssa_core.workspace = true
common.workspace = true
storage.workspace = true
mempool.workspace = true
indexer.workspace = true
base58.workspace = true
anyhow.workspace = true

View File

@ -5,7 +5,6 @@ use std::{
};
use anyhow::Result;
use indexer::config::IndexerConfig;
use nomos_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
@ -63,8 +62,6 @@ pub struct BedrockConfig {
pub user: String,
/// Bedrock password(optional)
pub password: Option<String>,
/// Indexer config
pub indexer_config: IndexerConfig,
}
impl SequencerConfig {

View File

@ -9,11 +9,9 @@ use common::{
transaction::{EncodedTransaction, NSSATransaction},
};
use config::SequencerConfig;
use indexer::message::Message;
use log::warn;
use mempool::{MemPool, MemPoolHandle};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Receiver;
use crate::{block_settlement_client::BlockSettlementClient, block_store::SequencerBlockStore};
@ -27,8 +25,6 @@ pub struct SequencerCore {
mempool: MemPool<EncodedTransaction>,
sequencer_config: SequencerConfig,
chain_height: u64,
#[expect(unused, reason = "No logic here for now")]
receiver: Option<Receiver<Message>>,
block_settlement_client: Option<BlockSettlementClient>,
}
@ -48,10 +44,7 @@ impl std::error::Error for TransactionMalformationError {}
impl SequencerCore {
/// Start Sequencer from configuration and construct transaction sender
pub fn start_from_config(
config: SequencerConfig,
receiver: Option<Receiver<Message>>,
) -> (Self, MemPoolHandle<EncodedTransaction>) {
pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle<EncodedTransaction>) {
let hashable_data = HashableBlockData {
block_id: config.genesis_id,
transactions: vec![],
@ -107,7 +100,6 @@ impl SequencerCore {
mempool,
chain_height: config.genesis_id,
sequencer_config: config,
receiver,
block_settlement_client: block_settlement,
};
@ -350,7 +342,7 @@ mod tests {
async fn common_setup_with_config(
config: SequencerConfig,
) -> (SequencerCore, MemPoolHandle<EncodedTransaction>) {
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config, None);
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config);
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
@ -365,7 +357,7 @@ mod tests {
#[test]
fn test_start_from_config() {
let config = setup_sequencer_config();
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone(), None);
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
assert_eq!(sequencer.chain_height, config.genesis_id);
assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10);
@ -424,7 +416,7 @@ mod tests {
let initial_accounts = vec![initial_acc1, initial_acc2];
let config = setup_sequencer_config_variable_initial_accounts(initial_accounts);
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone(), None);
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
let acc1_account_id = config.initial_accounts[0]
.account_id
@ -760,8 +752,7 @@ mod tests {
// from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in
// the temporary directory for the block storage of this test.
{
let (mut sequencer, mempool_handle) =
SequencerCore::start_from_config(config.clone(), None);
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config.clone());
let signing_key = PrivateKey::try_new([1; 32]).unwrap();
let tx = common::test_utils::create_transaction_native_token_transfer(
@ -786,7 +777,7 @@ mod tests {
// Instantiating a new sequencer from the same config. This should load the existing block
// with the above transaction and update the state to reflect that.
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone(), None);
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
let balance_acc_1 = sequencer.state.get_account_by_id(&acc1_account_id).balance;
let balance_acc_2 = sequencer.state.get_account_by_id(&acc2_account_id).balance;

View File

@ -8,7 +8,6 @@ nssa.workspace = true
common.workspace = true
mempool.workspace = true
sequencer_core.workspace = true
indexer.workspace = true
bedrock_client.workspace = true
anyhow.workspace = true

View File

@ -8,7 +8,6 @@ use common::{
rpc_primitives::errors::{RpcError, RpcErrorKind},
transaction::EncodedTransaction,
};
use indexer::state::IndexerState;
use mempool::MemPoolHandle;
pub use net_utils::*;
use sequencer_core::SequencerCore;
@ -21,9 +20,6 @@ use self::types::err_rpc::RpcErr;
// ToDo: Add necessary fields
pub struct JsonHandler {
sequencer_state: Arc<Mutex<SequencerCore>>,
// No meaningfull functionality for now.
#[allow(unused)]
indexer_state: Option<IndexerState>,
mempool_handle: MemPoolHandle<EncodedTransaction>,
}

View File

@ -7,7 +7,6 @@ use common::{
transaction::EncodedTransaction,
};
use futures::{Future, FutureExt};
use indexer::state::IndexerState;
use log::info;
use mempool::MemPoolHandle;
use sequencer_core::SequencerCore;
@ -47,7 +46,6 @@ pub fn new_http_server(
config: RpcConfig,
seuquencer_core: Arc<Mutex<SequencerCore>>,
mempool_handle: MemPoolHandle<EncodedTransaction>,
indexer_core: Option<IndexerState>,
) -> io::Result<(actix_web::dev::Server, SocketAddr)> {
let RpcConfig {
addr,
@ -57,7 +55,6 @@ pub fn new_http_server(
info!(target:NETWORK, "Starting HTTP server at {addr}");
let handler = web::Data::new(JsonHandler {
sequencer_state: seuquencer_core.clone(),
indexer_state: indexer_core.clone(),
mempool_handle,
});

View File

@ -16,10 +16,10 @@ use common::{
GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
GetLastSeenL2BlockAtIndexerRequest, GetLastSeenL2BlockResponse, GetProgramIdsRequest,
GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse,
SendTxRequest, SendTxResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, HelloRequest, HelloResponse, PostIndexerMessageRequest,
PostIndexerMessageResponse, SendTxRequest, SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction},
@ -44,7 +44,7 @@ pub const GET_ACCOUNTS_NONCES: &str = "get_accounts_nonces";
pub const GET_ACCOUNT: &str = "get_account";
pub const GET_PROOF_FOR_COMMITMENT: &str = "get_proof_for_commitment";
pub const GET_PROGRAM_IDS: &str = "get_program_ids";
pub const GET_LAST_SEEN_L2_BLOCK_AT_INDEXER: &str = "get_last_seen_l2_block_at_indexer";
pub const POST_INDEXER_MESSAGE: &str = "post_indexer_message";
pub const HELLO_FROM_SEQUENCER: &str = "HELLO_FROM_SEQUENCER";
@ -315,24 +315,15 @@ impl JsonHandler {
respond(response)
}
async fn process_get_last_seen_l2_block_at_indexer(
&self,
request: Request,
) -> Result<Value, RpcErr> {
let _get_last_req = GetLastSeenL2BlockAtIndexerRequest::parse(Some(request.params))?;
async fn process_indexer_message(&self, request: Request) -> Result<Value, RpcErr> {
let _indexer_post_req = PostIndexerMessageRequest::parse(Some(request.params))?;
let last_block = {
if let Some(indexer_state) = &self.indexer_state {
let last_seen_block = indexer_state.latest_seen_block.read().await;
// ToDo: Add indexer messages handling
Some(*last_seen_block)
} else {
None
}
let response = PostIndexerMessageResponse {
status: "Success".to_string(),
};
let response = GetLastSeenL2BlockResponse { last_block };
respond(response)
}
@ -351,10 +342,7 @@ impl JsonHandler {
GET_TRANSACTION_BY_HASH => self.process_get_transaction_by_hash(request).await,
GET_PROOF_FOR_COMMITMENT => self.process_get_proof_by_commitment(request).await,
GET_PROGRAM_IDS => self.process_get_program_ids(request).await,
GET_LAST_SEEN_L2_BLOCK_AT_INDEXER => {
self.process_get_last_seen_l2_block_at_indexer(request)
.await
}
POST_INDEXER_MESSAGE => self.process_indexer_message(request).await,
_ => Err(RpcErr(RpcError::method_not_found(request.method))),
}
}
@ -366,9 +354,7 @@ mod tests {
use base58::ToBase58;
use base64::{Engine, engine::general_purpose};
use bedrock_client::BasicAuthCredentials;
use common::{test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction};
use indexer::{IndexerCore, config::IndexerConfig};
use sequencer_core::{
SequencerCore,
config::{AccountInitialData, BedrockConfig, SequencerConfig},
@ -421,34 +407,14 @@ mod tests {
node_url: "http://localhost:8080".to_string(),
user: "user".to_string(),
password: None,
indexer_config: IndexerConfig {
resubscribe_interval_millis: 100,
start_delay_millis: 1000,
max_retries: 10,
},
}),
}
}
async fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) {
let config = sequencer_config_for_tests();
let bedrock_config = config.bedrock_config.clone().unwrap();
let (sender, receiver) = tokio::sync::mpsc::channel(100);
let indexer_core = IndexerCore::new(
&bedrock_config.node_url,
Some(BasicAuthCredentials::new(
bedrock_config.user.clone(),
bedrock_config.password.clone(),
)),
sender,
bedrock_config.indexer_config.clone(),
bedrock_config.channel_id,
)
.unwrap();
let (mut sequencer_core, mempool_handle) =
SequencerCore::start_from_config(config, Some(receiver));
let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config);
let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone();
let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap();
@ -478,7 +444,6 @@ mod tests {
(
JsonHandler {
sequencer_state: sequencer_core,
indexer_state: Some(indexer_core.state.clone()),
mempool_handle,
},
initial_accounts,

View File

@ -7,7 +7,6 @@ edition = "2024"
common.workspace = true
sequencer_core = { workspace = true, features = ["testnet"] }
sequencer_rpc.workspace = true
indexer.workspace = true
bedrock_client.workspace = true
clap = { workspace = true, features = ["derive", "env"] }

View File

@ -2,11 +2,9 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use actix_web::dev::ServerHandle;
use anyhow::Result;
use bedrock_client::BasicAuthCredentials;
use clap::Parser;
use common::rpc_primitives::RpcConfig;
use indexer::IndexerCore;
use log::{error, info};
use log::info;
use sequencer_core::{SequencerCore, config::SequencerConfig};
use sequencer_rpc::new_http_server;
use tokio::{sync::Mutex, task::JoinHandle};
@ -22,51 +20,20 @@ struct Args {
pub async fn startup_sequencer(
app_config: SequencerConfig,
) -> Result<(
ServerHandle,
SocketAddr,
JoinHandle<Result<()>>,
Option<JoinHandle<Result<()>>>,
)> {
) -> Result<(ServerHandle, SocketAddr, JoinHandle<Result<()>>)> {
let block_timeout = app_config.block_create_timeout_millis;
let port = app_config.port;
// ToDo: Maybe make buffer size configurable.
let (indexer_core, receiver) = if let Some(bedrock_config) = app_config.bedrock_config.clone() {
let (sender, receiver) = tokio::sync::mpsc::channel(100);
let indexer_core = IndexerCore::new(
&bedrock_config.node_url,
Some(BasicAuthCredentials::new(
bedrock_config.user.clone(),
bedrock_config.password.clone(),
)),
sender,
bedrock_config.indexer_config.clone(),
bedrock_config.channel_id,
)?;
info!("Indexer core set up");
(Some(indexer_core), Some(receiver))
} else {
info!("Bedrock config not provided, skipping indexer setup");
(None, None)
};
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config, receiver);
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config);
info!("Sequencer core set up");
let indexer_state_wrapped = indexer_core.as_ref().map(|core| core.state.clone());
let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core));
let (http_server, addr) = new_http_server(
RpcConfig::with_port(port),
Arc::clone(&seq_core_wrapped),
mempool_handle,
indexer_state_wrapped,
)?;
info!("HTTP server started");
let http_server_handle = http_server.handle();
@ -94,23 +61,7 @@ pub async fn startup_sequencer(
}
});
let indexer_loop_handle = indexer_core.map(|indexer_core| {
tokio::spawn(async move {
match indexer_core.subscribe_parse_block_stream().await {
Ok(()) => unreachable!(),
Err(err) => error!("Indexer loop failed with error: {err:#?}"),
}
Ok(())
})
});
Ok((
http_server_handle,
addr,
main_loop_handle,
indexer_loop_handle,
))
Ok((http_server_handle, addr, main_loop_handle))
}
pub async fn main_runner() -> Result<()> {
@ -130,13 +81,9 @@ pub async fn main_runner() -> Result<()> {
}
// ToDo: Add restart on failures
let (_, _, main_loop_handle, indexer_loop_handle) = startup_sequencer(app_config).await?;
let (_, _, main_loop_handle) = startup_sequencer(app_config).await?;
main_loop_handle.await??;
if let Some(indexer_loop_handle) = indexer_loop_handle {
indexer_loop_handle.await??;
}
Ok(())
}