diff --git a/codex/codex.nim b/codex/codex.nim
index 13985254..6dcfbaaa 100644
--- a/codex/codex.nim
+++ b/codex/codex.nim
@@ -11,8 +11,10 @@ import std/sequtils
 import std/strutils
 import std/os
 import std/tables
+import std/cpuinfo
 
 import pkg/chronos
+import pkg/taskpools
 import pkg/presto
 import pkg/libp2p
 import pkg/confutils
@@ -194,7 +196,22 @@ proc new*(
     .withTcpTransport({ServerFlags.ReuseAddr})
     .build()
 
-  var cache: CacheStore = nil
+  var
+    cache: CacheStore = nil
+    taskpool: Taskpool
+
+  try:
+    # "0" means auto-detect. Never go below 2 threads: the erasure coding path
+    # asserts `numThreads > 1`, and `countProcessors()` may report 0 or 1.
+    let numThreads =
+      if config.numThreads == ThreadCount(0):
+        max(2, min(countProcessors(), 16))
+      else:
+        int(config.numThreads)
+    taskpool = Taskpool.new(numThreads = numThreads)
+    info "Threadpool started", numThreads = taskpool.numThreads
+  except CatchableError as exc:
+    raiseAssert("Failure in taskpool initialization: " & exc.msg)
 
   if config.cacheSize > 0'nb:
     cache = CacheStore.new(cacheSize = config.cacheSize)
@@ -286,6 +303,7 @@ proc new*(
       engine = engine,
       discovery = discovery,
       prover = prover,
+      taskPool = taskpool,
     )
 
     restServer = RestServerRef
diff --git a/codex/conf.nim b/codex/conf.nim
index 6d47f8f4..ccf29a1f 100644
--- a/codex/conf.nim
+++ b/codex/conf.nim
@@ -53,6 +53,10 @@ export
   DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval,
   DefaultNumberOfBlocksToMaintainPerInterval
 
+type ThreadCount* = distinct Natural
+
+proc `==`*(a, b: ThreadCount): bool {.borrow.}
+
 proc defaultDataDir*(): string =
   let dataDir =
     when defined(windows):
@@ -71,6 +75,7 @@ const
 
   DefaultDataDir* = defaultDataDir()
   DefaultCircuitDir* = defaultDataDir() / "circuits"
+  DefaultThreadCount* = ThreadCount(0)
 
 type
   StartUpCmd* {.pure.} = enum
@@ -184,6 +189,13 @@ type
       name: "max-peers"
     .}: int
 
