Simulatios network throughput (#270)

* Use step time from configuration

* PayloadSize trait for inmemory network interface

* Per node network capacity

* Initial carnot message sizes

* Divide provided kbps by step time

* Use std::mem::size_of for msg sizes
This commit is contained in:
gusto 2023-07-25 16:03:05 +03:00 committed by GitHub
parent 7d64915dd7
commit df97ea2543
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 177 additions and 27 deletions

View File

@ -80,8 +80,14 @@ impl SimulationApp {
let (node_message_broadcast_sender, node_message_broadcast_receiver) =
channel::unbounded();
let (node_message_sender, node_message_receiver) = channel::unbounded();
// Dividing milliseconds in second by milliseconds in the step.
let step_time_as_second_fraction =
1_000_000 / simulation_settings.step_time.subsec_millis();
let capacity_bps = simulation_settings.node_settings.network_capacity_kbps * 1024
/ step_time_as_second_fraction;
let network_message_receiver = network.connect(
node_id,
capacity_bps,
node_message_receiver,
node_message_broadcast_receiver,
);

View File

@ -3,10 +3,12 @@ use std::{
collections::HashMap,
ops::Add,
str::FromStr,
sync::atomic::{AtomicU32, Ordering},
time::{Duration, Instant},
};
// crates
use crossbeam::channel::{self, Receiver, Sender};
use parking_lot::Mutex;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
@ -101,10 +103,48 @@ mod network_behaviors_serde {
}
}
/// Represents node network capacity and current load in bytes.
struct NodeNetworkCapacity {
capacity_bps: u32,
current_load: Mutex<u32>,
load_to_flush: AtomicU32,
}
impl NodeNetworkCapacity {
fn new(capacity_bps: u32) -> Self {
Self {
capacity_bps,
current_load: Mutex::new(0),
load_to_flush: AtomicU32::new(0),
}
}
fn increase_load(&self, load: u32) -> bool {
let mut current_load = self.current_load.lock();
if *current_load + load <= self.capacity_bps {
*current_load += load;
true
} else {
false
}
}
fn decrease_load(&self, load: u32) {
self.load_to_flush.fetch_add(load, Ordering::Relaxed);
}
fn flush_load(&self) {
let mut s = self.current_load.lock();
*s -= self.load_to_flush.load(Ordering::Relaxed);
self.load_to_flush.store(0, Ordering::Relaxed);
}
}
pub struct Network<M: std::fmt::Debug> {
pub regions: regions::RegionsData,
network_time: NetworkTime,
messages: Vec<(NetworkTime, NetworkMessage<M>)>,
node_network_capacity: HashMap<NodeId, NodeNetworkCapacity>,
from_node_receivers: HashMap<NodeId, Receiver<NetworkMessage<M>>>,
from_node_broadcast_receivers: HashMap<NodeId, Receiver<NetworkMessage<M>>>,
to_node_senders: HashMap<NodeId, Sender<NetworkMessage<M>>>,
@ -120,6 +160,7 @@ where
regions,
network_time: Instant::now(),
messages: Vec::new(),
node_network_capacity: HashMap::new(),
from_node_receivers: HashMap::new(),
from_node_broadcast_receivers: HashMap::new(),
to_node_senders: HashMap::new(),
@ -142,9 +183,12 @@ where
pub fn connect(
&mut self,
node_id: NodeId,
capacity_bps: u32,
node_message_receiver: Receiver<NetworkMessage<M>>,
node_message_broadcast_receiver: Receiver<NetworkMessage<M>>,
) -> Receiver<NetworkMessage<M>> {
self.node_network_capacity
.insert(node_id, NodeNetworkCapacity::new(capacity_bps));
let (to_node_sender, from_network_receiver) = channel::unbounded();
self.from_node_receivers
.insert(node_id, node_message_receiver);
@ -206,6 +250,10 @@ where
.cloned()
.collect();
for (_, c) in self.node_network_capacity.iter() {
c.flush_load();
}
self.messages = delayed;
}
@ -218,11 +266,15 @@ where
) -> bool {
let to = message.to.expect("adhoc message has recipient");
if let Some(delay) = self.send_message_cost(rng, message.from, to) {
if network_time.add(delay) <= self.network_time {
let node_capacity = self.node_network_capacity.get(&to).unwrap();
if network_time.add(delay) <= self.network_time
&& node_capacity.increase_load(message.size_bytes)
{
let to_node = self.to_node_senders.get(&to).unwrap();
to_node
.send(message.clone())
.expect("Node should have connection");
.expect("node should have connection");
node_capacity.decrease_load(message.size_bytes);
return false;
} else {
return true;
@ -237,11 +289,17 @@ pub struct NetworkMessage<M> {
pub from: NodeId,
pub to: Option<NodeId>,
pub payload: M,
pub size_bytes: u32,
}
impl<M> NetworkMessage<M> {
pub fn new(from: NodeId, to: Option<NodeId>, payload: M) -> Self {
Self { from, to, payload }
pub fn new(from: NodeId, to: Option<NodeId>, payload: M, size_bytes: u32) -> Self {
Self {
from,
to,
payload,
size_bytes,
}
}
pub fn get_payload(self) -> M {
@ -249,6 +307,10 @@ impl<M> NetworkMessage<M> {
}
}
pub trait PayloadSize {
fn size_bytes(&self) -> u32;
}
pub trait NetworkInterface {
type Payload;
@ -280,16 +342,18 @@ impl<M> InMemoryNetworkInterface<M> {
}
}
impl<M> NetworkInterface for InMemoryNetworkInterface<M> {
impl<M: PayloadSize> NetworkInterface for InMemoryNetworkInterface<M> {
type Payload = M;
fn broadcast(&self, message: Self::Payload) {
let message = NetworkMessage::new(self.id, None, message);
let size = message.size_bytes();
let message = NetworkMessage::new(self.id, None, message, size);
self.broadcast.send(message).unwrap();
}
fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::new(self.id, Some(address), message);
let size = message.size_bytes();
let message = NetworkMessage::new(self.id, Some(address), message, size);
self.sender.send(message).unwrap();
}
@ -339,12 +403,12 @@ mod tests {
type Payload = ();
fn broadcast(&self, message: Self::Payload) {
let message = NetworkMessage::new(self.id, None, message);
let message = NetworkMessage::new(self.id, None, message, 1);
self.broadcast.send(message).unwrap();
}
fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::new(self.id, Some(address), message);
let message = NetworkMessage::new(self.id, Some(address), message, 1);
self.sender.send(message).unwrap();
}
@ -368,7 +432,7 @@ mod tests {
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, from_a_receiver, from_a_broadcast_receiver);
let to_a_receiver = network.connect(node_a, 3, from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
@ -378,7 +442,7 @@ mod tests {
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, from_b_receiver, from_b_broadcast_receiver);
let to_b_receiver = network.connect(node_b, 3, from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
@ -443,7 +507,7 @@ mod tests {
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, from_a_receiver, from_a_broadcast_receiver);
let to_a_receiver = network.connect(node_a, 2, from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
@ -453,7 +517,7 @@ mod tests {
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, from_b_receiver, from_b_broadcast_receiver);
let to_b_receiver = network.connect(node_b, 2, from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
@ -463,7 +527,7 @@ mod tests {
let (from_c_sender, from_c_receiver) = channel::unbounded();
let (from_c_broadcast_sender, from_c_broadcast_receiver) = channel::unbounded();
let to_c_receiver = network.connect(node_c, from_c_receiver, from_c_broadcast_receiver);
let to_c_receiver = network.connect(node_c, 2, from_c_receiver, from_c_broadcast_receiver);
let c = MockNetworkInterface::new(
node_c,
from_c_broadcast_sender,
@ -502,4 +566,55 @@ mod tests {
assert_eq!(b.receive_messages().len(), 0);
assert_eq!(c.receive_messages().len(), 1); // b to c
}
#[test]
fn node_network_capacity_limit() {
let node_a = NodeId::from_index(0);
let node_b = NodeId::from_index(1);
let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]);
let behaviour = HashMap::from([(
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, 3, from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, 2, from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
);
for _ in 0..6 {
a.send_message(node_b, ());
b.send_message(node_a, ());
}
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 3);
assert_eq!(b.receive_messages().len(), 2);
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 3);
assert_eq!(b.receive_messages().len(), 2);
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 2);
}
}

View File

@ -3,6 +3,8 @@ use nomos_consensus::network::messages::{
NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
};
use crate::network::PayloadSize;
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub enum CarnotMessage {
Proposal(ProposalChunkMsg),
@ -23,3 +25,17 @@ impl CarnotMessage {
}
}
}
impl PayloadSize for CarnotMessage {
fn size_bytes(&self) -> u32 {
match self {
CarnotMessage::Proposal(p) => {
(std::mem::size_of::<ProposalChunkMsg>() + p.chunk.len()) as u32
}
CarnotMessage::Vote(_) => std::mem::size_of::<VoteMsg>() as u32,
CarnotMessage::TimeoutQc(_) => std::mem::size_of::<TimeoutQcMsg>() as u32,
CarnotMessage::Timeout(_) => std::mem::size_of::<TimeoutMsg>() as u32,
CarnotMessage::NewView(_) => std::mem::size_of::<NewViewMsg>() as u32,
}
}
}

View File

@ -3,6 +3,7 @@ use consensus_engine::View;
use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;
// crates
use crate::network::PayloadSize;
use serde::{Deserialize, Serialize};
// internal
use crate::{
@ -90,6 +91,12 @@ pub enum DummyMessage {
Proposal(Block),
}
impl PayloadSize for DummyMessage {
fn size_bytes(&self) -> u32 {
0
}
}
struct LocalView {
pub next_view_leaders: Vec<NodeId>,
pub current_roots: Option<BTreeSet<NodeId>>,
@ -475,6 +482,7 @@ mod tests {
channel::unbounded();
let network_message_receiver = network.connect(
*node_id,
0,
node_message_receiver,
node_message_broadcast_receiver,
);

View File

@ -16,6 +16,7 @@ use super::SimulationRunnerHandle;
pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
chunk_size: usize,
step_time: Duration,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: std::fmt::Debug + Clone + Send + Sync + 'static,
@ -38,7 +39,6 @@ where
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let elapsed = Duration::from_millis(100);
let handle = std::thread::spawn(move || {
loop {
select! {
@ -53,7 +53,7 @@ where
.write()
.par_iter_mut()
.filter(|n| ids.contains(&n.id()))
.for_each(|node|node.step(elapsed));
.for_each(|node|node.step(step_time));
p.send(R::try_from(
&simulation_state,

View File

@ -19,6 +19,7 @@ pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
update_rate: usize,
maximum_iterations: usize,
step_time: Duration,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: std::fmt::Debug + Send + Sync + Clone + 'static,
@ -42,7 +43,6 @@ where
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let elapsed = Duration::from_millis(100);
let handle = std::thread::spawn(move || {
'main: for chunk in iterations.chunks(update_rate) {
select! {
@ -62,7 +62,7 @@ where
let node: &mut dyn Node<Settings = S, State = T> = &mut **shared_nodes
.get_mut(node_id.index())
.expect("Node should be present");
node.step(elapsed);
node.step(step_time);
}
// check if any condition makes the simulation stop

View File

@ -52,6 +52,7 @@ pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
gap: usize,
distribution: Option<Vec<f32>>,
step_time: Duration,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: std::fmt::Debug + Send + Sync + Clone + 'static,
@ -79,7 +80,6 @@ where
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let elapsed = Duration::from_millis(100);
let handle = std::thread::spawn(move || {
loop {
select! {
@ -99,7 +99,7 @@ where
.get_mut(node_id.index())
.expect("Node should be present");
let prev_view = node.current_view();
node.step(elapsed);
node.step(step_time);
let after_view = node.current_view();
if after_view > prev_view {
// pass node to next step group

View File

@ -93,6 +93,7 @@ pub struct SimulationRunner<M: std::fmt::Debug, R, S, T> {
nodes: Arc<RwLock<Vec<BoxedNode<S, T>>>>,
runner_settings: RunnerSettings,
producer: StreamProducer<R>,
step_time: Duration,
}
impl<M, R, S, T> SimulationRunner<M, R, S, T>
@ -136,7 +137,7 @@ where
views_count: _,
leaders_count: _,
network_settings: _,
step_time: _,
step_time,
record_settings: _,
} = settings;
Ok(Self {
@ -148,24 +149,26 @@ where
},
nodes,
producer,
step_time,
})
}
pub fn simulate(self) -> anyhow::Result<SimulationRunnerHandle<R>> {
// init the start time
let _ = *crate::START_TIME;
let step_time = self.step_time;
match self.runner_settings.clone() {
RunnerSettings::Sync => sync_runner::simulate(self),
RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks),
RunnerSettings::Sync => sync_runner::simulate(self, step_time),
RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks, step_time),
RunnerSettings::Glauber {
maximum_iterations,
update_rate,
} => glauber_runner::simulate(self, update_rate, maximum_iterations),
} => glauber_runner::simulate(self, update_rate, maximum_iterations, step_time),
RunnerSettings::Layered {
rounds_gap,
distribution,
} => layered_runner::simulate(self, rounds_gap, distribution),
} => layered_runner::simulate(self, rounds_gap, distribution, step_time),
}
}
}

View File

@ -8,6 +8,7 @@ use std::time::Duration;
/// Simulate with sending the network state to any subscriber
pub fn simulate<M, R, S, T>(
runner: SimulationRunner<M, R, S, T>,
step_time: Duration,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
M: std::fmt::Debug + Send + Sync + Clone + 'static,
@ -29,7 +30,6 @@ where
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let elapsed = Duration::from_millis(100);
let handle = std::thread::spawn(move || {
p.send(R::try_from(&state)?)?;
loop {
@ -43,7 +43,7 @@ where
// then dead lock will occur
{
let mut nodes = nodes.write();
inner_runner.step(&mut nodes, elapsed);
inner_runner.step(&mut nodes, step_time);
}
p.send(R::try_from(&state)?)?;
@ -116,6 +116,7 @@ mod tests {
channel::unbounded();
let network_message_receiver = network.connect(
*node_id,
1,
node_message_receiver,
node_message_broadcast_receiver,
);

View File

@ -37,6 +37,7 @@ pub struct TreeSettings {
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct NodeSettings {
pub network_capacity_kbps: u32,
#[serde(with = "humantime_serde")]
pub timeout: std::time::Duration,
}