diff --git a/benchmarks/cli.py b/benchmarks/cli.py
index a7c431e..1c65653 100644
--- a/benchmarks/cli.py
+++ b/benchmarks/cli.py
@@ -5,6 +5,7 @@ from pathlib import Path
 from typing import Dict
 
 import uvicorn
+from elasticsearch import Elasticsearch
 from pydantic import IPvAnyAddress
 from pydantic_core import ValidationError
 from typing_extensions import TypeVar
@@ -21,7 +22,12 @@ from benchmarks.logging.logging import (
     LogEntry,
     LogSplitterFormats,
 )
-from benchmarks.logging.sources.sources import FSOutputManager, split_logs_in_source
+from benchmarks.logging.sources.logstash import LogstashSource
+from benchmarks.logging.sources.sources import (
+    FSOutputManager,
+    split_logs_in_source,
+    LogSource,
+)
 from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource
 
 experiment_config_parser = ConfigParser[ExperimentBuilder]()
@@ -87,11 +93,7 @@ def cmd_parse_single_log(log: Path, output: Path):
         splitter.split(log_parser.parse(istream))
 
 
-def cmd_parse_log_source(group_id: str, source_file: Path, output_dir: Path):
-    if not source_file.exists():
-        print(f"Log source file {source_file} does not exist.")
-        sys.exit(-1)
-
+def cmd_parse_log_source(source: LogSource, group_id: str, output_dir: Path):
     if not output_dir.parent.exists():
         print(f"Folder {output_dir.parent} does not exist.")
         sys.exit(-1)
@@ -99,10 +101,9 @@ def cmd_parse_log_source(source: LogSource, group_id: str, output_dir: Path):
     output_dir.mkdir(exist_ok=True)
 
     with (
-        source_file.open("r", encoding="utf-8") as istream,
+        source as log_source,
         FSOutputManager(output_dir) as output_manager,
     ):
