fix: suggestions fix 1

This commit is contained in:
Pravdyvy 2026-01-27 09:46:31 +02:00
parent 4c4e30864b
commit c0e879edae
20 changed files with 187 additions and 189 deletions

6
Cargo.lock generated
View File

@ -933,6 +933,7 @@ dependencies = [
"logos-blockchain-common-http-client",
"logos-blockchain-core",
"reqwest",
"serde",
"tokio-retry",
]
@ -1323,6 +1324,7 @@ dependencies = [
"borsh",
"hex",
"log",
"logos-blockchain-common-http-client",
"nssa",
"nssa_core",
"reqwest",
@ -1330,6 +1332,7 @@ dependencies = [
"serde_json",
"sha2",
"thiserror 2.0.17",
"url",
]
[[package]]
@ -2783,6 +2786,7 @@ dependencies = [
"sequencer_runner",
"tempfile",
"tokio",
"url",
"wallet",
]
@ -6127,6 +6131,7 @@ dependencies = [
"idna",
"percent-encoding",
"serde",
"serde_derive",
]
[[package]]
@ -6187,6 +6192,7 @@ dependencies = [
"serde_json",
"sha2",
"tokio",
"url",
]
[[package]]

View File

@ -79,7 +79,7 @@ chrono = "0.4.41"
borsh = "1.5.7"
base58 = "0.2.0"
itertools = "0.14.0"
url = "2.5.4"
url = { version = "2.5.4", features = ["serde"] }
tokio-retry = "0.3.0"
logos-blockchain-common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }

View File

@ -9,6 +9,7 @@ anyhow.workspace = true
tokio-retry.workspace = true
futures.workspace = true
log.workspace = true
serde.workspace = true
logos-blockchain-common-http-client.workspace = true
logos-blockchain-core.workspace = true
logos-blockchain-chain-broadcast-service.workspace = true

View File

