Mix: Integrate packet encoding

This commit is contained in:
Youngjoon Lee 2024-11-02 17:36:29 +09:00
parent 5c8b84d3e1
commit 906fe6e2e5
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
27 changed files with 387 additions and 203 deletions

View File

@ -83,9 +83,6 @@ pub struct MixArgs {
#[clap(long = "mix-node-key", env = "MIX_NODE_KEY")]
mix_node_key: Option<String>,
#[clap(long = "mix-membership", env = "MIX_MEMBERSHIP", num_args = 1.., value_delimiter = ',')]
pub mix_membership: Option<Vec<Multiaddr>>,
#[clap(long = "mix-peering-degree", env = "MIX_PEERING_DEGREE")]
mix_peering_degree: Option<usize>,
@ -256,7 +253,6 @@ pub fn update_mix(
let MixArgs {
mix_addr,
mix_node_key,
mix_membership,
mix_peering_degree,
mix_num_mix_layers,
} = mix_args;
@ -270,10 +266,6 @@ pub fn update_mix(
mix.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
}
if let Some(membership) = mix_membership {
mix.backend.membership = membership;
}
if let Some(peering_degree) = mix_peering_degree {
mix.backend.peering_degree = peering_degree;
}

View File

@ -5,6 +5,7 @@ edition = "2021"
[dependencies]
cached = "0.53"
thiserror = "1"
tokio = { version = "1", features = ["time", "sync", "macros"] }
tracing = "0.1"
rand = "0.8"
@ -12,7 +13,7 @@ serde = { version = "1.0", features = ["derive"] }
nomos-mix-message = { path = "../message" }
futures = "0.3"
rand_chacha = "0.3"
multiaddr = "0.18.2"
[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] }

View File

@ -0,0 +1,7 @@
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Not sufficient nodes")]
NotSufficientNodes,
#[error("Mix message error: {0}")]
MixMessageError(#[from] nomos_mix_message::Error),
}

View File

@ -1,2 +1,4 @@
pub mod error;
pub mod membership;
pub mod message_blend;
pub mod persistent_transmission;

View File

@ -0,0 +1,24 @@
use multiaddr::Multiaddr;
use rand::{seq::SliceRandom, Rng};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug)]
pub struct Membership {
remote_nodes: Vec<Node>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Node {
pub address: Multiaddr,
pub public_key: [u8; 32],
}
impl Membership {
pub fn new(remote_nodes: Vec<Node>) -> Self {
Self { remote_nodes }
}
pub fn choose_nodes<R: Rng>(&self, rng: &mut R, amount: usize) -> Vec<&Node> {
self.remote_nodes.choose_multiple(rng, amount).collect()
}
}

View File

@ -1,32 +1,60 @@
use nomos_mix_message::{new_message, unwrap_message};
use nomos_mix_message::MessageBuilder;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use serde::{Deserialize, Serialize};
use crate::{error::Error, membership::Membership};
/// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages
/// for the message indistinguishability.
pub(crate) struct CryptographicProcessor {
settings: CryptographicProcessorSettings,
message_builder: MessageBuilder,
membership: Membership,
rng: ChaCha12Rng,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CryptographicProcessorSettings {
pub num_mix_layers: usize,
pub max_num_mix_layers: usize,
pub max_payload_size: usize,
pub private_key: [u8; 32],
}
impl CryptographicProcessor {
pub(crate) fn new(settings: CryptographicProcessorSettings) -> Self {
Self { settings }
pub(crate) fn new(
settings: CryptographicProcessorSettings,
message_builder: MessageBuilder,
membership: Membership,
) -> Self {
Self {
settings,
message_builder,
membership,
rng: ChaCha12Rng::from_entropy(),
}
}
pub(crate) fn wrap_message(&self, message: &[u8]) -> Result<Vec<u8>, nomos_mix_message::Error> {
// TODO: Use the actual Sphinx encoding instead of mock.
// TODO: Select `num_mix_layers` random nodes from the membership.
new_message(message, self.settings.num_mix_layers.try_into().unwrap())
pub(crate) fn wrap_message(&mut self, message: &[u8]) -> Result<Vec<u8>, Error> {
let public_keys = self.choose_public_keys(self.settings.num_mix_layers);
if public_keys.len() < self.settings.num_mix_layers {
return Err(Error::NotSufficientNodes);
}
Ok(self.message_builder.new_message(public_keys, message)?)
}
pub(crate) fn unwrap_message(
&self,
message: &[u8],
) -> Result<(Vec<u8>, bool), nomos_mix_message::Error> {
unwrap_message(message)
pub(crate) fn unwrap_message(&self, message: &[u8]) -> Result<(Vec<u8>, bool), Error> {
Ok(self
.message_builder
.unpack_message(message, self.settings.private_key)?)
}
fn choose_public_keys(&mut self, amount: usize) -> Vec<[u8; 32]> {
self.membership
.choose_nodes(&mut self.rng, amount)
.iter()
.map(|node| node.public_key)
.collect()
}
}

View File

@ -3,12 +3,17 @@ mod temporal;
pub use crypto::CryptographicProcessorSettings;
use futures::StreamExt;
use nomos_mix_message::MessageBuilder;
pub use temporal::TemporalProcessorSettings;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::message_blend::{crypto::CryptographicProcessor, temporal::TemporalProcessor};
use crate::{
error::Error,
membership::Membership,
message_blend::{crypto::CryptographicProcessor, temporal::TemporalProcessor},
};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MessageBlendSettings {
@ -38,6 +43,8 @@ pub struct MessageBlend {
impl MessageBlend {
pub fn new(
settings: MessageBlendSettings,
message_builder: MessageBuilder,
membership: Membership,
new_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
inbound_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
outbound_message_sender: mpsc::UnboundedSender<Vec<u8>>,
@ -48,7 +55,11 @@ impl MessageBlend {
inbound_message_receiver,
outbound_message_sender,
fully_unwrapped_message_sender,
cryptographic_processor: CryptographicProcessor::new(settings.cryptographic_processor),
cryptographic_processor: CryptographicProcessor::new(
settings.cryptographic_processor,
message_builder,
membership,
),
temporal_processor: TemporalProcessor::<_>::new(settings.temporal_processor),
}
}
@ -93,7 +104,7 @@ impl MessageBlend {
fully_unwrapped,
});
}
Err(nomos_mix_message::Error::MsgUnwrapNotAllowed) => {
Err(Error::MixMessageError(nomos_mix_message::Error::MsgUnwrapNotAllowed)) => {
tracing::debug!("Message cannot be unwrapped by this node");
}
Err(e) => {

View File

@ -1,5 +1,4 @@
use futures::Stream;
use nomos_mix_message::DROP_MESSAGE;
use rand::{distributions::Uniform, prelude::Distribution, Rng, SeedableRng};
use rand_chacha::ChaCha12Rng;
use serde::{Deserialize, Serialize};
@ -35,6 +34,7 @@ where
{
interval: Interval,
coin: Coin<ChaCha12Rng>,
drop_message: Vec<u8>,
stream: S,
}
@ -44,6 +44,7 @@ where
{
pub fn new(
settings: PersistentTransmissionSettings,
drop_message: Vec<u8>,
stream: S,
) -> PersistentTransmissionStream<S> {
let interval = time::interval(Duration::from_secs_f64(
@ -57,6 +58,7 @@ where
Self {
interval,
coin,
drop_message,
stream,
}
}
@ -73,6 +75,7 @@ where
ref mut interval,
ref mut stream,
ref mut coin,
ref drop_message,
..
} = self.get_mut();
if pin!(interval).poll_tick(cx).is_pending() {
@ -81,7 +84,7 @@ where
if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) {
Poll::Ready(Some(item))
} else if coin.flip() {
Poll::Ready(Some(DROP_MESSAGE.to_vec()))
Poll::Ready(Some(drop_message.clone()))
} else {
Poll::Pending
}
@ -92,11 +95,12 @@ pub trait PersistentTransmissionExt: Stream {
fn persistent_transmission(
self,
settings: PersistentTransmissionSettings,
drop_message: Vec<u8>,
) -> PersistentTransmissionStream<Self>
where
Self: Sized + Unpin,
{
PersistentTransmissionStream::new(settings, self)
PersistentTransmissionStream::new(settings, drop_message, self)
}
}
@ -111,6 +115,7 @@ impl<S> PersistentTransmissionExt for S where S: Stream {}
/// * `emission_sender` - The channel to emit messages
pub async fn persistent_transmission(
settings: PersistentTransmissionSettings,
drop_message: Vec<u8>,
schedule_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
emission_sender: mpsc::UnboundedSender<Vec<u8>>,
) {
@ -138,7 +143,7 @@ pub async fn persistent_transmission(
Err(TryRecvError::Empty) => {
// If the coin is head, emit the drop message.
if coin.flip() {
if let Err(e) = emission_sender.send(DROP_MESSAGE.to_vec()) {
if let Err(e) = emission_sender.send(drop_message.clone()) {
tracing::error!(
"Failed to send drop message to the transmission channel: {e:?}"
);
@ -228,8 +233,10 @@ mod tests {
let upper_bound = expected_emission_interval + torelance;
// Start the persistent transmission and schedule messages
let drop_message = vec![0];
tokio::spawn(persistent_transmission(
settings,
drop_message.clone(),
schedule_receiver,
emission_sender,
));
@ -248,16 +255,10 @@ mod tests {
assert_eq!(emission_receiver.recv().await.unwrap(), vec![3]);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!(
emission_receiver.recv().await.unwrap(),
DROP_MESSAGE.to_vec()
);
assert_eq!(emission_receiver.recv().await.unwrap(), drop_message);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!(
emission_receiver.recv().await.unwrap(),
DROP_MESSAGE.to_vec()
);
assert_eq!(emission_receiver.recv().await.unwrap(), drop_message);
assert_interval!(&mut last_time, lower_bound, upper_bound);
// Schedule a new message and check if it is emitted at the next interval
@ -282,7 +283,9 @@ mod tests {
let lower_bound = expected_emission_interval - torelance;
let upper_bound = expected_emission_interval + torelance;
// prepare stream
let mut persistent_transmission_stream = stream.persistent_transmission(settings);
let drop_message = vec![0];
let mut persistent_transmission_stream =
stream.persistent_transmission(settings, drop_message.clone());
// Messages must be scheduled in non-blocking manner.
schedule_sender.send(vec![1]).unwrap();
schedule_sender.send(vec![2]).unwrap();
@ -309,13 +312,13 @@ mod tests {
assert_eq!(
persistent_transmission_stream.next().await.unwrap(),
DROP_MESSAGE.to_vec()
drop_message
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!(
persistent_transmission_stream.next().await.unwrap(),
DROP_MESSAGE.to_vec()
drop_message
);
assert_interval!(&mut last_time, lower_bound, upper_bound);

View File

@ -5,66 +5,106 @@ mod routing;
pub use error::Error;
use sha2::{Digest, Sha256};
use packet::{Packet, UnpackedPacket};
pub const MSG_SIZE: usize = 2048;
pub const DROP_MESSAGE: [u8; MSG_SIZE] = [0; MSG_SIZE];
// TODO: Remove all the mock below once the actual implementation is integrated to the system.
//
/// A mock implementation of the Sphinx encoding.
///
/// The length of the encoded message is fixed to [`MSG_SIZE`] bytes.
/// The first byte of the encoded message is the number of remaining layers to be unwrapped.
/// The remaining bytes are the payload that is zero-padded to the end.
pub fn new_message(payload: &[u8], num_layers: u8) -> Result<Vec<u8>, Error> {
if payload.len() > MSG_SIZE - 1 {
return Err(Error::PayloadTooLarge);
pub struct MessageBuilder {
max_layers: usize,
max_payload_size: usize,
drop_message: Vec<u8>,
}
let mut message: Vec<u8> = Vec::with_capacity(MSG_SIZE);
message.push(num_layers);
message.extend(payload);
message.extend(std::iter::repeat(0).take(MSG_SIZE - message.len()));
Ok(message)
#[repr(u8)]
pub enum MessageFlag {
Drop = 0x00,
Data = 0x01,
}
/// SHA-256 hash of the message
pub fn message_id(message: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::new();
hasher.update(message);
hasher.finalize().to_vec()
impl MessageBuilder {
pub fn new(max_layers: usize, max_payload_size: usize) -> Result<Self, Error> {
Ok(Self {
max_layers,
max_payload_size,
drop_message: Self::new_drop_message(max_layers, max_payload_size)?,
})
}
/// Unwrap the message one layer.
///
/// This function returns the unwrapped message and a boolean indicating whether the message was fully unwrapped.
/// (False if the message still has layers to be unwrapped, true otherwise)
///
/// If the input message was already fully unwrapped, or if ititss format is invalid,
/// this function returns `[Error::InvalidMixMessage]`.
pub fn unwrap_message(message: &[u8]) -> Result<(Vec<u8>, bool), Error> {
if message.is_empty() {
pub fn new_message(
&self,
recipient_pubkeys: Vec<[u8; 32]>,
payload: &[u8],
) -> Result<Vec<u8>, Error> {
let recipient_pubkeys = recipient_pubkeys
.into_iter()
.map(x25519_dalek::PublicKey::from)
.collect::<Vec<_>>();
let packet = Packet::build(
&recipient_pubkeys,
self.max_layers,
payload,
self.max_payload_size,
)?;
Ok(Self::concat_flag(MessageFlag::Data, &packet.to_bytes()))
}
fn new_drop_message(max_layers: usize, max_payload_size: usize) -> Result<Vec<u8>, Error> {
let dummy_packet = Packet::build(
&[x25519_dalek::PublicKey::from(
&x25519_dalek::EphemeralSecret::random(),
)],
max_layers,
&[0u8; 1],
max_payload_size,
)?;
Ok(Self::concat_flag(
MessageFlag::Drop,
&dummy_packet.to_bytes(),
))
}
pub fn drop_message(&self) -> Vec<u8> {
self.drop_message.clone()
}
pub fn is_drop_message(message: &[u8]) -> bool {
Self::check_flag(MessageFlag::Drop, message).is_ok()
}
pub fn unpack_message(
&self,
message: &[u8],
private_key: [u8; 32],
) -> Result<(Vec<u8>, bool), Error> {
let message = Self::check_flag(MessageFlag::Data, message)?;
let packet = Packet::from_bytes(message, self.max_layers)?;
let private_key = x25519_dalek::StaticSecret::from(private_key);
Ok(match packet.unpack(&private_key, self.max_layers)? {
UnpackedPacket::ToForward(m) => {
(Self::concat_flag(MessageFlag::Data, &m.to_bytes()), false)
}
UnpackedPacket::FullyUnpacked(m) => (m, true),
})
}
pub fn message_size(&self) -> usize {
self.drop_message.len()
}
fn concat_flag(flag: MessageFlag, bytes: &[u8]) -> Vec<u8> {
concat_bytes(&[&[flag as u8], bytes])
}
fn check_flag(flag: MessageFlag, message: &[u8]) -> Result<&[u8], Error> {
if message.first() != Some(&(flag as u8)) {
return Err(Error::InvalidMixMessage);
}
match message[0] {
0 => Err(Error::InvalidMixMessage),
1 => Ok((message[1..].to_vec(), true)),
n => {
let mut unwrapped: Vec<u8> = Vec::with_capacity(message.len());
unwrapped.push(n - 1);
unwrapped.extend(&message[1..]);
Ok((unwrapped, false))
if message.len() == 1 {
Ok(&[])
} else {
Ok(&message[1..])
}
}
}
/// Check if the message is a drop message.
pub fn is_drop_message(message: &[u8]) -> bool {
message == DROP_MESSAGE
}
pub(crate) fn concat_bytes(bytes_list: &[&[u8]]) -> Vec<u8> {
let mut buf = Vec::with_capacity(bytes_list.iter().map(|bytes| bytes.len()).sum());
bytes_list
@ -87,3 +127,20 @@ pub(crate) fn parse_bytes<'a>(data: &'a [u8], sizes: &[usize]) -> Result<Vec<&'a
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn message_builder() {
let builder = MessageBuilder::new(5, 2048).unwrap();
let drop_message = builder.drop_message();
assert_eq!(drop_message.len(), builder.message_size());
let data_message = builder.new_message(vec![[1u8; 32]], &[10u8; 100]).unwrap();
assert!(data_message != drop_message);
assert_eq!(data_message.len(), builder.message_size());
}
}

View File

@ -9,6 +9,7 @@ futures = "0.3.30"
futures-timer = "3.0.3"
libp2p = "0.53"
tracing = "0.1"
sha2 = "0.10"
nomos-mix = { path = "../core" }
nomos-mix-message = { path = "../message" }

View File

@ -12,7 +12,8 @@ use libp2p::{
},
Multiaddr, PeerId,
};
use nomos_mix_message::{is_drop_message, message_id};
use nomos_mix_message::MessageBuilder;
use sha2::{Digest, Sha256};
use crate::{
error::Error,
@ -61,12 +62,12 @@ impl Behaviour {
/// Publish a message (data or drop) to all connected peers
pub fn publish(&mut self, message: Vec<u8>) -> Result<(), Error> {
if is_drop_message(&message) {
if MessageBuilder::is_drop_message(&message) {
// Bypass deduplication for the drop message
return self.forward_message(message, None);
}
let msg_id = message_id(&message);
let msg_id = Self::message_id(&message);
// If the message was already seen, don't forward it again
if self.duplicate_cache.cache_get(&msg_id).is_some() {
return Ok(());
@ -110,6 +111,12 @@ impl Behaviour {
Ok(())
}
fn message_id(message: &[u8]) -> Vec<u8> {
let mut hasher = Sha256::new();
hasher.update(message);
hasher.finalize().to_vec()
}
fn add_negotiated_peer(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool {
tracing::debug!(
"Adding to connected_peers: peer_id:{:?}, connection_id:{:?}",
@ -191,14 +198,14 @@ impl NetworkBehaviour for Behaviour {
// A message was forwarded from the peer.
ToBehaviour::Message(message) => {
// Ignore drop message
if is_drop_message(&message) {
if MessageBuilder::is_drop_message(&message) {
return;
}
// Add the message to the cache. If it was already seen, ignore it.
if self
.duplicate_cache
.cache_set(message_id(&message), ())
.cache_set(Self::message_id(&message), ())
.is_some()
{
return;

View File

@ -13,7 +13,6 @@ use libp2p::{
},
Stream, StreamProtocol,
};
use nomos_mix_message::MSG_SIZE;
use crate::behaviour::Config;
@ -248,15 +247,21 @@ impl ConnectionHandler for MixConnectionHandler {
/// Write a message to the stream
async fn send_msg(mut stream: Stream, msg: Vec<u8>) -> io::Result<Stream> {
let msg_len = (msg.len() as u64).to_be_bytes();
stream.write_all(&msg_len).await?;
stream.write_all(&msg).await?;
stream.flush().await?;
Ok(stream)
}
/// Read a fixed-length message from the stream
// TODO: Consider handling variable-length messages
/// Read a message from the stream
async fn recv_msg(mut stream: Stream) -> io::Result<(Stream, Vec<u8>)> {
let mut buf = vec![0; MSG_SIZE];
let msg_len = {
let mut buf = [0u8; 8];
stream.read_exact(&mut buf).await?;
u64::from_be_bytes(buf) as usize
};
let mut buf = vec![0u8; msg_len];
stream.read_exact(&mut buf).await?;
Ok((stream, buf))
}

View File

@ -14,7 +14,7 @@ mod test {
swarm::{dummy, NetworkBehaviour, SwarmEvent},
Multiaddr, PeerId, Swarm, SwarmBuilder,
};
use nomos_mix_message::MSG_SIZE;
use nomos_mix_message::MessageFlag;
use tokio::select;
use crate::{behaviour::Config, error::Error, Behaviour, Event};
@ -43,7 +43,7 @@ mod test {
// Swamr2 publishes a message.
let task = async {
let msg = vec![1; MSG_SIZE];
let msg = vec![MessageFlag::Data as u8; 1024];
let mut msg_published = false;
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
loop {
@ -98,7 +98,7 @@ mod test {
// Expect all publish attempts to fail with [`Error::NoPeers`]
// because swarm2 doesn't have any peers that support the mix protocol.
let msg = vec![1; MSG_SIZE];
let msg = vec![MessageFlag::Data as u8; 1024];
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
let mut publish_try_count = 0;
loop {

View File

@ -41,6 +41,7 @@ time = "0.3"
[dev-dependencies]
blake2 = { version = "0.10" }
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }
[features]
default = ["libp2p"]

View File

@ -1,6 +1,7 @@
use cryptarchia_consensus::LeaderConfig;
// std
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_mix::membership::Node;
use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings,
};
@ -193,7 +194,9 @@ pub fn new_node(
genesis_state: &LedgerState,
time_config: &TimeConfig,
swarm_config: &SwarmConfig,
mix_config: &Libp2pMixBackendSettings,
mix_backend_settings: &Libp2pMixBackendSettings,
mix_private_key: &x25519_dalek::StaticSecret,
mix_membership: Vec<Node>,
db_path: PathBuf,
blobs_dir: &PathBuf,
initial_peers: Vec<Multiaddr>,
@ -210,14 +213,20 @@ pub fn new_node(
},
},
mix: MixConfig {
backend: mix_config.clone(),
backend: mix_backend_settings.clone(),
persistent_transmission: Default::default(),
message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
cryptographic_processor: CryptographicProcessorSettings {
num_mix_layers: 1,
max_num_mix_layers: 1,
max_payload_size: 2048,
private_key: mix_private_key.to_bytes(),
},
temporal_processor: TemporalProcessorSettings {
max_delay_seconds: 2,
},
},
membership: mix_membership,
},
da_network: DaNetworkConfig {
backend: DaNetworkBackendSettings {
@ -308,35 +317,35 @@ pub fn new_node(
.unwrap()
}
pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<Libp2pMixBackendSettings> {
let mut configs = listening_addresses
pub fn new_mix_configs(
listening_addresses: Vec<Multiaddr>,
) -> (
Vec<(Libp2pMixBackendSettings, x25519_dalek::StaticSecret)>,
Vec<Node>,
) {
let node_settings = listening_addresses
.iter()
.map(|listening_address| Libp2pMixBackendSettings {
.map(|listening_address| {
(
Libp2pMixBackendSettings {
listening_address: listening_address.clone(),
node_key: ed25519::SecretKey::generate(),
membership: Vec::new(),
peering_degree: 1,
},
x25519_dalek::StaticSecret::random(),
)
})
.collect::<Vec<_>>();
let membership = configs
let membership = node_settings
.iter()
.map(|c| {
let peer_id = PeerId::from_public_key(
&ed25519::Keypair::from(c.node_key.clone()).public().into(),
);
c.listening_address
.clone()
.with_p2p(peer_id)
.unwrap_or_else(|orig_addr| orig_addr)
.map(|(backend_setting, private_key)| Node {
address: backend_setting.listening_address.clone(),
public_key: x25519_dalek::PublicKey::from(private_key).to_bytes(),
})
.collect::<Vec<_>>();
configs
.iter_mut()
.for_each(|c| c.membership = membership.clone());
configs
(node_settings, membership)
}
// Client node is only created for asyncroniously interact with nodes in the test.

View File

@ -91,7 +91,7 @@ fn test_indexer() {
port: 7772,
..Default::default()
};
let mix_configs = new_mix_configs(vec![
let (mix_configs, mix_membership) = new_mix_configs(vec![
Multiaddr::from_str("/ip4/127.0.0.1/udp/7781/quic-v1").unwrap(),
Multiaddr::from_str("/ip4/127.0.0.1/udp/7782/quic-v1").unwrap(),
]);
@ -122,7 +122,9 @@ fn test_indexer() {
&genesis_state,
&time_config,
&swarm_config1,
&mix_configs[0],
&mix_configs[0].0,
&mix_configs[0].1,
mix_membership.clone(),
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config2)],
@ -150,7 +152,9 @@ fn test_indexer() {
&genesis_state,
&time_config,
&swarm_config2,
&mix_configs[1],
&mix_configs[1].0,
&mix_configs[1].1,
mix_membership.clone(),
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config1)],

View File

@ -70,7 +70,7 @@ fn test_verifier() {
port: 7774,
..Default::default()
};
let mix_configs = new_mix_configs(vec![
let (mix_configs, mix_membership) = new_mix_configs(vec![
Multiaddr::from_str("/ip4/127.0.0.1/udp/7783/quic-v1").unwrap(),
Multiaddr::from_str("/ip4/127.0.0.1/udp/7784/quic-v1").unwrap(),
]);
@ -103,7 +103,9 @@ fn test_verifier() {
&genesis_state,
&time_config,
&swarm_config1,
&mix_configs[0],
&mix_configs[0].0,
&mix_configs[0].1,
mix_membership.clone(),
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config2)],
@ -131,7 +133,9 @@ fn test_verifier() {
&genesis_state,
&time_config,
&swarm_config2,
&mix_configs[1],
&mix_configs[1].0,
&mix_configs[1].1,
mix_membership.clone(),
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config1)],

View File

@ -19,6 +19,7 @@ serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["macros", "sync"] }
tokio-stream = "0.1"
tracing = "0.1"
x25519-dalek = { version = "2", features = ["static_secrets"] }
[features]
default = []

View File

@ -6,11 +6,11 @@ use libp2p::{
core::transport::ListenerId,
identity::{ed25519, Keypair},
swarm::SwarmEvent,
Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError,
Multiaddr, Swarm, SwarmBuilder, TransportError,
};
use nomos_libp2p::{secret_key_serde, DialError, DialOpts, Protocol};
use nomos_libp2p::{secret_key_serde, DialError, DialOpts};
use nomos_mix::membership::Membership;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::seq::IteratorRandom;
use serde::{Deserialize, Serialize};
use tokio::{
sync::{broadcast, mpsc},
@ -34,7 +34,6 @@ pub struct Libp2pMixBackendSettings {
// A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC)
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
pub node_key: ed25519::SecretKey,
pub membership: Vec<Multiaddr>,
pub peering_degree: usize,
}
@ -44,12 +43,15 @@ const CHANNEL_SIZE: usize = 64;
impl MixBackend for Libp2pMixBackend {
type Settings = Libp2pMixBackendSettings;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
fn new(
config: Self::Settings,
overwatch_handle: OverwatchHandle,
membership: Membership,
) -> Self {
let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE);
let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE);
let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let local_peer_id = keypair.public().to_peer_id();
let mut swarm = MixSwarm::new(
keypair,
swarm_message_receiver,
@ -63,20 +65,12 @@ impl MixBackend for Libp2pMixBackend {
});
// Randomly select peering_degree number of peers, and dial to them
// TODO: Consider moving the peer seelction to the nomos_mix_network::Behaviour
config
.membership
membership
.choose_nodes(&mut rand::thread_rng(), config.peering_degree)
.iter()
.filter(|addr| match extract_peer_id(addr) {
Some(peer_id) => peer_id != local_peer_id,
None => false,
})
.choose_multiple(&mut rand::thread_rng(), config.peering_degree)
.iter()
.cloned()
.for_each(|addr| {
if let Err(e) = swarm.dial(addr.clone()) {
tracing::error!("failed to dial to {:?}: {:?}", addr, e);
.for_each(|node| {
if let Err(e) = swarm.dial(node.address.clone()) {
tracing::error!("failed to dial to {:?}: {:?}", node.address, e);
}
});
@ -193,13 +187,3 @@ impl MixSwarm {
}
}
}
fn extract_peer_id(multiaddr: &Multiaddr) -> Option<PeerId> {
multiaddr.iter().find_map(|protocol| {
if let Protocol::P2p(peer_id) = protocol {
Some(peer_id)
} else {
None
}
})
}

View File

@ -4,6 +4,7 @@ pub mod libp2p;
use std::{fmt::Debug, pin::Pin};
use futures::Stream;
use nomos_mix::membership::Membership;
use overwatch_rs::overwatch::handle::OverwatchHandle;
/// A trait for mix backends that send messages to the mix network.
@ -11,7 +12,11 @@ use overwatch_rs::overwatch::handle::OverwatchHandle;
pub trait MixBackend {
type Settings: Clone + Debug + Send + Sync + 'static;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self;
fn new(
config: Self::Settings,
overwatch_handle: OverwatchHandle,
membership: Membership,
) -> Self;
/// Publish a message to the mix network.
async fn publish(&self, msg: Vec<u8>);
/// Listen to messages received from the mix network.

View File

@ -9,9 +9,11 @@ use futures::StreamExt;
use network::NetworkAdapter;
use nomos_core::wire;
use nomos_mix::{
message_blend::{MessageBlend, MessageBlendSettings},
membership::{Membership, Node},
message_blend::{CryptographicProcessorSettings, MessageBlend, MessageBlendSettings},
persistent_transmission::{persistent_transmission, PersistentTransmissionSettings},
};
use nomos_mix_message::MessageBuilder;
use nomos_network::NetworkService;
use overwatch_rs::services::{
handle::ServiceStateHandle,
@ -38,6 +40,7 @@ where
backend: Backend,
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<Network::Backend>>,
membership: Membership,
}
impl<Backend, Network> ServiceData for MixService<Backend, Network>
@ -64,14 +67,18 @@ where
Clone + Debug + Serialize + DeserializeOwned + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let mix_config = service_state.settings_reader.get_updated_settings();
let membership = mix_config.membership();
let network_relay = service_state.overwatch_handle.relay();
Ok(Self {
backend: <Backend as MixBackend>::new(
service_state.settings_reader.get_updated_settings().backend,
service_state.overwatch_handle.clone(),
membership.clone(),
),
service_state,
network_relay,
membership,
})
}
@ -80,19 +87,30 @@ where
mut service_state,
mut backend,
network_relay,
membership,
} = self;
let mix_config = service_state.settings_reader.get_updated_settings();
let network_relay = network_relay.connect().await?;
let network_adapter = Network::new(network_relay);
// Create message builder used in all tiers.
let CryptographicProcessorSettings {
max_num_mix_layers,
max_payload_size,
..
} = mix_config.message_blend.cryptographic_processor;
let message_builder = MessageBuilder::new(max_num_mix_layers, max_payload_size).unwrap();
// Spawn Persistent Transmission
let (transmission_schedule_sender, transmission_schedule_receiver) =
mpsc::unbounded_channel();
let (emission_sender, mut emission_receiver) = mpsc::unbounded_channel();
let drop_message = message_builder.drop_message();
tokio::spawn(async move {
persistent_transmission(
mix_config.persistent_transmission,
drop_message,
transmission_schedule_receiver,
emission_sender,
)
@ -107,6 +125,8 @@ where
tokio::spawn(async move {
MessageBlend::new(
mix_config.message_blend,
message_builder,
membership.clone(),
new_message_receiver,
processor_inbound_receiver,
// Connect the outputs of Message Blend to Persistent Transmission
@ -200,6 +220,24 @@ pub struct MixConfig<BackendSettings> {
pub backend: BackendSettings,
pub persistent_transmission: PersistentTransmissionSettings,
pub message_blend: MessageBlendSettings,
pub membership: Vec<Node>,
}
impl<BackendSettings> MixConfig<BackendSettings> {
// TODO: This step can be redesigned once we can load membership info from the chain state.
fn membership(&self) -> Membership {
let local_public_key = x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from(
self.message_blend.cryptographic_processor.private_key,
))
.to_bytes();
let remote_nodes = self
.membership
.iter()
.filter(|node| node.public_key != local_public_key)
.cloned()
.collect();
Membership::new(remote_nodes)
}
}
/// A message that is handled by [`MixService`].

View File

@ -9,6 +9,7 @@ clap = { version = "4", features = ["derive"] }
nomos-executor = { path = "../../nodes/nomos-executor" }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-node = { path = "../../nodes/nomos-node" }
nomos-mix = { path = "../../nomos-mix/core" }
nomos-tracing = { path = "../../nomos-tracing" }
nomos-tracing-service = { path = "../../nomos-services/tracing" }
rand = "0.8"

View File

@ -1,7 +1,8 @@
// std
use std::{collections::HashMap, net::Ipv4Addr, str::FromStr};
// crates
use nomos_libp2p::{Multiaddr, PeerId, Protocol};
use nomos_libp2p::{Multiaddr, PeerId};
use nomos_mix::membership::Node;
use nomos_tracing::{logging::loki::LokiConfig, tracing::otlp::OtlpTracingConfig};
use nomos_tracing_service::{FilterLayer, LoggerLayer, TracingSettings};
use rand::{thread_rng, Rng};
@ -91,7 +92,7 @@ pub fn create_node_configs(
let host_network_init_peers = update_network_init_peers(hosts.clone());
let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses);
let host_mix_membership =
update_mix_membership(hosts.clone(), mix_configs[0].backend.membership.clone());
update_mix_membership(hosts.clone(), mix_configs[0].membership.clone());
let new_peer_addresses: HashMap<PeerId, Multiaddr> = host_da_peer_addresses
.clone()
@ -122,7 +123,7 @@ pub fn create_node_configs(
let mut mix_config = mix_configs[i].to_owned();
mix_config.backend.listening_address =
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap();
mix_config.backend.membership = host_mix_membership.clone();
mix_config.membership = host_mix_membership.clone();
// Tracing config.
let tracing_config =
@ -170,32 +171,19 @@ fn update_da_peer_addresses(
.collect()
}
fn update_mix_membership(hosts: Vec<Host>, membership: Vec<Multiaddr>) -> Vec<Multiaddr> {
fn update_mix_membership(hosts: Vec<Host>, membership: Vec<Node>) -> Vec<Node> {
membership
.into_iter()
.zip(hosts)
.map(|(addr, host)| {
Multiaddr::from_str(&format!(
"/ip4/{}/udp/{}/quic-v1/p2p/{}",
host.ip,
host.mix_port,
extract_peer_id(&addr).unwrap(),
))
.unwrap()
.map(|(mut node, host)| {
node.address =
Multiaddr::from_str(&format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.mix_port))
.unwrap();
node
})
.collect()
}
fn extract_peer_id(multiaddr: &Multiaddr) -> Option<PeerId> {
multiaddr.iter().find_map(|protocol| {
if let Protocol::P2p(peer_id) = protocol {
Some(peer_id)
} else {
None
}
})
}
fn tracing_config_for_grafana(params: TracingParams, identifier: String) -> GeneralTracingConfig {
GeneralTracingConfig {
tracing_settings: TracingSettings {

View File

@ -50,6 +50,7 @@ criterion = { version = "0.5", features = ["async_tokio"] }
nomos-cli = { path = "../nomos-cli" }
time = "0.3"
tracing = "0.1"
x25519-dalek = { version = "2.0.1", features = ["getrandom", "static_secrets"] }
[[test]]
name = "test_cryptarchia_happy_path"

View File

@ -159,11 +159,17 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
backend: config.mix_config.backend,
persistent_transmission: Default::default(),
message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
cryptographic_processor: CryptographicProcessorSettings {
num_mix_layers: 1,
max_num_mix_layers: 1,
max_payload_size: 2048,
private_key: config.mix_config.private_key.to_bytes(),
},
temporal_processor: TemporalProcessorSettings {
max_delay_seconds: 2,
},
},
membership: config.mix_config.membership,
},
cryptarchia: CryptarchiaSettings {
leader_config: config.consensus_config.leader_config,

View File

@ -244,11 +244,17 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
backend: config.mix_config.backend,
persistent_transmission: Default::default(),
message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
cryptographic_processor: CryptographicProcessorSettings {
num_mix_layers: 1,
max_num_mix_layers: 1,
max_payload_size: 2048,
private_key: config.mix_config.private_key.to_bytes(),
},
temporal_processor: TemporalProcessorSettings {
max_delay_seconds: 2,
},
},
membership: config.mix_config.membership,
},
cryptarchia: CryptarchiaSettings {
leader_config: config.consensus_config.leader_config,

View File

@ -1,17 +1,20 @@
use std::str::FromStr;
use nomos_libp2p::{ed25519, Multiaddr};
use nomos_mix::membership::Node;
use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings;
use crate::{get_available_port, secret_key_to_peer_id};
use crate::get_available_port;
#[derive(Clone)]
pub struct GeneralMixConfig {
pub backend: Libp2pMixBackendSettings,
pub private_key: x25519_dalek::StaticSecret,
pub membership: Vec<Node>,
}
pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
let mut configs: Vec<GeneralMixConfig> = ids
let mut configs = ids
.iter()
.map(|id| {
let mut node_key_bytes = *id;
@ -26,33 +29,28 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
))
.unwrap(),
node_key,
membership: Vec::new(),
peering_degree: 1,
},
private_key: x25519_dalek::StaticSecret::random(),
membership: Vec::new(),
}
})
.collect();
.collect::<Vec<_>>();
let membership = mix_membership(&configs);
configs.iter_mut().for_each(|config| {
config.backend.membership = membership.clone();
});
configs
.iter_mut()
.for_each(|config| config.membership = membership.clone());
configs
}
fn mix_membership(configs: &[GeneralMixConfig]) -> Vec<Multiaddr> {
fn mix_membership(configs: &[GeneralMixConfig]) -> Vec<Node> {
configs
.iter()
.map(|config| {
let peer_id = secret_key_to_peer_id(config.backend.node_key.clone());
config
.backend
.listening_address
.clone()
.with_p2p(peer_id)
.unwrap_or_else(|orig_addr| orig_addr)
.map(|config| Node {
address: config.backend.listening_address.clone(),
public_key: x25519_dalek::PublicKey::from(&config.private_key).to_bytes(),
})
.collect()
}