Mix: Remove drop messages

This commit is contained in:
Youngjoon Lee 2024-11-21 10:42:01 +09:00
parent f71db1a7fe
commit 2270e3bc12
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
8 changed files with 37 additions and 143 deletions

View File

@ -19,4 +19,3 @@ x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] } tokio = { version = "1", features = ["rt-multi-thread"] }
rand_chacha = "0.3"

View File

@ -1,9 +1,5 @@
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use nomos_mix_message::MixMessage;
use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use std::pin::{pin, Pin}; use std::pin::{pin, Pin};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -11,59 +7,38 @@ use std::task::{Context, Poll};
pub struct PersistentTransmissionSettings { pub struct PersistentTransmissionSettings {
/// The maximum number of messages that can be emitted per second /// The maximum number of messages that can be emitted per second
pub max_emission_frequency: f64, pub max_emission_frequency: f64,
/// The probability of emitting a drop message by coin flipping
pub drop_message_probability: f64,
} }
impl Default for PersistentTransmissionSettings { impl Default for PersistentTransmissionSettings {
fn default() -> Self { fn default() -> Self {
Self { Self {
max_emission_frequency: 1.0, max_emission_frequency: 1.0,
drop_message_probability: 0.5,
} }
} }
} }
/// Transmit scheduled messages with a persistent rate as a stream. /// Transmit scheduled messages with a persistent rate as a stream.
pub struct PersistentTransmissionStream<S, Rng, M, Scheduler> pub struct PersistentTransmissionStream<S, Scheduler>
where where
S: Stream, S: Stream,
Rng: RngCore,
{ {
coin: Coin<Rng>,
stream: S, stream: S,
scheduler: Scheduler, scheduler: Scheduler,
_mix_message: PhantomData<M>,
} }
impl<S, Rng, M, Scheduler> PersistentTransmissionStream<S, Rng, M, Scheduler> impl<S, Scheduler> PersistentTransmissionStream<S, Scheduler>
where where
S: Stream, S: Stream,
Rng: RngCore,
M: MixMessage,
Scheduler: Stream<Item = ()>, Scheduler: Stream<Item = ()>,
{ {
pub fn new( pub fn new(stream: S, scheduler: Scheduler) -> PersistentTransmissionStream<S, Scheduler> {
settings: PersistentTransmissionSettings, Self { stream, scheduler }
stream: S,
scheduler: Scheduler,
rng: Rng,
) -> PersistentTransmissionStream<S, Rng, M, Scheduler> {
let coin = Coin::<Rng>::new(rng, settings.drop_message_probability).unwrap();
Self {
coin,
stream,
scheduler,
_mix_message: Default::default(),
}
} }
} }
impl<S, Rng, M, Scheduler> Stream for PersistentTransmissionStream<S, Rng, M, Scheduler> impl<S, Scheduler> Stream for PersistentTransmissionStream<S, Scheduler>
where where
S: Stream<Item = Vec<u8>> + Unpin, S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin,
M: MixMessage + Unpin,
Scheduler: Stream<Item = ()> + Unpin, Scheduler: Stream<Item = ()> + Unpin,
{ {
type Item = Vec<u8>; type Item = Vec<u8>;
@ -72,7 +47,6 @@ where
let Self { let Self {
ref mut scheduler, ref mut scheduler,
ref mut stream, ref mut stream,
ref mut coin,
.. ..
} = self.get_mut(); } = self.get_mut();
if pin!(scheduler).poll_next_unpin(cx).is_pending() { if pin!(scheduler).poll_next_unpin(cx).is_pending() {
@ -80,79 +54,38 @@ where
} }
if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) { if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) {
Poll::Ready(Some(item)) Poll::Ready(Some(item))
} else if coin.flip() {
Poll::Ready(Some(M::DROP_MESSAGE.to_vec()))
} else { } else {
Poll::Pending Poll::Pending
} }
} }
} }
pub trait PersistentTransmissionExt<Rng, M, Scheduler>: Stream pub trait PersistentTransmissionExt<Scheduler>: Stream
where where
Rng: RngCore,
M: MixMessage,
Scheduler: Stream<Item = ()>, Scheduler: Stream<Item = ()>,
{ {
fn persistent_transmission( fn persistent_transmission(
self, self,
settings: PersistentTransmissionSettings,
rng: Rng,
scheduler: Scheduler, scheduler: Scheduler,
) -> PersistentTransmissionStream<Self, Rng, M, Scheduler> ) -> PersistentTransmissionStream<Self, Scheduler>
where where
Self: Sized + Unpin, Self: Sized + Unpin,
{ {
PersistentTransmissionStream::new(settings, self, scheduler, rng) PersistentTransmissionStream::new(self, scheduler)
} }
} }
impl<S, Rng, M, Scheduler> PersistentTransmissionExt<Rng, M, Scheduler> for S impl<S, Scheduler> PersistentTransmissionExt<Scheduler> for S
where where
S: Stream, S: Stream,
Rng: RngCore,
M: MixMessage,
M::PublicKey: Clone + Serialize + DeserializeOwned,
Scheduler: Stream<Item = ()>, Scheduler: Stream<Item = ()>,
{ {
} }
struct Coin<R: Rng> {
rng: R,
distribution: Uniform<f64>,
probability: f64,
}
impl<R: Rng> Coin<R> {
fn new(rng: R, probability: f64) -> Result<Self, CoinError> {
if !(0.0..=1.0).contains(&probability) {
return Err(CoinError::InvalidProbability);
}
Ok(Self {
rng,
distribution: Uniform::from(0.0..1.0),
probability,
})
}
// Flip the coin based on the given probability.
fn flip(&mut self) -> bool {
self.distribution.sample(&mut self.rng) < self.probability
}
}
#[derive(Debug)]
enum CoinError {
InvalidProbability,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use futures::StreamExt; use futures::StreamExt;
use nomos_mix_message::mock::MockMixMessage;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::time; use tokio::time;
@ -186,8 +119,6 @@ mod tests {
let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(schedule_receiver); let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(schedule_receiver);
let settings = PersistentTransmissionSettings { let settings = PersistentTransmissionSettings {
max_emission_frequency: 1.0, max_emission_frequency: 1.0,
// Set to always emit drop messages if no scheduled messages for easy testing
drop_message_probability: 1.0,
}; };
// Prepare the expected emission interval with torelance // Prepare the expected emission interval with torelance
let expected_emission_interval = let expected_emission_interval =
@ -196,16 +127,10 @@ mod tests {
let lower_bound = expected_emission_interval - torelance; let lower_bound = expected_emission_interval - torelance;
let upper_bound = expected_emission_interval + torelance; let upper_bound = expected_emission_interval + torelance;
// prepare stream // prepare stream
let mut persistent_transmission_stream: PersistentTransmissionStream< let mut persistent_transmission_stream: PersistentTransmissionStream<_, _> = stream
_, .persistent_transmission(
_, IntervalStream::new(time::interval(expected_emission_interval)).map(|_| ()),
MockMixMessage, );
_,
> = stream.persistent_transmission(
settings,
ChaCha8Rng::from_entropy(),
IntervalStream::new(time::interval(expected_emission_interval)).map(|_| ()),
);
// Messages must be scheduled in non-blocking manner. // Messages must be scheduled in non-blocking manner.
schedule_sender.send(vec![1]).unwrap(); schedule_sender.send(vec![1]).unwrap();
schedule_sender.send(vec![2]).unwrap(); schedule_sender.send(vec![2]).unwrap();
@ -230,15 +155,17 @@ mod tests {
); );
assert_interval!(&mut last_time, lower_bound, upper_bound); assert_interval!(&mut last_time, lower_bound, upper_bound);
assert!(MockMixMessage::is_drop_message( // Check if nothing is emitted when there is no message scheduled
&persistent_transmission_stream.next().await.unwrap() let timeout = tokio::time::timeout(
)); expected_emission_interval,
assert_interval!(&mut last_time, lower_bound, upper_bound); persistent_transmission_stream.next(),
)
assert!(MockMixMessage::is_drop_message( .await;
&persistent_transmission_stream.next().await.unwrap() assert!(
)); timeout.is_err(),
assert_interval!(&mut last_time, lower_bound, upper_bound); "Timeout must occur because no message is scheduled."
);
last_time = time::Instant::now();
// Schedule a new message and check if it is emitted at the next interval // Schedule a new message and check if it is emitted at the next interval
schedule_sender.send(vec![4]).unwrap(); schedule_sender.send(vec![4]).unwrap();

View File

@ -7,7 +7,6 @@ pub use error::Error;
pub trait MixMessage { pub trait MixMessage {
type PublicKey; type PublicKey;
type PrivateKey; type PrivateKey;
const DROP_MESSAGE: &'static [u8];
fn build_message(payload: &[u8], public_keys: &[Self::PublicKey]) -> Result<Vec<u8>, Error>; fn build_message(payload: &[u8], public_keys: &[Self::PublicKey]) -> Result<Vec<u8>, Error>;
/// Unwrap the message one layer. /// Unwrap the message one layer.
@ -21,7 +20,4 @@ pub trait MixMessage {
message: &[u8], message: &[u8],
private_key: &Self::PrivateKey, private_key: &Self::PrivateKey,
) -> Result<(Vec<u8>, bool), Error>; ) -> Result<(Vec<u8>, bool), Error>;
fn is_drop_message(message: &[u8]) -> bool {
message == Self::DROP_MESSAGE
}
} }

View File

@ -17,7 +17,6 @@ pub struct MockMixMessage;
impl MixMessage for MockMixMessage { impl MixMessage for MockMixMessage {
type PublicKey = [u8; NODE_ID_SIZE]; type PublicKey = [u8; NODE_ID_SIZE];
type PrivateKey = [u8; NODE_ID_SIZE]; type PrivateKey = [u8; NODE_ID_SIZE];
const DROP_MESSAGE: &'static [u8] = &[0; MESSAGE_SIZE];
/// The length of the encoded message is fixed to [`MESSAGE_SIZE`] bytes. /// The length of the encoded message is fixed to [`MESSAGE_SIZE`] bytes.
/// The [`MAX_LAYERS`] number of [`NodeId`]s are concatenated in front of the payload. /// The [`MAX_LAYERS`] number of [`NodeId`]s are concatenated in front of the payload.

View File

@ -11,9 +11,7 @@ use libp2p::{
}, },
Multiaddr, PeerId, Multiaddr, PeerId,
}; };
use nomos_mix_message::MixMessage;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::marker::PhantomData;
use std::{ use std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
task::{Context, Poll, Waker}, task::{Context, Poll, Waker},
@ -22,7 +20,7 @@ use std::{
/// A [`NetworkBehaviour`]: /// A [`NetworkBehaviour`]:
/// - forwards messages to all connected peers with deduplication. /// - forwards messages to all connected peers with deduplication.
/// - receives messages from all connected peers. /// - receives messages from all connected peers.
pub struct Behaviour<M> { pub struct Behaviour {
config: Config, config: Config,
/// Peers that support the mix protocol, and their connection IDs /// Peers that support the mix protocol, and their connection IDs
negotiated_peers: HashMap<PeerId, HashSet<ConnectionId>>, negotiated_peers: HashMap<PeerId, HashSet<ConnectionId>>,
@ -33,7 +31,6 @@ pub struct Behaviour<M> {
/// An LRU time cache for storing seen messages (based on their ID). This cache prevents /// An LRU time cache for storing seen messages (based on their ID). This cache prevents
/// duplicates from being propagated on the network. /// duplicates from being propagated on the network.
duplicate_cache: TimedCache<Vec<u8>, ()>, duplicate_cache: TimedCache<Vec<u8>, ()>,
_mix_message: PhantomData<M>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -48,10 +45,7 @@ pub enum Event {
Error(Error), Error(Error),
} }
impl<M> Behaviour<M> impl Behaviour {
where
M: MixMessage,
{
pub fn new(config: Config) -> Self { pub fn new(config: Config) -> Self {
let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan); let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan);
Self { Self {
@ -60,17 +54,11 @@ where
events: VecDeque::new(), events: VecDeque::new(),
waker: None, waker: None,
duplicate_cache, duplicate_cache,
_mix_message: Default::default(),
} }
} }
/// Publish a message (data or drop) to all connected peers /// Publish a message to all connected peers
pub fn publish(&mut self, message: Vec<u8>) -> Result<(), Error> { pub fn publish(&mut self, message: Vec<u8>) -> Result<(), Error> {
if M::is_drop_message(&message) {
// Bypass deduplication for the drop message
return self.forward_message(message, None);
}
let msg_id = Self::message_id(&message); let msg_id = Self::message_id(&message);
// If the message was already seen, don't forward it again // If the message was already seen, don't forward it again
if self.duplicate_cache.cache_get(&msg_id).is_some() { if self.duplicate_cache.cache_get(&msg_id).is_some() {
@ -155,10 +143,7 @@ where
} }
} }
impl<M> NetworkBehaviour for Behaviour<M> impl NetworkBehaviour for Behaviour {
where
M: MixMessage + 'static,
{
type ConnectionHandler = MixConnectionHandler; type ConnectionHandler = MixConnectionHandler;
type ToSwarm = Event; type ToSwarm = Event;
@ -205,11 +190,6 @@ where
match event { match event {
// A message was forwarded from the peer. // A message was forwarded from the peer.
ToBehaviour::Message(message) => { ToBehaviour::Message(message) => {
// Ignore drop message
if M::is_drop_message(&message) {
return;
}
// Add the message to the cache. If it was already seen, ignore it. // Add the message to the cache. If it was already seen, ignore it.
if self if self
.duplicate_cache .duplicate_cache

View File

@ -14,7 +14,6 @@ mod test {
swarm::{dummy, NetworkBehaviour, SwarmEvent}, swarm::{dummy, NetworkBehaviour, SwarmEvent},
Multiaddr, PeerId, Swarm, SwarmBuilder, Multiaddr, PeerId, Swarm, SwarmBuilder,
}; };
use nomos_mix_message::mock::MockMixMessage;
use tokio::select; use tokio::select;
use crate::{behaviour::Config, error::Error, Behaviour, Event}; use crate::{behaviour::Config, error::Error, Behaviour, Event};
@ -116,7 +115,7 @@ mod test {
} }
} }
fn new_swarm(key: Keypair) -> Swarm<Behaviour<MockMixMessage>> { fn new_swarm(key: Keypair) -> Swarm<Behaviour> {
new_swarm_with_behaviour( new_swarm_with_behaviour(
key, key,
Behaviour::new(Config { Behaviour::new(Config {

View File

@ -106,7 +106,7 @@ impl MixBackend for Libp2pMixBackend {
} }
struct MixSwarm { struct MixSwarm {
swarm: Swarm<nomos_mix_network::Behaviour<MockMixMessage>>, swarm: Swarm<nomos_mix_network::Behaviour>,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>, swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>, incoming_message_sender: broadcast::Sender<Vec<u8>>,
} }

View File

@ -107,19 +107,13 @@ where
// tier 1 persistent transmission // tier 1 persistent transmission
let (persistent_sender, persistent_receiver) = mpsc::unbounded_channel(); let (persistent_sender, persistent_receiver) = mpsc::unbounded_channel();
let mut persistent_transmission_messages: PersistentTransmissionStream< let mut persistent_transmission_messages: PersistentTransmissionStream<_, _> =
_, UnboundedReceiverStream::new(persistent_receiver).persistent_transmission(
_, IntervalStream::new(time::interval(Duration::from_secs_f64(
MockMixMessage, 1.0 / mix_config.persistent_transmission.max_emission_frequency,
_, )))
> = UnboundedReceiverStream::new(persistent_receiver).persistent_transmission( .map(|_| ()),
mix_config.persistent_transmission, );
ChaCha12Rng::from_entropy(),
IntervalStream::new(time::interval(Duration::from_secs_f64(
1.0 / mix_config.persistent_transmission.max_emission_frequency,
)))
.map(|_| ()),
);
// tier 2 blend // tier 2 blend
let temporal_scheduler = TemporalScheduler::new( let temporal_scheduler = TemporalScheduler::new(