import datetime
import json
import logging
from abc import ABC, abstractmethod
from csv import DictWriter
from enum import Enum
from json import JSONDecodeError
from typing import Type, TextIO, Iterable, Callable, Dict, Tuple, cast

from pydantic import ValidationError, computed_field, Field

from benchmarks.core.pydantic import SnakeCaseModel

MARKER = ">>"

logger = logging.getLogger(__name__)


class LogEntry(SnakeCaseModel):
"""
Base class for log entries. Built so that structured logs are easy to produce with the standard logging module;
e.g.:
    >> logger = logging.getLogger(__name__)
>>
>> class DownloadEvent(LogEntry):
>> file: str
>> timestamp: datetime.datetime
>> node: str
>>
>> logger.info(DownloadEvent(file='some_file.csv', timestamp=datetime.datetime.now(), node='node1'))
"""

    def __str__(self):
return f"{MARKER}{self.model_dump_json()}"

    @computed_field  # type: ignore
@property
def entry_type(self) -> str:
return self.alias()

    @classmethod
def adapt(cls, model: Type[SnakeCaseModel]) -> Type["AdaptedLogEntry"]:
"""Adapts an existing Pydantic model to a LogEntry. This is useful for when you have a model
that you want to log and later recover from logs using :class:`LogParser` or :class:`LogSplitter`."""

        def adapt_instance(cls, data: SnakeCaseModel):
            return cls.model_validate(data.model_dump())

        def recover_instance(self):
            return model.model_validate(self.model_dump())

        adapted = type(
f"{model.__name__}LogEntry",
(LogEntry,),
{
"__annotations__": model.__annotations__,
"adapt_instance": classmethod(adapt_instance),
"recover_instance": recover_instance,
},
)
return cast(Type[AdaptedLogEntry], adapted)
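

# Illustrative sketch (not from the original module): adapting an existing
# SnakeCaseModel so it can be logged and later recovered from the log. The
# ``PeerCount`` model and its field values are hypothetical.
#
#   class PeerCount(SnakeCaseModel):
#       node: str
#       count: int
#
#   PeerCountLogEntry = LogEntry.adapt(PeerCount)
#   entry = PeerCountLogEntry.adapt_instance(PeerCount(node="node1", count=3))
#   logger.info(entry)  # rendered as ">>{...}" by LogEntry.__str__
#   original = entry.recover_instance()  # back to a plain PeerCount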


class AdaptedLogEntry(LogEntry, ABC):
@classmethod
@abstractmethod
def adapt_instance(cls, data: SnakeCaseModel) -> "AdaptedLogEntry":
pass

    @abstractmethod
def recover_instance(self) -> SnakeCaseModel:
pass


class LogParser:
    """:class:`LogParser` picks up log entries from a stream and parses them into :class:`LogEntry` instances.
    It works by looking for the special marker (">>") in a log line and then parsing the JSON that follows it.
    This allows us to flexibly overlay structured logs on top of existing logging frameworks without having to
    aggressively modify them."""

    def __init__(self):
        self.entry_types: Dict[str, Type[LogEntry]] = {}
        # Number of schema-mismatch warnings to emit before going quiet (see parse()).
        self.warn_counts = 10

    def register(self, entry_type: Type[LogEntry]):
self.entry_types[entry_type.alias()] = entry_type

    def parse(self, log: TextIO) -> Iterable[LogEntry]:
marker_len = len(MARKER)
for line in log:
index = line.find(MARKER)
if index == -1:
continue
            type_tag = ""  # just to calm down mypy
try:
# Should probably test this against a regex for the type tag to see which is faster.
json_line = json.loads(line[index + marker_len :])
type_tag = json_line.get("entry_type")
if not type_tag or (type_tag not in self.entry_types):
continue
yield self.entry_types[type_tag].model_validate(json_line)
except JSONDecodeError:
pass
            except ValidationError as err:
                # This is usually something we want to know about: if the message has a
                # type tag we recognize, we should normally be able to parse it.
                self.warn_counts -= 1  # avoid flooding the output with warnings
if self.warn_counts > 0:
                    logger.warning(
                        f"Schema validation failed for line with known type tag {type_tag}: {err}"
                    )
elif self.warn_counts == 0:
logger.warning(
"Too many errors: suppressing further schema warnings."
)
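

# Illustrative sketch (not from the original module): recovering entries from a
# log stream. Only registered entry types are yielded; anything on the line
# before the ">>" marker (timestamps, level names, etc.) is ignored. This
# assumes SnakeCaseModel.alias() maps ``Metric`` to "metric".
#
#   import io
#
#   parser = LogParser()
#   parser.register(Metric)
#   stream = io.StringIO(
#       'INFO >>{"entry_type": "metric", "name": "downloads", "node": "node1",'
#       ' "value": 3, "timestamp": "2024-01-01T00:00:00Z"}\n'
#   )
#   entries = list(parser.parse(stream))  # -> [Metric(name="downloads", ...)]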


class LogSplitterFormats(Enum):
jsonl = "jsonl"
csv = "csv"


class LogSplitter:
    """:class:`LogSplitter` will split parsed logs into different files based on the entry type.
    The output format can be set for each entry type."""

    def __init__(
        self,
        output_factory: Callable[[str, LogSplitterFormats], TextIO],
        output_entry_type: bool = False,
    ) -> None:
        self.output_factory = output_factory
        self.outputs: Dict[str, Tuple[Callable[[LogEntry], None], TextIO]] = {}
        self.formats: Dict[str, LogSplitterFormats] = {}
        self.exclude = {"entry_type"} if not output_entry_type else set()

    def set_format(
        self, entry_type: Type[LogEntry], output_format: LogSplitterFormats
    ):
self.formats[entry_type.alias()] = output_format

    def split(self, log: Iterable[LogEntry]):
for entry in log:
write, _ = self.outputs.get(entry.entry_type, (None, None))
if write is None:
output_format = self.formats.get(
entry.entry_type, LogSplitterFormats.csv
)
output_stream = self.output_factory(entry.entry_type, output_format)
write = self._formatting_writer(entry, output_stream, output_format)
self.outputs[entry.entry_type] = write, output_stream
write(entry)

    def _formatting_writer(
self, entry: LogEntry, output_stream: TextIO, output_format: LogSplitterFormats
) -> Callable[[LogEntry], None]:
if output_format == LogSplitterFormats.csv:
writer = DictWriter(
output_stream, fieldnames=entry.model_dump(exclude=self.exclude).keys()
)
writer.writeheader()
return lambda x: writer.writerow(x.model_dump(exclude=self.exclude))
        elif output_format == LogSplitterFormats.jsonl:

            def write_jsonl(x: LogEntry):
                output_stream.write(x.model_dump_json(exclude=self.exclude) + "\n")

            return write_jsonl
else:
raise ValueError(f"Unknown output format: {output_format}")

    def __enter__(self):
return self

    def __exit__(self, exc_type, exc_val, exc_tb):
for _, output in self.outputs.values():
output.close()
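

# Illustrative sketch (not from the original module): splitting parsed entries
# into one file per entry type. The output directory and the file naming scheme
# are arbitrary choices for the example.
#
#   from pathlib import Path
#
#   def split_log(log_file: Path, out_dir: Path) -> None:
#       parser = basic_log_parser()
#       with LogSplitter(
#           output_factory=lambda entry_type, fmt: (
#               out_dir / f"{entry_type}.{fmt.value}"
#           ).open("w", encoding="utf-8")
#       ) as splitter:
#           splitter.set_format(RequestEvent, LogSplitterFormats.jsonl)
#           with log_file.open(encoding="utf-8") as log:
#               splitter.split(parser.parse(log))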


type NodeId = str


class Event(LogEntry):
node: NodeId
    name: str  # XXX this ends up being redundant for custom event schemas... need to think of a better solution.
timestamp: datetime.datetime = Field(
default_factory=lambda: datetime.datetime.now(datetime.UTC)
)


class Metric(Event):
value: int | float


class RequestEventType(Enum):
start = "start"
end = "end"


class RequestEvent(Event):
destination: NodeId
request_id: str
type: RequestEventType
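

# Illustrative sketch (not from the original module): bracketing a request with
# a start/end RequestEvent pair. Node names and the request id are made up.
#
#   logger.info(RequestEvent(node="node1", destination="node2", name="download",
#                            request_id="req-1", type=RequestEventType.start))
#   ...  # perform the request
#   logger.info(RequestEvent(node="node1", destination="node2", name="download",
#                            request_id="req-1", type=RequestEventType.end))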


def basic_log_parser() -> LogParser:
parser = LogParser()
parser.register(Event)
parser.register(Metric)
parser.register(RequestEvent)
return parser
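

# Illustrative end-to-end sketch (not from the original module): entries logged
# through the standard logging machinery survive ordinary formatters, because
# LogParser only looks at what follows the ">>" marker.
#
#   import io
#
#   buffer = io.StringIO()
#   handler = logging.StreamHandler(buffer)
#   handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
#   demo_logger = logging.getLogger("demo")
#   demo_logger.addHandler(handler)
#   demo_logger.setLevel(logging.INFO)
#
#   demo_logger.info(Metric(node="node1", name="downloads", value=3))
#
#   buffer.seek(0)
#   recovered = list(basic_log_parser().parse(buffer))  # -> [Metric(...)]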