Add simulation engine

First stage of adding the simulation engine that was living in older versions of nomos-nod. It will change in the future generalising it for better compatibility.
This commit is contained in:
gusto 2024-11-05 12:13:21 +02:00 committed by GitHub
parent c3a04ab206
commit 09d99c0831
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 6937 additions and 0 deletions

1
.gitignore vendored
View File

@ -3,3 +3,4 @@ __pycache__/
*$py.class *$py.class
*.so *.so
simulation simulation
network-runner/target

2835
network-runner/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

44
network-runner/Cargo.toml Normal file
View File

@ -0,0 +1,44 @@
[package]
name = "nomos-simulations-network-runner"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "simulation"
path = "src/bin/app/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1"
blake2 = "0.10"
bls-signatures = "0.14"
digest = "0.10"
csv = "1"
clap = { version = "4", features = ["derive"] }
ctrlc = "3.4"
chrono = { version = "0.4", features = ["serde"] }
crc32fast = "1.3"
crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] }
fixed-slice-deque = "0.1.0-beta2"
futures = "0.3"
humantime = "2.1"
humantime-serde = "1"
once_cell = "1.17"
parking_lot = "0.12"
polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"], optional = true }
rand = { version = "0.8", features = ["small_rng"] }
rayon = "1.7"
scopeguard = "1"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_with = "2.3"
serde_json = "1.0"
thiserror = "1"
tracing = { version = "0.1", default-features = false, features = ["log", "attributes"] }
tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"]}
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }
[features]
polars = ["dep:polars"]

View File

@ -0,0 +1,52 @@
{
"network_settings": {
"network_behaviors": {
"north america:north america": "50ms",
"north america:europe": "100ms",
"north america:asia": "120ms",
"europe:europe": "50ms",
"europe:asia": "100ms",
"europe:north america": "120ms",
"asia:north america": "100ms",
"asia:europe": "120ms",
"asia:asia": "40ms"
},
"regions": {
"north america": 0.4,
"europe": 0.4,
"asia": 0.3
}
},
"overlay_settings": {
"number_of_committees": 3
},
"node_settings": {
"timeout": "1000ms"
},
"step_time": "10ms",
"runner_settings": "Sync",
"stream_settings": {
"path": "test.csv"
},
"node_count": 3000,
"views_count": 3,
"leaders_count": 1,
"seed": 0,
"wards": [
{"max_view": 1}
],
"record_settings": {
"node_id": true,
"current_view": true,
"highest_voted_view": true,
"local_high_qc": true,
"safe_blocks": true,
"last_view_timeout_qc": true,
"latest_committed_block": true,
"latest_committed_view": true,
"root_committee": true,
"parent_committee": true,
"child_committees": true,
"committed_blocks": true
}
}

View File

@ -0,0 +1,62 @@
{
"network_settings": {
"network_behaviors": {
"north america:north america": "10ms",
"north america:europe": "150ms",
"north america:asia": "250ms",
"europe:europe": "10ms",
"europe:asia": "200ms",
"europe:north america": "150ms",
"asia:north america": "250ms",
"asia:europe": "200ms",
"asia:asia": "10ms"
},
"regions": {
"north america": 0.4,
"europe": 0.3,
"asia": 0.3
}
},
"overlay_settings": {
"number_of_committees": 7
},
"node_settings": {
"network_capacity_kbps": 10000024,
"timeout": "10000ms"
},
"step_time": "100ms",
"runner_settings": "Sync",
"stream_settings": {
"path": "tree_500_7_view_1_default.csv",
"format": "csv"
},
"node_count": 500,
"views_count": 10,
"leaders_count": 1,
"seed": 0,
"wards": [
{
"max_view": 1
},
{
"stalled_view": {
"consecutive_viewed_checkpoint": null,
"criterion": 0,
"threshold": 100
}
}
],
"record_settings": {
"current_view": true,
"highest_voted_view": true,
"local_high_qc": true,
"safe_blocks": false,
"last_view_timeout_qc": true,
"latest_committed_block": true,
"latest_committed_view": true,
"root_committee": false,
"parent_committee": false,
"child_committees": false,
"committed_blocks": false
}
}

View File

@ -0,0 +1,86 @@
use std::{
fs::File,
io::{stderr, stdout},
path::PathBuf,
str::FromStr,
};
use tracing_subscriber::fmt::{format::Format, FormatEvent, FormatFields, SubscriberBuilder};
#[derive(Default, Copy, Clone)]
pub enum LogFormat {
#[default]
Plain,
Json,
}
impl FromStr for LogFormat {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_ascii_lowercase().as_str() {
"json" => Ok(LogFormat::Json),
"plain" => Ok(LogFormat::Plain),
_ => Err(anyhow::anyhow!("Unknown log format")),
}
}
}
#[derive(Default, Clone)]
pub enum LogOutput {
#[default]
StdOut,
StdErr,
File(PathBuf),
}
impl FromStr for LogOutput {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_ascii_lowercase().as_str() {
"stdout" => Ok(Self::StdOut),
"stderr" => Ok(Self::StdErr),
path => Ok(Self::File(PathBuf::from(path))),
}
}
}
pub fn config_tracing(fmt: LogFormat, file: &LogOutput) {
let filter = std::env::var("SIMULATION_LOG").unwrap_or_else(|_| "info".to_owned());
let subscriber = tracing_subscriber::fmt::fmt()
.without_time()
.with_line_number(true)
.with_env_filter(filter)
.with_file(false)
.with_target(true);
if let LogFormat::Json = fmt {
set_global(subscriber.json(), file);
} else {
set_global(subscriber, file);
}
}
fn set_global<N, L, T>(
subscriber: SubscriberBuilder<N, Format<L, T>, tracing_subscriber::EnvFilter>,
output: &LogOutput,
) where
N: for<'writer> FormatFields<'writer> + 'static + Send + Sync,
Format<L, T>: FormatEvent<tracing_subscriber::Registry, N>,
L: Send + Sync + 'static,
T: Send + Sync + 'static,
{
use tracing::subscriber::set_global_default;
match output {
LogOutput::StdOut => set_global_default(subscriber.with_writer(stdout).finish()),
LogOutput::StdErr => set_global_default(subscriber.with_writer(stderr).finish()),
LogOutput::File(path) => set_global_default(
subscriber
.with_ansi(false)
.with_writer(File::create(path).expect("Unable to create log file"))
.finish(),
),
}
.expect("Unable to set global default subscriber")
}

View File

@ -0,0 +1,185 @@
// std
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates
use anyhow::Ok;
use clap::Parser;
use nomos_simulations_network_runner::network::behaviour::create_behaviours;
use nomos_simulations_network_runner::network::regions::{create_regions, RegionsData};
use nomos_simulations_network_runner::network::Network;
use nomos_simulations_network_runner::node::NodeId;
use nomos_simulations_network_runner::output_processors::{OutData, Record};
use nomos_simulations_network_runner::runner::{BoxedNode, SimulationRunnerHandle};
#[cfg(feature = "polars")]
use nomos_simulations_network_runner::streaming::polars::PolarsSubscriber;
use nomos_simulations_network_runner::streaming::{
io::IOSubscriber, naive::NaiveSubscriber, StreamType,
};
use parking_lot::Mutex;
use rand::rngs::SmallRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use serde::de::DeserializeOwned;
use serde::Serialize;
// internal
use nomos_simulations_network_runner::{runner::SimulationRunner, settings::SimulationSettings};
mod log;
/// Main simulation wrapper
/// Pipes together the cli arguments with the execution
#[derive(Parser)]
pub struct SimulationApp {
/// Json file path, on `SimulationSettings` format
#[clap(long, short)]
input_settings: PathBuf,
#[clap(long)]
stream_type: Option<StreamType>,
#[clap(long, default_value = "plain")]
log_format: log::LogFormat,
#[clap(long, default_value = "stdout")]
log_to: log::LogOutput,
#[clap(long)]
dump_overlay_info: bool,
#[clap(long)]
no_netcap: bool,
}
impl SimulationApp {
pub fn run(self) -> anyhow::Result<()> {
let Self {
input_settings,
stream_type,
log_format: _,
log_to: _,
dump_overlay_info,
no_netcap,
} = self;
let simulation_settings: SimulationSettings = load_json_from_file(&input_settings)?;
let seed = simulation_settings.seed.unwrap_or_else(|| {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs()
});
let mut rng = SmallRng::seed_from_u64(seed);
let mut node_ids: Vec<NodeId> = (0..simulation_settings.node_count)
.map(|_| todo!())
.collect();
node_ids.shuffle(&mut rng);
let regions = create_regions(&node_ids, &mut rng, &simulation_settings.network_settings);
let behaviours = create_behaviours(&simulation_settings.network_settings);
let regions_data = RegionsData::new(regions, behaviours);
let ids = node_ids.clone();
let network = Arc::new(Mutex::new(Network::<()>::new(regions_data, seed)));
// if dump_overlay_info {
// dump_json_to_file(
// Path::new("overlay_info.json"),
// &overlay_node::overlay_info(
// node_ids.clone(),
// node_ids.first().copied().unwrap(),
// &simulation_settings.overlay_settings,
// ),
// )?;
// }
// let nodes: Vec<BoxedNode<_, _>> = node_ids
// .par_iter()
// .copied()
// .map(|node_id| todo!())
// .collect();
// let network = Arc::try_unwrap(network)
// .expect("network is not used anywhere else")
// .into_inner();
// run::<_, _, _>(network, nodes, simulation_settings, stream_type)?;
Ok(())
}
}
fn run<M: std::fmt::Debug, S, T>(
network: Network<M>,
nodes: Vec<BoxedNode<S, T>>,
settings: SimulationSettings,
stream_type: Option<StreamType>,
) -> anyhow::Result<()>
where
M: Clone + Send + Sync + 'static,
S: 'static,
T: Serialize + Clone + 'static,
{
let stream_settings = settings.stream_settings.clone();
let runner =
SimulationRunner::<_, OutData, S, T>::new(network, nodes, Default::default(), settings)?;
let handle = match stream_type {
Some(StreamType::Naive) => {
let settings = stream_settings.unwrap_naive();
runner.simulate_and_subscribe::<NaiveSubscriber<OutData>>(settings)?
}
Some(StreamType::IO) => {
let settings = stream_settings.unwrap_io();
runner.simulate_and_subscribe::<IOSubscriber<OutData>>(settings)?
}
#[cfg(feature = "polars")]
Some(StreamType::Polars) => {
let settings = stream_settings.unwrap_polars();
runner.simulate_and_subscribe::<PolarsSubscriber<OutData>>(settings)?
}
None => runner.simulate()?,
};
signal(handle)
}
fn signal<R: Record>(handle: SimulationRunnerHandle<R>) -> anyhow::Result<()> {
let handle = Arc::new(handle);
let (tx, rx) = crossbeam::channel::bounded(1);
ctrlc::set_handler(move || {
tx.send(()).unwrap();
})?;
loop {
crossbeam::select! {
recv(rx) -> _ => {
handle.stop()?;
tracing::info!("gracefully shutdown the simulation app");
break;
},
default => {
if handle.is_finished() {
handle.shutdown()?;
break;
}
std::thread::sleep(Duration::from_millis(50));
}
}
}
Ok(())
}
/// Generically load a json file
fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
let f = File::open(path).map_err(Box::new)?;
Ok(serde_json::from_reader(f)?)
}
fn dump_json_to_file<T: Serialize>(path: &Path, data: &T) -> anyhow::Result<()> {
let f = File::create(path).map_err(Box::new)?;
Ok(serde_json::to_writer(f, data)?)
}
fn main() -> anyhow::Result<()> {
let app: SimulationApp = SimulationApp::parse();
log::config_tracing(app.log_format, &app.log_to);
if let Err(e) = app.run() {
tracing::error!("error: {}", e);
std::process::exit(1);
}
Ok(())
}

