// Mirror of https://github.com/logos-blockchain/logos-blockchain-simulations.git
use std::{fmt::Debug, hash::Hash};

use rustc_hash::{FxHashMap, FxHashSet};

use crate::queue::{new_queue, Message, Queue, QueueConfig};

pub type NodeId = u32;

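/// A node in the gossip simulation: it keeps one message queue per connected peer
/// and, optionally, a cache of received messages so the same message is not relayed
/// more than once.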
pub struct Node<M>
where
    M: Debug + Copy + Clone + PartialEq + Eq + Hash,
{
    pub id: NodeId,
    queue_config: QueueConfig,
    // To get deterministic results, we use a sorted Vec instead of an FxHashMap.
    // Building `queues` this way is inefficient, but that is not a problem because it is done
    // only once, at the beginning. `connected_peers` keeps building `queues` efficient by
    // providing a cheap check for peers that are already connected.
    queues: Vec<(NodeId, Box<dyn Queue<M>>)>,
    connected_peers: FxHashSet<NodeId>,
    // A cache to avoid relaying the same message multiple times.
    received_msgs: Option<FxHashMap<M, u32>>,
    pub peering_degree: u32,
}

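/// Messages popped from the per-peer queues, paired with the peer each one should be relayed to.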
pub type MessagesToRelay<M> = Vec<(NodeId, Message<M>)>;

impl<M> Node<M>
where
    M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash,
{
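    /// Creates a node with no connections. If `enable_cache` is true, the node tracks
    /// how many times each message has been received so that duplicates are not relayed.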
    pub fn new(
        id: NodeId,
        queue_config: QueueConfig,
        peering_degree: u32,
        enable_cache: bool,
    ) -> Self {
        Node::<M> {
            id,
            queue_config,
            queues: Vec::new(),
            connected_peers: FxHashSet::default(),
            received_msgs: if enable_cache {
                Some(FxHashMap::default())
            } else {
                None
            },
            peering_degree,
        }
    }

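    /// Connects this node to `peer_id`, creating a dedicated queue for that peer.
    /// Connecting to an already-connected peer is a no-op.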
    pub fn connect(&mut self, peer_id: NodeId) {
        if self.connected_peers.insert(peer_id) {
            let pos = self
                .queues
                .binary_search_by(|probe| probe.0.cmp(&peer_id))
                .unwrap_or_else(|pos| pos);
            self.queues
                .insert(pos, (peer_id, new_queue::<M>(&self.queue_config)));
        }
    }

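    /// Originates `msg` from this node by pushing it into every peer queue.
    /// Panics if the cache reports that this node has already seen `msg`.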
    pub fn send(&mut self, msg: M) {
        assert!(self.check_and_update_cache(msg, true));
        for (_, queue) in self.queues.iter_mut() {
            queue.push(msg);
        }
    }

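    /// Handles `msg` received from the peer `from`, or from outside the gossip network
    /// if `from` is None. Returns true if this is the first time the message is seen;
    /// only then is it queued for relaying to the other peers.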
    pub fn receive(&mut self, msg: M, from: Option<NodeId>) -> bool {
        // If `from` is None, the message comes from outside of the gossip network.
        // In that case, the received count in the cache must start from 0, so that the entry is
        // removed only after the message has been received from all `peering_degree` gossip peers
        // (not `peering_degree - 1`).
        // To achieve that, we set `sending` to true, as if this node were sending the message itself.
        let sending = from.is_none();
        let first_received = self.check_and_update_cache(msg, sending);
        if first_received {
            for (node_id, queue) in self.queues.iter_mut() {
                match from {
                    Some(sender) => {
                        if *node_id != sender {
                            queue.push(msg);
                        }
                    }
                    None => queue.push(msg),
                }
            }
        }
        first_received
    }

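    /// Pops one message from each peer queue and returns the messages paired with the
    /// peers they should be relayed to.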
    pub fn read_queues(&mut self) -> MessagesToRelay<M> {
        let mut msgs_to_relay: MessagesToRelay<M> = Vec::with_capacity(self.queues.len());
        self.queues.iter_mut().for_each(|(node_id, queue)| {
            msgs_to_relay.push((*node_id, queue.pop()));
        });
        msgs_to_relay
    }

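    /// Returns the number of data messages currently held in each peer queue,
    /// in the same order as `queues`.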
    pub fn queue_data_msg_counts(&self) -> Vec<usize> {
        self.queues
            .iter()
            .map(|(_, queue)| queue.data_count())
            .collect()
    }

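    /// Updates the duplicate cache for `msg` and returns true if this is the first time
    /// the message has been seen. Always returns true when the cache is disabled.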
    fn check_and_update_cache(&mut self, msg: M, sending: bool) -> bool {
        if let Some(received_msgs) = &mut self.received_msgs {
            let first_received = if let Some(count) = received_msgs.get_mut(&msg) {
                *count += 1;
                false
            } else {
                received_msgs.insert(msg, if sending { 0 } else { 1 });
                true
            };

            // If the message has been received from all connected peers, remove it from the cache
            // because there is no possibility that the message will be received again.
            if received_msgs.get(&msg).unwrap() == &self.peering_degree {
                tracing::debug!(
                    "Removing message {:?} from the cache: it has been received {} times, assuming that each inbound peer sent it once.",
                    msg,
                    self.peering_degree
                );
                received_msgs.remove(&msg);
            }

            first_received
        } else {
            true
        }
    }
}
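
// A minimal usage sketch, not part of the original module: it wires `n` nodes into a ring
// topology and has node 0 originate a message, using only the `Node` API defined above.
// `make_config` is a hypothetical caller-supplied closure producing `QueueConfig` values,
// since `QueueConfig`'s fields live in `crate::queue` and are not assumed here.
// Assumes `n >= 3` so that each node has two distinct neighbours (peering degree 2).
#[allow(dead_code)]
pub fn ring_broadcast_sketch<M>(
    n: u32,
    make_config: impl Fn() -> QueueConfig,
    msg: M,
) -> Vec<Node<M>>
where
    M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash,
{
    // Every node connects to its two ring neighbours, so its peering degree is 2.
    let mut nodes: Vec<Node<M>> = (0..n)
        .map(|id| Node::new(id, make_config(), 2, true))
        .collect();
    for id in 0..n {
        nodes[id as usize].connect((id + n - 1) % n); // left neighbour
        nodes[id as usize].connect((id + 1) % n); // right neighbour
    }
    // Node 0 originates the message: it is pushed into both of its peer queues,
    // and `read_queues` would later pop it for relaying.
    nodes[0].send(msg);
    nodes
}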