Mix: Implement blend as stream + extension (#864)

* First attempt

* Implement blend as stream and extension

* Implement blend as stream and extension
This commit is contained in:
Daniel Sanchez 2024-10-30 11:50:15 +01:00 committed by GitHub
parent 9b5d4d329e
commit 2f92c183ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 122 additions and 6 deletions

View File

@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
cached = "0.53"
tokio = { version = "1", features = ["time", "sync", "macros"] }
tokio-stream = "0.1"
tracing = "0.1"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
@ -16,4 +17,3 @@ rand_chacha = "0.3"
[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread"] }
tokio-stream = "0.1"

View File

@ -3,11 +3,12 @@ use serde::{Deserialize, Serialize};
/// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages
/// for the message indistinguishability.
#[derive(Clone, Copy, Debug)]
pub(crate) struct CryptographicProcessor {
settings: CryptographicProcessorSettings,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct CryptographicProcessorSettings {
pub num_mix_layers: usize,
}

View File

@ -2,13 +2,18 @@ mod crypto;
mod temporal;
pub use crypto::CryptographicProcessorSettings;
use futures::StreamExt;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
pub use temporal::TemporalProcessorSettings;
use crate::message_blend::temporal::TemporalProcessorExt;
use crate::message_blend::{crypto::CryptographicProcessor, temporal::TemporalProcessor};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::message_blend::{crypto::CryptographicProcessor, temporal::TemporalProcessor};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MessageBlendSettings {
@ -120,3 +125,113 @@ struct TemporalProcessableMessage {
message: Vec<u8>,
fully_unwrapped: bool,
}
pub enum MessageBlendStreamIncomingMessage {
Local(Vec<u8>),
Inbound(Vec<u8>),
}
pub enum MessageBlendStreamOutgoingMessage {
FullyUnwrapped(Vec<u8>),
Outbound(Vec<u8>),
}
pub struct MessageBlendStream<S> {
input_stream: S,
output_stream: BoxStream<'static, MessageBlendStreamOutgoingMessage>,
bypass_sender: UnboundedSender<MessageBlendStreamOutgoingMessage>,
temporal_sender: UnboundedSender<MessageBlendStreamOutgoingMessage>,
cryptographic_processor: CryptographicProcessor,
}
impl<S> MessageBlendStream<S>
where
S: Stream<Item = MessageBlendStreamIncomingMessage>,
{
pub fn new(input_stream: S, settings: MessageBlendSettings) -> Self {
let cryptographic_processor = CryptographicProcessor::new(settings.cryptographic_processor);
let (bypass_sender, bypass_receiver) = mpsc::unbounded_channel();
let (temporal_sender, temporal_receiver) = mpsc::unbounded_channel();
let output_stream = tokio_stream::StreamExt::merge(
UnboundedReceiverStream::new(bypass_receiver),
UnboundedReceiverStream::new(temporal_receiver)
.to_temporal_stream(settings.temporal_processor),
)
.boxed();
Self {
input_stream,
output_stream,
bypass_sender,
temporal_sender,
cryptographic_processor,
}
}
fn process_new_message(self: &mut Pin<&mut Self>, message: Vec<u8>) {
match self.cryptographic_processor.wrap_message(&message) {
Ok(wrapped_message) => {
if let Err(e) = self
.bypass_sender
.send(MessageBlendStreamOutgoingMessage::Outbound(wrapped_message))
{
tracing::error!("Failed to send message to the outbound channel: {e:?}");
}
}
Err(e) => {
tracing::error!("Failed to wrap message: {:?}", e);
}
}
}
fn process_incoming_message(self: &mut Pin<&mut Self>, message: Vec<u8>) {
match self.cryptographic_processor.unwrap_message(&message) {
Ok((unwrapped_message, fully_unwrapped)) => {
let message = if fully_unwrapped {
MessageBlendStreamOutgoingMessage::FullyUnwrapped(unwrapped_message)
} else {
MessageBlendStreamOutgoingMessage::Outbound(unwrapped_message)
};
if let Err(e) = self.temporal_sender.send(message) {
tracing::error!("Failed to send message to the outbound channel: {e:?}");
}
}
Err(nomos_mix_message::Error::MsgUnwrapNotAllowed) => {
tracing::debug!("Message cannot be unwrapped by this node");
}
Err(e) => {
tracing::error!("Failed to unwrap message: {:?}", e);
}
}
}
}
impl<S> Stream for MessageBlendStream<S>
where
S: Stream<Item = MessageBlendStreamIncomingMessage> + Unpin,
{
type Item = MessageBlendStreamOutgoingMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.input_stream.poll_next_unpin(cx) {
Poll::Ready(Some(MessageBlendStreamIncomingMessage::Local(message))) => {
self.process_new_message(message);
}
Poll::Ready(Some(MessageBlendStreamIncomingMessage::Inbound(message))) => {
self.process_incoming_message(message);
}
_ => {}
}
self.output_stream.poll_next_unpin(cx)
}
}
pub trait MessageBlendExt: Stream<Item = MessageBlendStreamIncomingMessage> {
fn blend(self, message_blend_settings: MessageBlendSettings) -> MessageBlendStream<Self>
where
Self: Sized,
{
MessageBlendStream::new(self, message_blend_settings)
}
}
impl<T> MessageBlendExt for T where T: Stream<Item = MessageBlendStreamIncomingMessage> {}

View File

@ -25,7 +25,7 @@ pub(crate) struct TemporalProcessor<M> {
release_timer: Option<Pin<Box<time::Sleep>>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct TemporalProcessorSettings {
pub max_delay_seconds: u64,
}