10
network-runner/src/lib.rs Normal file
View File

@ -0,0 +1,10 @@
pub mod network;
pub mod node;
pub mod output_processors;
pub mod runner;
pub mod settings;
pub mod streaming;
pub mod warding;
static START_TIME: once_cell::sync::Lazy<std::time::Instant> =
once_cell::sync::Lazy::new(std::time::Instant::now);

View File

@ -0,0 +1,40 @@
// std
use std::{collections::HashMap, time::Duration};
// crates
use rand::Rng;
use serde::{Deserialize, Serialize};
use super::{NetworkBehaviourKey, NetworkSettings};
// internal
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct NetworkBehaviour {
pub delay: Duration,
pub drop: f64,
}
impl NetworkBehaviour {
pub fn new(delay: Duration, drop: f64) -> Self {
Self { delay, drop }
}
pub fn delay(&self) -> Duration {
self.delay
}
pub fn should_drop<R: Rng>(&self, rng: &mut R) -> bool {
rng.gen_bool(self.drop)
}
}
// Takes a reference to the simulation_settings and returns a HashMap representing the
// network behaviors for pairs of NodeIds.
pub fn create_behaviours(
network_settings: &NetworkSettings,
) -> HashMap<NetworkBehaviourKey, NetworkBehaviour> {
network_settings
.network_behaviors
.iter()
.map(|(k, d)| (*k, NetworkBehaviour::new(*d, 0.0)))
.collect()
}

View File

