Mix: add connection maintenance

This commit is contained in:
Youngjoon Lee 2024-11-28 12:04:53 +09:00
parent d923328744
commit 65f0327897
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
11 changed files with 495 additions and 72 deletions

View File

@ -0,0 +1,168 @@
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
hash::Hash,
time::Duration,
};
use serde::{Deserialize, Serialize};
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct ConnectionMaintenanceSettings {
/// Time interval to measure/evaluate the number of messages sent by each peer.
pub time_window: Duration,
/// The number of effective (data or cover) messages that a peer is expected to send in a given time window.
/// If the measured count is greater than (expected * (1 + tolerance)), the peer is considered malicious.
/// If the measured count is less than (expected * (1 - tolerance)), the peer is considered unhealthy.
pub expected_effective_messages: f32,
pub effective_message_tolerance: f32,
/// The number of drop messages that a peer is expected to send in a given time window.
/// If the measured count is greater than (expected * (1 + tolerance)), the peer is considered malicious.
/// If the measured count is less than (expected * (1 - tolerance)), the peer is considered unhealthy.
pub expected_drop_messages: f32,
pub drop_message_tolerance: f32,
}
/// Connection maintenance to detect malicious and unhealthy peers
/// based on the number of messages sent by each peer in time windows
pub struct ConnectionMaintenance<Peer> {
settings: ConnectionMaintenanceSettings,
/// Meters to count the number of effective and drop messages sent by each peer
meters: HashMap<Peer, ConnectionMeter>,
}
impl<Peer> ConnectionMaintenance<Peer>
where
Peer: Debug + Eq + Hash + Clone,
{
pub fn new(settings: ConnectionMaintenanceSettings) -> Self {
Self {
settings,
meters: HashMap::new(),
}
}
/// Record a effective message sent by the [`peer`]
pub fn add_effective(&mut self, peer: Peer) {
self.meter(peer).effective_messages += 1;
}
/// Record a drop message sent by the [`peer`]
pub fn add_drop(&mut self, peer: Peer) {
self.meter(peer).drop_messages += 1;
}
fn meter(&mut self, peer: Peer) -> &mut ConnectionMeter {
self.meters.entry(peer).or_insert_with(ConnectionMeter::new)
}
/// Return malicious peers and unhealthy peers, and reset the counters of effective/drop messages.
/// This function must be called at the end of each time window.
/// The set of malicious peers doesn't have the intersection with the set of unhealthy peers.
pub fn reset(&mut self) -> (HashSet<Peer>, HashSet<Peer>) {
let mut malicious_peers = HashSet::new();
let mut unhealthy_peers = HashSet::new();
self.meters.iter().for_each(|(peer, meter)| {
if meter.is_malicious(&self.settings) {
malicious_peers.insert(peer.clone());
} else if meter.is_unhealthy(&self.settings) {
unhealthy_peers.insert(peer.clone());
}
});
self.meters.clear();
(malicious_peers, unhealthy_peers)
}
}
/// Meter to count the number of effective and drop messages sent by a peer
#[derive(Debug)]
struct ConnectionMeter {
effective_messages: usize,
drop_messages: usize,
}
impl ConnectionMeter {
fn new() -> Self {
Self {
effective_messages: 0,
drop_messages: 0,
}
}
/// Check if the peer is malicious based on the number of effective and drop messages sent
fn is_malicious(&self, settings: &ConnectionMaintenanceSettings) -> bool {
let effective_threshold =
settings.expected_effective_messages * (1.0 + settings.effective_message_tolerance);
let drop_threshold =
settings.expected_drop_messages * (1.0 + settings.drop_message_tolerance);
self.effective_messages as f32 > effective_threshold
|| self.drop_messages as f32 > drop_threshold
}
/// Check if the peer is unhealthy based on the number of effective and drop messages sent
fn is_unhealthy(&self, settings: &ConnectionMaintenanceSettings) -> bool {
let effective_threshold =
settings.expected_effective_messages * (1.0 - settings.effective_message_tolerance);
let drop_threshold =
settings.expected_drop_messages * (1.0 - settings.drop_message_tolerance);
effective_threshold > self.effective_messages as f32
|| drop_threshold > self.drop_messages as f32
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn malicious_and_unhealthy_by_effective() {
let settings = ConnectionMaintenanceSettings {
time_window: Duration::from_secs(1),
expected_effective_messages: 2.0,
effective_message_tolerance: 0.1,
expected_drop_messages: 0.0,
drop_message_tolerance: 0.0,
};
let mut maintenance = ConnectionMaintenance::<u8>::new(settings);
// Peer 0 sends 3 effective messages, more than expected
maintenance.add_effective(0);
maintenance.add_effective(0);
maintenance.add_effective(0);
// Peer 1 sends 2 effective messages, as expected
maintenance.add_effective(1);
maintenance.add_effective(1);
// Peer 2 sends 1 effective messages, less than expected
maintenance.add_effective(2);
let (malicious, unhealthy) = maintenance.reset();
assert_eq!(malicious, HashSet::from_iter(vec![0]));
assert_eq!(unhealthy, HashSet::from_iter(vec![2]));
}
#[test]
fn malicious_and_unhealthy_by_drop() {
let settings = ConnectionMaintenanceSettings {
time_window: Duration::from_secs(1),
expected_effective_messages: 0.0,
effective_message_tolerance: 0.0,
expected_drop_messages: 2.0,
drop_message_tolerance: 0.1,
};
let mut maintenance = ConnectionMaintenance::<u8>::new(settings);
// Peer 0 sends 3 drop messages, more than expected
maintenance.add_drop(0);
maintenance.add_drop(0);
maintenance.add_drop(0);
// Peer 1 sends 2 drop messages, as expected
maintenance.add_drop(1);
maintenance.add_drop(1);
// Peer 2 sends 1 drop messages, less than expected
maintenance.add_drop(2);
let (malicious, unhealthy) = maintenance.reset();
assert_eq!(malicious, HashSet::from_iter(vec![0]));
assert_eq!(unhealthy, HashSet::from_iter(vec![2]));
}
}

View File

@ -1,3 +1,4 @@
pub mod conn_maintenance;
pub mod cover_traffic; pub mod cover_traffic;
pub mod membership; pub mod membership;
pub mod message_blend; pub mod message_blend;

View File

@ -1,6 +1,11 @@
use std::collections::HashSet;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use nomos_mix_message::MixMessage; use nomos_mix_message::MixMessage;
use rand::{seq::SliceRandom, Rng}; use rand::{
seq::{IteratorRandom, SliceRandom},
Rng,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -48,6 +53,18 @@ where
self.remote_nodes.choose_multiple(rng, amount).collect() self.remote_nodes.choose_multiple(rng, amount).collect()
} }
pub fn filter_and_choose_remote_nodes<R: Rng>(
&self,
rng: &mut R,
amount: usize,
exclude_addrs: &HashSet<Multiaddr>,
) -> Vec<&Node<M::PublicKey>> {
self.remote_nodes
.iter()
.filter(|node| !exclude_addrs.contains(&node.address))
.choose_multiple(rng, amount)
}
pub fn local_node(&self) -> &Node<M::PublicKey> { pub fn local_node(&self) -> &Node<M::PublicKey> {
&self.local_node &self.local_node
} }

