From 9d52297cdf58bdf3251e500d334d3ed782a25784 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Mon, 28 Oct 2024 17:12:50 +0100 Subject: [PATCH] Mix: temporal as stream extension (#861) * forward msgs immediately without any processing * Mix: Offload transmission rate and message processing from libp2p behaviour/handler * Mix: Core skeleton used in `MixService` * rename Processor to MessageBlend * Mix: Implement Persistent Transmission (Tier 1) (#845) * Mix: Add Persistent Transmission (Tier 1) * add test * define Coin struct with Uniform distribution for fast repeated sampling * use ChaCha12Rng for Coin * improve comment * Mix: Implement Temporal Processor (Tier 2) (#846) * Mix: Add Persistent Transmission (Tier 1) * Mix: Implement TemporalProcessor * use pub(crate) * Mix: Use TemporalProcessor in MessageBlend (#847) * Mix: Add Persistent Transmission (Tier 1) * Mix: Implement TemporalProcessor * Mix: Use TemporalProcessor in MessageBlend * remove duplicate members in Cargo.toml * Implement persistent transmission as stream trait extension * Replicate test for stream version * Temporal processor as stream extension --------- Co-authored-by: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> --- nomos-mix/core/src/message_blend/temporal.rs | 41 +++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/nomos-mix/core/src/message_blend/temporal.rs b/nomos-mix/core/src/message_blend/temporal.rs index 16c1753d..312e8580 100644 --- a/nomos-mix/core/src/message_blend/temporal.rs +++ b/nomos-mix/core/src/message_blend/temporal.rs @@ -5,7 +5,7 @@ use std::{ time::Duration, }; -use futures::{Future, Stream}; +use futures::{Future, Stream, StreamExt}; use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::time; @@ -71,7 +71,7 @@ impl TemporalProcessor { impl Stream for TemporalProcessor where - M: Unpin + Clone + 'static, + M: Unpin, { type Item = M; @@ -97,3 +97,40 @@ where Poll::Pending } } + +pub struct TemporalStream +where + S: Stream, +{ + processor: TemporalProcessor, + wrapped_stream: S, +} + +impl Stream for TemporalStream +where + S: Stream + Unpin, + S::Item: Unpin, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(Some(item)) = self.wrapped_stream.poll_next_unpin(cx) { + self.processor.push_message(item); + } + self.processor.poll_next_unpin(cx) + } +} +#[allow(dead_code)] // TODO: Remove when integrating into blend +pub trait TemporalProcessorExt: Stream { + fn to_temporal_stream(self, settings: TemporalProcessorSettings) -> TemporalStream + where + Self: Sized, + { + TemporalStream { + processor: TemporalProcessor::new(settings), + wrapped_stream: self, + } + } +} + +impl TemporalProcessorExt for T where T: Stream {}