feat: add ES slicing to CLI

gmega 2025-01-23 09:05:48 -03:00
parent a9b9fd8332
commit cbba235f2e
3 changed files with 69 additions and 32 deletions


@@ -155,24 +155,25 @@ def _parse_config(
         sys.exit(-1)
 
 
-def _configure_source(args, dump=False):
-    # TODO we should probably have builders for sources as well, but for now
-    # we'll just keep it simple.
-    if args.source_file:
-        if not args.source_file.exists():
-            print(f"Log source file {args.source_file} does not exist.")
-            sys.exit(-1)
-        return VectorFlatFileSource(
-            app_name="codex-benchmarks", file=args.source_file.open(encoding="utf-8")
-        )
-    else:
-        import urllib3
-        urllib3.disable_warnings()
-        return LogstashSource(
-            Elasticsearch(args.es_url, verify_certs=False),
-            structured_only=not dump,
-            chronological=args.chronological,
-        )
+def _configure_logstash_source(args, structured_only=True):
+    import urllib3
+    urllib3.disable_warnings()
+    return LogstashSource(
+        Elasticsearch(args.es_url, verify_certs=False),
+        chronological=args.chronological,
+        structured_only=structured_only,
+        slices=args.slices,
+    )
+
+
+def _configure_vector_source(args):
+    if not args.source_file.exists():
+        print(f"Log source file {args.source_file} does not exist.")
+        sys.exit(-1)
+    return VectorFlatFileSource(
+        app_name="codex-benchmarks", file=args.source_file.open(encoding="utf-8")
+    )
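For reference, the new slices knob maps to Elasticsearch's sliced scrolling, which splits one scroll into N independent cursors that can be consumed in parallel. A minimal sketch of a single slice worker, assuming the elasticsearch-py client (the index name and match-all query are placeholders, not the project's):

from elasticsearch import Elasticsearch

def scan_slice(es: Elasticsearch, index: str, slice_id: int, max_slices: int):
    # Each slice is an independent cursor over roughly 1/max_slices of the hits.
    resp = es.search(
        index=index,
        scroll="2m",
        body={
            "slice": {"id": slice_id, "max": max_slices},
            "query": {"match_all": {}},
        },
    )
    scroll_id = resp["_scroll_id"]
    try:
        while resp["hits"]["hits"]:
            yield from resp["hits"]["hits"]
            resp = es.scroll(scroll_id=scroll_id, scroll="2m")
            scroll_id = resp["_scroll_id"]
    finally:
        # Scroll contexts are server-side state; free them eagerly.
        es.clear_scroll(scroll_id=scroll_id)

Running scan_slice once per slice id on separate threads covers the whole result set, which is what slices=args.slices ultimately requests from LogstashSource.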
@@ -186,10 +187,15 @@ def _init_logging():
 def main():
+    # TODO this is getting unwieldy, need to pull this apart into submodules. For now we get away
+    # with title comments.
     parser = argparse.ArgumentParser()
     commands = parser.add_subparsers(required=True)
 
+    ###########################################################################
+    # Experiments                                                             #
+    ###########################################################################
     experiments = commands.add_parser(
         "experiments", help="List or run experiments in config file."
     )
@@ -226,6 +232,9 @@
     describe_cmd.set_defaults(func=cmd_describe_experiment)
 
+    ###########################################################################
+    # Logs                                                                    #
+    ###########################################################################
     logs_cmd = commands.add_parser("logs", help="Parse logs.")
     log_subcommands = logs_cmd.add_subparsers(required=True)
@@ -243,15 +252,6 @@
     log_source_cmd = log_subcommands.add_parser(
         "source", help="Parse logs from a log source."
     )
-    source_group = log_source_cmd.add_mutually_exclusive_group(required=True)
-    source_group.add_argument(
-        "--source-file", type=Path, help="Vector log file to parse from."
-    )
-    source_group.add_argument(
-        "--es-url", type=str, help="URL to a logstash Elasticsearch instance."
-    )
     log_source_cmd.add_argument(
         "group_id", type=str, help="ID of experiment group to parse."
     )
@@ -267,16 +267,48 @@
         type=Path,
         help="Splits logs for the entire group into the specified folder.",
     )
-    log_source_cmd.set_defaults(
-        func=lambda args: cmd_split_log_source(
-            _configure_source(args, dump=False), args.group_id, args.output_dir
+    single_or_split.set_defaults(
+        func=lambda args: cmd_dump_single_experiment(
+            args.source(args, False), args.group_id, args.experiment_id
         )
-        if args.output_dir
-        else cmd_dump_single_experiment(
-            _configure_source(args, dump=True), args.group_id, args.experiment_id
+        if args.experiment_id
+        else cmd_split_log_source(
+            args.source(args, True), args.group_id, args.output_dir
         )
     )
+
+    source_type = log_source_cmd.add_subparsers(required=True)
+
+    es_source = source_type.add_parser("logstash", help="Logstash source.")
+    es_source.add_argument(
+        "es_url", type=str, help="URL to a logstash Elasticsearch instance."
+    )
+    es_source.add_argument(
+        "--chronological", action="store_true", help="Sort logs chronologically."
+    )
+    es_source.add_argument(
+        "--slices",
+        type=int,
+        help="Number of scroll slices to use when reading the log.",
+        default=2,
+    )
+    es_source.set_defaults(
+        source=lambda args, structured_only: _configure_logstash_source(
+            args, structured_only=structured_only
+        )
+    )
+
+    vector_source = source_type.add_parser("vector", help="Vector flat file source.")
+    vector_source.add_argument(
+        "source_file", type=Path, help="Vector log file to parse from."
+    )
+    vector_source.set_defaults(source=lambda args, _: _configure_vector_source(args))
+
+    ###########################################################################
+    # Agents                                                                  #
+    ###########################################################################
+
     agent_cmd = commands.add_parser("agent", help="Starts a local agent.")
     agent_cmd.add_argument(
         "config", type=Path, help="Path to the agent configuration file."


@@ -34,6 +34,7 @@ def pflatmap(
         finally:
             q.put(_End())
 
+    # TODO handle SIGTERM properly
     executor = ThreadPoolExecutor(max_workers=workers)
     try:
         task_futures = [executor.submit(_consume, task) for task in tasks]
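Only a fragment of pflatmap is visible in this hunk, but the shape is a classic fan-out: each task iterator is drained on a worker thread into a shared queue, and a per-task _End sentinel tells the consumer when everything has finished. A self-contained sketch of that pattern, reconstructed from the fragment (details beyond what the hunk shows are assumptions):

from concurrent.futures import ThreadPoolExecutor
from queue import Queue

class _End:
    # Sentinel marking that one task's iterator is exhausted.
    pass

def pflatmap(tasks, workers=4):
    q = Queue()

    def _consume(task):
        try:
            for item in task:
                q.put(item)
        finally:
            # Always signal completion, even if the task raised.
            q.put(_End())

    executor = ThreadPoolExecutor(max_workers=workers)
    try:
        task_futures = [executor.submit(_consume, task) for task in tasks]
        pending = len(task_futures)
        while pending:
            item = q.get()
            if isinstance(item, _End):
                pending -= 1
            else:
                yield item
    finally:
        executor.shutdown(wait=True)

Counting one _End per submitted task is what lets a single consumer know when every worker is done, which is also why q.put(_End()) sits in a finally block.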


@@ -102,6 +102,7 @@ class LogstashSource(LogSource):
         ]
 
         if self.slices > 1:
+            logger.info(f"Querying ES with {self.slices} scroll slices.")
             yield from pflatmap(
                 [
                     self._run_scroll(sliced_query, actual_indexes)
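Each element of that list is presumably the same scroll run against a different slice id. A hedged sketch of how per-slice query bodies could be derived (how sliced_query is actually built is not visible in this hunk):

def sliced_queries(base_query: dict, n_slices: int) -> list[dict]:
    # One copy of the query per slice; ES partitions the scroll by "slice".
    return [
        {**base_query, "slice": {"id": i, "max": n_slices}}
        for i in range(n_slices)
    ]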
@@ -153,8 +154,11 @@
                 # Get next batch of results
                 scroll_response = self.client.scroll(scroll_id=scroll_id, scroll="2m")
+        except Exception as e:
+            logger.exception(f"Error while scrolling: {e}")
         finally:
             # Clean up scroll context
+            logger.info("Worker done, clearing scroll context.")
             self.client.clear_scroll(scroll_id=scroll_id)
 
     def __str__(self):