From 0595723f66bec374471aafab3a9412ea4f95371c Mon Sep 17 00:00:00 2001 From: Giuliano Mega Date: Tue, 4 Feb 2025 13:01:14 -0300 Subject: [PATCH 01/17] Minor improvements to download API (#1092) * chore: improve error messages in upload API * chore: remove unreachable (dead) code * fix: API integration test --- codex/rest/api.nim | 12 ++++++------ tests/integration/testrestapi.nim | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 134aa8d2..a64d26cf 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -179,7 +179,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute trace "Handling file upload" var bodyReader = request.getBodyReader() if bodyReader.isErr(): - return RestApiResponse.error(Http500) + return RestApiResponse.error(Http500, msg = bodyReader.error()) # Attempt to handle `Expect` header # some clients (curl), wait 1000ms @@ -190,10 +190,13 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute var mimetype = request.headers.getString(ContentTypeHeader).some if mimetype.get() != "": + let mimetypeVal = mimetype.get() var m = newMimetypes() - let extension = m.getExt(mimetype.get(), "") + let extension = m.getExt(mimetypeVal, "") if extension == "": - return RestApiResponse.error(Http422, "The MIME type is not valid.") + return RestApiResponse.error( + Http422, "The MIME type '" & mimetypeVal & "' is not valid." + ) else: mimetype = string.none @@ -231,9 +234,6 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute finally: await reader.closeWait() - trace "Something went wrong error" - return RestApiResponse.error(Http500) - router.api(MethodGet, "/api/codex/v1/data") do() -> RestApiResponse: let json = await formatManifestBlocks(node) return RestApiResponse.response($json, contentType = "application/json") diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 52b722d6..f1f2299f 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -200,7 +200,7 @@ twonodessuite "REST API": let response = client1.uploadRaw("some file contents", headers) check response.status == "422 Unprocessable Entity" - check response.body == "The MIME type is not valid." + check response.body == "The MIME type 'hello/world' is not valid." test "node retrieve the metadata", twoNodesConfig: let headers = newHttpHeaders( From 54d499be419e6b82204f328f98c14508e0b2eabf Mon Sep 17 00:00:00 2001 From: Slava <20563034+veaceslavdoina@users.noreply.github.com> Date: Tue, 4 Feb 2025 23:22:34 +0200 Subject: [PATCH 02/17] docker: add BOOTSTRAP_NODE_URL to Docker entrypoint (#1098) --- docker/docker-entrypoint.sh | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/docker/docker-entrypoint.sh b/docker/docker-entrypoint.sh index 2ec840e9..d883f0eb 100644 --- a/docker/docker-entrypoint.sh +++ b/docker/docker-entrypoint.sh @@ -9,6 +9,34 @@ if [[ -n "${ENV_PATH}" ]]; then set +a fi +# Bootstrap node from URL +if [[ -n "${BOOTSTRAP_NODE_URL}" ]]; then + BOOTSTRAP_NODE_URL="${BOOTSTRAP_NODE_URL}/api/codex/v1/spr" + WAIT=${BOOTSTRAP_NODE_URL_WAIT:-300} + SECONDS=0 + SLEEP=1 + # Run and retry if fail + while (( SECONDS < WAIT )); do + SPR=$(curl -s -f -m 5 -H 'Accept: text/plain' "${BOOTSTRAP_NODE_URL}") + # Check if exit code is 0 and returned value is not empty + if [[ $? 
-eq 0 && -n "${SPR}" ]]; then + export CODEX_BOOTSTRAP_NODE="${SPR}" + echo "Bootstrap node: CODEX_BOOTSTRAP_NODE=${CODEX_BOOTSTRAP_NODE}" + break + else + # Sleep and check again + echo "Can't get SPR from ${BOOTSTRAP_NODE_URL} - Retry in $SLEEP seconds / $((WAIT - SECONDS))" + sleep $SLEEP + fi + done +fi + +# Stop Codex run if unable to get SPR +if [[ -n "${BOOTSTRAP_NODE_URL}" && -z "${CODEX_BOOTSTRAP_NODE}" ]]; then + echo "Unable to get SPR from ${BOOTSTRAP_NODE_URL} in ${BOOTSTRAP_NODE_URL_WAIT} seconds - Stop Codex run" + exit 1 +fi + # Parameters if [[ -z "${CODEX_NAT}" ]]; then if [[ "${NAT_IP_AUTO}" == "true" && -z "${NAT_PUBLIC_IP_AUTO}" ]]; then From c05eec422c8d6835f9606c2fdb35f645cc20c35f Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Thu, 6 Feb 2025 16:21:12 +0100 Subject: [PATCH 03/17] fix dataset and slot size calculations in integration tests (#1095) * fixes datasetSize and slotSize helpers (and also RandomChunker.example) * adds overload for <> for seq[byte] * changes RandomChunker.example to return seq[byte] * fixes restapi tests after correcting RandomChunker.example * review: use string.fromBytes from nim-stew to convert seq[byte] to string --- tests/examples.nim | 5 ++--- tests/integration/codexclient.nim | 3 +++ tests/integration/marketplacesuite.nim | 10 +++++++--- tests/integration/testmarketplace.nim | 4 ++-- tests/integration/testproofs.nim | 4 +--- tests/integration/testrestapi.nim | 26 ++++++++++++++------------ tests/integration/testupdownload.nim | 2 +- 7 files changed, 30 insertions(+), 24 deletions(-) diff --git a/tests/examples.nim b/tests/examples.nim index bfb34cff..c96fefd6 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -86,8 +86,7 @@ proc example(_: type G2Point): G2Point = proc example*(_: type Groth16Proof): Groth16Proof = Groth16Proof(a: G1Point.example, b: G2Point.example, c: G1Point.example) -proc example*(_: type RandomChunker, blocks: int): Future[string] {.async.} = - # doAssert blocks >= 3, "must be more than 3 blocks" +proc example*(_: type RandomChunker, blocks: int): Future[seq[byte]] {.async.} = let rng = Rng.instance() let chunker = RandomChunker.new( rng, size = DefaultBlockSize * blocks.NBytes, chunkSize = DefaultBlockSize @@ -95,7 +94,7 @@ proc example*(_: type RandomChunker, blocks: int): Future[string] {.async.} = var data: seq[byte] while (let moar = await chunker.getBytes(); moar != []): data.add moar - return byteutils.toHex(data) + return data proc example*(_: type RandomChunker): Future[string] {.async.} = await RandomChunker.example(3) diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 7826b151..d1191fb9 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -44,6 +44,9 @@ proc upload*(client: CodexClient, contents: string): ?!Cid = assert response.status == "200 OK" Cid.init(response.body).mapFailure +proc upload*(client: CodexClient, bytes: seq[byte]): ?!Cid = + client.upload(string.fromBytes(bytes)) + proc download*(client: CodexClient, cid: Cid, local = false): ?!string = let response = client.http.get( client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream") diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index 4d155186..68283ad1 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -4,6 +4,7 @@ from pkg/libp2p import Cid import pkg/codex/contracts/marketplace as mp import pkg/codex/periods import pkg/codex/utils/json +from 
pkg/codex/utils import roundUp, divUp import ./multinodes import ../contracts/time import ../contracts/deployment @@ -45,11 +46,14 @@ template marketplacesuite*(name: string, body: untyped) = proc periods(p: int): uint64 = p.uint64 * period - proc slotSize(blocks: int): UInt256 = - (DefaultBlockSize * blocks.NBytes).Natural.u256 + proc slotSize(blocks, nodes, tolerance: int): UInt256 = + let ecK = nodes - tolerance + let blocksRounded = roundUp(blocks, ecK) + let blocksPerSlot = divUp(blocksRounded, ecK) + (DefaultBlockSize * blocksPerSlot.NBytes).Natural.u256 proc datasetSize(blocks, nodes, tolerance: int): UInt256 = - (nodes + tolerance).u256 * slotSize(blocks) + return nodes.u256 * slotSize(blocks, nodes, tolerance) proc createAvailabilities( datasetSize: UInt256, diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim index bc030a1d..7813485b 100644 --- a/tests/integration/testmarketplace.nim +++ b/tests/integration/testmarketplace.nim @@ -112,7 +112,7 @@ marketplacesuite "Marketplace": await ethProvider.advanceTime(duration) # Checking that the hosting node received reward for at least the time between - let slotSize = slotSize(blocks) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize check eventually (await token.balanceOf(hostAccount)) - startBalanceHost >= (duration - 5 * 60) * pricePerSlotPerSecond * ecNodes.u256 @@ -197,7 +197,7 @@ marketplacesuite "Marketplace payouts": await advanceToNextPeriod() - let slotSize = slotSize(blocks) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize check eventually ( diff --git a/tests/integration/testproofs.nim b/tests/integration/testproofs.nim index b25643ad..a547890b 100644 --- a/tests/integration/testproofs.nim +++ b/tests/integration/testproofs.nim @@ -1,7 +1,6 @@ from std/times import inMilliseconds import pkg/questionable import pkg/codex/logutils -import pkg/stew/byteutils import ../contracts/time import ../contracts/deployment import ../codex/helpers @@ -60,8 +59,7 @@ marketplacesuite "Hosts submit regular proofs": let purchase = client0.getPurchase(purchaseId).get check purchase.error == none string - let request = purchase.request.get - let slotSize = request.ask.slotSize + let slotSize = slotSize(blocks, ecNodes, ecTolerance) check eventually( client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000 diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index f1f2299f..8cbe9817 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -1,5 +1,6 @@ import std/httpclient import std/sequtils +import std/strformat from pkg/libp2p import `==` import pkg/codex/units import ./twonodes @@ -144,18 +145,19 @@ twonodessuite "REST API": check responseBefore.body == "Invalid parameters: `tolerance` cannot be greater than `nodes`" - test "request storage succeeds if nodes and tolerance within range", twoNodesConfig: - let data = await RandomChunker.example(blocks = 2) - let cid = client1.upload(data).get - let duration = 100.u256 - let pricePerBytePerSecond = 1.u256 - let proofProbability = 3.u256 - let expiry = 30.uint - let collateralPerByte = 1.u256 - let ecParams = @[(3, 1), (5, 2)] - - for ecParam in ecParams: - let (nodes, tolerance) = ecParam + for ecParams in @[ + (minBlocks: 2, nodes: 3, tolerance: 1), (minBlocks: 3, nodes: 5, tolerance: 2) + ]: + let (minBlocks, nodes, tolerance) = 
ecParams + test "request storage succeeds if nodes and tolerance within range " & + fmt"({minBlocks=}, {nodes=}, {tolerance=})", twoNodesConfig: + let data = await RandomChunker.example(blocks = minBlocks) + let cid = client1.upload(data).get + let duration = 100.u256 + let pricePerBytePerSecond = 1.u256 + let proofProbability = 3.u256 + let expiry = 30.uint + let collateralPerByte = 1.u256 var responseBefore = client1.requestStorageRaw( cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte, diff --git a/tests/integration/testupdownload.nim b/tests/integration/testupdownload.nim index 74bee8c7..05d3a496 100644 --- a/tests/integration/testupdownload.nim +++ b/tests/integration/testupdownload.nim @@ -88,7 +88,7 @@ twonodessuite "Uploads and downloads": let cid = a.upload(data).get let response = b.download(cid).get check: - response == data + @response.mapIt(it.byte) == data for run in 0 .. 10: await transferTest(client1, client2) From e62a09d9b10a02d5f94ce558b3c245369ebda4ab Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 6 Feb 2025 22:36:14 +0100 Subject: [PATCH 04/17] add ccache and sccache to speed up CI (#1074) * add ccache and sccache to speed up CI Signed-off-by: Csaba Kiraly * include testname and nim version in cache separation Signed-off-by: Csaba Kiraly * Make sure ccache has precedence over custom clang/llvm Signed-off-by: Csaba Kiraly * enable ccache for windows Signed-off-by: Csaba Kiraly * ccache: evict old files Make sure old unused cache files are not lingering around for long Signed-off-by: Csaba Kiraly --------- Signed-off-by: Csaba Kiraly --- .../actions/nimbus-build-system/action.yml | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/.github/actions/nimbus-build-system/action.yml b/.github/actions/nimbus-build-system/action.yml index c7bdb627..219966db 100644 --- a/.github/actions/nimbus-build-system/action.yml +++ b/.github/actions/nimbus-build-system/action.yml @@ -97,6 +97,33 @@ runs: # Set GCC-14 as the default sudo update-alternatives --set gcc /usr/bin/gcc-14 + - name: Install ccache on Linux/Mac + if: inputs.os == 'linux' || inputs.os == 'macos' + uses: hendrikmuhs/ccache-action@v1.2 + with: + create-symlink: true + key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }} + evict-old-files: 7d + + - name: Install ccache on Windows + if: inputs.os == 'windows' + uses: hendrikmuhs/ccache-action@v1.2 + with: + key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }} + evict-old-files: 7d + + - name: Enable ccache on Windows + if: inputs.os == 'windows' + shell: ${{ inputs.shell }} {0} + run: | + CCACHE_DIR=$(dirname $(which ccache))/ccached + mkdir ${CCACHE_DIR} + ln -s $(which ccache) ${CCACHE_DIR}/gcc.exe + ln -s $(which ccache) ${CCACHE_DIR}/g++.exe + ln -s $(which ccache) ${CCACHE_DIR}/cc.exe + ln -s $(which ccache) ${CCACHE_DIR}/c++.exe + echo "export PATH=${CCACHE_DIR}:\$PATH" >> $HOME/.bash_profile # prefix path in MSYS2 + - name: Derive environment variables shell: ${{ inputs.shell }} {0} run: | @@ -154,8 +181,11 @@ runs: llvm_bin_dir="${llvm_dir}/bin" llvm_lib_dir="${llvm_dir}/lib" echo "${llvm_bin_dir}" >> ${GITHUB_PATH} + # Make sure ccache has precedence (GITHUB_PATH is appending before) + echo "$(brew --prefix)/opt/ccache/libexec" >> ${GITHUB_PATH} + echo $PATH echo "LDFLAGS=${LDFLAGS} -L${libomp_lib_dir} -L${llvm_lib_dir} -Wl,-rpath,${llvm_lib_dir}" >> ${GITHUB_ENV} - NIMFLAGS="${NIMFLAGS} 
$(quote "-d:LeopardCmakeFlags='-DCMAKE_BUILD_TYPE=Release -DCMAKE_C_COMPILER=${llvm_bin_dir}/clang -DCMAKE_CXX_COMPILER=${llvm_bin_dir}/clang++' -d:LeopardExtraCompilerlags='-fopenmp' -d:LeopardExtraLinkerFlags='-fopenmp -L${libomp_lib_dir}'")" + NIMFLAGS="${NIMFLAGS} $(quote "-d:LeopardCmakeFlags='-DCMAKE_BUILD_TYPE=Release' -d:LeopardExtraCompilerFlags='-fopenmp' -d:LeopardExtraLinkerFlags='-fopenmp -L${libomp_lib_dir}'")" echo "NIMFLAGS=${NIMFLAGS}" >> $GITHUB_ENV fi @@ -191,6 +221,7 @@ runs: - name: Build Nim and Codex dependencies shell: ${{ inputs.shell }} {0} run: | + which gcc gcc --version make -j${ncpu} CI_CACHE=NimBinaries ${ARCH_OVERRIDE} QUICK_AND_DIRTY_COMPILER=1 update echo From 17d3f99f45bbbcadfb3d9c3de846ecd3f885f933 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 6 Feb 2025 15:36:35 -0600 Subject: [PATCH 05/17] use a case-of instead of if for better readability (#1063) --- codex/blockexchange/engine/engine.nim | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index e1624e99..87335b0a 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -398,7 +398,8 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy have = await e.address in b.localStore price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) - if e.wantType == WantType.WantHave: + case e.wantType: + of WantType.WantHave: if have: presence.add( BlockPresence( @@ -415,17 +416,19 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy peerCtx.peerWants.add(e) codex_block_exchange_want_have_lists_received.inc() - elif e.wantType == WantType.WantBlock: + of WantType.WantBlock: peerCtx.peerWants.add(e) schedulePeer = true codex_block_exchange_want_block_lists_received.inc() else: # Updating existing entry in peer wants # peer doesn't want this block anymore if e.cancel: + trace "Canceling want for block", address = e.address peerCtx.peerWants.del(idx) else: # peer might want to ask for the same cid with # different want params + trace "Updating want for block", address = e.address peerCtx.peerWants[idx] = e # update entry if presence.len > 0: From dfa90a9981cc49859cda04c68b18956f57880495 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Thu, 6 Feb 2025 17:18:00 -0800 Subject: [PATCH 06/17] fix(build): compilation on macos when including nim-nat-traversal (#1084) * fix(build): compilation on macos when including nim-nat-traversal - removes the `VERSION` rename to `VERSION_temp` in the Makefile - instead, relies on `-iqoute` to include the `nim-nat-traversal/vendor/libnatpmp-upstream` directory in the search paths. `-iquote` will match the `vendor/libnatpmp-upstream/VERSION` file for `#include "version"` and not `#include `, the latter being what is included by the macos sdk and was causing issues with `-I`. The [gcc 14.2 docs](https://gcc.gnu.org/onlinedocs/gcc-14.2.0/cpp/Invocation.html#index-I) describe how `-iquote` alleviates this issue: > Directories specified with -iquote apply only to the quote form of the directive, #include "file". Directories specified with -I, -isystem, or -idirafter apply to lookup for both the #include "file" and #include directives. For more info, please see https://github.com/status-im/nim-nat-traversal/pull/34. 
* bump nim-nat-traversal Now that https://github.com/status-im/nim-nat-traversal/pull/34 has been merged, change back to master commit --- build.nims | 23 ++--------------------- vendor/nim-nat-traversal | 2 +- 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/build.nims b/build.nims index 5e190f6b..aa090e71 100644 --- a/build.nims +++ b/build.nims @@ -2,24 +2,8 @@ mode = ScriptMode.Verbose import std/os except commandLineParams -const VendorPath = "vendor/nim-nat-traversal/vendor/libnatpmp-upstream" -let - oldVersionFile = joinPath(VendorPath, "VERSION") - newVersionFile = joinPath(VendorPath, "VERSION_temp") - -proc renameFile(oldName, newName: string) = - if fileExists(oldName): - mvFile(oldName, newName) - else: - echo "File ", oldName, " does not exist" - - ### Helper functions proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = - # This is a quick workaround to avoid VERSION file conflict on macOS - # More details here: https://github.com/codex-storage/nim-codex/issues/1059 - if defined(macosx): - renameFile(oldVersionFile, newVersionFile) if not dirExists "build": mkDir "build" @@ -37,11 +21,8 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = # Place build output in 'build' folder, even if name includes a longer path. outName = os.lastPathPart(name) cmd = "nim " & lang & " --out:build/" & outName & " " & extra_params & " " & srcDir & name & ".nim" - try: - exec(cmd) - finally: - if defined(macosx): - renameFile(newVersionFile, oldVersionFile) + + exec(cmd) proc test(name: string, srcDir = "tests/", params = "", lang = "c") = buildBinary name, srcDir, params diff --git a/vendor/nim-nat-traversal b/vendor/nim-nat-traversal index 5e405974..6508ce75 160000 --- a/vendor/nim-nat-traversal +++ b/vendor/nim-nat-traversal @@ -1 +1 @@ -Subproject commit 5e4059746e9095e1731b02eeaecd62a70fbe664d +Subproject commit 6508ce75060878dfcdfa21f94721672c69a1823b From 8880ad9cd43f29609a5a4e7742fb01b8eb7d22f1 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 11 Feb 2025 11:47:25 +0100 Subject: [PATCH 07/17] fix linting in "codex/blockexchange/engine/engine.nim" (#1107) --- codex/blockexchange/engine/engine.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 87335b0a..d30f88d9 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -398,7 +398,7 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy have = await e.address in b.localStore price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) - case e.wantType: + case e.wantType of WantType.WantHave: if have: presence.add( From 11888e78d79d62e00b5bb7be5015727d0be41af1 Mon Sep 17 00:00:00 2001 From: Marcin Czenko Date: Tue, 11 Feb 2025 16:16:45 +0100 Subject: [PATCH 08/17] deploy openapi spec only when tagged (#1106) --- .github/workflows/docs.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 50b14d05..4232ff0f 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -2,17 +2,17 @@ name: OpenAPI on: push: - branches: - - 'master' + tags: + - "v*.*.*" paths: - - 'openapi.yaml' - - '.github/workflows/docs.yml' + - "openapi.yaml" + - ".github/workflows/docs.yml" pull_request: branches: - - '**' + - "**" paths: - - 'openapi.yaml' - - '.github/workflows/docs.yml' + - "openapi.yaml" + - 
".github/workflows/docs.yml" # Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages permissions: @@ -40,7 +40,7 @@ jobs: deploy: name: Deploy runs-on: ubuntu-latest - if: github.ref == 'refs/heads/master' + if: startsWith(github.ref, 'refs/tags/') steps: - name: Checkout uses: actions/checkout@v4 From bbe1f09cd76a41eb1b202e81dd59fbbb67650c3a Mon Sep 17 00:00:00 2001 From: Giuliano Mega Date: Tue, 11 Feb 2025 16:00:05 -0300 Subject: [PATCH 09/17] Purging of local datasets (#1103) * feat(codex-node): add dataset deletion API to Codex node * feat(api): add deletion of local datasets to API * fix: logging, remove garbage, drop some CORS headers from DELETE request * fix: change empty response return code to 204 instead of 200 * fix: add time-based idling to avoid locking up the node during deletes, fix API status code * fix: uncomment commented tests committed by accident * fix: return correct code when missing CID is a Manifest CID; add back CORS headers * fix: remove lingering echo --- build.nims | 34 ++++++++++++------ codex/node.nim | 59 +++++++++++++++++++++++++++++++ codex/rest/api.nim | 30 ++++++++++++++++ tests/codex/examples.nim | 10 ++++++ tests/codex/helpers.nim | 27 +++++++------- tests/codex/node/testnode.nim | 26 ++++++++++++++ tests/codex/slots/helpers.nim | 13 +++++-- tests/integration/codexclient.nim | 13 +++++++ tests/integration/testrestapi.nim | 26 +++++++++++++- tests/integration/twonodes.nim | 1 - 10 files changed, 211 insertions(+), 28 deletions(-) diff --git a/build.nims b/build.nims index aa090e71..baf21e03 100644 --- a/build.nims +++ b/build.nims @@ -4,7 +4,6 @@ import std/os except commandLineParams ### Helper functions proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = - if not dirExists "build": mkDir "build" @@ -14,13 +13,15 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = for param in commandLineParams(): extra_params &= " " & param else: - for i in 2..= runtimeQuota: + await idleAsync() + lastIdle = getTime() + + if err =? (await store.delBlock(manifest.treeCid, i)).errorOption: + # The contract for delBlock is fuzzy, but we assume that if the block is + # simply missing we won't get an error. This is a best effort operation and + # can simply be retried. + error "Failed to delete block within dataset", index = i, err = err.msg + return failure(err) + + if err =? (await store.delBlock(cid)).errorOption: + error "Error deleting manifest block", err = err.msg + + success() + +proc delete*( + self: CodexNodeRef, cid: Cid +): Future[?!void] {.async: (raises: [CatchableError]).} = + ## Deletes a whole dataset, if Cid is a Manifest Cid, or a single block, if Cid a block Cid, + ## from the underlying block store. This is a strictly local operation. + ## + ## Missing blocks in dataset deletes are ignored. + ## + + without isManifest =? 
cid.isManifest, err: + trace "Bad content type for CID:", cid = cid, err = err.msg + return failure(err) + + if not isManifest: + return await self.deleteSingleBlock(cid) + + await self.deleteEntireDataset(cid) + proc store*( self: CodexNodeRef, stream: LPStream, diff --git a/codex/rest/api.nim b/codex/rest/api.nim index a64d26cf..f64a6f20 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -238,6 +238,15 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute let json = await formatManifestBlocks(node) return RestApiResponse.response($json, contentType = "application/json") + router.api(MethodOptions, "/api/codex/v1/data/{cid}") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("GET,DELETE", corsOrigin) + + resp.status = Http204 + await resp.sendBody("") + router.api(MethodGet, "/api/codex/v1/data/{cid}") do( cid: Cid, resp: HttpResponseRef ) -> RestApiResponse: @@ -254,6 +263,27 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute await node.retrieveCid(cid.get(), local = true, resp = resp) + router.api(MethodDelete, "/api/codex/v1/data/{cid}") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + ## Deletes either a single block or an entire dataset + ## from the local node. Does nothing and returns 200 + ## if the dataset is not locally available. + ## + var headers = buildCorsHeaders("DELETE", allowedOrigin) + + if cid.isErr: + return RestApiResponse.error(Http400, $cid.error(), headers = headers) + + if err =? (await node.delete(cid.get())).errorOption: + return RestApiResponse.error(Http500, err.msg, headers = headers) + + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("DELETE", corsOrigin) + + resp.status = Http204 + await resp.sendBody("") + router.api(MethodPost, "/api/codex/v1/data/{cid}/network") do( cid: Cid, resp: HttpResponseRef ) -> RestApiResponse: diff --git a/tests/codex/examples.nim b/tests/codex/examples.nim index 69a85db8..6f15182f 100644 --- a/tests/codex/examples.nim +++ b/tests/codex/examples.nim @@ -8,6 +8,7 @@ import pkg/codex/stores import pkg/codex/blocktype as bt import pkg/codex/sales import pkg/codex/merkletree +import pkg/codex/manifest import ../examples export examples @@ -51,6 +52,15 @@ proc example*(_: type BlockExcPeerCtx): BlockExcPeerCtx = proc example*(_: type Cid): Cid = bt.Block.example.cid +proc example*(_: type Manifest): Manifest = + Manifest.new( + treeCid = Cid.example, + blockSize = 256.NBytes, + datasetSize = 4096.NBytes, + filename = "example.txt".some, + mimetype = "text/plain".some, + ) + proc example*(_: type MultiHash, mcodec = Sha256HashCodec): MultiHash = let bytes = newSeqWith(256, rand(uint8)) MultiHash.digest($mcodec, bytes).tryGet() diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 6d7415d3..898dd16e 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -85,30 +85,31 @@ proc makeWantList*( ) proc storeDataGetManifest*( - store: BlockStore, chunker: Chunker + store: BlockStore, blocks: seq[Block] ): Future[Manifest] {.async.} = - var cids = newSeq[Cid]() - - while (let chunk = await chunker.getBytes(); chunk.len > 0): - let blk = Block.new(chunk).tryGet() - cids.add(blk.cid) + for blk in blocks: (await store.putBlock(blk)).tryGet() let - tree = CodexTree.init(cids).tryGet() + (manifest, tree) = makeManifestAndTree(blocks).tryGet() treeCid = tree.rootCid.tryGet() - manifest = Manifest.new( - treeCid = treeCid, - blockSize = 
NBytes(chunker.chunkSize), - datasetSize = NBytes(chunker.offset), - ) for i in 0 ..< tree.leavesCount: let proof = tree.getProof(i).tryGet() - (await store.putCidAndProof(treeCid, i, cids[i], proof)).tryGet() + (await store.putCidAndProof(treeCid, i, blocks[i].cid, proof)).tryGet() return manifest +proc storeDataGetManifest*( + store: BlockStore, chunker: Chunker +): Future[Manifest] {.async.} = + var blocks = newSeq[Block]() + + while (let chunk = await chunker.getBytes(); chunk.len > 0): + blocks.add(Block.new(chunk).tryGet()) + + return await storeDataGetManifest(store, blocks) + proc makeRandomBlocks*( datasetSize: int, blockSize: NBytes ): Future[seq[Block]] {.async.} = diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index e4a9d1f4..b9450f40 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -37,6 +37,7 @@ import ../examples import ../helpers import ../helpers/mockmarket import ../helpers/mockclock +import ../slots/helpers import ./helpers @@ -166,3 +167,28 @@ asyncchecksuite "Test Node - Basic": (await verifiableBlock.cid in localStore) == true request.content.cid == $verifiableBlock.cid request.content.merkleRoot == builder.verifyRoot.get.toBytes + + test "Should delete a single block": + let randomBlock = bt.Block.new("Random block".toBytes).tryGet() + (await localStore.putBlock(randomBlock)).tryGet() + check (await randomBlock.cid in localStore) == true + + (await node.delete(randomBlock.cid)).tryGet() + check (await randomBlock.cid in localStore) == false + + test "Should delete an entire dataset": + let + blocks = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb) + manifest = await storeDataGetManifest(localStore, blocks) + manifestBlock = (await store.storeManifest(manifest)).tryGet() + manifestCid = manifestBlock.cid + + check await manifestCid in localStore + for blk in blocks: + check await blk.cid in localStore + + (await node.delete(manifestCid)).tryGet() + + check not await manifestCid in localStore + for blk in blocks: + check not (await blk.cid in localStore) diff --git a/tests/codex/slots/helpers.nim b/tests/codex/slots/helpers.nim index 03d87d12..fced1f1c 100644 --- a/tests/codex/slots/helpers.nim +++ b/tests/codex/slots/helpers.nim @@ -15,9 +15,7 @@ import pkg/codex/rng import ../helpers -proc storeManifest*( - store: BlockStore, manifest: Manifest -): Future[?!bt.Block] {.async.} = +proc makeManifestBlock*(manifest: Manifest): ?!bt.Block = without encodedVerifiable =? manifest.encode(), err: trace "Unable to encode manifest" return failure(err) @@ -26,6 +24,15 @@ proc storeManifest*( trace "Unable to create block from manifest" return failure(error) + success blk + +proc storeManifest*( + store: BlockStore, manifest: Manifest +): Future[?!bt.Block] {.async.} = + without blk =? makeManifestBlock(manifest), err: + trace "Unable to create manifest block", err = err.msg + return failure(err) + if err =? 
(await store.putBlock(blk)).errorOption: trace "Unable to store manifest block", cid = blk.cid, err = err.msg return failure(err) diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index d1191fb9..992b50d0 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -86,6 +86,16 @@ proc downloadBytes*( success bytes +proc delete*(client: CodexClient, cid: Cid): ?!void = + let + url = client.baseurl & "/data/" & $cid + response = client.http.delete(url) + + if response.status != "204 No Content": + return failure(response.status) + + success() + proc list*(client: CodexClient): ?!RestContentList = let url = client.baseurl & "/data" let response = client.http.get(url) @@ -284,3 +294,6 @@ proc downloadRaw*(client: CodexClient, cid: string, local = false): Response = client.baseurl & "/data/" & cid & (if local: "" else: "/network/stream"), httpMethod = HttpGet, ) + +proc deleteRaw*(client: CodexClient, cid: string): Response = + return client.http.request(client.baseurl & "/data/" & cid, httpMethod = HttpDelete) diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 8cbe9817..557efad2 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -1,10 +1,13 @@ import std/httpclient import std/sequtils import std/strformat -from pkg/libp2p import `==` +from pkg/libp2p import `==`, `$`, Cid import pkg/codex/units +import pkg/codex/manifest import ./twonodes import ../examples +import ../codex/examples +import ../codex/slots/helpers import json twonodessuite "REST API": @@ -263,3 +266,24 @@ twonodessuite "REST API": check localResponse.headers.hasKey("Content-Disposition") == true check localResponse.headers["Content-Disposition"] == "attachment; filename=\"example.txt\"" + + test "should delete a dataset when requested", twoNodesConfig: + let cid = client1.upload("some file contents").get + + var response = client1.downloadRaw($cid, local = true) + check response.body == "some file contents" + + client1.delete(cid).get + + response = client1.downloadRaw($cid, local = true) + check response.status == "404 Not Found" + + test "should return 200 when attempting delete of non-existing block", twoNodesConfig: + let response = client1.deleteRaw($(Cid.example())) + check response.status == "204 No Content" + + test "should return 200 when attempting delete of non-existing dataset", + twoNodesConfig: + let cid = Manifest.example().makeManifestBlock().get.cid + let response = client1.deleteRaw($cid) + check response.status == "204 No Content" diff --git a/tests/integration/twonodes.nim b/tests/integration/twonodes.nim index 5666690e..eeceb20d 100644 --- a/tests/integration/twonodes.nim +++ b/tests/integration/twonodes.nim @@ -1,4 +1,3 @@ -import std/os import std/macros import pkg/questionable import ./multinodes From 20f6fef7ab25e7634b9541e2e9d1fa63fa083f1b Mon Sep 17 00:00:00 2001 From: Slava <20563034+veaceslavdoina@users.noreply.github.com> Date: Tue, 11 Feb 2025 23:49:37 +0200 Subject: [PATCH 10/17] fix: use ubuntu-24.04 runners for docker workflows (#1102) Co-authored-by: Giuliano Mega --- .github/workflows/docker-reusable.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/docker-reusable.yml b/.github/workflows/docker-reusable.yml index f0e46d95..7d937f78 100644 --- a/.github/workflows/docker-reusable.yml +++ b/.github/workflows/docker-reusable.yml @@ -94,11 +94,11 @@ jobs: - target: os: linux arch: amd64 - builder: ubuntu-22.04 + 
builder: ubuntu-24.04 - target: os: linux arch: arm64 - builder: ubuntu-22.04-arm + builder: ubuntu-24.04-arm name: Build ${{ matrix.target.os }}/${{ matrix.target.arch }} runs-on: ${{ matrix.builder }} From 45e97513a7a14ac56689a7c508d5af145737054d Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 12 Feb 2025 04:48:58 -0600 Subject: [PATCH 11/17] remove uploadedAt from manifest (#1091) * remove uploadedAt from manifest * fix test --- codex/manifest/coders.nim | 11 ----------- codex/manifest/manifest.nim | 15 +-------------- codex/node.nim | 1 - codex/slots/builder/builder.nim | 2 +- openapi.yaml | 6 ------ tests/integration/testrestapi.nim | 4 +--- 6 files changed, 3 insertions(+), 36 deletions(-) diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index 0c461e45..30e0c7ca 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -63,7 +63,6 @@ proc encode*(manifest: Manifest): ?!seq[byte] = # optional ErasureInfo erasure = 7; # erasure coding info # optional filename: ?string = 8; # original filename # optional mimetype: ?string = 9; # original mimetype - # optional uploadedAt: ?int64 = 10; # original uploadedAt # } # ``` # @@ -102,9 +101,6 @@ proc encode*(manifest: Manifest): ?!seq[byte] = if manifest.mimetype.isSome: header.write(9, manifest.mimetype.get()) - if manifest.uploadedAt.isSome: - header.write(10, manifest.uploadedAt.get().uint64) - pbNode.write(1, header) # set the treeCid as the data field pbNode.finish() @@ -135,7 +131,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = verifiableStrategy: uint32 filename: string mimetype: string - uploadedAt: uint64 # Decode `Header` message if pbNode.getField(1, pbHeader).isErr: @@ -169,9 +164,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = if pbHeader.getField(9, mimetype).isErr: return failure("Unable to decode `mimetype` from manifest!") - if pbHeader.getField(10, uploadedAt).isErr: - return failure("Unable to decode `uploadedAt` from manifest!") - let protected = pbErasureInfo.buffer.len > 0 var verifiable = false if protected: @@ -211,7 +203,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = var filenameOption = if filename.len == 0: string.none else: filename.some var mimetypeOption = if mimetype.len == 0: string.none else: mimetype.some - var uploadedAtOption = if uploadedAt == 0: int64.none else: uploadedAt.int64.some let self = if protected: @@ -229,7 +220,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = strategy = StrategyType(protectedStrategy), filename = filenameOption, mimetype = mimetypeOption, - uploadedAt = uploadedAtOption, ) else: Manifest.new( @@ -241,7 +231,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = codec = codec.MultiCodec, filename = filenameOption, mimetype = mimetypeOption, - uploadedAt = uploadedAtOption, ) ?self.verify() diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index 6e0d1b80..0bc51dfc 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -38,7 +38,6 @@ type Manifest* = ref object of RootObj version: CidVersion # Cid version filename {.serialize.}: ?string # The filename of the content uploaded (optional) mimetype {.serialize.}: ?string # The mimetype of the content uploaded (optional) - uploadedAt {.serialize.}: ?int64 # The UTC creation timestamp in seconds case protected {.serialize.}: bool # Protected datasets have erasure coded info of true: ecK: int # Number of blocks to encode @@ -131,8 +130,6 
@@ func filename*(self: Manifest): ?string = func mimetype*(self: Manifest): ?string = self.mimetype -func uploadedAt*(self: Manifest): ?int64 = - self.uploadedAt ############################################################ # Operations on block list ############################################################ @@ -172,7 +169,7 @@ func `==`*(a, b: Manifest): bool = (a.treeCid == b.treeCid) and (a.datasetSize == b.datasetSize) and (a.blockSize == b.blockSize) and (a.version == b.version) and (a.hcodec == b.hcodec) and (a.codec == b.codec) and (a.protected == b.protected) and (a.filename == b.filename) and - (a.mimetype == b.mimetype) and (a.uploadedAt == b.uploadedAt) and ( + (a.mimetype == b.mimetype) and ( if a.protected: (a.ecK == b.ecK) and (a.ecM == b.ecM) and (a.originalTreeCid == b.originalTreeCid) and (a.originalDatasetSize == b.originalDatasetSize) and @@ -202,9 +199,6 @@ func `$`*(self: Manifest): string = if self.mimetype.isSome: result &= ", mimetype: " & $self.mimetype - if self.uploadedAt.isSome: - result &= ", uploadedAt: " & $self.uploadedAt - result &= ( if self.protected: ", ecK: " & $self.ecK & ", ecM: " & $self.ecM & ", originalTreeCid: " & @@ -236,7 +230,6 @@ func new*( protected = false, filename: ?string = string.none, mimetype: ?string = string.none, - uploadedAt: ?int64 = int64.none, ): Manifest = T( treeCid: treeCid, @@ -248,7 +241,6 @@ func new*( protected: protected, filename: filename, mimetype: mimetype, - uploadedAt: uploadedAt, ) func new*( @@ -278,7 +270,6 @@ func new*( protectedStrategy: strategy, filename: manifest.filename, mimetype: manifest.mimetype, - uploadedAt: manifest.uploadedAt, ) func new*(T: type Manifest, manifest: Manifest): Manifest = @@ -296,7 +287,6 @@ func new*(T: type Manifest, manifest: Manifest): Manifest = protected: false, filename: manifest.filename, mimetype: manifest.mimetype, - uploadedAt: manifest.uploadedAt, ) func new*( @@ -314,7 +304,6 @@ func new*( strategy = SteppedStrategy, filename: ?string = string.none, mimetype: ?string = string.none, - uploadedAt: ?int64 = int64.none, ): Manifest = Manifest( treeCid: treeCid, @@ -331,7 +320,6 @@ func new*( protectedStrategy: strategy, filename: filename, mimetype: mimetype, - uploadedAt: uploadedAt, ) func new*( @@ -374,7 +362,6 @@ func new*( verifiableStrategy: strategy, filename: manifest.filename, mimetype: manifest.mimetype, - uploadedAt: manifest.uploadedAt, ) func new*(T: type Manifest, data: openArray[byte]): ?!Manifest = diff --git a/codex/node.nim b/codex/node.nim index f43a4b55..b90d6a9e 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -391,7 +391,6 @@ proc store*( codec = dataCodec, filename = filename, mimetype = mimetype, - uploadedAt = now().utc.toTime.toUnix.some, ) without manifestBlk =? await self.storeManifest(manifest), err: diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 30332f1c..74597ff1 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -189,7 +189,7 @@ proc getCellHashes*[T, H]( blkIdx = blkIdx pos = i - trace "Getting block CID for tree at index" + trace "Getting block CID for tree at index", index = blkIdx without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? 
tree.root, err: error "Failed to get block CID for tree at index", err = err.msg diff --git a/openapi.yaml b/openapi.yaml index 9d401e8f..70da398b 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -371,12 +371,6 @@ components: nullable: true description: "The original mimetype of the uploaded content (optional)" example: image/png - uploadedAt: - type: integer - format: int64 - nullable: true - description: "The UTC upload timestamp in seconds" - example: 1729244192 Space: type: object diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 557efad2..13b06500 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -38,7 +38,7 @@ twonodessuite "REST API": check: space.totalBlocks == 2 space.quotaMaxBytes == 8589934592.NBytes - space.quotaUsedBytes == 65598.NBytes + space.quotaUsedBytes == 65592.NBytes space.quotaReservedBytes == 12.NBytes test "node lists local files", twoNodesConfig: @@ -232,8 +232,6 @@ twonodessuite "REST API": check manifest["filename"].getStr() == "example.txt" check manifest.hasKey("mimetype") == true check manifest["mimetype"].getStr() == "text/plain" - check manifest.hasKey("uploadedAt") == true - check manifest["uploadedAt"].getInt() > 0 test "node set the headers when for download", twoNodesConfig: let headers = newHttpHeaders( From c65148822ee4f0eb17fefb584196bb3754adfc08 Mon Sep 17 00:00:00 2001 From: munna0908 <88337208+munna0908@users.noreply.github.com> Date: Wed, 12 Feb 2025 23:26:26 +0530 Subject: [PATCH 12/17] feat: multithreading support for erasure coding (#1087) * implement async encode * implement async decode * cleanup code * add num-threads flag * fix tests * code cleanup * improve return types and exception handling for async proc * add validation check for numThreads flag * modify encode method * add new tests for aync encoding * modify decode method * cleanup test cases * add new cli flag for threadCount * test cleanup * add new tests * fix decodeAsync exception handling * code cleanup * chore: cosmetic changes --- codex/codex.nim | 16 +- codex/conf.nim | 31 ++++ codex/erasure/backend.nim | 8 +- codex/erasure/backends/leopard.nim | 14 +- codex/erasure/erasure.nim | 255 ++++++++++++++++++++++++++--- codex/node.nim | 17 +- codex/utils/arrayutils.nim | 25 +++ tests/codex/node/helpers.nim | 2 + tests/codex/node/testcontracts.nim | 2 +- tests/codex/node/testnode.nim | 6 +- tests/codex/testerasure.nim | 117 ++++++++++++- vendor/nim-leopard | 2 +- 12 files changed, 454 insertions(+), 41 deletions(-) create mode 100644 codex/utils/arrayutils.nim diff --git a/codex/codex.nim b/codex/codex.nim index 13985254..6dcfbaaa 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -11,8 +11,10 @@ import std/sequtils import std/strutils import std/os import std/tables +import std/cpuinfo import pkg/chronos +import pkg/taskpools import pkg/presto import pkg/libp2p import pkg/confutils @@ -194,7 +196,18 @@ proc new*( .withTcpTransport({ServerFlags.ReuseAddr}) .build() - var cache: CacheStore = nil + var + cache: CacheStore = nil + taskpool: Taskpool + + try: + if config.numThreads == ThreadCount(0): + taskpool = Taskpool.new(numThreads = min(countProcessors(), 16)) + else: + taskpool = Taskpool.new(numThreads = int(config.numThreads)) + info "Threadpool started", numThreads = taskpool.numThreads + except CatchableError as exc: + raiseAssert("Failure in taskpool initialization:" & exc.msg) if config.cacheSize > 0'nb: cache = CacheStore.new(cacheSize = config.cacheSize) @@ -286,6 +299,7 @@ proc new*( engine = 
engine, discovery = discovery, prover = prover, + taskPool = taskpool, ) restServer = RestServerRef diff --git a/codex/conf.nim b/codex/conf.nim index 6d47f8f4..ccf29a1f 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -53,6 +53,10 @@ export DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, DefaultNumberOfBlocksToMaintainPerInterval +type ThreadCount* = distinct Natural + +proc `==`*(a, b: ThreadCount): bool {.borrow.} + proc defaultDataDir*(): string = let dataDir = when defined(windows): @@ -71,6 +75,7 @@ const DefaultDataDir* = defaultDataDir() DefaultCircuitDir* = defaultDataDir() / "circuits" + DefaultThreadCount* = ThreadCount(0) type StartUpCmd* {.pure.} = enum @@ -184,6 +189,13 @@ type name: "max-peers" .}: int + numThreads* {. + desc: + "Number of worker threads (\"0\" = use as many threads as there are CPU cores available)", + defaultValue: DefaultThreadCount, + name: "num-threads" + .}: ThreadCount + agentString* {. defaultValue: "Codex", desc: "Node agent string which is used as identifier in network", @@ -482,6 +494,13 @@ proc parseCmdArg*( quit QuitFailure ma +proc parseCmdArg*(T: type ThreadCount, input: string): T {.upraises: [ValueError].} = + let count = parseInt(input) + if count != 0 and count < 2: + warn "Invalid number of threads", input = input + quit QuitFailure + ThreadCount(count) + proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = var res: SignedPeerRecord try: @@ -579,6 +598,15 @@ proc readValue*( quit QuitFailure val = NBytes(value) +proc readValue*( + r: var TomlReader, val: var ThreadCount +) {.upraises: [SerializationError, IOError].} = + var str = r.readValue(string) + try: + val = parseCmdArg(ThreadCount, str) + except CatchableError as err: + raise newException(SerializationError, err.msg) + proc readValue*( r: var TomlReader, val: var Duration ) {.upraises: [SerializationError, IOError].} = @@ -609,6 +637,9 @@ proc completeCmdArg*(T: type NBytes, val: string): seq[string] = proc completeCmdArg*(T: type Duration, val: string): seq[string] = discard +proc completeCmdArg*(T: type ThreadCount, val: string): seq[string] = + discard + # silly chronicles, colors is a compile-time property proc stripAnsi*(v: string): string = var diff --git a/codex/erasure/backend.nim b/codex/erasure/backend.nim index a6dd8b8c..32009829 100644 --- a/codex/erasure/backend.nim +++ b/codex/erasure/backend.nim @@ -29,14 +29,18 @@ method release*(self: ErasureBackend) {.base, gcsafe.} = raiseAssert("not implemented!") method encode*( - self: EncoderBackend, buffers, parity: var openArray[seq[byte]] + self: EncoderBackend, + buffers, parity: ptr UncheckedArray[ptr UncheckedArray[byte]], + dataLen, parityLen: int, ): Result[void, cstring] {.base, gcsafe.} = ## encode buffers using a backend ## raiseAssert("not implemented!") method decode*( - self: DecoderBackend, buffers, parity, recovered: var openArray[seq[byte]] + self: DecoderBackend, + buffers, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]], + dataLen, parityLen, recoveredLen: int, ): Result[void, cstring] {.base, gcsafe.} = ## decode buffers using a backend ## diff --git a/codex/erasure/backends/leopard.nim b/codex/erasure/backends/leopard.nim index c9f9db40..ae599f12 100644 --- a/codex/erasure/backends/leopard.nim +++ b/codex/erasure/backends/leopard.nim @@ -22,11 +22,13 @@ type decoder*: Option[LeoDecoder] method encode*( - self: LeoEncoderBackend, data, parity: var openArray[seq[byte]] + self: LeoEncoderBackend, + data, parity: ptr UncheckedArray[ptr UncheckedArray[byte]], + 
dataLen, parityLen: int, ): Result[void, cstring] = ## Encode data using Leopard backend - if parity.len == 0: + if parityLen == 0: return ok() var encoder = @@ -36,10 +38,12 @@ method encode*( else: self.encoder.get() - encoder.encode(data, parity) + encoder.encode(data, parity, dataLen, parityLen) method decode*( - self: LeoDecoderBackend, data, parity, recovered: var openArray[seq[byte]] + self: LeoDecoderBackend, + data, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]], + dataLen, parityLen, recoveredLen: int, ): Result[void, cstring] = ## Decode data using given Leopard backend @@ -50,7 +54,7 @@ method decode*( else: self.decoder.get() - decoder.decode(data, parity, recovered) + decoder.decode(data, parity, recovered, dataLen, parityLen, recoveredLen) method release*(self: LeoEncoderBackend) = if self.encoder.isSome: diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index aacd187a..107f85bc 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -12,12 +12,14 @@ import pkg/upraises push: {.upraises: [].} -import std/sequtils -import std/sugar +import std/[sugar, atomics, sequtils] import pkg/chronos +import pkg/chronos/threadsync +import pkg/chronicles import pkg/libp2p/[multicodec, cid, multihash] import pkg/libp2p/protobuf/minprotobuf +import pkg/taskpools import ../logutils import ../manifest @@ -28,6 +30,7 @@ import ../utils import ../utils/asynciter import ../indexingstrategy import ../errors +import ../utils/arrayutils import pkg/stew/byteutils @@ -68,6 +71,7 @@ type proc(size, blocks, parity: int): DecoderBackend {.raises: [Defect], noSideEffect.} Erasure* = ref object + taskPool: Taskpool encoderProvider*: EncoderProvider decoderProvider*: DecoderProvider store*: BlockStore @@ -87,6 +91,24 @@ type # provided. minSize*: NBytes + EncodeTask = object + success: Atomic[bool] + erasure: ptr Erasure + blocks: ptr UncheckedArray[ptr UncheckedArray[byte]] + parity: ptr UncheckedArray[ptr UncheckedArray[byte]] + blockSize, blocksLen, parityLen: int + signal: ThreadSignalPtr + + DecodeTask = object + success: Atomic[bool] + erasure: ptr Erasure + blocks: ptr UncheckedArray[ptr UncheckedArray[byte]] + parity: ptr UncheckedArray[ptr UncheckedArray[byte]] + recovered: ptr UncheckedArray[ptr UncheckedArray[byte]] + blockSize, blocksLen: int + parityLen, recoveredLen: int + signal: ThreadSignalPtr + func indexToPos(steps, idx, step: int): int {.inline.} = ## Convert an index to a position in the encoded ## dataset @@ -269,6 +291,81 @@ proc init*( strategy: strategy, ) +proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} = + # Task suitable for running in taskpools - look, no GC! + let encoder = + task[].erasure.encoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen) + defer: + encoder.release() + discard task[].signal.fireSync() + + if ( + let res = + encoder.encode(task[].blocks, task[].parity, task[].blocksLen, task[].parityLen) + res.isErr + ): + warn "Error from leopard encoder backend!", error = $res.error + + task[].success.store(false) + else: + task[].success.store(true) + +proc encodeAsync*( + self: Erasure, + blockSize, blocksLen, parityLen: int, + data: ref seq[seq[byte]], + parity: ptr UncheckedArray[ptr UncheckedArray[byte]], +): Future[?!void] {.async: (raises: [CancelledError]).} = + without threadPtr =? 
ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + threadPtr.close().expect("closing once works") + + var blockData = createDoubleArray(blocksLen, blockSize) + + for i in 0 ..< data[].len: + copyMem(blockData[i], addr data[i][0], blockSize) + + defer: + freeDoubleArray(blockData, blocksLen) + + ## Create an ecode task with block data + var task = EncodeTask( + erasure: addr self, + blockSize: blockSize, + blocksLen: blocksLen, + parityLen: parityLen, + blocks: blockData, + parity: parity, + signal: threadPtr, + ) + + let t = addr task + + doAssert self.taskPool.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + self.taskPool.spawn leopardEncodeTask(self.taskPool, t) + let threadFut = threadPtr.wait() + + try: + await threadFut.join() + except CatchableError as exc: + try: + await threadFut + except AsyncError as asyncExc: + return failure(asyncExc.msg) + finally: + if exc of CancelledError: + raise (ref CancelledError) exc + else: + return failure(exc.msg) + + if not t.success.load(): + return failure("Leopard encoding failed") + + success() + proc encodeData( self: Erasure, manifest: Manifest, params: EncodingParams ): Future[?!Manifest] {.async.} = @@ -276,7 +373,6 @@ proc encodeData( ## ## `manifest` - the manifest to encode ## - logScope: steps = params.steps rounded_blocks = params.rounded @@ -286,7 +382,6 @@ proc encodeData( var cids = seq[Cid].new() - encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM) emptyBlock = newSeq[byte](manifest.blockSize.int) cids[].setLen(params.blocksCount) @@ -296,8 +391,7 @@ proc encodeData( # TODO: Don't allocate a new seq every time, allocate once and zero out var data = seq[seq[byte]].new() # number of blocks to encode - parityData = - newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int)) + parity = createDoubleArray(params.ecM, manifest.blockSize.int) data[].setLen(params.ecK) # TODO: this is a tight blocking loop so we sleep here to allow @@ -311,15 +405,25 @@ proc encodeData( trace "Unable to prepare data", error = err.msg return failure(err) - trace "Erasure coding data", data = data[].len, parity = parityData.len + trace "Erasure coding data", data = data[].len - if (let res = encoder.encode(data[], parityData); res.isErr): - trace "Unable to encode manifest!", error = $res.error - return failure($res.error) + try: + if err =? ( + await self.encodeAsync( + manifest.blockSize.int, params.ecK, params.ecM, data, parity + ) + ).errorOption: + return failure(err) + except CancelledError as exc: + raise exc + finally: + freeDoubleArray(parity, params.ecM) var idx = params.rounded + step for j in 0 ..< params.ecM: - without blk =? bt.Block.new(parityData[j]), error: + var innerPtr: ptr UncheckedArray[byte] = parity[][j] + without blk =? bt.Block.new(innerPtr.toOpenArray(0, manifest.blockSize.int - 1)), + error: trace "Unable to create parity block", err = error.msg return failure(error) @@ -356,8 +460,6 @@ proc encodeData( except CatchableError as exc: trace "Erasure coding encoding error", exc = exc.msg return failure(exc) - finally: - encoder.release() proc encode*( self: Erasure, @@ -381,6 +483,101 @@ proc encode*( return success encodedManifest +proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} = + # Task suitable for running in taskpools - look, no GC! 
+ let decoder = + task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen) + defer: + decoder.release() + + if ( + let res = decoder.decode( + task[].blocks, + task[].parity, + task[].recovered, + task[].blocksLen, + task[].parityLen, + task[].recoveredLen, + ) + res.isErr + ): + warn "Error from leopard decoder backend!", error = $res.error + task[].success.store(false) + else: + task[].success.store(true) + + discard task[].signal.fireSync() + +proc decodeAsync*( + self: Erasure, + blockSize, blocksLen, parityLen: int, + blocks, parity: ref seq[seq[byte]], + recovered: ptr UncheckedArray[ptr UncheckedArray[byte]], +): Future[?!void] {.async: (raises: [CancelledError]).} = + without threadPtr =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + threadPtr.close().expect("closing once works") + + var + blocksData = createDoubleArray(blocksLen, blockSize) + parityData = createDoubleArray(parityLen, blockSize) + + for i in 0 ..< blocks[].len: + if blocks[i].len > 0: + copyMem(blocksData[i], addr blocks[i][0], blockSize) + else: + blocksData[i] = nil + + for i in 0 ..< parity[].len: + if parity[i].len > 0: + copyMem(parityData[i], addr parity[i][0], blockSize) + else: + parityData[i] = nil + + defer: + freeDoubleArray(blocksData, blocksLen) + freeDoubleArray(parityData, parityLen) + + ## Create an decode task with block data + var task = DecodeTask( + erasure: addr self, + blockSize: blockSize, + blocksLen: blocksLen, + parityLen: parityLen, + recoveredLen: blocksLen, + blocks: blocksData, + parity: parityData, + recovered: recovered, + signal: threadPtr, + ) + + # Hold the task pointer until the signal is received + let t = addr task + doAssert self.taskPool.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + self.taskPool.spawn leopardDecodeTask(self.taskPool, t) + let threadFut = threadPtr.wait() + + try: + await threadFut.join() + except CatchableError as exc: + try: + await threadFut + except AsyncError as asyncExc: + return failure(asyncExc.msg) + finally: + if exc of CancelledError: + raise (ref CancelledError) exc + else: + return failure(exc.msg) + + if not t.success.load(): + return failure("Leopard encoding failed") + + success() + proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = ## Decode a protected manifest into it's original ## manifest @@ -388,7 +585,6 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = ## `encoded` - the encoded (protected) manifest to ## be recovered ## - logScope: steps = encoded.steps rounded_blocks = encoded.rounded @@ -411,8 +607,7 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = var data = seq[seq[byte]].new() parityData = seq[seq[byte]].new() - recovered = - newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int)) + recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int) data[].setLen(encoded.ecK) # set len to K parityData[].setLen(encoded.ecM) # set len to M @@ -430,15 +625,26 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = continue trace "Erasure decoding data" - - if (let err = decoder.decode(data[], parityData[], recovered); err.isErr): - trace "Unable to decode data!", err = $err.error - return failure($err.error) + try: + if err =? 
( + await self.decodeAsync( + encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered + ) + ).errorOption: + return failure(err) + except CancelledError as exc: + raise exc + finally: + freeDoubleArray(recovered, encoded.ecK) for i in 0 ..< encoded.ecK: let idx = i * encoded.steps + step if data[i].len <= 0 and not cids[idx].isEmpty: - without blk =? bt.Block.new(recovered[i]), error: + var innerPtr: ptr UncheckedArray[byte] = recovered[][i] + + without blk =? bt.Block.new( + innerPtr.toOpenArray(0, encoded.blockSize.int - 1) + ), error: trace "Unable to create block!", exc = error.msg return failure(error) @@ -490,10 +696,13 @@ proc new*( store: BlockStore, encoderProvider: EncoderProvider, decoderProvider: DecoderProvider, + taskPool: Taskpool, ): Erasure = ## Create a new Erasure instance for encoding and decoding manifests ## - Erasure( - store: store, encoderProvider: encoderProvider, decoderProvider: decoderProvider + store: store, + encoderProvider: encoderProvider, + decoderProvider: decoderProvider, + taskPool: taskPool, ) diff --git a/codex/node.nim b/codex/node.nim index b90d6a9e..2602bfe6 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -15,6 +15,7 @@ import std/strformat import std/sugar import times +import pkg/taskpools import pkg/questionable import pkg/questionable/results import pkg/chronos @@ -70,6 +71,7 @@ type contracts*: Contracts clock*: Clock storage*: Contracts + taskpool: Taskpool CodexNodeRef* = ref CodexNode @@ -235,8 +237,9 @@ proc streamEntireDataset( # Retrieve, decode and save to the local store all EС groups proc erasureJob(): Future[?!void] {.async.} = # Spawn an erasure decoding job - let erasure = - Erasure.new(self.networkStore, leoEncoderProvider, leoDecoderProvider) + let erasure = Erasure.new( + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + ) without _ =? (await erasure.decode(manifest)), error: error "Unable to erasure decode manifest", manifestCid, exc = error.msg return failure(error) @@ -461,8 +464,9 @@ proc setupRequest( return failure error # Erasure code the dataset according to provided parameters - let erasure = - Erasure.new(self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider) + let erasure = Erasure.new( + self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + ) without encoded =? 
(await erasure.encode(manifest, ecK, ecM)), error: trace "Unable to erasure code dataset" @@ -782,12 +786,16 @@ proc stop*(self: CodexNodeRef) {.async.} = if not self.networkStore.isNil: await self.networkStore.close + if not self.taskpool.isNil: + self.taskpool.shutdown() + proc new*( T: type CodexNodeRef, switch: Switch, networkStore: NetworkStore, engine: BlockExcEngine, discovery: Discovery, + taskpool: Taskpool, prover = Prover.none, contracts = Contracts.default, ): CodexNodeRef = @@ -800,5 +808,6 @@ proc new*( engine: engine, prover: prover, discovery: discovery, + taskPool: taskpool, contracts: contracts, ) diff --git a/codex/utils/arrayutils.nim b/codex/utils/arrayutils.nim new file mode 100644 index 00000000..c398921f --- /dev/null +++ b/codex/utils/arrayutils.nim @@ -0,0 +1,25 @@ +import std/sequtils + +proc createDoubleArray*( + outerLen, innerLen: int +): ptr UncheckedArray[ptr UncheckedArray[byte]] = + # Allocate outer array + result = cast[ptr UncheckedArray[ptr UncheckedArray[byte]]](allocShared0( + sizeof(ptr UncheckedArray[byte]) * outerLen + )) + + # Allocate each inner array + for i in 0 ..< outerLen: + result[i] = cast[ptr UncheckedArray[byte]](allocShared0(sizeof(byte) * innerLen)) + +proc freeDoubleArray*( + arr: ptr UncheckedArray[ptr UncheckedArray[byte]], outerLen: int +) = + # Free each inner array + for i in 0 ..< outerLen: + if not arr[i].isNil: + deallocShared(arr[i]) + + # Free outer array + if not arr.isNil: + deallocShared(arr) diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim index 0d72b06b..2d1a87dc 100644 --- a/tests/codex/node/helpers.nim +++ b/tests/codex/node/helpers.nim @@ -6,6 +6,7 @@ import pkg/chronos import pkg/codex/codextypes import pkg/codex/chunker import pkg/codex/stores +import pkg/taskpools import ../../asynctest @@ -118,6 +119,7 @@ template setupAndTearDown*() {.dirty.} = engine = engine, prover = Prover.none, discovery = blockDiscovery, + taskpool = Taskpool.new(), ) teardown: diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index cce6d5bd..52adb5f6 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -75,7 +75,7 @@ asyncchecksuite "Test Node - Host contracts": let manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) manifestCid = manifestBlock.cid manifestCidStr = $(manifestCid) diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index b9450f40..3f9a141a 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -12,6 +12,7 @@ import pkg/questionable/results import pkg/stint import pkg/poseidon2 import pkg/poseidon2/io +import pkg/taskpools import pkg/nitro import pkg/codexdht/discv5/protocol as discv5 @@ -67,7 +68,7 @@ asyncchecksuite "Test Node - Basic": # https://github.com/codex-storage/nim-codex/issues/699 let cstore = CountingStore.new(engine, localStore) - node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery) + node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery, Taskpool.new()) missingCid = Cid.init("zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get() @@ -138,7 +139,8 @@ asyncchecksuite "Test Node - Basic": test "Setup purchase request": let - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) + erasure = + Erasure.new(store, leoEncoderProvider, 
leoDecoderProvider, Taskpool.new()) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim index 952497e9..d469b379 100644 --- a/tests/codex/testerasure.nim +++ b/tests/codex/testerasure.nim @@ -1,5 +1,6 @@ import std/sequtils import std/sugar +import std/times import pkg/chronos import pkg/questionable/results @@ -11,6 +12,8 @@ import pkg/codex/blocktype as bt import pkg/codex/rng import pkg/codex/utils import pkg/codex/indexingstrategy +import pkg/taskpools +import pkg/codex/utils/arrayutils import ../asynctest import ./helpers @@ -27,6 +30,7 @@ suite "Erasure encode/decode": var erasure: Erasure let repoTmp = TempLevelDb.new() let metaTmp = TempLevelDb.new() + var taskpool: Taskpool setup: let @@ -35,12 +39,14 @@ suite "Erasure encode/decode": rng = Rng.instance() chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize) store = RepoStore.new(repoDs, metaDs) - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) + taskpool = Taskpool.new() + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) manifest = await storeDataGetManifest(store, chunker) teardown: await repoTmp.destroyDb() await metaTmp.destroyDb() + taskpool.shutdown() proc encode(buffers, parity: int): Future[Manifest] {.async.} = let encoded = @@ -212,7 +218,7 @@ suite "Erasure encode/decode": let present = await store.hasBlock(manifest.treeCid, d) check present.tryGet() - test "handles edge case of 0 parity blocks": + test "Handles edge case of 0 parity blocks": const buffers = 20 parity = 0 @@ -221,6 +227,43 @@ suite "Erasure encode/decode": discard (await erasure.decode(encoded)).tryGet() + test "Should concurrently encode/decode multiple datasets": + const iterations = 2 + + let + datasetSize = 1.MiBs + ecK = 10.Natural + ecM = 10.Natural + + var encodeTasks = newSeq[Future[?!Manifest]]() + var decodeTasks = newSeq[Future[?!Manifest]]() + var manifests = newSeq[Manifest]() + for i in 0 ..< iterations: + let + # create random data and store it + blockSize = rng.sample(@[1, 2, 4, 8, 16, 32, 64].mapIt(it.KiBs)) + chunker = RandomChunker.new(rng, size = datasetSize, chunkSize = blockSize) + manifest = await storeDataGetManifest(store, chunker) + manifests.add(manifest) + # encode the data concurrently + encodeTasks.add(erasure.encode(manifest, ecK, ecM)) + # wait for all encoding tasks to finish + let encodeResults = await allFinished(encodeTasks) + # decode the data concurrently + for i in 0 ..< encodeResults.len: + decodeTasks.add(erasure.decode(encodeResults[i].read().tryGet())) + # wait for all decoding tasks to finish + let decodeResults = await allFinished(decodeTasks) # TODO: use allFutures + + for j in 0 ..< decodeTasks.len: + let + decoded = decodeResults[j].read().tryGet() + encoded = encodeResults[j].read().tryGet() + check: + decoded.treeCid == manifests[j].treeCid + decoded.treeCid == encoded.originalTreeCid + decoded.blocksCount == encoded.originalBlocksCount + test "Should handle verifiable manifests": const buffers = 20 @@ -259,3 +302,73 @@ suite "Erasure encode/decode": decoded.treeCid == manifest.treeCid decoded.treeCid == encoded.originalTreeCid decoded.blocksCount == encoded.originalBlocksCount + + test "Should complete encode/decode task when cancelled": + let + blocksLen = 10000 + parityLen = 10 + data = seq[seq[byte]].new() + chunker = RandomChunker.new( + rng, size = (blocksLen * 
BlockSize.int), chunkSize = BlockSize + ) + + data[].setLen(blocksLen) + + for i in 0 ..< blocksLen: + let chunk = await chunker.getBytes() + shallowCopy(data[i], @(chunk)) + + let + parity = createDoubleArray(parityLen, BlockSize.int) + paritySeq = seq[seq[byte]].new() + recovered = createDoubleArray(blocksLen, BlockSize.int) + cancelledTaskParity = createDoubleArray(parityLen, BlockSize.int) + cancelledTaskRecovered = createDoubleArray(blocksLen, BlockSize.int) + + paritySeq[].setLen(parityLen) + defer: + freeDoubleArray(parity, parityLen) + freeDoubleArray(cancelledTaskParity, parityLen) + freeDoubleArray(recovered, blocksLen) + freeDoubleArray(cancelledTaskRecovered, blocksLen) + + for i in 0 ..< parityLen: + paritySeq[i] = cast[seq[byte]](parity[i]) + + # call encodeAsync to get the parity + let encFut = + await erasure.encodeAsync(BlockSize.int, blocksLen, parityLen, data, parity) + check encFut.isOk + + let decFut = await erasure.decodeAsync( + BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered + ) + check decFut.isOk + + # call encodeAsync and cancel the task + let encodeFut = erasure.encodeAsync( + BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity + ) + encodeFut.cancel() + + try: + discard await encodeFut + except CatchableError as exc: + check exc of CancelledError + finally: + for i in 0 ..< parityLen: + check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int) + + # call decodeAsync and cancel the task + let decodeFut = erasure.decodeAsync( + BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered + ) + decodeFut.cancel() + + try: + discard await decodeFut + except CatchableError as exc: + check exc of CancelledError + finally: + for i in 0 ..< blocksLen: + check equalMem(recovered[i], cancelledTaskRecovered[i], BlockSize.int) diff --git a/vendor/nim-leopard b/vendor/nim-leopard index 3e09d811..7506b90f 160000 --- a/vendor/nim-leopard +++ b/vendor/nim-leopard @@ -1 +1 @@ -Subproject commit 3e09d8113f874f3584c3fe93818541b2ff9fb9c3 +Subproject commit 7506b90f9c650c02b96bf525d4fd1bd4942a495f From 25c84f4e0ee459ecf4eb62804a6a78ee67fa8c14 Mon Sep 17 00:00:00 2001 From: Giuliano Mega Date: Fri, 14 Feb 2025 10:34:17 -0300 Subject: [PATCH 13/17] Fix/repostore deletes for non-orphan blocks (#1109) * fix: fix deletion of non-orphan blocks * feat: improve error feedback for illegal direct block deletes * chore: minor rewording of test header --- codex/stores/repostore/operations.nim | 14 +++- codex/stores/repostore/store.nim | 55 +++++++++--- tests/codex/examples.nim | 4 +- tests/codex/stores/testrepostore.nim | 115 ++++++++++++++++++++++++++ 4 files changed, 173 insertions(+), 15 deletions(-) diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim index dcacbd62..125741e1 100644 --- a/codex/stores/repostore/operations.nim +++ b/codex/stores/repostore/operations.nim @@ -57,6 +57,17 @@ proc putLeafMetadata*( (md.some, res), ) +proc delLeafMetadata*( + self: RepoStore, treeCid: Cid, index: Natural +): Future[?!void] {.async.} = + without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: + return failure(err) + + if err =? 
(await self.metaDs.delete(key)).errorOption: + return failure(err) + + success() + proc getLeafMetadata*( self: RepoStore, treeCid: Cid, index: Natural ): Future[?!LeafMetadata] {.async.} = @@ -205,9 +216,6 @@ proc storeBlock*( proc tryDeleteBlock*( self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low ): Future[?!DeleteResult] {.async.} = - if cid.isEmpty: - return success(DeleteResult(kind: InUse)) - without metaKey =? createBlockExpirationMetadataKey(cid), err: return failure(err) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index 2b14d6b7..d7305107 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -186,13 +186,13 @@ method putBlock*( return success() -method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the blockstore when block refCount is 0 or block is expired - ## - +proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.async.} = logScope: cid = cid + if cid.isEmpty: + return success(Deleted) + trace "Attempting to delete a block" without res =? await self.tryDeleteBlock(cid, self.clock.now()), err: @@ -205,12 +205,28 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: return failure(err) - elif res.kind == InUse: - trace "Block in use, refCount > 0 and not expired" - else: - trace "Block not found in store" - return success() + success(res.kind) + +method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = + ## Delete a block from the blockstore when block refCount is 0 or block is expired + ## + + logScope: + cid = cid + + without outcome =? await self.delBlockInternal(cid), err: + return failure(err) + + case outcome + of InUse: + failure("Directly deleting a block that is part of a dataset is not allowed.") + of NotFound: + trace "Block not found, ignoring" + success() + of Deleted: + trace "Block already deleted" + success() method delBlock*( self: RepoStore, treeCid: Cid, index: Natural @@ -221,12 +237,19 @@ method delBlock*( else: return failure(err) + if err =? (await self.delLeafMetadata(treeCid, index)).errorOption: + error "Failed to delete leaf metadata, block will remain on disk.", err = err.msg + return failure(err) + if err =? (await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption: if not (err of BlockNotFoundError): return failure(err) - await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0 + without _ =? await self.delBlockInternal(leafMd.blkCid), err: + return failure(err) + + success() method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore @@ -295,6 +318,18 @@ proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = let queryKey = ?createBlockExpirationMetadataQueryKey() success Query.init(queryKey, offset = offset, limit = maxNumber) +proc blockRefCount*(self: RepoStore, cid: Cid): Future[?!Natural] {.async.} = + ## Returns the reference count for a block. If the count is zero; + ## this means the block is eligible for garbage collection. + ## + without key =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + without md =? 
await get[BlockMetadata](self.metaDs, key), err: + return failure(err) + + return success(md.refCount) + method getBlockExpirations*( self: RepoStore, maxNumber: int, offset: int ): Future[?!AsyncIter[BlockExpiration]] {.async, base.} = diff --git a/tests/codex/examples.nim b/tests/codex/examples.nim index 6f15182f..22a411c2 100644 --- a/tests/codex/examples.nim +++ b/tests/codex/examples.nim @@ -37,8 +37,8 @@ proc example*(_: type SignedState): SignedState = proc example*(_: type Pricing): Pricing = Pricing(address: EthAddress.example, price: uint32.rand.u256) -proc example*(_: type bt.Block): bt.Block = - let length = rand(4096) +proc example*(_: type bt.Block, size: int = 4096): bt.Block = + let length = rand(size) let bytes = newSeqWith(length, rand(uint8)) bt.Block.new(bytes).tryGet() diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index dda4ed82..0279b56f 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -12,9 +12,11 @@ import pkg/datastore import pkg/codex/stores/cachestore import pkg/codex/chunker import pkg/codex/stores +import pkg/codex/stores/repostore/operations import pkg/codex/blocktype as bt import pkg/codex/clock import pkg/codex/utils/asynciter +import pkg/codex/merkletree/codex import ../../asynctest import ../helpers @@ -354,6 +356,119 @@ asyncchecksuite "RepoStore": check has.isOk check has.get + test "should set the reference count for orphan blocks to 0": + let blk = Block.example(size = 200) + (await repo.putBlock(blk)).tryGet() + check (await repo.blockRefCount(blk.cid)).tryGet() == 0.Natural + + test "should not allow non-orphan blocks to be deleted directly": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(0).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet() + + let err = (await repo.delBlock(blk.cid)).error() + check err.msg == + "Directly deleting a block that is part of a dataset is not allowed." 
+ + test "should allow non-orphan blocks to be deleted by dataset reference": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(0).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet() + + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + check not (await blk.cid in repo) + + test "should not delete a non-orphan block until it is deleted from all parent datasets": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + blockPool = await makeRandomBlocks(datasetSize = 768, blockSize = 256'nb) + + let + dataset1 = @[blockPool[0], blockPool[1]] + dataset2 = @[blockPool[1], blockPool[2]] + + let sharedBlock = blockPool[1] + + let + (manifest1, tree1) = makeManifestAndTree(dataset1).tryGet() + treeCid1 = tree1.rootCid.tryGet() + (manifest2, tree2) = makeManifestAndTree(dataset2).tryGet() + treeCid2 = tree2.rootCid.tryGet() + + (await repo.putBlock(sharedBlock)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 0.Natural + + let + proof1 = tree1.getProof(1).tryGet() + proof2 = tree2.getProof(0).tryGet() + + (await repo.putCidAndProof(treeCid1, 1, sharedBlock.cid, proof1)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 1.Natural + + (await repo.putCidAndProof(treeCid2, 0, sharedBlock.cid, proof2)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 2.Natural + + (await repo.delBlock(treeCid1, 1.Natural)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 1.Natural + check (await sharedBlock.cid in repo) + + (await repo.delBlock(treeCid2, 0.Natural)).tryGet() + check not (await sharedBlock.cid in repo) + + test "should clear leaf metadata when block is deleted from dataset": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(1).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0.Natural, blk.cid, proof)).tryGet() + + discard (await repo.getLeafMetadata(treeCid, 0.Natural)).tryGet() + + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + + let err = (await repo.getLeafMetadata(treeCid, 0.Natural)).error() + check err of BlockNotFoundError + + test "should not fail when reinserting and deleting a previously deleted block (bug #1108)": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(1).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet() + + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + (await repo.putBlock(blk)).tryGet() + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + commonBlockStoreTests( "RepoStore Sql backend", proc(): BlockStore = From dc08ff8840ef6c3bbb29a1a9fae5048edead90bf Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 17 Feb 2025 11:34:42 +0100 
Subject: [PATCH 14/17] chore(marketplace): add a cache for storage requests (#1090) * Add cache to for requests * Change request cache description message and use const as default value * Set request cache size configuration hidden --- codex/codex.nim | 4 +++- codex/conf.nim | 13 ++++++++++++- codex/contracts/config.nim | 2 ++ codex/contracts/market.nim | 25 ++++++++++++++++++++++--- tests/contracts/testMarket.nim | 11 +++++++++++ 5 files changed, 50 insertions(+), 5 deletions(-) diff --git a/codex/codex.nim b/codex/codex.nim index 6dcfbaaa..dc577373 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -109,7 +109,9 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} = quit QuitFailure let marketplace = Marketplace.new(marketplaceAddress, signer) - let market = OnChainMarket.new(marketplace, config.rewardRecipient) + let market = OnChainMarket.new( + marketplace, config.rewardRecipient, config.marketplaceRequestCacheSize + ) let clock = OnChainClock.new(provider) var client: ?ClientInteractions diff --git a/codex/conf.nim b/codex/conf.nim index ccf29a1f..2a859efb 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -44,6 +44,7 @@ import ./utils import ./nat import ./utils/natutils +from ./contracts/config import DefaultRequestCacheSize from ./validationconfig import MaxSlots, ValidationGroups export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig @@ -51,7 +52,7 @@ export ValidationGroups, MaxSlots export DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, - DefaultNumberOfBlocksToMaintainPerInterval + DefaultNumberOfBlocksToMaintainPerInterval, DefaultRequestCacheSize type ThreadCount* = distinct Natural @@ -359,6 +360,16 @@ type name: "reward-recipient" .}: Option[EthAddress] + marketplaceRequestCacheSize* {. + desc: + "Maximum number of StorageRequests kept in memory." & + "Reduces fetching of StorageRequest data from the contract.", + defaultValue: DefaultRequestCacheSize, + defaultValueDesc: $DefaultRequestCacheSize, + name: "request-cache-size", + hidden + .}: uint16 + case persistenceCmd* {.defaultValue: noCmd, command.}: PersistenceCmd of PersistenceCmd.prover: circuitDir* {. diff --git a/codex/contracts/config.nim b/codex/contracts/config.nim index 87cd1f2a..5493c643 100644 --- a/codex/contracts/config.nim +++ b/codex/contracts/config.nim @@ -4,6 +4,8 @@ import pkg/questionable/results export contractabi +const DefaultRequestCacheSize* = 128.uint16 + type MarketplaceConfig* = object collateral*: CollateralConfig diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 35557050..9157b269 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -2,6 +2,7 @@ import std/strutils import pkg/ethers import pkg/upraises import pkg/questionable +import pkg/lrucache import ../utils/exceptions import ../logutils import ../market @@ -20,6 +21,7 @@ type signer: Signer rewardRecipient: ?Address configuration: ?MarketplaceConfig + requestCache: LruCache[string, StorageRequest] MarketSubscription = market.Subscription EventSubscription = ethers.Subscription @@ -27,12 +29,22 @@ type eventSubscription: EventSubscription func new*( - _: type OnChainMarket, contract: Marketplace, rewardRecipient = Address.none + _: type OnChainMarket, + contract: Marketplace, + rewardRecipient = Address.none, + requestCacheSize: uint16 = DefaultRequestCacheSize, ): OnChainMarket = without signer =? 
contract.signer: raiseAssert("Marketplace contract should have a signer") - OnChainMarket(contract: contract, signer: signer, rewardRecipient: rewardRecipient) + var requestCache = newLruCache[string, StorageRequest](int(requestCacheSize)) + + OnChainMarket( + contract: contract, + signer: signer, + rewardRecipient: rewardRecipient, + requestCache: requestCache, + ) proc raiseMarketError(message: string) {.raises: [MarketError].} = raise newException(MarketError, message) @@ -112,9 +124,16 @@ method requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} method getRequest*( market: OnChainMarket, id: RequestId ): Future[?StorageRequest] {.async.} = + let key = $id + + if market.requestCache.contains(key): + return some market.requestCache[key] + convertEthersError: try: - return some await market.contract.getRequest(id) + let request = await market.contract.getRequest(id) + market.requestCache[key] = request + return some request except Marketplace_UnknownRequest: return none StorageRequest diff --git a/tests/contracts/testMarket.nim b/tests/contracts/testMarket.nim index a77c2aaa..2ba450a1 100644 --- a/tests/contracts/testMarket.nim +++ b/tests/contracts/testMarket.nim @@ -3,6 +3,7 @@ import std/importutils import pkg/chronos import pkg/ethers/erc20 import codex/contracts +import pkg/lrucache import ../ethertest import ./examples import ./time @@ -591,3 +592,13 @@ ethersuite "On-Chain Market": let expectedPayout = request.expectedPayout(filledAt, requestEnd.u256) check endBalanceHost == (startBalanceHost + request.ask.collateralPerSlot) check endBalanceReward == (startBalanceReward + expectedPayout) + + test "the request is added in cache after the fist access": + await market.requestStorage(request) + + check market.requestCache.contains($request.id) == false + discard await market.getRequest(request.id) + + check market.requestCache.contains($request.id) == true + let cacheValue = market.requestCache[$request.id] + check cacheValue == request From 5af3477793191bb5eea3ac5861d72b654af6506e Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Tue, 18 Feb 2025 09:00:52 +1100 Subject: [PATCH 15/17] chore(deps): bump ethers to propagate cancellations (#1116) * chore(deps): bump ethers to propagate cancellations Ethers was swallowing canellations and turning them into EthersErrors, which was causing the sales statemachine to error when it should have been simply cancelling the current state's run. Hopefully fixes the intermittently failing marketplace integration test. * Add missing errors in async raises pragma * bump to version of ethers that supports cancellations --------- Co-authored-by: Arnaud --- codex/contracts/provider.nim | 10 +++++----- tests/contracts/helpers/mockprovider.nim | 2 +- tests/contracts/testDeployment.nim | 2 +- vendor/nim-ethers | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/codex/contracts/provider.nim b/codex/contracts/provider.nim index b7fc5602..b1576bb0 100644 --- a/codex/contracts/provider.nim +++ b/codex/contracts/provider.nim @@ -14,7 +14,7 @@ proc raiseProviderError(message: string) {.raises: [ProviderError].} = proc blockNumberAndTimestamp*( provider: Provider, blockTag: BlockTag -): Future[(UInt256, UInt256)] {.async: (raises: [ProviderError]).} = +): Future[(UInt256, UInt256)] {.async: (raises: [ProviderError, CancelledError]).} = without latestBlock =? 
await provider.getBlock(blockTag): raiseProviderError("Could not get latest block") @@ -25,7 +25,7 @@ proc blockNumberAndTimestamp*( proc binarySearchFindClosestBlock( provider: Provider, epochTime: int, low: UInt256, high: UInt256 -): Future[UInt256] {.async: (raises: [ProviderError]).} = +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = let (_, lowTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(low)) let (_, highTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(high)) if abs(lowTimestamp.truncate(int) - epochTime) < @@ -39,7 +39,7 @@ proc binarySearchBlockNumberForEpoch( epochTime: UInt256, latestBlockNumber: UInt256, earliestBlockNumber: UInt256, -): Future[UInt256] {.async: (raises: [ProviderError]).} = +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = var low = earliestBlockNumber var high = latestBlockNumber @@ -65,7 +65,7 @@ proc binarySearchBlockNumberForEpoch( proc blockNumberForEpoch*( provider: Provider, epochTime: SecondsSince1970 -): Future[UInt256] {.async: (raises: [ProviderError]).} = +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = let epochTimeUInt256 = epochTime.u256 let (latestBlockNumber, latestBlockTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.latest) @@ -118,6 +118,6 @@ proc blockNumberForEpoch*( proc pastBlockTag*( provider: Provider, blocksAgo: int -): Future[BlockTag] {.async: (raises: [ProviderError]).} = +): Future[BlockTag] {.async: (raises: [ProviderError, CancelledError]).} = let head = await provider.getBlockNumber() return BlockTag.init(head - blocksAgo.abs.u256) diff --git a/tests/contracts/helpers/mockprovider.nim b/tests/contracts/helpers/mockprovider.nim index 09e65398..c5be8ad7 100644 --- a/tests/contracts/helpers/mockprovider.nim +++ b/tests/contracts/helpers/mockprovider.nim @@ -13,7 +13,7 @@ type MockProvider* = ref object of Provider method getBlock*( provider: MockProvider, tag: BlockTag -): Future[?Block] {.async: (raises: [ProviderError]).} = +): Future[?Block] {.async: (raises: [ProviderError, CancelledError]).} = try: if tag == BlockTag.latest: if latestBlock =? 
provider.latest: diff --git a/tests/contracts/testDeployment.nim b/tests/contracts/testDeployment.nim index a439e42a..86a5fe00 100644 --- a/tests/contracts/testDeployment.nim +++ b/tests/contracts/testDeployment.nim @@ -12,7 +12,7 @@ type MockProvider = ref object of Provider method getChainId*( provider: MockProvider -): Future[UInt256] {.async: (raises: [ProviderError]).} = +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = return provider.chainId proc configFactory(): CodexConf = diff --git a/vendor/nim-ethers b/vendor/nim-ethers index 1cfccb96..d2b11a86 160000 --- a/vendor/nim-ethers +++ b/vendor/nim-ethers @@ -1 +1 @@ -Subproject commit 1cfccb9695fa47860bf7ef3d75da9019096a3933 +Subproject commit d2b11a865796a55296027f8ffba68398035ad435 From 6e73338425a41f40adfa554763a17efe1f5dbcf5 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 17 Feb 2025 23:04:04 +0100 Subject: [PATCH 16/17] Remove deprecated function (#1111) Co-authored-by: Dmitriy Ryajov --- codex/manifest/manifest.nim | 3 --- codex/streams/storestream.nim | 2 +- tests/codex/blockexchange/discovery/testdiscovery.nim | 4 ++-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index 0bc51dfc..cbb0bace 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -162,9 +162,6 @@ func verify*(self: Manifest): ?!void = return success() -func cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} = - self.treeCid.success - func `==`*(a, b: Manifest): bool = (a.treeCid == b.treeCid) and (a.datasetSize == b.datasetSize) and (a.blockSize == b.blockSize) and (a.version == b.version) and (a.hcodec == b.hcodec) and diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 85b0e354..a68e2ea7 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -110,7 +110,7 @@ method readOnce*( raise newLPStreamReadError(error) trace "Reading bytes from store stream", - manifestCid = self.manifest.cid.get(), + manifestCid = self.manifest.treeCid, numBlocks = self.manifest.blocksCount, blockNum, blkCid = blk.cid, diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index 88331c3f..97a455e1 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -96,9 +96,9 @@ asyncchecksuite "Block Advertising and Discovery": await engine.stop() - test "Should advertise both manifests and trees": + test "Should advertise trees": let - cids = @[manifest.cid.tryGet, manifest.treeCid] + cids = @[manifest.treeCid] advertised = initTable.collect: for cid in cids: {cid: newFuture[void]()} From 0107eb06fe61ec0a86b8f60ae58d0833ccf0c2d2 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Tue, 18 Feb 2025 06:47:47 +0100 Subject: [PATCH 17/17] chore(marketplace): cid should be bytes (#1073) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Change cid format from string to bytes for the marketplace * refactor: marketplace custom errors handling * chore: update contracts repo * chore: update contracts submodule * Update contracts submodule * Initialize the Cid using init function * Restorage serialize pragma * Use Cid object instead of buffer * Simplify cid usage * Simplify cid usage * bump codex-contracts-eth after PR merge, formatting * fix rebase * collateralPerByte => collateralPerSlot --------- Co-authored-by: Adam Uhlíř Co-authored-by: 
Eric <5089238+emizzle@users.noreply.github.com> --- codex/contracts/requests.nim | 19 +++++++++++++++--- codex/node.nim | 26 ++++++++----------------- codex/sales/salescontext.nim | 3 ++- tests/codex/node/testcontracts.nim | 5 ++--- tests/codex/node/testnode.nim | 2 +- tests/codex/sales/states/testfilled.nim | 2 +- tests/codex/sales/testsales.nim | 12 ++++++++---- tests/contracts/testMarket.nim | 1 + tests/examples.nim | 2 +- tests/integration/testecbug.nim | 2 +- tests/integration/testpurchasing.nim | 2 ++ vendor/codex-contracts-eth | 2 +- 12 files changed, 44 insertions(+), 34 deletions(-) diff --git a/codex/contracts/requests.nim b/codex/contracts/requests.nim index 48947602..98d8c342 100644 --- a/codex/contracts/requests.nim +++ b/codex/contracts/requests.nim @@ -6,8 +6,10 @@ import pkg/nimcrypto import pkg/ethers/fields import pkg/questionable/results import pkg/stew/byteutils +import pkg/libp2p/[cid, multicodec] import ../logutils import ../utils/json +from ../errors import mapFailure export contractabi @@ -29,7 +31,7 @@ type maxSlotLoss* {.serialize.}: uint64 StorageContent* = object - cid* {.serialize.}: string + cid* {.serialize.}: Cid merkleRoot*: array[32, byte] Slot* = object @@ -120,6 +122,9 @@ func fromTuple(_: type StorageAsk, tupl: tuple): StorageAsk = func fromTuple(_: type StorageContent, tupl: tuple): StorageContent = StorageContent(cid: tupl[0], merkleRoot: tupl[1]) +func solidityType*(_: type Cid): string = + solidityType(seq[byte]) + func solidityType*(_: type StorageContent): string = solidityType(StorageContent.fieldTypes) @@ -129,6 +134,10 @@ func solidityType*(_: type StorageAsk): string = func solidityType*(_: type StorageRequest): string = solidityType(StorageRequest.fieldTypes) +# Note: it seems to be ok to ignore the vbuffer offset for now +func encode*(encoder: var AbiEncoder, cid: Cid) = + encoder.write(cid.data.buffer) + func encode*(encoder: var AbiEncoder, content: StorageContent) = encoder.write(content.fieldValues) @@ -141,8 +150,12 @@ func encode*(encoder: var AbiEncoder, id: RequestId | SlotId | Nonce) = func encode*(encoder: var AbiEncoder, request: StorageRequest) = encoder.write(request.fieldValues) -func encode*(encoder: var AbiEncoder, request: Slot) = - encoder.write(request.fieldValues) +func encode*(encoder: var AbiEncoder, slot: Slot) = + encoder.write(slot.fieldValues) + +func decode*(decoder: var AbiDecoder, T: type Cid): ?!T = + let data = ?decoder.read(seq[byte]) + Cid.init(data).mapFailure func decode*(decoder: var AbiDecoder, T: type StorageContent): ?!T = let tupl = ?decoder.read(StorageContent.fieldTypes) diff --git a/codex/node.nim b/codex/node.nim index 2602bfe6..062ec2ce 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -501,10 +501,7 @@ proc setupRequest( collateralPerByte: collateralPerByte, maxSlotLoss: tolerance, ), - content: StorageContent( - cid: $manifestBlk.cid, # TODO: why string? - merkleRoot: verifyRoot, - ), + content: StorageContent(cid: manifestBlk.cid, merkleRoot: verifyRoot), expiry: expiry, ) @@ -561,16 +558,14 @@ proc onStore( ## store data in local storage ## + let cid = request.content.cid + logScope: - cid = request.content.cid + cid = $cid slotIdx = slotIdx trace "Received a request to store a slot" - without cid =? Cid.init(request.content.cid).mapFailure, err: - trace "Unable to parse Cid", cid - return failure(err) - without manifest =? 
(await self.fetchManifest(cid)), err: trace "Unable to fetch manifest for cid", cid, err = err.msg return failure(err) @@ -640,7 +635,7 @@ proc onProve( ## let - cidStr = slot.request.content.cid + cidStr = $slot.request.content.cid slotIdx = slot.slotIndex.truncate(Natural) logScope: @@ -689,14 +684,9 @@ proc onProve( failure "Prover not enabled" proc onExpiryUpdate( - self: CodexNodeRef, rootCid: string, expiry: SecondsSince1970 + self: CodexNodeRef, rootCid: Cid, expiry: SecondsSince1970 ): Future[?!void] {.async.} = - without cid =? Cid.init(rootCid): - trace "Unable to parse Cid", cid - let error = newException(CodexError, "Unable to parse Cid") - return failure(error) - - return await self.updateExpiry(cid, expiry) + return await self.updateExpiry(rootCid, expiry) proc onClear(self: CodexNodeRef, request: StorageRequest, slotIndex: UInt256) = # TODO: remove data from local storage @@ -719,7 +709,7 @@ proc start*(self: CodexNodeRef) {.async.} = self.onStore(request, slot, onBatch) hostContracts.sales.onExpiryUpdate = proc( - rootCid: string, expiry: SecondsSince1970 + rootCid: Cid, expiry: SecondsSince1970 ): Future[?!void] = self.onExpiryUpdate(rootCid, expiry) diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index bb0b5dc9..95f06c04 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -1,6 +1,7 @@ import pkg/questionable import pkg/questionable/results import pkg/upraises +import pkg/libp2p/cid import ../market import ../clock @@ -30,7 +31,7 @@ type OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {. gcsafe, upraises: [] .} - OnExpiryUpdate* = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {. + OnExpiryUpdate* = proc(rootCid: Cid, expiry: SecondsSince1970): Future[?!void] {. 
gcsafe, upraises: [] .} OnClear* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index 52adb5f6..0930d925 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -78,7 +78,6 @@ asyncchecksuite "Test Node - Host contracts": erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) manifestCid = manifestBlock.cid - manifestCidStr = $(manifestCid) (await localStore.putBlock(manifestBlock)).tryGet() @@ -99,7 +98,7 @@ asyncchecksuite "Test Node - Host contracts": expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123 expiryUpdateCallback = !sales.onExpiryUpdate - (await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet() + (await expiryUpdateCallback(manifestCid, expectedExpiry)).tryGet() for index in 0 ..< manifest.blocksCount: let @@ -116,7 +115,7 @@ asyncchecksuite "Test Node - Host contracts": test "onStore callback": let onStore = !sales.onStore var request = StorageRequest.example - request.content.cid = $verifiableBlock.cid + request.content.cid = verifiableBlock.cid request.expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.u256 var fetchedBytes: uint = 0 diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index 3f9a141a..37960232 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -167,7 +167,7 @@ asyncchecksuite "Test Node - Basic": check: (await verifiableBlock.cid in localStore) == true - request.content.cid == $verifiableBlock.cid + request.content.cid == verifiableBlock.cid request.content.merkleRoot == builder.verifyRoot.get.toBytes test "Should delete a single block": diff --git a/tests/codex/sales/states/testfilled.nim b/tests/codex/sales/states/testfilled.nim index f8f77da6..74413776 100644 --- a/tests/codex/sales/states/testfilled.nim +++ b/tests/codex/sales/states/testfilled.nim @@ -36,7 +36,7 @@ checksuite "sales state 'filled'": market.requestEnds[request.id] = 321 onExpiryUpdatePassedExpiry = -1 let onExpiryUpdate = proc( - rootCid: string, expiry: SecondsSince1970 + rootCid: Cid, expiry: SecondsSince1970 ): Future[?!void] {.async.} = onExpiryUpdatePassedExpiry = expiry return success() diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 0d441f34..05f31057 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -46,7 +46,9 @@ asyncchecksuite "Sales - start": pricePerBytePerSecond: 1.u256, collateralPerByte: 1.u256, ), - content: StorageContent(cid: "some cid"), + content: StorageContent( + cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet + ), expiry: (getTime() + initDuration(hours = 1)).toUnix.u256, ) @@ -64,7 +66,7 @@ asyncchecksuite "Sales - start": return success() sales.onExpiryUpdate = proc( - rootCid: string, expiry: SecondsSince1970 + rootCid: Cid, expiry: SecondsSince1970 ): Future[?!void] {.async.} = return success() @@ -158,7 +160,9 @@ asyncchecksuite "Sales": pricePerBytePerSecond: minPricePerBytePerSecond, collateralPerByte: 1.u256, ), - content: StorageContent(cid: "some cid"), + content: StorageContent( + cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet + ), expiry: (getTime() + initDuration(hours = 1)).toUnix.u256, ) @@ -181,7 +185,7 @@ asyncchecksuite "Sales": return success() sales.onExpiryUpdate = proc( - rootCid: string, expiry: SecondsSince1970 + rootCid: Cid, 
expiry: SecondsSince1970 ): Future[?!void] {.async.} = return success() diff --git a/tests/contracts/testMarket.nim b/tests/contracts/testMarket.nim index 2ba450a1..6506a2d6 100644 --- a/tests/contracts/testMarket.nim +++ b/tests/contracts/testMarket.nim @@ -3,6 +3,7 @@ import std/importutils import pkg/chronos import pkg/ethers/erc20 import codex/contracts +import pkg/libp2p/cid import pkg/lrucache import ../ethertest import ./examples diff --git a/tests/examples.nim b/tests/examples.nim index c96fefd6..26013cdc 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -57,7 +57,7 @@ proc example*(_: type StorageRequest): StorageRequest = maxSlotLoss: 2, # 2 slots can be freed without data considered to be lost ), content: StorageContent( - cid: "zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob", + cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet, merkleRoot: array[32, byte].example, ), expiry: (60 * 60).u256, # 1 hour , diff --git a/tests/integration/testecbug.nim b/tests/integration/testecbug.nim index e7604de7..29a3bc6f 100644 --- a/tests/integration/testecbug.nim +++ b/tests/integration/testecbug.nim @@ -50,7 +50,7 @@ marketplacesuite "Bug #821 - node crashes during erasure coding": check eventually(requestId.isSome, timeout = expiry.int * 1000) let request = await marketplace.getRequest(requestId.get) - let cidFromRequest = Cid.init(request.content.cid).get() + let cidFromRequest = request.content.cid let downloaded = await clientApi.downloadBytes(cidFromRequest, local = true) check downloaded.isOk check downloaded.get.toHex == data.toHex diff --git a/tests/integration/testpurchasing.nim b/tests/integration/testpurchasing.nim index 4e08e7a8..ebae78f6 100644 --- a/tests/integration/testpurchasing.nim +++ b/tests/integration/testpurchasing.nim @@ -47,6 +47,8 @@ twonodessuite "Purchasing": ).get let request = client1.getPurchase(id).get.request.get + + check request.content.cid.data.buffer.len > 0 check request.ask.duration == 100.u256 check request.ask.pricePerBytePerSecond == 1.u256 check request.ask.proofProbability == 3.u256 diff --git a/vendor/codex-contracts-eth b/vendor/codex-contracts-eth index e74d3397..0f2012b1 160000 --- a/vendor/codex-contracts-eth +++ b/vendor/codex-contracts-eth @@ -1 +1 @@ -Subproject commit e74d3397a133eaf1eb95d9ce59f56747a7c8c30b +Subproject commit 0f2012b1442c404605c8ba9dcae2f4e53058cd2c
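
For reference, a minimal sketch of the CID convention that [PATCH 17/17] relies on. This snippet is illustrative only and not part of any patch above: StorageContent.cid is now a Cid, the ABI encoder writes its raw byte buffer (cid.data.buffer), and the decoder rebuilds the Cid from those bytes with Cid.init. Assuming only the overloads already used in the patch, the round-trip looks roughly like this:

    import pkg/libp2p/cid
    import pkg/questionable/results

    # Parse a CID the same way the tests in this series do.
    let original =
      Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet

    # The raw bytes that encode(encoder, cid) writes on-chain.
    let onChainBytes = original.data.buffer

    # What decode(decoder, Cid) recovers off-chain via Cid.init(data).
    let restored = Cid.init(onChainBytes).tryGet

    assert restored == original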