mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-02-17 23:36:33 +00:00
Documentation in the REST OpenAPI spec was showing incorrect information about Purchasing endpoints. Update the OpenAPI spec to reflect information about purchases, not requests.
277 lines
8.7 KiB
Nim
277 lines
8.7 KiB
Nim
## Nim-Codex
|
|
## Copyright (c) 2022 Status Research & Development GmbH
|
|
## Licensed under either of
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
## at your option.
|
|
## This file may not be copied, modified, or distributed except according to
|
|
## those terms.
|
|
|
|
import pkg/upraises
|
|
|
|
push: {.upraises: [].}
|
|
|
|
import std/sequtils
|
|
|
|
import pkg/chronos
|
|
import pkg/chronicles
|
|
|
|
import ../manifest
|
|
import ../stores
|
|
import ../blocktype as bt
|
|
|
|
import ./backend
|
|
|
|
export backend
|
|
|
|
# Log topic attached to every log statement emitted from this module.
logScope:
  topics = "codex erasure"
|
|
|
|
type
  ## Erasure protection of a manifest.
  ##
  ## A protected manifest holds K `blocks` of data that are encoded
  ## into M additional `parity` blocks. When the dataset does not
  ## have a square shape it is padded with empty blocks.
  ##
  ## NOTE: The padding blocks could be left out of transmission,
  ## but for now they are not.
  ##
  ## Logically the resulting dataset is split into rows of B blocks
  ## each. There are K + M = N rows in total, and every row is
  ## assumed to contain the same number (B) of blocks.
  ##
  ## The encoding is systematic, so any node can read the rows
  ## sequentially without decoding.
  ##
  ## Decoding works with any K rows, with partial K columns (up to
  ## M blocks missing per column), or any combination thereof.
  ##

  EncoderProvider* = proc(size: int, blocks: int, parity: int): EncoderBackend
    {.raises: [Defect], noSideEffect.}

  DecoderProvider* = proc(size: int, blocks: int, parity: int): DecoderBackend
    {.raises: [Defect], noSideEffect.}

  Erasure* = ref object
    encoderProvider*: EncoderProvider ## creates the backend used by `encode`
    decoderProvider*: DecoderProvider ## creates the backend used by `decode`
    store*: BlockStore                ## blocks are read from and written to it
|
|
|
|
proc encode*(
    self: Erasure,
    manifest: Manifest,
    blocks: int,
    parity: int): Future[?!Manifest] {.async.} =
  ## Encode a manifest into one that is erasure protected.
  ##
  ## `manifest` - the original manifest to be encoded
  ## `blocks`   - the number of blocks to be encoded - K
  ## `parity`   - the number of parity blocks to generate - M
  ##
  ## The dataset is processed in `encoded.steps` passes. Pass `i` takes
  ## every `steps`-th block starting at position `i`, encodes those
  ## blocks and stores the generated parity blocks in `self.store`,
  ## recording their cids in the protected manifest.
  ##
  ## Returns the protected manifest, or a failure when the protected
  ## manifest cannot be created, a block cannot be retrieved or stored,
  ## or the backend reports an encoding error. `CancelledError` is
  ## re-raised; the encoder is released in every case.
  ##

  logScope:
    original_cid = manifest.cid.get()
    original_len = manifest.len
    blocks = blocks
    parity = parity

  trace "Erasure coding manifest", blocks, parity
  without var encoded =? Manifest.new(manifest, blocks, parity), error:
    trace "Unable to create manifest", msg = error.msg
    return error.failure

  logScope:
    steps = encoded.steps
    rounded_blocks = encoded.rounded
    new_manifest = encoded.len

  var
    encoder = self.encoderProvider(manifest.blockSize, blocks, parity)

  try:
    for i in 0..<encoded.steps:
      # TODO: Don't allocate a new seq every time, allocate once and zero out
      var
        data = newSeq[seq[byte]](blocks) # number of blocks to encode
        parityData = newSeqWith[seq[byte]](parity, newSeq[byte](manifest.blockSize))
        # calculate block indexes to retrieve
        blockIdx = toSeq(countup(i, encoded.rounded - 1, encoded.steps))
        # request all blocks from the store
        dataBlocks = await allFinished(
          blockIdx.mapIt( self.store.getBlock(encoded[it]) ))

      # TODO: this is a tight blocking loop so we sleep here to allow
      # other events to be processed, this should be addressed
      # by threading
      await sleepAsync(10.millis)

      for j in 0..<blocks:
        let idx = blockIdx[j]
        if idx < manifest.len:
          without blk =? (await dataBlocks[j]), error:
            trace "Unable to retrieve block", error = error.msg
            return failure error

          trace "Encoding block", cid = blk.cid, pos = idx
          shallowCopy(data[j], blk.data)
        else:
          # positions past the original length are padding: use a
          # zero-filled block instead of the fetched result
          trace "Padding with empty block", pos = idx
          data[j] = newSeq[byte](manifest.blockSize)

      trace "Erasure coding data", data = data.len, parity = parityData.len

      let res = encoder.encode(data, parityData)
      if res.isErr:
        trace "Unable to encode manifest!", error = $res.error
        return failure($res.error)

      for j in 0..<parity:
        # Parity blocks follow the same stride as the data blocks, offset
        # past the rounded data region. The position is derived from the
        # stride directly instead of from `blockIdx[j]`: `blockIdx` only
        # holds the data positions of this pass, so indexing it with `j`
        # goes out of range as soon as `parity` exceeds its length. For
        # every `j` that was valid before, the value is unchanged.
        # NOTE(review): assumes `Manifest.new` sized `encoded` to hold
        # these positions - confirm against the manifest module.
        let idx = encoded.rounded + i + j * encoded.steps
        without blk =? bt.Block.new(parityData[j]), error:
          trace "Unable to create parity block", err = error.msg
          return failure(error)

        trace "Adding parity block", cid = blk.cid, pos = idx
        encoded[idx] = blk.cid
        if isErr (await self.store.putBlock(blk)):
          trace "Unable to store block!", cid = blk.cid
          return failure("Unable to store block!")
  except CancelledError as exc:
    trace "Erasure coding encoding cancelled"
    raise exc # cancellation needs to be propagated
  except CatchableError as exc:
    trace "Erasure coding encoding error", exc = exc.msg
    return failure(exc)
  finally:
    encoder.release()

  return encoded.success
