Mix: time abstract from streams (#906)

* Extract temporal async stream

* Make stuff public

* Extract persistent transmission scheduler

* Fix tests

* Push temporal scheduler one layer up

* fix compile errors

* return Poll::Pending instead of Poll::Ready(None) when queue is empty

---------

Co-authored-by: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com>
This commit is contained in:
Daniel Sanchez 2024-11-06 06:08:52 +01:00 committed by GitHub
parent c84a29db31
commit 9b29c17e2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 142 additions and 92 deletions

View File

@ -8,7 +8,7 @@ use rand::RngCore;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
pub use temporal::TemporalProcessorSettings;
pub use temporal::TemporalSchedulerSettings;
use crate::membership::Membership;
use crate::message_blend::crypto::CryptographicProcessor;
@ -28,13 +28,13 @@ where
M::PrivateKey: Serialize + DeserializeOwned,
{
pub cryptographic_processor: CryptographicProcessorSettings<M::PrivateKey>,
pub temporal_processor: TemporalProcessorSettings,
pub temporal_processor: TemporalSchedulerSettings,
}
/// [`MessageBlendStream`] handles the entire mixing tiers process
/// - Unwraps incoming messages received from network using [`CryptographicProcessor`]
/// - Pushes unwrapped messages to [`TemporalProcessor`]
pub struct MessageBlendStream<S, Rng, M>
pub struct MessageBlendStream<S, Rng, M, Scheduler>
where
M: MixMessage,
{
@ -43,22 +43,24 @@ where
temporal_sender: UnboundedSender<MixOutgoingMessage>,
cryptographic_processor: CryptographicProcessor<Rng, M>,
_rng: PhantomData<Rng>,
_scheduler: PhantomData<Scheduler>,
}
impl<S, Rng, M> MessageBlendStream<S, Rng, M>
impl<S, Rng, M, Scheduler> MessageBlendStream<S, Rng, M, Scheduler>
where
S: Stream<Item = Vec<u8>>,
Rng: RngCore + Unpin + Send + 'static,
M: MixMessage,
M::PrivateKey: Serialize + DeserializeOwned,
M::PublicKey: Clone + PartialEq,
Scheduler: Stream<Item = ()> + Unpin + Send + 'static,
{
pub fn new(
input_stream: S,
settings: MessageBlendSettings<M>,
membership: Membership<M>,
scheduler: Scheduler,
cryptographic_processor_rng: Rng,
temporal_processor_rng: Rng,
) -> Self {
let cryptographic_processor = CryptographicProcessor::new(
settings.cryptographic_processor,
@ -67,7 +69,7 @@ where
);
let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel();
let output_stream = UnboundedReceiverStream::new(temporal_receiver)
.temporal_stream(settings.temporal_processor, temporal_processor_rng)
.temporal_stream(scheduler)
.boxed();
Self {
input_stream,
@ -75,6 +77,7 @@ where
temporal_sender,
cryptographic_processor,
_rng: Default::default(),
_scheduler: Default::default(),
}
}
@ -100,13 +103,14 @@ where
}
}
impl<S, Rng, M> Stream for MessageBlendStream<S, Rng, M>
impl<S, Rng, M, Scheduler> Stream for MessageBlendStream<S, Rng, M, Scheduler>
where
S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin + Send + 'static,
M: MixMessage + Unpin,
M::PrivateKey: Serialize + DeserializeOwned + Unpin,
M::PublicKey: Clone + PartialEq + Unpin,
Scheduler: Stream<Item = ()> + Unpin + Send + 'static,
{
type Item = MixOutgoingMessage;
@ -118,20 +122,21 @@ where
}
}
pub trait MessageBlendExt<Rng, M>: Stream<Item = Vec<u8>>
pub trait MessageBlendExt<Rng, M, Scheduler>: Stream<Item = Vec<u8>>
where
Rng: RngCore + Send + Unpin + 'static,
M: MixMessage,
M::PrivateKey: Serialize + DeserializeOwned,
M::PublicKey: Clone + PartialEq,
Scheduler: Stream<Item = ()> + Unpin + Send + 'static,
{
fn blend(
self,
message_blend_settings: MessageBlendSettings<M>,
membership: Membership<M>,
scheduler: Scheduler,
cryptographic_processor_rng: Rng,
temporal_processor_rng: Rng,
) -> MessageBlendStream<Self, Rng, M>
) -> MessageBlendStream<Self, Rng, M, Scheduler>
where
Self: Sized + Unpin,
{
@ -139,18 +144,19 @@ where
self,
message_blend_settings,
membership,
scheduler,
cryptographic_processor_rng,
temporal_processor_rng,
)
}
}
impl<T, Rng, M> MessageBlendExt<Rng, M> for T
impl<T, Rng, M, S> MessageBlendExt<Rng, M, S> for T
where
T: Stream<Item = Vec<u8>>,
Rng: RngCore + Unpin + Send + 'static,
M: MixMessage,
M::PrivateKey: Clone + Serialize + DeserializeOwned + PartialEq,
M::PublicKey: Clone + Serialize + DeserializeOwned + PartialEq,
S: Stream<Item = ()> + Unpin + Send + 'static,
{
}

