Blend: Tiers and Message Flows

This commit is contained in:
Youngjoon Lee 2024-10-24 15:34:18 +09:00
parent 5434fcb315
commit b79369f563
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
8 changed files with 761 additions and 0 deletions

1
blend/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

451
blend/Cargo.lock generated Normal file
View File

@ -0,0 +1,451 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1"
dependencies = [
"gimli",
]
[[package]]
name = "adler2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "autocfg"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "backtrace"
version = "0.3.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
dependencies = [
"addr2line",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
"windows-targets",
]
[[package]]
name = "bitflags"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
[[package]]
name = "blend"
version = "0.1.0"
dependencies = [
"futures",
"tokio",
]
[[package]]
name = "bytes"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "gimli"
version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "hermit-abi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "libc"
version = "0.2.161"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1"
[[package]]
name = "lock_api"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]]
name = "memchr"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miniz_oxide"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
dependencies = [
"adler2",
]
[[package]]
name = "mio"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
dependencies = [
"hermit-abi",
"libc",
"wasi",
"windows-sys",
]
[[package]]
name = "object"
version = "0.36.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e"
dependencies = [
"memchr",
]
[[package]]
name = "parking_lot"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets",
]
[[package]]
name = "pin-project-lite"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f"
dependencies = [
"bitflags",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "syn"
version = "2.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "tokio"
version = "1.41.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",
]
[[package]]
name = "tokio-macros"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "unicode-ident"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"

8
blend/Cargo.toml Normal file
View File

@ -0,0 +1,8 @@
[package]
name = "blend"
version = "0.1.0"
edition = "2021"
[dependencies]
futures = "0.3.31"
tokio = { version = "1.41.0", features = ["full"] }

68
blend/src/integration.rs Normal file
View File

@ -0,0 +1,68 @@
use futures::StreamExt;
use tokio::{select, spawn, sync::mpsc};
use crate::{
network::{self, Network},
tier1::{self, Tier1},
tier2::{self, Tier2},
};
pub struct System {
network: Network,
tier1_input_channel: mpsc::UnboundedSender<tier1::Input>,
tier1_output_channel: mpsc::UnboundedReceiver<tier1::Output>,
tier2_input_channel: mpsc::UnboundedSender<tier2::Input>,
tier2_output_channel: mpsc::UnboundedReceiver<tier2::Output>,
}
impl System {
pub fn new() -> Self {
let (tier1_input_sender, tier1_input_receiver) = mpsc::unbounded_channel();
let (tier1_output_sender, tier1_output_receiver) = mpsc::unbounded_channel();
let (tier2_input_sender, tier2_input_receiver) = mpsc::unbounded_channel();
let (tier2_output_sender, tier2_output_receiver) = mpsc::unbounded_channel();
let mut tier1 = Tier1::new(tier1_input_receiver, tier1_output_sender);
spawn(async move { tier1.run().await });
let mut tier2 = Tier2::new(tier2_input_receiver, tier2_output_sender);
spawn(async move { tier2.run().await });
Self {
network: network::Network,
tier1_input_channel: tier1_input_sender,
tier1_output_channel: tier1_output_receiver,
tier2_input_channel: tier2_input_sender,
tier2_output_channel: tier2_output_receiver,
}
}
pub async fn run(&mut self) {
loop {
select! {
Some((msg, from)) = self.network.next() => {
self.tier1_input_channel.send(tier1::Input::FromNetwork { msg, from, });
}
Some(tier1_output) = self.tier1_output_channel.recv() => {
match tier1_output {
tier1::Output::ToNetwork { msg, exclude } => {
self.network.send_to_all(msg, exclude);
}
tier1::Output::ToTier2(msg) => {
self.tier2_input_channel.send(tier2::Input::FromTier1(msg));
}
}
}
Some(tier2_output) = self.tier2_output_channel.recv() => {
match tier2_output {
tier2::Output::ToTier1(msg) => {
self.tier1_input_channel.send(tier1::Input::FromTier2(msg));
}
tier2::Output::ToBroadcast(msg) => {
self.network.send_to_all(msg, None);
}
}
}
}
}
}
}

6
blend/src/lib.rs Normal file
View File

@ -0,0 +1,6 @@
mod integration;
mod network;
mod tier1;
mod tier2;
type PeerId = String;

24
blend/src/network.rs Normal file
View File

@ -0,0 +1,24 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::Stream;
use crate::PeerId;
pub struct Network;
impl Network {
pub fn send_to_all(&self, msg: Vec<u8>, exclude: Option<PeerId>) {
todo!("send the msg to all peers except the one with the given id")
}
}
impl Stream for Network {
type Item = (Vec<u8>, PeerId);
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!("Return messages received from peers")
}
}

