diff --git a/Cargo.toml b/Cargo.toml index a4a2b89..a54b91a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ hmac-sha512 = "1.1.7" chrono = "0.4.41" borsh = "1.5.7" base58 = "0.2.0" +itertools = "0.14.0" rocksdb = { version = "0.21.0", default-features = false, features = [ "snappy", diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index 7149472..f87dc69 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -28,6 +28,13 @@ pub struct GetBlockDataRequest { pub block_id: u64, } +/// Get a range of blocks from `start_block_id` to `end_block_id` (inclusive) +#[derive(Serialize, Deserialize, Debug)] +pub struct GetBlockRangeDataRequest { + pub start_block_id: u64, + pub end_block_id: u64, +} + #[derive(Serialize, Deserialize, Debug)] pub struct GetGenesisIdRequest {} @@ -69,6 +76,7 @@ parse_request!(HelloRequest); parse_request!(RegisterAccountRequest); parse_request!(SendTxRequest); parse_request!(GetBlockDataRequest); +parse_request!(GetBlockRangeDataRequest); parse_request!(GetGenesisIdRequest); parse_request!(GetLastBlockRequest); parse_request!(GetInitialTestnetAccountsRequest); @@ -100,6 +108,11 @@ pub struct GetBlockDataResponse { pub block: Vec, } +#[derive(Serialize, Deserialize, Debug)] +pub struct GetBlockRangeDataResponse { + pub blocks: Vec>, +} + #[derive(Serialize, Deserialize, Debug)] pub struct GetGenesisIdResponse { pub genesis_id: u64, diff --git a/common/src/sequencer_client/mod.rs b/common/src/sequencer_client/mod.rs index a31806e..7a3956b 100644 --- a/common/src/sequencer_client/mod.rs +++ b/common/src/sequencer_client/mod.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, ops::RangeInclusive}; use anyhow::Result; use json::{SendTxRequest, SendTxResponse, SequencerRpcRequest, SequencerRpcResponse}; @@ -14,7 +14,8 @@ use crate::{ error::{SequencerClientError, SequencerRpcError}, rpc_primitives::requests::{ GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, - GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, + GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetLastBlockRequest, + GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, }, @@ -80,6 +81,26 @@ impl SequencerClient { Ok(resp_deser) } + pub async fn get_block_range( + &self, + range: RangeInclusive, + ) -> Result { + let block_req = GetBlockRangeDataRequest { + start_block_id: *range.start(), + end_block_id: *range.end(), + }; + + let req = serde_json::to_value(block_req)?; + + let resp = self + .call_method_with_payload("get_block_range", req) + .await?; + + let resp_deser = serde_json::from_value(resp)?; + + Ok(resp_deser) + } + /// Get last known `blokc_id` from sequencer pub async fn get_last_block(&self) -> Result { let block_req = GetLastBlockRequest {}; diff --git a/integration_tests/configs/debug/wallet/wallet_config.json b/integration_tests/configs/debug/wallet/wallet_config.json index 82f2864..ac4bae8 100644 --- a/integration_tests/configs/debug/wallet/wallet_config.json +++ b/integration_tests/configs/debug/wallet/wallet_config.json @@ -2,9 +2,9 @@ "override_rust_log": null, "sequencer_addr": "http://127.0.0.1:3040", "seq_poll_timeout_millis": 12000, - "seq_poll_max_blocks": 5, + "seq_tx_poll_max_blocks": 5, "seq_poll_max_retries": 5, - "seq_poll_retry_delay_millis": 500, + "seq_block_poll_max_amount": 100, "initial_accounts": [ { "Public": { diff --git a/integration_tests/src/test_suite_map.rs b/integration_tests/src/test_suite_map.rs index 9903345..1c5f91f 100644 --- a/integration_tests/src/test_suite_map.rs +++ b/integration_tests/src/test_suite_map.rs @@ -1646,23 +1646,23 @@ pub fn prepare_function_map() -> HashMap { info!("########## test_modify_config_fields ##########"); let wallet_config = fetch_config().await.unwrap(); - let old_seq_poll_retry_delay_millis = wallet_config.seq_poll_retry_delay_millis; + let old_seq_poll_timeout_millis = wallet_config.seq_poll_timeout_millis; // Change config field let command = Command::Config(ConfigSubcommand::Set { - key: "seq_poll_retry_delay_millis".to_string(), + key: "seq_poll_timeout_millis".to_string(), value: "1000".to_string(), }); wallet::cli::execute_subcommand(command).await.unwrap(); let wallet_config = fetch_config().await.unwrap(); - assert_eq!(wallet_config.seq_poll_retry_delay_millis, 1000); + assert_eq!(wallet_config.seq_poll_timeout_millis, 1000); // Return how it was at the beginning let command = Command::Config(ConfigSubcommand::Set { - key: "seq_poll_retry_delay_millis".to_string(), - value: old_seq_poll_retry_delay_millis.to_string(), + key: "seq_poll_timeout_millis".to_string(), + value: old_seq_poll_timeout_millis.to_string(), }); wallet::cli::execute_subcommand(command).await.unwrap(); diff --git a/sequencer_rpc/Cargo.toml b/sequencer_rpc/Cargo.toml index 242e8b2..395660f 100644 --- a/sequencer_rpc/Cargo.toml +++ b/sequencer_rpc/Cargo.toml @@ -14,6 +14,7 @@ base58.workspace = true hex = "0.4.3" tempfile.workspace = true base64.workspace = true +itertools.workspace = true actix-web.workspace = true tokio.workspace = true diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 23d5edd..387abf2 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -13,7 +13,8 @@ use common::{ requests::{ GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, - GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, + GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, + GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, @@ -23,6 +24,7 @@ use common::{ }, transaction::{EncodedTransaction, NSSATransaction}, }; +use itertools::Itertools as _; use log::warn; use nssa::{self, program::Program}; use sequencer_core::{TransactionMalformationError, config::AccountInitialData}; @@ -33,6 +35,7 @@ use super::{JsonHandler, respond, types::err_rpc::RpcErr}; pub const HELLO: &str = "hello"; pub const SEND_TX: &str = "send_tx"; pub const GET_BLOCK: &str = "get_block"; +pub const GET_BLOCK_RANGE: &str = "get_block_range"; pub const GET_GENESIS: &str = "get_genesis"; pub const GET_LAST_BLOCK: &str = "get_last_block"; pub const GET_ACCOUNT_BALANCE: &str = "get_account_balance"; @@ -120,6 +123,25 @@ impl JsonHandler { respond(response) } + async fn process_get_block_range_data(&self, request: Request) -> Result { + let get_block_req = GetBlockRangeDataRequest::parse(Some(request.params))?; + + let blocks = { + let state = self.sequencer_state.lock().await; + (get_block_req.start_block_id..=get_block_req.end_block_id) + .map(|block_id| state.block_store().get_block_at_id(block_id)) + .map_ok(|block| { + borsh::to_vec(&HashableBlockData::from(block)) + .expect("derived BorshSerialize should never fail") + }) + .collect::, _>>()? + }; + + let response = GetBlockRangeDataResponse { blocks }; + + respond(response) + } + async fn process_get_genesis(&self, request: Request) -> Result { let _get_genesis_req = GetGenesisIdRequest::parse(Some(request.params))?; @@ -297,6 +319,7 @@ impl JsonHandler { HELLO => self.process_temp_hello(request).await, SEND_TX => self.process_send_tx(request).await, GET_BLOCK => self.process_get_block_data(request).await, + GET_BLOCK_RANGE => self.process_get_block_range_data(request).await, GET_GENESIS => self.process_get_genesis(request).await, GET_LAST_BLOCK => self.process_get_last_block(request).await, GET_INITIAL_TESTNET_ACCOUNTS => self.get_initial_testnet_accounts(request).await, diff --git a/wallet/Cargo.toml b/wallet/Cargo.toml index 34fc84c..6f97c63 100644 --- a/wallet/Cargo.toml +++ b/wallet/Cargo.toml @@ -19,7 +19,7 @@ borsh.workspace = true base58.workspace = true hex = "0.4.3" rand.workspace = true -itertools = "0.14.0" +itertools.workspace = true sha2.workspace = true futures.workspace = true async-stream = "0.3.6" diff --git a/wallet/src/chain_storage.rs b/wallet/src/chain_storage.rs index 14e931a..0625fce 100644 --- a/wallet/src/chain_storage.rs +++ b/wallet/src/chain_storage.rs @@ -259,9 +259,9 @@ mod tests { override_rust_log: None, sequencer_addr: "http://127.0.0.1".to_string(), seq_poll_timeout_millis: 12000, - seq_poll_max_blocks: 5, + seq_tx_poll_max_blocks: 5, seq_poll_max_retries: 10, - seq_poll_retry_delay_millis: 500, + seq_block_poll_max_amount: 100, initial_accounts: create_initial_accounts(), } } diff --git a/wallet/src/cli/config.rs b/wallet/src/cli/config.rs index 68670af..df0413e 100644 --- a/wallet/src/cli/config.rs +++ b/wallet/src/cli/config.rs @@ -55,19 +55,19 @@ impl WalletSubcommand for ConfigSubcommand { wallet_core.storage.wallet_config.seq_poll_timeout_millis ); } - "seq_poll_max_blocks" => { - println!("{}", wallet_core.storage.wallet_config.seq_poll_max_blocks); + "seq_tx_poll_max_blocks" => { + println!( + "{}", + wallet_core.storage.wallet_config.seq_tx_poll_max_blocks + ); } "seq_poll_max_retries" => { println!("{}", wallet_core.storage.wallet_config.seq_poll_max_retries); } - "seq_poll_retry_delay_millis" => { + "seq_block_poll_max_amount" => { println!( "{}", - wallet_core - .storage - .wallet_config - .seq_poll_retry_delay_millis + wallet_core.storage.wallet_config.seq_block_poll_max_amount ); } "initial_accounts" => { @@ -89,17 +89,15 @@ impl WalletSubcommand for ConfigSubcommand { wallet_core.storage.wallet_config.seq_poll_timeout_millis = value.parse()?; } - "seq_poll_max_blocks" => { - wallet_core.storage.wallet_config.seq_poll_max_blocks = value.parse()?; + "seq_tx_poll_max_blocks" => { + wallet_core.storage.wallet_config.seq_tx_poll_max_blocks = value.parse()?; } "seq_poll_max_retries" => { wallet_core.storage.wallet_config.seq_poll_max_retries = value.parse()?; } - "seq_poll_retry_delay_millis" => { - wallet_core - .storage - .wallet_config - .seq_poll_retry_delay_millis = value.parse()?; + "seq_block_poll_max_amount" => { + wallet_core.storage.wallet_config.seq_block_poll_max_amount = + value.parse()?; } "initial_accounts" => { anyhow::bail!("Setting this field from wallet is not supported"); @@ -125,19 +123,19 @@ impl WalletSubcommand for ConfigSubcommand { "Sequencer client retry variable: how much time to wait between retries in milliseconds(can be zero)" ); } - "seq_poll_max_blocks" => { + "seq_tx_poll_max_blocks" => { println!( - "Sequencer client polling variable: max number of blocks to poll in parallel" + "Sequencer client polling variable: max number of blocks to poll to find a transaction" ); } "seq_poll_max_retries" => { println!( - "Sequencer client retry variable: MAX number of retries before failing(can be zero)" + "Sequencer client retry variable: max number of retries before failing(can be zero)" ); } - "seq_poll_retry_delay_millis" => { + "seq_block_poll_max_amount" => { println!( - "Sequencer client polling variable: how much time to wait in milliseconds between polling retries(can be zero)" + "Sequencer client polling variable: max number of blocks to request in one polling call" ); } "initial_accounts" => { diff --git a/wallet/src/config.rs b/wallet/src/config.rs index e11359e..ebcf283 100644 --- a/wallet/src/config.rs +++ b/wallet/src/config.rs @@ -135,12 +135,12 @@ pub struct WalletConfig { pub sequencer_addr: String, /// Sequencer polling duration for new blocks in milliseconds pub seq_poll_timeout_millis: u64, - /// Sequencer polling max number of blocks - pub seq_poll_max_blocks: usize, + /// Sequencer polling max number of blocks to find transaction + pub seq_tx_poll_max_blocks: usize, /// Sequencer polling max number error retries pub seq_poll_max_retries: u64, - /// Sequencer polling error retry delay in milliseconds - pub seq_poll_retry_delay_millis: u64, + /// Max amount of blocks to poll in one request + pub seq_block_poll_max_amount: u64, /// Initial accounts for wallet pub initial_accounts: Vec, } @@ -151,9 +151,9 @@ impl Default for WalletConfig { override_rust_log: None, sequencer_addr: "http://127.0.0.1:3040".to_string(), seq_poll_timeout_millis: 12000, - seq_poll_max_blocks: 5, + seq_tx_poll_max_blocks: 5, seq_poll_max_retries: 5, - seq_poll_retry_delay_millis: 500, + seq_block_poll_max_amount: 100, initial_accounts: { let init_acc_json = r#" [ diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index 2886dcd..13812be 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -304,6 +304,8 @@ impl WalletCore { return Ok(()); } + let before_polling = std::time::Instant::now(); + let poller = self.poller.clone(); let mut blocks = std::pin::pin!(poller.poll_block_range(self.last_synced_block + 1..=block_id)); @@ -316,13 +318,13 @@ impl WalletCore { self.last_synced_block = block.block_id; self.store_persistent_data().await?; - - println!( - "Block at id {} with timestamp {} parsed", - block.block_id, block.timestamp, - ); } + println!( + "Synced to block {block_id} in {:?}", + before_polling.elapsed() + ); + Ok(()) } diff --git a/wallet/src/poller.rs b/wallet/src/poller.rs index 0e2192d..a96b1ae 100644 --- a/wallet/src/poller.rs +++ b/wallet/src/poller.rs @@ -9,21 +9,21 @@ use crate::config::WalletConfig; #[derive(Clone)] /// Helperstruct to poll transactions pub struct TxPoller { - pub polling_max_blocks_to_query: usize, - pub polling_max_error_attempts: u64, + polling_max_blocks_to_query: usize, + polling_max_error_attempts: u64, // TODO: This should be Duration - pub polling_error_delay_millis: u64, - pub polling_delay_millis: u64, - pub client: Arc, + polling_delay_millis: u64, + block_poll_max_amount: u64, + client: Arc, } impl TxPoller { pub fn new(config: WalletConfig, client: Arc) -> Self { Self { polling_delay_millis: config.seq_poll_timeout_millis, - polling_max_blocks_to_query: config.seq_poll_max_blocks, + polling_max_blocks_to_query: config.seq_tx_poll_max_blocks, polling_max_error_attempts: config.seq_poll_max_retries, - polling_error_delay_millis: config.seq_poll_retry_delay_millis, + block_poll_max_amount: config.seq_block_poll_max_amount, client: client.clone(), } } @@ -72,11 +72,21 @@ impl TxPoller { range: std::ops::RangeInclusive, ) -> impl futures::Stream> { async_stream::stream! { - for block_id in range { - let block = borsh::from_slice::( - &self.client.get_block(block_id).await?.block, - )?; - yield Ok(block); + let mut chunk_start = *range.start(); + + loop { + let chunk_end = std::cmp::min(chunk_start + self.block_poll_max_amount - 1, *range.end()); + + let blocks = self.client.get_block_range(chunk_start..=chunk_end).await?.blocks; + for block in blocks { + let block = borsh::from_slice::(&block)?; + yield Ok(block); + } + + chunk_start = chunk_end + 1; + if chunk_start > *range.end() { + break; + } } } }