mirror of https://github.com/logos-storage/bittorrent-benchmarks.git
synced 2026-01-03 13:33:07 +00:00

Merge pull request #3 from codex-storage/feat/logstash

feat: add log source for logstash

This commit is contained in:
commit d14079cce1
@@ -5,6 +5,7 @@ from pathlib import Path
from typing import Dict

import uvicorn
from elasticsearch import Elasticsearch
from pydantic import IPvAnyAddress
from pydantic_core import ValidationError
from typing_extensions import TypeVar
@@ -21,7 +22,12 @@ from benchmarks.logging.logging import (
    LogEntry,
    LogSplitterFormats,
)
from benchmarks.logging.sources.sources import FSOutputManager, split_logs_in_source
from benchmarks.logging.sources.logstash import LogstashSource
from benchmarks.logging.sources.sources import (
    FSOutputManager,
    split_logs_in_source,
    LogSource,
)
from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource

experiment_config_parser = ConfigParser[ExperimentBuilder]()
@@ -87,11 +93,7 @@ def cmd_parse_single_log(log: Path, output: Path):
        splitter.split(log_parser.parse(istream))


def cmd_parse_log_source(group_id: str, source_file: Path, output_dir: Path):
    if not source_file.exists():
        print(f"Log source file {source_file} does not exist.")
        sys.exit(-1)

def cmd_parse_log_source(source: LogSource, group_id: str, output_dir: Path):
    if not output_dir.parent.exists():
        print(f"Folder {output_dir.parent} does not exist.")
        sys.exit(-1)
@@ -99,10 +101,9 @@ def cmd_parse_log_source(group_id: str, source_file: Path, output_dir: Path):
    output_dir.mkdir(exist_ok=True)

    with (
        source_file.open("r", encoding="utf-8") as istream,
        source as log_source,
        FSOutputManager(output_dir) as output_manager,
    ):
        log_source = VectorFlatFileSource(app_name="codex-benchmarks", file=istream)
        split_logs_in_source(
            log_source,
            log_parser,
@@ -146,6 +147,27 @@ def _parse_config(
        sys.exit(-1)


def _configure_source(args):
    # TODO we should probably have builders for sources as well, but for now
    # we'll just keep it simple.
    if args.source_file:
        if not args.source_file.exists():
            print(f"Log source file {args.source_file} does not exist.")
            sys.exit(-1)
        return VectorFlatFileSource(
            app_name="codex-benchmarks", file=args.source_file.open(encoding="utf-8")
        )
    else:
        import urllib3

        urllib3.disable_warnings()

        return LogstashSource(
            Elasticsearch(args.es_url, verify_certs=False),
            structured_only=True,
        )


def _init_logging():
    import logging

@@ -213,8 +235,13 @@ def main():
    log_source_cmd = log_subcommands.add_parser(
        "source", help="Parse logs from a log source."
    )
    log_source_cmd.add_argument(
        "source_file", type=Path, help="Vector log file to parse from."

    group = log_source_cmd.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "--source-file", type=Path, help="Vector log file to parse from."
    )
    group.add_argument(
        "--es-url", type=str, help="URL to a logstash Elasticsearch instance."
    )
    log_source_cmd.add_argument(
        "output_dir", type=Path, help="Path to an output folder."
@@ -224,7 +251,7 @@
    )
    log_source_cmd.set_defaults(
        func=lambda args: cmd_parse_log_source(
            args.group_id, args.source_file, args.output_dir
            _configure_source(args), args.group_id, args.output_dir
        )
    )

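A rough sketch of what the reworked command does end to end, assuming a Vector flat log file at a hypothetical path. The arguments of split_logs_in_source beyond the source and parser are not visible in the hunk above, so the trailing output manager and group id passed below are an assumption, not the confirmed signature:

from pathlib import Path

from benchmarks.logging.logging import LogParser
from benchmarks.logging.sources.sources import FSOutputManager, split_logs_in_source
from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource

log_parser = LogParser()  # concrete LogEntry types would be registered here

output_dir = Path("./parsed-logs")  # hypothetical output folder
output_dir.mkdir(exist_ok=True)

with (
    VectorFlatFileSource(
        app_name="codex-benchmarks", file=Path("vector.log").open(encoding="utf-8")
    ) as log_source,
    FSOutputManager(output_dir) as output_manager,
):
    # Trailing arguments are assumed; the diff only shows the first two.
    split_logs_in_source(log_source, log_parser, output_manager, "g3")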
@@ -2,3 +2,4 @@

from benchmarks.deluge.tests.fixtures import *
from benchmarks.core.tests.fixtures import *
from benchmarks.logging.sources.tests.fixtures import *

benchmarks/logging/sources/logstash.py (new file, 142 lines)
@@ -0,0 +1,142 @@
import datetime
import logging
from collections.abc import Iterator
from typing import Optional, Tuple, Any, Dict, List

from elasticsearch import Elasticsearch

from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId, RawLine

GROUP_LABEL = "app.kubernetes.io/part-of"
EXPERIMENT_LABEL = "app.kubernetes.io/instance"
DEFAULT_HORIZON = 5
ES_MAX_BATCH_SIZE = 10_000

logger = logging.getLogger(__name__)


class LogstashSource(LogSource):
    """Log source for logs stored in Elasticsearch by Logstash. This is typically used when running experiments
    in a Kubernetes cluster."""

    def __init__(
        self,
        client: Elasticsearch,
        structured_only: bool = False,
        chronological: bool = False,
        horizon: int = DEFAULT_HORIZON,
        today: Optional[datetime.date] = None,
    ):
        """
        @:param client: Elasticsearch client to use for retrieving logs
        @:param structured_only: If True, only return structured log lines (those starting with '>>').
        @:param chronological: If True, return logs in chronological order. This is mostly meant for use
            in testing, and can get quite slow/expensive for large queries.
        """
        self.client = client
        self.structured_only = structured_only
        self.chronological = chronological
        self._indexes = self._generate_indexes(today, horizon)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

    @property
    def indexes(self) -> List[str]:
        return list(self._indexes)

    def experiments(self, group_id: str) -> Iterator[str]:
        """Retrieves all experiment IDs within an experiment group."""
        query = {
            "size": 0,
            "query": {
                "constant_score": {
                    "filter": {"term": {f"pod_labels.{GROUP_LABEL}.keyword": group_id}}
                }
            },
            "aggs": {
                "experiments": {
                    "terms": {
                        "field": f"pod_labels.{EXPERIMENT_LABEL}.keyword",
                        "size": 1000,
                    }
                }
            },
        }

        response = self.client.search(index="benchmarks-*", body=query)
        for bucket in response["aggregations"]["experiments"]["buckets"]:
            yield bucket["key"]

    def logs(
        self, group_id: str, experiment_id: Optional[str] = None
    ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
        """Retrieves logs for either all experiments within a group, or a specific experiment."""
        filters = [{"term": {f"pod_labels.{GROUP_LABEL}.keyword": group_id}}]

        if experiment_id:
            filters.append(
                {"term": {f"pod_labels.{EXPERIMENT_LABEL}.keyword": experiment_id}}
            )

        if self.structured_only:
            filters.append({"match_phrase": {"message": "entry_type"}})

        query: Dict[str, Any] = {"query": {"bool": {"filter": filters}}}

        if self.chronological:
            query["sort"] = [{"@timestamp": {"order": "asc"}}]

        # We can probably cache this, but for now OK.
        actual_indexes = [
            index for index in self.indexes if self.client.indices.exists(index=index)
        ]

        # Scrolls are much cheaper than queries.
        scroll_response = self.client.search(
            index=actual_indexes, body=query, scroll="2m", size=ES_MAX_BATCH_SIZE
        )
        scroll_id = scroll_response["_scroll_id"]

        try:
            while True:
                hits = scroll_response["hits"]["hits"]
                if not hits:
                    break

                for hit in hits:
                    source = hit["_source"]
                    message = source["message"]

                    experiment_id = source["pod_labels"][EXPERIMENT_LABEL]
                    node_id = source["pod_name"]

                    if (
                        not isinstance(experiment_id, str)
                        or not isinstance(node_id, str)
                        or not isinstance(message, str)
                    ):
                        logger.warning(
                            "Skipping log entry with invalid data: %s", source
                        )
                        continue

                    yield experiment_id, node_id, message

                # Get next batch of results
                scroll_response = self.client.scroll(scroll_id=scroll_id, scroll="2m")
        finally:
            # Clean up scroll context
            self.client.clear_scroll(scroll_id=scroll_id)

    def _generate_indexes(self, today: Optional[datetime.date], horizon: int):
        if today is None:
            today = datetime.date.today()

        return [
            f"benchmarks-{(today - datetime.timedelta(days=i)).strftime('%Y.%m.%d')}"
            for i in range(horizon)
        ]
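A minimal usage sketch of the new source, assuming a reachable Elasticsearch instance. The endpoint URL and the group id are illustrative; the constructor flags and the experiments/logs methods come from the class above.

from elasticsearch import Elasticsearch

from benchmarks.logging.sources.logstash import LogstashSource

# Hypothetical endpoint; "g3" mirrors the group id used in the test fixtures below.
es = Elasticsearch("http://localhost:9200")
with LogstashSource(es, structured_only=True) as source:
    for experiment_id in source.experiments(group_id="g3"):
        print("experiment:", experiment_id)

    for experiment_id, node_id, raw_line in source.logs(group_id="g3"):
        print(experiment_id, node_id, raw_line)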
@@ -1,5 +1,5 @@
import logging
from abc import ABC, abstractmethod
from abc import abstractmethod
from collections.abc import Iterator
from contextlib import AbstractContextManager
from pathlib import Path
@@ -19,7 +19,7 @@ NodeId = str
logger = logging.getLogger(__name__)


class LogSource(ABC):
class LogSource(AbstractContextManager):
    """:class:`LogSource` knows how to retrieve logs for experiments within experiment groups. A key assumption is that
    group ids are known, and those can usually be recovered from, say, a workflow run."""

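Switching the base class from ABC to AbstractContextManager lets callers scope a source's underlying handle to a with-block. A small sketch of that contract using the flat-file source (its constructor and __exit__ behavior are shown in the hunks further down; the empty StringIO stream is a stand-in for a real Vector log file):

from io import StringIO

from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource

stream = StringIO("")  # stand-in for a real Vector log file
with VectorFlatFileSource(stream, app_name="codex-benchmarks") as source:
    assert list(source.logs(group_id="does-not-exist")) == []

# __exit__ closes the wrapped handle (see the vector_flat_file.py hunk below).
assert stream.closed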
@@ -0,0 +1,80 @@
{
  "documents": [
    {
      "@timestamp": "2025-01-21T12:47:57.098459487Z",
      "file": "/var/log/pods/codex-benchmarks_deluge-nodes-e3-g3-1_70c962b4-8fa3-43be-a664-aefb793c7b8c/deluge-node/0.log",
      "message": "12:47:57.098 [INFO ][deluge.core.metrics :49 ] >>{\"entry_type\": \"deluge_torrent_download\", \"timestamp\": \"2025-01-21T12:47:57.098167+00:00\", \"name\": \"deluge_piece_downloaded\", \"value\": 310, \"node\": \"deluge-nodes-e3-g3-1\", \"torrent_name\": \"dataset-0-1\"}",
      "pod_labels": {
        "app.kubernetes.io/component": "deluge-node",
        "app.kubernetes.io/instance": "e3",
        "app.kubernetes.io/managed-by": "Helm",
        "app.kubernetes.io/name": "codex-benchmarks",
        "app.kubernetes.io/part-of": "g3",
        "app.kubernetes.io/version": "1.0.0",
        "apps.kubernetes.io/pod-index": "1",
        "controller-revision-hash": "deluge-nodes-e3-g3-bd9d9789d",
        "helm.sh/chart": "deluge-benchmarks-0.1.0",
        "statefulset.kubernetes.io/pod-name": "deluge-nodes-e3-g3-1"
      },
      "pod_name": "deluge-nodes-e3-g3-1",
      "pod_namespace": "codex-benchmarks"
    },
    {
      "@timestamp": "2025-01-21T12:47:57.099459487Z",
      "file": "/var/log/pods/codex-benchmarks_deluge-nodes-e3-g3-1_70c962b4-8fa3-43be-a664-aefb793c7b8c/deluge-node/0.log",
      "message": "12:47:57.098 INFO An unstructured log message",
      "pod_labels": {
        "app.kubernetes.io/component": "deluge-node",
        "app.kubernetes.io/instance": "e3",
        "app.kubernetes.io/managed-by": "Helm",
        "app.kubernetes.io/name": "codex-benchmarks",
        "app.kubernetes.io/part-of": "g3",
        "app.kubernetes.io/version": "1.0.0",
        "apps.kubernetes.io/pod-index": "1",
        "controller-revision-hash": "deluge-nodes-e3-g3-bd9d9789d",
        "helm.sh/chart": "deluge-benchmarks-0.1.0",
        "statefulset.kubernetes.io/pod-name": "deluge-nodes-e3-g3-1"
      },
      "pod_name": "deluge-nodes-e3-g3-1",
      "pod_namespace": "codex-benchmarks"
    },
    {
      "@timestamp": "2025-01-21T12:47:15.847325207Z",
      "file": "/var/log/pods/codex-benchmarks_deluge-nodes-e3-g3-0_c42bf4d9-4f1d-40b2-9654-5de197153ac0/deluge-node/0.log",
      "message": "12:47:15.847 [INFO ][deluge.core.metrics :49 ] >>{\"entry_type\": \"deluge_torrent_download\", \"timestamp\": \"2025-01-21T12:47:15.846761+00:00\", \"name\": \"deluge_piece_downloaded\", \"value\": 23, \"node\": \"deluge-nodes-e3-g3-0\", \"torrent_name\": \"dataset-0-0\"}",
      "pod_labels": {
        "app.kubernetes.io/component": "deluge-node",
        "app.kubernetes.io/instance": "e3",
        "app.kubernetes.io/managed-by": "Helm",
        "app.kubernetes.io/name": "codex-benchmarks",
        "app.kubernetes.io/part-of": "g3",
        "app.kubernetes.io/version": "1.0.0",
        "apps.kubernetes.io/pod-index": "0",
        "controller-revision-hash": "deluge-nodes-e3-g3-bd9d9789d",
        "helm.sh/chart": "deluge-benchmarks-0.1.0",
        "statefulset.kubernetes.io/pod-name": "deluge-nodes-e3-g3-0"
      },
      "pod_name": "deluge-nodes-e3-g3-0",
      "pod_namespace": "codex-benchmarks"
    },
    {
      "@timestamp": "2025-01-21T12:47:57.123446028Z",
      "file": "/var/log/pods/codex-benchmarks_deluge-nodes-e3-g3-1_70c962b4-8fa3-43be-a664-aefb793c7b8c/deluge-node/0.log",
      "message": "12:47:57.123 [INFO ][deluge.core.metrics :49 ] >>{\"entry_type\": \"deluge_torrent_download\", \"timestamp\": \"2025-01-21T12:47:57.123105+00:00\", \"name\": \"deluge_piece_downloaded\", \"value\": 218, \"node\": \"deluge-nodes-e2-g2-1\", \"torrent_name\": \"dataset-0-1\"}",
      "pod_labels": {
        "app.kubernetes.io/component": "deluge-node",
        "app.kubernetes.io/instance": "e2",
        "app.kubernetes.io/managed-by": "Helm",
        "app.kubernetes.io/name": "codex-benchmarks",
        "app.kubernetes.io/part-of": "g3",
        "app.kubernetes.io/version": "1.0.0",
        "apps.kubernetes.io/pod-index": "1",
        "controller-revision-hash": "deluge-nodes-e2-g2-bd9d9789d",
        "helm.sh/chart": "deluge-benchmarks-0.1.0",
        "statefulset.kubernetes.io/pod-name": "deluge-nodes-e2-g2-1"
      },
      "pod_name": "deluge-nodes-e2-g2-1",
      "pod_namespace": "codex-benchmarks"
    }
  ]
}
@@ -0,0 +1,220 @@
{
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "file": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
      "message": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
      "pod_labels": {
        "properties": {
          "app": {
            "properties": {
              "kubernetes": {
                "properties": {
                  "io/component": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "io/instance": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "io/managed-by": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "io/name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "io/part-of": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "io/version": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                }
              }
            }
          },
          "apps": {
            "properties": {
              "kubernetes": {
                "properties": {
                  "io/pod-index": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                }
              }
            }
          },
          "batch": {
            "properties": {
              "kubernetes": {
                "properties": {
                  "io/controller-uid": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
                  "io/job-name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                }
              }
            }
          },
          "controller-revision-hash": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "controller-uid": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "helm": {
            "properties": {
              "sh/chart": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
            }
          },
          "job-name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "pod-template-hash": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
          "statefulset": {
            "properties": {
              "kubernetes": {
                "properties": {
                  "io/pod-name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
                }
              }
            }
          }
        }
      },
      "pod_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } },
      "pod_namespace": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }
    }
  }
}
benchmarks/logging/sources/tests/fixtures.py (new file, 40 lines)
@@ -0,0 +1,40 @@
import json
import os
from pathlib import Path
from typing import Dict, Any

import pytest
from elasticsearch import Elasticsearch

from benchmarks.core.utils import await_predicate


def _json_data(data: str) -> Dict[str, Any]:
    with (Path(__file__).parent / data).open(encoding="utf-8") as json_file:
        return json.load(json_file)


@pytest.fixture(scope="module")
def benchmark_logs_client() -> Elasticsearch:
    client = Elasticsearch(os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200"))

    if client.indices.exists(index="benchmarks-2025.01.21"):
        client.indices.delete(index="benchmarks-2025.01.21")

    client.indices.create(
        index="benchmarks-2025.01.21",
        body=_json_data("benchmarks-2025.01.21-mapping.json"),
    )

    documents = _json_data("benchmarks-2025.01.21-documents.json")["documents"]
    for document in documents:
        client.index(index="benchmarks-2025.01.21", body=document)

    def _is_indexed() -> bool:
        return client.count(index="benchmarks-2025.01.21")["count"] == len(documents)

    assert await_predicate(
        _is_indexed, timeout=10, polling_interval=0.5
    ), "Indexing failed"

    return client
benchmarks/logging/sources/tests/test_logstash_source.py (new file, 110 lines)
@@ -0,0 +1,110 @@
import pytest
from elasticsearch import Elasticsearch

from benchmarks.deluge.logging import DelugeTorrentDownload
from benchmarks.logging.logging import LogParser
from datetime import datetime, timezone, date

from benchmarks.logging.sources.logstash import LogstashSource


def _log_lines(source, experiment_id, group_id):
    return (
        rawline
        for _, _, rawline in source.logs(experiment_id=experiment_id, group_id=group_id)
    )


def test_should_look_into_k_day_horizon():
    source = LogstashSource(
        Elasticsearch("http://bogus.com:9000/"),
        today=date(2025, 1, 21),
        horizon=5,
    )

    assert source.indexes == [
        "benchmarks-2025.01.21",
        "benchmarks-2025.01.20",
        "benchmarks-2025.01.19",
        "benchmarks-2025.01.18",
        "benchmarks-2025.01.17",
    ]


@pytest.mark.integration
def test_should_retrieve_unstructured_log_messages(benchmark_logs_client):
    source = LogstashSource(benchmark_logs_client, chronological=True)
    lines = list(_log_lines(source, "e3", "g3"))
    assert not all(">>" in line for line in lines)


@pytest.mark.integration
def test_filter_out_unstructured_log_messages(benchmark_logs_client):
    source = LogstashSource(
        benchmark_logs_client, structured_only=True, chronological=True
    )
    lines = list(_log_lines(source, "e3", "g3"))
    assert all(">>" in line for line in lines)


@pytest.mark.integration
def test_should_retrieve_logs_for_single_experiment(benchmark_logs_client):
    source = LogstashSource(
        benchmark_logs_client, structured_only=True, chronological=True
    )

    parser = LogParser()
    parser.register(DelugeTorrentDownload)

    entries = parser.parse(_log_lines(source, "e3", "g3"))

    assert list(entries) == [
        DelugeTorrentDownload(
            name="deluge_piece_downloaded",
            timestamp=datetime(2025, 1, 21, 12, 47, 15, 846761, tzinfo=timezone.utc),
            value=23,
            node="deluge-nodes-e3-g3-0",
            torrent_name="dataset-0-0",
        ),
        DelugeTorrentDownload(
            name="deluge_piece_downloaded",
            timestamp=datetime(2025, 1, 21, 12, 47, 57, 98167, tzinfo=timezone.utc),
            value=310,
            node="deluge-nodes-e3-g3-1",
            torrent_name="dataset-0-1",
        ),
    ]

    entries = parser.parse(_log_lines(source, "e2", "g3"))

    assert list(entries) == [
        DelugeTorrentDownload(
            name="deluge_piece_downloaded",
            timestamp=datetime(2025, 1, 21, 12, 47, 57, 123105, tzinfo=timezone.utc),
            value=218,
            node="deluge-nodes-e2-g2-1",
            torrent_name="dataset-0-1",
        ),
    ]


@pytest.mark.integration
def test_should_return_empty_data_for_non_existing_experiments(benchmark_logs_client):
    source = LogstashSource(
        benchmark_logs_client, structured_only=True, chronological=True
    )

    parser = LogParser()
    parser.register(DelugeTorrentDownload)

    lines = source.logs(experiment_id="e0", group_id="g0")

    assert list(lines) == []


@pytest.mark.integration
def test_should_return_all_experiments_within_a_group(benchmark_logs_client):
    source = LogstashSource(
        benchmark_logs_client, structured_only=True, chronological=True
    )
    assert sorted(list(source.experiments(group_id="g3"))) == ["e2", "e3"]
@@ -44,37 +44,37 @@ EXPERIMENT_LOG = [


def test_should_retrieve_events_for_specific_experiments():
    extractor = VectorFlatFileSource(
    source = VectorFlatFileSource(
        StringIO(make_jsonl(EXPERIMENT_LOG)),
        app_name="codex-benchmarks",
    )

    assert list(extractor.logs(group_id="g1736425800", experiment_id="e1")) == [
    assert list(source.logs(group_id="g1736425800", experiment_id="e1")) == [
        ("e1", "p1", "m1"),
        ("e1", "p2", "m2"),
    ]

    assert list(extractor.logs(group_id="g1736425800", experiment_id="e2")) == [
    assert list(source.logs(group_id="g1736425800", experiment_id="e2")) == [
        ("e2", "p1", "m3"),
    ]


def test_should_return_empty_when_no_matching_experiment_exists():
    extractor = VectorFlatFileSource(
    source = VectorFlatFileSource(
        StringIO(make_jsonl(EXPERIMENT_LOG)),
        app_name="codex-benchmarks",
    )

    assert list(extractor.logs("e3", "g1736425800")) == []
    assert list(source.logs("e3", "g1736425800")) == []


def test_should_retrieve_events_for_an_entire_group():
    extractor = VectorFlatFileSource(
    source = VectorFlatFileSource(
        StringIO(make_jsonl(EXPERIMENT_LOG)),
        app_name="codex-benchmarks",
    )

    assert list(extractor.logs(group_id="g1736425800")) == [
    assert list(source.logs(group_id="g1736425800")) == [
        ("e1", "p1", "m1"),
        ("e1", "p2", "m2"),
        ("e2", "p1", "m3"),

@@ -17,6 +17,12 @@ class VectorFlatFileSource(LogSource):
        self.file = file
        self.app_name = app_name

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.close()

    def experiments(self, group_id: str) -> Iterator[str]:
        """
        Retrieves all experiment IDs within an experiment group. Can be quite slow as this source supports

@@ -12,6 +12,7 @@ services:
      - DELUGE_AGENT_2=http://agent-2:9002/
      - DELUGE_AGENT_3=http://agent-3:9003/
      - TRACKER_ANNOUNCE_URL=http://tracker:8000/announce
      - ELASTICSEARCH_URL=http://elasticsearch:9200/
    depends_on:
      clean-volumes:
        condition: service_healthy

@@ -115,6 +115,19 @@ services:
    ports:
      - "8000:8000"

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.1
    container_name: elasticsearch
    command: ["elasticsearch", "-Elogger.level=ERROR"]
    environment:
      - discovery.type=single-node
      - xpack.security.http.ssl.enabled=false
      - xpack.security.enabled=false
      - xpack.security.enrollment.enabled=false
      - ES_JAVA_OPTS=-Xms1024m -Xmx1024m
    ports:
      - "9200:9200"

volumes:
  shared-volume-1:
  shared-volume-2:

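For reference, the integration-test fixture reaches this service through the same environment variable the compose file sets, falling back to localhost when run outside the compose network. This mirrors benchmarks/logging/sources/tests/fixtures.py above; the index name is the one seeded there:

import os

from elasticsearch import Elasticsearch

client = Elasticsearch(os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200"))
print(client.indices.exists(index="benchmarks-2025.01.21"))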
poetry.lock (generated, 43 lines)
@@ -202,6 +202,47 @@ files = [
    {file = "distlib-0.3.9.tar.gz", hash = "sha256:a60f20dea646b8a33f3e7772f74dc0b2d0772d2837ee1342a00645c81edf9403"},
]

[[package]]
name = "elastic-transport"
version = "8.17.0"
description = "Transport classes and utilities shared among Python Elastic client libraries"
optional = false
python-versions = ">=3.8"
files = [
    {file = "elastic_transport-8.17.0-py3-none-any.whl", hash = "sha256:59f553300866750e67a38828fede000576562a0e66930c641adb75249e0c95af"},
    {file = "elastic_transport-8.17.0.tar.gz", hash = "sha256:e755f38f99fa6ec5456e236b8e58f0eb18873ac8fe710f74b91a16dd562de2a5"},
]

[package.dependencies]
certifi = "*"
urllib3 = ">=1.26.2,<3"

[package.extras]
develop = ["aiohttp", "furo", "httpcore (<1.0.6)", "httpx", "opentelemetry-api", "opentelemetry-sdk", "orjson", "pytest", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "pytest-mock", "requests", "respx", "sphinx (>2)", "sphinx-autodoc-typehints", "trustme"]

[[package]]
name = "elasticsearch"
version = "8.17.0"
description = "Python client for Elasticsearch"
optional = false
python-versions = ">=3.8"
files = [
    {file = "elasticsearch-8.17.0-py3-none-any.whl", hash = "sha256:15965240fe297279f0e68b260936d9ced9606aa7ef8910b9b56727f96ef00d5b"},
    {file = "elasticsearch-8.17.0.tar.gz", hash = "sha256:c1069bf2204ba8fab29ff00b2ce6b37324b2cc6ff593283b97df43426ec13053"},
]

[package.dependencies]
elastic-transport = ">=8.15.1,<9"

[package.extras]
async = ["aiohttp (>=3,<4)"]
dev = ["aiohttp", "black", "build", "coverage", "isort", "jinja2", "mapbox-vector-tile", "nox", "numpy", "orjson", "pandas", "pyarrow", "pytest", "pytest-asyncio", "pytest-cov", "python-dateutil", "pyyaml (>=5.4)", "requests (>=2,<3)", "simsimd", "twine", "unasync"]
docs = ["sphinx", "sphinx-autodoc-typehints", "sphinx-rtd-theme (>=2.0)"]
orjson = ["orjson (>=3)"]
pyarrow = ["pyarrow (>=1)"]
requests = ["requests (>=2.4.0,!=2.32.2,<3.0.0)"]
vectorstore-mmr = ["numpy (>=1)", "simsimd (>=3)"]

[[package]]
name = "fastapi"
version = "0.115.6"
@@ -902,4 +943,4 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "1cb8a00bb7c5eebcafcd9415d1513517c35dd450ca0d4e93a2584f81b08412fe"
content-hash = "84e7963f9c2f926a388e62b3aaab706508965c1a1fb7d9440e9c7727eb83d570"

@@ -17,6 +17,7 @@ requests = "^2.32.3"
ruff = "^0.8.6"
tenacity = "^9.0.0"
fastapi = "^0.115.6"
elasticsearch = "^8.17.0"

[tool.poetry.group.test.dependencies]
pytest = "^8.3.3"