From f3dcda346cc016436515f05deca5dc7cfbc10bdd Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Mon, 19 Jan 2026 13:55:31 +0200 Subject: [PATCH] fix: ci test 1 --- .github/workflows/ci.yml | 27 ------------------- indexer/src/lib.rs | 5 ++-- indexer/src/state.rs | 2 +- integration_tests/tests/indexer.rs | 8 +++--- sequencer_core/src/block_settlement_client.rs | 5 ++-- sequencer_rpc/src/lib.rs | 6 ++--- sequencer_rpc/src/net_utils.rs | 4 +-- sequencer_rpc/src/process.rs | 7 +++-- .../configs/debug/sequencer_config.json | 11 ++++++-- sequencer_runner/src/lib.rs | 16 +++++------ 10 files changed, 34 insertions(+), 57 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 800b0422..72794bdf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -127,33 +127,6 @@ jobs: RUST_LOG: "info" run: cargo test -p integration_tests -- --exact private::private_transfer_to_owned_account - indexer-test: - runs-on: ubuntu-latest - timeout-minutes: 60 - steps: - - uses: actions/checkout@v5 - with: - ref: ${{ github.head_ref }} - - - uses: ./.github/actions/install-system-deps - - - uses: ./.github/actions/install-risc0 - - - name: Install active toolchain - run: rustup install - - - name: Test indexer run - env: - RUST_LOG: "info" - run: | - git clone https://github.com/logos-blockchain/logos-blockchain.git - cd logos-blockchain - chmod 777 ./scripts/setup-nomos-circuits.sh && ./scripts/setup-nomos-circuits.sh - cargo build --all-features --all-targets - CONSENSUS_SLOT_TIME=5 POL_PROOF_DEV_MODE=true target/debug/nomos-node nodes/nomos-node/config-one-node.yaml --dev-mode-reset-chain-clock > /dev/null 2>&1 - cd .. - cargo test -p integration_tests -- --exact indexer_run_local_node - artifacts: runs-on: ubuntu-latest timeout-minutes: 60 diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 9b713eee..b7d70dab 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use anyhow::Result; use bedrock_client::{BasicAuthCredentials, BedrockClient}; use common::block::HashableBlockData; -use futures::StreamExt; -use log::info; +use futures::{StreamExt, TryFutureExt}; +use log::{info, warn}; use nomos_core::mantle::{ Op, SignedMantleTx, ops::channel::{ChannelId, inscribe::InscriptionOp}, @@ -74,6 +74,7 @@ impl IndexerCore { self.bedrock_client .0 .get_block_by_id(self.bedrock_url.clone(), header_id) + .inspect_err(|err| warn!("Block fetching failed with err: {err:#?}")) }) .await? { diff --git a/indexer/src/state.rs b/indexer/src/state.rs index b2aa48a5..bd05971f 100644 --- a/indexer/src/state.rs +++ b/indexer/src/state.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use tokio::sync::RwLock; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct IndexerState { // Only one field for now, for testing. pub latest_seen_block: Arc>, diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index 12e011ff..120a85c1 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -5,14 +5,11 @@ use tokio::test; #[test] async fn indexer_run_local_node() -> Result<()> { - println!("Waiting 20 seconds for L1 node to start producing"); - tokio::time::sleep(std::time::Duration::from_secs(30)).await; - let ctx = TestContext::new_bedrock_local_attached().await?; info!("Let's observe behaviour"); - tokio::time::sleep(std::time::Duration::from_secs(600)).await; + tokio::time::sleep(std::time::Duration::from_secs(180)).await; let gen_id = ctx .sequencer_client() @@ -20,6 +17,9 @@ async fn indexer_run_local_node() -> Result<()> { .await .unwrap(); + // Checking, that some blocks are landed on bedrock + assert!(gen_id.last_block > 0); + info!("Last seen L2 block at indexer is {}", gen_id.last_block); Ok(()) diff --git a/sequencer_core/src/block_settlement_client.rs b/sequencer_core/src/block_settlement_client.rs index 2a82e47a..16b33107 100644 --- a/sequencer_core/src/block_settlement_client.rs +++ b/sequencer_core/src/block_settlement_client.rs @@ -87,9 +87,8 @@ impl BlockSettlementClient { .post_transaction(self.bedrock_node_url.clone(), tx.clone()) .await?; - match tx.mantle_tx.ops.first() { - Some(Op::ChannelInscribe(inscribe)) => self.last_message_id = inscribe.id(), - _ => {} + if let Some(Op::ChannelInscribe(inscribe)) = tx.mantle_tx.ops.first() { + self.last_message_id = inscribe.id() } Ok(block_data.block_id) diff --git a/sequencer_rpc/src/lib.rs b/sequencer_rpc/src/lib.rs index d66392c8..4f39f957 100644 --- a/sequencer_rpc/src/lib.rs +++ b/sequencer_rpc/src/lib.rs @@ -8,7 +8,7 @@ use common::{ rpc_primitives::errors::{RpcError, RpcErrorKind}, transaction::EncodedTransaction, }; -use indexer::IndexerCore; +use indexer::state::IndexerState; use mempool::MemPoolHandle; pub use net_utils::*; use sequencer_core::SequencerCore; @@ -21,9 +21,9 @@ use self::types::err_rpc::RpcErr; // ToDo: Add necessary fields pub struct JsonHandler { sequencer_state: Arc>, - // No functionality for now. + // No meaningfull functionality for now. #[allow(unused)] - indexer_state: Option>>, + indexer_state: Option, mempool_handle: MemPoolHandle, } diff --git a/sequencer_rpc/src/net_utils.rs b/sequencer_rpc/src/net_utils.rs index 8803831e..0198a8dc 100644 --- a/sequencer_rpc/src/net_utils.rs +++ b/sequencer_rpc/src/net_utils.rs @@ -7,7 +7,7 @@ use common::{ transaction::EncodedTransaction, }; use futures::{Future, FutureExt}; -use indexer::IndexerCore; +use indexer::state::IndexerState; use log::info; use mempool::MemPoolHandle; use sequencer_core::SequencerCore; @@ -47,7 +47,7 @@ pub fn new_http_server( config: RpcConfig, seuquencer_core: Arc>, mempool_handle: MemPoolHandle, - indexer_core: Option>>, + indexer_core: Option, ) -> io::Result<(actix_web::dev::Server, SocketAddr)> { let RpcConfig { addr, diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 2c0ed866..797cbe52 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -323,9 +323,9 @@ impl JsonHandler { let last_block = { if let Some(indexer_state) = &self.indexer_state { - let state = indexer_state.lock().await; + let last_seen_block = indexer_state.latest_seen_block.read().await; - *state.state.latest_seen_block.read().await + *last_seen_block } else { 0 } @@ -474,12 +474,11 @@ mod tests { .unwrap(); let sequencer_core = Arc::new(Mutex::new(sequencer_core)); - let indexer_core = Arc::new(Mutex::new(indexer_core)); ( JsonHandler { sequencer_state: sequencer_core, - indexer_state: Some(indexer_core), + indexer_state: Some(indexer_core.state.clone()), mempool_handle, }, initial_accounts, diff --git a/sequencer_runner/configs/debug/sequencer_config.json b/sequencer_runner/configs/debug/sequencer_config.json index ad43ba65..d7cce5d0 100644 --- a/sequencer_runner/configs/debug/sequencer_config.json +++ b/sequencer_runner/configs/debug/sequencer_config.json @@ -156,7 +156,14 @@ 37 ], "bedrock_config": { - "channel_id": [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1], - "node_url": "http://localhost:8080" + "channel_id": "0101010101010101010101010101010101010101010101010101010101010101", + "node_url": "http://localhost:8080", + "user": "user", + "password": null, + "indexer_config": { + "resubscribe_interval": 1000, + "start_delay": 1000, + "limit_retry": 10 + } } } diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index 16303db7..ffb95806 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -6,7 +6,7 @@ use bedrock_client::BasicAuthCredentials; use clap::Parser; use common::rpc_primitives::RpcConfig; use indexer::IndexerCore; -use log::info; +use log::{error, info}; use sequencer_core::{SequencerCore, config::SequencerConfig}; use sequencer_rpc::new_http_server; use tokio::{sync::Mutex, task::JoinHandle}; @@ -59,14 +59,14 @@ pub async fn startup_sequencer( info!("Sequencer core set up"); - let indexer_core_wrapped = indexer_core.map(|core| Arc::new(Mutex::new(core))); + let indexer_state_wrapped = indexer_core.as_ref().map(|core| core.state.clone()); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); let (http_server, addr) = new_http_server( RpcConfig::with_port(port), Arc::clone(&seq_core_wrapped), mempool_handle, - indexer_core_wrapped.clone(), + indexer_state_wrapped, )?; info!("HTTP server started"); let http_server_handle = http_server.handle(); @@ -94,13 +94,11 @@ pub async fn startup_sequencer( } }); - let indexer_loop_handle = indexer_core_wrapped.map(|indexer_core_wrapped| { + let indexer_loop_handle = indexer_core.map(|indexer_core| { tokio::spawn(async move { - { - let indexer_guard = indexer_core_wrapped.lock().await; - let res = indexer_guard.subscribe_parse_block_stream().await; - - info!("Indexer loop res is {res:#?}"); + match indexer_core.subscribe_parse_block_stream().await { + Ok(()) => unreachable!(), + Err(err) => error!("Indexer loop failed with error: {err:#?}"), } Ok(())