feat: add local file generation agents and drop requirement for shared volumes with the test runner

This commit is contained in:
gmega 2025-01-19 10:32:27 -03:00
parent 3ed074e56f
commit 27d0815be8
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
34 changed files with 764 additions and 407 deletions

1
.gitignore vendored
View File

@ -2,7 +2,6 @@
.idea
.vscode
/volume/deluge*
*.timestamp
.RData
.Rhistory
.Rproj.user

View File

@ -7,10 +7,9 @@ SHELL := bash
harness-stop \
integration \
tests \
unit-docker \
integration-docker \
image-test \
image-release \
image-test \
clean
# Runs the unit tests locally.
@ -19,7 +18,6 @@ unit:
# Starts the local integration harness. This is required for running pytest with the "integration" marker.
harness-start:
rm -rf ${PWD}/volume/deluge-{1,2,3}
docker compose -f docker-compose.local.yaml up
# Stops the local integration harness.
@ -33,7 +31,6 @@ integration:
tests: unit integration
# Builds the test image required for local dockerized integration tests.
image-test:
docker build -t bittorrent-benchmarks:test -f ./docker/bittorrent-benchmarks.Dockerfile .
@ -41,19 +38,11 @@ image-release:
docker build -t bittorrent-benchmarks:test --build-arg BUILD_TYPE="release" \
-f ./docker/bittorrent-benchmarks.Dockerfile .
# Runs the unit tests in a docker container.
unit-docker: image-test
docker run --entrypoint poetry --rm bittorrent-benchmarks:test run pytest -m "not integration"
# Runs the integration tests in a docker container.
integration-docker: image-test
integration-docker:
docker compose -f docker-compose.local.yaml -f docker-compose.ci.yaml down --volumes --remove-orphans
docker compose -f docker-compose.local.yaml -f docker-compose.ci.yaml up \
--abort-on-container-exit --exit-code-from test-runner
tests-docker: unit-docker integration-docker
clean:
rm -rf docker/.lastbuilt*
rm -rf volume/deluge-{1,2,3}
docker compose -f docker-compose.local.yaml -f docker-compose.ci.yaml down --volumes --rmi all --remove-orphans

View File

@ -4,26 +4,34 @@ import sys
from pathlib import Path
from typing import Dict
import uvicorn
from pydantic import IPvAnyAddress
from pydantic_core import ValidationError
from typing_extensions import TypeVar
from benchmarks.core.config import ConfigParser
from benchmarks.core.agent import AgentBuilder
from benchmarks.core.config import ConfigParser, Builder
from benchmarks.core.experiments.experiments import Experiment, ExperimentBuilder
from benchmarks.deluge.agent.api import DelugeAgentConfig
from benchmarks.deluge.config import DelugeExperimentConfig
from benchmarks.deluge.logging import DelugeTorrentDownload
from benchmarks.logging.logging import (
basic_log_parser,
LogSplitter,
LogEntry,
LogSplitterFormats,
)
from benchmarks.deluge.config import DelugeExperimentConfig
from benchmarks.deluge.logging import DelugeTorrentDownload
from benchmarks.logging.sources import (
VectorFlatFileSource,
FSOutputManager,
split_logs_in_source,
)
config_parser = ConfigParser[ExperimentBuilder]()
config_parser.register(DelugeExperimentConfig)
experiment_config_parser = ConfigParser[ExperimentBuilder]()
experiment_config_parser.register(DelugeExperimentConfig)
agent_config_parser = ConfigParser[AgentBuilder]()
agent_config_parser.register(DelugeAgentConfig)
log_parser = basic_log_parser()
log_parser.register(DelugeTorrentDownload)
@ -34,13 +42,13 @@ log_parser.register(DECLogEntry)
logger = logging.getLogger(__name__)
def cmd_list(experiments: Dict[str, ExperimentBuilder[Experiment]], _):
def cmd_list_experiment(experiments: Dict[str, ExperimentBuilder[Experiment]], _):
print("Available experiments are:")
for experiment in experiments.keys():
print(f" - {experiment}")
def cmd_run(experiments: Dict[str, ExperimentBuilder[Experiment]], args):
def cmd_run_experiment(experiments: Dict[str, ExperimentBuilder[Experiment]], args):
if args.experiment not in experiments:
print(f"Experiment {args.experiment} not found.")
sys.exit(-1)
@ -50,14 +58,14 @@ def cmd_run(experiments: Dict[str, ExperimentBuilder[Experiment]], args):
experiment.build().run()
def cmd_describe(args):
def cmd_describe_experiment(args):
if not args.type:
print("Available experiment types are:")
for experiment in config_parser.experiment_types.keys():
for experiment in experiment_config_parser.experiment_types.keys():
print(f" - {experiment}")
return
print(config_parser.experiment_types[args.type].schema_json(indent=2))
print(experiment_config_parser.experiment_types[args.type].schema_json(indent=2))
def cmd_parse_single_log(log: Path, output: Path):
@ -107,14 +115,33 @@ def cmd_parse_log_source(group_id: str, source_file: Path, output_dir: Path):
)
def _parse_config(config: Path) -> Dict[str, ExperimentBuilder[Experiment]]:
def cmd_run_agent(agents: Dict[str, AgentBuilder], args):
if args.agent not in agents:
print(f"Agent type {args.experiment} not found.")
sys.exit(-1)
uvicorn.run(
agents[args.agent].build(),
host=str(args.host),
port=args.port,
reload=False,
workers=1,
)
T = TypeVar("T")
def _parse_config(
config: Path, parser: ConfigParser[Builder[T]]
) -> Dict[str, Builder[T]]:
if not config.exists():
print(f"Config file {config} does not exist.")
sys.exit(-1)
with config.open(encoding="utf-8") as infile:
try:
return config_parser.parse(infile)
return parser.parse(infile)
except ValidationError as e:
print("There were errors parsing the config file.")
for error in e.errors():
@ -147,11 +174,17 @@ def main():
list_cmd = experiment_commands.add_parser(
"list", help="Lists available experiments."
)
list_cmd.set_defaults(func=lambda args: cmd_list(_parse_config(args.config), args))
list_cmd.set_defaults(
func=lambda args: cmd_list_experiment(_parse_config(args.config), args)
)
run_cmd = experiment_commands.add_parser("run", help="Runs an experiment")
run_cmd.add_argument("experiment", type=str, help="Name of the experiment to run.")
run_cmd.set_defaults(func=lambda args: cmd_run(_parse_config(args.config), args))
run_cmd.set_defaults(
func=lambda args: cmd_run_experiment(
_parse_config(args.config, experiment_config_parser), args
)
)
describe_cmd = commands.add_parser(
"describe", help="Shows the JSON schema for the various experiment types."
@ -160,11 +193,11 @@ def main():
"type",
type=str,
help="Type of the experiment to describe.",
choices=config_parser.experiment_types.keys(),
choices=experiment_config_parser.experiment_types.keys(),
nargs="?",
)
describe_cmd.set_defaults(func=cmd_describe)
describe_cmd.set_defaults(func=cmd_describe_experiment)
logs_cmd = commands.add_parser("logs", help="Parse logs.")
log_subcommands = logs_cmd.add_subparsers(required=True)
@ -198,6 +231,27 @@ def main():
)
)
agent_cmd = commands.add_parser("agent", help="Starts a local agent.")
agent_cmd.add_argument(
"config", type=Path, help="Path to the agent configuration file."
)
agent_cmd.add_argument("agent", type=str, help="Name of the agent to run.")
agent_cmd.add_argument(
"--host",
type=IPvAnyAddress,
help="IP address to bind to.",
default=IPvAnyAddress("0.0.0.0"),
)
agent_cmd.add_argument(
"--port", type=int, help="Port to listen to connections.", default=9001
)
agent_cmd.set_defaults(
func=lambda args: cmd_run_agent(
_parse_config(args.config, agent_config_parser), args
)
)
args = parser.parse_args()
_init_logging()

