optimize: exclude the node who sent the message when gossiping the message

This commit is contained in:
Youngjoon Lee 2024-08-13 01:14:27 +09:00
parent c60039963f
commit befb52d014
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
2 changed files with 15 additions and 11 deletions

View File

@ -61,10 +61,10 @@ class Gossip:
msg = await conn.recv()
if self.__check_update_cache(msg):
continue
await self._process_inbound_msg(msg)
await self._process_inbound_msg(msg, conn)
async def _process_inbound_msg(self, msg: bytes):
await self._gossip(msg)
async def _process_inbound_msg(self, msg: bytes, received_from: DuplexConnection):
await self._gossip(msg, [received_from])
await self.handler(msg)
async def publish(self, msg: bytes):
@ -82,12 +82,13 @@ class Gossip:
# which means that we consider that this publisher node received the message.
await self.handler(msg)
async def _gossip(self, msg: bytes):
async def _gossip(self, msg: bytes, excludes: list[DuplexConnection] = []):
"""
Gossip a message to all peers connected to this node.
"""
for conn in self.conns:
await conn.send(msg)
if conn not in excludes:
await conn.send(msg)
def __check_update_cache(self, packet: bytes) -> bool:
"""

View File

@ -6,6 +6,7 @@ from typing import Awaitable, Callable, Self, override
from framework import Framework
from protocol.connection import (
DuplexConnection,
MixSimplexConnection,
SimplexConnection,
)
@ -56,18 +57,18 @@ class Nomssip(Gossip):
)
@override
async def _process_inbound_msg(self, msg: bytes):
async def _process_inbound_msg(self, msg: bytes, received_from: DuplexConnection):
packet = FlaggedPacket.from_bytes(msg)
match packet.flag:
case FlaggedPacket.Flag.NOISE:
# Drop noise packet
return
case FlaggedPacket.Flag.REAL:
await self.__gossip_flagged_packet(packet)
await self.__gossip_flagged_packet(packet, [received_from])
await self.handler(packet.message)
@override
async def _gossip(self, msg: bytes):
async def _gossip(self, msg: bytes, excludes: list[DuplexConnection] = []):
"""
Gossip a message to all connected peers with prepending a message flag
"""
@ -75,13 +76,15 @@ class Nomssip(Gossip):
assert len(msg) == self.config.msg_size, f"{len(msg)} != {self.config.msg_size}"
packet = FlaggedPacket(FlaggedPacket.Flag.REAL, msg)
await self.__gossip_flagged_packet(packet)
await self.__gossip_flagged_packet(packet, excludes)
async def __gossip_flagged_packet(self, packet: FlaggedPacket):
async def __gossip_flagged_packet(
self, packet: FlaggedPacket, excludes: list[DuplexConnection] = []
):
"""
An internal method to send a flagged packet to all connected peers
"""
await super()._gossip(packet.bytes())
await super()._gossip(packet.bytes(), excludes)
class FlaggedPacket: