Da: Network dispersal (#688)

* Fix reading messages state machine

* Improve waker on behaviour

* Rename test

* First sketch

* Implement replication

* Refactor protocols names

* Clippy happy

* expose behaviour

* Return dummy on outbound for validator behaviour

* Implement behaviour, lacks streams handling

* Cleanup

* Implement stream handling

* Add some documentation and comments

* Clippy fixes after rebase

* Move all neighbours to test_utils module

* Executor only outgoing streams

* First assault at testing behaviours

* Add debug instead of traces

* Added more logs

* Bullshitting tests

* Fix outstream handling, offers hook to send new peer streams

* Fix blob dispersal handling

* Refactor test, use new api

* Imports cleanup

* Working test

* Add docs

* Non overlapping test port
This commit is contained in:
Daniel Sanchez 2024-08-12 09:12:08 +02:00 committed by GitHub
parent 4150c6514d
commit 5f0707c276
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 781 additions and 25 deletions

View File

@ -5,18 +5,25 @@ edition = "2021"
[dependencies]
libp2p = { version = "0.53" }
libp2p-stream = "0.1.0-alpha"
futures = "0.3"
tracing = "0.1"
indexmap = "2.2"
subnetworks-assignations = { path = "../subnetworks-assignations" }
nomos-da-messages = { path = "../messages" }
kzgrs-backend = { path = "../../kzgrs-backend" }
tracing-subscriber = "0.3.18"
void = "1.0.2"
either = "1.13.0"
log = "0.4"
serde = "1.0"
bincode = "1.3"
tokio = { version = "1.39" }
tokio-stream = "0.1"
thiserror = "1.0"
[dev-dependencies]
tokio = { version = "1.39", features = ["macros", "rt-multi-thread", "time"] }
libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "tokio", "quic"] }
libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "tokio", "quic", "tcp", "yamux", "noise"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -0,0 +1,439 @@
// std
use std::collections::{HashMap, VecDeque};
use std::task::{Context, Poll};
// crates
use either::Either;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{AsyncWriteExt, FutureExt, StreamExt};
use libp2p::core::Endpoint;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId, Stream};
use libp2p_stream::{Control, OpenStreamError};
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
// internal
use crate::protocol::DISPERSAL_PROTOCOL;
use crate::SubnetworkId;
use kzgrs_backend::common::blob::DaBlob;
use nomos_da_messages::common::Blob;
use nomos_da_messages::dispersal::dispersal_res::MessageType;
use nomos_da_messages::dispersal::{DispersalErr, DispersalReq, DispersalRes};
use nomos_da_messages::{pack_message, unpack_from_reader};
use subnetworks_assignations::MembershipHandler;
#[derive(Debug, Error)]
pub enum DispersalError {
#[error("Stream disconnected: {error}")]
Io {
error: std::io::Error,
blob_id: BlobId,
subnetwork_id: SubnetworkId,
},
#[error("Could not serialized: {error}")]
Serialization {
error: bincode::Error,
blob_id: BlobId,
subnetwork_id: SubnetworkId,
},
#[error("Dispersal response error: {error:?}")]
Protocol {
subnetwork_id: SubnetworkId,
error: DispersalErr,
},
#[error("Error dialing peer [{peer_id}]: {error}")]
OpenStreamError {
peer_id: PeerId,
error: OpenStreamError,
},
}
impl DispersalError {
pub fn blob_id(&self) -> Option<BlobId> {
match self {
DispersalError::Io { blob_id, .. } => Some(*blob_id),
DispersalError::Serialization { blob_id, .. } => Some(*blob_id),
DispersalError::Protocol {
error: DispersalErr { blob_id, .. },
..
} => Some(blob_id.clone().try_into().unwrap()),
DispersalError::OpenStreamError { .. } => None,
}
}
pub fn subnetwork_id(&self) -> Option<SubnetworkId> {
match self {
DispersalError::Io { subnetwork_id, .. } => Some(*subnetwork_id),
DispersalError::Serialization { subnetwork_id, .. } => Some(*subnetwork_id),
DispersalError::Protocol { subnetwork_id, .. } => Some(*subnetwork_id),
DispersalError::OpenStreamError { .. } => None,
}
}
}
type BlobId = [u8; 32];
#[derive(Debug)]
pub enum DispersalExecutorEvent {
/// A blob successfully arrived its destination
DispersalSuccess {
blob_id: BlobId,
subnetwork_id: SubnetworkId,
},
/// Something went wrong delivering the blob
DispersalError { error: DispersalError },
}
struct DispersalStream {
stream: Stream,
peer_id: PeerId,
}
type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, DispersalRes, DispersalStream);
type StreamHandlerFuture = BoxFuture<'static, Result<StreamHandlerFutureSuccess, DispersalError>>;
/// Executor dispersal protocol
/// Do not handle incoming connections, just accepts outgoing ones.
/// It takes care of sending blobs to different subnetworks.
/// Bubbles up events with the success or error when dispersing
pub struct DispersalExecutorBehaviour<Membership: MembershipHandler> {
/// Underlying stream behaviour
stream_behaviour: libp2p_stream::Behaviour,
/// Pending running tasks (one task per stream)
tasks: FuturesUnordered<StreamHandlerFuture>,
/// Streams which didn't have any pending task
idle_streams: HashMap<PeerId, DispersalStream>,
/// Subnetworks membership information
membership: Membership,
/// Pending blobs that need to be dispersed by PeerId
to_disperse: HashMap<PeerId, VecDeque<(Membership::NetworkId, DaBlob)>>,
/// Already connected peers connection Ids
connected_subnetworks: HashMap<PeerId, ConnectionId>,
/// Sender hook of peers to open streams channel
pending_out_streams_sender: UnboundedSender<PeerId>,
/// Pending to open streams
pending_out_streams: BoxStream<'static, Result<DispersalStream, DispersalError>>,
/// Dispersal hook of pending blobs channel
pending_blobs_sender: UnboundedSender<(Membership::NetworkId, DaBlob)>,
/// Pending blobs stream
pending_blobs_stream: BoxStream<'static, (Membership::NetworkId, DaBlob)>,
}
impl<Membership> DispersalExecutorBehaviour<Membership>
where
Membership: MembershipHandler + 'static,
Membership::NetworkId: Send,
{
pub fn new(membership: Membership) -> Self {
let stream_behaviour = libp2p_stream::Behaviour::new();
let tasks = FuturesUnordered::new();
let to_disperse = HashMap::new();
let connected_subnetworks = HashMap::new();
let idle_streams = HashMap::new();
let (pending_out_streams_sender, receiver) = mpsc::unbounded_channel();
let control = stream_behaviour.new_control();
let pending_out_streams = UnboundedReceiverStream::new(receiver)
.zip(futures::stream::repeat(control))
.then(|(peer_id, control)| Self::open_stream(peer_id, control))
.boxed();
let (pending_blobs_sender, receiver) = mpsc::unbounded_channel();
let pending_blobs_stream = UnboundedReceiverStream::new(receiver).boxed();
Self {
stream_behaviour,
tasks,
membership,
to_disperse,
connected_subnetworks,
idle_streams,
pending_out_streams_sender,
pending_out_streams,
pending_blobs_sender,
pending_blobs_stream,
}
}
/// Open a new stream from the underlying control to the provided peer
async fn open_stream(
peer_id: PeerId,
mut control: Control,
) -> Result<DispersalStream, DispersalError> {
let stream = control
.open_stream(peer_id, DISPERSAL_PROTOCOL)
.await
.map_err(|error| DispersalError::OpenStreamError { peer_id, error })?;
Ok(DispersalStream { stream, peer_id })
}
/// Get a hook to the sender channel of open stream events
pub fn open_stream_sender(&self) -> UnboundedSender<PeerId> {
self.pending_out_streams_sender.clone()
}
/// Get a hook to the sender channel of the blobs dispersal events
pub fn blobs_sender(&self) -> UnboundedSender<(Membership::NetworkId, DaBlob)> {
self.pending_blobs_sender.clone()
}
/// Task for handling streams, one message at a time
/// Writes the blob to the stream and waits for an acknowledgment response
async fn stream_disperse(
mut stream: DispersalStream,
message: DaBlob,
subnetwork_id: SubnetworkId,
) -> Result<StreamHandlerFutureSuccess, DispersalError> {
let blob_id = message.id();
let blob = bincode::serialize(&message).map_err(|error| DispersalError::Serialization {
error,
blob_id: blob_id.clone().try_into().unwrap(),
subnetwork_id,
})?;
let message = DispersalReq {
blob: Some(Blob {
blob_id: blob_id.clone(),
data: blob,
}),
subnetwork_id,
};
stream
.stream
.write_all(&pack_message(&message).map_err(|error| DispersalError::Io {
error,
blob_id: blob_id.clone().try_into().unwrap(),
subnetwork_id,
})?)
.await
.map_err(|error| DispersalError::Io {
error,
blob_id: blob_id.clone().try_into().unwrap(),
subnetwork_id,
})?;
stream
.stream
.flush()
.await
.map_err(|error| DispersalError::Io {
error,
blob_id: blob_id.clone().try_into().unwrap(),
subnetwork_id,
})?;
let response: DispersalRes =
unpack_from_reader(&mut stream.stream)
.await
.map_err(|error| DispersalError::Io {
error,
blob_id: blob_id.clone().try_into().unwrap(),
subnetwork_id,
})?;
// Safety: blob_id should always be a 32bytes hash, currently is abstracted into a `Vec<u8>`
// but probably we should have a `[u8; 32]` wrapped in a custom type `BlobId`
// TODO: use blob_id when changing types to [u8; 32]
Ok((blob_id.try_into().unwrap(), subnetwork_id, response, stream))
}
/// Run when a stream gets free, if there is a pending task for the stream it will get scheduled to run
/// otherwise it is parked as idle.
fn handle_stream(
tasks: &mut FuturesUnordered<StreamHandlerFuture>,
to_disperse: &mut HashMap<PeerId, VecDeque<(SubnetworkId, DaBlob)>>,
idle_streams: &mut HashMap<PeerId, DispersalStream>,
stream: DispersalStream,
) {
if let Some((subnetwork_id, next_request)) =
Self::next_request(&stream.peer_id, to_disperse)
{
let fut = Self::stream_disperse(stream, next_request, subnetwork_id).boxed();
tasks.push(fut);
} else {
// There is no pending request, so just idle the stream
idle_streams.insert(stream.peer_id, stream);
};
}
/// Get a pending request if its available
fn next_request(
peer_id: &PeerId,
to_disperse: &mut HashMap<PeerId, VecDeque<(SubnetworkId, DaBlob)>>,
) -> Option<(SubnetworkId, DaBlob)> {
to_disperse
.get_mut(peer_id)
.and_then(|queue| queue.pop_front())
}
}
impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static>
DispersalExecutorBehaviour<Membership>
{
/// Schedule a new task for sending the blob, if stream is not available queue messages for later
/// processing.
fn disperse_blob(
tasks: &mut FuturesUnordered<StreamHandlerFuture>,
idle_streams: &mut HashMap<Membership::Id, DispersalStream>,
membership: &mut Membership,
connected_subnetworks: &mut HashMap<PeerId, ConnectionId>,
to_disperse: &mut HashMap<PeerId, VecDeque<(Membership::NetworkId, DaBlob)>>,
subnetwork_id: SubnetworkId,
blob: DaBlob,
) {
let members = membership.members_of(&subnetwork_id);
let peers = members
.iter()
.filter(|peer_id| connected_subnetworks.contains_key(peer_id));
// We may be connected to more than a single node. Usually will be one, but that is an
// internal decision of the executor itself.
for peer in peers {
if let Some(stream) = idle_streams.remove(peer) {
// push a task if the stream is immediately available
let fut = Self::stream_disperse(stream, blob.clone(), subnetwork_id).boxed();
tasks.push(fut);
} else {
// otherwise queue the blob
to_disperse
.entry(*peer)
.or_default()
.push_back((subnetwork_id, blob.clone()));
}
}
}
}
impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> NetworkBehaviour
for DispersalExecutorBehaviour<M>
{
type ConnectionHandler = Either<
<libp2p_stream::Behaviour as NetworkBehaviour>::ConnectionHandler,
libp2p::swarm::dummy::ConnectionHandler,
>;
type ToSwarm = DispersalExecutorEvent;
fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Either::Right(libp2p::swarm::dummy::ConnectionHandler))
}
fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
self.connected_subnetworks.insert(peer, connection_id);
self.stream_behaviour
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
.map(Either::Left)
}
fn on_swarm_event(&mut self, event: FromSwarm) {
self.stream_behaviour.on_swarm_event(event)
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
let Either::Left(event) = event else {
unreachable!()
};
self.stream_behaviour
.on_connection_handler_event(peer_id, connection_id, event)
}
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
let Self {
tasks,
to_disperse,
idle_streams,
pending_out_streams,
pending_blobs_stream,
membership,
connected_subnetworks,
..
} = self;
// poll pending tasks
if let Poll::Ready(Some(future_result)) = tasks.poll_next_unpin(cx) {
match future_result {
Ok((blob_id, subnetwork_id, dispersal_response, stream)) => {
// handle the free stream then return the success
Self::handle_stream(tasks, to_disperse, idle_streams, stream);
// return an error if there was an error on the other side of the wire
if let DispersalRes {
message_type: Some(MessageType::Err(error)),
} = dispersal_response
{
return Poll::Ready(ToSwarm::GenerateEvent(
DispersalExecutorEvent::DispersalError {
error: DispersalError::Protocol {
subnetwork_id,
error,
},
},
));
}
return Poll::Ready(ToSwarm::GenerateEvent(
DispersalExecutorEvent::DispersalSuccess {
blob_id,
subnetwork_id,
},
));
}
// Something went up on our side of the wire, bubble it up
Err(error) => {
return Poll::Ready(ToSwarm::GenerateEvent(
DispersalExecutorEvent::DispersalError { error },
))
}
}
}
// poll pending blobs
if let Poll::Ready(Some((subnetwork_id, blob))) = pending_blobs_stream.poll_next_unpin(cx) {
Self::disperse_blob(
tasks,
idle_streams,
membership,
connected_subnetworks,
to_disperse,
subnetwork_id,
blob,
);
}
// poll pending streams
if let Poll::Ready(Some(res)) = pending_out_streams.poll_next_unpin(cx) {
match res {
Ok(stream) => {
Self::handle_stream(tasks, to_disperse, idle_streams, stream);
}
Err(error) => {
return Poll::Ready(ToSwarm::GenerateEvent(
DispersalExecutorEvent::DispersalError { error },
));
}
}
}
// Deal with connection as the underlying behaviour would do
match self.stream_behaviour.poll(cx) {
Poll::Ready(ToSwarm::Dial { opts }) => Poll::Ready(ToSwarm::Dial { opts }),
Poll::Pending => {
// TODO: probably must be smarter when to wake this
cx.waker().wake_by_ref();
Poll::Pending
}
_ => unreachable!(),
}
}
}

