Erasure cleanup (#545)

* clean up erasure coding

* more cleanup

* fix off-by-one issues in tests

* style

* consolidate decoding data code

* simplify tuple unpacking

* fix retrieve purchase

We don't support single blocks for now

* Apply suggestions from code review

Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Signed-off-by: Dmitriy Ryajov <dryajov@gmail.com>

---------

Signed-off-by: Dmitriy Ryajov <dryajov@gmail.com>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Dmitriy Ryajov 2023-09-25 08:31:10 -06:00 committed by GitHub
parent 13de6dc500
commit 25ea7fd0b2
8 changed files with 355 additions and 133 deletions

View File

@@ -12,9 +12,11 @@ import pkg/upraises
push: {.upraises: [].}
import std/sequtils
import std/options
import pkg/chronos
import pkg/chronicles
import pkg/questionable
import ../manifest
import ../stores
@@ -62,26 +64,181 @@ type
decoderProvider*: DecoderProvider
store*: BlockStore
proc encode*(
self: Erasure,
manifest: Manifest,
blocks: int,
parity: int
): Future[?!Manifest] {.async.} =
## Encode a manifest into one that is erasure protected.
GetNext = proc(): Future[?(bt.Block, int)] {.upraises: [], gcsafe, closure.}
PendingBlocksIter* = ref object
finished*: bool
next*: GetNext
func indexToPos(self: Erasure, encoded: Manifest, idx, step: int): int {.inline.} =
## Convert an index to a position in the encoded
## dataset
## `idx` - the index to convert
## `step` - the current step
## `pos` - the position in the encoded dataset
##
## `manifest` - the original manifest to be encoded
## `blocks` - the number of blocks to be encoded - K
## `parity` - the number of parity blocks to generate - M
(idx - step) div encoded.steps
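
As a sanity check, here is a dependency-free sketch of the interleaved index math implied by `indexToPos`: with `steps` columns over `rounded = ecK * steps` data slots, step `s` owns indices `s, s + steps, s + 2*steps, ...`, and `indexToPos` maps them back to positions `0..<ecK`. The concrete numbers are made up for illustration.

```nim
# Standalone sketch of the interleaved index math (illustrative values only).
import std/sequtils

func indexToPos(steps, idx, step: int): int =
  (idx - step) div steps

let
  ecK = 3                 # data blocks per step (hypothetical)
  steps = 4               # number of interleaved steps/columns
  rounded = ecK * steps   # padded number of data slots

for step in 0 ..< steps:
  let indices = toSeq(countup(step, rounded - 1, steps))
  doAssert indices.mapIt(indexToPos(steps, it, step)) == toSeq(0 ..< ecK)
```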
iterator items*(blocks: PendingBlocksIter): Future[?(bt.Block, int)] =
while not blocks.finished:
yield blocks.next()
proc getPendingBlocks(
self: Erasure,
manifest: Manifest,
start, stop, steps: int): ?!PendingBlocksIter =
## Get pending blocks iterator
##
var
# calculate block indexes to retrieve
blockIdx = toSeq(countup(start, stop, steps))
# request all blocks from the store
pendingBlocks = blockIdx.mapIt(
self.store.getBlock(manifest[it]) # Get the data blocks (first K)
)
indices = pendingBlocks # needed so we can track the block indices
iter = PendingBlocksIter(finished: false)
trace "Requesting blocks", pendingBlocks = pendingBlocks.len
proc next(): Future[?(bt.Block, int)] {.async.} =
if iter.finished:
trace "No more blocks"
return none (bt.Block, int)
if pendingBlocks.len == 0:
iter.finished = true
trace "No more blocks - finished"
return none (bt.Block, int)
let
done = await one(pendingBlocks)
idx = indices.find(done)
logScope:
idx = idx
blockIdx = blockIdx[idx]
manifest = manifest[blockIdx[idx]]
pendingBlocks.del(pendingBlocks.find(done))
without blk =? (await done), error:
trace "Failed retrieving block", err = $error.msg
return none (bt.Block, int)
trace "Retrieved block"
some (blk, blockIdx[idx])
iter.next = next
success iter
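
`PendingBlocksIter` (and `BlockExpirationIter` further down) follow the same closure-iterator shape: a ref object holding a `finished` flag and a `next` closure, drained by an `items` iterator. A minimal, future-free sketch of that shape, with made-up names and plain `Option`s standing in for awaited results:

```nim
# Minimal sketch of the closure-iterator pattern used by PendingBlocksIter.
import std/options

type
  IntIter = ref object
    finished: bool
    next: proc(): Option[int] {.closure.}

proc newCountdown(n: int): IntIter =
  var remaining = n
  let iter = IntIter(finished: false)
  iter.next = proc(): Option[int] =
    if remaining <= 0:
      iter.finished = true   # mark exhausted, yield a terminal `none`
      return none(int)
    dec remaining
    some(remaining)
  iter

iterator items(iter: IntIter): Option[int] =
  while not iter.finished:
    yield iter.next()

for res in newCountdown(3):
  if res.isSome:
    echo res.get()   # 2, 1, 0
```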
proc prepareEncodingData(
self: Erasure,
encoded: Manifest,
step: int,
data: ref seq[seq[byte]],
emptyBlock: seq[byte]): Future[?!int] {.async.} =
## Prepare data for encoding
##
without pendingBlocksIter =?
self.getPendingBlocks(
encoded,
step,
encoded.rounded - 1, encoded.steps), err:
trace "Unable to get pending blocks", error = err.msg
return failure(err)
var resolved = 0
for blkFut in pendingBlocksIter:
if (blk, idx) =? (await blkFut):
let
pos = self.indexToPos(encoded, idx, step)
if blk.isEmpty:
trace "Padding with empty block", idx
shallowCopy(data[pos], emptyBlock)
else:
trace "Encoding block", cid = blk.cid, idx
shallowCopy(data[pos], blk.data)
resolved.inc()
success resolved
proc prepareDecodingData(
self: Erasure,
encoded: Manifest,
step: int,
data: ref seq[seq[byte]],
parityData: ref seq[seq[byte]],
emptyBlock: seq[byte]): Future[?!(int, int)] {.async.} =
## Prepare data for decoding
## `encoded` - the encoded manifest
## `step` - the current step
## `data` - the data to be prepared
## `parityData` - the parityData to be prepared
## `emptyBlock` - the empty block to be used for padding
##
without pendingBlocksIter =?
self.getPendingBlocks(
encoded,
step,
encoded.len - 1, encoded.steps), err:
trace "Unable to get pending blocks", error = err.msg
return failure(err)
var
dataPieces = 0
parityPieces = 0
resolved = 0
for blkFut in pendingBlocksIter:
# Continue to receive blocks until we have just enough for decoding
# or no more blocks can arrive
if resolved >= encoded.ecK:
break
if (blk, idx) =? (await blkFut):
let
pos = self.indexToPos(encoded, idx, step)
logScope:
cid = blk.cid
idx = idx
pos = pos
step = step
empty = blk.isEmpty
if idx >= encoded.rounded:
trace "Retrieved parity block"
shallowCopy(parityData[pos - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data)
parityPieces.inc
else:
trace "Retrieved data block"
shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data)
dataPieces.inc
resolved.inc
return success (dataPieces, parityPieces)
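
For orientation, a small sketch of how a retrieved index is classified for a given step during decode, mirroring the `idx >= encoded.rounded` branch above (the values and the `classify` helper are illustrative, not part of the codebase):

```nim
# Illustrative classification of block indices into data vs parity pieces.
let
  ecK = 3
  steps = 4
  rounded = ecK * steps      # data slots (incl. padding)

proc classify(idx, step: int): string =
  let pos = (idx - step) div steps     # same math as indexToPos
  if idx >= rounded:
    "parity piece " & $(pos - ecK)     # goes into parityData[pos - ecK]
  else:
    "data piece " & $pos               # goes into data[pos]

# with 2 parity columns, step 1 fetches indices 1, 5, 9, 13, 17
doAssert classify(1, 1) == "data piece 0"
doAssert classify(9, 1) == "data piece 2"
doAssert classify(13, 1) == "parity piece 0"
doAssert classify(17, 1) == "parity piece 1"
```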
proc prepareManifest(
self: Erasure,
manifest: Manifest,
blocks: int,
parity: int): ?!Manifest =
logScope:
original_cid = manifest.cid.get()
original_len = manifest.len
blocks = blocks
parity = parity
trace "Erasure coding manifest", blocks, parity
if blocks > manifest.len:
trace "Unable to encode manifest, not enough blocks", blocks = blocks, len = manifest.len
return failure("Not enough blocks to encode")
trace "Preparing erasure coded manifest", blocks, parity
without var encoded =? Manifest.new(manifest, blocks, parity), error:
trace "Unable to create manifest", msg = error.msg
return error.failure
@@ -91,57 +248,75 @@ proc encode*(
rounded_blocks = encoded.rounded
new_manifest = encoded.len
trace "Erasure coded manifest prepared"
success encoded
proc encodeData(
self: Erasure,
manifest: Manifest): Future[?!void] {.async.} =
## Encode blocks pointed to by the protected manifest
##
## `manifest` - the manifest to encode
##
var
encoder = self.encoderProvider(manifest.blockSize.int, blocks, parity)
encoded = manifest
logScope:
steps = encoded.steps
rounded_blocks = encoded.rounded
new_manifest = encoded.len
protected = encoded.protected
ecK = encoded.ecK
ecM = encoded.ecM
if not encoded.protected:
trace "Manifest is not erasure protected"
return failure("Manifest is not erasure protected")
var
encoder = self.encoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
emptyBlock = newSeq[byte](encoded.blockSize.int)
try:
for i in 0..<encoded.steps:
for step in 0..<encoded.steps:
# TODO: Don't allocate a new seq every time, allocate once and zero out
var
data = newSeq[seq[byte]](blocks) # number of blocks to encode
parityData = newSeqWith[seq[byte]](parity, newSeq[byte](manifest.blockSize.int))
# calculate block indexes to retrieve
blockIdx = toSeq(countup(i, encoded.rounded - 1, encoded.steps))
# request all blocks from the store
dataBlocks = await allFinished(
blockIdx.mapIt( self.store.getBlock(encoded[it]) ))
data = seq[seq[byte]].new() # number of blocks to encode
parityData = newSeqWith[seq[byte]](encoded.ecM, newSeq[byte](encoded.blockSize.int))
data[].setLen(encoded.ecK)
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
for j in 0..<blocks:
let idx = blockIdx[j]
if idx < manifest.len:
without blk =? (await dataBlocks[j]), error:
trace "Unable to retrieve block", error = error.msg
return failure error
without resolved =?
(await self.prepareEncodingData(encoded, step, data, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg
return failure(err)
trace "Encoding block", cid = blk.cid, pos = idx
shallowCopy(data[j], blk.data)
else:
trace "Padding with empty block", pos = idx
data[j] = newSeq[byte](manifest.blockSize.int)
trace "Erasure coding data", data = data[].len, parity = parityData.len
trace "Erasure coding data", data = data.len, parity = parityData.len
let res = encoder.encode(data, parityData);
if res.isErr:
if (
let res = encoder.encode(data[], parityData);
res.isErr):
trace "Unable to encode manifest!", error = $res.error
return failure($res.error)
return res.mapFailure
for j in 0..<parity:
let idx = encoded.rounded + blockIdx[j]
var idx = encoded.rounded + step
for j in 0..<encoded.ecM:
without blk =? bt.Block.new(parityData[j]), error:
trace "Unable to create parity block", err = error.msg
return failure(error)
trace "Adding parity block", cid = blk.cid, pos = idx
trace "Adding parity block", cid = blk.cid, idx
encoded[idx] = blk.cid
if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid
return failure("Unable to store block!")
idx.inc(encoded.steps)
except CancelledError as exc:
trace "Erasure coding encoding cancelled"
raise exc # cancellation needs to be propagated
@@ -151,98 +326,104 @@ proc encode*(
finally:
encoder.release()
return encoded.success
return success()
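
The parity bookkeeping in `encodeData` places step `s`'s M parity blocks at `rounded + s` and then advances by `steps` for each piece. A quick standalone check of that layout (numbers are illustrative, not taken from a real manifest):

```nim
# Sketch of slot ownership: every step owns ecK interleaved data slots plus
# ecM interleaved parity slots, and together the steps cover the encoded
# range exactly once.
import std/sequtils

let
  ecK = 3
  ecM = 2
  steps = 4
  rounded = ecK * steps
  total = rounded + ecM * steps

var owner = newSeq[int](total)
for step in 0 ..< steps:
  for idx in countup(step, rounded - 1, steps):   # data slots
    owner[idx] = step
  var idx = rounded + step                        # first parity slot
  for _ in 0 ..< ecM:
    owner[idx] = step
    idx.inc(steps)

for step in 0 ..< steps:
  doAssert owner.count(step) == ecK + ecM
```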
proc encode*(
self: Erasure,
manifest: Manifest,
blocks: int,
parity: int): Future[?!Manifest] {.async.} =
## Encode a manifest into one that is erasure protected.
##
## `manifest` - the original manifest to be encoded
## `blocks` - the number of blocks to be encoded - K
## `parity` - the number of parity blocks to generate - M
##
without var encoded =? self.prepareManifest(manifest, blocks, parity), error:
trace "Unable to prepare manifest", error = error.msg
return failure error
if err =? (await self.encodeData(encoded)).errorOption:
trace "Unable to encode data", error = err.msg
return failure err
return success encoded
proc decode*(
self: Erasure,
encoded: Manifest
): Future[?!Manifest] {.async.} =
self: Erasure,
encoded: Manifest,
all = true): Future[?!Manifest] {.async.} =
## Decode a protected manifest into its original
## manifest
##
## `encoded` - the encoded (protected) manifest to
## be recovered
## `all` - if true, all blocks will be recovered,
## including parity
##
logScope:
steps = encoded.steps
rounded_blocks = encoded.rounded
new_manifest = encoded.len
protected = encoded.protected
ecK = encoded.ecK
ecM = encoded.ecM
if not encoded.protected:
trace "Manifest is not erasure protected"
return failure "Manifest is not erasure protected"
var
decoder = self.decoderProvider(encoded.blockSize.int, encoded.ecK, encoded.ecM)
emptyBlock = newSeq[byte](encoded.blockSize.int)
hasParity = false
trace "Decoding erasure coded manifest"
try:
for i in 0..<encoded.steps:
# TODO: Don't allocate a new seq every time, allocate once and zero out
let
# calculate block indexes to retrieve
blockIdx = toSeq(countup(i, encoded.len - 1, encoded.steps))
# request all blocks from the store
pendingBlocks = blockIdx.mapIt(
self.store.getBlock(encoded[it]) # Get the data blocks (first K)
)
for step in 0..<encoded.steps:
# TODO: this is a tight blocking loop so we sleep here to allow
# other events to be processed, this should be addressed
# by threading
await sleepAsync(10.millis)
var
data = newSeq[seq[byte]](encoded.ecK) # number of blocks to encode
parityData = newSeq[seq[byte]](encoded.ecM)
data = seq[seq[byte]].new()
# newSeq[seq[byte]](encoded.ecK) # number of blocks to encode
parityData = seq[seq[byte]].new()
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
idxPendingBlocks = pendingBlocks # copy futures to make using with `one` easier
emptyBlock = newSeq[byte](encoded.blockSize.int)
resolved = 0
while true:
# Continue to receive blocks until we have just enough for decoding
# or no more blocks can arrive
if (resolved >= encoded.ecK) or (idxPendingBlocks.len == 0):
break
data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M
let
done = await one(idxPendingBlocks)
idx = pendingBlocks.find(done)
idxPendingBlocks.del(idxPendingBlocks.find(done))
without blk =? (await done), error:
trace "Failed retrieving block", error = error.msg
continue
if idx >= encoded.ecK:
trace "Retrieved parity block", cid = blk.cid, idx
shallowCopy(parityData[idx - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data)
else:
trace "Retrieved data block", cid = blk.cid, idx
shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data)
resolved.inc
let
dataPieces = data.filterIt( it.len > 0 ).len
parityPieces = parityData.filterIt( it.len > 0 ).len
without (dataPieces, parityPieces) =?
(await self.prepareDecodingData(encoded, step, data, parityData, emptyBlock)), err:
trace "Unable to prepare data", error = err.msg
return failure(err)
if dataPieces >= encoded.ecK:
trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces
trace "Retrieved all the required data blocks"
continue
trace "Erasure decoding data", data = dataPieces, parity = parityPieces
trace "Erasure decoding data"
if (
let err = decoder.decode(data, parityData, recovered);
let err = decoder.decode(data[], parityData[], recovered);
err.isErr):
trace "Unable to decode manifest!", err = $err.error
trace "Unable to decode data!", err = $err.error
return failure($err.error)
for i in 0..<encoded.ecK:
if data[i].len <= 0:
if data[i].len <= 0 and not encoded.blocks[i].isEmpty:
without blk =? bt.Block.new(recovered[i]), error:
trace "Unable to create block!", exc = error.msg
return failure(error)
trace "Recovered block", cid = blk.cid
doAssert blk.cid in encoded.blocks,
"Recovered block not in original manifest"
trace "Recovered block", cid = blk.cid, index = i
if isErr (await self.store.putBlock(blk)):
trace "Unable to store block!", cid = blk.cid
return failure("Unable to store block!")

View File

@@ -31,17 +31,17 @@ type
Manifest* = ref object of RootObj
rootHash: ?Cid # Root (tree) hash of the contained data set
originalBytes*: NBytes # Exact size of the original (uploaded) file
blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
blocks: seq[Cid] # Block Cid
version: CidVersion # Cid version
hcodec: MultiCodec # Multihash codec
codec: MultiCodec # Data set codec
case protected: bool # Protected datasets have erasure coded info
blockSize: NBytes # Size of each contained block (might not be needed if blocks are len-prefixed)
blocks: seq[Cid] # Block Cid
version: CidVersion # Cid version
hcodec: MultiCodec # Multihash codec
codec: MultiCodec # Data set codec
case protected: bool # Protected datasets have erasure coded info
of true:
ecK: int # Number of blocks to encode
ecM: int # Number of resulting parity blocks
originalCid: Cid # The original Cid of the dataset being erasure coded
originalLen: int # The length of the original manifest
ecK: int # Number of blocks to encode
ecM: int # Number of resulting parity blocks
originalCid: Cid # The original Cid of the dataset being erasure coded
originalLen: int # The length of the original manifest
else:
discard
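
The protected-manifest fields above hang together through a small amount of geometry that the erasure code relies on. The formulas below are inferred from how the encode/decode loops iterate (steps, rounded, total length); they are an illustration, not a verbatim copy of manifest.nim:

```nim
# Assumed geometry of a protected manifest (illustrative values).
proc geometry(originalLen, ecK, ecM: int): tuple[steps, rounded, total: int] =
  let steps = (originalLen + ecK - 1) div ecK   # ceil(originalLen / ecK)
  let rounded = steps * ecK                     # data slots incl. padding
  (steps: steps, rounded: rounded, total: rounded + steps * ecM)

let (steps, rounded, total) = geometry(originalLen = 10, ecK = 3, ecM = 2)
doAssert steps == 4 and rounded == 12 and total == 20
```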
@@ -223,8 +223,8 @@ proc new*(
## Create a manifest using an array of `Cid`s
##
# if hcodec notin EmptyDigests[version]:
# return failure("Unsupported manifest hash codec!")
if hcodec notin EmptyDigests[version]:
return failure("Unsupported manifest hash codec!")
T(
blocks: @blocks,

View File

@@ -11,7 +11,6 @@
import std/tables
import pkg/libp2p
import pkg/questionable
import ../units
export units

View File

@@ -195,9 +195,9 @@ proc new*(
chunkSize: NBytes = DefaultChunkSize
): CacheStore {.raises: [Defect, ValueError].} =
## Create a new CacheStore instance
##
##
## `cacheSize` and `chunkSize` are both in bytes
##
##
if cacheSize < chunkSize:
raise newException(ValueError, "cacheSize cannot be less than chunkSize")

View File

@@ -58,6 +58,7 @@ type
BlockExpiration* = object
cid*: Cid
expiration*: SecondsSince1970
GetNext = proc(): Future[?BlockExpiration] {.upraises: [], gcsafe, closure.}
BlockExpirationIter* = ref object
finished*: bool
@@ -85,6 +86,9 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success cid.emptyBlock
@@ -108,12 +112,13 @@ proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1
self.clock.now() + duration.seconds
proc getBlockExpirationEntry(
self: RepoStore,
batch: var seq[BatchEntry],
cid: Cid,
ttl: ?Duration
): ?!BatchEntry =
self: RepoStore,
batch: var seq[BatchEntry],
cid: Cid,
ttl: ?Duration): ?!BatchEntry =
## Get an expiration entry for a batch
##
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
@@ -129,13 +134,15 @@ proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
return success()
method putBlock*(
self: RepoStore,
blk: Block,
ttl = Duration.none
): Future[?!void] {.async.} =
self: RepoStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
return success()
@@ -211,7 +218,11 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
trace "Deleting block", cid
logScope:
cid = cid
trace "Deleting block"
if cid.isEmpty:
trace "Empty block, ignoring"
@@ -245,9 +256,12 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
logScope:
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return true.success
return success true
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
@@ -256,9 +270,8 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
return await self.repoDs.has(key)
method listBlocks*(
self: RepoStore,
blockType = BlockType.Manifest
): Future[?!BlocksIter] {.async.} =
self: RepoStore,
blockType = BlockType.Manifest): Future[?!BlocksIter] {.async.} =
## Get the list of blocks in the RepoStore.
## This is an intensive operation
##
@@ -296,12 +309,12 @@ proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(
self: RepoStore,
maxNumber: int,
offset: int
): Future[?!BlockExpirationIter] {.async, base.} =
## Get block experiartions from the given RepoStore
self: RepoStore,
maxNumber: int,
offset: int): Future[?!BlockExpirationIter] {.async, base.} =
## Get block expirations from the given RepoStore
##
without query =? createBlockExpirationQuery(maxNumber, offset), err:
trace "Unable to format block expirations query"
return failure(err)

View File

@@ -34,10 +34,9 @@ proc lenPrefix*(msg: openArray[byte]): seq[byte] =
return buf
proc corruptBlocks*(
store: BlockStore,
manifest: Manifest,
blks, bytes: int
): Future[seq[int]] {.async.} =
store: BlockStore,
manifest: Manifest,
blks, bytes: int): Future[seq[int]] {.async.} =
var pos: seq[int]
doAssert blks < manifest.len

View File

@@ -64,13 +64,13 @@ asyncchecksuite "Erasure encode/decode":
let encoded = await encode(buffers, parity)
var
column = rng.rand(encoded.len div encoded.steps) # random column
column = rng.rand((encoded.len - 1) div encoded.steps) # random column
dropped: seq[Cid]
for _ in 0..<encoded.ecM:
dropped.add(encoded[column])
(await store.delBlock(encoded[column])).tryGet()
column.inc(encoded.steps)
column.inc(encoded.steps - 1)
var
decoded = (await erasure.decode(encoded)).tryGet()
@@ -92,7 +92,7 @@ asyncchecksuite "Erasure encode/decode":
let encoded = await encode(buffers, parity)
var
column = rng.rand(encoded.len div encoded.steps) # random column
column = rng.rand((encoded.len - 1) div encoded.steps) # random column
dropped: seq[Cid]
for _ in 0..<encoded.ecM + 1:

View File

@@ -2,11 +2,14 @@ import std/options
from pkg/libp2p import `==`
import pkg/chronos
import pkg/stint
import pkg/codex/rng
import pkg/stew/byteutils
import pkg/ethers/erc20
import pkg/codex/contracts
import pkg/codex/utils/stintutils
import ../contracts/time
import ../contracts/deployment
import ../codex/helpers
import ./twonodes
@@ -53,9 +56,22 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
check id1 != id2
test "node retrieves purchase status":
# get one contiguous chunk
let rng = rng.Rng.instance()
let chunker = RandomChunker.new(rng, size = DefaultBlockSize * 2, chunkSize = DefaultBlockSize * 2)
let data = await chunker.getBytes()
let cid = client1.upload(byteutils.toHex(data)).get
let expiry = (await provider.currentTime()) + 30
let cid = client1.upload("some file contents").get
let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=expiry, collateral=200.u256, nodes=2, tolerance=1).get
let id = client1.requestStorage(
cid,
duration=1.u256,
reward=2.u256,
proofProbability=3.u256,
expiry=expiry,
collateral=200.u256,
nodes=2,
tolerance=1).get
let request = client1.getPurchase(id).get.request.get
check request.ask.duration == 1.u256
check request.ask.reward == 2.u256
@@ -65,6 +81,20 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
check request.ask.slots == 3'u64
check request.ask.maxSlotLoss == 1'u64
# TODO: We currently do not support encoding single chunks
# test "node retrieves purchase status with 1 chunk":
# let expiry = (await provider.currentTime()) + 30
# let cid = client1.upload("some file contents").get
# let id = client1.requestStorage(cid, duration=1.u256, reward=2.u256, proofProbability=3.u256, expiry=expiry, collateral=200.u256, nodes=2, tolerance=1).get
# let request = client1.getPurchase(id).get.request.get
# check request.ask.duration == 1.u256
# check request.ask.reward == 2.u256
# check request.ask.proofProbability == 3.u256
# check request.expiry == expiry
# check request.ask.collateral == 200.u256
# check request.ask.slots == 3'u64
# check request.ask.maxSlotLoss == 1'u64
test "node remembers purchase status after restart":
let expiry = (await provider.currentTime()) + 30
let cid = client1.upload("some file contents").get