fix: indexer update

This commit is contained in:
Pravdyvy 2026-01-16 16:15:21 +02:00
parent c2e09031e1
commit b96865ab89
14 changed files with 144 additions and 60 deletions

12
Cargo.lock generated
View File

@ -2893,6 +2893,7 @@ dependencies = [
"serde_json",
"sha2",
"tokio",
"tokio-retry",
"url",
]
@ -5826,6 +5827,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-retry"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
dependencies = [
"pin-project",
"rand 0.8.5",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.26.4"

View File

@ -80,6 +80,7 @@ borsh = "1.5.7"
base58 = "0.2.0"
itertools = "0.14.0"
url = "2.5.4"
tokio-retry = "0.3.0"
common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }

View File

@ -73,6 +73,9 @@ pub struct GetProofForCommitmentRequest {
#[derive(Serialize, Deserialize, Debug)]
pub struct GetProgramIdsRequest {}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastSeenL2BlockAtIndexerRequest {}
parse_request!(HelloRequest);
parse_request!(RegisterAccountRequest);
parse_request!(SendTxRequest);
@ -87,6 +90,7 @@ parse_request!(GetAccountsNoncesRequest);
parse_request!(GetProofForCommitmentRequest);
parse_request!(GetAccountRequest);
parse_request!(GetProgramIdsRequest);
parse_request!(GetLastSeenL2BlockAtIndexerRequest);
#[derive(Serialize, Deserialize, Debug)]
pub struct HelloResponse {

View File

@ -18,9 +18,10 @@ use crate::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest,
GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse,
GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, SendTxRequest, SendTxResponse,
GetLastSeenL2BlockAtIndexerRequest, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest,
SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction},
@ -347,4 +348,22 @@ impl SequencerClient {
Ok(resp_deser)
}
/// Get last seen l2 block at indexer
pub async fn get_last_seen_l2_block_at_indexer(
&self,
) -> Result<GetLastBlockResponse, SequencerClientError> {
let last_req = GetLastSeenL2BlockAtIndexerRequest {};
let req = serde_json::to_value(last_req).unwrap();
let resp = self
.call_method_with_payload("get_last_seen_l2_block_at_indexer", req)
.await
.unwrap();
let resp_deser = serde_json::from_value(resp).unwrap();
Ok(resp_deser)
}
}

View File

@ -31,3 +31,4 @@ indicatif = { version = "0.18.3", features = ["improved_unicode"] }
risc0-zkvm.workspace = true
url.workspace = true
nomos-core.workspace = true
tokio-retry.workspace = true

View File

@ -3,4 +3,6 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexerConfig {
pub resubscribe_interval: u64,
pub start_delay: u64,
pub limit_retry: usize,
}

View File

