Exclude peer who forwarded a message when forwarding the message (#32)

Co-authored-by: Daniel Sanchez <3danimanimal@gmail.com>
This commit is contained in:
Youngjoon Lee 2024-11-08 11:20:30 +07:00 committed by GitHub
parent 3d1bd0dc18
commit fb1894fb8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -9,6 +9,7 @@ use crossbeam::channel;
use futures::Stream; use futures::Stream;
use lottery::StakeLottery; use lottery::StakeLottery;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use netrunner::network::NetworkMessage;
use netrunner::node::{Node, NodeId}; use netrunner::node::{Node, NodeId};
use netrunner::{ use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
@ -211,23 +212,27 @@ impl MixNode {
} }
} }
fn forward(&mut self, message: MixMessage) { fn forward(&mut self, message: MixMessage, exclude_node: Option<NodeId>) {
if !self.message_cache.insert(Self::sha256(&message.0)) { if !self.message_cache.insert(Self::sha256(&message.0)) {
return; return;
} }
for node_id in self.settings.connected_peers.iter() { for node_id in self
.settings
.connected_peers
.iter()
.filter(|&id| Some(*id) != exclude_node)
{
self.network_interface self.network_interface
.send_message(*node_id, message.clone()) .send_message(*node_id, message.clone())
} }
} }
fn receive(&mut self) -> Vec<MixMessage> { fn receive(&mut self) -> Vec<NetworkMessage<MixMessage>> {
self.network_interface self.network_interface
.receive_messages() .receive_messages()
.into_iter() .into_iter()
.map(|msg| msg.into_payload())
// Retain only messages that have not been seen before // Retain only messages that have not been seen before
.filter(|msg| self.message_cache.insert(Self::sha256(&msg.0))) .filter(|msg| self.message_cache.insert(Self::sha256(&msg.payload().0)))
.collect() .collect()
} }
@ -277,11 +282,16 @@ impl Node for MixNode {
self.persistent_sender.send(message).unwrap(); self.persistent_sender.send(message).unwrap();
} }
} }
let received_messages = self.receive(); // TODO: Generate cover message with probability
for message in received_messages { for network_message in self.receive() {
// println!(">>>>> Node {}, message: {message:?}", self.id); // println!(">>>>> Node {}, message: {message:?}", self.id);
self.forward(message.clone()); self.forward(
self.blend_sender.send(message.0).unwrap(); network_message.payload().clone(),
Some(network_message.from),
);
self.blend_sender
.send(network_message.into_payload().0)
.unwrap();
} }
// Proceed message blend // Proceed message blend
@ -306,7 +316,7 @@ impl Node for MixNode {
if let Poll::Ready(Some(msg)) = if let Poll::Ready(Some(msg)) =
pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx) pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
{ {
self.forward(MixMessage(msg)); self.forward(MixMessage(msg), None);
} }
self.state.step_id += 1; self.state.step_id += 1;