Mix: Persistent rate as stream + Extension (#860)

* forward msgs immediately without any processing

* Mix: Offload transmission rate and message processing from libp2p behaviour/handler

* Mix: Core skeleton used in `MixService`

* rename Processor to MessageBlend

* Mix: Implement Persistent Transmission (Tier 1) (#845)

* Mix: Add Persistent Transmission (Tier 1)

* add test

* define Coin struct with Uniform distribution for fast repeated sampling

* use ChaCha12Rng for Coin

* improve comment

* Mix: Implement Temporal Processor (Tier 2) (#846)

* Mix: Add Persistent Transmission (Tier 1)

* Mix: Implement TemporalProcessor

* use pub(crate)

* Mix: Use TemporalProcessor in MessageBlend (#847)

* Mix: Add Persistent Transmission (Tier 1)

* Mix: Implement TemporalProcessor

* Mix: Use TemporalProcessor in MessageBlend

* remove duplicate members in Cargo.toml

* Implement persistent transmission as stream trait extension

* Replicate test for stream version

---------

Co-authored-by: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com>
This commit is contained in:
Daniel Sanchez 2024-10-28 15:24:40 +01:00 committed by GitHub
parent 7aea30132d
commit e207743f69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 147 additions and 3 deletions

View File

@ -5,10 +5,15 @@ edition = "2021"
[dependencies]
cached = "0.53"
tokio = { version = "1" }
tokio = { version = "1", features = ["time", "sync", "macros"] }
tracing = "0.1"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
nomos-mix-message = { path = "../message" }
futures = "0.3"
rand_chacha = "0.3"
[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] }
tokio-stream = "0.1"

View File

@ -1,9 +1,12 @@
use std::time::Duration;
use futures::Stream;
use nomos_mix_message::DROP_MESSAGE;
use rand::{distributions::Uniform, prelude::Distribution, Rng, SeedableRng};
use rand_chacha::ChaCha12Rng;
use serde::{Deserialize, Serialize};
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::Interval;
use tokio::{
sync::mpsc::{self, error::TryRecvError},
time,
@ -26,6 +29,79 @@ impl Default for PersistentTransmissionSettings {
}
}
pub struct PersistentTransmissionStream<S>
where
S: Stream,
{
interval: Interval,
coin: Coin<ChaCha12Rng>,
stream: S,
}
impl<S> PersistentTransmissionStream<S>
where
S: Stream,
{
pub fn new(
settings: PersistentTransmissionSettings,
stream: S,
) -> PersistentTransmissionStream<S> {
let interval = time::interval(Duration::from_secs_f64(
1.0 / settings.max_emission_frequency,
));
let coin = Coin::<_>::new(
ChaCha12Rng::from_entropy(),
settings.drop_message_probability,
)
.unwrap();
Self {
interval,
coin,
stream,
}
}
}
impl<S> Stream for PersistentTransmissionStream<S>
where
S: Stream<Item = Vec<u8>> + Unpin,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
ref mut interval,
ref mut stream,
ref mut coin,
..
} = self.get_mut();
if pin!(interval).poll_tick(cx).is_pending() {
return Poll::Pending;
}
if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) {
Poll::Ready(Some(item))
} else if coin.flip() {
Poll::Ready(Some(DROP_MESSAGE.to_vec()))
} else {
Poll::Pending
}
}
}
pub trait PersistentTransmissionExt: Stream {
fn persistent_transmission(
self,
settings: PersistentTransmissionSettings,
) -> PersistentTransmissionStream<Self>
where
Self: Sized + Unpin,
{
PersistentTransmissionStream::new(settings, self)
}
}
impl<S> PersistentTransmissionExt for S where S: Stream {}
/// Transmit scheduled messages with a persistent rate to the transmission channel.
///
/// # Arguments
@ -109,6 +185,7 @@ enum CoinError {
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
macro_rules! assert_interval {
($last_time:expr, $lower_bound:expr, $upper_bound:expr) => {
@ -188,4 +265,66 @@ mod tests {
assert_eq!(emission_receiver.recv().await.unwrap(), vec![4]);
assert_interval!(&mut last_time, lower_bound, upper_bound);
}
#[tokio::test]
async fn test_persistent_transmission_stream() {
let (schedule_sender, schedule_receiver) = mpsc::unbounded_channel();
let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(schedule_receiver);
let settings = PersistentTransmissionSettings {
max_emission_frequency: 1.0,
// Set to always emit drop messages if no scheduled messages for easy testing
drop_message_probability: 1.0,
};
// Prepare the expected emission interval with torelance
let expected_emission_interval =
Duration::from_secs_f64(1.0 / settings.max_emission_frequency);
let torelance = expected_emission_interval / 10; // 10% torelance
let lower_bound = expected_emission_interval - torelance;
let upper_bound = expected_emission_interval + torelance;
// prepare stream
let mut persistent_transmission_stream = stream.persistent_transmission(settings);
// Messages must be scheduled in non-blocking manner.
schedule_sender.send(vec![1]).unwrap();
schedule_sender.send(vec![2]).unwrap();
schedule_sender.send(vec![3]).unwrap();
// Check if expected messages are emitted with the expected interval
assert_eq!(
persistent_transmission_stream.next().await.unwrap(),
vec![1]
);
let mut last_time = time::Instant::now();
assert_eq!(
persistent_transmission_stream.next().await.unwrap(),
vec![2]
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!(
persistent_transmission_stream.next().await.unwrap(),
vec![3]
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!(
persistent_transmission_stream.next().await.unwrap(),
DROP_MESSAGE.to_vec()
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!(
persistent_transmission_stream.next().await.unwrap(),
DROP_MESSAGE.to_vec()
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
// Schedule a new message and check if it is emitted at the next interval
schedule_sender.send(vec![4]).unwrap();
assert_eq!(
persistent_transmission_stream.next().await.unwrap(),
vec![4]
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
}
}