diff --git a/wallet/Cargo.toml b/wallet/Cargo.toml index 3b12d8f9..34fc84c5 100644 --- a/wallet/Cargo.toml +++ b/wallet/Cargo.toml @@ -21,6 +21,8 @@ hex = "0.4.3" rand.workspace = true itertools = "0.14.0" sha2.workspace = true +futures.workspace = true +async-stream = "0.3.6" [dependencies.key_protocol] path = "../key_protocol" diff --git a/wallet/src/cli/account.rs b/wallet/src/cli/account.rs index 5b23b2b8..aeaf1821 100644 --- a/wallet/src/cli/account.rs +++ b/wallet/src/cli/account.rs @@ -9,9 +9,7 @@ use serde::Serialize; use crate::{ WalletCore, cli::{SubcommandReturnValue, WalletSubcommand}, - helperfunctions::{ - AccountPrivacyKind, HumanReadableAccount, parse_addr_with_privacy_prefix, parse_block_range, - }, + helperfunctions::{AccountPrivacyKind, HumanReadableAccount, parse_addr_with_privacy_prefix}, }; const TOKEN_DEFINITION_TYPE: u8 = 0; @@ -278,7 +276,6 @@ impl WalletSubcommand for AccountSubcommand { new_subcommand.handle_subcommand(wallet_core).await } AccountSubcommand::SyncPrivate {} => { - let last_synced_block = wallet_core.last_synced_block; let curr_last_block = wallet_core .sequencer_client .get_last_block() @@ -298,13 +295,7 @@ impl WalletSubcommand for AccountSubcommand { println!("Stored persistent data at {path:#?}"); } else { - parse_block_range( - last_synced_block + 1, - curr_last_block, - wallet_core.sequencer_client.clone(), - wallet_core, - ) - .await?; + wallet_core.sync_to_block(curr_last_block).await?; } Ok(SubcommandReturnValue::SyncedToBlock(curr_last_block)) diff --git a/wallet/src/cli/mod.rs b/wallet/src/cli/mod.rs index c1def06e..eb4e8910 100644 --- a/wallet/src/cli/mod.rs +++ b/wallet/src/cli/mod.rs @@ -1,8 +1,5 @@ -use std::sync::Arc; - use anyhow::Result; use clap::{Parser, Subcommand}; -use common::sequencer_client::SequencerClient; use nssa::program::Program; use crate::{ @@ -16,7 +13,7 @@ use crate::{ token::TokenProgramAgnosticSubcommand, }, }, - helperfunctions::{fetch_config, parse_block_range}, + helperfunctions::fetch_config, }; pub mod account; @@ -164,29 +161,20 @@ pub async fn execute_subcommand(command: Command) -> Result Result<()> { let config = fetch_config().await?; - let seq_client = Arc::new(SequencerClient::new(config.sequencer_addr.clone())?); let mut wallet_core = WalletCore::start_from_config_update_chain(config.clone()).await?; - let mut latest_block_num = seq_client.get_last_block().await?.last_block; - let mut curr_last_block = latest_block_num; - loop { - parse_block_range( - curr_last_block, - latest_block_num, - seq_client.clone(), - &mut wallet_core, - ) - .await?; - - curr_last_block = latest_block_num + 1; + let latest_block_num = wallet_core + .sequencer_client + .get_last_block() + .await? + .last_block; + wallet_core.sync_to_block(latest_block_num).await?; tokio::time::sleep(std::time::Duration::from_millis( config.seq_poll_timeout_millis, )) .await; - - latest_block_num = seq_client.get_last_block().await?.last_block; } } diff --git a/wallet/src/helperfunctions.rs b/wallet/src/helperfunctions.rs index 19d2d565..770d2bbe 100644 --- a/wallet/src/helperfunctions.rs +++ b/wallet/src/helperfunctions.rs @@ -1,21 +1,16 @@ -use std::{path::PathBuf, str::FromStr, sync::Arc}; +use std::{path::PathBuf, str::FromStr}; use anyhow::Result; use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; -use common::{ - block::HashableBlockData, sequencer_client::SequencerClient, transaction::NSSATransaction, -}; -use key_protocol::{ - key_management::key_tree::traits::KeyNode as _, key_protocol_core::NSSAUserData, -}; -use nssa::{Account, privacy_preserving_transaction::message::EncryptedAccountData}; +use key_protocol::key_protocol_core::NSSAUserData; +use nssa::Account; use nssa_core::account::Nonce; use rand::{RngCore, rngs::OsRng}; use serde::Serialize; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::{ - HOME_DIR_ENV_VAR, WalletCore, + HOME_DIR_ENV_VAR, config::{ InitialAccountData, InitialAccountDataPrivate, InitialAccountDataPublic, PersistentAccountDataPrivate, PersistentAccountDataPublic, PersistentStorage, WalletConfig, @@ -230,125 +225,6 @@ impl From for HumanReadableAccount { } } -pub async fn parse_block_range( - start: u64, - stop: u64, - seq_client: Arc, - wallet_core: &mut WalletCore, -) -> Result<()> { - for block_id in start..(stop + 1) { - let block = - borsh::from_slice::(&seq_client.get_block(block_id).await?.block)?; - - for tx in block.transactions { - let nssa_tx = NSSATransaction::try_from(&tx)?; - - if let NSSATransaction::PrivacyPreserving(tx) = nssa_tx { - let mut affected_accounts = vec![]; - - for (acc_account_id, (key_chain, _)) in - &wallet_core.storage.user_data.default_user_private_accounts - { - let view_tag = EncryptedAccountData::compute_view_tag( - key_chain.nullifer_public_key.clone(), - key_chain.incoming_viewing_public_key.clone(), - ); - - for (ciph_id, encrypted_data) in tx - .message() - .encrypted_private_post_states - .iter() - .enumerate() - { - if encrypted_data.view_tag == view_tag { - let ciphertext = &encrypted_data.ciphertext; - let commitment = &tx.message.new_commitments[ciph_id]; - let shared_secret = key_chain - .calculate_shared_secret_receiver(encrypted_data.epk.clone()); - - let res_acc = nssa_core::EncryptionScheme::decrypt( - ciphertext, - &shared_secret, - commitment, - ciph_id as u32, - ); - - if let Some(res_acc) = res_acc { - println!( - "Received new account for account_id {acc_account_id:#?} with account object {res_acc:#?}" - ); - - affected_accounts.push((*acc_account_id, res_acc)); - } - } - } - } - - for keys_node in wallet_core - .storage - .user_data - .private_key_tree - .key_map - .values() - { - let acc_account_id = keys_node.account_id(); - let key_chain = &keys_node.value.0; - - let view_tag = EncryptedAccountData::compute_view_tag( - key_chain.nullifer_public_key.clone(), - key_chain.incoming_viewing_public_key.clone(), - ); - - for (ciph_id, encrypted_data) in tx - .message() - .encrypted_private_post_states - .iter() - .enumerate() - { - if encrypted_data.view_tag == view_tag { - let ciphertext = &encrypted_data.ciphertext; - let commitment = &tx.message.new_commitments[ciph_id]; - let shared_secret = key_chain - .calculate_shared_secret_receiver(encrypted_data.epk.clone()); - - let res_acc = nssa_core::EncryptionScheme::decrypt( - ciphertext, - &shared_secret, - commitment, - ciph_id as u32, - ); - - if let Some(res_acc) = res_acc { - println!( - "Received new account for account_id {acc_account_id:#?} with account object {res_acc:#?}" - ); - - affected_accounts.push((acc_account_id, res_acc)); - } - } - } - } - - for (affected_account_id, new_acc) in affected_accounts { - wallet_core - .storage - .insert_private_account_data(affected_account_id, new_acc); - } - } - } - - wallet_core.last_synced_block = block_id; - wallet_core.store_persistent_data().await?; - - println!( - "Block at id {block_id} with timestamp {} parsed", - block.timestamp - ); - } - - Ok(()) -} - #[cfg(test)] mod tests { use super::*; diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index f79d947e..2886dcda 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -9,9 +9,12 @@ use common::{ transaction::{EncodedTransaction, NSSATransaction}, }; use config::WalletConfig; -use key_protocol::key_management::key_tree::chain_index::ChainIndex; +use key_protocol::key_management::key_tree::{chain_index::ChainIndex, traits::KeyNode as _}; use log::info; -use nssa::{Account, AccountId, PrivacyPreservingTransaction, program::Program}; +use nssa::{ + Account, AccountId, PrivacyPreservingTransaction, + privacy_preserving_transaction::message::EncryptedAccountData, program::Program, +}; use nssa_core::{Commitment, MembershipProof, SharedSecretKey, program::InstructionData}; pub use privacy_preserving_tx::PrivacyPreservingAccount; use tokio::io::AsyncWriteExt; @@ -293,4 +296,91 @@ impl WalletCore { shared_secrets, )) } + + pub async fn sync_to_block(&mut self, block_id: u64) -> Result<()> { + use futures::TryStreamExt as _; + + if self.last_synced_block >= block_id { + return Ok(()); + } + + let poller = self.poller.clone(); + let mut blocks = + std::pin::pin!(poller.poll_block_range(self.last_synced_block + 1..=block_id)); + + while let Some(block) = blocks.try_next().await? { + for tx in block.transactions { + let nssa_tx = NSSATransaction::try_from(&tx)?; + self.sync_private_accounts_with_tx(nssa_tx); + } + + self.last_synced_block = block.block_id; + self.store_persistent_data().await?; + + println!( + "Block at id {} with timestamp {} parsed", + block.block_id, block.timestamp, + ); + } + + Ok(()) + } + + fn sync_private_accounts_with_tx(&mut self, tx: NSSATransaction) { + let NSSATransaction::PrivacyPreserving(tx) = tx else { + return; + }; + + let private_account_key_chains = self + .storage + .user_data + .default_user_private_accounts + .iter() + .map(|(acc_account_id, (key_chain, _))| (*acc_account_id, key_chain)) + .chain( + self.storage + .user_data + .private_key_tree + .key_map + .values() + .map(|keys_node| (keys_node.account_id(), &keys_node.value.0)), + ); + + let affected_accounts = private_account_key_chains + .flat_map(|(acc_account_id, key_chain)| { + let view_tag = EncryptedAccountData::compute_view_tag( + key_chain.nullifer_public_key.clone(), + key_chain.incoming_viewing_public_key.clone(), + ); + + tx.message() + .encrypted_private_post_states + .iter() + .enumerate() + .filter(move |(_, encrypted_data)| encrypted_data.view_tag == view_tag) + .filter_map(|(ciph_id, encrypted_data)| { + let ciphertext = &encrypted_data.ciphertext; + let commitment = &tx.message.new_commitments[ciph_id]; + let shared_secret = + key_chain.calculate_shared_secret_receiver(encrypted_data.epk.clone()); + + nssa_core::EncryptionScheme::decrypt( + ciphertext, + &shared_secret, + commitment, + ciph_id as u32, + ) + }) + .map(move |res_acc| (acc_account_id, res_acc)) + }) + .collect::>(); + + for (affected_account_id, new_acc) in affected_accounts { + println!( + "Received new account for account_id {affected_account_id:#?} with account object {new_acc:#?}" + ); + self.storage + .insert_private_account_data(affected_account_id, new_acc); + } + } } diff --git a/wallet/src/poller.rs b/wallet/src/poller.rs index 2b709e75..0e2192d5 100644 --- a/wallet/src/poller.rs +++ b/wallet/src/poller.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::Result; -use common::sequencer_client::SequencerClient; +use common::{block::HashableBlockData, sequencer_client::SequencerClient}; use log::{info, warn}; use crate::config::WalletConfig; @@ -66,4 +66,18 @@ impl TxPoller { anyhow::bail!("Transaction not found in preconfigured amount of blocks"); } + + pub fn poll_block_range( + &self, + range: std::ops::RangeInclusive, + ) -> impl futures::Stream> { + async_stream::stream! { + for block_id in range { + let block = borsh::from_slice::( + &self.client.get_block(block_id).await?.block, + )?; + yield Ok(block); + } + } + } }