Mix: NetworkBehaviour (#765)
* Mix: NetworkBehaviour * use Waker.wake() * make clippy happy by defining Config instead of Behaviour::Default
This commit is contained in:
parent
8142feaa8c
commit
ca0eb824aa
@ -22,6 +22,9 @@ members = [
|
||||
"nomos-services/data-availability/verifier",
|
||||
"nomos-services/data-availability/tests",
|
||||
"nomos-da/full-replication",
|
||||
"nomos-mix/message",
|
||||
"nomos-mix/network",
|
||||
"nomos-mix/queue",
|
||||
"nomos-cli",
|
||||
"nomos-utils",
|
||||
"nodes/nomos-node",
|
||||
@ -34,4 +37,4 @@ members = [
|
||||
"tests",
|
||||
]
|
||||
exclude = ["proof_of_leadership/risc0/risc0_proofs"]
|
||||
resolver = "2"
|
||||
resolver = "2"
|
||||
|
7
nomos-mix/message/Cargo.toml
Normal file
7
nomos-mix/message/Cargo.toml
Normal file
@ -0,0 +1,7 @@
|
||||
[package]
|
||||
name = "nomos-mix-message"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
sha2 = "0.10.8"
|
10
nomos-mix/message/src/error.rs
Normal file
10
nomos-mix/message/src/error.rs
Normal file
@ -0,0 +1,10 @@
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
/// Invalid mix message format
|
||||
InvalidMixMessage,
|
||||
/// Payload size is too large
|
||||
PayloadTooLarge,
|
||||
/// Unwrapping a message is not allowed
|
||||
/// (e.g. the message cannot be unwrapped using the private key provided)
|
||||
MsgUnwrapNotAllowed,
|
||||
}
|
61
nomos-mix/message/src/lib.rs
Normal file
61
nomos-mix/message/src/lib.rs
Normal file
@ -0,0 +1,61 @@
|
||||
mod error;
|
||||
|
||||
pub use error::Error;
|
||||
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
pub const MSG_SIZE: usize = 1024;
|
||||
pub const NOISE: [u8; MSG_SIZE] = [0; MSG_SIZE];
|
||||
|
||||
/// A mock implementation of the Sphinx encoding.
|
||||
///
|
||||
/// The length of the encoded message is fixed to [`MSG_SIZE`] bytes.
|
||||
/// The first byte of the encoded message is the number of remaining layers to be unwrapped.
|
||||
/// The remaining bytes are the payload that is zero-padded to the end.
|
||||
pub fn new_message(payload: &[u8], num_layers: u8) -> Result<Vec<u8>, Error> {
|
||||
if payload.len() > MSG_SIZE - 1 {
|
||||
return Err(Error::PayloadTooLarge);
|
||||
}
|
||||
|
||||
let mut message: Vec<u8> = Vec::with_capacity(MSG_SIZE);
|
||||
message.push(num_layers);
|
||||
message.extend(payload);
|
||||
message.extend(std::iter::repeat(0).take(MSG_SIZE - message.len()));
|
||||
Ok(message)
|
||||
}
|
||||
|
||||
/// SHA-256 hash of the message
|
||||
pub fn message_id(message: &[u8]) -> Vec<u8> {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(message);
|
||||
hasher.finalize().to_vec()
|
||||
}
|
||||
|
||||
/// Unwrap the message one layer.
|
||||
///
|
||||
/// This function returns the unwrapped message and a boolean indicating whether the message was fully unwrapped.
|
||||
/// (False if the message still has layers to be unwrapped, true otherwise)
|
||||
///
|
||||
/// If the input message was already fully unwrapped, or if ititss format is invalid,
|
||||
/// this function returns `[Error::InvalidMixMessage]`.
|
||||
pub fn unwrap_message(message: &[u8]) -> Result<(Vec<u8>, bool), Error> {
|
||||
if message.is_empty() {
|
||||
return Err(Error::InvalidMixMessage);
|
||||
}
|
||||
|
||||
match message[0] {
|
||||
0 => Err(Error::InvalidMixMessage),
|
||||
1 => Ok((message[1..].to_vec(), true)),
|
||||
n => {
|
||||
let mut unwrapped: Vec<u8> = Vec::with_capacity(message.len());
|
||||
unwrapped.push(n - 1);
|
||||
unwrapped.extend(&message[1..]);
|
||||
Ok((unwrapped, false))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if the message is a noise message.
|
||||
pub fn is_noise(message: &[u8]) -> bool {
|
||||
message == NOISE
|
||||
}
|
18
nomos-mix/network/Cargo.toml
Normal file
18
nomos-mix/network/Cargo.toml
Normal file
@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "nomos-mix-network"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
cached = "0.53.1"
|
||||
futures = "0.3.30"
|
||||
futures-timer = "3.0.3"
|
||||
libp2p = "0.53"
|
||||
tracing = "0.1"
|
||||
nomos-mix-message = { path = "../message" }
|
||||
nomos-mix-queue = { path = "../queue" }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.39", features = ["macros", "rt-multi-thread", "time"] }
|
||||
libp2p = { version = "0.53", features = ["ed25519", "tokio", "quic"] }
|
||||
tracing-subscriber = "0.3.18"
|
256
nomos-mix/network/src/behaviour.rs
Normal file
256
nomos-mix/network/src/behaviour.rs
Normal file
@ -0,0 +1,256 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet, VecDeque},
|
||||
task::{Context, Poll, Waker},
|
||||
};
|
||||
|
||||
use cached::{Cached, TimedCache};
|
||||
use libp2p::{
|
||||
core::Endpoint,
|
||||
swarm::{
|
||||
ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour,
|
||||
NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
|
||||
},
|
||||
Multiaddr, PeerId,
|
||||
};
|
||||
use nomos_mix_message::{message_id, unwrap_message};
|
||||
|
||||
use crate::{
|
||||
error::Error,
|
||||
handler::{FromBehaviour, MixConnectionHandler, ToBehaviour},
|
||||
};
|
||||
|
||||
/// A [`NetworkBehaviour`] that forwards messages between mix nodes.
|
||||
pub struct Behaviour {
|
||||
config: Config,
|
||||
/// Peers that support the mix protocol, and their connection IDs
|
||||
negotiated_peers: HashMap<PeerId, HashSet<ConnectionId>>,
|
||||
/// Queue of events to yield to the swarm.
|
||||
events: VecDeque<ToSwarm<Event, FromBehaviour>>,
|
||||
/// Waker that handles polling
|
||||
waker: Option<Waker>,
|
||||
/// An LRU time cache for storing seen messages (based on their ID). This cache prevents
|
||||
/// duplicates from being propagated on the network.
|
||||
duplicate_cache: TimedCache<Vec<u8>, ()>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Config {
|
||||
pub transmission_rate: f64,
|
||||
pub duplicate_cache_lifespan: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Event {
|
||||
/// A fully unwrapped message received from one of the peers.
|
||||
FullyUnwrappedMessage(Vec<u8>),
|
||||
Error(Error),
|
||||
}
|
||||
|
||||
impl Behaviour {
|
||||
pub fn new(config: Config) -> Self {
|
||||
let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan);
|
||||
Self {
|
||||
config,
|
||||
negotiated_peers: HashMap::new(),
|
||||
events: VecDeque::new(),
|
||||
waker: None,
|
||||
duplicate_cache,
|
||||
}
|
||||
}
|
||||
|
||||
/// Publishs a message through the mix network.
|
||||
///
|
||||
/// This function expects that the message was already encoded for the cryptographic mixing
|
||||
/// (e.g. Sphinx encoding).
|
||||
///
|
||||
/// The message is forward to all connected peers,
|
||||
/// so that it can arrive in the mix node who can unwrap it one layer.
|
||||
/// Fully unwrapped messages are returned as the [`MixBehaviourEvent::FullyUnwrappedMessage`].
|
||||
pub fn publish(&mut self, message: Vec<u8>) -> Result<(), Error> {
|
||||
self.duplicate_cache.cache_set(message_id(&message), ());
|
||||
self.forward_message(message, None)
|
||||
}
|
||||
|
||||
/// Forwards a message to all connected peers except the one that was received from.
|
||||
///
|
||||
/// Returns [`Error::NoPeers`] if there are no connected peers that support the mix protocol.
|
||||
fn forward_message(
|
||||
&mut self,
|
||||
message: Vec<u8>,
|
||||
propagation_source: Option<PeerId>,
|
||||
) -> Result<(), Error> {
|
||||
let peer_ids = self
|
||||
.negotiated_peers
|
||||
.keys()
|
||||
.filter(|&peer_id| {
|
||||
if let Some(propagation_source) = propagation_source {
|
||||
*peer_id != propagation_source
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if peer_ids.is_empty() {
|
||||
return Err(Error::NoPeers);
|
||||
}
|
||||
|
||||
for peer_id in peer_ids.into_iter() {
|
||||
tracing::debug!("Registering event for peer {:?} to send msg", peer_id);
|
||||
self.events.push_back(ToSwarm::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event: FromBehaviour::Message(message.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
self.try_wake();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_negotiated_peer(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool {
|
||||
tracing::debug!(
|
||||
"Adding to connected_peers: peer_id:{:?}, connection_id:{:?}",
|
||||
peer_id,
|
||||
connection_id
|
||||
);
|
||||
self.negotiated_peers
|
||||
.entry(peer_id)
|
||||
.or_default()
|
||||
.insert(connection_id)
|
||||
}
|
||||
|
||||
fn remove_negotiated_peer(&mut self, peer_id: &PeerId, connection_id: &ConnectionId) {
|
||||
if let Some(connections) = self.negotiated_peers.get_mut(peer_id) {
|
||||
tracing::debug!(
|
||||
"Removing from connected_peers: peer:{:?}, connection_id:{:?}",
|
||||
peer_id,
|
||||
connection_id
|
||||
);
|
||||
connections.remove(connection_id);
|
||||
if connections.is_empty() {
|
||||
self.negotiated_peers.remove(peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_wake(&mut self) {
|
||||
if let Some(waker) = self.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NetworkBehaviour for Behaviour {
|
||||
type ConnectionHandler = MixConnectionHandler;
|
||||
type ToSwarm = Event;
|
||||
|
||||
fn handle_established_inbound_connection(
|
||||
&mut self,
|
||||
_: ConnectionId,
|
||||
_: PeerId,
|
||||
_: &Multiaddr,
|
||||
_: &Multiaddr,
|
||||
) -> Result<THandler<Self>, ConnectionDenied> {
|
||||
Ok(MixConnectionHandler::new(&self.config))
|
||||
}
|
||||
|
||||
fn handle_established_outbound_connection(
|
||||
&mut self,
|
||||
_: ConnectionId,
|
||||
_: PeerId,
|
||||
_: &Multiaddr,
|
||||
_: Endpoint,
|
||||
) -> Result<THandler<Self>, ConnectionDenied> {
|
||||
Ok(MixConnectionHandler::new(&self.config))
|
||||
}
|
||||
|
||||
/// Informs the behaviour about an event from the [`Swarm`].
|
||||
fn on_swarm_event(&mut self, event: FromSwarm) {
|
||||
if let FromSwarm::ConnectionClosed(ConnectionClosed {
|
||||
peer_id,
|
||||
connection_id,
|
||||
..
|
||||
}) = event
|
||||
{
|
||||
self.remove_negotiated_peer(&peer_id, &connection_id);
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles an event generated by the [`MixConnectionHandler`]
|
||||
/// dedicated to the connection identified by `peer_id` and `connection_id`.
|
||||
fn on_connection_handler_event(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
event: THandlerOutEvent<Self>,
|
||||
) {
|
||||
match event {
|
||||
// A message was forwarded from the peer.
|
||||
ToBehaviour::Message(message) => {
|
||||
if self
|
||||
.duplicate_cache
|
||||
.cache_set(message_id(&message), ())
|
||||
.is_some()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Try to unwrap the message.
|
||||
match unwrap_message(&message) {
|
||||
Ok((unwrapped_msg, fully_unwrapped)) => {
|
||||
if fully_unwrapped {
|
||||
self.events.push_back(ToSwarm::GenerateEvent(
|
||||
Event::FullyUnwrappedMessage(unwrapped_msg),
|
||||
));
|
||||
} else if let Err(e) = self.forward_message(unwrapped_msg, None) {
|
||||
tracing::error!("Failed to forward message: {:?}", e);
|
||||
}
|
||||
}
|
||||
Err(nomos_mix_message::Error::MsgUnwrapNotAllowed) => {
|
||||
// Forward the received message as it is.
|
||||
if let Err(e) = self.forward_message(message, Some(peer_id)) {
|
||||
tracing::error!("Failed to forward message: {:?}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to unwrap message: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
// The connection was fully negotiated by the peer,
|
||||
// which means that the peer supports the mix protocol.
|
||||
ToBehaviour::FullyNegotiatedOutbound => {
|
||||
self.add_negotiated_peer(peer_id, connection_id);
|
||||
}
|
||||
ToBehaviour::NegotiationFailed => {
|
||||
self.remove_negotiated_peer(&peer_id, &connection_id);
|
||||
}
|
||||
ToBehaviour::IOError(error) => {
|
||||
// TODO: Consider removing the peer from the connected_peers and closing the connection
|
||||
self.events
|
||||
.push_back(ToSwarm::GenerateEvent(Event::Error(Error::PeerIOError {
|
||||
error,
|
||||
peer_id,
|
||||
connection_id,
|
||||
})));
|
||||
}
|
||||
}
|
||||
|
||||
self.try_wake();
|
||||
}
|
||||
|
||||
/// Polls for things that swarm should do.
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
|
||||
if let Some(event) = self.events.pop_front() {
|
||||
Poll::Ready(event)
|
||||
} else {
|
||||
self.waker = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
15
nomos-mix/network/src/error.rs
Normal file
15
nomos-mix/network/src/error.rs
Normal file
@ -0,0 +1,15 @@
|
||||
use std::io;
|
||||
|
||||
use libp2p::{swarm::ConnectionId, PeerId};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
/// There were no peers to send a message to.
|
||||
NoPeers,
|
||||
/// IO error from peer
|
||||
PeerIOError {
|
||||
error: io::Error,
|
||||
peer_id: PeerId,
|
||||
connection_id: ConnectionId,
|
||||
},
|
||||
}
|
268
nomos-mix/network/src/handler.rs
Normal file
268
nomos-mix/network/src/handler.rs
Normal file
@ -0,0 +1,268 @@
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
io,
|
||||
task::{Context, Poll, Waker},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt};
|
||||
use futures_timer::Delay;
|
||||
use libp2p::{
|
||||
core::upgrade::ReadyUpgrade,
|
||||
swarm::{
|
||||
handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound},
|
||||
ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol,
|
||||
},
|
||||
Stream, StreamProtocol,
|
||||
};
|
||||
use nomos_mix_message::{is_noise, MSG_SIZE, NOISE};
|
||||
use nomos_mix_queue::{NonMixQueue, Queue};
|
||||
|
||||
use crate::behaviour::Config;
|
||||
|
||||
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0");
|
||||
|
||||
/// A [`ConnectionHandler`] that handles the mix protocol.
|
||||
pub struct MixConnectionHandler {
|
||||
inbound_substream: Option<MsgRecvFuture>,
|
||||
outbound_substream: Option<OutboundSubstreamState>,
|
||||
interval: Duration, // TODO: use absolute time
|
||||
timer: Delay,
|
||||
queue: Box<dyn Queue<Vec<u8>> + Send>,
|
||||
pending_events_to_behaviour: VecDeque<ToBehaviour>,
|
||||
waker: Option<Waker>,
|
||||
}
|
||||
|
||||
type MsgSendFuture = BoxFuture<'static, Result<Stream, io::Error>>;
|
||||
type MsgRecvFuture = BoxFuture<'static, Result<(Stream, Vec<u8>), io::Error>>;
|
||||
|
||||
enum OutboundSubstreamState {
|
||||
/// A request to open a new outbound substream is being processed.
|
||||
PendingOpenSubstream,
|
||||
/// An outbound substream is open and ready to send messages.
|
||||
Idle(Stream),
|
||||
/// A message is being sent on the outbound substream.
|
||||
PendingSend(MsgSendFuture),
|
||||
}
|
||||
|
||||
impl MixConnectionHandler {
|
||||
pub fn new(config: &Config) -> Self {
|
||||
let interval_sec = 1.0 / config.transmission_rate;
|
||||
let interval = Duration::from_millis((interval_sec * 1000.0) as u64);
|
||||
Self {
|
||||
inbound_substream: None,
|
||||
outbound_substream: None,
|
||||
interval,
|
||||
timer: Delay::new(interval),
|
||||
queue: Box::new(NonMixQueue::new(NOISE.to_vec())),
|
||||
pending_events_to_behaviour: VecDeque::new(),
|
||||
waker: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_wake(&mut self) {
|
||||
if let Some(waker) = self.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum FromBehaviour {
|
||||
/// A message to be sent to the connection.
|
||||
Message(Vec<u8>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ToBehaviour {
|
||||
/// An outbound substream has been successfully upgraded for the mix protocol.
|
||||
FullyNegotiatedOutbound,
|
||||
/// An outbound substream was failed to be upgraded for the mix protocol.
|
||||
NegotiationFailed,
|
||||
/// A message has been received from the connection.
|
||||
Message(Vec<u8>),
|
||||
/// An IO error from the connection
|
||||
IOError(io::Error),
|
||||
}
|
||||
|
||||
impl ConnectionHandler for MixConnectionHandler {
|
||||
type FromBehaviour = FromBehaviour;
|
||||
type ToBehaviour = ToBehaviour;
|
||||
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
|
||||
type InboundOpenInfo = ();
|
||||
type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
|
||||
type OutboundOpenInfo = ();
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ())
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
|
||||
> {
|
||||
// Process pending events to be sent to the behaviour
|
||||
if let Some(event) = self.pending_events_to_behaviour.pop_front() {
|
||||
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
|
||||
}
|
||||
|
||||
// Process inbound stream
|
||||
tracing::debug!("Processing inbound stream");
|
||||
if let Some(msg_recv_fut) = self.inbound_substream.as_mut() {
|
||||
match msg_recv_fut.poll_unpin(cx) {
|
||||
Poll::Ready(Ok((stream, msg))) => {
|
||||
tracing::debug!("Received message from inbound stream. Notifying behaviour...");
|
||||
self.inbound_substream = Some(recv_msg(stream).boxed());
|
||||
if !is_noise(&msg) {
|
||||
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
|
||||
ToBehaviour::Message(msg),
|
||||
));
|
||||
}
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
tracing::error!("Failed to receive message from inbound stream: {:?}", e);
|
||||
self.inbound_substream = None;
|
||||
}
|
||||
Poll::Pending => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Process outbound stream
|
||||
tracing::debug!("Processing outbound stream");
|
||||
loop {
|
||||
match self.outbound_substream.take() {
|
||||
// If the request to open a new outbound substream is still being processed, wait more.
|
||||
Some(OutboundSubstreamState::PendingOpenSubstream) => {
|
||||
self.outbound_substream = Some(OutboundSubstreamState::PendingOpenSubstream);
|
||||
self.waker = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
// If the substream is idle, and if it's time to send a message, send it.
|
||||
Some(OutboundSubstreamState::Idle(stream)) => match self.timer.poll_unpin(cx) {
|
||||
Poll::Ready(_) => {
|
||||
let msg = self.queue.pop();
|
||||
tracing::debug!("Sending message to outbound stream: {:?}", msg);
|
||||
self.outbound_substream = Some(OutboundSubstreamState::PendingSend(
|
||||
send_msg(stream, msg).boxed(),
|
||||
));
|
||||
self.timer.reset(self.interval);
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.outbound_substream = Some(OutboundSubstreamState::Idle(stream));
|
||||
self.waker = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
},
|
||||
// If a message is being sent, check if it's done.
|
||||
Some(OutboundSubstreamState::PendingSend(mut msg_send_fut)) => {
|
||||
match msg_send_fut.poll_unpin(cx) {
|
||||
Poll::Ready(Ok(stream)) => {
|
||||
tracing::debug!("Message sent to outbound stream");
|
||||
self.outbound_substream = Some(OutboundSubstreamState::Idle(stream));
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
tracing::error!("Failed to send message to outbound stream: {:?}", e);
|
||||
self.outbound_substream = None;
|
||||
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
|
||||
ToBehaviour::IOError(e),
|
||||
));
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.outbound_substream =
|
||||
Some(OutboundSubstreamState::PendingSend(msg_send_fut));
|
||||
self.waker = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
// If there is no outbound substream, request to open a new one.
|
||||
None => {
|
||||
self.outbound_substream = Some(OutboundSubstreamState::PendingOpenSubstream);
|
||||
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
|
||||
match event {
|
||||
FromBehaviour::Message(msg) => {
|
||||
self.queue.push(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn on_connection_event(
|
||||
&mut self,
|
||||
event: ConnectionEvent<
|
||||
Self::InboundProtocol,
|
||||
Self::OutboundProtocol,
|
||||
Self::InboundOpenInfo,
|
||||
Self::OutboundOpenInfo,
|
||||
>,
|
||||
) {
|
||||
match event {
|
||||
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
|
||||
protocol: stream,
|
||||
..
|
||||
}) => {
|
||||
tracing::debug!("FullyNegotiatedInbound: Creating inbound substream");
|
||||
self.inbound_substream = Some(recv_msg(stream).boxed())
|
||||
}
|
||||
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
|
||||
protocol: stream,
|
||||
..
|
||||
}) => {
|
||||
tracing::debug!("FullyNegotiatedOutbound: Creating outbound substream");
|
||||
self.outbound_substream = Some(OutboundSubstreamState::Idle(stream));
|
||||
self.pending_events_to_behaviour
|
||||
.push_back(ToBehaviour::FullyNegotiatedOutbound);
|
||||
}
|
||||
ConnectionEvent::DialUpgradeError(e) => {
|
||||
tracing::error!("DialUpgradeError: {:?}", e);
|
||||
match e.error {
|
||||
StreamUpgradeError::NegotiationFailed => {
|
||||
self.pending_events_to_behaviour
|
||||
.push_back(ToBehaviour::NegotiationFailed);
|
||||
}
|
||||
StreamUpgradeError::Io(e) => {
|
||||
self.pending_events_to_behaviour
|
||||
.push_back(ToBehaviour::IOError(e));
|
||||
}
|
||||
StreamUpgradeError::Timeout => {
|
||||
self.pending_events_to_behaviour
|
||||
.push_back(ToBehaviour::IOError(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
"mix protocol negotiation timed out",
|
||||
)));
|
||||
}
|
||||
StreamUpgradeError::Apply(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
event => {
|
||||
tracing::debug!("Ignoring connection event: {:?}", event)
|
||||
}
|
||||
}
|
||||
|
||||
self.try_wake();
|
||||
}
|
||||
}
|
||||
|
||||
/// Write a message to the stream
|
||||
async fn send_msg(mut stream: Stream, msg: Vec<u8>) -> io::Result<Stream> {
|
||||
stream.write_all(&msg).await?;
|
||||
stream.flush().await?;
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
/// Read a fixed-length message from the stream
|
||||
// TODO: Consider handling variable-length messages
|
||||
async fn recv_msg(mut stream: Stream) -> io::Result<(Stream, Vec<u8>)> {
|
||||
let mut buf = vec![0; MSG_SIZE];
|
||||
stream.read_exact(&mut buf).await?;
|
||||
Ok((stream, buf))
|
||||
}
|
157
nomos-mix/network/src/lib.rs
Normal file
157
nomos-mix/network/src/lib.rs
Normal file
@ -0,0 +1,157 @@
|
||||
mod behaviour;
|
||||
mod error;
|
||||
mod handler;
|
||||
|
||||
pub use behaviour::{Behaviour, Event};
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::time::Duration;
|
||||
|
||||
use libp2p::{
|
||||
futures::StreamExt,
|
||||
identity::Keypair,
|
||||
swarm::{dummy, NetworkBehaviour, SwarmEvent},
|
||||
Multiaddr, PeerId, Swarm, SwarmBuilder,
|
||||
};
|
||||
use nomos_mix_message::{new_message, MSG_SIZE};
|
||||
use tokio::select;
|
||||
|
||||
use crate::{behaviour::Config, error::Error, Behaviour, Event};
|
||||
|
||||
/// Check that an wrapped message is forwarded through mix nodes and unwrapped successfully.
|
||||
#[tokio::test]
|
||||
async fn behaviour() {
|
||||
let k1 = libp2p::identity::Keypair::generate_ed25519();
|
||||
let peer_id1 = PeerId::from_public_key(&k1.public());
|
||||
let k2 = libp2p::identity::Keypair::generate_ed25519();
|
||||
|
||||
// Initialize two swarms that support the mix protocol.
|
||||
let mut swarm1 = new_swarm(k1);
|
||||
let mut swarm2 = new_swarm(k2);
|
||||
|
||||
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5073/quic-v1".parse().unwrap();
|
||||
let addr_with_peer_id = addr.clone().with_p2p(peer_id1).unwrap();
|
||||
|
||||
// Spawn swarm1
|
||||
tokio::spawn(async move {
|
||||
swarm1.listen_on(addr).unwrap();
|
||||
loop {
|
||||
swarm1.select_next_some().await;
|
||||
}
|
||||
});
|
||||
|
||||
// Dial to swarm1 from swarm2
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
swarm2.dial(addr_with_peer_id).unwrap();
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
// Prepare a task for swarm2 to publish a two-layer wrapped message,
|
||||
// receive an one-layer unwrapped message from swarm1,
|
||||
// and return a fully unwrapped message.
|
||||
let task = async {
|
||||
let mut msg_published = false;
|
||||
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
|
||||
loop {
|
||||
select! {
|
||||
// Try to publish a message until it succeeds.
|
||||
// (It will fail until swarm2 is connected to swarm1 successfully.)
|
||||
_ = publish_try_interval.tick() => {
|
||||
if !msg_published {
|
||||
// Prepare a message wrapped in two layers
|
||||
let msg = new_message(&[1; MSG_SIZE - 1], 2).unwrap();
|
||||
msg_published = swarm2.behaviour_mut().publish(msg).is_ok();
|
||||
}
|
||||
}
|
||||
// Proceed swarm2
|
||||
event = swarm2.select_next_some() => {
|
||||
if let SwarmEvent::Behaviour(Event::FullyUnwrappedMessage(message)) = event {
|
||||
println!("SWARM2 FULLY_UNWRAPPED_MESSAGE: {:?}", message);
|
||||
break;
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Expect for the task to be completed within 30 seconds.
|
||||
assert!(tokio::time::timeout(Duration::from_secs(30), task)
|
||||
.await
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
/// If the peer doesn't support the mix protocol, the message should not be forwarded to the peer.
|
||||
#[tokio::test]
|
||||
async fn peer_not_support_mix_protocol() {
|
||||
let k1 = libp2p::identity::Keypair::generate_ed25519();
|
||||
let peer_id1 = PeerId::from_public_key(&k1.public());
|
||||
let k2 = libp2p::identity::Keypair::generate_ed25519();
|
||||
|
||||
// Only swarm2 supports the mix protocol.
|
||||
let mut swarm1 = new_swarm_without_mix(k1);
|
||||
let mut swarm2 = new_swarm(k2);
|
||||
|
||||
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5074/quic-v1".parse().unwrap();
|
||||
let addr_with_peer_id = addr.clone().with_p2p(peer_id1).unwrap();
|
||||
|
||||
// Spawn swarm1
|
||||
tokio::spawn(async move {
|
||||
swarm1.listen_on(addr).unwrap();
|
||||
loop {
|
||||
swarm1.select_next_some().await;
|
||||
}
|
||||
});
|
||||
|
||||
// Dial to swarm1 from swarm2
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
swarm2.dial(addr_with_peer_id).unwrap();
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
// Expect all publish attempts to fail with [`Error::NoPeers`]
|
||||
// because swarm2 doesn't have any peers that support the mix protocol.
|
||||
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
|
||||
let mut publish_try_count = 0;
|
||||
loop {
|
||||
select! {
|
||||
_ = publish_try_interval.tick() => {
|
||||
let msg = new_message(&[10; MSG_SIZE - 1], 1).unwrap();
|
||||
assert!(matches!(swarm2.behaviour_mut().publish(msg), Err(Error::NoPeers)));
|
||||
publish_try_count += 1;
|
||||
if publish_try_count >= 10 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ = swarm2.select_next_some() => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn new_swarm(key: Keypair) -> Swarm<Behaviour> {
|
||||
new_swarm_with_behaviour(
|
||||
key,
|
||||
Behaviour::new(Config {
|
||||
transmission_rate: 1.0,
|
||||
duplicate_cache_lifespan: 60,
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
fn new_swarm_without_mix(key: Keypair) -> Swarm<dummy::Behaviour> {
|
||||
new_swarm_with_behaviour(key, dummy::Behaviour)
|
||||
}
|
||||
|
||||
fn new_swarm_with_behaviour<B: NetworkBehaviour>(key: Keypair, behaviour: B) -> Swarm<B> {
|
||||
SwarmBuilder::with_existing_identity(key)
|
||||
.with_tokio()
|
||||
.with_other_transport(|keypair| {
|
||||
libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(keypair))
|
||||
})
|
||||
.unwrap()
|
||||
.with_behaviour(|_| behaviour)
|
||||
.unwrap()
|
||||
.with_swarm_config(|cfg| {
|
||||
cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX))
|
||||
})
|
||||
.build()
|
||||
}
|
||||
}
|
7
nomos-mix/queue/Cargo.toml
Normal file
7
nomos-mix/queue/Cargo.toml
Normal file
@ -0,0 +1,7 @@
|
||||
[package]
|
||||
name = "nomos-mix-queue"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
rand = "0.8.5"
|
39
nomos-mix/queue/src/lib.rs
Normal file
39
nomos-mix/queue/src/lib.rs
Normal file
@ -0,0 +1,39 @@
|
||||
use std::collections::VecDeque;
|
||||
|
||||
/// A [`Queue`] controls the order of messages to be emitted to a single connection.
|
||||
pub trait Queue<T> {
|
||||
/// Push a message to the queue.
|
||||
fn push(&mut self, data: T);
|
||||
|
||||
/// Pop a message from the queue.
|
||||
///
|
||||
/// The returned message is either the real message pushed before or a noise message.
|
||||
fn pop(&mut self) -> T;
|
||||
}
|
||||
|
||||
/// A regular queue that does not mix the order of messages.
|
||||
///
|
||||
/// This queue returns a noise message if the queue is empty.
|
||||
pub struct NonMixQueue<T: Clone> {
|
||||
queue: VecDeque<T>,
|
||||
noise: T,
|
||||
}
|
||||
|
||||
impl<T: Clone> NonMixQueue<T> {
|
||||
pub fn new(noise: T) -> Self {
|
||||
Self {
|
||||
queue: VecDeque::new(),
|
||||
noise,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone> Queue<T> for NonMixQueue<T> {
|
||||
fn push(&mut self, data: T) {
|
||||
self.queue.push_back(data);
|
||||
}
|
||||
|
||||
fn pop(&mut self) -> T {
|
||||
self.queue.pop_front().unwrap_or(self.noise.clone())
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user