mirror of
https://github.com/logos-storage/logtools.git
synced 2026-01-03 22:13:08 +00:00
fix type checking errors
This commit is contained in:
parent
71a66b7624
commit
db92220a9c
@ -18,10 +18,10 @@ class LineNumberLocation:
|
|||||||
@dataclass
|
@dataclass
|
||||||
class RawLogLine(Generic[TLocation]):
|
class RawLogLine(Generic[TLocation]):
|
||||||
"""
|
"""
|
||||||
A :class:`RawLogLine` is a log line that has not been parsed. It contains the raw text of the line and an optional
|
A :class:`RawLogLine` is a log line that has not been parsed. It contains the raw text of the line and a
|
||||||
location when that can be meaningfully established by the input source.
|
location, when that can be meaningfully established by the input source.
|
||||||
"""
|
"""
|
||||||
location: Optional[TLocation]
|
location: TLocation
|
||||||
raw: str
|
raw: str
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -51,9 +51,9 @@ class ElasticSearchLogRepo:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if prefix is not None:
|
if prefix is not None:
|
||||||
query['aggs']['distinct_namespaces']['terms']['include'] = f'{prefix}.*'
|
query['aggs']['distinct_namespaces']['terms']['include'] = f'{prefix}.*' # type: ignore
|
||||||
|
|
||||||
result = self.client.search(index=self.indices, body=query)
|
result = self.client.search(index=self.indices, body=query) # type: ignore
|
||||||
|
|
||||||
for namespace in result['aggregations']['distinct_namespaces']['buckets']:
|
for namespace in result['aggregations']['distinct_namespaces']['buckets']:
|
||||||
yield Namespace(
|
yield Namespace(
|
||||||
@ -78,7 +78,7 @@ class ElasticSearchLogRepo:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if prefix is not None:
|
if prefix is not None:
|
||||||
query['aggs']['distinct_pods']['terms']['include'] = f'{prefix}.*'
|
query['aggs']['distinct_pods']['terms']['include'] = f'{prefix}.*' # type: ignore
|
||||||
|
|
||||||
if run_id is not None:
|
if run_id is not None:
|
||||||
query['query'] = {
|
query['query'] = {
|
||||||
@ -87,7 +87,8 @@ class ElasticSearchLogRepo:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for pod in self.client.search(index=self.indices, body=query)['aggregations']['distinct_pods']['buckets']:
|
for pod in self.client.search(index=self.indices,
|
||||||
|
body=query)['aggregations']['distinct_pods']['buckets']: # type: ignore
|
||||||
assert len(pod['namespace']['buckets']) == 1, 'Pods should only have one namespace'
|
assert len(pod['namespace']['buckets']) == 1, 'Pods should only have one namespace'
|
||||||
assert len(pod['runid']['buckets']) == 1, 'Pods should only have one run_id'
|
assert len(pod['runid']['buckets']) == 1, 'Pods should only have one run_id'
|
||||||
|
|
||||||
@ -97,4 +98,3 @@ class ElasticSearchLogRepo:
|
|||||||
run_id=pod['runid']['buckets'][0]['key'],
|
run_id=pod['runid']['buckets'][0]['key'],
|
||||||
indices=tuple(sorted(index['key'] for index in pod['indices']['buckets']))
|
indices=tuple(sorted(index['key'] for index in pod['indices']['buckets']))
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -45,7 +45,7 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]):
|
|||||||
for i, document in enumerate(self._run_scan(self._build_query(), index)):
|
for i, document in enumerate(self._run_scan(self._build_query(), index)):
|
||||||
yield self._format_log_line(i, index, document)
|
yield self._format_log_line(i, index, document)
|
||||||
|
|
||||||
def _indices(self) -> List[str]:
|
def _indices(self) -> Iterator[str]:
|
||||||
# FIXME this is a VERY INEFFICIENT fallback
|
# FIXME this is a VERY INEFFICIENT fallback
|
||||||
if self.start_date is None:
|
if self.start_date is None:
|
||||||
return [f'{INDEX_PREFIX}-*']
|
return [f'{INDEX_PREFIX}-*']
|
||||||
@ -61,7 +61,7 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]):
|
|||||||
start_day += increment
|
start_day += increment
|
||||||
|
|
||||||
def _build_query(self) -> Dict[str, Any]:
|
def _build_query(self) -> Dict[str, Any]:
|
||||||
query = {
|
query: Dict[str, Any] = {
|
||||||
'sort': [{'@timestamp': 'asc'}]
|
'sort': [{'@timestamp': 'asc'}]
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,7 +85,8 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]):
|
|||||||
return query
|
return query
|
||||||
|
|
||||||
def _run_scan(self, query: Dict[str, Any], index: str):
|
def _run_scan(self, query: Dict[str, Any], index: str):
|
||||||
initial = self.client.search(index=index, body=query, size=5_000, scroll='2m')
|
# the search type stub does not contain the body argument for some reason so we disable typing here.
|
||||||
|
initial = self.client.search(index=index, body=query, size=5_000, scroll='2m') # type: ignore
|
||||||
scroll_id = initial['_scroll_id']
|
scroll_id = initial['_scroll_id']
|
||||||
results = initial
|
results = initial
|
||||||
|
|
||||||
@ -106,7 +107,8 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]):
|
|||||||
contents = document['_source']
|
contents = document['_source']
|
||||||
|
|
||||||
return TimestampedLogLine(
|
return TimestampedLogLine(
|
||||||
location=ElasticSearchLocation(index=index, result_number=result_number, run_id=self.run_id,
|
location=ElasticSearchLocation(index=index, result_number=result_number,
|
||||||
|
run_id=contents['pod_labels']['runid'],
|
||||||
pod_name=contents['pod_name']),
|
pod_name=contents['pod_name']),
|
||||||
timestamp=datetime.fromisoformat(contents['@timestamp']),
|
timestamp=datetime.fromisoformat(contents['@timestamp']),
|
||||||
raw=contents['message'],
|
raw=contents['message'],
|
||||||
|
|||||||
@ -1,19 +1,19 @@
|
|||||||
from heapq import heapify, heappop, heappush
|
from heapq import heapify, heappop, heappush
|
||||||
from typing import Iterator
|
from typing import Iterator
|
||||||
|
|
||||||
from logtools.log.base import LogSource, TLogLine
|
from logtools.log.base import LogSource, TimestampedLogLine, TLocation
|
||||||
from logtools.log.sources.transform.ordered_source import OrderedSource
|
from logtools.log.sources.transform.ordered_source import OrderedSource
|
||||||
|
|
||||||
|
|
||||||
class MergedSource(LogSource[TLogLine]):
|
class MergedSource(LogSource[TimestampedLogLine[TLocation]]):
|
||||||
def __init__(self, *sources: OrderedSource[TLogLine]):
|
def __init__(self, *sources: OrderedSource[TLocation]):
|
||||||
self.sources = [source for source in sources if source.peek is not None]
|
self.sources = [source for source in sources if source.peek is not None]
|
||||||
heapify(self.sources)
|
heapify(self.sources)
|
||||||
|
|
||||||
def __iter__(self) -> Iterator[TLogLine]:
|
def __iter__(self) -> Iterator[TimestampedLogLine[TLocation]]:
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __next__(self) -> TLogLine:
|
def __next__(self) -> TimestampedLogLine[TLocation]:
|
||||||
if not self.sources:
|
if not self.sources:
|
||||||
raise StopIteration()
|
raise StopIteration()
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user