use temporal processor directly

This commit is contained in:
Youngjoon Lee 2024-12-20 22:08:45 +09:00
parent e597303e2f
commit d6c5ff2004
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
2 changed files with 58 additions and 46 deletions

View File

@ -21,12 +21,11 @@ use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
warding::WardCondition,
};
use nomos_blend::message_blend::temporal::{TemporalProcessorExt, TemporalStream};
use nomos_blend::{
cover_traffic::{CoverTraffic, CoverTrafficSettings},
membership::Membership,
message_blend::{
crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream,
},
message_blend::{crypto::CryptographicProcessor, MessageBlendSettings},
persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
},
@ -35,7 +34,7 @@ use nomos_blend::{
use nomos_blend_message::mock::MockBlendMessage;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use scheduler::{Interval, TemporalRelease};
use scheduler::{Interval, TemporalScheduler};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use state::BlendnodeState;
@ -88,14 +87,10 @@ pub struct BlendNode {
Interval,
>,
crypto_processor: CryptographicProcessor<ChaCha12Rng, MockBlendMessage>,
blend_sender: channel::Sender<Vec<u8>>,
blend_update_time_sender: channel::Sender<Duration>,
blend_messages: MessageBlendStream<
CrossbeamReceiverStream<Vec<u8>>,
ChaCha12Rng,
MockBlendMessage,
TemporalRelease,
>,
temporal_sender: channel::Sender<BlendOutgoingMessage>,
temporal_update_time_sender: channel::Sender<Duration>,
temporal_processor_messages:
TemporalStream<CrossbeamReceiverStream<BlendOutgoingMessage>, TemporalScheduler>,
epoch_update_sender: channel::Sender<Duration>,
slot_update_sender: channel::Sender<Duration>,
cover_traffic: CoverTraffic<Epoch, Slot, MockBlendMessage>,
@ -136,9 +131,7 @@ impl BlendNode {
),
);
// Init Tier-2: message blend
let (blend_sender, blend_receiver) = channel::unbounded();
let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded();
// Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor
let nodes: Vec<
nomos_blend::membership::Node<
<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey,
@ -157,20 +150,17 @@ impl BlendNode {
membership.clone(),
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
);
let temporal_release = TemporalRelease::new(
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
blend_update_time_receiver,
(
1,
settings.message_blend.temporal_processor.max_delay_seconds,
),
);
let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend(
settings.message_blend.clone(),
membership,
temporal_release,
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
);
let (temporal_sender, temporal_receiver) = channel::unbounded();
let (temporal_update_time_sender, temporal_update_time_receiver) = channel::unbounded();
let temporal_processor_messages = CrossbeamReceiverStream::new(temporal_receiver)
.temporal_stream(TemporalScheduler::new(
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
temporal_update_time_receiver,
(
1,
settings.message_blend.temporal_processor.max_delay_seconds,
),
));
// tier 3 cover traffic
let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded();
@ -204,9 +194,9 @@ impl BlendNode {
persistent_update_time_sender,
persistent_transmission_messages,
crypto_processor,
blend_sender,
blend_update_time_sender,
blend_messages,
temporal_sender,
temporal_update_time_sender,
temporal_processor_messages,
epoch_update_sender,
slot_update_sender,
cover_traffic,
@ -264,9 +254,29 @@ impl BlendNode {
self.persistent_sender.send(message).unwrap();
}
fn schedule_blend(&mut self, message: Vec<u8>) {
self.log_message("BlendScheduled", &Self::parse_payload(&message));
self.blend_sender.send(message).unwrap();
fn handle_incoming_message(&mut self, message: Vec<u8>) {
match self.crypto_processor.unwrap_message(&message) {
Ok((unwrapped_message, fully_unwrapped)) => {
let temporal_message = if fully_unwrapped {
BlendOutgoingMessage::FullyUnwrapped(unwrapped_message)
} else {
BlendOutgoingMessage::Outbound(unwrapped_message)
};
self.schedule_temporal_processor(temporal_message);
}
Err(e) => {
tracing::error!("Failed to unwrap message: {:?}", e);
}
}
}
fn schedule_temporal_processor(&mut self, message: BlendOutgoingMessage) {
let payload = match &message {
BlendOutgoingMessage::FullyUnwrapped(payload) => Payload::load(payload.clone()),
BlendOutgoingMessage::Outbound(msg) => Self::parse_payload(&msg),
};
self.log_message("TemporalProcessorScheduled", &payload);
self.temporal_sender.send(message).unwrap();
}
fn parse_payload(message: &[u8]) -> Payload {
@ -278,7 +288,7 @@ impl BlendNode {
.send(elapsed)
.unwrap();
self.persistent_update_time_sender.send(elapsed).unwrap();
self.blend_update_time_sender.send(elapsed).unwrap();
self.temporal_update_time_sender.send(elapsed).unwrap();
self.epoch_update_sender.send(elapsed).unwrap();
self.slot_update_sender.send(elapsed).unwrap();
}
@ -357,19 +367,21 @@ impl Node for BlendNode {
Some(network_message.from),
None,
);
self.schedule_blend(network_message.into_payload().0);
self.handle_incoming_message(network_message.into_payload().0);
}
// Proceed message blend
if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) {
// Proceed temporal processor
if let Poll::Ready(Some(msg)) =
pin!(&mut self.temporal_processor_messages).poll_next(&mut cx)
{
match msg {
BlendOutgoingMessage::Outbound(msg) => {
self.log_message_released_from("Blend", &Self::parse_payload(&msg));
self.log_message_released_from("TemporalProcessor", &Self::parse_payload(&msg));
self.schedule_persistent_transmission(msg);
}
BlendOutgoingMessage::FullyUnwrapped(payload) => {
let payload = Payload::load(payload);
self.log_message_released_from("Blend", &payload);
self.log_message_released_from("TemporalProcessor", &payload);
self.log_message_fully_unwrapped(&payload);
self.state.num_messages_fully_unwrapped += 1;
//TODO: create a tracing event

View File

@ -44,14 +44,14 @@ impl Stream for Interval {
}
}
pub struct TemporalRelease {
pub struct TemporalScheduler {
random_sleeps: Box<dyn Iterator<Item = Duration> + Send + Sync + 'static>,
elapsed: Duration,
current_sleep: Duration,
update_time: channel::Receiver<Duration>,
}
impl TemporalRelease {
impl TemporalScheduler {
pub fn new<Rng: RngCore + Send + Sync + 'static>(
mut rng: Rng,
update_time: channel::Receiver<Duration>,
@ -80,7 +80,7 @@ impl TemporalRelease {
}
}
impl Stream for TemporalRelease {
impl Stream for TemporalScheduler {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -134,7 +134,7 @@ mod tests {
fn temporal_release_update() {
let (_tx, rx) = channel::unbounded();
let mut temporal_release =
TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
assert!(!temporal_release.update(Duration::from_secs(0)));
assert!(!temporal_release.update(Duration::from_millis(999)));
@ -149,7 +149,7 @@ mod tests {
let (tx, rx) = channel::unbounded();
let mut temporal_release =
TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
tx.send(Duration::from_secs(0)).unwrap();
assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending);