diff --git a/Cargo.lock b/Cargo.lock index 8646b5d8..e2392385 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2893,6 +2893,7 @@ dependencies = [ "serde_json", "sha2", "tokio", + "tokio-retry", "url", ] @@ -5826,6 +5827,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" diff --git a/Cargo.toml b/Cargo.toml index 4a4dc2ef..3fe9c16b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ borsh = "1.5.7" base58 = "0.2.0" itertools = "0.14.0" url = "2.5.4" +tokio-retry = "0.3.0" common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index 71641936..62460209 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -73,6 +73,9 @@ pub struct GetProofForCommitmentRequest { #[derive(Serialize, Deserialize, Debug)] pub struct GetProgramIdsRequest {} +#[derive(Serialize, Deserialize, Debug)] +pub struct GetLastSeenL2BlockAtIndexerRequest {} + parse_request!(HelloRequest); parse_request!(RegisterAccountRequest); parse_request!(SendTxRequest); @@ -87,6 +90,7 @@ parse_request!(GetAccountsNoncesRequest); parse_request!(GetProofForCommitmentRequest); parse_request!(GetAccountRequest); parse_request!(GetProgramIdsRequest); +parse_request!(GetLastSeenL2BlockAtIndexerRequest); #[derive(Serialize, Deserialize, Debug)] pub struct HelloResponse { diff --git a/common/src/sequencer_client.rs b/common/src/sequencer_client.rs index 0cb03f6f..b33c22a5 100644 --- a/common/src/sequencer_client.rs +++ b/common/src/sequencer_client.rs @@ -18,9 +18,10 @@ use crate::{ GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, - GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, - GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, SendTxRequest, SendTxResponse, + GetLastSeenL2BlockAtIndexerRequest, GetProgramIdsRequest, GetProgramIdsResponse, + GetProofForCommitmentRequest, GetProofForCommitmentResponse, + GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest, + SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, @@ -347,4 +348,22 @@ impl SequencerClient { Ok(resp_deser) } + + /// Get last seen l2 block at indexer + pub async fn get_last_seen_l2_block_at_indexer( + &self, + ) -> Result { + let last_req = GetLastSeenL2BlockAtIndexerRequest {}; + + let req = serde_json::to_value(last_req).unwrap(); + + let resp = self + .call_method_with_payload("get_last_seen_l2_block_at_indexer", req) + .await + .unwrap(); + + let resp_deser = serde_json::from_value(resp).unwrap(); + + Ok(resp_deser) + } } diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 6d05bed5..b3ad6f3d 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -31,3 +31,4 @@ indicatif = { version = "0.18.3", features = ["improved_unicode"] } risc0-zkvm.workspace = true url.workspace = true nomos-core.workspace = true +tokio-retry.workspace = true diff --git a/indexer/src/config.rs b/indexer/src/config.rs index 1ab102f8..d37bb423 100644 --- a/indexer/src/config.rs +++ b/indexer/src/config.rs @@ -3,4 +3,6 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct IndexerConfig { pub resubscribe_interval: u64, + pub start_delay: u64, + pub limit_retry: usize, } diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 8fd2c9c5..9b713eee 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -10,6 +10,7 @@ use nomos_core::mantle::{ ops::channel::{ChannelId, inscribe::InscriptionOp}, }; use tokio::sync::{RwLock, mpsc::Sender}; +use tokio_retry::Retry; use url::Url; use crate::{config::IndexerConfig, message::IndexerToSequencerMessage, state::IndexerState}; @@ -49,54 +50,66 @@ impl IndexerCore { } pub async fn subscribe_parse_block_stream(&self) -> Result<()> { - let mut stream_pinned = Box::pin( - self.bedrock_client - .0 - .get_lib_stream(self.bedrock_url.clone()) - .await?, - ); + loop { + let mut stream_pinned = Box::pin( + self.bedrock_client + .0 + .get_lib_stream(self.bedrock_url.clone()) + .await?, + ); - info!("Block stream joined"); + info!("Block stream joined"); - while let Some(block_info) = stream_pinned.next().await { - let header_id = block_info.header_id; + while let Some(block_info) = stream_pinned.next().await { + let header_id = block_info.header_id; - info!("Observed L1 block at height {}", block_info.height); + info!("Observed L1 block at height {}", block_info.height); - if let Some(l1_block) = self - .bedrock_client - .0 - .get_block_by_id(self.bedrock_url.clone(), header_id) + // Simple retry strategy on requests + let strategy = + tokio_retry::strategy::FibonacciBackoff::from_millis(self.config.start_delay) + .take(self.config.limit_retry); + + if let Some(l1_block) = Retry::spawn(strategy, || { + self.bedrock_client + .0 + .get_block_by_id(self.bedrock_url.clone(), header_id) + }) .await? - { - info!("Extracted L1 block at height {}", block_info.height); + { + info!("Extracted L1 block at height {}", block_info.height); - let l2_blocks_parsed = - parse_blocks(l1_block.into_transactions().into_iter(), &self.channel_id); + let l2_blocks_parsed = + parse_blocks(l1_block.into_transactions().into_iter(), &self.channel_id); - for l2_block in l2_blocks_parsed { - // State modification, will be updated in future - { - let mut guard = self.state.latest_seen_block.write().await; - if l2_block.block_id > *guard { - *guard = l2_block.block_id; + for l2_block in l2_blocks_parsed { + // State modification, will be updated in future + { + let mut guard = self.state.latest_seen_block.write().await; + if l2_block.block_id > *guard { + *guard = l2_block.block_id; + } } + + // Sending data into sequencer, may need to be expanded. + let message = IndexerToSequencerMessage::BlockObserved { + l1_block_id: block_info.height, + l2_block_height: l2_block.block_id, + }; + + self.channel_sender.send(message.clone()).await?; + + info!("Sent message {:#?} to sequencer", message); } - - // Sending data into sequencer, may need to be expanded. - let message = IndexerToSequencerMessage::BlockObserved { - l1_block_id: block_info.height, - l2_block_height: l2_block.block_id, - }; - - self.channel_sender.send(message.clone()).await?; - - info!("Sent message {:#?} to sequencer", message); } } - } - Ok(()) + // Refetch stream after delay + tokio::time::sleep(std::time::Duration::from_millis( + self.config.resubscribe_interval, + )) + .await; + } } } diff --git a/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json b/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json index 7212ce36..80013751 100644 --- a/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json +++ b/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json @@ -156,12 +156,14 @@ 37 ], "bedrock_config": { - "channel_id": "2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a", + "channel_id": "0101010101010101010101010101010101010101010101010101010101010101", "node_url": "http://127.0.0.1:8080", "user": "user", "password": null, "indexer_config": { - "resubscribe_interval": 1000 + "resubscribe_interval": 1000, + "start_delay": 1000, + "limit_retry": 10 } } } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index af8758e4..a2afb11b 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -215,12 +215,9 @@ impl Drop for TestContext { } = self; sequencer_loop_handle.abort(); - match indexer_loop_handle { - Some(handle) => { - handle.abort(); - } - None => {} - }; + if let Some(handle) = indexer_loop_handle { + handle.abort(); + } // Can't wait here as Drop can't be async, but anyway stop signal should be sent sequencer_server_handle.stop(true).now_or_never(); diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index d82e8960..12e011ff 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -6,17 +6,21 @@ use tokio::test; #[test] async fn indexer_run_local_node() -> Result<()> { println!("Waiting 20 seconds for L1 node to start producing"); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(std::time::Duration::from_secs(30)).await; let ctx = TestContext::new_bedrock_local_attached().await?; info!("Let's observe behaviour"); - tokio::time::sleep(std::time::Duration::from_secs(300)).await; + tokio::time::sleep(std::time::Duration::from_secs(600)).await; - let gen_id = ctx.sequencer_client().get_genesis_id().await.unwrap(); + let gen_id = ctx + .sequencer_client() + .get_last_seen_l2_block_at_indexer() + .await + .unwrap(); - info!("btw, gen id is {gen_id:?}"); + info!("Last seen L2 block at indexer is {}", gen_id.last_block); Ok(()) } diff --git a/sequencer_core/Cargo.toml b/sequencer_core/Cargo.toml index 85e36632..4ef1e8f6 100644 --- a/sequencer_core/Cargo.toml +++ b/sequencer_core/Cargo.toml @@ -21,7 +21,7 @@ log.workspace = true tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } bedrock_client.workspace = true key-management-system-service.workspace = true -nomos-core.workspace=true +nomos-core.workspace = true rand.workspace = true reqwest.workspace = true borsh.workspace = true diff --git a/sequencer_core/src/block_settlement_client.rs b/sequencer_core/src/block_settlement_client.rs index 1964f37f..ca10695d 100644 --- a/sequencer_core/src/block_settlement_client.rs +++ b/sequencer_core/src/block_settlement_client.rs @@ -26,7 +26,7 @@ impl BlockSettlementClient { .expect("Signing key should load or be created successfully"); let bedrock_node_url = Url::parse(&config.node_url).expect("Bedrock URL should be a valid URL"); - let bedrock_channel_id = ChannelId::from(config.channel_id); + let bedrock_channel_id = config.channel_id; let bedrock_client = BedrockClient::new(None).expect("Bedrock client should be able to initialize"); Self { @@ -78,7 +78,8 @@ impl BlockSettlementClient { pub async fn post_and_wait(&self, block_data: &HashableBlockData) -> Result { let msg_id: MsgId = { let mut this = [0; 32]; - this[0..8].copy_from_slice(&block_data.block_id.to_le_bytes()); + // Bandaid solution + this[0..8].copy_from_slice(&(block_data.block_id - 2).to_le_bytes()); this.into() }; diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 05c6342d..2c0ed866 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -16,10 +16,10 @@ use common::{ GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, - GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, - GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, - SendTxResponse, + GetLastSeenL2BlockAtIndexerRequest, GetProgramIdsRequest, GetProgramIdsResponse, + GetProofForCommitmentRequest, GetProofForCommitmentResponse, + GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, + SendTxRequest, SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, @@ -44,6 +44,7 @@ pub const GET_ACCOUNTS_NONCES: &str = "get_accounts_nonces"; pub const GET_ACCOUNT: &str = "get_account"; pub const GET_PROOF_FOR_COMMITMENT: &str = "get_proof_for_commitment"; pub const GET_PROGRAM_IDS: &str = "get_program_ids"; +pub const GET_LAST_SEEN_L2_BLOCK_AT_INDEXER: &str = "get_last_seen_l2_block_at_indexer"; pub const HELLO_FROM_SEQUENCER: &str = "HELLO_FROM_SEQUENCER"; @@ -314,6 +315,27 @@ impl JsonHandler { respond(response) } + async fn process_get_last_seen_l2_block_at_indexer( + &self, + request: Request, + ) -> Result { + let _get_last_req = GetLastSeenL2BlockAtIndexerRequest::parse(Some(request.params))?; + + let last_block = { + if let Some(indexer_state) = &self.indexer_state { + let state = indexer_state.lock().await; + + *state.state.latest_seen_block.read().await + } else { + 0 + } + }; + + let response = GetLastBlockResponse { last_block }; + + respond(response) + } + pub async fn process_request_internal(&self, request: Request) -> Result { match request.method.as_ref() { HELLO => self.process_temp_hello(request).await, @@ -329,6 +351,10 @@ impl JsonHandler { GET_TRANSACTION_BY_HASH => self.process_get_transaction_by_hash(request).await, GET_PROOF_FOR_COMMITMENT => self.process_get_proof_by_commitment(request).await, GET_PROGRAM_IDS => self.process_get_program_ids(request).await, + GET_LAST_SEEN_L2_BLOCK_AT_INDEXER => { + self.process_get_last_seen_l2_block_at_indexer(request) + .await + } _ => Err(RpcErr(RpcError::method_not_found(request.method))), } } @@ -397,6 +423,8 @@ mod tests { password: None, indexer_config: IndexerConfig { resubscribe_interval: 100, + start_delay: 1000, + limit_retry: 10, }, }), } @@ -415,7 +443,7 @@ mod tests { )), sender, bedrock_config.indexer_config.clone(), - bedrock_config.channel_id.into(), + bedrock_config.channel_id, ) .unwrap(); diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index b65675c4..16303db7 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -43,7 +43,7 @@ pub async fn startup_sequencer( )), sender, bedrock_config.indexer_config.clone(), - bedrock_config.channel_id.into(), + bedrock_config.channel_id, )?; info!("Indexer core set up");