diff --git a/nomos-services/network/src/backends/libp2p/mixnet.rs b/nomos-services/network/src/backends/libp2p/mixnet.rs index fc32a2ca..1445db4c 100644 --- a/nomos-services/network/src/backends/libp2p/mixnet.rs +++ b/nomos-services/network/src/backends/libp2p/mixnet.rs @@ -14,10 +14,8 @@ use nomos_libp2p::{ Multiaddr, Protocol, }; use serde::{Deserialize, Serialize}; -use tokio::{ - runtime::Handle, - sync::{mpsc, oneshot}, -}; +use tokio::sync::mpsc::{self}; +use tokio::{runtime::Handle, sync::oneshot}; use crate::backends::libp2p::{Command, Dial, Topic}; @@ -29,158 +27,174 @@ pub struct MixnetConfig { pub(crate) const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/mixnet"); -pub(crate) fn init_mixnet( - config: MixnetConfig, - runtime_handle: Handle, - cmd_tx: mpsc::Sender, - incoming_streams: IncomingStreams, -) -> MessageQueue { - // Run mixnode - let (mixnode, packet_queue) = MixNode::new(config.mixnode).unwrap(); - let libp2p_cmd_tx = cmd_tx.clone(); - let queue = packet_queue.clone(); - runtime_handle.spawn(async move { - run_mixnode(mixnode, queue, libp2p_cmd_tx).await; - }); - let handle = runtime_handle.clone(); - let queue = packet_queue.clone(); - runtime_handle.spawn(async move { - handle_incoming_streams(incoming_streams, queue, handle).await; - }); - - // Run mixclient - let (mixclient, message_queue) = MixClient::new(config.mixclient).unwrap(); - runtime_handle.spawn(async move { - run_mixclient(mixclient, packet_queue, cmd_tx).await; - }); - - message_queue +/// Mixnet extension for Libp2p network backend +pub(crate) struct Mixnet { + message_queue: MessageQueue, } -async fn run_mixnode( - mut mixnode: MixNode, - packet_queue: PacketQueue, - cmd_tx: mpsc::Sender, -) { - while let Some(output) = mixnode.next().await { - match output { - Output::Forward(packet) => { - stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await; - } - Output::ReconstructedMessage(message) => match MixnetMessage::from_bytes(&message) { - Ok(msg) => { - cmd_tx - .send(Command::Broadcast { - topic: msg.topic, - message: msg.message, - }) - .await - .unwrap(); - } - Err(e) => { - tracing::error!("failed to parse message received from mixnet: {e}"); - } - }, - } - } -} - -async fn run_mixclient( - mut mixclient: MixClient, - packet_queue: PacketQueue, - cmd_tx: mpsc::Sender, -) { - while let Some(packet) = mixclient.next().await { - stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await; - } -} - -async fn handle_incoming_streams( - mut incoming_streams: IncomingStreams, - packet_queue: PacketQueue, - runtime_handle: Handle, -) { - while let Some((_, stream)) = incoming_streams.next().await { +impl Mixnet { + /// Creates a Mixnet instance by spawning tasks to run mixnode and mixclient + pub(crate) fn new( + config: MixnetConfig, + runtime_handle: Handle, + cmd_tx: mpsc::Sender, + incoming_streams: IncomingStreams, + ) -> Self { + // Run mixnode + let (mixnode, packet_queue) = MixNode::new(config.mixnode).unwrap(); + let libp2p_cmd_tx = cmd_tx.clone(); let queue = packet_queue.clone(); runtime_handle.spawn(async move { - if let Err(e) = handle_stream(stream, queue).await { - tracing::warn!("stream closed: {e}"); - } + Self::run_mixnode(mixnode, queue, libp2p_cmd_tx).await; + }); + let handle = runtime_handle.clone(); + let queue = packet_queue.clone(); + runtime_handle.spawn(async move { + Self::handle_incoming_streams(incoming_streams, queue, handle).await; }); - } -} -async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> { - loop { - match PacketBody::read_from(&mut stream).await? { - Ok(packet_body) => { - packet_queue - .send(packet_body) - .await - .expect("The receiving half of packet queue should be always open"); + // Run mixclient + let (mixclient, message_queue) = MixClient::new(config.mixclient).unwrap(); + runtime_handle.spawn(async move { + Self::run_mixclient(mixclient, packet_queue, cmd_tx).await; + }); + + Self { message_queue } + } + + /// Returns MessageQueue for sending messages to mixnet (via mixclient) + pub(crate) fn message_queue(&self) -> &MessageQueue { + &self.message_queue + } + + async fn run_mixnode( + mut mixnode: MixNode, + packet_queue: PacketQueue, + cmd_tx: mpsc::Sender, + ) { + while let Some(output) = mixnode.next().await { + match output { + Output::Forward(packet) => { + Self::stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue) + .await; + } + Output::ReconstructedMessage(message) => { + match MixnetMessage::from_bytes(&message) { + Ok(msg) => { + cmd_tx + .send(Command::Broadcast { + topic: msg.topic, + message: msg.message, + }) + .await + .unwrap(); + } + Err(e) => { + tracing::error!("failed to parse message received from mixnet: {e}"); + } + } + } } + } + } + + async fn run_mixclient( + mut mixclient: MixClient, + packet_queue: PacketQueue, + cmd_tx: mpsc::Sender, + ) { + while let Some(packet) = mixclient.next().await { + Self::stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await; + } + } + + async fn handle_incoming_streams( + mut incoming_streams: IncomingStreams, + packet_queue: PacketQueue, + runtime_handle: Handle, + ) { + while let Some((_, stream)) = incoming_streams.next().await { + let queue = packet_queue.clone(); + runtime_handle.spawn(async move { + if let Err(e) = Self::handle_stream(stream, queue).await { + tracing::warn!("stream closed: {e}"); + } + }); + } + } + + async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> { + loop { + match PacketBody::read_from(&mut stream).await? { + Ok(packet_body) => { + packet_queue + .send(packet_body) + .await + .expect("The receiving half of packet queue should be always open"); + } + Err(e) => { + tracing::error!( + "failed to parse packet body. continuing reading the next packet: {e}" + ); + } + } + } + } + + async fn stream_send( + addr: NodeAddress, + packet_body: PacketBody, + cmd_tx: &mpsc::Sender, + packet_queue: &PacketQueue, + ) { + let (tx, rx) = oneshot::channel(); + cmd_tx + .send(Command::Connect(Dial { + addr: Self::multiaddr_from(addr), + retry_count: 3, + result_sender: tx, + })) + .await + .expect("Command receiver should be always open"); + + match rx.await { + Ok(Ok(peer_id)) => { + cmd_tx + .send(Command::StreamSend { + peer_id, + protocol: STREAM_PROTOCOL, + data: packet_body.bytes(), + }) + .await + .expect("Command receiver should be always open"); + } + Ok(Err(e)) => match e { + nomos_libp2p::DialError::NoAddresses => { + tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue"); + packet_queue + .send(packet_body) + .await + .expect("The receiving half of packet queue should be always open"); + } + _ => tracing::error!("failed to dial with unrecoverable error: {e}"), + }, Err(e) => { - tracing::error!( - "failed to parse packet body. continuing reading the next packet: {e}" - ); + tracing::error!("channel closed before receiving: {e}"); } } } -} -async fn stream_send( - addr: NodeAddress, - packet_body: PacketBody, - cmd_tx: &mpsc::Sender, - packet_queue: &PacketQueue, -) { - let (tx, rx) = oneshot::channel(); - cmd_tx - .send(Command::Connect(Dial { - addr: multiaddr_from(addr), - retry_count: 3, - result_sender: tx, - })) - .await - .expect("Command receiver should be always open"); - - match rx.await { - Ok(Ok(peer_id)) => { - cmd_tx - .send(Command::StreamSend { - peer_id, - protocol: STREAM_PROTOCOL, - data: packet_body.bytes(), - }) - .await - .expect("Command receiver should be always open"); + fn multiaddr_from(addr: NodeAddress) -> Multiaddr { + match SocketAddr::from(addr) { + SocketAddr::V4(addr) => Multiaddr::empty() + .with(Protocol::Ip4(*addr.ip())) + .with(Protocol::Udp(addr.port())) + .with(Protocol::QuicV1), + SocketAddr::V6(addr) => Multiaddr::empty() + .with(Protocol::Ip6(*addr.ip())) + .with(Protocol::Udp(addr.port())) + .with(Protocol::QuicV1), } - Ok(Err(e)) => match e { - nomos_libp2p::DialError::NoAddresses => { - tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue"); - packet_queue - .send(packet_body) - .await - .expect("The receiving half of packet queue should be always open"); - } - _ => tracing::error!("failed to dial with unrecoverable error: {e}"), - }, - Err(e) => { - tracing::error!("channel closed before receiving: {e}"); - } - } -} - -fn multiaddr_from(addr: NodeAddress) -> Multiaddr { - match SocketAddr::from(addr) { - SocketAddr::V4(addr) => Multiaddr::empty() - .with(Protocol::Ip4(*addr.ip())) - .with(Protocol::Udp(addr.port())) - .with(Protocol::QuicV1), - SocketAddr::V6(addr) => Multiaddr::empty() - .with(Protocol::Ip6(*addr.ip())) - .with(Protocol::Udp(addr.port())) - .with(Protocol::QuicV1), } } diff --git a/nomos-services/network/src/backends/libp2p/mod.rs b/nomos-services/network/src/backends/libp2p/mod.rs index 39aa88c6..1c28e085 100644 --- a/nomos-services/network/src/backends/libp2p/mod.rs +++ b/nomos-services/network/src/backends/libp2p/mod.rs @@ -12,9 +12,7 @@ use self::swarm::SwarmHandler; // internal use super::NetworkBackend; #[cfg(feature = "mixnet")] -use crate::backends::libp2p::mixnet::{init_mixnet, MixnetMessage, STREAM_PROTOCOL}; -#[cfg(feature = "mixnet")] -use ::mixnet::client::MessageQueue; +use crate::backends::libp2p::mixnet::{Mixnet, MixnetMessage, STREAM_PROTOCOL}; pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash}; // crates use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; @@ -24,7 +22,7 @@ pub struct Libp2p { events_tx: broadcast::Sender, commands_tx: mpsc::Sender, #[cfg(feature = "mixnet")] - mixclient_message_queue: MessageQueue, + mixnet: Mixnet, } #[derive(Debug)] @@ -56,7 +54,7 @@ impl NetworkBackend for Libp2p { SwarmHandler::new(&config, commands_tx.clone(), commands_rx, events_tx.clone()); #[cfg(feature = "mixnet")] - let mixclient_message_queue = init_mixnet( + let mixnet = Mixnet::new( config.mixnet, overwatch_handle.runtime().clone(), commands_tx.clone(), @@ -71,7 +69,7 @@ impl NetworkBackend for Libp2p { events_tx, commands_tx, #[cfg(feature = "mixnet")] - mixclient_message_queue, + mixnet, } } @@ -87,7 +85,7 @@ impl NetworkBackend for Libp2p { match msg { Command::Broadcast { topic, message } => { let msg = MixnetMessage { topic, message }; - if let Err(e) = self.mixclient_message_queue.send(msg.as_bytes()).await { + if let Err(e) = self.mixnet.message_queue().send(msg.as_bytes()).await { tracing::error!("failed to send messasge to mixclient: {e}"); } }