Fix some errors

This commit is contained in:
Daniil Polyakov 2026-01-31 01:39:59 +03:00
parent 71787a70f7
commit a87db600cf
5 changed files with 35 additions and 31 deletions

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use anyhow::Result;
use bedrock_client::BedrockClient;
use common::block::{Block, };
use common::block::Block;
use futures::StreamExt;
use log::info;
use logos_blockchain_core::mantle::{
@ -38,9 +38,7 @@ impl IndexerCore {
})
}
pub async fn subscribe_parse_block_stream(
&self,
) -> impl futures::Stream<Item = Result<Block>> {
pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> {
async_stream::stream! {
loop {
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);

View File

@ -63,7 +63,7 @@ async fn run_server(config_path: PathBuf, port: u16) -> Result<jsonrpsee::server
let service = indexer_service::service::IndexerService::new(config)
.context("Failed to initialize indexer service")?;
server.start(service.into_rpc())
}?;
};
#[cfg(feature = "mock-responses")]
let handle = server.start(
indexer_service::mock_service::MockIndexerService::new_with_mock_blocks().into_rpc(),

View File

@ -1,4 +1,4 @@
use std::{fmt::Display, time::Instant};
use std::{fmt::Display, sync::Arc, time::Instant};
use anyhow::Result;
#[cfg(feature = "testnet")]
@ -19,7 +19,7 @@ mod block_settlement_client;
pub mod block_store;
pub mod config;
type IndexerClient = jsonrpsee::http_client::HttpClient;
type IndexerClient = Arc<jsonrpsee::ws_client::WsClient>;
pub struct SequencerCore {
state: nssa::V02State,
@ -52,7 +52,9 @@ impl SequencerCore {
/// assumed to represent the correct latest state consistent with Bedrock-finalized data.
/// If no database is found, the sequencer performs a fresh start from genesis,
/// initializing its state with the accounts defined in the configuration file.
pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle<EncodedTransaction>) {
pub async fn start_from_config(
config: SequencerConfig,
) -> (Self, MemPoolHandle<EncodedTransaction>) {
let hashable_data = HashableBlockData {
block_id: config.genesis_id,
transactions: vec![],
@ -116,9 +118,12 @@ impl SequencerCore {
.expect("Block settlement client should be constructible")
});
let indexer_client = jsonrpsee::http_client::HttpClientBuilder::default()
.build(config.indexer_rpc_url.clone())
.expect("Failed to create Indexer client");
let indexer_client = Arc::new(
jsonrpsee::ws_client::WsClientBuilder::default()
.build(config.indexer_rpc_url.clone())
.await
.expect("Failed to create Indexer client"),
);
let sequencer_core = Self {
state,
@ -275,8 +280,8 @@ impl SequencerCore {
self.block_settlement_client.clone()
}
pub fn indexer_client(&self) -> &IndexerClient {
&self.indexer_client
pub fn indexer_client(&self) -> IndexerClient {
Arc::clone(&self.indexer_client)
}
}
@ -387,7 +392,7 @@ mod tests {
async fn common_setup_with_config(
config: SequencerConfig,
) -> (SequencerCore, MemPoolHandle<EncodedTransaction>) {
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config);
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config).await;
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
@ -399,10 +404,10 @@ mod tests {
(sequencer, mempool_handle)
}
#[test]
fn test_start_from_config() {
#[tokio::test]
async fn test_start_from_config() {
let config = setup_sequencer_config();
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()).await;
assert_eq!(sequencer.chain_height, config.genesis_id);
assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10);
@ -436,8 +441,8 @@ mod tests {
assert_eq!(20000, balance_acc_2);
}
#[test]
fn test_start_different_intial_accounts_balances() {
#[tokio::test]
async fn test_start_different_intial_accounts_balances() {
let acc1_account_id: Vec<u8> = vec![
27, 132, 197, 86, 123, 18, 100, 64, 153, 93, 62, 213, 170, 186, 5, 101, 215, 30, 24,
52, 96, 72, 25, 255, 156, 23, 245, 233, 213, 221, 7, 143,
@ -461,7 +466,7 @@ mod tests {
let initial_accounts = vec![initial_acc1, initial_acc2];
let config = setup_sequencer_config_variable_initial_accounts(initial_accounts);
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()).await;
let acc1_account_id = config.initial_accounts[0]
.account_id
@ -788,7 +793,8 @@ mod tests {
// from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in
// the temporary directory for the block storage of this test.
{
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config.clone());
let (mut sequencer, mempool_handle) =
SequencerCore::start_from_config(config.clone()).await;
let signing_key = PrivateKey::try_new([1; 32]).unwrap();
let tx = common::test_utils::create_transaction_native_token_transfer(
@ -810,7 +816,7 @@ mod tests {
// Instantiating a new sequencer from the same config. This should load the existing block
// with the above transaction and update the state to reflect that.
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()).await;
let balance_acc_1 = sequencer.state.get_account_by_id(&acc1_account_id).balance;
let balance_acc_2 = sequencer.state.get_account_by_id(&acc2_account_id).balance;
@ -825,10 +831,10 @@ mod tests {
);
}
#[test]
fn test_get_pending_blocks() {
#[tokio::test]
async fn test_get_pending_blocks() {
let config = setup_sequencer_config();
let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config);
let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config).await;
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
@ -841,10 +847,10 @@ mod tests {
assert_eq!(sequencer.get_pending_blocks().unwrap().len(), 4);
}
#[test]
fn test_delete_blocks() {
#[tokio::test]
async fn test_delete_blocks() {
let config = setup_sequencer_config();
let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config);
let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config).await;
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();

View File

@ -407,7 +407,7 @@ mod tests {
async fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) {
let config = sequencer_config_for_tests();
let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config);
let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config).await;
let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone();
let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap();

View File

@ -91,7 +91,7 @@ pub async fn startup_sequencer(
Duration::from_millis(app_config.retry_pending_blocks_timeout_millis);
let port = app_config.port;
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config);
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config).await;
info!("Sequencer core set up");
@ -185,7 +185,7 @@ async fn retry_pending_blocks_loop(
async fn listen_for_bedrock_blocks_loop(seq_core: Arc<Mutex<SequencerCore>>) -> Result<Never> {
use indexer_service_rpc::RpcClient as _;
let indexer_client = seq_core.lock().await.indexer_client().clone();
let indexer_client = seq_core.lock().await.indexer_client();
loop {
// TODO: Subscribe from the first pending block ID?