From 6c42c1ae27cfde5bd51381da883cf8af88408b33 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:29:53 +0900 Subject: [PATCH] add missing dir --- .../src/node/blend/consensus_streams.rs | 107 +++++ .../blendnet-sims/src/node/blend/lottery.rs | 22 + .../blendnet-sims/src/node/blend/message.rs | 39 ++ simlib/blendnet-sims/src/node/blend/mod.rs | 402 ++++++++++++++++++ .../blendnet-sims/src/node/blend/scheduler.rs | 169 ++++++++ simlib/blendnet-sims/src/node/blend/state.rs | 75 ++++ .../src/node/blend/stream_wrapper.rs | 29 ++ 7 files changed, 843 insertions(+) create mode 100644 simlib/blendnet-sims/src/node/blend/consensus_streams.rs create mode 100644 simlib/blendnet-sims/src/node/blend/lottery.rs create mode 100644 simlib/blendnet-sims/src/node/blend/message.rs create mode 100644 simlib/blendnet-sims/src/node/blend/mod.rs create mode 100644 simlib/blendnet-sims/src/node/blend/scheduler.rs create mode 100644 simlib/blendnet-sims/src/node/blend/state.rs create mode 100644 simlib/blendnet-sims/src/node/blend/stream_wrapper.rs diff --git a/simlib/blendnet-sims/src/node/blend/consensus_streams.rs b/simlib/blendnet-sims/src/node/blend/consensus_streams.rs new file mode 100644 index 0000000..08e7dfe --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/consensus_streams.rs @@ -0,0 +1,107 @@ +use crate::node::blend::scheduler::Interval; +use crossbeam::channel; +use futures::stream::iter; +use futures::{Stream, StreamExt}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +pub struct CounterInterval { + interval: Box + Unpin + Send + Sync>, +} + +impl CounterInterval { + pub fn new(duration: Duration, update_receiver: channel::Receiver) -> Self { + let interval = Interval::new(duration, update_receiver) + .zip(iter(0usize..)) + .map(|(_, i)| i); + let interval = Box::new(interval); + Self { interval } + } +} + +impl Stream for CounterInterval { + type Item = usize; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.interval.poll_next_unpin(cx) + } +} + +pub type Epoch = CounterInterval; + +pub struct Slot { + interval: Box + Unpin + Send + Sync>, +} + +impl Slot { + pub fn new( + slots_per_epoch: usize, + slot_duration: Duration, + update_receiver: channel::Receiver, + ) -> Self { + let interval = CounterInterval::new(slot_duration, update_receiver) + .map(move |slot| slot % slots_per_epoch); + let interval = Box::new(interval); + Self { interval } + } +} + +impl Stream for Slot { + type Item = usize; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.interval.poll_next_unpin(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn counter_interval() { + let waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + let (update_sender, update_receiver) = channel::unbounded(); + let mut interval = CounterInterval::new(Duration::from_secs(1), update_receiver); + + update_sender.send(Duration::from_secs(0)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(0))); + update_sender.send(Duration::from_secs(0)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending); + update_sender.send(Duration::from_millis(999)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending); + update_sender.send(Duration::from_millis(1)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); + update_sender.send(Duration::from_secs(1)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(2))); + update_sender.send(Duration::from_secs(3)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(3))); + } + + #[test] + fn slot_interval() { + let waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + let (update_sender, update_receiver) = channel::unbounded(); + let mut slot = Slot::new(3, Duration::from_secs(1), update_receiver); + + update_sender.send(Duration::from_secs(0)).unwrap(); + assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(0))); + update_sender.send(Duration::from_secs(0)).unwrap(); + assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Pending); + update_sender.send(Duration::from_millis(999)).unwrap(); + assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Pending); + update_sender.send(Duration::from_millis(1)).unwrap(); + assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); + update_sender.send(Duration::from_secs(1)).unwrap(); + assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(2))); + update_sender.send(Duration::from_secs(3)).unwrap(); + assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(0))); + update_sender.send(Duration::from_secs(1)).unwrap(); + assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); + } +} diff --git a/simlib/blendnet-sims/src/node/blend/lottery.rs b/simlib/blendnet-sims/src/node/blend/lottery.rs new file mode 100644 index 0000000..d0ed0b9 --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/lottery.rs @@ -0,0 +1,22 @@ +use rand::Rng; + +pub struct StakeLottery { + rng: R, + stake_proportion: f64, +} + +impl StakeLottery +where + R: Rng, +{ + pub fn new(rng: R, stake_proportion: f64) -> Self { + Self { + rng, + stake_proportion, + } + } + + pub fn run(&mut self) -> bool { + self.rng.gen_range(0.0..1.0) < self.stake_proportion + } +} diff --git a/simlib/blendnet-sims/src/node/blend/message.rs b/simlib/blendnet-sims/src/node/blend/message.rs new file mode 100644 index 0000000..4a30a7d --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/message.rs @@ -0,0 +1,39 @@ +use uuid::Uuid; + +pub type PayloadId = String; + +pub struct Payload(Uuid); + +impl Payload { + pub fn new() -> Self { + Self(Uuid::new_v4()) + } + + pub fn id(&self) -> PayloadId { + self.0.to_string() + } + + pub fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } + + pub fn load(data: Vec) -> Self { + assert_eq!(data.len(), 16); + Self(data.try_into().unwrap()) + } +} + +#[cfg(test)] +mod tests { + use super::Payload; + + #[test] + fn payload() { + let payload = Payload::new(); + println!("{}", payload.id()); + let bytes = payload.as_bytes(); + assert_eq!(bytes.len(), 16); + let loaded_payload = Payload::load(bytes.to_vec()); + assert_eq!(bytes, loaded_payload.as_bytes()); + } +} diff --git a/simlib/blendnet-sims/src/node/blend/mod.rs b/simlib/blendnet-sims/src/node/blend/mod.rs new file mode 100644 index 0000000..27924cb --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/mod.rs @@ -0,0 +1,402 @@ +pub mod consensus_streams; +pub mod lottery; +mod message; +pub mod scheduler; +pub mod state; +pub mod stream_wrapper; + +use crate::node::blend::consensus_streams::{Epoch, Slot}; +use cached::{Cached, TimedCache}; +use crossbeam::channel; +use futures::Stream; +use lottery::StakeLottery; +use message::{Payload, PayloadId}; +use multiaddr::Multiaddr; +use netrunner::network::NetworkMessage; +use netrunner::node::{Node, NodeId, NodeIdExt}; +use netrunner::{ + network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, + warding::WardCondition, +}; +use nomos_blend::{ + cover_traffic::{CoverTraffic, CoverTrafficSettings}, + membership::Membership, + message_blend::{ + crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream, + }, + persistent_transmission::{ + PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, + }, + BlendOutgoingMessage, +}; +use nomos_blend_message::mock::MockBlendMessage; +use rand::SeedableRng; +use rand_chacha::ChaCha12Rng; +use scheduler::{Interval, TemporalRelease}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use state::BlendnodeState; +use std::{pin::pin, task::Poll, time::Duration}; +use stream_wrapper::CrossbeamReceiverStream; + +#[derive(Debug, Clone)] +pub struct BlendMessage(Vec); + +impl PayloadSize for BlendMessage { + fn size_bytes(&self) -> u32 { + 2208 + } +} + +#[derive(Deserialize)] +pub struct BlendnodeSettings { + pub connected_peers: Vec, + pub data_message_lottery_interval: Duration, + pub stake_proportion: f64, + pub seed: u64, + pub epoch_duration: Duration, + pub slot_duration: Duration, + pub persistent_transmission: PersistentTransmissionSettings, + pub message_blend: MessageBlendSettings, + pub cover_traffic_settings: CoverTrafficSettings, + pub membership: Vec<::PublicKey>, +} + +type Sha256Hash = [u8; 32]; + +/// This node implementation only used for testing different streaming implementation purposes. +pub struct BlendNode { + id: NodeId, + state: BlendnodeState, + settings: BlendnodeSettings, + network_interface: InMemoryNetworkInterface, + message_cache: TimedCache, + + data_msg_lottery_update_time_sender: channel::Sender, + data_msg_lottery_interval: Interval, + data_msg_lottery: StakeLottery, + + persistent_sender: channel::Sender>, + persistent_update_time_sender: channel::Sender, + persistent_transmission_messages: PersistentTransmissionStream< + CrossbeamReceiverStream>, + ChaCha12Rng, + MockBlendMessage, + Interval, + >, + crypto_processor: CryptographicProcessor, + blend_sender: channel::Sender>, + blend_update_time_sender: channel::Sender, + blend_messages: MessageBlendStream< + CrossbeamReceiverStream>, + ChaCha12Rng, + MockBlendMessage, + TemporalRelease, + >, + epoch_update_sender: channel::Sender, + slot_update_sender: channel::Sender, + cover_traffic: CoverTraffic, +} + +impl BlendNode { + pub fn new( + id: NodeId, + settings: BlendnodeSettings, + network_interface: InMemoryNetworkInterface, + ) -> Self { + let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); + + // Init Interval for data message lottery + let (data_msg_lottery_update_time_sender, data_msg_lottery_update_time_receiver) = + channel::unbounded(); + let data_msg_lottery_interval = Interval::new( + settings.data_message_lottery_interval, + data_msg_lottery_update_time_receiver, + ); + let data_msg_lottery = StakeLottery::new( + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + settings.stake_proportion, + ); + + // Init Tier-1: Persistent transmission + let (persistent_sender, persistent_receiver) = channel::unbounded(); + let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); + let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) + .persistent_transmission( + settings.persistent_transmission, + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + Interval::new( + Duration::from_secs_f64( + 1.0 / settings.persistent_transmission.max_emission_frequency, + ), + persistent_update_time_receiver, + ), + ); + + // Init Tier-2: message blend + let (blend_sender, blend_receiver) = channel::unbounded(); + let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded(); + let nodes: Vec< + nomos_blend::membership::Node< + ::PublicKey, + >, + > = settings + .membership + .iter() + .map(|&public_key| nomos_blend::membership::Node { + address: Multiaddr::empty(), + public_key, + }) + .collect(); + let membership = Membership::::new(nodes, id.into()); + let crypto_processor = CryptographicProcessor::new( + settings.message_blend.cryptographic_processor.clone(), + membership.clone(), + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + ); + let temporal_release = TemporalRelease::new( + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + blend_update_time_receiver, + ( + 1, + settings.message_blend.temporal_processor.max_delay_seconds, + ), + ); + let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend( + settings.message_blend.clone(), + membership, + temporal_release, + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + ); + + // tier 3 cover traffic + let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); + let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded(); + let cover_traffic: CoverTraffic = CoverTraffic::new( + settings.cover_traffic_settings, + Epoch::new(settings.epoch_duration, epoch_updater_update_receiver), + Slot::new( + settings.cover_traffic_settings.slots_per_epoch, + settings.slot_duration, + slot_updater_update_receiver, + ), + ); + + Self { + id, + network_interface, + // We're not coupling this lifespan with the steps now, but it's okay + // We expected that a message will be delivered to most of nodes within 60s. + message_cache: TimedCache::with_lifespan(60), + settings, + state: BlendnodeState { + node_id: id, + step_id: 0, + num_messages_fully_unwrapped: 0, + }, + data_msg_lottery_update_time_sender, + data_msg_lottery_interval, + data_msg_lottery, + persistent_sender, + persistent_update_time_sender, + persistent_transmission_messages, + crypto_processor, + blend_sender, + blend_update_time_sender, + blend_messages, + epoch_update_sender, + slot_update_sender, + cover_traffic, + } + } + + fn forward( + &mut self, + message: BlendMessage, + exclude_node: Option, + log: Option, + ) { + for (i, node_id) in self + .settings + .connected_peers + .iter() + .filter(|&id| Some(*id) != exclude_node) + .enumerate() + { + if i == 0 { + if let Some(log) = &log { + Self::log_emission(log); + } + } + self.network_interface + .send_message(*node_id, message.clone()) + } + self.message_cache.cache_set(Self::sha256(&message.0), ()); + } + + fn receive(&mut self) -> Vec> { + self.network_interface + .receive_messages() + .into_iter() + // Retain only messages that have not been seen before + .filter(|msg| { + self.message_cache + .cache_set(Self::sha256(&msg.payload().0), ()) + .is_none() + }) + .collect() + } + + fn sha256(message: &[u8]) -> Sha256Hash { + let mut hasher = Sha256::new(); + hasher.update(message); + hasher.finalize().into() + } + + fn update_time(&mut self, elapsed: Duration) { + self.data_msg_lottery_update_time_sender + .send(elapsed) + .unwrap(); + self.persistent_update_time_sender.send(elapsed).unwrap(); + self.blend_update_time_sender.send(elapsed).unwrap(); + self.epoch_update_sender.send(elapsed).unwrap(); + self.slot_update_sender.send(elapsed).unwrap(); + } + + fn log_message_generated(&self, msg_type: &str, payload: &Payload) { + self.log_message(format!("{}MessageGenerated", msg_type).as_str(), payload); + } + + fn log_message_fully_unwrapped(&self, payload: &Payload) { + self.log_message("MessageFullyUnwrapped", payload); + } + + fn log_message(&self, tag: &str, payload: &Payload) { + let log = MessageLog { + payload_id: payload.id(), + step_id: self.state.step_id, + node_id: self.id.index(), + }; + tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap()); + } + + fn log_emission(log: &EmissionLog) { + tracing::info!("Emission: {}", serde_json::to_string(log).unwrap()); + } + + fn new_emission_log(&self, emission_type: &str) -> EmissionLog { + EmissionLog { + emission_type: emission_type.to_string(), + step_id: self.state.step_id, + node_id: self.id.index(), + } + } +} + +impl Node for BlendNode { + type Settings = BlendnodeSettings; + + type State = BlendnodeState; + + fn id(&self) -> NodeId { + self.id + } + + fn state(&self) -> &Self::State { + &self.state + } + + fn step(&mut self, elapsed: Duration) { + self.update_time(elapsed); + let waker = futures::task::noop_waker(); + let mut cx = futures::task::Context::from_waker(&waker); + + // Generate a data message probabilistically + if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) { + if self.data_msg_lottery.run() { + let payload = Payload::new(); + self.log_message_generated("Data", &payload); + let message = self + .crypto_processor + .wrap_message(payload.as_bytes()) + .unwrap(); + self.persistent_sender.send(message).unwrap(); + } + } + + // Handle incoming messages + for network_message in self.receive() { + self.forward( + network_message.payload().clone(), + Some(network_message.from), + None, + ); + self.blend_sender + .send(network_message.into_payload().0) + .unwrap(); + } + + // Proceed message blend + if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) { + match msg { + BlendOutgoingMessage::Outbound(msg) => { + self.persistent_sender.send(msg).unwrap(); + } + BlendOutgoingMessage::FullyUnwrapped(payload) => { + let payload = Payload::load(payload); + self.log_message_fully_unwrapped(&payload); + self.state.num_messages_fully_unwrapped += 1; + //TODO: create a tracing event + } + } + } + + // Generate a cover message probabilistically + if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) { + let payload = Payload::new(); + self.log_message_generated("Cover", &payload); + let message = self + .crypto_processor + .wrap_message(payload.as_bytes()) + .unwrap(); + self.persistent_sender.send(message).unwrap(); + } + + // Proceed persistent transmission + if let Poll::Ready(Some(msg)) = + pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) + { + self.forward( + BlendMessage(msg), + None, + Some(self.new_emission_log("FromPersistent")), + ); + } + + self.state.step_id += 1; + } + + fn analyze(&self, ward: &mut WardCondition) -> bool { + match ward { + WardCondition::Max(_) => false, + WardCondition::Sum(condition) => { + *condition.step_result.borrow_mut() += self.state.num_messages_fully_unwrapped; + false + } + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +struct MessageLog { + payload_id: PayloadId, + step_id: usize, + node_id: usize, +} + +#[derive(Debug, Serialize, Deserialize)] +struct EmissionLog { + emission_type: String, + step_id: usize, + node_id: usize, +} diff --git a/simlib/blendnet-sims/src/node/blend/scheduler.rs b/simlib/blendnet-sims/src/node/blend/scheduler.rs new file mode 100644 index 0000000..ac403c8 --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/scheduler.rs @@ -0,0 +1,169 @@ +use crossbeam::channel; +use futures::Stream; +use rand::RngCore; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +pub struct Interval { + duration: Duration, + current_elapsed: Duration, + update_time: channel::Receiver, +} + +impl Interval { + pub fn new(duration: Duration, update_time: channel::Receiver) -> Self { + Self { + duration, + current_elapsed: duration, // to immediately release at the interval 0 + update_time, + } + } + + pub fn update(&mut self, elapsed: Duration) -> bool { + self.current_elapsed += elapsed; + if self.current_elapsed >= self.duration { + self.current_elapsed = Duration::from_secs(0); + true + } else { + false + } + } +} + +impl Stream for Interval { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Ok(elapsed) = self.update_time.recv() { + if self.update(elapsed) { + return Poll::Ready(Some(())); + } + } + Poll::Pending + } +} + +pub struct TemporalRelease { + random_sleeps: Box + Send + Sync + 'static>, + elapsed: Duration, + current_sleep: Duration, + update_time: channel::Receiver, +} + +impl TemporalRelease { + pub fn new( + mut rng: Rng, + update_time: channel::Receiver, + (min_delay, max_delay): (u64, u64), + ) -> Self { + let mut random_sleeps = Box::new(std::iter::repeat_with(move || { + Duration::from_secs((rng.next_u64() % (max_delay + 1)).max(min_delay)) + })); + let current_sleep = random_sleeps.next().unwrap(); + Self { + random_sleeps, + elapsed: Duration::from_secs(0), + current_sleep, + update_time, + } + } + pub fn update(&mut self, elapsed: Duration) -> bool { + self.elapsed += elapsed; + if self.elapsed >= self.current_sleep { + self.elapsed = Duration::from_secs(0); + self.current_sleep = self.random_sleeps.next().unwrap(); + true + } else { + false + } + } +} + +impl Stream for TemporalRelease { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if let Ok(elapsed) = self.update_time.recv() { + if self.update(elapsed) { + return Poll::Ready(Some(())); + } + } + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use rand_chacha::rand_core::SeedableRng; + + #[test] + fn interval_update() { + let (_tx, rx) = channel::unbounded(); + let mut interval = Interval::new(Duration::from_secs(2), rx); + + assert!(interval.update(Duration::from_secs(0))); + assert!(!interval.update(Duration::from_secs(1))); + assert!(interval.update(Duration::from_secs(1))); + assert!(interval.update(Duration::from_secs(3))); + } + + #[test] + fn interval_polling() { + let waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + let (tx, rx) = channel::unbounded(); + let mut interval = Interval::new(Duration::from_secs(2), rx); + + tx.send(Duration::from_secs(0)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(()))); + tx.send(Duration::from_secs(0)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending); + tx.send(Duration::from_secs(1)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending); + tx.send(Duration::from_secs(1)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(()))); + tx.send(Duration::from_secs(3)).unwrap(); + assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(()))); + } + + #[test] + fn temporal_release_update() { + let (_tx, rx) = channel::unbounded(); + let mut temporal_release = + TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); + + assert!(!temporal_release.update(Duration::from_secs(0))); + assert!(!temporal_release.update(Duration::from_millis(999))); + assert!(temporal_release.update(Duration::from_secs(1))); + assert!(temporal_release.update(Duration::from_secs(3))); + } + + #[test] + fn temporal_release_polling() { + let waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + let (tx, rx) = channel::unbounded(); + let mut temporal_release = + TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1)); + + tx.send(Duration::from_secs(0)).unwrap(); + assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending); + tx.send(Duration::from_millis(999)).unwrap(); + assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending); + tx.send(Duration::from_secs(1)).unwrap(); + assert_eq!( + temporal_release.poll_next_unpin(&mut cx), + Poll::Ready(Some(())) + ); + tx.send(Duration::from_secs(3)).unwrap(); + assert_eq!( + temporal_release.poll_next_unpin(&mut cx), + Poll::Ready(Some(())) + ); + } +} diff --git a/simlib/blendnet-sims/src/node/blend/state.rs b/simlib/blendnet-sims/src/node/blend/state.rs new file mode 100644 index 0000000..7fb4417 --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/state.rs @@ -0,0 +1,75 @@ +use std::any::Any; + +use serde::Serialize; + +use netrunner::{ + node::{serialize_node_id_as_index, NodeId}, + output_processors::{Record, RecordType, Runtime}, + settings::SimulationSettings, + warding::SimulationState, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct BlendnodeState { + #[serde(serialize_with = "serialize_node_id_as_index")] + pub node_id: NodeId, + pub step_id: usize, + pub num_messages_fully_unwrapped: usize, +} + +#[derive(Serialize)] +#[serde(untagged)] +pub enum BlendnodeRecord { + Runtime(Runtime), + Settings(Box), + #[allow(clippy::vec_box)] // we downcast stuff and we need the extra boxing + Data(Vec>), +} + +impl From for BlendnodeRecord { + fn from(value: Runtime) -> Self { + Self::Runtime(value) + } +} + +impl From for BlendnodeRecord { + fn from(value: SimulationSettings) -> Self { + Self::Settings(Box::new(value)) + } +} + +impl Record for BlendnodeRecord { + type Data = BlendnodeState; + + fn record_type(&self) -> RecordType { + match self { + BlendnodeRecord::Runtime(_) => RecordType::Meta, + BlendnodeRecord::Settings(_) => RecordType::Settings, + BlendnodeRecord::Data(_) => RecordType::Data, + } + } + + fn data(&self) -> Vec<&BlendnodeState> { + match self { + BlendnodeRecord::Data(d) => d.iter().map(AsRef::as_ref).collect(), + _ => vec![], + } + } +} + +impl TryFrom<&SimulationState> for BlendnodeRecord { + type Error = anyhow::Error; + + fn try_from(state: &SimulationState) -> Result { + let Ok(states) = state + .nodes + .read() + .iter() + .map(|n| Box::::downcast(Box::new(n.state().clone()))) + .collect::, _>>() + else { + return Err(anyhow::anyhow!("use carnot record on other node")); + }; + Ok(Self::Data(states)) + } +} diff --git a/simlib/blendnet-sims/src/node/blend/stream_wrapper.rs b/simlib/blendnet-sims/src/node/blend/stream_wrapper.rs new file mode 100644 index 0000000..7d776e5 --- /dev/null +++ b/simlib/blendnet-sims/src/node/blend/stream_wrapper.rs @@ -0,0 +1,29 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use crossbeam::channel; +use futures::Stream; + +pub struct CrossbeamReceiverStream { + receiver: channel::Receiver, +} + +impl CrossbeamReceiverStream { + pub fn new(receiver: channel::Receiver) -> Self { + Self { receiver } + } +} + +impl Stream for CrossbeamReceiverStream { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + match self.receiver.try_recv() { + Ok(item) => Poll::Ready(Some(item)), + Err(channel::TryRecvError::Empty) => Poll::Pending, + Err(channel::TryRecvError::Disconnected) => Poll::Ready(None), + } + } +}