From b79369f56390a619b362061b3b4ba4f614403cb0 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Thu, 24 Oct 2024 15:34:18 +0900 Subject: [PATCH] Blend: Tiers and Message Flows --- blend/.gitignore | 1 + blend/Cargo.lock | 451 +++++++++++++++++++++++++++++++++++++++ blend/Cargo.toml | 8 + blend/src/integration.rs | 68 ++++++ blend/src/lib.rs | 6 + blend/src/network.rs | 24 +++ blend/src/tier1.rs | 98 +++++++++ blend/src/tier2.rs | 105 +++++++++ 8 files changed, 761 insertions(+) create mode 100644 blend/.gitignore create mode 100644 blend/Cargo.lock create mode 100644 blend/Cargo.toml create mode 100644 blend/src/integration.rs create mode 100644 blend/src/lib.rs create mode 100644 blend/src/network.rs create mode 100644 blend/src/tier1.rs create mode 100644 blend/src/tier2.rs diff --git a/blend/.gitignore b/blend/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/blend/.gitignore @@ -0,0 +1 @@ +/target diff --git a/blend/Cargo.lock b/blend/Cargo.lock new file mode 100644 index 0000000..04d114e --- /dev/null +++ b/blend/Cargo.lock @@ -0,0 +1,451 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bitflags" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" + +[[package]] +name = "blend" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", +] + +[[package]] +name = "bytes" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + +[[package]] +name = "libc" +version = "0.2.161" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" + +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi", + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "object" +version = "0.36.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +dependencies = [ + "memchr", +] + +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "syn" +version = "2.0.85" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/blend/Cargo.toml b/blend/Cargo.toml new file mode 100644 index 0000000..315d1ea --- /dev/null +++ b/blend/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "blend" +version = "0.1.0" +edition = "2021" + +[dependencies] +futures = "0.3.31" +tokio = { version = "1.41.0", features = ["full"] } diff --git a/blend/src/integration.rs b/blend/src/integration.rs new file mode 100644 index 0000000..f3af5dd --- /dev/null +++ b/blend/src/integration.rs @@ -0,0 +1,68 @@ +use futures::StreamExt; +use tokio::{select, spawn, sync::mpsc}; + +use crate::{ + network::{self, Network}, + tier1::{self, Tier1}, + tier2::{self, Tier2}, +}; + +pub struct System { + network: Network, + tier1_input_channel: mpsc::UnboundedSender, + tier1_output_channel: mpsc::UnboundedReceiver, + tier2_input_channel: mpsc::UnboundedSender, + tier2_output_channel: mpsc::UnboundedReceiver, +} + +impl System { + pub fn new() -> Self { + let (tier1_input_sender, tier1_input_receiver) = mpsc::unbounded_channel(); + let (tier1_output_sender, tier1_output_receiver) = mpsc::unbounded_channel(); + let (tier2_input_sender, tier2_input_receiver) = mpsc::unbounded_channel(); + let (tier2_output_sender, tier2_output_receiver) = mpsc::unbounded_channel(); + + let mut tier1 = Tier1::new(tier1_input_receiver, tier1_output_sender); + spawn(async move { tier1.run().await }); + let mut tier2 = Tier2::new(tier2_input_receiver, tier2_output_sender); + spawn(async move { tier2.run().await }); + + Self { + network: network::Network, + tier1_input_channel: tier1_input_sender, + tier1_output_channel: tier1_output_receiver, + tier2_input_channel: tier2_input_sender, + tier2_output_channel: tier2_output_receiver, + } + } + + pub async fn run(&mut self) { + loop { + select! { + Some((msg, from)) = self.network.next() => { + self.tier1_input_channel.send(tier1::Input::FromNetwork { msg, from, }); + } + Some(tier1_output) = self.tier1_output_channel.recv() => { + match tier1_output { + tier1::Output::ToNetwork { msg, exclude } => { + self.network.send_to_all(msg, exclude); + } + tier1::Output::ToTier2(msg) => { + self.tier2_input_channel.send(tier2::Input::FromTier1(msg)); + } + } + } + Some(tier2_output) = self.tier2_output_channel.recv() => { + match tier2_output { + tier2::Output::ToTier1(msg) => { + self.tier1_input_channel.send(tier1::Input::FromTier2(msg)); + } + tier2::Output::ToBroadcast(msg) => { + self.network.send_to_all(msg, None); + } + } + } + } + } + } +} diff --git a/blend/src/lib.rs b/blend/src/lib.rs new file mode 100644 index 0000000..bc371e5 --- /dev/null +++ b/blend/src/lib.rs @@ -0,0 +1,6 @@ +mod integration; +mod network; +mod tier1; +mod tier2; + +type PeerId = String; diff --git a/blend/src/network.rs b/blend/src/network.rs new file mode 100644 index 0000000..4864f9a --- /dev/null +++ b/blend/src/network.rs @@ -0,0 +1,24 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::Stream; + +use crate::PeerId; + +pub struct Network; + +impl Network { + pub fn send_to_all(&self, msg: Vec, exclude: Option) { + todo!("send the msg to all peers except the one with the given id") + } +} + +impl Stream for Network { + type Item = (Vec, PeerId); + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + todo!("Return messages received from peers") + } +} diff --git a/blend/src/tier1.rs b/blend/src/tier1.rs new file mode 100644 index 0000000..5d78249 --- /dev/null +++ b/blend/src/tier1.rs @@ -0,0 +1,98 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{Stream, StreamExt}; +use tokio::{select, sync::mpsc}; + +use crate::PeerId; + +pub struct Tier1 { + input_channel: mpsc::UnboundedReceiver, + output_channel: mpsc::UnboundedSender, + + persitent_transmitter: PersistentTransmitter, +} + +pub enum Input { + FromNetwork { msg: Vec, from: PeerId }, + FromTier2(Vec), +} + +pub enum Output { + ToNetwork { + msg: Vec, + exclude: Option, + }, + ToTier2(Vec), +} + +impl Tier1 { + pub fn new( + input_channel: mpsc::UnboundedReceiver, + output_channel: mpsc::UnboundedSender, + ) -> Self { + Self { + input_channel, + output_channel, + persitent_transmitter: todo!(), + } + } + + pub async fn run(&mut self) { + loop { + select! { + Some(input) = self.input_channel.recv() => { + match input { + Input::FromNetwork { msg, from } => { + // TODO: Send to monitor + if self.is_drop_message(&msg) { + continue; + } + if self.is_duplicate(&msg) { + continue; + } + self.output_channel.send(Output::ToNetwork { msg: msg.clone(), exclude: Some(from) }); + self.output_channel.send(Output::ToTier2(msg)); + } + Input::FromTier2(msg) => { + self.persitent_transmitter.push(msg); + } + } + } + Some(msg_to_emit) = self.persitent_transmitter.next() => { + self.output_channel.send(Output::ToNetwork { msg: msg_to_emit, exclude: None }); + } + + } + } + } + + fn is_drop_message(&self, msg: &[u8]) -> bool { + todo!() + } + + fn is_duplicate(&self, msg: &[u8]) -> bool { + todo!() + } +} + +struct PersistentTransmitter { + max_emission_frequency: f64, + drop_message_probability: f64, +} + +impl PersistentTransmitter { + fn push(&self, msg: Vec) { + todo!("Push msg to the buffer (queue)") + } +} + +impl Stream for PersistentTransmitter { + type Item = Vec; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + todo!("Periodically release one of scheduled messages. Release a drop message with the probability if no message scheduled") + } +} diff --git a/blend/src/tier2.rs b/blend/src/tier2.rs new file mode 100644 index 0000000..1faa8d4 --- /dev/null +++ b/blend/src/tier2.rs @@ -0,0 +1,105 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{Stream, StreamExt}; +use tokio::{select, sync::mpsc}; + +pub struct Tier2 { + input_channel: mpsc::UnboundedReceiver, + output_channel: mpsc::UnboundedSender, + + crypto_processor: CryptographicProcessor, + temporal_processor: TemporalProcessor, +} + +pub enum Input { + FromTier1(Vec), + New(Vec), + FromTier3(Vec), +} + +pub enum Output { + ToTier1(Vec), + ToBroadcast(Vec), +} + +impl Tier2 { + pub fn new( + input_channel: mpsc::UnboundedReceiver, + output_channel: mpsc::UnboundedSender, + ) -> Self { + Self { + input_channel, + output_channel, + crypto_processor: todo!(), + temporal_processor: todo!(), + } + } + + pub async fn run(&mut self) { + loop { + select! { + Some(input) = self.input_channel.recv() => { + match input { + Input::FromTier1(msg) | Input::FromTier3(msg) => { + match self.crypto_processor.unwrap(msg) { + Ok(unwrapped) => self.temporal_processor.push(unwrapped), + Err(_) => {} // do nothing + } + } + Input::New(msg) => { + let wrapped = self.crypto_processor.wrap(msg); + self.output_channel.send(Output::ToTier1(wrapped)); + } + } + } + Some(msg) = self.temporal_processor.next() => { + if msg.fully_unwrapped { + self.output_channel.send(Output::ToBroadcast(msg.msg)); + } else { + self.output_channel.send(Output::ToTier1(msg.msg)); + } + } + } + } + } +} + +struct CryptographicProcessor; + +struct CryptoError; + +impl CryptographicProcessor { + fn wrap(&self, msg: Vec) -> Vec { + todo!() + } + + fn unwrap(&self, msg: Vec) -> Result { + todo!() + } +} + +struct UnwrappedMessage { + msg: Vec, + fully_unwrapped: bool, +} + +struct TemporalProcessor { + max_delay: u64, +} + +impl TemporalProcessor { + fn push(&self, msg: UnwrappedMessage) { + todo!("Push msg to the buffer (queue)") + } +} + +impl Stream for TemporalProcessor { + type Item = UnwrappedMessage; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + todo!("Run the logic and release a message when necessary") + } +}