Mix: Integrate cover traffic to mix service (#920)
This commit is contained in:
parent
dc286a967a
commit
63c472e172
|
@ -62,6 +62,13 @@ mix:
|
||||||
num_mix_layers: 1
|
num_mix_layers: 1
|
||||||
temporal_processor:
|
temporal_processor:
|
||||||
max_delay_seconds: 5
|
max_delay_seconds: 5
|
||||||
|
cover_traffic:
|
||||||
|
epoch_duration:
|
||||||
|
secs: 432000
|
||||||
|
nanos: 0
|
||||||
|
slot_duration:
|
||||||
|
secs: 20
|
||||||
|
nanos: 0
|
||||||
membership:
|
membership:
|
||||||
- address: /ip4/127.0.0.1/udp/3001/quic-v1
|
- address: /ip4/127.0.0.1/udp/3001/quic-v1
|
||||||
public_key: [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]
|
public_key: [1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1]
|
||||||
|
|
|
@ -122,7 +122,7 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_ticket() {
|
fn test_ticket() {
|
||||||
generate_ticket(10u32.to_be_bytes(), 1123, 0);
|
generate_ticket(10u32.to_be_bytes(), 1123, 0);
|
||||||
for i in (0..1u32) {
|
for i in 0..1u32 {
|
||||||
let slots = select_slot(i.to_be_bytes(), 1234, 100, 21600, winning_probability(1));
|
let slots = select_slot(i.to_be_bytes(), 1234, 100, 21600, winning_probability(1));
|
||||||
println!("slots = {slots:?}");
|
println!("slots = {slots:?}");
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ where
|
||||||
M: MixMessage,
|
M: MixMessage,
|
||||||
{
|
{
|
||||||
remote_nodes: Vec<Node<M::PublicKey>>,
|
remote_nodes: Vec<Node<M::PublicKey>>,
|
||||||
|
local_node: Node<M::PublicKey>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
@ -22,10 +23,20 @@ where
|
||||||
M: MixMessage,
|
M: MixMessage,
|
||||||
M::PublicKey: PartialEq,
|
M::PublicKey: PartialEq,
|
||||||
{
|
{
|
||||||
pub fn new(mut nodes: Vec<Node<M::PublicKey>>, local_public_key: M::PublicKey) -> Self {
|
pub fn new(nodes: Vec<Node<M::PublicKey>>, local_public_key: M::PublicKey) -> Self {
|
||||||
nodes.retain(|node| node.public_key != local_public_key);
|
let mut remote_nodes = Vec::with_capacity(nodes.len() - 1);
|
||||||
|
let mut local_node = None;
|
||||||
|
nodes.into_iter().for_each(|node| {
|
||||||
|
if node.public_key == local_public_key {
|
||||||
|
local_node = Some(node);
|
||||||
|
} else {
|
||||||
|
remote_nodes.push(node);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
remote_nodes: nodes,
|
remote_nodes,
|
||||||
|
local_node: local_node.expect("Local node not found"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,4 +47,12 @@ where
|
||||||
) -> Vec<&Node<M::PublicKey>> {
|
) -> Vec<&Node<M::PublicKey>> {
|
||||||
self.remote_nodes.choose_multiple(rng, amount).collect()
|
self.remote_nodes.choose_multiple(rng, amount).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn local_node(&self) -> &Node<M::PublicKey> {
|
||||||
|
&self.local_node
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn size(&self) -> usize {
|
||||||
|
self.remote_nodes.len() + 1
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -229,6 +229,10 @@ pub fn new_node(
|
||||||
max_delay_seconds: 2,
|
max_delay_seconds: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
cover_traffic: nomos_mix_service::CoverTrafficExtSettings {
|
||||||
|
epoch_duration: Duration::from_secs(432000),
|
||||||
|
slot_duration: Duration::from_secs(20),
|
||||||
|
},
|
||||||
membership: mix_config.membership.clone(),
|
membership: mix_config.membership.clone(),
|
||||||
},
|
},
|
||||||
da_network: DaNetworkConfig {
|
da_network: DaNetworkConfig {
|
||||||
|
|
|
@ -6,15 +6,18 @@ use backends::MixBackend;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use network::NetworkAdapter;
|
use network::NetworkAdapter;
|
||||||
use nomos_core::wire;
|
use nomos_core::wire;
|
||||||
use nomos_mix::membership::{Membership, Node};
|
|
||||||
use nomos_mix::message_blend::crypto::CryptographicProcessor;
|
|
||||||
use nomos_mix::message_blend::temporal::TemporalScheduler;
|
use nomos_mix::message_blend::temporal::TemporalScheduler;
|
||||||
|
use nomos_mix::message_blend::{crypto::CryptographicProcessor, CryptographicProcessorSettings};
|
||||||
use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings};
|
use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings};
|
||||||
use nomos_mix::persistent_transmission::{
|
use nomos_mix::persistent_transmission::{
|
||||||
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
|
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
|
||||||
};
|
};
|
||||||
use nomos_mix::MixOutgoingMessage;
|
use nomos_mix::MixOutgoingMessage;
|
||||||
use nomos_mix_message::mock::MockMixMessage;
|
use nomos_mix::{
|
||||||
|
cover_traffic::{CoverTraffic, CoverTrafficSettings},
|
||||||
|
membership::{Membership, Node},
|
||||||
|
};
|
||||||
|
use nomos_mix_message::{mock::MockMixMessage, MixMessage};
|
||||||
use nomos_network::NetworkService;
|
use nomos_network::NetworkService;
|
||||||
use overwatch_rs::services::{
|
use overwatch_rs::services::{
|
||||||
handle::ServiceStateHandle,
|
handle::ServiceStateHandle,
|
||||||
|
@ -127,12 +130,22 @@ where
|
||||||
ChaCha12Rng::from_entropy(),
|
ChaCha12Rng::from_entropy(),
|
||||||
);
|
);
|
||||||
let mut blend_messages = backend.listen_to_incoming_messages().blend(
|
let mut blend_messages = backend.listen_to_incoming_messages().blend(
|
||||||
mix_config.message_blend,
|
mix_config.message_blend.clone(),
|
||||||
membership.clone(),
|
membership.clone(),
|
||||||
temporal_scheduler,
|
temporal_scheduler,
|
||||||
ChaCha12Rng::from_entropy(),
|
ChaCha12Rng::from_entropy(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// tier 3 cover traffic
|
||||||
|
let mut cover_traffic: CoverTraffic<_, _, MockMixMessage> = CoverTraffic::new(
|
||||||
|
mix_config.cover_traffic.cover_traffic_settings(
|
||||||
|
&membership,
|
||||||
|
&mix_config.message_blend.cryptographic_processor,
|
||||||
|
),
|
||||||
|
mix_config.cover_traffic.epoch_stream(),
|
||||||
|
mix_config.cover_traffic.slot_stream(),
|
||||||
|
);
|
||||||
|
|
||||||
// local messages, are bypassed and send immediately
|
// local messages, are bypassed and send immediately
|
||||||
let mut local_messages = service_state
|
let mut local_messages = service_state
|
||||||
.inbound_relay
|
.inbound_relay
|
||||||
|
@ -162,23 +175,17 @@ where
|
||||||
network_adapter.broadcast(msg.message, msg.broadcast_settings).await;
|
network_adapter.broadcast(msg.message, msg.broadcast_settings).await;
|
||||||
},
|
},
|
||||||
_ => {
|
_ => {
|
||||||
tracing::error!("unrecognized message from mix backend");
|
tracing::debug!("unrecognized message from mix backend");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Some(msg) = cover_traffic.next() => {
|
||||||
|
Self::wrap_and_send_to_persistent_transmission(msg, &mut cryptographic_processor, &persistent_sender);
|
||||||
|
}
|
||||||
Some(msg) = local_messages.next() => {
|
Some(msg) = local_messages.next() => {
|
||||||
match cryptographic_processor.wrap_message(&msg) {
|
Self::wrap_and_send_to_persistent_transmission(msg, &mut cryptographic_processor, &persistent_sender);
|
||||||
Ok(wrapped_message) => {
|
|
||||||
if let Err(e) = persistent_sender.send(wrapped_message) {
|
|
||||||
tracing::error!("Error sending message to persistent stream: {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!("Failed to wrap message: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Some(msg) = lifecycle_stream.next() => {
|
Some(msg) = lifecycle_stream.next() => {
|
||||||
if Self::should_stop_service(msg).await {
|
if Self::should_stop_service(msg).await {
|
||||||
|
@ -214,6 +221,23 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn wrap_and_send_to_persistent_transmission(
|
||||||
|
message: Vec<u8>,
|
||||||
|
cryptographic_processor: &mut CryptographicProcessor<ChaCha12Rng, MockMixMessage>,
|
||||||
|
persistent_sender: &mpsc::UnboundedSender<Vec<u8>>,
|
||||||
|
) {
|
||||||
|
match cryptographic_processor.wrap_message(&message) {
|
||||||
|
Ok(wrapped_message) => {
|
||||||
|
if let Err(e) = persistent_sender.send(wrapped_message) {
|
||||||
|
tracing::error!("Error sending message to persistent stream: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Failed to wrap message: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
@ -221,11 +245,64 @@ pub struct MixConfig<BackendSettings> {
|
||||||
pub backend: BackendSettings,
|
pub backend: BackendSettings,
|
||||||
pub message_blend: MessageBlendSettings<MockMixMessage>,
|
pub message_blend: MessageBlendSettings<MockMixMessage>,
|
||||||
pub persistent_transmission: PersistentTransmissionSettings,
|
pub persistent_transmission: PersistentTransmissionSettings,
|
||||||
|
pub cover_traffic: CoverTrafficExtSettings,
|
||||||
pub membership: Vec<
|
pub membership: Vec<
|
||||||
Node<<nomos_mix_message::mock::MockMixMessage as nomos_mix_message::MixMessage>::PublicKey>,
|
Node<<nomos_mix_message::mock::MockMixMessage as nomos_mix_message::MixMessage>::PublicKey>,
|
||||||
>,
|
>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||||
|
pub struct CoverTrafficExtSettings {
|
||||||
|
pub epoch_duration: Duration,
|
||||||
|
pub slot_duration: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CoverTrafficExtSettings {
|
||||||
|
fn cover_traffic_settings(
|
||||||
|
&self,
|
||||||
|
membership: &Membership<MockMixMessage>,
|
||||||
|
cryptographic_processor_settings: &CryptographicProcessorSettings<
|
||||||
|
<MockMixMessage as MixMessage>::PrivateKey,
|
||||||
|
>,
|
||||||
|
) -> CoverTrafficSettings {
|
||||||
|
CoverTrafficSettings {
|
||||||
|
node_id: membership.local_node().public_key,
|
||||||
|
number_of_hops: cryptographic_processor_settings.num_mix_layers,
|
||||||
|
slots_per_epoch: self.slots_per_epoch(),
|
||||||
|
network_size: membership.size(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn slots_per_epoch(&self) -> usize {
|
||||||
|
(self.epoch_duration.as_secs() as usize)
|
||||||
|
.checked_div(self.slot_duration.as_secs() as usize)
|
||||||
|
.expect("Invalid epoch & slot duration")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn epoch_stream(
|
||||||
|
&self,
|
||||||
|
) -> futures::stream::Map<
|
||||||
|
futures::stream::Enumerate<IntervalStream>,
|
||||||
|
impl FnMut((usize, time::Instant)) -> usize,
|
||||||
|
> {
|
||||||
|
IntervalStream::new(time::interval(self.epoch_duration))
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, _)| i)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn slot_stream(
|
||||||
|
&self,
|
||||||
|
) -> futures::stream::Map<
|
||||||
|
futures::stream::Enumerate<IntervalStream>,
|
||||||
|
impl FnMut((usize, time::Instant)) -> usize,
|
||||||
|
> {
|
||||||
|
let slots_per_epoch = self.slots_per_epoch();
|
||||||
|
IntervalStream::new(time::interval(self.slot_duration))
|
||||||
|
.enumerate()
|
||||||
|
.map(move |(i, _)| i % slots_per_epoch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<BackendSettings> MixConfig<BackendSettings> {
|
impl<BackendSettings> MixConfig<BackendSettings> {
|
||||||
fn membership(&self) -> Membership<MockMixMessage> {
|
fn membership(&self) -> Membership<MockMixMessage> {
|
||||||
// We use private key as a public key because the `MockMixMessage` doesn't differentiate between them.
|
// We use private key as a public key because the `MockMixMessage` doesn't differentiate between them.
|
||||||
|
|
|
@ -166,6 +166,10 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
|
||||||
max_delay_seconds: 2,
|
max_delay_seconds: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
cover_traffic: nomos_mix_service::CoverTrafficExtSettings {
|
||||||
|
epoch_duration: Duration::from_secs(432000),
|
||||||
|
slot_duration: Duration::from_secs(20),
|
||||||
|
},
|
||||||
membership: config.mix_config.membership,
|
membership: config.mix_config.membership,
|
||||||
},
|
},
|
||||||
cryptarchia: CryptarchiaSettings {
|
cryptarchia: CryptarchiaSettings {
|
||||||
|
|
|
@ -252,6 +252,10 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
|
||||||
max_delay_seconds: 2,
|
max_delay_seconds: 2,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
cover_traffic: nomos_mix_service::CoverTrafficExtSettings {
|
||||||
|
epoch_duration: Duration::from_secs(432000),
|
||||||
|
slot_duration: Duration::from_secs(20),
|
||||||
|
},
|
||||||
membership: config.mix_config.membership,
|
membership: config.mix_config.membership,
|
||||||
},
|
},
|
||||||
cryptarchia: CryptarchiaSettings {
|
cryptarchia: CryptarchiaSettings {
|
||||||
|
|
Loading…
Reference in New Issue