View File

@ -15,5 +15,6 @@ sha2 = "0.10"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1"
libp2p = { version = "0.53", features = ["ed25519", "tokio", "quic"] } libp2p = { version = "0.53", features = ["ed25519", "tokio", "quic"] }
tracing-subscriber = "0.3.18" tracing-subscriber = "0.3.18"

View File

@ -3,6 +3,7 @@ use crate::{
handler::{FromBehaviour, MixConnectionHandler, ToBehaviour}, handler::{FromBehaviour, MixConnectionHandler, ToBehaviour},
}; };
use cached::{Cached, TimedCache}; use cached::{Cached, TimedCache};
use futures::Stream;
use libp2p::{ use libp2p::{
core::Endpoint, core::Endpoint,
swarm::{ swarm::{
@ -11,21 +12,32 @@ use libp2p::{
}, },
Multiaddr, PeerId, Multiaddr, PeerId,
}; };
use nomos_mix::conn_maintenance::{ConnectionMaintenance, ConnectionMaintenanceSettings};
use nomos_mix_message::MixMessage; use nomos_mix_message::MixMessage;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::{ use std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
pin::pin,
task::{Context, Poll, Waker}, task::{Context, Poll, Waker},
}; };
/// A [`NetworkBehaviour`]: /// A [`NetworkBehaviour`]:
/// - forwards messages to all connected peers with deduplication. /// - forwards messages to all connected peers with deduplication.
/// - receives messages from all connected peers. /// - receives messages from all connected peers.
pub struct Behaviour<M> { pub struct Behaviour<M, Interval> {
config: Config, config: Config<Interval>,
/// Peers that support the mix protocol, and their connection IDs /// Peers that support the mix protocol, and their connection IDs
negotiated_peers: HashMap<PeerId, HashSet<ConnectionId>>, negotiated_peers: HashMap<PeerId, HashSet<ConnectionId>>,
/// Connection maintenance
/// NOTE: For now, this is optional because this may close too many connections
/// until we clearly figure out the optimal parameters.
conn_maintenance: Option<ConnectionMaintenance<PeerId>>,
/// Peers that should be excluded from connection establishments
blacklist_peers: HashSet<PeerId>,
/// To maintain address of peers because libp2p API uses only [`PeerId`] when handling
/// connection events. But, we need to know [`Multiaddr`] of peers to dial to them.
peer_addresses: HashMap<PeerId, Multiaddr>,
/// Queue of events to yield to the swarm. /// Queue of events to yield to the swarm.
events: VecDeque<ToSwarm<Event, FromBehaviour>>, events: VecDeque<ToSwarm<Event, FromBehaviour>>,
/// Waker that handles polling /// Waker that handles polling
@ -37,26 +49,48 @@ pub struct Behaviour<M> {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Config { pub struct Config<Interval> {
pub max_peering_degree: usize,
pub duplicate_cache_lifespan: u64, pub duplicate_cache_lifespan: u64,
pub conn_maintenance_settings: Option<ConnectionMaintenanceSettings>,
pub conn_maintenance_interval: Option<Interval>,
} }
#[derive(Debug)] #[derive(Debug)]
pub enum Event { pub enum Event {
/// A message received from one of the peers. /// A message received from one of the peers.
Message(Vec<u8>), Message(Vec<u8>),
/// Request establishing the `[amount]` number of new connections,
/// excluding the peers with the given addresses.
EstabalishNewConnections {
amount: usize,
excludes: HashSet<Multiaddr>,
},
Error(Error), Error(Error),
} }
impl<M> Behaviour<M> impl<M, Interval> Behaviour<M, Interval>
where where
M: MixMessage, M: MixMessage,
{ {
pub fn new(config: Config) -> Self { pub fn new(config: Config<Interval>) -> Self {
let Config {
conn_maintenance_settings,
conn_maintenance_interval,
..
} = &config;
assert_eq!(
conn_maintenance_settings.is_some(),
conn_maintenance_interval.is_some()
);
let conn_maintenance = conn_maintenance_settings.map(ConnectionMaintenance::<PeerId>::new);
let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan); let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan);
Self { Self {
config, config,
negotiated_peers: HashMap::new(), negotiated_peers: HashMap::new(),
conn_maintenance,
peer_addresses: HashMap::new(),
blacklist_peers: HashSet::new(),
events: VecDeque::new(), events: VecDeque::new(),
waker: None, waker: None,
duplicate_cache, duplicate_cache,
@ -115,6 +149,8 @@ where
Ok(()) Ok(())
} }
/// Add a peer to the list of connected peers that have completed negotiations
/// to verify [`Behaviour`] support.
fn add_negotiated_peer(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool { fn add_negotiated_peer(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool {
tracing::debug!( tracing::debug!(
"Adding to connected_peers: peer_id:{:?}, connection_id:{:?}", "Adding to connected_peers: peer_id:{:?}, connection_id:{:?}",
@ -127,20 +163,120 @@ where
.insert(connection_id) .insert(connection_id)
} }
fn remove_negotiated_peer(&mut self, peer_id: &PeerId, connection_id: &ConnectionId) { /// Remove a peer from the list of connected peers.
if let Some(connections) = self.negotiated_peers.get_mut(peer_id) { /// If the peer (or the connection of the peer) is found and removed, return `false`.
tracing::debug!( /// If not, return `true`.
"Removing from connected_peers: peer:{:?}, connection_id:{:?}", fn remove_negotiated_peer(
peer_id, &mut self,
connection_id peer_id: &PeerId,
); connection_id: Option<&ConnectionId>,
connections.remove(connection_id); ) -> bool {
if connections.is_empty() { match connection_id {
self.negotiated_peers.remove(peer_id); Some(connection_id) => match self.negotiated_peers.get_mut(peer_id) {
} Some(connections) => {
tracing::debug!(
"Removing from connected_peers: peer:{:?}, connection_id:{:?}",
peer_id,
connection_id
);
let removed = connections.remove(connection_id);
if connections.is_empty() {
self.negotiated_peers.remove(peer_id);
}
removed
}
None => false,
},
None => self.negotiated_peers.remove(peer_id).is_some(),
} }
} }
fn is_negotiated_peer(&self, peer_id: &PeerId) -> bool {
self.negotiated_peers.contains_key(peer_id)
}
/// Handle newly detected malicious and unhealthy peers and schedule new connection establishments.
/// This must be called at the end of each time window.
fn run_conn_maintenance(&mut self) {
if let Some(conn_maintenance) = self.conn_maintenance.as_mut() {
let (malicious_peers, unhealthy_peers) = conn_maintenance.reset();
let num_closed_malicious = self.handle_malicious_peers(malicious_peers);
let num_connected_unhealthy = self.handle_unhealthy_peers(unhealthy_peers);
let mut num_to_dial = num_closed_malicious + num_connected_unhealthy;
if num_to_dial + self.negotiated_peers.len() > self.config.max_peering_degree {
tracing::warn!(
"Cannot establish {} new connections due to max_peering_degree:{}. connected_peers:{}",
num_to_dial,
self.config.max_peering_degree,
self.negotiated_peers.len(),
);
num_to_dial = self.config.max_peering_degree - self.negotiated_peers.len();
}
self.schedule_dials(num_to_dial);
}
}
/// Add newly detected malicious peers to the blacklist,
/// and schedule connection closures if the connections exist.
/// This returns the number of connection closures that were actually scheduled.
fn handle_malicious_peers(&mut self, peers: HashSet<PeerId>) -> usize {
peers
.into_iter()
.map(|peer_id| {
self.blacklist_peers.insert(peer_id);
// Close the connection only if the peer was already connected.
if self.remove_negotiated_peer(&peer_id, None) {
self.events.push_back(ToSwarm::CloseConnection {
peer_id,
connection: libp2p::swarm::CloseConnection::All,
});
1
} else {
0
}
})
.sum()
}
/// Add newly detected unhealthy peers to the blacklist,
/// and return the number of connections that actually exist to the unhealthy peers.
fn handle_unhealthy_peers(&mut self, peers: HashSet<PeerId>) -> usize {
peers
.into_iter()
.map(|peer_id| {
self.blacklist_peers.insert(peer_id);
if self.is_negotiated_peer(&peer_id) {
1
} else {
0
}
})
.sum()
}
/// Schedule new connection establishments, excluding the blacklisted peers.
fn schedule_dials(&mut self, amount: usize) {
if amount == 0 {
return;
}
let excludes = self
.blacklist_peers
.iter()
.filter_map(|peer_id| self.peer_addresses.get(peer_id).cloned())
.collect();
tracing::info!(
"Scheduling {} new dialings: excludes:{:?}",
amount,
excludes
);
self.events
.push_back(ToSwarm::GenerateEvent(Event::EstabalishNewConnections {
amount,
excludes,
}));
}
/// SHA-256 hash of the message /// SHA-256 hash of the message
fn message_id(message: &[u8]) -> Vec<u8> { fn message_id(message: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
@ -155,9 +291,10 @@ where
} }
} }
impl<M> NetworkBehaviour for Behaviour<M> impl<M, Interval> NetworkBehaviour for Behaviour<M, Interval>
where where
M: MixMessage + 'static, M: MixMessage + 'static,
Interval: Stream + Unpin + 'static,
{ {
type ConnectionHandler = MixConnectionHandler; type ConnectionHandler = MixConnectionHandler;
type ToSwarm = Event; type ToSwarm = Event;
@ -165,21 +302,25 @@ where
fn handle_established_inbound_connection( fn handle_established_inbound_connection(
&mut self, &mut self,
_: ConnectionId, _: ConnectionId,
_: PeerId, peer_id: PeerId,
_: &Multiaddr,
_: &Multiaddr, _: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> { ) -> Result<THandler<Self>, ConnectionDenied> {
Ok(MixConnectionHandler::new(&self.config)) // Keep the address of the peer
self.peer_addresses.insert(peer_id, remote_addr.clone());
Ok(MixConnectionHandler::new())
} }
fn handle_established_outbound_connection( fn handle_established_outbound_connection(
&mut self, &mut self,
_: ConnectionId, _: ConnectionId,
_: PeerId, peer_id: PeerId,
_: &Multiaddr, addr: &Multiaddr,
_: Endpoint, _: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> { ) -> Result<THandler<Self>, ConnectionDenied> {
Ok(MixConnectionHandler::new(&self.config)) // Keep the address of the peer
self.peer_addresses.insert(peer_id, addr.clone());
Ok(MixConnectionHandler::new())
} }
/// Informs the behaviour about an event from the [`Swarm`]. /// Informs the behaviour about an event from the [`Swarm`].
@ -190,7 +331,7 @@ where
.. ..
}) = event }) = event
{ {
self.remove_negotiated_peer(&peer_id, &connection_id); self.remove_negotiated_peer(&peer_id, Some(&connection_id));
} }
} }
@ -207,9 +348,16 @@ where
ToBehaviour::Message(message) => { ToBehaviour::Message(message) => {
// Ignore drop message // Ignore drop message
if M::is_drop_message(&message) { if M::is_drop_message(&message) {
if let Some(conn_maintenance) = self.conn_maintenance.as_mut() {
conn_maintenance.add_drop(peer_id);
}
return; return;
} }
if let Some(conn_maintenance) = self.conn_maintenance.as_mut() {
conn_maintenance.add_effective(peer_id);
}
// Add the message to the cache. If it was already seen, ignore it. // Add the message to the cache. If it was already seen, ignore it.
if self if self
.duplicate_cache .duplicate_cache
@ -236,7 +384,7 @@ where
self.add_negotiated_peer(peer_id, connection_id); self.add_negotiated_peer(peer_id, connection_id);
} }
ToBehaviour::NegotiationFailed => { ToBehaviour::NegotiationFailed => {
self.remove_negotiated_peer(&peer_id, &connection_id); self.remove_negotiated_peer(&peer_id, Some(&connection_id));
} }
ToBehaviour::IOError(error) => { ToBehaviour::IOError(error) => {
// TODO: Consider removing the peer from the connected_peers and closing the connection // TODO: Consider removing the peer from the connected_peers and closing the connection
@ -257,6 +405,13 @@ where
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Run connection maintenance if the interval is reached.
if let Some(interval) = self.config.conn_maintenance_interval.as_mut() {
if pin!(interval).poll_next(cx).is_ready() {
self.run_conn_maintenance();
}
}
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
Poll::Ready(event) Poll::Ready(event)
} else { } else {

View File

@ -14,8 +14,6 @@ use libp2p::{
Stream, StreamProtocol, Stream, StreamProtocol,
}; };
use crate::behaviour::Config;
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0"); const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0");
// TODO: Consider replacing this struct with libp2p_stream ConnectionHandler // TODO: Consider replacing this struct with libp2p_stream ConnectionHandler
@ -42,7 +40,7 @@ enum OutboundSubstreamState {
} }
impl MixConnectionHandler { impl MixConnectionHandler {
pub fn new(_config: &Config) -> Self { pub fn new() -> Self {
Self { Self {
inbound_substream: None, inbound_substream: None,
outbound_substream: None, outbound_substream: None,

View File

@ -14,8 +14,10 @@ mod test {
swarm::{dummy, NetworkBehaviour, SwarmEvent}, swarm::{dummy, NetworkBehaviour, SwarmEvent},
Multiaddr, PeerId, Swarm, SwarmBuilder, Multiaddr, PeerId, Swarm, SwarmBuilder,
}; };
use nomos_mix::conn_maintenance::ConnectionMaintenanceSettings;
use nomos_mix_message::mock::MockMixMessage; use nomos_mix_message::mock::MockMixMessage;
use tokio::select; use tokio::select;
use tokio_stream::wrappers::IntervalStream;
use crate::{behaviour::Config, error::Error, Behaviour, Event}; use crate::{behaviour::Config, error::Error, Behaviour, Event};
@ -116,11 +118,23 @@ mod test {
} }
} }
fn new_swarm(key: Keypair) -> Swarm<Behaviour<MockMixMessage>> { fn new_swarm(key: Keypair) -> Swarm<Behaviour<MockMixMessage, IntervalStream>> {
let conn_maintenance_settings = ConnectionMaintenanceSettings {
time_window: Duration::from_secs(60),
expected_effective_messages: 1.0,
effective_message_tolerance: 0.1,
expected_drop_messages: 1.0,
drop_message_tolerance: 0.1,
};
let conn_maintenance_interval =
IntervalStream::new(tokio::time::interval(conn_maintenance_settings.time_window));
new_swarm_with_behaviour( new_swarm_with_behaviour(
key, key,
Behaviour::new(Config { Behaviour::new(Config {
max_peering_degree: 10,
duplicate_cache_lifespan: 60, duplicate_cache_lifespan: 60,
conn_maintenance_settings: Some(conn_maintenance_settings),
conn_maintenance_interval: Some(conn_maintenance_interval),
}), }),
) )
} }

View File

@ -1,10 +1,10 @@
use cryptarchia_consensus::LeaderConfig; use cryptarchia_consensus::LeaderConfig;
// std // std
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_mix::membership::Node;
use nomos_mix::message_blend::{ use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
}; };
use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node};
use nomos_mix_message::{sphinx::SphinxMessage, MixMessage}; use nomos_mix_message::{sphinx::SphinxMessage, MixMessage};
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
@ -330,6 +330,14 @@ pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<TestMixSettin
listening_address: listening_address.clone(), listening_address: listening_address.clone(),
node_key: ed25519::SecretKey::generate(), node_key: ed25519::SecretKey::generate(),
peering_degree: 1, peering_degree: 1,
max_peering_degree: 1,
conn_maintenance: Some(ConnectionMaintenanceSettings {
time_window: Duration::from_secs(600),
expected_effective_messages: 600.0,
effective_message_tolerance: 0.1,
expected_drop_messages: 0.0,
drop_message_tolerance: 0.0,
}),
}, },
x25519_dalek::StaticSecret::random(), x25519_dalek::StaticSecret::random(),
) )

