diff --git a/nomos-cli/src/da/network/adapters/libp2p.rs b/nomos-cli/src/da/network/adapters/libp2p.rs index 99652cb6..778a1017 100644 --- a/nomos-cli/src/da/network/adapters/libp2p.rs +++ b/nomos-cli/src/da/network/adapters/libp2p.rs @@ -1,13 +1,19 @@ -use futures::future::join_all; // std +use std::collections::HashSet; // crates +use futures::{future::join_all, StreamExt}; use kzgrs_backend::{common::blob::DaBlob, encoder::EncodedData as KzgEncodedData}; use nomos_core::da::DaDispersal; +use nomos_da_network_core::SubnetworkId; use nomos_da_network_service::{DaNetworkMsg, NetworkService}; use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; use thiserror::Error; +use tokio::sync::oneshot; // internal -use crate::da::{network::backend::Command, NetworkBackend}; +use crate::da::{ + network::{backend::Command, swarm::DispersalEvent}, + NetworkBackend, +}; type Relay = OutboundRelay< as ServiceData>::Message>; @@ -39,6 +45,14 @@ impl DaDispersal for Libp2pExecutorDispersalAdapter { async fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error> { let mut tasks = Vec::new(); + let (sender, receiver) = oneshot::channel(); + self.network_relay + .send(DaNetworkMsg::Subscribe { kind: (), sender }) + .await + .map_err(|(e, _)| e.to_string())?; + let mut event_stream = receiver.await.map_err(|e| e.to_string())?; + let mut expected_acknowledgments = HashSet::new(); + for (i, column) in encoded_data.extended_data.columns().enumerate() { let blob = DaBlob { column: column.clone(), @@ -56,6 +70,8 @@ impl DaDispersal for Libp2pExecutorDispersalAdapter { .collect(), }; + expected_acknowledgments.insert((blob.id().clone(), i as SubnetworkId)); + let relay = self.network_relay.clone(); let command = DaNetworkMsg::Process(Command::Disperse { blob, @@ -73,6 +89,28 @@ impl DaDispersal for Libp2pExecutorDispersalAdapter { result?; } + while !expected_acknowledgments.is_empty() { + let event = event_stream.next().await; + match event { + Some(event) => match event { + DispersalEvent::DispersalSuccess { + blob_id, + subnetwork_id, + } => { + expected_acknowledgments.remove(&(blob_id.to_vec(), subnetwork_id)); + } + DispersalEvent::DispersalError { error } => { + return Err(DispersalError(format!("Received dispersal error: {error}"))); + } + }, + None => { + return Err(DispersalError( + "Event stream ended before receiving all acknowledgments".into(), + )); + } + } + } + Ok(()) } } diff --git a/nomos-cli/src/da/network/backend.rs b/nomos-cli/src/da/network/backend.rs index 5e9fc520..08466ad0 100644 --- a/nomos-cli/src/da/network/backend.rs +++ b/nomos-cli/src/da/network/backend.rs @@ -1,5 +1,5 @@ // std -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::marker::PhantomData; use std::pin::Pin; @@ -59,6 +59,7 @@ pub struct ExecutorBackendSettings { /// Membership of DA network PoV set pub membership: Membership, pub node_addrs: HashMap, + pub num_subnets: u16, } impl ExecutorBackend { @@ -96,18 +97,43 @@ where let keypair = libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone())); let mut executor_swarm = - ExecutorSwarm::new(keypair, config.membership, dispersal_events_sender); + ExecutorSwarm::new(keypair, config.membership.clone(), dispersal_events_sender); let dispersal_request_sender = executor_swarm.blobs_sender(); - for (_, addr) in config.node_addrs { + let mut connected_peers = HashSet::new(); + + let local_peer_id = *executor_swarm.local_peer_id(); + for subnetwork_id in 0..config.num_subnets { + // Connect to one peer in a subnet. + let mut members = config.membership.members_of(&(subnetwork_id as u32)); + members.remove(&local_peer_id); + let peer_id = *members + .iter() + .next() + .expect("Subnet should have at least one node which is not the nomos_cli"); + + let addr = config + .node_addrs + .get(&peer_id) + .expect("Peer address should be in the list"); + executor_swarm - .dial(addr) + .dial(addr.clone()) .expect("Should schedule the dials"); + + connected_peers.insert(peer_id); } + let executor_open_stream_sender = executor_swarm.open_stream_sender(); + let task = overwatch_handle .runtime() .spawn(async move { executor_swarm.run().await }); + + for peer_id in connected_peers.iter() { + executor_open_stream_sender.send(*peer_id).unwrap(); + } + let (dispersal_broadcast_sender, dispersal_broadcast_receiver) = broadcast::channel(BROADCAST_CHANNEL_SIZE); let dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver); diff --git a/nomos-cli/src/da/network/swarm.rs b/nomos-cli/src/da/network/swarm.rs index d5ef009c..2f0674a6 100644 --- a/nomos-cli/src/da/network/swarm.rs +++ b/nomos-cli/src/da/network/swarm.rs @@ -1,4 +1,5 @@ // std +use std::time::Duration; // crates use kzgrs_backend::common::blob::DaBlob; use libp2p::futures::StreamExt; @@ -56,6 +57,10 @@ where self.swarm.behaviour().blobs_sender() } + pub fn open_stream_sender(&self) -> UnboundedSender { + self.swarm.behaviour().open_stream_sender() + } + fn build_swarm( key: Keypair, membership: Membership, @@ -65,6 +70,9 @@ where .with_quic() .with_behaviour(|_key| DispersalExecutorBehaviour::new(membership)) .expect("Validator behaviour should build") + .with_swarm_config(|cfg| { + cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) + }) .build() } @@ -73,29 +81,36 @@ where Ok(()) } + pub fn local_peer_id(&self) -> &PeerId { + self.swarm.local_peer_id() + } + pub async fn run(&mut self) { - tokio::select! { - Some(event) = self.swarm.next() => { - match event { - SwarmEvent::Behaviour(behaviour_event) => { - self.handle_dispersal_event(behaviour_event).await; - }, - SwarmEvent::ConnectionEstablished{ .. } => {} - SwarmEvent::ConnectionClosed{ .. } => {} - SwarmEvent::IncomingConnection{ .. } => {} - SwarmEvent::IncomingConnectionError{ .. } => {} - SwarmEvent::OutgoingConnectionError{ .. } => {} - SwarmEvent::NewListenAddr{ .. } => {} - SwarmEvent::ExpiredListenAddr{ .. } => {} - SwarmEvent::ListenerClosed{ .. } => {} - SwarmEvent::ListenerError{ .. } => {} - SwarmEvent::Dialing{ .. } => {} - SwarmEvent::NewExternalAddrCandidate{ .. } => {} - SwarmEvent::ExternalAddrConfirmed{ .. } => {} - SwarmEvent::ExternalAddrExpired{ .. } => {} - SwarmEvent::NewExternalAddrOfPeer{ .. } => {} - event => { - debug!("Unsupported validator swarm event: {event:?}"); + loop { + tokio::select! { + Some(event) = self.swarm.next() => { + debug!("Executor received an event: {event:?}"); + match event { + SwarmEvent::Behaviour(behaviour_event) => { + self.handle_dispersal_event(behaviour_event).await; + }, + SwarmEvent::ConnectionEstablished{ .. } => {} + SwarmEvent::ConnectionClosed{ .. } => {} + SwarmEvent::IncomingConnection{ .. } => {} + SwarmEvent::IncomingConnectionError{ .. } => {} + SwarmEvent::OutgoingConnectionError{ .. } => {} + SwarmEvent::NewListenAddr{ .. } => {} + SwarmEvent::ExpiredListenAddr{ .. } => {} + SwarmEvent::ListenerClosed{ .. } => {} + SwarmEvent::ListenerError{ .. } => {} + SwarmEvent::Dialing{ .. } => {} + SwarmEvent::NewExternalAddrCandidate{ .. } => {} + SwarmEvent::ExternalAddrConfirmed{ .. } => {} + SwarmEvent::ExternalAddrExpired{ .. } => {} + SwarmEvent::NewExternalAddrOfPeer{ .. } => {} + event => { + debug!("Unsupported validator swarm event: {event:?}"); + } } } } diff --git a/nomos-da/network/core/src/swarm/validator.rs b/nomos-da/network/core/src/swarm/validator.rs index 2a17cef4..5d2bc37d 100644 --- a/nomos-da/network/core/src/swarm/validator.rs +++ b/nomos-da/network/core/src/swarm/validator.rs @@ -1,4 +1,5 @@ // std +use std::time::Duration; // crates use futures::StreamExt; use kzgrs_backend::common::blob::DaBlob; @@ -68,6 +69,9 @@ where .with_quic() .with_behaviour(|key| ValidatorBehaviour::new(key, membership, addresses)) .expect("Validator behaviour should build") + .with_swarm_config(|cfg| { + cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) + }) .build() } diff --git a/nomos-services/data-availability/sampling/src/backend/kzgrs.rs b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs index 419888d9..48fafdcd 100644 --- a/nomos-services/data-availability/sampling/src/backend/kzgrs.rs +++ b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs @@ -5,7 +5,6 @@ use std::time::{Duration, Instant}; // crates use hex; -use rand::distributions::Standard; use rand::prelude::*; use serde::{Deserialize, Serialize}; use tokio::time; @@ -28,6 +27,7 @@ pub struct SamplingContext { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KzgrsSamplingBackendSettings { pub num_samples: u16, + pub num_subnets: u16, pub old_blobs_check_interval: Duration, pub blobs_validity_duration: Duration, } @@ -116,10 +116,9 @@ impl DaSamplingServiceBackend for KzgrsSamplingBackend< return SamplingState::Terminated; } - let subnets: Vec = Standard - .sample_iter(&mut self.rng) - .take(self.settings.num_samples as usize) - .collect(); + let subnets: Vec = (0..self.settings.num_subnets as SubnetworkId) + .choose_multiple(&mut self.rng, self.settings.num_samples.into()); + let ctx: SamplingContext = SamplingContext { subnets: HashSet::new(), started: Instant::now(), @@ -149,9 +148,10 @@ mod test { use kzgrs_backend::common::{blob::DaBlob, Column}; use nomos_core::da::BlobId; - fn create_sampler(subnet_num: usize) -> KzgrsSamplingBackend { + fn create_sampler(num_samples: usize, num_subnets: usize) -> KzgrsSamplingBackend { let settings = KzgrsSamplingBackendSettings { - num_samples: subnet_num as u16, + num_samples: num_samples as u16, + num_subnets: num_subnets as u16, old_blobs_check_interval: Duration::from_millis(20), blobs_validity_duration: Duration::from_millis(10), }; @@ -159,13 +159,46 @@ mod test { KzgrsSamplingBackend::new(settings, rng) } + #[tokio::test] + async fn test_init_sampling_subnet_range() { + let number_of_subnets = 42; + let num_samples = 50; // Testing with more samples than subnets to check the limit + let mut backend = create_sampler(num_samples, number_of_subnets); + + let blob_id = BlobId::default(); + let state = backend.init_sampling(blob_id).await; + + if let SamplingState::Init(subnets) = state { + let unique_subnet_ids: HashSet<_> = subnets.iter().cloned().collect(); + + assert_eq!( + unique_subnet_ids.len(), + subnets.len(), + "Subnet IDs are not unique" + ); + + assert_eq!( + subnets.len(), + number_of_subnets.min(num_samples), + "Incorrect number of subnet IDs selected" + ); + + for subnet_id in subnets { + assert!( + (subnet_id as usize) < number_of_subnets, + "Subnet ID is out of range" + ); + } + } + } + #[tokio::test] async fn test_sampler() { // fictitious number of subnets let subnet_num: usize = 42; // create a sampler instance - let sampler = &mut create_sampler(subnet_num); + let sampler = &mut create_sampler(subnet_num, 42); // create some blobs and blob_ids let b1: BlobId = sampler.rng.gen(); @@ -309,7 +342,7 @@ mod test { #[tokio::test] async fn test_pruning() { - let mut sampler = create_sampler(42); + let mut sampler = create_sampler(42, 42); // create some sampling contexes // first set will go through as in time diff --git a/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs b/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs index 302c2188..fc2533b0 100644 --- a/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs +++ b/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs @@ -17,15 +17,8 @@ use nomos_da_network_service::{DaNetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; use overwatch_rs::DynError; -use serde::{Deserialize, Serialize}; use subnetworks_assignations::MembershipHandler; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DaNetworkSamplingSettings { - pub num_samples: u16, - pub subnet_size: SubnetworkId, -} - pub struct Libp2pAdapter where Membership: MembershipHandler @@ -51,7 +44,7 @@ where + 'static, { type Backend = DaNetworkValidatorBackend; - type Settings = DaNetworkSamplingSettings; + type Settings = (); async fn new( network_relay: OutboundRelay< as ServiceData>::Message>, diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index 02a27c93..7a0dc492 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -25,6 +25,7 @@ nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb nomos-log = { path = "../../log" } nomos-network = { path = "../../network", features = ["mock"] } nomos-libp2p = { path = "../../../nomos-libp2p" } +libp2p = { version = "0.53.2", features = ["ed25519"] } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } rand = "0.8" @@ -40,6 +41,6 @@ time = "0.3" blake2 = { version = "0.10" } [features] -default = [] +default = ["libp2p"] libp2p = [] mixnet = [] diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 845a954c..526fd245 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -1,36 +1,80 @@ +// std +use std::path::PathBuf; +use std::time::Duration; +// crates use bytes::Bytes; -use full_replication::BlobInfo; +use cl::InputWitness; +use cryptarchia_consensus::TimeConfig; +use cryptarchia_ledger::LedgerState; use kzgrs_backend::common::blob::DaBlob; +use kzgrs_backend::dispersal::BlobInfo; +use kzgrs_backend::encoder::DaEncoder; +use kzgrs_backend::encoder::DaEncoderParams; +use libp2p::identity::{ + ed25519::{self, Keypair as Ed25519Keypair}, + Keypair, PeerId, +}; use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transaction}; +pub use nomos_core::{ + da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx, +}; use nomos_da_indexer::consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter; +use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageSettings; use nomos_da_indexer::DataIndexerService; +use nomos_da_indexer::IndexerSettings; +use nomos_da_network_service::backends::libp2p::validator::{ + DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings, +}; +use nomos_da_network_service::NetworkConfig as DaNetworkConfig; +use nomos_da_network_service::NetworkService as DaNetworkService; +use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter; +use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageSettings; use nomos_da_sampling::DaSamplingService; +use nomos_da_sampling::DaSamplingServiceSettings; use nomos_da_sampling::{ backend::kzgrs::KzgrsSamplingBackend, network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter, }; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier; +use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter; +use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageSettings; use nomos_da_verifier::DaVerifierService; +use nomos_da_verifier::DaVerifierServiceSettings; use nomos_libp2p::{Multiaddr, Swarm, SwarmConfig}; -use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter; -use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService}; -use nomos_storage::backends::rocksdb::RocksBackend; -use rand_chacha::ChaCha20Rng; -use subnetworks_assignations::versions::v1::FillFromNodeList; - -pub use nomos_core::{ - da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx, -}; use nomos_mempool::da::service::DaMempoolService; +use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter; +use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; +use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService}; +use nomos_mempool::{DaMempoolSettings, TxMempoolSettings}; +use nomos_network::backends::libp2p::{Libp2p as NetworkBackend, Libp2pConfig}; +use nomos_network::NetworkConfig; +use nomos_network::NetworkService; use nomos_node::{Tx, Wire}; +use nomos_storage::backends::rocksdb::RocksBackend; +use nomos_storage::StorageService; +use overwatch_derive::*; +use overwatch_rs::overwatch::{Overwatch, OverwatchRunner}; +use overwatch_rs::services::handle::ServiceHandle; +use rand::{Rng, RngCore}; +use subnetworks_assignations::versions::v1::FillFromNodeList; +// internal +use crate::rng::TestRng; + +type IntegrationRng = TestRng; /// Membership used by the DA Network service. pub type NomosDaMembership = FillFromNodeList; +pub const PARAMS: DaEncoderParams = DaEncoderParams::default_with(2); +pub const ENCODER: DaEncoder = DaEncoder::new(PARAMS); + +pub const SK1: &str = "aca2c52f5928a53de79679daf390b0903eeccd9671b4350d49948d84334874806afe68536da9e076205a2af0af350e6c50851a040e3057b6544a29f5689ccd31"; +pub const SK2: &str = "f9dc26eea8bc56d9a4c59841b438665b998ce5e42f49f832df5b770a725c2daafee53b33539127321f6f5085e42902bd380e82d18a7aff6404e632b842106785"; + pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, MockPool::Hash>, @@ -40,24 +84,24 @@ pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< FillSizeWithTx, FillSizeWithBlobs, RocksBackend, - KzgrsSamplingBackend, + KzgrsSamplingBackend, SamplingLibp2pAdapter, - ChaCha20Rng, + IntegrationRng, SamplingStorageAdapter, >; pub type DaSampling = DaSamplingService< - KzgrsSamplingBackend, + KzgrsSamplingBackend, SamplingLibp2pAdapter, - ChaCha20Rng, + IntegrationRng, SamplingStorageAdapter, >; pub(crate) type DaIndexer = DataIndexerService< // Indexer specific. Bytes, - IndexerStorageAdapter, - CryptarchiaConsensusAdapter, + IndexerStorageAdapter, + CryptarchiaConsensusAdapter, // Cryptarchia specific, should be the same as in `Cryptarchia` type above. cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, MockPool::Hash>, @@ -67,9 +111,9 @@ pub(crate) type DaIndexer = DataIndexerService< FillSizeWithTx, FillSizeWithBlobs, RocksBackend, - KzgrsSamplingBackend, + KzgrsSamplingBackend, SamplingLibp2pAdapter, - ChaCha20Rng, + IntegrationRng, SamplingStorageAdapter, >; @@ -81,9 +125,9 @@ pub(crate) type TxMempool = TxMempoolService< pub type DaMempool = DaMempoolService< MempoolNetworkAdapter::BlobId>, MockPool::BlobId>, - KzgrsSamplingBackend, + KzgrsSamplingBackend, nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, - ChaCha20Rng, + IntegrationRng, SamplingStorageAdapter, >; @@ -95,6 +139,190 @@ pub(crate) type DaVerifier = DaVerifierService< pub(crate) const MB16: usize = 1024 * 1024 * 16; +#[derive(Services)] +pub struct TestNode { + //logging: ServiceHandle, + network: ServiceHandle>, + cl_mempool: ServiceHandle, + da_network: ServiceHandle>>, + da_mempool: ServiceHandle, + storage: ServiceHandle>>, + cryptarchia: ServiceHandle, + indexer: ServiceHandle, + verifier: ServiceHandle, + da_sampling: ServiceHandle, +} + +pub struct TestDaNetworkSettings { + pub peer_addresses: Vec<(PeerId, Multiaddr)>, + pub listening_address: Multiaddr, + pub num_subnets: u16, + pub num_samples: u16, + pub nodes_per_subnet: u16, + pub node_key: ed25519::SecretKey, +} + +pub fn new_node( + note: &InputWitness, + ledger_config: &cryptarchia_ledger::Config, + genesis_state: &LedgerState, + time_config: &TimeConfig, + swarm_config: &SwarmConfig, + db_path: PathBuf, + blobs_dir: &PathBuf, + initial_peers: Vec, + verifier_settings: KzgrsDaVerifierSettings, + da_network_settings: TestDaNetworkSettings, +) -> Overwatch { + OverwatchRunner::::run( + TestNodeServiceSettings { + //logging: Default::default(), + network: NetworkConfig { + backend: Libp2pConfig { + inner: swarm_config.clone(), + initial_peers, + }, + }, + da_network: DaNetworkConfig { + backend: DaNetworkValidatorBackendSettings { + node_key: da_network_settings.node_key, + membership: FillFromNodeList::new( + &da_network_settings + .peer_addresses + .iter() + .map(|(peer_id, _)| peer_id.clone()) + .collect::>(), + da_network_settings.num_subnets.into(), + da_network_settings.nodes_per_subnet.into(), + ), + addresses: da_network_settings.peer_addresses, + listening_address: da_network_settings.listening_address, + }, + }, + cl_mempool: TxMempoolSettings { + backend: (), + network: AdapterSettings { + topic: String::from(nomos_node::CL_TOPIC), + id: ::hash, + }, + registry: None, + }, + da_mempool: DaMempoolSettings { + backend: (), + network: AdapterSettings { + topic: String::from(nomos_node::DA_TOPIC), + id: ::blob_id, + }, + registry: None, + }, + storage: nomos_storage::backends::rocksdb::RocksBackendSettings { + db_path, + read_only: false, + column_family: Some("blocks".into()), + }, + indexer: IndexerSettings { + storage: IndexerStorageSettings { + blob_storage_directory: blobs_dir.clone(), + }, + }, + cryptarchia: cryptarchia_consensus::CryptarchiaSettings { + transaction_selector_settings: (), + blob_selector_settings: (), + config: ledger_config.clone(), + genesis_state: genesis_state.clone(), + time: time_config.clone(), + notes: vec![note.clone()], + }, + verifier: DaVerifierServiceSettings { + verifier_settings, + network_adapter_settings: (), + storage_adapter_settings: VerifierStorageSettings { + blob_storage_directory: blobs_dir.clone(), + }, + }, + da_sampling: DaSamplingServiceSettings { + // TODO: setup this properly! + sampling_settings: KzgrsSamplingBackendSettings { + num_samples: da_network_settings.num_samples, + num_subnets: da_network_settings.num_subnets, + // Sampling service period can't be zero. + old_blobs_check_interval: Duration::from_secs(1), + blobs_validity_duration: Duration::from_secs(15), + }, + network_adapter_settings: (), + storage_adapter_settings: SamplingStorageSettings { + blob_storage_directory: blobs_dir.clone(), + }, + }, + }, + None, + ) + .map_err(|e| eprintln!("Error encountered: {}", e)) + .unwrap() +} + +// Client node is only created for asyncroniously interact with nodes in the test. +// The services defined in it are not used. +#[derive(Services)] +pub struct TestClient { + storage: ServiceHandle>>, +} + +// Client node is just an empty overwatch service to spawn a task that could communicate with other +// nodes and manage the data availability cycle during tests. +pub fn new_client(db_path: PathBuf) -> Overwatch { + OverwatchRunner::::run( + TestClientServiceSettings { + storage: nomos_storage::backends::rocksdb::RocksBackendSettings { + db_path, + read_only: false, + column_family: None, + }, + }, + None, + ) + .map_err(|e| eprintln!("Error encountered: {}", e)) + .unwrap() +} + pub fn node_address(config: &SwarmConfig) -> Multiaddr { Swarm::multiaddr(std::net::Ipv4Addr::new(127, 0, 0, 1), config.port) } + +pub fn generate_blst_hex_keys() -> (String, String) { + let mut rng = rand::thread_rng(); + let sk_bytes: [u8; 32] = rng.gen(); + let sk = blst::min_sig::SecretKey::key_gen(&sk_bytes, &[]).unwrap(); + + let pk = sk.sk_to_pk(); + (hex::encode(sk.to_bytes()), hex::encode(pk.to_bytes())) +} + +pub fn create_ed25519_sk_peerid(key: &str) -> (ed25519::SecretKey, PeerId) { + let mut b = hex::decode(key).unwrap(); + let ed25519_keypair = Ed25519Keypair::try_from_bytes(&mut b).unwrap(); + let kp = ed25519_keypair.to_bytes(); + println!("sk > {}", hex::encode(kp)); + let secret_key = ed25519_keypair.secret().clone(); + let libp2p_keypair: Keypair = ed25519_keypair.into(); + let peer_id = PeerId::from_public_key(&libp2p_keypair.public()); + + (secret_key, peer_id) +} + +pub fn generate_ed25519_sk_peerid() -> (ed25519::SecretKey, PeerId) { + let ed25519_keypair = Ed25519Keypair::generate(); + let kp = ed25519_keypair.to_bytes(); + println!("sk > {}", hex::encode(kp)); + let secret_key = ed25519_keypair.secret().clone(); + let libp2p_keypair: Keypair = ed25519_keypair.into(); + let peer_id = PeerId::from_public_key(&libp2p_keypair.public()); + + (secret_key, peer_id) +} + +pub fn rand_data(elements_count: usize) -> Vec { + let mut buff = vec![0; elements_count * DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE]; + rand::thread_rng().fill_bytes(&mut buff); + buff +} diff --git a/nomos-services/data-availability/tests/src/indexer_integration.rs b/nomos-services/data-availability/tests/src/indexer_integration.rs index 98df47d6..daac5447 100644 --- a/nomos-services/data-availability/tests/src/indexer_integration.rs +++ b/nomos-services/data-availability/tests/src/indexer_integration.rs @@ -1,134 +1,46 @@ // std -use std::hash::{DefaultHasher, Hash}; -use std::path::PathBuf; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::Arc; -use std::time::Duration; +use std::{ + str::FromStr, + sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }, + time::Duration, +}; // crates use bytes::Bytes; use cl::{InputWitness, NoteWitness, NullifierSecret}; -use cryptarchia_consensus::ConsensusMsg; -use cryptarchia_consensus::TimeConfig; +use cryptarchia_consensus::{ConsensusMsg, TimeConfig}; use cryptarchia_ledger::LedgerState; -use full_replication::{BlobInfo, Metadata}; -use nomos_core::da::blob::info::DispersedBlobInfo; -use nomos_core::da::blob::metadata::Metadata as _; -use nomos_core::{staking::NMO_UNIT, tx::Transaction}; -use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings; -use nomos_da_indexer::IndexerSettings; -use nomos_da_network_service::backends::libp2p::validator::{ - DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings, +use kzgrs_backend::{ + common::blob::DaBlob, + dispersal::{BlobInfo, Metadata}, }; -use nomos_da_network_service::NetworkConfig as DaNetworkConfig; -use nomos_da_network_service::NetworkService as DaNetworkService; -use nomos_da_storage::fs::write_blob; +use nomos_core::da::blob::Blob; +use nomos_core::da::DaEncoder as _; +use nomos_core::{da::blob::metadata::Metadata as _, staking::NMO_UNIT}; use nomos_da_storage::rocksdb::DA_VERIFIED_KEY_PREFIX; -use nomos_libp2p::{ed25519, identity, PeerId}; +use nomos_da_storage::{fs::write_blob, rocksdb::key_bytes}; +use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; use nomos_libp2p::{Multiaddr, SwarmConfig}; -use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; -use nomos_mempool::{DaMempoolSettings, TxMempoolSettings}; -use nomos_network::backends::libp2p::{Libp2p as NetworkBackend, Libp2pConfig}; -use nomos_network::{NetworkConfig, NetworkService}; -use nomos_node::{Tx, Wire}; -use nomos_storage::{backends::rocksdb::RocksBackend, StorageService}; -use overwatch_derive::*; -use overwatch_rs::overwatch::{Overwatch, OverwatchRunner}; -use overwatch_rs::services::handle::ServiceHandle; +use nomos_node::Wire; +use nomos_storage::{ + backends::{rocksdb::RocksBackend, StorageSerde}, + StorageService, +}; use rand::{thread_rng, Rng}; -use subnetworks_assignations::versions::v1::FillFromNodeList; use tempfile::{NamedTempFile, TempDir}; use time::OffsetDateTime; -use tokio_stream::{wrappers::BroadcastStream, StreamExt}; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::StreamExt; // internal use crate::common::*; -#[derive(Services)] -struct IndexerNode { - network: ServiceHandle>, - cl_mempool: ServiceHandle, - da_network: ServiceHandle>>, - da_mempool: ServiceHandle, - storage: ServiceHandle>>, - cryptarchia: ServiceHandle, - indexer: ServiceHandle, -} - -fn new_node( - note: &InputWitness, - ledger_config: &cryptarchia_ledger::Config, - genesis_state: &LedgerState, - time_config: &TimeConfig, - swarm_config: &SwarmConfig, - db_path: PathBuf, - blobs_dir: &PathBuf, - initial_peers: Vec, -) -> Overwatch { - OverwatchRunner::::run( - IndexerNodeServiceSettings { - network: NetworkConfig { - backend: Libp2pConfig { - inner: swarm_config.clone(), - initial_peers, - }, - }, - da_network: DaNetworkConfig { - backend: DaNetworkValidatorBackendSettings { - node_key: ed25519::SecretKey::generate(), - membership: FillFromNodeList::new( - &[PeerId::from(identity::Keypair::generate_ed25519().public())], - 2, - 1, - ), - addresses: Default::default(), - listening_address: "/ip4/127.0.0.1/udp/0/quic-v1".parse::().unwrap(), - }, - }, - cl_mempool: TxMempoolSettings { - backend: (), - network: AdapterSettings { - topic: String::from(nomos_node::CL_TOPIC), - id: ::hash, - }, - registry: None, - }, - da_mempool: DaMempoolSettings { - backend: (), - network: AdapterSettings { - topic: String::from(nomos_node::DA_TOPIC), - id: ::blob_id, - }, - registry: None, - }, - storage: nomos_storage::backends::rocksdb::RocksBackendSettings { - db_path, - read_only: false, - column_family: Some("blocks".into()), - }, - indexer: IndexerSettings { - storage: RocksAdapterSettings { - blob_storage_directory: blobs_dir.clone(), - }, - }, - cryptarchia: cryptarchia_consensus::CryptarchiaSettings { - transaction_selector_settings: (), - blob_selector_settings: (), - config: ledger_config.clone(), - genesis_state: genesis_state.clone(), - time: time_config.clone(), - notes: vec![note.clone()], - }, - }, - None, - ) - .map_err(|e| eprintln!("Error encountered: {}", e)) - .unwrap() -} +const INDEXER_TEST_MAX_SECONDS: u64 = 60; // TODO: When verifier is implemented this test should be removed and a new one // performed in integration tests crate using the real node. -#[ignore = "Membership needs to be configured correctly"] #[test] fn test_indexer() { let performed_tx = Arc::new(AtomicBool::new(false)); @@ -181,6 +93,21 @@ fn test_indexer() { let blobs_dir = TempDir::new().unwrap().path().to_path_buf(); + let (node1_sk, node1_pk) = generate_blst_hex_keys(); + let (node2_sk, node2_pk) = generate_blst_hex_keys(); + + let (peer_sk_1, peer_id_1) = create_ed25519_sk_peerid(SK1); + let (peer_sk_2, peer_id_2) = create_ed25519_sk_peerid(SK2); + + let addr_1 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8780/quic-v1").unwrap(); + let addr_2 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8781/quic-v1").unwrap(); + + let peer_addresses = vec![(peer_id_1, addr_1.clone()), (peer_id_2, addr_2.clone())]; + + let num_samples = 1; + let num_subnets = 2; + let nodes_per_subnet = 2; + let node1 = new_node( ¬es[0], &ledger_config, @@ -190,9 +117,21 @@ fn test_indexer() { NamedTempFile::new().unwrap().path().to_path_buf(), &blobs_dir, vec![node_address(&swarm_config2)], + KzgrsDaVerifierSettings { + sk: node1_sk.clone(), + nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()], + }, + TestDaNetworkSettings { + peer_addresses: peer_addresses.clone(), + listening_address: addr_1, + num_subnets, + num_samples, + nodes_per_subnet, + node_key: peer_sk_1, + }, ); - let _node2 = new_node( + let node2 = new_node( ¬es[1], &ledger_config, &genesis_state, @@ -201,19 +140,78 @@ fn test_indexer() { NamedTempFile::new().unwrap().path().to_path_buf(), &blobs_dir, vec![node_address(&swarm_config1)], + KzgrsDaVerifierSettings { + sk: node2_sk.clone(), + nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()], + }, + TestDaNetworkSettings { + peer_addresses, + listening_address: addr_2, + num_subnets, + num_samples, + nodes_per_subnet, + node_key: peer_sk_2, + }, ); - let mempool = node1.handle().relay::(); - let storage = node1.handle().relay::>>(); - let indexer = node1.handle().relay::(); - let consensus = node1.handle().relay::(); + // Node1 relays. + let node1_mempool = node1.handle().relay::(); + let node1_storage = node1.handle().relay::>>(); + let node1_indexer = node1.handle().relay::(); + let node1_consensus = node1.handle().relay::(); + + // Node2 relays. + let node2_storage = node2.handle().relay::>>(); + + let encoder = &ENCODER; + let data = rand_data(10); + + let encoded_data = encoder.encode(&data).unwrap(); + let columns: Vec<_> = encoded_data.extended_data.columns().collect(); + + // Mock attestation step where blob is persisted in nodes blob storage. + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut blobs = vec![]; + for (i, column) in columns.iter().enumerate() { + let da_blob = DaBlob { + column: column.clone(), + column_idx: i + .try_into() + .expect("Column index shouldn't overflow the target type"), + column_commitment: encoded_data.column_commitments[i], + aggregated_column_commitment: encoded_data.aggregated_column_commitment, + aggregated_column_proof: encoded_data.aggregated_column_proofs[i], + rows_commitments: encoded_data.row_commitments.clone(), + rows_proofs: encoded_data + .rows_proofs + .iter() + .map(|proofs| proofs.get(i).cloned().unwrap()) + .collect(), + }; + + rt.block_on(write_blob( + blobs_dir.clone(), + da_blob.id().as_ref(), + da_blob.column_idx().as_ref(), + &Wire::serialize(da_blob.clone()), + )) + .unwrap(); + + blobs.push(da_blob); + } + + // Test generate hash for Certificate with default Hasher + // let mut default_hasher = DefaultHasher::new(); + // let _hash3 = ::hash(&blob_info, &mut default_hasher); - let blob_hash = [9u8; 32]; let app_id = [7u8; 32]; let index = 0.into(); let range = 0.into()..1.into(); // get idx 0 and 1. let meta = Metadata::new(app_id, index); + + let blob_hash = ::id(&blobs[0]); let blob_info = BlobInfo::new(blob_hash, meta); // Test get Metadata for Certificate @@ -222,28 +220,29 @@ fn test_indexer() { assert_eq!(app_id2, app_id); assert_eq!(index2, index); - // Test generate hash for Certificate with default Hasher - let mut default_hasher = DefaultHasher::new(); - let _hash3 = ::hash(&blob_info, &mut default_hasher); - let expected_blob_info = blob_info.clone(); - let col_idx = (0 as u16).to_be_bytes(); + let blob_0_bytes = Wire::serialize(blobs[0].clone()); - // Mock attestation step where blob is persisted in nodes blob storage. - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(write_blob( - blobs_dir, - blob_info.blob_id().as_ref(), - &col_idx, - b"blob", - )) - .unwrap(); + node2.spawn(async move { + let storage_outbound = node2_storage.connect().await.unwrap(); + + // Mock attested blob by writting directly into the da storage. + let attested_key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_hash); + + storage_outbound + .send(nomos_storage::StorageMsg::Store { + key: attested_key.into(), + value: Bytes::new(), + }) + .await + .unwrap(); + }); node1.spawn(async move { - let mempool_outbound = mempool.connect().await.unwrap(); - let storage_outbound = storage.connect().await.unwrap(); - let indexer_outbound = indexer.connect().await.unwrap(); - let consensus_outbound = consensus.connect().await.unwrap(); + let mempool_outbound = node1_mempool.connect().await.unwrap(); + let storage_outbound = node1_storage.connect().await.unwrap(); + let indexer_outbound = node1_indexer.connect().await.unwrap(); + let consensus_outbound = node1_consensus.connect().await.unwrap(); let (sender, receiver) = tokio::sync::oneshot::channel(); consensus_outbound @@ -258,9 +257,7 @@ fn test_indexer() { }); // Mock attested blob by writting directly into the da storage. - let mut attested_key = Vec::from(DA_VERIFIED_KEY_PREFIX.as_bytes()); - attested_key.extend_from_slice(&blob_hash); - attested_key.extend_from_slice(&col_idx); + let attested_key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_hash); storage_outbound .send(nomos_storage::StorageMsg::Store { @@ -283,7 +280,7 @@ fn test_indexer() { let _ = mempool_rx.await.unwrap(); // Wait for block in the network. - let timeout = tokio::time::sleep(Duration::from_secs(10)); + let timeout = tokio::time::sleep(Duration::from_secs(INDEXER_TEST_MAX_SECONDS)); tokio::pin!(timeout); loop { @@ -300,7 +297,7 @@ fn test_indexer() { } // Give time for services to process and store data. - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(5)).await; // Request range of blobs from indexer. let (indexer_tx, indexer_rx) = tokio::sync::oneshot::channel(); @@ -322,7 +319,8 @@ fn test_indexer() { // When Indexer is asked for app_id at index, it will return all blobs that it has for that // blob_id. let columns = app_id_blobs[0]; - if !columns.is_empty() && *columns[0] == *b"blob" && app_id_blobs[1].is_empty() { + if !columns.is_empty() && columns[0] == blob_0_bytes.as_ref() && app_id_blobs[1].is_empty() + { is_success_tx.store(true, SeqCst); } diff --git a/nomos-services/data-availability/tests/src/lib.rs b/nomos-services/data-availability/tests/src/lib.rs index 03302be9..89bd1908 100644 --- a/nomos-services/data-availability/tests/src/lib.rs +++ b/nomos-services/data-availability/tests/src/lib.rs @@ -1,9 +1,23 @@ // Networking is not essential for verifier and indexer tests. // Libp2p network is chosen for consensus requirement, mixnet is ignored. +// +// Note: To enable rust-analyzer in modules, comment out the +// `#[cfg(not(feature = "mixnet"))]` lines (reenable when pushing). -#[cfg(all(test, feature = "libp2p", not(feature = "mixnet")))] +#[cfg(test)] +#[cfg(feature = "libp2p")] +#[cfg(not(feature = "mixnet"))] mod common; -#[cfg(all(test, feature = "libp2p", not(feature = "mixnet")))] + +#[cfg(test)] +#[cfg(feature = "libp2p")] +#[cfg(not(feature = "mixnet"))] mod indexer_integration; -#[cfg(all(test, feature = "libp2p", not(feature = "mixnet")))] + +#[cfg(test)] +#[cfg(feature = "libp2p")] +#[cfg(not(feature = "mixnet"))] mod verifier_integration; + +#[cfg(test)] +mod rng; diff --git a/nomos-services/data-availability/tests/src/rng.rs b/nomos-services/data-availability/tests/src/rng.rs new file mode 100644 index 00000000..28af9a1c --- /dev/null +++ b/nomos-services/data-availability/tests/src/rng.rs @@ -0,0 +1,35 @@ +use rand::{rngs::mock::StepRng, RngCore, SeedableRng}; + +pub struct TestRng(StepRng); + +/// Implement RngCore for TestRng +impl RngCore for TestRng { + fn next_u32(&mut self) -> u32 { + self.0.next_u32() + } + + fn next_u64(&mut self) -> u64 { + self.0.next_u64() + } + + fn fill_bytes(&mut self, dest: &mut [u8]) { + self.0.fill_bytes(dest) + } + + fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> { + self.0.try_fill_bytes(dest) + } +} + +impl SeedableRng for TestRng { + type Seed = [u8; 8]; + + fn from_seed(seed: Self::Seed) -> Self { + let seed_as_u64 = u64::from_le_bytes(seed); + TestRng(StepRng::new(seed_as_u64, 1)) + } + + fn seed_from_u64(seed: u64) -> Self { + TestRng(StepRng::new(seed, 1)) + } +} diff --git a/nomos-services/data-availability/tests/src/verifier_integration.rs b/nomos-services/data-availability/tests/src/verifier_integration.rs index 2e513f0d..668748ab 100644 --- a/nomos-services/data-availability/tests/src/verifier_integration.rs +++ b/nomos-services/data-availability/tests/src/verifier_integration.rs @@ -1,200 +1,27 @@ // std -use std::path::PathBuf; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering::SeqCst; -use std::sync::Arc; -use std::time::Duration; +use std::{ + str::FromStr, + sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }, + time::Duration, +}; // crates use cl::{InputWitness, NoteWitness, NullifierSecret}; use cryptarchia_consensus::TimeConfig; use cryptarchia_ledger::LedgerState; -use full_replication::BlobInfo; use kzgrs_backend::common::blob::DaBlob; -use kzgrs_backend::encoder::{DaEncoder, DaEncoderParams}; -use nomos_core::da::{blob::info::DispersedBlobInfo, DaEncoder as _}; -use nomos_core::{staking::NMO_UNIT, tx::Transaction}; -use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageSettings; -use nomos_da_indexer::IndexerSettings; -use nomos_da_network_service::backends::libp2p::validator::{ - DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings, -}; -use nomos_da_network_service::NetworkConfig as DaNetworkConfig; -use nomos_da_network_service::NetworkService as DaNetworkService; -use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; -use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings; -use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageSettings; -use nomos_da_sampling::DaSamplingServiceSettings; +use nomos_core::{da::DaEncoder as _, staking::NMO_UNIT}; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; -use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageSettings; -use nomos_da_verifier::DaVerifierServiceSettings; -use nomos_libp2p::{ed25519, identity, PeerId}; -use nomos_libp2p::{Multiaddr, SwarmConfig}; -use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; -use nomos_mempool::{DaMempoolSettings, TxMempoolSettings}; -use nomos_network::backends::libp2p::{Libp2p as NetworkBackend, Libp2pConfig}; -use nomos_network::{NetworkConfig, NetworkService}; -use nomos_node::{Tx, Wire}; -use nomos_storage::{backends::rocksdb::RocksBackend, StorageService}; -use overwatch_derive::*; -use overwatch_rs::overwatch::{Overwatch, OverwatchRunner}; -use overwatch_rs::services::handle::ServiceHandle; -use rand::{thread_rng, Rng, RngCore}; -use subnetworks_assignations::versions::v1::FillFromNodeList; +use nomos_libp2p::Multiaddr; +use nomos_libp2p::SwarmConfig; +use rand::{thread_rng, Rng}; use tempfile::{NamedTempFile, TempDir}; use time::OffsetDateTime; // internal use crate::common::*; -// Client node is only created for asyncroniously interact with nodes in the test. -// The services defined in it are not used. -#[derive(Services)] -struct ClientNode { - storage: ServiceHandle>>, -} - -#[derive(Services)] -struct VerifierNode { - network: ServiceHandle>, - da_network: ServiceHandle>>, - cl_mempool: ServiceHandle, - da_mempool: ServiceHandle, - storage: ServiceHandle>>, - cryptarchia: ServiceHandle, - indexer: ServiceHandle, - verifier: ServiceHandle, - da_sampling: ServiceHandle, -} - -// Client node is just an empty overwatch service to spawn a task that could communicate with other -// nodes and manage the data availability cycle during tests. -fn new_client(db_path: PathBuf) -> Overwatch { - OverwatchRunner::::run( - ClientNodeServiceSettings { - storage: nomos_storage::backends::rocksdb::RocksBackendSettings { - db_path, - read_only: false, - column_family: None, - }, - }, - None, - ) - .map_err(|e| eprintln!("Error encountered: {}", e)) - .unwrap() -} - -fn new_node( - note: &InputWitness, - ledger_config: &cryptarchia_ledger::Config, - genesis_state: &LedgerState, - time_config: &TimeConfig, - swarm_config: &SwarmConfig, - db_path: PathBuf, - blobs_dir: &PathBuf, - initial_peers: Vec, - verifier_settings: KzgrsDaVerifierSettings, -) -> Overwatch { - OverwatchRunner::::run( - VerifierNodeServiceSettings { - network: NetworkConfig { - backend: Libp2pConfig { - inner: swarm_config.clone(), - initial_peers, - }, - }, - da_network: DaNetworkConfig { - backend: DaNetworkValidatorBackendSettings { - node_key: ed25519::SecretKey::generate(), - membership: FillFromNodeList::new( - &[PeerId::from(identity::Keypair::generate_ed25519().public())], - 2, - 1, - ), - addresses: Default::default(), - listening_address: "/ip4/127.0.0.1/udp/0/quic-v1".parse::().unwrap(), - }, - }, - cl_mempool: TxMempoolSettings { - backend: (), - network: AdapterSettings { - topic: String::from(nomos_node::CL_TOPIC), - id: ::hash, - }, - registry: None, - }, - da_mempool: DaMempoolSettings { - backend: (), - network: AdapterSettings { - topic: String::from(nomos_node::DA_TOPIC), - id: ::blob_id, - }, - registry: None, - }, - storage: nomos_storage::backends::rocksdb::RocksBackendSettings { - db_path, - read_only: false, - column_family: Some("blocks".into()), - }, - indexer: IndexerSettings { - storage: IndexerStorageSettings { - blob_storage_directory: blobs_dir.clone(), - }, - }, - cryptarchia: cryptarchia_consensus::CryptarchiaSettings { - transaction_selector_settings: (), - blob_selector_settings: (), - config: ledger_config.clone(), - genesis_state: genesis_state.clone(), - time: time_config.clone(), - notes: vec![note.clone()], - }, - verifier: DaVerifierServiceSettings { - verifier_settings, - network_adapter_settings: (), - storage_adapter_settings: VerifierStorageSettings { - blob_storage_directory: blobs_dir.clone(), - }, - }, - da_sampling: DaSamplingServiceSettings { - // TODO: setup this properly! - sampling_settings: KzgrsSamplingBackendSettings { - num_samples: 0, - // Sampling service period can't be zero. - old_blobs_check_interval: Duration::from_secs(1), - blobs_validity_duration: Duration::from_secs(1), - }, - network_adapter_settings: DaNetworkSamplingSettings { - num_samples: 0, - subnet_size: 0, - }, - storage_adapter_settings: SamplingStorageSettings { - blob_storage_directory: blobs_dir.clone(), - }, - }, - }, - None, - ) - .map_err(|e| eprintln!("Error encountered: {}", e)) - .unwrap() -} - -fn generate_hex_keys() -> (String, String) { - let mut rng = rand::thread_rng(); - let sk_bytes: [u8; 32] = rng.gen(); - let sk = blst::min_sig::SecretKey::key_gen(&sk_bytes, &[]).unwrap(); - - let pk = sk.sk_to_pk(); - (hex::encode(sk.to_bytes()), hex::encode(pk.to_bytes())) -} - -pub fn rand_data(elements_count: usize) -> Vec { - let mut buff = vec![0; elements_count * DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE]; - rand::thread_rng().fill_bytes(&mut buff); - buff -} - -pub const PARAMS: DaEncoderParams = DaEncoderParams::default_with(2); -pub const ENCODER: DaEncoder = DaEncoder::new(PARAMS); - #[test] fn test_verifier() { let performed_tx = Arc::new(AtomicBool::new(false)); @@ -247,11 +74,23 @@ fn test_verifier() { let blobs_dir = TempDir::new().unwrap().path().to_path_buf(); - let (node1_sk, node1_pk) = generate_hex_keys(); - let (node2_sk, node2_pk) = generate_hex_keys(); + let (node1_sk, node1_pk) = generate_blst_hex_keys(); + let (node2_sk, node2_pk) = generate_blst_hex_keys(); let client_zone = new_client(NamedTempFile::new().unwrap().path().to_path_buf()); + let (peer_sk_1, peer_id_1) = generate_ed25519_sk_peerid(); + let (peer_sk_2, peer_id_2) = generate_ed25519_sk_peerid(); + + let addr_1 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8880/quic-v1").unwrap(); + let addr_2 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8881/quic-v1").unwrap(); + + let peer_addresses = vec![(peer_id_1, addr_1.clone()), (peer_id_2, addr_2.clone())]; + + let num_samples = 1; + let num_subnets = 2; + let nodes_per_subnet = 1; + let node1 = new_node( ¬es[0], &ledger_config, @@ -265,6 +104,14 @@ fn test_verifier() { sk: node1_sk.clone(), nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()], }, + TestDaNetworkSettings { + peer_addresses: peer_addresses.clone(), + listening_address: addr_1, + num_subnets, + num_samples, + nodes_per_subnet, + node_key: peer_sk_1, + }, ); let node2 = new_node( @@ -280,6 +127,14 @@ fn test_verifier() { sk: node2_sk, nodes_public_keys: vec![node1_pk, node2_pk], }, + TestDaNetworkSettings { + peer_addresses, + listening_address: addr_2, + num_subnets, + num_samples, + nodes_per_subnet, + node_key: peer_sk_2, + }, ); let node1_verifier = node1.handle().relay::(); diff --git a/tests/src/lib.rs b/tests/src/lib.rs index 708da679..d1e88004 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -2,15 +2,15 @@ pub mod nodes; pub use nodes::NomosNode; use once_cell::sync::Lazy; -use std::env; // std +use std::env; use std::net::TcpListener; use std::ops::Mul; use std::time::Duration; use std::{fmt::Debug, sync::Mutex}; //crates -use nomos_libp2p::{Multiaddr, Swarm}; +use nomos_libp2p::{Multiaddr, PeerId, Swarm}; use nomos_node::Config; use rand::{thread_rng, Rng}; @@ -49,8 +49,8 @@ pub trait Node: Sized { } fn node_configs(config: SpawnConfig) -> Vec { match config { - SpawnConfig::Star { consensus } => { - let mut configs = Self::create_node_configs(consensus); + SpawnConfig::Star { consensus, da } => { + let mut configs = Self::create_node_configs(consensus, da); let next_leader_config = configs.remove(0); let first_node_addr = node_address(&next_leader_config); let mut node_configs = vec![next_leader_config]; @@ -64,8 +64,8 @@ pub trait Node: Sized { } node_configs } - SpawnConfig::Chain { consensus } => { - let mut configs = Self::create_node_configs(consensus); + SpawnConfig::Chain { consensus, da } => { + let mut configs = Self::create_node_configs(consensus, da); let next_leader_config = configs.remove(0); let mut prev_node_addr = node_address(&next_leader_config); let mut node_configs = vec![next_leader_config]; @@ -79,7 +79,7 @@ pub trait Node: Sized { } } } - fn create_node_configs(consensus: ConsensusConfig) -> Vec; + fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec; async fn consensus_info(&self) -> Self::ConsensusInfo; fn stop(&mut self); } @@ -87,14 +87,20 @@ pub trait Node: Sized { #[derive(Clone)] pub enum SpawnConfig { // Star topology: Every node is initially connected to a single node. - Star { consensus: ConsensusConfig }, + Star { + consensus: ConsensusConfig, + da: DaConfig, + }, // Chain topology: Every node is chained to the node next to it. - Chain { consensus: ConsensusConfig }, + Chain { + consensus: ConsensusConfig, + da: DaConfig, + }, } impl SpawnConfig { // Returns a SpawnConfig::Chain with proper configurations for happy-path tests - pub fn chain_happy(n_participants: usize) -> Self { + pub fn chain_happy(n_participants: usize, da: DaConfig) -> Self { Self::Chain { consensus: ConsensusConfig { n_participants, @@ -105,10 +111,11 @@ impl SpawnConfig { // a block should be produced (on average) every slot active_slot_coeff: 0.9, }, + da, } } - pub fn star_happy(n_participants: usize) -> Self { + pub fn star_happy(n_participants: usize, da: DaConfig) -> Self { Self::Star { consensus: ConsensusConfig { n_participants, @@ -119,6 +126,7 @@ impl SpawnConfig { // a block should be produced (on average) every slot active_slot_coeff: 0.9, }, + da, } } } @@ -136,3 +144,28 @@ pub struct ConsensusConfig { pub security_param: u32, pub active_slot_coeff: f64, } + +#[derive(Clone)] +pub struct DaConfig { + pub subnetwork_size: usize, + pub dispersal_factor: usize, + pub executor_peer_ids: Vec, + pub num_samples: u16, + pub num_subnets: u16, + pub old_blobs_check_interval: Duration, + pub blobs_validity_duration: Duration, +} + +impl Default for DaConfig { + fn default() -> Self { + Self { + subnetwork_size: 2, + dispersal_factor: 1, + executor_peer_ids: vec![], + num_samples: 1, + num_subnets: 2, + old_blobs_check_interval: Duration::from_secs(5), + blobs_validity_duration: Duration::from_secs(15), + } + } +} diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 39147525..d1224e8e 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -5,7 +5,7 @@ use std::str::FromStr; use std::time::Duration; // internal use super::{create_tempdir, persist_tempdir, LOGS_PREFIX}; -use crate::{adjust_timeout, get_available_port, ConsensusConfig, Node}; +use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node}; use blst::min_sig::SecretKey; use cl::{InputWitness, NoteWitness, NullifierSecret}; use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig}; @@ -27,7 +27,7 @@ use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as Sampl use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings; use nomos_da_verifier::DaVerifierServiceSettings; -use nomos_libp2p::{Multiaddr, SwarmConfig}; +use nomos_libp2p::{Multiaddr, PeerId, SwarmConfig}; use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_mempool::MempoolMetrics; #[cfg(feature = "mixnet")] @@ -36,11 +36,11 @@ use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::{api::AxumBackendSettings, Config, Tx}; // crates use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; -use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings; use nomos_da_sampling::DaSamplingServiceSettings; use once_cell::sync::Lazy; -use rand::{thread_rng, Rng, RngCore}; +use rand::{thread_rng, Rng}; use reqwest::{Client, Url}; +use subnetworks_assignations::versions::v1::FillFromNodeList; use tempfile::NamedTempFile; use time::OffsetDateTime; @@ -231,7 +231,7 @@ impl Node for NomosNode { /// so the leader can receive votes from all other nodes that will be subsequently spawned. /// If not, the leader will miss votes from nodes spawned before itself. /// This issue will be resolved by devising the block catch-up mechanism in the future. - fn create_node_configs(consensus: ConsensusConfig) -> Vec { + fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec { // we use the same random bytes for: // * da id // * coin sk @@ -289,6 +289,7 @@ impl Node for NomosNode { ledger_config.clone(), vec![coin], time_config.clone(), + da.clone(), #[cfg(feature = "mixnet")] MixnetConfig { mixclient: mixclient_config.clone(), @@ -298,6 +299,17 @@ impl Node for NomosNode { }) .collect::>(); + // Build DA memberships and address lists. + let peer_addresses = build_da_peer_list(&configs); + let mut peer_ids = peer_addresses.iter().map(|(p, _)| *p).collect::>(); + peer_ids.extend(da.executor_peer_ids); + + for config in &mut configs { + config.da_network.backend.membership = + FillFromNodeList::new(&peer_ids, da.subnetwork_size, da.dispersal_factor); + config.da_network.backend.addresses = peer_addresses.clone(); + } + #[cfg(feature = "mixnet")] { // Build a topology using only a subset of nodes. @@ -369,21 +381,38 @@ fn build_mixnet_topology(mixnode_candidates: &[&Config]) -> MixnetTopology { MixnetTopology::new(candidates, num_layers, 1, [1u8; 32]).unwrap() } +fn secret_key_to_peer_id(node_key: nomos_libp2p::ed25519::SecretKey) -> PeerId { + PeerId::from_public_key( + &nomos_libp2p::ed25519::Keypair::from(node_key) + .public() + .into(), + ) +} + +fn build_da_peer_list(configs: &[Config]) -> Vec<(PeerId, Multiaddr)> { + configs + .iter() + .map(|c| { + ( + secret_key_to_peer_id(c.da_network.backend.node_key.clone()), + c.da_network.backend.listening_address.clone(), + ) + }) + .collect() +} + fn create_node_config( - _id: [u8; 32], + id: [u8; 32], genesis_state: LedgerState, config: cryptarchia_ledger::Config, notes: Vec, time: TimeConfig, + da_config: DaConfig, #[cfg(feature = "mixnet")] mixnet_config: MixnetConfig, ) -> Config { let swarm_config: SwarmConfig = Default::default(); - let mut rng = rand::thread_rng(); - let mut buff = [0u8; 32]; - rng.fill_bytes(&mut buff); - - let verifier_sk = SecretKey::key_gen(&buff, &[]).unwrap(); + let verifier_sk = SecretKey::key_gen(&id, &[]).unwrap(); let verifier_pk_bytes = verifier_sk.sk_to_pk().to_bytes(); let verifier_sk_bytes = verifier_sk.to_bytes(); @@ -407,7 +436,11 @@ fn create_node_config( da_network: DaNetworkConfig { backend: DaNetworkValidatorBackendSettings { node_key: swarm_config.node_key, - listening_address: Multiaddr::from_str("/ip4/127.0.0.1/udp/0/quic-v1").unwrap(), + listening_address: Multiaddr::from_str(&format!( + "/ip4/127.0.0.1/udp/{}/quic-v1", + get_available_port(), + )) + .unwrap(), addresses: Default::default(), membership: Default::default(), }, @@ -437,20 +470,16 @@ fn create_node_config( }, }, da_sampling: DaSamplingServiceSettings { - // TODO: setup this properly! sampling_settings: KzgrsSamplingBackendSettings { - num_samples: 0, - // Sampling service period can't be zero. - old_blobs_check_interval: Duration::from_secs(1), - blobs_validity_duration: Duration::from_secs(1), - }, - network_adapter_settings: DaNetworkSamplingSettings { - num_samples: 0, - subnet_size: 0, + num_samples: da_config.num_samples, + num_subnets: da_config.num_subnets, + old_blobs_check_interval: da_config.old_blobs_check_interval, + blobs_validity_duration: da_config.blobs_validity_duration, }, storage_adapter_settings: SamplingStorageAdapterSettings { blob_storage_directory: "./".into(), }, + network_adapter_settings: (), }, }; diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs index 127206d4..763a21df 100644 --- a/tests/src/tests/cli.rs +++ b/tests/src/tests/cli.rs @@ -4,7 +4,6 @@ use nomos_cli::da::network::backend::ExecutorBackendSettings; use nomos_da_network_service::NetworkConfig; use nomos_libp2p::ed25519; use nomos_libp2p::libp2p; -use nomos_libp2p::libp2p::multiaddr::multiaddr; use nomos_libp2p::Multiaddr; use nomos_libp2p::PeerId; use std::collections::HashMap; @@ -16,6 +15,7 @@ use tests::Node; use tests::SpawnConfig; const CLI_BIN: &str = "../target/debug/nomos-cli"; +const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"; use std::process::Command; @@ -30,6 +30,8 @@ fn run_disseminate(disseminate: &Disseminate) { .arg(disseminate.index.to_string()) .arg("--columns") .arg(disseminate.columns.to_string()) + .arg("--timeout") + .arg(disseminate.timeout.to_string()) .arg("--node-addr") .arg(disseminate.node_addr.as_ref().unwrap().as_str()); @@ -42,9 +44,11 @@ fn run_disseminate(disseminate: &Disseminate) { c.status().expect("failed to execute nomos cli"); } -async fn disseminate(config: &mut Disseminate) { - let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2)).await; - +async fn disseminate(nodes: &Vec, config: &mut Disseminate) { + // Nomos Cli is acting as the first node when dispersing the data by using the key associated + // with that Nomos Node. + let first_config = nodes[0].config(); + let node_key = first_config.da_network.backend.node_key.clone(); let node_addrs: HashMap = nodes .iter() .map(|n| { @@ -53,18 +57,26 @@ async fn disseminate(config: &mut Disseminate) { libp2p_config.node_key.clone(), )); let peer_id = PeerId::from(keypair.public()); - let address = multiaddr!(Ip4(libp2p_config.host), Udp(libp2p_config.port), QuicV1); + let address = n + .config() + .da_network + .backend + .listening_address + .clone() + .with_p2p(peer_id) + .unwrap(); (peer_id, address) }) .collect(); - - let peer_ids: Vec = node_addrs.keys().cloned().collect(); + let membership = first_config.da_network.backend.membership.clone(); + let num_subnets = first_config.da_sampling.sampling_settings.num_subnets; let da_network_config: NetworkConfig> = NetworkConfig { backend: ExecutorBackendSettings { - node_key: ed25519::SecretKey::generate(), - membership: FillFromNodeList::new(&peer_ids, 2, 1), + node_key, + membership, node_addrs, + num_subnets, }, }; @@ -72,7 +84,6 @@ async fn disseminate(config: &mut Disseminate) { let config_path = file.path().to_owned(); serde_yaml::to_writer(&mut file, &da_network_config).unwrap(); - config.timeout = 20; config.network_config = config_path; config.node_addr = Some( format!( @@ -82,9 +93,6 @@ async fn disseminate(config: &mut Disseminate) { .parse() .unwrap(), ); - config.app_id = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715".to_string(); - config.index = 0; - config.columns = 32; run_disseminate(&config); } @@ -93,9 +101,23 @@ async fn disseminate(config: &mut Disseminate) { async fn disseminate_blob() { let mut config = Disseminate { data: Some("hello world".to_string()), + timeout: 180, + app_id: APP_ID.into(), + index: 0, + columns: 2, ..Default::default() }; - disseminate(&mut config).await; + + let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy( + 2, + tests::DaConfig { + dispersal_factor: 2, + ..Default::default() + }, + )) + .await; + + disseminate(&nodes, &mut config).await; } #[tokio::test] @@ -107,9 +129,23 @@ async fn disseminate_big_blob() { .collect::>() .join("") .into(), + timeout: 180, + app_id: APP_ID.into(), + index: 0, + columns: 2, ..Default::default() }; - disseminate(&mut config).await; + + let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy( + 2, + tests::DaConfig { + dispersal_factor: 2, + ..Default::default() + }, + )) + .await; + + disseminate(&nodes, &mut config).await; } #[tokio::test] @@ -119,7 +155,21 @@ async fn disseminate_blob_from_file() { let mut config = Disseminate { file: Some(file.path().to_path_buf()), + timeout: 180, + app_id: APP_ID.into(), + index: 0, + columns: 2, ..Default::default() }; - disseminate(&mut config).await; + + let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy( + 4, + tests::DaConfig { + dispersal_factor: 2, + ..Default::default() + }, + )) + .await; + + disseminate(&nodes, &mut config).await; } diff --git a/tests/src/tests/cryptarchia/happy.rs b/tests/src/tests/cryptarchia/happy.rs index 853cee71..0500f793 100644 --- a/tests/src/tests/cryptarchia/happy.rs +++ b/tests/src/tests/cryptarchia/happy.rs @@ -52,6 +52,6 @@ async fn happy_test(nodes: &[NomosNode]) { #[tokio::test] async fn two_nodes_happy() { - let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2)).await; + let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2, Default::default())).await; happy_test(&nodes).await; }