From 461a9cacdf0c0e2f3b93a7dfe488442ccd241f75 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Wed, 9 Oct 2024 12:32:00 +0200 Subject: [PATCH] DA: Executor sampling adapter (#815) * Add macro to share adapter implementation for both validator and executor * Fix imports in binaries * Adapt cryptarchia types in binaries * Fix tests build --- nodes/nomos-executor/src/config.rs | 2 +- nodes/nomos-executor/src/lib.rs | 9 +- nodes/nomos-node/src/config.rs | 8 +- nodes/nomos-node/src/lib.rs | 17 ++-- .../sampling/src/network/adapters/common.rs | 87 ++++++++++++++++ .../sampling/src/network/adapters/executor.rs | 31 ++++++ .../sampling/src/network/adapters/libp2p.rs | 99 ------------------- .../sampling/src/network/adapters/mod.rs | 6 +- .../src/network/adapters/validator.rs | 29 ++++++ .../data-availability/tests/src/common.rs | 4 +- 10 files changed, 176 insertions(+), 116 deletions(-) create mode 100644 nomos-services/data-availability/sampling/src/network/adapters/common.rs create mode 100644 nomos-services/data-availability/sampling/src/network/adapters/executor.rs delete mode 100644 nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs create mode 100644 nomos-services/data-availability/sampling/src/network/adapters/validator.rs diff --git a/nodes/nomos-executor/src/config.rs b/nodes/nomos-executor/src/config.rs index 2e381b33..bd9fec36 100644 --- a/nodes/nomos-executor/src/config.rs +++ b/nodes/nomos-executor/src/config.rs @@ -26,7 +26,7 @@ pub struct Config { pub da_verifier: ::Settings, pub da_sampling: ::Settings, pub http: ::Settings, - pub cryptarchia: ::Settings, + pub cryptarchia: ::Settings, pub storage: > as ServiceData>::Settings, pub wait_online_secs: u64, } diff --git a/nodes/nomos-executor/src/lib.rs b/nodes/nomos-executor/src/lib.rs index ca162215..a49ac58b 100644 --- a/nodes/nomos-executor/src/lib.rs +++ b/nodes/nomos-executor/src/lib.rs @@ -36,7 +36,7 @@ pub type ExecutorApiService = ApiService< DispersalMempoolAdapter, kzgrs_backend::dispersal::Metadata, KzgrsSamplingBackend, - nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + nomos_da_sampling::network::adapters::executor::Libp2pAdapter, ChaCha20Rng, SamplingStorageAdapter, MB16, @@ -47,7 +47,7 @@ pub type DispersalMempoolAdapter = KzgrsMempoolAdapter< MempoolNetworkAdapter::BlobId>, MockPool::BlobId>, KzgrsSamplingBackend, - nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + nomos_da_sampling::network::adapters::executor::Libp2pAdapter, ChaCha20Rng, SamplingStorageAdapter, >; @@ -60,6 +60,9 @@ pub type DaDispersal = DispersalService< kzgrs_backend::dispersal::Metadata, >; +pub type ExecutorCryptarchia = + Cryptarchia>; + #[derive(Services)] pub struct NomosExecutor { #[cfg(feature = "tracing")] @@ -72,7 +75,7 @@ pub struct NomosExecutor { da_network: ServiceHandle>>, cl_mempool: ServiceHandle, da_mempool: ServiceHandle, - cryptarchia: ServiceHandle, + cryptarchia: ServiceHandle, http: ServiceHandle, storage: ServiceHandle>>, #[cfg(feature = "metrics")] diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 9e91487a..aade250b 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize}; use subnetworks_assignations::versions::v1::FillFromNodeList; use tracing::Level; // internal -use crate::{NomosApiService, Wire}; +use crate::{NomosApiService, NomosDaMembership, Wire}; #[derive(ValueEnum, Clone, Debug, Default)] pub enum LoggerBackendType { @@ -120,7 +120,9 @@ pub struct Config { pub da_verifier: ::Settings, pub da_sampling: ::Settings, pub http: ::Settings, - pub cryptarchia: ::Settings, + pub cryptarchia: , + > as ServiceData>::Settings, pub storage: > as ServiceData>::Settings, } @@ -245,7 +247,7 @@ pub fn update_http( } pub fn update_cryptarchia_consensus( - cryptarchia: &mut ::Settings, + cryptarchia: &mut ::Settings, consensus_args: CryptarchiaArgs, ) -> Result<()> { let CryptarchiaArgs { diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 22bd80fe..4a8e14e8 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -22,7 +22,7 @@ use nomos_da_indexer::DataIndexerService; pub use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend; pub use nomos_da_network_service::NetworkService as DaNetworkService; use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend; -use nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter; +use nomos_da_sampling::network::adapters::validator::Libp2pAdapter as SamplingLibp2pAdapter; use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter; use nomos_da_sampling::DaSamplingService; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier; @@ -72,7 +72,7 @@ pub type NomosApiService = ApiService< Tx, Wire, KzgrsSamplingBackend, - nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + nomos_da_sampling::network::adapters::validator::Libp2pAdapter, ChaCha20Rng, SamplingStorageAdapter, MB16, @@ -83,7 +83,7 @@ pub const CL_TOPIC: &str = "cl"; pub const DA_TOPIC: &str = "da"; pub const MB16: usize = 1024 * 1024 * 16; -pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< +pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, MockPool::Hash>, MempoolNetworkAdapter::Hash>, @@ -93,11 +93,14 @@ pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< FillSizeWithBlobs, RocksBackend, KzgrsSamplingBackend, - nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + SamplingAdapter, ChaCha20Rng, SamplingStorageAdapter, >; +pub type NodeCryptarchia = + Cryptarchia>; + pub type TxMempool = TxMempoolService< MempoolNetworkAdapter::Hash>, MockPool::Hash>, @@ -107,7 +110,7 @@ pub type DaMempool = DaMempoolService< MempoolNetworkAdapter::BlobId>, MockPool::BlobId>, KzgrsSamplingBackend, - nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + nomos_da_sampling::network::adapters::validator::Libp2pAdapter, ChaCha20Rng, SamplingStorageAdapter, >; @@ -127,7 +130,7 @@ pub type DaIndexer = DataIndexerService< FillSizeWithBlobs, RocksBackend, KzgrsSamplingBackend, - nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + nomos_da_sampling::network::adapters::validator::Libp2pAdapter, ChaCha20Rng, SamplingStorageAdapter, >; @@ -156,7 +159,7 @@ pub struct Nomos { da_network: ServiceHandle>>, cl_mempool: ServiceHandle, da_mempool: ServiceHandle, - cryptarchia: ServiceHandle, + cryptarchia: ServiceHandle, http: ServiceHandle, storage: ServiceHandle>>, #[cfg(feature = "metrics")] diff --git a/nomos-services/data-availability/sampling/src/network/adapters/common.rs b/nomos-services/data-availability/sampling/src/network/adapters/common.rs new file mode 100644 index 00000000..e98d7176 --- /dev/null +++ b/nomos-services/data-availability/sampling/src/network/adapters/common.rs @@ -0,0 +1,87 @@ +macro_rules! adapter_for { + ($DaNetworkBackend:ident, $DaNetworkMessage:ident, $DaEventKind:ident, $DaNetworkEvent:ident) => { + pub struct Libp2pAdapter + where + Membership: MembershipHandler + + Debug + + Clone + + Send + + Sync + + 'static, + { + network_relay: OutboundRelay< + > as ServiceData>::Message, + >, + } + + #[async_trait::async_trait] + impl NetworkAdapter for Libp2pAdapter + where + Membership: MembershipHandler + + Debug + + Clone + + Send + + Sync + + 'static, + { + type Backend = $DaNetworkBackend; + type Settings = (); + + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self { + Self { network_relay } + } + + async fn start_sampling( + &mut self, + blob_id: BlobId, + subnets: &[SubnetworkId], + ) -> Result<(), DynError> { + for id in subnets { + let subnetwork_id = id; + self.network_relay + .send(DaNetworkMsg::Process($DaNetworkMessage::RequestSample { + blob_id, + subnetwork_id: *subnetwork_id, + })) + .await + .expect("RequestSample message should have been sent") + } + Ok(()) + } + + #[allow(unreachable_patterns)] + async fn listen_to_sampling_messages( + &self, + ) -> Result + Send>>, DynError> { + let (stream_sender, stream_receiver) = oneshot::channel(); + self.network_relay + .send(DaNetworkMsg::Subscribe { + kind: $DaEventKind::Sampling, + sender: stream_sender, + }) + .await + .map_err(|(error, _)| error)?; + stream_receiver + .await + .map(|stream| { + tokio_stream::StreamExt::filter_map(stream, |event| match event { + $DaNetworkEvent::Sampling(event) => { + Some(event) + } + $DaNetworkEvent::Verifying(_) => { + unreachable!("Subscribirng to sampling events should return a sampling only event stream"); + } + _ => { + unreachable!(); + } + }).boxed() + }) + .map_err(|error| Box::new(error) as DynError) + } + } + } +} + +pub(crate) use adapter_for; diff --git a/nomos-services/data-availability/sampling/src/network/adapters/executor.rs b/nomos-services/data-availability/sampling/src/network/adapters/executor.rs new file mode 100644 index 00000000..2960e145 --- /dev/null +++ b/nomos-services/data-availability/sampling/src/network/adapters/executor.rs @@ -0,0 +1,31 @@ +// std +use std::fmt::Debug; +use std::pin::Pin; + +// crates +use futures::{Stream, StreamExt}; +use libp2p_identity::PeerId; +use tokio::sync::oneshot; +// internal +use crate::network::adapters::common::adapter_for; +use crate::network::NetworkAdapter; +use nomos_core::da::BlobId; +use nomos_da_network_core::SubnetworkId; +use nomos_da_network_service::backends::libp2p::{ + common::SamplingEvent, + executor::{ + DaNetworkEvent, DaNetworkEventKind, DaNetworkExecutorBackend, ExecutorDaNetworkMessage, + }, +}; +use nomos_da_network_service::{DaNetworkMsg, NetworkService}; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; +use overwatch_rs::DynError; +use subnetworks_assignations::MembershipHandler; + +adapter_for!( + DaNetworkExecutorBackend, + ExecutorDaNetworkMessage, + DaNetworkEventKind, + DaNetworkEvent +); diff --git a/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs b/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs deleted file mode 100644 index 5ce7c959..00000000 --- a/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs +++ /dev/null @@ -1,99 +0,0 @@ -// std -use std::fmt::Debug; -use std::pin::Pin; - -// crates -use futures::{Stream, StreamExt}; -use libp2p_identity::PeerId; -use tokio::sync::oneshot; -// internal -use crate::network::NetworkAdapter; -use nomos_core::da::BlobId; -use nomos_da_network_core::SubnetworkId; -use nomos_da_network_service::backends::libp2p::{ - common::SamplingEvent, - validator::{DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend}, -}; -use nomos_da_network_service::{DaNetworkMsg, NetworkService}; -use overwatch_rs::services::relay::OutboundRelay; -use overwatch_rs::services::ServiceData; -use overwatch_rs::DynError; -use subnetworks_assignations::MembershipHandler; - -pub struct Libp2pAdapter -where - Membership: MembershipHandler - + Debug - + Clone - + Send - + Sync - + 'static, -{ - network_relay: OutboundRelay< - > as ServiceData>::Message, - >, -} - -#[async_trait::async_trait] -impl NetworkAdapter for Libp2pAdapter -where - Membership: MembershipHandler - + Debug - + Clone - + Send - + Sync - + 'static, -{ - type Backend = DaNetworkValidatorBackend; - type Settings = (); - - async fn new( - network_relay: OutboundRelay< as ServiceData>::Message>, - ) -> Self { - Self { network_relay } - } - - async fn start_sampling( - &mut self, - blob_id: BlobId, - subnets: &[SubnetworkId], - ) -> Result<(), DynError> { - for id in subnets { - let subnetwork_id = id; - self.network_relay - .send(DaNetworkMsg::Process(DaNetworkMessage::RequestSample { - blob_id, - subnetwork_id: *subnetwork_id, - })) - .await - .expect("RequestSample message should have been sent") - } - Ok(()) - } - - async fn listen_to_sampling_messages( - &self, - ) -> Result + Send>>, DynError> { - let (stream_sender, stream_receiver) = oneshot::channel(); - self.network_relay - .send(DaNetworkMsg::Subscribe { - kind: DaNetworkEventKind::Sampling, - sender: stream_sender, - }) - .await - .map_err(|(error, _)| error)?; - stream_receiver - .await - .map(|stream| { - tokio_stream::StreamExt::filter_map(stream, |event| match event { - DaNetworkEvent::Sampling(event) => { - Some(event) - } - DaNetworkEvent::Verifying(_) => { - unreachable!("Subscribirng to sampling events should return a sampling only event stream"); - } - }).boxed() - }) - .map_err(|error| Box::new(error) as DynError) - } -} diff --git a/nomos-services/data-availability/sampling/src/network/adapters/mod.rs b/nomos-services/data-availability/sampling/src/network/adapters/mod.rs index a22ade97..5cac502a 100644 --- a/nomos-services/data-availability/sampling/src/network/adapters/mod.rs +++ b/nomos-services/data-availability/sampling/src/network/adapters/mod.rs @@ -1,2 +1,6 @@ #[cfg(feature = "libp2p")] -pub mod libp2p; +pub mod common; +#[cfg(feature = "libp2p")] +pub mod executor; +#[cfg(feature = "libp2p")] +pub mod validator; diff --git a/nomos-services/data-availability/sampling/src/network/adapters/validator.rs b/nomos-services/data-availability/sampling/src/network/adapters/validator.rs new file mode 100644 index 00000000..5cee6d78 --- /dev/null +++ b/nomos-services/data-availability/sampling/src/network/adapters/validator.rs @@ -0,0 +1,29 @@ +// std +use std::fmt::Debug; +use std::pin::Pin; + +// crates +use futures::{Stream, StreamExt}; +use libp2p_identity::PeerId; +use tokio::sync::oneshot; +// internal +use crate::network::adapters::common::adapter_for; +use crate::network::NetworkAdapter; +use nomos_core::da::BlobId; +use nomos_da_network_core::SubnetworkId; +use nomos_da_network_service::backends::libp2p::{ + common::SamplingEvent, + validator::{DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend}, +}; +use nomos_da_network_service::{DaNetworkMsg, NetworkService}; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; +use overwatch_rs::DynError; +use subnetworks_assignations::MembershipHandler; + +adapter_for!( + DaNetworkValidatorBackend, + DaNetworkMessage, + DaNetworkEventKind, + DaNetworkEvent +); diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 59c05435..935391b1 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -34,7 +34,7 @@ use nomos_da_sampling::DaSamplingService; use nomos_da_sampling::DaSamplingServiceSettings; use nomos_da_sampling::{ backend::kzgrs::KzgrsSamplingBackend, - network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter, + network::adapters::validator::Libp2pAdapter as SamplingLibp2pAdapter, }; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; @@ -132,7 +132,7 @@ pub type DaMempool = DaMempoolService< MempoolNetworkAdapter::BlobId>, MockPool::BlobId>, KzgrsSamplingBackend, - nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, + nomos_da_sampling::network::adapters::validator::Libp2pAdapter, IntegrationRng, SamplingStorageAdapter, >;