diff --git a/codex/codex.nim b/codex/codex.nim index 441bdf88..ffdf3b41 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -11,7 +11,6 @@ import std/sequtils import std/strutils import std/os import std/tables -import std/cpuinfo import pkg/chronos import pkg/presto @@ -24,7 +23,6 @@ import pkg/stew/shims/net as stewnet import pkg/datastore import pkg/ethers except Rng import pkg/stew/io2 -import pkg/taskpools import ./node import ./conf @@ -55,7 +53,6 @@ type codexNode: CodexNodeRef repoStore: RepoStore maintenance: BlockMaintainer - taskpool: Taskpool CodexPrivateKey* = libp2p.PrivateKey # alias EthWallet = ethers.Wallet @@ -190,10 +187,6 @@ proc start*(s: CodexServer) {.async.} = proc stop*(s: CodexServer) {.async.} = notice "Stopping codex node" - - s.taskpool.syncAll() - s.taskpool.shutdown() - await allFuturesThrowing( s.restServer.stop(), s.codexNode.switch.stop(), @@ -283,15 +276,12 @@ proc new*( else: none Prover - taskpool = Taskpool.new(num_threads = countProcessors()) - codexNode = CodexNodeRef.new( switch = switch, networkStore = store, engine = engine, - prover = prover, discovery = discovery, - taskpool = taskpool) + prover = prover) restServer = RestServerRef.new( codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin), @@ -306,6 +296,4 @@ proc new*( config: config, codexNode: codexNode, restServer: restServer, - repoStore: repoStore, - maintenance: maintenance, - taskpool: taskpool) + repoStore: repoStore) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 56e3e1cf..d35fc18d 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -17,7 +17,6 @@ import std/sugar import pkg/chronos import pkg/libp2p/[multicodec, cid, multihash] import pkg/libp2p/protobuf/minprotobuf -import pkg/taskpools import ../logutils import ../manifest @@ -32,7 +31,6 @@ import ../errors import pkg/stew/byteutils import ./backend -import ./asyncbackend export backend @@ -73,7 +71,6 @@ type encoderProvider*: EncoderProvider decoderProvider*: DecoderProvider store*: BlockStore - taskpool: Taskpool EncodingParams = object ecK: Natural @@ -295,23 +292,30 @@ proc encodeData( # TODO: Don't allocate a new seq every time, allocate once and zero out var data = seq[seq[byte]].new() # number of blocks to encode + parityData = newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int)) data[].setLen(params.ecK) + # TODO: this is a tight blocking loop so we sleep here to allow + # other events to be processed, this should be addressed + # by threading + await sleepAsync(10.millis) without resolved =? (await self.prepareEncodingData(manifest, params, step, data, cids, emptyBlock)), err: trace "Unable to prepare data", error = err.msg return failure(err) - trace "Erasure coding data", data = data[].len, parity = params.ecM + trace "Erasure coding data", data = data[].len, parity = parityData.len - without parity =? await asyncEncode(self.taskpool, encoder, data, manifest.blockSize.int, params.ecM), err: - trace "Error encoding data", err = err.msg - return failure(err) + if ( + let res = encoder.encode(data[], parityData); + res.isErr): + trace "Unable to encode manifest!", error = $res.error + return failure($res.error) var idx = params.rounded + step for j in 0..