Scheduling erasure coding on another thread (#716)

* Scheduling erasure coding on another thread

* Code review fixes

* Fix for review comments

* Fix missing import

---------

Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
Tomasz Bekas 2024-03-23 10:56:35 +01:00 committed by GitHub
parent de1714ed06
commit 59d9439ae9
8 changed files with 273 additions and 39 deletions
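
The change moves the erasure encode/decode work off the chronos event-loop thread and onto a shared Taskpool: the CPU-bound task is spawned on a worker thread, fires a chronos ThreadSignalPtr when it has produced a result, and the async side awaits that signal instead of sleeping in a tight loop as the removed code did. Below is a minimal sketch of that pattern, assuming the same pkg/taskpools, pkg/taskpools/flowvars and pkg/chronos/threadsync APIs used in the new asyncbackend module further down; cpuWork, spawnWork and offload are hypothetical names, not code from this commit.

import pkg/chronos
import pkg/chronos/threadsync
import pkg/taskpools
import pkg/taskpools/flowvars
import pkg/questionable/results

proc cpuWork(signal: ThreadSignalPtr, n: int): int =
  ## Hypothetical CPU-bound work; runs on a Taskpool worker thread.
  result = n * n
  # Wake the event loop: the async side is parked in `await wait(signal)`.
  discard signal.fireSync()

proc spawnWork(tp: Taskpool, signal: ThreadSignalPtr, n: int): Flowvar[int] =
  # Spawn from a plain proc (as proxySpawnEncodeTask does below), keeping the
  # spawn macro out of the async transformation.
  tp.spawn cpuWork(signal, n)

proc offload(tp: Taskpool, n: int): Future[?!int] {.async.} =
  ## Run cpuWork off-thread without blocking other chronos futures.
  let signalRes = ThreadSignalPtr.new()
  if signalRes.isErr:
    return failure($signalRes.error)
  let signal = signalRes.get()
  try:
    let handle = spawnWork(tp, signal, n)
    await wait(signal)               # other futures keep running while we wait
    var res: int
    while not handle.tryComplete(res):
      await sleepAsync(10.millis)    # the Flowvar may complete just after the signal
    return success(res)
  finally:
    discard signal.close()

In the diffs that follow, the Taskpool is created once when the Codex server is constructed (one thread per logical core via countProcessors), passed down through CodexNodeRef and Erasure, and drained with syncAll and shutdown when the server stops. The worker hands its output back through a manually allocated shared buffer (SharedArrayHolder) rather than a GC-managed seq; the async side copies the buffer back into seqs and then frees it.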

View File

@ -11,6 +11,7 @@ import std/sequtils
import std/strutils
import std/os
import std/tables
import std/cpuinfo
import pkg/chronos
import pkg/presto
@ -23,6 +24,7 @@ import pkg/stew/shims/net as stewnet
import pkg/datastore
import pkg/ethers except Rng
import pkg/stew/io2
import pkg/taskpools
import ./node
import ./conf
@ -53,6 +55,7 @@ type
codexNode: CodexNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
taskpool: Taskpool
CodexPrivateKey* = libp2p.PrivateKey # alias
EthWallet = ethers.Wallet
@ -180,6 +183,10 @@ proc start*(s: CodexServer) {.async.} =
proc stop*(s: CodexServer) {.async.} =
notice "Stopping codex node"
s.taskpool.syncAll()
s.taskpool.shutdown()
await allFuturesThrowing(
s.restServer.stop(),
s.codexNode.switch.stop(),
@ -290,12 +297,15 @@ proc new*(
else:
none Prover
taskpool = Taskpool.new(num_threads = countProcessors())
codexNode = CodexNodeRef.new(
switch = switch,
networkStore = store,
engine = engine,
prover = prover,
discovery = discovery)
discovery = discovery,
taskpool = taskpool)
restServer = RestServerRef.new(
codexNode.initRestApi(config, repoStore),
@ -311,4 +321,5 @@ proc new*(
codexNode: codexNode,
restServer: restServer,
repoStore: repoStore,
maintenance: maintenance)
maintenance: maintenance,
taskpool: taskpool)

View File

@ -0,0 +1,211 @@
## Nim-Codex
## Copyright (c) 2024 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/taskpools
import pkg/taskpools/flowvars
import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable/results
import ./backend
import ../errors
import ../logutils
logScope:
topics = "codex asyncerasure"
const
  CompletitionTimeout = 1.seconds # Maximum await time for completion after receiving a signal
CompletitionRetryDelay = 10.millis
type
EncoderBackendPtr = ptr EncoderBackend
DecoderBackendPtr = ptr DecoderBackend
  # The seq[seq[byte]] payloads are passed as task arguments rather than stored in the Args objects, to avoid an unnecessary copy
EncodeTaskArgs = object
signal: ThreadSignalPtr
backend: EncoderBackendPtr
blockSize: int
ecM: int
DecodeTaskArgs = object
signal: ThreadSignalPtr
backend: DecoderBackendPtr
blockSize: int
ecK: int
SharedArrayHolder*[T] = object
data: ptr UncheckedArray[T]
size: int
EncodeTaskResult = Result[SharedArrayHolder[byte], cstring]
DecodeTaskResult = Result[SharedArrayHolder[byte], cstring]
proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult =
var
data = data.unsafeAddr
parity = newSeqWith[seq[byte]](args.ecM, newSeq[byte](args.blockSize))
try:
let res = args.backend[].encode(data[], parity)
if res.isOk:
let
resDataSize = parity.len * args.blockSize
resData = cast[ptr UncheckedArray[byte]](allocShared0(resDataSize))
arrHolder = SharedArrayHolder[byte](
data: resData,
size: resDataSize
)
for i in 0..<parity.len:
copyMem(addr resData[i * args.blockSize], addr parity[i][0], args.blockSize)
return ok(arrHolder)
else:
return err(res.error)
except CatchableError as exception:
return err(exception.msg.cstring)
finally:
if err =? args.signal.fireSync().mapFailure.errorOption():
error "Error firing signal", msg = err.msg
proc decodeTask(args: DecodeTaskArgs, data: seq[seq[byte]], parity: seq[seq[byte]]): DecodeTaskResult =
var
data = data.unsafeAddr
parity = parity.unsafeAddr
recovered = newSeqWith[seq[byte]](args.ecK, newSeq[byte](args.blockSize))
try:
let res = args.backend[].decode(data[], parity[], recovered)
if res.isOk:
let
resDataSize = recovered.len * args.blockSize
resData = cast[ptr UncheckedArray[byte]](allocShared0(resDataSize))
arrHolder = SharedArrayHolder[byte](
data: resData,
size: resDataSize
)
for i in 0..<recovered.len:
copyMem(addr resData[i * args.blockSize], addr recovered[i][0], args.blockSize)
return ok(arrHolder)
else:
return err(res.error)
except CatchableError as exception:
return err(exception.msg.cstring)
finally:
if err =? args.signal.fireSync().mapFailure.errorOption():
error "Error firing signal", msg = err.msg
proc proxySpawnEncodeTask(
tp: Taskpool,
args: EncodeTaskArgs,
data: ref seq[seq[byte]]
): Flowvar[EncodeTaskResult] =
tp.spawn encodeTask(args, data[])
proc proxySpawnDecodeTask(
tp: Taskpool,
args: DecodeTaskArgs,
data: ref seq[seq[byte]],
parity: ref seq[seq[byte]]
): Flowvar[DecodeTaskResult] =
tp.spawn decodeTask(args, data[], parity[])
proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} =
await wait(signal)
var
res: T
awaitTotal: Duration
while awaitTotal < CompletitionTimeout:
if handle.tryComplete(res):
return success(res)
else:
awaitTotal += CompletitionRetryDelay
await sleepAsync(CompletitionRetryDelay)
  return failure("Task signaled finish but didn't return any result within " & $CompletitionTimeout)
proc asyncEncode*(
tp: Taskpool,
backend: EncoderBackend,
data: ref seq[seq[byte]],
blockSize: int,
ecM: int
): Future[?!ref seq[seq[byte]]] {.async.} =
without signal =? ThreadSignalPtr.new().mapFailure, err:
return failure(err)
try:
let
blockSize = data[0].len
args = EncodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecM: ecM)
handle = proxySpawnEncodeTask(tp, args, data)
without res =? await awaitResult(signal, handle), err:
return failure(err)
if res.isOk:
var parity = seq[seq[byte]].new()
parity[].setLen(ecM)
for i in 0..<parity[].len:
parity[i] = newSeq[byte](blockSize)
copyMem(addr parity[i][0], addr res.value.data[i * blockSize], blockSize)
deallocShared(res.value.data)
return success(parity)
else:
return failure($res.error)
finally:
if err =? signal.close().mapFailure.errorOption():
error "Error closing signal", msg = $err.msg
proc asyncDecode*(
tp: Taskpool,
backend: DecoderBackend,
data, parity: ref seq[seq[byte]],
blockSize: int
): Future[?!ref seq[seq[byte]]] {.async.} =
without signal =? ThreadSignalPtr.new().mapFailure, err:
return failure(err)
try:
let
ecK = data[].len
args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK)
handle = proxySpawnDecodeTask(tp, args, data, parity)
without res =? await awaitResult(signal, handle), err:
return failure(err)
if res.isOk:
var recovered = seq[seq[byte]].new()
recovered[].setLen(ecK)
for i in 0..<recovered[].len:
recovered[i] = newSeq[byte](blockSize)
copyMem(addr recovered[i][0], addr res.value.data[i * blockSize], blockSize)
deallocShared(res.value.data)
return success(recovered)
else:
return failure($res.error)
finally:
if err =? signal.close().mapFailure.errorOption():
error "Error closing signal", msg = $err.msg
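
For context, here is a sketch of how the asyncEncode call above is meant to be driven from async code. The encodeStep proc and the codex module paths in the imports are assumptions inferred from the diff layout; the real call site is in the Erasure module hunks further below. The caller hands the ecK data blocks over as a ref seq[seq[byte]] and receives the ecM parity blocks back as a fresh ref seq[seq[byte]] once the worker's shared buffer has been copied out and freed.

import pkg/chronos
import pkg/taskpools
import pkg/questionable/results
import pkg/codex/erasure/backend       # EncoderBackend (module path assumed)
import pkg/codex/erasure/asyncbackend  # asyncEncode, added in this commit (module path assumed)

proc encodeStep(
  tp: Taskpool,
  encoder: EncoderBackend,        # e.g. built by an EncoderProvider such as leoEncoderProvider
  data: ref seq[seq[byte]],       # ecK data blocks, each blockSize bytes
  blockSize, ecM: int
): Future[?!ref seq[seq[byte]]] {.async.} =
  ## Offload one encoding step and return the ecM parity blocks.
  without parity =? await asyncEncode(tp, encoder, data, blockSize, ecM), err:
    return failure(err)
  doAssert parity[].len == ecM    # parity now lives on the GC heap again
  return success(parity)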

View File

@ -17,6 +17,7 @@ import std/sugar
import pkg/chronos
import pkg/libp2p/[multicodec, cid, multihash]
import pkg/libp2p/protobuf/minprotobuf
import pkg/taskpools
import ../logutils
import ../manifest
@ -30,6 +31,7 @@ import ../indexingstrategy
import pkg/stew/byteutils
import ./backend
import ./asyncbackend
export backend
@ -70,6 +72,7 @@ type
encoderProvider*: EncoderProvider
decoderProvider*: DecoderProvider
store*: BlockStore
taskpool: Taskpool
EncodingParams = object
ecK: Natural
@ -282,30 +285,23 @@ proc encodeData(
# TODO: Don't allocate a new seq every time, allocate once and zero out
var
data = seq[seq[byte]].new() # number of blocks to encode
parityData = newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int))
data[].setLen(params.ecK)
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
without resolved =?
(await self.prepareEncodingData(manifest, params, step, data, cids, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg
return failure(err)
trace "Erasure coding data", data = data[].len, parity = parityData.len
trace "Erasure coding data", data = data[].len, parity = params.ecM
if (
let res = encoder.encode(data[], parityData);
res.isErr):
trace "Unable to encode manifest!", error = $res.error
return failure($res.error)
without parity =? await asyncEncode(self.taskpool, encoder, data, manifest.blockSize.int, params.ecM), err:
trace "Error encoding data", err = err.msg
return failure(err)
var idx = params.rounded + step
for j in 0..<params.ecM:
without blk =? bt.Block.new(parityData[j]), error:
without blk =? bt.Block.new(parity[j]), error:
trace "Unable to create parity block", err = error.msg
return failure(error)
@ -390,21 +386,15 @@ proc decode*(
cids[].setLen(encoded.blocksCount)
try:
for step in 0..<encoded.steps:
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
var
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
parity = seq[seq[byte]].new()
data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M
data[].setLen(encoded.ecK) # set len to K
parity[].setLen(encoded.ecM) # set len to M
without (dataPieces, _) =?
(await self.prepareDecodingData(encoded, step, data, parityData, cids, emptyBlock)), err:
(await self.prepareDecodingData(encoded, step, data, parity, cids, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg
return failure(err)
@ -413,11 +403,10 @@ proc decode*(
continue
trace "Erasure decoding data"
if (
let err = decoder.decode(data[], parityData[], recovered);
err.isErr):
trace "Unable to decode data!", err = $err.error
return failure($err.error)
without recovered =? await asyncDecode(self.taskpool, decoder, data, parity, encoded.blockSize.int), err:
trace "Error decoding data", err = err.msg
return failure(err)
for i in 0..<encoded.ecK:
let idx = i * encoded.steps + step
@ -472,10 +461,13 @@ proc new*(
T: type Erasure,
store: BlockStore,
encoderProvider: EncoderProvider,
decoderProvider: DecoderProvider): Erasure =
decoderProvider: DecoderProvider,
taskpool: Taskpool): Erasure =
## Create a new Erasure instance for encoding and decoding manifests
##
Erasure(
store: store,
encoderProvider: encoderProvider,
decoderProvider: decoderProvider)
decoderProvider: decoderProvider,
taskpool: taskpool)

View File

@ -13,6 +13,7 @@ import std/options
import std/sequtils
import std/strformat
import std/sugar
import std/cpuinfo
import pkg/questionable
import pkg/questionable/results
@ -25,6 +26,7 @@ import pkg/libp2p/stream/bufferstream
# TODO: remove once exported by libp2p
import pkg/libp2p/routing_record
import pkg/libp2p/signed_envelope
import pkg/taskpools
import ./chunker
import ./slots
@ -69,6 +71,7 @@ type
contracts*: Contracts
clock*: Clock
storage*: Contracts
taskpool*: Taskpool
CodexNodeRef* = ref CodexNode
@ -253,7 +256,8 @@ proc streamEntireDataset(
erasure = Erasure.new(
self.networkStore,
leoEncoderProvider,
leoDecoderProvider)
leoDecoderProvider,
self.taskpool)
without _ =? (await erasure.decode(manifest)), error:
trace "Unable to erasure decode manifest", manifestCid, exc = error.msg
except CatchableError as exc:
@ -420,7 +424,8 @@ proc setupRequest(
erasure = Erasure.new(
self.networkStore.localStore,
leoEncoderProvider,
leoDecoderProvider)
leoDecoderProvider,
self.taskpool)
without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
trace "Unable to erasure code dataset"
@ -748,7 +753,8 @@ proc new*(
engine: BlockExcEngine,
discovery: Discovery,
prover = Prover.none,
contracts = Contracts.default): CodexNodeRef =
contracts = Contracts.default,
taskpool = Taskpool.new(num_threads = countProcessors())): CodexNodeRef =
  ## Create a new instance of a Codex node; call `start` to run it
##
@ -758,4 +764,5 @@ proc new*(
engine: engine,
prover: prover,
discovery: discovery,
contracts: contracts)
contracts: contracts,
taskpool: taskpool)

View File

@ -1,8 +1,10 @@
import std/tables
import std/times
import std/cpuinfo
import pkg/libp2p
import pkg/chronos
import pkg/taskpools
import pkg/codex/codextypes
import pkg/codex/chunker
@ -81,6 +83,7 @@ template setupAndTearDown*() {.dirty.} =
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine
taskpool: Taskpool
let
path = currentSourcePath().parentDir
@ -107,12 +110,14 @@ template setupAndTearDown*() {.dirty.} =
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore)
taskpool = Taskpool.new(num_threads = countProcessors())
node = CodexNodeRef.new(
switch = switch,
networkStore = store,
engine = engine,
prover = Prover.none,
discovery = blockDiscovery)
discovery = blockDiscovery,
taskpool = taskpool)
await node.start()

View File

@ -4,6 +4,7 @@ import std/math
import std/times
import std/sequtils
import std/importutils
import std/cpuinfo
import pkg/chronos
import pkg/stew/byteutils
@ -13,6 +14,7 @@ import pkg/questionable/results
import pkg/stint
import pkg/poseidon2
import pkg/poseidon2/io
import pkg/taskpools
import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5
@ -78,7 +80,7 @@ asyncchecksuite "Test Node - Host contracts":
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
manifestCid = manifestBlock.cid
manifestCidStr = $(manifestCid)

View File

@ -4,6 +4,7 @@ import std/math
import std/times
import std/sequtils
import std/importutils
import std/cpuinfo
import pkg/chronos
import pkg/stew/byteutils
@ -13,6 +14,7 @@ import pkg/questionable/results
import pkg/stint
import pkg/poseidon2
import pkg/poseidon2/io
import pkg/taskpools
import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5
@ -135,7 +137,7 @@ asyncchecksuite "Test Node - Basic":
test "Setup purchase request":
let
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),

View File

@ -1,5 +1,6 @@
import std/sequtils
import std/sugar
import std/cpuinfo
import pkg/chronos
import pkg/datastore
@ -12,6 +13,7 @@ import pkg/codex/blocktype as bt
import pkg/codex/rng
import pkg/codex/utils
import pkg/codex/indexingstrategy
import pkg/taskpools
import ../asynctest
import ./helpers
@ -25,6 +27,7 @@ suite "Erasure encode/decode":
var manifest: Manifest
var store: BlockStore
var erasure: Erasure
var taskpool: Taskpool
setup:
let
@ -33,7 +36,8 @@ suite "Erasure encode/decode":
rng = Rng.instance()
chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
store = RepoStore.new(repoDs, metaDs)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
taskpool = Taskpool.new(num_threads = countProcessors())
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
manifest = await storeDataGetManifest(store, chunker)
proc encode(buffers, parity: int): Future[Manifest] {.async.} =