mirror of
https://github.com/logos-blockchain/logos-blockchain-simulations.git
synced 2026-02-25 23:53:08 +00:00
fix optimization: put right messages to cache in nomssip
This commit is contained in:
parent
618705e252
commit
5aeecf45d9
@ -59,7 +59,7 @@ class Gossip:
|
||||
async def __process_inbound_conn(self, conn: DuplexConnection):
|
||||
while True:
|
||||
msg = await conn.recv()
|
||||
if self.__check_update_cache(msg):
|
||||
if self._check_update_cache(msg):
|
||||
continue
|
||||
await self._process_inbound_msg(msg, conn)
|
||||
|
||||
@ -76,7 +76,7 @@ class Gossip:
|
||||
# even though we update the cache in the _process_inbound_msg method.
|
||||
# It's because we don't want this publisher node to gossip the message again
|
||||
# when it first receives the messages from one of its peers later.
|
||||
if not self.__check_update_cache(msg):
|
||||
if not self._check_update_cache(msg):
|
||||
await self._gossip(msg)
|
||||
# With the same reason, call the handler here
|
||||
# which means that we consider that this publisher node received the message.
|
||||
@ -90,7 +90,7 @@ class Gossip:
|
||||
if conn not in excludes:
|
||||
await conn.send(msg)
|
||||
|
||||
def __check_update_cache(self, packet: bytes) -> bool:
|
||||
def _check_update_cache(self, packet: bytes) -> bool:
|
||||
"""
|
||||
Add a message to the cache, and return True if the message was already in the cache.
|
||||
"""
|
||||
|
||||
@ -64,28 +64,24 @@ class Nomssip(Gossip):
|
||||
# Drop noise packet
|
||||
return
|
||||
case FlaggedPacket.Flag.REAL:
|
||||
await self.__gossip_flagged_packet(packet, [received_from])
|
||||
self.assert_message_size(packet.message)
|
||||
await super()._gossip(msg, [received_from])
|
||||
await self.handler(packet.message)
|
||||
|
||||
@override
|
||||
async def _gossip(self, msg: bytes, excludes: list[DuplexConnection] = []):
|
||||
"""
|
||||
Gossip a message to all connected peers with prepending a message flag
|
||||
"""
|
||||
async def publish(self, msg: bytes):
|
||||
self.assert_message_size(msg)
|
||||
|
||||
packet = FlaggedPacket(FlaggedPacket.Flag.REAL, msg).bytes()
|
||||
# Please see comments in super().publish() for the reason of the following line.
|
||||
if not self._check_update_cache(packet):
|
||||
await self._gossip(packet)
|
||||
await self.handler(msg)
|
||||
|
||||
def assert_message_size(self, msg: bytes):
|
||||
# The message size must be fixed.
|
||||
assert len(msg) == self.config.msg_size, f"{len(msg)} != {self.config.msg_size}"
|
||||
|
||||
packet = FlaggedPacket(FlaggedPacket.Flag.REAL, msg)
|
||||
await self.__gossip_flagged_packet(packet, excludes)
|
||||
|
||||
async def __gossip_flagged_packet(
|
||||
self, packet: FlaggedPacket, excludes: list[DuplexConnection] = []
|
||||
):
|
||||
"""
|
||||
An internal method to send a flagged packet to all connected peers
|
||||
"""
|
||||
await super()._gossip(packet.bytes(), excludes)
|
||||
|
||||
|
||||
class FlaggedPacket:
|
||||
class Flag(Enum):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user