integrate
This commit is contained in:
parent
c3de9ade76
commit
05778437ed
|
@ -52,10 +52,21 @@ mix:
|
|||
backend:
|
||||
listening_address: /ip4/0.0.0.0/udp/3001/quic-v1
|
||||
node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3
|
||||
membership:
|
||||
- /ip4/0.0.0.0/udp/3001/quic-v1/p2p/12D3KooWMj7KgmbDJ7RSXeFhFimvqddSXzA5XWDTwvdEJYfuoPhM
|
||||
peering_degree: 1
|
||||
num_mix_layers: 1
|
||||
persistent_transmission:
|
||||
max_emission_frequency: 1
|
||||
drop_message_probability: 0.5
|
||||
message_blend:
|
||||
cryptographic_processor:
|
||||
private_key: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
|
||||
num_mix_layers: 1
|
||||
temporal_processor:
|
||||
max_delay_seconds: 5
|
||||
membership:
|
||||
- address: /ip4/127.0.0.1/udp/3001/quic-v1
|
||||
public_key: [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]
|
||||
- address: /ip4/127.0.0.1/udp/3002/quic-v1
|
||||
public_key: [2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2]
|
||||
|
||||
http:
|
||||
backend_settings:
|
||||
|
|
|
@ -83,9 +83,6 @@ pub struct MixArgs {
|
|||
#[clap(long = "mix-node-key", env = "MIX_NODE_KEY")]
|
||||
mix_node_key: Option<String>,
|
||||
|
||||
#[clap(long = "mix-membership", env = "MIX_MEMBERSHIP", num_args = 1.., value_delimiter = ',')]
|
||||
pub mix_membership: Option<Vec<Multiaddr>>,
|
||||
|
||||
#[clap(long = "mix-peering-degree", env = "MIX_PEERING_DEGREE")]
|
||||
mix_peering_degree: Option<usize>,
|
||||
|
||||
|
@ -256,7 +253,6 @@ pub fn update_mix(
|
|||
let MixArgs {
|
||||
mix_addr,
|
||||
mix_node_key,
|
||||
mix_membership,
|
||||
mix_peering_degree,
|
||||
mix_num_mix_layers,
|
||||
} = mix_args;
|
||||
|
@ -270,10 +266,6 @@ pub fn update_mix(
|
|||
mix.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
|
||||
}
|
||||
|
||||
if let Some(membership) = mix_membership {
|
||||
mix.backend.membership = membership;
|
||||
}
|
||||
|
||||
if let Some(peering_degree) = mix_peering_degree {
|
||||
mix.backend.peering_degree = peering_degree;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,8 @@ serde = { version = "1.0", features = ["derive"] }
|
|||
nomos-mix-message = { path = "../message" }
|
||||
futures = "0.3"
|
||||
rand_chacha = "0.3"
|
||||
multiaddr = "0.18"
|
||||
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
pub mod membership;
|
||||
pub mod message_blend;
|
||||
pub mod persistent_transmission;
|
||||
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
use multiaddr::Multiaddr;
|
||||
use rand::{seq::SliceRandom, Rng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Membership {
|
||||
remote_nodes: Vec<Node>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Node {
|
||||
pub address: Multiaddr,
|
||||
pub public_key: [u8; 32],
|
||||
}
|
||||
|
||||
impl Membership {
|
||||
pub fn new(mut nodes: Vec<Node>, local_public_key: &[u8; 32]) -> Self {
|
||||
nodes.retain(|node| node.public_key != *local_public_key);
|
||||
Self {
|
||||
remote_nodes: nodes,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn choose_remote_nodes<R: Rng>(&self, rng: &mut R, amount: usize) -> Vec<&Node> {
|
||||
self.remote_nodes.choose_multiple(rng, amount).collect()
|
||||
}
|
||||
}
|
|
@ -1,33 +1,53 @@
|
|||
use nomos_mix_message::{new_message, unwrap_message};
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::membership::Membership;
|
||||
|
||||
/// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages
|
||||
/// for the message indistinguishability.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct CryptographicProcessor {
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CryptographicProcessor<R> {
|
||||
settings: CryptographicProcessorSettings,
|
||||
membership: Membership,
|
||||
rng: R,
|
||||
public_key: [u8; 32],
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct CryptographicProcessorSettings {
|
||||
pub private_key: [u8; 32],
|
||||
pub num_mix_layers: usize,
|
||||
}
|
||||
|
||||
impl CryptographicProcessor {
|
||||
pub fn new(settings: CryptographicProcessorSettings) -> Self {
|
||||
Self { settings }
|
||||
impl<R: Rng> CryptographicProcessor<R> {
|
||||
pub fn new(settings: CryptographicProcessorSettings, membership: Membership, rng: R) -> Self {
|
||||
let public_key =
|
||||
x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from(settings.private_key))
|
||||
.to_bytes();
|
||||
Self {
|
||||
settings,
|
||||
membership,
|
||||
rng,
|
||||
public_key,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wrap_message(&self, message: &[u8]) -> Result<Vec<u8>, nomos_mix_message::Error> {
|
||||
pub fn wrap_message(&mut self, message: &[u8]) -> Result<Vec<u8>, nomos_mix_message::Error> {
|
||||
// TODO: Use the actual Sphinx encoding instead of mock.
|
||||
// TODO: Select `num_mix_layers` random nodes from the membership.
|
||||
new_message(message, self.settings.num_mix_layers.try_into().unwrap())
|
||||
let node_ids = self
|
||||
.membership
|
||||
.choose_remote_nodes(&mut self.rng, self.settings.num_mix_layers)
|
||||
.iter()
|
||||
.map(|node| node.public_key)
|
||||
.collect::<Vec<_>>();
|
||||
new_message(message, &node_ids)
|
||||
}
|
||||
|
||||
pub fn unwrap_message(
|
||||
&self,
|
||||
message: &[u8],
|
||||
) -> Result<(Vec<u8>, bool), nomos_mix_message::Error> {
|
||||
unwrap_message(message)
|
||||
unwrap_message(message, &self.public_key)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,10 +4,12 @@ pub mod temporal;
|
|||
pub use crypto::CryptographicProcessorSettings;
|
||||
use futures::stream::BoxStream;
|
||||
use futures::{Stream, StreamExt};
|
||||
use rand::Rng;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
pub use temporal::TemporalProcessorSettings;
|
||||
|
||||
use crate::membership::Membership;
|
||||
use crate::message_blend::crypto::CryptographicProcessor;
|
||||
use crate::message_blend::temporal::TemporalProcessorExt;
|
||||
use crate::MixOutgoingMessage;
|
||||
|
@ -25,19 +27,26 @@ pub struct MessageBlendSettings {
|
|||
/// [`MessageBlendStream`] handles the entire mixing tiers process
|
||||
/// - Unwraps incoming messages received from network using [`CryptographicProcessor`]
|
||||
/// - Pushes unwrapped messages to [`TemporalProcessor`]
|
||||
pub struct MessageBlendStream<S> {
|
||||
pub struct MessageBlendStream<S, R> {
|
||||
input_stream: S,
|
||||
output_stream: BoxStream<'static, MixOutgoingMessage>,
|
||||
temporal_sender: UnboundedSender<MixOutgoingMessage>,
|
||||
cryptographic_processor: CryptographicProcessor,
|
||||
cryptographic_processor: CryptographicProcessor<R>,
|
||||
}
|
||||
|
||||
impl<S> MessageBlendStream<S>
|
||||
impl<S, R> MessageBlendStream<S, R>
|
||||
where
|
||||
S: Stream<Item = Vec<u8>>,
|
||||
R: Rng,
|
||||
{
|
||||
pub fn new(input_stream: S, settings: MessageBlendSettings) -> Self {
|
||||
let cryptographic_processor = CryptographicProcessor::new(settings.cryptographic_processor);
|
||||
pub fn new(
|
||||
input_stream: S,
|
||||
settings: MessageBlendSettings,
|
||||
membership: Membership,
|
||||
rng: R,
|
||||
) -> Self {
|
||||
let cryptographic_processor =
|
||||
CryptographicProcessor::new(settings.cryptographic_processor, membership, rng);
|
||||
let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel();
|
||||
let output_stream = UnboundedReceiverStream::new(temporal_receiver)
|
||||
.temporal_stream(settings.temporal_processor)
|
||||
|
@ -72,9 +81,10 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<S> Stream for MessageBlendStream<S>
|
||||
impl<S, R> Stream for MessageBlendStream<S, R>
|
||||
where
|
||||
S: Stream<Item = Vec<u8>> + Unpin,
|
||||
R: Rng + Unpin,
|
||||
{
|
||||
type Item = MixOutgoingMessage;
|
||||
|
||||
|
@ -87,11 +97,17 @@ where
|
|||
}
|
||||
|
||||
pub trait MessageBlendExt: Stream<Item = Vec<u8>> {
|
||||
fn blend(self, message_blend_settings: MessageBlendSettings) -> MessageBlendStream<Self>
|
||||
fn blend<R>(
|
||||
self,
|
||||
message_blend_settings: MessageBlendSettings,
|
||||
membership: Membership,
|
||||
rng: R,
|
||||
) -> MessageBlendStream<Self, R>
|
||||
where
|
||||
Self: Sized + Unpin,
|
||||
R: Rng,
|
||||
{
|
||||
MessageBlendStream::new(self, message_blend_settings)
|
||||
MessageBlendStream::new(self, message_blend_settings, membership, rng)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,11 +13,11 @@ pub type NodeId = [u8; NODE_ID_SIZE];
|
|||
const NODE_ID_SIZE: usize = 32;
|
||||
const DUMMY_NODE_ID: NodeId = [0; NODE_ID_SIZE];
|
||||
|
||||
const MAX_PADDED_PAYLOAD_SIZE: usize = 2048;
|
||||
const PADDED_PAYLOAD_SIZE: usize = 2048;
|
||||
const PAYLOAD_PADDING_SEPARATOR: u8 = 0x01;
|
||||
const PAYLOAD_PADDING_SEPARATOR_SIZE: usize = 1;
|
||||
const MAX_LAYERS: usize = 5;
|
||||
const MESSAGE_SIZE: usize = NODE_ID_SIZE * MAX_LAYERS + MAX_PADDED_PAYLOAD_SIZE;
|
||||
pub const MESSAGE_SIZE: usize = NODE_ID_SIZE * MAX_LAYERS + PADDED_PAYLOAD_SIZE;
|
||||
pub const DROP_MESSAGE: [u8; MESSAGE_SIZE] = [0; MESSAGE_SIZE];
|
||||
|
||||
/// The length of the encoded message is fixed to [`MESSAGE_SIZE`] bytes.
|
||||
|
@ -27,7 +27,7 @@ pub fn new_message(payload: &[u8], node_ids: &[NodeId]) -> Result<Vec<u8>, Error
|
|||
if node_ids.is_empty() || node_ids.len() > MAX_LAYERS {
|
||||
return Err(Error::InvalidNumberOfLayers);
|
||||
}
|
||||
if payload.len() > MAX_PADDED_PAYLOAD_SIZE - PAYLOAD_PADDING_SEPARATOR_SIZE {
|
||||
if payload.len() > PADDED_PAYLOAD_SIZE - PAYLOAD_PADDING_SEPARATOR_SIZE {
|
||||
return Err(Error::PayloadTooLarge);
|
||||
}
|
||||
|
||||
|
@ -44,7 +44,7 @@ pub fn new_message(payload: &[u8], node_ids: &[NodeId]) -> Result<Vec<u8>, Error
|
|||
message.push(PAYLOAD_PADDING_SEPARATOR);
|
||||
message.extend(
|
||||
std::iter::repeat(0)
|
||||
.take(MAX_PADDED_PAYLOAD_SIZE - payload.len() - PAYLOAD_PADDING_SEPARATOR_SIZE),
|
||||
.take(PADDED_PAYLOAD_SIZE - payload.len() - PAYLOAD_PADDING_SEPARATOR_SIZE),
|
||||
);
|
||||
Ok(message)
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ use libp2p::{
|
|||
},
|
||||
Stream, StreamProtocol,
|
||||
};
|
||||
use nomos_mix_message::MAX_PADDED_PAYLOAD_SIZE;
|
||||
use nomos_mix_message::MESSAGE_SIZE;
|
||||
|
||||
use crate::behaviour::Config;
|
||||
|
||||
|
@ -256,7 +256,7 @@ async fn send_msg(mut stream: Stream, msg: Vec<u8>) -> io::Result<Stream> {
|
|||
/// Read a fixed-length message from the stream
|
||||
// TODO: Consider handling variable-length messages
|
||||
async fn recv_msg(mut stream: Stream) -> io::Result<(Stream, Vec<u8>)> {
|
||||
let mut buf = vec![0; MAX_PADDED_PAYLOAD_SIZE];
|
||||
let mut buf = vec![0; MESSAGE_SIZE];
|
||||
stream.read_exact(&mut buf).await?;
|
||||
Ok((stream, buf))
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ mod test {
|
|||
swarm::{dummy, NetworkBehaviour, SwarmEvent},
|
||||
Multiaddr, PeerId, Swarm, SwarmBuilder,
|
||||
};
|
||||
use nomos_mix_message::MAX_PADDED_PAYLOAD_SIZE;
|
||||
use nomos_mix_message::MESSAGE_SIZE;
|
||||
use tokio::select;
|
||||
|
||||
use crate::{behaviour::Config, error::Error, Behaviour, Event};
|
||||
|
@ -43,7 +43,7 @@ mod test {
|
|||
|
||||
// Swamr2 publishes a message.
|
||||
let task = async {
|
||||
let msg = vec![1; MAX_PADDED_PAYLOAD_SIZE];
|
||||
let msg = vec![1; MESSAGE_SIZE];
|
||||
let mut msg_published = false;
|
||||
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
|
||||
loop {
|
||||
|
@ -98,7 +98,7 @@ mod test {
|
|||
|
||||
// Expect all publish attempts to fail with [`Error::NoPeers`]
|
||||
// because swarm2 doesn't have any peers that support the mix protocol.
|
||||
let msg = vec![1; MAX_PADDED_PAYLOAD_SIZE];
|
||||
let msg = vec![1; MESSAGE_SIZE];
|
||||
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
|
||||
let mut publish_try_count = 0;
|
||||
loop {
|
||||
|
|
|
@ -41,6 +41,7 @@ time = "0.3"
|
|||
|
||||
[dev-dependencies]
|
||||
blake2 = { version = "0.10" }
|
||||
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
|
||||
|
||||
[features]
|
||||
default = ["libp2p"]
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use cryptarchia_consensus::LeaderConfig;
|
||||
// std
|
||||
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
|
||||
use nomos_mix::membership::Node;
|
||||
use nomos_mix::message_blend::{
|
||||
CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings,
|
||||
};
|
||||
|
@ -187,13 +188,19 @@ pub struct TestDaNetworkSettings {
|
|||
pub node_key: ed25519::SecretKey,
|
||||
}
|
||||
|
||||
pub struct TestMixSettings {
|
||||
pub backend: Libp2pMixBackendSettings,
|
||||
pub private_key: x25519_dalek::StaticSecret,
|
||||
pub membership: Vec<Node>,
|
||||
}
|
||||
|
||||
pub fn new_node(
|
||||
leader_config: &LeaderConfig,
|
||||
ledger_config: &nomos_ledger::Config,
|
||||
genesis_state: &LedgerState,
|
||||
time_config: &TimeConfig,
|
||||
swarm_config: &SwarmConfig,
|
||||
mix_config: &Libp2pMixBackendSettings,
|
||||
mix_config: &TestMixSettings,
|
||||
db_path: PathBuf,
|
||||
blobs_dir: &PathBuf,
|
||||
initial_peers: Vec<Multiaddr>,
|
||||
|
@ -210,14 +217,18 @@ pub fn new_node(
|
|||
},
|
||||
},
|
||||
mix: MixConfig {
|
||||
backend: mix_config.clone(),
|
||||
backend: mix_config.backend.clone(),
|
||||
persistent_transmission: Default::default(),
|
||||
message_blend: MessageBlendSettings {
|
||||
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
|
||||
cryptographic_processor: CryptographicProcessorSettings {
|
||||
private_key: mix_config.private_key.to_bytes(),
|
||||
num_mix_layers: 1,
|
||||
},
|
||||
temporal_processor: TemporalProcessorSettings {
|
||||
max_delay_seconds: 2,
|
||||
},
|
||||
},
|
||||
membership: mix_config.membership.clone(),
|
||||
},
|
||||
da_network: DaNetworkConfig {
|
||||
backend: DaNetworkBackendSettings {
|
||||
|
@ -308,35 +319,37 @@ pub fn new_node(
|
|||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<Libp2pMixBackendSettings> {
|
||||
let mut configs = listening_addresses
|
||||
pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<TestMixSettings> {
|
||||
let settings = listening_addresses
|
||||
.iter()
|
||||
.map(|listening_address| Libp2pMixBackendSettings {
|
||||
listening_address: listening_address.clone(),
|
||||
node_key: ed25519::SecretKey::generate(),
|
||||
membership: Vec::new(),
|
||||
peering_degree: 1,
|
||||
.map(|listening_address| {
|
||||
(
|
||||
Libp2pMixBackendSettings {
|
||||
listening_address: listening_address.clone(),
|
||||
node_key: ed25519::SecretKey::generate(),
|
||||
peering_degree: 1,
|
||||
},
|
||||
x25519_dalek::StaticSecret::random(),
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let membership = configs
|
||||
let membership = settings
|
||||
.iter()
|
||||
.map(|c| {
|
||||
let peer_id = PeerId::from_public_key(
|
||||
&ed25519::Keypair::from(c.node_key.clone()).public().into(),
|
||||
);
|
||||
c.listening_address
|
||||
.clone()
|
||||
.with_p2p(peer_id)
|
||||
.unwrap_or_else(|orig_addr| orig_addr)
|
||||
.map(|(backend, private_key)| Node {
|
||||
address: backend.listening_address.clone(),
|
||||
public_key: x25519_dalek::PublicKey::from(private_key).to_bytes(),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
configs
|
||||
.iter_mut()
|
||||
.for_each(|c| c.membership = membership.clone());
|
||||
|
||||
configs
|
||||
settings
|
||||
.into_iter()
|
||||
.map(|(backend, private_key)| TestMixSettings {
|
||||
backend,
|
||||
private_key,
|
||||
membership: membership.clone(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// Client node is only created for asyncroniously interact with nodes in the test.
|
||||
|
|
|
@ -19,6 +19,8 @@ serde = { version = "1.0", features = ["derive"] }
|
|||
tokio = { version = "1", features = ["macros", "sync"] }
|
||||
tokio-stream = "0.1"
|
||||
tracing = "0.1"
|
||||
rand_chacha = "0.3.1"
|
||||
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
|
|
@ -6,11 +6,12 @@ use libp2p::{
|
|||
core::transport::ListenerId,
|
||||
identity::{ed25519, Keypair},
|
||||
swarm::SwarmEvent,
|
||||
Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError,
|
||||
Multiaddr, Swarm, SwarmBuilder, TransportError,
|
||||
};
|
||||
use nomos_libp2p::{secret_key_serde, DialError, DialOpts, Protocol};
|
||||
use nomos_libp2p::{secret_key_serde, DialError, DialOpts};
|
||||
use nomos_mix::membership::Membership;
|
||||
use overwatch_rs::overwatch::handle::OverwatchHandle;
|
||||
use rand::seq::IteratorRandom;
|
||||
use rand::Rng;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::{
|
||||
sync::{broadcast, mpsc},
|
||||
|
@ -34,7 +35,6 @@ pub struct Libp2pMixBackendSettings {
|
|||
// A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC)
|
||||
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
|
||||
pub node_key: ed25519::SecretKey,
|
||||
pub membership: Vec<Multiaddr>,
|
||||
pub peering_degree: usize,
|
||||
}
|
||||
|
||||
|
@ -44,12 +44,16 @@ const CHANNEL_SIZE: usize = 64;
|
|||
impl MixBackend for Libp2pMixBackend {
|
||||
type Settings = Libp2pMixBackendSettings;
|
||||
|
||||
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
|
||||
fn new<R: Rng>(
|
||||
config: Self::Settings,
|
||||
overwatch_handle: OverwatchHandle,
|
||||
membership: Membership,
|
||||
mut rng: R,
|
||||
) -> Self {
|
||||
let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE);
|
||||
let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE);
|
||||
|
||||
let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
|
||||
let local_peer_id = keypair.public().to_peer_id();
|
||||
let mut swarm = MixSwarm::new(
|
||||
keypair,
|
||||
swarm_message_receiver,
|
||||
|
@ -63,20 +67,12 @@ impl MixBackend for Libp2pMixBackend {
|
|||
});
|
||||
|
||||
// Randomly select peering_degree number of peers, and dial to them
|
||||
// TODO: Consider moving the peer seelction to the nomos_mix_network::Behaviour
|
||||
config
|
||||
.membership
|
||||
membership
|
||||
.choose_remote_nodes(&mut rng, config.peering_degree)
|
||||
.iter()
|
||||
.filter(|addr| match extract_peer_id(addr) {
|
||||
Some(peer_id) => peer_id != local_peer_id,
|
||||
None => false,
|
||||
})
|
||||
.choose_multiple(&mut rand::thread_rng(), config.peering_degree)
|
||||
.iter()
|
||||
.cloned()
|
||||
.for_each(|addr| {
|
||||
if let Err(e) = swarm.dial(addr.clone()) {
|
||||
tracing::error!("failed to dial to {:?}: {:?}", addr, e);
|
||||
.for_each(|node| {
|
||||
if let Err(e) = swarm.dial(node.address.clone()) {
|
||||
tracing::error!("failed to dial to {:?}: {:?}", node.address, e);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -193,13 +189,3 @@ impl MixSwarm {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_peer_id(multiaddr: &Multiaddr) -> Option<PeerId> {
|
||||
multiaddr.iter().find_map(|protocol| {
|
||||
if let Protocol::P2p(peer_id) = protocol {
|
||||
Some(peer_id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,14 +4,21 @@ pub mod libp2p;
|
|||
use std::{fmt::Debug, pin::Pin};
|
||||
|
||||
use futures::Stream;
|
||||
use nomos_mix::membership::Membership;
|
||||
use overwatch_rs::overwatch::handle::OverwatchHandle;
|
||||
use rand::Rng;
|
||||
|
||||
/// A trait for mix backends that send messages to the mix network.
|
||||
#[async_trait::async_trait]
|
||||
pub trait MixBackend {
|
||||
type Settings: Clone + Debug + Send + Sync + 'static;
|
||||
|
||||
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self;
|
||||
fn new<R: Rng>(
|
||||
config: Self::Settings,
|
||||
overwatch_handle: OverwatchHandle,
|
||||
membership: Membership,
|
||||
rng: R,
|
||||
) -> Self;
|
||||
/// Publish a message to the mix network.
|
||||
async fn publish(&self, msg: Vec<u8>);
|
||||
/// Listen to messages received from the mix network.
|
||||
|
|
|
@ -6,6 +6,7 @@ use backends::MixBackend;
|
|||
use futures::StreamExt;
|
||||
use network::NetworkAdapter;
|
||||
use nomos_core::wire;
|
||||
use nomos_mix::membership::{Membership, Node};
|
||||
use nomos_mix::message_blend::crypto::CryptographicProcessor;
|
||||
use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings};
|
||||
use nomos_mix::persistent_transmission::{
|
||||
|
@ -20,6 +21,8 @@ use overwatch_rs::services::{
|
|||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
};
|
||||
use rand::SeedableRng;
|
||||
use rand_chacha::ChaCha12Rng;
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use std::fmt::Debug;
|
||||
use tokio::sync::mpsc;
|
||||
|
@ -40,6 +43,7 @@ where
|
|||
backend: Backend,
|
||||
service_state: ServiceStateHandle<Self>,
|
||||
network_relay: Relay<NetworkService<Network::Backend>>,
|
||||
membership: Membership,
|
||||
}
|
||||
|
||||
impl<Backend, Network> ServiceData for MixService<Backend, Network>
|
||||
|
@ -67,13 +71,18 @@ where
|
|||
{
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||
let network_relay = service_state.overwatch_handle.relay();
|
||||
let mix_config = service_state.settings_reader.get_updated_settings();
|
||||
let membership = mix_config.membership();
|
||||
Ok(Self {
|
||||
backend: <Backend as MixBackend>::new(
|
||||
service_state.settings_reader.get_updated_settings().backend,
|
||||
service_state.overwatch_handle.clone(),
|
||||
membership.clone(),
|
||||
ChaCha12Rng::from_entropy(),
|
||||
),
|
||||
service_state,
|
||||
network_relay,
|
||||
membership,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -82,10 +91,14 @@ where
|
|||
service_state,
|
||||
mut backend,
|
||||
network_relay,
|
||||
membership,
|
||||
} = self;
|
||||
let mix_config = service_state.settings_reader.get_updated_settings();
|
||||
let cryptographic_processor =
|
||||
CryptographicProcessor::new(mix_config.message_blend.cryptographic_processor);
|
||||
let mut cryptographic_processor = CryptographicProcessor::new(
|
||||
mix_config.message_blend.cryptographic_processor,
|
||||
membership.clone(),
|
||||
ChaCha12Rng::from_entropy(),
|
||||
);
|
||||
let network_relay = network_relay.connect().await?;
|
||||
let network_adapter = Network::new(network_relay);
|
||||
|
||||
|
@ -96,9 +109,11 @@ where
|
|||
.persistent_transmission(mix_config.persistent_transmission);
|
||||
|
||||
// tier 2 blend
|
||||
let mut blend_messages = backend
|
||||
.listen_to_incoming_messages()
|
||||
.blend(mix_config.message_blend);
|
||||
let mut blend_messages = backend.listen_to_incoming_messages().blend(
|
||||
mix_config.message_blend,
|
||||
membership.clone(),
|
||||
ChaCha12Rng::from_entropy(),
|
||||
);
|
||||
|
||||
// local messages, are bypassed and send immediately
|
||||
let mut local_messages = service_state
|
||||
|
@ -188,6 +203,17 @@ pub struct MixConfig<BackendSettings> {
|
|||
pub backend: BackendSettings,
|
||||
pub message_blend: MessageBlendSettings,
|
||||
pub persistent_transmission: PersistentTransmissionSettings,
|
||||
pub membership: Vec<Node>,
|
||||
}
|
||||
|
||||
impl<BackendSettings> MixConfig<BackendSettings> {
|
||||
fn membership(&self) -> Membership {
|
||||
let public_key = x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from(
|
||||
self.message_blend.cryptographic_processor.private_key,
|
||||
))
|
||||
.to_bytes();
|
||||
Membership::new(self.membership.clone(), &public_key)
|
||||
}
|
||||
}
|
||||
|
||||
/// A message that is handled by [`MixService`].
|
||||
|
|
|
@ -9,6 +9,7 @@ clap = { version = "4", features = ["derive"] }
|
|||
nomos-executor = { path = "../../nodes/nomos-executor" }
|
||||
nomos-libp2p = { path = "../../nomos-libp2p" }
|
||||
nomos-node = { path = "../../nodes/nomos-node" }
|
||||
nomos-mix = { path = "../../nomos-mix/core" }
|
||||
nomos-tracing = { path = "../../nomos-tracing" }
|
||||
nomos-tracing-service = { path = "../../nomos-services/tracing" }
|
||||
rand = "0.8"
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
// std
|
||||
use std::{collections::HashMap, net::Ipv4Addr, str::FromStr};
|
||||
// crates
|
||||
use nomos_libp2p::{Multiaddr, PeerId, Protocol};
|
||||
use nomos_libp2p::{Multiaddr, PeerId};
|
||||
use nomos_mix::membership::Node;
|
||||
use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig};
|
||||
use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingSettings};
|
||||
use rand::{thread_rng, Rng};
|
||||
|
@ -91,7 +92,7 @@ pub fn create_node_configs(
|
|||
let host_network_init_peers = update_network_init_peers(hosts.clone());
|
||||
let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses);
|
||||
let host_mix_membership =
|
||||
update_mix_membership(hosts.clone(), mix_configs[0].backend.membership.clone());
|
||||
update_mix_membership(hosts.clone(), mix_configs[0].membership.clone());
|
||||
|
||||
let new_peer_addresses: HashMap<PeerId, Multiaddr> = host_da_peer_addresses
|
||||
.clone()
|
||||
|
@ -122,7 +123,7 @@ pub fn create_node_configs(
|
|||
let mut mix_config = mix_configs[i].to_owned();
|
||||
mix_config.backend.listening_address =
|
||||
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap();
|
||||
mix_config.backend.membership = host_mix_membership.clone();
|
||||
mix_config.membership = host_mix_membership.clone();
|
||||
|
||||
// Tracing config.
|
||||
let tracing_config =
|
||||
|
@ -170,32 +171,19 @@ fn update_da_peer_addresses(
|
|||
.collect()
|
||||
}
|
||||
|
||||
fn update_mix_membership(hosts: Vec<Host>, membership: Vec<Multiaddr>) -> Vec<Multiaddr> {
|
||||
fn update_mix_membership(hosts: Vec<Host>, membership: Vec<Node>) -> Vec<Node> {
|
||||
membership
|
||||
.into_iter()
|
||||
.zip(hosts)
|
||||
.map(|(addr, host)| {
|
||||
Multiaddr::from_str(&format!(
|
||||
"/ip4/{}/udp/{}/quic-v1/p2p/{}",
|
||||
host.ip,
|
||||
host.mix_port,
|
||||
extract_peer_id(&addr).unwrap(),
|
||||
))
|
||||
.unwrap()
|
||||
.map(|(mut node, host)| {
|
||||
node.address =
|
||||
Multiaddr::from_str(&format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.mix_port))
|
||||
.unwrap();
|
||||
node
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn extract_peer_id(multiaddr: &Multiaddr) -> Option<PeerId> {
|
||||
multiaddr.iter().find_map(|protocol| {
|
||||
if let Protocol::P2p(peer_id) = protocol {
|
||||
Some(peer_id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> GeneralTracingConfig {
|
||||
GeneralTracingConfig {
|
||||
tracing_settings: TracingSettings {
|
||||
|
|
|
@ -50,6 +50,7 @@ criterion = { version = "0.5", features = ["async_tokio"] }
|
|||
nomos-cli = { path = "../nomos-cli" }
|
||||
time = "0.3"
|
||||
tracing = "0.1"
|
||||
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
|
||||
|
||||
[[test]]
|
||||
name = "test_cryptarchia_happy_path"
|
||||
|
|
|
@ -158,11 +158,15 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
|
|||
backend: config.mix_config.backend,
|
||||
persistent_transmission: Default::default(),
|
||||
message_blend: MessageBlendSettings {
|
||||
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
|
||||
cryptographic_processor: CryptographicProcessorSettings {
|
||||
private_key: config.mix_config.private_key.to_bytes(),
|
||||
num_mix_layers: 1,
|
||||
},
|
||||
temporal_processor: TemporalProcessorSettings {
|
||||
max_delay_seconds: 2,
|
||||
},
|
||||
},
|
||||
membership: config.mix_config.membership,
|
||||
},
|
||||
cryptarchia: CryptarchiaSettings {
|
||||
leader_config: config.consensus_config.leader_config,
|
||||
|
|
|
@ -243,13 +243,16 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
|
|||
mix: nomos_mix_service::MixConfig {
|
||||
backend: config.mix_config.backend,
|
||||
persistent_transmission: Default::default(),
|
||||
|
||||
message_blend: MessageBlendSettings {
|
||||
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
|
||||
cryptographic_processor: CryptographicProcessorSettings {
|
||||
private_key: config.mix_config.private_key.to_bytes(),
|
||||
num_mix_layers: 1,
|
||||
},
|
||||
temporal_processor: TemporalProcessorSettings {
|
||||
max_delay_seconds: 2,
|
||||
},
|
||||
},
|
||||
membership: config.mix_config.membership,
|
||||
},
|
||||
cryptarchia: CryptarchiaSettings {
|
||||
leader_config: config.consensus_config.leader_config,
|
||||
|
|
|
@ -1,13 +1,16 @@
|
|||
use std::str::FromStr;
|
||||
|
||||
use nomos_libp2p::{ed25519, Multiaddr};
|
||||
use nomos_mix::membership::Node;
|
||||
use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings;
|
||||
|
||||
use crate::{get_available_port, secret_key_to_peer_id};
|
||||
use crate::get_available_port;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GeneralMixConfig {
|
||||
pub backend: Libp2pMixBackendSettings,
|
||||
pub private_key: x25519_dalek::StaticSecret,
|
||||
pub membership: Vec<Node>,
|
||||
}
|
||||
|
||||
pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
|
||||
|
@ -26,33 +29,28 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
|
|||
))
|
||||
.unwrap(),
|
||||
node_key,
|
||||
membership: Vec::new(),
|
||||
peering_degree: 1,
|
||||
},
|
||||
private_key: x25519_dalek::StaticSecret::random(),
|
||||
membership: Vec::new(),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let membership = mix_membership(&configs);
|
||||
|
||||
let nodes = mix_nodes(&configs);
|
||||
configs.iter_mut().for_each(|config| {
|
||||
config.backend.membership = membership.clone();
|
||||
config.membership = nodes.clone();
|
||||
});
|
||||
|
||||
configs
|
||||
}
|
||||
|
||||
fn mix_membership(configs: &[GeneralMixConfig]) -> Vec<Multiaddr> {
|
||||
fn mix_nodes(configs: &[GeneralMixConfig]) -> Vec<Node> {
|
||||
configs
|
||||
.iter()
|
||||
.map(|config| {
|
||||
let peer_id = secret_key_to_peer_id(config.backend.node_key.clone());
|
||||
config
|
||||
.backend
|
||||
.listening_address
|
||||
.clone()
|
||||
.with_p2p(peer_id)
|
||||
.unwrap_or_else(|orig_addr| orig_addr)
|
||||
.map(|config| Node {
|
||||
address: config.backend.listening_address.clone(),
|
||||
public_key: x25519_dalek::PublicKey::from(&config.private_key).to_bytes(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue