feat: add CLI for parsing logs from logstash

gmega 2025-01-21 18:44:30 -03:00
parent 5fc241660b
commit 02e2fe39d1
5 changed files with 97 additions and 16 deletions

View File

@@ -5,6 +5,7 @@ from pathlib import Path
from typing import Dict
import uvicorn
from elasticsearch import Elasticsearch
from pydantic import IPvAnyAddress
from pydantic_core import ValidationError
from typing_extensions import TypeVar
@@ -21,7 +22,12 @@ from benchmarks.logging.logging import (
LogEntry,
LogSplitterFormats,
)
from benchmarks.logging.sources.sources import FSOutputManager, split_logs_in_source
from benchmarks.logging.sources.logstash import LogstashSource
from benchmarks.logging.sources.sources import (
FSOutputManager,
split_logs_in_source,
LogSource,
)
from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource
experiment_config_parser = ConfigParser[ExperimentBuilder]()
@@ -87,11 +93,7 @@ def cmd_parse_single_log(log: Path, output: Path):
splitter.split(log_parser.parse(istream))
def cmd_parse_log_source(group_id: str, source_file: Path, output_dir: Path):
if not source_file.exists():
print(f"Log source file {source_file} does not exist.")
sys.exit(-1)
def cmd_parse_log_source(source: LogSource, group_id: str, output_dir: Path):
if not output_dir.parent.exists():
print(f"Folder {output_dir.parent} does not exist.")
sys.exit(-1)
@@ -99,10 +101,9 @@ def cmd_parse_log_source(group_id: str, source_file: Path, output_dir: Path):
output_dir.mkdir(exist_ok=True)
with (
source_file.open("r", encoding="utf-8") as istream,
source as log_source,
FSOutputManager(output_dir) as output_manager,
):
log_source = VectorFlatFileSource(app_name="codex-benchmarks", file=istream)
split_logs_in_source(
log_source,
log_parser,
@@ -146,6 +147,27 @@ def _parse_config(
sys.exit(-1)
def _configure_source(args):
# TODO we should probably have builders for sources as well, but for now
# we'll just keep it simple.
if args.source_file:
if not args.source_file.exists():
print(f"Log source file {args.source_file} does not exist.")
sys.exit(-1)
return VectorFlatFileSource(
app_name="codex-benchmarks", file=args.source_file.open(encoding="utf-8")
)
else:
import urllib3
urllib3.disable_warnings()
return LogstashSource(
Elasticsearch(args.es_url, verify_certs=False),
structured_only=True,
)
def _init_logging():
import logging
@@ -213,8 +235,13 @@ def main():
log_source_cmd = log_subcommands.add_parser(
"source", help="Parse logs from a log source."
)
log_source_cmd.add_argument(
"source_file", type=Path, help="Vector log file to parse from."
group = log_source_cmd.add_mutually_exclusive_group(required=True)
group.add_argument(
"--source-file", type=Path, help="Vector log file to parse from."
)
group.add_argument(
"--es-url", type=str, help="URL to a logstash Elasticsearch instance."
)
log_source_cmd.add_argument(
"output_dir", type=Path, help="Path to an output folder."
@@ -224,7 +251,7 @@
)
log_source_cmd.set_defaults(
func=lambda args: cmd_parse_log_source(
args.group_id, args.source_file, args.output_dir
_configure_source(args), args.group_id, args.output_dir
)
)
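
For reference, a minimal sketch (not part of the diff) of how the two source modes behind --source-file and --es-url can be exercised directly, bypassing argparse. The endpoint URL, file path, output folder, and group id below are placeholders:

from pathlib import Path

from elasticsearch import Elasticsearch

# --source-file mode: the Vector flat file source, built the same way _configure_source does.
flat_source = VectorFlatFileSource(
    app_name="codex-benchmarks",
    file=Path("vector.log").open(encoding="utf-8"),  # placeholder path
)
cmd_parse_log_source(flat_source, "my-group", Path("./parsed"))

# --es-url mode: the new Logstash-backed Elasticsearch source.
es_source = LogstashSource(
    Elasticsearch("https://logstash.example:9200", verify_certs=False),  # placeholder URL
    structured_only=True,
)
cmd_parse_log_source(es_source, "my-group", Path("./parsed"))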

View File

@@ -1,6 +1,7 @@
import datetime
import logging
from collections.abc import Iterator
from typing import Optional, Tuple, Any, Dict
from typing import Optional, Tuple, Any, Dict, List
from elasticsearch import Elasticsearch
@@ -8,6 +9,8 @@ from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId,
GROUP_LABEL = "app.kubernetes.io/part-of"
EXPERIMENT_LABEL = "app.kubernetes.io/instance"
DEFAULT_HORIZON = 5
ES_MAX_BATCH_SIZE = 10_000
logger = logging.getLogger(__name__)
@@ -21,6 +24,8 @@ class LogstashSource(LogSource):
client: Elasticsearch,
structured_only: bool = False,
chronological: bool = False,
horizon: int = DEFAULT_HORIZON,
today: Optional[datetime.date] = None,
):
"""
:param client: Elasticsearch client to use for retrieving logs
@@ -31,6 +36,17 @@
self.client = client
self.structured_only = structured_only
self.chronological = chronological
self._indexes = self._generate_indexes(today, horizon)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.client.close()
@property
def indexes(self) -> List[str]:
return list(self._indexes)
def experiments(self, group_id: str) -> Iterator[str]:
"""Retrieves all experiment IDs within an experiment group."""
@@ -74,9 +90,14 @@
if self.chronological:
query["sort"] = [{"@timestamp": {"order": "asc"}}]
# We can probably cache this, but for now OK.
actual_indexes = [
index for index in self.indexes if self.client.indices.exists(index=index)
]
# Scroll requests are much cheaper than repeated paginated search queries.
scroll_response = self.client.search(
index="benchmarks-*", body=query, scroll="2m"
index=actual_indexes, body=query, scroll="2m", size=ES_MAX_BATCH_SIZE
)
scroll_id = scroll_response["_scroll_id"]
@@ -110,3 +131,12 @@
finally:
# Clean up scroll context
self.client.clear_scroll(scroll_id=scroll_id)
def _generate_indexes(self, today: Optional[datetime.date], horizon: int):
if today is None:
today = datetime.date.today()
return [
f"benchmarks-{(today - datetime.timedelta(days=i)).strftime('%Y.%m.%d')}"
for i in range(horizon)
]
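
As a sketch (not part of the diff), this is how the new k-day index horizon behaves when LogstashSource is used standalone; the Elasticsearch endpoint is a placeholder:

from datetime import date

from elasticsearch import Elasticsearch

with LogstashSource(
    Elasticsearch("https://logstash.example:9200", verify_certs=False),  # placeholder endpoint
    structured_only=True,
    today=date(2025, 1, 21),  # pin "today" so the index names are deterministic
    horizon=5,                # scan the last 5 daily indexes
) as source:
    # ['benchmarks-2025.01.21', ..., 'benchmarks-2025.01.17']
    print(source.indexes)

Only indexes that actually exist get queried, since the scroll search first filters self.indexes through client.indices.exists.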

View File

@@ -1,5 +1,5 @@
import logging
from abc import ABC, abstractmethod
from abc import abstractmethod
from collections.abc import Iterator
from contextlib import AbstractContextManager
from pathlib import Path
@@ -19,7 +19,7 @@ NodeId = str
logger = logging.getLogger(__name__)
class LogSource(ABC):
class LogSource(AbstractContextManager):
""":class:`LogSource` knows how to retrieve logs for experiments within experiment groups. A key assumption is that
group ids are known, and those can usually be recovered from, say, a workflow run."""
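
A minimal sketch (not part of the diff) of what switching LogSource to AbstractContextManager buys call sites: any source can now be held in a with statement, with cleanup delegated to its __exit__. parse_group is a hypothetical helper:

def parse_group(source: LogSource, group_id: str) -> list[str]:
    with source:  # __exit__ closes the underlying file or Elasticsearch client
        return list(source.experiments(group_id))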

View File

@@ -1,8 +1,9 @@
import pytest
from elasticsearch import Elasticsearch
from benchmarks.deluge.logging import DelugeTorrentDownload
from benchmarks.logging.logging import LogParser
from datetime import datetime, timezone
from datetime import datetime, timezone, date
from benchmarks.logging.sources.logstash import LogstashSource
@@ -14,6 +15,22 @@ def _log_lines(source, experiment_id, group_id):
)
def test_should_look_into_k_day_horizon():
source = LogstashSource(
Elasticsearch("http://bogus.com:9000/"),
today=date(2025, 1, 21),
horizon=5,
)
assert source.indexes == [
"benchmarks-2025.01.21",
"benchmarks-2025.01.20",
"benchmarks-2025.01.19",
"benchmarks-2025.01.18",
"benchmarks-2025.01.17",
]
@pytest.mark.integration
def test_should_retrieve_unstructured_log_messages(benchmark_logs_client):
source = LogstashSource(benchmark_logs_client, chronological=True)
@@ -21,6 +38,7 @@ def test_should_retrieve_unstructured_log_messages(benchmark_logs_client):
assert not all(">>" in line for line in lines)
@pytest.mark.integration
def test_filter_out_unstructured_log_messages(benchmark_logs_client):
source = LogstashSource(
benchmark_logs_client, structured_only=True, chronological=True

View File

@@ -17,6 +17,12 @@ class VectorFlatFileSource(LogSource):
self.file = file
self.app_name = app_name
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.file.close()
def experiments(self, group_id: str) -> Iterator[str]:
"""
Retrieves all experiment IDs within an experiment group. Can be quite slow as this source supports