refactor: split block polling

This commit is contained in:
Daniil Polyakov 2025-12-03 00:17:12 +03:00
parent 2d34925725
commit 91c898f19c
6 changed files with 122 additions and 161 deletions

View File

@ -21,6 +21,8 @@ hex = "0.4.3"
rand.workspace = true
itertools = "0.14.0"
sha2.workspace = true
futures.workspace = true
async-stream = "0.3.6"
[dependencies.key_protocol]
path = "../key_protocol"

View File

@ -9,9 +9,7 @@ use serde::Serialize;
use crate::{
WalletCore,
cli::{SubcommandReturnValue, WalletSubcommand},
helperfunctions::{
AccountPrivacyKind, HumanReadableAccount, parse_addr_with_privacy_prefix, parse_block_range,
},
helperfunctions::{AccountPrivacyKind, HumanReadableAccount, parse_addr_with_privacy_prefix},
};
const TOKEN_DEFINITION_TYPE: u8 = 0;
@ -278,7 +276,6 @@ impl WalletSubcommand for AccountSubcommand {
new_subcommand.handle_subcommand(wallet_core).await
}
AccountSubcommand::SyncPrivate {} => {
let last_synced_block = wallet_core.last_synced_block;
let curr_last_block = wallet_core
.sequencer_client
.get_last_block()
@ -298,13 +295,7 @@ impl WalletSubcommand for AccountSubcommand {
println!("Stored persistent data at {path:#?}");
} else {
parse_block_range(
last_synced_block + 1,
curr_last_block,
wallet_core.sequencer_client.clone(),
wallet_core,
)
.await?;
wallet_core.sync_to_block(curr_last_block).await?;
}
Ok(SubcommandReturnValue::SyncedToBlock(curr_last_block))

View File

@ -1,8 +1,5 @@
use std::sync::Arc;
use anyhow::Result;
use clap::{Parser, Subcommand};
use common::sequencer_client::SequencerClient;
use nssa::program::Program;
use crate::{
@ -16,7 +13,7 @@ use crate::{
token::TokenProgramAgnosticSubcommand,
},
},
helperfunctions::{fetch_config, parse_block_range},
helperfunctions::fetch_config,
};
pub mod account;
@ -164,29 +161,20 @@ pub async fn execute_subcommand(command: Command) -> Result<SubcommandReturnValu
pub async fn execute_continuous_run() -> Result<()> {
let config = fetch_config().await?;
let seq_client = Arc::new(SequencerClient::new(config.sequencer_addr.clone())?);
let mut wallet_core = WalletCore::start_from_config_update_chain(config.clone()).await?;
let mut latest_block_num = seq_client.get_last_block().await?.last_block;
let mut curr_last_block = latest_block_num;
loop {
parse_block_range(
curr_last_block,
latest_block_num,
seq_client.clone(),
&mut wallet_core,
)
.await?;
curr_last_block = latest_block_num + 1;
let latest_block_num = wallet_core
.sequencer_client
.get_last_block()
.await?
.last_block;
wallet_core.sync_to_block(latest_block_num).await?;
tokio::time::sleep(std::time::Duration::from_millis(
config.seq_poll_timeout_millis,
))
.await;
latest_block_num = seq_client.get_last_block().await?.last_block;
}
}

View File

