refactor: define struct for Libp2p Mixnet extension

This commit is contained in:
Youngjoon Lee 2024-03-25 21:52:39 +09:00
parent 16444cf32a
commit f2e3a02306
No known key found for this signature in database
GPG Key ID: 7E8EE3EC8D5CEBA2
2 changed files with 162 additions and 150 deletions

View File

@ -14,10 +14,8 @@ use nomos_libp2p::{
Multiaddr, Protocol,
};
use serde::{Deserialize, Serialize};
use tokio::{
runtime::Handle,
sync::{mpsc, oneshot},
};
use tokio::sync::mpsc::{self};
use tokio::{runtime::Handle, sync::oneshot};
use crate::backends::libp2p::{Command, Dial, Topic};
@ -29,158 +27,174 @@ pub struct MixnetConfig {
pub(crate) const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/mixnet");
pub(crate) fn init_mixnet(
config: MixnetConfig,
runtime_handle: Handle,
cmd_tx: mpsc::Sender<Command>,
incoming_streams: IncomingStreams,
) -> MessageQueue {
// Run mixnode
let (mixnode, packet_queue) = MixNode::new(config.mixnode).unwrap();
let libp2p_cmd_tx = cmd_tx.clone();
let queue = packet_queue.clone();
runtime_handle.spawn(async move {
run_mixnode(mixnode, queue, libp2p_cmd_tx).await;
});
let handle = runtime_handle.clone();
let queue = packet_queue.clone();
runtime_handle.spawn(async move {
handle_incoming_streams(incoming_streams, queue, handle).await;
});
// Run mixclient
let (mixclient, message_queue) = MixClient::new(config.mixclient).unwrap();
runtime_handle.spawn(async move {
run_mixclient(mixclient, packet_queue, cmd_tx).await;
});
message_queue
/// Mixnet extension for Libp2p network backend
pub(crate) struct Mixnet {
message_queue: MessageQueue,
}
async fn run_mixnode(
mut mixnode: MixNode,
packet_queue: PacketQueue,
cmd_tx: mpsc::Sender<Command>,
) {
while let Some(output) = mixnode.next().await {
match output {
Output::Forward(packet) => {
stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await;
}
Output::ReconstructedMessage(message) => match MixnetMessage::from_bytes(&message) {
Ok(msg) => {
cmd_tx
.send(Command::Broadcast {
topic: msg.topic,
message: msg.message,
})
.await
.unwrap();
}
Err(e) => {
tracing::error!("failed to parse message received from mixnet: {e}");
}
},
}
}
}
async fn run_mixclient(
mut mixclient: MixClient,
packet_queue: PacketQueue,
cmd_tx: mpsc::Sender<Command>,
) {
while let Some(packet) = mixclient.next().await {
stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await;
}
}
async fn handle_incoming_streams(
mut incoming_streams: IncomingStreams,
packet_queue: PacketQueue,
runtime_handle: Handle,
) {
while let Some((_, stream)) = incoming_streams.next().await {
impl Mixnet {
/// Creates a Mixnet instance by spawning tasks to run mixnode and mixclient
pub(crate) fn new(
config: MixnetConfig,
runtime_handle: Handle,
cmd_tx: mpsc::Sender<Command>,
incoming_streams: IncomingStreams,
) -> Self {
// Run mixnode
let (mixnode, packet_queue) = MixNode::new(config.mixnode).unwrap();
let libp2p_cmd_tx = cmd_tx.clone();
let queue = packet_queue.clone();
runtime_handle.spawn(async move {
if let Err(e) = handle_stream(stream, queue).await {
tracing::warn!("stream closed: {e}");
}
Self::run_mixnode(mixnode, queue, libp2p_cmd_tx).await;
});
let handle = runtime_handle.clone();
let queue = packet_queue.clone();
runtime_handle.spawn(async move {
Self::handle_incoming_streams(incoming_streams, queue, handle).await;
});
}
}
async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> {
loop {
match PacketBody::read_from(&mut stream).await? {
Ok(packet_body) => {
packet_queue
.send(packet_body)
.await
.expect("The receiving half of packet queue should be always open");
// Run mixclient
let (mixclient, message_queue) = MixClient::new(config.mixclient).unwrap();
runtime_handle.spawn(async move {
Self::run_mixclient(mixclient, packet_queue, cmd_tx).await;
});
Self { message_queue }
}
/// Returns MessageQueue for sending messages to mixnet (via mixclient)
pub(crate) fn message_queue(&self) -> &MessageQueue {
&self.message_queue
}
async fn run_mixnode(
mut mixnode: MixNode,
packet_queue: PacketQueue,
cmd_tx: mpsc::Sender<Command>,
) {
while let Some(output) = mixnode.next().await {
match output {
Output::Forward(packet) => {
Self::stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue)
.await;
}
Output::ReconstructedMessage(message) => {
match MixnetMessage::from_bytes(&message) {
Ok(msg) => {
cmd_tx
.send(Command::Broadcast {
topic: msg.topic,
message: msg.message,
})
.await
.unwrap();
}
Err(e) => {
tracing::error!("failed to parse message received from mixnet: {e}");
}
}
}
}
}
}
async fn run_mixclient(
mut mixclient: MixClient,
packet_queue: PacketQueue,
cmd_tx: mpsc::Sender<Command>,
) {
while let Some(packet) = mixclient.next().await {
Self::stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await;
}
}
async fn handle_incoming_streams(
mut incoming_streams: IncomingStreams,
packet_queue: PacketQueue,
runtime_handle: Handle,
) {
while let Some((_, stream)) = incoming_streams.next().await {
let queue = packet_queue.clone();
runtime_handle.spawn(async move {
if let Err(e) = Self::handle_stream(stream, queue).await {
tracing::warn!("stream closed: {e}");
}
});
}
}
async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> {
loop {
match PacketBody::read_from(&mut stream).await? {
Ok(packet_body) => {
packet_queue
.send(packet_body)
.await
.expect("The receiving half of packet queue should be always open");
}
Err(e) => {
tracing::error!(
"failed to parse packet body. continuing reading the next packet: {e}"
);
}
}
}
}
async fn stream_send(
addr: NodeAddress,
packet_body: PacketBody,
cmd_tx: &mpsc::Sender<Command>,
packet_queue: &PacketQueue,
) {
let (tx, rx) = oneshot::channel();
cmd_tx
.send(Command::Connect(Dial {
addr: Self::multiaddr_from(addr),
retry_count: 3,
result_sender: tx,
}))
.await
.expect("Command receiver should be always open");
match rx.await {
Ok(Ok(peer_id)) => {
cmd_tx
.send(Command::StreamSend {
peer_id,
protocol: STREAM_PROTOCOL,
data: packet_body.bytes(),
})
.await
.expect("Command receiver should be always open");
}
Ok(Err(e)) => match e {
nomos_libp2p::DialError::NoAddresses => {
tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue");
packet_queue
.send(packet_body)
.await
.expect("The receiving half of packet queue should be always open");
}
_ => tracing::error!("failed to dial with unrecoverable error: {e}"),
},
Err(e) => {
tracing::error!(
"failed to parse packet body. continuing reading the next packet: {e}"
);
tracing::error!("channel closed before receiving: {e}");
}
}
}
}
async fn stream_send(
addr: NodeAddress,
packet_body: PacketBody,
cmd_tx: &mpsc::Sender<Command>,
packet_queue: &PacketQueue,
) {
let (tx, rx) = oneshot::channel();
cmd_tx
.send(Command::Connect(Dial {
addr: multiaddr_from(addr),
retry_count: 3,
result_sender: tx,
}))
.await
.expect("Command receiver should be always open");
match rx.await {
Ok(Ok(peer_id)) => {
cmd_tx
.send(Command::StreamSend {
peer_id,
protocol: STREAM_PROTOCOL,
data: packet_body.bytes(),
})
.await
.expect("Command receiver should be always open");
fn multiaddr_from(addr: NodeAddress) -> Multiaddr {
match SocketAddr::from(addr) {
SocketAddr::V4(addr) => Multiaddr::empty()
.with(Protocol::Ip4(*addr.ip()))
.with(Protocol::Udp(addr.port()))
.with(Protocol::QuicV1),
SocketAddr::V6(addr) => Multiaddr::empty()
.with(Protocol::Ip6(*addr.ip()))
.with(Protocol::Udp(addr.port()))
.with(Protocol::QuicV1),
}
Ok(Err(e)) => match e {
nomos_libp2p::DialError::NoAddresses => {
tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue");
packet_queue
.send(packet_body)
.await
.expect("The receiving half of packet queue should be always open");
}
_ => tracing::error!("failed to dial with unrecoverable error: {e}"),
},
Err(e) => {
tracing::error!("channel closed before receiving: {e}");
}
}
}
fn multiaddr_from(addr: NodeAddress) -> Multiaddr {
match SocketAddr::from(addr) {
SocketAddr::V4(addr) => Multiaddr::empty()
.with(Protocol::Ip4(*addr.ip()))
.with(Protocol::Udp(addr.port()))
.with(Protocol::QuicV1),
SocketAddr::V6(addr) => Multiaddr::empty()
.with(Protocol::Ip6(*addr.ip()))
.with(Protocol::Udp(addr.port()))
.with(Protocol::QuicV1),
}
}

View File

@ -12,9 +12,7 @@ use self::swarm::SwarmHandler;
// internal
use super::NetworkBackend;
#[cfg(feature = "mixnet")]
use crate::backends::libp2p::mixnet::{init_mixnet, MixnetMessage, STREAM_PROTOCOL};
#[cfg(feature = "mixnet")]
use ::mixnet::client::MessageQueue;
use crate::backends::libp2p::mixnet::{Mixnet, MixnetMessage, STREAM_PROTOCOL};
pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash};
// crates
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState};
@ -24,7 +22,7 @@ pub struct Libp2p {
events_tx: broadcast::Sender<Event>,
commands_tx: mpsc::Sender<Command>,
#[cfg(feature = "mixnet")]
mixclient_message_queue: MessageQueue,
mixnet: Mixnet,
}
#[derive(Debug)]
@ -56,7 +54,7 @@ impl NetworkBackend for Libp2p {
SwarmHandler::new(&config, commands_tx.clone(), commands_rx, events_tx.clone());
#[cfg(feature = "mixnet")]
let mixclient_message_queue = init_mixnet(
let mixnet = Mixnet::new(
config.mixnet,
overwatch_handle.runtime().clone(),
commands_tx.clone(),
@ -71,7 +69,7 @@ impl NetworkBackend for Libp2p {
events_tx,
commands_tx,
#[cfg(feature = "mixnet")]
mixclient_message_queue,
mixnet,
}
}
@ -87,7 +85,7 @@ impl NetworkBackend for Libp2p {
match msg {
Command::Broadcast { topic, message } => {
let msg = MixnetMessage { topic, message };
if let Err(e) = self.mixclient_message_queue.send(msg.as_bytes()).await {
if let Err(e) = self.mixnet.message_queue().send(msg.as_bytes()).await {
tracing::error!("failed to send messasge to mixclient: {e}");
}
}