diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs
index cf0f09b3..b5c14ae6 100644
--- a/nodes/nomos-node/src/api.rs
+++ b/nodes/nomos-node/src/api.rs
@@ -520,7 +520,7 @@ where
     post,
     path = "/da/get_range",
    responses(
-        (status = 200, description = "Range of blobs", body = Vec<([u8;8], Option<Vec<u8>>)>),
+        (status = 200, description = "Range of blobs", body = Vec<([u8;8], Vec<Vec<u8>>)>),
         (status = 500, description = "Internal server error", body = String),
     )
 )]
diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs
index fb9f247f..87d7479c 100644
--- a/nodes/nomos-node/src/config.rs
+++ b/nodes/nomos-node/src/config.rs
@@ -15,12 +15,13 @@ use nomos_libp2p::{ed25519::SecretKey, Multiaddr};
 use nomos_log::{Logger, LoggerBackend, LoggerFormat};
 use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
 use nomos_network::NetworkService;
+use nomos_storage::backends::rocksdb::RocksBackend;
 use overwatch_rs::services::ServiceData;
 use serde::{Deserialize, Serialize};
 use subnetworks_assignations::versions::v1::FillFromNodeList;
 use tracing::Level;
 // internal
-use crate::NomosApiService;
+use crate::{NomosApiService, Wire};
 
 #[derive(ValueEnum, Clone, Debug, Default)]
 pub enum LoggerBackendType {
@@ -120,6 +121,8 @@ pub struct Config {
     pub da_sampling: <DaSampling as ServiceData>::Settings,
     pub http: <NomosApiService as ServiceData>::Settings,
     pub cryptarchia: <Cryptarchia as ServiceData>::Settings,
+    pub storage: <StorageService<RocksBackend<Wire>> as ServiceData>::Settings,
+    pub wait_online_secs: u64,
 }
 
 impl Config {
diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs
index 75c90256..672d8bb8 100644
--- a/nodes/nomos-node/src/main.rs
+++ b/nodes/nomos-node/src/main.rs
@@ -16,8 +16,6 @@ use overwatch_rs::overwatch::*;
 use tracing::{span, Level};
 use uuid::Uuid;
 
-const DEFAULT_DB_PATH: &str = "./db";
-
 #[derive(Parser, Debug)]
 #[command(author, version, about, long_about = None)]
 struct Args {
@@ -97,11 +95,7 @@ fn main() -> Result<()> {
             cryptarchia: config.cryptarchia,
             #[cfg(feature = "metrics")]
             metrics: MetricsSettings { registry },
-            storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
-                db_path: std::path::PathBuf::from(DEFAULT_DB_PATH),
-                read_only: false,
-                column_family: Some("blocks".into()),
-            },
+            storage: config.storage,
             system_sig: (),
         },
         None,
diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs
index 1b56ab49..f2504c2a 100644
--- a/nomos-cli/src/cmds/disseminate/mod.rs
+++ b/nomos-cli/src/cmds/disseminate/mod.rs
@@ -41,12 +41,18 @@ pub struct Disseminate {
     // Number of columns to use for encoding.
     #[clap(long, default_value = "4096")]
     pub columns: usize,
+    // Duration in seconds to wait before publishing blob info.
+    #[clap(short, long, default_value = "5")]
+    pub wait_until_disseminated: u64,
     /// File to write the certificate to, if present.
     #[clap(long)]
     pub output: Option<PathBuf>,
     /// File to disseminate
     #[clap(short, long)]
     pub file: Option<PathBuf>,
+    // Path to the KzgRs global parameters.
+    #[clap(long)]
+    pub global_params_path: String,
 }
 
 impl Disseminate {
@@ -80,8 +86,11 @@ impl Disseminate {
         let output = self.output.clone();
         let num_columns = self.columns;
         let with_cache = self.with_cache;
+        let wait_until_disseminated = Duration::from_secs(self.wait_until_disseminated);
+        let global_params_path = self.global_params_path.clone();
         let (payload_sender, payload_rx) = tokio::sync::mpsc::unbounded_channel();
         payload_sender.send(bytes.into_boxed_slice()).unwrap();
+
         std::thread::spawn(move || {
             OverwatchRunner::<DisseminateApp>::run(
                 DisseminateAppServiceSettings {
@@ -96,7 +105,9 @@ impl Disseminate {
                         metadata,
                         status_updates,
                         node_addr,
+                        wait_until_disseminated,
                         output,
+                        global_params_path,
                     },
                     logger: LoggerSettings {
                         backend: LoggerBackend::None,
diff --git a/nomos-cli/src/da/disseminate.rs b/nomos-cli/src/da/disseminate.rs
index 248ca4e4..7d2f8572 100644
--- a/nomos-cli/src/da/disseminate.rs
+++ b/nomos-cli/src/da/disseminate.rs
@@ -33,6 +33,7 @@ use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
 use super::{network::adapters::libp2p::Libp2pExecutorDispersalAdapter, NetworkBackend};
 use crate::api::mempool::send_blob_info;
 
+#[allow(clippy::too_many_arguments)]
 pub async fn disseminate_and_wait<E, D>(
     encoder: &E,
     disperal: &D,
@@ -40,6 +41,7 @@ pub async fn disseminate_and_wait<E, D>(
     metadata: Metadata,
     status_updates: Sender<Status>,
     node_addr: Option<&Url>,
+    wait_until_disseminated: Duration,
     output: Option<&PathBuf>,
 ) -> Result<(), Box<dyn std::error::Error>>
 where
@@ -68,6 +70,9 @@ where
         std::fs::write(output, wire::serialize(&blob_info)?)?;
     }
 
+    // Wait for the blobs to be replicated across the DA network.
+    tokio::time::sleep(wait_until_disseminated).await;
+
     // 4) Send blob info to the mempool.
     if let Some(node) = node_addr {
         status_updates.send(Status::SendingBlobInfo)?;
@@ -127,7 +132,9 @@ pub struct Settings {
     pub metadata: Metadata,
     pub status_updates: Sender<Status>,
     pub node_addr: Option<Url>,
+    pub wait_until_disseminated: Duration,
     pub output: Option<PathBuf>,
+    pub global_params_path: String,
 }
 
 pub struct DisseminateService {
@@ -157,7 +164,9 @@ impl ServiceCore for DisseminateService {
             metadata,
             status_updates,
             node_addr,
+            wait_until_disseminated,
             output,
+            global_params_path,
         } = service_state.settings_reader.get_updated_settings();
 
         let network_relay: Relay<NetworkService<NetworkBackend>> =
@@ -167,9 +176,13 @@
             .await
             .expect("Relay connection with NetworkService should succeed");
 
+        let global_params = kzgrs_backend::global::global_parameters_from_file(&global_params_path)
+            .expect("Global parameters should be loaded from file");
+
         let params = kzgrs_backend::encoder::DaEncoderParams::new(
             kzgrs_settings.num_columns,
             kzgrs_settings.with_cache,
+            global_params,
         );
         let da_encoder = kzgrs_backend::encoder::DaEncoder::new(params);
         let da_dispersal = Libp2pExecutorDispersalAdapter::new(network_relay);
@@ -184,6 +197,7 @@ impl ServiceCore for DisseminateService {
                     metadata,
                     status_updates.clone(),
                     node_addr.as_ref(),
+                    wait_until_disseminated,
                     output.as_ref(),
                 ),
             )
diff --git a/nomos-da/kzgrs-backend/src/dispersal.rs b/nomos-da/kzgrs-backend/src/dispersal.rs
index 43c3e184..f9c4bd8d 100644
--- a/nomos-da/kzgrs-backend/src/dispersal.rs
+++ b/nomos-da/kzgrs-backend/src/dispersal.rs
@@ -95,6 +95,7 @@ mod tests {
             test::{rand_data, ENCODER},
             EncodedData,
         },
+        global::GLOBAL_PARAMETERS,
         verifier::DaVerifier,
     };
 
@@ -141,7 +142,9 @@ mod tests {
             .clone()
             .into_iter()
             .enumerate()
-            .map(|(index, sk)| DaVerifier { sk, index })
+            .map(|(index, sk)| {
+                DaVerifier::new(sk, [index as u32].into(), GLOBAL_PARAMETERS.clone())
+            })
             .collect();
         let encoded_data = encoder.encode(&data).unwrap();
diff --git a/nomos-da/kzgrs-backend/src/encoder.rs b/nomos-da/kzgrs-backend/src/encoder.rs
index 7e22f762..cf3c395c 100644
--- a/nomos-da/kzgrs-backend/src/encoder.rs
+++ b/nomos-da/kzgrs-backend/src/encoder.rs
@@ -7,8 +7,8 @@ use ark_poly::EvaluationDomain;
 use kzgrs::common::bytes_to_polynomial_unchecked;
 use kzgrs::fk20::{fk20_batch_generate_elements_proofs, Toeplitz1Cache};
 use kzgrs::{
-    bytes_to_polynomial, commit_polynomial, encode, Commitment, Evaluations, KzgRsError,
-    Polynomial, PolynomialEvaluationDomain, Proof, BYTES_PER_FIELD_ELEMENT,
+    bytes_to_polynomial, commit_polynomial, encode, Commitment, Evaluations, GlobalParameters,
+    KzgRsError, Polynomial, PolynomialEvaluationDomain, Proof, BYTES_PER_FIELD_ELEMENT,
 };
 #[cfg(feature = "parallel")]
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
@@ -21,24 +21,27 @@ use crate::global::GLOBAL_PARAMETERS;
 pub struct DaEncoderParams {
     column_count: usize,
     toeplitz1cache: Option<Toeplitz1Cache>,
+    global_parameters: GlobalParameters,
 }
 
 impl DaEncoderParams {
     pub const MAX_BLS12_381_ENCODING_CHUNK_SIZE: usize = 31;
 
-    pub fn new(column_count: usize, with_cache: bool) -> Self {
+    pub fn new(column_count: usize, with_cache: bool, global_parameters: GlobalParameters) -> Self {
         let toeplitz1cache =
-            with_cache.then(|| Toeplitz1Cache::with_size(&GLOBAL_PARAMETERS, column_count));
+            with_cache.then(|| Toeplitz1Cache::with_size(&global_parameters, column_count));
         Self {
             column_count,
             toeplitz1cache,
+            global_parameters,
         }
     }
 
-    pub const fn default_with(column_count: usize) -> Self {
+    pub fn default_with(column_count: usize) -> Self {
         Self {
             column_count,
             toeplitz1cache: None,
+            global_parameters: GLOBAL_PARAMETERS.clone(),
         }
     }
 }
@@ -83,6 +86,7 @@ impl DaEncoder {
 
     #[allow(clippy::type_complexity)]
     fn compute_kzg_row_commitments(
+        global_parameters: &GlobalParameters,
         matrix: &ChunksMatrix,
         polynomial_evaluation_domain: PolynomialEvaluationDomain,
     ) -> Result<Vec<((Evaluations, Polynomial), Commitment)>, KzgRsError> {
@@ -104,7 +108,7 @@
                     r.as_bytes().as_ref(),
                     polynomial_evaluation_domain,
                 );
-                commit_polynomial(&poly, &GLOBAL_PARAMETERS)
+                commit_polynomial(&poly, global_parameters)
                     .map(|commitment| ((evals, poly), commitment))
             })
             .collect()
@@ -136,6 +140,7 @@ impl DaEncoder {
     }
 
     fn compute_rows_proofs(
+        global_parameters: &GlobalParameters,
         polynomials: &[Polynomial],
         toeplitz1cache: Option<&Toeplitz1Cache>,
     ) -> Result<Vec<Vec<Proof>>, KzgRsError> {
@@ -149,19 +154,25 @@ impl DaEncoder {
             polynomials.par_iter()
         }
         }
-        .map(|poly| fk20_batch_generate_elements_proofs(poly, &GLOBAL_PARAMETERS, toeplitz1cache))
+        .map(|poly| fk20_batch_generate_elements_proofs(poly, global_parameters, toeplitz1cache))
         .collect())
     }
 
     #[allow(clippy::type_complexity)]
     fn compute_kzg_column_commitments(
+        global_parameters: &GlobalParameters,
         matrix: &ChunksMatrix,
         polynomial_evaluation_domain: PolynomialEvaluationDomain,
     ) -> Result<Vec<((Evaluations, Polynomial), Commitment)>, KzgRsError> {
-        Self::compute_kzg_row_commitments(&matrix.transposed(), polynomial_evaluation_domain)
+        Self::compute_kzg_row_commitments(
+            global_parameters,
+            &matrix.transposed(),
+            polynomial_evaluation_domain,
+        )
     }
 
     fn compute_aggregated_column_commitment(
+        global_parameters: &GlobalParameters,
         matrix: &ChunksMatrix,
         commitments: &[Commitment],
         polynomial_evaluation_domain: PolynomialEvaluationDomain,
@@ -179,17 +190,18 @@
         let (evals, poly) = bytes_to_polynomial::<
             { DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE },
         >(hashes.as_ref(), polynomial_evaluation_domain)?;
-        let commitment = commit_polynomial(&poly, &GLOBAL_PARAMETERS)?;
+        let commitment = commit_polynomial(&poly, global_parameters)?;
         Ok(((evals, poly), commitment))
     }
 
     fn compute_aggregated_column_proofs(
+        global_parameters: &GlobalParameters,
         polynomial: &Polynomial,
         toeplitz1cache: Option<&Toeplitz1Cache>,
     ) -> Result<Vec<Proof>, KzgRsError> {
         Ok(fk20_batch_generate_elements_proofs(
             polynomial,
-            &GLOBAL_PARAMETERS,
+            global_parameters,
             toeplitz1cache,
         ))
     }
@@ -215,31 +227,37 @@ impl nomos_core::da::DaEncoder for DaEncoder {
     type Error = KzgRsError;
 
     fn encode(&self, data: &[u8]) -> Result<Self::EncodedData, Self::Error> {
+        let global_parameters = &self.params.global_parameters;
         let chunked_data = self.chunkify(data);
         let row_domain = PolynomialEvaluationDomain::new(self.params.column_count)
             .expect("Domain should be able to build");
         let column_domain = PolynomialEvaluationDomain::new(chunked_data.len())
             .expect("Domain should be able to build");
         let (row_polynomials, row_commitments): (Vec<_>, Vec<_>) =
-            Self::compute_kzg_row_commitments(&chunked_data, row_domain)?
+            Self::compute_kzg_row_commitments(global_parameters, &chunked_data, row_domain)?
                 .into_iter()
                 .unzip();
         let (_, row_polynomials): (Vec<_>, Vec<_>) = row_polynomials.into_iter().unzip();
         let encoded_evaluations = Self::rs_encode_rows(&row_polynomials, row_domain);
         let extended_data = Self::evals_to_chunk_matrix(&encoded_evaluations);
-        let rows_proofs =
-            Self::compute_rows_proofs(&row_polynomials, self.params.toeplitz1cache.as_ref())?;
+        let rows_proofs = Self::compute_rows_proofs(
+            global_parameters,
+            &row_polynomials,
+            self.params.toeplitz1cache.as_ref(),
+        )?;
         let (_column_polynomials, column_commitments): (Vec<_>, Vec<_>) =
-            Self::compute_kzg_column_commitments(&extended_data, column_domain)?
+            Self::compute_kzg_column_commitments(global_parameters, &extended_data, column_domain)?
                 .into_iter()
                 .unzip();
         let ((_aggregated_evals, aggregated_polynomial), aggregated_column_commitment) =
             Self::compute_aggregated_column_commitment(
+                global_parameters,
                 &extended_data,
                 &column_commitments,
                 row_domain,
             )?;
         let aggregated_column_proofs = Self::compute_aggregated_column_proofs(
+            global_parameters,
             &aggregated_polynomial,
             self.params.toeplitz1cache.as_ref(),
         )?;
@@ -269,12 +287,14 @@ pub mod test {
         BYTES_PER_FIELD_ELEMENT,
     };
     use nomos_core::da::DaEncoder as _;
+    use once_cell::sync::Lazy;
     use rand::RngCore;
     use std::ops::Div;
 
-    pub const DOMAIN_SIZE: usize = 16;
-    pub const PARAMS: DaEncoderParams = DaEncoderParams::default_with(DOMAIN_SIZE);
-    pub const ENCODER: DaEncoder = DaEncoder::new(PARAMS);
+    pub static DOMAIN_SIZE: usize = 16;
+    pub static PARAMS: Lazy<DaEncoderParams> =
+        Lazy::new(|| DaEncoderParams::default_with(DOMAIN_SIZE));
+    pub static ENCODER: Lazy<DaEncoder> = Lazy::new(|| DaEncoder::new(PARAMS.clone()));
 
     pub fn rand_data(elements_count: usize) -> Vec<u8> {
         let mut buff =
             vec![0; elements_count * DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE];
@@ -301,7 +321,8 @@ pub mod test {
         let data = rand_data(32);
         let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
         let matrix = ENCODER.chunkify(data.as_ref());
-        let commitments_data = DaEncoder::compute_kzg_row_commitments(&matrix, domain).unwrap();
+        let commitments_data =
+            DaEncoder::compute_kzg_row_commitments(&GLOBAL_PARAMETERS, &matrix, domain).unwrap();
         assert_eq!(commitments_data.len(), matrix.len());
     }
 
@@ -311,7 +332,7 @@ pub mod test {
         let matrix = ENCODER.chunkify(data.as_ref());
         let domain = PolynomialEvaluationDomain::new(DOMAIN_SIZE).unwrap();
         let (poly_data, _): (Vec<_>, Vec<_>) =
-            DaEncoder::compute_kzg_row_commitments(&matrix, domain)
+            DaEncoder::compute_kzg_row_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
                 .unwrap()
                 .into_iter()
                 .unzip();
@@ -331,7 +352,7 @@ pub mod test {
         let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
         let matrix = ENCODER.chunkify(data.as_ref());
         let (poly_data, _): (Vec<_>, Vec<_>) =
-            DaEncoder::compute_kzg_row_commitments(&matrix, domain)
+            DaEncoder::compute_kzg_row_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
                 .unwrap()
                 .into_iter()
                 .unzip();
@@ -365,14 +386,15 @@ pub mod test {
         let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
         let matrix = ENCODER.chunkify(data.as_ref());
         let (poly_data, commitments): (Vec<_>, Vec<_>) =
-            DaEncoder::compute_kzg_row_commitments(&matrix, domain)
+            DaEncoder::compute_kzg_row_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
                 .unwrap()
                 .into_iter()
                 .unzip();
         let (_evals, polynomials): (Vec<_>, Vec<_>) = poly_data.into_iter().unzip();
         let extended_evaluations = DaEncoder::rs_encode_rows(&polynomials, domain);
         let extended_matrix = DaEncoder::evals_to_chunk_matrix(&extended_evaluations);
-        let proofs = DaEncoder::compute_rows_proofs(&polynomials, None).unwrap();
+        let proofs =
+            DaEncoder::compute_rows_proofs(&GLOBAL_PARAMETERS, &polynomials, None).unwrap();
 
         let checks = izip!(matrix.iter(), &commitments, &proofs);
         for (row, commitment, proofs) in checks {
@@ -411,7 +433,8 @@ pub mod test {
         let data = rand_data(32);
         let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
         let matrix = ENCODER.chunkify(data.as_ref());
-        let commitments_data = DaEncoder::compute_kzg_column_commitments(&matrix, domain).unwrap();
+        let commitments_data =
+            DaEncoder::compute_kzg_column_commitments(&GLOBAL_PARAMETERS, &matrix, domain).unwrap();
         assert_eq!(commitments_data.len(), matrix.columns().count());
     }
 
@@ -421,12 +444,17 @@ pub mod test {
         let matrix = ENCODER.chunkify(data.as_ref());
         let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
         let (_, commitments): (Vec<_>, Vec<_>) =
-            DaEncoder::compute_kzg_column_commitments(&matrix, domain)
+            DaEncoder::compute_kzg_column_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
                 .unwrap()
                 .into_iter()
                 .unzip();
-        let _ =
-            DaEncoder::compute_aggregated_column_commitment(&matrix, &commitments, domain).unwrap();
+        let _ = DaEncoder::compute_aggregated_column_commitment(
+            &GLOBAL_PARAMETERS,
+            &matrix,
+            &commitments,
+            domain,
+        )
+        .unwrap();
     }
 
     #[test]
@@ -435,13 +463,19 @@ pub mod test {
         let matrix = ENCODER.chunkify(data.as_ref());
         let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
         let (_poly_data, commitments): (Vec<_>, Vec<_>) =
-            DaEncoder::compute_kzg_column_commitments(&matrix, domain)
+            DaEncoder::compute_kzg_column_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
                 .unwrap()
                 .into_iter()
                 .unzip();
         let ((_evals, polynomial), _aggregated_commitment) =
-            DaEncoder::compute_aggregated_column_commitment(&matrix, &commitments, domain).unwrap();
-        DaEncoder::compute_aggregated_column_proofs(&polynomial, None).unwrap();
+            DaEncoder::compute_aggregated_column_commitment(
+                &GLOBAL_PARAMETERS,
+                &matrix,
+                &commitments,
+                domain,
+            )
+            .unwrap();
+        DaEncoder::compute_aggregated_column_proofs(&GLOBAL_PARAMETERS, &polynomial, None).unwrap();
     }
 
     #[test]
diff --git a/nomos-da/kzgrs-backend/src/global.rs b/nomos-da/kzgrs-backend/src/global.rs
index f8b52590..d6a74f4b 100644
--- a/nomos-da/kzgrs-backend/src/global.rs
+++ b/nomos-da/kzgrs-backend/src/global.rs
@@ -1,7 +1,32 @@
 use kzgrs::{global_parameters_from_randomness, GlobalParameters};
 use once_cell::sync::Lazy;
 
+// Re-export loading of global parameters from a file.
+pub use kzgrs::global_parameters_from_file;
+
 pub static GLOBAL_PARAMETERS: Lazy<GlobalParameters> = Lazy::new(|| {
+    println!("WARNING: Global parameters are randomly generated. Use for development only.");
Use for development only."); let mut rng = rand::thread_rng(); global_parameters_from_randomness(&mut rng) }); + +#[cfg(test)] +mod tests { + use std::fs::File; + + use ark_serialize::{CanonicalSerialize, Write}; + use kzgrs::global_parameters_from_randomness; + + #[test] + #[ignore = "for testing purposes only"] + fn write_random_kzgrs_params_to_file() { + let mut rng = rand::thread_rng(); + let params = global_parameters_from_randomness(&mut rng); + + let mut serialized_data = Vec::new(); + params.serialize_uncompressed(&mut serialized_data).unwrap(); + + let mut file = File::create("./kzgrs_test_params").unwrap(); + file.write_all(&serialized_data).unwrap(); + } +} diff --git a/nomos-da/kzgrs-backend/src/verifier.rs b/nomos-da/kzgrs-backend/src/verifier.rs index 83112934..fe25af6d 100644 --- a/nomos-da/kzgrs-backend/src/verifier.rs +++ b/nomos-da/kzgrs-backend/src/verifier.rs @@ -1,41 +1,38 @@ // std - -use ark_poly::EvaluationDomain; +use std::collections::HashSet; // crates -use blst::min_sig::{PublicKey, SecretKey}; +use ark_poly::EvaluationDomain; +use blst::min_sig::SecretKey; use itertools::{izip, Itertools}; use kzgrs::common::field_element_from_bytes_le; use kzgrs::{ - bytes_to_polynomial, commit_polynomial, verify_element_proof, Commitment, + bytes_to_polynomial, commit_polynomial, verify_element_proof, Commitment, GlobalParameters, PolynomialEvaluationDomain, Proof, BYTES_PER_FIELD_ELEMENT, }; - -use crate::common::blob::DaBlob; +use nomos_core::da::blob::Blob; // internal +use crate::common::blob::DaBlob; use crate::common::{hash_column_and_commitment, Chunk, Column}; use crate::encoder::DaEncoderParams; -use crate::global::GLOBAL_PARAMETERS; pub struct DaVerifier { // TODO: substitute this for an abstraction to sign things over pub sk: SecretKey, - pub index: usize, + pub index: HashSet, + global_parameters: GlobalParameters, } impl DaVerifier { - pub fn new(sk: SecretKey, nodes_public_keys: &[PublicKey]) -> Self { - // TODO: `is_sorted` is experimental, and by contract `nodes_public_keys` should be shorted - // but not sure how we could enforce it here without re-sorting anyway. 
-        // assert!(nodes_public_keys.is_sorted());
-        let self_pk = sk.sk_to_pk();
-        let (index, _) = nodes_public_keys
-            .iter()
-            .find_position(|&pk| pk == &self_pk)
-            .expect("Self pk should be registered");
-        Self { sk, index }
+    pub fn new(sk: SecretKey, index: HashSet<u32>, global_parameters: GlobalParameters) -> Self {
+        Self {
+            sk,
+            index,
+            global_parameters,
+        }
     }
 
     fn verify_column(
+        global_parameters: &GlobalParameters,
         column: &Column,
         column_commitment: &Commitment,
         aggregated_column_commitment: &Commitment,
@@ -52,7 +49,7 @@ impl DaVerifier {
         ) else {
             return false;
         };
-        let Ok(computed_column_commitment) = commit_polynomial(&polynomial, &GLOBAL_PARAMETERS)
+        let Ok(computed_column_commitment) = commit_polynomial(&polynomial, global_parameters)
         else {
             return false;
         };
@@ -72,11 +69,12 @@ impl DaVerifier {
             aggregated_column_commitment,
             aggregated_column_proof,
             rows_domain,
-            &GLOBAL_PARAMETERS,
+            global_parameters,
         )
     }
 
     fn verify_chunk(
+        global_parameters: &GlobalParameters,
         chunk: &Chunk,
         commitment: &Commitment,
         proof: &Proof,
@@ -90,11 +88,12 @@ impl DaVerifier {
             commitment,
             proof,
             domain,
-            &GLOBAL_PARAMETERS,
+            global_parameters,
         )
     }
 
     fn verify_chunks(
+        global_parameters: &GlobalParameters,
         chunks: &[Chunk],
         commitments: &[Commitment],
         proofs: &[Proof],
@@ -108,7 +107,8 @@ impl DaVerifier {
             return false;
         }
         for (chunk, commitment, proof) in izip!(chunks, commitments, proofs) {
-            if !DaVerifier::verify_chunk(chunk, commitment, proof, index, domain) {
+            if !DaVerifier::verify_chunk(global_parameters, chunk, commitment, proof, index, domain)
+            {
                 return false;
             }
         }
@@ -118,12 +118,16 @@ impl DaVerifier {
     pub fn verify(&self, blob: &DaBlob, rows_domain_size: usize) -> bool {
         let rows_domain = PolynomialEvaluationDomain::new(rows_domain_size)
             .expect("Domain should be able to build");
+        let blob_col_idx = &u16::from_be_bytes(blob.column_idx()).into();
+        let index = self.index.get(blob_col_idx).unwrap();
+
         let is_column_verified = DaVerifier::verify_column(
+            &self.global_parameters,
             &blob.column,
             &blob.column_commitment,
             &blob.aggregated_column_commitment,
             &blob.aggregated_column_proof,
-            self.index,
+            *index as usize,
             rows_domain,
         );
         if !is_column_verified {
@@ -131,10 +135,11 @@ impl DaVerifier {
         }
 
         let are_chunks_verified = DaVerifier::verify_chunks(
+            &self.global_parameters,
             blob.column.as_ref(),
             &blob.rows_commitments,
             &blob.rows_proofs,
-            self.index,
+            *index as usize,
             rows_domain,
         );
         if !are_chunks_verified {
@@ -160,7 +165,7 @@ mod test {
         global_parameters_from_randomness, Commitment, GlobalParameters,
         PolynomialEvaluationDomain, Proof, BYTES_PER_FIELD_ELEMENT,
     };
-    use nomos_core::da::DaEncoder as _;
+    use nomos_core::da::DaEncoder;
     use once_cell::sync::Lazy;
     use rand::{thread_rng, RngCore};
 
@@ -226,6 +231,7 @@ mod test {
         let column_data = prepare_column(false).unwrap();
 
         assert!(DaVerifier::verify_column(
+            &GLOBAL_PARAMETERS,
            &column_data.column,
             &column_data.column_commitment,
             &column_data.aggregated_commitment,
@@ -257,6 +263,7 @@ mod test {
         .is_err());
 
         assert!(!DaVerifier::verify_column(
+            &GLOBAL_PARAMETERS,
             &column2,
             &column_data.column_commitment,
             &column_data.aggregated_commitment,
@@ -269,6 +276,7 @@ mod test {
         let column_data2 = prepare_column(true).unwrap();
 
         assert!(!DaVerifier::verify_column(
+            &GLOBAL_PARAMETERS,
             &column_data2.column,
             &column_data2.column_commitment,
             &column_data2.aggregated_commitment,
@@ -303,6 +311,7 @@ mod test {
         };
         // Happy case
         let chunks_verified = DaVerifier::verify_chunks(
+            &GLOBAL_PARAMETERS,
             da_blob.column.as_ref(),
             &da_blob.rows_commitments,
             &da_blob.rows_proofs,
@@ -316,6 +325,7 @@ mod test {
         column_w_missing_chunk.pop();
         let chunks_not_verified = !DaVerifier::verify_chunks(
+            &GLOBAL_PARAMETERS,
             column_w_missing_chunk.as_ref(),
             &da_blob.rows_commitments,
             &da_blob.rows_proofs,
@@ -329,6 +339,7 @@ mod test {
         modified_proofs.swap(0, 1);
         let chunks_not_verified = !DaVerifier::verify_chunks(
+            &GLOBAL_PARAMETERS,
             da_blob.column.as_ref(),
             &da_blob.rows_commitments,
             &modified_proofs,
@@ -342,6 +353,7 @@ mod test {
         modified_commitments.swap(0, 1);
         let chunks_not_verified = !DaVerifier::verify_chunks(
+            &GLOBAL_PARAMETERS,
             da_blob.column.as_ref(),
             &modified_commitments,
             &da_blob.rows_proofs,
@@ -367,7 +379,9 @@ mod test {
         let verifiers: Vec<DaVerifier> = sks
             .into_iter()
             .enumerate()
-            .map(|(index, sk)| DaVerifier { sk, index })
+            .map(|(index, sk)| {
+                DaVerifier::new(sk, [index as u32].into(), GLOBAL_PARAMETERS.clone())
+            })
             .collect();
         let encoded_data = encoder.encode(&data).unwrap();
         for (i, column) in encoded_data.extended_data.columns().enumerate() {
diff --git a/nomos-da/kzgrs/src/global_parameters.rs b/nomos-da/kzgrs/src/global_parameters.rs
index 417e4120..91e77ab4 100644
--- a/nomos-da/kzgrs/src/global_parameters.rs
+++ b/nomos-da/kzgrs/src/global_parameters.rs
@@ -1,9 +1,24 @@
-use super::GlobalParameters;
+// std
+use std::{error::Error, fs::File};
+// crates
 use ark_bls12_381::{fr::Fr, Bls12_381};
 use ark_poly::polynomial::univariate::DensePolynomial;
-use ark_poly_commit::kzg10::KZG10;
+use ark_poly_commit::kzg10::{UniversalParams, KZG10};
+use ark_serialize::{CanonicalDeserialize, Read};
 use rand::Rng;
+// internal
+use super::GlobalParameters;
 
 pub fn global_parameters_from_randomness<R: Rng>(rng: &mut R) -> GlobalParameters {
     KZG10::<Bls12_381, DensePolynomial<Fr>>::setup(8192, true, rng).unwrap()
 }
+
+pub fn global_parameters_from_file(file_path: &str) -> Result<GlobalParameters, Box<dyn Error>> {
+    let mut file = File::open(file_path)?;
+    let mut serialized_data = Vec::new();
+    file.read_to_end(&mut serialized_data)?;
+
+    let params =
+        UniversalParams::<Bls12_381>::deserialize_uncompressed_unchecked(&*serialized_data)?;
+    Ok(params)
+}
diff --git a/nomos-da/kzgrs/src/lib.rs b/nomos-da/kzgrs/src/lib.rs
index 97b0c486..52136344 100644
--- a/nomos-da/kzgrs/src/lib.rs
+++ b/nomos-da/kzgrs/src/lib.rs
@@ -12,7 +12,7 @@ use ark_poly_commit::sonic_pc::UniversalParams;
 use std::mem;
 
 pub use common::{bytes_to_evaluations, bytes_to_polynomial, KzgRsError};
-pub use global_parameters::global_parameters_from_randomness;
+pub use global_parameters::{global_parameters_from_file, global_parameters_from_randomness};
 pub use kzg::{commit_polynomial, generate_element_proof, verify_element_proof};
 pub use rs::{decode, encode};
diff --git a/nomos-da/network/core/src/protocols/replication/behaviour.rs b/nomos-da/network/core/src/protocols/replication/behaviour.rs
index a9676e23..b00e96d8 100644
--- a/nomos-da/network/core/src/protocols/replication/behaviour.rs
+++ b/nomos-da/network/core/src/protocols/replication/behaviour.rs
@@ -7,8 +7,8 @@ use either::Either;
 use indexmap::IndexSet;
 use libp2p::core::Endpoint;
 use libp2p::swarm::{
-    ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, THandler,
-    THandlerInEvent, THandlerOutEvent, ToSwarm,
+    ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler,
+    THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
 };
 use libp2p::{Multiaddr, PeerId};
 use log::{error, trace};
@@ -43,7 +43,10 @@ pub struct ReplicationBehaviour<Membership> {
     /// nomos DA subnetworks
     membership: Membership,
     /// Relation of connected peers of replication subnetworks
-    connected: HashMap<PeerId, ConnectionId>,
+    ///
+    /// **TODO**: A node needs only one connection per peer for Nomos DA network communications.
+    /// Allowing multiple connections from the same peer id is only temporary and will be removed!
+    connected: HashMap<PeerId, HashSet<ConnectionId>>,
     /// Outgoing event queue
     outgoing_events: VecDeque<SwarmEvent>,
     /// Seen messages cache holds a record of seen messages, messages will be removed from this
@@ -114,14 +117,16 @@ where
             .filter(|(peer_id, _connection_id)| peers.contains(peer_id))
             .collect();
 
-        for (peer_id, connection_id) in connected_peers {
-            self.outgoing_events.push_back(SwarmEvent::NotifyHandler {
-                peer_id: *peer_id,
-                handler: NotifyHandler::One(*connection_id),
-                event: Either::Left(BehaviourEventToHandler::OutgoingMessage {
-                    message: message.clone(),
-                }),
-            })
+        for (peer_id, connection_ids) in connected_peers {
+            for connection_id in connection_ids.iter() {
+                self.outgoing_events.push_back(SwarmEvent::NotifyHandler {
+                    peer_id: *peer_id,
+                    handler: NotifyHandler::One(*connection_id),
+                    event: Either::Left(BehaviourEventToHandler::OutgoingMessage {
+                        message: message.clone(),
+                    }),
+                })
+            }
         }
         self.try_wake();
     }
@@ -152,7 +157,10 @@ where
             return Ok(Either::Right(libp2p::swarm::dummy::ConnectionHandler));
         }
         trace!("{}, Connected to {peer_id}", self.local_peer_id);
-        self.connected.insert(peer_id, connection_id);
+        self.connected
+            .entry(peer_id)
+            .or_default()
+            .insert(connection_id);
         Ok(Either::Left(ReplicationHandler::new()))
     }
 
@@ -164,11 +172,28 @@ where
         _role_override: Endpoint,
     ) -> Result<THandler<Self>, ConnectionDenied> {
         trace!("{}, Connected to {peer_id}", self.local_peer_id);
-        self.connected.insert(peer_id, connection_id);
+        self.connected
+            .entry(peer_id)
+            .or_default()
+            .insert(connection_id);
         Ok(Either::Left(ReplicationHandler::new()))
     }
 
-    fn on_swarm_event(&mut self, _event: FromSwarm) {}
+    fn on_swarm_event(&mut self, event: FromSwarm) {
+        if let FromSwarm::ConnectionClosed(ConnectionClosed {
+            peer_id,
+            connection_id,
+            ..
+        }) = event
+        {
+            if let Some(connections) = self.connected.get_mut(&peer_id) {
+                connections.remove(&connection_id);
+                if connections.is_empty() {
+                    self.connected.remove(&peer_id);
+                }
+            }
+        }
+    }
 
     fn on_connection_handler_event(
         &mut self,
diff --git a/nomos-da/network/core/src/swarm/validator.rs b/nomos-da/network/core/src/swarm/validator.rs
index 5d2bc37d..bfbec68c 100644
--- a/nomos-da/network/core/src/swarm/validator.rs
+++ b/nomos-da/network/core/src/swarm/validator.rs
@@ -4,8 +4,8 @@ use std::time::Duration;
 use futures::StreamExt;
 use kzgrs_backend::common::blob::DaBlob;
 use libp2p::identity::Keypair;
-use libp2p::swarm::SwarmEvent;
-use libp2p::{PeerId, Swarm, SwarmBuilder};
+use libp2p::swarm::{DialError, SwarmEvent};
+use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder};
 use log::{debug, error};
 use nomos_da_messages::replication::ReplicationReq;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
@@ -75,6 +75,15 @@ where
             .build()
     }
 
+    pub fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> {
+        self.swarm.dial(addr)?;
+        Ok(())
+    }
+
+    pub fn local_peer_id(&self) -> &PeerId {
+        self.swarm.local_peer_id()
+    }
+
     pub fn protocol_swarm(&self) -> &Swarm<ValidatorBehaviour<Membership>> {
         &self.swarm
     }
@@ -116,7 +125,21 @@ where
         }
     }
 
-    async fn handle_replication_event(&mut self, _event: ReplicationEvent) {}
+    async fn handle_replication_event(&mut self, event: ReplicationEvent) {
+        let ReplicationEvent::IncomingMessage { message, .. } = event;
+        if let Ok(blob) = bincode::deserialize::<DaBlob>(
+            message
+                .blob
+                .as_ref()
+                .expect("Message blob should not be empty")
+                .data
+                .as_slice(),
+        ) {
+            if let Err(e) = self.validation_events_sender.send(blob) {
+                error!("Error sending blob to validation: {e:?}");
+            }
+        }
+    }
 
     async fn handle_behaviour_event(&mut self, event: ValidatorBehaviourEvent) {
         match event {
diff --git a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs
index 03f3f39e..8a02dac0 100644
--- a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs
+++ b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs
@@ -16,6 +16,7 @@ use nomos_libp2p::secret_key_serde;
 use overwatch_rs::overwatch::handle::OverwatchHandle;
 use overwatch_rs::services::state::NoState;
 use serde::{Deserialize, Serialize};
+use std::collections::HashSet;
 use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::pin::Pin;
@@ -131,8 +132,8 @@ where
             libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
         let (mut validator_swarm, events_streams) = ValidatorSwarm::new(
             keypair,
-            config.membership,
-            config.addresses.into_iter().collect(),
+            config.membership.clone(),
+            config.addresses.clone().into_iter().collect(),
         );
         let sampling_request_channel = validator_swarm
             .protocol_swarm()
@@ -147,6 +148,33 @@ where
             .unwrap_or_else(|e| {
                 panic!("Error listening on DA network with address {address}: {e}")
             });
+
+        // Dial peers in the same subnetworks (a node might participate in multiple).
+        let local_peer_id = *validator_swarm.local_peer_id();
+        let mut connected_peers = HashSet::new();
+
+        config
+            .membership
+            .membership(&local_peer_id)
+            .iter()
+            .flat_map(|subnet| config.membership.members_of(subnet))
+            .filter(|peer| peer != &local_peer_id)
+            .filter_map(|peer| {
+                config
+                    .addresses
+                    .iter()
+                    .find(|(p, _)| p == &peer)
+                    .map(|(_, addr)| (peer, addr.clone()))
+            })
+            .for_each(|(peer, addr)| {
+                // Only dial if we haven't already connected to this peer.
+                if connected_peers.insert(peer) {
+                    validator_swarm
+                        .dial(addr)
+                        .expect("Node should be able to dial peer in a subnet");
+                }
+            });
+
         let task = overwatch_handle.runtime().spawn(validator_swarm.run());
         let (sampling_broadcast_sender, sampling_broadcast_receiver) =
             broadcast::channel(BROADCAST_CHANNEL_SIZE);
diff --git a/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs b/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs
index 8322fc80..8150a102 100644
--- a/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs
+++ b/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs
@@ -55,10 +55,13 @@ where
         blob_id: <Self::Blob as Blob>::BlobId,
         column_idx: ColumnIndex,
     ) -> Result<Option<Self::Blob>, DynError> {
+        let column_idx = column_idx.to_be_bytes();
+        let blob_idx = create_blob_idx(blob_id.as_ref(), &column_idx);
+
         let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
         self.storage_relay
             .send(StorageMsg::Load {
-                key: key_bytes(DA_VERIFIED_KEY_PREFIX, &blob_id),
+                key: key_bytes(DA_VERIFIED_KEY_PREFIX, blob_idx),
                 reply_channel: reply_tx,
             })
             .await
@@ -68,7 +71,7 @@ where
         let blob_bytes = load_blob(
             self.settings.blob_storage_directory.clone(),
             blob_id.as_ref(),
-            &column_idx.to_be_bytes(),
+            &column_idx,
         )
         .await?;
         Ok(S::deserialize(blob_bytes)
@@ -84,3 +87,13 @@ where
 pub struct RocksAdapterSettings {
     pub blob_storage_directory: PathBuf,
 }
+
+// Combines a 32-byte blob ID (`[u8; 32]`) with a 2-byte column index
+// (`u16` represented as `[u8; 2]`).
+fn create_blob_idx(blob_id: &[u8], column_idx: &[u8]) -> [u8; 34] {
+    let mut blob_idx = [0u8; 34];
+    blob_idx[..blob_id.len()].copy_from_slice(blob_id);
+    blob_idx[blob_id.len()..].copy_from_slice(column_idx);
+
+    blob_idx
+}
diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml
index 7a0dc492..8dd3108e 100644
--- a/nomos-services/data-availability/tests/Cargo.toml
+++ b/nomos-services/data-availability/tests/Cargo.toml
@@ -26,6 +26,7 @@ nomos-log = { path = "../../log" }
 nomos-network = { path = "../../network", features = ["mock"] }
 nomos-libp2p = { path = "../../../nomos-libp2p" }
 libp2p = { version = "0.53.2", features = ["ed25519"] }
+once_cell = "1.19"
 overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
 overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
 rand = "0.8"
diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs
index 526fd245..2ae86d0e 100644
--- a/nomos-services/data-availability/tests/src/common.rs
+++ b/nomos-services/data-availability/tests/src/common.rs
@@ -56,6 +56,7 @@ use nomos_network::NetworkService;
 use nomos_node::{Tx, Wire};
 use nomos_storage::backends::rocksdb::RocksBackend;
 use nomos_storage::StorageService;
+use once_cell::sync::Lazy;
 use overwatch_derive::*;
 use overwatch_rs::overwatch::{Overwatch, OverwatchRunner};
 use overwatch_rs::services::handle::ServiceHandle;
@@ -69,12 +70,18 @@ type IntegrationRng = TestRng;
 /// Membership used by the DA Network service.
 pub type NomosDaMembership = FillFromNodeList;
 
-pub const PARAMS: DaEncoderParams = DaEncoderParams::default_with(2);
-pub const ENCODER: DaEncoder = DaEncoder::new(PARAMS);
+pub const GLOBAL_PARAMS_PATH: &str = "../../../tests/kzgrs/kzgrs_test_params";
 
 pub const SK1: &str = "aca2c52f5928a53de79679daf390b0903eeccd9671b4350d49948d84334874806afe68536da9e076205a2af0af350e6c50851a040e3057b6544a29f5689ccd31";
 pub const SK2: &str = "f9dc26eea8bc56d9a4c59841b438665b998ce5e42f49f832df5b770a725c2daafee53b33539127321f6f5085e42902bd380e82d18a7aff6404e632b842106785";
 
+pub static PARAMS: Lazy<DaEncoderParams> = Lazy::new(|| {
+    let global_parameters =
+        kzgrs_backend::global::global_parameters_from_file(GLOBAL_PARAMS_PATH).unwrap();
+    DaEncoderParams::new(2, false, global_parameters)
+});
+pub static ENCODER: Lazy<DaEncoder> = Lazy::new(|| DaEncoder::new(PARAMS.clone()));
+
 pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
     cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
     MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
@@ -246,8 +253,8 @@ pub fn new_node(
             num_samples: da_network_settings.num_samples,
             num_subnets: da_network_settings.num_subnets,
             // Sampling service period can't be zero.
-            old_blobs_check_interval: Duration::from_secs(1),
-            blobs_validity_duration: Duration::from_secs(15),
+            old_blobs_check_interval: Duration::from_secs(5),
+            blobs_validity_duration: Duration::from_secs(u64::MAX),
         },
         network_adapter_settings: (),
         storage_adapter_settings: SamplingStorageSettings {
diff --git a/nomos-services/data-availability/tests/src/indexer_integration.rs b/nomos-services/data-availability/tests/src/indexer_integration.rs
index daac5447..ad1dffe1 100644
--- a/nomos-services/data-availability/tests/src/indexer_integration.rs
+++ b/nomos-services/data-availability/tests/src/indexer_integration.rs
@@ -93,8 +93,8 @@ fn test_indexer() {
 
     let blobs_dir = TempDir::new().unwrap().path().to_path_buf();
 
-    let (node1_sk, node1_pk) = generate_blst_hex_keys();
-    let (node2_sk, node2_pk) = generate_blst_hex_keys();
+    let (node1_sk, _) = generate_blst_hex_keys();
+    let (node2_sk, _) = generate_blst_hex_keys();
 
     let (peer_sk_1, peer_id_1) = create_ed25519_sk_peerid(SK1);
     let (peer_sk_2, peer_id_2) = create_ed25519_sk_peerid(SK2);
@@ -119,7 +119,8 @@ fn test_indexer() {
         vec![node_address(&swarm_config2)],
         KzgrsDaVerifierSettings {
             sk: node1_sk.clone(),
-            nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()],
+            index: [0].into(),
+            global_params_path: GLOBAL_PARAMS_PATH.into(),
         },
         TestDaNetworkSettings {
             peer_addresses: peer_addresses.clone(),
@@ -142,7 +143,8 @@ fn test_indexer() {
         vec![node_address(&swarm_config1)],
         KzgrsDaVerifierSettings {
             sk: node2_sk.clone(),
-            nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()],
+            index: [1].into(),
+            global_params_path: GLOBAL_PARAMS_PATH.into(),
         },
         TestDaNetworkSettings {
             peer_addresses,
@@ -214,6 +216,17 @@ fn test_indexer() {
     let blob_hash = <DaBlob as Blob>::id(&blobs[0]);
     let blob_info = BlobInfo::new(blob_hash, meta);
 
+    let mut node_1_blob_0_idx = Vec::new();
+    node_1_blob_0_idx.extend_from_slice(&blob_hash);
+    node_1_blob_0_idx.extend_from_slice(&0u16.to_be_bytes());
+
+    let mut node_1_blob_1_idx = Vec::new();
+    node_1_blob_1_idx.extend_from_slice(&blob_hash);
+    node_1_blob_1_idx.extend_from_slice(&1u16.to_be_bytes());
+
+    let node_2_blob_0_idx = node_1_blob_0_idx.clone();
+    let node_2_blob_1_idx = node_1_blob_1_idx.clone();
+
     // Test get Metadata for Certificate
     let (app_id2, index2) = blob_info.metadata();
 
@@ -226,12 +239,17 @@ fn test_indexer() {
     node2.spawn(async move {
         let storage_outbound = node2_storage.connect().await.unwrap();
 
-        // Mock attested blob by writting directly into the da storage.
-        let attested_key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_hash);
-
+        // Mock both attested blobs by writing directly into the da storage.
         storage_outbound
             .send(nomos_storage::StorageMsg::Store {
-                key: attested_key.into(),
+                key: key_bytes(DA_VERIFIED_KEY_PREFIX, node_2_blob_0_idx).into(),
+                value: Bytes::new(),
+            })
+            .await
+            .unwrap();
+        storage_outbound
+            .send(nomos_storage::StorageMsg::Store {
+                key: key_bytes(DA_VERIFIED_KEY_PREFIX, node_2_blob_1_idx).into(),
                 value: Bytes::new(),
             })
             .await
             .unwrap();
@@ -256,12 +274,17 @@ fn test_indexer() {
             Err(_) => None,
         });
 
-        // Mock attested blob by writting directly into the da storage.
-        let attested_key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_hash);
-
+        // Mock both attested blobs by writing directly into the da storage.
         storage_outbound
             .send(nomos_storage::StorageMsg::Store {
-                key: attested_key.into(),
+                key: key_bytes(DA_VERIFIED_KEY_PREFIX, node_1_blob_0_idx).into(),
+                value: Bytes::new(),
+            })
+            .await
+            .unwrap();
+        storage_outbound
+            .send(nomos_storage::StorageMsg::Store {
+                key: key_bytes(DA_VERIFIED_KEY_PREFIX, node_1_blob_1_idx).into(),
                 value: Bytes::new(),
             })
             .await
diff --git a/nomos-services/data-availability/tests/src/verifier_integration.rs b/nomos-services/data-availability/tests/src/verifier_integration.rs
index 668748ab..76feefdf 100644
--- a/nomos-services/data-availability/tests/src/verifier_integration.rs
+++ b/nomos-services/data-availability/tests/src/verifier_integration.rs
@@ -74,8 +74,8 @@ fn test_verifier() {
 
     let blobs_dir = TempDir::new().unwrap().path().to_path_buf();
 
-    let (node1_sk, node1_pk) = generate_blst_hex_keys();
-    let (node2_sk, node2_pk) = generate_blst_hex_keys();
+    let (node1_sk, _) = generate_blst_hex_keys();
+    let (node2_sk, _) = generate_blst_hex_keys();
 
     let client_zone = new_client(NamedTempFile::new().unwrap().path().to_path_buf());
 
@@ -102,7 +102,8 @@ fn test_verifier() {
         vec![node_address(&swarm_config2)],
         KzgrsDaVerifierSettings {
             sk: node1_sk.clone(),
-            nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()],
+            index: [0].into(),
+            global_params_path: GLOBAL_PARAMS_PATH.into(),
         },
         TestDaNetworkSettings {
             peer_addresses: peer_addresses.clone(),
@@ -125,7 +126,8 @@ fn test_verifier() {
         vec![node_address(&swarm_config1)],
         KzgrsDaVerifierSettings {
             sk: node2_sk,
-            nodes_public_keys: vec![node1_pk, node2_pk],
+            index: [1].into(),
+            global_params_path: GLOBAL_PARAMS_PATH.into(),
         },
         TestDaNetworkSettings {
             peer_addresses,
@@ -155,7 +157,10 @@ fn test_verifier() {
 
     // Encode data
     let encoder = &ENCODER;
-    let data = rand_data(10);
+    let data = vec![
+        49u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
+        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
+    ];
 
     let encoded_data = encoder.encode(&data).unwrap();
     let columns: Vec<_> = encoded_data.extended_data.columns().collect();
diff --git a/nomos-services/data-availability/verifier/src/backend/kzgrs.rs b/nomos-services/data-availability/verifier/src/backend/kzgrs.rs
index 6e29fcef..59d5e9b0 100644
--- a/nomos-services/data-availability/verifier/src/backend/kzgrs.rs
+++ b/nomos-services/data-availability/verifier/src/backend/kzgrs.rs
@@ -1,8 +1,12 @@
 // std
 use core::fmt;
+use std::collections::HashSet;
 // crates
-use blst::{min_sig::PublicKey, min_sig::SecretKey};
-use kzgrs_backend::{common::blob::DaBlob, verifier::DaVerifier as NomosKzgrsVerifier};
+use blst::min_sig::SecretKey;
+use kzgrs_backend::{
+    common::blob::DaBlob, global::global_parameters_from_file,
+    verifier::DaVerifier as NomosKzgrsVerifier,
+};
 use nomos_core::da::DaVerifier;
 use serde::{Deserialize, Serialize};
 // internal
@@ -34,19 +38,10 @@ impl VerifierBackend for KzgrsDaVerifier {
         let bytes = hex::decode(settings.sk).expect("Secret key string should decode to bytes");
         let secret_key =
             SecretKey::from_bytes(&bytes).expect("Secret key should be reconstructed from bytes");
+        let global_params = global_parameters_from_file(&settings.global_params_path)
+            .expect("Global parameters should be loaded from file");
 
-        let nodes_public_keys = settings
-            .nodes_public_keys
-            .iter()
-            .map(|pk_hex| {
-                let pk_bytes =
-                    hex::decode(pk_hex).expect("Public key string should decode to bytes");
-                PublicKey::from_bytes(&pk_bytes)
-                    .expect("Public key should be reconstructed from bytes")
-            })
-            .collect::<Vec<PublicKey>>();
-
-        let verifier = NomosKzgrsVerifier::new(secret_key, &nodes_public_keys);
+        let verifier = NomosKzgrsVerifier::new(secret_key, settings.index, global_params);
         Self { verifier }
     }
 }
@@ -71,5 +66,6 @@ impl DaVerifier for KzgrsDaVerifier {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct KzgrsDaVerifierSettings {
     pub sk: String,
-    pub nodes_public_keys: Vec<String>,
+    pub index: HashSet<u32>,
+    pub global_params_path: String,
 }
diff --git a/nomos-services/data-availability/verifier/src/lib.rs b/nomos-services/data-availability/verifier/src/lib.rs
index b7b688f6..ff8645e4 100644
--- a/nomos-services/data-availability/verifier/src/lib.rs
+++ b/nomos-services/data-availability/verifier/src/lib.rs
@@ -2,27 +2,27 @@ pub mod backend;
 pub mod network;
 pub mod storage;
 
-use nomos_core::da::blob::Blob;
 // std
-use nomos_storage::StorageService;
-use overwatch_rs::services::life_cycle::LifecycleMessage;
-use serde::{Deserialize, Serialize};
 use std::error::Error;
 use std::fmt::{Debug, Formatter};
-use storage::DaStorageAdapter;
-use tokio::sync::oneshot::Sender;
 // crates
+use nomos_core::da::blob::Blob;
+use nomos_da_network_service::NetworkService;
+use nomos_storage::StorageService;
+use overwatch_rs::services::handle::ServiceStateHandle;
+use overwatch_rs::services::life_cycle::LifecycleMessage;
+use overwatch_rs::services::relay::{Relay, RelayMessage};
+use overwatch_rs::services::state::{NoOperator, NoState};
+use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
+use overwatch_rs::DynError;
+use serde::{Deserialize, Serialize};
+use tokio::sync::oneshot::Sender;
 use tokio_stream::StreamExt;
 use tracing::{error, span, Instrument, Level};
 // internal
 use backend::VerifierBackend;
 use network::NetworkAdapter;
-use nomos_da_network_service::NetworkService;
-use overwatch_rs::services::handle::ServiceStateHandle;
-use overwatch_rs::services::relay::{Relay, RelayMessage};
-use overwatch_rs::services::state::{NoOperator, NoState};
-use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
-use overwatch_rs::DynError;
+use storage::DaStorageAdapter;
 
 const DA_VERIFIER_TAG: ServiceId = "DA-Verifier";
 
 pub enum DaVerifierMsg<B, A> {
@@ -75,7 +75,11 @@ where
         storage_adapter: &S,
         blob: &Backend::DaBlob,
     ) -> Result<(), DynError> {
-        if storage_adapter.get_blob(blob.id()).await?.is_some() {
+        if storage_adapter
+            .get_blob(blob.id(), blob.column_idx())
+            .await?
+            .is_some()
+        {
             Ok(())
         } else {
             verifier.verify(blob)?;
diff --git a/nomos-services/data-availability/verifier/src/storage/adapters/rocksdb.rs b/nomos-services/data-availability/verifier/src/storage/adapters/rocksdb.rs
index b9cbd77a..acda9147 100644
--- a/nomos-services/data-availability/verifier/src/storage/adapters/rocksdb.rs
+++ b/nomos-services/data-availability/verifier/src/storage/adapters/rocksdb.rs
@@ -83,8 +83,10 @@ where
 
     async fn get_blob(
         &self,
-        blob_idx: <Self::Blob as Blob>::BlobId,
+        blob_id: <Self::Blob as Blob>::BlobId,
+        column_idx: <Self::Blob as Blob>::ColumnIndex,
     ) -> Result<Option<Self::Blob>, DynError> {
+        let blob_idx = create_blob_idx(blob_id.as_ref(), column_idx.as_ref());
         let key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_idx);
         let (reply_channel, reply_rx) = tokio::sync::oneshot::channel();
         self.storage_relay
diff --git a/nomos-services/data-availability/verifier/src/storage/mod.rs b/nomos-services/data-availability/verifier/src/storage/mod.rs
index f1cc6614..c577205c 100644
--- a/nomos-services/data-availability/verifier/src/storage/mod.rs
+++ b/nomos-services/data-availability/verifier/src/storage/mod.rs
@@ -28,5 +28,6 @@ pub trait DaStorageAdapter {
     async fn get_blob(
         &self,
         blob_id: <Self::Blob as Blob>::BlobId,
+        column_idx: <Self::Blob as Blob>::ColumnIndex,
     ) -> Result<Option<Self::Blob>, DynError>;
 }
diff --git a/nomos-services/storage/src/backends/rocksdb.rs b/nomos-services/storage/src/backends/rocksdb.rs
index 76f3ddc9..f5b12688 100644
--- a/nomos-services/storage/src/backends/rocksdb.rs
+++ b/nomos-services/storage/src/backends/rocksdb.rs
@@ -6,11 +6,12 @@ use async_trait::async_trait;
 use bytes::Bytes;
 pub use rocksdb::Error;
 use rocksdb::{Options, DB};
+use serde::{Deserialize, Serialize};
 // internal
 use super::{StorageBackend, StorageSerde, StorageTransaction};
 
 /// Rocks backend setting
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct RocksBackendSettings {
     /// File path to the db file
     pub db_path: PathBuf,
diff --git a/tests/Cargo.toml b/tests/Cargo.toml
index 02794ebd..b82f4f97 100644
--- a/tests/Cargo.toml
+++ b/tests/Cargo.toml
@@ -22,6 +22,7 @@ nomos-da-network-service = { path = "../nomos-services/data-availability/network
 nomos-da-indexer = { path = "../nomos-services/data-availability/indexer" }
 nomos-da-verifier = { path = "../nomos-services/data-availability/verifier" }
 nomos-da-sampling = { path = "../nomos-services/data-availability/sampling" }
+nomos-storage = { path = "../nomos-services/storage" }
 subnetworks-assignations = { path = "../nomos-da/network/subnetworks-assignations" }
 full-replication = { path = "../nomos-da/full-replication" }
 hex = "0.4.3"
diff --git a/tests/kzgrs/kzgrs_test_params b/tests/kzgrs/kzgrs_test_params
new file mode 100644
index 00000000..ce727bcc
Binary files /dev/null and b/tests/kzgrs/kzgrs_test_params differ
diff --git a/tests/src/lib.rs b/tests/src/lib.rs
index d1e88004..508bdaae 100644
--- a/tests/src/lib.rs
+++ b/tests/src/lib.rs
@@ -17,6 +17,16 @@ use rand::{thread_rng, Rng};
 static NET_PORT: Lazy<Mutex<u16>> = Lazy::new(|| Mutex::new(thread_rng().gen_range(8000..10000)));
 static IS_SLOW_TEST_ENV: Lazy<bool> =
     Lazy::new(|| env::var("SLOW_TEST_ENV").is_ok_and(|s| s == "true"));
+pub static GLOBAL_PARAMS_PATH: Lazy<String> = Lazy::new(|| {
+    let relative_path = "./kzgrs/kzgrs_test_params";
+    let current_dir = env::current_dir().expect("Failed to get current directory");
+    current_dir
+        .join(relative_path)
+        .canonicalize()
+        .expect("Failed to resolve absolute path")
+        .to_string_lossy()
+        .to_string()
+});
 
 pub fn get_available_port() -> u16 {
     let mut port = NET_PORT.lock().unwrap();
@@ -49,8 +59,12 @@ pub trait Node: Sized {
     }
     fn node_configs(config: SpawnConfig) -> Vec<Self::Config> {
         match config {
-            SpawnConfig::Star { consensus, da } => {
-                let mut configs = Self::create_node_configs(consensus, da);
+            SpawnConfig::Star {
+                consensus,
+                da,
+                test,
+            } => {
+                let mut configs = Self::create_node_configs(consensus, da, test);
                 let next_leader_config = configs.remove(0);
                 let first_node_addr = node_address(&next_leader_config);
                 let mut node_configs = vec![next_leader_config];
@@ -64,8 +78,12 @@ pub trait Node: Sized {
                 node_configs
             }
-            SpawnConfig::Chain { consensus, da } => {
-                let mut configs = Self::create_node_configs(consensus, da);
+            SpawnConfig::Chain {
+                consensus,
+                da,
+                test,
+            } => {
+                let mut configs = Self::create_node_configs(consensus, da, test);
                 let next_leader_config = configs.remove(0);
                 let mut prev_node_addr = node_address(&next_leader_config);
                 let mut node_configs = vec![next_leader_config];
@@ -79,7 +97,11 @@ pub trait Node: Sized {
             }
         }
     }
-    fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Self::Config>;
+    fn create_node_configs(
+        consensus: ConsensusConfig,
+        da: DaConfig,
+        test: TestConfig,
+    ) -> Vec<Self::Config>;
     async fn consensus_info(&self) -> Self::ConsensusInfo;
     fn stop(&mut self);
 }
@@ -90,17 +112,19 @@ pub enum SpawnConfig {
     Star {
         consensus: ConsensusConfig,
         da: DaConfig,
+        test: TestConfig,
     },
     // Chain topology: Every node is chained to the node next to it.
     Chain {
         consensus: ConsensusConfig,
         da: DaConfig,
+        test: TestConfig,
     },
 }
 
 impl SpawnConfig {
     // Returns a SpawnConfig::Chain with proper configurations for happy-path tests
-    pub fn chain_happy(n_participants: usize, da: DaConfig) -> Self {
+    pub fn chain_happy(n_participants: usize, da: DaConfig, test: TestConfig) -> Self {
         Self::Chain {
             consensus: ConsensusConfig {
                 n_participants,
@@ -112,10 +136,11 @@ impl SpawnConfig {
                 active_slot_coeff: 0.9,
             },
             da,
+            test,
         }
     }
 
-    pub fn star_happy(n_participants: usize, da: DaConfig) -> Self {
+    pub fn star_happy(n_participants: usize, da: DaConfig, test: TestConfig) -> Self {
         Self::Star {
             consensus: ConsensusConfig {
                 n_participants,
@@ -127,6 +152,7 @@ impl SpawnConfig {
                 active_slot_coeff: 0.9,
             },
             da,
+            test,
         }
     }
 }
@@ -154,6 +180,7 @@ pub struct DaConfig {
     pub num_subnets: u16,
     pub old_blobs_check_interval: Duration,
     pub blobs_validity_duration: Duration,
+    pub global_params_path: String,
 }
 
 impl Default for DaConfig {
@@ -165,7 +192,21 @@ impl Default for DaConfig {
             num_samples: 1,
             num_subnets: 2,
             old_blobs_check_interval: Duration::from_secs(5),
-            blobs_validity_duration: Duration::from_secs(15),
+            blobs_validity_duration: Duration::from_secs(u64::MAX),
+            global_params_path: GLOBAL_PARAMS_PATH.to_string(),
         }
     }
 }
+
+#[derive(Clone)]
+pub struct TestConfig {
+    pub wait_online_secs: u64,
+}
+
+impl Default for TestConfig {
+    fn default() -> Self {
+        Self {
+            wait_online_secs: 10,
+        }
+    }
+}
diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs
index d1224e8e..669db1b2 100644
--- a/tests/src/nodes/nomos.rs
+++ b/tests/src/nodes/nomos.rs
@@ -1,11 +1,10 @@
 // std
 use std::net::SocketAddr;
+use std::ops::Range;
 use std::process::{Child, Command, Stdio};
 use std::str::FromStr;
 use std::time::Duration;
-// internal
-use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
-use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node};
+// crates
 use blst::min_sig::SecretKey;
 use cl::{InputWitness, NoteWitness, NullifierSecret};
 use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig};
@@ -23,7 +22,9 @@ use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as Indexe
 use nomos_da_indexer::IndexerSettings;
 use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackendSettings;
 use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
+use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
 use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings;
+use nomos_da_sampling::DaSamplingServiceSettings;
 use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
 use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
 use nomos_da_verifier::DaVerifierServiceSettings;
@@ -34,21 +35,25 @@ use nomos_mempool::MempoolMetrics;
 use nomos_network::backends::libp2p::mixnet::MixnetConfig;
 use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
 use nomos_node::{api::AxumBackendSettings, Config, Tx};
-// crates
-use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
-use nomos_da_sampling::DaSamplingServiceSettings;
+use nomos_storage::backends::rocksdb::RocksBackendSettings;
 use once_cell::sync::Lazy;
 use rand::{thread_rng, Rng};
 use reqwest::{Client, Url};
+use serde::{Deserialize, Serialize};
 use subnetworks_assignations::versions::v1::FillFromNodeList;
+use subnetworks_assignations::MembershipHandler;
 use tempfile::NamedTempFile;
 use time::OffsetDateTime;
+// internal
+use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
+use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node, TestConfig};
 
 static CLIENT: Lazy<Client> = Lazy::new(Client::new);
 const CRYPTARCHIA_INFO_API: &str = "cryptarchia/info";
 const GET_HEADERS_INFO: &str = "cryptarchia/headers";
 const NOMOS_BIN: &str = "../target/debug/nomos-node";
 const STORAGE_BLOCKS_API: &str = "storage/block";
+const INDEXER_RANGE_API: &str = "da/get_range";
 const DEFAULT_SLOT_TIME: u64 = 2;
 const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME";
 #[cfg(feature = "mixnet")]
@@ -81,6 +86,7 @@ impl NomosNode {
         let dir = create_tempdir().unwrap();
         let mut file = NamedTempFile::new().unwrap();
         let config_path = file.path().to_owned();
+        let wait_online_secs = config.wait_online_secs;
 
         // setup logging so that we can intercept it later in testing
         config.log.backend = LoggerBackend::File {
@@ -89,6 +95,17 @@ impl NomosNode {
         };
         config.log.format = LoggerFormat::Json;
 
+        config.storage.db_path = dir.path().join("db");
+        config
+            .da_sampling
+            .storage_adapter_settings
+            .blob_storage_directory = dir.path().to_owned();
+        config
+            .da_verifier
+            .storage_adapter_settings
+            .blob_storage_directory = dir.path().to_owned();
+        config.da_indexer.storage.blob_storage_directory = dir.path().to_owned();
+
         serde_yaml::to_writer(&mut file, &config).unwrap();
         let child = Command::new(std::env::current_dir().unwrap().join(NOMOS_BIN))
             .arg(&config_path)
@@ -102,9 +119,10 @@ impl NomosNode {
             _tempdir: dir,
             config,
         };
-        tokio::time::timeout(adjust_timeout(Duration::from_secs(10)), async {
-            node.wait_online().await
-        })
+        tokio::time::timeout(
+            adjust_timeout(Duration::from_secs(wait_online_secs)),
+            async { node.wait_online().await },
+        )
         .await
         .unwrap();
 
@@ -164,6 +182,23 @@ impl NomosNode {
         }
     }
 
+    pub async fn get_indexer_range(
+        &self,
+        app_id: [u8; 32],
+        range: Range<[u8; 8]>,
+    ) -> Vec<([u8; 8], Vec<Vec<u8>>)> {
+        CLIENT
+            .post(format!("http://{}/{}", self.addr, INDEXER_RANGE_API))
+            .header("Content-Type", "application/json")
+            .body(serde_json::to_string(&GetRangeReq { app_id, range }).unwrap())
+            .send()
+            .await
+            .unwrap()
+            .json::<Vec<([u8; 8], Vec<Vec<u8>>)>>()
+            .await
+            .unwrap()
+    }
+
     // not async so that we can use this in `Drop`
     pub fn get_logs_from_file(&self) -> String {
         println!(
@@ -231,7 +266,11 @@ impl Node for NomosNode {
     /// so the leader can receive votes from all other nodes that will be subsequently spawned.
     /// If not, the leader will miss votes from nodes spawned before itself.
     /// This issue will be resolved by devising the block catch-up mechanism in the future.
-    fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Config> {
+    fn create_node_configs(
+        consensus: ConsensusConfig,
+        da: DaConfig,
+        test: TestConfig,
+    ) -> Vec<Config> {
         // we use the same random bytes for:
         // * da id
         // * coin sk
@@ -290,6 +329,7 @@ impl Node for NomosNode {
                 vec![coin],
                 time_config.clone(),
                 da.clone(),
+                test.wait_online_secs,
                 #[cfg(feature = "mixnet")]
                 MixnetConfig {
                     mixclient: mixclient_config.clone(),
@@ -305,8 +345,12 @@ impl Node for NomosNode {
         peer_ids.extend(da.executor_peer_ids);
 
         for config in &mut configs {
-            config.da_network.backend.membership =
+            let membership =
                 FillFromNodeList::new(&peer_ids, da.subnetwork_size, da.dispersal_factor);
+            let local_peer_id = secret_key_to_peer_id(config.da_network.backend.node_key.clone());
+            let subnetwork_ids = membership.membership(&local_peer_id);
+            config.da_verifier.verifier_settings.index = subnetwork_ids;
+            config.da_network.backend.membership = membership;
             config.da_network.backend.addresses = peer_addresses.clone();
         }
 
@@ -335,6 +379,12 @@ pub enum Pool {
     Cl,
 }
 
+#[derive(Serialize, Deserialize)]
+struct GetRangeReq {
+    pub app_id: [u8; 32],
+    pub range: Range<[u8; 8]>,
+}
+
 #[cfg(feature = "mixnet")]
 fn create_mixnet_config(ids: &[[u8; 32]]) -> (MixClientConfig, Vec<MixNodeConfig>) {
     use std::num::NonZeroU8;
@@ -401,6 +451,7 @@ fn build_da_peer_list(configs: &[Config]) -> Vec<(PeerId, Multiaddr)> {
         .collect()
 }
 
+#[allow(clippy::too_many_arguments)]
 fn create_node_config(
     id: [u8; 32],
     genesis_state: LedgerState,
@@ -408,12 +459,12 @@ fn create_node_config(
     notes: Vec<InputWitness>,
     time: TimeConfig,
     da_config: DaConfig,
+    wait_online_secs: u64,
     #[cfg(feature = "mixnet")] mixnet_config: MixnetConfig,
 ) -> Config {
     let swarm_config: SwarmConfig = Default::default();
 
     let verifier_sk = SecretKey::key_gen(&id, &[]).unwrap();
-    let verifier_pk_bytes = verifier_sk.sk_to_pk().to_bytes();
     let verifier_sk_bytes = verifier_sk.to_bytes();
 
     let mut config = Config {
@@ -453,7 +504,8 @@ fn create_node_config(
         da_verifier: DaVerifierServiceSettings {
             verifier_settings: KzgrsDaVerifierSettings {
                 sk: hex::encode(verifier_sk_bytes),
-                nodes_public_keys: vec![hex::encode(verifier_pk_bytes)],
+                index: Default::default(),
+                global_params_path: da_config.global_params_path,
             },
             network_adapter_settings: (),
             storage_adapter_settings: VerifierStorageAdapterSettings {
@@ -481,6 +533,12 @@ fn create_node_config(
             },
             network_adapter_settings: (),
         },
+        storage: RocksBackendSettings {
+            db_path: "./db".into(),
+            read_only: false,
+            column_family: Some("blocks".into()),
+        },
+        wait_online_secs,
     };
 
     config.network.backend.inner.port = get_available_port();
diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs
index 763a21df..a1a99d21 100644
--- a/tests/src/tests/cli.rs
+++ b/tests/src/tests/cli.rs
@@ -7,12 +7,13 @@ use nomos_libp2p::libp2p;
 use nomos_libp2p::Multiaddr;
 use nomos_libp2p::PeerId;
 use std::collections::HashMap;
-use std::io::Write;
+use std::time::Duration;
 use subnetworks_assignations::versions::v1::FillFromNodeList;
 use tempfile::NamedTempFile;
 use tests::nodes::NomosNode;
 use tests::Node;
 use tests::SpawnConfig;
+use tests::GLOBAL_PARAMS_PATH;
 
 const CLI_BIN: &str = "../target/debug/nomos-cli";
 const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715";
@@ -32,8 +33,12 @@ fn run_disseminate(disseminate: &Disseminate) {
         .arg(disseminate.columns.to_string())
         .arg("--timeout")
         .arg(disseminate.timeout.to_string())
+        .arg("--wait-until-disseminated")
+        .arg(disseminate.wait_until_disseminated.to_string())
         .arg("--node-addr")
-        .arg(disseminate.node_addr.as_ref().unwrap().as_str());
+        .arg(disseminate.node_addr.as_ref().unwrap().as_str())
+        .arg("--global-params-path")
+        .arg(GLOBAL_PARAMS_PATH.to_string());
 
     match (&disseminate.data, &disseminate.file) {
         (Some(data), None) => c.args(["--data", &data]),
@@ -98,10 +103,11 @@ async fn disseminate(nodes: &Vec<NomosNode>, config: &mut Disseminate) {
 }
 
 #[tokio::test]
-async fn disseminate_blob() {
+async fn disseminate_and_retrieve() {
     let mut config = Disseminate {
         data: Some("hello world".to_string()),
-        timeout: 180,
+        timeout: 60,
+        wait_until_disseminated: 5,
         app_id: APP_ID.into(),
         index: 0,
         columns: 2,
@@ -112,64 +118,48 @@
         ..Default::default()
     };
 
     let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
         2,
         tests::DaConfig {
             dispersal_factor: 2,
+            subnetwork_size: 2,
+            num_subnets: 2,
             ..Default::default()
         },
+        tests::TestConfig {
+            wait_online_secs: 50,
+        },
     ))
     .await;
 
     disseminate(&nodes, &mut config).await;
-}
-
-#[tokio::test]
-async fn disseminate_big_blob() {
-    const MSG_SIZE: usize = 1024;
-    let mut config = Disseminate {
-        data: std::iter::repeat(String::from("X"))
-            .take(MSG_SIZE)
-            .collect::<Vec<_>>()
-            .join("")
-            .into(),
-        timeout: 180,
-        app_id: APP_ID.into(),
-        index: 0,
-        columns: 2,
-        ..Default::default()
-    };
-
-    let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
-        2,
-        tests::DaConfig {
-            dispersal_factor: 2,
-            ..Default::default()
-        },
-    ))
-    .await;
-
-    disseminate(&nodes, &mut config).await;
-}
-
-#[tokio::test]
-async fn disseminate_blob_from_file() {
-    let mut file = NamedTempFile::new().unwrap();
-    file.write_all("hello world".as_bytes()).unwrap();
-
-    let mut config = Disseminate {
-        file: Some(file.path().to_path_buf()),
-        timeout: 180,
-        app_id: APP_ID.into(),
-        index: 0,
-        columns: 2,
-        ..Default::default()
-    };
-
-    let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
-        4,
-        tests::DaConfig {
-            dispersal_factor: 2,
-            ..Default::default()
-        },
-    ))
-    .await;
-
-    disseminate(&nodes, &mut config).await;
+    tokio::time::sleep(Duration::from_secs(10)).await;
+
+    let from = 0u64.to_be_bytes();
+    let to = 1u64.to_be_bytes();
+    let app_id = hex::decode(APP_ID).unwrap();
+
+    let node1_blobs = nodes[0]
+        .get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
+        .await;
+    let node2_blobs = nodes[1]
+        .get_indexer_range(app_id.try_into().unwrap(), from..to)
+        .await;
+
+    let node1_idx_0_blobs: Vec<_> = node1_blobs
+        .iter()
+        .filter(|(i, _)| i == &from)
+        .flat_map(|(_, blobs)| blobs)
+        .collect();
+    let node2_idx_0_blobs: Vec<_> = node2_blobs
+        .iter()
+        .filter(|(i, _)| i == &from)
+        .flat_map(|(_, blobs)| blobs)
+        .collect();
+
+    // Index zero shouldn't be empty: node 2 replicated both blobs to node 1 because they both
+    // are in the same subnetwork.
+    for b in node1_idx_0_blobs.iter() {
+        assert!(!b.is_empty())
+    }
+
+    for b in node2_idx_0_blobs.iter() {
+        assert!(!b.is_empty())
+    }
 }
diff --git a/tests/src/tests/cryptarchia/happy.rs b/tests/src/tests/cryptarchia/happy.rs
index 0500f793..788c93ff 100644
--- a/tests/src/tests/cryptarchia/happy.rs
+++ b/tests/src/tests/cryptarchia/happy.rs
@@ -52,6 +52,11 @@ async fn happy_test(nodes: &[NomosNode]) {
 
 #[tokio::test]
 async fn two_nodes_happy() {
-    let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2, Default::default())).await;
+    let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
+        2,
+        Default::default(),
+        Default::default(),
+    ))
+    .await;
     happy_test(&nodes).await;
 }
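For reference, the end-to-end flow this diff introduces for the KZG-RS global parameters, as a minimal Rust sketch. It only uses APIs that appear in the diff (`global_parameters_from_randomness`, `CanonicalSerialize::serialize_uncompressed`, `global_parameters_from_file`, `DaEncoderParams::new`); the file path and the column count of 2 are illustrative values taken from the tests, not prescribed by the patch:

    use ark_serialize::CanonicalSerialize;

    // 1) Generate and persist a parameters file once (the ignored test
    //    `write_random_kzgrs_params_to_file` above does exactly this).
    let mut rng = rand::thread_rng();
    let params = kzgrs::global_parameters_from_randomness(&mut rng);
    let mut bytes = Vec::new();
    params.serialize_uncompressed(&mut bytes).unwrap();
    std::fs::write("./kzgrs_test_params", &bytes).unwrap();

    // 2) Every node and the CLI then load the same file at startup instead of
    //    generating random parameters in-process.
    let global_params =
        kzgrs_backend::global::global_parameters_from_file("./kzgrs_test_params")
            .expect("global parameters file should exist and deserialize");
    let encoder_params = kzgrs_backend::encoder::DaEncoderParams::new(2, false, global_params);
    let encoder = kzgrs_backend::encoder::DaEncoder::new(encoder_params);

Commitments and proofs only verify under a common trusted setup, which is why the verifier settings now carry a `global_params_path` instead of a list of node public keys.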