
import datetime
import logging
from collections.abc import Iterator
from typing import Optional, Tuple, Any, Dict, List

from elasticsearch import Elasticsearch

from benchmarks.logging.sources.sources import LogSource, ExperimentId, NodeId, RawLine

GROUP_LABEL = "app.kubernetes.io/part-of"
EXPERIMENT_LABEL = "app.kubernetes.io/instance"
DEFAULT_HORIZON = 5
ES_MAX_BATCH_SIZE = 10_000
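# Daily index names follow the pattern produced by _generate_indexes below
# (e.g. "benchmarks-2024.06.01"); DEFAULT_HORIZON controls how many days back to look for them.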
logger = logging.getLogger(__name__)


class LogstashSource(LogSource):
    """Log source for logs stored in Elasticsearch by Logstash. This is typically used when running
    experiments in a Kubernetes cluster."""

    def __init__(
        self,
        client: Elasticsearch,
        structured_only: bool = False,
        chronological: bool = False,
        horizon: int = DEFAULT_HORIZON,
        today: Optional[datetime.date] = None,
    ):
        """
        :param client: Elasticsearch client to use for retrieving logs.
        :param structured_only: If True, only return structured log lines (those starting with '>>').
        :param chronological: If True, return logs in chronological order. This is mostly meant for use
            in testing, and can get quite slow/expensive for large queries.
        :param horizon: Number of days back (including today) for which daily index names are generated.
        :param today: Reference date for index generation; defaults to the current date. Mostly useful
            in tests.
        """
        self.client = client
        self.structured_only = structured_only
        self.chronological = chronological
        self._indexes = self._generate_indexes(today, horizon)
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.client.close()

    @property
    def indexes(self) -> List[str]:
        return list(self._indexes)
    def experiments(self, group_id: str) -> Iterator[str]:
        """Retrieves all experiment IDs within an experiment group."""
        # Aggregate distinct experiment labels rather than fetching documents; the terms
        # aggregation returns at most 1000 distinct experiments per group.
        query = {
            "size": 0,
            "query": {
                "constant_score": {
                    "filter": {"term": {f"pod_labels.{GROUP_LABEL}.keyword": group_id}}
                }
            },
            "aggs": {
                "experiments": {
                    "terms": {
                        "field": f"pod_labels.{EXPERIMENT_LABEL}.keyword",
                        "size": 1000,
                    }
                }
            },
        }
        response = self.client.search(index="benchmarks-*", body=query)
        for bucket in response["aggregations"]["experiments"]["buckets"]:
            yield bucket["key"]
    def logs(
        self, group_id: str, experiment_id: Optional[str] = None
    ) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
        """Retrieves logs for either all experiments within a group, or for a specific experiment."""
        filters = [{"term": {f"pod_labels.{GROUP_LABEL}.keyword": group_id}}]
        if experiment_id:
            filters.append(
                {"term": {f"pod_labels.{EXPERIMENT_LABEL}.keyword": experiment_id}}
            )
        if self.structured_only:
            filters.append({"match_phrase": {"message": "entry_type"}})

        query: Dict[str, Any] = {"query": {"bool": {"filter": filters}}}
        if self.chronological:
            query["sort"] = [{"@timestamp": {"order": "asc"}}]

        # Searching a non-existent index fails, so restrict the query to indexes that
        # actually exist. We can probably cache this, but for now it is OK.
        actual_indexes = [
            index for index in self.indexes if self.client.indices.exists(index=index)
        ]

        # Scrolls are much cheaper than repeated queries.
        scroll_response = self.client.search(
            index=actual_indexes, body=query, scroll="2m", size=ES_MAX_BATCH_SIZE
        )
        scroll_id = scroll_response["_scroll_id"]

        try:
            while True:
                hits = scroll_response["hits"]["hits"]
                if not hits:
                    break
                for hit in hits:
                    source = hit["_source"]
                    message = source["message"]
                    # Each hit carries its own experiment label, so yield that rather than the
                    # (optional) filter argument.
                    experiment_id = source["pod_labels"][EXPERIMENT_LABEL]
                    node_id = source["pod_name"]
                    if (
                        not isinstance(experiment_id, str)
                        or not isinstance(node_id, str)
                        or not isinstance(message, str)
                    ):
                        logger.warning(
                            "Skipping log entry with invalid data: %s", source
                        )
                        continue
                    yield experiment_id, node_id, message
                # Get the next batch of results.
                scroll_response = self.client.scroll(scroll_id=scroll_id, scroll="2m")
        finally:
            # Clean up the scroll context.
            self.client.clear_scroll(scroll_id=scroll_id)
    def _generate_indexes(self, today: Optional[datetime.date], horizon: int) -> List[str]:
        """Generates the names of the daily indexes covering the last `horizon` days."""
        if today is None:
            today = datetime.date.today()
        return [
            f"benchmarks-{(today - datetime.timedelta(days=i)).strftime('%Y.%m.%d')}"
            for i in range(horizon)
        ]
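

# A minimal usage sketch, not part of the library API: it assumes an Elasticsearch cluster
# reachable at http://localhost:9200 and experiments labelled with the group "my-group";
# both are placeholders to be swapped for real connection details and labels.
if __name__ == "__main__":
    es_client = Elasticsearch("http://localhost:9200")
    with LogstashSource(es_client, structured_only=True) as source:
        for experiment in source.experiments("my-group"):
            print("experiment:", experiment)
        for exp_id, node_id, line in source.logs("my-group"):
            print(exp_id, node_id, line)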