message history + batch

Youngjoon Lee 2025-02-06 17:26:12 +09:00
parent a42dc4b4bf
commit e2d1c37399
17 changed files with 955 additions and 348 deletions

View File

@ -1,49 +1,109 @@
{
"network_settings": {
"network_behaviors": {
"north america:north america": "50ms",
"north america:europe": "100ms",
"north america:asia": "120ms",
"north america west:north america west": "40ms",
"north america west:north america east": "70ms",
"north america west:north america central": "50ms",
"north america west:europe": "150ms",
"north america west:northern europe": "170ms",
"north america west:east asia": "180ms",
"north america west:southeast asia": "200ms",
"north america west:australia": "250ms",
"north america east:north america west": "70ms",
"north america east:north america east": "40ms",
"north america east:north america central": "50ms",
"north america east:europe": "130ms",
"north america east:northern europe": "140ms",
"north america east:east asia": "250ms",
"north america east:southeast asia": "300ms",
"north america east:australia": "230ms",
"north america central:north america west": "50ms",
"north america central:north america east": "50ms",
"north america central:north america central": "20ms",
"north america central:europe": "140ms",
"north america central:northern europe": "150ms",
"north america central:east asia": "200ms",
"north america central:southeast asia": "280ms",
"north america central:australia": "220ms",
"europe:north america west": "150ms",
"europe:north america east": "130ms",
"europe:north america central": "140ms",
"europe:europe": "50ms",
"europe:asia": "100ms",
"europe:north america": "120ms",
"asia:north america": "100ms",
"asia:europe": "120ms",
"asia:asia": "40ms"
"europe:northern europe": "60ms",
"europe:east asia": "300ms",
"europe:southeast asia": "300ms",
"europe:australia": "300ms",
"northern europe:north america west": "170ms",
"northern europe:north america east": "140ms",
"northern europe:north america central": "150ms",
"northern europe:europe": "60ms",
"northern europe:northern europe": "30ms",
"northern europe:east asia": "400ms",
"northern europe:southeast asia": "320ms",
"northern europe:australia": "300ms",
"east asia:north america west": "180ms",
"east asia:north america east": "250ms",
"east asia:north america central": "200ms",
"east asia:europe": "300ms",
"east asia:northern europe": "400ms",
"east asia:east asia": "50ms",
"east asia:southeast asia": "90ms",
"east asia:australia": "150ms",
"southeast asia:north america west": "200ms",
"southeast asia:north america east": "300ms",
"southeast asia:north america central": "280ms",
"southeast asia:europe": "300ms",
"southeast asia:northern europe": "320ms",
"southeast asia:east asia": "90ms",
"southeast asia:southeast asia": "40ms",
"southeast asia:australia": "150ms",
"australia:north america west": "250ms",
"australia:north america east": "230ms",
"australia:north america central": "220ms",
"australia:europe": "300ms",
"australia:northern europe": "300ms",
"australia:east asia": "150ms",
"australia:southeast asia": "150ms",
"australia:australia": "50ms"
},
"regions": {
"north america": 0.4,
"europe": 0.4,
"asia": 0.3
"north america west": 0.06,
"north america east": 0.15,
"north america central": 0.02,
"europe": 0.47,
"northern europe": 0.10,
"east asia": 0.10,
"southeast asia": 0.07,
"australia": 0.03
}
},
"node_settings": {
"timeout": "1000ms"
},
"step_time": "40ms",
"step_time": "10ms",
"runner_settings": "Sync",
"stream_settings": {
"path": "test.json"
},
"node_count": 10,
"node_count": 100,
"seed": 0,
"record_settings": {},
"wards": [
{
"sum": 10
"sum": 100
}
],
"connected_peers_count": 4,
"data_message_lottery_interval": "20s",
"stake_proportion": 1.0,
"epoch_duration": "432000s",
"slot_duration": "20s",
"slots_per_epoch": 21600,
"epoch_duration": "200s",
"slot_duration": "1s",
"slots_per_epoch": 200,
"number_of_hops": 2,
"persistent_transmission": {
"max_emission_frequency": 1.0,
"drop_message_probability": 0.0
},
"number_of_blend_layers": 2,
"max_delay_seconds": 10
"max_delay_seconds": 5
}
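The new region weights are sampling probabilities and the latency matrix lists both directions of every region pair. A sanity check catches typos when the file is edited by hand; a minimal sketch, assuming the file is saved as config.json:

import json

# Minimal sanity check for the settings above (the file name is an assumption).
with open("config.json") as f:
    cfg = json.load(f)["network_settings"]

assert abs(sum(cfg["regions"].values()) - 1.0) < 1e-9, "region weights should sum to 1.0"
for key, latency in cfg["network_behaviors"].items():
    a, b = key.split(":")
    # Every pair is listed in both directions with the same latency.
    assert cfg["network_behaviors"].get(f"{b}:{a}") == latency, f"asymmetric entry: {key}"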

View File

@ -0,0 +1,289 @@
#!/usr/bin/env python
import argparse
import csv
import json
import os
import subprocess
from collections import OrderedDict
import latency
import mixlog
def bandwidth_result(log_path: str, step_duration_ms: int) -> dict[str, float]:
    # The effective simulation time is taken from the last step at which any
    # message was fully unwrapped.
    max_step_id = 0
    for topic, json_msg in mixlog.get_input_stream(log_path):
        if topic == "MessageFullyUnwrapped":
            max_step_id = max(max_step_id, json_msg["message"]["step_id"])

    with open(log_path, "r") as file:
        for line in file:
            if "total_outbound_bandwidth" in line:
                # The bandwidth summary is logged in Rust debug notation
                # (`{ key: value, ... }`); rewrite it into valid JSON before parsing.
                line = line[line.find("{") :]
                line = line.replace("{ ", '{"')
                line = line.replace(": ", '": ')
                line = line.replace(", ", ', "')
                record = json.loads(line)
                elapsed = (max_step_id * step_duration_ms) / 1000.0
                # Bytes per second per node, averaged over the simulation time.
                return {
                    "min": float(record["min_node_total_bandwidth"]) / elapsed,
                    "avg": float(record["avg_node_total_bandwidth"]) / elapsed,
                    "max": float(record["max_node_total_bandwidth"]) / elapsed,
                }

    raise Exception("No bandwidth data found in log file")
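The three replace calls turn the Rust debug notation of the bandwidth record into parseable JSON; a quick illustration on a made-up log line:

import json

# Made-up log line in the Rust debug format the parser above expects.
line = "INFO { min_node_total_bandwidth: 1000, avg_node_total_bandwidth: 2000, max_node_total_bandwidth: 3000 }"
line = line[line.find("{"):]
line = line.replace("{ ", '{"')   # opening quote of the first key
line = line.replace(": ", '": ')  # closing quote of every key
line = line.replace(", ", ', "')  # opening quote of each following key
assert json.loads(line)["avg_node_total_bandwidth"] == 2000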
def topology_result(log_path: str) -> dict[str, int]:
for topic, json_msg in mixlog.get_input_stream(log_path):
if topic == "Topology":
return json_msg
raise Exception("No topology found in log file")
def build_argument_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Log analysis for nomos-simulations.")
parser.add_argument(
"--step-duration",
type=int,
help="Duration (in ms) of each step in the simulation.",
)
parser.add_argument(
"--params-file",
type=str,
help="A CSV file that contains all parameter sets",
)
parser.add_argument(
"--orig-config-file",
type=str,
help="An original blendnet config JSON file that will be modified as specified in `params_file`",
)
parser.add_argument(
"--outdir",
type=str,
help="A output directory to be created",
)
parser.add_argument(
"--skip-run",
action="store_true",
help="Skip running the simulations and only analyze the logs",
)
return parser
if __name__ == "__main__":
argument_parser = build_argument_parser()
args = argument_parser.parse_args()
# Read the params CSV file
csv_data = []
with open(args.params_file, mode="r") as csvfile:
reader = csv.DictReader(csvfile, delimiter=",")
csv_data = list(reader)
# Read the original blendnet json config file
with open(args.orig_config_file, "r") as jsonfile:
original_json = json.load(jsonfile)
# Directory to save modified JSON files
modified_configs_dir = os.path.join(args.outdir, "modified_configs")
os.makedirs(modified_configs_dir, exist_ok=True)
# Modify and save JSON files for each row in CSV
config_paths = []
for idx, row in enumerate(csv_data):
output_path = os.path.join(modified_configs_dir, f"{idx}.json")
config_paths.append(output_path)
if args.skip_run:
continue
modified_json = OrderedDict(original_json) # Preserve original field order
# Apply modifications
modified_json["network_settings"]["regions"]["north america west"] = 0.06
modified_json["network_settings"]["regions"]["north america east"] = 0.15
modified_json["network_settings"]["regions"]["north america central"] = 0.02
modified_json["network_settings"]["regions"]["europe"] = 0.47
modified_json["network_settings"]["regions"]["northern europe"] = 0.10
modified_json["network_settings"]["regions"]["east asia"] = 0.10
modified_json["network_settings"]["regions"]["southeast asia"] = 0.07
modified_json["network_settings"]["regions"]["australia"] = 0.03
modified_json["step_time"] = f"{args.step_duration}ms"
modified_json["node_count"] = int(row["network_size"])
modified_json["wards"][0]["sum"] = 1000
modified_json["connected_peers_count"] = int(row["peering_degree"])
modified_json["data_message_lottery_interval"] = "20s"
modified_json["stake_proportion"] = 0.0
modified_json["persistent_transmission"]["max_emission_frequency"] = 1.0
modified_json["persistent_transmission"]["drop_message_probability"] = 0.0
modified_json["epoch_duration"] = (
f"{int(row['cover_slots_per_epoch']) * int(row['cover_slot_duration'])}s"
)
modified_json["slots_per_epoch"] = int(row["cover_slots_per_epoch"])
modified_json["slot_duration"] = f"{row['cover_slot_duration']}s"
modified_json["max_delay_seconds"] = int(row["max_temporal_delay"])
modified_json["number_of_hops"] = int(row["blend_hops"])
modified_json["number_of_blend_layers"] = int(row["blend_hops"])
# Save modified JSON
with open(output_path, "w") as outfile:
json.dump(modified_json, outfile, indent=2)
print("Saved modified JSON to:", output_path)
# Directory to save logs
log_dir = os.path.join(args.outdir, "logs")
os.makedirs(log_dir, exist_ok=True)
log_paths = []
for idx, config_path in enumerate(config_paths):
log_path = f"{log_dir}/{idx}.log"
log_paths.append(log_path)
if args.skip_run:
continue
with open(log_path, "w") as log_file:
print(
f"Running simulation-{idx}: {log_file.name} with config: {config_path}"
)
subprocess.run(
["../../target/release/blendnet-sims", "--input-settings", config_path],
stdout=log_file,
)
print(f"Simulation-{idx} completed: {log_file.name}")
print("Analyzing logs...")
print("=================")
with open(os.path.join(args.outdir, "output.csv"), "w", newline="") as file:
print(f"Writing results to: {file.name}")
csv_writer = csv.writer(file)
csv_writer.writerow(
[
"network_diameter",
"msg_count",
"min_latency_sec",
"avg_latency_sec",
"median_latency_sec",
"max_latency_sec",
"min_latency_msg_id",
"min_latency_msg_persistent_latency_sec",
"min_latency_msg_persistent_index",
"min_latency_msg_temporal_latency_sec",
"min_latency_msg_temporal_index",
"max_latency_msg_id",
"max_latency_msg_persistent_latency_sec",
"max_latency_msg_persistent_index",
"max_latency_msg_temporal_latency_sec",
"max_latency_msg_temporal_index",
"min_conn_latency_sec",
"avg_conn_latency_sec",
"med_conn_latency_sec",
"max_conn_latency_sec",
"min_bandwidth_kbps",
"avg_bandwidth_kbps",
"max_bandwidth_kbps",
]
)
for idx, log_path in enumerate(log_paths):
csv_row = []
csv_row.append(topology_result(log_path)["diameter"])
latency_analysis = latency.LatencyAnalysis.build(
mixlog.get_input_stream(log_path)
)
csv_row.append(latency_analysis.total_messages)
csv_row.append(float(latency_analysis.min_latency_ms) / 1000.0)
csv_row.append(float(latency_analysis.avg_latency_ms) / 1000.0)
csv_row.append(float(latency_analysis.median_latency_ms) / 1000.0)
csv_row.append(float(latency_analysis.max_latency_ms) / 1000.0)
csv_row.append(latency_analysis.min_latency_analysis.message_id)
csv_row.append(
",".join(
map(
str,
[
ms / 1000.0
for ms in latency_analysis.min_latency_analysis.persistent_latencies_ms
],
)
)
)
csv_row.append(
",".join(
map(str, latency_analysis.min_latency_analysis.persistent_indices)
)
)
csv_row.append(
",".join(
map(
str,
[
ms / 1000.0
for ms in latency_analysis.min_latency_analysis.temporal_latencies_ms
],
)
)
)
csv_row.append(
",".join(
map(str, latency_analysis.min_latency_analysis.temporal_indices)
)
)
csv_row.append(latency_analysis.max_latency_analysis.message_id)
csv_row.append(
",".join(
map(
str,
[
ms / 1000.0
for ms in latency_analysis.max_latency_analysis.persistent_latencies_ms
],
)
)
)
csv_row.append(
",".join(
map(str, latency_analysis.max_latency_analysis.persistent_indices)
)
)
csv_row.append(
",".join(
map(
str,
[
ms / 1000.0
for ms in latency_analysis.max_latency_analysis.temporal_latencies_ms
],
)
)
)
csv_row.append(
",".join(
map(str, latency_analysis.max_latency_analysis.temporal_indices)
)
)
csv_row.append(
float(latency_analysis.conn_latency_analysis.min_ms) / 1000.0
)
csv_row.append(
float(latency_analysis.conn_latency_analysis.avg_ms) / 1000.0
)
csv_row.append(
float(latency_analysis.conn_latency_analysis.med_ms) / 1000.0
)
csv_row.append(
float(latency_analysis.conn_latency_analysis.max_ms) / 1000.0
)
bandwidth_res = bandwidth_result(log_path, args.step_duration)
csv_row.append(bandwidth_res["min"] * 8 / 1000.0)
csv_row.append(bandwidth_res["avg"] * 8 / 1000.0)
csv_row.append(bandwidth_res["max"] * 8 / 1000.0)
csv_writer.writerow(csv_row)
print(f"The outputs have been successfully written to {file.name}")

View File

@ -1,16 +1,14 @@
import argparse
import json
from collections.abc import Iterable
from typing import Any
import matplotlib
import matplotlib.pyplot as plt
+import mixlog
import pandas as pd
-import mixlog


-def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None:
+def plot_emissions(input_stream: Iterable[tuple[str, dict]], plot_path: str) -> None:
df = pd.DataFrame(emission_records(input_stream))
plt.figure(figsize=(12, 6))
@ -24,19 +22,8 @@ def plot_emissions(input_stream: Iterable[str], plot_path: str) -> None:
plt.show()
-def emission_records(input_stream: Iterable[str]) -> list[Any]:
-    records = []
-    for line in input_stream:
-        try:
-            record = json.loads(line)
-        except json.JSONDecodeError:
-            continue
-        if "emission_type" in record:
-            records.append(record)
-    return records
+def emission_records(input_stream: Iterable[tuple[str, dict]]) -> list[Any]:
+    return [record for _, record in filter(lambda x: x[0] == "Emission", input_stream)]
if __name__ == "__main__":

View File

@ -1,88 +1,113 @@
#!/usr/bin/env python
import argparse
import json
-import statistics
-from collections.abc import Iterable
-from typing import Dict, Optional
+from dataclasses import asdict, dataclass
+from typing import Iterable

import mixlog
+import pandas as pd
-class Message:
-    def __init__(self, message_id: str, step_a: Optional[int]):
-        self.id = message_id
-        self.step_a = int(step_a) if step_a is not None else None
-        self.step_b = None
-
-    def __hash__(self):
-        return self.id
-
-    def __repr__(self):
-        return f"[{self.id}] {self.step_a} -> {self.step_b}"
-
-    @property
-    def latency(self) -> Optional[int]:
-        if self.step_a is not None and self.step_b is not None:
-            return abs(self.step_a - self.step_b)
+@dataclass
+class LatencyAnalysis:
+    total_messages: int
+    min_latency_ms: int
+    min_latency_analysis: "MessageLatencyAnalysis"
+    max_latency_ms: int
+    max_latency_analysis: "MessageLatencyAnalysis"
+    avg_latency_ms: int
+    median_latency_ms: int
+    conn_latency_analysis: "ConnectionLatencyAnalysis"
+
+    @classmethod
+    def build(cls, input_stream: Iterable[tuple[str, dict]]) -> "LatencyAnalysis":
+        latencies = []
+        message_ids = []
+        messages: dict[str, dict] = {}
+        for topic, record in input_stream:
+            if topic != "MessageFullyUnwrapped":
+                continue
+            latencies.append(record["total_duration"])
+            message_id = record["message"]["payload_id"]
+            message_ids.append(message_id)
+            messages[message_id] = record
+
+        latencies = pd.Series(latencies)
+        message_ids = pd.Series(message_ids)
+
+        return cls(
+            total_messages=int(latencies.count()),
+            min_latency_ms=int(latencies.min()),
+            min_latency_analysis=MessageLatencyAnalysis.build(
+                messages[str(message_ids[latencies.idxmin()])]
+            ),
+            max_latency_ms=int(latencies.max()),
+            max_latency_analysis=MessageLatencyAnalysis.build(
+                messages[str(message_ids[latencies.idxmax()])]
+            ),
+            avg_latency_ms=int(latencies.mean()),
+            median_latency_ms=int(latencies.median()),
+            conn_latency_analysis=ConnectionLatencyAnalysis.build(messages),
+        )
-MessageStorage = Dict[str, Message]
+@dataclass
+class MessageLatencyAnalysis:
+    message_id: str
+    persistent_latencies_ms: list[int]
+    persistent_indices: list[int]
+    temporal_latencies_ms: list[int]
+    temporal_indices: list[int]
+
+    @classmethod
+    def build(
+        cls,
+        message: dict,
+    ) -> "MessageLatencyAnalysis":
+        analysis = cls(message["message"]["payload_id"], [], [], [], [])
+        for event in message["history"]:
+            event_type = event["event_type"]
+            if "PersistentTransmissionScheduled" in event_type:
+                analysis.persistent_indices.append(
+                    int(event_type["PersistentTransmissionScheduled"]["index"])
+                )
+            elif event_type == "PersistentTransmissionReleased":
+                analysis.persistent_latencies_ms.append(event["duration_from_prev"])
+            elif "TemporalProcessorScheduled" in event_type:
+                analysis.temporal_indices.append(
+                    int(event_type["TemporalProcessorScheduled"]["index"])
+                )
+            elif event_type == "TemporalProcessorReleased":
+                analysis.temporal_latencies_ms.append(event["duration_from_prev"])
+        return analysis
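MessageLatencyAnalysis.build consumes the per-message history emitted by the Rust side: serde serializes unit variants (e.g. PersistentTransmissionReleased) as plain strings and struct variants as single-key objects, which is why the code mixes `in` checks with `==` checks. A minimal sketch, runnable next to the class above; the field values are made up:

# A made-up "MessageFullyUnwrapped" record in the shape build() consumes.
message = {
    "message": {"payload_id": "abc", "step_id": 42, "node_id": 0},
    "history": [
        {"event_type": "Created", "duration_from_prev": 0},
        {"event_type": {"PersistentTransmissionScheduled": {"index": 0}},
         "duration_from_prev": 0},
        {"event_type": "PersistentTransmissionReleased", "duration_from_prev": 120},
    ],
}
analysis = MessageLatencyAnalysis.build(message)
assert analysis.persistent_indices == [0]
assert analysis.persistent_latencies_ms == [120]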
-def compute_results(message_storage: MessageStorage, step_duration: int) -> str:
-    latencies = [message_record.latency for message_record in message_storage.values()]
-    valued_latencies = [latency for latency in latencies if latency is not None]
-    incomplete_latencies = sum((1 for latency in latencies if latency is None))
-
-    total_messages = len(latencies)
-    total_messages_full_latency = len(valued_latencies)
-    total_messages_incomplete_latency = incomplete_latencies
-    latency_average_steps = statistics.mean(valued_latencies)
-    latency_average_ms = "{:.2f}ms".format(latency_average_steps * step_duration)
-    latency_median_steps = statistics.median(valued_latencies)
-    latency_median_ms = "{:.2f}ms".format(latency_median_steps * step_duration)
-    max_latency_steps = max(valued_latencies)
-    max_latency_ms = "{:.2f}ms".format(max_latency_steps * step_duration)
-    min_latency_steps = min(valued_latencies)
-    min_latency_ms = "{:.2f}ms".format(min_latency_steps * step_duration)
-    return f"""[Results]
-- Total messages: {total_messages}
-- Full latencies: {total_messages_full_latency}
-- Incomplete latencies: {total_messages_incomplete_latency}
-- Averages
-  - Steps: {latency_average_steps}
-  - Duration: {latency_average_ms}
-- Median
-  - Steps: {latency_median_steps}
-  - Duration: {latency_median_ms}
-- Max
-  - Steps: {max_latency_steps}
-  - Duration: {max_latency_ms}
-- Min
-  - Steps: {min_latency_steps}
-  - Duration: {min_latency_ms}"""
-
-def parse_record_stream(record_stream: Iterable[str]) -> MessageStorage:
-    storage: MessageStorage = {}
-    for record in record_stream:
-        try:
-            json_record = json.loads(record)
-        except json.decoder.JSONDecodeError:
-            continue
-        if (payload_id := json_record.get("payload_id")) is None:
-            continue
-        step_id = json_record["step_id"]
-        if (stored_message := storage.get(payload_id)) is None:
-            storage[payload_id] = Message(payload_id, step_id)
-        else:
-            stored_message.step_b = step_id
-    return storage
+@dataclass
+class ConnectionLatencyAnalysis:
+    min_ms: int
+    avg_ms: int
+    med_ms: int
+    max_ms: int
+
+    @classmethod
+    def build(
+        cls,
+        messages: dict[str, dict],
+    ) -> "ConnectionLatencyAnalysis":
+        latencies = []
+        for message in messages.values():
+            for event in message["history"]:
+                if "NetworkReceived" in event["event_type"]:
+                    latencies.append(event["duration_from_prev"])
+        latencies = pd.Series(latencies)
+        return cls(
+            int(latencies.min()),
+            int(latencies.mean()),
+            int(latencies.median()),
+            int(latencies.max()),
+        )
def build_argument_parser() -> argparse.ArgumentParser:
@ -105,8 +130,5 @@ if __name__ == "__main__":
argument_parser = build_argument_parser()
arguments = argument_parser.parse_args()
-    input_stream = mixlog.get_input_stream(arguments.input_file)
-    messages = parse_record_stream(input_stream)
-    results = compute_results(messages, arguments.step_duration)
-    print(results)
+    analysis = LatencyAnalysis.build(mixlog.get_input_stream(arguments.input_file))
+    print(json.dumps(asdict(analysis), indent=2))

View File

@ -1,12 +1,20 @@
import json
import sys
from collections.abc import Iterable
from typing import Optional
TOPIC_LOG_INDICATOR = '{"topic":'
-def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[str]:
-    for record in record_stream:
-        bracket_pos = record.rfind("{")
-        yield record[bracket_pos:]
+def line_to_json_stream(record_stream: Iterable[str]) -> Iterable[tuple[str, dict]]:
+    for record in record_stream:
+        topic_idx = record.find(TOPIC_LOG_INDICATOR)
+        if topic_idx == -1:
+            continue
+        # Split the line into 2 parts: topic and JSON message
+        log = json.loads(record[topic_idx:].strip())
+        yield (log["topic"], log["message"])
def get_pipe_stream() -> Iterable[str]:
@ -18,7 +26,7 @@ def get_file_stream(input_filename) -> Iterable[str]:
yield from file
-def get_input_stream(input_filename: Optional[str]) -> Iterable[str]:
+def get_input_stream(input_filename: Optional[str]) -> Iterable[tuple[str, dict]]:
stream = (
get_file_stream(input_filename)
if input_filename is not None

View File

@ -10,6 +10,7 @@ use crate::node::blend::{BlendMessage, BlendnodeSettings};
use anyhow::Ok;
use clap::Parser;
use crossbeam::channel;
use multiaddr::Multiaddr;
use netrunner::network::behaviour::create_behaviours;
use netrunner::network::regions::{create_regions, RegionsData};
use netrunner::network::{InMemoryNetworkInterface, Network, PayloadSize};
@ -85,13 +86,26 @@ impl SimulationApp {
&mut rng,
&settings.simulation_settings.network_settings,
);
log!(
"Regions",
regions
.iter()
.map(|(region, node_ids)| (region, node_ids.len()))
.collect::<HashMap<_, _>>()
);
log!("NumRegions", regions.len());
let behaviours = create_behaviours(&settings.simulation_settings.network_settings);
let regions_data = RegionsData::new(regions, behaviours);
-        let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(regions_data, seed)));
+        let network = Arc::new(Mutex::new(Network::<BlendMessage>::new(
+            regions_data.clone(),
+            seed,
+        )));
let topology = Topology::new(&node_ids, settings.connected_peers_count, &mut rng);
log_topology(&topology);
log_conn_latency_distribution(&topology.conn_latency_distribution(&regions_data));
let nodes: Vec<_> = node_ids
.iter()
@ -126,7 +140,14 @@ impl SimulationApp {
slots_per_epoch: settings.slots_per_epoch,
network_size: node_ids.len(),
},
-                        membership: node_ids.iter().map(|&id| id.into()).collect(),
+                        membership: node_ids
+                            .iter()
+                            .map(|&id| nomos_blend::membership::Node {
+                                id,
+                                address: Multiaddr::empty(),
+                                public_key: id.into(),
+                            })
+                            .collect(),
},
)
})
@ -176,6 +197,7 @@ fn create_boxed_blendnode(
Box::new(BlendNode::new(
node_id,
blendnode_settings,
simulation_settings.step_time,
network_interface,
))
}
@ -246,11 +268,13 @@ fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
}
fn log_topology(topology: &Topology) {
-    let log = TopologyLog {
-        topology: topology.to_node_indices(),
-        diameter: topology.diameter(),
-    };
-    tracing::info!("Topology: {}", serde_json::to_string(&log).unwrap());
+    log!(
+        "Topology",
+        TopologyLog {
+            topology: topology.to_node_indices(),
+            diameter: topology.diameter(),
+        }
+    );
}
#[derive(Debug, Serialize, Deserialize)]
@ -259,6 +283,25 @@ struct TopologyLog {
diameter: usize,
}
fn log_conn_latency_distribution(distribution: &HashMap<Duration, usize>) {
log!(
"ConnLatencyDistribution",
ConnLatencyDistributionLog {
num_links: distribution.values().sum(),
distribution: distribution
.iter()
.map(|(latency, count)| (latency.as_millis(), *count))
.collect::<HashMap<_, _>>(),
}
);
}
#[derive(Debug, Serialize, Deserialize)]
struct ConnLatencyDistributionLog {
num_links: usize,
distribution: HashMap<u128, usize>,
}
fn main() -> anyhow::Result<()> {
let app: SimulationApp = SimulationApp::parse();
let maybe_guard = log::config_tracing(app.log_format, &app.log_to, app.with_metrics);

View File

@ -0,0 +1,21 @@
use serde::Serialize;
#[macro_export]
macro_rules! log {
($topic:expr, $msg:expr) => {
tracing::info!(
"{}",
serde_json::to_string(&$crate::node::blend::log::TopicLog {
topic: $topic.to_string(),
message: $msg
})
.unwrap()
);
};
}
#[derive(Serialize)]
pub struct TopicLog<M: Serialize> {
pub topic: String,
pub message: M,
}

View File

@ -1,3 +1,8 @@
use std::{ops::Mul, time::Duration};
use netrunner::node::serialize_node_id_as_index;
use netrunner::node::NodeId;
use serde::Serialize;
use uuid::Uuid;
pub type PayloadId = String;
@ -23,6 +28,79 @@ impl Payload {
}
}
#[derive(Debug, Clone, Serialize)]
pub struct MessageHistory(Vec<MessageEvent>);
impl MessageHistory {
pub fn new() -> Self {
Self(Vec::new())
}
pub fn add(
&mut self,
node_id: NodeId,
step_id: usize,
step_time: Duration,
event_type: MessageEventType,
) {
let duration_from_prev = self.0.last().map_or(Duration::ZERO, |prev_event| {
step_time.mul((step_id - prev_event.step_id).try_into().unwrap())
});
self.0.push(MessageEvent {
node_id,
step_id,
duration_from_prev,
event_type,
});
}
pub fn last_event_type(&self) -> Option<&MessageEventType> {
self.0.last().map(|event| &event.event_type)
}
pub fn total_duration(&self) -> Duration {
self.0.iter().map(|event| event.duration_from_prev).sum()
}
}
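duration_from_prev is derived purely from step counts: step_time multiplied by the step gap to the previous event, with the first event pinned at zero; total_duration is just the sum of the gaps. The same bookkeeping in a short Python sketch (names are illustrative):

from dataclasses import dataclass, field

@dataclass
class HistorySketch:
    """Python sketch of the Rust MessageHistory bookkeeping above."""
    step_time_ms: int
    events: list = field(default_factory=list)  # (step_id, duration_from_prev_ms)

    def add(self, step_id: int) -> None:
        prev = self.events[-1][0] if self.events else step_id
        self.events.append((step_id, self.step_time_ms * (step_id - prev)))

    def total_duration_ms(self) -> int:
        return sum(gap for _, gap in self.events)

h = HistorySketch(step_time_ms=10)
h.add(0); h.add(3); h.add(7)  # gaps: 0ms, 30ms, 40ms
assert h.total_duration_ms() == 70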
#[derive(Debug, Clone, Serialize)]
struct MessageEvent {
#[serde(serialize_with = "serialize_node_id_as_index")]
node_id: NodeId,
step_id: usize,
#[serde(serialize_with = "duration_as_millis")]
duration_from_prev: Duration,
event_type: MessageEventType,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum MessageEventType {
Created,
PersistentTransmissionScheduled {
index: usize,
},
PersistentTransmissionReleased,
TemporalProcessorScheduled {
index: usize,
},
TemporalProcessorReleased,
NetworkSent {
#[serde(serialize_with = "serialize_node_id_as_index")]
to: NodeId,
},
NetworkReceived {
#[serde(serialize_with = "serialize_node_id_as_index")]
from: NodeId,
},
}
pub fn duration_as_millis<S>(duration: &Duration, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_u64(duration.as_millis().try_into().unwrap())
}
#[cfg(test)]
mod tests {
use super::Payload;

View File

@ -1,4 +1,6 @@
pub mod consensus_streams;
#[macro_use]
pub mod log;
pub mod lottery;
mod message;
pub mod scheduler;
@ -11,20 +13,18 @@ use cached::{Cached, TimedCache};
use crossbeam::channel;
use futures::Stream;
use lottery::StakeLottery;
-use message::{Payload, PayloadId};
-use multiaddr::Multiaddr;
+use message::{duration_as_millis, MessageEventType, MessageHistory, Payload, PayloadId};
use netrunner::network::NetworkMessage;
use netrunner::node::{Node, NodeId, NodeIdExt};
use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
warding::WardCondition,
};
use nomos_blend::message_blend::temporal::{TemporalProcessorExt, TemporalStream};
use nomos_blend::{
cover_traffic::{CoverTraffic, CoverTrafficSettings},
membership::Membership,
-    message_blend::{
-        crypto::CryptographicProcessor, MessageBlendExt, MessageBlendSettings, MessageBlendStream,
-    },
+    message_blend::{crypto::CryptographicProcessor, MessageBlendSettings},
persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
},
@ -33,22 +33,53 @@ use nomos_blend::{
use nomos_blend_message::mock::MockBlendMessage;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
-use scheduler::{Interval, TemporalRelease};
+use scheduler::{Interval, TemporalScheduler};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use state::BlendnodeState;
use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream;
-#[derive(Debug, Clone)]
-pub struct BlendMessage(Vec<u8>);
+#[derive(Debug, Clone, Serialize)]
+pub struct BlendMessage {
+    message: Vec<u8>,
+    history: MessageHistory,
+}
impl BlendMessage {
fn new(message: Vec<u8>, node_id: NodeId, step_id: usize, step_time: Duration) -> Self {
let mut history = MessageHistory::new();
history.add(node_id, step_id, step_time, MessageEventType::Created);
Self { message, history }
}
fn new_drop() -> Self {
Self {
message: Vec::new(),
history: MessageHistory::new(),
}
}
fn is_drop(&self) -> bool {
self.message.is_empty()
}
}
impl PayloadSize for BlendMessage {
fn size_bytes(&self) -> u32 {
-        2208
+        // payload: 32 KiB
+        // header encryption overhead: 133 bytes = 48 + 17 * max_blend_hops(=5)
+        // payload encryption overhead: 16 bytes
+        // economic data overhead: 8043 bytes
+        40960
}
}
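The hard-coded size is just the sum of the components in the comment; a quick arithmetic check:

# The components listed in the comment above sum to the hard-coded size.
payload = 32 * 1024                # 32 KiB
header_overhead = 48 + 17 * 5      # 133 bytes, with max_blend_hops = 5
payload_encryption_overhead = 16
economic_data_overhead = 8043
assert payload + header_overhead + payload_encryption_overhead + economic_data_overhead == 40960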
struct BlendOutgoingMessageWithHistory {
outgoing_message: BlendOutgoingMessage,
history: MessageHistory,
}
#[derive(Deserialize)]
pub struct BlendnodeSettings {
pub connected_peers: Vec<NodeId>,
@ -60,7 +91,12 @@ pub struct BlendnodeSettings {
pub persistent_transmission: PersistentTransmissionSettings,
pub message_blend: MessageBlendSettings<MockBlendMessage>,
pub cover_traffic_settings: CoverTrafficSettings,
-    pub membership: Vec<<MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey>,
+    pub membership: Vec<
+        nomos_blend::membership::Node<
+            NodeId,
+            <MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey,
+        >,
+    >,
}
type Sha256Hash = [u8; 32];
@ -70,6 +106,7 @@ pub struct BlendNode {
id: NodeId,
state: BlendnodeState,
settings: BlendnodeSettings,
step_time: Duration,
network_interface: InMemoryNetworkInterface<BlendMessage>,
message_cache: TimedCache<Sha256Hash, ()>,
@ -77,23 +114,17 @@ pub struct BlendNode {
data_msg_lottery_interval: Interval,
data_msg_lottery: StakeLottery<ChaCha12Rng>,
-    persistent_sender: channel::Sender<Vec<u8>>,
+    persistent_sender: channel::Sender<BlendMessage>,
persistent_update_time_sender: channel::Sender<Duration>,
-    persistent_transmission_messages: PersistentTransmissionStream<
-        CrossbeamReceiverStream<Vec<u8>>,
-        ChaCha12Rng,
-        MockBlendMessage,
-        Interval,
-    >,
-    crypto_processor: CryptographicProcessor<ChaCha12Rng, MockBlendMessage>,
-    blend_sender: channel::Sender<Vec<u8>>,
-    blend_update_time_sender: channel::Sender<Duration>,
-    blend_messages: MessageBlendStream<
-        CrossbeamReceiverStream<Vec<u8>>,
-        ChaCha12Rng,
-        MockBlendMessage,
-        TemporalRelease,
-    >,
+    persistent_transmission_messages:
+        PersistentTransmissionStream<CrossbeamReceiverStream<BlendMessage>, ChaCha12Rng, Interval>,
+    crypto_processor: CryptographicProcessor<NodeId, ChaCha12Rng, MockBlendMessage>,
+    temporal_sender: channel::Sender<BlendOutgoingMessageWithHistory>,
+    temporal_update_time_sender: channel::Sender<Duration>,
+    temporal_processor_messages:
+        TemporalStream<CrossbeamReceiverStream<BlendOutgoingMessageWithHistory>, TemporalScheduler>,
epoch_update_sender: channel::Sender<Duration>,
slot_update_sender: channel::Sender<Duration>,
cover_traffic: CoverTraffic<Epoch, Slot, MockBlendMessage>,
@ -103,6 +134,7 @@ impl BlendNode {
pub fn new(
id: NodeId,
settings: BlendnodeSettings,
step_time: Duration,
network_interface: InMemoryNetworkInterface<BlendMessage>,
) -> Self {
let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed);
@ -120,7 +152,7 @@ impl BlendNode {
);
// Init Tier-1: Persistent transmission
-        let (persistent_sender, persistent_receiver) = channel::unbounded();
+        let (persistent_sender, persistent_receiver) = channel::unbounded::<BlendMessage>();
let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded();
let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver)
.persistent_transmission(
@ -132,43 +164,28 @@ impl BlendNode {
),
persistent_update_time_receiver,
),
BlendMessage::new_drop(),
);
-        // Init Tier-2: message blend
-        let (blend_sender, blend_receiver) = channel::unbounded();
-        let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded();
-        let nodes: Vec<
-            nomos_blend::membership::Node<
-                <MockBlendMessage as nomos_blend_message::BlendMessage>::PublicKey,
-            >,
-        > = settings
-            .membership
-            .iter()
-            .map(|&public_key| nomos_blend::membership::Node {
-                address: Multiaddr::empty(),
-                public_key,
-            })
-            .collect();
-        let membership = Membership::<MockBlendMessage>::new(nodes, id.into());
+        // Init Tier-2: message blend: CryptographicProcessor and TemporalProcessor
+        let membership =
+            Membership::<NodeId, MockBlendMessage>::new(settings.membership.clone(), id.into());
let crypto_processor = CryptographicProcessor::new(
settings.message_blend.cryptographic_processor.clone(),
membership.clone(),
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
);
-        let temporal_release = TemporalRelease::new(
-            ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
-            blend_update_time_receiver,
-            (
-                1,
-                settings.message_blend.temporal_processor.max_delay_seconds,
-            ),
-        );
-        let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend(
-            settings.message_blend.clone(),
-            membership,
-            temporal_release,
-            ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
-        );
+        let (temporal_sender, temporal_receiver) = channel::unbounded();
+        let (temporal_update_time_sender, temporal_update_time_receiver) = channel::unbounded();
+        let temporal_processor_messages = CrossbeamReceiverStream::new(temporal_receiver)
+            .temporal_stream(TemporalScheduler::new(
+                ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
+                temporal_update_time_receiver,
+                (
+                    0,
+                    settings.message_blend.temporal_processor.max_delay_seconds,
+                ),
+            ));
        // Init Tier-3: cover traffic
let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded();
@ -185,6 +202,7 @@ impl BlendNode {
Self {
id,
step_time,
network_interface,
            // We're not coupling this lifespan with the steps for now, but that's okay:
            // we expect a message to be delivered to most nodes within 60s.
@ -194,6 +212,8 @@ impl BlendNode {
node_id: id,
step_id: 0,
num_messages_fully_unwrapped: 0,
cur_num_persistent_scheduled: 0,
cur_num_temporal_scheduled: 0,
},
data_msg_lottery_update_time_sender,
data_msg_lottery_interval,
@ -202,37 +222,28 @@ impl BlendNode {
persistent_update_time_sender,
persistent_transmission_messages,
crypto_processor,
-            blend_sender,
-            blend_update_time_sender,
-            blend_messages,
+            temporal_sender,
+            temporal_update_time_sender,
+            temporal_processor_messages,
epoch_update_sender,
slot_update_sender,
cover_traffic,
}
}
-    fn forward(
-        &mut self,
-        message: BlendMessage,
-        exclude_node: Option<NodeId>,
-        log: Option<EmissionLog>,
-    ) {
-        for (i, node_id) in self
-            .settings
-            .connected_peers
-            .iter()
-            .filter(|&id| Some(*id) != exclude_node)
-            .enumerate()
-        {
-            if i == 0 {
-                if let Some(log) = &log {
-                    Self::log_emission(log);
-                }
-            }
-            self.network_interface
-                .send_message(*node_id, message.clone())
-        }
-        self.message_cache.cache_set(Self::sha256(&message.0), ());
-    }
+    fn forward(&mut self, message: BlendMessage, exclude_node: Option<NodeId>) {
+        for node_id in self
+            .settings
+            .connected_peers
+            .iter()
+            .filter(|&id| Some(*id) != exclude_node)
+        {
+            let mut message = message.clone();
+            self.record_network_sent_event(&mut message.history, *node_id);
+            self.network_interface.send_message(*node_id, message)
+        }
+        self.message_cache
+            .cache_set(Self::sha256(&message.message), ());
+    }
fn receive(&mut self) -> Vec<NetworkMessage<BlendMessage>> {
@ -242,7 +253,7 @@ impl BlendNode {
// Retain only messages that have not been seen before
.filter(|msg| {
                self.message_cache
-                    .cache_set(Self::sha256(&msg.payload().0), ())
+                    .cache_set(Self::sha256(&msg.payload().message), ())
.is_none()
})
.collect()
@ -254,43 +265,116 @@ impl BlendNode {
hasher.finalize().into()
}
fn schedule_persistent_transmission(&mut self, mut message: BlendMessage) {
self.record_persistent_scheduled_event(&mut message.history);
self.persistent_sender.send(message).unwrap();
self.state.cur_num_persistent_scheduled += 1;
}
fn handle_incoming_message(&mut self, message: BlendMessage) {
match self.crypto_processor.unwrap_message(&message.message) {
Ok((unwrapped_message, fully_unwrapped)) => {
let temporal_message = if fully_unwrapped {
BlendOutgoingMessage::FullyUnwrapped(unwrapped_message)
} else {
BlendOutgoingMessage::Outbound(unwrapped_message)
};
self.schedule_temporal_processor(BlendOutgoingMessageWithHistory {
outgoing_message: temporal_message,
history: message.history,
});
}
Err(e) => {
tracing::debug!("Failed to unwrap message: {:?}", e);
}
}
}
fn schedule_temporal_processor(&mut self, mut message: BlendOutgoingMessageWithHistory) {
self.record_temporal_scheduled_event(&mut message.history);
self.temporal_sender.send(message).unwrap();
self.state.cur_num_temporal_scheduled += 1;
}
fn update_time(&mut self, elapsed: Duration) {
self.data_msg_lottery_update_time_sender
.send(elapsed)
.unwrap();
self.persistent_update_time_sender.send(elapsed).unwrap();
-        self.blend_update_time_sender.send(elapsed).unwrap();
+        self.temporal_update_time_sender.send(elapsed).unwrap();
self.epoch_update_sender.send(elapsed).unwrap();
self.slot_update_sender.send(elapsed).unwrap();
}
-    fn log_message_generated(&self, msg_type: &str, payload: &Payload) {
-        self.log_message(format!("{}MessageGenerated", msg_type).as_str(), payload);
-    }
-
-    fn log_message_fully_unwrapped(&self, payload: &Payload) {
-        self.log_message("MessageFullyUnwrapped", payload);
-    }
+    fn log_message_fully_unwrapped(&self, payload: &Payload, history: MessageHistory) {
+        let total_duration = history.total_duration();
+        log!(
+            "MessageFullyUnwrapped",
+            MessageWithHistoryLog {
+                message: MessageLog {
+                    payload_id: payload.id(),
+                    step_id: self.state.step_id,
+                    node_id: self.id.index(),
+                },
+                history,
+                total_duration,
+            }
+        );
+    }
+
+    fn new_blend_message(&self, message: Vec<u8>) -> BlendMessage {
+        BlendMessage::new(message, self.id, self.state.step_id, self.step_time)
+    }
-    fn log_message(&self, tag: &str, payload: &Payload) {
-        let log = MessageLog {
-            payload_id: payload.id(),
-            step_id: self.state.step_id,
-            node_id: self.id.index(),
-        };
-        tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap());
-    }
-
-    fn log_emission(log: &EmissionLog) {
-        tracing::info!("Emission: {}", serde_json::to_string(log).unwrap());
-    }
-
-    fn new_emission_log(&self, emission_type: &str) -> EmissionLog {
-        EmissionLog {
-            emission_type: emission_type.to_string(),
-            step_id: self.state.step_id,
-            node_id: self.id.index(),
-        }
-    }
+    fn record_network_sent_event(&self, history: &mut MessageHistory, to: NodeId) {
+        self.record_message_event(history, MessageEventType::NetworkSent { to });
+    }
+
+    fn record_network_received_event(&self, history: &mut MessageHistory, from: NodeId) {
+        assert_eq!(
+            history.last_event_type(),
+            Some(&MessageEventType::NetworkSent { to: self.id })
+        );
+        self.record_message_event(history, MessageEventType::NetworkReceived { from });
+    }
fn record_persistent_scheduled_event(&self, history: &mut MessageHistory) {
self.record_message_event(
history,
MessageEventType::PersistentTransmissionScheduled {
index: self.state.cur_num_persistent_scheduled,
},
);
}
fn record_persistent_released_event(&self, history: &mut MessageHistory) {
assert!(matches!(
history.last_event_type(),
Some(MessageEventType::PersistentTransmissionScheduled { .. })
));
self.record_message_event(history, MessageEventType::PersistentTransmissionReleased);
}
fn record_temporal_scheduled_event(&self, history: &mut MessageHistory) {
self.record_message_event(
history,
MessageEventType::TemporalProcessorScheduled {
index: self.state.cur_num_temporal_scheduled,
},
);
}
fn record_temporal_released_event(&self, history: &mut MessageHistory) {
assert!(matches!(
history.last_event_type(),
Some(MessageEventType::TemporalProcessorScheduled { .. })
));
self.record_message_event(history, MessageEventType::TemporalProcessorReleased);
}
fn record_message_event(&self, history: &mut MessageHistory, event_type: MessageEventType) {
history.add(self.id, self.state.step_id, self.step_time, event_type);
}
}
@ -316,38 +400,51 @@ impl Node for BlendNode {
if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) {
if self.data_msg_lottery.run() {
let payload = Payload::new();
self.log_message_generated("Data", &payload);
let message = self
.crypto_processor
.wrap_message(payload.as_bytes())
.unwrap();
self.persistent_sender.send(message).unwrap();
self.schedule_persistent_transmission(self.new_blend_message(message));
}
}
// Handle incoming messages
-        for network_message in self.receive() {
-            self.forward(
-                network_message.payload().clone(),
-                Some(network_message.from),
-                None,
-            );
-            self.blend_sender
-                .send(network_message.into_payload().0)
-                .unwrap();
+        for mut network_message in self.receive() {
+            self.record_network_received_event(
+                &mut network_message.payload.history,
+                network_message.from,
+            );
+            if network_message.payload().is_drop() {
+                continue;
+            }
+            self.forward(
+                network_message.payload().clone(),
+                Some(network_message.from),
+            );
+            self.handle_incoming_message(network_message.into_payload());
}
-        // Proceed message blend
-        if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) {
-            match msg {
-                BlendOutgoingMessage::Outbound(msg) => {
-                    self.persistent_sender.send(msg).unwrap();
+        // Proceed temporal processor
+        if let Poll::Ready(Some(mut outgoing_msg_with_history)) =
+            pin!(&mut self.temporal_processor_messages).poll_next(&mut cx)
+        {
+            self.record_temporal_released_event(&mut outgoing_msg_with_history.history);
+            self.state.cur_num_temporal_scheduled -= 1;
+            // Proceed the message
+            match outgoing_msg_with_history.outgoing_message {
+                BlendOutgoingMessage::Outbound(message) => {
+                    self.schedule_persistent_transmission(BlendMessage {
+                        message,
+                        history: outgoing_msg_with_history.history,
+                    });
}
BlendOutgoingMessage::FullyUnwrapped(payload) => {
let payload = Payload::load(payload);
-                    self.log_message_fully_unwrapped(&payload);
+                    self.log_message_fully_unwrapped(&payload, outgoing_msg_with_history.history);
self.state.num_messages_fully_unwrapped += 1;
-                    //TODO: create a tracing event
}
}
}
@ -355,23 +452,20 @@ impl Node for BlendNode {
// Generate a cover message probabilistically
if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) {
let payload = Payload::new();
self.log_message_generated("Cover", &payload);
let message = self
.crypto_processor
.wrap_message(payload.as_bytes())
.unwrap();
self.persistent_sender.send(message).unwrap();
self.schedule_persistent_transmission(self.new_blend_message(message));
}
// Proceed persistent transmission
-        if let Poll::Ready(Some(msg)) =
-            pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
-        {
-            self.forward(
-                BlendMessage(msg),
-                None,
-                Some(self.new_emission_log("FromPersistent")),
-            );
+        if let Poll::Ready(Some(mut msg)) =
+            pin!(&mut self.persistent_transmission_messages).poll_next(&mut cx)
+        {
+            self.record_persistent_released_event(&mut msg.history);
+            self.state.cur_num_persistent_scheduled -= 1;
+            self.forward(msg, None);
}
self.state.step_id += 1;
@ -388,16 +482,17 @@ impl Node for BlendNode {
}
}
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize)]
struct MessageLog {
    payload_id: PayloadId,
    step_id: usize,
    node_id: usize,
}

-#[derive(Debug, Serialize, Deserialize)]
-struct EmissionLog {
-    emission_type: String,
-    step_id: usize,
-    node_id: usize,
-}
+#[derive(Debug, Serialize)]
+struct MessageWithHistoryLog {
+    message: MessageLog,
+    history: MessageHistory,
+    #[serde(serialize_with = "duration_as_millis")]
+    total_duration: Duration,
}

View File

@ -44,14 +44,14 @@ impl Stream for Interval {
}
}
-pub struct TemporalRelease {
+pub struct TemporalScheduler {
random_sleeps: Box<dyn Iterator<Item = Duration> + Send + Sync + 'static>,
elapsed: Duration,
current_sleep: Duration,
update_time: channel::Receiver<Duration>,
}
-impl TemporalRelease {
+impl TemporalScheduler {
pub fn new<Rng: RngCore + Send + Sync + 'static>(
mut rng: Rng,
update_time: channel::Receiver<Duration>,
@ -80,7 +80,7 @@ impl TemporalRelease {
}
}
-impl Stream for TemporalRelease {
+impl Stream for TemporalScheduler {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -134,7 +134,7 @@ mod tests {
fn temporal_release_update() {
let (_tx, rx) = channel::unbounded();
let mut temporal_release =
-            TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
+            TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
assert!(!temporal_release.update(Duration::from_secs(0)));
assert!(!temporal_release.update(Duration::from_millis(999)));
@ -149,7 +149,7 @@ mod tests {
let (tx, rx) = channel::unbounded();
let mut temporal_release =
-            TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
+            TemporalScheduler::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
tx.send(Duration::from_secs(0)).unwrap();
assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending);
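The renamed TemporalScheduler draws a random sleep from a (min, max) seconds range and fires once the accumulated update_time durations pass it; elsewhere in this commit the lower bound passed in is changed from 1 to 0. A rough Python sketch of that polling contract, matching the test above; the reset-per-release behavior is an assumption:

import random

class TemporalSchedulerSketch:
    """Rough sketch: fire once accumulated elapsed time passes a random sleep."""
    def __init__(self, bounds=(0, 5)):
        self._bounds = bounds
        self._elapsed = 0.0
        self._current_sleep = random.uniform(*bounds)

    def update(self, elapsed: float) -> bool:
        self._elapsed += elapsed
        if self._elapsed >= self._current_sleep:
            # Assumed: reset for the next release, one fire per sleep.
            self._elapsed = 0.0
            self._current_sleep = random.uniform(*self._bounds)
            return True
        return False

s = TemporalSchedulerSketch(bounds=(1, 1))
assert not s.update(0.0) and not s.update(0.999) and s.update(0.001)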

View File

@ -15,6 +15,8 @@ pub struct BlendnodeState {
pub node_id: NodeId,
pub step_id: usize,
pub num_messages_fully_unwrapped: usize,
pub cur_num_persistent_scheduled: usize,
pub cur_num_temporal_scheduled: usize,
}
#[derive(Serialize)]

View File

@ -1,6 +1,12 @@
-use std::collections::{HashMap, HashSet};
-use netrunner::node::{NodeId, NodeIdExt};
+use std::{
+    collections::{HashMap, HashSet},
+    time::Duration,
+};
+use netrunner::{
+    network::regions::RegionsData,
+    node::{NodeId, NodeIdExt},
+};
use rand::{seq::SliceRandom, RngCore};
#[derive(Clone)]
@ -121,6 +127,28 @@ impl Topology {
hop_count
}
pub fn conn_latency_distribution(
&self,
regions_data: &RegionsData,
) -> HashMap<Duration, usize> {
// Initialize a distribution
let distribution = regions_data
.region_network_behaviour
.values()
.map(|behaviour| (behaviour.delay(), 0))
.collect();
// Populate the distribution
self.0.iter().fold(distribution, |mut acc, (node, peers)| {
let region_a = regions_data.node_region(*node);
peers.iter().for_each(|peer| {
let region_b = regions_data.node_region(*peer);
let behaviour = regions_data.network_behaviour_between_regions(region_a, region_b);
acc.entry(behaviour.delay()).and_modify(|count| *count += 1);
});
acc
})
}
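conn_latency_distribution buckets every directed link in the topology by the latency of the region pair it crosses, so a bidirectional link contributes twice to num_links. An equivalent sketch in Python (types simplified; the names are assumptions):

from collections import Counter

def conn_latency_distribution(topology, node_region, latency) -> Counter:
    """Count directed links per connection latency (Python analogue of the Rust above)."""
    dist = Counter()
    for node, peers in topology.items():
        for peer in peers:
            # Latency is looked up by the (region, region) pair of the link.
            dist[latency[(node_region[node], node_region[peer])]] += 1
    return dist

# Two nodes in different regions, linked both ways: both directions counted.
dist = conn_latency_distribution(
    {0: {1}, 1: {0}}, {0: "eu", 1: "neu"},
    {("eu", "neu"): 60, ("neu", "eu"): 60},
)
assert dist == Counter({60: 2})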
pub fn get(&self, node: &NodeId) -> Option<&HashSet<NodeId>> {
self.0.get(node)
}

View File

@ -618,16 +618,16 @@ mod tests {
let node_c = NodeId::from_index(2);
let regions = HashMap::from([
-            (Region::Asia, vec![node_a, node_b]),
+            (Region::EastAsia, vec![node_a, node_b]),
(Region::Europe, vec![node_c]),
]);
let behaviour = HashMap::from([
(
-                NetworkBehaviourKey::new(Region::Asia, Region::Asia),
+                NetworkBehaviourKey::new(Region::EastAsia, Region::EastAsia),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
),
(
-                NetworkBehaviourKey::new(Region::Asia, Region::Europe),
+                NetworkBehaviourKey::new(Region::EastAsia, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(500), 0.0),
),
(

View File

@ -10,9 +10,13 @@ use super::{NetworkBehaviourKey, NetworkSettings};
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Region {
NorthAmerica,
NorthAmericaWest,
NorthAmericaCentral,
NorthAmericaEast,
Europe,
Asia,
NorthernEurope,
EastAsia,
SoutheastAsia,
Africa,
SouthAmerica,
Australia,
@ -21,9 +25,13 @@ pub enum Region {
impl core::fmt::Display for Region {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let s = match self {
Self::NorthAmerica => "NorthAmerica",
Self::NorthAmericaWest => "NorthAmericaWest",
Self::NorthAmericaCentral => "NorthAmericaCentral",
Self::NorthAmericaEast => "NorthAmericaEast",
Self::Europe => "Europe",
Self::Asia => "Asia",
Self::NorthernEurope => "NorthernEurope",
Self::EastAsia => "EastAsia",
Self::SoutheastAsia => "SoutheastAsia",
Self::Africa => "Africa",
Self::SouthAmerica => "SouthAmerica",
Self::Australia => "Australia",
@ -42,9 +50,13 @@ impl FromStr for Region {
.replace(['-', '_', ' '], "")
.as_str()
{
"northamerica" | "na" => Ok(Self::NorthAmerica),
"northamericawest" | "naw" => Ok(Self::NorthAmericaWest),
"northamericacentral" | "nac" => Ok(Self::NorthAmericaCentral),
"northamericaeast" | "nae" => Ok(Self::NorthAmericaEast),
"europe" | "eu" => Ok(Self::Europe),
"asia" | "as" => Ok(Self::Asia),
"northerneurope" | "neu" => Ok(Self::NorthernEurope),
"eastasia" | "eas" => Ok(Self::EastAsia),
"southeastasia" | "seas" => Ok(Self::SoutheastAsia),
"africa" | "af" => Ok(Self::Africa),
"southamerica" | "sa" => Ok(Self::SouthAmerica),
"australia" | "au" => Ok(Self::Australia),
@ -56,9 +68,13 @@ impl FromStr for Region {
impl Serialize for Region {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let s = match self {
Self::NorthAmerica => "North America",
Self::NorthAmericaWest => "North America West",
Self::NorthAmericaCentral => "North America Central",
Self::NorthAmericaEast => "North America East",
Self::Europe => "Europe",
Self::Asia => "Asia",
Self::NorthernEurope => "Northern Europe",
Self::EastAsia => "EastAsia",
Self::SoutheastAsia => "Southeast Asia",
Self::Africa => "Africa",
Self::SouthAmerica => "South America",
Self::Australia => "Australia",
@ -105,6 +121,14 @@ impl RegionsData {
pub fn network_behaviour(&self, node_a: NodeId, node_b: NodeId) -> &NetworkBehaviour {
let region_a = self.node_region[&node_a];
let region_b = self.node_region[&node_b];
self.network_behaviour_between_regions(region_a, region_b)
}
pub fn network_behaviour_between_regions(
&self,
region_a: Region,
region_b: Region,
) -> &NetworkBehaviour {
let k = NetworkBehaviourKey::new(region_a, region_b);
let k_rev = NetworkBehaviourKey::new(region_b, region_a);
self.region_network_behaviour
@ -207,9 +231,13 @@ mod tests {
.collect::<Vec<NodeId>>();
let available_regions = [
Region::NorthAmerica,
Region::NorthAmericaWest,
Region::NorthAmericaCentral,
Region::NorthAmericaEast,
Region::Europe,
Region::Asia,
Region::NorthernEurope,
Region::EastAsia,
Region::SoutheastAsia,
Region::Africa,
Region::SouthAmerica,
Region::Australia,

View File

@ -114,7 +114,7 @@ where
}
#[cfg(test)]
-mod tests {
+pub(crate) mod tests {
use std::{collections::HashMap, time::Duration};
use crate::{
@ -174,43 +174,19 @@ mod tests {
RegionsData {
regions: (0..6)
.map(|idx| {
-                    let region = match idx % 6 {
-                        0 => Region::Europe,
-                        1 => Region::NorthAmerica,
-                        2 => Region::SouthAmerica,
-                        3 => Region::Asia,
-                        4 => Region::Africa,
-                        5 => Region::Australia,
-                        _ => unreachable!(),
-                    };
+                    let region = region_from_index(idx);
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
-                    let region = match idx % 6 {
-                        0 => Region::Europe,
-                        1 => Region::NorthAmerica,
-                        2 => Region::SouthAmerica,
-                        3 => Region::Asia,
-                        4 => Region::Africa,
-                        5 => Region::Australia,
-                        _ => unreachable!(),
-                    };
+                    let region = region_from_index(idx);
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
-                    let region = match idx % 6 {
-                        0 => Region::Europe,
-                        1 => Region::NorthAmerica,
-                        2 => Region::SouthAmerica,
-                        3 => Region::Asia,
-                        4 => Region::Africa,
-                        5 => Region::Australia,
-                        _ => unreachable!(),
-                    };
+                    let region = region_from_index(idx);
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {
@ -231,4 +207,20 @@ mod tests {
.stop_after(Duration::from_millis(100))
.unwrap();
}
pub(crate) fn region_from_index(idx: usize) -> Region {
match idx % 10 {
0 => Region::Europe,
1 => Region::NorthernEurope,
2 => Region::NorthAmericaWest,
3 => Region::NorthAmericaCentral,
4 => Region::NorthAmericaEast,
5 => Region::SouthAmerica,
6 => Region::EastAsia,
7 => Region::SoutheastAsia,
8 => Region::Africa,
9 => Region::Australia,
_ => unreachable!(),
}
}
}

View File

@ -115,6 +115,7 @@ mod tests {
},
output_processors::OutData,
runner::SimulationRunner,
streaming::io::tests::region_from_index,
warding::SimulationState,
};
@ -160,43 +161,19 @@ mod tests {
RegionsData {
regions: (0..6)
.map(|idx| {
-                    let region = match idx % 6 {
-                        0 => Region::Europe,
-                        1 => Region::NorthAmerica,
-                        2 => Region::SouthAmerica,
-                        3 => Region::Asia,
-                        4 => Region::Africa,
-                        5 => Region::Australia,
-                        _ => unreachable!(),
-                    };
+                    let region = region_from_index(idx);
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
-                    let region = match idx % 6 {
-                        0 => Region::Europe,
-                        1 => Region::NorthAmerica,
-                        2 => Region::SouthAmerica,
-                        3 => Region::Asia,
-                        4 => Region::Africa,
-                        5 => Region::Australia,
-                        _ => unreachable!(),
-                    };
+                    let region = region_from_index(idx);
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
-                    let region = match idx % 6 {
-                        0 => Region::Europe,
-                        1 => Region::NorthAmerica,
-                        2 => Region::SouthAmerica,
-                        3 => Region::Asia,
-                        4 => Region::Africa,
-                        5 => Region::Australia,
-                        _ => unreachable!(),
-                    };
+                    let region = region_from_index(idx);
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {

View File

@ -116,6 +116,7 @@ mod tests {
},
output_processors::OutData,
runner::SimulationRunner,
streaming::io::tests::region_from_index,
warding::SimulationState,
};
@ -156,43 +157,19 @@ mod tests {
RegionsData {
regions: (0..6)
.map(|idx| {
-                    let region = match idx % 6 {
-                        0 => Region::Europe,
-                        1 => Region::NorthAmerica,
-                        2 => Region::SouthAmerica,
-                        3 => Region::Asia,
-                        4 => Region::Africa,
-                        5 => Region::Australia,
-                        _ => unreachable!(),
-                    };
+                    let region = region_from_index(idx);
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
-                    let region = match idx % 6 {
-                        0 => Region::Europe,
-                        1 => Region::NorthAmerica,
-                        2 => Region::SouthAmerica,
-                        3 => Region::Asia,
-                        4 => Region::Africa,
-                        5 => Region::Australia,
-                        _ => unreachable!(),
-                    };
+                    let region = region_from_index(idx);
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
-                    let region = match idx % 6 {
-                        0 => Region::Europe,
-                        1 => Region::NorthAmerica,
-                        2 => Region::SouthAmerica,
-                        3 => Region::Asia,
-                        4 => Region::Africa,
-                        5 => Region::Australia,
-                        _ => unreachable!(),
-                    };
+                    let region = region_from_index(idx);
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {