diff --git a/benchmarks/cli.py b/benchmarks/cli.py
index 8bb8b56..c6611ed 100644
--- a/benchmarks/cli.py
+++ b/benchmarks/cli.py
@@ -1,4 +1,5 @@
 import argparse
+import logging
 import sys
 from pathlib import Path
 from typing import Dict
@@ -7,7 +8,7 @@ from pydantic_core import ValidationError
 
 from benchmarks.core.config import ConfigParser, ExperimentBuilder
 from benchmarks.core.experiments.experiments import Experiment
-from benchmarks.core.logging import basic_log_parser, LogSplitter
+from benchmarks.core.logging import basic_log_parser, LogSplitter, LogEntry
 from benchmarks.deluge.config import DelugeExperimentConfig
 from benchmarks.deluge.logging import DelugeTorrentDownload
 
@@ -17,6 +18,11 @@ config_parser.register(DelugeExperimentConfig)
 log_parser = basic_log_parser()
 log_parser.register(DelugeTorrentDownload)
 
+DECLogEntry = LogEntry.adapt(DelugeExperimentConfig)
+log_parser.register(DECLogEntry)
+
+logger = logging.getLogger(__name__)
+
 
 def cmd_list(experiments: Dict[str, ExperimentBuilder[Experiment]], _):
     print(f'Available experiments are:')
@@ -28,7 +34,10 @@ def cmd_run(experiments: Dict[str, ExperimentBuilder[Experiment]], args):
     if args.experiment not in experiments:
         print(f'Experiment {args.experiment} not found.')
         sys.exit(-1)
-    experiments[args.experiment].build().run()
+
+    experiment = experiments[args.experiment]
+    logger.info(DECLogEntry.adapt_instance(experiment))
+    experiment.build().run()
 
 
 def cmd_describe(args):
diff --git a/benchmarks/core/logging.py b/benchmarks/core/logging.py
index a1e19b2..296ed2a 100644
--- a/benchmarks/core/logging.py
+++ b/benchmarks/core/logging.py
@@ -1,10 +1,11 @@
 import datetime
 import json
 import logging
+from abc import ABC, abstractmethod
 from csv import DictWriter
 from enum import Enum
 from json import JSONDecodeError
-from typing import Type, TextIO, Iterable, Callable, Dict, Tuple
+from typing import Type, TextIO, Iterable, Callable, Dict, Tuple, cast, Any
 
 from pydantic import ValidationError, computed_field, Field
 
@@ -16,21 +17,74 @@ logger = logging.getLogger(__name__)
 
 
 class LogEntry(SnakeCaseModel):
+    """
+    Base class for log entries. Built so that structured logs are easy to produce with the standard logging module;
+    e.g.:
+
+    >> logger = logging.getLogger(__name__)
+    >>
+    >> class DownloadEvent(LogEntry):
+    >>     file: str
+    >>     timestamp: datetime.datetime
+    >>     node: str
+    >>
+    >> logger.info(DownloadEvent(file='some_file.csv', timestamp=datetime.datetime.now(), node='node1'))
+    """
+
    def __str__(self):
        return f"{MARKER}{self.model_dump_json()}"

-    @computed_field # type: ignore
+    @computed_field  # type: ignore
    @property
    def entry_type(self) -> str:
        return self.alias()

+    @classmethod
+    def adapt(cls, model: Type[SnakeCaseModel]) -> Type['AdaptedLogEntry']:
+        """Adapts an existing Pydantic model to a LogEntry. This is useful when you have a model
+        that you want to log and later recover from logs using :class:`LogParser` or :class:`LogSplitter`."""
+
+        def adapt_instance(cls, data: SnakeCaseModel):
+            return cls.model_validate(data.model_dump())
+
+        def recover_instance(self):
+            return model.model_validate(self.model_dump())
+
+        adapted = type(
+            f'{model.__name__}LogEntry',
+            (LogEntry,),
+            {
+                '__annotations__': model.__annotations__,
+                'adapt_instance': classmethod(adapt_instance),
+                'recover_instance': recover_instance,
+            }
+        )
+
+        return cast(Type[AdaptedLogEntry], adapted)
+
+
+class AdaptedLogEntry(LogEntry, ABC):
+
+    @classmethod
+    @abstractmethod
+    def adapt_instance(cls, data: SnakeCaseModel) -> 'AdaptedLogEntry':
+        pass
+
+    @abstractmethod
+    def recover_instance(self) -> SnakeCaseModel:
+        pass
 
 
 class LogParser:
+    """:class:`LogParser` picks up log entries from a stream and parses them into :class:`LogEntry` instances.
+    It works by looking for a special marker (>>) in each log line and parsing the JSON that follows it.
+    This lets us overlay structured logs on top of existing logging frameworks without having to modify
+    them aggressively."""
+
    def __init__(self):
        self.entry_types = {}
        self.warn_counts = 10

-    def register(self, entry_type: Type[SnakeCaseModel]):
+    def register(self, entry_type: Type[LogEntry]):
        self.entry_types[entry_type.alias()] = entry_type

    def parse(self, log: TextIO) -> Iterable[LogEntry]:
@@ -60,29 +114,55 @@ class LogParser:
                    logger.warning("Too many errors: suppressing further schema warnings.")
 
 
+class LogSplitterFormats(Enum):
+    jsonl = 'jsonl'
+    csv = 'csv'
+
+
 class LogSplitter:
+    """:class:`LogSplitter` splits parsed logs into different files based on the entry type. The output
+    format can be set per entry type with :meth:`set_format`; the default is CSV."""
+
    def __init__(self, output_factory=Callable[[str], TextIO], output_entry_type=False) -> None:
        self.output_factory = output_factory
-        self.dump = (
-            (lambda model: model.model_dump())
-            if output_entry_type
-            else (lambda model: model.model_dump(exclude={'entry_type'}))
-        )
+        self.outputs: Dict[str, Tuple[Callable[[LogEntry], None], TextIO]] = {}
+        self.formats: Dict[str, LogSplitterFormats] = {}
+        self.exclude = {'entry_type'} if not output_entry_type else set()

-        self.outputs: Dict[str, Tuple[DictWriter, TextIO]] = {}
+    def set_format(self, entry_type: Type[LogEntry], output_format: LogSplitterFormats):
+        self.formats[entry_type.alias()] = output_format

    def split(self, log: Iterable[LogEntry]):
        for entry in log:
-            writer, _ = self.outputs.get(entry.entry_type, (None, None))
-            entry_dict = self.dump(entry)
+            write, _ = self.outputs.get(entry.entry_type, (None, None))

-            if writer is None:
-                output = self.output_factory(entry.entry_type)
-                writer = DictWriter(output, fieldnames=entry_dict.keys())
-                self.outputs[entry.entry_type] = writer, output
-                writer.writeheader()
+            if write is None:
+                output_stream = self.output_factory(entry.entry_type)
+                output_format = self.formats.get(entry.entry_type, LogSplitterFormats.csv)
+                write = self._formatting_writer(entry, output_stream, output_format)
+                self.outputs[entry.entry_type] = write, output_stream

-            writer.writerow(entry_dict)
+            write(entry)
+
+    def _formatting_writer(
+            self,
+            entry: LogEntry,
+            output_stream: TextIO,
+            output_format: LogSplitterFormats
+    ) -> Callable[[LogEntry], None]:
+        if output_format == LogSplitterFormats.csv:
+            writer = DictWriter(output_stream, fieldnames=entry.model_dump(exclude=self.exclude).keys())
+            writer.writeheader()
+            return lambda x: writer.writerow(x.model_dump(exclude=self.exclude))
+
+        elif output_format == LogSplitterFormats.jsonl:
+            def write_jsonl(x: LogEntry):
+                output_stream.write(x.model_dump_json(exclude=self.exclude) + '\n')
+
+            return write_jsonl
+
+        else:
+            raise ValueError(f"Unknown output format: {output_format}")
 
    def __enter__(self):
        return self
diff --git a/benchmarks/core/tests/test_logging.py b/benchmarks/core/tests/test_logging.py
index eeecd31..50441a3 100644
--- a/benchmarks/core/tests/test_logging.py
+++ b/benchmarks/core/tests/test_logging.py
@@ -2,7 +2,8 @@ import datetime
 from collections import defaultdict
 from io import StringIO
 
-from benchmarks.core.logging import LogEntry, LogParser, LogSplitter
+from benchmarks.core.logging import LogEntry, LogParser, LogSplitter, LogSplitterFormats
+from benchmarks.core.pydantic import SnakeCaseModel
 from benchmarks.tests.utils import compact
 
 
@@ -94,7 +95,7 @@ class StateChangeEvent(LogEntry):
    new: str
 
 
-def test_should_log_events_correctly(mock_logger):
+def test_should_recover_logged_events_at_parsing(mock_logger):
    logger, output = mock_logger

    events = [
@@ -146,7 +147,7 @@ def test_should_split_intertwined_logs_by_entry_type():
    outputs = defaultdict(StringIO)

    splitter = LogSplitter(
-        output_factory=lambda entry_type: outputs[entry_type],
+        output_factory=lambda entry_type: outputs[entry_type],
    )

    splitter.split(parser.parse(log))
@@ -166,3 +167,54 @@ def test_should_split_intertwined_logs_by_entry_type():
            name,surname
            John,Doe
        """))
+
+
+class SomeConfig(SnakeCaseModel):
+    some_field: str
+
+
+def test_should_adapt_existing_model_types_to_logging_types():
+    SomeConfigLogEntry = LogEntry.adapt(SomeConfig)
+
+    assert (str(SomeConfigLogEntry(some_field='value')) ==
+            '>>{"some_field":"value","entry_type":"some_config_log_entry"}')
+
+
+def test_should_adapt_existing_model_instances_to_logging_instances():
+    SomeConfigLogEntry = LogEntry.adapt(SomeConfig)
+    instance = SomeConfig(some_field='value')
+
+    assert (str(SomeConfigLogEntry.adapt_instance(instance)) ==
+            '>>{"some_field":"value","entry_type":"some_config_log_entry"}')
+
+
+def test_should_store_split_logs_as_jsonl_for_requested_types():
+    log = StringIO("""
+        >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
+        >>{"name":"start","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
+        >>{"name":"start2","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
+    """)
+
+    parser = LogParser()
+    parser.register(MetricsEvent)
+    parser.register(SimpleEvent)
+
+    outputs = defaultdict(StringIO)
+
+    splitter = LogSplitter(
+        output_factory=lambda entry_type: outputs[entry_type],
+    )
+
+    splitter.set_format(SimpleEvent, LogSplitterFormats.jsonl)
+
+    splitter.split(parser.parse(log))
+
+    assert compact(outputs['metrics_event'].getvalue()) == (compact("""
+        name,timestamp,value,node
+        download,2021-01-01 00:00:00+00:00,0.246,node2
+    """))
+
+    assert compact(outputs['simple_event'].getvalue()) == (compact("""
+        {"name":"start","timestamp":"2021-01-01T00:00:00Z"}
+        {"name":"start2","timestamp":"2021-01-01T00:00:00Z"}
+    """))
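
Usage sketch (reviewer note, not part of the diff): the snippet below shows how the new LogEntry.adapt, AdaptedLogEntry.recover_instance, and LogSplitter.set_format APIs compose end to end. ExperimentConfig and its field are hypothetical stand-ins, and the round-trip through recover_instance assumes the source model tolerates the extra entry_type key present in the adapted entry's dump.

    # A minimal sketch under the assumptions above; only APIs introduced in this diff are used.
    from io import StringIO

    from benchmarks.core.logging import LogEntry, LogParser, LogSplitter, LogSplitterFormats
    from benchmarks.core.pydantic import SnakeCaseModel


    class ExperimentConfig(SnakeCaseModel):  # hypothetical config model
        seeders: int


    # Derive a LogEntry subclass from the existing model and wrap an instance.
    ExperimentConfigLogEntry = LogEntry.adapt(ExperimentConfig)
    config = ExperimentConfig(seeders=3)
    entry = ExperimentConfigLogEntry.adapt_instance(config)

    # __str__ renders the marker plus JSON, ready to hand to the stdlib logger:
    # >>{"seeders":3,"entry_type":"experiment_config_log_entry"}
    print(entry)

    # Parse the rendered line back, splitting this entry type out as JSONL rather than CSV.
    parser = LogParser()
    parser.register(ExperimentConfigLogEntry)

    outputs = {}
    splitter = LogSplitter(
        output_factory=lambda entry_type: outputs.setdefault(entry_type, StringIO()),
    )
    splitter.set_format(ExperimentConfigLogEntry, LogSplitterFormats.jsonl)
    splitter.split(parser.parse(StringIO(str(entry))))

    # The JSONL writer drops the synthetic entry_type field: {"seeders":3}
    print(outputs['experiment_config_log_entry'].getvalue())

    # recover_instance maps the adapted entry back onto the original model type
    # (assumes extra keys are ignored during validation).
    assert entry.recover_instance() == config

Note that the per-type writer is chosen lazily when the first entry of a type arrives, so set_format must be called before split for the format to take effect.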