+    numThreads* {.
+      desc:
+        "Number of worker threads (\"0\" = one thread per CPU core, at least 2 and at most 16)",
+      defaultValue: DefaultThreadCount,
+      name: "num-threads"
+    .}: ThreadCount
+
     agentString* {.
       defaultValue: "Codex",
       desc: "Node agent string which is used as identifier in network",
@@ -482,6 +494,15 @@ proc parseCmdArg*(
     quit QuitFailure
   ma
 
+proc parseCmdArg*(T: type ThreadCount, input: string): T {.upraises: [ValueError].} =
+  ## Parses the `--num-threads` value: "0" (choose automatically) or a count of
+  ## at least 2. Any other integer logs a warning and quits the process.
+  let count = parseInt(input)
+  if count != 0 and count < 2:
+    warn "Invalid number of threads", input = input
+    quit QuitFailure
+  ThreadCount(count)
+
 proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T =
   var res: SignedPeerRecord
   try:
@@ -579,6 +600,16 @@ proc readValue*(
     quit QuitFailure
   val = NBytes(value)
 
+proc readValue*(
+    r: var TomlReader, val: var ThreadCount
+) {.upraises: [SerializationError, IOError].} =
+  ## Reads a `ThreadCount` from a TOML string value, applying the CLI rules.
+  let str = r.readValue(string)
+  try:
+    val = parseCmdArg(ThreadCount, str)
+  except CatchableError as err:
+    raise newException(SerializationError, err.msg)
+
 proc readValue*(
     r: var TomlReader, val: var Duration
 ) {.upraises: [SerializationError, IOError].} =
@@ -609,6 +640,9 @@ proc completeCmdArg*(T: type NBytes, val: string): seq[string] =
 proc completeCmdArg*(T: type Duration, val: string): seq[string] =
   discard
 
+proc completeCmdArg*(T: type ThreadCount, val: string): seq[string] =
+  discard
+
 # silly chronicles, colors is a compile-time property
 proc stripAnsi*(v: string): string =
   var
diff --git a/codex/erasure/backend.nim b/codex/erasure/backend.nim
index a6dd8b8c..32009829 100644
--- a/codex/erasure/backend.nim
+++ b/codex/erasure/backend.nim
@@ -29,14 +29,18 @@ method release*(self: ErasureBackend) {.base, gcsafe.} =
   raiseAssert("not implemented!")
 
 method encode*(
-    self: EncoderBackend, buffers, parity: var openArray[seq[byte]]
+    self: EncoderBackend,
+    buffers, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
+    dataLen, parityLen: int,
 ): Result[void, cstring] {.base, gcsafe.} =
   ## encode buffers using a backend
   ##
   raiseAssert("not implemented!")
 
 method decode*(
-    self: DecoderBackend, buffers, parity, recovered: var openArray[seq[byte]]
+    self: DecoderBackend,
+    buffers, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
+    dataLen, parityLen, recoveredLen: int,
 ): Result[void, cstring] {.base, gcsafe.} =
   ## decode buffers using a backend
   ##
diff --git a/codex/erasure/backends/leopard.nim b/codex/erasure/backends/leopard.nim
index c9f9db40..ae599f12 100644
--- a/codex/erasure/backends/leopard.nim
+++ b/codex/erasure/backends/leopard.nim
@@ -22,11 +22,13 @@ type
     decoder*: Option[LeoDecoder]
 
 method encode*(
-    self: LeoEncoderBackend, data, parity: var openArray[seq[byte]]
+    self: LeoEncoderBackend,
+    data, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
+    dataLen, parityLen: int,
 ): Result[void, cstring] =
   ## Encode data using Leopard backend
 
-  if parity.len == 0:
+  if parityLen == 0:
     return ok()
 
   var encoder =
@@ -36,10 +38,12 @@ method encode*(
     else:
       self.encoder.get()
 
-  encoder.encode(data, parity)
+  encoder.encode(data, parity, dataLen, parityLen)
 
 method decode*(
-    self: LeoDecoderBackend, data, parity, recovered: var openArray[seq[byte]]
+    self: LeoDecoderBackend,
+    data, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
+    dataLen, parityLen, recoveredLen: int,
 ): Result[void, cstring] =
   ## Decode data using given Leopard backend
 
@@ -50,7 +54,7 @@ method decode*(
     else:
       self.decoder.get()
 
-  decoder.decode(data, parity, recovered)
+  decoder.decode(data, parity, recovered, dataLen, parityLen, recoveredLen)
 
 method release*(self: LeoEncoderBackend) =
   if self.encoder.isSome:
diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim
index aacd187a..107f85bc 100644
--- a/codex/erasure/erasure.nim
+++ b/codex/erasure/erasure.nim
@@ -12,12 +12,14 @@ import pkg/upraises
 push:
   {.upraises: [].}
 
-import std/sequtils
-import std/sugar
+import std/[sugar, atomics, sequtils]
 
 import pkg/chronos
+import pkg/chronos/threadsync
+import pkg/chronicles
 import pkg/libp2p/[multicodec, cid, multihash]
 import pkg/libp2p/protobuf/minprotobuf
+import pkg/taskpools
 
 import ../logutils
 import ../manifest
@@ -28,6 +30,7 @@ import ../utils
 import ../utils/asynciter
 import ../indexingstrategy
 import ../errors
+import ../utils/arrayutils
 
 import pkg/stew/byteutils
 
@@ -68,6 +71,7 @@ type
     proc(size, blocks, parity: int): DecoderBackend {.raises: [Defect], noSideEffect.}
 
   Erasure* = ref object
+    taskPool: Taskpool
     encoderProvider*: EncoderProvider
     decoderProvider*: DecoderProvider
     store*: BlockStore
@@ -87,6 +91,24 @@ type
     # provided.
     minSize*: NBytes
 
+  EncodeTask = object
+    success: Atomic[bool]
+    erasure: ptr Erasure
+    blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    blockSize, blocksLen, parityLen: int
+    signal: ThreadSignalPtr
+
+  DecodeTask = object
+    success: Atomic[bool]
+    erasure: ptr Erasure
+    blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    recovered: ptr UncheckedArray[ptr UncheckedArray[byte]]
+    blockSize, blocksLen: int
+    parityLen, recoveredLen: int
+    signal: ThreadSignalPtr
+
 func indexToPos(steps, idx, step: int): int {.inline.} =
   ## Convert an index to a position in the encoded
   ## dataset
@@ -269,6 +291,87 @@ proc init*(
     strategy: strategy,
   )
 
+proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} =
+  # Task suitable for running in taskpools - look, no GC!
+  let encoder =
+    task[].erasure.encoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
+  defer:
+    encoder.release()
+    discard task[].signal.fireSync()
+
+  if (
+    let res =
+      encoder.encode(task[].blocks, task[].parity, task[].blocksLen, task[].parityLen)
+    res.isErr
+  ):
+    warn "Error from leopard encoder backend!", error = $res.error
+
+    task[].success.store(false)
+  else:
+    task[].success.store(true)
+
+proc encodeAsync*(
+    self: Erasure,
+    blockSize, blocksLen, parityLen: int,
+    data: ref seq[seq[byte]],
+    parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
+): Future[?!void] {.async: (raises: [CancelledError]).} =
+  ## Copies `data` into shared-memory buffers, runs the Leopard encoder on the
+  ## task pool and waits for it to finish; the result is written to `parity`.
+  without threadPtr =? ThreadSignalPtr.new():
+    return failure("Unable to create thread signal")
+
+  defer:
+    threadPtr.close().expect("closing once works")
+
+  var blockData = createDoubleArray(blocksLen, blockSize)
+
+  defer:
+    freeDoubleArray(blockData, blocksLen)
+
+  # Never read or write outside the buffers: blocks that are missing or shorter
+  # than `blockSize` are left zero padded
+  for i in 0 ..< min(data[].len, blocksLen):
+    let size = min(data[i].len, blockSize)
+    if size > 0:
+      copyMem(blockData[i], addr data[i][0], size)
+
+  ## Create an encode task with block data
+  var task = EncodeTask(
+    erasure: addr self,
+    blockSize: blockSize,
+    blocksLen: blocksLen,
+    parityLen: parityLen,
+    blocks: blockData,
+    parity: parity,
+    signal: threadPtr,
+  )
+
+  let t = addr task
+
+  doAssert self.taskPool.numThreads > 1,
+    "Must have at least one separate thread or signal will never be fired"
+  self.taskPool.spawn leopardEncodeTask(self.taskPool, t)
+  let threadFut = threadPtr.wait()
+
+  try:
+    await threadFut.join()
+  except CatchableError as exc:
+    try:
+      await threadFut
+    except AsyncError as asyncExc:
+      return failure(asyncExc.msg)
+    finally:
+      if exc of CancelledError:
+        raise (ref CancelledError) exc
+      else:
+        return failure(exc.msg)
+
+  if not t.success.load():
+    return failure("Leopard encoding failed")
+
+  success()
+
 proc encodeData(
     self: Erasure, manifest: Manifest, params: EncodingParams
 ): Future[?!Manifest] {.async.} =
@@ -276,7 +379,6 @@ proc encodeData(
   ##
   ## `manifest` - the manifest to encode
   ##
-
   logScope:
     steps = params.steps
     rounded_blocks = params.rounded
@@ -286,7 +388,6 @@ proc encodeData(
 
   var
     cids = seq[Cid].new()
-    encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM)
     emptyBlock = newSeq[byte](manifest.blockSize.int)
 
   cids[].setLen(params.blocksCount)
@@ -296,8 +397,12 @@ proc encodeData(
       # TODO: Don't allocate a new seq every time, allocate once and zero out
       var
         data = seq[seq[byte]].new() # number of blocks to encode
-        parityData =
-          newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int))
+        parity = createDoubleArray(params.ecM, manifest.blockSize.int)
+
+      # The parity buffers are read after encoding, so release them only when
+      # this iteration ends (on every exit path)
+      defer:
+        freeDoubleArray(parity, params.ecM)
 
       data[].setLen(params.ecK)
       # TODO: this is a tight blocking loop so we sleep here to allow
@@ -311,15 +416,20 @@ proc encodeData(
         trace "Unable to prepare data", error = err.msg
         return failure(err)
 
-      trace "Erasure coding data", data = data[].len, parity = parityData.len
+      trace "Erasure coding data", data = data[].len
 
-      if (let res = encoder.encode(data[], parityData); res.isErr):
-        trace "Unable to encode manifest!", error = $res.error
-        return failure($res.error)
+      if err =? (
+        await self.encodeAsync(
+          manifest.blockSize.int, params.ecK, params.ecM, data, parity
+        )
+      ).errorOption:
+        return failure(err)
 
       var idx = params.rounded + step
       for j in 0 ..< params.ecM:
-        without blk =? bt.Block.new(parityData[j]), error:
+        let innerPtr: ptr UncheckedArray[byte] = parity[][j]
+        without blk =? bt.Block.new(innerPtr.toOpenArray(0, manifest.blockSize.int - 1)),
+          error:
           trace "Unable to create parity block", err = error.msg
           return failure(error)
 
@@ -356,8 +466,6 @@ proc encodeData(
   except CatchableError as exc:
     trace "Erasure coding encoding error", exc = exc.msg
     return failure(exc)
-  finally:
-    encoder.release()
 
 proc encode*(
     self: Erasure,
@@ -381,6 +489,108 @@ proc encode*(
 
   return success encodedManifest
 
+proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} =
+  # Task suitable for running in taskpools - look, no GC!
+  let decoder =
+    task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
+  defer:
+    decoder.release()
+    # Always wake up the waiting `decodeAsync`, whatever happens in this task
+    discard task[].signal.fireSync()
+
+  if (
+    let res = decoder.decode(
+      task[].blocks,
+      task[].parity,
+      task[].recovered,
+      task[].blocksLen,
+      task[].parityLen,
+      task[].recoveredLen,
+    )
+    res.isErr
+  ):
+    warn "Error from leopard decoder backend!", error = $res.error
+    task[].success.store(false)
+  else:
+    task[].success.store(true)
+
+proc decodeAsync*(
+    self: Erasure,
+    blockSize, blocksLen, parityLen: int,
+    blocks, parity: ref seq[seq[byte]],
+    recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
+): Future[?!void] {.async: (raises: [CancelledError]).} =
+  ## Copies the available `blocks` and `parity` into shared-memory buffers,
+  ## runs the Leopard decoder on the task pool and waits for it to finish; the
+  ## reconstructed data is written to `recovered`.
+  without threadPtr =? ThreadSignalPtr.new():
+    return failure("Unable to create thread signal")
+
+  defer:
+    threadPtr.close().expect("closing once works")
+
+  var
+    blocksData = createDoubleArray(blocksLen, blockSize)
+    parityData = createDoubleArray(parityLen, blockSize)
+
+  defer:
+    freeDoubleArray(blocksData, blocksLen)
+    freeDoubleArray(parityData, parityLen)
+
+  # Empty (missing) blocks are handed to the backend as nil entries; free the
+  # preallocated buffer first so it does not leak
+  for i in 0 ..< min(blocks[].len, blocksLen):
+    if blocks[i].len > 0:
+      copyMem(blocksData[i], addr blocks[i][0], min(blocks[i].len, blockSize))
+    else:
+      deallocShared(blocksData[i])
+      blocksData[i] = nil
+
+  for i in 0 ..< min(parity[].len, parityLen):
+    if parity[i].len > 0:
+      copyMem(parityData[i], addr parity[i][0], min(parity[i].len, blockSize))
+    else:
+      deallocShared(parityData[i])
+      parityData[i] = nil
+
+  ## Create a decode task with block data
+  var task = DecodeTask(
+    erasure: addr self,
+    blockSize: blockSize,
+    blocksLen: blocksLen,
+    parityLen: parityLen,
+    recoveredLen: blocksLen,
+    blocks: blocksData,
+    parity: parityData,
+    recovered: recovered,
+    signal: threadPtr,
+  )
+
+  # Hold the task pointer until the signal is received
+  let t = addr task
+  doAssert self.taskPool.numThreads > 1,
+    "Must have at least one separate thread or signal will never be fired"
+  self.taskPool.spawn leopardDecodeTask(self.taskPool, t)
+  let threadFut = threadPtr.wait()
+
+  try:
+    await threadFut.join()
+  except CatchableError as exc:
+    try:
+      await threadFut
+    except AsyncError as asyncExc:
+      return failure(asyncExc.msg)
+    finally:
+      if exc of CancelledError:
+        raise (ref CancelledError) exc
+      else:
+        return failure(exc.msg)
+
+  if not t.success.load():
+    return failure("Leopard decoding failed")
+
+  success()
+
 proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
   ## Decode a protected manifest into it's original
   ## manifest
@@ -388,7 +598,6 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
   ## `encoded` - the encoded (protected) manifest to
   ## be recovered
   ##
-
   logScope:
     steps = encoded.steps
     rounded_blocks = encoded.rounded
@@ -411,8 +620,12 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
       var
         data = seq[seq[byte]].new()
         parityData = seq[seq[byte]].new()
-        recovered =
-          newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
+        recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int)
+
+      # The recovered buffers are read after decoding, so release them only
+      # when this iteration ends (on every exit path, including `continue`)
+      defer:
+        freeDoubleArray(recovered, encoded.ecK)
 
       data[].setLen(encoded.ecK) # set len to K
       parityData[].setLen(encoded.ecM) # set len to M
@@ -430,15 +643,21 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
         continue
 
       trace "Erasure decoding data"
-
-      if (let err = decoder.decode(data[], parityData[], recovered); err.isErr):
-        trace "Unable to decode data!", err = $err.error
-        return failure($err.error)
+      if err =? (
+        await self.decodeAsync(
+          encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
+        )
+      ).errorOption:
+        return failure(err)
 
       for i in 0 ..< encoded.ecK:
         let idx = i * encoded.steps + step
         if data[i].len <= 0 and not cids[idx].isEmpty:
-          without blk =? bt.Block.new(recovered[i]), error:
+          let innerPtr: ptr UncheckedArray[byte] = recovered[][i]
+
+          without blk =? bt.Block.new(
+            innerPtr.toOpenArray(0, encoded.blockSize.int - 1)
+          ), error:
             trace "Unable to create block!", exc = error.msg
             return failure(error)
 
@@ -490,10 +709,13 @@ proc new*(
     store: BlockStore,
     encoderProvider: EncoderProvider,
     decoderProvider: DecoderProvider,
+    taskPool: Taskpool,
 ): Erasure =
   ## Create a new Erasure instance for encoding and decoding manifests
   ##
-
   Erasure(
-    store: store, encoderProvider: encoderProvider, decoderProvider: decoderProvider
+    store: store,
+    encoderProvider: encoderProvider,
+    decoderProvider: decoderProvider,
+    taskPool: taskPool,
   )
diff --git a/codex/node.nim b/codex/node.nim
index b90d6a9e..2602bfe6 100644
--- a/codex/node.nim
+++ b/codex/node.nim
@@ -15,6 +15,7 @@ import std/strformat
 import std/sugar
 import times
 
+import pkg/taskpools
 import pkg/questionable
 import pkg/questionable/results
 import pkg/chronos
@@ -70,6 +71,7 @@ type
     contracts*: Contracts
     clock*: Clock
     storage*: Contracts
+    taskpool: Taskpool
 
   CodexNodeRef* = ref CodexNode
 
@@ -235,8 +237,9 @@ proc streamEntireDataset(
     # Retrieve, decode and save to the local store all EС groups
     proc erasureJob(): Future[?!void] {.async.} =
       # Spawn an erasure decoding job
-      let erasure =
-        Erasure.new(self.networkStore, leoEncoderProvider, leoDecoderProvider)
+      let erasure = Erasure.new(
+        self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
+      )
       without _ =? (await erasure.decode(manifest)), error:
         error "Unable to erasure decode manifest", manifestCid, exc = error.msg
         return failure(error)
@@ -461,8 +464,9 @@ proc setupRequest(
     return failure error
 
   # Erasure code the dataset according to provided parameters
-  let erasure =
-    Erasure.new(self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider)
+  let erasure = Erasure.new(
+    self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
+  )
 
   without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
     trace "Unable to erasure code dataset"
@@ -782,12 +786,16 @@ proc stop*(self: CodexNodeRef) {.async.} =
   if not self.networkStore.isNil:
     await self.networkStore.close
 
+  if not self.taskpool.isNil:
+    self.taskpool.shutdown()
+
 proc new*(
     T: type CodexNodeRef,
     switch: Switch,
     networkStore: NetworkStore,
     engine: BlockExcEngine,
     discovery: Discovery,
+    taskpool: Taskpool,
     prover = Prover.none,
     contracts = Contracts.default,
 ): CodexNodeRef =
@@ -800,5 +808,6 @@ proc new*(
     engine: engine,
     prover: prover,
     discovery: discovery,
+    taskPool: taskpool,
     contracts: contracts,
   )
diff --git a/codex/utils/arrayutils.nim b/codex/utils/arrayutils.nim
new file mode 100644
index 00000000..c398921f
--- /dev/null
+++ b/codex/utils/arrayutils.nim
@@ -0,0 +1,31 @@
+import std/sequtils
+
+proc createDoubleArray*(
+    outerLen, innerLen: int
+): ptr UncheckedArray[ptr UncheckedArray[byte]] =
+  ## Allocates `outerLen` zero-filled buffers of `innerLen` bytes each on the
+  ## shared heap; release them with `freeDoubleArray`.
+  # Allocate outer array
+  result = cast[ptr UncheckedArray[ptr UncheckedArray[byte]]](allocShared0(
+    sizeof(ptr UncheckedArray[byte]) * outerLen
+  ))
+
+  # Allocate each inner array
+  for i in 0 ..< outerLen:
+    result[i] = cast[ptr UncheckedArray[byte]](allocShared0(sizeof(byte) * innerLen))
+
+proc freeDoubleArray*(
+    arr: ptr UncheckedArray[ptr UncheckedArray[byte]], outerLen: int
+) =
+  ## Frees an array created by `createDoubleArray`; nil entries and a nil
+  ## `arr` are skipped.
+  if arr.isNil:
+    return
+
+  # Free each inner array
+  for i in 0 ..< outerLen:
+    if not arr[i].isNil:
+      deallocShared(arr[i])
+
+  # Free outer array
+  deallocShared(arr)
diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim
index 0d72b06b..2d1a87dc 100644
--- a/tests/codex/node/helpers.nim
+++ b/tests/codex/node/helpers.nim
@@ -6,6 +6,7 @@ import pkg/chronos
 import pkg/codex/codextypes
 import pkg/codex/chunker
 import pkg/codex/stores
+import pkg/taskpools
 
 import ../../asynctest
 
@@ -118,6 +119,7 @@ template setupAndTearDown*() {.dirty.} =
       engine = engine,
       prover = Prover.none,
       discovery = blockDiscovery,
+      taskpool = Taskpool.new(),
     )
 
   teardown:
diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim
index cce6d5bd..52adb5f6 100644
--- a/tests/codex/node/testcontracts.nim
+++ b/tests/codex/node/testcontracts.nim
@@ -75,7 +75,7 @@ asyncchecksuite "Test Node - Host contracts":
     let
       manifestBlock =
         bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
-      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
+      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new)
 
     manifestCid = manifestBlock.cid
     manifestCidStr = $(manifestCid)
diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim
index b9450f40..3f9a141a 100644
--- a/tests/codex/node/testnode.nim
+++ b/tests/codex/node/testnode.nim
@@ -12,6 +12,7 @@ import pkg/questionable/results
 import pkg/stint
 import pkg/poseidon2
 import pkg/poseidon2/io
+import pkg/taskpools
 
 import pkg/nitro
 import pkg/codexdht/discv5/protocol as discv5
@@ -67,7 +68,7 @@ asyncchecksuite "Test Node - Basic":
     # https://github.com/codex-storage/nim-codex/issues/699
     let
       cstore = CountingStore.new(engine, localStore)
-      node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery)
+      node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery, Taskpool.new())
       missingCid =
         Cid.init("zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get()
 
@@ -138,7 +139,8 @@ asyncchecksuite "Test Node - Basic":
 
   test "Setup purchase request":
     let
-      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
+      erasure =
+        Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new())
       manifest = await storeDataGetManifest(localStore, chunker)
       manifestBlock =
         bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim
index 952497e9..d469b379 100644
--- a/tests/codex/testerasure.nim
+++ b/tests/codex/testerasure.nim
@@ -1,5 +1,6 @@
 import std/sequtils
 import std/sugar
+import std/times
 
 import pkg/chronos
 import pkg/questionable/results
@@ -11,6 +12,8 @@ import pkg/codex/blocktype as bt
 import pkg/codex/rng
 import pkg/codex/utils
 import pkg/codex/indexingstrategy
+import pkg/taskpools
+import pkg/codex/utils/arrayutils
 
 import ../asynctest
 import ./helpers
@@ -27,6 +30,7 @@ suite "Erasure encode/decode":
   var erasure: Erasure
   let repoTmp = TempLevelDb.new()
   let metaTmp = TempLevelDb.new()
+  var taskpool: Taskpool
 
   setup:
     let
@@ -35,12 +39,14 @@ suite "Erasure encode/decode":
     rng = Rng.instance()
     chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
     store = RepoStore.new(repoDs, metaDs)
-    erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
+    taskpool = Taskpool.new()
+    erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
     manifest = await storeDataGetManifest(store, chunker)
 
   teardown:
     await repoTmp.destroyDb()
     await metaTmp.destroyDb()
+    taskpool.shutdown()
 
   proc encode(buffers, parity: int): Future[Manifest] {.async.} =
     let encoded =
@@ -212,7 +218,7 @@ suite "Erasure encode/decode":
         let present = await store.hasBlock(manifest.treeCid, d)
         check present.tryGet()
 
-  test "handles edge case of 0 parity blocks":
+  test "Handles edge case of 0 parity blocks":
     const
       buffers = 20
       parity = 0
@@ -221,6 +227,43 @@ suite "Erasure encode/decode":
 
     discard (await erasure.decode(encoded)).tryGet()
 
+  test "Should concurrently encode/decode multiple datasets":
+    const iterations = 2
+
+    let
+      datasetSize = 1.MiBs
+      ecK = 10.Natural
+      ecM = 10.Natural
+
+    var encodeTasks = newSeq[Future[?!Manifest]]()
+    var decodeTasks = newSeq[Future[?!Manifest]]()
+    var manifests = newSeq[Manifest]()
+    for i in 0 ..< iterations:
+      let
+        # create random data and store it
+        blockSize = rng.sample(@[1, 2, 4, 8, 16, 32, 64].mapIt(it.KiBs))
+        chunker = RandomChunker.new(rng, size = datasetSize, chunkSize = blockSize)
+        manifest = await storeDataGetManifest(store, chunker)
+      manifests.add(manifest)
+      # encode the data concurrently
+      encodeTasks.add(erasure.encode(manifest, ecK, ecM))
+    # wait for all encoding tasks to finish
+    let encodeResults = await allFinished(encodeTasks)
+    # decode the data concurrently
+    for i in 0 ..< encodeResults.len:
+      decodeTasks.add(erasure.decode(encodeResults[i].read().tryGet()))
+    # wait for all decoding tasks to finish
+    let decodeResults = await allFinished(decodeTasks) # TODO: use allFutures
+
+    for j in 0 ..< decodeTasks.len:
+      let
+        decoded = decodeResults[j].read().tryGet()
+        encoded = encodeResults[j].read().tryGet()
+      check:
+        decoded.treeCid == manifests[j].treeCid
+        decoded.treeCid == encoded.originalTreeCid
+        decoded.blocksCount == encoded.originalBlocksCount
+
   test "Should handle verifiable manifests":
     const
       buffers = 20
@@ -259,3 +302,76 @@ suite "Erasure encode/decode":
       decoded.treeCid == manifest.treeCid
       decoded.treeCid == encoded.originalTreeCid
       decoded.blocksCount == encoded.originalBlocksCount
+
+  test "Should complete encode/decode task when cancelled":
+    let
+      blocksLen = 10000
+      parityLen = 10
+      data = seq[seq[byte]].new()
+      chunker = RandomChunker.new(
+        rng, size = (blocksLen * BlockSize.int), chunkSize = BlockSize
+      )
+
+    data[].setLen(blocksLen)
+
+    for i in 0 ..< blocksLen:
+      let chunk = await chunker.getBytes()
+      shallowCopy(data[i], @(chunk))
+
+    let
+      parity = createDoubleArray(parityLen, BlockSize.int)
+      paritySeq = seq[seq[byte]].new()
+      recovered = createDoubleArray(blocksLen, BlockSize.int)
+      cancelledTaskParity = createDoubleArray(parityLen, BlockSize.int)
+      cancelledTaskRecovered = createDoubleArray(blocksLen, BlockSize.int)
+
+    paritySeq[].setLen(parityLen)
+    defer:
+      freeDoubleArray(parity, parityLen)
+      freeDoubleArray(cancelledTaskParity, parityLen)
+      freeDoubleArray(recovered, blocksLen)
+      freeDoubleArray(cancelledTaskRecovered, blocksLen)
+
+    # call encodeAsync to get the parity
+    let encFut =
+      await erasure.encodeAsync(BlockSize.int, blocksLen, parityLen, data, parity)
+    check encFut.isOk
+
+    # Copy the parity into real seqs for decodeAsync; a raw buffer must not be
+    # cast to `seq[byte]`
+    for i in 0 ..< parityLen:
+      paritySeq[i] = newSeq[byte](BlockSize.int)
+      copyMem(addr paritySeq[i][0], parity[i], BlockSize.int)
+
+    let decFut = await erasure.decodeAsync(
+      BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered
+    )
+    check decFut.isOk
+
+    # call encodeAsync and cancel the task
+    let encodeFut = erasure.encodeAsync(
+      BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity
+    )
+    encodeFut.cancel()
+
+    try:
+      discard await encodeFut
+    except CatchableError as exc:
+      check exc of CancelledError
+    finally:
+      for i in 0 ..< parityLen:
+        check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int)
+
+    # call decodeAsync and cancel the task
+    let decodeFut = erasure.decodeAsync(
+      BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered
+    )
+    decodeFut.cancel()
+
+    try:
+      discard await decodeFut
+    except CatchableError as exc:
+      check exc of CancelledError
+    finally:
+      for i in 0 ..< blocksLen:
+        check equalMem(recovered[i], cancelledTaskRecovered[i], BlockSize.int)
diff --git a/vendor/nim-leopard b/vendor/nim-leopard
index 3e09d811..7506b90f 160000
--- a/vendor/nim-leopard
+++ b/vendor/nim-leopard
@@ -1 +1 @@
-Subproject commit 3e09d8113f874f3584c3fe93818541b2ff9fb9c3
+Subproject commit 7506b90f9c650c02b96bf525d4fd1bd4942a495f