View File

@ -1,16 +1,15 @@
use std::{io, pin::Pin, time::Duration}; use std::{collections::HashSet, pin::Pin, time::Duration};
use super::MixBackend; use super::MixBackend;
use async_trait::async_trait; use async_trait::async_trait;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use libp2p::{ use libp2p::{
core::transport::ListenerId,
identity::{ed25519, Keypair}, identity::{ed25519, Keypair},
swarm::SwarmEvent, swarm::SwarmEvent,
Multiaddr, Swarm, SwarmBuilder, TransportError, Multiaddr, Swarm, SwarmBuilder,
}; };
use nomos_libp2p::{secret_key_serde, DialError, DialOpts}; use nomos_libp2p::secret_key_serde;
use nomos_mix::membership::Membership; use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Membership};
use nomos_mix_message::sphinx::SphinxMessage; use nomos_mix_message::sphinx::SphinxMessage;
use overwatch_rs::overwatch::handle::OverwatchHandle; use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::Rng; use rand::Rng;
@ -19,7 +18,7 @@ use tokio::{
sync::{broadcast, mpsc}, sync::{broadcast, mpsc},
task::JoinHandle, task::JoinHandle,
}; };
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::{BroadcastStream, IntervalStream};
/// A mix backend that uses the libp2p network stack. /// A mix backend that uses the libp2p network stack.
pub struct Libp2pMixBackend { pub struct Libp2pMixBackend {
@ -36,6 +35,8 @@ pub struct Libp2pMixBackendSettings {
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
pub node_key: ed25519::SecretKey, pub node_key: ed25519::SecretKey,
pub peering_degree: usize, pub peering_degree: usize,
pub max_peering_degree: usize,
pub conn_maintenance: Option<ConnectionMaintenanceSettings>,
} }
const CHANNEL_SIZE: usize = 64; const CHANNEL_SIZE: usize = 64;
@ -44,38 +45,26 @@ const CHANNEL_SIZE: usize = 64;
impl MixBackend for Libp2pMixBackend { impl MixBackend for Libp2pMixBackend {
type Settings = Libp2pMixBackendSettings; type Settings = Libp2pMixBackendSettings;
fn new<R: Rng>( fn new<R>(
config: Self::Settings, config: Self::Settings,
overwatch_handle: OverwatchHandle, overwatch_handle: OverwatchHandle,
membership: Membership<SphinxMessage>, membership: Membership<SphinxMessage>,
mut rng: R, rng: R,
) -> Self { ) -> Self
where
R: Rng + Send + 'static,
{
let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE); let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE);
let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE); let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE);
let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let mut swarm = MixSwarm::new( let mut swarm = MixSwarm::new(
keypair, config,
swarm_message_receiver, swarm_message_receiver,
incoming_message_sender.clone(), incoming_message_sender.clone(),
membership,
rng,
); );
swarm
.listen_on(config.listening_address)
.unwrap_or_else(|e| {
panic!("Failed to listen on Mix network: {e:?}");
});
// Randomly select peering_degree number of peers, and dial to them
membership
.choose_remote_nodes(&mut rng, config.peering_degree)
.iter()
.for_each(|node| {
if let Err(e) = swarm.dial(node.address.clone()) {
tracing::error!("failed to dial to {:?}: {:?}", node.address, e);
}
});
let task = overwatch_handle.runtime().spawn(async move { let task = overwatch_handle.runtime().spawn(async move {
swarm.run().await; swarm.run().await;
}); });
@ -105,10 +94,15 @@ impl MixBackend for Libp2pMixBackend {
} }
} }
struct MixSwarm { struct MixSwarm<R>
swarm: Swarm<nomos_mix_network::Behaviour<SphinxMessage>>, where
R: Send,
{
swarm: Swarm<nomos_mix_network::Behaviour<SphinxMessage, IntervalStream>>,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>, swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>, incoming_message_sender: broadcast::Sender<Vec<u8>>,
membership: Membership<SphinxMessage>,
rng: R,
} }
#[derive(Debug)] #[derive(Debug)]
@ -116,18 +110,30 @@ pub enum MixSwarmMessage {
Publish(Vec<u8>), Publish(Vec<u8>),
} }
impl MixSwarm { impl<R> MixSwarm<R>
where
R: Rng + Send,
{
fn new( fn new(
keypair: Keypair, config: Libp2pMixBackendSettings,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>, swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>, incoming_message_sender: broadcast::Sender<Vec<u8>>,
membership: Membership<SphinxMessage>,
rng: R,
) -> Self { ) -> Self {
let swarm = SwarmBuilder::with_existing_identity(keypair) let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let mut swarm = SwarmBuilder::with_existing_identity(keypair)
.with_tokio() .with_tokio()
.with_quic() .with_quic()
.with_behaviour(|_| { .with_behaviour(|_| {
let conn_maintenance_interval = config.conn_maintenance.map(|settings| {
IntervalStream::new(tokio::time::interval(settings.time_window))
});
nomos_mix_network::Behaviour::new(nomos_mix_network::Config { nomos_mix_network::Behaviour::new(nomos_mix_network::Config {
max_peering_degree: config.max_peering_degree,
duplicate_cache_lifespan: 60, duplicate_cache_lifespan: 60,
conn_maintenance_settings: config.conn_maintenance,
conn_maintenance_interval,
}) })
}) })
.expect("Mix Behaviour should be built") .expect("Mix Behaviour should be built")
@ -136,19 +142,31 @@ impl MixSwarm {
}) })
.build(); .build();
Self { swarm
.listen_on(config.listening_address)
.unwrap_or_else(|e| {
panic!("Failed to listen on Mix network: {e:?}");
});
let mut mix_swarm = Self {
swarm, swarm,
swarm_messages_receiver, swarm_messages_receiver,
incoming_message_sender, incoming_message_sender,
membership,
rng,
};
// Randomly select peering_degree number of peers, and dial to them
let num_dialed = mix_swarm.dial_to_peers(config.peering_degree, None);
if num_dialed < config.peering_degree {
tracing::warn!(
"Failed to dial to enough peers. Expected: {}, Dialed: {}",
config.peering_degree,
num_dialed
);
} }
}
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> { mix_swarm
self.swarm.listen_on(addr)
}
fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> {
self.swarm.dial(DialOpts::from(addr))
} }
async fn run(&mut self) { async fn run(&mut self) {
@ -182,6 +200,22 @@ impl MixSwarm {
tracing::error!("Failed to send incoming message to channel: {e}"); tracing::error!("Failed to send incoming message to channel: {e}");
} }
} }
SwarmEvent::Behaviour(nomos_mix_network::Event::EstabalishNewConnections {
amount,
excludes,
}) => {
tracing::debug!(
"Establishing {amount} new connections excluding peers: {excludes:?}"
);
let successful_dials = self.dial_to_peers(amount, Some(&excludes));
if successful_dials < amount {
tracing::warn!(
"Tried to establish {} new connections, but only {} succeeded",
amount,
successful_dials
);
}
}
SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => { SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => {
tracing::error!("Received error from mix network: {e:?}"); tracing::error!("Received error from mix network: {e:?}");
} }
@ -190,4 +224,29 @@ impl MixSwarm {
} }
} }
} }
fn dial_to_peers(&mut self, amount: usize, excludes: Option<&HashSet<Multiaddr>>) -> usize {
let mut successful_dials = 0;
let nodes = match excludes {
Some(excludes) => {
self.membership
.filter_and_choose_remote_nodes(&mut self.rng, amount, excludes)
}
None => self.membership.choose_remote_nodes(&mut self.rng, amount),
};
nodes
.iter()
.for_each(|node| match self.swarm.dial(node.address.clone()) {
Ok(_) => {
successful_dials += 1;
}
Err(e) => {
tracing::error!("Failed to dial to {:?}: {e}", node.address);
}
});
successful_dials
}
} }

View File

@ -21,7 +21,7 @@ pub trait MixBackend {
rng: R, rng: R,
) -> Self ) -> Self
where where
R: Rng; R: Rng + Send + 'static;
/// Publish a message to the mix network. /// Publish a message to the mix network.
async fn publish(&self, msg: Vec<u8>); async fn publish(&self, msg: Vec<u8>);
/// Listen to messages received from the mix network. /// Listen to messages received from the mix network.

View File

@ -31,6 +31,8 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
.unwrap(), .unwrap(),
node_key, node_key,
peering_degree: 1, peering_degree: 1,
max_peering_degree: 3,
conn_maintenance: None,
}, },
private_key: x25519_dalek::StaticSecret::random(), private_key: x25519_dalek::StaticSecret::random(),
membership: Vec::new(), membership: Vec::new(),