diff --git a/benchmarks/cli.py b/benchmarks/cli.py index d576634..a7c431e 100644 --- a/benchmarks/cli.py +++ b/benchmarks/cli.py @@ -21,11 +21,8 @@ from benchmarks.logging.logging import ( LogEntry, LogSplitterFormats, ) -from benchmarks.logging.sources import ( - VectorFlatFileSource, - FSOutputManager, - split_logs_in_source, -) +from benchmarks.logging.sources.sources import FSOutputManager, split_logs_in_source +from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource experiment_config_parser = ConfigParser[ExperimentBuilder]() experiment_config_parser.register(DelugeExperimentConfig) diff --git a/benchmarks/logging/logging.py b/benchmarks/logging/logging.py index f717ed1..1ec3e12 100644 --- a/benchmarks/logging/logging.py +++ b/benchmarks/logging/logging.py @@ -1,3 +1,6 @@ +"""This module standardizes interfaces for consuming logs from external log sources; i.e. infrastructure +that stores logs. Such infrastructure might be a simple file system, a service like Logstash, or a database.""" + import datetime import json import logging diff --git a/benchmarks/logging/sources/__init__.py b/benchmarks/logging/sources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/benchmarks/logging/sources.py b/benchmarks/logging/sources/sources.py similarity index 59% rename from benchmarks/logging/sources.py rename to benchmarks/logging/sources/sources.py index 8117c8a..566af25 100644 --- a/benchmarks/logging/sources.py +++ b/benchmarks/logging/sources/sources.py @@ -1,14 +1,9 @@ -"""This module standardizes interfaces for consuming logs from external log sources; i.e. infrastructure -that stores logs. 
Such infrastructure might be a simple file system, a service like Logstash, or a database.""" - -import json import logging from abc import ABC, abstractmethod from collections.abc import Iterator from contextlib import AbstractContextManager -from json import JSONDecodeError from pathlib import Path -from typing import TextIO, Optional, Tuple, List, Dict, Type, IO +from typing import Optional, Tuple, List, Dict, Type, IO from benchmarks.logging.logging import ( LogParser, @@ -85,75 +80,6 @@ class FSOutputManager(OutputManager): pass -class VectorFlatFileSource(LogSource): - """Log source for flat JSONL files produced by [Vector](https://vector.dev/). This is typically used when running - experiments locally within, say, Minikube or Kind.""" - - def __init__(self, file: TextIO, app_name: str): - self.file = file - self.app_name = app_name - - def experiments(self, group_id: str) -> Iterator[str]: - """ - Retrieves all experiment IDs within an experiment group. Can be quite slow as this source supports - no indexing or aggregation. - - See also: :meth:`LogSource.experiments`. - """ - app_label = f'"app.kubernetes.io/name":"{self.app_name}"' - group_label = f'"app.kubernetes.io/part-of":"{group_id}"' - seen = set() - - self.file.seek(0) - for line in self.file: - if app_label not in line or group_label not in line: - continue - - parsed = json.loads(line) - experiment_id = parsed["kubernetes"]["pod_labels"][ - "app.kubernetes.io/instance" - ] - if experiment_id in seen: - continue - seen.add(experiment_id) - yield experiment_id - - def logs( - self, group_id: str, experiment_id: Optional[str] = None - ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]: - """Retrieves logs for either all experiments within a group, or a specific experiments. Again, since this - source supports no indexing this can be quite slow, as each query represents a full pass on the file. - I strongly encourage not attempting to retrieve logs for experiments individually. 
- """ - app_label = f'"app.kubernetes.io/name":"{self.app_name}"' - group_label = f'"app.kubernetes.io/part-of":"{group_id}"' - experiment_label = f'"app.kubernetes.io/instance":"{experiment_id}"' - - self.file.seek(0) - for line in self.file: - # Does a cheaper match to avoid parsing every line. - if app_label in line and group_label in line: - if experiment_id is not None and experiment_label not in line: - continue - try: - parsed = json.loads(line) - except JSONDecodeError as err: - logger.error( - f"Failed to parse line from vector from source {line}", err - ) - continue - - k8s = parsed["kubernetes"] - yield ( - k8s["pod_labels"]["app.kubernetes.io/instance"], - k8s["pod_name"], - parsed["message"], - ) - - def __str__(self): - return f"VectorFlatFileSource({self.app_name})" - - def split_logs_in_source( log_source: LogSource, log_parser: LogParser, diff --git a/benchmarks/logging/sources/tests/__init__.py b/benchmarks/logging/sources/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/benchmarks/logging/tests/test_split_logs_in_source.py b/benchmarks/logging/sources/tests/test_split_logs_in_source.py similarity index 95% rename from benchmarks/logging/tests/test_split_logs_in_source.py rename to benchmarks/logging/sources/tests/test_split_logs_in_source.py index d0ba992..3bf176b 100644 --- a/benchmarks/logging/tests/test_split_logs_in_source.py +++ b/benchmarks/logging/sources/tests/test_split_logs_in_source.py @@ -2,10 +2,8 @@ import datetime from io import StringIO from benchmarks.logging.logging import LogEntry, LogParser -from benchmarks.logging.sources import ( - VectorFlatFileSource, - split_logs_in_source, -) +from benchmarks.logging.sources.sources import split_logs_in_source +from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource from benchmarks.logging.tests.utils import InMemoryOutputManager from benchmarks.tests.utils import make_jsonl, compact diff --git a/benchmarks/logging/tests/test_sources.py 
b/benchmarks/logging/sources/tests/test_vector_flat_file.py similarity index 97% rename from benchmarks/logging/tests/test_sources.py rename to benchmarks/logging/sources/tests/test_vector_flat_file.py index a454f81..9b8d237 100644 --- a/benchmarks/logging/tests/test_sources.py +++ b/benchmarks/logging/sources/tests/test_vector_flat_file.py @@ -1,6 +1,6 @@ from io import StringIO -from benchmarks.logging.sources import VectorFlatFileSource +from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource from benchmarks.tests.utils import make_jsonl EXPERIMENT_LOG = [ diff --git a/benchmarks/logging/sources/vector_flat_file.py b/benchmarks/logging/sources/vector_flat_file.py new file mode 100644 index 0000000..e0ba2da --- /dev/null +++ b/benchmarks/logging/sources/vector_flat_file.py @@ -0,0 +1,78 @@ +import json +from collections.abc import Iterator +from json import JSONDecodeError +from typing import TextIO, Optional, Tuple +import logging + +from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId, RawLine + +logger = logging.getLogger(__name__) + + +class VectorFlatFileSource(LogSource): + """Log source for flat JSONL files produced by [Vector](https://vector.dev/). This is typically used when running + experiments locally within, say, Minikube or Kind.""" + + def __init__(self, file: TextIO, app_name: str): + self.file = file + self.app_name = app_name + + def experiments(self, group_id: str) -> Iterator[str]: + """ + Retrieves all experiment IDs within an experiment group. Can be quite slow as this source supports + no indexing or aggregation. + + See also: :meth:`LogSource.experiments`. 
+ """ + app_label = f'"app.kubernetes.io/name":"{self.app_name}"' + group_label = f'"app.kubernetes.io/part-of":"{group_id}"' + seen = set() + + self.file.seek(0) + for line in self.file: + if app_label not in line or group_label not in line: + continue + + parsed = json.loads(line) + experiment_id = parsed["kubernetes"]["pod_labels"][ + "app.kubernetes.io/instance" + ] + if experiment_id in seen: + continue + seen.add(experiment_id) + yield experiment_id + + def logs( + self, group_id: str, experiment_id: Optional[str] = None + ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]: + """Retrieves logs for either all experiments within a group, or a specific experiment. Again, since this + source supports no indexing this can be quite slow, as each query represents a full pass on the file. + I strongly encourage not attempting to retrieve logs for experiments individually. + """ + app_label = f'"app.kubernetes.io/name":"{self.app_name}"' + group_label = f'"app.kubernetes.io/part-of":"{group_id}"' + experiment_label = f'"app.kubernetes.io/instance":"{experiment_id}"' + + self.file.seek(0) + for line in self.file: + # Does a cheaper match to avoid parsing every line. 
+ if app_label in line and group_label in line: + if experiment_id is not None and experiment_label not in line: + continue + try: + parsed = json.loads(line) + except JSONDecodeError as err: + logger.error( + "Failed to parse line from vector from source %s: %s", line, err + ) + continue + + k8s = parsed["kubernetes"] + yield ( + k8s["pod_labels"]["app.kubernetes.io/instance"], + k8s["pod_name"], + parsed["message"], + ) + + def __str__(self): + return f"VectorFlatFileSource({self.app_name})" diff --git a/benchmarks/logging/tests/utils.py b/benchmarks/logging/tests/utils.py index 88b350a..d3be83b 100644 --- a/benchmarks/logging/tests/utils.py +++ b/benchmarks/logging/tests/utils.py @@ -1,6 +1,6 @@ from io import StringIO -from benchmarks.logging.sources import OutputManager +from benchmarks.logging.sources.sources import OutputManager class InMemoryOutputManager(OutputManager):