refactor: split log source module

gmega 2025-01-21 08:42:26 -03:00
parent aeb2f044c8
commit 485b86465d
9 changed files with 88 additions and 86 deletions
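
For downstream code, the visible effect of this refactor is an import-path change: the flat benchmarks.logging.sources module becomes a package, and VectorFlatFileSource moves into its own submodule. A before/after sketch of the imports, taken directly from the hunks below:

# Before the split: one flat module.
from benchmarks.logging.sources import (
    VectorFlatFileSource,
    FSOutputManager,
    split_logs_in_source,
)

# After the split: a package with two submodules.
from benchmarks.logging.sources.sources import FSOutputManager, split_logs_in_source
from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource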

View File

@@ -21,11 +21,8 @@ from benchmarks.logging.logging import (
     LogEntry,
     LogSplitterFormats,
 )
-from benchmarks.logging.sources import (
-    VectorFlatFileSource,
-    FSOutputManager,
-    split_logs_in_source,
-)
+from benchmarks.logging.sources.sources import FSOutputManager, split_logs_in_source
+from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource
 
 experiment_config_parser = ConfigParser[ExperimentBuilder]()
 experiment_config_parser.register(DelugeExperimentConfig)

View File

@@ -1,3 +1,6 @@
+"""This module standardizes interfaces for consuming logs from external log sources; i.e. infrastructure
+that stores logs. Such infrastructure might be a simple file system, a service like Logstash, or a database."""
+
 import datetime
 import json
 import logging

View File

View File

@@ -1,14 +1,9 @@
 """This module standardizes interfaces for consuming logs from external log sources; i.e. infrastructure
 that stores logs. Such infrastructure might be a simple file system, a service like Logstash, or a database."""
-import json
-import logging
 from abc import ABC, abstractmethod
 from collections.abc import Iterator
 from contextlib import AbstractContextManager
-from json import JSONDecodeError
 from pathlib import Path
-from typing import TextIO, Optional, Tuple, List, Dict, Type, IO
+from typing import Optional, Tuple, List, Dict, Type, IO
 
 from benchmarks.logging.logging import (
     LogParser,
@@ -85,75 +80,6 @@ class FSOutputManager(OutputManager):
         pass
 
 
-class VectorFlatFileSource(LogSource):
-    """Log source for flat JSONL files produced by [Vector](https://vector.dev/). This is typically used when running
-    experiments locally within, say, Minikube or Kind."""
-
-    def __init__(self, file: TextIO, app_name: str):
-        self.file = file
-        self.app_name = app_name
-
-    def experiments(self, group_id: str) -> Iterator[str]:
-        """
-        Retrieves all experiment IDs within an experiment group. Can be quite slow, as this source supports
-        no indexing or aggregation.
-
-        See also: :meth:`LogSource.experiments`.
-        """
-        app_label = f'"app.kubernetes.io/name":"{self.app_name}"'
-        group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
-
-        seen = set()
-        self.file.seek(0)
-        for line in self.file:
-            if app_label not in line or group_label not in line:
-                continue
-
-            parsed = json.loads(line)
-            experiment_id = parsed["kubernetes"]["pod_labels"][
-                "app.kubernetes.io/instance"
-            ]
-            if experiment_id in seen:
-                continue
-
-            seen.add(experiment_id)
-            yield experiment_id
-
-    def logs(
-        self, group_id: str, experiment_id: Optional[str] = None
-    ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
-        """Retrieves logs for either all experiments within a group, or for a specific experiment. Again, since this
-        source supports no indexing, this can be quite slow: each query requires a full pass over the file.
-        I strongly discourage retrieving logs for experiments individually.
-        """
-        app_label = f'"app.kubernetes.io/name":"{self.app_name}"'
-        group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
-        experiment_label = f'"app.kubernetes.io/instance":"{experiment_id}"'
-
-        self.file.seek(0)
-        for line in self.file:
-            # Cheap substring match first, so we don't JSON-parse every line.
-            if app_label in line and group_label in line:
-                if experiment_id is not None and experiment_label not in line:
-                    continue
-                try:
-                    parsed = json.loads(line)
-                except JSONDecodeError as err:
-                    logger.error(
-                        f"Failed to parse line from Vector source: {line}", exc_info=err
-                    )
-                    continue
-
-                k8s = parsed["kubernetes"]
-                yield (
-                    k8s["pod_labels"]["app.kubernetes.io/instance"],
-                    k8s["pod_name"],
-                    parsed["message"],
-                )
-
-    def __str__(self):
-        return f"VectorFlatFileSource({self.app_name})"
-
-
 def split_logs_in_source(
     log_source: LogSource,
     log_parser: LogParser,
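
LogSource's full abstract surface isn't visible in this diff; judging from the methods VectorFlatFileSource overrides, a custom source would look roughly like the sketch below. The class name and its in-memory storage are hypothetical, and LogSource may declare further abstract members not shown here.

from collections.abc import Iterator
from typing import List, Optional, Tuple

from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId, RawLine


class StaticLogSource(LogSource):
    """Hypothetical in-memory source; rows are (experiment_id, node_id, raw_line) tuples."""

    def __init__(self, rows: List[Tuple[ExperimentId, NodeId, RawLine]]):
        self.rows = rows

    def experiments(self, group_id: str) -> Iterator[str]:
        # group_id is ignored: this toy source assumes all rows belong to one group.
        seen = set()
        for experiment_id, _, _ in self.rows:
            if experiment_id not in seen:
                seen.add(experiment_id)
                yield experiment_id

    def logs(
        self, group_id: str, experiment_id: Optional[str] = None
    ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
        for row in self.rows:
            if experiment_id is None or row[0] == experiment_id:
                yield row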

View File

@@ -2,10 +2,8 @@ import datetime
 from io import StringIO
 
 from benchmarks.logging.logging import LogEntry, LogParser
-from benchmarks.logging.sources import (
-    VectorFlatFileSource,
-    split_logs_in_source,
-)
+from benchmarks.logging.sources.sources import split_logs_in_source
+from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource
 from benchmarks.logging.tests.utils import InMemoryOutputManager
 from benchmarks.tests.utils import make_jsonl, compact

View File

@@ -1,6 +1,6 @@
 from io import StringIO
 
-from benchmarks.logging.sources import VectorFlatFileSource
+from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource
 from benchmarks.tests.utils import make_jsonl
 
 EXPERIMENT_LOG = [

View File

@@ -0,0 +1,78 @@
+import json
+from collections.abc import Iterator
+from json import JSONDecodeError
+from typing import TextIO, Optional, Tuple
+import logging
+
+from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId, RawLine
+
+logger = logging.getLogger(__name__)
+
+
+class VectorFlatFileSource(LogSource):
+    """Log source for flat JSONL files produced by [Vector](https://vector.dev/). This is typically used when running
+    experiments locally within, say, Minikube or Kind."""
+
+    def __init__(self, file: TextIO, app_name: str):
+        self.file = file
+        self.app_name = app_name
+
+    def experiments(self, group_id: str) -> Iterator[str]:
+        """
+        Retrieves all experiment IDs within an experiment group. Can be quite slow, as this source supports
+        no indexing or aggregation.
+
+        See also: :meth:`LogSource.experiments`.
+        """
+        app_label = f'"app.kubernetes.io/name":"{self.app_name}"'
+        group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
+
+        seen = set()
+        self.file.seek(0)
+        for line in self.file:
+            if app_label not in line or group_label not in line:
+                continue
+
+            parsed = json.loads(line)
+            experiment_id = parsed["kubernetes"]["pod_labels"][
+                "app.kubernetes.io/instance"
+            ]
+            if experiment_id in seen:
+                continue
+
+            seen.add(experiment_id)
+            yield experiment_id
+
+    def logs(
+        self, group_id: str, experiment_id: Optional[str] = None
+    ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
+        """Retrieves logs for either all experiments within a group, or for a specific experiment. Again, since this
+        source supports no indexing, this can be quite slow: each query requires a full pass over the file.
+        I strongly discourage retrieving logs for experiments individually.
+        """
+        app_label = f'"app.kubernetes.io/name":"{self.app_name}"'
+        group_label = f'"app.kubernetes.io/part-of":"{group_id}"'
+        experiment_label = f'"app.kubernetes.io/instance":"{experiment_id}"'
+
+        self.file.seek(0)
+        for line in self.file:
+            # Cheap substring match first, so we don't JSON-parse every line.
+            if app_label in line and group_label in line:
+                if experiment_id is not None and experiment_label not in line:
+                    continue
+                try:
+                    parsed = json.loads(line)
+                except JSONDecodeError as err:
+                    logger.error(
+                        f"Failed to parse line from Vector source: {line}", exc_info=err
+                    )
+                    continue
+
+                k8s = parsed["kubernetes"]
+                yield (
+                    k8s["pod_labels"]["app.kubernetes.io/instance"],
+                    k8s["pod_name"],
+                    parsed["message"],
+                )
+
+    def __str__(self):
+        return f"VectorFlatFileSource({self.app_name})"

View File

@@ -1,6 +1,6 @@
 from io import StringIO
 
-from benchmarks.logging.sources import OutputManager
+from benchmarks.logging.sources.sources import OutputManager
 
 
 class InMemoryOutputManager(OutputManager):