From 49c41e27b799b105853d2030a842ba73cf83895b Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 5 Apr 2022 13:12:59 -0600 Subject: [PATCH] Erasure coding dataset (#72) * add erasure coding support * add leopard support * erasure coding tests --- dagger/erasure.nim | 19 ++ dagger/erasure/backend.nim | 40 ++++ dagger/erasure/backends/leopard.nim | 83 ++++++++ dagger/erasure/erasure.nim | 267 ++++++++++++++++++++++++ tests/dagger/testerasure.nim | 306 ++++++++++++++++++++++++++++ tests/testDagger.nim | 1 + 6 files changed, 716 insertions(+) create mode 100644 dagger/erasure.nim create mode 100644 dagger/erasure/backend.nim create mode 100644 dagger/erasure/backends/leopard.nim create mode 100644 dagger/erasure/erasure.nim create mode 100644 tests/dagger/testerasure.nim diff --git a/dagger/erasure.nim b/dagger/erasure.nim new file mode 100644 index 00000000..b55ba413 --- /dev/null +++ b/dagger/erasure.nim @@ -0,0 +1,19 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import ./erasure/erasure +import ./erasure/backends/leopard + +export erasure + +func leoEncoderProvider*(size, buffers, parity: int): EncoderBackend {.raises: [Defect].} = + LeoEncoderBackend.new(size, buffers, parity) + +func leoDecoderProvider*(size, buffers, parity: int): DecoderBackend {.raises: [Defect].} = + LeoDecoderBackend.new(size, buffers, parity) diff --git a/dagger/erasure/backend.nim b/dagger/erasure/backend.nim new file mode 100644 index 00000000..35a30cd2 --- /dev/null +++ b/dagger/erasure/backend.nim @@ -0,0 +1,40 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/upraises + +push: {.upraises: [].} + +import ../manifest +import ../stores + +type + Backend* = ref object of RootObj + blockSize*: int # block size in bytes + buffers*: int # number of original pieces + parity*: int # number of redundancy pieces + + EncoderBackend* = ref object of Backend + DecoderBackend* = ref object of Backend + +method release*(self: Backend) {.base.} = + raiseAssert("not implemented!") + +method encode*( + self: EncoderBackend, + buffers, + parity: var openArray[seq[byte]]): Result[void, cstring] {.base.} = + raiseAssert("not implemented!") + +method decode*( + self: DecoderBackend, + buffers, + parity, + recovered: var openArray[seq[byte]]): Result[void, cstring] {.base.} = + raiseAssert("not implemented!") diff --git a/dagger/erasure/backends/leopard.nim b/dagger/erasure/backends/leopard.nim new file mode 100644 index 00000000..302af0e9 --- /dev/null +++ b/dagger/erasure/backends/leopard.nim @@ -0,0 +1,83 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/options + +import pkg/leopard +import pkg/stew/results + +import ../backend + +type + LeoEncoderBackend* = ref object of EncoderBackend + encoder*: Option[LeoEncoder] + + LeoDecoderBackend* = ref object of DecoderBackend + decoder*: Option[LeoDecoder] + +method encode*( + self: LeoEncoderBackend, + data, + parity: var openArray[seq[byte]]): Result[void, cstring] = + + var encoder = if self.encoder.isNone: + self.encoder = (? LeoEncoder.init( + self.blockSize, + self.buffers, + self.parity)).some + self.encoder.get() + else: + self.encoder.get() + + encoder.encode(data, parity) + +method decode*( + self: LeoDecoderBackend, + data, + parity, + recovered: var openArray[seq[byte]]): Result[void, cstring] = + + var decoder = if self.decoder.isNone: + self.decoder = (? LeoDecoder.init( + self.blockSize, + self.buffers, + self.parity)).some + self.decoder.get() + else: + self.decoder.get() + + decoder.decode(data, parity, recovered) + +method release*(self: LeoEncoderBackend) = + if self.encoder.isSome: + self.encoder.get().free() + +method release*(self: LeoDecoderBackend) = + if self.decoder.isSome: + self.decoder.get().free() + +func new*( + T: type LeoEncoderBackend, + blockSize, + buffers, + parity: int): T = + T( + blockSize: blockSize, + buffers: buffers, + parity: parity) + +func new*( + T: type LeoDecoderBackend, + blockSize, + buffers, + parity: int): T = + T( + blockSize: blockSize, + buffers: buffers, + parity: parity) diff --git a/dagger/erasure/erasure.nim b/dagger/erasure/erasure.nim new file mode 100644 index 00000000..a8535036 --- /dev/null +++ b/dagger/erasure/erasure.nim @@ -0,0 +1,267 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/upraises + +push: {.upraises: [].} + +import std/sequtils + +import pkg/chronos +import pkg/chronicles + +import ../manifest +import ../stores +import ../errors +import ../blocktype + +import ./backend + +export backend + +logScope: + topics = "dagger erasure" + +type + ## Encode a manifest into one that is erasure protected. + ## + ## The new manifest has K `blocks` that are encoded into + ## additional M `parity` blocks. The resulting dataset + ## is padded with empty blocks if it doesn't have a square + ## shape. + ## + ## NOTE: The padding blocks could be excluded + ## from transmission, but they aren't for now. + ## + ## The resulting dataset is logically divided into rows + ## where a row is made up of B blocks. There are then, + ## K + M = N rows in total, each of length B blocks. Rows + ## are assumed to be of the same number of (B) blocks. + ## + ## The encoding is systematic and the rows can be + ## read sequentially by any node without decoding. + ## + ## Decoding is possible with any K rows or partial K + ## columns (with up to M blocks missing per column), + ## or any combination there of. + ## + + EncoderProvider* = proc(size, blocks, parity: int): EncoderBackend + {.raises: [Defect], noSideEffect.} + + DecoderProvider* = proc(size, blocks, parity: int): DecoderBackend + {.raises: [Defect], noSideEffect.} + + Erasure* = ref object + encoderProvider*: EncoderProvider + decoderProvider*: DecoderProvider + store*: BlockStore + +proc encode*( + self: Erasure, + manifest: Manifest, + blocks: int, + parity: int): Future[?!Manifest] {.async.} = + ## Encode a manifest into one that is erasure protected. + ## + ## `manifest` - the original manifest to be encoded + ## `blocks` - the number of blocks to be encoded - K + ## `parity` - the number of parity blocks to generate - M + ## + + logScope: + original_cid = manifest.cid.get() + original_len = manifest.len + blocks = blocks + parity = parity + + trace "Erasure coding manifest", blocks, parity + without var encoded =? Manifest.new(manifest, blocks, parity), error: + trace "Unable to create manifest", msg = error.msg + return error.failure + + logScope: + steps = encoded.steps + rounded_blocks = encoded.rounded + new_manifest = encoded.len + + var + encoder = self.encoderProvider(manifest.blockSize, blocks, parity) + + try: + for i in 0..= (encoded.K + encoded.M) or idxPendingBlocks.len <= 0: + break + + let + done = await one(idxPendingBlocks) + idx = pendingBlocks.find(done) + + idxPendingBlocks.del(idxPendingBlocks.find(done)) + + without blk =? (await done), error: + trace "Failed retrieving block", exc = error.msg + + if blk.isNil: + continue + + if idx >= encoded.K: + trace "Retrieved parity block", cid = blk.cid, idx + shallowCopy(parityData[idx - encoded.K], if blk.isEmpty: emptyBlock else: blk.data) + else: + trace "Retrieved data block", cid = blk.cid, idx + shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data) + + resolved.inc + + let + dataPieces = data.filterIt( it.len > 0 ).len + parityPieces = parityData.filterIt( it.len > 0 ).len + + if dataPieces >= encoded.K: + trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces + continue + + trace "Erasure decoding data", data = dataPieces, parity = parityPieces + if ( + let err = decoder.decode(data, parityData, recovered); + err.isErr): + trace "Unable to decode manifest!", err = $err.error + return failure($err.error) + + for i in 0.. 0): + + let blk = Block.new(chunk).tryGet() + manifest.add(blk.cid) + check (await store.putBlock(blk)) + + let + encoded = (await erasure.encode( + manifest, + buffers, + parity)).tryGet() + + check: + encoded.len mod (buffers + parity) == 0 + encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers))) + encoded.steps == encoded.rounded div buffers + + var + column = rng.rand(encoded.len div encoded.steps) # random column + dropped: seq[Cid] + + for _ in 0.. 0): + + let blk = Block.new(chunk).tryGet() + manifest.add(blk.cid) + check (await store.putBlock(blk)) + + let + encoded = (await erasure.encode( + manifest, + buffers, + parity)).tryGet() + + check: + encoded.len mod (buffers + parity) == 0 + encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers))) + encoded.steps == encoded.rounded div buffers + + var + column = rng.rand(encoded.len div encoded.steps) # random column + dropped: seq[Cid] + + for _ in 0.. 0): + + let blk = Block.new(chunk).tryGet() + manifest.add(blk.cid) + check (await store.putBlock(blk)) + + let + encoded = (await erasure.encode( + manifest, + buffers, + parity)).tryGet() + + check: + encoded.len mod (buffers + parity) == 0 + encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers))) + encoded.steps == encoded.rounded div buffers + + var + blocks: seq[int] + offset = 0 + + while offset < encoded.steps - 1: + let + blockIdx = toSeq(countup(offset, encoded.len - 1, encoded.steps)) + + for _ in 0.. 0): + + let blk = Block.new(chunk).tryGet() + manifest.add(blk.cid) + check (await store.putBlock(blk)) + + let + encoded = (await erasure.encode( + manifest, + buffers, + parity)).tryGet() + + check: + encoded.len mod (buffers + parity) == 0 + encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers))) + encoded.steps == encoded.rounded div buffers + + var + blocks: seq[int] + offset = 0 + + while offset < encoded.steps - 1: + let + blockIdx = toSeq(countup(offset, encoded.len - 1, encoded.steps)) + + for _ in 0.. 0): + + let blk = Block.new(chunk).tryGet() + manifest.add(blk.cid) + check (await store.putBlock(blk)) + + let + encoded = (await erasure.encode( + manifest, + buffers, + parity)).tryGet() + + check: + encoded.len mod (buffers + parity) == 0 + encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers))) + encoded.steps == encoded.rounded div buffers + + for b in encoded.blocks[0.. 0): + + let blk = Block.new(chunk).tryGet() + manifest.add(blk.cid) + check (await store.putBlock(blk)) + + let + encoded = (await erasure.encode( + manifest, + buffers, + parity)).tryGet() + + check: + encoded.len mod (buffers + parity) == 0 + encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers))) + encoded.steps == encoded.rounded div buffers + + for b in encoded.blocks[^(encoded.steps * encoded.M)..^1]: + check (await store.delBlock(b)) + + var + decoded = (await erasure.decode(encoded)).tryGet() + + for d in manifest: + check d in store diff --git a/tests/testDagger.nim b/tests/testDagger.nim index 73002d2f..d6e1dc3f 100644 --- a/tests/testDagger.nim +++ b/tests/testDagger.nim @@ -7,5 +7,6 @@ import ./dagger/testnode import ./dagger/teststorestream import ./dagger/testpurchasing import ./dagger/testsales +import ./dagger/testerasure {.warning[UnusedImport]: off.}