diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3d391d0..71a6984 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,6 +13,7 @@ env: DOCKER_REPO: codexstorage/bittorrent-benchmarks BUILD_ARGS: | BUILD_TYPE=test + COMPOSE_CODEX_IMAGE: codexstorage/nim-codex:sha-f529a70 jobs: build-and-test: diff --git a/analysis/presentation.short/images/nu-6.png b/analysis/presentation.short/images/nu-6.png new file mode 100644 index 0000000..1da4c13 Binary files /dev/null and b/analysis/presentation.short/images/nu-6.png differ diff --git a/analysis/presentation.short/images/nu-7.png b/analysis/presentation.short/images/nu-7.png new file mode 100644 index 0000000..d962f26 Binary files /dev/null and b/analysis/presentation.short/images/nu-7.png differ diff --git a/analysis/presentation.short/images/un-1.png b/analysis/presentation.short/images/un-1.png new file mode 100644 index 0000000..13e924f Binary files /dev/null and b/analysis/presentation.short/images/un-1.png differ diff --git a/analysis/presentation.short/images/un-2.png b/analysis/presentation.short/images/un-2.png new file mode 100644 index 0000000..855bffa Binary files /dev/null and b/analysis/presentation.short/images/un-2.png differ diff --git a/analysis/presentation.short/images/un-3.png b/analysis/presentation.short/images/un-3.png new file mode 100644 index 0000000..99b76ac Binary files /dev/null and b/analysis/presentation.short/images/un-3.png differ diff --git a/analysis/presentation.short/images/un-4.png b/analysis/presentation.short/images/un-4.png new file mode 100644 index 0000000..1748729 Binary files /dev/null and b/analysis/presentation.short/images/un-4.png differ diff --git a/analysis/presentation.short/images/un-5.png b/analysis/presentation.short/images/un-5.png new file mode 100644 index 0000000..efafd1c Binary files /dev/null and b/analysis/presentation.short/images/un-5.png differ diff --git a/analysis/presentation.short/short-presentation.qmd b/analysis/presentation.short/short-presentation.qmd new file mode 100644 index 0000000..7504014 --- /dev/null +++ b/analysis/presentation.short/short-presentation.qmd @@ -0,0 +1,215 @@ +--- +title: "Measuring Codex Performance for Content Delivery" +subtitle: "(aka Codex Benchmarks)" +format: + revealjs: + slide-number: true +execute: + cache: true +--- + + + +```{r echo = FALSE, warning = FALSE, echo = FALSE, message = FALSE} +library(tidyverse) +library(DT) + +benchmarks <- read_csv('./benchmarks.csv') |> + mutate( + file_size = factor(rlang::parse_bytes(as.character(file_size)), + levels = rlang::parse_bytes(as.character( + unique(file_size[order(file_size, decreasing = TRUE)]))))) + +relative_performance <- benchmarks |> + filter(experiment_type == 'deluge_experiment_config_log_entry') |> + transmute( + file_size, network_size, seeders, leechers, deluge_median = median, + ) |> + inner_join( + benchmarks |> + filter(experiment_type == 'codex_experiment_config_log_entry') |> + select( + file_size, network_size, seeders, leechers, codex_median = median + ), + by = c('file_size', 'network_size', 'seeders', 'leechers') + ) |> + mutate( + performance = codex_median / deluge_median, + seeder_ratio = seeders / network_size + ) +``` + + +## Intro + +::: {.incremental} + +* Why? + * _performance_ is a key aspect of a storage system; + * want to understand _how Codex performs_. + +* What? + * Content delivery: _download_ performance. + * Download performance: latency, throughput. 
+    * Codex aims at supporting _large_ files;
+    * download speed ($\text{MB/s}$) is dominant.
+
+:::
+
+## Baseline {.smaller}
+
+::: {.incremental}
+
+* _Quality_ baseline: easier to know where you stand;
+    * faster: good;
+    * slower: maybe not so good.
+
+* Decentralized, large-file content distribution:
+    * Bittorrent;
+    * IPFS.
+
+* Bittorrent: _de facto_ standard;
+    * has been used for a very long time;
+    * fast and efficient (more so than IPFS);
+    * several open source implementations.
+
+:::
+
+## Baseline
+
+::: {.incremental}
+
+* For our baseline, we picked [Deluge](https://deluge-torrent.org/):
+    * well-known, lots of users despite small market share (1%);
+    * can be run as a daemon, has [open source client libraries](https://github.com/JohnDoee/deluge-client) to interact with the daemon remotely;
+    * based on [libtorrent (rasterbar)](https://www.libtorrent.org/).
+
+:::
+
+## Static Dissemination Experiment
+
+::: {.incremental}
+
+* _Static dissemination experiment._
+* Network of size $n$;
+    * split into $s$ seeders, $l = n - s$ leechers;
+    * seeder ratio $r = \frac{s}{n}$.
+
+* Experiment:
+    * generate file $F$ of size $b$;
+    * upload $F$ to each seeder;
+    * fire up all leechers "at the same time";
+    * measure time to download $F$ at leechers.
+
+:::
+
+## Static Dissemination Experiment
+
+```{r fig.align="center", echo = FALSE}
+knitr::include_graphics('./images/un-1.png')
+```
+
+## Static Dissemination Experiment
+
+```{r fig.align="center", echo = FALSE}
+knitr::include_graphics('./images/un-2.png')
+```
+
+## Static Dissemination Experiment
+
+```{r fig.align="center", echo = FALSE}
+knitr::include_graphics('./images/un-3.png')
+```
+
+## Static Dissemination Experiment
+
+```{r fig.align="center", echo = FALSE}
+knitr::include_graphics('./images/un-4.png')
+```
+
+## Static Dissemination Experiment
+
+```{r fig.align="center", echo = FALSE}
+knitr::include_graphics('./images/nu-6.png')
+```
+
+## Static Dissemination Experiment
+
+```{r fig.align="center", echo = FALSE}
+knitr::include_graphics('./images/nu-7.png')
+```
+
+## Static Dissemination Experiment
+
+* Parameters:
+    * File sizes: $b \in \{100\text{MB}, 1\text{GB}, 5\text{GB}\}$;
+    * Network sizes: $n \in \{2, 8, 16, 32\}$;
+    * Seeder ratios: $0.5, 0.25, 0.125, 0.0625, 0.03125$ (depending on $n$).
+
+* Hardware:
+    * [CPX31](https://www.hetzner.com/cloud?ref=blog.codex.storage) Hetzner VMs (4 vCPU, 8GB RAM);
+    * $\sim 4\text{Gbps}$ point-to-point bandwidth.
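Editor's aside for readers of the Parameters slide above (not part of the patch or the slides): a minimal Python sketch of how the parameter grid expands into seeder/leecher counts per the definitions $s = r \cdot n$ and $l = n - s$. The variable names and the rounding/skip rule for ratios that are too small for a given $n$ are illustrative assumptions only.

```python
# Illustrative sketch only: expand the experiment grid described on the slide.
from itertools import product

file_sizes_bytes = [100 * 2**20, 1 * 2**30, 5 * 2**30]  # 100MB, 1GB, 5GB
network_sizes = [2, 8, 16, 32]
seeder_ratios = [0.5, 0.25, 0.125, 0.0625, 0.03125]

for b, n, r in product(file_sizes_bytes, network_sizes, seeder_ratios):
    s = round(n * r)      # seeders
    if s < 1:             # ratios too small for this n yield no seeder ("depending on n")
        continue
    l = n - s             # leechers
    print(f"b={b}B  n={n}  seeders={s}  leechers={l}  seeder_ratio={s / n:.5f}")
```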
+## Results - Download Speed
+
+```{r fig.width = 10, warning=FALSE, message=FALSE, echo=FALSE}
+ggplot(benchmarks, aes(col = experiment_type, fill = experiment_type, group = experiment_type)) +
+  geom_ribbon(aes(ymin = p25_speed, ymax = p75_speed, x = network_size, fill = experiment_type, alpha = 0.5), col = 'lightgray') +
+  geom_point(aes(x = network_size, y = p25_speed), col = 'darkgray', size=10.0, shape='-') +
+  geom_point(aes(x = network_size, y = p75_speed), col = 'darkgray', size=10.0, shape='-') +
+  geom_line(aes(x = network_size, y = median_speed)) +
+  geom_point(aes(x = network_size, y = median_speed)) +
+  ylab('median download speed (bytes/second)') +
+  xlab('network size') +
+  theme_minimal(base_size=15) +
+  scale_y_continuous(labels = function(x) paste0(scales::label_bytes()(x), '/s')) +
+  facet_grid(
+    file_size ~ seeder_ratio,
+    labeller = labeller(
+      seeder_ratio = as_labeller(function(x) {
+        paste0("seeder ratio: ", scales::percent(as.numeric(x)))
+      }))
+  ) +
+  scale_color_discrete(name = '', labels = c('Codex', 'Deluge')) +
+  guides(fill = 'none', alpha = 'none')
+```
+
+## Results - Relative (Median) Speed
+
+```{r fig.cap='Median download time ratio for Codex and Deluge', fig.width = 11, message = FALSE, echo = FALSE}
+ggplot(relative_performance) +
+  geom_line(aes(x = network_size, y = performance, col = file_size), lwd=1) +
+  geom_hline(yintercept = 1, linetype = 'dashed', col = 'darkgray') +
+  geom_point(aes(x = network_size, y = performance, col = file_size)) +
+  ylab('median Codex/Deluge performance ratio') +
+  annotate('text', label = 'faster', x = 29, y = 0, col = 'darkgreen') +
+  annotate('text', label = 'slower', x = 28.5, y = 2, col = 'darkred') +
+  theme_minimal(base_size=15) +
+  scale_color_discrete(name = 'file size') +
+  facet_grid(
+    file_size ~ seeder_ratio,
+    labeller = labeller(
+      file_size = as_labeller(function(x) x),
+      seeder_ratio = as_labeller(function(x) {
+        paste0("seeder ratio: ", scales::percent(as.numeric(x)))
+      }))
+  )
+```
+
+## Next
+
+::: {.incremental}
+
+* Debugging, debugging...
+* larger experiments (networks, files);
+* dynamic network experiments, with churn and faults.
+
+:::
+
+## Thank You!
+ +* Benchmarks repo: [github.com/codex-network/codex-benchmarks]() +* RPubs Notebook with Data: [https://rpubs.com/giuliano_mega/1266876]() +* Blog post: [https://blog.codex.storage/measuring-codex-performance-for-content-delivery/]() diff --git a/benchmarks/codex/agent/agent.py b/benchmarks/codex/agent/agent.py index 12261e7..3e7aec3 100644 --- a/benchmarks/codex/agent/agent.py +++ b/benchmarks/codex/agent/agent.py @@ -1,121 +1,137 @@ import asyncio import logging +import time from asyncio import Task from pathlib import Path from tempfile import TemporaryDirectory from typing import Optional, Dict from aiohttp import ClientTimeout -from pydantic import BaseModel -from benchmarks.codex.client.async_client import AsyncCodexClient +from benchmarks.codex.client.async_client import AsyncCodexClient, DownloadStatus from benchmarks.codex.client.common import Cid from benchmarks.codex.client.common import Manifest from benchmarks.core.utils.random import random_data from benchmarks.logging.logging import DownloadMetric -EMPTY_STREAM_BACKOFF = 2 +STATUS_BACKOFF = 2.0 +PROGRESS_TIMEOUT = 120 logger = logging.getLogger(__name__) -class DownloadStatus(BaseModel): - downloaded: int - total: int - - def as_percent(self) -> float: - return (self.downloaded * 100) / self.total - - class DownloadHandle: def __init__( self, parent: "CodexAgent", manifest: Manifest, - read_increment: float = 0.01, - read_timeout: Optional[float] = None, + log_increment: float = 0.01, + status_backoff: float = STATUS_BACKOFF, + progress_timeout: float = PROGRESS_TIMEOUT, ): self.parent = parent self.manifest = manifest - self.bytes_downloaded = 0 - self.read_increment = read_increment - self.read_timeout = read_timeout + self.log_increment = log_increment self.download_task: Optional[Task[None]] = None + self.status_backoff = status_backoff + self.progress_timeout = progress_timeout + self._progress = DownloadStatus(downloaded=0, total=manifest.block_count) def begin_download(self) -> Task: self.download_task = asyncio.create_task(self._download_loop()) return self.download_task async def _download_loop(self): - step_size = int(self.manifest.datasetSize * self.read_increment) + step_size = max(1, int(self.manifest.block_count * self.log_increment)) - async with self.parent.client.download( - self.manifest.cid, + download_id = await self.parent.client.download( + self.manifest, timeout=ClientTimeout( total=None, sock_connect=30, - sock_read=self.read_timeout, ), - ) as download_stream: - logged_step = 0 - while not download_stream.at_eof(): - step = min(step_size, self.manifest.datasetSize - self.bytes_downloaded) - bytes_read = await download_stream.read(step) - # We actually have no guarantees that an empty read means EOF, so we just back off - # a bit. - if not bytes_read: - await asyncio.sleep(EMPTY_STREAM_BACKOFF) - self.bytes_downloaded += len(bytes_read) + ) - if int(self.bytes_downloaded / step_size) > logged_step: - logged_step += 1 - logger.info( - DownloadMetric( - dataset_name=self.manifest.filename, - handle=self.manifest.cid, - value=step_size * logged_step, - node=self.parent.node_id, - ) - ) + logger.info(f"Start download monitoring loop for {download_id}") - if self.bytes_downloaded < self.manifest.datasetSize: - raise EOFError( - f"Got EOF too early: download size ({self.bytes_downloaded}) was less " - f"than expected ({self.manifest.datasetSize})." 
- ) + current_step = 0 + last_progress = time.time() + while True: + has_progress = False + progress = await self.parent.client.download_status(download_id) + self._publish_progress(progress) + while current_step < (progress.downloaded / step_size): + has_progress = True + current_step += 1 + self._log_progress(current_step * step_size) - if self.bytes_downloaded > self.manifest.datasetSize: - raise ValueError( - f"Download size ({self.bytes_downloaded}) was greater than expected " - f"({self.manifest.datasetSize})." + if not has_progress: + # Backs off for a bit if we haven't seen any progress. + await asyncio.sleep(self.status_backoff) + else: + last_progress = time.time() + + if progress.is_complete(): + # If step_size is not a multiple of 1/log_increment, we have a trailing step for the + # remainder. + if current_step * step_size < self.manifest.block_count: + self._log_progress(current_step * step_size) + break + + if time.time() - last_progress > self.progress_timeout: + raise asyncio.TimeoutError( + f"Download made no progress in more than {self.progress_timeout} seconds" ) def progress(self) -> DownloadStatus: - if self.download_task is None: - return DownloadStatus(downloaded=0, total=self.manifest.datasetSize) + return self._progress - if self.download_task.done(): - # This will bubble exceptions up, if any. - self.download_task.result() + def _publish_progress(self, status: DownloadStatus): + self._progress = DownloadStatus( + downloaded=status.downloaded * self.manifest.blockSize, + total=status.total * self.manifest.blockSize, + ) - return DownloadStatus( - downloaded=self.bytes_downloaded, total=self.manifest.datasetSize + def _log_progress(self, downloaded: int): + logger.info( + DownloadMetric( + dataset_name=self.manifest.filename, + value=downloaded * self.manifest.blockSize, + node=self.parent.node_id, + ) ) class CodexAgent: + """:class:`CodexAgent` interacts with the Codex node locally through its REST API + providing the higher-level API required by the benchmarking experiments.""" + def __init__( self, client: AsyncCodexClient, node_id: str = "unknown", - read_timeout: Optional[float] = None, + status_backoff: float = STATUS_BACKOFF, + progress_timeout: float = PROGRESS_TIMEOUT, ) -> None: self.client = client self.node_id = node_id self.ongoing_downloads: Dict[Cid, DownloadHandle] = {} - self.read_timeout = read_timeout + self.status_backoff = status_backoff + self.progress_timeout = progress_timeout - async def create_dataset(self, name: str, size: int, seed: Optional[int]) -> Cid: + async def create_dataset( + self, name: str, size: int, seed: Optional[int] + ) -> Manifest: + """ + Creates a random dataset and uploads it to the Codex node. + + :param name: the name of the dataset to be created. + :param size: the size of the dataset to be created, in bytes. + :param seed: the seed to be used for generating the random dataset. Using the + same seed will generate the exact same dataset. + + :return: the :class:`Manifest` block for the dataset. 
+ """ with TemporaryDirectory() as td: data = Path(td) / "datafile.bin" @@ -123,22 +139,37 @@ class CodexAgent: random_data(size=size, outfile=outfile, seed=seed) with data.open(mode="rb") as infile: - return await self.client.upload( + cid = await self.client.upload( name=name, mime_type="application/octet-stream", content=infile ) - async def download(self, cid: Cid, read_increment: float = 0.01) -> DownloadHandle: - if cid in self.ongoing_downloads: - return self.ongoing_downloads[cid] + return await self.client.manifest(cid) + async def download( + self, manifest: Manifest, log_increment: float = 0.01 + ) -> DownloadHandle: + """ + Downloads the dataset with the given CID from the Codex node, tracking and logging + its progress until it is complete. + + :param manifest: the Manifest or the dataset to be downloaded. + :param log_increment: + how often to log progress, in terms of download completion fraction. E.g., 0.1 + will log progress every 10% of the download completed. + + :return: a :class:`DownloadHandle` object that can be used to return the current + progress. The experiment controller will typically ask for this periodically + to figure out if the download is complete. + """ handle = DownloadHandle( self, - manifest=await self.client.manifest(cid), - read_increment=read_increment, - read_timeout=self.read_timeout, + manifest=manifest, + log_increment=log_increment, + status_backoff=self.status_backoff, + progress_timeout=self.progress_timeout, ) handle.begin_download() - self.ongoing_downloads[cid] = handle + self.ongoing_downloads[manifest.treeCid] = handle return handle diff --git a/benchmarks/codex/agent/api.py b/benchmarks/codex/agent/api.py index 95b7d20..cc92f5d 100644 --- a/benchmarks/codex/agent/api.py +++ b/benchmarks/codex/agent/api.py @@ -1,5 +1,6 @@ """This module contains a REST API wrapping :class:`CodexAgent`.""" +import logging from typing import Annotated, Optional from aiohttp import ClientResponseError @@ -10,10 +11,13 @@ from urllib3.util import parse_url from benchmarks.codex.agent.agent import CodexAgent, DownloadStatus from benchmarks.codex.client.async_client import AsyncCodexClientImpl +from benchmarks.codex.client.common import Manifest from benchmarks.core.agent import AgentBuilder router = APIRouter() +logger = logging.getLogger(__name__) + def codex_agent() -> CodexAgent: raise Exception("Dependency must be set") @@ -30,21 +34,22 @@ async def generate( name: str, size: int, seed: Optional[int], -): - return Response( - await agent.create_dataset(name=name, size=size, seed=seed), - media_type="text/plain; charset=UTF-8", - ) +) -> Manifest: + return await agent.create_dataset(name=name, size=size, seed=seed) @router.post("/api/v1/codex/download") async def download( - request: Request, agent: Annotated[CodexAgent, Depends(codex_agent)], cid: str + request: Request, + agent: Annotated[CodexAgent, Depends(codex_agent)], + manifest: Manifest, ): - await agent.download(cid) + await agent.download(manifest) return JSONResponse( status_code=202, - content={"status": str(request.url_for("download_status", cid=cid))}, + content={ + "status": str(request.url_for("download_status", cid=manifest.treeCid)) + }, ) @@ -52,12 +57,23 @@ async def download( async def download_status( agent: Annotated[CodexAgent, Depends(codex_agent)], cid: str ) -> DownloadStatus: - if cid not in agent.ongoing_downloads: + download = agent.ongoing_downloads.get(cid) + if download is None: raise HTTPException( status_code=404, detail=f"There are no ongoing downloads for CID {cid}" ) 
- return agent.ongoing_downloads[cid].progress() + assert download.download_task is not None + + if download.download_task.done(): + exception = download.download_task.exception() + if exception is not None: + logger.error("Error during download:", exc_info=exception) + raise HTTPException( + status_code=500, detail=f"Error during download: {exception}" + ) + + return download.progress() @router.get("/api/v1/codex/download/node-id") diff --git a/benchmarks/codex/agent/codex_agent_client.py b/benchmarks/codex/agent/codex_agent_client.py index df8d439..d44db02 100644 --- a/benchmarks/codex/agent/codex_agent_client.py +++ b/benchmarks/codex/agent/codex_agent_client.py @@ -6,8 +6,8 @@ import requests from requests.exceptions import ConnectionError from urllib3.util import Url, parse_url -from benchmarks.codex.agent.agent import DownloadStatus -from benchmarks.codex.client.common import Cid +from benchmarks.codex.client.async_client import DownloadStatus +from benchmarks.codex.client.common import Manifest from benchmarks.core.experiments.experiments import ExperimentComponent @@ -22,7 +22,7 @@ class CodexAgentClient(ExperimentComponent): except (ConnectionError, socket.gaierror): return False - def generate(self, size: int, seed: int, name: str) -> Cid: + def generate(self, size: int, seed: int, name: str) -> Manifest: response = requests.post( url=self.url._replace(path="/api/v1/codex/dataset").url, params={ @@ -34,14 +34,12 @@ class CodexAgentClient(ExperimentComponent): response.raise_for_status() - return response.text + return Manifest.model_validate(response.json()) - def download(self, cid: str) -> Url: + def download(self, manifest: Manifest) -> Url: response = requests.post( url=self.url._replace(path="/api/v1/codex/download").url, - params={ - "cid": cid, - }, + json=manifest.model_dump(mode="json"), ) response.raise_for_status() diff --git a/benchmarks/codex/agent/tests/fake_codex.py b/benchmarks/codex/agent/tests/fake_codex.py index 7ef4983..d536b17 100644 --- a/benchmarks/codex/agent/tests/fake_codex.py +++ b/benchmarks/codex/agent/tests/fake_codex.py @@ -1,22 +1,32 @@ -import json -import re -from asyncio import StreamReader -from contextlib import asynccontextmanager -from io import BytesIO -from typing import Dict, Optional, AsyncIterator, Tuple, IO +from typing import Dict, Optional, IO -from aiohttp import web, ClientTimeout -from urllib3.util import Url +from aiohttp import ClientTimeout -from benchmarks.codex.client.async_client import AsyncCodexClient, Cid +from benchmarks.codex.client.async_client import AsyncCodexClient, Cid, DownloadStatus from benchmarks.codex.client.common import Manifest -from benchmarks.core.utils.streams import BaseStreamReader + + +class FakeDownload: + def __init__(self, manifest: Manifest) -> None: + self.manifest = manifest + self.downloaded = 0 + self.exception: Optional[Exception] = None + + def advance_download(self, blocks: int): + self.downloaded += blocks + print("Advance download to", self.downloaded) + assert ( + self.downloaded <= self.manifest.block_count + ), "Downloaded blocks exceed total blocks" + + def abort(self, exception: Exception): + self.exception = exception class FakeCodex(AsyncCodexClient): def __init__(self) -> None: - self.storage: Dict[Cid, Manifest] = {} - self.streams: Dict[Cid, StreamReader] = {} + self.manifests: Dict[Cid, Manifest] = {} + self.ongoing_downloads: Dict[Cid, FakeDownload] = {} async def upload( self, @@ -27,89 +37,38 @@ class FakeCodex(AsyncCodexClient): ) -> Cid: data = content.read() cid = "Qm" + 
str(hash(data)) - self.storage[cid] = Manifest( + self.manifests[cid] = Manifest( cid=cid, + treeCid=f"{cid}treeCid", datasetSize=len(data), mimetype=mime_type, blockSize=1, filename=name, - treeCid="", protected=False, ) return cid async def manifest(self, cid: Cid) -> Manifest: - return self.storage[cid] + return self.manifests[cid] - def create_download_stream(self, cid: Cid) -> StreamReader: - reader = StreamReader() - self.streams[cid] = reader - return reader - - @asynccontextmanager async def download( - self, cid: Cid, timeout: Optional[ClientTimeout] = None - ) -> AsyncIterator[BaseStreamReader]: - yield self.streams[cid] + self, manifest: Manifest, timeout: Optional[ClientTimeout] = None + ) -> Cid: + if manifest.treeCid not in self.ongoing_downloads: + raise ValueError("You need to create a " "download before initiating it") + return manifest.treeCid + def new_download(self, manifest: Manifest) -> FakeDownload: + """Creates a download which we can then use to simulate progress.""" + handle = FakeDownload(manifest) + self.ongoing_downloads[manifest.treeCid] = handle + return handle -@asynccontextmanager -async def fake_codex_api() -> AsyncIterator[Tuple[FakeCodex, Url]]: - codex = FakeCodex() - routes = web.RouteTableDef() - - @routes.get("/api/codex/v1/data/{cid}/network/manifest") - async def manifest(request): - cid = request.match_info["cid"] - assert cid in codex.storage - # Gets the manifest in a similar shape as the Codex response. - manifest = json.loads(codex.storage[cid].model_dump_json()) - return web.json_response( - data={ - "cid": manifest.pop("cid"), - "manifest": manifest, - } + async def download_status(self, dataset: Cid) -> DownloadStatus: + download = self.ongoing_downloads[dataset] + if download.exception: + raise download.exception + return DownloadStatus( + downloaded=download.downloaded, + total=download.manifest.block_count, ) - - @routes.post("/api/codex/v1/data") - async def upload(request): - await request.post() - filename = re.findall( - r'filename="(.+)"', request.headers["Content-Disposition"] - )[0] - cid = await codex.upload( - name=filename, - mime_type=request.headers["Content-Type"], - content=BytesIO(await request.read()), - ) - return web.Response(text=cid) - - @routes.get("/api/codex/v1/data/{cid}") - async def download(request): - cid = request.match_info["cid"] - assert cid in codex.streams - reader = codex.streams[cid] - - # We basically copy the stream onto the response. 
- response = web.StreamResponse() - await response.prepare(request) - while not reader.at_eof(): - await response.write(await reader.read(1024)) - - await response.write_eof() - return response - - app = web.Application() - app.add_routes(routes) - - runner = web.AppRunner(app) - await runner.setup() - - site = web.TCPSite(runner, "localhost", 8888) - await site.start() - - try: - yield codex, Url(scheme="http", host="localhost", port=8888) - finally: - await site.stop() - await runner.cleanup() diff --git a/benchmarks/codex/agent/tests/test_api.py b/benchmarks/codex/agent/tests/test_api.py index c5b76ae..78727c1 100644 --- a/benchmarks/codex/agent/tests/test_api.py +++ b/benchmarks/codex/agent/tests/test_api.py @@ -6,6 +6,7 @@ from starlette.testclient import TestClient from benchmarks.codex.agent import api from benchmarks.codex.agent.agent import CodexAgent from benchmarks.codex.agent.tests.fake_codex import FakeCodex +from benchmarks.codex.client.common import Manifest @pytest.mark.asyncio @@ -25,9 +26,8 @@ async def test_should_create_file(): ) assert response.status_code == 200 - assert response.charset_encoding == "utf-8" - manifest = await codex_client.manifest(response.text) + manifest = Manifest.model_validate(response.json()) assert manifest.datasetSize == 1024 @@ -50,28 +50,24 @@ async def test_should_report_when_download_is_complete(): ) assert response.status_code == 200 - assert response.charset_encoding == "utf-8" + manifest = Manifest.model_validate(response.json()) - cid = response.text - - download_stream = codex_client.create_download_stream(cid) + fake_download = codex_client.new_download(manifest) response = await client.post( - "/api/v1/codex/download", - params={"cid": cid}, + "/api/v1/codex/download", json=manifest.model_dump(mode="json") ) assert response.status_code == 202 assert response.json() == { - "status": f"http://testserver/api/v1/codex/download/{cid}/status" + "status": f"http://testserver/api/v1/codex/download/{manifest.treeCid}/status" } - download_stream.feed_data(b"0" * 1024) - download_stream.feed_eof() + fake_download.advance_download(blocks=1024) - await codex_agent.ongoing_downloads[cid].download_task + await codex_agent.ongoing_downloads[manifest.treeCid].download_task - response = await client.get(f"api/v1/codex/download/{cid}/status") + response = await client.get(f"api/v1/codex/download/{manifest.treeCid}/status") assert response.status_code == 200 assert response.json() == {"downloaded": 1024, "total": 1024} diff --git a/benchmarks/codex/agent/tests/test_codex_agent.py b/benchmarks/codex/agent/tests/test_codex_agent.py index 9aea254..15e956c 100644 --- a/benchmarks/codex/agent/tests/test_codex_agent.py +++ b/benchmarks/codex/agent/tests/test_codex_agent.py @@ -5,8 +5,7 @@ from unittest.mock import patch import pytest from benchmarks.codex.agent.agent import CodexAgent, DownloadStatus -from benchmarks.codex.agent.tests.fake_codex import FakeCodex, fake_codex_api -from benchmarks.codex.client.async_client import AsyncCodexClientImpl +from benchmarks.codex.agent.tests.fake_codex import FakeCodex from benchmarks.core.concurrency import await_predicate_async from benchmarks.logging.logging import LogParser, DownloadMetric @@ -14,8 +13,7 @@ from benchmarks.logging.logging import LogParser, DownloadMetric @pytest.mark.asyncio async def test_should_create_dataset_of_right_size(): codex_agent = CodexAgent(FakeCodex()) - cid = await codex_agent.create_dataset(size=1024, name="dataset-1", seed=1234) - manifest = await codex_agent.client.manifest(cid) + 
manifest = await codex_agent.create_dataset(size=1024, name="dataset-1", seed=1234) assert manifest.datasetSize == 1024 @@ -24,35 +22,33 @@ async def test_should_create_dataset_of_right_size(): async def test_same_seed_creates_same_cid(): codex_agent = CodexAgent(FakeCodex()) - cid1 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1234) - cid2 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1234) - cid3 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1235) + manifest1 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1234) + manifest2 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1234) + manifest3 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1235) - assert cid1 == cid2 - assert cid1 != cid3 + assert manifest1.cid == manifest2.cid + assert manifest1.cid != manifest3.cid @pytest.mark.asyncio async def test_should_report_download_progress(): client = FakeCodex() - codex_agent = CodexAgent(client) + codex_agent = CodexAgent(client, status_backoff=0.01) - cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234) - download_stream = client.create_download_stream(cid) - - handle = await codex_agent.download(cid) + manifest = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234) + fake_download = client.new_download(manifest) + handle = await codex_agent.download(manifest) assert handle.progress() == DownloadStatus(downloaded=0, total=1000) for i in range(200): - download_stream.feed_data(b"0" * 5) + fake_download.advance_download(blocks=5) assert await await_predicate_async( lambda: handle.progress() == DownloadStatus(downloaded=5 * (i + 1), total=1000), timeout=5, ) - download_stream.feed_eof() await handle.download_task assert handle.progress() == DownloadStatus(downloaded=1000, total=1000) @@ -63,14 +59,17 @@ async def test_should_raise_exception_on_progress_query_if_download_fails(): client = FakeCodex() codex_agent = CodexAgent(client) - cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234) - download_stream = client.create_download_stream(cid) + manifest = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234) + fake_download = client.new_download(manifest) - handle = await codex_agent.download(cid) + handle = await codex_agent.download(manifest) - download_stream.feed_eof() + class SomeError(Exception): + pass - with pytest.raises(EOFError): + fake_download.abort(SomeError()) + + with pytest.raises(SomeError): await handle.download_task @@ -82,13 +81,14 @@ async def test_should_log_download_progress_as_metric_in_discrete_steps(mock_log client = FakeCodex() codex_agent = CodexAgent(client) - cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234) + manifest = await codex_agent.create_dataset( + size=1000, name="dataset-1", seed=1234 + ) + fake_download = client.new_download(manifest) - download_stream = client.create_download_stream(cid) - download_stream.feed_data(b"0" * 1000) - download_stream.feed_eof() + fake_download.advance_download(1000) - handle = await codex_agent.download(cid, read_increment=0.2) + handle = await codex_agent.download(manifest, log_increment=0.2) await handle.download_task parser = LogParser() @@ -99,35 +99,30 @@ async def test_should_log_download_progress_as_metric_in_discrete_steps(mock_log assert metrics == [ DownloadMetric( dataset_name="dataset-1", - handle=cid, value=200, node=codex_agent.node_id, timestamp=metrics[0].timestamp, 
), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=400, node=codex_agent.node_id, timestamp=metrics[1].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=600, node=codex_agent.node_id, timestamp=metrics[2].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=800, node=codex_agent.node_id, timestamp=metrics[3].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=1000, node=codex_agent.node_id, timestamp=metrics[4].timestamp, @@ -143,24 +138,24 @@ async def test_should_log_download_progress_as_discrete_steps_even_when_underlyi with patch("benchmarks.codex.agent.agent.logger", logger): client = FakeCodex() - codex_agent = CodexAgent(client) - cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234) - download_stream = client.create_download_stream(cid) - handle = await codex_agent.download(cid, read_increment=0.2) - + codex_agent = CodexAgent(client, status_backoff=0.01) + manifest = await codex_agent.create_dataset( + size=1000, name="dataset-1", seed=1234 + ) + fake_download = client.new_download(manifest) + handle = await codex_agent.download(manifest, log_increment=0.2) # Simulates a choppy download which returns a lot less than the logging step size every time. fed = 0 step = 37 while fed < 1000: to_feed = min(step, 1000 - fed) - download_stream.feed_data(b"0" * to_feed) + fake_download.advance_download(to_feed) fed += to_feed assert await await_predicate_async( lambda: handle.progress() == DownloadStatus(downloaded=fed, total=1000), timeout=5, ) - download_stream.feed_eof() await handle.download_task parser = LogParser() @@ -171,35 +166,30 @@ async def test_should_log_download_progress_as_discrete_steps_even_when_underlyi assert metrics == [ DownloadMetric( dataset_name="dataset-1", - handle=cid, value=200, node=codex_agent.node_id, timestamp=metrics[0].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=400, node=codex_agent.node_id, timestamp=metrics[1].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=600, node=codex_agent.node_id, timestamp=metrics[2].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=800, node=codex_agent.node_id, timestamp=metrics[3].timestamp, ), DownloadMetric( dataset_name="dataset-1", - handle=cid, value=1000, node=codex_agent.node_id, timestamp=metrics[4].timestamp, @@ -212,49 +202,38 @@ async def test_should_track_download_handles(): client = FakeCodex() codex_agent = CodexAgent(client) - cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1356) + manifest = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1356) + fake_download = client.new_download(manifest) - assert cid not in codex_agent.ongoing_downloads + assert manifest.treeCid not in codex_agent.ongoing_downloads - download_stream = client.create_download_stream(cid) - handle = await codex_agent.download(cid) - - download_stream.feed_data(b"0" * 1000) - download_stream.feed_eof() - - assert codex_agent.ongoing_downloads[cid] == handle + handle = await codex_agent.download(manifest) + assert codex_agent.ongoing_downloads[manifest.treeCid] == handle + fake_download.advance_download(1000) await handle.download_task - - assert cid in codex_agent.ongoing_downloads + assert manifest.treeCid in codex_agent.ongoing_downloads @pytest.mark.asyncio -async def test_should_timeout_if_download_stream_takes_too_long_to_return_content(): - async with fake_codex_api() as (fake_codex, url): - client = 
AsyncCodexClientImpl(url) - codex_agent = CodexAgent(client, read_timeout=0.5) +async def test_should_timeout_if_download_goes_for_too_long_without_any_progress(): + fake_codex = FakeCodex() + codex_agent = CodexAgent(fake_codex, status_backoff=0.01, progress_timeout=0.5) - fast_cid = await codex_agent.create_dataset( - size=1000, name="dataset-fast-1", seed=1356 - ) - slow_cid = await codex_agent.create_dataset( - size=1000, name="dataset-slow-1", seed=1353 - ) + fast = await codex_agent.create_dataset(size=1000, name="dataset-fast-1", seed=1356) + slow = await codex_agent.create_dataset(size=1000, name="dataset-slow-1", seed=1353) - fast_download = fake_codex.create_download_stream(fast_cid) - slow_download = fake_codex.create_download_stream(slow_cid) + fast_download = fake_codex.new_download(fast) + slow_download = fake_codex.new_download(slow) - fast_download.feed_data(b"0" * 1000) - fast_download.feed_eof() - fast_handle = await codex_agent.download(fast_cid) - await fast_handle.download_task + fast_download.advance_download(1000) + fast_handle = await codex_agent.download(fast) + await fast_handle.download_task - slow_handle = await codex_agent.download(slow_cid) - slow_download.feed_data(b"0" * 500) - await asyncio.sleep(0.6) - slow_download.feed_data(b"0" * 500) - slow_download.feed_eof() + slow_handle = await codex_agent.download(slow) + slow_download.advance_download(500) + await asyncio.sleep(0.6) + slow_download.advance_download(500) - with pytest.raises(asyncio.TimeoutError): - await slow_handle.download_task + with pytest.raises(asyncio.TimeoutError): + await slow_handle.download_task diff --git a/benchmarks/codex/client/async_client.py b/benchmarks/codex/client/async_client.py index c3379af..4ef2323 100644 --- a/benchmarks/codex/client/async_client.py +++ b/benchmarks/codex/client/async_client.py @@ -1,16 +1,25 @@ """Async Client implementation for the base Codex API.""" from abc import ABC, abstractmethod - -from contextlib import asynccontextmanager -from typing import IO, AsyncIterator, AsyncGenerator, Optional +from typing import IO, Optional import aiohttp from aiohttp import ClientTimeout +from pydantic import BaseModel from urllib3.util import Url from benchmarks.codex.client.common import Manifest, Cid -from benchmarks.core.utils.streams import BaseStreamReader + + +class DownloadStatus(BaseModel): + downloaded: int + total: int + + def as_percent(self) -> float: + return (self.downloaded * 100) / self.total + + def is_complete(self) -> bool: + return self.downloaded == self.total class AsyncCodexClient(ABC): @@ -28,11 +37,14 @@ class AsyncCodexClient(ABC): async def manifest(self, cid: Cid) -> Manifest: pass - @asynccontextmanager @abstractmethod - def download( - self, cid: Cid, timeout: Optional[ClientTimeout] = None - ) -> AsyncGenerator[BaseStreamReader, None]: + async def download( + self, manifest: Manifest, timeout: Optional[ClientTimeout] = None + ) -> Cid: + pass + + @abstractmethod + async def download_status(self, dataset: Cid) -> DownloadStatus: pass @@ -77,16 +89,44 @@ class AsyncCodexClientImpl(AsyncCodexClient): return Manifest.from_codex_api_response(response_contents) - @asynccontextmanager async def download( - self, cid: Cid, timeout: Optional[ClientTimeout] = None - ) -> AsyncIterator[BaseStreamReader]: + self, manifest: Manifest, timeout: Optional[ClientTimeout] = None + ) -> Cid: async with aiohttp.ClientSession(timeout=ClientTimeout()) as session: - response = await session.get( - 
self.codex_api_url._replace(path=f"/api/codex/v1/data/{cid}").url, - timeout=timeout, + response = await session.post( + self.codex_api_url._replace(path="/api/codex/v1/download").url, + json={ + "cid": manifest.cid, + "manifest": manifest.model_dump(exclude={"cid"}, mode="json"), + }, ) response.raise_for_status() + response_contents = await response.json() - yield response.content + return response_contents["downloadId"] + + async def download_status(self, dataset: Cid) -> DownloadStatus: + async with aiohttp.ClientSession() as session: + response = await session.get( + self.codex_api_url._replace( + path=f"/api/codex/v1/download/{dataset}" + ).url, + ) + + response.raise_for_status() + response_contents = await response.json() + + return DownloadStatus( + downloaded=response_contents["downloaded"], total=response_contents["total"] + ) + + async def leave_swarm(self, dataset: Cid) -> None: + async with aiohttp.ClientSession() as session: + response = await session.delete( + self.codex_api_url._replace( + path=f"/api/codex/v1/download/{dataset}" + ).url, + ) + + response.raise_for_status() diff --git a/benchmarks/codex/client/common.py b/benchmarks/codex/client/common.py index c4d99e2..822ab15 100644 --- a/benchmarks/codex/client/common.py +++ b/benchmarks/codex/client/common.py @@ -1,3 +1,5 @@ +import math + from pydantic import BaseModel API_VERSION = "v1" @@ -14,6 +16,10 @@ class Manifest(BaseModel): mimetype: str protected: bool + @property + def block_count(self) -> int: + return math.ceil(self.datasetSize / self.blockSize) + @staticmethod def from_codex_api_response(response: dict) -> "Manifest": return Manifest.model_validate( diff --git a/benchmarks/codex/client/tests/test_async_client.py b/benchmarks/codex/client/tests/test_async_client.py index 7ea83ad..77d1212 100644 --- a/benchmarks/codex/client/tests/test_async_client.py +++ b/benchmarks/codex/client/tests/test_async_client.py @@ -4,6 +4,7 @@ import pytest from urllib3.util import parse_url from benchmarks.codex.client.async_client import AsyncCodexClientImpl +from benchmarks.core.concurrency import await_predicate_async from benchmarks.core.utils.random import random_data from benchmarks.core.utils.units import megabytes @@ -28,14 +29,21 @@ async def test_should_download_file(codex_node_1_url: str): client = AsyncCodexClientImpl(parse_url(codex_node_1_url)) data = BytesIO() - random_data(megabytes(1), data) + random_data(megabytes(5), data) cid = await client.upload( "test.txt", "application/octet-stream", BytesIO(data.getvalue()) ) assert cid is not None - async with client.download(cid) as content: - downloaded = await content.readexactly(megabytes(1)) + manifest = await client.manifest(cid) + dataset_cid = await client.download(manifest) - assert downloaded == data.getvalue() + async def is_complete(): + status = await client.download_status(dataset_cid) + assert status.total == manifest.block_count + return status.is_complete() + + await await_predicate_async(is_complete, timeout=10) + + await client.leave_swarm(dataset_cid) diff --git a/benchmarks/codex/codex_node.py b/benchmarks/codex/codex_node.py index 3c2e924..98d0a14 100644 --- a/benchmarks/codex/codex_node.py +++ b/benchmarks/codex/codex_node.py @@ -17,6 +17,7 @@ from urllib3.util import Url from benchmarks.codex.agent.agent import Cid, DownloadStatus from benchmarks.codex.agent.codex_agent_client import CodexAgentClient +from benchmarks.codex.client.common import Manifest from benchmarks.core.concurrency import await_predicate from 
benchmarks.core.experiments.experiments import ExperimentComponent from benchmarks.core.network import Node, DownloadHandle @@ -34,12 +35,18 @@ class CodexMeta: name: str -class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): - def __init__(self, codex_api_url: Url, agent: CodexAgentClient) -> None: +class CodexNode(Node[Manifest, CodexMeta], ExperimentComponent): + def __init__( + self, + codex_api_url: Url, + agent: CodexAgentClient, + remove_data: bool = False, + ) -> None: self.codex_api_url = codex_api_url self.agent = agent # Lightweight tracking of datasets created by this node. It's OK if we lose them. - self.hosted_datasets: Set[Cid] = set() + self.hosted_datasets: dict[Cid, Manifest] = {} + self.remove_data = remove_data def is_ready(self) -> bool: try: @@ -55,33 +62,72 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): wait=WAIT_POLICY, retry=retry_if_not_exception_type(HTTPError), ) - def genseed(self, size: int, seed: int, meta: CodexMeta) -> Cid: - cid = self.agent.generate(size=size, seed=seed, name=meta.name) - self.hosted_datasets.add(cid) - return cid + def genseed(self, size: int, seed: int, meta: CodexMeta) -> Manifest: + manifest = self.agent.generate(size=size, seed=seed, name=meta.name) + # Joins the swarm. + self.agent.download(manifest) + self.hosted_datasets[manifest.treeCid] = manifest + return manifest @retry( stop=STOP_POLICY, wait=WAIT_POLICY, retry=retry_if_not_exception_type(HTTPError), ) - def leech(self, handle: Cid) -> DownloadHandle: - self.hosted_datasets.add(handle) + def leech(self, handle: Manifest) -> DownloadHandle: + self.hosted_datasets[handle.treeCid] = handle return CodexDownloadHandle(parent=self, monitor_url=self.agent.download(handle)) - def remove(self, handle: Cid) -> bool: + def remove(self, handle: Manifest) -> bool: + # Removes node from the swarm. + success = self._leave_swarm(handle) + + # Removes the data, if requested. + if self.remove_data: + success &= self._purge_local_data(handle) + + return success + + def swarms(self) -> Set[Cid]: + response = requests.get( + str(self.codex_api_url._replace(path="/api/codex/v1/download")) + ) + response.raise_for_status() + + return set(response.json()["activeSwarms"]) + + def _leave_swarm(self, handle: Manifest) -> bool: response = requests.delete( - str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")), - timeout=DELETE_TIMEOUT, + str( + self.codex_api_url._replace( + path=f"/api/codex/v1/download/{handle.treeCid}" + ) + ), ) - response.raise_for_status() + raise_if_server_error(response) + + return response.ok + + def _purge_local_data(self, handle: Manifest) -> bool: + # Purge data on disk, if requested. 
+ if self.remove_data: + response = requests.delete( + str( + self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle.cid}") + ), + timeout=DELETE_TIMEOUT, + ) + + raise_if_server_error(response) + return response.ok + return True - def exists_local(self, handle: Cid) -> bool: + def exists_local(self, handle: Manifest) -> bool: """Check if a dataset exists on the node.""" response = requests.get( - str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")) + str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle.cid}")) ) response.close() @@ -95,12 +141,12 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): return True def download_local( - self, handle: Cid, chunk_size: int = megabytes(1) + self, handle: Manifest, chunk_size: int = megabytes(1) ) -> Iterator[bytes]: """Retrieves the contents of a locally available dataset from the node.""" response = requests.get( - str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")) + str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle.cid}")) ) response.raise_for_status() @@ -108,9 +154,9 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent): return response.iter_content(chunk_size=chunk_size) def wipe_all_datasets(self): - for dataset in list(self.hosted_datasets): + for dataset in list(self.hosted_datasets.values()): self.remove(dataset) - self.hosted_datasets.remove(dataset) + del self.hosted_datasets[dataset.treeCid] @cached_property def name(self) -> str: @@ -141,3 +187,9 @@ class CodexDownloadHandle(DownloadHandle): response.raise_for_status() return DownloadStatus.model_validate(response.json()) + + +def raise_if_server_error(response: requests.Response): + """Raises an exception if the server returns a 5xx error.""" + if response.status_code >= 500 or response.status_code != 404: + response.raise_for_status() diff --git a/benchmarks/codex/config.py b/benchmarks/codex/config.py index f856659..3f02233 100644 --- a/benchmarks/codex/config.py +++ b/benchmarks/codex/config.py @@ -74,6 +74,7 @@ class CodexExperimentConfig( ) download_metric_unit_bytes: int = 1 + remove_data: bool = False def build(self) -> CodexDisseminationExperiment: node_specs = ( @@ -90,6 +91,7 @@ class CodexExperimentConfig( CodexNode( codex_api_url=parse_url(f"http://{str(node.address)}:{node.api_port}"), agent=agents[i], + remove_data=self.remove_data, ) for i, node in enumerate(node_specs) ] diff --git a/benchmarks/codex/tests/fixtures/fixtures.py b/benchmarks/codex/tests/fixtures/fixtures.py index 5f4dfe8..fc61653 100644 --- a/benchmarks/codex/tests/fixtures/fixtures.py +++ b/benchmarks/codex/tests/fixtures/fixtures.py @@ -13,6 +13,7 @@ def codex_node(codex_api_url: str, agent_url: str) -> Iterator[CodexNode]: node = CodexNode( codex_api_url=parse_url(codex_api_url), agent=CodexAgentClient(parse_url(agent_url)), + remove_data=True, ) assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5) diff --git a/benchmarks/codex/tests/test_codex_node.py b/benchmarks/codex/tests/test_codex_node.py index 7c8ea38..9469db9 100644 --- a/benchmarks/codex/tests/test_codex_node.py +++ b/benchmarks/codex/tests/test_codex_node.py @@ -9,12 +9,12 @@ from benchmarks.core.utils.units import megabytes @pytest.mark.codex_integration def test_should_download_file(codex_node1: CodexNode, codex_node2: CodexNode): - cid = codex_node1.genseed( + manifest = codex_node1.genseed( size=megabytes(1), seed=1234, meta=CodexMeta(name="dataset1"), ) - handle = codex_node2.leech(cid) + handle = codex_node2.leech(manifest) 
assert handle.await_for_completion(5) assert cast(CodexDownloadHandle, handle).completion() == DownloadStatus( @@ -22,6 +22,19 @@ def test_should_download_file(codex_node1: CodexNode, codex_node2: CodexNode): ) +@pytest.mark.codex_integration +def test_should_leave_swarm_on_remove(codex_node1: CodexNode): + manifest = codex_node1.genseed( + size=megabytes(1), + seed=1234, + meta=CodexMeta(name="dataset1"), + ) + assert codex_node1.swarms() == {manifest.treeCid} + + codex_node1.remove(manifest) + assert codex_node1.swarms() == set() + + @pytest.mark.codex_integration def test_should_remove_file(codex_node1: CodexNode): cid = codex_node1.genseed( diff --git a/benchmarks/codex/tests/test_codex_static_experiment.py b/benchmarks/codex/tests/test_codex_static_experiment.py index 39e110a..185e0a7 100644 --- a/benchmarks/codex/tests/test_codex_static_experiment.py +++ b/benchmarks/codex/tests/test_codex_static_experiment.py @@ -35,13 +35,13 @@ def test_should_run_with_a_single_seeder(codex_node1, codex_node2, codex_node3): experiment.setup() experiment.do_run() - all_datasets = list(codex_node1.hosted_datasets) + all_datasets = list(codex_node1.hosted_datasets.values()) assert len(all_datasets) == 1 - cid = all_datasets[0] + manifest = all_datasets[0] - content_1 = merge_chunks(codex_node1.download_local(cid)) - content_2 = merge_chunks(codex_node2.download_local(cid)) - content_3 = merge_chunks(codex_node3.download_local(cid)) + content_1 = merge_chunks(codex_node1.download_local(manifest)) + content_2 = merge_chunks(codex_node2.download_local(manifest)) + content_3 = merge_chunks(codex_node3.download_local(manifest)) assert len(content_1) == megabytes(2) assert content_1 == content_2 == content_3 diff --git a/config/codex/experiments.k8s.yaml b/config/codex/experiments.k8s.yaml index a26a8f9..e4c32d2 100644 --- a/config/codex/experiments.k8s.yaml +++ b/config/codex/experiments.k8s.yaml @@ -4,6 +4,9 @@ codex_experiment: seeders: ${SEEDERS} file_size: ${FILE_SIZE} repetitions: ${REPETITIONS} + # Should we delete the data at the end of each experiment (slower, uses less space), or we just + # leave it there (faster, uses more space)? + remove_data: ${REMOVE_DATA} # No need for cooldown as Codex takes forever to remove files, so there's plenty of time to log stuff. 
:-) logging_cooldown: 0 diff --git a/config/codex/experiments.local.yaml b/config/codex/experiments.local.yaml index f8d2d27..07eb16d 100644 --- a/config/codex/experiments.local.yaml +++ b/config/codex/experiments.local.yaml @@ -2,6 +2,7 @@ codex_experiment: seeders: 1 file_size: 52428800 repetitions: 3 + remove_data: true nodes: - name: codex-1 diff --git a/docker-compose-codex.local.yaml b/docker-compose-codex.local.yaml index a33c201..24bd36f 100644 --- a/docker-compose-codex.local.yaml +++ b/docker-compose-codex.local.yaml @@ -25,7 +25,7 @@ services: image: ${COMPOSE_CODEX_IMAGE:-codexstorage/nim-codex:latest} container_name: codex-1 environment: - - CODEX_LOG_LEVEL=DEBUG + - CODEX_LOG_LEVEL=DEBUG;trace:swarm,blockexcnetworkpeer - CODEX_DATA_DIR=/var/lib/codex - CODEX_DISC_PORT=6890 - CODEX_API_BINDADDR=0.0.0.0 @@ -52,7 +52,7 @@ services: image: ${COMPOSE_CODEX_IMAGE:-codexstorage/nim-codex:latest} container_name: codex-2 environment: - - CODEX_LOG_LEVEL=DEBUG + - CODEX_LOG_LEVEL=DEBUG;trace:swarm,blockexcnetworkpeer - CODEX_DATA_DIR=/var/lib/codex - CODEX_DISC_PORT=6892 - CODEX_API_BINDADDR=0.0.0.0 @@ -80,7 +80,7 @@ services: image: ${COMPOSE_CODEX_IMAGE:-codexstorage/nim-codex:latest} container_name: codex-3 environment: - - CODEX_LOG_LEVEL=DEBUG + - CODEX_LOG_LEVEL=DEBUG;trace:swarm,blockexcnetworkpeer - CODEX_DATA_DIR=/var/lib/codex - CODEX_DISC_PORT=6894 - CODEX_API_BINDADDR=0.0.0.0 diff --git a/k8s/charts/codex/templates/_helpers.tpl b/k8s/charts/codex/templates/_helpers.tpl index b1f6446..e5323b9 100644 --- a/k8s/charts/codex/templates/_helpers.tpl +++ b/k8s/charts/codex/templates/_helpers.tpl @@ -9,8 +9,13 @@ Expand the name of the chart. {{- mul $sizeNum (index $size $sizeUnit) -}} {{- end -}} -{{- define "codex.quota" }} -{{- div (mul (include "filesize.bytes" .) 13) 10 -}} +{{- define "experiment.count" -}} +{{- mul .Values.experiment.seederSets .Values.experiment.repetitions -}} +{{- end -}} + +{{- define "codex.quota" -}} +{{- $mulFactor := .Values.experiment.removeData | ternary 1 (include "experiment.count" .) -}} +{{- div (mul (mul (include "filesize.bytes" .) $mulFactor) 13) 10 -}} {{- end -}} {{- define "experiment.groupId" -}} diff --git a/k8s/charts/codex/templates/testrunner-job.yaml b/k8s/charts/codex/templates/testrunner-job.yaml index 8bc1d1a..2ceb34d 100644 --- a/k8s/charts/codex/templates/testrunner-job.yaml +++ b/k8s/charts/codex/templates/testrunner-job.yaml @@ -36,6 +36,8 @@ spec: value: {{ .Values.experiment.repetitions | quote }} - name: SEEDER_SETS value: {{ .Values.experiment.seederSets | quote }} + - name: REMOVE_DATA + value: {{ .Values.experiment.removeData | quote }} - name: FILE_SIZE value: {{ include "filesize.bytes" . | quote }} - name: CODEX_STATEFULSET diff --git a/k8s/charts/codex/values.yaml b/k8s/charts/codex/values.yaml index 19f4a2d..d24a70f 100644 --- a/k8s/charts/codex/values.yaml +++ b/k8s/charts/codex/values.yaml @@ -21,6 +21,10 @@ experiment: # If set to false, does not deploy a test runner (useful if you just want the network). testRunner: true + # If set to false, does not delete the data at the end of each experiment. This is faster + # than deleting it, but requires more space. + removeData: true + deployment: appName: ""
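Reviewer note (not part of the patch): the change above replaces the old byte-stream download path with a polling flow. The agent asks the Codex node to start a download, then repeatedly queries its download-status endpoint, logging a `DownloadMetric` every `log_increment` fraction of blocks and raising `asyncio.TimeoutError` if no progress is observed for `progress_timeout` seconds. Below is a minimal sketch of that surface, pieced together from the classes and tests in this diff and driven by the `FakeCodex` test double rather than a live node; the dataset name, size, and seed are arbitrary illustration values.

```python
# Sketch only: exercises the new CodexAgent polling API via the FakeCodex test
# double introduced/reworked in this patch.
import asyncio

from benchmarks.codex.agent.agent import CodexAgent
from benchmarks.codex.agent.tests.fake_codex import FakeCodex


async def main() -> None:
    client = FakeCodex()                          # stands in for a live Codex node
    agent = CodexAgent(client, status_backoff=0.01)

    # The agent now returns the full Manifest rather than a bare Cid.
    manifest = await agent.create_dataset(name="demo", size=1024, seed=42)

    # The fake requires a download to be registered before the agent initiates it.
    fake = client.new_download(manifest)

    handle = await agent.download(manifest, log_increment=0.25)
    fake.advance_download(blocks=manifest.block_count)   # simulate the node finishing

    await handle.download_task                    # re-raises if the download failed
    print(handle.progress())                      # DownloadStatus(downloaded=1024, total=1024)


asyncio.run(main())
```

Because progress is now tracked in blocks rather than raw bytes, `DownloadHandle._publish_progress` converts block counts back to bytes via `Manifest.blockSize`, which keeps the `/api/v1/codex/download/{cid}/status` responses byte-denominated as before.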