diff --git a/benchmarks/cli.py b/benchmarks/cli.py index 8b27435..048bd07 100644 --- a/benchmarks/cli.py +++ b/benchmarks/cli.py @@ -10,6 +10,7 @@ from pydantic import IPvAnyAddress from pydantic_core import ValidationError from typing_extensions import TypeVar +from benchmarks.codex.agent.api import CodexAgentConfig from benchmarks.core.agent import AgentBuilder from benchmarks.core.config import ConfigParser, Builder from benchmarks.core.experiments.experiments import Experiment, ExperimentBuilder @@ -35,6 +36,7 @@ experiment_config_parser.register(DelugeExperimentConfig) agent_config_parser = ConfigParser[AgentBuilder]() agent_config_parser.register(DelugeAgentConfig) +agent_config_parser.register(CodexAgentConfig) log_parser = basic_log_parser() log_parser.register(DelugeTorrentDownload) @@ -125,7 +127,7 @@ def cmd_dump_single_experiment(source: LogSource, group_id: str, experiment_id: def cmd_run_agent(agents: Dict[str, AgentBuilder], args): if args.agent not in agents: - print(f"Agent type {args.experiment} not found.") + print(f"Agent type {args.agent} not found.") sys.exit(-1) uvicorn.run( diff --git a/benchmarks/codex/agent/agent.py b/benchmarks/codex/agent/agent.py index fd65bdc..fd52779 100644 --- a/benchmarks/codex/agent/agent.py +++ b/benchmarks/codex/agent/agent.py @@ -7,10 +7,11 @@ from typing import Optional, Dict from pydantic import BaseModel -from benchmarks.codex.agent.codex_client import CodexClient, Manifest +from benchmarks.codex.client.async_client import AsyncCodexClient + +from benchmarks.codex.client.common import Manifest from benchmarks.codex.logging import CodexDownloadMetric from benchmarks.core.utils.random import random_data -from benchmarks.core.utils.streams import BaseStreamReader Cid = str @@ -23,20 +24,21 @@ class DownloadStatus(BaseModel): downloaded: int total: int + def as_percent(self) -> float: + return (self.downloaded * 100) / self.total + class DownloadHandle: def __init__( self, parent: "CodexAgent", manifest: Manifest, - download_stream: BaseStreamReader, read_increment: float = 0.01, ): self.parent = parent self.manifest = manifest self.bytes_downloaded = 0 self.read_increment = read_increment - self.download_stream = download_stream self.download_task: Optional[Task[None]] = None def begin_download(self) -> Task: @@ -46,34 +48,35 @@ class DownloadHandle: async def _download_loop(self): step_size = int(self.manifest.datasetSize * self.read_increment) - while not self.download_stream.at_eof(): - step = min(step_size, self.manifest.datasetSize - self.bytes_downloaded) - bytes_read = await self.download_stream.read(step) - # We actually have no guarantees that an empty read means EOF, so we just back off - # a bit. - if not bytes_read: - await asyncio.sleep(EMPTY_STREAM_BACKOFF) - self.bytes_downloaded += len(bytes_read) + async with self.parent.client.download(self.manifest.cid) as download_stream: + while not download_stream.at_eof(): + step = min(step_size, self.manifest.datasetSize - self.bytes_downloaded) + bytes_read = await download_stream.read(step) + # We actually have no guarantees that an empty read means EOF, so we just back off + # a bit. + if not bytes_read: + await asyncio.sleep(EMPTY_STREAM_BACKOFF) + self.bytes_downloaded += len(bytes_read) - logger.info( - CodexDownloadMetric( - cid=self.manifest.cid, - value=self.bytes_downloaded, - node=self.parent.node_id, + logger.info( + CodexDownloadMetric( + cid=self.manifest.cid, + value=self.bytes_downloaded, + node=self.parent.node_id, + ) ) - ) - if self.bytes_downloaded < self.manifest.datasetSize: - raise EOFError( - f"Got EOF too early: download size ({self.bytes_downloaded}) was less " - f"than expected ({self.manifest.datasetSize})." - ) + if self.bytes_downloaded < self.manifest.datasetSize: + raise EOFError( + f"Got EOF too early: download size ({self.bytes_downloaded}) was less " + f"than expected ({self.manifest.datasetSize})." + ) - if self.bytes_downloaded > self.manifest.datasetSize: - raise ValueError( - f"Download size ({self.bytes_downloaded}) was greater than expected " - f"({self.manifest.datasetSize})." - ) + if self.bytes_downloaded > self.manifest.datasetSize: + raise ValueError( + f"Download size ({self.bytes_downloaded}) was greater than expected " + f"({self.manifest.datasetSize})." + ) def progress(self) -> DownloadStatus: if self.download_task is None: @@ -89,7 +92,7 @@ class DownloadHandle: class CodexAgent: - def __init__(self, client: CodexClient, node_id: str = "unknown") -> None: + def __init__(self, client: AsyncCodexClient, node_id: str = "unknown") -> None: self.client = client self.node_id = node_id self.ongoing_downloads: Dict[Cid, DownloadHandle] = {} @@ -112,8 +115,7 @@ class CodexAgent: handle = DownloadHandle( self, - manifest=await self.client.get_manifest(cid), - download_stream=await self.client.download(cid), + manifest=await self.client.manifest(cid), read_increment=read_increment, ) diff --git a/benchmarks/codex/agent/api.py b/benchmarks/codex/agent/api.py index ea47df3..878127a 100644 --- a/benchmarks/codex/agent/api.py +++ b/benchmarks/codex/agent/api.py @@ -1,9 +1,14 @@ from typing import Annotated, Optional -from fastapi import APIRouter, Response, Depends, HTTPException, Request +from aiohttp import ClientResponseError +from fastapi import APIRouter, Response, Depends, HTTPException, Request, FastAPI from fastapi.responses import JSONResponse +from pydantic_core import Url +from urllib3.util import parse_url from benchmarks.codex.agent.agent import CodexAgent, DownloadStatus +from benchmarks.codex.client.async_client import AsyncCodexClientImpl +from benchmarks.core.agent import AgentBuilder router = APIRouter() @@ -51,3 +56,36 @@ async def download_status( ) return agent.ongoing_downloads[cid].progress() + + +@router.get("/api/v1/codex/download/node-id") +async def node_id(agent: Annotated[CodexAgent, Depends(codex_agent)]): + return agent.node_id + + +def client_response_error_handler( + _: Request, exception: ClientResponseError +) -> Response: + return JSONResponse( + status_code=exception.status, + content={"message": exception.message}, + ) + + +class CodexAgentConfig(AgentBuilder): + codex_api_url: Url + node_id: str + + def build(self) -> FastAPI: + app = FastAPI() + app.include_router(router) + # Need to disable typing (https://github.com/encode/starlette/discussions/2391) + app.add_exception_handler(ClientResponseError, client_response_error_handler) # type: ignore + agent = CodexAgent( + client=AsyncCodexClientImpl( + codex_api_url=parse_url(str(self.codex_api_url)) + ), + node_id=self.node_id, + ) + app.dependency_overrides[codex_agent] = lambda: agent + return app diff --git a/benchmarks/codex/agent/codex_agent_client.py b/benchmarks/codex/agent/codex_agent_client.py new file mode 100644 index 0000000..50db2e4 --- /dev/null +++ b/benchmarks/codex/agent/codex_agent_client.py @@ -0,0 +1,64 @@ +from urllib3.util import Url, parse_url +import requests +import socket + +from benchmarks.codex.agent.agent import DownloadStatus +from benchmarks.core.experiments.experiments import ExperimentComponent + +from benchmarks.codex.client.common import Cid + + +class CodexAgentClient(ExperimentComponent): + def __init__(self, url: Url): + self.url = url + + def is_ready(self) -> bool: + try: + requests.get(str(self.url._replace(path="/api/v1/hello"))) + return True + except (ConnectionError, socket.gaierror): + return False + + def generate(self, size: int, seed: int, name: str) -> Cid: + response = requests.post( + url=self.url._replace(path="/api/v1/codex/dataset").url, + params={ + "size": str(size), + "seed": str(seed), + "name": name, + }, + ) + + response.raise_for_status() + + return response.text + + def download(self, cid: str) -> Url: + response = requests.post( + url=self.url._replace(path="/api/v1/codex/download").url, + params={ + "cid": cid, + }, + ) + + response.raise_for_status() + + return parse_url(response.json()["status"]) + + def download_status(self, cid: str) -> DownloadStatus: + response = requests.get( + url=self.url._replace(path=f"/api/v1/codex/download/{cid}/status").url, + ) + + response.raise_for_status() + + return DownloadStatus.model_validate_json(response.json()["status"]) + + def node_id(self) -> str: + response = requests.get( + url=self.url._replace(path="/api/v1/codex/download/node-id").url, + ) + + response.raise_for_status() + + return response.text diff --git a/benchmarks/codex/agent/tests/fixtures.py b/benchmarks/codex/agent/tests/fixtures.py deleted file mode 100644 index 6bf679a..0000000 --- a/benchmarks/codex/agent/tests/fixtures.py +++ /dev/null @@ -1,14 +0,0 @@ -import os - -import pytest -from urllib3.util import parse_url - -from benchmarks.codex.agent.codex_client import CodexClientImpl - - -@pytest.fixture -def codex_client_1(): - # TODO wipe data between tests - return CodexClientImpl( - parse_url(f"http://{os.environ.get('CODEX_NODE_1', 'localhost')}:8091") - ) diff --git a/benchmarks/codex/agent/tests/test_api.py b/benchmarks/codex/agent/tests/test_api.py index d0645ef..34d5c54 100644 --- a/benchmarks/codex/agent/tests/test_api.py +++ b/benchmarks/codex/agent/tests/test_api.py @@ -27,7 +27,7 @@ async def test_should_create_file(): assert response.status_code == 200 assert response.charset_encoding == "utf-8" - manifest = await codex_client.get_manifest(response.text) + manifest = await codex_client.manifest(response.text) assert manifest.datasetSize == 1024 diff --git a/benchmarks/codex/agent/tests/test_codex_agent.py b/benchmarks/codex/agent/tests/test_codex_agent.py index 8afb270..c557759 100644 --- a/benchmarks/codex/agent/tests/test_codex_agent.py +++ b/benchmarks/codex/agent/tests/test_codex_agent.py @@ -1,19 +1,21 @@ from asyncio import StreamReader +from contextlib import asynccontextmanager from io import StringIO -from typing import IO, Dict +from typing import IO, Dict, AsyncIterator from unittest.mock import patch import pytest from benchmarks.codex.agent.agent import CodexAgent, DownloadStatus -from benchmarks.codex.agent.codex_client import CodexClient, Cid, Manifest +from benchmarks.codex.client.async_client import AsyncCodexClient +from benchmarks.codex.client.common import Manifest, Cid from benchmarks.codex.logging import CodexDownloadMetric from benchmarks.core.concurrency import await_predicate_async from benchmarks.core.utils.streams import BaseStreamReader from benchmarks.logging.logging import LogParser -class FakeCodexClient(CodexClient): +class FakeCodexClient(AsyncCodexClient): def __init__(self) -> None: self.storage: Dict[Cid, Manifest] = {} self.streams: Dict[Cid, StreamReader] = {} @@ -33,7 +35,7 @@ class FakeCodexClient(CodexClient): ) return cid - async def get_manifest(self, cid: Cid) -> Manifest: + async def manifest(self, cid: Cid) -> Manifest: return self.storage[cid] def create_download_stream(self, cid: Cid) -> StreamReader: @@ -41,15 +43,16 @@ class FakeCodexClient(CodexClient): self.streams[cid] = reader return reader - async def download(self, cid: Cid) -> BaseStreamReader: - return self.streams[cid] + @asynccontextmanager + async def download(self, cid: Cid) -> AsyncIterator[BaseStreamReader]: + yield self.streams[cid] @pytest.mark.asyncio async def test_should_create_dataset_of_right_size(): codex_agent = CodexAgent(FakeCodexClient()) cid = await codex_agent.create_dataset(size=1024, name="dataset-1", seed=1234) - manifest = await codex_agent.client.get_manifest(cid) + manifest = await codex_agent.client.manifest(cid) assert manifest.datasetSize == 1024 diff --git a/benchmarks/codex/client/__init__.py b/benchmarks/codex/client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/benchmarks/codex/agent/codex_client.py b/benchmarks/codex/client/async_client.py similarity index 69% rename from benchmarks/codex/agent/codex_client.py rename to benchmarks/codex/client/async_client.py index c55efe0..579aa2b 100644 --- a/benchmarks/codex/agent/codex_client.py +++ b/benchmarks/codex/client/async_client.py @@ -1,43 +1,33 @@ +"""Async Client implementation for the base Codex API.""" + from abc import ABC, abstractmethod -from typing import IO + +from contextlib import asynccontextmanager +from typing import IO, AsyncIterator, AsyncGenerator import aiohttp -from pydantic import BaseModel from urllib3.util import Url +from benchmarks.codex.client.common import Manifest, Cid from benchmarks.core.utils.streams import BaseStreamReader -API_VERSION = "v1" -Cid = str - - -class Manifest(BaseModel): - cid: Cid - treeCid: Cid - datasetSize: int - blockSize: int - filename: str - mimetype: str - uploadedAt: int - protected: bool - - -class CodexClient(ABC): +class AsyncCodexClient(ABC): @abstractmethod async def upload(self, name: str, mime_type: str, content: IO) -> Cid: pass @abstractmethod - async def get_manifest(self, cid: Cid) -> Manifest: + async def manifest(self, cid: Cid) -> Manifest: pass + @asynccontextmanager @abstractmethod - async def download(self, cid: Cid) -> BaseStreamReader: + def download(self, cid: Cid) -> AsyncGenerator[BaseStreamReader, None]: pass -class CodexClientImpl(CodexClient): +class AsyncCodexClientImpl(AsyncCodexClient): """A lightweight async wrapper built around the Codex REST API.""" def __init__(self, codex_api_url: Url): @@ -58,7 +48,7 @@ class CodexClientImpl(CodexClient): return await response.text() - async def get_manifest(self, cid: Cid) -> Manifest: + async def manifest(self, cid: Cid) -> Manifest: async with aiohttp.ClientSession() as session: response = await session.get( self.codex_api_url._replace( @@ -73,14 +63,13 @@ class CodexClientImpl(CodexClient): return Manifest.model_validate(dict(cid=cid, **response_contents["manifest"])) - async def download(self, cid: Cid) -> BaseStreamReader: + @asynccontextmanager + async def download(self, cid: Cid) -> AsyncIterator[BaseStreamReader]: async with aiohttp.ClientSession() as session: response = await session.get( - self.codex_api_url._replace( - path=f"/api/codex/v1/data/{cid}/network/download" - ).url, + self.codex_api_url._replace(path=f"/api/codex/v1/data/{cid}").url, ) response.raise_for_status() - return response.content + yield response.content diff --git a/benchmarks/codex/client/common.py b/benchmarks/codex/client/common.py new file mode 100644 index 0000000..7dff89b --- /dev/null +++ b/benchmarks/codex/client/common.py @@ -0,0 +1,16 @@ +from pydantic import BaseModel + +API_VERSION = "v1" + +Cid = str + + +class Manifest(BaseModel): + cid: Cid + treeCid: Cid + datasetSize: int + blockSize: int + filename: str + mimetype: str + uploadedAt: int + protected: bool diff --git a/benchmarks/codex/client/tests/__init__.py b/benchmarks/codex/client/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/benchmarks/codex/client/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/benchmarks/codex/client/tests/test_async_client.py b/benchmarks/codex/client/tests/test_async_client.py new file mode 100644 index 0000000..76f23bb --- /dev/null +++ b/benchmarks/codex/client/tests/test_async_client.py @@ -0,0 +1,38 @@ +from io import BytesIO + +import pytest +from urllib3.util import parse_url + +from benchmarks.codex.client.async_client import AsyncCodexClientImpl +from benchmarks.core.utils.random import random_data +from benchmarks.core.utils.units import megabytes + + +@pytest.mark.codex_integration +@pytest.mark.asyncio +async def test_should_upload_file(codex_node_1_url: str): + client = AsyncCodexClientImpl(parse_url(codex_node_1_url)) + + data = BytesIO() + random_data(megabytes(1), data) + + cid = client.upload("test.txt", "application/octet-stream", data) + assert cid is not None + + +@pytest.mark.codex_integration +@pytest.mark.asyncio +async def test_should_download_file(codex_node_1_url: str): + client = AsyncCodexClientImpl(parse_url(codex_node_1_url)) + + buff = BytesIO() + random_data(megabytes(1), buff) + data = buff.getvalue() + + cid = await client.upload("test.txt", "application/octet-stream", BytesIO(data)) + assert cid is not None + + async with client.download(cid) as content: + downloaded = await content.readexactly(megabytes(1)) + + assert downloaded == data diff --git a/benchmarks/codex/codex_node.py b/benchmarks/codex/codex_node.py new file mode 100644 index 0000000..6e9d8af --- /dev/null +++ b/benchmarks/codex/codex_node.py @@ -0,0 +1,92 @@ +import logging +import socket +from functools import cached_property +from urllib.error import HTTPError + +import requests +from attr import dataclass +from tenacity import ( + stop_after_attempt, + wait_exponential, + retry, + retry_if_not_exception_type, +) +from urllib3.util import Url + +from benchmarks.codex.agent.agent import Cid, DownloadStatus +from benchmarks.codex.agent.codex_agent_client import CodexAgentClient +from benchmarks.core.concurrency import await_predicate +from benchmarks.core.experiments.experiments import ExperimentComponent +from benchmarks.core.network import Node, DownloadHandle + +STOP_POLICY = stop_after_attempt(5) +WAIT_POLICY = wait_exponential(exp_base=2, min=4, max=16) + +logger = logging.getLogger(__name__) + + +@dataclass +class CodexMeta: + name: str + + +class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): + def __init__(self, codex_api_url: Url, agent: CodexAgentClient): + self.codex_api_url = codex_api_url + self.agent = agent + + def is_ready(self) -> bool: + try: + requests.get( + str(self.codex_api_url._replace(path="/api/codex/v1/debug/info")) + ) + return True + except (ConnectionError, socket.gaierror): + return False + + @retry( + stop=STOP_POLICY, + wait=WAIT_POLICY, + retry=retry_if_not_exception_type(HTTPError), + ) + def genseed(self, size: int, seed: int, meta: CodexMeta) -> Cid: + return self.agent.generate(size=size, seed=seed, name=meta.name) + + @retry( + stop=STOP_POLICY, + wait=WAIT_POLICY, + retry=retry_if_not_exception_type(HTTPError), + ) + def leech(self, handle: Cid) -> DownloadHandle: + return CodexDownloadHandle(parent=self, monitor_url=self.agent.download(handle)) + + def remove(self, handle: Cid) -> bool: + logger.warning("Removing a file from Codex is not currently supported.") + return False + + @cached_property + def name(self) -> str: + return self.agent.node_id() + + +class CodexDownloadHandle(DownloadHandle): + def __init__(self, parent: CodexNode, monitor_url: Url): + self.monitor_url = monitor_url + self.parent = parent + + def await_for_completion(self, timeout: float = 0) -> bool: + def _predicate(): + completion = self.completion() + return completion.downloaded == completion.total + + return await_predicate(_predicate, timeout) + + @property + def node(self) -> Node: + return self.parent + + def completion(self) -> DownloadStatus: + response = requests.get(str(self.monitor_url)) + response.raise_for_status() + + return DownloadStatus.model_validate(response.json()) diff --git a/benchmarks/codex/tests/fixtures/__init__.py b/benchmarks/codex/tests/fixtures/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/benchmarks/codex/tests/fixtures/addresses.py b/benchmarks/codex/tests/fixtures/addresses.py new file mode 100644 index 0000000..92e50df --- /dev/null +++ b/benchmarks/codex/tests/fixtures/addresses.py @@ -0,0 +1,22 @@ +import pytest +import os + + +@pytest.fixture +def codex_node_1_url() -> str: + return f"http://{os.environ.get('CODEX_NODE_1', 'localhost')}:6891" + + +@pytest.fixture +def codex_node_2_url() -> str: + return f"http://{os.environ.get('CODEX_NODE_2', 'localhost')}:6893" + + +@pytest.fixture +def codex_agent_1_url() -> str: + return f"http://{os.environ.get('CODEX_AGENT_1', 'localhost')}:9000" + + +@pytest.fixture +def codex_agent_2_url() -> str: + return f"http://{os.environ.get('CODEX_AGENT_2', 'localhost')}:9001" diff --git a/benchmarks/codex/tests/fixtures/fixtures.py b/benchmarks/codex/tests/fixtures/fixtures.py new file mode 100644 index 0000000..729e1b9 --- /dev/null +++ b/benchmarks/codex/tests/fixtures/fixtures.py @@ -0,0 +1,27 @@ +from urllib3.util import parse_url + +from benchmarks.codex.agent.codex_agent_client import CodexAgentClient +from benchmarks.codex.codex_node import CodexNode +from benchmarks.core.concurrency import await_predicate + +import pytest + + +def codex_node(codex_api_url: str, agent_url: str) -> CodexNode: + node = CodexNode( + codex_api_url=parse_url(codex_api_url), + agent=CodexAgentClient(parse_url(agent_url)), + ) + assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5) + # TODO wipe datasets once have support in codex for doing so. + return node + + +@pytest.fixture +def codex_node1(codex_node_1_url: str, codex_agent_1_url: str) -> CodexNode: + return codex_node(codex_node_1_url, codex_agent_1_url) + + +@pytest.fixture +def codex_node2(codex_node_2_url: str, codex_agent_2_url: str) -> CodexNode: + return codex_node(codex_node_2_url, codex_agent_2_url) diff --git a/benchmarks/codex/agent/tests/test_codex_client.py b/benchmarks/codex/tests/test_codex_client.py similarity index 61% rename from benchmarks/codex/agent/tests/test_codex_client.py rename to benchmarks/codex/tests/test_codex_client.py index a8a0f51..f5940af 100644 --- a/benchmarks/codex/agent/tests/test_codex_client.py +++ b/benchmarks/codex/tests/test_codex_client.py @@ -1,7 +1,9 @@ from io import BytesIO import pytest +from urllib3.util import parse_url +from benchmarks.codex.client.async_client import AsyncCodexClientImpl from benchmarks.core.utils.random import random_data @@ -15,14 +17,15 @@ def random_file() -> BytesIO: @pytest.mark.codex_integration @pytest.mark.asyncio -async def test_should_upload_file(codex_client_1, random_file): - cid = await codex_client_1.upload( +async def test_should_upload_file(codex_node_1_url: str, random_file): + codex_client = AsyncCodexClientImpl(parse_url(codex_node_1_url)) + cid = await codex_client.upload( content=random_file, name="dataset-1", mime_type="application/octet-stream" ) assert cid is not None - manifest = await codex_client_1.get_manifest(cid) + manifest = await codex_client.manifest(cid) assert manifest.cid == cid assert manifest.datasetSize == 2048 diff --git a/benchmarks/codex/tests/test_codex_node.py b/benchmarks/codex/tests/test_codex_node.py new file mode 100644 index 0000000..f0a000a --- /dev/null +++ b/benchmarks/codex/tests/test_codex_node.py @@ -0,0 +1,22 @@ +from typing import cast + +import pytest + +from benchmarks.codex.agent.agent import DownloadStatus +from benchmarks.codex.codex_node import CodexMeta, CodexNode, CodexDownloadHandle +from benchmarks.core.utils.units import megabytes + + +@pytest.mark.codex_integration +def test_should_download_file(codex_node1: CodexNode, codex_node2: CodexNode): + cid = codex_node1.genseed( + size=megabytes(1), + seed=1234, + meta=CodexMeta(name="dataset1"), + ) + handle = codex_node2.leech(cid) + + assert handle.await_for_completion(5) + assert cast(CodexDownloadHandle, handle).completion() == DownloadStatus( + downloaded=megabytes(1), total=megabytes(1) + ) diff --git a/benchmarks/conftest.py b/benchmarks/conftest.py index c2477da..5c8d587 100644 --- a/benchmarks/conftest.py +++ b/benchmarks/conftest.py @@ -3,4 +3,5 @@ from benchmarks.deluge.tests.fixtures import * from benchmarks.core.tests.fixtures import * from benchmarks.logging.sources.tests.fixtures import * -from benchmarks.codex.agent.tests.fixtures import * +from benchmarks.codex.tests.fixtures.addresses import * +from benchmarks.codex.tests.fixtures.fixtures import * diff --git a/benchmarks/core/utils/streams.py b/benchmarks/core/utils/streams.py index 798a71a..e237661 100644 --- a/benchmarks/core/utils/streams.py +++ b/benchmarks/core/utils/streams.py @@ -4,6 +4,8 @@ from typing import Protocol class BaseStreamReader(Protocol): async def read(self, n: int) -> bytes: ... + async def readexactly(self, n: int) -> bytes: ... + def feed_data(self, data: bytes) -> None: ... def at_eof(self) -> bool: ... diff --git a/benchmarks/deluge/agent/client.py b/benchmarks/deluge/agent/deluge_agent_client.py similarity index 100% rename from benchmarks/deluge/agent/client.py rename to benchmarks/deluge/agent/deluge_agent_client.py diff --git a/benchmarks/deluge/config.py b/benchmarks/deluge/config.py index b689dc4..adc305b 100644 --- a/benchmarks/deluge/config.py +++ b/benchmarks/deluge/config.py @@ -17,7 +17,7 @@ from benchmarks.core.experiments.static_experiment import StaticDisseminationExp from benchmarks.core.pydantic import Host from benchmarks.core.utils.random import sample -from benchmarks.deluge.agent.client import DelugeAgentClient +from benchmarks.deluge.agent.deluge_agent_client import DelugeAgentClient from benchmarks.deluge.deluge_node import DelugeMeta, DelugeNode from benchmarks.deluge.tracker import Tracker diff --git a/benchmarks/deluge/deluge_node.py b/benchmarks/deluge/deluge_node.py index 1f72498..192818e 100644 --- a/benchmarks/deluge/deluge_node.py +++ b/benchmarks/deluge/deluge_node.py @@ -26,7 +26,7 @@ from benchmarks.core.concurrency import await_predicate from benchmarks.core.experiments.experiments import ExperimentComponent from benchmarks.core.network import DownloadHandle, Node -from benchmarks.deluge.agent.client import DelugeAgentClient +from benchmarks.deluge.agent.deluge_agent_client import DelugeAgentClient logger = logging.getLogger(__name__) diff --git a/benchmarks/deluge/tests/fixtures.py b/benchmarks/deluge/tests/fixtures.py index cee4acc..2e8ad0d 100644 --- a/benchmarks/deluge/tests/fixtures.py +++ b/benchmarks/deluge/tests/fixtures.py @@ -6,7 +6,7 @@ import pytest from urllib3.util import parse_url from benchmarks.core.concurrency import await_predicate -from benchmarks.deluge.agent.client import DelugeAgentClient +from benchmarks.deluge.agent.deluge_agent_client import DelugeAgentClient from benchmarks.deluge.deluge_node import DelugeNode from benchmarks.deluge.tracker import Tracker diff --git a/docker-compose-codex.ci.yaml b/docker-compose-codex.ci.yaml index 667cfb5..ef53023 100644 --- a/docker-compose-codex.ci.yaml +++ b/docker-compose-codex.ci.yaml @@ -6,6 +6,9 @@ services: entrypoint: [ "poetry", "run", "pytest", "-m", "codex_integration", "--exitfirst" ] environment: - CODEX_NODE_1=codex-1 + - CODEX_NODE_2=codex-2 + - CODEX_AGENT_1=codex-agent-1 + - CODEX_AGENT_2=codex-agent-2 depends_on: clean-volumes: condition: service_healthy diff --git a/docker-compose-codex.local.yaml b/docker-compose-codex.local.yaml index a442393..9a2d231 100644 --- a/docker-compose-codex.local.yaml +++ b/docker-compose-codex.local.yaml @@ -6,13 +6,15 @@ services: - /bin/sh - -c - | + echo "Cleaning data dirs..." rm -rf /var/lib/codex1/* /var/lib/codex2/* /var/lib/codex3/* touch /.done + echo "done." sleep infinity volumes: - - codex-volume-1:/var/lib/deluge1 - - codex-volume-2:/var/lib/deluge2 - - codex-volume-3:/var/lib/deluge3 + - codex-volume-1:/var/lib/codex1 + - codex-volume-2:/var/lib/codex2 + - codex-volume-3:/var/lib/codex3 healthcheck: timeout: 10s test: [ "CMD", "test", "-f", "/.done" ] @@ -24,17 +26,55 @@ services: container_name: codex-1 environment: - CODEX_LOG_LEVEL=DEBUG - - CODEX_DATA_DIR=/var/lib/codex - - CODEX_DISC_PORT=8090 + - CODEX_DISC_PORT=6890 - CODEX_API_BINDADDR=0.0.0.0 - - CODEX_API_PORT=8091 + - CODEX_API_PORT=6891 - CODEX_STORAGE_QUOTA=1073741824 # 1GB - + - NAT_IP_AUTO=true volumes: - codex-volume-1:/var/lib/codex ports: - - "8090-8091:8090-8091" + - "6890-6891:6890-6891" + + codex-agent-1: + image: bittorrent-benchmarks:test + container_name: codex-agent-1 + entrypoint: [ "poetry", "run", "bittorrent-benchmarks", + "agent", "experiments-codex.local.yaml", "codex_agent", "--port", "9000" ] + environment: + - CODEX_API_URL=http://codex-1:6891 + - NODE_ID=codex-1 + ports: + - "9000:9000" + + codex-2: + image: codexstorage/nim-codex:latest + container_name: codex-2 + environment: + - CODEX_LOG_LEVEL=DEBUG + - CODEX_DATA_DIR=/var/lib/codex + - CODEX_DISC_PORT=6892 + - CODEX_API_BINDADDR=0.0.0.0 + - CODEX_API_PORT=6893 + - CODEX_STORAGE_QUOTA=1073741824 # 1GB + - BOOTSTRAP_NODE_URL=http://codex-1:6891 + - NAT_IP_AUTO=true + volumes: + - codex-volume-2:/var/lib/codex + ports: + - "6892-6893:6892-6893" + + codex-agent-2: + image: bittorrent-benchmarks:test + container_name: codex-agent-2 + entrypoint: [ "poetry", "run", "bittorrent-benchmarks", + "agent", "experiments-codex.local.yaml", "codex_agent", "--port", "9001" ] + environment: + - CODEX_API_URL=http://codex-2:6893 + - NODE_ID=codex-2 + ports: + - "9001:9001" volumes: codex-volume-1: diff --git a/docker-compose-deluge.local.yaml b/docker-compose-deluge.local.yaml index 3716b42..ad93003 100644 --- a/docker-compose-deluge.local.yaml +++ b/docker-compose-deluge.local.yaml @@ -43,7 +43,7 @@ services: image: bittorrent-benchmarks:test container_name: agent-1 entrypoint: [ "poetry", "run", "bittorrent-benchmarks", - "agent", "experiments.local.yaml", "deluge_agent", "--port", "9001" ] + "agent", "experiments-deluge.local.yaml", "deluge_agent", "--port", "9001" ] environment: - TORRENTS_ROOT=/var/lib/deluge/downloads volumes: @@ -72,7 +72,7 @@ services: image: bittorrent-benchmarks:test container_name: agent-2 entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent", - "experiments.local.yaml", "deluge_agent", "--port", "9002" ] + "experiments-deluge.local.yaml", "deluge_agent", "--port", "9002" ] environment: - TORRENTS_ROOT=/var/lib/deluge/downloads volumes: @@ -100,7 +100,7 @@ services: deluge-agent-3: image: bittorrent-benchmarks:test container_name: agent-3 - entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent", "experiments.local.yaml", + entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent", "experiments-deluge.local.yaml", "deluge_agent", "--port", "9003" ] environment: - TORRENTS_ROOT=/var/lib/deluge/downloads diff --git a/docker/bittorrent-benchmarks.Dockerfile b/docker/bittorrent-benchmarks.Dockerfile index de18331..8c3faf8 100644 --- a/docker/bittorrent-benchmarks.Dockerfile +++ b/docker/bittorrent-benchmarks.Dockerfile @@ -19,4 +19,5 @@ RUN if [ "$BUILD_TYPE" = "release" ]; then \ COPY . . RUN poetry install --only main -ENTRYPOINT ["poetry", "run", "bittorrent-benchmarks", "experiments", "/opt/bittorrent-benchmarks/experiments.k8s.yaml"] +ENTRYPOINT ["poetry", "run", "bittorrent-benchmarks", "experiments", \ + "/opt/bittorrent-benchmarks/experiments-deluge.k8s.yaml"] diff --git a/experiments-codex.local.yaml b/experiments-codex.local.yaml new file mode 100644 index 0000000..69b46b2 --- /dev/null +++ b/experiments-codex.local.yaml @@ -0,0 +1,3 @@ +codex_agent: + codex_api_url: ${CODEX_API_URL} + node_id: ${NODE_ID} \ No newline at end of file diff --git a/experiments.k8s.yaml b/experiments-deluge.k8s.yaml similarity index 100% rename from experiments.k8s.yaml rename to experiments-deluge.k8s.yaml diff --git a/experiments.local.yaml b/experiments-deluge.local.yaml similarity index 99% rename from experiments.local.yaml rename to experiments-deluge.local.yaml index d7ff6b5..9f6d58e 100644 --- a/experiments.local.yaml +++ b/experiments-deluge.local.yaml @@ -27,3 +27,4 @@ deluge_experiment: deluge_agent: torrents_path: /var/lib/deluge/downloads + diff --git a/k8s/charts/deluge/templates/deluge-statefulset.yaml b/k8s/charts/deluge/templates/deluge-statefulset.yaml index 9c054bc..37a6fc7 100644 --- a/k8s/charts/deluge/templates/deluge-statefulset.yaml +++ b/k8s/charts/deluge/templates/deluge-statefulset.yaml @@ -76,7 +76,7 @@ spec: imagePullPolicy: {{ include "benchmark.harness.imagePullPolicy" . }} command: [ "poetry", "run", "bittorrent-benchmarks", - "agent", "experiments.k8s.yaml", "deluge_agent", "--port", "9001" + "agent", "experiments-deluge.k8s.yaml", "deluge_agent", "--port", "9001" ] ports: - containerPort: 9001