-        log_source = VectorFlatFileSource(app_name="codex-benchmarks", file=istream)
         split_logs_in_source(
             log_source,
             log_parser,
@@ -146,6 +147,27 @@ def _parse_config(
         sys.exit(-1)
 
 
+def _configure_source(args):
+    # TODO we should probably have builders for sources as well, but for now
+    # we'll just keep it simple.
+    if args.source_file:
+        if not args.source_file.exists():
+            print(f"Log source file {args.source_file} does not exist.")
+            sys.exit(-1)
+        return VectorFlatFileSource(
+            app_name="codex-benchmarks", file=args.source_file.open(encoding="utf-8")
+        )
+    else:
+        import urllib3
+
+        urllib3.disable_warnings()
+
+        return LogstashSource(
+            Elasticsearch(args.es_url, verify_certs=False),
+            structured_only=True,
+        )
+
+
 def _init_logging():
     import logging
 
@@ -213,8 +235,13 @@ def main():
     log_source_cmd = log_subcommands.add_parser(
         "source", help="Parse logs from a log source."
     )
-    log_source_cmd.add_argument(
-        "source_file", type=Path, help="Vector log file to parse from."
+
+    group = log_source_cmd.add_mutually_exclusive_group(required=True)
+    group.add_argument(
+        "--source-file", type=Path, help="Vector log file to parse from."
+    )
+    group.add_argument(
+        "--es-url", type=str, help="URL to a Logstash-backed Elasticsearch instance."
     )
     log_source_cmd.add_argument(
         "output_dir", type=Path, help="Path to an output folder."
@@ -224,7 +251,7 @@ def main():
     )
     log_source_cmd.set_defaults(
         func=lambda args: cmd_parse_log_source(
-            args.group_id, args.source_file, args.output_dir
+            _configure_source(args), args.group_id, args.output_dir
         )
     )
 
diff --git a/benchmarks/conftest.py b/benchmarks/conftest.py
index c928b36..a402b73 100644
--- a/benchmarks/conftest.py
+++ b/benchmarks/conftest.py
@@ -2,3 +2,4 @@
 
 from benchmarks.deluge.tests.fixtures import *
 from benchmarks.core.tests.fixtures import *
+from benchmarks.logging.sources.tests.fixtures import *
diff --git a/benchmarks/logging/sources/logstash.py b/benchmarks/logging/sources/logstash.py
new file mode 100644
index 0000000..e6f05e4
--- /dev/null
+++ b/benchmarks/logging/sources/logstash.py
@@ -0,0 +1,142 @@
+import datetime
+import logging
+from collections.abc import Iterator
+from typing import Optional, Tuple, Any, Dict, List
+
+from elasticsearch import Elasticsearch
+
+from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId, RawLine
+
+GROUP_LABEL = "app.kubernetes.io/part-of"
+EXPERIMENT_LABEL = "app.kubernetes.io/instance"
+DEFAULT_HORIZON = 5
+ES_MAX_BATCH_SIZE = 10_000
+
+logger = logging.getLogger(__name__)
+
+
+class LogstashSource(LogSource):
+    """Log source for logs stored in Elasticsearch by Logstash. This is typically used when running experiments
+    in a Kubernetes cluster."""
+
+    def __init__(
+        self,
+        client: Elasticsearch,
+        structured_only: bool = False,
+        chronological: bool = False,
+        horizon: int = DEFAULT_HORIZON,
+        today: Optional[datetime.date] = None,
+    ):
+        """
+        :param client: Elasticsearch client to use for retrieving logs.
+        :param structured_only: If True, only return structured log lines (those starting with '>>').
+        :param chronological: If True, return logs in chronological order. This is mostly meant for use
+            in testing, and can get quite slow/expensive for large queries.
+        """
+        self.client = client
+        self.structured_only = structured_only
+        self.chronological = chronological
+        self._indexes = self._generate_indexes(today, horizon)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.client.close()
+
+    @property
+    def indexes(self) -> List[str]:
+        return list(self._indexes)
+
+    def experiments(self, group_id: str) -> Iterator[str]:
+        """Retrieves all experiment IDs within an experiment group."""
+        query = {
+            "size": 0,
+            "query": {
+                "constant_score": {
+                    "filter": {"term": {f"pod_labels.{GROUP_LABEL}.keyword": group_id}}
+                }
+            },
+            "aggs": {
+                "experiments": {
+                    "terms": {
+                        "field": f"pod_labels.{EXPERIMENT_LABEL}.keyword",
+                        "size": 1000,
+                    }
+                }
+            },
+        }
+
+        response = self.client.search(index="benchmarks-*", body=query)
+        for bucket in response["aggregations"]["experiments"]["buckets"]:
+            yield bucket["key"]
+
+    def logs(
+        self, group_id: str, experiment_id: Optional[str] = None
+    ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
+        """Retrieves logs for either all experiments within a group, or a specific experiment."""
+        filters = [{"term": {f"pod_labels.{GROUP_LABEL}.keyword": group_id}}]
+
+        if experiment_id:
+            filters.append(
+                {"term": {f"pod_labels.{EXPERIMENT_LABEL}.keyword": experiment_id}}
+            )
+
+        if self.structured_only:
+            filters.append({"match_phrase": {"message": "entry_type"}})
+
+        query: Dict[str, Any] = {"query": {"bool": {"filter": filters}}}
+
+        if self.chronological:
+            query["sort"] = [{"@timestamp": {"order": "asc"}}]
+
+        # We could probably cache this, but it's OK for now.
+        actual_indexes = [
+            index for index in self.indexes if self.client.indices.exists(index=index)
+        ]
+
+        # Scrolls are much cheaper than queries.
+        scroll_response = self.client.search(
+            index=actual_indexes, body=query, scroll="2m", size=ES_MAX_BATCH_SIZE
+        )
+        scroll_id = scroll_response["_scroll_id"]
+
+        try:
+            while True:
+                hits = scroll_response["hits"]["hits"]
+                if not hits:
+                    break
+
+                for hit in hits:
+                    source = hit["_source"]
+                    message = source["message"]
+
+                    experiment_id = source["pod_labels"][EXPERIMENT_LABEL]
+                    node_id = source["pod_name"]
+
+                    if (
+                        not isinstance(experiment_id, str)
+                        or not isinstance(node_id, str)
+                        or not isinstance(message, str)
+                    ):
+                        logger.warning(
+                            "Skipping log entry with invalid data: %s", source
+                        )
+                        continue
+
+                    yield experiment_id, node_id, message
+
+                # Get next batch of results
+                scroll_response = self.client.scroll(scroll_id=scroll_id, scroll="2m")
+        finally:
+            # Clean up scroll context
+            self.client.clear_scroll(scroll_id=scroll_id)
+
+    def _generate_indexes(self, today: Optional[datetime.date], horizon: int):
+        if today is None:
+            today = datetime.date.today()
+
+        return [
+            f"benchmarks-{(today - datetime.timedelta(days=i)).strftime('%Y.%m.%d')}"
+            for i in range(horizon)
+        ]
diff --git a/benchmarks/logging/sources/sources.py b/benchmarks/logging/sources/sources.py
index 566af25..7e3aa24 100644
--- a/benchmarks/logging/sources/sources.py
+++ b/benchmarks/logging/sources/sources.py
@@ -1,5 +1,5 @@
 import logging
-from abc import ABC, abstractmethod
+from abc import abstractmethod
 from collections.abc import Iterator
 from contextlib import AbstractContextManager
 from pathlib import Path
@@ -19,7 +19,7 @@ NodeId = str
 logger = logging.getLogger(__name__)
 
 
-class LogSource(ABC):
+class LogSource(AbstractContextManager):
     """:class:`LogSource` knows how to retrieve logs for experiments within experiment groups.
 
     A key assumption is that group ids are known, and those can usually be recovered from, say, a workflow run."""
diff --git a/benchmarks/logging/sources/tests/benchmarks-2025.01.21-documents.json b/benchmarks/logging/sources/tests/benchmarks-2025.01.21-documents.json
new file mode 100644
index 0000000..f3deeb7
--- /dev/null
+++ b/benchmarks/logging/sources/tests/benchmarks-2025.01.21-documents.json
@@ -0,0 +1,80 @@
+{
+  "documents": [
+    {
+      "@timestamp": "2025-01-21T12:47:57.098459487Z",
+      "file": "/var/log/pods/codex-benchmarks_deluge-nodes-e3-g3-1_70c962b4-8fa3-43be-a664-aefb793c7b8c/deluge-node/0.log",
+      "message": "12:47:57.098 [INFO ][deluge.core.metrics :49 ] >>{\"entry_type\": \"deluge_torrent_download\", \"timestamp\": \"2025-01-21T12:47:57.098167+00:00\", \"name\": \"deluge_piece_downloaded\", \"value\": 310, \"node\": \"deluge-nodes-e3-g3-1\", \"torrent_name\": \"dataset-0-1\"}",
+      "pod_labels": {
+        "app.kubernetes.io/component": "deluge-node",
+        "app.kubernetes.io/instance": "e3",
+        "app.kubernetes.io/managed-by": "Helm",
+        "app.kubernetes.io/name": "codex-benchmarks",
+        "app.kubernetes.io/part-of": "g3",
+        "app.kubernetes.io/version": "1.0.0",
+        "apps.kubernetes.io/pod-index": "1",
+        "controller-revision-hash": "deluge-nodes-e3-g3-bd9d9789d",
+        "helm.sh/chart": "deluge-benchmarks-0.1.0",
+        "statefulset.kubernetes.io/pod-name": "deluge-nodes-e3-g3-1"
+      },
+      "pod_name": "deluge-nodes-e3-g3-1",
+      "pod_namespace": "codex-benchmarks"
+    },
+    {
+      "@timestamp": "2025-01-21T12:47:57.099459487Z",
+      "file": "/var/log/pods/codex-benchmarks_deluge-nodes-e3-g3-1_70c962b4-8fa3-43be-a664-aefb793c7b8c/deluge-node/0.log",
+      "message": "12:47:57.098 INFO An unstructured log message",
+      "pod_labels": {
+        "app.kubernetes.io/component": "deluge-node",
+        "app.kubernetes.io/instance": "e3",
+        "app.kubernetes.io/managed-by": "Helm",
+        "app.kubernetes.io/name": "codex-benchmarks",
+        "app.kubernetes.io/part-of": "g3",
+        "app.kubernetes.io/version": "1.0.0",
+        "apps.kubernetes.io/pod-index": "1",
+        "controller-revision-hash": "deluge-nodes-e3-g3-bd9d9789d",
+        "helm.sh/chart": "deluge-benchmarks-0.1.0",
+        "statefulset.kubernetes.io/pod-name": "deluge-nodes-e3-g3-1"
+      },
+      "pod_name": "deluge-nodes-e3-g3-1",
+      "pod_namespace": "codex-benchmarks"
+    },
+    {
+      "@timestamp": "2025-01-21T12:47:15.847325207Z",
+      "file": "/var/log/pods/codex-benchmarks_deluge-nodes-e3-g3-0_c42bf4d9-4f1d-40b2-9654-5de197153ac0/deluge-node/0.log",
+      "message": "12:47:15.847 [INFO ][deluge.core.metrics :49 ] >>{\"entry_type\": \"deluge_torrent_download\", \"timestamp\": \"2025-01-21T12:47:15.846761+00:00\", \"name\": \"deluge_piece_downloaded\", \"value\": 23, \"node\": \"deluge-nodes-e3-g3-0\", \"torrent_name\": \"dataset-0-0\"}",
+      "pod_labels": {
+        "app.kubernetes.io/component": "deluge-node",
+        "app.kubernetes.io/instance": "e3",
+        "app.kubernetes.io/managed-by": "Helm",
+        "app.kubernetes.io/name": "codex-benchmarks",
+        "app.kubernetes.io/part-of": "g3",
+        "app.kubernetes.io/version": "1.0.0",
+        "apps.kubernetes.io/pod-index": "0",
+        "controller-revision-hash": "deluge-nodes-e3-g3-bd9d9789d",
+        "helm.sh/chart": "deluge-benchmarks-0.1.0",
+        "statefulset.kubernetes.io/pod-name": "deluge-nodes-e3-g3-0"
+      },
+      "pod_name": "deluge-nodes-e3-g3-0",
+      "pod_namespace": "codex-benchmarks"
+    },
+    {
+      "@timestamp": "2025-01-21T12:47:57.123446028Z",
+      "file": "/var/log/pods/codex-benchmarks_deluge-nodes-e3-g3-1_70c962b4-8fa3-43be-a664-aefb793c7b8c/deluge-node/0.log",
\"deluge_torrent_download\", \"timestamp\": \"2025-01-21T12:47:57.123105+00:00\", \"name\": \"deluge_piece_downloaded\", \"value\": 218, \"node\": \"deluge-nodes-e2-g2-1\", \"torrent_name\": \"dataset-0-1\"}", + "pod_labels": { + "app.kubernetes.io/component": "deluge-node", + "app.kubernetes.io/instance": "e2", + "app.kubernetes.io/managed-by": "Helm", + "app.kubernetes.io/name": "codex-benchmarks", + "app.kubernetes.io/part-of": "g3", + "app.kubernetes.io/version": "1.0.0", + "apps.kubernetes.io/pod-index": "1", + "controller-revision-hash": "deluge-nodes-e2-g2-bd9d9789d", + "helm.sh/chart": "deluge-benchmarks-0.1.0", + "statefulset.kubernetes.io/pod-name": "deluge-nodes-e2-g2-1" + }, + "pod_name": "deluge-nodes-e2-g2-1", + "pod_namespace": "codex-benchmarks" + } + ] +} \ No newline at end of file diff --git a/benchmarks/logging/sources/tests/benchmarks-2025.01.21-mapping.json b/benchmarks/logging/sources/tests/benchmarks-2025.01.21-mapping.json new file mode 100644 index 0000000..c80fdf7 --- /dev/null +++ b/benchmarks/logging/sources/tests/benchmarks-2025.01.21-mapping.json @@ -0,0 +1,220 @@ +{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "file": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "message": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "pod_labels": { + "properties": { + "app": { + "properties": { + "kubernetes": { + "properties": { + "io/component": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "io/instance": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "io/managed-by": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "io/name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "io/part-of": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "io/version": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } + }, + "apps": { + "properties": { + "kubernetes": { + "properties": { + "io/pod-index": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } + }, + "batch": { + "properties": { + "kubernetes": { + "properties": { + "io/controller-uid": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "io/job-name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } + }, + "controller-revision-hash": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "controller-uid": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "helm": { + "properties": { + "sh/chart": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, + "job-name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "pod-template-hash": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "statefulset": { + "properties": { + "kubernetes": { + "properties": { + 
"io/pod-name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } + } + } + }, + "pod_name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "pod_namespace": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } +} diff --git a/benchmarks/logging/sources/tests/fixtures.py b/benchmarks/logging/sources/tests/fixtures.py new file mode 100644 index 0000000..9d2f903 --- /dev/null +++ b/benchmarks/logging/sources/tests/fixtures.py @@ -0,0 +1,40 @@ +import json +import os +from pathlib import Path +from typing import Dict, Any + +import pytest +from elasticsearch import Elasticsearch + +from benchmarks.core.utils import await_predicate + + +def _json_data(data: str) -> Dict[str, Any]: + with (Path(__file__).parent / data).open(encoding="utf-8") as json_file: + return json.load(json_file) + + +@pytest.fixture(scope="module") +def benchmark_logs_client() -> Elasticsearch: + client = Elasticsearch(os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200")) + + if client.indices.exists(index="benchmarks-2025.01.21"): + client.indices.delete(index="benchmarks-2025.01.21") + + client.indices.create( + index="benchmarks-2025.01.21", + body=_json_data("benchmarks-2025.01.21-mapping.json"), + ) + + documents = _json_data("benchmarks-2025.01.21-documents.json")["documents"] + for document in documents: + client.index(index="benchmarks-2025.01.21", body=document) + + def _is_indexed() -> bool: + return client.count(index="benchmarks-2025.01.21")["count"] == len(documents) + + assert await_predicate( + _is_indexed, timeout=10, polling_interval=0.5 + ), "Indexing failed" + + return client diff --git a/benchmarks/logging/sources/tests/test_logstash_source.py b/benchmarks/logging/sources/tests/test_logstash_source.py new file mode 100644 index 0000000..2d90466 --- /dev/null +++ b/benchmarks/logging/sources/tests/test_logstash_source.py @@ -0,0 +1,110 @@ +import pytest +from elasticsearch import Elasticsearch + +from benchmarks.deluge.logging import DelugeTorrentDownload +from benchmarks.logging.logging import LogParser +from datetime import datetime, timezone, date + +from benchmarks.logging.sources.logstash import LogstashSource + + +def _log_lines(source, experiment_id, group_id): + return ( + rawline + for _, _, rawline in source.logs(experiment_id=experiment_id, group_id=group_id) + ) + + +def test_should_look_into_k_day_horizon(): + source = LogstashSource( + Elasticsearch("http://bogus.com:9000/"), + today=date(2025, 1, 21), + horizon=5, + ) + + assert source.indexes == [ + "benchmarks-2025.01.21", + "benchmarks-2025.01.20", + "benchmarks-2025.01.19", + "benchmarks-2025.01.18", + "benchmarks-2025.01.17", + ] + + +@pytest.mark.integration +def test_should_retrieve_unstructured_log_messages(benchmark_logs_client): + source = LogstashSource(benchmark_logs_client, chronological=True) + lines = list(_log_lines(source, "e3", "g3")) + assert not all(">>" in line for line in lines) + + +@pytest.mark.integration +def test_filter_out_unstructured_log_messages(benchmark_logs_client): + source = LogstashSource( + benchmark_logs_client, structured_only=True, chronological=True + ) + lines = list(_log_lines(source, "e3", "g3")) + assert all(">>" in line for line in lines) + + +@pytest.mark.integration +def test_should_retrieve_logs_for_single_experiment(benchmark_logs_client): + source = LogstashSource( + benchmark_logs_client, 
+        benchmark_logs_client, structured_only=True, chronological=True
+    )
+
+    parser = LogParser()
+    parser.register(DelugeTorrentDownload)
+
+    entries = parser.parse(_log_lines(source, "e3", "g3"))
+
+    assert list(entries) == [
+        DelugeTorrentDownload(
+            name="deluge_piece_downloaded",
+            timestamp=datetime(2025, 1, 21, 12, 47, 15, 846761, tzinfo=timezone.utc),
+            value=23,
+            node="deluge-nodes-e3-g3-0",
+            torrent_name="dataset-0-0",
+        ),
+        DelugeTorrentDownload(
+            name="deluge_piece_downloaded",
+            timestamp=datetime(2025, 1, 21, 12, 47, 57, 98167, tzinfo=timezone.utc),
+            value=310,
+            node="deluge-nodes-e3-g3-1",
+            torrent_name="dataset-0-1",
+        ),
+    ]
+
+    entries = parser.parse(_log_lines(source, "e2", "g3"))
+
+    assert list(entries) == [
+        DelugeTorrentDownload(
+            name="deluge_piece_downloaded",
+            timestamp=datetime(2025, 1, 21, 12, 47, 57, 123105, tzinfo=timezone.utc),
+            value=218,
+            node="deluge-nodes-e2-g2-1",
+            torrent_name="dataset-0-1",
+        ),
+    ]
+
+
+@pytest.mark.integration
+def test_should_return_empty_data_for_non_existing_experiments(benchmark_logs_client):
+    source = LogstashSource(
+        benchmark_logs_client, structured_only=True, chronological=True
+    )
+
+    parser = LogParser()
+    parser.register(DelugeTorrentDownload)
+
+    lines = source.logs(experiment_id="e0", group_id="g0")
+
+    assert list(lines) == []
+
+
+@pytest.mark.integration
+def test_should_return_all_experiments_within_a_group(benchmark_logs_client):
+    source = LogstashSource(
+        benchmark_logs_client, structured_only=True, chronological=True
+    )
+    assert sorted(list(source.experiments(group_id="g3"))) == ["e2", "e3"]
diff --git a/benchmarks/logging/sources/tests/test_vector_flat_file.py b/benchmarks/logging/sources/tests/test_vector_flat_file.py
index 9b8d237..fa45a4b 100644
--- a/benchmarks/logging/sources/tests/test_vector_flat_file.py
+++ b/benchmarks/logging/sources/tests/test_vector_flat_file.py
@@ -44,37 +44,37 @@ EXPERIMENT_LOG = [
 
 
 def test_should_retrieve_events_for_specific_experiments():
-    extractor = VectorFlatFileSource(
+    source = VectorFlatFileSource(
         StringIO(make_jsonl(EXPERIMENT_LOG)),
         app_name="codex-benchmarks",
     )
 
-    assert list(extractor.logs(group_id="g1736425800", experiment_id="e1")) == [
+    assert list(source.logs(group_id="g1736425800", experiment_id="e1")) == [
         ("e1", "p1", "m1"),
         ("e1", "p2", "m2"),
     ]
 
-    assert list(extractor.logs(group_id="g1736425800", experiment_id="e2")) == [
+    assert list(source.logs(group_id="g1736425800", experiment_id="e2")) == [
         ("e2", "p1", "m3"),
     ]
 
 
 def test_should_return_empty_when_no_matching_experiment_exists():
-    extractor = VectorFlatFileSource(
+    source = VectorFlatFileSource(
         StringIO(make_jsonl(EXPERIMENT_LOG)),
         app_name="codex-benchmarks",
     )
 
-    assert list(extractor.logs("e3", "g1736425800")) == []
+    assert list(source.logs("e3", "g1736425800")) == []
 
 
 def test_should_retrieve_events_for_an_entire_group():
-    extractor = VectorFlatFileSource(
+    source = VectorFlatFileSource(
         StringIO(make_jsonl(EXPERIMENT_LOG)),
         app_name="codex-benchmarks",
     )
 
-    assert list(extractor.logs(group_id="g1736425800")) == [
+    assert list(source.logs(group_id="g1736425800")) == [
         ("e1", "p1", "m1"),
         ("e1", "p2", "m2"),
         ("e2", "p1", "m3"),
diff --git a/benchmarks/logging/sources/vector_flat_file.py b/benchmarks/logging/sources/vector_flat_file.py
index e0ba2da..9f74f4c 100644
--- a/benchmarks/logging/sources/vector_flat_file.py
+++ b/benchmarks/logging/sources/vector_flat_file.py
@@ -17,6 +17,12 @@ class VectorFlatFileSource(LogSource):
         self.file = file
         self.app_name = app_name
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.file.close()
+
     def experiments(self, group_id: str) -> Iterator[str]:
         """
         Retrieves all experiment IDs within an experiment group. Can be quite slow as this source supports
diff --git a/docker-compose.ci.yaml b/docker-compose.ci.yaml
index 8b32861..a3d6151 100644
--- a/docker-compose.ci.yaml
+++ b/docker-compose.ci.yaml
@@ -12,6 +12,7 @@ services:
       - DELUGE_AGENT_2=http://agent-2:9002/
       - DELUGE_AGENT_3=http://agent-3:9003/
       - TRACKER_ANNOUNCE_URL=http://tracker:8000/announce
+      - ELASTICSEARCH_URL=http://elasticsearch:9200/
     depends_on:
       clean-volumes:
        condition: service_healthy
diff --git a/docker-compose.local.yaml b/docker-compose.local.yaml
index d385a11..0c513c1 100644
--- a/docker-compose.local.yaml
+++ b/docker-compose.local.yaml
@@ -115,6 +115,19 @@ services:
     ports:
       - "8000:8000"
 
+  elasticsearch:
+    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.1
+    container_name: elasticsearch
+    command: ["elasticsearch", "-Elogger.level=ERROR"]
+    environment:
+      - discovery.type=single-node
+      - xpack.security.http.ssl.enabled=false
+      - xpack.security.enabled=false
+      - xpack.security.enrollment.enabled=false
+      - ES_JAVA_OPTS=-Xms1024m -Xmx1024m
+    ports:
+      - "9200:9200"
+
 volumes:
   shared-volume-1:
   shared-volume-2:
diff --git a/poetry.lock b/poetry.lock
index 88bac1a..6dcf4a2 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -202,6 +202,47 @@ files = [
     {file = "distlib-0.3.9.tar.gz", hash = "sha256:a60f20dea646b8a33f3e7772f74dc0b2d0772d2837ee1342a00645c81edf9403"},
 ]
 
+[[package]]
+name = "elastic-transport"
+version = "8.17.0"
+description = "Transport classes and utilities shared among Python Elastic client libraries"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "elastic_transport-8.17.0-py3-none-any.whl", hash = "sha256:59f553300866750e67a38828fede000576562a0e66930c641adb75249e0c95af"},
+    {file = "elastic_transport-8.17.0.tar.gz", hash = "sha256:e755f38f99fa6ec5456e236b8e58f0eb18873ac8fe710f74b91a16dd562de2a5"},
+]
+
+[package.dependencies]
+certifi = "*"
+urllib3 = ">=1.26.2,<3"
+
+[package.extras]
+develop = ["aiohttp", "furo", "httpcore (<1.0.6)", "httpx", "opentelemetry-api", "opentelemetry-sdk", "orjson", "pytest", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "pytest-mock", "requests", "respx", "sphinx (>2)", "sphinx-autodoc-typehints", "trustme"]
+
+[[package]]
+name = "elasticsearch"
+version = "8.17.0"
+description = "Python client for Elasticsearch"
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "elasticsearch-8.17.0-py3-none-any.whl", hash = "sha256:15965240fe297279f0e68b260936d9ced9606aa7ef8910b9b56727f96ef00d5b"},
+    {file = "elasticsearch-8.17.0.tar.gz", hash = "sha256:c1069bf2204ba8fab29ff00b2ce6b37324b2cc6ff593283b97df43426ec13053"},
+]
+
+[package.dependencies]
+elastic-transport = ">=8.15.1,<9"
+
+[package.extras]
+async = ["aiohttp (>=3,<4)"]
+dev = ["aiohttp", "black", "build", "coverage", "isort", "jinja2", "mapbox-vector-tile", "nox", "numpy", "orjson", "pandas", "pyarrow", "pytest", "pytest-asyncio", "pytest-cov", "python-dateutil", "pyyaml (>=5.4)", "requests (>=2,<3)", "simsimd", "twine", "unasync"]
+docs = ["sphinx", "sphinx-autodoc-typehints", "sphinx-rtd-theme (>=2.0)"]
+orjson = ["orjson (>=3)"]
+pyarrow = ["pyarrow (>=1)"]
+requests = ["requests (>=2.4.0,!=2.32.2,<3.0.0)"]
+vectorstore-mmr = ["numpy (>=1)", "simsimd (>=3)"]
+
 [[package]]
 name = "fastapi"
 version = "0.115.6"
"coverage-enable-subprocess [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "1cb8a00bb7c5eebcafcd9415d1513517c35dd450ca0d4e93a2584f81b08412fe" +content-hash = "84e7963f9c2f926a388e62b3aaab706508965c1a1fb7d9440e9c7727eb83d570" diff --git a/pyproject.toml b/pyproject.toml index fd91e18..d050445 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ requests = "^2.32.3" ruff = "^0.8.6" tenacity = "^9.0.0" fastapi = "^0.115.6" +elasticsearch = "^8.17.0" [tool.poetry.group.test.dependencies] pytest = "^8.3.3"