1
0
mirror of synced 2025-02-02 19:04:42 +00:00

DA: Replication integration (#764)

* Handle replication messages

* Multiple indexes for kzgrs verifier

* Add rocksdb storage to node config

* Configurable delay before sending blob info to mempool

* Dial peers in same subnet for replication

* Update da integration tests

* Update nomos node tests

* Load kzgrs config from file

* SLOW_ENV set true for macos CI

* Assert retrieved blobs in dissemination test

* Kzgrs params deserialize unchecked
This commit is contained in:
gusto 2024-09-25 10:27:39 +02:00 committed by GitHub
parent ca0eb824aa
commit 915fe3f098
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 557 additions and 220 deletions

View File

@ -520,7 +520,7 @@ where
post,
path = "/da/get_range",
responses(
(status = 200, description = "Range of blobs", body = Vec<([u8;8], Option<DaBlob>)>),
(status = 200, description = "Range of blobs", body = Vec<([u8;8], Vec<DaBlob>)>),
(status = 500, description = "Internal server error", body = String),
)
)]

View File

@ -15,12 +15,13 @@ use nomos_libp2p::{ed25519::SecretKey, Multiaddr};
use nomos_log::{Logger, LoggerBackend, LoggerFormat};
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_network::NetworkService;
use nomos_storage::backends::rocksdb::RocksBackend;
use overwatch_rs::services::ServiceData;
use serde::{Deserialize, Serialize};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use tracing::Level;
// internal
use crate::NomosApiService;
use crate::{NomosApiService, Wire};
#[derive(ValueEnum, Clone, Debug, Default)]
pub enum LoggerBackendType {
@ -120,6 +121,8 @@ pub struct Config {
pub da_sampling: <crate::DaSampling as ServiceData>::Settings,
pub http: <NomosApiService as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
pub storage: <crate::StorageService<RocksBackend<Wire>> as ServiceData>::Settings,
pub wait_online_secs: u64,
}
impl Config {

View File

@ -16,8 +16,6 @@ use overwatch_rs::overwatch::*;
use tracing::{span, Level};
use uuid::Uuid;
const DEFAULT_DB_PATH: &str = "./db";
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
@ -97,11 +95,7 @@ fn main() -> Result<()> {
cryptarchia: config.cryptarchia,
#[cfg(feature = "metrics")]
metrics: MetricsSettings { registry },
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path: std::path::PathBuf::from(DEFAULT_DB_PATH),
read_only: false,
column_family: Some("blocks".into()),
},
storage: config.storage,
system_sig: (),
},
None,

View File

@ -41,12 +41,18 @@ pub struct Disseminate {
// Number of columns to use for encoding.
#[clap(long, default_value = "4096")]
pub columns: usize,
// Duration in seconds to wait before publishing blob info.
#[clap(short, long, default_value = "5")]
pub wait_until_disseminated: u64,
/// File to write the certificate to, if present.
#[clap(long)]
pub output: Option<PathBuf>,
/// File to disseminate
#[clap(short, long)]
pub file: Option<PathBuf>,
// Path to the KzgRs global parameters.
#[clap(long)]
pub global_params_path: String,
}
impl Disseminate {
@ -80,8 +86,11 @@ impl Disseminate {
let output = self.output.clone();
let num_columns = self.columns;
let with_cache = self.with_cache;
let wait_until_disseminated = Duration::from_secs(self.wait_until_disseminated);
let global_params_path = self.global_params_path.clone();
let (payload_sender, payload_rx) = tokio::sync::mpsc::unbounded_channel();
payload_sender.send(bytes.into_boxed_slice()).unwrap();
std::thread::spawn(move || {
OverwatchRunner::<DisseminateApp>::run(
DisseminateAppServiceSettings {
@ -96,7 +105,9 @@ impl Disseminate {
metadata,
status_updates,
node_addr,
wait_until_disseminated,
output,
global_params_path,
},
logger: LoggerSettings {
backend: LoggerBackend::None,

View File

@ -33,6 +33,7 @@ use tokio::sync::{mpsc::UnboundedReceiver, Mutex};
use super::{network::adapters::libp2p::Libp2pExecutorDispersalAdapter, NetworkBackend};
use crate::api::mempool::send_blob_info;
#[allow(clippy::too_many_arguments)]
pub async fn disseminate_and_wait<E, D>(
encoder: &E,
disperal: &D,
@ -40,6 +41,7 @@ pub async fn disseminate_and_wait<E, D>(
metadata: Metadata,
status_updates: Sender<Status>,
node_addr: Option<&Url>,
wait_until_disseminated: Duration,
output: Option<&PathBuf>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
@ -68,6 +70,9 @@ where
std::fs::write(output, wire::serialize(&blob_info)?)?;
}
// Wait for blobs be replicated in da network.
tokio::time::sleep(wait_until_disseminated).await;
// 4) Send blob info to the mempool.
if let Some(node) = node_addr {
status_updates.send(Status::SendingBlobInfo)?;
@ -127,7 +132,9 @@ pub struct Settings {
pub metadata: Metadata,
pub status_updates: Sender<Status>,
pub node_addr: Option<Url>,
pub wait_until_disseminated: Duration,
pub output: Option<std::path::PathBuf>,
pub global_params_path: String,
}
pub struct DisseminateService {
@ -157,7 +164,9 @@ impl ServiceCore for DisseminateService {
metadata,
status_updates,
node_addr,
wait_until_disseminated,
output,
global_params_path,
} = service_state.settings_reader.get_updated_settings();
let network_relay: Relay<NetworkService<NetworkBackend>> =
@ -167,9 +176,13 @@ impl ServiceCore for DisseminateService {
.await
.expect("Relay connection with NetworkService should succeed");
let global_params = kzgrs_backend::global::global_parameters_from_file(&global_params_path)
.expect("Global parameters should be loaded from file");
let params = kzgrs_backend::encoder::DaEncoderParams::new(
kzgrs_settings.num_columns,
kzgrs_settings.with_cache,
global_params,
);
let da_encoder = kzgrs_backend::encoder::DaEncoder::new(params);
let da_dispersal = Libp2pExecutorDispersalAdapter::new(network_relay);
@ -184,6 +197,7 @@ impl ServiceCore for DisseminateService {
metadata,
status_updates.clone(),
node_addr.as_ref(),
wait_until_disseminated,
output.as_ref(),
),
)

View File

@ -95,6 +95,7 @@ mod tests {
test::{rand_data, ENCODER},
EncodedData,
},
global::GLOBAL_PARAMETERS,
verifier::DaVerifier,
};
@ -141,7 +142,9 @@ mod tests {
.clone()
.into_iter()
.enumerate()
.map(|(index, sk)| DaVerifier { sk, index })
.map(|(index, sk)| {
DaVerifier::new(sk, [index as u32].into(), GLOBAL_PARAMETERS.clone())
})
.collect();
let encoded_data = encoder.encode(&data).unwrap();

View File

@ -7,8 +7,8 @@ use ark_poly::EvaluationDomain;
use kzgrs::common::bytes_to_polynomial_unchecked;
use kzgrs::fk20::{fk20_batch_generate_elements_proofs, Toeplitz1Cache};
use kzgrs::{
bytes_to_polynomial, commit_polynomial, encode, Commitment, Evaluations, KzgRsError,
Polynomial, PolynomialEvaluationDomain, Proof, BYTES_PER_FIELD_ELEMENT,
bytes_to_polynomial, commit_polynomial, encode, Commitment, Evaluations, GlobalParameters,
KzgRsError, Polynomial, PolynomialEvaluationDomain, Proof, BYTES_PER_FIELD_ELEMENT,
};
#[cfg(feature = "parallel")]
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
@ -21,24 +21,27 @@ use crate::global::GLOBAL_PARAMETERS;
pub struct DaEncoderParams {
column_count: usize,
toeplitz1cache: Option<Toeplitz1Cache>,
global_parameters: GlobalParameters,
}
impl DaEncoderParams {
pub const MAX_BLS12_381_ENCODING_CHUNK_SIZE: usize = 31;
pub fn new(column_count: usize, with_cache: bool) -> Self {
pub fn new(column_count: usize, with_cache: bool, global_parameters: GlobalParameters) -> Self {
let toeplitz1cache =
with_cache.then(|| Toeplitz1Cache::with_size(&GLOBAL_PARAMETERS, column_count));
with_cache.then(|| Toeplitz1Cache::with_size(&global_parameters, column_count));
Self {
column_count,
toeplitz1cache,
global_parameters,
}
}
pub const fn default_with(column_count: usize) -> Self {
pub fn default_with(column_count: usize) -> Self {
Self {
column_count,
toeplitz1cache: None,
global_parameters: GLOBAL_PARAMETERS.clone(),
}
}
}
@ -83,6 +86,7 @@ impl DaEncoder {
#[allow(clippy::type_complexity)]
fn compute_kzg_row_commitments(
global_parameters: &GlobalParameters,
matrix: &ChunksMatrix,
polynomial_evaluation_domain: PolynomialEvaluationDomain,
) -> Result<Vec<((Evaluations, Polynomial), Commitment)>, KzgRsError> {
@ -104,7 +108,7 @@ impl DaEncoder {
r.as_bytes().as_ref(),
polynomial_evaluation_domain,
);
commit_polynomial(&poly, &GLOBAL_PARAMETERS)
commit_polynomial(&poly, global_parameters)
.map(|commitment| ((evals, poly), commitment))
})
.collect()
@ -136,6 +140,7 @@ impl DaEncoder {
}
fn compute_rows_proofs(
global_parameters: &GlobalParameters,
polynomials: &[Polynomial],
toeplitz1cache: Option<&Toeplitz1Cache>,
) -> Result<Vec<Vec<Proof>>, KzgRsError> {
@ -149,19 +154,25 @@ impl DaEncoder {
polynomials.par_iter()
}
}
.map(|poly| fk20_batch_generate_elements_proofs(poly, &GLOBAL_PARAMETERS, toeplitz1cache))
.map(|poly| fk20_batch_generate_elements_proofs(poly, global_parameters, toeplitz1cache))
.collect())
}
#[allow(clippy::type_complexity)]
fn compute_kzg_column_commitments(
global_parameters: &GlobalParameters,
matrix: &ChunksMatrix,
polynomial_evaluation_domain: PolynomialEvaluationDomain,
) -> Result<Vec<((Evaluations, Polynomial), Commitment)>, KzgRsError> {
Self::compute_kzg_row_commitments(&matrix.transposed(), polynomial_evaluation_domain)
Self::compute_kzg_row_commitments(
global_parameters,
&matrix.transposed(),
polynomial_evaluation_domain,
)
}
fn compute_aggregated_column_commitment(
global_parameters: &GlobalParameters,
matrix: &ChunksMatrix,
commitments: &[Commitment],
polynomial_evaluation_domain: PolynomialEvaluationDomain,
@ -179,17 +190,18 @@ impl DaEncoder {
let (evals, poly) = bytes_to_polynomial::<
{ DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE },
>(hashes.as_ref(), polynomial_evaluation_domain)?;
let commitment = commit_polynomial(&poly, &GLOBAL_PARAMETERS)?;
let commitment = commit_polynomial(&poly, global_parameters)?;
Ok(((evals, poly), commitment))
}
fn compute_aggregated_column_proofs(
global_parameters: &GlobalParameters,
polynomial: &Polynomial,
toeplitz1cache: Option<&Toeplitz1Cache>,
) -> Result<Vec<Proof>, KzgRsError> {
Ok(fk20_batch_generate_elements_proofs(
polynomial,
&GLOBAL_PARAMETERS,
global_parameters,
toeplitz1cache,
))
}
@ -215,31 +227,37 @@ impl nomos_core::da::DaEncoder for DaEncoder {
type Error = KzgRsError;
fn encode(&self, data: &[u8]) -> Result<EncodedData, KzgRsError> {
let global_parameters = &self.params.global_parameters;
let chunked_data = self.chunkify(data);
let row_domain = PolynomialEvaluationDomain::new(self.params.column_count)
.expect("Domain should be able to build");
let column_domain = PolynomialEvaluationDomain::new(chunked_data.len())
.expect("Domain should be able to build");
let (row_polynomials, row_commitments): (Vec<_>, Vec<_>) =
Self::compute_kzg_row_commitments(&chunked_data, row_domain)?
Self::compute_kzg_row_commitments(global_parameters, &chunked_data, row_domain)?
.into_iter()
.unzip();
let (_, row_polynomials): (Vec<_>, Vec<_>) = row_polynomials.into_iter().unzip();
let encoded_evaluations = Self::rs_encode_rows(&row_polynomials, row_domain);
let extended_data = Self::evals_to_chunk_matrix(&encoded_evaluations);
let rows_proofs =
Self::compute_rows_proofs(&row_polynomials, self.params.toeplitz1cache.as_ref())?;
let rows_proofs = Self::compute_rows_proofs(
global_parameters,
&row_polynomials,
self.params.toeplitz1cache.as_ref(),
)?;
let (_column_polynomials, column_commitments): (Vec<_>, Vec<_>) =
Self::compute_kzg_column_commitments(&extended_data, column_domain)?
Self::compute_kzg_column_commitments(global_parameters, &extended_data, column_domain)?
.into_iter()
.unzip();
let ((_aggregated_evals, aggregated_polynomial), aggregated_column_commitment) =
Self::compute_aggregated_column_commitment(
global_parameters,
&extended_data,
&column_commitments,
row_domain,
)?;
let aggregated_column_proofs = Self::compute_aggregated_column_proofs(
global_parameters,
&aggregated_polynomial,
self.params.toeplitz1cache.as_ref(),
)?;
@ -269,12 +287,14 @@ pub mod test {
BYTES_PER_FIELD_ELEMENT,
};
use nomos_core::da::DaEncoder as _;
use once_cell::sync::Lazy;
use rand::RngCore;
use std::ops::Div;
pub const DOMAIN_SIZE: usize = 16;
pub const PARAMS: DaEncoderParams = DaEncoderParams::default_with(DOMAIN_SIZE);
pub const ENCODER: DaEncoder = DaEncoder::new(PARAMS);
pub static DOMAIN_SIZE: usize = 16;
pub static PARAMS: Lazy<DaEncoderParams> =
Lazy::new(|| DaEncoderParams::default_with(DOMAIN_SIZE));
pub static ENCODER: Lazy<DaEncoder> = Lazy::new(|| DaEncoder::new(PARAMS.clone()));
pub fn rand_data(elements_count: usize) -> Vec<u8> {
let mut buff = vec![0; elements_count * DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE];
@ -301,7 +321,8 @@ pub mod test {
let data = rand_data(32);
let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
let matrix = ENCODER.chunkify(data.as_ref());
let commitments_data = DaEncoder::compute_kzg_row_commitments(&matrix, domain).unwrap();
let commitments_data =
DaEncoder::compute_kzg_row_commitments(&GLOBAL_PARAMETERS, &matrix, domain).unwrap();
assert_eq!(commitments_data.len(), matrix.len());
}
@ -311,7 +332,7 @@ pub mod test {
let matrix = ENCODER.chunkify(data.as_ref());
let domain = PolynomialEvaluationDomain::new(DOMAIN_SIZE).unwrap();
let (poly_data, _): (Vec<_>, Vec<_>) =
DaEncoder::compute_kzg_row_commitments(&matrix, domain)
DaEncoder::compute_kzg_row_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
.unwrap()
.into_iter()
.unzip();
@ -331,7 +352,7 @@ pub mod test {
let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
let matrix = ENCODER.chunkify(data.as_ref());
let (poly_data, _): (Vec<_>, Vec<_>) =
DaEncoder::compute_kzg_row_commitments(&matrix, domain)
DaEncoder::compute_kzg_row_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
.unwrap()
.into_iter()
.unzip();
@ -365,14 +386,15 @@ pub mod test {
let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
let matrix = ENCODER.chunkify(data.as_ref());
let (poly_data, commitments): (Vec<_>, Vec<_>) =
DaEncoder::compute_kzg_row_commitments(&matrix, domain)
DaEncoder::compute_kzg_row_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
.unwrap()
.into_iter()
.unzip();
let (_evals, polynomials): (Vec<_>, Vec<_>) = poly_data.into_iter().unzip();
let extended_evaluations = DaEncoder::rs_encode_rows(&polynomials, domain);
let extended_matrix = DaEncoder::evals_to_chunk_matrix(&extended_evaluations);
let proofs = DaEncoder::compute_rows_proofs(&polynomials, None).unwrap();
let proofs =
DaEncoder::compute_rows_proofs(&GLOBAL_PARAMETERS, &polynomials, None).unwrap();
let checks = izip!(matrix.iter(), &commitments, &proofs);
for (row, commitment, proofs) in checks {
@ -411,7 +433,8 @@ pub mod test {
let data = rand_data(32);
let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
let matrix = ENCODER.chunkify(data.as_ref());
let commitments_data = DaEncoder::compute_kzg_column_commitments(&matrix, domain).unwrap();
let commitments_data =
DaEncoder::compute_kzg_column_commitments(&GLOBAL_PARAMETERS, &matrix, domain).unwrap();
assert_eq!(commitments_data.len(), matrix.columns().count());
}
@ -421,12 +444,17 @@ pub mod test {
let matrix = ENCODER.chunkify(data.as_ref());
let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
let (_, commitments): (Vec<_>, Vec<_>) =
DaEncoder::compute_kzg_column_commitments(&matrix, domain)
DaEncoder::compute_kzg_column_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
.unwrap()
.into_iter()
.unzip();
let _ =
DaEncoder::compute_aggregated_column_commitment(&matrix, &commitments, domain).unwrap();
let _ = DaEncoder::compute_aggregated_column_commitment(
&GLOBAL_PARAMETERS,
&matrix,
&commitments,
domain,
)
.unwrap();
}
#[test]
@ -435,13 +463,19 @@ pub mod test {
let matrix = ENCODER.chunkify(data.as_ref());
let domain = GeneralEvaluationDomain::new(DOMAIN_SIZE).unwrap();
let (_poly_data, commitments): (Vec<_>, Vec<_>) =
DaEncoder::compute_kzg_column_commitments(&matrix, domain)
DaEncoder::compute_kzg_column_commitments(&GLOBAL_PARAMETERS, &matrix, domain)
.unwrap()
.into_iter()
.unzip();
let ((_evals, polynomial), _aggregated_commitment) =
DaEncoder::compute_aggregated_column_commitment(&matrix, &commitments, domain).unwrap();
DaEncoder::compute_aggregated_column_proofs(&polynomial, None).unwrap();
DaEncoder::compute_aggregated_column_commitment(
&GLOBAL_PARAMETERS,
&matrix,
&commitments,
domain,
)
.unwrap();
DaEncoder::compute_aggregated_column_proofs(&GLOBAL_PARAMETERS, &polynomial, None).unwrap();
}
#[test]

View File

@ -1,7 +1,32 @@
use kzgrs::{global_parameters_from_randomness, GlobalParameters};
use once_cell::sync::Lazy;
// Reexport global parameters loading from file.
pub use kzgrs::global_parameters_from_file;
pub static GLOBAL_PARAMETERS: Lazy<GlobalParameters> = Lazy::new(|| {
println!("WARNING: Global parameters are randomly generated. Use for development only.");
let mut rng = rand::thread_rng();
global_parameters_from_randomness(&mut rng)
});
#[cfg(test)]
mod tests {
use std::fs::File;
use ark_serialize::{CanonicalSerialize, Write};
use kzgrs::global_parameters_from_randomness;
#[test]
#[ignore = "for testing purposes only"]
fn write_random_kzgrs_params_to_file() {
let mut rng = rand::thread_rng();
let params = global_parameters_from_randomness(&mut rng);
let mut serialized_data = Vec::new();
params.serialize_uncompressed(&mut serialized_data).unwrap();
let mut file = File::create("./kzgrs_test_params").unwrap();
file.write_all(&serialized_data).unwrap();
}
}

View File

@ -1,41 +1,38 @@
// std
use ark_poly::EvaluationDomain;
use std::collections::HashSet;
// crates
use blst::min_sig::{PublicKey, SecretKey};
use ark_poly::EvaluationDomain;
use blst::min_sig::SecretKey;
use itertools::{izip, Itertools};
use kzgrs::common::field_element_from_bytes_le;
use kzgrs::{
bytes_to_polynomial, commit_polynomial, verify_element_proof, Commitment,
bytes_to_polynomial, commit_polynomial, verify_element_proof, Commitment, GlobalParameters,
PolynomialEvaluationDomain, Proof, BYTES_PER_FIELD_ELEMENT,
};
use crate::common::blob::DaBlob;
use nomos_core::da::blob::Blob;
// internal
use crate::common::blob::DaBlob;
use crate::common::{hash_column_and_commitment, Chunk, Column};
use crate::encoder::DaEncoderParams;
use crate::global::GLOBAL_PARAMETERS;
pub struct DaVerifier {
// TODO: substitute this for an abstraction to sign things over
pub sk: SecretKey,
pub index: usize,
pub index: HashSet<u32>,
global_parameters: GlobalParameters,
}
impl DaVerifier {
pub fn new(sk: SecretKey, nodes_public_keys: &[PublicKey]) -> Self {
// TODO: `is_sorted` is experimental, and by contract `nodes_public_keys` should be shorted
// but not sure how we could enforce it here without re-sorting anyway.
// assert!(nodes_public_keys.is_sorted());
let self_pk = sk.sk_to_pk();
let (index, _) = nodes_public_keys
.iter()
.find_position(|&pk| pk == &self_pk)
.expect("Self pk should be registered");
Self { sk, index }
pub fn new(sk: SecretKey, index: HashSet<u32>, global_parameters: GlobalParameters) -> Self {
Self {
sk,
index,
global_parameters,
}
}
fn verify_column(
global_parameters: &GlobalParameters,
column: &Column,
column_commitment: &Commitment,
aggregated_column_commitment: &Commitment,
@ -52,7 +49,7 @@ impl DaVerifier {
) else {
return false;
};
let Ok(computed_column_commitment) = commit_polynomial(&polynomial, &GLOBAL_PARAMETERS)
let Ok(computed_column_commitment) = commit_polynomial(&polynomial, global_parameters)
else {
return false;
};
@ -72,11 +69,12 @@ impl DaVerifier {
aggregated_column_commitment,
aggregated_column_proof,
rows_domain,
&GLOBAL_PARAMETERS,
global_parameters,
)
}
fn verify_chunk(
global_parameters: &GlobalParameters,
chunk: &Chunk,
commitment: &Commitment,
proof: &Proof,
@ -90,11 +88,12 @@ impl DaVerifier {
commitment,
proof,
domain,
&GLOBAL_PARAMETERS,
global_parameters,
)
}
fn verify_chunks(
global_parameters: &GlobalParameters,
chunks: &[Chunk],
commitments: &[Commitment],
proofs: &[Proof],
@ -108,7 +107,8 @@ impl DaVerifier {
return false;
}
for (chunk, commitment, proof) in izip!(chunks, commitments, proofs) {
if !DaVerifier::verify_chunk(chunk, commitment, proof, index, domain) {
if !DaVerifier::verify_chunk(global_parameters, chunk, commitment, proof, index, domain)
{
return false;
}
}
@ -118,12 +118,16 @@ impl DaVerifier {
pub fn verify(&self, blob: &DaBlob, rows_domain_size: usize) -> bool {
let rows_domain = PolynomialEvaluationDomain::new(rows_domain_size)
.expect("Domain should be able to build");
let blob_col_idx = &u16::from_be_bytes(blob.column_idx()).into();
let index = self.index.get(blob_col_idx).unwrap();
let is_column_verified = DaVerifier::verify_column(
&self.global_parameters,
&blob.column,
&blob.column_commitment,
&blob.aggregated_column_commitment,
&blob.aggregated_column_proof,
self.index,
*index as usize,
rows_domain,
);
if !is_column_verified {
@ -131,10 +135,11 @@ impl DaVerifier {
}
let are_chunks_verified = DaVerifier::verify_chunks(
&self.global_parameters,
blob.column.as_ref(),
&blob.rows_commitments,
&blob.rows_proofs,
self.index,
*index as usize,
rows_domain,
);
if !are_chunks_verified {
@ -160,7 +165,7 @@ mod test {
global_parameters_from_randomness, Commitment, GlobalParameters,
PolynomialEvaluationDomain, Proof, BYTES_PER_FIELD_ELEMENT,
};
use nomos_core::da::DaEncoder as _;
use nomos_core::da::DaEncoder;
use once_cell::sync::Lazy;
use rand::{thread_rng, RngCore};
@ -226,6 +231,7 @@ mod test {
let column_data = prepare_column(false).unwrap();
assert!(DaVerifier::verify_column(
&GLOBAL_PARAMETERS,
&column_data.column,
&column_data.column_commitment,
&column_data.aggregated_commitment,
@ -257,6 +263,7 @@ mod test {
.is_err());
assert!(!DaVerifier::verify_column(
&GLOBAL_PARAMETERS,
&column2,
&column_data.column_commitment,
&column_data.aggregated_commitment,
@ -269,6 +276,7 @@ mod test {
let column_data2 = prepare_column(true).unwrap();
assert!(!DaVerifier::verify_column(
&GLOBAL_PARAMETERS,
&column_data2.column,
&column_data2.column_commitment,
&column_data2.aggregated_commitment,
@ -303,6 +311,7 @@ mod test {
};
// Happy case
let chunks_verified = DaVerifier::verify_chunks(
&GLOBAL_PARAMETERS,
da_blob.column.as_ref(),
&da_blob.rows_commitments,
&da_blob.rows_proofs,
@ -316,6 +325,7 @@ mod test {
column_w_missing_chunk.pop();
let chunks_not_verified = !DaVerifier::verify_chunks(
&GLOBAL_PARAMETERS,
column_w_missing_chunk.as_ref(),
&da_blob.rows_commitments,
&da_blob.rows_proofs,
@ -329,6 +339,7 @@ mod test {
modified_proofs.swap(0, 1);
let chunks_not_verified = !DaVerifier::verify_chunks(
&GLOBAL_PARAMETERS,
da_blob.column.as_ref(),
&da_blob.rows_commitments,
&modified_proofs,
@ -342,6 +353,7 @@ mod test {
modified_commitments.swap(0, 1);
let chunks_not_verified = !DaVerifier::verify_chunks(
&GLOBAL_PARAMETERS,
da_blob.column.as_ref(),
&modified_commitments,
&da_blob.rows_proofs,
@ -367,7 +379,9 @@ mod test {
let verifiers: Vec<DaVerifier> = sks
.into_iter()
.enumerate()
.map(|(index, sk)| DaVerifier { sk, index })
.map(|(index, sk)| {
DaVerifier::new(sk, [index as u32].into(), GLOBAL_PARAMETERS.clone())
})
.collect();
let encoded_data = encoder.encode(&data).unwrap();
for (i, column) in encoded_data.extended_data.columns().enumerate() {

View File

@ -1,9 +1,24 @@
use super::GlobalParameters;
// std
use std::{error::Error, fs::File};
// crates
use ark_bls12_381::{fr::Fr, Bls12_381};
use ark_poly::polynomial::univariate::DensePolynomial;
use ark_poly_commit::kzg10::KZG10;
use ark_poly_commit::kzg10::{UniversalParams, KZG10};
use ark_serialize::{CanonicalDeserialize, Read};
use rand::Rng;
// internal
use super::GlobalParameters;
pub fn global_parameters_from_randomness<R: Rng>(rng: &mut R) -> GlobalParameters {
KZG10::<Bls12_381, DensePolynomial<Fr>>::setup(8192, true, rng).unwrap()
}
pub fn global_parameters_from_file(file_path: &str) -> Result<GlobalParameters, Box<dyn Error>> {
let mut file = File::open(file_path)?;
let mut serialized_data = Vec::new();
file.read_to_end(&mut serialized_data)?;
let params =
UniversalParams::<Bls12_381>::deserialize_uncompressed_unchecked(&*serialized_data)?;
Ok(params)
}

View File

@ -12,7 +12,7 @@ use ark_poly_commit::sonic_pc::UniversalParams;
use std::mem;
pub use common::{bytes_to_evaluations, bytes_to_polynomial, KzgRsError};
pub use global_parameters::global_parameters_from_randomness;
pub use global_parameters::{global_parameters_from_file, global_parameters_from_randomness};
pub use kzg::{commit_polynomial, generate_element_proof, verify_element_proof};
pub use rs::{decode, encode};

View File

@ -7,8 +7,8 @@ use either::Either;
use indexmap::IndexSet;
use libp2p::core::Endpoint;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, THandler,
THandlerInEvent, THandlerOutEvent, ToSwarm,
ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler,
THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use log::{error, trace};
@ -43,7 +43,10 @@ pub struct ReplicationBehaviour<Membership> {
/// nomos DA subnetworks
membership: Membership,
/// Relation of connected peers of replication subnetworks
connected: HashMap<PeerId, ConnectionId>,
///
/// **TODO**: Node needs only one connection per peer for Nomos DA network communications.
/// Allowing multiple connections from the same peer id is only temporal and will be removed!
connected: HashMap<PeerId, HashSet<ConnectionId>>,
/// Outgoing event queue
outgoing_events: VecDeque<SwarmEvent>,
/// Seen messages cache holds a record of seen messages, messages will be removed from this
@ -114,14 +117,16 @@ where
.filter(|(peer_id, _connection_id)| peers.contains(peer_id))
.collect();
for (peer_id, connection_id) in connected_peers {
self.outgoing_events.push_back(SwarmEvent::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*connection_id),
event: Either::Left(BehaviourEventToHandler::OutgoingMessage {
message: message.clone(),
}),
})
for (peer_id, connection_ids) in connected_peers {
for connection_id in connection_ids.iter() {
self.outgoing_events.push_back(SwarmEvent::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*connection_id),
event: Either::Left(BehaviourEventToHandler::OutgoingMessage {
message: message.clone(),
}),
})
}
}
self.try_wake();
}
@ -152,7 +157,10 @@ where
return Ok(Either::Right(libp2p::swarm::dummy::ConnectionHandler));
}
trace!("{}, Connected to {peer_id}", self.local_peer_id);
self.connected.insert(peer_id, connection_id);
self.connected
.entry(peer_id)
.or_default()
.insert(connection_id);
Ok(Either::Left(ReplicationHandler::new()))
}
@ -164,11 +172,28 @@ where
_role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
trace!("{}, Connected to {peer_id}", self.local_peer_id);
self.connected.insert(peer_id, connection_id);
self.connected
.entry(peer_id)
.or_default()
.insert(connection_id);
Ok(Either::Left(ReplicationHandler::new()))
}
fn on_swarm_event(&mut self, _event: FromSwarm) {}
fn on_swarm_event(&mut self, event: FromSwarm) {
if let FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
..
}) = event
{
if let Some(connections) = self.connected.get_mut(&peer_id) {
connections.remove(&connection_id);
if connections.is_empty() {
self.connected.remove(&peer_id);
}
}
}
}
fn on_connection_handler_event(
&mut self,

View File

@ -4,8 +4,8 @@ use std::time::Duration;
use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob;
use libp2p::identity::Keypair;
use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm, SwarmBuilder};
use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder};
use log::{debug, error};
use nomos_da_messages::replication::ReplicationReq;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
@ -75,6 +75,15 @@ where
.build()
}
pub fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> {
self.swarm.dial(addr)?;
Ok(())
}
pub fn local_peer_id(&self) -> &PeerId {
self.swarm.local_peer_id()
}
pub fn protocol_swarm(&self) -> &Swarm<ValidatorBehaviour<Membership>> {
&self.swarm
}
@ -116,7 +125,21 @@ where
}
}
async fn handle_replication_event(&mut self, _event: ReplicationEvent) {}
async fn handle_replication_event(&mut self, event: ReplicationEvent) {
let ReplicationEvent::IncomingMessage { message, .. } = event;
if let Ok(blob) = bincode::deserialize::<DaBlob>(
message
.blob
.as_ref()
.expect("Message blob should not be empty")
.data
.as_slice(),
) {
if let Err(e) = self.validation_events_sender.send(blob) {
error!("Error sending blob to validation: {e:?}");
}
}
}
async fn handle_behaviour_event(&mut self, event: ValidatorBehaviourEvent<Membership>) {
match event {

View File

@ -16,6 +16,7 @@ use nomos_libp2p::secret_key_serde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
@ -131,8 +132,8 @@ where
libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let (mut validator_swarm, events_streams) = ValidatorSwarm::new(
keypair,
config.membership,
config.addresses.into_iter().collect(),
config.membership.clone(),
config.addresses.clone().into_iter().collect(),
);
let sampling_request_channel = validator_swarm
.protocol_swarm()
@ -147,6 +148,33 @@ where
.unwrap_or_else(|e| {
panic!("Error listening on DA network with address {address}: {e}")
});
// Dial peers in the same subnetworks (Node might participate in multiple).
let local_peer_id = *validator_swarm.local_peer_id();
let mut connected_peers = HashSet::new();
config
.membership
.membership(&local_peer_id)
.iter()
.flat_map(|subnet| config.membership.members_of(subnet))
.filter(|peer| peer != &local_peer_id)
.filter_map(|peer| {
config
.addresses
.iter()
.find(|(p, _)| p == &peer)
.map(|(_, addr)| (peer, addr.clone()))
})
.for_each(|(peer, addr)| {
// Only dial if we haven't already connected to this peer.
if connected_peers.insert(peer) {
validator_swarm
.dial(addr)
.expect("Node should be able to dial peer in a subnet");
}
});
let task = overwatch_handle.runtime().spawn(validator_swarm.run());
let (sampling_broadcast_sender, sampling_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);

View File

@ -55,10 +55,13 @@ where
blob_id: <Self::Blob as Blob>::BlobId,
column_idx: ColumnIndex,
) -> Result<Option<Self::Blob>, DynError> {
let column_idx = column_idx.to_be_bytes();
let blob_idx = create_blob_idx(blob_id.as_ref(), &column_idx);
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.storage_relay
.send(StorageMsg::Load {
key: key_bytes(DA_VERIFIED_KEY_PREFIX, &blob_id),
key: key_bytes(DA_VERIFIED_KEY_PREFIX, blob_idx),
reply_channel: reply_tx,
})
.await
@ -68,7 +71,7 @@ where
let blob_bytes = load_blob(
self.settings.blob_storage_directory.clone(),
blob_id.as_ref(),
&column_idx.to_be_bytes(),
&column_idx,
)
.await?;
Ok(S::deserialize(blob_bytes)
@ -84,3 +87,13 @@ where
pub struct RocksAdapterSettings {
pub blob_storage_directory: PathBuf,
}
// Combines a 32-byte blob ID (`[u8; 32]`) with a 2-byte column index
// (`u16` represented as `[u8; 2]`).
fn create_blob_idx(blob_id: &[u8], column_idx: &[u8]) -> [u8; 34] {
let mut blob_idx = [0u8; 34];
blob_idx[..blob_id.len()].copy_from_slice(blob_id);
blob_idx[blob_id.len()..].copy_from_slice(column_idx);
blob_idx
}

View File

@ -26,6 +26,7 @@ nomos-log = { path = "../../log" }
nomos-network = { path = "../../network", features = ["mock"] }
nomos-libp2p = { path = "../../../nomos-libp2p" }
libp2p = { version = "0.53.2", features = ["ed25519"] }
once_cell = "1.19"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
rand = "0.8"

View File

@ -56,6 +56,7 @@ use nomos_network::NetworkService;
use nomos_node::{Tx, Wire};
use nomos_storage::backends::rocksdb::RocksBackend;
use nomos_storage::StorageService;
use once_cell::sync::Lazy;
use overwatch_derive::*;
use overwatch_rs::overwatch::{Overwatch, OverwatchRunner};
use overwatch_rs::services::handle::ServiceHandle;
@ -69,12 +70,18 @@ type IntegrationRng = TestRng;
/// Membership used by the DA Network service.
pub type NomosDaMembership = FillFromNodeList;
pub const PARAMS: DaEncoderParams = DaEncoderParams::default_with(2);
pub const ENCODER: DaEncoder = DaEncoder::new(PARAMS);
pub const GLOBAL_PARAMS_PATH: &str = "../../../tests/kzgrs/kzgrs_test_params";
pub const SK1: &str = "aca2c52f5928a53de79679daf390b0903eeccd9671b4350d49948d84334874806afe68536da9e076205a2af0af350e6c50851a040e3057b6544a29f5689ccd31";
pub const SK2: &str = "f9dc26eea8bc56d9a4c59841b438665b998ce5e42f49f832df5b770a725c2daafee53b33539127321f6f5085e42902bd380e82d18a7aff6404e632b842106785";
pub static PARAMS: Lazy<DaEncoderParams> = Lazy::new(|| {
let global_parameters =
kzgrs_backend::global::global_parameters_from_file(GLOBAL_PARAMS_PATH).unwrap();
DaEncoderParams::new(2, false, global_parameters)
});
pub static ENCODER: Lazy<DaEncoder> = Lazy::new(|| DaEncoder::new(PARAMS.clone()));
pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
@ -246,8 +253,8 @@ pub fn new_node(
num_samples: da_network_settings.num_samples,
num_subnets: da_network_settings.num_subnets,
// Sampling service period can't be zero.
old_blobs_check_interval: Duration::from_secs(1),
blobs_validity_duration: Duration::from_secs(15),
old_blobs_check_interval: Duration::from_secs(5),
blobs_validity_duration: Duration::from_secs(u64::MAX),
},
network_adapter_settings: (),
storage_adapter_settings: SamplingStorageSettings {

View File

@ -93,8 +93,8 @@ fn test_indexer() {
let blobs_dir = TempDir::new().unwrap().path().to_path_buf();
let (node1_sk, node1_pk) = generate_blst_hex_keys();
let (node2_sk, node2_pk) = generate_blst_hex_keys();
let (node1_sk, _) = generate_blst_hex_keys();
let (node2_sk, _) = generate_blst_hex_keys();
let (peer_sk_1, peer_id_1) = create_ed25519_sk_peerid(SK1);
let (peer_sk_2, peer_id_2) = create_ed25519_sk_peerid(SK2);
@ -119,7 +119,8 @@ fn test_indexer() {
vec![node_address(&swarm_config2)],
KzgrsDaVerifierSettings {
sk: node1_sk.clone(),
nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()],
index: [0].into(),
global_params_path: GLOBAL_PARAMS_PATH.into(),
},
TestDaNetworkSettings {
peer_addresses: peer_addresses.clone(),
@ -142,7 +143,8 @@ fn test_indexer() {
vec![node_address(&swarm_config1)],
KzgrsDaVerifierSettings {
sk: node2_sk.clone(),
nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()],
index: [1].into(),
global_params_path: GLOBAL_PARAMS_PATH.into(),
},
TestDaNetworkSettings {
peer_addresses,
@ -214,6 +216,17 @@ fn test_indexer() {
let blob_hash = <DaBlob as Blob>::id(&blobs[0]);
let blob_info = BlobInfo::new(blob_hash, meta);
let mut node_1_blob_0_idx = Vec::new();
node_1_blob_0_idx.extend_from_slice(&blob_hash);
node_1_blob_0_idx.extend_from_slice(&0u16.to_be_bytes());
let mut node_1_blob_1_idx = Vec::new();
node_1_blob_1_idx.extend_from_slice(&blob_hash);
node_1_blob_1_idx.extend_from_slice(&1u16.to_be_bytes());
let node_2_blob_0_idx = node_1_blob_0_idx.clone();
let node_2_blob_1_idx = node_1_blob_1_idx.clone();
// Test get Metadata for Certificate
let (app_id2, index2) = blob_info.metadata();
@ -226,12 +239,17 @@ fn test_indexer() {
node2.spawn(async move {
let storage_outbound = node2_storage.connect().await.unwrap();
// Mock attested blob by writting directly into the da storage.
let attested_key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_hash);
// Mock both attested blobs by writting directly into the da storage.
storage_outbound
.send(nomos_storage::StorageMsg::Store {
key: attested_key.into(),
key: key_bytes(DA_VERIFIED_KEY_PREFIX, node_2_blob_0_idx).into(),
value: Bytes::new(),
})
.await
.unwrap();
storage_outbound
.send(nomos_storage::StorageMsg::Store {
key: key_bytes(DA_VERIFIED_KEY_PREFIX, node_2_blob_1_idx).into(),
value: Bytes::new(),
})
.await
@ -256,12 +274,17 @@ fn test_indexer() {
Err(_) => None,
});
// Mock attested blob by writting directly into the da storage.
let attested_key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_hash);
// Mock both attested blobs by writting directly into the da storage.
storage_outbound
.send(nomos_storage::StorageMsg::Store {
key: attested_key.into(),
key: key_bytes(DA_VERIFIED_KEY_PREFIX, node_1_blob_0_idx).into(),
value: Bytes::new(),
})
.await
.unwrap();
storage_outbound
.send(nomos_storage::StorageMsg::Store {
key: key_bytes(DA_VERIFIED_KEY_PREFIX, node_1_blob_1_idx).into(),
value: Bytes::new(),
})
.await

View File

@ -74,8 +74,8 @@ fn test_verifier() {
let blobs_dir = TempDir::new().unwrap().path().to_path_buf();
let (node1_sk, node1_pk) = generate_blst_hex_keys();
let (node2_sk, node2_pk) = generate_blst_hex_keys();
let (node1_sk, _) = generate_blst_hex_keys();
let (node2_sk, _) = generate_blst_hex_keys();
let client_zone = new_client(NamedTempFile::new().unwrap().path().to_path_buf());
@ -102,7 +102,8 @@ fn test_verifier() {
vec![node_address(&swarm_config2)],
KzgrsDaVerifierSettings {
sk: node1_sk.clone(),
nodes_public_keys: vec![node1_pk.clone(), node2_pk.clone()],
index: [0].into(),
global_params_path: GLOBAL_PARAMS_PATH.into(),
},
TestDaNetworkSettings {
peer_addresses: peer_addresses.clone(),
@ -125,7 +126,8 @@ fn test_verifier() {
vec![node_address(&swarm_config1)],
KzgrsDaVerifierSettings {
sk: node2_sk,
nodes_public_keys: vec![node1_pk, node2_pk],
index: [1].into(),
global_params_path: GLOBAL_PARAMS_PATH.into(),
},
TestDaNetworkSettings {
peer_addresses,
@ -155,7 +157,10 @@ fn test_verifier() {
// Encode data
let encoder = &ENCODER;
let data = rand_data(10);
let data = vec![
49u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
];
let encoded_data = encoder.encode(&data).unwrap();
let columns: Vec<_> = encoded_data.extended_data.columns().collect();

View File

@ -1,8 +1,12 @@
// std
use core::fmt;
use std::collections::HashSet;
// crates
use blst::{min_sig::PublicKey, min_sig::SecretKey};
use kzgrs_backend::{common::blob::DaBlob, verifier::DaVerifier as NomosKzgrsVerifier};
use blst::min_sig::SecretKey;
use kzgrs_backend::{
common::blob::DaBlob, global::global_parameters_from_file,
verifier::DaVerifier as NomosKzgrsVerifier,
};
use nomos_core::da::DaVerifier;
use serde::{Deserialize, Serialize};
// internal
@ -34,19 +38,10 @@ impl VerifierBackend for KzgrsDaVerifier {
let bytes = hex::decode(settings.sk).expect("Secret key string should decode to bytes");
let secret_key =
SecretKey::from_bytes(&bytes).expect("Secret key should be reconstructed from bytes");
let global_params = global_parameters_from_file(&settings.global_params_path)
.expect("Global parameters has to be loaded from file");
let nodes_public_keys = settings
.nodes_public_keys
.iter()
.map(|pk_hex| {
let pk_bytes =
hex::decode(pk_hex).expect("Public key string should decode to bytes");
PublicKey::from_bytes(&pk_bytes)
.expect("Public key should be reconstructed from bytes")
})
.collect::<Vec<PublicKey>>();
let verifier = NomosKzgrsVerifier::new(secret_key, &nodes_public_keys);
let verifier = NomosKzgrsVerifier::new(secret_key, settings.index, global_params);
Self { verifier }
}
}
@ -71,5 +66,6 @@ impl DaVerifier for KzgrsDaVerifier {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KzgrsDaVerifierSettings {
pub sk: String,
pub nodes_public_keys: Vec<String>,
pub index: HashSet<u32>,
pub global_params_path: String,
}

View File

@ -2,27 +2,27 @@ pub mod backend;
pub mod network;
pub mod storage;
use nomos_core::da::blob::Blob;
// std
use nomos_storage::StorageService;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::fmt::{Debug, Formatter};
use storage::DaStorageAdapter;
use tokio::sync::oneshot::Sender;
// crates
use nomos_core::da::blob::Blob;
use nomos_da_network_service::NetworkService;
use nomos_storage::StorageService;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot::Sender;
use tokio_stream::StreamExt;
use tracing::{error, span, Instrument, Level};
// internal
use backend::VerifierBackend;
use network::NetworkAdapter;
use nomos_da_network_service::NetworkService;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use storage::DaStorageAdapter;
const DA_VERIFIER_TAG: ServiceId = "DA-Verifier";
pub enum DaVerifierMsg<B, A> {
@ -75,7 +75,11 @@ where
storage_adapter: &S,
blob: &Backend::DaBlob,
) -> Result<(), DynError> {
if storage_adapter.get_blob(blob.id()).await?.is_some() {
if storage_adapter
.get_blob(blob.id(), blob.column_idx())
.await?
.is_some()
{
Ok(())
} else {
verifier.verify(blob)?;

View File

@ -83,8 +83,10 @@ where
async fn get_blob(
&self,
blob_idx: <Self::Blob as Blob>::BlobId,
blob_id: <Self::Blob as Blob>::BlobId,
column_idx: <Self::Blob as Blob>::ColumnIndex,
) -> Result<Option<Self::Attestation>, DynError> {
let blob_idx = create_blob_idx(blob_id.as_ref(), column_idx.as_ref());
let key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_idx);
let (reply_channel, reply_rx) = tokio::sync::oneshot::channel();
self.storage_relay

View File

@ -28,5 +28,6 @@ pub trait DaStorageAdapter {
async fn get_blob(
&self,
blob_id: <Self::Blob as Blob>::BlobId,
column_idx: <Self::Blob as Blob>::ColumnIndex,
) -> Result<Option<Self::Attestation>, DynError>;
}

View File

@ -6,11 +6,12 @@ use async_trait::async_trait;
use bytes::Bytes;
pub use rocksdb::Error;
use rocksdb::{Options, DB};
use serde::{Deserialize, Serialize};
// internal
use super::{StorageBackend, StorageSerde, StorageTransaction};
/// Rocks backend setting
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RocksBackendSettings {
/// File path to the db file
pub db_path: PathBuf,

View File

@ -22,6 +22,7 @@ nomos-da-network-service = { path = "../nomos-services/data-availability/network
nomos-da-indexer = { path = "../nomos-services/data-availability/indexer" }
nomos-da-verifier = { path = "../nomos-services/data-availability/verifier" }
nomos-da-sampling = { path = "../nomos-services/data-availability/sampling" }
nomos-storage = { path = "../nomos-services/storage" }
subnetworks-assignations = { path = "../nomos-da/network/subnetworks-assignations" }
full-replication = { path = "../nomos-da/full-replication" }
hex = "0.4.3"

Binary file not shown.

View File

@ -17,6 +17,16 @@ use rand::{thread_rng, Rng};
static NET_PORT: Lazy<Mutex<u16>> = Lazy::new(|| Mutex::new(thread_rng().gen_range(8000..10000)));
static IS_SLOW_TEST_ENV: Lazy<bool> =
Lazy::new(|| env::var("SLOW_TEST_ENV").is_ok_and(|s| s == "true"));
pub static GLOBAL_PARAMS_PATH: Lazy<String> = Lazy::new(|| {
let relative_path = "./kzgrs/kzgrs_test_params";
let current_dir = env::current_dir().expect("Failed to get current directory");
current_dir
.join(relative_path)
.canonicalize()
.expect("Failed to resolve absolute path")
.to_string_lossy()
.to_string()
});
pub fn get_available_port() -> u16 {
let mut port = NET_PORT.lock().unwrap();
@ -49,8 +59,12 @@ pub trait Node: Sized {
}
fn node_configs(config: SpawnConfig) -> Vec<Config> {
match config {
SpawnConfig::Star { consensus, da } => {
let mut configs = Self::create_node_configs(consensus, da);
SpawnConfig::Star {
consensus,
da,
test,
} => {
let mut configs = Self::create_node_configs(consensus, da, test);
let next_leader_config = configs.remove(0);
let first_node_addr = node_address(&next_leader_config);
let mut node_configs = vec![next_leader_config];
@ -64,8 +78,12 @@ pub trait Node: Sized {
}
node_configs
}
SpawnConfig::Chain { consensus, da } => {
let mut configs = Self::create_node_configs(consensus, da);
SpawnConfig::Chain {
consensus,
da,
test,
} => {
let mut configs = Self::create_node_configs(consensus, da, test);
let next_leader_config = configs.remove(0);
let mut prev_node_addr = node_address(&next_leader_config);
let mut node_configs = vec![next_leader_config];
@ -79,7 +97,11 @@ pub trait Node: Sized {
}
}
}
fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Config>;
fn create_node_configs(
consensus: ConsensusConfig,
da: DaConfig,
test: TestConfig,
) -> Vec<Config>;
async fn consensus_info(&self) -> Self::ConsensusInfo;
fn stop(&mut self);
}
@ -90,17 +112,19 @@ pub enum SpawnConfig {
Star {
consensus: ConsensusConfig,
da: DaConfig,
test: TestConfig,
},
// Chain topology: Every node is chained to the node next to it.
Chain {
consensus: ConsensusConfig,
da: DaConfig,
test: TestConfig,
},
}
impl SpawnConfig {
// Returns a SpawnConfig::Chain with proper configurations for happy-path tests
pub fn chain_happy(n_participants: usize, da: DaConfig) -> Self {
pub fn chain_happy(n_participants: usize, da: DaConfig, test: TestConfig) -> Self {
Self::Chain {
consensus: ConsensusConfig {
n_participants,
@ -112,10 +136,11 @@ impl SpawnConfig {
active_slot_coeff: 0.9,
},
da,
test,
}
}
pub fn star_happy(n_participants: usize, da: DaConfig) -> Self {
pub fn star_happy(n_participants: usize, da: DaConfig, test: TestConfig) -> Self {
Self::Star {
consensus: ConsensusConfig {
n_participants,
@ -127,6 +152,7 @@ impl SpawnConfig {
active_slot_coeff: 0.9,
},
da,
test,
}
}
}
@ -154,6 +180,7 @@ pub struct DaConfig {
pub num_subnets: u16,
pub old_blobs_check_interval: Duration,
pub blobs_validity_duration: Duration,
pub global_params_path: String,
}
impl Default for DaConfig {
@ -165,7 +192,21 @@ impl Default for DaConfig {
num_samples: 1,
num_subnets: 2,
old_blobs_check_interval: Duration::from_secs(5),
blobs_validity_duration: Duration::from_secs(15),
blobs_validity_duration: Duration::from_secs(u64::MAX),
global_params_path: GLOBAL_PARAMS_PATH.to_string(),
}
}
}
#[derive(Clone)]
pub struct TestConfig {
pub wait_online_secs: u64,
}
impl Default for TestConfig {
fn default() -> Self {
Self {
wait_online_secs: 10,
}
}
}

View File

@ -1,11 +1,10 @@
// std
use std::net::SocketAddr;
use std::ops::Range;
use std::process::{Child, Command, Stdio};
use std::str::FromStr;
use std::time::Duration;
// internal
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node};
// crates
use blst::min_sig::SecretKey;
use cl::{InputWitness, NoteWitness, NullifierSecret};
use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig};
@ -23,7 +22,9 @@ use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as Indexe
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackendSettings;
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings;
use nomos_da_sampling::DaSamplingServiceSettings;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
use nomos_da_verifier::DaVerifierServiceSettings;
@ -34,21 +35,25 @@ use nomos_mempool::MempoolMetrics;
use nomos_network::backends::libp2p::mixnet::MixnetConfig;
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::{api::AxumBackendSettings, Config, Tx};
// crates
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::DaSamplingServiceSettings;
use nomos_storage::backends::rocksdb::RocksBackendSettings;
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng};
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use subnetworks_assignations::MembershipHandler;
use tempfile::NamedTempFile;
use time::OffsetDateTime;
// internal
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, DaConfig, Node, TestConfig};
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const CRYPTARCHIA_INFO_API: &str = "cryptarchia/info";
const GET_HEADERS_INFO: &str = "cryptarchia/headers";
const NOMOS_BIN: &str = "../target/debug/nomos-node";
const STORAGE_BLOCKS_API: &str = "storage/block";
const INDEXER_RANGE_API: &str = "da/get_range";
const DEFAULT_SLOT_TIME: u64 = 2;
const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME";
#[cfg(feature = "mixnet")]
@ -81,6 +86,7 @@ impl NomosNode {
let dir = create_tempdir().unwrap();
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
let wait_online_secs = config.wait_online_secs;
// setup logging so that we can intercept it later in testing
config.log.backend = LoggerBackend::File {
@ -89,6 +95,17 @@ impl NomosNode {
};
config.log.format = LoggerFormat::Json;
config.storage.db_path = dir.path().join("db");
config
.da_sampling
.storage_adapter_settings
.blob_storage_directory = dir.path().to_owned();
config
.da_verifier
.storage_adapter_settings
.blob_storage_directory = dir.path().to_owned();
config.da_indexer.storage.blob_storage_directory = dir.path().to_owned();
serde_yaml::to_writer(&mut file, &config).unwrap();
let child = Command::new(std::env::current_dir().unwrap().join(NOMOS_BIN))
.arg(&config_path)
@ -102,9 +119,10 @@ impl NomosNode {
_tempdir: dir,
config,
};
tokio::time::timeout(adjust_timeout(Duration::from_secs(10)), async {
node.wait_online().await
})
tokio::time::timeout(
adjust_timeout(Duration::from_secs(wait_online_secs)),
async { node.wait_online().await },
)
.await
.unwrap();
@ -164,6 +182,23 @@ impl NomosNode {
}
}
pub async fn get_indexer_range(
&self,
app_id: [u8; 32],
range: Range<[u8; 8]>,
) -> Vec<([u8; 8], Vec<Vec<u8>>)> {
CLIENT
.post(format!("http://{}/{}", self.addr, INDEXER_RANGE_API))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&GetRangeReq { app_id, range }).unwrap())
.send()
.await
.unwrap()
.json::<Vec<([u8; 8], Vec<Vec<u8>>)>>()
.await
.unwrap()
}
// not async so that we can use this in `Drop`
pub fn get_logs_from_file(&self) -> String {
println!(
@ -231,7 +266,11 @@ impl Node for NomosNode {
/// so the leader can receive votes from all other nodes that will be subsequently spawned.
/// If not, the leader will miss votes from nodes spawned before itself.
/// This issue will be resolved by devising the block catch-up mechanism in the future.
fn create_node_configs(consensus: ConsensusConfig, da: DaConfig) -> Vec<Config> {
fn create_node_configs(
consensus: ConsensusConfig,
da: DaConfig,
test: TestConfig,
) -> Vec<Config> {
// we use the same random bytes for:
// * da id
// * coin sk
@ -290,6 +329,7 @@ impl Node for NomosNode {
vec![coin],
time_config.clone(),
da.clone(),
test.wait_online_secs,
#[cfg(feature = "mixnet")]
MixnetConfig {
mixclient: mixclient_config.clone(),
@ -305,8 +345,12 @@ impl Node for NomosNode {
peer_ids.extend(da.executor_peer_ids);
for config in &mut configs {
config.da_network.backend.membership =
let membership =
FillFromNodeList::new(&peer_ids, da.subnetwork_size, da.dispersal_factor);
let local_peer_id = secret_key_to_peer_id(config.da_network.backend.node_key.clone());
let subnetwork_ids = membership.membership(&local_peer_id);
config.da_verifier.verifier_settings.index = subnetwork_ids;
config.da_network.backend.membership = membership;
config.da_network.backend.addresses = peer_addresses.clone();
}
@ -335,6 +379,12 @@ pub enum Pool {
Cl,
}
#[derive(Serialize, Deserialize)]
struct GetRangeReq {
pub app_id: [u8; 32],
pub range: Range<[u8; 8]>,
}
#[cfg(feature = "mixnet")]
fn create_mixnet_config(ids: &[[u8; 32]]) -> (MixClientConfig, Vec<MixNodeConfig>) {
use std::num::NonZeroU8;
@ -401,6 +451,7 @@ fn build_da_peer_list(configs: &[Config]) -> Vec<(PeerId, Multiaddr)> {
.collect()
}
#[allow(clippy::too_many_arguments)]
fn create_node_config(
id: [u8; 32],
genesis_state: LedgerState,
@ -408,12 +459,12 @@ fn create_node_config(
notes: Vec<InputWitness>,
time: TimeConfig,
da_config: DaConfig,
wait_online_secs: u64,
#[cfg(feature = "mixnet")] mixnet_config: MixnetConfig,
) -> Config {
let swarm_config: SwarmConfig = Default::default();
let verifier_sk = SecretKey::key_gen(&id, &[]).unwrap();
let verifier_pk_bytes = verifier_sk.sk_to_pk().to_bytes();
let verifier_sk_bytes = verifier_sk.to_bytes();
let mut config = Config {
@ -453,7 +504,8 @@ fn create_node_config(
da_verifier: DaVerifierServiceSettings {
verifier_settings: KzgrsDaVerifierSettings {
sk: hex::encode(verifier_sk_bytes),
nodes_public_keys: vec![hex::encode(verifier_pk_bytes)],
index: Default::default(),
global_params_path: da_config.global_params_path,
},
network_adapter_settings: (),
storage_adapter_settings: VerifierStorageAdapterSettings {
@ -481,6 +533,12 @@ fn create_node_config(
},
network_adapter_settings: (),
},
storage: RocksBackendSettings {
db_path: "./db".into(),
read_only: false,
column_family: Some("blocks".into()),
},
wait_online_secs,
};
config.network.backend.inner.port = get_available_port();

View File

@ -7,12 +7,13 @@ use nomos_libp2p::libp2p;
use nomos_libp2p::Multiaddr;
use nomos_libp2p::PeerId;
use std::collections::HashMap;
use std::io::Write;
use std::time::Duration;
use subnetworks_assignations::versions::v1::FillFromNodeList;
use tempfile::NamedTempFile;
use tests::nodes::NomosNode;
use tests::Node;
use tests::SpawnConfig;
use tests::GLOBAL_PARAMS_PATH;
const CLI_BIN: &str = "../target/debug/nomos-cli";
const APP_ID: &str = "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715";
@ -32,8 +33,12 @@ fn run_disseminate(disseminate: &Disseminate) {
.arg(disseminate.columns.to_string())
.arg("--timeout")
.arg(disseminate.timeout.to_string())
.arg("--wait-until-disseminated")
.arg(disseminate.wait_until_disseminated.to_string())
.arg("--node-addr")
.arg(disseminate.node_addr.as_ref().unwrap().as_str());
.arg(disseminate.node_addr.as_ref().unwrap().as_str())
.arg("--global-params-path")
.arg(GLOBAL_PARAMS_PATH.to_string());
match (&disseminate.data, &disseminate.file) {
(Some(data), None) => c.args(["--data", &data]),
@ -98,10 +103,11 @@ async fn disseminate(nodes: &Vec<NomosNode>, config: &mut Disseminate) {
}
#[tokio::test]
async fn disseminate_blob() {
async fn disseminate_and_retrieve() {
let mut config = Disseminate {
data: Some("hello world".to_string()),
timeout: 180,
timeout: 60,
wait_until_disseminated: 5,
app_id: APP_ID.into(),
index: 0,
columns: 2,
@ -112,64 +118,48 @@ async fn disseminate_blob() {
2,
tests::DaConfig {
dispersal_factor: 2,
subnetwork_size: 2,
num_subnets: 2,
..Default::default()
},
tests::TestConfig {
wait_online_secs: 50,
},
))
.await;
disseminate(&nodes, &mut config).await;
}
#[tokio::test]
async fn disseminate_big_blob() {
const MSG_SIZE: usize = 1024;
let mut config = Disseminate {
data: std::iter::repeat(String::from("X"))
.take(MSG_SIZE)
.collect::<Vec<_>>()
.join("")
.into(),
timeout: 180,
app_id: APP_ID.into(),
index: 0,
columns: 2,
..Default::default()
};
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
2,
tests::DaConfig {
dispersal_factor: 2,
..Default::default()
},
))
.await;
disseminate(&nodes, &mut config).await;
}
#[tokio::test]
async fn disseminate_blob_from_file() {
let mut file = NamedTempFile::new().unwrap();
file.write_all("hello world".as_bytes()).unwrap();
let mut config = Disseminate {
file: Some(file.path().to_path_buf()),
timeout: 180,
app_id: APP_ID.into(),
index: 0,
columns: 2,
..Default::default()
};
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
4,
tests::DaConfig {
dispersal_factor: 2,
..Default::default()
},
))
.await;
disseminate(&nodes, &mut config).await;
tokio::time::sleep(Duration::from_secs(10)).await;
let from = 0u64.to_be_bytes();
let to = 1u64.to_be_bytes();
let app_id = hex::decode(APP_ID).unwrap();
let node1_blobs = nodes[0]
.get_indexer_range(app_id.clone().try_into().unwrap(), from..to)
.await;
let node2_blobs = nodes[1]
.get_indexer_range(app_id.try_into().unwrap(), from..to)
.await;
let node1_idx_0_blobs: Vec<_> = node1_blobs
.iter()
.filter(|(i, _)| i == &from)
.flat_map(|(_, blobs)| blobs)
.collect();
let node2_idx_0_blobs: Vec<_> = node2_blobs
.iter()
.filter(|(i, _)| i == &from)
.flat_map(|(_, blobs)| blobs)
.collect();
// Index zero shouldn't be empty, node 2 replicated both blobs to node 1 because they both
// are in the same subnetwork.
for b in node1_idx_0_blobs.iter() {
assert!(!b.is_empty())
}
for b in node2_idx_0_blobs.iter() {
assert!(!b.is_empty())
}
}

View File

@ -52,6 +52,11 @@ async fn happy_test(nodes: &[NomosNode]) {
#[tokio::test]
async fn two_nodes_happy() {
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(2, Default::default())).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::star_happy(
2,
Default::default(),
Default::default(),
))
.await;
happy_test(&nodes).await;
}