@ -0,0 +1,814 @@
// std
use std::{
collections::HashMap,
ops::Add,
str::FromStr,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::{Duration, Instant},
};
// crates
use crossbeam::channel::{self, Receiver, Sender};
use parking_lot::Mutex;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
// internal
use crate::node::NodeId;
pub mod behaviour;
pub mod regions;
type NetworkTime = Instant;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct NetworkBehaviourKey {
pub from: regions::Region,
pub to: regions::Region,
}
impl NetworkBehaviourKey {
pub fn new(from: regions::Region, to: regions::Region) -> Self {
Self { from, to }
}
}
impl Serialize for NetworkBehaviourKey {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let s = format!("{}:{}", self.from, self.to);
serializer.serialize_str(&s)
}
}
impl<'de> Deserialize<'de> for NetworkBehaviourKey {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let s = String::deserialize(deserializer)?;
let mut split = s.split(':');
let from = split.next().ok_or(serde::de::Error::custom(
"NetworkBehaviourKey should be in the form of `from_region:to_region`",
))?;
let to = split.next().ok_or(serde::de::Error::custom(
"NetworkBehaviourKey should be in the form of `from_region:to_region`",
))?;
Ok(Self::new(
regions::Region::from_str(from).map_err(serde::de::Error::custom)?,
regions::Region::from_str(to).map_err(serde::de::Error::custom)?,
))
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct NetworkSettings {
#[serde(with = "network_behaviors_serde")]
pub network_behaviors: HashMap<NetworkBehaviourKey, Duration>,
/// Represents node distribution in the simulated regions.
/// The sum of distributions should be 1.
pub regions: HashMap<regions::Region, f32>,
}
/// Ser/Deser `HashMap<NetworkBehaviourKey, Duration>` to humantime format.
mod network_behaviors_serde {
use super::{Deserialize, Duration, HashMap, NetworkBehaviourKey};
/// Have to implement this manually because of the `serde_json` will panic if the key of map
/// is not a string.
pub fn serialize<S>(
vals: &HashMap<NetworkBehaviourKey, Duration>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut ser = serializer.serialize_map(Some(vals.len()))?;
for (k, v) in vals {
ser.serialize_key(&k)?;
ser.serialize_value(&humantime::format_duration(*v).to_string())?;
}
ser.end()
}
pub fn deserialize<'de, D>(
deserializer: D,
) -> Result<HashMap<NetworkBehaviourKey, Duration>, D::Error>
where
D: serde::Deserializer<'de>,
{
let map = HashMap::<NetworkBehaviourKey, String>::deserialize(deserializer)?;
map.into_iter()
.map(|(k, v)| {
let v = humantime::parse_duration(&v).map_err(serde::de::Error::custom)?;
Ok((k, v))
})
.collect::<Result<HashMap<_, _>, _>>()
}
}
/// Represents node network capacity and current load in bytes.
#[derive(Debug)]
struct NodeNetworkCapacity {
capacity_bps: Option<u32>,
current_load: Mutex<u32>,
load_to_flush: AtomicU32,
}
impl NodeNetworkCapacity {
fn new(capacity_bps: Option<u32>) -> Self {
Self {
capacity_bps,
current_load: Mutex::new(0),
load_to_flush: AtomicU32::new(0),
}
}
fn increase_load(&self, load: u32) -> bool {
if let Some(capacity_bps) = self.capacity_bps {
let mut current_load = self.current_load.lock();
if *current_load + load <= capacity_bps {
*current_load += load;
true
} else {
false
}
} else {
true
}
}
fn decrease_load(&self, load: u32) {
self.load_to_flush.fetch_add(load, Ordering::Relaxed);
}
fn flush_load(&self) {
if self.capacity_bps.is_none() {
return;
}
let mut s = self.current_load.lock();
*s -= self.load_to_flush.load(Ordering::Relaxed);
self.load_to_flush.store(0, Ordering::Relaxed);
}
}
#[derive(Debug)]
pub struct Network<M: std::fmt::Debug> {
pub regions: regions::RegionsData,
network_time: NetworkTime,
messages: Vec<(NetworkTime, NetworkMessage<M>)>,
node_network_capacity: HashMap<NodeId, NodeNetworkCapacity>,
from_node_receivers: HashMap<NodeId, Receiver<NetworkMessage<M>>>,
from_node_broadcast_receivers: HashMap<NodeId, Receiver<NetworkMessage<M>>>,
to_node_senders: HashMap<NodeId, Sender<NetworkMessage<M>>>,
seed: u64,
}
impl<M> Network<M>
where
M: std::fmt::Debug + Send + Sync + Clone,
{
pub fn new(regions: regions::RegionsData, seed: u64) -> Self {
Self {
regions,
network_time: Instant::now(),
messages: Vec::new(),
node_network_capacity: HashMap::new(),
from_node_receivers: HashMap::new(),
from_node_broadcast_receivers: HashMap::new(),
to_node_senders: HashMap::new(),
seed,
}
}
fn send_message_cost<R: Rng>(
&self,
rng: &mut R,
node_a: NodeId,
node_b: NodeId,
) -> Option<Duration> {
let network_behaviour = self.regions.network_behaviour(node_a, node_b);
(!network_behaviour.should_drop(rng))
// TODO: use a delay range
.then(|| network_behaviour.delay())
}
pub fn connect(
&mut self,
node_id: NodeId,
capacity_bps: Option<u32>,
node_message_receiver: Receiver<NetworkMessage<M>>,
node_message_broadcast_receiver: Receiver<NetworkMessage<M>>,
) -> Receiver<NetworkMessage<M>> {
self.node_network_capacity
.insert(node_id, NodeNetworkCapacity::new(capacity_bps));
let (to_node_sender, from_network_receiver) = channel::unbounded();
self.from_node_receivers
.insert(node_id, node_message_receiver);
self.from_node_broadcast_receivers
.insert(node_id, node_message_broadcast_receiver);
self.to_node_senders.insert(node_id, to_node_sender);
from_network_receiver
}
/// Collects and dispatches messages to connected interfaces.
pub fn step(&mut self, time_passed: Duration) {
self.collect_messages();
self.dispatch_after(time_passed);
}
/// Receive and store all messages from nodes.
pub fn collect_messages(&mut self) {
let mut adhoc_messages = self
.from_node_receivers
.par_iter()
.flat_map(|(_, from_node)| {
from_node
.try_iter()
.map(|msg| (self.network_time, msg))
.collect::<Vec<_>>()
})
.collect();
self.messages.append(&mut adhoc_messages);
let mut broadcast_messages = self
.from_node_broadcast_receivers
.iter()
.flat_map(|(_, from_node)| {
from_node.try_iter().flat_map(|msg| {
self.to_node_senders.keys().map(move |recipient| {
let mut m = msg.clone();
m.to = Some(*recipient);
m
})
})
})
.map(|m| (self.network_time, m))
.collect::<Vec<_>>();
self.messages.append(&mut broadcast_messages);
}
/// Reiterate all messages and send to appropriate nodes if simulated
/// delay has passed.
pub fn dispatch_after(&mut self, time_passed: Duration) {
self.network_time += time_passed;
let delayed = self
.messages
.par_iter()
.filter(|(network_time, message)| {
let mut rng = SmallRng::seed_from_u64(self.seed);
self.send_or_drop_message(&mut rng, network_time, message)
})
.cloned()
.collect();
for (_, c) in self.node_network_capacity.iter() {
c.flush_load();
}
self.messages = delayed;
}
/// Returns true if message needs to be delayed and be dispatched in future.
fn send_or_drop_message<R: Rng>(
&self,
rng: &mut R,
network_time: &NetworkTime,
message: &NetworkMessage<M>,
) -> bool {
let to = message.to.expect("adhoc message has recipient");
if let Some(delay) = self.send_message_cost(rng, message.from, to) {
let node_capacity = self.node_network_capacity.get(&to).unwrap();
let should_send = network_time.add(delay) <= self.network_time;
let remaining_size = message.remaining_size();
if should_send && node_capacity.increase_load(remaining_size) {
let to_node = self.to_node_senders.get(&to).unwrap();
to_node
.send(message.clone())
.expect("node should have connection");
node_capacity.decrease_load(remaining_size);
return false;
} else {
// if we do not need to delay, then we should check if the msg is too large
// if so, we mock the partial sending message behavior
if should_send {
// if remaining is 0, we should send without delay
return self.try_partial_send(node_capacity, message, &to) != 0;
}
return true;
}
}
false
}
/// Try to apply partial send logic, returns the remaining size of the message
fn try_partial_send(
&self,
node_capacity: &NodeNetworkCapacity,
message: &NetworkMessage<M>,
to: &NodeId,
) -> u32 {
if let Some(capacity_bps) = node_capacity.capacity_bps {
let mut cap = node_capacity.current_load.lock();
let sent = capacity_bps - *cap;
*cap = capacity_bps;
let remaining = message.partial_send(sent);
// Message is partially sent, the node capacity needs to be flushed at the end of step even
// if the whole message is not sent.
node_capacity.decrease_load(sent);
if remaining == 0 {
let to_node = self.to_node_senders.get(to).unwrap();
to_node
.send(message.clone())
.expect("node should have connection");
}
remaining
} else {
0
}
}
}
#[derive(Clone, Debug)]
pub struct NetworkMessage<M> {
pub from: NodeId,
pub to: Option<NodeId>,
pub payload: M,
pub remaining: Arc<AtomicU32>,
}
impl<M> NetworkMessage<M> {
pub fn new(from: NodeId, to: Option<NodeId>, payload: M, size_bytes: u32) -> Self {
Self {
from,
to,
payload,
remaining: Arc::new(AtomicU32::new(size_bytes)),
}
}
pub fn payload(&self) -> &M {
&self.payload
}
pub fn into_payload(self) -> M {
self.payload
}
fn remaining_size(&self) -> u32 {
self.remaining.load(Ordering::SeqCst)
}
/// Mock the partial sending of a message behavior, returning the remaining message size.
fn partial_send(&self, size: u32) -> u32 {
self.remaining
.fetch_sub(size, Ordering::SeqCst)
.saturating_sub(size)
}
}
pub trait PayloadSize {
fn size_bytes(&self) -> u32;
}
pub trait NetworkInterface {
type Payload;
fn broadcast(&self, message: Self::Payload);
fn send_message(&self, address: NodeId, message: Self::Payload);
fn receive_messages(&self) -> Vec<NetworkMessage<Self::Payload>>;
}
pub struct InMemoryNetworkInterface<M> {
id: NodeId,
broadcast: Sender<NetworkMessage<M>>,
sender: Sender<NetworkMessage<M>>,
receiver: Receiver<NetworkMessage<M>>,
}
impl<M> InMemoryNetworkInterface<M> {
pub fn new(
id: NodeId,
broadcast: Sender<NetworkMessage<M>>,
sender: Sender<NetworkMessage<M>>,
receiver: Receiver<NetworkMessage<M>>,
) -> Self {
Self {
id,
broadcast,
sender,
receiver,
}
}
}
impl<M: PayloadSize> NetworkInterface for InMemoryNetworkInterface<M> {
type Payload = M;
fn broadcast(&self, message: Self::Payload) {
let size = message.size_bytes();
let message = NetworkMessage::new(self.id, None, message, size);
self.broadcast.send(message).unwrap();
}
fn send_message(&self, address: NodeId, message: Self::Payload) {
let size = message.size_bytes();
let message = NetworkMessage::new(self.id, Some(address), message, size);
self.sender.send(message).unwrap();
}
fn receive_messages(&self) -> Vec<crate::network::NetworkMessage<Self::Payload>> {
self.receiver.try_iter().collect()
}
}
#[cfg(test)]
mod tests {
use super::{
behaviour::NetworkBehaviour,
regions::{Region, RegionsData},
Network, NetworkInterface, NetworkMessage,
};
use crate::{
network::NetworkBehaviourKey,
node::{NodeId, NodeIdExt},
};
use crossbeam::channel::{self, Receiver, Sender};
use std::{collections::HashMap, time::Duration};
struct MockNetworkInterface {
id: NodeId,
broadcast: Sender<NetworkMessage<()>>,
sender: Sender<NetworkMessage<()>>,
receiver: Receiver<NetworkMessage<()>>,
message_size: u32,
}
impl MockNetworkInterface {
pub fn new(
id: NodeId,
broadcast: Sender<NetworkMessage<()>>,
sender: Sender<NetworkMessage<()>>,
receiver: Receiver<NetworkMessage<()>>,
message_size: u32,
) -> Self {
Self {
id,
broadcast,
sender,
receiver,
message_size,
}
}
}
impl NetworkInterface for MockNetworkInterface {
type Payload = ();
fn broadcast(&self, message: Self::Payload) {
let message = NetworkMessage::new(self.id, None, message, self.message_size);
self.broadcast.send(message).unwrap();
}
fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::new(self.id, Some(address), message, self.message_size);
self.sender.send(message).unwrap();
}
fn receive_messages(&self) -> Vec<crate::network::NetworkMessage<Self::Payload>> {
self.receiver.try_iter().collect()
}
}
#[test]
fn send_receive_messages() {
let node_a = NodeId::from_index(0);
let node_b = NodeId::from_index(1);
let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]);
let behaviour = HashMap::from([(
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver =
network.connect(node_a, Some(3), from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
1,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver =
network.connect(node_b, Some(3), from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
1,
);
a.send_message(node_b, ());
network.collect_messages();
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
network.step(Duration::from_millis(0));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 1);
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
b.send_message(node_a, ());
b.send_message(node_a, ());
b.send_message(node_a, ());
network.collect_messages();
network.dispatch_after(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 3);
assert_eq!(b.receive_messages().len(), 0);
}
#[test]
fn regions_send_receive_messages() {
let node_a = NodeId::from_index(0);
let node_b = NodeId::from_index(1);
let node_c = NodeId::from_index(2);
let regions = HashMap::from([
(Region::Asia, vec![node_a, node_b]),
(Region::Europe, vec![node_c]),
]);
let behaviour = HashMap::from([
(
NetworkBehaviourKey::new(Region::Asia, Region::Asia),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
(
NetworkBehaviourKey::new(Region::Asia, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(500), 0.0),
),
(
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver =
network.connect(node_a, Some(2), from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
1,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver =
network.connect(node_b, Some(2), from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
1,
);
let (from_c_sender, from_c_receiver) = channel::unbounded();
let (from_c_broadcast_sender, from_c_broadcast_receiver) = channel::unbounded();
let to_c_receiver =
network.connect(node_c, Some(2), from_c_receiver, from_c_broadcast_receiver);
let c = MockNetworkInterface::new(
node_c,
from_c_broadcast_sender,
from_c_sender,
to_c_receiver,
1,
);
a.send_message(node_b, ());
a.send_message(node_c, ());
network.collect_messages();
b.send_message(node_a, ());
b.send_message(node_c, ());
network.collect_messages();
c.send_message(node_a, ());
c.send_message(node_b, ());
network.collect_messages();
network.dispatch_after(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 1);
assert_eq!(b.receive_messages().len(), 1);
assert_eq!(c.receive_messages().len(), 0);
a.send_message(node_b, ());
b.send_message(node_c, ());
network.collect_messages();
network.dispatch_after(Duration::from_millis(400));
assert_eq!(a.receive_messages().len(), 1); // c to a
assert_eq!(b.receive_messages().len(), 2); // c to b && a to b
assert_eq!(c.receive_messages().len(), 2); // a to c && b to c
network.dispatch_after(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
assert_eq!(c.receive_messages().len(), 1); // b to c
}
#[test]
fn node_network_capacity_limit() {
let node_a = NodeId::from_index(0);
let node_b = NodeId::from_index(1);
let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]);
let behaviour = HashMap::from([(
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver =
network.connect(node_a, Some(3), from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
1,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver =
network.connect(node_b, Some(2), from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
1,
);
for _ in 0..6 {
a.send_message(node_b, ());
b.send_message(node_a, ());
}
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 3);
assert_eq!(b.receive_messages().len(), 2);
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 3);
assert_eq!(b.receive_messages().len(), 2);
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 2);
}
#[test]
fn node_network_capacity_no_limit() {
let node_a = NodeId::from_index(0);
let node_b = NodeId::from_index(1);
let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]);
let behaviour = HashMap::from([(
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver =
network.connect(node_a, None, from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
1000,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver =
network.connect(node_b, None, from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
100,
);
for _ in 0..6 {
a.send_message(node_b, ());
b.send_message(node_a, ());
}
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 6);
assert_eq!(b.receive_messages().len(), 6);
}
#[test]
fn node_network_message_partial_send() {
let node_a = NodeId::from_index(0);
let node_b = NodeId::from_index(1);
let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]);
let behaviour = HashMap::from([(
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
// Node A is connected to the network with throuput of 5.
let to_a_receiver =
network.connect(node_a, Some(5), from_a_receiver, from_a_broadcast_receiver);
// Every message sent **from** Node A will be of size 15.
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
2,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
// Node B is connected to the network with throuput of 1.
let to_b_receiver =
network.connect(node_b, Some(1), from_b_receiver, from_b_broadcast_receiver);
// Every message sent **from** Node B will be of size 2.
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
15,
);
// Node A sends message of size 2 to Node B.
a.send_message(node_b, ());
// Node B sends message of size 15 to Node A.
b.send_message(node_a, ());
// Step duration matches the latency between nodes, thus Node A can receive 5 units of a
// message, Node B - 1 unit of a message during the step.
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
// Node B should receive a message during the second step, because it's throughput during the
// step is 1, but the message size it receives is 2.
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 1);
// Node A should receive a message during the third step, because it's throughput during the
// step is 5, but the message it recieves is of size 15.
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 1);
assert_eq!(b.receive_messages().len(), 0);
}
}

View File

@ -0,0 +1,235 @@
// std
use rand::{seq::SliceRandom, Rng};
use std::{collections::HashMap, str::FromStr};
// crates
use serde::{Deserialize, Serialize};
// internal
use crate::{network::behaviour::NetworkBehaviour, node::NodeId};
use super::{NetworkBehaviourKey, NetworkSettings};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Region {
NorthAmerica,
Europe,
Asia,
Africa,
SouthAmerica,
Australia,
}
impl core::fmt::Display for Region {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let s = match self {
Self::NorthAmerica => "NorthAmerica",
Self::Europe => "Europe",
Self::Asia => "Asia",
Self::Africa => "Africa",
Self::SouthAmerica => "SouthAmerica",
Self::Australia => "Australia",
};
write!(f, "{s}")
}
}
impl FromStr for Region {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s
.trim()
.to_lowercase()
.replace(['-', '_', ' '], "")
.as_str()
{
"northamerica" | "na" => Ok(Self::NorthAmerica),
"europe" | "eu" => Ok(Self::Europe),
"asia" | "as" => Ok(Self::Asia),
"africa" | "af" => Ok(Self::Africa),
"southamerica" | "sa" => Ok(Self::SouthAmerica),
"australia" | "au" => Ok(Self::Australia),
_ => Err(format!("Unknown region: {s}")),
}
}
}
impl Serialize for Region {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let s = match self {
Self::NorthAmerica => "North America",
Self::Europe => "Europe",
Self::Asia => "Asia",
Self::Africa => "Africa",
Self::SouthAmerica => "South America",
Self::Australia => "Australia",
};
serializer.serialize_str(s)
}
}
impl<'de> Deserialize<'de> for Region {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let s = String::deserialize(deserializer)?;
Self::from_str(&s).map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionsData {
pub regions: HashMap<Region, Vec<NodeId>>,
#[serde(skip)]
pub node_region: HashMap<NodeId, Region>,
pub region_network_behaviour: HashMap<NetworkBehaviourKey, NetworkBehaviour>,
}
impl RegionsData {
pub fn new(
regions: HashMap<Region, Vec<NodeId>>,
region_network_behaviour: HashMap<NetworkBehaviourKey, NetworkBehaviour>,
) -> Self {
let node_region = regions
.iter()
.flat_map(|(region, nodes)| nodes.iter().copied().map(|node| (node, *region)))
.collect();
Self {
regions,
node_region,
region_network_behaviour,
}
}
pub fn node_region(&self, node_id: NodeId) -> Region {
self.node_region[&node_id]
}
pub fn network_behaviour(&self, node_a: NodeId, node_b: NodeId) -> &NetworkBehaviour {
let region_a = self.node_region[&node_a];
let region_b = self.node_region[&node_b];
let k = NetworkBehaviourKey::new(region_a, region_b);
let k_rev = NetworkBehaviourKey::new(region_b, region_a);
self.region_network_behaviour
.get(&k)
.or(self.region_network_behaviour.get(&k_rev))
.expect("Network behaviour not found for the given regions")
}
pub fn region_nodes(&self, region: Region) -> &[NodeId] {
&self.regions[&region]
}
}
// Takes a reference to the node_ids and simulation_settings and returns a HashMap
// representing the regions and their associated node IDs.
pub fn create_regions<R: Rng>(
node_ids: &[NodeId],
rng: &mut R,
network_settings: &NetworkSettings,
) -> HashMap<Region, Vec<NodeId>> {
let mut region_nodes = node_ids.to_vec();
region_nodes.shuffle(rng);
let regions = network_settings
.regions
.clone()
.into_iter()
.collect::<Vec<_>>();
let last_region_index = regions.len() - 1;
regions
.iter()
.enumerate()
.map(|(i, (region, distribution))| {
if i < last_region_index {
let node_count = (node_ids.len() as f32 * distribution).round() as usize;
let nodes = region_nodes.drain(..node_count).collect::<Vec<_>>();
(*region, nodes)
} else {
// Assign the remaining nodes to the last region.
(*region, region_nodes.clone())
}
})
.collect()
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use consensus_engine::NodeId;
use rand::rngs::mock::StepRng;
use crate::{
network::{
regions::{create_regions, Region},
NetworkSettings,
},
node::NodeIdExt,
};
#[test]
fn create_regions_precision() {
struct TestCase {
node_count: usize,
distributions: Vec<f32>,
}
let test_cases = vec![
TestCase {
node_count: 10,
distributions: vec![0.5, 0.3, 0.2],
},
TestCase {
node_count: 7,
distributions: vec![0.6, 0.4],
},
TestCase {
node_count: 20,
distributions: vec![0.4, 0.3, 0.2, 0.1],
},
TestCase {
node_count: 23,
distributions: vec![0.4, 0.3, 0.3],
},
TestCase {
node_count: 111,
distributions: vec![0.3, 0.3, 0.3, 0.1],
},
TestCase {
node_count: 73,
distributions: vec![0.3, 0.2, 0.2, 0.2, 0.1],
},
];
let mut rng = StepRng::new(1, 0);
for tcase in test_cases.iter() {
let nodes = (0..tcase.node_count)
.map(NodeId::from_index)
.collect::<Vec<NodeId>>();
let available_regions = [
Region::NorthAmerica,
Region::Europe,
Region::Asia,
Region::Africa,
Region::SouthAmerica,
Region::Australia,
];
let mut region_distribution = HashMap::new();
for (region, &dist) in available_regions.iter().zip(&tcase.distributions) {
region_distribution.insert(*region, dist);
}
let settings = NetworkSettings {
network_behaviors: HashMap::new(),
regions: region_distribution,
};
let regions = create_regions(&nodes, &mut rng, &settings);
let total_nodes_in_regions = regions.values().map(|v| v.len()).sum::<usize>();
assert_eq!(total_nodes_in_regions, nodes.len());
}
}
}

View File

@ -0,0 +1,45 @@
use serde::{Deserialize, Serialize};
use std::time::Duration;
use super::{Node, NodeId};
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)]
pub struct DummyStreamingState {
pub counter: usize,
}
/// This node implementation only used for testing different streaming implementation purposes.
pub struct DummyStreamingNode<S> {
id: NodeId,
state: DummyStreamingState,
#[allow(dead_code)]
settings: S,
}
impl<S> DummyStreamingNode<S> {
pub fn new(id: NodeId, settings: S) -> Self {
Self {
id,
state: DummyStreamingState::default(),
settings,
}
}
}
impl<S> Node for DummyStreamingNode<S> {
type Settings = S;
type State = DummyStreamingState;
fn id(&self) -> NodeId {
self.id
}
fn state(&self) -> &Self::State {
&self.state
}
fn step(&mut self, _: Duration) {
todo!()
}
}

View File

@ -0,0 +1,213 @@
#[cfg(test)]
pub mod dummy_streaming;
// std
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};
// crates
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
// internal
#[serde_with::serde_as]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct StepTime(#[serde_as(as = "serde_with::DurationMilliSeconds")] Duration);
impl From<Duration> for StepTime {
fn from(duration: Duration) -> Self {
Self(duration)
}
}
impl StepTime {
#[inline]
pub const fn new(duration: Duration) -> Self {
Self(duration)
}
#[inline]
pub const fn into_inner(&self) -> Duration {
self.0
}
#[inline]
pub const fn from_millis(millis: u64) -> Self {
Self(Duration::from_millis(millis))
}
#[inline]
pub const fn from_secs(secs: u64) -> Self {
Self(Duration::from_secs(secs))
}
}
impl Deref for StepTime {
type Target = Duration;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for StepTime {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl core::iter::Sum<Self> for StepTime {
fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
Self(iter.into_iter().map(|s| s.0).sum())
}
}
impl core::iter::Sum<Duration> for StepTime {
fn sum<I: Iterator<Item = Duration>>(iter: I) -> Self {
Self(iter.into_iter().sum())
}
}
impl core::iter::Sum<StepTime> for Duration {
fn sum<I: Iterator<Item = StepTime>>(iter: I) -> Self {
iter.into_iter().map(|s| s.0).sum()
}
}
pub type SharedState<S> = Arc<RwLock<S>>;
pub type Step = usize;
/// A state that represents how nodes are interconnected in the network.
pub struct OverlayState {
pub all_nodes: Vec<NodeId>,
pub overlay: MixnetOverlay,
}
#[derive(Clone, Debug)]
pub struct MixnetOverlay {
pub connections: HashMap<NodeId, Vec<NodeId>>,
}
pub trait OverlayGetter {
fn get_overlay(&self) -> MixnetOverlay;
fn get_all_nodes(&self) -> Vec<NodeId>;
}
impl OverlayGetter for SharedState<MixnetOverlay> {
fn get_overlay(&self) -> MixnetOverlay {
let overlay_state = self.read();
overlay_state.clone()
}
fn get_all_nodes(&self) -> Vec<NodeId> {
let overlay_state = self.read();
overlay_state.connections.keys().cloned().collect()
}
}
pub trait Node {
type Settings;
type State;
fn id(&self) -> NodeId;
fn state(&self) -> &Self::State;
fn step(&mut self, elapsed: Duration);
}
#[derive(
Clone, Copy, Default, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize,
)]
pub struct NodeId(pub [u8; 32]);
impl NodeId {
pub const fn new(val: [u8; 32]) -> Self {
Self(val)
}
/// Returns a random node id
pub fn random<R: rand::Rng>(rng: &mut R) -> Self {
let mut bytes = [0u8; 32];
rng.fill_bytes(&mut bytes);
Self(bytes)
}
}
impl From<[u8; 32]> for NodeId {
fn from(id: [u8; 32]) -> Self {
Self(id)
}
}
impl From<&[u8; 32]> for NodeId {
fn from(id: &[u8; 32]) -> Self {
Self(*id)
}
}
impl From<NodeId> for [u8; 32] {
fn from(id: NodeId) -> Self {
id.0
}
}
impl<'a> From<&'a NodeId> for &'a [u8; 32] {
fn from(id: &'a NodeId) -> Self {
&id.0
}
}
impl core::fmt::Display for NodeId {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "0x")?;
for v in self.0 {
write!(f, "{:02x}", v)?;
}
Ok(())
}
}
#[cfg(test)]
impl Node for usize {
type Settings = ();
type State = Self;
fn id(&self) -> NodeId {
todo!()
}
fn state(&self) -> &Self::State {
self
}
fn step(&mut self, _: Duration) {
use std::ops::AddAssign;
self.add_assign(1);
}
}
pub trait NodeIdExt {
fn index(&self) -> usize;
fn from_index(idx: usize) -> Self;
}
impl NodeIdExt for NodeId {
fn index(&self) -> usize {
const SIZE: usize = core::mem::size_of::<usize>();
let mut bytes = [0u8; SIZE];
let src: [u8; 32] = (*self).into();
bytes.copy_from_slice(&src[..SIZE]);
usize::from_be_bytes(bytes)
}
fn from_index(idx: usize) -> Self {
let mut bytes = [0u8; 32];
bytes[..core::mem::size_of::<usize>()].copy_from_slice(&idx.to_be_bytes());
NodeId::new(bytes)
}
}

