diff --git a/logtools/cli/es_logs.py b/logtools/cli/es_logs.py index 508c7bc..811bc1f 100644 --- a/logtools/cli/es_logs.py +++ b/logtools/cli/es_logs.py @@ -1,11 +1,15 @@ from enum import Enum -from typing import List, Iterator, Iterable +from typing import List, Iterable +from colored import Style +from dateutil import parser as tsparser from elasticsearch import Elasticsearch from prettytable import PrettyTable from traitlets.config.loader import ArgumentParser +from logtools.cli.palettes import ColorMap from logtools.log.sources.input.elastic_search.elastic_search_log_repo import ElasticSearchLogRepo +from logtools.log.sources.input.elastic_search.elastic_search_source import ElasticSearchSource class ResourceType(Enum): @@ -36,10 +40,25 @@ def _format_field(field: str | Iterable[object]): return ', '.join([str(item) for item in field]) -def get_object(args, repo: ElasticSearchLogRepo): +def get_object(args, client: Elasticsearch): + repo = ElasticSearchLogRepo(client=client) print(format_table(GETTERS[ResourceType[args.resource_type]](repo, args))) +def get_logs(args, client: Elasticsearch): + colors = ColorMap() + for line in ElasticSearchSource( + pods=args.pods, + client=client, + start_date=args.from_, + end_date=args.to, + ): + output = f'[{line.location.pod_name}]: {line.raw}' + if not args.no_color: + output = f'{colors[line.location.pod_name]}{output}{Style.reset}' + print(output) + + def main(): parser = ArgumentParser() parser.add_argument( @@ -56,14 +75,28 @@ def main(): get_subparsers = get.add_subparsers(title='Resource type', dest='resource_type', required=True) get_pods = get_subparsers.add_parser('pods', help='Display existing pods') get_pods.add_argument('--prefix', help='Filter pods by prefix') - get_pods.add_argument('--run-id', help='Show pods for a given run') + get_pods.add_argument('--run-id', help='Show pods for a given run', required=True) get_namespaces = get_subparsers.add_parser('namespaces', help='Display existing namespaces') get_namespaces.add_argument('--prefix', help='Filter namespaces by prefix') + logs = subparsers.add_parser('logs', help='Fetch pod logs') + logs.set_defaults(main=get_logs) + + logs.add_argument('--pods', nargs='+', help='Pods to fetch logs for', required=True) + logs.add_argument('--from', dest='from_', type=tsparser.parse, + help='Show entries from date/time (MM-DD-YYYY, or MM-DD-YYYY HH:MM:SS.mmmmmm), ' + 'treated as UTC if no timezone given', default=None) + logs.add_argument('--to', dest='to', type=tsparser.parse, + help='Show entries until date/time (MM-DD-YYYY, or MM-DD-YYYY HH:MM:SS.mmmmmm), ' + 'treated as UTC if no timezone given', default=None) + logs.add_argument('--no-color', dest='no_color', action='store_true', help='Disable colored output') + args = parser.parse_args() - args.main(args, ElasticSearchLogRepo(client=Elasticsearch(args.es_host))) + client = Elasticsearch(args.es_host, request_timeout=60) + + args.main(args, client) if __name__ == '__main__': diff --git a/logtools/cli/palettes.py b/logtools/cli/palettes.py new file mode 100644 index 0000000..96b31ba --- /dev/null +++ b/logtools/cli/palettes.py @@ -0,0 +1,51 @@ +import re +import random +from typing import Mapping + +from colored import Fore + +_SIMPLE_PALETTE_RGB = [ + (228, 26, 28), # red + (55, 126, 184), # blue + (77, 175, 74), # green + (152, 78, 163), # purple + (255, 127, 0), # orange + (255, 255, 51), # yellow + (166, 86, 40), # brown + (247, 129, 191), # pink + (153, 153, 153), # grey +] + + +def rgb_to_ansi(red, green, blue): + return f'\x1b[38;2;{red};{green};{blue}m' + + +SIMPLE_PALETTE = [rgb_to_ansi(*rgb) for rgb in _SIMPLE_PALETTE_RGB] + +COLORED_PALETTE = [getattr(Fore, color) for color in Fore._COLORS.keys() + if not re.match(r'dark|deep|black', color)] + +# Randomize, but deterministically +shuffler = random.Random(x=1234) +shuffler.shuffle(COLORED_PALETTE) + +FULL_PALETTE = SIMPLE_PALETTE + COLORED_PALETTE + + +class ColorMap(Mapping[str, str]): + def __init__(self): + self._colors = {} + self._next_color = 0 + + def __getitem__(self, key: str) -> str: + if key not in self._colors: + self._colors[key] = FULL_PALETTE[self._next_color] + self._next_color += 1 + return self._colors[key] + + def __len__(self): + return len(self._colors) + + def __iter__(self): + return self._colors.__iter__() diff --git a/logtools/log/sources/input/elastic_search/elastic_search_log_repo.py b/logtools/log/sources/input/elastic_search/elastic_search_log_repo.py index d8e5422..e0e63b1 100644 --- a/logtools/log/sources/input/elastic_search/elastic_search_log_repo.py +++ b/logtools/log/sources/input/elastic_search/elastic_search_log_repo.py @@ -11,8 +11,8 @@ logger = logging.getLogger(__name__) @dataclass(frozen=True) class Namespace: name: str - run_id: frozenset[str] - indices: frozenset[str] + run_id: tuple[str, ...] + indices: tuple[str, ...] @dataclass(frozen=True) @@ -20,7 +20,7 @@ class Pod: name: str namespace: str run_id: str - indices: frozenset[str] + indices: tuple[str, ...] class ElasticSearchLogRepo: @@ -58,8 +58,8 @@ class ElasticSearchLogRepo: for namespace in result['aggregations']['distinct_namespaces']['buckets']: yield Namespace( name=namespace['key'], - run_id=frozenset(run_id['key'] for run_id in namespace['runid']['buckets']), - indices=frozenset(index['key'] for index in namespace['indices']['buckets']) + run_id=tuple(sorted(run_id['key'] for run_id in namespace['runid']['buckets'])), + indices=tuple(sorted(index['key'] for index in namespace['indices']['buckets'])) ) def pods(self, prefix: Optional[str] = None, run_id: Optional[str] = None): @@ -95,5 +95,6 @@ class ElasticSearchLogRepo: name=pod['key'], namespace=pod['namespace']['buckets'][0]['key'], run_id=pod['runid']['buckets'][0]['key'], - indices=frozenset(index['key'] for index in pod['indices']['buckets']) + indices=tuple(sorted(index['key'] for index in pod['indices']['buckets'])) ) + diff --git a/logtools/log/sources/input/elastic_search/elastic_search_source.py b/logtools/log/sources/input/elastic_search/elastic_search_source.py index 50324ad..11a120d 100644 --- a/logtools/log/sources/input/elastic_search/elastic_search_source.py +++ b/logtools/log/sources/input/elastic_search/elastic_search_source.py @@ -26,8 +26,8 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]): pods: Optional[Set[str]] = None, run_id: Optional[str] = None, client: Optional[Elasticsearch] = None, - start_date: Optional[datetime] = datetime.min, - end_date: Optional[datetime] = datetime.max, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, ): if client is None: logger.warning('No client provided, defaulting to localhost') @@ -42,12 +42,16 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]): def __iter__(self) -> Iterator[TimestampedLogLine[ElasticSearchLocation]]: for index in self._indices(): - for i, document in enumerate(self._get_logs(index)): + for i, document in enumerate(self._run_scan(self._build_query(), index)): yield self._format_log_line(i, index, document) def _indices(self) -> List[str]: + # FIXME this is a VERY INEFFICIENT fallback + if self.start_date is None: + return [f'{INDEX_PREFIX}-*'] + start_day = self.start_date.date() - end_day = self.end_date.date() + end_day = self.end_date.date() if self.end_date else datetime.now().date() increment = timedelta(days=1) while start_day <= end_day: @@ -56,32 +60,29 @@ class ElasticSearchSource(LogSource[TimestampedLogLine[ElasticSearchLocation]]): yield index start_day += increment - def _get_logs(self, index: str): + def _build_query(self) -> Dict[str, Any]: query = { - 'sort': [{'@timestamp': 'asc'}], - 'query': { - 'bool': { - 'filter': [ - { - 'range': { - '@timestamp': { - 'gte': self.start_date.isoformat(), - 'lte': self.end_date.isoformat(), - } - } - } - ] - } - } + 'sort': [{'@timestamp': 'asc'}] } + if self.start_date is not None or self.end_date is not None: + time_range = {} + if self.start_date is not None: + time_range['gte'] = self.start_date.isoformat() + if self.end_date is not None: + time_range['lte'] = self.end_date.isoformat() + query['query'] = {'bool': {'filter': [{'range': {'@timestamp': time_range}}]}} + if self.pods is not None: query['query']['bool']['filter'].append({"terms": {"pod_name.keyword": list(self.pods)}}) if self.run_id is not None: query['query']['bool']['filter'].append({"term": {"pod_labels.runid.keyword": self.run_id}}) - return self._run_scan(query, index) + if 'query' not in query: + query['query'] = {'match_all': {}} + + return query def _run_scan(self, query: Dict[str, Any], index: str): initial = self.client.search(index=index, body=query, size=5_000, scroll='2m') diff --git a/logtools/log/sources/input/elastic_search/tests/test_elasticsearch_log_repo.py b/logtools/log/sources/input/elastic_search/tests/test_elasticsearch_log_repo.py index 044df2b..89c2ae9 100644 --- a/logtools/log/sources/input/elastic_search/tests/test_elasticsearch_log_repo.py +++ b/logtools/log/sources/input/elastic_search/tests/test_elasticsearch_log_repo.py @@ -15,27 +15,27 @@ def test_should_retrieve_existing_namespaces(): assert set(namespaces) == { Namespace( name='codex-continuous-tests-profiling-two-client-tests-0', - run_id=frozenset({ - '20231109-085853', + run_id=( + '20231107-064223', + '20231107-065930', '20231107-074743', '20231109-043100', - '20231107-065930', - '20231107-064223', - '20231109-055106' - }), - indices=frozenset({ + '20231109-055106', + '20231109-085853', + ), + indices=( 'continuous-tests-pods-2023.11.07', 'continuous-tests-pods-2023.11.09', 'continuous-tests-pods-2023.11.10', - }), + ), ), Namespace( name='codex-continuous-tests-profiling-two-client-tests-sched-0', - run_id=frozenset({'20231109-101554'}), - indices=frozenset({ + run_id=('20231109-101554',), + indices=( 'continuous-tests-pods-2023.11.09', - 'continuous-tests-pods-2023.11.10' - }), + 'continuous-tests-pods-2023.11.10', + ), ) } @@ -55,10 +55,8 @@ def test_should_retrieve_existing_pods_for_namespace(): name='bootstrap-2-58b69484bc-88msf', namespace='codex-continuous-tests-profiling-two-client-tests-sched-0', run_id='20231109-101554', - indices=frozenset({ + indices=( 'continuous-tests-pods-2023.11.09', - 'continuous-tests-pods-2023.11.10' - }) + 'continuous-tests-pods-2023.11.10', + ) ) in pods - - diff --git a/pyproject.toml b/pyproject.toml index c26be38..0841b9e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,4 +29,5 @@ build-backend = "poetry.core.masonry.api" [tool.poetry.scripts] log-merge = 'logtools.cli.merge:main' -log-to-csv = 'logtools.cli.to_csv:main' \ No newline at end of file +log-to-csv = 'logtools.cli.to_csv:main' +es-logs = 'logtools.cli.es_logs:main' \ No newline at end of file