diff --git a/.github/actions/nimbus-build-system/action.yml b/.github/actions/nimbus-build-system/action.yml index c7bdb627..219966db 100644 --- a/.github/actions/nimbus-build-system/action.yml +++ b/.github/actions/nimbus-build-system/action.yml @@ -97,6 +97,33 @@ runs: # Set GCC-14 as the default sudo update-alternatives --set gcc /usr/bin/gcc-14 + - name: Install ccache on Linux/Mac + if: inputs.os == 'linux' || inputs.os == 'macos' + uses: hendrikmuhs/ccache-action@v1.2 + with: + create-symlink: true + key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }} + evict-old-files: 7d + + - name: Install ccache on Windows + if: inputs.os == 'windows' + uses: hendrikmuhs/ccache-action@v1.2 + with: + key: ${{ matrix.os }}-${{ matrix.builder }}-${{ matrix.cpu }}-${{ matrix.tests }}-${{ matrix.nim_version }} + evict-old-files: 7d + + - name: Enable ccache on Windows + if: inputs.os == 'windows' + shell: ${{ inputs.shell }} {0} + run: | + CCACHE_DIR=$(dirname $(which ccache))/ccached + mkdir ${CCACHE_DIR} + ln -s $(which ccache) ${CCACHE_DIR}/gcc.exe + ln -s $(which ccache) ${CCACHE_DIR}/g++.exe + ln -s $(which ccache) ${CCACHE_DIR}/cc.exe + ln -s $(which ccache) ${CCACHE_DIR}/c++.exe + echo "export PATH=${CCACHE_DIR}:\$PATH" >> $HOME/.bash_profile # prefix path in MSYS2 + - name: Derive environment variables shell: ${{ inputs.shell }} {0} run: | @@ -154,8 +181,11 @@ runs: llvm_bin_dir="${llvm_dir}/bin" llvm_lib_dir="${llvm_dir}/lib" echo "${llvm_bin_dir}" >> ${GITHUB_PATH} + # Make sure ccache has precedence (GITHUB_PATH is appending before) + echo "$(brew --prefix)/opt/ccache/libexec" >> ${GITHUB_PATH} + echo $PATH echo "LDFLAGS=${LDFLAGS} -L${libomp_lib_dir} -L${llvm_lib_dir} -Wl,-rpath,${llvm_lib_dir}" >> ${GITHUB_ENV} - NIMFLAGS="${NIMFLAGS} $(quote "-d:LeopardCmakeFlags='-DCMAKE_BUILD_TYPE=Release -DCMAKE_C_COMPILER=${llvm_bin_dir}/clang -DCMAKE_CXX_COMPILER=${llvm_bin_dir}/clang++' -d:LeopardExtraCompilerlags='-fopenmp' -d:LeopardExtraLinkerFlags='-fopenmp -L${libomp_lib_dir}'")" + NIMFLAGS="${NIMFLAGS} $(quote "-d:LeopardCmakeFlags='-DCMAKE_BUILD_TYPE=Release' -d:LeopardExtraCompilerFlags='-fopenmp' -d:LeopardExtraLinkerFlags='-fopenmp -L${libomp_lib_dir}'")" echo "NIMFLAGS=${NIMFLAGS}" >> $GITHUB_ENV fi @@ -191,6 +221,7 @@ runs: - name: Build Nim and Codex dependencies shell: ${{ inputs.shell }} {0} run: | + which gcc gcc --version make -j${ncpu} CI_CACHE=NimBinaries ${ARCH_OVERRIDE} QUICK_AND_DIRTY_COMPILER=1 update echo diff --git a/.github/workflows/docker-reusable.yml b/.github/workflows/docker-reusable.yml index f0e46d95..7d937f78 100644 --- a/.github/workflows/docker-reusable.yml +++ b/.github/workflows/docker-reusable.yml @@ -94,11 +94,11 @@ jobs: - target: os: linux arch: amd64 - builder: ubuntu-22.04 + builder: ubuntu-24.04 - target: os: linux arch: arm64 - builder: ubuntu-22.04-arm + builder: ubuntu-24.04-arm name: Build ${{ matrix.target.os }}/${{ matrix.target.arch }} runs-on: ${{ matrix.builder }} diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 50b14d05..4232ff0f 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -2,17 +2,17 @@ name: OpenAPI on: push: - branches: - - 'master' + tags: + - "v*.*.*" paths: - - 'openapi.yaml' - - '.github/workflows/docs.yml' + - "openapi.yaml" + - ".github/workflows/docs.yml" pull_request: branches: - - '**' + - "**" paths: - - 'openapi.yaml' - - '.github/workflows/docs.yml' + - "openapi.yaml" + - ".github/workflows/docs.yml" # Sets permissions of the GITHUB_TOKEN to allow deployment to GitHub Pages permissions: @@ -40,7 +40,7 @@ jobs: deploy: name: Deploy runs-on: ubuntu-latest - if: github.ref == 'refs/heads/master' + if: startsWith(github.ref, 'refs/tags/') steps: - name: Checkout uses: actions/checkout@v4 diff --git a/build.nims b/build.nims index 5e190f6b..baf21e03 100644 --- a/build.nims +++ b/build.nims @@ -2,25 +2,8 @@ mode = ScriptMode.Verbose import std/os except commandLineParams -const VendorPath = "vendor/nim-nat-traversal/vendor/libnatpmp-upstream" -let - oldVersionFile = joinPath(VendorPath, "VERSION") - newVersionFile = joinPath(VendorPath, "VERSION_temp") - -proc renameFile(oldName, newName: string) = - if fileExists(oldName): - mvFile(oldName, newName) - else: - echo "File ", oldName, " does not exist" - - ### Helper functions proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = - # This is a quick workaround to avoid VERSION file conflict on macOS - # More details here: https://github.com/codex-storage/nim-codex/issues/1059 - if defined(macosx): - renameFile(oldVersionFile, newVersionFile) - if not dirExists "build": mkDir "build" @@ -30,25 +13,25 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") = for param in commandLineParams(): extra_params &= " " & param else: - for i in 2.. 0: diff --git a/codex/codex.nim b/codex/codex.nim index 13985254..dc577373 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -11,8 +11,10 @@ import std/sequtils import std/strutils import std/os import std/tables +import std/cpuinfo import pkg/chronos +import pkg/taskpools import pkg/presto import pkg/libp2p import pkg/confutils @@ -107,7 +109,9 @@ proc bootstrapInteractions(s: CodexServer): Future[void] {.async.} = quit QuitFailure let marketplace = Marketplace.new(marketplaceAddress, signer) - let market = OnChainMarket.new(marketplace, config.rewardRecipient) + let market = OnChainMarket.new( + marketplace, config.rewardRecipient, config.marketplaceRequestCacheSize + ) let clock = OnChainClock.new(provider) var client: ?ClientInteractions @@ -194,7 +198,18 @@ proc new*( .withTcpTransport({ServerFlags.ReuseAddr}) .build() - var cache: CacheStore = nil + var + cache: CacheStore = nil + taskpool: Taskpool + + try: + if config.numThreads == ThreadCount(0): + taskpool = Taskpool.new(numThreads = min(countProcessors(), 16)) + else: + taskpool = Taskpool.new(numThreads = int(config.numThreads)) + info "Threadpool started", numThreads = taskpool.numThreads + except CatchableError as exc: + raiseAssert("Failure in taskpool initialization:" & exc.msg) if config.cacheSize > 0'nb: cache = CacheStore.new(cacheSize = config.cacheSize) @@ -286,6 +301,7 @@ proc new*( engine = engine, discovery = discovery, prover = prover, + taskPool = taskpool, ) restServer = RestServerRef diff --git a/codex/conf.nim b/codex/conf.nim index 6d47f8f4..2a859efb 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -44,6 +44,7 @@ import ./utils import ./nat import ./utils/natutils +from ./contracts/config import DefaultRequestCacheSize from ./validationconfig import MaxSlots, ValidationGroups export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig @@ -51,7 +52,11 @@ export ValidationGroups, MaxSlots export DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, - DefaultNumberOfBlocksToMaintainPerInterval + DefaultNumberOfBlocksToMaintainPerInterval, DefaultRequestCacheSize + +type ThreadCount* = distinct Natural + +proc `==`*(a, b: ThreadCount): bool {.borrow.} proc defaultDataDir*(): string = let dataDir = @@ -71,6 +76,7 @@ const DefaultDataDir* = defaultDataDir() DefaultCircuitDir* = defaultDataDir() / "circuits" + DefaultThreadCount* = ThreadCount(0) type StartUpCmd* {.pure.} = enum @@ -184,6 +190,13 @@ type name: "max-peers" .}: int + numThreads* {. + desc: + "Number of worker threads (\"0\" = use as many threads as there are CPU cores available)", + defaultValue: DefaultThreadCount, + name: "num-threads" + .}: ThreadCount + agentString* {. defaultValue: "Codex", desc: "Node agent string which is used as identifier in network", @@ -347,6 +360,16 @@ type name: "reward-recipient" .}: Option[EthAddress] + marketplaceRequestCacheSize* {. + desc: + "Maximum number of StorageRequests kept in memory." & + "Reduces fetching of StorageRequest data from the contract.", + defaultValue: DefaultRequestCacheSize, + defaultValueDesc: $DefaultRequestCacheSize, + name: "request-cache-size", + hidden + .}: uint16 + case persistenceCmd* {.defaultValue: noCmd, command.}: PersistenceCmd of PersistenceCmd.prover: circuitDir* {. @@ -482,6 +505,13 @@ proc parseCmdArg*( quit QuitFailure ma +proc parseCmdArg*(T: type ThreadCount, input: string): T {.upraises: [ValueError].} = + let count = parseInt(input) + if count != 0 and count < 2: + warn "Invalid number of threads", input = input + quit QuitFailure + ThreadCount(count) + proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T = var res: SignedPeerRecord try: @@ -579,6 +609,15 @@ proc readValue*( quit QuitFailure val = NBytes(value) +proc readValue*( + r: var TomlReader, val: var ThreadCount +) {.upraises: [SerializationError, IOError].} = + var str = r.readValue(string) + try: + val = parseCmdArg(ThreadCount, str) + except CatchableError as err: + raise newException(SerializationError, err.msg) + proc readValue*( r: var TomlReader, val: var Duration ) {.upraises: [SerializationError, IOError].} = @@ -609,6 +648,9 @@ proc completeCmdArg*(T: type NBytes, val: string): seq[string] = proc completeCmdArg*(T: type Duration, val: string): seq[string] = discard +proc completeCmdArg*(T: type ThreadCount, val: string): seq[string] = + discard + # silly chronicles, colors is a compile-time property proc stripAnsi*(v: string): string = var diff --git a/codex/contracts/config.nim b/codex/contracts/config.nim index 87cd1f2a..5493c643 100644 --- a/codex/contracts/config.nim +++ b/codex/contracts/config.nim @@ -4,6 +4,8 @@ import pkg/questionable/results export contractabi +const DefaultRequestCacheSize* = 128.uint16 + type MarketplaceConfig* = object collateral*: CollateralConfig diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 35557050..9157b269 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -2,6 +2,7 @@ import std/strutils import pkg/ethers import pkg/upraises import pkg/questionable +import pkg/lrucache import ../utils/exceptions import ../logutils import ../market @@ -20,6 +21,7 @@ type signer: Signer rewardRecipient: ?Address configuration: ?MarketplaceConfig + requestCache: LruCache[string, StorageRequest] MarketSubscription = market.Subscription EventSubscription = ethers.Subscription @@ -27,12 +29,22 @@ type eventSubscription: EventSubscription func new*( - _: type OnChainMarket, contract: Marketplace, rewardRecipient = Address.none + _: type OnChainMarket, + contract: Marketplace, + rewardRecipient = Address.none, + requestCacheSize: uint16 = DefaultRequestCacheSize, ): OnChainMarket = without signer =? contract.signer: raiseAssert("Marketplace contract should have a signer") - OnChainMarket(contract: contract, signer: signer, rewardRecipient: rewardRecipient) + var requestCache = newLruCache[string, StorageRequest](int(requestCacheSize)) + + OnChainMarket( + contract: contract, + signer: signer, + rewardRecipient: rewardRecipient, + requestCache: requestCache, + ) proc raiseMarketError(message: string) {.raises: [MarketError].} = raise newException(MarketError, message) @@ -112,9 +124,16 @@ method requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} method getRequest*( market: OnChainMarket, id: RequestId ): Future[?StorageRequest] {.async.} = + let key = $id + + if market.requestCache.contains(key): + return some market.requestCache[key] + convertEthersError: try: - return some await market.contract.getRequest(id) + let request = await market.contract.getRequest(id) + market.requestCache[key] = request + return some request except Marketplace_UnknownRequest: return none StorageRequest diff --git a/codex/contracts/provider.nim b/codex/contracts/provider.nim index b7fc5602..b1576bb0 100644 --- a/codex/contracts/provider.nim +++ b/codex/contracts/provider.nim @@ -14,7 +14,7 @@ proc raiseProviderError(message: string) {.raises: [ProviderError].} = proc blockNumberAndTimestamp*( provider: Provider, blockTag: BlockTag -): Future[(UInt256, UInt256)] {.async: (raises: [ProviderError]).} = +): Future[(UInt256, UInt256)] {.async: (raises: [ProviderError, CancelledError]).} = without latestBlock =? await provider.getBlock(blockTag): raiseProviderError("Could not get latest block") @@ -25,7 +25,7 @@ proc blockNumberAndTimestamp*( proc binarySearchFindClosestBlock( provider: Provider, epochTime: int, low: UInt256, high: UInt256 -): Future[UInt256] {.async: (raises: [ProviderError]).} = +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = let (_, lowTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(low)) let (_, highTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.init(high)) if abs(lowTimestamp.truncate(int) - epochTime) < @@ -39,7 +39,7 @@ proc binarySearchBlockNumberForEpoch( epochTime: UInt256, latestBlockNumber: UInt256, earliestBlockNumber: UInt256, -): Future[UInt256] {.async: (raises: [ProviderError]).} = +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = var low = earliestBlockNumber var high = latestBlockNumber @@ -65,7 +65,7 @@ proc binarySearchBlockNumberForEpoch( proc blockNumberForEpoch*( provider: Provider, epochTime: SecondsSince1970 -): Future[UInt256] {.async: (raises: [ProviderError]).} = +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = let epochTimeUInt256 = epochTime.u256 let (latestBlockNumber, latestBlockTimestamp) = await provider.blockNumberAndTimestamp(BlockTag.latest) @@ -118,6 +118,6 @@ proc blockNumberForEpoch*( proc pastBlockTag*( provider: Provider, blocksAgo: int -): Future[BlockTag] {.async: (raises: [ProviderError]).} = +): Future[BlockTag] {.async: (raises: [ProviderError, CancelledError]).} = let head = await provider.getBlockNumber() return BlockTag.init(head - blocksAgo.abs.u256) diff --git a/codex/contracts/requests.nim b/codex/contracts/requests.nim index 48947602..98d8c342 100644 --- a/codex/contracts/requests.nim +++ b/codex/contracts/requests.nim @@ -6,8 +6,10 @@ import pkg/nimcrypto import pkg/ethers/fields import pkg/questionable/results import pkg/stew/byteutils +import pkg/libp2p/[cid, multicodec] import ../logutils import ../utils/json +from ../errors import mapFailure export contractabi @@ -29,7 +31,7 @@ type maxSlotLoss* {.serialize.}: uint64 StorageContent* = object - cid* {.serialize.}: string + cid* {.serialize.}: Cid merkleRoot*: array[32, byte] Slot* = object @@ -120,6 +122,9 @@ func fromTuple(_: type StorageAsk, tupl: tuple): StorageAsk = func fromTuple(_: type StorageContent, tupl: tuple): StorageContent = StorageContent(cid: tupl[0], merkleRoot: tupl[1]) +func solidityType*(_: type Cid): string = + solidityType(seq[byte]) + func solidityType*(_: type StorageContent): string = solidityType(StorageContent.fieldTypes) @@ -129,6 +134,10 @@ func solidityType*(_: type StorageAsk): string = func solidityType*(_: type StorageRequest): string = solidityType(StorageRequest.fieldTypes) +# Note: it seems to be ok to ignore the vbuffer offset for now +func encode*(encoder: var AbiEncoder, cid: Cid) = + encoder.write(cid.data.buffer) + func encode*(encoder: var AbiEncoder, content: StorageContent) = encoder.write(content.fieldValues) @@ -141,8 +150,12 @@ func encode*(encoder: var AbiEncoder, id: RequestId | SlotId | Nonce) = func encode*(encoder: var AbiEncoder, request: StorageRequest) = encoder.write(request.fieldValues) -func encode*(encoder: var AbiEncoder, request: Slot) = - encoder.write(request.fieldValues) +func encode*(encoder: var AbiEncoder, slot: Slot) = + encoder.write(slot.fieldValues) + +func decode*(decoder: var AbiDecoder, T: type Cid): ?!T = + let data = ?decoder.read(seq[byte]) + Cid.init(data).mapFailure func decode*(decoder: var AbiDecoder, T: type StorageContent): ?!T = let tupl = ?decoder.read(StorageContent.fieldTypes) diff --git a/codex/erasure/backend.nim b/codex/erasure/backend.nim index a6dd8b8c..32009829 100644 --- a/codex/erasure/backend.nim +++ b/codex/erasure/backend.nim @@ -29,14 +29,18 @@ method release*(self: ErasureBackend) {.base, gcsafe.} = raiseAssert("not implemented!") method encode*( - self: EncoderBackend, buffers, parity: var openArray[seq[byte]] + self: EncoderBackend, + buffers, parity: ptr UncheckedArray[ptr UncheckedArray[byte]], + dataLen, parityLen: int, ): Result[void, cstring] {.base, gcsafe.} = ## encode buffers using a backend ## raiseAssert("not implemented!") method decode*( - self: DecoderBackend, buffers, parity, recovered: var openArray[seq[byte]] + self: DecoderBackend, + buffers, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]], + dataLen, parityLen, recoveredLen: int, ): Result[void, cstring] {.base, gcsafe.} = ## decode buffers using a backend ## diff --git a/codex/erasure/backends/leopard.nim b/codex/erasure/backends/leopard.nim index c9f9db40..ae599f12 100644 --- a/codex/erasure/backends/leopard.nim +++ b/codex/erasure/backends/leopard.nim @@ -22,11 +22,13 @@ type decoder*: Option[LeoDecoder] method encode*( - self: LeoEncoderBackend, data, parity: var openArray[seq[byte]] + self: LeoEncoderBackend, + data, parity: ptr UncheckedArray[ptr UncheckedArray[byte]], + dataLen, parityLen: int, ): Result[void, cstring] = ## Encode data using Leopard backend - if parity.len == 0: + if parityLen == 0: return ok() var encoder = @@ -36,10 +38,12 @@ method encode*( else: self.encoder.get() - encoder.encode(data, parity) + encoder.encode(data, parity, dataLen, parityLen) method decode*( - self: LeoDecoderBackend, data, parity, recovered: var openArray[seq[byte]] + self: LeoDecoderBackend, + data, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]], + dataLen, parityLen, recoveredLen: int, ): Result[void, cstring] = ## Decode data using given Leopard backend @@ -50,7 +54,7 @@ method decode*( else: self.decoder.get() - decoder.decode(data, parity, recovered) + decoder.decode(data, parity, recovered, dataLen, parityLen, recoveredLen) method release*(self: LeoEncoderBackend) = if self.encoder.isSome: diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index aacd187a..107f85bc 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -12,12 +12,14 @@ import pkg/upraises push: {.upraises: [].} -import std/sequtils -import std/sugar +import std/[sugar, atomics, sequtils] import pkg/chronos +import pkg/chronos/threadsync +import pkg/chronicles import pkg/libp2p/[multicodec, cid, multihash] import pkg/libp2p/protobuf/minprotobuf +import pkg/taskpools import ../logutils import ../manifest @@ -28,6 +30,7 @@ import ../utils import ../utils/asynciter import ../indexingstrategy import ../errors +import ../utils/arrayutils import pkg/stew/byteutils @@ -68,6 +71,7 @@ type proc(size, blocks, parity: int): DecoderBackend {.raises: [Defect], noSideEffect.} Erasure* = ref object + taskPool: Taskpool encoderProvider*: EncoderProvider decoderProvider*: DecoderProvider store*: BlockStore @@ -87,6 +91,24 @@ type # provided. minSize*: NBytes + EncodeTask = object + success: Atomic[bool] + erasure: ptr Erasure + blocks: ptr UncheckedArray[ptr UncheckedArray[byte]] + parity: ptr UncheckedArray[ptr UncheckedArray[byte]] + blockSize, blocksLen, parityLen: int + signal: ThreadSignalPtr + + DecodeTask = object + success: Atomic[bool] + erasure: ptr Erasure + blocks: ptr UncheckedArray[ptr UncheckedArray[byte]] + parity: ptr UncheckedArray[ptr UncheckedArray[byte]] + recovered: ptr UncheckedArray[ptr UncheckedArray[byte]] + blockSize, blocksLen: int + parityLen, recoveredLen: int + signal: ThreadSignalPtr + func indexToPos(steps, idx, step: int): int {.inline.} = ## Convert an index to a position in the encoded ## dataset @@ -269,6 +291,81 @@ proc init*( strategy: strategy, ) +proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} = + # Task suitable for running in taskpools - look, no GC! + let encoder = + task[].erasure.encoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen) + defer: + encoder.release() + discard task[].signal.fireSync() + + if ( + let res = + encoder.encode(task[].blocks, task[].parity, task[].blocksLen, task[].parityLen) + res.isErr + ): + warn "Error from leopard encoder backend!", error = $res.error + + task[].success.store(false) + else: + task[].success.store(true) + +proc encodeAsync*( + self: Erasure, + blockSize, blocksLen, parityLen: int, + data: ref seq[seq[byte]], + parity: ptr UncheckedArray[ptr UncheckedArray[byte]], +): Future[?!void] {.async: (raises: [CancelledError]).} = + without threadPtr =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + threadPtr.close().expect("closing once works") + + var blockData = createDoubleArray(blocksLen, blockSize) + + for i in 0 ..< data[].len: + copyMem(blockData[i], addr data[i][0], blockSize) + + defer: + freeDoubleArray(blockData, blocksLen) + + ## Create an ecode task with block data + var task = EncodeTask( + erasure: addr self, + blockSize: blockSize, + blocksLen: blocksLen, + parityLen: parityLen, + blocks: blockData, + parity: parity, + signal: threadPtr, + ) + + let t = addr task + + doAssert self.taskPool.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + self.taskPool.spawn leopardEncodeTask(self.taskPool, t) + let threadFut = threadPtr.wait() + + try: + await threadFut.join() + except CatchableError as exc: + try: + await threadFut + except AsyncError as asyncExc: + return failure(asyncExc.msg) + finally: + if exc of CancelledError: + raise (ref CancelledError) exc + else: + return failure(exc.msg) + + if not t.success.load(): + return failure("Leopard encoding failed") + + success() + proc encodeData( self: Erasure, manifest: Manifest, params: EncodingParams ): Future[?!Manifest] {.async.} = @@ -276,7 +373,6 @@ proc encodeData( ## ## `manifest` - the manifest to encode ## - logScope: steps = params.steps rounded_blocks = params.rounded @@ -286,7 +382,6 @@ proc encodeData( var cids = seq[Cid].new() - encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM) emptyBlock = newSeq[byte](manifest.blockSize.int) cids[].setLen(params.blocksCount) @@ -296,8 +391,7 @@ proc encodeData( # TODO: Don't allocate a new seq every time, allocate once and zero out var data = seq[seq[byte]].new() # number of blocks to encode - parityData = - newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int)) + parity = createDoubleArray(params.ecM, manifest.blockSize.int) data[].setLen(params.ecK) # TODO: this is a tight blocking loop so we sleep here to allow @@ -311,15 +405,25 @@ proc encodeData( trace "Unable to prepare data", error = err.msg return failure(err) - trace "Erasure coding data", data = data[].len, parity = parityData.len + trace "Erasure coding data", data = data[].len - if (let res = encoder.encode(data[], parityData); res.isErr): - trace "Unable to encode manifest!", error = $res.error - return failure($res.error) + try: + if err =? ( + await self.encodeAsync( + manifest.blockSize.int, params.ecK, params.ecM, data, parity + ) + ).errorOption: + return failure(err) + except CancelledError as exc: + raise exc + finally: + freeDoubleArray(parity, params.ecM) var idx = params.rounded + step for j in 0 ..< params.ecM: - without blk =? bt.Block.new(parityData[j]), error: + var innerPtr: ptr UncheckedArray[byte] = parity[][j] + without blk =? bt.Block.new(innerPtr.toOpenArray(0, manifest.blockSize.int - 1)), + error: trace "Unable to create parity block", err = error.msg return failure(error) @@ -356,8 +460,6 @@ proc encodeData( except CatchableError as exc: trace "Erasure coding encoding error", exc = exc.msg return failure(exc) - finally: - encoder.release() proc encode*( self: Erasure, @@ -381,6 +483,101 @@ proc encode*( return success encodedManifest +proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} = + # Task suitable for running in taskpools - look, no GC! + let decoder = + task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen) + defer: + decoder.release() + + if ( + let res = decoder.decode( + task[].blocks, + task[].parity, + task[].recovered, + task[].blocksLen, + task[].parityLen, + task[].recoveredLen, + ) + res.isErr + ): + warn "Error from leopard decoder backend!", error = $res.error + task[].success.store(false) + else: + task[].success.store(true) + + discard task[].signal.fireSync() + +proc decodeAsync*( + self: Erasure, + blockSize, blocksLen, parityLen: int, + blocks, parity: ref seq[seq[byte]], + recovered: ptr UncheckedArray[ptr UncheckedArray[byte]], +): Future[?!void] {.async: (raises: [CancelledError]).} = + without threadPtr =? ThreadSignalPtr.new(): + return failure("Unable to create thread signal") + + defer: + threadPtr.close().expect("closing once works") + + var + blocksData = createDoubleArray(blocksLen, blockSize) + parityData = createDoubleArray(parityLen, blockSize) + + for i in 0 ..< blocks[].len: + if blocks[i].len > 0: + copyMem(blocksData[i], addr blocks[i][0], blockSize) + else: + blocksData[i] = nil + + for i in 0 ..< parity[].len: + if parity[i].len > 0: + copyMem(parityData[i], addr parity[i][0], blockSize) + else: + parityData[i] = nil + + defer: + freeDoubleArray(blocksData, blocksLen) + freeDoubleArray(parityData, parityLen) + + ## Create an decode task with block data + var task = DecodeTask( + erasure: addr self, + blockSize: blockSize, + blocksLen: blocksLen, + parityLen: parityLen, + recoveredLen: blocksLen, + blocks: blocksData, + parity: parityData, + recovered: recovered, + signal: threadPtr, + ) + + # Hold the task pointer until the signal is received + let t = addr task + doAssert self.taskPool.numThreads > 1, + "Must have at least one separate thread or signal will never be fired" + self.taskPool.spawn leopardDecodeTask(self.taskPool, t) + let threadFut = threadPtr.wait() + + try: + await threadFut.join() + except CatchableError as exc: + try: + await threadFut + except AsyncError as asyncExc: + return failure(asyncExc.msg) + finally: + if exc of CancelledError: + raise (ref CancelledError) exc + else: + return failure(exc.msg) + + if not t.success.load(): + return failure("Leopard encoding failed") + + success() + proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = ## Decode a protected manifest into it's original ## manifest @@ -388,7 +585,6 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = ## `encoded` - the encoded (protected) manifest to ## be recovered ## - logScope: steps = encoded.steps rounded_blocks = encoded.rounded @@ -411,8 +607,7 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = var data = seq[seq[byte]].new() parityData = seq[seq[byte]].new() - recovered = - newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int)) + recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int) data[].setLen(encoded.ecK) # set len to K parityData[].setLen(encoded.ecM) # set len to M @@ -430,15 +625,26 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} = continue trace "Erasure decoding data" - - if (let err = decoder.decode(data[], parityData[], recovered); err.isErr): - trace "Unable to decode data!", err = $err.error - return failure($err.error) + try: + if err =? ( + await self.decodeAsync( + encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered + ) + ).errorOption: + return failure(err) + except CancelledError as exc: + raise exc + finally: + freeDoubleArray(recovered, encoded.ecK) for i in 0 ..< encoded.ecK: let idx = i * encoded.steps + step if data[i].len <= 0 and not cids[idx].isEmpty: - without blk =? bt.Block.new(recovered[i]), error: + var innerPtr: ptr UncheckedArray[byte] = recovered[][i] + + without blk =? bt.Block.new( + innerPtr.toOpenArray(0, encoded.blockSize.int - 1) + ), error: trace "Unable to create block!", exc = error.msg return failure(error) @@ -490,10 +696,13 @@ proc new*( store: BlockStore, encoderProvider: EncoderProvider, decoderProvider: DecoderProvider, + taskPool: Taskpool, ): Erasure = ## Create a new Erasure instance for encoding and decoding manifests ## - Erasure( - store: store, encoderProvider: encoderProvider, decoderProvider: decoderProvider + store: store, + encoderProvider: encoderProvider, + decoderProvider: decoderProvider, + taskPool: taskPool, ) diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index 0c461e45..30e0c7ca 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -63,7 +63,6 @@ proc encode*(manifest: Manifest): ?!seq[byte] = # optional ErasureInfo erasure = 7; # erasure coding info # optional filename: ?string = 8; # original filename # optional mimetype: ?string = 9; # original mimetype - # optional uploadedAt: ?int64 = 10; # original uploadedAt # } # ``` # @@ -102,9 +101,6 @@ proc encode*(manifest: Manifest): ?!seq[byte] = if manifest.mimetype.isSome: header.write(9, manifest.mimetype.get()) - if manifest.uploadedAt.isSome: - header.write(10, manifest.uploadedAt.get().uint64) - pbNode.write(1, header) # set the treeCid as the data field pbNode.finish() @@ -135,7 +131,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = verifiableStrategy: uint32 filename: string mimetype: string - uploadedAt: uint64 # Decode `Header` message if pbNode.getField(1, pbHeader).isErr: @@ -169,9 +164,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = if pbHeader.getField(9, mimetype).isErr: return failure("Unable to decode `mimetype` from manifest!") - if pbHeader.getField(10, uploadedAt).isErr: - return failure("Unable to decode `uploadedAt` from manifest!") - let protected = pbErasureInfo.buffer.len > 0 var verifiable = false if protected: @@ -211,7 +203,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = var filenameOption = if filename.len == 0: string.none else: filename.some var mimetypeOption = if mimetype.len == 0: string.none else: mimetype.some - var uploadedAtOption = if uploadedAt == 0: int64.none else: uploadedAt.int64.some let self = if protected: @@ -229,7 +220,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = strategy = StrategyType(protectedStrategy), filename = filenameOption, mimetype = mimetypeOption, - uploadedAt = uploadedAtOption, ) else: Manifest.new( @@ -241,7 +231,6 @@ proc decode*(_: type Manifest, data: openArray[byte]): ?!Manifest = codec = codec.MultiCodec, filename = filenameOption, mimetype = mimetypeOption, - uploadedAt = uploadedAtOption, ) ?self.verify() diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index 6e0d1b80..cbb0bace 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -38,7 +38,6 @@ type Manifest* = ref object of RootObj version: CidVersion # Cid version filename {.serialize.}: ?string # The filename of the content uploaded (optional) mimetype {.serialize.}: ?string # The mimetype of the content uploaded (optional) - uploadedAt {.serialize.}: ?int64 # The UTC creation timestamp in seconds case protected {.serialize.}: bool # Protected datasets have erasure coded info of true: ecK: int # Number of blocks to encode @@ -131,8 +130,6 @@ func filename*(self: Manifest): ?string = func mimetype*(self: Manifest): ?string = self.mimetype -func uploadedAt*(self: Manifest): ?int64 = - self.uploadedAt ############################################################ # Operations on block list ############################################################ @@ -165,14 +162,11 @@ func verify*(self: Manifest): ?!void = return success() -func cid*(self: Manifest): ?!Cid {.deprecated: "use treeCid instead".} = - self.treeCid.success - func `==`*(a, b: Manifest): bool = (a.treeCid == b.treeCid) and (a.datasetSize == b.datasetSize) and (a.blockSize == b.blockSize) and (a.version == b.version) and (a.hcodec == b.hcodec) and (a.codec == b.codec) and (a.protected == b.protected) and (a.filename == b.filename) and - (a.mimetype == b.mimetype) and (a.uploadedAt == b.uploadedAt) and ( + (a.mimetype == b.mimetype) and ( if a.protected: (a.ecK == b.ecK) and (a.ecM == b.ecM) and (a.originalTreeCid == b.originalTreeCid) and (a.originalDatasetSize == b.originalDatasetSize) and @@ -202,9 +196,6 @@ func `$`*(self: Manifest): string = if self.mimetype.isSome: result &= ", mimetype: " & $self.mimetype - if self.uploadedAt.isSome: - result &= ", uploadedAt: " & $self.uploadedAt - result &= ( if self.protected: ", ecK: " & $self.ecK & ", ecM: " & $self.ecM & ", originalTreeCid: " & @@ -236,7 +227,6 @@ func new*( protected = false, filename: ?string = string.none, mimetype: ?string = string.none, - uploadedAt: ?int64 = int64.none, ): Manifest = T( treeCid: treeCid, @@ -248,7 +238,6 @@ func new*( protected: protected, filename: filename, mimetype: mimetype, - uploadedAt: uploadedAt, ) func new*( @@ -278,7 +267,6 @@ func new*( protectedStrategy: strategy, filename: manifest.filename, mimetype: manifest.mimetype, - uploadedAt: manifest.uploadedAt, ) func new*(T: type Manifest, manifest: Manifest): Manifest = @@ -296,7 +284,6 @@ func new*(T: type Manifest, manifest: Manifest): Manifest = protected: false, filename: manifest.filename, mimetype: manifest.mimetype, - uploadedAt: manifest.uploadedAt, ) func new*( @@ -314,7 +301,6 @@ func new*( strategy = SteppedStrategy, filename: ?string = string.none, mimetype: ?string = string.none, - uploadedAt: ?int64 = int64.none, ): Manifest = Manifest( treeCid: treeCid, @@ -331,7 +317,6 @@ func new*( protectedStrategy: strategy, filename: filename, mimetype: mimetype, - uploadedAt: uploadedAt, ) func new*( @@ -374,7 +359,6 @@ func new*( verifiableStrategy: strategy, filename: manifest.filename, mimetype: manifest.mimetype, - uploadedAt: manifest.uploadedAt, ) func new*(T: type Manifest, data: openArray[byte]): ?!Manifest = diff --git a/codex/node.nim b/codex/node.nim index ee2a2b46..062ec2ce 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -15,6 +15,7 @@ import std/strformat import std/sugar import times +import pkg/taskpools import pkg/questionable import pkg/questionable/results import pkg/chronos @@ -70,6 +71,7 @@ type contracts*: Contracts clock*: Clock storage*: Contracts + taskpool: Taskpool CodexNodeRef* = ref CodexNode @@ -235,8 +237,9 @@ proc streamEntireDataset( # Retrieve, decode and save to the local store all EС groups proc erasureJob(): Future[?!void] {.async.} = # Spawn an erasure decoding job - let erasure = - Erasure.new(self.networkStore, leoEncoderProvider, leoDecoderProvider) + let erasure = Erasure.new( + self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + ) without _ =? (await erasure.decode(manifest)), error: error "Unable to erasure decode manifest", manifestCid, exc = error.msg return failure(error) @@ -267,6 +270,65 @@ proc retrieve*( await self.streamEntireDataset(manifest, cid) +proc deleteSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = + if err =? (await self.networkStore.delBlock(cid)).errorOption: + error "Error deleting block", cid, err = err.msg + return failure(err) + + trace "Deleted block", cid + return success() + +proc deleteEntireDataset(self: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = + # Deletion is a strictly local operation + var store = self.networkStore.localStore + + if not (await cid in store): + # As per the contract for delete*, an absent dataset is not an error. + return success() + + without manifestBlock =? await store.getBlock(cid), err: + return failure(err) + + without manifest =? Manifest.decode(manifestBlock), err: + return failure(err) + + let runtimeQuota = initDuration(milliseconds = 100) + var lastIdle = getTime() + for i in 0 ..< manifest.blocksCount: + if (getTime() - lastIdle) >= runtimeQuota: + await idleAsync() + lastIdle = getTime() + + if err =? (await store.delBlock(manifest.treeCid, i)).errorOption: + # The contract for delBlock is fuzzy, but we assume that if the block is + # simply missing we won't get an error. This is a best effort operation and + # can simply be retried. + error "Failed to delete block within dataset", index = i, err = err.msg + return failure(err) + + if err =? (await store.delBlock(cid)).errorOption: + error "Error deleting manifest block", err = err.msg + + success() + +proc delete*( + self: CodexNodeRef, cid: Cid +): Future[?!void] {.async: (raises: [CatchableError]).} = + ## Deletes a whole dataset, if Cid is a Manifest Cid, or a single block, if Cid a block Cid, + ## from the underlying block store. This is a strictly local operation. + ## + ## Missing blocks in dataset deletes are ignored. + ## + + without isManifest =? cid.isManifest, err: + trace "Bad content type for CID:", cid = cid, err = err.msg + return failure(err) + + if not isManifest: + return await self.deleteSingleBlock(cid) + + await self.deleteEntireDataset(cid) + proc store*( self: CodexNodeRef, stream: LPStream, @@ -332,7 +394,6 @@ proc store*( codec = dataCodec, filename = filename, mimetype = mimetype, - uploadedAt = now().utc.toTime.toUnix.some, ) without manifestBlk =? await self.storeManifest(manifest), err: @@ -403,8 +464,9 @@ proc setupRequest( return failure error # Erasure code the dataset according to provided parameters - let erasure = - Erasure.new(self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider) + let erasure = Erasure.new( + self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool + ) without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: trace "Unable to erasure code dataset" @@ -439,10 +501,7 @@ proc setupRequest( collateralPerByte: collateralPerByte, maxSlotLoss: tolerance, ), - content: StorageContent( - cid: $manifestBlk.cid, # TODO: why string? - merkleRoot: verifyRoot, - ), + content: StorageContent(cid: manifestBlk.cid, merkleRoot: verifyRoot), expiry: expiry, ) @@ -499,16 +558,14 @@ proc onStore( ## store data in local storage ## + let cid = request.content.cid + logScope: - cid = request.content.cid + cid = $cid slotIdx = slotIdx trace "Received a request to store a slot" - without cid =? Cid.init(request.content.cid).mapFailure, err: - trace "Unable to parse Cid", cid - return failure(err) - without manifest =? (await self.fetchManifest(cid)), err: trace "Unable to fetch manifest for cid", cid, err = err.msg return failure(err) @@ -578,7 +635,7 @@ proc onProve( ## let - cidStr = slot.request.content.cid + cidStr = $slot.request.content.cid slotIdx = slot.slotIndex.truncate(Natural) logScope: @@ -627,14 +684,9 @@ proc onProve( failure "Prover not enabled" proc onExpiryUpdate( - self: CodexNodeRef, rootCid: string, expiry: SecondsSince1970 + self: CodexNodeRef, rootCid: Cid, expiry: SecondsSince1970 ): Future[?!void] {.async.} = - without cid =? Cid.init(rootCid): - trace "Unable to parse Cid", cid - let error = newException(CodexError, "Unable to parse Cid") - return failure(error) - - return await self.updateExpiry(cid, expiry) + return await self.updateExpiry(rootCid, expiry) proc onClear(self: CodexNodeRef, request: StorageRequest, slotIndex: UInt256) = # TODO: remove data from local storage @@ -657,7 +709,7 @@ proc start*(self: CodexNodeRef) {.async.} = self.onStore(request, slot, onBatch) hostContracts.sales.onExpiryUpdate = proc( - rootCid: string, expiry: SecondsSince1970 + rootCid: Cid, expiry: SecondsSince1970 ): Future[?!void] = self.onExpiryUpdate(rootCid, expiry) @@ -724,12 +776,16 @@ proc stop*(self: CodexNodeRef) {.async.} = if not self.networkStore.isNil: await self.networkStore.close + if not self.taskpool.isNil: + self.taskpool.shutdown() + proc new*( T: type CodexNodeRef, switch: Switch, networkStore: NetworkStore, engine: BlockExcEngine, discovery: Discovery, + taskpool: Taskpool, prover = Prover.none, contracts = Contracts.default, ): CodexNodeRef = @@ -742,5 +798,6 @@ proc new*( engine: engine, prover: prover, discovery: discovery, + taskPool: taskpool, contracts: contracts, ) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 4d9df011..7b7dbce9 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -204,7 +204,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute trace "Handling file upload" var bodyReader = request.getBodyReader() if bodyReader.isErr(): - return RestApiResponse.error(Http500) + return RestApiResponse.error(Http500, msg = bodyReader.error()) # Attempt to handle `Expect` header # some clients (curl), wait 1000ms @@ -215,10 +215,13 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute var mimetype = request.headers.getString(ContentTypeHeader).some if mimetype.get() != "": + let mimetypeVal = mimetype.get() var m = newMimetypes() - let extension = m.getExt(mimetype.get(), "") + let extension = m.getExt(mimetypeVal, "") if extension == "": - return RestApiResponse.error(Http422, "The MIME type is not valid.") + return RestApiResponse.error( + Http422, "The MIME type '" & mimetypeVal & "' is not valid." + ) else: mimetype = string.none @@ -256,13 +259,19 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute finally: await reader.closeWait() - trace "Something went wrong error" - return RestApiResponse.error(Http500) - router.api(MethodGet, "/api/codex/v1/data") do() -> RestApiResponse: let json = await formatManifestBlocks(node) return RestApiResponse.response($json, contentType = "application/json") + router.api(MethodOptions, "/api/codex/v1/data/{cid}") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("GET,DELETE", corsOrigin) + + resp.status = Http204 + await resp.sendBody("") + router.api(MethodGet, "/api/codex/v1/data/{cid}") do( cid: Cid, resp: HttpResponseRef ) -> RestApiResponse: @@ -279,6 +288,27 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute await node.retrieveCid(cid.get(), local = true, resp = resp) + router.api(MethodDelete, "/api/codex/v1/data/{cid}") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + ## Deletes either a single block or an entire dataset + ## from the local node. Does nothing and returns 200 + ## if the dataset is not locally available. + ## + var headers = buildCorsHeaders("DELETE", allowedOrigin) + + if cid.isErr: + return RestApiResponse.error(Http400, $cid.error(), headers = headers) + + if err =? (await node.delete(cid.get())).errorOption: + return RestApiResponse.error(Http500, err.msg, headers = headers) + + if corsOrigin =? allowedOrigin: + resp.setCorsHeaders("DELETE", corsOrigin) + + resp.status = Http204 + await resp.sendBody("") + router.api(MethodPost, "/api/codex/v1/data/{cid}/network") do( cid: Cid, resp: HttpResponseRef ) -> RestApiResponse: diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index bb0b5dc9..95f06c04 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -1,6 +1,7 @@ import pkg/questionable import pkg/questionable/results import pkg/upraises +import pkg/libp2p/cid import ../market import ../clock @@ -30,7 +31,7 @@ type OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {. gcsafe, upraises: [] .} - OnExpiryUpdate* = proc(rootCid: string, expiry: SecondsSince1970): Future[?!void] {. + OnExpiryUpdate* = proc(rootCid: Cid, expiry: SecondsSince1970): Future[?!void] {. gcsafe, upraises: [] .} OnClear* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 30332f1c..74597ff1 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -189,7 +189,7 @@ proc getCellHashes*[T, H]( blkIdx = blkIdx pos = i - trace "Getting block CID for tree at index" + trace "Getting block CID for tree at index", index = blkIdx without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, err: error "Failed to get block CID for tree at index", err = err.msg diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim index dcacbd62..125741e1 100644 --- a/codex/stores/repostore/operations.nim +++ b/codex/stores/repostore/operations.nim @@ -57,6 +57,17 @@ proc putLeafMetadata*( (md.some, res), ) +proc delLeafMetadata*( + self: RepoStore, treeCid: Cid, index: Natural +): Future[?!void] {.async.} = + without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: + return failure(err) + + if err =? (await self.metaDs.delete(key)).errorOption: + return failure(err) + + success() + proc getLeafMetadata*( self: RepoStore, treeCid: Cid, index: Natural ): Future[?!LeafMetadata] {.async.} = @@ -205,9 +216,6 @@ proc storeBlock*( proc tryDeleteBlock*( self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low ): Future[?!DeleteResult] {.async.} = - if cid.isEmpty: - return success(DeleteResult(kind: InUse)) - without metaKey =? createBlockExpirationMetadataKey(cid), err: return failure(err) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index 2b14d6b7..d7305107 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -186,13 +186,13 @@ method putBlock*( return success() -method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the blockstore when block refCount is 0 or block is expired - ## - +proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.async.} = logScope: cid = cid + if cid.isEmpty: + return success(Deleted) + trace "Attempting to delete a block" without res =? await self.tryDeleteBlock(cid, self.clock.now()), err: @@ -205,12 +205,28 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: return failure(err) - elif res.kind == InUse: - trace "Block in use, refCount > 0 and not expired" - else: - trace "Block not found in store" - return success() + success(res.kind) + +method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = + ## Delete a block from the blockstore when block refCount is 0 or block is expired + ## + + logScope: + cid = cid + + without outcome =? await self.delBlockInternal(cid), err: + return failure(err) + + case outcome + of InUse: + failure("Directly deleting a block that is part of a dataset is not allowed.") + of NotFound: + trace "Block not found, ignoring" + success() + of Deleted: + trace "Block already deleted" + success() method delBlock*( self: RepoStore, treeCid: Cid, index: Natural @@ -221,12 +237,19 @@ method delBlock*( else: return failure(err) + if err =? (await self.delLeafMetadata(treeCid, index)).errorOption: + error "Failed to delete leaf metadata, block will remain on disk.", err = err.msg + return failure(err) + if err =? (await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption: if not (err of BlockNotFoundError): return failure(err) - await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0 + without _ =? await self.delBlockInternal(leafMd.blkCid), err: + return failure(err) + + success() method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore @@ -295,6 +318,18 @@ proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = let queryKey = ?createBlockExpirationMetadataQueryKey() success Query.init(queryKey, offset = offset, limit = maxNumber) +proc blockRefCount*(self: RepoStore, cid: Cid): Future[?!Natural] {.async.} = + ## Returns the reference count for a block. If the count is zero; + ## this means the block is eligible for garbage collection. + ## + without key =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + without md =? await get[BlockMetadata](self.metaDs, key), err: + return failure(err) + + return success(md.refCount) + method getBlockExpirations*( self: RepoStore, maxNumber: int, offset: int ): Future[?!AsyncIter[BlockExpiration]] {.async, base.} = diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 85b0e354..a68e2ea7 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -110,7 +110,7 @@ method readOnce*( raise newLPStreamReadError(error) trace "Reading bytes from store stream", - manifestCid = self.manifest.cid.get(), + manifestCid = self.manifest.treeCid, numBlocks = self.manifest.blocksCount, blockNum, blkCid = blk.cid, diff --git a/codex/utils/arrayutils.nim b/codex/utils/arrayutils.nim new file mode 100644 index 00000000..c398921f --- /dev/null +++ b/codex/utils/arrayutils.nim @@ -0,0 +1,25 @@ +import std/sequtils + +proc createDoubleArray*( + outerLen, innerLen: int +): ptr UncheckedArray[ptr UncheckedArray[byte]] = + # Allocate outer array + result = cast[ptr UncheckedArray[ptr UncheckedArray[byte]]](allocShared0( + sizeof(ptr UncheckedArray[byte]) * outerLen + )) + + # Allocate each inner array + for i in 0 ..< outerLen: + result[i] = cast[ptr UncheckedArray[byte]](allocShared0(sizeof(byte) * innerLen)) + +proc freeDoubleArray*( + arr: ptr UncheckedArray[ptr UncheckedArray[byte]], outerLen: int +) = + # Free each inner array + for i in 0 ..< outerLen: + if not arr[i].isNil: + deallocShared(arr[i]) + + # Free outer array + if not arr.isNil: + deallocShared(arr) diff --git a/docker/docker-entrypoint.sh b/docker/docker-entrypoint.sh index 2ec840e9..d883f0eb 100644 --- a/docker/docker-entrypoint.sh +++ b/docker/docker-entrypoint.sh @@ -9,6 +9,34 @@ if [[ -n "${ENV_PATH}" ]]; then set +a fi +# Bootstrap node from URL +if [[ -n "${BOOTSTRAP_NODE_URL}" ]]; then + BOOTSTRAP_NODE_URL="${BOOTSTRAP_NODE_URL}/api/codex/v1/spr" + WAIT=${BOOTSTRAP_NODE_URL_WAIT:-300} + SECONDS=0 + SLEEP=1 + # Run and retry if fail + while (( SECONDS < WAIT )); do + SPR=$(curl -s -f -m 5 -H 'Accept: text/plain' "${BOOTSTRAP_NODE_URL}") + # Check if exit code is 0 and returned value is not empty + if [[ $? -eq 0 && -n "${SPR}" ]]; then + export CODEX_BOOTSTRAP_NODE="${SPR}" + echo "Bootstrap node: CODEX_BOOTSTRAP_NODE=${CODEX_BOOTSTRAP_NODE}" + break + else + # Sleep and check again + echo "Can't get SPR from ${BOOTSTRAP_NODE_URL} - Retry in $SLEEP seconds / $((WAIT - SECONDS))" + sleep $SLEEP + fi + done +fi + +# Stop Codex run if unable to get SPR +if [[ -n "${BOOTSTRAP_NODE_URL}" && -z "${CODEX_BOOTSTRAP_NODE}" ]]; then + echo "Unable to get SPR from ${BOOTSTRAP_NODE_URL} in ${BOOTSTRAP_NODE_URL_WAIT} seconds - Stop Codex run" + exit 1 +fi + # Parameters if [[ -z "${CODEX_NAT}" ]]; then if [[ "${NAT_IP_AUTO}" == "true" && -z "${NAT_PUBLIC_IP_AUTO}" ]]; then diff --git a/openapi.yaml b/openapi.yaml index e35a28e8..19925225 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -379,12 +379,6 @@ components: nullable: true description: "The original mimetype of the uploaded content (optional)" example: image/png - uploadedAt: - type: integer - format: int64 - nullable: true - description: "The UTC upload timestamp in seconds" - example: 1729244192 Space: type: object diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index 88331c3f..97a455e1 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -96,9 +96,9 @@ asyncchecksuite "Block Advertising and Discovery": await engine.stop() - test "Should advertise both manifests and trees": + test "Should advertise trees": let - cids = @[manifest.cid.tryGet, manifest.treeCid] + cids = @[manifest.treeCid] advertised = initTable.collect: for cid in cids: {cid: newFuture[void]()} diff --git a/tests/codex/examples.nim b/tests/codex/examples.nim index d0facd1b..c7dba3c5 100644 --- a/tests/codex/examples.nim +++ b/tests/codex/examples.nim @@ -8,6 +8,7 @@ import pkg/codex/stores import pkg/codex/blocktype as bt import pkg/codex/sales import pkg/codex/merkletree +import pkg/codex/manifest import ../examples export examples @@ -36,8 +37,8 @@ proc example*(_: type SignedState): SignedState = proc example*(_: type Pricing): Pricing = Pricing(address: EthAddress.example, price: uint32.rand.u256) -proc example*(_: type bt.Block): bt.Block = - let length = rand(4096) +proc example*(_: type bt.Block, size: int = 4096): bt.Block = + let length = rand(size) let bytes = newSeqWith(length, rand(uint8)) bt.Block.new(bytes).tryGet() @@ -51,6 +52,15 @@ proc example*(_: type BlockExcPeerCtx): BlockExcPeerCtx = proc example*(_: type Cid): Cid = bt.Block.example.cid +proc example*(_: type Manifest): Manifest = + Manifest.new( + treeCid = Cid.example, + blockSize = 256.NBytes, + datasetSize = 4096.NBytes, + filename = "example.txt".some, + mimetype = "text/plain".some, + ) + proc example*(_: type MultiHash, mcodec = Sha256HashCodec): MultiHash = let bytes = newSeqWith(256, rand(uint8)) MultiHash.digest($mcodec, bytes).tryGet() diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 6d7415d3..898dd16e 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -85,30 +85,31 @@ proc makeWantList*( ) proc storeDataGetManifest*( - store: BlockStore, chunker: Chunker + store: BlockStore, blocks: seq[Block] ): Future[Manifest] {.async.} = - var cids = newSeq[Cid]() - - while (let chunk = await chunker.getBytes(); chunk.len > 0): - let blk = Block.new(chunk).tryGet() - cids.add(blk.cid) + for blk in blocks: (await store.putBlock(blk)).tryGet() let - tree = CodexTree.init(cids).tryGet() + (manifest, tree) = makeManifestAndTree(blocks).tryGet() treeCid = tree.rootCid.tryGet() - manifest = Manifest.new( - treeCid = treeCid, - blockSize = NBytes(chunker.chunkSize), - datasetSize = NBytes(chunker.offset), - ) for i in 0 ..< tree.leavesCount: let proof = tree.getProof(i).tryGet() - (await store.putCidAndProof(treeCid, i, cids[i], proof)).tryGet() + (await store.putCidAndProof(treeCid, i, blocks[i].cid, proof)).tryGet() return manifest +proc storeDataGetManifest*( + store: BlockStore, chunker: Chunker +): Future[Manifest] {.async.} = + var blocks = newSeq[Block]() + + while (let chunk = await chunker.getBytes(); chunk.len > 0): + blocks.add(Block.new(chunk).tryGet()) + + return await storeDataGetManifest(store, blocks) + proc makeRandomBlocks*( datasetSize: int, blockSize: NBytes ): Future[seq[Block]] {.async.} = diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim index 0d72b06b..2d1a87dc 100644 --- a/tests/codex/node/helpers.nim +++ b/tests/codex/node/helpers.nim @@ -6,6 +6,7 @@ import pkg/chronos import pkg/codex/codextypes import pkg/codex/chunker import pkg/codex/stores +import pkg/taskpools import ../../asynctest @@ -118,6 +119,7 @@ template setupAndTearDown*() {.dirty.} = engine = engine, prover = Prover.none, discovery = blockDiscovery, + taskpool = Taskpool.new(), ) teardown: diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index cce6d5bd..0930d925 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -75,10 +75,9 @@ asyncchecksuite "Test Node - Host contracts": let manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new) manifestCid = manifestBlock.cid - manifestCidStr = $(manifestCid) (await localStore.putBlock(manifestBlock)).tryGet() @@ -99,7 +98,7 @@ asyncchecksuite "Test Node - Host contracts": expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123 expiryUpdateCallback = !sales.onExpiryUpdate - (await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet() + (await expiryUpdateCallback(manifestCid, expectedExpiry)).tryGet() for index in 0 ..< manifest.blocksCount: let @@ -116,7 +115,7 @@ asyncchecksuite "Test Node - Host contracts": test "onStore callback": let onStore = !sales.onStore var request = StorageRequest.example - request.content.cid = $verifiableBlock.cid + request.content.cid = verifiableBlock.cid request.expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.u256 var fetchedBytes: uint = 0 diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index e4a9d1f4..37960232 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -12,6 +12,7 @@ import pkg/questionable/results import pkg/stint import pkg/poseidon2 import pkg/poseidon2/io +import pkg/taskpools import pkg/nitro import pkg/codexdht/discv5/protocol as discv5 @@ -37,6 +38,7 @@ import ../examples import ../helpers import ../helpers/mockmarket import ../helpers/mockclock +import ../slots/helpers import ./helpers @@ -66,7 +68,7 @@ asyncchecksuite "Test Node - Basic": # https://github.com/codex-storage/nim-codex/issues/699 let cstore = CountingStore.new(engine, localStore) - node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery) + node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery, Taskpool.new()) missingCid = Cid.init("zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get() @@ -137,7 +139,8 @@ asyncchecksuite "Test Node - Basic": test "Setup purchase request": let - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) + erasure = + Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new()) manifest = await storeDataGetManifest(localStore, chunker) manifestBlock = bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet() @@ -164,5 +167,30 @@ asyncchecksuite "Test Node - Basic": check: (await verifiableBlock.cid in localStore) == true - request.content.cid == $verifiableBlock.cid + request.content.cid == verifiableBlock.cid request.content.merkleRoot == builder.verifyRoot.get.toBytes + + test "Should delete a single block": + let randomBlock = bt.Block.new("Random block".toBytes).tryGet() + (await localStore.putBlock(randomBlock)).tryGet() + check (await randomBlock.cid in localStore) == true + + (await node.delete(randomBlock.cid)).tryGet() + check (await randomBlock.cid in localStore) == false + + test "Should delete an entire dataset": + let + blocks = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb) + manifest = await storeDataGetManifest(localStore, blocks) + manifestBlock = (await store.storeManifest(manifest)).tryGet() + manifestCid = manifestBlock.cid + + check await manifestCid in localStore + for blk in blocks: + check await blk.cid in localStore + + (await node.delete(manifestCid)).tryGet() + + check not await manifestCid in localStore + for blk in blocks: + check not (await blk.cid in localStore) diff --git a/tests/codex/sales/states/testfilled.nim b/tests/codex/sales/states/testfilled.nim index f8f77da6..74413776 100644 --- a/tests/codex/sales/states/testfilled.nim +++ b/tests/codex/sales/states/testfilled.nim @@ -36,7 +36,7 @@ checksuite "sales state 'filled'": market.requestEnds[request.id] = 321 onExpiryUpdatePassedExpiry = -1 let onExpiryUpdate = proc( - rootCid: string, expiry: SecondsSince1970 + rootCid: Cid, expiry: SecondsSince1970 ): Future[?!void] {.async.} = onExpiryUpdatePassedExpiry = expiry return success() diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 1b457f6d..54415d02 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -47,7 +47,9 @@ asyncchecksuite "Sales - start": pricePerBytePerSecond: 1.u256, collateralPerByte: 1.u256, ), - content: StorageContent(cid: "some cid"), + content: StorageContent( + cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet + ), expiry: (getTime() + initDuration(hours = 1)).toUnix.u256, ) @@ -65,7 +67,7 @@ asyncchecksuite "Sales - start": return success() sales.onExpiryUpdate = proc( - rootCid: string, expiry: SecondsSince1970 + rootCid: Cid, expiry: SecondsSince1970 ): Future[?!void] {.async.} = return success() @@ -161,7 +163,9 @@ asyncchecksuite "Sales": pricePerBytePerSecond: minPricePerBytePerSecond, collateralPerByte: 1.u256, ), - content: StorageContent(cid: "some cid"), + content: StorageContent( + cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet + ), expiry: (getTime() + initDuration(hours = 1)).toUnix.u256, ) @@ -184,7 +188,7 @@ asyncchecksuite "Sales": return success() sales.onExpiryUpdate = proc( - rootCid: string, expiry: SecondsSince1970 + rootCid: Cid, expiry: SecondsSince1970 ): Future[?!void] {.async.} = return success() diff --git a/tests/codex/slots/helpers.nim b/tests/codex/slots/helpers.nim index 03d87d12..fced1f1c 100644 --- a/tests/codex/slots/helpers.nim +++ b/tests/codex/slots/helpers.nim @@ -15,9 +15,7 @@ import pkg/codex/rng import ../helpers -proc storeManifest*( - store: BlockStore, manifest: Manifest -): Future[?!bt.Block] {.async.} = +proc makeManifestBlock*(manifest: Manifest): ?!bt.Block = without encodedVerifiable =? manifest.encode(), err: trace "Unable to encode manifest" return failure(err) @@ -26,6 +24,15 @@ proc storeManifest*( trace "Unable to create block from manifest" return failure(error) + success blk + +proc storeManifest*( + store: BlockStore, manifest: Manifest +): Future[?!bt.Block] {.async.} = + without blk =? makeManifestBlock(manifest), err: + trace "Unable to create manifest block", err = err.msg + return failure(err) + if err =? (await store.putBlock(blk)).errorOption: trace "Unable to store manifest block", cid = blk.cid, err = err.msg return failure(err) diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index dda4ed82..0279b56f 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -12,9 +12,11 @@ import pkg/datastore import pkg/codex/stores/cachestore import pkg/codex/chunker import pkg/codex/stores +import pkg/codex/stores/repostore/operations import pkg/codex/blocktype as bt import pkg/codex/clock import pkg/codex/utils/asynciter +import pkg/codex/merkletree/codex import ../../asynctest import ../helpers @@ -354,6 +356,119 @@ asyncchecksuite "RepoStore": check has.isOk check has.get + test "should set the reference count for orphan blocks to 0": + let blk = Block.example(size = 200) + (await repo.putBlock(blk)).tryGet() + check (await repo.blockRefCount(blk.cid)).tryGet() == 0.Natural + + test "should not allow non-orphan blocks to be deleted directly": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(0).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet() + + let err = (await repo.delBlock(blk.cid)).error() + check err.msg == + "Directly deleting a block that is part of a dataset is not allowed." + + test "should allow non-orphan blocks to be deleted by dataset reference": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(0).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet() + + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + check not (await blk.cid in repo) + + test "should not delete a non-orphan block until it is deleted from all parent datasets": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + blockPool = await makeRandomBlocks(datasetSize = 768, blockSize = 256'nb) + + let + dataset1 = @[blockPool[0], blockPool[1]] + dataset2 = @[blockPool[1], blockPool[2]] + + let sharedBlock = blockPool[1] + + let + (manifest1, tree1) = makeManifestAndTree(dataset1).tryGet() + treeCid1 = tree1.rootCid.tryGet() + (manifest2, tree2) = makeManifestAndTree(dataset2).tryGet() + treeCid2 = tree2.rootCid.tryGet() + + (await repo.putBlock(sharedBlock)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 0.Natural + + let + proof1 = tree1.getProof(1).tryGet() + proof2 = tree2.getProof(0).tryGet() + + (await repo.putCidAndProof(treeCid1, 1, sharedBlock.cid, proof1)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 1.Natural + + (await repo.putCidAndProof(treeCid2, 0, sharedBlock.cid, proof2)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 2.Natural + + (await repo.delBlock(treeCid1, 1.Natural)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 1.Natural + check (await sharedBlock.cid in repo) + + (await repo.delBlock(treeCid2, 0.Natural)).tryGet() + check not (await sharedBlock.cid in repo) + + test "should clear leaf metadata when block is deleted from dataset": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(1).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0.Natural, blk.cid, proof)).tryGet() + + discard (await repo.getLeafMetadata(treeCid, 0.Natural)).tryGet() + + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + + let err = (await repo.getLeafMetadata(treeCid, 0.Natural)).error() + check err of BlockNotFoundError + + test "should not fail when reinserting and deleting a previously deleted block (bug #1108)": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(1).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet() + + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + (await repo.putBlock(blk)).tryGet() + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + commonBlockStoreTests( "RepoStore Sql backend", proc(): BlockStore = diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim index 952497e9..d469b379 100644 --- a/tests/codex/testerasure.nim +++ b/tests/codex/testerasure.nim @@ -1,5 +1,6 @@ import std/sequtils import std/sugar +import std/times import pkg/chronos import pkg/questionable/results @@ -11,6 +12,8 @@ import pkg/codex/blocktype as bt import pkg/codex/rng import pkg/codex/utils import pkg/codex/indexingstrategy +import pkg/taskpools +import pkg/codex/utils/arrayutils import ../asynctest import ./helpers @@ -27,6 +30,7 @@ suite "Erasure encode/decode": var erasure: Erasure let repoTmp = TempLevelDb.new() let metaTmp = TempLevelDb.new() + var taskpool: Taskpool setup: let @@ -35,12 +39,14 @@ suite "Erasure encode/decode": rng = Rng.instance() chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize) store = RepoStore.new(repoDs, metaDs) - erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) + taskpool = Taskpool.new() + erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) manifest = await storeDataGetManifest(store, chunker) teardown: await repoTmp.destroyDb() await metaTmp.destroyDb() + taskpool.shutdown() proc encode(buffers, parity: int): Future[Manifest] {.async.} = let encoded = @@ -212,7 +218,7 @@ suite "Erasure encode/decode": let present = await store.hasBlock(manifest.treeCid, d) check present.tryGet() - test "handles edge case of 0 parity blocks": + test "Handles edge case of 0 parity blocks": const buffers = 20 parity = 0 @@ -221,6 +227,43 @@ suite "Erasure encode/decode": discard (await erasure.decode(encoded)).tryGet() + test "Should concurrently encode/decode multiple datasets": + const iterations = 2 + + let + datasetSize = 1.MiBs + ecK = 10.Natural + ecM = 10.Natural + + var encodeTasks = newSeq[Future[?!Manifest]]() + var decodeTasks = newSeq[Future[?!Manifest]]() + var manifests = newSeq[Manifest]() + for i in 0 ..< iterations: + let + # create random data and store it + blockSize = rng.sample(@[1, 2, 4, 8, 16, 32, 64].mapIt(it.KiBs)) + chunker = RandomChunker.new(rng, size = datasetSize, chunkSize = blockSize) + manifest = await storeDataGetManifest(store, chunker) + manifests.add(manifest) + # encode the data concurrently + encodeTasks.add(erasure.encode(manifest, ecK, ecM)) + # wait for all encoding tasks to finish + let encodeResults = await allFinished(encodeTasks) + # decode the data concurrently + for i in 0 ..< encodeResults.len: + decodeTasks.add(erasure.decode(encodeResults[i].read().tryGet())) + # wait for all decoding tasks to finish + let decodeResults = await allFinished(decodeTasks) # TODO: use allFutures + + for j in 0 ..< decodeTasks.len: + let + decoded = decodeResults[j].read().tryGet() + encoded = encodeResults[j].read().tryGet() + check: + decoded.treeCid == manifests[j].treeCid + decoded.treeCid == encoded.originalTreeCid + decoded.blocksCount == encoded.originalBlocksCount + test "Should handle verifiable manifests": const buffers = 20 @@ -259,3 +302,73 @@ suite "Erasure encode/decode": decoded.treeCid == manifest.treeCid decoded.treeCid == encoded.originalTreeCid decoded.blocksCount == encoded.originalBlocksCount + + test "Should complete encode/decode task when cancelled": + let + blocksLen = 10000 + parityLen = 10 + data = seq[seq[byte]].new() + chunker = RandomChunker.new( + rng, size = (blocksLen * BlockSize.int), chunkSize = BlockSize + ) + + data[].setLen(blocksLen) + + for i in 0 ..< blocksLen: + let chunk = await chunker.getBytes() + shallowCopy(data[i], @(chunk)) + + let + parity = createDoubleArray(parityLen, BlockSize.int) + paritySeq = seq[seq[byte]].new() + recovered = createDoubleArray(blocksLen, BlockSize.int) + cancelledTaskParity = createDoubleArray(parityLen, BlockSize.int) + cancelledTaskRecovered = createDoubleArray(blocksLen, BlockSize.int) + + paritySeq[].setLen(parityLen) + defer: + freeDoubleArray(parity, parityLen) + freeDoubleArray(cancelledTaskParity, parityLen) + freeDoubleArray(recovered, blocksLen) + freeDoubleArray(cancelledTaskRecovered, blocksLen) + + for i in 0 ..< parityLen: + paritySeq[i] = cast[seq[byte]](parity[i]) + + # call encodeAsync to get the parity + let encFut = + await erasure.encodeAsync(BlockSize.int, blocksLen, parityLen, data, parity) + check encFut.isOk + + let decFut = await erasure.decodeAsync( + BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered + ) + check decFut.isOk + + # call encodeAsync and cancel the task + let encodeFut = erasure.encodeAsync( + BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity + ) + encodeFut.cancel() + + try: + discard await encodeFut + except CatchableError as exc: + check exc of CancelledError + finally: + for i in 0 ..< parityLen: + check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int) + + # call decodeAsync and cancel the task + let decodeFut = erasure.decodeAsync( + BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered + ) + decodeFut.cancel() + + try: + discard await decodeFut + except CatchableError as exc: + check exc of CancelledError + finally: + for i in 0 ..< blocksLen: + check equalMem(recovered[i], cancelledTaskRecovered[i], BlockSize.int) diff --git a/tests/contracts/helpers/mockprovider.nim b/tests/contracts/helpers/mockprovider.nim index 09e65398..c5be8ad7 100644 --- a/tests/contracts/helpers/mockprovider.nim +++ b/tests/contracts/helpers/mockprovider.nim @@ -13,7 +13,7 @@ type MockProvider* = ref object of Provider method getBlock*( provider: MockProvider, tag: BlockTag -): Future[?Block] {.async: (raises: [ProviderError]).} = +): Future[?Block] {.async: (raises: [ProviderError, CancelledError]).} = try: if tag == BlockTag.latest: if latestBlock =? provider.latest: diff --git a/tests/contracts/testDeployment.nim b/tests/contracts/testDeployment.nim index a439e42a..86a5fe00 100644 --- a/tests/contracts/testDeployment.nim +++ b/tests/contracts/testDeployment.nim @@ -12,7 +12,7 @@ type MockProvider = ref object of Provider method getChainId*( provider: MockProvider -): Future[UInt256] {.async: (raises: [ProviderError]).} = +): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = return provider.chainId proc configFactory(): CodexConf = diff --git a/tests/contracts/testMarket.nim b/tests/contracts/testMarket.nim index a77c2aaa..6506a2d6 100644 --- a/tests/contracts/testMarket.nim +++ b/tests/contracts/testMarket.nim @@ -3,6 +3,8 @@ import std/importutils import pkg/chronos import pkg/ethers/erc20 import codex/contracts +import pkg/libp2p/cid +import pkg/lrucache import ../ethertest import ./examples import ./time @@ -591,3 +593,13 @@ ethersuite "On-Chain Market": let expectedPayout = request.expectedPayout(filledAt, requestEnd.u256) check endBalanceHost == (startBalanceHost + request.ask.collateralPerSlot) check endBalanceReward == (startBalanceReward + expectedPayout) + + test "the request is added in cache after the fist access": + await market.requestStorage(request) + + check market.requestCache.contains($request.id) == false + discard await market.getRequest(request.id) + + check market.requestCache.contains($request.id) == true + let cacheValue = market.requestCache[$request.id] + check cacheValue == request diff --git a/tests/examples.nim b/tests/examples.nim index bfb34cff..26013cdc 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -57,7 +57,7 @@ proc example*(_: type StorageRequest): StorageRequest = maxSlotLoss: 2, # 2 slots can be freed without data considered to be lost ), content: StorageContent( - cid: "zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob", + cid: Cid.init("zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob").tryGet, merkleRoot: array[32, byte].example, ), expiry: (60 * 60).u256, # 1 hour , @@ -86,8 +86,7 @@ proc example(_: type G2Point): G2Point = proc example*(_: type Groth16Proof): Groth16Proof = Groth16Proof(a: G1Point.example, b: G2Point.example, c: G1Point.example) -proc example*(_: type RandomChunker, blocks: int): Future[string] {.async.} = - # doAssert blocks >= 3, "must be more than 3 blocks" +proc example*(_: type RandomChunker, blocks: int): Future[seq[byte]] {.async.} = let rng = Rng.instance() let chunker = RandomChunker.new( rng, size = DefaultBlockSize * blocks.NBytes, chunkSize = DefaultBlockSize @@ -95,7 +94,7 @@ proc example*(_: type RandomChunker, blocks: int): Future[string] {.async.} = var data: seq[byte] while (let moar = await chunker.getBytes(); moar != []): data.add moar - return byteutils.toHex(data) + return data proc example*(_: type RandomChunker): Future[string] {.async.} = await RandomChunker.example(3) diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 5d642adf..316660de 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -44,6 +44,9 @@ proc upload*(client: CodexClient, contents: string): ?!Cid = assert response.status == "200 OK" Cid.init(response.body).mapFailure +proc upload*(client: CodexClient, bytes: seq[byte]): ?!Cid = + client.upload(string.fromBytes(bytes)) + proc download*(client: CodexClient, cid: Cid, local = false): ?!string = let response = client.http.get( client.baseurl & "/data/" & $cid & (if local: "" else: "/network/stream") @@ -83,6 +86,16 @@ proc downloadBytes*( success bytes +proc delete*(client: CodexClient, cid: Cid): ?!void = + let + url = client.baseurl & "/data/" & $cid + response = client.http.delete(url) + + if response.status != "204 No Content": + return failure(response.status) + + success() + proc list*(client: CodexClient): ?!RestContentList = let url = client.baseurl & "/data" let response = client.http.get(url) @@ -312,3 +325,6 @@ proc downloadRaw*(client: CodexClient, cid: string, local = false): Response = client.baseurl & "/data/" & cid & (if local: "" else: "/network/stream"), httpMethod = HttpGet, ) + +proc deleteRaw*(client: CodexClient, cid: string): Response = + return client.http.request(client.baseurl & "/data/" & cid, httpMethod = HttpDelete) diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index 4d155186..68283ad1 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -4,6 +4,7 @@ from pkg/libp2p import Cid import pkg/codex/contracts/marketplace as mp import pkg/codex/periods import pkg/codex/utils/json +from pkg/codex/utils import roundUp, divUp import ./multinodes import ../contracts/time import ../contracts/deployment @@ -45,11 +46,14 @@ template marketplacesuite*(name: string, body: untyped) = proc periods(p: int): uint64 = p.uint64 * period - proc slotSize(blocks: int): UInt256 = - (DefaultBlockSize * blocks.NBytes).Natural.u256 + proc slotSize(blocks, nodes, tolerance: int): UInt256 = + let ecK = nodes - tolerance + let blocksRounded = roundUp(blocks, ecK) + let blocksPerSlot = divUp(blocksRounded, ecK) + (DefaultBlockSize * blocksPerSlot.NBytes).Natural.u256 proc datasetSize(blocks, nodes, tolerance: int): UInt256 = - (nodes + tolerance).u256 * slotSize(blocks) + return nodes.u256 * slotSize(blocks, nodes, tolerance) proc createAvailabilities( datasetSize: UInt256, diff --git a/tests/integration/testecbug.nim b/tests/integration/testecbug.nim index e7604de7..29a3bc6f 100644 --- a/tests/integration/testecbug.nim +++ b/tests/integration/testecbug.nim @@ -50,7 +50,7 @@ marketplacesuite "Bug #821 - node crashes during erasure coding": check eventually(requestId.isSome, timeout = expiry.int * 1000) let request = await marketplace.getRequest(requestId.get) - let cidFromRequest = Cid.init(request.content.cid).get() + let cidFromRequest = request.content.cid let downloaded = await clientApi.downloadBytes(cidFromRequest, local = true) check downloaded.isOk check downloaded.get.toHex == data.toHex diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim index 9705426c..f40aba7e 100644 --- a/tests/integration/testmarketplace.nim +++ b/tests/integration/testmarketplace.nim @@ -114,7 +114,7 @@ marketplacesuite "Marketplace": await ethProvider.advanceTime(duration) # Checking that the hosting node received reward for at least the time between - let slotSize = slotSize(blocks) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize check eventually (await token.balanceOf(hostAccount)) - startBalanceHost >= (duration - 5 * 60) * pricePerSlotPerSecond * ecNodes.u256 @@ -246,7 +246,7 @@ marketplacesuite "Marketplace payouts": await advanceToNextPeriod() - let slotSize = slotSize(blocks) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize check eventually ( diff --git a/tests/integration/testproofs.nim b/tests/integration/testproofs.nim index e654fcc5..abfe5004 100644 --- a/tests/integration/testproofs.nim +++ b/tests/integration/testproofs.nim @@ -1,7 +1,6 @@ from std/times import inMilliseconds import pkg/questionable import pkg/codex/logutils -import pkg/stew/byteutils import ../contracts/time import ../contracts/deployment import ../codex/helpers @@ -60,8 +59,7 @@ marketplacesuite "Hosts submit regular proofs": let purchase = client0.getPurchase(purchaseId).get check purchase.error == none string - let request = purchase.request.get - let slotSize = request.ask.slotSize + let slotSize = slotSize(blocks, ecNodes, ecTolerance) check eventually( client0.purchaseStateIs(purchaseId, "started"), timeout = expiry.int * 1000 diff --git a/tests/integration/testpurchasing.nim b/tests/integration/testpurchasing.nim index 4e08e7a8..ebae78f6 100644 --- a/tests/integration/testpurchasing.nim +++ b/tests/integration/testpurchasing.nim @@ -47,6 +47,8 @@ twonodessuite "Purchasing": ).get let request = client1.getPurchase(id).get.request.get + + check request.content.cid.data.buffer.len > 0 check request.ask.duration == 100.u256 check request.ask.pricePerBytePerSecond == 1.u256 check request.ask.proofProbability == 3.u256 diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 1a7e7fcc..32ad7eb7 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -1,9 +1,13 @@ import std/httpclient import std/sequtils -from pkg/libp2p import `==` +import std/strformat +from pkg/libp2p import `==`, `$`, Cid import pkg/codex/units +import pkg/codex/manifest import ./twonodes import ../examples +import ../codex/examples +import ../codex/slots/helpers import json twonodessuite "REST API": @@ -35,7 +39,7 @@ twonodessuite "REST API": check: space.totalBlocks == 2 space.quotaMaxBytes == 8589934592.NBytes - space.quotaUsedBytes == 65598.NBytes + space.quotaUsedBytes == 65592.NBytes space.quotaReservedBytes == 12.NBytes test "node lists local files", twoNodesConfig: @@ -145,18 +149,19 @@ twonodessuite "REST API": check responseBefore.body == "Invalid parameters: `tolerance` cannot be greater than `nodes`" - test "request storage succeeds if nodes and tolerance within range", twoNodesConfig: - let data = await RandomChunker.example(blocks = 2) - let cid = client1.upload(data).get - let duration = 100.u256 - let pricePerBytePerSecond = 1.u256 - let proofProbability = 3.u256 - let expiry = 30.uint - let collateralPerByte = 1.u256 - let ecParams = @[(3, 1), (5, 2)] - - for ecParam in ecParams: - let (nodes, tolerance) = ecParam + for ecParams in @[ + (minBlocks: 2, nodes: 3, tolerance: 1), (minBlocks: 3, nodes: 5, tolerance: 2) + ]: + let (minBlocks, nodes, tolerance) = ecParams + test "request storage succeeds if nodes and tolerance within range " & + fmt"({minBlocks=}, {nodes=}, {tolerance=})", twoNodesConfig: + let data = await RandomChunker.example(blocks = minBlocks) + let cid = client1.upload(data).get + let duration = 100.u256 + let pricePerBytePerSecond = 1.u256 + let proofProbability = 3.u256 + let expiry = 30.uint + let collateralPerByte = 1.u256 var responseBefore = client1.requestStorageRaw( cid, duration, pricePerBytePerSecond, proofProbability, collateralPerByte, @@ -201,7 +206,7 @@ twonodessuite "REST API": let response = client1.uploadRaw("some file contents", headers) check response.status == "422 Unprocessable Entity" - check response.body == "The MIME type is not valid." + check response.body == "The MIME type 'hello/world' is not valid." test "node retrieve the metadata", twoNodesConfig: let headers = newHttpHeaders( @@ -228,8 +233,6 @@ twonodessuite "REST API": check manifest["filename"].getStr() == "example.txt" check manifest.hasKey("mimetype") == true check manifest["mimetype"].getStr() == "text/plain" - check manifest.hasKey("uploadedAt") == true - check manifest["uploadedAt"].getInt() > 0 test "node set the headers when for download", twoNodesConfig: let headers = newHttpHeaders( @@ -262,3 +265,24 @@ twonodessuite "REST API": check localResponse.headers.hasKey("Content-Disposition") == true check localResponse.headers["Content-Disposition"] == "attachment; filename=\"example.txt\"" + + test "should delete a dataset when requested", twoNodesConfig: + let cid = client1.upload("some file contents").get + + var response = client1.downloadRaw($cid, local = true) + check response.body == "some file contents" + + client1.delete(cid).get + + response = client1.downloadRaw($cid, local = true) + check response.status == "404 Not Found" + + test "should return 200 when attempting delete of non-existing block", twoNodesConfig: + let response = client1.deleteRaw($(Cid.example())) + check response.status == "204 No Content" + + test "should return 200 when attempting delete of non-existing dataset", + twoNodesConfig: + let cid = Manifest.example().makeManifestBlock().get.cid + let response = client1.deleteRaw($cid) + check response.status == "204 No Content" diff --git a/tests/integration/testupdownload.nim b/tests/integration/testupdownload.nim index 74bee8c7..05d3a496 100644 --- a/tests/integration/testupdownload.nim +++ b/tests/integration/testupdownload.nim @@ -88,7 +88,7 @@ twonodessuite "Uploads and downloads": let cid = a.upload(data).get let response = b.download(cid).get check: - response == data + @response.mapIt(it.byte) == data for run in 0 .. 10: await transferTest(client1, client2) diff --git a/tests/integration/twonodes.nim b/tests/integration/twonodes.nim index 5666690e..eeceb20d 100644 --- a/tests/integration/twonodes.nim +++ b/tests/integration/twonodes.nim @@ -1,4 +1,3 @@ -import std/os import std/macros import pkg/questionable import ./multinodes diff --git a/vendor/codex-contracts-eth b/vendor/codex-contracts-eth index e74d3397..0f2012b1 160000 --- a/vendor/codex-contracts-eth +++ b/vendor/codex-contracts-eth @@ -1 +1 @@ -Subproject commit e74d3397a133eaf1eb95d9ce59f56747a7c8c30b +Subproject commit 0f2012b1442c404605c8ba9dcae2f4e53058cd2c diff --git a/vendor/nim-ethers b/vendor/nim-ethers index 1cfccb96..d2b11a86 160000 --- a/vendor/nim-ethers +++ b/vendor/nim-ethers @@ -1 +1 @@ -Subproject commit 1cfccb9695fa47860bf7ef3d75da9019096a3933 +Subproject commit d2b11a865796a55296027f8ffba68398035ad435 diff --git a/vendor/nim-leopard b/vendor/nim-leopard index 3e09d811..7506b90f 160000 --- a/vendor/nim-leopard +++ b/vendor/nim-leopard @@ -1 +1 @@ -Subproject commit 3e09d8113f874f3584c3fe93818541b2ff9fb9c3 +Subproject commit 7506b90f9c650c02b96bf525d4fd1bd4942a495f diff --git a/vendor/nim-nat-traversal b/vendor/nim-nat-traversal index 5e405974..6508ce75 160000 --- a/vendor/nim-nat-traversal +++ b/vendor/nim-nat-traversal @@ -1 +1 @@ -Subproject commit 5e4059746e9095e1731b02eeaecd62a70fbe664d +Subproject commit 6508ce75060878dfcdfa21f94721672c69a1823b