feat(integration): use async client instead of standard Nim HTTP client (#1159)

* WiP: migrating CodexClient to chronos http client

* fix(api): fixes #1163

* feat: fully working API integration tests

* convert most of the tests in testupdownload

* feat: working updownload tests on async client

* feat: make testsales work with async codexclient

* feat: make testpurchasing work with async codexclient

* feat: make testblockexpiration work with async codexclient

* feat: make marketplacesuite work with async codexclient

* make testproofs work with async codexclient

* chore: refactor client to express higher level in terms of lower level operations

* fix: set correct content-length for erasure-coded datasets

* feat: make testecbug work with async client

* feat: make testvalidator work with async client

* refactor: simplify request aliases, add close operation

* wire back client.close at node shutdown

* refactor: remove unused exception

* fix: use await instead of waitFor on async call sites
This commit is contained in:
Giuliano Mega 2025-03-17 17:08:24 -03:00 committed by GitHub
parent 75db491d84
commit 54177e9fbf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 656 additions and 499 deletions

View File

@ -114,9 +114,14 @@ proc retrieveCid(
else:
resp.setHeader("Content-Disposition", "attachment")
resp.setHeader("Content-Length", $manifest.datasetSize.int)
# For erasure-coded datasets, we need to return the _original_ length; i.e.,
# the length of the non-erasure-coded dataset, as that's what we will be
# returning to the client.
let contentLength =
if manifest.protected: manifest.originalDatasetSize else: manifest.datasetSize
resp.setHeader("Content-Length", $(contentLength.int))
await resp.prepareChunked()
await resp.prepare(HttpResponseStreamType.Plain)
while not stream.atEof:
var
@ -129,7 +134,7 @@ proc retrieveCid(
bytes += buff.len
await resp.sendChunk(addr buff[0], buff.len)
await resp.send(addr buff[0], buff.len)
await resp.finish()
codex_api_downloads.inc()
except CancelledError as exc:

View File

@ -4,119 +4,216 @@ import std/strutils
from pkg/libp2p import Cid, `$`, init
import pkg/stint
import pkg/questionable/results
import pkg/chronos/apps/http/[httpserver, shttpserver, httpclient]
import pkg/chronos/apps/http/[httpserver, shttpserver, httpclient, httptable]
import pkg/codex/logutils
import pkg/codex/rest/json
import pkg/codex/purchasing
import pkg/codex/errors
import pkg/codex/sales/reservations
export purchasing
export purchasing, httptable, httpclient
type CodexClient* = ref object
baseurl: string
httpClients: seq[HttpClient]
type CodexClientError* = object of CatchableError
const HttpClientTimeoutMs = 60 * 1000
session: HttpSessionRef
proc new*(_: type CodexClient, baseurl: string): CodexClient =
CodexClient(baseurl: baseurl, httpClients: newSeq[HttpClient]())
CodexClient(session: HttpSessionRef.new(), baseurl: baseurl)
proc http*(client: CodexClient): HttpClient =
let httpClient = newHttpClient(timeout = HttpClientTimeoutMs)
client.httpClients.insert(httpClient)
return httpClient
proc close*(self: CodexClient): Future[void] {.async: (raises: []).} =
await self.session.closeWait()
proc close*(client: CodexClient): void =
for httpClient in client.httpClients:
httpClient.close()
proc request(
self: CodexClient,
httpMethod: httputils.HttpMethod,
url: string,
body: openArray[char] = [],
headers: openArray[HttpHeaderTuple] = [],
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
HttpClientRequestRef
.new(
self.session,
url,
httpMethod,
version = HttpVersion11,
flags = {},
maxResponseHeadersSize = HttpMaxHeadersSize,
headers = headers,
body = body.toOpenArrayByte(0, len(body) - 1),
).get
.send()
proc info*(client: CodexClient): ?!JsonNode =
let url = client.baseurl & "/debug/info"
JsonNode.parse(client.http().getContent(url))
proc post(
self: CodexClient,
url: string,
body: string = "",
headers: seq[HttpHeaderTuple] = @[],
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
return self.request(MethodPost, url, headers = headers, body = body)
proc setLogLevel*(client: CodexClient, level: string) =
let url = client.baseurl & "/debug/chronicles/loglevel?level=" & level
let headers = newHttpHeaders({"Content-Type": "text/plain"})
let response = client.http().request(url, httpMethod = HttpPost, headers = headers)
assert response.status == "200 OK"
proc get(
self: CodexClient, url: string, headers: seq[HttpHeaderTuple] = @[]
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
return self.request(MethodGet, url, headers = headers)
proc upload*(client: CodexClient, contents: string): ?!Cid =
let response = client.http().post(client.baseurl & "/data", contents)
assert response.status == "200 OK"
Cid.init(response.body).mapFailure
proc delete(
self: CodexClient, url: string, headers: seq[HttpHeaderTuple] = @[]
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
return self.request(MethodDelete, url, headers = headers)
proc upload*(client: CodexClient, bytes: seq[byte]): ?!Cid =
client.upload(string.fromBytes(bytes))
proc patch(
self: CodexClient,
url: string,
body: string = "",
headers: seq[HttpHeaderTuple] = @[],
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
return self.request(MethodPatch, url, headers = headers, body = body)
proc download*(client: CodexClient, cid: Cid, local = false): ?!string =
let response = client.http().get(
client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream")
)
proc body*(
response: HttpClientResponseRef
): Future[string] {.async: (raises: [CancelledError, HttpError]).} =
return bytesToString (await response.getBodyBytes())
if response.status != "200 OK":
return failure(response.status)
proc getContent(
client: CodexClient, url: string, headers: seq[HttpHeaderTuple] = @[]
): Future[string] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.get(url, headers)
return await response.body
success response.body
proc info*(
client: CodexClient
): Future[?!JsonNode] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.get(client.baseurl & "/debug/info")
return JsonNode.parse(await response.body)
proc downloadManifestOnly*(client: CodexClient, cid: Cid): ?!string =
let response =
client.http().get(client.baseurl & "/data/" & $cid & "/network/manifest")
proc setLogLevel*(
client: CodexClient, level: string
): Future[void] {.async: (raises: [CancelledError, HttpError]).} =
let
url = client.baseurl & "/debug/chronicles/loglevel?level=" & level
headers = @[("Content-Type", "text/plain")]
response = await client.post(url, headers = headers, body = "")
assert response.status == 200
if response.status != "200 OK":
return failure(response.status)
proc uploadRaw*(
client: CodexClient, contents: string, headers: seq[HttpHeaderTuple] = @[]
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
return client.post(client.baseurl & "/data", body = contents, headers = headers)
success response.body
proc upload*(
client: CodexClient, contents: string
): Future[?!Cid] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.uploadRaw(contents)
assert response.status == 200
Cid.init(await response.body).mapFailure
proc downloadNoStream*(client: CodexClient, cid: Cid): ?!string =
let response = client.http().post(client.baseurl & "/data/" & $cid & "/network")
proc upload*(
client: CodexClient, bytes: seq[byte]
): Future[?!Cid] {.async: (raw: true).} =
return client.upload(string.fromBytes(bytes))
if response.status != "200 OK":
return failure(response.status)
success response.body
proc downloadRaw*(
client: CodexClient, cid: string, local = false
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
return
client.get(client.baseurl & "/data/" & cid & (if local: "" else: "/network/stream"))
proc downloadBytes*(
client: CodexClient, cid: Cid, local = false
): Future[?!seq[byte]] {.async.} =
let uri = client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream")
): Future[?!seq[byte]] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.downloadRaw($cid, local = local)
let response = client.http().get(uri)
if response.status != 200:
return failure($response.status)
if response.status != "200 OK":
return failure("fetch failed with status " & $response.status)
success await response.getBodyBytes()
success response.body.toBytes
proc download*(
client: CodexClient, cid: Cid, local = false
): Future[?!string] {.async: (raises: [CancelledError, HttpError]).} =
without response =? await client.downloadBytes(cid, local = local), err:
return failure(err)
return success bytesToString(response)
proc delete*(client: CodexClient, cid: Cid): ?!void =
let
url = client.baseurl & "/data/" & $cid
response = client.http().delete(url)
proc downloadNoStream*(
client: CodexClient, cid: Cid
): Future[?!string] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.post(client.baseurl & "/data/" & $cid & "/network")
if response.status != "204 No Content":
return failure(response.status)
if response.status != 200:
return failure($response.status)
success await response.body
proc downloadManifestOnly*(
client: CodexClient, cid: Cid
): Future[?!string] {.async: (raises: [CancelledError, HttpError]).} =
let response =
await client.get(client.baseurl & "/data/" & $cid & "/network/manifest")
if response.status != 200:
return failure($response.status)
success await response.body
proc deleteRaw*(
client: CodexClient, cid: string
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
return client.delete(client.baseurl & "/data/" & cid)
proc delete*(
client: CodexClient, cid: Cid
): Future[?!void] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.deleteRaw($cid)
if response.status != 204:
return failure($response.status)
success()
proc list*(client: CodexClient): ?!RestContentList =
let url = client.baseurl & "/data"
let response = client.http().get(url)
proc listRaw*(
client: CodexClient
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
return client.get(client.baseurl & "/data")
if response.status != "200 OK":
return failure(response.status)
proc list*(
client: CodexClient
): Future[?!RestContentList] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.listRaw()
RestContentList.fromJson(response.body)
if response.status != 200:
return failure($response.status)
proc space*(client: CodexClient): ?!RestRepoStore =
RestContentList.fromJson(await response.body)
proc space*(
client: CodexClient
): Future[?!RestRepoStore] {.async: (raises: [CancelledError, HttpError]).} =
let url = client.baseurl & "/space"
let response = client.http().get(url)
let response = await client.get(url)
if response.status != "200 OK":
return failure(response.status)
if response.status != 200:
return failure($response.status)
RestRepoStore.fromJson(response.body)
RestRepoStore.fromJson(await response.body)
proc requestStorageRaw*(
client: CodexClient,
@ -128,7 +225,9 @@ proc requestStorageRaw*(
expiry: uint64 = 0,
nodes: uint = 3,
tolerance: uint = 1,
): Response =
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
## Call request storage REST endpoint
##
let url = client.baseurl & "/storage/request/" & $cid
@ -145,7 +244,7 @@ proc requestStorageRaw*(
if expiry != 0:
json["expiry"] = %($expiry)
return client.http().post(url, $json)
return client.post(url, $json)
proc requestStorage*(
client: CodexClient,
@ -157,43 +256,45 @@ proc requestStorage*(
collateralPerByte: UInt256,
nodes: uint = 3,
tolerance: uint = 1,
): ?!PurchaseId =
): Future[?!PurchaseId] {.async: (raises: [CancelledError, HttpError]).} =
## Call request storage REST endpoint
##
let response = client.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte, expiry,
nodes, tolerance,
)
if response.status != "200 OK":
doAssert(false, response.body)
PurchaseId.fromHex(response.body).catch
let
response = await client.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte, expiry,
nodes, tolerance,
)
body = await response.body
proc getPurchase*(client: CodexClient, purchaseId: PurchaseId): ?!RestPurchase =
if response.status != 200:
doAssert(false, body)
PurchaseId.fromHex(body).catch
proc getPurchase*(
client: CodexClient, purchaseId: PurchaseId
): Future[?!RestPurchase] {.async: (raises: [CancelledError, HttpError]).} =
let url = client.baseurl & "/storage/purchases/" & purchaseId.toHex
try:
let body = client.http().getContent(url)
let body = await client.getContent(url)
return RestPurchase.fromJson(body)
except CatchableError as e:
return failure e.msg
proc getSalesAgent*(client: CodexClient, slotId: SlotId): ?!RestSalesAgent =
proc getSalesAgent*(
client: CodexClient, slotId: SlotId
): Future[?!RestSalesAgent] {.async: (raises: [CancelledError, HttpError]).} =
let url = client.baseurl & "/sales/slots/" & slotId.toHex
try:
let body = client.http().getContent(url)
let body = await client.getContent(url)
return RestSalesAgent.fromJson(body)
except CatchableError as e:
return failure e.msg
proc getSlots*(client: CodexClient): ?!seq[Slot] =
let url = client.baseurl & "/sales/slots"
let body = client.http().getContent(url)
seq[Slot].fromJson(body)
proc postAvailability*(
client: CodexClient,
totalSize, duration: uint64,
minPricePerBytePerSecond, totalCollateral: UInt256,
): ?!Availability =
): Future[?!Availability] {.async: (raises: [CancelledError, HttpError]).} =
## Post sales availability endpoint
##
let url = client.baseurl & "/sales/availability"
@ -204,17 +305,21 @@ proc postAvailability*(
"minPricePerBytePerSecond": minPricePerBytePerSecond,
"totalCollateral": totalCollateral,
}
let response = client.http().post(url, $json)
doAssert response.status == "201 Created",
"expected 201 Created, got " & response.status & ", body: " & response.body
Availability.fromJson(response.body)
let response = await client.post(url, $json)
let body = await response.body
doAssert response.status == 201,
"expected 201 Created, got " & $response.status & ", body: " & body
Availability.fromJson(body)
proc patchAvailabilityRaw*(
client: CodexClient,
availabilityId: AvailabilityId,
totalSize, freeSize, duration: ?uint64 = uint64.none,
minPricePerBytePerSecond, totalCollateral: ?UInt256 = UInt256.none,
): Response =
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
## Updates availability
##
let url = client.baseurl & "/sales/availability/" & $availabilityId
@ -237,66 +342,50 @@ proc patchAvailabilityRaw*(
if totalCollateral =? totalCollateral:
json["totalCollateral"] = %totalCollateral
client.http().patch(url, $json)
client.patch(url, $json)
proc patchAvailability*(
client: CodexClient,
availabilityId: AvailabilityId,
totalSize, duration: ?uint64 = uint64.none,
minPricePerBytePerSecond, totalCollateral: ?UInt256 = UInt256.none,
): void =
let response = client.patchAvailabilityRaw(
): Future[void] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.patchAvailabilityRaw(
availabilityId,
totalSize = totalSize,
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
)
doAssert response.status == "200 OK", "expected 200 OK, got " & response.status
doAssert response.status == 200, "expected 200 OK, got " & $response.status
proc getAvailabilities*(client: CodexClient): ?!seq[Availability] =
proc getAvailabilities*(
client: CodexClient
): Future[?!seq[Availability]] {.async: (raises: [CancelledError, HttpError]).} =
## Call sales availability REST endpoint
let url = client.baseurl & "/sales/availability"
let body = client.http().getContent(url)
let body = await client.getContent(url)
seq[Availability].fromJson(body)
proc getAvailabilityReservations*(
client: CodexClient, availabilityId: AvailabilityId
): ?!seq[Reservation] =
): Future[?!seq[Reservation]] {.async: (raises: [CancelledError, HttpError]).} =
## Retrieves Availability's Reservations
let url = client.baseurl & "/sales/availability/" & $availabilityId & "/reservations"
let body = client.http().getContent(url)
let body = await client.getContent(url)
seq[Reservation].fromJson(body)
proc purchaseStateIs*(client: CodexClient, id: PurchaseId, state: string): bool =
client.getPurchase(id).option .? state == some state
proc purchaseStateIs*(
client: CodexClient, id: PurchaseId, state: string
): Future[bool] {.async: (raises: [CancelledError, HttpError]).} =
(await client.getPurchase(id)).option .? state == some state
proc saleStateIs*(client: CodexClient, id: SlotId, state: string): bool =
client.getSalesAgent(id).option .? state == some state
proc saleStateIs*(
client: CodexClient, id: SlotId, state: string
): Future[bool] {.async: (raises: [CancelledError, HttpError]).} =
(await client.getSalesAgent(id)).option .? state == some state
proc requestId*(client: CodexClient, id: PurchaseId): ?RequestId =
return client.getPurchase(id).option .? requestId
proc uploadRaw*(
client: CodexClient, contents: string, headers = newHttpHeaders()
): Response =
return client.http().request(
client.baseurl & "/data",
body = contents,
httpMethod = HttpPost,
headers = headers,
)
proc listRaw*(client: CodexClient): Response =
return client.http().request(client.baseurl & "/data", httpMethod = HttpGet)
proc downloadRaw*(
client: CodexClient, cid: string, local = false, httpClient = client.http()
): Response =
return httpClient.request(
client.baseurl & "/data/" & cid & (if local: "" else: "/network/stream"),
httpMethod = HttpGet,
)
proc deleteRaw*(client: CodexClient, cid: string): Response =
return client.http().request(client.baseurl & "/data/" & cid, httpMethod = HttpDelete)
proc requestId*(
client: CodexClient, id: PurchaseId
): Future[?RequestId] {.async: (raises: [CancelledError, HttpError]).} =
return (await client.getPurchase(id)).option .? requestId

View File

@ -68,7 +68,7 @@ method stop*(node: CodexProcess) {.async.} =
trace "stopping codex client"
if client =? node.client:
client.close()
await client.close()
node.client = none CodexClient
method removeDataDir*(node: CodexProcess) =

View File

@ -60,13 +60,13 @@ template marketplacesuite*(name: string, body: untyped) =
duration: uint64,
collateralPerByte: UInt256,
minPricePerBytePerSecond: UInt256,
) =
): Future[void] {.async: (raises: [CancelledError, HttpError, ConfigurationError]).} =
let totalCollateral = datasetSize.u256 * collateralPerByte
# post availability to each provider
for i in 0 ..< providers().len:
let provider = providers()[i].client
discard provider.postAvailability(
discard await provider.postAvailability(
totalSize = datasetSize,
duration = duration.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
@ -83,16 +83,18 @@ template marketplacesuite*(name: string, body: untyped) =
expiry: uint64 = 4.periods,
nodes = providers().len,
tolerance = 0,
): Future[PurchaseId] {.async.} =
let id = client.requestStorage(
cid,
expiry = expiry,
duration = duration,
proofProbability = proofProbability,
collateralPerByte = collateralPerByte,
pricePerBytePerSecond = pricePerBytePerSecond,
nodes = nodes.uint,
tolerance = tolerance.uint,
): Future[PurchaseId] {.async: (raises: [CancelledError, HttpError]).} =
let id = (
await client.requestStorage(
cid,
expiry = expiry,
duration = duration,
proofProbability = proofProbability,
collateralPerByte = collateralPerByte,
pricePerBytePerSecond = pricePerBytePerSecond,
nodes = nodes.uint,
tolerance = tolerance.uint,
)
).get
return id

View File

@ -275,8 +275,10 @@ template multinodesuite*(name: string, body: untyped) =
fail()
quit(1)
proc updateBootstrapNodes(node: CodexProcess) =
without ninfo =? node.client.info():
proc updateBootstrapNodes(
node: CodexProcess
): Future[void] {.async: (raises: [CatchableError]).} =
without ninfo =? await node.client.info():
# raise CatchableError instead of Defect (with .get or !) so we
# can gracefully shutdown and prevent zombies
raiseMultiNodeSuiteError "Failed to get node info"
@ -315,14 +317,14 @@ template multinodesuite*(name: string, body: untyped) =
for config in clients.configs:
let node = await startClientNode(config)
running.add RunningNode(role: Role.Client, node: node)
CodexProcess(node).updateBootstrapNodes()
await CodexProcess(node).updateBootstrapNodes()
if var providers =? nodeConfigs.providers:
failAndTeardownOnError "failed to start provider nodes":
for config in providers.configs.mitems:
let node = await startProviderNode(config)
running.add RunningNode(role: Role.Provider, node: node)
CodexProcess(node).updateBootstrapNodes()
await CodexProcess(node).updateBootstrapNodes()
if var validators =? nodeConfigs.validators:
failAndTeardownOnError "failed to start validator nodes":

View File

@ -18,11 +18,11 @@ multinodesuite "Node block expiration tests":
let client = clients()[0]
let clientApi = client.client
let contentId = clientApi.upload(content).get
let contentId = (await clientApi.upload(content)).get
await sleepAsync(2.seconds)
let download = clientApi.download(contentId, local = true)
let download = await clientApi.download(contentId, local = true)
check:
download.isOk
@ -39,12 +39,12 @@ multinodesuite "Node block expiration tests":
let client = clients()[0]
let clientApi = client.client
let contentId = clientApi.upload(content).get
let contentId = (await clientApi.upload(content)).get
await sleepAsync(3.seconds)
let download = clientApi.download(contentId, local = true)
let download = await clientApi.download(contentId, local = true)
check:
download.isFailure
download.error.msg == "404 Not Found"
download.error.msg == "404"

View File

@ -13,21 +13,18 @@ marketplacesuite "Bug #821 - node crashes during erasure coding":
.withLogFile()
# uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
.withLogTopics("node", "erasure", "marketplace").some,
providers: CodexConfigs.init(nodes = 0)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock")
.some,
providers: CodexConfigs.init(nodes = 0).some,
):
let pricePerBytePerSecond = 1.u256
let duration = 20.periods
let collateralPerByte = 1.u256
let expiry = 10.periods
let data = await RandomChunker.example(blocks = 8)
let client = clients()[0]
let clientApi = client.client
let
pricePerBytePerSecond = 1.u256
duration = 20.periods
collateralPerByte = 1.u256
expiry = 10.periods
data = await RandomChunker.example(blocks = 8)
client = clients()[0]
clientApi = client.client
let cid = clientApi.upload(data).get
let cid = (await clientApi.upload(data)).get
var requestId = none RequestId
proc onStorageRequested(eventResult: ?!StorageRequested) =
@ -49,9 +46,11 @@ marketplacesuite "Bug #821 - node crashes during erasure coding":
check eventually(requestId.isSome, timeout = expiry.int * 1000)
let request = await marketplace.getRequest(requestId.get)
let cidFromRequest = request.content.cid
let downloaded = await clientApi.downloadBytes(cidFromRequest, local = true)
let
request = await marketplace.getRequest(requestId.get)
cidFromRequest = request.content.cid
downloaded = await clientApi.downloadBytes(cidFromRequest, local = true)
check downloaded.isOk
check downloaded.get.toHex == data.toHex

View File

@ -37,15 +37,17 @@ marketplacesuite "Marketplace":
let size = 0xFFFFFF.uint64
let data = await RandomChunker.example(blocks = blocks)
# host makes storage available
let availability = host.postAvailability(
totalSize = size,
duration = 20 * 60.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size.u256 * minPricePerBytePerSecond,
let availability = (
await host.postAvailability(
totalSize = size,
duration = 20 * 60.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size.u256 * minPricePerBytePerSecond,
)
).get
# client requests storage
let cid = client.upload(data).get
let cid = (await client.upload(data)).get
let id = await client.requestStorage(
cid,
duration = 20 * 60.uint64,
@ -57,15 +59,17 @@ marketplacesuite "Marketplace":
tolerance = ecTolerance,
)
check eventually(client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000)
let purchase = client.getPurchase(id).get
check eventually(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
)
let purchase = (await client.getPurchase(id)).get
check purchase.error == none string
let availabilities = host.getAvailabilities().get
let availabilities = (await host.getAvailabilities()).get
check availabilities.len == 1
let newSize = availabilities[0].freeSize
check newSize > 0 and newSize < size
let reservations = host.getAvailabilityReservations(availability.id).get
let reservations = (await host.getAvailabilityReservations(availability.id)).get
check reservations.len == 3
check reservations[0].requestId == purchase.requestId
@ -80,15 +84,17 @@ marketplacesuite "Marketplace":
# host makes storage available
let startBalanceHost = await token.balanceOf(hostAccount)
discard host.postAvailability(
totalSize = size,
duration = 20 * 60.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size.u256 * minPricePerBytePerSecond,
discard (
await host.postAvailability(
totalSize = size,
duration = 20 * 60.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size.u256 * minPricePerBytePerSecond,
)
).get
# client requests storage
let cid = client.upload(data).get
let cid = (await client.upload(data)).get
let id = await client.requestStorage(
cid,
duration = duration,
@ -100,8 +106,10 @@ marketplacesuite "Marketplace":
tolerance = ecTolerance,
)
check eventually(client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000)
let purchase = client.getPurchase(id).get
check eventually(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
)
let purchase = (await client.getPurchase(id)).get
check purchase.error == none string
let clientBalanceBeforeFinished = await token.balanceOf(clientAccount)
@ -158,7 +166,7 @@ marketplacesuite "Marketplace payouts":
# provider makes storage available
let datasetSize = datasetSize(blocks, ecNodes, ecTolerance)
let totalAvailabilitySize = (datasetSize div 2).truncate(uint64)
discard providerApi.postAvailability(
discard await providerApi.postAvailability(
# make availability size small enough that we can't fill all the slots,
# thus causing a cancellation
totalSize = totalAvailabilitySize,
@ -167,7 +175,7 @@ marketplacesuite "Marketplace payouts":
totalCollateral = collateralPerByte * totalAvailabilitySize.u256,
)
let cid = clientApi.upload(data).get
let cid = (await clientApi.upload(data)).get
var slotIdxFilled = none uint64
proc onSlotFilled(eventResult: ?!SlotFilled) =
@ -189,11 +197,11 @@ marketplacesuite "Marketplace payouts":
# wait until one slot is filled
check eventually(slotIdxFilled.isSome, timeout = expiry.int * 1000)
let slotId = slotId(!clientApi.requestId(id), !slotIdxFilled)
let slotId = slotId(!(await clientApi.requestId(id)), !slotIdxFilled)
# wait until sale is cancelled
await ethProvider.advanceTime(expiry.u256)
check eventually providerApi.saleStateIs(slotId, "SaleCancelled")
check eventually await providerApi.saleStateIs(slotId, "SaleCancelled")
await advanceToNextPeriod()

View File

@ -42,14 +42,14 @@ marketplacesuite "Hosts submit regular proofs":
let data = await RandomChunker.example(blocks = blocks)
let datasetSize =
datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance)
createAvailabilities(
await createAvailabilities(
datasetSize.truncate(uint64),
duration,
collateralPerByte,
minPricePerBytePerSecond,
)
let cid = client0.upload(data).get
let cid = (await client0.upload(data)).get
let purchaseId = await client0.requestStorage(
cid,
@ -59,13 +59,13 @@ marketplacesuite "Hosts submit regular proofs":
tolerance = ecTolerance,
)
let purchase = client0.getPurchase(purchaseId).get
let purchase = (await client0.getPurchase(purchaseId)).get
check purchase.error == none string
let slotSize = slotSize(blocks, ecNodes, ecTolerance)
check eventually(
client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
await client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
)
var proofWasSubmitted = false
@ -119,27 +119,29 @@ marketplacesuite "Simulate invalid proofs":
let data = await RandomChunker.example(blocks = blocks)
let datasetSize =
datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance)
createAvailabilities(
await createAvailabilities(
datasetSize.truncate(uint64),
duration,
collateralPerByte,
minPricePerBytePerSecond,
)
let cid = client0.upload(data).get
let cid = (await client0.upload(data)).get
let purchaseId = await client0.requestStorage(
cid,
expiry = expiry,
duration = duration,
nodes = ecNodes,
tolerance = ecTolerance,
proofProbability = 1.u256,
let purchaseId = (
await client0.requestStorage(
cid,
expiry = expiry,
duration = duration,
nodes = ecNodes,
tolerance = ecTolerance,
proofProbability = 1.u256,
)
)
let requestId = client0.requestId(purchaseId).get
let requestId = (await client0.requestId(purchaseId)).get
check eventually(
client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
await client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000
)
var slotWasFreed = false
@ -182,14 +184,14 @@ marketplacesuite "Simulate invalid proofs":
let data = await RandomChunker.example(blocks = blocks)
let datasetSize =
datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance)
createAvailabilities(
await createAvailabilities(
datasetSize.truncate(uint64),
duration,
collateralPerByte,
minPricePerBytePerSecond,
)
let cid = client0.upload(data).get
let cid = (await client0.upload(data)).get
let purchaseId = await client0.requestStorage(
cid,
@ -199,7 +201,7 @@ marketplacesuite "Simulate invalid proofs":
tolerance = ecTolerance,
proofProbability = 1.u256,
)
let requestId = client0.requestId(purchaseId).get
let requestId = (await client0.requestId(purchaseId)).get
var slotWasFilled = false
proc onSlotFilled(eventResult: ?!SlotFilled) =

View File

@ -8,22 +8,26 @@ import ../examples
twonodessuite "Purchasing":
test "node handles storage request", twoNodesConfig:
let data = await RandomChunker.example(blocks = 2)
let cid = client1.upload(data).get
let id1 = client1.requestStorage(
cid,
duration = 100.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
expiry = 10.uint64,
collateralPerByte = 1.u256,
let cid = (await client1.upload(data)).get
let id1 = (
await client1.requestStorage(
cid,
duration = 100.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
expiry = 10.uint64,
collateralPerByte = 1.u256,
)
).get
let id2 = client1.requestStorage(
cid,
duration = 400.uint64,
pricePerBytePerSecond = 2.u256,
proofProbability = 6.u256,
expiry = 10.uint64,
collateralPerByte = 2.u256,
let id2 = (
await client1.requestStorage(
cid,
duration = 400.uint64,
pricePerBytePerSecond = 2.u256,
proofProbability = 6.u256,
expiry = 10.uint64,
collateralPerByte = 2.u256,
)
).get
check id1 != id2
@ -34,19 +38,21 @@ twonodessuite "Purchasing":
rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2
)
let data = await chunker.getBytes()
let cid = client1.upload(byteutils.toHex(data)).get
let id = client1.requestStorage(
cid,
duration = 100.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
expiry = 30.uint64,
collateralPerByte = 1.u256,
nodes = 3,
tolerance = 1,
let cid = (await client1.upload(byteutils.toHex(data))).get
let id = (
await client1.requestStorage(
cid,
duration = 100.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
expiry = 30.uint64,
collateralPerByte = 1.u256,
nodes = 3,
tolerance = 1,
)
).get
let request = client1.getPurchase(id).get.request.get
let request = (await client1.getPurchase(id)).get.request.get
check request.content.cid.data.buffer.len > 0
check request.ask.duration == 100.uint64
@ -75,23 +81,29 @@ twonodessuite "Purchasing":
test "node remembers purchase status after restart", twoNodesConfig:
let data = await RandomChunker.example(blocks = 2)
let cid = client1.upload(data).get
let id = client1.requestStorage(
cid,
duration = 10 * 60.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
expiry = 5 * 60.uint64,
collateralPerByte = 1.u256,
nodes = 3.uint,
tolerance = 1.uint,
let cid = (await client1.upload(data)).get
let id = (
await client1.requestStorage(
cid,
duration = 10 * 60.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
expiry = 5 * 60.uint64,
collateralPerByte = 1.u256,
nodes = 3.uint,
tolerance = 1.uint,
)
).get
check eventually(client1.purchaseStateIs(id, "submitted"), timeout = 3 * 60 * 1000)
check eventually(
await client1.purchaseStateIs(id, "submitted"), timeout = 3 * 60 * 1000
)
await node1.restart()
check eventually(client1.purchaseStateIs(id, "submitted"), timeout = 3 * 60 * 1000)
let request = client1.getPurchase(id).get.request.get
check eventually(
await client1.purchaseStateIs(id, "submitted"), timeout = 3 * 60 * 1000
)
let request = (await client1.getPurchase(id)).get.request.get
check request.ask.duration == (10 * 60).uint64
check request.ask.pricePerBytePerSecond == 1.u256
check request.ask.proofProbability == 3.u256
@ -102,19 +114,19 @@ twonodessuite "Purchasing":
test "node requires expiry and its value to be in future", twoNodesConfig:
let data = await RandomChunker.example(blocks = 2)
let cid = client1.upload(data).get
let cid = (await client1.upload(data)).get
let responseMissing = client1.requestStorageRaw(
let responseMissing = await client1.requestStorageRaw(
cid,
duration = 1.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
collateralPerByte = 1.u256,
)
check responseMissing.status == "400 Bad Request"
check responseMissing.body == "Expiry required"
check responseMissing.status == 400
check (await responseMissing.body) == "Expiry required"
let responseBefore = client1.requestStorageRaw(
let responseBefore = await client1.requestStorageRaw(
cid,
duration = 10.uint64,
pricePerBytePerSecond = 1.u256,
@ -122,6 +134,6 @@ twonodessuite "Purchasing":
collateralPerByte = 1.u256,
expiry = 10.uint64,
)
check responseBefore.status == "400 Bad Request"
check responseBefore.status == 400
check "Expiry needs value bigger then zero and smaller then the request's duration" in
responseBefore.body
(await responseBefore.body)

View File

@ -1,4 +1,3 @@
import std/httpclient
import std/importutils
import std/net
import std/sequtils
@ -14,29 +13,31 @@ import json
twonodessuite "REST API":
test "nodes can print their peer information", twoNodesConfig:
check !client1.info() != !client2.info()
check !(await client1.info()) != !(await client2.info())
test "nodes can set chronicles log level", twoNodesConfig:
client1.setLogLevel("DEBUG;TRACE:codex")
await client1.setLogLevel("DEBUG;TRACE:codex")
test "node accepts file uploads", twoNodesConfig:
let cid1 = client1.upload("some file contents").get
let cid2 = client1.upload("some other contents").get
let cid1 = (await client1.upload("some file contents")).get
let cid2 = (await client1.upload("some other contents")).get
check cid1 != cid2
test "node shows used and available space", twoNodesConfig:
discard client1.upload("some file contents").get
discard (await client1.upload("some file contents")).get
let totalSize = 12.uint64
let minPricePerBytePerSecond = 1.u256
let totalCollateral = totalSize.u256 * minPricePerBytePerSecond
discard client1.postAvailability(
totalSize = totalSize,
duration = 2.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
discard (
await client1.postAvailability(
totalSize = totalSize,
duration = 2.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
)
).get
let space = client1.space().tryGet()
let space = (await client1.space()).tryGet()
check:
space.totalBlocks == 2
space.quotaMaxBytes == 21474836480.NBytes
@ -47,48 +48,52 @@ twonodessuite "REST API":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client1.upload(content2).get
let list = client1.list().get
let cid1 = (await client1.upload(content1)).get
let cid2 = (await client1.upload(content2)).get
let list = (await client1.list()).get
check:
[cid1, cid2].allIt(it in list.content.mapIt(it.cid))
test "request storage fails for datasets that are too small", twoNodesConfig:
let cid = client1.upload("some file contents").get
let response = client1.requestStorageRaw(
cid,
duration = 10.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
collateralPerByte = 1.u256,
expiry = 9.uint64,
let cid = (await client1.upload("some file contents")).get
let response = (
await client1.requestStorageRaw(
cid,
duration = 10.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
collateralPerByte = 1.u256,
expiry = 9.uint64,
)
)
check:
response.status == "400 Bad Request"
response.body ==
response.status == 400
(await response.body) ==
"Dataset too small for erasure parameters, need at least " &
$(2 * DefaultBlockSize.int) & " bytes"
test "request storage succeeds for sufficiently sized datasets", twoNodesConfig:
let data = await RandomChunker.example(blocks = 2)
let cid = client1.upload(data).get
let response = client1.requestStorageRaw(
cid,
duration = 10.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
collateralPerByte = 1.u256,
expiry = 9.uint64,
let cid = (await client1.upload(data)).get
let response = (
await client1.requestStorageRaw(
cid,
duration = 10.uint64,
pricePerBytePerSecond = 1.u256,
proofProbability = 3.u256,
collateralPerByte = 1.u256,
expiry = 9.uint64,
)
)
check:
response.status == "200 OK"
response.status == 200
test "request storage fails if tolerance is zero", twoNodesConfig:
let data = await RandomChunker.example(blocks = 2)
let cid = client1.upload(data).get
let cid = (await client1.upload(data)).get
let duration = 100.uint64
let pricePerBytePerSecond = 1.u256
let proofProbability = 3.u256
@ -97,17 +102,19 @@ twonodessuite "REST API":
let nodes = 3
let tolerance = 0
var responseBefore = client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte, expiry,
nodes.uint, tolerance.uint,
var responseBefore = (
await client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte,
expiry, nodes.uint, tolerance.uint,
)
)
check responseBefore.status == "400 Bad Request"
check responseBefore.body == "Tolerance needs to be bigger then zero"
check responseBefore.status == 400
check (await responseBefore.body) == "Tolerance needs to be bigger then zero"
test "request storage fails if duration exceeds limit", twoNodesConfig:
let data = await RandomChunker.example(blocks = 2)
let cid = client1.upload(data).get
let cid = (await client1.upload(data)).get
let duration = (31 * 24 * 60 * 60).uint64
# 31 days TODO: this should not be hardcoded, but waits for https://github.com/codex-storage/nim-codex/issues/1056
let proofProbability = 3.u256
@ -117,17 +124,19 @@ twonodessuite "REST API":
let tolerance = 2
let pricePerBytePerSecond = 1.u256
var responseBefore = client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte, expiry,
nodes.uint, tolerance.uint,
var responseBefore = (
await client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte,
expiry, nodes.uint, tolerance.uint,
)
)
check responseBefore.status == "400 Bad Request"
check "Duration exceeds limit of" in responseBefore.body
check responseBefore.status == 400
check "Duration exceeds limit of" in (await responseBefore.body)
test "request storage fails if nodes and tolerance aren't correct", twoNodesConfig:
let data = await RandomChunker.example(blocks = 2)
let cid = client1.upload(data).get
let cid = (await client1.upload(data)).get
let duration = 100.uint64
let pricePerBytePerSecond = 1.u256
let proofProbability = 3.u256
@ -138,19 +147,21 @@ twonodessuite "REST API":
for ecParam in ecParams:
let (nodes, tolerance) = ecParam
var responseBefore = client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte,
expiry, nodes.uint, tolerance.uint,
var responseBefore = (
await client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte,
expiry, nodes.uint, tolerance.uint,
)
)
check responseBefore.status == "400 Bad Request"
check responseBefore.body ==
check responseBefore.status == 400
check (await responseBefore.body) ==
"Invalid parameters: parameters must satify `1 < (nodes - tolerance) ≥ tolerance`"
test "request storage fails if tolerance > nodes (underflow protection)",
twoNodesConfig:
let data = await RandomChunker.example(blocks = 2)
let cid = client1.upload(data).get
let cid = (await client1.upload(data)).get
let duration = 100.uint64
let pricePerBytePerSecond = 1.u256
let proofProbability = 3.u256
@ -161,13 +172,15 @@ twonodessuite "REST API":
for ecParam in ecParams:
let (nodes, tolerance) = ecParam
var responseBefore = client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte,
expiry, nodes.uint, tolerance.uint,
var responseBefore = (
await client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte,
expiry, nodes.uint, tolerance.uint,
)
)
check responseBefore.status == "400 Bad Request"
check responseBefore.body ==
check responseBefore.status == 400
check (await responseBefore.body) ==
"Invalid parameters: `tolerance` cannot be greater than `nodes`"
for ecParams in @[
@ -177,70 +190,69 @@ twonodessuite "REST API":
test "request storage succeeds if nodes and tolerance within range " &
fmt"({minBlocks=}, {nodes=}, {tolerance=})", twoNodesConfig:
let data = await RandomChunker.example(blocks = minBlocks)
let cid = client1.upload(data).get
let cid = (await client1.upload(data)).get
let duration = 100.uint64
let pricePerBytePerSecond = 1.u256
let proofProbability = 3.u256
let expiry = 30.uint64
let collateralPerByte = 1.u256
var responseBefore = client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte,
expiry, nodes.uint, tolerance.uint,
var responseBefore = (
await client1.requestStorageRaw(
cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte,
expiry, nodes.uint, tolerance.uint,
)
)
check responseBefore.status == "200 OK"
check responseBefore.status == 200
test "node accepts file uploads with content type", twoNodesConfig:
let headers = newHttpHeaders({"Content-Type": "text/plain"})
let response = client1.uploadRaw("some file contents", headers)
let headers = @[("Content-Type", "text/plain")]
let response = await client1.uploadRaw("some file contents", headers)
check response.status == "200 OK"
check response.body != ""
check response.status == 200
check (await response.body) != ""
test "node accepts file uploads with content disposition", twoNodesConfig:
let headers =
newHttpHeaders({"Content-Disposition": "attachment; filename=\"example.txt\""})
let response = client1.uploadRaw("some file contents", headers)
let headers = @[("Content-Disposition", "attachment; filename=\"example.txt\"")]
let response = await client1.uploadRaw("some file contents", headers)
check response.status == "200 OK"
check response.body != ""
check response.status == 200
check (await response.body) != ""
test "node accepts file uploads with content disposition without filename",
twoNodesConfig:
let headers = newHttpHeaders({"Content-Disposition": "attachment"})
let response = client1.uploadRaw("some file contents", headers)
let headers = @[("Content-Disposition", "attachment")]
let response = await client1.uploadRaw("some file contents", headers)
check response.status == "200 OK"
check response.body != ""
check response.status == 200
check (await response.body) != ""
test "upload fails if content disposition contains bad filename", twoNodesConfig:
let headers =
newHttpHeaders({"Content-Disposition": "attachment; filename=\"exam*ple.txt\""})
let response = client1.uploadRaw("some file contents", headers)
let headers = @[("Content-Disposition", "attachment; filename=\"exam*ple.txt\"")]
let response = await client1.uploadRaw("some file contents", headers)
check response.status == "422 Unprocessable Entity"
check response.body == "The filename is not valid."
check response.status == 422
check (await response.body) == "The filename is not valid."
test "upload fails if content type is invalid", twoNodesConfig:
let headers = newHttpHeaders({"Content-Type": "hello/world"})
let response = client1.uploadRaw("some file contents", headers)
let headers = @[("Content-Type", "hello/world")]
let response = await client1.uploadRaw("some file contents", headers)
check response.status == "422 Unprocessable Entity"
check response.body == "The MIME type 'hello/world' is not valid."
check response.status == 422
check (await response.body) == "The MIME type 'hello/world' is not valid."
test "node retrieve the metadata", twoNodesConfig:
let headers = newHttpHeaders(
{
"Content-Type": "text/plain",
"Content-Disposition": "attachment; filename=\"example.txt\"",
}
)
let uploadResponse = client1.uploadRaw("some file contents", headers)
let cid = uploadResponse.body
let listResponse = client1.listRaw()
let headers =
@[
("Content-Type", "text/plain"),
("Content-Disposition", "attachment; filename=\"example.txt\""),
]
let uploadResponse = await client1.uploadRaw("some file contents", headers)
let cid = await uploadResponse.body
let listResponse = await client1.listRaw()
let jsonData = parseJson(listResponse.body)
let jsonData = parseJson(await listResponse.body)
check jsonData.hasKey("content") == true
@ -256,83 +268,79 @@ twonodessuite "REST API":
check manifest["mimetype"].getStr() == "text/plain"
test "node set the headers when for download", twoNodesConfig:
let headers = newHttpHeaders(
{
"Content-Disposition": "attachment; filename=\"example.txt\"",
"Content-Type": "text/plain",
}
)
let headers =
@[
("Content-Disposition", "attachment; filename=\"example.txt\""),
("Content-Type", "text/plain"),
]
let uploadResponse = client1.uploadRaw("some file contents", headers)
let cid = uploadResponse.body
let uploadResponse = await client1.uploadRaw("some file contents", headers)
let cid = await uploadResponse.body
check uploadResponse.status == "200 OK"
check uploadResponse.status == 200
let response = client1.downloadRaw(cid)
let response = await client1.downloadRaw(cid)
check response.status == "200 OK"
check response.headers.hasKey("Content-Type") == true
check response.headers["Content-Type"] == "text/plain"
check response.headers.hasKey("Content-Disposition") == true
check response.headers["Content-Disposition"] ==
check response.status == 200
check "Content-Type" in response.headers
check response.headers.getString("Content-Type") == "text/plain"
check "Content-Disposition" in response.headers
check response.headers.getString("Content-Disposition") ==
"attachment; filename=\"example.txt\""
let local = true
let localResponse = client1.downloadRaw(cid, local)
let localResponse = await client1.downloadRaw(cid, local)
check localResponse.status == "200 OK"
check localResponse.headers.hasKey("Content-Type") == true
check localResponse.headers["Content-Type"] == "text/plain"
check localResponse.headers.hasKey("Content-Disposition") == true
check localResponse.headers["Content-Disposition"] ==
check localResponse.status == 200
check "Content-Type" in localResponse.headers
check localResponse.headers.getString("Content-Type") == "text/plain"
check "Content-Disposition" in localResponse.headers
check localResponse.headers.getString("Content-Disposition") ==
"attachment; filename=\"example.txt\""
test "should delete a dataset when requested", twoNodesConfig:
let cid = client1.upload("some file contents").get
let cid = (await client1.upload("some file contents")).get
var response = client1.downloadRaw($cid, local = true)
check response.body == "some file contents"
var response = await client1.downloadRaw($cid, local = true)
check (await response.body) == "some file contents"
client1.delete(cid).get
(await client1.delete(cid)).get
response = client1.downloadRaw($cid, local = true)
check response.status == "404 Not Found"
response = await client1.downloadRaw($cid, local = true)
check response.status == 404
test "should return 200 when attempting delete of non-existing block", twoNodesConfig:
let response = client1.deleteRaw($(Cid.example()))
check response.status == "204 No Content"
let response = await client1.deleteRaw($(Cid.example()))
check response.status == 204
test "should return 200 when attempting delete of non-existing dataset",
twoNodesConfig:
let cid = Manifest.example().makeManifestBlock().get.cid
let response = client1.deleteRaw($cid)
check response.status == "204 No Content"
let response = await client1.deleteRaw($cid)
check response.status == 204
test "should not crash if the download stream is closed before download completes",
twoNodesConfig:
privateAccess(client1.type)
privateAccess(client1.http.type)
# FIXME this is not a good test. For some reason, to get this to fail, I have to
# store content that is several times the default stream buffer size, otherwise
# the test will succeed even when the bug is present. Since this is probably some
# setting that is internal to chronos, it might change in future versions,
# invalidating this test. Works on Chronos 4.0.3.
let cid = client1.upload(repeat("some file contents", 1000)).get
let httpClient = client1.http()
let
contents = repeat("b", DefaultStreamBufferSize * 10)
cid = (await client1.upload(contents)).get
response = await client1.downloadRaw($cid)
try:
# Sadly, there's no high level API for preventing the client from
# consuming the whole response, and we need to close the socket
# before that happens if we want to trigger the bug, so we need to
# resort to this.
httpClient.getBody = false
let response = client1.downloadRaw($cid, httpClient = httpClient)
let reader = response.getBodyReader()
# Read 4 bytes from the stream just to make sure we actually
# receive some data.
let data = httpClient.socket.recv(4)
check data.len == 4
# Read 4 bytes from the stream just to make sure we actually
# receive some data.
check (bytesToString await reader.read(4)) == "bbbb"
# Prematurely closes the connection.
httpClient.close()
finally:
httpClient.getBody = true
# Abruptly closes the stream (we have to dig all the way to the transport
# or Chronos will close things "nicely").
response.connection.reader.tsource.close()
let response = client1.downloadRaw($cid, httpClient = httpClient)
check response.body == repeat("some file contents", 1000)
let response2 = await client1.downloadRaw($cid)
check (await response2.body) == contents

View File

@ -30,54 +30,63 @@ multinodesuite "Sales":
client = clients()[0].client
test "node handles new storage availability", salesConfig:
let availability1 = host.postAvailability(
totalSize = 1.uint64,
duration = 2.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 4.u256,
let availability1 = (
await host.postAvailability(
totalSize = 1.uint64,
duration = 2.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 4.u256,
)
).get
let availability2 = host.postAvailability(
totalSize = 4.uint64,
duration = 5.uint64,
minPricePerBytePerSecond = 6.u256,
totalCollateral = 7.u256,
let availability2 = (
await host.postAvailability(
totalSize = 4.uint64,
duration = 5.uint64,
minPricePerBytePerSecond = 6.u256,
totalCollateral = 7.u256,
)
).get
check availability1 != availability2
test "node lists storage that is for sale", salesConfig:
let availability = host.postAvailability(
totalSize = 1.uint64,
duration = 2.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 4.u256,
let availability = (
await host.postAvailability(
totalSize = 1.uint64,
duration = 2.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 4.u256,
)
).get
check availability in host.getAvailabilities().get
check availability in (await host.getAvailabilities()).get
test "updating non-existing availability", salesConfig:
let nonExistingResponse = host.patchAvailabilityRaw(
let nonExistingResponse = await host.patchAvailabilityRaw(
AvailabilityId.example,
duration = 100.uint64.some,
minPricePerBytePerSecond = 2.u256.some,
totalCollateral = 200.u256.some,
)
check nonExistingResponse.status == "404 Not Found"
check nonExistingResponse.status == 404
test "updating availability", salesConfig:
let availability = host.postAvailability(
totalSize = 140000.uint64,
duration = 200.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 300.u256,
let availability = (
await host.postAvailability(
totalSize = 140000.uint64,
duration = 200.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 300.u256,
)
).get
host.patchAvailability(
await host.patchAvailability(
availability.id,
duration = 100.uint64.some,
minPricePerBytePerSecond = 2.u256.some,
totalCollateral = 200.u256.some,
)
let updatedAvailability = (host.getAvailabilities().get).findItem(availability).get
let updatedAvailability =
((await host.getAvailabilities()).get).findItem(availability).get
check updatedAvailability.duration == 100.uint64
check updatedAvailability.minPricePerBytePerSecond == 2
check updatedAvailability.totalCollateral == 200
@ -85,26 +94,31 @@ multinodesuite "Sales":
check updatedAvailability.freeSize == 140000.uint64
test "updating availability - freeSize is not allowed to be changed", salesConfig:
let availability = host.postAvailability(
totalSize = 140000.uint64,
duration = 200.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 300.u256,
let availability = (
await host.postAvailability(
totalSize = 140000.uint64,
duration = 200.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 300.u256,
)
).get
let freeSizeResponse =
host.patchAvailabilityRaw(availability.id, freeSize = 110000.uint64.some)
check freeSizeResponse.status == "400 Bad Request"
check "not allowed" in freeSizeResponse.body
await host.patchAvailabilityRaw(availability.id, freeSize = 110000.uint64.some)
check freeSizeResponse.status == 400
check "not allowed" in (await freeSizeResponse.body)
test "updating availability - updating totalSize", salesConfig:
let availability = host.postAvailability(
totalSize = 140000.uint64,
duration = 200.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 300.u256,
let availability = (
await host.postAvailability(
totalSize = 140000.uint64,
duration = 200.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 300.u256,
)
).get
host.patchAvailability(availability.id, totalSize = 100000.uint64.some)
let updatedAvailability = (host.getAvailabilities().get).findItem(availability).get
await host.patchAvailability(availability.id, totalSize = 100000.uint64.some)
let updatedAvailability =
((await host.getAvailabilities()).get).findItem(availability).get
check updatedAvailability.totalSize == 100000
check updatedAvailability.freeSize == 100000
@ -115,38 +129,51 @@ multinodesuite "Sales":
let minPricePerBytePerSecond = 3.u256
let collateralPerByte = 1.u256
let totalCollateral = originalSize.u256 * collateralPerByte
let availability = host.postAvailability(
totalSize = originalSize,
duration = 20 * 60.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
let availability = (
await host.postAvailability(
totalSize = originalSize,
duration = 20 * 60.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
)
).get
# Lets create storage request that will utilize some of the availability's space
let cid = client.upload(data).get
let id = client.requestStorage(
cid,
duration = 20 * 60.uint64,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 3.u256,
expiry = (10 * 60).uint64,
collateralPerByte = collateralPerByte,
nodes = 3,
tolerance = 1,
let cid = (await client.upload(data)).get
let id = (
await client.requestStorage(
cid,
duration = 20 * 60.uint64,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 3.u256,
expiry = (10 * 60).uint64,
collateralPerByte = collateralPerByte,
nodes = 3,
tolerance = 1,
)
).get
check eventually(client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000)
let updatedAvailability = (host.getAvailabilities().get).findItem(availability).get
check eventually(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
)
let updatedAvailability =
((await host.getAvailabilities()).get).findItem(availability).get
check updatedAvailability.totalSize != updatedAvailability.freeSize
let utilizedSize = updatedAvailability.totalSize - updatedAvailability.freeSize
let totalSizeResponse =
host.patchAvailabilityRaw(availability.id, totalSize = (utilizedSize - 1).some)
check totalSizeResponse.status == "400 Bad Request"
check "totalSize must be larger then current totalSize" in totalSizeResponse.body
let totalSizeResponse = (
await host.patchAvailabilityRaw(
availability.id, totalSize = (utilizedSize - 1).some
)
)
check totalSizeResponse.status == 400
check "totalSize must be larger then current totalSize" in
(await totalSizeResponse.body)
host.patchAvailability(availability.id, totalSize = (originalSize + 20000).some)
await host.patchAvailability(
availability.id, totalSize = (originalSize + 20000).some
)
let newUpdatedAvailability =
(host.getAvailabilities().get).findItem(availability).get
((await host.getAvailabilities()).get).findItem(availability).get
check newUpdatedAvailability.totalSize == originalSize + 20000
check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000

View File

@ -9,11 +9,11 @@ twonodessuite "Uploads and downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let cid1 = (await client1.upload(content1)).get
let cid2 = (await client2.upload(content2)).get
let resp1 = client1.download(cid1, local = true).get
let resp2 = client2.download(cid2, local = true).get
let resp1 = (await client1.download(cid1, local = true)).get
let resp2 = (await client2.download(cid2, local = true)).get
check:
content1 == resp1
@ -23,11 +23,11 @@ twonodessuite "Uploads and downloads":
let content1 = "some file contents"
let content2 = "some other contents"
let cid1 = client1.upload(content1).get
let cid2 = client2.upload(content2).get
let cid1 = (await client1.upload(content1)).get
let cid2 = (await client2.upload(content2)).get
let resp2 = client1.download(cid2, local = false).get
let resp1 = client2.download(cid1, local = false).get
let resp2 = (await client1.download(cid2, local = false)).get
let resp1 = (await client2.download(cid1, local = false)).get
check:
content1 == resp1
@ -35,11 +35,12 @@ twonodessuite "Uploads and downloads":
test "node fails retrieving non-existing local file", twoNodesConfig:
let content1 = "some file contents"
let cid1 = client1.upload(content1).get # upload to first node
let resp2 = client2.download(cid1, local = true) # try retrieving from second node
let cid1 = (await client1.upload(content1)).get # upload to first node
let resp2 =
await client2.download(cid1, local = true) # try retrieving from second node
check:
resp2.error.msg == "404 Not Found"
resp2.error.msg == "404"
proc checkRestContent(cid: Cid, content: ?!string) =
let c = content.tryGet()
@ -67,26 +68,28 @@ twonodessuite "Uploads and downloads":
test "node allows downloading only manifest", twoNodesConfig:
let content1 = "some file contents"
let cid1 = client1.upload(content1).get
let cid1 = (await client1.upload(content1)).get
let resp2 = client1.downloadManifestOnly(cid1)
let resp2 = await client1.downloadManifestOnly(cid1)
checkRestContent(cid1, resp2)
test "node allows downloading content without stream", twoNodesConfig:
let content1 = "some file contents"
let cid1 = client1.upload(content1).get
let
content1 = "some file contents"
cid1 = (await client1.upload(content1)).get
resp1 = await client2.downloadNoStream(cid1)
let resp1 = client2.downloadNoStream(cid1)
checkRestContent(cid1, resp1)
let resp2 = client2.download(cid1, local = true).get
let resp2 = (await client2.download(cid1, local = true)).get
check:
content1 == resp2
test "reliable transfer test", twoNodesConfig:
proc transferTest(a: CodexClient, b: CodexClient) {.async.} =
let data = await RandomChunker.example(blocks = 8)
let cid = a.upload(data).get
let response = b.download(cid).get
let cid = (await a.upload(data)).get
let response = (await b.download(cid)).get
check:
@response.mapIt(it.byte) == data

View File

@ -99,14 +99,14 @@ marketplacesuite "Validation":
let data = await RandomChunker.example(blocks = blocks)
let datasetSize =
datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance)
createAvailabilities(
await createAvailabilities(
datasetSize.truncate(uint64),
duration,
collateralPerByte,
minPricePerBytePerSecond,
)
let cid = client0.upload(data).get
let cid = (await client0.upload(data)).get
let purchaseId = await client0.requestStorage(
cid,
expiry = expiry,
@ -115,12 +115,12 @@ marketplacesuite "Validation":
tolerance = ecTolerance,
proofProbability = proofProbability,
)
let requestId = client0.requestId(purchaseId).get
let requestId = (await client0.requestId(purchaseId)).get
debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId
if not eventuallyS(
client0.purchaseStateIs(purchaseId, "started"),
await client0.purchaseStateIs(purchaseId, "started"),
timeout = (expiry + 60).int,
step = 5,
):
@ -169,14 +169,14 @@ marketplacesuite "Validation":
let data = await RandomChunker.example(blocks = blocks)
let datasetSize =
datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance)
createAvailabilities(
await createAvailabilities(
datasetSize.truncate(uint64),
duration,
collateralPerByte,
minPricePerBytePerSecond,
)
let cid = client0.upload(data).get
let cid = (await client0.upload(data)).get
let purchaseId = await client0.requestStorage(
cid,
expiry = expiry,
@ -185,12 +185,12 @@ marketplacesuite "Validation":
tolerance = ecTolerance,
proofProbability = proofProbability,
)
let requestId = client0.requestId(purchaseId).get
let requestId = (await client0.requestId(purchaseId)).get
debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId
if not eventuallyS(
client0.purchaseStateIs(purchaseId, "started"),
await client0.purchaseStateIs(purchaseId, "started"),
timeout = (expiry + 60).int,
step = 5,
):

View File

@ -24,7 +24,7 @@ suite "Taiko L2 Integration Tests":
)
node1.waitUntilStarted()
let bootstrap = (!node1.client.info())["spr"].getStr()
let bootstrap = (!(await node1.client.info()))["spr"].getStr()
node2 = startNode(
[