From ca0eb824aac3ac46837ef0351f62d17a8bc5ed47 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Tue, 24 Sep 2024 22:39:59 +0900 Subject: [PATCH] Mix: NetworkBehaviour (#765) * Mix: NetworkBehaviour * use Waker.wake() * make clippy happy by defining Config instead of Behaviour::Default --- Cargo.toml | 5 +- nomos-mix/message/Cargo.toml | 7 + nomos-mix/message/src/error.rs | 10 ++ nomos-mix/message/src/lib.rs | 61 +++++++ nomos-mix/network/Cargo.toml | 18 ++ nomos-mix/network/src/behaviour.rs | 256 +++++++++++++++++++++++++++ nomos-mix/network/src/error.rs | 15 ++ nomos-mix/network/src/handler.rs | 268 +++++++++++++++++++++++++++++ nomos-mix/network/src/lib.rs | 157 +++++++++++++++++ nomos-mix/queue/Cargo.toml | 7 + nomos-mix/queue/src/lib.rs | 39 +++++ 11 files changed, 842 insertions(+), 1 deletion(-) create mode 100644 nomos-mix/message/Cargo.toml create mode 100644 nomos-mix/message/src/error.rs create mode 100644 nomos-mix/message/src/lib.rs create mode 100644 nomos-mix/network/Cargo.toml create mode 100644 nomos-mix/network/src/behaviour.rs create mode 100644 nomos-mix/network/src/error.rs create mode 100644 nomos-mix/network/src/handler.rs create mode 100644 nomos-mix/network/src/lib.rs create mode 100644 nomos-mix/queue/Cargo.toml create mode 100644 nomos-mix/queue/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 8d3e5b3b..94f0a7fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,9 @@ members = [ "nomos-services/data-availability/verifier", "nomos-services/data-availability/tests", "nomos-da/full-replication", + "nomos-mix/message", + "nomos-mix/network", + "nomos-mix/queue", "nomos-cli", "nomos-utils", "nodes/nomos-node", @@ -34,4 +37,4 @@ members = [ "tests", ] exclude = ["proof_of_leadership/risc0/risc0_proofs"] -resolver = "2" \ No newline at end of file +resolver = "2" diff --git a/nomos-mix/message/Cargo.toml b/nomos-mix/message/Cargo.toml new file mode 100644 index 00000000..a92252f4 --- /dev/null +++ b/nomos-mix/message/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "nomos-mix-message" +version = "0.1.0" +edition = "2021" + +[dependencies] +sha2 = "0.10.8" diff --git a/nomos-mix/message/src/error.rs b/nomos-mix/message/src/error.rs new file mode 100644 index 00000000..e5924bcb --- /dev/null +++ b/nomos-mix/message/src/error.rs @@ -0,0 +1,10 @@ +#[derive(Debug)] +pub enum Error { + /// Invalid mix message format + InvalidMixMessage, + /// Payload size is too large + PayloadTooLarge, + /// Unwrapping a message is not allowed + /// (e.g. the message cannot be unwrapped using the private key provided) + MsgUnwrapNotAllowed, +} diff --git a/nomos-mix/message/src/lib.rs b/nomos-mix/message/src/lib.rs new file mode 100644 index 00000000..3fb56e0c --- /dev/null +++ b/nomos-mix/message/src/lib.rs @@ -0,0 +1,61 @@ +mod error; + +pub use error::Error; + +use sha2::{Digest, Sha256}; + +pub const MSG_SIZE: usize = 1024; +pub const NOISE: [u8; MSG_SIZE] = [0; MSG_SIZE]; + +/// A mock implementation of the Sphinx encoding. +/// +/// The length of the encoded message is fixed to [`MSG_SIZE`] bytes. +/// The first byte of the encoded message is the number of remaining layers to be unwrapped. +/// The remaining bytes are the payload that is zero-padded to the end. +pub fn new_message(payload: &[u8], num_layers: u8) -> Result, Error> { + if payload.len() > MSG_SIZE - 1 { + return Err(Error::PayloadTooLarge); + } + + let mut message: Vec = Vec::with_capacity(MSG_SIZE); + message.push(num_layers); + message.extend(payload); + message.extend(std::iter::repeat(0).take(MSG_SIZE - message.len())); + Ok(message) +} + +/// SHA-256 hash of the message +pub fn message_id(message: &[u8]) -> Vec { + let mut hasher = Sha256::new(); + hasher.update(message); + hasher.finalize().to_vec() +} + +/// Unwrap the message one layer. +/// +/// This function returns the unwrapped message and a boolean indicating whether the message was fully unwrapped. +/// (False if the message still has layers to be unwrapped, true otherwise) +/// +/// If the input message was already fully unwrapped, or if ititss format is invalid, +/// this function returns `[Error::InvalidMixMessage]`. +pub fn unwrap_message(message: &[u8]) -> Result<(Vec, bool), Error> { + if message.is_empty() { + return Err(Error::InvalidMixMessage); + } + + match message[0] { + 0 => Err(Error::InvalidMixMessage), + 1 => Ok((message[1..].to_vec(), true)), + n => { + let mut unwrapped: Vec = Vec::with_capacity(message.len()); + unwrapped.push(n - 1); + unwrapped.extend(&message[1..]); + Ok((unwrapped, false)) + } + } +} + +/// Check if the message is a noise message. +pub fn is_noise(message: &[u8]) -> bool { + message == NOISE +} diff --git a/nomos-mix/network/Cargo.toml b/nomos-mix/network/Cargo.toml new file mode 100644 index 00000000..4618f21f --- /dev/null +++ b/nomos-mix/network/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "nomos-mix-network" +version = "0.1.0" +edition = "2021" + +[dependencies] +cached = "0.53.1" +futures = "0.3.30" +futures-timer = "3.0.3" +libp2p = "0.53" +tracing = "0.1" +nomos-mix-message = { path = "../message" } +nomos-mix-queue = { path = "../queue" } + +[dev-dependencies] +tokio = { version = "1.39", features = ["macros", "rt-multi-thread", "time"] } +libp2p = { version = "0.53", features = ["ed25519", "tokio", "quic"] } +tracing-subscriber = "0.3.18" diff --git a/nomos-mix/network/src/behaviour.rs b/nomos-mix/network/src/behaviour.rs new file mode 100644 index 00000000..84f588f9 --- /dev/null +++ b/nomos-mix/network/src/behaviour.rs @@ -0,0 +1,256 @@ +use std::{ + collections::{HashMap, HashSet, VecDeque}, + task::{Context, Poll, Waker}, +}; + +use cached::{Cached, TimedCache}; +use libp2p::{ + core::Endpoint, + swarm::{ + ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, + NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + }, + Multiaddr, PeerId, +}; +use nomos_mix_message::{message_id, unwrap_message}; + +use crate::{ + error::Error, + handler::{FromBehaviour, MixConnectionHandler, ToBehaviour}, +}; + +/// A [`NetworkBehaviour`] that forwards messages between mix nodes. +pub struct Behaviour { + config: Config, + /// Peers that support the mix protocol, and their connection IDs + negotiated_peers: HashMap>, + /// Queue of events to yield to the swarm. + events: VecDeque>, + /// Waker that handles polling + waker: Option, + /// An LRU time cache for storing seen messages (based on their ID). This cache prevents + /// duplicates from being propagated on the network. + duplicate_cache: TimedCache, ()>, +} + +#[derive(Debug)] +pub struct Config { + pub transmission_rate: f64, + pub duplicate_cache_lifespan: u64, +} + +#[derive(Debug)] +pub enum Event { + /// A fully unwrapped message received from one of the peers. + FullyUnwrappedMessage(Vec), + Error(Error), +} + +impl Behaviour { + pub fn new(config: Config) -> Self { + let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan); + Self { + config, + negotiated_peers: HashMap::new(), + events: VecDeque::new(), + waker: None, + duplicate_cache, + } + } + + /// Publishs a message through the mix network. + /// + /// This function expects that the message was already encoded for the cryptographic mixing + /// (e.g. Sphinx encoding). + /// + /// The message is forward to all connected peers, + /// so that it can arrive in the mix node who can unwrap it one layer. + /// Fully unwrapped messages are returned as the [`MixBehaviourEvent::FullyUnwrappedMessage`]. + pub fn publish(&mut self, message: Vec) -> Result<(), Error> { + self.duplicate_cache.cache_set(message_id(&message), ()); + self.forward_message(message, None) + } + + /// Forwards a message to all connected peers except the one that was received from. + /// + /// Returns [`Error::NoPeers`] if there are no connected peers that support the mix protocol. + fn forward_message( + &mut self, + message: Vec, + propagation_source: Option, + ) -> Result<(), Error> { + let peer_ids = self + .negotiated_peers + .keys() + .filter(|&peer_id| { + if let Some(propagation_source) = propagation_source { + *peer_id != propagation_source + } else { + true + } + }) + .cloned() + .collect::>(); + + if peer_ids.is_empty() { + return Err(Error::NoPeers); + } + + for peer_id in peer_ids.into_iter() { + tracing::debug!("Registering event for peer {:?} to send msg", peer_id); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: FromBehaviour::Message(message.clone()), + }); + } + + self.try_wake(); + Ok(()) + } + + fn add_negotiated_peer(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool { + tracing::debug!( + "Adding to connected_peers: peer_id:{:?}, connection_id:{:?}", + peer_id, + connection_id + ); + self.negotiated_peers + .entry(peer_id) + .or_default() + .insert(connection_id) + } + + fn remove_negotiated_peer(&mut self, peer_id: &PeerId, connection_id: &ConnectionId) { + if let Some(connections) = self.negotiated_peers.get_mut(peer_id) { + tracing::debug!( + "Removing from connected_peers: peer:{:?}, connection_id:{:?}", + peer_id, + connection_id + ); + connections.remove(connection_id); + if connections.is_empty() { + self.negotiated_peers.remove(peer_id); + } + } + } + + fn try_wake(&mut self) { + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = MixConnectionHandler; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(MixConnectionHandler::new(&self.config)) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(MixConnectionHandler::new(&self.config)) + } + + /// Informs the behaviour about an event from the [`Swarm`]. + fn on_swarm_event(&mut self, event: FromSwarm) { + if let FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + .. + }) = event + { + self.remove_negotiated_peer(&peer_id, &connection_id); + } + } + + /// Handles an event generated by the [`MixConnectionHandler`] + /// dedicated to the connection identified by `peer_id` and `connection_id`. + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + match event { + // A message was forwarded from the peer. + ToBehaviour::Message(message) => { + if self + .duplicate_cache + .cache_set(message_id(&message), ()) + .is_some() + { + return; + } + + // Try to unwrap the message. + match unwrap_message(&message) { + Ok((unwrapped_msg, fully_unwrapped)) => { + if fully_unwrapped { + self.events.push_back(ToSwarm::GenerateEvent( + Event::FullyUnwrappedMessage(unwrapped_msg), + )); + } else if let Err(e) = self.forward_message(unwrapped_msg, None) { + tracing::error!("Failed to forward message: {:?}", e); + } + } + Err(nomos_mix_message::Error::MsgUnwrapNotAllowed) => { + // Forward the received message as it is. + if let Err(e) = self.forward_message(message, Some(peer_id)) { + tracing::error!("Failed to forward message: {:?}", e); + } + } + Err(e) => { + tracing::error!("Failed to unwrap message: {:?}", e); + } + } + } + // The connection was fully negotiated by the peer, + // which means that the peer supports the mix protocol. + ToBehaviour::FullyNegotiatedOutbound => { + self.add_negotiated_peer(peer_id, connection_id); + } + ToBehaviour::NegotiationFailed => { + self.remove_negotiated_peer(&peer_id, &connection_id); + } + ToBehaviour::IOError(error) => { + // TODO: Consider removing the peer from the connected_peers and closing the connection + self.events + .push_back(ToSwarm::GenerateEvent(Event::Error(Error::PeerIOError { + error, + peer_id, + connection_id, + }))); + } + } + + self.try_wake(); + } + + /// Polls for things that swarm should do. + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(event) = self.events.pop_front() { + Poll::Ready(event) + } else { + self.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} diff --git a/nomos-mix/network/src/error.rs b/nomos-mix/network/src/error.rs new file mode 100644 index 00000000..598f3e9d --- /dev/null +++ b/nomos-mix/network/src/error.rs @@ -0,0 +1,15 @@ +use std::io; + +use libp2p::{swarm::ConnectionId, PeerId}; + +#[derive(Debug)] +pub enum Error { + /// There were no peers to send a message to. + NoPeers, + /// IO error from peer + PeerIOError { + error: io::Error, + peer_id: PeerId, + connection_id: ConnectionId, + }, +} diff --git a/nomos-mix/network/src/handler.rs b/nomos-mix/network/src/handler.rs new file mode 100644 index 00000000..7271bdd7 --- /dev/null +++ b/nomos-mix/network/src/handler.rs @@ -0,0 +1,268 @@ +use std::{ + collections::VecDeque, + io, + task::{Context, Poll, Waker}, + time::Duration, +}; + +use futures::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt}; +use futures_timer::Delay; +use libp2p::{ + core::upgrade::ReadyUpgrade, + swarm::{ + handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}, + ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol, + }, + Stream, StreamProtocol, +}; +use nomos_mix_message::{is_noise, MSG_SIZE, NOISE}; +use nomos_mix_queue::{NonMixQueue, Queue}; + +use crate::behaviour::Config; + +const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0"); + +/// A [`ConnectionHandler`] that handles the mix protocol. +pub struct MixConnectionHandler { + inbound_substream: Option, + outbound_substream: Option, + interval: Duration, // TODO: use absolute time + timer: Delay, + queue: Box> + Send>, + pending_events_to_behaviour: VecDeque, + waker: Option, +} + +type MsgSendFuture = BoxFuture<'static, Result>; +type MsgRecvFuture = BoxFuture<'static, Result<(Stream, Vec), io::Error>>; + +enum OutboundSubstreamState { + /// A request to open a new outbound substream is being processed. + PendingOpenSubstream, + /// An outbound substream is open and ready to send messages. + Idle(Stream), + /// A message is being sent on the outbound substream. + PendingSend(MsgSendFuture), +} + +impl MixConnectionHandler { + pub fn new(config: &Config) -> Self { + let interval_sec = 1.0 / config.transmission_rate; + let interval = Duration::from_millis((interval_sec * 1000.0) as u64); + Self { + inbound_substream: None, + outbound_substream: None, + interval, + timer: Delay::new(interval), + queue: Box::new(NonMixQueue::new(NOISE.to_vec())), + pending_events_to_behaviour: VecDeque::new(), + waker: None, + } + } + + fn try_wake(&mut self) { + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } +} + +#[derive(Debug)] +pub enum FromBehaviour { + /// A message to be sent to the connection. + Message(Vec), +} + +#[derive(Debug)] +pub enum ToBehaviour { + /// An outbound substream has been successfully upgraded for the mix protocol. + FullyNegotiatedOutbound, + /// An outbound substream was failed to be upgraded for the mix protocol. + NegotiationFailed, + /// A message has been received from the connection. + Message(Vec), + /// An IO error from the connection + IOError(io::Error), +} + +impl ConnectionHandler for MixConnectionHandler { + type FromBehaviour = FromBehaviour; + type ToBehaviour = ToBehaviour; + type InboundProtocol = ReadyUpgrade; + type InboundOpenInfo = (); + type OutboundProtocol = ReadyUpgrade; + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + // Process pending events to be sent to the behaviour + if let Some(event) = self.pending_events_to_behaviour.pop_front() { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } + + // Process inbound stream + tracing::debug!("Processing inbound stream"); + if let Some(msg_recv_fut) = self.inbound_substream.as_mut() { + match msg_recv_fut.poll_unpin(cx) { + Poll::Ready(Ok((stream, msg))) => { + tracing::debug!("Received message from inbound stream. Notifying behaviour..."); + self.inbound_substream = Some(recv_msg(stream).boxed()); + if !is_noise(&msg) { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + ToBehaviour::Message(msg), + )); + } + } + Poll::Ready(Err(e)) => { + tracing::error!("Failed to receive message from inbound stream: {:?}", e); + self.inbound_substream = None; + } + Poll::Pending => {} + } + } + + // Process outbound stream + tracing::debug!("Processing outbound stream"); + loop { + match self.outbound_substream.take() { + // If the request to open a new outbound substream is still being processed, wait more. + Some(OutboundSubstreamState::PendingOpenSubstream) => { + self.outbound_substream = Some(OutboundSubstreamState::PendingOpenSubstream); + self.waker = Some(cx.waker().clone()); + return Poll::Pending; + } + // If the substream is idle, and if it's time to send a message, send it. + Some(OutboundSubstreamState::Idle(stream)) => match self.timer.poll_unpin(cx) { + Poll::Ready(_) => { + let msg = self.queue.pop(); + tracing::debug!("Sending message to outbound stream: {:?}", msg); + self.outbound_substream = Some(OutboundSubstreamState::PendingSend( + send_msg(stream, msg).boxed(), + )); + self.timer.reset(self.interval); + } + Poll::Pending => { + self.outbound_substream = Some(OutboundSubstreamState::Idle(stream)); + self.waker = Some(cx.waker().clone()); + return Poll::Pending; + } + }, + // If a message is being sent, check if it's done. + Some(OutboundSubstreamState::PendingSend(mut msg_send_fut)) => { + match msg_send_fut.poll_unpin(cx) { + Poll::Ready(Ok(stream)) => { + tracing::debug!("Message sent to outbound stream"); + self.outbound_substream = Some(OutboundSubstreamState::Idle(stream)); + } + Poll::Ready(Err(e)) => { + tracing::error!("Failed to send message to outbound stream: {:?}", e); + self.outbound_substream = None; + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + ToBehaviour::IOError(e), + )); + } + Poll::Pending => { + self.outbound_substream = + Some(OutboundSubstreamState::PendingSend(msg_send_fut)); + self.waker = Some(cx.waker().clone()); + return Poll::Pending; + } + } + } + // If there is no outbound substream, request to open a new one. + None => { + self.outbound_substream = Some(OutboundSubstreamState::PendingOpenSubstream); + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()), + }); + } + } + } + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + match event { + FromBehaviour::Message(msg) => { + self.queue.push(msg); + } + } + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol: stream, + .. + }) => { + tracing::debug!("FullyNegotiatedInbound: Creating inbound substream"); + self.inbound_substream = Some(recv_msg(stream).boxed()) + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: stream, + .. + }) => { + tracing::debug!("FullyNegotiatedOutbound: Creating outbound substream"); + self.outbound_substream = Some(OutboundSubstreamState::Idle(stream)); + self.pending_events_to_behaviour + .push_back(ToBehaviour::FullyNegotiatedOutbound); + } + ConnectionEvent::DialUpgradeError(e) => { + tracing::error!("DialUpgradeError: {:?}", e); + match e.error { + StreamUpgradeError::NegotiationFailed => { + self.pending_events_to_behaviour + .push_back(ToBehaviour::NegotiationFailed); + } + StreamUpgradeError::Io(e) => { + self.pending_events_to_behaviour + .push_back(ToBehaviour::IOError(e)); + } + StreamUpgradeError::Timeout => { + self.pending_events_to_behaviour + .push_back(ToBehaviour::IOError(io::Error::new( + io::ErrorKind::TimedOut, + "mix protocol negotiation timed out", + ))); + } + StreamUpgradeError::Apply(_) => unreachable!(), + } + } + event => { + tracing::debug!("Ignoring connection event: {:?}", event) + } + } + + self.try_wake(); + } +} + +/// Write a message to the stream +async fn send_msg(mut stream: Stream, msg: Vec) -> io::Result { + stream.write_all(&msg).await?; + stream.flush().await?; + Ok(stream) +} + +/// Read a fixed-length message from the stream +// TODO: Consider handling variable-length messages +async fn recv_msg(mut stream: Stream) -> io::Result<(Stream, Vec)> { + let mut buf = vec![0; MSG_SIZE]; + stream.read_exact(&mut buf).await?; + Ok((stream, buf)) +} diff --git a/nomos-mix/network/src/lib.rs b/nomos-mix/network/src/lib.rs new file mode 100644 index 00000000..01fd88c6 --- /dev/null +++ b/nomos-mix/network/src/lib.rs @@ -0,0 +1,157 @@ +mod behaviour; +mod error; +mod handler; + +pub use behaviour::{Behaviour, Event}; + +#[cfg(test)] +mod test { + use std::time::Duration; + + use libp2p::{ + futures::StreamExt, + identity::Keypair, + swarm::{dummy, NetworkBehaviour, SwarmEvent}, + Multiaddr, PeerId, Swarm, SwarmBuilder, + }; + use nomos_mix_message::{new_message, MSG_SIZE}; + use tokio::select; + + use crate::{behaviour::Config, error::Error, Behaviour, Event}; + + /// Check that an wrapped message is forwarded through mix nodes and unwrapped successfully. + #[tokio::test] + async fn behaviour() { + let k1 = libp2p::identity::Keypair::generate_ed25519(); + let peer_id1 = PeerId::from_public_key(&k1.public()); + let k2 = libp2p::identity::Keypair::generate_ed25519(); + + // Initialize two swarms that support the mix protocol. + let mut swarm1 = new_swarm(k1); + let mut swarm2 = new_swarm(k2); + + let addr: Multiaddr = "/ip4/127.0.0.1/udp/5073/quic-v1".parse().unwrap(); + let addr_with_peer_id = addr.clone().with_p2p(peer_id1).unwrap(); + + // Spawn swarm1 + tokio::spawn(async move { + swarm1.listen_on(addr).unwrap(); + loop { + swarm1.select_next_some().await; + } + }); + + // Dial to swarm1 from swarm2 + tokio::time::sleep(Duration::from_secs(1)).await; + swarm2.dial(addr_with_peer_id).unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; + + // Prepare a task for swarm2 to publish a two-layer wrapped message, + // receive an one-layer unwrapped message from swarm1, + // and return a fully unwrapped message. + let task = async { + let mut msg_published = false; + let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1)); + loop { + select! { + // Try to publish a message until it succeeds. + // (It will fail until swarm2 is connected to swarm1 successfully.) + _ = publish_try_interval.tick() => { + if !msg_published { + // Prepare a message wrapped in two layers + let msg = new_message(&[1; MSG_SIZE - 1], 2).unwrap(); + msg_published = swarm2.behaviour_mut().publish(msg).is_ok(); + } + } + // Proceed swarm2 + event = swarm2.select_next_some() => { + if let SwarmEvent::Behaviour(Event::FullyUnwrappedMessage(message)) = event { + println!("SWARM2 FULLY_UNWRAPPED_MESSAGE: {:?}", message); + break; + }; + } + } + } + }; + + // Expect for the task to be completed within 30 seconds. + assert!(tokio::time::timeout(Duration::from_secs(30), task) + .await + .is_ok()); + } + + /// If the peer doesn't support the mix protocol, the message should not be forwarded to the peer. + #[tokio::test] + async fn peer_not_support_mix_protocol() { + let k1 = libp2p::identity::Keypair::generate_ed25519(); + let peer_id1 = PeerId::from_public_key(&k1.public()); + let k2 = libp2p::identity::Keypair::generate_ed25519(); + + // Only swarm2 supports the mix protocol. + let mut swarm1 = new_swarm_without_mix(k1); + let mut swarm2 = new_swarm(k2); + + let addr: Multiaddr = "/ip4/127.0.0.1/udp/5074/quic-v1".parse().unwrap(); + let addr_with_peer_id = addr.clone().with_p2p(peer_id1).unwrap(); + + // Spawn swarm1 + tokio::spawn(async move { + swarm1.listen_on(addr).unwrap(); + loop { + swarm1.select_next_some().await; + } + }); + + // Dial to swarm1 from swarm2 + tokio::time::sleep(Duration::from_secs(1)).await; + swarm2.dial(addr_with_peer_id).unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; + + // Expect all publish attempts to fail with [`Error::NoPeers`] + // because swarm2 doesn't have any peers that support the mix protocol. + let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1)); + let mut publish_try_count = 0; + loop { + select! { + _ = publish_try_interval.tick() => { + let msg = new_message(&[10; MSG_SIZE - 1], 1).unwrap(); + assert!(matches!(swarm2.behaviour_mut().publish(msg), Err(Error::NoPeers))); + publish_try_count += 1; + if publish_try_count >= 10 { + break; + } + } + _ = swarm2.select_next_some() => {} + } + } + } + + fn new_swarm(key: Keypair) -> Swarm { + new_swarm_with_behaviour( + key, + Behaviour::new(Config { + transmission_rate: 1.0, + duplicate_cache_lifespan: 60, + }), + ) + } + + fn new_swarm_without_mix(key: Keypair) -> Swarm { + new_swarm_with_behaviour(key, dummy::Behaviour) + } + + fn new_swarm_with_behaviour(key: Keypair, behaviour: B) -> Swarm { + SwarmBuilder::with_existing_identity(key) + .with_tokio() + .with_other_transport(|keypair| { + libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(keypair)) + }) + .unwrap() + .with_behaviour(|_| behaviour) + .unwrap() + .with_swarm_config(|cfg| { + cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX)) + }) + .build() + } +} diff --git a/nomos-mix/queue/Cargo.toml b/nomos-mix/queue/Cargo.toml new file mode 100644 index 00000000..77c0e7ed --- /dev/null +++ b/nomos-mix/queue/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "nomos-mix-queue" +version = "0.1.0" +edition = "2021" + +[dependencies] +rand = "0.8.5" diff --git a/nomos-mix/queue/src/lib.rs b/nomos-mix/queue/src/lib.rs new file mode 100644 index 00000000..397dc4a2 --- /dev/null +++ b/nomos-mix/queue/src/lib.rs @@ -0,0 +1,39 @@ +use std::collections::VecDeque; + +/// A [`Queue`] controls the order of messages to be emitted to a single connection. +pub trait Queue { + /// Push a message to the queue. + fn push(&mut self, data: T); + + /// Pop a message from the queue. + /// + /// The returned message is either the real message pushed before or a noise message. + fn pop(&mut self) -> T; +} + +/// A regular queue that does not mix the order of messages. +/// +/// This queue returns a noise message if the queue is empty. +pub struct NonMixQueue { + queue: VecDeque, + noise: T, +} + +impl NonMixQueue { + pub fn new(noise: T) -> Self { + Self { + queue: VecDeque::new(), + noise, + } + } +} + +impl Queue for NonMixQueue { + fn push(&mut self, data: T) { + self.queue.push_back(data); + } + + fn pop(&mut self) -> T { + self.queue.pop_front().unwrap_or(self.noise.clone()) + } +}