optimize: mark messages as RECEIVED immediately in the publisher node when the messages are published

This commit is contained in:
Youngjoon Lee 2024-08-13 01:02:16 +09:00
parent aebc0459b5
commit c60039963f
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
4 changed files with 28 additions and 15 deletions

View File

@ -2,13 +2,11 @@ from __future__ import annotations
import hashlib
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Self
from typing import Awaitable, Callable
from framework import Framework
from protocol.connection import (
DuplexConnection,
MixSimplexConnection,
SimplexConnection,
)
from protocol.error import PeeringDegreeReached
@ -63,15 +61,30 @@ class Gossip:
msg = await conn.recv()
if self.__check_update_cache(msg):
continue
await self.process_inbound_msg(msg)
await self._process_inbound_msg(msg)
async def process_inbound_msg(self, msg: bytes):
await self.gossip(msg)
async def _process_inbound_msg(self, msg: bytes):
await self._gossip(msg)
await self.handler(msg)
async def gossip(self, msg: bytes):
async def publish(self, msg: bytes):
"""
Gossip a message to all connected peers.
Publish a message to all nodes in the network.
"""
# Don't publish the same message twice.
# Touching the cache here is necessary because this method is called by the user,
# even though we update the cache in the _process_inbound_msg method.
# It's because we don't want this publisher node to gossip the message again
# when it first receives the messages from one of its peers later.
if not self.__check_update_cache(msg):
await self._gossip(msg)
# With the same reason, call the handler here
# which means that we consider that this publisher node received the message.
await self.handler(msg)
async def _gossip(self, msg: bytes):
"""
Gossip a message to all peers connected to this node.
"""
for conn in self.conns:
await conn.send(msg)

View File

@ -76,12 +76,12 @@ class Node:
match result:
case SphinxPacket():
# Gossip the next Sphinx packet
await self.nomssip.gossip(result.bytes())
await self.nomssip.publish(result.bytes())
case bytes():
if self.recovered_msg_handler is not None:
result = await self.recovered_msg_handler(result)
# Broadcast the message fully recovered from Sphinx packets
await self.broadcast.gossip(result)
await self.broadcast.publish(result)
case None:
return
@ -129,7 +129,7 @@ class Node:
self.global_config,
self.config.mix_path_length,
)
await self.nomssip.gossip(sphinx_packet.bytes())
await self.nomssip.publish(sphinx_packet.bytes())
def connect_nodes(

View File

@ -56,7 +56,7 @@ class Nomssip(Gossip):
)
@override
async def process_inbound_msg(self, msg: bytes):
async def _process_inbound_msg(self, msg: bytes):
packet = FlaggedPacket.from_bytes(msg)
match packet.flag:
case FlaggedPacket.Flag.NOISE:
@ -67,7 +67,7 @@ class Nomssip(Gossip):
await self.handler(packet.message)
@override
async def gossip(self, msg: bytes):
async def _gossip(self, msg: bytes):
"""
Gossip a message to all connected peers with prepending a message flag
"""
@ -81,7 +81,7 @@ class Nomssip(Gossip):
"""
An internal method to send a flagged packet to all connected peers
"""
await super().gossip(packet.bytes())
await super()._gossip(packet.bytes())
class FlaggedPacket:

View File

@ -29,4 +29,4 @@ class Node:
"""
Send the message via Nomos Gossip to all connected peers.
"""
await self.nomssip.gossip(msg)
await self.nomssip.publish(msg)