diff --git a/benchmarks/cli.py b/benchmarks/cli.py
index a7c431e..1c65653 100644
--- a/benchmarks/cli.py
+++ b/benchmarks/cli.py
@@ -5,6 +5,7 @@ from pathlib import Path
 from typing import Dict
 
 import uvicorn
+from elasticsearch import Elasticsearch
 from pydantic import IPvAnyAddress
 from pydantic_core import ValidationError
 from typing_extensions import TypeVar
@@ -21,7 +22,12 @@ from benchmarks.logging.logging import (
     LogEntry,
     LogSplitterFormats,
 )
-from benchmarks.logging.sources.sources import FSOutputManager, split_logs_in_source
+from benchmarks.logging.sources.logstash import LogstashSource
+from benchmarks.logging.sources.sources import (
+    FSOutputManager,
+    split_logs_in_source,
+    LogSource,
+)
 from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource
 
 experiment_config_parser = ConfigParser[ExperimentBuilder]()
@@ -87,11 +93,7 @@ def cmd_parse_single_log(log: Path, output: Path):
         splitter.split(log_parser.parse(istream))
 
 
-def cmd_parse_log_source(group_id: str, source_file: Path, output_dir: Path):
-    if not source_file.exists():
-        print(f"Log source file {source_file} does not exist.")
-        sys.exit(-1)
-
+def cmd_parse_log_source(source: LogSource, group_id: str, output_dir: Path):
     if not output_dir.parent.exists():
         print(f"Folder {output_dir.parent} does not exist.")
         sys.exit(-1)
@@ -99,10 +101,9 @@ def cmd_parse_log_source(source: LogSource, group_id: str, output_dir: Path):
     output_dir.mkdir(exist_ok=True)
 
     with (
-        source_file.open("r", encoding="utf-8") as istream,
+        source as log_source,
         FSOutputManager(output_dir) as output_manager,
     ):
-        log_source = VectorFlatFileSource(app_name="codex-benchmarks", file=istream)
         split_logs_in_source(
             log_source,
             log_parser,
@@ -146,6 +147,27 @@ def _parse_config(
         sys.exit(-1)
 
 
+def _configure_source(args):
+    # TODO we should probably have builders for sources as well, but for now
+    # we'll just keep it simple.
+    if args.source_file:
+        if not args.source_file.exists():
+            print(f"Log source file {args.source_file} does not exist.")
+            sys.exit(-1)
+        return VectorFlatFileSource(
+            app_name="codex-benchmarks", file=args.source_file.open(encoding="utf-8")
+        )
+    else:
+        import urllib3
+
+        urllib3.disable_warnings()
+
+        return LogstashSource(
+            Elasticsearch(args.es_url, verify_certs=False),
+            structured_only=True,
+        )
+
+
 def _init_logging():
     import logging
 
@@ -213,8 +235,13 @@ def main():
     log_source_cmd = log_subcommands.add_parser(
         "source", help="Parse logs from a log source."
     )
-    log_source_cmd.add_argument(
-        "source_file", type=Path, help="Vector log file to parse from."
+
+    group = log_source_cmd.add_mutually_exclusive_group(required=True)
+    group.add_argument(
+        "--source-file", type=Path, help="Vector log file to parse from."
+    )
+    group.add_argument(
+        "--es-url", type=str, help="URL to a logstash Elasticsearch instance."
     )
     log_source_cmd.add_argument(
         "output_dir", type=Path, help="Path to an output folder."
@@ -224,7 +251,7 @@ def main():
     )
     log_source_cmd.set_defaults(
         func=lambda args: cmd_parse_log_source(
-            args.group_id, args.source_file, args.output_dir
+            _configure_source(args), args.group_id, args.output_dir
         )
     )
 
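Note on the CLI change above: the `source` subcommand now takes exactly one of `--source-file` or `--es-url` instead of a positional file, and argparse enforces the exclusivity before `_configure_source` ever runs. A minimal sketch of that standard-library behavior (the `prog` name and argument values here are made up for illustration):

```python
import argparse
from pathlib import Path

parser = argparse.ArgumentParser(prog="source")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--source-file", type=Path)
group.add_argument("--es-url", type=str)

args = parser.parse_args(["--es-url", "http://localhost:9200"])
print(args)  # Namespace(source_file=None, es_url='http://localhost:9200')
# Passing neither flag, or both, makes argparse exit with a usage error.
```

This is also why the `if args.source_file:` dispatch in `_configure_source` is safe: when `--es-url` is chosen, `args.source_file` is guaranteed to be `None`.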
diff --git a/benchmarks/logging/sources/logstash.py b/benchmarks/logging/sources/logstash.py
index 426d050..e6f05e4 100644
--- a/benchmarks/logging/sources/logstash.py
+++ b/benchmarks/logging/sources/logstash.py
@@ -1,6 +1,7 @@
+import datetime
 import logging
 from collections.abc import Iterator
-from typing import Optional, Tuple, Any, Dict
+from typing import Optional, Tuple, Any, Dict, List
 
 from elasticsearch import Elasticsearch
 
@@ -8,6 +9,8 @@ from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId,
 
 GROUP_LABEL = "app.kubernetes.io/part-of"
 EXPERIMENT_LABEL = "app.kubernetes.io/instance"
+DEFAULT_HORIZON = 5
+ES_MAX_BATCH_SIZE = 10_000
 
 logger = logging.getLogger(__name__)
 
@@ -21,6 +24,8 @@ class LogstashSource(LogSource):
         client: Elasticsearch,
         structured_only: bool = False,
         chronological: bool = False,
+        horizon: int = DEFAULT_HORIZON,
+        today: Optional[datetime.date] = None,
     ):
         """
         @:param client: Elasticsearch client to use for retrieving logs
@@ -31,6 +36,17 @@
         self.client = client
         self.structured_only = structured_only
         self.chronological = chronological
+        self._indexes = self._generate_indexes(today, horizon)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.client.close()
+
+    @property
+    def indexes(self) -> List[str]:
+        return list(self._indexes)
 
     def experiments(self, group_id: str) -> Iterator[str]:
         """Retrieves all experiment IDs within an experiment group."""
@@ -74,9 +90,14 @@ class LogstashSource(LogSource):
         if self.chronological:
             query["sort"] = [{"@timestamp": {"order": "asc"}}]
 
+        # We can probably cache this, but for now OK.
+        actual_indexes = [
+            index for index in self.indexes if self.client.indices.exists(index=index)
+        ]
+
         # Scrolls are much cheaper than queries.
         scroll_response = self.client.search(
-            index="benchmarks-*", body=query, scroll="2m"
+            index=actual_indexes, body=query, scroll="2m", size=ES_MAX_BATCH_SIZE
         )
         scroll_id = scroll_response["_scroll_id"]
 
@@ -110,3 +131,12 @@
         finally:
             # Clean up scroll context
             self.client.clear_scroll(scroll_id=scroll_id)
+
+    def _generate_indexes(self, today: Optional[datetime.date], horizon: int):
+        if today is None:
+            today = datetime.date.today()
+
+        return [
+            f"benchmarks-{(today - datetime.timedelta(days=i)).strftime('%Y.%m.%d')}"
+            for i in range(horizon)
+        ]
diff --git a/benchmarks/logging/sources/sources.py b/benchmarks/logging/sources/sources.py
index 566af25..7e3aa24 100644
--- a/benchmarks/logging/sources/sources.py
+++ b/benchmarks/logging/sources/sources.py
@@ -1,5 +1,5 @@
 import logging
-from abc import ABC, abstractmethod
+from abc import abstractmethod
 from collections.abc import Iterator
 from contextlib import AbstractContextManager
 from pathlib import Path
@@ -19,7 +19,7 @@ NodeId = str
 
 logger = logging.getLogger(__name__)
 
 
-class LogSource(ABC):
+class LogSource(AbstractContextManager):
     """:class:`LogSource` knows how to retrieve logs for experiments within experiment groups.
     A key assumption is that group ids are known, and those can usually be recovered from, say, a workflow run."""
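Two notes on the `logstash.py` changes. First, `_generate_indexes` targets Logstash's one-index-per-day naming (`benchmarks-YYYY.MM.DD`), which is presumably why the `indices.exists` filter is needed: a day with no logs has no index, and searching a missing index would fail. Second, the hunk above shows only the initial scroll request and the `clear_scroll` cleanup; the paging loop in between is elided. For reference, a generic elasticsearch-py scroll loop looks roughly like the sketch below; `scan_all` and its parameters are hypothetical names, not the repository's actual code:

```python
def scan_all(client, indexes, query, page_size=10_000):
    """Yield every hit for `query`, one scroll page at a time (generic sketch)."""
    response = client.search(index=indexes, body=query, scroll="2m", size=page_size)
    scroll_id = response["_scroll_id"]
    try:
        while response["hits"]["hits"]:
            yield from response["hits"]["hits"]
            # Each scroll call fetches the next page and renews the 2m context.
            response = client.scroll(scroll_id=scroll_id, scroll="2m")
            scroll_id = response["_scroll_id"]
    finally:
        client.clear_scroll(scroll_id=scroll_id)
```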
diff --git a/benchmarks/logging/sources/tests/test_logstash_source.py b/benchmarks/logging/sources/tests/test_logstash_source.py
index c6619ff..2d90466 100644
--- a/benchmarks/logging/sources/tests/test_logstash_source.py
+++ b/benchmarks/logging/sources/tests/test_logstash_source.py
@@ -1,8 +1,9 @@
 import pytest
+from elasticsearch import Elasticsearch
 
 from benchmarks.deluge.logging import DelugeTorrentDownload
 from benchmarks.logging.logging import LogParser
-from datetime import datetime, timezone
+from datetime import datetime, timezone, date
 
 from benchmarks.logging.sources.logstash import LogstashSource
 
@@ -14,6 +15,22 @@ def _log_lines(source, experiment_id, group_id):
     )
 
 
+def test_should_look_into_k_day_horizon():
+    source = LogstashSource(
+        Elasticsearch("http://bogus.com:9000/"),
+        today=date(2025, 1, 21),
+        horizon=5,
+    )
+
+    assert source.indexes == [
+        "benchmarks-2025.01.21",
+        "benchmarks-2025.01.20",
+        "benchmarks-2025.01.19",
+        "benchmarks-2025.01.18",
+        "benchmarks-2025.01.17",
+    ]
+
+
 @pytest.mark.integration
 def test_should_retrieve_unstructured_log_messages(benchmark_logs_client):
     source = LogstashSource(benchmark_logs_client, chronological=True)
@@ -21,6 +38,7 @@ def test_should_retrieve_unstructured_log_messages(benchmark_logs_client):
     assert not all(">>" in line for line in lines)
 
 
+@pytest.mark.integration
 def test_filter_out_unstructured_log_messages(benchmark_logs_client):
     source = LogstashSource(
         benchmark_logs_client, structured_only=True, chronological=True
diff --git a/benchmarks/logging/sources/vector_flat_file.py b/benchmarks/logging/sources/vector_flat_file.py
index e0ba2da..9f74f4c 100644
--- a/benchmarks/logging/sources/vector_flat_file.py
+++ b/benchmarks/logging/sources/vector_flat_file.py
@@ -17,6 +17,12 @@ class VectorFlatFileSource(LogSource):
         self.file = file
         self.app_name = app_name
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.file.close()
+
     def experiments(self, group_id: str) -> Iterator[str]:
         """
         Retrieves all experiment IDs within an experiment group. Can be quite slow as this source supports
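With `LogSource` now extending `contextlib.AbstractContextManager`, both source implementations can be driven by a `with` block, which is exactly what the rewritten `cmd_parse_log_source` relies on. A usage sketch, where the URL and group id are placeholders:

```python
from elasticsearch import Elasticsearch

from benchmarks.logging.sources.logstash import LogstashSource

with LogstashSource(
    Elasticsearch("https://logs.example.com:9200"),  # placeholder URL
    structured_only=True,
) as source:
    for experiment_id in source.experiments("my-group"):  # placeholder group id
        print(experiment_id)
# On exit, LogstashSource closes its Elasticsearch client;
# VectorFlatFileSource analogously closes its file handle.
```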