feat: implement multiple blocks polling

This commit is contained in:
Daniil Polyakov 2025-12-04 14:55:45 +03:00
parent 91c898f19c
commit 6b26811229
13 changed files with 124 additions and 55 deletions

View File

@ -46,6 +46,7 @@ hmac-sha512 = "1.1.7"
chrono = "0.4.41"
borsh = "1.5.7"
base58 = "0.2.0"
itertools = "0.14.0"
rocksdb = { version = "0.21.0", default-features = false, features = [
"snappy",

View File

@ -28,6 +28,13 @@ pub struct GetBlockDataRequest {
pub block_id: u64,
}
/// Get a range of blocks from `start_block_id` to `end_block_id` (inclusive)
#[derive(Serialize, Deserialize, Debug)]
pub struct GetBlockRangeDataRequest {
pub start_block_id: u64,
pub end_block_id: u64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdRequest {}
@ -69,6 +76,7 @@ parse_request!(HelloRequest);
parse_request!(RegisterAccountRequest);
parse_request!(SendTxRequest);
parse_request!(GetBlockDataRequest);
parse_request!(GetBlockRangeDataRequest);
parse_request!(GetGenesisIdRequest);
parse_request!(GetLastBlockRequest);
parse_request!(GetInitialTestnetAccountsRequest);
@ -100,6 +108,11 @@ pub struct GetBlockDataResponse {
pub block: Vec<u8>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetBlockRangeDataResponse {
pub blocks: Vec<Vec<u8>>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdResponse {
pub genesis_id: u64,

View File

@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, ops::RangeInclusive};
use anyhow::Result;
use json::{SendTxRequest, SendTxResponse, SequencerRpcRequest, SequencerRpcResponse};
@ -14,7 +14,8 @@ use crate::{
error::{SequencerClientError, SequencerRpcError},
rpc_primitives::requests::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse,
GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse,
GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetLastBlockRequest,
GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse,
},
@ -80,6 +81,26 @@ impl SequencerClient {
Ok(resp_deser)
}
pub async fn get_block_range(
&self,
range: RangeInclusive<u64>,
) -> Result<GetBlockRangeDataResponse, SequencerClientError> {
let block_req = GetBlockRangeDataRequest {
start_block_id: *range.start(),
end_block_id: *range.end(),
};
let req = serde_json::to_value(block_req)?;
let resp = self
.call_method_with_payload("get_block_range", req)
.await?;
let resp_deser = serde_json::from_value(resp)?;
Ok(resp_deser)
}
/// Get last known `blokc_id` from sequencer
pub async fn get_last_block(&self) -> Result<GetLastBlockResponse, SequencerClientError> {
let block_req = GetLastBlockRequest {};

View File

@ -2,9 +2,9 @@
"override_rust_log": null,
"sequencer_addr": "http://127.0.0.1:3040",
"seq_poll_timeout_millis": 12000,
"seq_poll_max_blocks": 5,
"seq_tx_poll_max_blocks": 5,
"seq_poll_max_retries": 5,
"seq_poll_retry_delay_millis": 500,
"seq_block_poll_max_amount": 100,
"initial_accounts": [
{
"Public": {

View File

@ -1646,23 +1646,23 @@ pub fn prepare_function_map() -> HashMap<String, TestFunction> {
info!("########## test_modify_config_fields ##########");
let wallet_config = fetch_config().await.unwrap();
let old_seq_poll_retry_delay_millis = wallet_config.seq_poll_retry_delay_millis;
let old_seq_poll_timeout_millis = wallet_config.seq_poll_timeout_millis;
// Change config field
let command = Command::Config(ConfigSubcommand::Set {
key: "seq_poll_retry_delay_millis".to_string(),
key: "seq_poll_timeout_millis".to_string(),
value: "1000".to_string(),
});
wallet::cli::execute_subcommand(command).await.unwrap();
let wallet_config = fetch_config().await.unwrap();
assert_eq!(wallet_config.seq_poll_retry_delay_millis, 1000);
assert_eq!(wallet_config.seq_poll_timeout_millis, 1000);
// Return how it was at the beginning
let command = Command::Config(ConfigSubcommand::Set {
key: "seq_poll_retry_delay_millis".to_string(),
value: old_seq_poll_retry_delay_millis.to_string(),
key: "seq_poll_timeout_millis".to_string(),
value: old_seq_poll_timeout_millis.to_string(),
});
wallet::cli::execute_subcommand(command).await.unwrap();

View File

@ -14,6 +14,7 @@ base58.workspace = true
hex = "0.4.3"
tempfile.workspace = true
base64.workspace = true
itertools.workspace = true
actix-web.workspace = true
tokio.workspace = true

View File

@ -13,7 +13,8 @@ use common::{
requests::{
GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest,
GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse,
GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
@ -23,6 +24,7 @@ use common::{
},
transaction::{EncodedTransaction, NSSATransaction},
};
use itertools::Itertools as _;
use log::warn;
use nssa::{self, program::Program};
use sequencer_core::{TransactionMalformationError, config::AccountInitialData};
@ -33,6 +35,7 @@ use super::{JsonHandler, respond, types::err_rpc::RpcErr};
pub const HELLO: &str = "hello";
pub const SEND_TX: &str = "send_tx";
pub const GET_BLOCK: &str = "get_block";
pub const GET_BLOCK_RANGE: &str = "get_block_range";
pub const GET_GENESIS: &str = "get_genesis";
pub const GET_LAST_BLOCK: &str = "get_last_block";
pub const GET_ACCOUNT_BALANCE: &str = "get_account_balance";
@ -120,6 +123,25 @@ impl JsonHandler {
respond(response)
}
async fn process_get_block_range_data(&self, request: Request) -> Result<Value, RpcErr> {
let get_block_req = GetBlockRangeDataRequest::parse(Some(request.params))?;
let blocks = {
let state = self.sequencer_state.lock().await;
(get_block_req.start_block_id..=get_block_req.end_block_id)
.map(|block_id| state.block_store().get_block_at_id(block_id))
.map_ok(|block| {
borsh::to_vec(&HashableBlockData::from(block))
.expect("derived BorshSerialize should never fail")
})
.collect::<Result<Vec<_>, _>>()?
};
let response = GetBlockRangeDataResponse { blocks };
respond(response)
}
async fn process_get_genesis(&self, request: Request) -> Result<Value, RpcErr> {
let _get_genesis_req = GetGenesisIdRequest::parse(Some(request.params))?;
@ -297,6 +319,7 @@ impl JsonHandler {
HELLO => self.process_temp_hello(request).await,
SEND_TX => self.process_send_tx(request).await,
GET_BLOCK => self.process_get_block_data(request).await,
GET_BLOCK_RANGE => self.process_get_block_range_data(request).await,
GET_GENESIS => self.process_get_genesis(request).await,
GET_LAST_BLOCK => self.process_get_last_block(request).await,
GET_INITIAL_TESTNET_ACCOUNTS => self.get_initial_testnet_accounts(request).await,

View File

@ -19,7 +19,7 @@ borsh.workspace = true
base58.workspace = true
hex = "0.4.3"
rand.workspace = true
itertools = "0.14.0"
itertools.workspace = true
sha2.workspace = true
futures.workspace = true
async-stream = "0.3.6"

View File

@ -259,9 +259,9 @@ mod tests {
override_rust_log: None,
sequencer_addr: "http://127.0.0.1".to_string(),
seq_poll_timeout_millis: 12000,
seq_poll_max_blocks: 5,
seq_tx_poll_max_blocks: 5,
seq_poll_max_retries: 10,
seq_poll_retry_delay_millis: 500,
seq_block_poll_max_amount: 100,
initial_accounts: create_initial_accounts(),
}
}

View File

@ -55,19 +55,19 @@ impl WalletSubcommand for ConfigSubcommand {
wallet_core.storage.wallet_config.seq_poll_timeout_millis
);
}
"seq_poll_max_blocks" => {
println!("{}", wallet_core.storage.wallet_config.seq_poll_max_blocks);
"seq_tx_poll_max_blocks" => {
println!(
"{}",
wallet_core.storage.wallet_config.seq_tx_poll_max_blocks
);
}
"seq_poll_max_retries" => {
println!("{}", wallet_core.storage.wallet_config.seq_poll_max_retries);
}
"seq_poll_retry_delay_millis" => {
"seq_block_poll_max_amount" => {
println!(
"{}",
wallet_core
.storage
.wallet_config
.seq_poll_retry_delay_millis
wallet_core.storage.wallet_config.seq_block_poll_max_amount
);
}
"initial_accounts" => {
@ -89,17 +89,15 @@ impl WalletSubcommand for ConfigSubcommand {
wallet_core.storage.wallet_config.seq_poll_timeout_millis =
value.parse()?;
}
"seq_poll_max_blocks" => {
wallet_core.storage.wallet_config.seq_poll_max_blocks = value.parse()?;
"seq_tx_poll_max_blocks" => {
wallet_core.storage.wallet_config.seq_tx_poll_max_blocks = value.parse()?;
}
"seq_poll_max_retries" => {
wallet_core.storage.wallet_config.seq_poll_max_retries = value.parse()?;
}
"seq_poll_retry_delay_millis" => {
wallet_core
.storage
.wallet_config
.seq_poll_retry_delay_millis = value.parse()?;
"seq_block_poll_max_amount" => {
wallet_core.storage.wallet_config.seq_block_poll_max_amount =
value.parse()?;
}
"initial_accounts" => {
anyhow::bail!("Setting this field from wallet is not supported");
@ -125,19 +123,19 @@ impl WalletSubcommand for ConfigSubcommand {
"Sequencer client retry variable: how much time to wait between retries in milliseconds(can be zero)"
);
}
"seq_poll_max_blocks" => {
"seq_tx_poll_max_blocks" => {
println!(
"Sequencer client polling variable: max number of blocks to poll in parallel"
"Sequencer client polling variable: max number of blocks to poll to find a transaction"
);
}
"seq_poll_max_retries" => {
println!(
"Sequencer client retry variable: MAX number of retries before failing(can be zero)"
"Sequencer client retry variable: max number of retries before failing(can be zero)"
);
}
"seq_poll_retry_delay_millis" => {
"seq_block_poll_max_amount" => {
println!(
"Sequencer client polling variable: how much time to wait in milliseconds between polling retries(can be zero)"
"Sequencer client polling variable: max number of blocks to request in one polling call"
);
}
"initial_accounts" => {

View File

@ -135,12 +135,12 @@ pub struct WalletConfig {
pub sequencer_addr: String,
/// Sequencer polling duration for new blocks in milliseconds
pub seq_poll_timeout_millis: u64,
/// Sequencer polling max number of blocks
pub seq_poll_max_blocks: usize,
/// Sequencer polling max number of blocks to find transaction
pub seq_tx_poll_max_blocks: usize,
/// Sequencer polling max number error retries
pub seq_poll_max_retries: u64,
/// Sequencer polling error retry delay in milliseconds
pub seq_poll_retry_delay_millis: u64,
/// Max amount of blocks to poll in one request
pub seq_block_poll_max_amount: u64,
/// Initial accounts for wallet
pub initial_accounts: Vec<InitialAccountData>,
}
@ -151,9 +151,9 @@ impl Default for WalletConfig {
override_rust_log: None,
sequencer_addr: "http://127.0.0.1:3040".to_string(),
seq_poll_timeout_millis: 12000,
seq_poll_max_blocks: 5,
seq_tx_poll_max_blocks: 5,
seq_poll_max_retries: 5,
seq_poll_retry_delay_millis: 500,
seq_block_poll_max_amount: 100,
initial_accounts: {
let init_acc_json = r#"
[

View File

@ -304,6 +304,8 @@ impl WalletCore {
return Ok(());
}
let before_polling = std::time::Instant::now();
let poller = self.poller.clone();
let mut blocks =
std::pin::pin!(poller.poll_block_range(self.last_synced_block + 1..=block_id));
@ -316,13 +318,13 @@ impl WalletCore {
self.last_synced_block = block.block_id;
self.store_persistent_data().await?;
println!(
"Block at id {} with timestamp {} parsed",
block.block_id, block.timestamp,
);
}
println!(
"Synced to block {block_id} in {:?}",
before_polling.elapsed()
);
Ok(())
}

View File

@ -9,21 +9,21 @@ use crate::config::WalletConfig;
#[derive(Clone)]
/// Helperstruct to poll transactions
pub struct TxPoller {
pub polling_max_blocks_to_query: usize,
pub polling_max_error_attempts: u64,
polling_max_blocks_to_query: usize,
polling_max_error_attempts: u64,
// TODO: This should be Duration
pub polling_error_delay_millis: u64,
pub polling_delay_millis: u64,
pub client: Arc<SequencerClient>,
polling_delay_millis: u64,
block_poll_max_amount: u64,
client: Arc<SequencerClient>,
}
impl TxPoller {
pub fn new(config: WalletConfig, client: Arc<SequencerClient>) -> Self {
Self {
polling_delay_millis: config.seq_poll_timeout_millis,
polling_max_blocks_to_query: config.seq_poll_max_blocks,
polling_max_blocks_to_query: config.seq_tx_poll_max_blocks,
polling_max_error_attempts: config.seq_poll_max_retries,
polling_error_delay_millis: config.seq_poll_retry_delay_millis,
block_poll_max_amount: config.seq_block_poll_max_amount,
client: client.clone(),
}
}
@ -72,11 +72,21 @@ impl TxPoller {
range: std::ops::RangeInclusive<u64>,
) -> impl futures::Stream<Item = Result<HashableBlockData>> {
async_stream::stream! {
for block_id in range {
let block = borsh::from_slice::<HashableBlockData>(
&self.client.get_block(block_id).await?.block,
)?;
yield Ok(block);
let mut chunk_start = *range.start();
loop {
let chunk_end = std::cmp::min(chunk_start + self.block_poll_max_amount - 1, *range.end());
let blocks = self.client.get_block_range(chunk_start..=chunk_end).await?.blocks;
for block in blocks {
let block = borsh::from_slice::<HashableBlockData>(&block)?;
yield Ok(block);
}
chunk_start = chunk_end + 1;
if chunk_start > *range.end() {
break;
}
}
}
}