View File

@ -0,0 +1,132 @@
use std::time::Duration;
use chrono::{DateTime, Utc};
use serde::Serialize;
use crate::settings::SimulationSettings;
use crate::warding::SimulationState;
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum RecordType {
Meta,
Settings,
Data,
}
pub trait Record: From<Runtime> + From<SimulationSettings> + Send + Sync + 'static {
type Data: serde::Serialize;
fn record_type(&self) -> RecordType;
fn is_settings(&self) -> bool {
self.record_type() == RecordType::Settings
}
fn is_meta(&self) -> bool {
self.record_type() == RecordType::Meta
}
fn is_data(&self) -> bool {
self.record_type() == RecordType::Data
}
fn data(&self) -> Vec<&Self::Data>;
}
pub type SerializedNodeState = serde_json::Value;
#[derive(Serialize)]
pub struct Runtime {
start: DateTime<Utc>,
end: DateTime<Utc>,
elapsed: Duration,
}
impl Runtime {
pub(crate) fn load() -> anyhow::Result<Self> {
let elapsed = crate::START_TIME.elapsed();
let end = Utc::now();
Ok(Self {
start: end
.checked_sub_signed(chrono::Duration::from_std(elapsed)?)
.unwrap(),
end,
elapsed,
})
}
}
#[derive(Serialize)]
#[serde(untagged)]
pub enum OutData {
Runtime(Runtime),
Settings(Box<SimulationSettings>),
Data(SerializedNodeState),
}
impl From<Runtime> for OutData {
fn from(runtime: Runtime) -> Self {
Self::Runtime(runtime)
}
}
impl From<SimulationSettings> for OutData {
fn from(settings: SimulationSettings) -> Self {
Self::Settings(Box::new(settings))
}
}
impl From<SerializedNodeState> for OutData {
fn from(state: SerializedNodeState) -> Self {
Self::Data(state)
}
}
impl Record for OutData {
type Data = SerializedNodeState;
fn record_type(&self) -> RecordType {
match self {
Self::Runtime(_) => RecordType::Meta,
Self::Settings(_) => RecordType::Settings,
Self::Data(_) => RecordType::Data,
}
}
fn data(&self) -> Vec<&SerializedNodeState> {
match self {
Self::Data(d) => vec![d],
_ => unreachable!(),
}
}
}
impl OutData {
#[inline]
pub const fn new(state: SerializedNodeState) -> Self {
Self::Data(state)
}
}
impl<S, T: Serialize + Clone> TryFrom<&SimulationState<S, T>> for OutData {
type Error = anyhow::Error;
fn try_from(state: &SimulationState<S, T>) -> Result<Self, Self::Error> {
serde_json::to_value(
state
.nodes
.read()
.iter()
.map(|n| n.state())
.collect::<Vec<_>>(),
)
.map(OutData::new)
.map_err(From::from)
}
}
pub trait NodeStateRecord {
fn get_serialized_state_record(&self) -> SerializedNodeState {
SerializedNodeState::Null
}
}

View File

