This commit is contained in:
Youngjoon Lee 2024-11-22 19:08:50 +09:00
parent cb708de1b0
commit eebe305544
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
7 changed files with 153 additions and 81 deletions

View File

@ -12,6 +12,7 @@ where
settings: CryptographicProcessorSettings<M::PrivateKey>,
membership: Membership<M>,
rng: R,
mix_message: M,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -28,18 +29,20 @@ where
{
pub fn new(
settings: CryptographicProcessorSettings<M::PrivateKey>,
mix_message_settings: M::Settings,
membership: Membership<M>,
rng: R,
) -> Self {
let mix_message = M::new(mix_message_settings);
Self {
settings,
membership,
rng,
mix_message,
}
}
pub fn wrap_message(&mut self, message: &[u8]) -> Result<Vec<u8>, M::Error> {
// TODO: Use the actual Sphinx encoding instead of mock.
let public_keys = self
.membership
.choose_remote_nodes(&mut self.rng, self.settings.num_mix_layers)
@ -47,10 +50,11 @@ where
.map(|node| node.public_key.clone())
.collect::<Vec<_>>();
M::build_message(message, &public_keys)
self.mix_message.build_message(message, &public_keys)
}
pub fn unwrap_message(&self, message: &[u8]) -> Result<(Vec<u8>, bool), M::Error> {
M::unwrap_message(message, &self.settings.private_key)
self.mix_message
.unwrap_message(message, &self.settings.private_key)
}
}

View File