View File

@ -10,14 +10,8 @@ use rand::{Rng, RngCore};
use serde::{Deserialize, Serialize};
use tokio::time;
/// [`TemporalProcessor`] delays messages randomly to hide timing correlation
/// between incoming and outgoing messages from a node.
///
/// See the [`Stream`] implementation below for more details on how it works.
pub(crate) struct TemporalProcessor<M, Rng> {
settings: TemporalProcessorSettings,
// All scheduled messages
queue: VecDeque<M>,
pub struct TemporalScheduler<Rng> {
settings: TemporalSchedulerSettings,
/// Interval in seconds for running the lottery to release a message
lottery_interval: time::Interval,
/// To wait a few seconds after running the lottery before releasing the message.
@ -27,17 +21,11 @@ pub(crate) struct TemporalProcessor<M, Rng> {
rng: Rng,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct TemporalProcessorSettings {
pub max_delay_seconds: u64,
}
impl<M, Rng> TemporalProcessor<M, Rng> {
pub(crate) fn new(settings: TemporalProcessorSettings, rng: Rng) -> Self {
impl<Rng> TemporalScheduler<Rng> {
pub fn new(settings: TemporalSchedulerSettings, rng: Rng) -> Self {
let lottery_interval = Self::lottery_interval(settings.max_delay_seconds);
Self {
settings,
queue: VecDeque::new(),
lottery_interval,
release_timer: None,
rng,
@ -58,12 +46,9 @@ impl<M, Rng> TemporalProcessor<M, Rng> {
fn lottery_interval_seconds(max_delay_seconds: u64) -> u64 {
max_delay_seconds / 2
}
/// Schedule a message to be released later.
pub(crate) fn push_message(&mut self, message: M) {
self.queue.push_back(message);
}
}
impl<M, Rng> TemporalProcessor<M, Rng>
impl<Rng> TemporalScheduler<Rng>
where
Rng: RngCore,
{
@ -75,12 +60,11 @@ where
}
}
impl<M, Rng> Stream for TemporalProcessor<M, Rng>
impl<Rng> Stream for TemporalScheduler<Rng>
where
M: Unpin,
Rng: RngCore + Unpin,
{
type Item = M;
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Check whether it's time to run a new lottery to determine the delay.
@ -94,33 +78,74 @@ where
if let Some(timer) = self.release_timer.as_mut() {
if timer.as_mut().poll(cx).is_ready() {
self.release_timer.take(); // Reset timer after it's done
if let Some(msg) = self.queue.pop_front() {
// Release the 1st message in the queue if it exists.
return Poll::Ready(Some(msg));
}
return Poll::Ready(Some(()));
}
}
Poll::Pending
}
}
pub struct TemporalStream<S, Rng>
where
S: Stream,
Rng: RngCore,
{
processor: TemporalProcessor<S::Item, Rng>,
wrapped_stream: S,
/// [`TemporalProcessor`] delays messages randomly to hide timing correlation
/// between incoming and outgoing messages from a node.
///
/// See the [`Stream`] implementation below for more details on how it works.
pub struct TemporalProcessor<M, S> {
// All scheduled messages
queue: VecDeque<M>,
scheduler: S,
}
impl<S, Rng> Stream for TemporalStream<S, Rng>
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct TemporalSchedulerSettings {
pub max_delay_seconds: u64,
}
impl<M, S> TemporalProcessor<M, S> {
pub(crate) fn new(scheduler: S) -> Self {
Self {
queue: VecDeque::new(),
scheduler,
}
}
/// Schedule a message to be released later.
pub(crate) fn push_message(&mut self, message: M) {
self.queue.push_back(message);
}
}
impl<M, S> Stream for TemporalProcessor<M, S>
where
S: Stream + Unpin,
S::Item: Unpin,
Rng: RngCore + Unpin,
M: Unpin,
S: Stream<Item = ()> + Unpin,
{
type Item = S::Item;
type Item = M;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.scheduler.poll_next_unpin(cx).is_ready() {
if let Some(msg) = self.queue.pop_front() {
return Poll::Ready(Some(msg));
}
};
Poll::Pending
}
}
pub struct TemporalStream<WrappedStream, Scheduler>
where
WrappedStream: Stream,
Scheduler: Stream<Item = ()>,
{
processor: TemporalProcessor<WrappedStream::Item, Scheduler>,
wrapped_stream: WrappedStream,
}
impl<WrappedStream, Scheduler> Stream for TemporalStream<WrappedStream, Scheduler>
where
WrappedStream: Stream + Unpin,
WrappedStream::Item: Unpin,
Scheduler: Stream<Item = ()> + Unpin,
{
type Item = WrappedStream::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(item)) = self.wrapped_stream.poll_next_unpin(cx) {
@ -129,28 +154,24 @@ where
self.processor.poll_next_unpin(cx)
}
}
pub trait TemporalProcessorExt<Rng>: Stream
pub trait TemporalProcessorExt<Scheduler>: Stream
where
Rng: RngCore,
Scheduler: Stream<Item = ()>,
{
fn temporal_stream(
self,
settings: TemporalProcessorSettings,
rng: Rng,
) -> TemporalStream<Self, Rng>
fn temporal_stream(self, scheduler: Scheduler) -> TemporalStream<Self, Scheduler>
where
Self: Sized,
{
TemporalStream {
processor: TemporalProcessor::new(settings, rng),
processor: TemporalProcessor::<Self::Item, Scheduler>::new(scheduler),
wrapped_stream: self,
}
}
}
impl<T, Rng> TemporalProcessorExt<Rng> for T
impl<T, S> TemporalProcessorExt<S> for T
where
T: Stream,
Rng: RngCore,
S: Stream<Item = ()>,
{
}

View File

@ -1,4 +1,4 @@
use futures::Stream;
use futures::{Stream, StreamExt};
use nomos_mix_message::MixMessage;
use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore};
use serde::de::DeserializeOwned;
@ -6,9 +6,6 @@ use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time;
use tokio::time::Interval;
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct PersistentTransmissionSettings {
@ -28,57 +25,57 @@ impl Default for PersistentTransmissionSettings {
}
/// Transmit scheduled messages with a persistent rate as a stream.
pub struct PersistentTransmissionStream<S, Rng, M>
pub struct PersistentTransmissionStream<S, Rng, M, Scheduler>
where
S: Stream,
Rng: RngCore,
{
interval: Interval,
coin: Coin<Rng>,
stream: S,
scheduler: Scheduler,
_mix_message: PhantomData<M>,
}
impl<S, Rng, M> PersistentTransmissionStream<S, Rng, M>
impl<S, Rng, M, Scheduler> PersistentTransmissionStream<S, Rng, M, Scheduler>
where
S: Stream,
Rng: RngCore,
M: MixMessage,
Scheduler: Stream<Item = ()>,
{
pub fn new(
settings: PersistentTransmissionSettings,
stream: S,
scheduler: Scheduler,
rng: Rng,
) -> PersistentTransmissionStream<S, Rng, M> {
let interval = time::interval(Duration::from_secs_f64(
1.0 / settings.max_emission_frequency,
));
) -> PersistentTransmissionStream<S, Rng, M, Scheduler> {
let coin = Coin::<Rng>::new(rng, settings.drop_message_probability).unwrap();
Self {
interval,
coin,
stream,
scheduler,
_mix_message: Default::default(),
}
}
}
impl<S, Rng, M> Stream for PersistentTransmissionStream<S, Rng, M>
impl<S, Rng, M, Scheduler> Stream for PersistentTransmissionStream<S, Rng, M, Scheduler>
where
S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin,
M: MixMessage + Unpin,
Scheduler: Stream<Item = ()> + Unpin,
{
type Item = Vec<u8>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
ref mut interval,
ref mut scheduler,
ref mut stream,
ref mut coin,
..
} = self.get_mut();
if pin!(interval).poll_tick(cx).is_pending() {
if pin!(scheduler).poll_next_unpin(cx).is_pending() {
return Poll::Pending;
}
if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) {
@ -91,29 +88,32 @@ where
}
}
pub trait PersistentTransmissionExt<Rng, M>: Stream
pub trait PersistentTransmissionExt<Rng, M, Scheduler>: Stream
where
Rng: RngCore,
M: MixMessage,
Scheduler: Stream<Item = ()>,
{
fn persistent_transmission(
self,
settings: PersistentTransmissionSettings,
rng: Rng,
) -> PersistentTransmissionStream<Self, Rng, M>
scheduler: Scheduler,
) -> PersistentTransmissionStream<Self, Rng, M, Scheduler>
where
Self: Sized + Unpin,
{
PersistentTransmissionStream::new(settings, self, rng)
PersistentTransmissionStream::new(settings, self, scheduler, rng)
}
}
impl<S, Rng, M> PersistentTransmissionExt<Rng, M> for S
impl<S, Rng, M, Scheduler> PersistentTransmissionExt<Rng, M, Scheduler> for S
where
S: Stream,
Rng: RngCore,
M: MixMessage,
M::PublicKey: Clone + Serialize + DeserializeOwned,
Scheduler: Stream<Item = ()>,
{
}
@ -153,7 +153,10 @@ mod tests {
use nomos_mix_message::mock::MockMixMessage;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time;
use tokio_stream::wrappers::IntervalStream;
macro_rules! assert_interval {
($last_time:expr, $lower_bound:expr, $upper_bound:expr) => {
@ -193,8 +196,16 @@ mod tests {
let lower_bound = expected_emission_interval - torelance;
let upper_bound = expected_emission_interval + torelance;
// prepare stream
let mut persistent_transmission_stream: PersistentTransmissionStream<_, _, MockMixMessage> =
stream.persistent_transmission(settings, ChaCha8Rng::from_entropy());
let mut persistent_transmission_stream: PersistentTransmissionStream<
_,
_,
MockMixMessage,
_,
> = stream.persistent_transmission(
settings,
ChaCha8Rng::from_entropy(),
IntervalStream::new(time::interval(expected_emission_interval)).map(|_| ()),
);
// Messages must be scheduled in non-blocking manner.
schedule_sender.send(vec![1]).unwrap();
schedule_sender.send(vec![2]).unwrap();

View File

@ -3,7 +3,7 @@ use cryptarchia_consensus::LeaderConfig;
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_mix::membership::Node;
use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings,
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use nomos_mix_message::mock::MockMixMessage;
use nomos_mix_message::MixMessage;
@ -226,7 +226,7 @@ pub fn new_node(
private_key: mix_config.private_key.to_bytes(),
num_mix_layers: 1,
},
temporal_processor: TemporalProcessorSettings {
temporal_processor: TemporalSchedulerSettings {
max_delay_seconds: 2,
},
},

View File

@ -8,6 +8,7 @@ use network::NetworkAdapter;
use nomos_core::wire;
use nomos_mix::membership::{Membership, Node};
use nomos_mix::message_blend::crypto::CryptographicProcessor;
use nomos_mix::message_blend::temporal::TemporalScheduler;
use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings};
use nomos_mix::persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
@ -26,8 +27,10 @@ use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::fmt::Debug;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio::time;
use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream};
/// A mix service that sends messages to the mix network
/// and broadcasts fully unwrapped messages through the [`NetworkService`].
@ -108,16 +111,25 @@ where
_,
_,
MockMixMessage,
_,
> = UnboundedReceiverStream::new(persistent_receiver).persistent_transmission(
mix_config.persistent_transmission,
ChaCha12Rng::from_entropy(),
IntervalStream::new(time::interval(Duration::from_secs_f64(
1.0 / mix_config.persistent_transmission.max_emission_frequency,
)))
.map(|_| ()),
);
// tier 2 blend
let temporal_scheduler = TemporalScheduler::new(
mix_config.message_blend.temporal_processor,
ChaCha12Rng::from_entropy(),
);
let mut blend_messages = backend.listen_to_incoming_messages().blend(
mix_config.message_blend,
membership.clone(),
ChaCha12Rng::from_entropy(),
temporal_scheduler,
ChaCha12Rng::from_entropy(),
);

View File

@ -23,7 +23,7 @@ use nomos_da_verifier::DaVerifierServiceSettings;
use nomos_executor::api::backend::AxumBackendSettings;
use nomos_executor::config::Config;
use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings,
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE};
@ -162,7 +162,7 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
private_key: config.mix_config.private_key.to_bytes(),
num_mix_layers: 1,
},
temporal_processor: TemporalProcessorSettings {
temporal_processor: TemporalSchedulerSettings {
max_delay_seconds: 2,
},
},

View File

@ -15,7 +15,7 @@ use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as Verif
use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings};
use nomos_mempool::MempoolMetrics;
use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings,
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::api::paths::{
@ -248,7 +248,7 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
private_key: config.mix_config.private_key.to_bytes(),
num_mix_layers: 1,
},
temporal_processor: TemporalProcessorSettings {
temporal_processor: TemporalSchedulerSettings {
max_delay_seconds: 2,
},
},