From 91c898f19cb595ec6b6c75c65aed222fecb721ce Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Wed, 3 Dec 2025 00:17:12 +0300 Subject: [PATCH 1/4] refactor: split block polling --- wallet/Cargo.toml | 2 + wallet/src/cli/account.rs | 13 +--- wallet/src/cli/mod.rs | 26 ++----- wallet/src/helperfunctions.rs | 132 ++-------------------------------- wallet/src/lib.rs | 94 +++++++++++++++++++++++- wallet/src/poller.rs | 16 ++++- 6 files changed, 122 insertions(+), 161 deletions(-) diff --git a/wallet/Cargo.toml b/wallet/Cargo.toml index 3b12d8f..34fc84c 100644 --- a/wallet/Cargo.toml +++ b/wallet/Cargo.toml @@ -21,6 +21,8 @@ hex = "0.4.3" rand.workspace = true itertools = "0.14.0" sha2.workspace = true +futures.workspace = true +async-stream = "0.3.6" [dependencies.key_protocol] path = "../key_protocol" diff --git a/wallet/src/cli/account.rs b/wallet/src/cli/account.rs index 5b23b2b..aeaf182 100644 --- a/wallet/src/cli/account.rs +++ b/wallet/src/cli/account.rs @@ -9,9 +9,7 @@ use serde::Serialize; use crate::{ WalletCore, cli::{SubcommandReturnValue, WalletSubcommand}, - helperfunctions::{ - AccountPrivacyKind, HumanReadableAccount, parse_addr_with_privacy_prefix, parse_block_range, - }, + helperfunctions::{AccountPrivacyKind, HumanReadableAccount, parse_addr_with_privacy_prefix}, }; const TOKEN_DEFINITION_TYPE: u8 = 0; @@ -278,7 +276,6 @@ impl WalletSubcommand for AccountSubcommand { new_subcommand.handle_subcommand(wallet_core).await } AccountSubcommand::SyncPrivate {} => { - let last_synced_block = wallet_core.last_synced_block; let curr_last_block = wallet_core .sequencer_client .get_last_block() @@ -298,13 +295,7 @@ impl WalletSubcommand for AccountSubcommand { println!("Stored persistent data at {path:#?}"); } else { - parse_block_range( - last_synced_block + 1, - curr_last_block, - wallet_core.sequencer_client.clone(), - wallet_core, - ) - .await?; + wallet_core.sync_to_block(curr_last_block).await?; } Ok(SubcommandReturnValue::SyncedToBlock(curr_last_block)) diff --git a/wallet/src/cli/mod.rs b/wallet/src/cli/mod.rs index c1def06..eb4e891 100644 --- a/wallet/src/cli/mod.rs +++ b/wallet/src/cli/mod.rs @@ -1,8 +1,5 @@ -use std::sync::Arc; - use anyhow::Result; use clap::{Parser, Subcommand}; -use common::sequencer_client::SequencerClient; use nssa::program::Program; use crate::{ @@ -16,7 +13,7 @@ use crate::{ token::TokenProgramAgnosticSubcommand, }, }, - helperfunctions::{fetch_config, parse_block_range}, + helperfunctions::fetch_config, }; pub mod account; @@ -164,29 +161,20 @@ pub async fn execute_subcommand(command: Command) -> Result Result<()> { let config = fetch_config().await?; - let seq_client = Arc::new(SequencerClient::new(config.sequencer_addr.clone())?); let mut wallet_core = WalletCore::start_from_config_update_chain(config.clone()).await?; - let mut latest_block_num = seq_client.get_last_block().await?.last_block; - let mut curr_last_block = latest_block_num; - loop { - parse_block_range( - curr_last_block, - latest_block_num, - seq_client.clone(), - &mut wallet_core, - ) - .await?; - - curr_last_block = latest_block_num + 1; + let latest_block_num = wallet_core + .sequencer_client + .get_last_block() + .await? + .last_block; + wallet_core.sync_to_block(latest_block_num).await?; tokio::time::sleep(std::time::Duration::from_millis( config.seq_poll_timeout_millis, )) .await; - - latest_block_num = seq_client.get_last_block().await?.last_block; } } diff --git a/wallet/src/helperfunctions.rs b/wallet/src/helperfunctions.rs index 19d2d56..770d2bb 100644 --- a/wallet/src/helperfunctions.rs +++ b/wallet/src/helperfunctions.rs @@ -1,21 +1,16 @@ -use std::{path::PathBuf, str::FromStr, sync::Arc}; +use std::{path::PathBuf, str::FromStr}; use anyhow::Result; use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; -use common::{ - block::HashableBlockData, sequencer_client::SequencerClient, transaction::NSSATransaction, -}; -use key_protocol::{ - key_management::key_tree::traits::KeyNode as _, key_protocol_core::NSSAUserData, -}; -use nssa::{Account, privacy_preserving_transaction::message::EncryptedAccountData}; +use key_protocol::key_protocol_core::NSSAUserData; +use nssa::Account; use nssa_core::account::Nonce; use rand::{RngCore, rngs::OsRng}; use serde::Serialize; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::{ - HOME_DIR_ENV_VAR, WalletCore, + HOME_DIR_ENV_VAR, config::{ InitialAccountData, InitialAccountDataPrivate, InitialAccountDataPublic, PersistentAccountDataPrivate, PersistentAccountDataPublic, PersistentStorage, WalletConfig, @@ -230,125 +225,6 @@ impl From for HumanReadableAccount { } } -pub async fn parse_block_range( - start: u64, - stop: u64, - seq_client: Arc, - wallet_core: &mut WalletCore, -) -> Result<()> { - for block_id in start..(stop + 1) { - let block = - borsh::from_slice::(&seq_client.get_block(block_id).await?.block)?; - - for tx in block.transactions { - let nssa_tx = NSSATransaction::try_from(&tx)?; - - if let NSSATransaction::PrivacyPreserving(tx) = nssa_tx { - let mut affected_accounts = vec![]; - - for (acc_account_id, (key_chain, _)) in - &wallet_core.storage.user_data.default_user_private_accounts - { - let view_tag = EncryptedAccountData::compute_view_tag( - key_chain.nullifer_public_key.clone(), - key_chain.incoming_viewing_public_key.clone(), - ); - - for (ciph_id, encrypted_data) in tx - .message() - .encrypted_private_post_states - .iter() - .enumerate() - { - if encrypted_data.view_tag == view_tag { - let ciphertext = &encrypted_data.ciphertext; - let commitment = &tx.message.new_commitments[ciph_id]; - let shared_secret = key_chain - .calculate_shared_secret_receiver(encrypted_data.epk.clone()); - - let res_acc = nssa_core::EncryptionScheme::decrypt( - ciphertext, - &shared_secret, - commitment, - ciph_id as u32, - ); - - if let Some(res_acc) = res_acc { - println!( - "Received new account for account_id {acc_account_id:#?} with account object {res_acc:#?}" - ); - - affected_accounts.push((*acc_account_id, res_acc)); - } - } - } - } - - for keys_node in wallet_core - .storage - .user_data - .private_key_tree - .key_map - .values() - { - let acc_account_id = keys_node.account_id(); - let key_chain = &keys_node.value.0; - - let view_tag = EncryptedAccountData::compute_view_tag( - key_chain.nullifer_public_key.clone(), - key_chain.incoming_viewing_public_key.clone(), - ); - - for (ciph_id, encrypted_data) in tx - .message() - .encrypted_private_post_states - .iter() - .enumerate() - { - if encrypted_data.view_tag == view_tag { - let ciphertext = &encrypted_data.ciphertext; - let commitment = &tx.message.new_commitments[ciph_id]; - let shared_secret = key_chain - .calculate_shared_secret_receiver(encrypted_data.epk.clone()); - - let res_acc = nssa_core::EncryptionScheme::decrypt( - ciphertext, - &shared_secret, - commitment, - ciph_id as u32, - ); - - if let Some(res_acc) = res_acc { - println!( - "Received new account for account_id {acc_account_id:#?} with account object {res_acc:#?}" - ); - - affected_accounts.push((acc_account_id, res_acc)); - } - } - } - } - - for (affected_account_id, new_acc) in affected_accounts { - wallet_core - .storage - .insert_private_account_data(affected_account_id, new_acc); - } - } - } - - wallet_core.last_synced_block = block_id; - wallet_core.store_persistent_data().await?; - - println!( - "Block at id {block_id} with timestamp {} parsed", - block.timestamp - ); - } - - Ok(()) -} - #[cfg(test)] mod tests { use super::*; diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index f79d947..2886dcd 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -9,9 +9,12 @@ use common::{ transaction::{EncodedTransaction, NSSATransaction}, }; use config::WalletConfig; -use key_protocol::key_management::key_tree::chain_index::ChainIndex; +use key_protocol::key_management::key_tree::{chain_index::ChainIndex, traits::KeyNode as _}; use log::info; -use nssa::{Account, AccountId, PrivacyPreservingTransaction, program::Program}; +use nssa::{ + Account, AccountId, PrivacyPreservingTransaction, + privacy_preserving_transaction::message::EncryptedAccountData, program::Program, +}; use nssa_core::{Commitment, MembershipProof, SharedSecretKey, program::InstructionData}; pub use privacy_preserving_tx::PrivacyPreservingAccount; use tokio::io::AsyncWriteExt; @@ -293,4 +296,91 @@ impl WalletCore { shared_secrets, )) } + + pub async fn sync_to_block(&mut self, block_id: u64) -> Result<()> { + use futures::TryStreamExt as _; + + if self.last_synced_block >= block_id { + return Ok(()); + } + + let poller = self.poller.clone(); + let mut blocks = + std::pin::pin!(poller.poll_block_range(self.last_synced_block + 1..=block_id)); + + while let Some(block) = blocks.try_next().await? { + for tx in block.transactions { + let nssa_tx = NSSATransaction::try_from(&tx)?; + self.sync_private_accounts_with_tx(nssa_tx); + } + + self.last_synced_block = block.block_id; + self.store_persistent_data().await?; + + println!( + "Block at id {} with timestamp {} parsed", + block.block_id, block.timestamp, + ); + } + + Ok(()) + } + + fn sync_private_accounts_with_tx(&mut self, tx: NSSATransaction) { + let NSSATransaction::PrivacyPreserving(tx) = tx else { + return; + }; + + let private_account_key_chains = self + .storage + .user_data + .default_user_private_accounts + .iter() + .map(|(acc_account_id, (key_chain, _))| (*acc_account_id, key_chain)) + .chain( + self.storage + .user_data + .private_key_tree + .key_map + .values() + .map(|keys_node| (keys_node.account_id(), &keys_node.value.0)), + ); + + let affected_accounts = private_account_key_chains + .flat_map(|(acc_account_id, key_chain)| { + let view_tag = EncryptedAccountData::compute_view_tag( + key_chain.nullifer_public_key.clone(), + key_chain.incoming_viewing_public_key.clone(), + ); + + tx.message() + .encrypted_private_post_states + .iter() + .enumerate() + .filter(move |(_, encrypted_data)| encrypted_data.view_tag == view_tag) + .filter_map(|(ciph_id, encrypted_data)| { + let ciphertext = &encrypted_data.ciphertext; + let commitment = &tx.message.new_commitments[ciph_id]; + let shared_secret = + key_chain.calculate_shared_secret_receiver(encrypted_data.epk.clone()); + + nssa_core::EncryptionScheme::decrypt( + ciphertext, + &shared_secret, + commitment, + ciph_id as u32, + ) + }) + .map(move |res_acc| (acc_account_id, res_acc)) + }) + .collect::>(); + + for (affected_account_id, new_acc) in affected_accounts { + println!( + "Received new account for account_id {affected_account_id:#?} with account object {new_acc:#?}" + ); + self.storage + .insert_private_account_data(affected_account_id, new_acc); + } + } } diff --git a/wallet/src/poller.rs b/wallet/src/poller.rs index 2b709e7..0e2192d 100644 --- a/wallet/src/poller.rs +++ b/wallet/src/poller.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Result; -use common::sequencer_client::SequencerClient; +use common::{block::HashableBlockData, sequencer_client::SequencerClient}; use log::{info, warn}; use crate::config::WalletConfig; @@ -66,4 +66,18 @@ impl TxPoller { anyhow::bail!("Transaction not found in preconfigured amount of blocks"); } + + pub fn poll_block_range( + &self, + range: std::ops::RangeInclusive, + ) -> impl futures::Stream> { + async_stream::stream! { + for block_id in range { + let block = borsh::from_slice::( + &self.client.get_block(block_id).await?.block, + )?; + yield Ok(block); + } + } + } } From 6b268112299a0b9234ef6b4b2c75af075dd26d9e Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Thu, 4 Dec 2025 14:55:45 +0300 Subject: [PATCH 2/4] feat: implement multiple blocks polling --- Cargo.toml | 1 + common/src/rpc_primitives/requests.rs | 13 +++++++ common/src/sequencer_client/mod.rs | 25 +++++++++++-- .../configs/debug/wallet/wallet_config.json | 4 +-- integration_tests/src/test_suite_map.rs | 10 +++--- sequencer_rpc/Cargo.toml | 1 + sequencer_rpc/src/process.rs | 25 ++++++++++++- wallet/Cargo.toml | 2 +- wallet/src/chain_storage.rs | 4 +-- wallet/src/cli/config.rs | 36 +++++++++---------- wallet/src/config.rs | 12 +++---- wallet/src/lib.rs | 12 ++++--- wallet/src/poller.rs | 34 +++++++++++------- 13 files changed, 124 insertions(+), 55 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a4a2b89..a54b91a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ hmac-sha512 = "1.1.7" chrono = "0.4.41" borsh = "1.5.7" base58 = "0.2.0" +itertools = "0.14.0" rocksdb = { version = "0.21.0", default-features = false, features = [ "snappy", diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index 7149472..f87dc69 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -28,6 +28,13 @@ pub struct GetBlockDataRequest { pub block_id: u64, } +/// Get a range of blocks from `start_block_id` to `end_block_id` (inclusive) +#[derive(Serialize, Deserialize, Debug)] +pub struct GetBlockRangeDataRequest { + pub start_block_id: u64, + pub end_block_id: u64, +} + #[derive(Serialize, Deserialize, Debug)] pub struct GetGenesisIdRequest {} @@ -69,6 +76,7 @@ parse_request!(HelloRequest); parse_request!(RegisterAccountRequest); parse_request!(SendTxRequest); parse_request!(GetBlockDataRequest); +parse_request!(GetBlockRangeDataRequest); parse_request!(GetGenesisIdRequest); parse_request!(GetLastBlockRequest); parse_request!(GetInitialTestnetAccountsRequest); @@ -100,6 +108,11 @@ pub struct GetBlockDataResponse { pub block: Vec, } +#[derive(Serialize, Deserialize, Debug)] +pub struct GetBlockRangeDataResponse { + pub blocks: Vec>, +} + #[derive(Serialize, Deserialize, Debug)] pub struct GetGenesisIdResponse { pub genesis_id: u64, diff --git a/common/src/sequencer_client/mod.rs b/common/src/sequencer_client/mod.rs index a31806e..7a3956b 100644 --- a/common/src/sequencer_client/mod.rs +++ b/common/src/sequencer_client/mod.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, ops::RangeInclusive}; use anyhow::Result; use json::{SendTxRequest, SendTxResponse, SequencerRpcRequest, SequencerRpcResponse}; @@ -14,7 +14,8 @@ use crate::{ error::{SequencerClientError, SequencerRpcError}, rpc_primitives::requests::{ GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, - GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, + GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetLastBlockRequest, + GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, }, @@ -80,6 +81,26 @@ impl SequencerClient { Ok(resp_deser) } + pub async fn get_block_range( + &self, + range: RangeInclusive, + ) -> Result { + let block_req = GetBlockRangeDataRequest { + start_block_id: *range.start(), + end_block_id: *range.end(), + }; + + let req = serde_json::to_value(block_req)?; + + let resp = self + .call_method_with_payload("get_block_range", req) + .await?; + + let resp_deser = serde_json::from_value(resp)?; + + Ok(resp_deser) + } + /// Get last known `blokc_id` from sequencer pub async fn get_last_block(&self) -> Result { let block_req = GetLastBlockRequest {}; diff --git a/integration_tests/configs/debug/wallet/wallet_config.json b/integration_tests/configs/debug/wallet/wallet_config.json index 82f2864..ac4bae8 100644 --- a/integration_tests/configs/debug/wallet/wallet_config.json +++ b/integration_tests/configs/debug/wallet/wallet_config.json @@ -2,9 +2,9 @@ "override_rust_log": null, "sequencer_addr": "http://127.0.0.1:3040", "seq_poll_timeout_millis": 12000, - "seq_poll_max_blocks": 5, + "seq_tx_poll_max_blocks": 5, "seq_poll_max_retries": 5, - "seq_poll_retry_delay_millis": 500, + "seq_block_poll_max_amount": 100, "initial_accounts": [ { "Public": { diff --git a/integration_tests/src/test_suite_map.rs b/integration_tests/src/test_suite_map.rs index 9903345..1c5f91f 100644 --- a/integration_tests/src/test_suite_map.rs +++ b/integration_tests/src/test_suite_map.rs @@ -1646,23 +1646,23 @@ pub fn prepare_function_map() -> HashMap { info!("########## test_modify_config_fields ##########"); let wallet_config = fetch_config().await.unwrap(); - let old_seq_poll_retry_delay_millis = wallet_config.seq_poll_retry_delay_millis; + let old_seq_poll_timeout_millis = wallet_config.seq_poll_timeout_millis; // Change config field let command = Command::Config(ConfigSubcommand::Set { - key: "seq_poll_retry_delay_millis".to_string(), + key: "seq_poll_timeout_millis".to_string(), value: "1000".to_string(), }); wallet::cli::execute_subcommand(command).await.unwrap(); let wallet_config = fetch_config().await.unwrap(); - assert_eq!(wallet_config.seq_poll_retry_delay_millis, 1000); + assert_eq!(wallet_config.seq_poll_timeout_millis, 1000); // Return how it was at the beginning let command = Command::Config(ConfigSubcommand::Set { - key: "seq_poll_retry_delay_millis".to_string(), - value: old_seq_poll_retry_delay_millis.to_string(), + key: "seq_poll_timeout_millis".to_string(), + value: old_seq_poll_timeout_millis.to_string(), }); wallet::cli::execute_subcommand(command).await.unwrap(); diff --git a/sequencer_rpc/Cargo.toml b/sequencer_rpc/Cargo.toml index 242e8b2..395660f 100644 --- a/sequencer_rpc/Cargo.toml +++ b/sequencer_rpc/Cargo.toml @@ -14,6 +14,7 @@ base58.workspace = true hex = "0.4.3" tempfile.workspace = true base64.workspace = true +itertools.workspace = true actix-web.workspace = true tokio.workspace = true diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 23d5edd..387abf2 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -13,7 +13,8 @@ use common::{ requests::{ GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, - GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, + GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, + GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, @@ -23,6 +24,7 @@ use common::{ }, transaction::{EncodedTransaction, NSSATransaction}, }; +use itertools::Itertools as _; use log::warn; use nssa::{self, program::Program}; use sequencer_core::{TransactionMalformationError, config::AccountInitialData}; @@ -33,6 +35,7 @@ use super::{JsonHandler, respond, types::err_rpc::RpcErr}; pub const HELLO: &str = "hello"; pub const SEND_TX: &str = "send_tx"; pub const GET_BLOCK: &str = "get_block"; +pub const GET_BLOCK_RANGE: &str = "get_block_range"; pub const GET_GENESIS: &str = "get_genesis"; pub const GET_LAST_BLOCK: &str = "get_last_block"; pub const GET_ACCOUNT_BALANCE: &str = "get_account_balance"; @@ -120,6 +123,25 @@ impl JsonHandler { respond(response) } + async fn process_get_block_range_data(&self, request: Request) -> Result { + let get_block_req = GetBlockRangeDataRequest::parse(Some(request.params))?; + + let blocks = { + let state = self.sequencer_state.lock().await; + (get_block_req.start_block_id..=get_block_req.end_block_id) + .map(|block_id| state.block_store().get_block_at_id(block_id)) + .map_ok(|block| { + borsh::to_vec(&HashableBlockData::from(block)) + .expect("derived BorshSerialize should never fail") + }) + .collect::, _>>()? + }; + + let response = GetBlockRangeDataResponse { blocks }; + + respond(response) + } + async fn process_get_genesis(&self, request: Request) -> Result { let _get_genesis_req = GetGenesisIdRequest::parse(Some(request.params))?; @@ -297,6 +319,7 @@ impl JsonHandler { HELLO => self.process_temp_hello(request).await, SEND_TX => self.process_send_tx(request).await, GET_BLOCK => self.process_get_block_data(request).await, + GET_BLOCK_RANGE => self.process_get_block_range_data(request).await, GET_GENESIS => self.process_get_genesis(request).await, GET_LAST_BLOCK => self.process_get_last_block(request).await, GET_INITIAL_TESTNET_ACCOUNTS => self.get_initial_testnet_accounts(request).await, diff --git a/wallet/Cargo.toml b/wallet/Cargo.toml index 34fc84c..6f97c63 100644 --- a/wallet/Cargo.toml +++ b/wallet/Cargo.toml @@ -19,7 +19,7 @@ borsh.workspace = true base58.workspace = true hex = "0.4.3" rand.workspace = true -itertools = "0.14.0" +itertools.workspace = true sha2.workspace = true futures.workspace = true async-stream = "0.3.6" diff --git a/wallet/src/chain_storage.rs b/wallet/src/chain_storage.rs index 14e931a..0625fce 100644 --- a/wallet/src/chain_storage.rs +++ b/wallet/src/chain_storage.rs @@ -259,9 +259,9 @@ mod tests { override_rust_log: None, sequencer_addr: "http://127.0.0.1".to_string(), seq_poll_timeout_millis: 12000, - seq_poll_max_blocks: 5, + seq_tx_poll_max_blocks: 5, seq_poll_max_retries: 10, - seq_poll_retry_delay_millis: 500, + seq_block_poll_max_amount: 100, initial_accounts: create_initial_accounts(), } } diff --git a/wallet/src/cli/config.rs b/wallet/src/cli/config.rs index 68670af..df0413e 100644 --- a/wallet/src/cli/config.rs +++ b/wallet/src/cli/config.rs @@ -55,19 +55,19 @@ impl WalletSubcommand for ConfigSubcommand { wallet_core.storage.wallet_config.seq_poll_timeout_millis ); } - "seq_poll_max_blocks" => { - println!("{}", wallet_core.storage.wallet_config.seq_poll_max_blocks); + "seq_tx_poll_max_blocks" => { + println!( + "{}", + wallet_core.storage.wallet_config.seq_tx_poll_max_blocks + ); } "seq_poll_max_retries" => { println!("{}", wallet_core.storage.wallet_config.seq_poll_max_retries); } - "seq_poll_retry_delay_millis" => { + "seq_block_poll_max_amount" => { println!( "{}", - wallet_core - .storage - .wallet_config - .seq_poll_retry_delay_millis + wallet_core.storage.wallet_config.seq_block_poll_max_amount ); } "initial_accounts" => { @@ -89,17 +89,15 @@ impl WalletSubcommand for ConfigSubcommand { wallet_core.storage.wallet_config.seq_poll_timeout_millis = value.parse()?; } - "seq_poll_max_blocks" => { - wallet_core.storage.wallet_config.seq_poll_max_blocks = value.parse()?; + "seq_tx_poll_max_blocks" => { + wallet_core.storage.wallet_config.seq_tx_poll_max_blocks = value.parse()?; } "seq_poll_max_retries" => { wallet_core.storage.wallet_config.seq_poll_max_retries = value.parse()?; } - "seq_poll_retry_delay_millis" => { - wallet_core - .storage - .wallet_config - .seq_poll_retry_delay_millis = value.parse()?; + "seq_block_poll_max_amount" => { + wallet_core.storage.wallet_config.seq_block_poll_max_amount = + value.parse()?; } "initial_accounts" => { anyhow::bail!("Setting this field from wallet is not supported"); @@ -125,19 +123,19 @@ impl WalletSubcommand for ConfigSubcommand { "Sequencer client retry variable: how much time to wait between retries in milliseconds(can be zero)" ); } - "seq_poll_max_blocks" => { + "seq_tx_poll_max_blocks" => { println!( - "Sequencer client polling variable: max number of blocks to poll in parallel" + "Sequencer client polling variable: max number of blocks to poll to find a transaction" ); } "seq_poll_max_retries" => { println!( - "Sequencer client retry variable: MAX number of retries before failing(can be zero)" + "Sequencer client retry variable: max number of retries before failing(can be zero)" ); } - "seq_poll_retry_delay_millis" => { + "seq_block_poll_max_amount" => { println!( - "Sequencer client polling variable: how much time to wait in milliseconds between polling retries(can be zero)" + "Sequencer client polling variable: max number of blocks to request in one polling call" ); } "initial_accounts" => { diff --git a/wallet/src/config.rs b/wallet/src/config.rs index e11359e..ebcf283 100644 --- a/wallet/src/config.rs +++ b/wallet/src/config.rs @@ -135,12 +135,12 @@ pub struct WalletConfig { pub sequencer_addr: String, /// Sequencer polling duration for new blocks in milliseconds pub seq_poll_timeout_millis: u64, - /// Sequencer polling max number of blocks - pub seq_poll_max_blocks: usize, + /// Sequencer polling max number of blocks to find transaction + pub seq_tx_poll_max_blocks: usize, /// Sequencer polling max number error retries pub seq_poll_max_retries: u64, - /// Sequencer polling error retry delay in milliseconds - pub seq_poll_retry_delay_millis: u64, + /// Max amount of blocks to poll in one request + pub seq_block_poll_max_amount: u64, /// Initial accounts for wallet pub initial_accounts: Vec, } @@ -151,9 +151,9 @@ impl Default for WalletConfig { override_rust_log: None, sequencer_addr: "http://127.0.0.1:3040".to_string(), seq_poll_timeout_millis: 12000, - seq_poll_max_blocks: 5, + seq_tx_poll_max_blocks: 5, seq_poll_max_retries: 5, - seq_poll_retry_delay_millis: 500, + seq_block_poll_max_amount: 100, initial_accounts: { let init_acc_json = r#" [ diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index 2886dcd..13812be 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -304,6 +304,8 @@ impl WalletCore { return Ok(()); } + let before_polling = std::time::Instant::now(); + let poller = self.poller.clone(); let mut blocks = std::pin::pin!(poller.poll_block_range(self.last_synced_block + 1..=block_id)); @@ -316,13 +318,13 @@ impl WalletCore { self.last_synced_block = block.block_id; self.store_persistent_data().await?; - - println!( - "Block at id {} with timestamp {} parsed", - block.block_id, block.timestamp, - ); } + println!( + "Synced to block {block_id} in {:?}", + before_polling.elapsed() + ); + Ok(()) } diff --git a/wallet/src/poller.rs b/wallet/src/poller.rs index 0e2192d..a96b1ae 100644 --- a/wallet/src/poller.rs +++ b/wallet/src/poller.rs @@ -9,21 +9,21 @@ use crate::config::WalletConfig; #[derive(Clone)] /// Helperstruct to poll transactions pub struct TxPoller { - pub polling_max_blocks_to_query: usize, - pub polling_max_error_attempts: u64, + polling_max_blocks_to_query: usize, + polling_max_error_attempts: u64, // TODO: This should be Duration - pub polling_error_delay_millis: u64, - pub polling_delay_millis: u64, - pub client: Arc, + polling_delay_millis: u64, + block_poll_max_amount: u64, + client: Arc, } impl TxPoller { pub fn new(config: WalletConfig, client: Arc) -> Self { Self { polling_delay_millis: config.seq_poll_timeout_millis, - polling_max_blocks_to_query: config.seq_poll_max_blocks, + polling_max_blocks_to_query: config.seq_tx_poll_max_blocks, polling_max_error_attempts: config.seq_poll_max_retries, - polling_error_delay_millis: config.seq_poll_retry_delay_millis, + block_poll_max_amount: config.seq_block_poll_max_amount, client: client.clone(), } } @@ -72,11 +72,21 @@ impl TxPoller { range: std::ops::RangeInclusive, ) -> impl futures::Stream> { async_stream::stream! { - for block_id in range { - let block = borsh::from_slice::( - &self.client.get_block(block_id).await?.block, - )?; - yield Ok(block); + let mut chunk_start = *range.start(); + + loop { + let chunk_end = std::cmp::min(chunk_start + self.block_poll_max_amount - 1, *range.end()); + + let blocks = self.client.get_block_range(chunk_start..=chunk_end).await?.blocks; + for block in blocks { + let block = borsh::from_slice::(&block)?; + yield Ok(block); + } + + chunk_start = chunk_end + 1; + if chunk_start > *range.end() { + break; + } } } } From 03e911ecd51f514baf07872bc3ac261bec610723 Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Wed, 3 Dec 2025 18:33:40 +0300 Subject: [PATCH 3/4] feat: apply base64 encoding for large binary data transfer --- common/Cargo.toml | 1 + common/src/rpc_primitives/requests.rs | 61 +++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/common/Cargo.toml b/common/Cargo.toml index 999c731..920ad2a 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -15,6 +15,7 @@ log.workspace = true hex.workspace = true nssa-core = { path = "../nssa/core", features = ["host"] } borsh.workspace = true +base64.workspace = true [dependencies.nssa] path = "../nssa" diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index f87dc69..e0c6d31 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -20,6 +20,7 @@ pub struct RegisterAccountRequest { #[derive(Serialize, Deserialize, Debug)] pub struct SendTxRequest { + #[serde(with = "base64_deser")] pub transaction: Vec, } @@ -105,14 +106,74 @@ pub struct SendTxResponse { #[derive(Serialize, Deserialize, Debug)] pub struct GetBlockDataResponse { + #[serde(with = "base64_deser")] pub block: Vec, } #[derive(Serialize, Deserialize, Debug)] pub struct GetBlockRangeDataResponse { + #[serde(with = "base64_deser::vec")] pub blocks: Vec>, } +mod base64_deser { + use base64::{Engine as _, engine::general_purpose}; + use serde::{self, Deserialize, Deserializer, Serializer, ser::SerializeSeq as _}; + + pub fn serialize(bytes: &[u8], serializer: S) -> Result + where + S: Serializer, + { + let base64_string = general_purpose::STANDARD.encode(bytes); + serializer.serialize_str(&base64_string) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let base64_string: String = Deserialize::deserialize(deserializer)?; + general_purpose::STANDARD + .decode(&base64_string) + .map_err(serde::de::Error::custom) + } + + pub mod vec { + use super::*; + + pub fn serialize(bytes: &[Vec], serializer: S) -> Result + where + S: Serializer, + { + let base64_strings: Vec = bytes + .iter() + .map(|b| general_purpose::STANDARD.encode(b)) + .collect(); + let mut seq = serializer.serialize_seq(Some(base64_strings.len()))?; + for s in base64_strings { + seq.serialize_element(&s)?; + } + seq.end() + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result>, D::Error> + where + D: Deserializer<'de>, + { + let base64_strings: Vec = Deserialize::deserialize(deserializer)?; + let bytes_vec: Result>, D::Error> = base64_strings + .into_iter() + .map(|s| { + general_purpose::STANDARD + .decode(&s) + .map_err(serde::de::Error::custom) + }) + .collect(); + bytes_vec + } + } +} + #[derive(Serialize, Deserialize, Debug)] pub struct GetGenesisIdResponse { pub genesis_id: u64, From 1412ad4da4928cfe4e263cb78fd7c7c9fc6e365a Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Thu, 4 Dec 2025 03:51:09 +0300 Subject: [PATCH 4/4] refactor: remove redundant request and response types --- common/src/rpc_primitives/message.rs | 10 ++++ common/src/rpc_primitives/requests.rs | 23 ++++---- .../mod.rs => sequencer_client.rs} | 35 +++++++----- common/src/sequencer_client/json.rs | 53 ------------------- wallet/src/lib.rs | 3 +- .../native_token_transfer/deshielded.rs | 2 +- .../native_token_transfer/private.rs | 2 +- .../native_token_transfer/public.rs | 2 +- .../native_token_transfer/shielded.rs | 2 +- wallet/src/program_facades/pinata.rs | 2 +- wallet/src/program_facades/token.rs | 2 +- 11 files changed, 54 insertions(+), 82 deletions(-) rename common/src/{sequencer_client/mod.rs => sequencer_client.rs} (89%) delete mode 100644 common/src/sequencer_client/json.rs diff --git a/common/src/rpc_primitives/message.rs b/common/src/rpc_primitives/message.rs index 8207267..9886744 100644 --- a/common/src/rpc_primitives/message.rs +++ b/common/src/rpc_primitives/message.rs @@ -62,6 +62,16 @@ pub struct Request { } impl Request { + pub fn from_payload_version_2_0(method: String, payload: serde_json::Value) -> Self { + Self { + jsonrpc: Version, + method, + params: payload, + // ToDo: Correct checking of id + id: 1.into(), + } + } + /// Answer the request with a (positive) reply. /// /// The ID is taken from the request. diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index e0c6d31..7164193 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -141,16 +141,13 @@ mod base64_deser { pub mod vec { use super::*; - pub fn serialize(bytes: &[Vec], serializer: S) -> Result + pub fn serialize(bytes_vec: &[Vec], serializer: S) -> Result where S: Serializer, { - let base64_strings: Vec = bytes - .iter() - .map(|b| general_purpose::STANDARD.encode(b)) - .collect(); - let mut seq = serializer.serialize_seq(Some(base64_strings.len()))?; - for s in base64_strings { + let mut seq = serializer.serialize_seq(Some(bytes_vec.len()))?; + for bytes in bytes_vec { + let s = general_purpose::STANDARD.encode(bytes); seq.serialize_element(&s)?; } seq.end() @@ -161,15 +158,14 @@ mod base64_deser { D: Deserializer<'de>, { let base64_strings: Vec = Deserialize::deserialize(deserializer)?; - let bytes_vec: Result>, D::Error> = base64_strings + base64_strings .into_iter() .map(|s| { general_purpose::STANDARD .decode(&s) .map_err(serde::de::Error::custom) }) - .collect(); - bytes_vec + .collect() } } } @@ -213,3 +209,10 @@ pub struct GetProofForCommitmentResponse { pub struct GetProgramIdsResponse { pub program_ids: HashMap, } + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct GetInitialTestnetAccountsResponse { + /// Hex encoded account id + pub account_id: String, + pub balance: u64, +} diff --git a/common/src/sequencer_client/mod.rs b/common/src/sequencer_client.rs similarity index 89% rename from common/src/sequencer_client/mod.rs rename to common/src/sequencer_client.rs index 7a3956b..d3c5f23 100644 --- a/common/src/sequencer_client/mod.rs +++ b/common/src/sequencer_client.rs @@ -1,9 +1,9 @@ use std::{collections::HashMap, ops::RangeInclusive}; use anyhow::Result; -use json::{SendTxRequest, SendTxResponse, SequencerRpcRequest, SequencerRpcResponse}; use nssa_core::program::ProgramId; use reqwest::Client; +use serde::Deserialize; use serde_json::Value; use super::rpc_primitives::requests::{ @@ -12,19 +12,20 @@ use super::rpc_primitives::requests::{ }; use crate::{ error::{SequencerClientError, SequencerRpcError}, - rpc_primitives::requests::{ - GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, - GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetLastBlockRequest, - GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, - GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, + rpc_primitives::{ + self, + requests::{ + GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, + GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, + GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, + GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, + GetProofForCommitmentResponse, GetTransactionByHashRequest, + GetTransactionByHashResponse, SendTxRequest, SendTxResponse, + }, }, - sequencer_client::json::AccountInitialData, transaction::{EncodedTransaction, NSSATransaction}, }; -pub mod json; - #[derive(Clone)] pub struct SequencerClient { pub client: reqwest::Client, @@ -47,7 +48,8 @@ impl SequencerClient { method: &str, payload: Value, ) -> Result { - let request = SequencerRpcRequest::from_payload_version_2_0(method.to_string(), payload); + let request = + rpc_primitives::message::Request::from_payload_version_2_0(method.to_string(), payload); let call_builder = self.client.post(&self.sequencer_addr); @@ -55,6 +57,15 @@ impl SequencerClient { let response_vall = call_res.json::().await?; + // TODO: Actually why we need separation of `result` and `error` in rpc response? + #[derive(Debug, Clone, Deserialize)] + #[allow(dead_code)] + pub struct SequencerRpcResponse { + pub jsonrpc: String, + pub result: serde_json::Value, + pub id: u64, + } + if let Ok(response) = serde_json::from_value::(response_vall.clone()) { Ok(response.result) @@ -244,7 +255,7 @@ impl SequencerClient { /// Get initial testnet accounts from sequencer pub async fn get_initial_testnet_accounts( &self, - ) -> Result, SequencerClientError> { + ) -> Result, SequencerClientError> { let acc_req = GetInitialTestnetAccountsRequest {}; let req = serde_json::to_value(acc_req).unwrap(); diff --git a/common/src/sequencer_client/json.rs b/common/src/sequencer_client/json.rs deleted file mode 100644 index d47aea4..0000000 --- a/common/src/sequencer_client/json.rs +++ /dev/null @@ -1,53 +0,0 @@ -use serde::{Deserialize, Serialize}; - -// Requests - -#[derive(Serialize, Deserialize, Debug)] -pub struct SendTxRequest { - pub transaction: Vec, -} - -// Responses - -#[derive(Serialize, Deserialize, Debug)] -pub struct SendTxResponse { - pub status: String, - pub tx_hash: String, -} - -// General - -#[derive(Debug, Clone, Serialize)] -pub struct SequencerRpcRequest { - jsonrpc: String, - pub method: String, - pub params: serde_json::Value, - pub id: u64, -} - -impl SequencerRpcRequest { - pub fn from_payload_version_2_0(method: String, payload: serde_json::Value) -> Self { - Self { - jsonrpc: "2.0".to_string(), - method, - params: payload, - // ToDo: Correct checking of id - id: 1, - } - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct SequencerRpcResponse { - pub jsonrpc: String, - pub result: serde_json::Value, - pub id: u64, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -/// Helperstruct for account serialization -pub struct AccountInitialData { - /// Hex encoded account id - pub account_id: String, - pub balance: u64, -} diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index 13812be..91a0e4b 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -5,7 +5,8 @@ use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; use chain_storage::WalletChainStore; use common::{ error::ExecutionFailureKind, - sequencer_client::{SequencerClient, json::SendTxResponse}, + rpc_primitives::requests::SendTxResponse, + sequencer_client::SequencerClient, transaction::{EncodedTransaction, NSSATransaction}, }; use config::WalletConfig; diff --git a/wallet/src/program_facades/native_token_transfer/deshielded.rs b/wallet/src/program_facades/native_token_transfer/deshielded.rs index a25be2c..35a13ba 100644 --- a/wallet/src/program_facades/native_token_transfer/deshielded.rs +++ b/wallet/src/program_facades/native_token_transfer/deshielded.rs @@ -1,4 +1,4 @@ -use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse}; +use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse}; use nssa::AccountId; use super::{NativeTokenTransfer, auth_transfer_preparation}; diff --git a/wallet/src/program_facades/native_token_transfer/private.rs b/wallet/src/program_facades/native_token_transfer/private.rs index fcf6eee..320027b 100644 --- a/wallet/src/program_facades/native_token_transfer/private.rs +++ b/wallet/src/program_facades/native_token_transfer/private.rs @@ -1,6 +1,6 @@ use std::vec; -use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse}; +use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse}; use nssa::{AccountId, program::Program}; use nssa_core::{NullifierPublicKey, SharedSecretKey, encryption::IncomingViewingPublicKey}; diff --git a/wallet/src/program_facades/native_token_transfer/public.rs b/wallet/src/program_facades/native_token_transfer/public.rs index 2edab15..7981c19 100644 --- a/wallet/src/program_facades/native_token_transfer/public.rs +++ b/wallet/src/program_facades/native_token_transfer/public.rs @@ -1,4 +1,4 @@ -use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse}; +use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse}; use nssa::{ AccountId, PublicTransaction, program::Program, diff --git a/wallet/src/program_facades/native_token_transfer/shielded.rs b/wallet/src/program_facades/native_token_transfer/shielded.rs index c049b13..0802d6e 100644 --- a/wallet/src/program_facades/native_token_transfer/shielded.rs +++ b/wallet/src/program_facades/native_token_transfer/shielded.rs @@ -1,4 +1,4 @@ -use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse}; +use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse}; use nssa::AccountId; use nssa_core::{NullifierPublicKey, SharedSecretKey, encryption::IncomingViewingPublicKey}; diff --git a/wallet/src/program_facades/pinata.rs b/wallet/src/program_facades/pinata.rs index 46bc7a1..41e7510 100644 --- a/wallet/src/program_facades/pinata.rs +++ b/wallet/src/program_facades/pinata.rs @@ -1,4 +1,4 @@ -use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse}; +use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse}; use nssa::AccountId; use nssa_core::SharedSecretKey; diff --git a/wallet/src/program_facades/token.rs b/wallet/src/program_facades/token.rs index 298c4f4..7c97155 100644 --- a/wallet/src/program_facades/token.rs +++ b/wallet/src/program_facades/token.rs @@ -1,4 +1,4 @@ -use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse}; +use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse}; use nssa::{AccountId, program::Program}; use nssa_core::{ NullifierPublicKey, SharedSecretKey, encryption::IncomingViewingPublicKey,