dump and parse experiment config as logs

This commit is contained in:
gmega 2024-12-12 11:45:30 -03:00
parent edfcf68cf9
commit d716af5d8b
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
3 changed files with 163 additions and 22 deletions

View File

@ -1,4 +1,5 @@
import argparse import argparse
import logging
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Dict from typing import Dict
@ -7,7 +8,7 @@ from pydantic_core import ValidationError
from benchmarks.core.config import ConfigParser, ExperimentBuilder from benchmarks.core.config import ConfigParser, ExperimentBuilder
from benchmarks.core.experiments.experiments import Experiment from benchmarks.core.experiments.experiments import Experiment
from benchmarks.core.logging import basic_log_parser, LogSplitter from benchmarks.core.logging import basic_log_parser, LogSplitter, LogEntry
from benchmarks.deluge.config import DelugeExperimentConfig from benchmarks.deluge.config import DelugeExperimentConfig
from benchmarks.deluge.logging import DelugeTorrentDownload from benchmarks.deluge.logging import DelugeTorrentDownload
@ -17,6 +18,11 @@ config_parser.register(DelugeExperimentConfig)
log_parser = basic_log_parser() log_parser = basic_log_parser()
log_parser.register(DelugeTorrentDownload) log_parser.register(DelugeTorrentDownload)
DECLogEntry = LogEntry.adapt(DelugeExperimentConfig)
log_parser.register(DECLogEntry)
logger = logging.getLogger(__name__)
def cmd_list(experiments: Dict[str, ExperimentBuilder[Experiment]], _): def cmd_list(experiments: Dict[str, ExperimentBuilder[Experiment]], _):
print(f'Available experiments are:') print(f'Available experiments are:')
@ -28,7 +34,10 @@ def cmd_run(experiments: Dict[str, ExperimentBuilder[Experiment]], args):
if args.experiment not in experiments: if args.experiment not in experiments:
print(f'Experiment {args.experiment} not found.') print(f'Experiment {args.experiment} not found.')
sys.exit(-1) sys.exit(-1)
experiments[args.experiment].build().run()
experiment = experiments[args.experiment]
logger.info(DECLogEntry.adapt_instance(experiment))
experiment.build().run()
def cmd_describe(args): def cmd_describe(args):

View File

@ -1,10 +1,11 @@
import datetime import datetime
import json import json
import logging import logging
from abc import ABC, abstractmethod
from csv import DictWriter from csv import DictWriter
from enum import Enum from enum import Enum
from json import JSONDecodeError from json import JSONDecodeError
from typing import Type, TextIO, Iterable, Callable, Dict, Tuple from typing import Type, TextIO, Iterable, Callable, Dict, Tuple, cast, Any
from pydantic import ValidationError, computed_field, Field from pydantic import ValidationError, computed_field, Field
@ -16,21 +17,74 @@ logger = logging.getLogger(__name__)
class LogEntry(SnakeCaseModel): class LogEntry(SnakeCaseModel):
"""
Base class for log entries. Built so that structured logs are easy to produce with the standard logging module;
e.g.:
>> logging.getLogger(__name__)
>>
>> class DownloadEvent(LogEntry):
>> file: str
>> timestamp: datetime.datetime
>> node: str
>>
>> logger.info(DownloadEvent(file='some_file.csv', timestamp=datetime.datetime.now(), node='node1'))
"""
def __str__(self): def __str__(self):
return f"{MARKER}{self.model_dump_json()}" return f"{MARKER}{self.model_dump_json()}"
@computed_field # type: ignore @computed_field # type: ignore
@property @property
def entry_type(self) -> str: def entry_type(self) -> str:
return self.alias() return self.alias()
@classmethod
def adapt(cls, model: Type[SnakeCaseModel]) -> Type['AdaptedLogEntry']:
"""Adapts an existing Pydantic model to a LogEntry. This is useful for when you have a model
that you want to log and later recover from logs using :class:`LogParser` or :class:`LogSplitter`."""
def adapt_instance(cls, data: SnakeCaseModel):
return cls.model_validate(data.model_dump())
def recover_instance(self):
return model.model_validate(self.model_dump())
adapted = type(
f'{model.__name__}LogEntry',
(LogEntry,),
{
'__annotations__': model.__annotations__,
'adapt_instance': classmethod(adapt_instance),
'recover_instance': recover_instance,
}
)
return cast(Type[AdaptedLogEntry], adapted)
class AdaptedLogEntry(LogEntry, ABC):
@classmethod
@abstractmethod
def adapt_instance(cls, data: SnakeCaseModel) -> 'AdaptedLogEntry':
pass
@abstractmethod
def recover_instance(self) -> SnakeCaseModel:
pass
class LogParser: class LogParser:
""":class:`LogParser` will pick up log entries from a stream and parse them into :class:`LogEntry` instances.
It works by trying to find a special marker (>>>) in the log line, and then parsing the JSON that follows it.
This allows us to flexibly overlay structured logs on top of existing logging frameworks without having to
aggressively modify them."""
def __init__(self): def __init__(self):
self.entry_types = {} self.entry_types = {}
self.warn_counts = 10 self.warn_counts = 10
def register(self, entry_type: Type[SnakeCaseModel]): def register(self, entry_type: Type[LogEntry]):
self.entry_types[entry_type.alias()] = entry_type self.entry_types[entry_type.alias()] = entry_type
def parse(self, log: TextIO) -> Iterable[LogEntry]: def parse(self, log: TextIO) -> Iterable[LogEntry]:
@ -60,29 +114,55 @@ class LogParser:
logger.warning("Too many errors: suppressing further schema warnings.") logger.warning("Too many errors: suppressing further schema warnings.")
class LogSplitterFormats(Enum):
jsonl = 'jsonl'
csv = 'csv'
class LogSplitter: class LogSplitter:
""":class:`LogSplitter` will split parsed logs into different files based on the entry type.
The output format can be set for each entry type."""
def __init__(self, output_factory=Callable[[str], TextIO], output_entry_type=False) -> None: def __init__(self, output_factory=Callable[[str], TextIO], output_entry_type=False) -> None:
self.output_factory = output_factory self.output_factory = output_factory
self.dump = ( self.outputs: Dict[str, Tuple[Callable[[LogEntry], None], TextIO]] = {}
(lambda model: model.model_dump()) self.formats: Dict[str, LogSplitterFormats] = {}
if output_entry_type self.exclude = {'entry_type'} if not output_entry_type else set()
else (lambda model: model.model_dump(exclude={'entry_type'}))
)
self.outputs: Dict[str, Tuple[DictWriter, TextIO]] = {} def set_format(self, entry_type: Type[LogEntry], output_format: LogSplitterFormats):
self.formats[entry_type.alias()] = output_format
def split(self, log: Iterable[LogEntry]): def split(self, log: Iterable[LogEntry]):
for entry in log: for entry in log:
writer, _ = self.outputs.get(entry.entry_type, (None, None)) write, _ = self.outputs.get(entry.entry_type, (None, None))
entry_dict = self.dump(entry)
if writer is None: if write is None:
output = self.output_factory(entry.entry_type) output_stream = self.output_factory(entry.entry_type)
writer = DictWriter(output, fieldnames=entry_dict.keys()) output_format = self.formats.get(entry.entry_type, LogSplitterFormats.csv)
self.outputs[entry.entry_type] = writer, output write = self._formatting_writer(entry, output_stream, output_format)
writer.writeheader() self.outputs[entry.entry_type] = write, output_stream
writer.writerow(entry_dict) write(entry)
def _formatting_writer(
self,
entry: LogEntry,
output_stream: TextIO,
output_format: LogSplitterFormats
) -> Callable[[LogEntry], None]:
if output_format == LogSplitterFormats.csv:
writer = DictWriter(output_stream, fieldnames=entry.model_dump(exclude=self.exclude).keys())
writer.writeheader()
return lambda x: writer.writerow(x.model_dump(exclude=self.exclude))
elif output_format == LogSplitterFormats.jsonl:
def write_jsonl(x: LogEntry):
output_stream.write(x.model_dump_json(exclude=self.exclude) + '\n')
return write_jsonl
else:
raise ValueError(f"Unknown output format: {output_format}")
def __enter__(self): def __enter__(self):
return self return self

