initial test setup with deluge nodes
This commit is contained in:
parent
8686c71ccf
commit
33c445bee8
|
@ -0,0 +1 @@
|
|||
from benchmarks.core.tests.fixtures import *
|
|
@ -0,0 +1,113 @@
|
|||
import base64
|
||||
import shutil
|
||||
from dataclasses import dataclass
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from typing import List, Union, Optional, Self, Dict, Any
|
||||
|
||||
import pathvalidate
|
||||
from deluge_client import DelugeRPCClient
|
||||
from torrentool.torrent import Torrent
|
||||
from urllib3.util import Url
|
||||
|
||||
from benchmarks.core.network import TNetworkHandle, SharedFSNode
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class DelugeMeta:
    """Initial metadata needed to publish a new torrent through a :class:`DelugeNode`."""

    # Name given to the torrent; also used as the folder name for its data.
    name: str
    # Tracker announce URL written into the torrent when it is created.
    announce_url: Url
|
||||
|
||||
|
||||
class DelugeNode(SharedFSNode[Torrent, DelugeMeta]):
    """A :class:`SharedFSNode` that drives a Deluge daemon through its RPC client.

    Data files are copied under ``<volume>/<name>/downloads`` and then registered
    with the daemon as torrents. The RPC connection is opened lazily, the first
    time :attr:`rpc` is accessed.
    """

    def __init__(
        self,
        name: str,
        volume: Path,
        daemon_port: int,
        daemon_address: str = 'localhost',
        daemon_username: str = 'user',
        daemon_password: str = 'password',
    ) -> None:
        """
        :param name: name of this node; it is used as a folder name under ``volume``.
        :param volume: root of the volume shared with the daemon.
        :param daemon_port: port handed to the RPC client.
        :param daemon_address: host handed to the RPC client.
        :param daemon_username: username handed to the RPC client.
        :param daemon_password: password handed to the RPC client.
        :raises ValueError: if ``name`` is not a valid filename.
        """
        if not pathvalidate.is_valid_filename(name):
            raise ValueError(f'Node name must be a valid filename (bad name: "{name}")')

        self.name = name
        self.downloads_root = volume / name / 'downloads'

        # No connection is made here; see the ``rpc`` property.
        self._rpc: Optional[DelugeRPCClient] = None
        self.daemon_args = dict(
            host=daemon_address,
            port=daemon_port,
            username=daemon_username,
            password=daemon_password,
        )

        super().__init__(self.downloads_root)

        self._init_folders()

    def wipe_all_torrents(self):
        """Removes every torrent known to the daemon, along with its data, and
        resets the downloads folder to an empty directory.

        :raises Exception: if the daemon reports errors while removing torrents.
        """
        known_torrents = self.rpc.core.get_torrents_status({}, [])
        torrent_ids = list(known_torrents.keys())
        if torrent_ids:
            errors = self.rpc.core.remove_torrents(torrent_ids, remove_data=True)
            if errors:
                raise Exception(f'There were errors removing torrents: {errors}')

        # Recreate the downloads folder from scratch so that files which were
        # uploaded but never got seeded (or whose deletion failed) do not linger.
        shutil.rmtree(self.downloads_root)
        self._init_folders()

    def seed(
        self,
        file: Path,
        handle: Union[DelugeMeta, Torrent],
    ) -> Torrent:
        """Uploads ``file`` into this node's downloads folder and adds it to the
        daemon as a torrent.

        :param file: local path to the file (or directory) to seed.
        :param handle: either the initial metadata, in which case a new torrent is
            created from the uploaded data, or an existing torrent, which is used as is.
        :return: the torrent that was added to the daemon.
        :raises FileExistsError: if a data folder named ``handle.name`` already exists.
        """
        # exist_ok=False makes seeding the same name twice fail loudly.
        (self.downloads_root / handle.name).mkdir(parents=True, exist_ok=False)

        uploaded = self.upload(local=file, name=handle.name)

        if not isinstance(handle, DelugeMeta):
            torrent = handle
        else:
            torrent = Torrent.create_from(uploaded.parent)
            torrent.announce_urls = handle.announce_url.url
            torrent.name = handle.name

        self.rpc.core.add_torrent_file(
            filename=f'{handle.name}.torrent',
            filedump=self._b64dump(torrent),
            options={},
        )

        return torrent

    def leech(self, handle: TNetworkHandle):
        """Not implemented yet: currently does nothing."""
        pass

    def torrent_info(self, name: str) -> List[Dict[bytes, Any]]:
        """Returns the status entries the daemon reports for torrents named ``name``."""
        matches = self.rpc.core.get_torrents_status({'name': name}, [])
        return list(matches.values())

    @property
    def rpc(self) -> DelugeRPCClient:
        """The RPC client for this node's daemon, connecting first if needed."""
        if self._rpc is None:
            self.connect()
        return self._rpc

    def connect(self) -> Self:
        """Opens a new RPC connection using ``daemon_args`` and returns ``self``."""
        client = DelugeRPCClient(**self.daemon_args)
        client.connect()
        # Only stored once connect() has succeeded.
        self._rpc = client
        return self

    def _init_folders(self):
        """Ensures the downloads folder exists."""
        self.downloads_root.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _b64dump(handle: Torrent) -> bytes:
        """Returns the serialized torrent encoded as base64."""
        return base64.b64encode(handle.to_string())
