From cd77b20a82ea5c2c7ccea6d9d37ae10fda9b9949 Mon Sep 17 00:00:00 2001
From: gmega
Date: Thu, 24 Apr 2025 11:03:33 -0300
Subject: [PATCH] feat: add in-place merging for properly sorting pod logs when dumping from vector flat files

---
 benchmarks/cli.py                             |   7 +-
 .../sources/tests/test_vector_flat_file.py    |  45 ++++-
 .../logging/sources/vector_flat_file.py       | 162 +++++++++++++++---
 3 files changed, 191 insertions(+), 23 deletions(-)

diff --git a/benchmarks/cli.py b/benchmarks/cli.py
index 2515f35..b6d38a1 100644
--- a/benchmarks/cli.py
+++ b/benchmarks/cli.py
@@ -188,7 +188,9 @@ def _configure_vector_source(args):
     return ChainedLogSource(
         [
             VectorFlatFileSource(
-                app_name="codex-benchmarks", file=source_file.open(encoding="utf-8")
+                app_name="codex-benchmarks",
+                file=source_file.open(encoding="utf-8"),
+                sorted=args.chronological,
             )
             for source_file in args.source_file
         ]
@@ -321,6 +323,9 @@ def main():
     vector_source.add_argument(
         "source_file", type=Path, help="Vector log file to parse from.", nargs="+"
     )
+    vector_source.add_argument(
+        "--chronological", action="store_true", help="Sort logs chronologically (slow)."
+    )
     vector_source.set_defaults(source=lambda args, _: _configure_vector_source(args))
 
 
diff --git a/benchmarks/logging/sources/tests/test_vector_flat_file.py b/benchmarks/logging/sources/tests/test_vector_flat_file.py
index fa45a4b..3635d4e 100644
--- a/benchmarks/logging/sources/tests/test_vector_flat_file.py
+++ b/benchmarks/logging/sources/tests/test_vector_flat_file.py
@@ -1,7 +1,10 @@
+import json
 from io import StringIO
+from typing import Tuple
 
-from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource
+from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource, PodLog
 from benchmarks.tests.utils import make_jsonl
+from datetime import datetime
 
 EXPERIMENT_LOG = [
     {
@@ -15,6 +18,7 @@ EXPERIMENT_LOG = [
             "pod_name": "p1",
         },
         "message": "m1",
+        "timestamp": "2025-04-22T13:37:43.001886404Z",
     },
     {
         "kubernetes": {
@@ -27,6 +31,7 @@ EXPERIMENT_LOG = [
             "pod_name": "p2",
         },
         "message": "m2",
+        "timestamp": "2025-04-22T12:37:43.001886404Z",
     },
     {
         "kubernetes": {
@@ -39,6 +44,7 @@ EXPERIMENT_LOG = [
             "pod_name": "p1",
         },
         "message": "m3",
+        "timestamp": "2025-04-22T13:38:43.001886404Z",
     },
 ]
 
@@ -65,7 +71,7 @@ def test_should_return_empty_when_no_matching_experiment_exists():
         app_name="codex-benchmarks",
     )
 
-    assert list(source.logs("e3", "g1736425800")) == []
+    assert list(source.logs("g1736425800", "e3")) == []
 
 
 def test_should_retrieve_events_for_an_entire_group():
@@ -88,3 +94,38 @@ def test_should_return_all_existing_experiments_in_group():
     )
 
     assert list(extractor.experiments("g1736425800")) == ["e1", "e2"]
+
+
+def test_should_read_pod_logs_in_order():
+    log_file = StringIO(make_jsonl(EXPERIMENT_LOG))
+    log1 = PodLog("p1", log_file)
+    log2 = PodLog("p2", log_file)
+
+    def check(
+        value: Tuple[str, datetime], expected_message: str, expected_timestamp: str
+    ):
+        assert json.loads(value[0])["message"] == expected_message
+        assert value[1] == datetime.fromisoformat(expected_timestamp)
+
+    assert log1.has_next()
+    check(next(log1), "m1", "2025-04-22T13:37:43.001886404Z")
+    check(next(log1), "m3", "2025-04-22T13:38:43.001886404Z")
+    assert not log1.has_next()
+
+    assert log2.has_next()
+    check(next(log2), "m2", "2025-04-22T12:37:43.001886404Z")
+    assert not log2.has_next()
+
+
+def test_should_merge_pod_logs_by_timestamp_when_requested():
+    source = VectorFlatFileSource(
+        StringIO(make_jsonl(EXPERIMENT_LOG)),
+        app_name="codex-benchmarks",
+        sorted=True,
+    )
+
+    assert list(source.logs("g1736425800")) == [
+        ("e1", "p2", "m2"),
+        ("e1", "p1", "m1"),
+        ("e2", "p1", "m3"),
+    ]
diff --git a/benchmarks/logging/sources/vector_flat_file.py b/benchmarks/logging/sources/vector_flat_file.py
index 9f74f4c..990331c 100644
--- a/benchmarks/logging/sources/vector_flat_file.py
+++ b/benchmarks/logging/sources/vector_flat_file.py
@@ -1,21 +1,92 @@
 import json
 from collections.abc import Iterator
+from datetime import datetime
+from functools import total_ordering
+from heapq import heapify, heappush, heappop
 from json import JSONDecodeError
-from typing import TextIO, Optional, Tuple
+from typing import TextIO, Optional, Tuple, List, Callable
 import logging
+import re
 
 from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId, RawLine
 
 logger = logging.getLogger(__name__)
 
+_POD_NAME_REGEX = re.compile(r'"pod_name":"(?P<pod_name>[^"]+)"')
+_TIMESTAMP_REGEX = re.compile(r'"timestamp":"(?P<timestamp>[^"]+)"')
+
+
+@total_ordering
+class PodLog(object):
+    """:class:`PodLog` allows us to iterate separately over the logs of the various pods even when they
+    are merged into the same file. This is useful when trying to sort the logs of a vector file dump as
+    those are guaranteed to be sorted per-pod, but not across pods."""
+
+    def __init__(self, pod_name: str, file: TextIO) -> None:
+        self.pod_name = pod_name
+        self.file = file
+        self.pointer: int = 0
+
+        self.next_line: Optional[Tuple[str, datetime]] = self._scan_next()
+
+    @property
+    def timestamp(self) -> datetime:
+        if not self.next_line:
+            raise ValueError("Cannot compare: log has run out of entries")
+        return self.next_line[1]
+
+    def has_next(self) -> bool:
+        """Returns true if there are more logs to read for this pod."""
+        return self.next_line is not None
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, PodLog):
+            return NotImplemented
+        return self.timestamp == other.timestamp
+
+    def __lt__(self, other: object) -> bool:
+        if not isinstance(other, PodLog):
+            return NotImplemented
+        return self.timestamp < other.timestamp
+
+    def __next__(self) -> Tuple[str, datetime]:
+        if self.next_line is None:
+            raise StopIteration()
+        value = self.next_line
+        self.next_line = self._scan_next()
+        return value
+
+    def _iter_file(self) -> Iterator[str]:
+        """Iterates over the file, yielding lines."""
+        self.file.seek(self.pointer)
+        for line in iter(self.file.readline, ""):
+            self.pointer = self.file.tell()
+            yield line
+
+    def _scan_next(self) -> Optional[Tuple[str, datetime]]:
+        """Advances to this pod's next timestamped line, or returns None at EOF."""
+        pod_name = f'"pod_name":"{self.pod_name}"'
+        for line in self._iter_file():
+            if pod_name not in line:
+                continue
+
+            timestamp = _TIMESTAMP_REGEX.search(line)
+            if not timestamp:
+                logger.error(f"Log line contains no timestamp {line}")
+                continue
+
+            return line, datetime.fromisoformat(timestamp.group("timestamp"))
+        return None
+
 
 class VectorFlatFileSource(LogSource):
     """Log source for flat JSONL files produced by [Vector](https://vector.dev/). This is typically used when running
     experiments locally within, say, Minikube or Kind."""
 
