Mix: Add mix service (#823)

* Mix: remove all of the previous mixnet stuff

* Mix: Add mix service

* refactor extract_peer_id and add comments
This commit is contained in:
Youngjoon Lee 2024-10-17 18:12:26 +09:00 committed by GitHub
parent cb86528a4a
commit 8dbcf560f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 444 additions and 1 deletions

View File

@ -22,6 +22,7 @@ members = [
"nomos-services/data-availability/verifier",
"nomos-services/data-availability/dispersal",
"nomos-services/data-availability/tests",
"nomos-services/mix",
"nomos-da/full-replication",
"nomos-mix/message",
"nomos-mix/network",

View File

@ -2,7 +2,7 @@ mod behaviour;
mod error;
mod handler;
pub use behaviour::{Behaviour, Event};
pub use behaviour::{Behaviour, Config, Event};
#[cfg(test)]
mod test {

View File

@ -0,0 +1,21 @@
[package]
name = "nomos-mix-service"
version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
futures = "0.3"
libp2p = { version = "0.53", features = ["ed25519"] }
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
nomos-mix-network = { path = "../../nomos-mix/network" }
nomos-mix-message = { path = "../../nomos-mix/message" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
rand = "0.8.5"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["macros", "sync"] }
tracing = "0.1"
[features]
default = []
libp2p = ["nomos-libp2p"]

View File

@ -0,0 +1,227 @@
use std::{io, time::Duration};
use async_trait::async_trait;
use futures::StreamExt;
use libp2p::{
core::transport::ListenerId,
identity::{ed25519, Keypair},
swarm::SwarmEvent,
Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError,
};
use nomos_libp2p::{secret_key_serde, DialError, DialOpts, Protocol};
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState};
use rand::seq::IteratorRandom;
use serde::{Deserialize, Serialize};
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
};
use super::NetworkBackend;
pub struct Libp2pNetworkBackend {
#[allow(dead_code)]
task: JoinHandle<()>,
msgs_tx: mpsc::Sender<Libp2pNetworkBackendMessage>,
events_tx: broadcast::Sender<Libp2pNetworkBackendEvent>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Libp2pNetworkBackendSettings {
pub listening_address: Multiaddr,
// A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC)
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
pub node_key: ed25519::SecretKey,
pub membership: Vec<Multiaddr>,
pub peering_degree: usize,
pub num_mix_layers: usize,
}
#[derive(Debug)]
pub enum Libp2pNetworkBackendMessage {
Mix(Vec<u8>),
}
#[derive(Debug)]
pub enum Libp2pNetworkBackendEventKind {
FullyMixedMessage,
}
#[derive(Debug, Clone)]
pub enum Libp2pNetworkBackendEvent {
FullyMixedMessage(Vec<u8>),
}
const CHANNEL_SIZE: usize = 64;
#[async_trait]
impl NetworkBackend for Libp2pNetworkBackend {
type Settings = Libp2pNetworkBackendSettings;
type State = NoState<Self::Settings>;
type Message = Libp2pNetworkBackendMessage;
type EventKind = Libp2pNetworkBackendEventKind;
type NetworkEvent = Libp2pNetworkBackendEvent;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
let (msgs_tx, msgs_rx) = mpsc::channel(CHANNEL_SIZE);
let (events_tx, _) = broadcast::channel(CHANNEL_SIZE);
let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let local_peer_id = keypair.public().to_peer_id();
let mut swarm = MixSwarm::new(keypair, config.num_mix_layers, msgs_rx, events_tx.clone());
swarm
.listen_on(config.listening_address)
.unwrap_or_else(|e| {
panic!("Failed to listen on Mix network: {e:?}");
});
// Randomly select peering_degree number of peers, and dial to them
// TODO: Consider moving the peer seelction to the nomos_mix_network::Behaviour
config
.membership
.iter()
.filter(|addr| match extract_peer_id(addr) {
Some(peer_id) => peer_id != local_peer_id,
None => false,
})
.choose_multiple(&mut rand::thread_rng(), config.peering_degree)
.iter()
.cloned()
.for_each(|addr| {
if let Err(e) = swarm.dial(addr.clone()) {
tracing::error!("failed to dial to {:?}: {:?}", addr, e);
}
});
let task = overwatch_handle.runtime().spawn(async move {
swarm.run().await;
});
Self {
task,
msgs_tx,
events_tx,
}
}
async fn process(&self, msg: Self::Message) {
if let Err(e) = self.msgs_tx.send(msg).await {
tracing::error!("Failed to send message to MixSwarm: {e}");
}
}
async fn subscribe(
&mut self,
kind: Self::EventKind,
) -> broadcast::Receiver<Self::NetworkEvent> {
match kind {
Libp2pNetworkBackendEventKind::FullyMixedMessage => self.events_tx.subscribe(),
}
}
}
struct MixSwarm {
swarm: Swarm<nomos_mix_network::Behaviour>,
num_mix_layers: usize,
msgs_rx: mpsc::Receiver<Libp2pNetworkBackendMessage>,
events_tx: broadcast::Sender<Libp2pNetworkBackendEvent>,
}
impl MixSwarm {
fn new(
keypair: Keypair,
num_mix_layers: usize,
msgs_rx: mpsc::Receiver<Libp2pNetworkBackendMessage>,
events_tx: broadcast::Sender<Libp2pNetworkBackendEvent>,
) -> Self {
let swarm = SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
.with_quic()
.with_behaviour(|_| {
nomos_mix_network::Behaviour::new(nomos_mix_network::Config {
transmission_rate: 1.0,
duplicate_cache_lifespan: 60,
})
})
.expect("Mix Behaviour should be built")
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
})
.build();
Self {
swarm,
num_mix_layers,
msgs_rx,
events_tx,
}
}
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
self.swarm.listen_on(addr)
}
fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> {
self.swarm.dial(DialOpts::from(addr))
}
async fn run(&mut self) {
loop {
tokio::select! {
Some(msg) = self.msgs_rx.recv() => {
self.handle_msg(msg).await;
}
Some(event) = self.swarm.next() => {
self.handle_event(event);
}
}
}
}
async fn handle_msg(&mut self, msg: Libp2pNetworkBackendMessage) {
match msg {
Libp2pNetworkBackendMessage::Mix(msg) => {
tracing::debug!("Wrap msg and send it to mix network: {msg:?}");
match nomos_mix_message::new_message(&msg, self.num_mix_layers.try_into().unwrap())
{
Ok(wrapped_msg) => {
if let Err(e) = self.swarm.behaviour_mut().publish(wrapped_msg) {
tracing::error!("Failed to publish message to mix network: {e:?}");
}
}
Err(e) => {
tracing::error!("Failed to wrap message: {e:?}");
}
}
}
}
}
fn handle_event(&mut self, event: SwarmEvent<nomos_mix_network::Event>) {
match event {
SwarmEvent::Behaviour(nomos_mix_network::Event::FullyUnwrappedMessage(msg)) => {
tracing::debug!("Received fully unwrapped message: {msg:?}");
self.events_tx
.send(Libp2pNetworkBackendEvent::FullyMixedMessage(msg))
.unwrap();
}
SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => {
tracing::error!("Received error from mix network: {e:?}");
}
_ => {
tracing::debug!("Received event from mix network: {event:?}");
}
}
}
}
fn extract_peer_id(multiaddr: &Multiaddr) -> Option<PeerId> {
multiaddr.iter().find_map(|protocol| {
if let Protocol::P2p(peer_id) = protocol {
Some(peer_id)
} else {
None
}
})
}

