feat: add Codex static dissemination experiment

gmega 2025-02-12 19:21:34 -03:00
parent 84f5e1437f
commit 6681922e00
12 changed files with 304 additions and 17 deletions

View File

@@ -8,13 +8,11 @@ from typing import Optional, Dict
from pydantic import BaseModel
from benchmarks.codex.client.async_client import AsyncCodexClient
from benchmarks.codex.client.common import Cid
from benchmarks.codex.client.common import Manifest
from benchmarks.codex.logging import CodexDownloadMetric
from benchmarks.core.utils.random import random_data
Cid = str
EMPTY_STREAM_BACKOFF = 0.1
logger = logging.getLogger(__name__)

View File

@@ -59,9 +59,7 @@ class AsyncCodexClientImpl(AsyncCodexClient):
response.raise_for_status()
response_contents = await response.json()
cid = response_contents.pop("cid")
return Manifest.model_validate(dict(cid=cid, **response_contents["manifest"]))
return Manifest.from_codex_api_response(response_contents)
@asynccontextmanager
async def download(self, cid: Cid) -> AsyncIterator[BaseStreamReader]:

View File

@@ -14,3 +14,9 @@ class Manifest(BaseModel):
mimetype: str
uploadedAt: int
protected: bool
@staticmethod
def from_codex_api_response(response: dict) -> "Manifest":
return Manifest.model_validate(
dict(cid=response["cid"], **response["manifest"])
)

View File

@@ -1,6 +1,7 @@
import logging
import socket
from functools import cached_property
from typing import Iterator, Set
from urllib.error import HTTPError
import requests
@@ -18,9 +19,11 @@ from benchmarks.codex.agent.codex_agent_client import CodexAgentClient
from benchmarks.core.concurrency import await_predicate
from benchmarks.core.experiments.experiments import ExperimentComponent
from benchmarks.core.network import Node, DownloadHandle
from benchmarks.core.utils.units import megabytes
STOP_POLICY = stop_after_attempt(5)
WAIT_POLICY = wait_exponential(exp_base=2, min=4, max=16)
DELETE_TIMEOUT = 3600 # timeouts for deletes should be generous (https://github.com/codex-storage/nim-codex/pull/1103)
logger = logging.getLogger(__name__)
@@ -31,9 +34,11 @@ class CodexMeta:
class CodexNode(Node[Cid, CodexMeta], ExperimentComponent):
def __init__(self, codex_api_url: Url, agent: CodexAgentClient):
def __init__(self, codex_api_url: Url, agent: CodexAgentClient) -> None:
self.codex_api_url = codex_api_url
self.agent = agent
# Lightweight tracking of datasets created by this node. It's OK if we lose them.
self.hosted_datasets: Set[Cid] = set()
def is_ready(self) -> bool:
try:
@@ -50,7 +55,9 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent):
retry=retry_if_not_exception_type(HTTPError),
)
def genseed(self, size: int, seed: int, meta: CodexMeta) -> Cid:
return self.agent.generate(size=size, seed=seed, name=meta.name)
cid = self.agent.generate(size=size, seed=seed, name=meta.name)
self.hosted_datasets.add(cid)
return cid
@retry(
stop=STOP_POLICY,
@@ -58,11 +65,51 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent):
retry=retry_if_not_exception_type(HTTPError),
)
def leech(self, handle: Cid) -> DownloadHandle:
self.hosted_datasets.add(handle)
return CodexDownloadHandle(parent=self, monitor_url=self.agent.download(handle))
def remove(self, handle: Cid) -> bool:
logger.warning("Removing a file from Codex is not currently supported.")
return False
response = requests.delete(
str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")),
timeout=DELETE_TIMEOUT,
)
response.raise_for_status()
return True
def exists_local(self, handle: Cid) -> bool:
"""Check if a dataset exists on the node."""
response = requests.get(
str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}"))
)
response.close()
if response.status_code == 404:
return False
if response.status_code != 200:
response.raise_for_status()
return True
def download_local(
self, handle: Cid, chunk_size: int = megabytes(1)
) -> Iterator[bytes]:
"""Retrieves the contents of a locally available
dataset from the node."""
response = requests.get(
str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}"))
)
response.raise_for_status()
return response.iter_content(chunk_size=chunk_size)
def wipe_all_datasets(self):
for dataset in list(self.hosted_datasets):
self.remove(dataset)
self.hosted_datasets.remove(dataset)
@cached_property
def name(self) -> str:
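As a usage sketch (not part of this diff) of the new dataset-management methods on CodexNode: the host and port values below assume the local docker-compose setup added in this commit, with codex-1's API published on 6891 and its agent on 9000.

# Sketch only: exercises genseed/exists_local/download_local/wipe_all_datasets
# against a locally running codex-1 and its agent.
from urllib3.util import parse_url

from benchmarks.codex.agent.codex_agent_client import CodexAgentClient
from benchmarks.codex.codex_node import CodexMeta, CodexNode
from benchmarks.core.utils.units import megabytes

node = CodexNode(
    codex_api_url=parse_url("http://localhost:6891"),
    agent=CodexAgentClient(parse_url("http://localhost:9000")),
)

cid = node.genseed(size=megabytes(1), seed=1234, meta=CodexMeta(name="scratch"))
assert node.exists_local(cid)              # the dataset is now stored on the node
data = b"".join(node.download_local(cid))  # streamed back in 1MB chunks by default
assert len(data) == megabytes(1)
node.wipe_all_datasets()                   # calls remove() for every tracked cid
assert not node.exists_local(cid)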

View File

@@ -0,0 +1,93 @@
import random
from itertools import islice
from typing import List, cast
from pydantic import Field
from pydantic_core import Url
from urllib3.util import parse_url
from benchmarks.codex.agent.codex_agent_client import CodexAgentClient
from benchmarks.codex.client.common import Cid
from benchmarks.codex.codex_node import CodexMeta, CodexNode
from benchmarks.core.experiments.experiments import (
ExperimentBuilder,
ExperimentEnvironment,
ExperimentComponent,
)
from benchmarks.core.experiments.iterated_experiment import IteratedExperiment
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.pydantic import SnakeCaseModel, Host
from benchmarks.core.utils.random import sample
class CodexNodeConfig(SnakeCaseModel):
name: str
address: Host
disc_port: int
api_port: int
agent_url: Url
CodexDisseminationExperiment = IteratedExperiment[
StaticDisseminationExperiment[Cid, CodexMeta]
]
class CodexExperimentConfig(ExperimentBuilder[CodexDisseminationExperiment]):
experiment_set_id: str = Field(
description="Identifies the group of experiment repetitions", default="unnamed"
)
seeder_sets: int = Field(
gt=0, default=1, description="Number of distinct seeder sets to experiment with"
)
seeders: int = Field(gt=0, description="Number of seeders per seeder set")
file_size: int = Field(gt=0, description="File size, in bytes")
repetitions: int = Field(
gt=0, description="How many experiment repetitions to run for each seeder set"
)
logging_cooldown: int = Field(
ge=0,
default=0,
description="Time to wait after the last download completes before tearing down the experiment.",
)
nodes: List[CodexNodeConfig]
def build(self) -> CodexDisseminationExperiment:
agents = [
CodexAgentClient(parse_url(str(node.agent_url))) for node in self.nodes
]
network = [
CodexNode(
codex_api_url=parse_url(f"http://{str(node.address)}:{node.api_port}"),
agent=agents[i],
)
for i, node in enumerate(self.nodes)
]
env = ExperimentEnvironment(
components=cast(List[ExperimentComponent], network + agents),
ping_max=10,
polling_interval=0.5,
)
def repetitions():
for seeder_set in range(self.seeder_sets):
seeders = list(islice(sample(len(network)), self.seeders))
for experiment_run in range(self.repetitions):
yield env.bind(
StaticDisseminationExperiment(
network=network,
seeders=seeders,
file_size=self.file_size,
seed=random.randint(0, 2**16),
meta=CodexMeta(f"dataset-{seeder_set}-{experiment_run}"),
logging_cooldown=self.logging_cooldown,
)
)
return IteratedExperiment(
repetitions(), experiment_set_id=self.experiment_set_id
)
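For reference, a hedged sketch of driving this builder directly from code rather than through the YAML/env pipeline; it assumes ExperimentBuilder is a pydantic model (as the Field declarations above suggest), and the node/agent addresses mirror the local compose setup. The import of CodexExperimentConfig is omitted because this file's path is not shown in this view.

# Sketch only: builds the iterated experiment from an in-code config dict.
config = CodexExperimentConfig.model_validate(
    {
        "experiment_set_id": "local-smoke",
        "seeder_sets": 1,
        "seeders": 1,
        "file_size": 52428800,  # 50 MiB, as in experiments-codex.local.yaml
        "repetitions": 3,
        "nodes": [
            {
                "name": "codex-1",
                "address": "localhost",
                "disc_port": 6890,
                "api_port": 6891,
                "agent_url": "http://localhost:9000/",
            },
            {
                "name": "codex-2",
                "address": "localhost",
                "disc_port": 6892,
                "api_port": 6893,
                "agent_url": "http://localhost:9001/",
            },
        ],
    }
)
experiment = config.build()  # IteratedExperiment: seeder_sets x repetitions runs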

View File

@@ -12,6 +12,11 @@ def codex_node_2_url() -> str:
return f"http://{os.environ.get('CODEX_NODE_2', 'localhost')}:6893"
@pytest.fixture
def codex_node_3_url() -> str:
return f"http://{os.environ.get('CODEX_NODE_3', 'localhost')}:6895"
@pytest.fixture
def codex_agent_1_url() -> str:
return f"http://{os.environ.get('CODEX_AGENT_1', 'localhost')}:9000"
@@ -20,3 +25,8 @@ def codex_agent_1_url() -> str:
@pytest.fixture
def codex_agent_2_url() -> str:
return f"http://{os.environ.get('CODEX_AGENT_2', 'localhost')}:9001"
@pytest.fixture
def codex_agent_3_url() -> str:
return f"http://{os.environ.get('CODEX_AGENT_3', 'localhost')}:9002"

View File

@@ -1,3 +1,5 @@
from typing import Iterator
from urllib3.util import parse_url
from benchmarks.codex.agent.codex_agent_client import CodexAgentClient
@@ -7,21 +9,29 @@ from benchmarks.core.concurrency import await_predicate
import pytest
def codex_node(codex_api_url: str, agent_url: str) -> CodexNode:
def codex_node(codex_api_url: str, agent_url: str) -> Iterator[CodexNode]:
node = CodexNode(
codex_api_url=parse_url(codex_api_url),
agent=CodexAgentClient(parse_url(agent_url)),
)
assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5)
# TODO wipe datasets once have support in codex for doing so.
return node
try:
yield node
finally:
node.wipe_all_datasets()
@pytest.fixture
def codex_node1(codex_node_1_url: str, codex_agent_1_url: str) -> CodexNode:
return codex_node(codex_node_1_url, codex_agent_1_url)
def codex_node1(codex_node_1_url: str, codex_agent_1_url: str) -> Iterator[CodexNode]:
yield from codex_node(codex_node_1_url, codex_agent_1_url)
@pytest.fixture
def codex_node2(codex_node_2_url: str, codex_agent_2_url: str) -> CodexNode:
return codex_node(codex_node_2_url, codex_agent_2_url)
def codex_node2(codex_node_2_url: str, codex_agent_2_url: str) -> Iterator[CodexNode]:
yield from codex_node(codex_node_2_url, codex_agent_2_url)
@pytest.fixture
def codex_node3(codex_node_3_url: str, codex_agent_3_url: str) -> Iterator[CodexNode]:
yield from codex_node(codex_node_3_url, codex_agent_3_url)

View File

@@ -20,3 +20,28 @@ def test_should_download_file(codex_node1: CodexNode, codex_node2: CodexNode):
assert cast(CodexDownloadHandle, handle).completion() == DownloadStatus(
downloaded=megabytes(1), total=megabytes(1)
)
@pytest.mark.codex_integration
def test_should_remove_file(codex_node1: CodexNode):
cid = codex_node1.genseed(
size=megabytes(1),
seed=1234,
meta=CodexMeta(name="dataset1"),
)
assert codex_node1.exists_local(cid)
assert codex_node1.remove(cid)
assert not codex_node1.exists_local(cid)
@pytest.mark.codex_integration
def test_should_download_file_from_local_node(codex_node1: CodexNode):
cid = codex_node1.genseed(
size=megabytes(1),
seed=1234,
meta=CodexMeta(name="dataset1"),
)
contents = b"".join(codex_node1.download_local(cid))
assert len(contents) == megabytes(1)

View File

@@ -0,0 +1,48 @@
from collections.abc import Iterator
import pytest
from benchmarks.codex.codex_node import CodexMeta
from benchmarks.core.experiments.experiments import ExperimentEnvironment
from benchmarks.core.experiments.static_experiment import StaticDisseminationExperiment
from benchmarks.core.utils.units import megabytes
def merge_chunks(chunks: Iterator[bytes]) -> bytes:
return b"".join(chunks)
@pytest.mark.codex_integration
def test_should_run_with_a_single_seeder(codex_node1, codex_node2, codex_node3):
size = megabytes(2)
env = ExperimentEnvironment(
components=[codex_node1, codex_node2, codex_node3],
polling_interval=0.5,
)
experiment = StaticDisseminationExperiment(
network=[codex_node1, codex_node2, codex_node3],
seeders=[1],
file_size=size,
seed=1234,
meta=CodexMeta("dataset-1"),
)
env.await_ready()
try:
experiment.setup()
experiment.do_run()
all_datasets = list(codex_node1.hosted_datasets)
assert len(all_datasets) == 1
cid = all_datasets[0]
content_1 = merge_chunks(codex_node1.download_local(cid))
content_2 = merge_chunks(codex_node2.download_local(cid))
content_3 = merge_chunks(codex_node3.download_local(cid))
assert len(content_1) == megabytes(2)
assert content_1 == content_2 == content_3
finally:
experiment.teardown()

View File

@@ -7,8 +7,10 @@ services:
environment:
- CODEX_NODE_1=codex-1
- CODEX_NODE_2=codex-2
- CODEX_NODE_3=codex-3
- CODEX_AGENT_1=codex-agent-1
- CODEX_AGENT_2=codex-agent-2
- CODEX_AGENT_3=codex-agent-3
depends_on:
clean-volumes:
condition: service_healthy

View File

@@ -76,6 +76,34 @@ services:
ports:
- "9001:9001"
codex-3:
image: codexstorage/nim-codex:latest
container_name: codex-3
environment:
- CODEX_LOG_LEVEL=DEBUG
- CODEX_DATA_DIR=/var/lib/codex
- CODEX_DISC_PORT=6894
- CODEX_API_BINDADDR=0.0.0.0
- CODEX_API_PORT=6895
- CODEX_STORAGE_QUOTA=1073741824 # 1GB
- BOOTSTRAP_NODE_URL=http://codex-1:6891
- NAT_IP_AUTO=true
volumes:
- codex-volume-3:/var/lib/codex
ports:
- "6894-6895:6894-6895"
codex-agent-3:
image: bittorrent-benchmarks:test
container_name: codex-agent-3
entrypoint: [ "poetry", "run", "bittorrent-benchmarks",
"agent", "experiments-codex.local.yaml", "codex_agent", "--port", "9002" ]
environment:
- CODEX_API_URL=http://codex-3:6895
- NODE_ID=codex-3
ports:
- "9002:9002"
volumes:
codex-volume-1:
codex-volume-2:
codex-volume-3:
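To sanity-check the new containers from the host, a small sketch mirroring the conftest fixture above; codex-3's API is published on 6895 and its agent on 9002, per the port mappings in this file.

# Sketch only: verifies codex-3 and codex-agent-3 are reachable and ready.
from urllib3.util import parse_url

from benchmarks.codex.agent.codex_agent_client import CodexAgentClient
from benchmarks.codex.codex_node import CodexNode
from benchmarks.core.concurrency import await_predicate

node = CodexNode(
    codex_api_url=parse_url("http://localhost:6895"),
    agent=CodexAgentClient(parse_url("http://localhost:9002")),
)
assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5)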

View File

@@ -1,3 +1,25 @@
codex_experiment:
seeders: 1
file_size: 52428800
repetitions: 3
nodes:
- name: codex-1
address: ${CODEX_NODE_1:-localhost}
disc_port: 6890
api_port: 6891
agent_url: http://${CODEX_AGENT_1:-localhost}:9000/
- name: codex-2
address: ${CODEX_NODE_2:-localhost}
disc_port: 6892
api_port: 6893
agent_url: http://${CODEX_AGENT_2:-localhost}:9001/
- name: codex-3
address: ${CODEX_NODE_3:-localhost}
disc_port: 6894
api_port: 6895
agent_url: http://${CODEX_AGENT_3:-localhost}:9002/
codex_agent:
codex_api_url: ${CODEX_API_URL}
node_id: ${NODE_ID}