combine message latency and connection latency analysis

This commit is contained in:
Youngjoon Lee 2025-02-10 11:10:00 +09:00
parent e96fc752ed
commit 4448c93356
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
4 changed files with 98 additions and 124 deletions

View File

@ -1,89 +0,0 @@
use std::{error::Error, ops::Mul, path::PathBuf, time::Duration};
use std::{
collections::HashMap,
fs::File,
io::{BufRead, BufReader},
};
use netrunner::node::NodeId;
use polars::prelude::NamedFrom;
use polars::series::Series;
use serde::{Deserialize, Serialize};
use crate::node::blend::log::TopicLog;
use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId};
use super::message_latency::quantile;
/// Measures per-connection (single network hop) latency from a simulation
/// log file and prints the aggregated statistics as JSON on stdout.
///
/// Each `NetworkSent { to }` event is paired with the matching
/// `NetworkReceived { from }` event via the (payload, sender, receiver)
/// triple; the step-count difference is scaled by `step_duration` to
/// obtain the latency in milliseconds.
pub fn analyze_connection_latency(
    log_file: PathBuf,
    step_duration: Duration,
) -> Result<(), Box<dyn Error>> {
    let reader = BufReader::new(File::open(log_file)?);

    // Step id of the earliest send observed for each (payload, from, to) triple.
    let mut pending_sends: HashMap<(PayloadId, NodeId, NodeId), usize> = HashMap::new();
    let mut latencies_ms: Vec<i64> = Vec::new();

    for line in reader.lines() {
        let line = line?;
        // Lines that do not parse as MessageEvent logs are silently skipped.
        let Ok(topic_log) = serde_json::from_str::<TopicLog<MessageEvent>>(&line) else {
            continue;
        };
        assert_eq!(topic_log.topic, "MessageEvent");
        let event = topic_log.message;
        match event.event_type {
            MessageEventType::NetworkSent { to } => {
                // Keep only the first send step if duplicates occur.
                pending_sends
                    .entry((event.payload_id, event.node_id, to))
                    .or_insert(event.step_id);
            }
            MessageEventType::NetworkReceived { from } => {
                let key = (event.payload_id, from, event.node_id);
                if let Some(sent_step_id) = pending_sends.remove(&key) {
                    let steps: u32 = (event.step_id - sent_step_id).try_into().unwrap();
                    let millis: i64 = (step_duration * steps).as_millis().try_into().unwrap();
                    latencies_ms.push(millis);
                }
            }
            _ => {}
        }
    }

    let series = Series::new("latencies".into(), latencies_ms);
    let stats = Output::new(&series);
    println!("{}", serde_json::to_string(&stats).unwrap());
    Ok(())
}
/// Summary statistics of the connection-latency distribution, serialized
/// to JSON and printed by `analyze_connection_latency`.
#[derive(Serialize, Deserialize, Debug)]
struct Output {
    // number of matched (send, receive) pairs
    count: usize,
    // minimum latency, milliseconds
    min: i64,
    // first quartile (25th percentile), milliseconds
    q1: f64,
    // arithmetic mean, milliseconds
    avg: f64,
    // median (50th percentile), milliseconds
    med: f64,
    // third quartile (75th percentile), milliseconds
    q3: f64,
    // maximum latency, milliseconds
    max: i64,
}
impl Output {
    /// Aggregates a series of latencies (in milliseconds) into summary
    /// statistics.
    ///
    /// # Panics
    /// Panics if `series` is empty, since min/mean/median are then undefined.
    fn new(series: &Series) -> Self {
        let min = series.min::<i64>().unwrap().unwrap();
        let max = series.max::<i64>().unwrap().unwrap();
        Self {
            count: series.len(),
            min,
            q1: quantile(series, 0.25),
            avg: series.mean().unwrap(),
            med: series.median().unwrap(),
            q3: quantile(series, 0.75),
            max,
        }
    }
}

View File

