diff --git a/codex/codex.nim b/codex/codex.nim index eb2b26e4..2e319b9e 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -11,6 +11,7 @@ import std/sequtils import std/strutils import std/os import std/tables +import std/cpuinfo import pkg/chronos import pkg/presto @@ -24,6 +25,7 @@ import pkg/datastore import pkg/ethers except Rng import pkg/stew/io2 import pkg/questionable +import pkg/taskpools import ./node import ./conf @@ -54,6 +56,7 @@ type codexNode: CodexNodeRef repoStore: RepoStore maintenance: BlockMaintainer + taskpool: Taskpool CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -188,6 +191,10 @@ proc start*(s: CodexServer) {.async.} = proc stop*(s: CodexServer) {.async.} = notice "Stopping codex node" + + s.taskpool.syncAll() + s.taskpool.shutdown() + await allFuturesThrowing( s.restServer.stop(), s.codexNode.switch.stop(), @@ -275,12 +282,15 @@ proc new*( else: none Prover + taskpool = Taskpool.new(num_threads = countProcessors()) + codexNode = CodexNodeRef.new( switch = switch, networkStore = store, engine = engine, prover = prover, - discovery = discovery) + discovery = discovery, + taskpool = taskpool) restServer = RestServerRef.new( codexNode.initRestApi(config, repoStore), @@ -296,4 +306,5 @@ proc new*( codexNode: codexNode, restServer: restServer, repoStore: repoStore, - maintenance: maintenance) + maintenance: maintenance, + taskpool: taskpool) diff --git a/codex/erasure/asyncbackend.nim b/codex/erasure/asyncbackend.nim new file mode 100644 index 00000000..f3bca851 --- /dev/null +++ b/codex/erasure/asyncbackend.nim @@ -0,0 +1,211 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils + +import pkg/taskpools +import pkg/taskpools/flowvars +import pkg/chronos +import pkg/chronos/threadsync +import pkg/questionable/results + +import ./backend +import ../errors +import ../logutils + +logScope: + topics = "codex asyncerasure" + +const + CompletitionTimeout = 1.seconds # Maximum await time for completition after receiving a signal + CompletitionRetryDelay = 10.millis + +type + EncoderBackendPtr = ptr EncoderBackend + DecoderBackendPtr = ptr DecoderBackend + + # Args objects are missing seq[seq[byte]] field, to avoid unnecessary data copy + EncodeTaskArgs = object + signal: ThreadSignalPtr + backend: EncoderBackendPtr + blockSize: int + ecM: int + + DecodeTaskArgs = object + signal: ThreadSignalPtr + backend: DecoderBackendPtr + blockSize: int + ecK: int + + SharedArrayHolder*[T] = object + data: ptr UncheckedArray[T] + size: int + + EncodeTaskResult = Result[SharedArrayHolder[byte], cstring] + DecodeTaskResult = Result[SharedArrayHolder[byte], cstring] + +proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult = + var + data = data.unsafeAddr + parity = newSeqWith[seq[byte]](args.ecM, newSeq[byte](args.blockSize)) + + try: + let res = args.backend[].encode(data[], parity) + + if res.isOk: + let + resDataSize = parity.len * args.blockSize + resData = cast[ptr UncheckedArray[byte]](allocShared0(resDataSize)) + arrHolder = SharedArrayHolder[byte]( + data: resData, + size: resDataSize + ) + + for i in 0..