Abstract persistent transmission rng (#903)

This commit is contained in:
Daniel Sanchez 2024-11-05 09:00:31 +01:00 committed by GitHub
parent c237333791
commit 23f93dcc28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 37 additions and 20 deletions

View File

@ -12,8 +12,9 @@ rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
nomos-mix-message = { path = "../message" }
futures = "0.3"
rand_chacha = "0.3"
[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] }
rand_chacha = "0.3"

View File

@ -1,7 +1,6 @@
use futures::Stream;
use nomos_mix_message::DROP_MESSAGE;
use rand::{distributions::Uniform, prelude::Distribution, Rng, SeedableRng};
use rand_chacha::ChaCha12Rng;
use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore};
use serde::{Deserialize, Serialize};
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
@ -27,31 +26,30 @@ impl Default for PersistentTransmissionSettings {
}
/// Transmit scheduled messages with a persistent rate as a stream.
pub struct PersistentTransmissionStream<S>
pub struct PersistentTransmissionStream<S, Rng>
where
S: Stream,
Rng: RngCore,
{
interval: Interval,
coin: Coin<ChaCha12Rng>,
coin: Coin<Rng>,
stream: S,
}
impl<S> PersistentTransmissionStream<S>
impl<S, Rng> PersistentTransmissionStream<S, Rng>
where
S: Stream,
Rng: RngCore,
{
pub fn new(
settings: PersistentTransmissionSettings,
stream: S,
) -> PersistentTransmissionStream<S> {
rng: Rng,
) -> PersistentTransmissionStream<S, Rng> {
let interval = time::interval(Duration::from_secs_f64(
1.0 / settings.max_emission_frequency,
));
let coin = Coin::<_>::new(
ChaCha12Rng::from_entropy(),
settings.drop_message_probability,
)
.unwrap();
let coin = Coin::<Rng>::new(rng, settings.drop_message_probability).unwrap();
Self {
interval,
coin,
@ -60,9 +58,10 @@ where
}
}
impl<S> Stream for PersistentTransmissionStream<S>
impl<S, Rng> Stream for PersistentTransmissionStream<S, Rng>
where
S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin,
{
type Item = Vec<u8>;
@ -86,19 +85,28 @@ where
}
}
pub trait PersistentTransmissionExt: Stream {
pub trait PersistentTransmissionExt<Rng>: Stream
where
Rng: RngCore,
{
fn persistent_transmission(
self,
settings: PersistentTransmissionSettings,
) -> PersistentTransmissionStream<Self>
rng: Rng,
) -> PersistentTransmissionStream<Self, Rng>
where
Self: Sized + Unpin,
{
PersistentTransmissionStream::new(settings, self)
PersistentTransmissionStream::new(settings, self, rng)
}
}
impl<S> PersistentTransmissionExt for S where S: Stream {}
impl<S, Rng> PersistentTransmissionExt<Rng> for S
where
S: Stream,
Rng: RngCore,
{
}
struct Coin<R: Rng> {
rng: R,
@ -133,6 +141,8 @@ enum CoinError {
mod tests {
use super::*;
use futures::StreamExt;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use tokio::sync::mpsc;
macro_rules! assert_interval {
@ -173,7 +183,8 @@ mod tests {
let lower_bound = expected_emission_interval - torelance;
let upper_bound = expected_emission_interval + torelance;
// prepare stream
let mut persistent_transmission_stream = stream.persistent_transmission(settings);
let mut persistent_transmission_stream =
stream.persistent_transmission(settings, ChaCha8Rng::from_entropy());
// Messages must be scheduled in non-blocking manner.
schedule_sender.send(vec![1]).unwrap();
schedule_sender.send(vec![2]).unwrap();

View File

@ -15,6 +15,7 @@ nomos-mix-message = { path = "../../nomos-mix/message" }
nomos-network = { path = "../network" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
rand = "0.8.5"
rand_chacha = "0.3"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["macros", "sync"] }
tokio-stream = "0.1"

View File

@ -20,6 +20,8 @@ use overwatch_rs::services::{
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::fmt::Debug;
use tokio::sync::mpsc;
@ -92,8 +94,10 @@ where
// tier 1 persistent transmission
let (persistent_sender, persistent_receiver) = mpsc::unbounded_channel();
let mut persistent_transmission_messages =
UnboundedReceiverStream::new(persistent_receiver)
.persistent_transmission(mix_config.persistent_transmission);
UnboundedReceiverStream::new(persistent_receiver).persistent_transmission(
mix_config.persistent_transmission,
ChaCha12Rng::from_entropy(),
);
// tier 2 blend
let mut blend_messages = backend