@ -10,6 +10,7 @@ use nomos_core::mantle::{
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::{RwLock, mpsc::Sender};
use tokio_retry::Retry;
use url::Url;
use crate::{config::IndexerConfig, message::IndexerToSequencerMessage, state::IndexerState};
@ -49,54 +50,66 @@ impl IndexerCore {
}
pub async fn subscribe_parse_block_stream(&self) -> Result<()> {
let mut stream_pinned = Box::pin(
self.bedrock_client
.0
.get_lib_stream(self.bedrock_url.clone())
.await?,
);
loop {
let mut stream_pinned = Box::pin(
self.bedrock_client
.0
.get_lib_stream(self.bedrock_url.clone())
.await?,
);
info!("Block stream joined");
info!("Block stream joined");
while let Some(block_info) = stream_pinned.next().await {
let header_id = block_info.header_id;
while let Some(block_info) = stream_pinned.next().await {
let header_id = block_info.header_id;
info!("Observed L1 block at height {}", block_info.height);
info!("Observed L1 block at height {}", block_info.height);
if let Some(l1_block) = self
.bedrock_client
.0
.get_block_by_id(self.bedrock_url.clone(), header_id)
// Simple retry strategy on requests
let strategy =
tokio_retry::strategy::FibonacciBackoff::from_millis(self.config.start_delay)
.take(self.config.limit_retry);
if let Some(l1_block) = Retry::spawn(strategy, || {
self.bedrock_client
.0
.get_block_by_id(self.bedrock_url.clone(), header_id)
})
.await?
{
info!("Extracted L1 block at height {}", block_info.height);
{
info!("Extracted L1 block at height {}", block_info.height);
let l2_blocks_parsed =
parse_blocks(l1_block.into_transactions().into_iter(), &self.channel_id);
let l2_blocks_parsed =
parse_blocks(l1_block.into_transactions().into_iter(), &self.channel_id);
for l2_block in l2_blocks_parsed {
// State modification, will be updated in future
{
let mut guard = self.state.latest_seen_block.write().await;
if l2_block.block_id > *guard {
*guard = l2_block.block_id;
for l2_block in l2_blocks_parsed {
// State modification, will be updated in future
{
let mut guard = self.state.latest_seen_block.write().await;
if l2_block.block_id > *guard {
*guard = l2_block.block_id;
}
}
// Sending data into sequencer, may need to be expanded.
let message = IndexerToSequencerMessage::BlockObserved {
l1_block_id: block_info.height,
l2_block_height: l2_block.block_id,
};
self.channel_sender.send(message.clone()).await?;
info!("Sent message {:#?} to sequencer", message);
}
// Sending data into sequencer, may need to be expanded.
let message = IndexerToSequencerMessage::BlockObserved {
l1_block_id: block_info.height,
l2_block_height: l2_block.block_id,
};
self.channel_sender.send(message.clone()).await?;
info!("Sent message {:#?} to sequencer", message);
}
}
}
Ok(())
// Refetch stream after delay
tokio::time::sleep(std::time::Duration::from_millis(
self.config.resubscribe_interval,
))
.await;
}
}
}

View File

@ -156,12 +156,14 @@
37
],
"bedrock_config": {
"channel_id": "2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a",
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"node_url": "http://127.0.0.1:8080",
"user": "user",
"password": null,
"indexer_config": {
"resubscribe_interval": 1000
"resubscribe_interval": 1000,
"start_delay": 1000,
"limit_retry": 10
}
}
}

View File

@ -215,12 +215,9 @@ impl Drop for TestContext {
} = self;
sequencer_loop_handle.abort();
match indexer_loop_handle {
Some(handle) => {
handle.abort();
}
None => {}
};
if let Some(handle) = indexer_loop_handle {
handle.abort();
}
// Can't wait here as Drop can't be async, but anyway stop signal should be sent
sequencer_server_handle.stop(true).now_or_never();

View File

@ -6,17 +6,21 @@ use tokio::test;
#[test]
async fn indexer_run_local_node() -> Result<()> {
println!("Waiting 20 seconds for L1 node to start producing");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
let ctx = TestContext::new_bedrock_local_attached().await?;
info!("Let's observe behaviour");
tokio::time::sleep(std::time::Duration::from_secs(300)).await;
tokio::time::sleep(std::time::Duration::from_secs(600)).await;
let gen_id = ctx.sequencer_client().get_genesis_id().await.unwrap();
let gen_id = ctx
.sequencer_client()
.get_last_seen_l2_block_at_indexer()
.await
.unwrap();
info!("btw, gen id is {gen_id:?}");
info!("Last seen L2 block at indexer is {}", gen_id.last_block);
Ok(())
}

View File

@ -21,7 +21,7 @@ log.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
bedrock_client.workspace = true
key-management-system-service.workspace = true
nomos-core.workspace=true
nomos-core.workspace = true
rand.workspace = true
reqwest.workspace = true
borsh.workspace = true

View File

@ -26,7 +26,7 @@ impl BlockSettlementClient {
.expect("Signing key should load or be created successfully");
let bedrock_node_url =
Url::parse(&config.node_url).expect("Bedrock URL should be a valid URL");
let bedrock_channel_id = ChannelId::from(config.channel_id);
let bedrock_channel_id = config.channel_id;
let bedrock_client =
BedrockClient::new(None).expect("Bedrock client should be able to initialize");
Self {
@ -78,7 +78,8 @@ impl BlockSettlementClient {
pub async fn post_and_wait(&self, block_data: &HashableBlockData) -> Result<u64> {
let msg_id: MsgId = {
let mut this = [0; 32];
this[0..8].copy_from_slice(&block_data.block_id.to_le_bytes());
// Bandaid solution
this[0..8].copy_from_slice(&(block_data.block_id - 2).to_le_bytes());
this.into()
};

View File

@ -16,10 +16,10 @@ use common::{
GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest,
SendTxResponse,
GetLastSeenL2BlockAtIndexerRequest, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse,
SendTxRequest, SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction},
@ -44,6 +44,7 @@ pub const GET_ACCOUNTS_NONCES: &str = "get_accounts_nonces";
pub const GET_ACCOUNT: &str = "get_account";
pub const GET_PROOF_FOR_COMMITMENT: &str = "get_proof_for_commitment";
pub const GET_PROGRAM_IDS: &str = "get_program_ids";
pub const GET_LAST_SEEN_L2_BLOCK_AT_INDEXER: &str = "get_last_seen_l2_block_at_indexer";
pub const HELLO_FROM_SEQUENCER: &str = "HELLO_FROM_SEQUENCER";
@ -314,6 +315,27 @@ impl JsonHandler {
respond(response)
}
async fn process_get_last_seen_l2_block_at_indexer(
&self,
request: Request,
) -> Result<Value, RpcErr> {
let _get_last_req = GetLastSeenL2BlockAtIndexerRequest::parse(Some(request.params))?;
let last_block = {
if let Some(indexer_state) = &self.indexer_state {
let state = indexer_state.lock().await;
*state.state.latest_seen_block.read().await
} else {
0
}
};
let response = GetLastBlockResponse { last_block };
respond(response)
}
pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> {
match request.method.as_ref() {
HELLO => self.process_temp_hello(request).await,
@ -329,6 +351,10 @@ impl JsonHandler {
GET_TRANSACTION_BY_HASH => self.process_get_transaction_by_hash(request).await,
GET_PROOF_FOR_COMMITMENT => self.process_get_proof_by_commitment(request).await,
GET_PROGRAM_IDS => self.process_get_program_ids(request).await,
GET_LAST_SEEN_L2_BLOCK_AT_INDEXER => {
self.process_get_last_seen_l2_block_at_indexer(request)
.await
}
_ => Err(RpcErr(RpcError::method_not_found(request.method))),
}
}
@ -397,6 +423,8 @@ mod tests {
password: None,
indexer_config: IndexerConfig {
resubscribe_interval: 100,
start_delay: 1000,
limit_retry: 10,
},
}),
}
@ -415,7 +443,7 @@ mod tests {
)),
sender,
bedrock_config.indexer_config.clone(),
bedrock_config.channel_id.into(),
bedrock_config.channel_id,
)
.unwrap();

View File

@ -43,7 +43,7 @@ pub async fn startup_sequencer(
)),
sender,
bedrock_config.indexer_config.clone(),
bedrock_config.channel_id.into(),
bedrock_config.channel_id,
)?;
info!("Indexer core set up");