Mix: remove all of the previous mixnet stuff (#822)

This commit is contained in:
Youngjoon Lee 2024-10-17 00:19:20 +09:00 committed by GitHub
parent 228d4b2147
commit cb86528a4a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 7 additions and 1837 deletions

View File

@ -16,7 +16,7 @@ jobs:
strategy: strategy:
fail-fast: true fail-fast: true
matrix: matrix:
feature: [ libp2p, "libp2p,mixnet" ] feature: [ libp2p ]
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
with: with:
@ -47,7 +47,7 @@ jobs:
strategy: strategy:
fail-fast: false # all OSes should be tested even if one fails (default: true) fail-fast: false # all OSes should be tested even if one fails (default: true)
matrix: matrix:
feature: [ libp2p, "libp2p,mixnet" ] feature: [ libp2p ]
os: [ self-hosted, macos-latest ] # drop windows for now as risc0 does not support it os: [ self-hosted, macos-latest ] # drop windows for now as risc0 does not support it
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
@ -132,7 +132,7 @@ jobs:
runs-on: self-hosted runs-on: self-hosted
strategy: strategy:
matrix: matrix:
feature: [ libp2p, "libp2p,mixnet" ] feature: [ libp2p ]
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
with: with:

View File

@ -30,7 +30,6 @@ members = [
"nomos-utils", "nomos-utils",
"nodes/nomos-node", "nodes/nomos-node",
"nodes/nomos-executor", "nodes/nomos-executor",
"mixnet",
"consensus/carnot-engine", "consensus/carnot-engine",
"consensus/cryptarchia-engine", "consensus/cryptarchia-engine",
"ledger/cryptarchia-ledger", "ledger/cryptarchia-ledger",

View File

@ -1,22 +0,0 @@
[package]
name = "mixnet"
version = "0.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = "0.8"
rand_distr = "0.4"
nomos-utils = { path = "../nomos-utils" }
thiserror = "1.0.57"
tokio = { version = "1.36.0", features = ["sync"] }
serde = { version = "1.0.197", features = ["derive"] }
sphinx-packet = "0.1.0"
nym-sphinx-addressing = { package = "nym-sphinx-addressing", git = "https://github.com/nymtech/nym", tag = "v1.1.22" }
tracing = "0.1.40"
uuid = { version = "1.7.0", features = ["v4"] }
futures = "0.3"
[dev-dependencies]
tokio = { version = "1.36.0", features = ["test-util"] }

View File

@ -1,53 +0,0 @@
use std::net::SocketAddr;
use nym_sphinx_addressing::nodes::NymNodeRoutingAddress;
use serde::{Deserialize, Serialize};
use crate::error::MixnetError;
/// Represents an address of mix node.
///
/// This just contains a single [`SocketAddr`], but has conversion functions
/// for various address types defined in the `sphinx-packet` crate.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
pub struct NodeAddress(SocketAddr);
impl From<SocketAddr> for NodeAddress {
fn from(address: SocketAddr) -> Self {
Self(address)
}
}
impl From<NodeAddress> for SocketAddr {
fn from(address: NodeAddress) -> Self {
address.0
}
}
impl TryInto<sphinx_packet::route::NodeAddressBytes> for NodeAddress {
type Error = MixnetError;
fn try_into(self) -> Result<sphinx_packet::route::NodeAddressBytes, Self::Error> {
Ok(NymNodeRoutingAddress::from(SocketAddr::from(self)).try_into()?)
}
}
impl TryFrom<sphinx_packet::route::NodeAddressBytes> for NodeAddress {
type Error = MixnetError;
fn try_from(value: sphinx_packet::route::NodeAddressBytes) -> Result<Self, Self::Error> {
Ok(Self::from(SocketAddr::from(
NymNodeRoutingAddress::try_from(value)?,
)))
}
}
impl TryFrom<sphinx_packet::route::DestinationAddressBytes> for NodeAddress {
type Error = MixnetError;
fn try_from(value: sphinx_packet::route::DestinationAddressBytes) -> Result<Self, Self::Error> {
Self::try_from(sphinx_packet::route::NodeAddressBytes::from_bytes(
value.as_bytes(),
))
}
}

View File

@ -1,206 +0,0 @@
use std::{
collections::VecDeque,
num::NonZeroU8,
pin::Pin,
task::{Context, Poll},
};
use futures::{Future, Stream};
use rand::rngs::OsRng;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::{error::MixnetError, packet::Packet, poisson::Poisson, topology::MixnetTopology};
/// Mix client implementation that is used to schedule messages to be sent to the mixnet.
/// Messages inserted to the [`MessageQueue`] are scheduled according to the Poisson interals
/// and returns from [`MixClient.next()`] when it is ready to be sent to the mixnet.
/// If there is no messages inserted to the [`MessageQueue`], cover packets are generated and
/// returned from [`MixClient.next()`].
pub struct MixClient {
config: MixClientConfig,
poisson: Poisson,
message_queue: mpsc::Receiver<Vec<u8>>,
real_packet_queue: VecDeque<Packet>,
delay: Option<Pin<Box<tokio::time::Sleep>>>,
}
/// Mix client configuration
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixClientConfig {
/// Mixnet topology
pub topology: MixnetTopology,
/// Poisson rate for packet emissions (per minute)
pub emission_rate_per_min: f64,
/// Packet redundancy for passive retransmission
pub redundancy: NonZeroU8,
}
const MESSAGE_QUEUE_SIZE: usize = 256;
/// Queue for sending messages to [`MixClient`]
pub type MessageQueue = mpsc::Sender<Vec<u8>>;
impl MixClient {
/// Creates a [`MixClient`] and a [`MessageQueue`].
///
/// This returns [`MixnetError`] if the given `config` is invalid.
pub fn new(config: MixClientConfig) -> Result<(Self, MessageQueue), MixnetError> {
let poisson = Poisson::new(config.emission_rate_per_min)?;
let (tx, rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
Ok((
Self {
config,
poisson,
message_queue: rx,
real_packet_queue: VecDeque::new(),
delay: None,
},
tx,
))
}
}
impl Stream for MixClient {
type Item = Packet;
/// Returns a next [`Packet`] to be emitted, if it exists and the Poisson timer is done.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.delay.is_none() {
// We've never set an initial delay. Let's do it now.
cx.waker().wake_by_ref();
self.delay = Some(Box::pin(tokio::time::sleep(
self.poisson.interval(&mut OsRng),
)));
return Poll::Pending;
}
match self.delay.as_mut().unwrap().as_mut().poll(cx) {
Poll::Pending => {
// The delay hasn't elapsed yet.
// The current task is automatically scheduled to be woken up once the timer elapses,
// thanks to the `tokio::time::Sleep.poll(cx)`.
Poll::Pending
}
Poll::Ready(_) => {
// The delay has elapsed. Let's reset the delay and return the next packet.
let next_interval = self.poisson.interval(&mut OsRng);
let delay = self.delay.as_mut().unwrap();
let next_deadline = delay.deadline() + next_interval;
delay.as_mut().reset(next_deadline);
match self.next_packet() {
Ok(packet) => Poll::Ready(Some(packet)),
Err(e) => {
tracing::error!(
"failed to find a next packet to emit. skipping to the next turn: {e}"
);
Poll::Pending
}
}
}
}
}
}
impl MixClient {
const DROP_COVER_MSG: &'static [u8] = b"drop cover";
// Returns either a real packet or a drop cover packet.
fn next_packet(&mut self) -> Result<Packet, MixnetError> {
// If there is any redundant real packet scheduled, return it.
if let Some(packet) = self.real_packet_queue.pop_front() {
return Ok(packet);
}
match self.message_queue.try_recv() {
Ok(msg) => {
// If there is any message received, build real packets out of it and
// schedule them in the queue.
for packet in Packet::build_real(msg, &self.config.topology)? {
for _ in 0..self.config.redundancy.get() {
self.real_packet_queue.push_back(packet.clone());
}
}
Ok(self
.real_packet_queue
.pop_front()
.expect("real packet queue should not be empty"))
}
Err(_) => {
// If no message received, generate and return a drop cover packet.
let mut packets = Packet::build_drop_cover(
Vec::from(Self::DROP_COVER_MSG),
&self.config.topology,
)?;
Ok(packets.pop().expect("drop cover should not be empty"))
}
}
}
}
#[cfg(test)]
mod tests {
use std::{num::NonZeroU8, time::Instant};
use futures::StreamExt;
use crate::{
client::MixClientConfig,
topology::{
tests::{gen_entropy, gen_mixnodes},
MixnetTopology,
},
};
use super::MixClient;
#[tokio::test]
async fn poisson_emission() {
let emission_rate_per_min = 60.0;
let (mut client, _) = MixClient::new(MixClientConfig {
topology: MixnetTopology::new(gen_mixnodes(10), 3, 2, gen_entropy()).unwrap(),
emission_rate_per_min,
redundancy: NonZeroU8::new(3).unwrap(),
})
.unwrap();
let mut ts = Instant::now();
let mut intervals = Vec::new();
for _ in 0..30 {
assert!(client.next().await.is_some());
let now = Instant::now();
intervals.push(now - ts);
ts = now;
}
let avg_sec = intervals.iter().map(|d| d.as_secs()).sum::<u64>() / intervals.len() as u64;
let expected_avg_sec = (60.0 / emission_rate_per_min) as u64;
assert!(
avg_sec.abs_diff(expected_avg_sec) <= 1,
"{avg_sec} -{expected_avg_sec}"
);
}
#[tokio::test]
async fn real_packet_emission() {
let (mut client, msg_queue) = MixClient::new(MixClientConfig {
topology: MixnetTopology::new(gen_mixnodes(10), 3, 2, gen_entropy()).unwrap(),
emission_rate_per_min: 360.0,
redundancy: NonZeroU8::new(3).unwrap(),
})
.unwrap();
msg_queue.send("hello".as_bytes().into()).await.unwrap();
// Check if the next 3 packets are the same, according to the redundancy
let packet = client.next().await.unwrap();
assert_eq!(packet, client.next().await.unwrap());
assert_eq!(packet, client.next().await.unwrap());
// Check if the next packet is different (drop cover)
assert_ne!(packet, client.next().await.unwrap());
}
}

View File

@ -1,6 +0,0 @@
use sphinx_packet::crypto::{PrivateKey, PublicKey, PRIVATE_KEY_SIZE, PUBLIC_KEY_SIZE};
/// Converts a mixnode private key to a public key
pub fn public_key_from(private_key: [u8; PRIVATE_KEY_SIZE]) -> [u8; PUBLIC_KEY_SIZE] {
*PublicKey::from(&PrivateKey::from(private_key)).as_bytes()
}

View File

@ -1,34 +0,0 @@
/// Mixnet Errors
#[derive(thiserror::Error, Debug)]
pub enum MixnetError {
/// Invalid topology size
#[error("invalid mixnet topology size")]
InvalidTopologySize,
/// Invalid packet flag
#[error("invalid packet flag")]
InvalidPacketFlag,
/// Invalid fragment header
#[error("invalid fragment header")]
InvalidFragmentHeader,
/// Invalid fragment set ID
#[error("invalid fragment set ID: {0}")]
InvalidFragmentSetId(#[from] uuid::Error),
/// Invalid fragment ID
#[error("invalid fragment ID")]
InvalidFragmentId,
/// Message too long
#[error("message too long: {0} bytes")]
MessageTooLong(usize),
/// Invalid message
#[error("invalid message")]
InvalidMessage,
/// Node address error
#[error("node address error: {0}")]
NodeAddressError(#[from] nym_sphinx_addressing::nodes::NymNodeRoutingAddressError),
/// Sphinx packet error
#[error("sphinx packet error: {0}")]
SphinxPacketError(#[from] sphinx_packet::Error),
/// Exponential distribution error
#[error("exponential distribution error: {0}")]
ExponentialError(#[from] rand_distr::ExpError),
}

View File

@ -1,280 +0,0 @@
use std::collections::HashMap;
use sphinx_packet::{constants::PAYLOAD_SIZE, payload::PAYLOAD_OVERHEAD_SIZE};
use uuid::Uuid;
use crate::error::MixnetError;
pub(crate) struct FragmentSet(Vec<Fragment>);
impl FragmentSet {
const MAX_PLAIN_PAYLOAD_SIZE: usize = PAYLOAD_SIZE - PAYLOAD_OVERHEAD_SIZE;
const CHUNK_SIZE: usize = Self::MAX_PLAIN_PAYLOAD_SIZE - FragmentHeader::SIZE;
pub(crate) fn new(msg: &[u8]) -> Result<Self, MixnetError> {
// For now, we don't support more than `u8::MAX + 1` fragments.
// If needed, we can devise the FragmentSet chaining to support larger messages, like Nym.
let last_fragment_id = FragmentId::try_from(Self::num_chunks(msg) - 1)
.map_err(|_| MixnetError::MessageTooLong(msg.len()))?;
let set_id = FragmentSetId::new();
Ok(FragmentSet(
msg.chunks(Self::CHUNK_SIZE)
.enumerate()
.map(|(i, chunk)| Fragment {
header: FragmentHeader {
set_id,
last_fragment_id,
fragment_id: FragmentId::try_from(i)
.expect("i is always in the right range"),
},
body: Vec::from(chunk),
})
.collect(),
))
}
fn num_chunks(msg: &[u8]) -> usize {
msg.len().div_ceil(Self::CHUNK_SIZE)
}
}
impl AsRef<Vec<Fragment>> for FragmentSet {
fn as_ref(&self) -> &Vec<Fragment> {
&self.0
}
}
#[derive(PartialEq, Eq, Debug, Clone)]
pub(crate) struct Fragment {
header: FragmentHeader,
body: Vec<u8>,
}
impl Fragment {
pub(crate) fn bytes(&self) -> Vec<u8> {
let mut out = Vec::with_capacity(FragmentHeader::SIZE + self.body.len());
out.extend(self.header.bytes());
out.extend(&self.body);
out
}
pub(crate) fn from_bytes(value: &[u8]) -> Result<Self, MixnetError> {
Ok(Self {
header: FragmentHeader::from_bytes(&value[0..FragmentHeader::SIZE])?,
body: value[FragmentHeader::SIZE..].to_vec(),
})
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
struct FragmentSetId(Uuid);
impl FragmentSetId {
const SIZE: usize = 16;
fn new() -> Self {
Self(Uuid::new_v4())
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
struct FragmentId(u8);
impl FragmentId {
const SIZE: usize = std::mem::size_of::<u8>();
}
impl TryFrom<usize> for FragmentId {
type Error = MixnetError;
fn try_from(id: usize) -> Result<Self, Self::Error> {
if id > u8::MAX as usize {
return Err(MixnetError::InvalidFragmentId);
}
Ok(Self(id as u8))
}
}
impl From<FragmentId> for usize {
fn from(id: FragmentId) -> Self {
id.0 as usize
}
}
#[derive(PartialEq, Eq, Debug, Clone)]
struct FragmentHeader {
set_id: FragmentSetId,
last_fragment_id: FragmentId,
fragment_id: FragmentId,
}
impl FragmentHeader {
const SIZE: usize = FragmentSetId::SIZE + 2 * FragmentId::SIZE;
fn bytes(&self) -> [u8; Self::SIZE] {
let mut out = [0u8; Self::SIZE];
out[0..FragmentSetId::SIZE].copy_from_slice(self.set_id.0.as_bytes());
out[FragmentSetId::SIZE] = self.last_fragment_id.0;
out[FragmentSetId::SIZE + FragmentId::SIZE] = self.fragment_id.0;
out
}
fn from_bytes(value: &[u8]) -> Result<Self, MixnetError> {
if value.len() != Self::SIZE {
return Err(MixnetError::InvalidFragmentHeader);
}
Ok(Self {
set_id: FragmentSetId(Uuid::from_slice(&value[0..FragmentSetId::SIZE])?),
last_fragment_id: FragmentId(value[FragmentSetId::SIZE]),
fragment_id: FragmentId(value[FragmentSetId::SIZE + FragmentId::SIZE]),
})
}
}
pub struct MessageReconstructor {
fragment_sets: HashMap<FragmentSetId, FragmentSetReconstructor>,
}
impl MessageReconstructor {
pub fn new() -> Self {
Self {
fragment_sets: HashMap::new(),
}
}
/// Adds a fragment to the reconstructor and tries to reconstruct a message from the fragment set.
/// This returns `None` if the message has not been reconstructed yet.
pub fn add_and_reconstruct(&mut self, fragment: Fragment) -> Option<Vec<u8>> {
let set_id = fragment.header.set_id;
let reconstructed_msg = self
.fragment_sets
.entry(set_id)
.or_insert(FragmentSetReconstructor::new(
fragment.header.last_fragment_id,
))
.add(fragment)
.try_reconstruct_message()?;
// A message has been reconstructed completely from the fragment set.
// Delete the fragment set from the reconstructor.
self.fragment_sets.remove(&set_id);
Some(reconstructed_msg)
}
}
struct FragmentSetReconstructor {
last_fragment_id: FragmentId,
fragments: HashMap<FragmentId, Fragment>,
// For mem optimization, accumulates the expected message size
// whenever a new fragment is added to the `fragments`.
message_size: usize,
}
impl FragmentSetReconstructor {
fn new(last_fragment_id: FragmentId) -> Self {
Self {
last_fragment_id,
fragments: HashMap::new(),
message_size: 0,
}
}
fn add(&mut self, fragment: Fragment) -> &mut Self {
self.message_size += fragment.body.len();
if let Some(old_fragment) = self.fragments.insert(fragment.header.fragment_id, fragment) {
// In the case when a new fragment replaces the old one, adjust the `meesage_size`.
// e.g. The same fragment has been received multiple times.
self.message_size -= old_fragment.body.len();
}
self
}
/// Merges all fragments gathered if possible
fn try_reconstruct_message(&self) -> Option<Vec<u8>> {
(self.fragments.len() - 1 == self.last_fragment_id.into()).then(|| {
let mut msg = Vec::with_capacity(self.message_size);
for id in 0..=self.last_fragment_id.0 {
msg.extend(&self.fragments.get(&FragmentId(id)).unwrap().body);
}
msg
})
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use rand::RngCore;
use super::*;
#[test]
fn fragment_header() {
let header = FragmentHeader {
set_id: FragmentSetId::new(),
last_fragment_id: FragmentId(19),
fragment_id: FragmentId(0),
};
let bz = header.bytes();
assert_eq!(FragmentHeader::SIZE, bz.len());
assert_eq!(header, FragmentHeader::from_bytes(bz.as_slice()).unwrap());
}
#[test]
fn fragment() {
let fragment = Fragment {
header: FragmentHeader {
set_id: FragmentSetId::new(),
last_fragment_id: FragmentId(19),
fragment_id: FragmentId(0),
},
body: vec![1, 2, 3, 4],
};
let bz = fragment.bytes();
assert_eq!(FragmentHeader::SIZE + fragment.body.len(), bz.len());
assert_eq!(fragment, Fragment::from_bytes(bz.as_slice()).unwrap());
}
#[test]
fn fragment_set() {
let mut msg = vec![0u8; FragmentSet::CHUNK_SIZE * 3 + FragmentSet::CHUNK_SIZE / 2];
rand::thread_rng().fill_bytes(&mut msg);
assert_eq!(4, FragmentSet::num_chunks(&msg));
let set = FragmentSet::new(&msg).unwrap();
assert_eq!(4, set.as_ref().iter().len());
assert_eq!(
1,
HashSet::<FragmentSetId>::from_iter(
set.as_ref().iter().map(|fragment| fragment.header.set_id)
)
.len()
);
set.as_ref()
.iter()
.enumerate()
.for_each(|(i, fragment)| assert_eq!(i, fragment.header.fragment_id.0 as usize));
}
#[test]
fn message_reconstructor() {
let mut msg = vec![0u8; FragmentSet::CHUNK_SIZE * 2];
rand::thread_rng().fill_bytes(&mut msg);
let set = FragmentSet::new(&msg).unwrap();
let mut reconstructor = MessageReconstructor::new();
let mut fragments = set.as_ref().iter();
assert_eq!(
None,
reconstructor.add_and_reconstruct(fragments.next().unwrap().clone())
);
assert_eq!(
Some(msg),
reconstructor.add_and_reconstruct(fragments.next().unwrap().clone())
);
}
}

View File

@ -1,21 +0,0 @@
//! Mixnet
#![deny(missing_docs, warnings)]
#![forbid(unsafe_code)]
/// Mix node address
pub mod address;
/// Mix client
pub mod client;
/// Mixnet cryptography
pub mod crypto;
/// Mixnet errors
pub mod error;
mod fragment;
/// Mix node
pub mod node;
/// Mix packet
pub mod packet;
/// Poisson distribution
mod poisson;
/// Mixnet topology
pub mod topology;

View File

@ -1,191 +0,0 @@
use rand::rngs::OsRng;
use serde::{Deserialize, Serialize};
use sphinx_packet::crypto::{PrivateKey, PRIVATE_KEY_SIZE};
use tokio::sync::mpsc;
use crate::{
error::MixnetError,
fragment::{Fragment, MessageReconstructor},
packet::{Message, Packet, PacketBody},
poisson::Poisson,
};
/// Mix node implementation that returns Sphinx packets which needs to be forwarded to next mix nodes,
/// or messages reconstructed from Sphinx packets delivered through all mix layers.
pub struct MixNode {
output_rx: mpsc::UnboundedReceiver<Output>,
}
struct MixNodeRunner {
_config: MixNodeConfig,
encryption_private_key: PrivateKey,
poisson: Poisson,
packet_queue: mpsc::Receiver<PacketBody>,
message_reconstructor: MessageReconstructor,
output_tx: mpsc::UnboundedSender<Output>,
}
/// Mix node configuration
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixNodeConfig {
/// Private key for decrypting Sphinx packets
pub encryption_private_key: [u8; PRIVATE_KEY_SIZE],
/// Poisson delay rate per minutes
pub delay_rate_per_min: f64,
}
const PACKET_QUEUE_SIZE: usize = 256;
/// Queue for sending packets to [`MixNode`]
pub type PacketQueue = mpsc::Sender<PacketBody>;
impl MixNode {
/// Creates a [`MixNode`] and a [`PacketQueue`].
///
/// This returns [`MixnetError`] if the given `config` is invalid.
pub fn new(config: MixNodeConfig) -> Result<(Self, PacketQueue), MixnetError> {
let encryption_private_key = PrivateKey::from(config.encryption_private_key);
let poisson = Poisson::new(config.delay_rate_per_min)?;
let (packet_tx, packet_rx) = mpsc::channel(PACKET_QUEUE_SIZE);
let (output_tx, output_rx) = mpsc::unbounded_channel();
let mixnode_runner = MixNodeRunner {
_config: config,
encryption_private_key,
poisson,
packet_queue: packet_rx,
message_reconstructor: MessageReconstructor::new(),
output_tx,
};
tokio::spawn(mixnode_runner.run());
Ok((Self { output_rx }, packet_tx))
}
/// Returns a next `[Output]` to be emitted, if it exists and the Poisson delay is done (if necessary).
pub async fn next(&mut self) -> Option<Output> {
self.output_rx.recv().await
}
}
impl MixNodeRunner {
async fn run(mut self) {
loop {
if let Some(packet) = self.packet_queue.recv().await {
if let Err(e) = self.process_packet(packet) {
tracing::error!("failed to process packet. skipping it: {e}");
}
}
}
}
fn process_packet(&mut self, packet: PacketBody) -> Result<(), MixnetError> {
match packet {
PacketBody::SphinxPacket(packet) => self.process_sphinx_packet(packet.as_ref()),
PacketBody::Fragment(fragment) => self.process_fragment(fragment.as_ref()),
}
}
fn process_sphinx_packet(&self, packet: &[u8]) -> Result<(), MixnetError> {
let output = Output::Forward(PacketBody::process_sphinx_packet(
packet,
&self.encryption_private_key,
)?);
let delay = self.poisson.interval(&mut OsRng);
let output_tx = self.output_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(delay).await;
// output_tx is always expected to be not closed/dropped.
output_tx.send(output).unwrap();
});
Ok(())
}
fn process_fragment(&mut self, fragment: &[u8]) -> Result<(), MixnetError> {
if let Some(msg) = self
.message_reconstructor
.add_and_reconstruct(Fragment::from_bytes(fragment)?)
{
match Message::from_bytes(&msg)? {
Message::Real(msg) => {
let output = Output::ReconstructedMessage(msg.into_boxed_slice());
self.output_tx
.send(output)
.expect("output channel shouldn't be closed");
}
Message::DropCover(_) => {
tracing::debug!("Drop cover message has been reconstructed. Dropping it...");
}
}
}
Ok(())
}
}
/// Output that [`MixNode::next`] returns.
#[derive(Debug, PartialEq, Eq)]
pub enum Output {
/// Packet to be forwarded to the next mix node
Forward(Packet),
/// Message reconstructed from [`Packet`]s
ReconstructedMessage(Box<[u8]>),
}
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use sphinx_packet::crypto::PublicKey;
use crate::{
packet::Packet,
topology::{tests::gen_entropy, MixNodeInfo, MixnetTopology},
};
use super::*;
#[tokio::test]
async fn mixnode() {
let encryption_private_key = PrivateKey::new();
let node_info = MixNodeInfo::new(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1000u16).into(),
*PublicKey::from(&encryption_private_key).as_bytes(),
)
.unwrap();
let topology = MixnetTopology::new(
(0..2).map(|_| node_info.clone()).collect(),
2,
1,
gen_entropy(),
)
.unwrap();
let (mut mixnode, packet_queue) = MixNode::new(MixNodeConfig {
encryption_private_key: encryption_private_key.to_bytes(),
delay_rate_per_min: 60.0,
})
.unwrap();
let msg = "hello".as_bytes().to_vec();
let packets = Packet::build_real(msg.clone(), &topology).unwrap();
let num_packets = packets.len();
for packet in packets.into_iter() {
packet_queue.send(packet.body()).await.unwrap();
}
for _ in 0..num_packets {
match mixnode.next().await.unwrap() {
Output::Forward(packet_to) => {
packet_queue.send(packet_to.body()).await.unwrap();
}
Output::ReconstructedMessage(_) => unreachable!(),
}
}
assert_eq!(
Output::ReconstructedMessage(msg.into_boxed_slice()),
mixnode.next().await.unwrap()
);
}
}

View File

@ -1,229 +0,0 @@
use std::io;
use futures::{AsyncRead, AsyncReadExt};
use sphinx_packet::{crypto::PrivateKey, header::delays::Delay};
use crate::{
address::NodeAddress,
error::MixnetError,
fragment::{Fragment, FragmentSet},
topology::MixnetTopology,
};
/// A packet to be sent through the mixnet
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Packet {
address: NodeAddress,
body: PacketBody,
}
impl Packet {
fn new(processed_packet: sphinx_packet::ProcessedPacket) -> Result<Self, MixnetError> {
match processed_packet {
sphinx_packet::ProcessedPacket::ForwardHop(packet, addr, _) => Ok(Packet {
address: addr.try_into()?,
body: PacketBody::from(packet.as_ref()),
}),
sphinx_packet::ProcessedPacket::FinalHop(addr, _, payload) => Ok(Packet {
address: addr.try_into()?,
body: PacketBody::try_from(payload)?,
}),
}
}
pub(crate) fn build_real(
msg: Vec<u8>,
topology: &MixnetTopology,
) -> Result<Vec<Packet>, MixnetError> {
Self::build(Message::Real(msg), topology)
}
pub(crate) fn build_drop_cover(
msg: Vec<u8>,
topology: &MixnetTopology,
) -> Result<Vec<Packet>, MixnetError> {
Self::build(Message::DropCover(msg), topology)
}
fn build(msg: Message, topology: &MixnetTopology) -> Result<Vec<Packet>, MixnetError> {
let destination = topology.choose_destination();
let fragment_set = FragmentSet::new(&msg.bytes())?;
let mut packets = Vec::with_capacity(fragment_set.as_ref().len());
for fragment in fragment_set.as_ref().iter() {
let route = topology.gen_route();
if route.is_empty() {
// Create a packet that will be directly sent to the mix destination
packets.push(Packet {
address: NodeAddress::try_from(destination.address)?,
body: PacketBody::from(fragment),
});
} else {
// Use dummy delays because mixnodes will ignore this value and generate delay randomly by themselves.
let delays = vec![Delay::new_from_nanos(0); route.len()];
packets.push(Packet {
address: NodeAddress::try_from(route[0].address)?,
body: PacketBody::from(&sphinx_packet::SphinxPacket::new(
fragment.bytes(),
&route,
&destination,
&delays,
)?),
});
}
}
Ok(packets)
}
/// Returns the address of the mix node that this packet is being sent to
pub fn address(&self) -> NodeAddress {
self.address
}
/// Returns the body of the packet
pub fn body(self) -> PacketBody {
self.body
}
}
/// The body of a packet to be sent through the mixnet
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum PacketBody {
/// A Sphinx packet to be sent to the next mix node
SphinxPacket(Vec<u8>),
/// A fragment that has been through the mixnet and can be reconstructed into the original message
Fragment(Vec<u8>),
}
impl From<&sphinx_packet::SphinxPacket> for PacketBody {
fn from(packet: &sphinx_packet::SphinxPacket) -> Self {
Self::SphinxPacket(packet.to_bytes())
}
}
impl From<&Fragment> for PacketBody {
fn from(fragment: &Fragment) -> Self {
Self::Fragment(fragment.bytes())
}
}
impl TryFrom<sphinx_packet::payload::Payload> for PacketBody {
type Error = MixnetError;
fn try_from(payload: sphinx_packet::payload::Payload) -> Result<Self, Self::Error> {
Ok(Self::Fragment(payload.recover_plaintext()?))
}
}
impl PacketBody {
/// Consumes the packet body and serialize it into a byte array
pub fn bytes(self) -> Box<[u8]> {
match self {
Self::SphinxPacket(data) => Self::bytes_with_flag(PacketBodyFlag::SphinxPacket, data),
Self::Fragment(data) => Self::bytes_with_flag(PacketBodyFlag::Fragment, data),
}
}
fn bytes_with_flag(flag: PacketBodyFlag, mut msg: Vec<u8>) -> Box<[u8]> {
let mut out = Vec::with_capacity(1 + std::mem::size_of::<usize>() + msg.len());
out.push(flag as u8);
out.extend_from_slice(&msg.len().to_le_bytes());
out.append(&mut msg);
out.into_boxed_slice()
}
/// Deserialize a packet body from a reader
pub async fn read_from<R: AsyncRead + Unpin>(
reader: &mut R,
) -> io::Result<Result<Self, MixnetError>> {
let mut flag = [0u8; 1];
reader.read_exact(&mut flag).await?;
let mut size = [0u8; std::mem::size_of::<usize>()];
reader.read_exact(&mut size).await?;
let mut data = vec![0u8; usize::from_le_bytes(size)];
reader.read_exact(&mut data).await?;
match PacketBodyFlag::try_from(flag[0]) {
Ok(PacketBodyFlag::SphinxPacket) => Ok(Ok(PacketBody::SphinxPacket(data))),
Ok(PacketBodyFlag::Fragment) => Ok(Ok(PacketBody::Fragment(data))),
Err(e) => Ok(Err(e)),
}
}
pub(crate) fn process_sphinx_packet(
packet: &[u8],
private_key: &PrivateKey,
) -> Result<Packet, MixnetError> {
Packet::new(sphinx_packet::SphinxPacket::from_bytes(packet)?.process(private_key)?)
}
}
#[repr(u8)]
enum PacketBodyFlag {
SphinxPacket,
Fragment,
}
impl TryFrom<u8> for PacketBodyFlag {
type Error = MixnetError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0u8 => Ok(PacketBodyFlag::SphinxPacket),
1u8 => Ok(PacketBodyFlag::Fragment),
_ => Err(MixnetError::InvalidPacketFlag),
}
}
}
pub(crate) enum Message {
Real(Vec<u8>),
DropCover(Vec<u8>),
}
impl Message {
fn bytes(self) -> Box<[u8]> {
match self {
Self::Real(msg) => Self::bytes_with_flag(MessageFlag::Real, msg),
Self::DropCover(msg) => Self::bytes_with_flag(MessageFlag::DropCover, msg),
}
}
fn bytes_with_flag(flag: MessageFlag, mut msg: Vec<u8>) -> Box<[u8]> {
let mut out = Vec::with_capacity(1 + msg.len());
out.push(flag as u8);
out.append(&mut msg);
out.into_boxed_slice()
}
pub(crate) fn from_bytes(value: &[u8]) -> Result<Self, MixnetError> {
if value.is_empty() {
return Err(MixnetError::InvalidMessage);
}
match MessageFlag::try_from(value[0])? {
MessageFlag::Real => Ok(Self::Real(value[1..].into())),
MessageFlag::DropCover => Ok(Self::DropCover(value[1..].into())),
}
}
}
#[repr(u8)]
enum MessageFlag {
Real,
DropCover,
}
impl TryFrom<u8> for MessageFlag {
type Error = MixnetError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0u8 => Ok(MessageFlag::Real),
1u8 => Ok(MessageFlag::DropCover),
_ => Err(MixnetError::InvalidPacketFlag),
}
}
}

View File

@ -1,93 +0,0 @@
use std::time::Duration;
use rand::Rng;
use rand_distr::{Distribution, Exp};
use crate::error::MixnetError;
/// A Poisson process that models the times at which events occur.
pub struct Poisson(Exp<f64>);
impl Poisson {
/// Create a new Poisson process with the given rate per minute.
pub fn new(rate_per_min: f64) -> Result<Self, MixnetError> {
Ok(Self(Exp::new(rate_per_min)?))
}
/// Get a random interval between events that follow a Poisson distribution.
///
/// If events occur in a Poisson distribution with rate_per_min,
/// the interval between events follow the exponential distribution with rate_per_min.
pub fn interval<R: Rng + ?Sized>(&self, rng: &mut R) -> Duration {
// generate a random value from the distribution
let interval_min = self.0.sample(rng);
// convert minutes to seconds
Duration::from_secs_f64(interval_min * 60.0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::rngs::OsRng;
use std::{collections::BTreeMap, time::Duration};
// Test the interval generation for a specific rate
#[test]
fn test_interval_generation() {
let interval = Poisson::new(1.0).unwrap().interval(&mut OsRng);
// Check if the interval is within a plausible range
// This is a basic check; in practice, you may want to perform a statistical test
assert!(interval > Duration::from_secs(0)); // Must be positive
}
// Compute the empirical CDF
fn empirical_cdf(samples: &[Duration]) -> BTreeMap<Duration, f64> {
let mut map = BTreeMap::new();
let n = samples.len() as f64;
for &sample in samples {
*map.entry(sample).or_insert(0.0) += 1.0 / n;
}
let mut acc = 0.0;
for value in map.values_mut() {
acc += *value;
*value = acc;
}
map
}
// Compare the empirical CDF to the theoretical CDF
#[test]
fn test_distribution_fit() {
let rate_per_min = 1.0;
let mut intervals = Vec::new();
// Generate 10,000 samples
let poisson = Poisson::new(rate_per_min).unwrap();
for _ in 0..10_000 {
intervals.push(poisson.interval(&mut OsRng));
}
let empirical = empirical_cdf(&intervals);
// theoretical CDF for exponential distribution
let rate_per_sec = rate_per_min / 60.0;
let theoretical_cdf = |x: f64| 1.0 - (-rate_per_sec * x).exp();
// Kolmogorov-Smirnov test
let ks_statistic: f64 = empirical
.iter()
.map(|(&k, &v)| {
let x = k.as_secs_f64();
(theoretical_cdf(x) - v).abs()
})
.fold(0.0, f64::max);
println!("KS Statistic: {}", ks_statistic);
assert!(ks_statistic < 0.05, "Distributions differ significantly.");
}
}

View File

@ -1,199 +0,0 @@
use nomos_utils::fisheryates::FisherYatesShuffle;
use rand::Rng;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sphinx_packet::{
constants::IDENTIFIER_LENGTH,
crypto::{PublicKey, PUBLIC_KEY_SIZE},
route::{DestinationAddressBytes, SURBIdentifier},
};
use crate::{address::NodeAddress, error::MixnetError};
/// Defines Mixnet topology construction and route selection
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixnetTopology {
mixnode_candidates: Vec<MixNodeInfo>,
num_layers: usize,
num_mixnodes_per_layer: usize,
}
impl MixnetTopology {
/// Generates [MixnetTopology] with random shuffling/sampling using a given entropy.
///
/// # Errors
///
/// This function will return an error if parameters are invalid.
pub fn new(
mut mixnode_candidates: Vec<MixNodeInfo>,
num_layers: usize,
num_mixnodes_per_layer: usize,
entropy: [u8; 32],
) -> Result<Self, MixnetError> {
if mixnode_candidates.len() < num_layers * num_mixnodes_per_layer {
return Err(MixnetError::InvalidTopologySize);
}
FisherYatesShuffle::shuffle(&mut mixnode_candidates, entropy);
Ok(Self {
mixnode_candidates,
num_layers,
num_mixnodes_per_layer,
})
}
/// Selects a mix destination randomly from the last mix layer
pub(crate) fn choose_destination(&self) -> sphinx_packet::route::Destination {
let idx_in_layer = rand::thread_rng().gen_range(0..self.num_mixnodes_per_layer);
let idx = self.num_mixnodes_per_layer * (self.num_layers - 1) + idx_in_layer;
self.mixnode_candidates[idx].clone().into()
}
/// Selects a mix route randomly from all mix layers except the last layer
/// and append a mix destination to the end of the mix route.
///
/// That is, the caller can generate multiple routes with one mix destination.
pub(crate) fn gen_route(&self) -> Vec<sphinx_packet::route::Node> {
let mut route = Vec::with_capacity(self.num_layers);
for layer in 0..self.num_layers - 1 {
let idx_in_layer = rand::thread_rng().gen_range(0..self.num_mixnodes_per_layer);
let idx = self.num_mixnodes_per_layer * layer + idx_in_layer;
route.push(self.mixnode_candidates[idx].clone().into());
}
route
}
}
/// Mix node information that is used for forwarding packets to the mix node
#[derive(Clone, Debug)]
pub struct MixNodeInfo(sphinx_packet::route::Node);
impl MixNodeInfo {
/// Creates a [`MixNodeInfo`].
pub fn new(
address: NodeAddress,
public_key: [u8; PUBLIC_KEY_SIZE],
) -> Result<Self, MixnetError> {
Ok(Self(sphinx_packet::route::Node::new(
address.try_into()?,
PublicKey::from(public_key),
)))
}
}
impl From<MixNodeInfo> for sphinx_packet::route::Node {
fn from(info: MixNodeInfo) -> Self {
info.0
}
}
const DUMMY_SURB_IDENTIFIER: SURBIdentifier = [0u8; IDENTIFIER_LENGTH];
impl From<MixNodeInfo> for sphinx_packet::route::Destination {
fn from(info: MixNodeInfo) -> Self {
sphinx_packet::route::Destination::new(
DestinationAddressBytes::from_bytes(info.0.address.as_bytes()),
DUMMY_SURB_IDENTIFIER,
)
}
}
impl Serialize for MixNodeInfo {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
SerializableMixNodeInfo::try_from(self)
.map_err(serde::ser::Error::custom)?
.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for MixNodeInfo {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Self::try_from(SerializableMixNodeInfo::deserialize(deserializer)?)
.map_err(serde::de::Error::custom)
}
}
// Only for serializing/deserializing [`MixNodeInfo`] since [`sphinx_packet::route::Node`] is not serializable.
#[derive(Serialize, Deserialize, Clone, Debug)]
struct SerializableMixNodeInfo {
address: NodeAddress,
public_key: [u8; PUBLIC_KEY_SIZE],
}
impl TryFrom<&MixNodeInfo> for SerializableMixNodeInfo {
type Error = MixnetError;
fn try_from(info: &MixNodeInfo) -> Result<Self, Self::Error> {
Ok(Self {
address: NodeAddress::try_from(info.0.address)?,
public_key: *info.0.pub_key.as_bytes(),
})
}
}
impl TryFrom<SerializableMixNodeInfo> for MixNodeInfo {
type Error = MixnetError;
fn try_from(info: SerializableMixNodeInfo) -> Result<Self, Self::Error> {
Self::new(info.address, info.public_key)
}
}
#[cfg(test)]
pub mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use rand::RngCore;
use sphinx_packet::crypto::{PrivateKey, PublicKey};
use crate::error::MixnetError;
use super::{MixNodeInfo, MixnetTopology};
#[test]
fn shuffle() {
let candidates = gen_mixnodes(10);
let topology = MixnetTopology::new(candidates.clone(), 3, 2, gen_entropy()).unwrap();
assert_eq!(candidates.len(), topology.mixnode_candidates.len());
}
#[test]
fn route_and_destination() {
let topology = MixnetTopology::new(gen_mixnodes(10), 3, 2, gen_entropy()).unwrap();
let _ = topology.choose_destination();
assert_eq!(2, topology.gen_route().len()); // except a destination
}
#[test]
fn invalid_topology_size() {
// if # of candidates is smaller than the topology size
assert!(matches!(
MixnetTopology::new(gen_mixnodes(5), 3, 2, gen_entropy()).err(),
Some(MixnetError::InvalidTopologySize),
));
}
pub fn gen_mixnodes(n: usize) -> Vec<MixNodeInfo> {
(0..n)
.map(|i| {
MixNodeInfo::new(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), i as u16).into(),
*PublicKey::from(&PrivateKey::new()).as_bytes(),
)
.unwrap()
})
.collect()
}
pub fn gen_entropy() -> [u8; 32] {
let mut entropy = [0u8; 32];
rand::thread_rng().fill_bytes(&mut entropy);
entropy
}
}

View File

@ -41,6 +41,5 @@ uuid = { version = "1.10.0", features = ["v4"] }
[features] [features]
default = ["tracing"] default = ["tracing"]
mixnet = ["nomos-node/mixnet"]
metrics = ["nomos-node/metrics"] metrics = ["nomos-node/metrics"]
tracing = ["nomos-node/tracing"] tracing = ["nomos-node/tracing"]

View File

@ -63,6 +63,5 @@ rand = "0.8"
[features] [features]
default = ["tracing"] default = ["tracing"]
mixnet = ["nomos-network/mixnet"]
metrics = [] metrics = []
tracing = [] tracing = []

View File

@ -47,19 +47,6 @@ network:
port: 3000 port: 3000
node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3 node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3
initial_peers: [] initial_peers: []
mixnet:
mixclient:
topology:
mixnode_candidates:
- address: 127.0.0.1:3000
public_key: [110, 177, 93, 41, 184, 16, 49, 126, 195, 57, 202, 199, 160, 161, 47, 195, 221, 40, 143, 151, 38, 250, 22, 82, 40, 83, 91, 3, 200, 239, 155, 67]
num_layers: 1
num_mixnodes_per_layer: 1
emission_rate_per_min: 600.0
redundancy: 1
mixnode:
encryption_private_key: [183, 50, 199, 33, 53, 46, 43, 123, 6, 173, 255, 66, 183, 156, 146, 221, 80, 102, 22, 155, 216, 234, 28, 99, 107, 231, 99, 27, 250, 17, 36, 108]
delay_rate_per_min: 60000.0
http: http:
backend_settings: backend_settings:

View File

@ -221,8 +221,6 @@ pub fn update_network(
network.backend.initial_peers = peers; network.backend.initial_peers = peers;
} }
// TODO: configure mixclient and mixnode if the mixnet feature is enabled
Ok(()) Ok(())
} }

