From b0a48883e1da576f5db7bd94ba11e0817e62fd80 Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Wed, 29 Apr 2026 21:48:57 +0300 Subject: [PATCH] chore: add new blocks every 30 seconds to mock indexer service Co-authored-by: Copilot --- indexer/service/src/mock_service.rs | 400 +++++++++++++++++++--------- 1 file changed, 271 insertions(+), 129 deletions(-) diff --git a/indexer/service/src/mock_service.rs b/indexer/service/src/mock_service.rs index 09ae96f5..c4a099b8 100644 --- a/indexer/service/src/mock_service.rs +++ b/indexer/service/src/mock_service.rs @@ -6,7 +6,7 @@ clippy::integer_division_remainder_used, reason = "Mock service uses intentional casts and format patterns for test data generation" )] -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc, time::Duration}; use indexer_service_protocol::{ Account, AccountId, BedrockStatus, Block, BlockBody, BlockHeader, BlockId, Commitment, @@ -19,15 +19,73 @@ use jsonrpsee::{ core::{SubscriptionResult, async_trait}, types::ErrorObjectOwned, }; +use tokio::sync::{RwLock, broadcast}; -/// A mock implementation of the `IndexerService` RPC for testing purposes. -pub struct MockIndexerService { +const MOCK_GENESIS_TIMESTAMP_MS: u64 = 1_704_067_200_000; +const MOCK_BLOCK_INTERVAL_MS: u64 = 30_000; + +struct MockState { blocks: Vec, accounts: HashMap, + account_ids: Vec, transactions: HashMap, } +/// A mock implementation of the `IndexerService` RPC for testing purposes. +pub struct MockIndexerService { + state: Arc>, + finalized_blocks_tx: broadcast::Sender, +} + impl MockIndexerService { + fn spawn_block_generation_task( + state: Arc>, + finalized_blocks_tx: broadcast::Sender, + ) { + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + + let new_block = { + let mut state = state.write().await; + + let next_block_id = state + .blocks + .last() + .map_or(1, |block| block.header.block_id.saturating_add(1)); + let prev_hash = state + .blocks + .last() + .map_or(HashType([0_u8; 32]), |block| block.header.hash); + let timestamp = state.blocks.last().map_or( + MOCK_GENESIS_TIMESTAMP_MS + MOCK_BLOCK_INTERVAL_MS, + |block| { + block + .header + .timestamp + .saturating_add(MOCK_BLOCK_INTERVAL_MS) + }, + ); + + let block = build_mock_block( + next_block_id, + prev_hash, + timestamp, + &state.account_ids, + BedrockStatus::Finalized, + ); + + index_block_transactions(&mut state.transactions, &block); + state.blocks.push(block.clone()); + + block + }; + + let _res = finalized_blocks_tx.send(new_block); + } + }); + } + #[must_use] pub fn new_with_mock_blocks() -> Self { let mut blocks = Vec::new(); @@ -59,119 +117,38 @@ impl MockIndexerService { let mut prev_hash = HashType([0_u8; 32]); for block_id in 1..=100 { - let block_hash = { - let mut hash = [0_u8; 32]; - hash[0] = block_id as u8; - hash[1] = 0xff; - HashType(hash) - }; - - // Create 2-4 transactions per block (mix of Public, PrivacyPreserving, and - // ProgramDeployment) - let num_txs = 2 + (block_id % 3); - let mut block_transactions = Vec::new(); - - for tx_idx in 0..num_txs { - let tx_hash = { - let mut hash = [0_u8; 32]; - hash[0] = block_id as u8; - hash[1] = tx_idx as u8; - HashType(hash) - }; - - // Vary transaction types: Public, PrivacyPreserving, or ProgramDeployment - let tx = match (block_id + tx_idx) % 5 { - // Public transactions (most common) - 0 | 1 => Transaction::Public(PublicTransaction { - hash: tx_hash, - message: PublicMessage { - program_id: ProgramId([1_u32; 8]), - account_ids: vec![ - account_ids[tx_idx as usize % account_ids.len()], - account_ids[(tx_idx as usize + 1) % account_ids.len()], - ], - nonces: vec![block_id as u128, (block_id + 1) as u128], - instruction_data: vec![1, 2, 3, 4], - }, - witness_set: WitnessSet { - signatures_and_public_keys: vec![], - proof: None, - }, - }), - // PrivacyPreserving transactions - 2 | 3 => Transaction::PrivacyPreserving(PrivacyPreservingTransaction { - hash: tx_hash, - message: PrivacyPreservingMessage { - public_account_ids: vec![ - account_ids[tx_idx as usize % account_ids.len()], - ], - nonces: vec![block_id as u128], - public_post_states: vec![Account { - program_owner: ProgramId([1_u32; 8]), - balance: 500, - data: Data(vec![0xdd, 0xee]), - nonce: block_id as u128, - }], - encrypted_private_post_states: vec![EncryptedAccountData { - ciphertext: indexer_service_protocol::Ciphertext(vec![ - 0x01, 0x02, 0x03, 0x04, - ]), - epk: indexer_service_protocol::EphemeralPublicKey(vec![0xaa; 32]), - view_tag: 42, - }], - new_commitments: vec![Commitment([block_id as u8; 32])], - new_nullifiers: vec![( - indexer_service_protocol::Nullifier([tx_idx as u8; 32]), - CommitmentSetDigest([0xff; 32]), - )], - block_validity_window: ValidityWindow((None, None)), - timestamp_validity_window: ValidityWindow((None, None)), - }, - witness_set: WitnessSet { - signatures_and_public_keys: vec![], - proof: Some(indexer_service_protocol::Proof(vec![0; 32])), - }, - }), - // ProgramDeployment transactions (rare) - _ => Transaction::ProgramDeployment(ProgramDeploymentTransaction { - hash: tx_hash, - message: ProgramDeploymentMessage { - bytecode: vec![0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00], /* WASM magic number */ - }, - }), - }; - - transactions.insert(tx_hash, (tx.clone(), block_id)); - block_transactions.push(tx); - } - - let block = Block { - header: BlockHeader { - block_id, - prev_block_hash: prev_hash, - hash: block_hash, - timestamp: 1_704_067_200_000 + (block_id * 12_000), // ~12 seconds per block - signature: Signature([0_u8; 64]), - }, - body: BlockBody { - transactions: block_transactions, - }, - bedrock_status: match block_id { + let block = build_mock_block( + block_id, + prev_hash, + MOCK_GENESIS_TIMESTAMP_MS + (block_id * MOCK_BLOCK_INTERVAL_MS), + &account_ids, + match block_id { 0..=5 => BedrockStatus::Finalized, 6..=8 => BedrockStatus::Safe, _ => BedrockStatus::Pending, }, - bedrock_parent_id: MantleMsgId([0; 32]), - }; + ); - prev_hash = block_hash; + index_block_transactions(&mut transactions, &block); + + prev_hash = block.header.hash; blocks.push(block); } - Self { + let state = Arc::new(RwLock::new(MockState { blocks, accounts, + account_ids, transactions, + })); + + let (finalized_blocks_tx, _) = broadcast::channel(32); + + Self::spawn_block_generation_task(Arc::clone(&state), finalized_blocks_tx.clone()); + + Self { + state, + finalized_blocks_tx, } } } @@ -183,21 +160,45 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { subscription_sink: jsonrpsee::PendingSubscriptionSink, ) -> SubscriptionResult { let sink = subscription_sink.accept().await?; - for block in self - .blocks - .iter() - .filter(|b| b.bedrock_status == BedrockStatus::Finalized) - { + let initial_finalized_blocks: Vec = { + let state = self.state.read().await; + state + .blocks + .iter() + .filter(|b| b.bedrock_status == BedrockStatus::Finalized) + .cloned() + .collect() + }; + + for block in &initial_finalized_blocks { let json = serde_json::value::to_raw_value(block).unwrap(); sink.send(json).await?; } + + let mut receiver = self.finalized_blocks_tx.subscribe(); + loop { + match receiver.recv().await { + Ok(block) => { + let json = serde_json::value::to_raw_value(&block).unwrap(); + sink.send(json).await?; + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + } + } + Ok(()) } async fn get_last_finalized_block_id(&self) -> Result { - self.blocks - .last() - .map(|bl| bl.header.block_id) + self.state + .read() + .await + .blocks + .iter() + .rev() + .find(|block| block.bedrock_status == BedrockStatus::Finalized) + .map(|block| block.header.block_id) .ok_or_else(|| { ErrorObjectOwned::owned(-32001, "Last block not found".to_owned(), None::<()>) }) @@ -205,6 +206,9 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { async fn get_block_by_id(&self, block_id: BlockId) -> Result, ErrorObjectOwned> { Ok(self + .state + .read() + .await .blocks .iter() .find(|b| b.header.block_id == block_id) @@ -216,6 +220,9 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { block_hash: HashType, ) -> Result, ErrorObjectOwned> { Ok(self + .state + .read() + .await .blocks .iter() .find(|b| b.header.hash == block_hash) @@ -223,7 +230,10 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { } async fn get_account(&self, account_id: AccountId) -> Result { - self.accounts + self.state + .read() + .await + .accounts .get(&account_id) .cloned() .ok_or_else(|| ErrorObjectOwned::owned(-32001, "Account not found", None::<()>)) @@ -233,7 +243,13 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { &self, tx_hash: HashType, ) -> Result, ErrorObjectOwned> { - Ok(self.transactions.get(&tx_hash).map(|(tx, _)| tx.clone())) + Ok(self + .state + .read() + .await + .transactions + .get(&tx_hash) + .map(|(tx, _)| tx.clone())) } async fn get_blocks( @@ -241,15 +257,17 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { before: Option, limit: u64, ) -> Result, ErrorObjectOwned> { + let state = self.state.read().await; + let start_id = before.map_or_else( - || self.blocks.len(), + || state.blocks.len(), |id| usize::try_from(id.saturating_sub(1)).expect("u64 should fit in usize"), ); let result = (1..=start_id) .rev() .take(limit as usize) - .map_while(|block_id| self.blocks.get(block_id - 1).cloned()) + .map_while(|block_id| state.blocks.get(block_id - 1).cloned()) .collect(); Ok(result) @@ -261,20 +279,24 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { offset: u64, limit: u64, ) -> Result, ErrorObjectOwned> { - let mut account_txs: Vec<_> = self - .transactions - .values() - .filter(|(tx, _)| match tx { - Transaction::Public(pub_tx) => pub_tx.message.account_ids.contains(&account_id), - Transaction::PrivacyPreserving(priv_tx) => { - priv_tx.message.public_account_ids.contains(&account_id) - } - Transaction::ProgramDeployment(_) => false, - }) - .collect(); + let mut account_txs: Vec<(Transaction, BlockId)> = { + let state = self.state.read().await; + state + .transactions + .values() + .filter(|(tx, _)| match tx { + Transaction::Public(pub_tx) => pub_tx.message.account_ids.contains(&account_id), + Transaction::PrivacyPreserving(priv_tx) => { + priv_tx.message.public_account_ids.contains(&account_id) + } + Transaction::ProgramDeployment(_) => false, + }) + .cloned() + .collect() + }; // Sort by block ID descending (most recent first) - account_txs.sort_by_key(|b| std::cmp::Reverse(b.1)); + account_txs.sort_by_key(|(_, block_id)| std::cmp::Reverse(*block_id)); let start = offset as usize; if start >= account_txs.len() { @@ -293,3 +315,123 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { Ok(()) } } + +fn build_mock_block( + block_id: BlockId, + prev_hash: HashType, + timestamp: u64, + account_ids: &[AccountId], + bedrock_status: BedrockStatus, +) -> Block { + let block_hash = { + let mut hash = [0_u8; 32]; + hash[0] = block_id as u8; + hash[1] = 0xff; + HashType(hash) + }; + + // Create 2-4 transactions per block (mix of Public, PrivacyPreserving, and ProgramDeployment) + let num_txs = 2 + (block_id % 3); + let mut block_transactions = Vec::new(); + + for tx_idx in 0..num_txs { + let tx_hash = { + let mut hash = [0_u8; 32]; + hash[0] = block_id as u8; + hash[1] = tx_idx as u8; + HashType(hash) + }; + + // Vary transaction types: Public, PrivacyPreserving, or ProgramDeployment + let tx = match (block_id + tx_idx) % 5 { + // Public transactions (most common) + 0 | 1 => Transaction::Public(PublicTransaction { + hash: tx_hash, + message: PublicMessage { + program_id: ProgramId([1_u32; 8]), + account_ids: vec![ + account_ids[tx_idx as usize % account_ids.len()], + account_ids[(tx_idx as usize + 1) % account_ids.len()], + ], + nonces: vec![block_id as u128, (block_id + 1) as u128], + instruction_data: vec![1, 2, 3, 4], + }, + witness_set: WitnessSet { + signatures_and_public_keys: vec![], + proof: None, + }, + }), + // PrivacyPreserving transactions + 2 | 3 => Transaction::PrivacyPreserving(PrivacyPreservingTransaction { + hash: tx_hash, + message: PrivacyPreservingMessage { + public_account_ids: vec![account_ids[tx_idx as usize % account_ids.len()]], + nonces: vec![block_id as u128], + public_post_states: vec![Account { + program_owner: ProgramId([1_u32; 8]), + balance: 500, + data: Data(vec![0xdd, 0xee]), + nonce: block_id as u128, + }], + encrypted_private_post_states: vec![EncryptedAccountData { + ciphertext: indexer_service_protocol::Ciphertext(vec![ + 0x01, 0x02, 0x03, 0x04, + ]), + epk: indexer_service_protocol::EphemeralPublicKey(vec![0xaa; 32]), + view_tag: 42, + }], + new_commitments: vec![Commitment([block_id as u8; 32])], + new_nullifiers: vec![( + indexer_service_protocol::Nullifier([tx_idx as u8; 32]), + CommitmentSetDigest([0xff; 32]), + )], + block_validity_window: ValidityWindow((None, None)), + timestamp_validity_window: ValidityWindow((None, None)), + }, + witness_set: WitnessSet { + signatures_and_public_keys: vec![], + proof: Some(indexer_service_protocol::Proof(vec![0; 32])), + }, + }), + // ProgramDeployment transactions (rare) + _ => Transaction::ProgramDeployment(ProgramDeploymentTransaction { + hash: tx_hash, + message: ProgramDeploymentMessage { + bytecode: vec![0x00, 0x61, 0x73, 0x6d, 0x01, 0x00, 0x00, 0x00], /* WASM magic + * number */ + }, + }), + }; + + block_transactions.push(tx); + } + + Block { + header: BlockHeader { + block_id, + prev_block_hash: prev_hash, + hash: block_hash, + timestamp, + signature: Signature([0_u8; 64]), + }, + body: BlockBody { + transactions: block_transactions, + }, + bedrock_status, + bedrock_parent_id: MantleMsgId([0; 32]), + } +} + +fn index_block_transactions( + transactions: &mut HashMap, + block: &Block, +) { + for tx in &block.body.transactions { + let tx_hash = match tx { + Transaction::Public(public_tx) => public_tx.hash, + Transaction::PrivacyPreserving(private_tx) => private_tx.hash, + Transaction::ProgramDeployment(deployment_tx) => deployment_tx.hash, + }; + transactions.insert(tx_hash, (tx.clone(), block.header.block_id)); + } +}