98
blend/src/tier1.rs Normal file
View File

@ -0,0 +1,98 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{Stream, StreamExt};
use tokio::{select, sync::mpsc};
use crate::PeerId;
pub struct Tier1 {
input_channel: mpsc::UnboundedReceiver<Input>,
output_channel: mpsc::UnboundedSender<Output>,
persitent_transmitter: PersistentTransmitter,
}
pub enum Input {
FromNetwork { msg: Vec<u8>, from: PeerId },
FromTier2(Vec<u8>),
}
pub enum Output {
ToNetwork {
msg: Vec<u8>,
exclude: Option<PeerId>,
},
ToTier2(Vec<u8>),
}
impl Tier1 {
pub fn new(
input_channel: mpsc::UnboundedReceiver<Input>,
output_channel: mpsc::UnboundedSender<Output>,
) -> Self {
Self {
input_channel,
output_channel,
persitent_transmitter: todo!(),
}
}
pub async fn run(&mut self) {
loop {
select! {
Some(input) = self.input_channel.recv() => {
match input {
Input::FromNetwork { msg, from } => {
// TODO: Send to monitor
if self.is_drop_message(&msg) {
continue;
}
if self.is_duplicate(&msg) {
continue;
}
self.output_channel.send(Output::ToNetwork { msg: msg.clone(), exclude: Some(from) });
self.output_channel.send(Output::ToTier2(msg));
}
Input::FromTier2(msg) => {
self.persitent_transmitter.push(msg);
}
}
}
Some(msg_to_emit) = self.persitent_transmitter.next() => {
self.output_channel.send(Output::ToNetwork { msg: msg_to_emit, exclude: None });
}
}
}
}
fn is_drop_message(&self, msg: &[u8]) -> bool {
todo!()
}
fn is_duplicate(&self, msg: &[u8]) -> bool {
todo!()
}
}
struct PersistentTransmitter {
max_emission_frequency: f64,
drop_message_probability: f64,
}
impl PersistentTransmitter {
fn push(&self, msg: Vec<u8>) {
todo!("Push msg to the buffer (queue)")
}
}
impl Stream for PersistentTransmitter {
type Item = Vec<u8>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!("Periodically release one of scheduled messages. Release a drop message with the probability if no message scheduled")
}
}

105
blend/src/tier2.rs Normal file
View File

@ -0,0 +1,105 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{Stream, StreamExt};
use tokio::{select, sync::mpsc};
pub struct Tier2 {
input_channel: mpsc::UnboundedReceiver<Input>,
output_channel: mpsc::UnboundedSender<Output>,
crypto_processor: CryptographicProcessor,
temporal_processor: TemporalProcessor,
}
pub enum Input {
FromTier1(Vec<u8>),
New(Vec<u8>),
FromTier3(Vec<u8>),
}
pub enum Output {
ToTier1(Vec<u8>),
ToBroadcast(Vec<u8>),
}
impl Tier2 {
pub fn new(
input_channel: mpsc::UnboundedReceiver<Input>,
output_channel: mpsc::UnboundedSender<Output>,
) -> Self {
Self {
input_channel,
output_channel,
crypto_processor: todo!(),
temporal_processor: todo!(),
}
}
pub async fn run(&mut self) {
loop {
select! {
Some(input) = self.input_channel.recv() => {
match input {
Input::FromTier1(msg) | Input::FromTier3(msg) => {
match self.crypto_processor.unwrap(msg) {
Ok(unwrapped) => self.temporal_processor.push(unwrapped),
Err(_) => {} // do nothing
}
}
Input::New(msg) => {
let wrapped = self.crypto_processor.wrap(msg);
self.output_channel.send(Output::ToTier1(wrapped));
}
}
}
Some(msg) = self.temporal_processor.next() => {
if msg.fully_unwrapped {
self.output_channel.send(Output::ToBroadcast(msg.msg));
} else {
self.output_channel.send(Output::ToTier1(msg.msg));
}
}
}
}
}
}
struct CryptographicProcessor;
struct CryptoError;
impl CryptographicProcessor {
fn wrap(&self, msg: Vec<u8>) -> Vec<u8> {
todo!()
}
fn unwrap(&self, msg: Vec<u8>) -> Result<UnwrappedMessage, CryptoError> {
todo!()
}
}
struct UnwrappedMessage {
msg: Vec<u8>,
fully_unwrapped: bool,
}
struct TemporalProcessor {
max_delay: u64,
}
impl TemporalProcessor {
fn push(&self, msg: UnwrappedMessage) {
todo!("Push msg to the buffer (queue)")
}
}
impl Stream for TemporalProcessor {
type Item = UnwrappedMessage;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!("Run the logic and release a message when necessary")
}
}