@ -0,0 +1,75 @@
use crate::node::NodeId;
use crate::output_processors::Record;
use crate::runner::SimulationRunner;
use crate::warding::SimulationState;
use crossbeam::channel::bounded;
use crossbeam::select;
use rand::prelude::SliceRandom;
use rayon::prelude::*;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use super::SimulationRunnerHandle;
/// Simulate with sending the network state to any subscriber
pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
chunk_size: usize,
step_time: Duration,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: std::fmt::Debug + Clone + Send + Sync + 'static,
R: Record
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: 'static,
{
let simulation_state = SimulationState {
nodes: Arc::clone(&runner.nodes),
};
let mut node_ids: Vec<NodeId> = runner.nodes.read().iter().map(|n| n.id()).collect();
let mut inner_runner = runner.inner;
let nodes = runner.nodes;
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let handle = std::thread::spawn(move || {
loop {
select! {
recv(stop_rx) -> _ => {
return Ok(());
}
default => {
node_ids.shuffle(&mut inner_runner.rng);
for ids_chunk in node_ids.chunks(chunk_size) {
let ids: HashSet<NodeId> = ids_chunk.iter().copied().collect();
nodes
.write()
.par_iter_mut()
.filter(|n| ids.contains(&n.id()))
.for_each(|node| node.step(step_time));
p.send(R::try_from(
&simulation_state,
)?)?;
}
// check if any condition makes the simulation stop
if inner_runner.check_wards(&simulation_state) {
return Ok(());
}
}
}
}
});
Ok(SimulationRunnerHandle {
producer: p1,
stop_tx,
handle,
})
}

View File

@ -0,0 +1,214 @@
mod async_runner;
mod sync_runner;
// std
use std::sync::Arc;
use std::time::Duration;
use crate::output_processors::Record;
// crates
use crate::streaming::{
runtime_subscriber::RuntimeSubscriber, settings_subscriber::SettingsSubscriber, StreamProducer,
Subscriber, SubscriberHandle,
};
use crossbeam::channel::Sender;
use parking_lot::RwLock;
use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng};
use rayon::prelude::*;
use serde::Serialize;
// internal
use crate::network::Network;
use crate::node::Node;
use crate::settings::{RunnerSettings, SimulationSettings};
use crate::warding::{SimulationState, SimulationWard, Ward};
pub type BoxedNode<S, T> = Box<dyn Node<Settings = S, State = T> + Send + Sync>;
pub struct SimulationRunnerHandle<R> {
producer: StreamProducer<R>,
stop_tx: Sender<()>,
handle: std::thread::JoinHandle<anyhow::Result<()>>,
}
impl<R: Record> SimulationRunnerHandle<R> {
pub fn stop_after(self, duration: Duration) -> anyhow::Result<()> {
std::thread::sleep(duration);
self.stop()
}
pub fn stop(&self) -> anyhow::Result<()> {
if !self.handle.is_finished() {
self.stop_tx.send(())?;
self.shutdown()?;
}
Ok(())
}
pub fn subscribe<S: Subscriber<Record = R>>(
&self,
settings: S::Settings,
) -> anyhow::Result<SubscriberHandle<S>> {
self.producer.subscribe(settings)
}
pub fn is_finished(&self) -> bool {
self.handle.is_finished()
}
pub fn shutdown(&self) -> anyhow::Result<()> {
self.producer.stop()
}
pub fn join(self) -> anyhow::Result<()> {
self.handle.join().expect("Join simulation thread")
}
}
pub(crate) struct SimulationRunnerInner<M: std::fmt::Debug> {
network: Network<M>,
wards: Vec<Ward>,
rng: SmallRng,
}
impl<M> SimulationRunnerInner<M>
where
M: std::fmt::Debug + Send + Sync + Clone,
{
fn check_wards<S, T>(&mut self, state: &SimulationState<S, T>) -> bool {
self.wards
.par_iter_mut()
.map(|ward| ward.analyze(state))
.any(|x| x)
}
fn step<S, T>(&mut self, nodes: &mut [BoxedNode<S, T>], elapsed: Duration) {
self.network.dispatch_after(elapsed);
nodes.par_iter_mut().for_each(|node| {
node.step(elapsed);
});
self.network.collect_messages();
}
}
/// Encapsulation solution for the simulations runner
/// Holds the network state, the simulating nodes and the simulation settings.
pub struct SimulationRunner<M: std::fmt::Debug, R, S, T> {
inner: SimulationRunnerInner<M>,
nodes: Arc<RwLock<Vec<BoxedNode<S, T>>>>,
runner_settings: RunnerSettings,
producer: StreamProducer<R>,
step_time: Duration,
}
impl<M, R, S, T> SimulationRunner<M, R, S, T>
where
M: std::fmt::Debug + Clone + Send + Sync + 'static,
R: Record
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: Serialize + Clone + 'static,
{
pub fn new(
network: Network<M>,
nodes: Vec<BoxedNode<S, T>>,
producer: StreamProducer<R>,
mut settings: SimulationSettings,
) -> anyhow::Result<Self> {
let seed = settings
.seed
.unwrap_or_else(|| rand::thread_rng().next_u64());
settings
.seed
.get_or_insert_with(|| rand::thread_rng().next_u64());
// Store the settings to the producer so that we can collect them later
producer.send(R::from(settings.clone()))?;
let rng = SmallRng::seed_from_u64(seed);
let nodes = Arc::new(RwLock::new(nodes));
let SimulationSettings {
wards,
overlay_settings: _,
node_settings: _,
runner_settings,
stream_settings: _,
node_count: _,
seed: _,
views_count: _,
leaders_count: _,
network_settings: _,
step_time,
record_settings: _,
} = settings;
Ok(Self {
runner_settings,
inner: SimulationRunnerInner {
network,
rng,
wards,
},
nodes,
producer,
step_time,
})
}
pub fn simulate(self) -> anyhow::Result<SimulationRunnerHandle<R>> {
// init the start time
let _ = *crate::START_TIME;
let step_time = self.step_time;
match self.runner_settings.clone() {
RunnerSettings::Sync => sync_runner::simulate(self, step_time),
}
}
}
impl<M, R, S, T> SimulationRunner<M, R, S, T>
where
M: std::fmt::Debug + Clone + Send + Sync + 'static,
R: Record
+ serde::Serialize
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: Serialize + Clone + 'static,
{
pub fn simulate_and_subscribe<B>(
self,
settings: B::Settings,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
B: Subscriber<Record = R> + Send + Sync + 'static,
{
let handle = self.simulate()?;
let mut data_subscriber_handle = handle.subscribe::<B>(settings)?;
let mut runtime_subscriber_handle =
handle.subscribe::<RuntimeSubscriber<R>>(Default::default())?;
let mut settings_subscriber_handle =
handle.subscribe::<SettingsSubscriber<R>>(Default::default())?;
std::thread::scope(|s| {
s.spawn(move || {
data_subscriber_handle.run();
});
s.spawn(move || {
runtime_subscriber_handle.run();
});
s.spawn(move || {
settings_subscriber_handle.run();
});
});
Ok(handle)
}
}

View File

@ -0,0 +1,236 @@
use super::{SimulationRunner, SimulationRunnerHandle};
use crate::output_processors::Record;
use crate::warding::SimulationState;
use crossbeam::channel::{bounded, select};
use std::sync::Arc;
use std::time::Duration;
/// Simulate with sending the network state to any subscriber
pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
step_time: Duration,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: std::fmt::Debug + Send + Sync + Clone + 'static,
R: Record
+ for<'a> TryFrom<&'a SimulationState<S, T>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
S: 'static,
T: 'static,
{
let state = SimulationState {
nodes: Arc::clone(&runner.nodes),
};
let mut inner_runner = runner.inner;
let nodes = runner.nodes;
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let handle = std::thread::spawn(move || {
p.send(R::try_from(&state)?)?;
loop {
select! {
recv(stop_rx) -> _ => {
return Ok(());
}
default => {
// we must use a code block to make sure once the step call is finished then the write lock will be released, because in Record::try_from(&state),
// we need to call the read lock, if we do not release the write lock,
// then dead lock will occur
{
let mut nodes = nodes.write();
inner_runner.step(&mut nodes, step_time);
}
p.send(R::try_from(&state)?)?;
// check if any condition makes the simulation stop
if inner_runner.check_wards(&state) {
return Ok(());
}
}
}
}
});
Ok(SimulationRunnerHandle {
producer: p1,
stop_tx,
handle,
})
}
// #[cfg(test)]
// mod tests {
// use crate::{
// network::{
// behaviour::NetworkBehaviour,
// regions::{Region, RegionsData},
// InMemoryNetworkInterface, Network, NetworkBehaviourKey,
// },
// node::{Node, NodeId, NodeIdExt, OverlayState, SharedState},
// output_processors::OutData,
// runner::SimulationRunner,
// settings::SimulationSettings,
// streaming::StreamProducer,
// };
// use crossbeam::channel;
// use parking_lot::RwLock;
// use rand::rngs::mock::StepRng;
// use std::{
// collections::{BTreeMap, HashMap},
// sync::Arc,
// time::Duration,
// };
//
// fn init_network(node_ids: &[NodeId]) -> Network<DummyMessage> {
// let regions = HashMap::from([(Region::Europe, node_ids.to_vec())]);
// let behaviour = HashMap::from([(
// NetworkBehaviourKey::new(Region::Europe, Region::Europe),
// NetworkBehaviour::new(Duration::from_millis(100), 0.0),
// )]);
// let regions_data = RegionsData::new(regions, behaviour);
// Network::new(regions_data, 0)
// }
//
// fn init_dummy_nodes(
// node_ids: &[NodeId],
// network: &mut Network<DummyMessage>,
// overlay_state: SharedState<OverlayState>,
// ) -> Vec<DummyNode> {
// node_ids
// .iter()
// .map(|node_id| {
// let (node_message_sender, node_message_receiver) = channel::unbounded();
// let (node_message_broadcast_sender, node_message_broadcast_receiver) =
// channel::unbounded();
// let network_message_receiver = network.connect(
// *node_id,
// Some(1),
// node_message_receiver,
// node_message_broadcast_receiver,
// );
// let network_interface = InMemoryNetworkInterface::new(
// *node_id,
// node_message_broadcast_sender,
// node_message_sender,
// network_message_receiver,
// );
// DummyNode::new(
// *node_id,
// View::new(0),
// overlay_state.clone(),
// network_interface,
// )
// })
// .collect()
// }
//
// #[test]
// fn runner_one_step() {
// let settings = SimulationSettings {
// node_count: 10,
// ..Default::default()
// };
//
// let mut rng = StepRng::new(1, 0);
// let node_ids: Vec<NodeId> = (0..settings.node_count).map(NodeId::from_index).collect();
// let overlay = TreeOverlay::new(TreeSettings::default());
// let mut network = init_network(&node_ids);
// let view = ViewOverlay {
// leaders: overlay.leaders(&node_ids, 1, &mut rng).collect(),
// layout: overlay.layout(&node_ids, &mut rng),
// };
// let overlay_state = Arc::new(RwLock::new(OverlayState {
// all_nodes: node_ids.clone(),
// overlay: SimulationOverlay::Tree(overlay),
// overlays: BTreeMap::from([(View::new(0), view.clone()), (View::new(1), view)]),
// }));
// let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state)
// .into_iter()
// .map(|n| {
// Box::new(n)
// as Box<
// dyn Node<State = DummyState, Settings = DummySettings>
// + std::marker::Send
// + Sync,
// >
// })
// .collect();
//
// let producer = StreamProducer::default();
// let mut runner: SimulationRunner<DummyMessage, OutData, DummySettings, DummyState> =
// SimulationRunner::<_, OutData, DummySettings, DummyState>::new(
// network, nodes, producer, settings,
// )
// .unwrap();
// let mut nodes = runner.nodes.write();
// runner.inner.step(&mut nodes, Duration::from_millis(100));
// drop(nodes);
//
// let nodes = runner.nodes.read();
// for node in nodes.iter() {
// assert_eq!(node.current_view(), View::new(0));
// }
// }
//
// #[test]
// fn runner_send_receive() {
// let settings = SimulationSettings {
// node_count: 10,
// ..Default::default()
// };
//
// let mut rng = StepRng::new(1, 0);
// let node_ids: Vec<NodeId> = (0..settings.node_count).map(NodeId::from_index).collect();
// let overlay = TreeOverlay::new(TreeSettings::default());
// let mut network = init_network(&node_ids);
// let view = ViewOverlay {
// leaders: overlay.leaders(&node_ids, 1, &mut rng).collect(),
// layout: overlay.layout(&node_ids, &mut rng),
// };
// let overlay_state = Arc::new(RwLock::new(OverlayState {
// all_nodes: node_ids.clone(),
// overlay: SimulationOverlay::Tree(overlay),
// overlays: BTreeMap::from([
// (View::new(0), view.clone()),
// (View::new(1), view.clone()),
// (View::new(42), view.clone()),
// (View::new(43), view),
// ]),
// }));
// let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state.clone());
//
// for node in nodes.iter() {
// // All nodes send one message to NodeId(1).
// // Nodes can send messages to themselves.
// node.send_message(node_ids[1], DummyMessage::Proposal(View::new(42).into()));
// }
// network.collect_messages();
//
// let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state)
// .into_iter()
// .map(|n| {
// Box::new(n)
// as Box<
// dyn Node<State = DummyState, Settings = DummySettings>
// + std::marker::Send
// + Sync,
// >
// })
// .collect();
//
// let mut runner: SimulationRunner<DummyMessage, OutData, DummySettings, DummyState> =
// SimulationRunner::new(network, nodes, Default::default(), settings).unwrap();
//
// let mut nodes = runner.nodes.write();
// runner.inner.step(&mut nodes, Duration::from_millis(100));
// drop(nodes);
//
// let nodes = runner.nodes.read();
// let state = nodes[1].state();
// assert_eq!(state.message_count, 10);
// }
// }

View File

@ -0,0 +1,58 @@
use std::collections::BTreeMap;
use crate::network::NetworkSettings;
use crate::streaming::StreamSettings;
use crate::warding::Ward;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub enum RunnerSettings {
#[default]
Sync,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(untagged)]
pub enum OverlaySettings {
#[default]
Flat,
Tree(TreeSettings),
Branch(BranchSettings),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TreeSettings {
pub number_of_committees: usize,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BranchSettings {
pub branch_depth: usize,
}
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct NodeSettings {
pub network_capacity_kbps: Option<u32>,
#[serde(with = "humantime_serde")]
pub timeout: std::time::Duration,
}
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct SimulationSettings {
#[serde(default)]
pub wards: Vec<Ward>,
#[serde(default)]
pub record_settings: BTreeMap<String, bool>,
pub network_settings: NetworkSettings,
pub overlay_settings: OverlaySettings,
pub node_settings: NodeSettings,
#[serde(default)]
pub runner_settings: RunnerSettings,
pub stream_settings: StreamSettings,
#[serde(with = "humantime_serde")]
pub step_time: std::time::Duration,
pub node_count: usize,
pub views_count: usize,
pub leaders_count: usize,
pub seed: Option<u64>,
}

View File

@ -0,0 +1,235 @@
use std::{any::Any, io::stdout, sync::Arc};
use super::{Receivers, StreamSettings, Subscriber};
use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct IOStreamSettings {
#[serde(rename = "type")]
pub writer_type: WriteType,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WriteType {
#[default]
Stdout,
}
pub trait ToWriter<W: std::io::Write + Send + Sync + 'static> {
fn to_writer(&self) -> anyhow::Result<W>;
}
impl<W: std::io::Write + Send + Sync + 'static> ToWriter<W> for WriteType {
fn to_writer(&self) -> anyhow::Result<W> {
match self {
WriteType::Stdout => {
let boxed_any = Box::new(stdout()) as Box<dyn Any + Send + Sync>;
Ok(boxed_any.downcast::<W>().map(|boxed| *boxed).map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "Writer type mismatch")
})?)
}
}
}
}
impl TryFrom<StreamSettings> for IOStreamSettings {
type Error = String;
fn try_from(settings: StreamSettings) -> Result<Self, Self::Error> {
match settings {
StreamSettings::IO(settings) => Ok(settings),
_ => Err("io settings can't be created".into()),
}
}
}
#[derive(Debug)]
pub struct IOSubscriber<R, W = std::io::Stdout> {
recvs: Arc<Receivers<R>>,
writer: Arc<Mutex<W>>,
}
impl<W, R> Subscriber for IOSubscriber<R, W>
where
W: std::io::Write + Send + Sync + 'static,
R: crate::output_processors::Record + Serialize,
{
type Record = R;
type Settings = IOStreamSettings;
fn new(
record_recv: Receiver<Arc<Self::Record>>,
stop_recv: Receiver<Sender<()>>,
settings: Self::Settings,
) -> anyhow::Result<Self>
where
Self: Sized,
{
Ok(Self {
recvs: Arc::new(Receivers {
stop_rx: stop_recv,
recv: record_recv,
}),
writer: Arc::new(Mutex::new(settings.writer_type.to_writer()?)),
})
}
fn next(&self) -> Option<anyhow::Result<Arc<Self::Record>>> {
Some(self.recvs.recv.recv().map_err(From::from))
}
fn run(self) -> anyhow::Result<()> {
loop {
crossbeam::select! {
recv(self.recvs.stop_rx) -> finish_tx => {
// Flush remaining messages after stop signal.
while let Ok(msg) = self.recvs.recv.try_recv() {
self.sink(msg)?;
}
// collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?;
finish_tx?.send(())?
}
recv(self.recvs.recv) -> msg => {
self.sink(msg?)?;
}
}
}
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
serde_json::to_writer(&mut *self.writer.lock(), &state)?;
Ok(())
}
fn subscribe_data_type() -> RecordType {
RecordType::Data
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, time::Duration};
use consensus_engine::View;
use crate::{
network::{
behaviour::NetworkBehaviour,
regions::{Region, RegionsData},
Network, NetworkBehaviourKey,
},
node::{
dummy_streaming::{DummyStreamingNode, DummyStreamingState},
Node, NodeId, NodeIdExt,
},
output_processors::OutData,
runner::SimulationRunner,
warding::SimulationState,
};
use super::*;
#[derive(Debug, Clone, Serialize)]
struct IORecord {
states: HashMap<NodeId, View>,
}
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for IORecord {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
let nodes = value.nodes.read();
Ok(Self {
states: nodes
.iter()
.map(|node| (node.id(), node.current_view()))
.collect(),
})
}
}
#[test]
fn test_streaming() {
let simulation_settings = crate::settings::SimulationSettings {
seed: Some(1),
..Default::default()
};
let nodes = (0..6)
.map(|idx| {
Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ()))
as Box<
dyn Node<State = DummyStreamingState, Settings = ()>
+ std::marker::Send
+ Sync,
>
})
.collect::<Vec<_>>();
let network = Network::new(
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {
delay: Duration::from_millis(100),
drop: 0.0,
},
)
})
.collect(),
},
0,
);
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner
.simulate()
.unwrap()
.stop_after(Duration::from_millis(100))
.unwrap();
}
}

