Merge 5b82e7f6d695f53b572c8c7354757efa5f00d6dd into cd77b20a82ea5c2c7ccea6d9d37ae10fda9b9949
1
.github/workflows/ci.yml
vendored
@ -13,6 +13,7 @@ env:
|
||||
DOCKER_REPO: codexstorage/bittorrent-benchmarks
|
||||
BUILD_ARGS: |
|
||||
BUILD_TYPE=test
|
||||
COMPOSE_CODEX_IMAGE: codexstorage/nim-codex:sha-f529a70
|
||||
|
||||
jobs:
|
||||
build-and-test:
|
||||
|
||||
BIN
analysis/presentation.short/images/nu-6.png
Normal file
|
After Width: | Height: | Size: 36 KiB |
BIN
analysis/presentation.short/images/nu-7.png
Normal file
|
After Width: | Height: | Size: 58 KiB |
BIN
analysis/presentation.short/images/un-1.png
Normal file
|
After Width: | Height: | Size: 74 KiB |
BIN
analysis/presentation.short/images/un-2.png
Normal file
|
After Width: | Height: | Size: 74 KiB |
BIN
analysis/presentation.short/images/un-3.png
Normal file
|
After Width: | Height: | Size: 74 KiB |
BIN
analysis/presentation.short/images/un-4.png
Normal file
|
After Width: | Height: | Size: 75 KiB |
BIN
analysis/presentation.short/images/un-5.png
Normal file
|
After Width: | Height: | Size: 75 KiB |
215
analysis/presentation.short/short-presentation.qmd
Normal file
@ -0,0 +1,215 @@
|
||||
---
|
||||
title: "Measuring Codex Performance for Content Delivery"
|
||||
subtitle: "(aka Codex Benchmarks)"
|
||||
format:
|
||||
revealjs:
|
||||
slide-number: true
|
||||
execute:
|
||||
cache: true
|
||||
---
|
||||
|
||||
<!--
|
||||
This is NOT self-contained. Compiling the presentation requires exporting
|
||||
the benchmarks table to a file. It also requires its own packages.
|
||||
-->
|
||||
|
||||
```{r echo = FALSE, warning = FALSE, message = FALSE}
library(tidyverse)
library(DT)

# Load the exported benchmarks table (see the note above: this file must be
# produced separately) and turn `file_size` into a factor of human-readable
# byte labels whose levels run from the largest to the smallest size.
benchmarks <- read_csv('./benchmarks.csv') |>
  mutate(
    file_size = factor(
      rlang::parse_bytes(as.character(file_size)),
      levels = rlang::parse_bytes(as.character(
        unique(file_size[order(file_size, decreasing = TRUE)])
      ))
    )
  )

# Pair each Deluge experiment with the Codex experiment that has the same
# file size, network size, seeder count and leecher count.
# `performance` is the Codex median divided by the Deluge median.
# NOTE(review): `median` is presumably the median download time, which is what
# the "faster"/"slower" annotations in the plots below assume -- confirm
# against the benchmarks export.
relative_performance <- benchmarks |>
  filter(experiment_type == 'deluge_experiment_config_log_entry') |>
  transmute(
    file_size, network_size, seeders, leechers, deluge_median = median
  ) |>
  inner_join(
    benchmarks |>
      filter(experiment_type == 'codex_experiment_config_log_entry') |>
      select(
        file_size, network_size, seeders, leechers, codex_median = median
      ),
    by = c('file_size', 'network_size', 'seeders', 'leechers')
  ) |>
  mutate(
    performance = codex_median / deluge_median,
    seeder_ratio = seeders / network_size
  )
```
|
||||
|
||||
|
||||
## Intro
|
||||
|
||||
::: {.incremental}
|
||||
|
||||
* Why?
|
||||
* _performance_ is a key aspect of a storage system;
|
||||
* want to understand _how Codex performs_.
|
||||
|
||||
* What?
|
||||
* Content delivery: _download_ performance.
|
||||
* Download performance: latency, throughput.
|
||||
* Codex aims at supporting _large_ files;
|
||||
* download speed ($\text{MB/s}$) is dominant.
|
||||
|
||||
:::
|
||||
|
||||
## Baseline {.smaller}
|
||||
|
||||
::: {.incremental}
|
||||
|
||||
* _Quality_ baseline: easier to know where you stand;
|
||||
* faster: good;
|
||||
* slower: maybe not so good.
|
||||
|
||||
* Decentralized, large-file content distribution:
|
||||
* Bittorrent;
|
||||
* IPFS.
|
||||
|
||||
* Bittorrent: _de facto_ standard;
|
||||
* been used for a very long time;
|
||||
* fast and efficient (more so than IPFS);
|
||||
* several open source implementations.
|
||||
|
||||
:::
|
||||
|
||||
## Baseline
|
||||
|
||||
::: {.incremental}
|
||||
|
||||
* For our baseline, we picked [Deluge](https://deluge-torrent.org/):
|
||||
* well-known, lots of users despite small market share (1%);
|
||||
* can be run as a daemon, has [open source client libraries](https://github.com/JohnDoee/deluge-client) to interact with daemon remotely;
|
||||
* based on [libtorrent (rasterbar)](https://www.libtorrent.org/).
|
||||
|
||||
:::
|
||||
|
||||
## Static Dissemination Experiment
|
||||
|
||||
::: {.incremental}
|
||||
|
||||
* _Static dissemination experiment._
|
||||
* Network of size $n$;
|
||||
* split into $s$ seeders, $l = n - s$ leechers;
|
||||
* seeder ratio $r = \frac{s}{n}$.
|
||||
|
||||
* Experiment:
|
||||
* generate file $F$ of size $b$;
|
||||
* upload $F$ to each seeder;
|
||||
* fire up all leechers "at the same time";
|
||||
* measure time to download $F$ at leechers.
|
||||
|
||||
:::
|
||||
|
||||
## Static Dissemination Experiment
|
||||
|
||||
```{r fig.align="center", echo = FALSE}
|
||||
knitr::include_graphics('./images/un-1.png')
|
||||
```
|
||||
|
||||
## Static Dissemination Experiment
|
||||
|
||||
```{r fig.align="center", echo = FALSE}
|
||||
knitr::include_graphics('./images/un-2.png')
|
||||
```
|
||||
|
||||
## Static Dissemination Experiment
|
||||
|
||||
```{r fig.align="center", echo = FALSE}
|
||||
knitr::include_graphics('./images/un-3.png')
|
||||
```
|
||||
|
||||
## Static Dissemination Experiment
|
||||
|
||||
```{r fig.align="center", echo = FALSE}
|
||||
knitr::include_graphics('./images/un-4.png')
|
||||
```
|
||||
|
||||
## Static Dissemination Experiment
|
||||
|
||||
```{r fig.align="center", echo = FALSE}
|
||||
knitr::include_graphics('./images/nu-6.png')
|
||||
```
|
||||
|
||||
## Static Dissemination Experiment
|
||||
|
||||
```{r fig.align="center", echo = FALSE}
|
||||
knitr::include_graphics('./images/nu-7.png')
|
||||
```
|
||||
|
||||
## Static Dissemination Experiment
|
||||
|
||||
* Parameters:
|
||||
* File sizes: $b \in \{100\text{MB}, 1\text{GB}, 5\text{GB}\}$;
|
||||
* Network sizes: $n \in \{2, 8, 16, 32\}$;
|
||||
* Seeder ratios: $0.5, 0.25, 0.125, 0.0625, 0.03125$ (depending on $n$).
|
||||
|
||||
* Hardware:
|
||||
* [CPX31](https://www.hetzner.com/cloud?ref=blog.codex.storage) Hetzner VMs (4 vCPU, 8GB RAM);
|
||||
* $\sim 4\text{Gbps}$ point-to-point bandwidth.
|
||||
|
||||
## Results - Download Speed
|
||||
|
||||
```{r fig.width = 10, warning=FALSE, message=FALSE, echo=FALSE}
# Median download speed against network size, one line per system, with a
# ribbon spanning the 25th-75th percentile speeds. Panels are laid out as
# file size (rows) by seeder ratio (columns).
# NOTE(review): assumes benchmarks.csv already provides `seeder_ratio`,
# `median_speed`, `p25_speed` and `p75_speed`; none of them are computed in
# this document -- confirm against the export.
ggplot(
  benchmarks,
  aes(
    x = network_size,
    col = experiment_type,
    fill = experiment_type,
    group = experiment_type
  )
) +
  # alpha is a fixed setting, not a data mapping, so it lives outside aes();
  # this avoids creating an alpha scale that then has to be hidden.
  geom_ribbon(
    aes(ymin = p25_speed, ymax = p75_speed),
    alpha = 0.5,
    col = 'lightgray'
  ) +
  # Dash-shaped markers at the percentile bounds of the ribbon.
  geom_point(aes(y = p25_speed), col = 'darkgray', size = 10.0, shape = '-') +
  geom_point(aes(y = p75_speed), col = 'darkgray', size = 10.0, shape = '-') +
  geom_line(aes(y = median_speed)) +
  geom_point(aes(y = median_speed)) +
  ylab('median download speed (bytes/second)') +
  xlab('network size') +
  theme_minimal(base_size = 15) +
  scale_y_continuous(labels = function(x) paste0(scales::label_bytes()(x), '/s')) +
  facet_grid(
    file_size ~ seeder_ratio,
    labeller = labeller(
      seeder_ratio = as_labeller(function(x) {
        paste0("seeder ratio: ", scales::percent(as.numeric(x)))
      })
    )
  ) +
  # Labels follow the alphabetical order of `experiment_type`
  # ('codex_...' sorts before 'deluge_...').
  scale_color_discrete(name = '', labels = c('Codex', 'Deluge')) +
  guides(fill = 'none')
```
|
||||
## Results - Relative (Median) Speed
|
||||
|
||||
```{r fig.cap='Median download time ratio for Codex and Deluge', fig.width = 11, message = FALSE, echo = FALSE}
# Codex/Deluge ratio of medians against network size, one panel per
# (file size, seeder ratio) pair. The dashed line at 1 marks parity between
# the two systems; the text annotations label the regions below and above it.
ggplot(
  relative_performance,
  aes(x = network_size, y = performance, col = file_size)
) +
  geom_line(lwd = 1) +
  geom_hline(yintercept = 1, linetype = 'dashed', col = 'darkgray') +
  geom_point() +
  ylab('median Codex/Deluge performance ratio') +
  annotate('text', label = 'faster', x = 29, y = 0, col = 'darkgreen') +
  annotate('text', label = 'slower', x = 28.5, y = 2, col = 'darkred') +
  theme_minimal(base_size = 15) +
  scale_color_discrete(name = 'file size') +
  facet_grid(
    file_size ~ seeder_ratio,
    labeller = labeller(
      file_size = as_labeller(function(x) x),
      seeder_ratio = as_labeller(function(x) {
        paste0("seeder ratio: ", scales::percent(as.numeric(x)))
      })
    )
  )
```
|
||||
|
||||
## Next
|
||||
|
||||
::: {.incremental}
|
||||
|
||||
* Debugging, debugging...
|
||||
* larger experiments (networks, files);
|
||||
* dynamic network experiments, with churn and faults.
|
||||
|
||||
:::
|
||||
|
||||
## Thank You!
|
||||
|
||||
* Benchmarks repo: [github.com/codex-network/codex-benchmarks](https://github.com/codex-network/codex-benchmarks)
|
||||
* RPubs Notebook with Data: [https://rpubs.com/giuliano_mega/1266876](https://rpubs.com/giuliano_mega/1266876)
|
||||
* Blog post: [https://blog.codex.storage/measuring-codex-performance-for-content-delivery/](https://blog.codex.storage/measuring-codex-performance-for-content-delivery/)
|
||||
@ -1,121 +1,137 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from asyncio import Task
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Optional, Dict
|
||||
|
||||
from aiohttp import ClientTimeout
|
||||
from pydantic import BaseModel
|
||||
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClient
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClient, DownloadStatus
|
||||
from benchmarks.codex.client.common import Cid
|
||||
from benchmarks.codex.client.common import Manifest
|
||||
from benchmarks.core.utils.random import random_data
|
||||
from benchmarks.logging.logging import DownloadMetric
|
||||
|
||||
EMPTY_STREAM_BACKOFF = 2
|
||||
STATUS_BACKOFF = 2.0
|
||||
PROGRESS_TIMEOUT = 120
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DownloadStatus(BaseModel):
|
||||
downloaded: int
|
||||
total: int
|
||||
|
||||
def as_percent(self) -> float:
|
||||
return (self.downloaded * 100) / self.total
|
||||
|
||||
|
||||
class DownloadHandle:
|
||||
def __init__(
|
||||
self,
|
||||
parent: "CodexAgent",
|
||||
manifest: Manifest,
|
||||
read_increment: float = 0.01,
|
||||
read_timeout: Optional[float] = None,
|
||||
log_increment: float = 0.01,
|
||||
status_backoff: float = STATUS_BACKOFF,
|
||||
progress_timeout: float = PROGRESS_TIMEOUT,
|
||||
):
|
||||
self.parent = parent
|
||||
self.manifest = manifest
|
||||
self.bytes_downloaded = 0
|
||||
self.read_increment = read_increment
|
||||
self.read_timeout = read_timeout
|
||||
self.log_increment = log_increment
|
||||
self.download_task: Optional[Task[None]] = None
|
||||
self.status_backoff = status_backoff
|
||||
self.progress_timeout = progress_timeout
|
||||
self._progress = DownloadStatus(downloaded=0, total=manifest.block_count)
|
||||
|
||||
def begin_download(self) -> Task:
|
||||
self.download_task = asyncio.create_task(self._download_loop())
|
||||
return self.download_task
|
||||
|
||||
async def _download_loop(self):
|
||||
step_size = int(self.manifest.datasetSize * self.read_increment)
|
||||
step_size = max(1, int(self.manifest.block_count * self.log_increment))
|
||||
|
||||
async with self.parent.client.download(
|
||||
self.manifest.cid,
|
||||
download_id = await self.parent.client.download(
|
||||
self.manifest,
|
||||
timeout=ClientTimeout(
|
||||
total=None,
|
||||
sock_connect=30,
|
||||
sock_read=self.read_timeout,
|
||||
),
|
||||
) as download_stream:
|
||||
logged_step = 0
|
||||
while not download_stream.at_eof():
|
||||
step = min(step_size, self.manifest.datasetSize - self.bytes_downloaded)
|
||||
bytes_read = await download_stream.read(step)
|
||||
# We actually have no guarantees that an empty read means EOF, so we just back off
|
||||
# a bit.
|
||||
if not bytes_read:
|
||||
await asyncio.sleep(EMPTY_STREAM_BACKOFF)
|
||||
self.bytes_downloaded += len(bytes_read)
|
||||
)
|
||||
|
||||
if int(self.bytes_downloaded / step_size) > logged_step:
|
||||
logged_step += 1
|
||||
logger.info(
|
||||
DownloadMetric(
|
||||
dataset_name=self.manifest.filename,
|
||||
handle=self.manifest.cid,
|
||||
value=step_size * logged_step,
|
||||
node=self.parent.node_id,
|
||||
)
|
||||
)
|
||||
logger.info(f"Start download monitoring loop for {download_id}")
|
||||
|
||||
if self.bytes_downloaded < self.manifest.datasetSize:
|
||||
raise EOFError(
|
||||
f"Got EOF too early: download size ({self.bytes_downloaded}) was less "
|
||||
f"than expected ({self.manifest.datasetSize})."
|
||||
)
|
||||
current_step = 0
|
||||
last_progress = time.time()
|
||||
while True:
|
||||
has_progress = False
|
||||
progress = await self.parent.client.download_status(download_id)
|
||||
self._publish_progress(progress)
|
||||
while current_step < (progress.downloaded / step_size):
|
||||
has_progress = True
|
||||
current_step += 1
|
||||
self._log_progress(current_step * step_size)
|
||||
|
||||
if self.bytes_downloaded > self.manifest.datasetSize:
|
||||
raise ValueError(
|
||||
f"Download size ({self.bytes_downloaded}) was greater than expected "
|
||||
f"({self.manifest.datasetSize})."
|
||||
if not has_progress:
|
||||
# Backs off for a bit if we haven't seen any progress.
|
||||
await asyncio.sleep(self.status_backoff)
|
||||
else:
|
||||
last_progress = time.time()
|
||||
|
||||
if progress.is_complete():
|
||||
# If step_size is not a multiple of 1/log_increment, we have a trailing step for the
|
||||
# remainder.
|
||||
if current_step * step_size < self.manifest.block_count:
|
||||
self._log_progress(current_step * step_size)
|
||||
break
|
||||
|
||||
if time.time() - last_progress > self.progress_timeout:
|
||||
raise asyncio.TimeoutError(
|
||||
f"Download made no progress in more than {self.progress_timeout} seconds"
|
||||
)
|
||||
|
||||
def progress(self) -> DownloadStatus:
|
||||
if self.download_task is None:
|
||||
return DownloadStatus(downloaded=0, total=self.manifest.datasetSize)
|
||||
return self._progress
|
||||
|
||||
if self.download_task.done():
|
||||
# This will bubble exceptions up, if any.
|
||||
self.download_task.result()
|
||||
def _publish_progress(self, status: DownloadStatus):
|
||||
self._progress = DownloadStatus(
|
||||
downloaded=status.downloaded * self.manifest.blockSize,
|
||||
total=status.total * self.manifest.blockSize,
|
||||
)
|
||||
|
||||
return DownloadStatus(
|
||||
downloaded=self.bytes_downloaded, total=self.manifest.datasetSize
|
||||
def _log_progress(self, downloaded: int):
|
||||
logger.info(
|
||||
DownloadMetric(
|
||||
dataset_name=self.manifest.filename,
|
||||
value=downloaded * self.manifest.blockSize,
|
||||
node=self.parent.node_id,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class CodexAgent:
|
||||
""":class:`CodexAgent` interacts with the Codex node locally through its REST API
|
||||
providing the higher-level API required by the benchmarking experiments."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
client: AsyncCodexClient,
|
||||
node_id: str = "unknown",
|
||||
read_timeout: Optional[float] = None,
|
||||
status_backoff: float = STATUS_BACKOFF,
|
||||
progress_timeout: float = PROGRESS_TIMEOUT,
|
||||
) -> None:
|
||||
self.client = client
|
||||
self.node_id = node_id
|
||||
self.ongoing_downloads: Dict[Cid, DownloadHandle] = {}
|
||||
self.read_timeout = read_timeout
|
||||
self.status_backoff = status_backoff
|
||||
self.progress_timeout = progress_timeout
|
||||
|
||||
async def create_dataset(self, name: str, size: int, seed: Optional[int]) -> Cid:
|
||||
async def create_dataset(
|
||||
self, name: str, size: int, seed: Optional[int]
|
||||
) -> Manifest:
|
||||
"""
|
||||
Creates a random dataset and uploads it to the Codex node.
|
||||
|
||||
:param name: the name of the dataset to be created.
|
||||
:param size: the size of the dataset to be created, in bytes.
|
||||
:param seed: the seed to be used for generating the random dataset. Using the
|
||||
same seed will generate the exact same dataset.
|
||||
|
||||
:return: the :class:`Manifest` block for the dataset.
|
||||
"""
|
||||
with TemporaryDirectory() as td:
|
||||
data = Path(td) / "datafile.bin"
|
||||
|
||||
@ -123,22 +139,37 @@ class CodexAgent:
|
||||
random_data(size=size, outfile=outfile, seed=seed)
|
||||
|
||||
with data.open(mode="rb") as infile:
|
||||
return await self.client.upload(
|
||||
cid = await self.client.upload(
|
||||
name=name, mime_type="application/octet-stream", content=infile
|
||||
)
|
||||
|
||||
async def download(self, cid: Cid, read_increment: float = 0.01) -> DownloadHandle:
|
||||
if cid in self.ongoing_downloads:
|
||||
return self.ongoing_downloads[cid]
|
||||
return await self.client.manifest(cid)
|
||||
|
||||
async def download(
|
||||
self, manifest: Manifest, log_increment: float = 0.01
|
||||
) -> DownloadHandle:
|
||||
"""
|
||||
Downloads the dataset with the given CID from the Codex node, tracking and logging
|
||||
its progress until it is complete.
|
||||
|
||||
:param manifest: the Manifest of the dataset to be downloaded.
|
||||
:param log_increment:
|
||||
how often to log progress, in terms of download completion fraction. E.g., 0.1
|
||||
will log progress every 10% of the download completed.
|
||||
|
||||
:return: a :class:`DownloadHandle` object that can be used to return the current
|
||||
progress. The experiment controller will typically ask for this periodically
|
||||
to figure out if the download is complete.
|
||||
"""
|
||||
handle = DownloadHandle(
|
||||
self,
|
||||
manifest=await self.client.manifest(cid),
|
||||
read_increment=read_increment,
|
||||
read_timeout=self.read_timeout,
|
||||
manifest=manifest,
|
||||
log_increment=log_increment,
|
||||
status_backoff=self.status_backoff,
|
||||
progress_timeout=self.progress_timeout,
|
||||
)
|
||||
|
||||
handle.begin_download()
|
||||
|
||||
self.ongoing_downloads[cid] = handle
|
||||
self.ongoing_downloads[manifest.treeCid] = handle
|
||||
return handle
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
"""This module contains a REST API wrapping :class:`CodexAgent`."""
|
||||
|
||||
import logging
|
||||
from typing import Annotated, Optional
|
||||
|
||||
from aiohttp import ClientResponseError
|
||||
@ -10,10 +11,13 @@ from urllib3.util import parse_url
|
||||
|
||||
from benchmarks.codex.agent.agent import CodexAgent, DownloadStatus
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClientImpl
|
||||
from benchmarks.codex.client.common import Manifest
|
||||
from benchmarks.core.agent import AgentBuilder
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def codex_agent() -> CodexAgent:
|
||||
raise Exception("Dependency must be set")
|
||||
@ -30,21 +34,22 @@ async def generate(
|
||||
name: str,
|
||||
size: int,
|
||||
seed: Optional[int],
|
||||
):
|
||||
return Response(
|
||||
await agent.create_dataset(name=name, size=size, seed=seed),
|
||||
media_type="text/plain; charset=UTF-8",
|
||||
)
|
||||
) -> Manifest:
|
||||
return await agent.create_dataset(name=name, size=size, seed=seed)
|
||||
|
||||
|
||||
@router.post("/api/v1/codex/download")
|
||||
async def download(
|
||||
request: Request, agent: Annotated[CodexAgent, Depends(codex_agent)], cid: str
|
||||
request: Request,
|
||||
agent: Annotated[CodexAgent, Depends(codex_agent)],
|
||||
manifest: Manifest,
|
||||
):
|
||||
await agent.download(cid)
|
||||
await agent.download(manifest)
|
||||
return JSONResponse(
|
||||
status_code=202,
|
||||
content={"status": str(request.url_for("download_status", cid=cid))},
|
||||
content={
|
||||
"status": str(request.url_for("download_status", cid=manifest.treeCid))
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@ -52,12 +57,23 @@ async def download(
|
||||
async def download_status(
|
||||
agent: Annotated[CodexAgent, Depends(codex_agent)], cid: str
|
||||
) -> DownloadStatus:
|
||||
if cid not in agent.ongoing_downloads:
|
||||
download = agent.ongoing_downloads.get(cid)
|
||||
if download is None:
|
||||
raise HTTPException(
|
||||
status_code=404, detail=f"There are no ongoing downloads for CID {cid}"
|
||||
)
|
||||
|
||||
return agent.ongoing_downloads[cid].progress()
|
||||
assert download.download_task is not None
|
||||
|
||||
if download.download_task.done():
|
||||
exception = download.download_task.exception()
|
||||
if exception is not None:
|
||||
logger.error("Error during download:", exc_info=exception)
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Error during download: {exception}"
|
||||
)
|
||||
|
||||
return download.progress()
|
||||
|
||||
|
||||
@router.get("/api/v1/codex/download/node-id")
|
||||
|
||||
@ -6,8 +6,8 @@ import requests
|
||||
from requests.exceptions import ConnectionError
|
||||
from urllib3.util import Url, parse_url
|
||||
|
||||
from benchmarks.codex.agent.agent import DownloadStatus
|
||||
from benchmarks.codex.client.common import Cid
|
||||
from benchmarks.codex.client.async_client import DownloadStatus
|
||||
from benchmarks.codex.client.common import Manifest
|
||||
from benchmarks.core.experiments.experiments import ExperimentComponent
|
||||
|
||||
|
||||
@ -22,7 +22,7 @@ class CodexAgentClient(ExperimentComponent):
|
||||
except (ConnectionError, socket.gaierror):
|
||||
return False
|
||||
|
||||
def generate(self, size: int, seed: int, name: str) -> Cid:
|
||||
def generate(self, size: int, seed: int, name: str) -> Manifest:
|
||||
response = requests.post(
|
||||
url=self.url._replace(path="/api/v1/codex/dataset").url,
|
||||
params={
|
||||
@ -34,14 +34,12 @@ class CodexAgentClient(ExperimentComponent):
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
return response.text
|
||||
return Manifest.model_validate(response.json())
|
||||
|
||||
def download(self, cid: str) -> Url:
|
||||
def download(self, manifest: Manifest) -> Url:
|
||||
response = requests.post(
|
||||
url=self.url._replace(path="/api/v1/codex/download").url,
|
||||
params={
|
||||
"cid": cid,
|
||||
},
|
||||
json=manifest.model_dump(mode="json"),
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
@ -1,22 +1,32 @@
|
||||
import json
|
||||
import re
|
||||
from asyncio import StreamReader
|
||||
from contextlib import asynccontextmanager
|
||||
from io import BytesIO
|
||||
from typing import Dict, Optional, AsyncIterator, Tuple, IO
|
||||
from typing import Dict, Optional, IO
|
||||
|
||||
from aiohttp import web, ClientTimeout
|
||||
from urllib3.util import Url
|
||||
from aiohttp import ClientTimeout
|
||||
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClient, Cid
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClient, Cid, DownloadStatus
|
||||
from benchmarks.codex.client.common import Manifest
|
||||
from benchmarks.core.utils.streams import BaseStreamReader
|
||||
|
||||
|
||||
class FakeDownload:
    """Test double for one Codex download whose progress is driven by the test.

    Tests call :meth:`advance_download` to simulate blocks arriving and
    :meth:`abort` to simulate a failure.
    """

    def __init__(self, manifest: Manifest) -> None:
        """
        :param manifest: manifest of the dataset being "downloaded"; its
            ``block_count`` is the upper bound for :attr:`downloaded`.
        """
        self.manifest = manifest
        # Number of blocks marked as downloaded so far.
        self.downloaded = 0
        # Exception recorded by abort(), or None while the download is healthy.
        self.exception: Optional[Exception] = None

    def advance_download(self, blocks: int):
        """Marks ``blocks`` more blocks as downloaded.

        :param blocks: number of additional blocks to mark as downloaded.
        :raises AssertionError: if the new total would exceed the manifest's
            block count. The counter is left untouched in that case.
        """
        # Validate before mutating so a failed assertion cannot leave the fake
        # reporting more downloaded blocks than the dataset contains.
        advanced = self.downloaded + blocks
        assert (
            advanced <= self.manifest.block_count
        ), "Downloaded blocks exceed total blocks"
        self.downloaded = advanced
        print("Advance download to", self.downloaded)

    def abort(self, exception: Exception):
        """Records ``exception`` as the failure of this download.

        :param exception: the error to associate with this download.
        """
        self.exception = exception
|
||||
|
||||
|
||||
class FakeCodex(AsyncCodexClient):
|
||||
def __init__(self) -> None:
|
||||
self.storage: Dict[Cid, Manifest] = {}
|
||||
self.streams: Dict[Cid, StreamReader] = {}
|
||||
self.manifests: Dict[Cid, Manifest] = {}
|
||||
self.ongoing_downloads: Dict[Cid, FakeDownload] = {}
|
||||
|
||||
async def upload(
|
||||
self,
|
||||
@ -27,89 +37,38 @@ class FakeCodex(AsyncCodexClient):
|
||||
) -> Cid:
|
||||
data = content.read()
|
||||
cid = "Qm" + str(hash(data))
|
||||
self.storage[cid] = Manifest(
|
||||
self.manifests[cid] = Manifest(
|
||||
cid=cid,
|
||||
treeCid=f"{cid}treeCid",
|
||||
datasetSize=len(data),
|
||||
mimetype=mime_type,
|
||||
blockSize=1,
|
||||
filename=name,
|
||||
treeCid="",
|
||||
protected=False,
|
||||
)
|
||||
return cid
|
||||
|
||||
async def manifest(self, cid: Cid) -> Manifest:
|
||||
return self.storage[cid]
|
||||
return self.manifests[cid]
|
||||
|
||||
def create_download_stream(self, cid: Cid) -> StreamReader:
|
||||
reader = StreamReader()
|
||||
self.streams[cid] = reader
|
||||
return reader
|
||||
|
||||
@asynccontextmanager
|
||||
async def download(
|
||||
self, cid: Cid, timeout: Optional[ClientTimeout] = None
|
||||
) -> AsyncIterator[BaseStreamReader]:
|
||||
yield self.streams[cid]
|
||||
self, manifest: Manifest, timeout: Optional[ClientTimeout] = None
|
||||
) -> Cid:
|
||||
if manifest.treeCid not in self.ongoing_downloads:
|
||||
raise ValueError("You need to create a " "download before initiating it")
|
||||
return manifest.treeCid
|
||||
|
||||
def new_download(self, manifest: Manifest) -> FakeDownload:
|
||||
"""Creates a download which we can then use to simulate progress."""
|
||||
handle = FakeDownload(manifest)
|
||||
self.ongoing_downloads[manifest.treeCid] = handle
|
||||
return handle
|
||||
|
||||
@asynccontextmanager
|
||||
async def fake_codex_api() -> AsyncIterator[Tuple[FakeCodex, Url]]:
|
||||
codex = FakeCodex()
|
||||
routes = web.RouteTableDef()
|
||||
|
||||
@routes.get("/api/codex/v1/data/{cid}/network/manifest")
|
||||
async def manifest(request):
|
||||
cid = request.match_info["cid"]
|
||||
assert cid in codex.storage
|
||||
# Gets the manifest in a similar shape as the Codex response.
|
||||
manifest = json.loads(codex.storage[cid].model_dump_json())
|
||||
return web.json_response(
|
||||
data={
|
||||
"cid": manifest.pop("cid"),
|
||||
"manifest": manifest,
|
||||
}
|
||||
async def download_status(self, dataset: Cid) -> DownloadStatus:
|
||||
download = self.ongoing_downloads[dataset]
|
||||
if download.exception:
|
||||
raise download.exception
|
||||
return DownloadStatus(
|
||||
downloaded=download.downloaded,
|
||||
total=download.manifest.block_count,
|
||||
)
|
||||
|
||||
@routes.post("/api/codex/v1/data")
|
||||
async def upload(request):
|
||||
await request.post()
|
||||
filename = re.findall(
|
||||
r'filename="(.+)"', request.headers["Content-Disposition"]
|
||||
)[0]
|
||||
cid = await codex.upload(
|
||||
name=filename,
|
||||
mime_type=request.headers["Content-Type"],
|
||||
content=BytesIO(await request.read()),
|
||||
)
|
||||
return web.Response(text=cid)
|
||||
|
||||
@routes.get("/api/codex/v1/data/{cid}")
|
||||
async def download(request):
|
||||
cid = request.match_info["cid"]
|
||||
assert cid in codex.streams
|
||||
reader = codex.streams[cid]
|
||||
|
||||
# We basically copy the stream onto the response.
|
||||
response = web.StreamResponse()
|
||||
await response.prepare(request)
|
||||
while not reader.at_eof():
|
||||
await response.write(await reader.read(1024))
|
||||
|
||||
await response.write_eof()
|
||||
return response
|
||||
|
||||
app = web.Application()
|
||||
app.add_routes(routes)
|
||||
|
||||
runner = web.AppRunner(app)
|
||||
await runner.setup()
|
||||
|
||||
site = web.TCPSite(runner, "localhost", 8888)
|
||||
await site.start()
|
||||
|
||||
try:
|
||||
yield codex, Url(scheme="http", host="localhost", port=8888)
|
||||
finally:
|
||||
await site.stop()
|
||||
await runner.cleanup()
|
||||
|
||||
@ -6,6 +6,7 @@ from starlette.testclient import TestClient
|
||||
from benchmarks.codex.agent import api
|
||||
from benchmarks.codex.agent.agent import CodexAgent
|
||||
from benchmarks.codex.agent.tests.fake_codex import FakeCodex
|
||||
from benchmarks.codex.client.common import Manifest
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@ -25,9 +26,8 @@ async def test_should_create_file():
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.charset_encoding == "utf-8"
|
||||
|
||||
manifest = await codex_client.manifest(response.text)
|
||||
manifest = Manifest.model_validate(response.json())
|
||||
|
||||
assert manifest.datasetSize == 1024
|
||||
|
||||
@ -50,28 +50,24 @@ async def test_should_report_when_download_is_complete():
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.charset_encoding == "utf-8"
|
||||
manifest = Manifest.model_validate(response.json())
|
||||
|
||||
cid = response.text
|
||||
|
||||
download_stream = codex_client.create_download_stream(cid)
|
||||
fake_download = codex_client.new_download(manifest)
|
||||
|
||||
response = await client.post(
|
||||
"/api/v1/codex/download",
|
||||
params={"cid": cid},
|
||||
"/api/v1/codex/download", json=manifest.model_dump(mode="json")
|
||||
)
|
||||
|
||||
assert response.status_code == 202
|
||||
assert response.json() == {
|
||||
"status": f"http://testserver/api/v1/codex/download/{cid}/status"
|
||||
"status": f"http://testserver/api/v1/codex/download/{manifest.treeCid}/status"
|
||||
}
|
||||
|
||||
download_stream.feed_data(b"0" * 1024)
|
||||
download_stream.feed_eof()
|
||||
fake_download.advance_download(blocks=1024)
|
||||
|
||||
await codex_agent.ongoing_downloads[cid].download_task
|
||||
await codex_agent.ongoing_downloads[manifest.treeCid].download_task
|
||||
|
||||
response = await client.get(f"api/v1/codex/download/{cid}/status")
|
||||
response = await client.get(f"api/v1/codex/download/{manifest.treeCid}/status")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"downloaded": 1024, "total": 1024}
|
||||
|
||||
@ -5,8 +5,7 @@ from unittest.mock import patch
|
||||
import pytest
|
||||
|
||||
from benchmarks.codex.agent.agent import CodexAgent, DownloadStatus
|
||||
from benchmarks.codex.agent.tests.fake_codex import FakeCodex, fake_codex_api
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClientImpl
|
||||
from benchmarks.codex.agent.tests.fake_codex import FakeCodex
|
||||
from benchmarks.core.concurrency import await_predicate_async
|
||||
from benchmarks.logging.logging import LogParser, DownloadMetric
|
||||
|
||||
@ -14,8 +13,7 @@ from benchmarks.logging.logging import LogParser, DownloadMetric
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_create_dataset_of_right_size():
|
||||
codex_agent = CodexAgent(FakeCodex())
|
||||
cid = await codex_agent.create_dataset(size=1024, name="dataset-1", seed=1234)
|
||||
manifest = await codex_agent.client.manifest(cid)
|
||||
manifest = await codex_agent.create_dataset(size=1024, name="dataset-1", seed=1234)
|
||||
|
||||
assert manifest.datasetSize == 1024
|
||||
|
||||
@ -24,35 +22,33 @@ async def test_should_create_dataset_of_right_size():
|
||||
async def test_same_seed_creates_same_cid():
|
||||
codex_agent = CodexAgent(FakeCodex())
|
||||
|
||||
cid1 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1234)
|
||||
cid2 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1234)
|
||||
cid3 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1235)
|
||||
manifest1 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1234)
|
||||
manifest2 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1234)
|
||||
manifest3 = await codex_agent.create_dataset(size=2048, name="dataset-1", seed=1235)
|
||||
|
||||
assert cid1 == cid2
|
||||
assert cid1 != cid3
|
||||
assert manifest1.cid == manifest2.cid
|
||||
assert manifest1.cid != manifest3.cid
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_report_download_progress():
|
||||
client = FakeCodex()
|
||||
codex_agent = CodexAgent(client)
|
||||
codex_agent = CodexAgent(client, status_backoff=0.01)
|
||||
|
||||
cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234)
|
||||
download_stream = client.create_download_stream(cid)
|
||||
|
||||
handle = await codex_agent.download(cid)
|
||||
manifest = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234)
|
||||
fake_download = client.new_download(manifest)
|
||||
handle = await codex_agent.download(manifest)
|
||||
|
||||
assert handle.progress() == DownloadStatus(downloaded=0, total=1000)
|
||||
|
||||
for i in range(200):
|
||||
download_stream.feed_data(b"0" * 5)
|
||||
fake_download.advance_download(blocks=5)
|
||||
assert await await_predicate_async(
|
||||
lambda: handle.progress()
|
||||
== DownloadStatus(downloaded=5 * (i + 1), total=1000),
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
download_stream.feed_eof()
|
||||
await handle.download_task
|
||||
|
||||
assert handle.progress() == DownloadStatus(downloaded=1000, total=1000)
|
||||
@ -63,14 +59,17 @@ async def test_should_raise_exception_on_progress_query_if_download_fails():
|
||||
client = FakeCodex()
|
||||
codex_agent = CodexAgent(client)
|
||||
|
||||
cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234)
|
||||
download_stream = client.create_download_stream(cid)
|
||||
manifest = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234)
|
||||
fake_download = client.new_download(manifest)
|
||||
|
||||
handle = await codex_agent.download(cid)
|
||||
handle = await codex_agent.download(manifest)
|
||||
|
||||
download_stream.feed_eof()
|
||||
class SomeError(Exception):
|
||||
pass
|
||||
|
||||
with pytest.raises(EOFError):
|
||||
fake_download.abort(SomeError())
|
||||
|
||||
with pytest.raises(SomeError):
|
||||
await handle.download_task
|
||||
|
||||
|
||||
@ -82,13 +81,14 @@ async def test_should_log_download_progress_as_metric_in_discrete_steps(mock_log
|
||||
client = FakeCodex()
|
||||
codex_agent = CodexAgent(client)
|
||||
|
||||
cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234)
|
||||
manifest = await codex_agent.create_dataset(
|
||||
size=1000, name="dataset-1", seed=1234
|
||||
)
|
||||
fake_download = client.new_download(manifest)
|
||||
|
||||
download_stream = client.create_download_stream(cid)
|
||||
download_stream.feed_data(b"0" * 1000)
|
||||
download_stream.feed_eof()
|
||||
fake_download.advance_download(1000)
|
||||
|
||||
handle = await codex_agent.download(cid, read_increment=0.2)
|
||||
handle = await codex_agent.download(manifest, log_increment=0.2)
|
||||
await handle.download_task
|
||||
|
||||
parser = LogParser()
|
||||
@ -99,35 +99,30 @@ async def test_should_log_download_progress_as_metric_in_discrete_steps(mock_log
|
||||
assert metrics == [
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=200,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[0].timestamp,
|
||||
),
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=400,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[1].timestamp,
|
||||
),
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=600,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[2].timestamp,
|
||||
),
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=800,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[3].timestamp,
|
||||
),
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=1000,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[4].timestamp,
|
||||
@ -143,24 +138,24 @@ async def test_should_log_download_progress_as_discrete_steps_even_when_underlyi
|
||||
|
||||
with patch("benchmarks.codex.agent.agent.logger", logger):
|
||||
client = FakeCodex()
|
||||
codex_agent = CodexAgent(client)
|
||||
cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1234)
|
||||
download_stream = client.create_download_stream(cid)
|
||||
handle = await codex_agent.download(cid, read_increment=0.2)
|
||||
|
||||
codex_agent = CodexAgent(client, status_backoff=0.01)
|
||||
manifest = await codex_agent.create_dataset(
|
||||
size=1000, name="dataset-1", seed=1234
|
||||
)
|
||||
fake_download = client.new_download(manifest)
|
||||
handle = await codex_agent.download(manifest, log_increment=0.2)
|
||||
# Simulates a choppy download which returns a lot less than the logging step size every time.
|
||||
fed = 0
|
||||
step = 37
|
||||
while fed < 1000:
|
||||
to_feed = min(step, 1000 - fed)
|
||||
download_stream.feed_data(b"0" * to_feed)
|
||||
fake_download.advance_download(to_feed)
|
||||
fed += to_feed
|
||||
assert await await_predicate_async(
|
||||
lambda: handle.progress() == DownloadStatus(downloaded=fed, total=1000),
|
||||
timeout=5,
|
||||
)
|
||||
|
||||
download_stream.feed_eof()
|
||||
await handle.download_task
|
||||
|
||||
parser = LogParser()
|
||||
@ -171,35 +166,30 @@ async def test_should_log_download_progress_as_discrete_steps_even_when_underlyi
|
||||
assert metrics == [
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=200,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[0].timestamp,
|
||||
),
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=400,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[1].timestamp,
|
||||
),
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=600,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[2].timestamp,
|
||||
),
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=800,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[3].timestamp,
|
||||
),
|
||||
DownloadMetric(
|
||||
dataset_name="dataset-1",
|
||||
handle=cid,
|
||||
value=1000,
|
||||
node=codex_agent.node_id,
|
||||
timestamp=metrics[4].timestamp,
|
||||
@ -212,49 +202,38 @@ async def test_should_track_download_handles():
|
||||
client = FakeCodex()
|
||||
codex_agent = CodexAgent(client)
|
||||
|
||||
cid = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1356)
|
||||
manifest = await codex_agent.create_dataset(size=1000, name="dataset-1", seed=1356)
|
||||
fake_download = client.new_download(manifest)
|
||||
|
||||
assert cid not in codex_agent.ongoing_downloads
|
||||
assert manifest.treeCid not in codex_agent.ongoing_downloads
|
||||
|
||||
download_stream = client.create_download_stream(cid)
|
||||
handle = await codex_agent.download(cid)
|
||||
|
||||
download_stream.feed_data(b"0" * 1000)
|
||||
download_stream.feed_eof()
|
||||
|
||||
assert codex_agent.ongoing_downloads[cid] == handle
|
||||
handle = await codex_agent.download(manifest)
|
||||
assert codex_agent.ongoing_downloads[manifest.treeCid] == handle
|
||||
|
||||
fake_download.advance_download(1000)
|
||||
await handle.download_task
|
||||
|
||||
assert cid in codex_agent.ongoing_downloads
|
||||
assert manifest.treeCid in codex_agent.ongoing_downloads
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_timeout_if_download_stream_takes_too_long_to_return_content():
|
||||
async with fake_codex_api() as (fake_codex, url):
|
||||
client = AsyncCodexClientImpl(url)
|
||||
codex_agent = CodexAgent(client, read_timeout=0.5)
|
||||
async def test_should_timeout_if_download_goes_for_too_long_without_any_progress():
|
||||
fake_codex = FakeCodex()
|
||||
codex_agent = CodexAgent(fake_codex, status_backoff=0.01, progress_timeout=0.5)
|
||||
|
||||
fast_cid = await codex_agent.create_dataset(
|
||||
size=1000, name="dataset-fast-1", seed=1356
|
||||
)
|
||||
slow_cid = await codex_agent.create_dataset(
|
||||
size=1000, name="dataset-slow-1", seed=1353
|
||||
)
|
||||
fast = await codex_agent.create_dataset(size=1000, name="dataset-fast-1", seed=1356)
|
||||
slow = await codex_agent.create_dataset(size=1000, name="dataset-slow-1", seed=1353)
|
||||
|
||||
fast_download = fake_codex.create_download_stream(fast_cid)
|
||||
slow_download = fake_codex.create_download_stream(slow_cid)
|
||||
fast_download = fake_codex.new_download(fast)
|
||||
slow_download = fake_codex.new_download(slow)
|
||||
|
||||
fast_download.feed_data(b"0" * 1000)
|
||||
fast_download.feed_eof()
|
||||
fast_handle = await codex_agent.download(fast_cid)
|
||||
await fast_handle.download_task
|
||||
fast_download.advance_download(1000)
|
||||
fast_handle = await codex_agent.download(fast)
|
||||
await fast_handle.download_task
|
||||
|
||||
slow_handle = await codex_agent.download(slow_cid)
|
||||
slow_download.feed_data(b"0" * 500)
|
||||
await asyncio.sleep(0.6)
|
||||
slow_download.feed_data(b"0" * 500)
|
||||
slow_download.feed_eof()
|
||||
slow_handle = await codex_agent.download(slow)
|
||||
slow_download.advance_download(500)
|
||||
await asyncio.sleep(0.6)
|
||||
slow_download.advance_download(500)
|
||||
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await slow_handle.download_task
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await slow_handle.download_task
|
||||
|
||||
@ -1,16 +1,25 @@
|
||||
"""Async Client implementation for the base Codex API."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import IO, AsyncIterator, AsyncGenerator, Optional
|
||||
from typing import IO, Optional
|
||||
|
||||
import aiohttp
|
||||
from aiohttp import ClientTimeout
|
||||
from pydantic import BaseModel
|
||||
from urllib3.util import Url
|
||||
|
||||
from benchmarks.codex.client.common import Manifest, Cid
|
||||
from benchmarks.core.utils.streams import BaseStreamReader
|
||||
|
||||
|
||||
class DownloadStatus(BaseModel):
    # Progress snapshot of a download: how much has been fetched so far (`downloaded`)
    # out of how much there is to fetch (`total`).
    downloaded: int
    total: int

    def as_percent(self) -> float:
        """Returns the completed share of the download as a percentage.

        A status with ``total == 0`` has nothing left to fetch, so it is reported as
        100% -- in line with :meth:`is_complete` -- instead of raising
        ``ZeroDivisionError``.
        """
        if self.total == 0:
            return 100.0
        return (self.downloaded * 100) / self.total

    def is_complete(self) -> bool:
        """Returns ``True`` once ``downloaded`` equals ``total``."""
        return self.downloaded == self.total
|
||||
|
||||
|
||||
class AsyncCodexClient(ABC):
|
||||
@ -28,11 +37,14 @@ class AsyncCodexClient(ABC):
|
||||
async def manifest(self, cid: Cid) -> Manifest:
|
||||
pass
|
||||
|
||||
@asynccontextmanager
|
||||
@abstractmethod
|
||||
def download(
|
||||
self, cid: Cid, timeout: Optional[ClientTimeout] = None
|
||||
) -> AsyncGenerator[BaseStreamReader, None]:
|
||||
async def download(
|
||||
self, manifest: Manifest, timeout: Optional[ClientTimeout] = None
|
||||
) -> Cid:
|
||||
pass
|
||||
|
||||
@abstractmethod
async def download_status(self, dataset: Cid) -> DownloadStatus:
    """Returns the current :class:`DownloadStatus` of the download identified by ``dataset``."""
|
||||
|
||||
|
||||
@ -77,16 +89,44 @@ class AsyncCodexClientImpl(AsyncCodexClient):
|
||||
|
||||
return Manifest.from_codex_api_response(response_contents)
|
||||
|
||||
@asynccontextmanager
|
||||
async def download(
|
||||
self, cid: Cid, timeout: Optional[ClientTimeout] = None
|
||||
) -> AsyncIterator[BaseStreamReader]:
|
||||
self, manifest: Manifest, timeout: Optional[ClientTimeout] = None
|
||||
) -> Cid:
|
||||
async with aiohttp.ClientSession(timeout=ClientTimeout()) as session:
|
||||
response = await session.get(
|
||||
self.codex_api_url._replace(path=f"/api/codex/v1/data/{cid}").url,
|
||||
timeout=timeout,
|
||||
response = await session.post(
|
||||
self.codex_api_url._replace(path="/api/codex/v1/download").url,
|
||||
json={
|
||||
"cid": manifest.cid,
|
||||
"manifest": manifest.model_dump(exclude={"cid"}, mode="json"),
|
||||
},
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
response_contents = await response.json()
|
||||
|
||||
yield response.content
|
||||
return response_contents["downloadId"]
|
||||
|
||||
async def download_status(self, dataset: Cid) -> DownloadStatus:
    """Fetches the progress of the download identified by ``dataset`` from the Codex node.

    Issues a ``GET`` on ``/api/codex/v1/download/{dataset}`` and builds a
    :class:`DownloadStatus` from the ``downloaded`` and ``total`` fields of the JSON
    reply. An HTTP error status surfaces through ``raise_for_status``.
    """
    status_url = self.codex_api_url._replace(
        path=f"/api/codex/v1/download/{dataset}"
    ).url

    async with aiohttp.ClientSession() as session:
        reply = await session.get(status_url)
        reply.raise_for_status()
        payload = await reply.json()

    return DownloadStatus(downloaded=payload["downloaded"], total=payload["total"])
|
||||
|
||||
async def leave_swarm(self, dataset: Cid) -> None:
    """Asks the Codex node to drop the download identified by ``dataset``.

    Issues a ``DELETE`` on ``/api/codex/v1/download/{dataset}``; an HTTP error status
    surfaces through ``raise_for_status``.
    """
    swarm_url = self.codex_api_url._replace(
        path=f"/api/codex/v1/download/{dataset}"
    ).url

    async with aiohttp.ClientSession() as session:
        reply = await session.delete(swarm_url)
        reply.raise_for_status()
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
import math
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
API_VERSION = "v1"
|
||||
@ -14,6 +16,10 @@ class Manifest(BaseModel):
|
||||
mimetype: str
|
||||
protected: bool
|
||||
|
||||
@property
def block_count(self) -> int:
    """Number of ``blockSize``-sized blocks needed to cover ``datasetSize``, rounded up.

    Uses exact integer ceiling division instead of ``math.ceil`` over a float quotient,
    which silently loses precision once the operands no longer fit a float's mantissa.
    """
    # assumes datasetSize and blockSize are ints -- TODO confirm against the field
    # declarations of Manifest, which are outside this excerpt.
    return -(-self.datasetSize // self.blockSize)
|
||||
|
||||
@staticmethod
|
||||
def from_codex_api_response(response: dict) -> "Manifest":
|
||||
return Manifest.model_validate(
|
||||
|
||||
@ -4,6 +4,7 @@ import pytest
|
||||
from urllib3.util import parse_url
|
||||
|
||||
from benchmarks.codex.client.async_client import AsyncCodexClientImpl
|
||||
from benchmarks.core.concurrency import await_predicate_async
|
||||
from benchmarks.core.utils.random import random_data
|
||||
from benchmarks.core.utils.units import megabytes
|
||||
|
||||
@ -28,14 +29,21 @@ async def test_should_download_file(codex_node_1_url: str):
|
||||
client = AsyncCodexClientImpl(parse_url(codex_node_1_url))
|
||||
|
||||
data = BytesIO()
|
||||
random_data(megabytes(1), data)
|
||||
random_data(megabytes(5), data)
|
||||
|
||||
cid = await client.upload(
|
||||
"test.txt", "application/octet-stream", BytesIO(data.getvalue())
|
||||
)
|
||||
assert cid is not None
|
||||
|
||||
async with client.download(cid) as content:
|
||||
downloaded = await content.readexactly(megabytes(1))
|
||||
manifest = await client.manifest(cid)
|
||||
dataset_cid = await client.download(manifest)
|
||||
|
||||
assert downloaded == data.getvalue()
|
||||
async def is_complete():
|
||||
status = await client.download_status(dataset_cid)
|
||||
assert status.total == manifest.block_count
|
||||
return status.is_complete()
|
||||
|
||||
await await_predicate_async(is_complete, timeout=10)
|
||||
|
||||
await client.leave_swarm(dataset_cid)
|
||||
|
||||
@ -17,6 +17,7 @@ from urllib3.util import Url
|
||||
|
||||
from benchmarks.codex.agent.agent import Cid, DownloadStatus
|
||||
from benchmarks.codex.agent.codex_agent_client import CodexAgentClient
|
||||
from benchmarks.codex.client.common import Manifest
|
||||
from benchmarks.core.concurrency import await_predicate
|
||||
from benchmarks.core.experiments.experiments import ExperimentComponent
|
||||
from benchmarks.core.network import Node, DownloadHandle
|
||||
@ -34,12 +35,18 @@ class CodexMeta:
|
||||
name: str
|
||||
|
||||
|
||||
class CodexNode(Node[Cid, CodexMeta], ExperimentComponent):
|
||||
def __init__(self, codex_api_url: Url, agent: CodexAgentClient) -> None:
|
||||
class CodexNode(Node[Manifest, CodexMeta], ExperimentComponent):
|
||||
def __init__(
|
||||
self,
|
||||
codex_api_url: Url,
|
||||
agent: CodexAgentClient,
|
||||
remove_data: bool = False,
|
||||
) -> None:
|
||||
self.codex_api_url = codex_api_url
|
||||
self.agent = agent
|
||||
# Lightweight tracking of datasets created by this node. It's OK if we lose them.
|
||||
self.hosted_datasets: Set[Cid] = set()
|
||||
self.hosted_datasets: dict[Cid, Manifest] = {}
|
||||
self.remove_data = remove_data
|
||||
|
||||
def is_ready(self) -> bool:
|
||||
try:
|
||||
@ -55,33 +62,72 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent):
|
||||
wait=WAIT_POLICY,
|
||||
retry=retry_if_not_exception_type(HTTPError),
|
||||
)
|
||||
def genseed(self, size: int, seed: int, meta: CodexMeta) -> Cid:
|
||||
cid = self.agent.generate(size=size, seed=seed, name=meta.name)
|
||||
self.hosted_datasets.add(cid)
|
||||
return cid
|
||||
def genseed(self, size: int, seed: int, meta: CodexMeta) -> Manifest:
|
||||
manifest = self.agent.generate(size=size, seed=seed, name=meta.name)
|
||||
# Joins the swarm.
|
||||
self.agent.download(manifest)
|
||||
self.hosted_datasets[manifest.treeCid] = manifest
|
||||
return manifest
|
||||
|
||||
@retry(
|
||||
stop=STOP_POLICY,
|
||||
wait=WAIT_POLICY,
|
||||
retry=retry_if_not_exception_type(HTTPError),
|
||||
)
|
||||
def leech(self, handle: Cid) -> DownloadHandle:
|
||||
self.hosted_datasets.add(handle)
|
||||
def leech(self, handle: Manifest) -> DownloadHandle:
|
||||
self.hosted_datasets[handle.treeCid] = handle
|
||||
return CodexDownloadHandle(parent=self, monitor_url=self.agent.download(handle))
|
||||
|
||||
def remove(self, handle: Cid) -> bool:
|
||||
def remove(self, handle: Manifest) -> bool:
    """Takes this node out of the dataset's swarm and, if configured, purges its data.

    Returns the combined outcome: the result of leaving the swarm, AND-ed with the
    result of purging the local data when ``remove_data`` is enabled.
    """
    left_swarm = self._leave_swarm(handle)
    if not self.remove_data:
        return left_swarm

    # Both steps always run; the purge is not skipped when leaving the swarm failed.
    data_purged = self._purge_local_data(handle)
    return left_swarm & data_purged
|
||||
|
||||
def swarms(self) -> Set[Cid]:
    """Lists the swarms the Codex node reports as active.

    Performs a ``GET`` on ``/api/codex/v1/download`` and returns the entries of the
    ``activeSwarms`` field of the JSON reply as a set. Raises on an HTTP error status.
    """
    listing_url = str(self.codex_api_url._replace(path="/api/codex/v1/download"))
    reply = requests.get(listing_url)
    reply.raise_for_status()

    active = reply.json()["activeSwarms"]
    return set(active)
|
||||
|
||||
def _leave_swarm(self, handle: Manifest) -> bool:
|
||||
response = requests.delete(
|
||||
str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}")),
|
||||
timeout=DELETE_TIMEOUT,
|
||||
str(
|
||||
self.codex_api_url._replace(
|
||||
path=f"/api/codex/v1/download/{handle.treeCid}"
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
raise_if_server_error(response)
|
||||
|
||||
return response.ok
|
||||
|
||||
def _purge_local_data(self, handle: Manifest) -> bool:
    """Deletes the dataset's data from the Codex node, if this node is set up to do so.

    Returns ``True`` without contacting the node when ``remove_data`` is off. Otherwise
    issues a ``DELETE`` on ``/api/codex/v1/data/{handle.cid}`` and returns the reply's
    ``ok`` flag.
    """
    if not self.remove_data:
        return True

    data_url = str(
        self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle.cid}")
    )
    reply = requests.delete(data_url, timeout=DELETE_TIMEOUT)
    raise_if_server_error(reply)
    return reply.ok
|
||||
|
||||
def exists_local(self, handle: Cid) -> bool:
|
||||
def exists_local(self, handle: Manifest) -> bool:
|
||||
"""Check if a dataset exists on the node."""
|
||||
response = requests.get(
|
||||
str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}"))
|
||||
str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle.cid}"))
|
||||
)
|
||||
|
||||
response.close()
|
||||
@ -95,12 +141,12 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent):
|
||||
return True
|
||||
|
||||
def download_local(
|
||||
self, handle: Cid, chunk_size: int = megabytes(1)
|
||||
self, handle: Manifest, chunk_size: int = megabytes(1)
|
||||
) -> Iterator[bytes]:
|
||||
"""Retrieves the contents of a locally available
|
||||
dataset from the node."""
|
||||
response = requests.get(
|
||||
str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle}"))
|
||||
str(self.codex_api_url._replace(path=f"/api/codex/v1/data/{handle.cid}"))
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
@ -108,9 +154,9 @@ class CodexNode(Node[Cid, CodexMeta], ExperimentComponent):
|
||||
return response.iter_content(chunk_size=chunk_size)
|
||||
|
||||
def wipe_all_datasets(self):
|
||||
for dataset in list(self.hosted_datasets):
|
||||
for dataset in list(self.hosted_datasets.values()):
|
||||
self.remove(dataset)
|
||||
self.hosted_datasets.remove(dataset)
|
||||
del self.hosted_datasets[dataset.treeCid]
|
||||
|
||||
@cached_property
|
||||
def name(self) -> str:
|
||||
@ -141,3 +187,9 @@ class CodexDownloadHandle(DownloadHandle):
|
||||
response.raise_for_status()
|
||||
|
||||
return DownloadStatus.model_validate(response.json())
|
||||
|
||||
|
||||
def raise_if_server_error(response: requests.Response):
    """Raises for any HTTP error status other than 404.

    Despite the name, this is not limited to 5xx responses: every 4xx/5xx status except
    404 is raised through ``response.raise_for_status()``. A 404 is left for the caller
    to deal with, e.g. by inspecting ``response.ok``.
    """
    # A separate ``status_code >= 500`` clause would be redundant here: every such
    # status is already different from 404.
    if response.status_code != 404:
        response.raise_for_status()
|
||||
|
||||
@ -74,6 +74,7 @@ class CodexExperimentConfig(
|
||||
)
|
||||
|
||||
download_metric_unit_bytes: int = 1
|
||||
remove_data: bool = False
|
||||
|
||||
def build(self) -> CodexDisseminationExperiment:
|
||||
node_specs = (
|
||||
@ -90,6 +91,7 @@ class CodexExperimentConfig(
|
||||
CodexNode(
|
||||
codex_api_url=parse_url(f"http://{str(node.address)}:{node.api_port}"),
|
||||
agent=agents[i],
|
||||
remove_data=self.remove_data,
|
||||
)
|
||||
for i, node in enumerate(node_specs)
|
||||
]
|
||||
|
||||
1
benchmarks/codex/tests/fixtures/fixtures.py
vendored
@ -13,6 +13,7 @@ def codex_node(codex_api_url: str, agent_url: str) -> Iterator[CodexNode]:
|
||||
node = CodexNode(
|
||||
codex_api_url=parse_url(codex_api_url),
|
||||
agent=CodexAgentClient(parse_url(agent_url)),
|
||||
remove_data=True,
|
||||
)
|
||||
assert await_predicate(node.is_ready, timeout=10, polling_interval=0.5)
|
||||
|
||||
|
||||
@ -9,12 +9,12 @@ from benchmarks.core.utils.units import megabytes
|
||||
|
||||
@pytest.mark.codex_integration
|
||||
def test_should_download_file(codex_node1: CodexNode, codex_node2: CodexNode):
|
||||
cid = codex_node1.genseed(
|
||||
manifest = codex_node1.genseed(
|
||||
size=megabytes(1),
|
||||
seed=1234,
|
||||
meta=CodexMeta(name="dataset1"),
|
||||
)
|
||||
handle = codex_node2.leech(cid)
|
||||
handle = codex_node2.leech(manifest)
|
||||
|
||||
assert handle.await_for_completion(5)
|
||||
assert cast(CodexDownloadHandle, handle).completion() == DownloadStatus(
|
||||
@ -22,6 +22,19 @@ def test_should_download_file(codex_node1: CodexNode, codex_node2: CodexNode):
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.codex_integration
def test_should_leave_swarm_on_remove(codex_node1: CodexNode):
    """Seeding a dataset should list its swarm as active; removing it should clear the list."""
    dataset_size = megabytes(1)
    dataset_meta = CodexMeta(name="dataset1")
    manifest = codex_node1.genseed(size=dataset_size, seed=1234, meta=dataset_meta)

    active_before = codex_node1.swarms()
    assert active_before == {manifest.treeCid}

    codex_node1.remove(manifest)

    active_after = codex_node1.swarms()
    assert active_after == set()
|
||||
|
||||
|
||||
@pytest.mark.codex_integration
|
||||
def test_should_remove_file(codex_node1: CodexNode):
|
||||
cid = codex_node1.genseed(
|
||||
|
||||
@ -35,13 +35,13 @@ def test_should_run_with_a_single_seeder(codex_node1, codex_node2, codex_node3):
|
||||
experiment.setup()
|
||||
experiment.do_run()
|
||||
|
||||
all_datasets = list(codex_node1.hosted_datasets)
|
||||
all_datasets = list(codex_node1.hosted_datasets.values())
|
||||
assert len(all_datasets) == 1
|
||||
cid = all_datasets[0]
|
||||
manifest = all_datasets[0]
|
||||
|
||||
content_1 = merge_chunks(codex_node1.download_local(cid))
|
||||
content_2 = merge_chunks(codex_node2.download_local(cid))
|
||||
content_3 = merge_chunks(codex_node3.download_local(cid))
|
||||
content_1 = merge_chunks(codex_node1.download_local(manifest))
|
||||
content_2 = merge_chunks(codex_node2.download_local(manifest))
|
||||
content_3 = merge_chunks(codex_node3.download_local(manifest))
|
||||
|
||||
assert len(content_1) == megabytes(2)
|
||||
assert content_1 == content_2 == content_3
|
||||
|
||||
@ -4,6 +4,9 @@ codex_experiment:
|
||||
seeders: ${SEEDERS}
|
||||
file_size: ${FILE_SIZE}
|
||||
repetitions: ${REPETITIONS}
|
||||
# Should we delete the data at the end of each experiment (slower, uses less space), or just
|
||||
# leave it there (faster, uses more space)?
|
||||
remove_data: ${REMOVE_DATA}
|
||||
# No need for cooldown as Codex takes forever to remove files, so there's plenty of time to log stuff. :-)
|
||||
logging_cooldown: 0
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ codex_experiment:
|
||||
seeders: 1
|
||||
file_size: 52428800
|
||||
repetitions: 3
|
||||
remove_data: true
|
||||
|
||||
nodes:
|
||||
- name: codex-1
|
||||
|
||||
@ -25,7 +25,7 @@ services:
|
||||
image: ${COMPOSE_CODEX_IMAGE:-codexstorage/nim-codex:latest}
|
||||
container_name: codex-1
|
||||
environment:
|
||||
- CODEX_LOG_LEVEL=DEBUG
|
||||
- CODEX_LOG_LEVEL=DEBUG;trace:swarm,blockexcnetworkpeer
|
||||
- CODEX_DATA_DIR=/var/lib/codex
|
||||
- CODEX_DISC_PORT=6890
|
||||
- CODEX_API_BINDADDR=0.0.0.0
|
||||
@ -52,7 +52,7 @@ services:
|
||||
image: ${COMPOSE_CODEX_IMAGE:-codexstorage/nim-codex:latest}
|
||||
container_name: codex-2
|
||||
environment:
|
||||
- CODEX_LOG_LEVEL=DEBUG
|
||||
- CODEX_LOG_LEVEL=DEBUG;trace:swarm,blockexcnetworkpeer
|
||||
- CODEX_DATA_DIR=/var/lib/codex
|
||||
- CODEX_DISC_PORT=6892
|
||||
- CODEX_API_BINDADDR=0.0.0.0
|
||||
@ -80,7 +80,7 @@ services:
|
||||
image: ${COMPOSE_CODEX_IMAGE:-codexstorage/nim-codex:latest}
|
||||
container_name: codex-3
|
||||
environment:
|
||||
- CODEX_LOG_LEVEL=DEBUG
|
||||
- CODEX_LOG_LEVEL=DEBUG;trace:swarm,blockexcnetworkpeer
|
||||
- CODEX_DATA_DIR=/var/lib/codex
|
||||
- CODEX_DISC_PORT=6894
|
||||
- CODEX_API_BINDADDR=0.0.0.0
|
||||
|
||||
@ -9,8 +9,13 @@ Expand the name of the chart.
|
||||
{{- mul $sizeNum (index $size $sizeUnit) -}}
|
||||
{{- end -}}
|
||||
|
||||
{{- define "codex.quota" }}
|
||||
{{- div (mul (include "filesize.bytes" .) 13) 10 -}}
|
||||
{{- define "experiment.count" -}}
|
||||
{{- mul .Values.experiment.seederSets .Values.experiment.repetitions -}}
|
||||
{{- end -}}
|
||||
|
||||
{{- define "codex.quota" -}}
|
||||
{{- $mulFactor := .Values.experiment.removeData | ternary 1 (include "experiment.count" .) -}}
|
||||
{{- div (mul (mul (include "filesize.bytes" .) $mulFactor) 13) 10 -}}
|
||||
{{- end -}}
|
||||
|
||||
{{- define "experiment.groupId" -}}
|
||||
|
||||
@ -36,6 +36,8 @@ spec:
|
||||
value: {{ .Values.experiment.repetitions | quote }}
|
||||
- name: SEEDER_SETS
|
||||
value: {{ .Values.experiment.seederSets | quote }}
|
||||
- name: REMOVE_DATA
|
||||
value: {{ .Values.experiment.removeData | quote }}
|
||||
- name: FILE_SIZE
|
||||
value: {{ include "filesize.bytes" . | quote }}
|
||||
- name: CODEX_STATEFULSET
|
||||
|
||||
@ -21,6 +21,10 @@ experiment:
|
||||
# If set to false, does not deploy a test runner (useful if you just want the network).
|
||||
testRunner: true
|
||||
|
||||
# If set to false, does not delete the data at the end of each experiment. This is faster
|
||||
# than deleting it, but requires more space.
|
||||
removeData: true
|
||||
|
||||
deployment:
|
||||
appName: ""
|
||||
|
||||
|
||||