add elasticsearch CLI tool

gmega 2023-11-14 09:42:12 -03:00
parent b316e458c9
commit 80d1aef26b
6 changed files with 134 additions and 49 deletions

logtools/cli/es_logs.py

@@ -1,11 +1,15 @@
 from enum import Enum
-from typing import List, Iterator, Iterable
+from typing import List, Iterable

+from colored import Style
+from dateutil import parser as tsparser
 from elasticsearch import Elasticsearch
 from prettytable import PrettyTable
 from traitlets.config.loader import ArgumentParser

+from logtools.cli.palettes import ColorMap
 from logtools.log.sources.input.elastic_search.elastic_search_log_repo import ElasticSearchLogRepo
+from logtools.log.sources.input.elastic_search.elastic_search_source import ElasticSearchSource


 class ResourceType(Enum):
@@ -36,10 +40,25 @@ def _format_field(field: str | Iterable[object]):
     return ', '.join([str(item) for item in field])


-def get_object(args, repo: ElasticSearchLogRepo):
+def get_object(args, client: Elasticsearch):
+    repo = ElasticSearchLogRepo(client=client)
     print(format_table(GETTERS[ResourceType[args.resource_type]](repo, args)))


+def get_logs(args, client: Elasticsearch):
+    colors = ColorMap()
+    for line in ElasticSearchSource(
+            pods=args.pods,
+            client=client,
+            start_date=args.from_,
+            end_date=args.to,
+    ):
+        output = f'[{line.location.pod_name}]: {line.raw}'
+        if not args.no_color:
+            output = f'{colors[line.location.pod_name]}{output}{Style.reset}'
+        print(output)
+
+
 def main():
     parser = ArgumentParser()
     parser.add_argument(
@@ -56,14 +75,28 @@ def main():
     get_subparsers = get.add_subparsers(title='Resource type', dest='resource_type', required=True)

     get_pods = get_subparsers.add_parser('pods', help='Display existing pods')
     get_pods.add_argument('--prefix', help='Filter pods by prefix')
-    get_pods.add_argument('--run-id', help='Show pods for a given run')
+    get_pods.add_argument('--run-id', help='Show pods for a given run', required=True)

     get_namespaces = get_subparsers.add_parser('namespaces', help='Display existing namespaces')
     get_namespaces.add_argument('--prefix', help='Filter namespaces by prefix')

+    logs = subparsers.add_parser('logs', help='Fetch pod logs')
+    logs.set_defaults(main=get_logs)
+    logs.add_argument('--pods', nargs='+', help='Pods to fetch logs for', required=True)
+    logs.add_argument('--from', dest='from_', type=tsparser.parse,
+                      help='Show entries from date/time (MM-DD-YYYY, or MM-DD-YYYY HH:MM:SS.mmmmmm), '
+                           'treated as UTC if no timezone given', default=None)
+    logs.add_argument('--to', dest='to', type=tsparser.parse,
+                      help='Show entries until date/time (MM-DD-YYYY, or MM-DD-YYYY HH:MM:SS.mmmmmm), '
+                           'treated as UTC if no timezone given', default=None)
+    logs.add_argument('--no-color', dest='no_color', action='store_true', help='Disable colored output')
+
     args = parser.parse_args()
-    args.main(args, ElasticSearchLogRepo(client=Elasticsearch(args.es_host)))
+    client = Elasticsearch(args.es_host, request_timeout=60)
+    args.main(args, client)


 if __name__ == '__main__':
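The --from and --to flags are parsed with dateutil, so both the bare and the fully qualified formats named in the help text work; naive values come back without a timezone (the "treated as UTC" handling presumably happens downstream, as it is not visible in this diff). A quick illustration:

    from dateutil import parser as tsparser

    tsparser.parse('11-09-2023')                  # datetime(2023, 11, 9, 0, 0), naive
    tsparser.parse('11-09-2023 04:31:00.000000')  # datetime(2023, 11, 9, 4, 31), naive
    tsparser.parse('11-09-2023 04:31:00+00:00')   # timezone-aware (UTC)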

logtools/cli/palettes.py (new file)

@@ -0,0 +1,51 @@
+import re
+import random
+from typing import Mapping
+
+from colored import Fore
+
+_SIMPLE_PALETTE_RGB = [
+    (228, 26, 28),    # red
+    (55, 126, 184),   # blue
+    (77, 175, 74),    # green
+    (152, 78, 163),   # purple
+    (255, 127, 0),    # orange
+    (255, 255, 51),   # yellow
+    (166, 86, 40),    # brown
+    (247, 129, 191),  # pink
+    (153, 153, 153),  # grey
+]
+
+
+def rgb_to_ansi(red, green, blue):
+    return f'\x1b[38;2;{red};{green};{blue}m'
+
+
+SIMPLE_PALETTE = [rgb_to_ansi(*rgb) for rgb in _SIMPLE_PALETTE_RGB]
+
+COLORED_PALETTE = [getattr(Fore, color) for color in Fore._COLORS.keys()
+                   if not re.match(r'dark|deep|black', color)]
+
+# Randomize, but deterministically
+shuffler = random.Random(x=1234)
+shuffler.shuffle(COLORED_PALETTE)
+
+FULL_PALETTE = SIMPLE_PALETTE + COLORED_PALETTE
+
+
+class ColorMap(Mapping[str, str]):
+    def __init__(self):
+        self._colors = {}
+        self._next_color = 0
+
+    def __getitem__(self, key: str) -> str:
+        if key not in self._colors:
+            self._colors[key] = FULL_PALETTE[self._next_color]
+            self._next_color += 1
+        return self._colors[key]
+
+    def __len__(self):
+        return len(self._colors)
+
+    def __iter__(self):
+        return self._colors.__iter__()
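ColorMap hands out the next free palette entry the first time a key is seen and returns the same color on every later lookup. A small usage sketch (the pod name is just an example):

    from colored import Style
    from logtools.cli.palettes import ColorMap

    colors = ColorMap()
    print(f"{colors['bootstrap-2']}[bootstrap-2]: node started{Style.reset}")
    assert colors['bootstrap-2'] == colors['bootstrap-2']  # stable per key

Note that __getitem__ indexes FULL_PALETTE directly, so a stream with more distinct keys than palette entries would raise IndexError; the palette is not recycled.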

logtools/log/sources/input/elastic_search/elastic_search_log_repo.py

@@ -11,8 +11,8 @@ logger = logging.getLogger(__name__)
 @dataclass(frozen=True)
 class Namespace:
     name: str
-    run_id: frozenset[str]
-    indices: frozenset[str]
+    run_id: tuple[str, ...]
+    indices: tuple[str, ...]


 @dataclass(frozen=True)
@@ -20,7 +20,7 @@ class Pod:
     name: str
     namespace: str
     run_id: str
-    indices: frozenset[str]
+    indices: tuple[str, ...]


 class ElasticSearchLogRepo:
@@ -58,8 +58,8 @@ class ElasticSearchLogRepo:
         for namespace in result['aggregations']['distinct_namespaces']['buckets']:
             yield Namespace(
                 name=namespace['key'],
-                run_id=frozenset(run_id['key'] for run_id in namespace['runid']['buckets']),
-                indices=frozenset(index['key'] for index in namespace['indices']['buckets'])
+                run_id=tuple(sorted(run_id['key'] for run_id in namespace['runid']['buckets'])),
+                indices=tuple(sorted(index['key'] for index in namespace['indices']['buckets']))
             )

     def pods(self, prefix: Optional[str] = None, run_id: Optional[str] = None):
@@ -95,5 +95,6 @@ class ElasticSearchLogRepo:
                 name=pod['key'],
                 namespace=pod['namespace']['buckets'][0]['key'],
                 run_id=pod['runid']['buckets'][0]['key'],
-                indices=frozenset(index['key'] for index in pod['indices']['buckets'])
+                indices=tuple(sorted(index['key'] for index in pod['indices']['buckets']))
             )
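The switch from frozenset to sorted tuples makes output deterministic: set iteration order varies between interpreter runs (string hashing is randomized), so _format_field's ', '.join(...) and the test expectations could not be written as stable literals. For example:

    run_ids = {'20231109-085853', '20231107-064223'}
    print(', '.join(run_ids))          # order may change between runs
    print(', '.join(sorted(run_ids)))  # always: 20231107-064223, 20231109-085853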

logtools/log/sources/input/elastic_search/elastic_search_source.py

@@ -26,8 +26,8 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]):
                  pods: Optional[Set[str]] = None,
                  run_id: Optional[str] = None,
                  client: Optional[Elasticsearch] = None,
-                 start_date: Optional[datetime] = datetime.min,
-                 end_date: Optional[datetime] = datetime.max,
+                 start_date: Optional[datetime] = None,
+                 end_date: Optional[datetime] = None,
                  ):
         if client is None:
             logger.warning('No client provided, defaulting to localhost')
@@ -42,12 +42,16 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]):
     def __iter__(self) -> Iterator[TimestampedLogLine[ElasticSearchLocation]]:
         for index in self._indices():
-            for i, document in enumerate(self._get_logs(index)):
+            for i, document in enumerate(self._run_scan(self._build_query(), index)):
                 yield self._format_log_line(i, index, document)

     def _indices(self) -> List[str]:
+        # FIXME this is a VERY INEFFICIENT fallback
+        if self.start_date is None:
+            return [f'{INDEX_PREFIX}-*']
+
         start_day = self.start_date.date()
-        end_day = self.end_date.date()
+        end_day = self.end_date.date() if self.end_date else datetime.now().date()
         increment = timedelta(days=1)

         while start_day <= end_day:
@@ -56,32 +60,29 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]):
             yield index
             start_day += increment

-    def _get_logs(self, index: str):
+    def _build_query(self) -> Dict[str, Any]:
         query = {
-            'sort': [{'@timestamp': 'asc'}],
-            'query': {
-                'bool': {
-                    'filter': [
-                        {
-                            'range': {
-                                '@timestamp': {
-                                    'gte': self.start_date.isoformat(),
-                                    'lte': self.end_date.isoformat(),
-                                }
-                            }
-                        }
-                    ]
-                }
-            }
+            'sort': [{'@timestamp': 'asc'}]
         }

+        if self.start_date is not None or self.end_date is not None:
+            time_range = {}
+            if self.start_date is not None:
+                time_range['gte'] = self.start_date.isoformat()
+            if self.end_date is not None:
+                time_range['lte'] = self.end_date.isoformat()
+            query['query'] = {'bool': {'filter': [{'range': {'@timestamp': time_range}}]}}
+
         if self.pods is not None:
             query['query']['bool']['filter'].append({"terms": {"pod_name.keyword": list(self.pods)}})

         if self.run_id is not None:
             query['query']['bool']['filter'].append({"term": {"pod_labels.runid.keyword": self.run_id}})

-        return self._run_scan(query, index)
+        if 'query' not in query:
+            query['query'] = {'match_all': {}}
+
+        return query

     def _run_scan(self, query: Dict[str, Any], index: str):
         initial = self.client.search(index=index, body=query, size=5_000, scroll='2m')
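One caveat in _build_query as committed: the pods and run_id branches append to query['query']['bool']['filter'], but that key is only created when at least one date bound was given, so a pods or run_id filter without --from/--to would raise KeyError before the match_all fallback runs. A minimal sketch of an equivalent builder that avoids this, assuming the same field names (pod_name.keyword, pod_labels.runid.keyword); this is an illustration, not the committed code:

    from typing import Any, Dict

    def build_query(start_date=None, end_date=None, pods=None, run_id=None) -> Dict[str, Any]:
        # Collect filters first, then assemble the query in one place.
        filters = []
        time_range = {}
        if start_date is not None:
            time_range['gte'] = start_date.isoformat()
        if end_date is not None:
            time_range['lte'] = end_date.isoformat()
        if time_range:
            filters.append({'range': {'@timestamp': time_range}})
        if pods is not None:
            filters.append({'terms': {'pod_name.keyword': list(pods)}})
        if run_id is not None:
            filters.append({'term': {'pod_labels.runid.keyword': run_id}})
        query: Dict[str, Any] = {'sort': [{'@timestamp': 'asc'}]}
        query['query'] = {'bool': {'filter': filters}} if filters else {'match_all': {}}
        return query

Pagination itself is unchanged: _run_scan opens a scroll context (scroll='2m') and presumably pages through it with client.scroll until the hits are exhausted.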

ElasticSearchLogRepo tests

@@ -15,27 +15,27 @@ def test_should_retrieve_existing_namespaces():
     assert set(namespaces) == {
         Namespace(
             name='codex-continuous-tests-profiling-two-client-tests-0',
-            run_id=frozenset({
-                '20231109-085853',
-                '20231107-074743',
-                '20231109-043100',
-                '20231107-065930',
-                '20231107-064223',
-                '20231109-055106'
-            }),
-            indices=frozenset({
+            run_id=(
+                '20231107-064223',
+                '20231107-065930',
+                '20231107-074743',
+                '20231109-043100',
+                '20231109-055106',
+                '20231109-085853',
+            ),
+            indices=(
                 'continuous-tests-pods-2023.11.07',
                 'continuous-tests-pods-2023.11.09',
                 'continuous-tests-pods-2023.11.10',
-            }),
+            ),
         ),
         Namespace(
             name='codex-continuous-tests-profiling-two-client-tests-sched-0',
-            run_id=frozenset({'20231109-101554'}),
-            indices=frozenset({
+            run_id=('20231109-101554',),
+            indices=(
                 'continuous-tests-pods-2023.11.09',
-                'continuous-tests-pods-2023.11.10'
-            }),
+                'continuous-tests-pods-2023.11.10',
+            ),
         )
     }

@@ -55,10 +55,8 @@ def test_should_retrieve_existing_pods_for_namespace():
         name='bootstrap-2-58b69484bc-88msf',
         namespace='codex-continuous-tests-profiling-two-client-tests-sched-0',
         run_id='20231109-101554',
-        indices=frozenset({
-            'continuous-tests-pods-2023.11.09',
-            'continuous-tests-pods-2023.11.10'
-        })
+        indices=(
+            'continuous-tests-pods-2023.11.09',
+            'continuous-tests-pods-2023.11.10',
+        )
     ) in pods

pyproject.toml

@@ -30,3 +30,4 @@ build-backend = "poetry.core.masonry.api"
 [tool.poetry.scripts]
 log-merge = 'logtools.cli.merge:main'
 log-to-csv = 'logtools.cli.to_csv:main'
+es-logs = 'logtools.cli.es_logs:main'
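With the new script entry installed (e.g. via poetry install), the subcommands defined above should be reachable along these lines (values taken from the test fixtures; the Elasticsearch host flag is defined in a part of the parser not shown in this diff):

    es-logs get namespaces
    es-logs get pods --run-id 20231109-101554
    es-logs logs --pods bootstrap-2-58b69484bc-88msf --from "11-09-2023" --to "11-10-2023"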