Mix: Make Address generic for Membership

This commit is contained in:
Youngjoon Lee 2024-12-06 01:56:11 +09:00
parent a58498cab8
commit d385e9f855
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
13 changed files with 106 additions and 77 deletions

View File

@ -13,7 +13,6 @@ rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
nomos-mix-message = { path = "../message" }
futures = "0.3"
multiaddr = "0.18"
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
fixed = { version = "1", features = ["serde-str"] }

View File

@ -1,11 +1,11 @@
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
hash::Hash,
time::Duration,
};
use fixed::types::U57F7;
use multiaddr::Multiaddr;
use nomos_mix_message::MixMessage;
use rand::RngCore;
use serde::{Deserialize, Serialize};
@ -22,28 +22,33 @@ pub struct ConnectionMaintenanceSettings {
/// Connection maintenance to detect malicious and unhealthy peers
/// based on the number of messages sent by each peer in time windows
pub struct ConnectionMaintenance<M, R>
pub struct ConnectionMaintenance<Address, M, R>
where
M: MixMessage,
R: RngCore,
{
settings: ConnectionMaintenanceSettings,
membership: Membership<M>,
membership: Membership<Address, M>,
rng: R,
connected_peers: HashSet<Multiaddr>,
malicious_peers: HashSet<Multiaddr>,
connected_peers: HashSet<Address>,
malicious_peers: HashSet<Address>,
/// Monitors to measure the number of effective and drop messages sent by each peer
/// NOTE: We keep this optional until we gain confidence in parameter values that don't cause false detection.
monitors: Option<HashMap<Multiaddr, ConnectionMonitor>>,
monitors: Option<HashMap<Address, ConnectionMonitor>>,
}
impl<M, R> ConnectionMaintenance<M, R>
impl<Address, M, R> ConnectionMaintenance<Address, M, R>
where
Address: Eq + Hash + Clone + Debug,
M: MixMessage,
M::PublicKey: PartialEq,
R: RngCore,
{
pub fn new(settings: ConnectionMaintenanceSettings, membership: Membership<M>, rng: R) -> Self {
pub fn new(
settings: ConnectionMaintenanceSettings,
membership: Membership<Address, M>,
rng: R,
) -> Self {
Self {
settings,
membership,
@ -55,7 +60,7 @@ where
}
/// Choose the `peering_degree` number of remote nodes to connect to.
pub fn bootstrap(&mut self) -> Vec<Multiaddr> {
pub fn bootstrap(&mut self) -> Vec<Address> {
self.membership
.choose_remote_nodes(&mut self.rng, self.settings.peering_degree)
.iter()
@ -65,24 +70,24 @@ where
}
/// Add a peer, which is fully connected, to the list of connected peers.
pub fn add_connected_peer(&mut self, peer: Multiaddr) {
pub fn add_connected_peer(&mut self, peer: Address) {
self.connected_peers.insert(peer);
}
/// Remove a peer that has been disconnected.
pub fn remove_connected_peer(&mut self, peer: &Multiaddr) {
pub fn remove_connected_peer(&mut self, peer: &Address) {
self.connected_peers.remove(peer);
}
/// Return the set of connected peers.
pub fn connected_peers(&self) -> &HashSet<Multiaddr> {
pub fn connected_peers(&self) -> &HashSet<Address> {
&self.connected_peers
}
/// Record a effective message sent by the [`peer`].
/// If the peer was added during the current time window, the peer is not monitored
/// until the next time window, to avoid false detection.
pub fn record_effective_message(&mut self, peer: &Multiaddr) {
pub fn record_effective_message(&mut self, peer: &Address) {
if let Some(monitors) = self.monitors.as_mut() {
if let Some(monitor) = monitors.get_mut(peer) {
monitor.effective_messages = monitor
@ -102,7 +107,7 @@ where
/// Record a drop message sent by the [`peer`].
/// If the peer was added during the current time window, the peer is not monitored
/// until the next time window, to avoid false detection.
pub fn record_drop_message(&mut self, peer: &Multiaddr) {
pub fn record_drop_message(&mut self, peer: &Address) {
if let Some(monitors) = self.monitors.as_mut() {
if let Some(monitor) = monitors.get_mut(peer) {
monitor.drop_messages = monitor
@ -125,9 +130,9 @@ where
pub fn reset(
&mut self,
) -> Option<(
HashMap<Multiaddr, ConnectionMonitor>,
HashSet<Multiaddr>,
HashSet<Multiaddr>,
HashMap<Address, ConnectionMonitor>,
HashSet<Address>,
HashSet<Address>,
)> {
let (malicious_peers, unhealthy_peers) = self.analyze_monitors();
@ -157,7 +162,7 @@ where
/// Find malicious peers and unhealthy peers by analyzing connection monitors.
/// The set of malicious peers is disjoint from the set of unhealthy peers.
fn analyze_monitors(&mut self) -> (HashSet<Multiaddr>, HashSet<Multiaddr>) {
fn analyze_monitors(&mut self) -> (HashSet<Address>, HashSet<Address>) {
let mut malicious_peers = HashSet::new();
let mut unhealthy_peers = HashSet::new();
@ -185,7 +190,7 @@ where
/// Reset monitors for all connected peers to be monitored in the next time window.
/// To avoid false detection, we only monitor peers that were already connected
/// before the time window started.
fn reset_monitors(&mut self) -> Option<HashMap<Multiaddr, ConnectionMonitor>> {
fn reset_monitors(&mut self) -> Option<HashMap<Address, ConnectionMonitor>> {
match self.monitors.take() {
Some(old_monitors) => {
self.monitors = Some(
@ -410,9 +415,9 @@ mod tests {
fn init_maintenance(
settings: ConnectionMaintenanceSettings,
node_count: usize,
) -> ConnectionMaintenance<MockMixMessage, ThreadRng> {
) -> ConnectionMaintenance<usize, MockMixMessage, ThreadRng> {
let nodes = nodes(node_count);
let mut maintenance = ConnectionMaintenance::<MockMixMessage, ThreadRng>::new(
let mut maintenance = ConnectionMaintenance::<usize, MockMixMessage, ThreadRng>::new(
settings,
Membership::new(nodes.clone(), nodes[0].public_key),
thread_rng(),
@ -424,12 +429,10 @@ mod tests {
maintenance
}
fn nodes(count: usize) -> Vec<Node<<MockMixMessage as MixMessage>::PublicKey>> {
fn nodes(count: usize) -> Vec<Node<usize, <MockMixMessage as MixMessage>::PublicKey>> {
(0..count)
.map(|i| Node {
address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", 1000 + i)
.parse()
.unwrap(),
address: i,
public_key: [i as u8; 32],
})
.collect()

View File

@ -1,6 +1,5 @@
use std::collections::HashSet;
use std::{collections::HashSet, hash::Hash};
use multiaddr::Multiaddr;
use nomos_mix_message::MixMessage;
use rand::{
seq::{IteratorRandom, SliceRandom},
@ -9,26 +8,27 @@ use rand::{
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug)]
pub struct Membership<M>
pub struct Membership<Address, M>
where
M: MixMessage,
{
remote_nodes: Vec<Node<M::PublicKey>>,
local_node: Node<M::PublicKey>,
remote_nodes: Vec<Node<Address, M::PublicKey>>,
local_node: Node<Address, M::PublicKey>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Node<K> {
pub address: Multiaddr,
pub struct Node<Address, K> {
pub address: Address,
pub public_key: K,
}
impl<M> Membership<M>
impl<Address, M> Membership<Address, M>
where
Address: Eq + Hash,
M: MixMessage,
M::PublicKey: PartialEq,
{
pub fn new(nodes: Vec<Node<M::PublicKey>>, local_public_key: M::PublicKey) -> Self {
pub fn new(nodes: Vec<Node<Address, M::PublicKey>>, local_public_key: M::PublicKey) -> Self {
let mut remote_nodes = Vec::with_capacity(nodes.len() - 1);
let mut local_node = None;
nodes.into_iter().for_each(|node| {
@ -49,7 +49,7 @@ where
&self,
rng: &mut R,
amount: usize,
) -> Vec<&Node<M::PublicKey>> {
) -> Vec<&Node<Address, M::PublicKey>> {
self.remote_nodes.choose_multiple(rng, amount).collect()
}
@ -57,15 +57,15 @@ where
&self,
rng: &mut R,
amount: usize,
exclude_addrs: &HashSet<Multiaddr>,
) -> Vec<&Node<M::PublicKey>> {
exclude_addrs: &HashSet<Address>,
) -> Vec<&Node<Address, M::PublicKey>> {
self.remote_nodes
.iter()
.filter(|node| !exclude_addrs.contains(&node.address))
.choose_multiple(rng, amount)
}
pub fn local_node(&self) -> &Node<M::PublicKey> {
pub fn local_node(&self) -> &Node<Address, M::PublicKey> {
&self.local_node
}

View File

@ -1,3 +1,5 @@
use std::hash::Hash;
use crate::membership::Membership;
use nomos_mix_message::MixMessage;
use rand::RngCore;
@ -5,12 +7,12 @@ use serde::{Deserialize, Serialize};
/// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages
/// for the message indistinguishability.
pub struct CryptographicProcessor<R, M>
pub struct CryptographicProcessor<R, Address, M>
where
M: MixMessage,
{
settings: CryptographicProcessorSettings<M::PrivateKey>,
membership: Membership<M>,
membership: Membership<Address, M>,
rng: R,
}
@ -20,15 +22,16 @@ pub struct CryptographicProcessorSettings<K> {
pub num_mix_layers: usize,
}
impl<R, M> CryptographicProcessor<R, M>
impl<R, Address, M> CryptographicProcessor<R, Address, M>
where
R: RngCore,
Address: Eq + Hash,
M: MixMessage,
M::PublicKey: Clone + PartialEq,
{
pub fn new(
settings: CryptographicProcessorSettings<M::PrivateKey>,
membership: Membership<M>,
membership: Membership<Address, M>,
rng: R,
) -> Self {
Self {

View File

@ -5,6 +5,7 @@ pub use crypto::CryptographicProcessorSettings;
use futures::{Stream, StreamExt};
use rand::RngCore;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
@ -34,22 +35,23 @@ where
/// [`MessageBlendStream`] handles the entire mixing tiers process
/// - Unwraps incoming messages received from network using [`CryptographicProcessor`]
/// - Pushes unwrapped messages to [`TemporalProcessor`]
pub struct MessageBlendStream<S, Rng, M, Scheduler>
pub struct MessageBlendStream<S, Rng, Address, M, Scheduler>
where
M: MixMessage,
{
input_stream: S,
output_stream: Pin<Box<dyn Stream<Item = MixOutgoingMessage> + Send + Sync + 'static>>,
temporal_sender: UnboundedSender<MixOutgoingMessage>,
cryptographic_processor: CryptographicProcessor<Rng, M>,
cryptographic_processor: CryptographicProcessor<Rng, Address, M>,
_rng: PhantomData<Rng>,
_scheduler: PhantomData<Scheduler>,
}
impl<S, Rng, M, Scheduler> MessageBlendStream<S, Rng, M, Scheduler>
impl<S, Rng, Address, M, Scheduler> MessageBlendStream<S, Rng, Address, M, Scheduler>
where
S: Stream<Item = Vec<u8>>,
Rng: RngCore + Unpin + Send + 'static,
Address: Eq + Hash,
M: MixMessage,
M::PrivateKey: Serialize + DeserializeOwned,
M::PublicKey: Clone + PartialEq,
@ -59,7 +61,7 @@ where
pub fn new(
input_stream: S,
settings: MessageBlendSettings<M>,
membership: Membership<M>,
membership: Membership<Address, M>,
scheduler: Scheduler,
cryptographic_processor_rng: Rng,
) -> Self {
@ -100,10 +102,11 @@ where
}
}
impl<S, Rng, M, Scheduler> Stream for MessageBlendStream<S, Rng, M, Scheduler>
impl<S, Rng, Address, M, Scheduler> Stream for MessageBlendStream<S, Rng, Address, M, Scheduler>
where
S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin + Send + 'static,
Address: Eq + Hash + Unpin,
M: MixMessage + Unpin,
M::PrivateKey: Serialize + DeserializeOwned + Unpin,
M::PublicKey: Clone + PartialEq + Unpin,
@ -120,9 +123,10 @@ where
}
}
pub trait MessageBlendExt<Rng, M, Scheduler>: Stream<Item = Vec<u8>>
pub trait MessageBlendExt<Rng, Address, M, Scheduler>: Stream<Item = Vec<u8>>
where
Rng: RngCore + Send + Unpin + 'static,
Address: Eq + Hash,
M: MixMessage,
M::PrivateKey: Serialize + DeserializeOwned,
M::PublicKey: Clone + PartialEq,
@ -132,10 +136,10 @@ where
fn blend(
self,
message_blend_settings: MessageBlendSettings<M>,
membership: Membership<M>,
membership: Membership<Address, M>,
scheduler: Scheduler,
cryptographic_processor_rng: Rng,
) -> MessageBlendStream<Self, Rng, M, Scheduler>
) -> MessageBlendStream<Self, Rng, Address, M, Scheduler>
where
Self: Sized + Unpin,
{
@ -149,10 +153,11 @@ where
}
}
impl<T, Rng, M, S> MessageBlendExt<Rng, M, S> for T
impl<T, Rng, Address, M, S> MessageBlendExt<Rng, Address, M, S> for T
where
T: Stream<Item = Vec<u8>>,
Rng: RngCore + Unpin + Send + 'static,
Address: Eq + Hash,
M: MixMessage,
M::PrivateKey: Clone + Serialize + DeserializeOwned + PartialEq,
M::PublicKey: Clone + Serialize + DeserializeOwned + PartialEq,

View File

@ -37,7 +37,7 @@ where
{
config: Config<Interval>,
/// Connection maintenance
conn_maintenance: ConnectionMaintenance<M, R>,
conn_maintenance: ConnectionMaintenance<Multiaddr, M, R>,
peer_address_map: PeerAddressMap,
/// Queue of events to yield to the swarm.
events: VecDeque<ToSwarm<Event, FromBehaviour>>,
@ -69,9 +69,12 @@ where
M::PublicKey: PartialEq,
R: RngCore,
{
pub fn new(config: Config<Interval>, membership: Membership<M>, rng: R) -> Self {
let mut conn_maintenance =
ConnectionMaintenance::<M, R>::new(config.conn_maintenance_settings, membership, rng);
pub fn new(config: Config<Interval>, membership: Membership<Multiaddr, M>, rng: R) -> Self {
let mut conn_maintenance = ConnectionMaintenance::<Multiaddr, M, R>::new(
config.conn_maintenance_settings,
membership,
rng,
);
// Bootstrap connections with initial peers randomly chosen.
let peer_addrs: Vec<Multiaddr> = conn_maintenance.bootstrap();

View File

@ -95,7 +95,7 @@ mod test {
}
fn new_swarm(
membership: Membership<MockMixMessage>,
membership: Membership<Multiaddr, MockMixMessage>,
) -> Swarm<Behaviour<MockMixMessage, ThreadRng, IntervalStream>> {
let conn_maintenance_settings = ConnectionMaintenanceSettings {
peering_degree: membership.size() - 1, // excluding the local node
@ -151,7 +151,7 @@ mod test {
fn nodes(
count: usize,
base_port: usize,
) -> Vec<Node<<MockMixMessage as MixMessage>::PublicKey>> {
) -> Vec<Node<Multiaddr, <MockMixMessage as MixMessage>::PublicKey>> {
(0..count)
.map(|i| Node {
address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", base_port + i)

View File

@ -191,7 +191,7 @@ pub struct TestDaNetworkSettings {
pub struct TestMixSettings {
pub backend: Libp2pMixBackendSettings,
pub private_key: x25519_dalek::StaticSecret,
pub membership: Vec<Node<<SphinxMessage as MixMessage>::PublicKey>>,
pub membership: Vec<Node<Multiaddr, <SphinxMessage as MixMessage>::PublicKey>>,
}
pub fn new_node(

View File

@ -42,11 +42,12 @@ const CHANNEL_SIZE: usize = 64;
#[async_trait]
impl MixBackend for Libp2pMixBackend {
type Settings = Libp2pMixBackendSettings;
type Address = Multiaddr;
fn new<R>(
config: Self::Settings,
overwatch_handle: OverwatchHandle,
membership: Membership<SphinxMessage>,
membership: Membership<Self::Address, SphinxMessage>,
rng: R,
) -> Self
where
@ -112,7 +113,7 @@ where
{
fn new(
config: Libp2pMixBackendSettings,
membership: Membership<SphinxMessage>,
membership: Membership<Multiaddr, SphinxMessage>,
rng: R,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>,

View File

@ -1,7 +1,7 @@
#[cfg(feature = "libp2p")]
pub mod libp2p;
use std::{fmt::Debug, pin::Pin};
use std::{fmt::Debug, hash::Hash, pin::Pin};
use futures::Stream;
use nomos_mix::membership::Membership;
@ -13,11 +13,12 @@ use rand::RngCore;
#[async_trait::async_trait]
pub trait MixBackend {
type Settings: Clone + Debug + Send + Sync + 'static;
type Address: Eq + Hash + Clone;
fn new<R>(
config: Self::Settings,
overwatch_handle: OverwatchHandle,
membership: Membership<SphinxMessage>,
membership: Membership<Self::Address, SphinxMessage>,
rng: R,
) -> Self
where

View File

@ -30,6 +30,7 @@ use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::fmt::Debug;
use std::hash::Hash;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time;
@ -50,7 +51,7 @@ where
backend: Backend,
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<Network::Backend>>,
membership: Membership<SphinxMessage>,
membership: Membership<Backend::Address, SphinxMessage>,
}
impl<Backend, Network> ServiceData for MixService<Backend, Network>
@ -61,7 +62,7 @@ where
Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned,
{
const SERVICE_ID: ServiceId = "Mix";
type Settings = MixConfig<Backend::Settings>;
type Settings = MixConfig<Backend::Settings, Backend::Address>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = ServiceMessage<Network::BroadcastSettings>;
@ -72,6 +73,7 @@ impl<Backend, Network> ServiceCore for MixService<Backend, Network>
where
Backend: MixBackend + Send + 'static,
Backend::Settings: Clone,
Backend::Address: Unpin + Send + Sync + 'static,
Network: NetworkAdapter + Send + Sync + 'static,
Network::BroadcastSettings:
Clone + Debug + Serialize + DeserializeOwned + Send + Sync + 'static,
@ -224,7 +226,11 @@ where
fn wrap_and_send_to_persistent_transmission(
message: Vec<u8>,
cryptographic_processor: &mut CryptographicProcessor<ChaCha12Rng, SphinxMessage>,
cryptographic_processor: &mut CryptographicProcessor<
ChaCha12Rng,
Backend::Address,
SphinxMessage,
>,
persistent_sender: &mpsc::UnboundedSender<Vec<u8>>,
) {
match cryptographic_processor.wrap_message(&message) {
@ -241,12 +247,12 @@ where
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixConfig<BackendSettings> {
pub struct MixConfig<BackendSettings, Address> {
pub backend: BackendSettings,
pub message_blend: MessageBlendSettings<SphinxMessage>,
pub persistent_transmission: PersistentTransmissionSettings,
pub cover_traffic: CoverTrafficExtSettings,
pub membership: Vec<Node<<SphinxMessage as nomos_mix_message::MixMessage>::PublicKey>>,
pub membership: Vec<Node<Address, <SphinxMessage as nomos_mix_message::MixMessage>::PublicKey>>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -256,13 +262,16 @@ pub struct CoverTrafficExtSettings {
}
impl CoverTrafficExtSettings {
fn cover_traffic_settings(
fn cover_traffic_settings<Address>(
&self,
membership: &Membership<SphinxMessage>,
membership: &Membership<Address, SphinxMessage>,
cryptographic_processor_settings: &CryptographicProcessorSettings<
<SphinxMessage as MixMessage>::PrivateKey,
>,
) -> CoverTrafficSettings {
) -> CoverTrafficSettings
where
Address: Eq + Hash,
{
CoverTrafficSettings {
node_id: membership.local_node().public_key,
number_of_hops: cryptographic_processor_settings.num_mix_layers,
@ -301,8 +310,11 @@ impl CoverTrafficExtSettings {
}
}
impl<BackendSettings> MixConfig<BackendSettings> {
fn membership(&self) -> Membership<SphinxMessage> {
impl<BackendSettings, Address> MixConfig<BackendSettings, Address>
where
Address: Eq + Hash + Clone,
{
fn membership(&self) -> Membership<Address, SphinxMessage> {
let public_key = x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from(
self.message_blend.cryptographic_processor.private_key,
))

View File

@ -176,8 +176,8 @@ fn update_da_peer_addresses(
fn update_mix_membership(
hosts: Vec<Host>,
membership: Vec<Node<<SphinxMessage as MixMessage>::PublicKey>>,
) -> Vec<Node<<SphinxMessage as MixMessage>::PublicKey>> {
membership: Vec<Node<Multiaddr, <SphinxMessage as MixMessage>::PublicKey>>,
) -> Vec<Node<Multiaddr, <SphinxMessage as MixMessage>::PublicKey>> {
membership
.into_iter()
.zip(hosts)

View File

@ -11,7 +11,7 @@ use crate::get_available_port;
pub struct GeneralMixConfig {
pub backend: Libp2pMixBackendSettings,
pub private_key: x25519_dalek::StaticSecret,
pub membership: Vec<Node<<SphinxMessage as MixMessage>::PublicKey>>,
pub membership: Vec<Node<Multiaddr, <SphinxMessage as MixMessage>::PublicKey>>,
}
pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
@ -50,7 +50,9 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
configs
}
fn mix_nodes(configs: &[GeneralMixConfig]) -> Vec<Node<<SphinxMessage as MixMessage>::PublicKey>> {
fn mix_nodes(
configs: &[GeneralMixConfig],
) -> Vec<Node<Multiaddr, <SphinxMessage as MixMessage>::PublicKey>> {
configs
.iter()
.map(|config| Node {