View File

@ -0,0 +1 @@
pub mod behaviour;

View File

@ -1 +1,143 @@
pub mod executor;
pub mod validator;
#[cfg(test)]
pub mod test {
use crate::dispersal::executor::behaviour::DispersalExecutorBehaviour;
use crate::dispersal::validator::behaviour::{DispersalEvent, DispersalValidatorBehaviour};
use crate::test_utils::AllNeighbours;
use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::Column;
use libp2p::identity::Keypair;
use libp2p::swarm::SwarmEvent;
use libp2p::{quic, Multiaddr, PeerId};
use log::info;
use std::time::Duration;
use subnetworks_assignations::MembershipHandler;
use tracing_subscriber::fmt::TestWriter;
use tracing_subscriber::EnvFilter;
pub fn executor_swarm(
key: Keypair,
membership: impl MembershipHandler<NetworkId = u32, Id = PeerId> + 'static,
) -> libp2p::Swarm<
DispersalExecutorBehaviour<impl MembershipHandler<NetworkId = u32, Id = PeerId>>,
> {
libp2p::SwarmBuilder::with_existing_identity(key)
.with_tokio()
.with_other_transport(|keypair| quic::tokio::Transport::new(quic::Config::new(keypair)))
.unwrap()
.with_behaviour(|_key| DispersalExecutorBehaviour::new(membership))
.unwrap()
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX))
})
.build()
}
pub fn validator_swarm(
key: Keypair,
membership: impl MembershipHandler<NetworkId = u32, Id = PeerId> + 'static,
) -> libp2p::Swarm<
DispersalValidatorBehaviour<impl MembershipHandler<NetworkId = u32, Id = PeerId>>,
> {
libp2p::SwarmBuilder::with_existing_identity(key)
.with_tokio()
.with_other_transport(|keypair| quic::tokio::Transport::new(quic::Config::new(keypair)))
.unwrap()
.with_behaviour(|_key| DispersalValidatorBehaviour::new(membership))
.unwrap()
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX))
})
.build()
}
#[tokio::test]
async fn test_dispersal_single_node() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.with_writer(TestWriter::default())
.try_init();
let k1 = libp2p::identity::Keypair::generate_ed25519();
let k2 = libp2p::identity::Keypair::generate_ed25519();
let validator_peer = PeerId::from_public_key(&k2.public());
let neighbours = AllNeighbours {
neighbours: [
PeerId::from_public_key(&k1.public()),
PeerId::from_public_key(&k2.public()),
]
.into_iter()
.collect(),
};
let mut executor = executor_swarm(k1, neighbours.clone());
let mut validator = validator_swarm(k2, neighbours);
let msg_count = 10usize;
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap();
let addr2 = addr.clone().with_p2p(validator_peer).unwrap();
let validator_task = async move {
validator.listen_on(addr).unwrap();
let mut res = vec![];
loop {
match validator.select_next_some().await {
SwarmEvent::Behaviour(DispersalEvent::IncomingMessage { message }) => {
res.push(message);
}
event => {
info!("Validator event: {event:?}");
}
}
if res.len() == msg_count {
break;
}
}
res
};
let join_validator = tokio::spawn(validator_task);
tokio::time::sleep(Duration::from_secs(1)).await;
executor.dial(addr2).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let executor_open_stream_sender = executor.behaviour().open_stream_sender();
let executor_disperse_blob_sender = executor.behaviour().blobs_sender();
let (sender, mut receiver) = tokio::sync::oneshot::channel();
let executor_poll = async move {
loop {
tokio::select! {
Some(event) = executor.next() => {
info!("Executor event: {event:?}");
}
_ = &mut receiver => {
break;
}
}
}
};
let executor_task = tokio::spawn(executor_poll);
executor_open_stream_sender.send(validator_peer).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
for i in 0..10 {
info!("Sending blob: {i}");
executor_disperse_blob_sender
.send((
0,
DaBlob {
column: Column(vec![]),
column_commitment: Default::default(),
aggregated_column_commitment: Default::default(),
aggregated_column_proof: Default::default(),
rows_commitments: vec![],
rows_proofs: vec![],
},
))
.unwrap()
}
assert_eq!(join_validator.await.unwrap().len(), msg_count);
sender.send(()).unwrap();
executor_task.await.unwrap();
}
}

