From befb52d014cef3f80858e427bd257a1c20e8774a Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Tue, 13 Aug 2024 01:14:27 +0900 Subject: [PATCH] optimize: exclude the node who sent the message when gossiping the message --- mixnet/protocol/gossip.py | 11 ++++++----- mixnet/protocol/nomssip.py | 15 +++++++++------ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/mixnet/protocol/gossip.py b/mixnet/protocol/gossip.py index 2203cee..04a1e60 100644 --- a/mixnet/protocol/gossip.py +++ b/mixnet/protocol/gossip.py @@ -61,10 +61,10 @@ class Gossip: msg = await conn.recv() if self.__check_update_cache(msg): continue - await self._process_inbound_msg(msg) + await self._process_inbound_msg(msg, conn) - async def _process_inbound_msg(self, msg: bytes): - await self._gossip(msg) + async def _process_inbound_msg(self, msg: bytes, received_from: DuplexConnection): + await self._gossip(msg, [received_from]) await self.handler(msg) async def publish(self, msg: bytes): @@ -82,12 +82,13 @@ class Gossip: # which means that we consider that this publisher node received the message. await self.handler(msg) - async def _gossip(self, msg: bytes): + async def _gossip(self, msg: bytes, excludes: list[DuplexConnection] = []): """ Gossip a message to all peers connected to this node. """ for conn in self.conns: - await conn.send(msg) + if conn not in excludes: + await conn.send(msg) def __check_update_cache(self, packet: bytes) -> bool: """ diff --git a/mixnet/protocol/nomssip.py b/mixnet/protocol/nomssip.py index 588a92e..bd4e96a 100644 --- a/mixnet/protocol/nomssip.py +++ b/mixnet/protocol/nomssip.py @@ -6,6 +6,7 @@ from typing import Awaitable, Callable, Self, override from framework import Framework from protocol.connection import ( + DuplexConnection, MixSimplexConnection, SimplexConnection, ) @@ -56,18 +57,18 @@ class Nomssip(Gossip): ) @override - async def _process_inbound_msg(self, msg: bytes): + async def _process_inbound_msg(self, msg: bytes, received_from: DuplexConnection): packet = FlaggedPacket.from_bytes(msg) match packet.flag: case FlaggedPacket.Flag.NOISE: # Drop noise packet return case FlaggedPacket.Flag.REAL: - await self.__gossip_flagged_packet(packet) + await self.__gossip_flagged_packet(packet, [received_from]) await self.handler(packet.message) @override - async def _gossip(self, msg: bytes): + async def _gossip(self, msg: bytes, excludes: list[DuplexConnection] = []): """ Gossip a message to all connected peers with prepending a message flag """ @@ -75,13 +76,15 @@ class Nomssip(Gossip): assert len(msg) == self.config.msg_size, f"{len(msg)} != {self.config.msg_size}" packet = FlaggedPacket(FlaggedPacket.Flag.REAL, msg) - await self.__gossip_flagged_packet(packet) + await self.__gossip_flagged_packet(packet, excludes) - async def __gossip_flagged_packet(self, packet: FlaggedPacket): + async def __gossip_flagged_packet( + self, packet: FlaggedPacket, excludes: list[DuplexConnection] = [] + ): """ An internal method to send a flagged packet to all connected peers """ - await super()._gossip(packet.bytes()) + await super()._gossip(packet.bytes(), excludes) class FlaggedPacket: