From a13f8611ee5b1a5645ade640716a6dbe3fbb95ee Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Tue, 3 Sep 2024 17:02:49 +0200 Subject: [PATCH] DA: Consensus sampling (#708) * Add sampling relay to consensus and massage all generics * Pipe in sampling filtering of blob info * Add mark in block * Pipe validate block * Refactor mark_in_block -> mark_complete * Fix generics on tests * Fix generics on tests * Fix rebase * Cargo fmt after rebase * Sampling service configuration * Sampling service config in indexer integration tests --------- Co-authored-by: Gusto --- nodes/nomos-node/Cargo.toml | 3 + nodes/nomos-node/src/api.rs | 168 +++++++++++- nodes/nomos-node/src/config.rs | 1 + nodes/nomos-node/src/lib.rs | 39 ++- nodes/nomos-node/src/main.rs | 1 + nomos-core/src/block/builder.rs | 2 +- .../src/versions/v2.rs | 4 +- nomos-services/api/Cargo.toml | 8 +- .../api/src/http/consensus/cryptarchia.rs | 48 +++- nomos-services/api/src/http/da.rs | 35 ++- .../cryptarchia-consensus/Cargo.toml | 1 + .../cryptarchia-consensus/src/lib.rs | 245 ++++++++++++++++-- .../data-availability/indexer/Cargo.toml | 4 +- .../data-availability/indexer/src/lib.rs | 91 ++++++- .../sampling/src/backend/kzgrs.rs | 33 +-- .../sampling/src/backend/mod.rs | 6 +- .../data-availability/sampling/src/lib.rs | 15 +- .../sampling/src/network/adapters/libp2p.rs | 3 +- .../data-availability/tests/Cargo.toml | 6 +- .../data-availability/tests/src/common.rs | 12 + .../tests/src/indexer_integration.rs | 21 ++ tests/Cargo.toml | 1 + tests/src/nodes/nomos.rs | 16 ++ 23 files changed, 678 insertions(+), 85 deletions(-) diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index c1c922de..940ab309 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -22,6 +22,7 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d tracing = "0.1" multiaddr = "0.18" nomos-core = { path = "../../nomos-core" } +nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling" } nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] } nomos-da-indexer = { path = "../../nomos-services/data-availability/indexer", features = ["rocksdb-backend"] } nomos-da-network-service = { path = "../../nomos-services/data-availability/network" } @@ -40,6 +41,8 @@ nomos-system-sig = { path = "../../nomos-services/system-sig" } tracing-subscriber = "0.3" cryptarchia-engine = { path = "../../consensus/cryptarchia-engine" } cryptarchia-ledger = { path = "../../ledger/cryptarchia-ledger" } +rand = "0.8" +rand_chacha = "0.3" tokio = { version = "1.24", features = ["sync"] } serde_json = "1.0" serde_yaml = "0.9" diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index 5492ea81..80a45571 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -21,6 +21,7 @@ use nomos_core::da::blob::info::DispersedBlobInfo; use nomos_core::da::blob::metadata::Metadata; use nomos_core::da::DaVerifier as CoreDaVerifier; use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction}; +use nomos_da_sampling::backend::DaSamplingServiceBackend; use nomos_da_verifier::backend::VerifierBackend; use nomos_mempool::{ network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, @@ -29,6 +30,7 @@ use nomos_mempool::{ use nomos_network::backends::libp2p::Libp2p as NetworkBackend; use nomos_storage::backends::StorageSerde; use overwatch_rs::overwatch::handle::OverwatchHandle; +use rand::{RngCore, SeedableRng}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tower_http::{ cors::{Any, CorsLayer}, @@ -47,7 +49,19 @@ pub struct AxumBackendSettings { pub cors_origins: Vec, } -pub struct AxumBackend { +pub struct AxumBackend< + A, + B, + C, + V, + VB, + T, + S, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, +> { settings: AxumBackendSettings, _attestation: core::marker::PhantomData, _blob: core::marker::PhantomData, @@ -56,6 +70,9 @@ pub struct AxumBackend { _verifier_backend: core::marker::PhantomData, _tx: core::marker::PhantomData, _storage_serde: core::marker::PhantomData, + _sampling_backend: core::marker::PhantomData, + _sampling_network_adapter: core::marker::PhantomData, + _sampling_rng: core::marker::PhantomData, } #[derive(OpenApi)] @@ -72,7 +89,32 @@ pub struct AxumBackend { struct ApiDoc; #[async_trait::async_trait] -impl Backend for AxumBackend +impl< + A, + B, + C, + V, + VB, + T, + S, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, + > Backend + for AxumBackend< + A, + B, + C, + V, + VB, + T, + S, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + SIZE, + > where A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static, B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, @@ -119,6 +161,12 @@ where ::Hash: Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static, S: StorageSerde + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore + Send + 'static, + SamplingBackend: DaSamplingServiceBackend + Send + 'static, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static, { type Error = hyper::Error; type Settings = AxumBackendSettings; @@ -136,6 +184,9 @@ where _verifier_backend: core::marker::PhantomData, _tx: core::marker::PhantomData, _storage_serde: core::marker::PhantomData, + _sampling_backend: core::marker::PhantomData, + _sampling_network_adapter: core::marker::PhantomData, + _sampling_rng: core::marker::PhantomData, }) } @@ -166,16 +217,45 @@ where .route("/cl/status", routing::post(cl_status::)) .route( "/cryptarchia/info", - routing::get(cryptarchia_info::), + routing::get( + cryptarchia_info::< + T, + S, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + SIZE, + >, + ), ) .route( "/cryptarchia/headers", - routing::get(cryptarchia_headers::), + routing::get( + cryptarchia_headers::< + T, + S, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + SIZE, + >, + ), ) .route("/da/add_blob", routing::post(add_blob::)) .route( "/da/get_range", - routing::post(get_range::), + routing::post( + get_range::< + T, + C, + V, + S, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + SIZE, + >, + ), ) .route("/network/info", routing::get(libp2p_info)) .route("/storage/block", routing::post(block::)) @@ -263,7 +343,14 @@ struct QueryParams { (status = 500, description = "Internal server error", body = String), ) )] -async fn cryptarchia_info( +async fn cryptarchia_info< + Tx, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, +>( State(handle): State, ) -> Response where @@ -279,8 +366,21 @@ where + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, SS: StorageSerde + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { - make_request_and_return_response!(consensus::cryptarchia_info::(&handle)) + make_request_and_return_response!(consensus::cryptarchia_info::< + Tx, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + SIZE, + >(&handle)) } #[utoipa::path( @@ -291,7 +391,14 @@ where (status = 500, description = "Internal server error", body = String), ) )] -async fn cryptarchia_headers( +async fn cryptarchia_headers< + Tx, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, +>( State(store): State, Query(query): Query, ) -> Response @@ -308,11 +415,22 @@ where + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, SS: StorageSerde + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { let QueryParams { from, to } = query; - make_request_and_return_response!(consensus::cryptarchia_headers::( - &store, from, to - )) + make_request_and_return_response!(consensus::cryptarchia_headers::< + Tx, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + SIZE, + >(&store, from, to)) } #[utoipa::path( @@ -358,7 +476,16 @@ where (status = 500, description = "Internal server error", body = String), ) )] -async fn get_range( +async fn get_range< + Tx, + C, + V, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, +>( State(handle): State, Json(GetRangeReq { app_id, range }): Json>, ) -> Response @@ -400,8 +527,23 @@ where ::Index: AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync, SS: StorageSerde + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { - make_request_and_return_response!(da::get_range::(&handle, app_id, range)) + make_request_and_return_response!(da::get_range::< + Tx, + C, + V, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + SIZE, + >(&handle, app_id, range)) } #[utoipa::path( diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 676c822b..e06b82be 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -123,6 +123,7 @@ pub struct Config { > as ServiceData>::Settings, pub da_indexer: ::Settings, pub da_verifier: ::Settings, + pub da_sampling: ::Settings, pub http: ::Settings, pub cryptarchia: ::Settings, } diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 03d49b5b..f173d98c 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -21,6 +21,9 @@ use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorage use nomos_da_indexer::DataIndexerService; use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend; use nomos_da_network_service::NetworkService as DaNetworkService; +use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend; +use nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter; +use nomos_da_sampling::DaSamplingService; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier; use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter as VerifierNetworkAdapter; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter; @@ -41,13 +44,30 @@ use nomos_storage::{ use nomos_system_sig::SystemSig; use overwatch_derive::*; use overwatch_rs::services::handle::ServiceHandle; +use rand_chacha::ChaCha20Rng; use serde::{de::DeserializeOwned, Serialize}; use subnetworks_assignations::versions::v1::FillFromNodeList; // internal pub use tx::Tx; -pub type NomosApiService = - ApiService>; +/// Membership used by the DA Network service. +pub type NomosDaMembership = FillFromNodeList; + +pub type NomosApiService = ApiService< + AxumBackend< + (), + DaBlob, + BlobInfo, + BlobInfo, + KzgrsDaVerifier, + Tx, + Wire, + KzgrsSamplingBackend, + nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + ChaCha20Rng, + MB16, + >, +>; pub const CL_TOPIC: &str = "cl"; pub const DA_TOPIC: &str = "da"; @@ -62,6 +82,9 @@ pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< FillSizeWithTx, FillSizeWithBlobs, RocksBackend, + KzgrsSamplingBackend, + nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + ChaCha20Rng, >; pub type TxMempool = TxMempoolService< @@ -88,6 +111,15 @@ pub type DaIndexer = DataIndexerService< FillSizeWithTx, FillSizeWithBlobs, RocksBackend, + KzgrsSamplingBackend, + nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + ChaCha20Rng, +>; + +pub type DaSampling = DaSamplingService< + KzgrsSamplingBackend, + SamplingLibp2pAdapter, + ChaCha20Rng, >; pub type DaVerifier = DaVerifierService< @@ -103,7 +135,8 @@ pub struct Nomos { network: ServiceHandle>, da_indexer: ServiceHandle, da_verifier: ServiceHandle, - da_network: ServiceHandle>>, + da_sampling: ServiceHandle, + da_network: ServiceHandle>>, cl_mempool: ServiceHandle, da_mempool: ServiceHandle, cryptarchia: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 3268efcf..75c90256 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -92,6 +92,7 @@ fn main() -> Result<()> { }, da_network: config.da_network, da_indexer: config.da_indexer, + da_sampling: config.da_sampling, da_verifier: config.da_verifier, cryptarchia: config.cryptarchia, #[cfg(feature = "metrics")] diff --git a/nomos-core/src/block/builder.rs b/nomos-core/src/block/builder.rs index 4107ac9f..917f26ab 100644 --- a/nomos-core/src/block/builder.rs +++ b/nomos-core/src/block/builder.rs @@ -100,7 +100,7 @@ where } #[must_use] - pub fn with_blobs_certificates( + pub fn with_blobs_info( mut self, blobs_certificates: impl Iterator + 'static, ) -> Self { diff --git a/nomos-da/network/subnetworks-assignations/src/versions/v2.rs b/nomos-da/network/subnetworks-assignations/src/versions/v2.rs index d60ce2e1..1babfedc 100644 --- a/nomos-da/network/subnetworks-assignations/src/versions/v2.rs +++ b/nomos-da/network/subnetworks-assignations/src/versions/v2.rs @@ -1,7 +1,9 @@ use crate::MembershipHandler; use libp2p_identity::PeerId; +use serde::{Deserialize, Serialize}; use std::collections::HashSet; +#[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct FillWithOriginalReplication { pub assignations: Vec>, pub subnetwork_size: usize, @@ -63,7 +65,7 @@ impl FillWithOriginalReplication { } impl MembershipHandler for FillWithOriginalReplication { - type NetworkId = u16; + type NetworkId = u32; type Id = PeerId; fn membership(&self, id: &Self::Id) -> HashSet { diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index 883be1e6..3f4acdc9 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -17,17 +17,19 @@ nomos-core = { path = "../../nomos-core" } cryptarchia-consensus = { path = "../cryptarchia-consensus" } nomos-network = { path = "../../nomos-services/network" } nomos-mempool = { path = "../../nomos-services/mempool", features = [ - "mock", - "libp2p", - "openapi", + "mock", + "libp2p", + "openapi", ] } nomos-metrics = { path = "../../nomos-services/metrics" } nomos-da-indexer = { path = "../data-availability/indexer", features = ["rocksdb-backend"] } +nomos-da-sampling = { path = "../data-availability/sampling" } nomos-da-verifier = { path = "../data-availability/verifier", features = ["rocksdb-backend", "libp2p"] } nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } nomos-libp2p = { path = "../../nomos-libp2p" } full-replication = { path = "../../nomos-da/full-replication" } kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" } +rand = "0.8" serde = { version = "1", features = ["derive"] } tokio = { version = "1.33", default-features = false, features = ["sync"] } diff --git a/nomos-services/api/src/http/consensus/cryptarchia.rs b/nomos-services/api/src/http/consensus/cryptarchia.rs index 71392982..00c56f34 100644 --- a/nomos-services/api/src/http/consensus/cryptarchia.rs +++ b/nomos-services/api/src/http/consensus/cryptarchia.rs @@ -1,6 +1,7 @@ use std::{fmt::Debug, hash::Hash}; use overwatch_rs::overwatch::handle::OverwatchHandle; +use rand::{RngCore, SeedableRng}; use serde::{de::DeserializeOwned, Serialize}; use tokio::sync::oneshot; @@ -15,12 +16,20 @@ use nomos_core::{ header::HeaderId, tx::{select::FillSize as FillSizeWithTx, Transaction}, }; +use nomos_da_sampling::backend::DaSamplingServiceBackend; use nomos_mempool::{ backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, }; use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde}; -pub type Cryptarchia = CryptarchiaConsensus< +pub type Cryptarchia< + Tx, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, +> = CryptarchiaConsensus< ConsensusNetworkAdapter, MockPool::Hash>, MempoolNetworkAdapter::Hash>, @@ -29,9 +38,19 @@ pub type Cryptarchia = CryptarchiaConsensus< FillSizeWithTx, FillSizeWithBlobs, RocksBackend, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, >; -pub async fn cryptarchia_info( +pub async fn cryptarchia_info< + Tx, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, +>( handle: &OverwatchHandle, ) -> Result where @@ -47,9 +66,15 @@ where + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, SS: StorageSerde + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { let relay = handle - .relay::>() + .relay::>() .connect() .await?; let (sender, receiver) = oneshot::channel(); @@ -61,7 +86,14 @@ where Ok(receiver.await?) } -pub async fn cryptarchia_headers( +pub async fn cryptarchia_headers< + Tx, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, +>( handle: &OverwatchHandle, from: Option, to: Option, @@ -79,9 +111,15 @@ where + 'static, ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, SS: StorageSerde + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { let relay = handle - .relay::>() + .relay::>() .connect() .await?; let (sender, receiver) = oneshot::channel(); diff --git a/nomos-services/api/src/http/da.rs b/nomos-services/api/src/http/da.rs index e0eee7d1..e8f43ed7 100644 --- a/nomos-services/api/src/http/da.rs +++ b/nomos-services/api/src/http/da.rs @@ -10,6 +10,7 @@ use nomos_da_indexer::DaMsg; use nomos_da_indexer::{ consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter, DataIndexerService, }; +use nomos_da_sampling::backend::DaSamplingServiceBackend; use nomos_da_verifier::backend::VerifierBackend; use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter; @@ -20,6 +21,7 @@ use nomos_storage::backends::rocksdb::RocksBackend; use nomos_storage::backends::StorageSerde; use overwatch_rs::overwatch::handle::OverwatchHandle; use overwatch_rs::DynError; +use rand::{RngCore, SeedableRng}; use serde::de::DeserializeOwned; use serde::Serialize; use std::error::Error; @@ -27,7 +29,16 @@ use std::fmt::Debug; use std::hash::Hash; use tokio::sync::oneshot; -pub type DaIndexer = DataIndexerService< +pub type DaIndexer< + Tx, + C, + V, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, +> = DataIndexerService< // Indexer specific. Bytes, IndexerStorageAdapter, @@ -41,6 +52,9 @@ pub type DaIndexer = DataIndexerService< FillSizeWithTx, FillSizeWithBlobs, RocksBackend, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, >; pub type DaVerifier = @@ -73,7 +87,16 @@ where Ok(receiver.await?) } -pub async fn get_range( +pub async fn get_range< + Tx, + C, + V, + SS, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + const SIZE: usize, +>( handle: &OverwatchHandle, app_id: ::AppId, range: Range<::Index>, @@ -116,9 +139,15 @@ where ::Index: AsRef<[u8]> + Serialize + DeserializeOwned + Clone + PartialOrd + Send + Sync, SS: StorageSerde + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { let relay = handle - .relay::>() + .relay::>() .connect() .await?; let (sender, receiver) = oneshot::channel(); diff --git a/nomos-services/cryptarchia-consensus/Cargo.toml b/nomos-services/cryptarchia-consensus/Cargo.toml index 9d24dde0..00362dd3 100644 --- a/nomos-services/cryptarchia-consensus/Cargo.toml +++ b/nomos-services/cryptarchia-consensus/Cargo.toml @@ -12,6 +12,7 @@ chrono = "0.4" cryptarchia-engine = { path = "../../consensus/cryptarchia-engine", features = ["serde"] } cryptarchia-ledger = { path = "../../ledger/cryptarchia-ledger", features = ["serde"] } futures = "0.3" +nomos-da-sampling = { path = "../data-availability/sampling" } nomos-network = { path = "../network" } nomos-mempool = { path = "../mempool" } nomos-core = { path = "../../nomos-core" } diff --git a/nomos-services/cryptarchia-consensus/src/lib.rs b/nomos-services/cryptarchia-consensus/src/lib.rs index 0f24e22c..7e86bdc9 100644 --- a/nomos-services/cryptarchia-consensus/src/lib.rs +++ b/nomos-services/cryptarchia-consensus/src/lib.rs @@ -16,6 +16,8 @@ use nomos_core::{ block::{builder::BlockBuilder, Block}, header::cryptarchia::Builder, }; +use nomos_da_sampling::backend::DaSamplingServiceBackend; +use nomos_da_sampling::{DaSamplingService, DaSamplingServiceMsg}; use nomos_mempool::{ backend::MemPool, network::NetworkAdapter as MempoolAdapter, DaMempoolService, MempoolMsg, TxMempoolService, @@ -29,8 +31,11 @@ use overwatch_rs::services::{ state::{NoOperator, NoState}, ServiceCore, ServiceData, ServiceId, }; +use overwatch_rs::DynError; +use rand::{RngCore, SeedableRng}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_with::serde_as; +use std::collections::BTreeSet; use std::hash::Hash; use thiserror::Error; pub use time::Config as TimeConfig; @@ -41,6 +46,7 @@ use tracing::{error, instrument, span, Level}; use tracing_futures::Instrument; type MempoolRelay = OutboundRelay>; +type SamplingRelay = OutboundRelay>; // Limit the number of blocks returned by GetHeaders const HEADERS_LIMIT: usize = 512; @@ -134,8 +140,19 @@ impl CryptarchiaSettings { } } -pub struct CryptarchiaConsensus -where +pub struct CryptarchiaConsensus< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + Storage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, +> where A: NetworkAdapter, ClPoolAdapter: MempoolAdapter, ClPool: MemPool, @@ -150,6 +167,12 @@ where TxS: TxSelect, BS: BlobSelect, Storage: StorageBackend + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { service_state: ServiceStateHandle, // underlying networking backend. We need this so we can relay and check the types properly @@ -157,12 +180,37 @@ where network_relay: Relay>, cl_mempool_relay: Relay>, da_mempool_relay: Relay>, + sampling_relay: Relay>, block_subscription_sender: broadcast::Sender>, storage_relay: Relay>, } -impl ServiceData - for CryptarchiaConsensus +impl< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + Storage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + > ServiceData + for CryptarchiaConsensus< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + Storage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + > where A: NetworkAdapter, ClPool: MemPool, @@ -177,6 +225,12 @@ where TxS: TxSelect, BS: BlobSelect, Storage: StorageBackend + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { const SERVICE_ID: ServiceId = CRYPTARCHIA_ID; type Settings = CryptarchiaSettings; @@ -186,8 +240,32 @@ where } #[async_trait::async_trait] -impl ServiceCore - for CryptarchiaConsensus +impl< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + Storage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + > ServiceCore + for CryptarchiaConsensus< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + Storage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + > where A: NetworkAdapter + Clone @@ -196,7 +274,7 @@ where + 'static, ClPool: MemPool + Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static, - DaPool: MemPool + Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, DaPool::Settings: Send + Sync + 'static, ClPool::Item: Transaction + Debug @@ -221,7 +299,6 @@ where + Sync + 'static, ClPool::Key: Debug + Send + Sync, - DaPool::Key: Debug + Send + Sync, ClPoolAdapter: MempoolAdapter + Send + Sync + 'static, DaPoolAdapter: MempoolAdapter + Send + Sync + 'static, @@ -231,12 +308,20 @@ where BS: BlobSelect + Clone + Send + Sync + 'static, BS::Settings: Send + Sync + 'static, Storage: StorageBackend + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + Send + 'static, + SamplingBackend::BlobId: Debug + Ord + Send + Sync + 'static, + + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); let cl_mempool_relay = service_state.overwatch_handle.relay(); let da_mempool_relay = service_state.overwatch_handle.relay(); let storage_relay = service_state.overwatch_handle.relay(); + let sampling_relay = service_state.overwatch_handle.relay(); let (block_subscription_sender, _) = broadcast::channel(16); Ok(Self { service_state, @@ -245,6 +330,7 @@ where da_mempool_relay, block_subscription_sender, storage_relay, + sampling_relay, }) } @@ -273,6 +359,12 @@ where .await .expect("Relay connection with StorageService should succeed"); + let sampling_relay: OutboundRelay<_> = self + .sampling_relay + .connect() + .await + .expect("Relay connection with SamplingService should succeed"); + let CryptarchiaSettings { config, genesis_state, @@ -317,6 +409,7 @@ where storage_relay.clone(), cl_mempool_relay.clone(), da_mempool_relay.clone(), + sampling_relay.clone(), &mut self.block_subscription_sender ) .await; @@ -341,6 +434,7 @@ where blob_selector.clone(), cl_mempool_relay.clone(), da_mempool_relay.clone(), + sampling_relay.clone(), ).await; if let Some(block) = block { @@ -367,14 +461,36 @@ where } } -impl - CryptarchiaConsensus +impl< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + Storage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + > + CryptarchiaConsensus< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + Storage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + > where A: NetworkAdapter + Clone + Send + Sync + 'static, ClPool: MemPool + Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static, - DaPool: MemPool + Send + Sync + 'static, - DaPool::Settings: Send + Sync + 'static, ClPool::Item: Transaction + Debug + Clone @@ -396,15 +512,22 @@ where + Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, + DaPool::Settings: Send + Sync + 'static, TxS: TxSelect + Clone + Send + Sync + 'static, BS: BlobSelect + Clone + Send + Sync + 'static, ClPool::Key: Debug + Send + Sync, - DaPool::Key: Debug + Send + Sync, ClPoolAdapter: MempoolAdapter + Send + Sync + 'static, DaPoolAdapter: MempoolAdapter + Send + Sync + 'static, DaPoolAdapter::Payload: DispersedBlobInfo + Into + Debug, Storage: StorageBackend + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + Ord + Send + Sync + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { async fn should_stop_service(message: LifecycleMessage) -> bool { match message { @@ -479,7 +602,14 @@ where #[allow(clippy::type_complexity, clippy::too_many_arguments)] #[instrument( level = "debug", - skip(cryptarchia, storage_relay, cl_mempool_relay, da_mempool_relay, leader) + skip( + cryptarchia, + storage_relay, + cl_mempool_relay, + da_mempool_relay, + sampling_relay, + leader + ) )] async fn process_block( mut cryptarchia: Cryptarchia, @@ -492,6 +622,7 @@ where da_mempool_relay: OutboundRelay< MempoolMsg, >, + sampling_relay: SamplingRelay, block_broadcaster: &mut broadcast::Sender>, ) -> Cryptarchia { tracing::debug!("received proposal {:?}", block); @@ -500,6 +631,18 @@ where let header = block.header().cryptarchia(); let id = header.id(); + let sampled_blobs = match get_sampled_blobs(sampling_relay.clone()).await { + Ok(sampled_blobs) => sampled_blobs, + Err(error) => { + error!("Unable to retrieved sampled blobs: {error}"); + return cryptarchia; + } + }; + if !Self::validate_block(&block, &sampled_blobs) { + error!("Invalid block: {block:?}"); + return cryptarchia; + } + match cryptarchia.try_apply_header(header) { Ok(new_state) => { // update leader @@ -512,7 +655,6 @@ where id, ) .await; - mark_in_block( da_mempool_relay, block.blobs().map(DispersedBlobInfo::blob_id), @@ -520,6 +662,12 @@ where ) .await; + mark_blob_in_block( + sampling_relay, + block.blobs().map(DispersedBlobInfo::blob_id).collect(), + ) + .await; + // store block let msg = >::new_store_message(header.id(), block.clone()); if let Err((e, _msg)) = storage_relay.send(msg).await { @@ -544,7 +692,13 @@ where #[instrument( level = "debug", - skip(cl_mempool_relay, da_mempool_relay, tx_selector, blob_selector) + skip( + cl_mempool_relay, + da_mempool_relay, + sampling_relay, + tx_selector, + blob_selector + ) )] async fn propose_block( parent: HeaderId, @@ -553,17 +707,20 @@ where blob_selector: BS, cl_mempool_relay: MempoolRelay, da_mempool_relay: MempoolRelay, + sampling_relay: SamplingRelay, ) -> Option> { let mut output = None; let cl_txs = get_mempool_contents(cl_mempool_relay); let da_certs = get_mempool_contents(da_mempool_relay); - - match futures::join!(cl_txs, da_certs) { - (Ok(cl_txs), Ok(da_certs)) => { + let blobs_ids = get_sampled_blobs(sampling_relay); + match futures::join!(cl_txs, da_certs, blobs_ids) { + (Ok(cl_txs), Ok(da_blobs_info), Ok(blobs_ids)) => { let Ok(block) = BlockBuilder::new(tx_selector, blob_selector) .with_cryptarchia_builder(Builder::new(parent, proof)) .with_transactions(cl_txs) - .with_blobs_certificates(da_certs) + .with_blobs_info( + da_blobs_info.filter(move |info| blobs_ids.contains(&info.blob_id())), + ) .build() else { panic!("Proposal block should always succeed to be built") @@ -571,11 +728,30 @@ where tracing::debug!("proposed block with id {:?}", block.header().id()); output = Some(block); } - (Err(_), _) => tracing::error!("Could not fetch block cl transactions"), - (_, Err(_)) => tracing::error!("Could not fetch block da certificates"), + (tx_error, da_certificate_error, blobs_error) => { + if let Err(_tx_error) = tx_error { + tracing::error!("Could not fetch block cl transactions"); + } + if let Err(_da_certificate_error) = da_certificate_error { + tracing::error!("Could not fetch block da certificates"); + } + if let Err(_blobs_error) = blobs_error { + tracing::error!("Could not fetch block da blobs"); + } + } } output } + + fn validate_block( + block: &Block, + sampled_blobs_ids: &BTreeSet, + ) -> bool { + let validated_blobs = block + .blobs() + .all(|blob| sampled_blobs_ids.contains(&blob.blob_id())); + validated_blobs + } } #[derive(Debug)] @@ -633,3 +809,28 @@ async fn mark_in_block( .await .unwrap_or_else(|(e, _)| tracing::error!("Could not mark items in block: {e}")) } + +async fn mark_blob_in_block( + sampling_relay: SamplingRelay, + blobs_id: Vec, +) { + if let Err((_e, DaSamplingServiceMsg::MarkInBlock { blobs_id })) = sampling_relay + .send(DaSamplingServiceMsg::MarkInBlock { blobs_id }) + .await + { + error!("Error marking in block for blobs ids: {blobs_id:?}"); + } +} + +async fn get_sampled_blobs( + sampling_relay: SamplingRelay, +) -> Result, DynError> { + let (sender, receiver) = oneshot::channel(); + sampling_relay + .send(DaSamplingServiceMsg::GetValidatedBlobs { + reply_channel: sender, + }) + .await + .map_err(|(error, _)| Box::new(error) as DynError)?; + receiver.await.map_err(|error| Box::new(error) as DynError) +} diff --git a/nomos-services/data-availability/indexer/Cargo.toml b/nomos-services/data-availability/indexer/Cargo.toml index d4513202..5ab75e98 100644 --- a/nomos-services/data-availability/indexer/Cargo.toml +++ b/nomos-services/data-availability/indexer/Cargo.toml @@ -11,14 +11,16 @@ bytes = "1.2" futures = "0.3" nomos-core = { path = "../../../nomos-core" } nomos-da-storage = { path = "../../../nomos-da/storage" } +nomos-da-sampling = { path = "../sampling" } nomos-storage = { path = "../../../nomos-services/storage" } nomos-mempool = { path = "../../../nomos-services/mempool" } cryptarchia-consensus = { path = "../../../nomos-services/cryptarchia-consensus" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } -tokio = { version = "1", features = ["sync"] } serde = { version = "1.0", features = ["derive"] } +rand = "0.8" tracing = "0.1" +tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1.15" [features] diff --git a/nomos-services/data-availability/indexer/src/lib.rs b/nomos-services/data-availability/indexer/src/lib.rs index 8da9b9f9..ee8abdda 100644 --- a/nomos-services/data-availability/indexer/src/lib.rs +++ b/nomos-services/data-availability/indexer/src/lib.rs @@ -13,6 +13,7 @@ use nomos_core::block::Block; use nomos_core::da::blob::{info::DispersedBlobInfo, metadata::Metadata, BlobSelect}; use nomos_core::header::HeaderId; use nomos_core::tx::{Transaction, TxSelect}; +use nomos_da_sampling::backend::DaSamplingServiceBackend; use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter}; use nomos_storage::backends::StorageBackend; use nomos_storage::StorageService; @@ -22,14 +23,40 @@ use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; use overwatch_rs::DynError; +use rand::{RngCore, SeedableRng}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use storage::DaStorageAdapter; use tokio::sync::oneshot::Sender; use tracing::error; -pub type ConsensusRelay = - Relay>; +pub type ConsensusRelay< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + Storage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, +> = Relay< + CryptarchiaConsensus< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + Storage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + >, +>; pub struct DataIndexerService< B, @@ -43,6 +70,9 @@ pub struct DataIndexerService< TxS, BS, ConsensusStorage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, > where B: 'static, A: NetworkAdapter, @@ -60,11 +90,28 @@ pub struct DataIndexerService< BS: BlobSelect, DaStorage: DaStorageAdapter, ConsensusStorage: StorageBackend + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { service_state: ServiceStateHandle, storage_relay: Relay>, - consensus_relay: - ConsensusRelay, + consensus_relay: ConsensusRelay< + A, + ClPool, + ClPoolAdapter, + DaPool, + DaPoolAdapter, + TxS, + BS, + ConsensusStorage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + >, } pub enum DaMsg { @@ -105,6 +152,9 @@ impl< TxS, BS, ConsensusStorage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, > ServiceData for DataIndexerService< B, @@ -118,6 +168,9 @@ impl< TxS, BS, ConsensusStorage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, > where B: 'static, @@ -136,6 +189,12 @@ where BS: BlobSelect, DaStorage: DaStorageAdapter, ConsensusStorage: StorageBackend + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { const SERVICE_ID: ServiceId = "DaIndexer"; type Settings = IndexerSettings; @@ -156,6 +215,9 @@ impl< TxS, BS, ConsensusStorage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, > DataIndexerService< B, @@ -169,6 +231,9 @@ impl< TxS, BS, ConsensusStorage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, > where B: Send + Sync + 'static, @@ -188,6 +253,12 @@ where BS: BlobSelect, DaStorage: DaStorageAdapter, ConsensusStorage: StorageBackend + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { async fn handle_new_block( storage_adapter: &DaStorage, @@ -249,6 +320,9 @@ impl< TxS, BS, ConsensusStorage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, > ServiceCore for DataIndexerService< B, @@ -262,6 +336,9 @@ impl< TxS, BS, ConsensusStorage, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, > where B: Debug + Send + Sync, @@ -304,6 +381,12 @@ where DaStorage::Settings: Clone + Send + Sync + 'static, ConsensusStorage: StorageBackend + Send + Sync + 'static, Consensus: ConsensusAdapter + Send + Sync, + SamplingRng: SeedableRng + RngCore, + SamplingBackend: DaSamplingServiceBackend + Send, + SamplingBackend::Settings: Clone, + SamplingBackend::Blob: Debug + 'static, + SamplingBackend::BlobId: Debug + 'static, + SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, { fn init(service_state: ServiceStateHandle) -> Result { let consensus_relay = service_state.overwatch_handle.relay(); diff --git a/nomos-services/data-availability/sampling/src/backend/kzgrs.rs b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs index 2d7bb76e..419888d9 100644 --- a/nomos-services/data-availability/sampling/src/backend/kzgrs.rs +++ b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs @@ -7,6 +7,7 @@ use std::time::{Duration, Instant}; use hex; use rand::distributions::Standard; use rand::prelude::*; +use serde::{Deserialize, Serialize}; use tokio::time; use tokio::time::Interval; @@ -24,21 +25,21 @@ pub struct SamplingContext { started: Instant, } -#[derive(Debug, Clone)] -pub struct KzgrsDaSamplerSettings { +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KzgrsSamplingBackendSettings { pub num_samples: u16, pub old_blobs_check_interval: Duration, pub blobs_validity_duration: Duration, } -pub struct KzgrsDaSampler { - settings: KzgrsDaSamplerSettings, +pub struct KzgrsSamplingBackend { + settings: KzgrsSamplingBackendSettings, validated_blobs: BTreeSet, pending_sampling_blobs: HashMap, rng: R, } -impl KzgrsDaSampler { +impl KzgrsSamplingBackend { fn prune_by_time(&mut self) { self.pending_sampling_blobs.retain(|_blob_id, context| { context.started.elapsed() < self.settings.blobs_validity_duration @@ -47,8 +48,8 @@ impl KzgrsDaSampler { } #[async_trait::async_trait] -impl DaSamplingServiceBackend for KzgrsDaSampler { - type Settings = KzgrsDaSamplerSettings; +impl DaSamplingServiceBackend for KzgrsSamplingBackend { + type Settings = KzgrsSamplingBackendSettings; type BlobId = BlobId; type Blob = DaBlob; @@ -62,7 +63,7 @@ impl DaSamplingServiceBackend for KzgrsDaSampler { } } - async fn prune_interval(&self) -> Interval { + fn prune_interval(&self) -> Interval { time::interval(self.settings.old_blobs_check_interval) } @@ -70,7 +71,7 @@ impl DaSamplingServiceBackend for KzgrsDaSampler { self.validated_blobs.clone() } - async fn mark_in_block(&mut self, blobs_ids: &[Self::BlobId]) { + async fn mark_completed(&mut self, blobs_ids: &[Self::BlobId]) { for id in blobs_ids { self.pending_sampling_blobs.remove(id); self.validated_blobs.remove(id); @@ -142,20 +143,20 @@ mod test { use rand::rngs::StdRng; use crate::backend::kzgrs::{ - DaSamplingServiceBackend, KzgrsDaSampler, KzgrsDaSamplerSettings, SamplingContext, - SamplingState, + DaSamplingServiceBackend, KzgrsSamplingBackend, KzgrsSamplingBackendSettings, + SamplingContext, SamplingState, }; use kzgrs_backend::common::{blob::DaBlob, Column}; use nomos_core::da::BlobId; - fn create_sampler(subnet_num: usize) -> KzgrsDaSampler { - let settings = KzgrsDaSamplerSettings { + fn create_sampler(subnet_num: usize) -> KzgrsSamplingBackend { + let settings = KzgrsSamplingBackendSettings { num_samples: subnet_num as u16, old_blobs_check_interval: Duration::from_millis(20), blobs_validity_duration: Duration::from_millis(10), }; let rng = StdRng::from_entropy(); - KzgrsDaSampler::new(settings, rng) + KzgrsSamplingBackend::new(settings, rng) } #[tokio::test] @@ -204,7 +205,7 @@ mod test { // mark in block for both // collections should be reset - sampler.mark_in_block(&[b1, b2]).await; + sampler.mark_completed(&[b1, b2]).await; assert!(sampler.pending_sampling_blobs.is_empty()); assert!(sampler.validated_blobs.is_empty()); @@ -301,7 +302,7 @@ mod test { // run mark_in_block for the same blob // should return empty for everything - sampler.mark_in_block(&[b1]).await; + sampler.mark_completed(&[b1]).await; assert!(sampler.validated_blobs.is_empty()); assert!(sampler.pending_sampling_blobs.is_empty()); } diff --git a/nomos-services/data-availability/sampling/src/backend/mod.rs b/nomos-services/data-availability/sampling/src/backend/mod.rs index 0cc6e758..075e950b 100644 --- a/nomos-services/data-availability/sampling/src/backend/mod.rs +++ b/nomos-services/data-availability/sampling/src/backend/mod.rs @@ -2,11 +2,9 @@ pub mod kzgrs; // std use std::collections::BTreeSet; - // crates use rand::Rng; use tokio::time::Interval; -// // internal use nomos_da_network_core::SubnetworkId; @@ -24,10 +22,10 @@ pub trait DaSamplingServiceBackend { fn new(settings: Self::Settings, rng: R) -> Self; async fn get_validated_blobs(&self) -> BTreeSet; - async fn mark_in_block(&mut self, blobs_ids: &[Self::BlobId]); + async fn mark_completed(&mut self, blobs_ids: &[Self::BlobId]); async fn handle_sampling_success(&mut self, blob_id: Self::BlobId, blob: Self::Blob); async fn handle_sampling_error(&mut self, blob_id: Self::BlobId); async fn init_sampling(&mut self, blob_id: Self::BlobId) -> SamplingState; - async fn prune_interval(&self) -> Interval; + fn prune_interval(&self) -> Interval; fn prune(&mut self); } diff --git a/nomos-services/data-availability/sampling/src/lib.rs b/nomos-services/data-availability/sampling/src/lib.rs index 9eea10d9..4bb3b6b3 100644 --- a/nomos-services/data-availability/sampling/src/lib.rs +++ b/nomos-services/data-availability/sampling/src/lib.rs @@ -22,6 +22,7 @@ use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; use overwatch_rs::DynError; +use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; const DA_SAMPLING_TAG: ServiceId = "DA-Sampling"; @@ -39,7 +40,7 @@ pub enum DaSamplingServiceMsg { }, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DaSamplingServiceSettings { pub sampling_settings: BackendSettings, pub network_adapter_settings: NetworkSettings, @@ -47,7 +48,7 @@ pub struct DaSamplingServiceSettings { impl RelayMessage for DaSamplingServiceMsg {} -pub struct DaSamplingService +pub struct DaSamplingService where R: SeedableRng + RngCore, Backend: DaSamplingServiceBackend + Send, @@ -62,7 +63,7 @@ where sampler: Backend, } -impl DaSamplingService +impl DaSamplingService where R: SeedableRng + RngCore, Backend: DaSamplingServiceBackend + Send + 'static, @@ -111,7 +112,7 @@ where } } DaSamplingServiceMsg::MarkInBlock { blobs_id } => { - sampler.mark_in_block(&blobs_id).await; + sampler.mark_completed(&blobs_id).await; } } } @@ -132,7 +133,7 @@ where } } -impl ServiceData for DaSamplingService +impl ServiceData for DaSamplingService where R: SeedableRng + RngCore, Backend: DaSamplingServiceBackend + Send, @@ -150,7 +151,7 @@ where } #[async_trait::async_trait] -impl ServiceCore for DaSamplingService +impl ServiceCore for DaSamplingService where R: SeedableRng + RngCore, Backend: DaSamplingServiceBackend + Send + Sync + 'static, @@ -189,7 +190,7 @@ where let mut network_adapter = N::new(network_relay).await; let mut sampling_message_stream = network_adapter.listen_to_sampling_messages().await?; - let mut next_prune_tick = sampler.prune_interval().await; + let mut next_prune_tick = sampler.prune_interval(); let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); async { diff --git a/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs b/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs index 7d7685be..302c2188 100644 --- a/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs +++ b/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs @@ -17,9 +17,10 @@ use nomos_da_network_service::{DaNetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; use overwatch_rs::DynError; +use serde::{Deserialize, Serialize}; use subnetworks_assignations::MembershipHandler; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DaNetworkSamplingSettings { pub num_samples: u16, pub subnet_size: SubnetworkId, diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index b353d4e9..9421ec95 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -15,6 +15,8 @@ kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" } nomos-core = { path = "../../../nomos-core" } nomos-da-indexer = { path = "../indexer", features = ["rocksdb-backend"] } nomos-da-verifier = { path = "../verifier", features = ["rocksdb-backend", "libp2p"] } +nomos-da-sampling = { path = "../sampling" } +nomos-da-network-service = { path = "../network" } nomos-da-storage = { path = "../../../nomos-da/storage" } nomos-node = { path = "../../../nodes/nomos-node" } nomos-mempool = { path = "../../../nomos-services/mempool" } @@ -24,12 +26,14 @@ nomos-network = { path = "../../network", features = ["mock"] } nomos-libp2p = { path = "../../../nomos-libp2p" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } +rand = "0.8" +rand_chacha = "0.3" tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1.15" tempfile = "3.6" tracing = "0.1" time = "0.3" -rand = "0.8" +subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" } [dev-dependencies] blake2 = { version = "0.10" } diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 46587e64..7995bbd5 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -5,6 +5,10 @@ use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transa use nomos_da_indexer::consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter; use nomos_da_indexer::DataIndexerService; +use nomos_da_sampling::{ + backend::kzgrs::KzgrsSamplingBackend, + network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter, +}; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier; use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter; @@ -13,6 +17,8 @@ use nomos_libp2p::{Multiaddr, Swarm, SwarmConfig}; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter; use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService}; use nomos_storage::backends::rocksdb::RocksBackend; +use rand_chacha::ChaCha20Rng; +use subnetworks_assignations::versions::v2::FillWithOriginalReplication; pub use nomos_core::{ da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx, @@ -29,6 +35,9 @@ pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< FillSizeWithTx, FillSizeWithBlobs, RocksBackend, + KzgrsSamplingBackend, + SamplingLibp2pAdapter, + ChaCha20Rng, >; pub(crate) type DaIndexer = DataIndexerService< @@ -45,6 +54,9 @@ pub(crate) type DaIndexer = DataIndexerService< FillSizeWithTx, FillSizeWithBlobs, RocksBackend, + KzgrsSamplingBackend, + SamplingLibp2pAdapter, + ChaCha20Rng, >; pub(crate) type TxMempool = TxMempoolService< diff --git a/nomos-services/data-availability/tests/src/indexer_integration.rs b/nomos-services/data-availability/tests/src/indexer_integration.rs index 40f2e3e9..73aabf4c 100644 --- a/nomos-services/data-availability/tests/src/indexer_integration.rs +++ b/nomos-services/data-availability/tests/src/indexer_integration.rs @@ -16,8 +16,14 @@ use nomos_core::da::blob::metadata::Metadata as _; use nomos_core::tx::Transaction; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings; use nomos_da_indexer::IndexerSettings; +use nomos_da_network_service::backends::libp2p::validator::{ + DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings, +}; +use nomos_da_network_service::NetworkConfig as DaNetworkConfig; +use nomos_da_network_service::NetworkService as DaNetworkService; use nomos_da_storage::fs::write_blob; use nomos_da_storage::rocksdb::DA_VERIFIED_KEY_PREFIX; +use nomos_libp2p::{ed25519, identity, PeerId}; use nomos_libp2p::{Multiaddr, SwarmConfig}; use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; use nomos_mempool::{DaMempoolSettings, TxMempoolSettings}; @@ -29,6 +35,7 @@ use overwatch_derive::*; use overwatch_rs::overwatch::{Overwatch, OverwatchRunner}; use overwatch_rs::services::handle::ServiceHandle; use rand::{thread_rng, Rng}; +use subnetworks_assignations::versions::v1::FillFromNodeList; use tempfile::{NamedTempFile, TempDir}; use time::OffsetDateTime; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; @@ -39,6 +46,7 @@ use crate::common::*; struct IndexerNode { network: ServiceHandle>, cl_mempool: ServiceHandle, + da_network: ServiceHandle>>, da_mempool: ServiceHandle, storage: ServiceHandle>>, cryptarchia: ServiceHandle, @@ -63,6 +71,18 @@ fn new_node( initial_peers, }, }, + da_network: DaNetworkConfig { + backend: DaNetworkValidatorBackendSettings { + node_key: ed25519::SecretKey::generate(), + membership: FillFromNodeList::new( + &[PeerId::from(identity::Keypair::generate_ed25519().public())], + 2, + 1, + ), + addresses: Default::default(), + listening_address: "/ip4/127.0.0.1/udp/0/quic-v1".parse::().unwrap(), + }, + }, cl_mempool: TxMempoolSettings { backend: (), network: AdapterSettings { @@ -107,6 +127,7 @@ fn new_node( // TODO: When verifier is implemented this test should be removed and a new one // performed in integration tests crate using the real node. +#[ignore = "Sampling needs to be started in mempool"] #[test] fn test_indexer() { let performed_tx = Arc::new(AtomicBool::new(false)); diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 8f3013ed..0a55e24a 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -20,6 +20,7 @@ nomos-mempool = { path = "../nomos-services/mempool", features = ["mock", "libp2 nomos-da-network-service = { path = "../nomos-services/data-availability/network" } nomos-da-indexer = { path = "../nomos-services/data-availability/indexer" } nomos-da-verifier = { path = "../nomos-services/data-availability/verifier" } +nomos-da-sampling = { path = "../nomos-services/data-availability/sampling" } subnetworks-assignations = { path = "../nomos-da/network/subnetworks-assignations" } full-replication = { path = "../nomos-da/full-replication" } hex = "0.4.3" diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index b8510cdb..3cb49f2c 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -33,6 +33,9 @@ use nomos_network::backends::libp2p::mixnet::MixnetConfig; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::{api::AxumBackendSettings, Config, Tx}; // crates +use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; +use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings; +use nomos_da_sampling::DaSamplingServiceSettings; use once_cell::sync::Lazy; use rand::{thread_rng, Rng, RngCore}; use reqwest::{Client, Url}; @@ -419,6 +422,19 @@ fn create_node_config( cors_origins: vec![], }, }, + da_sampling: DaSamplingServiceSettings { + // TODO: setup this properly! + sampling_settings: KzgrsSamplingBackendSettings { + num_samples: 0, + // Sampling service period can't be zero. + old_blobs_check_interval: Duration::from_secs(1), + blobs_validity_duration: Duration::from_secs(1), + }, + network_adapter_settings: DaNetworkSamplingSettings { + num_samples: 0, + subnet_size: 0, + }, + }, }; config.network.backend.inner.port = get_available_port();