From 502c791318179f2257b1fdd566ec14d60f66948a Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Wed, 20 Mar 2024 20:13:00 +0900 Subject: [PATCH] Implement Stream for MixClient (#616) --- mixnet/src/client.rs | 97 +++++++++++++++++++++++++++----------------- 1 file changed, 60 insertions(+), 37 deletions(-) diff --git a/mixnet/src/client.rs b/mixnet/src/client.rs index dac6327e..7f7cd0b5 100644 --- a/mixnet/src/client.rs +++ b/mixnet/src/client.rs @@ -1,5 +1,11 @@ -use std::{collections::VecDeque, num::NonZeroU8}; +use std::{ + collections::VecDeque, + num::NonZeroU8, + pin::Pin, + task::{Context, Poll}, +}; +use futures::{Future, Stream}; use rand::rngs::OsRng; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; @@ -12,15 +18,11 @@ use crate::{error::MixnetError, packet::Packet, poisson::Poisson, topology::Mixn /// If there is no messages inserted to the [`MessageQueue`], cover packets are generated and /// returned from [`MixClient.next()`]. pub struct MixClient { - packet_rx: mpsc::UnboundedReceiver, -} - -struct MixClientRunner { config: MixClientConfig, poisson: Poisson, message_queue: mpsc::Receiver>, real_packet_queue: VecDeque, - packet_tx: mpsc::UnboundedSender, + delay: Option>>, } /// Mix client configuration @@ -46,59 +48,77 @@ impl MixClient { pub fn new(config: MixClientConfig) -> Result<(Self, MessageQueue), MixnetError> { let poisson = Poisson::new(config.emission_rate_per_min)?; let (tx, rx) = mpsc::channel(MESSAGE_QUEUE_SIZE); - let (packet_tx, packet_rx) = mpsc::unbounded_channel(); - MixClientRunner { - config, - poisson, - message_queue: rx, - real_packet_queue: VecDeque::new(), - packet_tx, - } - .run(); - - Ok((Self { packet_rx }, tx)) - } - - /// Returns a next [`Packet`] to be emitted, if it exists and the Poisson timer is done. - pub async fn next(&mut self) -> Option { - self.packet_rx.recv().await + Ok(( + Self { + config, + poisson, + message_queue: rx, + real_packet_queue: VecDeque::new(), + delay: None, + }, + tx, + )) } } -impl MixClientRunner { - fn run(mut self) { - tokio::spawn(async move { - let mut delay = tokio::time::sleep(self.poisson.interval(&mut OsRng)); - loop { - let next_deadline = delay.deadline() + self.poisson.interval(&mut OsRng); - delay.await; - delay = tokio::time::sleep_until(next_deadline); +impl Stream for MixClient { + type Item = Packet; - match self.next_packet().await { - Ok(packet) => { - // packet_tx is always expected to be not closed/dropped. - self.packet_tx.send(packet).unwrap(); - } + /// Returns a next [`Packet`] to be emitted, if it exists and the Poisson timer is done. + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.delay.is_none() { + // We've never set an initial delay. Let's do it now. + cx.waker().wake_by_ref(); + + self.delay = Some(Box::pin(tokio::time::sleep( + self.poisson.interval(&mut OsRng), + ))); + return Poll::Pending; + } + + match self.delay.as_mut().unwrap().as_mut().poll(cx) { + Poll::Pending => { + // The delay hasn't elapsed yet. + // The current task is automatically scheduled to be woken up once the timer elapses, + // thanks to the `tokio::time::Sleep.poll(cx)`. + Poll::Pending + } + Poll::Ready(_) => { + // The delay has elapsed. Let's reset the delay and return the next packet. + let next_interval = self.poisson.interval(&mut OsRng); + let delay = self.delay.as_mut().unwrap(); + let next_deadline = delay.deadline() + next_interval; + delay.as_mut().reset(next_deadline); + + match self.next_packet() { + Ok(packet) => Poll::Ready(Some(packet)), Err(e) => { tracing::error!( "failed to find a next packet to emit. skipping to the next turn: {e}" ); + Poll::Pending } } } - }); + } } +} +impl MixClient { const DROP_COVER_MSG: &'static [u8] = b"drop cover"; - async fn next_packet(&mut self) -> Result { + // Returns either a real packet or a drop cover packet. + fn next_packet(&mut self) -> Result { + // If there is any redundant real packet scheduled, return it. if let Some(packet) = self.real_packet_queue.pop_front() { return Ok(packet); } match self.message_queue.try_recv() { Ok(msg) => { + // If there is any message received, build real packets out of it and + // schedule them in the queue. for packet in Packet::build_real(msg, &self.config.topology)? { for _ in 0..self.config.redundancy.get() { self.real_packet_queue.push_back(packet.clone()); @@ -110,6 +130,7 @@ impl MixClientRunner { .expect("real packet queue should not be empty")) } Err(_) => { + // If no message received, generate and return a drop cover packet. let mut packets = Packet::build_drop_cover( Vec::from(Self::DROP_COVER_MSG), &self.config.topology, @@ -124,6 +145,8 @@ impl MixClientRunner { mod tests { use std::{num::NonZeroU8, time::Instant}; + use futures::StreamExt; + use crate::{ client::MixClientConfig, topology::{