diff --git a/Cargo.toml b/Cargo.toml index b41a12b6..611ad5ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,13 @@ members = [ "nomos-da/kzg", "nomos-da/full-replication", "nodes/nomos-node", + "nodes/mixnode", "simulations", "consensus-engine", "tests", + "mixnet/node", + "mixnet/client", + "mixnet/protocol", + "mixnet/topology", ] resolver = "2" \ No newline at end of file diff --git a/mixnet/client/Cargo.toml b/mixnet/client/Cargo.toml new file mode 100644 index 00000000..ebf7eafa --- /dev/null +++ b/mixnet/client/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "mixnet-client" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +tracing = "0.1.37" +tokio = { version = "1.29.1", features = ["net"] } +sphinx-packet = "0.1.0" +nym-sphinx = { package = "nym-sphinx", git = "https://github.com/nymtech/nym", tag = "v1.1.22" } +# Using an older version, since `nym-sphinx` depends on `rand` v0.7.3. +rand = "0.7.3" +mixnet-protocol = { path = "../protocol" } +mixnet-topology = { path = "../topology" } +mixnet-util = { path = "../util" } +futures = "0.3.28" +thiserror = "1" diff --git a/mixnet/client/src/config.rs b/mixnet/client/src/config.rs new file mode 100644 index 00000000..3fb1a489 --- /dev/null +++ b/mixnet/client/src/config.rs @@ -0,0 +1,31 @@ +use std::net::SocketAddr; + +use futures::{stream, StreamExt}; +use mixnet_topology::MixnetTopology; +use serde::{Deserialize, Serialize}; + +use crate::{receiver::Receiver, MessageStream, MixnetClientError}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct MixnetClientConfig { + pub mode: MixnetClientMode, + pub topology: MixnetTopology, + pub connection_pool_size: usize, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum MixnetClientMode { + Sender, + SenderReceiver(SocketAddr), +} + +impl MixnetClientMode { + pub(crate) async fn run(&self) -> Result { + match self { + Self::Sender => Ok(stream::empty().boxed()), + Self::SenderReceiver(node_address) => { + Ok(Receiver::new(*node_address).run().await?.boxed()) + } + } + } +} diff --git a/mixnet/client/src/lib.rs b/mixnet/client/src/lib.rs new file mode 100644 index 00000000..b08e9c3e --- /dev/null +++ b/mixnet/client/src/lib.rs @@ -0,0 +1,56 @@ +pub mod config; +mod receiver; +mod sender; + +use std::error::Error; +use std::time::Duration; + +pub use config::MixnetClientConfig; +pub use config::MixnetClientMode; +use futures::stream::BoxStream; +use mixnet_util::ConnectionPool; +use rand::Rng; +use sender::Sender; +use thiserror::Error; + +// A client for sending packets to Mixnet and receiving packets from Mixnet. +pub struct MixnetClient { + mode: MixnetClientMode, + sender: Sender, +} + +pub type MessageStream = BoxStream<'static, Result, MixnetClientError>>; + +impl MixnetClient { + pub fn new(config: MixnetClientConfig, rng: R) -> Self { + let cache = ConnectionPool::new(config.connection_pool_size); + Self { + mode: config.mode, + sender: Sender::new(config.topology, cache, rng), + } + } + + pub async fn run(&self) -> Result { + self.mode.run().await + } + + pub fn send( + &mut self, + msg: Vec, + total_delay: Duration, + ) -> Result<(), Box> { + self.sender.send(msg, total_delay) + } +} + +#[derive(Error, Debug)] +pub enum MixnetClientError { + #[error("mixnet node connect error")] + MixnetNodeConnectError, + #[error("mixnode stream has been closed")] + MixnetNodeStreamClosed, + #[error("unexpected stream body received")] + UnexpectedStreamBody, + #[error("invalid payload")] + InvalidPayload, +} diff --git a/mixnet/client/src/receiver.rs b/mixnet/client/src/receiver.rs new file mode 100644 index 00000000..4c0bfcc1 --- /dev/null +++ b/mixnet/client/src/receiver.rs @@ -0,0 +1,139 @@ +use std::{error::Error, net::SocketAddr}; + +use futures::{stream, Stream, StreamExt}; +use mixnet_protocol::Body; +use nym_sphinx::{ + chunking::{fragment::Fragment, reconstruction::MessageReconstructor}, + message::{NymMessage, PaddedMessage}, + Payload, +}; +use tokio::net::TcpStream; + +use crate::MixnetClientError; + +// Receiver accepts TCP connections to receive incoming payloads from the Mixnet. +pub struct Receiver { + node_address: SocketAddr, +} + +impl Receiver { + pub fn new(node_address: SocketAddr) -> Self { + Self { node_address } + } + + pub async fn run( + &self, + ) -> Result< + impl Stream, MixnetClientError>> + Send + 'static, + MixnetClientError, + > { + let Ok(socket) = TcpStream::connect(self.node_address).await else { + return Err(MixnetClientError::MixnetNodeConnectError); + }; + + Ok(Self::message_stream(Box::pin(Self::fragment_stream( + socket, + )))) + } + + fn fragment_stream( + socket: TcpStream, + ) -> impl Stream> + Send + 'static { + stream::unfold(socket, |mut socket| async move { + let Ok(body) = Body::read(&mut socket).await else { + // TODO: Maybe this is a hard error and the stream is corrupted? In that case stop the stream + return Some((Err(MixnetClientError::MixnetNodeStreamClosed), socket)); + }; + + match body { + Body::SphinxPacket(_) => { + Some((Err(MixnetClientError::UnexpectedStreamBody), socket)) + } + Body::FinalPayload(payload) => Some((Self::fragment_from_payload(payload), socket)), + } + }) + } + + fn message_stream( + fragment_stream: impl Stream> + + Send + + Unpin + + 'static, + ) -> impl Stream, MixnetClientError>> + Send + 'static { + // MessageReconstructor buffers all received fragments + // and eventually returns reconstructed messages. + let message_reconstructor: MessageReconstructor = Default::default(); + + stream::unfold( + (fragment_stream, message_reconstructor), + |(mut fragment_stream, mut message_reconstructor)| async move { + let result = + Self::reconstruct_message(&mut fragment_stream, &mut message_reconstructor) + .await; + Some((result, (fragment_stream, message_reconstructor))) + }, + ) + } + + fn fragment_from_payload(payload: Payload) -> Result { + let Ok(payload_plaintext) = payload.recover_plaintext() else { + return Err(MixnetClientError::InvalidPayload); + }; + let Ok(fragment) = Fragment::try_from_bytes(&payload_plaintext) else { + return Err(MixnetClientError::InvalidPayload); + }; + Ok(fragment) + } + + async fn reconstruct_message( + fragment_stream: &mut (impl Stream> + + Send + + Unpin + + 'static), + message_reconstructor: &mut MessageReconstructor, + ) -> Result, MixnetClientError> { + // Read fragments until at least one message is fully reconstructed. + while let Some(next) = fragment_stream.next().await { + match next { + Ok(fragment) => { + if let Some(message) = + Self::try_reconstruct_message(fragment, message_reconstructor) + { + return Ok(message); + } + } + Err(e) => { + return Err(e); + } + } + } + + // fragment_stream closed before messages are fully reconstructed + Err(MixnetClientError::MixnetNodeStreamClosed) + } + + fn try_reconstruct_message( + fragment: Fragment, + message_reconstructor: &mut MessageReconstructor, + ) -> Option> { + let reconstruction_result = message_reconstructor.insert_new_fragment(fragment); + match reconstruction_result { + Some((padded_message, _)) => { + let message = Self::remove_padding(padded_message).unwrap(); + Some(message) + } + None => None, + } + } + + fn remove_padding(msg: Vec) -> Result, Box> { + let padded_message = PaddedMessage::new_reconstructed(msg); + // we need this because PaddedMessage.remove_padding requires it for other NymMessage types. + let dummy_num_mix_hops = 0; + + match padded_message.remove_padding(dummy_num_mix_hops)? { + NymMessage::Plain(msg) => Ok(msg), + _ => todo!("return error"), + } + } +} diff --git a/mixnet/client/src/sender.rs b/mixnet/client/src/sender.rs new file mode 100644 index 00000000..b63fb3d2 --- /dev/null +++ b/mixnet/client/src/sender.rs @@ -0,0 +1,200 @@ +use std::{error::Error, net::SocketAddr, time::Duration}; + +use mixnet_protocol::Body; +use mixnet_topology::MixnetTopology; +use mixnet_util::ConnectionPool; +use nym_sphinx::{ + addressing::nodes::NymNodeRoutingAddress, chunking::fragment::Fragment, message::NymMessage, + params::PacketSize, Delay, Destination, DestinationAddressBytes, NodeAddressBytes, + IDENTIFIER_LENGTH, PAYLOAD_OVERHEAD_SIZE, +}; +use rand::{distributions::Uniform, prelude::Distribution, Rng}; +use sphinx_packet::{route, SphinxPacket, SphinxPacketBuilder}; + +// Sender splits messages into Sphinx packets and sends them to the Mixnet. +pub struct Sender { + //TODO: handle topology update + topology: MixnetTopology, + pool: ConnectionPool, + rng: R, +} + +impl Sender { + pub fn new(topology: MixnetTopology, pool: ConnectionPool, rng: R) -> Self { + Self { + topology, + rng, + pool, + } + } + + pub fn send( + &mut self, + msg: Vec, + total_delay: Duration, + ) -> Result<(), Box> { + let destination = self.topology.random_destination(&mut self.rng)?; + let destination = Destination::new( + DestinationAddressBytes::from_bytes(destination.address.as_bytes()), + [0; IDENTIFIER_LENGTH], // TODO: use a proper SURBIdentifier if we need SURB + ); + + self.pad_and_split_message(msg) + .into_iter() + .map(|fragment| self.build_sphinx_packet(fragment, &destination, total_delay)) + .collect::, _>>()? + .into_iter() + .for_each(|(packet, first_node)| { + let pool = self.pool.clone(); + tokio::spawn(async move { + if let Err(e) = + Self::send_packet(&pool, Box::new(packet), first_node.address).await + { + tracing::error!("failed to send packet to the first node: {e}"); + } + }); + }); + + Ok(()) + } + + fn pad_and_split_message(&mut self, msg: Vec) -> Vec { + let nym_message = NymMessage::new_plain(msg); + + // TODO: add PUBLIC_KEY_SIZE for encryption for the destination, + // if we're going to encrypt final payloads for the destination. + // TODO: add ACK_OVERHEAD if we need SURB-ACKs. + // https://github.com/nymtech/nym/blob/3748ab77a132143d5fd1cd75dd06334d33294815/common/nymsphinx/src/message.rs#L181-L181 + let plaintext_size_per_packet = PacketSize::RegularPacket.plaintext_size(); + + nym_message + .pad_to_full_packet_lengths(plaintext_size_per_packet) + .split_into_fragments(&mut self.rng, plaintext_size_per_packet) + } + + fn build_sphinx_packet( + &mut self, + fragment: Fragment, + destination: &Destination, + total_delay: Duration, + ) -> Result<(sphinx_packet::SphinxPacket, route::Node), Box> + { + let route = self.topology.random_route(&mut self.rng)?; + + let delays: Vec = + RandomDelayIterator::new(&mut self.rng, route.len() as u64, total_delay) + .map(|d| Delay::new_from_millis(d.as_millis() as u64)) + .collect(); + + // TODO: encrypt the payload for the destination, if we want + // https://github.com/nymtech/nym/blob/3748ab77a132143d5fd1cd75dd06334d33294815/common/nymsphinx/src/preparer/payload.rs#L70 + let payload = fragment.into_bytes(); + + let packet = SphinxPacketBuilder::new() + .with_payload_size(payload.len() + PAYLOAD_OVERHEAD_SIZE) + .build_packet(payload, &route, destination, &delays)?; + + let first_mixnode = route.first().cloned().expect("route is not empty"); + + Ok((packet, first_mixnode)) + } + + async fn send_packet( + pool: &ConnectionPool, + packet: Box, + addr: NodeAddressBytes, + ) -> Result<(), Box> { + let addr = SocketAddr::try_from(NymNodeRoutingAddress::try_from(addr)?)?; + tracing::debug!("Sending a Sphinx packet to the node: {addr:?}"); + + let mu: std::sync::Arc> = + pool.get_or_init(&addr).await?; + let mut socket = mu.lock().await; + let body = Body::new_sphinx(packet); + body.write(&mut *socket).await?; + + tracing::debug!("Sent a Sphinx packet successuflly to the node: {addr:?}"); + + Ok(()) + } +} + +struct RandomDelayIterator { + rng: R, + remaining_delays: u64, + remaining_time: u64, + avg_delay: u64, +} + +impl RandomDelayIterator { + fn new(rng: R, total_delays: u64, total_time: Duration) -> Self { + let total_time = total_time.as_millis() as u64; + RandomDelayIterator { + rng, + remaining_delays: total_delays, + remaining_time: total_time, + avg_delay: total_time / total_delays, + } + } +} + +impl Iterator for RandomDelayIterator +where + R: Rng, +{ + type Item = Duration; + + fn next(&mut self) -> Option { + if self.remaining_delays == 0 { + return None; + } + + self.remaining_delays -= 1; + + if self.remaining_delays == 1 { + return Some(Duration::from_millis(self.remaining_time)); + } + + // Calculate bounds to avoid extreme values + let upper_bound = (self.avg_delay as f64 * 1.5) + // guarantee that we don't exceed the remaining time and promise the delay we return is + // at least 1ms. + .min(self.remaining_time.saturating_sub(self.remaining_delays) as f64); + let lower_bound = (self.avg_delay as f64 * 0.5).min(upper_bound); + + let delay = Uniform::new_inclusive(lower_bound, upper_bound).sample(&mut self.rng) as u64; + self.remaining_time = self.remaining_time.saturating_sub(delay); + + Some(Duration::from_millis(delay)) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::RandomDelayIterator; + + const TOTAL_DELAYS: u64 = 3; + + #[test] + fn test_random_delay_iter_zero_total_time() { + let mut delays = RandomDelayIterator::new(rand::thread_rng(), TOTAL_DELAYS, Duration::ZERO); + for _ in 0..TOTAL_DELAYS { + assert!(delays.next().is_some()); + } + assert!(delays.next().is_none()); + } + + #[test] + fn test_random_delay_iter_small_total_time() { + let mut delays = + RandomDelayIterator::new(rand::thread_rng(), TOTAL_DELAYS, Duration::from_millis(1)); + let mut d = Duration::ZERO; + for _ in 0..TOTAL_DELAYS { + d += delays.next().unwrap(); + } + assert!(delays.next().is_none()); + assert_eq!(d, Duration::from_millis(1)); + } +} diff --git a/mixnet/node/Cargo.toml b/mixnet/node/Cargo.toml new file mode 100644 index 00000000..703f3412 --- /dev/null +++ b/mixnet/node/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "mixnet-node" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +tracing = "0.1.37" +tokio = { version = "1.32", features = ["net", "time"] } +sphinx-packet = "0.1.0" +nym-sphinx = { package = "nym-sphinx", git = "https://github.com/nymtech/nym", tag = "v1.1.22" } +mixnet-protocol = { path = "../protocol" } +mixnet-topology = { path = "../topology" } +mixnet-util = { path = "../util" } + +[dev-dependencies] +tokio = {version = "1.32", features =["full"]} \ No newline at end of file diff --git a/mixnet/node/src/client_notifier.rs b/mixnet/node/src/client_notifier.rs new file mode 100644 index 00000000..688db25c --- /dev/null +++ b/mixnet/node/src/client_notifier.rs @@ -0,0 +1,47 @@ +use std::{error::Error, net::SocketAddr}; + +use mixnet_protocol::Body; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::mpsc, +}; + +pub struct ClientNotifier {} + +impl ClientNotifier { + pub async fn run( + listen_address: SocketAddr, + mut rx: mpsc::Receiver, + ) -> Result<(), Box> { + let listener = TcpListener::bind(listen_address).await?; + tracing::info!("Listening mixnet client connections: {listen_address}"); + + // Currently, handling only a single incoming connection + // TODO: consider handling multiple clients + loop { + match listener.accept().await { + Ok((socket, remote_addr)) => { + tracing::debug!("Accepted incoming client connection from {remote_addr:?}"); + + if let Err(e) = Self::handle_connection(socket, &mut rx).await { + tracing::error!("failed to handle conn: {e}"); + } + } + Err(e) => tracing::warn!("Failed to accept incoming client connection: {e}"), + } + } + } + + async fn handle_connection( + mut socket: TcpStream, + rx: &mut mpsc::Receiver, + ) -> Result<(), Box> { + while let Some(body) = rx.recv().await { + if let Err(e) = body.write(&mut socket).await { + return Err(format!("error from client conn: {e}").into()); + } + } + tracing::debug!("body receiver closed"); + Ok(()) + } +} diff --git a/mixnet/node/src/config.rs b/mixnet/node/src/config.rs new file mode 100644 index 00000000..93e989a5 --- /dev/null +++ b/mixnet/node/src/config.rs @@ -0,0 +1,37 @@ +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + +use nym_sphinx::{PrivateKey, PublicKey}; +use serde::{Deserialize, Serialize}; +use sphinx_packet::crypto::{PRIVATE_KEY_SIZE, PUBLIC_KEY_SIZE}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct MixnetNodeConfig { + /// A listen address for receiving Sphinx packets + pub listen_address: SocketAddr, + /// An listen address fro communicating with mixnet clients + pub client_listen_address: SocketAddr, + /// A key for decrypting Sphinx packets + pub private_key: [u8; PRIVATE_KEY_SIZE], + /// The size of the connection pool. + pub connection_pool_size: usize, +} + +impl Default for MixnetNodeConfig { + fn default() -> Self { + Self { + listen_address: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 7777)), + client_listen_address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + 7778, + )), + private_key: PrivateKey::new().to_bytes(), + connection_pool_size: 255, + } + } +} + +impl MixnetNodeConfig { + pub fn public_key(&self) -> [u8; PUBLIC_KEY_SIZE] { + *PublicKey::from(&PrivateKey::from(self.private_key)).as_bytes() + } +} diff --git a/mixnet/node/src/lib.rs b/mixnet/node/src/lib.rs new file mode 100644 index 00000000..e38b4b56 --- /dev/null +++ b/mixnet/node/src/lib.rs @@ -0,0 +1,191 @@ +mod client_notifier; +pub mod config; + +use std::{error::Error, net::SocketAddr}; + +use client_notifier::ClientNotifier; +pub use config::MixnetNodeConfig; +use mixnet_protocol::Body; +use mixnet_topology::MixnetNodeId; +use mixnet_util::ConnectionPool; +use nym_sphinx::{ + addressing::nodes::NymNodeRoutingAddress, Delay, DestinationAddressBytes, NodeAddressBytes, + Payload, PrivateKey, +}; +pub use sphinx_packet::crypto::PRIVATE_KEY_SIZE; +use sphinx_packet::{crypto::PUBLIC_KEY_SIZE, ProcessedPacket, SphinxPacket}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::mpsc, +}; + +// A mix node that routes packets in the Mixnet. +pub struct MixnetNode { + config: MixnetNodeConfig, + pool: ConnectionPool, +} + +impl MixnetNode { + pub fn new(config: MixnetNodeConfig) -> Self { + let pool = ConnectionPool::new(config.connection_pool_size); + Self { config, pool } + } + + pub fn id(&self) -> MixnetNodeId { + self.public_key() + } + + pub fn public_key(&self) -> [u8; PUBLIC_KEY_SIZE] { + self.config.public_key() + } + + const CLIENT_NOTI_CHANNEL_SIZE: usize = 100; + + pub async fn run(self) -> Result<(), Box> { + tracing::info!("Public key: {:?}", self.public_key()); + + // Spawn a ClientNotifier + let (client_tx, client_rx) = mpsc::channel(Self::CLIENT_NOTI_CHANNEL_SIZE); + tokio::spawn(async move { + if let Err(e) = ClientNotifier::run(self.config.client_listen_address, client_rx).await + { + tracing::error!("failed to run client notifier: {e}"); + } + }); + + //TODO: Accepting ad-hoc TCP conns for now. Improve conn handling. + //TODO: Add graceful shutdown + let listener = TcpListener::bind(self.config.listen_address).await?; + tracing::info!( + "Listening mixnet node connections: {}", + self.config.listen_address + ); + + loop { + match listener.accept().await { + Ok((socket, remote_addr)) => { + tracing::debug!("Accepted incoming connection from {remote_addr:?}"); + + let client_tx = client_tx.clone(); + let private_key = self.config.private_key; + let pool = self.pool.clone(); + tokio::spawn(async move { + if let Err(e) = + Self::handle_connection(socket, pool, private_key, client_tx).await + { + tracing::error!("failed to handle conn: {e}"); + } + }); + } + Err(e) => tracing::warn!("Failed to accept incoming connection: {e}"), + } + } + } + + async fn handle_connection( + mut socket: TcpStream, + pool: ConnectionPool, + private_key: [u8; PRIVATE_KEY_SIZE], + client_tx: mpsc::Sender, + ) -> Result<(), Box> { + loop { + let body = Body::read(&mut socket).await?; + + let pool = pool.clone(); + let private_key = PrivateKey::from(private_key); + let client_tx = client_tx.clone(); + + tokio::spawn(async move { + if let Err(e) = Self::handle_body(body, &pool, &private_key, &client_tx).await { + tracing::error!("failed to handle body: {e}"); + } + }); + } + } + + async fn handle_body( + body: Body, + pool: &ConnectionPool, + private_key: &PrivateKey, + client_tx: &mpsc::Sender, + ) -> Result<(), Box> { + match body { + Body::SphinxPacket(packet) => { + Self::handle_sphinx_packet(pool, private_key, packet).await + } + _body @ Body::FinalPayload(_) => { + Self::forward_body_to_client_notifier(private_key, client_tx, _body).await + } + } + } + + async fn handle_sphinx_packet( + pool: &ConnectionPool, + private_key: &PrivateKey, + packet: Box, + ) -> Result<(), Box> { + match packet.process(private_key)? { + ProcessedPacket::ForwardHop(packet, next_node_addr, delay) => { + Self::forward_packet_to_next_hop(pool, packet, next_node_addr, delay).await + } + ProcessedPacket::FinalHop(destination_addr, _, payload) => { + Self::forward_payload_to_destination(pool, payload, destination_addr).await + } + } + } + + async fn forward_body_to_client_notifier( + _private_key: &PrivateKey, + client_tx: &mpsc::Sender, + body: Body, + ) -> Result<(), Box> { + // TODO: Decrypt the final payload using the private key, if it's encrypted + + // Do not wait when the channel is full or no receiver exists + client_tx.try_send(body)?; + Ok(()) + } + + async fn forward_packet_to_next_hop( + pool: &ConnectionPool, + packet: Box, + next_node_addr: NodeAddressBytes, + delay: Delay, + ) -> Result<(), Box> { + tracing::debug!("Delaying the packet for {delay:?}"); + tokio::time::sleep(delay.to_duration()).await; + + Self::forward( + pool, + Body::new_sphinx(packet), + NymNodeRoutingAddress::try_from(next_node_addr)?, + ) + .await + } + + async fn forward_payload_to_destination( + pool: &ConnectionPool, + payload: Payload, + destination_addr: DestinationAddressBytes, + ) -> Result<(), Box> { + tracing::debug!("Forwarding final payload to destination mixnode"); + + Self::forward( + pool, + Body::new_final_payload(payload), + NymNodeRoutingAddress::try_from_bytes(&destination_addr.as_bytes())?, + ) + .await + } + + async fn forward( + pool: &ConnectionPool, + body: Body, + to: NymNodeRoutingAddress, + ) -> Result<(), Box> { + let addr = SocketAddr::try_from(to)?; + body.write(&mut *pool.get_or_init(&addr).await?.lock().await) + .await?; + Ok(()) + } +} diff --git a/mixnet/protocol/Cargo.toml b/mixnet/protocol/Cargo.toml new file mode 100644 index 00000000..a587c82b --- /dev/null +++ b/mixnet/protocol/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "mixnet-protocol" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = "1.29.1" +sphinx-packet = "0.1.0" +futures = "0.3" +tokio-util = {version = "0.7", features = ["io", "io-util"] } \ No newline at end of file diff --git a/mixnet/protocol/src/lib.rs b/mixnet/protocol/src/lib.rs new file mode 100644 index 00000000..5bb1422d --- /dev/null +++ b/mixnet/protocol/src/lib.rs @@ -0,0 +1,101 @@ +use sphinx_packet::{payload::Payload, SphinxPacket}; +use std::error::Error; + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +pub enum Body { + SphinxPacket(Box), + FinalPayload(Payload), +} + +impl Body { + pub fn new_sphinx(packet: Box) -> Self { + Self::SphinxPacket(packet) + } + + pub fn new_final_payload(payload: Payload) -> Self { + Self::FinalPayload(payload) + } + + fn variant_as_u8(&self) -> u8 { + match self { + Self::SphinxPacket(_) => 0, + Self::FinalPayload(_) => 1, + } + } + + pub async fn read(reader: &mut R) -> Result> + where + R: AsyncRead + Unpin, + { + let id = reader.read_u8().await?; + match id { + 0 => Self::read_sphinx_packet(reader).await, + 1 => Self::read_final_payload(reader).await, + _ => Err("Invalid body type".into()), + } + } + + fn sphinx_packet_from_bytes( + data: &[u8], + ) -> Result> { + let packet = SphinxPacket::from_bytes(data)?; + Ok(Self::new_sphinx(Box::new(packet))) + } + + async fn read_sphinx_packet( + reader: &mut R, + ) -> Result> + where + R: AsyncRead + Unpin, + { + let size = reader.read_u64().await?; + let mut buf = vec![0; size as usize]; + reader.read_exact(&mut buf).await?; + Self::sphinx_packet_from_bytes(&buf) + } + + pub fn final_payload_from_bytes( + data: &[u8], + ) -> Result> { + let payload = Payload::from_bytes(data)?; + Ok(Self::new_final_payload(payload)) + } + + async fn read_final_payload( + reader: &mut R, + ) -> Result> + where + R: AsyncRead + Unpin, + { + let size = reader.read_u64().await?; + let mut buf = vec![0; size as usize]; + reader.read_exact(&mut buf).await?; + + Self::final_payload_from_bytes(&buf) + } + + pub async fn write( + self, + writer: &mut W, + ) -> Result<(), Box> + where + W: AsyncWrite + Unpin + ?Sized, + { + let variant = self.variant_as_u8(); + writer.write_u8(variant).await?; + match self { + Body::SphinxPacket(packet) => { + let data = packet.to_bytes(); + writer.write_u64(data.len() as u64).await?; + writer.write_all(&data).await?; + } + Body::FinalPayload(payload) => { + let data = payload.as_bytes(); + writer.write_u64(data.len() as u64).await?; + writer.write_all(data).await?; + } + } + Ok(()) + } +} diff --git a/mixnet/topology/Cargo.toml b/mixnet/topology/Cargo.toml new file mode 100644 index 00000000..57ac2d46 --- /dev/null +++ b/mixnet/topology/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "mixnet-topology" +version = "0.1.0" +edition = "2021" + +[dependencies] +hex = "0.4" +# Using an older version, since `nym-sphinx` depends on `rand` v0.7.3. +rand = "0.7.3" +serde = { version = "1.0", features = ["derive"] } +sphinx-packet = "0.1.0" +nym-sphinx = { package = "nym-sphinx", git = "https://github.com/nymtech/nym", tag = "v1.1.22" } diff --git a/mixnet/topology/src/lib.rs b/mixnet/topology/src/lib.rs new file mode 100644 index 00000000..d276f08d --- /dev/null +++ b/mixnet/topology/src/lib.rs @@ -0,0 +1,112 @@ +use std::{error::Error, net::SocketAddr}; + +use nym_sphinx::addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError}; +use rand::{seq::IteratorRandom, Rng}; +use serde::{Deserialize, Serialize}; +use sphinx_packet::{crypto::PUBLIC_KEY_SIZE, route}; + +pub type MixnetNodeId = [u8; PUBLIC_KEY_SIZE]; + +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +pub struct MixnetTopology { + pub layers: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Layer { + pub nodes: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Node { + pub address: SocketAddr, + #[serde(with = "hex_serde")] + pub public_key: [u8; PUBLIC_KEY_SIZE], +} + +mod hex_serde { + use super::PUBLIC_KEY_SIZE; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize( + pk: &[u8; PUBLIC_KEY_SIZE], + serializer: S, + ) -> Result { + if serializer.is_human_readable() { + hex::encode(pk).serialize(serializer) + } else { + serializer.serialize_bytes(pk) + } + } + + pub fn deserialize<'de, D: Deserializer<'de>>( + deserializer: D, + ) -> Result<[u8; PUBLIC_KEY_SIZE], D::Error> { + if deserializer.is_human_readable() { + let hex_str = String::deserialize(deserializer)?; + hex::decode(hex_str) + .map_err(serde::de::Error::custom) + .and_then(|v| v.as_slice().try_into().map_err(serde::de::Error::custom)) + } else { + <[u8; PUBLIC_KEY_SIZE]>::deserialize(deserializer) + } + } +} + +impl MixnetTopology { + pub fn random_route( + &self, + rng: &mut R, + ) -> Result, Box> { + let num_hops = self.layers.len(); + + let route: Vec = self + .layers + .iter() + .take(num_hops) + .map(|layer| { + layer + .random_node(rng) + .expect("layer is not empty") + .clone() + .try_into() + .unwrap() + }) + .collect(); + + Ok(route) + } + + // Choose a destination mixnet node randomly from the last layer. + pub fn random_destination( + &self, + rng: &mut R, + ) -> Result> { + Ok(self + .layers + .last() + .expect("topology is not empty") + .random_node(rng) + .expect("layer is not empty") + .clone() + .try_into() + .unwrap()) + } +} + +impl Layer { + pub fn random_node(&self, rng: &mut R) -> Option<&Node> { + self.nodes.iter().choose(rng) + } +} + +impl TryInto for Node { + type Error = NymNodeRoutingAddressError; + + fn try_into(self) -> Result { + Ok(route::Node { + address: NymNodeRoutingAddress::from(self.address).try_into()?, + pub_key: self.public_key.into(), + }) + } +} diff --git a/mixnet/util/Cargo.toml b/mixnet/util/Cargo.toml new file mode 100644 index 00000000..0565ae3b --- /dev/null +++ b/mixnet/util/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "mixnet-util" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1.32", default-features = false, features = ["sync", "net"] } +parking_lot = { version = "0.12", features = ["send_guard"] } \ No newline at end of file diff --git a/mixnet/util/src/lib.rs b/mixnet/util/src/lib.rs new file mode 100644 index 00000000..d5e47d76 --- /dev/null +++ b/mixnet/util/src/lib.rs @@ -0,0 +1,29 @@ +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; + +use tokio::net::TcpStream; +use tokio::sync::Mutex; + +#[derive(Clone)] +pub struct ConnectionPool { + pool: Arc>>>>, +} + +impl ConnectionPool { + pub fn new(size: usize) -> Self { + Self { + pool: Arc::new(Mutex::new(HashMap::with_capacity(size))), + } + } + + pub async fn get_or_init(&self, addr: &SocketAddr) -> std::io::Result>> { + let mut pool = self.pool.lock().await; + match pool.get(addr).cloned() { + Some(tcp) => Ok(tcp), + None => { + let tcp = Arc::new(Mutex::new(TcpStream::connect(addr).await?)); + pool.insert(*addr, tcp.clone()); + Ok(tcp) + } + } + } +} diff --git a/nodes/mixnode/Cargo.toml b/nodes/mixnode/Cargo.toml new file mode 100644 index 00000000..1a2273c3 --- /dev/null +++ b/nodes/mixnode/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "mixnode" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1" +mixnet-node = { path = "../../mixnet/node" } +nomos-log = { path = "../../nomos-services/log" } +clap = { version = "4", features = ["derive"] } +color-eyre = "0.6.0" +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +serde = "1" +serde_yaml = "0.9" +tracing = "0.1" +tracing-subscriber = "0.3" +tokio = "1.29.1" diff --git a/nodes/mixnode/config.yaml b/nodes/mixnode/config.yaml new file mode 100644 index 00000000..901d94a4 --- /dev/null +++ b/nodes/mixnode/config.yaml @@ -0,0 +1,9 @@ +mixnode: + listen_address: 127.0.0.1:7777 + client_listen_address: 127.0.0.1:7778 + private_key: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + connection_pool_size: 255 +log: + backend: "Stdout" + format: "Json" + level: "debug" diff --git a/nodes/mixnode/src/lib.rs b/nodes/mixnode/src/lib.rs new file mode 100644 index 00000000..c3a9d85f --- /dev/null +++ b/nodes/mixnode/src/lib.rs @@ -0,0 +1,20 @@ +mod services; + +use nomos_log::Logger; +use overwatch_derive::Services; +use overwatch_rs::services::handle::ServiceHandle; +use overwatch_rs::services::ServiceData; +use serde::{Deserialize, Serialize}; +use services::mixnet::MixnetNodeService; + +#[derive(Deserialize, Debug, Clone, Serialize)] +pub struct Config { + pub mixnode: ::Settings, + pub log: ::Settings, +} + +#[derive(Services)] +pub struct MixNode { + node: ServiceHandle, + logging: ServiceHandle, +} diff --git a/nodes/mixnode/src/main.rs b/nodes/mixnode/src/main.rs new file mode 100644 index 00000000..520b9e5c --- /dev/null +++ b/nodes/mixnode/src/main.rs @@ -0,0 +1,29 @@ +mod services; + +use clap::Parser; +use color_eyre::eyre::Result; +use mixnode::{Config, MixNode, MixNodeServiceSettings}; +use overwatch_rs::overwatch::OverwatchRunner; +use overwatch_rs::DynError; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Path for a yaml-encoded mixnet-node config file + config: std::path::PathBuf, +} + +fn main() -> Result<(), DynError> { + let Args { config } = Args::parse(); + let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?; + + let app = OverwatchRunner::::run( + MixNodeServiceSettings { + node: config.mixnode, + logging: config.log, + }, + None, + )?; + app.wait_finished(); + Ok(()) +} diff --git a/nodes/mixnode/src/services/mixnet.rs b/nodes/mixnode/src/services/mixnet.rs new file mode 100644 index 00000000..cc7bdc42 --- /dev/null +++ b/nodes/mixnode/src/services/mixnet.rs @@ -0,0 +1,31 @@ +use mixnet_node::{MixnetNode, MixnetNodeConfig}; +use overwatch_rs::services::handle::ServiceStateHandle; +use overwatch_rs::services::relay::NoMessage; +use overwatch_rs::services::state::{NoOperator, NoState}; +use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_rs::DynError; + +pub struct MixnetNodeService(MixnetNode); + +impl ServiceData for MixnetNodeService { + const SERVICE_ID: ServiceId = "mixnet-node"; + type Settings = MixnetNodeConfig; + type State = NoState; + type StateOperator = NoOperator; + type Message = NoMessage; +} + +#[async_trait::async_trait] +impl ServiceCore for MixnetNodeService { + fn init(service_state: ServiceStateHandle) -> Result { + let settings: Self::Settings = service_state.settings_reader.get_updated_settings(); + Ok(Self(MixnetNode::new(settings))) + } + + async fn run(self) -> Result<(), DynError> { + if let Err(_e) = self.0.run().await { + todo!("Errors should match"); + } + Ok(()) + } +} diff --git a/nodes/mixnode/src/services/mod.rs b/nodes/mixnode/src/services/mod.rs new file mode 100644 index 00000000..ae7f01c0 --- /dev/null +++ b/nodes/mixnode/src/services/mod.rs @@ -0,0 +1 @@ +pub mod mixnet; diff --git a/nodes/nomos-node/config.yaml b/nodes/nomos-node/config.yaml index 0d28d050..d60e6c86 100644 --- a/nodes/nomos-node/config.yaml +++ b/nodes/nomos-node/config.yaml @@ -19,6 +19,18 @@ network: discV5BootstrapNodes: [] initial_peers: [] relayTopics: [] + mixnet_client: + mode: Sender + topology: + layers: + - nodes: + - address: 127.0.0.1:7777 + public_key: "0000000000000000000000000000000000000000000000000000000000000000" + connection_pool_size: 255 + mixnet_delay: + start: "0ms" + end: "0ms" + http: backend: address: 0.0.0.0:8080 diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 88eaeaed..060c3284 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -214,22 +214,23 @@ impl Config { } = network_args; if let Some(IpAddr::V4(h)) = host { - self.network.backend.host = h; + self.network.backend.inner.host = h; } else if host.is_some() { return Err(eyre!("Unsupported ip version")); } if let Some(port) = port { - self.network.backend.port = port as u16; + self.network.backend.inner.port = port as u16; } if let Some(node_key) = node_key { let mut key_bytes = hex::decode(node_key)?; - self.network.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?; + self.network.backend.inner.node_key = + SecretKey::try_from_bytes(key_bytes.as_mut_slice())?; } if let Some(peers) = initial_peers { - self.network.backend.initial_peers = peers; + self.network.backend.inner.initial_peers = peers; } Ok(self) diff --git a/nomos-core/Cargo.toml b/nomos-core/Cargo.toml index 5b82ec3e..e671b14b 100644 --- a/nomos-core/Cargo.toml +++ b/nomos-core/Cargo.toml @@ -14,14 +14,12 @@ blake2 = { version = "0.10" } bytes = "1.3" consensus-engine = { path = "../consensus-engine", features = ["serde"]} futures = "0.3" -nomos-network = { path = "../nomos-services/network", optional = true } raptorq = { version = "1.7", optional = true } serde = { version = "1.0", features = ["derive"] } thiserror = "1.0" bincode = "1.3" once_cell = "1.0" indexmap = { version = "1.9", features = ["serde"] } -serde_json = { version = "1", optional = true } [dev-dependencies] rand = "0.8" @@ -31,5 +29,4 @@ tokio = { version = "1.23", features = ["macros", "rt"] } [features] default = [] raptor = ["raptorq"] -mock = ["nomos-network/mock", "serde_json"] -waku = ["nomos-network/waku", "serde_json"] +mock = [] diff --git a/nomos-core/src/tx/mock.rs b/nomos-core/src/tx/mock.rs index 86c7d303..f5642571 100644 --- a/nomos-core/src/tx/mock.rs +++ b/nomos-core/src/tx/mock.rs @@ -6,21 +6,21 @@ use blake2::{ Blake2bVar, }; use bytes::{Bytes, BytesMut}; -use nomos_network::backends::mock::MockMessage; +use serde::Serialize; #[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] -pub struct MockTransaction { +pub struct MockTransaction { id: MockTxId, - content: MockMessage, + content: M, } -impl MockTransaction { - pub fn new(content: MockMessage) -> Self { - let id = MockTxId::from(content.clone()); +impl MockTransaction { + pub fn new(content: M) -> Self { + let id = MockTxId::from(serialize(&content).unwrap().as_slice()); Self { id, content } } - pub fn message(&self) -> &MockMessage { + pub fn message(&self) -> &M { &self.content } @@ -37,7 +37,7 @@ impl MockTransaction { } } -impl Transaction for MockTransaction { +impl Transaction for MockTransaction { const HASHER: TransactionHasher = MockTransaction::id; type Hash = MockTxId; @@ -46,9 +46,9 @@ impl Transaction for MockTransaction { } } -impl From for MockTransaction { - fn from(msg: nomos_network::backends::mock::MockMessage) -> Self { - let id = MockTxId::from(msg.clone()); +impl From for MockTransaction { + fn from(msg: M) -> Self { + let id = MockTxId::from(serialize(&msg).unwrap().as_slice()); Self { id, content: msg } } } @@ -84,18 +84,18 @@ impl MockTxId { } } -impl From for MockTxId { - fn from(msg: nomos_network::backends::mock::MockMessage) -> Self { +impl From<&[u8]> for MockTxId { + fn from(msg: &[u8]) -> Self { let mut hasher = Blake2bVar::new(32).unwrap(); - hasher.update(&serialize(&msg).unwrap()); + hasher.update(msg); let mut id = [0u8; 32]; hasher.finalize_variable(&mut id).unwrap(); Self(id) } } -impl From<&MockTransaction> for MockTxId { - fn from(msg: &MockTransaction) -> Self { +impl From<&MockTransaction> for MockTxId { + fn from(msg: &MockTransaction) -> Self { msg.id } } diff --git a/nomos-services/consensus/src/network/adapters/libp2p.rs b/nomos-services/consensus/src/network/adapters/libp2p.rs index e92f5a87..dc92b1af 100644 --- a/nomos-services/consensus/src/network/adapters/libp2p.rs +++ b/nomos-services/consensus/src/network/adapters/libp2p.rs @@ -231,6 +231,11 @@ impl NetworkAdapter for Libp2pAdapter { let cache = message_cache.clone(); let relay = network_relay.clone(); Self::subscribe(&relay, TOPIC).await; + tracing::debug!("Starting up..."); + // this wait seems to be helpful in some cases since we give the time + // to the network to establish connections before we start sending messages + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // TODO: maybe we need the runtime handle here? tokio::spawn(async move { let (sender, receiver) = tokio::sync::oneshot::channel(); diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 3ea455c1..3e410aa7 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -30,6 +30,6 @@ blake2 = "0.10" [features] default = [] -waku = ["nomos-network/waku", "nomos-core/waku", "waku-bindings"] +waku = ["nomos-network/waku", "waku-bindings"] mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"] libp2p = ["nomos-network/libp2p"] diff --git a/nomos-services/mempool/src/network/adapters/mock.rs b/nomos-services/mempool/src/network/adapters/mock.rs index e107bc5f..11252e02 100644 --- a/nomos-services/mempool/src/network/adapters/mock.rs +++ b/nomos-services/mempool/src/network/adapters/mock.rs @@ -4,7 +4,7 @@ use futures::{Stream, StreamExt}; use nomos_core::tx::mock::MockTransaction; use nomos_network::backends::mock::{ - EventKind, Mock, MockBackendMessage, MockContentTopic, NetworkEvent, + EventKind, Mock, MockBackendMessage, MockContentTopic, MockMessage, NetworkEvent, }; use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; @@ -25,7 +25,7 @@ pub struct MockAdapter { #[async_trait::async_trait] impl NetworkAdapter for MockAdapter { type Backend = Mock; - type Tx = MockTransaction; + type Tx = MockTransaction; async fn new( network_relay: OutboundRelay< as ServiceData>::Message>, diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index 8e85df0e..c317fb71 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -17,7 +17,7 @@ use nomos_mempool::{ struct MockPoolNode { logging: ServiceHandle, network: ServiceHandle>, - mockpool: ServiceHandle>>, + mockpool: ServiceHandle>>>, } #[test] @@ -67,7 +67,7 @@ fn test_mockmempool() { let network = app.handle().relay::>(); let mempool = app .handle() - .relay::>>(); + .relay::>>>(); app.spawn(async move { let network_outbound = network.connect().await.unwrap(); diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index bd2a9112..44188156 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" async-trait = "0.1" bytes = "1.2" chrono = { version = "0.4", optional = true } +humantime-serde = { version = "1", optional = true } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } multiaddr = "0.15" serde = { version = "1.0", features = ["derive"] } @@ -18,14 +19,16 @@ tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" thiserror = "1.0" tracing = "0.1" -rand = { version = "0.8", optional = true } +rand = { version = "0.7.3", optional = true } waku-bindings = { version = "0.1.1", optional = true } tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["json"] } tracing-gelf = "0.7" futures = "0.3" parking_lot = "0.12" +nomos-core = { path = "../../nomos-core" } nomos-libp2p = { path = "../../nomos-libp2p", optional = true } +mixnet-client = { path = "../../mixnet/client" } [dev-dependencies] tokio = { version = "1", features = ["full"] } @@ -33,5 +36,5 @@ tokio = { version = "1", features = ["full"] } [features] default = [] waku = ["waku-bindings"] -libp2p = ["nomos-libp2p"] +libp2p = ["nomos-libp2p", "rand", "humantime-serde"] mock = ["rand", "chrono"] diff --git a/nomos-services/network/src/backends/libp2p.rs b/nomos-services/network/src/backends/libp2p.rs index cfac1def..4ab1cbe2 100644 --- a/nomos-services/network/src/backends/libp2p.rs +++ b/nomos-services/network/src/backends/libp2p.rs @@ -1,7 +1,9 @@ // std -use std::error::Error; +use std::{error::Error, ops::Range, time::Duration}; // internal use super::NetworkBackend; +use mixnet_client::{MixnetClient, MixnetClientConfig}; +use nomos_core::wire; pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash}; use nomos_libp2p::{ libp2p::{gossipsub, Multiaddr, PeerId}, @@ -9,8 +11,10 @@ use nomos_libp2p::{ }; // crates use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; +use rand::{rngs::OsRng, thread_rng, Rng}; use serde::{Deserialize, Serialize}; use tokio::sync::{broadcast, mpsc, oneshot}; +use tokio_stream::StreamExt; macro_rules! log_error { ($e:expr) => { @@ -25,6 +29,55 @@ pub struct Libp2p { commands_tx: mpsc::Sender, } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct Libp2pConfig { + #[serde(flatten)] + pub inner: SwarmConfig, + pub mixnet_client: MixnetClientConfig, + #[serde(with = "humantime")] + pub mixnet_delay: Range, +} + +mod humantime { + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + use std::{ops::Range, time::Duration}; + + #[derive(Serialize, Deserialize)] + struct DurationRangeHelper { + #[serde(with = "humantime_serde")] + start: Duration, + #[serde(with = "humantime_serde")] + end: Duration, + } + + pub fn serialize( + val: &Range, + serializer: S, + ) -> Result { + if serializer.is_human_readable() { + DurationRangeHelper { + start: val.start, + end: val.end, + } + .serialize(serializer) + } else { + val.serialize(serializer) + } + } + + pub fn deserialize<'de, D: Deserializer<'de>>( + deserializer: D, + ) -> Result, D::Error> { + if deserializer.is_human_readable() { + let DurationRangeHelper { start, end } = + DurationRangeHelper::deserialize(deserializer)?; + Ok(start..end) + } else { + Range::::deserialize(deserializer) + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Libp2pInfo { pub listen_addresses: Vec, @@ -39,14 +92,29 @@ pub enum EventKind { } const BUFFER_SIZE: usize = 64; +const BACKOFF: u64 = 5; +const MAX_RETRY: usize = 3; #[derive(Debug)] +#[non_exhaustive] pub enum Command { Connect(PeerId, Multiaddr), - Broadcast { topic: Topic, message: Box<[u8]> }, + Broadcast { + topic: Topic, + message: Box<[u8]>, + }, Subscribe(Topic), Unsubscribe(Topic), - Info { reply: oneshot::Sender }, + Info { + reply: oneshot::Sender, + }, + #[doc(hidden)] + // broadcast a message directly through gossipsub without mixnet + DirectBroadcastAndRetry { + topic: Topic, + message: Box<[u8]>, + retry: usize, + }, } pub type Topic = String; @@ -58,24 +126,73 @@ pub enum Event { Message(Message), } +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MixnetMessage { + topic: Topic, + message: Box<[u8]>, +} + +impl MixnetMessage { + pub fn as_bytes(&self) -> Vec { + wire::serialize(self).expect("Couldn't serialize MixnetMessage") + } + pub fn from_bytes(data: &[u8]) -> Result { + wire::deserialize(data) + } +} + #[async_trait::async_trait] impl NetworkBackend for Libp2p { - type Settings = SwarmConfig; - type State = NoState; + type Settings = Libp2pConfig; + type State = NoState; type Message = Command; type EventKind = EventKind; type NetworkEvent = Event; fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { + let mixnet_client = MixnetClient::new(config.mixnet_client.clone(), OsRng); let (commands_tx, mut commands_rx) = tokio::sync::mpsc::channel(BUFFER_SIZE); let (events_tx, _) = tokio::sync::broadcast::channel(BUFFER_SIZE); - let libp2p = Self { - events_tx: events_tx.clone(), - commands_tx, - }; + + let cmd_tx = commands_tx.clone(); overwatch_handle.runtime().spawn(async move { - use tokio_stream::StreamExt; - let mut swarm = Swarm::build(&config).unwrap(); + let Ok(mut stream) = mixnet_client.run().await else { + tracing::error!("Could not quickstart mixnet stream"); + return; + }; + + while let Some(result) = stream.next().await { + match result { + Ok(msg) => { + tracing::debug!("receiving message from mixnet client"); + let Ok(MixnetMessage { topic, message }) = MixnetMessage::from_bytes(&msg) + else { + tracing::error!( + "failed to deserialize json received from mixnet client" + ); + continue; + }; + + cmd_tx + .send(Command::DirectBroadcastAndRetry { + topic, + message, + retry: 0, + }) + .await + .unwrap_or_else(|_| tracing::error!("could not schedule broadcast")); + } + Err(e) => { + todo!("Handle mixclient error: {e}"); + } + } + } + }); + let cmd_tx = commands_tx.clone(); + let notify = events_tx.clone(); + overwatch_handle.runtime().spawn(async move { + let mut swarm = Swarm::build(&config.inner).unwrap(); + let mut mixnet_client = MixnetClient::new(config.mixnet_client, OsRng); loop { tokio::select! { Some(event) = swarm.next() => { @@ -86,7 +203,7 @@ impl NetworkBackend for Libp2p { message, })) => { tracing::debug!("Got message with id: {id} from peer: {peer_id}"); - log_error!(events_tx.send(Event::Message(message))); + log_error!(notify.send(Event::Message(message))); } SwarmEvent::ConnectionEstablished { peer_id, @@ -121,23 +238,10 @@ impl NetworkBackend for Libp2p { log_error!(swarm.connect(peer_id, peer_addr)); } Command::Broadcast { topic, message } => { - match swarm.broadcast(&topic, message.to_vec()) { - Ok(id) => { - tracing::debug!("broadcasted message with id: {id} tp topic: {topic}"); - } - Err(e) => { - tracing::error!("failed to broadcast message to topic: {topic} {e:?}"); - } - } - - if swarm.is_subscribed(&topic) { - log_error!(events_tx.send(Event::Message(Message { - source: None, - data: message.into(), - sequence_number: None, - topic: Swarm::topic_hash(&topic), - }))); - } + tracing::debug!("sending message to mixnet client"); + let msg = MixnetMessage { topic, message }; + let delay = random_delay(&config.mixnet_delay); + log_error!(mixnet_client.send(msg.as_bytes(), delay)); } Command::Subscribe(topic) => { tracing::debug!("subscribing to topic: {topic}"); @@ -159,12 +263,18 @@ impl NetworkBackend for Libp2p { }; log_error!(reply.send(info)); } + Command::DirectBroadcastAndRetry { topic, message, retry } => { + broadcast_and_retry(topic, message, retry, cmd_tx.clone(), &mut swarm, notify.clone()).await; + } }; } } } }); - libp2p + Self { + events_tx, + commands_tx, + } } async fn process(&self, msg: Self::Message) { @@ -185,3 +295,75 @@ impl NetworkBackend for Libp2p { } } } + +fn random_delay(range: &Range) -> Duration { + if range.start == range.end { + return range.start; + } + thread_rng().gen_range(range.start, range.end) +} + +async fn broadcast_and_retry( + topic: Topic, + message: Box<[u8]>, + retry: usize, + commands_tx: mpsc::Sender, + swarm: &mut Swarm, + events_tx: broadcast::Sender, +) { + tracing::debug!("broadcasting message to topic: {topic}"); + + let wait = BACKOFF.pow(retry as u32); + + match swarm.broadcast(&topic, message.to_vec()) { + Ok(id) => { + tracing::debug!("broadcasted message with id: {id} tp topic: {topic}"); + // self-notification because libp2p doesn't do it + if swarm.is_subscribed(&topic) { + log_error!(events_tx.send(Event::Message(Message { + source: None, + data: message.into(), + sequence_number: None, + topic: Swarm::topic_hash(&topic), + }))); + } + } + Err(gossipsub::PublishError::InsufficientPeers) if retry < MAX_RETRY => { + tracing::error!("failed to broadcast message to topic due to insufficient peers, trying again in {wait:?}"); + + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(wait)).await; + commands_tx + .send(Command::DirectBroadcastAndRetry { + topic, + message, + retry: retry + 1, + }) + .await + .unwrap_or_else(|_| tracing::error!("could not schedule retry")); + }); + } + Err(e) => { + tracing::error!("failed to broadcast message to topic: {topic} {e:?}"); + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::random_delay; + + #[test] + fn test_random_delay() { + assert_eq!( + random_delay(&(Duration::ZERO..Duration::ZERO)), + Duration::ZERO + ); + + let range = Duration::from_millis(10)..Duration::from_millis(100); + let delay = random_delay(&range); + assert!(range.start <= delay && delay < range.end); + } +} diff --git a/nomos-services/network/src/lib.rs b/nomos-services/network/src/lib.rs index 58b92d3e..deaf34d2 100644 --- a/nomos-services/network/src/lib.rs +++ b/nomos-services/network/src/lib.rs @@ -83,12 +83,6 @@ where } async fn run(mut self) -> Result<(), overwatch_rs::DynError> { - tracing::debug!("Starting up..."); - // this wait seems to be helpful in some cases for waku, where it reports - // to be connected to peers but does not seem to be able to send messages - // to them - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let Self { service_state: ServiceStateHandle { mut inbound_relay, .. diff --git a/tests/Cargo.toml b/tests/Cargo.toml index cd3d964c..1ab4c4db 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -14,9 +14,14 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" nomos-core = { path = "../nomos-core" } consensus-engine = { path = "../consensus-engine", features = ["serde"] } nomos-mempool = { path = "../nomos-services/mempool", features = ["mock"] } -rand = "0.8" +mixnode = { path = "../nodes/mixnode" } +mixnet-node = { path = "../mixnet/node" } +mixnet-client = { path = "../mixnet/client" } +mixnet-topology = { path = "../mixnet/topology" } +# Using older versions, since `mixnet-*` crates depend on `rand` v0.7.3. +rand = "0.7.3" +rand_xoshiro = "0.4" once_cell = "1" -rand_xoshiro = "0.6" secp256k1 = { version = "0.26", features = ["rand"] } waku-bindings = { version = "0.1.1", optional = true } reqwest = { version = "0.11", features = ["json"] } @@ -27,6 +32,8 @@ tokio = "1" futures = "0.3" async-trait = "0.1" fraction = "0.13" +ntest = "0.9.0" +criterion = { version = "0.5", features = ["async_tokio"] } [[test]] name = "test_consensus_happy_path" @@ -36,8 +43,17 @@ path = "src/tests/happy.rs" name = "test_consensus_unhappy_path" path = "src/tests/unhappy.rs" +[[test]] +name = "test_mixnet" +path = "src/tests/mixnet.rs" + +[[bench]] +name = "mixnet" +path = "src/benches/mixnet.rs" +harness = false + [features] metrics = ["nomos-node/metrics"] waku = ["nomos-network/waku", "nomos-mempool/waku", "nomos-node/waku", "waku-bindings"] -libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-libp2p", "nomos-node/libp2p"] \ No newline at end of file +libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-libp2p", "nomos-node/libp2p"] diff --git a/tests/src/benches/mixnet.rs b/tests/src/benches/mixnet.rs new file mode 100644 index 00000000..85470feb --- /dev/null +++ b/tests/src/benches/mixnet.rs @@ -0,0 +1,78 @@ +use std::time::Duration; + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use futures::StreamExt; +use mixnet_client::{MessageStream, MixnetClient, MixnetClientConfig, MixnetClientMode}; +use rand::{rngs::OsRng, Rng, RngCore}; +use tests::MixNode; +use tokio::time::Instant; + +pub fn mixnet(c: &mut Criterion) { + c.bench_function("mixnet", |b| { + b.to_async(tokio::runtime::Runtime::new().unwrap()) + .iter_custom(|iters| async move { + let (_mixnodes, mut sender_client, mut destination_stream, msg) = + setup(100 * 1024).await; + + let start = Instant::now(); + for _ in 0..iters { + black_box( + send_receive_message(&msg, &mut sender_client, &mut destination_stream) + .await, + ); + } + start.elapsed() + }) + }); +} + +async fn setup(msg_size: usize) -> (Vec, MixnetClient, MessageStream, Vec) { + let (mixnodes, node_configs, topology) = MixNode::spawn_nodes(3).await; + + let sender_client = MixnetClient::new( + MixnetClientConfig { + mode: MixnetClientMode::Sender, + topology: topology.clone(), + connection_pool_size: 255, + }, + OsRng, + ); + let destination_client = MixnetClient::new( + MixnetClientConfig { + mode: MixnetClientMode::SenderReceiver( + // Connect with the MixnetNode in the exit layer + // According to the current implementation, + // one of mixnodes the exit layer always will be selected as a destination. + node_configs.last().unwrap().client_listen_address, + ), + topology, + connection_pool_size: 255, + }, + OsRng, + ); + let destination_stream = destination_client.run().await.unwrap(); + + let mut msg = vec![0u8; msg_size]; + rand::thread_rng().fill_bytes(&mut msg); + + (mixnodes, sender_client, destination_stream, msg) +} + +async fn send_receive_message( + msg: &[u8], + sender_client: &mut MixnetClient, + destination_stream: &mut MessageStream, +) { + let res = sender_client.send(msg.to_vec(), Duration::ZERO); + assert!(res.is_ok()); + + let received = destination_stream.next().await.unwrap().unwrap(); + assert_eq!(msg, received.as_slice()); +} + +criterion_group!( + name = benches; + config = Criterion::default().sample_size(10).measurement_time(Duration::from_secs(180)); + targets = mixnet +); +criterion_main!(benches); diff --git a/tests/src/lib.rs b/tests/src/lib.rs index 8166eb45..f9c015f8 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -1,4 +1,7 @@ mod nodes; +use mixnet_node::MixnetNodeConfig; +use mixnet_topology::MixnetTopology; +pub use nodes::MixNode; pub use nodes::NomosNode; use once_cell::sync::Lazy; @@ -11,7 +14,7 @@ use std::{fmt::Debug, sync::Mutex}; use fraction::Fraction; use rand::{thread_rng, Rng}; -static NET_PORT: Lazy> = Lazy::new(|| Mutex::new(thread_rng().gen_range(8000..10000))); +static NET_PORT: Lazy> = Lazy::new(|| Mutex::new(thread_rng().gen_range(8000, 10000))); pub fn get_available_port() -> u16 { let mut port = NET_PORT.lock().unwrap(); @@ -30,11 +33,13 @@ pub trait Node: Sized { fn stop(&mut self); } -#[derive(Clone, Copy)] +#[derive(Clone)] pub enum SpawnConfig { Star { n_participants: usize, threshold: Fraction, timeout: Duration, + mixnet_node_configs: Vec, + mixnet_topology: MixnetTopology, }, } diff --git a/tests/src/nodes/mixnode.rs b/tests/src/nodes/mixnode.rs new file mode 100644 index 00000000..854f9b2c --- /dev/null +++ b/tests/src/nodes/mixnode.rs @@ -0,0 +1,105 @@ +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + process::{Child, Command, Stdio}, + time::Duration, +}; + +use mixnet_node::{MixnetNodeConfig, PRIVATE_KEY_SIZE}; +use mixnet_topology::{Layer, MixnetTopology, Node}; +use rand::{thread_rng, RngCore}; +use tempfile::NamedTempFile; + +use crate::get_available_port; + +const MIXNODE_BIN: &str = "../target/debug/mixnode"; + +pub struct MixNode { + child: Child, +} + +impl Drop for MixNode { + fn drop(&mut self) { + self.child.kill().unwrap(); + } +} + +impl MixNode { + pub async fn spawn(config: MixnetNodeConfig) -> Self { + let config = mixnode::Config { + mixnode: config, + log: Default::default(), + }; + + let mut file = NamedTempFile::new().unwrap(); + let config_path = file.path().to_owned(); + serde_yaml::to_writer(&mut file, &config).unwrap(); + + let child = Command::new(std::env::current_dir().unwrap().join(MIXNODE_BIN)) + .arg(&config_path) + .stdout(Stdio::null()) + .spawn() + .unwrap(); + + //TODO: use a sophisticated way to wait until the node is ready + tokio::time::sleep(Duration::from_secs(1)).await; + + Self { child } + } + + pub async fn spawn_nodes( + num_nodes: usize, + ) -> (Vec, Vec, MixnetTopology) { + let mut configs = Vec::::new(); + for _ in 0..num_nodes { + let mut private_key = [0u8; PRIVATE_KEY_SIZE]; + thread_rng().fill_bytes(&mut private_key); + + let config = MixnetNodeConfig { + listen_address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + get_available_port(), + )), + client_listen_address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + get_available_port(), + )), + private_key, + connection_pool_size: 255, + }; + configs.push(config); + } + + let mut nodes = Vec::::new(); + for config in &configs { + nodes.push(Self::spawn(config.clone()).await); + } + + // We need to return configs as well, to configure mixclients accordingly + (nodes, configs.clone(), Self::build_topology(configs)) + } + + fn build_topology(configs: Vec) -> MixnetTopology { + // Build three empty layers first + let mut layers = vec![Layer { nodes: Vec::new() }; 3]; + let mut layer_id = 0; + + // Assign nodes to each layer in round-robin + for config in &configs { + let public_key = config.public_key(); + layers.get_mut(layer_id).unwrap().nodes.push(Node { + address: config.listen_address, + public_key, + }); + layer_id = (layer_id + 1) % layers.len(); + } + + // Exclude empty layers + MixnetTopology { + layers: layers + .iter() + .filter(|layer| !layer.nodes.is_empty()) + .cloned() + .collect(), + } + } +} diff --git a/tests/src/nodes/mod.rs b/tests/src/nodes/mod.rs index 760d9d1e..37f1eea7 100644 --- a/tests/src/nodes/mod.rs +++ b/tests/src/nodes/mod.rs @@ -1,3 +1,5 @@ +mod mixnode; mod nomos; +pub use self::mixnode::MixNode; pub use nomos::NomosNode; diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 284a3fe6..91550dd8 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -6,13 +6,17 @@ use std::time::Duration; use crate::{get_available_port, Node, SpawnConfig}; use consensus_engine::overlay::{FlatOverlaySettings, RoundRobin}; use consensus_engine::NodeId; +#[cfg(feature = "libp2p")] +use mixnet_client::{MixnetClientConfig, MixnetClientMode}; +use mixnet_node::MixnetNodeConfig; +use mixnet_topology::MixnetTopology; use nomos_consensus::{CarnotInfo, CarnotSettings}; use nomos_http::backends::axum::AxumBackendSettings; #[cfg(feature = "libp2p")] use nomos_libp2p::{Multiaddr, SwarmConfig}; use nomos_log::{LoggerBackend, LoggerFormat}; #[cfg(feature = "libp2p")] -use nomos_network::backends::libp2p::Libp2pInfo; +use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo}; #[cfg(feature = "waku")] use nomos_network::backends::waku::{WakuConfig, WakuInfo}; use nomos_network::NetworkConfig; @@ -171,6 +175,8 @@ impl Node for NomosNode { n_participants, threshold, timeout, + mut mixnet_node_configs, + mixnet_topology, } => { let mut ids = vec![[0; 32]; n_participants]; for id in &mut ids { @@ -184,16 +190,27 @@ impl Node for NomosNode { *id, threshold, timeout, + mixnet_node_configs.pop(), + mixnet_topology.clone(), ) }) .collect::>(); let mut nodes = vec![Self::spawn(configs.swap_remove(0)).await]; let listening_addr = nodes[0].get_listening_address().await; for mut conf in configs { + #[cfg(feature = "waku")] conf.network .backend .initial_peers .push(listening_addr.clone()); + #[cfg(feature = "libp2p")] + // TODO: Consider having `initial_peers` outside of `inner`, as WakuConfig does + conf.network + .backend + .inner + .initial_peers + .push(listening_addr.clone()); + nodes.push(Self::spawn(conf).await); } nodes @@ -220,7 +237,17 @@ fn create_node_config( private_key: [u8; 32], threshold: Fraction, timeout: Duration, + #[cfg(feature = "libp2p")] mixnet_node_config: Option, + #[cfg(feature = "waku")] _mixnet_node_config: Option, + #[cfg(feature = "libp2p")] mixnet_topology: MixnetTopology, + #[cfg(feature = "waku")] _mixnet_topology: MixnetTopology, ) -> Config { + #[cfg(feature = "libp2p")] + let mixnet_client_mode = match mixnet_node_config { + Some(node_config) => MixnetClientMode::SenderReceiver(node_config.client_listen_address), + None => MixnetClientMode::Sender, + }; + let mut config = Config { network: NetworkConfig { #[cfg(feature = "waku")] @@ -229,9 +256,17 @@ fn create_node_config( inner: Default::default(), }, #[cfg(feature = "libp2p")] - backend: SwarmConfig { - initial_peers: vec![], - ..Default::default() + backend: Libp2pConfig { + inner: SwarmConfig { + initial_peers: vec![], + ..Default::default() + }, + mixnet_client: MixnetClientConfig { + mode: mixnet_client_mode, + topology: mixnet_topology, + connection_pool_size: 255, + }, + mixnet_delay: Duration::ZERO..Duration::from_millis(10), }, }, consensus: CarnotSettings { @@ -265,7 +300,7 @@ fn create_node_config( } #[cfg(feature = "libp2p")] { - config.network.backend.port = get_available_port(); + config.network.backend.inner.port = get_available_port(); } config diff --git a/tests/src/tests/happy.rs b/tests/src/tests/happy.rs index 72f23fb3..b93f0d55 100644 --- a/tests/src/tests/happy.rs +++ b/tests/src/tests/happy.rs @@ -3,7 +3,7 @@ use fraction::{Fraction, One}; use futures::stream::{self, StreamExt}; use std::collections::HashSet; use std::time::Duration; -use tests::{Node, NomosNode, SpawnConfig}; +use tests::{MixNode, Node, NomosNode, SpawnConfig}; const TARGET_VIEW: View = View::new(20); @@ -48,10 +48,13 @@ async fn happy_test(nodes: Vec) { #[tokio::test] async fn two_nodes_happy() { + let (_mixnodes, mixnet_node_configs, mixnet_topology) = MixNode::spawn_nodes(2).await; let nodes = NomosNode::spawn_nodes(SpawnConfig::Star { n_participants: 2, threshold: Fraction::one(), timeout: Duration::from_secs(10), + mixnet_node_configs, + mixnet_topology, }) .await; happy_test(nodes).await; @@ -59,10 +62,13 @@ async fn two_nodes_happy() { #[tokio::test] async fn ten_nodes_happy() { + let (_mixnodes, mixnet_node_configs, mixnet_topology) = MixNode::spawn_nodes(3).await; let nodes = NomosNode::spawn_nodes(SpawnConfig::Star { n_participants: 10, threshold: Fraction::one(), timeout: Duration::from_secs(10), + mixnet_node_configs, + mixnet_topology, }) .await; happy_test(nodes).await; diff --git a/tests/src/tests/mixnet.rs b/tests/src/tests/mixnet.rs new file mode 100644 index 00000000..ea7f6702 --- /dev/null +++ b/tests/src/tests/mixnet.rs @@ -0,0 +1,135 @@ +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + time::Duration, +}; + +use futures::{Stream, StreamExt}; +use mixnet_client::{MixnetClient, MixnetClientConfig, MixnetClientError, MixnetClientMode}; +use mixnet_node::{MixnetNode, MixnetNodeConfig}; +use mixnet_topology::{Layer, MixnetTopology, Node}; +use rand::{rngs::OsRng, RngCore}; +use tests::get_available_port; + +#[tokio::test] +// Set timeout since the test won't stop even if mixnodes (spawned asynchronously) panic. +#[ntest::timeout(5000)] +async fn mixnet() { + let (topology, mut destination_stream) = run_nodes_and_destination_client().await; + + let mut msg = [0u8; 100 * 1024]; + rand::thread_rng().fill_bytes(&mut msg); + + let mut sender_client = MixnetClient::new( + MixnetClientConfig { + mode: MixnetClientMode::Sender, + topology: topology.clone(), + connection_pool_size: 255, + }, + OsRng, + ); + + let res = sender_client.send(msg.to_vec(), Duration::from_millis(500)); + assert!(res.is_ok()); + + let received = destination_stream.next().await.unwrap().unwrap(); + assert_eq!(msg, received.as_slice()); +} + +async fn run_nodes_and_destination_client() -> ( + MixnetTopology, + impl Stream, MixnetClientError>> + Send, +) { + let config1 = MixnetNodeConfig { + listen_address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + get_available_port(), + )), + client_listen_address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + get_available_port(), + )), + ..Default::default() + }; + let config2 = MixnetNodeConfig { + listen_address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + get_available_port(), + )), + client_listen_address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + get_available_port(), + )), + ..Default::default() + }; + let config3 = MixnetNodeConfig { + listen_address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + get_available_port(), + )), + client_listen_address: SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(127, 0, 0, 1), + get_available_port(), + )), + ..Default::default() + }; + + let mixnode1 = MixnetNode::new(config1.clone()); + let mixnode2 = MixnetNode::new(config2.clone()); + let mixnode3 = MixnetNode::new(config3.clone()); + + let topology = MixnetTopology { + layers: vec![ + Layer { + nodes: vec![Node { + address: config1.listen_address, + public_key: mixnode1.public_key(), + }], + }, + Layer { + nodes: vec![Node { + address: config2.listen_address, + public_key: mixnode2.public_key(), + }], + }, + Layer { + nodes: vec![Node { + address: config3.listen_address, + public_key: mixnode3.public_key(), + }], + }, + ], + }; + + // Run all MixnetNodes + tokio::spawn(async move { + let res = mixnode1.run().await; + assert!(res.is_ok()); + }); + tokio::spawn(async move { + let res = mixnode2.run().await; + assert!(res.is_ok()); + }); + tokio::spawn(async move { + let res = mixnode3.run().await; + assert!(res.is_ok()); + }); + + // Wait until mixnodes are ready + // TODO: use a more sophisticated way + tokio::time::sleep(Duration::from_secs(1)).await; + + // Run a MixnetClient only for the MixnetNode in the exit layer. + // According to the current implementation, + // one of mixnodes the exit layer always will be selected as a destination. + let client = MixnetClient::new( + MixnetClientConfig { + mode: MixnetClientMode::SenderReceiver(config3.client_listen_address), + topology: topology.clone(), + connection_pool_size: 255, + }, + OsRng, + ); + let client_stream = client.run().await.unwrap(); + + (topology, client_stream) +} diff --git a/tests/src/tests/unhappy.rs b/tests/src/tests/unhappy.rs index 59bb78ff..114fe0df 100644 --- a/tests/src/tests/unhappy.rs +++ b/tests/src/tests/unhappy.rs @@ -2,16 +2,19 @@ use consensus_engine::View; use fraction::Fraction; use futures::stream::{self, StreamExt}; use std::collections::HashSet; -use tests::{Node, NomosNode, SpawnConfig}; +use tests::{MixNode, Node, NomosNode, SpawnConfig}; const TARGET_VIEW: View = View::new(20); #[tokio::test] async fn ten_nodes_one_down() { + let (_mixnodes, mixnet_node_configs, mixnet_topology) = MixNode::spawn_nodes(3).await; let mut nodes = NomosNode::spawn_nodes(SpawnConfig::Star { n_participants: 10, threshold: Fraction::new(9u32, 10u32), timeout: std::time::Duration::from_secs(5), + mixnet_node_configs, + mixnet_topology, }) .await; let mut failed_node = nodes.pop().unwrap();