View File

@ -0,0 +1,145 @@
use crate::protocol::DISPERSAL_PROTOCOL;
use crate::SubnetworkId;
use either::Either;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{AsyncWriteExt, FutureExt, StreamExt};
use libp2p::core::Endpoint;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId, Stream};
use libp2p_stream::IncomingStreams;
use log::debug;
use nomos_da_messages::dispersal::dispersal_res::MessageType;
use nomos_da_messages::dispersal::{DispersalReq, DispersalRes};
use nomos_da_messages::{pack_message, unpack_from_reader};
use std::io::Error;
use std::task::{Context, Poll};
use subnetworks_assignations::MembershipHandler;
#[derive(Debug)]
pub enum DispersalEvent {
/// Received a n
IncomingMessage { message: DispersalReq },
}
pub struct DispersalValidatorBehaviour<Membership> {
stream_behaviour: libp2p_stream::Behaviour,
incoming_streams: IncomingStreams,
tasks: FuturesUnordered<BoxFuture<'static, Result<(DispersalReq, Stream), Error>>>,
membership: Membership,
}
impl<Membership: MembershipHandler> DispersalValidatorBehaviour<Membership> {
pub fn new(membership: Membership) -> Self {
let stream_behaviour = libp2p_stream::Behaviour::new();
let mut stream_control = stream_behaviour.new_control();
let incoming_streams = stream_control
.accept(DISPERSAL_PROTOCOL)
.expect("Just a single accept to protocol is valid");
let tasks = FuturesUnordered::new();
Self {
stream_behaviour,
incoming_streams,
tasks,
membership,
}
}
/// Stream handling messages task.
/// This task handles a single message receive. Then it writes up the acknowledgment into the same
/// stream as response and finish.
async fn handle_new_stream(mut stream: Stream) -> Result<(DispersalReq, Stream), Error> {
let message: DispersalReq = unpack_from_reader(&mut stream).await?;
let blob_id = message.blob.clone().unwrap().blob_id;
let response = DispersalRes {
message_type: Some(MessageType::BlobId(blob_id)),
};
let message_bytes = pack_message(&response)?;
stream.write_all(&message_bytes).await?;
stream.flush().await?;
Ok((message, stream))
}
}
impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> NetworkBehaviour
for DispersalValidatorBehaviour<M>
{
type ConnectionHandler = Either<
<libp2p_stream::Behaviour as NetworkBehaviour>::ConnectionHandler,
libp2p::swarm::dummy::ConnectionHandler,
>;
type ToSwarm = DispersalEvent;
fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
if !self.membership.is_allowed(&peer) {
return Ok(Either::Right(libp2p::swarm::dummy::ConnectionHandler));
}
self.stream_behaviour
.handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr)
.map(Either::Left)
}
fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &Multiaddr,
_role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Either::Right(libp2p::swarm::dummy::ConnectionHandler))
}
fn on_swarm_event(&mut self, event: FromSwarm) {
self.stream_behaviour.on_swarm_event(event)
}
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
connection_id: ConnectionId,
event: THandlerOutEvent<Self>,
) {
let Either::Left(event) = event else {
unreachable!()
};
self.stream_behaviour
.on_connection_handler_event(peer_id, connection_id, event)
}
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
let Self {
incoming_streams,
tasks,
..
} = self;
match tasks.poll_next_unpin(cx) {
Poll::Ready(Some(Ok((message, stream)))) => {
tasks.push(Self::handle_new_stream(stream).boxed());
return Poll::Ready(ToSwarm::GenerateEvent(DispersalEvent::IncomingMessage {
message,
}));
}
Poll::Ready(Some(Err(error))) => {
debug!("Error on dispersal stream {error:?}");
}
_ => {}
}
if let Poll::Ready(Some((_peer_id, stream))) = incoming_streams.poll_next_unpin(cx) {
tasks.push(Self::handle_new_stream(stream).boxed());
}
// TODO: probably must be smarter when to wake this
cx.waker().wake_by_ref();
Poll::Pending
}
}

