diff --git a/benchmarks/conftest.py b/benchmarks/conftest.py
new file mode 100644
index 0000000..0e09841
--- /dev/null
+++ b/benchmarks/conftest.py
@@ -0,0 +1 @@
+from benchmarks.core.tests.fixtures import *
diff --git a/benchmarks/core/deluge.py b/benchmarks/core/deluge.py
new file mode 100644
index 0000000..b69830f
--- /dev/null
+++ b/benchmarks/core/deluge.py
@@ -0,0 +1,113 @@
+import base64
+import shutil
+from dataclasses import dataclass
+from io import BytesIO
+from pathlib import Path
+from typing import List, Union, Optional, Self, Dict, Any
+
+import pathvalidate
+from deluge_client import DelugeRPCClient
+from torrentool.torrent import Torrent
+from urllib3.util import Url
+
+from benchmarks.core.network import TNetworkHandle, SharedFSNode
+
+
+@dataclass(frozen=True)
+class DelugeMeta:
+    name: str
+    announce_url: Url
+
+
+class DelugeNode(SharedFSNode[Torrent, DelugeMeta]):
+
+    def __init__(
+            self,
+            name: str,
+            volume: Path,
+            daemon_port: int,
+            daemon_address: str = 'localhost',
+            daemon_username: str = 'user',
+            daemon_password: str = 'password',
+    ) -> None:
+        if not pathvalidate.is_valid_filename(name):
+            raise ValueError(f'Node name must be a valid filename (bad name: "{name}")')
+
+        self.name = name
+        self.downloads_root = volume / name / 'downloads'
+
+        self._rpc: Optional[DelugeRPCClient] = None
+        self.daemon_args = {
+            'host': daemon_address,
+            'port': daemon_port,
+            'username': daemon_username,
+            'password': daemon_password,
+        }
+
+        super().__init__(self.downloads_root)
+
+        self._init_folders()
+
+    def wipe_all_torrents(self):
+        torrent_ids = list(self.rpc.core.get_torrents_status({}, []).keys())
+        if torrent_ids:
+            errors = self.rpc.core.remove_torrents(torrent_ids, remove_data=True)
+            if errors:
+                raise Exception(f'There were errors removing torrents: {errors}')
+
+        # Wipe the download folder to get rid of files that were uploaded but
+        # then failed to seed or to be deleted.
+        shutil.rmtree(self.downloads_root)
+        self._init_folders()
+
+    def seed(
+            self,
+            file: Path,
+            handle: Union[DelugeMeta, Torrent],
+    ) -> Torrent:
+        data_root = self.downloads_root / handle.name
+        data_root.mkdir(parents=True, exist_ok=False)
+
+        target = self.upload(local=file, name=handle.name)
+
+        if isinstance(handle, DelugeMeta):
+            torrent = Torrent.create_from(target.parent)
+            torrent.announce_urls = handle.announce_url.url
+            torrent.name = handle.name
+        else:
+            torrent = handle
+
+        self.rpc.core.add_torrent_file(
+            filename=f'{handle.name}.torrent',
+            filedump=self._b64dump(torrent),
+            options=dict(),
+        )
+
+        return torrent
+
+    def leech(self, handle: TNetworkHandle):
+        pass
+
+    def torrent_info(self, name: str) -> List[Dict[bytes, Any]]:
+        return list(self.rpc.core.get_torrents_status({'name': name}, []).values())
+
+    @property
+    def rpc(self) -> DelugeRPCClient:
+        if self._rpc is None:
+            self.connect()
+        return self._rpc
+
+    def connect(self) -> Self:
+        client = DelugeRPCClient(**self.daemon_args)
+        client.connect()
+        self._rpc = client
+        return self
+
+    def _init_folders(self):
+        self.downloads_root.mkdir(parents=True, exist_ok=True)
+
+    @staticmethod
+    def _b64dump(handle: Torrent) -> bytes:
+        buffer = BytesIO()
+        buffer.write(handle.to_string())
+        return base64.b64encode(buffer.getvalue())
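`DelugeNode.leech` is left as a stub in this diff. For orientation only, here is one possible shape for it, reusing the same `add_torrent_file` RPC call and `_b64dump` helper that `seed` already uses; the assumption that handing the daemon just the torrent file is enough to start downloading is mine, not the patch's.

```python
# Not part of the diff: a possible leech() body for DelugeNode, assuming the
# daemon only needs the torrent file (no local data) to join the swarm.
def leech(self, handle: Torrent) -> Torrent:
    self.rpc.core.add_torrent_file(
        filename=f'{handle.name}.torrent',
        filedump=self._b64dump(handle),
        options=dict(),
    )
    return handle
```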
diff --git a/benchmarks/core/network.py b/benchmarks/core/network.py
index b19a7e4..8e1a91f 100644
--- a/benchmarks/core/network.py
+++ b/benchmarks/core/network.py
@@ -1,40 +1,65 @@
+import shutil
 from abc import abstractmethod, ABC
 from pathlib import Path
+from typing import Sequence
 
-from typing_extensions import Generic, TypeVar, List, Optional
+from typing_extensions import Generic, TypeVar, Union
 
-TNode = TypeVar('TNode', bound='Node')
-TFileHandle = TypeVar('TFileHandle')
+TNetworkHandle = TypeVar('TNetworkHandle')
+TInitialMetadata = TypeVar('TInitialMetadata')
 
 
-class Node(ABC, Generic[TFileHandle]):
+class Node(ABC, Generic[TNetworkHandle, TInitialMetadata]):
     """A :class:`Node` represents a peer within a :class:`FileSharingNetwork`."""
 
     @abstractmethod
     def seed(
             self,
             file: Path,
-            handle: Optional[TFileHandle]
-    ) -> TFileHandle:
+            handle: Union[TInitialMetadata, TNetworkHandle],
+    ) -> TNetworkHandle:
         """
         Makes the current :class:`Node` a seeder for the specified file.
 
-        :param file: path to the file to seed.
-        :param handle: an existing network handle to this file. If none is provided, a new one
-            will be generated.
+        :param file: local path to the file to seed.
+        :param handle: sharing a file requires some initial metadata when it is first uploaded into the
+            network, and this typically results in a compact network handle, such as a CID or a torrent
+            file, which other nodes can then use to identify the file and its metadata within the network.
+            This method accepts either the initial metadata (TInitialMetadata), when the file is seeded
+            for the first time, or the network handle (TNetworkHandle), if one already exists.
         """
        pass
 
-    def leech(self, handle: TFileHandle):
+    @abstractmethod
+    def leech(self, handle: TNetworkHandle):
         """Makes the current node a leecher for the provided handle."""
         pass
 
 
-class FileSharingNetwork(Generic[TNode], ABC):
+class FileSharingNetwork(Generic[TNetworkHandle, TInitialMetadata], ABC):
     """A :class:`FileSharingNetwork` is a set of :class:`Node`s that
     share an interest in a given file."""
 
     @property
     @abstractmethod
-    def nodes(self) -> List[TNode]:
+    def nodes(self) -> Sequence[Node[TNetworkHandle, TInitialMetadata]]:
         pass
+
+
+class SharedFSNode(Node[TNetworkHandle, TInitialMetadata], ABC):
+    """A `SharedFSNode` is a :class:`Node` which shares a network volume with us. This means
+    we are able to upload files to it by means of simple file copies."""
+
+    def __init__(self, volume: Path):
+        self.volume = volume
+
+    def upload(self, local: Path, name: str) -> Path:
+        target_path = self.volume / name
+        target_path.mkdir(parents=True, exist_ok=True)
+        target = target_path / local.name
+        if local.is_dir():
+            shutil.copytree(local, target)
+        else:
+            shutil.copy(local, target)
+
+        return target
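To make the `Union` accepted by `seed` concrete: the intent is that the first seeder receives the initial metadata and every subsequent seeder reuses the network handle returned by the previous call, which is exactly what `StaticDisseminationExperiment` does later in this patch. A minimal illustration (the helper name is hypothetical):

```python
# Illustration only, not part of the diff: threading seed() handles through nodes.
def seed_all(nodes, meta, data):
    handle = meta                         # first seeder receives TInitialMetadata
    for node in nodes:
        handle = node.seed(data, handle)  # later seeders reuse the TNetworkHandle
    return handle
```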
""" pass - def leech(self, handle: TFileHandle): + @abstractmethod + def leech(self, handle: TNetworkHandle): """Makes the current node a leecher for the provided handle.""" pass -class FileSharingNetwork(Generic[TNode], ABC): +class FileSharingNetwork(Generic[TNetworkHandle, TInitialMetadata], ABC): """A :class:`FileSharingNetwork` is a set of :class:`Node`s that share an interest in a given file.""" @property @abstractmethod - def nodes(self) -> List[TNode]: + def nodes(self) -> Sequence[Node[TNetworkHandle, TInitialMetadata]]: pass + + +class SharedFSNode(Node[TNetworkHandle, TInitialMetadata], ABC): + """A `SharedFSNode` is a :class:`Node` which shares a network volume with us. This means + we are able to upload files to it by means of simple file copies.""" + + def __init__(self, volume: Path): + self.volume = volume + + def upload(self, local: Path, name: str) -> Path: + target_path = self.volume / name + target_path.mkdir(parents=True, exist_ok=True) + target = target_path / local.name + if local.is_dir(): + shutil.copytree(local, target) + else: + shutil.copy(local, target) + + return target diff --git a/benchmarks/core/tests/__init__.py b/benchmarks/core/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/benchmarks/core/tests/fixtures.py b/benchmarks/core/tests/fixtures.py new file mode 100644 index 0000000..4c696fb --- /dev/null +++ b/benchmarks/core/tests/fixtures.py @@ -0,0 +1,26 @@ +import os +import tempfile +from pathlib import Path +from typing import Generator + +import pytest +from urllib3.util import Url, parse_url + +from benchmarks.core.utils import megabytes + + +@pytest.fixture +def temp_random_file() -> Generator[Path, None, None]: + with tempfile.TemporaryDirectory() as temp_dir_str: + temp_dir = Path(temp_dir_str) + random_file = temp_dir / 'data.bin' + random_bytes = os.urandom(megabytes(1)) + with random_file.open('wb') as outfile: + outfile.write(random_bytes) + + yield random_file + + +@pytest.fixture +def tracker() -> Url: + return parse_url('http://127.0.0.1:8000/announce') diff --git a/benchmarks/core/tests/test_deluge_node.py b/benchmarks/core/tests/test_deluge_node.py new file mode 100644 index 0000000..2c75e7a --- /dev/null +++ b/benchmarks/core/tests/test_deluge_node.py @@ -0,0 +1,32 @@ +from pathlib import Path +from typing import Generator + +import pytest +from urllib3.util import Url + +from benchmarks.core.deluge import DelugeNode, DelugeMeta +from benchmarks.core.utils import megabytes +from benchmarks.tests.utils import shared_volume + + +@pytest.fixture +def deluge_node1() -> Generator[DelugeNode, None, None]: + node = DelugeNode('deluge-1', volume=shared_volume(), daemon_port=6890) + node.wipe_all_torrents() + try: + yield node + finally: + node.wipe_all_torrents() + + +def test_should_seed_files(deluge_node1: DelugeNode, temp_random_file: Path, tracker: Url): + assert not deluge_node1.torrent_info(name='dataset1') + + deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker)) + response = deluge_node1.torrent_info(name='dataset1') + assert len(response) == 1 + info = response[0] + + assert info[b'name'] == b'dataset1' + assert info[b'total_size'] == megabytes(1) + assert info[b'is_seed'] == True diff --git a/benchmarks/core/utils.py b/benchmarks/core/utils.py index 056e60b..8229b2d 100644 --- a/benchmarks/core/utils.py +++ b/benchmarks/core/utils.py @@ -1,19 +1,27 @@ import random from pathlib import Path -from typing import Callable, Iterator +from typing import Callable, Iterator, Tuple # A 
diff --git a/benchmarks/experiments/static_experiment.py b/benchmarks/experiments/static_experiment.py
index 4cadd47..906c1d7 100644
--- a/benchmarks/experiments/static_experiment.py
+++ b/benchmarks/experiments/static_experiment.py
@@ -1,13 +1,13 @@
 from typing_extensions import Generic
 
-from benchmarks.core.network import FileSharingNetwork, TNode
+from benchmarks.core.network import FileSharingNetwork, TInitialMetadata, TNetworkHandle
 from benchmarks.core.utils import Sampler, DataGenerator
 
 
-class StaticDisseminationExperiment(Generic[TNode]):
+class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata]):
     def __init__(
             self,
-            network: FileSharingNetwork[TNode],
+            network: FileSharingNetwork[TNetworkHandle, TInitialMetadata],
             seeders: int,
             sampler: Sampler,
             generator: DataGenerator
@@ -25,8 +25,8 @@ class StaticDisseminationExperiment(Generic[TNode]):
             [self.network.nodes[i] for i in range(0, len(self.network.nodes)) if i not in seeder_idx]
         )
 
-        data = self.generate_data()
-        handle = None
+        meta, data = self.generate_data()
+        handle = meta
         for node in seeders:
             handle = node.seed(data, handle)
 
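The new `(metadata, path)` contract of `DataGenerator` is what lets `run` unpack `meta, data = self.generate_data()` above. A generator compatible with the Deluge node in this patch could look roughly like this; the function name and the 1 MiB size are illustrative, while `DelugeMeta`, `megabytes` and the tracker URL come from elsewhere in the diff:

```python
# Sketch of a DataGenerator under the new (TInitialMetadata, Path) contract.
import os
import tempfile
from pathlib import Path
from typing import Tuple

from urllib3.util import parse_url

from benchmarks.core.deluge import DelugeMeta
from benchmarks.core.utils import megabytes


def random_dataset() -> Tuple[DelugeMeta, Path]:
    data = Path(tempfile.mkdtemp()) / 'data.bin'
    data.write_bytes(os.urandom(megabytes(1)))
    meta = DelugeMeta(name='dataset1', announce_url=parse_url('http://127.0.0.1:8000/announce'))
    return meta, data
```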
diff --git a/benchmarks/experiments/tests/test_static_experiment.py b/benchmarks/experiments/tests/test_static_experiment.py
index 00262c4..1878323 100644
--- a/benchmarks/experiments/tests/test_static_experiment.py
+++ b/benchmarks/experiments/tests/test_static_experiment.py
@@ -1,8 +1,8 @@
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Optional, List
+from typing import Optional, List, Tuple, Union, Sequence
 
-from benchmarks.core.network import FileSharingNetwork, TFileHandle, TNode, Node
+from benchmarks.core.network import FileSharingNetwork, Node
 from benchmarks.core.utils import Sampler
 from benchmarks.experiments.static_experiment import StaticDisseminationExperiment
 
@@ -10,33 +10,42 @@ from benchmarks.experiments.static_experiment import StaticDisseminationExperiment
 @dataclass
 class MockHandle:
     path: Path
+    name: str
 
 
 def mock_sampler(elements: List[int]) -> Sampler:
     return lambda _: iter(elements)
 
 
-class MockNode(Node[MockHandle]):
+class MockNode(Node[MockHandle, str]):
 
-    def __init__(self):
-        self.seeding: Optional[Path] = None
+    def __init__(self) -> None:
+        self.seeding: Optional[Tuple[MockHandle, Path]] = None
         self.leeching: Optional[MockHandle] = None
 
-    def seed(self, path: Path, handle: Optional[MockHandle] = None) -> MockHandle:
-        self.seeding = path
-        return MockHandle(path)
+    def seed(
+            self,
+            file: Path,
+            handle: Union[str, MockHandle]
+    ) -> MockHandle:
+        if isinstance(handle, MockHandle):
+            self.seeding = (handle, file)
+        else:
+            self.seeding = (MockHandle(name=handle, path=file), file)
+
+        return self.seeding[0]
 
     def leech(self, handle: MockHandle):
         self.leeching = handle
 
 
-class MockFileSharingNetwork(FileSharingNetwork[MockNode]):
+class MockFileSharingNetwork(FileSharingNetwork[MockHandle, str]):
 
-    def __init__(self, n: int):
+    def __init__(self, n: int) -> None:
         self._nodes = [MockNode() for _ in range(n)]
 
     @property
-    def nodes(self) -> List[MockNode]:
+    def nodes(self) -> Sequence[Node[MockHandle, str]]:
         return self._nodes
 
 
@@ -49,7 +58,7 @@ def test_should_place_seeders():
         seeders=3,
         sampler=mock_sampler(seeder_indexes),
         network=network,
-        generator=lambda: Path('/path/to/data'),
+        generator=lambda: ('data', Path('/path/to/data')),
     )
 
     experiment.run()
@@ -58,7 +67,7 @@ def test_should_place_seeders():
 
     for index, node in enumerate(network.nodes):
         if node.seeding is not None:
             actual_seeders.add(index)
-            assert node.seeding == file
+            assert node.seeding[0] == MockHandle(name='data', path=file)
 
     assert actual_seeders == set(seeder_indexes)
 
@@ -72,7 +81,7 @@ def test_should_place_leechers():
         seeders=3,
         sampler=mock_sampler(seeder_indexes),
         network=network,
-        generator=lambda: Path('/path/to/data'),
+        generator=lambda: ('data', Path('/path/to/data')),
     )
 
     experiment.run()
 
@@ -81,6 +90,7 @@ def test_should_place_leechers():
     for index, node in enumerate(network.nodes):
         if node.leeching is not None:
             assert node.leeching.path == file
+            assert node.leeching.name == 'data'
             assert node.seeding is None
             actual_leechers.add(index)
diff --git a/benchmarks/tests/__init__.py b/benchmarks/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/benchmarks/tests/utils.py b/benchmarks/tests/utils.py
new file mode 100644
index 0000000..ba0a5e7
--- /dev/null
+++ b/benchmarks/tests/utils.py
@@ -0,0 +1,5 @@
+from pathlib import Path
+
+
+def shared_volume() -> Path:
+    return Path(__file__).parent.parent.parent.joinpath('volume')
\ No newline at end of file
diff --git a/docker-compose-up.sh b/docker-compose-up.sh
new file mode 100755
index 0000000..975890b
--- /dev/null
+++ b/docker-compose-up.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+set -e
+
+# These have to be wiped out before we boot the containers.
+rm -rf ./volume/{deluge-1,deluge-2}
+docker compose up
\ No newline at end of file
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..c6424b0
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,35 @@
+# You will need [rootless Docker](https://docs.docker.com/engine/security/rootless/)
+# for this to work, because the tests rely on user-writable bind mounts.
+
+services:
+  deluge-1:
+    image: codexstorage/deluge
+    container_name: deluge-1
+    environment:
+      - DELUGE_RPC_PORT=6890
+      - DELUGE_LISTEN_PORTS=6891,6892
+    volumes:
+      - ./volume/deluge-1:/var/lib/deluge
+      - ./volume/deluge-1/downloads:/var/lib/deluge/downloads
+    ports:
+      - "6890:6890"
+      - "6891-6892:6891-6892"
+
+  deluge-2:
+    image: codexstorage/deluge
+    container_name: deluge-2
+    environment:
+      - DELUGE_RPC_PORT=6893
+      - DELUGE_LISTEN_PORTS=6894,6895
+    volumes:
+      - ./volume/deluge-2:/var/lib/deluge
+      - ./volume/deluge-2/downloads:/var/lib/deluge/downloads
+    ports:
+      - "6893:6893"
+      - "6894-6895:6894-6895"
+
+  tracker:
+    image: codexstorage/bittorrent-tracker
+    container_name: tracker
+    ports:
+      - "8000:8000"
\ No newline at end of file
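For reference, this is how the harness code earlier in the patch lines up with the services above: each `DelugeNode` targets the RPC port its container publishes, and the tracker announces on port 8000. Illustrative wiring only, assuming the compose stack is already running:

```python
# Illustrative only: pointing the harness at the docker-compose services above.
from benchmarks.core.deluge import DelugeNode
from benchmarks.tests.utils import shared_volume

deluge1 = DelugeNode('deluge-1', volume=shared_volume(), daemon_port=6890)  # deluge-1 RPC port
deluge2 = DelugeNode('deluge-2', volume=shared_volume(), daemon_port=6893)  # deluge-2 RPC port
deluge1.connect()
deluge2.connect()
```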
diff --git a/poetry.lock b/poetry.lock
index 2458fe6..6a7e450 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -107,6 +107,22 @@ files = [
     {file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"},
 ]
 
+[[package]]
+name = "pathvalidate"
+version = "3.2.1"
+description = "pathvalidate is a Python library to sanitize/validate a string such as filenames/file-paths/etc."
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "pathvalidate-3.2.1-py3-none-any.whl", hash = "sha256:9a6255eb8f63c9e2135b9be97a5ce08f10230128c4ae7b3e935378b82b22c4c9"},
+    {file = "pathvalidate-3.2.1.tar.gz", hash = "sha256:f5d07b1e2374187040612a1fcd2bcb2919f8db180df254c9581bb90bf903377d"},
+]
+
+[package.extras]
+docs = ["Sphinx (>=2.4)", "sphinx-rtd-theme (>=1.2.2)", "urllib3 (<2)"]
+readme = ["path (>=13,<17)", "readmemaker (>=1.1.0)"]
+test = ["Faker (>=1.0.8)", "allpairspy (>=2)", "click (>=6.2)", "pytest (>=6.0.1)", "pytest-md-report (>=0.6.2)"]
+
 [[package]]
 name = "pluggy"
 version = "1.5.0"
@@ -142,6 +158,20 @@ pluggy = ">=1.5,<2"
 [package.extras]
 dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
 
+[[package]]
+name = "torrentool"
+version = "1.2.0"
+description = "The tool to work with torrent files."
+optional = false
+python-versions = "*"
+files = [
+    {file = "torrentool-1.2.0-py3-none-any.whl", hash = "sha256:bc6c55622e23978cf3c1e4aaf8f087971d75608c15b83be5a2c029464d3dd803"},
+    {file = "torrentool-1.2.0.tar.gz", hash = "sha256:72cdd049eaf856ddc907d1d61527764ef0288512087d93a49267e00c4033c429"},
+]
+
+[package.extras]
+cli = ["click"]
+
 [[package]]
 name = "typing-extensions"
 version = "4.12.2"
@@ -156,4 +186,4 @@ files = [
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.12"
-content-hash = "4c657d5b89f926722ec65f35124c9ac0e8138a4264be1860490ab5eaa8a1eb44"
+content-hash = "8bd651f652770aa65718872bbe04dd3851922037be99281bceaea5e379a5cc4c"
diff --git a/pyproject.toml b/pyproject.toml
index 37da87d..0b6be00 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,16 +5,21 @@ description = "Harness for benchmarking Codex against BitTorrent."
 authors = ["Your Name "]
 license = "MIT"
 readme = "README.md"
+package-mode = false
 
 [tool.poetry.dependencies]
 python = "^3.12"
 deluge-client = "^1.10.2"
-
+pathvalidate = "^3.2.1"
+torrentool = "^1.2.0"
 
 [tool.poetry.group.test.dependencies]
 pytest = "^8.3.3"
 mypy = "^1.13.0"
 
+[tool.mypy]
+ignore_missing_imports = true
+
 [build-system]
 requires = ["poetry-core"]
 build-backend = "poetry.core.masonry.api"