diff --git a/mixnet/protocol/gossip.py b/mixnet/protocol/gossip.py index ff88e2f..2203cee 100644 --- a/mixnet/protocol/gossip.py +++ b/mixnet/protocol/gossip.py @@ -2,13 +2,11 @@ from __future__ import annotations import hashlib from dataclasses import dataclass -from enum import Enum -from typing import Awaitable, Callable, Self +from typing import Awaitable, Callable from framework import Framework from protocol.connection import ( DuplexConnection, - MixSimplexConnection, SimplexConnection, ) from protocol.error import PeeringDegreeReached @@ -63,15 +61,30 @@ class Gossip: msg = await conn.recv() if self.__check_update_cache(msg): continue - await self.process_inbound_msg(msg) + await self._process_inbound_msg(msg) - async def process_inbound_msg(self, msg: bytes): - await self.gossip(msg) + async def _process_inbound_msg(self, msg: bytes): + await self._gossip(msg) await self.handler(msg) - async def gossip(self, msg: bytes): + async def publish(self, msg: bytes): """ - Gossip a message to all connected peers. + Publish a message to all nodes in the network. + """ + # Don't publish the same message twice. + # Touching the cache here is necessary because this method is called by the user, + # even though we update the cache in the _process_inbound_msg method. + # It's because we don't want this publisher node to gossip the message again + # when it first receives the messages from one of its peers later. + if not self.__check_update_cache(msg): + await self._gossip(msg) + # With the same reason, call the handler here + # which means that we consider that this publisher node received the message. + await self.handler(msg) + + async def _gossip(self, msg: bytes): + """ + Gossip a message to all peers connected to this node. """ for conn in self.conns: await conn.send(msg) diff --git a/mixnet/protocol/node.py b/mixnet/protocol/node.py index a43b866..4b6d77f 100644 --- a/mixnet/protocol/node.py +++ b/mixnet/protocol/node.py @@ -76,12 +76,12 @@ class Node: match result: case SphinxPacket(): # Gossip the next Sphinx packet - await self.nomssip.gossip(result.bytes()) + await self.nomssip.publish(result.bytes()) case bytes(): if self.recovered_msg_handler is not None: result = await self.recovered_msg_handler(result) # Broadcast the message fully recovered from Sphinx packets - await self.broadcast.gossip(result) + await self.broadcast.publish(result) case None: return @@ -129,7 +129,7 @@ class Node: self.global_config, self.config.mix_path_length, ) - await self.nomssip.gossip(sphinx_packet.bytes()) + await self.nomssip.publish(sphinx_packet.bytes()) def connect_nodes( diff --git a/mixnet/protocol/nomssip.py b/mixnet/protocol/nomssip.py index 3875b34..588a92e 100644 --- a/mixnet/protocol/nomssip.py +++ b/mixnet/protocol/nomssip.py @@ -56,7 +56,7 @@ class Nomssip(Gossip): ) @override - async def process_inbound_msg(self, msg: bytes): + async def _process_inbound_msg(self, msg: bytes): packet = FlaggedPacket.from_bytes(msg) match packet.flag: case FlaggedPacket.Flag.NOISE: @@ -67,7 +67,7 @@ class Nomssip(Gossip): await self.handler(packet.message) @override - async def gossip(self, msg: bytes): + async def _gossip(self, msg: bytes): """ Gossip a message to all connected peers with prepending a message flag """ @@ -81,7 +81,7 @@ class Nomssip(Gossip): """ An internal method to send a flagged packet to all connected peers """ - await super().gossip(packet.bytes()) + await super()._gossip(packet.bytes()) class FlaggedPacket: diff --git a/mixnet/queuesim/node.py b/mixnet/queuesim/node.py index ee4495a..f9bbdf6 100644 --- a/mixnet/queuesim/node.py +++ b/mixnet/queuesim/node.py @@ -29,4 +29,4 @@ class Node: """ Send the message via Nomos Gossip to all connected peers. """ - await self.nomssip.gossip(msg) + await self.nomssip.publish(msg)