Use milliseconds for detailed latencies

This commit is contained in:
Youngjoon Lee 2024-12-21 17:47:57 +09:00
parent 5fd6e06cca
commit fe022001da
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
2 changed files with 54 additions and 35 deletions

View File

@@ -121,14 +121,14 @@ csv_writer.writerow(
"median_latency",
"max_latency",
"min_latency_msg_id",
"min_latency_msg_persistent_latencies",
"min_latency_msg_persistent_latency_ms",
"min_latency_msg_persistent_queue_sizes",
"min_latency_msg_temporal_latencies",
"min_latency_msg_temporal_latency_ms",
"min_latency_msg_temporal_queue_sizes",
"max_latency_msg_id",
"max_latency_msg_persistent_latencies",
"max_latency_msg_persistent_latency_ms",
"max_latency_msg_persistent_queue_sizes",
"max_latency_msg_temporal_latencies",
"max_latency_msg_temporal_latency_ms",
"max_latency_msg_temporal_queue_sizes",
"min_bandwidth_kbps",
"avg_bandwidth_kbps",
@@ -162,26 +162,30 @@ for idx, log_path in enumerate(log_paths):
csv_row.append(float(latency_analysis.max_latency_ms) / 1000.0)
csv_row.append(latency_analysis.min_latency_analysis.message_id)
csv_row.append(
",".join(map(str, latency_analysis.min_latency_analysis.persistent_latencies))
",".join(
map(str, latency_analysis.min_latency_analysis.persistent_latencies_step)
)
)
csv_row.append(
",".join(map(str, latency_analysis.min_latency_analysis.persistent_queue_sizes))
)
csv_row.append(
",".join(map(str, latency_analysis.min_latency_analysis.temporal_latencies))
",".join(map(str, latency_analysis.min_latency_analysis.temporal_latencies_ms))
)
csv_row.append(
",".join(map(str, latency_analysis.min_latency_analysis.temporal_queue_sizes))
)
csv_row.append(latency_analysis.max_latency_analysis.message_id)
csv_row.append(
",".join(map(str, latency_analysis.max_latency_analysis.persistent_latencies))
",".join(
map(str, latency_analysis.max_latency_analysis.persistent_latencies_step)
)
)
csv_row.append(
",".join(map(str, latency_analysis.max_latency_analysis.persistent_queue_sizes))
)
csv_row.append(
",".join(map(str, latency_analysis.max_latency_analysis.temporal_latencies))
",".join(map(str, latency_analysis.max_latency_analysis.temporal_latencies_ms))
)
csv_row.append(
",".join(map(str, latency_analysis.max_latency_analysis.temporal_queue_sizes))

View File

@@ -199,13 +199,17 @@ class LatencyAnalysis:
min_latency_steps = min_message.latency
assert min_latency_steps is not None
min_latency_ms = int(min_latency_steps * step_duration)
min_latency_analysis = MessageLatencyAnalysis.build(min_message, node_storage)
min_latency_analysis = MessageLatencyAnalysis.build(
min_message, node_storage, step_duration
)
max_message = max(complete_messages)
max_latency_steps = max_message.latency
assert max_latency_steps is not None
max_latency_ms = int(max_latency_steps * step_duration)
max_latency_analysis = MessageLatencyAnalysis.build(max_message, node_storage)
max_latency_analysis = MessageLatencyAnalysis.build(
max_message, node_storage, step_duration
)
avg_latency_steps = statistics.mean(complete_latencies)
avg_latency_ms = int(avg_latency_steps * step_duration)
@@ -213,40 +217,47 @@ class LatencyAnalysis:
median_latency_ms = int(median_latency_steps * step_duration)
return cls(
total_messages,
total_complete_messages,
total_incomplete_messages,
min_latency_steps,
min_latency_ms,
min_latency_analysis,
max_latency_steps,
max_latency_ms,
max_latency_analysis,
avg_latency_steps,
avg_latency_ms,
median_latency_steps,
median_latency_ms,
total_messages=total_messages,
total_complete_messages=total_complete_messages,
total_incomplete_messages=total_incomplete_messages,
min_latency_steps=min_latency_steps,
min_latency_ms=min_latency_ms,
min_latency_analysis=min_latency_analysis,
max_latency_steps=max_latency_steps,
max_latency_ms=max_latency_ms,
max_latency_analysis=max_latency_analysis,
avg_latency_steps=avg_latency_steps,
avg_latency_ms=avg_latency_ms,
median_latency_steps=median_latency_steps,
median_latency_ms=median_latency_ms,
)
@dataclass
class MessageLatencyAnalysis:
message_id: str
persistent_latencies: list[int]
persistent_latencies_step: list[int]
persistent_latencies_ms: list[int]
persistent_queue_sizes: list[int]
temporal_latencies: list[int]
temporal_latencies_step: list[int]
temporal_latencies_ms: list[int]
temporal_queue_sizes: list[int]
@classmethod
def build(
cls, message: Message, node_storage: NodeStorage
cls,
message: Message,
node_storage: NodeStorage,
step_duration: int,
) -> "MessageLatencyAnalysis":
persistent_latencies = []
persistent_latencies_step = []
persistent_latencies_ms = []
persistent_queue_sizes = []
for latency in message.persistent_transmission_latencies:
if latency.steps is None:
continue
persistent_latencies.append(latency.steps)
persistent_latencies_step.append(latency.steps)
persistent_latencies_ms.append(latency.steps * step_duration)
persistent_queue_sizes.append(
next(
(
@@ -260,12 +271,14 @@ class MessageLatencyAnalysis:
)
)
temporal_latencies = []
temporal_latencies_step = []
temporal_latencies_ms = []
temporal_queue_sizes = []
for latency in message.temporal_processor_latencies:
if latency.steps is None:
continue
temporal_latencies.append(latency.steps)
temporal_latencies_step.append(latency.steps)
temporal_latencies_ms.append(latency.steps * step_duration)
temporal_queue_sizes.append(
next(
(
@@ -280,11 +293,13 @@ class MessageLatencyAnalysis:
)
return cls(
message.id,
persistent_latencies,
persistent_queue_sizes,
temporal_latencies,
temporal_queue_sizes,
message_id=message.id,
persistent_latencies_step=persistent_latencies_step,
persistent_latencies_ms=persistent_latencies_ms,
persistent_queue_sizes=persistent_queue_sizes,
temporal_latencies_step=temporal_latencies_step,
temporal_latencies_ms=temporal_latencies_ms,
temporal_queue_sizes=temporal_queue_sizes,
)