View File

@ -0,0 +1,20 @@
#[cfg(feature = "libp2p")]
pub mod libp2p;
use std::fmt::Debug;
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState};
use tokio::sync::broadcast::Receiver;
#[async_trait::async_trait]
pub trait NetworkBackend {
type Settings: Clone + Debug + Send + Sync + 'static;
type State: ServiceState<Settings = Self::Settings> + Clone + Send + Sync;
type Message: Debug + Send + Sync + 'static;
type EventKind: Debug + Send + Sync + 'static;
type NetworkEvent: Debug + Send + Sync + 'static;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self;
async fn process(&self, msg: Self::Message);
async fn subscribe(&mut self, event: Self::EventKind) -> Receiver<Self::NetworkEvent>;
}

View File

@ -0,0 +1,174 @@
pub mod backends;
use std::fmt::{self, Debug};
use async_trait::async_trait;
use backends::NetworkBackend;
use futures::StreamExt;
use overwatch_rs::services::{
handle::ServiceStateHandle,
life_cycle::LifecycleMessage,
relay::RelayMessage,
state::{NoOperator, ServiceState},
ServiceCore, ServiceData, ServiceId,
};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, oneshot};
pub struct NetworkService<B: NetworkBackend + 'static> {
backend: B,
service_state: ServiceStateHandle<Self>,
}
impl<B: NetworkBackend + 'static> ServiceData for NetworkService<B> {
const SERVICE_ID: ServiceId = "MixNetwork";
type Settings = NetworkConfig<B>;
type State = NetworkState<B>;
type StateOperator = NoOperator<Self::State>;
type Message = NetworkMsg<B>;
}
#[async_trait]
impl<B> ServiceCore for NetworkService<B>
where
B: NetworkBackend + Send + 'static,
B::State: Send + Sync,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
Ok(Self {
backend: <B as NetworkBackend>::new(
service_state.settings_reader.get_updated_settings().backend,
service_state.overwatch_handle.clone(),
),
service_state,
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
service_state:
ServiceStateHandle {
mut inbound_relay,
lifecycle_handle,
..
},
mut backend,
} = self;
let mut lifecycle_stream = lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = inbound_relay.recv() => {
Self::handle_network_service_message(msg, &mut backend).await;
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
impl<B> NetworkService<B>
where
B: NetworkBackend + Send + 'static,
B::State: Send + Sync,
{
async fn handle_network_service_message(msg: NetworkMsg<B>, backend: &mut B) {
match msg {
NetworkMsg::Process(msg) => {
// split sending in two steps to help the compiler understand we do not
// need to hold an instance of &I (which is not send) across an await point
let _send = backend.process(msg);
_send.await
}
NetworkMsg::Subscribe { kind, sender } => sender
.send(backend.subscribe(kind).await)
.unwrap_or_else(|_| {
tracing::warn!(
"client hung up before a subscription handle could be established"
)
}),
}
}
async fn should_stop_service(msg: LifecycleMessage) -> bool {
match msg {
LifecycleMessage::Kill => true,
LifecycleMessage::Shutdown(signal_sender) => {
// TODO: Maybe add a call to backend to handle this. Maybe trying to save unprocessed messages?
if signal_sender.send(()).is_err() {
tracing::error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
}
}
}
#[derive(Serialize, Deserialize)]
pub struct NetworkConfig<B: NetworkBackend> {
pub backend: B::Settings,
}
impl<B: NetworkBackend> Debug for NetworkConfig<B> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "NetworkConfig {{ backend: {:?}}}", self.backend)
}
}
impl<B: NetworkBackend> Clone for NetworkConfig<B> {
fn clone(&self) -> Self {
NetworkConfig {
backend: self.backend.clone(),
}
}
}
pub struct NetworkState<B: NetworkBackend> {
_backend: B::State,
}
impl<B: NetworkBackend> Clone for NetworkState<B> {
fn clone(&self) -> Self {
NetworkState {
_backend: self._backend.clone(),
}
}
}
impl<B: NetworkBackend> ServiceState for NetworkState<B> {
type Settings = NetworkConfig<B>;
type Error = <B::State as ServiceState>::Error;
fn from_settings(settings: &Self::Settings) -> Result<Self, Self::Error> {
B::State::from_settings(&settings.backend).map(|_backend| Self { _backend })
}
}
pub enum NetworkMsg<B: NetworkBackend> {
Process(B::Message),
Subscribe {
kind: B::EventKind,
sender: oneshot::Sender<broadcast::Receiver<B::NetworkEvent>>,
},
}
impl<B: NetworkBackend> Debug for NetworkMsg<B> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Process(msg) => write!(fmt, "NetworkMsg::Process({msg:?})"),
Self::Subscribe { kind, sender } => write!(
fmt,
"NetworkMsg::Subscribe{{ kind: {kind:?}, sender: {sender:?}}}"
),
}
}
}
impl<T: NetworkBackend + 'static> RelayMessage for NetworkMsg<T> {}