diff --git a/integration_tests/tests/bridge.rs b/integration_tests/tests/bridge.rs index cd8c8fa5..cc4042ba 100644 --- a/integration_tests/tests/bridge.rs +++ b/integration_tests/tests/bridge.rs @@ -1,5 +1,6 @@ #![expect( clippy::tests_outside_test_module, + clippy::arithmetic_side_effects, reason = "We don't care about these in tests" )] @@ -26,6 +27,8 @@ use nssa_core::{InputAccountIdentity, account::AccountWithMetadata}; use sequencer_service_rpc::RpcClient as _; use tokio::test; +const TIME_TO_FINALIZE_DEPOSIT_EVENT_ON_BEDROCK: Duration = Duration::from_mins(6); + #[test] async fn public_bridge_deposit_invocation_is_dropped() -> anyhow::Result<()> { let ctx = TestContext::new().await?; @@ -189,16 +192,16 @@ async fn private_bridge_deposit_invocation_is_dropped() -> anyhow::Result<()> { Ok(()) } -#[derive(BorshSerialize)] -struct DepositMetadata { - recipient_id: AccountId, -} - async fn submit_bedrock_deposit( bedrock_addr: std::net::SocketAddr, recipient_id: AccountId, amount: u128, ) -> anyhow::Result<()> { + #[derive(BorshSerialize)] + struct DepositMetadata { + recipient_id: AccountId, + } + // Encode deposit metadata let metadata = borsh::to_vec(&DepositMetadata { recipient_id }) .context("Failed to encode deposit metadata")?; @@ -220,15 +223,7 @@ async fn submit_bedrock_deposit( .await .context("Failed to query Bedrock wallet balance")?; - if !balance_response.status().is_success() { - let status = balance_response.status(); - let body_text = balance_response.text().await.unwrap_or_default(); - anyhow::bail!( - "Bedrock balance query failed with status {} and body {}", - status, - body_text, - ); - } + let balance_response = check_response_success(balance_response).await?; balance_response .json::() @@ -273,16 +268,7 @@ async fn submit_bedrock_deposit( .send() .await .context("Failed to submit Bedrock transfer-funds request")?; - - if !transfer_response.status().is_success() { - let status = transfer_response.status(); - let body_text = transfer_response.text().await.unwrap_or_default(); - anyhow::bail!( - "Bedrock transfer-funds request failed with status {} and body {}", - status, - body_text, - ); - } + let transfer_response = check_response_success(transfer_response).await?; let transfer: WalletTransferFundsResponseBody = transfer_response .json() @@ -312,8 +298,7 @@ async fn submit_bedrock_deposit( let Some(selected_note_id) = selected_note_id else { anyhow::bail!( - "Failed to locate exact-value note {:?} for Bedrock deposit; available notes: {:?}", - amount, + "Failed to locate exact-value note {amount:?} for Bedrock deposit; available notes: {:?}", balance.notes, ); }; @@ -336,26 +321,26 @@ async fn submit_bedrock_deposit( .send() .await .context("Failed to submit Bedrock deposit request")?; + let response = check_response_success(response).await?; + let body_text = response + .text() + .await + .unwrap_or_else(|_| "".to_owned()); + info!( + "Successfully submitted Bedrock deposit request for recipient {recipient_id} and amount {amount}, response body: {body_text}", + ); + + Ok(()) +} + +async fn check_response_success(response: reqwest::Response) -> anyhow::Result { if response.status().is_success() { - let body_text = response - .text() - .await - .unwrap_or_else(|_| "".to_owned()); - info!( - "Successfully submitted Bedrock deposit request for recipient {recipient_id} and amount {amount}, response body: {}", - body_text - ); - - return Ok(()); + Ok(response) } else { let status = response.status(); let body_text = response.text().await.unwrap_or_default(); - anyhow::bail!( - "Bedrock deposit request failed with status {} and body {}", - status, - body_text, - ); + anyhow::bail!("Request failed with status {status} and body {body_text}"); } } @@ -363,8 +348,9 @@ async fn wait_for_vault_balance( ctx: &TestContext, vault_id: AccountId, expected_balance: u128, - timeout: Duration, ) -> anyhow::Result<()> { + let timeout = TIME_TO_FINALIZE_DEPOSIT_EVENT_ON_BEDROCK + + Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS); tokio::time::timeout(timeout, async { loop { let balance = ctx.sequencer_client().get_account_balance(vault_id).await?; @@ -401,13 +387,7 @@ async fn bedrock_deposit_mints_to_vault_then_claim_succeeds() -> anyhow::Result< submit_bedrock_deposit(ctx.bedrock_addr(), recipient_id, 1).await?; // Wait for vault to receive the deposit (minted from bridge to vault) - wait_for_vault_balance( - &ctx, - recipient_vault_id, - vault_balance_before + 1, - Duration::from_mins(5), - ) - .await?; + wait_for_vault_balance(&ctx, recipient_vault_id, vault_balance_before + 1).await?; // Now claim funds from vault back to recipient let nonces = ctx diff --git a/sequencer/service/src/lib.rs b/sequencer/service/src/lib.rs index ecedfd7c..9311f1de 100644 --- a/sequencer/service/src/lib.rs +++ b/sequencer/service/src/lib.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{Context as _, Result, anyhow}; #[cfg(not(feature = "standalone"))] @@ -13,8 +13,10 @@ use jsonrpsee::server::ServerHandle; use log::warn; use log::{error, info}; #[cfg(not(feature = "standalone"))] +use logos_blockchain_core::mantle::ops::channel::MsgId; +#[cfg(not(feature = "standalone"))] use logos_blockchain_zone_sdk::{ - CommonHttpClient, ZoneMessage, adapter::Node as _, adapter::NodeHttpClient, + CommonHttpClient, Slot, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, }; use mempool::MemPoolHandle; #[cfg(not(feature = "standalone"))] @@ -252,77 +254,60 @@ async fn bedrock_deposit_loop( mempool_handle: MemPoolHandle, ) -> Result { let basic_auth = bedrock_config.auth.map(Into::into); - let channel_id = bedrock_config.channel_id; let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), bedrock_config.node_url); - let mut seen_deposits = HashSet::new(); + let zone_indexer = ZoneIndexer::new(bedrock_config.channel_id, node); + + let mut cursor: Option<(MsgId, Slot)> = None; + let poll_interval = Duration::from_secs(1); loop { - let stream = match node.block_stream().await { + let stream = match zone_indexer.next_messages(cursor).await { Ok(stream) => stream, Err(err) => { error!("Failed to start Bedrock deposit stream: {err}"); - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(poll_interval).await; continue; } }; let mut stream = std::pin::pin!(stream); - while let Some(block_event) = stream.next().await { - let block_id = block_event.block.header.id; - - let zone_messages = match node.zone_messages_in_block(block_id, channel_id).await { - Ok(messages) => messages, - Err(err) => { - warn!("Failed to fetch zone messages for Bedrock block {block_id}: {err}"); - continue; + while let Some((msg, slot)) = stream.next().await { + match msg { + ZoneMessage::Block(block) => { + cursor = Some((block.id, slot)); + info!("Observed Bedrock channel block id {:?}", block.id); } - }; - let mut zone_messages = std::pin::pin!(zone_messages); - - while let Some(msg) = zone_messages.next().await { - match msg { - ZoneMessage::Block(block) => { - info!("Observed Bedrock channel block id {:?}", block.id); - } - ZoneMessage::Deposit(deposit) => { - // Dedupe by stable payload to avoid replaying the same deposit. - let deposit_fingerprint = - format!("{:?}:{:?}", deposit.inputs, deposit.metadata); - if !seen_deposits.insert(deposit_fingerprint) { + ZoneMessage::Deposit(deposit) => { + let metadata = match DepositMetadata::decode(&deposit.metadata) { + Ok(metadata) => metadata, + Err(err) => { + warn!("Skipping Bedrock Deposit with invalid metadata: {err}"); continue; } + }; - let metadata = match DepositMetadata::decode(&deposit.metadata) { - Ok(metadata) => metadata, - Err(err) => { - warn!("Skipping Bedrock Deposit with invalid metadata: {err}"); - continue; - } - }; + let tx = match build_bridge_deposit_tx(&metadata) { + Ok(tx) => tx, + Err(err) => { + warn!("Skipping Bedrock Deposit due to tx build failure: {err:#}"); + continue; + } + }; - let tx = match build_bridge_deposit_tx(&metadata) { - Ok(tx) => tx, - Err(err) => { - warn!("Skipping Bedrock Deposit due to tx build failure: {err:#}"); - continue; - } - }; - - info!( - "Observed Bedrock Deposit for recipient {recipient_id}, pushing to mempool", - recipient_id = metadata.recipient_id - ); - mempool_handle.push(tx).await.context( - "Mempool is closed while pushing Bedrock Deposit transaction", - )?; - } - ZoneMessage::Withdraw(_) => {} + info!( + "Observed Bedrock Deposit for recipient {recipient_id}, pushing to mempool", + recipient_id = metadata.recipient_id + ); + mempool_handle + .push(tx) + .await + .context("Mempool is closed while pushing Bedrock Deposit transaction")?; } + ZoneMessage::Withdraw(_) => {} } } - warn!("Bedrock deposit stream ended unexpectedly, reconnecting"); - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(poll_interval).await; } } diff --git a/test_fixtures/src/config.rs b/test_fixtures/src/config.rs index ef500873..de7c2644 100644 --- a/test_fixtures/src/config.rs +++ b/test_fixtures/src/config.rs @@ -189,6 +189,7 @@ pub fn addr_to_url(protocol: UrlProtocol, addr: SocketAddr) -> Result { url_string.parse().map_err(Into::into) } +#[must_use] pub fn bedrock_channel_id() -> ChannelId { ChannelId::from([0_u8; 32]) }