View File

@ -0,0 +1 @@
pub mod behaviour;

View File

@ -2,5 +2,7 @@ pub mod dispersal;
pub mod protocol;
pub mod replication;
pub mod sampling;
#[cfg(test)]
pub mod test_utils;
pub type SubnetworkId = u32;

View File

@ -1,3 +1,4 @@
use libp2p::StreamProtocol;
pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/da/0.1.0");
pub const REPLICATION_PROTOCOL: StreamProtocol = StreamProtocol::new("/nomos/da/0.1.0/replication");
pub const DISPERSAL_PROTOCOL: StreamProtocol = StreamProtocol::new("/nomos/da/0.1.0/dispersal");

View File

@ -16,7 +16,7 @@ use tracing::error;
// internal
use nomos_da_messages::{pack_message, unpack_from_reader};
use crate::protocol::PROTOCOL_NAME;
use crate::protocol::REPLICATION_PROTOCOL;
pub type DaMessage = nomos_da_messages::replication::ReplicationReq;
@ -185,7 +185,7 @@ impl ConnectionHandler for ReplicationHandler {
type OutboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
SubstreamProtocol::new(ReadyUpgrade::new(REPLICATION_PROTOCOL), ())
}
fn poll(

View File

@ -3,7 +3,6 @@ pub mod handler;
#[cfg(test)]
mod test {
use std::collections::HashSet;
use std::time::Duration;
use futures::StreamExt;
@ -15,28 +14,10 @@ mod test {
use tracing_subscriber::EnvFilter;
use nomos_da_messages::common::Blob;
use subnetworks_assignations::MembershipHandler;
use crate::replication::behaviour::{ReplicationBehaviour, ReplicationEvent};
use crate::replication::handler::DaMessage;
#[derive(Clone)]
struct AllNeighbours {
neighbours: HashSet<PeerId>,
}
impl MembershipHandler for AllNeighbours {
type NetworkId = u32;
type Id = PeerId;
fn membership(&self, _self_id: &Self::Id) -> HashSet<Self::NetworkId> {
[0].into_iter().collect()
}
fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet<Self::Id> {
self.neighbours.clone()
}
}
use crate::test_utils::AllNeighbours;
#[tokio::test]
async fn test_connects_and_receives_replication_messages() {

View File

@ -0,0 +1,25 @@
use libp2p::PeerId;
use std::collections::HashSet;
use subnetworks_assignations::MembershipHandler;
#[derive(Clone)]
pub struct AllNeighbours {
pub neighbours: HashSet<PeerId>,
}
impl MembershipHandler for AllNeighbours {
type NetworkId = u32;
type Id = PeerId;
fn membership(&self, _self_id: &Self::Id) -> HashSet<Self::NetworkId> {
[0].into_iter().collect()
}
fn is_allowed(&self, _id: &Self::Id) -> bool {
true
}
fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet<Self::Id> {
self.neighbours.clone()
}
}

View File

@ -2,11 +2,23 @@ use std::collections::HashSet;
use std::hash::Hash;
pub trait MembershipHandler {
// Subnetworks Id type
type NetworkId: Eq + Hash;
// Members Id type
type Id;
fn membership(&self, self_id: &Self::Id) -> HashSet<Self::NetworkId>;
// Returns the set of NetworksIds an id is a member of
fn membership(&self, id: &Self::Id) -> HashSet<Self::NetworkId>;
// True if the id is a member of a network_id, False otherwise
fn is_member_of(&self, id: &Self::Id, network_id: &Self::NetworkId) -> bool {
self.membership(id).contains(network_id)
}
// Returns true if the member id is in the overall membership set
fn is_allowed(&self, id: &Self::Id) -> bool;
// Returns the set of members in a subnetwork by its NetworkId
fn members_of(&self, network_id: &Self::NetworkId) -> HashSet<Self::Id>;
}