1
0
mirror of synced 2025-01-23 22:18:54 +00:00

Mix: Offload transmission rate and message processing from libp2p behaviour/handler (#843)

This commit is contained in:
Youngjoon Lee 2024-10-24 21:00:26 +09:00 committed by GitHub
parent 97330a763f
commit ce24a03a23
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 131 additions and 166 deletions

View File

@ -279,8 +279,8 @@ pub fn update_mix(
mix.backend.peering_degree = peering_degree;
}
if let Some(num_mix_layers) = mix_num_mix_layers {
mix.backend.num_mix_layers = num_mix_layers;
if let Some(_num_mix_layers) = mix_num_mix_layers {
// TODO: Set num_mix_layers to the proper module setting
}
Ok(())

View File

@ -5,7 +5,7 @@ pub use error::Error;
use sha2::{Digest, Sha256};
pub const MSG_SIZE: usize = 2048;
pub const NOISE: [u8; MSG_SIZE] = [0; MSG_SIZE];
pub const DROP_MESSAGE: [u8; MSG_SIZE] = [0; MSG_SIZE];
/// A mock implementation of the Sphinx encoding.
///
@ -55,7 +55,7 @@ pub fn unwrap_message(message: &[u8]) -> Result<(Vec<u8>, bool), Error> {
}
}
/// Check if the message is a noise message.
pub fn is_noise(message: &[u8]) -> bool {
message == NOISE
/// Check if the message is a drop message.
pub fn is_drop_message(message: &[u8]) -> bool {
message == DROP_MESSAGE
}

View File

@ -12,14 +12,16 @@ use libp2p::{
},
Multiaddr, PeerId,
};
use nomos_mix_message::{message_id, unwrap_message};
use nomos_mix_message::{is_drop_message, message_id};
use crate::{
error::Error,
handler::{FromBehaviour, MixConnectionHandler, ToBehaviour},
};
/// A [`NetworkBehaviour`] that forwards messages between mix nodes.
/// A [`NetworkBehaviour`]:
/// - forwards messages to all connected peers with deduplication.
/// - receives messages from all connected peers.
pub struct Behaviour {
config: Config,
/// Peers that support the mix protocol, and their connection IDs
@ -35,14 +37,13 @@ pub struct Behaviour {
#[derive(Debug)]
pub struct Config {
pub transmission_rate: f64,
pub duplicate_cache_lifespan: u64,
}
#[derive(Debug)]
pub enum Event {
/// A fully unwrapped message received from one of the peers.
FullyUnwrappedMessage(Vec<u8>),
/// A message received from one of the peers.
Message(Vec<u8>),
Error(Error),
}
@ -58,39 +59,39 @@ impl Behaviour {
}
}
/// Publishs a message through the mix network.
///
/// This function expects that the message was already encoded for the cryptographic mixing
/// (e.g. Sphinx encoding).
///
/// The message is forward to all connected peers,
/// so that it can arrive in the mix node who can unwrap it one layer.
/// Fully unwrapped messages are returned as the [`MixBehaviourEvent::FullyUnwrappedMessage`].
/// Publish a message (data or drop) to all connected peers
pub fn publish(&mut self, message: Vec<u8>) -> Result<(), Error> {
self.duplicate_cache.cache_set(message_id(&message), ());
self.forward_message(message, None)
if is_drop_message(&message) {
// Bypass deduplication for the drop message
return self.forward_message(message, None);
}
let msg_id = message_id(&message);
// If the message was already seen, don't forward it again
if self.duplicate_cache.cache_get(&msg_id).is_some() {
return Ok(());
}
let result = self.forward_message(message, None);
// Add the message to the cache only if the forwarding was successfully triggered
if result.is_ok() {
self.duplicate_cache.cache_set(msg_id, ());
}
result
}
/// Forwards a message to all connected peers except the one that was received from.
/// Forwards a message to all connected peers except the excluded peer.
///
/// Returns [`Error::NoPeers`] if there are no connected peers that support the mix protocol.
fn forward_message(
&mut self,
message: Vec<u8>,
propagation_source: Option<PeerId>,
excluded_peer: Option<PeerId>,
) -> Result<(), Error> {
let peer_ids = self
.negotiated_peers
.keys()
.filter(|&peer_id| {
if let Some(propagation_source) = propagation_source {
*peer_id != propagation_source
} else {
true
}
})
.cloned()
.collect::<Vec<_>>();
let mut peer_ids: HashSet<_> = self.negotiated_peers.keys().collect();
if let Some(peer) = &excluded_peer {
peer_ids.remove(peer);
}
if peer_ids.is_empty() {
return Err(Error::NoPeers);
@ -99,7 +100,7 @@ impl Behaviour {
for peer_id in peer_ids.into_iter() {
tracing::debug!("Registering event for peer {:?} to send msg", peer_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
peer_id: *peer_id,
handler: NotifyHandler::Any,
event: FromBehaviour::Message(message.clone()),
});
@ -189,6 +190,12 @@ impl NetworkBehaviour for Behaviour {
match event {
// A message was forwarded from the peer.
ToBehaviour::Message(message) => {
// Ignore drop message
if is_drop_message(&message) {
return;
}
// Add the message to the cache. If it was already seen, ignore it.
if self
.duplicate_cache
.cache_set(message_id(&message), ())
@ -203,25 +210,10 @@ impl NetworkBehaviour for Behaviour {
tracing::error!("Failed to forward message: {e:?}");
}
// Try to unwrap the message.
// TODO: Abstract as Tier 2: Cryptographic Processor & Temporal Processor
match unwrap_message(&message) {
Ok((unwrapped_msg, fully_unwrapped)) => {
if fully_unwrapped {
self.events.push_back(ToSwarm::GenerateEvent(
Event::FullyUnwrappedMessage(unwrapped_msg),
));
} else if let Err(e) = self.forward_message(unwrapped_msg, None) {
tracing::error!("Failed to forward message: {:?}", e);
}
}
Err(nomos_mix_message::Error::MsgUnwrapNotAllowed) => {
tracing::debug!("Message cannot be unwrapped by this node");
}
Err(e) => {
tracing::error!("Failed to unwrap message: {:?}", e);
}
}
// Notify the swarm about the received message,
// so that it can be processed by the core protocol module.
self.events
.push_back(ToSwarm::GenerateEvent(Event::Message(message)));
}
// The connection was fully negotiated by the peer,
// which means that the peer supports the mix protocol.

View File

@ -2,11 +2,9 @@ use std::{
collections::VecDeque,
io,
task::{Context, Poll, Waker},
time::Duration,
};
use futures::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt};
use futures_timer::Delay;
use libp2p::{
core::upgrade::ReadyUpgrade,
swarm::{
@ -15,20 +13,19 @@ use libp2p::{
},
Stream, StreamProtocol,
};
use nomos_mix_message::{is_noise, MSG_SIZE, NOISE};
use nomos_mix_queue::{NonMixQueue, Queue};
use nomos_mix_message::MSG_SIZE;
use crate::behaviour::Config;
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0");
// TODO: Consider replacing this struct with libp2p_stream ConnectionHandler
// because we don't implement persistent emission in the per-connection level anymore.
/// A [`ConnectionHandler`] that handles the mix protocol.
pub struct MixConnectionHandler {
inbound_substream: Option<MsgRecvFuture>,
outbound_substream: Option<OutboundSubstreamState>,
interval: Duration, // TODO: use absolute time
timer: Delay,
queue: Box<dyn Queue<Vec<u8>> + Send>,
outbound_msgs: VecDeque<Vec<u8>>,
pending_events_to_behaviour: VecDeque<ToBehaviour>,
waker: Option<Waker>,
}
@ -46,15 +43,11 @@ enum OutboundSubstreamState {
}
impl MixConnectionHandler {
pub fn new(config: &Config) -> Self {
let interval_sec = 1.0 / config.transmission_rate;
let interval = Duration::from_millis((interval_sec * 1000.0) as u64);
pub fn new(_config: &Config) -> Self {
Self {
inbound_substream: None,
outbound_substream: None,
interval,
timer: Delay::new(interval),
queue: Box::new(NonMixQueue::new(NOISE.to_vec())),
outbound_msgs: VecDeque::new(),
pending_events_to_behaviour: VecDeque::new(),
waker: None,
}
@ -109,17 +102,17 @@ impl ConnectionHandler for MixConnectionHandler {
}
// Process inbound stream
// TODO: Measure message frequencies and compare them to the desired frequencies
// for connection maintenance defined in the Tier 1 spec.
tracing::debug!("Processing inbound stream");
if let Some(msg_recv_fut) = self.inbound_substream.as_mut() {
match msg_recv_fut.poll_unpin(cx) {
Poll::Ready(Ok((stream, msg))) => {
tracing::debug!("Received message from inbound stream. Notifying behaviour...");
self.inbound_substream = Some(recv_msg(stream).boxed());
if !is_noise(&msg) {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviour::Message(msg),
));
}
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
ToBehaviour::Message(msg),
));
}
Poll::Ready(Err(e)) => {
tracing::error!("Failed to receive message from inbound stream: {:?}", e);
@ -140,21 +133,22 @@ impl ConnectionHandler for MixConnectionHandler {
return Poll::Pending;
}
// If the substream is idle, and if it's time to send a message, send it.
Some(OutboundSubstreamState::Idle(stream)) => match self.timer.poll_unpin(cx) {
Poll::Ready(_) => {
let msg = self.queue.pop();
tracing::debug!("Sending message to outbound stream: {:?}", msg);
self.outbound_substream = Some(OutboundSubstreamState::PendingSend(
send_msg(stream, msg).boxed(),
));
self.timer.reset(self.interval);
Some(OutboundSubstreamState::Idle(stream)) => {
match self.outbound_msgs.pop_front() {
Some(msg) => {
tracing::debug!("Sending message to outbound stream: {:?}", msg);
self.outbound_substream = Some(OutboundSubstreamState::PendingSend(
send_msg(stream, msg).boxed(),
));
}
None => {
tracing::debug!("Nothing to send to outbound stream");
self.outbound_substream = Some(OutboundSubstreamState::Idle(stream));
self.waker = Some(cx.waker().clone());
return Poll::Pending;
}
}
Poll::Pending => {
self.outbound_substream = Some(OutboundSubstreamState::Idle(stream));
self.waker = Some(cx.waker().clone());
return Poll::Pending;
}
},
}
// If a message is being sent, check if it's done.
Some(OutboundSubstreamState::PendingSend(mut msg_send_fut)) => {
match msg_send_fut.poll_unpin(cx) {
@ -191,7 +185,7 @@ impl ConnectionHandler for MixConnectionHandler {
fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
match event {
FromBehaviour::Message(msg) => {
self.queue.push(msg);
self.outbound_msgs.push_back(msg);
}
}
}

View File

@ -14,12 +14,12 @@ mod test {
swarm::{dummy, NetworkBehaviour, SwarmEvent},
Multiaddr, PeerId, Swarm, SwarmBuilder,
};
use nomos_mix_message::{new_message, MSG_SIZE};
use nomos_mix_message::MSG_SIZE;
use tokio::select;
use crate::{behaviour::Config, error::Error, Behaviour, Event};
/// Check that an wrapped message is forwarded through mix nodes and unwrapped successfully.
/// Check that a published messsage arrives in the peers successfully.
#[tokio::test]
async fn behaviour() {
let k1 = libp2p::identity::Keypair::generate_ed25519();
@ -33,23 +33,17 @@ mod test {
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5073/quic-v1".parse().unwrap();
let addr_with_peer_id = addr.clone().with_p2p(peer_id1).unwrap();
// Spawn swarm1
tokio::spawn(async move {
swarm1.listen_on(addr).unwrap();
loop {
swarm1.select_next_some().await;
}
});
// Swarm1 listens on the address.
swarm1.listen_on(addr).unwrap();
// Dial to swarm1 from swarm2
tokio::time::sleep(Duration::from_secs(1)).await;
swarm2.dial(addr_with_peer_id).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
// Prepare a task for swarm2 to publish a two-layer wrapped message,
// receive an one-layer unwrapped message from swarm1,
// and return a fully unwrapped message.
// Swamr2 publishes a message.
let task = async {
let msg = vec![1; MSG_SIZE];
let mut msg_published = false;
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
loop {
@ -58,18 +52,18 @@ mod test {
// (It will fail until swarm2 is connected to swarm1 successfully.)
_ = publish_try_interval.tick() => {
if !msg_published {
// Prepare a message wrapped in two layers
let msg = new_message(&[1; MSG_SIZE - 1], 2).unwrap();
msg_published = swarm2.behaviour_mut().publish(msg).is_ok();
msg_published = swarm2.behaviour_mut().publish(msg.clone()).is_ok();
}
}
// Proceed swarm2
event = swarm2.select_next_some() => {
if let SwarmEvent::Behaviour(Event::FullyUnwrappedMessage(message)) = event {
println!("SWARM2 FULLY_UNWRAPPED_MESSAGE: {:?}", message);
// Proceed swarm1
event = swarm1.select_next_some() => {
if let SwarmEvent::Behaviour(Event::Message(received_msg)) = event {
assert_eq!(received_msg, msg);
break;
};
}
// Proceed swarm2
_ = swarm2.select_next_some() => {}
}
}
};
@ -94,13 +88,8 @@ mod test {
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5074/quic-v1".parse().unwrap();
let addr_with_peer_id = addr.clone().with_p2p(peer_id1).unwrap();
// Spawn swarm1
tokio::spawn(async move {
swarm1.listen_on(addr).unwrap();
loop {
swarm1.select_next_some().await;
}
});
// Swarm1 listens on the address.
swarm1.listen_on(addr).unwrap();
// Dial to swarm1 from swarm2
tokio::time::sleep(Duration::from_secs(1)).await;
@ -109,18 +98,19 @@ mod test {
// Expect all publish attempts to fail with [`Error::NoPeers`]
// because swarm2 doesn't have any peers that support the mix protocol.
let msg = vec![1; MSG_SIZE];
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
let mut publish_try_count = 0;
loop {
select! {
_ = publish_try_interval.tick() => {
let msg = new_message(&[10; MSG_SIZE - 1], 1).unwrap();
assert!(matches!(swarm2.behaviour_mut().publish(msg), Err(Error::NoPeers)));
assert!(matches!(swarm2.behaviour_mut().publish(msg.clone()), Err(Error::NoPeers)));
publish_try_count += 1;
if publish_try_count >= 10 {
break;
}
}
_ = swarm1.select_next_some() => {}
_ = swarm2.select_next_some() => {}
}
}
@ -130,7 +120,6 @@ mod test {
new_swarm_with_behaviour(
key,
Behaviour::new(Config {
transmission_rate: 1.0,
duplicate_cache_lifespan: 60,
}),
)

View File

@ -305,7 +305,6 @@ pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<Libp2pMixBack
node_key: ed25519::SecretKey::generate(),
membership: Vec::new(),
peering_degree: 1,
num_mix_layers: 1,
})
.collect::<Vec<_>>();

View File

@ -25,7 +25,7 @@ pub struct Libp2pMixBackend {
#[allow(dead_code)]
task: JoinHandle<()>,
swarm_message_sender: mpsc::Sender<MixSwarmMessage>,
fully_unwrapped_message_sender: broadcast::Sender<Vec<u8>>,
incoming_message_sender: broadcast::Sender<Vec<u8>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -36,7 +36,6 @@ pub struct Libp2pMixBackendSettings {
pub node_key: ed25519::SecretKey,
pub membership: Vec<Multiaddr>,
pub peering_degree: usize,
pub num_mix_layers: usize,
}
const CHANNEL_SIZE: usize = 64;
@ -47,15 +46,14 @@ impl MixBackend for Libp2pMixBackend {
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE);
let (fully_unwrapped_message_sender, _) = broadcast::channel(CHANNEL_SIZE);
let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE);
let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let local_peer_id = keypair.public().to_peer_id();
let mut swarm = MixSwarm::new(
keypair,
config.num_mix_layers,
swarm_message_receiver,
fully_unwrapped_message_sender.clone(),
incoming_message_sender.clone(),
);
swarm
@ -89,25 +87,23 @@ impl MixBackend for Libp2pMixBackend {
Self {
task,
swarm_message_sender,
fully_unwrapped_message_sender,
incoming_message_sender,
}
}
async fn mix(&self, msg: Vec<u8>) {
async fn publish(&self, msg: Vec<u8>) {
if let Err(e) = self
.swarm_message_sender
.send(MixSwarmMessage::Mix(msg))
.send(MixSwarmMessage::Publish(msg))
.await
{
tracing::error!("Failed to send message to MixSwarm: {e}");
}
}
fn listen_to_fully_unwrapped_messages(
&mut self,
) -> Pin<Box<dyn Stream<Item = Vec<u8>> + Send>> {
fn listen_to_incoming_messages(&mut self) -> Pin<Box<dyn Stream<Item = Vec<u8>> + Send>> {
Box::pin(
BroadcastStream::new(self.fully_unwrapped_message_sender.subscribe())
BroadcastStream::new(self.incoming_message_sender.subscribe())
.filter_map(|event| async { event.ok() }),
)
}
@ -115,29 +111,26 @@ impl MixBackend for Libp2pMixBackend {
struct MixSwarm {
swarm: Swarm<nomos_mix_network::Behaviour>,
num_mix_layers: usize,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
fully_unwrapped_messages_sender: broadcast::Sender<Vec<u8>>,
incoming_message_sender: broadcast::Sender<Vec<u8>>,
}
#[derive(Debug)]
pub enum MixSwarmMessage {
Mix(Vec<u8>),
Publish(Vec<u8>),
}
impl MixSwarm {
fn new(
keypair: Keypair,
num_mix_layers: usize,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
fully_unwrapped_messages_sender: broadcast::Sender<Vec<u8>>,
incoming_message_sender: broadcast::Sender<Vec<u8>>,
) -> Self {
let swarm = SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
.with_quic()
.with_behaviour(|_| {
nomos_mix_network::Behaviour::new(nomos_mix_network::Config {
transmission_rate: 1.0,
duplicate_cache_lifespan: 60,
})
})
@ -149,9 +142,8 @@ impl MixSwarm {
Self {
swarm,
num_mix_layers,
swarm_messages_receiver,
fully_unwrapped_messages_sender,
incoming_message_sender,
}
}
@ -178,18 +170,9 @@ impl MixSwarm {
async fn handle_swarm_message(&mut self, msg: MixSwarmMessage) {
match msg {
MixSwarmMessage::Mix(msg) => {
tracing::debug!("Wrap msg and send it to mix network: {msg:?}");
match nomos_mix_message::new_message(&msg, self.num_mix_layers.try_into().unwrap())
{
Ok(wrapped_msg) => {
if let Err(e) = self.swarm.behaviour_mut().publish(wrapped_msg) {
tracing::error!("Failed to publish message to mix network: {e:?}");
}
}
Err(e) => {
tracing::error!("Failed to wrap message: {e:?}");
}
MixSwarmMessage::Publish(msg) => {
if let Err(e) = self.swarm.behaviour_mut().publish(msg) {
tracing::error!("Failed to publish message to mix network: {e:?}");
}
}
}
@ -197,9 +180,9 @@ impl MixSwarm {
fn handle_event(&mut self, event: SwarmEvent<nomos_mix_network::Event>) {
match event {
SwarmEvent::Behaviour(nomos_mix_network::Event::FullyUnwrappedMessage(msg)) => {
tracing::debug!("Received fully unwrapped message: {msg:?}");
self.fully_unwrapped_messages_sender.send(msg).unwrap();
SwarmEvent::Behaviour(nomos_mix_network::Event::Message(msg)) => {
tracing::debug!("Received message from a peer: {msg:?}");
self.incoming_message_sender.send(msg).unwrap();
}
SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => {
tracing::error!("Received error from mix network: {e:?}");

View File

@ -12,9 +12,8 @@ pub trait MixBackend {
type Settings: Clone + Debug + Send + Sync + 'static;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self;
async fn mix(&self, msg: Vec<u8>);
/// Listen to fully unwrapped messages returned from the mix network
/// if this node is the last node that can unwrap the data message.
fn listen_to_fully_unwrapped_messages(&mut self)
-> Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
/// Publish a message to the mix network.
async fn publish(&self, msg: Vec<u8>);
/// Listen to messages received from the mix network.
fn listen_to_incoming_messages(&mut self) -> Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
}

View File

@ -8,6 +8,7 @@ use backends::MixBackend;
use futures::StreamExt;
use network::NetworkAdapter;
use nomos_core::wire;
use nomos_mix_message::{new_message, unwrap_message};
use nomos_network::NetworkService;
use overwatch_rs::services::{
handle::ServiceStateHandle,
@ -80,16 +81,21 @@ where
let network_relay = network_relay.connect().await?;
let network_adapter = Network::new(network_relay);
// A channel to listen to fully unwrapped messages returned from the [`MixBackend`]
// if this node is the last node that can unwrap the data message.
let mut fully_unwrapped_messages_receiver = backend.listen_to_fully_unwrapped_messages();
// TODO: Spawn PersistentTransmission (Tier 1)
// TODO: Spawn Processor (Tier 2) and connect it to PersistentTransmission
// A channel to listen to messages received from the [`MixBackend`]
let mut incoming_message_stream = backend.listen_to_incoming_messages();
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = fully_unwrapped_messages_receiver.next() => {
tracing::debug!("Received fully unwrapped message from mix backend. Broadcasting it to the network service..");
// Deserialize the received message to get the topic for broadcasting
Some(msg) = incoming_message_stream.next() => {
// TODO: The following logic is wrong and temporary.
// Here we're unwrapping the message and broadcasting it,
// but the message should be handled by Processor and PersistentTransmission.
let (msg, fully_unwrapped) = unwrap_message(&msg).unwrap();
assert!(fully_unwrapped);
match wire::deserialize::<NetworkMessage<Network::BroadcastSettings>>(&msg) {
Ok(msg) => {
network_adapter.broadcast(msg.message, msg.broadcast_settings).await;
@ -127,7 +133,11 @@ where
ServiceMessage::Mix(msg) => {
// split sending in two steps to help the compiler understand we do not
// need to hold an instance of &I (which is not send) across an await point
let _send = backend.mix(wire::serialize(&msg).unwrap());
// TODO: The following logic is wrong and temporary.
// Here we're wrapping the message here and publishing the message immediately,
// but the message should be handled by Processor and PersistentTransmission.
let wrapped_msg = new_message(&wire::serialize(&msg).unwrap(), 1).unwrap();
let _send = backend.publish(wrapped_msg);
_send.await
}
}

View File

@ -28,7 +28,6 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
node_key,
membership: Vec::new(),
peering_degree: 1,
num_mix_layers: 1,
},
}
})