diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 9b4df58d..67358fb0 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -12,9 +12,11 @@ import pkg/upraises push: {.upraises: [].} import std/sequtils +import std/options import pkg/chronos import pkg/chronicles +import pkg/questionable import ../manifest import ../stores @@ -62,26 +64,181 @@ type decoderProvider*: DecoderProvider store*: BlockStore -proc encode*( - self: Erasure, - manifest: Manifest, - blocks: int, - parity: int -): Future[?!Manifest] {.async.} = - ## Encode a manifest into one that is erasure protected. + GetNext = proc(): Future[?(bt.Block, int)] {.upraises: [], gcsafe, closure.} + PendingBlocksIter* = ref object + finished*: bool + next*: GetNext + +func indexToPos(self: Erasure, encoded: Manifest, idx, step: int): int {.inline.} = + ## Convert an index to a position in the encoded + ## dataset + ## `idx` - the index to convert + ## `step` - the current step + ## `pos` - the position in the encoded dataset ## - ## `manifest` - the original manifest to be encoded - ## `blocks` - the number of blocks to be encoded - K - ## `parity` - the number of parity blocks to generate - M + + (idx - step) div encoded.steps + +iterator items*(blocks: PendingBlocksIter): Future[?(bt.Block, int)] = + while not blocks.finished: + yield blocks.next() + +proc getPendingBlocks( + self: Erasure, + manifest: Manifest, + start, stop, steps: int): ?!PendingBlocksIter = + ## Get pending blocks iterator ## + var + # calculate block indexes to retrieve + blockIdx = toSeq(countup(start, stop, steps)) + # request all blocks from the store + pendingBlocks = blockIdx.mapIt( + self.store.getBlock(manifest[it]) # Get the data blocks (first K) + ) + indices = pendingBlocks # needed so we can track the block indices + iter = PendingBlocksIter(finished: false) + + trace "Requesting blocks", pendingBlocks = pendingBlocks.len + proc next(): Future[?(bt.Block, int)] {.async.} = + if iter.finished: + trace "No more blocks" + return none (bt.Block, int) + + if pendingBlocks.len == 0: + iter.finished = true + trace "No more blocks - finished" + return none (bt.Block, int) + + let + done = await one(pendingBlocks) + idx = indices.find(done) + + logScope: + idx = idx + blockIdx = blockIdx[idx] + manifest = manifest[blockIdx[idx]] + + pendingBlocks.del(pendingBlocks.find(done)) + without blk =? (await done), error: + trace "Failed retrieving block", err = $error.msg + return none (bt.Block, int) + + trace "Retrieved block" + some (blk, blockIdx[idx]) + + iter.next = next + success iter + +proc prepareEncodingData( + self: Erasure, + encoded: Manifest, + step: int, + data: ref seq[seq[byte]], + emptyBlock: seq[byte]): Future[?!int] {.async.} = + ## Prepare data for encoding + ## + + without pendingBlocksIter =? + self.getPendingBlocks( + encoded, + step, + encoded.rounded - 1, encoded.steps), err: + trace "Unable to get pending blocks", error = err.msg + return failure(err) + + var resolved = 0 + for blkFut in pendingBlocksIter: + if (blk, idx) =? (await blkFut): + let + pos = self.indexToPos(encoded, idx, step) + + if blk.isEmpty: + trace "Padding with empty block", idx + shallowCopy(data[pos], emptyBlock) + else: + trace "Encoding block", cid = blk.cid, idx + shallowCopy(data[pos], blk.data) + + resolved.inc() + + success resolved + +proc prepareDecodingData( + self: Erasure, + encoded: Manifest, + step: int, + data: ref seq[seq[byte]], + parityData: ref seq[seq[byte]], + emptyBlock: seq[byte]): Future[?!(int, int)] {.async.} = + ## Prepare data for decoding + ## `encoded` - the encoded manifest + ## `step` - the current step + ## `data` - the data to be prepared + ## `parityData` - the parityData to be prepared + ## `emptyBlock` - the empty block to be used for padding + ## + + without pendingBlocksIter =? + self.getPendingBlocks( + encoded, + step, + encoded.len - 1, encoded.steps), err: + trace "Unable to get pending blocks", error = err.msg + return failure(err) + + var + dataPieces = 0 + parityPieces = 0 + resolved = 0 + for blkFut in pendingBlocksIter: + # Continue to receive blocks until we have just enough for decoding + # or no more blocks can arrive + if resolved >= encoded.ecK: + break + + if (blk, idx) =? (await blkFut): + let + pos = self.indexToPos(encoded, idx, step) + + logScope: + cid = blk.cid + idx = idx + pos = pos + step = step + empty = blk.isEmpty + + if idx >= encoded.rounded: + trace "Retrieved parity block" + shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data) + parityPieces.inc + else: + trace "Retrieved data block" + shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) + dataPieces.inc + + resolved.inc + + return success (dataPieces, parityPieces) + +proc prepareManifest( + self: Erasure, + manifest: Manifest, + blocks: int, + parity: int): ?!Manifest = + logScope: original_cid = manifest.cid.get() original_len = manifest.len blocks = blocks parity = parity - trace "Erasure coding manifest", blocks, parity + if blocks > manifest.len: + trace "Unable to encode manifest, not enough blocks", blocks = blocks, len = manifest.len + return failure("Not enough blocks to encode") + + trace "Preparing erasure coded manifest", blocks, parity without var encoded =? Manifest.new(manifest, blocks, parity), error: trace "Unable to create manifest", msg = error.msg return error.failure @@ -91,57 +248,75 @@ proc encode*( rounded_blocks = encoded.rounded new_manifest = encoded.len + trace "Erasure coded manifest prepared" + + success encoded + +proc encodeData( + self: Erasure, + manifest: Manifest): Future[?!void] {.async.} = + ## Encode blocks pointed to by the protected manifest + ## + ## `manifest` - the manifest to encode + ## + var - encoder = self.encoderProvider(manifest.blockSize.int, blocks, parity) + encoded = manifest + + logScope: + steps = encoded.steps + rounded_blocks = encoded.rounded + new_manifest = encoded.len + protected = encoded.protected + ecK = encoded.ecK + ecM = encoded.ecM + + if not encoded.protected: + trace "Manifest is not erasure protected" + return failure("Manifest is not erasure protected") + + var + encoder = self.encoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM) + emptyBlock = newSeq[byte](encoded.blockSize.int) try: - for i in 0..= encoded.ecK) or (idxPendingBlocks.len == 0): - break + data[].setLen(encoded.ecK) # set len to K + parityData[].setLen(encoded.ecM) # set len to M - let - done = await one(idxPendingBlocks) - idx = pendingBlocks.find(done) - - idxPendingBlocks.del(idxPendingBlocks.find(done)) - - without blk =? (await done), error: - trace "Failed retrieving block", error = error.msg - continue - - if idx >= encoded.ecK: - trace "Retrieved parity block", cid = blk.cid, idx - shallowCopy(parityData[idx - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data) - else: - trace "Retrieved data block", cid = blk.cid, idx - shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data) - - resolved.inc - - let - dataPieces = data.filterIt( it.len > 0 ).len - parityPieces = parityData.filterIt( it.len > 0 ).len + without (dataPieces, parityPieces) =? + (await self.prepareDecodingData(encoded, step, data, parityData, emptyBlock)), err: + trace "Unable to prepare data", error = err.msg + return failure(err) if dataPieces >= encoded.ecK: - trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces + trace "Retrieved all the required data blocks" continue - trace "Erasure decoding data", data = dataPieces, parity = parityPieces + trace "Erasure decoding data" if ( - let err = decoder.decode(data, parityData, recovered); + let err = decoder.decode(data[], parityData[], recovered); err.isErr): - trace "Unable to decode manifest!", err = $err.error + trace "Unable to decode data!", err = $err.error return failure($err.error) for i in 0..