diff --git a/benchmarks/codex/agent/agent.py b/benchmarks/codex/agent/agent.py index fd52779..86d5c3e 100644 --- a/benchmarks/codex/agent/agent.py +++ b/benchmarks/codex/agent/agent.py @@ -8,13 +8,11 @@ from typing import Optional, Dict from pydantic import BaseModel from benchmarks.codex.client.async_client import AsyncCodexClient - +from benchmarks.codex.client.common import Cid from benchmarks.codex.client.common import Manifest from benchmarks.codex.logging import CodexDownloadMetric from benchmarks.core.utils.random import random_data -Cid = str - EMPTY_STREAM_BACKOFF = 0.1 logger = logging.getLogger(__name__) diff --git a/benchmarks/codex/client/async_client.py b/benchmarks/codex/client/async_client.py index 579aa2b..a725430 100644 --- a/benchmarks/codex/client/async_client.py +++ b/benchmarks/codex/client/async_client.py @@ -59,9 +59,7 @@ class AsyncCodexClientImpl(AsyncCodexClient): response.raise_for_status() response_contents = await response.json() - cid = response_contents.pop("cid") - - return Manifest.model_validate(dict(cid=cid, **response_contents["manifest"])) + return Manifest.from_codex_api_response(response_contents) @asynccontextmanager async def download(self, cid: Cid) -> AsyncIterator[BaseStreamReader]: diff --git a/benchmarks/codex/client/common.py b/benchmarks/codex/client/common.py index 7dff89b..1526676 100644 --- a/benchmarks/codex/client/common.py +++ b/benchmarks/codex/client/common.py @@ -14,3 +14,9 @@ class Manifest(BaseModel): mimetype: str uploadedAt: int protected: bool + + @staticmethod + def from_codex_api_response(response: dict) -> "Manifest": + return Manifest.model_validate( + dict(cid=response["cid"], **response["manifest"]) + ) diff --git a/benchmarks/codex/codex_node.py b/benchmarks/codex/codex_node.py index 6e9d8af..801d285 100644 --- a/benchmarks/codex/codex_node.py +++ b/benchmarks/codex/codex_node.py @@ -1,6 +1,7 @@ import logging import socket from functools import cached_property +from typing import Iterator, Set 
from urllib.error import HTTPError import requests @@ -18,9 +19,11 @@ from benchmarks.codex.agent.codex_agent_client import CodexAgentClient from benchmarks.core.concurrency import await_predicate from benchmarks.core.experiments.experiments import ExperimentComponent from benchmarks.core.network import Node, DownloadHandle +from benchmarks.core.utils.units import megabytes STOP_POLICY = stop_after_attempt(5) WAIT_POLICY = wait_exponential(exp_base=2, min=4, max=16) +DELETE_TIMEOUT = 3600 # timeouts for deletes should be generous (https://github.com/codex-storage/nim-codex/pull/1103) logger = logging.getLogger(__name__) @@ -31,9 +34,11 @@ class CodexMeta: class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): - def __init__(self, codex_api_url: Url, agent: CodexAgentClient): + def __init__(self, codex_api_url: Url, agent: CodexAgentClient) -> None: self.codex_api_url = codex_api_url self.agent = agent + # Lightweight tracking of datasets created by this node. It's OK if we lose them. 
+ self.hosted_datasets: Set[Cid] = set() def is_ready(self) -> bool: try: @@ -50,7 +55,9 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): retry=retry_if_not_exception_type(HTTPError), ) def genseed(self, size: int, seed: int, meta: CodexMeta) -> Cid: - return self.agent.generate(size=size, seed=seed, name=meta.name) + cid = self.agent.generate(size=size, seed=seed, name=meta.name) + self.hosted_datasets.add(cid) + return cid @retry( stop=STOP_POLICY, @@ -58,11 +65,51 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): retry=retry_if_not_exception_type(HTTPError), ) def leech(self, handle: Cid) -> DownloadHandle: + self.hosted_datasets.add(handle) return CodexDownloadHandle(parent=self, monitor_url=self.agent.download(handle)) def remove(self, handle: Cid) -> bool: - logger.warning("Removing a file from Codex is not currently supported.") - return False + response = requests.delete( + str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")), + timeout=DELETE_TIMEOUT, + ) + + response.raise_for_status() + return True + + def exists_local(self, handle: Cid) -> bool: + """Check if a dataset exists on the node.""" + response = requests.get( + str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")), + stream=True, # only the status line is needed; do not download the dataset body + ) + response.close() + + if response.status_code == 404: + return False + + if response.status_code != 200: + response.raise_for_status() + + return True + + def download_local( + self, handle: Cid, chunk_size: int = megabytes(1) + ) -> Iterator[bytes]: + """Retrieves the contents of a locally available + dataset from the node.""" + response = requests.get( + str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")), + stream=True, # without this, requests buffers the whole body before iter_content + ) + response.raise_for_status() + + return response.iter_content(chunk_size=chunk_size) + + def wipe_all_datasets(self) -> None: + for dataset in list(self.hosted_datasets): + self.remove(dataset) + self.hosted_datasets.remove(dataset) @cached_property def name(self) -> str: diff --git 
a/benchmarks/codex/config.py b/benchmarks/codex/config.py new file mode 100644 index 0000000..7486fbf --- /dev/null +++ b/benchmarks/codex/config.py @@ -0,0 +1,93 @@ +import random +from itertools import islice +from typing import List, cast + +from pydantic import Field +from pydantic_core import Url +from urllib3.util import parse_url + +from benchmarks.codex.agent.codex_agent_client import CodexAgentClient +from benchmarks.codex.client.common import Cid +from benchmarks.codex.codex_node import CodexMeta, CodexNode +from benchmarks.core.experiments.experiments import ( + ExperimentBuilder, + ExperimentEnvironment, + ExperimentComponent, +) +from benchmarks.core.experiments.iterated_experiment import IteratedExperiment +from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment +from benchmarks.core.pydantic import SnakeCaseModel, Host +from benchmarks.core.utils.random import sample + + +class CodexNodeConfig(SnakeCaseModel): + name: str + address: Host + disc_port: int + api_port: int + agent_url: Url + + +CodexDisseminationExperiment = IteratedExperiment[ + StaticDisseminationExperiment[Cid, CodexMeta] +] + + +class CodexExperimentConfig(ExperimentBuilder[CodexDisseminationExperiment]): + experiment_set_id: str = Field( + description="Identifies the group of experiment repetitions", default="unnamed" + ) + seeder_sets: int = Field( + gt=0, default=1, description="Number of distinct seeder sets to experiment with" + ) + seeders: int = Field(gt=0, description="Number of seeders per seeder set") + file_size: int = Field(gt=0, description="File size, in bytes") + repetitions: int = Field( + gt=0, description="How many experiment repetitions to run for each seeder set" + ) + + logging_cooldown: int = Field( + ge=0, # the default of 0 (no cooldown) must itself satisfy the constraint + default=0, + description="Time to wait after the last download completes before tearing down the experiment.", + ) + + nodes: List[CodexNodeConfig] + + def build(self) -> CodexDisseminationExperiment: + agents = [ 
CodexAgentClient(parse_url(str(node.agent_url))) for node in self.nodes + ] + + network = [ + CodexNode( + codex_api_url=parse_url(f"http://{str(node.address)}:{node.api_port}"), + agent=agents[i], + ) + for i, node in enumerate(self.nodes) + ] + + env = ExperimentEnvironment( + components=cast(List[ExperimentComponent], network + agents), + ping_max=10, + polling_interval=0.5, + ) + + def repetitions(): + for seeder_set in range(self.seeder_sets): + seeders = list(islice(sample(len(network)), self.seeders)) + for experiment_run in range(self.repetitions): + yield env.bind( + StaticDisseminationExperiment( + network=network, + seeders=seeders, + file_size=self.file_size, + seed=random.randint(0, 2**16), + meta=CodexMeta(f"dataset-{seeder_set}-{experiment_run}"), + logging_cooldown=self.logging_cooldown, + ) + ) + + return IteratedExperiment( + repetitions(), experiment_set_id=self.experiment_set_id + ) diff --git a/benchmarks/codex/tests/fixtures/addresses.py b/benchmarks/codex/tests/fixtures/addresses.py index 92e50df..3f3ff45 100644 --- a/benchmarks/codex/tests/fixtures/addresses.py +++ b/benchmarks/codex/tests/fixtures/addresses.py @@ -12,6 +12,11 @@ def codex_node_2_url() -> str: return f"http://{os.environ.get('CODEX_NODE_2', 'localhost')}:6893" +@pytest.fixture +def codex_node_3_url() -> str: + return f"http://{os.environ.get('CODEX_NODE_3', 'localhost')}:6895" + + @pytest.fixture def codex_agent_1_url() -> str: return f"http://{os.environ.get('CODEX_AGENT_1', 'localhost')}:9000" @@ -20,3 +25,8 @@ def codex_agent_1_url() -> str: @pytest.fixture def codex_agent_2_url() -> str: return f"http://{os.environ.get('CODEX_AGENT_2', 'localhost')}:9001" + + +@pytest.fixture +def codex_agent_3_url() -> str: + return f"http://{os.environ.get('CODEX_AGENT_3', 'localhost')}:9002" diff --git a/benchmarks/codex/tests/fixtures/fixtures.py b/benchmarks/codex/tests/fixtures/fixtures.py index 729e1b9..5f4dfe8 100644 --- a/benchmarks/codex/tests/fixtures/fixtures.py +++ 
b/benchmarks/codex/tests/fixtures/fixtures.py @@ -1,3 +1,5 @@ +from typing import Iterator + from urllib3.util import parse_url from benchmarks.codex.agent.codex_agent_client import CodexAgentClient @@ -7,21 +9,29 @@ from benchmarks.core.concurrency import await_predicate import pytest -def codex_node(codex_api_url: str, agent_url: str) -> CodexNode: +def codex_node(codex_api_url: str, agent_url: str) -> Iterator[CodexNode]: node = CodexNode( codex_api_url=parse_url(codex_api_url), agent=CodexAgentClient(parse_url(agent_url)), ) assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5) - # TODO wipe datasets once have support in codex for doing so. - return node + + try: + yield node + finally: + node.wipe_all_datasets() @pytest.fixture -def codex_node1(codex_node_1_url: str, codex_agent_1_url: str) -> CodexNode: - return codex_node(codex_node_1_url, codex_agent_1_url) +def codex_node1(codex_node_1_url: str, codex_agent_1_url: str) -> Iterator[CodexNode]: + yield from codex_node(codex_node_1_url, codex_agent_1_url) @pytest.fixture -def codex_node2(codex_node_2_url: str, codex_agent_2_url: str) -> CodexNode: - return codex_node(codex_node_2_url, codex_agent_2_url) +def codex_node2(codex_node_2_url: str, codex_agent_2_url: str) -> Iterator[CodexNode]: + yield from codex_node(codex_node_2_url, codex_agent_2_url) + + +@pytest.fixture +def codex_node3(codex_node_3_url: str, codex_agent_3_url: str) -> Iterator[CodexNode]: + yield from codex_node(codex_node_3_url, codex_agent_3_url) diff --git a/benchmarks/codex/tests/test_codex_node.py b/benchmarks/codex/tests/test_codex_node.py index f0a000a..7c8ea38 100644 --- a/benchmarks/codex/tests/test_codex_node.py +++ b/benchmarks/codex/tests/test_codex_node.py @@ -20,3 +20,28 @@ def test_should_download_file(codex_node1: CodexNode, codex_node2: CodexNode): assert cast(CodexDownloadHandle, handle).completion() == DownloadStatus( downloaded=megabytes(1), total=megabytes(1) ) + + +@pytest.mark.codex_integration +def 
test_should_remove_file(codex_node1: CodexNode): + cid = codex_node1.genseed( + size=megabytes(1), + seed=1234, + meta=CodexMeta(name="dataset1"), + ) + + assert codex_node1.exists_local(cid) + assert codex_node1.remove(cid) + assert not codex_node1.exists_local(cid) + + +@pytest.mark.codex_integration +def test_should_download_file_from_local_node(codex_node1: CodexNode): + cid = codex_node1.genseed( + size=megabytes(1), + seed=1234, + meta=CodexMeta(name="dataset1"), + ) + + contents = b"".join(codex_node1.download_local(cid)) + assert len(contents) == megabytes(1) diff --git a/benchmarks/codex/tests/test_codex_static_experiment.py b/benchmarks/codex/tests/test_codex_static_experiment.py new file mode 100644 index 0000000..8c445de --- /dev/null +++ b/benchmarks/codex/tests/test_codex_static_experiment.py @@ -0,0 +1,48 @@ +from collections.abc import Iterator + +import pytest + +from benchmarks.codex.codex_node import CodexMeta +from benchmarks.core.experiments.experiments import ExperimentEnvironment +from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment +from benchmarks.core.utils.units import megabytes + + +def merge_chunks(chunks: Iterator[bytes]) -> bytes: + return b"".join(chunks) + + +@pytest.mark.codex_integration +def test_should_run_with_a_single_seeder(codex_node1, codex_node2, codex_node3): + size = megabytes(2) + env = ExperimentEnvironment( + components=[codex_node1, codex_node2, codex_node3], + polling_interval=0.5, + ) + + experiment = StaticDisseminationExperiment( + network=[codex_node1, codex_node2, codex_node3], + seeders=[1], + file_size=size, + seed=1234, + meta=CodexMeta("dataset-1"), + ) + + env.await_ready() + try: + experiment.setup() + experiment.do_run() + + all_datasets = list(codex_node1.hosted_datasets) + assert len(all_datasets) == 1 + cid = all_datasets[0] + + content_1 = merge_chunks(codex_node1.download_local(cid)) + content_2 = merge_chunks(codex_node2.download_local(cid)) + content_3 = 
merge_chunks(codex_node3.download_local(cid)) + + assert len(content_1) == megabytes(2) + assert content_1 == content_2 == content_3 + + finally: + experiment.teardown() diff --git a/docker-compose-codex.ci.yaml b/docker-compose-codex.ci.yaml index ef53023..726dea6 100644 --- a/docker-compose-codex.ci.yaml +++ b/docker-compose-codex.ci.yaml @@ -7,8 +7,10 @@ services: environment: - CODEX_NODE_1=codex-1 - CODEX_NODE_2=codex-2 + - CODEX_NODE_3=codex-3 - CODEX_AGENT_1=codex-agent-1 - CODEX_AGENT_2=codex-agent-2 + - CODEX_AGENT_3=codex-agent-3 depends_on: clean-volumes: condition: service_healthy diff --git a/docker-compose-codex.local.yaml b/docker-compose-codex.local.yaml index 9a2d231..da4ad2d 100644 --- a/docker-compose-codex.local.yaml +++ b/docker-compose-codex.local.yaml @@ -76,6 +76,34 @@ services: ports: - "9001:9001" + codex-3: + image: codexstorage/nim-codex:latest + container_name: codex-3 + environment: + - CODEX_LOG_LEVEL=DEBUG + - CODEX_DATA_DIR=/var/lib/codex + - CODEX_DISC_PORT=6894 + - CODEX_API_BINDADDR=0.0.0.0 + - CODEX_API_PORT=6895 + - CODEX_STORAGE_QUOTA=1073741824 # 1GB + - BOOTSTRAP_NODE_URL=http://codex-1:6891 + - NAT_IP_AUTO=true + volumes: + - codex-volume-3:/var/lib/codex + ports: + - "6894-6895:6894-6895" + + codex-agent-3: + image: bittorrent-benchmarks:test + container_name: codex-agent-3 + entrypoint: [ "poetry", "run", "bittorrent-benchmarks", + "agent", "experiments-codex.local.yaml", "codex_agent", "--port", "9002" ] + environment: + - CODEX_API_URL=http://codex-3:6895 + - NODE_ID=codex-3 + ports: + - "9002:9002" + volumes: codex-volume-1: codex-volume-2: diff --git a/experiments-codex.local.yaml b/experiments-codex.local.yaml index 69b46b2..2e5631d 100644 --- a/experiments-codex.local.yaml +++ b/experiments-codex.local.yaml @@ -1,3 +1,25 @@ +codex_experiment: + seeders: 1 + file_size: 52428800 + repetitions: 3 + + nodes: + - name: codex-1 + address: ${CODEX_NODE_1:-localhost} + disc_port: 6890 + api_port: 6891 + agent_url: 
http://${CODEX_AGENT_1:-localhost}:9000/ + - name: codex-2 + address: ${CODEX_NODE_2:-localhost} + disc_port: 6892 + api_port: 6893 + agent_url: http://${CODEX_AGENT_2:-localhost}:9001/ + - name: codex-3 + address: ${CODEX_NODE_3:-localhost} + disc_port: 6894 + api_port: 6895 + agent_url: http://${CODEX_AGENT_3:-localhost}:9002/ + codex_agent: codex_api_url: ${CODEX_API_URL} node_id: ${NODE_ID} \ No newline at end of file