2025-01-09 20:32:41 -03:00

139 lines
4.3 KiB
Python

import json
from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextlib import AbstractContextManager
from pathlib import Path
from typing import TextIO, Optional, Tuple, List, Dict, Type
from benchmarks.logging.logging import (
LogParser,
LogSplitter,
LogSplitterFormats,
LogEntry,
)
# Plain-string aliases that name the three elements of the tuples yielded by
# ``LogSource.logs``; they exist for readability only and add no validation.

# A single log line as text, before it is handed to a log parser.
RawLine = str
# Identifier of one experiment inside an experiment group.
ExperimentId = str
# Identifier of the node that emitted a log line.
NodeId = str
class LogSource(ABC):
    """Abstract provider of raw experiment logs.

    A :class:`LogSource` knows how to retrieve logs for experiments within
    experiment groups, and can answer queries about which experiment ids make
    up a given group.
    """

    @abstractmethod
    def experiments(self, group_id: str) -> Iterator[str]:
        """Iterate over the ids of the experiments that belong to ``group_id``."""

    @abstractmethod
    def logs(
        self, group_id: str, experiment_id: Optional[str] = None
    ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
        """Iterate over ``(experiment id, node id, raw line)`` tuples for ``group_id``.

        ``experiment_id`` is optional; implementations are expected to restrict
        the output to that single experiment when it is provided, and to cover
        the whole group when it is ``None``.
        """
class OutputManager(AbstractContextManager):
    """Context manager that hands out text streams addressed by relative path.

    Subclasses decide where the streams actually live by implementing
    :meth:`_open`; callers go through :meth:`open`, which validates the path.
    """

    def open(self, relative_path: Path) -> TextIO:
        """Return a text stream for ``relative_path``.

        Raises:
            ValueError: if ``relative_path`` is an absolute path.
        """
        if not relative_path.is_absolute():
            return self._open(relative_path)
        raise ValueError(f"Path {relative_path} must be relative.")

    @abstractmethod
    def _open(self, relative_path: Path) -> TextIO:
        """Subclass hook that produces the stream for an already-validated relative path."""
class FSOutputManager(OutputManager):
    """:class:`OutputManager` that writes files beneath a root directory.

    Every file handed out by :meth:`_open` is tracked in ``open_files`` and is
    closed when the context exits.
    """

    def __init__(self, root: Path) -> None:
        # Directory that every relative path is resolved against.
        self.root = root
        # Handles opened so far that ``__exit__`` has not closed yet.
        self.open_files: List[TextIO] = []

    def _open(self, relative_path: Path) -> TextIO:
        """Open ``root / relative_path`` for UTF-8 text writing and track the handle.

        Missing parent directories are created first. The file is opened in
        ``"w"`` mode, so an existing file at that path is truncated.
        """
        target = self.root / relative_path
        target.parent.mkdir(parents=True, exist_ok=True)
        handle = target.open("w", encoding="utf-8")
        self.open_files.append(handle)
        return handle

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close every tracked file and forget the handles.

        Returns ``None``, so an exception raised inside the ``with`` body is
        never suppressed.
        """
        for handle in self.open_files:
            try:
                handle.close()
            except OSError:
                # Best-effort cleanup: one file failing to flush/close must not
                # prevent the remaining files from being closed.
                pass
        # Drop the closed handles so the manager does not keep stale references
        # around (and does not re-close them if the context is exited again).
        self.open_files.clear()
class VectorFlatFileSource(LogSource):
    """:class:`LogSource` backed by one seekable text file holding a JSON record per line.

    Records are expected to carry ``kubernetes.pod_labels``,
    ``kubernetes.pod_name`` and ``message`` keys. Lines are pre-filtered with
    plain substring checks on the serialized labels, which relies on the file
    using compact ``"key":"value"`` JSON (no whitespace around the colon).
    """

    def __init__(self, file: TextIO, app_name: str):
        # Text stream that is rewound and rescanned from the start on every query.
        self.file = file
        # Value the ``app.kubernetes.io/name`` pod label must have for a line to count.
        self.app_name = app_name

    def experiments(self, group_id: str) -> Iterator[str]:
        """Yield each distinct experiment id found in ``group_id``, in order of first appearance."""
        required = (
            f'"app.kubernetes.io/name":"{self.app_name}"',
            f'"app.kubernetes.io/part-of":"{group_id}"',
        )
        already_yielded = set()
        self.file.seek(0)
        for raw in self.file:
            # Cheap textual check first; only matching lines get parsed.
            if not all(label in raw for label in required):
                continue
            record = json.loads(raw)
            instance = record["kubernetes"]["pod_labels"]["app.kubernetes.io/instance"]
            if instance not in already_yielded:
                already_yielded.add(instance)
                yield instance

    def logs(
        self, group_id: str, experiment_id: Optional[str] = None
    ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
        """Yield ``(instance label, pod name, message)`` for each matching line.

        A line matches when it carries this source's app label and the
        ``group_id`` label; when ``experiment_id`` is not ``None`` the line must
        also carry that instance label.
        """
        required = [
            f'"app.kubernetes.io/name":"{self.app_name}"',
            f'"app.kubernetes.io/part-of":"{group_id}"',
        ]
        if experiment_id is not None:
            required.append(f'"app.kubernetes.io/instance":"{experiment_id}"')
        self.file.seek(0)
        for raw in self.file:
            # Does a cheaper match to avoid parsing every line.
            if not all(label in raw for label in required):
                continue
            record = json.loads(raw)
            pod_info = record["kubernetes"]
            yield (
                pod_info["pod_labels"]["app.kubernetes.io/instance"],
                pod_info["pod_name"],
                record["message"],
            )
def split_logs_in_source(
    log_source: LogSource,
    log_parser: LogParser,
    output_manager: OutputManager,
    group_id: str,
    formats: Optional[List[Tuple[Type[LogEntry], LogSplitterFormats]]] = None,
) -> None:
    """Parse every log line of ``group_id`` and route it to a per-experiment splitter.

    One :class:`LogSplitter` is created lazily for each experiment id seen in
    ``log_source.logs(group_id)``. Each splitter is given a factory that opens
    ``<experiment_id>/<event_type>.<ext.value>`` through ``output_manager``.

    Args:
        log_source: where the raw lines come from.
        log_parser: turns a raw line into an entry; falsy results are skipped.
        output_manager: opens the output streams the splitters write to.
        group_id: experiment group whose logs are processed.
        formats: optional ``(entry type, output format)`` pairs registered on
            every splitter via ``set_format``; ``None`` means no overrides.
    """
    splitters: Dict[str, LogSplitter] = {}
    formats = formats if formats else []

    def make_splitter(owner_id: str) -> LogSplitter:
        # ``owner_id`` is a parameter rather than the loop variable, so the
        # factory stays pinned to this experiment no matter when the splitter
        # decides to call it (avoids the late-binding closure pitfall).
        new_splitter = LogSplitter(
            lambda event_type, ext: output_manager.open(
                Path(owner_id) / f"{event_type}.{ext.value}"
            )
        )
        for entry_type, output_format in formats:
            new_splitter.set_format(entry_type, output_format)
        return new_splitter

    for experiment_id, _node_id, raw_line in log_source.logs(group_id):
        splitter = splitters.get(experiment_id)
        if splitter is None:
            splitter = make_splitter(experiment_id)
            splitters[experiment_id] = splitter

        parsed = log_parser.parse_single(raw_line)
        if parsed:
            splitter.split_single(parsed)