DA: Sampling service (#705)
* initial * first iteration, lots of lifetime and trait issues, does not compile * Daniel/sampling service (#706) * Implement sampling service * Implement libp2p adapter listen_to_sampling_messages * temporary empty backend mod; implement start_sampling * addressed PR comments; moved random sampling subnet list generation to backend (todo impl) * addressed PR nitpicks * remove empty mod which was failing CI due to cargo fmt * removed unused code --------- Co-authored-by: holisticode <holistic.computing@gmail.com> Co-authored-by: Daniel Sanchez <sanchez.quiros.daniel@gmail.com>
This commit is contained in:
parent
d5ceceff9e
commit
0cb039d806
@ -18,6 +18,7 @@ members = [
|
|||||||
"nomos-services/system-sig",
|
"nomos-services/system-sig",
|
||||||
"nomos-services/data-availability/indexer",
|
"nomos-services/data-availability/indexer",
|
||||||
"nomos-services/data-availability/network",
|
"nomos-services/data-availability/network",
|
||||||
|
"nomos-services/data-availability/sampling",
|
||||||
"nomos-services/data-availability/verifier",
|
"nomos-services/data-availability/verifier",
|
||||||
"nomos-services/data-availability/tests",
|
"nomos-services/data-availability/tests",
|
||||||
"nomos-da/full-replication",
|
"nomos-da/full-replication",
|
||||||
|
@ -4,7 +4,7 @@ version = "0.1.0"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
libp2p = { version = "0.53", features = ["macros"] }
|
libp2p = { version = "0.53", features = ["macros", "tokio", "quic"] }
|
||||||
libp2p-stream = "0.1.0-alpha"
|
libp2p-stream = "0.1.0-alpha"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
@ -25,6 +25,6 @@ thiserror = "1.0"
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "1.39", features = ["macros", "rt-multi-thread", "time"] }
|
tokio = { version = "1.39", features = ["macros", "rt-multi-thread", "time"] }
|
||||||
libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "tokio", "quic", "tcp", "yamux", "noise"] }
|
libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "quic", "tcp", "yamux", "noise"] }
|
||||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||||
|
|
||||||
|
29
nomos-services/data-availability/sampling/Cargo.toml
Normal file
29
nomos-services/data-availability/sampling/Cargo.toml
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
[package]
|
||||||
|
name = "nomos-da-sampling"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
async-trait = "0.1"
|
||||||
|
bytes = "1.2"
|
||||||
|
futures = "0.3"
|
||||||
|
kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
|
||||||
|
libp2p-identity = { version = "0.2" }
|
||||||
|
nomos-core = { path = "../../../nomos-core" }
|
||||||
|
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
|
||||||
|
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
|
||||||
|
nomos-storage = { path = "../../../nomos-services/storage" }
|
||||||
|
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||||
|
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
|
||||||
|
tokio = { version = "1", features = ["sync", "macros"] }
|
||||||
|
tokio-stream = "0.1.15"
|
||||||
|
tracing = "0.1"
|
||||||
|
thiserror = "1.0.63"
|
||||||
|
rand = "0.8.5"
|
||||||
|
rand_chacha = "0.3.1"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = ["libp2p"]
|
||||||
|
libp2p = []
|
@ -0,0 +1,2 @@
|
|||||||
|
|
||||||
|
|
21
nomos-services/data-availability/sampling/src/backend/mod.rs
Normal file
21
nomos-services/data-availability/sampling/src/backend/mod.rs
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
// std
|
||||||
|
use std::collections::BTreeSet;
|
||||||
|
|
||||||
|
// crates
|
||||||
|
//
|
||||||
|
// internal
|
||||||
|
use nomos_da_network_core::SubnetworkId;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait DaSamplingServiceBackend {
|
||||||
|
type Settings;
|
||||||
|
type BlobId;
|
||||||
|
type Blob;
|
||||||
|
|
||||||
|
fn new(settings: Self::Settings) -> Self;
|
||||||
|
async fn get_validated_blobs(&self) -> BTreeSet<Self::BlobId>;
|
||||||
|
async fn mark_in_block(&mut self, blobs_id: &[Self::BlobId]);
|
||||||
|
async fn handle_sampling_success(&mut self, blob_id: Self::BlobId, blob: Self::Blob);
|
||||||
|
async fn handle_sampling_error(&mut self, blob_id: Self::BlobId);
|
||||||
|
async fn init_sampling(&mut self, blob_id: Self::BlobId) -> Vec<SubnetworkId>;
|
||||||
|
}
|
202
nomos-services/data-availability/sampling/src/lib.rs
Normal file
202
nomos-services/data-availability/sampling/src/lib.rs
Normal file
@ -0,0 +1,202 @@
|
|||||||
|
pub mod backend;
|
||||||
|
pub mod network;
|
||||||
|
|
||||||
|
// std
|
||||||
|
use std::collections::BTreeSet;
|
||||||
|
use std::fmt::Debug;
|
||||||
|
|
||||||
|
// crates
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use tracing::{error, span, Instrument, Level};
|
||||||
|
// internal
|
||||||
|
use backend::DaSamplingServiceBackend;
|
||||||
|
use kzgrs_backend::common::blob::DaBlob;
|
||||||
|
use network::NetworkAdapter;
|
||||||
|
use nomos_core::da::BlobId;
|
||||||
|
use nomos_da_network_service::backends::libp2p::validator::SamplingEvent;
|
||||||
|
use nomos_da_network_service::NetworkService;
|
||||||
|
use overwatch_rs::services::handle::ServiceStateHandle;
|
||||||
|
use overwatch_rs::services::life_cycle::LifecycleMessage;
|
||||||
|
use overwatch_rs::services::relay::{Relay, RelayMessage};
|
||||||
|
use overwatch_rs::services::state::{NoOperator, NoState};
|
||||||
|
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
|
||||||
|
use overwatch_rs::DynError;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
|
const DA_SAMPLING_TAG: ServiceId = "DA-Sampling";
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum DaSamplingServiceMsg<BlobId> {
|
||||||
|
TriggerSampling {
|
||||||
|
blob_id: BlobId,
|
||||||
|
},
|
||||||
|
GetValidatedBlobs {
|
||||||
|
reply_channel: oneshot::Sender<BTreeSet<BlobId>>,
|
||||||
|
},
|
||||||
|
MarkInBlock {
|
||||||
|
blobs_id: Vec<BlobId>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct DaSamplingServiceSettings<BackendSettings, NetworkSettings> {
|
||||||
|
pub sampling_settings: BackendSettings,
|
||||||
|
pub network_adapter_settings: NetworkSettings,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: 'static> RelayMessage for DaSamplingServiceMsg<B> {}
|
||||||
|
|
||||||
|
pub struct DaSamplingService<Backend, N, S>
|
||||||
|
where
|
||||||
|
Backend: DaSamplingServiceBackend + Send,
|
||||||
|
Backend::Settings: Clone,
|
||||||
|
Backend::Blob: Debug + 'static,
|
||||||
|
Backend::BlobId: Debug + 'static,
|
||||||
|
N: NetworkAdapter,
|
||||||
|
N::Settings: Clone,
|
||||||
|
{
|
||||||
|
network_relay: Relay<NetworkService<N::Backend>>,
|
||||||
|
service_state: ServiceStateHandle<Self>,
|
||||||
|
sampler: Backend,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Backend, N, S> DaSamplingService<Backend, N, S>
|
||||||
|
where
|
||||||
|
Backend: DaSamplingServiceBackend<BlobId = BlobId, Blob = DaBlob> + Send + 'static,
|
||||||
|
Backend::Settings: Clone,
|
||||||
|
N: NetworkAdapter + Send + 'static,
|
||||||
|
N::Settings: Clone,
|
||||||
|
{
|
||||||
|
async fn should_stop_service(message: LifecycleMessage) -> bool {
|
||||||
|
match message {
|
||||||
|
LifecycleMessage::Shutdown(sender) => {
|
||||||
|
if sender.send(()).is_err() {
|
||||||
|
error!(
|
||||||
|
"Error sending successful shutdown signal from service {}",
|
||||||
|
Self::SERVICE_ID
|
||||||
|
);
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
LifecycleMessage::Kill => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_service_message(
|
||||||
|
msg: <Self as ServiceData>::Message,
|
||||||
|
network_adapter: &mut N,
|
||||||
|
sampler: &mut Backend,
|
||||||
|
) {
|
||||||
|
match msg {
|
||||||
|
DaSamplingServiceMsg::TriggerSampling { blob_id } => {
|
||||||
|
let sampling_subnets = sampler.init_sampling(blob_id).await;
|
||||||
|
if let Err(e) = network_adapter
|
||||||
|
.start_sampling(blob_id, &sampling_subnets)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
error!("Error sampling for BlobId: {blob_id:?}: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DaSamplingServiceMsg::GetValidatedBlobs { reply_channel } => {
|
||||||
|
let validated_blobs = sampler.get_validated_blobs().await;
|
||||||
|
if let Err(_e) = reply_channel.send(validated_blobs) {
|
||||||
|
error!("Error repliying validated blobs request");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DaSamplingServiceMsg::MarkInBlock { blobs_id } => {
|
||||||
|
sampler.mark_in_block(&blobs_id).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_sampling_message(event: SamplingEvent, sampler: &mut Backend) {
|
||||||
|
match event {
|
||||||
|
SamplingEvent::SamplingSuccess { blob_id, blob } => {
|
||||||
|
sampler.handle_sampling_success(blob_id, *blob).await;
|
||||||
|
}
|
||||||
|
SamplingEvent::SamplingError { error } => {
|
||||||
|
error!("Error while sampling: {error}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Backend, N, S> ServiceData for DaSamplingService<Backend, N, S>
|
||||||
|
where
|
||||||
|
Backend: DaSamplingServiceBackend + Send,
|
||||||
|
Backend::Settings: Clone,
|
||||||
|
Backend::Blob: Debug + 'static,
|
||||||
|
Backend::BlobId: Debug + 'static,
|
||||||
|
N: NetworkAdapter,
|
||||||
|
N::Settings: Clone,
|
||||||
|
{
|
||||||
|
const SERVICE_ID: ServiceId = DA_SAMPLING_TAG;
|
||||||
|
type Settings = DaSamplingServiceSettings<Backend::Settings, N::Settings>;
|
||||||
|
type State = NoState<Self::Settings>;
|
||||||
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
type Message = DaSamplingServiceMsg<Backend::BlobId>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<Backend, N, S> ServiceCore for DaSamplingService<Backend, N, S>
|
||||||
|
where
|
||||||
|
Backend: DaSamplingServiceBackend<BlobId = BlobId, Blob = DaBlob> + Send + Sync + 'static,
|
||||||
|
Backend::Settings: Clone + Send + Sync + 'static,
|
||||||
|
N: NetworkAdapter + Send + Sync + 'static,
|
||||||
|
N::Settings: Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
|
||||||
|
let DaSamplingServiceSettings {
|
||||||
|
sampling_settings, ..
|
||||||
|
} = service_state.settings_reader.get_updated_settings();
|
||||||
|
|
||||||
|
let network_relay = service_state.overwatch_handle.relay();
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
network_relay,
|
||||||
|
service_state,
|
||||||
|
sampler: Backend::new(sampling_settings),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(self) -> Result<(), DynError> {
|
||||||
|
// This service will likely have to be modified later on.
|
||||||
|
// Most probably the verifier itself need to be constructed/update for every message with
|
||||||
|
// an updated list of the available nodes list, as it needs his own index coming from the
|
||||||
|
// position of his bls public key landing in the above-mentioned list.
|
||||||
|
let Self {
|
||||||
|
network_relay,
|
||||||
|
mut service_state,
|
||||||
|
mut sampler,
|
||||||
|
} = self;
|
||||||
|
let DaSamplingServiceSettings { .. } = service_state.settings_reader.get_updated_settings();
|
||||||
|
|
||||||
|
let network_relay = network_relay.connect().await?;
|
||||||
|
let mut network_adapter = N::new(network_relay).await;
|
||||||
|
|
||||||
|
let mut sampling_message_stream = network_adapter.listen_to_sampling_messages().await?;
|
||||||
|
|
||||||
|
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
|
||||||
|
async {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(service_message) = service_state.inbound_relay.recv() => {
|
||||||
|
Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await;
|
||||||
|
}
|
||||||
|
Some(sampling_message) = sampling_message_stream.next() => {
|
||||||
|
Self::handle_sampling_message(sampling_message, &mut sampler).await;
|
||||||
|
}
|
||||||
|
Some(msg) = lifecycle_stream.next() => {
|
||||||
|
if Self::should_stop_service(msg).await {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.instrument(span!(Level::TRACE, DA_SAMPLING_TAG))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,104 @@
|
|||||||
|
// std
|
||||||
|
use std::fmt::Debug;
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
// crates
|
||||||
|
use futures::{Stream, StreamExt};
|
||||||
|
use libp2p_identity::PeerId;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
// internal
|
||||||
|
use crate::network::NetworkAdapter;
|
||||||
|
use nomos_core::da::BlobId;
|
||||||
|
use nomos_da_network_core::SubnetworkId;
|
||||||
|
use nomos_da_network_service::backends::libp2p::validator::{
|
||||||
|
DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend, SamplingEvent,
|
||||||
|
};
|
||||||
|
use nomos_da_network_service::{DaNetworkMsg, NetworkService};
|
||||||
|
use overwatch_rs::services::relay::OutboundRelay;
|
||||||
|
use overwatch_rs::services::ServiceData;
|
||||||
|
use overwatch_rs::DynError;
|
||||||
|
use subnetworks_assignations::MembershipHandler;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct DaNetworkSamplingSettings {
|
||||||
|
pub num_samples: u16,
|
||||||
|
pub subnet_size: SubnetworkId,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Libp2pAdapter<Membership>
|
||||||
|
where
|
||||||
|
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||||
|
+ Debug
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
{
|
||||||
|
network_relay: OutboundRelay<
|
||||||
|
<NetworkService<DaNetworkValidatorBackend<Membership>> as ServiceData>::Message,
|
||||||
|
>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<Membership> NetworkAdapter for Libp2pAdapter<Membership>
|
||||||
|
where
|
||||||
|
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||||
|
+ Debug
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
{
|
||||||
|
type Backend = DaNetworkValidatorBackend<Membership>;
|
||||||
|
type Settings = DaNetworkSamplingSettings;
|
||||||
|
|
||||||
|
async fn new(
|
||||||
|
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
|
||||||
|
) -> Self {
|
||||||
|
Self { network_relay }
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start_sampling(
|
||||||
|
&mut self,
|
||||||
|
blob_id: BlobId,
|
||||||
|
subnets: &[SubnetworkId],
|
||||||
|
) -> Result<(), DynError> {
|
||||||
|
for id in subnets {
|
||||||
|
let subnetwork_id = id;
|
||||||
|
self.network_relay
|
||||||
|
.send(DaNetworkMsg::Process(DaNetworkMessage::RequestSample {
|
||||||
|
blob_id,
|
||||||
|
subnetwork_id: *subnetwork_id,
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.expect("RequestSample message should have been sent")
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen_to_sampling_messages(
|
||||||
|
&self,
|
||||||
|
) -> Result<Pin<Box<dyn Stream<Item = SamplingEvent> + Send>>, DynError> {
|
||||||
|
let (stream_sender, stream_receiver) = oneshot::channel();
|
||||||
|
self.network_relay
|
||||||
|
.send(DaNetworkMsg::Subscribe {
|
||||||
|
kind: DaNetworkEventKind::Sampling,
|
||||||
|
sender: stream_sender,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|(error, _)| error)?;
|
||||||
|
stream_receiver
|
||||||
|
.await
|
||||||
|
.map(|stream| {
|
||||||
|
tokio_stream::StreamExt::filter_map(stream, |event| match event {
|
||||||
|
DaNetworkEvent::Sampling(event) => {
|
||||||
|
Some(event)
|
||||||
|
}
|
||||||
|
DaNetworkEvent::Verifying(_) => {
|
||||||
|
unreachable!("Subscribirng to sampling events should return a sampling only event stream");
|
||||||
|
}
|
||||||
|
}).boxed()
|
||||||
|
})
|
||||||
|
.map_err(|error| Box::new(error) as DynError)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,2 @@
|
|||||||
|
#[cfg(feature = "libp2p")]
|
||||||
|
pub mod libp2p;
|
31
nomos-services/data-availability/sampling/src/network/mod.rs
Normal file
31
nomos-services/data-availability/sampling/src/network/mod.rs
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
pub mod adapters;
|
||||||
|
|
||||||
|
use futures::Stream;
|
||||||
|
use nomos_core::da::BlobId;
|
||||||
|
use nomos_da_network_core::SubnetworkId;
|
||||||
|
use nomos_da_network_service::backends::libp2p::validator::SamplingEvent;
|
||||||
|
use nomos_da_network_service::backends::NetworkBackend;
|
||||||
|
use nomos_da_network_service::NetworkService;
|
||||||
|
use overwatch_rs::services::relay::OutboundRelay;
|
||||||
|
use overwatch_rs::services::ServiceData;
|
||||||
|
use overwatch_rs::DynError;
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait NetworkAdapter {
|
||||||
|
type Backend: NetworkBackend + Send + 'static;
|
||||||
|
type Settings: Clone;
|
||||||
|
|
||||||
|
async fn new(
|
||||||
|
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
|
||||||
|
) -> Self;
|
||||||
|
|
||||||
|
async fn start_sampling(
|
||||||
|
&mut self,
|
||||||
|
blob_id: BlobId,
|
||||||
|
subnets: &[SubnetworkId],
|
||||||
|
) -> Result<(), DynError>;
|
||||||
|
async fn listen_to_sampling_messages(
|
||||||
|
&self,
|
||||||
|
) -> Result<Pin<Box<dyn Stream<Item = SamplingEvent> + Send>>, DynError>;
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user