DA: Integration tests for sampling and dispersal (#733)

* Use common test node in da tests

* Pick ranged subnetids when sampling

* Da network settings for integration tests

* Predefined node keys and deterministic rng

* Disperse kzgrs encoded blob

* Cli swarm fixes

* Increase indexer integration test timeout

* Check dispersal responses in cli da adapter

* DA membership configuration in node tests

* Nomos Cli act as a node in tests

* Increase timeout for dispersal tests

* Different node configurations for da tests

* Collect unique ids for sampling
This commit is contained in:
gusto 2024-09-18 15:22:36 +02:00 committed by GitHub
parent fadf6d010b
commit 9f4f139771
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 800 additions and 448 deletions

View File

@ -1,13 +1,19 @@
use futures::future::join_all;
// std
use std::collections::HashSet;
// crates
use futures::{future::join_all, StreamExt};
use kzgrs_backend::{common::blob::DaBlob, encoder::EncodedData as KzgEncodedData};
use nomos_core::da::DaDispersal;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use thiserror::Error;
use tokio::sync::oneshot;
// internal
use crate::da::{network::backend::Command, NetworkBackend};
use crate::da::{
network::{backend::Command, swarm::DispersalEvent},
NetworkBackend,
};
type Relay<T> = OutboundRelay<<NetworkService<T> as ServiceData>::Message>;
@ -39,6 +45,14 @@ impl DaDispersal for Libp2pExecutorDispersalAdapter {
async fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error> {
let mut tasks = Vec::new();
let (sender, receiver) = oneshot::channel();
self.network_relay
.send(DaNetworkMsg::Subscribe { kind: (), sender })
.await
.map_err(|(e, _)| e.to_string())?;
let mut event_stream = receiver.await.map_err(|e| e.to_string())?;
let mut expected_acknowledgments = HashSet::new();
for (i, column) in encoded_data.extended_data.columns().enumerate() {
let blob = DaBlob {
column: column.clone(),
@ -56,6 +70,8 @@ impl DaDispersal for Libp2pExecutorDispersalAdapter {
.collect(),
};
expected_acknowledgments.insert((blob.id().clone(), i as SubnetworkId));
let relay = self.network_relay.clone();
let command = DaNetworkMsg::Process(Command::Disperse {
blob,
@ -73,6 +89,28 @@ impl DaDispersal for Libp2pExecutorDispersalAdapter {
result?;
}
while !expected_acknowledgments.is_empty() {
let event = event_stream.next().await;
match event {
Some(event) => match event {
DispersalEvent::DispersalSuccess {
blob_id,
subnetwork_id,
} => {
expected_acknowledgments.remove(&(blob_id.to_vec(), subnetwork_id));
}
DispersalEvent::DispersalError { error } => {
return Err(DispersalError(format!("Received dispersal error: {error}")));
}
},
None => {
return Err(DispersalError(
"Event stream ended before receiving all acknowledgments".into(),
));
}
}
}
Ok(())
}
}

View File

@ -1,5 +1,5 @@
// std
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
@ -59,6 +59,7 @@ pub struct ExecutorBackendSettings<Membership> {
/// Membership of DA network PoV set
pub membership: Membership,
pub node_addrs: HashMap<PeerId, Multiaddr>,
pub num_subnets: u16,
}
impl<Membership> ExecutorBackend<Membership> {
@ -96,18 +97,43 @@ where
let keypair =
libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let mut executor_swarm =
ExecutorSwarm::new(keypair, config.membership, dispersal_events_sender);
ExecutorSwarm::new(keypair, config.membership.clone(), dispersal_events_sender);
let dispersal_request_sender = executor_swarm.blobs_sender();
for (_, addr) in config.node_addrs {
let mut connected_peers = HashSet::new();
let local_peer_id = *executor_swarm.local_peer_id();
for subnetwork_id in 0..config.num_subnets {
// Connect to one peer in a subnet.
let mut members = config.membership.members_of(&(subnetwork_id as u32));
members.remove(&local_peer_id);
let peer_id = *members
.iter()
.next()
.expect("Subnet should have at least one node which is not the nomos_cli");
let addr = config
.node_addrs
.get(&peer_id)
.expect("Peer address should be in the list");
executor_swarm
.dial(addr)
.dial(addr.clone())
.expect("Should schedule the dials");
connected_peers.insert(peer_id);
}
let executor_open_stream_sender = executor_swarm.open_stream_sender();
let task = overwatch_handle
.runtime()
.spawn(async move { executor_swarm.run().await });
for peer_id in connected_peers.iter() {
executor_open_stream_sender.send(*peer_id).unwrap();
}
let (dispersal_broadcast_sender, dispersal_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver);

View File

@ -1,4 +1,5 @@
// std
use std::time::Duration;
// crates
use kzgrs_backend::common::blob::DaBlob;
use libp2p::futures::StreamExt;
@ -56,6 +57,10 @@ where
self.swarm.behaviour().blobs_sender()
}
pub fn open_stream_sender(&self) -> UnboundedSender<PeerId> {
self.swarm.behaviour().open_stream_sender()
}
fn build_swarm(
key: Keypair,
membership: Membership,
@ -65,6 +70,9 @@ where
.with_quic()
.with_behaviour(|_key| DispersalExecutorBehaviour::new(membership))
.expect("Validator behaviour should build")
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
})
.build()
}
@ -73,29 +81,36 @@ where
Ok(())
}
pub fn local_peer_id(&self) -> &PeerId {
self.swarm.local_peer_id()
}
pub async fn run(&mut self) {
tokio::select! {
Some(event) = self.swarm.next() => {
match event {
SwarmEvent::Behaviour(behaviour_event) => {
self.handle_dispersal_event(behaviour_event).await;
},
SwarmEvent::ConnectionEstablished{ .. } => {}
SwarmEvent::ConnectionClosed{ .. } => {}
SwarmEvent::IncomingConnection{ .. } => {}
SwarmEvent::IncomingConnectionError{ .. } => {}
SwarmEvent::OutgoingConnectionError{ .. } => {}
SwarmEvent::NewListenAddr{ .. } => {}
SwarmEvent::ExpiredListenAddr{ .. } => {}
SwarmEvent::ListenerClosed{ .. } => {}
SwarmEvent::ListenerError{ .. } => {}
SwarmEvent::Dialing{ .. } => {}
SwarmEvent::NewExternalAddrCandidate{ .. } => {}
SwarmEvent::ExternalAddrConfirmed{ .. } => {}
SwarmEvent::ExternalAddrExpired{ .. } => {}
SwarmEvent::NewExternalAddrOfPeer{ .. } => {}
event => {
debug!("Unsupported validator swarm event: {event:?}");
loop {
tokio::select! {
Some(event) = self.swarm.next() => {
debug!("Executor received an event: {event:?}");
match event {
SwarmEvent::Behaviour(behaviour_event) => {
self.handle_dispersal_event(behaviour_event).await;
},
SwarmEvent::ConnectionEstablished{ .. } => {}
SwarmEvent::ConnectionClosed{ .. } => {}
SwarmEvent::IncomingConnection{ .. } => {}
SwarmEvent::IncomingConnectionError{ .. } => {}
SwarmEvent::OutgoingConnectionError{ .. } => {}
SwarmEvent::NewListenAddr{ .. } => {}
SwarmEvent::ExpiredListenAddr{ .. } => {}
SwarmEvent::ListenerClosed{ .. } => {}
SwarmEvent::ListenerError{ .. } => {}
SwarmEvent::Dialing{ .. } => {}
SwarmEvent::NewExternalAddrCandidate{ .. } => {}
SwarmEvent::ExternalAddrConfirmed{ .. } => {}
SwarmEvent::ExternalAddrExpired{ .. } => {}
SwarmEvent::NewExternalAddrOfPeer{ .. } => {}
event => {
debug!("Unsupported validator swarm event: {event:?}");
}
}
}
}

View File

@ -1,4 +1,5 @@
// std
use std::time::Duration;
// crates
use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob;
@ -68,6 +69,9 @@ where
.with_quic()
.with_behaviour(|key| ValidatorBehaviour::new(key, membership, addresses))
.expect("Validator behaviour should build")
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
})
.build()
}

View File

@ -5,7 +5,6 @@ use std::time::{Duration, Instant};
// crates
use hex;
use rand::distributions::Standard;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::time;
@ -28,6 +27,7 @@ pub struct SamplingContext {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KzgrsSamplingBackendSettings {
pub num_samples: u16,
pub num_subnets: u16,
pub old_blobs_check_interval: Duration,
pub blobs_validity_duration: Duration,
}
@ -116,10 +116,9 @@ impl<R: Rng + Sync + Send> DaSamplingServiceBackend<R> for KzgrsSamplingBackend<
return SamplingState::Terminated;
}
let subnets: Vec<SubnetworkId> = Standard
.sample_iter(&mut self.rng)
.take(self.settings.num_samples as usize)
.collect();
let subnets: Vec<SubnetworkId> = (0..self.settings.num_subnets as SubnetworkId)
.choose_multiple(&mut self.rng, self.settings.num_samples.into());
let ctx: SamplingContext = SamplingContext {
subnets: HashSet::new(),
started: Instant::now(),
@ -149,9 +148,10 @@ mod test {
use kzgrs_backend::common::{blob::DaBlob, Column};
use nomos_core::da::BlobId;
fn create_sampler(subnet_num: usize) -> KzgrsSamplingBackend<StdRng> {
fn create_sampler(num_samples: usize, num_subnets: usize) -> KzgrsSamplingBackend<StdRng> {
let settings = KzgrsSamplingBackendSettings {
num_samples: subnet_num as u16,
num_samples: num_samples as u16,
num_subnets: num_subnets as u16,
old_blobs_check_interval: Duration::from_millis(20),
blobs_validity_duration: Duration::from_millis(10),
};
@ -159,13 +159,46 @@ mod test {
KzgrsSamplingBackend::new(settings, rng)
}
#[tokio::test]
async fn test_init_sampling_subnet_range() {
let number_of_subnets = 42;
let num_samples = 50; // Testing with more samples than subnets to check the limit
let mut backend = create_sampler(num_samples, number_of_subnets);
let blob_id = BlobId::default();
let state = backend.init_sampling(blob_id).await;
if let SamplingState::Init(subnets) = state {
let unique_subnet_ids: HashSet<_> = subnets.iter().cloned().collect();
assert_eq!(
unique_subnet_ids.len(),
subnets.len(),
"Subnet IDs are not unique"
);
assert_eq!(
subnets.len(),
number_of_subnets.min(num_samples),
"Incorrect number of subnet IDs selected"
);
for subnet_id in subnets {
assert!(
(subnet_id as usize) < number_of_subnets,
"Subnet ID is out of range"
);
}
}
}
#[tokio::test]
async fn test_sampler() {
// fictitious number of subnets
let subnet_num: usize = 42;
// create a sampler instance
let sampler = &mut create_sampler(subnet_num);
let sampler = &mut create_sampler(subnet_num, 42);
// create some blobs and blob_ids
let b1: BlobId = sampler.rng.gen();
@ -309,7 +342,7 @@ mod test {
#[tokio::test]
async fn test_pruning() {
let mut sampler = create_sampler(42);
let mut sampler = create_sampler(42, 42);
// create some sampling contexes
// first set will go through as in time

View File

@ -17,15 +17,8 @@ use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use serde::{Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaNetworkSamplingSettings {
pub num_samples: u16,
pub subnet_size: SubnetworkId,
}
pub struct Libp2pAdapter<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
@ -51,7 +44,7 @@ where
+ 'static,
{
type Backend = DaNetworkValidatorBackend<Membership>;
type Settings = DaNetworkSamplingSettings;
type Settings = ();
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,

View File

@ -25,6 +25,7 @@ nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb
nomos-log = { path = "../../log" }
nomos-network = { path = "../../network", features = ["mock"] }
nomos-libp2p = { path = "../../../nomos-libp2p" }
libp2p = { version = "0.53.2", features = ["ed25519"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
rand = "0.8"
@ -40,6 +41,6 @@ time = "0.3"
blake2 = { version = "0.10" }
[features]
default = []
default = ["libp2p"]
libp2p = []
mixnet = []

View File

@ -1,36 +1,80 @@
// std
use std::path::PathBuf;
use std::time::Duration;
// crates
use bytes::Bytes;
use full_replication::BlobInfo;
use cl::InputWitness;
use cryptarchia_consensus::TimeConfig;
use cryptarchia_ledger::LedgerState;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::dispersal::BlobInfo;
use kzgrs_backend::encoder::DaEncoder;
use kzgrs_backend::encoder::DaEncoderParams;
use libp2p::identity::{
ed25519::{self, Keypair as Ed25519Keypair},
Keypair, PeerId,
};
use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transaction};
pub use nomos_core::{
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
};
use nomos_da_indexer::consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageSettings;
use nomos_da_indexer::DataIndexerService;
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::validator::{
DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings,
};
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageSettings;
use nomos_da_sampling::DaSamplingService;
use nomos_da_sampling::DaSamplingServiceSettings;
use nomos_da_sampling::{
backend::kzgrs::KzgrsSamplingBackend,
network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter,
};
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageSettings;
use nomos_da_verifier::DaVerifierService;
use nomos_da_verifier::DaVerifierServiceSettings;
use nomos_libp2p::{Multiaddr, Swarm, SwarmConfig};
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
use nomos_storage::backends::rocksdb::RocksBackend;
use rand_chacha::ChaCha20Rng;
use subnetworks_assignations::versions::v1::FillFromNodeList;
pub use nomos_core::{
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
};
use nomos_mempool::da::service::DaMempoolService;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
use nomos_mempool::{DaMempoolSettings, TxMempoolSettings};
use nomos_network::backends::libp2p::{Libp2p as NetworkBackend, Libp2pConfig};
use nomos_network::NetworkConfig;
use nomos_network::NetworkService;
use nomos_node::{Tx, Wire};
use nomos_storage::backends::rocksdb::RocksBackend;
use nomos_storage::StorageService;
use overwatch_derive::*;
use overwatch_rs::overwatch::{Overwatch, OverwatchRunner};
use overwatch_rs::services::handle::ServiceHandle;
use rand::{Rng, RngCore};
use subnetworks_assignations::versions::v1::FillFromNodeList;
// internal
use crate::rng::TestRng;
type IntegrationRng = TestRng;
/// Membership used by the DA Network service.
pub type NomosDaMembership = FillFromNodeList;
pub const PARAMS: DaEncoderParams = DaEncoderParams::default_with(2);
pub const ENCODER: DaEncoder = DaEncoder::new(PARAMS);
pub const SK1: &str = "aca2c52f5928a53de79679daf390b0903eeccd9671b4350d49948d84334874806afe68536da9e076205a2af0af350e6c50851a040e3057b6544a29f5689ccd31";
pub const SK2: &str = "f9dc26eea8bc56d9a4c59841b438665b998ce5e42f49f832df5b770a725c2daafee53b33539127321f6f5085e42902bd380e82d18a7aff6404e632b842106785";
pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
@ -40,24 +84,24 @@ pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
KzgrsSamplingBackend<IntegrationRng>,
SamplingLibp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
IntegrationRng,
SamplingStorageAdapter<DaBlob, Wire>,
>;
pub type DaSampling = DaSamplingService<
KzgrsSamplingBackend<ChaCha20Rng>,
KzgrsSamplingBackend<IntegrationRng>,
SamplingLibp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
IntegrationRng,
SamplingStorageAdapter<DaBlob, Wire>,
>;
pub(crate) type DaIndexer = DataIndexerService<
// Indexer specific.
Bytes,
IndexerStorageAdapter<Wire, full_replication::BlobInfo>,
CryptarchiaConsensusAdapter<Tx, full_replication::BlobInfo>,
IndexerStorageAdapter<Wire, BlobInfo>,
CryptarchiaConsensusAdapter<Tx, BlobInfo>,
// Cryptarchia specific, should be the same as in `Cryptarchia` type above.
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
@ -67,9 +111,9 @@ pub(crate) type DaIndexer = DataIndexerService<
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
KzgrsSamplingBackend<IntegrationRng>,
SamplingLibp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
IntegrationRng,
SamplingStorageAdapter<DaBlob, Wire>,
>;
@ -81,9 +125,9 @@ pub(crate) type TxMempool = TxMempoolService<
pub type DaMempool = DaMempoolService<
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
KzgrsSamplingBackend<ChaCha20Rng>,
KzgrsSamplingBackend<IntegrationRng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
IntegrationRng,
SamplingStorageAdapter<DaBlob, Wire>,
>;
@ -95,6 +139,190 @@ pub(crate) type DaVerifier = DaVerifierService<
pub(crate) const MB16: usize = 1024 * 1024 * 16;
#[derive(Services)]
pub struct TestNode {
//logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<NetworkBackend>>,
cl_mempool: ServiceHandle<TxMempool>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>>>,
da_mempool: ServiceHandle<DaMempool>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
cryptarchia: ServiceHandle<Cryptarchia>,
indexer: ServiceHandle<DaIndexer>,
verifier: ServiceHandle<DaVerifier>,
da_sampling: ServiceHandle<DaSampling>,
}
pub struct TestDaNetworkSettings {
pub peer_addresses: Vec<(PeerId, Multiaddr)>,
pub listening_address: Multiaddr,
pub num_subnets: u16,
pub num_samples: u16,
pub nodes_per_subnet: u16,
pub node_key: ed25519::SecretKey,
}
pub fn new_node(
note: &InputWitness,
ledger_config: &cryptarchia_ledger::Config,
genesis_state: &LedgerState,
time_config: &TimeConfig,
swarm_config: &SwarmConfig,
db_path: PathBuf,
blobs_dir: &PathBuf,
initial_peers: Vec<Multiaddr>,
verifier_settings: KzgrsDaVerifierSettings,
da_network_settings: TestDaNetworkSettings,
) -> Overwatch {
OverwatchRunner::<TestNode>::run(
TestNodeServiceSettings {
//logging: Default::default(),
network: NetworkConfig {
backend: Libp2pConfig {
inner: swarm_config.clone(),
initial_peers,
},
},
da_network: DaNetworkConfig {
backend: DaNetworkValidatorBackendSettings {
node_key: da_network_settings.node_key,
membership: FillFromNodeList::new(
&da_network_settings
.peer_addresses
.iter()
.map(|(peer_id, _)| peer_id.clone())
.collect::<Vec<PeerId>>(),
da_network_settings.num_subnets.into(),
da_network_settings.nodes_per_subnet.into(),
),
addresses: da_network_settings.peer_addresses,
listening_address: da_network_settings.listening_address,
},
},
cl_mempool: TxMempoolSettings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::CL_TOPIC),
id: <Tx as Transaction>::hash,
},
registry: None,
},
da_mempool: DaMempoolSettings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::DA_TOPIC),
id: <BlobInfo as DispersedBlobInfo>::blob_id,
},
registry: None,
},
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path,
read_only: false,
column_family: Some("blocks".into()),
},
indexer: IndexerSettings {
storage: IndexerStorageSettings {
blob_storage_directory: blobs_dir.clone(),
},
},
cryptarchia: cryptarchia_consensus::CryptarchiaSettings {
transaction_selector_settings: (),
blob_selector_settings: (),
config: ledger_config.clone(),
genesis_state: genesis_state.clone(),
time: time_config.clone(),
notes: vec![note.clone()],
},
verifier: DaVerifierServiceSettings {
verifier_settings,
network_adapter_settings: (),
storage_adapter_settings: VerifierStorageSettings {
blob_storage_directory: blobs_dir.clone(),
},
},
da_sampling: DaSamplingServiceSettings {
// TODO: setup this properly!
sampling_settings: KzgrsSamplingBackendSettings {
num_samples: da_network_settings.num_samples,
num_subnets: da_network_settings.num_subnets,
// Sampling service period can't be zero.
old_blobs_check_interval: Duration::from_secs(1),
blobs_validity_duration: Duration::from_secs(15),
},
network_adapter_settings: (),
storage_adapter_settings: SamplingStorageSettings {
blob_storage_directory: blobs_dir.clone(),
},
},
},
None,
)
.map_err(|e| eprintln!("Error encountered: {}", e))
.unwrap()
}
// Client node is only created for asyncroniously interact with nodes in the test.
// The services defined in it are not used.
#[derive(Services)]
pub struct TestClient {
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
}
// Client node is just an empty overwatch service to spawn a task that could communicate with other
// nodes and manage the data availability cycle during tests.
pub fn new_client(db_path: PathBuf) -> Overwatch {
OverwatchRunner::<TestClient>::run(
TestClientServiceSettings {
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path,
read_only: false,
column_family: None,
},
},
None,
)
.map_err(|e| eprintln!("Error encountered: {}", e))
.unwrap()
}
pub fn node_address(config: &SwarmConfig) -> Multiaddr {
Swarm::multiaddr(std::net::Ipv4Addr::new(127, 0, 0, 1), config.port)
}
pub fn generate_blst_hex_keys() -> (String, String) {
let mut rng = rand::thread_rng();
let sk_bytes: [u8; 32] = rng.gen();
let sk = blst::min_sig::SecretKey::key_gen(&sk_bytes, &[]).unwrap();
let pk = sk.sk_to_pk();
(hex::encode(sk.to_bytes()), hex::encode(pk.to_bytes()))
}
pub fn create_ed25519_sk_peerid(key: &str) -> (ed25519::SecretKey, PeerId) {
let mut b = hex::decode(key).unwrap();
let ed25519_keypair = Ed25519Keypair::try_from_bytes(&mut b).unwrap();
let kp = ed25519_keypair.to_bytes();
println!("sk > {}", hex::encode(kp));
let secret_key = ed25519_keypair.secret().clone();
let libp2p_keypair: Keypair = ed25519_keypair.into();
let peer_id = PeerId::from_public_key(&libp2p_keypair.public());
(secret_key, peer_id)
}
pub fn generate_ed25519_sk_peerid() -> (ed25519::SecretKey, PeerId) {
let ed25519_keypair = Ed25519Keypair::generate();
let kp = ed25519_keypair.to_bytes();
println!("sk > {}", hex::encode(kp));
let secret_key = ed25519_keypair.secret().clone();
let libp2p_keypair: Keypair = ed25519_keypair.into();
let peer_id = PeerId::from_public_key(&libp2p_keypair.public());
(secret_key, peer_id)
}
pub fn rand_data(elements_count: usize) -> Vec<u8> {
let mut buff = vec![0; elements_count * DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE];
rand::thread_rng().fill_bytes(&mut buff);
buff
}

View File

@ -1,134 +1,46 @@
// std
use std::hash::{DefaultHasher, Hash};
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::time::Duration;
use std::{
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc,
},
time::Duration,
};
// crates
use bytes::Bytes;
use cl::{InputWitness, NoteWitness, NullifierSecret};
use cryptarchia_consensus::ConsensusMsg;
use cryptarchia_consensus::TimeConfig;
use cryptarchia_consensus::{ConsensusMsg, TimeConfig};
use cryptarchia_ledger::LedgerState;
use full_replication::{BlobInfo, Metadata};
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata as _;
use nomos_core::{staking::NMO_UNIT, tx::Transaction};
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::validator::{
DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings,
use kzgrs_backend::{
common::blob::DaBlob,
dispersal::{BlobInfo, Metadata},
};
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_storage::fs::write_blob;
use nomos_core::da::blob::Blob;
use nomos_core::da::DaEncoder as _;
use nomos_core::{da::blob::metadata::Metadata as _, staking::NMO_UNIT};
use nomos_da_storage::rocksdb::DA_VERIFIED_KEY_PREFIX;
use nomos_libp2p::{ed25519, identity, PeerId};
use nomos_da_storage::{fs::write_blob, rocksdb::key_bytes};
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
use nomos_libp2p::{Multiaddr, SwarmConfig};
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
use nomos_mempool::{DaMempoolSettings, TxMempoolSettings};
use nomos_network::backends::libp2p::{Libp2p as NetworkBackend, Libp2pConfig};
use nomos_network::{NetworkConfig, NetworkService};
use nomos_node::{Tx, Wire};
use nomos_storage::{backends::rocksdb::RocksBackend, StorageService};
use overwatch_derive::*;
use overwatch_rs::overwatch::{Overwatch, OverwatchRunner};
use overwatch_rs::services::handle::ServiceHandle;
use nomos_node::Wire;
use nomos_storage::{
backends::{rocksdb::RocksBackend, StorageSerde},
StorageService,
};
use rand::{thread_rng, Rng};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use tempfile::{NamedTempFile, TempDir};
use time::OffsetDateTime;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
// internal
use crate::common::*;
#[derive(Services)]
struct IndexerNode {
network: ServiceHandle<NetworkService<NetworkBackend>>,
cl_mempool: ServiceHandle<TxMempool>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>>>,
da_mempool: ServiceHandle<DaMempool>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
cryptarchia: ServiceHandle<Cryptarchia>,
indexer: ServiceHandle<DaIndexer>,
}
fn new_node(
note: &InputWitness,
ledger_config: &cryptarchia_ledger::Config,
genesis_state: &LedgerState,
time_config: &TimeConfig,
swarm_config: &SwarmConfig,
db_path: PathBuf,
blobs_dir: &PathBuf,
initial_peers: Vec<Multiaddr>,
) -> Overwatch {
OverwatchRunner::<IndexerNode>::run(
IndexerNodeServiceSettings {
network: NetworkConfig {
backend: Libp2pConfig {
inner: swarm_config.clone(),
initial_peers,
},
},
da_network: DaNetworkConfig {
backend: DaNetworkValidatorBackendSettings {
node_key: ed25519::SecretKey::generate(),
membership: FillFromNodeList::new(
&[PeerId::from(identity::Keypair::generate_ed25519().public())],
2,
1,
),
addresses: Default::default(),
listening_address: "/ip4/127.0.0.1/udp/0/quic-v1".parse::<Multiaddr>().unwrap(),
},
},
cl_mempool: TxMempoolSettings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::CL_TOPIC),
id: <Tx as Transaction>::hash,
},
registry: None,
},
da_mempool: DaMempoolSettings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::DA_TOPIC),
id: <BlobInfo as DispersedBlobInfo>::blob_id,
},
registry: None,
},
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path,
read_only: false,
column_family: Some("blocks".into()),
},
indexer: IndexerSettings {
storage: RocksAdapterSettings {
blob_storage_directory: blobs_dir.clone(),
},
},
cryptarchia: cryptarchia_consensus::CryptarchiaSettings {
transaction_selector_settings: (),
blob_selector_settings: (),
config: ledger_config.clone(),
genesis_state: genesis_state.clone(),
time: time_config.clone(),
notes: vec![note.clone()],
},
},
None,
)
.map_err(|e| eprintln!("Error encountered: {}", e))
.unwrap()
}
const INDEXER_TEST_MAX_SECONDS: u64 = 60;
// TODO: When verifier is implemented this test should be removed and a new one
// performed in integration tests crate using the real node.
#[ignore = "Membership needs to be configured correctly"]
#[test]
fn test_indexer() {
let performed_tx = Arc::new(AtomicBool::new(false));
@ -181,6 +93,21 @@ fn test_indexer() {
let blobs_dir = TempDir::new().unwrap().path().to_path_buf();
let (node1_sk, node1_pk) = generate_blst_hex_keys();
let (node2_sk, node2_pk) = generate_blst_hex_keys();
let (peer_sk_1, peer_id_1) = create_ed25519_sk_peerid(SK1);
let (peer_sk_2, peer_id_2) = create_ed25519_sk_peerid(SK2);
let addr_1 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8780/quic-v1").unwrap();
let addr_2 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8781/quic-v1").unwrap();
let peer_addresses = vec![(peer_id_1, addr_1.clone()), (peer_id_2, addr_2.clone())];
let num_samples = 1;
let num_subnets = 2;
let nodes_per_subnet = 2;
let node1 = new_node(
&notes[0],
&ledger_config,
@ -190,9 +117,21 @@ fn test_indexer() {
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config2)],
KzgrsDaVerifierSettings {
sk: node1_sk.clone(),
nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()],
},
TestDaNetworkSettings {
peer_addresses: peer_addresses.clone(),
listening_address: addr_1,
num_subnets,
num_samples,
nodes_per_subnet,
node_key: peer_sk_1,
},
);
let _node2 = new_node(
let node2 = new_node(
&notes[1],
&ledger_config,
&genesis_state,
@ -201,19 +140,78 @@ fn test_indexer() {
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config1)],
KzgrsDaVerifierSettings {
sk: node2_sk.clone(),
nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()],
},
TestDaNetworkSettings {
peer_addresses,
listening_address: addr_2,
num_subnets,
num_samples,
nodes_per_subnet,
node_key: peer_sk_2,
},
);
let mempool = node1.handle().relay::<DaMempool>();
let storage = node1.handle().relay::<StorageService<RocksBackend<Wire>>>();
let indexer = node1.handle().relay::<DaIndexer>();
let consensus = node1.handle().relay::<Cryptarchia>();
// Node1 relays.
let node1_mempool = node1.handle().relay::<DaMempool>();
let node1_storage = node1.handle().relay::<StorageService<RocksBackend<Wire>>>();
let node1_indexer = node1.handle().relay::<DaIndexer>();
let node1_consensus = node1.handle().relay::<Cryptarchia>();
// Node2 relays.
let node2_storage = node2.handle().relay::<StorageService<RocksBackend<Wire>>>();
let encoder = &ENCODER;
let data = rand_data(10);
let encoded_data = encoder.encode(&data).unwrap();
let columns: Vec<_> = encoded_data.extended_data.columns().collect();
// Mock attestation step where blob is persisted in nodes blob storage.
let rt = tokio::runtime::Runtime::new().unwrap();
let mut blobs = vec![];
for (i, column) in columns.iter().enumerate() {
let da_blob = DaBlob {
column: column.clone(),
column_idx: i
.try_into()
.expect("Column index shouldn't overflow the target type"),
column_commitment: encoded_data.column_commitments[i],
aggregated_column_commitment: encoded_data.aggregated_column_commitment,
aggregated_column_proof: encoded_data.aggregated_column_proofs[i],
rows_commitments: encoded_data.row_commitments.clone(),
rows_proofs: encoded_data
.rows_proofs
.iter()
.map(|proofs| proofs.get(i).cloned().unwrap())
.collect(),
};
rt.block_on(write_blob(
blobs_dir.clone(),
da_blob.id().as_ref(),
da_blob.column_idx().as_ref(),
&Wire::serialize(da_blob.clone()),
))
.unwrap();
blobs.push(da_blob);
}
// Test generate hash for Certificate with default Hasher
// let mut default_hasher = DefaultHasher::new();
// let _hash3 = <BlobInfo as Hash>::hash(&blob_info, &mut default_hasher);
let blob_hash = [9u8; 32];
let app_id = [7u8; 32];
let index = 0.into();
let range = 0.into()..1.into(); // get idx 0 and 1.
let meta = Metadata::new(app_id, index);
let blob_hash = <DaBlob as Blob>::id(&blobs[0]);
let blob_info = BlobInfo::new(blob_hash, meta);
// Test get Metadata for Certificate
@ -222,28 +220,29 @@ fn test_indexer() {
assert_eq!(app_id2, app_id);
assert_eq!(index2, index);
// Test generate hash for Certificate with default Hasher
let mut default_hasher = DefaultHasher::new();
let _hash3 = <BlobInfo as Hash>::hash(&blob_info, &mut default_hasher);
let expected_blob_info = blob_info.clone();
let col_idx = (0 as u16).to_be_bytes();
let blob_0_bytes = Wire::serialize(blobs[0].clone());
// Mock attestation step where blob is persisted in nodes blob storage.
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(write_blob(
blobs_dir,
blob_info.blob_id().as_ref(),
&col_idx,
b"blob",
))
.unwrap();
node2.spawn(async move {
let storage_outbound = node2_storage.connect().await.unwrap();
// Mock attested blob by writting directly into the da storage.
let attested_key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_hash);
storage_outbound
.send(nomos_storage::StorageMsg::Store {
key: attested_key.into(),
value: Bytes::new(),
})
.await
.unwrap();
});
node1.spawn(async move {
let mempool_outbound = mempool.connect().await.unwrap();
let storage_outbound = storage.connect().await.unwrap();
let indexer_outbound = indexer.connect().await.unwrap();
let consensus_outbound = consensus.connect().await.unwrap();
let mempool_outbound = node1_mempool.connect().await.unwrap();
let storage_outbound = node1_storage.connect().await.unwrap();
let indexer_outbound = node1_indexer.connect().await.unwrap();
let consensus_outbound = node1_consensus.connect().await.unwrap();
let (sender, receiver) = tokio::sync::oneshot::channel();
consensus_outbound
@ -258,9 +257,7 @@ fn test_indexer() {
});
// Mock attested blob by writting directly into the da storage.
let mut attested_key = Vec::from(DA_VERIFIED_KEY_PREFIX.as_bytes());
attested_key.extend_from_slice(&blob_hash);
attested_key.extend_from_slice(&col_idx);
let attested_key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_hash);
storage_outbound
.send(nomos_storage::StorageMsg::Store {
@ -283,7 +280,7 @@ fn test_indexer() {
let _ = mempool_rx.await.unwrap();
// Wait for block in the network.
let timeout = tokio::time::sleep(Duration::from_secs(10));
let timeout = tokio::time::sleep(Duration::from_secs(INDEXER_TEST_MAX_SECONDS));
tokio::pin!(timeout);
loop {
@ -300,7 +297,7 @@ fn test_indexer() {
}
// Give time for services to process and store data.
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(Duration::from_secs(5)).await;
// Request range of blobs from indexer.
let (indexer_tx, indexer_rx) = tokio::sync::oneshot::channel();
@ -322,7 +319,8 @@ fn test_indexer() {
// When Indexer is asked for app_id at index, it will return all blobs that it has for that
// blob_id.
let columns = app_id_blobs[0];
if !columns.is_empty() && *columns[0] == *b"blob" && app_id_blobs[1].is_empty() {
if !columns.is_empty() && columns[0] == blob_0_bytes.as_ref() && app_id_blobs[1].is_empty()
{
is_success_tx.store(true, SeqCst);
}

View File

@ -1,9 +1,23 @@
// Networking is not essential for verifier and indexer tests.
// Libp2p network is chosen for consensus requirement, mixnet is ignored.
//
// Note: To enable rust-analyzer in modules, comment out the
// `#[cfg(not(feature = "mixnet"))]` lines (reenable when pushing).
#[cfg(all(test, feature = "libp2p", not(feature = "mixnet")))]
#[cfg(test)]
#[cfg(feature = "libp2p")]
#[cfg(not(feature = "mixnet"))]
mod common;
#[cfg(all(test, feature = "libp2p", not(feature = "mixnet")))]
#[cfg(test)]
#[cfg(feature = "libp2p")]
#[cfg(not(feature = "mixnet"))]
mod indexer_integration;
#[cfg(all(test, feature = "libp2p", not(feature = "mixnet")))]
#[cfg(test)]
#[cfg(feature = "libp2p")]
#[cfg(not(feature = "mixnet"))]
mod verifier_integration;
#[cfg(test)]
mod rng;

View File

@ -0,0 +1,35 @@
use rand::{rngs::mock::StepRng, RngCore, SeedableRng};
pub struct TestRng(StepRng);
/// Implement RngCore for TestRng
impl RngCore for TestRng {
fn next_u32(&mut self) -> u32 {
self.0.next_u32()
}
fn next_u64(&mut self) -> u64 {
self.0.next_u64()
}
fn fill_bytes(&mut self, dest: &mut [u8]) {
self.0.fill_bytes(dest)
}
fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
self.0.try_fill_bytes(dest)
}
}
impl SeedableRng for TestRng {
type Seed = [u8; 8];
fn from_seed(seed: Self::Seed) -> Self {
let seed_as_u64 = u64::from_le_bytes(seed);
TestRng(StepRng::new(seed_as_u64, 1))
}
fn seed_from_u64(seed: u64) -> Self {
TestRng(StepRng::new(seed, 1))
}
}

View File

@ -1,200 +1,27 @@
// std
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::time::Duration;
use std::{
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
Arc,
},
time::Duration,
};
// crates
use cl::{InputWitness, NoteWitness, NullifierSecret};
use cryptarchia_consensus::TimeConfig;
use cryptarchia_ledger::LedgerState;
use full_replication::BlobInfo;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::encoder::{DaEncoder, DaEncoderParams};
use nomos_core::da::{blob::info::DispersedBlobInfo, DaEncoder as _};
use nomos_core::{staking::NMO_UNIT, tx::Transaction};
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::validator::{
DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings,
};
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageSettings;
use nomos_da_sampling::DaSamplingServiceSettings;
use nomos_core::{da::DaEncoder as _, staking::NMO_UNIT};
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageSettings;
use nomos_da_verifier::DaVerifierServiceSettings;
use nomos_libp2p::{ed25519, identity, PeerId};
use nomos_libp2p::{Multiaddr, SwarmConfig};
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
use nomos_mempool::{DaMempoolSettings, TxMempoolSettings};
use nomos_network::backends::libp2p::{Libp2p as NetworkBackend, Libp2pConfig};
use nomos_network::{NetworkConfig, NetworkService};
use nomos_node::{Tx, Wire};
use nomos_storage::{backends::rocksdb::RocksBackend, StorageService};
use overwatch_derive::*;
use overwatch_rs::overwatch::{Overwatch, OverwatchRunner};
use overwatch_rs::services::handle::ServiceHandle;
use rand::{thread_rng, Rng, RngCore};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use nomos_libp2p::Multiaddr;
use nomos_libp2p::SwarmConfig;
use rand::{thread_rng, Rng};
use tempfile::{NamedTempFile, TempDir};
use time::OffsetDateTime;
// internal
use crate::common::*;
// Client node is only created for asyncroniously interact with nodes in the test.
// The services defined in it are not used.
#[derive(Services)]
struct ClientNode {
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
}
#[derive(Services)]
struct VerifierNode {
network: ServiceHandle<NetworkService<NetworkBackend>>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>>>,
cl_mempool: ServiceHandle<TxMempool>,
da_mempool: ServiceHandle<DaMempool>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
cryptarchia: ServiceHandle<Cryptarchia>,
indexer: ServiceHandle<DaIndexer>,
verifier: ServiceHandle<DaVerifier>,
da_sampling: ServiceHandle<DaSampling>,
}
// Client node is just an empty overwatch service to spawn a task that could communicate with other
// nodes and manage the data availability cycle during tests.
fn new_client(db_path: PathBuf) -> Overwatch {
OverwatchRunner::<ClientNode>::run(
ClientNodeServiceSettings {
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path,
read_only: false,
column_family: None,
},
},
None,
)
.map_err(|e| eprintln!("Error encountered: {}", e))
.unwrap()
}
fn new_node(
note: &InputWitness,
ledger_config: &cryptarchia_ledger::Config,
genesis_state: &LedgerState,
time_config: &TimeConfig,
swarm_config: &SwarmConfig,
db_path: PathBuf,
blobs_dir: &PathBuf,
initial_peers: Vec<Multiaddr>,
verifier_settings: KzgrsDaVerifierSettings,
) -> Overwatch {
OverwatchRunner::<VerifierNode>::run(
VerifierNodeServiceSettings {
network: NetworkConfig {
backend: Libp2pConfig {
inner: swarm_config.clone(),
initial_peers,
},
},
da_network: DaNetworkConfig {
backend: DaNetworkValidatorBackendSettings {
node_key: ed25519::SecretKey::generate(),
membership: FillFromNodeList::new(
&[PeerId::from(identity::Keypair::generate_ed25519().public())],
2,
1,
),
addresses: Default::default(),
listening_address: "/ip4/127.0.0.1/udp/0/quic-v1".parse::<Multiaddr>().unwrap(),
},
},
cl_mempool: TxMempoolSettings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::CL_TOPIC),
id: <Tx as Transaction>::hash,
},
registry: None,
},
da_mempool: DaMempoolSettings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::DA_TOPIC),
id: <BlobInfo as DispersedBlobInfo>::blob_id,
},
registry: None,
},
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path,
read_only: false,
column_family: Some("blocks".into()),
},
indexer: IndexerSettings {
storage: IndexerStorageSettings {
blob_storage_directory: blobs_dir.clone(),
},
},
cryptarchia: cryptarchia_consensus::CryptarchiaSettings {
transaction_selector_settings: (),
blob_selector_settings: (),
config: ledger_config.clone(),
genesis_state: genesis_state.clone(),
time: time_config.clone(),
notes: vec![note.clone()],
},
verifier: DaVerifierServiceSettings {
verifier_settings,
network_adapter_settings: (),
storage_adapter_settings: VerifierStorageSettings {
blob_storage_directory: blobs_dir.clone(),
},
},
da_sampling: DaSamplingServiceSettings {
// TODO: setup this properly!
sampling_settings: KzgrsSamplingBackendSettings {
num_samples: 0,
// Sampling service period can't be zero.
old_blobs_check_interval: Duration::from_secs(1),
blobs_validity_duration: Duration::from_secs(1),
},
network_adapter_settings: DaNetworkSamplingSettings {
num_samples: 0,
subnet_size: 0,
},
storage_adapter_settings: SamplingStorageSettings {
blob_storage_directory: blobs_dir.clone(),
},
},
},
None,
)
.map_err(|e| eprintln!("Error encountered: {}", e))
.unwrap()
}
fn generate_hex_keys() -> (String, String) {
let mut rng = rand::thread_rng();
let sk_bytes: [u8; 32] = rng.gen();
let sk = blst::min_sig::SecretKey::key_gen(&sk_bytes, &[]).unwrap();
let pk = sk.sk_to_pk();
(hex::encode(sk.to_bytes()), hex::encode(pk.to_bytes()))
}
pub fn rand_data(elements_count: usize) -> Vec<u8> {
let mut buff = vec![0; elements_count * DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE];
rand::thread_rng().fill_bytes(&mut buff);
buff
}
pub const PARAMS: DaEncoderParams = DaEncoderParams::default_with(2);
pub const ENCODER: DaEncoder = DaEncoder::new(PARAMS);
#[test]
fn test_verifier() {
let performed_tx = Arc::new(AtomicBool::new(false));
@ -247,11 +74,23 @@ fn test_verifier() {
let blobs_dir = TempDir::new().unwrap().path().to_path_buf();
let (node1_sk, node1_pk) = generate_hex_keys();
let (node2_sk, node2_pk) = generate_hex_keys();
let (node1_sk, node1_pk) = generate_blst_hex_keys();
let (node2_sk, node2_pk) = generate_blst_hex_keys();
let client_zone = new_client(NamedTempFile::new().unwrap().path().to_path_buf());
let (peer_sk_1, peer_id_1) = generate_ed25519_sk_peerid();
let (peer_sk_2, peer_id_2) = generate_ed25519_sk_peerid();
let addr_1 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8880/quic-v1").unwrap();
let addr_2 = Multiaddr::from_str("/ip4/127.0.0.1/udp/8881/quic-v1").unwrap();
let peer_addresses = vec![(peer_id_1, addr_1.clone()), (peer_id_2, addr_2.clone())];
let num_samples = 1;
let num_subnets = 2;
let nodes_per_subnet = 1;
let node1 = new_node(
&notes[0],
&ledger_config,
@ -265,6 +104,14 @@ fn test_verifier() {
sk: node1_sk.clone(),
nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()],
},
TestDaNetworkSettings {
peer_addresses: peer_addresses.clone(),
listening_address: addr_1,
num_subnets,
num_samples,
nodes_per_subnet,
node_key: peer_sk_1,
},
);
let node2 = new_node(
@ -280,6 +127,14 @@ fn test_verifier() {
sk: node2_sk,
nodes_public_keys: vec![node1_pk, node2_pk],
},
TestDaNetworkSettings {
peer_addresses,
listening_address: addr_2,
num_subnets,
num_samples,
nodes_per_subnet,
node_key: peer_sk_2,
},
);
let node1_verifier = node1.handle().relay::<DaVerifier>();

View File

@ -2,15 +2,15 @@ pub mod nodes;
pub use nodes::NomosNode;
use once_cell::sync::Lazy;
use std::env;
// std
use std::env;
use std::net::TcpListener;
use std::ops::Mul;
use std::time::Duration;
use std::{fmt::Debug, sync::Mutex};
//crates
use nomos_libp2p::{Multiaddr, Swarm};
use nomos_libp2p::{Multiaddr, PeerId, Swarm};
use nomos_node::Config;
use rand::{thread_rng, Rng};
@ -49,8 +49,8 @@ pub trait Node: Sized {
}
fn node_configs(config: SpawnConfig) -> Vec<Config> {
match config {
SpawnConfig::Star { consensus } => {
let mut configs = Self::create_node_configs(consensus);
SpawnConfig::Star { consensus, da } => {
let mut configs = Self::create_node_configs(consensus, da);
let next_leader_config = configs.remove(0);
let first_node_addr = node_address(&next_leader_config);
let mut node_configs = vec![next_leader_config];
@ -64,8 +64,8 @@ pub trait Node: Sized {
}
node_configs
}
SpawnConfig::Chain { consensus } => {
let mut configs = Self::create_node_configs(consensus);
SpawnConfig::Chain { consensus, da } => {
let mut configs = Self::create_node_configs(consensus, da);
let next_leader_config = configs.remove(0);
let mut prev_node_addr = node_address(&next_leader_config);
let mut node_configs = vec![next_leader_config];
@ -79,7 +79,7 @@ pub trait Node: Sized {
}
}
}
fn create_node_configs(consensus: ConsensusConfig) -> Vec<Config>;
fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Config>;
async fn consensus_info(&self) -> Self::ConsensusInfo;
fn stop(&mut self);
}
@ -87,14 +87,20 @@ pub trait Node: Sized {
#[derive(Clone)]
pub enum SpawnConfig {
// Star topology: Every node is initially connected to a single node.
Star { consensus: ConsensusConfig },
Star {
consensus: ConsensusConfig,
da: DaConfig,
},
// Chain topology: Every node is chained to the node next to it.
Chain { consensus: ConsensusConfig },
Chain {
consensus: ConsensusConfig,
da: DaConfig,
},
}
impl SpawnConfig {
// Returns a SpawnConfig::Chain with proper configurations for happy-path tests
pub fn chain_happy(n_participants: usize) -> Self {
pub fn chain_happy(n_participants: usize, da: DaConfig) -> Self {
Self::Chain {
consensus: ConsensusConfig {
n_participants,
@ -105,10 +111,11 @@ impl SpawnConfig {
// a block should be produced (on average) every slot
active_slot_coeff: 0.9,
},
da,
}
}
pub fn star_happy(n_participants: usize) -> Self {
pub fn star_happy(n_participants: usize, da: DaConfig) -> Self {
Self::Star {
consensus: ConsensusConfig {
n_participants,
@ -119,6 +126,7 @@ impl SpawnConfig {
// a block should be produced (on average) every slot
active_slot_coeff: 0.9,
},
da,
}
}
}
@ -136,3 +144,28 @@ pub struct ConsensusConfig {
pub security_param: u32,
pub active_slot_coeff: f64,
}
#[derive(Clone)]
pub struct DaConfig {
pub subnetwork_size: usize,
pub dispersal_factor: usize,
pub executor_peer_ids: Vec<PeerId>,
pub num_samples: u16,
pub num_subnets: u16,
pub old_blobs_check_interval: Duration,
pub blobs_validity_duration: Duration,
}
impl Default for DaConfig {
fn default() -> Self {
Self {
subnetwork_size: 2,
dispersal_factor: 1,
executor_peer_ids: vec![],
num_samples: 1,
num_subnets: 2,
old_blobs_check_interval: Duration::from_secs(5),
blobs_validity_duration: Duration::from_secs(15),
}
}
}

View File

@ -5,7 +5,7 @@ use std::str::FromStr;
use std::time::Duration;
// internal
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, Node};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node};
use blst::min_sig::SecretKey;
use cl::{InputWitness, NoteWitness, NullifierSecret};
use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig};
@ -27,7 +27,7 @@ use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as Sampl
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
use nomos_da_verifier::DaVerifierServiceSettings;
use nomos_libp2p::{Multiaddr, SwarmConfig};
use nomos_libp2p::{Multiaddr, PeerId, SwarmConfig};
use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_mempool::MempoolMetrics;
#[cfg(feature = "mixnet")]
@ -36,11 +36,11 @@ use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::{api::AxumBackendSettings, Config, Tx};
// crates
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings;
use nomos_da_sampling::DaSamplingServiceSettings;
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng, RngCore};
use rand::{thread_rng, Rng};
use reqwest::{Client, Url};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use tempfile::NamedTempFile;
use time::OffsetDateTime;
@ -231,7 +231,7 @@ impl Node for NomosNode {
/// so the leader can receive votes from all other nodes that will be subsequently spawned.
/// If not, the leader will miss votes from nodes spawned before itself.
/// This issue will be resolved by devising the block catch-up mechanism in the future.
fn create_node_configs(consensus: ConsensusConfig) -> Vec<Config> {
fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Config> {
// we use the same random bytes for:
// * da id
// * coin sk
@ -289,6 +289,7 @@ impl Node for NomosNode {
ledger_config.clone(),
vec![coin],
time_config.clone(),
da.clone(),
#[cfg(feature = "mixnet")]
MixnetConfig {
mixclient: mixclient_config.clone(),
@ -298,6 +299,17 @@ impl Node for NomosNode {
})
.collect::<Vec<_>>();
// Build DA memberships and address lists.
let peer_addresses = build_da_peer_list(&configs);
let mut peer_ids = peer_addresses.iter().map(|(p, _)| *p).collect::<Vec<_>>();
peer_ids.extend(da.executor_peer_ids);
for config in &mut configs {
config.da_network.backend.membership =
FillFromNodeList::new(&peer_ids, da.subnetwork_size, da.dispersal_factor);
config.da_network.backend.addresses = peer_addresses.clone();
}
#[cfg(feature = "mixnet")]
{
// Build a topology using only a subset of nodes.
@ -369,21 +381,38 @@ fn build_mixnet_topology(mixnode_candidates: &[&Config]) -> MixnetTopology {
MixnetTopology::new(candidates, num_layers, 1, [1u8; 32]).unwrap()
}
fn secret_key_to_peer_id(node_key: nomos_libp2p::ed25519::SecretKey) -> PeerId {
PeerId::from_public_key(
&nomos_libp2p::ed25519::Keypair::from(node_key)
.public()
.into(),
)
}
fn build_da_peer_list(configs: &[Config]) -> Vec<(PeerId, Multiaddr)> {
configs
.iter()
.map(|c| {
(
secret_key_to_peer_id(c.da_network.backend.node_key.clone()),
c.da_network.backend.listening_address.clone(),
)
})
.collect()
}
fn create_node_config(
_id: [u8; 32],
id: [u8; 32],
genesis_state: LedgerState,
config: cryptarchia_ledger::Config,
notes: Vec<InputWitness>,
time: TimeConfig,
da_config: DaConfig,
#[cfg(feature = "mixnet")] mixnet_config: MixnetConfig,
) -> Config {
let swarm_config: SwarmConfig = Default::default();
let mut rng = rand::thread_rng();
let mut buff = [0u8; 32];
rng.fill_bytes(&mut buff);
let verifier_sk = SecretKey::key_gen(&buff, &[]).unwrap();
let verifier_sk = SecretKey::key_gen(&id, &[]).unwrap();
let verifier_pk_bytes = verifier_sk.sk_to_pk().to_bytes();
let verifier_sk_bytes = verifier_sk.to_bytes();
@ -407,7 +436,11 @@ fn create_node_config(
da_network: DaNetworkConfig {
backend: DaNetworkValidatorBackendSettings {
node_key: swarm_config.node_key,
listening_address: Multiaddr::from_str("/ip4/127.0.0.1/udp/0/quic-v1").unwrap(),
listening_address: Multiaddr::from_str(&format!(
"/ip4/127.0.0.1/udp/{}/quic-v1",
get_available_port(),
))
.unwrap(),
addresses: Default::default(),
membership: Default::default(),
},
@ -437,20 +470,16 @@ fn create_node_config(
},
},
da_sampling: DaSamplingServiceSettings {
// TODO: setup this properly!
sampling_settings: KzgrsSamplingBackendSettings {
num_samples: 0,
// Sampling service period can't be zero.
old_blobs_check_interval: Duration::from_secs(1),
blobs_validity_duration: Duration::from_secs(1),
},
network_adapter_settings: DaNetworkSamplingSettings {
num_samples: 0,
subnet_size: 0,
num_samples: da_config.num_samples,
num_subnets: da_config.num_subnets,
old_blobs_check_interval: da_config.old_blobs_check_interval,
blobs_validity_duration: da_config.blobs_validity_duration,
},
storage_adapter_settings: SamplingStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
network_adapter_settings: (),
},
};

View File

@ -4,7 +4,6 @@ use nomos_cli::da::network::backend::ExecutorBackendSettings;
use nomos_da_network_service::NetworkConfig;
use nomos_libp2p::ed25519;
use nomos_libp2p::libp2p;
use nomos_libp2p::libp2p::multiaddr::multiaddr;
use nomos_libp2p::Multiaddr;
use nomos_libp2p::PeerId;
use std::collections::HashMap;
@ -16,6 +15,7 @@ use tests::Node;
use tests::SpawnConfig;
const CLI_BIN: &str = "../target/debug/nomos-cli";
const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715";
use std::process::Command;
@ -30,6 +30,8 @@ fn run_disseminate(disseminate: &Disseminate) {
.arg(disseminate.index.to_string())
.arg("--columns")
.arg(disseminate.columns.to_string())
.arg("--timeout")
.arg(disseminate.timeout.to_string())
.arg("--node-addr")
.arg(disseminate.node_addr.as_ref().unwrap().as_str());
@ -42,9 +44,11 @@ fn run_disseminate(disseminate: &Disseminate) {
c.status().expect("failed to execute nomos cli");
}
async fn disseminate(config: &mut Disseminate) {
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2)).await;
async fn disseminate(nodes: &Vec<NomosNode>, config: &mut Disseminate) {
// Nomos Cli is acting as the first node when dispersing the data by using the key associated
// with that Nomos Node.
let first_config = nodes[0].config();
let node_key = first_config.da_network.backend.node_key.clone();
let node_addrs: HashMap<PeerId, Multiaddr> = nodes
.iter()
.map(|n| {
@ -53,18 +57,26 @@ async fn disseminate(config: &mut Disseminate) {
libp2p_config.node_key.clone(),
));
let peer_id = PeerId::from(keypair.public());
let address = multiaddr!(Ip4(libp2p_config.host), Udp(libp2p_config.port), QuicV1);
let address = n
.config()
.da_network
.backend
.listening_address
.clone()
.with_p2p(peer_id)
.unwrap();
(peer_id, address)
})
.collect();
let peer_ids: Vec<nomos_libp2p::PeerId> = node_addrs.keys().cloned().collect();
let membership = first_config.da_network.backend.membership.clone();
let num_subnets = first_config.da_sampling.sampling_settings.num_subnets;
let da_network_config: NetworkConfig<ExecutorBackend<FillFromNodeList>> = NetworkConfig {
backend: ExecutorBackendSettings {
node_key: ed25519::SecretKey::generate(),
membership: FillFromNodeList::new(&peer_ids, 2, 1),
node_key,
membership,
node_addrs,
num_subnets,
},
};
@ -72,7 +84,6 @@ async fn disseminate(config: &mut Disseminate) {
let config_path = file.path().to_owned();
serde_yaml::to_writer(&mut file, &da_network_config).unwrap();
config.timeout = 20;
config.network_config = config_path;
config.node_addr = Some(
format!(
@ -82,9 +93,6 @@ async fn disseminate(config: &mut Disseminate) {
.parse()
.unwrap(),
);
config.app_id = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715".to_string();
config.index = 0;
config.columns = 32;
run_disseminate(&config);
}
@ -93,9 +101,23 @@ async fn disseminate(config: &mut Disseminate) {
async fn disseminate_blob() {
let mut config = Disseminate {
data: Some("hello world".to_string()),
timeout: 180,
app_id: APP_ID.into(),
index: 0,
columns: 2,
..Default::default()
};
disseminate(&mut config).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
2,
tests::DaConfig {
dispersal_factor: 2,
..Default::default()
},
))
.await;
disseminate(&nodes, &mut config).await;
}
#[tokio::test]
@ -107,9 +129,23 @@ async fn disseminate_big_blob() {
.collect::<Vec<_>>()
.join("")
.into(),
timeout: 180,
app_id: APP_ID.into(),
index: 0,
columns: 2,
..Default::default()
};
disseminate(&mut config).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
2,
tests::DaConfig {
dispersal_factor: 2,
..Default::default()
},
))
.await;
disseminate(&nodes, &mut config).await;
}
#[tokio::test]
@ -119,7 +155,21 @@ async fn disseminate_blob_from_file() {
let mut config = Disseminate {
file: Some(file.path().to_path_buf()),
timeout: 180,
app_id: APP_ID.into(),
index: 0,
columns: 2,
..Default::default()
};
disseminate(&mut config).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
4,
tests::DaConfig {
dispersal_factor: 2,
..Default::default()
},
))
.await;
disseminate(&nodes, &mut config).await;
}

View File

@ -52,6 +52,6 @@ async fn happy_test(nodes: &[NomosNode]) {
#[tokio::test]
async fn two_nodes_happy() {
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2)).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2, Default::default())).await;
happy_test(&nodes).await;
}