1
0
mirror of synced 2025-01-10 15:56:03 +00:00

Mixnet PoC base branch (#316)

* Add `mixnode` and `mixnet-client` crate (#302)

* Add `mixnode` binary (#317)

* Integrate mixnet with libp2p network backend (#318)

* Fix #312: proper delays (#321)

* proper delays

* add missing duration param

* tiny fix: compilation error caused by `rand` 0.8 -> 0.7

* use `get_available_port()` for mixnet integration tests (#333)

* add missing comments

* Overwatch mixnet node (#339)

* Add mixnet service and overwatch app

* remove #[tokio::main]

---------

Co-authored-by: Youngjoon Lee <taxihighway@gmail.com>

* fix tests for the overwatch mixnode (#342)

* fix panic when corner case happen in RandomDelayIter (#335)

* Use `log` service for `mixnode` bin (#341)

* Use `wire` for MixnetMessage in libp2p (#347)

* Prevent tmixnet tests from running forever (#363)

* Use random delay when sending msgs to mixnet (#362)

* fix a minor compilation error caused by the latest master

* Fix run output fd (#343)

* add a connection pool

* Exp backoff (#332)

* move mixnet listening into separate task

* add exponential retry for insufficient peers in libp2p

* fix logging

* Fix MutexGuard across await (#373)

* Fix MutexGuard across await

Holding a MutexGuard across an await point is not a good idea.
Removing that solves the issues we had with the mixnet test

* Make mixnode handle bodies coming from the same source concurrently (#372)

---------

Co-authored-by: Youngjoon Lee <taxihighway@gmail.com>

* Move wait at network startup (#338)

We now wait after the call to 'subscribe' to give the network
the time to register peers in the mesh before starting to
publish messages

* Remove unused functions from mixnet connpool (#374)

* Mixnet benchmark (#375)

* merge fixes

* add `connection_pool_size` field to `config.yaml`

* Simplify mixnet topology (#393)

* Simplify bytes and duration range ser/de (#394)

* optimize bytes serde and duration serde

---------

Co-authored-by: Al Liu <scygliu1@gmail.com>
Co-authored-by: Daniel Sanchez <sanchez.quiros.daniel@gmail.com>
Co-authored-by: Giacomo Pasini <Zeegomo@users.noreply.github.com>
This commit is contained in:
Youngjoon Lee 2023-09-14 17:38:47 +09:00 committed by GitHub
parent 5b7c17e450
commit 8449c81d0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1781 additions and 79 deletions

View File

@ -14,8 +14,13 @@ members = [
"nomos-da/kzg",
"nomos-da/full-replication",
"nodes/nomos-node",
"nodes/mixnode",
"simulations",
"consensus-engine",
"tests",
"mixnet/node",
"mixnet/client",
"mixnet/protocol",
"mixnet/topology",
]
resolver = "2"

18
mixnet/client/Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "mixnet-client"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1.37"
tokio = { version = "1.29.1", features = ["net"] }
sphinx-packet = "0.1.0"
nym-sphinx = { package = "nym-sphinx", git = "https://github.com/nymtech/nym", tag = "v1.1.22" }
# Using an older version, since `nym-sphinx` depends on `rand` v0.7.3.
rand = "0.7.3"
mixnet-protocol = { path = "../protocol" }
mixnet-topology = { path = "../topology" }
mixnet-util = { path = "../util" }
futures = "0.3.28"
thiserror = "1"

View File

@ -0,0 +1,31 @@
use std::net::SocketAddr;
use futures::{stream, StreamExt};
use mixnet_topology::MixnetTopology;
use serde::{Deserialize, Serialize};
use crate::{receiver::Receiver, MessageStream, MixnetClientError};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixnetClientConfig {
pub mode: MixnetClientMode,
pub topology: MixnetTopology,
pub connection_pool_size: usize,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum MixnetClientMode {
Sender,
SenderReceiver(SocketAddr),
}
impl MixnetClientMode {
pub(crate) async fn run(&self) -> Result<MessageStream, MixnetClientError> {
match self {
Self::Sender => Ok(stream::empty().boxed()),
Self::SenderReceiver(node_address) => {
Ok(Receiver::new(*node_address).run().await?.boxed())
}
}
}
}

56
mixnet/client/src/lib.rs Normal file
View File

@ -0,0 +1,56 @@
pub mod config;
mod receiver;
mod sender;
use std::error::Error;
use std::time::Duration;
pub use config::MixnetClientConfig;
pub use config::MixnetClientMode;
use futures::stream::BoxStream;
use mixnet_util::ConnectionPool;
use rand::Rng;
use sender::Sender;
use thiserror::Error;
// A client for sending packets to Mixnet and receiving packets from Mixnet.
pub struct MixnetClient<R: Rng> {
mode: MixnetClientMode,
sender: Sender<R>,
}
pub type MessageStream = BoxStream<'static, Result<Vec<u8>, MixnetClientError>>;
impl<R: Rng> MixnetClient<R> {
pub fn new(config: MixnetClientConfig, rng: R) -> Self {
let cache = ConnectionPool::new(config.connection_pool_size);
Self {
mode: config.mode,
sender: Sender::new(config.topology, cache, rng),
}
}
pub async fn run(&self) -> Result<MessageStream, MixnetClientError> {
self.mode.run().await
}
pub fn send(
&mut self,
msg: Vec<u8>,
total_delay: Duration,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
self.sender.send(msg, total_delay)
}
}
#[derive(Error, Debug)]
pub enum MixnetClientError {
#[error("mixnet node connect error")]
MixnetNodeConnectError,
#[error("mixnode stream has been closed")]
MixnetNodeStreamClosed,
#[error("unexpected stream body received")]
UnexpectedStreamBody,
#[error("invalid payload")]
InvalidPayload,
}

View File

@ -0,0 +1,139 @@
use std::{error::Error, net::SocketAddr};
use futures::{stream, Stream, StreamExt};
use mixnet_protocol::Body;
use nym_sphinx::{
chunking::{fragment::Fragment, reconstruction::MessageReconstructor},
message::{NymMessage, PaddedMessage},
Payload,
};
use tokio::net::TcpStream;
use crate::MixnetClientError;
// Receiver accepts TCP connections to receive incoming payloads from the Mixnet.
pub struct Receiver {
node_address: SocketAddr,
}
impl Receiver {
pub fn new(node_address: SocketAddr) -> Self {
Self { node_address }
}
pub async fn run(
&self,
) -> Result<
impl Stream<Item = Result<Vec<u8>, MixnetClientError>> + Send + 'static,
MixnetClientError,
> {
let Ok(socket) = TcpStream::connect(self.node_address).await else {
return Err(MixnetClientError::MixnetNodeConnectError);
};
Ok(Self::message_stream(Box::pin(Self::fragment_stream(
socket,
))))
}
fn fragment_stream(
socket: TcpStream,
) -> impl Stream<Item = Result<Fragment, MixnetClientError>> + Send + 'static {
stream::unfold(socket, |mut socket| async move {
let Ok(body) = Body::read(&mut socket).await else {
// TODO: Maybe this is a hard error and the stream is corrupted? In that case stop the stream
return Some((Err(MixnetClientError::MixnetNodeStreamClosed), socket));
};
match body {
Body::SphinxPacket(_) => {
Some((Err(MixnetClientError::UnexpectedStreamBody), socket))
}
Body::FinalPayload(payload) => Some((Self::fragment_from_payload(payload), socket)),
}
})
}
fn message_stream(
fragment_stream: impl Stream<Item = Result<Fragment, MixnetClientError>>
+ Send
+ Unpin
+ 'static,
) -> impl Stream<Item = Result<Vec<u8>, MixnetClientError>> + Send + 'static {
// MessageReconstructor buffers all received fragments
// and eventually returns reconstructed messages.
let message_reconstructor: MessageReconstructor = Default::default();
stream::unfold(
(fragment_stream, message_reconstructor),
|(mut fragment_stream, mut message_reconstructor)| async move {
let result =
Self::reconstruct_message(&mut fragment_stream, &mut message_reconstructor)
.await;
Some((result, (fragment_stream, message_reconstructor)))
},
)
}
fn fragment_from_payload(payload: Payload) -> Result<Fragment, MixnetClientError> {
let Ok(payload_plaintext) = payload.recover_plaintext() else {
return Err(MixnetClientError::InvalidPayload);
};
let Ok(fragment) = Fragment::try_from_bytes(&payload_plaintext) else {
return Err(MixnetClientError::InvalidPayload);
};
Ok(fragment)
}
async fn reconstruct_message(
fragment_stream: &mut (impl Stream<Item = Result<Fragment, MixnetClientError>>
+ Send
+ Unpin
+ 'static),
message_reconstructor: &mut MessageReconstructor,
) -> Result<Vec<u8>, MixnetClientError> {
// Read fragments until at least one message is fully reconstructed.
while let Some(next) = fragment_stream.next().await {
match next {
Ok(fragment) => {
if let Some(message) =
Self::try_reconstruct_message(fragment, message_reconstructor)
{
return Ok(message);
}
}
Err(e) => {
return Err(e);
}
}
}
// fragment_stream closed before messages are fully reconstructed
Err(MixnetClientError::MixnetNodeStreamClosed)
}
fn try_reconstruct_message(
fragment: Fragment,
message_reconstructor: &mut MessageReconstructor,
) -> Option<Vec<u8>> {
let reconstruction_result = message_reconstructor.insert_new_fragment(fragment);
match reconstruction_result {
Some((padded_message, _)) => {
let message = Self::remove_padding(padded_message).unwrap();
Some(message)
}
None => None,
}
}
fn remove_padding(msg: Vec<u8>) -> Result<Vec<u8>, Box<dyn Error>> {
let padded_message = PaddedMessage::new_reconstructed(msg);
// we need this because PaddedMessage.remove_padding requires it for other NymMessage types.
let dummy_num_mix_hops = 0;
match padded_message.remove_padding(dummy_num_mix_hops)? {
NymMessage::Plain(msg) => Ok(msg),
_ => todo!("return error"),
}
}
}

200
mixnet/client/src/sender.rs Normal file
View File

@ -0,0 +1,200 @@
use std::{error::Error, net::SocketAddr, time::Duration};
use mixnet_protocol::Body;
use mixnet_topology::MixnetTopology;
use mixnet_util::ConnectionPool;
use nym_sphinx::{
addressing::nodes::NymNodeRoutingAddress, chunking::fragment::Fragment, message::NymMessage,
params::PacketSize, Delay, Destination, DestinationAddressBytes, NodeAddressBytes,
IDENTIFIER_LENGTH, PAYLOAD_OVERHEAD_SIZE,
};
use rand::{distributions::Uniform, prelude::Distribution, Rng};
use sphinx_packet::{route, SphinxPacket, SphinxPacketBuilder};
// Sender splits messages into Sphinx packets and sends them to the Mixnet.
pub struct Sender<R: Rng> {
//TODO: handle topology update
topology: MixnetTopology,
pool: ConnectionPool,
rng: R,
}
impl<R: Rng> Sender<R> {
pub fn new(topology: MixnetTopology, pool: ConnectionPool, rng: R) -> Self {
Self {
topology,
rng,
pool,
}
}
pub fn send(
&mut self,
msg: Vec<u8>,
total_delay: Duration,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let destination = self.topology.random_destination(&mut self.rng)?;
let destination = Destination::new(
DestinationAddressBytes::from_bytes(destination.address.as_bytes()),
[0; IDENTIFIER_LENGTH], // TODO: use a proper SURBIdentifier if we need SURB
);
self.pad_and_split_message(msg)
.into_iter()
.map(|fragment| self.build_sphinx_packet(fragment, &destination, total_delay))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.for_each(|(packet, first_node)| {
let pool = self.pool.clone();
tokio::spawn(async move {
if let Err(e) =
Self::send_packet(&pool, Box::new(packet), first_node.address).await
{
tracing::error!("failed to send packet to the first node: {e}");
}
});
});
Ok(())
}
fn pad_and_split_message(&mut self, msg: Vec<u8>) -> Vec<Fragment> {
let nym_message = NymMessage::new_plain(msg);
// TODO: add PUBLIC_KEY_SIZE for encryption for the destination,
// if we're going to encrypt final payloads for the destination.
// TODO: add ACK_OVERHEAD if we need SURB-ACKs.
// https://github.com/nymtech/nym/blob/3748ab77a132143d5fd1cd75dd06334d33294815/common/nymsphinx/src/message.rs#L181-L181
let plaintext_size_per_packet = PacketSize::RegularPacket.plaintext_size();
nym_message
.pad_to_full_packet_lengths(plaintext_size_per_packet)
.split_into_fragments(&mut self.rng, plaintext_size_per_packet)
}
fn build_sphinx_packet(
&mut self,
fragment: Fragment,
destination: &Destination,
total_delay: Duration,
) -> Result<(sphinx_packet::SphinxPacket, route::Node), Box<dyn Error + Send + Sync + 'static>>
{
let route = self.topology.random_route(&mut self.rng)?;
let delays: Vec<Delay> =
RandomDelayIterator::new(&mut self.rng, route.len() as u64, total_delay)
.map(|d| Delay::new_from_millis(d.as_millis() as u64))
.collect();
// TODO: encrypt the payload for the destination, if we want
// https://github.com/nymtech/nym/blob/3748ab77a132143d5fd1cd75dd06334d33294815/common/nymsphinx/src/preparer/payload.rs#L70
let payload = fragment.into_bytes();
let packet = SphinxPacketBuilder::new()
.with_payload_size(payload.len() + PAYLOAD_OVERHEAD_SIZE)
.build_packet(payload, &route, destination, &delays)?;
let first_mixnode = route.first().cloned().expect("route is not empty");
Ok((packet, first_mixnode))
}
async fn send_packet(
pool: &ConnectionPool,
packet: Box<SphinxPacket>,
addr: NodeAddressBytes,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let addr = SocketAddr::try_from(NymNodeRoutingAddress::try_from(addr)?)?;
tracing::debug!("Sending a Sphinx packet to the node: {addr:?}");
let mu: std::sync::Arc<tokio::sync::Mutex<tokio::net::TcpStream>> =
pool.get_or_init(&addr).await?;
let mut socket = mu.lock().await;
let body = Body::new_sphinx(packet);
body.write(&mut *socket).await?;
tracing::debug!("Sent a Sphinx packet successuflly to the node: {addr:?}");
Ok(())
}
}
struct RandomDelayIterator<R> {
rng: R,
remaining_delays: u64,
remaining_time: u64,
avg_delay: u64,
}
impl<R> RandomDelayIterator<R> {
fn new(rng: R, total_delays: u64, total_time: Duration) -> Self {
let total_time = total_time.as_millis() as u64;
RandomDelayIterator {
rng,
remaining_delays: total_delays,
remaining_time: total_time,
avg_delay: total_time / total_delays,
}
}
}
impl<R> Iterator for RandomDelayIterator<R>
where
R: Rng,
{
type Item = Duration;
fn next(&mut self) -> Option<Duration> {
if self.remaining_delays == 0 {
return None;
}
self.remaining_delays -= 1;
if self.remaining_delays == 1 {
return Some(Duration::from_millis(self.remaining_time));
}
// Calculate bounds to avoid extreme values
let upper_bound = (self.avg_delay as f64 * 1.5)
// guarantee that we don't exceed the remaining time and promise the delay we return is
// at least 1ms.
.min(self.remaining_time.saturating_sub(self.remaining_delays) as f64);
let lower_bound = (self.avg_delay as f64 * 0.5).min(upper_bound);
let delay = Uniform::new_inclusive(lower_bound, upper_bound).sample(&mut self.rng) as u64;
self.remaining_time = self.remaining_time.saturating_sub(delay);
Some(Duration::from_millis(delay))
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::RandomDelayIterator;
const TOTAL_DELAYS: u64 = 3;
#[test]
fn test_random_delay_iter_zero_total_time() {
let mut delays = RandomDelayIterator::new(rand::thread_rng(), TOTAL_DELAYS, Duration::ZERO);
for _ in 0..TOTAL_DELAYS {
assert!(delays.next().is_some());
}
assert!(delays.next().is_none());
}
#[test]
fn test_random_delay_iter_small_total_time() {
let mut delays =
RandomDelayIterator::new(rand::thread_rng(), TOTAL_DELAYS, Duration::from_millis(1));
let mut d = Duration::ZERO;
for _ in 0..TOTAL_DELAYS {
d += delays.next().unwrap();
}
assert!(delays.next().is_none());
assert_eq!(d, Duration::from_millis(1));
}
}

17
mixnet/node/Cargo.toml Normal file
View File

@ -0,0 +1,17 @@
[package]
name = "mixnet-node"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1.37"
tokio = { version = "1.32", features = ["net", "time"] }
sphinx-packet = "0.1.0"
nym-sphinx = { package = "nym-sphinx", git = "https://github.com/nymtech/nym", tag = "v1.1.22" }
mixnet-protocol = { path = "../protocol" }
mixnet-topology = { path = "../topology" }
mixnet-util = { path = "../util" }
[dev-dependencies]
tokio = {version = "1.32", features =["full"]}

View File

@ -0,0 +1,47 @@
use std::{error::Error, net::SocketAddr};
use mixnet_protocol::Body;
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
};
pub struct ClientNotifier {}
impl ClientNotifier {
pub async fn run(
listen_address: SocketAddr,
mut rx: mpsc::Receiver<Body>,
) -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(listen_address).await?;
tracing::info!("Listening mixnet client connections: {listen_address}");
// Currently, handling only a single incoming connection
// TODO: consider handling multiple clients
loop {
match listener.accept().await {
Ok((socket, remote_addr)) => {
tracing::debug!("Accepted incoming client connection from {remote_addr:?}");
if let Err(e) = Self::handle_connection(socket, &mut rx).await {
tracing::error!("failed to handle conn: {e}");
}
}
Err(e) => tracing::warn!("Failed to accept incoming client connection: {e}"),
}
}
}
async fn handle_connection(
mut socket: TcpStream,
rx: &mut mpsc::Receiver<Body>,
) -> Result<(), Box<dyn Error>> {
while let Some(body) = rx.recv().await {
if let Err(e) = body.write(&mut socket).await {
return Err(format!("error from client conn: {e}").into());
}
}
tracing::debug!("body receiver closed");
Ok(())
}
}

37
mixnet/node/src/config.rs Normal file
View File

@ -0,0 +1,37 @@
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use nym_sphinx::{PrivateKey, PublicKey};
use serde::{Deserialize, Serialize};
use sphinx_packet::crypto::{PRIVATE_KEY_SIZE, PUBLIC_KEY_SIZE};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixnetNodeConfig {
/// A listen address for receiving Sphinx packets
pub listen_address: SocketAddr,
/// An listen address fro communicating with mixnet clients
pub client_listen_address: SocketAddr,
/// A key for decrypting Sphinx packets
pub private_key: [u8; PRIVATE_KEY_SIZE],
/// The size of the connection pool.
pub connection_pool_size: usize,
}
impl Default for MixnetNodeConfig {
fn default() -> Self {
Self {
listen_address: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 7777)),
client_listen_address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
7778,
)),
private_key: PrivateKey::new().to_bytes(),
connection_pool_size: 255,
}
}
}
impl MixnetNodeConfig {
pub fn public_key(&self) -> [u8; PUBLIC_KEY_SIZE] {
*PublicKey::from(&PrivateKey::from(self.private_key)).as_bytes()
}
}

191
mixnet/node/src/lib.rs Normal file
View File

@ -0,0 +1,191 @@
mod client_notifier;
pub mod config;
use std::{error::Error, net::SocketAddr};
use client_notifier::ClientNotifier;
pub use config::MixnetNodeConfig;
use mixnet_protocol::Body;
use mixnet_topology::MixnetNodeId;
use mixnet_util::ConnectionPool;
use nym_sphinx::{
addressing::nodes::NymNodeRoutingAddress, Delay, DestinationAddressBytes, NodeAddressBytes,
Payload, PrivateKey,
};
pub use sphinx_packet::crypto::PRIVATE_KEY_SIZE;
use sphinx_packet::{crypto::PUBLIC_KEY_SIZE, ProcessedPacket, SphinxPacket};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
};
// A mix node that routes packets in the Mixnet.
pub struct MixnetNode {
config: MixnetNodeConfig,
pool: ConnectionPool,
}
impl MixnetNode {
pub fn new(config: MixnetNodeConfig) -> Self {
let pool = ConnectionPool::new(config.connection_pool_size);
Self { config, pool }
}
pub fn id(&self) -> MixnetNodeId {
self.public_key()
}
pub fn public_key(&self) -> [u8; PUBLIC_KEY_SIZE] {
self.config.public_key()
}
const CLIENT_NOTI_CHANNEL_SIZE: usize = 100;
pub async fn run(self) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
tracing::info!("Public key: {:?}", self.public_key());
// Spawn a ClientNotifier
let (client_tx, client_rx) = mpsc::channel(Self::CLIENT_NOTI_CHANNEL_SIZE);
tokio::spawn(async move {
if let Err(e) = ClientNotifier::run(self.config.client_listen_address, client_rx).await
{
tracing::error!("failed to run client notifier: {e}");
}
});
//TODO: Accepting ad-hoc TCP conns for now. Improve conn handling.
//TODO: Add graceful shutdown
let listener = TcpListener::bind(self.config.listen_address).await?;
tracing::info!(
"Listening mixnet node connections: {}",
self.config.listen_address
);
loop {
match listener.accept().await {
Ok((socket, remote_addr)) => {
tracing::debug!("Accepted incoming connection from {remote_addr:?}");
let client_tx = client_tx.clone();
let private_key = self.config.private_key;
let pool = self.pool.clone();
tokio::spawn(async move {
if let Err(e) =
Self::handle_connection(socket, pool, private_key, client_tx).await
{
tracing::error!("failed to handle conn: {e}");
}
});
}
Err(e) => tracing::warn!("Failed to accept incoming connection: {e}"),
}
}
}
async fn handle_connection(
mut socket: TcpStream,
pool: ConnectionPool,
private_key: [u8; PRIVATE_KEY_SIZE],
client_tx: mpsc::Sender<Body>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
loop {
let body = Body::read(&mut socket).await?;
let pool = pool.clone();
let private_key = PrivateKey::from(private_key);
let client_tx = client_tx.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_body(body, &pool, &private_key, &client_tx).await {
tracing::error!("failed to handle body: {e}");
}
});
}
}
async fn handle_body(
body: Body,
pool: &ConnectionPool,
private_key: &PrivateKey,
client_tx: &mpsc::Sender<Body>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
match body {
Body::SphinxPacket(packet) => {
Self::handle_sphinx_packet(pool, private_key, packet).await
}
_body @ Body::FinalPayload(_) => {
Self::forward_body_to_client_notifier(private_key, client_tx, _body).await
}
}
}
async fn handle_sphinx_packet(
pool: &ConnectionPool,
private_key: &PrivateKey,
packet: Box<SphinxPacket>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
match packet.process(private_key)? {
ProcessedPacket::ForwardHop(packet, next_node_addr, delay) => {
Self::forward_packet_to_next_hop(pool, packet, next_node_addr, delay).await
}
ProcessedPacket::FinalHop(destination_addr, _, payload) => {
Self::forward_payload_to_destination(pool, payload, destination_addr).await
}
}
}
async fn forward_body_to_client_notifier(
_private_key: &PrivateKey,
client_tx: &mpsc::Sender<Body>,
body: Body,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// TODO: Decrypt the final payload using the private key, if it's encrypted
// Do not wait when the channel is full or no receiver exists
client_tx.try_send(body)?;
Ok(())
}
async fn forward_packet_to_next_hop(
pool: &ConnectionPool,
packet: Box<SphinxPacket>,
next_node_addr: NodeAddressBytes,
delay: Delay,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
tracing::debug!("Delaying the packet for {delay:?}");
tokio::time::sleep(delay.to_duration()).await;
Self::forward(
pool,
Body::new_sphinx(packet),
NymNodeRoutingAddress::try_from(next_node_addr)?,
)
.await
}
async fn forward_payload_to_destination(
pool: &ConnectionPool,
payload: Payload,
destination_addr: DestinationAddressBytes,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
tracing::debug!("Forwarding final payload to destination mixnode");
Self::forward(
pool,
Body::new_final_payload(payload),
NymNodeRoutingAddress::try_from_bytes(&destination_addr.as_bytes())?,
)
.await
}
async fn forward(
pool: &ConnectionPool,
body: Body,
to: NymNodeRoutingAddress,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let addr = SocketAddr::try_from(to)?;
body.write(&mut *pool.get_or_init(&addr).await?.lock().await)
.await?;
Ok(())
}
}

View File

@ -0,0 +1,12 @@
[package]
name = "mixnet-protocol"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = "1.29.1"
sphinx-packet = "0.1.0"
futures = "0.3"
tokio-util = {version = "0.7", features = ["io", "io-util"] }

101
mixnet/protocol/src/lib.rs Normal file
View File

@ -0,0 +1,101 @@
use sphinx_packet::{payload::Payload, SphinxPacket};
use std::error::Error;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
pub enum Body {
SphinxPacket(Box<SphinxPacket>),
FinalPayload(Payload),
}
impl Body {
pub fn new_sphinx(packet: Box<SphinxPacket>) -> Self {
Self::SphinxPacket(packet)
}
pub fn new_final_payload(payload: Payload) -> Self {
Self::FinalPayload(payload)
}
fn variant_as_u8(&self) -> u8 {
match self {
Self::SphinxPacket(_) => 0,
Self::FinalPayload(_) => 1,
}
}
pub async fn read<R>(reader: &mut R) -> Result<Body, Box<dyn Error + Send + Sync + 'static>>
where
R: AsyncRead + Unpin,
{
let id = reader.read_u8().await?;
match id {
0 => Self::read_sphinx_packet(reader).await,
1 => Self::read_final_payload(reader).await,
_ => Err("Invalid body type".into()),
}
}
fn sphinx_packet_from_bytes(
data: &[u8],
) -> Result<Self, Box<dyn Error + Send + Sync + 'static>> {
let packet = SphinxPacket::from_bytes(data)?;
Ok(Self::new_sphinx(Box::new(packet)))
}
async fn read_sphinx_packet<R>(
reader: &mut R,
) -> Result<Body, Box<dyn Error + Send + Sync + 'static>>
where
R: AsyncRead + Unpin,
{
let size = reader.read_u64().await?;
let mut buf = vec![0; size as usize];
reader.read_exact(&mut buf).await?;
Self::sphinx_packet_from_bytes(&buf)
}
pub fn final_payload_from_bytes(
data: &[u8],
) -> Result<Self, Box<dyn Error + Send + Sync + 'static>> {
let payload = Payload::from_bytes(data)?;
Ok(Self::new_final_payload(payload))
}
async fn read_final_payload<R>(
reader: &mut R,
) -> Result<Body, Box<dyn Error + Send + Sync + 'static>>
where
R: AsyncRead + Unpin,
{
let size = reader.read_u64().await?;
let mut buf = vec![0; size as usize];
reader.read_exact(&mut buf).await?;
Self::final_payload_from_bytes(&buf)
}
pub async fn write<W>(
self,
writer: &mut W,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>>
where
W: AsyncWrite + Unpin + ?Sized,
{
let variant = self.variant_as_u8();
writer.write_u8(variant).await?;
match self {
Body::SphinxPacket(packet) => {
let data = packet.to_bytes();
writer.write_u64(data.len() as u64).await?;
writer.write_all(&data).await?;
}
Body::FinalPayload(payload) => {
let data = payload.as_bytes();
writer.write_u64(data.len() as u64).await?;
writer.write_all(data).await?;
}
}
Ok(())
}
}

View File

@ -0,0 +1,12 @@
[package]
name = "mixnet-topology"
version = "0.1.0"
edition = "2021"
[dependencies]
hex = "0.4"
# Using an older version, since `nym-sphinx` depends on `rand` v0.7.3.
rand = "0.7.3"
serde = { version = "1.0", features = ["derive"] }
sphinx-packet = "0.1.0"
nym-sphinx = { package = "nym-sphinx", git = "https://github.com/nymtech/nym", tag = "v1.1.22" }

112
mixnet/topology/src/lib.rs Normal file
View File

@ -0,0 +1,112 @@
use std::{error::Error, net::SocketAddr};
use nym_sphinx::addressing::nodes::{NymNodeRoutingAddress, NymNodeRoutingAddressError};
use rand::{seq::IteratorRandom, Rng};
use serde::{Deserialize, Serialize};
use sphinx_packet::{crypto::PUBLIC_KEY_SIZE, route};
pub type MixnetNodeId = [u8; PUBLIC_KEY_SIZE];
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct MixnetTopology {
pub layers: Vec<Layer>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Layer {
pub nodes: Vec<Node>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Node {
pub address: SocketAddr,
#[serde(with = "hex_serde")]
pub public_key: [u8; PUBLIC_KEY_SIZE],
}
mod hex_serde {
use super::PUBLIC_KEY_SIZE;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S: Serializer>(
pk: &[u8; PUBLIC_KEY_SIZE],
serializer: S,
) -> Result<S::Ok, S::Error> {
if serializer.is_human_readable() {
hex::encode(pk).serialize(serializer)
} else {
serializer.serialize_bytes(pk)
}
}
pub fn deserialize<'de, D: Deserializer<'de>>(
deserializer: D,
) -> Result<[u8; PUBLIC_KEY_SIZE], D::Error> {
if deserializer.is_human_readable() {
let hex_str = String::deserialize(deserializer)?;
hex::decode(hex_str)
.map_err(serde::de::Error::custom)
.and_then(|v| v.as_slice().try_into().map_err(serde::de::Error::custom))
} else {
<[u8; PUBLIC_KEY_SIZE]>::deserialize(deserializer)
}
}
}
impl MixnetTopology {
pub fn random_route<R: Rng>(
&self,
rng: &mut R,
) -> Result<Vec<route::Node>, Box<dyn Error + Send + Sync + 'static>> {
let num_hops = self.layers.len();
let route: Vec<route::Node> = self
.layers
.iter()
.take(num_hops)
.map(|layer| {
layer
.random_node(rng)
.expect("layer is not empty")
.clone()
.try_into()
.unwrap()
})
.collect();
Ok(route)
}
// Choose a destination mixnet node randomly from the last layer.
pub fn random_destination<R: Rng>(
&self,
rng: &mut R,
) -> Result<route::Node, Box<dyn Error + Send + Sync + 'static>> {
Ok(self
.layers
.last()
.expect("topology is not empty")
.random_node(rng)
.expect("layer is not empty")
.clone()
.try_into()
.unwrap())
}
}
impl Layer {
pub fn random_node<R: Rng>(&self, rng: &mut R) -> Option<&Node> {
self.nodes.iter().choose(rng)
}
}
impl TryInto<route::Node> for Node {
type Error = NymNodeRoutingAddressError;
fn try_into(self) -> Result<route::Node, Self::Error> {
Ok(route::Node {
address: NymNodeRoutingAddress::from(self.address).try_into()?,
pub_key: self.public_key.into(),
})
}
}

8
mixnet/util/Cargo.toml Normal file
View File

@ -0,0 +1,8 @@
[package]
name = "mixnet-util"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.32", default-features = false, features = ["sync", "net"] }
parking_lot = { version = "0.12", features = ["send_guard"] }

29
mixnet/util/src/lib.rs Normal file
View File

@ -0,0 +1,29 @@
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
#[derive(Clone)]
pub struct ConnectionPool {
pool: Arc<Mutex<HashMap<SocketAddr, Arc<Mutex<TcpStream>>>>>,
}
impl ConnectionPool {
pub fn new(size: usize) -> Self {
Self {
pool: Arc::new(Mutex::new(HashMap::with_capacity(size))),
}
}
pub async fn get_or_init(&self, addr: &SocketAddr) -> std::io::Result<Arc<Mutex<TcpStream>>> {
let mut pool = self.pool.lock().await;
match pool.get(addr).cloned() {
Some(tcp) => Ok(tcp),
None => {
let tcp = Arc::new(Mutex::new(TcpStream::connect(addr).await?));
pool.insert(*addr, tcp.clone());
Ok(tcp)
}
}
}
}

18
nodes/mixnode/Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "mixnode"
version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
mixnet-node = { path = "../../mixnet/node" }
nomos-log = { path = "../../nomos-services/log" }
clap = { version = "4", features = ["derive"] }
color-eyre = "0.6.0"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
serde = "1"
serde_yaml = "0.9"
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = "1.29.1"

View File

@ -0,0 +1,9 @@
mixnode:
listen_address: 127.0.0.1:7777
client_listen_address: 127.0.0.1:7778
private_key: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
connection_pool_size: 255
log:
backend: "Stdout"
format: "Json"
level: "debug"

20
nodes/mixnode/src/lib.rs Normal file
View File

@ -0,0 +1,20 @@
mod services;
use nomos_log::Logger;
use overwatch_derive::Services;
use overwatch_rs::services::handle::ServiceHandle;
use overwatch_rs::services::ServiceData;
use serde::{Deserialize, Serialize};
use services::mixnet::MixnetNodeService;
#[derive(Deserialize, Debug, Clone, Serialize)]
pub struct Config {
pub mixnode: <MixnetNodeService as ServiceData>::Settings,
pub log: <Logger as ServiceData>::Settings,
}
#[derive(Services)]
pub struct MixNode {
node: ServiceHandle<MixnetNodeService>,
logging: ServiceHandle<Logger>,
}

29
nodes/mixnode/src/main.rs Normal file
View File

@ -0,0 +1,29 @@
mod services;
use clap::Parser;
use color_eyre::eyre::Result;
use mixnode::{Config, MixNode, MixNodeServiceSettings};
use overwatch_rs::overwatch::OverwatchRunner;
use overwatch_rs::DynError;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Path for a yaml-encoded mixnet-node config file
config: std::path::PathBuf,
}
fn main() -> Result<(), DynError> {
let Args { config } = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?;
let app = OverwatchRunner::<MixNode>::run(
MixNodeServiceSettings {
node: config.mixnode,
logging: config.log,
},
None,
)?;
app.wait_finished();
Ok(())
}

View File

@ -0,0 +1,31 @@
use mixnet_node::{MixnetNode, MixnetNodeConfig};
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::NoMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
pub struct MixnetNodeService(MixnetNode);
impl ServiceData for MixnetNodeService {
const SERVICE_ID: ServiceId = "mixnet-node";
type Settings = MixnetNodeConfig;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl ServiceCore for MixnetNodeService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let settings: Self::Settings = service_state.settings_reader.get_updated_settings();
Ok(Self(MixnetNode::new(settings)))
}
async fn run(self) -> Result<(), DynError> {
if let Err(_e) = self.0.run().await {
todo!("Errors should match");
}
Ok(())
}
}

View File

@ -0,0 +1 @@
pub mod mixnet;

View File

@ -19,6 +19,18 @@ network:
discV5BootstrapNodes: []
initial_peers: []
relayTopics: []
mixnet_client:
mode: Sender
topology:
layers:
- nodes:
- address: 127.0.0.1:7777
public_key: "0000000000000000000000000000000000000000000000000000000000000000"
connection_pool_size: 255
mixnet_delay:
start: "0ms"
end: "0ms"
http:
backend:
address: 0.0.0.0:8080

View File

@ -214,22 +214,23 @@ impl Config {
} = network_args;
if let Some(IpAddr::V4(h)) = host {
self.network.backend.host = h;
self.network.backend.inner.host = h;
} else if host.is_some() {
return Err(eyre!("Unsupported ip version"));
}
if let Some(port) = port {
self.network.backend.port = port as u16;
self.network.backend.inner.port = port as u16;
}
if let Some(node_key) = node_key {
let mut key_bytes = hex::decode(node_key)?;
self.network.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
self.network.backend.inner.node_key =
SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
}
if let Some(peers) = initial_peers {
self.network.backend.initial_peers = peers;
self.network.backend.inner.initial_peers = peers;
}
Ok(self)

View File

@ -14,14 +14,12 @@ blake2 = { version = "0.10" }
bytes = "1.3"
consensus-engine = { path = "../consensus-engine", features = ["serde"]}
futures = "0.3"
nomos-network = { path = "../nomos-services/network", optional = true }
raptorq = { version = "1.7", optional = true }
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
bincode = "1.3"
once_cell = "1.0"
indexmap = { version = "1.9", features = ["serde"] }
serde_json = { version = "1", optional = true }
[dev-dependencies]
rand = "0.8"
@ -31,5 +29,4 @@ tokio = { version = "1.23", features = ["macros", "rt"] }
[features]
default = []
raptor = ["raptorq"]
mock = ["nomos-network/mock", "serde_json"]
waku = ["nomos-network/waku", "serde_json"]
mock = []

View File

@ -6,21 +6,21 @@ use blake2::{
Blake2bVar,
};
use bytes::{Bytes, BytesMut};
use nomos_network::backends::mock::MockMessage;
use serde::Serialize;
#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
pub struct MockTransaction {
pub struct MockTransaction<M> {
id: MockTxId,
content: MockMessage,
content: M,
}
impl MockTransaction {
pub fn new(content: MockMessage) -> Self {
let id = MockTxId::from(content.clone());
impl<M: Serialize> MockTransaction<M> {
pub fn new(content: M) -> Self {
let id = MockTxId::from(serialize(&content).unwrap().as_slice());
Self { id, content }
}
pub fn message(&self) -> &MockMessage {
pub fn message(&self) -> &M {
&self.content
}
@ -37,7 +37,7 @@ impl MockTransaction {
}
}
impl Transaction for MockTransaction {
impl<M: Serialize> Transaction for MockTransaction<M> {
const HASHER: TransactionHasher<Self> = MockTransaction::id;
type Hash = MockTxId;
@ -46,9 +46,9 @@ impl Transaction for MockTransaction {
}
}
impl From<nomos_network::backends::mock::MockMessage> for MockTransaction {
fn from(msg: nomos_network::backends::mock::MockMessage) -> Self {
let id = MockTxId::from(msg.clone());
impl<M: Serialize> From<M> for MockTransaction<M> {
fn from(msg: M) -> Self {
let id = MockTxId::from(serialize(&msg).unwrap().as_slice());
Self { id, content: msg }
}
}
@ -84,18 +84,18 @@ impl MockTxId {
}
}
impl From<nomos_network::backends::mock::MockMessage> for MockTxId {
fn from(msg: nomos_network::backends::mock::MockMessage) -> Self {
impl From<&[u8]> for MockTxId {
fn from(msg: &[u8]) -> Self {
let mut hasher = Blake2bVar::new(32).unwrap();
hasher.update(&serialize(&msg).unwrap());
hasher.update(msg);
let mut id = [0u8; 32];
hasher.finalize_variable(&mut id).unwrap();
Self(id)
}
}
impl From<&MockTransaction> for MockTxId {
fn from(msg: &MockTransaction) -> Self {
impl<M> From<&MockTransaction<M>> for MockTxId {
fn from(msg: &MockTransaction<M>) -> Self {
msg.id
}
}

View File

@ -231,6 +231,11 @@ impl NetworkAdapter for Libp2pAdapter {
let cache = message_cache.clone();
let relay = network_relay.clone();
Self::subscribe(&relay, TOPIC).await;
tracing::debug!("Starting up...");
// this wait seems to be helpful in some cases since we give the time
// to the network to establish connections before we start sending messages
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// TODO: maybe we need the runtime handle here?
tokio::spawn(async move {
let (sender, receiver) = tokio::sync::oneshot::channel();

View File

@ -30,6 +30,6 @@ blake2 = "0.10"
[features]
default = []
waku = ["nomos-network/waku", "nomos-core/waku", "waku-bindings"]
waku = ["nomos-network/waku", "waku-bindings"]
mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"]
libp2p = ["nomos-network/libp2p"]

View File

@ -4,7 +4,7 @@
use futures::{Stream, StreamExt};
use nomos_core::tx::mock::MockTransaction;
use nomos_network::backends::mock::{
EventKind, Mock, MockBackendMessage, MockContentTopic, NetworkEvent,
EventKind, Mock, MockBackendMessage, MockContentTopic, MockMessage, NetworkEvent,
};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
@ -25,7 +25,7 @@ pub struct MockAdapter {
#[async_trait::async_trait]
impl NetworkAdapter for MockAdapter {
type Backend = Mock;
type Tx = MockTransaction;
type Tx = MockTransaction<MockMessage>;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,

View File

@ -17,7 +17,7 @@ use nomos_mempool::{
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<MempoolService<MockAdapter, MockPool<MockTransaction>>>,
mockpool: ServiceHandle<MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>>>>,
}
#[test]
@ -67,7 +67,7 @@ fn test_mockmempool() {
let network = app.handle().relay::<NetworkService<Mock>>();
let mempool = app
.handle()
.relay::<MempoolService<MockAdapter, MockPool<MockTransaction>>>();
.relay::<MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>>>>();
app.spawn(async move {
let network_outbound = network.connect().await.unwrap();

View File

@ -9,6 +9,7 @@ edition = "2021"
async-trait = "0.1"
bytes = "1.2"
chrono = { version = "0.4", optional = true }
humantime-serde = { version = "1", optional = true }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
multiaddr = "0.15"
serde = { version = "1.0", features = ["derive"] }
@ -18,14 +19,16 @@ tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
thiserror = "1.0"
tracing = "0.1"
rand = { version = "0.8", optional = true }
rand = { version = "0.7.3", optional = true }
waku-bindings = { version = "0.1.1", optional = true }
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["json"] }
tracing-gelf = "0.7"
futures = "0.3"
parking_lot = "0.12"
nomos-core = { path = "../../nomos-core" }
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
mixnet-client = { path = "../../mixnet/client" }
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
@ -33,5 +36,5 @@ tokio = { version = "1", features = ["full"] }
[features]
default = []
waku = ["waku-bindings"]
libp2p = ["nomos-libp2p"]
libp2p = ["nomos-libp2p", "rand", "humantime-serde"]
mock = ["rand", "chrono"]

View File

@ -1,7 +1,9 @@
// std
use std::error::Error;
use std::{error::Error, ops::Range, time::Duration};
// internal
use super::NetworkBackend;
use mixnet_client::{MixnetClient, MixnetClientConfig};
use nomos_core::wire;
pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash};
use nomos_libp2p::{
libp2p::{gossipsub, Multiaddr, PeerId},
@ -9,8 +11,10 @@ use nomos_libp2p::{
};
// crates
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState};
use rand::{rngs::OsRng, thread_rng, Rng};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::StreamExt;
macro_rules! log_error {
($e:expr) => {
@ -25,6 +29,55 @@ pub struct Libp2p {
commands_tx: mpsc::Sender<Command>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Libp2pConfig {
#[serde(flatten)]
pub inner: SwarmConfig,
pub mixnet_client: MixnetClientConfig,
#[serde(with = "humantime")]
pub mixnet_delay: Range<Duration>,
}
mod humantime {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::{ops::Range, time::Duration};
#[derive(Serialize, Deserialize)]
struct DurationRangeHelper {
#[serde(with = "humantime_serde")]
start: Duration,
#[serde(with = "humantime_serde")]
end: Duration,
}
pub fn serialize<S: Serializer>(
val: &Range<Duration>,
serializer: S,
) -> Result<S::Ok, S::Error> {
if serializer.is_human_readable() {
DurationRangeHelper {
start: val.start,
end: val.end,
}
.serialize(serializer)
} else {
val.serialize(serializer)
}
}
pub fn deserialize<'de, D: Deserializer<'de>>(
deserializer: D,
) -> Result<Range<Duration>, D::Error> {
if deserializer.is_human_readable() {
let DurationRangeHelper { start, end } =
DurationRangeHelper::deserialize(deserializer)?;
Ok(start..end)
} else {
Range::<Duration>::deserialize(deserializer)
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Libp2pInfo {
pub listen_addresses: Vec<Multiaddr>,
@ -39,14 +92,29 @@ pub enum EventKind {
}
const BUFFER_SIZE: usize = 64;
const BACKOFF: u64 = 5;
const MAX_RETRY: usize = 3;
#[derive(Debug)]
#[non_exhaustive]
pub enum Command {
Connect(PeerId, Multiaddr),
Broadcast { topic: Topic, message: Box<[u8]> },
Broadcast {
topic: Topic,
message: Box<[u8]>,
},
Subscribe(Topic),
Unsubscribe(Topic),
Info { reply: oneshot::Sender<Libp2pInfo> },
Info {
reply: oneshot::Sender<Libp2pInfo>,
},
#[doc(hidden)]
// broadcast a message directly through gossipsub without mixnet
DirectBroadcastAndRetry {
topic: Topic,
message: Box<[u8]>,
retry: usize,
},
}
pub type Topic = String;
@ -58,24 +126,73 @@ pub enum Event {
Message(Message),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MixnetMessage {
topic: Topic,
message: Box<[u8]>,
}
impl MixnetMessage {
pub fn as_bytes(&self) -> Vec<u8> {
wire::serialize(self).expect("Couldn't serialize MixnetMessage")
}
pub fn from_bytes(data: &[u8]) -> Result<Self, wire::Error> {
wire::deserialize(data)
}
}
#[async_trait::async_trait]
impl NetworkBackend for Libp2p {
type Settings = SwarmConfig;
type State = NoState<SwarmConfig>;
type Settings = Libp2pConfig;
type State = NoState<Libp2pConfig>;
type Message = Command;
type EventKind = EventKind;
type NetworkEvent = Event;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
let mixnet_client = MixnetClient::new(config.mixnet_client.clone(), OsRng);
let (commands_tx, mut commands_rx) = tokio::sync::mpsc::channel(BUFFER_SIZE);
let (events_tx, _) = tokio::sync::broadcast::channel(BUFFER_SIZE);
let libp2p = Self {
events_tx: events_tx.clone(),
commands_tx,
};
let cmd_tx = commands_tx.clone();
overwatch_handle.runtime().spawn(async move {
use tokio_stream::StreamExt;
let mut swarm = Swarm::build(&config).unwrap();
let Ok(mut stream) = mixnet_client.run().await else {
tracing::error!("Could not quickstart mixnet stream");
return;
};
while let Some(result) = stream.next().await {
match result {
Ok(msg) => {
tracing::debug!("receiving message from mixnet client");
let Ok(MixnetMessage { topic, message }) = MixnetMessage::from_bytes(&msg)
else {
tracing::error!(
"failed to deserialize json received from mixnet client"
);
continue;
};
cmd_tx
.send(Command::DirectBroadcastAndRetry {
topic,
message,
retry: 0,
})
.await
.unwrap_or_else(|_| tracing::error!("could not schedule broadcast"));
}
Err(e) => {
todo!("Handle mixclient error: {e}");
}
}
}
});
let cmd_tx = commands_tx.clone();
let notify = events_tx.clone();
overwatch_handle.runtime().spawn(async move {
let mut swarm = Swarm::build(&config.inner).unwrap();
let mut mixnet_client = MixnetClient::new(config.mixnet_client, OsRng);
loop {
tokio::select! {
Some(event) = swarm.next() => {
@ -86,7 +203,7 @@ impl NetworkBackend for Libp2p {
message,
})) => {
tracing::debug!("Got message with id: {id} from peer: {peer_id}");
log_error!(events_tx.send(Event::Message(message)));
log_error!(notify.send(Event::Message(message)));
}
SwarmEvent::ConnectionEstablished {
peer_id,
@ -121,23 +238,10 @@ impl NetworkBackend for Libp2p {
log_error!(swarm.connect(peer_id, peer_addr));
}
Command::Broadcast { topic, message } => {
match swarm.broadcast(&topic, message.to_vec()) {
Ok(id) => {
tracing::debug!("broadcasted message with id: {id} tp topic: {topic}");
}
Err(e) => {
tracing::error!("failed to broadcast message to topic: {topic} {e:?}");
}
}
if swarm.is_subscribed(&topic) {
log_error!(events_tx.send(Event::Message(Message {
source: None,
data: message.into(),
sequence_number: None,
topic: Swarm::topic_hash(&topic),
})));
}
tracing::debug!("sending message to mixnet client");
let msg = MixnetMessage { topic, message };
let delay = random_delay(&config.mixnet_delay);
log_error!(mixnet_client.send(msg.as_bytes(), delay));
}
Command::Subscribe(topic) => {
tracing::debug!("subscribing to topic: {topic}");
@ -159,12 +263,18 @@ impl NetworkBackend for Libp2p {
};
log_error!(reply.send(info));
}
Command::DirectBroadcastAndRetry { topic, message, retry } => {
broadcast_and_retry(topic, message, retry, cmd_tx.clone(), &mut swarm, notify.clone()).await;
}
};
}
}
}
});
libp2p
Self {
events_tx,
commands_tx,
}
}
async fn process(&self, msg: Self::Message) {
@ -185,3 +295,75 @@ impl NetworkBackend for Libp2p {
}
}
}
fn random_delay(range: &Range<Duration>) -> Duration {
if range.start == range.end {
return range.start;
}
thread_rng().gen_range(range.start, range.end)
}
async fn broadcast_and_retry(
topic: Topic,
message: Box<[u8]>,
retry: usize,
commands_tx: mpsc::Sender<Command>,
swarm: &mut Swarm,
events_tx: broadcast::Sender<Event>,
) {
tracing::debug!("broadcasting message to topic: {topic}");
let wait = BACKOFF.pow(retry as u32);
match swarm.broadcast(&topic, message.to_vec()) {
Ok(id) => {
tracing::debug!("broadcasted message with id: {id} tp topic: {topic}");
// self-notification because libp2p doesn't do it
if swarm.is_subscribed(&topic) {
log_error!(events_tx.send(Event::Message(Message {
source: None,
data: message.into(),
sequence_number: None,
topic: Swarm::topic_hash(&topic),
})));
}
}
Err(gossipsub::PublishError::InsufficientPeers) if retry < MAX_RETRY => {
tracing::error!("failed to broadcast message to topic due to insufficient peers, trying again in {wait:?}");
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(wait)).await;
commands_tx
.send(Command::DirectBroadcastAndRetry {
topic,
message,
retry: retry + 1,
})
.await
.unwrap_or_else(|_| tracing::error!("could not schedule retry"));
});
}
Err(e) => {
tracing::error!("failed to broadcast message to topic: {topic} {e:?}");
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::random_delay;
#[test]
fn test_random_delay() {
assert_eq!(
random_delay(&(Duration::ZERO..Duration::ZERO)),
Duration::ZERO
);
let range = Duration::from_millis(10)..Duration::from_millis(100);
let delay = random_delay(&range);
assert!(range.start <= delay && delay < range.end);
}
}

View File

@ -83,12 +83,6 @@ where
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
tracing::debug!("Starting up...");
// this wait seems to be helpful in some cases for waku, where it reports
// to be connected to peers but does not seem to be able to send messages
// to them
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let Self {
service_state: ServiceStateHandle {
mut inbound_relay, ..

View File

@ -14,9 +14,14 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main"
nomos-core = { path = "../nomos-core" }
consensus-engine = { path = "../consensus-engine", features = ["serde"] }
nomos-mempool = { path = "../nomos-services/mempool", features = ["mock"] }
rand = "0.8"
mixnode = { path = "../nodes/mixnode" }
mixnet-node = { path = "../mixnet/node" }
mixnet-client = { path = "../mixnet/client" }
mixnet-topology = { path = "../mixnet/topology" }
# Using older versions, since `mixnet-*` crates depend on `rand` v0.7.3.
rand = "0.7.3"
rand_xoshiro = "0.4"
once_cell = "1"
rand_xoshiro = "0.6"
secp256k1 = { version = "0.26", features = ["rand"] }
waku-bindings = { version = "0.1.1", optional = true }
reqwest = { version = "0.11", features = ["json"] }
@ -27,6 +32,8 @@ tokio = "1"
futures = "0.3"
async-trait = "0.1"
fraction = "0.13"
ntest = "0.9.0"
criterion = { version = "0.5", features = ["async_tokio"] }
[[test]]
name = "test_consensus_happy_path"
@ -36,8 +43,17 @@ path = "src/tests/happy.rs"
name = "test_consensus_unhappy_path"
path = "src/tests/unhappy.rs"
[[test]]
name = "test_mixnet"
path = "src/tests/mixnet.rs"
[[bench]]
name = "mixnet"
path = "src/benches/mixnet.rs"
harness = false
[features]
metrics = ["nomos-node/metrics"]
waku = ["nomos-network/waku", "nomos-mempool/waku", "nomos-node/waku", "waku-bindings"]
libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-libp2p", "nomos-node/libp2p"]
libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-libp2p", "nomos-node/libp2p"]

View File

@ -0,0 +1,78 @@
use std::time::Duration;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use futures::StreamExt;
use mixnet_client::{MessageStream, MixnetClient, MixnetClientConfig, MixnetClientMode};
use rand::{rngs::OsRng, Rng, RngCore};
use tests::MixNode;
use tokio::time::Instant;
pub fn mixnet(c: &mut Criterion) {
c.bench_function("mixnet", |b| {
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter_custom(|iters| async move {
let (_mixnodes, mut sender_client, mut destination_stream, msg) =
setup(100 * 1024).await;
let start = Instant::now();
for _ in 0..iters {
black_box(
send_receive_message(&msg, &mut sender_client, &mut destination_stream)
.await,
);
}
start.elapsed()
})
});
}
async fn setup(msg_size: usize) -> (Vec<MixNode>, MixnetClient<OsRng>, MessageStream, Vec<u8>) {
let (mixnodes, node_configs, topology) = MixNode::spawn_nodes(3).await;
let sender_client = MixnetClient::new(
MixnetClientConfig {
mode: MixnetClientMode::Sender,
topology: topology.clone(),
connection_pool_size: 255,
},
OsRng,
);
let destination_client = MixnetClient::new(
MixnetClientConfig {
mode: MixnetClientMode::SenderReceiver(
// Connect with the MixnetNode in the exit layer
// According to the current implementation,
// one of mixnodes the exit layer always will be selected as a destination.
node_configs.last().unwrap().client_listen_address,
),
topology,
connection_pool_size: 255,
},
OsRng,
);
let destination_stream = destination_client.run().await.unwrap();
let mut msg = vec![0u8; msg_size];
rand::thread_rng().fill_bytes(&mut msg);
(mixnodes, sender_client, destination_stream, msg)
}
async fn send_receive_message<R: Rng>(
msg: &[u8],
sender_client: &mut MixnetClient<R>,
destination_stream: &mut MessageStream,
) {
let res = sender_client.send(msg.to_vec(), Duration::ZERO);
assert!(res.is_ok());
let received = destination_stream.next().await.unwrap().unwrap();
assert_eq!(msg, received.as_slice());
}
criterion_group!(
name = benches;
config = Criterion::default().sample_size(10).measurement_time(Duration::from_secs(180));
targets = mixnet
);
criterion_main!(benches);

View File

@ -1,4 +1,7 @@
mod nodes;
use mixnet_node::MixnetNodeConfig;
use mixnet_topology::MixnetTopology;
pub use nodes::MixNode;
pub use nodes::NomosNode;
use once_cell::sync::Lazy;
@ -11,7 +14,7 @@ use std::{fmt::Debug, sync::Mutex};
use fraction::Fraction;
use rand::{thread_rng, Rng};
static NET_PORT: Lazy<Mutex<u16>> = Lazy::new(|| Mutex::new(thread_rng().gen_range(8000..10000)));
static NET_PORT: Lazy<Mutex<u16>> = Lazy::new(|| Mutex::new(thread_rng().gen_range(8000, 10000)));
pub fn get_available_port() -> u16 {
let mut port = NET_PORT.lock().unwrap();
@ -30,11 +33,13 @@ pub trait Node: Sized {
fn stop(&mut self);
}
#[derive(Clone, Copy)]
#[derive(Clone)]
pub enum SpawnConfig {
Star {
n_participants: usize,
threshold: Fraction,
timeout: Duration,
mixnet_node_configs: Vec<MixnetNodeConfig>,
mixnet_topology: MixnetTopology,
},
}

105
tests/src/nodes/mixnode.rs Normal file
View File

@ -0,0 +1,105 @@
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
process::{Child, Command, Stdio},
time::Duration,
};
use mixnet_node::{MixnetNodeConfig, PRIVATE_KEY_SIZE};
use mixnet_topology::{Layer, MixnetTopology, Node};
use rand::{thread_rng, RngCore};
use tempfile::NamedTempFile;
use crate::get_available_port;
const MIXNODE_BIN: &str = "../target/debug/mixnode";
pub struct MixNode {
child: Child,
}
impl Drop for MixNode {
fn drop(&mut self) {
self.child.kill().unwrap();
}
}
impl MixNode {
pub async fn spawn(config: MixnetNodeConfig) -> Self {
let config = mixnode::Config {
mixnode: config,
log: Default::default(),
};
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
serde_yaml::to_writer(&mut file, &config).unwrap();
let child = Command::new(std::env::current_dir().unwrap().join(MIXNODE_BIN))
.arg(&config_path)
.stdout(Stdio::null())
.spawn()
.unwrap();
//TODO: use a sophisticated way to wait until the node is ready
tokio::time::sleep(Duration::from_secs(1)).await;
Self { child }
}
pub async fn spawn_nodes(
num_nodes: usize,
) -> (Vec<Self>, Vec<MixnetNodeConfig>, MixnetTopology) {
let mut configs = Vec::<MixnetNodeConfig>::new();
for _ in 0..num_nodes {
let mut private_key = [0u8; PRIVATE_KEY_SIZE];
thread_rng().fill_bytes(&mut private_key);
let config = MixnetNodeConfig {
listen_address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
get_available_port(),
)),
client_listen_address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
get_available_port(),
)),
private_key,
connection_pool_size: 255,
};
configs.push(config);
}
let mut nodes = Vec::<MixNode>::new();
for config in &configs {
nodes.push(Self::spawn(config.clone()).await);
}
// We need to return configs as well, to configure mixclients accordingly
(nodes, configs.clone(), Self::build_topology(configs))
}
fn build_topology(configs: Vec<MixnetNodeConfig>) -> MixnetTopology {
// Build three empty layers first
let mut layers = vec![Layer { nodes: Vec::new() }; 3];
let mut layer_id = 0;
// Assign nodes to each layer in round-robin
for config in &configs {
let public_key = config.public_key();
layers.get_mut(layer_id).unwrap().nodes.push(Node {
address: config.listen_address,
public_key,
});
layer_id = (layer_id + 1) % layers.len();
}
// Exclude empty layers
MixnetTopology {
layers: layers
.iter()
.filter(|layer| !layer.nodes.is_empty())
.cloned()
.collect(),
}
}
}

View File

@ -1,3 +1,5 @@
mod mixnode;
mod nomos;
pub use self::mixnode::MixNode;
pub use nomos::NomosNode;

View File

@ -6,13 +6,17 @@ use std::time::Duration;
use crate::{get_available_port, Node, SpawnConfig};
use consensus_engine::overlay::{FlatOverlaySettings, RoundRobin};
use consensus_engine::NodeId;
#[cfg(feature = "libp2p")]
use mixnet_client::{MixnetClientConfig, MixnetClientMode};
use mixnet_node::MixnetNodeConfig;
use mixnet_topology::MixnetTopology;
use nomos_consensus::{CarnotInfo, CarnotSettings};
use nomos_http::backends::axum::AxumBackendSettings;
#[cfg(feature = "libp2p")]
use nomos_libp2p::{Multiaddr, SwarmConfig};
use nomos_log::{LoggerBackend, LoggerFormat};
#[cfg(feature = "libp2p")]
use nomos_network::backends::libp2p::Libp2pInfo;
use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo};
#[cfg(feature = "waku")]
use nomos_network::backends::waku::{WakuConfig, WakuInfo};
use nomos_network::NetworkConfig;
@ -171,6 +175,8 @@ impl Node for NomosNode {
n_participants,
threshold,
timeout,
mut mixnet_node_configs,
mixnet_topology,
} => {
let mut ids = vec![[0; 32]; n_participants];
for id in &mut ids {
@ -184,16 +190,27 @@ impl Node for NomosNode {
*id,
threshold,
timeout,
mixnet_node_configs.pop(),
mixnet_topology.clone(),
)
})
.collect::<Vec<_>>();
let mut nodes = vec![Self::spawn(configs.swap_remove(0)).await];
let listening_addr = nodes[0].get_listening_address().await;
for mut conf in configs {
#[cfg(feature = "waku")]
conf.network
.backend
.initial_peers
.push(listening_addr.clone());
#[cfg(feature = "libp2p")]
// TODO: Consider having `initial_peers` outside of `inner`, as WakuConfig does
conf.network
.backend
.inner
.initial_peers
.push(listening_addr.clone());
nodes.push(Self::spawn(conf).await);
}
nodes
@ -220,7 +237,17 @@ fn create_node_config(
private_key: [u8; 32],
threshold: Fraction,
timeout: Duration,
#[cfg(feature = "libp2p")] mixnet_node_config: Option<MixnetNodeConfig>,
#[cfg(feature = "waku")] _mixnet_node_config: Option<MixnetNodeConfig>,
#[cfg(feature = "libp2p")] mixnet_topology: MixnetTopology,
#[cfg(feature = "waku")] _mixnet_topology: MixnetTopology,
) -> Config {
#[cfg(feature = "libp2p")]
let mixnet_client_mode = match mixnet_node_config {
Some(node_config) => MixnetClientMode::SenderReceiver(node_config.client_listen_address),
None => MixnetClientMode::Sender,
};
let mut config = Config {
network: NetworkConfig {
#[cfg(feature = "waku")]
@ -229,9 +256,17 @@ fn create_node_config(
inner: Default::default(),
},
#[cfg(feature = "libp2p")]
backend: SwarmConfig {
initial_peers: vec![],
..Default::default()
backend: Libp2pConfig {
inner: SwarmConfig {
initial_peers: vec![],
..Default::default()
},
mixnet_client: MixnetClientConfig {
mode: mixnet_client_mode,
topology: mixnet_topology,
connection_pool_size: 255,
},
mixnet_delay: Duration::ZERO..Duration::from_millis(10),
},
},
consensus: CarnotSettings {
@ -265,7 +300,7 @@ fn create_node_config(
}
#[cfg(feature = "libp2p")]
{
config.network.backend.port = get_available_port();
config.network.backend.inner.port = get_available_port();
}
config

View File

@ -3,7 +3,7 @@ use fraction::{Fraction, One};
use futures::stream::{self, StreamExt};
use std::collections::HashSet;
use std::time::Duration;
use tests::{Node, NomosNode, SpawnConfig};
use tests::{MixNode, Node, NomosNode, SpawnConfig};
const TARGET_VIEW: View = View::new(20);
@ -48,10 +48,13 @@ async fn happy_test(nodes: Vec<NomosNode>) {
#[tokio::test]
async fn two_nodes_happy() {
let (_mixnodes, mixnet_node_configs, mixnet_topology) = MixNode::spawn_nodes(2).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::Star {
n_participants: 2,
threshold: Fraction::one(),
timeout: Duration::from_secs(10),
mixnet_node_configs,
mixnet_topology,
})
.await;
happy_test(nodes).await;
@ -59,10 +62,13 @@ async fn two_nodes_happy() {
#[tokio::test]
async fn ten_nodes_happy() {
let (_mixnodes, mixnet_node_configs, mixnet_topology) = MixNode::spawn_nodes(3).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::Star {
n_participants: 10,
threshold: Fraction::one(),
timeout: Duration::from_secs(10),
mixnet_node_configs,
mixnet_topology,
})
.await;
happy_test(nodes).await;

135
tests/src/tests/mixnet.rs Normal file
View File

@ -0,0 +1,135 @@
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
time::Duration,
};
use futures::{Stream, StreamExt};
use mixnet_client::{MixnetClient, MixnetClientConfig, MixnetClientError, MixnetClientMode};
use mixnet_node::{MixnetNode, MixnetNodeConfig};
use mixnet_topology::{Layer, MixnetTopology, Node};
use rand::{rngs::OsRng, RngCore};
use tests::get_available_port;
#[tokio::test]
// Set timeout since the test won't stop even if mixnodes (spawned asynchronously) panic.
#[ntest::timeout(5000)]
async fn mixnet() {
let (topology, mut destination_stream) = run_nodes_and_destination_client().await;
let mut msg = [0u8; 100 * 1024];
rand::thread_rng().fill_bytes(&mut msg);
let mut sender_client = MixnetClient::new(
MixnetClientConfig {
mode: MixnetClientMode::Sender,
topology: topology.clone(),
connection_pool_size: 255,
},
OsRng,
);
let res = sender_client.send(msg.to_vec(), Duration::from_millis(500));
assert!(res.is_ok());
let received = destination_stream.next().await.unwrap().unwrap();
assert_eq!(msg, received.as_slice());
}
async fn run_nodes_and_destination_client() -> (
MixnetTopology,
impl Stream<Item = Result<Vec<u8>, MixnetClientError>> + Send,
) {
let config1 = MixnetNodeConfig {
listen_address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
get_available_port(),
)),
client_listen_address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
get_available_port(),
)),
..Default::default()
};
let config2 = MixnetNodeConfig {
listen_address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
get_available_port(),
)),
client_listen_address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
get_available_port(),
)),
..Default::default()
};
let config3 = MixnetNodeConfig {
listen_address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
get_available_port(),
)),
client_listen_address: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
get_available_port(),
)),
..Default::default()
};
let mixnode1 = MixnetNode::new(config1.clone());
let mixnode2 = MixnetNode::new(config2.clone());
let mixnode3 = MixnetNode::new(config3.clone());
let topology = MixnetTopology {
layers: vec![
Layer {
nodes: vec![Node {
address: config1.listen_address,
public_key: mixnode1.public_key(),
}],
},
Layer {
nodes: vec![Node {
address: config2.listen_address,
public_key: mixnode2.public_key(),
}],
},
Layer {
nodes: vec![Node {
address: config3.listen_address,
public_key: mixnode3.public_key(),
}],
},
],
};
// Run all MixnetNodes
tokio::spawn(async move {
let res = mixnode1.run().await;
assert!(res.is_ok());
});
tokio::spawn(async move {
let res = mixnode2.run().await;
assert!(res.is_ok());
});
tokio::spawn(async move {
let res = mixnode3.run().await;
assert!(res.is_ok());
});
// Wait until mixnodes are ready
// TODO: use a more sophisticated way
tokio::time::sleep(Duration::from_secs(1)).await;
// Run a MixnetClient only for the MixnetNode in the exit layer.
// According to the current implementation,
// one of mixnodes the exit layer always will be selected as a destination.
let client = MixnetClient::new(
MixnetClientConfig {
mode: MixnetClientMode::SenderReceiver(config3.client_listen_address),
topology: topology.clone(),
connection_pool_size: 255,
},
OsRng,
);
let client_stream = client.run().await.unwrap();
(topology, client_stream)
}

View File

@ -2,16 +2,19 @@ use consensus_engine::View;
use fraction::Fraction;
use futures::stream::{self, StreamExt};
use std::collections::HashSet;
use tests::{Node, NomosNode, SpawnConfig};
use tests::{MixNode, Node, NomosNode, SpawnConfig};
const TARGET_VIEW: View = View::new(20);
#[tokio::test]
async fn ten_nodes_one_down() {
let (_mixnodes, mixnet_node_configs, mixnet_topology) = MixNode::spawn_nodes(3).await;
let mut nodes = NomosNode::spawn_nodes(SpawnConfig::Star {
n_participants: 10,
threshold: Fraction::new(9u32, 10u32),
timeout: std::time::Duration::from_secs(5),
mixnet_node_configs,
mixnet_topology,
})
.await;
let mut failed_node = nodes.pop().unwrap();