View File

@ -0,0 +1,367 @@
use std::{
str::FromStr,
sync::{Arc, Mutex},
time::Duration,
};
use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
use serde::{Deserialize, Serialize};
use crate::output_processors::{Record, RecordType, Runtime};
pub mod io;
pub mod naive;
#[cfg(feature = "polars")]
pub mod polars;
pub mod runtime_subscriber;
pub mod settings_subscriber;
#[derive(Debug, Default, Clone, Copy, Serialize, PartialEq, Eq)]
pub enum SubscriberFormat {
Json,
#[default]
Csv,
Parquet,
}
impl SubscriberFormat {
pub const fn csv() -> Self {
Self::Csv
}
pub const fn json() -> Self {
Self::Json
}
pub const fn parquet() -> Self {
Self::Parquet
}
pub fn is_csv(&self) -> bool {
matches!(self, Self::Csv)
}
}
impl FromStr for SubscriberFormat {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_ascii_lowercase().as_str() {
"json" => Ok(Self::Json),
"csv" => Ok(Self::Csv),
"parquet" => Ok(Self::Parquet),
tag => Err(format!(
"Invalid {tag} format, only [json, csv, parquet] are supported",
)),
}
}
}
impl<'de> Deserialize<'de> for SubscriberFormat {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
SubscriberFormat::from_str(&s).map_err(serde::de::Error::custom)
}
}
pub enum SubscriberType {
Meta,
Settings,
Data,
}
#[derive(Debug)]
struct Receivers<R> {
stop_rx: Receiver<Sender<()>>,
recv: Receiver<Arc<R>>,
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum StreamType {
#[default]
IO,
Naive,
#[cfg(feature = "polars")]
Polars,
}
impl FromStr for StreamType {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_ascii_lowercase().as_str() {
"io" => Ok(Self::IO),
"naive" => Ok(Self::Naive),
#[cfg(feature = "polars")]
"polars" => Ok(Self::Polars),
tag => Err(format!(
"Invalid {tag} streaming type, only [naive, polars] are supported",
)),
}
}
}
impl<'de> serde::Deserialize<'de> for StreamType {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
StreamType::from_str(&s).map_err(serde::de::Error::custom)
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase", untagged)]
pub enum StreamSettings {
Naive(naive::NaiveSettings),
IO(io::IOStreamSettings),
#[cfg(feature = "polars")]
Polars(polars::PolarsSettings),
}
impl Default for StreamSettings {
fn default() -> Self {
Self::IO(Default::default())
}
}
impl StreamSettings {
pub fn unwrap_naive(self) -> naive::NaiveSettings {
match self {
StreamSettings::Naive(settings) => settings,
_ => panic!("unwrap naive failed"),
}
}
pub fn unwrap_io(self) -> io::IOStreamSettings {
match self {
StreamSettings::IO(settings) => settings,
_ => panic!("unwrap io failed"),
}
}
#[cfg(feature = "polars")]
pub fn unwrap_polars(self) -> polars::PolarsSettings {
match self {
StreamSettings::Polars(settings) => settings,
_ => panic!("unwrap polars failed"),
}
}
}
pub struct SubscriberHandle<S> {
handle: Option<std::thread::JoinHandle<anyhow::Result<()>>>,
stop_tx: Sender<Sender<()>>,
subscriber: Option<S>,
}
impl<S> SubscriberHandle<S>
where
S: Subscriber + Send + 'static,
{
pub fn run(&mut self) {
if self.handle.is_some() {
return;
}
// unwrap safe here, because if handld is none, then we must have not booted the subscriber.
let subscriber = self.subscriber.take().unwrap();
let handle = std::thread::spawn(move || subscriber.run());
self.handle = Some(handle);
}
pub fn stop_after(self, duration: Duration) -> anyhow::Result<()> {
std::thread::sleep(duration);
self.stop()
}
pub fn stop(self) -> anyhow::Result<()> {
if let Some(handle) = self.handle {
// if we have a handle, and the handle is not finished
if !handle.is_finished() {
let (finish_tx, finish_rx) = bounded(1);
self.stop_tx.send(finish_tx)?;
finish_rx.recv()?;
} else {
// we are sure the handle is finished, so we can join it and try to get the result.
// if we have any error on subscriber side, return the error.
match handle.join() {
Ok(rst) => rst?,
Err(_) => {
tracing::error!("Error joining subscriber thread");
}
}
}
Ok(())
} else {
// if we do not have a handle, then we have not booted the subscriber yet.
// we can just return immediately
Ok(())
}
}
}
#[derive(Debug)]
struct Senders<R> {
record_ty: RecordType,
record_sender: Sender<Arc<R>>,
stop_sender: Sender<Sender<()>>,
}
#[derive(Debug)]
struct StreamProducerInner<R> {
/// senders is used to send messages to subscribers.
senders: Vec<Senders<R>>,
/// record_cache is used to cache messsages when there are no subscribers.
record_cache: Vec<Arc<R>>,
}
impl<R> Default for StreamProducerInner<R> {
fn default() -> Self {
Self {
senders: Vec::new(),
record_cache: Vec::new(),
}
}
}
#[derive(Debug)]
pub struct StreamProducer<R> {
inner: Arc<Mutex<StreamProducerInner<R>>>,
}
impl<R> Default for StreamProducer<R> {
fn default() -> Self {
Self {
inner: Arc::new(Mutex::new(StreamProducerInner::default())),
}
}
}
impl<R> Clone for StreamProducer<R> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<R> StreamProducer<R> {
pub fn new() -> Self {
Self::default()
}
}
impl<R> StreamProducer<R>
where
R: Record + Send + Sync + 'static,
{
pub fn send(&self, record: R) -> anyhow::Result<()> {
let mut inner = self.inner.lock().unwrap();
if inner.senders.is_empty() {
inner.record_cache.push(Arc::new(record));
Ok(())
} else {
let record = Arc::new(record);
// cache record for new subscriber
inner.record_cache.push(record.clone());
// if a send fails, then it means the corresponding subscriber is dropped,
// we just remove the sender from the list of senders.
inner.senders.retain(|tx| {
if tx.record_ty != record.record_type() {
true
} else {
tx.record_sender.send(Arc::clone(&record)).is_ok()
}
});
Ok(())
}
}
pub fn subscribe<S: Subscriber<Record = R>>(
&self,
settings: S::Settings,
) -> anyhow::Result<SubscriberHandle<S>> {
let (tx, rx) = unbounded();
let (stop_tx, stop_rx) = bounded(1);
let mut inner = self.inner.lock().unwrap();
// send all previous records to the new subscriber
for record in inner.record_cache.iter() {
if S::subscribe_data_type() == record.record_type() {
tx.send(Arc::clone(record))?;
}
}
inner.senders.push(Senders {
record_sender: tx,
stop_sender: stop_tx.clone(),
record_ty: S::subscribe_data_type(),
});
Ok(SubscriberHandle {
handle: None,
stop_tx,
subscriber: Some(S::new(rx, stop_rx, settings)?),
})
}
pub fn stop(&self) -> anyhow::Result<()> {
let meta_record = Arc::new(R::from(Runtime::load()?));
let inner = self.inner.lock().unwrap();
// send runtime record to runtime subscribers
inner.senders.iter().for_each(|tx| {
if tx.record_ty == meta_record.record_type() {
if let Err(e) = tx.record_sender.send(Arc::clone(&meta_record)) {
tracing::error!("Error sending meta record: {e}");
}
}
});
// send stop signal to all subscribers
inner.senders.iter().for_each(|tx| {
let (finish_tx, finish_rx) = bounded(1);
if let Err(e) = tx.stop_sender.send(finish_tx) {
tracing::error!("Error stopping subscriber: {e}");
} else if let Err(e) = finish_rx.recv() {
tracing::error!("Error finilizing subscriber: {e}");
}
});
Ok(())
}
}
pub trait Subscriber {
type Settings;
type Record: crate::output_processors::Record + Serialize;
fn new(
record_recv: Receiver<Arc<Self::Record>>,
stop_recv: Receiver<Sender<()>>,
settings: Self::Settings,
) -> anyhow::Result<Self>
where
Self: Sized;
fn next(&self) -> Option<anyhow::Result<Arc<Self::Record>>>;
fn run(self) -> anyhow::Result<()>
where
Self: Sized,
{
while let Some(state) = self.next() {
self.sink(state?)?;
}
Ok(())
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()>;
fn subscribe_data_type() -> RecordType;
}

View File

@ -0,0 +1,311 @@
use super::{Receivers, StreamSettings, Subscriber, SubscriberFormat};
use crate::output_processors::{Record, RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::{
fs::{File, OpenOptions},
io::{Seek, Write},
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NaiveSettings {
pub path: PathBuf,
#[serde(default = "SubscriberFormat::csv")]
pub format: SubscriberFormat,
}
impl TryFrom<StreamSettings> for NaiveSettings {
type Error = String;
fn try_from(settings: StreamSettings) -> Result<Self, Self::Error> {
match settings {
StreamSettings::Naive(settings) => Ok(settings),
_ => Err("naive settings can't be created".into()),
}
}
}
impl Default for NaiveSettings {
fn default() -> Self {
let mut tmp = std::env::temp_dir();
tmp.push("simulation");
tmp.set_extension("data");
Self {
path: tmp,
format: SubscriberFormat::Csv,
}
}
}
#[derive(Debug)]
pub struct NaiveSubscriber<R> {
file: Mutex<File>,
recvs: Receivers<R>,
initialized: AtomicBool,
format: SubscriberFormat,
}
impl<R> Subscriber for NaiveSubscriber<R>
where
R: crate::output_processors::Record + Serialize,
{
type Record = R;
type Settings = NaiveSettings;
fn new(
record_recv: Receiver<Arc<Self::Record>>,
stop_recv: Receiver<Sender<()>>,
settings: Self::Settings,
) -> anyhow::Result<Self>
where
Self: Sized,
{
let mut opts = OpenOptions::new();
let recvs = Receivers {
stop_rx: stop_recv,
recv: record_recv,
};
let this = NaiveSubscriber {
file: Mutex::new(
opts.truncate(true)
.create(true)
.read(true)
.write(true)
.open(&settings.path)?,
),
recvs,
initialized: AtomicBool::new(false),
format: settings.format,
};
tracing::info!(
target = "simulation",
"subscribed to {}",
settings.path.display()
);
Ok(this)
}
fn next(&self) -> Option<anyhow::Result<Arc<Self::Record>>> {
Some(self.recvs.recv.recv().map_err(From::from))
}
fn run(self) -> anyhow::Result<()> {
loop {
crossbeam::select! {
recv(self.recvs.stop_rx) -> finish_tx => {
// Flush remaining messages after stop signal.
while let Ok(msg) = self.recvs.recv.try_recv() {
self.sink(msg)?;
}
// collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?;
finish_tx?.send(())?
}
recv(self.recvs.recv) -> msg => {
self.sink(msg?)?;
}
}
}
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
let mut file = self.file.lock();
match self.format {
SubscriberFormat::Json => {
write_json_record(&mut *file, &self.initialized, &*state)?;
}
SubscriberFormat::Csv => {
write_csv_record(&mut *file, &self.initialized, &*state)?;
}
SubscriberFormat::Parquet => {
panic!("native subscriber does not support parquet format")
}
}
Ok(())
}
fn subscribe_data_type() -> RecordType {
RecordType::Data
}
}
impl<R> Drop for NaiveSubscriber<R> {
fn drop(&mut self) {
if SubscriberFormat::Json == self.format {
let mut file = self.file.lock();
// To construct a valid json format, we need to overwrite the last comma
if let Err(e) = file
.seek(std::io::SeekFrom::End(-1))
.and_then(|_| file.write_all(b"]}"))
{
tracing::error!(target="simulations", err=%e, "fail to close json format");
}
}
}
}
fn write_json_record<W: std::io::Write, R: Record>(
mut w: W,
initialized: &AtomicBool,
record: &R,
) -> std::io::Result<()> {
if !initialized.load(Ordering::Acquire) {
w.write_all(b"{\"records\": [")?;
initialized.store(true, Ordering::Release);
}
for data in record.data() {
serde_json::to_writer(&mut w, data)?;
w.write_all(b",")?;
}
Ok(())
}
fn write_csv_record<W: std::io::Write, R: Record>(
w: &mut W,
initialized: &AtomicBool,
record: &R,
) -> csv::Result<()> {
// If have not write csv header, then write it
let mut w = if !initialized.load(Ordering::Acquire) {
initialized.store(true, Ordering::Release);
csv::WriterBuilder::new().has_headers(true).from_writer(w)
} else {
csv::WriterBuilder::new().has_headers(false).from_writer(w)
};
for data in record.data() {
w.serialize(data).map_err(|e| {
tracing::error!(target = "simulations", err = %e, "fail to write CSV record");
e
})?;
w.flush()?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, time::Duration};
use consensus_engine::View;
use crate::{
network::{
behaviour::NetworkBehaviour,
regions::{Region, RegionsData},
Network, NetworkBehaviourKey,
},
node::{
dummy_streaming::{DummyStreamingNode, DummyStreamingState},
Node, NodeId, NodeIdExt,
},
output_processors::OutData,
runner::SimulationRunner,
warding::SimulationState,
};
use super::*;
#[derive(Debug, Clone, Serialize)]
struct NaiveRecord {
states: HashMap<NodeId, View>,
}
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for NaiveRecord {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
Ok(Self {
states: value
.nodes
.read()
.iter()
.map(|node| (node.id(), node.current_view()))
.collect(),
})
}
}
#[test]
fn test_streaming() {
let simulation_settings = crate::settings::SimulationSettings {
seed: Some(1),
..Default::default()
};
let nodes = (0..6)
.map(|idx| {
Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ()))
as Box<
dyn Node<State = DummyStreamingState, Settings = ()>
+ std::marker::Send
+ Sync,
>
})
.collect::<Vec<_>>();
let network = Network::new(
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {
delay: Duration::from_millis(100),
drop: 0.0,
},
)
})
.collect(),
},
0,
);
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();
}
}

