chore: improve docs

This commit is contained in:
gmega 2025-01-10 08:17:39 -03:00
parent f30b5d003d
commit b0313f65c8
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
2 changed files with 46 additions and 2 deletions

View File

@ -64,6 +64,9 @@ class LogEntry(SnakeCaseModel):
class AdaptedLogEntry(LogEntry, ABC):
"""Interface extension to adapted :class:`LogEntry`es which allows converting instances from the original model
into the adapted model and vice-versa."""
@classmethod
@abstractmethod
def adapt_instance(cls, data: SnakeCaseModel) -> "AdaptedLogEntry":
@ -220,6 +223,7 @@ class RequestEvent(Event):
def basic_log_parser() -> LogParser:
"""Constructs a basic log parser which can understand some common log entry types."""
parser = LogParser()
parser.register(Event)
parser.register(Metric)

View File

@ -1,3 +1,6 @@
"""This module standardizes interfaces for consuming logs from external log sources; i.e. infrastructure
that stores logs. Such infrastructure might be a simple file system, a service like Logstash, or a database."""
import json
from abc import ABC, abstractmethod
from collections.abc import Iterator
@ -18,22 +21,34 @@ NodeId = str
class LogSource(ABC):
    """:class:`LogSource` knows how to retrieve logs for experiments within experiment groups. A key assumption is that
    group ids are known, and those can usually be recovered from, say, a workflow run."""

    @abstractmethod
    def experiments(self, group_id: str) -> Iterator[str]:
        """Retrieves all experiment IDs within an experiment group."""
        pass

    @abstractmethod
    def logs(
        self, group_id: str, experiment_id: Optional[str] = None
    ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
        """Retrieves logs for either all experiments within a group, or a specific experiment.

        @param group_id: The group ID to retrieve logs for.
        @param experiment_id: The experiment ID to retrieve logs for. If None, logs for all experiments
            in the group are retrieved.
        @return: An iterator of tuples, where each tuple contains the experiment ID, the node ID, and a raw (unparsed)
            log line."""
        pass
class OutputManager(AbstractContextManager):
"""An :class:`OutputManager` is responsible for managing output locations for log splitting operations.
:class:`OutputManager`s must be closed after use, and implements the context manager interface to that end."""
def open(self, relative_path: Path) -> TextIO:
"""Opens a file for writing within a relative abstract path."""
if relative_path.is_absolute():
raise ValueError(f"Path {relative_path} must be relative.")
return self._open(relative_path)
@ -44,6 +59,8 @@ class OutputManager(AbstractContextManager):
class FSOutputManager(OutputManager):
"""Simple :class:`OutputManager` which writes directly into the file system."""
def __init__(self, root: Path) -> None:
    """
    :param root: Root folder in the file system under which all managed output files are created.
    """
    self.root = root
    # Handles for files opened through this manager, tracked so they can be closed
    # later — presumably when the manager itself is closed (close logic not visible
    # in this excerpt; confirm against the full class).
    self.open_files: List[TextIO] = []
@ -65,11 +82,20 @@ class FSOutputManager(OutputManager):
class VectorFlatFileSource(LogSource):
"""Log source for flat JSONL files produced by [Vector](https://vector.dev/). This is typically used when running
experiments locally within, say, Minikube or Kind."""
def __init__(self, file: TextIO, app_name: str):
    """
    :param file: Open text handle to the flat JSONL file produced by Vector.
    :param app_name: Application name; matched against the "app.kubernetes.io/name"
        label when scanning log lines.
    """
    self.file = file
    self.app_name = app_name
def experiments(self, group_id: str) -> Iterator[str]:
"""
Retrieves all experiment IDs within an experiment group. Can be quite slow as this source supports
no indexing or aggregation.
See also: :meth:`LogSource.experiments`.
"""
app_label = f'"app.kubernetes.io/name":"{self.app_name}"'
group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
seen = set()
@ -91,6 +117,10 @@ class VectorFlatFileSource(LogSource):
def logs(
self, group_id: str, experiment_id: Optional[str] = None
) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
"""Retrieves logs for either all experiments within a group, or a specific experiments. Again, since this
source supports no indexing this can be quite slow, as each query represents a full pass on the file.
I strongly encourage not attempting to retrieve logs for experiments individually.
"""
app_label = f'"app.kubernetes.io/name":"{self.app_name}"'
group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
experiment_label = f'"app.kubernetes.io/instance":"{experiment_id}"'
@ -117,6 +147,16 @@ def split_logs_in_source(
group_id: str,
formats: Optional[List[Tuple[Type[LogEntry], LogSplitterFormats]]] = None,
) -> None:
"""
Parses logs for an entire experiment group and splits them onto separate folders per experiment, as well
as separate files for each log type. This makes it suitable for consumption by an analysis environment.
:param log_source: The :class:`LogSource` to retrieve logs from.
:param log_parser: A suitably configured :class:`LogParser` which can understand the logs.
:param output_manager: An :class:`OutputManager` to manage where output content gets placed.
:param group_id: The group ID to retrieve logs for.
:param formats: An additional format configuration to be fed onto :class:`LogSplitter`.
"""
splitters: Dict[str, LogSplitter] = {}
formats = formats if formats else []