feat: add experiment group log splitter

gmega 2025-01-09 16:47:37 -03:00
parent bfabd1c4c8
commit 347cbb942e
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
7 changed files with 13 additions and 456 deletions
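Apart from removing the old module and its tests, the hunks below are one-line import updates from the old path to the new logging package; a representative before/after pair (taken from the test imports in this diff):

from benchmarks.core.logging import LogParser, RequestEvent, RequestEventType
from benchmarks.logging.logging import LogParser, RequestEvent, RequestEventType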

View File

@@ -8,7 +8,7 @@ from pydantic_core import ValidationError
from benchmarks.core.config import ConfigParser, ExperimentBuilder
from benchmarks.core.experiments.experiments import Experiment
from benchmarks.core.logging import (
from benchmarks.logging.logging import (
basic_log_parser,
LogSplitter,
LogEntry,

View File

@@ -5,7 +5,7 @@ from typing import Sequence, Optional
from typing_extensions import Generic, List, Tuple
from benchmarks.core.experiments.experiments import ExperimentWithLifecycle
from benchmarks.core.logging import RequestEvent, RequestEventType
from benchmarks.logging.logging import RequestEvent, RequestEventType
from benchmarks.core.network import (
TInitialMetadata,
TNetworkHandle,
@@ -82,7 +82,6 @@ class StaticDisseminationExperiment(
logger.info("Download %d / %d completed", i + 1, len(downloads))
def teardown(self, exception: Optional[Exception] = None):
def _remove(element: Tuple[int, Node[TNetworkHandle, TInitialMetadata]]):
index, node = element
assert self._cid is not None # to please mypy

View File

@@ -6,7 +6,7 @@ from unittest.mock import patch
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.experiments.tests.utils import MockExperimentData
from benchmarks.core.logging import LogParser, RequestEvent, RequestEventType
from benchmarks.logging.logging import LogParser, RequestEvent, RequestEventType
from benchmarks.core.network import Node, DownloadHandle
@@ -49,7 +49,9 @@ class MockNode(Node[MockHandle, str]):
elif self.seeding is not None:
assert self.seeding[0] == handle
else:
raise Exception('Either leech or seed must be called before attempting a remove')
raise Exception(
"Either leech or seed must be called before attempting a remove"
)
self.remove_was_called = True

View File

@@ -1,217 +0,0 @@
import datetime
import json
import logging
from abc import ABC, abstractmethod
from csv import DictWriter
from enum import Enum
from json import JSONDecodeError
from typing import Type, TextIO, Iterable, Callable, Dict, Tuple, cast
from pydantic import ValidationError, computed_field, Field
from benchmarks.core.pydantic import SnakeCaseModel
MARKER = ">>"
logger = logging.getLogger(__name__)
class LogEntry(SnakeCaseModel):
"""
Base class for log entries. Built so that structured logs are easy to produce with the standard logging module;
e.g.:
>> logger = logging.getLogger(__name__)
>>
>> class DownloadEvent(LogEntry):
>> file: str
>> timestamp: datetime.datetime
>> node: str
>>
>> logger.info(DownloadEvent(file='some_file.csv', timestamp=datetime.datetime.now(), node='node1'))
"""
def __str__(self):
return f"{MARKER}{self.model_dump_json()}"
@computed_field # type: ignore
@property
def entry_type(self) -> str:
return self.alias()
@classmethod
def adapt(cls, model: Type[SnakeCaseModel]) -> Type["AdaptedLogEntry"]:
"""Adapts an existing Pydantic model to a LogEntry. This is useful for when you have a model
that you want to log and later recover from logs using :class:`LogParser` or :class:`LogSplitter`."""
def adapt_instance(cls, data: SnakeCaseModel):
return cls.model_validate(data.model_dump())
def recover_instance(self):
return model.model_validate(self.model_dump())
adapted = type(
f"{model.__name__}LogEntry",
(LogEntry,),
{
"__annotations__": model.__annotations__,
"adapt_instance": classmethod(adapt_instance),
"recover_instance": recover_instance,
},
)
return cast(Type[AdaptedLogEntry], adapted)
class AdaptedLogEntry(LogEntry, ABC):
@classmethod
@abstractmethod
def adapt_instance(cls, data: SnakeCaseModel) -> "AdaptedLogEntry":
pass
@abstractmethod
def recover_instance(self) -> SnakeCaseModel:
pass
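# A minimal usage sketch for adapt(), not part of the original module; `SomeConfig`
# is a hypothetical model, mirroring the tests removed in this commit.
def _adapt_example() -> None:
    class SomeConfig(SnakeCaseModel):
        some_field: str

    SomeConfigLogEntry = LogEntry.adapt(SomeConfig)
    entry = SomeConfigLogEntry.adapt_instance(SomeConfig(some_field="value"))
    # The adapted entry serializes with the log marker and a snake_case entry_type tag.
    assert str(entry) == '>>{"some_field":"value","entry_type":"some_config_log_entry"}'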
class LogParser:
""":class:`LogParser` will pick up log entries from a stream and parse them into :class:`LogEntry` instances.
It works by trying to find a special marker (>>>) in the log line, and then parsing the JSON that follows it.
This allows us to flexibly overlay structured logs on top of existing logging frameworks without having to
aggressively modify them."""
def __init__(self):
self.entry_types = {}
self.warn_counts = 10
def register(self, entry_type: Type[LogEntry]):
self.entry_types[entry_type.alias()] = entry_type
def parse(self, log: TextIO) -> Iterable[LogEntry]:
marker_len = len(MARKER)
for line in log:
index = line.find(MARKER)
if index == -1:
continue
type_tag = "" # just to calm down mypy
try:
# Should probably test this against a regex for the type tag to see which is faster.
json_line = json.loads(line[index + marker_len :])
type_tag = json_line.get("entry_type")
if not type_tag or (type_tag not in self.entry_types):
continue
yield self.entry_types[type_tag].model_validate(json_line)
except JSONDecodeError:
pass
except ValidationError as err:
# This is usually something we want to know about: if the message has a type_tag
# that we know, we should probably be able to parse it.
self.warn_counts -= 1 # avoid flooding everything with warnings
if self.warn_counts > 0:
logger.warning(
f"Schema failed for line with known type tag {type_tag}: {err}"
)
elif self.warn_counts == 0:
logger.warning(
"Too many errors: suppressing further schema warnings."
)
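# A minimal usage sketch for LogParser, not part of the original module; `DownloadMetric`
# is a hypothetical entry type. Only registered types are yielded; everything else is skipped.
def _parse_example() -> None:
    from io import StringIO

    class DownloadMetric(LogEntry):
        node: str
        value: float

    parser = LogParser()
    parser.register(DownloadMetric)
    raw = StringIO(
        '[formatter noise] >>{"node":"node1","value":0.5,"entry_type":"download_metric"}\n'
        "lines without the marker, or with unknown entry types, are skipped\n"
    )
    assert list(parser.parse(raw)) == [DownloadMetric(node="node1", value=0.5)]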
class LogSplitterFormats(Enum):
jsonl = "jsonl"
csv = "csv"
class LogSplitter:
""":class:`LogSplitter` will split parsed logs into different files based on the entry type.
The output format can be set for each entry type."""
def __init__(
self,
output_factory: Callable[[str, LogSplitterFormats], TextIO],
output_entry_type: bool = False,
) -> None:
self.output_factory = output_factory
self.outputs: Dict[str, Tuple[Callable[[LogEntry], None], TextIO]] = {}
self.formats: Dict[str, LogSplitterFormats] = {}
self.exclude = {"entry_type"} if not output_entry_type else set()
def set_format(self, entry_type: Type[LogEntry], output_format: LogSplitterFormats):
self.formats[entry_type.alias()] = output_format
def split(self, log: Iterable[LogEntry]):
for entry in log:
write, _ = self.outputs.get(entry.entry_type, (None, None))
if write is None:
output_format = self.formats.get(
entry.entry_type, LogSplitterFormats.csv
)
output_stream = self.output_factory(entry.entry_type, output_format)
write = self._formatting_writer(entry, output_stream, output_format)
self.outputs[entry.entry_type] = write, output_stream
write(entry)
def _formatting_writer(
self, entry: LogEntry, output_stream: TextIO, output_format: LogSplitterFormats
) -> Callable[[LogEntry], None]:
if output_format == LogSplitterFormats.csv:
writer = DictWriter(
output_stream, fieldnames=entry.model_dump(exclude=self.exclude).keys()
)
writer.writeheader()
return lambda x: writer.writerow(x.model_dump(exclude=self.exclude))
elif output_format == LogSplitterFormats.jsonl:
def write_jsonl(x: LogEntry):
output_stream.write(x.model_dump_json(exclude=self.exclude) + "\n")
return write_jsonl
else:
raise ValueError(f"Unknown output format: {output_format}")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
for _, output in self.outputs.values():
output.close()
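# A minimal usage sketch for LogSplitter, not part of the original module. The
# defaultdict(StringIO) output factory mirrors the tests removed in this commit;
# entry types default to CSV unless overridden with set_format.
def _split_example(parser: LogParser, log: TextIO) -> Dict[str, TextIO]:
    from collections import defaultdict
    from io import StringIO

    outputs: Dict[str, TextIO] = defaultdict(StringIO)
    splitter = LogSplitter(output_factory=lambda entry_type, _: outputs[entry_type])
    # splitter.set_format(SomeEntryType, LogSplitterFormats.jsonl)  # optional per-type override
    splitter.split(parser.parse(log))
    return outputs  # e.g. outputs["metrics_event"].getvalue() would hold a CSV table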
type NodeId = str
class Event(LogEntry):
node: NodeId
name: str # XXX this ends up being redundant for custom event schemas... need to think of a better solution.
timestamp: datetime.datetime = Field(
default_factory=lambda: datetime.datetime.now(datetime.UTC)
)
class Metric(Event):
value: int | float
class RequestEventType(Enum):
start = "start"
end = "end"
class RequestEvent(Event):
destination: NodeId
request_id: str
type: RequestEventType
def basic_log_parser() -> LogParser:
parser = LogParser()
parser.register(Event)
parser.register(Metric)
parser.register(RequestEvent)
return parser
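# A minimal end-to-end sketch, not part of the original module: emitting a RequestEvent
# through the standard logging module and recovering it with basic_log_parser. The
# handler setup and field values here are assumptions for illustration only.
def _request_event_example() -> None:
    from io import StringIO

    buffer = StringIO()
    example_logger = logging.getLogger("request_event_example")
    example_logger.addHandler(logging.StreamHandler(buffer))
    example_logger.setLevel(logging.INFO)

    example_logger.info(
        RequestEvent(
            node="node1",
            name="leech",
            destination="node2",
            request_id="request-1",
            type=RequestEventType.start,
        )
    )

    parsed = list(basic_log_parser().parse(StringIO(buffer.getvalue())))
    assert isinstance(parsed[0], RequestEvent) and parsed[0].destination == "node2"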

View File

@@ -1,233 +0,0 @@
import datetime
from collections import defaultdict
from io import StringIO
from benchmarks.core.logging import LogEntry, LogParser, LogSplitter, LogSplitterFormats
from benchmarks.core.pydantic import SnakeCaseModel
from benchmarks.tests.utils import compact
class MetricsEvent(LogEntry):
name: str
timestamp: datetime.datetime
value: float
node: str
def test_log_entry_should_serialize_to_expected_format():
event = MetricsEvent(
name="download",
timestamp=datetime.datetime(2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc),
value=0.245,
node="node1",
)
assert str(event) == (
'>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,'
'"node":"node1","entry_type":"metrics_event"}'
)
def test_should_parse_logs():
log = StringIO("""
>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,"node":"node1","entry_type":"metrics_event"}
[some garbage introduced by the log formatter: bla bla bla] -:>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
""")
parser = LogParser()
parser.register(MetricsEvent)
assert list(parser.parse(log)) == [
MetricsEvent(
name="download",
timestamp=datetime.datetime(
2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
),
value=0.245,
node="node1",
),
MetricsEvent(
name="download",
timestamp=datetime.datetime(
2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
),
value=0.246,
node="node2",
),
]
def test_should_skip_unparseable_lines():
log = StringIO("""
>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.245,"node":"node0","entry_type":"metrics_event"
>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node5","entry_type":"metcs_event"}
some random gibberish
>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node3","entry_type":"metrics_event"}
""")
parser = LogParser()
parser.register(MetricsEvent)
assert list(parser.parse(log)) == [
MetricsEvent(
name="download",
timestamp=datetime.datetime(
2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
),
value=0.246,
node="node2",
),
MetricsEvent(
name="download",
timestamp=datetime.datetime(
2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
),
value=0.246,
node="node3",
),
]
class StateChangeEvent(LogEntry):
old: str
new: str
def test_should_recover_logged_events_at_parsing(mock_logger):
logger, output = mock_logger
events = [
StateChangeEvent(old="stopped", new="started"),
MetricsEvent(
name="download",
timestamp=datetime.datetime(
2021, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc
),
value=0.246,
node="node3",
),
StateChangeEvent(old="started", new="stopped"),
]
for event in events:
logger.info(event)
parser = LogParser()
parser.register(MetricsEvent)
parser.register(StateChangeEvent)
assert list(parser.parse(StringIO(output.getvalue()))) == events
class SimpleEvent(LogEntry):
name: str
timestamp: datetime.datetime
class Person(LogEntry):
name: str
surname: str
def test_should_split_intertwined_logs_by_entry_type():
log = StringIO("""
>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
>>{"name":"start","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
>>{"name":"John","surname":"Doe","timestamp":"2021-01-01T00:00:00Z","entry_type":"person"}
>>{"name":"start2","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
""")
parser = LogParser()
parser.register(MetricsEvent)
parser.register(SimpleEvent)
parser.register(Person)
outputs = defaultdict(StringIO)
splitter = LogSplitter(
output_factory=lambda entry_type, _: outputs[entry_type],
)
splitter.split(parser.parse(log))
assert compact(outputs["metrics_event"].getvalue()) == (
compact("""
name,timestamp,value,node
download,2021-01-01 00:00:00+00:00,0.246,node2
""")
)
assert compact(outputs["simple_event"].getvalue()) == (
compact("""
name,timestamp
start,2021-01-01 00:00:00+00:00
start2,2021-01-01 00:00:00+00:00
""")
)
assert compact(outputs["person"].getvalue()) == (
compact("""
name,surname
John,Doe
""")
)
class SomeConfig(SnakeCaseModel):
some_field: str
def test_should_adapt_existing_model_types_to_logging_types():
SomeConfigLogEntry = LogEntry.adapt(SomeConfig)
assert (
str(SomeConfigLogEntry(some_field="value"))
== '>>{"some_field":"value","entry_type":"some_config_log_entry"}'
)
def test_should_adapt_existing_model_instances_to_logging_instances():
SomeConfigLogEntry = LogEntry.adapt(SomeConfig)
instance = SomeConfig(some_field="value")
assert (
str(SomeConfigLogEntry.adapt_instance(instance))
== '>>{"some_field":"value","entry_type":"some_config_log_entry"}'
)
def test_should_store_split_logs_as_jsonl_for_requested_types():
log = StringIO("""
>>{"name":"download","timestamp":"2021-01-01T00:00:00Z","value":0.246,"node":"node2","entry_type":"metrics_event"}
>>{"name":"start","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
>>{"name":"start2","timestamp":"2021-01-01T00:00:00Z","entry_type":"simple_event"}
""")
parser = LogParser()
parser.register(MetricsEvent)
parser.register(SimpleEvent)
outputs = defaultdict(StringIO)
splitter = LogSplitter(
output_factory=lambda entry_type, _: outputs[entry_type],
)
splitter.set_format(SimpleEvent, LogSplitterFormats.jsonl)
splitter.split(parser.parse(log))
assert compact(outputs["metrics_event"].getvalue()) == (
compact("""
name,timestamp,value,node
download,2021-01-01 00:00:00+00:00,0.246,node2
""")
)
assert compact(outputs["simple_event"].getvalue()) == (
compact("""
{"name":"start","timestamp":"2021-01-01T00:00:00Z"}
{"name":"start2","timestamp":"2021-01-01T00:00:00Z"}
""")
)

View File

@@ -1,4 +1,4 @@
from benchmarks.core.logging import Metric
from benchmarks.logging.logging import Metric
class DelugeTorrentDownload(Metric):

View File

@@ -1,4 +1,6 @@
import json
from pathlib import Path
from typing import List, Dict, Any
def shared_volume() -> Path:
@@ -7,3 +9,7 @@ def shared_volume() -> Path:
def compact(a_string: str) -> str:
return "\n".join([line.strip() for line in a_string.splitlines() if line.strip()])
def make_jsonl(content: List[Dict[str, Any]]) -> str:
return "\n".join([json.dumps(line, separators=(",", ":")) for line in content])