View File

@ -15,7 +15,6 @@ libp2p = { version = "0.53.2", features = [
"quic", "quic",
"secp256k1", "secp256k1",
] } ] }
libp2p-stream = "0.1.0-alpha"
blake2 = { version = "0.10" } blake2 = { version = "0.10" }
serde = { version = "1.0.166", features = ["derive"] } serde = { version = "1.0.166", features = ["derive"] }
hex = "0.4.3" hex = "0.4.3"

View File

@ -20,8 +20,6 @@ pub use libp2p::{
swarm::{dial_opts::DialOpts, DialError, NetworkBehaviour, SwarmEvent}, swarm::{dial_opts::DialOpts, DialError, NetworkBehaviour, SwarmEvent},
PeerId, SwarmBuilder, Transport, PeerId, SwarmBuilder, Transport,
}; };
pub use libp2p_stream;
use libp2p_stream::Control;
pub use multiaddr::{multiaddr, Multiaddr, Protocol}; pub use multiaddr::{multiaddr, Multiaddr, Protocol};
// TODO: Risc0 proofs are HUGE (220 Kb) and it's the only reason we need to have this // TODO: Risc0 proofs are HUGE (220 Kb) and it's the only reason we need to have this
@ -36,7 +34,6 @@ pub struct Swarm {
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
pub struct Behaviour { pub struct Behaviour {
stream: libp2p_stream::Behaviour,
gossipsub: gossipsub::Behaviour, gossipsub: gossipsub::Behaviour,
} }
@ -50,10 +47,7 @@ impl Behaviour {
.max_transmit_size(DATA_LIMIT) .max_transmit_size(DATA_LIMIT)
.build()?, .build()?,
)?; )?;
Ok(Self { Ok(Self { gossipsub })
stream: libp2p_stream::Behaviour::new(),
gossipsub,
})
} }
} }
@ -150,13 +144,6 @@ impl Swarm {
gossipsub::IdentTopic::new(topic).hash() gossipsub::IdentTopic::new(topic).hash()
} }
/// Returns a stream control that can be used to accept streams and establish streams to
/// other peers.
/// Stream controls can be cloned.
pub fn stream_control(&self) -> Control {
self.swarm.behaviour().stream.new_control()
}
pub fn multiaddr(ip: std::net::Ipv4Addr, port: u16) -> Multiaddr { pub fn multiaddr(ip: std::net::Ipv4Addr, port: u16) -> Multiaddr {
multiaddr!(Ip4(ip), Udp(port), QuicV1) multiaddr!(Ip4(ip), Udp(port), QuicV1)
} }
@ -175,68 +162,3 @@ fn compute_message_id(message: &Message) -> MessageId {
hasher.update(&message.data); hasher.update(&message.data);
MessageId::from(hasher.finalize().to_vec()) MessageId::from(hasher.finalize().to_vec())
} }
#[cfg(test)]
mod tests {
use std::time::Duration;
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
use libp2p::StreamProtocol;
use rand::Rng;
use crate::{Swarm, SwarmConfig};
#[tokio::test]
async fn stream() {
// Init two swarms
let (config1, mut swarm1) = init_swarm();
let (_, mut swarm2) = init_swarm();
let swarm1_peer_id = *swarm1.swarm().local_peer_id();
// Dial to swarm1
swarm2
.connect(Swarm::multiaddr(config1.host, config1.port))
.unwrap();
// Prepare stream controls
let mut stream_control1 = swarm1.stream_control();
let mut stream_control2 = swarm2.stream_control();
// Poll swarms to make progress
tokio::spawn(async move { while (swarm1.next().await).is_some() {} });
tokio::spawn(async move { while (swarm2.next().await).is_some() {} });
// Make swarm1 accept incoming streams
let protocol = StreamProtocol::new("/test");
let mut incoming_streams = stream_control1.accept(protocol).unwrap();
tokio::spawn(async move {
// If a new stream is established, write bytes and close the stream.
while let Some((_, mut stream)) = incoming_streams.next().await {
stream.write_all(&[1, 2, 3, 4]).await.unwrap();
stream.close().await.unwrap();
}
});
// Wait until the connection is established
tokio::time::sleep(Duration::from_secs(1)).await;
// Establish a stream with swarm1 and read bytes
let mut stream = stream_control2
.open_stream(swarm1_peer_id, StreamProtocol::new("/test"))
.await
.unwrap();
let mut buf = [0u8; 4];
stream.read_exact(&mut buf).await.unwrap();
assert_eq!(buf, [1, 2, 3, 4]);
}
fn init_swarm() -> (SwarmConfig, Swarm) {
let config = SwarmConfig {
host: std::net::Ipv4Addr::new(127, 0, 0, 1),
port: rand::thread_rng().gen_range(10000..30000),
..Default::default()
};
let swarm = Swarm::build(&config).unwrap();
(config, swarm)
}
}

