diff --git a/logtools/log/base.py b/logtools/log/base.py index a25b51f..821c5c4 100644 --- a/logtools/log/base.py +++ b/logtools/log/base.py @@ -18,10 +18,10 @@ class LineNumberLocation: @dataclass class RawLogLine(Generic[TLocation]): """ - A :class:`RawLogLine` is a log line that has not been parsed. It contains the raw text of the line and an optional - location when that can be meaningfully established by the input source. + A :class:`RawLogLine` is a log line that has not been parsed. It contains the raw text of the line and a + location, when that can be meaningfully established by the input source. """ - location: Optional[TLocation] + location: TLocation raw: str diff --git a/logtools/log/sources/input/elastic_search/elastic_search_log_repo.py b/logtools/log/sources/input/elastic_search/elastic_search_log_repo.py index e0e63b1..b7a2af9 100644 --- a/logtools/log/sources/input/elastic_search/elastic_search_log_repo.py +++ b/logtools/log/sources/input/elastic_search/elastic_search_log_repo.py @@ -51,9 +51,9 @@ class ElasticSearchLogRepo: } if prefix is not None: - query['aggs']['distinct_namespaces']['terms']['include'] = f'{prefix}.*' + query['aggs']['distinct_namespaces']['terms']['include'] = f'{prefix}.*' # type: ignore - result = self.client.search(index=self.indices, body=query) + result = self.client.search(index=self.indices, body=query) # type: ignore for namespace in result['aggregations']['distinct_namespaces']['buckets']: yield Namespace( @@ -78,7 +78,7 @@ class ElasticSearchLogRepo: } if prefix is not None: - query['aggs']['distinct_pods']['terms']['include'] = f'{prefix}.*' + query['aggs']['distinct_pods']['terms']['include'] = f'{prefix}.*' # type: ignore if run_id is not None: query['query'] = { @@ -87,7 +87,8 @@ class ElasticSearchLogRepo: } } - for pod in self.client.search(index=self.indices, body=query)['aggregations']['distinct_pods']['buckets']: + for pod in self.client.search(index=self.indices, + body=query)['aggregations']['distinct_pods']['buckets']: # type: ignore assert len(pod['namespace']['buckets']) == 1, 'Pods should only have one namespace' assert len(pod['runid']['buckets']) == 1, 'Pods should only have one run_id' @@ -97,4 +98,3 @@ class ElasticSearchLogRepo: run_id=pod['runid']['buckets'][0]['key'], indices=tuple(sorted(index['key'] for index in pod['indices']['buckets'])) ) - diff --git a/logtools/log/sources/input/elastic_search/elastic_search_source.py b/logtools/log/sources/input/elastic_search/elastic_search_source.py index 11a120d..c9d96a0 100644 --- a/logtools/log/sources/input/elastic_search/elastic_search_source.py +++ b/logtools/log/sources/input/elastic_search/elastic_search_source.py @@ -45,7 +45,7 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]): for i, document in enumerate(self._run_scan(self._build_query(), index)): yield self._format_log_line(i, index, document) - def _indices(self) -> List[str]: + def _indices(self) -> Iterator[str]: # FIXME this is a VERY INEFFICIENT fallback if self.start_date is None: return [f'{INDEX_PREFIX}-*'] @@ -61,7 +61,7 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]): start_day += increment def _build_query(self) -> Dict[str, Any]: - query = { + query: Dict[str, Any] = { 'sort': [{'@timestamp': 'asc'}] } @@ -85,7 +85,8 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]): return query def _run_scan(self, query: Dict[str, Any], index: str): - initial = self.client.search(index=index, body=query, size=5_000, scroll='2m') + # the search type stub does not contain the body argument for some reason so we disable typing here. + initial = self.client.search(index=index, body=query, size=5_000, scroll='2m') # type: ignore scroll_id = initial['_scroll_id'] results = initial @@ -106,7 +107,8 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]): contents = document['_source'] return TimestampedLogLine( - location=ElasticSearchLocation(index=index, result_number=result_number, run_id=self.run_id, + location=ElasticSearchLocation(index=index, result_number=result_number, + run_id=contents['pod_labels']['runid'], pod_name=contents['pod_name']), timestamp=datetime.fromisoformat(contents['@timestamp']), raw=contents['message'], diff --git a/logtools/log/sources/transform/merged_source.py b/logtools/log/sources/transform/merged_source.py index 802f58e..229ea66 100644 --- a/logtools/log/sources/transform/merged_source.py +++ b/logtools/log/sources/transform/merged_source.py @@ -1,19 +1,19 @@ from heapq import heapify, heappop, heappush from typing import Iterator -from logtools.log.base import LogSource, TLogLine +from logtools.log.base import LogSource, TimestampedLogLine, TLocation from logtools.log.sources.transform.ordered_source import OrderedSource -class MergedSource(LogSource[TLogLine]): - def __init__(self, *sources: OrderedSource[TLogLine]): +class MergedSource(LogSource[TimestampedLogLine[TLocation]]): + def __init__(self, *sources: OrderedSource[TLocation]): self.sources = [source for source in sources if source.peek is not None] heapify(self.sources) - def __iter__(self) -> Iterator[TLogLine]: + def __iter__(self) -> Iterator[TimestampedLogLine[TLocation]]: return self - def __next__(self) -> TLogLine: + def __next__(self) -> TimestampedLogLine[TLocation]: if not self.sources: raise StopIteration()