diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 38161a0f..800b0422 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -105,7 +105,7 @@ jobs: env: RISC0_DEV_MODE: "1" RUST_LOG: "info" - run: cargo nextest run --no-fail-fast -- --skip tps_test + run: cargo nextest run --no-fail-fast -- --skip tps_test --skip indexer_run_local_node valid-proof-test: runs-on: ubuntu-latest @@ -127,6 +127,33 @@ jobs: RUST_LOG: "info" run: cargo test -p integration_tests -- --exact private::private_transfer_to_owned_account + indexer-test: + runs-on: ubuntu-latest + timeout-minutes: 60 + steps: + - uses: actions/checkout@v5 + with: + ref: ${{ github.head_ref }} + + - uses: ./.github/actions/install-system-deps + + - uses: ./.github/actions/install-risc0 + + - name: Install active toolchain + run: rustup install + + - name: Test indexer run + env: + RUST_LOG: "info" + run: | + git clone https://github.com/logos-blockchain/logos-blockchain.git + cd logos-blockchain + chmod 777 ./scripts/setup-nomos-circuits.sh && ./scripts/setup-nomos-circuits.sh + cargo build --all-features --all-targets + CONSENSUS_SLOT_TIME=5 POL_PROOF_DEV_MODE=true target/debug/nomos-node nodes/nomos-node/config-one-node.yaml --dev-mode-reset-chain-clock > /dev/null 2>&1 + cd .. + cargo test -p integration_tests -- --exact indexer_run_local_node + artifacts: runs-on: ubuntu-latest timeout-minutes: 60 diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 93c5094b..f797ce1b 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -2,6 +2,7 @@ use anyhow::Result; use bedrock_client::{BasicAuthCredentials, BedrockClient}; use common::block::{BlockHash, HashableBlockData}; use futures::StreamExt; +use log::info; use nomos_core::mantle::{ Op, SignedMantleTx, ops::channel::{ChannelId, inscribe::InscriptionOp}, @@ -49,15 +50,21 @@ impl IndexerCore { .await?, ); + info!("Block stream joined"); + while let Some(block_info) = stream_pinned.next().await { let header_id = block_info.header_id; + info!("Observed L1 block at height {}", block_info.height); + if let Some(l1_block) = self .bedrock_client .0 .get_block_by_id(self.bedrock_url.clone(), header_id) .await? { + info!("Extracted L1 block at height {} with data {l1_block:#?}", block_info.height); + let l2_blocks_parsed = parse_blocks( l1_block.into_transactions().into_iter(), &self.config.channel_id, diff --git a/integration_tests/configs/sequencer/sequencer_config.json b/integration_tests/configs/sequencer/sequencer_config.json index 1548bb5b..dd6ce12d 100644 --- a/integration_tests/configs/sequencer/sequencer_config.json +++ b/integration_tests/configs/sequencer/sequencer_config.json @@ -154,5 +154,14 @@ 37, 37, 37 - ] + ], + "bedrock_addr": "http://127.0.0.1:8080", + "bedrock_auth": [ + "user", + "password" + ], + "indexer_config": { + "resubscribe_interval": 1000, + "channel_id": "2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a" + } } \ No newline at end of file diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 12d718ec..3602e9eb 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -38,6 +38,7 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init); pub struct TestContext { sequencer_server_handle: ServerHandle, sequencer_loop_handle: JoinHandle>, + indexer_loop_handle: JoinHandle>, sequencer_client: SequencerClient, wallet: WalletCore, _temp_sequencer_dir: TempDir, @@ -68,7 +69,7 @@ impl TestContext { debug!("Test context setup"); - let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, temp_sequencer_dir) = + let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, temp_sequencer_dir, indexer_loop_handle) = Self::setup_sequencer(sequencer_config) .await .context("Failed to setup sequencer")?; @@ -92,6 +93,7 @@ impl TestContext { Ok(Self { sequencer_server_handle, sequencer_loop_handle, + indexer_loop_handle, sequencer_client, wallet, _temp_sequencer_dir: temp_sequencer_dir, @@ -101,7 +103,7 @@ impl TestContext { async fn setup_sequencer( mut config: SequencerConfig, - ) -> Result<(ServerHandle, SocketAddr, JoinHandle>, TempDir)> { + ) -> Result<(ServerHandle, SocketAddr, JoinHandle>, TempDir, JoinHandle>)> { let temp_sequencer_dir = tempfile::tempdir().context("Failed to create temp dir for sequencer home")?; @@ -113,7 +115,7 @@ impl TestContext { // Setting port to 0 lets the OS choose a free port for us config.port = 0; - let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle) = + let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, indexer_loop_handle) = sequencer_runner::startup_sequencer(config).await?; Ok(( @@ -121,6 +123,7 @@ impl TestContext { sequencer_addr, sequencer_loop_handle, temp_sequencer_dir, + indexer_loop_handle )) } @@ -180,6 +183,7 @@ impl Drop for TestContext { let Self { sequencer_server_handle, sequencer_loop_handle, + indexer_loop_handle, sequencer_client: _, wallet: _, _temp_sequencer_dir, @@ -187,6 +191,7 @@ impl Drop for TestContext { } = self; sequencer_loop_handle.abort(); + indexer_loop_handle.abort(); // Can't wait here as Drop can't be async, but anyway stop signal should be sent sequencer_server_handle.stop(true).now_or_never(); diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs new file mode 100644 index 00000000..b30499d5 --- /dev/null +++ b/integration_tests/tests/indexer.rs @@ -0,0 +1,22 @@ +use anyhow::Result; +use integration_tests::TestContext; +use log::info; +use tokio::test; + +#[test] +async fn indexer_run_local_node() -> Result<()> { + println!("Waiting 20 seconds for L1 node to start producing"); + tokio::time::sleep(std::time::Duration::from_secs(20)).await; + + let ctx = TestContext::new().await?; + + info!("Let's observe behaviour"); + + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + + let gen_id = ctx.sequencer_client().get_genesis_id().await.unwrap(); + + info!("btw, gen id is {gen_id:?}"); + + Ok(()) +} \ No newline at end of file diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 097fb47b..51fba4c0 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -390,7 +390,7 @@ mod tests { initial_accounts, initial_commitments: vec![], signing_key: *sequencer_sign_key_for_testing().value(), - bedrock_addr: "0.0.0.0".to_string(), + bedrock_addr: "http://127.0.0.1:8080".to_string(), bedrock_auth: ("".to_string(), "".to_string()), indexer_config: IndexerConfig { resubscribe_interval: 100, diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index c1863a17..35a60e4d 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -22,7 +22,7 @@ struct Args { pub async fn startup_sequencer( app_config: SequencerConfig, -) -> Result<(ServerHandle, SocketAddr, JoinHandle>)> { +) -> Result<(ServerHandle, SocketAddr, JoinHandle>, JoinHandle>)> { let block_timeout = app_config.block_create_timeout_millis; let port = app_config.port; @@ -78,7 +78,18 @@ pub async fn startup_sequencer( } }); - Ok((http_server_handle, addr, main_loop_handle)) + let indexer_loop_handle = tokio::spawn(async move { + { + let indexer_guard = indexer_core_wrapped.lock().await; + let res = indexer_guard.subscribe_parse_block_stream().await; + + info!("Indexer loop res is {res:#?}"); + } + + Ok(()) + }); + + Ok((http_server_handle, addr, main_loop_handle, indexer_loop_handle)) } pub async fn main_runner() -> Result<()> { @@ -98,9 +109,10 @@ pub async fn main_runner() -> Result<()> { } // ToDo: Add restart on failures - let (_, _, main_loop_handle) = startup_sequencer(app_config).await?; + let (_, _, main_loop_handle, indexer_loop_handle) = startup_sequencer(app_config).await?; main_loop_handle.await??; + indexer_loop_handle.await??; Ok(()) }