132 lines
4.4 KiB
Rust

use std::{fmt::Debug, hash::Hash};
use rustc_hash::{FxHashMap, FxHashSet};
use crate::queue::{new_queue, Message, Queue, QueueConfig};
pub type NodeId = u32;
pub struct Node<M>
where
M: Debug + Copy + Clone + PartialEq + Eq + Hash,
{
pub id: NodeId,
queue_config: QueueConfig,
// To have the deterministic result, we use Vec instead of FxHashMap.
// Building `queues` is inefficient, but it's not a problem because it's done only once at the beginning.
// Instead, use `connected_peers` to build `queues` efficiently.
queues: Vec<(NodeId, Box<dyn Queue<M>>)>,
connected_peers: FxHashSet<NodeId>,
// A cache to avoid relaying the same message multiple times.
received_msgs: Option<FxHashMap<M, u32>>,
pub peering_degree: u32,
}
pub type MessagesToRelay<M> = Vec<(NodeId, Message<M>)>;
impl<M> Node<M>
where
M: 'static + Debug + Copy + Clone + PartialEq + Eq + Hash,
{
pub fn new(
id: NodeId,
queue_config: QueueConfig,
peering_degree: u32,
enable_cache: bool,
) -> Self {
Node::<M> {
id,
queue_config,
queues: Vec::new(),
connected_peers: FxHashSet::default(),
received_msgs: if enable_cache {
Some(FxHashMap::default())
} else {
None
},
peering_degree,
}
}
pub fn connect(&mut self, peer_id: NodeId) {
if self.connected_peers.insert(peer_id) {
let pos = self
.queues
.binary_search_by(|probe| probe.0.cmp(&peer_id))
.unwrap_or_else(|pos| pos);
self.queues
.insert(pos, (peer_id, new_queue::<M>(&self.queue_config)));
}
}
pub fn send(&mut self, msg: M) {
assert!(self.check_and_update_cache(msg, true));
for (_, queue) in self.queues.iter_mut() {
queue.push(msg);
}
}
pub fn receive(&mut self, msg: M, from: Option<NodeId>) -> bool {
// If `from` is None, it means that the message is being sent from the node outside of the gossip network.
// In this case, the received count in the cache must start from 0, so it can be removed from the cache only when it is received from C gossip peers (not C-1).
// For that, we set `sending` to true, as if this node is sending the message.
let sending = from.is_none();
let first_received = self.check_and_update_cache(msg, sending);
if first_received {
for (node_id, queue) in self.queues.iter_mut() {
match from {
Some(sender) => {
if *node_id != sender {
queue.push(msg);
}
}
None => queue.push(msg),
}
}
}
first_received
}
pub fn read_queues(&mut self) -> MessagesToRelay<M> {
let mut msgs_to_relay: MessagesToRelay<M> = Vec::with_capacity(self.queues.len());
self.queues.iter_mut().for_each(|(node_id, queue)| {
msgs_to_relay.push((*node_id, queue.pop()));
});
msgs_to_relay
}
pub fn queue_data_msg_counts(&self) -> Vec<usize> {
self.queues
.iter()
.map(|(_, queue)| queue.data_count())
.collect()
}
fn check_and_update_cache(&mut self, msg: M, sending: bool) -> bool {
if let Some(received_msgs) = &mut self.received_msgs {
let first_received = if let Some(count) = received_msgs.get_mut(&msg) {
*count += 1;
false
} else {
received_msgs.insert(msg, if sending { 0 } else { 1 });
true
};
// If the message have been received from all connected peers, remove it from the cache
// because there is no possibility that the message will be received again.
if received_msgs.get(&msg).unwrap() == &self.peering_degree {
tracing::debug!(
"Remove message from cache: {:?} because it has been received {} times, assuming that each inbound peer sent the message once.",
msg,
self.peering_degree
);
received_msgs.remove(&msg);
}
first_received
} else {
true
}
}
}