From cbba235f2e741fb499ca5b67b7bb479e296031ca Mon Sep 17 00:00:00 2001
From: gmega
Date: Thu, 23 Jan 2025 09:05:48 -0300
Subject: [PATCH] feat: add ES slicing to CLI

---
 benchmarks/cli.py                      | 96 +++++++++++++++++---
 benchmarks/core/concurrency.py         |  1 +
 benchmarks/logging/sources/logstash.py |  4 ++
 3 files changed, 69 insertions(+), 32 deletions(-)

diff --git a/benchmarks/cli.py b/benchmarks/cli.py
index fccc747..4b920fd 100644
--- a/benchmarks/cli.py
+++ b/benchmarks/cli.py
@@ -155,25 +155,26 @@ def _parse_config(
         sys.exit(-1)
 
 
-def _configure_source(args, dump=False):
-    # TODO we should probably have builders for sources as well, but for now
-    # we'll just keep it simple.
-    if args.source_file:
-        if not args.source_file.exists():
-            print(f"Log source file {args.source_file} does not exist.")
-            sys.exit(-1)
-        return VectorFlatFileSource(
-            app_name="codex-benchmarks", file=args.source_file.open(encoding="utf-8")
-        )
-    else:
-        import urllib3
+def _configure_logstash_source(args, structured_only=True):
+    import urllib3
 
-        urllib3.disable_warnings()
+    urllib3.disable_warnings()
 
-        return LogstashSource(
-            Elasticsearch(args.es_url, verify_certs=False),
-            structured_only=not dump,
-        )
+    return LogstashSource(
+        Elasticsearch(args.es_url, verify_certs=False),
+        chronological=args.chronological,
+        structured_only=structured_only,
+        slices=args.slices,
+    )
+
+
+def _configure_vector_source(args):
+    if not args.source_file.exists():
+        print(f"Log source file {args.source_file} does not exist.")
+        sys.exit(-1)
+    return VectorFlatFileSource(
+        app_name="codex-benchmarks", file=args.source_file.open(encoding="utf-8")
+    )
 
 
 def _init_logging():
@@ -186,10 +187,15 @@ def _init_logging():
 
 
 def main():
+    # TODO this is getting unwieldy, need pull this apart into submodules. For now we get away
+    # with title comments.
 
     parser = argparse.ArgumentParser()
     commands = parser.add_subparsers(required=True)
 
+    ###########################################################################
+    # Experiments                                                             #
+    ###########################################################################
     experiments = commands.add_parser(
         "experiments", help="List or run experiments in config file."
     )
@@ -226,6 +232,9 @@ def main():
 
     describe_cmd.set_defaults(func=cmd_describe_experiment)
 
+    ###########################################################################
+    # Logs                                                                    #
+    ###########################################################################
     logs_cmd = commands.add_parser("logs", help="Parse logs.")
     log_subcommands = logs_cmd.add_subparsers(required=True)
 
@@ -243,15 +252,6 @@ def main():
     log_source_cmd = log_subcommands.add_parser(
         "source", help="Parse logs from a log source."
     )
-
-    source_group = log_source_cmd.add_mutually_exclusive_group(required=True)
-    source_group.add_argument(
-        "--source-file", type=Path, help="Vector log file to parse from."
-    )
-    source_group.add_argument(
-        "--es-url", type=str, help="URL to a logstash Elasticsearch instance."
-    )
-
     log_source_cmd.add_argument(
         "group_id", type=str, help="ID of experiment group to parse."
     )
@@ -267,16 +267,48 @@ def main():
         type=Path,
         help="Splits logs for the entire group into the specified folder.",
     )
-    log_source_cmd.set_defaults(
-        func=lambda args: cmd_split_log_source(
-            _configure_source(args, dump=False), args.group_id, args.output_dir
+
+    single_or_split.set_defaults(
+        func=lambda args: cmd_dump_single_experiment(
+            args.source(args, False), args.group_id, args.experiment_id
         )
-        if args.output_dir
-        else cmd_dump_single_experiment(
-            _configure_source(args, dump=True), args.group_id, args.experiment_id
+        if args.experiment_id
+        else cmd_split_log_source(
+            args.source(args, True), args.group_id, args.output_dir
         )
     )
 
+    source_type = log_source_cmd.add_subparsers(required=True)
+    es_source = source_type.add_parser("logstash", help="Logstash source.")
+    es_source.add_argument(
+        "es_url", type=str, help="URL to a logstash Elasticsearch instance."
+    )
+    es_source.add_argument(
+        "--chronological", action="store_true", help="Sort logs chronologically."
+    )
+    es_source.add_argument(
+        "--slices",
+        type=int,
+        help="Number of scroll slices to use when reading the log.",
+        default=2,
+    )
+
+    es_source.set_defaults(
+        source=lambda args, structured_only: _configure_logstash_source(
+            args, structured_only=structured_only
+        )
+    )
+
+    vector_source = source_type.add_parser("vector", help="Vector flat file source.")
+    vector_source.add_argument(
+        "source_file", type=Path, help="Vector log file to parse from."
+    )
+
+    vector_source.set_defaults(source=lambda args, _: _configure_vector_source(args))
+
+    ###########################################################################
+    # Agents                                                                  #
+    ###########################################################################
     agent_cmd = commands.add_parser("agent", help="Starts a local agent.")
    agent_cmd.add_argument(
         "config", type=Path, help="Path to the agent configuration file."
diff --git a/benchmarks/core/concurrency.py b/benchmarks/core/concurrency.py
index a2183b7..a7a5170 100644
--- a/benchmarks/core/concurrency.py
+++ b/benchmarks/core/concurrency.py
@@ -34,6 +34,7 @@ def pflatmap(
         finally:
             q.put(_End())
 
+    # TODO handle SIGTERM properly
     executor = ThreadPoolExecutor(max_workers=workers)
     try:
         task_futures = [executor.submit(_consume, task) for task in tasks]
diff --git a/benchmarks/logging/sources/logstash.py b/benchmarks/logging/sources/logstash.py
index 384d055..be75e8c 100644
--- a/benchmarks/logging/sources/logstash.py
+++ b/benchmarks/logging/sources/logstash.py
@@ -102,6 +102,7 @@ class LogstashSource(LogSource):
         ]
 
         if self.slices > 1:
+            logger.info(f"Querying ES with {self.slices} scroll slices.")
             yield from pflatmap(
                 [
                     self._run_scroll(sliced_query, actual_indexes)
@@ -153,8 +154,11 @@ class LogstashSource(LogSource):
 
                 # Get next batch of results
                 scroll_response = self.client.scroll(scroll_id=scroll_id, scroll="2m")
+        except Exception as e:
+            logger.exception(f"Error while scrolling: {e}")
         finally:
             # Clean up scroll context
+            logger.info("Worker done, clearing scroll context.")
            self.client.clear_scroll(scroll_id=scroll_id)
 
     def __str__(self):
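
Note (illustrative sketch, not part of the patch): the new --slices flag maps onto
Elasticsearch's "sliced scroll" feature, where each worker scans a disjoint slice of
the result set and pflatmap merges the streams. The snippet below shows the underlying
pattern with the elasticsearch Python client; the endpoint, index pattern, query, and
page size are made-up placeholders, and a 7.x-style search(body=..., scroll=...) call
is assumed.

# Sketch of Elasticsearch sliced scrolling with N parallel workers.
# All concrete values (URL, index, query, page size) are examples only.
from concurrent.futures import ThreadPoolExecutor

from elasticsearch import Elasticsearch

SLICES = 2  # mirrors the CLI default for --slices
client = Elasticsearch("https://localhost:9200", verify_certs=False)


def scan_slice(slice_id: int, index: str = "logstash-*"):
    """Scrolls a single slice of the result set and yields raw hits."""
    body = {
        "slice": {"id": slice_id, "max": SLICES},  # partition assigned to this worker
        "query": {"match_all": {}},
        "size": 1000,
    }
    response = client.search(index=index, body=body, scroll="2m")
    scroll_id = response["_scroll_id"]
    try:
        while hits := response["hits"]["hits"]:
            yield from hits
            response = client.scroll(scroll_id=scroll_id, scroll="2m")
            scroll_id = response["_scroll_id"]
    finally:
        # Free the server-side scroll context, as LogstashSource does in its finally block.
        client.clear_scroll(scroll_id=scroll_id)


if __name__ == "__main__":
    # One worker per slice, roughly what pflatmap arranges for LogstashSource.
    with ThreadPoolExecutor(max_workers=SLICES) as pool:
        counts = pool.map(lambda i: sum(1 for _ in scan_slice(i)), range(SLICES))
        print(f"total hits: {sum(counts)}")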