From cb86528a4a18c1464ec54809341b1883550e1057 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 17 Oct 2024 00:19:20 +0900 Subject: [PATCH] Mix: remove all of the previous mixnet stuff (#822) --- .github/workflows/build-test.yml | 6 +- Cargo.toml | 1 - mixnet/Cargo.toml | 22 -- mixnet/src/address.rs | 53 ---- mixnet/src/client.rs | 206 ------------- mixnet/src/crypto.rs | 6 - mixnet/src/error.rs | 34 --- mixnet/src/fragment.rs | 280 ------------------ mixnet/src/lib.rs | 21 -- mixnet/src/node.rs | 191 ------------ mixnet/src/packet.rs | 229 -------------- mixnet/src/poisson.rs | 93 ------ mixnet/src/topology.rs | 199 ------------- nodes/nomos-executor/Cargo.toml | 1 - nodes/nomos-node/Cargo.toml | 1 - nodes/nomos-node/config.yaml | 13 - nodes/nomos-node/src/config.rs | 2 - nomos-libp2p/Cargo.toml | 1 - nomos-libp2p/src/lib.rs | 80 +---- .../data-availability/tests/Cargo.toml | 1 - .../data-availability/tests/src/lib.rs | 9 - nomos-services/network/Cargo.toml | 2 - .../network/src/backends/libp2p/command.rs | 7 +- .../network/src/backends/libp2p/config.rs | 5 - .../network/src/backends/libp2p/mixnet.rs | 201 ------------- .../network/src/backends/libp2p/mod.rs | 36 --- .../network/src/backends/libp2p/swarm.rs | 58 +--- tests/Cargo.toml | 2 - tests/src/nodes/nomos.rs | 84 ------ 29 files changed, 7 insertions(+), 1837 deletions(-) delete mode 100644 mixnet/Cargo.toml delete mode 100644 mixnet/src/address.rs delete mode 100644 mixnet/src/client.rs delete mode 100644 mixnet/src/crypto.rs delete mode 100644 mixnet/src/error.rs delete mode 100644 mixnet/src/fragment.rs delete mode 100644 mixnet/src/lib.rs delete mode 100644 mixnet/src/node.rs delete mode 100644 mixnet/src/packet.rs delete mode 100644 mixnet/src/poisson.rs delete mode 100644 mixnet/src/topology.rs delete mode 100644 nomos-services/network/src/backends/libp2p/mixnet.rs diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index de5354bf..3dfecfb8 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -16,7 +16,7 @@ jobs: strategy: fail-fast: true matrix: - feature: [ libp2p, "libp2p,mixnet" ] + feature: [ libp2p ] steps: - uses: actions/checkout@v2 with: @@ -47,7 +47,7 @@ jobs: strategy: fail-fast: false # all OSes should be tested even if one fails (default: true) matrix: - feature: [ libp2p, "libp2p,mixnet" ] + feature: [ libp2p ] os: [ self-hosted, macos-latest ] # drop windows for now as risc0 does not support it runs-on: ${{ matrix.os }} steps: @@ -132,7 +132,7 @@ jobs: runs-on: self-hosted strategy: matrix: - feature: [ libp2p, "libp2p,mixnet" ] + feature: [ libp2p ] steps: - uses: actions/checkout@v2 with: diff --git a/Cargo.toml b/Cargo.toml index 72322169..fd894cd8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,6 @@ members = [ "nomos-utils", "nodes/nomos-node", "nodes/nomos-executor", - "mixnet", "consensus/carnot-engine", "consensus/cryptarchia-engine", "ledger/cryptarchia-ledger", diff --git a/mixnet/Cargo.toml b/mixnet/Cargo.toml deleted file mode 100644 index 7f00e182..00000000 --- a/mixnet/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "mixnet" -version = "0.0.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -rand = "0.8" -rand_distr = "0.4" -nomos-utils = { path = "../nomos-utils" } -thiserror = "1.0.57" -tokio = { version = "1.36.0", features = ["sync"] } -serde = { version = "1.0.197", 
features = ["derive"] } -sphinx-packet = "0.1.0" -nym-sphinx-addressing = { package = "nym-sphinx-addressing", git = "https://github.com/nymtech/nym", tag = "v1.1.22" } -tracing = "0.1.40" -uuid = { version = "1.7.0", features = ["v4"] } -futures = "0.3" - -[dev-dependencies] -tokio = { version = "1.36.0", features = ["test-util"] } diff --git a/mixnet/src/address.rs b/mixnet/src/address.rs deleted file mode 100644 index faae16d8..00000000 --- a/mixnet/src/address.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::net::SocketAddr; - -use nym_sphinx_addressing::nodes::NymNodeRoutingAddress; -use serde::{Deserialize, Serialize}; - -use crate::error::MixnetError; - -/// Represents an address of mix node. -/// -/// This just contains a single [`SocketAddr`], but has conversion functions -/// for various address types defined in the `sphinx-packet` crate. -#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)] -pub struct NodeAddress(SocketAddr); - -impl From for NodeAddress { - fn from(address: SocketAddr) -> Self { - Self(address) - } -} - -impl From for SocketAddr { - fn from(address: NodeAddress) -> Self { - address.0 - } -} - -impl TryInto for NodeAddress { - type Error = MixnetError; - - fn try_into(self) -> Result { - Ok(NymNodeRoutingAddress::from(SocketAddr::from(self)).try_into()?) - } -} - -impl TryFrom for NodeAddress { - type Error = MixnetError; - - fn try_from(value: sphinx_packet::route::NodeAddressBytes) -> Result { - Ok(Self::from(SocketAddr::from( - NymNodeRoutingAddress::try_from(value)?, - ))) - } -} - -impl TryFrom for NodeAddress { - type Error = MixnetError; - - fn try_from(value: sphinx_packet::route::DestinationAddressBytes) -> Result { - Self::try_from(sphinx_packet::route::NodeAddressBytes::from_bytes( - value.as_bytes(), - )) - } -} diff --git a/mixnet/src/client.rs b/mixnet/src/client.rs deleted file mode 100644 index 7f7cd0b5..00000000 --- a/mixnet/src/client.rs +++ /dev/null @@ -1,206 +0,0 @@ -use std::{ - collections::VecDeque, - num::NonZeroU8, - pin::Pin, - task::{Context, Poll}, -}; - -use futures::{Future, Stream}; -use rand::rngs::OsRng; -use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; - -use crate::{error::MixnetError, packet::Packet, poisson::Poisson, topology::MixnetTopology}; - -/// Mix client implementation that is used to schedule messages to be sent to the mixnet. -/// Messages inserted to the [`MessageQueue`] are scheduled according to the Poisson interals -/// and returns from [`MixClient.next()`] when it is ready to be sent to the mixnet. -/// If there is no messages inserted to the [`MessageQueue`], cover packets are generated and -/// returned from [`MixClient.next()`]. -pub struct MixClient { - config: MixClientConfig, - poisson: Poisson, - message_queue: mpsc::Receiver>, - real_packet_queue: VecDeque, - delay: Option>>, -} - -/// Mix client configuration -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct MixClientConfig { - /// Mixnet topology - pub topology: MixnetTopology, - /// Poisson rate for packet emissions (per minute) - pub emission_rate_per_min: f64, - /// Packet redundancy for passive retransmission - pub redundancy: NonZeroU8, -} - -const MESSAGE_QUEUE_SIZE: usize = 256; - -/// Queue for sending messages to [`MixClient`] -pub type MessageQueue = mpsc::Sender>; - -impl MixClient { - /// Creates a [`MixClient`] and a [`MessageQueue`]. - /// - /// This returns [`MixnetError`] if the given `config` is invalid. 
- pub fn new(config: MixClientConfig) -> Result<(Self, MessageQueue), MixnetError> { - let poisson = Poisson::new(config.emission_rate_per_min)?; - let (tx, rx) = mpsc::channel(MESSAGE_QUEUE_SIZE); - - Ok(( - Self { - config, - poisson, - message_queue: rx, - real_packet_queue: VecDeque::new(), - delay: None, - }, - tx, - )) - } -} - -impl Stream for MixClient { - type Item = Packet; - - /// Returns a next [`Packet`] to be emitted, if it exists and the Poisson timer is done. - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.delay.is_none() { - // We've never set an initial delay. Let's do it now. - cx.waker().wake_by_ref(); - - self.delay = Some(Box::pin(tokio::time::sleep( - self.poisson.interval(&mut OsRng), - ))); - return Poll::Pending; - } - - match self.delay.as_mut().unwrap().as_mut().poll(cx) { - Poll::Pending => { - // The delay hasn't elapsed yet. - // The current task is automatically scheduled to be woken up once the timer elapses, - // thanks to the `tokio::time::Sleep.poll(cx)`. - Poll::Pending - } - Poll::Ready(_) => { - // The delay has elapsed. Let's reset the delay and return the next packet. - let next_interval = self.poisson.interval(&mut OsRng); - let delay = self.delay.as_mut().unwrap(); - let next_deadline = delay.deadline() + next_interval; - delay.as_mut().reset(next_deadline); - - match self.next_packet() { - Ok(packet) => Poll::Ready(Some(packet)), - Err(e) => { - tracing::error!( - "failed to find a next packet to emit. skipping to the next turn: {e}" - ); - Poll::Pending - } - } - } - } - } -} - -impl MixClient { - const DROP_COVER_MSG: &'static [u8] = b"drop cover"; - - // Returns either a real packet or a drop cover packet. - fn next_packet(&mut self) -> Result { - // If there is any redundant real packet scheduled, return it. - if let Some(packet) = self.real_packet_queue.pop_front() { - return Ok(packet); - } - - match self.message_queue.try_recv() { - Ok(msg) => { - // If there is any message received, build real packets out of it and - // schedule them in the queue. - for packet in Packet::build_real(msg, &self.config.topology)? { - for _ in 0..self.config.redundancy.get() { - self.real_packet_queue.push_back(packet.clone()); - } - } - Ok(self - .real_packet_queue - .pop_front() - .expect("real packet queue should not be empty")) - } - Err(_) => { - // If no message received, generate and return a drop cover packet. 
- let mut packets = Packet::build_drop_cover( - Vec::from(Self::DROP_COVER_MSG), - &self.config.topology, - )?; - Ok(packets.pop().expect("drop cover should not be empty")) - } - } - } -} - -#[cfg(test)] -mod tests { - use std::{num::NonZeroU8, time::Instant}; - - use futures::StreamExt; - - use crate::{ - client::MixClientConfig, - topology::{ - tests::{gen_entropy, gen_mixnodes}, - MixnetTopology, - }, - }; - - use super::MixClient; - - #[tokio::test] - async fn poisson_emission() { - let emission_rate_per_min = 60.0; - let (mut client, _) = MixClient::new(MixClientConfig { - topology: MixnetTopology::new(gen_mixnodes(10), 3, 2, gen_entropy()).unwrap(), - emission_rate_per_min, - redundancy: NonZeroU8::new(3).unwrap(), - }) - .unwrap(); - - let mut ts = Instant::now(); - let mut intervals = Vec::new(); - for _ in 0..30 { - assert!(client.next().await.is_some()); - let now = Instant::now(); - intervals.push(now - ts); - ts = now; - } - - let avg_sec = intervals.iter().map(|d| d.as_secs()).sum::() / intervals.len() as u64; - let expected_avg_sec = (60.0 / emission_rate_per_min) as u64; - assert!( - avg_sec.abs_diff(expected_avg_sec) <= 1, - "{avg_sec} -{expected_avg_sec}" - ); - } - - #[tokio::test] - async fn real_packet_emission() { - let (mut client, msg_queue) = MixClient::new(MixClientConfig { - topology: MixnetTopology::new(gen_mixnodes(10), 3, 2, gen_entropy()).unwrap(), - emission_rate_per_min: 360.0, - redundancy: NonZeroU8::new(3).unwrap(), - }) - .unwrap(); - - msg_queue.send("hello".as_bytes().into()).await.unwrap(); - - // Check if the next 3 packets are the same, according to the redundancy - let packet = client.next().await.unwrap(); - assert_eq!(packet, client.next().await.unwrap()); - assert_eq!(packet, client.next().await.unwrap()); - - // Check if the next packet is different (drop cover) - assert_ne!(packet, client.next().await.unwrap()); - } -} diff --git a/mixnet/src/crypto.rs b/mixnet/src/crypto.rs deleted file mode 100644 index 158f777a..00000000 --- a/mixnet/src/crypto.rs +++ /dev/null @@ -1,6 +0,0 @@ -use sphinx_packet::crypto::{PrivateKey, PublicKey, PRIVATE_KEY_SIZE, PUBLIC_KEY_SIZE}; - -/// Converts a mixnode private key to a public key -pub fn public_key_from(private_key: [u8; PRIVATE_KEY_SIZE]) -> [u8; PUBLIC_KEY_SIZE] { - *PublicKey::from(&PrivateKey::from(private_key)).as_bytes() -} diff --git a/mixnet/src/error.rs b/mixnet/src/error.rs deleted file mode 100644 index 627dde28..00000000 --- a/mixnet/src/error.rs +++ /dev/null @@ -1,34 +0,0 @@ -/// Mixnet Errors -#[derive(thiserror::Error, Debug)] -pub enum MixnetError { - /// Invalid topology size - #[error("invalid mixnet topology size")] - InvalidTopologySize, - /// Invalid packet flag - #[error("invalid packet flag")] - InvalidPacketFlag, - /// Invalid fragment header - #[error("invalid fragment header")] - InvalidFragmentHeader, - /// Invalid fragment set ID - #[error("invalid fragment set ID: {0}")] - InvalidFragmentSetId(#[from] uuid::Error), - /// Invalid fragment ID - #[error("invalid fragment ID")] - InvalidFragmentId, - /// Message too long - #[error("message too long: {0} bytes")] - MessageTooLong(usize), - /// Invalid message - #[error("invalid message")] - InvalidMessage, - /// Node address error - #[error("node address error: {0}")] - NodeAddressError(#[from] nym_sphinx_addressing::nodes::NymNodeRoutingAddressError), - /// Sphinx packet error - #[error("sphinx packet error: {0}")] - SphinxPacketError(#[from] sphinx_packet::Error), - /// Exponential distribution error - #[error("exponential 
distribution error: {0}")] - ExponentialError(#[from] rand_distr::ExpError), -} diff --git a/mixnet/src/fragment.rs b/mixnet/src/fragment.rs deleted file mode 100644 index 5799eefc..00000000 --- a/mixnet/src/fragment.rs +++ /dev/null @@ -1,280 +0,0 @@ -use std::collections::HashMap; - -use sphinx_packet::{constants::PAYLOAD_SIZE, payload::PAYLOAD_OVERHEAD_SIZE}; -use uuid::Uuid; - -use crate::error::MixnetError; - -pub(crate) struct FragmentSet(Vec); - -impl FragmentSet { - const MAX_PLAIN_PAYLOAD_SIZE: usize = PAYLOAD_SIZE - PAYLOAD_OVERHEAD_SIZE; - const CHUNK_SIZE: usize = Self::MAX_PLAIN_PAYLOAD_SIZE - FragmentHeader::SIZE; - - pub(crate) fn new(msg: &[u8]) -> Result { - // For now, we don't support more than `u8::MAX + 1` fragments. - // If needed, we can devise the FragmentSet chaining to support larger messages, like Nym. - let last_fragment_id = FragmentId::try_from(Self::num_chunks(msg) - 1) - .map_err(|_| MixnetError::MessageTooLong(msg.len()))?; - let set_id = FragmentSetId::new(); - - Ok(FragmentSet( - msg.chunks(Self::CHUNK_SIZE) - .enumerate() - .map(|(i, chunk)| Fragment { - header: FragmentHeader { - set_id, - last_fragment_id, - fragment_id: FragmentId::try_from(i) - .expect("i is always in the right range"), - }, - body: Vec::from(chunk), - }) - .collect(), - )) - } - - fn num_chunks(msg: &[u8]) -> usize { - msg.len().div_ceil(Self::CHUNK_SIZE) - } -} - -impl AsRef> for FragmentSet { - fn as_ref(&self) -> &Vec { - &self.0 - } -} - -#[derive(PartialEq, Eq, Debug, Clone)] -pub(crate) struct Fragment { - header: FragmentHeader, - body: Vec, -} - -impl Fragment { - pub(crate) fn bytes(&self) -> Vec { - let mut out = Vec::with_capacity(FragmentHeader::SIZE + self.body.len()); - out.extend(self.header.bytes()); - out.extend(&self.body); - out - } - - pub(crate) fn from_bytes(value: &[u8]) -> Result { - Ok(Self { - header: FragmentHeader::from_bytes(&value[0..FragmentHeader::SIZE])?, - body: value[FragmentHeader::SIZE..].to_vec(), - }) - } -} - -#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)] -struct FragmentSetId(Uuid); - -impl FragmentSetId { - const SIZE: usize = 16; - - fn new() -> Self { - Self(Uuid::new_v4()) - } -} - -#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)] -struct FragmentId(u8); - -impl FragmentId { - const SIZE: usize = std::mem::size_of::(); -} - -impl TryFrom for FragmentId { - type Error = MixnetError; - - fn try_from(id: usize) -> Result { - if id > u8::MAX as usize { - return Err(MixnetError::InvalidFragmentId); - } - Ok(Self(id as u8)) - } -} - -impl From for usize { - fn from(id: FragmentId) -> Self { - id.0 as usize - } -} - -#[derive(PartialEq, Eq, Debug, Clone)] -struct FragmentHeader { - set_id: FragmentSetId, - last_fragment_id: FragmentId, - fragment_id: FragmentId, -} - -impl FragmentHeader { - const SIZE: usize = FragmentSetId::SIZE + 2 * FragmentId::SIZE; - - fn bytes(&self) -> [u8; Self::SIZE] { - let mut out = [0u8; Self::SIZE]; - out[0..FragmentSetId::SIZE].copy_from_slice(self.set_id.0.as_bytes()); - out[FragmentSetId::SIZE] = self.last_fragment_id.0; - out[FragmentSetId::SIZE + FragmentId::SIZE] = self.fragment_id.0; - out - } - - fn from_bytes(value: &[u8]) -> Result { - if value.len() != Self::SIZE { - return Err(MixnetError::InvalidFragmentHeader); - } - - Ok(Self { - set_id: FragmentSetId(Uuid::from_slice(&value[0..FragmentSetId::SIZE])?), - last_fragment_id: FragmentId(value[FragmentSetId::SIZE]), - fragment_id: FragmentId(value[FragmentSetId::SIZE + FragmentId::SIZE]), - }) - } -} - -pub struct MessageReconstructor { - 
fragment_sets: HashMap, -} - -impl MessageReconstructor { - pub fn new() -> Self { - Self { - fragment_sets: HashMap::new(), - } - } - - /// Adds a fragment to the reconstructor and tries to reconstruct a message from the fragment set. - /// This returns `None` if the message has not been reconstructed yet. - pub fn add_and_reconstruct(&mut self, fragment: Fragment) -> Option> { - let set_id = fragment.header.set_id; - let reconstructed_msg = self - .fragment_sets - .entry(set_id) - .or_insert(FragmentSetReconstructor::new( - fragment.header.last_fragment_id, - )) - .add(fragment) - .try_reconstruct_message()?; - // A message has been reconstructed completely from the fragment set. - // Delete the fragment set from the reconstructor. - self.fragment_sets.remove(&set_id); - Some(reconstructed_msg) - } -} - -struct FragmentSetReconstructor { - last_fragment_id: FragmentId, - fragments: HashMap, - // For mem optimization, accumulates the expected message size - // whenever a new fragment is added to the `fragments`. - message_size: usize, -} - -impl FragmentSetReconstructor { - fn new(last_fragment_id: FragmentId) -> Self { - Self { - last_fragment_id, - fragments: HashMap::new(), - message_size: 0, - } - } - - fn add(&mut self, fragment: Fragment) -> &mut Self { - self.message_size += fragment.body.len(); - if let Some(old_fragment) = self.fragments.insert(fragment.header.fragment_id, fragment) { - // In the case when a new fragment replaces the old one, adjust the `meesage_size`. - // e.g. The same fragment has been received multiple times. - self.message_size -= old_fragment.body.len(); - } - self - } - - /// Merges all fragments gathered if possible - fn try_reconstruct_message(&self) -> Option> { - (self.fragments.len() - 1 == self.last_fragment_id.into()).then(|| { - let mut msg = Vec::with_capacity(self.message_size); - for id in 0..=self.last_fragment_id.0 { - msg.extend(&self.fragments.get(&FragmentId(id)).unwrap().body); - } - msg - }) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - - use rand::RngCore; - - use super::*; - - #[test] - fn fragment_header() { - let header = FragmentHeader { - set_id: FragmentSetId::new(), - last_fragment_id: FragmentId(19), - fragment_id: FragmentId(0), - }; - let bz = header.bytes(); - assert_eq!(FragmentHeader::SIZE, bz.len()); - assert_eq!(header, FragmentHeader::from_bytes(bz.as_slice()).unwrap()); - } - - #[test] - fn fragment() { - let fragment = Fragment { - header: FragmentHeader { - set_id: FragmentSetId::new(), - last_fragment_id: FragmentId(19), - fragment_id: FragmentId(0), - }, - body: vec![1, 2, 3, 4], - }; - let bz = fragment.bytes(); - assert_eq!(FragmentHeader::SIZE + fragment.body.len(), bz.len()); - assert_eq!(fragment, Fragment::from_bytes(bz.as_slice()).unwrap()); - } - - #[test] - fn fragment_set() { - let mut msg = vec![0u8; FragmentSet::CHUNK_SIZE * 3 + FragmentSet::CHUNK_SIZE / 2]; - rand::thread_rng().fill_bytes(&mut msg); - - assert_eq!(4, FragmentSet::num_chunks(&msg)); - - let set = FragmentSet::new(&msg).unwrap(); - assert_eq!(4, set.as_ref().iter().len()); - assert_eq!( - 1, - HashSet::::from_iter( - set.as_ref().iter().map(|fragment| fragment.header.set_id) - ) - .len() - ); - set.as_ref() - .iter() - .enumerate() - .for_each(|(i, fragment)| assert_eq!(i, fragment.header.fragment_id.0 as usize)); - } - - #[test] - fn message_reconstructor() { - let mut msg = vec![0u8; FragmentSet::CHUNK_SIZE * 2]; - rand::thread_rng().fill_bytes(&mut msg); - - let set = FragmentSet::new(&msg).unwrap(); - - let mut 
reconstructor = MessageReconstructor::new(); - let mut fragments = set.as_ref().iter(); - assert_eq!( - None, - reconstructor.add_and_reconstruct(fragments.next().unwrap().clone()) - ); - assert_eq!( - Some(msg), - reconstructor.add_and_reconstruct(fragments.next().unwrap().clone()) - ); - } -} diff --git a/mixnet/src/lib.rs b/mixnet/src/lib.rs deleted file mode 100644 index 3fbf2712..00000000 --- a/mixnet/src/lib.rs +++ /dev/null @@ -1,21 +0,0 @@ -//! Mixnet -#![deny(missing_docs, warnings)] -#![forbid(unsafe_code)] - -/// Mix node address -pub mod address; -/// Mix client -pub mod client; -/// Mixnet cryptography -pub mod crypto; -/// Mixnet errors -pub mod error; -mod fragment; -/// Mix node -pub mod node; -/// Mix packet -pub mod packet; -/// Poisson distribution -mod poisson; -/// Mixnet topology -pub mod topology; diff --git a/mixnet/src/node.rs b/mixnet/src/node.rs deleted file mode 100644 index 5bd6903a..00000000 --- a/mixnet/src/node.rs +++ /dev/null @@ -1,191 +0,0 @@ -use rand::rngs::OsRng; -use serde::{Deserialize, Serialize}; -use sphinx_packet::crypto::{PrivateKey, PRIVATE_KEY_SIZE}; -use tokio::sync::mpsc; - -use crate::{ - error::MixnetError, - fragment::{Fragment, MessageReconstructor}, - packet::{Message, Packet, PacketBody}, - poisson::Poisson, -}; - -/// Mix node implementation that returns Sphinx packets which needs to be forwarded to next mix nodes, -/// or messages reconstructed from Sphinx packets delivered through all mix layers. -pub struct MixNode { - output_rx: mpsc::UnboundedReceiver, -} - -struct MixNodeRunner { - _config: MixNodeConfig, - encryption_private_key: PrivateKey, - poisson: Poisson, - packet_queue: mpsc::Receiver, - message_reconstructor: MessageReconstructor, - output_tx: mpsc::UnboundedSender, -} - -/// Mix node configuration -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct MixNodeConfig { - /// Private key for decrypting Sphinx packets - pub encryption_private_key: [u8; PRIVATE_KEY_SIZE], - /// Poisson delay rate per minutes - pub delay_rate_per_min: f64, -} - -const PACKET_QUEUE_SIZE: usize = 256; - -/// Queue for sending packets to [`MixNode`] -pub type PacketQueue = mpsc::Sender; - -impl MixNode { - /// Creates a [`MixNode`] and a [`PacketQueue`]. - /// - /// This returns [`MixnetError`] if the given `config` is invalid. - pub fn new(config: MixNodeConfig) -> Result<(Self, PacketQueue), MixnetError> { - let encryption_private_key = PrivateKey::from(config.encryption_private_key); - let poisson = Poisson::new(config.delay_rate_per_min)?; - let (packet_tx, packet_rx) = mpsc::channel(PACKET_QUEUE_SIZE); - let (output_tx, output_rx) = mpsc::unbounded_channel(); - - let mixnode_runner = MixNodeRunner { - _config: config, - encryption_private_key, - poisson, - packet_queue: packet_rx, - message_reconstructor: MessageReconstructor::new(), - output_tx, - }; - tokio::spawn(mixnode_runner.run()); - - Ok((Self { output_rx }, packet_tx)) - } - - /// Returns a next `[Output]` to be emitted, if it exists and the Poisson delay is done (if necessary). - pub async fn next(&mut self) -> Option { - self.output_rx.recv().await - } -} - -impl MixNodeRunner { - async fn run(mut self) { - loop { - if let Some(packet) = self.packet_queue.recv().await { - if let Err(e) = self.process_packet(packet) { - tracing::error!("failed to process packet. 
skipping it: {e}"); - } - } - } - } - - fn process_packet(&mut self, packet: PacketBody) -> Result<(), MixnetError> { - match packet { - PacketBody::SphinxPacket(packet) => self.process_sphinx_packet(packet.as_ref()), - PacketBody::Fragment(fragment) => self.process_fragment(fragment.as_ref()), - } - } - - fn process_sphinx_packet(&self, packet: &[u8]) -> Result<(), MixnetError> { - let output = Output::Forward(PacketBody::process_sphinx_packet( - packet, - &self.encryption_private_key, - )?); - let delay = self.poisson.interval(&mut OsRng); - let output_tx = self.output_tx.clone(); - tokio::spawn(async move { - tokio::time::sleep(delay).await; - // output_tx is always expected to be not closed/dropped. - output_tx.send(output).unwrap(); - }); - Ok(()) - } - - fn process_fragment(&mut self, fragment: &[u8]) -> Result<(), MixnetError> { - if let Some(msg) = self - .message_reconstructor - .add_and_reconstruct(Fragment::from_bytes(fragment)?) - { - match Message::from_bytes(&msg)? { - Message::Real(msg) => { - let output = Output::ReconstructedMessage(msg.into_boxed_slice()); - self.output_tx - .send(output) - .expect("output channel shouldn't be closed"); - } - Message::DropCover(_) => { - tracing::debug!("Drop cover message has been reconstructed. Dropping it..."); - } - } - } - Ok(()) - } -} - -/// Output that [`MixNode::next`] returns. -#[derive(Debug, PartialEq, Eq)] -pub enum Output { - /// Packet to be forwarded to the next mix node - Forward(Packet), - /// Message reconstructed from [`Packet`]s - ReconstructedMessage(Box<[u8]>), -} - -#[cfg(test)] -mod tests { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - - use sphinx_packet::crypto::PublicKey; - - use crate::{ - packet::Packet, - topology::{tests::gen_entropy, MixNodeInfo, MixnetTopology}, - }; - - use super::*; - - #[tokio::test] - async fn mixnode() { - let encryption_private_key = PrivateKey::new(); - let node_info = MixNodeInfo::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1000u16).into(), - *PublicKey::from(&encryption_private_key).as_bytes(), - ) - .unwrap(); - - let topology = MixnetTopology::new( - (0..2).map(|_| node_info.clone()).collect(), - 2, - 1, - gen_entropy(), - ) - .unwrap(); - let (mut mixnode, packet_queue) = MixNode::new(MixNodeConfig { - encryption_private_key: encryption_private_key.to_bytes(), - delay_rate_per_min: 60.0, - }) - .unwrap(); - - let msg = "hello".as_bytes().to_vec(); - let packets = Packet::build_real(msg.clone(), &topology).unwrap(); - let num_packets = packets.len(); - - for packet in packets.into_iter() { - packet_queue.send(packet.body()).await.unwrap(); - } - - for _ in 0..num_packets { - match mixnode.next().await.unwrap() { - Output::Forward(packet_to) => { - packet_queue.send(packet_to.body()).await.unwrap(); - } - Output::ReconstructedMessage(_) => unreachable!(), - } - } - - assert_eq!( - Output::ReconstructedMessage(msg.into_boxed_slice()), - mixnode.next().await.unwrap() - ); - } -} diff --git a/mixnet/src/packet.rs b/mixnet/src/packet.rs deleted file mode 100644 index 3e201ed0..00000000 --- a/mixnet/src/packet.rs +++ /dev/null @@ -1,229 +0,0 @@ -use std::io; - -use futures::{AsyncRead, AsyncReadExt}; -use sphinx_packet::{crypto::PrivateKey, header::delays::Delay}; - -use crate::{ - address::NodeAddress, - error::MixnetError, - fragment::{Fragment, FragmentSet}, - topology::MixnetTopology, -}; - -/// A packet to be sent through the mixnet -#[derive(Clone, PartialEq, Eq, Debug)] -pub struct Packet { - address: NodeAddress, - body: PacketBody, -} - -impl Packet { 
- fn new(processed_packet: sphinx_packet::ProcessedPacket) -> Result { - match processed_packet { - sphinx_packet::ProcessedPacket::ForwardHop(packet, addr, _) => Ok(Packet { - address: addr.try_into()?, - body: PacketBody::from(packet.as_ref()), - }), - sphinx_packet::ProcessedPacket::FinalHop(addr, _, payload) => Ok(Packet { - address: addr.try_into()?, - body: PacketBody::try_from(payload)?, - }), - } - } - - pub(crate) fn build_real( - msg: Vec, - topology: &MixnetTopology, - ) -> Result, MixnetError> { - Self::build(Message::Real(msg), topology) - } - - pub(crate) fn build_drop_cover( - msg: Vec, - topology: &MixnetTopology, - ) -> Result, MixnetError> { - Self::build(Message::DropCover(msg), topology) - } - - fn build(msg: Message, topology: &MixnetTopology) -> Result, MixnetError> { - let destination = topology.choose_destination(); - - let fragment_set = FragmentSet::new(&msg.bytes())?; - let mut packets = Vec::with_capacity(fragment_set.as_ref().len()); - for fragment in fragment_set.as_ref().iter() { - let route = topology.gen_route(); - if route.is_empty() { - // Create a packet that will be directly sent to the mix destination - packets.push(Packet { - address: NodeAddress::try_from(destination.address)?, - body: PacketBody::from(fragment), - }); - } else { - // Use dummy delays because mixnodes will ignore this value and generate delay randomly by themselves. - let delays = vec![Delay::new_from_nanos(0); route.len()]; - packets.push(Packet { - address: NodeAddress::try_from(route[0].address)?, - body: PacketBody::from(&sphinx_packet::SphinxPacket::new( - fragment.bytes(), - &route, - &destination, - &delays, - )?), - }); - } - } - Ok(packets) - } - - /// Returns the address of the mix node that this packet is being sent to - pub fn address(&self) -> NodeAddress { - self.address - } - - /// Returns the body of the packet - pub fn body(self) -> PacketBody { - self.body - } -} - -/// The body of a packet to be sent through the mixnet -#[derive(Clone, PartialEq, Eq, Debug)] -pub enum PacketBody { - /// A Sphinx packet to be sent to the next mix node - SphinxPacket(Vec), - /// A fragment that has been through the mixnet and can be reconstructed into the original message - Fragment(Vec), -} - -impl From<&sphinx_packet::SphinxPacket> for PacketBody { - fn from(packet: &sphinx_packet::SphinxPacket) -> Self { - Self::SphinxPacket(packet.to_bytes()) - } -} - -impl From<&Fragment> for PacketBody { - fn from(fragment: &Fragment) -> Self { - Self::Fragment(fragment.bytes()) - } -} - -impl TryFrom for PacketBody { - type Error = MixnetError; - - fn try_from(payload: sphinx_packet::payload::Payload) -> Result { - Ok(Self::Fragment(payload.recover_plaintext()?)) - } -} - -impl PacketBody { - /// Consumes the packet body and serialize it into a byte array - pub fn bytes(self) -> Box<[u8]> { - match self { - Self::SphinxPacket(data) => Self::bytes_with_flag(PacketBodyFlag::SphinxPacket, data), - Self::Fragment(data) => Self::bytes_with_flag(PacketBodyFlag::Fragment, data), - } - } - - fn bytes_with_flag(flag: PacketBodyFlag, mut msg: Vec) -> Box<[u8]> { - let mut out = Vec::with_capacity(1 + std::mem::size_of::() + msg.len()); - out.push(flag as u8); - out.extend_from_slice(&msg.len().to_le_bytes()); - out.append(&mut msg); - out.into_boxed_slice() - } - - /// Deserialize a packet body from a reader - pub async fn read_from( - reader: &mut R, - ) -> io::Result> { - let mut flag = [0u8; 1]; - reader.read_exact(&mut flag).await?; - - let mut size = [0u8; std::mem::size_of::()]; - 
reader.read_exact(&mut size).await?; - - let mut data = vec![0u8; usize::from_le_bytes(size)]; - reader.read_exact(&mut data).await?; - - match PacketBodyFlag::try_from(flag[0]) { - Ok(PacketBodyFlag::SphinxPacket) => Ok(Ok(PacketBody::SphinxPacket(data))), - Ok(PacketBodyFlag::Fragment) => Ok(Ok(PacketBody::Fragment(data))), - Err(e) => Ok(Err(e)), - } - } - - pub(crate) fn process_sphinx_packet( - packet: &[u8], - private_key: &PrivateKey, - ) -> Result { - Packet::new(sphinx_packet::SphinxPacket::from_bytes(packet)?.process(private_key)?) - } -} - -#[repr(u8)] -enum PacketBodyFlag { - SphinxPacket, - Fragment, -} - -impl TryFrom for PacketBodyFlag { - type Error = MixnetError; - - fn try_from(value: u8) -> Result { - match value { - 0u8 => Ok(PacketBodyFlag::SphinxPacket), - 1u8 => Ok(PacketBodyFlag::Fragment), - _ => Err(MixnetError::InvalidPacketFlag), - } - } -} - -pub(crate) enum Message { - Real(Vec), - DropCover(Vec), -} - -impl Message { - fn bytes(self) -> Box<[u8]> { - match self { - Self::Real(msg) => Self::bytes_with_flag(MessageFlag::Real, msg), - Self::DropCover(msg) => Self::bytes_with_flag(MessageFlag::DropCover, msg), - } - } - - fn bytes_with_flag(flag: MessageFlag, mut msg: Vec) -> Box<[u8]> { - let mut out = Vec::with_capacity(1 + msg.len()); - out.push(flag as u8); - out.append(&mut msg); - out.into_boxed_slice() - } - - pub(crate) fn from_bytes(value: &[u8]) -> Result { - if value.is_empty() { - return Err(MixnetError::InvalidMessage); - } - - match MessageFlag::try_from(value[0])? { - MessageFlag::Real => Ok(Self::Real(value[1..].into())), - MessageFlag::DropCover => Ok(Self::DropCover(value[1..].into())), - } - } -} - -#[repr(u8)] -enum MessageFlag { - Real, - DropCover, -} - -impl TryFrom for MessageFlag { - type Error = MixnetError; - - fn try_from(value: u8) -> Result { - match value { - 0u8 => Ok(MessageFlag::Real), - 1u8 => Ok(MessageFlag::DropCover), - _ => Err(MixnetError::InvalidPacketFlag), - } - } -} diff --git a/mixnet/src/poisson.rs b/mixnet/src/poisson.rs deleted file mode 100644 index 0854c471..00000000 --- a/mixnet/src/poisson.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::time::Duration; - -use rand::Rng; -use rand_distr::{Distribution, Exp}; - -use crate::error::MixnetError; - -/// A Poisson process that models the times at which events occur. -pub struct Poisson(Exp); - -impl Poisson { - /// Create a new Poisson process with the given rate per minute. - pub fn new(rate_per_min: f64) -> Result { - Ok(Self(Exp::new(rate_per_min)?)) - } - - /// Get a random interval between events that follow a Poisson distribution. - /// - /// If events occur in a Poisson distribution with rate_per_min, - /// the interval between events follow the exponential distribution with rate_per_min. 
- pub fn interval(&self, rng: &mut R) -> Duration { - // generate a random value from the distribution - let interval_min = self.0.sample(rng); - // convert minutes to seconds - Duration::from_secs_f64(interval_min * 60.0) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use rand::rngs::OsRng; - use std::{collections::BTreeMap, time::Duration}; - - // Test the interval generation for a specific rate - #[test] - fn test_interval_generation() { - let interval = Poisson::new(1.0).unwrap().interval(&mut OsRng); - // Check if the interval is within a plausible range - // This is a basic check; in practice, you may want to perform a statistical test - assert!(interval > Duration::from_secs(0)); // Must be positive - } - - // Compute the empirical CDF - fn empirical_cdf(samples: &[Duration]) -> BTreeMap { - let mut map = BTreeMap::new(); - let n = samples.len() as f64; - - for &sample in samples { - *map.entry(sample).or_insert(0.0) += 1.0 / n; - } - - let mut acc = 0.0; - for value in map.values_mut() { - acc += *value; - *value = acc; - } - - map - } - - // Compare the empirical CDF to the theoretical CDF - #[test] - fn test_distribution_fit() { - let rate_per_min = 1.0; - let mut intervals = Vec::new(); - - // Generate 10,000 samples - let poisson = Poisson::new(rate_per_min).unwrap(); - for _ in 0..10_000 { - intervals.push(poisson.interval(&mut OsRng)); - } - - let empirical = empirical_cdf(&intervals); - - // theoretical CDF for exponential distribution - let rate_per_sec = rate_per_min / 60.0; - let theoretical_cdf = |x: f64| 1.0 - (-rate_per_sec * x).exp(); - - // Kolmogorov-Smirnov test - let ks_statistic: f64 = empirical - .iter() - .map(|(&k, &v)| { - let x = k.as_secs_f64(); - (theoretical_cdf(x) - v).abs() - }) - .fold(0.0, f64::max); - - println!("KS Statistic: {}", ks_statistic); - - assert!(ks_statistic < 0.05, "Distributions differ significantly."); - } -} diff --git a/mixnet/src/topology.rs b/mixnet/src/topology.rs deleted file mode 100644 index f903cadb..00000000 --- a/mixnet/src/topology.rs +++ /dev/null @@ -1,199 +0,0 @@ -use nomos_utils::fisheryates::FisherYatesShuffle; -use rand::Rng; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use sphinx_packet::{ - constants::IDENTIFIER_LENGTH, - crypto::{PublicKey, PUBLIC_KEY_SIZE}, - route::{DestinationAddressBytes, SURBIdentifier}, -}; - -use crate::{address::NodeAddress, error::MixnetError}; - -/// Defines Mixnet topology construction and route selection -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct MixnetTopology { - mixnode_candidates: Vec, - num_layers: usize, - num_mixnodes_per_layer: usize, -} - -impl MixnetTopology { - /// Generates [MixnetTopology] with random shuffling/sampling using a given entropy. - /// - /// # Errors - /// - /// This function will return an error if parameters are invalid. 
- pub fn new( - mut mixnode_candidates: Vec, - num_layers: usize, - num_mixnodes_per_layer: usize, - entropy: [u8; 32], - ) -> Result { - if mixnode_candidates.len() < num_layers * num_mixnodes_per_layer { - return Err(MixnetError::InvalidTopologySize); - } - - FisherYatesShuffle::shuffle(&mut mixnode_candidates, entropy); - Ok(Self { - mixnode_candidates, - num_layers, - num_mixnodes_per_layer, - }) - } - - /// Selects a mix destination randomly from the last mix layer - pub(crate) fn choose_destination(&self) -> sphinx_packet::route::Destination { - let idx_in_layer = rand::thread_rng().gen_range(0..self.num_mixnodes_per_layer); - let idx = self.num_mixnodes_per_layer * (self.num_layers - 1) + idx_in_layer; - self.mixnode_candidates[idx].clone().into() - } - - /// Selects a mix route randomly from all mix layers except the last layer - /// and append a mix destination to the end of the mix route. - /// - /// That is, the caller can generate multiple routes with one mix destination. - pub(crate) fn gen_route(&self) -> Vec { - let mut route = Vec::with_capacity(self.num_layers); - for layer in 0..self.num_layers - 1 { - let idx_in_layer = rand::thread_rng().gen_range(0..self.num_mixnodes_per_layer); - let idx = self.num_mixnodes_per_layer * layer + idx_in_layer; - route.push(self.mixnode_candidates[idx].clone().into()); - } - route - } -} - -/// Mix node information that is used for forwarding packets to the mix node -#[derive(Clone, Debug)] -pub struct MixNodeInfo(sphinx_packet::route::Node); - -impl MixNodeInfo { - /// Creates a [`MixNodeInfo`]. - pub fn new( - address: NodeAddress, - public_key: [u8; PUBLIC_KEY_SIZE], - ) -> Result { - Ok(Self(sphinx_packet::route::Node::new( - address.try_into()?, - PublicKey::from(public_key), - ))) - } -} - -impl From for sphinx_packet::route::Node { - fn from(info: MixNodeInfo) -> Self { - info.0 - } -} - -const DUMMY_SURB_IDENTIFIER: SURBIdentifier = [0u8; IDENTIFIER_LENGTH]; - -impl From for sphinx_packet::route::Destination { - fn from(info: MixNodeInfo) -> Self { - sphinx_packet::route::Destination::new( - DestinationAddressBytes::from_bytes(info.0.address.as_bytes()), - DUMMY_SURB_IDENTIFIER, - ) - } -} - -impl Serialize for MixNodeInfo { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - SerializableMixNodeInfo::try_from(self) - .map_err(serde::ser::Error::custom)? - .serialize(serializer) - } -} - -impl<'de> Deserialize<'de> for MixNodeInfo { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - Self::try_from(SerializableMixNodeInfo::deserialize(deserializer)?) - .map_err(serde::de::Error::custom) - } -} - -// Only for serializing/deserializing [`MixNodeInfo`] since [`sphinx_packet::route::Node`] is not serializable. 
-#[derive(Serialize, Deserialize, Clone, Debug)] -struct SerializableMixNodeInfo { - address: NodeAddress, - public_key: [u8; PUBLIC_KEY_SIZE], -} - -impl TryFrom<&MixNodeInfo> for SerializableMixNodeInfo { - type Error = MixnetError; - - fn try_from(info: &MixNodeInfo) -> Result { - Ok(Self { - address: NodeAddress::try_from(info.0.address)?, - public_key: *info.0.pub_key.as_bytes(), - }) - } -} - -impl TryFrom for MixNodeInfo { - type Error = MixnetError; - - fn try_from(info: SerializableMixNodeInfo) -> Result { - Self::new(info.address, info.public_key) - } -} - -#[cfg(test)] -pub mod tests { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - - use rand::RngCore; - use sphinx_packet::crypto::{PrivateKey, PublicKey}; - - use crate::error::MixnetError; - - use super::{MixNodeInfo, MixnetTopology}; - - #[test] - fn shuffle() { - let candidates = gen_mixnodes(10); - let topology = MixnetTopology::new(candidates.clone(), 3, 2, gen_entropy()).unwrap(); - - assert_eq!(candidates.len(), topology.mixnode_candidates.len()); - } - - #[test] - fn route_and_destination() { - let topology = MixnetTopology::new(gen_mixnodes(10), 3, 2, gen_entropy()).unwrap(); - let _ = topology.choose_destination(); - assert_eq!(2, topology.gen_route().len()); // except a destination - } - - #[test] - fn invalid_topology_size() { - // if # of candidates is smaller than the topology size - assert!(matches!( - MixnetTopology::new(gen_mixnodes(5), 3, 2, gen_entropy()).err(), - Some(MixnetError::InvalidTopologySize), - )); - } - - pub fn gen_mixnodes(n: usize) -> Vec { - (0..n) - .map(|i| { - MixNodeInfo::new( - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), i as u16).into(), - *PublicKey::from(&PrivateKey::new()).as_bytes(), - ) - .unwrap() - }) - .collect() - } - - pub fn gen_entropy() -> [u8; 32] { - let mut entropy = [0u8; 32]; - rand::thread_rng().fill_bytes(&mut entropy); - entropy - } -} diff --git a/nodes/nomos-executor/Cargo.toml b/nodes/nomos-executor/Cargo.toml index 1ada2798..e11192c0 100644 --- a/nodes/nomos-executor/Cargo.toml +++ b/nodes/nomos-executor/Cargo.toml @@ -41,6 +41,5 @@ uuid = { version = "1.10.0", features = ["v4"] } [features] default = ["tracing"] -mixnet = ["nomos-node/mixnet"] metrics = ["nomos-node/metrics"] tracing = ["nomos-node/tracing"] diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 66c6f418..e17dc894 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -63,6 +63,5 @@ rand = "0.8" [features] default = ["tracing"] -mixnet = ["nomos-network/mixnet"] metrics = [] tracing = [] diff --git a/nodes/nomos-node/config.yaml b/nodes/nomos-node/config.yaml index 4081f2c6..7fa09d75 100644 --- a/nodes/nomos-node/config.yaml +++ b/nodes/nomos-node/config.yaml @@ -47,19 +47,6 @@ network: port: 3000 node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3 initial_peers: [] - mixnet: - mixclient: - topology: - mixnode_candidates: - - address: 127.0.0.1:3000 - public_key: [110, 177, 93, 41, 184, 16, 49, 126, 195, 57, 202, 199, 160, 161, 47, 195, 221, 40, 143, 151, 38, 250, 22, 82, 40, 83, 91, 3, 200, 239, 155, 67] - num_layers: 1 - num_mixnodes_per_layer: 1 - emission_rate_per_min: 600.0 - redundancy: 1 - mixnode: - encryption_private_key: [183, 50, 199, 33, 53, 46, 43, 123, 6, 173, 255, 66, 183, 156, 146, 221, 80, 102, 22, 155, 216, 234, 28, 99, 107, 231, 99, 27, 250, 17, 36, 108] - delay_rate_per_min: 60000.0 http: backend_settings: diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 
d1aff678..1a498ebf 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -221,8 +221,6 @@ pub fn update_network( network.backend.initial_peers = peers; } - // TODO: configure mixclient and mixnode if the mixnet feature is enabled - Ok(()) } diff --git a/nomos-libp2p/Cargo.toml b/nomos-libp2p/Cargo.toml index e5c2c594..e9c2149e 100644 --- a/nomos-libp2p/Cargo.toml +++ b/nomos-libp2p/Cargo.toml @@ -15,7 +15,6 @@ libp2p = { version = "0.53.2", features = [ "quic", "secp256k1", ] } -libp2p-stream = "0.1.0-alpha" blake2 = { version = "0.10" } serde = { version = "1.0.166", features = ["derive"] } hex = "0.4.3" diff --git a/nomos-libp2p/src/lib.rs b/nomos-libp2p/src/lib.rs index 3d943f9a..7e429ac4 100644 --- a/nomos-libp2p/src/lib.rs +++ b/nomos-libp2p/src/lib.rs @@ -20,8 +20,6 @@ pub use libp2p::{ swarm::{dial_opts::DialOpts, DialError, NetworkBehaviour, SwarmEvent}, PeerId, SwarmBuilder, Transport, }; -pub use libp2p_stream; -use libp2p_stream::Control; pub use multiaddr::{multiaddr, Multiaddr, Protocol}; // TODO: Risc0 proofs are HUGE (220 Kb) and it's the only reason we need to have this @@ -36,7 +34,6 @@ pub struct Swarm { #[derive(NetworkBehaviour)] pub struct Behaviour { - stream: libp2p_stream::Behaviour, gossipsub: gossipsub::Behaviour, } @@ -50,10 +47,7 @@ impl Behaviour { .max_transmit_size(DATA_LIMIT) .build()?, )?; - Ok(Self { - stream: libp2p_stream::Behaviour::new(), - gossipsub, - }) + Ok(Self { gossipsub }) } } @@ -150,13 +144,6 @@ impl Swarm { gossipsub::IdentTopic::new(topic).hash() } - /// Returns a stream control that can be used to accept streams and establish streams to - /// other peers. - /// Stream controls can be cloned. - pub fn stream_control(&self) -> Control { - self.swarm.behaviour().stream.new_control() - } - pub fn multiaddr(ip: std::net::Ipv4Addr, port: u16) -> Multiaddr { multiaddr!(Ip4(ip), Udp(port), QuicV1) } @@ -175,68 +162,3 @@ fn compute_message_id(message: &Message) -> MessageId { hasher.update(&message.data); MessageId::from(hasher.finalize().to_vec()) } - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use futures::{AsyncReadExt, AsyncWriteExt, StreamExt}; - use libp2p::StreamProtocol; - use rand::Rng; - - use crate::{Swarm, SwarmConfig}; - - #[tokio::test] - async fn stream() { - // Init two swarms - let (config1, mut swarm1) = init_swarm(); - let (_, mut swarm2) = init_swarm(); - let swarm1_peer_id = *swarm1.swarm().local_peer_id(); - - // Dial to swarm1 - swarm2 - .connect(Swarm::multiaddr(config1.host, config1.port)) - .unwrap(); - - // Prepare stream controls - let mut stream_control1 = swarm1.stream_control(); - let mut stream_control2 = swarm2.stream_control(); - - // Poll swarms to make progress - tokio::spawn(async move { while (swarm1.next().await).is_some() {} }); - tokio::spawn(async move { while (swarm2.next().await).is_some() {} }); - - // Make swarm1 accept incoming streams - let protocol = StreamProtocol::new("/test"); - let mut incoming_streams = stream_control1.accept(protocol).unwrap(); - tokio::spawn(async move { - // If a new stream is established, write bytes and close the stream. 
- while let Some((_, mut stream)) = incoming_streams.next().await { - stream.write_all(&[1, 2, 3, 4]).await.unwrap(); - stream.close().await.unwrap(); - } - }); - - // Wait until the connection is established - tokio::time::sleep(Duration::from_secs(1)).await; - - // Establish a stream with swarm1 and read bytes - let mut stream = stream_control2 - .open_stream(swarm1_peer_id, StreamProtocol::new("/test")) - .await - .unwrap(); - let mut buf = [0u8; 4]; - stream.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [1, 2, 3, 4]); - } - - fn init_swarm() -> (SwarmConfig, Swarm) { - let config = SwarmConfig { - host: std::net::Ipv4Addr::new(127, 0, 0, 1), - port: rand::thread_rng().gen_range(10000..30000), - ..Default::default() - }; - let swarm = Swarm::build(&config).unwrap(); - (config, swarm) - } -} diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index 8dd3108e..c3826879 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -44,4 +44,3 @@ blake2 = { version = "0.10" } [features] default = ["libp2p"] libp2p = [] -mixnet = [] diff --git a/nomos-services/data-availability/tests/src/lib.rs b/nomos-services/data-availability/tests/src/lib.rs index 89bd1908..fe071f36 100644 --- a/nomos-services/data-availability/tests/src/lib.rs +++ b/nomos-services/data-availability/tests/src/lib.rs @@ -1,22 +1,13 @@ -// Networking is not essential for verifier and indexer tests. -// Libp2p network is chosen for consensus requirement, mixnet is ignored. -// -// Note: To enable rust-analyzer in modules, comment out the -// `#[cfg(not(feature = "mixnet"))]` lines (reenable when pushing). - #[cfg(test)] #[cfg(feature = "libp2p")] -#[cfg(not(feature = "mixnet"))] mod common; #[cfg(test)] #[cfg(feature = "libp2p")] -#[cfg(not(feature = "mixnet"))] mod indexer_integration; #[cfg(test)] #[cfg(feature = "libp2p")] -#[cfg(not(feature = "mixnet"))] mod verifier_integration; #[cfg(test)] diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index 2d630a60..85301096 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -22,7 +22,6 @@ futures = "0.3" parking_lot = "0.12" nomos-core = { path = "../../nomos-core" } nomos-libp2p = { path = "../../nomos-libp2p", optional = true } -mixnet = { path = "../../mixnet", optional = true } utoipa = { version = "4.0", optional = true } serde_json = { version = "1", optional = true } @@ -33,6 +32,5 @@ tokio = { version = "1", features = ["full"] } [features] default = [] libp2p = ["nomos-libp2p", "rand", "humantime-serde"] -mixnet = ["dep:mixnet"] mock = ["rand", "chrono"] openapi = ["dep:utoipa", "serde_json"] diff --git a/nomos-services/network/src/backends/libp2p/command.rs b/nomos-services/network/src/backends/libp2p/command.rs index 7d64b05b..6dcd31c0 100644 --- a/nomos-services/network/src/backends/libp2p/command.rs +++ b/nomos-services/network/src/backends/libp2p/command.rs @@ -1,4 +1,4 @@ -use nomos_libp2p::{libp2p::StreamProtocol, Multiaddr, PeerId}; +use nomos_libp2p::{Multiaddr, PeerId}; use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; @@ -21,11 +21,6 @@ pub enum Command { message: Box<[u8]>, retry_count: usize, }, - StreamSend { - peer_id: PeerId, - protocol: StreamProtocol, - data: Box<[u8]>, - }, } #[derive(Debug)] diff --git a/nomos-services/network/src/backends/libp2p/config.rs b/nomos-services/network/src/backends/libp2p/config.rs index bcee26e8..a45d4c4b 
100644 --- a/nomos-services/network/src/backends/libp2p/config.rs +++ b/nomos-services/network/src/backends/libp2p/config.rs @@ -1,9 +1,6 @@ use nomos_libp2p::{Multiaddr, SwarmConfig}; use serde::{Deserialize, Serialize}; -#[cfg(feature = "mixnet")] -use crate::backends::libp2p::mixnet::MixnetConfig; - #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Libp2pConfig { #[serde(flatten)] @@ -11,6 +8,4 @@ pub struct Libp2pConfig { // Initial peers to connect to #[serde(default)] pub initial_peers: Vec, - #[cfg(feature = "mixnet")] - pub mixnet: MixnetConfig, } diff --git a/nomos-services/network/src/backends/libp2p/mixnet.rs b/nomos-services/network/src/backends/libp2p/mixnet.rs deleted file mode 100644 index fc32a2ca..00000000 --- a/nomos-services/network/src/backends/libp2p/mixnet.rs +++ /dev/null @@ -1,201 +0,0 @@ -use std::net::SocketAddr; - -use futures::StreamExt; -use mixnet::{ - address::NodeAddress, - client::{MessageQueue, MixClient, MixClientConfig}, - node::{MixNode, MixNodeConfig, Output, PacketQueue}, - packet::PacketBody, -}; -use nomos_core::wire; -use nomos_libp2p::{ - libp2p::{Stream, StreamProtocol}, - libp2p_stream::IncomingStreams, - Multiaddr, Protocol, -}; -use serde::{Deserialize, Serialize}; -use tokio::{ - runtime::Handle, - sync::{mpsc, oneshot}, -}; - -use crate::backends::libp2p::{Command, Dial, Topic}; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct MixnetConfig { - pub mixclient: MixClientConfig, - pub mixnode: MixNodeConfig, -} - -pub(crate) const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/mixnet"); - -pub(crate) fn init_mixnet( - config: MixnetConfig, - runtime_handle: Handle, - cmd_tx: mpsc::Sender, - incoming_streams: IncomingStreams, -) -> MessageQueue { - // Run mixnode - let (mixnode, packet_queue) = MixNode::new(config.mixnode).unwrap(); - let libp2p_cmd_tx = cmd_tx.clone(); - let queue = packet_queue.clone(); - runtime_handle.spawn(async move { - run_mixnode(mixnode, queue, libp2p_cmd_tx).await; - }); - let handle = runtime_handle.clone(); - let queue = packet_queue.clone(); - runtime_handle.spawn(async move { - handle_incoming_streams(incoming_streams, queue, handle).await; - }); - - // Run mixclient - let (mixclient, message_queue) = MixClient::new(config.mixclient).unwrap(); - runtime_handle.spawn(async move { - run_mixclient(mixclient, packet_queue, cmd_tx).await; - }); - - message_queue -} - -async fn run_mixnode( - mut mixnode: MixNode, - packet_queue: PacketQueue, - cmd_tx: mpsc::Sender, -) { - while let Some(output) = mixnode.next().await { - match output { - Output::Forward(packet) => { - stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await; - } - Output::ReconstructedMessage(message) => match MixnetMessage::from_bytes(&message) { - Ok(msg) => { - cmd_tx - .send(Command::Broadcast { - topic: msg.topic, - message: msg.message, - }) - .await - .unwrap(); - } - Err(e) => { - tracing::error!("failed to parse message received from mixnet: {e}"); - } - }, - } - } -} - -async fn run_mixclient( - mut mixclient: MixClient, - packet_queue: PacketQueue, - cmd_tx: mpsc::Sender, -) { - while let Some(packet) = mixclient.next().await { - stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await; - } -} - -async fn handle_incoming_streams( - mut incoming_streams: IncomingStreams, - packet_queue: PacketQueue, - runtime_handle: Handle, -) { - while let Some((_, stream)) = incoming_streams.next().await { - let queue = packet_queue.clone(); - runtime_handle.spawn(async move { - if let 
Err(e) = handle_stream(stream, queue).await { - tracing::warn!("stream closed: {e}"); - } - }); - } -} - -async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> { - loop { - match PacketBody::read_from(&mut stream).await? { - Ok(packet_body) => { - packet_queue - .send(packet_body) - .await - .expect("The receiving half of packet queue should be always open"); - } - Err(e) => { - tracing::error!( - "failed to parse packet body. continuing reading the next packet: {e}" - ); - } - } - } -} - -async fn stream_send( - addr: NodeAddress, - packet_body: PacketBody, - cmd_tx: &mpsc::Sender, - packet_queue: &PacketQueue, -) { - let (tx, rx) = oneshot::channel(); - cmd_tx - .send(Command::Connect(Dial { - addr: multiaddr_from(addr), - retry_count: 3, - result_sender: tx, - })) - .await - .expect("Command receiver should be always open"); - - match rx.await { - Ok(Ok(peer_id)) => { - cmd_tx - .send(Command::StreamSend { - peer_id, - protocol: STREAM_PROTOCOL, - data: packet_body.bytes(), - }) - .await - .expect("Command receiver should be always open"); - } - Ok(Err(e)) => match e { - nomos_libp2p::DialError::NoAddresses => { - tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue"); - packet_queue - .send(packet_body) - .await - .expect("The receiving half of packet queue should be always open"); - } - _ => tracing::error!("failed to dial with unrecoverable error: {e}"), - }, - Err(e) => { - tracing::error!("channel closed before receiving: {e}"); - } - } -} - -fn multiaddr_from(addr: NodeAddress) -> Multiaddr { - match SocketAddr::from(addr) { - SocketAddr::V4(addr) => Multiaddr::empty() - .with(Protocol::Ip4(*addr.ip())) - .with(Protocol::Udp(addr.port())) - .with(Protocol::QuicV1), - SocketAddr::V6(addr) => Multiaddr::empty() - .with(Protocol::Ip6(*addr.ip())) - .with(Protocol::Udp(addr.port())) - .with(Protocol::QuicV1), - } -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub(crate) struct MixnetMessage { - pub topic: Topic, - pub message: Box<[u8]>, -} - -impl MixnetMessage { - pub fn as_bytes(&self) -> Vec { - wire::serialize(self).expect("Couldn't serialize MixnetMessage") - } - - pub fn from_bytes(data: &[u8]) -> Result { - wire::deserialize(data) - } -} diff --git a/nomos-services/network/src/backends/libp2p/mod.rs b/nomos-services/network/src/backends/libp2p/mod.rs index 39aa88c6..8cbe9344 100644 --- a/nomos-services/network/src/backends/libp2p/mod.rs +++ b/nomos-services/network/src/backends/libp2p/mod.rs @@ -1,7 +1,5 @@ mod command; mod config; -#[cfg(feature = "mixnet")] -pub mod mixnet; pub(crate) mod swarm; // std @@ -11,10 +9,6 @@ use self::swarm::SwarmHandler; // internal use super::NetworkBackend; -#[cfg(feature = "mixnet")] -use crate::backends::libp2p::mixnet::{init_mixnet, MixnetMessage, STREAM_PROTOCOL}; -#[cfg(feature = "mixnet")] -use ::mixnet::client::MessageQueue; pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash}; // crates use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; @@ -23,8 +17,6 @@ use tokio::sync::{broadcast, mpsc}; pub struct Libp2p { events_tx: broadcast::Sender, commands_tx: mpsc::Sender, - #[cfg(feature = "mixnet")] - mixclient_message_queue: MessageQueue, } #[derive(Debug)] @@ -55,14 +47,6 @@ impl NetworkBackend for Libp2p { let mut swarm_handler = SwarmHandler::new(&config, commands_tx.clone(), commands_rx, events_tx.clone()); - #[cfg(feature = "mixnet")] - let mixclient_message_queue = init_mixnet( - config.mixnet, - 
overwatch_handle.runtime().clone(), - commands_tx.clone(), - swarm_handler.incoming_streams(STREAM_PROTOCOL), - ); - overwatch_handle.runtime().spawn(async move { swarm_handler.run(config.initial_peers).await; }); @@ -70,35 +54,15 @@ impl NetworkBackend for Libp2p { Self { events_tx, commands_tx, - #[cfg(feature = "mixnet")] - mixclient_message_queue, } } - #[cfg(not(feature = "mixnet"))] async fn process(&self, msg: Self::Message) { if let Err(e) = self.commands_tx.send(msg).await { tracing::error!("failed to send command to nomos-libp2p: {e:?}"); } } - #[cfg(feature = "mixnet")] - async fn process(&self, msg: Self::Message) { - match msg { - Command::Broadcast { topic, message } => { - let msg = MixnetMessage { topic, message }; - if let Err(e) = self.mixclient_message_queue.send(msg.as_bytes()).await { - tracing::error!("failed to send messasge to mixclient: {e}"); - } - } - cmd => { - if let Err(e) = self.commands_tx.send(cmd).await { - tracing::error!("failed to send command to libp2p swarm: {e:?}"); - } - } - } - } - async fn subscribe( &mut self, kind: Self::EventKind, diff --git a/nomos-services/network/src/backends/libp2p/swarm.rs b/nomos-services/network/src/backends/libp2p/swarm.rs index f4610b6b..5fce1ac9 100644 --- a/nomos-services/network/src/backends/libp2p/swarm.rs +++ b/nomos-services/network/src/backends/libp2p/swarm.rs @@ -1,16 +1,7 @@ -use std::{ - collections::{hash_map::Entry, HashMap}, - time::Duration, -}; +use std::{collections::HashMap, time::Duration}; -use futures::AsyncWriteExt; -#[cfg(feature = "mixnet")] -use nomos_libp2p::libp2p_stream::IncomingStreams; use nomos_libp2p::{ - gossipsub, - libp2p::{swarm::ConnectionId, Stream, StreamProtocol}, - libp2p_stream::{Control, OpenStreamError}, - BehaviourEvent, Multiaddr, PeerId, Swarm, SwarmEvent, + gossipsub, libp2p::swarm::ConnectionId, BehaviourEvent, Multiaddr, PeerId, Swarm, SwarmEvent, }; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_stream::StreamExt; @@ -24,8 +15,6 @@ use super::{ pub struct SwarmHandler { pub swarm: Swarm, - stream_control: Control, - streams: HashMap, pub pending_dials: HashMap, pub commands_tx: mpsc::Sender, pub commands_rx: mpsc::Receiver, @@ -53,15 +42,12 @@ impl SwarmHandler { events_tx: broadcast::Sender, ) -> Self { let swarm = Swarm::build(&config.inner).unwrap(); - let stream_control = swarm.stream_control(); // Keep the dialing history since swarm.connect doesn't return the result synchronously let pending_dials = HashMap::::new(); Self { swarm, - stream_control, - streams: HashMap::new(), pending_dials, commands_tx, commands_rx, @@ -173,24 +159,6 @@ impl SwarmHandler { } => { self.broadcast_and_retry(topic, message, retry_count).await; } - Command::StreamSend { - peer_id, - protocol, - data, - } => { - tracing::debug!("StreamSend to {peer_id}"); - match self.open_stream(peer_id, protocol).await { - Ok(stream) => { - if let Err(e) = stream.write_all(&data).await { - tracing::error!("failed to write to the stream with ${peer_id}: {e}"); - self.close_stream(&peer_id).await; - } - } - Err(e) => { - tracing::error!("failed to open stream with {peer_id}: {e}"); - } - } - } } } @@ -287,26 +255,4 @@ impl SwarmHandler { fn exp_backoff(retry: usize) -> Duration { std::time::Duration::from_secs(BACKOFF.pow(retry as u32)) } - - #[cfg(feature = "mixnet")] - pub fn incoming_streams(&mut self, protocol: StreamProtocol) -> IncomingStreams { - self.stream_control.accept(protocol).unwrap() - } - - async fn open_stream( - &mut self, - peer_id: PeerId, - protocol: StreamProtocol, - ) -> 
diff --git a/nomos-services/network/src/backends/libp2p/swarm.rs b/nomos-services/network/src/backends/libp2p/swarm.rs
index f4610b6b..5fce1ac9 100644
--- a/nomos-services/network/src/backends/libp2p/swarm.rs
+++ b/nomos-services/network/src/backends/libp2p/swarm.rs
@@ -1,16 +1,7 @@
-use std::{
-    collections::{hash_map::Entry, HashMap},
-    time::Duration,
-};
+use std::{collections::HashMap, time::Duration};
 
-use futures::AsyncWriteExt;
-#[cfg(feature = "mixnet")]
-use nomos_libp2p::libp2p_stream::IncomingStreams;
 use nomos_libp2p::{
-    gossipsub,
-    libp2p::{swarm::ConnectionId, Stream, StreamProtocol},
-    libp2p_stream::{Control, OpenStreamError},
-    BehaviourEvent, Multiaddr, PeerId, Swarm, SwarmEvent,
+    gossipsub, libp2p::swarm::ConnectionId, BehaviourEvent, Multiaddr, PeerId, Swarm, SwarmEvent,
 };
 use tokio::sync::{broadcast, mpsc, oneshot};
 use tokio_stream::StreamExt;
@@ -24,8 +15,6 @@ use super::{
 
 pub struct SwarmHandler {
     pub swarm: Swarm,
-    stream_control: Control,
-    streams: HashMap,
     pub pending_dials: HashMap,
     pub commands_tx: mpsc::Sender,
     pub commands_rx: mpsc::Receiver,
@@ -53,15 +42,12 @@ impl SwarmHandler {
         events_tx: broadcast::Sender,
     ) -> Self {
         let swarm = Swarm::build(&config.inner).unwrap();
-        let stream_control = swarm.stream_control();
 
         // Keep the dialing history since swarm.connect doesn't return the result synchronously
         let pending_dials = HashMap::::new();
 
         Self {
             swarm,
-            stream_control,
-            streams: HashMap::new(),
             pending_dials,
             commands_tx,
             commands_rx,
@@ -173,24 +159,6 @@ impl SwarmHandler {
             } => {
                 self.broadcast_and_retry(topic, message, retry_count).await;
             }
-            Command::StreamSend {
-                peer_id,
-                protocol,
-                data,
-            } => {
-                tracing::debug!("StreamSend to {peer_id}");
-                match self.open_stream(peer_id, protocol).await {
-                    Ok(stream) => {
-                        if let Err(e) = stream.write_all(&data).await {
-                            tracing::error!("failed to write to the stream with ${peer_id}: {e}");
-                            self.close_stream(&peer_id).await;
-                        }
-                    }
-                    Err(e) => {
-                        tracing::error!("failed to open stream with {peer_id}: {e}");
-                    }
-                }
-            }
         }
     }
 
@@ -287,26 +255,4 @@ impl SwarmHandler {
     fn exp_backoff(retry: usize) -> Duration {
         std::time::Duration::from_secs(BACKOFF.pow(retry as u32))
     }
-
-    #[cfg(feature = "mixnet")]
-    pub fn incoming_streams(&mut self, protocol: StreamProtocol) -> IncomingStreams {
-        self.stream_control.accept(protocol).unwrap()
-    }
-
-    async fn open_stream(
-        &mut self,
-        peer_id: PeerId,
-        protocol: StreamProtocol,
-    ) -> Result<&mut Stream, OpenStreamError> {
-        if let Entry::Vacant(entry) = self.streams.entry(peer_id) {
-            entry.insert(self.stream_control.open_stream(peer_id, protocol).await?);
-        }
-        Ok(self.streams.get_mut(&peer_id).unwrap())
-    }
-
-    async fn close_stream(&mut self, peer_id: &PeerId) {
-        if let Some(mut stream) = self.streams.remove(peer_id) {
-            let _ = stream.close().await;
-        }
-    }
 }
diff --git a/tests/Cargo.toml b/tests/Cargo.toml
index b82f4f97..0c65f766 100644
--- a/tests/Cargo.toml
+++ b/tests/Cargo.toml
@@ -9,7 +9,6 @@ blst = { version = "0.3.11" }
 nomos-node = { path = "../nodes/nomos-node", default-features = false }
 nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
 cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" }
-mixnet = { path = "../mixnet" }
 nomos-log = { path = "../nomos-services/log" }
 nomos-api = { path = "../nomos-services/api" }
 overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
@@ -54,5 +53,4 @@ name = "test_cli"
 path = "src/tests/cli.rs"
 
 [features]
-mixnet = ["nomos-network/mixnet"]
 metrics = ["nomos-node/metrics"]
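
The broadcast retry path retained in swarm.rs above backs off exponentially: the n-th retry waits BACKOFF^n seconds. A tiny illustration of that schedule, assuming a base of 2 since the real `BACKOFF` constant is defined outside the hunks shown here:

    use std::time::Duration;

    // Assumed base; the actual BACKOFF constant lives elsewhere in swarm.rs.
    const BACKOFF: u64 = 2;

    // Same shape as the retained `exp_backoff` helper.
    fn exp_backoff(retry: usize) -> Duration {
        Duration::from_secs(BACKOFF.pow(retry as u32))
    }

    fn main() {
        // With the assumed base: retry 0 -> 1s, 1 -> 2s, 2 -> 4s, 3 -> 8s.
        for retry in 0..4 {
            println!("retry {retry}: wait {:?}", exp_backoff(retry));
        }
    }
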
diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs
index e01f12f3..44abb65e 100644
--- a/tests/src/nodes/nomos.rs
+++ b/tests/src/nodes/nomos.rs
@@ -10,13 +10,6 @@ use cl::{InputWitness, NoteWitness, NullifierSecret};
 use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig};
 use cryptarchia_ledger::LedgerState;
 use kzgrs_backend::dispersal::BlobInfo;
-#[cfg(feature = "mixnet")]
-use mixnet::{
-    address::NodeAddress,
-    client::MixClientConfig,
-    node::MixNodeConfig,
-    topology::{MixNodeInfo, MixnetTopology},
-};
 use nomos_core::{block::Block, header::HeaderId, staking::NMO_UNIT};
 use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings;
 use nomos_da_indexer::IndexerSettings;
@@ -31,8 +24,6 @@ use nomos_da_verifier::DaVerifierServiceSettings;
 use nomos_libp2p::{Multiaddr, PeerId, SwarmConfig};
 use nomos_log::{LoggerBackend, LoggerFormat};
 use nomos_mempool::MempoolMetrics;
-#[cfg(feature = "mixnet")]
-use nomos_network::backends::libp2p::mixnet::MixnetConfig;
 use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
 use nomos_node::api::paths::{
     CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK,
@@ -55,8 +46,6 @@ static CLIENT: Lazy = Lazy::new(Client::new);
 const NOMOS_BIN: &str = "../target/debug/nomos-node";
 const DEFAULT_SLOT_TIME: u64 = 2;
 const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME";
-#[cfg(feature = "mixnet")]
-const NUM_MIXNODE_CANDIDATES: usize = 2;
 
 pub struct NomosNode {
     addr: SocketAddr,
@@ -273,9 +262,6 @@ impl Node for NomosNode {
             thread_rng().fill(id);
         }
 
-        #[cfg(feature = "mixnet")]
-        let (mixclient_config, mixnode_configs) = create_mixnet_config(&ids);
-
         let notes = ids
             .iter()
             .map(|&id| {
@@ -322,11 +308,6 @@ impl Node for NomosNode {
                     vec![coin],
                     time_config.clone(),
                     da.clone(),
-                    #[cfg(feature = "mixnet")]
-                    MixnetConfig {
-                        mixclient: mixclient_config.clone(),
-                        mixnode: mixnode_configs[i].clone(),
-                    },
                 )
             })
             .collect::>();
@@ -345,22 +326,6 @@ impl Node for NomosNode {
             config.da_network.backend.addresses = peer_addresses.iter().cloned().collect();
         }
 
-        #[cfg(feature = "mixnet")]
-        {
-            // Build a topology using only a subset of nodes.
-            let mixnode_candidates = configs
-                .iter()
-                .take(NUM_MIXNODE_CANDIDATES)
-                .collect::>();
-            let topology = build_mixnet_topology(&mixnode_candidates);
-
-            // Set the topology to all configs
-            for config in &mut configs {
-                config.network.backend.mixnet.mixclient.topology = topology.clone();
-            }
-            configs
-        }
-        #[cfg(not(feature = "mixnet"))]
         configs
     }
 }
@@ -376,52 +341,6 @@ struct GetRangeReq {
     pub range: Range<[u8; 8]>,
 }
 
-#[cfg(feature = "mixnet")]
-fn create_mixnet_config(ids: &[[u8; 32]]) -> (MixClientConfig, Vec) {
-    use std::num::NonZeroU8;
-
-    let mixnode_configs: Vec = ids
-        .iter()
-        .map(|id| MixNodeConfig {
-            encryption_private_key: *id,
-            delay_rate_per_min: 100000000.0,
-        })
-        .collect();
-    // Build an empty topology because it will be constructed with meaningful node infos later
-    let topology = MixnetTopology::new(Vec::new(), 0, 0, [1u8; 32]).unwrap();
-
-    (
-        MixClientConfig {
-            topology,
-            emission_rate_per_min: 120.0,
-            redundancy: NonZeroU8::new(1).unwrap(),
-        },
-        mixnode_configs,
-    )
-}
-
-#[cfg(feature = "mixnet")]
-fn build_mixnet_topology(mixnode_candidates: &[&Config]) -> MixnetTopology {
-    use mixnet::crypto::public_key_from;
-    use std::net::{IpAddr, Ipv4Addr};
-
-    let candidates = mixnode_candidates
-        .iter()
-        .map(|config| {
-            MixNodeInfo::new(
-                NodeAddress::from(SocketAddr::new(
-                    IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
-                    config.network.backend.inner.port,
-                )),
-                public_key_from(config.network.backend.mixnet.mixnode.encryption_private_key),
-            )
-            .unwrap()
-        })
-        .collect::>();
-    let num_layers = candidates.len();
-    MixnetTopology::new(candidates, num_layers, 1, [1u8; 32]).unwrap()
-}
-
 fn secret_key_to_peer_id(node_key: nomos_libp2p::ed25519::SecretKey) -> PeerId {
     PeerId::from_public_key(
         &nomos_libp2p::ed25519::Keypair::from(node_key)
@@ -450,7 +369,6 @@ fn create_node_config(
     notes: Vec,
     time: TimeConfig,
     da_config: DaConfig,
-    #[cfg(feature = "mixnet")] mixnet_config: MixnetConfig,
 ) -> Config {
     let swarm_config: SwarmConfig = Default::default();
     let node_key = swarm_config.node_key.clone();
@@ -463,8 +381,6 @@ fn create_node_config(
             backend: Libp2pConfig {
                 inner: swarm_config,
                 initial_peers: vec![],
-                #[cfg(feature = "mixnet")]
-                mixnet: mixnet_config,
             },
         },
         cryptarchia: CryptarchiaSettings {