diff --git a/nomos-mix/core/Cargo.toml b/nomos-mix/core/Cargo.toml index d55a8c19..d32e44dc 100644 --- a/nomos-mix/core/Cargo.toml +++ b/nomos-mix/core/Cargo.toml @@ -19,4 +19,3 @@ x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } [dev-dependencies] tokio = { version = "1", features = ["rt-multi-thread"] } -rand_chacha = "0.3" diff --git a/nomos-mix/core/src/persistent_transmission.rs b/nomos-mix/core/src/persistent_transmission.rs index 72531a49..c5e38e1f 100644 --- a/nomos-mix/core/src/persistent_transmission.rs +++ b/nomos-mix/core/src/persistent_transmission.rs @@ -1,9 +1,5 @@ use futures::{Stream, StreamExt}; -use nomos_mix_message::MixMessage; -use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore}; -use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use std::marker::PhantomData; use std::pin::{pin, Pin}; use std::task::{Context, Poll}; @@ -11,59 +7,38 @@ use std::task::{Context, Poll}; pub struct PersistentTransmissionSettings { /// The maximum number of messages that can be emitted per second pub max_emission_frequency: f64, - /// The probability of emitting a drop message by coin flipping - pub drop_message_probability: f64, } impl Default for PersistentTransmissionSettings { fn default() -> Self { Self { max_emission_frequency: 1.0, - drop_message_probability: 0.5, } } } /// Transmit scheduled messages with a persistent rate as a stream. -pub struct PersistentTransmissionStream +pub struct PersistentTransmissionStream where S: Stream, - Rng: RngCore, { - coin: Coin, stream: S, scheduler: Scheduler, - _mix_message: PhantomData, } -impl PersistentTransmissionStream +impl PersistentTransmissionStream where S: Stream, - Rng: RngCore, - M: MixMessage, Scheduler: Stream, { - pub fn new( - settings: PersistentTransmissionSettings, - stream: S, - scheduler: Scheduler, - rng: Rng, - ) -> PersistentTransmissionStream { - let coin = Coin::::new(rng, settings.drop_message_probability).unwrap(); - Self { - coin, - stream, - scheduler, - _mix_message: Default::default(), - } + pub fn new(stream: S, scheduler: Scheduler) -> PersistentTransmissionStream { + Self { stream, scheduler } } } -impl Stream for PersistentTransmissionStream +impl Stream for PersistentTransmissionStream where S: Stream> + Unpin, - Rng: RngCore + Unpin, - M: MixMessage + Unpin, Scheduler: Stream + Unpin, { type Item = Vec; @@ -72,7 +47,6 @@ where let Self { ref mut scheduler, ref mut stream, - ref mut coin, .. } = self.get_mut(); if pin!(scheduler).poll_next_unpin(cx).is_pending() { @@ -80,79 +54,38 @@ where } if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) { Poll::Ready(Some(item)) - } else if coin.flip() { - Poll::Ready(Some(M::DROP_MESSAGE.to_vec())) } else { Poll::Pending } } } -pub trait PersistentTransmissionExt: Stream +pub trait PersistentTransmissionExt: Stream where - Rng: RngCore, - M: MixMessage, Scheduler: Stream, { fn persistent_transmission( self, - settings: PersistentTransmissionSettings, - rng: Rng, scheduler: Scheduler, - ) -> PersistentTransmissionStream + ) -> PersistentTransmissionStream where Self: Sized + Unpin, { - PersistentTransmissionStream::new(settings, self, scheduler, rng) + PersistentTransmissionStream::new(self, scheduler) } } -impl PersistentTransmissionExt for S +impl PersistentTransmissionExt for S where S: Stream, - Rng: RngCore, - M: MixMessage, - M::PublicKey: Clone + Serialize + DeserializeOwned, Scheduler: Stream, { } -struct Coin { - rng: R, - distribution: Uniform, - probability: f64, -} - -impl Coin { - fn new(rng: R, probability: f64) -> Result { - if !(0.0..=1.0).contains(&probability) { - return Err(CoinError::InvalidProbability); - } - Ok(Self { - rng, - distribution: Uniform::from(0.0..1.0), - probability, - }) - } - - // Flip the coin based on the given probability. - fn flip(&mut self) -> bool { - self.distribution.sample(&mut self.rng) < self.probability - } -} - -#[derive(Debug)] -enum CoinError { - InvalidProbability, -} - #[cfg(test)] mod tests { use super::*; use futures::StreamExt; - use nomos_mix_message::mock::MockMixMessage; - use rand::SeedableRng; - use rand_chacha::ChaCha8Rng; use std::time::Duration; use tokio::sync::mpsc; use tokio::time; @@ -186,8 +119,6 @@ mod tests { let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(schedule_receiver); let settings = PersistentTransmissionSettings { max_emission_frequency: 1.0, - // Set to always emit drop messages if no scheduled messages for easy testing - drop_message_probability: 1.0, }; // Prepare the expected emission interval with torelance let expected_emission_interval = @@ -196,16 +127,10 @@ mod tests { let lower_bound = expected_emission_interval - torelance; let upper_bound = expected_emission_interval + torelance; // prepare stream - let mut persistent_transmission_stream: PersistentTransmissionStream< - _, - _, - MockMixMessage, - _, - > = stream.persistent_transmission( - settings, - ChaCha8Rng::from_entropy(), - IntervalStream::new(time::interval(expected_emission_interval)).map(|_| ()), - ); + let mut persistent_transmission_stream: PersistentTransmissionStream<_, _> = stream + .persistent_transmission( + IntervalStream::new(time::interval(expected_emission_interval)).map(|_| ()), + ); // Messages must be scheduled in non-blocking manner. schedule_sender.send(vec![1]).unwrap(); schedule_sender.send(vec![2]).unwrap(); @@ -230,15 +155,17 @@ mod tests { ); assert_interval!(&mut last_time, lower_bound, upper_bound); - assert!(MockMixMessage::is_drop_message( - &persistent_transmission_stream.next().await.unwrap() - )); - assert_interval!(&mut last_time, lower_bound, upper_bound); - - assert!(MockMixMessage::is_drop_message( - &persistent_transmission_stream.next().await.unwrap() - )); - assert_interval!(&mut last_time, lower_bound, upper_bound); + // Check if nothing is emitted when there is no message scheduled + let timeout = tokio::time::timeout( + expected_emission_interval, + persistent_transmission_stream.next(), + ) + .await; + assert!( + timeout.is_err(), + "Timeout must occur because no message is scheduled." + ); + last_time = time::Instant::now(); // Schedule a new message and check if it is emitted at the next interval schedule_sender.send(vec![4]).unwrap(); diff --git a/nomos-mix/message/src/lib.rs b/nomos-mix/message/src/lib.rs index 5debe180..dd7fcdb9 100644 --- a/nomos-mix/message/src/lib.rs +++ b/nomos-mix/message/src/lib.rs @@ -7,7 +7,6 @@ pub use error::Error; pub trait MixMessage { type PublicKey; type PrivateKey; - const DROP_MESSAGE: &'static [u8]; fn build_message(payload: &[u8], public_keys: &[Self::PublicKey]) -> Result, Error>; /// Unwrap the message one layer. @@ -21,7 +20,4 @@ pub trait MixMessage { message: &[u8], private_key: &Self::PrivateKey, ) -> Result<(Vec, bool), Error>; - fn is_drop_message(message: &[u8]) -> bool { - message == Self::DROP_MESSAGE - } } diff --git a/nomos-mix/message/src/mock/mod.rs b/nomos-mix/message/src/mock/mod.rs index ea8f6d5d..bf1820af 100644 --- a/nomos-mix/message/src/mock/mod.rs +++ b/nomos-mix/message/src/mock/mod.rs @@ -17,7 +17,6 @@ pub struct MockMixMessage; impl MixMessage for MockMixMessage { type PublicKey = [u8; NODE_ID_SIZE]; type PrivateKey = [u8; NODE_ID_SIZE]; - const DROP_MESSAGE: &'static [u8] = &[0; MESSAGE_SIZE]; /// The length of the encoded message is fixed to [`MESSAGE_SIZE`] bytes. /// The [`MAX_LAYERS`] number of [`NodeId`]s are concatenated in front of the payload. diff --git a/nomos-mix/network/src/behaviour.rs b/nomos-mix/network/src/behaviour.rs index d2e04d84..08bfa421 100644 --- a/nomos-mix/network/src/behaviour.rs +++ b/nomos-mix/network/src/behaviour.rs @@ -11,9 +11,7 @@ use libp2p::{ }, Multiaddr, PeerId, }; -use nomos_mix_message::MixMessage; use sha2::{Digest, Sha256}; -use std::marker::PhantomData; use std::{ collections::{HashMap, HashSet, VecDeque}, task::{Context, Poll, Waker}, @@ -22,7 +20,7 @@ use std::{ /// A [`NetworkBehaviour`]: /// - forwards messages to all connected peers with deduplication. /// - receives messages from all connected peers. -pub struct Behaviour { +pub struct Behaviour { config: Config, /// Peers that support the mix protocol, and their connection IDs negotiated_peers: HashMap>, @@ -33,7 +31,6 @@ pub struct Behaviour { /// An LRU time cache for storing seen messages (based on their ID). This cache prevents /// duplicates from being propagated on the network. duplicate_cache: TimedCache, ()>, - _mix_message: PhantomData, } #[derive(Debug)] @@ -48,10 +45,7 @@ pub enum Event { Error(Error), } -impl Behaviour -where - M: MixMessage, -{ +impl Behaviour { pub fn new(config: Config) -> Self { let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan); Self { @@ -60,17 +54,11 @@ where events: VecDeque::new(), waker: None, duplicate_cache, - _mix_message: Default::default(), } } - /// Publish a message (data or drop) to all connected peers + /// Publish a message to all connected peers pub fn publish(&mut self, message: Vec) -> Result<(), Error> { - if M::is_drop_message(&message) { - // Bypass deduplication for the drop message - return self.forward_message(message, None); - } - let msg_id = Self::message_id(&message); // If the message was already seen, don't forward it again if self.duplicate_cache.cache_get(&msg_id).is_some() { @@ -155,10 +143,7 @@ where } } -impl NetworkBehaviour for Behaviour -where - M: MixMessage + 'static, -{ +impl NetworkBehaviour for Behaviour { type ConnectionHandler = MixConnectionHandler; type ToSwarm = Event; @@ -205,11 +190,6 @@ where match event { // A message was forwarded from the peer. ToBehaviour::Message(message) => { - // Ignore drop message - if M::is_drop_message(&message) { - return; - } - // Add the message to the cache. If it was already seen, ignore it. if self .duplicate_cache diff --git a/nomos-mix/network/src/lib.rs b/nomos-mix/network/src/lib.rs index 6db526e4..d2b4ee11 100644 --- a/nomos-mix/network/src/lib.rs +++ b/nomos-mix/network/src/lib.rs @@ -14,7 +14,6 @@ mod test { swarm::{dummy, NetworkBehaviour, SwarmEvent}, Multiaddr, PeerId, Swarm, SwarmBuilder, }; - use nomos_mix_message::mock::MockMixMessage; use tokio::select; use crate::{behaviour::Config, error::Error, Behaviour, Event}; @@ -116,7 +115,7 @@ mod test { } } - fn new_swarm(key: Keypair) -> Swarm> { + fn new_swarm(key: Keypair) -> Swarm { new_swarm_with_behaviour( key, Behaviour::new(Config { diff --git a/nomos-services/mix/src/backends/libp2p.rs b/nomos-services/mix/src/backends/libp2p.rs index c580f154..d6aaf9d6 100644 --- a/nomos-services/mix/src/backends/libp2p.rs +++ b/nomos-services/mix/src/backends/libp2p.rs @@ -106,7 +106,7 @@ impl MixBackend for Libp2pMixBackend { } struct MixSwarm { - swarm: Swarm>, + swarm: Swarm, swarm_messages_receiver: mpsc::Receiver, incoming_message_sender: broadcast::Sender>, } diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/mix/src/lib.rs index 8e3bbc34..8d2d0ead 100644 --- a/nomos-services/mix/src/lib.rs +++ b/nomos-services/mix/src/lib.rs @@ -107,19 +107,13 @@ where // tier 1 persistent transmission let (persistent_sender, persistent_receiver) = mpsc::unbounded_channel(); - let mut persistent_transmission_messages: PersistentTransmissionStream< - _, - _, - MockMixMessage, - _, - > = UnboundedReceiverStream::new(persistent_receiver).persistent_transmission( - mix_config.persistent_transmission, - ChaCha12Rng::from_entropy(), - IntervalStream::new(time::interval(Duration::from_secs_f64( - 1.0 / mix_config.persistent_transmission.max_emission_frequency, - ))) - .map(|_| ()), - ); + let mut persistent_transmission_messages: PersistentTransmissionStream<_, _> = + UnboundedReceiverStream::new(persistent_receiver).persistent_transmission( + IntervalStream::new(time::interval(Duration::from_secs_f64( + 1.0 / mix_config.persistent_transmission.max_emission_frequency, + ))) + .map(|_| ()), + ); // tier 2 blend let temporal_scheduler = TemporalScheduler::new(