From e207743f69cc1883ff3549cbee8e689cf7de6ea3 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Mon, 28 Oct 2024 15:24:40 +0100 Subject: [PATCH] Mix: Persistent rate as stream + Extension (#860) * forward msgs immediately without any processing * Mix: Offload transmission rate and message processing from libp2p behaviour/handler * Mix: Core skeleton used in `MixService` * rename Processor to MessageBlend * Mix: Implement Persistent Transmission (Tier 1) (#845) * Mix: Add Persistent Transmission (Tier 1) * add test * define Coin struct with Uniform distribution for fast repeated sampling * use ChaCha12Rng for Coin * improve comment * Mix: Implement Temporal Processor (Tier 2) (#846) * Mix: Add Persistent Transmission (Tier 1) * Mix: Implement TemporalProcessor * use pub(crate) * Mix: Use TemporalProcessor in MessageBlend (#847) * Mix: Add Persistent Transmission (Tier 1) * Mix: Implement TemporalProcessor * Mix: Use TemporalProcessor in MessageBlend * remove duplicate members in Cargo.toml * Implement persistent transmission as stream trait extension * Replicate test for stream version --------- Co-authored-by: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> --- nomos-mix/core/Cargo.toml | 7 +- nomos-mix/core/src/persistent_transmission.rs | 143 +++++++++++++++++- 2 files changed, 147 insertions(+), 3 deletions(-) diff --git a/nomos-mix/core/Cargo.toml b/nomos-mix/core/Cargo.toml index 6c8f90b7..eb746922 100644 --- a/nomos-mix/core/Cargo.toml +++ b/nomos-mix/core/Cargo.toml @@ -5,10 +5,15 @@ edition = "2021" [dependencies] cached = "0.53" -tokio = { version = "1" } +tokio = { version = "1", features = ["time", "sync", "macros"] } tracing = "0.1" rand = "0.8" serde = { version = "1.0", features = ["derive"] } nomos-mix-message = { path = "../message" } futures = "0.3" rand_chacha = "0.3" + + +[dev-dependencies] +tokio = { version = "1", features = ["rt-multi-thread"] } +tokio-stream = "0.1" \ No newline at end of file diff --git a/nomos-mix/core/src/persistent_transmission.rs b/nomos-mix/core/src/persistent_transmission.rs index 7e1dbab7..48946286 100644 --- a/nomos-mix/core/src/persistent_transmission.rs +++ b/nomos-mix/core/src/persistent_transmission.rs @@ -1,9 +1,12 @@ -use std::time::Duration; - +use futures::Stream; use nomos_mix_message::DROP_MESSAGE; use rand::{distributions::Uniform, prelude::Distribution, Rng, SeedableRng}; use rand_chacha::ChaCha12Rng; use serde::{Deserialize, Serialize}; +use std::pin::{pin, Pin}; +use std::task::{Context, Poll}; +use std::time::Duration; +use tokio::time::Interval; use tokio::{ sync::mpsc::{self, error::TryRecvError}, time, @@ -26,6 +29,79 @@ impl Default for PersistentTransmissionSettings { } } +pub struct PersistentTransmissionStream +where + S: Stream, +{ + interval: Interval, + coin: Coin, + stream: S, +} + +impl PersistentTransmissionStream +where + S: Stream, +{ + pub fn new( + settings: PersistentTransmissionSettings, + stream: S, + ) -> PersistentTransmissionStream { + let interval = time::interval(Duration::from_secs_f64( + 1.0 / settings.max_emission_frequency, + )); + let coin = Coin::<_>::new( + ChaCha12Rng::from_entropy(), + settings.drop_message_probability, + ) + .unwrap(); + Self { + interval, + coin, + stream, + } + } +} + +impl Stream for PersistentTransmissionStream +where + S: Stream> + Unpin, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + ref mut interval, + ref mut stream, + ref mut coin, + .. + } = self.get_mut(); + if pin!(interval).poll_tick(cx).is_pending() { + return Poll::Pending; + } + if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) { + Poll::Ready(Some(item)) + } else if coin.flip() { + Poll::Ready(Some(DROP_MESSAGE.to_vec())) + } else { + Poll::Pending + } + } +} + +pub trait PersistentTransmissionExt: Stream { + fn persistent_transmission( + self, + settings: PersistentTransmissionSettings, + ) -> PersistentTransmissionStream + where + Self: Sized + Unpin, + { + PersistentTransmissionStream::new(settings, self) + } +} + +impl PersistentTransmissionExt for S where S: Stream {} + /// Transmit scheduled messages with a persistent rate to the transmission channel. /// /// # Arguments @@ -109,6 +185,7 @@ enum CoinError { #[cfg(test)] mod tests { use super::*; + use futures::StreamExt; macro_rules! assert_interval { ($last_time:expr, $lower_bound:expr, $upper_bound:expr) => { @@ -188,4 +265,66 @@ mod tests { assert_eq!(emission_receiver.recv().await.unwrap(), vec![4]); assert_interval!(&mut last_time, lower_bound, upper_bound); } + + #[tokio::test] + async fn test_persistent_transmission_stream() { + let (schedule_sender, schedule_receiver) = mpsc::unbounded_channel(); + let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(schedule_receiver); + let settings = PersistentTransmissionSettings { + max_emission_frequency: 1.0, + // Set to always emit drop messages if no scheduled messages for easy testing + drop_message_probability: 1.0, + }; + // Prepare the expected emission interval with torelance + let expected_emission_interval = + Duration::from_secs_f64(1.0 / settings.max_emission_frequency); + let torelance = expected_emission_interval / 10; // 10% torelance + let lower_bound = expected_emission_interval - torelance; + let upper_bound = expected_emission_interval + torelance; + // prepare stream + let mut persistent_transmission_stream = stream.persistent_transmission(settings); + // Messages must be scheduled in non-blocking manner. + schedule_sender.send(vec![1]).unwrap(); + schedule_sender.send(vec![2]).unwrap(); + schedule_sender.send(vec![3]).unwrap(); + + // Check if expected messages are emitted with the expected interval + assert_eq!( + persistent_transmission_stream.next().await.unwrap(), + vec![1] + ); + let mut last_time = time::Instant::now(); + + assert_eq!( + persistent_transmission_stream.next().await.unwrap(), + vec![2] + ); + assert_interval!(&mut last_time, lower_bound, upper_bound); + + assert_eq!( + persistent_transmission_stream.next().await.unwrap(), + vec![3] + ); + assert_interval!(&mut last_time, lower_bound, upper_bound); + + assert_eq!( + persistent_transmission_stream.next().await.unwrap(), + DROP_MESSAGE.to_vec() + ); + assert_interval!(&mut last_time, lower_bound, upper_bound); + + assert_eq!( + persistent_transmission_stream.next().await.unwrap(), + DROP_MESSAGE.to_vec() + ); + assert_interval!(&mut last_time, lower_bound, upper_bound); + + // Schedule a new message and check if it is emitted at the next interval + schedule_sender.send(vec![4]).unwrap(); + assert_eq!( + persistent_transmission_stream.next().await.unwrap(), + vec![4] + ); + assert_interval!(&mut last_time, lower_bound, upper_bound); + } }