Mix: Implement PersistsentTransmission & Temporal/CryptoProcessor and use them in MixService
(#844)
* forward msgs immediately without any processing * Mix: Offload transmission rate and message processing from libp2p behaviour/handler * Mix: Core skeleton used in `MixService` * rename Processor to MessageBlend * Mix: Implement Persistent Transmission (Tier 1) (#845) * Mix: Add Persistent Transmission (Tier 1) * add test * define Coin struct with Uniform distribution for fast repeated sampling * use ChaCha12Rng for Coin * improve comment * Mix: Implement Temporal Processor (Tier 2) (#846) * Mix: Add Persistent Transmission (Tier 1) * Mix: Implement TemporalProcessor * use pub(crate) * Mix: Use TemporalProcessor in MessageBlend (#847) * Mix: Add Persistent Transmission (Tier 1) * Mix: Implement TemporalProcessor * Mix: Use TemporalProcessor in MessageBlend * remove duplicate members in Cargo.toml
This commit is contained in:
parent
bebb15f921
commit
7aea30132d
@ -25,9 +25,9 @@ members = [
|
|||||||
"nomos-services/data-availability/dispersal",
|
"nomos-services/data-availability/dispersal",
|
||||||
"nomos-services/data-availability/tests",
|
"nomos-services/data-availability/tests",
|
||||||
"nomos-services/mix",
|
"nomos-services/mix",
|
||||||
|
"nomos-mix/core",
|
||||||
"nomos-mix/message",
|
"nomos-mix/message",
|
||||||
"nomos-mix/network",
|
"nomos-mix/network",
|
||||||
"nomos-mix/queue",
|
|
||||||
"nomos-tracing",
|
"nomos-tracing",
|
||||||
"nomos-cli",
|
"nomos-cli",
|
||||||
"nomos-utils",
|
"nomos-utils",
|
||||||
|
@ -271,8 +271,8 @@ pub fn update_mix(
|
|||||||
mix.backend.peering_degree = peering_degree;
|
mix.backend.peering_degree = peering_degree;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(_num_mix_layers) = mix_num_mix_layers {
|
if let Some(num_mix_layers) = mix_num_mix_layers {
|
||||||
// TODO: Set num_mix_layers to the proper module setting
|
mix.message_blend.cryptographic_processor.num_mix_layers = num_mix_layers;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
14
nomos-mix/core/Cargo.toml
Normal file
14
nomos-mix/core/Cargo.toml
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
[package]
|
||||||
|
name = "nomos-mix"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
cached = "0.53"
|
||||||
|
tokio = { version = "1" }
|
||||||
|
tracing = "0.1"
|
||||||
|
rand = "0.8"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
nomos-mix-message = { path = "../message" }
|
||||||
|
futures = "0.3"
|
||||||
|
rand_chacha = "0.3"
|
2
nomos-mix/core/src/lib.rs
Normal file
2
nomos-mix/core/src/lib.rs
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
pub mod message_blend;
|
||||||
|
pub mod persistent_transmission;
|
32
nomos-mix/core/src/message_blend/crypto.rs
Normal file
32
nomos-mix/core/src/message_blend/crypto.rs
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
use nomos_mix_message::{new_message, unwrap_message};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages
|
||||||
|
/// for the message indistinguishability.
|
||||||
|
pub(crate) struct CryptographicProcessor {
|
||||||
|
settings: CryptographicProcessorSettings,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct CryptographicProcessorSettings {
|
||||||
|
pub num_mix_layers: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CryptographicProcessor {
|
||||||
|
pub(crate) fn new(settings: CryptographicProcessorSettings) -> Self {
|
||||||
|
Self { settings }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn wrap_message(&self, message: &[u8]) -> Result<Vec<u8>, nomos_mix_message::Error> {
|
||||||
|
// TODO: Use the actual Sphinx encoding instead of mock.
|
||||||
|
// TODO: Select `num_mix_layers` random nodes from the membership.
|
||||||
|
new_message(message, self.settings.num_mix_layers.try_into().unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn unwrap_message(
|
||||||
|
&self,
|
||||||
|
message: &[u8],
|
||||||
|
) -> Result<(Vec<u8>, bool), nomos_mix_message::Error> {
|
||||||
|
unwrap_message(message)
|
||||||
|
}
|
||||||
|
}
|
122
nomos-mix/core/src/message_blend/mod.rs
Normal file
122
nomos-mix/core/src/message_blend/mod.rs
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
mod crypto;
|
||||||
|
mod temporal;
|
||||||
|
|
||||||
|
pub use crypto::CryptographicProcessorSettings;
|
||||||
|
use futures::StreamExt;
|
||||||
|
pub use temporal::TemporalProcessorSettings;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
use crate::message_blend::{crypto::CryptographicProcessor, temporal::TemporalProcessor};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct MessageBlendSettings {
|
||||||
|
pub cryptographic_processor: CryptographicProcessorSettings,
|
||||||
|
pub temporal_processor: TemporalProcessorSettings,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// [`MessageBlend`] handles the entire Tier-2 spec.
|
||||||
|
/// - Wraps new messages using [`CryptographicProcessor`]
|
||||||
|
/// - Unwraps incoming messages received from network using [`CryptographicProcessor`]
|
||||||
|
/// - Pushes unwrapped messages to [`TemporalProcessor`]
|
||||||
|
/// - Releases messages returned by [`TemporalProcessor`] to the proper channel
|
||||||
|
pub struct MessageBlend {
|
||||||
|
/// To receive new messages originated from this node
|
||||||
|
new_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||||
|
/// To receive incoming messages from the network
|
||||||
|
inbound_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||||
|
/// To release messages that are successfully processed but still wrapped
|
||||||
|
outbound_message_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||||
|
/// To release fully unwrapped messages
|
||||||
|
fully_unwrapped_message_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||||
|
/// Processors
|
||||||
|
cryptographic_processor: CryptographicProcessor,
|
||||||
|
temporal_processor: TemporalProcessor<TemporalProcessableMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageBlend {
|
||||||
|
pub fn new(
|
||||||
|
settings: MessageBlendSettings,
|
||||||
|
new_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||||
|
inbound_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||||
|
outbound_message_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||||
|
fully_unwrapped_message_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
new_message_receiver,
|
||||||
|
inbound_message_receiver,
|
||||||
|
outbound_message_sender,
|
||||||
|
fully_unwrapped_message_sender,
|
||||||
|
cryptographic_processor: CryptographicProcessor::new(settings.cryptographic_processor),
|
||||||
|
temporal_processor: TemporalProcessor::<_>::new(settings.temporal_processor),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(&mut self) {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(new_message) = self.new_message_receiver.recv() => {
|
||||||
|
self.handle_new_message(new_message);
|
||||||
|
}
|
||||||
|
Some(incoming_message) = self.inbound_message_receiver.recv() => {
|
||||||
|
self.handle_incoming_message(incoming_message);
|
||||||
|
}
|
||||||
|
Some(msg) = self.temporal_processor.next() => {
|
||||||
|
self.release_temporal_processed_message(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_new_message(&mut self, message: Vec<u8>) {
|
||||||
|
match self.cryptographic_processor.wrap_message(&message) {
|
||||||
|
Ok(wrapped_message) => {
|
||||||
|
// Bypass Temporal Processor, and send the message to the outbound channel directly
|
||||||
|
// because the message is originated from this node.
|
||||||
|
if let Err(e) = self.outbound_message_sender.send(wrapped_message) {
|
||||||
|
tracing::error!("Failed to send message to the outbound channel: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to wrap message: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_incoming_message(&mut self, message: Vec<u8>) {
|
||||||
|
match self.cryptographic_processor.unwrap_message(&message) {
|
||||||
|
Ok((unwrapped_message, fully_unwrapped)) => {
|
||||||
|
self.temporal_processor
|
||||||
|
.push_message(TemporalProcessableMessage {
|
||||||
|
message: unwrapped_message,
|
||||||
|
fully_unwrapped,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(nomos_mix_message::Error::MsgUnwrapNotAllowed) => {
|
||||||
|
tracing::debug!("Message cannot be unwrapped by this node");
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to unwrap message: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn release_temporal_processed_message(&mut self, message: TemporalProcessableMessage) {
|
||||||
|
if message.fully_unwrapped {
|
||||||
|
if let Err(e) = self.fully_unwrapped_message_sender.send(message.message) {
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to send fully unwrapped message to the fully unwrapped channel: {e:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else if let Err(e) = self.outbound_message_sender.send(message.message) {
|
||||||
|
tracing::error!("Failed to send message to the outbound channel: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct TemporalProcessableMessage {
|
||||||
|
message: Vec<u8>,
|
||||||
|
fully_unwrapped: bool,
|
||||||
|
}
|
99
nomos-mix/core/src/message_blend/temporal.rs
Normal file
99
nomos-mix/core/src/message_blend/temporal.rs
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
use std::{
|
||||||
|
collections::VecDeque,
|
||||||
|
pin::Pin,
|
||||||
|
task::{Context, Poll},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures::{Future, Stream};
|
||||||
|
use rand::Rng;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
|
/// [`TemporalProcessor`] delays messages randomly to hide timing correlation
|
||||||
|
/// between incoming and outgoing messages from a node.
|
||||||
|
///
|
||||||
|
/// See the [`Stream`] implementation below for more details on how it works.
|
||||||
|
pub(crate) struct TemporalProcessor<M> {
|
||||||
|
settings: TemporalProcessorSettings,
|
||||||
|
// All scheduled messages
|
||||||
|
queue: VecDeque<M>,
|
||||||
|
/// Interval in seconds for running the lottery to release a message
|
||||||
|
lottery_interval: time::Interval,
|
||||||
|
/// To wait a few seconds after running the lottery before releasing the message.
|
||||||
|
/// The lottery returns how long to wait before releasing the message.
|
||||||
|
release_timer: Option<Pin<Box<time::Sleep>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct TemporalProcessorSettings {
|
||||||
|
pub max_delay_seconds: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M> TemporalProcessor<M> {
|
||||||
|
pub(crate) fn new(settings: TemporalProcessorSettings) -> Self {
|
||||||
|
let lottery_interval = Self::lottery_interval(settings.max_delay_seconds);
|
||||||
|
Self {
|
||||||
|
settings,
|
||||||
|
queue: VecDeque::new(),
|
||||||
|
lottery_interval,
|
||||||
|
release_timer: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create [`time::Interval`] for running the lottery to release a message.
|
||||||
|
fn lottery_interval(max_delay_seconds: u64) -> time::Interval {
|
||||||
|
time::interval(Duration::from_secs(Self::lottery_interval_seconds(
|
||||||
|
max_delay_seconds,
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Calculate the interval in seconds for running the lottery.
|
||||||
|
/// The lottery interval is half of the maximum delay,
|
||||||
|
/// in order to guarantee that the interval between two subsequent message emissions
|
||||||
|
/// is at most [`max_delay_seconds`].
|
||||||
|
fn lottery_interval_seconds(max_delay_seconds: u64) -> u64 {
|
||||||
|
max_delay_seconds / 2
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run the lottery to determine the delay before releasing a message.
|
||||||
|
/// The delay is in [0, `lottery_interval_seconds`).
|
||||||
|
fn run_lottery(&self) -> u64 {
|
||||||
|
let interval = Self::lottery_interval_seconds(self.settings.max_delay_seconds);
|
||||||
|
rand::thread_rng().gen_range(0..interval)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedule a message to be released later.
|
||||||
|
pub(crate) fn push_message(&mut self, message: M) {
|
||||||
|
self.queue.push_back(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<M> Stream for TemporalProcessor<M>
|
||||||
|
where
|
||||||
|
M: Unpin + Clone + 'static,
|
||||||
|
{
|
||||||
|
type Item = M;
|
||||||
|
|
||||||
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
// Check whether it's time to run a new lottery to determine the delay.
|
||||||
|
if self.lottery_interval.poll_tick(cx).is_ready() {
|
||||||
|
let delay = self.run_lottery();
|
||||||
|
// Set timer to release the message after the delay.
|
||||||
|
self.release_timer = Some(Box::pin(time::sleep(Duration::from_secs(delay))));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check whether the release timer is done if it exists.
|
||||||
|
if let Some(timer) = self.release_timer.as_mut() {
|
||||||
|
if timer.as_mut().poll(cx).is_ready() {
|
||||||
|
self.release_timer.take(); // Reset timer after it's done
|
||||||
|
if let Some(msg) = self.queue.pop_front() {
|
||||||
|
// Release the 1st message in the queue if it exists.
|
||||||
|
return Poll::Ready(Some(msg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
191
nomos-mix/core/src/persistent_transmission.rs
Normal file
191
nomos-mix/core/src/persistent_transmission.rs
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use nomos_mix_message::DROP_MESSAGE;
|
||||||
|
use rand::{distributions::Uniform, prelude::Distribution, Rng, SeedableRng};
|
||||||
|
use rand_chacha::ChaCha12Rng;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::{
|
||||||
|
sync::mpsc::{self, error::TryRecvError},
|
||||||
|
time,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct PersistentTransmissionSettings {
|
||||||
|
/// The maximum number of messages that can be emitted per second
|
||||||
|
pub max_emission_frequency: f64,
|
||||||
|
/// The probability of emitting a drop message by coin flipping
|
||||||
|
pub drop_message_probability: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for PersistentTransmissionSettings {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
max_emission_frequency: 1.0,
|
||||||
|
drop_message_probability: 0.5,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Transmit scheduled messages with a persistent rate to the transmission channel.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `settings` - The settings for the persistent transmission
|
||||||
|
/// * `schedule_receiver` - The channel for messages scheduled (from Tier 2 currently)
|
||||||
|
/// * `emission_sender` - The channel to emit messages
|
||||||
|
pub async fn persistent_transmission(
|
||||||
|
settings: PersistentTransmissionSettings,
|
||||||
|
schedule_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||||
|
emission_sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||||
|
) {
|
||||||
|
let mut schedule_receiver = schedule_receiver;
|
||||||
|
let mut interval = time::interval(Duration::from_secs_f64(
|
||||||
|
1.0 / settings.max_emission_frequency,
|
||||||
|
));
|
||||||
|
let mut coin = Coin::<_>::new(
|
||||||
|
ChaCha12Rng::from_entropy(),
|
||||||
|
settings.drop_message_probability,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
// Emit the first one of the scheduled messages.
|
||||||
|
// If there is no scheduled message, emit a drop message with probability.
|
||||||
|
match schedule_receiver.try_recv() {
|
||||||
|
Ok(msg) => {
|
||||||
|
if let Err(e) = emission_sender.send(msg) {
|
||||||
|
tracing::error!("Failed to send message to the transmission channel: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(TryRecvError::Empty) => {
|
||||||
|
// If the coin is head, emit the drop message.
|
||||||
|
if coin.flip() {
|
||||||
|
if let Err(e) = emission_sender.send(DROP_MESSAGE.to_vec()) {
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to send drop message to the transmission channel: {e:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(TryRecvError::Disconnected) => {
|
||||||
|
tracing::error!("The schedule channel has been closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Coin<R: Rng> {
|
||||||
|
rng: R,
|
||||||
|
distribution: Uniform<f64>,
|
||||||
|
probability: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: Rng> Coin<R> {
|
||||||
|
fn new(rng: R, probability: f64) -> Result<Self, CoinError> {
|
||||||
|
if !(0.0..=1.0).contains(&probability) {
|
||||||
|
return Err(CoinError::InvalidProbability);
|
||||||
|
}
|
||||||
|
Ok(Self {
|
||||||
|
rng,
|
||||||
|
distribution: Uniform::from(0.0..1.0),
|
||||||
|
probability,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flip the coin based on the given probability.
|
||||||
|
fn flip(&mut self) -> bool {
|
||||||
|
self.distribution.sample(&mut self.rng) < self.probability
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum CoinError {
|
||||||
|
InvalidProbability,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
macro_rules! assert_interval {
|
||||||
|
($last_time:expr, $lower_bound:expr, $upper_bound:expr) => {
|
||||||
|
let now = time::Instant::now();
|
||||||
|
let interval = now.duration_since(*$last_time);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
interval >= $lower_bound,
|
||||||
|
"interval {:?} is too short. lower_bound: {:?}",
|
||||||
|
interval,
|
||||||
|
$lower_bound,
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
interval <= $upper_bound,
|
||||||
|
"interval {:?} is too long. upper_bound: {:?}",
|
||||||
|
interval,
|
||||||
|
$upper_bound,
|
||||||
|
);
|
||||||
|
|
||||||
|
*$last_time = now;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_persistent_transmission() {
|
||||||
|
let (schedule_sender, schedule_receiver) = mpsc::unbounded_channel();
|
||||||
|
let (emission_sender, mut emission_receiver) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let settings = PersistentTransmissionSettings {
|
||||||
|
max_emission_frequency: 1.0,
|
||||||
|
// Set to always emit drop messages if no scheduled messages for easy testing
|
||||||
|
drop_message_probability: 1.0,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Prepare the expected emission interval with torelance
|
||||||
|
let expected_emission_interval =
|
||||||
|
Duration::from_secs_f64(1.0 / settings.max_emission_frequency);
|
||||||
|
let torelance = expected_emission_interval / 10; // 10% torelance
|
||||||
|
let lower_bound = expected_emission_interval - torelance;
|
||||||
|
let upper_bound = expected_emission_interval + torelance;
|
||||||
|
|
||||||
|
// Start the persistent transmission and schedule messages
|
||||||
|
tokio::spawn(persistent_transmission(
|
||||||
|
settings,
|
||||||
|
schedule_receiver,
|
||||||
|
emission_sender,
|
||||||
|
));
|
||||||
|
// Messages must be scheduled in non-blocking manner.
|
||||||
|
schedule_sender.send(vec![1]).unwrap();
|
||||||
|
schedule_sender.send(vec![2]).unwrap();
|
||||||
|
schedule_sender.send(vec![3]).unwrap();
|
||||||
|
|
||||||
|
// Check if expected messages are emitted with the expected interval
|
||||||
|
assert_eq!(emission_receiver.recv().await.unwrap(), vec![1]);
|
||||||
|
let mut last_time = time::Instant::now();
|
||||||
|
|
||||||
|
assert_eq!(emission_receiver.recv().await.unwrap(), vec![2]);
|
||||||
|
assert_interval!(&mut last_time, lower_bound, upper_bound);
|
||||||
|
|
||||||
|
assert_eq!(emission_receiver.recv().await.unwrap(), vec![3]);
|
||||||
|
assert_interval!(&mut last_time, lower_bound, upper_bound);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
emission_receiver.recv().await.unwrap(),
|
||||||
|
DROP_MESSAGE.to_vec()
|
||||||
|
);
|
||||||
|
assert_interval!(&mut last_time, lower_bound, upper_bound);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
emission_receiver.recv().await.unwrap(),
|
||||||
|
DROP_MESSAGE.to_vec()
|
||||||
|
);
|
||||||
|
assert_interval!(&mut last_time, lower_bound, upper_bound);
|
||||||
|
|
||||||
|
// Schedule a new message and check if it is emitted at the next interval
|
||||||
|
schedule_sender.send(vec![4]).unwrap();
|
||||||
|
assert_eq!(emission_receiver.recv().await.unwrap(), vec![4]);
|
||||||
|
assert_interval!(&mut last_time, lower_bound, upper_bound);
|
||||||
|
}
|
||||||
|
}
|
@ -9,8 +9,8 @@ futures = "0.3.30"
|
|||||||
futures-timer = "3.0.3"
|
futures-timer = "3.0.3"
|
||||||
libp2p = "0.53"
|
libp2p = "0.53"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
|
nomos-mix = { path = "../core" }
|
||||||
nomos-mix-message = { path = "../message" }
|
nomos-mix-message = { path = "../message" }
|
||||||
nomos-mix-queue = { path = "../queue" }
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
|
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
|
||||||
|
@ -1,7 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "nomos-mix-queue"
|
|
||||||
version = "0.1.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
rand = "0.8.5"
|
|
@ -1,39 +0,0 @@
|
|||||||
use std::collections::VecDeque;
|
|
||||||
|
|
||||||
/// A [`Queue`] controls the order of messages to be emitted to a single connection.
|
|
||||||
pub trait Queue<T> {
|
|
||||||
/// Push a message to the queue.
|
|
||||||
fn push(&mut self, data: T);
|
|
||||||
|
|
||||||
/// Pop a message from the queue.
|
|
||||||
///
|
|
||||||
/// The returned message is either the real message pushed before or a noise message.
|
|
||||||
fn pop(&mut self) -> T;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A regular queue that does not mix the order of messages.
|
|
||||||
///
|
|
||||||
/// This queue returns a noise message if the queue is empty.
|
|
||||||
pub struct NonMixQueue<T: Clone> {
|
|
||||||
queue: VecDeque<T>,
|
|
||||||
noise: T,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Clone> NonMixQueue<T> {
|
|
||||||
pub fn new(noise: T) -> Self {
|
|
||||||
Self {
|
|
||||||
queue: VecDeque::new(),
|
|
||||||
noise,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Clone> Queue<T> for NonMixQueue<T> {
|
|
||||||
fn push(&mut self, data: T) {
|
|
||||||
self.queue.push_back(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn pop(&mut self) -> T {
|
|
||||||
self.queue.pop_front().unwrap_or(self.noise.clone())
|
|
||||||
}
|
|
||||||
}
|
|
@ -24,6 +24,7 @@ nomos-mempool = { path = "../../../nomos-services/mempool" }
|
|||||||
nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb-backend"] }
|
nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb-backend"] }
|
||||||
nomos-network = { path = "../../network", features = ["mock"] }
|
nomos-network = { path = "../../network", features = ["mock"] }
|
||||||
nomos-mix-service = { path = "../../mix" }
|
nomos-mix-service = { path = "../../mix" }
|
||||||
|
nomos-mix = { path = "../../../nomos-mix/core" }
|
||||||
nomos-libp2p = { path = "../../../nomos-libp2p" }
|
nomos-libp2p = { path = "../../../nomos-libp2p" }
|
||||||
libp2p = { version = "0.53.2", features = ["ed25519"] }
|
libp2p = { version = "0.53.2", features = ["ed25519"] }
|
||||||
once_cell = "1.19"
|
once_cell = "1.19"
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
// std
|
// std
|
||||||
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
|
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
|
||||||
|
use nomos_mix::message_blend::{
|
||||||
|
CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings,
|
||||||
|
};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
// crates
|
// crates
|
||||||
@ -207,6 +210,13 @@ pub fn new_node(
|
|||||||
},
|
},
|
||||||
mix: MixConfig {
|
mix: MixConfig {
|
||||||
backend: mix_config.clone(),
|
backend: mix_config.clone(),
|
||||||
|
persistent_transmission: Default::default(),
|
||||||
|
message_blend: MessageBlendSettings {
|
||||||
|
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
|
||||||
|
temporal_processor: TemporalProcessorSettings {
|
||||||
|
max_delay_seconds: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
da_network: DaNetworkConfig {
|
da_network: DaNetworkConfig {
|
||||||
backend: DaNetworkBackendSettings {
|
backend: DaNetworkBackendSettings {
|
||||||
|
@ -8,6 +8,7 @@ async-trait = "0.1"
|
|||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
libp2p = { version = "0.53", features = ["ed25519"] }
|
libp2p = { version = "0.53", features = ["ed25519"] }
|
||||||
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
|
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
|
||||||
|
nomos-mix = { path = "../../nomos-mix/core" }
|
||||||
nomos-core = { path = "../../nomos-core/chain-defs" }
|
nomos-core = { path = "../../nomos-core/chain-defs" }
|
||||||
nomos-mix-network = { path = "../../nomos-mix/network" }
|
nomos-mix-network = { path = "../../nomos-mix/network" }
|
||||||
nomos-mix-message = { path = "../../nomos-mix/message" }
|
nomos-mix-message = { path = "../../nomos-mix/message" }
|
||||||
|
@ -8,7 +8,10 @@ use backends::MixBackend;
|
|||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use network::NetworkAdapter;
|
use network::NetworkAdapter;
|
||||||
use nomos_core::wire;
|
use nomos_core::wire;
|
||||||
use nomos_mix_message::{new_message, unwrap_message};
|
use nomos_mix::{
|
||||||
|
message_blend::{MessageBlend, MessageBlendSettings},
|
||||||
|
persistent_transmission::{persistent_transmission, PersistentTransmissionSettings},
|
||||||
|
};
|
||||||
use nomos_network::NetworkService;
|
use nomos_network::NetworkService;
|
||||||
use overwatch_rs::services::{
|
use overwatch_rs::services::{
|
||||||
handle::ServiceStateHandle,
|
handle::ServiceStateHandle,
|
||||||
@ -18,6 +21,7 @@ use overwatch_rs::services::{
|
|||||||
ServiceCore, ServiceData, ServiceId,
|
ServiceCore, ServiceData, ServiceId,
|
||||||
};
|
};
|
||||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
/// A mix service that sends messages to the mix network
|
/// A mix service that sends messages to the mix network
|
||||||
/// and broadcasts fully unwrapped messages through the [`NetworkService`].
|
/// and broadcasts fully unwrapped messages through the [`NetworkService`].
|
||||||
@ -77,12 +81,41 @@ where
|
|||||||
mut backend,
|
mut backend,
|
||||||
network_relay,
|
network_relay,
|
||||||
} = self;
|
} = self;
|
||||||
|
let mix_config = service_state.settings_reader.get_updated_settings();
|
||||||
|
|
||||||
let network_relay = network_relay.connect().await?;
|
let network_relay = network_relay.connect().await?;
|
||||||
let network_adapter = Network::new(network_relay);
|
let network_adapter = Network::new(network_relay);
|
||||||
|
|
||||||
// TODO: Spawn PersistentTransmission (Tier 1)
|
// Spawn Persistent Transmission
|
||||||
// TODO: Spawn Processor (Tier 2) and connect it to PersistentTransmission
|
let (transmission_schedule_sender, transmission_schedule_receiver) =
|
||||||
|
mpsc::unbounded_channel();
|
||||||
|
let (emission_sender, mut emission_receiver) = mpsc::unbounded_channel();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
persistent_transmission(
|
||||||
|
mix_config.persistent_transmission,
|
||||||
|
transmission_schedule_receiver,
|
||||||
|
emission_sender,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Spawn Message Blend and connect it to Persistent Transmission
|
||||||
|
let (new_message_sender, new_message_receiver) = mpsc::unbounded_channel();
|
||||||
|
let (processor_inbound_sender, processor_inbound_receiver) = mpsc::unbounded_channel();
|
||||||
|
let (fully_unwrapped_message_sender, mut fully_unwrapped_message_receiver) =
|
||||||
|
mpsc::unbounded_channel();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
MessageBlend::new(
|
||||||
|
mix_config.message_blend,
|
||||||
|
new_message_receiver,
|
||||||
|
processor_inbound_receiver,
|
||||||
|
// Connect the outputs of Message Blend to Persistent Transmission
|
||||||
|
transmission_schedule_sender,
|
||||||
|
fully_unwrapped_message_sender,
|
||||||
|
)
|
||||||
|
.run()
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
|
||||||
// A channel to listen to messages received from the [`MixBackend`]
|
// A channel to listen to messages received from the [`MixBackend`]
|
||||||
let mut incoming_message_stream = backend.listen_to_incoming_messages();
|
let mut incoming_message_stream = backend.listen_to_incoming_messages();
|
||||||
@ -91,11 +124,17 @@ where
|
|||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(msg) = incoming_message_stream.next() => {
|
Some(msg) = incoming_message_stream.next() => {
|
||||||
// TODO: The following logic is wrong and temporary.
|
tracing::debug!("Received message from mix backend. Sending it to Processor");
|
||||||
// Here we're unwrapping the message and broadcasting it,
|
if let Err(e) = processor_inbound_sender.send(msg) {
|
||||||
// but the message should be handled by Processor and PersistentTransmission.
|
tracing::error!("Failed to send incoming message to processor: {e:?}");
|
||||||
let (msg, fully_unwrapped) = unwrap_message(&msg).unwrap();
|
}
|
||||||
assert!(fully_unwrapped);
|
}
|
||||||
|
Some(msg) = emission_receiver.recv() => {
|
||||||
|
tracing::debug!("Emitting message to mix network");
|
||||||
|
backend.publish(msg).await;
|
||||||
|
}
|
||||||
|
Some(msg) = fully_unwrapped_message_receiver.recv() => {
|
||||||
|
tracing::debug!("Broadcasting fully unwrapped message");
|
||||||
match wire::deserialize::<NetworkMessage<Network::BroadcastSettings>>(&msg) {
|
match wire::deserialize::<NetworkMessage<Network::BroadcastSettings>>(&msg) {
|
||||||
Ok(msg) => {
|
Ok(msg) => {
|
||||||
network_adapter.broadcast(msg.message, msg.broadcast_settings).await;
|
network_adapter.broadcast(msg.message, msg.broadcast_settings).await;
|
||||||
@ -104,7 +143,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(msg) = service_state.inbound_relay.recv() => {
|
Some(msg) = service_state.inbound_relay.recv() => {
|
||||||
Self::handle_service_message(msg, &mut backend).await;
|
Self::handle_service_message(msg, &new_message_sender);
|
||||||
}
|
}
|
||||||
Some(msg) = lifecycle_stream.next() => {
|
Some(msg) = lifecycle_stream.next() => {
|
||||||
if Self::should_stop_service(msg).await {
|
if Self::should_stop_service(msg).await {
|
||||||
@ -125,20 +164,16 @@ where
|
|||||||
Network: NetworkAdapter,
|
Network: NetworkAdapter,
|
||||||
Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned,
|
Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned,
|
||||||
{
|
{
|
||||||
async fn handle_service_message(
|
fn handle_service_message(
|
||||||
msg: ServiceMessage<Network::BroadcastSettings>,
|
msg: ServiceMessage<Network::BroadcastSettings>,
|
||||||
backend: &mut Backend,
|
new_message_sender: &mpsc::UnboundedSender<Vec<u8>>,
|
||||||
) {
|
) {
|
||||||
match msg {
|
match msg {
|
||||||
ServiceMessage::Mix(msg) => {
|
ServiceMessage::Mix(msg) => {
|
||||||
// split sending in two steps to help the compiler understand we do not
|
// Serialize the new message and send it to the Processor
|
||||||
// need to hold an instance of &I (which is not send) across an await point
|
if let Err(e) = new_message_sender.send(wire::serialize(&msg).unwrap()) {
|
||||||
// TODO: The following logic is wrong and temporary.
|
tracing::error!("Failed to send a new message to processor: {e:?}");
|
||||||
// Here we're wrapping the message here and publishing the message immediately,
|
}
|
||||||
// but the message should be handled by Processor and PersistentTransmission.
|
|
||||||
let wrapped_msg = new_message(&wire::serialize(&msg).unwrap(), 1).unwrap();
|
|
||||||
let _send = backend.publish(wrapped_msg);
|
|
||||||
_send.await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -163,6 +198,8 @@ where
|
|||||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
pub struct MixConfig<BackendSettings> {
|
pub struct MixConfig<BackendSettings> {
|
||||||
pub backend: BackendSettings,
|
pub backend: BackendSettings,
|
||||||
|
pub persistent_transmission: PersistentTransmissionSettings,
|
||||||
|
pub message_blend: MessageBlendSettings,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A message that is handled by [`MixService`].
|
/// A message that is handled by [`MixService`].
|
||||||
|
@ -11,6 +11,7 @@ nomos-node = { path = "../nodes/nomos-node", default-features = false }
|
|||||||
nomos-executor = { path = "../nodes/nomos-executor", default-features = false }
|
nomos-executor = { path = "../nodes/nomos-executor", default-features = false }
|
||||||
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
|
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
|
||||||
nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] }
|
nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] }
|
||||||
|
nomos-mix = { path = "../nomos-mix/core" }
|
||||||
cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" }
|
cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" }
|
||||||
nomos-tracing = { path = "../nomos-tracing" }
|
nomos-tracing = { path = "../nomos-tracing" }
|
||||||
nomos-tracing-service = { path = "../nomos-services/tracing" }
|
nomos-tracing-service = { path = "../nomos-services/tracing" }
|
||||||
|
@ -20,6 +20,9 @@ use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as Verif
|
|||||||
use nomos_da_verifier::DaVerifierServiceSettings;
|
use nomos_da_verifier::DaVerifierServiceSettings;
|
||||||
use nomos_executor::api::backend::AxumBackendSettings;
|
use nomos_executor::api::backend::AxumBackendSettings;
|
||||||
use nomos_executor::config::Config;
|
use nomos_executor::config::Config;
|
||||||
|
use nomos_mix::message_blend::{
|
||||||
|
CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings,
|
||||||
|
};
|
||||||
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
|
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
|
||||||
use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE};
|
use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE};
|
||||||
use nomos_node::RocksBackendSettings;
|
use nomos_node::RocksBackendSettings;
|
||||||
@ -154,6 +157,13 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
|
|||||||
},
|
},
|
||||||
mix: nomos_mix_service::MixConfig {
|
mix: nomos_mix_service::MixConfig {
|
||||||
backend: config.mix_config.backend,
|
backend: config.mix_config.backend,
|
||||||
|
persistent_transmission: Default::default(),
|
||||||
|
message_blend: MessageBlendSettings {
|
||||||
|
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
|
||||||
|
temporal_processor: TemporalProcessorSettings {
|
||||||
|
max_delay_seconds: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
cryptarchia: CryptarchiaSettings {
|
cryptarchia: CryptarchiaSettings {
|
||||||
notes: config.consensus_config.notes,
|
notes: config.consensus_config.notes,
|
||||||
|
@ -14,6 +14,9 @@ use nomos_da_sampling::{backend::kzgrs::KzgrsSamplingBackendSettings, DaSampling
|
|||||||
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
|
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
|
||||||
use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings};
|
use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings};
|
||||||
use nomos_mempool::MempoolMetrics;
|
use nomos_mempool::MempoolMetrics;
|
||||||
|
use nomos_mix::message_blend::{
|
||||||
|
CryptographicProcessorSettings, MessageBlendSettings, TemporalProcessorSettings,
|
||||||
|
};
|
||||||
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
|
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
|
||||||
use nomos_node::api::paths::{
|
use nomos_node::api::paths::{
|
||||||
CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK,
|
CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK,
|
||||||
@ -239,6 +242,13 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
|
|||||||
},
|
},
|
||||||
mix: nomos_mix_service::MixConfig {
|
mix: nomos_mix_service::MixConfig {
|
||||||
backend: config.mix_config.backend,
|
backend: config.mix_config.backend,
|
||||||
|
persistent_transmission: Default::default(),
|
||||||
|
message_blend: MessageBlendSettings {
|
||||||
|
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
|
||||||
|
temporal_processor: TemporalProcessorSettings {
|
||||||
|
max_delay_seconds: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
cryptarchia: CryptarchiaSettings {
|
cryptarchia: CryptarchiaSettings {
|
||||||
notes: config.consensus_config.notes,
|
notes: config.consensus_config.notes,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user