Mix: temporal as stream extension (#861)

* forward msgs immediately without any processing

* Mix: Offload transmission rate and message processing from libp2p behaviour/handler

* Mix: Core skeleton used in `MixService`

* rename Processor to MessageBlend

* Mix: Implement Persistent Transmission (Tier 1) (#845)

* Mix: Add Persistent Transmission (Tier 1)

* add test

* define Coin struct with Uniform distribution for fast repeated sampling

* use ChaCha12Rng for Coin

* improve comment

* Mix: Implement Temporal Processor (Tier 2) (#846)

* Mix: Add Persistent Transmission (Tier 1)

* Mix: Implement TemporalProcessor

* use pub(crate)

* Mix: Use TemporalProcessor in MessageBlend (#847)

* Mix: Add Persistent Transmission (Tier 1)

* Mix: Implement TemporalProcessor

* Mix: Use TemporalProcessor in MessageBlend

* remove duplicate members in Cargo.toml

* Implement persistent transmission as stream trait extension

* Replicate test for stream version

* Temporal processor as stream extension

---------

Co-authored-by: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com>
This commit is contained in:
Daniel Sanchez 2024-10-28 17:12:50 +01:00 committed by GitHub
parent e207743f69
commit 9d52297cdf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 39 additions and 2 deletions

View File

@ -5,7 +5,7 @@ use std::{
time::Duration,
};
use futures::{Future, Stream};
use futures::{Future, Stream, StreamExt};
use rand::Rng;
use serde::{Deserialize, Serialize};
use tokio::time;
@ -71,7 +71,7 @@ impl<M> TemporalProcessor<M> {
impl<M> Stream for TemporalProcessor<M>
where
M: Unpin + Clone + 'static,
M: Unpin,
{
type Item = M;
@ -97,3 +97,40 @@ where
Poll::Pending
}
}
pub struct TemporalStream<S>
where
S: Stream,
{
processor: TemporalProcessor<S::Item>,
wrapped_stream: S,
}
impl<S> Stream for TemporalStream<S>
where
S: Stream + Unpin,
S::Item: Unpin,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(item)) = self.wrapped_stream.poll_next_unpin(cx) {
self.processor.push_message(item);
}
self.processor.poll_next_unpin(cx)
}
}
#[allow(dead_code)] // TODO: Remove when integrating into blend
pub trait TemporalProcessorExt: Stream {
fn to_temporal_stream(self, settings: TemporalProcessorSettings) -> TemporalStream<Self>
where
Self: Sized,
{
TemporalStream {
processor: TemporalProcessor::new(settings),
wrapped_stream: self,
}
}
}
impl<T> TemporalProcessorExt for T where T: Stream {}