1
0
mirror of synced 2025-01-31 01:46:07 +00:00

Mix: Add connection maintenance (#928)

* Mix: add connection maintenance

* refactor: abstract all maintenance logic into `nomos-mix` crate

* clippy happy;

* use fixed for paramters, instead of f32
This commit is contained in:
Youngjoon Lee 2024-12-09 16:31:40 +09:00 committed by GitHub
parent d67b5c5a09
commit 59726d0cc3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 720 additions and 161 deletions

View File

@ -83,9 +83,6 @@ pub struct MixArgs {
#[clap(long = "mix-node-key", env = "MIX_NODE_KEY")]
mix_node_key: Option<String>,
#[clap(long = "mix-peering-degree", env = "MIX_PEERING_DEGREE")]
mix_peering_degree: Option<usize>,
#[clap(long = "mix-num-mix-layers", env = "MIX_NUM_MIX_LAYERS")]
mix_num_mix_layers: Option<usize>,
}
@ -247,7 +244,6 @@ pub fn update_mix(
let MixArgs {
mix_addr,
mix_node_key,
mix_peering_degree,
mix_num_mix_layers,
} = mix_args;
@ -260,10 +256,6 @@ pub fn update_mix(
mix.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
}
if let Some(peering_degree) = mix_peering_degree {
mix.backend.peering_degree = peering_degree;
}
if let Some(num_mix_layers) = mix_num_mix_layers {
mix.message_blend.cryptographic_processor.num_mix_layers = num_mix_layers;
}

View File

@ -15,6 +15,7 @@ nomos-mix-message = { path = "../message" }
futures = "0.3"
multiaddr = "0.18"
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
fixed = { version = "1", features = ["serde-str"] }
[dev-dependencies]

View File

@ -0,0 +1,427 @@
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
time::Duration,
};
use fixed::types::U57F7;
use multiaddr::Multiaddr;
use nomos_mix_message::MixMessage;
use rand::RngCore;
use serde::{Deserialize, Serialize};
use crate::membership::Membership;
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct ConnectionMaintenanceSettings {
pub peering_degree: usize,
pub max_peering_degree: usize,
/// NOTE: We keep this optional until we gain confidence in parameter values that don't cause false detection.
pub monitor: Option<ConnectionMonitorSettings>,
}
/// Connection maintenance to detect malicious and unhealthy peers
/// based on the number of messages sent by each peer in time windows
pub struct ConnectionMaintenance<M, R>
where
M: MixMessage,
R: RngCore,
{
settings: ConnectionMaintenanceSettings,
membership: Membership<M>,
rng: R,
connected_peers: HashSet<Multiaddr>,
malicious_peers: HashSet<Multiaddr>,
/// Monitors to measure the number of effective and drop messages sent by each peer
/// NOTE: We keep this optional until we gain confidence in parameter values that don't cause false detection.
monitors: Option<HashMap<Multiaddr, ConnectionMonitor>>,
}
impl<M, R> ConnectionMaintenance<M, R>
where
M: MixMessage,
M::PublicKey: PartialEq,
R: RngCore,
{
pub fn new(settings: ConnectionMaintenanceSettings, membership: Membership<M>, rng: R) -> Self {
Self {
settings,
membership,
rng,
connected_peers: HashSet::new(),
malicious_peers: HashSet::new(),
monitors: settings.monitor.as_ref().map(|_| HashMap::new()),
}
}
/// Choose the `peering_degree` number of remote nodes to connect to.
pub fn bootstrap(&mut self) -> Vec<Multiaddr> {
self.membership
.choose_remote_nodes(&mut self.rng, self.settings.peering_degree)
.iter()
.map(|node| node.address.clone())
.collect()
// We don't add the peers to `connected_peers` because the dialings are not started yet.
}
/// Add a peer, which is fully connected, to the list of connected peers.
pub fn add_connected_peer(&mut self, peer: Multiaddr) {
self.connected_peers.insert(peer);
}
/// Remove a peer that has been disconnected.
pub fn remove_connected_peer(&mut self, peer: &Multiaddr) {
self.connected_peers.remove(peer);
}
/// Return the set of connected peers.
pub fn connected_peers(&self) -> &HashSet<Multiaddr> {
&self.connected_peers
}
/// Record a effective message sent by the [`peer`].
/// If the peer was added during the current time window, the peer is not monitored
/// until the next time window, to avoid false detection.
pub fn record_effective_message(&mut self, peer: &Multiaddr) {
if let Some(monitors) = self.monitors.as_mut() {
if let Some(monitor) = monitors.get_mut(peer) {
monitor.effective_messages = monitor
.effective_messages
.checked_add(U57F7::ONE)
.unwrap_or_else(|| {
tracing::warn!(
"Skipping recording an effective message due to overflow: Peer:{:?}",
peer
);
monitor.effective_messages
});
}
}
}
/// Record a drop message sent by the [`peer`].
/// If the peer was added during the current time window, the peer is not monitored
/// until the next time window, to avoid false detection.
pub fn record_drop_message(&mut self, peer: &Multiaddr) {
if let Some(monitors) = self.monitors.as_mut() {
if let Some(monitor) = monitors.get_mut(peer) {
monitor.drop_messages = monitor
.drop_messages
.checked_add(U57F7::ONE)
.unwrap_or_else(|| {
tracing::warn!(
"Skipping recording a drop message due to overflow: Peer:{:?}",
peer
);
monitor.drop_messages
});
}
}
}
/// Analyze connection monitors to identify malicious or unhealthy peers,
/// and return which peers to disconnect from and which new peers to connect with.
/// Additionally, this function resets and prepares the monitors for the next time window.
pub fn reset(&mut self) -> (HashSet<Multiaddr>, HashSet<Multiaddr>) {
let (malicious_peers, unhealthy_peers) = self.analyze_monitors();
// Choose peers to connect with
let peers_to_close = malicious_peers;
let num_to_connect = peers_to_close.len() + unhealthy_peers.len();
let num_to_connect = self.adjust_num_to_connect(num_to_connect, peers_to_close.len());
let peers_to_connect = if num_to_connect > 0 {
// Exclude connected peers and malicious peers from the candidate set
let excludes = self
.connected_peers
.union(&self.malicious_peers)
.cloned()
.collect();
self.membership
.filter_and_choose_remote_nodes(&mut self.rng, num_to_connect, &excludes)
.iter()
.map(|node| node.address.clone())
.collect()
} else {
HashSet::new()
};
self.reset_monitors();
(peers_to_close, peers_to_connect)
}
/// Find malicious peers and unhealthy peers by analyzing connection monitors.
/// The set of malicious peers is disjoint from the set of unhealthy peers.
fn analyze_monitors(&mut self) -> (HashSet<Multiaddr>, HashSet<Multiaddr>) {
let mut malicious_peers = HashSet::new();
let mut unhealthy_peers = HashSet::new();
if let Some(monitors) = self.monitors.as_mut() {
let settings = &self.settings.monitor.unwrap();
monitors.iter().for_each(|(peer, meter)| {
// Skip peers that are disconnected during the current time window
// because we have nothing to do for the connection that doesn't exist.
if self.connected_peers.contains(peer) {
if meter.is_malicious(settings) {
tracing::warn!("Detected a malicious peer: {:?}", peer);
malicious_peers.insert(peer.clone());
} else if meter.is_unhealthy(settings) {
tracing::warn!("Detected an unhealthy peer: {:?}", peer);
unhealthy_peers.insert(peer.clone());
}
}
});
}
assert!(malicious_peers.is_disjoint(&unhealthy_peers));
(malicious_peers, unhealthy_peers)
}
/// Reset monitors for all connected peers to be monitored in the next time window.
/// To avoid false detection, we only monitor peers that were already connected
/// before the time window started.
fn reset_monitors(&mut self) {
if let Some(monitors) = self.monitors.as_mut() {
*monitors = self
.connected_peers
.iter()
.cloned()
.map(|peer| (peer, ConnectionMonitor::new()))
.collect();
}
}
/// Adjust the number of new connections to establish in order to not exceed `max_peering_degree`.
fn adjust_num_to_connect(&self, num_to_connect: usize, num_to_close: usize) -> usize {
let max_peering_degree = self.settings.max_peering_degree;
let num_peers_after_close = self.connected_peers.len() - num_to_close;
if num_peers_after_close + num_to_connect > max_peering_degree {
let new_num_to_connect = max_peering_degree - num_peers_after_close;
tracing::warn!(
"Cannot establish {} new connections due to max_peering_degree:{}. Instead, establishing {} connections",
num_to_connect, max_peering_degree, new_num_to_connect
);
new_num_to_connect
} else {
num_to_connect
}
}
}
/// Meter to count the number of effective and drop messages sent by a peer
#[derive(Debug)]
struct ConnectionMonitor {
effective_messages: U57F7,
drop_messages: U57F7,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct ConnectionMonitorSettings {
/// Time interval to measure/evaluate the number of messages sent by each peer.
pub time_window: Duration,
/// The number of effective (data or cover) messages that a peer is expected to send in a given time window.
/// If the measured count is greater than (expected * (1 + tolerance)), the peer is considered malicious.
/// If the measured count is less than (expected * (1 - tolerance)), the peer is considered unhealthy.
pub expected_effective_messages: U57F7,
pub effective_message_tolerance: U57F7,
/// The number of drop messages that a peer is expected to send in a given time window.
/// If the measured count is greater than (expected * (1 + tolerance)), the peer is considered malicious.
/// If the measured count is less than (expected * (1 - tolerance)), the peer is considered unhealthy.
pub expected_drop_messages: U57F7,
pub drop_message_tolerance: U57F7,
}
impl ConnectionMonitor {
fn new() -> Self {
Self {
effective_messages: U57F7::ZERO,
drop_messages: U57F7::ZERO,
}
}
/// Check if the peer is malicious based on the number of effective and drop messages sent
fn is_malicious(&self, settings: &ConnectionMonitorSettings) -> bool {
let effective_threshold = settings.expected_effective_messages
* (U57F7::ONE + settings.effective_message_tolerance);
let drop_threshold =
settings.expected_drop_messages * (U57F7::ONE + settings.drop_message_tolerance);
self.effective_messages > effective_threshold || self.drop_messages > drop_threshold
}
/// Check if the peer is unhealthy based on the number of effective and drop messages sent
fn is_unhealthy(&self, settings: &ConnectionMonitorSettings) -> bool {
let effective_threshold = settings.expected_effective_messages
* (U57F7::ONE - settings.effective_message_tolerance);
let drop_threshold =
settings.expected_drop_messages * (U57F7::ONE - settings.drop_message_tolerance);
effective_threshold > self.effective_messages || drop_threshold > self.drop_messages
}
}
#[cfg(test)]
mod tests {
use nomos_mix_message::mock::MockMixMessage;
use rand::{rngs::ThreadRng, thread_rng};
use crate::membership::Node;
use super::*;
#[test]
fn meter() {
let settings = ConnectionMonitorSettings {
time_window: Duration::from_secs(1),
expected_effective_messages: U57F7::from_num(2.0),
effective_message_tolerance: U57F7::from_num(0.1),
expected_drop_messages: U57F7::from_num(1.0),
drop_message_tolerance: U57F7::from_num(0.0),
};
let monitor = ConnectionMonitor {
effective_messages: U57F7::from_num(2),
drop_messages: U57F7::from_num(1),
};
assert!(!monitor.is_malicious(&settings));
assert!(!monitor.is_unhealthy(&settings));
let monitor = ConnectionMonitor {
effective_messages: U57F7::from_num(3),
drop_messages: U57F7::from_num(1),
};
assert!(monitor.is_malicious(&settings));
assert!(!monitor.is_unhealthy(&settings));
let monitor = ConnectionMonitor {
effective_messages: U57F7::from_num(1),
drop_messages: U57F7::from_num(1),
};
assert!(!monitor.is_malicious(&settings));
assert!(monitor.is_unhealthy(&settings));
let monitor = ConnectionMonitor {
effective_messages: U57F7::from_num(2),
drop_messages: U57F7::from_num(2),
};
assert!(monitor.is_malicious(&settings));
assert!(!monitor.is_unhealthy(&settings));
let monitor = ConnectionMonitor {
effective_messages: U57F7::from_num(2),
drop_messages: U57F7::from_num(0),
};
assert!(!monitor.is_malicious(&settings));
assert!(monitor.is_unhealthy(&settings));
}
#[test]
fn malicious_and_unhealthy() {
let mut maintenance = init_maintenance(
ConnectionMaintenanceSettings {
peering_degree: 3,
max_peering_degree: 5,
monitor: Some(ConnectionMonitorSettings {
time_window: Duration::from_secs(1),
expected_effective_messages: U57F7::from_num(2.0),
effective_message_tolerance: U57F7::from_num(0.1),
expected_drop_messages: U57F7::from_num(0.0),
drop_message_tolerance: U57F7::from_num(0.0),
}),
},
10,
);
let peers = maintenance
.connected_peers
.iter()
.cloned()
.collect::<Vec<_>>();
assert_eq!(peers.len(), 3);
// Peer 0 sends 3 effective messages, more than expected
maintenance.record_effective_message(&peers[0]);
maintenance.record_effective_message(&peers[0]);
maintenance.record_effective_message(&peers[0]);
// Peer 1 sends 2 effective messages, as expected
maintenance.record_effective_message(&peers[1]);
maintenance.record_effective_message(&peers[1]);
// Peer 2 sends 1 effective messages, less than expected
maintenance.record_effective_message(&peers[2]);
let (peers_to_close, peers_to_connect) = maintenance.reset();
// Peer 0 is malicious
assert_eq!(peers_to_close, HashSet::from_iter(vec![peers[0].clone()]));
// Because Peer 1 is malicious and Peer 2 is unhealthy, 2 new connections should be established
assert_eq!(peers_to_connect.len(), 2);
assert!(peers_to_connect.is_disjoint(&maintenance.connected_peers));
}
#[test]
fn exceed_max_peering_degree() {
let mut maintenance = init_maintenance(
ConnectionMaintenanceSettings {
peering_degree: 3,
max_peering_degree: 4,
monitor: Some(ConnectionMonitorSettings {
time_window: Duration::from_secs(1),
expected_effective_messages: U57F7::from_num(2.0),
effective_message_tolerance: U57F7::from_num(0.1),
expected_drop_messages: U57F7::from_num(0.0),
drop_message_tolerance: U57F7::from_num(0.0),
}),
},
10,
);
let peers = maintenance
.connected_peers
.iter()
.cloned()
.collect::<Vec<_>>();
assert_eq!(peers.len(), 3);
// Peer 0 sends 3 effective messages, more than expected
maintenance.record_effective_message(&peers[0]);
maintenance.record_effective_message(&peers[0]);
maintenance.record_effective_message(&peers[0]);
// Peer 1 and 2 send 1 effective messages, less than expected
maintenance.record_effective_message(&peers[1]);
maintenance.record_effective_message(&peers[2]);
let (peers_to_close, peers_to_connect) = maintenance.reset();
// Peer 0 is malicious
assert_eq!(peers_to_close, HashSet::from_iter(vec![peers[0].clone()]));
// Even though we detected 1 malicious and 2 unhealthy peers,
// we cannot establish 3 new connections due to max_peering_degree.
// Instead, 2 new connections should be established.
assert_eq!(peers_to_connect.len(), 2);
assert!(peers_to_connect.is_disjoint(&maintenance.connected_peers));
}
fn init_maintenance(
settings: ConnectionMaintenanceSettings,
node_count: usize,
) -> ConnectionMaintenance<MockMixMessage, ThreadRng> {
let nodes = nodes(node_count);
let mut maintenance = ConnectionMaintenance::<MockMixMessage, ThreadRng>::new(
settings,
Membership::new(nodes.clone(), nodes[0].public_key),
thread_rng(),
);
(1..=settings.peering_degree).for_each(|i| {
maintenance.add_connected_peer(nodes[i].address.clone());
});
let _ = maintenance.reset();
maintenance
}
fn nodes(count: usize) -> Vec<Node<<MockMixMessage as MixMessage>::PublicKey>> {
(0..count)
.map(|i| Node {
address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", 1000 + i)
.parse()
.unwrap(),
public_key: [i as u8; 32],
})
.collect()
}
}

View File

@ -1,3 +1,4 @@
pub mod conn_maintenance;
pub mod cover_traffic;
pub mod membership;
pub mod message_blend;

View File

@ -1,6 +1,11 @@
use std::collections::HashSet;
use multiaddr::Multiaddr;
use nomos_mix_message::MixMessage;
use rand::{seq::SliceRandom, Rng};
use rand::{
seq::{IteratorRandom, SliceRandom},
Rng,
};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug)]
@ -48,6 +53,18 @@ where
self.remote_nodes.choose_multiple(rng, amount).collect()
}
pub fn filter_and_choose_remote_nodes<R: Rng>(
&self,
rng: &mut R,
amount: usize,
exclude_addrs: &HashSet<Multiaddr>,
) -> Vec<&Node<M::PublicKey>> {
self.remote_nodes
.iter()
.filter(|node| !exclude_addrs.contains(&node.address))
.choose_multiple(rng, amount)
}
pub fn local_node(&self) -> &Node<M::PublicKey> {
&self.local_node
}