|
|
@ -1,40 +1,65 @@
|
|||
import shutil
|
||||
from abc import abstractmethod, ABC
|
||||
from pathlib import Path
|
||||
from typing import Sequence
|
||||
|
||||
from typing_extensions import Generic, TypeVar, List, Optional
|
||||
from typing_extensions import Generic, TypeVar, Union
|
||||
|
||||
TNode = TypeVar('TNode', bound='Node')
|
||||
TFileHandle = TypeVar('TFileHandle')
|
||||
TNetworkHandle = TypeVar('TNetworkHandle')
|
||||
TInitialMetadata = TypeVar('TInitialMetadata')
|
||||
|
||||
|
||||
class Node(ABC, Generic[TNetworkHandle, TInitialMetadata]):
    """A :class:`Node` represents a peer within a :class:`FileSharingNetwork`."""

    @abstractmethod
    def seed(
        self,
        file: Path,
        handle: Union[TInitialMetadata, TNetworkHandle],
    ) -> TNetworkHandle:
        """
        Makes the current :class:`Node` a seeder for the specified file.

        :param file: local path to the file to seed.
        :param handle: file sharing requires some initial set of information when a file is first uploaded into the
            network, and that will typically then result in a compact representation such as a CID or a Torrent file,
            which other nodes can then use to identify the file and its metadata within the network. This method can
            take either such initial metadata (TInitialMetadata) or the subsequent network handle (TNetworkHandle) if
            it exists.
        :return: the network handle (TNetworkHandle) for the seeded file.
        """
        pass

    @abstractmethod
    def leech(self, handle: TNetworkHandle):
        """Makes the current node a leecher for the provided handle."""
        pass
|
||||
|
||||
|
||||
class FileSharingNetwork(Generic[TNetworkHandle, TInitialMetadata], ABC):
    """A :class:`FileSharingNetwork` is a set of :class:`Node`s that share
    an interest in a given file."""

    @property
    @abstractmethod
    def nodes(self) -> Sequence[Node[TNetworkHandle, TInitialMetadata]]:
        """The nodes that make up this network."""
        pass