@ -7,6 +7,7 @@ use std::{
io::{BufRead, BufReader},
};
use netrunner::node::NodeId;
use polars::prelude::{AnyValue, NamedFrom, QuantileMethod, Scalar};
use polars::series::Series;
use serde::{Deserialize, Serialize};
@ -14,10 +15,25 @@ use serde::{Deserialize, Serialize};
use crate::node::blend::log::TopicLog;
use crate::node::blend::message::{MessageEvent, MessageEventType, PayloadId};
pub fn analyze_message_latency(
/// Runs both the message-latency and connection-latency analyses over the
/// same log file and prints the combined result as a JSON object on stdout.
///
/// # Errors
/// Propagates any I/O or serialization error from the underlying analyses.
pub fn analyze_latency(log_file: PathBuf, step_duration: Duration) -> Result<(), Box<dyn Error>> {
    let output = Output {
        // The message analysis takes a clone; the connection analysis can
        // consume the original path (cloning it again would be redundant).
        message: analyze_message_latency(log_file.clone(), step_duration)?,
        connection: analyze_connection_latency(log_file, step_duration)?,
    };
    println!("{}", serde_json::to_string(&output)?);
    Ok(())
}
/// Combined result of `analyze_latency`: message-level and connection-level
/// latency statistics, serialized together as one JSON object.
#[derive(Serialize, Deserialize, Debug)]
struct Output {
    // latency stats for messages fully unwrapped (end to end)
    message: MessageLatency,
    // latency stats per individual network connection (single hop)
    connection: ConnectionLatency,
}
fn analyze_message_latency(
log_file: PathBuf,
step_duration: Duration,
) -> Result<(), Box<dyn Error>> {
) -> Result<MessageLatency, Box<dyn Error>> {
let file = File::open(log_file)?;
let reader = BufReader::new(file);
@ -59,14 +75,11 @@ pub fn analyze_message_latency(
}
let series = Series::new("latencies".into(), latencies_ms);
let series = Output::new(&series, &latency_to_message);
println!("{}", serde_json::to_string(&series).unwrap());
Ok(())
Ok(MessageLatency::new(&series, &latency_to_message))
}
#[derive(Serialize, Deserialize, Debug)]
struct Output {
struct MessageLatency {
count: usize,
min: i64,
min_payload_id: PayloadId,
@ -78,7 +91,7 @@ struct Output {
max_payload_id: PayloadId,
}
impl Output {
impl MessageLatency {
fn new(series: &Series, latency_to_message: &HashMap<i64, PayloadId>) -> Self {
let min = series.min::<i64>().unwrap().unwrap();
let min_payload_id = latency_to_message.get(&min).unwrap().clone();
@ -98,7 +111,76 @@ impl Output {
}
}
pub(crate) fn quantile(series: &Series, quantile: f64) -> f64 {
/// Measures per-connection (single network hop) latency from the simulation
/// log and returns the aggregated statistics.
///
/// Each `NetworkSent { to }` event is paired with the matching
/// `NetworkReceived { from }` event via the (payload, sender, receiver)
/// triple; the step-count difference is scaled by `step_duration` to obtain
/// the latency in milliseconds.
fn analyze_connection_latency(
    log_file: PathBuf,
    step_duration: Duration,
) -> Result<ConnectionLatency, Box<dyn Error>> {
    let reader = BufReader::new(File::open(log_file)?);

    // Step id of the earliest send observed for each (payload, from, to) triple.
    let mut pending_sends: HashMap<(PayloadId, NodeId, NodeId), usize> = HashMap::new();
    let mut latencies_ms: Vec<i64> = Vec::new();

    for line in reader.lines() {
        let line = line?;
        // Lines that do not parse as MessageEvent logs are silently skipped.
        let Ok(topic_log) = serde_json::from_str::<TopicLog<MessageEvent>>(&line) else {
            continue;
        };
        assert_eq!(topic_log.topic, "MessageEvent");
        let event = topic_log.message;
        match event.event_type {
            MessageEventType::NetworkSent { to } => {
                // Keep only the first send step if duplicates occur.
                pending_sends
                    .entry((event.payload_id, event.node_id, to))
                    .or_insert(event.step_id);
            }
            MessageEventType::NetworkReceived { from } => {
                let key = (event.payload_id, from, event.node_id);
                if let Some(sent_step_id) = pending_sends.remove(&key) {
                    let steps: u32 = (event.step_id - sent_step_id).try_into().unwrap();
                    let millis: i64 = (step_duration * steps).as_millis().try_into().unwrap();
                    latencies_ms.push(millis);
                }
            }
            _ => {}
        }
    }

    let series = Series::new("latencies".into(), latencies_ms);
    Ok(ConnectionLatency::new(&series))
}
/// Summary statistics of the connection-latency distribution, produced by
/// `analyze_connection_latency`.
#[derive(Serialize, Deserialize, Debug)]
struct ConnectionLatency {
    // number of matched (send, receive) pairs
    count: usize,
    // minimum latency, milliseconds
    min: i64,
    // first quartile (25th percentile), milliseconds
    q1: f64,
    // arithmetic mean, milliseconds
    avg: f64,
    // median (50th percentile), milliseconds
    med: f64,
    // third quartile (75th percentile), milliseconds
    q3: f64,
    // maximum latency, milliseconds
    max: i64,
}
impl ConnectionLatency {
    /// Aggregates a series of latencies (in milliseconds) into summary
    /// statistics.
    ///
    /// # Panics
    /// Panics if `series` is empty, since min/mean/median are then undefined.
    fn new(series: &Series) -> Self {
        let min = series.min::<i64>().unwrap().unwrap();
        let max = series.max::<i64>().unwrap().unwrap();
        Self {
            count: series.len(),
            min,
            q1: quantile(series, 0.25),
            avg: series.mean().unwrap(),
            med: series.median().unwrap(),
            q3: quantile(series, 0.75),
            max,
        }
    }
}
fn quantile(series: &Series, quantile: f64) -> f64 {
f64_from_scalar(
&series
.quantile_reduce(quantile, QuantileMethod::Linear)

View File

@ -1,3 +1,2 @@
pub mod conn_latency;
pub mod latency;
pub mod message_history;
pub mod message_latency;

View File

@ -7,9 +7,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates
use crate::node::blend::state::{BlendnodeRecord, BlendnodeState};
use crate::node::blend::{BlendnodeSettings, SimMessage};
use analysis::conn_latency::analyze_connection_latency;
use analysis::latency::analyze_latency;
use analysis::message_history::analyze_message_history;
use analysis::message_latency::analyze_message_latency;
use anyhow::Ok;
use clap::{Parser, Subcommand};
use crossbeam::channel;
@ -62,12 +61,10 @@ enum Commands {
#[derive(Subcommand)]
enum AnalyzeCommands {
/// Analyze the latency of the messages fully unwrapped
MessageLatency(MessageLatencyApp),
/// Analyze the latency of the messages and connections
Latency(LatencyApp),
/// Analyze the history of a message
MessageHistory(MessageHistoryApp),
/// Analyze connection latency
ConnectionLatency(ConnectionLatencyApp),
}
/// Main simulation wrapper
@ -333,7 +330,7 @@ struct ConnLatencyDistributionLog {
}
#[derive(Parser)]
struct MessageLatencyApp {
struct LatencyApp {
#[clap(long, short)]
log_file: PathBuf,
#[clap(long, short, value_parser = humantime::parse_duration)]
@ -350,14 +347,6 @@ struct MessageHistoryApp {
payload_id: PayloadId,
}
#[derive(Parser)]
struct ConnectionLatencyApp {
#[clap(long, short)]
log_file: PathBuf,
#[clap(long, short, value_parser = humantime::parse_duration)]
step_duration: Duration,
}
fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
match cli.command {
@ -372,8 +361,8 @@ fn main() -> anyhow::Result<()> {
Ok(())
}
Commands::Analyze { command } => match command {
AnalyzeCommands::MessageLatency(app) => {
if let Err(e) = analyze_message_latency(app.log_file, app.step_duration) {
AnalyzeCommands::Latency(app) => {
if let Err(e) = analyze_latency(app.log_file, app.step_duration) {
tracing::error!("error: {}", e);
std::process::exit(1);
}
@ -388,13 +377,6 @@ fn main() -> anyhow::Result<()> {
}
Ok(())
}
AnalyzeCommands::ConnectionLatency(app) => {
if let Err(e) = analyze_connection_latency(app.log_file, app.step_duration) {
tracing::error!("error: {}", e);
std::process::exit(1);
}
Ok(())
}
},
}
}