Erasure coding dataset (#72)
* add erasure coding support * add leopard support * erasure coding tests
This commit is contained in:
parent
70e4b2e5eb
commit
49c41e27b7
|
@ -0,0 +1,19 @@
|
|||
## Nim-Dagger
|
||||
## Copyright (c) 2022 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import ./erasure/erasure
|
||||
import ./erasure/backends/leopard
|
||||
|
||||
export erasure
|
||||
|
||||
func leoEncoderProvider*(size, buffers, parity: int): EncoderBackend {.raises: [Defect].} =
|
||||
LeoEncoderBackend.new(size, buffers, parity)
|
||||
|
||||
func leoDecoderProvider*(size, buffers, parity: int): DecoderBackend {.raises: [Defect].} =
|
||||
LeoDecoderBackend.new(size, buffers, parity)
|
|
@ -0,0 +1,40 @@
|
|||
## Nim-Dagger
|
||||
## Copyright (c) 2022 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import pkg/upraises
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
import ../manifest
|
||||
import ../stores
|
||||
|
||||
type
|
||||
Backend* = ref object of RootObj
|
||||
blockSize*: int # block size in bytes
|
||||
buffers*: int # number of original pieces
|
||||
parity*: int # number of redundancy pieces
|
||||
|
||||
EncoderBackend* = ref object of Backend
|
||||
DecoderBackend* = ref object of Backend
|
||||
|
||||
method release*(self: Backend) {.base.} =
|
||||
raiseAssert("not implemented!")
|
||||
|
||||
method encode*(
|
||||
self: EncoderBackend,
|
||||
buffers,
|
||||
parity: var openArray[seq[byte]]): Result[void, cstring] {.base.} =
|
||||
raiseAssert("not implemented!")
|
||||
|
||||
method decode*(
|
||||
self: DecoderBackend,
|
||||
buffers,
|
||||
parity,
|
||||
recovered: var openArray[seq[byte]]): Result[void, cstring] {.base.} =
|
||||
raiseAssert("not implemented!")
|
|
@ -0,0 +1,83 @@
|
|||
## Nim-Dagger
|
||||
## Copyright (c) 2022 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import std/options
|
||||
|
||||
import pkg/leopard
|
||||
import pkg/stew/results
|
||||
|
||||
import ../backend
|
||||
|
||||
type
|
||||
LeoEncoderBackend* = ref object of EncoderBackend
|
||||
encoder*: Option[LeoEncoder]
|
||||
|
||||
LeoDecoderBackend* = ref object of DecoderBackend
|
||||
decoder*: Option[LeoDecoder]
|
||||
|
||||
method encode*(
|
||||
self: LeoEncoderBackend,
|
||||
data,
|
||||
parity: var openArray[seq[byte]]): Result[void, cstring] =
|
||||
|
||||
var encoder = if self.encoder.isNone:
|
||||
self.encoder = (? LeoEncoder.init(
|
||||
self.blockSize,
|
||||
self.buffers,
|
||||
self.parity)).some
|
||||
self.encoder.get()
|
||||
else:
|
||||
self.encoder.get()
|
||||
|
||||
encoder.encode(data, parity)
|
||||
|
||||
method decode*(
|
||||
self: LeoDecoderBackend,
|
||||
data,
|
||||
parity,
|
||||
recovered: var openArray[seq[byte]]): Result[void, cstring] =
|
||||
|
||||
var decoder = if self.decoder.isNone:
|
||||
self.decoder = (? LeoDecoder.init(
|
||||
self.blockSize,
|
||||
self.buffers,
|
||||
self.parity)).some
|
||||
self.decoder.get()
|
||||
else:
|
||||
self.decoder.get()
|
||||
|
||||
decoder.decode(data, parity, recovered)
|
||||
|
||||
method release*(self: LeoEncoderBackend) =
|
||||
if self.encoder.isSome:
|
||||
self.encoder.get().free()
|
||||
|
||||
method release*(self: LeoDecoderBackend) =
|
||||
if self.decoder.isSome:
|
||||
self.decoder.get().free()
|
||||
|
||||
func new*(
|
||||
T: type LeoEncoderBackend,
|
||||
blockSize,
|
||||
buffers,
|
||||
parity: int): T =
|
||||
T(
|
||||
blockSize: blockSize,
|
||||
buffers: buffers,
|
||||
parity: parity)
|
||||
|
||||
func new*(
|
||||
T: type LeoDecoderBackend,
|
||||
blockSize,
|
||||
buffers,
|
||||
parity: int): T =
|
||||
T(
|
||||
blockSize: blockSize,
|
||||
buffers: buffers,
|
||||
parity: parity)
|
|
@ -0,0 +1,267 @@
|
|||
## Nim-Dagger
|
||||
## Copyright (c) 2022 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
import pkg/upraises
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
import std/sequtils
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
|
||||
import ../manifest
|
||||
import ../stores
|
||||
import ../errors
|
||||
import ../blocktype
|
||||
|
||||
import ./backend
|
||||
|
||||
export backend
|
||||
|
||||
logScope:
|
||||
topics = "dagger erasure"
|
||||
|
||||
type
|
||||
## Encode a manifest into one that is erasure protected.
|
||||
##
|
||||
## The new manifest has K `blocks` that are encoded into
|
||||
## additional M `parity` blocks. The resulting dataset
|
||||
## is padded with empty blocks if it doesn't have a square
|
||||
## shape.
|
||||
##
|
||||
## NOTE: The padding blocks could be excluded
|
||||
## from transmission, but they aren't for now.
|
||||
##
|
||||
## The resulting dataset is logically divided into rows
|
||||
## where a row is made up of B blocks. There are then,
|
||||
## K + M = N rows in total, each of length B blocks. Rows
|
||||
## are assumed to be of the same number of (B) blocks.
|
||||
##
|
||||
## The encoding is systematic and the rows can be
|
||||
## read sequentially by any node without decoding.
|
||||
##
|
||||
## Decoding is possible with any K rows or partial K
|
||||
## columns (with up to M blocks missing per column),
|
||||
## or any combination there of.
|
||||
##
|
||||
|
||||
EncoderProvider* = proc(size, blocks, parity: int): EncoderBackend
|
||||
{.raises: [Defect], noSideEffect.}
|
||||
|
||||
DecoderProvider* = proc(size, blocks, parity: int): DecoderBackend
|
||||
{.raises: [Defect], noSideEffect.}
|
||||
|
||||
Erasure* = ref object
|
||||
encoderProvider*: EncoderProvider
|
||||
decoderProvider*: DecoderProvider
|
||||
store*: BlockStore
|
||||
|
||||
proc encode*(
|
||||
self: Erasure,
|
||||
manifest: Manifest,
|
||||
blocks: int,
|
||||
parity: int): Future[?!Manifest] {.async.} =
|
||||
## Encode a manifest into one that is erasure protected.
|
||||
##
|
||||
## `manifest` - the original manifest to be encoded
|
||||
## `blocks` - the number of blocks to be encoded - K
|
||||
## `parity` - the number of parity blocks to generate - M
|
||||
##
|
||||
|
||||
logScope:
|
||||
original_cid = manifest.cid.get()
|
||||
original_len = manifest.len
|
||||
blocks = blocks
|
||||
parity = parity
|
||||
|
||||
trace "Erasure coding manifest", blocks, parity
|
||||
without var encoded =? Manifest.new(manifest, blocks, parity), error:
|
||||
trace "Unable to create manifest", msg = error.msg
|
||||
return error.failure
|
||||
|
||||
logScope:
|
||||
steps = encoded.steps
|
||||
rounded_blocks = encoded.rounded
|
||||
new_manifest = encoded.len
|
||||
|
||||
var
|
||||
encoder = self.encoderProvider(manifest.blockSize, blocks, parity)
|
||||
|
||||
try:
|
||||
for i in 0..<encoded.steps:
|
||||
# TODO: Don't allocate a new seq everytime, allocate once and zero out
|
||||
var
|
||||
data = newSeq[seq[byte]](blocks) # number of blocks to encode
|
||||
parityData = newSeqWith[seq[byte]](parity, newSeq[byte](manifest.blockSize))
|
||||
# calculate block indexes to retrieve
|
||||
blockIdx = toSeq(countup(i, encoded.rounded - 1, encoded.steps))
|
||||
# request all blocks from the store
|
||||
dataBlocks = await allFinished(
|
||||
blockIdx.mapIt( self.store.getBlock(encoded[it]) ))
|
||||
|
||||
for j in 0..<blocks:
|
||||
let idx = blockIdx[j]
|
||||
if idx < manifest.len:
|
||||
without var blk =? await dataBlocks[j], error:
|
||||
trace "Unable to retrieve block", msg = error.msg
|
||||
return error.failure
|
||||
|
||||
trace "Encoding block", cid = blk.cid, pos = idx
|
||||
shallowCopy(data[j], blk.data)
|
||||
else:
|
||||
trace "Padding with empty block", pos = idx
|
||||
data[j] = newSeq[byte](manifest.blockSize)
|
||||
|
||||
trace "Erasure coding data", data = data.len, parity = parityData.len
|
||||
if (
|
||||
let err = encoder.encode(data, parityData);
|
||||
err.isErr):
|
||||
trace "Unable to encode manifest!", err = $err.error
|
||||
return failure($err.error)
|
||||
|
||||
for j in 0..<parity:
|
||||
let idx = encoded.rounded + blockIdx[j]
|
||||
without blk =? Block.new(parityData[j]), error:
|
||||
trace "Unable to create parity block", err = error.msg
|
||||
return failure(error)
|
||||
|
||||
trace "Adding parity block", cid = blk.cid, pos = idx
|
||||
encoded[idx] = blk.cid
|
||||
if not (await self.store.putBlock(blk)):
|
||||
trace "Unable to store block!", cid = blk.cid
|
||||
return failure("Unable to store block!")
|
||||
except CancelledError as exc:
|
||||
trace "Erasure coding encoding cancelled"
|
||||
raise exc # cancellation needs to be propagated
|
||||
except CatchableError as exc:
|
||||
trace "Erasure coding encoding error", exc = exc.msg
|
||||
return failure(exc)
|
||||
finally:
|
||||
encoder.release()
|
||||
|
||||
return encoded.success
|
||||
|
||||
proc decode*(
|
||||
self: Erasure,
|
||||
encoded: Manifest): Future[?!Manifest] {.async.} =
|
||||
## Decode a protected manifest into it's original
|
||||
## manifest
|
||||
##
|
||||
## `encoded` - the encoded (protected) manifest to
|
||||
## be recovered
|
||||
##
|
||||
|
||||
logScope:
|
||||
steps = encoded.steps
|
||||
rounded_blocks = encoded.rounded
|
||||
new_manifest = encoded.len
|
||||
|
||||
var
|
||||
decoder = self.decoderProvider(encoded.blockSize, encoded.K, encoded.M)
|
||||
|
||||
try:
|
||||
for i in 0..<encoded.steps:
|
||||
# TODO: Don't allocate a new seq everytime, allocate once and zero out
|
||||
let
|
||||
# calculate block indexes to retrieve
|
||||
blockIdx = toSeq(countup(i, encoded.len - 1, encoded.steps))
|
||||
# request all blocks from the store
|
||||
pendingBlocks = blockIdx.mapIt(
|
||||
self.store.getBlock(encoded[it]) # Get the data blocks (first K)
|
||||
)
|
||||
|
||||
var
|
||||
data = newSeq[seq[byte]](encoded.K) # number of blocks to encode
|
||||
parityData = newSeq[seq[byte]](encoded.M)
|
||||
recovered = newSeqWith[seq[byte]](encoded.K, newSeq[byte](encoded.blockSize))
|
||||
idxPendingBlocks = pendingBlocks # copy futures to make using with `one` easier
|
||||
emptyBlock = newSeq[byte](encoded.blockSize)
|
||||
resolved = 0
|
||||
|
||||
while true:
|
||||
if resolved >= (encoded.K + encoded.M) or idxPendingBlocks.len <= 0:
|
||||
break
|
||||
|
||||
let
|
||||
done = await one(idxPendingBlocks)
|
||||
idx = pendingBlocks.find(done)
|
||||
|
||||
idxPendingBlocks.del(idxPendingBlocks.find(done))
|
||||
|
||||
without blk =? (await done), error:
|
||||
trace "Failed retrieving block", exc = error.msg
|
||||
|
||||
if blk.isNil:
|
||||
continue
|
||||
|
||||
if idx >= encoded.K:
|
||||
trace "Retrieved parity block", cid = blk.cid, idx
|
||||
shallowCopy(parityData[idx - encoded.K], if blk.isEmpty: emptyBlock else: blk.data)
|
||||
else:
|
||||
trace "Retrieved data block", cid = blk.cid, idx
|
||||
shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data)
|
||||
|
||||
resolved.inc
|
||||
|
||||
let
|
||||
dataPieces = data.filterIt( it.len > 0 ).len
|
||||
parityPieces = parityData.filterIt( it.len > 0 ).len
|
||||
|
||||
if dataPieces >= encoded.K:
|
||||
trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces
|
||||
continue
|
||||
|
||||
trace "Erasure decoding data", data = dataPieces, parity = parityPieces
|
||||
if (
|
||||
let err = decoder.decode(data, parityData, recovered);
|
||||
err.isErr):
|
||||
trace "Unable to decode manifest!", err = $err.error
|
||||
return failure($err.error)
|
||||
|
||||
for i in 0..<encoded.K:
|
||||
if data[i].len <= 0:
|
||||
without blk =? Block.new(recovered[i]), error:
|
||||
trace "Unable to create block!", exc = error.msg
|
||||
return failure(error)
|
||||
|
||||
trace "Recovered block", cid = blk.cid
|
||||
if not (await self.store.putBlock(blk)):
|
||||
trace "Unable to store block!", cid = blk.cid
|
||||
return failure("Unable to store block!")
|
||||
except CancelledError as exc:
|
||||
trace "Erasure coding decoding cancelled"
|
||||
raise exc # cancellation needs to be propagated
|
||||
except CatchableError as exc:
|
||||
trace "Erasure coding decoding error", exc = exc.msg
|
||||
return failure(exc)
|
||||
finally:
|
||||
decoder.release()
|
||||
|
||||
without decoded =? Manifest.new(blocks = encoded.blocks[0..<encoded.originalLen]), error:
|
||||
return error.failure
|
||||
|
||||
return decoded.success
|
||||
|
||||
proc start*(self: Erasure) {.async.} =
|
||||
return
|
||||
|
||||
proc stop*(self: Erasure) {.async.} =
|
||||
return
|
||||
|
||||
proc new*(
|
||||
T: type Erasure,
|
||||
store: BlockStore,
|
||||
encoderProvider: EncoderProvider,
|
||||
decoderProvider: DecoderProvider): Erasure =
|
||||
|
||||
Erasure(
|
||||
store: store,
|
||||
encoderProvider: encoderProvider,
|
||||
decoderProvider: decoderProvider)
|
|
@ -0,0 +1,306 @@
|
|||
import std/sequtils
|
||||
|
||||
import pkg/asynctest
|
||||
import pkg/chronos
|
||||
import pkg/libp2p
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import pkg/dagger/erasure
|
||||
import pkg/dagger/manifest
|
||||
import pkg/dagger/stores
|
||||
import pkg/dagger/blocktype
|
||||
import pkg/dagger/rng
|
||||
|
||||
import ./helpers
|
||||
|
||||
suite "Erasure encode/decode":
|
||||
test "Should tolerate loosing M data blocks in a single random column":
|
||||
const
|
||||
buffers = 20
|
||||
parity = 10
|
||||
dataSetSize = BlockSize * 123 # weird geometry
|
||||
|
||||
var
|
||||
chunker = RandomChunker.new(Rng.instance(), size = dataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
store = CacheStore.new(cacheSize = (dataSetSize * 2), chunkSize = BlockSize)
|
||||
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
|
||||
rng = Rng.instance
|
||||
|
||||
while (
|
||||
let chunk = await chunker.getBytes();
|
||||
chunk.len > 0):
|
||||
|
||||
let blk = Block.new(chunk).tryGet()
|
||||
manifest.add(blk.cid)
|
||||
check (await store.putBlock(blk))
|
||||
|
||||
let
|
||||
encoded = (await erasure.encode(
|
||||
manifest,
|
||||
buffers,
|
||||
parity)).tryGet()
|
||||
|
||||
check:
|
||||
encoded.len mod (buffers + parity) == 0
|
||||
encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers)))
|
||||
encoded.steps == encoded.rounded div buffers
|
||||
|
||||
var
|
||||
column = rng.rand(encoded.len div encoded.steps) # random column
|
||||
dropped: seq[Cid]
|
||||
|
||||
for _ in 0..<encoded.M:
|
||||
dropped.add(encoded[column])
|
||||
check (await store.delBlock(encoded[column]))
|
||||
column.inc(encoded.steps)
|
||||
|
||||
var
|
||||
decoded = (await erasure.decode(encoded)).tryGet()
|
||||
|
||||
check:
|
||||
decoded.cid.tryGet() == manifest.cid.tryGet()
|
||||
decoded.cid.tryGet() == encoded.originalCid
|
||||
decoded.len == encoded.originalLen
|
||||
|
||||
for d in dropped:
|
||||
check d in store
|
||||
|
||||
test "Should not tolerate loosing more than M data blocks in a single random column":
|
||||
const
|
||||
buffers = 20
|
||||
parity = 10
|
||||
dataSetSize = BlockSize * 123 # weird geometry
|
||||
|
||||
var
|
||||
chunker = RandomChunker.new(Rng.instance(), size = dataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
store = CacheStore.new(cacheSize = (dataSetSize * 2), chunkSize = BlockSize)
|
||||
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
|
||||
rng = Rng.instance
|
||||
|
||||
while (
|
||||
let chunk = await chunker.getBytes();
|
||||
chunk.len > 0):
|
||||
|
||||
let blk = Block.new(chunk).tryGet()
|
||||
manifest.add(blk.cid)
|
||||
check (await store.putBlock(blk))
|
||||
|
||||
let
|
||||
encoded = (await erasure.encode(
|
||||
manifest,
|
||||
buffers,
|
||||
parity)).tryGet()
|
||||
|
||||
check:
|
||||
encoded.len mod (buffers + parity) == 0
|
||||
encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers)))
|
||||
encoded.steps == encoded.rounded div buffers
|
||||
|
||||
var
|
||||
column = rng.rand(encoded.len div encoded.steps) # random column
|
||||
dropped: seq[Cid]
|
||||
|
||||
for _ in 0..<encoded.M + 1:
|
||||
dropped.add(encoded[column])
|
||||
check (await store.delBlock(encoded[column]))
|
||||
column.inc(encoded.steps)
|
||||
|
||||
var
|
||||
decoded: Manifest
|
||||
|
||||
expect ResultFailure:
|
||||
decoded = (await erasure.decode(encoded)).tryGet()
|
||||
|
||||
for d in dropped:
|
||||
check d notin store
|
||||
|
||||
test "Should tolerate loosing M data blocks in M random columns":
|
||||
const
|
||||
buffers = 20
|
||||
parity = 10
|
||||
dataSetSize = BlockSize * 123 # weird geometry
|
||||
|
||||
var
|
||||
chunker = RandomChunker.new(Rng.instance(), size = dataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
store = CacheStore.new(cacheSize = (dataSetSize * 5), chunkSize = BlockSize)
|
||||
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
|
||||
rng = Rng.instance
|
||||
|
||||
while (
|
||||
let chunk = await chunker.getBytes();
|
||||
chunk.len > 0):
|
||||
|
||||
let blk = Block.new(chunk).tryGet()
|
||||
manifest.add(blk.cid)
|
||||
check (await store.putBlock(blk))
|
||||
|
||||
let
|
||||
encoded = (await erasure.encode(
|
||||
manifest,
|
||||
buffers,
|
||||
parity)).tryGet()
|
||||
|
||||
check:
|
||||
encoded.len mod (buffers + parity) == 0
|
||||
encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers)))
|
||||
encoded.steps == encoded.rounded div buffers
|
||||
|
||||
var
|
||||
blocks: seq[int]
|
||||
offset = 0
|
||||
|
||||
while offset < encoded.steps - 1:
|
||||
let
|
||||
blockIdx = toSeq(countup(offset, encoded.len - 1, encoded.steps))
|
||||
|
||||
for _ in 0..<encoded.M:
|
||||
blocks.add(rng.sample(blockIdx, blocks))
|
||||
offset.inc
|
||||
|
||||
for idx in blocks:
|
||||
check (await store.delBlock(encoded[idx]))
|
||||
|
||||
var
|
||||
decoded = (await erasure.decode(encoded)).tryGet()
|
||||
|
||||
for d in manifest:
|
||||
check d in store
|
||||
|
||||
test "Should not tolerate loosing more than M data blocks in M random columns":
|
||||
const
|
||||
buffers = 20
|
||||
parity = 10
|
||||
dataSetSize = BlockSize * 123 # weird geometry
|
||||
|
||||
var
|
||||
chunker = RandomChunker.new(Rng.instance(), size = dataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
store = CacheStore.new(cacheSize = (dataSetSize * 5), chunkSize = BlockSize)
|
||||
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
|
||||
rng = Rng.instance
|
||||
|
||||
while (
|
||||
let chunk = await chunker.getBytes();
|
||||
chunk.len > 0):
|
||||
|
||||
let blk = Block.new(chunk).tryGet()
|
||||
manifest.add(blk.cid)
|
||||
check (await store.putBlock(blk))
|
||||
|
||||
let
|
||||
encoded = (await erasure.encode(
|
||||
manifest,
|
||||
buffers,
|
||||
parity)).tryGet()
|
||||
|
||||
check:
|
||||
encoded.len mod (buffers + parity) == 0
|
||||
encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers)))
|
||||
encoded.steps == encoded.rounded div buffers
|
||||
|
||||
var
|
||||
blocks: seq[int]
|
||||
offset = 0
|
||||
|
||||
while offset < encoded.steps - 1:
|
||||
let
|
||||
blockIdx = toSeq(countup(offset, encoded.len - 1, encoded.steps))
|
||||
|
||||
for _ in 0..<encoded.M + 1: # NOTE: the +1
|
||||
blocks.add(rng.sample(blockIdx, blocks))
|
||||
offset.inc
|
||||
|
||||
for idx in blocks:
|
||||
check (await store.delBlock(encoded[idx]))
|
||||
|
||||
var
|
||||
decoded: Manifest
|
||||
|
||||
expect ResultFailure:
|
||||
decoded = (await erasure.decode(encoded)).tryGet()
|
||||
|
||||
test "Should tolerate loosing M (a.k.a row) contiguous data blocks":
|
||||
const
|
||||
buffers = 20
|
||||
parity = 10
|
||||
dataSetSize = BlockSize * 123 # weird geometry
|
||||
|
||||
var
|
||||
chunker = RandomChunker.new(Rng.instance(), size = dataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
store = CacheStore.new(cacheSize = (dataSetSize * 5), chunkSize = BlockSize)
|
||||
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
|
||||
rng = Rng.instance
|
||||
|
||||
while (
|
||||
let chunk = await chunker.getBytes();
|
||||
chunk.len > 0):
|
||||
|
||||
let blk = Block.new(chunk).tryGet()
|
||||
manifest.add(blk.cid)
|
||||
check (await store.putBlock(blk))
|
||||
|
||||
let
|
||||
encoded = (await erasure.encode(
|
||||
manifest,
|
||||
buffers,
|
||||
parity)).tryGet()
|
||||
|
||||
check:
|
||||
encoded.len mod (buffers + parity) == 0
|
||||
encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers)))
|
||||
encoded.steps == encoded.rounded div buffers
|
||||
|
||||
for b in encoded.blocks[0..<encoded.steps * encoded.M]:
|
||||
check (await store.delBlock(b))
|
||||
|
||||
var
|
||||
decoded = (await erasure.decode(encoded)).tryGet()
|
||||
|
||||
for d in manifest:
|
||||
check d in store
|
||||
|
||||
test "Should tolerate loosing M (a.k.a row) contiguous parity blocks":
|
||||
const
|
||||
buffers = 20
|
||||
parity = 10
|
||||
dataSetSize = BlockSize * 123 # weird geometry
|
||||
|
||||
var
|
||||
chunker = RandomChunker.new(Rng.instance(), size = dataSetSize, chunkSize = BlockSize)
|
||||
manifest = Manifest.new(blockSize = BlockSize).tryGet()
|
||||
store = CacheStore.new(cacheSize = (dataSetSize * 5), chunkSize = BlockSize)
|
||||
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
|
||||
rng = Rng.instance
|
||||
|
||||
while (
|
||||
let chunk = await chunker.getBytes();
|
||||
chunk.len > 0):
|
||||
|
||||
let blk = Block.new(chunk).tryGet()
|
||||
manifest.add(blk.cid)
|
||||
check (await store.putBlock(blk))
|
||||
|
||||
let
|
||||
encoded = (await erasure.encode(
|
||||
manifest,
|
||||
buffers,
|
||||
parity)).tryGet()
|
||||
|
||||
check:
|
||||
encoded.len mod (buffers + parity) == 0
|
||||
encoded.rounded == (manifest.len + (buffers - (manifest.len mod buffers)))
|
||||
encoded.steps == encoded.rounded div buffers
|
||||
|
||||
for b in encoded.blocks[^(encoded.steps * encoded.M)..^1]:
|
||||
check (await store.delBlock(b))
|
||||
|
||||
var
|
||||
decoded = (await erasure.decode(encoded)).tryGet()
|
||||
|
||||
for d in manifest:
|
||||
check d in store
|
|
@ -7,5 +7,6 @@ import ./dagger/testnode
|
|||
import ./dagger/teststorestream
|
||||
import ./dagger/testpurchasing
|
||||
import ./dagger/testsales
|
||||
import ./dagger/testerasure
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
|
Loading…
Reference in New Issue