@ -59,12 +59,14 @@ where
pub fn new(
input_stream: S,
settings: MessageBlendSettings<M>,
mix_message_settings: M::Settings,
membership: Membership<M>,
scheduler: Scheduler,
cryptographic_processor_rng: Rng,
) -> Self {
let cryptographic_processor = CryptographicProcessor::new(
settings.cryptographic_processor,
mix_message_settings,
membership,
cryptographic_processor_rng,
);
@ -132,6 +134,7 @@ where
fn blend(
self,
message_blend_settings: MessageBlendSettings<M>,
mix_message_settings: M::Settings,
membership: Membership<M>,
scheduler: Scheduler,
cryptographic_processor_rng: Rng,
@ -142,6 +145,7 @@ where
MessageBlendStream::new(
self,
message_blend_settings,
mix_message_settings,
membership,
scheduler,
cryptographic_processor_rng,

View File

@ -1,9 +1,6 @@
use futures::{Stream, StreamExt};
use nomos_mix_message::MixMessage;
use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
@ -25,7 +22,7 @@ impl Default for PersistentTransmissionSettings {
}
/// Transmit scheduled messages with a persistent rate as a stream.
pub struct PersistentTransmissionStream<S, Rng, M, Scheduler>
pub struct PersistentTransmissionStream<S, Rng, Scheduler>
where
S: Stream,
Rng: RngCore,
@ -33,14 +30,13 @@ where
coin: Coin<Rng>,
stream: S,
scheduler: Scheduler,
_mix_message: PhantomData<M>,
drop_message: Vec<u8>,
}
impl<S, Rng, M, Scheduler> PersistentTransmissionStream<S, Rng, M, Scheduler>
impl<S, Rng, Scheduler> PersistentTransmissionStream<S, Rng, Scheduler>
where
S: Stream,
Rng: RngCore,
M: MixMessage,
Scheduler: Stream<Item = ()>,
{
pub fn new(
@ -48,22 +44,22 @@ where
stream: S,
scheduler: Scheduler,
rng: Rng,
) -> PersistentTransmissionStream<S, Rng, M, Scheduler> {
drop_message: Vec<u8>,
) -> PersistentTransmissionStream<S, Rng, Scheduler> {
let coin = Coin::<Rng>::new(rng, settings.drop_message_probability).unwrap();
Self {
coin,
stream,
scheduler,
_mix_message: Default::default(),
drop_message,
}
}
}
impl<S, Rng, M, Scheduler> Stream for PersistentTransmissionStream<S, Rng, M, Scheduler>
impl<S, Rng, Scheduler> Stream for PersistentTransmissionStream<S, Rng, Scheduler>
where
S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin,
M: MixMessage + Unpin,
Scheduler: Stream<Item = ()> + Unpin,
{
type Item = Vec<u8>;
@ -73,6 +69,7 @@ where
ref mut scheduler,
ref mut stream,
ref mut coin,
ref drop_message,
..
} = self.get_mut();
if pin!(scheduler).poll_next_unpin(cx).is_pending() {
@ -81,17 +78,16 @@ where
if let Poll::Ready(Some(item)) = pin!(stream).poll_next(cx) {
Poll::Ready(Some(item))
} else if coin.flip() {
Poll::Ready(Some(M::DROP_MESSAGE.to_vec()))
Poll::Ready(Some(drop_message.clone()))
} else {
Poll::Pending
}
}
}
pub trait PersistentTransmissionExt<Rng, M, Scheduler>: Stream
pub trait PersistentTransmissionExt<Rng, Scheduler>: Stream
where
Rng: RngCore,
M: MixMessage,
Scheduler: Stream<Item = ()>,
{
fn persistent_transmission(
@ -99,20 +95,19 @@ where
settings: PersistentTransmissionSettings,
rng: Rng,
scheduler: Scheduler,
) -> PersistentTransmissionStream<Self, Rng, M, Scheduler>
drop_message: Vec<u8>,
) -> PersistentTransmissionStream<Self, Rng, Scheduler>
where
Self: Sized + Unpin,
{
PersistentTransmissionStream::new(settings, self, scheduler, rng)
PersistentTransmissionStream::new(settings, self, scheduler, rng, drop_message)
}
}
impl<S, Rng, M, Scheduler> PersistentTransmissionExt<Rng, M, Scheduler> for S
impl<S, Rng, Scheduler> PersistentTransmissionExt<Rng, Scheduler> for S
where
S: Stream,
Rng: RngCore,
M: MixMessage,
M::PublicKey: Clone + Serialize + DeserializeOwned,
Scheduler: Stream<Item = ()>,
{
}
@ -150,7 +145,6 @@ enum CoinError {
mod tests {
use super::*;
use futures::StreamExt;
use nomos_mix_message::mock::MockMixMessage;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use std::time::Duration;
@ -196,15 +190,13 @@ mod tests {
let lower_bound = expected_emission_interval - torelance;
let upper_bound = expected_emission_interval + torelance;
// prepare stream
let mut persistent_transmission_stream: PersistentTransmissionStream<
_,
_,
MockMixMessage,
_,
> = stream.persistent_transmission(
let drop_message = vec![0u8; 10];
let mut persistent_transmission_stream: PersistentTransmissionStream<_, _, _> = stream
.persistent_transmission(
settings,
ChaCha8Rng::from_entropy(),
IntervalStream::new(time::interval(expected_emission_interval)).map(|_| ()),
drop_message.clone(),
);
// Messages must be scheduled in non-blocking manner.
schedule_sender.send(vec![1]).unwrap();
@ -230,14 +222,16 @@ mod tests {
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert!(MockMixMessage::is_drop_message(
&persistent_transmission_stream.next().await.unwrap()
));
assert_eq!(
&persistent_transmission_stream.next().await.unwrap(),
&drop_message
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert!(MockMixMessage::is_drop_message(
&persistent_transmission_stream.next().await.unwrap()
));
assert_eq!(
&persistent_transmission_stream.next().await.unwrap(),
&drop_message
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
// Schedule a new message and check if it is emitted at the next interval

View File

@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
itertools = "0.13"
rand_chacha = "0.3"
serde = { version = "1.0.215", features = ["derive"] }
sha2 = "0.10"
sphinx-packet = "0.2"
thiserror = "1.0.65"

View File

@ -4,10 +4,13 @@ pub mod sphinx;
pub trait MixMessage {
type PublicKey;
type PrivateKey;
type Settings;
type Error;
const DROP_MESSAGE: &'static [u8];
fn new(settings: Self::Settings) -> Self;
fn build_message(
&self,
payload: &[u8],
public_keys: &[Self::PublicKey],
) -> Result<Vec<u8>, Self::Error>;
@ -19,10 +22,14 @@ pub trait MixMessage {
/// If the input message was already fully unwrapped, or if its format is invalid,
/// this function returns `[Error::InvalidMixMessage]`.
fn unwrap_message(
&self,
message: &[u8],
private_key: &Self::PrivateKey,
) -> Result<(Vec<u8>, bool), Self::Error>;
fn is_drop_message(message: &[u8]) -> bool {
message == Self::DROP_MESSAGE
fn drop_message(&self) -> &[u8];
fn is_drop_message(&self, message: &[u8]) -> bool {
message == self.drop_message()
}
}

View File

@ -1,68 +1,99 @@
pub mod error;
use error::Error;
use serde::{Deserialize, Serialize};
use crate::MixMessage;
// TODO: Remove all the mock below once the actual implementation is integrated to the system.
//
/// A mock implementation of the Sphinx encoding.
const NODE_ID_SIZE: usize = 32;
// TODO: Move MAX_PAYLOAD_SIZE and MAX_LAYERS to the upper layer (service layer).
const MAX_PAYLOAD_SIZE: usize = 2048;
const PAYLOAD_PADDING_SEPARATOR: u8 = 0x01;
const PAYLOAD_PADDING_SEPARATOR_SIZE: usize = 1;
const MAX_LAYERS: usize = 5;
pub const MESSAGE_SIZE: usize =
NODE_ID_SIZE * MAX_LAYERS + MAX_PAYLOAD_SIZE + PAYLOAD_PADDING_SEPARATOR_SIZE;
const PAYLOAD_PADDING_SEPARATOR_SIZE: usize = std::mem::size_of::<u8>();
#[derive(Clone, Debug)]
pub struct MockMixMessage;
pub struct MockMixMessage {
settings: MockMixMessageSettings,
drop_message: Vec<u8>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MockMixMessageSettings {
max_layers: usize,
max_payload_size: usize,
}
impl MockMixMessageSettings {
const fn message_size(&self) -> usize {
NODE_ID_SIZE * self.max_layers + self.max_payload_size + PAYLOAD_PADDING_SEPARATOR_SIZE
}
}
impl MockMixMessage {
const fn size(&self) -> usize {
self.settings.message_size()
}
}
impl MixMessage for MockMixMessage {
type PublicKey = [u8; NODE_ID_SIZE];
type PrivateKey = [u8; NODE_ID_SIZE];
type Settings = MockMixMessageSettings;
type Error = Error;
const DROP_MESSAGE: &'static [u8] = &[0; MESSAGE_SIZE];
fn new(settings: Self::Settings) -> Self {
let drop_message = vec![0; settings.message_size()];
Self {
settings,
drop_message,
}
}
/// The length of the encoded message is fixed to [`MESSAGE_SIZE`] bytes.
/// The [`MAX_LAYERS`] number of [`NodeId`]s are concatenated in front of the payload.
/// The payload is zero-padded to the end.
///
fn build_message(
&self,
payload: &[u8],
public_keys: &[Self::PublicKey],
) -> Result<Vec<u8>, Self::Error> {
let MockMixMessageSettings {
max_layers,
max_payload_size,
} = self.settings;
// In this mock, we don't encrypt anything. So, we use public key as just a node ID.
let node_ids = public_keys;
if node_ids.is_empty() || node_ids.len() > MAX_LAYERS {
if node_ids.is_empty() || node_ids.len() > max_layers {
return Err(Error::InvalidNumberOfLayers);
}
if payload.len() > MAX_PAYLOAD_SIZE {
if payload.len() > max_payload_size {
return Err(Error::PayloadTooLarge);
}
let mut message: Vec<u8> = Vec::with_capacity(MESSAGE_SIZE);
let mut message: Vec<u8> = Vec::with_capacity(self.size());
node_ids.iter().for_each(|node_id| {
message.extend(node_id);
});
// If there is any remaining layers, fill them with zeros.
(0..MAX_LAYERS - node_ids.len()).for_each(|_| message.extend(&[0; NODE_ID_SIZE]));
(0..max_layers - node_ids.len()).for_each(|_| message.extend(&[0; NODE_ID_SIZE]));
// Append payload with padding
message.extend(payload);
message.push(PAYLOAD_PADDING_SEPARATOR);
message.extend(std::iter::repeat(0).take(MAX_PAYLOAD_SIZE - payload.len()));
message.extend(std::iter::repeat(0).take(max_payload_size - payload.len()));
Ok(message)
}
fn unwrap_message(
&self,
message: &[u8],
private_key: &Self::PrivateKey,
) -> Result<(Vec<u8>, bool), Self::Error> {
if message.len() != MESSAGE_SIZE {
let MockMixMessageSettings { max_layers, .. } = self.settings;
if message.len() != self.size() {
return Err(Error::InvalidMixMessage);
}
@ -74,7 +105,7 @@ impl MixMessage for MockMixMessage {
// If this is the last layer
if message[NODE_ID_SIZE..NODE_ID_SIZE * 2] == [0; NODE_ID_SIZE] {
let padded_payload = &message[NODE_ID_SIZE * MAX_LAYERS..];
let padded_payload = &message[NODE_ID_SIZE * max_layers..];
// remove the payload padding
match padded_payload
.iter()
@ -87,12 +118,16 @@ impl MixMessage for MockMixMessage {
}
}
let mut new_message: Vec<u8> = Vec::with_capacity(MESSAGE_SIZE);
new_message.extend(&message[NODE_ID_SIZE..NODE_ID_SIZE * MAX_LAYERS]);
let mut new_message: Vec<u8> = Vec::with_capacity(self.size());
new_message.extend(&message[NODE_ID_SIZE..NODE_ID_SIZE * max_layers]);
new_message.extend(&[0; NODE_ID_SIZE]);
new_message.extend(&message[NODE_ID_SIZE * MAX_LAYERS..]); // padded payload
new_message.extend(&message[NODE_ID_SIZE * max_layers..]); // padded payload
Ok((new_message, false))
}
fn drop_message(&self) -> &[u8] {
&self.drop_message
}
}
#[cfg(test)]
@ -101,23 +136,28 @@ mod tests {
#[test]
fn message() {
let mock_message = MockMixMessage::new(MockMixMessageSettings {
max_layers: 5,
max_payload_size: 100,
});
let node_ids = (0..3).map(|i| [i; NODE_ID_SIZE]).collect::<Vec<_>>();
let payload = [7; 10];
let message = MockMixMessage::build_message(&payload, &node_ids).unwrap();
assert_eq!(message.len(), MESSAGE_SIZE);
let message = mock_message.build_message(&payload, &node_ids).unwrap();
assert_eq!(message.len(), mock_message.size());
let (message, is_fully_unwrapped) =
MockMixMessage::unwrap_message(&message, &node_ids[0]).unwrap();
mock_message.unwrap_message(&message, &node_ids[0]).unwrap();
assert!(!is_fully_unwrapped);
assert_eq!(message.len(), MESSAGE_SIZE);
assert_eq!(message.len(), mock_message.size());
let (message, is_fully_unwrapped) =
MockMixMessage::unwrap_message(&message, &node_ids[1]).unwrap();
mock_message.unwrap_message(&message, &node_ids[1]).unwrap();
assert!(!is_fully_unwrapped);
assert_eq!(message.len(), MESSAGE_SIZE);
assert_eq!(message.len(), mock_message.size());
let (unwrapped_payload, is_fully_unwrapped) =
MockMixMessage::unwrap_message(&message, &node_ids[2]).unwrap();
mock_message.unwrap_message(&message, &node_ids[2]).unwrap();
assert!(is_fully_unwrapped);
assert_eq!(unwrapped_payload, payload);
}

View File

@ -1,5 +1,6 @@
use error::Error;
use packet::{Packet, UnpackedPacket};
use serde::{Deserialize, Serialize};
use crate::MixMessage;
@ -8,21 +9,35 @@ mod layered_cipher;
pub mod packet;
mod routing;
pub struct SphinxMessage;
pub struct SphinxMessage {
settings: SphinxMessageSettings,
drop_message: Vec<u8>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SphinxMessageSettings {
max_layers: usize,
max_payload_size: usize,
}
const ASYM_KEY_SIZE: usize = 32;
// TODO: Move these constants to the upper layer (service layer).
const MAX_PAYLOAD_SIZE: usize = 2048;
const MAX_LAYERS: usize = 5;
impl MixMessage for SphinxMessage {
type PublicKey = [u8; ASYM_KEY_SIZE];
type PrivateKey = [u8; ASYM_KEY_SIZE];
type Settings = SphinxMessageSettings;
type Error = Error;
const DROP_MESSAGE: &'static [u8] = &[0; Packet::size(MAX_LAYERS, MAX_PAYLOAD_SIZE)];
fn new(settings: Self::Settings) -> Self {
let drop_message = vec![0; Packet::size(settings.max_layers, settings.max_payload_size)];
Self {
settings,
drop_message,
}
}
fn build_message(
&self,
payload: &[u8],
public_keys: &[Self::PublicKey],
) -> Result<Vec<u8>, Self::Error> {
@ -31,25 +46,32 @@ impl MixMessage for SphinxMessage {
.iter()
.map(|k| x25519_dalek::PublicKey::from(*k))
.collect::<Vec<_>>(),
MAX_LAYERS,
self.settings.max_layers,
payload,
MAX_PAYLOAD_SIZE,
self.settings.max_payload_size,
)?;
Ok(packet.to_bytes())
}
fn unwrap_message(
&self,
message: &[u8],
private_key: &Self::PrivateKey,
) -> Result<(Vec<u8>, bool), Self::Error> {
let packet = Packet::from_bytes(message, MAX_LAYERS)?;
let unpacked_packet =
packet.unpack(&x25519_dalek::StaticSecret::from(*private_key), MAX_LAYERS)?;
let packet = Packet::from_bytes(message, self.settings.max_layers)?;
let unpacked_packet = packet.unpack(
&x25519_dalek::StaticSecret::from(*private_key),
self.settings.max_layers,
)?;
match unpacked_packet {
UnpackedPacket::ToForward(packet) => Ok((packet.to_bytes(), false)),
UnpackedPacket::FullyUnpacked(payload) => Ok((payload, true)),
}
}
fn drop_message(&self) -> &[u8] {
&self.drop_message
}
}
fn parse_bytes<'a>(data: &'a [u8], sizes: &[usize]) -> Result<Vec<&'a [u8]>, String> {