View File

@ -2,7 +2,8 @@ import datetime
from collections import defaultdict from collections import defaultdict
from io import StringIO from io import StringIO
from benchmarks.core.logging import LogEntry, LogParser, LogSplitter from benchmarks.core.logging import LogEntry, LogParser, LogSplitter, LogSplitterFormats
from benchmarks.core.pydantic import SnakeCaseModel
from benchmarks.tests.utils import compact from benchmarks.tests.utils import compact
@ -94,7 +95,7 @@ class StateChangeEvent(LogEntry):
new: str new: str
def test_should_log_events_correctly(mock_logger): def test_should_recover_logged_events_at_parsing(mock_logger):
logger, output = mock_logger logger, output = mock_logger
events = [ events = [
@ -146,7 +147,7 @@ def test_should_split_intertwined_logs_by_entry_type():
outputs = defaultdict(StringIO) outputs = defaultdict(StringIO)
splitter = LogSplitter( splitter = LogSplitter(
output_factory=lambda entry_type: outputs[entry_type], output_factory=lambda entry_type: outputs[entry_type],
) )
splitter.split(parser.parse(log)) splitter.split(parser.parse(log))
@ -166,3 +167,54 @@ def test_should_split_intertwined_logs_by_entry_type():
name,surname name,surname
John,Doe John,Doe
""")) """))
class SomeConfig(SnakeCaseModel):
some_field: str
def test_should_adapt_existing_model_types_to_logging_types():
SomeConfigLogEntry = LogEntry.adapt(SomeConfig)
assert (str(SomeConfigLogEntry(some_field='value')) ==
'>>{"some_field":"value","entry_type":"some_config_log_entry"}')
def test_should_adapt_existing_model_instances_to_logging_instances():
SomeConfigLogEntry = LogEntry.adapt(SomeConfig)
instance = SomeConfig(some_field='value')
assert (str(SomeConfigLogEntry.adapt_instance(instance)) ==
'>>{"some_field":"value","entry_type":"some_config_log_entry"}')
def test_should_store_split_logs_as_jsonl_for_requested_types():
log = StringIO("""
>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
>>{"name":"start","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
>>{"name":"start2","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
""")
parser = LogParser()
parser.register(MetricsEvent)
parser.register(SimpleEvent)
outputs = defaultdict(StringIO)
splitter = LogSplitter(
output_factory=lambda entry_type: outputs[entry_type],
)
splitter.set_format(SimpleEvent, LogSplitterFormats.jsonl)
splitter.split(parser.parse(log))
assert compact(outputs['metrics_event'].getvalue()) == (compact("""
name,timestamp,value,node
download,2021-01-01 00:00:00+00:00,0.246,node2
"""))
assert compact(outputs['simple_event'].getvalue()) == (compact("""
{"name":"start","timestamp":"2021-01-01T00:00:00Z"}
{"name":"start2","timestamp":"2021-01-01T00:00:00Z"}
"""))