Extract temporal async stream

This commit is contained in:
Daniel Sanchez Quiros 2024-11-05 19:24:56 +07:00
parent 1fa442aa2d
commit 55b86956d8
1 changed files with 70 additions and 41 deletions

View File

@ -10,14 +10,8 @@ use rand::Rng;
use serde::{Deserialize, Serialize};
use tokio::time;
/// [`TemporalProcessor`] delays messages randomly to hide timing correlation
/// between incoming and outgoing messages from a node.
///
/// See the [`Stream`] implementation below for more details on how it works.
pub(crate) struct TemporalProcessor<M> {
pub struct TemporalLotteryWithDelay {
settings: TemporalProcessorSettings,
// All scheduled messages
queue: VecDeque<M>,
/// Interval in seconds for running the lottery to release a message
lottery_interval: time::Interval,
/// To wait a few seconds after running the lottery before releasing the message.
@ -25,22 +19,7 @@ pub(crate) struct TemporalProcessor<M> {
release_timer: Option<Pin<Box<time::Sleep>>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct TemporalProcessorSettings {
pub max_delay_seconds: u64,
}
impl<M> TemporalProcessor<M> {
pub(crate) fn new(settings: TemporalProcessorSettings) -> Self {
let lottery_interval = Self::lottery_interval(settings.max_delay_seconds);
Self {
settings,
queue: VecDeque::new(),
lottery_interval,
release_timer: None,
}
}
impl TemporalLotteryWithDelay {
/// Create [`time::Interval`] for running the lottery to release a message.
fn lottery_interval(max_delay_seconds: u64) -> time::Interval {
time::interval(Duration::from_secs(Self::lottery_interval_seconds(
@ -62,18 +41,10 @@ impl<M> TemporalProcessor<M> {
let interval = Self::lottery_interval_seconds(self.settings.max_delay_seconds);
rand::thread_rng().gen_range(0..interval)
}
/// Schedule a message to be released later.
pub(crate) fn push_message(&mut self, message: M) {
self.queue.push_back(message);
}
}
impl<M> Stream for TemporalProcessor<M>
where
M: Unpin,
{
type Item = M;
impl Stream for TemporalLotteryWithDelay {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Check whether it's time to run a new lottery to determine the delay.
@ -87,10 +58,7 @@ where
if let Some(timer) = self.release_timer.as_mut() {
if timer.as_mut().poll(cx).is_ready() {
self.release_timer.take(); // Reset timer after it's done
if let Some(msg) = self.queue.pop_front() {
// Release the 1st message in the queue if it exists.
return Poll::Ready(Some(msg));
}
return Poll::Ready(Some(()));
}
}
@ -98,18 +66,76 @@ where
}
}
pub struct TemporalStream<S>
/// [`TemporalProcessor`] delays messages randomly to hide timing correlation
/// between incoming and outgoing messages from a node.
///
/// See the [`Stream`] implementation below for more details on how it works.
pub(crate) struct TemporalProcessor<M, R> {
// All scheduled messages
queue: VecDeque<M>,
releaser: R,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct TemporalProcessorSettings {
pub max_delay_seconds: u64,
}
impl<M> TemporalProcessor<M, TemporalLotteryWithDelay> {
pub(crate) fn new(settings: TemporalProcessorSettings) -> Self {
let lottery_interval =
TemporalLotteryWithDelay::lottery_interval(settings.max_delay_seconds);
Self {
queue: VecDeque::new(),
releaser: TemporalLotteryWithDelay {
settings,
lottery_interval,
release_timer: None,
},
}
}
}
impl<M, R> TemporalProcessor<M, R> {
/// Schedule a message to be released later.
pub(crate) fn push_message(&mut self, message: M) {
self.queue.push_back(message);
}
}
impl<M, R> Stream for TemporalProcessor<M, R>
where
M: Unpin,
R: Stream + Unpin,
{
type Item = M;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.releaser.poll_next_unpin(cx).is_ready() {
if let Some(msg) = self.queue.pop_front() {
// Release the 1st message in the queue if it exists.
return Poll::Ready(Some(msg));
}
}
Poll::Pending
}
}
pub struct TemporalStream<S, R>
where
S: Stream,
R: Stream,
{
processor: TemporalProcessor<S::Item>,
processor: TemporalProcessor<S::Item, R>,
wrapped_stream: S,
}
impl<S> Stream for TemporalStream<S>
impl<S, R> Stream for TemporalStream<S, R>
where
S: Stream + Unpin,
S::Item: Unpin,
R: Stream + Unpin,
{
type Item = S::Item;
@ -121,7 +147,10 @@ where
}
}
pub trait TemporalProcessorExt: Stream {
fn temporal_stream(self, settings: TemporalProcessorSettings) -> TemporalStream<Self>
fn temporal_stream(
self,
settings: TemporalProcessorSettings,
) -> TemporalStream<Self, TemporalLotteryWithDelay>
where
Self: Sized,
{