feat: multithreading support for erasure coding (#1087)

* implement async encode

* implement async decode

* cleanup code

* add num-threads flag

* fix tests

* code cleanup

* improve return types and exception handling for async proc

* add validation check for numThreads flag

* modify encode method

* add new tests for async encoding

* modify decode method

* cleanup test cases

* add new cli flag for threadCount

* test cleanup

* add new tests

* fix decodeAsync exception handling

* code cleanup

* chore: cosmetic changes
munna0908 committed on 2025-02-12 23:26:26 +05:30 (via GitHub)
parent 45e97513a7
commit c65148822e
12 changed files with 454 additions and 41 deletions


@ -11,8 +11,10 @@ import std/sequtils
import std/strutils
import std/os
import std/tables
import std/cpuinfo
import pkg/chronos
import pkg/taskpools
import pkg/presto
import pkg/libp2p
import pkg/confutils
@ -194,7 +196,18 @@ proc new*(
.withTcpTransport({ServerFlags.ReuseAddr})
.build()
var cache: CacheStore = nil
var
cache: CacheStore = nil
taskpool: Taskpool
try:
if config.numThreads == ThreadCount(0):
taskpool = Taskpool.new(numThreads = min(countProcessors(), 16))
else:
taskpool = Taskpool.new(numThreads = int(config.numThreads))
info "Threadpool started", numThreads = taskpool.numThreads
except CatchableError as exc:
raiseAssert("Failure in taskpool initialization:" & exc.msg)
if config.cacheSize > 0'nb:
cache = CacheStore.new(cacheSize = config.cacheSize)
@ -286,6 +299,7 @@ proc new*(
engine = engine,
discovery = discovery,
prover = prover,
taskPool = taskpool,
)
restServer = RestServerRef


@ -53,6 +53,10 @@ export
DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval,
DefaultNumberOfBlocksToMaintainPerInterval
type ThreadCount* = distinct Natural
proc `==`*(a, b: ThreadCount): bool {.borrow.}
proc defaultDataDir*(): string =
let dataDir =
when defined(windows):
@ -71,6 +75,7 @@ const
DefaultDataDir* = defaultDataDir()
DefaultCircuitDir* = defaultDataDir() / "circuits"
DefaultThreadCount* = ThreadCount(0)
type
StartUpCmd* {.pure.} = enum
@ -184,6 +189,13 @@ type
name: "max-peers"
.}: int
numThreads* {.
desc:
"Number of worker threads (\"0\" = use as many threads as there are CPU cores available)",
defaultValue: DefaultThreadCount,
name: "num-threads"
.}: ThreadCount
agentString* {.
defaultValue: "Codex",
desc: "Node agent string which is used as identifier in network",
@ -482,6 +494,13 @@ proc parseCmdArg*(
quit QuitFailure
ma
proc parseCmdArg*(T: type ThreadCount, input: string): T {.upraises: [ValueError].} =
let count = parseInt(input)
if count != 0 and count < 2:
warn "Invalid number of threads", input = input
quit QuitFailure
ThreadCount(count)
proc parseCmdArg*(T: type SignedPeerRecord, uri: string): T =
var res: SignedPeerRecord
try:
@ -579,6 +598,15 @@ proc readValue*(
quit QuitFailure
val = NBytes(value)
proc readValue*(
r: var TomlReader, val: var ThreadCount
) {.upraises: [SerializationError, IOError].} =
var str = r.readValue(string)
try:
val = parseCmdArg(ThreadCount, str)
except CatchableError as err:
raise newException(SerializationError, err.msg)
proc readValue*(
r: var TomlReader, val: var Duration
) {.upraises: [SerializationError, IOError].} =
@ -609,6 +637,9 @@ proc completeCmdArg*(T: type NBytes, val: string): seq[string] =
proc completeCmdArg*(T: type Duration, val: string): seq[string] =
discard
proc completeCmdArg*(T: type ThreadCount, val: string): seq[string] =
discard
# silly chronicles, colors is a compile-time property
proc stripAnsi*(v: string): string =
var
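
For context: the option accepts 0 (auto-size the pool, capped at 16 threads by the taskpool initialization in the first hunk above) or any value of at least 2; parseCmdArg rejects 1. A minimal usage sketch, assuming the codex binary name and a TOML config file read through the readValue overload above:

    # command line (binary name assumed)
    codex --num-threads=8    # fixed pool of 8 worker threads
    codex --num-threads=0    # default: one thread per CPU core, capped at 16

    # TOML configuration (key name taken from the flag definition above)
    num-threads = 8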


@ -29,14 +29,18 @@ method release*(self: ErasureBackend) {.base, gcsafe.} =
raiseAssert("not implemented!")
method encode*(
self: EncoderBackend, buffers, parity: var openArray[seq[byte]]
self: EncoderBackend,
buffers, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
dataLen, parityLen: int,
): Result[void, cstring] {.base, gcsafe.} =
## encode buffers using a backend
##
raiseAssert("not implemented!")
method decode*(
self: DecoderBackend, buffers, parity, recovered: var openArray[seq[byte]]
self: DecoderBackend,
buffers, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
dataLen, parityLen, recoveredLen: int,
): Result[void, cstring] {.base, gcsafe.} =
## decode buffers using a backend
##


@ -22,11 +22,13 @@ type
decoder*: Option[LeoDecoder]
method encode*(
self: LeoEncoderBackend, data, parity: var openArray[seq[byte]]
self: LeoEncoderBackend,
data, parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
dataLen, parityLen: int,
): Result[void, cstring] =
## Encode data using Leopard backend
if parity.len == 0:
if parityLen == 0:
return ok()
var encoder =
@ -36,10 +38,12 @@ method encode*(
else:
self.encoder.get()
encoder.encode(data, parity)
encoder.encode(data, parity, dataLen, parityLen)
method decode*(
self: LeoDecoderBackend, data, parity, recovered: var openArray[seq[byte]]
self: LeoDecoderBackend,
data, parity, recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
dataLen, parityLen, recoveredLen: int,
): Result[void, cstring] =
## Decode data using given Leopard backend
@ -50,7 +54,7 @@ method decode*(
else:
self.decoder.get()
decoder.decode(data, parity, recovered)
decoder.decode(data, parity, recovered, dataLen, parityLen, recoveredLen)
method release*(self: LeoEncoderBackend) =
if self.encoder.isSome:


@ -12,12 +12,14 @@ import pkg/upraises
push:
{.upraises: [].}
import std/sequtils
import std/sugar
import std/[sugar, atomics, sequtils]
import pkg/chronos
import pkg/chronos/threadsync
import pkg/chronicles
import pkg/libp2p/[multicodec, cid, multihash]
import pkg/libp2p/protobuf/minprotobuf
import pkg/taskpools
import ../logutils
import ../manifest
@ -28,6 +30,7 @@ import ../utils
import ../utils/asynciter
import ../indexingstrategy
import ../errors
import ../utils/arrayutils
import pkg/stew/byteutils
@ -68,6 +71,7 @@ type
proc(size, blocks, parity: int): DecoderBackend {.raises: [Defect], noSideEffect.}
Erasure* = ref object
taskPool: Taskpool
encoderProvider*: EncoderProvider
decoderProvider*: DecoderProvider
store*: BlockStore
@ -87,6 +91,24 @@ type
# provided.
minSize*: NBytes
EncodeTask = object
success: Atomic[bool]
erasure: ptr Erasure
blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
blockSize, blocksLen, parityLen: int
signal: ThreadSignalPtr
DecodeTask = object
success: Atomic[bool]
erasure: ptr Erasure
blocks: ptr UncheckedArray[ptr UncheckedArray[byte]]
parity: ptr UncheckedArray[ptr UncheckedArray[byte]]
recovered: ptr UncheckedArray[ptr UncheckedArray[byte]]
blockSize, blocksLen: int
parityLen, recoveredLen: int
signal: ThreadSignalPtr
func indexToPos(steps, idx, step: int): int {.inline.} =
## Convert an index to a position in the encoded
## dataset
@ -269,6 +291,81 @@ proc init*(
strategy: strategy,
)
proc leopardEncodeTask(tp: Taskpool, task: ptr EncodeTask) {.gcsafe.} =
# Task suitable for running in taskpools - look, no GC!
let encoder =
task[].erasure.encoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
defer:
encoder.release()
discard task[].signal.fireSync()
if (
let res =
encoder.encode(task[].blocks, task[].parity, task[].blocksLen, task[].parityLen)
res.isErr
):
warn "Error from leopard encoder backend!", error = $res.error
task[].success.store(false)
else:
task[].success.store(true)
proc encodeAsync*(
self: Erasure,
blockSize, blocksLen, parityLen: int,
data: ref seq[seq[byte]],
parity: ptr UncheckedArray[ptr UncheckedArray[byte]],
): Future[?!void] {.async: (raises: [CancelledError]).} =
without threadPtr =? ThreadSignalPtr.new():
return failure("Unable to create thread signal")
defer:
threadPtr.close().expect("closing once works")
var blockData = createDoubleArray(blocksLen, blockSize)
for i in 0 ..< data[].len:
copyMem(blockData[i], addr data[i][0], blockSize)
defer:
freeDoubleArray(blockData, blocksLen)
## Create an encode task with block data
var task = EncodeTask(
erasure: addr self,
blockSize: blockSize,
blocksLen: blocksLen,
parityLen: parityLen,
blocks: blockData,
parity: parity,
signal: threadPtr,
)
let t = addr task
doAssert self.taskPool.numThreads > 1,
"Must have at least one separate thread or signal will never be fired"
self.taskPool.spawn leopardEncodeTask(self.taskPool, t)
let threadFut = threadPtr.wait()
try:
await threadFut.join()
except CatchableError as exc:
try:
await threadFut
except AsyncError as asyncExc:
return failure(asyncExc.msg)
finally:
if exc of CancelledError:
raise (ref CancelledError) exc
else:
return failure(exc.msg)
if not t.success.load():
return failure("Leopard encoding failed")
success()
proc encodeData(
self: Erasure, manifest: Manifest, params: EncodingParams
): Future[?!Manifest] {.async.} =
@ -276,7 +373,6 @@ proc encodeData(
##
## `manifest` - the manifest to encode
##
logScope:
steps = params.steps
rounded_blocks = params.rounded
@ -286,7 +382,6 @@ proc encodeData(
var
cids = seq[Cid].new()
encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM)
emptyBlock = newSeq[byte](manifest.blockSize.int)
cids[].setLen(params.blocksCount)
@ -296,8 +391,7 @@ proc encodeData(
# TODO: Don't allocate a new seq every time, allocate once and zero out
var
data = seq[seq[byte]].new() # number of blocks to encode
parityData =
newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int))
parity = createDoubleArray(params.ecM, manifest.blockSize.int)
data[].setLen(params.ecK)
# TODO: this is a tight blocking loop so we sleep here to allow
@ -311,15 +405,25 @@ proc encodeData(
trace "Unable to prepare data", error = err.msg
return failure(err)
trace "Erasure coding data", data = data[].len, parity = parityData.len
trace "Erasure coding data", data = data[].len
if (let res = encoder.encode(data[], parityData); res.isErr):
trace "Unable to encode manifest!", error = $res.error
return failure($res.error)
try:
if err =? (
await self.encodeAsync(
manifest.blockSize.int, params.ecK, params.ecM, data, parity
)
).errorOption:
return failure(err)
except CancelledError as exc:
raise exc
finally:
freeDoubleArray(parity, params.ecM)
var idx = params.rounded + step
for j in 0 ..< params.ecM:
without blk =? bt.Block.new(parityData[j]), error:
var innerPtr: ptr UncheckedArray[byte] = parity[][j]
without blk =? bt.Block.new(innerPtr.toOpenArray(0, manifest.blockSize.int - 1)),
error:
trace "Unable to create parity block", err = error.msg
return failure(error)
@ -356,8 +460,6 @@ proc encodeData(
except CatchableError as exc:
trace "Erasure coding encoding error", exc = exc.msg
return failure(exc)
finally:
encoder.release()
proc encode*(
self: Erasure,
@ -381,6 +483,101 @@ proc encode*(
return success encodedManifest
proc leopardDecodeTask(tp: Taskpool, task: ptr DecodeTask) {.gcsafe.} =
# Task suitable for running in taskpools - look, no GC!
let decoder =
task[].erasure.decoderProvider(task[].blockSize, task[].blocksLen, task[].parityLen)
defer:
decoder.release()
if (
let res = decoder.decode(
task[].blocks,
task[].parity,
task[].recovered,
task[].blocksLen,
task[].parityLen,
task[].recoveredLen,
)
res.isErr
):
warn "Error from leopard decoder backend!", error = $res.error
task[].success.store(false)
else:
task[].success.store(true)
discard task[].signal.fireSync()
proc decodeAsync*(
self: Erasure,
blockSize, blocksLen, parityLen: int,
blocks, parity: ref seq[seq[byte]],
recovered: ptr UncheckedArray[ptr UncheckedArray[byte]],
): Future[?!void] {.async: (raises: [CancelledError]).} =
without threadPtr =? ThreadSignalPtr.new():
return failure("Unable to create thread signal")
defer:
threadPtr.close().expect("closing once works")
var
blocksData = createDoubleArray(blocksLen, blockSize)
parityData = createDoubleArray(parityLen, blockSize)
for i in 0 ..< blocks[].len:
if blocks[i].len > 0:
copyMem(blocksData[i], addr blocks[i][0], blockSize)
else:
blocksData[i] = nil
for i in 0 ..< parity[].len:
if parity[i].len > 0:
copyMem(parityData[i], addr parity[i][0], blockSize)
else:
parityData[i] = nil
defer:
freeDoubleArray(blocksData, blocksLen)
freeDoubleArray(parityData, parityLen)
## Create a decode task with block data
var task = DecodeTask(
erasure: addr self,
blockSize: blockSize,
blocksLen: blocksLen,
parityLen: parityLen,
recoveredLen: blocksLen,
blocks: blocksData,
parity: parityData,
recovered: recovered,
signal: threadPtr,
)
# Hold the task pointer until the signal is received
let t = addr task
doAssert self.taskPool.numThreads > 1,
"Must have at least one separate thread or signal will never be fired"
self.taskPool.spawn leopardDecodeTask(self.taskPool, t)
let threadFut = threadPtr.wait()
try:
await threadFut.join()
except CatchableError as exc:
try:
await threadFut
except AsyncError as asyncExc:
return failure(asyncExc.msg)
finally:
if exc of CancelledError:
raise (ref CancelledError) exc
else:
return failure(exc.msg)
if not t.success.load():
return failure("Leopard encoding failed")
success()
proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
## Decode a protected manifest into its original
## manifest
@ -388,7 +585,6 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
## `encoded` - the encoded (protected) manifest to
## be recovered
##
logScope:
steps = encoded.steps
rounded_blocks = encoded.rounded
@ -411,8 +607,7 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
var
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered =
newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
recovered = createDoubleArray(encoded.ecK, encoded.blockSize.int)
data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M
@ -430,15 +625,26 @@ proc decode*(self: Erasure, encoded: Manifest): Future[?!Manifest] {.async.} =
continue
trace "Erasure decoding data"
if (let err = decoder.decode(data[], parityData[], recovered); err.isErr):
trace "Unable to decode data!", err = $err.error
return failure($err.error)
try:
if err =? (
await self.decodeAsync(
encoded.blockSize.int, encoded.ecK, encoded.ecM, data, parityData, recovered
)
).errorOption:
return failure(err)
except CancelledError as exc:
raise exc
finally:
freeDoubleArray(recovered, encoded.ecK)
for i in 0 ..< encoded.ecK:
let idx = i * encoded.steps + step
if data[i].len <= 0 and not cids[idx].isEmpty:
without blk =? bt.Block.new(recovered[i]), error:
var innerPtr: ptr UncheckedArray[byte] = recovered[][i]
without blk =? bt.Block.new(
innerPtr.toOpenArray(0, encoded.blockSize.int - 1)
), error:
trace "Unable to create block!", exc = error.msg
return failure(error)
@ -490,10 +696,13 @@ proc new*(
store: BlockStore,
encoderProvider: EncoderProvider,
decoderProvider: DecoderProvider,
taskPool: Taskpool,
): Erasure =
## Create a new Erasure instance for encoding and decoding manifests
##
Erasure(
store: store, encoderProvider: encoderProvider, decoderProvider: decoderProvider
store: store,
encoderProvider: encoderProvider,
decoderProvider: decoderProvider,
taskPool: taskPool,
)
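
To illustrate the new constructor, here is a minimal sketch (mirroring the test setup further below) of wiring a shared Taskpool into an Erasure instance; the pool needs at least two threads, since encodeAsync/decodeAsync assert that a separate worker thread is available:

    let
      taskpool = Taskpool.new(numThreads = 4)  # any value >= 2
      erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
    # ... call erasure.encode(...) / erasure.decode(...) as before ...
    taskpool.shutdown()  # stop the worker threads when done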


@ -15,6 +15,7 @@ import std/strformat
import std/sugar
import times
import pkg/taskpools
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
@ -70,6 +71,7 @@ type
contracts*: Contracts
clock*: Clock
storage*: Contracts
taskpool: Taskpool
CodexNodeRef* = ref CodexNode
@ -235,8 +237,9 @@ proc streamEntireDataset(
# Retrieve, decode and save to the local store all EС groups
proc erasureJob(): Future[?!void] {.async.} =
# Spawn an erasure decoding job
let erasure =
Erasure.new(self.networkStore, leoEncoderProvider, leoDecoderProvider)
let erasure = Erasure.new(
self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
)
without _ =? (await erasure.decode(manifest)), error:
error "Unable to erasure decode manifest", manifestCid, exc = error.msg
return failure(error)
@ -461,8 +464,9 @@ proc setupRequest(
return failure error
# Erasure code the dataset according to provided parameters
let erasure =
Erasure.new(self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider)
let erasure = Erasure.new(
self.networkStore.localStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
)
without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
trace "Unable to erasure code dataset"
@ -782,12 +786,16 @@ proc stop*(self: CodexNodeRef) {.async.} =
if not self.networkStore.isNil:
await self.networkStore.close
if not self.taskpool.isNil:
self.taskpool.shutdown()
proc new*(
T: type CodexNodeRef,
switch: Switch,
networkStore: NetworkStore,
engine: BlockExcEngine,
discovery: Discovery,
taskpool: Taskpool,
prover = Prover.none,
contracts = Contracts.default,
): CodexNodeRef =
@ -800,5 +808,6 @@ proc new*(
engine: engine,
prover: prover,
discovery: discovery,
taskPool: taskpool,
contracts: contracts,
)


@ -0,0 +1,25 @@
import std/sequtils
proc createDoubleArray*(
outerLen, innerLen: int
): ptr UncheckedArray[ptr UncheckedArray[byte]] =
# Allocate outer array
result = cast[ptr UncheckedArray[ptr UncheckedArray[byte]]](allocShared0(
sizeof(ptr UncheckedArray[byte]) * outerLen
))
# Allocate each inner array
for i in 0 ..< outerLen:
result[i] = cast[ptr UncheckedArray[byte]](allocShared0(sizeof(byte) * innerLen))
proc freeDoubleArray*(
arr: ptr UncheckedArray[ptr UncheckedArray[byte]], outerLen: int
) =
# Free each inner array
for i in 0 ..< outerLen:
if not arr[i].isNil:
deallocShared(arr[i])
# Free outer array
if not arr.isNil:
deallocShared(arr)
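
A short usage sketch of these helpers (sizes are illustrative), following the same copyMem pattern encodeAsync/decodeAsync use above:

    let buffers = createDoubleArray(4, 256)   # four shared inner buffers of 256 bytes each
    var src = newSeq[byte](256)
    copyMem(buffers[0], addr src[0], src.len) # copy a Nim seq into the first inner buffer
    freeDoubleArray(buffers, 4)               # free each inner buffer, then the outer array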


@ -6,6 +6,7 @@ import pkg/chronos
import pkg/codex/codextypes
import pkg/codex/chunker
import pkg/codex/stores
import pkg/taskpools
import ../../asynctest
@ -118,6 +119,7 @@ template setupAndTearDown*() {.dirty.} =
engine = engine,
prover = Prover.none,
discovery = blockDiscovery,
taskpool = Taskpool.new(),
)
teardown:


@ -75,7 +75,7 @@ asyncchecksuite "Test Node - Host contracts":
let
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new)
manifestCid = manifestBlock.cid
manifestCidStr = $(manifestCid)


@ -12,6 +12,7 @@ import pkg/questionable/results
import pkg/stint
import pkg/poseidon2
import pkg/poseidon2/io
import pkg/taskpools
import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5
@ -67,7 +68,7 @@ asyncchecksuite "Test Node - Basic":
# https://github.com/codex-storage/nim-codex/issues/699
let
cstore = CountingStore.new(engine, localStore)
node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery)
node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery, Taskpool.new())
missingCid =
Cid.init("zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get()
@ -138,7 +139,8 @@ asyncchecksuite "Test Node - Basic":
test "Setup purchase request":
let
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
erasure =
Erasure.new(store, leoEncoderProvider, leoDecoderProvider, Taskpool.new())
manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock =
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()


@ -1,5 +1,6 @@
import std/sequtils
import std/sugar
import std/times
import pkg/chronos
import pkg/questionable/results
@ -11,6 +12,8 @@ import pkg/codex/blocktype as bt
import pkg/codex/rng
import pkg/codex/utils
import pkg/codex/indexingstrategy
import pkg/taskpools
import pkg/codex/utils/arrayutils
import ../asynctest
import ./helpers
@ -27,6 +30,7 @@ suite "Erasure encode/decode":
var erasure: Erasure
let repoTmp = TempLevelDb.new()
let metaTmp = TempLevelDb.new()
var taskpool: Taskpool
setup:
let
@ -35,12 +39,14 @@ suite "Erasure encode/decode":
rng = Rng.instance()
chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
store = RepoStore.new(repoDs, metaDs)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
taskpool = Taskpool.new()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
manifest = await storeDataGetManifest(store, chunker)
teardown:
await repoTmp.destroyDb()
await metaTmp.destroyDb()
taskpool.shutdown()
proc encode(buffers, parity: int): Future[Manifest] {.async.} =
let encoded =
@ -212,7 +218,7 @@ suite "Erasure encode/decode":
let present = await store.hasBlock(manifest.treeCid, d)
check present.tryGet()
test "handles edge case of 0 parity blocks":
test "Handles edge case of 0 parity blocks":
const
buffers = 20
parity = 0
@ -221,6 +227,43 @@ suite "Erasure encode/decode":
discard (await erasure.decode(encoded)).tryGet()
test "Should concurrently encode/decode multiple datasets":
const iterations = 2
let
datasetSize = 1.MiBs
ecK = 10.Natural
ecM = 10.Natural
var encodeTasks = newSeq[Future[?!Manifest]]()
var decodeTasks = newSeq[Future[?!Manifest]]()
var manifests = newSeq[Manifest]()
for i in 0 ..< iterations:
let
# create random data and store it
blockSize = rng.sample(@[1, 2, 4, 8, 16, 32, 64].mapIt(it.KiBs))
chunker = RandomChunker.new(rng, size = datasetSize, chunkSize = blockSize)
manifest = await storeDataGetManifest(store, chunker)
manifests.add(manifest)
# encode the data concurrently
encodeTasks.add(erasure.encode(manifest, ecK, ecM))
# wait for all encoding tasks to finish
let encodeResults = await allFinished(encodeTasks)
# decode the data concurrently
for i in 0 ..< encodeResults.len:
decodeTasks.add(erasure.decode(encodeResults[i].read().tryGet()))
# wait for all decoding tasks to finish
let decodeResults = await allFinished(decodeTasks) # TODO: use allFutures
for j in 0 ..< decodeTasks.len:
let
decoded = decodeResults[j].read().tryGet()
encoded = encodeResults[j].read().tryGet()
check:
decoded.treeCid == manifests[j].treeCid
decoded.treeCid == encoded.originalTreeCid
decoded.blocksCount == encoded.originalBlocksCount
test "Should handle verifiable manifests":
const
buffers = 20
@ -259,3 +302,73 @@ suite "Erasure encode/decode":
decoded.treeCid == manifest.treeCid
decoded.treeCid == encoded.originalTreeCid
decoded.blocksCount == encoded.originalBlocksCount
test "Should complete encode/decode task when cancelled":
let
blocksLen = 10000
parityLen = 10
data = seq[seq[byte]].new()
chunker = RandomChunker.new(
rng, size = (blocksLen * BlockSize.int), chunkSize = BlockSize
)
data[].setLen(blocksLen)
for i in 0 ..< blocksLen:
let chunk = await chunker.getBytes()
shallowCopy(data[i], @(chunk))
let
parity = createDoubleArray(parityLen, BlockSize.int)
paritySeq = seq[seq[byte]].new()
recovered = createDoubleArray(blocksLen, BlockSize.int)
cancelledTaskParity = createDoubleArray(parityLen, BlockSize.int)
cancelledTaskRecovered = createDoubleArray(blocksLen, BlockSize.int)
paritySeq[].setLen(parityLen)
defer:
freeDoubleArray(parity, parityLen)
freeDoubleArray(cancelledTaskParity, parityLen)
freeDoubleArray(recovered, blocksLen)
freeDoubleArray(cancelledTaskRecovered, blocksLen)
for i in 0 ..< parityLen:
paritySeq[i] = cast[seq[byte]](parity[i])
# call encodeAsync to get the parity
let encFut =
await erasure.encodeAsync(BlockSize.int, blocksLen, parityLen, data, parity)
check encFut.isOk
let decFut = await erasure.decodeAsync(
BlockSize.int, blocksLen, parityLen, data, paritySeq, recovered
)
check decFut.isOk
# call encodeAsync and cancel the task
let encodeFut = erasure.encodeAsync(
BlockSize.int, blocksLen, parityLen, data, cancelledTaskParity
)
encodeFut.cancel()
try:
discard await encodeFut
except CatchableError as exc:
check exc of CancelledError
finally:
for i in 0 ..< parityLen:
check equalMem(parity[i], cancelledTaskParity[i], BlockSize.int)
# call decodeAsync and cancel the task
let decodeFut = erasure.decodeAsync(
BlockSize.int, blocksLen, parityLen, data, paritySeq, cancelledTaskRecovered
)
decodeFut.cancel()
try:
discard await decodeFut
except CatchableError as exc:
check exc of CancelledError
finally:
for i in 0 ..< blocksLen:
check equalMem(recovered[i], cancelledTaskRecovered[i], BlockSize.int)

vendor/nim-leopard (vendored submodule)

@ -1 +1 @@
Subproject commit 3e09d8113f874f3584c3fe93818541b2ff9fb9c3
Subproject commit 7506b90f9c650c02b96bf525d4fd1bd4942a495f