fix: suggestions 1

This commit is contained in:
Pravdyvy 2026-01-21 14:50:29 +02:00
parent b143beef37
commit eb77217318
16 changed files with 108 additions and 66 deletions

View File

@ -105,7 +105,7 @@ jobs:
env:
RISC0_DEV_MODE: "1"
RUST_LOG: "info"
run: cargo nextest run --no-fail-fast -- --skip tps_test --skip indexer_run_local_node
run: cargo nextest run --no-fail-fast -- --skip tps_test
valid-proof-test:
runs-on: ubuntu-latest

7
Cargo.lock generated
View File

@ -956,8 +956,14 @@ name = "bedrock_client"
version = "0.1.0"
dependencies = [
"anyhow",
"broadcast-service",
"common-http-client",
"futures",
"log",
"nomos-core",
"reqwest",
"tokio-retry",
"url",
]
[[package]]
@ -2877,7 +2883,6 @@ dependencies = [
"nomos-core",
"serde",
"tokio",
"tokio-retry",
"url",
]

View File

@ -85,6 +85,7 @@ tokio-retry = "0.3.0"
common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
nomos-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
rocksdb = { version = "0.24.0", default-features = false, features = [
"snappy",

View File

@ -7,3 +7,9 @@ edition = "2024"
reqwest.workspace = true
anyhow.workspace = true
common-http-client.workspace = true
nomos-core.workspace = true
broadcast-service.workspace = true
url.workspace = true
futures.workspace = true
tokio-retry.workspace = true
log.workspace = true

View File

@ -1,7 +1,13 @@
use anyhow::Result;
use broadcast_service::BlockInfo;
use common_http_client::CommonHttpClient;
pub use common_http_client::{BasicAuthCredentials, Error};
use futures::{Stream, TryFutureExt};
use log::warn;
use nomos_core::{block::Block, header::HeaderId, mantle::SignedMantleTx};
use reqwest::Client;
use tokio_retry::Retry;
use url::Url;
// Simple wrapper
// maybe extend in the future for our purposes
@ -18,4 +24,26 @@ impl BedrockClient {
client, auth,
)))
}
pub async fn get_lib_stream(&self, url: Url) -> Result<impl Stream<Item = BlockInfo>, Error> {
self.0.get_lib_stream(url).await
}
pub async fn get_block_by_id(
&self,
url: &Url,
header_id: HeaderId,
start_delay_millis: u64,
max_retries: usize,
) -> Result<Option<Block<SignedMantleTx>>, Error> {
let strategy = tokio_retry::strategy::FibonacciBackoff::from_millis(start_delay_millis)
.take(max_retries);
Retry::spawn(strategy, || {
self.0
.get_block_by_id(url.clone(), header_id)
.inspect_err(|err| warn!("Block fetching failed with err: {err:#?}"))
})
.await
}
}

View File

