From 59d9439ae9bb0991382baffcfe87063071536be3 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Sat, 23 Mar 2024 10:56:35 +0100 Subject: [PATCH] Scheduling erasure coding on another thread (#716) * Scheduling erasure coding on another thread * Code review fixes * Fix for review comments * Fix missing import --------- Co-authored-by: Dmitriy Ryajov --- codex/codex.nim | 15 +- codex/erasure/asyncbackend.nim | 211 +++++++++++++++++++++++++++++ codex/erasure/erasure.nim | 50 +++---- codex/node.nim | 15 +- tests/codex/node/helpers.nim | 7 +- tests/codex/node/testcontracts.nim | 4 +- tests/codex/node/testnode.nim | 4 +- tests/codex/testerasure.nim | 6 +- 8 files changed, 273 insertions(+), 39 deletions(-) create mode 100644 codex/erasure/asyncbackend.nim diff --git a/codex/codex.nim b/codex/codex.nim index e1ca98fa..48027519 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -11,6 +11,7 @@ import std/sequtils import std/strutils import std/os import std/tables +import std/cpuinfo import pkg/chronos import pkg/presto @@ -23,6 +24,7 @@ import pkg/stew/shims/net as stewnet import pkg/datastore import pkg/ethers except Rng import pkg/stew/io2 +import pkg/taskpools import ./node import ./conf @@ -53,6 +55,7 @@ type codexNode: CodexNodeRef repoStore: RepoStore maintenance: BlockMaintainer + taskpool: Taskpool CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -180,6 +183,10 @@ proc start*(s: CodexServer) {.async.} = proc stop*(s: CodexServer) {.async.} = notice "Stopping codex node" + + s.taskpool.syncAll() + s.taskpool.shutdown() + await allFuturesThrowing( s.restServer.stop(), s.codexNode.switch.stop(), @@ -290,12 +297,15 @@ proc new*( else: none Prover + taskpool = Taskpool.new(num_threads = countProcessors()) + codexNode = CodexNodeRef.new( switch = switch, networkStore = store, engine = engine, prover = prover, - discovery = discovery) + discovery = discovery, + taskpool = taskpool) restServer = RestServerRef.new( codexNode.initRestApi(config, repoStore), @@ -311,4 +321,5 @@ proc new*( codexNode: codexNode, restServer: restServer, repoStore: repoStore, - maintenance: maintenance) + maintenance: maintenance, + taskpool: taskpool) diff --git a/codex/erasure/asyncbackend.nim b/codex/erasure/asyncbackend.nim new file mode 100644 index 00000000..f3bca851 --- /dev/null +++ b/codex/erasure/asyncbackend.nim @@ -0,0 +1,211 @@ +## Nim-Codex +## Copyright (c) 2024 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils + +import pkg/taskpools +import pkg/taskpools/flowvars +import pkg/chronos +import pkg/chronos/threadsync +import pkg/questionable/results + +import ./backend +import ../errors +import ../logutils + +logScope: + topics = "codex asyncerasure" + +const + CompletitionTimeout = 1.seconds # Maximum await time for completition after receiving a signal + CompletitionRetryDelay = 10.millis + +type + EncoderBackendPtr = ptr EncoderBackend + DecoderBackendPtr = ptr DecoderBackend + + # Args objects are missing seq[seq[byte]] field, to avoid unnecessary data copy + EncodeTaskArgs = object + signal: ThreadSignalPtr + backend: EncoderBackendPtr + blockSize: int + ecM: int + + DecodeTaskArgs = object + signal: ThreadSignalPtr + backend: DecoderBackendPtr + blockSize: int + ecK: int + + SharedArrayHolder*[T] = object + data: ptr UncheckedArray[T] + size: int + + EncodeTaskResult = Result[SharedArrayHolder[byte], cstring] + DecodeTaskResult = Result[SharedArrayHolder[byte], cstring] + +proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult = + var + data = data.unsafeAddr + parity = newSeqWith[seq[byte]](args.ecM, newSeq[byte](args.blockSize)) + + try: + let res = args.backend[].encode(data[], parity) + + if res.isOk: + let + resDataSize = parity.len * args.blockSize + resData = cast[ptr UncheckedArray[byte]](allocShared0(resDataSize)) + arrHolder = SharedArrayHolder[byte]( + data: resData, + size: resDataSize + ) + + for i in 0..