From 1acedcf71c05e3e80e3d3634c719acf058357ffa Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Fri, 16 Jan 2026 21:47:59 +1100 Subject: [PATCH 1/4] fix(ci): introduce a number of integration test fixes (#1342) Signed-off-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> Co-authored-by: Slava <20563034+veaceslavdoina@users.noreply.github.com> Co-authored-by: Arnaud Co-authored-by: gmega --- .../actions/nimbus-build-system/action.yml | 30 +- .github/workflows/ci-reusable.yml | 11 +- .github/workflows/ci.yml | 1 + Makefile | 16 +- build.nims | 39 ++- codex.nimble | 2 +- codex/logutils.nim | 3 + config.nims | 4 +- tests/asynctest.nim | 12 +- tests/codex/slots/sampler/testutils.nim | 6 +- tests/ethertest.nim | 4 +- tests/helpers.nim | 2 + tests/integration/1_minute/testcli.nim | 92 +++++- tests/integration/1_minute/testecbug.nim | 12 +- .../30_minutes/testmarketplace.nim.ignore | 9 +- .../30_minutes/testvalidator.nim.ignore | 28 +- tests/integration/codexconfig.nim | 3 +- tests/integration/codexprocess.nim | 95 ++++-- tests/integration/hardhatprocess.nim | 108 +++++- tests/integration/marketplacesuite.nim | 5 +- tests/integration/multinodes.nim | 309 ++++++++++++------ tests/integration/nodeprocess.nim | 94 +++--- tests/integration/scripts/winkillprocess.sh | 97 ++++++ tests/integration/utils.nim | 92 ++++++ vendor/nim-chronos | 2 +- 25 files changed, 830 insertions(+), 246 deletions(-) create mode 100644 tests/integration/scripts/winkillprocess.sh create mode 100644 tests/integration/utils.nim diff --git a/.github/actions/nimbus-build-system/action.yml b/.github/actions/nimbus-build-system/action.yml index f70af604..4acbff33 100644 --- a/.github/actions/nimbus-build-system/action.yml +++ b/.github/actions/nimbus-build-system/action.yml @@ -29,6 +29,7 @@ runs: shell: ${{ inputs.shell }} {0} run: | curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs/ | sh -s -- --default-toolchain=${{ inputs.rust_version }} -y + echo "$HOME/.cargo/bin" >> "$GITHUB_PATH" - name: APT (Linux amd64/arm64) if: inputs.os == 'linux' && (inputs.cpu == 'amd64' || inputs.cpu == 'arm64') @@ -83,7 +84,7 @@ runs: - name: Install gcc 14 on Linux # We don't want to install gcc 14 for coverage (Ubuntu 20.04) - if : ${{ inputs.os == 'linux' && inputs.coverage != 'true' }} + if: ${{ inputs.os == 'linux' && inputs.coverage != 'true' }} shell: ${{ inputs.shell }} {0} run: | # Skip for older Ubuntu versions @@ -101,15 +102,22 @@ runs: if: inputs.os == 'linux' || inputs.os == 'macos' uses: hendrikmuhs/ccache-action@v1.2 with: - create-symlink: true - key: ${{ inputs.os }}-${{ inputs.builder }}-${{ inputs.cpu }}-${{ inputs.tests }}-${{ inputs.nim_version }} + create-symlink: false + key: ${{ inputs.os }}-${{ inputs.builder }}-${{ inputs.cpu }}-${{ inputs.tests }}-${{ inputs.nim_version }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} evict-old-files: 7d + - name: Add ccache to path on Linux/Mac + if: inputs.os == 'linux' || inputs.os == 'macos' + shell: ${{ inputs.shell }} {0} + run: | + echo "/usr/lib/ccache:/usr/local/opt/ccache/libexec" >> "$GITHUB_PATH" + echo "/usr/local/opt/ccache/libexec" >> "$GITHUB_PATH" + - name: Install ccache on Windows if: inputs.os == 'windows' uses: hendrikmuhs/ccache-action@v1.2 with: - key: ${{ inputs.os }}-${{ inputs.builder }}-${{ inputs.cpu }}-${{ inputs.tests }}-${{ inputs.nim_version }} + key: ${{ inputs.os }}-${{ inputs.builder }}-${{ inputs.cpu }}-${{ inputs.tests }}-${{ inputs.nim_version }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} evict-old-files: 7d - name: Enable ccache on Windows @@ -117,11 +125,11 @@ runs: shell: ${{ inputs.shell }} {0} run: | CCACHE_DIR=$(dirname $(which ccache))/ccached - mkdir ${CCACHE_DIR} - ln -s $(which ccache) ${CCACHE_DIR}/gcc.exe - ln -s $(which ccache) ${CCACHE_DIR}/g++.exe - ln -s $(which ccache) ${CCACHE_DIR}/cc.exe - ln -s $(which ccache) ${CCACHE_DIR}/c++.exe + mkdir -p ${CCACHE_DIR} + ln -sf $(which ccache) ${CCACHE_DIR}/gcc.exe + ln -sf $(which ccache) ${CCACHE_DIR}/g++.exe + ln -sf $(which ccache) ${CCACHE_DIR}/cc.exe + ln -sf $(which ccache) ${CCACHE_DIR}/c++.exe echo "export PATH=${CCACHE_DIR}:\$PATH" >> $HOME/.bash_profile # prefix path in MSYS2 - name: Derive environment variables @@ -202,10 +210,10 @@ runs: - name: Restore Nim toolchain binaries from cache id: nim-cache uses: actions/cache@v4 - if : ${{ inputs.coverage != 'true' }} + if: ${{ inputs.coverage != 'true' }} with: path: NimBinaries - key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_version }}-cache-${{ env.cache_nonce }}-${{ github.run_id }} + key: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_version }}-cache-${{ env.cache_nonce }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }} restore-keys: ${{ inputs.os }}-${{ inputs.cpu }}-nim-${{ inputs.nim_version }}-cache-${{ env.cache_nonce }} - name: Set NIM_COMMIT diff --git a/.github/workflows/ci-reusable.yml b/.github/workflows/ci-reusable.yml index b6131b18..ae3aecbe 100644 --- a/.github/workflows/ci-reusable.yml +++ b/.github/workflows/ci-reusable.yml @@ -54,13 +54,20 @@ jobs: with: node-version: 22 - - name: Start Ethereum node with Logos Storage contracts + - name: Install Ethereum node dependencies if: matrix.tests == 'contract' || matrix.tests == 'integration' || matrix.tests == 'tools' || matrix.tests == 'all' working-directory: vendor/logos-storage-contracts-eth env: MSYS2_PATH_TYPE: inherit run: | npm ci + + - name: Run Ethereum node with Logos Storage contracts + if: matrix.tests == 'contract' || matrix.tests == 'integration' || matrix.tests == 'tools' || matrix.tests == 'all' + working-directory: vendor/logos-storage-contracts-eth + env: + MSYS2_PATH_TYPE: inherit + run: | npm start & # Wait for the contracts to be deployed sleep 5 @@ -75,7 +82,7 @@ jobs: if: matrix.tests == 'integration' || matrix.tests == 'all' env: CODEX_INTEGRATION_TEST_INCLUDES: ${{ matrix.includes }} - run: make -j${ncpu} testIntegration + run: make -j${ncpu} DEBUG=${{ runner.debug }} testIntegration - name: Upload integration tests log files uses: actions/upload-artifact@v4 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a09ebce1..0d24507d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,6 +18,7 @@ concurrency: jobs: matrix: + name: Compute matrix runs-on: ubuntu-latest outputs: matrix: ${{ steps.matrix.outputs.matrix }} diff --git a/Makefile b/Makefile index d9b9d70e..8b5bc371 100644 --- a/Makefile +++ b/Makefile @@ -140,10 +140,24 @@ testContracts: | build deps echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim testContracts $(NIM_PARAMS) --define:ws_resubscribe=240 build.nims +TEST_PARAMS := +ifdef DEBUG + TEST_PARAMS := $(TEST_PARAMS) -d:DebugTestHarness=$(DEBUG) + TEST_PARAMS := $(TEST_PARAMS) -d:NoCodexLogFilters=$(DEBUG) + TEST_PARAMS := $(TEST_PARAMS) -d:ShowContinuousStatusUpdates=$(DEBUG) + TEST_PARAMS := $(TEST_PARAMS) -d:DebugHardhat=$(DEBUG) +endif +ifdef TEST_TIMEOUT + TEST_PARAMS := $(TEST_PARAMS) -d:TestTimeout=$(TEST_TIMEOUT) +endif +ifdef PARALLEL + TEST_PARAMS := $(TEST_PARAMS) -d:EnableParallelTests=$(PARALLEL) +endif + # Builds and runs the integration tests testIntegration: | build deps echo -e $(BUILD_MSG) "build/$@" && \ - $(ENV_SCRIPT) nim testIntegration $(NIM_PARAMS) --define:ws_resubscribe=240 build.nims + $(ENV_SCRIPT) nim testIntegration $(TEST_PARAMS) $(NIM_PARAMS) --define:ws_resubscribe=240 build.nims # Builds and runs all tests (except for Taiko L2 tests) testAll: | build deps diff --git a/build.nims b/build.nims index 47e848b3..72f44921 100644 --- a/build.nims +++ b/build.nims @@ -1,9 +1,20 @@ mode = ScriptMode.Verbose import std/os except commandLineParams +import std/strutils ### Helper functions -proc buildBinary(srcName: string, outName = os.lastPathPart(srcName), srcDir = "./", params = "", lang = "c") = +proc truthy(val: string): bool = + const truthySwitches = @["yes", "1", "on", "true"] + return val in truthySwitches + +proc buildBinary( + srcName: string, + outName = os.lastPathPart(srcName), + srcDir = "./", + params = "", + lang = "c", +) = if not dirExists "build": mkDir "build" @@ -43,10 +54,8 @@ proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "dynamic") exec "nim c" & " --out:build/" & name & ".a --threads:on --app:staticlib --opt:size --noMain --mm:refc --header --d:metrics " & "--nimMainPrefix:libstorage -d:noSignalHandler " & - "-d:LeopardExtraCompilerFlags=-fPIC " & - "-d:chronicles_runtime_filtering " & - "-d:chronicles_log_level=TRACE " & - params & " " & srcDir & name & ".nim" + "-d:LeopardExtraCompilerFlags=-fPIC " & "-d:chronicles_runtime_filtering " & + "-d:chronicles_log_level=TRACE " & params & " " & srcDir & name & ".nim" proc test(name: string, outName = name, srcDir = "tests/", params = "", lang = "c") = buildBinary name, outName, srcDir, params @@ -61,7 +70,8 @@ task toolsCirdl, "build tools/cirdl binary": buildBinary "tools/cirdl/cirdl" task testStorage, "Build & run Logos Storage tests": - test "testCodex", outName = "testStorage", params = "-d:storage_enable_proof_failures=true" + test "testCodex", + outName = "testStorage", params = "-d:storage_enable_proof_failures=true" task testContracts, "Build & run Logos Storage Contract tests": test "testContracts" @@ -70,11 +80,18 @@ task testIntegration, "Run integration tests": buildBinary "codex", outName = "storage", params = - "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:storage_enable_proof_failures=true" - test "testIntegration" + "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:chronicles_disabled_topics=JSONRPC-HTTP-CLIENT,websock,libp2p,discv5 -d:codex_enable_proof_failures=true" + var sinks = @["textlines[nocolors,file]"] + for i in 2 ..< paramCount(): + if "DebugTestHarness" in paramStr(i) and truthy paramStr(i).split('=')[1]: + sinks.add "textlines[stdout]" + break + var testParams = + "-d:chronicles_log_level=TRACE -d:chronicles_sinks=\"" & sinks.join(",") & "\"" + test "testIntegration", params = testParams # use params to enable logging from the integration test executable # test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " & - # "-d:chronicles_enabled_topics:integration:TRACE" + # "-d:chronicles_enabled_topics:integration:TRACE" task build, "build Logos Storage binary": storageTask() @@ -139,7 +156,9 @@ task coverage, "generates code coverage report": nimSrcs ) echo " ======== Generating HTML coverage report ======== " - exec("genhtml coverage/coverage.f.info --keep-going --output-directory coverage/report ") + exec( + "genhtml coverage/coverage.f.info --keep-going --output-directory coverage/report " + ) echo " ======== Coverage report Done ======== " task showCoverage, "open coverage html": diff --git a/codex.nimble b/codex.nimble index 43c39219..61e1f470 100644 --- a/codex.nimble +++ b/codex.nimble @@ -4,6 +4,6 @@ description = "p2p data durability engine" license = "MIT" binDir = "build" srcDir = "." -installFiles = @["build.nims"] +installFiles = @["build.nims"] include "build.nims" diff --git a/codex/logutils.nim b/codex/logutils.nim index ae27df7f..f3b98548 100644 --- a/codex/logutils.nim +++ b/codex/logutils.nim @@ -92,6 +92,7 @@ import std/sugar import std/typetraits import pkg/chronicles except toJson, `%` +from pkg/chronos import TransportAddress from pkg/libp2p import Cid, MultiAddress, `$` import pkg/questionable import pkg/questionable/results @@ -255,3 +256,5 @@ formatIt(LogFormat.textLines, array[32, byte]): it.short0xHexLog formatIt(LogFormat.json, array[32, byte]): it.to0xHex +formatIt(TransportAddress): + $it diff --git a/config.nims b/config.nims index b1bc4cbe..e9e3eb0a 100644 --- a/config.nims +++ b/config.nims @@ -65,8 +65,8 @@ else: # https://gcc.gnu.org/bugzilla/show_bug.cgi?id=65782 # ("-fno-asynchronous-unwind-tables" breaks Nim's exception raising, sometimes) switch("passC", "-march=x86-64") - else: switch("passC", "-march=native") - + else: + switch("passC", "-march=native") --tlsEmulation: off diff --git a/tests/asynctest.nim b/tests/asynctest.nim index 4db8277f..6f9d2d42 100644 --- a/tests/asynctest.nim +++ b/tests/asynctest.nim @@ -1,3 +1,13 @@ import pkg/asynctest/chronos/unittest2 -export unittest2 +export unittest2 except eventually + +template eventuallySafe*( + expression: untyped, timeout = 5000, pollInterval = 1000 +): bool = + ## More sane defaults, for use with HTTP connections + eventually(expression, timeout, pollInterval) + +template eventually*(expression: untyped, timeout = 5000, pollInterval = 10): bool = + ## Fast defaults, do not use with HTTP connections! + eventually(expression, timeout, pollInterval) diff --git a/tests/codex/slots/sampler/testutils.nim b/tests/codex/slots/sampler/testutils.nim index f20b5efc..5460fde7 100644 --- a/tests/codex/slots/sampler/testutils.nim +++ b/tests/codex/slots/sampler/testutils.nim @@ -77,15 +77,13 @@ asyncchecksuite "Test proof sampler utils": ) proc getExpectedIndices(n: int): seq[Natural] = - return collect( - newSeq, + return collect(newSeq): (; for i in 1 .. n: cellIndex( proofInput.entropy, proofInput.slotRoot, proofInput.nCellsPerSlot, i ) - ), - ) + ) check: slotCellIndices(3) == getExpectedIndices(3) diff --git a/tests/ethertest.nim b/tests/ethertest.nim index 2cab8bf5..636760a1 100644 --- a/tests/ethertest.nim +++ b/tests/ethertest.nim @@ -5,6 +5,8 @@ import pkg/chronos import ./asynctest import ./checktest +const HardhatPort {.intdefine.}: int = 8545 + ## Unit testing suite that sets up an Ethereum testing environment. ## Injects a `ethProvider` instance, and a list of `accounts`. ## Calls the `evm_snapshot` and `evm_revert` methods to ensure that any @@ -16,7 +18,7 @@ template ethersuite*(name, body) = var snapshot: JsonNode setup: - ethProvider = JsonRpcProvider.new("ws://localhost:8545") + ethProvider = JsonRpcProvider.new("ws://localhost:" & $HardhatPort) snapshot = await send(ethProvider, "evm_snapshot") accounts = await ethProvider.listAccounts() teardown: diff --git a/tests/helpers.nim b/tests/helpers.nim index 742bc10d..e938015f 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -4,6 +4,8 @@ import helpers/templeveldb import std/times import std/sequtils, chronos +import ./asynctest + export multisetup, trackers, templeveldb ### taken from libp2p errorhelpers.nim diff --git a/tests/integration/1_minute/testcli.nim b/tests/integration/1_minute/testcli.nim index 778608b8..699b1551 100644 --- a/tests/integration/1_minute/testcli.nim +++ b/tests/integration/1_minute/testcli.nim @@ -1,31 +1,91 @@ import std/tempfiles +import std/times import codex/conf import codex/utils/fileutils import ../../asynctest import ../../checktest import ../codexprocess import ../nodeprocess +import ../utils import ../../examples +const HardhatPort {.intdefine.}: int = 8545 +const CodexApiPort {.intdefine.}: int = 8080 +const CodexDiscPort {.intdefine.}: int = 8090 +const CodexLogToFile {.booldefine.}: bool = false +const CodexLogLevel {.strdefine.}: string = "" +const CodexLogsDir {.strdefine.}: string = "" + asyncchecksuite "Command line interface": + let startTime = now().format("yyyy-MM-dd'_'HH:mm:ss") let key = "4242424242424242424242424242424242424242424242424242424242424242" - proc startCodex(args: seq[string]): Future[CodexProcess] {.async.} = - return await CodexProcess.startNode(args, false, "cli-test-node") + var currentTestName = "" + var testCount = 0 + var nodeCount = 0 + + template test(tname, tbody) = + inc testCount + currentTestName = tname + test tname: + tbody + + proc addLogFile(args: seq[string]): seq[string] = + var args = args + when CodexLogToFile: + args.add( + "--log-file=" & + getLogFile( + CodexLogsDir, + startTime, + "Command line interface", + currentTestName, + "Client", + some nodeCount mod testCount, + ) + ) + when CodexLogLevel != "": + args.add "--log-level=" & CodexLogLevel + + return args + + proc startCodex(arguments: seq[string]): Future[CodexProcess] {.async.} = + inc nodeCount + let args = arguments.addLogFile + return await CodexProcess.startNode( + args.concat( + @[ + "--api-port=" & $(await nextFreePort(CodexApiPort + nodeCount)), + "--disc-port=" & $(await nextFreePort(CodexDiscPort + nodeCount)), + ] + ), + debug = false, + "cli-test-node", + ) test "complains when persistence is enabled without ethereum account": let node = await startCodex(@["persistence"]) await node.waitUntilOutput("Persistence enabled, but no Ethereum account was set") - await node.stop() + # Expect the codex process to return an exit code of 1 indicating the result + # of the operation was unsuccessful. + await node.stop(expectedExitCode = 1) test "complains when ethereum private key file has wrong permissions": let unsafeKeyFile = genTempPath("", "") discard unsafeKeyFile.writeFile(key, 0o666) - let node = await startCodex(@["persistence", "--eth-private-key=" & unsafeKeyFile]) + let node = await startCodex( + @[ + "persistence", + "--eth-provider=" & "ws://localhost:" & $HardhatPort, + "--eth-private-key=" & unsafeKeyFile, + ] + ) await node.waitUntilOutput( "Ethereum private key file does not have safe file permissions" ) - await node.stop() + # Expect the codex process to return an exit code of 1 indicating the result + # of the operation was unsuccessful. + await node.stop(expectedExitCode = 1) discard removeFile(unsafeKeyFile) let @@ -36,25 +96,37 @@ asyncchecksuite "Command line interface": test "suggests downloading of circuit files when persistence is enabled without accessible r1cs file": let node = await startCodex(@["persistence", "prover", marketplaceArg]) await node.waitUntilOutput(expectedDownloadInstruction) - await node.stop() + # Expect the codex process to return an exit code of 1 indicating the result + # of the operation was unsuccessful. + await node.stop(expectedExitCode = 1) test "suggests downloading of circuit files when persistence is enabled without accessible wasm file": let node = await startCodex( @[ - "persistence", "prover", marketplaceArg, + "persistence", + "--eth-provider=" & "ws://localhost:" & $HardhatPort, + "prover", + marketplaceArg, "--circom-r1cs=tests/circuits/fixtures/proof_main.r1cs", ] ) await node.waitUntilOutput(expectedDownloadInstruction) - await node.stop() + # Expect the codex process to return an exit code of 1 indicating the result + # of the operation was unsuccessful. + await node.stop(expectedExitCode = 1) test "suggests downloading of circuit files when persistence is enabled without accessible zkey file": let node = await startCodex( @[ - "persistence", "prover", marketplaceArg, + "persistence", + "--eth-provider=" & "ws://localhost:" & $HardhatPort, + "prover", + marketplaceArg, "--circom-r1cs=tests/circuits/fixtures/proof_main.r1cs", "--circom-wasm=tests/circuits/fixtures/proof_main.wasm", ] ) await node.waitUntilOutput(expectedDownloadInstruction) - await node.stop() + # Expect the codex process to return an exit code of 1 indicating the result + # of the operation was unsuccessful. + await node.stop(expectedExitCode = 1) diff --git a/tests/integration/1_minute/testecbug.nim b/tests/integration/1_minute/testecbug.nim index a5bfa832..74f8aaff 100644 --- a/tests/integration/1_minute/testecbug.nim +++ b/tests/integration/1_minute/testecbug.nim @@ -9,12 +9,12 @@ marketplacesuite( ): test "should be able to create storage request and download dataset", NodeConfigs( - clients: CodexConfigs - .init(nodes = 1) - # .debug() # uncomment to enable console log output.debug() - .withLogFile() - # uncomment to output log file to tests/integration/logs/ //_.log - .withLogTopics("node", "erasure", "marketplace").some, + clients: CodexConfigs.init(nodes = 1) + # .debug() # uncomment to enable console log output.debug() + # .withLogFile() + # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics("node", "erasure", "marketplace") + .some, providers: CodexConfigs.init(nodes = 0).some, ): let diff --git a/tests/integration/30_minutes/testmarketplace.nim.ignore b/tests/integration/30_minutes/testmarketplace.nim.ignore index b04626c4..15059664 100644 --- a/tests/integration/30_minutes/testmarketplace.nim.ignore +++ b/tests/integration/30_minutes/testmarketplace.nim.ignore @@ -6,6 +6,7 @@ import ../../contracts/deployment import ./../marketplacesuite import ../twonodes import ../nodeconfigs +from ../../helpers import eventuallySafe marketplacesuite(name = "Marketplace", stopOnRequestFail = true): let marketplaceConfig = NodeConfigs( @@ -122,11 +123,11 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): # Checking that the hosting node received reward for at least the time between let slotSize = slotSize(blocks, ecNodes, ecTolerance) let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - check eventually (await token.balanceOf(hostAccount)) - startBalanceHost >= + check eventuallySafe (await token.balanceOf(hostAccount)) - startBalanceHost >= (duration - 5 * 60).u256 * pricePerSlotPerSecond * ecNodes.u256 # Checking that client node receives some funds back that were not used for the host nodes - check eventually( + check eventuallySafe( (await token.balanceOf(clientAccount)) - clientBalanceBeforeFinished > 0, timeout = 10 * 1000, # give client a bit of time to withdraw its funds ) @@ -296,12 +297,12 @@ marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): let slotSize = slotSize(blocks, ecNodes, ecTolerance) let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - check eventually ( + check eventuallySafe ( let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) endBalanceProvider > startBalanceProvider and endBalanceProvider < startBalanceProvider + expiry.u256 * pricePerSlotPerSecond ) - check eventually( + check eventuallySafe( ( let endBalanceClient = (await token.balanceOf(client.ethAccount)) let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) diff --git a/tests/integration/30_minutes/testvalidator.nim.ignore b/tests/integration/30_minutes/testvalidator.nim.ignore index ed67b5d0..b6ce1bbe 100644 --- a/tests/integration/30_minutes/testvalidator.nim.ignore +++ b/tests/integration/30_minutes/testvalidator.nim.ignore @@ -28,12 +28,12 @@ marketplacesuite(name = "Validation", stopOnRequestFail = false): NodeConfigs( # Uncomment to start Hardhat automatically, typically so logs can be inspected locally hardhat: HardhatConfig.none, - clients: CodexConfigs - .init(nodes = 1) - # .debug() # uncomment to enable console log output - .withLogFile() - # uncomment to output log file to tests/integration/logs/ //_.log - .withLogTopics("purchases", "onchain").some, + clients: CodexConfigs.init(nodes = 1) + # .debug() # uncomment to enable console log output + # .withLogFile() + # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics("purchases", "onchain") + .some, providers: CodexConfigs .init(nodes = 1) .withSimulateProofFailures(idx = 0, failEveryNProofs = 1) @@ -47,9 +47,9 @@ marketplacesuite(name = "Validation", stopOnRequestFail = false): .withValidationGroupIndex(idx = 0, groupIndex = 0) .withValidationGroupIndex(idx = 1, groupIndex = 1) # .debug() # uncomment to enable console log output - .withLogFile() + # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log - .withLogTopics("validator") + # .withLogTopics("validator") # each topic as a separate string argument .some, ): @@ -100,12 +100,12 @@ marketplacesuite(name = "Validation", stopOnRequestFail = false): NodeConfigs( # Uncomment to start Hardhat automatically, typically so logs can be inspected locally hardhat: HardhatConfig.none, - clients: CodexConfigs - .init(nodes = 1) - # .debug() # uncomment to enable console log output - .withLogFile() - # uncomment to output log file to tests/integration/logs/ //_.log - .withLogTopics("purchases", "onchain").some, + clients: CodexConfigs.init(nodes = 1) + # .debug() # uncomment to enable console log output + # .withLogFile() + # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics("purchases", "onchain") + .some, providers: CodexConfigs .init(nodes = 1) .withSimulateProofFailures(idx = 0, failEveryNProofs = 1) diff --git a/tests/integration/codexconfig.nim b/tests/integration/codexconfig.nim index 138ae274..8d0cdb33 100644 --- a/tests/integration/codexconfig.nim +++ b/tests/integration/codexconfig.nim @@ -169,7 +169,8 @@ proc withLogFile*(self: CodexConfigs): CodexConfigs {.raises: [CodexConfigError] proc withLogFile*( self: var CodexConfig, logFile: string -) {.raises: [CodexConfigError].} = #: CodexConfigs = +) {.raises: [CodexConfigError].} = + #: CodexConfigs = ## typically called internally from the test suite, sets a log file path to ## be created during the test run, for a specified node in the group # var config = self diff --git a/tests/integration/codexprocess.nim b/tests/integration/codexprocess.nim index 04c2904f..824d4c43 100644 --- a/tests/integration/codexprocess.nim +++ b/tests/integration/codexprocess.nim @@ -7,6 +7,7 @@ import pkg/ethers import pkg/libp2p import std/os import std/strutils +import std/times import codex/conf import ./codexclient import ./nodeprocess @@ -15,11 +16,28 @@ export codexclient export chronicles export nodeprocess +{.push raises: [].} + logScope: topics = "integration testing codex process" -type CodexProcess* = ref object of NodeProcess - client: ?CodexClient +type + CodexProcess* = ref object of NodeProcess + client: ?CodexClient + + CodexProcessError* = object of NodeProcessError + +proc raiseCodexProcessError( + msg: string, parent: ref CatchableError +) {.raises: [CodexProcessError].} = + raise newException(CodexProcessError, msg & ": " & parent.msg, parent) + +template convertError(msg, body: typed) = + # Don't use this in an async proc, unless body does not raise CancelledError + try: + body + except CatchableError as parent: + raiseCodexProcessError(msg, parent) method workingDir(node: CodexProcess): string = return currentSourcePath() / ".." / ".." / ".." @@ -33,44 +51,83 @@ method startedOutput(node: CodexProcess): string = method processOptions(node: CodexProcess): set[AsyncProcessOption] = return {AsyncProcessOption.StdErrToStdOut} -method outputLineEndings(node: CodexProcess): string {.raises: [].} = +method outputLineEndings(node: CodexProcess): string = return "\n" -method onOutputLineCaptured(node: CodexProcess, line: string) {.raises: [].} = +method onOutputLineCaptured(node: CodexProcess, line: string) = discard -proc dataDir(node: CodexProcess): string = - let config = CodexConf.load(cmdLine = node.arguments, quitOnFailure = false) - return config.dataDir.string +proc config(node: CodexProcess): CodexConf {.raises: [CodexProcessError].} = + # cannot use convertError here as it uses typed parameters which forces type + # resolution, while confutils.load uses untyped parameters and expects type + # resolution not to happen yet. In other words, it won't compile. + try: + return CodexConf.load( + cmdLine = node.arguments, quitOnFailure = false, secondarySources = nil + ) + except ConfigurationError as parent: + raiseCodexProcessError "Failed to load node arguments into CodexConf", parent -proc ethAccount*(node: CodexProcess): Address = - let config = CodexConf.load(cmdLine = node.arguments, quitOnFailure = false) - without ethAccount =? config.ethAccount: +proc dataDir(node: CodexProcess): string {.raises: [CodexProcessError].} = + return node.config.dataDir.string + +proc ethAccount*(node: CodexProcess): Address {.raises: [CodexProcessError].} = + without ethAccount =? node.config.ethAccount: raiseAssert "eth account not set" return Address(ethAccount) -proc apiUrl*(node: CodexProcess): string = - let config = CodexConf.load(cmdLine = node.arguments, quitOnFailure = false) - return - "http://" & config.apiBindAddress.get() & ":" & $config.apiPort & "/api/storage/v1" +proc apiUrl*(node: CodexProcess): string {.raises: [CodexProcessError].} = + let config = node.config + without apiBindAddress =? config.apiBindAddress: + raise + newException(CodexProcessError, "REST API not started: --api-bindaddr not set") + return "http://" & apiBindAddress & ":" & $config.apiPort & "/api/storage/v1" -proc client*(node: CodexProcess): CodexClient = +proc logFile*(node: CodexProcess): ?string {.raises: [CodexProcessError].} = + node.config.logFile + +proc client*(node: CodexProcess): CodexClient {.raises: [CodexProcessError].} = if client =? node.client: return client let client = CodexClient.new(node.apiUrl) node.client = some client return client -method stop*(node: CodexProcess) {.async.} = +proc updateLogFile(node: CodexProcess, newLogFile: string) = + for arg in node.arguments.mitems: + if arg.startsWith("--log-file="): + arg = "--log-file=" & newLogFile + break + +method restart*(node: CodexProcess) {.async.} = + trace "restarting codex" + await node.stop() + if logFile =? node.logFile: + # chronicles truncates the existing log file on start, so changed the log + # file cli param to create a new one + node.updateLogFile( + logFile & "_restartedAt_" & now().format("yyyy-MM-dd'_'HH-mm-ss") & ".log" + ) + await node.start() + await node.waitUntilStarted() + trace "codex process restarted" + +method stop*(node: CodexProcess) {.async: (raises: []).} = logScope: nodeName = node.name + trace "stopping codex client" await procCall NodeProcess(node).stop() - trace "stopping Storage client" + if not node.process.isNil: + trace "closing node process' streams" + await node.process.closeWait() + trace "node process' streams closed" + if client =? node.client: await client.close() node.client = none CodexClient -method removeDataDir*(node: CodexProcess) = - removeDir(node.dataDir) +method removeDataDir*(node: CodexProcess) {.raises: [CodexProcessError].} = + convertError("failed to remove codex node data directory"): + removeDir(node.dataDir) diff --git a/tests/integration/hardhatprocess.nim b/tests/integration/hardhatprocess.nim index 915c8c53..8342f05f 100644 --- a/tests/integration/hardhatprocess.nim +++ b/tests/integration/hardhatprocess.nim @@ -8,28 +8,38 @@ import pkg/stew/io2 import std/os import std/sets import std/sequtils +import std/strformat import std/strutils import pkg/codex/conf import pkg/codex/utils/trackedfutures import ./codexclient import ./nodeprocess +import ./utils export codexclient export chronicles +export nodeprocess + +{.push raises: [].} logScope: topics = "integration testing hardhat process" - nodeName = "hardhat" -type HardhatProcess* = ref object of NodeProcess - logFile: ?IoHandle +type + OnOutputLineCaptured = proc(line: string) {.gcsafe, raises: [].} + HardhatProcess* = ref object of NodeProcess + logFile: ?IoHandle + onOutputLine: OnOutputLineCaptured + + HardhatProcessError* = object of NodeProcessError method workingDir(node: HardhatProcess): string = return currentSourcePath() / ".." / ".." / ".." / "vendor" / "logos-storage-contracts-eth" method executable(node: HardhatProcess): string = - return "node_modules" / ".bin" / "hardhat" + return + "node_modules" / ".bin" / (when defined(windows): "hardhat.cmd" else: "hardhat") method startedOutput(node: HardhatProcess): string = return "Started HTTP and WebSocket JSON-RPC server at" @@ -37,7 +47,7 @@ method startedOutput(node: HardhatProcess): string = method processOptions(node: HardhatProcess): set[AsyncProcessOption] = return {} -method outputLineEndings(node: HardhatProcess): string {.raises: [].} = +method outputLineEndings(node: HardhatProcess): string = return "\n" proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle = @@ -52,7 +62,21 @@ proc openLogFile(node: HardhatProcess, logFilePath: string): IoHandle = return fileHandle -method start*(node: HardhatProcess) {.async.} = +method start*( + node: HardhatProcess +) {.async: (raises: [CancelledError, NodeProcessError]).} = + logScope: + nodeName = node.name + + var executable = "" + try: + executable = absolutePath(node.workingDir / node.executable) + if not fileExists(executable): + raiseAssert "cannot start hardhat, executable doesn't exist (looking for " & + &"{executable}). Try running `npm install` in {node.workingDir}." + except CatchableError as parent: + raiseAssert "failed build path to hardhat executable: " & parent.msg + let poptions = node.processOptions + {AsyncProcessOption.StdErrToStdOut} trace "starting node", @@ -89,19 +113,37 @@ method start*(node: HardhatProcess) {.async.} = trace "hardhat post start scripts executed" except CancelledError as error: raise error - except CatchableError as e: - error "failed to start hardhat process", error = e.msg + except CatchableError as parent: + raise newException( + HardhatProcessError, "failed to start hardhat process: " & parent.msg, parent + ) + +proc port(node: HardhatProcess): ?int = + var next = false + for arg in node.arguments: + # TODO: move to constructor + if next: + return parseInt(arg).catch.option + if arg.contains "--port": + next = true + + return none int proc startNode*( _: type HardhatProcess, args: seq[string], debug: string | bool = false, name: string, -): Future[HardhatProcess] {.async.} = + onOutputLineCaptured: OnOutputLineCaptured = nil, +): Future[HardhatProcess] {.async: (raises: [CancelledError, NodeProcessError]).} = + logScope: + nodeName = name + var logFilePath = "" var arguments = newSeq[string]() for arg in args: + # TODO: move to constructor if arg.contains "--log-file=": logFilePath = arg.split("=")[1] else: @@ -114,17 +156,25 @@ proc startNode*( arguments: arguments, debug: ($debug != "false"), trackedFutures: TrackedFutures.new(), - name: "hardhat", + name: name, + onOutputLine: onOutputLineCaptured, ) await hardhat.start() + # TODO: move to constructor if logFilePath != "": hardhat.logFile = some hardhat.openLogFile(logFilePath) return hardhat method onOutputLineCaptured(node: HardhatProcess, line: string) = + logScope: + nodeName = node.name + + if not node.onOutputLine.isNil: + node.onOutputLine(line) + without logFile =? node.logFile: return @@ -133,13 +183,49 @@ method onOutputLineCaptured(node: HardhatProcess, line: string) = discard logFile.closeFile() node.logFile = none IoHandle -method stop*(node: HardhatProcess) {.async.} = +proc closeProcessStreams(node: HardhatProcess) {.async: (raises: []).} = + when not defined(windows): + if not node.process.isNil: + trace "closing node process' streams" + await node.process.closeWait() + trace "node process' streams closed" + else: + # Windows hangs when attempting to close hardhat's process streams, so try + # to kill the process externally. + without port =? node.port: + error "Failed to get port from Hardhat args" + return + try: + let cmdResult = await forceKillProcess("node.exe", &"--port {port}") + if cmdResult.status > 0: + error "Failed to forcefully kill windows hardhat process", + port, exitCode = cmdResult.status, stderr = cmdResult.stdError + else: + trace "Successfully killed windows hardhat process by force", + port, exitCode = cmdResult.status, stdout = cmdResult.stdOutput + except ValueError, OSError: + let eMsg = getCurrentExceptionMsg() + error "Failed to forcefully kill windows hardhat process, bad path to command", + error = eMsg + except CancelledError as e: + discard + except AsyncProcessError as e: + error "Failed to forcefully kill windows hardhat process", port, error = e.msg + except AsyncProcessTimeoutError as e: + error "Timeout while forcefully killing windows hardhat process", + port, error = e.msg + +method stop*(node: HardhatProcess) {.async: (raises: []).} = # terminate the process await procCall NodeProcess(node).stop() + await node.closeProcessStreams() + if logFile =? node.logFile: trace "closing hardhat log file" discard logFile.closeFile() + node.process = nil + method removeDataDir*(node: HardhatProcess) = discard diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index 5a0a11a6..cc0ee246 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -86,7 +86,10 @@ template marketplacesuite*(name: string, stopOnRequestFail: bool, body: untyped) duration: uint64, collateralPerByte: UInt256, minPricePerBytePerSecond: UInt256, - ): Future[void] {.async: (raises: [CancelledError, HttpError, ConfigurationError]).} = + ): Future[void] {. + async: + (raises: [CancelledError, HttpError, ConfigurationError, CodexProcessError]) + .} = let totalCollateral = datasetSize.u256 * collateralPerByte # post availability to each provider for i in 0 ..< providers().len: diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 5f149585..05bb6fb0 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -1,3 +1,4 @@ +import std/httpclient import std/os import std/sequtils import std/strutils @@ -13,6 +14,7 @@ import ./codexprocess import ./hardhatconfig import ./hardhatprocess import ./nodeconfigs +import ./utils import ../asynctest import ../checktest @@ -24,6 +26,8 @@ export hardhatconfig export codexconfig export nodeconfigs +{.push raises: [].} + type RunningNode* = ref object role*: Role @@ -36,31 +40,33 @@ type Hardhat MultiNodeSuiteError = object of CatchableError + SuiteTimeoutError = object of MultiNodeSuiteError -const jsonRpcProviderUrl* = "ws://localhost:8545" +const HardhatPort {.intdefine.}: int = 8545 +const CodexApiPort {.intdefine.}: int = 8080 +const CodexDiscPort {.intdefine.}: int = 8090 +const TestId {.strdefine.}: string = "TestId" +const CodexLogToFile {.booldefine.}: bool = false +const CodexLogLevel {.strdefine.}: string = "" +const CodexLogsDir {.strdefine.}: string = "" -proc raiseMultiNodeSuiteError(msg: string) = - raise newException(MultiNodeSuiteError, msg) +proc raiseMultiNodeSuiteError( + msg: string, parent: ref CatchableError = nil +) {.raises: [MultiNodeSuiteError].} = + raise newException(MultiNodeSuiteError, msg, parent) -proc nextFreePort*(startPort: int): Future[int] {.async.} = - proc client(server: StreamServer, transp: StreamTransport) {.async.} = - await transp.closeWait() +template withLock(lock: AsyncLock, body: untyped) = + if lock.isNil: + lock = newAsyncLock() - var port = startPort - while true: - trace "checking if port is free", port + await lock.acquire() + try: + body + finally: try: - let host = initTAddress("127.0.0.1", port) - # We use ReuseAddr here only to be able to reuse the same IP/Port when - # there's a TIME_WAIT socket. It's useful when running the test multiple - # times or if a test ran previously using the same port. - var server = createStreamServer(host, client, {ReuseAddr}) - trace "port is free", port - await server.closeWait() - return port - except TransportOsError: - trace "port is not free", port - inc port + lock.release() + except AsyncLockError as parent: + raiseMultiNodeSuiteError "lock error", parent proc sanitize(pathSegment: string): string = var sanitized = pathSegment @@ -71,8 +77,8 @@ proc sanitize(pathSegment: string): string = proc getTempDirName*(starttime: string, role: Role, roleIdx: int): string = getTempDir() / "Storage" / sanitize($starttime) / sanitize($role & "_" & $roleIdx) -template multinodesuite*(name: string, body: untyped) = - asyncchecksuite name: +template multinodesuite*(suiteName: string, body: untyped) = + asyncchecksuite suiteName: # Following the problem described here: # https://github.com/NomicFoundation/hardhat/issues/2053 # It may be desirable to use http RPC provider. @@ -85,7 +91,7 @@ template multinodesuite*(name: string, body: untyped) = # If you want to use a different provider url in the nodes, you can # use withEthProvider config modifier in the node config # to set the desired provider url. E.g.: - # NodeConfigs( + # NodeConfigs( # hardhat: # HardhatConfig.none, # clients: @@ -93,6 +99,7 @@ template multinodesuite*(name: string, body: untyped) = # .withEthProvider("ws://localhost:8545") # .some, # ... + var jsonRpcProviderUrl = "ws://localhost:" & $HardhatPort var running {.inject, used.}: seq[RunningNode] var bootstrapNodes: seq[string] let starttime = now().format("yyyy-MM-dd'_'HH:mm:ss") @@ -101,6 +108,10 @@ template multinodesuite*(name: string, body: untyped) = var ethProvider {.inject, used.}: JsonRpcProvider var accounts {.inject, used.}: seq[Address] var snapshot: JsonNode + var lastUsedHardhatPort = HardhatPort + var lastUsedCodexApiPort = CodexApiPort + var lastUsedCodexDiscPort = CodexDiscPort + var codexPortLock: AsyncLock template test(tname, startNodeConfigs, tbody) = currentTestName = tname @@ -108,47 +119,50 @@ template multinodesuite*(name: string, body: untyped) = test tname: tbody - proc sanitize(pathSegment: string): string = - var sanitized = pathSegment - for invalid in invalidFilenameChars.items: - sanitized = sanitized.replace(invalid, '_').replace(' ', '_') - sanitized - - proc getLogFile(role: Role, index: ?int): string = - # create log file path, format: - # tests/integration/logs/ //_.log - - var logDir = - currentSourcePath.parentDir() / "logs" / sanitize($starttime & "__" & name) / - sanitize($currentTestName) - createDir(logDir) - - var fn = $role - if idx =? index: - fn &= "_" & $idx - fn &= ".log" - - let fileName = logDir / fn - return fileName + proc updatePort(url: var string, port: int) = + let parts = url.split(':') + url = @[parts[0], parts[1], $port].join(":") proc newHardhatProcess( config: HardhatConfig, role: Role - ): Future[NodeProcess] {.async.} = + ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} = var args: seq[string] = @[] if config.logFile: - let updatedLogFile = getLogFile(role, none int) - args.add "--log-file=" & updatedLogFile + try: + let updatedLogFile = getLogFile( + CodexLogsDir, starttime, suiteName, currentTestName, $role, none int + ) + args.add "--log-file=" & updatedLogFile + except IOError as e: + raiseMultiNodeSuiteError( + "failed to start hardhat because logfile path could not be obtained: " & + e.msg, + e, + ) + except OSError as e: + raiseMultiNodeSuiteError( + "failed to start hardhat because logfile path could not be obtained: " & + e.msg, + e, + ) + + let port = await nextFreePort(lastUsedHardhatPort) + jsonRpcProviderUrl.updatePort(port) + args.add("--port") + args.add($port) + lastUsedHardhatPort = port try: let node = await HardhatProcess.startNode(args, config.debugEnabled, "hardhat") + await node.waitUntilStarted() trace "hardhat node started" return node except NodeProcessError as e: - raiseMultiNodeSuiteError "cannot start hardhat process: " & e.msg + raiseMultiNodeSuiteError "hardhat node not started: " & e.msg proc newCodexProcess( roleIdx: int, conf: CodexConfig, role: Role - ): Future[NodeProcess] {.async.} = + ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} = let nodeIdx = running.len var config = conf @@ -156,34 +170,60 @@ template multinodesuite*(name: string, body: untyped) = raiseMultiNodeSuiteError "Cannot start node at nodeIdx " & $nodeIdx & ", not enough eth accounts." - let datadir = getTempDirName(starttime, role, roleIdx) + let datadir = getDataDir(TestId, currentTestName, $starttime, $role, some roleIdx) try: - if config.logFile.isSome: - let updatedLogFile = getLogFile(role, some roleIdx) - config.withLogFile(updatedLogFile) + if config.logFile.isSome or CodexLogToFile: + try: + let updatedLogFile = getLogFile( + CodexLogsDir, starttime, suiteName, currentTestName, $role, some roleIdx + ) + config.withLogFile(updatedLogFile) + except IOError as e: + raiseMultiNodeSuiteError( + "failed to start " & $role & + " because logfile path could not be obtained: " & e.msg, + e, + ) + except OSError as e: + raiseMultiNodeSuiteError( + "failed to start " & $role & + " because logfile path could not be obtained: " & e.msg, + e, + ) + + when CodexLogLevel != "": + config.addCliOption("--log-level", CodexLogLevel) + + var apiPort, discPort: int + withLock(codexPortLock): + apiPort = await nextFreePort(lastUsedCodexApiPort + nodeIdx) + discPort = await nextFreePort(lastUsedCodexDiscPort + nodeIdx) + config.addCliOption("--api-port", $apiPort) + config.addCliOption("--disc-port", $discPort) + lastUsedCodexApiPort = apiPort + lastUsedCodexDiscPort = discPort for bootstrapNode in bootstrapNodes: config.addCliOption("--bootstrap-node", bootstrapNode) - config.addCliOption("--api-port", $await nextFreePort(8080 + nodeIdx)) + config.addCliOption("--data-dir", datadir) config.addCliOption("--nat", "none") config.addCliOption("--listen-addrs", "/ip4/127.0.0.1/tcp/0") - config.addCliOption("--disc-port", $await nextFreePort(8090 + nodeIdx)) except CodexConfigError as e: raiseMultiNodeSuiteError "invalid cli option, error: " & e.msg - let node = await CodexProcess.startNode( - config.cliArgs, config.debugEnabled, $role & $roleIdx - ) - try: + let node = await CodexProcess.startNode( + config.cliArgs, config.debugEnabled, $role & $roleIdx + ) await node.waitUntilStarted() trace "node started", nodeName = $role & $roleIdx + return node + except CodexConfigError as e: + raiseMultiNodeSuiteError "failed to get cli args from config: " & e.msg, e except NodeProcessError as e: - raiseMultiNodeSuiteError "node not started, error: " & e.msg - - return node + raiseMultiNodeSuiteError "node not started, error: " & e.msg, e proc hardhat(): HardhatProcess = for r in running: @@ -209,7 +249,9 @@ template multinodesuite*(name: string, body: untyped) = if r.role == Role.Validator: CodexProcess(r.node) - proc startHardhatNode(config: HardhatConfig): Future[NodeProcess] {.async.} = + proc startHardhatNode( + config: HardhatConfig + ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} = return await newHardhatProcess(config, Role.Hardhat) proc startClientNode(conf: CodexConfig): Future[NodeProcess] {.async.} = @@ -221,44 +263,64 @@ template multinodesuite*(name: string, body: untyped) = ) return await newCodexProcess(clientIdx, config, Role.Client) - proc startProviderNode(conf: CodexConfig): Future[NodeProcess] {.async.} = - let providerIdx = providers().len - var config = conf - config.addCliOption(StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl) - config.addCliOption( - StartUpCmd.persistence, "--eth-account", $accounts[running.len] - ) - config.addCliOption( - PersistenceCmd.prover, "--circom-r1cs", - "vendor/logos-storage-contracts-eth/verifier/networks/hardhat/proof_main.r1cs", - ) - config.addCliOption( - PersistenceCmd.prover, "--circom-wasm", - "vendor/logos-storage-contracts-eth/verifier/networks/hardhat/proof_main.wasm", - ) - config.addCliOption( - PersistenceCmd.prover, "--circom-zkey", - "vendor/logos-storage-contracts-eth/verifier/networks/hardhat/proof_main.zkey", - ) + proc startProviderNode( + conf: CodexConfig + ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} = + try: + let providerIdx = providers().len + var config = conf + config.addCliOption( + StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl + ) + config.addCliOption( + StartUpCmd.persistence, "--eth-account", $accounts[running.len] + ) + config.addCliOption( + PersistenceCmd.prover, "--circom-r1cs", + "vendor/logos-storage-contracts-eth/verifier/networks/hardhat/proof_main.r1cs", + ) + config.addCliOption( + PersistenceCmd.prover, "--circom-wasm", + "vendor/logos-storage-contracts-eth/verifier/networks/hardhat/proof_main.wasm", + ) + config.addCliOption( + PersistenceCmd.prover, "--circom-zkey", + "vendor/logos-storage-contracts-eth/verifier/networks/hardhat/proof_main.zkey", + ) - return await newCodexProcess(providerIdx, config, Role.Provider) + return await newCodexProcess(providerIdx, config, Role.Provider) + except CodexConfigError as exc: + raiseMultiNodeSuiteError "Failed to start codex node, error adding cli options: " & + exc.msg, exc - proc startValidatorNode(conf: CodexConfig): Future[NodeProcess] {.async.} = - let validatorIdx = validators().len - var config = conf - config.addCliOption(StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl) - config.addCliOption( - StartUpCmd.persistence, "--eth-account", $accounts[running.len] - ) - config.addCliOption(StartUpCmd.persistence, "--validator") + proc startValidatorNode( + conf: CodexConfig + ): Future[NodeProcess] {.async: (raises: [MultiNodeSuiteError, CancelledError]).} = + try: + let validatorIdx = validators().len + var config = conf + config.addCliOption( + StartUpCmd.persistence, "--eth-provider", jsonRpcProviderUrl + ) + config.addCliOption( + StartUpCmd.persistence, "--eth-account", $accounts[running.len] + ) + config.addCliOption(StartUpCmd.persistence, "--validator") - return await newCodexProcess(validatorIdx, config, Role.Validator) + return await newCodexProcess(validatorIdx, config, Role.Validator) + except CodexConfigError as e: + raiseMultiNodeSuiteError "Failed to start validator node, error adding cli options: " & + e.msg, e - proc teardownImpl() {.async.} = + proc teardownImpl() {.async: (raises: []).} = + trace "Tearing down test", suite = suiteName, test = currentTestName for nodes in @[validators(), clients(), providers()]: for node in nodes: await node.stop() # also stops rest client - node.removeDataDir() + try: + node.removeDataDir() + except CodexProcessError as e: + error "Failed to remove data dir during teardown", error = e.msg # if hardhat was started in the test, kill the node # otherwise revert the snapshot taken in the test setup @@ -266,15 +328,28 @@ template multinodesuite*(name: string, body: untyped) = if not hardhat.isNil: await hardhat.stop() else: - discard await send(ethProvider, "evm_revert", @[snapshot]) + try: + discard await noCancel send(ethProvider, "evm_revert", @[snapshot]) + except ProviderError as e: + error "Failed to revert hardhat state during teardown", error = e.msg - await ethProvider.close() + # TODO: JsonRpcProvider.close should NOT raise any exceptions + try: + await ethProvider.close() + except CatchableError: + discard running = @[] template failAndTeardownOnError(message: string, tryBody: untyped) = try: tryBody + except CancelledError as e: + await teardownImpl() + when declared(teardownAllIMPL): + teardownAllIMPL() + fail() + quit(1) except CatchableError as er: fatal message, error = er.msg echo "[FATAL] ", message, ": ", er.msg @@ -286,19 +361,34 @@ template multinodesuite*(name: string, body: untyped) = proc updateBootstrapNodes( node: CodexProcess - ): Future[void] {.async: (raises: [CatchableError]).} = - without ninfo =? await node.client.info(): - # raise CatchableError instead of Defect (with .get or !) so we - # can gracefully shutdown and prevent zombies - raiseMultiNodeSuiteError "Failed to get node info" - bootstrapNodes.add ninfo["spr"].getStr() + ): Future[void] {.async: (raises: [MultiNodeSuiteError]).} = + try: + without ninfo =? await node.client.info(): + # raise CatchableError instead of Defect (with .get or !) so we + # can gracefully shutdown and prevent zombies + raiseMultiNodeSuiteError "Failed to get node info" + bootstrapNodes.add ninfo["spr"].getStr() + except CatchableError as e: + raiseMultiNodeSuiteError "Failed to get node info: " & e.msg, e + + setupAll: + # When this file is run with `-d:chronicles_sinks=textlines[file]`, we + # need to set the log file path at runtime, otherwise chronicles didn't seem to + # create a log file even when using an absolute path + when defaultChroniclesStream.outputs is (FileOutput,) and CodexLogsDir.len > 0: + let logFile = + CodexLogsDir / sanitize(getAppFilename().extractFilename & ".chronicles.log") + let success = defaultChroniclesStream.outputs[0].open(logFile, fmAppend) + doAssert success, "Failed to open log file: " & logFile setup: + trace "Setting up test", suite = suiteName, test = currentTestName, nodeConfigs + if var conf =? nodeConfigs.hardhat: try: - let node = await startHardhatNode(conf) + let node = await noCancel startHardhatNode(conf) running.add RunningNode(role: Role.Hardhat, node: node) - except CatchableError as e: + except CatchableError as e: # CancelledError not raised due to noCancel echo "failed to start hardhat node" fail() quit(1) @@ -307,12 +397,16 @@ template multinodesuite*(name: string, body: untyped) = # Workaround for https://github.com/NomicFoundation/hardhat/issues/2053 # Do not use websockets, but use http and polling to stop subscriptions # from being removed after 5 minutes - ethProvider = JsonRpcProvider.new(jsonRpcProviderUrl) + ethProvider = JsonRpcProvider.new( + jsonRpcProviderUrl, pollingInterval = chronos.milliseconds(1000) + ) # if hardhat was NOT started by the test, take a snapshot so it can be # reverted in the test teardown if nodeConfigs.hardhat.isNone: snapshot = await send(ethProvider, "evm_snapshot") accounts = await ethProvider.listAccounts() + except CancelledError as e: + raise e except CatchableError as e: echo "Hardhat not running. Run hardhat manually " & "before executing tests, or include a " & "HardhatConfig in the test setup." @@ -342,7 +436,10 @@ template multinodesuite*(name: string, body: untyped) = # ensure that we have a recent block with a fresh timestamp discard await send(ethProvider, "evm_mine") + trace "Starting test", suite = suiteName, test = currentTestName + teardown: await teardownImpl() + trace "Test completed", suite = suiteName, test = currentTestName body diff --git a/tests/integration/nodeprocess.nim b/tests/integration/nodeprocess.nim index 9ac0f8c3..a45e7806 100644 --- a/tests/integration/nodeprocess.nim +++ b/tests/integration/nodeprocess.nim @@ -5,6 +5,7 @@ import pkg/chronicles import pkg/chronos/asyncproc import pkg/libp2p import std/os +import std/strformat import std/strutils import codex/conf import codex/utils/exceptions @@ -14,6 +15,8 @@ import ./codexclient export codexclient export chronicles +{.push raises: [].} + logScope: topics = "integration testing node process" @@ -39,24 +42,19 @@ method startedOutput(node: NodeProcess): string {.base, gcsafe.} = method processOptions(node: NodeProcess): set[AsyncProcessOption] {.base, gcsafe.} = raiseAssert "not implemented" -method outputLineEndings(node: NodeProcess): string {.base, gcsafe, raises: [].} = +method outputLineEndings(node: NodeProcess): string {.base, gcsafe.} = raiseAssert "not implemented" -method onOutputLineCaptured( - node: NodeProcess, line: string -) {.base, gcsafe, raises: [].} = +method onOutputLineCaptured(node: NodeProcess, line: string) {.base, gcsafe.} = raiseAssert "not implemented" -method start*(node: NodeProcess) {.base, async.} = +method start*(node: NodeProcess) {.base, async: (raises: [CancelledError]).} = logScope: nodeName = node.name let poptions = node.processOptions + {AsyncProcessOption.StdErrToStdOut} trace "starting node", - args = node.arguments, - executable = node.executable, - workingDir = node.workingDir, - processOptions = poptions + args = node.arguments, executable = node.executable, workingDir = node.workingDir try: if node.debug: @@ -81,11 +79,13 @@ proc captureOutput( trace "waiting for output", output - let stream = node.process.stdoutStream - try: while node.process.running.option == some true: - while (let line = await stream.readLine(0, node.outputLineEndings); line != ""): + while ( + let line = await node.process.stdoutStream.readLine(0, node.outputLineEndings) + line != "" + ) + : if node.debug: # would be nice if chronicles could parse and display with colors echo line @@ -95,8 +95,8 @@ proc captureOutput( node.onOutputLineCaptured(line) - await sleepAsync(1.millis) - await sleepAsync(1.millis) + await sleepAsync(1.nanos) + await sleepAsync(1.nanos) except CancelledError: discard # do not propagate as captureOutput was asyncSpawned except AsyncStreamError as e: @@ -104,7 +104,7 @@ proc captureOutput( proc startNode*[T: NodeProcess]( _: type T, args: seq[string], debug: string | bool = false, name: string -): Future[T] {.async.} = +): Future[T] {.async: (raises: [CancelledError]).} = ## Starts a Logos Storage Node with the specified arguments. ## Set debug to 'true' to see output of the node. let node = T( @@ -116,34 +116,36 @@ proc startNode*[T: NodeProcess]( await node.start() return node -method stop*(node: NodeProcess) {.base, async.} = +method stop*( + node: NodeProcess, expectedExitCode: int = 0 +) {.base, async: (raises: []).} = logScope: nodeName = node.name await node.trackedFutures.cancelTracked() - if node.process != nil: + if not node.process.isNil: + let processId = node.process.processId + trace "terminating node process...", processId try: - trace "terminating node process..." - if errCode =? node.process.terminate().errorOption: - error "failed to terminate process", errCode = $errCode + let exitCode = await noCancel node.process.terminateAndWaitForExit(2.seconds) + if exitCode > 0 and exitCode != 143 and # 143 = SIGTERM (initiated above) + exitCode != expectedExitCode: + warn "process exited with a non-zero exit code", exitCode + trace "node process terminated", exitCode + except CatchableError: + try: + let forcedExitCode = await noCancel node.process.killAndWaitForExit(3.seconds) + trace "node process forcibly killed with exit code: ", exitCode = forcedExitCode + except CatchableError as e: + warn "failed to kill node process in time, it will be killed when the parent process exits", + error = e.msg + writeStackTrace() - trace "waiting for node process to exit" - let exitCode = await node.process.waitForExit(3.seconds) - if exitCode > 0: - error "failed to exit process, check for zombies", exitCode + trace "node stopped" - trace "closing node process' streams" - await node.process.closeWait() - except CancelledError as error: - raise error - except CatchableError as e: - error "error stopping node process", error = e.msg - finally: - node.process = nil - - trace "node stopped" - -proc waitUntilOutput*(node: NodeProcess, output: string) {.async.} = +proc waitUntilOutput*( + node: NodeProcess, output: string +) {.async: (raises: [CancelledError, AsyncTimeoutError]).} = logScope: nodeName = node.name @@ -153,9 +155,21 @@ proc waitUntilOutput*(node: NodeProcess, output: string) {.async.} = let fut = node.captureOutput(output, started) node.trackedFutures.track(fut) asyncSpawn fut - await started.wait(60.seconds) # allow enough time for proof generation + try: + await started.wait(60.seconds) # allow enough time for proof generation + except AsyncTimeoutError as e: + raise e + except CancelledError as e: + raise e + except CatchableError as e: # unsure where this originates from + error "unexpected error occurred waiting for node output", error = e.msg + +proc waitUntilStarted*( + node: NodeProcess +) {.async: (raises: [CancelledError, NodeProcessError]).} = + logScope: + nodeName = node.name -proc waitUntilStarted*(node: NodeProcess) {.async.} = try: await node.waitUntilOutput(node.startedOutput) trace "node started" @@ -168,10 +182,10 @@ proc waitUntilStarted*(node: NodeProcess) {.async.} = raise newException(NodeProcessError, "node did not output '" & node.startedOutput & "'") -proc restart*(node: NodeProcess) {.async.} = +method restart*(node: NodeProcess) {.base, async.} = await node.stop() await node.start() await node.waitUntilStarted() -method removeDataDir*(node: NodeProcess) {.base.} = +method removeDataDir*(node: NodeProcess) {.base, raises: [NodeProcessError].} = raiseAssert "[removeDataDir] not implemented" diff --git a/tests/integration/scripts/winkillprocess.sh b/tests/integration/scripts/winkillprocess.sh new file mode 100644 index 00000000..b5e58ab4 --- /dev/null +++ b/tests/integration/scripts/winkillprocess.sh @@ -0,0 +1,97 @@ +#!/bin/bash + +# List all processes with a specific name +list() { + local name=$1 + echo "Listing all processes named '$name'..." + powershell.exe -Command "Get-CimInstance Win32_Process -Filter \"name = '$name'\" | Select-Object ProcessId, Name, CommandLine | Format-Table -AutoSize" +} + +# Search for processes with a specific name and command line pattern +search() { + local name=$1 + local pattern=$2 + echo "Searching for '$name' processes with command line matching '$pattern'..." + powershell.exe -Command " + \$processes = Get-CimInstance Win32_Process -Filter \"name = '$name'\" | Where-Object { \$_.CommandLine -match '$pattern' }; + if (\$processes) { + \$processes | Select-Object ProcessId, Name, CommandLine | Format-Table -AutoSize; + } else { + Write-Host \"No matching '$name' processes found\"; + } + " +} + +# Kill all processes with a specific name +killall() { + local name=$1 + echo "Finding and killing all '$name' processes..." + powershell.exe -Command " + \$processes = Get-CimInstance Win32_Process -Filter \"name = '$name'\"; + if (\$processes) { + foreach (\$process in \$processes) { + Stop-Process -Id \$process.ProcessId -Force; + Write-Host \"Killed process \$(\$process.ProcessId)\"; + } + } else { + Write-Host \"No '$name' processes found\"; + } + " +} + +# Kill processes with a specific name and command line pattern +kill() { + local name=$1 + local pattern=$2 + echo "Finding and killing '$name' processes with command line matching '$pattern'..." + powershell.exe -Command " + \$processes = Get-CimInstance Win32_Process -Filter \"name = '$name'\" | Where-Object { \$_.CommandLine -match '$pattern' }; + if (\$processes) { + foreach (\$process in \$processes) { + Stop-Process -Id \$process.ProcessId -Force; + Write-Host \"Killed process \$(\$process.ProcessId)\"; + } + } else { + Write-Host \"No matching '$name' processes found\"; + } + " +} + +# Check if being run directly or sourced +if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then + # If run directly (not sourced), provide command line interface + case "$1" in + list) + if [ -z "$2" ]; then + echo "Usage: $0 list PROCESS_NAME" + exit 1 + fi + list "$2" + ;; + search) + if [ -z "$2" ] || [ -z "$3" ]; then + echo "Usage: $0 search PROCESS_NAME COMMANDLINE_PATTERN" + exit 1 + fi + search "$2" "$3" + ;; + killall) + if [ -z "$2" ]; then + echo "Usage: $0 killall PROCESS_NAME" + exit 1 + fi + killall "$2" + ;; + kill) + if [ -z "$2" ] || [ -z "$3" ]; then + echo "Usage: $0 kill PROCESS_NAME COMMANDLINE_PATTERN" + exit 1 + fi + kill "$2" "$3" + ;; + *) + echo "Usage: $0 {list PROCESS_NAME|search PROCESS_NAME COMMANDLINE_PATTERN|killall PROCESS_NAME|kill PROCESS_NAME COMMANDLINE_PATTERN}" + exit 1 + ;; + esac +fi diff --git a/tests/integration/utils.nim b/tests/integration/utils.nim new file mode 100644 index 00000000..3e522a04 --- /dev/null +++ b/tests/integration/utils.nim @@ -0,0 +1,92 @@ +import std/os +import std/strformat +import pkg/chronos +import pkg/chronos/asyncproc +import pkg/codex/logutils + +{.push raises: [].} + +proc nextFreePort*(startPort: int): Future[int] {.async: (raises: [CancelledError]).} = + proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} = + await transp.closeWait() + + var port = startPort + while true: + trace "checking if port is free", port + try: + let host = initTAddress("127.0.0.1", port) + # We use ReuseAddr here only to be able to reuse the same IP/Port when + # there's a TIME_WAIT socket. It's useful when running the test multiple + # times or if a test ran previously using the same port. + var server = createStreamServer(host, client, {ReuseAddr}) + trace "port is free", port + await server.closeWait() + return port + except TransportOsError: + trace "port is not free", port + inc port + except TransportAddressError: + raiseAssert "bad address" + +proc sanitize*(pathSegment: string): string = + var sanitized = pathSegment + for invalid in invalidFilenameChars.items: + sanitized = sanitized.replace(invalid, '_').replace(' ', '_') + sanitized + +proc getLogFile*( + logDir, startTime, suiteName, testName, role: string, index = int.none +): string {.raises: [IOError, OSError].} = + let logsDir = + if logDir == "": + currentSourcePath.parentDir() / "logs" / sanitize(startTime & "__" & suiteName) / + sanitize(testName) + else: + logDir / sanitize(suiteName) / sanitize(testName) + + createDir(logsDir) + + var fn = $role + if idx =? index: + fn &= "_" & $idx + fn &= ".log" + + let fileName = logsDir / fn + return fileName + +proc appendFile*(filename: string, content: string) {.raises: [IOError].} = + ## Opens a file named `filename` for writing. Then writes the + ## `content` completely to the file and closes the file afterwards. + ## Raises an IO exception in case of an error. + var f: File + try: + f = open(filename, fmAppend) + f.write(content) + except IOError as e: + raise newException(IOError, "cannot open and write " & filename & ": " & e.msg) + finally: + close(f) + +when defined(windows): + proc forceKillProcess*( + processName, matchingCriteria: string + ): Future[CommandExResponse] {. + async: ( + raises: [ + AsyncProcessError, AsyncProcessTimeoutError, CancelledError, ValueError, + OSError, + ] + ) + .} = + let path = splitFile(currentSourcePath()).dir / "scripts" / "winkillprocess.sh" + let cmd = &"{absolutePath(path)} kill {processName} \"{matchingCriteria}\"" + trace "Forcefully killing windows process", processName, matchingCriteria, cmd + return await execCommandEx(cmd, timeout = 5.seconds) + +proc getDataDir*(testId, testName, startTime, role: string, index = int.none): string = + var suffix = role + if idx =? index: + suffix &= "_" & $idx + + getTempDir() / "Codex" / sanitize(testId) / sanitize(testName) / sanitize(startTime) / + sanitize(suffix) diff --git a/vendor/nim-chronos b/vendor/nim-chronos index c04576d8..0646c444 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit c04576d829b8a0a1b12baaa8bc92037501b3a4a0 +Subproject commit 0646c444fce7c7ed08ef6f2c9a7abfd172ffe655 From 2073090ed78fd0766cbdf4fe516f9c8d0fb440e0 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Fri, 16 Jan 2026 17:10:30 +0400 Subject: [PATCH 2/4] fix: close the discovery store (#1364) --- codex/codex.nim | 19 +++++++++++++------ codex/discovery.nim | 12 ++++++++++-- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/codex/codex.nim b/codex/codex.nim index 3c28ec01..23a77416 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -206,6 +206,7 @@ proc stop*(s: CodexServer) {.async.} = @[ s.codexNode.switch.stop(), s.codexNode.stop(), + s.codexNode.discovery.stop(), s.repoStore.stop(), s.maintenance.stop(), ] @@ -215,12 +216,15 @@ proc stop*(s: CodexServer) {.async.} = let res = await noCancel allFinishedFailed[void](futures) + s.isStarted = false + if res.failure.len > 0: error "Failed to stop Storage node", failures = res.failure.len raiseAssert "Failed to stop Storage node" proc close*(s: CodexServer) {.async.} = - var futures = @[s.codexNode.close(), s.repoStore.close()] + var futures = + @[s.codexNode.close(), s.repoStore.close(), s.codexNode.discovery.close()] let res = await noCancel allFinishedFailed[void](futures) @@ -282,12 +286,15 @@ proc new*( msg: "Unable to create discovery directory for block store: " & discoveryDir ) + let providersPath = config.dataDir / CodexDhtProvidersNamespace + let discoveryStoreRes = LevelDbDatastore.new(providersPath) + if discoveryStoreRes.isErr: + error "Failed to initialize discovery datastore", + path = providersPath, err = discoveryStoreRes.error.msg + let - discoveryStore = Datastore( - LevelDbDatastore.new(config.dataDir / CodexDhtProvidersNamespace).expect( - "Should create discovery datastore!" - ) - ) + discoveryStore = + Datastore(discoveryStoreRes.expect("Should create discovery datastore!")) discovery = Discovery.new( switch.peerInfo.privateKey, diff --git a/codex/discovery.nim b/codex/discovery.nim index 424ec9c0..1b9ba5c0 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -44,6 +44,7 @@ type Discovery* = ref object of RootObj # address that the node can be connected on dhtRecord*: ?SignedPeerRecord # record to advertice DHT connection information isStarted: bool + store: Datastore proc toNodeId*(cid: Cid): NodeId = ## Cid to discovery id @@ -215,9 +216,15 @@ proc stop*(d: Discovery) {.async: (raises: []).} = try: await noCancel d.protocol.closeWait() + d.isStarted = false except CatchableError as exc: error "Error stopping discovery", exc = exc.msg +proc close*(d: Discovery) {.async: (raises: []).} = + let res = await noCancel d.store.close() + if res.isErr: + error "Error closing discovery store", error = res.error().msg + proc new*( T: type Discovery, key: PrivateKey, @@ -230,8 +237,9 @@ proc new*( ## Create a new Discovery node instance for the given key and datastore ## - var self = - Discovery(key: key, peerId: PeerId.init(key).expect("Should construct PeerId")) + var self = Discovery( + key: key, peerId: PeerId.init(key).expect("Should construct PeerId"), store: store + ) self.updateAnnounceRecord(announceAddrs) From da70ebff7c9f8b49439c83dc196301cc34df3fb5 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Fri, 16 Jan 2026 18:33:34 +0400 Subject: [PATCH 3/4] chore: improve c bindings (#1361) Signed-off-by: Arnaud --- .github/workflows/artifacts.yml | 125 +++ .github/workflows/ci.yml | 26 + .gitignore | 3 + .../{golang/hello.txt => c/hello_world.txt} | 0 examples/c/storage.c | 945 ++++++++++++++++++ examples/golang/README.md | 24 - examples/golang/storage.go | 885 ---------------- library/README.md | 475 ++++++++- library/libstorage.h | 508 +++++++--- 9 files changed, 1924 insertions(+), 1067 deletions(-) create mode 100644 .github/workflows/artifacts.yml rename examples/{golang/hello.txt => c/hello_world.txt} (100%) create mode 100644 examples/c/storage.c delete mode 100644 examples/golang/README.md delete mode 100644 examples/golang/storage.go diff --git a/.github/workflows/artifacts.yml b/.github/workflows/artifacts.yml new file mode 100644 index 00000000..38fb8599 --- /dev/null +++ b/.github/workflows/artifacts.yml @@ -0,0 +1,125 @@ +name: Libstorage artifacts + +on: + push: + tags: + - "v*" + workflow_dispatch: + +jobs: + build: + runs-on: ${{ matrix.target.os }} + strategy: + matrix: + target: + - os: ubuntu-latest + cpu: amd64 + lib_ext: so + + - os: ubuntu-24.04-arm + cpu: arm64 + lib_ext: so + + - os: macos-latest + lib_ext: dylib + cpu: arm64 + + - os: windows-latest + cpu: amd64 + lib_ext: dll + + steps: + - name: Check out sources + uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Install Rust 1.85.0 + if: matrix.target.os != 'windows-latest' + uses: dtolnay/rust-toolchain@master + with: + toolchain: 1.85.0 + + - name: MSYS2 (Windows amd64) + if: matrix.target.os == 'windows-latest' && matrix.target.cpu == 'amd64' + uses: msys2/setup-msys2@v2 + with: + path-type: inherit + msystem: UCRT64 + install: >- + base-devel + git + mingw-w64-ucrt-x86_64-toolchain + mingw-w64-ucrt-x86_64-cmake + mingw-w64-ucrt-x86_64-ntldd-git + mingw-w64-ucrt-x86_64-rust + run: | + pacman -Sy --noconfirm make + git config --global core.symlinks false + + - name: Build libstorage (Linux) + if: matrix.target.lib_ext == 'so' + run: | + make -j${ncpu} update + make -j${ncpu} libstorage + + - name: Build libstorage (MacOS) + if: matrix.target.os == 'macos-latest' + run: | + make -j${ncpu} update + STORAGE_LIB_PARAMS="--passL:\"-Wl,-install_name,@rpath/libstorage.dylib\"" make -j${ncpu} libstorage + + - name: Build libstorage (Windows) + if: matrix.target.os == 'windows-latest' + shell: msys2 {0} + run: | + make -j${ncpu} update + make -j${ncpu} libstorage + + - name: Package artifacts Linux + if: matrix.target.os == 'ubuntu-latest' || matrix.target.os == 'ubuntu-24.04-arm' + run: | + sudo apt-get update && sudo apt-get install -y zip + ZIPFILE=storage-linux-${{ matrix.target.cpu }}.zip + zip -j $ZIPFILE ./build/libstorage.${{ matrix.target.lib_ext }} ./library/libstorage.h + echo "ZIPFILE=$ZIPFILE" >> $GITHUB_ENV + + - name: Package artifacts MacOS + if: matrix.target.os == 'macos-latest' + run: | + ZIPFILE=storage-macos-${{ matrix.target.cpu }}.zip + zip -j $ZIPFILE ./build/libstorage.${{ matrix.target.lib_ext }} ./library/libstorage.h + echo "ZIPFILE=$ZIPFILE" >> $GITHUB_ENV + + - name: Package artifacts (Windows) + if: matrix.target.os == 'windows-latest' + shell: msys2 {0} + run: | + ZIPFILE=storage-windows-${{ matrix.target.cpu }}.zip + (cd ./build && 7z a -tzip "${GITHUB_WORKSPACE}/${ZIPFILE}" libstorage.dll) + (cd ./library && 7z a -tzip "${GITHUB_WORKSPACE}/${ZIPFILE}" libstorage.h) + echo "ZIPFILE=$ZIPFILE" >> $GITHUB_ENV + + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: ${{ env.ZIPFILE }} + path: ${{ env.ZIPFILE }} + if-no-files-found: error + + publish-release: + needs: build + runs-on: ubuntu-latest + if: startsWith(github.ref, 'refs/tags/') + steps: + - name: Check out sources + uses: actions/checkout@v4 + - name: Download artifacts + uses: actions/download-artifact@v5 + with: + path: dist + - name: Create release + uses: softprops/action-gh-release@v2 + with: + files: dist/** + draft: true diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0d24507d..4f19eb61 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -85,3 +85,29 @@ jobs: name: codecov-umbrella token: ${{ secrets.CODECOV_TOKEN }} verbose: true + + cbinding: + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v4 + with: + submodules: recursive + ref: ${{ github.event.pull_request.head.sha }} + + - name: Setup Nimbus Build System + uses: ./.github/actions/nimbus-build-system + with: + os: linux + nim_version: ${{ env.nim_version }} + + - name: C Binding build + run: | + make -j${ncpu} update + make -j${ncpu} libstorage + + - name: C Binding test + run: | + cd examples/c + gcc -o storage storage.c -L../../build -lstorage -Wl,-rpath,../../ -pthread + LD_LIBRARY_PATH=../../build ./storage diff --git a/.gitignore b/.gitignore index f6292dda..c4aa0d4d 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,6 @@ nim.cfg tests/integration/logs data/ + +examples/c/data-dir +examples/c/downloaded_hello.txt diff --git a/examples/golang/hello.txt b/examples/c/hello_world.txt similarity index 100% rename from examples/golang/hello.txt rename to examples/c/hello_world.txt diff --git a/examples/c/storage.c b/examples/c/storage.c new file mode 100644 index 00000000..36f9109c --- /dev/null +++ b/examples/c/storage.c @@ -0,0 +1,945 @@ +#include +#include +#include +#include +#include +#include +#include "../../library/libstorage.h" + +// We need 250 as max retries mainly for the start function in CI. +// Other functions should be not need that many retries. +#define MAX_RETRIES 250 + +typedef struct +{ + int ret; + char *msg; + char *chunk; + size_t len; +} Resp; + +static Resp *alloc_resp(void) +{ + Resp *r = (Resp *)calloc(1, sizeof(Resp)); + r->msg = NULL; + r->chunk = NULL; + r->ret = -1; + return r; +} + +static void free_resp(Resp *r) +{ + if (!r) + { + return; + } + + if (r->msg) + { + free(r->msg); + } + + if (r->chunk) + { + free(r->chunk); + } + + free(r); +} + +static int get_ret(Resp *r) +{ + if (!r) + { + return RET_ERR; + } + + return r->ret; +} + +// wait_resp waits until the async response is ready or max retries is reached. +// The resp is initially set to -1, to any code (RET_OK, RET_ERR, RET_PROGRESS) will +// indicate that the response is ready to be consumed. +static void wait_resp(Resp *r) +{ + int retries = 0; + + while (get_ret(r) == -1 && retries < MAX_RETRIES) + { + usleep(1000 * 100); // 100 ms + retries++; + } +} + +// is_resp_ok checks if the async response indicates success. +// It will wait first for the response to be ready. +// Then it will copy the message or chunk to res if provided. +static int is_resp_ok(Resp *r, char **res) +{ + if (!r) + { + return RET_ERR; + } + + wait_resp(r); + + int ret = (r->ret == RET_OK) ? RET_OK : RET_ERR; + + // If a response pointer is provided, it’s safe to initialize it to NULL. + if (res) + { + *res = NULL; + } + + // If the response contains a chunk (for a download or an upload with RET_PROGRESS), + // the response will be in chunk. + // Otherwise, the response will be in msg. + if (res && r->chunk) + { + *res = strdup(r->chunk); + } + else if (res && r->msg) + { + *res = strdup(r->msg); + } + + free_resp(r); + + return ret; +} + +// callback is the function that will be called by the storage library +// when an async operation is completed or has progress to report. +// - ret is the return code of the callback +// - msg is the data returned by the callback: it can be a string or a chunk +// - len is the size of that data +// - userData is the bridge between the caller and the lib. +// The caller passes this userData to the library. +// When the library invokes the callback, it passes the same userData back. The callback +// then fills it with the received information (return code, message). Once the callback +// has completed, the caller can read the populated userData. +static void callback(int ret, const char *msg, size_t len, void *userData) +{ + Resp *r = (Resp *)userData; + + // This means that the caller did not provide a valid userData pointer. + // In that case, we have nothing to do but return. + if (!r) + { + return; + } + + // Assign the return code to the response structure. + r->ret = ret; + + // If the reponse already has a message, just free it first. + if (r->msg) + { + free(r->msg); + r->msg = NULL; + r->len = 0; + } + + // For a RET_PROGRESS with chunk, copy the chunk data directly. + // This is used for upload/download chunk progress. + if (ret == RET_PROGRESS && msg && len > 0 && r->chunk) + { + memcpy(r->chunk, msg, len); + r->len = len; + } + + // For other cases, copy the message data. + if (msg && len > 0) + { + // Allocate memory for the message plus null terminator. + r->msg = (char *)malloc(len + 1); + + // Just in case malloc fails. + if (!r->msg) + { + r->len = 0; + return; + } + + memcpy(r->msg, msg, len); + + // Null terminate is needed here otherwise + // the msg will contains non valid string like "0� :g" + r->msg[len] = '\0'; + + r->len = len; + } + else + { + r->msg = NULL; + r->len = 0; + } +} + +static int read_file(const char *filepath, char **res) +{ + FILE *file; + char c; + // Just read first 100 bytes for the test + char content[100]; + + file = fopen(filepath, "r"); + + if (file == NULL) + { + return RET_ERR; + } + + fgets(content, 100, file); + + *res = strdup(content); + + fclose(file); + + return RET_OK; +} + +int setup(void **storage_ctx) +{ + // Initialize Nim runtime + extern void libstorageNimMain(void); + libstorageNimMain(); + + Resp *r = alloc_resp(); + const char *cfg = "{\"log-level\":\"WARN\",\"data-dir\":\"./data-dir\"}"; + void *ctx = storage_new(cfg, (StorageCallback)callback, r); + + if (!ctx) + { + free_resp(r); + return RET_ERR; + } + + wait_resp(r); + + if (r->ret != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + (*storage_ctx) = ctx; + + free_resp(r); + + return RET_OK; +} + +int start(void *storage_ctx) +{ + Resp *r = alloc_resp(); + + if (storage_start(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + return is_resp_ok(r, NULL); +} + +int cleanup(void *storage_ctx) +{ + Resp *r = alloc_resp(); + + // Stop node + if (storage_stop(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + if (is_resp_ok(r, NULL) != RET_OK) + { + return RET_ERR; + } + + r = alloc_resp(); + + // Close node + if (storage_close(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + if (is_resp_ok(r, NULL) != RET_OK) + { + return RET_ERR; + } + + r = alloc_resp(); + + // Destroy node + // No need to wait here as storage_destroy is synchronous + if (storage_destroy(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + free_resp(r); + + return RET_OK; +} + +int check_version(void *storage_ctx) +{ + char *res = NULL; + + Resp *r = alloc_resp(); + + // No need to wait here as storage_version is synchronous + if (storage_version(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + free_resp(r); + + return RET_OK; +} + +int check_repo(void *storage_ctx) +{ + Resp *r = alloc_resp(); + char *res = NULL; + + if (storage_repo(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + int ret = is_resp_ok(r, &res); + + if (strcmp(res, "./data-dir") != 0) + { + printf("repo mismatch: %s\n", res); + ret = RET_ERR; + } + + free(res); + + return ret; +} + +int check_debug(void *storage_ctx) +{ + Resp *r = alloc_resp(); + char *res = NULL; + + if (storage_debug(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + int ret = is_resp_ok(r, &res); + + // Simple check to ensure the response contains spr + if (strstr(res, "spr") == NULL) + { + fprintf(stderr, "debug content mismatch, res:%s\n", res); + ret = RET_ERR; + } + + free(res); + + return ret; +} + +int check_spr(void *storage_ctx) +{ + Resp *r = alloc_resp(); + char *res = NULL; + + if (storage_spr(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + int ret = is_resp_ok(r, &res); + + if (strstr(res, "spr") == NULL) + { + fprintf(stderr, "spr content mismatch, res:%s\n", res); + ret = RET_ERR; + } + + free(res); + + return ret; +} + +int check_peer_id(void *storage_ctx) +{ + Resp *r = alloc_resp(); + char *res = NULL; + + if (storage_peer_id(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + return is_resp_ok(r, &res); +} + +int update_log_level(void *storage_ctx, const char *log_level) +{ + char *res = NULL; + + Resp *r = alloc_resp(); + + if (storage_log_level(storage_ctx, log_level, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + return is_resp_ok(r, NULL); +} + +int check_upload_chunk(void *storage_ctx, const char *filepath) +{ + Resp *r = alloc_resp(); + char *res = NULL; + char *session_id = NULL; + const char *payload = "hello world"; + size_t chunk_size = strlen(payload); + + if (storage_upload_init(storage_ctx, filepath, chunk_size, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + if (is_resp_ok(r, &session_id) != RET_OK) + { + return RET_ERR; + } + + uint8_t *chunk = malloc(chunk_size); + if (!chunk) + { + free(session_id); + return RET_ERR; + } + memcpy(chunk, payload, chunk_size); + + r = alloc_resp(); + + if (storage_upload_chunk(storage_ctx, session_id, chunk, chunk_size, (StorageCallback)callback, r) != RET_OK) + { + free(session_id); + free_resp(r); + free(chunk); + return RET_ERR; + } + + if (is_resp_ok(r, NULL) != RET_OK) + { + free(session_id); + free(chunk); + return RET_ERR; + } + + free(chunk); + r = alloc_resp(); + + if (storage_upload_finalize(storage_ctx, session_id, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + free(session_id); + return RET_ERR; + } + + free(session_id); + + int ret = is_resp_ok(r, &res); + + if (res == NULL || strlen(res) == 0) + { + fprintf(stderr, "CID is missing\n"); + ret = RET_ERR; + } + + free(res); + + return ret; +} + +int upload_cancel(void *storage_ctx) +{ + Resp *r = alloc_resp(); + char *session_id = NULL; + size_t chunk_size = 64 * 1024; + + if (storage_upload_init(storage_ctx, "hello.txt", chunk_size, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + if (is_resp_ok(r, &session_id) != RET_OK) + { + return RET_ERR; + } + + r = alloc_resp(); + + if (storage_upload_cancel(storage_ctx, session_id, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + free(session_id); + return RET_ERR; + } + + free(session_id); + + return is_resp_ok(r, NULL); +} + +int check_upload_file(void *storage_ctx, const char *filepath, char **res) +{ + Resp *r = alloc_resp(); + char *session_id = NULL; + size_t chunk_size = 64 * 1024; + + if (storage_upload_init(storage_ctx, filepath, chunk_size, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + if (is_resp_ok(r, &session_id) != RET_OK) + { + return RET_ERR; + } + + r = alloc_resp(); + + if (storage_upload_file(storage_ctx, session_id, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + free(session_id); + return RET_ERR; + } + + free(session_id); + + int ret = is_resp_ok(r, res); + + if (res == NULL || strlen(*res) == 0) + { + fprintf(stderr, "CID is missing\n"); + return RET_ERR; + } + + return ret; +} + +int check_download_stream(void *storage_ctx, const char *cid, const char *filepath) +{ + Resp *r = alloc_resp(); + char *res = NULL; + size_t chunk_size = 64 * 1024; + bool local = true; + + if (storage_download_init(storage_ctx, cid, chunk_size, local, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + if (is_resp_ok(r, NULL) != RET_OK) + { + return RET_ERR; + } + + r = alloc_resp(); + r->chunk = malloc(chunk_size + 1); + + if (storage_download_stream(storage_ctx, cid, chunk_size, local, filepath, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + int ret = is_resp_ok(r, &res); + + if (strncmp(res, "Hello World!", strlen("Hello World!")) != 0) + { + fprintf(stderr, "downloaded content mismatch, res:%s\n", res); + ret = RET_ERR; + } + + if (read_file("downloaded_hello.txt", &res) != RET_OK) + { + fprintf(stderr, "read downloaded file failed\n"); + ret = RET_ERR; + } + + if (strncmp(res, "Hello World!", strlen("Hello World!")) != 0) + { + fprintf(stderr, "downloaded content mismatch, res:%s\n", res); + ret = RET_ERR; + } + + free(res); + + return ret; +} + +int check_download_chunk(void *storage_ctx, const char *cid) +{ + Resp *r = alloc_resp(); + char *res = NULL; + size_t chunk_size = 64 * 1024; + bool local = true; + + if (storage_download_init(storage_ctx, cid, chunk_size, local, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + if (is_resp_ok(r, NULL) != RET_OK) + { + return RET_ERR; + } + + r = alloc_resp(); + r->chunk = malloc(chunk_size + 1); + + if (storage_download_chunk(storage_ctx, cid, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + int ret = is_resp_ok(r, &res); + + if (strncmp(res, "Hello World!", strlen("Hello World!")) != 0) + { + fprintf(stderr, "downloaded chunk content mismatch, res:%s\n", res); + ret = RET_ERR; + } + + free(res); + + return ret; +} + +int check_download_cancel(void *storage_ctx, const char *cid) +{ + Resp *r = alloc_resp(); + + if (storage_download_cancel(storage_ctx, cid, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + return is_resp_ok(r, NULL); +} + +int check_download_manifest(void *storage_ctx, const char *cid) +{ + Resp *r = alloc_resp(); + char *res = NULL; + + if (storage_download_manifest(storage_ctx, cid, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + int ret = is_resp_ok(r, &res); + + const char *expected_manifest = "{\"treeCid\":\"zDzSvJTf8JYwvysKPmG7BtzpbiAHfuwFMRphxm4hdvnMJ4XPJjKX\",\"datasetSize\":12,\"blockSize\":65536,\"filename\":\"hello_world.txt\",\"mimetype\":\"text/plain\",\"protected\":false}"; + + if (strncmp(res, expected_manifest, strlen(expected_manifest)) != 0) + { + fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res); + ret = RET_ERR; + } + + free(res); + + return ret; +} + +int check_list(void *storage_ctx) +{ + Resp *r = alloc_resp(); + char *res = NULL; + + if (storage_list(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + int ret = is_resp_ok(r, &res); + + const char *expected_manifest = "{\"treeCid\":\"zDzSvJTf8JYwvysKPmG7BtzpbiAHfuwFMRphxm4hdvnMJ4XPJjKX\",\"datasetSize\":12,\"blockSize\":65536,\"filename\":\"hello_world.txt\",\"mimetype\":\"text/plain\",\"protected\":false}"; + + if (strstr(res, expected_manifest) == NULL) + { + fprintf(stderr, "downloaded manifest content mismatch, res:%s\n", res); + ret = RET_ERR; + } + + free(res); + + return ret; +} + +int check_space(void *storage_ctx) +{ + Resp *r = alloc_resp(); + char *res = NULL; + + if (storage_space(storage_ctx, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + int ret = is_resp_ok(r, &res); + + // Simple check to ensure the response contains totalBlocks + if (strstr(res, "totalBlocks") == NULL) + { + fprintf(stderr, "list content mismatch, res:%s\n", res); + ret = RET_ERR; + } + + free(res); + + return ret; +} + +int check_exists(void *storage_ctx, const char *cid, bool expected) +{ + Resp *r = alloc_resp(); + char *res = NULL; + + if (storage_exists(storage_ctx, cid, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + int ret = is_resp_ok(r, &res); + + if (expected) + { + if (strcmp(res, "true") != 0) + { + fprintf(stderr, "exists content mismatch, res:%s\n", res); + ret = RET_ERR; + } + } + else + { + if (strcmp(res, "false") != 0) + { + fprintf(stderr, "exists content mismatch, res:%s\n", res); + ret = RET_ERR; + } + } + + free(res); + + return ret; +} + +int check_delete(void *storage_ctx, const char *cid) +{ + Resp *r = alloc_resp(); + + if (storage_delete(storage_ctx, cid, (StorageCallback)callback, r) != RET_OK) + { + free_resp(r); + return RET_ERR; + } + + return is_resp_ok(r, NULL); +} + +// TODO: implement check_fetch +// It is a bit complicated because it requires two nodes +// connected together to fetch from peers. +// A good idea would be to use connect function using addresses. +// This test will be quite important when the block engine is re-implemented. +int check_fetch(void *storage_ctx, const char *cid) +{ + return RET_OK; +} + +int main(void) +{ + void *storage_ctx = NULL; + char *res = NULL; + char *cid = NULL; + + if (setup(&storage_ctx) != RET_OK) + { + fprintf(stderr, "setup failed\n"); + return RET_ERR; + } + + if (check_version(storage_ctx) != RET_OK) + { + fprintf(stderr, "check version failed\n"); + return RET_ERR; + } + + if (start(storage_ctx) != RET_OK) + { + fprintf(stderr, "start failed\n"); + return RET_ERR; + } + + if (check_repo(storage_ctx) != RET_OK) + { + fprintf(stderr, "check repo failed\n"); + return RET_ERR; + } + + if (check_debug(storage_ctx) != RET_OK) + { + fprintf(stderr, "check debug failed\n"); + return RET_ERR; + } + + if (check_spr(storage_ctx) != RET_OK) + { + fprintf(stderr, "check spr failed\n"); + return RET_ERR; + } + + if (check_peer_id(storage_ctx) != RET_OK) + { + fprintf(stderr, "check peer_id failed\n"); + return RET_ERR; + } + + if (check_upload_chunk(storage_ctx, "hello_world.txt") != RET_OK) + { + fprintf(stderr, "upload chunk failed\n"); + return RET_ERR; + } + + if (upload_cancel(storage_ctx) != RET_OK) + { + fprintf(stderr, "upload cancel failed\n"); + return RET_ERR; + } + + char *path = realpath("hello_world.txt", NULL); + + if (!path) + { + fprintf(stderr, "realpath failed\n"); + return RET_ERR; + } + + if (check_upload_file(storage_ctx, path, &cid) != RET_OK) + { + fprintf(stderr, "upload file failed\n"); + free(path); + return RET_ERR; + } + + free(path); + + if (check_download_stream(storage_ctx, cid, "downloaded_hello.txt") != RET_OK) + { + fprintf(stderr, "download stream failed\n"); + free(cid); + return RET_ERR; + } + + if (check_download_chunk(storage_ctx, cid) != RET_OK) + { + fprintf(stderr, "download chunk failed\n"); + free(cid); + return RET_ERR; + } + + if (check_download_cancel(storage_ctx, cid) != RET_OK) + { + fprintf(stderr, "download cancel failed\n"); + free(cid); + return RET_ERR; + } + + if (check_download_manifest(storage_ctx, cid) != RET_OK) + { + fprintf(stderr, "download manifest failed\n"); + free(cid); + return RET_ERR; + } + + if (check_list(storage_ctx) != RET_OK) + { + fprintf(stderr, "list failed\n"); + free(cid); + return RET_ERR; + } + + if (check_space(storage_ctx) != RET_OK) + { + fprintf(stderr, "space failed\n"); + free(cid); + return RET_ERR; + } + + if (check_exists(storage_ctx, cid, true) != RET_OK) + { + fprintf(stderr, "exists failed\n"); + free(cid); + return RET_ERR; + } + + if (check_delete(storage_ctx, cid) != RET_OK) + { + fprintf(stderr, "delete failed\n"); + free(cid); + return RET_ERR; + } + + if (check_exists(storage_ctx, cid, false) != RET_OK) + { + fprintf(stderr, "exists failed\n"); + free(cid); + return RET_ERR; + } + + free(cid); + + if (update_log_level(storage_ctx, "INFO") != RET_OK) + { + fprintf(stderr, "update log level failed\n"); + return RET_ERR; + } + + if (cleanup(storage_ctx) != RET_OK) + { + fprintf(stderr, "cleanup failed\n"); + return RET_ERR; + } + + return RET_OK; +} \ No newline at end of file diff --git a/examples/golang/README.md b/examples/golang/README.md deleted file mode 100644 index 119648c2..00000000 --- a/examples/golang/README.md +++ /dev/null @@ -1,24 +0,0 @@ - -## Pre-requisite - -libstorage.so is needed to be compiled and present in build folder. - -## Compilation - -From the Logos Storage root folder: - -```code -go build -o storage-go examples/golang/storage.go -``` - -## Run -From the storage root folder: - - -```code -export LD_LIBRARY_PATH=build -``` - -```code -./storage-go -``` diff --git a/examples/golang/storage.go b/examples/golang/storage.go deleted file mode 100644 index 5908afc9..00000000 --- a/examples/golang/storage.go +++ /dev/null @@ -1,885 +0,0 @@ -package main - -/* - #cgo LDFLAGS: -L../../build/ -lstorage - #cgo LDFLAGS: -L../../ -Wl,-rpath,../../ - - #include - #include - #include "../../library/libstorage.h" - - typedef struct { - int ret; - char* msg; - size_t len; - uintptr_t h; - } Resp; - - static void* allocResp(uintptr_t h) { - Resp* r = (Resp*)calloc(1, sizeof(Resp)); - r->h = h; - return r; - } - - static void freeResp(void* resp) { - if (resp != NULL) { - free(resp); - } - } - - static int getRet(void* resp) { - if (resp == NULL) { - return 0; - } - Resp* m = (Resp*) resp; - return m->ret; - } - - void libstorageNimMain(void); - - static void storage_host_init_once(void){ - static int done; - if (!__atomic_exchange_n(&done, 1, __ATOMIC_SEQ_CST)) libstorageNimMain(); - } - - // resp must be set != NULL in case interest on retrieving data from the callback - void callback(int ret, char* msg, size_t len, void* resp); - - static void* cGoStorageNew(const char* configJson, void* resp) { - void* ret = storage_new(configJson, (StorageCallback) callback, resp); - return ret; - } - - static int cGoStorageStart(void* storageCtx, void* resp) { - return storage_start(storageCtx, (StorageCallback) callback, resp); - } - - static int cGoStorageStop(void* storageCtx, void* resp) { - return storage_stop(storageCtx, (StorageCallback) callback, resp); - } - - static int cGoStorageClose(void* storageCtx, void* resp) { - return storage_close(storageCtx, (StorageCallback) callback, resp); - } - - static int cGoStorageDestroy(void* storageCtx, void* resp) { - return storage_destroy(storageCtx, (StorageCallback) callback, resp); - } - - static int cGoStorageVersion(void* storageCtx, void* resp) { - return storage_version(storageCtx, (StorageCallback) callback, resp); - } - - static int cGoStorageRevision(void* storageCtx, void* resp) { - return storage_revision(storageCtx, (StorageCallback) callback, resp); - } - - static int cGoStorageRepo(void* storageCtx, void* resp) { - return storage_repo(storageCtx, (StorageCallback) callback, resp); - } - - static int cGoStorageSpr(void* storageCtx, void* resp) { - return storage_spr(storageCtx, (StorageCallback) callback, resp); - } - - static int cGoStoragePeerId(void* storageCtx, void* resp) { - return storage_peer_id(storageCtx, (StorageCallback) callback, resp); - } - - static int cGoStorageUploadInit(void* storageCtx, char* filepath, size_t chunkSize, void* resp) { - return storage_upload_init(storageCtx, filepath, chunkSize, (StorageCallback) callback, resp); - } - - static int cGoStorageUploadChunk(void* storageCtx, char* sessionId, const uint8_t* chunk, size_t len, void* resp) { - return storage_upload_chunk(storageCtx, sessionId, chunk, len, (StorageCallback) callback, resp); - } - - static int cGoStorageUploadFinalize(void* storageCtx, char* sessionId, void* resp) { - return storage_upload_finalize(storageCtx, sessionId, (StorageCallback) callback, resp); - } - - static int cGoStorageUploadCancel(void* storageCtx, char* sessionId, void* resp) { - return storage_upload_cancel(storageCtx, sessionId, (StorageCallback) callback, resp); - } - - static int cGoStorageUploadFile(void* storageCtx, char* sessionId, void* resp) { - return storage_upload_file(storageCtx, sessionId, (StorageCallback) callback, resp); - } - - static int cGoStorageLogLevel(void* storageCtx, char* logLevel, void* resp) { - return storage_log_level(storageCtx, logLevel, (StorageCallback) callback, resp); - } - - static int cGoStorageExists(void* storageCtx, char* cid, void* resp) { - return storage_exists(storageCtx, cid, (StorageCallback) callback, resp); - } -*/ -import "C" -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io" - "log" - "os" - "os/signal" - "runtime/cgo" - "sync" - "syscall" - "unsafe" -) - -type LogFormat string - -const ( - LogFormatAuto LogFormat = "auto" - LogFormatColors LogFormat = "colors" - LogFormatNoColors LogFormat = "nocolors" - LogFormatJSON LogFormat = "json" -) - -type RepoKind string - -const ( - FS RepoKind = "fs" - SQLite RepoKind = "sqlite" - LevelDb RepoKind = "leveldb" -) - -const defaultBlockSize = 1024 * 64 - -type Config struct { - // Default: INFO - LogLevel string `json:"log-level,omitempty"` - - // Specifies what kind of logs should be written to stdout - // Default: auto - LogFormat LogFormat `json:"log-format,omitempty"` - - // Enable the metrics server - // Default: false - MetricsEnabled bool `json:"metrics,omitempty"` - - // Listening address of the metrics server - // Default: 127.0.0.1 - MetricsAddress string `json:"metrics-address,omitempty"` - - // Listening HTTP port of the metrics server - // Default: 8008 - MetricsPort int `json:"metrics-port,omitempty"` - - // The directory where logos storage will store configuration and data - // Default: - // $HOME\AppData\Roaming\Logos Storage on Windows - // $HOME/Library/Application Support/Logos Storage on macOS - // $HOME/.cache/logos_storage on Linux - DataDir string `json:"data-dir,omitempty"` - - // Multi Addresses to listen on - // Default: ["/ip4/0.0.0.0/tcp/0"] - ListenAddrs []string `json:"listen-addrs,omitempty"` - - // Specify method to use for determining public address. - // Must be one of: any, none, upnp, pmp, extip: - // Default: any - Nat string `json:"nat,omitempty"` - - // Discovery (UDP) port - // Default: 8090 - DiscoveryPort int `json:"disc-port,omitempty"` - - // Source of network (secp256k1) private key file path or name - // Default: "key" - NetPrivKeyFile string `json:"net-privkey,omitempty"` - - // Specifies one or more bootstrap nodes to use when connecting to the network. - BootstrapNodes []string `json:"bootstrap-node,omitempty"` - - // The maximum number of peers to connect to. - // Default: 160 - MaxPeers int `json:"max-peers,omitempty"` - - // Number of worker threads (\"0\" = use as many threads as there are CPU cores available) - // Default: 0 - NumThreads int `json:"num-threads,omitempty"` - - // Node agent string which is used as identifier in network - // Default: "Logos Storage" - AgentString string `json:"agent-string,omitempty"` - - // Backend for main repo store (fs, sqlite, leveldb) - // Default: fs - RepoKind RepoKind `json:"repo-kind,omitempty"` - - // The size of the total storage quota dedicated to the node - // Default: 20 GiBs - StorageQuota int `json:"storage-quota,omitempty"` - - // Default block timeout in seconds - 0 disables the ttl - // Default: 30 days - BlockTtl int `json:"block-ttl,omitempty"` - - // Time interval in seconds - determines frequency of block - // maintenance cycle: how often blocks are checked for expiration and cleanup - // Default: 10 minutes - BlockMaintenanceInterval int `json:"block-mi,omitempty"` - - // Number of blocks to check every maintenance cycle - // Default: 1000 - BlockMaintenanceNumberOfBlocks int `json:"block-mn,omitempty"` - - // Number of times to retry fetching a block before giving up - // Default: 3000 - BlockRetries int `json:"block-retries,omitempty"` - - // The size of the block cache, 0 disables the cache - - // might help on slow hardrives - // Default: 0 - CacheSize int `json:"cache-size,omitempty"` - - // Default: "" (no log file) - LogFile string `json:"log-file,omitempty"` -} - -type StorageNode struct { - ctx unsafe.Pointer -} - -type ChunkSize int - -func (c ChunkSize) valOrDefault() int { - if c == 0 { - return defaultBlockSize - } - - return int(c) -} - -func (c ChunkSize) toSizeT() C.size_t { - return C.size_t(c.valOrDefault()) -} - -// bridgeCtx is used for managing the C-Go bridge calls. -// It contains a wait group for synchronizing the calls, -// a cgo.Handle for passing context to the C code, -// a response pointer for receiving data from the C code, -// and fields for storing the result and error of the call. -type bridgeCtx struct { - wg *sync.WaitGroup - h cgo.Handle - resp unsafe.Pointer - result string - err error - - // Callback used for receiving progress updates during upload/download. - // - // For the upload, the bytes parameter indicates the number of bytes uploaded. - // If the chunk size is superior or equal to the blocksize (passed in init function), - // the callback will be called when a block is put in the store. - // Otherwise, it will be called when a chunk is pushed into the stream. - // - // For the download, the bytes is the size of the chunk received, and the chunk - // is the actual chunk of data received. - onProgress func(bytes int, chunk []byte) -} - -// newBridgeCtx creates a new bridge context for managing C-Go calls. -// The bridge context is initialized with a wait group and a cgo.Handle. -func newBridgeCtx() *bridgeCtx { - bridge := &bridgeCtx{} - bridge.wg = &sync.WaitGroup{} - bridge.wg.Add(1) - bridge.h = cgo.NewHandle(bridge) - bridge.resp = C.allocResp(C.uintptr_t(uintptr(bridge.h))) - return bridge -} - -// callError creates an error message for a failed C-Go call. -func (b *bridgeCtx) callError(name string) error { - return fmt.Errorf("failed the call to %s returned code %d", name, C.getRet(b.resp)) -} - -// free releases the resources associated with the bridge context, -// including the cgo.Handle and the response pointer. -func (b *bridgeCtx) free() { - if b.h > 0 { - b.h.Delete() - b.h = 0 - } - - if b.resp != nil { - C.freeResp(b.resp) - b.resp = nil - } -} - -// callback is the function called by the C code to communicate back to Go. -// It handles progress updates, successful completions, and errors. -// The function uses the response pointer to retrieve the bridge context -// and update its state accordingly. -// -//export callback -func callback(ret C.int, msg *C.char, len C.size_t, resp unsafe.Pointer) { - if resp == nil { - return - } - - m := (*C.Resp)(resp) - m.ret = ret - m.msg = msg - m.len = len - - if m.h == 0 { - return - } - - h := cgo.Handle(m.h) - if h == 0 { - return - } - - if v, ok := h.Value().(*bridgeCtx); ok { - switch ret { - case C.RET_PROGRESS: - if v.onProgress == nil { - return - } - if msg != nil { - chunk := C.GoBytes(unsafe.Pointer(msg), C.int(len)) - v.onProgress(int(C.int(len)), chunk) - } else { - v.onProgress(int(C.int(len)), nil) - } - case C.RET_OK: - retMsg := C.GoStringN(msg, C.int(len)) - v.result = retMsg - v.err = nil - if v.wg != nil { - v.wg.Done() - } - case C.RET_ERR: - retMsg := C.GoStringN(msg, C.int(len)) - v.err = errors.New(retMsg) - if v.wg != nil { - v.wg.Done() - } - } - } -} - -// wait waits for the bridge context to complete its operation. -// It returns the result and error of the operation. -func (b *bridgeCtx) wait() (string, error) { - b.wg.Wait() - return b.result, b.err -} - -type OnUploadProgressFunc func(read, total int, percent float64, err error) - -type UploadOptions struct { - // Filepath can be the full path when using UploadFile - // otherwise the file name. - // It is used to detect the mimetype. - Filepath string - - // ChunkSize is the size of each upload chunk, passed as `blockSize` to the Logos Storage node - // store. Default is to 64 KB. - ChunkSize ChunkSize - - // OnProgress is a callback function that is called after each chunk is uploaded with: - // - read: the number of bytes read in the last chunk. - // - total: the total number of bytes read so far. - // - percent: the percentage of the total file size that has been uploaded. It is - // determined from a `stat` call if it is a file and from the length of the buffer - // if it is a buffer. Otherwise, it is 0. - // - err: an error, if one occurred. - // - // If the chunk size is more than the `chunkSize` parameter, the callback is called - // after the block is actually stored in the block store. Otherwise, it is called - // after the chunk is sent to the stream. - OnProgress OnUploadProgressFunc -} - -func getReaderSize(r io.Reader) int64 { - switch v := r.(type) { - case *os.File: - stat, err := v.Stat() - if err != nil { - return 0 - } - return stat.Size() - case *bytes.Buffer: - return int64(v.Len()) - default: - return 0 - } -} - -// New creates a new Logos Storage node with the provided configuration. -// The node is not started automatically; you need to call StorageStart -// to start it. -// It returns a Logos Storage node that can be used to interact -// with the Logos Storage network. -func New(config Config) (*StorageNode, error) { - bridge := newBridgeCtx() - defer bridge.free() - - jsonConfig, err := json.Marshal(config) - if err != nil { - return nil, err - } - - cJsonConfig := C.CString(string(jsonConfig)) - defer C.free(unsafe.Pointer(cJsonConfig)) - - ctx := C.cGoStorageNew(cJsonConfig, bridge.resp) - - if _, err := bridge.wait(); err != nil { - return nil, bridge.err - } - - return &StorageNode{ctx: ctx}, bridge.err -} - -// Start starts the Logos Storage node. -func (node StorageNode) Start() error { - bridge := newBridgeCtx() - defer bridge.free() - - if C.cGoStorageStart(node.ctx, bridge.resp) != C.RET_OK { - return bridge.callError("cGoStorageStart") - } - - _, err := bridge.wait() - return err -} - -// StartAsync is the asynchronous version of Start. -func (node StorageNode) StartAsync(onDone func(error)) { - go func() { - err := node.Start() - onDone(err) - }() -} - -// Stop stops the Logos Storage node. -func (node StorageNode) Stop() error { - bridge := newBridgeCtx() - defer bridge.free() - - if C.cGoStorageStop(node.ctx, bridge.resp) != C.RET_OK { - return bridge.callError("cGoStorageStop") - } - - _, err := bridge.wait() - return err -} - -// Destroy destroys the Logos Storage node, freeing all resources. -// The node must be stopped before calling this method. -func (node StorageNode) Destroy() error { - bridge := newBridgeCtx() - defer bridge.free() - - if C.cGoStorageClose(node.ctx, bridge.resp) != C.RET_OK { - return bridge.callError("cGoStorageClose") - } - - _, err := bridge.wait() - if err != nil { - return err - } - - if C.cGoStorageDestroy(node.ctx, bridge.resp) != C.RET_OK { - return errors.New("Failed to destroy the Logos Storage node.") - } - - return err -} - -// Version returns the version of the Logos Storage node. -func (node StorageNode) Version() (string, error) { - bridge := newBridgeCtx() - defer bridge.free() - - if C.cGoStorageVersion(node.ctx, bridge.resp) != C.RET_OK { - return "", bridge.callError("cGoStorageVersion") - } - - return bridge.wait() -} - -func (node StorageNode) Revision() (string, error) { - bridge := newBridgeCtx() - defer bridge.free() - - if C.cGoStorageRevision(node.ctx, bridge.resp) != C.RET_OK { - return "", bridge.callError("cGoStorageRevision") - } - - return bridge.wait() -} - -// Repo returns the path of the data dir folder. -func (node StorageNode) Repo() (string, error) { - bridge := newBridgeCtx() - defer bridge.free() - - if C.cGoStorageRepo(node.ctx, bridge.resp) != C.RET_OK { - return "", bridge.callError("cGoStorageRepo") - } - - return bridge.wait() -} - -func (node StorageNode) Spr() (string, error) { - bridge := newBridgeCtx() - defer bridge.free() - - if C.cGoStorageSpr(node.ctx, bridge.resp) != C.RET_OK { - return "", bridge.callError("cGoStorageSpr") - } - - return bridge.wait() -} - -func (node StorageNode) PeerId() (string, error) { - bridge := newBridgeCtx() - defer bridge.free() - - if C.cGoStoragePeerId(node.ctx, bridge.resp) != C.RET_OK { - return "", bridge.callError("cGoStoragePeerId") - } - - return bridge.wait() -} - -// UploadInit initializes a new upload session. -// It returns a session ID that can be used for subsequent upload operations. -// This function is called by UploadReader and UploadFile internally. -// You should use this function only if you need to manage the upload session manually. -func (node StorageNode) UploadInit(options *UploadOptions) (string, error) { - bridge := newBridgeCtx() - defer bridge.free() - - var cFilename = C.CString(options.Filepath) - defer C.free(unsafe.Pointer(cFilename)) - - if C.cGoStorageUploadInit(node.ctx, cFilename, options.ChunkSize.toSizeT(), bridge.resp) != C.RET_OK { - return "", bridge.callError("cGoStorageUploadInit") - } - - return bridge.wait() -} - -// UploadChunk uploads a chunk of data to the Logos Storage node. -// It takes the session ID returned by UploadInit -// and a byte slice containing the chunk data. -// This function is called by UploadReader internally. -// You should use this function only if you need to manage the upload session manually. -func (node StorageNode) UploadChunk(sessionId string, chunk []byte) error { - bridge := newBridgeCtx() - defer bridge.free() - - var cSessionId = C.CString(sessionId) - defer C.free(unsafe.Pointer(cSessionId)) - - var cChunkPtr *C.uint8_t - if len(chunk) > 0 { - cChunkPtr = (*C.uint8_t)(unsafe.Pointer(&chunk[0])) - } - - if C.cGoStorageUploadChunk(node.ctx, cSessionId, cChunkPtr, C.size_t(len(chunk)), bridge.resp) != C.RET_OK { - return bridge.callError("cGoStorageUploadChunk") - } - - _, err := bridge.wait() - return err -} - -// UploadFinalize finalizes the upload session and returns the CID of the uploaded file. -// It takes the session ID returned by UploadInit. -// This function is called by UploadReader and UploadFile internally. -// You should use this function only if you need to manage the upload session manually. -func (node StorageNode) UploadFinalize(sessionId string) (string, error) { - bridge := newBridgeCtx() - defer bridge.free() - - var cSessionId = C.CString(sessionId) - defer C.free(unsafe.Pointer(cSessionId)) - - if C.cGoStorageUploadFinalize(node.ctx, cSessionId, bridge.resp) != C.RET_OK { - return "", bridge.callError("cGoStorageUploadFinalize") - } - - return bridge.wait() -} - -// UploadCancel cancels an ongoing upload session. -// It can be only if the upload session is managed manually. -// It doesn't work with UploadFile. -func (node StorageNode) UploadCancel(sessionId string) error { - bridge := newBridgeCtx() - defer bridge.free() - - var cSessionId = C.CString(sessionId) - defer C.free(unsafe.Pointer(cSessionId)) - - if C.cGoStorageUploadCancel(node.ctx, cSessionId, bridge.resp) != C.RET_OK { - return bridge.callError("cGoStorageUploadCancel") - } - - _, err := bridge.wait() - return err -} - -// UploadReader uploads data from an io.Reader to the Logos Storage node. -// It takes the upload options and the reader as parameters. -// It returns the CID of the uploaded file or an error. -// -// Internally, it calls: -// - UploadInit to create the upload session. -// - UploadChunk to upload a chunk to Logos Storage. -// - UploadFinalize to finalize the upload session. -// - UploadCancel if an error occurs. -func (node StorageNode) UploadReader(options UploadOptions, r io.Reader) (string, error) { - sessionId, err := node.UploadInit(&options) - if err != nil { - return "", err - } - - buf := make([]byte, options.ChunkSize.valOrDefault()) - total := 0 - - var size int64 - if options.OnProgress != nil { - size = getReaderSize(r) - } - - for { - n, err := r.Read(buf) - if err == io.EOF { - break - } - - if err != nil { - if cancelErr := node.UploadCancel(sessionId); cancelErr != nil { - return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) - } - - return "", err - } - - if n == 0 { - break - } - - if err := node.UploadChunk(sessionId, buf[:n]); err != nil { - if cancelErr := node.UploadCancel(sessionId); cancelErr != nil { - return "", fmt.Errorf("failed to upload chunk %v and failed to cancel upload session %v", err, cancelErr) - } - - return "", err - } - - total += n - if options.OnProgress != nil && size > 0 { - percent := float64(total) / float64(size) * 100.0 - // The last block could be a bit over the size due to padding - // on the chunk size. - if percent > 100.0 { - percent = 100.0 - } - options.OnProgress(n, total, percent, nil) - } else if options.OnProgress != nil { - options.OnProgress(n, total, 0, nil) - } - } - - return node.UploadFinalize(sessionId) -} - -// UploadReaderAsync is the asynchronous version of UploadReader using a goroutine. -func (node StorageNode) UploadReaderAsync(options UploadOptions, r io.Reader, onDone func(cid string, err error)) { - go func() { - cid, err := node.UploadReader(options, r) - onDone(cid, err) - }() -} - -// UploadFile uploads a file to the Logos Storage node. -// It takes the upload options as parameter. -// It returns the CID of the uploaded file or an error. -// -// The options parameter contains the following fields: -// - filepath: the full path of the file to upload. -// - chunkSize: the size of each upload chunk, passed as `blockSize` to the Logos Storage node -// store. Default is to 64 KB. -// - onProgress: a callback function that is called after each chunk is uploaded with: -// - read: the number of bytes read in the last chunk. -// - total: the total number of bytes read so far. -// - percent: the percentage of the total file size that has been uploaded. It is -// determined from a `stat` call. -// - err: an error, if one occurred. -// -// If the chunk size is more than the `chunkSize` parameter, the callback is called after -// the block is actually stored in the block store. Otherwise, it is called after the chunk -// is sent to the stream. -// -// Internally, it calls UploadInit to create the upload session. -func (node StorageNode) UploadFile(options UploadOptions) (string, error) { - bridge := newBridgeCtx() - defer bridge.free() - - if options.OnProgress != nil { - stat, err := os.Stat(options.Filepath) - if err != nil { - return "", err - } - - size := stat.Size() - total := 0 - - if size > 0 { - bridge.onProgress = func(read int, _ []byte) { - if read == 0 { - return - } - - total += read - percent := float64(total) / float64(size) * 100.0 - // The last block could be a bit over the size due to padding - // on the chunk size. - if percent > 100.0 { - percent = 100.0 - } - - options.OnProgress(read, int(size), percent, nil) - } - } - } - - sessionId, err := node.UploadInit(&options) - if err != nil { - return "", err - } - - var cSessionId = C.CString(sessionId) - defer C.free(unsafe.Pointer(cSessionId)) - - if C.cGoStorageUploadFile(node.ctx, cSessionId, bridge.resp) != C.RET_OK { - return "", bridge.callError("cGoStorageUploadFile") - } - - return bridge.wait() -} - -// UploadFileAsync is the asynchronous version of UploadFile using a goroutine. -func (node StorageNode) UploadFileAsync(options UploadOptions, onDone func(cid string, err error)) { - go func() { - cid, err := node.UploadFile(options) - onDone(cid, err) - }() -} - -func (node StorageNode) UpdateLogLevel(logLevel string) error { - bridge := newBridgeCtx() - defer bridge.free() - - var cLogLevel = C.CString(string(logLevel)) - defer C.free(unsafe.Pointer(cLogLevel)) - - if C.cGoStorageLogLevel(node.ctx, cLogLevel, bridge.resp) != C.RET_OK { - return bridge.callError("cGoStorageLogLevel") - } - - _, err := bridge.wait() - return err -} - -func (node StorageNode) Exists(cid string) (bool, error) { - bridge := newBridgeCtx() - defer bridge.free() - - var cCid = C.CString(cid) - defer C.free(unsafe.Pointer(cCid)) - - if C.cGoStorageExists(node.ctx, cCid, bridge.resp) != C.RET_OK { - return false, bridge.callError("cGoStorageUploadCancel") - } - - result, err := bridge.wait() - return result == "true", err -} - -func main() { - dataDir := os.TempDir() + "/data-dir" - - node, err := New(Config{ - BlockRetries: 5, - LogLevel: "WARN", - DataDir: dataDir, - }) - if err != nil { - log.Fatalf("Failed to create Logos Storage node: %v", err) - } - defer os.RemoveAll(dataDir) - - if err := node.Start(); err != nil { - log.Fatalf("Failed to start Logos Storage node: %v", err) - } - log.Println("Logos Storage node started") - - version, err := node.Version() - if err != nil { - log.Fatalf("Failed to get Logos Storage version: %v", err) - } - log.Printf("Logos Storage version: %s", version) - - err = node.UpdateLogLevel("ERROR") - if err != nil { - log.Fatalf("Failed to update log level: %v", err) - } - - cid := "zDvZRwzmAkhzDRPH5EW242gJBNZ2T7aoH2v1fVH66FxXL4kSbvyM" - exists, err := node.Exists(cid) - if err != nil { - log.Fatalf("Failed to check data existence: %v", err) - } - - if exists { - log.Fatalf("The data should not exist") - } - - buf := bytes.NewBuffer([]byte("Hello World!")) - len := buf.Len() - cid, err = node.UploadReader(UploadOptions{Filepath: "hello.txt"}, buf) - if err != nil { - log.Fatalf("Failed to upload data: %v", err) - } - log.Printf("Uploaded data with CID: %s (size: %d bytes)", cid, len) - - exists, err = node.Exists(cid) - if err != nil { - log.Fatalf("Failed to check data existence: %v", err) - } - - if !exists { - log.Fatalf("The data should exist") - } - - // Wait for a SIGINT or SIGTERM signal - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - <-ch - - if err := node.Stop(); err != nil { - log.Fatalf("Failed to stop Storage node: %v", err) - } - log.Println("Logos Storage node stopped") - - if err := node.Destroy(); err != nil { - log.Fatalf("Failed to destroy Logos Storage node: %v", err) - } -} diff --git a/library/README.md b/library/README.md index 655cd9c8..73f5c551 100644 --- a/library/README.md +++ b/library/README.md @@ -34,4 +34,477 @@ sequenceDiagram Ctx-->>C: forward callback C-->>Go: forward callback Go-->>App: done -``` \ No newline at end of file +``` + +## C API + +C-exported interface for the Logos Storage shared library. + +This API provides a C-compatible interface to the internal Nim implementation of Logos Storage. + +Unless explicitly stated otherwise, all functions are asynchronous and execute their work on a separate thread, returning results via the provided callback. The `int` return value is the synchronous status of dispatch: +- `RET_OK`: job dispatched to the worker thread +- `RET_ERR`: immediate failure +- `RET_MISSING_CALLBACK`: callback is missing + +Some functions may emit progress updates via the callback using `RET_PROGRESS`, and finally complete with `RET_OK` or `RET_ERR`. + +The `msg` parameter can carry different kinds of data depending on the return code: + +- If ret is `RET_ERR`, msg contains an error message. +- If ret is `RET_OK`, msg contains the response data as a string. +- If ret is `RET_PROGRESS`, msg contains a chunk of data. + +The `len` parameter specifies the length of the data pointed to by msg. + +--- + +## Types + +### `StorageCallback` + +```c +typedef void (*StorageCallback)(int callerRet, const char *msg, size_t len, void *userData); +``` + +--- + +## Return codes + +```c +#define RET_OK 0 +#define RET_ERR 1 +#define RET_MISSING_CALLBACK 2 +#define RET_PROGRESS 3 +``` + +--- + +## Context lifecycle + +### `storage_new` + +Create a new instance of a Logos Storage node. + +```c +void *storage_new( + const char *configJson, + StorageCallback callback, + void *userData +); +``` + +- `configJson`: JSON string with configuration overwriting defaults +- Returns an opaque context pointer used for subsequent calls + +Typical usage: +- `storage_new(...)` +- `storage_start(...)` +- `storage_stop(...)` +- `storage_destroy(...)` + +--- + +### `storage_start` + +Start the Logos Storage node (can be started/stopped multiple times). + +```c +int storage_start(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_stop` + +Stop the Logos Storage node (can be started/stopped multiple times). + +```c +int storage_stop(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_close` + +Close the node and release resources before destruction. + +```c +int storage_close(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_destroy` + +Destroy the node instance and free associated resources. Node must be stopped and closed. + +```c +int storage_destroy(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +## Version + +### `storage_version` + +Get the Logos Storage version string. +Does not require the node to be started and does not involve a thread call. + +```c +int storage_version(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_revision` + +Get the Logos Storage contracts revision. +Does not require the node to be started and does not involve a thread call. + +```c +int storage_revision(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_repo` + +Get the repo (data-dir) used by the node. + +```c +int storage_repo(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +## Debug + +### `storage_debug` + +Retrieve debug information (JSON). + +```c +int storage_debug(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_spr` + +Get the node's Signed Peer Record (SPR). + +```c +int storage_spr(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_peer_id` + +Get the node's peer ID (libp2p Peer Identity). + +```c +int storage_peer_id(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_peer_debug` + +Request debug information for a given peer ID. +Only available if compiled with `storage_enable_api_debug_peers`. + +```c +int storage_peer_debug(void *ctx, const char *peerId, StorageCallback callback, void *userData); +``` + +--- + +## Logging + +### `storage_log_level` + +Set the log level at run time. +`logLevel` can be: `TRACE`, `DEBUG`, `INFO`, `NOTICE`, `WARN`, `ERROR`, `FATAL`. + +```c +int storage_log_level( + void *ctx, + const char *logLevel, + StorageCallback callback, + void *userData +); +``` + +--- + +## Networking + +### `storage_connect` + +Connect to a peer by using `peerAddresses` if provided, otherwise use `peerId`. + +Note that the `peerId` has to be advertised in the DHT for this to work. + +```c +int storage_connect( + void *ctx, + const char *peerId, + const char **peerAddresses, + size_t peerAddressesSize, + StorageCallback callback, + void *userData +); +``` + +--- + +## Upload + +### `storage_upload_init` + +Initialize an upload session for a file. + +- `filepath`: absolute path for file upload; for chunk uploads it's the file name. The metadata filename and mime type are derived from this value. +- `chunkSize`: chunk size for upload (default: `1024 * 64` bytes) +- Callback returns the `sessionId` + +```c +int storage_upload_init( + void *ctx, + const char *filepath, + size_t chunkSize, + StorageCallback callback, + void *userData +); +``` + +--- + +### `storage_upload_chunk` + +Upload a chunk for the given `sessionId`. + +```c +int storage_upload_chunk( + void *ctx, + const char *sessionId, + const uint8_t *chunk, + size_t len, + StorageCallback callback, + void *userData +); +``` + +--- + +### `storage_upload_finalize` + +Finalize an upload session identified by `sessionId`. +Callback returns the `cid` of the uploaded content. + +```c +int storage_upload_finalize( + void *ctx, + const char *sessionId, + StorageCallback callback, + void *userData +); +``` + +--- + +### `storage_upload_cancel` + +Cancel an ongoing upload session. + +```c +int storage_upload_cancel( + void *ctx, + const char *sessionId, + StorageCallback callback, + void *userData +); +``` + +--- + +### `storage_upload_file` + +Upload the file defined as `filepath` in the init method. + +- Callback may be called with `RET_PROGRESS` during upload (depending on chunk size constraints) +- Callback returns the `cid` of the uploaded content + +```c +int storage_upload_file( + void *ctx, + const char *sessionId, + StorageCallback callback, + void *userData +); +``` + +--- + +## Download API + +### `storage_download_init` + +Initialize a download for `cid`. + +- `chunkSize`: chunk size for download (default: `1024 * 64` bytes) +- `local`: attempt local store retrieval only + +```c +int storage_download_init( + void *ctx, + const char *cid, + size_t chunkSize, + bool local, + StorageCallback callback, + void *userData +); +``` + +--- + +### `storage_download_stream` + +Perform a streaming download for `cid`. Init must have been called prior. + +- If `filepath` is provided, content is written to that file. +- Callback may be called with `RET_PROGRESS` updates during download. +- `local` indicates whether to attempt local store retrieval only. + +```c +int storage_download_stream( + void *ctx, + const char *cid, + size_t chunkSize, + bool local, + const char *filepath, + StorageCallback callback, + void *userData +); +``` + +--- + +### `storage_download_chunk` + +Download a chunk for the given `cid`. Init must have been called prior. +Chunk returned via callback using `RET_PROGRESS`. + +```c +int storage_download_chunk( + void *ctx, + const char *cid, + StorageCallback callback, + void *userData +); +``` + +--- + +### `storage_download_cancel` + +Cancel an ongoing download for `cid`. + +```c +int storage_download_cancel( + void *ctx, + const char *cid, + StorageCallback callback, + void *userData +); +``` + +--- + +### `storage_download_manifest` + +Retrieve the manifest for the given `cid` (JSON). + +```c +int storage_download_manifest( + void *ctx, + const char *cid, + StorageCallback callback, + void *userData +); +``` + +--- + +## Storage operations + +### `storage_list` + +Retrieve the list of manifests stored in the node. + +```c +int storage_list(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_space` + +Retrieve storage space information (JSON). + +```c +int storage_space(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +### `storage_delete` + +Delete the content identified by `cid`. + +```c +int storage_delete(void *ctx, const char *cid, StorageCallback callback, void *userData); +``` + +--- + +### `storage_fetch` + +Fetch content identified by `cid` from the network into local store +in background. The callback will not receive progress updates. + +```c +int storage_fetch(void *ctx, const char *cid, StorageCallback callback, void *userData); +``` + +--- + +### `storage_exists` + +Check if content identified by `cid` exists in local store. + +```c +int storage_exists(void *ctx, const char *cid, StorageCallback callback, void *userData); +``` + + +### `storage_set_event_callback` + +Not used currently. Reserved for future use to set an event callback. + +```c +void storage_set_event_callback(void *ctx, StorageCallback callback, void *userData); +``` + +--- + +## Go wrapper + +A Go wrapper is available [here](https://github.com/logos-storage/logos-storage-go-bindings). + +## Rust Wrapper + +A Rust wrapper is available [here](https://github.com/nipsysdev/codex-rust-bindings). \ No newline at end of file diff --git a/library/libstorage.h b/library/libstorage.h index 76d5e2e1..458cd313 100644 --- a/library/libstorage.h +++ b/library/libstorage.h @@ -1,203 +1,397 @@ /** -* libstorage.h - C Interface for Example Library -* -* This header provides the public API for libstorage -* -* To see the auto-generated header by Nim, run `make libstorage` from the -* repository root. The generated file will be created at: -* nimcache/release/libstorage/libstorage.h -*/ + * libstorage.h - C-exported interface for the Storage shared library + * + * This file implements the public C API for libstorage. It acts as the bridge + * between C programs and the internal Nim implementation. + * + * Unless it is explicitly stated otherwise, all functions are asynchronous and execute + * their work on a separate thread, returning results via the provided callback. The + * result code of the function represents the synchronous status of the call itself: + * returning RET_OK if the job has been dispatched to the thread, and RET_ERR in case + * of immediate failure. + * + * The callback function is invoked with the result of the operation, including + * any data or error messages. If the call was successful, `callerRet` will be RET_OK, + * and `msg` will contain the result data. If there was an error, `callerRet` will be RET_ERR, + * and `msg` will contain the error message. + * + * When a function supports progress updates, it may invoke the callback multiple times: + * first with RET_PROGRESS and progress information, and finally with RET_OK or RET_ERR + * upon completion. The msg parameter will a chunk of data for upload and download operations. + * + * `userData` is a pointer provided by the caller that is passed back to the callback + * for context. + */ #ifndef __libstorage__ #define __libstorage__ #include #include +#include // The possible returned values for the functions that return int -#define RET_OK 0 -#define RET_ERR 1 -#define RET_MISSING_CALLBACK 2 -#define RET_PROGRESS 3 +#define RET_OK 0 +#define RET_ERR 1 +#define RET_MISSING_CALLBACK 2 + +// RET_PROGRESS is used to indicate that the callback is being +// with progress updates. +#define RET_PROGRESS 3 #ifdef __cplusplus -extern "C" { +extern "C" +{ #endif -typedef void (*StorageCallback) (int callerRet, const char* msg, size_t len, void* userData); + typedef void (*StorageCallback)(int callerRet, const char *msg, size_t len, void *userData); -void* storage_new( - const char* configJson, - StorageCallback callback, - void* userData); + // Create a new instance of a Logos Storage node. + // `configJson` is a JSON string with the configuration overwriting defaults. + // Returns a pointer to the StorageContext used to interact with the node. + // + // Typical usage: + // ctx = storage_new(configJson, myCallback, myUserData); + // storage_start(ctx, ...); + // ... + // storage_stop(ctx, ...); + // storage_destroy(ctx, ...); + void *storage_new( + const char *configJson, + StorageCallback callback, + void *userData); -int storage_version( - void* ctx, - StorageCallback callback, - void* userData); + // Get the Logos Storage version string. + // This call does not require the node to be started and + // does not involve a thread call. + int storage_version( + void *ctx, + StorageCallback callback, + void *userData); -int storage_revision( - void* ctx, - StorageCallback callback, - void* userData); + // Get the Logos Storage contracts revision. + // This call does not require the node to be started and + // does not involve a thread call. + int storage_revision( + void *ctx, + StorageCallback callback, + void *userData); -int storage_repo( - void* ctx, - StorageCallback callback, - void* userData); + // Get the repo (data-dir) used by the node. + int storage_repo( + void *ctx, + StorageCallback callback, + void *userData); -int storage_debug( - void* ctx, - StorageCallback callback, - void* userData); + // Retrieve debug information (JSON). + // + // Here is an example of the returned JSON structure: + // { + // "id": "...", + // "addrs": ["..."], + // "spr": "", + // "announceAddresses": ["..."], + // "table": { + // "localNode": "", + // "nodes": [ + // { + // "nodeId": "...", + // "peerId": "...", + // "record": "...", + // "address": "...", + // "seen": true, + // } + // ] + // } + int storage_debug( + void *ctx, + StorageCallback callback, + void *userData); -int storage_spr( - void* ctx, - StorageCallback callback, - void* userData); + /// Get the node's (Signed Peer Record) + int storage_spr( + void *ctx, + StorageCallback callback, + void *userData); -int storage_peer_id( - void* ctx, - StorageCallback callback, - void* userData); + // Get the node's peer ID. + // Peer Identity reference as specified at + // https://docs.libp2p.io/concepts/fundamentals/peers/ + int storage_peer_id( + void *ctx, + StorageCallback callback, + void *userData); -int storage_log_level( - void* ctx, - const char* logLevel, - StorageCallback callback, - void* userData); + // Set the log level at run time. + // `logLevel` can be one of: + // TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL + int storage_log_level( + void *ctx, + const char *logLevel, + StorageCallback callback, + void *userData); -int storage_connect( - void* ctx, - const char* peerId, - const char** peerAddresses, - size_t peerAddressesSize, - StorageCallback callback, - void* userData); + // Connect to a peer by using `peerAddresses` if provided, otherwise use `peerId`. + // Note that the `peerId` has to be advertised in the DHT for this to work. + int storage_connect( + void *ctx, + const char *peerId, + const char **peerAddresses, + size_t peerAddressesSize, + StorageCallback callback, + void *userData); -int storage_peer_debug( - void* ctx, - const char* peerId, - StorageCallback callback, - void* userData); + // Request debug information for a given peer ID. + // This api is only available if the library was compiled with + // `storage_enable_api_debug_peers` argument. + // + // Here is an example of the returned JSON structure: + // { + // "peerId": "...", + // "seqNo": 0, + // "addresses": [], + // } + int storage_peer_debug( + void *ctx, + const char *peerId, + StorageCallback callback, + void *userData); + // Initialize an upload session for a file. + // `filepath` for a file upload, this is the absolute path to the file + // to be uploaded. For an upload using chunks, this is the name of the file. + // The metadata filename and mime type are derived from this value. + // + // `chunkSize` defines the size of each chunk to be used during upload. + // The default value is the default block size 1024 * 64 bytes. + // + // The callback returns the `sessionId` for the download session created. + // + // Typical usage: + // storage_upload_init(ctx, filepath, chunkSize, myCallback, myUserData); + // ... + // storage_upload_chunk(ctx, sessionId, chunk, len, myCallback, myUserData); + // ... + // storage_upload_finalize(ctx, sessionId, myCallback, myUserData); + int storage_upload_init( + void *ctx, + const char *filepath, + size_t chunkSize, + StorageCallback callback, + void *userData); -int storage_upload_init( - void* ctx, - const char* filepath, - size_t chunkSize, - StorageCallback callback, - void* userData); + // Upload a chunk for the given `sessionId`. + int storage_upload_chunk( + void *ctx, + const char *sessionId, + const uint8_t *chunk, + size_t len, + StorageCallback callback, + void *userData); -int storage_upload_chunk( - void* ctx, - const char* sessionId, - const uint8_t* chunk, - size_t len, - StorageCallback callback, - void* userData); + // Finalize an upload session identified by `sessionId`. + // The callback returns the `cid` of the uploaded content. + int storage_upload_finalize( + void *ctx, + const char *sessionId, + StorageCallback callback, + void *userData); -int storage_upload_finalize( - void* ctx, - const char* sessionId, - StorageCallback callback, - void* userData); + // Cancel an ongoing upload session. + int storage_upload_cancel( + void *ctx, + const char *sessionId, + StorageCallback callback, + void *userData); -int storage_upload_cancel( - void* ctx, - const char* sessionId, - StorageCallback callback, - void* userData); + // Upload the file defined as `filepath` in the init method. + // The callback will be called with RET_PROGRESS updates during the upload, + // if the chunk size is equal or greater than the session chunkSize. + // + // The callback returns the `cid` of the uploaded content. + // + // Typical usage: + // storage_upload_init(ctx, filepath, chunkSize, myCallback, myUserData); + // ... + // storage_upload_file(ctx, sessionId, myCallback, myUserData); + int storage_upload_file( + void *ctx, + const char *sessionId, + StorageCallback callback, + void *userData); -int storage_upload_file( - void* ctx, - const char* sessionId, - StorageCallback callback, - void* userData); + // Initialize a download for `cid`. + // `chunkSize` defines the size of each chunk to be used during download. + // The default value is the default block size 1024 * 64 bytes. + // `local` indicates whether to attempt local store retrieval only. + // + // Typical usage: + // storage_download_init(ctx, cid, chunkSize, local, myCallback, myUserData); + // ... + // storage_download_stream(ctx, cid, filepath, myCallback, myUserData); + int storage_download_init( + void *ctx, + const char *cid, + size_t chunkSize, + bool local, + StorageCallback callback, + void *userData); -int storage_download_stream( - void* ctx, - const char* cid, - size_t chunkSize, - bool local, - const char* filepath, - StorageCallback callback, - void* userData); + // Perform a streaming download for `cid`. + // The init method must have been called prior to this. + // If filepath is provided, the content will be written to that file. + // The callback will be called with RET_PROGRESS updates during the download/ + // `local` indicates whether to attempt local store retrieval only. + // + // Typical usage: + // storage_download_init(ctx, cid, chunkSize, local, myCallback, myUserData); + // ... + // storage_download_stream(ctx, cid, filepath, myCallback, myUserData); + int storage_download_stream( + void *ctx, + const char *cid, + size_t chunkSize, + bool local, + const char *filepath, + StorageCallback callback, + void *userData); -int storage_download_init( - void* ctx, - const char* cid, - size_t chunkSize, - bool local, - StorageCallback callback, - void* userData); + // Download a chunk for the given `cid`. + // The init method must have been called prior to this. + // The chunk will be returned via the callback using `RET_PROGRESS`. + int storage_download_chunk( + void *ctx, + const char *cid, + StorageCallback callback, + void *userData); -int storage_download_chunk( - void* ctx, - const char* cid, - StorageCallback callback, - void* userData); + // Cancel an ongoing download for `cid`. + int storage_download_cancel( + void *ctx, + const char *cid, + StorageCallback callback, + void *userData); -int storage_download_cancel( - void* ctx, - const char* cid, - StorageCallback callback, - void* userData); + // Retrieve the manifest for the given `cid`. + // + // Here is an example of the returned manifest JSON structure: + // { + // "treeCid": "zDzSvJTf8JYwvysKPmG7BtzpbiAHfuwFMRphxm4hdvnMJ4XPJjKX", + // "datasetSize": 123456, + // "blockSize": 65536, + // "filename": "example.txt", + // "mimetype": "text/plain", + // "protected": false + // } + int storage_download_manifest( + void *ctx, + const char *cid, + StorageCallback callback, + void *userData); -int storage_download_manifest( - void* ctx, - const char* cid, - StorageCallback callback, - void* userData); + // Retrieve the list of the manifests stored in the node. + int storage_list( + void *ctx, + StorageCallback callback, + void *userData); -int storage_list( - void* ctx, - StorageCallback callback, - void* userData); + // Retrieve the storage space information. + // + // Here is an example of the returned JSON structure: + // { + // "totalBlocks": 100000, + // "quotaMaxBytes": 0, + // "quotaUsedBytes": 0, + // "quotaReservedBytes": 0 + // } + int storage_space( + void *ctx, + StorageCallback callback, + void *userData); -int storage_space( - void* ctx, - StorageCallback callback, - void* userData); + // Delete the content identified by `cid`. + int storage_delete( + void *ctx, + const char *cid, + StorageCallback callback, + void *userData); -int storage_delete( - void* ctx, - const char* cid, - StorageCallback callback, - void* userData); + // Fetch the content identified by `cid` from the network into + // local store. + // The download is done in background so the callback + // will not receive progress updates. + int storage_fetch( + void *ctx, + const char *cid, + StorageCallback callback, + void *userData); -int storage_fetch( - void* ctx, - const char* cid, - StorageCallback callback, - void* userData); + // Check if the content identified by `cid` exists in local store. + int storage_exists( + void *ctx, + const char *cid, + StorageCallback callback, + void *userData); -int storage_exists( - void* ctx, - const char* cid, - StorageCallback callback, - void* userData); + // Start the Logos Storage node. + // The node can be started and stopped multiple times. + // + // Typical usage: + // ctx = storage_new(configJson, myCallback, myUserData); + // storage_start(ctx, ...); + // ... + // storage_stop(ctx, ...); + // storage_destroy(ctx, ...); + int storage_start(void *ctx, + StorageCallback callback, + void *userData); -int storage_start(void* ctx, - StorageCallback callback, - void* userData); + // Stop the Logos Storage node. + // The node can be started and stopped multiple times. + // + // Typical usage: + // ctx = storage_new(configJson, myCallback, myUserData); + // storage_start(ctx, ...); + // ... + // storage_stop(ctx, ...); + // storage_destroy(ctx, ...); + int storage_stop(void *ctx, + StorageCallback callback, + void *userData); -int storage_stop(void* ctx, - StorageCallback callback, - void* userData); + // Close the Logos Storage node. + // Use this to release resources before destroying the node. + // + // Typical usage: + // ctx = storage_new(configJson, myCallback, myUserData); + // storage_start(ctx, ...); + // ... + // storage_stop(ctx, ...); + // storage_close(ctx, ...); + int storage_close(void *ctx, + StorageCallback callback, + void *userData); -int storage_close(void* ctx, - StorageCallback callback, - void* userData); + // Destroys an instance of a Logos Storage node. + // This will free all resources associated with the node. + // The node must be stopped and closed before calling this function. + // + // Typical usage: + // ctx = storage_new(configJson, myCallback, myUserData); + // storage_start(ctx, ...); + // ... + // storage_stop(ctx, ...); + // storage_close(ctx, ...); + // storage_destroy(ctx, ...); + int storage_destroy(void *ctx, + StorageCallback callback, + void *userData); -// Destroys an instance of a Logos Storage node created with storage_new -int storage_destroy(void* ctx, - StorageCallback callback, - void* userData); - -void storage_set_event_callback(void* ctx, - StorageCallback callback, - void* userData); + // Not used currently. + // Reserved for future use to set an event callback. + void storage_set_event_callback(void *ctx, + StorageCallback callback, + void *userData); #ifdef __cplusplus } From e894fb03fac6784ae48780cc998419aab3e5886b Mon Sep 17 00:00:00 2001 From: Giuliano Mega Date: Fri, 16 Jan 2026 13:49:04 -0300 Subject: [PATCH 4/4] feat: primitives for sampling with and without replacement (#1125) --- codex/rng.nim | 36 ++++++++++++++++++++++++++++++++++-- tests/codex/testrng.nim | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 tests/codex/testrng.nim diff --git a/codex/rng.nim b/codex/rng.nim index d36da9d7..3b46f963 100644 --- a/codex/rng.nim +++ b/codex/rng.nim @@ -7,8 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -{.push raises: [], gcsafe.} - +import std/sugar import pkg/libp2p/crypto/crypto import pkg/bearssl/rand @@ -36,9 +35,42 @@ proc rand*(rng: Rng, max: Natural): int = if x < randMax - (randMax mod (uint64(max) + 1'u64)): # against modulo bias return int(x mod (uint64(max) + 1'u64)) +proc sampleNoReplacement[T](a: seq[T], n: int): seq[T] {.raises: [RngSampleError].} = + if n > a.len: + raise newException( + RngSampleError, + "Cannot sample " & $n & " elements from a set of size " & $a.len & + " without replacement.", + ) + + if n == a.len: + return a + + var x = a + collect: + for i in 0 ..< n: + swap(x[i], x[i + rng.rand(x.len - i - 1)]) + x[i] + +proc sampleWithReplacement[T](a: seq[T], n: int): seq[T] = + collect: + for i in 0 ..< n: + a[rng.rand(a.high)] + proc sample*[T](rng: Rng, a: openArray[T]): T = result = a[rng.rand(a.high)] +proc sample*[T]( + rng: Rng, a: seq[T], n: int, replace: bool = false +): seq[T] {.raises: [RngSampleError].} = + ## Sample `n` elements from a set `a` with or without replacement. + ## In case of sampling without replacement, `n` must not be greater than the + ## size of `a`. + if replace: + sampleWithReplacement(a, n) + else: + sampleNoReplacement(a, n) + proc sample*[T]( rng: Rng, sample, exclude: openArray[T] ): T {.raises: [Defect, RngSampleError].} = diff --git a/tests/codex/testrng.nim b/tests/codex/testrng.nim new file mode 100644 index 00000000..f97a253b --- /dev/null +++ b/tests/codex/testrng.nim @@ -0,0 +1,37 @@ +import std/unittest +import std/sequtils +import std/sets + +import ../../codex/rng + +suite "Random Number Generator (RNG)": + let rng = Rng.instance() + + test "should sample with replacement": + let elements = toSeq(1 .. 10) + + let sample = rng.sample(elements, n = 15, replace = true) + check sample.len == 15 + for element in sample: + check element in elements + + test "should sample without replacement": + let elements = toSeq(1 .. 10) + + # If we were not drawing without replacement, there'd be a 1/2 chance + # that we'd draw the same element twice in a sample of size 5. + # Running this 40 times gives enough assurance. + var seen: array[10, bool] + for i in 1 .. 40: + let sample = rng.sample(elements, n = 5, replace = false) + + check sample.len == 5 + check sample.toHashSet.len == 5 + + for element in sample: + seen[element - 1] = true + + # There's a 1/2 chance we'll see an element for each draw we do. + # After 40 draws, we are reasonably sure we've seen every element. + for seen in seen: + check seen