@ -220,3 +220,8 @@ pub struct GetInitialTestnetAccountsResponse {
pub account_id: String,
pub balance: u64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastSeenL2BlockResponse {
pub last_block: Option<u64>,
}

View File

@ -18,8 +18,8 @@ use crate::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest,
GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse,
GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse,
GetLastSeenL2BlockAtIndexerRequest, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetLastSeenL2BlockAtIndexerRequest, GetLastSeenL2BlockResponse, GetProgramIdsRequest,
GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest,
SendTxResponse,
},
@ -352,7 +352,7 @@ impl SequencerClient {
/// Get last seen l2 block at indexer
pub async fn get_last_seen_l2_block_at_indexer(
&self,
) -> Result<GetLastBlockResponse, SequencerClientError> {
) -> Result<GetLastSeenL2BlockResponse, SequencerClientError> {
let last_req = GetLastSeenL2BlockAtIndexerRequest {};
let req = serde_json::to_value(last_req).unwrap();

View File

@ -15,4 +15,3 @@ borsh.workspace = true
futures.workspace = true
url.workspace = true
nomos-core.workspace = true
tokio-retry.workspace = true

View File

@ -1,8 +1,9 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Note: For individual RPC requests we use Fibonacci backoff retry strategy
pub struct IndexerConfig {
pub resubscribe_interval: u64,
pub start_delay: u64,
pub limit_retry: usize,
pub resubscribe_interval_millis: u64,
pub start_delay_millis: u64,
pub max_retries: usize,
}

View File

@ -3,17 +3,16 @@ use std::sync::Arc;
use anyhow::Result;
use bedrock_client::{BasicAuthCredentials, BedrockClient};
use common::block::HashableBlockData;
use futures::{StreamExt, TryFutureExt};
use log::{info, warn};
use futures::StreamExt;
use log::info;
use nomos_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::{RwLock, mpsc::Sender};
use tokio_retry::Retry;
use url::Url;
use crate::{config::IndexerConfig, message::IndexerToSequencerMessage, state::IndexerState};
use crate::{config::IndexerConfig, message::Message, state::IndexerState};
pub mod config;
pub mod message;
@ -21,7 +20,7 @@ pub mod state;
pub struct IndexerCore {
pub bedrock_client: BedrockClient,
pub channel_sender: Sender<IndexerToSequencerMessage>,
pub channel_sender: Sender<Message>,
pub config: IndexerConfig,
pub bedrock_url: Url,
pub channel_id: ChannelId,
@ -32,7 +31,7 @@ impl IndexerCore {
pub fn new(
addr: &str,
auth: Option<BasicAuthCredentials>,
sender: Sender<IndexerToSequencerMessage>,
sender: Sender<Message>,
config: IndexerConfig,
channel_id: ChannelId,
) -> Result<Self> {
@ -53,7 +52,6 @@ impl IndexerCore {
loop {
let mut stream_pinned = Box::pin(
self.bedrock_client
.0
.get_lib_stream(self.bedrock_url.clone())
.await?,
);
@ -65,18 +63,15 @@ impl IndexerCore {
info!("Observed L1 block at height {}", block_info.height);
// Simple retry strategy on requests
let strategy =
tokio_retry::strategy::FibonacciBackoff::from_millis(self.config.start_delay)
.take(self.config.limit_retry);
if let Some(l1_block) = Retry::spawn(strategy, || {
self.bedrock_client
.0
.get_block_by_id(self.bedrock_url.clone(), header_id)
.inspect_err(|err| warn!("Block fetching failed with err: {err:#?}"))
})
.await?
if let Some(l1_block) = self
.bedrock_client
.get_block_by_id(
&self.bedrock_url,
header_id,
self.config.start_delay_millis,
self.config.max_retries,
)
.await?
{
info!("Extracted L1 block at height {}", block_info.height);
@ -93,7 +88,7 @@ impl IndexerCore {
}
// Sending data into sequencer, may need to be expanded.
let message = IndexerToSequencerMessage::BlockObserved {
let message = Message::BlockObserved {
l1_block_id: block_info.height,
l2_block_height: l2_block.block_id,
};
@ -107,33 +102,29 @@ impl IndexerCore {
// Refetch stream after delay
tokio::time::sleep(std::time::Duration::from_millis(
self.config.resubscribe_interval,
self.config.resubscribe_interval_millis,
))
.await;
}
}
}
pub fn parse_blocks(
fn parse_blocks(
block_txs: impl Iterator<Item = SignedMantleTx>,
decoded_channel_id: &ChannelId,
) -> Vec<HashableBlockData> {
block_txs
.flat_map(|tx| {
tx.mantle_tx
.ops
.iter()
.filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == decoded_channel_id => {
borsh::from_slice::<HashableBlockData>(inscription).ok()
}
_ => None,
})
.collect::<Vec<_>>()
tx.mantle_tx.ops.into_iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == *decoded_channel_id => {
borsh::from_slice::<HashableBlockData>(&inscription).ok()
}
_ => None,
})
})
.collect()
}

View File

@ -1,5 +1,5 @@
#[derive(Debug, Clone)]
pub enum IndexerToSequencerMessage {
pub enum Message {
BlockObserved {
l1_block_id: u64,
l2_block_height: u64,

View File

@ -161,9 +161,9 @@
"user": "user",
"password": null,
"indexer_config": {
"resubscribe_interval": 1000,
"start_delay": 1000,
"limit_retry": 10
"resubscribe_interval_millis": 1000,
"start_delay_millis": 1000,
"max_retries": 10
}
}
}

View File

@ -3,7 +3,12 @@ use integration_tests::TestContext;
use log::info;
use tokio::test;
#[ignore = "needs complicated setup"]
#[test]
/// To run this test properly, you need nomos node running in the background.
/// For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node).
///
/// Recommended to run node locally from build binary.
async fn indexer_run_local_node() -> Result<()> {
let ctx = TestContext::new_bedrock_local_attached().await?;
@ -15,12 +20,14 @@ async fn indexer_run_local_node() -> Result<()> {
.sequencer_client()
.get_last_seen_l2_block_at_indexer()
.await
.unwrap()
.last_block
.unwrap();
// Checking, that some blocks are landed on bedrock
assert!(gen_id.last_block > 0);
assert!(gen_id > 0);
info!("Last seen L2 block at indexer is {}", gen_id.last_block);
info!("Last seen L2 block at indexer is {gen_id}");
Ok(())
}

View File

@ -9,7 +9,7 @@ use common::{
transaction::{EncodedTransaction, NSSATransaction},
};
use config::SequencerConfig;
use indexer::message::IndexerToSequencerMessage;
use indexer::message::Message;
use log::warn;
use mempool::{MemPool, MemPoolHandle};
use serde::{Deserialize, Serialize};
@ -27,9 +27,8 @@ pub struct SequencerCore {
mempool: MemPool<EncodedTransaction>,
sequencer_config: SequencerConfig,
chain_height: u64,
// No logic here for now
#[allow(unused)]
receiver: Option<Receiver<IndexerToSequencerMessage>>,
#[expect(unused, reason = "No logic here for now")]
receiver: Option<Receiver<Message>>,
block_settlement_client: Option<BlockSettlementClient>,
}
@ -51,7 +50,7 @@ impl SequencerCore {
/// Start Sequencer from configuration and construct transaction sender
pub fn start_from_config(
config: SequencerConfig,
receiver: Option<Receiver<IndexerToSequencerMessage>>,
receiver: Option<Receiver<Message>>,
) -> (Self, MemPoolHandle<EncodedTransaction>) {
let hashable_data = HashableBlockData {
block_id: config.genesis_id,

View File

@ -16,8 +16,8 @@ use common::{
GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
GetLastSeenL2BlockAtIndexerRequest, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetLastSeenL2BlockAtIndexerRequest, GetLastSeenL2BlockResponse, GetProgramIdsRequest,
GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse,
SendTxRequest, SendTxResponse,
},
@ -325,13 +325,13 @@ impl JsonHandler {
if let Some(indexer_state) = &self.indexer_state {
let last_seen_block = indexer_state.latest_seen_block.read().await;
*last_seen_block
Some(*last_seen_block)
} else {
0
None
}
};
let response = GetLastBlockResponse { last_block };
let response = GetLastSeenL2BlockResponse { last_block };
respond(response)
}
@ -422,9 +422,9 @@ mod tests {
user: "user".to_string(),
password: None,
indexer_config: IndexerConfig {
resubscribe_interval: 100,
start_delay: 1000,
limit_retry: 10,
resubscribe_interval_millis: 100,
start_delay_millis: 1000,
max_retries: 10,
},
}),
}

View File

@ -134,8 +134,8 @@ pub async fn main_runner() -> Result<()> {
main_loop_handle.await??;
if indexer_loop_handle.is_some() {
indexer_loop_handle.unwrap().await??;
if let Some(indexer_loop_handle) = indexer_loop_handle {
indexer_loop_handle.await??;
}
Ok(())