diff --git a/Cargo.toml b/Cargo.toml index fd894cd8..42aece63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "nomos-services/data-availability/verifier", "nomos-services/data-availability/dispersal", "nomos-services/data-availability/tests", + "nomos-services/mix", "nomos-da/full-replication", "nomos-mix/message", "nomos-mix/network", diff --git a/nomos-mix/network/src/lib.rs b/nomos-mix/network/src/lib.rs index 01fd88c6..4a5e3c94 100644 --- a/nomos-mix/network/src/lib.rs +++ b/nomos-mix/network/src/lib.rs @@ -2,7 +2,7 @@ mod behaviour; mod error; mod handler; -pub use behaviour::{Behaviour, Event}; +pub use behaviour::{Behaviour, Config, Event}; #[cfg(test)] mod test { diff --git a/nomos-services/mix/Cargo.toml b/nomos-services/mix/Cargo.toml new file mode 100644 index 00000000..f4a335d4 --- /dev/null +++ b/nomos-services/mix/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "nomos-mix-service" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1" +futures = "0.3" +libp2p = { version = "0.53", features = ["ed25519"] } +nomos-libp2p = { path = "../../nomos-libp2p", optional = true } +nomos-mix-network = { path = "../../nomos-mix/network" } +nomos-mix-message = { path = "../../nomos-mix/message" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +rand = "0.8.5" +serde = { version = "1.0", features = ["derive"] } +tokio = { version = "1", features = ["macros", "sync"] } +tracing = "0.1" + +[features] +default = [] +libp2p = ["nomos-libp2p"] diff --git a/nomos-services/mix/src/backends/libp2p.rs b/nomos-services/mix/src/backends/libp2p.rs new file mode 100644 index 00000000..34781572 --- /dev/null +++ b/nomos-services/mix/src/backends/libp2p.rs @@ -0,0 +1,227 @@ +use std::{io, time::Duration}; + +use async_trait::async_trait; +use futures::StreamExt; +use libp2p::{ + core::transport::ListenerId, + identity::{ed25519, Keypair}, + swarm::SwarmEvent, + Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError, +}; +use nomos_libp2p::{secret_key_serde, DialError, DialOpts, Protocol}; +use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; +use rand::seq::IteratorRandom; +use serde::{Deserialize, Serialize}; +use tokio::{ + sync::{broadcast, mpsc}, + task::JoinHandle, +}; + +use super::NetworkBackend; + +pub struct Libp2pNetworkBackend { + #[allow(dead_code)] + task: JoinHandle<()>, + msgs_tx: mpsc::Sender, + events_tx: broadcast::Sender, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Libp2pNetworkBackendSettings { + pub listening_address: Multiaddr, + // A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC) + #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] + pub node_key: ed25519::SecretKey, + pub membership: Vec, + pub peering_degree: usize, + pub num_mix_layers: usize, +} + +#[derive(Debug)] +pub enum Libp2pNetworkBackendMessage { + Mix(Vec), +} + +#[derive(Debug)] +pub enum Libp2pNetworkBackendEventKind { + FullyMixedMessage, +} + +#[derive(Debug, Clone)] +pub enum Libp2pNetworkBackendEvent { + FullyMixedMessage(Vec), +} + +const CHANNEL_SIZE: usize = 64; + +#[async_trait] +impl NetworkBackend for Libp2pNetworkBackend { + type Settings = Libp2pNetworkBackendSettings; + type State = NoState; + type Message = Libp2pNetworkBackendMessage; + type EventKind = Libp2pNetworkBackendEventKind; + type NetworkEvent = Libp2pNetworkBackendEvent; + + fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { + let (msgs_tx, msgs_rx) = mpsc::channel(CHANNEL_SIZE); + let (events_tx, _) = broadcast::channel(CHANNEL_SIZE); + + let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone())); + let local_peer_id = keypair.public().to_peer_id(); + let mut swarm = MixSwarm::new(keypair, config.num_mix_layers, msgs_rx, events_tx.clone()); + + swarm + .listen_on(config.listening_address) + .unwrap_or_else(|e| { + panic!("Failed to listen on Mix network: {e:?}"); + }); + + // Randomly select peering_degree number of peers, and dial to them + // TODO: Consider moving the peer seelction to the nomos_mix_network::Behaviour + config + .membership + .iter() + .filter(|addr| match extract_peer_id(addr) { + Some(peer_id) => peer_id != local_peer_id, + None => false, + }) + .choose_multiple(&mut rand::thread_rng(), config.peering_degree) + .iter() + .cloned() + .for_each(|addr| { + if let Err(e) = swarm.dial(addr.clone()) { + tracing::error!("failed to dial to {:?}: {:?}", addr, e); + } + }); + + let task = overwatch_handle.runtime().spawn(async move { + swarm.run().await; + }); + + Self { + task, + msgs_tx, + events_tx, + } + } + + async fn process(&self, msg: Self::Message) { + if let Err(e) = self.msgs_tx.send(msg).await { + tracing::error!("Failed to send message to MixSwarm: {e}"); + } + } + + async fn subscribe( + &mut self, + kind: Self::EventKind, + ) -> broadcast::Receiver { + match kind { + Libp2pNetworkBackendEventKind::FullyMixedMessage => self.events_tx.subscribe(), + } + } +} + +struct MixSwarm { + swarm: Swarm, + num_mix_layers: usize, + msgs_rx: mpsc::Receiver, + events_tx: broadcast::Sender, +} + +impl MixSwarm { + fn new( + keypair: Keypair, + num_mix_layers: usize, + msgs_rx: mpsc::Receiver, + events_tx: broadcast::Sender, + ) -> Self { + let swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_quic() + .with_behaviour(|_| { + nomos_mix_network::Behaviour::new(nomos_mix_network::Config { + transmission_rate: 1.0, + duplicate_cache_lifespan: 60, + }) + }) + .expect("Mix Behaviour should be built") + .with_swarm_config(|cfg| { + cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) + }) + .build(); + + Self { + swarm, + num_mix_layers, + msgs_rx, + events_tx, + } + } + + fn listen_on(&mut self, addr: Multiaddr) -> Result> { + self.swarm.listen_on(addr) + } + + fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> { + self.swarm.dial(DialOpts::from(addr)) + } + + async fn run(&mut self) { + loop { + tokio::select! { + Some(msg) = self.msgs_rx.recv() => { + self.handle_msg(msg).await; + } + Some(event) = self.swarm.next() => { + self.handle_event(event); + } + } + } + } + + async fn handle_msg(&mut self, msg: Libp2pNetworkBackendMessage) { + match msg { + Libp2pNetworkBackendMessage::Mix(msg) => { + tracing::debug!("Wrap msg and send it to mix network: {msg:?}"); + match nomos_mix_message::new_message(&msg, self.num_mix_layers.try_into().unwrap()) + { + Ok(wrapped_msg) => { + if let Err(e) = self.swarm.behaviour_mut().publish(wrapped_msg) { + tracing::error!("Failed to publish message to mix network: {e:?}"); + } + } + Err(e) => { + tracing::error!("Failed to wrap message: {e:?}"); + } + } + } + } + } + + fn handle_event(&mut self, event: SwarmEvent) { + match event { + SwarmEvent::Behaviour(nomos_mix_network::Event::FullyUnwrappedMessage(msg)) => { + tracing::debug!("Received fully unwrapped message: {msg:?}"); + self.events_tx + .send(Libp2pNetworkBackendEvent::FullyMixedMessage(msg)) + .unwrap(); + } + SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => { + tracing::error!("Received error from mix network: {e:?}"); + } + _ => { + tracing::debug!("Received event from mix network: {event:?}"); + } + } + } +} + +fn extract_peer_id(multiaddr: &Multiaddr) -> Option { + multiaddr.iter().find_map(|protocol| { + if let Protocol::P2p(peer_id) = protocol { + Some(peer_id) + } else { + None + } + }) +} diff --git a/nomos-services/mix/src/backends/mod.rs b/nomos-services/mix/src/backends/mod.rs new file mode 100644 index 00000000..20087477 --- /dev/null +++ b/nomos-services/mix/src/backends/mod.rs @@ -0,0 +1,20 @@ +#[cfg(feature = "libp2p")] +pub mod libp2p; + +use std::fmt::Debug; + +use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState}; +use tokio::sync::broadcast::Receiver; + +#[async_trait::async_trait] +pub trait NetworkBackend { + type Settings: Clone + Debug + Send + Sync + 'static; + type State: ServiceState + Clone + Send + Sync; + type Message: Debug + Send + Sync + 'static; + type EventKind: Debug + Send + Sync + 'static; + type NetworkEvent: Debug + Send + Sync + 'static; + + fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self; + async fn process(&self, msg: Self::Message); + async fn subscribe(&mut self, event: Self::EventKind) -> Receiver; +} diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/mix/src/lib.rs new file mode 100644 index 00000000..a2416503 --- /dev/null +++ b/nomos-services/mix/src/lib.rs @@ -0,0 +1,174 @@ +pub mod backends; + +use std::fmt::{self, Debug}; + +use async_trait::async_trait; +use backends::NetworkBackend; +use futures::StreamExt; +use overwatch_rs::services::{ + handle::ServiceStateHandle, + life_cycle::LifecycleMessage, + relay::RelayMessage, + state::{NoOperator, ServiceState}, + ServiceCore, ServiceData, ServiceId, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{broadcast, oneshot}; + +pub struct NetworkService { + backend: B, + service_state: ServiceStateHandle, +} + +impl ServiceData for NetworkService { + const SERVICE_ID: ServiceId = "MixNetwork"; + type Settings = NetworkConfig; + type State = NetworkState; + type StateOperator = NoOperator; + type Message = NetworkMsg; +} + +#[async_trait] +impl ServiceCore for NetworkService +where + B: NetworkBackend + Send + 'static, + B::State: Send + Sync, +{ + fn init(service_state: ServiceStateHandle) -> Result { + Ok(Self { + backend: ::new( + service_state.settings_reader.get_updated_settings().backend, + service_state.overwatch_handle.clone(), + ), + service_state, + }) + } + + async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + let Self { + service_state: + ServiceStateHandle { + mut inbound_relay, + lifecycle_handle, + .. + }, + mut backend, + } = self; + let mut lifecycle_stream = lifecycle_handle.message_stream(); + loop { + tokio::select! { + Some(msg) = inbound_relay.recv() => { + Self::handle_network_service_message(msg, &mut backend).await; + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } + } + } + Ok(()) + } +} + +impl NetworkService +where + B: NetworkBackend + Send + 'static, + B::State: Send + Sync, +{ + async fn handle_network_service_message(msg: NetworkMsg, backend: &mut B) { + match msg { + NetworkMsg::Process(msg) => { + // split sending in two steps to help the compiler understand we do not + // need to hold an instance of &I (which is not send) across an await point + let _send = backend.process(msg); + _send.await + } + NetworkMsg::Subscribe { kind, sender } => sender + .send(backend.subscribe(kind).await) + .unwrap_or_else(|_| { + tracing::warn!( + "client hung up before a subscription handle could be established" + ) + }), + } + } + + async fn should_stop_service(msg: LifecycleMessage) -> bool { + match msg { + LifecycleMessage::Kill => true, + LifecycleMessage::Shutdown(signal_sender) => { + // TODO: Maybe add a call to backend to handle this. Maybe trying to save unprocessed messages? + if signal_sender.send(()).is_err() { + tracing::error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + } + } +} + +#[derive(Serialize, Deserialize)] +pub struct NetworkConfig { + pub backend: B::Settings, +} + +impl Debug for NetworkConfig { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "NetworkConfig {{ backend: {:?}}}", self.backend) + } +} + +impl Clone for NetworkConfig { + fn clone(&self) -> Self { + NetworkConfig { + backend: self.backend.clone(), + } + } +} + +pub struct NetworkState { + _backend: B::State, +} + +impl Clone for NetworkState { + fn clone(&self) -> Self { + NetworkState { + _backend: self._backend.clone(), + } + } +} + +impl ServiceState for NetworkState { + type Settings = NetworkConfig; + type Error = ::Error; + + fn from_settings(settings: &Self::Settings) -> Result { + B::State::from_settings(&settings.backend).map(|_backend| Self { _backend }) + } +} + +pub enum NetworkMsg { + Process(B::Message), + Subscribe { + kind: B::EventKind, + sender: oneshot::Sender>, + }, +} + +impl Debug for NetworkMsg { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Process(msg) => write!(fmt, "NetworkMsg::Process({msg:?})"), + Self::Subscribe { kind, sender } => write!( + fmt, + "NetworkMsg::Subscribe{{ kind: {kind:?}, sender: {sender:?}}}" + ), + } + } +} + +impl RelayMessage for NetworkMsg {}