|
|
|
|
proc decode*(
    self: Erasure,
    encoded: Manifest): Future[?!Manifest] {.async.} =
  ## Decode a protected manifest into its original manifest.
  ##
  ## `encoded` - the encoded (protected) manifest to be recovered
  ##
  ## For every step the blocks of one column are requested from the
  ## store. Data blocks that could not be retrieved are rebuilt with
  ## the decoder and written back to the store.
  ##

  logScope:
    steps = encoded.steps
    rounded_blocks = encoded.rounded
    new_manifest = encoded.len

  var
    decoder = self.decoderProvider(encoded.blockSize, encoded.ecK, encoded.ecM)

  try:
    for step in 0..<encoded.steps:
      # TODO: Don't allocate a new seq every time, allocate once and zero out
      let
        # positions of the blocks (data followed by parity) of this step
        blockIdx = toSeq(countup(step, encoded.len - 1, encoded.steps))
        # ask the store for every one of them
        pendingBlocks = blockIdx.mapIt(
          self.store.getBlock(encoded[it]) # Get the data blocks (first K)
        )

      # TODO: this is a tight blocking loop so we sleep here to allow
      # other events to be processed, this should be addressed
      # by threading
      await sleepAsync(10.millis)

      var
        data = newSeq[seq[byte]](encoded.ecK) # retrieved data blocks
        parityData = newSeq[seq[byte]](encoded.ecM) # retrieved parity blocks
        recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize))
        idxPendingBlocks = pendingBlocks # copy futures to make using with `one` easier
        emptyBlock = newSeq[byte](encoded.blockSize)
        resolved = 0

      # Keep receiving blocks until there are just enough of them for
      # decoding, or until no more blocks can arrive
      while (resolved < encoded.ecK) and (idxPendingBlocks.len > 0):
        let
          done = await one(idxPendingBlocks)
          # position within the column; `pendingBlocks` is never
          # modified, so it maps a future back to its slot
          idx = pendingBlocks.find(done)

        idxPendingBlocks.del(idxPendingBlocks.find(done))

        without blk =? (await done), error:
          trace "Failed retrieving block", error = error.msg
          continue

        if idx < encoded.ecK:
          trace "Retrieved data block", cid = blk.cid, idx
          shallowCopy(data[idx], if blk.isEmpty: emptyBlock else: blk.data)
        else:
          trace "Retrieved parity block", cid = blk.cid, idx
          shallowCopy(parityData[idx - encoded.ecK], if blk.isEmpty: emptyBlock else: blk.data)

        resolved.inc

      let
        dataPieces = data.countIt( it.len > 0 )
        parityPieces = parityData.countIt( it.len > 0 )

      if dataPieces >= encoded.ecK:
        # every data block of this step is present, nothing to rebuild
        trace "Retrieved all the required data blocks", data = dataPieces, parity = parityPieces
        continue

      trace "Erasure decoding data", data = dataPieces, parity = parityPieces
      let decodeRes = decoder.decode(data, parityData, recovered)
      if decodeRes.isErr:
        trace "Unable to decode manifest!", err = $decodeRes.error
        return failure($decodeRes.error)

      for slot in 0..<encoded.ecK:
        # only the slots that were not retrieved have to be stored
        if data[slot].len > 0:
          continue

        without blk =? bt.Block.new(recovered[slot]), error:
          trace "Unable to create block!", exc = error.msg
          return failure(error)

        trace "Recovered block", cid = blk.cid
        if isErr (await self.store.putBlock(blk)):
          trace "Unable to store block!", cid = blk.cid
          return failure("Unable to store block!")
  except CancelledError as exc:
    trace "Erasure coding decoding cancelled"
    raise exc # cancellation needs to be propagated
  except CatchableError as exc:
    trace "Erasure coding decoding error", exc = exc.msg
    return failure(exc)
  finally:
    decoder.release()

  # the original manifest consists of the first `originalLen` blocks
  without decoded =? Manifest.new(blocks = encoded.blocks[0..<encoded.originalLen]), error:
    return error.failure

  return decoded.success
|
|
|
|
proc start*(self: Erasure) {.async.} =
  ## Start hook; currently there is nothing to do.
  discard
|
|
|
|
proc stop*(self: Erasure) {.async.} =
  ## Stop hook; currently there is nothing to do.
  discard
|
|
|
|
proc new*(
    T: type Erasure,
    store: BlockStore,
    encoderProvider: EncoderProvider,
    decoderProvider: DecoderProvider): Erasure =
  ## Create an `Erasure` instance.
  ##
  ## `store`           - block store used to read and write blocks
  ## `encoderProvider` - factory for the encoding backend
  ## `decoderProvider` - factory for the decoding backend
  ##

  result = Erasure(
    store: store,
    encoderProvider: encoderProvider,
    decoderProvider: decoderProvider)
|