diff --git a/benchmarks/cli.py b/benchmarks/cli.py
index 736c28e..5691445 100644
--- a/benchmarks/cli.py
+++ b/benchmarks/cli.py
@@ -8,7 +8,7 @@ from pydantic_core import ValidationError
 
 from benchmarks.core.config import ConfigParser, ExperimentBuilder
 from benchmarks.core.experiments.experiments import Experiment
-from benchmarks.core.logging import (
+from benchmarks.logging.logging import (
     basic_log_parser,
     LogSplitter,
     LogEntry,
diff --git a/benchmarks/core/experiments/static_experiment.py b/benchmarks/core/experiments/static_experiment.py
index e440c81..d6da758 100644
--- a/benchmarks/core/experiments/static_experiment.py
+++ b/benchmarks/core/experiments/static_experiment.py
@@ -5,7 +5,7 @@ from typing import Sequence, Optional
 from typing_extensions import Generic, List, Tuple
 
 from benchmarks.core.experiments.experiments import ExperimentWithLifecycle
-from benchmarks.core.logging import RequestEvent, RequestEventType
+from benchmarks.logging.logging import RequestEvent, RequestEventType
 from benchmarks.core.network import (
     TInitialMetadata,
     TNetworkHandle,
@@ -82,7 +82,6 @@ class StaticDisseminationExperiment(
         logger.info("Download %d / %d completed", i + 1, len(downloads))
 
     def teardown(self, exception: Optional[Exception] = None):
-
         def _remove(element: Tuple[int, Node[TNetworkHandle, TInitialMetadata]]):
             index, node = element
             assert self._cid is not None  # to please mypy
diff --git a/benchmarks/core/experiments/tests/test_static_experiment.py b/benchmarks/core/experiments/tests/test_static_experiment.py
index ad22216..1f27ef0 100644
--- a/benchmarks/core/experiments/tests/test_static_experiment.py
+++ b/benchmarks/core/experiments/tests/test_static_experiment.py
@@ -6,7 +6,7 @@ from unittest.mock import patch
 
 from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
 from benchmarks.core.experiments.tests.utils import MockExperimentData
-from benchmarks.core.logging import LogParser, RequestEvent, RequestEventType
+from benchmarks.logging.logging import LogParser, RequestEvent, RequestEventType
 from benchmarks.core.network import Node, DownloadHandle
 
 
@@ -49,7 +49,9 @@ class MockNode(Node[MockHandle, str]):
         elif self.seeding is not None:
             assert self.seeding[0] == handle
         else:
-            raise Exception('Either leech or seed must be called before attempting a remove')
+            raise Exception(
+                "Either leech or seed must be called before attempting a remove"
+            )
 
         self.remove_was_called = True
 
diff --git a/benchmarks/core/logging.py b/benchmarks/core/logging.py
deleted file mode 100644
index bdb35d0..0000000
--- a/benchmarks/core/logging.py
+++ /dev/null
@@ -1,217 +0,0 @@
-import datetime
-import json
-import logging
-from abc import ABC, abstractmethod
-from csv import DictWriter
-from enum import Enum
-from json import JSONDecodeError
-from typing import Type, TextIO, Iterable, Callable, Dict, Tuple, cast
-
-from pydantic import ValidationError, computed_field, Field
-
-from benchmarks.core.pydantic import SnakeCaseModel
-
-MARKER = ">>"
-
-logger = logging.getLogger(__name__)
-
-
-class LogEntry(SnakeCaseModel):
-    """
-    Base class for log entries. Built so that structured logs are easy to produce with the standard logging module;
-    e.g.:
-
-    >> logging.getLogger(__name__)
-    >>
-    >> class DownloadEvent(LogEntry):
-    >>    file: str
-    >>    timestamp: datetime.datetime
-    >>    node: str
-    >>
-    >> logger.info(DownloadEvent(file='some_file.csv', timestamp=datetime.datetime.now(), node='node1'))
-    """
-
-    def __str__(self):
-        return f"{MARKER}{self.model_dump_json()}"
-
-    @computed_field  # type: ignore
-    @property
-    def entry_type(self) -> str:
-        return self.alias()
-
-    @classmethod
-    def adapt(cls, model: Type[SnakeCaseModel]) -> Type["AdaptedLogEntry"]:
-        """Adapts an existing Pydantic model to a LogEntry. This is useful for when you have a model
-        that you want to log and later recover from logs using :class:`LogParser` or :class:`LogSplitter`."""
-
-        def adapt_instance(cls, data: SnakeCaseModel):
-            return cls.model_validate(data.model_dump())
-
-        def recover_instance(self):
-            return model.model_validate(self.model_dump())
-
-        adapted = type(
-            f"{model.__name__}LogEntry",
-            (LogEntry,),
-            {
-                "__annotations__": model.__annotations__,
-                "adapt_instance": classmethod(adapt_instance),
-                "recover_instance": recover_instance,
-            },
-        )
-
-        return cast(Type[AdaptedLogEntry], adapted)
-
-
-class AdaptedLogEntry(LogEntry, ABC):
-    @classmethod
-    @abstractmethod
-    def adapt_instance(cls, data: SnakeCaseModel) -> "AdaptedLogEntry":
-        pass
-
-    @abstractmethod
-    def recover_instance(self) -> SnakeCaseModel:
-        pass
-
-
-class LogParser:
-    """:class:`LogParser` will pick up log entries from a stream and parse them into :class:`LogEntry` instances.
-    It works by trying to find a special marker (>>>) in the log line, and then parsing the JSON that follows it.
-    This allows us to flexibly overlay structured logs on top of existing logging frameworks without having to
-    aggressively modify them."""
-
-    def __init__(self):
-        self.entry_types = {}
-        self.warn_counts = 10
-
-    def register(self, entry_type: Type[LogEntry]):
-        self.entry_types[entry_type.alias()] = entry_type
-
-    def parse(self, log: TextIO) -> Iterable[LogEntry]:
-        marker_len = len(MARKER)
-        for line in log:
-            index = line.find(MARKER)
-            if index == -1:
-                continue
-
-            type_tag = ""  # just to calm down mypy
-            try:
-                # Should probably test this against a regex for the type tag to see which is faster.
-                json_line = json.loads(line[index + marker_len :])
-                type_tag = json_line.get("entry_type")
-                if not type_tag or (type_tag not in self.entry_types):
-                    continue
-                yield self.entry_types[type_tag].model_validate(json_line)
-            except JSONDecodeError:
-                pass
-            except ValidationError as err:
-                # This is usually something we want to know about, as if the message has a type_tag
-                # that we know, then we should probably be able to parse it.
-                self.warn_counts -= 1  # avoid flooding everything with warnings
-                if self.warn_counts > 0:
-                    logger.warning(
-                        f"Schema failed for line with known type tag {type_tag}: {err}"
-                    )
-                elif self.warn_counts == 0:
-                    logger.warning(
-                        "Too many errors: suppressing further schema warnings."
-                    )
-
-
-class LogSplitterFormats(Enum):
-    jsonl = "jsonl"
-    csv = "csv"
-
-
-class LogSplitter:
-    """:class:`LogSplitter` will split parsed logs into different files based on the entry type.
-    The output format can be set for each entry type."""
-
-    def __init__(
-        self,
-        output_factory=Callable[[str, LogSplitterFormats], TextIO],
-        output_entry_type=False,
-    ) -> None:
-        self.output_factory = output_factory
-        self.outputs: Dict[str, Tuple[Callable[[LogEntry], None], TextIO]] = {}
-        self.formats: Dict[str, LogSplitterFormats] = {}
-        self.exclude = {"entry_type"} if not output_entry_type else set()
-
-    def set_format(self, entry_type: Type[LogEntry], output_format: LogSplitterFormats):
-        self.formats[entry_type.alias()] = output_format
-
-    def split(self, log: Iterable[LogEntry]):
-        for entry in log:
-            write, _ = self.outputs.get(entry.entry_type, (None, None))
-
-            if write is None:
-                output_format = self.formats.get(
-                    entry.entry_type, LogSplitterFormats.csv
-                )
-                output_stream = self.output_factory(entry.entry_type, output_format)
-
-                write = self._formatting_writer(entry, output_stream, output_format)
-                self.outputs[entry.entry_type] = write, output_stream
-
-            write(entry)
-
-    def _formatting_writer(
-        self, entry: LogEntry, output_stream: TextIO, output_format: LogSplitterFormats
-    ) -> Callable[[LogEntry], None]:
-        if output_format == LogSplitterFormats.csv:
-            writer = DictWriter(
-                output_stream, fieldnames=entry.model_dump(exclude=self.exclude).keys()
-            )
-            writer.writeheader()
-            return lambda x: writer.writerow(x.model_dump(exclude=self.exclude))
-
-        elif output_format == LogSplitterFormats.jsonl:
-
-            def write_jsonl(x: LogEntry):
-                output_stream.write(x.model_dump_json(exclude=self.exclude) + "\n")
-
-            return write_jsonl
-
-        else:
-            raise ValueError(f"Unknown output format: {output_format}")
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        for _, output in self.outputs.values():
-            output.close()
-
-
-type NodeId = str
-
-
-class Event(LogEntry):
-    node: NodeId
-    name: str  # XXX this ends up being redundant for custom event schemas... need to think of a better solution.
-    timestamp: datetime.datetime = Field(
-        default_factory=lambda: datetime.datetime.now(datetime.UTC)
-    )
-
-
-class Metric(Event):
-    value: int | float
-
-
-class RequestEventType(Enum):
-    start = "start"
-    end = "end"
-
-
-class RequestEvent(Event):
-    destination: NodeId
-    request_id: str
-    type: RequestEventType
-
-
-def basic_log_parser() -> LogParser:
-    parser = LogParser()
-    parser.register(Event)
-    parser.register(Metric)
-    parser.register(RequestEvent)
-    return parser
diff --git a/benchmarks/core/tests/test_logging.py b/benchmarks/core/tests/test_logging.py
deleted file mode 100644
index 88fe801..0000000
--- a/benchmarks/core/tests/test_logging.py
+++ /dev/null
@@ -1,233 +0,0 @@
-import datetime
-from collections import defaultdict
-from io import StringIO
-
-from benchmarks.core.logging import LogEntry, LogParser, LogSplitter, LogSplitterFormats
-from benchmarks.core.pydantic import SnakeCaseModel
-from benchmarks.tests.utils import compact
-
-
-class MetricsEvent(LogEntry):
-    name: str
-    timestamp: datetime.datetime
-    value: float
-    node: str
-
-
-def test_log_entry_should_serialize_to_expected_format():
-    event = MetricsEvent(
-        name="download",
-        timestamp=datetime.datetime(2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc),
-        value=0.245,
-        node="node1",
-    )
-
-    assert str(event) == (
-        '>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,'
-        '"node":"node1","entry_type":"metrics_event"}'
-    )
-
-
-def test_should_parse_logs():
-    log = StringIO("""
-    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,"node":"node1","entry_type":"metrics_event"}
-    [some garbage introduced by the log formatter: bla bla bla]
-:>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
-    """)
-
-    parser = LogParser()
-    parser.register(MetricsEvent)
-
-    assert list(parser.parse(log)) == [
-        MetricsEvent(
-            name="download",
-            timestamp=datetime.datetime(
-                2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
-            ),
-            value=0.245,
-            node="node1",
-        ),
-        MetricsEvent(
-            name="download",
-            timestamp=datetime.datetime(
-                2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
-            ),
-            value=0.246,
-            node="node2",
-        ),
-    ]
-
-
-def test_should_skip_unparseable_lines():
-    log = StringIO("""
-    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,"node":"node0","entry_type":"metrics_event"
-    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
-    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node5","entry_type":"metcs_event"}
-    some random gibberish
-    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node3","entry_type":"metrics_event"}
-    """)
-
-    parser = LogParser()
-    parser.register(MetricsEvent)
-
-    assert list(parser.parse(log)) == [
-        MetricsEvent(
-            name="download",
-            timestamp=datetime.datetime(
-                2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
-            ),
-            value=0.246,
-            node="node2",
-        ),
-        MetricsEvent(
-            name="download",
-            timestamp=datetime.datetime(
-                2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
-            ),
-            value=0.246,
-            node="node3",
-        ),
-    ]
-
-
-class StateChangeEvent(LogEntry):
-    old: str
-    new: str
-
-
-def test_should_recover_logged_events_at_parsing(mock_logger):
-    logger, output = mock_logger
-
-    events = [
-        StateChangeEvent(old="stopped", new="started"),
-        MetricsEvent(
-            name="download",
-            timestamp=datetime.datetime(
-                2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
-            ),
-            value=0.246,
-            node="node3",
-        ),
-        StateChangeEvent(old="started", new="stopped"),
-    ]
-
-    for event in events:
-        logger.info(event)
-
-    parser = LogParser()
-    parser.register(MetricsEvent)
-    parser.register(StateChangeEvent)
-
-    assert list(parser.parse(StringIO(output.getvalue()))) == events
-
-
-class SimpleEvent(LogEntry):
-    name: str
-    timestamp: datetime.datetime
-
-
-class Person(LogEntry):
-    name: str
-    surname: str
-
-
-def test_should_split_intertwined_logs_by_entry_type():
-    log = StringIO("""
-    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
-    >>{"name":"start","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
-    >>{"name":"John","surname":"Doe","timestamp":"2021-01-01T00:00:00Z","entry_type":"person"}
-    >>{"name":"start2","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
-    """)
-
-    parser = LogParser()
-    parser.register(MetricsEvent)
-    parser.register(SimpleEvent)
-    parser.register(Person)
-
-    outputs = defaultdict(StringIO)
-
-    splitter = LogSplitter(
-        output_factory=lambda entry_type, _: outputs[entry_type],
-    )
-
-    splitter.split(parser.parse(log))
-
-    assert compact(outputs["metrics_event"].getvalue()) == (
-        compact("""
-        name,timestamp,value,node
-        download,2021-01-01 00:00:00+00:00,0.246,node2
-        """)
-    )
-
-    assert compact(outputs["simple_event"].getvalue()) == (
-        compact("""
-        name,timestamp
-        start,2021-01-01 00:00:00+00:00
-        start2,2021-01-01 00:00:00+00:00
-        """)
-    )
-
-    assert compact(outputs["person"].getvalue()) == (
-        compact("""
-        name,surname
-        John,Doe
-        """)
-    )
-
-
-class SomeConfig(SnakeCaseModel):
-    some_field: str
-
-
-def test_should_adapt_existing_model_types_to_logging_types():
-    SomeConfigLogEntry = LogEntry.adapt(SomeConfig)
-
-    assert (
-        str(SomeConfigLogEntry(some_field="value"))
-        == '>>{"some_field":"value","entry_type":"some_config_log_entry"}'
-    )
-
-
-def test_should_adapt_existing_model_instances_to_logging_instances():
-    SomeConfigLogEntry = LogEntry.adapt(SomeConfig)
-    instance = SomeConfig(some_field="value")
-
-    assert (
-        str(SomeConfigLogEntry.adapt_instance(instance))
-        == '>>{"some_field":"value","entry_type":"some_config_log_entry"}'
-    )
-
-
-def test_should_store_split_logs_as_jsonl_for_requested_types():
-    log = StringIO("""
-    >>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
-    >>{"name":"start","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
-    >>{"name":"start2","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
-    """)
-
-    parser = LogParser()
-    parser.register(MetricsEvent)
-    parser.register(SimpleEvent)
-
-    outputs = defaultdict(StringIO)
-
-    splitter = LogSplitter(
-        output_factory=lambda entry_type, _: outputs[entry_type],
-    )
-
-    splitter.set_format(SimpleEvent, LogSplitterFormats.jsonl)
-
-    splitter.split(parser.parse(log))
-
-    assert compact(outputs["metrics_event"].getvalue()) == (
-        compact("""
-        name,timestamp,value,node
-        download,2021-01-01 00:00:00+00:00,0.246,node2
-        """)
-    )
-
-    assert compact(outputs["simple_event"].getvalue()) == (
-        compact("""
-        {"name":"start","timestamp":"2021-01-01T00:00:00Z"}
-        {"name":"start2","timestamp":"2021-01-01T00:00:00Z"}
-        """)
-    )
diff --git a/benchmarks/deluge/logging.py b/benchmarks/deluge/logging.py
index 6c324c0..0c226c8 100644
--- a/benchmarks/deluge/logging.py
+++ b/benchmarks/deluge/logging.py
@@ -1,4 +1,4 @@
-from benchmarks.core.logging import Metric
+from benchmarks.logging.logging import Metric
 
 
 class DelugeTorrentDownload(Metric):
diff --git a/benchmarks/tests/utils.py b/benchmarks/tests/utils.py
index e3add7d..f6512f8 100644
--- a/benchmarks/tests/utils.py
+++ b/benchmarks/tests/utils.py
@@ -1,4 +1,6 @@
+import json
 from pathlib import Path
+from typing import List, Dict, Any
 
 
 def shared_volume() -> Path:
@@ -7,3 +9,7 @@ def shared_volume() -> Path:
 
 def compact(a_string: str) -> str:
     return "\n".join([line.strip() for line in a_string.splitlines() if line.strip()])
+
+
+def make_jsonl(content: List[Dict[str, Any]]) -> str:
+    return "\n".join([json.dumps(line, separators=(",", ":")) for line in content])