|
||||
|
||||
|
||||
class SharedFSNode(Node[TNetworkHandle, TInitialMetadata], ABC):
    """A `SharedFSNode` is a :class:`Node` which shares a network volume with us. This means
    we are able to upload files to it by means of simple file copies."""

    def __init__(self, volume: Path):
        """
        :param volume: root folder, on the shared volume, into which uploads are copied.
        """
        self.volume = volume

    def upload(self, local: Path, name: str) -> Path:
        """Copies ``local`` into the folder ``<volume>/<name>``, creating it if needed.

        :param local: file or directory to copy; directories are copied recursively.
        :param name: name of the folder, under the volume, which receives the copy.
        :return: path to the copy, i.e. ``<volume>/<name>/<local.name>``.
        """
        destination_dir = self.volume / name
        destination_dir.mkdir(parents=True, exist_ok=True)

        destination = destination_dir / local.name
        copy = shutil.copytree if local.is_dir() else shutil.copy
        copy(local, destination)

        return destination
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Generator
|
||||
|
||||
import pytest
|
||||
from urllib3.util import Url, parse_url
|
||||
|
||||
from benchmarks.core.utils import megabytes
|
||||
|
||||
|
||||
@pytest.fixture
def temp_random_file() -> Generator[Path, None, None]:
    """Yields the path to a file named ``data.bin`` holding ``megabytes(1)`` random bytes.

    The file lives in a temporary directory which is removed at fixture teardown.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        data_file = Path(scratch_dir) / 'data.bin'
        data_file.write_bytes(os.urandom(megabytes(1)))

        yield data_file
|
||||
|
||||
|
||||
@pytest.fixture
def tracker() -> Url:
    """Returns the parsed announce URL of the tracker used by the tests."""
    announce_url = 'http://127.0.0.1:8000/announce'
    return parse_url(announce_url)
|
|
@ -0,0 +1,32 @@
|
|||
from pathlib import Path
|
||||
from typing import Generator
|
||||
|
||||
import pytest
|
||||
from urllib3.util import Url
|
||||
|
||||
from benchmarks.core.deluge import DelugeNode, DelugeMeta
|
||||
from benchmarks.core.utils import megabytes
|
||||
from benchmarks.tests.utils import shared_volume
|
||||
|
||||
|
||||
@pytest.fixture
def deluge_node1() -> Generator[DelugeNode, None, None]:
    """Yields a :class:`DelugeNode` for the ``deluge-1`` daemon (RPC port 6890).

    All torrents are wiped both before the node is handed to the test and after
    the test finishes, whether it passed or not.
    """
    deluge = DelugeNode(name='deluge-1', volume=shared_volume(), daemon_port=6890)
    deluge.wipe_all_torrents()
    try:
        yield deluge
    finally:
        deluge.wipe_all_torrents()
|
||||
|
||||
|
||||
def test_should_seed_files(deluge_node1: DelugeNode, temp_random_file: Path, tracker: Url):
    """Seeding a file should register exactly one complete torrent with the node."""
    # The node starts clean: nothing named 'dataset1' is known to the daemon.
    assert not deluge_node1.torrent_info(name='dataset1')

    deluge_node1.seed(temp_random_file, DelugeMeta(name='dataset1', announce_url=tracker))
    response = deluge_node1.torrent_info(name='dataset1')
    assert len(response) == 1
    info = response[0]

    # The RPC client hands back byte-string keys and values.
    assert info[b'name'] == b'dataset1'
    assert info[b'total_size'] == megabytes(1)
    # Plain truthiness check rather than comparing against True with ==.
    assert info[b'is_seed']
|
|
@ -1,19 +1,27 @@
|
|||
import random
|
||||
from pathlib import Path
|
||||
from typing import Callable, Iterator
|
||||
from typing import Callable, Iterator, Tuple
|
||||
|
||||
# A Sampler samples without replacement from [0, ..., n - 1].
|
||||
Sampler = Callable[[int], Iterator[int]]
|
||||
type Sampler = Callable[[int], Iterator[int]]
|
||||
|
||||
# A DataGenerator generates files for experiments, returning each file's path together with its initial metadata.
|
||||
DataGenerator = Callable[[], Path]
|
||||
type DataGenerator[TInitialMetadata] = Callable[[], Tuple[TInitialMetadata, Path]]
|
||||
|
||||
|
||||
def sample(n: int) -> Iterator[int]:
    """Samples without replacement using a basic Fisher-Yates shuffle.

    The shuffle is performed lazily, one swap per yielded element, so callers
    may stop consuming early without paying for a full shuffle.

    :param n: size of the population; values are drawn from ``0, ..., n - 1``.
    :return: an iterator which yields each of the ``n`` values exactly once, in
        random order. Nothing is yielded when ``n <= 0``.
    """
    p = list(range(n))
    for i in range(n):
        # randint is inclusive at both ends, so j is drawn from the
        # not-yet-emitted suffix p[i:], which keeps j within bounds.
        j = random.randint(i, n - 1)
        p[i], p[j] = p[j], p[i]
        yield p[i]
|
||||
|
||||
|
||||
def kilobytes(n: int) -> int:
    """Returns the number of bytes in ``n`` kilobytes (1 kilobyte = 1024 bytes)."""
    bytes_per_kilobyte = 1024
    return n * bytes_per_kilobyte
|
||||
|
||||
|
||||
def megabytes(n: int) -> int:
    """Returns the number of bytes in ``n`` megabytes (1 megabyte = 1024 * 1024 bytes)."""
    return n * 1024 * 1024
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
from typing_extensions import Generic
|
||||
|
||||
from benchmarks.core.network import FileSharingNetwork, TNode
|
||||
from benchmarks.core.network import FileSharingNetwork, TInitialMetadata, TNetworkHandle
|
||||
from benchmarks.core.utils import Sampler, DataGenerator
|
||||
|
||||
|
||||
class StaticDisseminationExperiment(Generic[TNode]):
|
||||
class StaticDisseminationExperiment(Generic[TNetworkHandle, TInitialMetadata]):
|
||||
def __init__(
|
||||
self,
|
||||
network: FileSharingNetwork[TNode],
|
||||
network: FileSharingNetwork[TNetworkHandle, TInitialMetadata],
|
||||
seeders: int,
|
||||
sampler: Sampler,
|
||||
generator: DataGenerator
|
||||
|
@ -25,8 +25,8 @@ class StaticDisseminationExperiment(Generic[TNode]):
|
|||
[self.network.nodes[i] for i in range(0, len(self.network.nodes)) if i not in seeder_idx]
|
||||
)
|
||||
|
||||
data = self.generate_data()
|
||||
handle = None
|
||||
meta, data = self.generate_data()
|
||||
handle = meta
|
||||
|
||||
for node in seeders:
|
||||
handle = node.seed(data, handle)
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional, List
|
||||
from typing import Optional, List, Tuple, Union, Sequence
|
||||
|
||||
from benchmarks.core.network import FileSharingNetwork, TFileHandle, TNode, Node
|
||||
from benchmarks.core.network import FileSharingNetwork, Node
|
||||
from benchmarks.core.utils import Sampler
|
||||
from benchmarks.experiments.static_experiment import StaticDisseminationExperiment
|
||||
|
||||
|
@ -10,33 +10,42 @@ from benchmarks.experiments.static_experiment import StaticDisseminationExperime
|
|||
@dataclass
class MockHandle:
    """Network handle used by the mock nodes in these tests."""

    # Path of the file the handle refers to.
    path: Path
    # Name attached to the handle.
    name: str
|
||||
|
||||
|
||||
def mock_sampler(elements: List[int]) -> Sampler:
    """Builds a sampler which ignores its argument and always iterates over ``elements``."""

    def fixed_sampler(_):
        return iter(elements)

    return fixed_sampler
|
||||
|
||||
|
||||
class MockNode(Node[MockHandle, str]):
    """A :class:`Node` which only records what it was asked to seed or leech."""

    def __init__(self) -> None:
        # (handle, file) of the last seed() call, or None if never seeded.
        self.seeding: Optional[Tuple[MockHandle, Path]] = None
        # Handle of the last leech() call, or None if never leeched.
        self.leeching: Optional[MockHandle] = None

    def seed(
        self,
        file: Path,
        handle: Union[str, MockHandle]
    ) -> MockHandle:
        """Records ``file`` as being seeded and returns its handle.

        :param file: path to the file to seed.
        :param handle: an existing handle, which is reused as is, or a name from
            which a new :class:`MockHandle` pointing at ``file`` is built.
        :return: the handle recorded for this file.
        """
        if isinstance(handle, MockHandle):
            mock_handle = handle
        else:
            mock_handle = MockHandle(name=handle, path=file)

        self.seeding = (mock_handle, file)
        return mock_handle

    def leech(self, handle: MockHandle):
        """Records ``handle`` as being leeched."""
        self.leeching = handle
|
||||
|
||||
|
||||
class MockFileSharingNetwork(FileSharingNetwork[MockHandle, str]):
    """A :class:`FileSharingNetwork` made up of ``n`` :class:`MockNode` instances."""

    def __init__(self, n: int) -> None:
        """
        :param n: number of mock nodes to create.
        """
        self._nodes = [MockNode() for _ in range(n)]

    @property
    def nodes(self) -> Sequence[Node[MockHandle, str]]:
        """The mock nodes in this network, in creation order."""
        return self._nodes
|
||||
|
||||
|
||||
|
@ -49,7 +58,7 @@ def test_should_place_seeders():
|
|||
seeders=3,
|
||||
sampler=mock_sampler(seeder_indexes),
|
||||
network=network,
|
||||
generator=lambda: Path('/path/to/data'),
|
||||
generator=lambda: ('data', Path('/path/to/data')),
|
||||
)
|
||||
|
||||
experiment.run()
|
||||
|
@ -58,7 +67,7 @@ def test_should_place_seeders():
|
|||
for index, node in enumerate(network.nodes):
|
||||
if node.seeding is not None:
|
||||
actual_seeders.add(index)
|
||||
assert node.seeding == file
|
||||
assert node.seeding[0] == MockHandle(name='data', path=file)
|
||||
|
||||
assert actual_seeders == set(seeder_indexes)
|
||||
|
||||
|
@ -72,7 +81,7 @@ def test_should_place_leechers():
|
|||
seeders=3,
|
||||
sampler=mock_sampler(seeder_indexes),
|
||||
network=network,
|
||||
generator=lambda: Path('/path/to/data'),
|
||||
generator=lambda: ('data', Path('/path/to/data')),
|
||||
)
|
||||
|
||||
experiment.run()
|
||||
|
@ -81,6 +90,7 @@ def test_should_place_leechers():
|
|||
for index, node in enumerate(network.nodes):
|
||||
if node.leeching is not None:
|
||||
assert node.leeching.path == file
|
||||
assert node.leeching.name == 'data'
|
||||
assert node.seeding is None
|
||||
actual_leechers.add(index)
|
||||
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
from pathlib import Path
|
||||
|
||||
|
||||
def shared_volume() -> Path:
    """Returns the path to the ``volume`` folder located three levels above this file."""
    this_file = Path(__file__)
    base = this_file.parent.parent.parent
    return base / 'volume'
|
|
@ -0,0 +1,6 @@
|
|||
#!/usr/bin/env bash
# Clears the Deluge node folders under ./volume and then starts the Compose
# services. Paths are relative to the directory the script is run from.
set -e

# These have to be wiped out before we boot the containers.
rm -rf ./volume/deluge-1 ./volume/deluge-2

docker compose up
|
|
@ -0,0 +1,35 @@
|
|||
# You will need [rootless Docker](https://docs.docker.com/engine/security/rootless/)
|
||||
# for this to work because the tests rely on user-writable bind mounts.
|
||||
|
||||
services:
|
||||
deluge-1:
|
||||
image: codexstorage/deluge
|
||||
container_name: deluge-1
|
||||
environment:
|
||||
- DELUGE_RPC_PORT=6890
|
||||
- DELUGE_LISTEN_PORTS=6891,6892
|
||||
volumes:
|
||||
- ./volume/deluge-1:/var/lib/deluge
|
||||
- ./volume/deluge-1/downloads:/var/lib/deluge/downloads
|
||||
ports:
|
||||
- "6890:6890"
|
||||
- "6891-6892:6891-6892"
|
||||
|
||||
deluge-2:
|
||||
image: codexstorage/deluge
|
||||
container_name: deluge-2
|
||||
environment:
|
||||
- DELUGE_RPC_PORT=6893
|
||||
- DELUGE_LISTEN_PORTS=6894,6895
|
||||
volumes:
|
||||
- ./volume/deluge-2:/var/lib/deluge
|
||||
- ./volume/deluge-2/downloads:/var/lib/deluge/downloads
|
||||
ports:
|
||||
- "6893:6893"
|
||||
- "6894-6895:6894-6895"
|
||||
|
||||
tracker:
|
||||
image: codexstorage/bittorrent-tracker
|
||||
container_name: tracker
|
||||
ports:
|
||||
- "8000:8000"
|
|
@ -107,6 +107,22 @@ files = [
|
|||
{file = "packaging-24.1.tar.gz", hash = "sha256:026ed72c8ed3fcce5bf8950572258698927fd1dbda10a5e981cdf0ac37f4f002"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pathvalidate"
|
||||
version = "3.2.1"
|
||||
description = "pathvalidate is a Python library to sanitize/validate a string such as filenames/file-paths/etc."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "pathvalidate-3.2.1-py3-none-any.whl", hash = "sha256:9a6255eb8f63c9e2135b9be97a5ce08f10230128c4ae7b3e935378b82b22c4c9"},
|
||||
{file = "pathvalidate-3.2.1.tar.gz", hash = "sha256:f5d07b1e2374187040612a1fcd2bcb2919f8db180df254c9581bb90bf903377d"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
docs = ["Sphinx (>=2.4)", "sphinx-rtd-theme (>=1.2.2)", "urllib3 (<2)"]
|
||||
readme = ["path (>=13,<17)", "readmemaker (>=1.1.0)"]
|
||||
test = ["Faker (>=1.0.8)", "allpairspy (>=2)", "click (>=6.2)", "pytest (>=6.0.1)", "pytest-md-report (>=0.6.2)"]
|
||||
|
||||
[[package]]
|
||||
name = "pluggy"
|
||||
version = "1.5.0"
|
||||
|
@ -142,6 +158,20 @@ pluggy = ">=1.5,<2"
|
|||
[package.extras]
|
||||
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
|
||||
|
||||
[[package]]
|
||||
name = "torrentool"
|
||||
version = "1.2.0"
|
||||
description = "The tool to work with torrent files."
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "torrentool-1.2.0-py3-none-any.whl", hash = "sha256:bc6c55622e23978cf3c1e4aaf8f087971d75608c15b83be5a2c029464d3dd803"},
|
||||
{file = "torrentool-1.2.0.tar.gz", hash = "sha256:72cdd049eaf856ddc907d1d61527764ef0288512087d93a49267e00c4033c429"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
cli = ["click"]
|
||||
|
||||
[[package]]
|
||||
name = "typing-extensions"
|
||||
version = "4.12.2"
|
||||
|
@ -156,4 +186,4 @@ files = [
|
|||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.12"
|
||||
content-hash = "4c657d5b89f926722ec65f35124c9ac0e8138a4264be1860490ab5eaa8a1eb44"
|
||||
content-hash = "8bd651f652770aa65718872bbe04dd3851922037be99281bceaea5e379a5cc4c"
|
||||
|
|
|
@ -5,16 +5,21 @@ description = "Harness for benchmarking Codex against BitTorrent."
|
|||
authors = ["Your Name <you@example.com>"]
|
||||
license = "MIT"
|
||||
readme = "README.md"
|
||||
package-mode = false
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.12"
|
||||
deluge-client = "^1.10.2"
|
||||
|
||||
pathvalidate = "^3.2.1"
|
||||
torrentool = "^1.2.0"
|
||||
|
||||
[tool.poetry.group.test.dependencies]
|
||||
pytest = "^8.3.3"
|
||||
mypy = "^1.13.0"
|
||||
|
||||
[tool.mypy]
|
||||
ignore_missing_imports = true
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
|
Loading…
Reference in New Issue