Youngjoon Lee 3d14319588
Targeted experiments for queuing mechanism
Targeted experiments for queuing mechanism

gather series into dataframe

put exp_id to the CSV path

revert iterations back to num_nodes/2

add missing print and decrease msg_interval_sec

change param sequence for readability

use struct instead of pickle for fixed-size & faster serde

include dtime series into dataframe

optimize: choose optimized connection type according to latency setting

add skip_sending_noise option

optimize filling up the queue with noises

move queue_type to the end of param set, and build CSV gradually row by row

fix: consider num_senders when waiting until all messages are disseminated

fix: sample senders without duplicate

fix: build param combinations correctly

add plot script

initialize MinSizeMixQueue with noises

define SessionParameterSet and add paramset for session2

improve topology connectivity check to avoid "maximum recursion depth exceeded" error

fix: the correct parameter set constructor

store individual series to separate CSV files

reorganize files and draw plot automatically

start series file id from 1 (not 0)

add queue_type CLI argument for parallelization

pretty format of elapsed time

pretty format of elapsed time

add merge CLI and draw multiple plots

split functions

do not draw plot for each session

use concurrent.futures to utilize multiprocessing

add from_paramset argument

fix: count num of finished iterations correctly

draw plots for num_sent_msgs and num_senders for specific experiments
2024-08-02 11:38:17 +09:00

111 lines
3.2 KiB
Python

from __future__ import annotations
import hashlib
import random
from dataclasses import dataclass
from enum import Enum
from typing import Awaitable, Callable, Self, override
from framework import Framework
from protocol.connection import (
DuplexConnection,
MixSimplexConnection,
SimplexConnection,
)
from protocol.error import PeeringDegreeReached
from protocol.gossip import Gossip, GossipConfig
from protocol.temporalmix import TemporalMixConfig
@dataclass
class NomssipConfig(GossipConfig):
    """
    Settings for a Nomssip channel, on top of the base gossip settings.
    """

    # Rate handed to every MixSimplexConnection that wraps an outbound link.
    transmission_rate_per_sec: int

    # Fixed payload size in bytes: gossiped messages must have exactly this
    # length, and noise payloads are built with the same length.
    msg_size: int

    # Temporal mix settings handed to every MixSimplexConnection.
    temporal_mix: TemporalMixConfig

    # OPTIMIZATION ONLY FOR EXPERIMENTS WITHOUT BANDWIDTH MEASUREMENT
    # When True, a noise packet is not sent even if it is time to send one.
    skip_sending_noise: bool = False
class Nomssip(Gossip):
    """
    Gossip channel for NomMix.

    Builds on the plain Gossip channel: every outbound connection is wrapped
    in a MixSimplexConnection configured with the global transmission rate
    and a noise packet, and every packet on the wire carries a one-byte flag
    that tells real messages apart from noise.
    """

    def __init__(
        self,
        framework: Framework,
        config: NomssipConfig,
        handler: Callable[[bytes], Awaitable[None]],
    ):
        """
        Set up the channel.

        `handler` is awaited with the payload (flag removed) of each real
        message received.
        """
        super().__init__(framework, config, handler)
        # Keep our own reference so the Nomssip-specific settings
        # (msg_size, temporal_mix, ...) can be read from `self.config`.
        self.config = config

    @override
    def add_conn(self, inbound: SimplexConnection, outbound: SimplexConnection):
        """
        Register a connection pair, wrapping the outbound side in a mix
        connection before handing it to the base class.
        """
        cfg = self.config
        # The noise packet is a NOISE flag followed by `msg_size` zero bytes.
        noise = FlaggedPacket(FlaggedPacket.Flag.NOISE, bytes(cfg.msg_size)).bytes()
        mixed_outbound = MixSimplexConnection(
            self.framework,
            outbound,
            cfg.transmission_rate_per_sec,
            noise,
            cfg.temporal_mix,
            cfg.skip_sending_noise,
        )
        super().add_conn(inbound, mixed_outbound)

    @override
    async def process_inbound_msg(self, msg: bytes):
        """
        Decode an inbound packet; noise is dropped, a real message is
        re-gossiped and then passed to the handler.
        """
        packet = FlaggedPacket.from_bytes(msg)
        flag = packet.flag
        if flag == FlaggedPacket.Flag.NOISE:
            # Noise carries no information: discard it.
            return
        if flag == FlaggedPacket.Flag.REAL:
            await self.__gossip_flagged_packet(packet)
            await self.handler(packet.message)

    @override
    async def gossip(self, msg: bytes):
        """
        Flag `msg` as a real message and gossip it to all connected peers.
        """
        # Every message must have exactly the configured size.
        assert len(msg) == self.config.msg_size, f"{len(msg)} != {self.config.msg_size}"
        await self.__gossip_flagged_packet(
            FlaggedPacket(FlaggedPacket.Flag.REAL, msg)
        )

    async def __gossip_flagged_packet(self, packet: FlaggedPacket):
        """
        Internal helper: serialize an already-flagged packet and gossip it
        through the base class.
        """
        wire_bytes = packet.bytes()
        await super().gossip(wire_bytes)
class FlaggedPacket:
class Flag(Enum):
REAL = b"\x00"
NOISE = b"\x01"
def __init__(self, flag: Flag, message: bytes):
self.flag = flag
self.message = message
def bytes(self) -> bytes:
return self.flag.value + self.message
@classmethod
def from_bytes(cls, packet: bytes) -> Self:
"""
Parse a flagged packet from bytes
"""
if len(packet) < 1:
raise ValueError("Invalid message format")
return cls(cls.Flag(packet[:1]), packet[1:])