@ -1,12 +1,20 @@
use anyhow::Result;
use futures::{Stream, TryFutureExt};
use log::warn;
use logos_blockchain_chain_broadcast_service::BlockInfo;
pub use logos_blockchain_chain_broadcast_service::BlockInfo;
pub use logos_blockchain_common_http_client::{BasicAuthCredentials, CommonHttpClient, Error};
use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx};
pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx};
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use tokio_retry::Retry;
/// Fibonacci backoff retry strategy configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackoffConfig {
pub start_delay_millis: u64,
pub max_retries: usize,
}
// Simple wrapper
// maybe extend in the future for our purposes
pub struct BedrockClient {
@ -41,11 +49,11 @@ impl BedrockClient {
pub async fn get_block_by_id(
&self,
header_id: HeaderId,
start_delay_millis: u64,
max_retries: usize,
backoff: &BackoffConfig,
) -> Result<Option<Block<SignedMantleTx>>, Error> {
let strategy = tokio_retry::strategy::FibonacciBackoff::from_millis(start_delay_millis)
.take(max_retries);
let strategy =
tokio_retry::strategy::FibonacciBackoff::from_millis(backoff.start_delay_millis)
.take(backoff.max_retries);
Retry::spawn(strategy, || {
self.http_client

View File

@ -17,3 +17,5 @@ log.workspace = true
hex.workspace = true
borsh.workspace = true
base64.workspace = true
url.workspace = true
logos-blockchain-common-http-client.workspace = true

View File

@ -1,10 +1,12 @@
use std::{collections::HashMap, ops::RangeInclusive};
use std::{collections::HashMap, ops::RangeInclusive, str::FromStr};
use anyhow::Result;
use logos_blockchain_common_http_client::BasicAuthCredentials;
use nssa_core::program::ProgramId;
use reqwest::Client;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use url::Url;
use super::rpc_primitives::requests::{
GetAccountBalanceRequest, GetAccountBalanceResponse, GetBlockDataRequest, GetBlockDataResponse,
@ -27,22 +29,68 @@ use crate::{
transaction::{EncodedTransaction, NSSATransaction},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BasicAuth {
pub username: String,
pub password: Option<String>,
}
impl std::fmt::Display for BasicAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.username)?;
if let Some(password) = &self.password {
write!(f, ":{password}")?;
}
Ok(())
}
}
impl FromStr for BasicAuth {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let parse = || {
let mut parts = s.splitn(2, ':');
let username = parts.next()?;
let password = parts.next().filter(|p| !p.is_empty());
if parts.next().is_some() {
return None;
}
Some((username, password))
};
let (username, password) = parse().ok_or_else(|| {
anyhow::anyhow!("Invalid auth format. Expected 'user' or 'user:password'")
})?;
Ok(Self {
username: username.to_string(),
password: password.map(|p| p.to_string()),
})
}
}
impl From<BasicAuth> for BasicAuthCredentials {
fn from(value: BasicAuth) -> Self {
BasicAuthCredentials::new(value.username, value.password)
}
}
#[derive(Clone)]
pub struct SequencerClient {
pub client: reqwest::Client,
pub sequencer_addr: String,
pub basic_auth: Option<(String, Option<String>)>,
pub sequencer_addr: Url,
pub basic_auth: Option<BasicAuth>,
}
impl SequencerClient {
pub fn new(sequencer_addr: String) -> Result<Self> {
pub fn new(sequencer_addr: Url) -> Result<Self> {
Self::new_with_auth(sequencer_addr, None)
}
pub fn new_with_auth(
sequencer_addr: String,
basic_auth: Option<(String, Option<String>)>,
) -> Result<Self> {
pub fn new_with_auth(sequencer_addr: Url, basic_auth: Option<BasicAuth>) -> Result<Self> {
Ok(Self {
client: Client::builder()
// Add more fields if needed
@ -67,9 +115,9 @@ impl SequencerClient {
"Calling method {method} with payload {request:?} to sequencer at {}",
self.sequencer_addr
);
let mut call_builder = self.client.post(&self.sequencer_addr);
let mut call_builder = self.client.post(self.sequencer_addr.clone());
if let Some((username, password)) = &self.basic_auth {
if let Some(BasicAuth { username, password }) = &self.basic_auth {
call_builder = call_builder.basic_auth(username, password.as_deref());
}
@ -349,7 +397,7 @@ impl SequencerClient {
Ok(resp_deser)
}
/// Get last seen l2 block at indexer
/// Post indexer into sequencer
pub async fn post_indexer_message(
&self,
message: crate::communication::indexer::Message,

View File

@ -1,22 +1,24 @@
use std::{fs::File, io::BufReader, path::Path};
use anyhow::Result;
use anyhow::{Context, Result};
use bedrock_client::BackoffConfig;
use common::sequencer_client::BasicAuth;
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize)]
/// ToDo: Expand if necessary
pub struct ClientConfig {
pub addr: String,
pub auth: Option<(String, Option<String>)>,
pub addr: Url,
pub auth: Option<BasicAuth>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Note: For individual RPC requests we use Fibonacci backoff retry strategy
pub struct IndexerConfig {
pub resubscribe_interval_millis: u64,
pub start_delay_millis: u64,
pub max_retries: usize,
pub backoff: BackoffConfig,
pub bedrock_client_config: ClientConfig,
pub sequencer_client_config: ClientConfig,
pub channel_id: ChannelId,
@ -24,9 +26,11 @@ pub struct IndexerConfig {
impl IndexerConfig {
pub fn from_path(config_home: &Path) -> Result<IndexerConfig> {
let file = File::open(config_home)?;
let file = File::open(config_home)
.with_context(|| format!("Failed to open indexer config at {config_home:?}"))?;
let reader = BufReader::new(file);
Ok(serde_json::from_reader(reader)?)
serde_json::from_reader(reader)
.with_context(|| format!("Failed to parse indexer config at {config_home:?}"))
}
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use bedrock_client::{BasicAuthCredentials, BedrockClient};
use anyhow::Result;
use bedrock_client::BedrockClient;
use common::{
block::HashableBlockData, communication::indexer::Message,
rpc_primitives::requests::PostIndexerMessageResponse, sequencer_client::SequencerClient,
@ -13,7 +13,6 @@ use logos_blockchain_core::mantle::{
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::RwLock;
use url::Url;
use crate::{config::IndexerConfig, state::IndexerState};
@ -31,13 +30,8 @@ impl IndexerCore {
pub fn new(config: IndexerConfig) -> Result<Self> {
Ok(Self {
bedrock_client: BedrockClient::new(
config
.bedrock_client_config
.auth
.clone()
.map(|auth| BasicAuthCredentials::new(auth.0, auth.1)),
Url::parse(&config.bedrock_client_config.addr)
.context("Bedrock node addr is not a valid url")?,
config.bedrock_client_config.auth.clone().map(Into::into),
config.bedrock_client_config.addr.clone(),
)?,
sequencer_client: SequencerClient::new_with_auth(
config.sequencer_client_config.addr.clone(),
@ -64,11 +58,7 @@ impl IndexerCore {
if let Some(l1_block) = self
.bedrock_client
.get_block_by_id(
header_id,
self.config.start_delay_millis,
self.config.max_retries,
)
.get_block_by_id(header_id, &self.config.backoff)
.await?
{
info!("Extracted L1 block at height {}", block_info.height);
@ -118,19 +108,17 @@ impl IndexerCore {
fn parse_blocks(
block_txs: impl Iterator<Item = SignedMantleTx>,
decoded_channel_id: &ChannelId,
) -> Vec<HashableBlockData> {
block_txs
.flat_map(|tx| {
tx.mantle_tx.ops.into_iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == *decoded_channel_id => {
borsh::from_slice::<HashableBlockData>(&inscription).ok()
}
_ => None,
})
) -> impl Iterator<Item = HashableBlockData> {
block_txs.flat_map(|tx| {
tx.mantle_tx.ops.into_iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == *decoded_channel_id => {
borsh::from_slice::<HashableBlockData>(&inscription).ok()
}
_ => None,
})
.collect()
})
}

View File

@ -12,6 +12,7 @@ wallet.workspace = true
common.workspace = true
key_protocol.workspace = true
indexer_core.workspace = true
url.workspace = true
anyhow.workspace = true
env_logger.workspace = true

View File

@ -1,16 +1,17 @@
{
"bedrock_client_config": {
"addr": "http://127.0.0.1:8080",
"auth": [
"user",
null
]
"auth": {
"username": "user"
}
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"max_retries": 10,
"backoff": {
"max_retries": 10,
"start_delay_millis": 100
},
"resubscribe_interval_millis": 1000,
"sequencer_client_config": {
"addr": "will_be_replaced_in_runtime"
},
"start_delay_millis": 100
}
}

View File

@ -158,7 +158,8 @@
"bedrock_config": {
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"node_url": "http://127.0.0.1:8080",
"user": "user",
"password": null
"auth": {
"username": "user"
}
}
}

View File

@ -3,7 +3,7 @@
use std::{net::SocketAddr, path::PathBuf, sync::LazyLock};
use actix_web::dev::ServerHandle;
use anyhow::{Context as _, Result};
use anyhow::{Context, Result};
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use common::{
sequencer_client::SequencerClient,
@ -17,6 +17,7 @@ use nssa_core::Commitment;
use sequencer_core::config::SequencerConfig;
use tempfile::TempDir;
use tokio::task::JoinHandle;
use url::Url;
use wallet::{WalletCore, config::WalletConfigOverrides};
// TODO: Remove this and control time from tests
@ -57,7 +58,7 @@ impl TestContext {
let sequencer_config = SequencerConfig::from_path(&sequencer_config_path)
.context("Failed to create sequencer config from file")?;
Self::new_with_sequencer_config(sequencer_config).await
Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, None).await
}
/// Create new test context in local bedrock node attached mode.
@ -76,58 +77,17 @@ impl TestContext {
let indexer_config = IndexerConfig::from_path(&indexer_config_path)
.context("Failed to create indexer config from file")?;
Self::new_with_sequencer_and_indexer_configs(sequencer_config, indexer_config).await
}
/// Create new test context with custom sequencer config.
///
/// `home` and `port` fields of the provided config will be overridden to meet tests parallelism
/// requirements.
pub async fn new_with_sequencer_config(sequencer_config: SequencerConfig) -> Result<Self> {
// Ensure logger is initialized only once
*LOGGER;
debug!("Test context setup");
let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, temp_sequencer_dir) =
Self::setup_sequencer(sequencer_config)
.await
.context("Failed to setup sequencer")?;
// Convert 0.0.0.0 to 127.0.0.1 for client connections
// When binding to port 0, the server binds to 0.0.0.0:<random_port>
// but clients need to connect to 127.0.0.1:<port> to work reliably
let sequencer_addr = if sequencer_addr.ip().is_unspecified() {
format!("http://127.0.0.1:{}", sequencer_addr.port())
} else {
format!("http://{sequencer_addr}")
};
let (wallet, temp_wallet_dir) = Self::setup_wallet(sequencer_addr.clone())
Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, Some(indexer_config))
.await
.context("Failed to setup wallet")?;
let sequencer_client =
SequencerClient::new(sequencer_addr).context("Failed to create sequencer client")?;
Ok(Self {
sequencer_server_handle,
sequencer_loop_handle,
indexer_loop_handle: None,
sequencer_client,
wallet,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
}
/// Create new test context with custom sequencer and indexer configs.
/// Create new test context with custom sequencer config and maybe indexer config.
///
/// `home` and `port` fields of the provided config will be overridden to meet tests parallelism
/// requirements.
pub async fn new_with_sequencer_and_indexer_configs(
pub async fn new_with_sequencer_and_maybe_indexer_configs(
sequencer_config: SequencerConfig,
mut indexer_config: IndexerConfig,
indexer_config: Option<IndexerConfig>,
) -> Result<Self> {
// Ensure logger is initialized only once
*LOGGER;
@ -152,26 +112,41 @@ impl TestContext {
.await
.context("Failed to setup wallet")?;
let sequencer_client = SequencerClient::new(sequencer_addr.clone())
.context("Failed to create sequencer client")?;
let sequencer_client = SequencerClient::new(
Url::parse(&sequencer_addr).context("Failed to parse sequencer addr")?,
)
.context("Failed to create sequencer client")?;
indexer_config.sequencer_client_config.addr = sequencer_addr;
if let Some(mut indexer_config) = indexer_config {
indexer_config.sequencer_client_config.addr =
Url::parse(&sequencer_addr).context("Failed to parse sequencer addr")?;
let indexer_core = IndexerCore::new(indexer_config)?;
let indexer_core = IndexerCore::new(indexer_config)?;
let indexer_loop_handle = Some(tokio::spawn(async move {
indexer_core.subscribe_parse_block_stream().await
}));
let indexer_loop_handle = Some(tokio::spawn(async move {
indexer_core.subscribe_parse_block_stream().await
}));
Ok(Self {
sequencer_server_handle,
sequencer_loop_handle,
indexer_loop_handle,
sequencer_client,
wallet,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
Ok(Self {
sequencer_server_handle,
sequencer_loop_handle,
indexer_loop_handle,
sequencer_client,
wallet,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
} else {
Ok(Self {
sequencer_server_handle,
sequencer_loop_handle,
indexer_loop_handle: None,
sequencer_client,
wallet,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
}
}
async fn setup_sequencer(

View File

@ -25,7 +25,11 @@ pub async fn tps_test() -> Result<()> {
let target_tps = 12;
let tps_test = TpsTestManager::new(target_tps, num_transactions);
let ctx = TestContext::new_with_sequencer_config(tps_test.generate_sequencer_config()).await?;
let ctx = TestContext::new_with_sequencer_and_maybe_indexer_configs(
tps_test.generate_sequencer_config(),
None,
)
.await?;
let target_time = tps_test.target_time();
info!(

View File

@ -26,7 +26,6 @@ impl BlockSettlementClient {
pub fn try_new(home: &Path, config: &BedrockConfig) -> Result<Self> {
let bedrock_signing_key = load_or_create_signing_key(&home.join("bedrock_signing_key"))
.context("Failed to load or create signing key")?;
let bedrock_channel_id = ChannelId::from(config.channel_id);
let bedrock_url = Url::from_str(config.node_url.as_ref())
.context("Bedrock node address is not a valid url")?;
let bedrock_client =
@ -35,7 +34,7 @@ impl BlockSettlementClient {
Ok(Self {
bedrock_client,
bedrock_signing_key,
bedrock_channel_id,
bedrock_channel_id: config.channel_id,
last_message_id: channel_genesis_msg,
})
}

View File

@ -5,6 +5,7 @@ use std::{
};
use anyhow::Result;
use common::sequencer_client::BasicAuth;
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
@ -58,10 +59,8 @@ pub struct BedrockConfig {
pub channel_id: ChannelId,
/// Bedrock Url
pub node_url: String,
/// Bedrock user
pub user: String,
/// Bedrock password
pub password: Option<String>,
/// Bedrock auth
pub auth: Option<BasicAuth>,
}
impl SequencerConfig {

View File

@ -354,7 +354,10 @@ mod tests {
use base58::ToBase58;
use base64::{Engine, engine::general_purpose};
use common::{test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction};
use common::{
sequencer_client::BasicAuth, test_utils::sequencer_sign_key_for_testing,
transaction::EncodedTransaction,
};
use sequencer_core::{
SequencerCore,
config::{AccountInitialData, BedrockConfig, SequencerConfig},
@ -405,8 +408,10 @@ mod tests {
bedrock_config: Some(BedrockConfig {
channel_id: [42; 32].into(),
node_url: "http://localhost:8080".to_string(),
user: "user".to_string(),
password: None,
auth: Some(BasicAuth {
username: "user".to_string(),
password: None,
}),
}),
}
}

View File

@ -158,7 +158,8 @@
"bedrock_config": {
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"node_url": "http://localhost:8080",
"user": "user",
"password": null
"auth": {
"username": "user"
}
}
}

View File

@ -29,3 +29,4 @@ risc0-zkvm.workspace = true
async-stream = "0.3.6"
indicatif = { version = "0.18.3", features = ["improved_unicode"] }
optfield = "0.4.0"
url.workspace = true

View File

@ -1,10 +1,10 @@
use std::{
io::{BufReader, Write as _},
path::Path,
str::FromStr,
};
use anyhow::{Context as _, Result};
use common::sequencer_client::BasicAuth;
use key_protocol::key_management::{
KeyChain,
key_tree::{
@ -14,49 +14,6 @@ use key_protocol::key_management::{
use log::warn;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BasicAuth {
pub username: String,
pub password: Option<String>,
}
impl std::fmt::Display for BasicAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.username)?;
if let Some(password) = &self.password {
write!(f, ":{password}")?;
}
Ok(())
}
}
impl FromStr for BasicAuth {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let parse = || {
let mut parts = s.splitn(2, ':');
let username = parts.next()?;
let password = parts.next().filter(|p| !p.is_empty());
if parts.next().is_some() {
return None;
}
Some((username, password))
};
let (username, password) = parse().ok_or_else(|| {
anyhow::anyhow!("Invalid auth format. Expected 'user' or 'user:password'")
})?;
Ok(Self {
username: username.to_string(),
password: password.map(|p| p.to_string()),
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InitialAccountDataPublic {
pub account_id: String,

View File

@ -23,6 +23,7 @@ use nssa_core::{
};
pub use privacy_preserving_tx::PrivacyPreservingAccount;
use tokio::io::AsyncWriteExt;
use url::Url;
use crate::{
config::{PersistentStorage, WalletConfigOverrides},
@ -188,13 +189,9 @@ impl WalletCore {
config.apply_overrides(config_overrides);
}
let basic_auth = config
.basic_auth
.as_ref()
.map(|auth| (auth.username.clone(), auth.password.clone()));
let sequencer_client = Arc::new(SequencerClient::new_with_auth(
config.sequencer_addr.clone(),
basic_auth,
Url::parse(&config.sequencer_addr)?,
config.basic_auth.clone(),
)?);
let tx_poller = TxPoller::new(config.clone(), Arc::clone(&sequencer_client));