erasure: generalizing encoding

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-11-14 18:28:45 +01:00
parent 41669e9cb3
commit 160e5499f5
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
1 changed files with 147 additions and 87 deletions

View File

@ -73,11 +73,12 @@ type
EncodingParams = object EncodingParams = object
ecK: int ecK: int
ecM: int ecM: int
rounded: int interleave: int
rounded: int # padded number of blocks
steps: int steps: int
blocksCount: int blocksCount: int # total number of blocks including padding en EC
func indexToPos(steps, idx, step: int): int {.inline.} = func oldIndexToPos(params: EncodingParams, idx: int): int {.inline.} =
## Convert an index to a position in the encoded ## Convert an index to a position in the encoded
## dataset ## dataset
## `idx` - the index to convert ## `idx` - the index to convert
@ -85,7 +86,39 @@ func indexToPos(steps, idx, step: int): int {.inline.} =
## `pos` - the position in the encoded dataset ## `pos` - the position in the encoded dataset
## ##
(idx - step) div steps #(idx div params.interleave) mod (params.ecK + params.ecM)
(idx div params.interleave) mod (params.ecK)
func newIndexToPos(encoded: Manifest, idx: int): int {.inline.} =
(idx div encoded.interleave) mod (encoded.ecK + encoded.ecM)
func newIndex(params: EncodingParams, step, column, pos: int): int =
#params.rounded + step + params.steps * pos
#step + params.steps * pos
step * params.interleave * (params.ecK + params.ecM) + pos * params.interleave + column
func newIndex(params: Manifest, step, column, pos: int): int =
#params.rounded + step + params.steps * pos
#step + encoded.steps * pos
step * params.interleave * (params.ecK + params.ecM) + pos * params.interleave + column
func oldIndex(params: EncodingParams, step, column, pos: int): int =
#params.rounded + step + params.steps * pos
#step + encoded.steps * pos
step * params.interleave * (params.ecK) + pos * params.interleave + column
func oldIndex(params: Manifest, step, column, pos: int): int =
#params.rounded + step + params.steps * pos
#step + encoded.steps * pos
step * params.interleave * (params.ecK) + pos * params.interleave + column
iterator oldIndices(params: EncodingParams, step, column: int): int =
for i in 0 ..< params.ecK:
yield oldIndex(params, step, column, i)
iterator newIndices(params: Manifest, step, column: int): int =
for i in 0 ..< params.ecK + params.ecM:
yield newIndex(params, step, column, i)
proc getPendingBlocks( proc getPendingBlocks(
self: Erasure, self: Erasure,
@ -118,6 +151,7 @@ proc prepareEncodingData(
manifest: Manifest, manifest: Manifest,
params: EncodingParams, params: EncodingParams,
step: int, step: int,
column: int,
data: ref seq[seq[byte]], data: ref seq[seq[byte]],
cids: ref seq[Cid], cids: ref seq[Cid],
emptyBlock: seq[byte]): Future[?!int] {.async.} = emptyBlock: seq[byte]): Future[?!int] {.async.} =
@ -125,7 +159,7 @@ proc prepareEncodingData(
## ##
let let
indicies = toSeq(countup(step, params.rounded - 1, params.steps)) indicies = toSeq(oldIndices(params, step, column))
pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount)) pendingBlocksIter = self.getPendingBlocks(manifest, indicies.filterIt(it < manifest.blocksCount))
var resolved = 0 var resolved = 0
@ -134,20 +168,22 @@ proc prepareEncodingData(
without blk =? blkOrErr, err: without blk =? blkOrErr, err:
warn "Failed retreiving a block", treeCid = manifest.treeCid, idx, msg = err.msg warn "Failed retreiving a block", treeCid = manifest.treeCid, idx, msg = err.msg
continue continue
let pos = indexToPos(params.steps, idx, step) let pos = oldIndexToPos(params, idx)
let newidx = newIndex(params, step, column, pos)
shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
cids[idx] = blk.cid cids[newidx] = blk.cid
resolved.inc() resolved.inc()
for idx in indicies.filterIt(it >= manifest.blocksCount): for idx in indicies.filterIt(it >= manifest.blocksCount):
let pos = indexToPos(params.steps, idx, step) let pos = oldIndexToPos(params, idx)
trace "Padding with empty block", idx let newidx = newIndex(params, step, column, pos)
trace "Padding with empty block", idx, newidx
shallowCopy(data[pos], emptyBlock) shallowCopy(data[pos], emptyBlock)
without emptyBlockCid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err: without emptyBlockCid =? emptyCid(manifest.version, manifest.hcodec, manifest.codec), err:
return failure(err) return failure(err)
cids[idx] = emptyBlockCid cids[newidx] = emptyBlockCid
success(resolved) success(resolved)
@ -155,6 +191,7 @@ proc prepareDecodingData(
self: Erasure, self: Erasure,
encoded: Manifest, encoded: Manifest,
step: int, step: int,
column: int,
data: ref seq[seq[byte]], data: ref seq[seq[byte]],
parityData: ref seq[seq[byte]], parityData: ref seq[seq[byte]],
cids: ref seq[Cid], cids: ref seq[Cid],
@ -168,8 +205,8 @@ proc prepareDecodingData(
## `emptyBlock` - the empty block to be used for padding ## `emptyBlock` - the empty block to be used for padding
## ##
let let
indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps)) indicies = toSeq(newIndices(encoded, step, column))
pendingBlocksIter = self.getPendingBlocks(encoded, indicies) pendingBlocksIter = self.getPendingBlocks(encoded, indicies)
var var
@ -188,7 +225,7 @@ proc prepareDecodingData(
continue continue
let let
pos = indexToPos(encoded.steps, idx, step) pos = newIndexToPos(encoded, idx)
logScope: logScope:
cid = blk.cid cid = blk.cid
@ -198,7 +235,7 @@ proc prepareDecodingData(
empty = blk.isEmpty empty = blk.isEmpty
cids[idx] = blk.cid cids[idx] = blk.cid
if idx >= encoded.rounded: if pos >= encoded.ecK:
trace "Retrieved parity block" trace "Retrieved parity block"
shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data) shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data)
parityPieces.inc parityPieces.inc
@ -211,18 +248,25 @@ proc prepareDecodingData(
return success (dataPieces, parityPieces) return success (dataPieces, parityPieces)
proc init(_: type EncodingParams, manifest: Manifest, ecK: int, ecM: int): ?!EncodingParams = proc init(_: type EncodingParams, manifest: Manifest, ecK, ecM, interl: int): ?!EncodingParams =
## Calculate erasure coding parameters.
## interl: if 0, use the interleaving resulting in a single "step". Otherwise use the given value, and calculate the number of steps needed.
if ecK > manifest.blocksCount: if ecK > manifest.blocksCount:
return failure("Unable to encode manifest, not enough blocks, ecK = " & $ecK & ", blocksCount = " & $manifest.blocksCount) return failure("Unable to encode manifest, not enough blocks, ecK = " & $ecK & ", blocksCount = " & $manifest.blocksCount)
let interleave =
if interl == 0: divUp(manifest.blocksCount, ecK)
else: interl
let let
rounded = roundUp(manifest.blocksCount, ecK) rounded = roundUp(manifest.blocksCount, interleave * ecK)
steps = divUp(manifest.blocksCount, ecK) steps = divUp(manifest.blocksCount, interleave * ecK)
blocksCount = rounded + (steps * ecM) blocksCount = rounded + (steps * interleave * ecM)
EncodingParams( EncodingParams(
ecK: ecK, ecK: ecK,
ecM: ecM, ecM: ecM,
interleave: interleave,
rounded: rounded, rounded: rounded,
steps: steps, steps: steps,
blocksCount: blocksCount blocksCount: blocksCount
@ -244,6 +288,7 @@ proc encodeData(
blocks_count = params.blocksCount blocks_count = params.blocksCount
ecK = params.ecK ecK = params.ecK
ecM = params.ecM ecM = params.ecM
interleave = params.interleave
var var
cids = seq[Cid].new() cids = seq[Cid].new()
@ -254,42 +299,42 @@ proc encodeData(
try: try:
for step in 0..<params.steps: for step in 0..<params.steps:
for column in 0..<params.interleave:
# TODO: Don't allocate a new seq every time, allocate once and zero out # TODO: Don't allocate a new seq every time, allocate once and zero out
var var
data = seq[seq[byte]].new() # number of blocks to encode data = seq[seq[byte]].new() # number of blocks to encode
parityData = newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int)) parityData = newSeqWith[seq[byte]](params.ecM, newSeq[byte](manifest.blockSize.int))
data[].setLen(params.ecK) data[].setLen(params.ecK)
# TODO: this is a tight blocking loop so we sleep here to allow # TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed # other events to be processed, this should be addressed
# by threading # by threading
await sleepAsync(10.millis) await sleepAsync(10.millis)
without resolved =? without resolved =?
(await self.prepareEncodingData(manifest, params, step, data, cids, emptyBlock)), err: (await self.prepareEncodingData(manifest, params, step, column, data, cids, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg trace "Unable to prepare data", error = err.msg
return failure(err) return failure(err)
trace "Erasure coding data", data = data[].len, parity = parityData.len trace "Erasure coding data", data = data[].len, parity = parityData.len
if ( if (
let res = encoder.encode(data[], parityData); let res = encoder.encode(data[], parityData);
res.isErr): res.isErr):
trace "Unable to encode manifest!", error = $res.error trace "Unable to encode manifest!", error = $res.error
return failure($res.error) return failure($res.error)
var idx = params.rounded + step for j in 0..<params.ecM:
for j in 0..<params.ecM: without blk =? bt.Block.new(parityData[j]), error:
without blk =? bt.Block.new(parityData[j]), error: trace "Unable to create parity block", err = error.msg
trace "Unable to create parity block", err = error.msg return failure(error)
return failure(error)
trace "Adding parity block", cid = blk.cid, idx let idx = newIndex(params, step, column, params.ecK + j)
cids[idx] = blk.cid trace "Adding parity block", cid = blk.cid, idx
if isErr (await self.store.putBlock(blk)): cids[idx] = blk.cid
trace "Unable to store block!", cid = blk.cid if isErr (await self.store.putBlock(blk)):
return failure("Unable to store block!") trace "Unable to store block!", cid = blk.cid
idx.inc(params.steps) return failure("Unable to store block!")
without tree =? MerkleTree.init(cids[]), err: without tree =? MerkleTree.init(cids[]), err:
return failure(err) return failure(err)
@ -305,7 +350,8 @@ proc encodeData(
treeCid = treeCid, treeCid = treeCid,
datasetSize = (manifest.blockSize.int * params.blocksCount).NBytes, datasetSize = (manifest.blockSize.int * params.blocksCount).NBytes,
ecK = params.ecK, ecK = params.ecK,
ecM = params.ecM ecM = params.ecM,
interleave = params.interleave #TODO
) )
return encodedManifest.success return encodedManifest.success
@ -322,7 +368,8 @@ proc encode*(
self: Erasure, self: Erasure,
manifest: Manifest, manifest: Manifest,
blocks: int, blocks: int,
parity: int): Future[?!Manifest] {.async.} = parity: int,
interleave: int = 0): Future[?!Manifest] {.async.} =
## Encode a manifest into one that is erasure protected. ## Encode a manifest into one that is erasure protected.
## ##
## `manifest` - the original manifest to be encoded ## `manifest` - the original manifest to be encoded
@ -330,7 +377,7 @@ proc encode*(
## `parity` - the number of parity blocks to generate - M ## `parity` - the number of parity blocks to generate - M
## ##
without params =? EncodingParams.init(manifest, blocks, parity), err: without params =? EncodingParams.init(manifest, blocks, parity, interleave), err:
return failure(err) return failure(err)
without encodedManifest =? await self.encodeData(manifest, params), err: without encodedManifest =? await self.encodeData(manifest, params), err:
@ -360,52 +407,54 @@ proc decode*(
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM) decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
emptyBlock = newSeq[byte](encoded.blockSize.int) emptyBlock = newSeq[byte](encoded.blockSize.int)
cids[].setLen(encoded.blocksCount) cids[].setLen(encoded.blocksCount)
try: try:
for step in 0..<encoded.steps: for step in 0..<encoded.steps:
# TODO: this is a tight blocking loop so we sleep here to allow for column in 0..<encoded.interleave:
# other events to be processed, this should be addressed # TODO: this is a tight blocking loop so we sleep here to allow
# by threading # other events to be processed, this should be addressed
await sleepAsync(10.millis) # by threading
await sleepAsync(10.millis)
var var
data = seq[seq[byte]].new() data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new() parityData = seq[seq[byte]].new()
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int)) recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
data[].setLen(encoded.ecK) # set len to K data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M parityData[].setLen(encoded.ecM) # set len to M
without (dataPieces, parityPieces) =? without (dataPieces, parityPieces) =?
(await self.prepareDecodingData(encoded, step, data, parityData, cids, emptyBlock)), err: (await self.prepareDecodingData(encoded, step, column, data, parityData, cids, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg trace "Unable to prepare data", error = err.msg
return failure(err) return failure(err)
if dataPieces >= encoded.ecK: if dataPieces >= encoded.ecK:
trace "Retrieved all the required data blocks" trace "Retrieved all the required data blocks"
continue continue
trace "Erasure decoding data" trace "Erasure decoding data"
if ( if (
let err = decoder.decode(data[], parityData[], recovered); let err = decoder.decode(data[], parityData[], recovered);
err.isErr): err.isErr):
trace "Unable to decode data!", err = $err.error trace "Unable to decode data!", err = $err.error
return failure($err.error) return failure($err.error)
for i in 0..<encoded.ecK: for i in 0..<encoded.ecK:
let idx = i * encoded.steps + step let idx = newIndex(encoded, step, column, i)
if data[i].len <= 0 and not cids[idx].isEmpty: if data[i].len <= 0 and not cids[idx].isEmpty:
without blk =? bt.Block.new(recovered[i]), error: without blk =? bt.Block.new(recovered[i]), error:
trace "Unable to create block!", exc = error.msg trace "Unable to create block!", exc = error.msg
return failure(error) return failure(error)
trace "Recovered block", cid = blk.cid, index = i trace "Recovered block", cid = blk.cid, index = i
if isErr (await self.store.putBlock(blk)): if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid trace "Unable to store block!", cid = blk.cid
return failure("Unable to store block!") return failure("Unable to store block!")
cids[idx] = blk.cid cids[idx] = blk.cid
recoveredIndices.add(idx) recoveredIndices.add(idx)
except CancelledError as exc: except CancelledError as exc:
trace "Erasure coding decoding cancelled" trace "Erasure coding decoding cancelled"
raise exc # cancellation needs to be propagated raise exc # cancellation needs to be propagated
@ -415,7 +464,18 @@ proc decode*(
finally: finally:
decoder.release() decoder.release()
without tree =? MerkleTree.init(cids[0..<encoded.originalBlocksCount]), err: # fill old cid list
var oldCids = seq[Cid].new()
oldCids[].setLen(encoded.originalBlocksCount)
for step in 0..<encoded.steps:
for column in 0..<encoded.interleave:
for i in 0..<encoded.ecK:
let idx = newIndex(encoded, step, column, i)
let oldIdx = oldIndex(encoded, step, column, i)
if oldIdx < encoded.originalBlocksCount:
oldCids[oldIdx] = cids[idx]
without tree =? MerkleTree.init(oldCids[]), err:
return failure(err) return failure(err)
without treeCid =? tree.rootCid, err: without treeCid =? tree.rootCid, err: