From 85a5b48c1ffad05785971db9024e427010cf8451 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Thu, 7 Nov 2024 04:48:36 +0100 Subject: [PATCH] Implement Epoch and Slot streams. --- .../src/node/mix/consensus_streams.rs | 56 +++++++++++++++++++ network-runner/src/node/mix/mod.rs | 1 + 2 files changed, 57 insertions(+) create mode 100644 network-runner/src/node/mix/consensus_streams.rs diff --git a/network-runner/src/node/mix/consensus_streams.rs b/network-runner/src/node/mix/consensus_streams.rs new file mode 100644 index 0000000..8f534a7 --- /dev/null +++ b/network-runner/src/node/mix/consensus_streams.rs @@ -0,0 +1,56 @@ +use crate::node::mix::scheduler::Interval; +use crossbeam::channel; +use futures::stream::iter; +use futures::{Stream, StreamExt}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +pub struct CounterInterval { + interval: Box + Unpin>, +} + +impl CounterInterval { + pub fn new(duration: Duration, update_receiver: channel::Receiver) -> Self { + let interval = Interval::new(duration, update_receiver) + .zip(iter(0usize..)) + .map(|(_, i)| i); + let interval = Box::new(interval); + Self { interval } + } +} + +impl Stream for CounterInterval { + type Item = usize; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.interval.poll_next_unpin(cx) + } +} + +pub type Epoch = CounterInterval; + +pub struct Slot { + interval: Box + Unpin>, +} + +impl Slot { + pub fn new( + slots_per_epoch: usize, + slot_duration: Duration, + update_receiver: channel::Receiver, + ) -> Self { + let interval = CounterInterval::new(slot_duration, update_receiver) + .map(move |slot| slot % slots_per_epoch); + let interval = Box::new(interval); + Self { interval } + } +} + +impl Stream for Slot { + type Item = usize; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.interval.poll_next_unpin(cx) + } +} diff --git a/network-runner/src/node/mix/mod.rs b/network-runner/src/node/mix/mod.rs index 9e70381..ba3ee3b 100644 --- a/network-runner/src/node/mix/mod.rs +++ b/network-runner/src/node/mix/mod.rs @@ -1,3 +1,4 @@ +mod consensus_streams; mod scheduler; pub mod state; mod stream_wrapper;