View File

@ -12,8 +12,11 @@ tracing = "0.1"
nomos-mix = { path = "../core" }
nomos-mix-message = { path = "../message" }
sha2 = "0.10"
rand = "0.8"
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1"
libp2p = { version = "0.53", features = ["ed25519", "tokio", "quic"] }
tracing-subscriber = "0.3.18"
fixed = "1"

View File

@ -3,29 +3,42 @@ use crate::{
handler::{FromBehaviour, MixConnectionHandler, ToBehaviour},
};
use cached::{Cached, TimedCache};
use futures::Stream;
use libp2p::{
core::Endpoint,
swarm::{
ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour,
NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
dial_opts::DialOpts, CloseConnection, ConnectionClosed, ConnectionDenied, ConnectionId,
FromSwarm, NetworkBehaviour, NotifyHandler, THandler, THandlerInEvent, THandlerOutEvent,
ToSwarm,
},
Multiaddr, PeerId,
};
use nomos_mix::{
conn_maintenance::{ConnectionMaintenance, ConnectionMaintenanceSettings},
membership::Membership,
};
use nomos_mix_message::MixMessage;
use rand::RngCore;
use sha2::{Digest, Sha256};
use std::marker::PhantomData;
use std::{
collections::{HashMap, HashSet, VecDeque},
pin::pin,
task::{Context, Poll, Waker},
};
/// A [`NetworkBehaviour`]:
/// - forwards messages to all connected peers with deduplication.
/// - receives messages from all connected peers.
pub struct Behaviour<M> {
config: Config,
/// Peers that support the mix protocol, and their connection IDs
negotiated_peers: HashMap<PeerId, HashSet<ConnectionId>>,
pub struct Behaviour<M, R, Interval>
where
M: MixMessage,
R: RngCore,
{
config: Config<Interval>,
/// Connection maintenance
conn_maintenance: ConnectionMaintenance<M, R>,
peer_address_map: PeerAddressMap,
/// Queue of events to yield to the swarm.
events: VecDeque<ToSwarm<Event, FromBehaviour>>,
/// Waker that handles polling
@ -37,8 +50,10 @@ pub struct Behaviour<M> {
}
#[derive(Debug)]
pub struct Config {
pub struct Config<Interval> {
pub duplicate_cache_lifespan: u64,
pub conn_maintenance_settings: ConnectionMaintenanceSettings,
pub conn_maintenance_interval: Option<Interval>,
}
#[derive(Debug)]
@ -48,16 +63,31 @@ pub enum Event {
Error(Error),
}
impl<M> Behaviour<M>
impl<M, R, Interval> Behaviour<M, R, Interval>
where
M: MixMessage,
M::PublicKey: PartialEq,
R: RngCore,
{
pub fn new(config: Config) -> Self {
pub fn new(config: Config<Interval>, membership: Membership<M>, rng: R) -> Self {
let mut conn_maintenance =
ConnectionMaintenance::<M, R>::new(config.conn_maintenance_settings, membership, rng);
// Bootstrap connections with initial peers randomly chosen.
let peer_addrs: Vec<Multiaddr> = conn_maintenance.bootstrap();
let events = peer_addrs
.into_iter()
.map(|peer_addr| ToSwarm::Dial {
opts: DialOpts::from(peer_addr),
})
.collect::<VecDeque<_>>();
let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan);
Self {
config,
negotiated_peers: HashMap::new(),
events: VecDeque::new(),
conn_maintenance,
peer_address_map: PeerAddressMap::new(),
events,
waker: None,
duplicate_cache,
_mix_message: Default::default(),
@ -93,7 +123,12 @@ where
message: Vec<u8>,
excluded_peer: Option<PeerId>,
) -> Result<(), Error> {
let mut peer_ids: HashSet<_> = self.negotiated_peers.keys().collect();
let mut peer_ids = self
.conn_maintenance
.connected_peers()
.iter()
.filter_map(|addr| self.peer_address_map.peer_id(addr))
.collect::<HashSet<_>>();
if let Some(peer) = &excluded_peer {
peer_ids.remove(peer);
}
@ -105,7 +140,7 @@ where
for peer_id in peer_ids.into_iter() {
tracing::debug!("Registering event for peer {:?} to send msg", peer_id);
self.events.push_back(ToSwarm::NotifyHandler {
peer_id: *peer_id,
peer_id,
handler: NotifyHandler::Any,
event: FromBehaviour::Message(message.clone()),
});
@ -115,39 +150,32 @@ where
Ok(())
}
fn add_negotiated_peer(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool {
tracing::debug!(
"Adding to connected_peers: peer_id:{:?}, connection_id:{:?}",
peer_id,
connection_id
);
self.negotiated_peers
.entry(peer_id)
.or_default()
.insert(connection_id)
}
fn remove_negotiated_peer(&mut self, peer_id: &PeerId, connection_id: &ConnectionId) {
if let Some(connections) = self.negotiated_peers.get_mut(peer_id) {
tracing::debug!(
"Removing from connected_peers: peer:{:?}, connection_id:{:?}",
peer_id,
connection_id
);
connections.remove(connection_id);
if connections.is_empty() {
self.negotiated_peers.remove(peer_id);
}
}
}
/// SHA-256 hash of the message
fn message_id(message: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::new();
hasher.update(message);
hasher.finalize().to_vec()
}
fn run_conn_maintenance(&mut self) {
let (peers_to_close, peers_to_connect) = self.conn_maintenance.reset();
// Schedule events to close connections
peers_to_close.into_iter().for_each(|addr| {
if let Some(peer_id) = self.peer_address_map.peer_id(&addr) {
self.events.push_back(ToSwarm::CloseConnection {
peer_id,
connection: CloseConnection::All,
});
}
});
// Schedule events to connect to peers
peers_to_connect.into_iter().for_each(|addr| {
self.events.push_back(ToSwarm::Dial {
opts: DialOpts::from(addr),
});
});
}
fn try_wake(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
@ -155,9 +183,12 @@ where
}
}
impl<M> NetworkBehaviour for Behaviour<M>
impl<M, R, Interval> NetworkBehaviour for Behaviour<M, R, Interval>
where
M: MixMessage + 'static,
M::PublicKey: PartialEq + 'static,
R: RngCore + 'static,
Interval: Stream + Unpin + 'static,
{
type ConnectionHandler = MixConnectionHandler;
type ToSwarm = Event;
@ -165,32 +196,40 @@ where
fn handle_established_inbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
peer_id: PeerId,
_: &Multiaddr,
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(MixConnectionHandler::new(&self.config))
// Keep PeerId <> Multiaddr mapping
self.peer_address_map.add(peer_id, remote_addr.clone());
Ok(MixConnectionHandler::new())
}
fn handle_established_outbound_connection(
&mut self,
_: ConnectionId,
_: PeerId,
_: &Multiaddr,
peer_id: PeerId,
addr: &Multiaddr,
_: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(MixConnectionHandler::new(&self.config))
// Keep PeerId <> Multiaddr mapping
self.peer_address_map.add(peer_id, addr.clone());
Ok(MixConnectionHandler::new())
}
/// Informs the behaviour about an event from the [`Swarm`].
fn on_swarm_event(&mut self, event: FromSwarm) {
if let FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
remaining_established,
..
}) = event
{
self.remove_negotiated_peer(&peer_id, &connection_id);
if remaining_established == 0 {
if let Some(addr) = self.peer_address_map.address(&peer_id) {
self.conn_maintenance.remove_connected_peer(addr);
}
}
}
}
@ -207,9 +246,16 @@ where
ToBehaviour::Message(message) => {
// Ignore drop message
if M::is_drop_message(&message) {
if let Some(addr) = self.peer_address_map.address(&peer_id) {
self.conn_maintenance.record_drop_message(addr);
}
return;
}
if let Some(addr) = self.peer_address_map.address(&peer_id) {
self.conn_maintenance.record_effective_message(addr);
}
// Add the message to the cache. If it was already seen, ignore it.
if self
.duplicate_cache
@ -233,10 +279,14 @@ where
// The connection was fully negotiated by the peer,
// which means that the peer supports the mix protocol.
ToBehaviour::FullyNegotiatedOutbound => {
self.add_negotiated_peer(peer_id, connection_id);
if let Some(addr) = self.peer_address_map.address(&peer_id) {
self.conn_maintenance.add_connected_peer(addr.clone());
}
}
ToBehaviour::NegotiationFailed => {
self.remove_negotiated_peer(&peer_id, &connection_id);
if let Some(addr) = self.peer_address_map.address(&peer_id) {
self.conn_maintenance.remove_connected_peer(addr);
}
}
ToBehaviour::IOError(error) => {
// TODO: Consider removing the peer from the connected_peers and closing the connection
@ -257,6 +307,13 @@ where
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Run connection maintenance if the interval is reached.
if let Some(interval) = &mut self.config.conn_maintenance_interval {
if pin!(interval).poll_next(cx).is_ready() {
self.run_conn_maintenance();
}
}
if let Some(event) = self.events.pop_front() {
Poll::Ready(event)
} else {
@ -265,3 +322,30 @@ where
}
}
}
struct PeerAddressMap {
peer_to_addr: HashMap<PeerId, Multiaddr>,
addr_to_peer: HashMap<Multiaddr, PeerId>,
}
impl PeerAddressMap {
fn new() -> Self {
Self {
peer_to_addr: HashMap::new(),
addr_to_peer: HashMap::new(),
}
}
fn add(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.peer_to_addr.insert(peer_id, addr.clone());
self.addr_to_peer.insert(addr, peer_id);
}
fn peer_id(&self, addr: &Multiaddr) -> Option<PeerId> {
self.addr_to_peer.get(addr).copied()
}
fn address(&self, peer_id: &PeerId) -> Option<&Multiaddr> {
self.peer_to_addr.get(peer_id)
}
}

View File

@ -14,8 +14,6 @@ use libp2p::{
Stream, StreamProtocol,
};
use crate::behaviour::Config;
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0");
// TODO: Consider replacing this struct with libp2p_stream ConnectionHandler
@ -42,7 +40,7 @@ enum OutboundSubstreamState {
}
impl MixConnectionHandler {
pub fn new(_config: &Config) -> Self {
pub fn new() -> Self {
Self {
inbound_substream: None,
outbound_substream: None,
@ -59,6 +57,12 @@ impl MixConnectionHandler {
}
}
impl Default for MixConnectionHandler {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub enum FromBehaviour {
/// A message to be sent to the connection.

View File

@ -8,38 +8,30 @@ pub use behaviour::{Behaviour, Config, Event};
mod test {
use std::time::Duration;
use fixed::types::U57F7;
use libp2p::{
futures::StreamExt,
identity::Keypair,
swarm::{dummy, NetworkBehaviour, SwarmEvent},
Multiaddr, PeerId, Swarm, SwarmBuilder,
Multiaddr, Swarm, SwarmBuilder,
};
use nomos_mix_message::mock::MockMixMessage;
use nomos_mix::{
conn_maintenance::{ConnectionMaintenanceSettings, ConnectionMonitorSettings},
membership::{Membership, Node},
};
use nomos_mix_message::{mock::MockMixMessage, MixMessage};
use rand::{rngs::ThreadRng, thread_rng};
use tokio::select;
use tokio_stream::wrappers::IntervalStream;
use crate::{behaviour::Config, error::Error, Behaviour, Event};
/// Check that a published messsage arrives in the peers successfully.
#[tokio::test]
async fn behaviour() {
let k1 = libp2p::identity::Keypair::generate_ed25519();
let peer_id1 = PeerId::from_public_key(&k1.public());
let k2 = libp2p::identity::Keypair::generate_ed25519();
// Initialize two swarms that support the mix protocol.
let mut swarm1 = new_swarm(k1);
let mut swarm2 = new_swarm(k2);
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5073/quic-v1".parse().unwrap();
let addr_with_peer_id = addr.clone().with_p2p(peer_id1).unwrap();
// Swarm1 listens on the address.
swarm1.listen_on(addr).unwrap();
// Dial to swarm1 from swarm2
tokio::time::sleep(Duration::from_secs(1)).await;
swarm2.dial(addr_with_peer_id).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let nodes = nodes(2, 8090);
let mut swarm1 = new_swarm(Membership::new(nodes.clone(), nodes[0].public_key));
let mut swarm2 = new_swarm(Membership::new(nodes.clone(), nodes[1].public_key));
// Swamr2 publishes a message.
let task = async {
@ -77,24 +69,10 @@ mod test {
/// If the peer doesn't support the mix protocol, the message should not be forwarded to the peer.
#[tokio::test]
async fn peer_not_support_mix_protocol() {
let k1 = libp2p::identity::Keypair::generate_ed25519();
let peer_id1 = PeerId::from_public_key(&k1.public());
let k2 = libp2p::identity::Keypair::generate_ed25519();
// Only swarm2 supports the mix protocol.
let mut swarm1 = new_swarm_without_mix(k1);
let mut swarm2 = new_swarm(k2);
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5074/quic-v1".parse().unwrap();
let addr_with_peer_id = addr.clone().with_p2p(peer_id1).unwrap();
// Swarm1 listens on the address.
swarm1.listen_on(addr).unwrap();
// Dial to swarm1 from swarm2
tokio::time::sleep(Duration::from_secs(1)).await;
swarm2.dial(addr_with_peer_id).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let nodes = nodes(2, 8190);
let mut swarm1 = new_swarm_without_mix(nodes[0].address.clone());
let mut swarm2 = new_swarm(Membership::new(nodes.clone(), nodes[1].public_key));
// Expect all publish attempts to fail with [`Error::NoPeers`]
// because swarm2 doesn't have any peers that support the mix protocol.
@ -116,21 +94,47 @@ mod test {
}
}
fn new_swarm(key: Keypair) -> Swarm<Behaviour<MockMixMessage>> {
new_swarm_with_behaviour(
key,
Behaviour::new(Config {
duplicate_cache_lifespan: 60,
fn new_swarm(
membership: Membership<MockMixMessage>,
) -> Swarm<Behaviour<MockMixMessage, ThreadRng, IntervalStream>> {
let conn_maintenance_settings = ConnectionMaintenanceSettings {
peering_degree: membership.size() - 1, // excluding the local node
max_peering_degree: membership.size() * 2,
monitor: Some(ConnectionMonitorSettings {
time_window: Duration::from_secs(60),
expected_effective_messages: U57F7::from_num(1.0),
effective_message_tolerance: U57F7::from_num(0.1),
expected_drop_messages: U57F7::from_num(1.0),
drop_message_tolerance: U57F7::from_num(0.1),
}),
)
};
let conn_maintenance_interval = conn_maintenance_settings
.monitor
.as_ref()
.map(|monitor| IntervalStream::new(tokio::time::interval(monitor.time_window)));
let mut swarm = new_swarm_with_behaviour(Behaviour::new(
Config {
duplicate_cache_lifespan: 60,
conn_maintenance_settings,
conn_maintenance_interval,
},
membership.clone(),
thread_rng(),
));
swarm
.listen_on(membership.local_node().address.clone())
.unwrap();
swarm
}
fn new_swarm_without_mix(key: Keypair) -> Swarm<dummy::Behaviour> {
new_swarm_with_behaviour(key, dummy::Behaviour)
fn new_swarm_without_mix(addr: Multiaddr) -> Swarm<dummy::Behaviour> {
let mut swarm = new_swarm_with_behaviour(dummy::Behaviour);
swarm.listen_on(addr).unwrap();
swarm
}
fn new_swarm_with_behaviour<B: NetworkBehaviour>(key: Keypair, behaviour: B) -> Swarm<B> {
SwarmBuilder::with_existing_identity(key)
fn new_swarm_with_behaviour<B: NetworkBehaviour>(behaviour: B) -> Swarm<B> {
SwarmBuilder::with_new_identity()
.with_tokio()
.with_other_transport(|keypair| {
libp2p::quic::tokio::Transport::new(libp2p::quic::Config::new(keypair))
@ -143,4 +147,18 @@ mod test {
})
.build()
}
fn nodes(
count: usize,
base_port: usize,
) -> Vec<Node<<MockMixMessage as MixMessage>::PublicKey>> {
(0..count)
.map(|i| Node {
address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", base_port + i)
.parse()
.unwrap(),
public_key: [i as u8; 32],
})
.collect()
}
}

View File

@ -1,10 +1,10 @@
use cryptarchia_consensus::LeaderConfig;
// std
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_mix::membership::Node;
use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node};
use nomos_mix_message::{sphinx::SphinxMessage, MixMessage};
use std::path::PathBuf;
use std::time::Duration;
@ -329,7 +329,11 @@ pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<TestMixSettin
Libp2pMixBackendSettings {
listening_address: listening_address.clone(),
node_key: ed25519::SecretKey::generate(),
conn_maintenance: ConnectionMaintenanceSettings {
peering_degree: 1,
max_peering_degree: 1,
monitor: None,
},
},
x25519_dalek::StaticSecret::random(),
)

View File

@ -1,25 +1,24 @@
use std::{io, pin::Pin, time::Duration};
use std::{pin::Pin, time::Duration};
use super::MixBackend;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use libp2p::{
core::transport::ListenerId,
identity::{ed25519, Keypair},
swarm::SwarmEvent,
Multiaddr, Swarm, SwarmBuilder, TransportError,
Multiaddr, Swarm, SwarmBuilder,
};
use nomos_libp2p::{secret_key_serde, DialError, DialOpts};
use nomos_mix::membership::Membership;
use nomos_libp2p::secret_key_serde;
use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Membership};
use nomos_mix_message::sphinx::SphinxMessage;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::Rng;
use rand::RngCore;
use serde::{Deserialize, Serialize};
use tokio::{
sync::{broadcast, mpsc},
task::JoinHandle,
};
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::{BroadcastStream, IntervalStream};
/// A mix backend that uses the libp2p network stack.
pub struct Libp2pMixBackend {
@ -35,7 +34,7 @@ pub struct Libp2pMixBackendSettings {
// A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC)
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
pub node_key: ed25519::SecretKey,
pub peering_degree: usize,
pub conn_maintenance: ConnectionMaintenanceSettings,
}
const CHANNEL_SIZE: usize = 64;
@ -44,38 +43,26 @@ const CHANNEL_SIZE: usize = 64;
impl MixBackend for Libp2pMixBackend {
type Settings = Libp2pMixBackendSettings;
fn new<R: Rng>(
fn new<R>(
config: Self::Settings,
overwatch_handle: OverwatchHandle,
membership: Membership<SphinxMessage>,
mut rng: R,
) -> Self {
rng: R,
) -> Self
where
R: RngCore + Send + 'static,
{
let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE);
let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE);
let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let mut swarm = MixSwarm::new(
keypair,
config,
membership,
rng,
swarm_message_receiver,
incoming_message_sender.clone(),
);
swarm
.listen_on(config.listening_address)
.unwrap_or_else(|e| {
panic!("Failed to listen on Mix network: {e:?}");
});
// Randomly select peering_degree number of peers, and dial to them
membership
.choose_remote_nodes(&mut rng, config.peering_degree)
.iter()
.for_each(|node| {
if let Err(e) = swarm.dial(node.address.clone()) {
tracing::error!("failed to dial to {:?}: {:?}", node.address, e);
}
});
let task = overwatch_handle.runtime().spawn(async move {
swarm.run().await;
});
@ -105,8 +92,11 @@ impl MixBackend for Libp2pMixBackend {
}
}
struct MixSwarm {
swarm: Swarm<nomos_mix_network::Behaviour<SphinxMessage>>,
struct MixSwarm<R>
where
R: RngCore + 'static,
{
swarm: Swarm<nomos_mix_network::Behaviour<SphinxMessage, R, IntervalStream>>,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>,
}
@ -116,19 +106,35 @@ pub enum MixSwarmMessage {
Publish(Vec<u8>),
}
impl MixSwarm {
impl<R> MixSwarm<R>
where
R: RngCore + 'static,
{
fn new(
keypair: Keypair,
config: Libp2pMixBackendSettings,
membership: Membership<SphinxMessage>,
rng: R,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>,
) -> Self {
let swarm = SwarmBuilder::with_existing_identity(keypair)
let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let mut swarm = SwarmBuilder::with_existing_identity(keypair)
.with_tokio()
.with_quic()
.with_behaviour(|_| {
nomos_mix_network::Behaviour::new(nomos_mix_network::Config {
let conn_maintenance_interval =
config.conn_maintenance.monitor.as_ref().map(|monitor| {
IntervalStream::new(tokio::time::interval(monitor.time_window))
});
nomos_mix_network::Behaviour::new(
nomos_mix_network::Config {
duplicate_cache_lifespan: 60,
})
conn_maintenance_settings: config.conn_maintenance,
conn_maintenance_interval,
},
membership,
rng,
)
})
.expect("Mix Behaviour should be built")
.with_swarm_config(|cfg| {
@ -136,6 +142,12 @@ impl MixSwarm {
})
.build();
swarm
.listen_on(config.listening_address)
.unwrap_or_else(|e| {
panic!("Failed to listen on Mix network: {e:?}");
});
Self {
swarm,
swarm_messages_receiver,
@ -143,14 +155,6 @@ impl MixSwarm {
}
}
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
self.swarm.listen_on(addr)
}
fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> {
self.swarm.dial(DialOpts::from(addr))
}
async fn run(&mut self) {
loop {
tokio::select! {

View File

@ -7,7 +7,7 @@ use futures::Stream;
use nomos_mix::membership::Membership;
use nomos_mix_message::sphinx::SphinxMessage;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::Rng;
use rand::RngCore;
/// A trait for mix backends that send messages to the mix network.
#[async_trait::async_trait]
@ -21,7 +21,7 @@ pub trait MixBackend {
rng: R,
) -> Self
where
R: Rng;
R: RngCore + Send + 'static;
/// Publish a message to the mix network.
async fn publish(&self, msg: Vec<u8>);
/// Listen to messages received from the mix network.

View File

@ -1,7 +1,7 @@
use std::str::FromStr;
use nomos_libp2p::{ed25519, Multiaddr};
use nomos_mix::membership::Node;
use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node};
use nomos_mix_message::{sphinx::SphinxMessage, MixMessage};
use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings;
@ -30,7 +30,11 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
))
.unwrap(),
node_key,
conn_maintenance: ConnectionMaintenanceSettings {
peering_degree: 1,
max_peering_degree: 3,
monitor: None,
},
},
private_key: x25519_dalek::StaticSecret::random(),
membership: Vec::new(),