1
0
mirror of synced 2025-01-09 23:35:46 +00:00

Mix: Use streams in message blend and mix service (#896)

* Use streams in message blend and mix service

* Refactor temporal stream method naming

* Clippy happy

* Clippy test happy

* Undo coupling

* Remove bypassing from blend

* Clippy happy

* Un-entangle tiers

* Send local messages to persistent transmission

Co-authored-by: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com>

---------

Co-authored-by: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com>
This commit is contained in:
Daniel Sanchez 2024-11-02 22:57:10 +01:00 committed by GitHub
parent 2f92c183ab
commit c237333791
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 105 additions and 352 deletions

View File

@ -1,2 +1,16 @@
pub mod message_blend; pub mod message_blend;
pub mod persistent_transmission; pub mod persistent_transmission;
pub enum MixOutgoingMessage {
FullyUnwrapped(Vec<u8>),
Outbound(Vec<u8>),
}
impl From<MixOutgoingMessage> for Vec<u8> {
fn from(value: MixOutgoingMessage) -> Self {
match value {
MixOutgoingMessage::FullyUnwrapped(v) => v,
MixOutgoingMessage::Outbound(v) => v,
}
}
}

View File

@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
/// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages /// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages
/// for the message indistinguishability. /// for the message indistinguishability.
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub(crate) struct CryptographicProcessor { pub struct CryptographicProcessor {
settings: CryptographicProcessorSettings, settings: CryptographicProcessorSettings,
} }
@ -14,17 +14,17 @@ pub struct CryptographicProcessorSettings {
} }
impl CryptographicProcessor { impl CryptographicProcessor {
pub(crate) fn new(settings: CryptographicProcessorSettings) -> Self { pub fn new(settings: CryptographicProcessorSettings) -> Self {
Self { settings } Self { settings }
} }
pub(crate) fn wrap_message(&self, message: &[u8]) -> Result<Vec<u8>, nomos_mix_message::Error> { pub fn wrap_message(&self, message: &[u8]) -> Result<Vec<u8>, nomos_mix_message::Error> {
// TODO: Use the actual Sphinx encoding instead of mock. // TODO: Use the actual Sphinx encoding instead of mock.
// TODO: Select `num_mix_layers` random nodes from the membership. // TODO: Select `num_mix_layers` random nodes from the membership.
new_message(message, self.settings.num_mix_layers.try_into().unwrap()) new_message(message, self.settings.num_mix_layers.try_into().unwrap())
} }
pub(crate) fn unwrap_message( pub fn unwrap_message(
&self, &self,
message: &[u8], message: &[u8],
) -> Result<(Vec<u8>, bool), nomos_mix_message::Error> { ) -> Result<(Vec<u8>, bool), nomos_mix_message::Error> {

View File

@ -1,5 +1,5 @@
mod crypto; pub mod crypto;
mod temporal; pub mod temporal;
pub use crypto::CryptographicProcessorSettings; pub use crypto::CryptographicProcessorSettings;
use futures::stream::BoxStream; use futures::stream::BoxStream;
@ -8,8 +8,9 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub use temporal::TemporalProcessorSettings; pub use temporal::TemporalProcessorSettings;
use crate::message_blend::crypto::CryptographicProcessor;
use crate::message_blend::temporal::TemporalProcessorExt; use crate::message_blend::temporal::TemporalProcessorExt;
use crate::message_blend::{crypto::CryptographicProcessor, temporal::TemporalProcessor}; use crate::MixOutgoingMessage;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
@ -21,175 +22,41 @@ pub struct MessageBlendSettings {
pub temporal_processor: TemporalProcessorSettings, pub temporal_processor: TemporalProcessorSettings,
} }
/// [`MessageBlend`] handles the entire Tier-2 spec. /// [`MessageBlendStream`] handles the entire mixing tiers process
/// - Wraps new messages using [`CryptographicProcessor`]
/// - Unwraps incoming messages received from network using [`CryptographicProcessor`] /// - Unwraps incoming messages received from network using [`CryptographicProcessor`]
/// - Pushes unwrapped messages to [`TemporalProcessor`] /// - Pushes unwrapped messages to [`TemporalProcessor`]
/// - Releases messages returned by [`TemporalProcessor`] to the proper channel
pub struct MessageBlend {
/// To receive new messages originated from this node
new_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
/// To receive incoming messages from the network
inbound_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
/// To release messages that are successfully processed but still wrapped
outbound_message_sender: mpsc::UnboundedSender<Vec<u8>>,
/// To release fully unwrapped messages
fully_unwrapped_message_sender: mpsc::UnboundedSender<Vec<u8>>,
/// Processors
cryptographic_processor: CryptographicProcessor,
temporal_processor: TemporalProcessor<TemporalProcessableMessage>,
}
impl MessageBlend {
pub fn new(
settings: MessageBlendSettings,
new_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
inbound_message_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
outbound_message_sender: mpsc::UnboundedSender<Vec<u8>>,
fully_unwrapped_message_sender: mpsc::UnboundedSender<Vec<u8>>,
) -> Self {
Self {
new_message_receiver,
inbound_message_receiver,
outbound_message_sender,
fully_unwrapped_message_sender,
cryptographic_processor: CryptographicProcessor::new(settings.cryptographic_processor),
temporal_processor: TemporalProcessor::<_>::new(settings.temporal_processor),
}
}
pub async fn run(&mut self) {
loop {
tokio::select! {
Some(new_message) = self.new_message_receiver.recv() => {
self.handle_new_message(new_message);
}
Some(incoming_message) = self.inbound_message_receiver.recv() => {
self.handle_incoming_message(incoming_message);
}
Some(msg) = self.temporal_processor.next() => {
self.release_temporal_processed_message(msg);
}
}
}
}
fn handle_new_message(&mut self, message: Vec<u8>) {
match self.cryptographic_processor.wrap_message(&message) {
Ok(wrapped_message) => {
// Bypass Temporal Processor, and send the message to the outbound channel directly
// because the message is originated from this node.
if let Err(e) = self.outbound_message_sender.send(wrapped_message) {
tracing::error!("Failed to send message to the outbound channel: {e:?}");
}
}
Err(e) => {
tracing::error!("Failed to wrap message: {:?}", e);
}
}
}
fn handle_incoming_message(&mut self, message: Vec<u8>) {
match self.cryptographic_processor.unwrap_message(&message) {
Ok((unwrapped_message, fully_unwrapped)) => {
self.temporal_processor
.push_message(TemporalProcessableMessage {
message: unwrapped_message,
fully_unwrapped,
});
}
Err(nomos_mix_message::Error::MsgUnwrapNotAllowed) => {
tracing::debug!("Message cannot be unwrapped by this node");
}
Err(e) => {
tracing::error!("Failed to unwrap message: {:?}", e);
}
}
}
fn release_temporal_processed_message(&mut self, message: TemporalProcessableMessage) {
if message.fully_unwrapped {
if let Err(e) = self.fully_unwrapped_message_sender.send(message.message) {
tracing::error!(
"Failed to send fully unwrapped message to the fully unwrapped channel: {e:?}"
);
}
} else if let Err(e) = self.outbound_message_sender.send(message.message) {
tracing::error!("Failed to send message to the outbound channel: {e:?}");
}
}
}
#[derive(Clone)]
struct TemporalProcessableMessage {
message: Vec<u8>,
fully_unwrapped: bool,
}
pub enum MessageBlendStreamIncomingMessage {
Local(Vec<u8>),
Inbound(Vec<u8>),
}
pub enum MessageBlendStreamOutgoingMessage {
FullyUnwrapped(Vec<u8>),
Outbound(Vec<u8>),
}
pub struct MessageBlendStream<S> { pub struct MessageBlendStream<S> {
input_stream: S, input_stream: S,
output_stream: BoxStream<'static, MessageBlendStreamOutgoingMessage>, output_stream: BoxStream<'static, MixOutgoingMessage>,
bypass_sender: UnboundedSender<MessageBlendStreamOutgoingMessage>, temporal_sender: UnboundedSender<MixOutgoingMessage>,
temporal_sender: UnboundedSender<MessageBlendStreamOutgoingMessage>,
cryptographic_processor: CryptographicProcessor, cryptographic_processor: CryptographicProcessor,
} }
impl<S> MessageBlendStream<S> impl<S> MessageBlendStream<S>
where where
S: Stream<Item = MessageBlendStreamIncomingMessage>, S: Stream<Item = Vec<u8>>,
{ {
pub fn new(input_stream: S, settings: MessageBlendSettings) -> Self { pub fn new(input_stream: S, settings: MessageBlendSettings) -> Self {
let cryptographic_processor = CryptographicProcessor::new(settings.cryptographic_processor); let cryptographic_processor = CryptographicProcessor::new(settings.cryptographic_processor);
let (bypass_sender, bypass_receiver) = mpsc::unbounded_channel();
let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel(); let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel();
let output_stream = tokio_stream::StreamExt::merge( let output_stream = UnboundedReceiverStream::new(temporal_receiver)
UnboundedReceiverStream::new(bypass_receiver), .temporal_stream(settings.temporal_processor)
UnboundedReceiverStream::new(temporal_receiver)
.to_temporal_stream(settings.temporal_processor),
)
.boxed(); .boxed();
Self { Self {
input_stream, input_stream,
output_stream, output_stream,
bypass_sender,
temporal_sender, temporal_sender,
cryptographic_processor, cryptographic_processor,
} }
} }
fn process_new_message(self: &mut Pin<&mut Self>, message: Vec<u8>) {
match self.cryptographic_processor.wrap_message(&message) {
Ok(wrapped_message) => {
if let Err(e) = self
.bypass_sender
.send(MessageBlendStreamOutgoingMessage::Outbound(wrapped_message))
{
tracing::error!("Failed to send message to the outbound channel: {e:?}");
}
}
Err(e) => {
tracing::error!("Failed to wrap message: {:?}", e);
}
}
}
fn process_incoming_message(self: &mut Pin<&mut Self>, message: Vec<u8>) { fn process_incoming_message(self: &mut Pin<&mut Self>, message: Vec<u8>) {
match self.cryptographic_processor.unwrap_message(&message) { match self.cryptographic_processor.unwrap_message(&message) {
Ok((unwrapped_message, fully_unwrapped)) => { Ok((unwrapped_message, fully_unwrapped)) => {
let message = if fully_unwrapped { let message = if fully_unwrapped {
MessageBlendStreamOutgoingMessage::FullyUnwrapped(unwrapped_message) MixOutgoingMessage::FullyUnwrapped(unwrapped_message)
} else { } else {
MessageBlendStreamOutgoingMessage::Outbound(unwrapped_message) MixOutgoingMessage::Outbound(unwrapped_message)
}; };
if let Err(e) = self.temporal_sender.send(message) { if let Err(e) = self.temporal_sender.send(message) {
tracing::error!("Failed to send message to the outbound channel: {e:?}"); tracing::error!("Failed to send message to the outbound channel: {e:?}");
@ -207,31 +74,25 @@ where
impl<S> Stream for MessageBlendStream<S> impl<S> Stream for MessageBlendStream<S>
where where
S: Stream<Item = MessageBlendStreamIncomingMessage> + Unpin, S: Stream<Item = Vec<u8>> + Unpin,
{ {
type Item = MessageBlendStreamOutgoingMessage; type Item = MixOutgoingMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.input_stream.poll_next_unpin(cx) { if let Poll::Ready(Some(message)) = self.input_stream.poll_next_unpin(cx) {
Poll::Ready(Some(MessageBlendStreamIncomingMessage::Local(message))) => {
self.process_new_message(message);
}
Poll::Ready(Some(MessageBlendStreamIncomingMessage::Inbound(message))) => {
self.process_incoming_message(message); self.process_incoming_message(message);
} }
_ => {}
}
self.output_stream.poll_next_unpin(cx) self.output_stream.poll_next_unpin(cx)
} }
} }
pub trait MessageBlendExt: Stream<Item = MessageBlendStreamIncomingMessage> { pub trait MessageBlendExt: Stream<Item = Vec<u8>> {
fn blend(self, message_blend_settings: MessageBlendSettings) -> MessageBlendStream<Self> fn blend(self, message_blend_settings: MessageBlendSettings) -> MessageBlendStream<Self>
where where
Self: Sized, Self: Sized + Unpin,
{ {
MessageBlendStream::new(self, message_blend_settings) MessageBlendStream::new(self, message_blend_settings)
} }
} }
impl<T> MessageBlendExt for T where T: Stream<Item = MessageBlendStreamIncomingMessage> {} impl<T> MessageBlendExt for T where T: Stream<Item = Vec<u8>> {}

View File

@ -120,9 +120,8 @@ where
self.processor.poll_next_unpin(cx) self.processor.poll_next_unpin(cx)
} }
} }
#[allow(dead_code)] // TODO: Remove when integrating into blend
pub trait TemporalProcessorExt: Stream { pub trait TemporalProcessorExt: Stream {
fn to_temporal_stream(self, settings: TemporalProcessorSettings) -> TemporalStream<Self> fn temporal_stream(self, settings: TemporalProcessorSettings) -> TemporalStream<Self>
where where
Self: Sized, Self: Sized,
{ {

View File

@ -6,13 +6,10 @@ use serde::{Deserialize, Serialize};
use std::pin::{pin, Pin}; use std::pin::{pin, Pin};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use tokio::time;
use tokio::time::Interval; use tokio::time::Interval;
use tokio::{
sync::mpsc::{self, error::TryRecvError},
time,
};
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct PersistentTransmissionSettings { pub struct PersistentTransmissionSettings {
/// The maximum number of messages that can be emitted per second /// The maximum number of messages that can be emitted per second
pub max_emission_frequency: f64, pub max_emission_frequency: f64,
@ -29,6 +26,7 @@ impl Default for PersistentTransmissionSettings {
} }
} }
/// Transmit scheduled messages with a persistent rate as a stream.
pub struct PersistentTransmissionStream<S> pub struct PersistentTransmissionStream<S>
where where
S: Stream, S: Stream,
@ -66,7 +64,7 @@ impl<S> Stream for PersistentTransmissionStream<S>
where where
S: Stream<Item = Vec<u8>> + Unpin, S: Stream<Item = Vec<u8>> + Unpin,
{ {
type Item = S::Item; type Item = Vec<u8>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self { let Self {
@ -102,57 +100,6 @@ pub trait PersistentTransmissionExt: Stream {
impl<S> PersistentTransmissionExt for S where S: Stream {} impl<S> PersistentTransmissionExt for S where S: Stream {}
/// Transmit scheduled messages with a persistent rate to the transmission channel.
///
/// # Arguments
///
/// * `settings` - The settings for the persistent transmission
/// * `schedule_receiver` - The channel for messages scheduled (from Tier 2 currently)
/// * `emission_sender` - The channel to emit messages
pub async fn persistent_transmission(
settings: PersistentTransmissionSettings,
schedule_receiver: mpsc::UnboundedReceiver<Vec<u8>>,
emission_sender: mpsc::UnboundedSender<Vec<u8>>,
) {
let mut schedule_receiver = schedule_receiver;
let mut interval = time::interval(Duration::from_secs_f64(
1.0 / settings.max_emission_frequency,
));
let mut coin = Coin::<_>::new(
ChaCha12Rng::from_entropy(),
settings.drop_message_probability,
)
.unwrap();
loop {
interval.tick().await;
// Emit the first one of the scheduled messages.
// If there is no scheduled message, emit a drop message with probability.
match schedule_receiver.try_recv() {
Ok(msg) => {
if let Err(e) = emission_sender.send(msg) {
tracing::error!("Failed to send message to the transmission channel: {e:?}");
}
}
Err(TryRecvError::Empty) => {
// If the coin is head, emit the drop message.
if coin.flip() {
if let Err(e) = emission_sender.send(DROP_MESSAGE.to_vec()) {
tracing::error!(
"Failed to send drop message to the transmission channel: {e:?}"
);
}
}
}
Err(TryRecvError::Disconnected) => {
tracing::error!("The schedule channel has been closed");
break;
}
}
}
}
struct Coin<R: Rng> { struct Coin<R: Rng> {
rng: R, rng: R,
distribution: Uniform<f64>, distribution: Uniform<f64>,
@ -186,6 +133,7 @@ enum CoinError {
mod tests { mod tests {
use super::*; use super::*;
use futures::StreamExt; use futures::StreamExt;
use tokio::sync::mpsc;
macro_rules! assert_interval { macro_rules! assert_interval {
($last_time:expr, $lower_bound:expr, $upper_bound:expr) => { ($last_time:expr, $lower_bound:expr, $upper_bound:expr) => {
@ -209,63 +157,6 @@ mod tests {
}; };
} }
#[tokio::test]
async fn test_persistent_transmission() {
let (schedule_sender, schedule_receiver) = mpsc::unbounded_channel();
let (emission_sender, mut emission_receiver) = mpsc::unbounded_channel();
let settings = PersistentTransmissionSettings {
max_emission_frequency: 1.0,
// Set to always emit drop messages if no scheduled messages for easy testing
drop_message_probability: 1.0,
};
// Prepare the expected emission interval with torelance
let expected_emission_interval =
Duration::from_secs_f64(1.0 / settings.max_emission_frequency);
let torelance = expected_emission_interval / 10; // 10% torelance
let lower_bound = expected_emission_interval - torelance;
let upper_bound = expected_emission_interval + torelance;
// Start the persistent transmission and schedule messages
tokio::spawn(persistent_transmission(
settings,
schedule_receiver,
emission_sender,
));
// Messages must be scheduled in non-blocking manner.
schedule_sender.send(vec![1]).unwrap();
schedule_sender.send(vec![2]).unwrap();
schedule_sender.send(vec![3]).unwrap();
// Check if expected messages are emitted with the expected interval
assert_eq!(emission_receiver.recv().await.unwrap(), vec![1]);
let mut last_time = time::Instant::now();
assert_eq!(emission_receiver.recv().await.unwrap(), vec![2]);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!(emission_receiver.recv().await.unwrap(), vec![3]);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!(
emission_receiver.recv().await.unwrap(),
DROP_MESSAGE.to_vec()
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert_eq!(
emission_receiver.recv().await.unwrap(),
DROP_MESSAGE.to_vec()
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
// Schedule a new message and check if it is emitted at the next interval
schedule_sender.send(vec![4]).unwrap();
assert_eq!(emission_receiver.recv().await.unwrap(), vec![4]);
assert_interval!(&mut last_time, lower_bound, upper_bound);
}
#[tokio::test] #[tokio::test]
async fn test_persistent_transmission_stream() { async fn test_persistent_transmission_stream() {
let (schedule_sender, schedule_receiver) = mpsc::unbounded_channel(); let (schedule_sender, schedule_receiver) = mpsc::unbounded_channel();

View File

@ -1,17 +1,17 @@
pub mod backends; pub mod backends;
pub mod network; pub mod network;
use std::fmt::Debug;
use async_trait::async_trait; use async_trait::async_trait;
use backends::MixBackend; use backends::MixBackend;
use futures::StreamExt; use futures::StreamExt;
use network::NetworkAdapter; use network::NetworkAdapter;
use nomos_core::wire; use nomos_core::wire;
use nomos_mix::{ use nomos_mix::message_blend::crypto::CryptographicProcessor;
message_blend::{MessageBlend, MessageBlendSettings}, use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings};
persistent_transmission::{persistent_transmission, PersistentTransmissionSettings}, use nomos_mix::persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings,
}; };
use nomos_mix::MixOutgoingMessage;
use nomos_network::NetworkService; use nomos_network::NetworkService;
use overwatch_rs::services::{ use overwatch_rs::services::{
handle::ServiceStateHandle, handle::ServiceStateHandle,
@ -21,7 +21,9 @@ use overwatch_rs::services::{
ServiceCore, ServiceData, ServiceId, ServiceCore, ServiceData, ServiceId,
}; };
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::fmt::Debug;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A mix service that sends messages to the mix network /// A mix service that sends messages to the mix network
/// and broadcasts fully unwrapped messages through the [`NetworkService`]. /// and broadcasts fully unwrapped messages through the [`NetworkService`].
@ -77,73 +79,73 @@ where
async fn run(mut self) -> Result<(), overwatch_rs::DynError> { async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self { let Self {
mut service_state, service_state,
mut backend, mut backend,
network_relay, network_relay,
} = self; } = self;
let mix_config = service_state.settings_reader.get_updated_settings(); let mix_config = service_state.settings_reader.get_updated_settings();
let cryptographic_processor =
CryptographicProcessor::new(mix_config.message_blend.cryptographic_processor);
let network_relay = network_relay.connect().await?; let network_relay = network_relay.connect().await?;
let network_adapter = Network::new(network_relay); let network_adapter = Network::new(network_relay);
// Spawn Persistent Transmission // tier 1 persistent transmission
let (transmission_schedule_sender, transmission_schedule_receiver) = let (persistent_sender, persistent_receiver) = mpsc::unbounded_channel();
mpsc::unbounded_channel(); let mut persistent_transmission_messages =
let (emission_sender, mut emission_receiver) = mpsc::unbounded_channel(); UnboundedReceiverStream::new(persistent_receiver)
tokio::spawn(async move { .persistent_transmission(mix_config.persistent_transmission);
persistent_transmission(
mix_config.persistent_transmission,
transmission_schedule_receiver,
emission_sender,
)
.await;
});
// Spawn Message Blend and connect it to Persistent Transmission // tier 2 blend
let (new_message_sender, new_message_receiver) = mpsc::unbounded_channel(); let mut blend_messages = backend
let (processor_inbound_sender, processor_inbound_receiver) = mpsc::unbounded_channel(); .listen_to_incoming_messages()
let (fully_unwrapped_message_sender, mut fully_unwrapped_message_receiver) = .blend(mix_config.message_blend);
mpsc::unbounded_channel();
tokio::spawn(async move {
MessageBlend::new(
mix_config.message_blend,
new_message_receiver,
processor_inbound_receiver,
// Connect the outputs of Message Blend to Persistent Transmission
transmission_schedule_sender,
fully_unwrapped_message_sender,
)
.run()
.await;
});
// A channel to listen to messages received from the [`MixBackend`] // local messages, are bypassed and send immediately
let mut incoming_message_stream = backend.listen_to_incoming_messages(); let mut local_messages = service_state
.inbound_relay
.map(|ServiceMessage::Mix(message)| {
wire::serialize(&message)
.expect("Message from internal services should not fail to serialize")
});
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop { loop {
tokio::select! { tokio::select! {
Some(msg) = incoming_message_stream.next() => { Some(msg) = persistent_transmission_messages.next() => {
tracing::debug!("Received message from mix backend. Sending it to Processor");
if let Err(e) = processor_inbound_sender.send(msg) {
tracing::error!("Failed to send incoming message to processor: {e:?}");
}
}
Some(msg) = emission_receiver.recv() => {
tracing::debug!("Emitting message to mix network");
backend.publish(msg).await; backend.publish(msg).await;
} }
Some(msg) = fully_unwrapped_message_receiver.recv() => { // Already processed blend messages
Some(msg) = blend_messages.next() => {
match msg {
MixOutgoingMessage::Outbound(msg) => {
if let Err(e) = persistent_sender.send(msg) {
tracing::error!("Error sending message to persistent stream: {e}");
}
}
MixOutgoingMessage::FullyUnwrapped(msg) => {
tracing::debug!("Broadcasting fully unwrapped message"); tracing::debug!("Broadcasting fully unwrapped message");
match wire::deserialize::<NetworkMessage<Network::BroadcastSettings>>(&msg) { match wire::deserialize::<NetworkMessage<Network::BroadcastSettings>>(&msg) {
Ok(msg) => { Ok(msg) => {
network_adapter.broadcast(msg.message, msg.broadcast_settings).await; network_adapter.broadcast(msg.message, msg.broadcast_settings).await;
}, },
_ => tracing::error!("unrecognized message from mix backend") _ => {
tracing::error!("unrecognized message from mix backend");
}
}
}
}
}
Some(msg) = local_messages.next() => {
match cryptographic_processor.wrap_message(&msg) {
Ok(wrapped_message) => {
if let Err(e) = persistent_sender.send(wrapped_message) {
tracing::error!("Error sending message to persistent stream: {e}");
}
}
Err(e) => {
tracing::error!("Failed to wrap message: {:?}", e);
} }
} }
Some(msg) = service_state.inbound_relay.recv() => {
Self::handle_service_message(msg, &new_message_sender);
} }
Some(msg) = lifecycle_stream.next() => { Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await { if Self::should_stop_service(msg).await {
@ -164,20 +166,6 @@ where
Network: NetworkAdapter, Network: NetworkAdapter,
Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned, Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned,
{ {
fn handle_service_message(
msg: ServiceMessage<Network::BroadcastSettings>,
new_message_sender: &mpsc::UnboundedSender<Vec<u8>>,
) {
match msg {
ServiceMessage::Mix(msg) => {
// Serialize the new message and send it to the Processor
if let Err(e) = new_message_sender.send(wire::serialize(&msg).unwrap()) {
tracing::error!("Failed to send a new message to processor: {e:?}");
}
}
}
}
async fn should_stop_service(msg: LifecycleMessage) -> bool { async fn should_stop_service(msg: LifecycleMessage) -> bool {
match msg { match msg {
LifecycleMessage::Kill => true, LifecycleMessage::Kill => true,
@ -198,8 +186,8 @@ where
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixConfig<BackendSettings> { pub struct MixConfig<BackendSettings> {
pub backend: BackendSettings, pub backend: BackendSettings,
pub persistent_transmission: PersistentTransmissionSettings,
pub message_blend: MessageBlendSettings, pub message_blend: MessageBlendSettings,
pub persistent_transmission: PersistentTransmissionSettings,
} }
/// A message that is handled by [`MixService`]. /// A message that is handled by [`MixService`].

View File

@ -3,6 +3,8 @@ use std::process::{Command, Stdio};
use std::time::Duration; use std::time::Duration;
use std::{net::SocketAddr, process::Child}; use std::{net::SocketAddr, process::Child};
use crate::adjust_timeout;
use crate::topology::configs::GeneralConfig;
use cryptarchia_consensus::CryptarchiaSettings; use cryptarchia_consensus::CryptarchiaSettings;
use nomos_da_dispersal::backend::kzgrs::{DispersalKZGRSBackendSettings, EncoderSettings}; use nomos_da_dispersal::backend::kzgrs::{DispersalKZGRSBackendSettings, EncoderSettings};
use nomos_da_dispersal::DispersalServiceSettings; use nomos_da_dispersal::DispersalServiceSettings;
@ -28,9 +30,6 @@ use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE};
use nomos_node::RocksBackendSettings; use nomos_node::RocksBackendSettings;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use crate::adjust_timeout;
use crate::topology::configs::GeneralConfig;
use super::{create_tempdir, persist_tempdir, GetRangeReq, CLIENT}; use super::{create_tempdir, persist_tempdir, GetRangeReq, CLIENT};
const BIN_PATH: &str = "../target/debug/nomos-executor"; const BIN_PATH: &str = "../target/debug/nomos-executor";

View File

@ -243,6 +243,7 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
mix: nomos_mix_service::MixConfig { mix: nomos_mix_service::MixConfig {
backend: config.mix_config.backend, backend: config.mix_config.backend,
persistent_transmission: Default::default(), persistent_transmission: Default::default(),
message_blend: MessageBlendSettings { message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 }, cryptographic_processor: CryptographicProcessorSettings { num_mix_layers: 1 },
temporal_processor: TemporalProcessorSettings { temporal_processor: TemporalProcessorSettings {