mirror of
https://github.com/logos-storage/bittorrent-benchmarks.git
synced 2026-01-07 07:23:12 +00:00
feat: allow reading multiple Vector files when splitting logs
This commit is contained in:
parent
e47f8848e4
commit
d1077d0972
@ -28,6 +28,7 @@ from benchmarks.logging.sources.sources import (
|
||||
FSOutputManager,
|
||||
split_logs_in_source,
|
||||
LogSource,
|
||||
ChainedLogSource,
|
||||
)
|
||||
from benchmarks.logging.sources.vector_flat_file import VectorFlatFileSource
|
||||
|
||||
@ -179,11 +180,18 @@ def _configure_logstash_source(args, structured_only=True):
|
||||
|
||||
|
||||
def _configure_vector_source(args):
|
||||
if not args.source_file.exists():
|
||||
print(f"Log source file {args.source_file} does not exist.")
|
||||
sys.exit(-1)
|
||||
return VectorFlatFileSource(
|
||||
app_name="codex-benchmarks", file=args.source_file.open(encoding="utf-8")
|
||||
for source_file in args.source_file:
|
||||
if not source_file.exists():
|
||||
print(f"Log source file {args.source_file} does not exist.")
|
||||
sys.exit(-1)
|
||||
|
||||
return ChainedLogSource(
|
||||
[
|
||||
VectorFlatFileSource(
|
||||
app_name="codex-benchmarks", file=source_file.open(encoding="utf-8")
|
||||
)
|
||||
for source_file in args.source_file
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@ -311,7 +319,7 @@ def main():
|
||||
|
||||
vector_source = source_type.add_parser("vector", help="Vector flat file source.")
|
||||
vector_source.add_argument(
|
||||
"source_file", type=Path, help="Vector log file to parse from."
|
||||
"source_file", type=Path, help="Vector log file to parse from.", nargs="+"
|
||||
)
|
||||
|
||||
vector_source.set_defaults(source=lambda args, _: _configure_vector_source(args))
|
||||
|
||||
@ -104,10 +104,12 @@ async def fake_codex_api() -> AsyncIterator[Tuple[FakeCodex, Url]]:
|
||||
|
||||
runner = web.AppRunner(app)
|
||||
await runner.setup()
|
||||
|
||||
site = web.TCPSite(runner, "localhost", 8888)
|
||||
await site.start()
|
||||
|
||||
try:
|
||||
yield codex, Url(scheme="http", host="localhost", port=8888)
|
||||
finally:
|
||||
await site.stop()
|
||||
await runner.cleanup()
|
||||
|
||||
@ -57,6 +57,31 @@ class OutputManager(AbstractContextManager):
|
||||
pass
|
||||
|
||||
|
||||
class ChainedLogSource(LogSource):
|
||||
"""A :class:`LogSource` which chains multiple sources together."""
|
||||
|
||||
def __init__(self, sources: List[LogSource]) -> None:
|
||||
self.sources = sources
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
pass
|
||||
|
||||
def experiments(self, group_id: str) -> Iterator[str]:
|
||||
for source in self.sources:
|
||||
with source:
|
||||
yield from source.experiments(group_id)
|
||||
|
||||
def logs(
|
||||
self, group_id: str, experiment_id: Optional[str] = None
|
||||
) -> Iterator[Tuple[ExperimentId, NodeId, RawLine]]:
|
||||
for source in self.sources:
|
||||
with source:
|
||||
yield from source.logs(group_id, experiment_id)
|
||||
|
||||
|
||||
class FSOutputManager(OutputManager):
|
||||
"""Simple :class:`OutputManager` which writes directly into the file system.
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user