View File

@ -44,4 +44,3 @@ blake2 = { version = "0.10" }
[features] [features]
default = ["libp2p"] default = ["libp2p"]
libp2p = [] libp2p = []
mixnet = []

View File

@ -1,22 +1,13 @@
// Networking is not essential for verifier and indexer tests.
// Libp2p network is chosen for consensus requirement, mixnet is ignored.
//
// Note: To enable rust-analyzer in modules, comment out the
// `#[cfg(not(feature = "mixnet"))]` lines (reenable when pushing).
#[cfg(test)] #[cfg(test)]
#[cfg(feature = "libp2p")] #[cfg(feature = "libp2p")]
#[cfg(not(feature = "mixnet"))]
mod common; mod common;
#[cfg(test)] #[cfg(test)]
#[cfg(feature = "libp2p")] #[cfg(feature = "libp2p")]
#[cfg(not(feature = "mixnet"))]
mod indexer_integration; mod indexer_integration;
#[cfg(test)] #[cfg(test)]
#[cfg(feature = "libp2p")] #[cfg(feature = "libp2p")]
#[cfg(not(feature = "mixnet"))]
mod verifier_integration; mod verifier_integration;
#[cfg(test)] #[cfg(test)]

View File

@ -22,7 +22,6 @@ futures = "0.3"
parking_lot = "0.12" parking_lot = "0.12"
nomos-core = { path = "../../nomos-core" } nomos-core = { path = "../../nomos-core" }
nomos-libp2p = { path = "../../nomos-libp2p", optional = true } nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
mixnet = { path = "../../mixnet", optional = true }
utoipa = { version = "4.0", optional = true } utoipa = { version = "4.0", optional = true }
serde_json = { version = "1", optional = true } serde_json = { version = "1", optional = true }
@ -33,6 +32,5 @@ tokio = { version = "1", features = ["full"] }
[features] [features]
default = [] default = []
libp2p = ["nomos-libp2p", "rand", "humantime-serde"] libp2p = ["nomos-libp2p", "rand", "humantime-serde"]
mixnet = ["dep:mixnet"]
mock = ["rand", "chrono"] mock = ["rand", "chrono"]
openapi = ["dep:utoipa", "serde_json"] openapi = ["dep:utoipa", "serde_json"]

