diff --git a/nomos-mix/core/src/message_blend/temporal.rs b/nomos-mix/core/src/message_blend/temporal.rs index 16c1753d..312e8580 100644 --- a/nomos-mix/core/src/message_blend/temporal.rs +++ b/nomos-mix/core/src/message_blend/temporal.rs @@ -5,7 +5,7 @@ use std::{ time::Duration, }; -use futures::{Future, Stream}; +use futures::{Future, Stream, StreamExt}; use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::time; @@ -71,7 +71,7 @@ impl TemporalProcessor { impl Stream for TemporalProcessor where - M: Unpin + Clone + 'static, + M: Unpin, { type Item = M; @@ -97,3 +97,40 @@ where Poll::Pending } } + +pub struct TemporalStream +where + S: Stream, +{ + processor: TemporalProcessor, + wrapped_stream: S, +} + +impl Stream for TemporalStream +where + S: Stream + Unpin, + S::Item: Unpin, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(Some(item)) = self.wrapped_stream.poll_next_unpin(cx) { + self.processor.push_message(item); + } + self.processor.poll_next_unpin(cx) + } +} +#[allow(dead_code)] // TODO: Remove when integrating into blend +pub trait TemporalProcessorExt: Stream { + fn to_temporal_stream(self, settings: TemporalProcessorSettings) -> TemporalStream + where + Self: Sized, + { + TemporalStream { + processor: TemporalProcessor::new(settings), + wrapped_stream: self, + } + } +} + +impl TemporalProcessorExt for T where T: Stream {}