-    def __init__(self, file: TextIO, app_name: str):
+    def __init__(self, file: TextIO, app_name: str, sorted=False):
         self.file = file
         self.app_name = app_name
+        self.sorted = sorted
 
     def __enter__(self):
         return self
@@ -48,6 +119,50 @@ class VectorFlatFileSource(LogSource):
                 seen.add(experiment_id)
                 yield experiment_id
 
+    def _sorted_logs(self, line_predicate: Callable[[str], bool]) -> Iterator[str]:
+        """Merges the per-pod logs into a single stream ordered by timestamp."""
+        sources = [
+            source for source in self._pod_logs(line_predicate) if source.has_next()
+        ]
+        heapify(sources)
+        while sources:
+            log = heappop(sources)
+            line = next(log)[0]
+            # A PodLog yields every line of its pod, so filter again here.
+            if line_predicate(line):
+                yield line
+            if log.has_next():
+                heappush(sources, log)
+
+    def _unsorted_logs(self, line_predicate: Callable[[str], bool]) -> Iterator[str]:
+        """Yields the lines accepted by ``line_predicate`` in file order."""
+        self.file.seek(0)
+        for line in self.file:
+            if not line_predicate(line):
+                continue
+            yield line
+
+    def _pod_logs(self, line_predicate: Callable[[str], bool]) -> List[PodLog]:
+        """Builds one :class:`PodLog` per pod with lines matching ``line_predicate``."""
+        logger.info("Identifying pod logs.")
+        self.file.seek(0)
+        pod_names = {}
+        for line in self.file:
+            if not line_predicate(line):
+                continue
+            match = _POD_NAME_REGEX.search(line)
+            if not match:
+                logger.error(f"Log line contains no pod name {line}")
+                continue
+            pod_name = match.group("pod_name")
+            if pod_name not in pod_names:
+                logger.info(f"Pod found: {pod_name}")
+                pod_names[pod_name] = None
+
+        # PodLog seeks and reads the shared file when constructed, so build
+        # the instances only after the scan above is done with it.
+        return [PodLog(pod_name, self.file) for pod_name in pod_names]
+
     def logs(
         self, group_id: str, experiment_id: Optional[str] = None
     ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
@@ -59,26 +174,33 @@ class VectorFlatFileSource(LogSource):
         group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
         experiment_label = f'"app.kubernetes.io/instance":"{experiment_id}"'
 
-        self.file.seek(0)
-        for line in self.file:
-            # Does a cheaper match to avoid parsing every line.
-            if app_label in line and group_label in line:
-                if experiment_id is not None and experiment_label not in line:
-                    continue
-                try:
-                    parsed = json.loads(line)
-                except JSONDecodeError as err:
-                    logger.error(
-                        f"Failed to parse line from vector from source {line}", err
-                    )
-                    continue
+        def line_predicate(line: str) -> bool:
+            return (
+                app_label in line
+                and group_label in line
+                and (experiment_id is None or experiment_label in line)
+            )
 
-                k8s = parsed["kubernetes"]
-                yield (
-                    k8s["pod_labels"]["app.kubernetes.io/instance"],
-                    k8s["pod_name"],
-                    parsed["message"],
+        logs = (
+            self._sorted_logs(line_predicate)
+            if self.sorted
+            else self._unsorted_logs(line_predicate)
+        )
+        for line in logs:
+            try:
+                parsed = json.loads(line)
+            except JSONDecodeError as err:
+                logger.error(
+                    "Failed to parse line from vector from source %s: %s", line, err
                 )
+                continue
+
+            k8s = parsed["kubernetes"]
+            yield (
+                k8s["pod_labels"]["app.kubernetes.io/instance"],
+                k8s["pod_name"],
+                parsed["message"],
+            )
 
     def __str__(self):
         return f"VectorFlatFileSource({self.app_name})"