View File

@ -1,4 +1,4 @@
use nomos_libp2p::{libp2p::StreamProtocol, Multiaddr, PeerId}; use nomos_libp2p::{Multiaddr, PeerId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -21,11 +21,6 @@ pub enum Command {
message: Box<[u8]>, message: Box<[u8]>,
retry_count: usize, retry_count: usize,
}, },
StreamSend {
peer_id: PeerId,
protocol: StreamProtocol,
data: Box<[u8]>,
},
} }
#[derive(Debug)] #[derive(Debug)]

View File

@ -1,9 +1,6 @@
use nomos_libp2p::{Multiaddr, SwarmConfig}; use nomos_libp2p::{Multiaddr, SwarmConfig};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(feature = "mixnet")]
use crate::backends::libp2p::mixnet::MixnetConfig;
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Libp2pConfig { pub struct Libp2pConfig {
#[serde(flatten)] #[serde(flatten)]
@ -11,6 +8,4 @@ pub struct Libp2pConfig {
// Initial peers to connect to // Initial peers to connect to
#[serde(default)] #[serde(default)]
pub initial_peers: Vec<Multiaddr>, pub initial_peers: Vec<Multiaddr>,
#[cfg(feature = "mixnet")]
pub mixnet: MixnetConfig,
} }

View File

@ -1,201 +0,0 @@
use std::net::SocketAddr;
use futures::StreamExt;
use mixnet::{
address::NodeAddress,
client::{MessageQueue, MixClient, MixClientConfig},
node::{MixNode, MixNodeConfig, Output, PacketQueue},
packet::PacketBody,
};
use nomos_core::wire;
use nomos_libp2p::{
libp2p::{Stream, StreamProtocol},
libp2p_stream::IncomingStreams,
Multiaddr, Protocol,
};
use serde::{Deserialize, Serialize};
use tokio::{
runtime::Handle,
sync::{mpsc, oneshot},
};
use crate::backends::libp2p::{Command, Dial, Topic};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MixnetConfig {
pub mixclient: MixClientConfig,
pub mixnode: MixNodeConfig,
}
pub(crate) const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/mixnet");
pub(crate) fn init_mixnet(
config: MixnetConfig,
runtime_handle: Handle,
cmd_tx: mpsc::Sender<Command>,
incoming_streams: IncomingStreams,
) -> MessageQueue {
// Run mixnode
let (mixnode, packet_queue) = MixNode::new(config.mixnode).unwrap();
let libp2p_cmd_tx = cmd_tx.clone();
let queue = packet_queue.clone();
runtime_handle.spawn(async move {
run_mixnode(mixnode, queue, libp2p_cmd_tx).await;
});
let handle = runtime_handle.clone();
let queue = packet_queue.clone();
runtime_handle.spawn(async move {
handle_incoming_streams(incoming_streams, queue, handle).await;
});
// Run mixclient
let (mixclient, message_queue) = MixClient::new(config.mixclient).unwrap();
runtime_handle.spawn(async move {
run_mixclient(mixclient, packet_queue, cmd_tx).await;
});
message_queue
}
async fn run_mixnode(
mut mixnode: MixNode,
packet_queue: PacketQueue,
cmd_tx: mpsc::Sender<Command>,
) {
while let Some(output) = mixnode.next().await {
match output {
Output::Forward(packet) => {
stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await;
}
Output::ReconstructedMessage(message) => match MixnetMessage::from_bytes(&message) {
Ok(msg) => {
cmd_tx
.send(Command::Broadcast {
topic: msg.topic,
message: msg.message,
})
.await
.unwrap();
}
Err(e) => {
tracing::error!("failed to parse message received from mixnet: {e}");
}
},
}
}
}
async fn run_mixclient(
mut mixclient: MixClient,
packet_queue: PacketQueue,
cmd_tx: mpsc::Sender<Command>,
) {
while let Some(packet) = mixclient.next().await {
stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await;
}
}
async fn handle_incoming_streams(
mut incoming_streams: IncomingStreams,
packet_queue: PacketQueue,
runtime_handle: Handle,
) {
while let Some((_, stream)) = incoming_streams.next().await {
let queue = packet_queue.clone();
runtime_handle.spawn(async move {
if let Err(e) = handle_stream(stream, queue).await {
tracing::warn!("stream closed: {e}");
}
});
}
}
async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> {
loop {
match PacketBody::read_from(&mut stream).await? {
Ok(packet_body) => {
packet_queue
.send(packet_body)
.await
.expect("The receiving half of packet queue should be always open");
}
Err(e) => {
tracing::error!(
"failed to parse packet body. continuing reading the next packet: {e}"
);
}
}
}
}
async fn stream_send(
addr: NodeAddress,
packet_body: PacketBody,
cmd_tx: &mpsc::Sender<Command>,
packet_queue: &PacketQueue,
) {
let (tx, rx) = oneshot::channel();
cmd_tx
.send(Command::Connect(Dial {
addr: multiaddr_from(addr),
retry_count: 3,
result_sender: tx,
}))
.await
.expect("Command receiver should be always open");
match rx.await {
Ok(Ok(peer_id)) => {
cmd_tx
.send(Command::StreamSend {
peer_id,
protocol: STREAM_PROTOCOL,
data: packet_body.bytes(),
})
.await
.expect("Command receiver should be always open");
}
Ok(Err(e)) => match e {
nomos_libp2p::DialError::NoAddresses => {
tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue");
packet_queue
.send(packet_body)
.await
.expect("The receiving half of packet queue should be always open");
}
_ => tracing::error!("failed to dial with unrecoverable error: {e}"),
},
Err(e) => {
tracing::error!("channel closed before receiving: {e}");
}
}
}
fn multiaddr_from(addr: NodeAddress) -> Multiaddr {
match SocketAddr::from(addr) {
SocketAddr::V4(addr) => Multiaddr::empty()
.with(Protocol::Ip4(*addr.ip()))
.with(Protocol::Udp(addr.port()))
.with(Protocol::QuicV1),
SocketAddr::V6(addr) => Multiaddr::empty()
.with(Protocol::Ip6(*addr.ip()))
.with(Protocol::Udp(addr.port()))
.with(Protocol::QuicV1),
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct MixnetMessage {
pub topic: Topic,
pub message: Box<[u8]>,
}
impl MixnetMessage {
pub fn as_bytes(&self) -> Vec<u8> {
wire::serialize(self).expect("Couldn't serialize MixnetMessage")
}
pub fn from_bytes(data: &[u8]) -> Result<Self, wire::Error> {
wire::deserialize(data)
}
}

View File

@ -1,7 +1,5 @@
mod command; mod command;
mod config; mod config;
#[cfg(feature = "mixnet")]
pub mod mixnet;
pub(crate) mod swarm; pub(crate) mod swarm;
// std // std
@ -11,10 +9,6 @@ use self::swarm::SwarmHandler;
// internal // internal
use super::NetworkBackend; use super::NetworkBackend;
#[cfg(feature = "mixnet")]
use crate::backends::libp2p::mixnet::{init_mixnet, MixnetMessage, STREAM_PROTOCOL};
#[cfg(feature = "mixnet")]
use ::mixnet::client::MessageQueue;
pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash}; pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash};
// crates // crates
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState};
@ -23,8 +17,6 @@ use tokio::sync::{broadcast, mpsc};
pub struct Libp2p { pub struct Libp2p {
events_tx: broadcast::Sender<Event>, events_tx: broadcast::Sender<Event>,
commands_tx: mpsc::Sender<Command>, commands_tx: mpsc::Sender<Command>,
#[cfg(feature = "mixnet")]
mixclient_message_queue: MessageQueue,
} }
#[derive(Debug)] #[derive(Debug)]
@ -55,14 +47,6 @@ impl NetworkBackend for Libp2p {
let mut swarm_handler = let mut swarm_handler =
SwarmHandler::new(&config, commands_tx.clone(), commands_rx, events_tx.clone()); SwarmHandler::new(&config, commands_tx.clone(), commands_rx, events_tx.clone());
#[cfg(feature = "mixnet")]
let mixclient_message_queue = init_mixnet(
config.mixnet,
overwatch_handle.runtime().clone(),
commands_tx.clone(),
swarm_handler.incoming_streams(STREAM_PROTOCOL),
);
overwatch_handle.runtime().spawn(async move { overwatch_handle.runtime().spawn(async move {
swarm_handler.run(config.initial_peers).await; swarm_handler.run(config.initial_peers).await;
}); });
@ -70,35 +54,15 @@ impl NetworkBackend for Libp2p {
Self { Self {
events_tx, events_tx,
commands_tx, commands_tx,
#[cfg(feature = "mixnet")]
mixclient_message_queue,
} }
} }
#[cfg(not(feature = "mixnet"))]
async fn process(&self, msg: Self::Message) { async fn process(&self, msg: Self::Message) {
if let Err(e) = self.commands_tx.send(msg).await { if let Err(e) = self.commands_tx.send(msg).await {
tracing::error!("failed to send command to nomos-libp2p: {e:?}"); tracing::error!("failed to send command to nomos-libp2p: {e:?}");
} }
} }
#[cfg(feature = "mixnet")]
async fn process(&self, msg: Self::Message) {
match msg {
Command::Broadcast { topic, message } => {
let msg = MixnetMessage { topic, message };
if let Err(e) = self.mixclient_message_queue.send(msg.as_bytes()).await {
tracing::error!("failed to send messasge to mixclient: {e}");
}
}
cmd => {
if let Err(e) = self.commands_tx.send(cmd).await {
tracing::error!("failed to send command to libp2p swarm: {e:?}");
}
}
}
}
async fn subscribe( async fn subscribe(
&mut self, &mut self,
kind: Self::EventKind, kind: Self::EventKind,

View File

@ -1,16 +1,7 @@
use std::{ use std::{collections::HashMap, time::Duration};
collections::{hash_map::Entry, HashMap},
time::Duration,
};
use futures::AsyncWriteExt;
#[cfg(feature = "mixnet")]
use nomos_libp2p::libp2p_stream::IncomingStreams;
use nomos_libp2p::{ use nomos_libp2p::{
gossipsub, gossipsub, libp2p::swarm::ConnectionId, BehaviourEvent, Multiaddr, PeerId, Swarm, SwarmEvent,
libp2p::{swarm::ConnectionId, Stream, StreamProtocol},
libp2p_stream::{Control, OpenStreamError},
BehaviourEvent, Multiaddr, PeerId, Swarm, SwarmEvent,
}; };
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
@ -24,8 +15,6 @@ use super::{
pub struct SwarmHandler { pub struct SwarmHandler {
pub swarm: Swarm, pub swarm: Swarm,
stream_control: Control,
streams: HashMap<PeerId, Stream>,
pub pending_dials: HashMap<ConnectionId, Dial>, pub pending_dials: HashMap<ConnectionId, Dial>,
pub commands_tx: mpsc::Sender<Command>, pub commands_tx: mpsc::Sender<Command>,
pub commands_rx: mpsc::Receiver<Command>, pub commands_rx: mpsc::Receiver<Command>,
@ -53,15 +42,12 @@ impl SwarmHandler {
events_tx: broadcast::Sender<Event>, events_tx: broadcast::Sender<Event>,
) -> Self { ) -> Self {
let swarm = Swarm::build(&config.inner).unwrap(); let swarm = Swarm::build(&config.inner).unwrap();
let stream_control = swarm.stream_control();
// Keep the dialing history since swarm.connect doesn't return the result synchronously // Keep the dialing history since swarm.connect doesn't return the result synchronously
let pending_dials = HashMap::<ConnectionId, Dial>::new(); let pending_dials = HashMap::<ConnectionId, Dial>::new();
Self { Self {
swarm, swarm,
stream_control,
streams: HashMap::new(),
pending_dials, pending_dials,
commands_tx, commands_tx,
commands_rx, commands_rx,
@ -173,24 +159,6 @@ impl SwarmHandler {
} => { } => {
self.broadcast_and_retry(topic, message, retry_count).await; self.broadcast_and_retry(topic, message, retry_count).await;
} }
Command::StreamSend {
peer_id,
protocol,
data,
} => {
tracing::debug!("StreamSend to {peer_id}");
match self.open_stream(peer_id, protocol).await {
Ok(stream) => {
if let Err(e) = stream.write_all(&data).await {
tracing::error!("failed to write to the stream with ${peer_id}: {e}");
self.close_stream(&peer_id).await;
}
}
Err(e) => {
tracing::error!("failed to open stream with {peer_id}: {e}");
}
}
}
} }
} }
@ -287,26 +255,4 @@ impl SwarmHandler {
fn exp_backoff(retry: usize) -> Duration { fn exp_backoff(retry: usize) -> Duration {
std::time::Duration::from_secs(BACKOFF.pow(retry as u32)) std::time::Duration::from_secs(BACKOFF.pow(retry as u32))
} }
#[cfg(feature = "mixnet")]
pub fn incoming_streams(&mut self, protocol: StreamProtocol) -> IncomingStreams {
self.stream_control.accept(protocol).unwrap()
}
async fn open_stream(
&mut self,
peer_id: PeerId,
protocol: StreamProtocol,
) -> Result<&mut Stream, OpenStreamError> {
if let Entry::Vacant(entry) = self.streams.entry(peer_id) {
entry.insert(self.stream_control.open_stream(peer_id, protocol).await?);
}
Ok(self.streams.get_mut(&peer_id).unwrap())
}
async fn close_stream(&mut self, peer_id: &PeerId) {
if let Some(mut stream) = self.streams.remove(peer_id) {
let _ = stream.close().await;
}
}
} }

View File

@ -9,7 +9,6 @@ blst = { version = "0.3.11" }
nomos-node = { path = "../nodes/nomos-node", default-features = false } nomos-node = { path = "../nodes/nomos-node", default-features = false }
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" } cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" }
mixnet = { path = "../mixnet" }
nomos-log = { path = "../nomos-services/log" } nomos-log = { path = "../nomos-services/log" }
nomos-api = { path = "../nomos-services/api" } nomos-api = { path = "../nomos-services/api" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
@ -54,5 +53,4 @@ name = "test_cli"
path = "src/tests/cli.rs" path = "src/tests/cli.rs"
[features] [features]
mixnet = ["nomos-network/mixnet"]
metrics = ["nomos-node/metrics"] metrics = ["nomos-node/metrics"]

View File

@ -10,13 +10,6 @@ use cl::{InputWitness, NoteWitness, NullifierSecret};
use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig}; use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig};
use cryptarchia_ledger::LedgerState; use cryptarchia_ledger::LedgerState;
use kzgrs_backend::dispersal::BlobInfo; use kzgrs_backend::dispersal::BlobInfo;
#[cfg(feature = "mixnet")]
use mixnet::{
address::NodeAddress,
client::MixClientConfig,
node::MixNodeConfig,
topology::{MixNodeInfo, MixnetTopology},
};
use nomos_core::{block::Block, header::HeaderId, staking::NMO_UNIT}; use nomos_core::{block::Block, header::HeaderId, staking::NMO_UNIT};
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings;
use nomos_da_indexer::IndexerSettings; use nomos_da_indexer::IndexerSettings;
@ -31,8 +24,6 @@ use nomos_da_verifier::DaVerifierServiceSettings;
use nomos_libp2p::{Multiaddr, PeerId, SwarmConfig}; use nomos_libp2p::{Multiaddr, PeerId, SwarmConfig};
use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_mempool::MempoolMetrics; use nomos_mempool::MempoolMetrics;
#[cfg(feature = "mixnet")]
use nomos_network::backends::libp2p::mixnet::MixnetConfig;
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::api::paths::{ use nomos_node::api::paths::{
CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK, CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK,
@ -55,8 +46,6 @@ static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const NOMOS_BIN: &str = "../target/debug/nomos-node"; const NOMOS_BIN: &str = "../target/debug/nomos-node";
const DEFAULT_SLOT_TIME: u64 = 2; const DEFAULT_SLOT_TIME: u64 = 2;
const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME"; const CONSENSUS_SLOT_TIME_VAR: &str = "CONSENSUS_SLOT_TIME";
#[cfg(feature = "mixnet")]
const NUM_MIXNODE_CANDIDATES: usize = 2;
pub struct NomosNode { pub struct NomosNode {
addr: SocketAddr, addr: SocketAddr,
@ -273,9 +262,6 @@ impl Node for NomosNode {
thread_rng().fill(id); thread_rng().fill(id);
} }
#[cfg(feature = "mixnet")]
let (mixclient_config, mixnode_configs) = create_mixnet_config(&ids);
let notes = ids let notes = ids
.iter() .iter()
.map(|&id| { .map(|&id| {
@ -322,11 +308,6 @@ impl Node for NomosNode {
vec![coin], vec![coin],
time_config.clone(), time_config.clone(),
da.clone(), da.clone(),
#[cfg(feature = "mixnet")]
MixnetConfig {
mixclient: mixclient_config.clone(),
mixnode: mixnode_configs[i].clone(),
},
) )
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -345,22 +326,6 @@ impl Node for NomosNode {
config.da_network.backend.addresses = peer_addresses.iter().cloned().collect(); config.da_network.backend.addresses = peer_addresses.iter().cloned().collect();
} }
#[cfg(feature = "mixnet")]
{
// Build a topology using only a subset of nodes.
let mixnode_candidates = configs
.iter()
.take(NUM_MIXNODE_CANDIDATES)
.collect::<Vec<_>>();
let topology = build_mixnet_topology(&mixnode_candidates);
// Set the topology to all configs
for config in &mut configs {
config.network.backend.mixnet.mixclient.topology = topology.clone();
}
configs
}
#[cfg(not(feature = "mixnet"))]
configs configs
} }
} }
@ -376,52 +341,6 @@ struct GetRangeReq {
pub range: Range<[u8; 8]>, pub range: Range<[u8; 8]>,
} }
#[cfg(feature = "mixnet")]
fn create_mixnet_config(ids: &[[u8; 32]]) -> (MixClientConfig, Vec<MixNodeConfig>) {
use std::num::NonZeroU8;
let mixnode_configs: Vec<MixNodeConfig> = ids
.iter()
.map(|id| MixNodeConfig {
encryption_private_key: *id,
delay_rate_per_min: 100000000.0,
})
.collect();
// Build an empty topology because it will be constructed with meaningful node infos later
let topology = MixnetTopology::new(Vec::new(), 0, 0, [1u8; 32]).unwrap();
(
MixClientConfig {
topology,
emission_rate_per_min: 120.0,
redundancy: NonZeroU8::new(1).unwrap(),
},
mixnode_configs,
)
}
#[cfg(feature = "mixnet")]
fn build_mixnet_topology(mixnode_candidates: &[&Config]) -> MixnetTopology {
use mixnet::crypto::public_key_from;
use std::net::{IpAddr, Ipv4Addr};
let candidates = mixnode_candidates
.iter()
.map(|config| {
MixNodeInfo::new(
NodeAddress::from(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
config.network.backend.inner.port,
)),
public_key_from(config.network.backend.mixnet.mixnode.encryption_private_key),
)
.unwrap()
})
.collect::<Vec<_>>();
let num_layers = candidates.len();
MixnetTopology::new(candidates, num_layers, 1, [1u8; 32]).unwrap()
}
fn secret_key_to_peer_id(node_key: nomos_libp2p::ed25519::SecretKey) -> PeerId { fn secret_key_to_peer_id(node_key: nomos_libp2p::ed25519::SecretKey) -> PeerId {
PeerId::from_public_key( PeerId::from_public_key(
&nomos_libp2p::ed25519::Keypair::from(node_key) &nomos_libp2p::ed25519::Keypair::from(node_key)
@ -450,7 +369,6 @@ fn create_node_config(
notes: Vec<InputWitness>, notes: Vec<InputWitness>,
time: TimeConfig, time: TimeConfig,
da_config: DaConfig, da_config: DaConfig,
#[cfg(feature = "mixnet")] mixnet_config: MixnetConfig,
) -> Config { ) -> Config {
let swarm_config: SwarmConfig = Default::default(); let swarm_config: SwarmConfig = Default::default();
let node_key = swarm_config.node_key.clone(); let node_key = swarm_config.node_key.clone();
@ -463,8 +381,6 @@ fn create_node_config(
backend: Libp2pConfig { backend: Libp2pConfig {
inner: swarm_config, inner: swarm_config,
initial_peers: vec![], initial_peers: vec![],
#[cfg(feature = "mixnet")]
mixnet: mixnet_config,
}, },
}, },
cryptarchia: CryptarchiaSettings { cryptarchia: CryptarchiaSettings {