Implement Stream for MixClient (#616)

This commit is contained in:
Youngjoon Lee 2024-03-20 20:13:00 +09:00 committed by GitHub
parent d449114044
commit 502c791318
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 60 additions and 37 deletions

View File

@ -1,5 +1,11 @@
use std::{collections::VecDeque, num::NonZeroU8};
use std::{
collections::VecDeque,
num::NonZeroU8,
pin::Pin,
task::{Context, Poll},
};
use futures::{Future, Stream};
use rand::rngs::OsRng;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
@ -12,15 +18,11 @@ use crate::{error::MixnetError, packet::Packet, poisson::Poisson, topology::Mixn
/// If there is no messages inserted to the [`MessageQueue`], cover packets are generated and
/// returned from [`MixClient.next()`].
pub struct MixClient {
packet_rx: mpsc::UnboundedReceiver<Packet>,
}
struct MixClientRunner {
config: MixClientConfig,
poisson: Poisson,
message_queue: mpsc::Receiver<Vec<u8>>,
real_packet_queue: VecDeque<Packet>,
packet_tx: mpsc::UnboundedSender<Packet>,
delay: Option<Pin<Box<tokio::time::Sleep>>>,
}
/// Mix client configuration
@ -46,59 +48,77 @@ impl MixClient {
pub fn new(config: MixClientConfig) -> Result<(Self, MessageQueue), MixnetError> {
let poisson = Poisson::new(config.emission_rate_per_min)?;
let (tx, rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
let (packet_tx, packet_rx) = mpsc::unbounded_channel();
MixClientRunner {
config,
poisson,
message_queue: rx,
real_packet_queue: VecDeque::new(),
packet_tx,
}
.run();
Ok((Self { packet_rx }, tx))
}
/// Returns a next [`Packet`] to be emitted, if it exists and the Poisson timer is done.
pub async fn next(&mut self) -> Option<Packet> {
self.packet_rx.recv().await
Ok((
Self {
config,
poisson,
message_queue: rx,
real_packet_queue: VecDeque::new(),
delay: None,
},
tx,
))
}
}
impl MixClientRunner {
fn run(mut self) {
tokio::spawn(async move {
let mut delay = tokio::time::sleep(self.poisson.interval(&mut OsRng));
loop {
let next_deadline = delay.deadline() + self.poisson.interval(&mut OsRng);
delay.await;
delay = tokio::time::sleep_until(next_deadline);
impl Stream for MixClient {
type Item = Packet;
match self.next_packet().await {
Ok(packet) => {
// packet_tx is always expected to be not closed/dropped.
self.packet_tx.send(packet).unwrap();
}
/// Returns a next [`Packet`] to be emitted, if it exists and the Poisson timer is done.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.delay.is_none() {
// We've never set an initial delay. Let's do it now.
cx.waker().wake_by_ref();
self.delay = Some(Box::pin(tokio::time::sleep(
self.poisson.interval(&mut OsRng),
)));
return Poll::Pending;
}
match self.delay.as_mut().unwrap().as_mut().poll(cx) {
Poll::Pending => {
// The delay hasn't elapsed yet.
// The current task is automatically scheduled to be woken up once the timer elapses,
// thanks to the `tokio::time::Sleep.poll(cx)`.
Poll::Pending
}
Poll::Ready(_) => {
// The delay has elapsed. Let's reset the delay and return the next packet.
let next_interval = self.poisson.interval(&mut OsRng);
let delay = self.delay.as_mut().unwrap();
let next_deadline = delay.deadline() + next_interval;
delay.as_mut().reset(next_deadline);
match self.next_packet() {
Ok(packet) => Poll::Ready(Some(packet)),
Err(e) => {
tracing::error!(
"failed to find a next packet to emit. skipping to the next turn: {e}"
);
Poll::Pending
}
}
}
});
}
}
}
impl MixClient {
const DROP_COVER_MSG: &'static [u8] = b"drop cover";
async fn next_packet(&mut self) -> Result<Packet, MixnetError> {
// Returns either a real packet or a drop cover packet.
fn next_packet(&mut self) -> Result<Packet, MixnetError> {
// If there is any redundant real packet scheduled, return it.
if let Some(packet) = self.real_packet_queue.pop_front() {
return Ok(packet);
}
match self.message_queue.try_recv() {
Ok(msg) => {
// If there is any message received, build real packets out of it and
// schedule them in the queue.
for packet in Packet::build_real(msg, &self.config.topology)? {
for _ in 0..self.config.redundancy.get() {
self.real_packet_queue.push_back(packet.clone());
@ -110,6 +130,7 @@ impl MixClientRunner {
.expect("real packet queue should not be empty"))
}
Err(_) => {
// If no message received, generate and return a drop cover packet.
let mut packets = Packet::build_drop_cover(
Vec::from(Self::DROP_COVER_MSG),
&self.config.topology,
@ -124,6 +145,8 @@ impl MixClientRunner {
mod tests {
use std::{num::NonZeroU8, time::Instant};
use futures::StreamExt;
use crate::{
client::MixClientConfig,
topology::{