feat: add in-place merging to sort pod logs chronologically when dumping from Vector flat files

gmega 2025-04-24 11:03:33 -03:00
parent 223276f7c8
commit cd77b20a82
3 changed files with 183 additions and 23 deletions
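
The change rests on one observation: Vector writes each pod's lines in timestamp order, so a dump can be sorted globally with a k-way merge of the per-pod streams, without buffering whole files in memory. A minimal sketch of the idea (hypothetical in-memory data, not project code):

import heapq

# Each pod's log is already sorted; ISO 8601 timestamps compare
# correctly as plain strings.
pod_a = [("2025-04-22T12:00:00Z", "a1"), ("2025-04-22T14:00:00Z", "a2")]
pod_b = [("2025-04-22T13:00:00Z", "b1")]

# heapq.merge lazily yields the smallest head among all inputs.
for ts, msg in heapq.merge(pod_a, pod_b):
    print(ts, msg)  # a1, then b1, then a2 -- chronological across pods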

View File

@@ -188,7 +188,9 @@ def _configure_vector_source(args):
return ChainedLogSource(
[
VectorFlatFileSource(
app_name="codex-benchmarks", file=source_file.open(encoding="utf-8")
app_name="codex-benchmarks",
file=source_file.open(encoding="utf-8"),
sorted=args.chronological,
)
for source_file in args.source_file
]
@@ -321,6 +323,9 @@ def main():
vector_source.add_argument(
"source_file", type=Path, help="Vector log file to parse from.", nargs="+"
)
vector_source.add_argument(
"--chronological", action="store_true", help="Sort logs chronologically (slow)."
)
vector_source.set_defaults(source=lambda args, _: _configure_vector_source(args))
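
As a usage sketch (the file name is hypothetical; only source_file, --chronological, and the sorted= wiring come from the diff above), the flag parses as a plain boolean:

import argparse
from pathlib import Path

parser = argparse.ArgumentParser()
parser.add_argument("source_file", type=Path, nargs="+")
parser.add_argument("--chronological", action="store_true")

args = parser.parse_args(["vector.jsonl", "--chronological"])
assert args.chronological  # forwarded to VectorFlatFileSource as sorted=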

View File

@@ -1,7 +1,10 @@
import json
from io import StringIO
from typing import Tuple
from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource
from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource, PodLog
from benchmarks.tests.utils import make_jsonl
from datetime import datetime
EXPERIMENT_LOG = [
{
@@ -15,6 +18,7 @@ EXPERIMENT_LOG = [
"pod_name": "p1",
},
"message": "m1",
"timestamp": "2025-04-22T13:37:43.001886404Z",
},
{
"kubernetes": {
@@ -27,6 +31,7 @@ EXPERIMENT_LOG = [
"pod_name": "p2",
},
"message": "m2",
"timestamp": "2025-04-22T12:37:43.001886404Z",
},
{
"kubernetes": {
@@ -39,6 +44,7 @@ EXPERIMENT_LOG = [
"pod_name": "p1",
},
"message": "m3",
"timestamp": "2025-04-22T13:38:43.001886404Z",
},
]
@@ -65,7 +71,7 @@ def test_should_return_empty_when_no_matching_experiment_exists():
app_name="codex-benchmarks",
)
assert list(source.logs("e3", "g1736425800")) == []
assert list(source.logs("g1736425800", "e3")) == []
def test_should_retrieve_events_for_an_entire_group():
@@ -88,3 +94,38 @@ def test_should_return_all_existing_experiments_in_group():
)
assert list(extractor.experiments("g1736425800")) == ["e1", "e2"]
def test_should_read_pod_logs_in_order():
log_file = StringIO(make_jsonl(EXPERIMENT_LOG))
log1 = PodLog("p1", log_file)
log2 = PodLog("p2", log_file)
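# Both PodLogs intentionally share a single StringIO handle; each keeps
# its own read offset, so the interleaved reads below don't clobber
# one another's position.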
def check(
value: Tuple[str, datetime], expected_message: str, expected_timestamp: str
):
assert json.loads(value[0])["message"] == expected_message
assert value[1] == datetime.fromisoformat(expected_timestamp)
assert log1.has_next()
check(next(log1), "m1", "2025-04-22T13:37:43.001886404Z")
check(next(log1), "m3", "2025-04-22T13:38:43.001886404Z")
assert not log1.has_next()
assert log2.has_next()
check(next(log2), "m2", "2025-04-22T12:37:43.001886404Z")
assert not log2.has_next()
def test_should_merge_pod_logs_by_timestamp_when_requested():
source = VectorFlatFileSource(
StringIO(make_jsonl(EXPERIMENT_LOG)),
app_name="codex-benchmarks",
sorted=True,
)
assert list(source.logs("g1736425800")) == [
("e1", "p2", "m2"),
("e1", "p1", "m1"),
("e2", "p1", "m3"),
]
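
Note that these tests appear to assume Python 3.11+: earlier versions of datetime.fromisoformat reject both the trailing "Z" designator and fractional seconds longer than six digits, as used in the timestamps above.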

View File

@@ -1,21 +1,92 @@
import json
from collections.abc import Iterator
from datetime import datetime
from functools import total_ordering
from heapq import heapify, heappush, heappop
from json import JSONDecodeError
from typing import TextIO, Optional, Tuple
from typing import TextIO, Optional, Tuple, List, Callable
import logging
import re
from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId, RawLine
logger = logging.getLogger(__name__)
_POD_NAME_REGEX = re.compile(r'"pod_name":"(?P<pod_name>[^"]+)"')
_TIMESTAMP_REGEX = re.compile(r'"timestamp":"(?P<timestamp>[^"]+)"')
@total_ordering
class PodLog(object):
""":class:`PodLog` lets us iterate over each pod's logs separately, even when they are merged
into the same file. This is useful for sorting the logs of a Vector file dump, as those are
guaranteed to be sorted per pod, but not across pods."""
def __init__(self, pod_name: str, file: TextIO) -> None:
self.pod_name = pod_name
self.file = file
self.pointer: int = 0
self.next_line: Optional[Tuple[str, datetime]] = self._scan_next()
@property
def timestamp(self) -> datetime:
if not self.next_line:
raise ValueError("Cannot compare: log has run out of entries")
return self.next_line[1]
def has_next(self) -> bool:
"""Returns true if there are more logs to read for this pod."""
return self.next_line is not None
def __eq__(self, other: object) -> bool:
if not isinstance(other, PodLog):
return NotImplemented
return self.timestamp == other.timestamp
def __lt__(self, other: object) -> bool:
if not isinstance(other, PodLog):
return NotImplemented
return self.timestamp < other.timestamp
def __next__(self) -> Tuple[str, datetime]:
if self.next_line is None:
raise StopIteration()
value = self.next_line
self.next_line = self._scan_next()
return value
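# Several PodLog instances share one underlying file handle: each
# remembers its own offset and seeks back to it before reading, so
# iterating one pod's log does not corrupt another's position.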
def _iter_file(self) -> Iterator[str]:
"""Iterates over the file, yielding lines."""
self.file.seek(self.pointer)
for line in iter(self.file.readline, ""):
self.pointer = self.file.tell()
yield line
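# Locates this pod's next line with a cheap substring match plus a
# regex for the timestamp, avoiding a full json.loads on every line.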
def _scan_next(self) -> Optional[Tuple[str, datetime]]:
pod_name = f'"pod_name":"{self.pod_name}"'
for line in self._iter_file():
self.pointer = self.file.tell()
if pod_name not in line:
continue
timestamp = _TIMESTAMP_REGEX.search(line)
if not timestamp:
logger.error(f"Log line contains no timestamp {line}")
continue
return line, datetime.fromisoformat(timestamp.group("timestamp"))
return None
class VectorFlatFileSource(LogSource):
"""Log source for flat JSONL files produced by [Vector](https://vector.dev/). This is typically used when running
experiments locally within, say, Minikube or Kind."""
def __init__(self, file: TextIO, app_name: str):
def __init__(self, file: TextIO, app_name: str, sorted=False):
self.file = file
self.app_name = app_name
self.sorted = sorted
def __enter__(self):
return self
@@ -48,6 +119,42 @@ class VectorFlatFileSource(LogSource):
seen.add(experiment_id)
yield experiment_id
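# K-way merge: keep the per-pod streams in a min-heap ordered by the
# timestamp of each stream's next line (PodLog's __lt__); pop the
# smallest, emit its head, and push the stream back while it has lines.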
def _sorted_logs(self, line_predicate: Callable[[str], bool]) -> Iterator[str]:
sources = [
source for source in self._pod_logs(line_predicate) if source.has_next()
]
heapify(sources)
while sources:
log = heappop(sources)
yield next(log)[0]
if log.has_next():
heappush(sources, log)
def _unsorted_logs(self, line_predicate: Callable[[str], bool]) -> Iterator[str]:
self.file.seek(0)
for line in self.file:
if not line_predicate(line):
continue
yield line
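# Discovery pass: scan the file once and create one PodLog per
# distinct pod name appearing in lines that match the predicate.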
def _pod_logs(self, line_predicate: Callable[[str], bool]) -> List[PodLog]:
logger.info("Identifying pod logs.")
self.file.seek(0)
pod_logs = {}
for line in self.file:
if not line_predicate(line):
continue
match = _POD_NAME_REGEX.search(line)
if not match:
logger.error(f"Log line contains no pod name {line}")
continue
pod_name = match.group("pod_name")
if pod_name not in pod_logs:
logger.info(f"Pod found: {pod_name}")
pod_logs[pod_name] = PodLog(pod_name, self.file)
return list(pod_logs.values())
def logs(
self, group_id: str, experiment_id: Optional[str] = None
) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
@@ -59,26 +166,33 @@ class VectorFlatFileSource(LogSource):
group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
experiment_label = f'"app.kubernetes.io/instance":"{experiment_id}"'
self.file.seek(0)
for line in self.file:
# Does a cheaper match to avoid parsing every line.
if app_label in line and group_label in line:
if experiment_id is not None and experiment_label not in line:
continue
try:
parsed = json.loads(line)
except JSONDecodeError as err:
logger.error(
f"Failed to parse line from vector from source {line}", err
)
continue
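# As in the old code path, cheap substring checks avoid JSON-parsing
# lines that cannot match.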
def line_predicate(line: str) -> bool:
return (
app_label in line
and group_label in line
and (experiment_id is None or experiment_label in line)
)
k8s = parsed["kubernetes"]
yield (
k8s["pod_labels"]["app.kubernetes.io/instance"],
k8s["pod_name"],
parsed["message"],
logs = (
self._sorted_logs(line_predicate)
if self.sorted
else self._unsorted_logs(line_predicate)
)
for line in logs:
try:
parsed = json.loads(line)
except JSONDecodeError as err:
logger.error(
f"Failed to parse line from Vector source {line}", exc_info=err
)
continue
k8s = parsed["kubernetes"]
yield (
k8s["pod_labels"]["app.kubernetes.io/instance"],
k8s["pod_name"],
parsed["message"],
)
def __str__(self):
return f"VectorFlatFileSource({self.app_name})"