From b0313f65c8022175ba62f0abd137def6e32d7618 Mon Sep 17 00:00:00 2001
From: gmega
Date: Fri, 10 Jan 2025 08:17:39 -0300
Subject: [PATCH] chore: improve docs

---
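Reviewer note, not part of the commit message: a minimal sketch of how the
pieces documented here compose, assuming import paths matching the file
layout in the diffstat below; the log file name, app name, and group ID are
illustrative assumptions, not part of this patch:

    from pathlib import Path

    from benchmarks.logging.logging import basic_log_parser
    from benchmarks.logging.sources import (
        FSOutputManager,
        VectorFlatFileSource,
        split_logs_in_source,
    )

    parser = basic_log_parser()  # understands the common log entry types
    # A flat JSONL dump produced by Vector (hypothetical path).
    with open("vector-logs.jsonl", encoding="utf-8") as log_file:
        source = VectorFlatFileSource(log_file, app_name="my-app")
        # OutputManagers must be closed after use; the context manager
        # interface takes care of that here.
        with FSOutputManager(Path("parsed-logs")) as output:
            split_logs_in_source(source, parser, output, "g1")
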
 benchmarks/logging/logging.py |  4 ++++
 benchmarks/logging/sources.py | 44 +++++++++++++++++++++++++++++++++--
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/benchmarks/logging/logging.py b/benchmarks/logging/logging.py
index 38d0fcf..f717ed1 100644
--- a/benchmarks/logging/logging.py
+++ b/benchmarks/logging/logging.py
@@ -64,6 +64,9 @@ class LogEntry(SnakeCaseModel):
 
 
 class AdaptedLogEntry(LogEntry, ABC):
+    """Interface extension for adapted :class:`LogEntry` subclasses which allows converting instances from the
+    original model into the adapted model and vice-versa."""
+
     @classmethod
     @abstractmethod
     def adapt_instance(cls, data: SnakeCaseModel) -> "AdaptedLogEntry":
@@ -220,6 +223,7 @@ class RequestEvent(Event):
 
 
 def basic_log_parser() -> LogParser:
+    """Constructs a basic log parser which understands some common log entry types."""
     parser = LogParser()
     parser.register(Event)
     parser.register(Metric)
diff --git a/benchmarks/logging/sources.py b/benchmarks/logging/sources.py
index 8b05819..50fc820 100644
--- a/benchmarks/logging/sources.py
+++ b/benchmarks/logging/sources.py
@@ -1,3 +1,6 @@
+"""This module standardizes interfaces for consuming logs from external log sources; i.e., infrastructure that
+stores logs. Such infrastructure might be a simple file system, a service like Logstash, or a database."""
+
 import json
 from abc import ABC, abstractmethod
 from collections.abc import Iterator
@@ -18,22 +21,34 @@ NodeId = str
 
 
 class LogSource(ABC):
-    """:class:`LogSource` knows how to retrieve logs for experiments within experiment groups, and can answer queries
-    about which experiment ids make up a given group."""
+    """:class:`LogSource` knows how to retrieve logs for experiments within experiment groups. A key assumption is
+    that group IDs are known; these can usually be recovered from, say, a workflow run."""
 
     @abstractmethod
     def experiments(self, group_id: str) -> Iterator[str]:
+        """Retrieves all experiment IDs within an experiment group."""
        pass
 
     @abstractmethod
     def logs(
         self, group_id: str, experiment_id: Optional[str] = None
     ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
+        """Retrieves logs either for all experiments within a group or for a single experiment.
+
+        :param group_id: The group ID to retrieve logs for.
+        :param experiment_id: The experiment ID to retrieve logs for. If None, logs for all experiments in the group are retrieved.
+
+        :return: An iterator of tuples, where each tuple contains the experiment ID, the node ID, and a raw
+            (unparsed) log line."""
         pass
 
 
 class OutputManager(AbstractContextManager):
+    """An :class:`OutputManager` is responsible for managing output locations for log splitting operations.
+    :class:`OutputManager`s must be closed after use, and implement the context manager interface to that end."""
+
     def open(self, relative_path: Path) -> TextIO:
+        """Opens a file for writing at the given path, which is interpreted relative to an abstract root."""
         if relative_path.is_absolute():
             raise ValueError(f"Path {relative_path} must be relative.")
         return self._open(relative_path)
@@ -44,6 +59,8 @@
 
 
 class FSOutputManager(OutputManager):
+    """Simple :class:`OutputManager` which writes directly into the file system."""
+
     def __init__(self, root: Path) -> None:
         self.root = root
         self.open_files: List[TextIO] = []
@@ -65,11 +82,20 @@
 
 
 class VectorFlatFileSource(LogSource):
+    """Log source for flat JSONL files produced by `Vector <https://vector.dev/>`_. This is typically used when
+    running experiments locally within, say, Minikube or Kind."""
+
     def __init__(self, file: TextIO, app_name: str):
         self.file = file
         self.app_name = app_name
 
     def experiments(self, group_id: str) -> Iterator[str]:
+        """
+        Retrieves all experiment IDs within an experiment group. This can be quite slow, as this source supports
+        neither indexing nor aggregation.
+
+        See also: :meth:`LogSource.experiments`.
+        """
         app_label = f'"app.kubernetes.io/name":"{self.app_name}"'
         group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
         seen = set()
@@ -91,6 +117,10 @@
     def logs(
         self, group_id: str, experiment_id: Optional[str] = None
     ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
+        """Retrieves logs either for all experiments within a group or for a single experiment. Again, since this
+        source supports no indexing, this can be quite slow: each query requires a full pass over the file.
+        Retrieving logs for experiments one at a time is therefore strongly discouraged.
+        """
         app_label = f'"app.kubernetes.io/name":"{self.app_name}"'
         group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
         experiment_label = f'"app.kubernetes.io/instance":"{experiment_id}"'
@@ -117,6 +147,16 @@ def split_logs_in_source(
     group_id: str,
     formats: Optional[List[Tuple[Type[LogEntry], LogSplitterFormats]]] = None,
 ) -> None:
+    """
+    Parses logs for an entire experiment group and splits them into separate folders per experiment, as well as
+    separate files per log entry type. This makes the output suitable for consumption by an analysis environment.
+
+    :param log_source: The :class:`LogSource` to retrieve logs from.
+    :param log_parser: A suitably configured :class:`LogParser` which can understand the logs.
+    :param output_manager: An :class:`OutputManager` which manages where output content gets placed.
+    :param group_id: The group ID to retrieve logs for.
+    :param formats: Additional format configurations to be fed to the underlying :class:`LogSplitter`.
+    """
     splitters: Dict[str, LogSplitter] = {}
     formats = formats if formats else []