
Da: network service backend (#695)

* Sketch up types and backend skeleton

* Pipe up sampling events

* Pipe up validation blobs events

* Added docs

* Cargo fmt

* Clippy happy

* Debug log events in validator swarm run method
Daniel Sanchez 2024-08-22 12:14:42 +02:00 committed by GitHub
parent 3671376691
commit 66de571bfa
9 changed files with 405 additions and 41 deletions

View File

@ -48,4 +48,28 @@ where
self.dispersal.update_membership(membership.clone());
self.replication.update_membership(membership);
}
pub fn sampling_behaviour(&self) -> &SamplingBehaviour<Membership> {
&self.sampling
}
pub fn dispersal_behaviour(&self) -> &DispersalValidatorBehaviour<Membership> {
&self.dispersal
}
pub fn replication_behaviour(&self) -> &ReplicationBehaviour<Membership> {
&self.replication
}
pub fn sampling_behaviour_mut(&mut self) -> &mut SamplingBehaviour<Membership> {
&mut self.sampling
}
pub fn dispersal_behaviour_mut(&mut self) -> &mut DispersalValidatorBehaviour<Membership> {
&mut self.dispersal
}
pub fn replication_behaviour_mut(&mut self) -> &mut ReplicationBehaviour<Membership> {
&mut self.replication
}
}
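
These accessors are what the network service backend later in this diff goes through to reach the sampling protocol. A minimal sketch of that access path (not part of the diff itself); the `Membership` bounds are copied from the backend impl and may be looser in the swarm module:

```rust
use libp2p::PeerId;
use nomos_da_network_core::swarm::validator::ValidatorSwarm;
use nomos_da_network_core::SubnetworkId;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::mpsc::UnboundedSender;

// Inner blob id representation, mirroring the alias used elsewhere in this PR.
type BlobId = [u8; 32];

// Sketch: the access path the service backend below uses to reach the sampling
// protocol through the new read-only accessor.
fn sampling_request_channel<Membership>(
    swarm: &ValidatorSwarm<Membership>,
) -> UnboundedSender<(SubnetworkId, BlobId)>
where
    Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
        + Clone
        + std::fmt::Debug
        + Send
        + Sync
        + 'static,
{
    swarm
        .protocol_swarm()         // &Swarm<ValidatorBehaviour<Membership>>
        .behaviour()              // &ValidatorBehaviour<Membership>
        .sampling_behaviour()     // accessor added in this change
        .sample_request_channel()
}
```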

View File

@ -1,4 +1,5 @@
// std
use bincode::ErrorKind;
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};
// crates
@ -72,6 +73,74 @@ impl SamplingError {
}
}
impl Clone for SamplingError {
fn clone(&self) -> Self {
match self {
SamplingError::Io { peer_id, error } => SamplingError::Io {
peer_id: *peer_id,
error: std::io::Error::new(error.kind(), error.to_string()),
},
SamplingError::Protocol {
subnetwork_id,
peer_id,
error,
} => SamplingError::Protocol {
subnetwork_id: *subnetwork_id,
peer_id: *peer_id,
error: error.clone(),
},
SamplingError::OpenStream { peer_id, error } => SamplingError::OpenStream {
peer_id: *peer_id,
error: match error {
OpenStreamError::UnsupportedProtocol(protocol) => {
OpenStreamError::UnsupportedProtocol(protocol.clone())
}
OpenStreamError::Io(error) => {
OpenStreamError::Io(std::io::Error::new(error.kind(), error.to_string()))
}
err => OpenStreamError::Io(std::io::Error::new(
std::io::ErrorKind::Other,
err.to_string(),
)),
},
},
SamplingError::Deserialize {
blob_id,
subnetwork_id,
peer_id,
error,
} => SamplingError::Deserialize {
blob_id: *blob_id,
subnetwork_id: *subnetwork_id,
peer_id: *peer_id,
error: clone_deserialize_error(error),
},
SamplingError::RequestChannel { request, peer_id } => SamplingError::RequestChannel {
request: request.clone(),
peer_id: *peer_id,
},
SamplingError::ResponseChannel { error, peer_id } => SamplingError::ResponseChannel {
peer_id: *peer_id,
error: *error,
},
}
}
}
fn clone_deserialize_error(error: &bincode::Error) -> bincode::Error {
Box::new(match error.as_ref() {
ErrorKind::Io(error) => ErrorKind::Io(std::io::Error::new(error.kind(), error.to_string())),
ErrorKind::InvalidUtf8Encoding(error) => ErrorKind::InvalidUtf8Encoding(*error),
ErrorKind::InvalidBoolEncoding(bool) => ErrorKind::InvalidBoolEncoding(*bool),
ErrorKind::InvalidCharEncoding => ErrorKind::InvalidCharEncoding,
ErrorKind::InvalidTagEncoding(tag) => ErrorKind::InvalidTagEncoding(*tag),
ErrorKind::DeserializeAnyNotSupported => ErrorKind::DeserializeAnyNotSupported,
ErrorKind::SizeLimit => ErrorKind::SizeLimit,
ErrorKind::SequenceMustHaveLength => ErrorKind::SequenceMustHaveLength,
ErrorKind::Custom(custom) => ErrorKind::Custom(custom.clone()),
})
}
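
The hand-written `Clone` above exists because `std::io::Error` and the boxed `bincode::ErrorKind` do not implement `Clone`. The core trick, rebuilding an equivalent error from its kind and message, shown in isolation (a runnable sketch, not part of the diff):

```rust
use std::io;

// io::Error does not implement Clone, so the impl above rebuilds an
// equivalent error from its kind and stringified message.
fn clone_io_error(error: &io::Error) -> io::Error {
    io::Error::new(error.kind(), error.to_string())
}

fn main() {
    let original = io::Error::new(io::ErrorKind::UnexpectedEof, "stream closed early");
    let copy = clone_io_error(&original);
    assert_eq!(original.kind(), copy.kind());
}
```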
/// Inner type representation of a Blob ID
// TODO: Use a proper type that is common to the codebase
type BlobId = [u8; 32];

View File

@ -1,10 +1,12 @@
// std
// crates
use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob;
use libp2p::identity::Keypair;
use libp2p::swarm::SwarmEvent;
use libp2p::{PeerId, Swarm, SwarmBuilder};
use log::debug;
use log::{debug, error};
use nomos_da_messages::replication::ReplicationReq;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
// internal
@ -18,6 +20,7 @@ use subnetworks_assignations::MembershipHandler;
pub struct ValidatorEventsStream {
pub sampling_events_receiver: UnboundedReceiverStream<SamplingEvent>,
pub validation_events_receiver: UnboundedReceiverStream<DaBlob>,
}
pub struct ValidatorSwarm<
@ -25,6 +28,7 @@ pub struct ValidatorSwarm<
> {
swarm: Swarm<ValidatorBehaviour<Membership>>,
sampling_events_sender: UnboundedSender<SamplingEvent>,
validation_events_sender: UnboundedSender<DaBlob>,
}
impl<Membership> ValidatorSwarm<Membership>
@ -33,14 +37,19 @@ where
{
pub fn new(key: Keypair, membership: Membership) -> (Self, ValidatorEventsStream) {
let (sampling_events_sender, sampling_events_receiver) = unbounded_channel();
let (validation_events_sender, validation_events_receiver) = unbounded_channel();
let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver);
let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver);
(
Self {
swarm: Self::build_swarm(key, membership),
sampling_events_sender,
validation_events_sender,
},
ValidatorEventsStream {
sampling_events_receiver,
validation_events_receiver,
},
)
}
@ -53,21 +62,48 @@ where
.build()
}
pub fn protocol_swarm(&self) -> &Swarm<ValidatorBehaviour<Membership>> {
&self.swarm
}
pub fn protocol_swarm_mut(&mut self) -> &mut Swarm<ValidatorBehaviour<Membership>> {
&mut self.swarm
}
async fn handle_sampling_event(&mut self, event: SamplingEvent) {
if let Err(e) = self.sampling_events_sender.send(event) {
debug!("Error distributing sampling message internally: {e:?}");
}
}
async fn handle_dispersal_event(&mut self, _event: DispersalEvent) {
// TODO: hook incoming dispersal events => to replication
unimplemented!()
async fn handle_dispersal_event(&mut self, event: DispersalEvent) {
match event {
// Send message for replication
DispersalEvent::IncomingMessage { message } => {
if let Ok(blob) = bincode::deserialize::<DaBlob>(
message
.blob
.as_ref()
.expect("Message blob should not be empty")
.data
.as_slice(),
) {
if let Err(e) = self.validation_events_sender.send(blob) {
error!("Error sending blob to validation: {e:?}");
}
}
self.swarm
.behaviour_mut()
.replication_behaviour_mut()
.send_message(ReplicationReq {
blob: message.blob,
subnetwork_id: message.subnetwork_id,
});
}
}
}
async fn handle_replication_event(&mut self, _event: ReplicationEvent) {
// TODO: Hook incoming blobs from replication protocol
unimplemented!()
}
async fn handle_replication_event(&mut self, _event: ReplicationEvent) {}
async fn handle_behaviour_event(&mut self, event: ValidatorBehaviourEvent<Membership>) {
match event {
@ -83,33 +119,33 @@ where
}
}
pub async fn step(&mut self) {
tokio::select! {
Some(event) = self.swarm.next() => {
pub async fn run(mut self) {
loop {
if let Some(event) = self.swarm.next().await {
debug!("Da swarm event received: {event:?}");
match event {
SwarmEvent::Behaviour(behaviour_event) => {
self.handle_behaviour_event(behaviour_event).await;
},
SwarmEvent::ConnectionEstablished{ .. } => {}
SwarmEvent::ConnectionClosed{ .. } => {}
SwarmEvent::IncomingConnection{ .. } => {}
SwarmEvent::IncomingConnectionError{ .. } => {}
SwarmEvent::OutgoingConnectionError{ .. } => {}
SwarmEvent::NewListenAddr{ .. } => {}
SwarmEvent::ExpiredListenAddr{ .. } => {}
SwarmEvent::ListenerClosed{ .. } => {}
SwarmEvent::ListenerError{ .. } => {}
SwarmEvent::Dialing{ .. } => {}
SwarmEvent::NewExternalAddrCandidate{ .. } => {}
SwarmEvent::ExternalAddrConfirmed{ .. } => {}
SwarmEvent::ExternalAddrExpired{ .. } => {}
SwarmEvent::NewExternalAddrOfPeer{ .. } => {}
}
SwarmEvent::ConnectionEstablished { .. } => {}
SwarmEvent::ConnectionClosed { .. } => {}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::OutgoingConnectionError { .. } => {}
SwarmEvent::NewListenAddr { .. } => {}
SwarmEvent::ExpiredListenAddr { .. } => {}
SwarmEvent::ListenerClosed { .. } => {}
SwarmEvent::ListenerError { .. } => {}
SwarmEvent::Dialing { .. } => {}
SwarmEvent::NewExternalAddrCandidate { .. } => {}
SwarmEvent::ExternalAddrConfirmed { .. } => {}
SwarmEvent::ExternalAddrExpired { .. } => {}
SwarmEvent::NewExternalAddrOfPeer { .. } => {}
event => {
debug!("Unsupported validator swarm event: {event:?}");
}
}
}
}
}
}
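
Taken together with the constructor above, the intended usage is roughly the following sketch, which mirrors what the backend later in this diff does: spawn `run()` and drain both event streams. `key` and `membership` are assumed to come from configuration, and the `Membership` bounds are the ones the backend requires:

```rust
use futures::StreamExt;
use libp2p::identity::Keypair;
use libp2p::PeerId;
use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm};
use nomos_da_network_core::SubnetworkId;
use subnetworks_assignations::MembershipHandler;

// Sketch: spawn the swarm's run loop and drain both event streams.
async fn drive_validator<Membership>(key: Keypair, membership: Membership)
where
    Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
        + Clone
        + std::fmt::Debug
        + Send
        + Sync
        + 'static,
{
    let (
        swarm,
        ValidatorEventsStream {
            mut sampling_events_receiver,
            mut validation_events_receiver,
        },
    ) = ValidatorSwarm::new(key, membership);

    // The swarm drives itself once spawned; results come back on the streams.
    tokio::spawn(swarm.run());

    loop {
        tokio::select! {
            Some(sampling_event) = sampling_events_receiver.next() => {
                // e.g. broadcast to sampling subscribers, as the backend does
                let _ = sampling_event;
            }
            Some(blob) = validation_events_receiver.next() => {
                // e.g. hand the DaBlob over for verification
                let _ = blob;
            }
            else => break,
        }
    }
}
```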

View File

@ -8,7 +8,11 @@ async-trait = "0.1"
futures = "0.3"
kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
libp2p = { version = "0.54", features = ["ed25519"] }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["macros", "sync"] }
tokio-stream = "0.1"
tracing = "0.1"
log = "0.4.22"

View File

@ -0,0 +1 @@
pub mod validator;

View File

@ -0,0 +1,217 @@
use crate::backends::NetworkBackend;
use futures::{Stream, StreamExt};
use kzgrs_backend::common::blob::DaBlob;
use libp2p::identity::Keypair;
use libp2p::PeerId;
use log::error;
use nomos_da_network_core::protocols::sampling;
use nomos_da_network_core::protocols::sampling::behaviour::SamplingError;
use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm};
use nomos_da_network_core::SubnetworkId;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::broadcast;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;
type BlobId = [u8; 32];
type ColumnIdx = u32;
const BROADCAST_CHANNEL_SIZE: usize = 128;
/// Message that the backend replies to
#[derive(Debug)]
pub enum DaNetworkMessage {
/// Kickstart a network sampling
RequestSample {
subnetwork_id: ColumnIdx,
blob_id: BlobId,
},
}
/// Events types to subscribe to
/// * Sampling: Incoming sampling events [success/fail]
/// * Verifying: Incoming blobs to be verified
#[derive(Debug)]
pub enum DaNetworkEventKind {
Sampling,
Verifying,
}
/// Sampling events coming from da network
#[derive(Debug, Clone)]
pub enum SamplingEvent {
/// A successful sampling
SamplingSuccess { blob_id: BlobId, blob: Box<DaBlob> },
/// A failed sampling error
SamplingError { error: SamplingError },
}
/// DA network incoming events
#[derive(Debug)]
pub enum DaNetworkEvent {
Sampling(SamplingEvent),
Verifying(Box<DaBlob>),
}
/// DA network backend for validators
/// Internally uses a libp2p swarm composed of the [`ValidatorBehaviour`]
/// It forwards network messages to the corresponding subscription channels/streams
pub struct DaNetworkValidatorBackend<Membership> {
// TODO: these join handles should be cancellable tasks. We should add a stop method to
// the `NetworkBackend` trait so that, if the service is stopped, the backend can gracefully
// handle open sub-tasks as well.
#[allow(dead_code)]
task: JoinHandle<()>,
#[allow(dead_code)]
replies_task: JoinHandle<()>,
sampling_request_channel: UnboundedSender<(SubnetworkId, BlobId)>,
sampling_broadcast_receiver: broadcast::Receiver<SamplingEvent>,
verifying_broadcast_receiver: broadcast::Receiver<DaBlob>,
_membership: PhantomData<Membership>,
}
#[derive(Clone, Debug)]
pub struct DaNetworkValidatorBackendSettings<Membership> {
/// Identification key
key: Keypair,
/// Membership of DA network PoV set
membership: Membership,
}
impl<Membership> DaNetworkValidatorBackend<Membership> {
/// Send the sampling request to the underlying sampling behaviour
async fn handle_sample_request(&self, subnetwork_id: SubnetworkId, blob_id: BlobId) {
if let Err(SendError((subnetwork_id, blob_id))) =
self.sampling_request_channel.send((subnetwork_id, blob_id))
{
error!(
"Error requesting sample for subnetwork id : {subnetwork_id}, blob_id: {blob_id:?}"
);
}
}
}
#[async_trait::async_trait]
impl<Membership> NetworkBackend for DaNetworkValidatorBackend<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
{
type Settings = DaNetworkValidatorBackendSettings<Membership>;
type State = NoState<Self::Settings>;
type Message = DaNetworkMessage;
type EventKind = DaNetworkEventKind;
type NetworkEvent = DaNetworkEvent;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
let (validator_swarm, events_streams) = ValidatorSwarm::new(config.key, config.membership);
let sampling_request_channel = validator_swarm
.protocol_swarm()
.behaviour()
.sampling_behaviour()
.sample_request_channel();
let task = overwatch_handle.runtime().spawn(validator_swarm.run());
let (sampling_broadcast_sender, sampling_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let (verifying_broadcast_sender, verifying_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let replies_task = overwatch_handle
.runtime()
.spawn(handle_validator_events_stream(
events_streams,
sampling_broadcast_sender,
verifying_broadcast_sender,
));
Self {
task,
replies_task,
sampling_request_channel,
sampling_broadcast_receiver,
verifying_broadcast_receiver,
_membership: Default::default(),
}
}
async fn process(&self, msg: Self::Message) {
match msg {
DaNetworkMessage::RequestSample {
subnetwork_id,
blob_id,
} => {
self.handle_sample_request(subnetwork_id, blob_id).await;
}
}
}
async fn subscribe(
&mut self,
event: Self::EventKind,
) -> Pin<Box<dyn Stream<Item = Self::NetworkEvent> + Send>> {
match event {
DaNetworkEventKind::Sampling => Box::pin(
BroadcastStream::new(self.sampling_broadcast_receiver.resubscribe())
.filter_map(|event| async { event.ok() })
.map(Self::NetworkEvent::Sampling),
),
DaNetworkEventKind::Verifying => Box::pin(
BroadcastStream::new(self.verifying_broadcast_receiver.resubscribe())
.filter_map(|event| async { event.ok() })
.map(|blob| Self::NetworkEvent::Verifying(Box::new(blob))),
),
}
}
}
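
Putting `process` and `subscribe` together, a consumer of this backend might look like the sketch below. It is written as if it lived next to the backend module, so the local types need no imports; the early-exit logic is illustrative, not part of the diff:

```rust
use crate::backends::NetworkBackend;
use futures::StreamExt;
use libp2p::PeerId;
use nomos_da_network_core::SubnetworkId;
use subnetworks_assignations::MembershipHandler;

// Sketch: ask for one sample and wait for the matching sampling event.
async fn sample_and_watch<Membership>(
    backend: &mut DaNetworkValidatorBackend<Membership>,
    subnetwork_id: ColumnIdx,
    blob_id: BlobId,
) where
    Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
        + Clone
        + std::fmt::Debug
        + Send
        + Sync
        + 'static,
{
    // Subscribe first so the broadcast cannot race past us.
    let mut sampling_events = backend.subscribe(DaNetworkEventKind::Sampling).await;

    backend
        .process(DaNetworkMessage::RequestSample {
            subnetwork_id,
            blob_id,
        })
        .await;

    while let Some(DaNetworkEvent::Sampling(event)) = sampling_events.next().await {
        match event {
            SamplingEvent::SamplingSuccess { blob_id: id, .. } if id == blob_id => break,
            SamplingEvent::SamplingSuccess { .. } => continue,
            SamplingEvent::SamplingError { .. } => break,
        }
    }
}
```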
/// Task that forwards events to the subscription channels/streams
async fn handle_validator_events_stream(
events_streams: ValidatorEventsStream,
sampling_broadcast_sender: broadcast::Sender<SamplingEvent>,
validation_broadcast_sender: broadcast::Sender<DaBlob>,
) {
let ValidatorEventsStream {
mut sampling_events_receiver,
mut validation_events_receiver,
} = events_streams;
#[allow(clippy::never_loop)]
loop {
// WARNING: `StreamExt::next` is cancellation safe.
// If adding more branches, check that the methods used are in the cancellation-safe set:
// https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
tokio::select! {
Some(sampling_event) = StreamExt::next(&mut sampling_events_receiver) => {
match sampling_event {
sampling::behaviour::SamplingEvent::SamplingSuccess{ blob_id, blob , .. } => {
if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingSuccess {blob_id, blob}){
error!("Error in internal broadcast of sampling success: {e:?}");
}
}
sampling::behaviour::SamplingEvent::IncomingSample{ .. } => {
unimplemented!("Handle request/response from Sampling service");
}
sampling::behaviour::SamplingEvent::SamplingError{ error } => {
if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) {
error!{"Error in internal broadcast of sampling error: {e:?}"};
}
}}
}
Some(da_blob) = StreamExt::next(&mut validation_events_receiver)=> {
if let Err(error) = validation_broadcast_sender.send(da_blob) {
error!("Error in internal broadcast of validation for blob: {:?}", error.0);
}
}
}
}
}

View File

@ -1,10 +1,13 @@
use futures::{Stream, StreamExt};
use kzgrs_backend::common::{blob::DaBlob, build_blob_id};
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use tokio::sync::{
broadcast::{self, Receiver},
broadcast::{self},
mpsc,
};
use tokio_stream::wrappers::BroadcastStream;
use crate::backends::NetworkBackend;
@ -96,9 +99,15 @@ impl NetworkBackend for MockExecutorBackend {
}
}
async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver<Self::NetworkEvent> {
async fn subscribe(
&mut self,
kind: Self::EventKind,
) -> Pin<Box<dyn Stream<Item = Self::NetworkEvent> + Send>> {
match kind {
EventKind::Dispersal | EventKind::Sample => self.events_tx.subscribe(),
EventKind::Dispersal | EventKind::Sample => Box::pin(
BroadcastStream::new(self.events_tx.subscribe())
.filter_map(|event| async { event.ok() }),
),
}
}
}

View File

@ -1,8 +1,10 @@
pub mod libp2p;
pub mod mock;
use super::*;
use futures::Stream;
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState};
use tokio::sync::broadcast::Receiver;
use std::pin::Pin;
#[async_trait::async_trait]
pub trait NetworkBackend {
@ -14,5 +16,8 @@ pub trait NetworkBackend {
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self;
async fn process(&self, msg: Self::Message);
async fn subscribe(&mut self, event: Self::EventKind) -> Receiver<Self::NetworkEvent>;
async fn subscribe(
&mut self,
event: Self::EventKind,
) -> Pin<Box<dyn Stream<Item = Self::NetworkEvent> + Send>>;
}
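
Since `subscribe` now hands back a boxed `Stream` instead of a `broadcast::Receiver`, a generic consumer no longer needs to know about the broadcast machinery. A sketch, written as if it sat alongside the trait; the forwarding channel is only an example:

```rust
use futures::StreamExt;
use tokio::sync::mpsc::Sender;

// Sketch: forward any backend's events to an mpsc channel by driving the stream.
async fn forward_events<B>(backend: &mut B, kind: B::EventKind, out: Sender<B::NetworkEvent>)
where
    B: NetworkBackend + Send,
{
    let mut events = backend.subscribe(kind).await;
    while let Some(event) = events.next().await {
        if out.send(event).await.is_err() {
            break; // downstream receiver dropped
        }
    }
}
```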

View File

@ -2,10 +2,11 @@ pub mod backends;
// std
use std::fmt::{self, Debug};
use std::pin::Pin;
// crates
use async_trait::async_trait;
use backends::NetworkBackend;
use futures::StreamExt;
use futures::{Stream, StreamExt};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
@ -14,7 +15,6 @@ use overwatch_rs::services::{
ServiceCore, ServiceData, ServiceId,
};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio::sync::oneshot;
use tracing::error;
// internal
@ -23,7 +23,7 @@ pub enum DaNetworkMsg<B: NetworkBackend> {
Process(B::Message),
Subscribe {
kind: B::EventKind,
sender: oneshot::Sender<broadcast::Receiver<B::NetworkEvent>>,
sender: oneshot::Sender<Pin<Box<dyn Stream<Item = B::NetworkEvent> + Send>>>,
},
}
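
On the subscriber side, the oneshot now carries a boxed stream rather than a receiver. A sketch of that exchange; the mpsc `relay` standing in for the service's message channel is hypothetical and not part of this diff:

```rust
use std::pin::Pin;

use futures::Stream;
use tokio::sync::{mpsc, oneshot};

// Hypothetical helper: send a Subscribe request to the service over some
// channel and wait for the stream it hands back through the oneshot.
async fn request_subscription<B>(
    relay: &mpsc::Sender<DaNetworkMsg<B>>,
    kind: B::EventKind,
) -> Option<Pin<Box<dyn Stream<Item = B::NetworkEvent> + Send>>>
where
    B: NetworkBackend + Send + 'static,
{
    let (sender, receiver) = oneshot::channel();
    relay
        .send(DaNetworkMsg::Subscribe { kind, sender })
        .await
        .ok()?;
    receiver.await.ok()
}
```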
@ -31,10 +31,9 @@ impl<B: NetworkBackend> Debug for DaNetworkMsg<B> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Process(msg) => write!(fmt, "DaNetworkMsg::Process({msg:?})"),
Self::Subscribe { kind, sender } => write!(
fmt,
"DaNetworkMsg::Subscribe{{ kind: {kind:?}, sender: {sender:?}}}"
),
Self::Subscribe { kind, .. } => {
write!(fmt, "DaNetworkMsg::Subscribe{{ kind: {kind:?}}}")
}
}
}
}
@ -52,7 +51,7 @@ impl<B: NetworkBackend> Debug for NetworkConfig<B> {
}
}
pub struct NetworkService<B: NetworkBackend + 'static> {
pub struct NetworkService<B: NetworkBackend + Send + 'static> {
backend: B,
service_state: ServiceStateHandle<Self>,
}
@ -61,7 +60,7 @@ pub struct NetworkState<B: NetworkBackend> {
_backend: B::State,
}
impl<B: NetworkBackend + 'static> ServiceData for NetworkService<B> {
impl<B: NetworkBackend + 'static + Send> ServiceData for NetworkService<B> {
const SERVICE_ID: ServiceId = "DaNetwork";
type Settings = NetworkConfig<B>;
type State = NetworkState<B>;