From 3a2f3cc0794ae202c0e2f25686342e58e6ba75bc Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Wed, 14 Aug 2024 16:52:18 +0900 Subject: [PATCH] optimization: remove cache entry if the message has been received from all adjacent peers in the end --- mixnet/protocol/gossip.py | 29 ++++++++++++++++++++++------- mixnet/protocol/nomssip.py | 2 +- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/mixnet/protocol/gossip.py b/mixnet/protocol/gossip.py index 6c8d9aa..01d0e49 100644 --- a/mixnet/protocol/gossip.py +++ b/mixnet/protocol/gossip.py @@ -35,7 +35,8 @@ class Gossip: self.conns: list[DuplexConnection] = [] # A handler to process inbound messages. self.handler = handler - self.packet_cache: set[bytes] = set() + # msg -> received_cnt + self.packet_cache: dict[bytes, int] = dict() # A set just for gathering a reference of tasks to prevent them from being garbage collected. # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task self.tasks: set[Awaitable] = set() @@ -76,7 +77,7 @@ class Gossip: # even though we update the cache in the _process_inbound_msg method. # It's because we don't want this publisher node to gossip the message again # when it first receives the messages from one of its peers later. - if not self._check_update_cache(msg): + if not self._check_update_cache(msg, publishing=True): await self._gossip(msg) # With the same reason, call the handler here # which means that we consider that this publisher node received the message. @@ -90,12 +91,26 @@ class Gossip: if conn not in excludes: await conn.send(msg) - def _check_update_cache(self, packet: bytes) -> bool: + def _check_update_cache(self, packet: bytes, publishing: bool = False) -> bool: """ Add a message to the cache, and return True if the message was already in the cache. 
""" hash = hashlib.sha256(packet).digest() - if hash in self.packet_cache: - return True - self.packet_cache.add(hash) - return False + seen = hash in self.packet_cache + + if publishing: + if not seen: + # Put 0 when publishing, so that the publisher node doesn't gossip the message again + # even when it first receives the message from one of its peers later. + self.packet_cache[hash] = 0 + else: + if not seen: + self.packet_cache[hash] = 1 + else: + self.packet_cache[hash] += 1 + # Remove the message from the cache if it's received from all adjacent peers in the end + # to reduce the size of the cache. + if self.packet_cache[hash] >= self.config.peering_degree: + del self.packet_cache[hash] + + return seen diff --git a/mixnet/protocol/nomssip.py b/mixnet/protocol/nomssip.py index 15a3fda..2cd8c9b 100644 --- a/mixnet/protocol/nomssip.py +++ b/mixnet/protocol/nomssip.py @@ -74,7 +74,7 @@ class Nomssip(Gossip): packet = FlaggedPacket(FlaggedPacket.Flag.REAL, msg).bytes() # Please see comments in super().publish() for the reason of the following line. - if not self._check_update_cache(packet): + if not self._check_update_cache(packet, publishing=True): await self._gossip(packet) await self.handler(msg)