mirror of
https://github.com/logos-storage/bittorrent-benchmarks.git
synced 2026-01-02 13:03:13 +00:00
feat: add Codex node and initial integration tests
This commit is contained in:
parent
820699f001
commit
74ee71889e
@ -10,6 +10,7 @@ from pydantic import IPvAnyAddress
|
||||
from pydantic_core import ValidationError
|
||||
from typing_extensions import TypeVar
|
||||
|
||||
from benchmarks.codex.agent.api import CodexAgentConfig
|
||||
from benchmarks.core.agent import AgentBuilder
|
||||
from benchmarks.core.config import ConfigParser, Builder
|
||||
from benchmarks.core.experiments.experiments import Experiment, ExperimentBuilder
|
||||
@ -35,6 +36,7 @@ experiment_config_parser.register(DelugeExperimentConfig)
|
||||
|
||||
agent_config_parser = ConfigParser[AgentBuilder]()
|
||||
agent_config_parser.register(DelugeAgentConfig)
|
||||
agent_config_parser.register(CodexAgentConfig)
|
||||
|
||||
log_parser = basic_log_parser()
|
||||
log_parser.register(DelugeTorrentDownload)
|
||||
@ -125,7 +127,7 @@ def cmd_dump_single_experiment(source: LogSource, group_id: str, experiment_id:
|
||||
|
||||
def cmd_run_agent(agents: Dict[str, AgentBuilder], args):
|
||||
if args.agent not in agents:
|
||||
print(f"Agent type {args.experiment} not found.")
|
||||
print(f"Agent type {args.agent} not found.")
|
||||
sys.exit(-1)
|
||||
|
||||
uvicorn.run(
|
||||
|
||||
@ -7,10 +7,11 @@ from typing import Optional, Dict
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from benchmarks.codex.agent.codex_client import CodexClient, Manifest
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClient
|
||||
|
||||
from benchmarks.codex.client.common import Manifest
|
||||
from benchmarks.codex.logging import CodexDownloadMetric
|
||||
from benchmarks.core.utils.random import random_data
|
||||
from benchmarks.core.utils.streams import BaseStreamReader
|
||||
|
||||
Cid = str
|
||||
|
||||
@ -23,20 +24,21 @@ class DownloadStatus(BaseModel):
|
||||
downloaded: int
|
||||
total: int
|
||||
|
||||
def as_percent(self) -> float:
|
||||
return (self.downloaded * 100) / self.total
|
||||
|
||||
|
||||
class DownloadHandle:
|
||||
def __init__(
|
||||
self,
|
||||
parent: "CodexAgent",
|
||||
manifest: Manifest,
|
||||
download_stream: BaseStreamReader,
|
||||
read_increment: float = 0.01,
|
||||
):
|
||||
self.parent = parent
|
||||
self.manifest = manifest
|
||||
self.bytes_downloaded = 0
|
||||
self.read_increment = read_increment
|
||||
self.download_stream = download_stream
|
||||
self.download_task: Optional[Task[None]] = None
|
||||
|
||||
def begin_download(self) -> Task:
|
||||
@ -46,34 +48,35 @@ class DownloadHandle:
|
||||
async def _download_loop(self):
|
||||
step_size = int(self.manifest.datasetSize * self.read_increment)
|
||||
|
||||
while not self.download_stream.at_eof():
|
||||
step = min(step_size, self.manifest.datasetSize - self.bytes_downloaded)
|
||||
bytes_read = await self.download_stream.read(step)
|
||||
# We actually have no guarantees that an empty read means EOF, so we just back off
|
||||
# a bit.
|
||||
if not bytes_read:
|
||||
await asyncio.sleep(EMPTY_STREAM_BACKOFF)
|
||||
self.bytes_downloaded += len(bytes_read)
|
||||
async with self.parent.client.download(self.manifest.cid) as download_stream:
|
||||
while not download_stream.at_eof():
|
||||
step = min(step_size, self.manifest.datasetSize - self.bytes_downloaded)
|
||||
bytes_read = await download_stream.read(step)
|
||||
# We actually have no guarantees that an empty read means EOF, so we just back off
|
||||
# a bit.
|
||||
if not bytes_read:
|
||||
await asyncio.sleep(EMPTY_STREAM_BACKOFF)
|
||||
self.bytes_downloaded += len(bytes_read)
|
||||
|
||||
logger.info(
|
||||
CodexDownloadMetric(
|
||||
cid=self.manifest.cid,
|
||||
value=self.bytes_downloaded,
|
||||
node=self.parent.node_id,
|
||||
logger.info(
|
||||
CodexDownloadMetric(
|
||||
cid=self.manifest.cid,
|
||||
value=self.bytes_downloaded,
|
||||
node=self.parent.node_id,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
if self.bytes_downloaded < self.manifest.datasetSize:
|
||||
raise EOFError(
|
||||
f"Got EOF too early: download size ({self.bytes_downloaded}) was less "
|
||||
f"than expected ({self.manifest.datasetSize})."
|
||||
)
|
||||
if self.bytes_downloaded < self.manifest.datasetSize:
|
||||
raise EOFError(
|
||||
f"Got EOF too early: download size ({self.bytes_downloaded}) was less "
|
||||
f"than expected ({self.manifest.datasetSize})."
|
||||
)
|
||||
|
||||
if self.bytes_downloaded > self.manifest.datasetSize:
|
||||
raise ValueError(
|
||||
f"Download size ({self.bytes_downloaded}) was greater than expected "
|
||||
f"({self.manifest.datasetSize})."
|
||||
)
|
||||
if self.bytes_downloaded > self.manifest.datasetSize:
|
||||
raise ValueError(
|
||||
f"Download size ({self.bytes_downloaded}) was greater than expected "
|
||||
f"({self.manifest.datasetSize})."
|
||||
)
|
||||
|
||||
def progress(self) -> DownloadStatus:
|
||||
if self.download_task is None:
|
||||
@ -89,7 +92,7 @@ class DownloadHandle:
|
||||
|
||||
|
||||
class CodexAgent:
|
||||
def __init__(self, client: CodexClient, node_id: str = "unknown") -> None:
|
||||
def __init__(self, client: AsyncCodexClient, node_id: str = "unknown") -> None:
|
||||
self.client = client
|
||||
self.node_id = node_id
|
||||
self.ongoing_downloads: Dict[Cid, DownloadHandle] = {}
|
||||
@ -112,8 +115,7 @@ class CodexAgent:
|
||||
|
||||
handle = DownloadHandle(
|
||||
self,
|
||||
manifest=await self.client.get_manifest(cid),
|
||||
download_stream=await self.client.download(cid),
|
||||
manifest=await self.client.manifest(cid),
|
||||
read_increment=read_increment,
|
||||
)
|
||||
|
||||
|
||||
@ -1,9 +1,14 @@
|
||||
from typing import Annotated, Optional
|
||||
|
||||
from fastapi import APIRouter, Response, Depends, HTTPException, Request
|
||||
from aiohttp import ClientResponseError
|
||||
from fastapi import APIRouter, Response, Depends, HTTPException, Request, FastAPI
|
||||
from fastapi.responses import JSONResponse
|
||||
from pydantic_core import Url
|
||||
from urllib3.util import parse_url
|
||||
|
||||
from benchmarks.codex.agent.agent import CodexAgent, DownloadStatus
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClientImpl
|
||||
from benchmarks.core.agent import AgentBuilder
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@ -51,3 +56,36 @@ async def download_status(
|
||||
)
|
||||
|
||||
return agent.ongoing_downloads[cid].progress()
|
||||
|
||||
|
||||
@router.get("/api/v1/codex/download/node-id")
|
||||
async def node_id(agent: Annotated[CodexAgent, Depends(codex_agent)]):
|
||||
return agent.node_id
|
||||
|
||||
|
||||
def client_response_error_handler(
|
||||
_: Request, exception: ClientResponseError
|
||||
) -> Response:
|
||||
return JSONResponse(
|
||||
status_code=exception.status,
|
||||
content={"message": exception.message},
|
||||
)
|
||||
|
||||
|
||||
class CodexAgentConfig(AgentBuilder):
|
||||
codex_api_url: Url
|
||||
node_id: str
|
||||
|
||||
def build(self) -> FastAPI:
|
||||
app = FastAPI()
|
||||
app.include_router(router)
|
||||
# Need to disable typing (https://github.com/encode/starlette/discussions/2391)
|
||||
app.add_exception_handler(ClientResponseError, client_response_error_handler) # type: ignore
|
||||
agent = CodexAgent(
|
||||
client=AsyncCodexClientImpl(
|
||||
codex_api_url=parse_url(str(self.codex_api_url))
|
||||
),
|
||||
node_id=self.node_id,
|
||||
)
|
||||
app.dependency_overrides[codex_agent] = lambda: agent
|
||||
return app
|
||||
|
||||
64
benchmarks/codex/agent/codex_agent_client.py
Normal file
64
benchmarks/codex/agent/codex_agent_client.py
Normal file
@ -0,0 +1,64 @@
|
||||
from urllib3.util import Url, parse_url
|
||||
import requests
|
||||
import socket
|
||||
|
||||
from benchmarks.codex.agent.agent import DownloadStatus
|
||||
from benchmarks.core.experiments.experiments import ExperimentComponent
|
||||
|
||||
from benchmarks.codex.client.common import Cid
|
||||
|
||||
|
||||
class CodexAgentClient(ExperimentComponent):
|
||||
def __init__(self, url: Url):
|
||||
self.url = url
|
||||
|
||||
def is_ready(self) -> bool:
|
||||
try:
|
||||
requests.get(str(self.url._replace(path="/api/v1/hello")))
|
||||
return True
|
||||
except (ConnectionError, socket.gaierror):
|
||||
return False
|
||||
|
||||
def generate(self, size: int, seed: int, name: str) -> Cid:
|
||||
response = requests.post(
|
||||
url=self.url._replace(path="/api/v1/codex/dataset").url,
|
||||
params={
|
||||
"size": str(size),
|
||||
"seed": str(seed),
|
||||
"name": name,
|
||||
},
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
return response.text
|
||||
|
||||
def download(self, cid: str) -> Url:
|
||||
response = requests.post(
|
||||
url=self.url._replace(path="/api/v1/codex/download").url,
|
||||
params={
|
||||
"cid": cid,
|
||||
},
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
return parse_url(response.json()["status"])
|
||||
|
||||
def download_status(self, cid: str) -> DownloadStatus:
|
||||
response = requests.get(
|
||||
url=self.url._replace(path=f"/api/v1/codex/download/{cid}/status").url,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
return DownloadStatus.model_validate_json(response.json()["status"])
|
||||
|
||||
def node_id(self) -> str:
|
||||
response = requests.get(
|
||||
url=self.url._replace(path="/api/v1/codex/download/node-id").url,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
return response.text
|
||||
@ -1,14 +0,0 @@
|
||||
import os
|
||||
|
||||
import pytest
|
||||
from urllib3.util import parse_url
|
||||
|
||||
from benchmarks.codex.agent.codex_client import CodexClientImpl
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def codex_client_1():
|
||||
# TODO wipe data between tests
|
||||
return CodexClientImpl(
|
||||
parse_url(f"http://{os.environ.get('CODEX_NODE_1', 'localhost')}:8091")
|
||||
)
|
||||
@ -27,7 +27,7 @@ async def test_should_create_file():
|
||||
assert response.status_code == 200
|
||||
assert response.charset_encoding == "utf-8"
|
||||
|
||||
manifest = await codex_client.get_manifest(response.text)
|
||||
manifest = await codex_client.manifest(response.text)
|
||||
|
||||
assert manifest.datasetSize == 1024
|
||||
|
||||
|
||||
@ -1,19 +1,21 @@
|
||||
from asyncio import StreamReader
|
||||
from contextlib import asynccontextmanager
|
||||
from io import StringIO
|
||||
from typing import IO, Dict
|
||||
from typing import IO, Dict, AsyncIterator
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from benchmarks.codex.agent.agent import CodexAgent, DownloadStatus
|
||||
from benchmarks.codex.agent.codex_client import CodexClient, Cid, Manifest
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClient
|
||||
from benchmarks.codex.client.common import Manifest, Cid
|
||||
from benchmarks.codex.logging import CodexDownloadMetric
|
||||
from benchmarks.core.concurrency import await_predicate_async
|
||||
from benchmarks.core.utils.streams import BaseStreamReader
|
||||
from benchmarks.logging.logging import LogParser
|
||||
|
||||
|
||||
class FakeCodexClient(CodexClient):
|
||||
class FakeCodexClient(AsyncCodexClient):
|
||||
def __init__(self) -> None:
|
||||
self.storage: Dict[Cid, Manifest] = {}
|
||||
self.streams: Dict[Cid, StreamReader] = {}
|
||||
@ -33,7 +35,7 @@ class FakeCodexClient(CodexClient):
|
||||
)
|
||||
return cid
|
||||
|
||||
async def get_manifest(self, cid: Cid) -> Manifest:
|
||||
async def manifest(self, cid: Cid) -> Manifest:
|
||||
return self.storage[cid]
|
||||
|
||||
def create_download_stream(self, cid: Cid) -> StreamReader:
|
||||
@ -41,15 +43,16 @@ class FakeCodexClient(CodexClient):
|
||||
self.streams[cid] = reader
|
||||
return reader
|
||||
|
||||
async def download(self, cid: Cid) -> BaseStreamReader:
|
||||
return self.streams[cid]
|
||||
@asynccontextmanager
|
||||
async def download(self, cid: Cid) -> AsyncIterator[BaseStreamReader]:
|
||||
yield self.streams[cid]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_create_dataset_of_right_size():
|
||||
codex_agent = CodexAgent(FakeCodexClient())
|
||||
cid = await codex_agent.create_dataset(size=1024, name="dataset-1", seed=1234)
|
||||
manifest = await codex_agent.client.get_manifest(cid)
|
||||
manifest = await codex_agent.client.manifest(cid)
|
||||
|
||||
assert manifest.datasetSize == 1024
|
||||
|
||||
|
||||
0
benchmarks/codex/client/__init__.py
Normal file
0
benchmarks/codex/client/__init__.py
Normal file
@ -1,43 +1,33 @@
|
||||
"""Async Client implementation for the base Codex API."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import IO
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import IO, AsyncIterator, AsyncGenerator
|
||||
|
||||
import aiohttp
|
||||
from pydantic import BaseModel
|
||||
from urllib3.util import Url
|
||||
|
||||
from benchmarks.codex.client.common import Manifest, Cid
|
||||
from benchmarks.core.utils.streams import BaseStreamReader
|
||||
|
||||
API_VERSION = "v1"
|
||||
|
||||
Cid = str
|
||||
|
||||
|
||||
class Manifest(BaseModel):
|
||||
cid: Cid
|
||||
treeCid: Cid
|
||||
datasetSize: int
|
||||
blockSize: int
|
||||
filename: str
|
||||
mimetype: str
|
||||
uploadedAt: int
|
||||
protected: bool
|
||||
|
||||
|
||||
class CodexClient(ABC):
|
||||
class AsyncCodexClient(ABC):
|
||||
@abstractmethod
|
||||
async def upload(self, name: str, mime_type: str, content: IO) -> Cid:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def get_manifest(self, cid: Cid) -> Manifest:
|
||||
async def manifest(self, cid: Cid) -> Manifest:
|
||||
pass
|
||||
|
||||
@asynccontextmanager
|
||||
@abstractmethod
|
||||
async def download(self, cid: Cid) -> BaseStreamReader:
|
||||
def download(self, cid: Cid) -> AsyncGenerator[BaseStreamReader, None]:
|
||||
pass
|
||||
|
||||
|
||||
class CodexClientImpl(CodexClient):
|
||||
class AsyncCodexClientImpl(AsyncCodexClient):
|
||||
"""A lightweight async wrapper built around the Codex REST API."""
|
||||
|
||||
def __init__(self, codex_api_url: Url):
|
||||
@ -58,7 +48,7 @@ class CodexClientImpl(CodexClient):
|
||||
|
||||
return await response.text()
|
||||
|
||||
async def get_manifest(self, cid: Cid) -> Manifest:
|
||||
async def manifest(self, cid: Cid) -> Manifest:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
response = await session.get(
|
||||
self.codex_api_url._replace(
|
||||
@ -73,14 +63,13 @@ class CodexClientImpl(CodexClient):
|
||||
|
||||
return Manifest.model_validate(dict(cid=cid, **response_contents["manifest"]))
|
||||
|
||||
async def download(self, cid: Cid) -> BaseStreamReader:
|
||||
@asynccontextmanager
|
||||
async def download(self, cid: Cid) -> AsyncIterator[BaseStreamReader]:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
response = await session.get(
|
||||
self.codex_api_url._replace(
|
||||
path=f"/api/codex/v1/data/{cid}/network/download"
|
||||
).url,
|
||||
self.codex_api_url._replace(path=f"/api/codex/v1/data/{cid}").url,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
return response.content
|
||||
yield response.content
|
||||
16
benchmarks/codex/client/common.py
Normal file
16
benchmarks/codex/client/common.py
Normal file
@ -0,0 +1,16 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
API_VERSION = "v1"
|
||||
|
||||
Cid = str
|
||||
|
||||
|
||||
class Manifest(BaseModel):
|
||||
cid: Cid
|
||||
treeCid: Cid
|
||||
datasetSize: int
|
||||
blockSize: int
|
||||
filename: str
|
||||
mimetype: str
|
||||
uploadedAt: int
|
||||
protected: bool
|
||||
1
benchmarks/codex/client/tests/__init__.py
Normal file
1
benchmarks/codex/client/tests/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
|
||||
38
benchmarks/codex/client/tests/test_async_client.py
Normal file
38
benchmarks/codex/client/tests/test_async_client.py
Normal file
@ -0,0 +1,38 @@
|
||||
from io import BytesIO
|
||||
|
||||
import pytest
|
||||
from urllib3.util import parse_url
|
||||
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClientImpl
|
||||
from benchmarks.core.utils.random import random_data
|
||||
from benchmarks.core.utils.units import megabytes
|
||||
|
||||
|
||||
@pytest.mark.codex_integration
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_upload_file(codex_node_1_url: str):
|
||||
client = AsyncCodexClientImpl(parse_url(codex_node_1_url))
|
||||
|
||||
data = BytesIO()
|
||||
random_data(megabytes(1), data)
|
||||
|
||||
cid = client.upload("test.txt", "application/octet-stream", data)
|
||||
assert cid is not None
|
||||
|
||||
|
||||
@pytest.mark.codex_integration
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_download_file(codex_node_1_url: str):
|
||||
client = AsyncCodexClientImpl(parse_url(codex_node_1_url))
|
||||
|
||||
buff = BytesIO()
|
||||
random_data(megabytes(1), buff)
|
||||
data = buff.getvalue()
|
||||
|
||||
cid = await client.upload("test.txt", "application/octet-stream", BytesIO(data))
|
||||
assert cid is not None
|
||||
|
||||
async with client.download(cid) as content:
|
||||
downloaded = await content.readexactly(megabytes(1))
|
||||
|
||||
assert downloaded == data
|
||||
92
benchmarks/codex/codex_node.py
Normal file
92
benchmarks/codex/codex_node.py
Normal file
@ -0,0 +1,92 @@
|
||||
import logging
|
||||
import socket
|
||||
from functools import cached_property
|
||||
from urllib.error import HTTPError
|
||||
|
||||
import requests
|
||||
from attr import dataclass
|
||||
from tenacity import (
|
||||
stop_after_attempt,
|
||||
wait_exponential,
|
||||
retry,
|
||||
retry_if_not_exception_type,
|
||||
)
|
||||
from urllib3.util import Url
|
||||
|
||||
from benchmarks.codex.agent.agent import Cid, DownloadStatus
|
||||
from benchmarks.codex.agent.codex_agent_client import CodexAgentClient
|
||||
from benchmarks.core.concurrency import await_predicate
|
||||
from benchmarks.core.experiments.experiments import ExperimentComponent
|
||||
from benchmarks.core.network import Node, DownloadHandle
|
||||
|
||||
STOP_POLICY = stop_after_attempt(5)
|
||||
WAIT_POLICY = wait_exponential(exp_base=2, min=4, max=16)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CodexMeta:
|
||||
name: str
|
||||
|
||||
|
||||
class CodexNode(Node[Cid, CodexMeta], ExperimentComponent):
|
||||
def __init__(self, codex_api_url: Url, agent: CodexAgentClient):
|
||||
self.codex_api_url = codex_api_url
|
||||
self.agent = agent
|
||||
|
||||
def is_ready(self) -> bool:
|
||||
try:
|
||||
requests.get(
|
||||
str(self.codex_api_url._replace(path="/api/codex/v1/debug/info"))
|
||||
)
|
||||
return True
|
||||
except (ConnectionError, socket.gaierror):
|
||||
return False
|
||||
|
||||
@retry(
|
||||
stop=STOP_POLICY,
|
||||
wait=WAIT_POLICY,
|
||||
retry=retry_if_not_exception_type(HTTPError),
|
||||
)
|
||||
def genseed(self, size: int, seed: int, meta: CodexMeta) -> Cid:
|
||||
return self.agent.generate(size=size, seed=seed, name=meta.name)
|
||||
|
||||
@retry(
|
||||
stop=STOP_POLICY,
|
||||
wait=WAIT_POLICY,
|
||||
retry=retry_if_not_exception_type(HTTPError),
|
||||
)
|
||||
def leech(self, handle: Cid) -> DownloadHandle:
|
||||
return CodexDownloadHandle(parent=self, monitor_url=self.agent.download(handle))
|
||||
|
||||
def remove(self, handle: Cid) -> bool:
|
||||
logger.warning("Removing a file from Codex is not currently supported.")
|
||||
return False
|
||||
|
||||
@cached_property
|
||||
def name(self) -> str:
|
||||
return self.agent.node_id()
|
||||
|
||||
|
||||
class CodexDownloadHandle(DownloadHandle):
|
||||
def __init__(self, parent: CodexNode, monitor_url: Url):
|
||||
self.monitor_url = monitor_url
|
||||
self.parent = parent
|
||||
|
||||
def await_for_completion(self, timeout: float = 0) -> bool:
|
||||
def _predicate():
|
||||
completion = self.completion()
|
||||
return completion.downloaded == completion.total
|
||||
|
||||
return await_predicate(_predicate, timeout)
|
||||
|
||||
@property
|
||||
def node(self) -> Node:
|
||||
return self.parent
|
||||
|
||||
def completion(self) -> DownloadStatus:
|
||||
response = requests.get(str(self.monitor_url))
|
||||
response.raise_for_status()
|
||||
|
||||
return DownloadStatus.model_validate(response.json())
|
||||
0
benchmarks/codex/tests/fixtures/__init__.py
vendored
Normal file
0
benchmarks/codex/tests/fixtures/__init__.py
vendored
Normal file
22
benchmarks/codex/tests/fixtures/addresses.py
vendored
Normal file
22
benchmarks/codex/tests/fixtures/addresses.py
vendored
Normal file
@ -0,0 +1,22 @@
|
||||
import pytest
|
||||
import os
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def codex_node_1_url() -> str:
|
||||
return f"http://{os.environ.get('CODEX_NODE_1', 'localhost')}:6891"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def codex_node_2_url() -> str:
|
||||
return f"http://{os.environ.get('CODEX_NODE_2', 'localhost')}:6893"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def codex_agent_1_url() -> str:
|
||||
return f"http://{os.environ.get('CODEX_AGENT_1', 'localhost')}:9000"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def codex_agent_2_url() -> str:
|
||||
return f"http://{os.environ.get('CODEX_AGENT_2', 'localhost')}:9001"
|
||||
27
benchmarks/codex/tests/fixtures/fixtures.py
vendored
Normal file
27
benchmarks/codex/tests/fixtures/fixtures.py
vendored
Normal file
@ -0,0 +1,27 @@
|
||||
from urllib3.util import parse_url
|
||||
|
||||
from benchmarks.codex.agent.codex_agent_client import CodexAgentClient
|
||||
from benchmarks.codex.codex_node import CodexNode
|
||||
from benchmarks.core.concurrency import await_predicate
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def codex_node(codex_api_url: str, agent_url: str) -> CodexNode:
|
||||
node = CodexNode(
|
||||
codex_api_url=parse_url(codex_api_url),
|
||||
agent=CodexAgentClient(parse_url(agent_url)),
|
||||
)
|
||||
assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5)
|
||||
# TODO wipe datasets once have support in codex for doing so.
|
||||
return node
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def codex_node1(codex_node_1_url: str, codex_agent_1_url: str) -> CodexNode:
|
||||
return codex_node(codex_node_1_url, codex_agent_1_url)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def codex_node2(codex_node_2_url: str, codex_agent_2_url: str) -> CodexNode:
|
||||
return codex_node(codex_node_2_url, codex_agent_2_url)
|
||||
@ -1,7 +1,9 @@
|
||||
from io import BytesIO
|
||||
|
||||
import pytest
|
||||
from urllib3.util import parse_url
|
||||
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClientImpl
|
||||
from benchmarks.core.utils.random import random_data
|
||||
|
||||
|
||||
@ -15,14 +17,15 @@ def random_file() -> BytesIO:
|
||||
|
||||
@pytest.mark.codex_integration
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_upload_file(codex_client_1, random_file):
|
||||
cid = await codex_client_1.upload(
|
||||
async def test_should_upload_file(codex_node_1_url: str, random_file):
|
||||
codex_client = AsyncCodexClientImpl(parse_url(codex_node_1_url))
|
||||
cid = await codex_client.upload(
|
||||
content=random_file, name="dataset-1", mime_type="application/octet-stream"
|
||||
)
|
||||
|
||||
assert cid is not None
|
||||
|
||||
manifest = await codex_client_1.get_manifest(cid)
|
||||
manifest = await codex_client.manifest(cid)
|
||||
|
||||
assert manifest.cid == cid
|
||||
assert manifest.datasetSize == 2048
|
||||
22
benchmarks/codex/tests/test_codex_node.py
Normal file
22
benchmarks/codex/tests/test_codex_node.py
Normal file
@ -0,0 +1,22 @@
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
|
||||
from benchmarks.codex.agent.agent import DownloadStatus
|
||||
from benchmarks.codex.codex_node import CodexMeta, CodexNode, CodexDownloadHandle
|
||||
from benchmarks.core.utils.units import megabytes
|
||||
|
||||
|
||||
@pytest.mark.codex_integration
|
||||
def test_should_download_file(codex_node1: CodexNode, codex_node2: CodexNode):
|
||||
cid = codex_node1.genseed(
|
||||
size=megabytes(1),
|
||||
seed=1234,
|
||||
meta=CodexMeta(name="dataset1"),
|
||||
)
|
||||
handle = codex_node2.leech(cid)
|
||||
|
||||
assert handle.await_for_completion(5)
|
||||
assert cast(CodexDownloadHandle, handle).completion() == DownloadStatus(
|
||||
downloaded=megabytes(1), total=megabytes(1)
|
||||
)
|
||||
@ -3,4 +3,5 @@
|
||||
from benchmarks.deluge.tests.fixtures import *
|
||||
from benchmarks.core.tests.fixtures import *
|
||||
from benchmarks.logging.sources.tests.fixtures import *
|
||||
from benchmarks.codex.agent.tests.fixtures import *
|
||||
from benchmarks.codex.tests.fixtures.addresses import *
|
||||
from benchmarks.codex.tests.fixtures.fixtures import *
|
||||
|
||||
@ -4,6 +4,8 @@ from typing import Protocol
|
||||
class BaseStreamReader(Protocol):
|
||||
async def read(self, n: int) -> bytes: ...
|
||||
|
||||
async def readexactly(self, n: int) -> bytes: ...
|
||||
|
||||
def feed_data(self, data: bytes) -> None: ...
|
||||
|
||||
def at_eof(self) -> bool: ...
|
||||
|
||||
@ -17,7 +17,7 @@ from benchmarks.core.experiments.static_experiment import StaticDisseminationExp
|
||||
from benchmarks.core.pydantic import Host
|
||||
from benchmarks.core.utils.random import sample
|
||||
|
||||
from benchmarks.deluge.agent.client import DelugeAgentClient
|
||||
from benchmarks.deluge.agent.deluge_agent_client import DelugeAgentClient
|
||||
from benchmarks.deluge.deluge_node import DelugeMeta, DelugeNode
|
||||
from benchmarks.deluge.tracker import Tracker
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@ from benchmarks.core.concurrency import await_predicate
|
||||
from benchmarks.core.experiments.experiments import ExperimentComponent
|
||||
from benchmarks.core.network import DownloadHandle, Node
|
||||
|
||||
from benchmarks.deluge.agent.client import DelugeAgentClient
|
||||
from benchmarks.deluge.agent.deluge_agent_client import DelugeAgentClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -6,7 +6,7 @@ import pytest
|
||||
from urllib3.util import parse_url
|
||||
|
||||
from benchmarks.core.concurrency import await_predicate
|
||||
from benchmarks.deluge.agent.client import DelugeAgentClient
|
||||
from benchmarks.deluge.agent.deluge_agent_client import DelugeAgentClient
|
||||
from benchmarks.deluge.deluge_node import DelugeNode
|
||||
from benchmarks.deluge.tracker import Tracker
|
||||
|
||||
|
||||
@ -6,6 +6,9 @@ services:
|
||||
entrypoint: [ "poetry", "run", "pytest", "-m", "codex_integration", "--exitfirst" ]
|
||||
environment:
|
||||
- CODEX_NODE_1=codex-1
|
||||
- CODEX_NODE_2=codex-2
|
||||
- CODEX_AGENT_1=codex-agent-1
|
||||
- CODEX_AGENT_2=codex-agent-2
|
||||
depends_on:
|
||||
clean-volumes:
|
||||
condition: service_healthy
|
||||
|
||||
@ -6,13 +6,15 @@ services:
|
||||
- /bin/sh
|
||||
- -c
|
||||
- |
|
||||
echo "Cleaning data dirs..."
|
||||
rm -rf /var/lib/codex1/* /var/lib/codex2/* /var/lib/codex3/*
|
||||
touch /.done
|
||||
echo "done."
|
||||
sleep infinity
|
||||
volumes:
|
||||
- codex-volume-1:/var/lib/deluge1
|
||||
- codex-volume-2:/var/lib/deluge2
|
||||
- codex-volume-3:/var/lib/deluge3
|
||||
- codex-volume-1:/var/lib/codex1
|
||||
- codex-volume-2:/var/lib/codex2
|
||||
- codex-volume-3:/var/lib/codex3
|
||||
healthcheck:
|
||||
timeout: 10s
|
||||
test: [ "CMD", "test", "-f", "/.done" ]
|
||||
@ -24,17 +26,55 @@ services:
|
||||
container_name: codex-1
|
||||
environment:
|
||||
- CODEX_LOG_LEVEL=DEBUG
|
||||
|
||||
- CODEX_DATA_DIR=/var/lib/codex
|
||||
- CODEX_DISC_PORT=8090
|
||||
- CODEX_DISC_PORT=6890
|
||||
- CODEX_API_BINDADDR=0.0.0.0
|
||||
- CODEX_API_PORT=8091
|
||||
- CODEX_API_PORT=6891
|
||||
- CODEX_STORAGE_QUOTA=1073741824 # 1GB
|
||||
|
||||
- NAT_IP_AUTO=true
|
||||
volumes:
|
||||
- codex-volume-1:/var/lib/codex
|
||||
ports:
|
||||
- "8090-8091:8090-8091"
|
||||
- "6890-6891:6890-6891"
|
||||
|
||||
codex-agent-1:
|
||||
image: bittorrent-benchmarks:test
|
||||
container_name: codex-agent-1
|
||||
entrypoint: [ "poetry", "run", "bittorrent-benchmarks",
|
||||
"agent", "experiments-codex.local.yaml", "codex_agent", "--port", "9000" ]
|
||||
environment:
|
||||
- CODEX_API_URL=http://codex-1:6891
|
||||
- NODE_ID=codex-1
|
||||
ports:
|
||||
- "9000:9000"
|
||||
|
||||
codex-2:
|
||||
image: codexstorage/nim-codex:latest
|
||||
container_name: codex-2
|
||||
environment:
|
||||
- CODEX_LOG_LEVEL=DEBUG
|
||||
- CODEX_DATA_DIR=/var/lib/codex
|
||||
- CODEX_DISC_PORT=6892
|
||||
- CODEX_API_BINDADDR=0.0.0.0
|
||||
- CODEX_API_PORT=6893
|
||||
- CODEX_STORAGE_QUOTA=1073741824 # 1GB
|
||||
- BOOTSTRAP_NODE_URL=http://codex-1:6891
|
||||
- NAT_IP_AUTO=true
|
||||
volumes:
|
||||
- codex-volume-2:/var/lib/codex
|
||||
ports:
|
||||
- "6892-6893:6892-6893"
|
||||
|
||||
codex-agent-2:
|
||||
image: bittorrent-benchmarks:test
|
||||
container_name: codex-agent-2
|
||||
entrypoint: [ "poetry", "run", "bittorrent-benchmarks",
|
||||
"agent", "experiments-codex.local.yaml", "codex_agent", "--port", "9001" ]
|
||||
environment:
|
||||
- CODEX_API_URL=http://codex-2:6893
|
||||
- NODE_ID=codex-2
|
||||
ports:
|
||||
- "9001:9001"
|
||||
|
||||
volumes:
|
||||
codex-volume-1:
|
||||
|
||||
@ -43,7 +43,7 @@ services:
|
||||
image: bittorrent-benchmarks:test
|
||||
container_name: agent-1
|
||||
entrypoint: [ "poetry", "run", "bittorrent-benchmarks",
|
||||
"agent", "experiments.local.yaml", "deluge_agent", "--port", "9001" ]
|
||||
"agent", "experiments-deluge.local.yaml", "deluge_agent", "--port", "9001" ]
|
||||
environment:
|
||||
- TORRENTS_ROOT=/var/lib/deluge/downloads
|
||||
volumes:
|
||||
@ -72,7 +72,7 @@ services:
|
||||
image: bittorrent-benchmarks:test
|
||||
container_name: agent-2
|
||||
entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent",
|
||||
"experiments.local.yaml", "deluge_agent", "--port", "9002" ]
|
||||
"experiments-deluge.local.yaml", "deluge_agent", "--port", "9002" ]
|
||||
environment:
|
||||
- TORRENTS_ROOT=/var/lib/deluge/downloads
|
||||
volumes:
|
||||
@ -100,7 +100,7 @@ services:
|
||||
deluge-agent-3:
|
||||
image: bittorrent-benchmarks:test
|
||||
container_name: agent-3
|
||||
entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent", "experiments.local.yaml",
|
||||
entrypoint: [ "poetry", "run", "bittorrent-benchmarks", "agent", "experiments-deluge.local.yaml",
|
||||
"deluge_agent", "--port", "9003" ]
|
||||
environment:
|
||||
- TORRENTS_ROOT=/var/lib/deluge/downloads
|
||||
|
||||
@ -19,4 +19,5 @@ RUN if [ "$BUILD_TYPE" = "release" ]; then \
|
||||
COPY . .
|
||||
RUN poetry install --only main
|
||||
|
||||
ENTRYPOINT ["poetry", "run", "bittorrent-benchmarks", "experiments", "/opt/bittorrent-benchmarks/experiments.k8s.yaml"]
|
||||
ENTRYPOINT ["poetry", "run", "bittorrent-benchmarks", "experiments", \
|
||||
"/opt/bittorrent-benchmarks/experiments-deluge.k8s.yaml"]
|
||||
|
||||
3
experiments-codex.local.yaml
Normal file
3
experiments-codex.local.yaml
Normal file
@ -0,0 +1,3 @@
|
||||
codex_agent:
|
||||
codex_api_url: ${CODEX_API_URL}
|
||||
node_id: ${NODE_ID}
|
||||
@ -27,3 +27,4 @@ deluge_experiment:
|
||||
|
||||
deluge_agent:
|
||||
torrents_path: /var/lib/deluge/downloads
|
||||
|
||||
@ -76,7 +76,7 @@ spec:
|
||||
imagePullPolicy: {{ include "benchmark.harness.imagePullPolicy" . }}
|
||||
command: [
|
||||
"poetry", "run", "bittorrent-benchmarks",
|
||||
"agent", "experiments.k8s.yaml", "deluge_agent", "--port", "9001"
|
||||
"agent", "experiments-deluge.k8s.yaml", "deluge_agent", "--port", "9001"
|
||||
]
|
||||
ports:
|
||||
- containerPort: 9001
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user