Merge pull request #217 from vacp2p/arjentix/iss-188-fix-wallet-long-sync

This commit is contained in:
Daniil Polyakov 2025-12-04 22:13:14 +03:00 committed by GitHub
commit f1aed0b632
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 340 additions and 276 deletions

View File

@ -46,6 +46,7 @@ hmac-sha512 = "1.1.7"
chrono = "0.4.41"
borsh = "1.5.7"
base58 = "0.2.0"
itertools = "0.14.0"
rocksdb = { version = "0.21.0", default-features = false, features = [
"snappy",

View File

@ -15,6 +15,7 @@ log.workspace = true
hex.workspace = true
nssa-core = { path = "../nssa/core", features = ["host"] }
borsh.workspace = true
base64.workspace = true
[dependencies.nssa]
path = "../nssa"

View File

@ -62,6 +62,16 @@ pub struct Request {
}
impl Request {
pub fn from_payload_version_2_0(method: String, payload: serde_json::Value) -> Self {
Self {
jsonrpc: Version,
method,
params: payload,
// ToDo: Correct checking of id
id: 1.into(),
}
}
/// Answer the request with a (positive) reply.
///
/// The ID is taken from the request.

View File

@ -20,6 +20,7 @@ pub struct RegisterAccountRequest {
#[derive(Serialize, Deserialize, Debug)]
pub struct SendTxRequest {
#[serde(with = "base64_deser")]
pub transaction: Vec<u8>,
}
@ -28,6 +29,13 @@ pub struct GetBlockDataRequest {
pub block_id: u64,
}
/// Get a range of blocks from `start_block_id` to `end_block_id` (inclusive)
#[derive(Serialize, Deserialize, Debug)]
pub struct GetBlockRangeDataRequest {
pub start_block_id: u64,
pub end_block_id: u64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdRequest {}
@ -69,6 +77,7 @@ parse_request!(HelloRequest);
parse_request!(RegisterAccountRequest);
parse_request!(SendTxRequest);
parse_request!(GetBlockDataRequest);
parse_request!(GetBlockRangeDataRequest);
parse_request!(GetGenesisIdRequest);
parse_request!(GetLastBlockRequest);
parse_request!(GetInitialTestnetAccountsRequest);
@ -97,9 +106,70 @@ pub struct SendTxResponse {
#[derive(Serialize, Deserialize, Debug)]
pub struct GetBlockDataResponse {
#[serde(with = "base64_deser")]
pub block: Vec<u8>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetBlockRangeDataResponse {
#[serde(with = "base64_deser::vec")]
pub blocks: Vec<Vec<u8>>,
}
mod base64_deser {
use base64::{Engine as _, engine::general_purpose};
use serde::{self, Deserialize, Deserializer, Serializer, ser::SerializeSeq as _};
pub fn serialize<S>(bytes: &[u8], serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let base64_string = general_purpose::STANDARD.encode(bytes);
serializer.serialize_str(&base64_string)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let base64_string: String = Deserialize::deserialize(deserializer)?;
general_purpose::STANDARD
.decode(&base64_string)
.map_err(serde::de::Error::custom)
}
pub mod vec {
use super::*;
pub fn serialize<S>(bytes_vec: &[Vec<u8>], serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(bytes_vec.len()))?;
for bytes in bytes_vec {
let s = general_purpose::STANDARD.encode(bytes);
seq.serialize_element(&s)?;
}
seq.end()
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<Vec<u8>>, D::Error>
where
D: Deserializer<'de>,
{
let base64_strings: Vec<String> = Deserialize::deserialize(deserializer)?;
base64_strings
.into_iter()
.map(|s| {
general_purpose::STANDARD
.decode(&s)
.map_err(serde::de::Error::custom)
})
.collect()
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdResponse {
pub genesis_id: u64,
@ -139,3 +209,10 @@ pub struct GetProofForCommitmentResponse {
pub struct GetProgramIdsResponse {
pub program_ids: HashMap<String, ProgramId>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct GetInitialTestnetAccountsResponse {
/// Hex encoded account id
pub account_id: String,
pub balance: u64,
}

View File

@ -1,9 +1,9 @@
use std::collections::HashMap;
use std::{collections::HashMap, ops::RangeInclusive};
use anyhow::Result;
use json::{SendTxRequest, SendTxResponse, SequencerRpcRequest, SequencerRpcResponse};
use nssa_core::program::ProgramId;
use reqwest::Client;
use serde::Deserialize;
use serde_json::Value;
use super::rpc_primitives::requests::{
@ -12,18 +12,20 @@ use super::rpc_primitives::requests::{
};
use crate::{
error::{SequencerClientError, SequencerRpcError},
rpc_primitives::requests::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse,
GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse,
rpc_primitives::{
self,
requests::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest,
GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse,
GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, SendTxRequest, SendTxResponse,
},
},
sequencer_client::json::AccountInitialData,
transaction::{EncodedTransaction, NSSATransaction},
};
pub mod json;
#[derive(Clone)]
pub struct SequencerClient {
pub client: reqwest::Client,
@ -46,7 +48,8 @@ impl SequencerClient {
method: &str,
payload: Value,
) -> Result<Value, SequencerClientError> {
let request = SequencerRpcRequest::from_payload_version_2_0(method.to_string(), payload);
let request =
rpc_primitives::message::Request::from_payload_version_2_0(method.to_string(), payload);
let call_builder = self.client.post(&self.sequencer_addr);
@ -54,6 +57,15 @@ impl SequencerClient {
let response_vall = call_res.json::<Value>().await?;
// TODO: Actually why we need separation of `result` and `error` in rpc response?
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
pub struct SequencerRpcResponse {
pub jsonrpc: String,
pub result: serde_json::Value,
pub id: u64,
}
if let Ok(response) = serde_json::from_value::<SequencerRpcResponse>(response_vall.clone())
{
Ok(response.result)
@ -80,6 +92,26 @@ impl SequencerClient {
Ok(resp_deser)
}
pub async fn get_block_range(
&self,
range: RangeInclusive<u64>,
) -> Result<GetBlockRangeDataResponse, SequencerClientError> {
let block_req = GetBlockRangeDataRequest {
start_block_id: *range.start(),
end_block_id: *range.end(),
};
let req = serde_json::to_value(block_req)?;
let resp = self
.call_method_with_payload("get_block_range", req)
.await?;
let resp_deser = serde_json::from_value(resp)?;
Ok(resp_deser)
}
/// Get last known `blokc_id` from sequencer
pub async fn get_last_block(&self) -> Result<GetLastBlockResponse, SequencerClientError> {
let block_req = GetLastBlockRequest {};
@ -223,7 +255,7 @@ impl SequencerClient {
/// Get initial testnet accounts from sequencer
pub async fn get_initial_testnet_accounts(
&self,
) -> Result<Vec<AccountInitialData>, SequencerClientError> {
) -> Result<Vec<GetInitialTestnetAccountsResponse>, SequencerClientError> {
let acc_req = GetInitialTestnetAccountsRequest {};
let req = serde_json::to_value(acc_req).unwrap();

View File

@ -1,53 +0,0 @@
use serde::{Deserialize, Serialize};
// Requests
#[derive(Serialize, Deserialize, Debug)]
pub struct SendTxRequest {
pub transaction: Vec<u8>,
}
// Responses
#[derive(Serialize, Deserialize, Debug)]
pub struct SendTxResponse {
pub status: String,
pub tx_hash: String,
}
// General
#[derive(Debug, Clone, Serialize)]
pub struct SequencerRpcRequest {
jsonrpc: String,
pub method: String,
pub params: serde_json::Value,
pub id: u64,
}
impl SequencerRpcRequest {
pub fn from_payload_version_2_0(method: String, payload: serde_json::Value) -> Self {
Self {
jsonrpc: "2.0".to_string(),
method,
params: payload,
// ToDo: Correct checking of id
id: 1,
}
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct SequencerRpcResponse {
pub jsonrpc: String,
pub result: serde_json::Value,
pub id: u64,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
/// Helperstruct for account serialization
pub struct AccountInitialData {
/// Hex encoded account id
pub account_id: String,
pub balance: u64,
}

View File

@ -2,9 +2,9 @@
"override_rust_log": null,
"sequencer_addr": "http://127.0.0.1:3040",
"seq_poll_timeout_millis": 12000,
"seq_poll_max_blocks": 5,
"seq_tx_poll_max_blocks": 5,
"seq_poll_max_retries": 5,
"seq_poll_retry_delay_millis": 500,
"seq_block_poll_max_amount": 100,
"initial_accounts": [
{
"Public": {

View File

@ -1646,23 +1646,23 @@ pub fn prepare_function_map() -> HashMap<String, TestFunction> {
info!("########## test_modify_config_fields ##########");
let wallet_config = fetch_config().await.unwrap();
let old_seq_poll_retry_delay_millis = wallet_config.seq_poll_retry_delay_millis;
let old_seq_poll_timeout_millis = wallet_config.seq_poll_timeout_millis;
// Change config field
let command = Command::Config(ConfigSubcommand::Set {
key: "seq_poll_retry_delay_millis".to_string(),
key: "seq_poll_timeout_millis".to_string(),
value: "1000".to_string(),
});
wallet::cli::execute_subcommand(command).await.unwrap();
let wallet_config = fetch_config().await.unwrap();
assert_eq!(wallet_config.seq_poll_retry_delay_millis, 1000);
assert_eq!(wallet_config.seq_poll_timeout_millis, 1000);
// Return how it was at the beginning
let command = Command::Config(ConfigSubcommand::Set {
key: "seq_poll_retry_delay_millis".to_string(),
value: old_seq_poll_retry_delay_millis.to_string(),
key: "seq_poll_timeout_millis".to_string(),
value: old_seq_poll_timeout_millis.to_string(),
});
wallet::cli::execute_subcommand(command).await.unwrap();

View File

@ -14,6 +14,7 @@ base58.workspace = true
hex = "0.4.3"
tempfile.workspace = true
base64.workspace = true
itertools.workspace = true
actix-web.workspace = true
tokio.workspace = true

View File

@ -13,7 +13,8 @@ use common::{
requests::{
GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest,
GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse,
GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
@ -23,6 +24,7 @@ use common::{
},
transaction::{EncodedTransaction, NSSATransaction},
};
use itertools::Itertools as _;
use log::warn;
use nssa::{self, program::Program};
use sequencer_core::{TransactionMalformationError, config::AccountInitialData};
@ -33,6 +35,7 @@ use super::{JsonHandler, respond, types::err_rpc::RpcErr};
pub const HELLO: &str = "hello";
pub const SEND_TX: &str = "send_tx";
pub const GET_BLOCK: &str = "get_block";
pub const GET_BLOCK_RANGE: &str = "get_block_range";
pub const GET_GENESIS: &str = "get_genesis";
pub const GET_LAST_BLOCK: &str = "get_last_block";
pub const GET_ACCOUNT_BALANCE: &str = "get_account_balance";
@ -120,6 +123,25 @@ impl JsonHandler {
respond(response)
}
async fn process_get_block_range_data(&self, request: Request) -> Result<Value, RpcErr> {
let get_block_req = GetBlockRangeDataRequest::parse(Some(request.params))?;
let blocks = {
let state = self.sequencer_state.lock().await;
(get_block_req.start_block_id..=get_block_req.end_block_id)
.map(|block_id| state.block_store().get_block_at_id(block_id))
.map_ok(|block| {
borsh::to_vec(&HashableBlockData::from(block))
.expect("derived BorshSerialize should never fail")
})
.collect::<Result<Vec<_>, _>>()?
};
let response = GetBlockRangeDataResponse { blocks };
respond(response)
}
async fn process_get_genesis(&self, request: Request) -> Result<Value, RpcErr> {
let _get_genesis_req = GetGenesisIdRequest::parse(Some(request.params))?;
@ -297,6 +319,7 @@ impl JsonHandler {
HELLO => self.process_temp_hello(request).await,
SEND_TX => self.process_send_tx(request).await,
GET_BLOCK => self.process_get_block_data(request).await,
GET_BLOCK_RANGE => self.process_get_block_range_data(request).await,
GET_GENESIS => self.process_get_genesis(request).await,
GET_LAST_BLOCK => self.process_get_last_block(request).await,
GET_INITIAL_TESTNET_ACCOUNTS => self.get_initial_testnet_accounts(request).await,

View File

@ -19,8 +19,10 @@ borsh.workspace = true
base58.workspace = true
hex = "0.4.3"
rand.workspace = true
itertools = "0.14.0"
itertools.workspace = true
sha2.workspace = true
futures.workspace = true
async-stream = "0.3.6"
[dependencies.key_protocol]
path = "../key_protocol"

View File

@ -259,9 +259,9 @@ mod tests {
override_rust_log: None,
sequencer_addr: "http://127.0.0.1".to_string(),
seq_poll_timeout_millis: 12000,
seq_poll_max_blocks: 5,
seq_tx_poll_max_blocks: 5,
seq_poll_max_retries: 10,
seq_poll_retry_delay_millis: 500,
seq_block_poll_max_amount: 100,
initial_accounts: create_initial_accounts(),
}
}

View File

@ -9,9 +9,7 @@ use serde::Serialize;
use crate::{
WalletCore,
cli::{SubcommandReturnValue, WalletSubcommand},
helperfunctions::{
AccountPrivacyKind, HumanReadableAccount, parse_addr_with_privacy_prefix, parse_block_range,
},
helperfunctions::{AccountPrivacyKind, HumanReadableAccount, parse_addr_with_privacy_prefix},
};
const TOKEN_DEFINITION_TYPE: u8 = 0;
@ -278,7 +276,6 @@ impl WalletSubcommand for AccountSubcommand {
new_subcommand.handle_subcommand(wallet_core).await
}
AccountSubcommand::SyncPrivate {} => {
let last_synced_block = wallet_core.last_synced_block;
let curr_last_block = wallet_core
.sequencer_client
.get_last_block()
@ -298,13 +295,7 @@ impl WalletSubcommand for AccountSubcommand {
println!("Stored persistent data at {path:#?}");
} else {
parse_block_range(
last_synced_block + 1,
curr_last_block,
wallet_core.sequencer_client.clone(),
wallet_core,
)
.await?;
wallet_core.sync_to_block(curr_last_block).await?;
}
Ok(SubcommandReturnValue::SyncedToBlock(curr_last_block))

View File

@ -55,19 +55,19 @@ impl WalletSubcommand for ConfigSubcommand {
wallet_core.storage.wallet_config.seq_poll_timeout_millis
);
}
"seq_poll_max_blocks" => {
println!("{}", wallet_core.storage.wallet_config.seq_poll_max_blocks);
"seq_tx_poll_max_blocks" => {
println!(
"{}",
wallet_core.storage.wallet_config.seq_tx_poll_max_blocks
);
}
"seq_poll_max_retries" => {
println!("{}", wallet_core.storage.wallet_config.seq_poll_max_retries);
}
"seq_poll_retry_delay_millis" => {
"seq_block_poll_max_amount" => {
println!(
"{}",
wallet_core
.storage
.wallet_config
.seq_poll_retry_delay_millis
wallet_core.storage.wallet_config.seq_block_poll_max_amount
);
}
"initial_accounts" => {
@ -89,17 +89,15 @@ impl WalletSubcommand for ConfigSubcommand {
wallet_core.storage.wallet_config.seq_poll_timeout_millis =
value.parse()?;
}
"seq_poll_max_blocks" => {
wallet_core.storage.wallet_config.seq_poll_max_blocks = value.parse()?;
"seq_tx_poll_max_blocks" => {
wallet_core.storage.wallet_config.seq_tx_poll_max_blocks = value.parse()?;
}
"seq_poll_max_retries" => {
wallet_core.storage.wallet_config.seq_poll_max_retries = value.parse()?;
}
"seq_poll_retry_delay_millis" => {
wallet_core
.storage
.wallet_config
.seq_poll_retry_delay_millis = value.parse()?;
"seq_block_poll_max_amount" => {
wallet_core.storage.wallet_config.seq_block_poll_max_amount =
value.parse()?;
}
"initial_accounts" => {
anyhow::bail!("Setting this field from wallet is not supported");
@ -125,19 +123,19 @@ impl WalletSubcommand for ConfigSubcommand {
"Sequencer client retry variable: how much time to wait between retries in milliseconds(can be zero)"
);
}
"seq_poll_max_blocks" => {
"seq_tx_poll_max_blocks" => {
println!(
"Sequencer client polling variable: max number of blocks to poll in parallel"
"Sequencer client polling variable: max number of blocks to poll to find a transaction"
);
}
"seq_poll_max_retries" => {
println!(
"Sequencer client retry variable: MAX number of retries before failing(can be zero)"
"Sequencer client retry variable: max number of retries before failing(can be zero)"
);
}
"seq_poll_retry_delay_millis" => {
"seq_block_poll_max_amount" => {
println!(
"Sequencer client polling variable: how much time to wait in milliseconds between polling retries(can be zero)"
"Sequencer client polling variable: max number of blocks to request in one polling call"
);
}
"initial_accounts" => {

View File

@ -1,8 +1,5 @@
use std::sync::Arc;
use anyhow::Result;
use clap::{Parser, Subcommand};
use common::sequencer_client::SequencerClient;
use nssa::program::Program;
use crate::{
@ -16,7 +13,7 @@ use crate::{
token::TokenProgramAgnosticSubcommand,
},
},
helperfunctions::{fetch_config, parse_block_range},
helperfunctions::fetch_config,
};
pub mod account;
@ -164,29 +161,20 @@ pub async fn execute_subcommand(command: Command) -> Result<SubcommandReturnValu
pub async fn execute_continuous_run() -> Result<()> {
let config = fetch_config().await?;
let seq_client = Arc::new(SequencerClient::new(config.sequencer_addr.clone())?);
let mut wallet_core = WalletCore::start_from_config_update_chain(config.clone()).await?;
let mut latest_block_num = seq_client.get_last_block().await?.last_block;
let mut curr_last_block = latest_block_num;
loop {
parse_block_range(
curr_last_block,
latest_block_num,
seq_client.clone(),
&mut wallet_core,
)
.await?;
curr_last_block = latest_block_num + 1;
let latest_block_num = wallet_core
.sequencer_client
.get_last_block()
.await?
.last_block;
wallet_core.sync_to_block(latest_block_num).await?;
tokio::time::sleep(std::time::Duration::from_millis(
config.seq_poll_timeout_millis,
))
.await;
latest_block_num = seq_client.get_last_block().await?.last_block;
}
}

View File

@ -135,12 +135,12 @@ pub struct WalletConfig {
pub sequencer_addr: String,
/// Sequencer polling duration for new blocks in milliseconds
pub seq_poll_timeout_millis: u64,
/// Sequencer polling max number of blocks
pub seq_poll_max_blocks: usize,
/// Sequencer polling max number of blocks to find transaction
pub seq_tx_poll_max_blocks: usize,
/// Sequencer polling max number error retries
pub seq_poll_max_retries: u64,
/// Sequencer polling error retry delay in milliseconds
pub seq_poll_retry_delay_millis: u64,
/// Max amount of blocks to poll in one request
pub seq_block_poll_max_amount: u64,
/// Initial accounts for wallet
pub initial_accounts: Vec<InitialAccountData>,
}
@ -151,9 +151,9 @@ impl Default for WalletConfig {
override_rust_log: None,
sequencer_addr: "http://127.0.0.1:3040".to_string(),
seq_poll_timeout_millis: 12000,
seq_poll_max_blocks: 5,
seq_tx_poll_max_blocks: 5,
seq_poll_max_retries: 5,
seq_poll_retry_delay_millis: 500,
seq_block_poll_max_amount: 100,
initial_accounts: {
let init_acc_json = r#"
[

View File

@ -1,21 +1,16 @@
use std::{path::PathBuf, str::FromStr, sync::Arc};
use std::{path::PathBuf, str::FromStr};
use anyhow::Result;
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use common::{
block::HashableBlockData, sequencer_client::SequencerClient, transaction::NSSATransaction,
};
use key_protocol::{
key_management::key_tree::traits::KeyNode as _, key_protocol_core::NSSAUserData,
};
use nssa::{Account, privacy_preserving_transaction::message::EncryptedAccountData};
use key_protocol::key_protocol_core::NSSAUserData;
use nssa::Account;
use nssa_core::account::Nonce;
use rand::{RngCore, rngs::OsRng};
use serde::Serialize;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::{
HOME_DIR_ENV_VAR, WalletCore,
HOME_DIR_ENV_VAR,
config::{
InitialAccountData, InitialAccountDataPrivate, InitialAccountDataPublic,
PersistentAccountDataPrivate, PersistentAccountDataPublic, PersistentStorage, WalletConfig,
@ -230,125 +225,6 @@ impl From<Account> for HumanReadableAccount {
}
}
pub async fn parse_block_range(
start: u64,
stop: u64,
seq_client: Arc<SequencerClient>,
wallet_core: &mut WalletCore,
) -> Result<()> {
for block_id in start..(stop + 1) {
let block =
borsh::from_slice::<HashableBlockData>(&seq_client.get_block(block_id).await?.block)?;
for tx in block.transactions {
let nssa_tx = NSSATransaction::try_from(&tx)?;
if let NSSATransaction::PrivacyPreserving(tx) = nssa_tx {
let mut affected_accounts = vec![];
for (acc_account_id, (key_chain, _)) in
&wallet_core.storage.user_data.default_user_private_accounts
{
let view_tag = EncryptedAccountData::compute_view_tag(
key_chain.nullifer_public_key.clone(),
key_chain.incoming_viewing_public_key.clone(),
);
for (ciph_id, encrypted_data) in tx
.message()
.encrypted_private_post_states
.iter()
.enumerate()
{
if encrypted_data.view_tag == view_tag {
let ciphertext = &encrypted_data.ciphertext;
let commitment = &tx.message.new_commitments[ciph_id];
let shared_secret = key_chain
.calculate_shared_secret_receiver(encrypted_data.epk.clone());
let res_acc = nssa_core::EncryptionScheme::decrypt(
ciphertext,
&shared_secret,
commitment,
ciph_id as u32,
);
if let Some(res_acc) = res_acc {
println!(
"Received new account for account_id {acc_account_id:#?} with account object {res_acc:#?}"
);
affected_accounts.push((*acc_account_id, res_acc));
}
}
}
}
for keys_node in wallet_core
.storage
.user_data
.private_key_tree
.key_map
.values()
{
let acc_account_id = keys_node.account_id();
let key_chain = &keys_node.value.0;
let view_tag = EncryptedAccountData::compute_view_tag(
key_chain.nullifer_public_key.clone(),
key_chain.incoming_viewing_public_key.clone(),
);
for (ciph_id, encrypted_data) in tx
.message()
.encrypted_private_post_states
.iter()
.enumerate()
{
if encrypted_data.view_tag == view_tag {
let ciphertext = &encrypted_data.ciphertext;
let commitment = &tx.message.new_commitments[ciph_id];
let shared_secret = key_chain
.calculate_shared_secret_receiver(encrypted_data.epk.clone());
let res_acc = nssa_core::EncryptionScheme::decrypt(
ciphertext,
&shared_secret,
commitment,
ciph_id as u32,
);
if let Some(res_acc) = res_acc {
println!(
"Received new account for account_id {acc_account_id:#?} with account object {res_acc:#?}"
);
affected_accounts.push((acc_account_id, res_acc));
}
}
}
}
for (affected_account_id, new_acc) in affected_accounts {
wallet_core
.storage
.insert_private_account_data(affected_account_id, new_acc);
}
}
}
wallet_core.last_synced_block = block_id;
wallet_core.store_persistent_data().await?;
println!(
"Block at id {block_id} with timestamp {} parsed",
block.timestamp
);
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -5,13 +5,17 @@ use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use chain_storage::WalletChainStore;
use common::{
error::ExecutionFailureKind,
sequencer_client::{SequencerClient, json::SendTxResponse},
rpc_primitives::requests::SendTxResponse,
sequencer_client::SequencerClient,
transaction::{EncodedTransaction, NSSATransaction},
};
use config::WalletConfig;
use key_protocol::key_management::key_tree::chain_index::ChainIndex;
use key_protocol::key_management::key_tree::{chain_index::ChainIndex, traits::KeyNode as _};
use log::info;
use nssa::{Account, AccountId, PrivacyPreservingTransaction, program::Program};
use nssa::{
Account, AccountId, PrivacyPreservingTransaction,
privacy_preserving_transaction::message::EncryptedAccountData, program::Program,
};
use nssa_core::{Commitment, MembershipProof, SharedSecretKey, program::InstructionData};
pub use privacy_preserving_tx::PrivacyPreservingAccount;
use tokio::io::AsyncWriteExt;
@ -293,4 +297,93 @@ impl WalletCore {
shared_secrets,
))
}
pub async fn sync_to_block(&mut self, block_id: u64) -> Result<()> {
use futures::TryStreamExt as _;
if self.last_synced_block >= block_id {
return Ok(());
}
let before_polling = std::time::Instant::now();
let poller = self.poller.clone();
let mut blocks =
std::pin::pin!(poller.poll_block_range(self.last_synced_block + 1..=block_id));
while let Some(block) = blocks.try_next().await? {
for tx in block.transactions {
let nssa_tx = NSSATransaction::try_from(&tx)?;
self.sync_private_accounts_with_tx(nssa_tx);
}
self.last_synced_block = block.block_id;
self.store_persistent_data().await?;
}
println!(
"Synced to block {block_id} in {:?}",
before_polling.elapsed()
);
Ok(())
}
fn sync_private_accounts_with_tx(&mut self, tx: NSSATransaction) {
let NSSATransaction::PrivacyPreserving(tx) = tx else {
return;
};
let private_account_key_chains = self
.storage
.user_data
.default_user_private_accounts
.iter()
.map(|(acc_account_id, (key_chain, _))| (*acc_account_id, key_chain))
.chain(
self.storage
.user_data
.private_key_tree
.key_map
.values()
.map(|keys_node| (keys_node.account_id(), &keys_node.value.0)),
);
let affected_accounts = private_account_key_chains
.flat_map(|(acc_account_id, key_chain)| {
let view_tag = EncryptedAccountData::compute_view_tag(
key_chain.nullifer_public_key.clone(),
key_chain.incoming_viewing_public_key.clone(),
);
tx.message()
.encrypted_private_post_states
.iter()
.enumerate()
.filter(move |(_, encrypted_data)| encrypted_data.view_tag == view_tag)
.filter_map(|(ciph_id, encrypted_data)| {
let ciphertext = &encrypted_data.ciphertext;
let commitment = &tx.message.new_commitments[ciph_id];
let shared_secret =
key_chain.calculate_shared_secret_receiver(encrypted_data.epk.clone());
nssa_core::EncryptionScheme::decrypt(
ciphertext,
&shared_secret,
commitment,
ciph_id as u32,
)
})
.map(move |res_acc| (acc_account_id, res_acc))
})
.collect::<Vec<_>>();
for (affected_account_id, new_acc) in affected_accounts {
println!(
"Received new account for account_id {affected_account_id:#?} with account object {new_acc:#?}"
);
self.storage
.insert_private_account_data(affected_account_id, new_acc);
}
}
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use anyhow::Result;
use common::sequencer_client::SequencerClient;
use common::{block::HashableBlockData, sequencer_client::SequencerClient};
use log::{info, warn};
use crate::config::WalletConfig;
@ -9,21 +9,21 @@ use crate::config::WalletConfig;
#[derive(Clone)]
/// Helperstruct to poll transactions
pub struct TxPoller {
pub polling_max_blocks_to_query: usize,
pub polling_max_error_attempts: u64,
polling_max_blocks_to_query: usize,
polling_max_error_attempts: u64,
// TODO: This should be Duration
pub polling_error_delay_millis: u64,
pub polling_delay_millis: u64,
pub client: Arc<SequencerClient>,
polling_delay_millis: u64,
block_poll_max_amount: u64,
client: Arc<SequencerClient>,
}
impl TxPoller {
pub fn new(config: WalletConfig, client: Arc<SequencerClient>) -> Self {
Self {
polling_delay_millis: config.seq_poll_timeout_millis,
polling_max_blocks_to_query: config.seq_poll_max_blocks,
polling_max_blocks_to_query: config.seq_tx_poll_max_blocks,
polling_max_error_attempts: config.seq_poll_max_retries,
polling_error_delay_millis: config.seq_poll_retry_delay_millis,
block_poll_max_amount: config.seq_block_poll_max_amount,
client: client.clone(),
}
}
@ -66,4 +66,28 @@ impl TxPoller {
anyhow::bail!("Transaction not found in preconfigured amount of blocks");
}
pub fn poll_block_range(
&self,
range: std::ops::RangeInclusive<u64>,
) -> impl futures::Stream<Item = Result<HashableBlockData>> {
async_stream::stream! {
let mut chunk_start = *range.start();
loop {
let chunk_end = std::cmp::min(chunk_start + self.block_poll_max_amount - 1, *range.end());
let blocks = self.client.get_block_range(chunk_start..=chunk_end).await?.blocks;
for block in blocks {
let block = borsh::from_slice::<HashableBlockData>(&block)?;
yield Ok(block);
}
chunk_start = chunk_end + 1;
if chunk_start > *range.end() {
break;
}
}
}
}
}

View File

@ -1,4 +1,4 @@
use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse};
use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse};
use nssa::AccountId;
use super::{NativeTokenTransfer, auth_transfer_preparation};

View File

@ -1,6 +1,6 @@
use std::vec;
use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse};
use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse};
use nssa::{AccountId, program::Program};
use nssa_core::{NullifierPublicKey, SharedSecretKey, encryption::IncomingViewingPublicKey};

View File

@ -1,4 +1,4 @@
use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse};
use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse};
use nssa::{
AccountId, PublicTransaction,
program::Program,

View File

@ -1,4 +1,4 @@
use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse};
use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse};
use nssa::AccountId;
use nssa_core::{NullifierPublicKey, SharedSecretKey, encryption::IncomingViewingPublicKey};

View File

@ -1,4 +1,4 @@
use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse};
use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse};
use nssa::AccountId;
use nssa_core::SharedSecretKey;

View File

@ -1,4 +1,4 @@
use common::{error::ExecutionFailureKind, sequencer_client::json::SendTxResponse};
use common::{error::ExecutionFailureKind, rpc_primitives::requests::SendTxResponse};
use nssa::{AccountId, program::Program};
use nssa_core::{
NullifierPublicKey, SharedSecretKey, encryption::IncomingViewingPublicKey,