1
0
mirror of synced 2025-01-23 22:18:54 +00:00

DA: Executor sampling adapter (#815)

* Add macro to share adapter implementation for both validator and executor

* Fix imports in binaries

* Adapt cryptarchia types in binaries

* Fix tests build
This commit is contained in:
Daniel Sanchez 2024-10-09 12:32:00 +02:00 committed by GitHub
parent 0d6e14c773
commit 461a9cacdf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 176 additions and 116 deletions

View File

@ -26,7 +26,7 @@ pub struct Config {
pub da_verifier: <crate::DaVerifier as ServiceData>::Settings,
pub da_sampling: <crate::DaSampling as ServiceData>::Settings,
pub http: <ExecutorApiService as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
pub cryptarchia: <crate::ExecutorCryptarchia as ServiceData>::Settings,
pub storage: <crate::StorageService<RocksBackend<Wire>> as ServiceData>::Settings,
pub wait_online_secs: u64,
}

View File

@ -36,7 +36,7 @@ pub type ExecutorApiService = ApiService<
DispersalMempoolAdapter,
kzgrs_backend::dispersal::Metadata,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
nomos_da_sampling::network::adapters::executor::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
MB16,
@ -47,7 +47,7 @@ pub type DispersalMempoolAdapter = KzgrsMempoolAdapter<
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
nomos_da_sampling::network::adapters::executor::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;
@ -60,6 +60,9 @@ pub type DaDispersal = DispersalService<
kzgrs_backend::dispersal::Metadata,
>;
pub type ExecutorCryptarchia =
Cryptarchia<nomos_da_sampling::network::adapters::executor::Libp2pAdapter<NomosDaMembership>>;
#[derive(Services)]
pub struct NomosExecutor {
#[cfg(feature = "tracing")]
@ -72,7 +75,7 @@ pub struct NomosExecutor {
da_network: ServiceHandle<DaNetworkService<DaNetworkExecutorBackend<NomosDaMembership>>>,
cl_mempool: ServiceHandle<TxMempool>,
da_mempool: ServiceHandle<DaMempool>,
cryptarchia: ServiceHandle<Cryptarchia>,
cryptarchia: ServiceHandle<ExecutorCryptarchia>,
http: ServiceHandle<ExecutorApiService>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
#[cfg(feature = "metrics")]

View File

@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use tracing::Level;
// internal
use crate::{NomosApiService, Wire};
use crate::{NomosApiService, NomosDaMembership, Wire};
#[derive(ValueEnum, Clone, Debug, Default)]
pub enum LoggerBackendType {
@ -120,7 +120,9 @@ pub struct Config {
pub da_verifier: <crate::DaVerifier as ServiceData>::Settings,
pub da_sampling: <crate::DaSampling as ServiceData>::Settings,
pub http: <NomosApiService as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia<
nomos_da_sampling::network::adapters::validator::Libp2pAdapter<NomosDaMembership>,
> as ServiceData>::Settings,
pub storage: <crate::StorageService<RocksBackend<Wire>> as ServiceData>::Settings,
}
@ -245,7 +247,7 @@ pub fn update_http(
}
pub fn update_cryptarchia_consensus(
cryptarchia: &mut <crate::Cryptarchia as ServiceData>::Settings,
cryptarchia: &mut <crate::NodeCryptarchia as ServiceData>::Settings,
consensus_args: CryptarchiaArgs,
) -> Result<()> {
let CryptarchiaArgs {

View File

@ -22,7 +22,7 @@ use nomos_da_indexer::DataIndexerService;
pub use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend;
pub use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend;
use nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter;
use nomos_da_sampling::network::adapters::validator::Libp2pAdapter as SamplingLibp2pAdapter;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter;
use nomos_da_sampling::DaSamplingService;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
@ -72,7 +72,7 @@ pub type NomosApiService = ApiService<
Tx,
Wire,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
nomos_da_sampling::network::adapters::validator::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
MB16,
@ -83,7 +83,7 @@ pub const CL_TOPIC: &str = "cl";
pub const DA_TOPIC: &str = "da";
pub const MB16: usize = 1024 * 1024 * 16;
pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
pub type Cryptarchia<SamplingAdapter> = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
@ -93,11 +93,14 @@ pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
SamplingAdapter,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;
pub type NodeCryptarchia =
Cryptarchia<nomos_da_sampling::network::adapters::validator::Libp2pAdapter<NomosDaMembership>>;
pub type TxMempool = TxMempoolService<
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
@ -107,7 +110,7 @@ pub type DaMempool = DaMempoolService<
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
nomos_da_sampling::network::adapters::validator::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;
@ -127,7 +130,7 @@ pub type DaIndexer = DataIndexerService<
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
nomos_da_sampling::network::adapters::validator::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;
@ -156,7 +159,7 @@ pub struct Nomos {
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<NomosDaMembership>>>,
cl_mempool: ServiceHandle<TxMempool>,
da_mempool: ServiceHandle<DaMempool>,
cryptarchia: ServiceHandle<Cryptarchia>,
cryptarchia: ServiceHandle<NodeCryptarchia>,
http: ServiceHandle<NomosApiService>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
#[cfg(feature = "metrics")]

View File

@ -0,0 +1,87 @@
macro_rules! adapter_for {
($DaNetworkBackend:ident, $DaNetworkMessage:ident, $DaEventKind:ident, $DaNetworkEvent:ident) => {
pub struct Libp2pAdapter<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Debug
+ Clone
+ Send
+ Sync
+ 'static,
{
network_relay: OutboundRelay<
<NetworkService<$DaNetworkBackend<Membership>> as ServiceData>::Message,
>,
}
#[async_trait::async_trait]
impl<Membership> NetworkAdapter for Libp2pAdapter<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Debug
+ Clone
+ Send
+ Sync
+ 'static,
{
type Backend = $DaNetworkBackend<Membership>;
type Settings = ();
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self { network_relay }
}
async fn start_sampling(
&mut self,
blob_id: BlobId,
subnets: &[SubnetworkId],
) -> Result<(), DynError> {
for id in subnets {
let subnetwork_id = id;
self.network_relay
.send(DaNetworkMsg::Process($DaNetworkMessage::RequestSample {
blob_id,
subnetwork_id: *subnetwork_id,
}))
.await
.expect("RequestSample message should have been sent")
}
Ok(())
}
#[allow(unreachable_patterns)]
async fn listen_to_sampling_messages(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SamplingEvent> + Send>>, DynError> {
let (stream_sender, stream_receiver) = oneshot::channel();
self.network_relay
.send(DaNetworkMsg::Subscribe {
kind: $DaEventKind::Sampling,
sender: stream_sender,
})
.await
.map_err(|(error, _)| error)?;
stream_receiver
.await
.map(|stream| {
tokio_stream::StreamExt::filter_map(stream, |event| match event {
$DaNetworkEvent::Sampling(event) => {
Some(event)
}
$DaNetworkEvent::Verifying(_) => {
unreachable!("Subscribirng to sampling events should return a sampling only event stream");
}
_ => {
unreachable!();
}
}).boxed()
})
.map_err(|error| Box::new(error) as DynError)
}
}
}
}
pub(crate) use adapter_for;

View File

@ -0,0 +1,31 @@
// std
use std::fmt::Debug;
use std::pin::Pin;
// crates
use futures::{Stream, StreamExt};
use libp2p_identity::PeerId;
use tokio::sync::oneshot;
// internal
use crate::network::adapters::common::adapter_for;
use crate::network::NetworkAdapter;
use nomos_core::da::BlobId;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::backends::libp2p::{
common::SamplingEvent,
executor::{
DaNetworkEvent, DaNetworkEventKind, DaNetworkExecutorBackend, ExecutorDaNetworkMessage,
},
};
use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use subnetworks_assignations::MembershipHandler;
adapter_for!(
DaNetworkExecutorBackend,
ExecutorDaNetworkMessage,
DaNetworkEventKind,
DaNetworkEvent
);

View File

@ -1,99 +0,0 @@
// std
use std::fmt::Debug;
use std::pin::Pin;
// crates
use futures::{Stream, StreamExt};
use libp2p_identity::PeerId;
use tokio::sync::oneshot;
// internal
use crate::network::NetworkAdapter;
use nomos_core::da::BlobId;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::backends::libp2p::{
common::SamplingEvent,
validator::{DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend},
};
use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use subnetworks_assignations::MembershipHandler;
pub struct Libp2pAdapter<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Debug
+ Clone
+ Send
+ Sync
+ 'static,
{
network_relay: OutboundRelay<
<NetworkService<DaNetworkValidatorBackend<Membership>> as ServiceData>::Message,
>,
}
#[async_trait::async_trait]
impl<Membership> NetworkAdapter for Libp2pAdapter<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Debug
+ Clone
+ Send
+ Sync
+ 'static,
{
type Backend = DaNetworkValidatorBackend<Membership>;
type Settings = ();
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self { network_relay }
}
async fn start_sampling(
&mut self,
blob_id: BlobId,
subnets: &[SubnetworkId],
) -> Result<(), DynError> {
for id in subnets {
let subnetwork_id = id;
self.network_relay
.send(DaNetworkMsg::Process(DaNetworkMessage::RequestSample {
blob_id,
subnetwork_id: *subnetwork_id,
}))
.await
.expect("RequestSample message should have been sent")
}
Ok(())
}
async fn listen_to_sampling_messages(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SamplingEvent> + Send>>, DynError> {
let (stream_sender, stream_receiver) = oneshot::channel();
self.network_relay
.send(DaNetworkMsg::Subscribe {
kind: DaNetworkEventKind::Sampling,
sender: stream_sender,
})
.await
.map_err(|(error, _)| error)?;
stream_receiver
.await
.map(|stream| {
tokio_stream::StreamExt::filter_map(stream, |event| match event {
DaNetworkEvent::Sampling(event) => {
Some(event)
}
DaNetworkEvent::Verifying(_) => {
unreachable!("Subscribirng to sampling events should return a sampling only event stream");
}
}).boxed()
})
.map_err(|error| Box::new(error) as DynError)
}
}

View File

@ -1,2 +1,6 @@
#[cfg(feature = "libp2p")]
pub mod libp2p;
pub mod common;
#[cfg(feature = "libp2p")]
pub mod executor;
#[cfg(feature = "libp2p")]
pub mod validator;

View File

@ -0,0 +1,29 @@
// std
use std::fmt::Debug;
use std::pin::Pin;
// crates
use futures::{Stream, StreamExt};
use libp2p_identity::PeerId;
use tokio::sync::oneshot;
// internal
use crate::network::adapters::common::adapter_for;
use crate::network::NetworkAdapter;
use nomos_core::da::BlobId;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::backends::libp2p::{
common::SamplingEvent,
validator::{DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend},
};
use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use subnetworks_assignations::MembershipHandler;
adapter_for!(
DaNetworkValidatorBackend,
DaNetworkMessage,
DaNetworkEventKind,
DaNetworkEvent
);

View File

@ -34,7 +34,7 @@ use nomos_da_sampling::DaSamplingService;
use nomos_da_sampling::DaSamplingServiceSettings;
use nomos_da_sampling::{
backend::kzgrs::KzgrsSamplingBackend,
network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter,
network::adapters::validator::Libp2pAdapter as SamplingLibp2pAdapter,
};
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
@ -132,7 +132,7 @@ pub type DaMempool = DaMempoolService<
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
KzgrsSamplingBackend<IntegrationRng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
nomos_da_sampling::network::adapters::validator::Libp2pAdapter<NomosDaMembership>,
IntegrationRng,
SamplingStorageAdapter<DaBlob, Wire>,
>;