From 0cb039d8062310c01d3dcd6dcbb4205fcf2ac27d Mon Sep 17 00:00:00 2001 From: holisticode <88287+holisticode@users.noreply.github.com> Date: Tue, 27 Aug 2024 13:42:52 -0500 Subject: [PATCH] DA: Sampling service (#705) * initial * first iteration, lots of lifetime and trait issues, does not compile * Daniel/sampling service (#706) * Implement sampling service * Implement libp2p adapter listen_to_sampling_messages * temporary empty backend mod; implement start_sampling * addressed PR comments; moved random sampling subnet list generation to backend (todo impl) * addressed PR nitpicks * remove empty mod which was failing CI due to cargo fmt * removed unused code --------- Co-authored-by: holisticode Co-authored-by: Daniel Sanchez --- Cargo.toml | 1 + nomos-da/network/core/Cargo.toml | 4 +- .../data-availability/sampling/Cargo.toml | 29 +++ .../sampling/src/backend/kzgrs.rs | 2 + .../sampling/src/backend/mod.rs | 21 ++ .../data-availability/sampling/src/lib.rs | 202 ++++++++++++++++++ .../sampling/src/network/adapters/libp2p.rs | 104 +++++++++ .../sampling/src/network/adapters/mod.rs | 2 + .../sampling/src/network/mod.rs | 31 +++ 9 files changed, 394 insertions(+), 2 deletions(-) create mode 100644 nomos-services/data-availability/sampling/Cargo.toml create mode 100644 nomos-services/data-availability/sampling/src/backend/kzgrs.rs create mode 100644 nomos-services/data-availability/sampling/src/backend/mod.rs create mode 100644 nomos-services/data-availability/sampling/src/lib.rs create mode 100644 nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs create mode 100644 nomos-services/data-availability/sampling/src/network/adapters/mod.rs create mode 100644 nomos-services/data-availability/sampling/src/network/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 582abb78..e36a55e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "nomos-services/system-sig", "nomos-services/data-availability/indexer", "nomos-services/data-availability/network", + "nomos-services/data-availability/sampling", "nomos-services/data-availability/verifier", "nomos-services/data-availability/tests", "nomos-da/full-replication", diff --git a/nomos-da/network/core/Cargo.toml b/nomos-da/network/core/Cargo.toml index efd5f5bf..63e1dcc4 100644 --- a/nomos-da/network/core/Cargo.toml +++ b/nomos-da/network/core/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -libp2p = { version = "0.53", features = ["macros"] } +libp2p = { version = "0.53", features = ["macros", "tokio", "quic"] } libp2p-stream = "0.1.0-alpha" futures = "0.3" tracing = "0.1" @@ -25,6 +25,6 @@ thiserror = "1.0" [dev-dependencies] tokio = { version = "1.39", features = ["macros", "rt-multi-thread", "time"] } -libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "tokio", "quic", "tcp", "yamux", "noise"] } +libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "quic", "tcp", "yamux", "noise"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/nomos-services/data-availability/sampling/Cargo.toml b/nomos-services/data-availability/sampling/Cargo.toml new file mode 100644 index 00000000..ae1c38a0 --- /dev/null +++ b/nomos-services/data-availability/sampling/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "nomos-da-sampling" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1" +bytes = "1.2" +futures = "0.3" +kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" } +libp2p-identity = { version = "0.2" } +nomos-core = { path = "../../../nomos-core" } +nomos-da-network-core = { path = "../../../nomos-da/network/core" } +nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" } +nomos-storage = { path = "../../../nomos-services/storage" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } +serde = { version = "1.0", features = ["derive"] } +subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" } +tokio = { version = "1", features = ["sync", "macros"] } +tokio-stream = "0.1.15" +tracing = "0.1" +thiserror = "1.0.63" +rand = "0.8.5" +rand_chacha = "0.3.1" + +[features] +default = ["libp2p"] +libp2p = [] diff --git a/nomos-services/data-availability/sampling/src/backend/kzgrs.rs b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs new file mode 100644 index 00000000..139597f9 --- /dev/null +++ b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs @@ -0,0 +1,2 @@ + + diff --git a/nomos-services/data-availability/sampling/src/backend/mod.rs b/nomos-services/data-availability/sampling/src/backend/mod.rs new file mode 100644 index 00000000..4d508843 --- /dev/null +++ b/nomos-services/data-availability/sampling/src/backend/mod.rs @@ -0,0 +1,21 @@ +// std +use std::collections::BTreeSet; + +// crates +// +// internal +use nomos_da_network_core::SubnetworkId; + +#[async_trait::async_trait] +pub trait DaSamplingServiceBackend { + type Settings; + type BlobId; + type Blob; + + fn new(settings: Self::Settings) -> Self; + async fn get_validated_blobs(&self) -> BTreeSet; + async fn mark_in_block(&mut self, blobs_id: &[Self::BlobId]); + async fn handle_sampling_success(&mut self, blob_id: Self::BlobId, blob: Self::Blob); + async fn handle_sampling_error(&mut self, blob_id: Self::BlobId); + async fn init_sampling(&mut self, blob_id: Self::BlobId) -> Vec; +} diff --git a/nomos-services/data-availability/sampling/src/lib.rs b/nomos-services/data-availability/sampling/src/lib.rs new file mode 100644 index 00000000..67bf5b8d --- /dev/null +++ b/nomos-services/data-availability/sampling/src/lib.rs @@ -0,0 +1,202 @@ +pub mod backend; +pub mod network; + +// std +use std::collections::BTreeSet; +use std::fmt::Debug; + +// crates +use tokio_stream::StreamExt; +use tracing::{error, span, Instrument, Level}; +// internal +use backend::DaSamplingServiceBackend; +use kzgrs_backend::common::blob::DaBlob; +use network::NetworkAdapter; +use nomos_core::da::BlobId; +use nomos_da_network_service::backends::libp2p::validator::SamplingEvent; +use nomos_da_network_service::NetworkService; +use overwatch_rs::services::handle::ServiceStateHandle; +use overwatch_rs::services::life_cycle::LifecycleMessage; +use overwatch_rs::services::relay::{Relay, RelayMessage}; +use overwatch_rs::services::state::{NoOperator, NoState}; +use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_rs::DynError; +use tokio::sync::oneshot; + +const DA_SAMPLING_TAG: ServiceId = "DA-Sampling"; + +#[derive(Debug)] +pub enum DaSamplingServiceMsg { + TriggerSampling { + blob_id: BlobId, + }, + GetValidatedBlobs { + reply_channel: oneshot::Sender>, + }, + MarkInBlock { + blobs_id: Vec, + }, +} + +#[derive(Debug, Clone)] +pub struct DaSamplingServiceSettings { + pub sampling_settings: BackendSettings, + pub network_adapter_settings: NetworkSettings, +} + +impl RelayMessage for DaSamplingServiceMsg {} + +pub struct DaSamplingService +where + Backend: DaSamplingServiceBackend + Send, + Backend::Settings: Clone, + Backend::Blob: Debug + 'static, + Backend::BlobId: Debug + 'static, + N: NetworkAdapter, + N::Settings: Clone, +{ + network_relay: Relay>, + service_state: ServiceStateHandle, + sampler: Backend, +} + +impl DaSamplingService +where + Backend: DaSamplingServiceBackend + Send + 'static, + Backend::Settings: Clone, + N: NetworkAdapter + Send + 'static, + N::Settings: Clone, +{ + async fn should_stop_service(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } + + async fn handle_service_message( + msg: ::Message, + network_adapter: &mut N, + sampler: &mut Backend, + ) { + match msg { + DaSamplingServiceMsg::TriggerSampling { blob_id } => { + let sampling_subnets = sampler.init_sampling(blob_id).await; + if let Err(e) = network_adapter + .start_sampling(blob_id, &sampling_subnets) + .await + { + error!("Error sampling for BlobId: {blob_id:?}: {e}"); + } + } + DaSamplingServiceMsg::GetValidatedBlobs { reply_channel } => { + let validated_blobs = sampler.get_validated_blobs().await; + if let Err(_e) = reply_channel.send(validated_blobs) { + error!("Error repliying validated blobs request"); + } + } + DaSamplingServiceMsg::MarkInBlock { blobs_id } => { + sampler.mark_in_block(&blobs_id).await; + } + } + } + + async fn handle_sampling_message(event: SamplingEvent, sampler: &mut Backend) { + match event { + SamplingEvent::SamplingSuccess { blob_id, blob } => { + sampler.handle_sampling_success(blob_id, *blob).await; + } + SamplingEvent::SamplingError { error } => { + error!("Error while sampling: {error}"); + } + } + } +} + +impl ServiceData for DaSamplingService +where + Backend: DaSamplingServiceBackend + Send, + Backend::Settings: Clone, + Backend::Blob: Debug + 'static, + Backend::BlobId: Debug + 'static, + N: NetworkAdapter, + N::Settings: Clone, +{ + const SERVICE_ID: ServiceId = DA_SAMPLING_TAG; + type Settings = DaSamplingServiceSettings; + type State = NoState; + type StateOperator = NoOperator; + type Message = DaSamplingServiceMsg; +} + +#[async_trait::async_trait] +impl ServiceCore for DaSamplingService +where + Backend: DaSamplingServiceBackend + Send + Sync + 'static, + Backend::Settings: Clone + Send + Sync + 'static, + N: NetworkAdapter + Send + Sync + 'static, + N::Settings: Clone + Send + Sync + 'static, +{ + fn init(service_state: ServiceStateHandle) -> Result { + let DaSamplingServiceSettings { + sampling_settings, .. + } = service_state.settings_reader.get_updated_settings(); + + let network_relay = service_state.overwatch_handle.relay(); + + Ok(Self { + network_relay, + service_state, + sampler: Backend::new(sampling_settings), + }) + } + + async fn run(self) -> Result<(), DynError> { + // This service will likely have to be modified later on. + // Most probably the verifier itself need to be constructed/update for every message with + // an updated list of the available nodes list, as it needs his own index coming from the + // position of his bls public key landing in the above-mentioned list. + let Self { + network_relay, + mut service_state, + mut sampler, + } = self; + let DaSamplingServiceSettings { .. } = service_state.settings_reader.get_updated_settings(); + + let network_relay = network_relay.connect().await?; + let mut network_adapter = N::new(network_relay).await; + + let mut sampling_message_stream = network_adapter.listen_to_sampling_messages().await?; + + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); + async { + loop { + tokio::select! { + Some(service_message) = service_state.inbound_relay.recv() => { + Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await; + } + Some(sampling_message) = sampling_message_stream.next() => { + Self::handle_sampling_message(sampling_message, &mut sampler).await; + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } + } + } + } + .instrument(span!(Level::TRACE, DA_SAMPLING_TAG)) + .await; + + Ok(()) + } +} diff --git a/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs b/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs new file mode 100644 index 00000000..7d7685be --- /dev/null +++ b/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs @@ -0,0 +1,104 @@ +// std +use std::fmt::Debug; +use std::pin::Pin; + +// crates +use futures::{Stream, StreamExt}; +use libp2p_identity::PeerId; +use tokio::sync::oneshot; +// internal +use crate::network::NetworkAdapter; +use nomos_core::da::BlobId; +use nomos_da_network_core::SubnetworkId; +use nomos_da_network_service::backends::libp2p::validator::{ + DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend, SamplingEvent, +}; +use nomos_da_network_service::{DaNetworkMsg, NetworkService}; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; +use overwatch_rs::DynError; +use subnetworks_assignations::MembershipHandler; + +#[derive(Debug, Clone)] +pub struct DaNetworkSamplingSettings { + pub num_samples: u16, + pub subnet_size: SubnetworkId, +} + +pub struct Libp2pAdapter +where + Membership: MembershipHandler + + Debug + + Clone + + Send + + Sync + + 'static, +{ + network_relay: OutboundRelay< + > as ServiceData>::Message, + >, +} + +#[async_trait::async_trait] +impl NetworkAdapter for Libp2pAdapter +where + Membership: MembershipHandler + + Debug + + Clone + + Send + + Sync + + 'static, +{ + type Backend = DaNetworkValidatorBackend; + type Settings = DaNetworkSamplingSettings; + + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self { + Self { network_relay } + } + + async fn start_sampling( + &mut self, + blob_id: BlobId, + subnets: &[SubnetworkId], + ) -> Result<(), DynError> { + for id in subnets { + let subnetwork_id = id; + self.network_relay + .send(DaNetworkMsg::Process(DaNetworkMessage::RequestSample { + blob_id, + subnetwork_id: *subnetwork_id, + })) + .await + .expect("RequestSample message should have been sent") + } + Ok(()) + } + + async fn listen_to_sampling_messages( + &self, + ) -> Result + Send>>, DynError> { + let (stream_sender, stream_receiver) = oneshot::channel(); + self.network_relay + .send(DaNetworkMsg::Subscribe { + kind: DaNetworkEventKind::Sampling, + sender: stream_sender, + }) + .await + .map_err(|(error, _)| error)?; + stream_receiver + .await + .map(|stream| { + tokio_stream::StreamExt::filter_map(stream, |event| match event { + DaNetworkEvent::Sampling(event) => { + Some(event) + } + DaNetworkEvent::Verifying(_) => { + unreachable!("Subscribirng to sampling events should return a sampling only event stream"); + } + }).boxed() + }) + .map_err(|error| Box::new(error) as DynError) + } +} diff --git a/nomos-services/data-availability/sampling/src/network/adapters/mod.rs b/nomos-services/data-availability/sampling/src/network/adapters/mod.rs new file mode 100644 index 00000000..a22ade97 --- /dev/null +++ b/nomos-services/data-availability/sampling/src/network/adapters/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "libp2p")] +pub mod libp2p; diff --git a/nomos-services/data-availability/sampling/src/network/mod.rs b/nomos-services/data-availability/sampling/src/network/mod.rs new file mode 100644 index 00000000..0a2d0a13 --- /dev/null +++ b/nomos-services/data-availability/sampling/src/network/mod.rs @@ -0,0 +1,31 @@ +pub mod adapters; + +use futures::Stream; +use nomos_core::da::BlobId; +use nomos_da_network_core::SubnetworkId; +use nomos_da_network_service::backends::libp2p::validator::SamplingEvent; +use nomos_da_network_service::backends::NetworkBackend; +use nomos_da_network_service::NetworkService; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; +use overwatch_rs::DynError; +use std::pin::Pin; + +#[async_trait::async_trait] +pub trait NetworkAdapter { + type Backend: NetworkBackend + Send + 'static; + type Settings: Clone; + + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self; + + async fn start_sampling( + &mut self, + blob_id: BlobId, + subnets: &[SubnetworkId], + ) -> Result<(), DynError>; + async fn listen_to_sampling_messages( + &self, + ) -> Result + Send>>, DynError>; +}