From e0959644a9927d3930197df281684615dee7a6ef Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Wed, 6 Nov 2024 18:59:06 +0900 Subject: [PATCH] Add Sync bound to BoxStream in blend stream (#908) --- nomos-mix/core/src/message_blend/mod.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/nomos-mix/core/src/message_blend/mod.rs b/nomos-mix/core/src/message_blend/mod.rs index 18d488bd..13d6a2ce 100644 --- a/nomos-mix/core/src/message_blend/mod.rs +++ b/nomos-mix/core/src/message_blend/mod.rs @@ -2,7 +2,6 @@ pub mod crypto; pub mod temporal; pub use crypto::CryptographicProcessorSettings; -use futures::stream::BoxStream; use futures::{Stream, StreamExt}; use rand::RngCore; use std::marker::PhantomData; @@ -39,7 +38,7 @@ where M: MixMessage, { input_stream: S, - output_stream: BoxStream<'static, MixOutgoingMessage>, + output_stream: Pin + Send + Sync + 'static>>, temporal_sender: UnboundedSender, cryptographic_processor: CryptographicProcessor, _rng: PhantomData, @@ -53,7 +52,7 @@ where M: MixMessage, M::PrivateKey: Serialize + DeserializeOwned, M::PublicKey: Clone + PartialEq, - Scheduler: Stream + Unpin + Send + 'static, + Scheduler: Stream + Unpin + Send + Sync + 'static, { pub fn new( input_stream: S, @@ -68,9 +67,8 @@ where cryptographic_processor_rng, ); let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel(); - let output_stream = UnboundedReceiverStream::new(temporal_receiver) - .temporal_stream(scheduler) - .boxed(); + let output_stream = + Box::pin(UnboundedReceiverStream::new(temporal_receiver).temporal_stream(scheduler)); Self { input_stream, output_stream, @@ -110,7 +108,7 @@ where M: MixMessage + Unpin, M::PrivateKey: Serialize + DeserializeOwned + Unpin, M::PublicKey: Clone + PartialEq + Unpin, - Scheduler: Stream + Unpin + Send + 'static, + Scheduler: Stream + Unpin + Send + Sync + 'static, { type Item = MixOutgoingMessage; @@ -128,7 +126,7 @@ where M: MixMessage, M::PrivateKey: Serialize + DeserializeOwned, M::PublicKey: Clone + PartialEq, - Scheduler: Stream + Unpin + Send + 'static, + Scheduler: Stream + Unpin + Send + Sync + 'static, { fn blend( self, @@ -157,6 +155,6 @@ where M: MixMessage, M::PrivateKey: Clone + Serialize + DeserializeOwned + PartialEq, M::PublicKey: Clone + Serialize + DeserializeOwned + PartialEq, - S: Stream + Unpin + Send + 'static, + S: Stream + Unpin + Send + Sync + 'static, { }