From 3c03910fa1945fce2f910500f1bd03f7f54e754a Mon Sep 17 00:00:00 2001 From: gmega Date: Thu, 9 Jan 2025 19:03:55 -0300 Subject: [PATCH] feat: add split log source command to CLI; check-in missing logging module --- benchmarks/cli.py | 73 +++++- benchmarks/logging/__init__.py | 0 benchmarks/logging/logging.py | 227 +++++++++++++++++ benchmarks/logging/sources.py | 138 ++++++++++ benchmarks/logging/tests/__init__.py | 0 benchmarks/logging/tests/test_logging.py | 238 ++++++++++++++++++ benchmarks/logging/tests/test_sources.py | 90 +++++++ .../tests/test_split_logs_in_source.py | 137 ++++++++++ 8 files changed, 895 insertions(+), 8 deletions(-) create mode 100644 benchmarks/logging/__init__.py create mode 100644 benchmarks/logging/logging.py create mode 100644 benchmarks/logging/sources.py create mode 100644 benchmarks/logging/tests/__init__.py create mode 100644 benchmarks/logging/tests/test_logging.py create mode 100644 benchmarks/logging/tests/test_sources.py create mode 100644 benchmarks/logging/tests/test_split_logs_in_source.py diff --git a/benchmarks/cli.py b/benchmarks/cli.py index 5691445..7030f3d 100644 --- a/benchmarks/cli.py +++ b/benchmarks/cli.py @@ -16,6 +16,11 @@ from benchmarks.logging.logging import ( ) from benchmarks.deluge.config import DelugeExperimentConfig from benchmarks.deluge.logging import DelugeTorrentDownload +from benchmarks.logging.sources import ( + VectorFlatFileSource, + FSOutputManager, + split_logs_in_source, +) config_parser = ConfigParser() config_parser.register(DelugeExperimentConfig) @@ -55,7 +60,7 @@ def cmd_describe(args): print(config_parser.experiment_types[args.type].schema_json(indent=2)) -def cmd_logs(log: Path, output: Path): +def cmd_parse_single_log(log: Path, output: Path): if not log.exists(): print(f"Log file {log} does not exist.") sys.exit(-1) @@ -77,6 +82,31 @@ def cmd_logs(log: Path, output: Path): splitter.split(log_parser.parse(istream)) +def cmd_parse_log_source(group_id: str, source_file: Path, output_dir: Path): + if not source_file.exists(): + print(f"Log source file {source_file} does not exist.") + sys.exit(-1) + + if not output_dir.parent.exists(): + print(f"Folder {output_dir.parent} does not exist.") + sys.exit(-1) + + output_dir.mkdir(exist_ok=True) + + with ( + source_file.open("r", encoding="utf-8") as istream, + FSOutputManager(output_dir) as output_manager, + ): + log_source = VectorFlatFileSource(app_name="codex-benchmarks", file=istream) + split_logs_in_source( + log_source, + log_parser, + output_manager, + group_id, + formats=[(DECLogEntry, LogSplitterFormats.jsonl)], + ) + + def _parse_config(config: Path) -> Dict[str, ExperimentBuilder[Experiment]]: if not config.exists(): print(f"Config file {config} does not exist.") @@ -123,10 +153,10 @@ def main(): run_cmd.add_argument("experiment", type=str, help="Name of the experiment to run.") run_cmd.set_defaults(func=lambda args: cmd_run(_parse_config(args.config), args)) - describe = commands.add_parser( + describe_cmd = commands.add_parser( "describe", help="Shows the JSON schema for the various experiment types." 
     )
-    describe.add_argument(
+    describe_cmd.add_argument(
         "type",
         type=str,
         help="Type of the experiment to describe.",
@@ -134,12 +164,39 @@ def main():
         nargs="?",
     )
 
-    describe.set_defaults(func=cmd_describe)
+    describe_cmd.set_defaults(func=cmd_describe)
 
-    logs = commands.add_parser("logs", help="Parse logs.")
-    logs.add_argument("log", type=Path, help="Path to the log file.")
-    logs.add_argument("output_dir", type=Path, help="Path to an output folder.")
-    logs.set_defaults(func=lambda args: cmd_logs(args.log, args.output_dir))
+    logs_cmd = commands.add_parser("logs", help="Parse logs.")
+    log_subcommands = logs_cmd.add_subparsers(required=True)
+
+    single_log_cmd = log_subcommands.add_parser(
+        "single", help="Parse a single log file."
+    )
+    single_log_cmd.add_argument("log", type=Path, help="Path to the log file.")
+    single_log_cmd.add_argument(
+        "output_dir", type=Path, help="Path to an output folder."
+    )
+    single_log_cmd.set_defaults(
+        func=lambda args: cmd_parse_single_log(args.log, args.output_dir)
+    )
+
+    log_source_cmd = log_subcommands.add_parser(
+        "source", help="Parse logs from a log source."
+    )
+    log_source_cmd.add_argument(
+        "source_file", type=Path, help="Vector log file to parse from."
+    )
+    log_source_cmd.add_argument(
+        "output_dir", type=Path, help="Path to an output folder."
+    )
+    log_source_cmd.add_argument(
+        "group_id", type=str, help="ID of experiment group to parse."
+    )
+    log_source_cmd.set_defaults(
+        func=lambda args: cmd_parse_log_source(
+            args.group_id, args.source_file, args.output_dir
+        )
+    )
 
     args = parser.parse_args()
diff --git a/benchmarks/logging/__init__.py b/benchmarks/logging/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/benchmarks/logging/logging.py b/benchmarks/logging/logging.py
new file mode 100644
index 0000000..1624e4a
--- /dev/null
+++ b/benchmarks/logging/logging.py
@@ -0,0 +1,227 @@
+import datetime
+import json
+import logging
+from abc import ABC, abstractmethod
+from csv import DictWriter
+from enum import Enum
+from json import JSONDecodeError
+from typing import Type, TextIO, Iterable, Callable, Dict, Tuple, cast, Optional
+
+from pydantic import ValidationError, computed_field, Field
+
+from benchmarks.core.pydantic import SnakeCaseModel
+
+MARKER = ">>"
+
+logger = logging.getLogger(__name__)
+
+
+class LogEntry(SnakeCaseModel):
+    """
+    Base class for log entries. Built so that structured logs are easy to produce with the standard logging module;
+    e.g.:
+
+    >> logger = logging.getLogger(__name__)
+    >>
+    >> class DownloadEvent(LogEntry):
+    >>     file: str
+    >>     timestamp: datetime.datetime
+    >>     node: str
+    >>
+    >> logger.info(DownloadEvent(file='some_file.csv', timestamp=datetime.datetime.now(), node='node1'))
+    """
+
+    def __str__(self):
+        return f"{MARKER}{self.model_dump_json()}"
+
+    @computed_field  # type: ignore
+    @property
+    def entry_type(self) -> str:
+        return self.alias()
+
+    @classmethod
+    def adapt(cls, model: Type[SnakeCaseModel]) -> Type["AdaptedLogEntry"]:
+        """Adapts an existing Pydantic model to a LogEntry.
This is useful for when you have a model + that you want to log and later recover from logs using :class:`LogParser` or :class:`LogSplitter`.""" + + def adapt_instance(cls, data: SnakeCaseModel): + return cls.model_validate(data.model_dump()) + + def recover_instance(self): + return model.model_validate(self.model_dump()) + + adapted = type( + f"{model.__name__}LogEntry", + (LogEntry,), + { + "__annotations__": model.__annotations__, + "adapt_instance": classmethod(adapt_instance), + "recover_instance": recover_instance, + }, + ) + + return cast(Type[AdaptedLogEntry], adapted) + + +class AdaptedLogEntry(LogEntry, ABC): + @classmethod + @abstractmethod + def adapt_instance(cls, data: SnakeCaseModel) -> "AdaptedLogEntry": + pass + + @abstractmethod + def recover_instance(self) -> SnakeCaseModel: + pass + + +type Logs = Iterable[LogEntry] + + +class LogParser: + """:class:`LogParser` will pick up log entries from a stream and parse them into :class:`LogEntry` instances. + It works by trying to find a special marker (>>) in the log line, and then parsing the JSON that follows it. + This allows us to flexibly overlay structured logs on top of existing logging frameworks without having to + aggressively modify them.""" + + def __init__(self): + self.entry_types = {} + self.warn_counts = 10 + + def register(self, entry_type: Type[LogEntry]): + self.entry_types[entry_type.alias()] = entry_type + + def parse(self, log: Iterable[str]) -> Logs: + for line in log: + parsed = self.parse_single(line) + if not parsed: + continue + yield parsed + + def parse_single(self, line: str) -> Optional[LogEntry]: + marker_len = len(MARKER) + index = line.find(MARKER) + if index == -1: + return None + + type_tag = "" # just to calm down mypy + try: + # Should probably test this against a regex for the type tag to see which is faster. + json_line = json.loads(line[index + marker_len :]) + type_tag = json_line.get("entry_type") + if not type_tag or (type_tag not in self.entry_types): + return None + return self.entry_types[type_tag].model_validate(json_line) + except JSONDecodeError: + pass + except ValidationError as err: + # This is usually something we want to know about, as if the message has a type_tag + # that we know, then we should probably be able to parse it. + self.warn_counts -= 1 # avoid flooding everything with warnings + if self.warn_counts > 0: + logger.warning( + f"Schema failed for line with known type tag {type_tag}: {err}" + ) + elif self.warn_counts == 0: + logger.warning("Too many errors: suppressing further schema warnings.") + + return None + + +class LogSplitterFormats(Enum): + jsonl = "jsonl" + csv = "csv" + + +class LogSplitter: + """:class:`LogSplitter` will split parsed logs into different files based on the entry type. 
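+
+    A hypothetical usage sketch, following the ``>>`` example convention used above
+    (``MetricsEvent``, ``parser`` and ``log_stream`` stand in for objects defined
+    elsewhere):
+
+    >> splitter = LogSplitter(
+    >>     output_factory=lambda entry_type, fmt: open(f"{entry_type}.{fmt.value}", "w")
+    >> )
+    >> splitter.set_format(MetricsEvent, LogSplitterFormats.jsonl)
+    >> with splitter:
+    >>     splitter.split(parser.parse(log_stream))
+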
+    The output format can be set for each entry type."""
+
+    def __init__(
+        self,
+        output_factory: Callable[[str, LogSplitterFormats], TextIO],
+        output_entry_type: bool = False,
+    ) -> None:
+        self.output_factory = output_factory
+        self.outputs: Dict[str, Tuple[Callable[[LogEntry], None], TextIO]] = {}
+        self.formats: Dict[str, LogSplitterFormats] = {}
+        self.exclude = {"entry_type"} if not output_entry_type else set()
+
+    def set_format(self, entry_type: Type[LogEntry], output_format: LogSplitterFormats):
+        self.formats[entry_type.alias()] = output_format
+
+    def split(self, log: Iterable[LogEntry]):
+        for entry in log:
+            self.process_single(entry)
+
+    def process_single(self, entry: LogEntry):
+        write, _ = self.outputs.get(entry.entry_type, (None, None))
+
+        if write is None:
+            output_format = self.formats.get(entry.entry_type, LogSplitterFormats.csv)
+            output_stream = self.output_factory(entry.entry_type, output_format)
+
+            write = self._formatting_writer(entry, output_stream, output_format)
+            self.outputs[entry.entry_type] = write, output_stream
+
+        write(entry)
+
+    def _formatting_writer(
+        self, entry: LogEntry, output_stream: TextIO, output_format: LogSplitterFormats
+    ) -> Callable[[LogEntry], None]:
+        if output_format == LogSplitterFormats.csv:
+            writer = DictWriter(
+                output_stream, fieldnames=entry.model_dump(exclude=self.exclude).keys()
+            )
+            writer.writeheader()
+            return lambda x: writer.writerow(x.model_dump(exclude=self.exclude))
+
+        elif output_format == LogSplitterFormats.jsonl:
+
+            def write_jsonl(x: LogEntry):
+                output_stream.write(x.model_dump_json(exclude=self.exclude) + "\n")
+
+            return write_jsonl
+
+        else:
+            raise ValueError(f"Unknown output format: {output_format}")
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        for _, output in self.outputs.values():
+            output.close()
+
+
+type NodeId = str
+
+
+class Event(LogEntry):
+    node: NodeId
+    name: str  # XXX this ends up being redundant for custom event schemas... need to think of a better solution.
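+    # (entry_type already identifies the schema for subclasses, hence the redundancy)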
+ timestamp: datetime.datetime = Field( + default_factory=lambda: datetime.datetime.now(datetime.UTC) + ) + + +class Metric(Event): + value: int | float + + +class RequestEventType(Enum): + start = "start" + end = "end" + + +class RequestEvent(Event): + destination: NodeId + request_id: str + type: RequestEventType + + +def basic_log_parser() -> LogParser: + parser = LogParser() + parser.register(Event) + parser.register(Metric) + parser.register(RequestEvent) + return parser diff --git a/benchmarks/logging/sources.py b/benchmarks/logging/sources.py new file mode 100644 index 0000000..5fb2752 --- /dev/null +++ b/benchmarks/logging/sources.py @@ -0,0 +1,138 @@ +import json +from abc import ABC, abstractmethod +from collections.abc import Iterator +from contextlib import AbstractContextManager +from pathlib import Path +from typing import TextIO, Optional, Tuple, List, Dict, Type + +from benchmarks.logging.logging import ( + LogParser, + LogSplitter, + LogSplitterFormats, + LogEntry, +) + +RawLine = str +ExperimentId = str +NodeId = str + + +class LogSource(ABC): + """:class:`LogSource` knows how to retrieve logs for :class:`Identifiable` experiments, and can answer queries + about which experiments are present within it.""" + + @abstractmethod + def experiments(self, group_id: str) -> Iterator[str]: + pass + + @abstractmethod + def logs( + self, group_id: str, experiment_id: Optional[str] = None + ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]: + pass + + +class OutputManager(AbstractContextManager): + def open(self, relative_path: Path) -> TextIO: + if relative_path.is_absolute(): + raise ValueError(f"Path {relative_path} must be relative.") + return self._open(relative_path) + + @abstractmethod + def _open(self, relative_path: Path) -> TextIO: + pass + + +class FSOutputManager(OutputManager): + def __init__(self, root: Path) -> None: + self.root = root + self.open_files: List[TextIO] = [] + + def _open(self, relative_path: Path) -> TextIO: + fullpath = self.root / relative_path + parent = fullpath.parent + parent.mkdir(parents=True, exist_ok=True) + f = fullpath.open("w", encoding="utf-8") + self.open_files.append(f) + return f + + def __exit__(self, exc_type, exc_val, exc_tb): + for f in self.open_files: + try: + f.close() + except IOError: + pass + + +class VectorFlatFileSource(LogSource): + def __init__(self, file: TextIO, app_name: str): + self.file = file + self.app_name = app_name + + def experiments(self, group_id: str) -> Iterator[str]: + app_label = f'"app.kubernetes.io/name":"{self.app_name}"' + group_label = f'"app.kubernetes.io/part-of":"{group_id}"' + seen = set() + + self.file.seek(0) + for line in self.file: + if app_label not in line or group_label not in line: + continue + + parsed = json.loads(line) + experiment_id = parsed["kubernetes"]["pod_labels"][ + "app.kubernetes.io/instance" + ] + if experiment_id in seen: + continue + seen.add(experiment_id) + yield experiment_id + + def logs( + self, group_id: str, experiment_id: Optional[str] = None + ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]: + app_label = f'"app.kubernetes.io/name":"{self.app_name}"' + group_label = f'"app.kubernetes.io/part-of":"{group_id}"' + experiment_label = f'"app.kubernetes.io/instance":"{experiment_id}"' + + self.file.seek(0) + for line in self.file: + # Does a cheaper match to avoid parsing every line. 
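+            # Each line is one Vector JSON event; the shape assumed here (inferred
+            # from the tests -- real Vector output carries more fields) is roughly:
+            #   {"kubernetes": {"pod_labels": {"app.kubernetes.io/instance": "e1",
+            #                                  "app.kubernetes.io/name": "codex-benchmarks",
+            #                                  "app.kubernetes.io/part-of": "g1"},
+            #                   "pod_name": "p1"},
+            #    "message": "<raw log line>"}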
+            if app_label in line and group_label in line:
+                if experiment_id is not None and experiment_label not in line:
+                    continue
+                parsed = json.loads(line)
+                k8s = parsed["kubernetes"]
+                yield (
+                    k8s["pod_labels"]["app.kubernetes.io/instance"],
+                    k8s["pod_name"],
+                    parsed["message"],
+                )
+
+
+def split_logs_in_source(
+    log_source: LogSource,
+    log_parser: LogParser,
+    output_manager: OutputManager,
+    group_id: str,
+    formats: Optional[List[Tuple[Type[LogEntry], LogSplitterFormats]]] = None,
+) -> None:
+    splitters: Dict[str, LogSplitter] = {}
+    formats = formats if formats else []
+
+    for experiment_id, node_id, raw_line in log_source.logs(group_id):
+        splitter = splitters.get(experiment_id)
+        if splitter is None:
+            splitter = LogSplitter(
+                # Bind experiment_id eagerly: output files are opened lazily, so a
+                # late-binding closure could otherwise route this experiment's logs
+                # into whichever experiment the loop happens to be on at open time.
+                lambda event_type, ext, experiment_id=experiment_id: output_manager.open(
+                    Path(experiment_id) / f"{event_type}.{ext.value}"
+                )
+            )
+            for entry_type, output_format in formats:
+                splitter.set_format(entry_type, output_format)
+
+            splitters[experiment_id] = splitter
+
+        parsed = log_parser.parse_single(raw_line)
+        if parsed:
+            splitter.process_single(parsed)
diff --git a/benchmarks/logging/tests/__init__.py b/benchmarks/logging/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/benchmarks/logging/tests/test_logging.py b/benchmarks/logging/tests/test_logging.py
new file mode 100644
index 0000000..5eb2f59
--- /dev/null
+++ b/benchmarks/logging/tests/test_logging.py
@@ -0,0 +1,238 @@
+import datetime
+from collections import defaultdict
+from io import StringIO
+
+from benchmarks.logging.logging import (
+    LogEntry,
+    LogParser,
+    LogSplitter,
+    LogSplitterFormats,
+)
+from benchmarks.core.pydantic import SnakeCaseModel
+from benchmarks.tests.utils import compact
+
+
+class MetricsEvent(LogEntry):
+    name: str
+    timestamp: datetime.datetime
+    value: float
+    node: str
+
+
+def test_log_entry_should_serialize_to_expected_format():
+    event = MetricsEvent(
+        name="download",
+        timestamp=datetime.datetime(2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc),
+        value=0.245,
+        node="node1",
+    )
+
+    assert str(event) == (
+        '>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,'
+        '"node":"node1","entry_type":"metrics_event"}'
+    )
+
+
+def test_should_parse_logs():
+    log = StringIO("""
+    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,"node":"node1","entry_type":"metrics_event"}
+    [some garbage introduced by the log formatter: bla bla bla] -:>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
+    """)
+
+    parser = LogParser()
+    parser.register(MetricsEvent)
+
+    assert list(parser.parse(log)) == [
+        MetricsEvent(
+            name="download",
+            timestamp=datetime.datetime(
+                2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
+            ),
+            value=0.245,
+            node="node1",
+        ),
+        MetricsEvent(
+            name="download",
+            timestamp=datetime.datetime(
+                2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
+            ),
+            value=0.246,
+            node="node2",
+        ),
+    ]
+
+
+def test_should_skip_unparseable_lines():
+    log = StringIO("""
+    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,"node":"node0","entry_type":"metrics_event"
+    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
+    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node5","entry_type":"metcs_event"}
+    some random gibberish
+    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node3","entry_type":"metrics_event"}
+    """)
+
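+    # Only the node2 and node3 entries should survive: the first line has truncated
+    # JSON, the node5 line carries an unknown entry_type ("metcs_event"), and the
+    # plain-text line has no >> marker at all.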
parser = LogParser() + parser.register(MetricsEvent) + + assert list(parser.parse(log)) == [ + MetricsEvent( + name="download", + timestamp=datetime.datetime( + 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc + ), + value=0.246, + node="node2", + ), + MetricsEvent( + name="download", + timestamp=datetime.datetime( + 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc + ), + value=0.246, + node="node3", + ), + ] + + +class StateChangeEvent(LogEntry): + old: str + new: str + + +def test_should_recover_logged_events_at_parsing(mock_logger): + logger, output = mock_logger + + events = [ + StateChangeEvent(old="stopped", new="started"), + MetricsEvent( + name="download", + timestamp=datetime.datetime( + 2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc + ), + value=0.246, + node="node3", + ), + StateChangeEvent(old="started", new="stopped"), + ] + + for event in events: + logger.info(event) + + parser = LogParser() + parser.register(MetricsEvent) + parser.register(StateChangeEvent) + + assert list(parser.parse(StringIO(output.getvalue()))) == events + + +class SimpleEvent(LogEntry): + name: str + timestamp: datetime.datetime + + +class Person(LogEntry): + name: str + surname: str + + +def test_should_split_intertwined_logs_by_entry_type(): + log = StringIO(""" + >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"} + >>{"name":"start","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"} + >>{"name":"John","surname":"Doe","timestamp":"2021-01-01T00:00:00Z","entry_type":"person"} + >>{"name":"start2","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"} + """) + + parser = LogParser() + parser.register(MetricsEvent) + parser.register(SimpleEvent) + parser.register(Person) + + outputs = defaultdict(StringIO) + + splitter = LogSplitter( + output_factory=lambda entry_type, _: outputs[entry_type], + ) + + splitter.split(parser.parse(log)) + + assert compact(outputs["metrics_event"].getvalue()) == ( + compact(""" + name,timestamp,value,node + download,2021-01-01 00:00:00+00:00,0.246,node2 + """) + ) + + assert compact(outputs["simple_event"].getvalue()) == ( + compact(""" + name,timestamp + start,2021-01-01 00:00:00+00:00 + start2,2021-01-01 00:00:00+00:00 + """) + ) + + assert compact(outputs["person"].getvalue()) == ( + compact(""" + name,surname + John,Doe + """) + ) + + +class SomeConfig(SnakeCaseModel): + some_field: str + + +def test_should_adapt_existing_model_types_to_logging_types(): + SomeConfigLogEntry = LogEntry.adapt(SomeConfig) + + assert ( + str(SomeConfigLogEntry(some_field="value")) + == '>>{"some_field":"value","entry_type":"some_config_log_entry"}' + ) + + +def test_should_adapt_existing_model_instances_to_logging_instances(): + SomeConfigLogEntry = LogEntry.adapt(SomeConfig) + instance = SomeConfig(some_field="value") + + assert ( + str(SomeConfigLogEntry.adapt_instance(instance)) + == '>>{"some_field":"value","entry_type":"some_config_log_entry"}' + ) + + +def test_should_store_split_logs_as_jsonl_for_requested_types(): + log = StringIO(""" + >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"} + >>{"name":"start","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"} + >>{"name":"start2","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"} + """) + + parser = LogParser() + parser.register(MetricsEvent) + parser.register(SimpleEvent) + + outputs = defaultdict(StringIO) + + splitter = LogSplitter( + output_factory=lambda 
entry_type, _: outputs[entry_type],
+    )
+
+    splitter.set_format(SimpleEvent, LogSplitterFormats.jsonl)
+
+    splitter.split(parser.parse(log))
+
+    assert compact(outputs["metrics_event"].getvalue()) == (
+        compact("""
+        name,timestamp,value,node
+        download,2021-01-01 00:00:00+00:00,0.246,node2
+        """)
+    )
+
+    assert compact(outputs["simple_event"].getvalue()) == (
+        compact("""
+        {"name":"start","timestamp":"2021-01-01T00:00:00Z"}
+        {"name":"start2","timestamp":"2021-01-01T00:00:00Z"}
+        """)
+    )
diff --git a/benchmarks/logging/tests/test_sources.py b/benchmarks/logging/tests/test_sources.py
new file mode 100644
index 0000000..a454f81
--- /dev/null
+++ b/benchmarks/logging/tests/test_sources.py
@@ -0,0 +1,90 @@
+from io import StringIO
+
+from benchmarks.logging.sources import VectorFlatFileSource
+from benchmarks.tests.utils import make_jsonl
+
+EXPERIMENT_LOG = [
+    {
+        "kubernetes": {
+            "container_name": "deluge-experiment-runner",
+            "pod_labels": {
+                "app.kubernetes.io/instance": "e1",
+                "app.kubernetes.io/name": "codex-benchmarks",
+                "app.kubernetes.io/part-of": "g1736425800",
+            },
+            "pod_name": "p1",
+        },
+        "message": "m1",
+    },
+    {
+        "kubernetes": {
+            "container_name": "deluge-experiment-runner",
+            "pod_labels": {
+                "app.kubernetes.io/instance": "e1",
+                "app.kubernetes.io/name": "codex-benchmarks",
+                "app.kubernetes.io/part-of": "g1736425800",
+            },
+            "pod_name": "p2",
+        },
+        "message": "m2",
+    },
+    {
+        "kubernetes": {
+            "container_name": "deluge-experiment-runner",
+            "pod_labels": {
+                "app.kubernetes.io/instance": "e2",
+                "app.kubernetes.io/name": "codex-benchmarks",
+                "app.kubernetes.io/part-of": "g1736425800",
+            },
+            "pod_name": "p1",
+        },
+        "message": "m3",
+    },
+]
+
+
+def test_should_retrieve_events_for_specific_experiments():
+    extractor = VectorFlatFileSource(
+        StringIO(make_jsonl(EXPERIMENT_LOG)),
+        app_name="codex-benchmarks",
+    )
+
+    assert list(extractor.logs(group_id="g1736425800", experiment_id="e1")) == [
+        ("e1", "p1", "m1"),
+        ("e1", "p2", "m2"),
+    ]
+
+    assert list(extractor.logs(group_id="g1736425800", experiment_id="e2")) == [
+        ("e2", "p1", "m3"),
+    ]
+
+
+def test_should_return_empty_when_no_matching_experiment_exists():
+    extractor = VectorFlatFileSource(
+        StringIO(make_jsonl(EXPERIMENT_LOG)),
+        app_name="codex-benchmarks",
+    )
+
+    assert list(extractor.logs(group_id="g1736425800", experiment_id="e3")) == []
+
+
+def test_should_retrieve_events_for_an_entire_group():
+    extractor = VectorFlatFileSource(
+        StringIO(make_jsonl(EXPERIMENT_LOG)),
+        app_name="codex-benchmarks",
+    )
+
+    assert list(extractor.logs(group_id="g1736425800")) == [
+        ("e1", "p1", "m1"),
+        ("e1", "p2", "m2"),
+        ("e2", "p1", "m3"),
+    ]
+
+
+def test_should_return_all_existing_experiments_in_group():
+    extractor = VectorFlatFileSource(
+        StringIO(make_jsonl(EXPERIMENT_LOG)),
+        app_name="codex-benchmarks",
+    )
+
+    assert list(extractor.experiments("g1736425800")) == ["e1", "e2"]
diff --git a/benchmarks/logging/tests/test_split_logs_in_source.py b/benchmarks/logging/tests/test_split_logs_in_source.py
new file mode 100644
index 0000000..9b5005d
--- /dev/null
+++ b/benchmarks/logging/tests/test_split_logs_in_source.py
@@ -0,0 +1,137 @@
+import datetime
+from io import StringIO
+
+from benchmarks.logging.sources import (
+    VectorFlatFileSource,
+    OutputManager,
+    split_logs_in_source,
+)
+
+from benchmarks.logging.logging import LogEntry, LogParser
+from benchmarks.tests.utils import make_jsonl, compact
+
+EXPERIMENT_LOG = [
+    {
+        "kubernetes": {
+            "container_name": "deluge-experiment-runner",
+            "pod_labels": {
"app.kubernetes.io/instance": "e1", + "app.kubernetes.io/name": "codex-benchmarks", + "app.kubernetes.io/part-of": "g1736425800", + }, + "pod_name": "p1", + }, + "message": 'INFO >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,' + '"node":"node2","entry_type":"metrics_event"}', + }, + { + "kubernetes": { + "container_name": "deluge-experiment-runner", + "pod_labels": { + "app.kubernetes.io/instance": "e1", + "app.kubernetes.io/name": "codex-benchmarks", + "app.kubernetes.io/part-of": "g1736425800", + }, + "pod_name": "p1", + }, + "message": '>>{"name":"John","surname":"Doe","timestamp":"2021-01-01T00:00:00Z","entry_type":"person"}', + }, + { + "kubernetes": { + "container_name": "deluge-experiment-runner", + "pod_labels": { + "app.kubernetes.io/instance": "e1", + "app.kubernetes.io/name": "codex-benchmarks", + "app.kubernetes.io/part-of": "g1736425800", + }, + "pod_name": "p1", + }, + "message": "Useless line", + }, + { + "kubernetes": { + "container_name": "deluge-experiment-runner", + "pod_labels": { + "app.kubernetes.io/instance": "e2", + "app.kubernetes.io/name": "codex-benchmarks", + "app.kubernetes.io/part-of": "g1736425800", + }, + "pod_name": "p2", + }, + "message": 'INFO >>{"name":"download","timestamp":"2021-01-01T00:00:00Z",' + '"value":0.246,"node":"node3","entry_type":"metrics_event"}', + }, +] + + +class InMemoryOutputManager(OutputManager): + def __init__(self): + self.fs = {} + + def _open(self, relative_path): + root = self.fs + for element in relative_path.parts[:-1]: + subtree = root.get(element) + if subtree is None: + subtree = {} + root[element] = subtree + root = subtree + + output = StringIO() + root[relative_path.parts[-1]] = output + return output + + def __exit__(self, exc_type, exc_value, traceback, /): + pass + + +class MetricsEvent(LogEntry): + name: str + timestamp: datetime.datetime + value: float + node: str + + +class Person(LogEntry): + name: str + surname: str + + +def test_should_produce_logs_for_multiple_experiments(): + parser = LogParser() + parser.register(MetricsEvent) + parser.register(Person) + + outputs = InMemoryOutputManager() + + split_logs_in_source( + log_source=VectorFlatFileSource( + app_name="codex-benchmarks", + file=StringIO(make_jsonl(EXPERIMENT_LOG)), + ), + log_parser=parser, + output_manager=outputs, + group_id="g1736425800", + ) + + assert set(outputs.fs.keys()) == {"e1", "e2"} + assert compact(outputs.fs["e1"]["metrics_event.csv"].getvalue()) == ( + compact(""" + name,timestamp,value,node + download,2021-01-01 00:00:00+00:00,0.246,node2 + """) + ) + + assert compact(outputs.fs["e1"]["person.csv"].getvalue()) == ( + compact(""" + name,surname + John,Doe + """) + ) + + assert compact(outputs.fs["e2"]["metrics_event.csv"].getvalue()) == ( + compact(""" + name,timestamp,value,node + download,2021-01-01 00:00:00+00:00,0.246,node3 + """) + )