diff --git a/mixnet/protocol/gossip.py b/mixnet/protocol/gossip.py index 04a1e60..6c8d9aa 100644 --- a/mixnet/protocol/gossip.py +++ b/mixnet/protocol/gossip.py @@ -59,7 +59,7 @@ class Gossip: async def __process_inbound_conn(self, conn: DuplexConnection): while True: msg = await conn.recv() - if self.__check_update_cache(msg): + if self._check_update_cache(msg): continue await self._process_inbound_msg(msg, conn) @@ -76,7 +76,7 @@ class Gossip: # even though we update the cache in the _process_inbound_msg method. # It's because we don't want this publisher node to gossip the message again # when it first receives the messages from one of its peers later. - if not self.__check_update_cache(msg): + if not self._check_update_cache(msg): await self._gossip(msg) # With the same reason, call the handler here # which means that we consider that this publisher node received the message. @@ -90,7 +90,7 @@ class Gossip: if conn not in excludes: await conn.send(msg) - def __check_update_cache(self, packet: bytes) -> bool: + def _check_update_cache(self, packet: bytes) -> bool: """ Add a message to the cache, and return True if the message was already in the cache. """ diff --git a/mixnet/protocol/nomssip.py b/mixnet/protocol/nomssip.py index bd4e96a..15a3fda 100644 --- a/mixnet/protocol/nomssip.py +++ b/mixnet/protocol/nomssip.py @@ -64,28 +64,24 @@ class Nomssip(Gossip): # Drop noise packet return case FlaggedPacket.Flag.REAL: - await self.__gossip_flagged_packet(packet, [received_from]) + self.assert_message_size(packet.message) + await super()._gossip(msg, [received_from]) await self.handler(packet.message) @override - async def _gossip(self, msg: bytes, excludes: list[DuplexConnection] = []): - """ - Gossip a message to all connected peers with prepending a message flag - """ + async def publish(self, msg: bytes): + self.assert_message_size(msg) + + packet = FlaggedPacket(FlaggedPacket.Flag.REAL, msg).bytes() + # Please see comments in super().publish() for the reason of the following line. + if not self._check_update_cache(packet): + await self._gossip(packet) + await self.handler(msg) + + def assert_message_size(self, msg: bytes): # The message size must be fixed. assert len(msg) == self.config.msg_size, f"{len(msg)} != {self.config.msg_size}" - packet = FlaggedPacket(FlaggedPacket.Flag.REAL, msg) - await self.__gossip_flagged_packet(packet, excludes) - - async def __gossip_flagged_packet( - self, packet: FlaggedPacket, excludes: list[DuplexConnection] = [] - ): - """ - An internal method to send a flagged packet to all connected peers - """ - await super()._gossip(packet.bytes(), excludes) - class FlaggedPacket: class Flag(Enum):