DA Improve indexer integration test (#679)

* Improve indexer test

* Use workspace gh api token

* Unpin rustls version for libp2p
This commit is contained in:
gusto 2024-07-24 18:59:47 +03:00 committed by GitHub
parent fcda9a6da8
commit 4cd3d040f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 47 additions and 4 deletions

View File

@ -22,6 +22,8 @@ jobs:
with:
submodules: true
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@ -46,6 +48,8 @@ jobs:
with:
submodules: true
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
# Setup Rust toolchain with GNU for Windows
- name: Setup Rust with GNU toolchain (Windows)
if: matrix.os == 'windows-latest'
@ -103,6 +107,8 @@ jobs:
with:
submodules: true
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rs/toolchain@v1
with:
profile: minimal

View File

@ -17,6 +17,8 @@ jobs:
with:
submodules: true
- uses: arduino/setup-protoc@v3
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Checkout submodules
run: git submodule update --init --recursive
- uses: actions-rs/toolchain@v1

View File

@ -22,7 +22,6 @@ hex = "0.4.3"
log = "0.4.19"
thiserror = "1.0.40"
tracing = "0.1"
rustls = { version = "=0.23.10", default-features = false } # https://github.com/libp2p/rust-libp2p/issues/5487
[dev-dependencies]
rand = "0.8.5"

View File

@ -24,6 +24,7 @@ nomos-libp2p = { path = "../../../nomos-libp2p" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1.15"
tempfile = "3.6"
tracing = "0.1"
time = "0.3"

View File

@ -1,16 +1,17 @@
// std
use std::hash::{DefaultHasher, Hash};
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::time::Duration;
use crate::common::*;
// crates
use blake2::{
digest::{Update, VariableOutput},
Blake2bVar,
};
use bytes::Bytes;
use cryptarchia_consensus::ConsensusMsg;
use cryptarchia_consensus::TimeConfig;
use cryptarchia_ledger::{Coin, LedgerState};
use full_replication::attestation::Attestation;
@ -38,6 +39,9 @@ use overwatch_rs::services::handle::ServiceHandle;
use rand::{thread_rng, Rng};
use tempfile::{NamedTempFile, TempDir};
use time::OffsetDateTime;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
// internal
use crate::common::*;
#[derive(Services)]
struct IndexerNode {
@ -202,6 +206,7 @@ fn test_indexer() {
let mempool = node1.handle().relay::<DaMempool>();
let storage = node1.handle().relay::<StorageService<RocksBackend<Wire>>>();
let indexer = node1.handle().relay::<DaIndexer>();
let consensus = node1.handle().relay::<Cryptarchia>();
let blob_hash = [9u8; 32];
let app_id = [7u8; 32];
@ -249,6 +254,19 @@ fn test_indexer() {
let mempool_outbound = mempool.connect().await.unwrap();
let storage_outbound = storage.connect().await.unwrap();
let indexer_outbound = indexer.connect().await.unwrap();
let consensus_outbound = consensus.connect().await.unwrap();
let (sender, receiver) = tokio::sync::oneshot::channel();
consensus_outbound
.send(ConsensusMsg::BlockSubscribe { sender })
.await
.unwrap();
let broadcast_receiver = receiver.await.unwrap();
let mut broadcast_receiver =
BroadcastStream::new(broadcast_receiver).filter_map(|result| match result {
Ok(block) => Some(block),
Err(_) => None,
});
// Mock attested blob by writting directly into the da storage.
let mut attested_key = Vec::from(b"da/attested/" as &[u8]);
@ -275,7 +293,24 @@ fn test_indexer() {
let _ = mempool_rx.await.unwrap();
// Wait for block in the network.
tokio::time::sleep(Duration::from_secs(2)).await;
let timeout = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(timeout);
loop {
tokio::select! {
Some(block) = broadcast_receiver.next() => {
if block.blobs().any(|v| *v == vid) {
break;
}
}
_ = &mut timeout => {
break;
}
}
}
// Give time for services to process and store data.
tokio::time::sleep(Duration::from_secs(1)).await;
// Request range of vids from indexer.
let (indexer_tx, indexer_rx) = tokio::sync::oneshot::channel();