7
benchmarks/core/agent.py Normal file
View File

@ -0,0 +1,7 @@
from fastapi import FastAPI
from benchmarks.core.config import Builder
# Agents are auxiliary containers deployed alongside a :class:`Node` which allow us to implement
# arbitrary actions, like generating files for experiments, close to the node itself.
AgentBuilder = Builder[FastAPI]

View File

@ -30,8 +30,9 @@ class ConfigParser(Generic[TBuilder]):
Currently, each :class:`Builder` type can appear at most once in the config file.
"""
def __init__(self):
self.experiment_types = {}
def __init__(self, ignore_unknown: bool = True) -> None:
self.experiment_types: Dict[str, Type[TBuilder]] = {}
self.ignore_unknown = ignore_unknown
def register(self, root: Type[TBuilder]):
self.experiment_types[root.alias()] = root
@ -51,4 +52,5 @@ class ConfigParser(Generic[TBuilder]):
return {
tag: self.experiment_types[tag].model_validate(config)
for tag, config in entries.items()
if tag in self.experiment_types or not self.ignore_unknown
}

View File

@ -6,14 +6,13 @@ from typing import Sequence, Optional
from typing_extensions import Generic, List, Tuple
from benchmarks.core.experiments.experiments import ExperimentWithLifecycle
from benchmarks.logging.logging import RequestEvent, RequestEventType
from benchmarks.core.network import (
TInitialMetadata,
TNetworkHandle,
Node,
DownloadHandle,
)
from benchmarks.core.utils import ExperimentData
from benchmarks.logging.logging import RequestEvent, RequestEventType
logger = logging.getLogger(__name__)
@ -25,13 +24,18 @@ class StaticDisseminationExperiment(
self,
network: Sequence[Node[TNetworkHandle, TInitialMetadata]],
seeders: List[int],
data: ExperimentData[TInitialMetadata],
meta: TInitialMetadata,
file_size: int,
seed: int,
concurrency: Optional[int] = None,
logging_cooldown: int = 0,
) -> None:
self.nodes = network
self.seeders = seeders
self.data = data
self.meta = meta
self.file_size = file_size
self.seed = seed
self._pool = ThreadPool(
processes=len(network) - len(seeders)
if concurrency is None
@ -52,47 +56,40 @@ class StaticDisseminationExperiment(
len(leechers),
)
with self.data as (meta, data):
for node in seeders:
_log_request(node, "seed", str(meta), RequestEventType.start)
self._cid = node.seed(data, meta if self._cid is None else self._cid)
_log_request(node, "seed", str(meta), RequestEventType.end)
for node in seeders:
_log_request(node, "genseed", str(self.meta), RequestEventType.start)
self._cid = node.genseed(self.file_size, self.seed, self.meta)
_log_request(node, "genseed", str(self.meta), RequestEventType.end)
assert self._cid is not None # to please mypy
assert self._cid is not None # to please mypy
logger.info(
f"Setting up leechers: {[str(leecher) for leecher in leechers]}"
)
logger.info(f"Setting up leechers: {[str(leecher) for leecher in leechers]}")
def _leech(leecher):
_log_request(leecher, "leech", str(meta), RequestEventType.start)
download = leecher.leech(self._cid)
_log_request(leecher, "leech", str(meta), RequestEventType.end)
return download
def _leech(leecher):
_log_request(leecher, "leech", str(self.meta), RequestEventType.start)
download = leecher.leech(self._cid)
_log_request(leecher, "leech", str(self.meta), RequestEventType.end)
return download
downloads = list(self._pool.imap_unordered(_leech, leechers))
downloads = list(self._pool.imap_unordered(_leech, leechers))
logger.info("Now waiting for downloads to complete")
logger.info("Now waiting for downloads to complete")
def _await_for_download(element: Tuple[int, DownloadHandle]) -> int:
index, download = element
if not download.await_for_completion():
raise Exception(
f"Download ({index}, {str(download)}) did not complete in time."
)
return index
def _await_for_download(element: Tuple[int, DownloadHandle]) -> int:
index, download = element
if not download.await_for_completion():
raise Exception(
f"Download ({index}, {str(download)}) did not complete in time."
)
return index
for i in self._pool.imap_unordered(
_await_for_download, enumerate(downloads)
):
logger.info("Download %d / %d completed", i + 1, len(downloads))
for i in self._pool.imap_unordered(_await_for_download, enumerate(downloads)):
logger.info("Download %d / %d completed", i + 1, len(downloads))
# FIXME this is a hack to ensure that nodes get a chance to log their data before we
# run the teardown hook and remove the torrents.
logger.info(
f"Waiting for {self.logging_cooldown} seconds before teardown..."
)
sleep(self.logging_cooldown)
# FIXME this is a hack to ensure that nodes get a chance to log their data before we
# run the teardown hook and remove the torrents.
logger.info(f"Waiting for {self.logging_cooldown} seconds before teardown...")
sleep(self.logging_cooldown)
def teardown(self, exception: Optional[Exception] = None):
def _remove(element: Tuple[int, Node[TNetworkHandle, TInitialMetadata]]):

View File

@ -1,29 +1,28 @@
from dataclasses import dataclass
from io import StringIO
from pathlib import Path
from typing import Optional, List, Tuple, Union
from typing import Optional, List
from unittest.mock import patch
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.experiments.tests.utils import MockExperimentData
from benchmarks.logging.logging import LogParser, RequestEvent, RequestEventType
from benchmarks.core.network import Node, DownloadHandle
from benchmarks.logging.logging import LogParser, RequestEvent, RequestEventType
@dataclass
class MockHandle:
path: Path
class MockGenData:
size: int
seed: int
name: str
def __str__(self):
return self.name
class MockNode(Node[MockHandle, str]):
class MockNode(Node[MockGenData, str]):
def __init__(self, name="mock_node") -> None:
self._name = name
self.seeding: Optional[Tuple[MockHandle, Path]] = None
self.leeching: Optional[MockHandle] = None
self.seeding: Optional[MockGenData] = None
self.leeching: Optional[MockGenData] = None
self.download_was_awaited = False
self.cleanup_was_called = False
@ -31,23 +30,19 @@ class MockNode(Node[MockHandle, str]):
def name(self) -> str:
return self._name
def seed(self, file: Path, handle: Union[str, MockHandle]) -> MockHandle:
if isinstance(handle, MockHandle):
self.seeding = (handle, file)
else:
self.seeding = (MockHandle(name=handle, path=file), file)
def genseed(self, size: int, seed: int, meta: str) -> MockGenData:
self.seeding = MockGenData(size=size, seed=seed, name=meta)
return self.seeding
return self.seeding[0]
def leech(self, handle: MockHandle):
def leech(self, handle: MockGenData):
self.leeching = handle
return MockDownloadHandle(self)
def remove(self, handle: MockHandle):
def remove(self, handle: MockGenData):
if self.leeching is not None:
assert self.leeching == handle
elif self.seeding is not None:
assert self.seeding[0] == handle
assert self.seeding == handle
else:
raise Exception(
"Either leech or seed must be called before attempting a remove"
@ -69,15 +64,13 @@ def mock_network(n: int) -> List[MockNode]:
return [MockNode(f"node-{i}") for i in range(n)]
def test_should_place_seeders():
def test_should_generate_correct_data_and_seed():
network = mock_network(n=13)
data = MockExperimentData(meta="data", data=Path("/path/to/data"))
gendata = MockGenData(size=1000, seed=12, name="dataset1")
seeders = [9, 6, 3]
experiment = StaticDisseminationExperiment(
seeders=seeders,
network=network,
data=data,
seeders=seeders, network=network, meta="dataset1", file_size=1000, seed=12
)
experiment.run()
@ -86,20 +79,22 @@ def test_should_place_seeders():
for index, node in enumerate(network):
if node.seeding is not None:
actual_seeders.add(index)
assert node.seeding[0] == MockHandle(name=data.meta, path=data.data)
assert node.seeding == gendata
assert actual_seeders == set(seeders)
def test_should_download_at_remaining_nodes():
network = mock_network(n=13)
data = MockExperimentData(meta="data", data=Path("/path/to/data"))
gendata = MockGenData(size=1000, seed=12, name="dataset1")
seeders = [9, 6, 3]
experiment = StaticDisseminationExperiment(
seeders=seeders,
network=network,
data=data,
meta="dataset1",
file_size=1000,
seed=12,
)
experiment.run()
@ -107,8 +102,7 @@ def test_should_download_at_remaining_nodes():
actual_leechers = set()
for index, node in enumerate(network):
if node.leeching is not None:
assert node.leeching.path == data.data
assert node.leeching.name == data.meta
assert node.leeching == gendata
assert node.seeding is None
assert node.download_was_awaited
actual_leechers.add(index)
@ -116,34 +110,18 @@ def test_should_download_at_remaining_nodes():
assert actual_leechers == set(range(13)) - set(seeders)
def test_should_delete_generated_file_at_end_of_experiment():
network = mock_network(n=2)
data = MockExperimentData(meta="data", data=Path("/path/to/data"))
seeders = [1]
experiment = StaticDisseminationExperiment(
seeders=seeders,
network=network,
data=data,
)
experiment.run()
assert data.cleanup_called
def test_should_log_requests_to_seeders_and_leechers(mock_logger):
logger, output = mock_logger
with patch("benchmarks.core.experiments.static_experiment.logger", logger):
network = mock_network(n=3)
data = MockExperimentData(meta="dataset-1", data=Path("/path/to/data"))
seeders = [1]
experiment = StaticDisseminationExperiment(
seeders=seeders,
network=network,
data=data,
concurrency=1,
meta="dataset-1",
file_size=1000,
seed=12,
)
experiment.run()
@ -157,7 +135,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
RequestEvent(
destination="node-1",
node="runner",
name="seed",
name="genseed",
request_id="dataset-1",
type=RequestEventType.start,
timestamp=events[0].timestamp,
@ -165,7 +143,7 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
RequestEvent(
destination="node-1",
node="runner",
name="seed",
name="genseed",
request_id="dataset-1",
type=RequestEventType.end,
timestamp=events[1].timestamp,
@ -207,13 +185,14 @@ def test_should_log_requests_to_seeders_and_leechers(mock_logger):
def test_should_delete_file_from_nodes_at_the_end_of_the_experiment():
network = mock_network(n=2)
data = MockExperimentData(meta="data", data=Path("/path/to/data"))
seeders = [1]
experiment = StaticDisseminationExperiment(
seeders=seeders,
network=network,
data=data,
meta="dataset-1",
file_size=1000,
seed=12,
)
experiment.run()

View File

@ -0,0 +1,31 @@
from io import BytesIO
from benchmarks.core.utils import random_data
def test_should_generate_the_requested_amount_of_bytes():
f = BytesIO()
random_data(size=1024, outfile=f)
assert len(f.getvalue()) == 1024
def test_should_generate_equal_files_for_equal_seeds():
f1 = BytesIO()
f2 = BytesIO()
random_data(size=1024, outfile=f1, seed=1234)
random_data(size=1024, outfile=f2, seed=1234)
assert f1.getvalue() == f2.getvalue()
def test_should_generate_different_files_for_different_seeds():
f1 = BytesIO()
f2 = BytesIO()
random_data(size=1024, outfile=f1, seed=1234)
random_data(size=1024, outfile=f2, seed=1235)
assert f1.getvalue() != f2.getvalue()

View File

@ -1,18 +0,0 @@
from pathlib import Path
from typing import Tuple
from benchmarks.core.network import TInitialMetadata
from benchmarks.core.utils import ExperimentData
class MockExperimentData(ExperimentData[TInitialMetadata]):
def __init__(self, meta: TInitialMetadata, data: Path):
self.cleanup_called = False
self.meta = meta
self.data = data
def __enter__(self) -> Tuple[TInitialMetadata, Path]:
return self.meta, self.data
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup_called = True

View File

@ -1,8 +1,6 @@
import shutil
from abc import abstractmethod, ABC
from pathlib import Path
from typing_extensions import Generic, TypeVar, Union
from typing_extensions import Generic, TypeVar
TNetworkHandle = TypeVar("TNetworkHandle")
TInitialMetadata = TypeVar("TInitialMetadata")
@ -30,20 +28,20 @@ class Node(ABC, Generic[TNetworkHandle, TInitialMetadata]):
pass
@abstractmethod
def seed(
def genseed(
self,
file: Path,
handle: Union[TInitialMetadata, TNetworkHandle],
size: int,
seed: int,
meta: TInitialMetadata,
) -> TNetworkHandle:
"""
Makes the current :class:`Node` a seeder for the specified file.
Generates a random file of given size and makes the current node a seeder for it. Identical seeds,
metadata, and sizes should result in identical network handles.
:param file: local path to the file to seed.
:param handle: file sharing typically requires some initial metadata when a file is first uploaded into the
network, and this will typically then result into a compact representation such as a manifest CID (Codex)
or a Torrent file (Bittorrent) which other nodes can then use to identify and locate both the file and its
metadata within the network. When doing an initial seed, this method should be called with the initial
metadata (TInitialMetadata). Subsequent calls should use the network handle (TNetworkHandle).
:param size: The size of the file to be seeded.
:param seed: The seed for the random number generator producing the file.
:param meta: Additional, client-specific metadata relevant to the seeding process. For torrents,
this could be the name of the torrent.
:return: The network handle (TNetworkHandle) for this file. This handle should be used for subsequent calls to
:meth:`seed`.
@ -64,22 +62,3 @@ class Node(ABC, Generic[TNetworkHandle, TInitialMetadata]):
seeding it. For leechers, it will stop downloading it. In both cases, the file will be removed from the node's
storage."""
pass
class SharedFSNode(Node[TNetworkHandle, TInitialMetadata], ABC):
"""A `SharedFSNode` is a :class:`Node` which shares a network volume with us. This means
we are able to upload files to it by means of simple file copies."""
def __init__(self, volume: Path):
self.volume = volume
def upload(self, local: Path, name: str) -> Path:
target_path = self.volume / name
target_path.mkdir(parents=True, exist_ok=True)
target = target_path / local.name
if local.is_dir():
shutil.copytree(local, target)
else:
shutil.copy(local, target)
return target

View File

@ -1,50 +1,6 @@
import os
import random
import tempfile
from abc import ABC, abstractmethod
from contextlib import contextmanager, AbstractContextManager
from dataclasses import dataclass
from pathlib import Path
from time import time, sleep
from typing import Iterator, Tuple, ContextManager, Optional, Callable
from typing_extensions import Generic
from benchmarks.core.network import TInitialMetadata
@dataclass
class ExperimentData(Generic[TInitialMetadata], AbstractContextManager, ABC):
""":class:`ExperimentData` provides a context for providing and wiping out
data and metadata objects, usually within the scope of an experiment."""
@abstractmethod
def __enter__(self) -> Tuple[TInitialMetadata, Path]:
"""Generates new data and metadata and returns it."""
pass
@abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb):
"""Wipes out data and metadata."""
pass
class RandomTempData(ExperimentData[TInitialMetadata]):
def __init__(self, size: int, meta: TInitialMetadata):
self.meta = meta
self.size = size
self._context: Optional[ContextManager[Tuple[TInitialMetadata, Path]]] = None
def __enter__(self) -> Tuple[TInitialMetadata, Path]:
if self._context is not None:
raise Exception("Cannot enter context twice")
self._context = temp_random_file(self.size, "data.bin")
return self.meta, self._context.__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
self._context.__exit__(exc_type, exc_val, exc_tb)
from typing import Iterator, Optional, Callable, IO
def await_predicate(
@ -78,19 +34,12 @@ def megabytes(n: int) -> int:
return kilobytes(n) * 1024
@contextmanager
def temp_random_file(
size: int, name: str = "data.bin", batch_size: int = megabytes(50)
def random_data(
size: int, outfile: IO, batch_size: int = megabytes(50), seed: Optional[int] = None
):
with tempfile.TemporaryDirectory() as temp_dir_str:
temp_dir = Path(temp_dir_str)
random_file = temp_dir / name
with random_file.open("wb") as outfile:
while size > 0:
batch = min(size, batch_size)
random_bytes = os.urandom(batch)
outfile.write(random_bytes)
size -= batch
yield random_file
rnd = random.Random(seed) if seed is not None else random
while size > 0:
batch = min(size, batch_size)
random_bytes = rnd.randbytes(batch)
outfile.write(random_bytes)
size -= batch

View File

@ -0,0 +1,25 @@
from pathlib import Path
from typing import Optional
from torrentool.torrent import Torrent
from benchmarks.core.utils import random_data, megabytes
class DelugeAgent:
def __init__(self, torrents_path: Path, batch_size: int = megabytes(50)):
self.torrents_path = torrents_path
self.batch_size = batch_size
def create_torrent(self, name: str, size: int, seed: Optional[int]) -> Torrent:
torrent_path = self.torrents_path / name
torrent_path.mkdir(parents=True, exist_ok=False)
file_path = torrent_path / "datafile.bin"
with file_path.open(mode="wb") as output:
random_data(size=size, outfile=output, seed=seed)
torrent = Torrent.create_from(torrent_path)
torrent.name = name
return torrent

View File

@ -0,0 +1,43 @@
from pathlib import Path
from typing import Annotated, Optional
from fastapi import FastAPI, Depends, APIRouter, Response
from benchmarks.core.agent import AgentBuilder
from benchmarks.core.utils import megabytes
from benchmarks.deluge.agent.agent import DelugeAgent
router = APIRouter()
def deluge_agent() -> DelugeAgent:
raise Exception("Dependency must be set")
@router.post("/api/v1/deluge/torrent")
def generate(
agent: Annotated[DelugeAgent, Depends(deluge_agent)],
name: str,
size: int,
seed: Optional[int],
):
return Response(
agent.create_torrent(name=name, size=size, seed=seed).to_string(),
media_type="application/octet-stream",
)
class DelugeAgentConfig(AgentBuilder):
torrents_path: Path
batch_size: int = megabytes(50)
def build(self) -> FastAPI:
app = FastAPI()
app.include_router(router)
agent = DelugeAgent(
torrents_path=self.torrents_path,
batch_size=self.batch_size,
)
app.dependency_overrides[deluge_agent] = lambda: agent
return app

View File

@ -0,0 +1,27 @@
import requests
from tenacity import stop_after_attempt, wait_exponential, retry
from torrentool.torrent import Torrent
from urllib3.util import Url
class DelugeAgentClient:
def __init__(self, url: Url):
self.url = url
def generate(self, size: int, seed: int, name: str) -> Torrent:
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=16),
)
def _request():
return requests.post(
url=self.url._replace(path="/api/v1/deluge/torrent").url,
params={
"size": size,
"seed": seed,
"name": name,
},
)
torrent = Torrent.from_string(_request().content)
return torrent

View File

@ -0,0 +1,79 @@
import tempfile
from pathlib import Path
import pytest
from torrentool.torrent import TorrentFile
from benchmarks.deluge.agent.agent import DelugeAgent
@pytest.fixture
def temp_dir():
with tempfile.TemporaryDirectory() as temp_dir:
yield Path(temp_dir)
def test_should_create_torrent_at_specified_location(temp_dir):
agent = DelugeAgent(
torrents_path=temp_dir,
)
torrent_file = agent.create_torrent(
name="dataset-1",
size=1024,
seed=12,
)
assert torrent_file.name == "dataset-1"
assert torrent_file.total_size == 1024
assert torrent_file.files == [TorrentFile("dataset-1/datafile.bin", 1024)]
assert (temp_dir / "dataset-1" / "datafile.bin").stat().st_size == 1024
def test_should_generate_identical_torrent_files_for_identical_seeds(temp_dir):
agent1 = DelugeAgent(
torrents_path=temp_dir / "d1",
)
torrent_file1 = agent1.create_torrent(
name="dataset-1",
size=1024,
seed=12,
)
agent2 = DelugeAgent(
torrents_path=temp_dir / "d2",
)
torrent_file2 = agent2.create_torrent(
name="dataset-1",
size=1024,
seed=12,
)
assert torrent_file1.to_string() == torrent_file2.to_string()
def test_should_generate_different_torrent_files_for_different_seeds(temp_dir):
agent1 = DelugeAgent(
torrents_path=temp_dir / "d1",
)
torrent_file1 = agent1.create_torrent(
name="dataset-1",
size=1024,
seed=12,
)
agent2 = DelugeAgent(
torrents_path=temp_dir / "d2",
)
torrent_file2 = agent2.create_torrent(
name="dataset-1",
size=1024,
seed=13,
)
assert torrent_file1.to_string() != torrent_file2.to_string()

View File

@ -0,0 +1,25 @@
from fastapi import FastAPI
from starlette.testclient import TestClient
from torrentool.torrent import Torrent
from benchmarks.deluge.agent import api
from benchmarks.deluge.agent.agent import DelugeAgent
from benchmarks.deluge.agent.api import deluge_agent
def test_should_return_a_valid_byte_encoded_torrent_object(tmp_path):
app = FastAPI()
app.include_router(api.router)
app.dependency_overrides[deluge_agent] = lambda: DelugeAgent(tmp_path)
client = TestClient(app)
response = client.post(
"/api/v1/deluge/torrent",
params={"name": "dataset-1", "size": 1024, "seed": 12},
)
assert response.status_code == 200
torrent = Torrent.from_string(response.content)
assert torrent.name == "dataset-1"
assert torrent.total_size == 1024

View File

@ -1,3 +1,4 @@
import random
from itertools import islice
from pathlib import Path
from typing import List
@ -14,7 +15,7 @@ from benchmarks.core.experiments.experiments import (
)
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.pydantic import Host
from benchmarks.core.utils import sample, RandomTempData
from benchmarks.core.utils import sample
from benchmarks.deluge.deluge_node import DelugeMeta, DelugeNode
from benchmarks.deluge.tracker import Tracker
@ -24,6 +25,7 @@ class DelugeNodeConfig(BaseModel):
address: Host
daemon_port: int
listen_ports: list[int] = Field(min_length=2, max_length=2)
agent_url: HttpUrl
class DelugeNodeSetConfig(BaseModel):
@ -34,6 +36,7 @@ class DelugeNodeSetConfig(BaseModel):
listen_ports: list[int] = Field(min_length=2, max_length=2)
first_node_index: int = 1
nodes: List[DelugeNodeConfig] = []
agent_url: HttpUrl
@model_validator(mode="after")
def expand_nodes(self):
@ -43,6 +46,7 @@ class DelugeNodeSetConfig(BaseModel):
address=self.address.format(node_index=str(i)),
daemon_port=self.daemon_port,
listen_ports=self.listen_ports,
agent_url=self.agent_url,
)
for i in range(
self.first_node_index, self.first_node_index + self.network_size
@ -97,6 +101,7 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]):
volume=self.shared_volume_path,
daemon_port=node_spec.daemon_port,
daemon_address=str(node_spec.address),
agent_url=parse_url(str(node_spec.agent_url)),
)
for i, node_spec in enumerate(nodes_specs)
]
@ -116,12 +121,11 @@ class DelugeExperimentConfig(ExperimentBuilder[DelugeDisseminationExperiment]):
StaticDisseminationExperiment(
network=network,
seeders=seeders,
data=RandomTempData(
size=self.file_size,
meta=DelugeMeta(
f"dataset-{seeder_set}-{experiment_run}",
announce_url=tracker.announce_url,
),
file_size=self.file_size,
seed=random.randint(0, 2**16),
meta=DelugeMeta(
f"dataset-{seeder_set}-{experiment_run}",
announce_url=tracker.announce_url,
),
logging_cooldown=self.logging_cooldown,
)

View File

@ -5,7 +5,7 @@ import socket
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
from typing import List, Union, Optional, Self, Dict, Any
from typing import List, Optional, Self, Dict, Any
import pathvalidate
from deluge_client import DelugeRPCClient
@ -16,8 +16,9 @@ from torrentool.torrent import Torrent
from urllib3.util import Url
from benchmarks.core.experiments.experiments import ExperimentComponent
from benchmarks.core.network import SharedFSNode, DownloadHandle
from benchmarks.core.network import DownloadHandle
from benchmarks.core.utils import await_predicate
from benchmarks.deluge.agent.client import DelugeAgentClient
logger = logging.getLogger(__name__)
@ -31,12 +32,13 @@ class DelugeMeta:
announce_url: Url
class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
class DelugeNode(ExperimentComponent):
def __init__(
self,
name: str,
volume: Path,
daemon_port: int,
agent_url: Url = Url(scheme="http", host="localhost", port=8000),
daemon_address: str = "localhost",
daemon_username: str = "user",
daemon_password: str = "password",
@ -45,7 +47,7 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
raise ValueError(f'Node name must be a valid filename (bad name: "{name}")')
self._name = name
self.downloads_root = volume / name / "downloads"
self.downloads_root = volume / "downloads"
self._rpc: Optional[DelugeRPCClient] = None
self.daemon_args = {
@ -55,9 +57,7 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
"password": daemon_password,
}
super().__init__(self.downloads_root)
self._init_folders()
self.agent = DelugeAgentClient(agent_url)
@property
def name(self) -> str:
@ -80,27 +80,17 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
# folder after your check, so this is the only sane way to do it.
pass
self._init_folders()
def seed(
def genseed(
self,
file: Path,
handle: Union[DelugeMeta, Torrent],
size: int,
seed: int,
meta: DelugeMeta,
) -> Torrent:
data_root = self.downloads_root / handle.name
data_root.mkdir(parents=True, exist_ok=False)
target = self.upload(local=file, name=handle.name)
if isinstance(handle, DelugeMeta):
torrent = Torrent.create_from(target.parent)
torrent.announce_urls = handle.announce_url.url
torrent.name = handle.name
else:
torrent = handle
torrent = self.agent.generate(size, seed, meta.name)
torrent.announce_urls = [str(meta.announce_url)]
self.rpc.core.add_torrent_file(
filename=f"{handle.name}.torrent",
filename=f"{meta.name}.torrent",
filedump=self._b64dump(torrent),
options=dict(),
)
@ -154,9 +144,6 @@ class DelugeNode(SharedFSNode[Torrent, DelugeMeta], ExperimentComponent):
except (ConnectionRefusedError, socket.gaierror):
return False
def _init_folders(self):
self.downloads_root.mkdir(parents=True, exist_ok=True)
@staticmethod
def _b64dump(handle: Torrent) -> bytes:
buffer = BytesIO()

View File

@ -5,18 +5,20 @@ from typing import Generator
import pytest
from urllib3.util import parse_url
from benchmarks.core import utils
from benchmarks.core.utils import megabytes, await_predicate
from benchmarks.core.utils import await_predicate
from benchmarks.deluge.deluge_node import DelugeNode
from benchmarks.deluge.tracker import Tracker
from benchmarks.tests.utils import shared_volume
def deluge_node(
name: str, address: str, port: int
name: str, address: str, port: int, agent_url: str
) -> Generator[DelugeNode, None, None]:
node = DelugeNode(
name, volume=shared_volume(), daemon_address=address, daemon_port=port
name,
volume=Path("/var/lib/deluge"),
daemon_address=address,
daemon_port=port,
agent_url=parse_url(agent_url),
)
assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5)
node.wipe_all_torrents()
@ -29,30 +31,33 @@ def deluge_node(
@pytest.fixture
def deluge_node1() -> Generator[DelugeNode, None, None]:
yield from deluge_node(
"deluge-1", os.environ.get("DELUGE_NODE_1", "localhost"), 6890
"deluge-1",
os.environ.get("DELUGE_NODE_1", "localhost"),
6890,
os.environ.get("DELUGE_AGENT_1", "http://localhost:9001"),
)
@pytest.fixture
def deluge_node2() -> Generator[DelugeNode, None, None]:
yield from deluge_node(
"deluge-2", os.environ.get("DELUGE_NODE_2", "localhost"), 6893
"deluge-2",
os.environ.get("DELUGE_NODE_2", "localhost"),
6893,
os.environ.get("DELUGE_AGENT_2", "http://localhost:9002"),
)
@pytest.fixture
def deluge_node3() -> Generator[DelugeNode, None, None]:
yield from deluge_node(
"deluge-3", os.environ.get("DELUGE_NODE_3", "localhost"), 6896
"deluge-3",
os.environ.get("DELUGE_NODE_3", "localhost"),
6896,
os.environ.get("DELUGE_AGENT_3", "http://localhost:9003"),
)
@pytest.fixture
def temp_random_file() -> Generator[Path, None, None]:
with utils.temp_random_file(size=megabytes(1)) as random_file:
yield random_file
@pytest.fixture
def tracker() -> Tracker:
return Tracker(

View File

@ -20,6 +20,7 @@ def test_should_expand_node_sets_into_simple_nodes():
network_size=4,
daemon_port=6080,
listen_ports=[6081, 6082],
agent_url="http://localhost:8000",
)
assert nodeset.nodes == [
@ -28,24 +29,28 @@ def test_should_expand_node_sets_into_simple_nodes():
address="deluge-1.local.svc",
daemon_port=6080,
listen_ports=[6081, 6082],
agent_url="http://localhost:8000",
),
DelugeNodeConfig(
name="custom-2",
address="deluge-2.local.svc",
daemon_port=6080,
listen_ports=[6081, 6082],
agent_url="http://localhost:8000",
),
DelugeNodeConfig(
name="custom-3",
address="deluge-3.local.svc",
daemon_port=6080,
listen_ports=[6081, 6082],
agent_url="http://localhost:8000",
),
DelugeNodeConfig(
name="custom-4",
address="deluge-4.local.svc",
daemon_port=6080,
listen_ports=[6081, 6082],
agent_url="http://localhost:8000",
),
]
@ -58,6 +63,7 @@ def test_should_respect_first_node_index():
daemon_port=6080,
listen_ports=[6081, 6082],
first_node_index=5,
agent_url="http://localhost:8000",
)
assert nodeset.nodes == [
@ -66,12 +72,14 @@ def test_should_respect_first_node_index():
address="deluge-5.local.svc",
daemon_port=6080,
listen_ports=[6081, 6082],
agent_url="http://localhost:8000",
),
DelugeNodeConfig(
name="deluge-6",
address="deluge-6.local.svc",
daemon_port=6080,
listen_ports=[6081, 6082],
agent_url="http://localhost:8000",
),
]
@ -91,6 +99,7 @@ def test_should_build_experiment_from_config():
address: 'node-{node_index}.deluge.codexbenchmarks.svc.cluster.local'
daemon_port: 6890
listen_ports: [ 6891, 6892 ]
agent_url: http://localhost:8080
""")
config = DelugeExperimentConfig.model_validate(
@ -126,6 +135,7 @@ def test_should_create_n_repetitions_per_seeder_set():
address: 'node-{node_index}.deluge.codexbenchmarks.svc.cluster.local'
daemon_port: 6890
listen_ports: [ 6891, 6892 ]
agent_url: http://localhost:8080
""")
config = DelugeExperimentConfig.model_validate(

View File

@ -1,5 +1,3 @@
from pathlib import Path
import pytest
from tenacity import wait_incrementing, stop_after_attempt, RetryError
@ -31,14 +29,15 @@ def assert_is_seed(node: DelugeNode, name: str, size: int):
@pytest.mark.integration
def test_should_seed_files(
deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker
):
def test_should_seed_files(deluge_node1: DelugeNode, tracker: Tracker):
assert not deluge_node1.torrent_info(name="dataset1")
deluge_node1.seed(
temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url)
deluge_node1.genseed(
size=megabytes(1),
seed=1234,
meta=DelugeMeta(name="dataset1", announce_url=tracker.announce_url),
)
assert_is_seed(deluge_node1, name="dataset1", size=megabytes(1))
@ -46,14 +45,15 @@ def test_should_seed_files(
def test_should_download_files(
deluge_node1: DelugeNode,
deluge_node2: DelugeNode,
temp_random_file: Path,
tracker: Tracker,
):
assert not deluge_node1.torrent_info(name="dataset1")
assert not deluge_node2.torrent_info(name="dataset1")
torrent = deluge_node1.seed(
temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url)
torrent = deluge_node1.genseed(
size=megabytes(1),
seed=1234,
meta=DelugeMeta(name="dataset1", announce_url=tracker.announce_url),
)
handle = deluge_node2.leech(torrent)
@ -63,13 +63,13 @@ def test_should_download_files(
@pytest.mark.integration
def test_should_remove_files(
deluge_node1: DelugeNode, temp_random_file: Path, tracker: Tracker
):
def test_should_remove_files(deluge_node1: DelugeNode, tracker: Tracker):
assert not deluge_node1.torrent_info(name="dataset1")
torrent = deluge_node1.seed(
temp_random_file, DelugeMeta(name="dataset1", announce_url=tracker.announce_url)
torrent = deluge_node1.genseed(
size=megabytes(1),
seed=1234,
meta=DelugeMeta(name="dataset1", announce_url=tracker.announce_url),
)
assert_is_seed(deluge_node1, name="dataset1", size=megabytes(1))

View File

@ -2,7 +2,7 @@ import pytest
from benchmarks.core.experiments.experiments import ExperimentEnvironment
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.utils import RandomTempData, megabytes
from benchmarks.core.utils import megabytes
from benchmarks.deluge.deluge_node import DelugeMeta
from benchmarks.deluge.tests.test_deluge_node import assert_is_seed
@ -20,10 +20,9 @@ def test_should_run_with_a_single_seeder(
experiment = StaticDisseminationExperiment(
network=[deluge_node1, deluge_node2, deluge_node3],
seeders=[1],
data=RandomTempData(
size=size,
meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
),
file_size=size,
seed=1234,
meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
)
env.await_ready()
@ -51,10 +50,9 @@ def test_should_run_with_multiple_seeders(
experiment = StaticDisseminationExperiment(
network=[deluge_node1, deluge_node2, deluge_node3],
seeders=[1, 2],
data=RandomTempData(
size=size,
meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
),
file_size=size,
seed=1234,
meta=DelugeMeta("dataset-1", announce_url=tracker.announce_url),
)
env.await_ready()

View File

@ -8,7 +8,7 @@ from collections.abc import Iterator
from contextlib import AbstractContextManager
from json import JSONDecodeError
from pathlib import Path
from typing import TextIO, Optional, Tuple, List, Dict, Type
from typing import TextIO, Optional, Tuple, List, Dict, Type, IO
from benchmarks.logging.logging import (
LogParser,
@ -51,14 +51,14 @@ class OutputManager(AbstractContextManager):
"""An :class:`OutputManager` is responsible for managing output locations for log splitting operations.
:class:`OutputManager`s must be closed after use, and implements the context manager interface to that end."""
def open(self, relative_path: Path) -> TextIO:
def open(self, relative_path: Path, mode: str = "w", encoding="utf-8") -> IO:
"""Opens a file for writing within a relative abstract path."""
if relative_path.is_absolute():
raise ValueError(f"Path {relative_path} must be relative.")
return self._open(relative_path)
return self._open(relative_path, mode, encoding)
@abstractmethod
def _open(self, relative_path: Path) -> TextIO:
def _open(self, relative_path: Path, mode: str, encoding: str) -> IO:
pass
@ -67,13 +67,13 @@ class FSOutputManager(OutputManager):
def __init__(self, root: Path) -> None:
self.root = root
self.open_files: List[TextIO] = []
self.open_files: List[IO] = []
def _open(self, relative_path: Path) -> TextIO:
def _open(self, relative_path: Path, mode: str, encoding: str) -> IO:
fullpath = self.root / relative_path
parent = fullpath.parent
parent.mkdir(parents=True, exist_ok=True)
f = fullpath.open("w", encoding="utf-8")
f = fullpath.open(mode, encoding=encoding)
self.open_files.append(f)
return f

View File

@ -1,13 +1,12 @@
import datetime
from io import StringIO
from benchmarks.logging.logging import LogEntry, LogParser
from benchmarks.logging.sources import (
VectorFlatFileSource,
OutputManager,
split_logs_in_source,
)
from benchmarks.logging.logging import LogEntry, LogParser
from benchmarks.logging.tests.utils import InMemoryOutputManager
from benchmarks.tests.utils import make_jsonl, compact
EXPERIMENT_LOG = [
@ -64,27 +63,6 @@ EXPERIMENT_LOG = [
]
class InMemoryOutputManager(OutputManager):
def __init__(self):
self.fs = {}
def _open(self, relative_path):
root = self.fs
for element in relative_path.parts[:-1]:
subtree = root.get(element)
if subtree is None:
subtree = {}
root[element] = subtree
root = subtree
output = StringIO()
root[relative_path.parts[-1]] = output
return output
def __exit__(self, exc_type, exc_value, traceback, /):
pass
class MetricsEvent(LogEntry):
name: str
timestamp: datetime.datetime

View File

@ -0,0 +1,24 @@
from io import StringIO
from benchmarks.logging.sources import OutputManager
class InMemoryOutputManager(OutputManager):
def __init__(self):
self.fs = {}
def _open(self, relative_path, mode: str, encoding: str):
root = self.fs
for element in relative_path.parts[:-1]:
subtree = root.get(element)
if subtree is None:
subtree = {}
root[element] = subtree
root = subtree
output = StringIO()
root[relative_path.parts[-1]] = output
return output
def __exit__(self, exc_type, exc_value, traceback, /):
pass

View File

@ -3,47 +3,15 @@ services:
test-runner:
image: bittorrent-benchmarks:test
container_name: test-runner
volumes:
- shared-volume:/opt/bittorrent-benchmarks/volume
entrypoint: [ "bash", "-c", "/opt/bittorrent-benchmarks/docker/bin/run-tests.sh" ]
healthcheck:
test: stat /opt/bittorrent-benchmarks/volume/.initialized
interval: 1s
timeout: 5s
retries: 150
deluge-1:
volumes: !override
- type: volume
source: shared-volume
target: /var/lib/deluge
volume:
subpath: deluge-1
entrypoint: [ "poetry", "run", "pytest", "--exitfirst" ]
environment:
- DELUGE_NODE_1=deluge-1
- DELUGE_NODE_2=deluge-2
- DELUGE_NODE_3=deluge-3
- DELUGE_AGENT_1=http://agent-1:9001/
- DELUGE_AGENT_2=http://agent-2:9002/
- DELUGE_AGENT_3=http://agent-3:9003/
- TRACKER_ANNOUNCE_URL=http://tracker:8000/announce
depends_on:
test-runner:
clean-volumes:
condition: service_healthy
deluge-2:
volumes: !override
- type: volume
source: shared-volume
target: /var/lib/deluge
volume:
subpath: deluge-2
depends_on:
test-runner:
condition: service_healthy
deluge-3:
volumes: !override
- type: volume
source: shared-volume
target: /var/lib/deluge
volume:
subpath: deluge-3
depends_on:
test-runner:
condition: service_healthy
volumes:
shared-volume:

View File

@ -1,11 +1,27 @@
# This compose spec contains the basic setup for running integration tests with the
# test runner outside of a container and bind mounts for data so they can be inspected.
# This is ideal for development.
# You will need [rootless Docker](https://docs.docker.com/engine/security/rootless/)
# for this to work cause the tests rely on user-writable bind mounts.
# test runner outside of a container. This is ideal for local development.
services:
clean-volumes:
image: alpine
container_name: clean-volumes
entrypoint:
- /bin/sh
- -c
- |
rm -rf /var/lib/deluge1/* /var/lib/deluge2/* /var/lib/deluge3/*
touch /.done
sleep infinity
volumes:
- shared-volume-1:/var/lib/deluge1
- shared-volume-2:/var/lib/deluge2
- shared-volume-3:/var/lib/deluge3
healthcheck:
timeout: 10s
test: [ "CMD", "test", "-f", "/.done" ]
retries: 10
interval: 1s
deluge-1:
image: codexstorage/deluge
container_name: deluge-1
@ -15,11 +31,25 @@ services:
- DELUGE_LOG_LEVEL=${DELUGE_LOG_LEVEL:-info}
- DELUGE_NODE_ID=deluge-1
volumes:
- ./volume/deluge-1:/var/lib/deluge
- ./volume/deluge-1/downloads:/var/lib/deluge/downloads
- shared-volume-1:/var/lib/deluge
ports:
- "6890:6890"
- "6891-6892:6891-6892"
depends_on:
clean-volumes:
condition: service_healthy
agent-1:
image: bittorrent-benchmarks:test
container_name: agent-1
entrypoint: [ "poetry", "run", "bittorrent-benchmarks",
"agent", "experiments.local.yaml", "deluge_agent", "--port", "9001" ]
environment:
- TORRENTS_ROOT=/var/lib/deluge/downloads
volumes:
- shared-volume-1:/var/lib/deluge
ports:
- "9001:9001"
deluge-2:
image: codexstorage/deluge
@ -30,11 +60,25 @@ services:
- DELUGE_LOG_LEVEL=${DELUGE_LOG_LEVEL:-info}
- DELUGE_NODE_ID=deluge-2
volumes:
- ./volume/deluge-2:/var/lib/deluge
- ./volume/deluge-2/downloads:/var/lib/deluge/downloads
- shared-volume-2:/var/lib/deluge
ports:
- "6893:6893"
- "6894-6895:6894-6895"
depends_on:
clean-volumes:
condition: service_healthy
agent-2:
image: bittorrent-benchmarks:test
container_name: agent-2
entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent",
"experiments.local.yaml", "deluge_agent", "--port", "9002" ]
environment:
- TORRENTS_ROOT=/var/lib/deluge/downloads
volumes:
- shared-volume-2:/var/lib/deluge
ports:
- "9002:9002"
deluge-3:
image: codexstorage/deluge
@ -45,14 +89,33 @@ services:
- DELUGE_LOG_LEVEL=${DELUGE_LOG_LEVEL:-info}
- DELUGE_NODE_ID=deluge-3
volumes:
- ./volume/deluge-3:/var/lib/deluge
- ./volume/deluge-3/downloads:/var/lib/deluge/downloads
- shared-volume-3:/var/lib/deluge
ports:
- "6896:6896"
- "6897-6898:6897-6898"
depends_on:
clean-volumes:
condition: service_healthy
agent-3:
image: bittorrent-benchmarks:test
container_name: agent-3
entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent", "experiments.local.yaml",
"deluge_agent", "--port", "9003" ]
environment:
- TORRENTS_ROOT=/var/lib/deluge/downloads
volumes:
- shared-volume-3:/var/lib/deluge
ports:
- "9003:9003"
tracker:
image: codexstorage/bittorrent-tracker
container_name: tracker
ports:
- "8000:8000"
- "8000:8000"
volumes:
shared-volume-1:
shared-volume-2:
shared-volume-3:

View File

@ -1,16 +0,0 @@
#!/usr/bin/env bash
set -e
export DELUGE_NODE_1=deluge-1
export DELUGE_NODE_2=deluge-2
export DELUGE_NODE_3=deluge-3
export TRACKER_ANNOUNCE_URL=http://tracker:8000/announce
# Initializes the shared volume.
echo "Initializing shared volume."
mkdir -p /opt/bittorrent-benchmarks/volume/deluge-{1,2,3}
touch /opt/bittorrent-benchmarks/volume/.initialized
echo "Launching tests."
cd /opt/bittorrent-benchmarks
poetry run pytest --exitfirst

View File

@ -10,7 +10,7 @@ WORKDIR /opt/bittorrent-benchmarks
COPY pyproject.toml poetry.lock ./
RUN if [ "$BUILD_TYPE" = "release" ]; then \
echo "Image is a release build"; \
poetry install --only main --no-root; \
poetry install --without dev --no-root; \
else \
echo "Image is a test build"; \
poetry install --no-root; \

View File

@ -12,11 +12,17 @@ deluge_experiment:
address: ${DELUGE_NODE_1:-localhost}
daemon_port: 6890
listen_ports: [ 6891, 6892 ]
agent_url: http://${DELUGE_AGENT_1:-localhost}:9001/
- name: deluge-2
address: ${DELUGE_NODE_2:-localhost}
daemon_port: 6893
listen_ports: [ 6894, 6895 ]
- name: deluge-2
agent_url: http://${DELUGE_AGENT_2:-localhost}:9002/
- name: deluge-3
address: ${DELUGE_NODE_3:-localhost}
daemon_port: 6896
listen_ports: [ 6897, 6898 ]
agent_url: http://${DELUGE_AGENT_3:-localhost}:9003/
deluge_agent:
torrents_path: /var/lib/deluge/downloads

159
poetry.lock generated
View File

@ -11,6 +11,27 @@ files = [
{file = "annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89"},
]
[[package]]
name = "anyio"
version = "4.8.0"
description = "High level compatibility layer for multiple asynchronous event loop implementations"
optional = false
python-versions = ">=3.9"
files = [
{file = "anyio-4.8.0-py3-none-any.whl", hash = "sha256:b5011f270ab5eb0abf13385f851315585cc37ef330dd88e27ec3d34d651fd47a"},
{file = "anyio-4.8.0.tar.gz", hash = "sha256:1d9fe889df5212298c0c0723fa20479d1b94883a2df44bd3897aa91083316f7a"},
]
[package.dependencies]
idna = ">=2.8"
sniffio = ">=1.1"
typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""}
[package.extras]
doc = ["Sphinx (>=7.4,<8.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx_rtd_theme"]
test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1)", "uvloop (>=0.21)"]
trio = ["trio (>=0.26.1)"]
[[package]]
name = "certifi"
version = "2024.12.14"
@ -134,6 +155,20 @@ files = [
{file = "charset_normalizer-3.4.1.tar.gz", hash = "sha256:44251f18cd68a75b56585dd00dae26183e102cd5e0f9f1466e6df5da2ed64ea3"},
]
[[package]]
name = "click"
version = "8.1.8"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.7"
files = [
{file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"},
{file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"},
]
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "colorama"
version = "0.4.6"
@ -167,6 +202,26 @@ files = [
{file = "distlib-0.3.9.tar.gz", hash = "sha256:a60f20dea646b8a33f3e7772f74dc0b2d0772d2837ee1342a00645c81edf9403"},
]
[[package]]
name = "fastapi"
version = "0.115.6"
description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production"
optional = false
python-versions = ">=3.8"
files = [
{file = "fastapi-0.115.6-py3-none-any.whl", hash = "sha256:e9240b29e36fa8f4bb7290316988e90c381e5092e0cbe84e7818cc3713bcf305"},
{file = "fastapi-0.115.6.tar.gz", hash = "sha256:9ec46f7addc14ea472958a96aae5b5de65f39721a46aaf5705c480d9a8b76654"},
]
[package.dependencies]
pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.1.0 || >2.1.0,<3.0.0"
starlette = ">=0.40.0,<0.42.0"
typing-extensions = ">=4.8.0"
[package.extras]
all = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "itsdangerous (>=1.1.0)", "jinja2 (>=2.11.2)", "orjson (>=3.2.1)", "pydantic-extra-types (>=2.0.0)", "pydantic-settings (>=2.0.0)", "python-multipart (>=0.0.7)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "uvicorn[standard] (>=0.12.0)"]
standard = ["email-validator (>=2.0.0)", "fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "jinja2 (>=2.11.2)", "python-multipart (>=0.0.7)", "uvicorn[standard] (>=0.12.0)"]
[[package]]
name = "filelock"
version = "3.16.1"
@ -183,6 +238,62 @@ docs = ["furo (>=2024.8.6)", "sphinx (>=8.0.2)", "sphinx-autodoc-typehints (>=2.
testing = ["covdefaults (>=2.3)", "coverage (>=7.6.1)", "diff-cover (>=9.2)", "pytest (>=8.3.3)", "pytest-asyncio (>=0.24)", "pytest-cov (>=5)", "pytest-mock (>=3.14)", "pytest-timeout (>=2.3.1)", "virtualenv (>=20.26.4)"]
typing = ["typing-extensions (>=4.12.2)"]
[[package]]
name = "h11"
version = "0.14.0"
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
optional = false
python-versions = ">=3.7"
files = [
{file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"},
{file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"},
]
[[package]]
name = "httpcore"
version = "1.0.7"
description = "A minimal low-level HTTP client."
optional = false
python-versions = ">=3.8"
files = [
{file = "httpcore-1.0.7-py3-none-any.whl", hash = "sha256:a3fff8f43dc260d5bd363d9f9cf1830fa3a458b332856f34282de498ed420edd"},
{file = "httpcore-1.0.7.tar.gz", hash = "sha256:8551cb62a169ec7162ac7be8d4817d561f60e08eaa485234898414bb5a8a0b4c"},
]
[package.dependencies]
certifi = "*"
h11 = ">=0.13,<0.15"
[package.extras]
asyncio = ["anyio (>=4.0,<5.0)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
trio = ["trio (>=0.22.0,<1.0)"]
[[package]]
name = "httpx"
version = "0.28.1"
description = "The next generation HTTP client."
optional = false
python-versions = ">=3.8"
files = [
{file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"},
{file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"},
]
[package.dependencies]
anyio = "*"
certifi = "*"
httpcore = "==1.*"
idna = "*"
[package.extras]
brotli = ["brotli", "brotlicffi"]
cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"]
http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "identify"
version = "2.6.5"
@ -640,6 +751,34 @@ files = [
{file = "ruff-0.8.6.tar.gz", hash = "sha256:dcad24b81b62650b0eb8814f576fc65cfee8674772a6e24c9b747911801eeaa5"},
]
[[package]]
name = "sniffio"
version = "1.3.1"
description = "Sniff out which async library your code is running under"
optional = false
python-versions = ">=3.7"
files = [
{file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"},
{file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"},
]
[[package]]
name = "starlette"
version = "0.41.3"
description = "The little ASGI library that shines."
optional = false
python-versions = ">=3.8"
files = [
{file = "starlette-0.41.3-py3-none-any.whl", hash = "sha256:44cedb2b7c77a9de33a8b74b2b90e9f50d11fcf25d8270ea525ad71a25374ff7"},
{file = "starlette-0.41.3.tar.gz", hash = "sha256:0e4ab3d16522a255be6b28260b938eae2482f98ce5cc934cb08dce8dc3ba5835"},
]
[package.dependencies]
anyio = ">=3.4.0,<5"
[package.extras]
full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.7)", "pyyaml"]
[[package]]
name = "tenacity"
version = "9.0.0"
@ -722,6 +861,24 @@ h2 = ["h2 (>=4,<5)"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "uvicorn"
version = "0.34.0"
description = "The lightning-fast ASGI server."
optional = false
python-versions = ">=3.9"
files = [
{file = "uvicorn-0.34.0-py3-none-any.whl", hash = "sha256:023dc038422502fa28a09c7a30bf2b6991512da7dcdb8fd35fe57cfc154126f4"},
{file = "uvicorn-0.34.0.tar.gz", hash = "sha256:404051050cd7e905de2c9a7e61790943440b3416f49cb409f965d9dcd0fa73e9"},
]
[package.dependencies]
click = ">=7.0"
h11 = ">=0.8"
[package.extras]
standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"]
[[package]]
name = "virtualenv"
version = "20.28.1"
@ -745,4 +902,4 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "ecc038d5e0e05d072a39a294ef96e7045554678cfc870bafe13b07385208e0f2"
content-hash = "1cb8a00bb7c5eebcafcd9415d1513517c35dd450ca0d4e93a2584f81b08412fe"

View File

@ -16,17 +16,23 @@ pyyaml = "^6.0.2"
requests = "^2.32.3"
ruff = "^0.8.6"
tenacity = "^9.0.0"
fastapi = "^0.115.6"
[tool.poetry.group.test.dependencies]
pytest = "^8.3.3"
mypy = "^1.13.0"
types-pyyaml = "^6.0.12.20240917"
types-requests = "^2.32.0.20241016"
httpx = "^0.28.1"
[tool.poetry.group.dev.dependencies]
pre-commit = "^4.0.1"
[tool.poetry.group.agent.dependencies]
uvicorn = "^0.34.0"
[tool.pytest.ini_options]
markers = [
"integration: marks tests as integration tests"