Revert erasure coding attempt to fix bug

This commit is contained in:
Arnaud 2024-12-18 16:46:45 +01:00
parent 4935494c29
commit a1c8c94ada
No known key found for this signature in database
GPG Key ID: 69D6CE281FCAE663
7 changed files with 39 additions and 61 deletions

View File

@ -11,7 +11,6 @@ import std/sequtils
import std/strutils import std/strutils
import std/os import std/os
import std/tables import std/tables
import std/cpuinfo
import pkg/chronos import pkg/chronos
import pkg/presto import pkg/presto
@ -24,7 +23,6 @@ import pkg/stew/shims/net as stewnet
import pkg/datastore import pkg/datastore
import pkg/ethers except Rng import pkg/ethers except Rng
import pkg/stew/io2 import pkg/stew/io2
import pkg/taskpools
import ./node import ./node
import ./conf import ./conf
@ -55,7 +53,6 @@ type
codexNode: CodexNodeRef codexNode: CodexNodeRef
repoStore: RepoStore repoStore: RepoStore
maintenance: BlockMaintainer maintenance: BlockMaintainer
taskpool: Taskpool
CodexPrivateKey* = libp2p.PrivateKey # alias CodexPrivateKey* = libp2p.PrivateKey # alias
EthWallet = ethers.Wallet EthWallet = ethers.Wallet
@ -190,10 +187,6 @@ proc start*(s: CodexServer) {.async.} =
proc stop*(s: CodexServer) {.async.} = proc stop*(s: CodexServer) {.async.} =
notice "Stopping codex node" notice "Stopping codex node"
s.taskpool.syncAll()
s.taskpool.shutdown()
await allFuturesThrowing( await allFuturesThrowing(
s.restServer.stop(), s.restServer.stop(),
s.codexNode.switch.stop(), s.codexNode.switch.stop(),
@ -283,15 +276,12 @@ proc new*(
else: else:
none Prover none Prover
taskpool = Taskpool.new(num_threads = countProcessors())
codexNode = CodexNodeRef.new( codexNode = CodexNodeRef.new(
switch = switch, switch = switch,
networkStore = store, networkStore = store,
engine = engine, engine = engine,
prover = prover,
discovery = discovery, discovery = discovery,
taskpool = taskpool) prover = prover)
restServer = RestServerRef.new( restServer = RestServerRef.new(
codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin), codexNode.initRestApi(config, repoStore, config.apiCorsAllowedOrigin),
@ -306,6 +296,4 @@ proc new*(
config: config, config: config,
codexNode: codexNode, codexNode: codexNode,
restServer: restServer, restServer: restServer,
repoStore: repoStore, repoStore: repoStore)
maintenance: maintenance,
taskpool: taskpool)

View File

@ -17,7 +17,6 @@ import std/sugar
import pkg/chronos import pkg/chronos
import pkg/libp2p/[multicodec, cid, multihash] import pkg/libp2p/[multicodec, cid, multihash]
import pkg/libp2p/protobuf/minprotobuf import pkg/libp2p/protobuf/minprotobuf
import pkg/taskpools
import ../logutils import ../logutils
import ../manifest import ../manifest
@ -32,7 +31,6 @@ import ../errors
import pkg/stew/byteutils import pkg/stew/byteutils
import ./backend import ./backend
import ./asyncbackend
export backend export backend
@ -73,7 +71,6 @@ type
encoderProvider*: EncoderProvider encoderProvider*: EncoderProvider
decoderProvider*: DecoderProvider decoderProvider*: DecoderProvider
store*: BlockStore store*: BlockStore
taskpool: Taskpool
EncodingParams = object EncodingParams = object
ecK: Natural ecK: Natural
@ -295,23 +292,30 @@ proc encodeData(
# TODO: Don't allocate a new seq every time, allocate once and zero out # TODO: Don't allocate a new seq every time, allocate once and zero out
var var
data = seq[seq[byte]].new() # number of blocks to encode data = seq[seq[byte]].new() # number of blocks to encode
parityData = newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int))
data[].setLen(params.ecK) data[].setLen(params.ecK)
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
without resolved =? without resolved =?
(await self.prepareEncodingData(manifest, params, step, data, cids, emptyBlock)), err: (await self.prepareEncodingData(manifest, params, step, data, cids, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg trace "Unable to prepare data", error = err.msg
return failure(err) return failure(err)
trace "Erasure coding data", data = data[].len, parity = params.ecM trace "Erasure coding data", data = data[].len, parity = parityData.len
without parity =? await asyncEncode(self.taskpool, encoder, data, manifest.blockSize.int, params.ecM), err: if (
trace "Error encoding data", err = err.msg let res = encoder.encode(data[], parityData);
return failure(err) res.isErr):
trace "Unable to encode manifest!", error = $res.error
return failure($res.error)
var idx = params.rounded + step var idx = params.rounded + step
for j in 0..<params.ecM: for j in 0..<params.ecM:
without blk =? bt.Block.new(parity[j]), error: without blk =? bt.Block.new(parityData[j]), error:
trace "Unable to create parity block", err = error.msg trace "Unable to create parity block", err = error.msg
return failure(error) return failure(error)
@ -396,15 +400,21 @@ proc decode*(
cids[].setLen(encoded.blocksCount) cids[].setLen(encoded.blocksCount)
try: try:
for step in 0..<encoded.steps: for step in 0..<encoded.steps:
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
var var
data = seq[seq[byte]].new() data = seq[seq[byte]].new()
parity = seq[seq[byte]].new() parityData = seq[seq[byte]].new()
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
data[].setLen(encoded.ecK) # set len to K data[].setLen(encoded.ecK) # set len to K
parity[].setLen(encoded.ecM) # set len to M parityData[].setLen(encoded.ecM) # set len to M
without (dataPieces, _) =? without (dataPieces, _) =?
(await self.prepareDecodingData(encoded, step, data, parity, cids, emptyBlock)), err: (await self.prepareDecodingData(encoded, step, data, parityData, cids, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg trace "Unable to prepare data", error = err.msg
return failure(err) return failure(err)
@ -414,9 +424,11 @@ proc decode*(
trace "Erasure decoding data" trace "Erasure decoding data"
without recovered =? await asyncDecode(self.taskpool, decoder, data, parity, encoded.blockSize.int), err: if (
trace "Error decoding data", err = err.msg let err = decoder.decode(data[], parityData[], recovered);
return failure(err) err.isErr):
trace "Unable to decode data!", err = $err.error
return failure($err.error)
for i in 0..<encoded.ecK: for i in 0..<encoded.ecK:
let idx = i * encoded.steps + step let idx = i * encoded.steps + step
@ -470,13 +482,11 @@ proc new*(
T: type Erasure, T: type Erasure,
store: BlockStore, store: BlockStore,
encoderProvider: EncoderProvider, encoderProvider: EncoderProvider,
decoderProvider: DecoderProvider, decoderProvider: DecoderProvider): Erasure =
taskpool: Taskpool): Erasure =
## Create a new Erasure instance for encoding and decoding manifests ## Create a new Erasure instance for encoding and decoding manifests
## ##
Erasure( Erasure(
store: store, store: store,
encoderProvider: encoderProvider, encoderProvider: encoderProvider,
decoderProvider: decoderProvider, decoderProvider: decoderProvider)
taskpool: taskpool)

View File

@ -13,7 +13,6 @@ import std/options
import std/sequtils import std/sequtils
import std/strformat import std/strformat
import std/sugar import std/sugar
import std/cpuinfo
import times import times
import pkg/questionable import pkg/questionable
@ -27,7 +26,6 @@ import pkg/libp2p/stream/bufferstream
# TODO: remove once exported by libp2p # TODO: remove once exported by libp2p
import pkg/libp2p/routing_record import pkg/libp2p/routing_record
import pkg/libp2p/signed_envelope import pkg/libp2p/signed_envelope
import pkg/taskpools
import ./chunker import ./chunker
import ./slots import ./slots
@ -72,7 +70,6 @@ type
contracts*: Contracts contracts*: Contracts
clock*: Clock clock*: Clock
storage*: Contracts storage*: Contracts
taskpool*: Taskpool
CodexNodeRef* = ref CodexNode CodexNodeRef* = ref CodexNode
@ -214,7 +211,7 @@ proc fetchBatched*(
proc streamSingleBlock( proc streamSingleBlock(
self: CodexNodeRef, self: CodexNodeRef,
cid: Cid cid: Cid
): Future[?!LPstream] {.async.} = ): Future[?!LPStream] {.async.} =
## Streams the contents of a single block. ## Streams the contents of a single block.
## ##
trace "Streaming single block", cid = cid trace "Streaming single block", cid = cid
@ -255,8 +252,7 @@ proc streamEntireDataset(
erasure = Erasure.new( erasure = Erasure.new(
self.networkStore, self.networkStore,
leoEncoderProvider, leoEncoderProvider,
leoDecoderProvider, leoDecoderProvider)
self.taskpool)
without _ =? (await erasure.decode(manifest)), error: without _ =? (await erasure.decode(manifest)), error:
error "Unable to erasure decode manifest", manifestCid, exc = error.msg error "Unable to erasure decode manifest", manifestCid, exc = error.msg
return failure(error) return failure(error)
@ -442,8 +438,7 @@ proc setupRequest(
erasure = Erasure.new( erasure = Erasure.new(
self.networkStore.localStore, self.networkStore.localStore,
leoEncoderProvider, leoEncoderProvider,
leoDecoderProvider, leoDecoderProvider)
self.taskpool)
without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
trace "Unable to erasure code dataset" trace "Unable to erasure code dataset"
@ -779,8 +774,7 @@ proc new*(
engine: BlockExcEngine, engine: BlockExcEngine,
discovery: Discovery, discovery: Discovery,
prover = Prover.none, prover = Prover.none,
contracts = Contracts.default, contracts = Contracts.default): CodexNodeRef =
taskpool = Taskpool.new(num_threads = countProcessors())): CodexNodeRef =
## Create new instance of a Codex self, call `start` to run it ## Create new instance of a Codex self, call `start` to run it
## ##
@ -790,5 +784,4 @@ proc new*(
engine: engine, engine: engine,
prover: prover, prover: prover,
discovery: discovery, discovery: discovery,
contracts: contracts, contracts: contracts)
taskpool: taskpool)

View File

@ -1,10 +1,8 @@
import std/tables import std/tables
import std/times import std/times
import std/cpuinfo
import pkg/libp2p import pkg/libp2p
import pkg/chronos import pkg/chronos
import pkg/taskpools
import pkg/codex/codextypes import pkg/codex/codextypes
import pkg/codex/chunker import pkg/codex/chunker
import pkg/codex/stores import pkg/codex/stores
@ -83,7 +81,6 @@ template setupAndTearDown*() {.dirty.} =
pendingBlocks: PendingBlocksManager pendingBlocks: PendingBlocksManager
discovery: DiscoveryEngine discovery: DiscoveryEngine
advertiser: Advertiser advertiser: Advertiser
taskpool: Taskpool
let let
path = currentSourcePath().parentDir path = currentSourcePath().parentDir
@ -113,14 +110,12 @@ template setupAndTearDown*() {.dirty.} =
advertiser = Advertiser.new(localStore, blockDiscovery) advertiser = Advertiser.new(localStore, blockDiscovery)
engine = BlockExcEngine.new(localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks) engine = BlockExcEngine.new(localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks)
store = NetworkStore.new(engine, localStore) store = NetworkStore.new(engine, localStore)
taskpool = Taskpool.new(num_threads = countProcessors())
node = CodexNodeRef.new( node = CodexNodeRef.new(
switch = switch, switch = switch,
networkStore = store, networkStore = store,
engine = engine, engine = engine,
prover = Prover.none, prover = Prover.none,
discovery = blockDiscovery, discovery = blockDiscovery)
taskpool = taskpool)
teardown: teardown:
close(file) close(file)

View File

@ -4,7 +4,6 @@ import std/math
import std/times import std/times
import std/sequtils import std/sequtils
import std/importutils import std/importutils
import std/cpuinfo
import pkg/chronos import pkg/chronos
import pkg/stew/byteutils import pkg/stew/byteutils
@ -15,7 +14,6 @@ import pkg/questionable/results
import pkg/stint import pkg/stint
import pkg/poseidon2 import pkg/poseidon2
import pkg/poseidon2/io import pkg/poseidon2/io
import pkg/taskpools
import pkg/nitro import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5 import pkg/codexdht/discv5/protocol as discv5
@ -82,7 +80,7 @@ asyncchecksuite "Test Node - Host contracts":
manifestBlock = bt.Block.new( manifestBlock = bt.Block.new(
manifest.encode().tryGet(), manifest.encode().tryGet(),
codec = ManifestCodec).tryGet() codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
manifestCid = manifestBlock.cid manifestCid = manifestBlock.cid
manifestCidStr = $(manifestCid) manifestCidStr = $(manifestCid)

View File

@ -4,7 +4,6 @@ import std/math
import std/times import std/times
import std/sequtils import std/sequtils
import std/importutils import std/importutils
import std/cpuinfo
import pkg/chronos import pkg/chronos
import pkg/stew/byteutils import pkg/stew/byteutils
@ -15,7 +14,6 @@ import pkg/questionable/results
import pkg/stint import pkg/stint
import pkg/poseidon2 import pkg/poseidon2
import pkg/poseidon2/io import pkg/poseidon2/io
import pkg/taskpools
import pkg/nitro import pkg/nitro
import pkg/codexdht/discv5/protocol as discv5 import pkg/codexdht/discv5/protocol as discv5
@ -141,7 +139,7 @@ asyncchecksuite "Test Node - Basic":
test "Setup purchase request": test "Setup purchase request":
let let
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
manifest = await storeDataGetManifest(localStore, chunker) manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock = bt.Block.new( manifestBlock = bt.Block.new(
manifest.encode().tryGet(), manifest.encode().tryGet(),

View File

@ -1,6 +1,5 @@
import std/sequtils import std/sequtils
import std/sugar import std/sugar
import std/cpuinfo
import pkg/chronos import pkg/chronos
import pkg/datastore import pkg/datastore
@ -13,7 +12,6 @@ import pkg/codex/blocktype as bt
import pkg/codex/rng import pkg/codex/rng
import pkg/codex/utils import pkg/codex/utils
import pkg/codex/indexingstrategy import pkg/codex/indexingstrategy
import pkg/taskpools
import ../asynctest import ../asynctest
import ./helpers import ./helpers
@ -28,7 +26,6 @@ suite "Erasure encode/decode":
var manifest: Manifest var manifest: Manifest
var store: BlockStore var store: BlockStore
var erasure: Erasure var erasure: Erasure
var taskpool: Taskpool
let repoTmp = TempLevelDb.new() let repoTmp = TempLevelDb.new()
let metaTmp = TempLevelDb.new() let metaTmp = TempLevelDb.new()
@ -39,8 +36,7 @@ suite "Erasure encode/decode":
rng = Rng.instance() rng = Rng.instance()
chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize) chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
store = RepoStore.new(repoDs, metaDs) store = RepoStore.new(repoDs, metaDs)
taskpool = Taskpool.new(num_threads = countProcessors()) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
manifest = await storeDataGetManifest(store, chunker) manifest = await storeDataGetManifest(store, chunker)
teardown: teardown: