diff --git a/network-runner/src/node/mix/consensus_streams.rs b/network-runner/src/node/mix/consensus_streams.rs new file mode 100644 index 0000000..8f534a7 --- /dev/null +++ b/network-runner/src/node/mix/consensus_streams.rs @@ -0,0 +1,56 @@ +use crate::node::mix::scheduler::Interval; +use crossbeam::channel; +use futures::stream::iter; +use futures::{Stream, StreamExt}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +pub struct CounterInterval { + interval: Box + Unpin>, +} + +impl CounterInterval { + pub fn new(duration: Duration, update_receiver: channel::Receiver) -> Self { + let interval = Interval::new(duration, update_receiver) + .zip(iter(0usize..)) + .map(|(_, i)| i); + let interval = Box::new(interval); + Self { interval } + } +} + +impl Stream for CounterInterval { + type Item = usize; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.interval.poll_next_unpin(cx) + } +} + +pub type Epoch = CounterInterval; + +pub struct Slot { + interval: Box + Unpin>, +} + +impl Slot { + pub fn new( + slots_per_epoch: usize, + slot_duration: Duration, + update_receiver: channel::Receiver, + ) -> Self { + let interval = CounterInterval::new(slot_duration, update_receiver) + .map(move |slot| slot % slots_per_epoch); + let interval = Box::new(interval); + Self { interval } + } +} + +impl Stream for Slot { + type Item = usize; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.interval.poll_next_unpin(cx) + } +} diff --git a/network-runner/src/node/mix/mod.rs b/network-runner/src/node/mix/mod.rs index e3496dd..ac68231 100644 --- a/network-runner/src/node/mix/mod.rs +++ b/network-runner/src/node/mix/mod.rs @@ -1,3 +1,4 @@ +mod consensus_streams; mod scheduler; pub mod state; mod stream_wrapper;