@ -1,21 +1,16 @@
use std::{path::PathBuf, str::FromStr, sync::Arc};
use std::{path::PathBuf, str::FromStr};
use anyhow::Result;
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use common::{
block::HashableBlockData, sequencer_client::SequencerClient, transaction::NSSATransaction,
};
use key_protocol::{
key_management::key_tree::traits::KeyNode as _, key_protocol_core::NSSAUserData,
};
use nssa::{Account, privacy_preserving_transaction::message::EncryptedAccountData};
use key_protocol::key_protocol_core::NSSAUserData;
use nssa::Account;
use nssa_core::account::Nonce;
use rand::{RngCore, rngs::OsRng};
use serde::Serialize;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::{
HOME_DIR_ENV_VAR, WalletCore,
HOME_DIR_ENV_VAR,
config::{
InitialAccountData, InitialAccountDataPrivate, InitialAccountDataPublic,
PersistentAccountDataPrivate, PersistentAccountDataPublic, PersistentStorage, WalletConfig,
@ -230,125 +225,6 @@ impl From<Account> for HumanReadableAccount {
}
}
pub async fn parse_block_range(
start: u64,
stop: u64,
seq_client: Arc<SequencerClient>,
wallet_core: &mut WalletCore,
) -> Result<()> {
for block_id in start..(stop + 1) {
let block =
borsh::from_slice::<HashableBlockData>(&seq_client.get_block(block_id).await?.block)?;
for tx in block.transactions {
let nssa_tx = NSSATransaction::try_from(&tx)?;
if let NSSATransaction::PrivacyPreserving(tx) = nssa_tx {
let mut affected_accounts = vec![];
for (acc_account_id, (key_chain, _)) in
&wallet_core.storage.user_data.default_user_private_accounts
{
let view_tag = EncryptedAccountData::compute_view_tag(
key_chain.nullifer_public_key.clone(),
key_chain.incoming_viewing_public_key.clone(),
);
for (ciph_id, encrypted_data) in tx
.message()
.encrypted_private_post_states
.iter()
.enumerate()
{
if encrypted_data.view_tag == view_tag {
let ciphertext = &encrypted_data.ciphertext;
let commitment = &tx.message.new_commitments[ciph_id];
let shared_secret = key_chain
.calculate_shared_secret_receiver(encrypted_data.epk.clone());
let res_acc = nssa_core::EncryptionScheme::decrypt(
ciphertext,
&shared_secret,
commitment,
ciph_id as u32,
);
if let Some(res_acc) = res_acc {
println!(
"Received new account for account_id {acc_account_id:#?} with account object {res_acc:#?}"
);
affected_accounts.push((*acc_account_id, res_acc));
}
}
}
}
for keys_node in wallet_core
.storage
.user_data
.private_key_tree
.key_map
.values()
{
let acc_account_id = keys_node.account_id();
let key_chain = &keys_node.value.0;
let view_tag = EncryptedAccountData::compute_view_tag(
key_chain.nullifer_public_key.clone(),
key_chain.incoming_viewing_public_key.clone(),
);
for (ciph_id, encrypted_data) in tx
.message()
.encrypted_private_post_states
.iter()
.enumerate()
{
if encrypted_data.view_tag == view_tag {
let ciphertext = &encrypted_data.ciphertext;
let commitment = &tx.message.new_commitments[ciph_id];
let shared_secret = key_chain
.calculate_shared_secret_receiver(encrypted_data.epk.clone());
let res_acc = nssa_core::EncryptionScheme::decrypt(
ciphertext,
&shared_secret,
commitment,
ciph_id as u32,
);
if let Some(res_acc) = res_acc {
println!(
"Received new account for account_id {acc_account_id:#?} with account object {res_acc:#?}"
);
affected_accounts.push((acc_account_id, res_acc));
}
}
}
}
for (affected_account_id, new_acc) in affected_accounts {
wallet_core
.storage
.insert_private_account_data(affected_account_id, new_acc);
}
}
}
wallet_core.last_synced_block = block_id;
wallet_core.store_persistent_data().await?;
println!(
"Block at id {block_id} with timestamp {} parsed",
block.timestamp
);
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -9,9 +9,12 @@ use common::{
transaction::{EncodedTransaction, NSSATransaction},
};
use config::WalletConfig;
use key_protocol::key_management::key_tree::chain_index::ChainIndex;
use key_protocol::key_management::key_tree::{chain_index::ChainIndex, traits::KeyNode as _};
use log::info;
use nssa::{Account, AccountId, PrivacyPreservingTransaction, program::Program};
use nssa::{
Account, AccountId, PrivacyPreservingTransaction,
privacy_preserving_transaction::message::EncryptedAccountData, program::Program,
};
use nssa_core::{Commitment, MembershipProof, SharedSecretKey, program::InstructionData};
pub use privacy_preserving_tx::PrivacyPreservingAccount;
use tokio::io::AsyncWriteExt;
@ -293,4 +296,91 @@ impl WalletCore {
shared_secrets,
))
}
pub async fn sync_to_block(&mut self, block_id: u64) -> Result<()> {
use futures::TryStreamExt as _;
if self.last_synced_block >= block_id {
return Ok(());
}
let poller = self.poller.clone();
let mut blocks =
std::pin::pin!(poller.poll_block_range(self.last_synced_block + 1..=block_id));
while let Some(block) = blocks.try_next().await? {
for tx in block.transactions {
let nssa_tx = NSSATransaction::try_from(&tx)?;
self.sync_private_accounts_with_tx(nssa_tx);
}
self.last_synced_block = block.block_id;
self.store_persistent_data().await?;
println!(
"Block at id {} with timestamp {} parsed",
block.block_id, block.timestamp,
);
}
Ok(())
}
fn sync_private_accounts_with_tx(&mut self, tx: NSSATransaction) {
let NSSATransaction::PrivacyPreserving(tx) = tx else {
return;
};
let private_account_key_chains = self
.storage
.user_data
.default_user_private_accounts
.iter()
.map(|(acc_account_id, (key_chain, _))| (*acc_account_id, key_chain))
.chain(
self.storage
.user_data
.private_key_tree
.key_map
.values()
.map(|keys_node| (keys_node.account_id(), &keys_node.value.0)),
);
let affected_accounts = private_account_key_chains
.flat_map(|(acc_account_id, key_chain)| {
let view_tag = EncryptedAccountData::compute_view_tag(
key_chain.nullifer_public_key.clone(),
key_chain.incoming_viewing_public_key.clone(),
);
tx.message()
.encrypted_private_post_states
.iter()
.enumerate()
.filter(move |(_, encrypted_data)| encrypted_data.view_tag == view_tag)
.filter_map(|(ciph_id, encrypted_data)| {
let ciphertext = &encrypted_data.ciphertext;
let commitment = &tx.message.new_commitments[ciph_id];
let shared_secret =
key_chain.calculate_shared_secret_receiver(encrypted_data.epk.clone());
nssa_core::EncryptionScheme::decrypt(
ciphertext,
&shared_secret,
commitment,
ciph_id as u32,
)
})
.map(move |res_acc| (acc_account_id, res_acc))
})
.collect::<Vec<_>>();
for (affected_account_id, new_acc) in affected_accounts {
println!(
"Received new account for account_id {affected_account_id:#?} with account object {new_acc:#?}"
);
self.storage
.insert_private_account_data(affected_account_id, new_acc);
}
}
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use anyhow::Result;
use common::sequencer_client::SequencerClient;
use common::{block::HashableBlockData, sequencer_client::SequencerClient};
use log::{info, warn};
use crate::config::WalletConfig;
@ -66,4 +66,18 @@ impl TxPoller {
anyhow::bail!("Transaction not found in preconfigured amount of blocks");
}
pub fn poll_block_range(
&self,
range: std::ops::RangeInclusive<u64>,
) -> impl futures::Stream<Item = Result<HashableBlockData>> {
async_stream::stream! {
for block_id in range {
let block = borsh::from_slice::<HashableBlockData>(
&self.client.get_block(block_id).await?.block,
)?;
yield Ok(block);
}
}
}
}