Fix erasure coding threading bug

This commit is contained in:
Tomasz Bekas 2024-08-21 16:03:37 +02:00
parent 1e2ad95659
commit ffd71fa1e0
No known key found for this signature in database
GPG Key ID: 4854E04C98824959
8 changed files with 72 additions and 160 deletions

View File

@ -8,18 +8,5 @@
## those terms. ## those terms.
import ./erasure/erasure import ./erasure/erasure
import ./erasure/backends/leopard
export erasure export erasure
func leoEncoderProvider*(
size, buffers, parity: int
): EncoderBackend {.raises: [Defect].} =
## create new Leo Encoder
LeoEncoderBackend.new(size, buffers, parity)
func leoDecoderProvider*(
size, buffers, parity: int
): DecoderBackend {.raises: [Defect].} =
## create new Leo Decoder
LeoDecoderBackend.new(size, buffers, parity)

View File

@ -14,6 +14,7 @@ import pkg/taskpools/flowvars
import pkg/chronos import pkg/chronos
import pkg/chronos/threadsync import pkg/chronos/threadsync
import pkg/questionable/results import pkg/questionable/results
import pkg/leopard
import ./backend import ./backend
import ../errors import ../errors
@ -29,124 +30,62 @@ const
type type
EncoderBackendPtr = ptr EncoderBackend EncoderBackendPtr = ptr EncoderBackend
DecoderBackendPtr = ptr DecoderBackend DecoderBackendPtr = ptr DecoderBackend
DecoderPtr = ptr LeoDecoder
EncoderPtr = ptr LeoEncoder
# Args objects are missing seq[seq[byte]] field, to avoid unnecessary data copy # Args objects are missing seq[seq[byte]] field, to avoid unnecessary data copy
EncodeTaskArgs = object EncodeTaskArgs = object
signal: ThreadSignalPtr signal: ThreadSignalPtr
backend: EncoderBackendPtr encoder: EncoderPtr
blockSize: int
ecM: int
DecodeTaskArgs = object DecodeTaskArgs = object
signal: ThreadSignalPtr signal: ThreadSignalPtr
backend: DecoderBackendPtr decoder: DecoderPtr
blockSize: int
ecK: int
SharedArrayHolder*[T] = object SharedArrayHolder*[T] = object
data: ptr UncheckedArray[T] data: ptr UncheckedArray[T]
size: int size: int
EncodeTaskResult = Result[SharedArrayHolder[byte], cstring] TaskResult = Result[void, cstring]
DecodeTaskResult = Result[SharedArrayHolder[byte], cstring]
proc encodeTask(args: EncodeTaskArgs, data: seq[seq[byte]]): EncodeTaskResult =
var
data = data.unsafeAddr
parity = newSeqWith[seq[byte]](args.ecM, newSeq[byte](args.blockSize))
proc encodeTask(args: EncodeTaskArgs): TaskResult =
try: try:
let res = args.backend[].encode(data[], parity) return args.encoder[].encodePrepared()
if res.isOk:
let
resDataSize = parity.len * args.blockSize
resData = cast[ptr UncheckedArray[byte]](allocShared0(resDataSize))
arrHolder = SharedArrayHolder[byte](
data: resData,
size: resDataSize
)
for i in 0..<parity.len:
copyMem(addr resData[i * args.blockSize], addr parity[i][0], args.blockSize)
return ok(arrHolder)
else:
return err(res.error)
except CatchableError as exception:
return err(exception.msg.cstring)
finally: finally:
if err =? args.signal.fireSync().mapFailure.errorOption(): if err =? args.signal.fireSync().mapFailure.errorOption():
error "Error firing signal", msg = err.msg error "Error firing signal", msg = err.msg
proc decodeTask(args: DecodeTaskArgs, data: seq[seq[byte]], parity: seq[seq[byte]]): DecodeTaskResult = proc decodeTask(args: DecodeTaskArgs): TaskResult =
var
data = data.unsafeAddr
parity = parity.unsafeAddr
recovered = newSeqWith[seq[byte]](args.ecK, newSeq[byte](args.blockSize))
try: try:
let res = args.backend[].decode(data[], parity[], recovered) return args.decoder[].decodePrepared()
if res.isOk:
let
resDataSize = recovered.len * args.blockSize
resData = cast[ptr UncheckedArray[byte]](allocShared0(resDataSize))
arrHolder = SharedArrayHolder[byte](
data: resData,
size: resDataSize
)
for i in 0..<recovered.len:
copyMem(addr resData[i * args.blockSize], addr recovered[i][0], args.blockSize)
return ok(arrHolder)
else:
return err(res.error)
except CatchableError as exception:
return err(exception.msg.cstring)
finally: finally:
if err =? args.signal.fireSync().mapFailure.errorOption(): if err =? args.signal.fireSync().mapFailure.errorOption():
error "Error firing signal", msg = err.msg error "Error firing signal", msg = err.msg
proc proxySpawnEncodeTask( proc proxySpawnEncodeTask(
tp: Taskpool, tp: Taskpool,
args: EncodeTaskArgs, args: EncodeTaskArgs
data: ref seq[seq[byte]] ): Flowvar[TaskResult] =
): Flowvar[EncodeTaskResult] = tp.spawn encodeTask(args)
# FIXME Uncomment the code below after addressing an issue:
# https://github.com/codex-storage/nim-codex/issues/854
# tp.spawn encodeTask(args, data[])
let fv = EncodeTaskResult.newFlowVar
fv.readyWith(encodeTask(args, data[]))
return fv
proc proxySpawnDecodeTask( proc proxySpawnDecodeTask(
tp: Taskpool, tp: Taskpool,
args: DecodeTaskArgs, args: DecodeTaskArgs
data: ref seq[seq[byte]], ): Flowvar[TaskResult] =
parity: ref seq[seq[byte]] tp.spawn decodeTask(args)
): Flowvar[DecodeTaskResult] =
# FIXME Uncomment the code below after addressing an issue:
# https://github.com/codex-storage/nim-codex/issues/854
# tp.spawn decodeTask(args, data[], parity[]) proc awaitTaskResult(signal: ThreadSignalPtr, handle: Flowvar[TaskResult]): Future[?!void] {.async.} =
let fv = DecodeTaskResult.newFlowVar
fv.readyWith(decodeTask(args, data[], parity[]))
return fv
proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.async.} =
await wait(signal) await wait(signal)
var var
res: T res: TaskResult
awaitTotal: Duration awaitTotal: Duration
while awaitTotal < CompletitionTimeout: while awaitTotal < CompletitionTimeout:
if handle.tryComplete(res): if handle.tryComplete(res):
return success(res) if res.isOk:
return success()
else:
return failure($res.error)
else: else:
awaitTotal += CompletitionRetryDelay awaitTotal += CompletitionRetryDelay
await sleepAsync(CompletitionRetryDelay) await sleepAsync(CompletitionRetryDelay)
@ -155,43 +94,45 @@ proc awaitResult[T](signal: ThreadSignalPtr, handle: Flowvar[T]): Future[?!T] {.
proc asyncEncode*( proc asyncEncode*(
tp: Taskpool, tp: Taskpool,
backend: EncoderBackend, encoder: sink LeoEncoder,
data: ref seq[seq[byte]], data: ref seq[seq[byte]],
blockSize: int, blockSize: int,
ecM: int ecM: int
): Future[?!ref seq[seq[byte]]] {.async.} = ): Future[?!ref seq[seq[byte]]] {.async.} =
if ecM == 0:
return success(seq[seq[byte]].new())
without signal =? ThreadSignalPtr.new().mapFailure, err: without signal =? ThreadSignalPtr.new().mapFailure, err:
return failure(err) return failure(err)
try: try:
let if err =? encoder.prepareEncode(data[]).mapFailure.errorOption():
blockSize = data[0].len return failure(err)
args = EncodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecM: ecM)
handle = proxySpawnEncodeTask(tp, args, data) let
args = EncodeTaskArgs(signal: signal, encoder: addr encoder)
without res =? await awaitResult(signal, handle), err: handle = proxySpawnEncodeTask(tp, args)
if err =? (await awaitTaskResult(signal, handle)).errorOption():
return failure(err) return failure(err)
if res.isOk:
var parity = seq[seq[byte]].new() var parity = seq[seq[byte]].new()
parity[].setLen(ecM) parity[].setLen(ecM)
for i in 0..<parity[].len: for i in 0..<parity[].len:
parity[i] = newSeq[byte](blockSize) parity[i] = newSeq[byte](blockSize)
copyMem(addr parity[i][0], addr res.value.data[i * blockSize], blockSize)
deallocShared(res.value.data) if err =? encoder.readParity(parity[]).mapFailure.errorOption():
return failure(err)
return success(parity) return success(parity)
else:
return failure($res.error)
finally: finally:
if err =? signal.close().mapFailure.errorOption(): if err =? signal.close().mapFailure.errorOption():
error "Error closing signal", msg = $err.msg error "Error closing signal", msg = $err.msg
proc asyncDecode*( proc asyncDecode*(
tp: Taskpool, tp: Taskpool,
backend: DecoderBackend, decoder: sink LeoDecoder,
data, parity: ref seq[seq[byte]], data, parity: ref seq[seq[byte]],
blockSize: int blockSize: int
): Future[?!ref seq[seq[byte]]] {.async.} = ): Future[?!ref seq[seq[byte]]] {.async.} =
@ -199,27 +140,25 @@ proc asyncDecode*(
return failure(err) return failure(err)
try: try:
let if err =? decoder.prepareDecode(data[], parity[]).mapFailure.errorOption():
ecK = data[].len
args = DecodeTaskArgs(signal: signal, backend: unsafeAddr backend, blockSize: blockSize, ecK: ecK)
handle = proxySpawnDecodeTask(tp, args, data, parity)
without res =? await awaitResult(signal, handle), err:
return failure(err) return failure(err)
if res.isOk: let
var recovered = seq[seq[byte]].new() args = DecodeTaskArgs(signal: signal, decoder: addr decoder)
recovered[].setLen(ecK) handle = proxySpawnDecodeTask(tp, args)
if err =? (await awaitTaskResult(signal, handle)).errorOption():
return failure(err)
var recovered = seq[seq[byte]].new()
recovered[].setLen(data[].len)
for i in 0..<recovered[].len: for i in 0..<recovered[].len:
recovered[i] = newSeq[byte](blockSize) recovered[i] = newSeq[byte](blockSize)
copyMem(addr recovered[i][0], addr res.value.data[i * blockSize], blockSize)
deallocShared(res.value.data) if err =? decoder.readDecoded(recovered[]).mapFailure.errorOption():
return failure(err)
return success(recovered) return success(recovered)
else:
return failure($res.error)
finally: finally:
if err =? signal.close().mapFailure.errorOption(): if err =? signal.close().mapFailure.errorOption():
error "Error closing signal", msg = $err.msg error "Error closing signal", msg = $err.msg

View File

@ -18,6 +18,7 @@ import pkg/chronos
import pkg/libp2p/[multicodec, cid, multihash] import pkg/libp2p/[multicodec, cid, multihash]
import pkg/libp2p/protobuf/minprotobuf import pkg/libp2p/protobuf/minprotobuf
import pkg/taskpools import pkg/taskpools
import pkg/leopard
import ../logutils import ../logutils
import ../manifest import ../manifest
@ -31,11 +32,8 @@ import ../errors
import pkg/stew/byteutils import pkg/stew/byteutils
import ./backend
import ./asyncbackend import ./asyncbackend
export backend
logScope: logScope:
topics = "codex erasure" topics = "codex erasure"
@ -63,15 +61,7 @@ type
## or any combination there of. ## or any combination there of.
## ##
EncoderProvider* = proc(size, blocks, parity: int): EncoderBackend
{.raises: [Defect], noSideEffect.}
DecoderProvider* = proc(size, blocks, parity: int): DecoderBackend
{.raises: [Defect], noSideEffect.}
Erasure* = ref object Erasure* = ref object
encoderProvider*: EncoderProvider
decoderProvider*: DecoderProvider
store*: BlockStore store*: BlockStore
taskpool: Taskpool taskpool: Taskpool
@ -285,11 +275,13 @@ proc encodeData(
var var
cids = seq[Cid].new() cids = seq[Cid].new()
encoder = self.encoderProvider(manifest.blockSize.int, params.ecK, params.ecM)
emptyBlock = newSeq[byte](manifest.blockSize.int) emptyBlock = newSeq[byte](manifest.blockSize.int)
cids[].setLen(params.blocksCount) cids[].setLen(params.blocksCount)
without var encoder =? LeoEncoder.init(manifest.blockSize.int, params.ecK, params.ecM).mapFailure, err:
return failure(err)
try: try:
for step in 0..<params.steps: for step in 0..<params.steps:
# TODO: Don't allocate a new seq every time, allocate once and zero out # TODO: Don't allocate a new seq every time, allocate once and zero out
@ -349,7 +341,7 @@ proc encodeData(
trace "Erasure coding encoding error", exc = exc.msg trace "Erasure coding encoding error", exc = exc.msg
return failure(exc) return failure(exc)
finally: finally:
encoder.release() encoder.free()
proc encode*( proc encode*(
self: Erasure, self: Erasure,
@ -390,9 +382,11 @@ proc decode*(
var var
cids = seq[Cid].new() cids = seq[Cid].new()
recoveredIndices = newSeq[Natural]() recoveredIndices = newSeq[Natural]()
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
emptyBlock = newSeq[byte](encoded.blockSize.int) emptyBlock = newSeq[byte](encoded.blockSize.int)
without var decoder =? LeoDecoder.init(encoded.blockSize.int, encoded.ecK, encoded.ecM).mapFailure, err:
return failure(err)
cids[].setLen(encoded.blocksCount) cids[].setLen(encoded.blocksCount)
try: try:
for step in 0..<encoded.steps: for step in 0..<encoded.steps:
@ -439,7 +433,7 @@ proc decode*(
trace "Erasure coding decoding error", exc = exc.msg trace "Erasure coding decoding error", exc = exc.msg
return failure(exc) return failure(exc)
finally: finally:
decoder.release() decoder.free()
without tree =? CodexTree.init(cids[0..<encoded.originalBlocksCount]), err: without tree =? CodexTree.init(cids[0..<encoded.originalBlocksCount]), err:
return failure(err) return failure(err)
@ -469,14 +463,10 @@ proc stop*(self: Erasure) {.async.} =
proc new*( proc new*(
T: type Erasure, T: type Erasure,
store: BlockStore, store: BlockStore,
encoderProvider: EncoderProvider,
decoderProvider: DecoderProvider,
taskpool: Taskpool): Erasure = taskpool: Taskpool): Erasure =
## Create a new Erasure instance for encoding and decoding manifests ## Create a new Erasure instance for encoding and decoding manifests
## ##
Erasure( Erasure(
store: store, store: store,
encoderProvider: encoderProvider,
decoderProvider: decoderProvider,
taskpool: taskpool) taskpool: taskpool)

View File

@ -253,8 +253,6 @@ proc streamEntireDataset(
let let
erasure = Erasure.new( erasure = Erasure.new(
self.networkStore, self.networkStore,
leoEncoderProvider,
leoDecoderProvider,
self.taskpool) self.taskpool)
without _ =? (await erasure.decode(manifest)), error: without _ =? (await erasure.decode(manifest)), error:
error "Unable to erasure decode manifest", manifestCid, exc = error.msg error "Unable to erasure decode manifest", manifestCid, exc = error.msg
@ -433,8 +431,6 @@ proc setupRequest(
let let
erasure = Erasure.new( erasure = Erasure.new(
self.networkStore.localStore, self.networkStore.localStore,
leoEncoderProvider,
leoDecoderProvider,
self.taskpool) self.taskpool)
without encoded =? (await erasure.encode(manifest, ecK, ecM)), error: without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:

View File

@ -82,7 +82,7 @@ asyncchecksuite "Test Node - Host contracts":
manifestBlock = bt.Block.new( manifestBlock = bt.Block.new(
manifest.encode().tryGet(), manifest.encode().tryGet(),
codec = ManifestCodec).tryGet() codec = ManifestCodec).tryGet()
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) erasure = Erasure.new(store, taskpool)
manifestCid = manifestBlock.cid manifestCid = manifestBlock.cid
manifestCidStr = $(manifestCid) manifestCidStr = $(manifestCid)

View File

@ -141,7 +141,7 @@ asyncchecksuite "Test Node - Basic":
test "Setup purchase request": test "Setup purchase request":
let let
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) erasure = Erasure.new(store, taskpool)
manifest = await storeDataGetManifest(localStore, chunker) manifest = await storeDataGetManifest(localStore, chunker)
manifestBlock = bt.Block.new( manifestBlock = bt.Block.new(
manifest.encode().tryGet(), manifest.encode().tryGet(),

View File

@ -40,7 +40,7 @@ suite "Erasure encode/decode":
chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize) chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
store = RepoStore.new(repoDs, metaDs) store = RepoStore.new(repoDs, metaDs)
taskpool = Taskpool.new(num_threads = countProcessors()) taskpool = Taskpool.new(num_threads = countProcessors())
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool) erasure = Erasure.new(store, taskpool)
manifest = await storeDataGetManifest(store, chunker) manifest = await storeDataGetManifest(store, chunker)
teardown: teardown:

2
vendor/nim-leopard vendored

@ -1 +1 @@
Subproject commit 895ff24ca6615d577acfb11811cdd5465f596c97 Subproject commit 68e691583e83e98f0e23d6b5e4df3354966aa33c