View File

@ -0,0 +1,156 @@
use super::{Receivers, StreamSettings, SubscriberFormat};
use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use parking_lot::Mutex;
use polars::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
fs::File,
io::Cursor,
path::{Path, PathBuf},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolarsSettings {
pub format: SubscriberFormat,
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<PathBuf>,
}
impl TryFrom<StreamSettings> for PolarsSettings {
type Error = String;
fn try_from(settings: StreamSettings) -> Result<Self, Self::Error> {
match settings {
StreamSettings::Polars(settings) => Ok(settings),
_ => Err("polars settings can't be created".into()),
}
}
}
#[derive(Debug)]
pub struct PolarsSubscriber<R> {
data: Mutex<Vec<Arc<R>>>,
path: PathBuf,
format: SubscriberFormat,
recvs: Receivers<R>,
}
impl<R> PolarsSubscriber<R>
where
R: Serialize,
{
fn persist(&self) -> anyhow::Result<()> {
let data = self.data.lock();
let mut cursor = Cursor::new(Vec::new());
serde_json::to_writer(&mut cursor, &*data).expect("Dump data to json ");
let mut data = JsonReader::new(cursor)
.finish()
.expect("Load dataframe from intermediary json");
data.unnest(["state"])?;
match self.format {
SubscriberFormat::Json => dump_dataframe_to_json(&mut data, self.path.as_path()),
SubscriberFormat::Csv => dump_dataframe_to_csv(&mut data, self.path.as_path()),
SubscriberFormat::Parquet => dump_dataframe_to_parquet(&mut data, self.path.as_path()),
}
}
}
impl<R> super::Subscriber for PolarsSubscriber<R>
where
R: crate::output_processors::Record + Serialize,
{
type Record = R;
type Settings = PolarsSettings;
fn new(
record_recv: Receiver<Arc<Self::Record>>,
stop_recv: Receiver<Sender<()>>,
settings: Self::Settings,
) -> anyhow::Result<Self>
where
Self: Sized,
{
let recvs = Receivers {
stop_rx: stop_recv,
recv: record_recv,
};
let this = PolarsSubscriber {
data: Mutex::new(Vec::new()),
recvs,
path: settings.path.clone().unwrap_or_else(|| {
let mut p = std::env::temp_dir().join("polars");
match settings.format {
SubscriberFormat::Json => p.set_extension("json"),
SubscriberFormat::Csv => p.set_extension("csv"),
SubscriberFormat::Parquet => p.set_extension("parquet"),
};
p
}),
format: settings.format,
};
tracing::info!(
target = "simulation",
"subscribed to {}",
this.path.display()
);
Ok(this)
}
fn next(&self) -> Option<anyhow::Result<Arc<Self::Record>>> {
Some(self.recvs.recv.recv().map_err(From::from))
}
fn run(self) -> anyhow::Result<()> {
loop {
crossbeam::select! {
recv(self.recvs.stop_rx) -> finish_tx => {
// Flush remaining messages after stop signal.
while let Ok(msg) = self.recvs.recv.try_recv() {
self.sink(msg)?;
}
// collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?;
finish_tx?.send(())?;
return self.persist();
}
recv(self.recvs.recv) -> msg => {
self.sink(msg?)?;
}
}
}
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
self.data.lock().push(state);
Ok(())
}
fn subscribe_data_type() -> RecordType {
RecordType::Data
}
}
fn dump_dataframe_to_json(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> {
let out_path = out_path.with_extension("json");
let f = File::create(out_path)?;
let mut writer = polars::prelude::JsonWriter::new(f);
Ok(writer.finish(data)?)
}
fn dump_dataframe_to_csv(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> {
let out_path = out_path.with_extension("csv");
let f = File::create(out_path)?;
let mut writer = polars::prelude::CsvWriter::new(f);
Ok(writer.finish(data)?)
}
fn dump_dataframe_to_parquet(data: &mut DataFrame, out_path: &Path) -> anyhow::Result<()> {
let out_path = out_path.with_extension("parquet");
let f = File::create(out_path)?;
let writer = polars::prelude::ParquetWriter::new(f);
Ok(writer.finish(data).map(|_| ())?)
}

View File

@ -0,0 +1,218 @@
use super::{Receivers, Subscriber};
use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use serde::{Deserialize, Serialize};
use std::{
fs::{File, OpenOptions},
io::Write,
path::PathBuf,
sync::{Arc, Mutex},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeSettings {
pub path: PathBuf,
}
impl Default for RuntimeSettings {
fn default() -> Self {
let mut tmp = std::env::temp_dir();
tmp.push("simulation");
tmp.set_extension("runtime");
Self { path: tmp }
}
}
#[derive(Debug)]
pub struct RuntimeSubscriber<R> {
file: Arc<Mutex<File>>,
recvs: Arc<Receivers<R>>,
}
impl<R> Subscriber for RuntimeSubscriber<R>
where
R: crate::output_processors::Record + Serialize,
{
type Record = R;
type Settings = RuntimeSettings;
fn new(
record_recv: Receiver<Arc<Self::Record>>,
stop_recv: Receiver<Sender<()>>,
settings: Self::Settings,
) -> anyhow::Result<Self>
where
Self: Sized,
{
let mut opts = OpenOptions::new();
let recvs = Receivers {
stop_rx: stop_recv,
recv: record_recv,
};
let this = RuntimeSubscriber {
file: Arc::new(Mutex::new(
opts.truncate(true)
.create(true)
.read(true)
.write(true)
.open(&settings.path)?,
)),
recvs: Arc::new(recvs),
};
tracing::info!(
taget = "simulation",
"subscribed to {}",
settings.path.display()
);
Ok(this)
}
fn next(&self) -> Option<anyhow::Result<Arc<Self::Record>>> {
Some(self.recvs.recv.recv().map_err(From::from))
}
fn run(self) -> anyhow::Result<()> {
crossbeam::select! {
recv(self.recvs.stop_rx) -> finish_tx => {
// collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?;
finish_tx?.send(())?;
}
recv(self.recvs.recv) -> msg => {
self.sink(msg?)?;
}
}
Ok(())
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
let mut file = self.file.lock().expect("failed to lock file");
serde_json::to_writer(&mut *file, &state)?;
file.write_all(b",\n")?;
Ok(())
}
fn subscribe_data_type() -> RecordType {
RecordType::Data
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, time::Duration};
use consensus_engine::View;
use crate::{
network::{
behaviour::NetworkBehaviour,
regions::{Region, RegionsData},
Network, NetworkBehaviourKey,
},
node::{
dummy_streaming::{DummyStreamingNode, DummyStreamingState},
Node, NodeId, NodeIdExt,
},
output_processors::OutData,
runner::SimulationRunner,
warding::SimulationState,
};
use super::*;
#[derive(Debug, Clone, Serialize)]
struct RuntimeRecord {
states: HashMap<NodeId, View>,
}
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for RuntimeRecord {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
Ok(Self {
states: value
.nodes
.read()
.iter()
.map(|node| (node.id(), node.current_view()))
.collect(),
})
}
}
#[test]
fn test_streaming() {
let simulation_settings = crate::settings::SimulationSettings {
seed: Some(1),
..Default::default()
};
let nodes = (0..6)
.map(|idx| {
Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ()))
as Box<
dyn Node<State = DummyStreamingState, Settings = ()>
+ std::marker::Send
+ Sync,
>
})
.collect::<Vec<_>>();
let network = Network::new(
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {
delay: Duration::from_millis(100),
drop: 0.0,
},
)
})
.collect(),
},
0,
);
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();
}
}

View File

@ -0,0 +1,218 @@
use super::{Receivers, Subscriber};
use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use serde::{Deserialize, Serialize};
use std::{
fs::{File, OpenOptions},
io::Write,
path::PathBuf,
sync::{Arc, Mutex},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SettingsSubscriberSettings {
pub path: PathBuf,
}
impl Default for SettingsSubscriberSettings {
fn default() -> Self {
let mut tmp = std::env::temp_dir();
tmp.push("simulation");
tmp.set_extension("conf");
Self { path: tmp }
}
}
#[derive(Debug)]
pub struct SettingsSubscriber<R> {
file: Arc<Mutex<File>>,
recvs: Arc<Receivers<R>>,
}
impl<R> Subscriber for SettingsSubscriber<R>
where
R: crate::output_processors::Record + Serialize,
{
type Record = R;
type Settings = SettingsSubscriberSettings;
fn new(
record_recv: Receiver<Arc<Self::Record>>,
stop_recv: Receiver<Sender<()>>,
settings: Self::Settings,
) -> anyhow::Result<Self>
where
Self: Sized,
{
let mut opts = OpenOptions::new();
let recvs = Receivers {
stop_rx: stop_recv,
recv: record_recv,
};
let this = SettingsSubscriber {
file: Arc::new(Mutex::new(
opts.truncate(true)
.create(true)
.read(true)
.write(true)
.open(&settings.path)?,
)),
recvs: Arc::new(recvs),
};
tracing::info!(
target = "simulation",
"subscribed to {}",
settings.path.display()
);
Ok(this)
}
fn next(&self) -> Option<anyhow::Result<Arc<Self::Record>>> {
Some(self.recvs.recv.recv().map_err(From::from))
}
fn run(self) -> anyhow::Result<()> {
crossbeam::select! {
recv(self.recvs.stop_rx) -> finish_tx => {
// collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?;
finish_tx?.send(())?;
}
recv(self.recvs.recv) -> msg => {
self.sink(msg?)?;
}
}
Ok(())
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
let mut file = self.file.lock().expect("failed to lock file");
serde_json::to_writer(&mut *file, &state)?;
file.write_all(b",\n")?;
Ok(())
}
fn subscribe_data_type() -> RecordType {
RecordType::Data
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, time::Duration};
use consensus_engine::View;
use crate::{
network::{
behaviour::NetworkBehaviour,
regions::{Region, RegionsData},
Network, NetworkBehaviourKey,
},
node::{
dummy_streaming::{DummyStreamingNode, DummyStreamingState},
Node, NodeId, NodeIdExt,
},
output_processors::OutData,
runner::SimulationRunner,
warding::SimulationState,
};
use super::*;
#[derive(Debug, Clone, Serialize)]
struct SettingsRecord {
states: HashMap<NodeId, View>,
}
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for SettingsRecord {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
Ok(Self {
states: value
.nodes
.read()
.iter()
.map(|node| (node.id(), node.current_view()))
.collect(),
})
}
}
#[test]
fn test_streaming() {
let simulation_settings = crate::settings::SimulationSettings {
seed: Some(1),
..Default::default()
};
let nodes = (0..6)
.map(|idx| {
Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ()))
as Box<
dyn Node<State = DummyStreamingState, Settings = ()>
+ std::marker::Send
+ Sync,
>
})
.collect::<Vec<_>>();
let network = Network::new(
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {
delay: Duration::from_millis(100),
drop: 0.0,
},
)
})
.collect(),
},
0,
);
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();
}
}

View File

@ -0,0 +1,54 @@
// std
use std::sync::Arc;
// crates
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
// internal
use crate::runner::BoxedNode;
mod ttf;
pub struct SimulationState<S, T> {
pub nodes: Arc<RwLock<Vec<BoxedNode<S, T>>>>,
}
impl<S, T> SimulationState<S, T> {
#[inline]
pub fn new(nodes: Vec<BoxedNode<S, T>>) -> Self {
Self {
nodes: Arc::new(RwLock::new(nodes)),
}
}
}
/// A ward is a computation over the `NetworkState`, it must return true if the state satisfies
/// the warding conditions. It is used to stop the consensus simulation if such condition is reached.
pub trait SimulationWard<S, T> {
type SimulationState;
fn analyze(&mut self, state: &Self::SimulationState) -> bool;
}
/// Ward dispatcher
/// Enum to avoid Boxing (Box<dyn SimulationWard>) wards.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Ward {
MaxView(ttf::MaxViewWard),
}
impl Ward {
pub fn simulation_ward_mut<S, T>(
&mut self,
) -> &mut dyn SimulationWard<S, T, SimulationState = SimulationState<S, T>> {
match self {
Ward::MaxView(ward) => ward,
}
}
}
impl<S, T> SimulationWard<S, T> for Ward {
type SimulationState = SimulationState<S, T>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
self.simulation_ward_mut().analyze(state)
}
}

View File

@ -0,0 +1,41 @@
use crate::warding::{SimulationState, SimulationWard};
use serde::{Deserialize, Serialize};
/// Time to finality ward. It monitors the amount of rounds of the simulations, triggers when surpassing
/// the set threshold.
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
#[serde(transparent)]
pub struct MaxViewWard {
max_count: usize,
}
impl<S, T> SimulationWard<S, T> for MaxViewWard {
type SimulationState = SimulationState<S, T>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
// state.nodes.read().iter();
//.all(|n| n.current_view() >= self.max_count)
todo!()
}
}
#[cfg(test)]
mod test {
use crate::warding::ttf::MaxViewWard;
use crate::warding::{SimulationState, SimulationWard};
use parking_lot::RwLock;
use std::sync::Arc;
#[test]
fn rebase_threshold() {
let mut ttf = MaxViewWard { max_count: 10 };
let node = 11;
let state = SimulationState {
nodes: Arc::new(RwLock::new(vec![Box::new(node)])),
};
assert!(ttf.analyze(&state));
state.nodes.write().push(Box::new(9));
assert!(!ttf.analyze(&state));
}
}