Better batching (#170)
* add cleaner batching * pass block instead of cid * cleanup manifest fetching * bug: reference correct cid version * add batch tests * make sure batch is always <= batchSize
This commit is contained in:
parent
48368893c9
commit
625facff4a
|
@ -88,7 +88,7 @@ template EmptyBlock*: untyped =
|
|||
emptyBlock = [
|
||||
CIDv0: {
|
||||
multiCodec("sha2-256"): Block(
|
||||
cid: EmptyCid[CIDv1][multiCodec("sha2-256")])
|
||||
cid: EmptyCid[CIDv0][multiCodec("sha2-256")])
|
||||
}.toTable,
|
||||
CIDv1: {
|
||||
multiCodec("sha2-256"): Block(
|
||||
|
|
|
@ -21,6 +21,7 @@ import pkg/chronos
|
|||
|
||||
import ./manifest
|
||||
import ../errors
|
||||
import ../blocktype
|
||||
import ./types
|
||||
|
||||
func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
|
||||
|
@ -174,8 +175,8 @@ func decode*(
|
|||
|
||||
decoder.decode(data)
|
||||
|
||||
func decode*(_: type Manifest, data: openArray[byte], cid: Cid): ?!Manifest =
|
||||
without contentType =? cid.contentType() and
|
||||
func decode*(_: type Manifest, blk: Block): ?!Manifest =
|
||||
without contentType =? blk.cid.contentType() and
|
||||
containerType =? ManifestContainers.?[$contentType]:
|
||||
return failure "CID has invalid content type for manifest"
|
||||
Manifest.decode(data, containerType)
|
||||
Manifest.decode(blk.data, containerType)
|
||||
|
|
|
@ -19,7 +19,6 @@ type
|
|||
DagPBCoder* = ManifestCoderType[multiCodec("dag-pb")]
|
||||
|
||||
const
|
||||
# TODO: move somewhere better?
|
||||
ManifestContainers* = {
|
||||
$DagPBCodec: DagPBCoder()
|
||||
}.toTable
|
||||
|
|
|
@ -35,9 +35,11 @@ logScope:
|
|||
topics = "codex node"
|
||||
|
||||
const
|
||||
FetchBatch = 100
|
||||
FetchBatch = 200
|
||||
|
||||
type
|
||||
BatchProc* = proc(blocks: seq[bt.Block]): Future[void] {.gcsafe, raises: [Defect].}
|
||||
|
||||
CodexError = object of CatchableError
|
||||
|
||||
CodexNodeRef* = ref object
|
||||
|
@ -66,6 +68,10 @@ proc fetchManifest*(
|
|||
## Fetch and decode a manifest block
|
||||
##
|
||||
|
||||
without contentType =? cid.contentType() and
|
||||
containerType =? ManifestContainers.?[$contentType]:
|
||||
return failure "CID has invalid content type for manifest"
|
||||
|
||||
trace "Received retrieval request", cid
|
||||
without blkOrNone =? await node.blockStore.getBlock(cid), error:
|
||||
return failure(error)
|
||||
|
@ -74,12 +80,41 @@ proc fetchManifest*(
|
|||
trace "Block not found", cid
|
||||
return failure("Block not found")
|
||||
|
||||
without manifest =? Manifest.decode(blk.data, blk.cid):
|
||||
without manifest =? Manifest.decode(blk):
|
||||
return failure(
|
||||
newException(CodexError, "Unable to decode as manifest"))
|
||||
|
||||
return manifest.success
|
||||
|
||||
proc fetchBatched*(
|
||||
node: CodexNodeRef,
|
||||
manifest: Manifest,
|
||||
batchSize = FetchBatch,
|
||||
onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} =
|
||||
## Fetch manifest in batches of `batchSize`
|
||||
##
|
||||
|
||||
let
|
||||
batches =
|
||||
(manifest.blocks.len div batchSize) +
|
||||
(manifest.blocks.len mod batchSize)
|
||||
|
||||
trace "Fetching blocks in batches of", size = batchSize
|
||||
for blks in manifest.blocks.distribute(max(1, batches), true):
|
||||
try:
|
||||
let
|
||||
blocks = blks.mapIt(node.blockStore.getBlock( it ))
|
||||
|
||||
await allFuturesThrowing(allFinished(blocks))
|
||||
if not onBatch.isNil:
|
||||
await onBatch(blocks.mapIt( it.read.get.get ))
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
return failure(exc.msg)
|
||||
|
||||
return success()
|
||||
|
||||
proc retrieve*(
|
||||
node: CodexNodeRef,
|
||||
cid: Cid): Future[?!LPStream] {.async.} =
|
||||
|
@ -97,18 +132,13 @@ proc retrieve*(
|
|||
|
||||
asyncSpawn erasureJob()
|
||||
else:
|
||||
proc fetchBlocksJob() {.async.} =
|
||||
proc prefetchBlocks() {.async, raises: [Defect].} =
|
||||
try:
|
||||
let batch = max(1, manifest.blocks.len div FetchBatch)
|
||||
trace "Prefetching in batches of", FetchBatch
|
||||
for blks in manifest.blocks.distribute(batch, true):
|
||||
discard await allFinished(
|
||||
blks.mapIt( node.blockStore.getBlock( it ) ))
|
||||
discard await node.fetchBatched(manifest)
|
||||
except CatchableError as exc:
|
||||
trace "Exception prefetching blocks", exc = exc.msg
|
||||
|
||||
asyncSpawn fetchBlocksJob()
|
||||
|
||||
asyncSpawn prefetchBlocks()
|
||||
return LPStream(StoreStream.new(node.blockStore, manifest)).success
|
||||
|
||||
let
|
||||
|
@ -207,26 +237,9 @@ proc requestStorage*(self: CodexNodeRef,
|
|||
trace "Purchasing not available"
|
||||
return failure "Purchasing not available"
|
||||
|
||||
without blkOrNone =? (await self.blockStore.getBlock(cid)), error:
|
||||
trace "Unable to retrieve manifest block", cid
|
||||
return failure(error)
|
||||
|
||||
without blk =? blkOrNone:
|
||||
trace "Manifest block not found", cid
|
||||
return failure("Manifest block not found")
|
||||
|
||||
without mc =? blk.cid.contentType():
|
||||
trace "Couldn't identify Cid!", cid
|
||||
return failure("Couldn't identify Cid! " & $cid)
|
||||
|
||||
# if we got a manifest, stream the blocks
|
||||
if $mc notin ManifestContainers:
|
||||
trace "Not a manifest type!", cid, mc = $mc
|
||||
return failure("Not a manifest type!")
|
||||
|
||||
without var manifest =? Manifest.decode(blk.data), error:
|
||||
trace "Unable to decode manifest from block", cid
|
||||
return failure(error)
|
||||
without manifest =? await self.fetchManifest(cid), error:
|
||||
trace "Unable to fetch manifest for cid", cid
|
||||
raise error
|
||||
|
||||
# Erasure code the dataset according to provided parameters
|
||||
without encoded =? (await self.erasure.encode(manifest, nodes.int, tolerance.int)), error:
|
||||
|
@ -313,14 +326,14 @@ proc start*(node: CodexNodeRef) {.async.} =
|
|||
trace "Unable to fetch manifest for cid", cid
|
||||
raise error
|
||||
|
||||
trace "Fetching block for cid", cid
|
||||
let batch = max(1, manifest.blocks.len div FetchBatch)
|
||||
trace "Prefetching in batches of", FetchBatch
|
||||
for blks in manifest.blocks.distribute(batch, true):
|
||||
await allFuturesThrowing(
|
||||
allFinished(blks.mapIt(
|
||||
node.blockStore.getBlock( it )
|
||||
)))
|
||||
trace "Fetching block for manifest", cid
|
||||
# TODO: This will probably require a call to `getBlock` either way,
|
||||
# since fetching of blocks will have to be selective according
|
||||
# to a combination of parameters, such as node slot position
|
||||
# and dataset geometry
|
||||
let fetchRes = await node.fetchBatched(manifest)
|
||||
if fetchRes.isErr:
|
||||
raise newException(CodexError, "Unable to retrieve blocks")
|
||||
|
||||
contracts.sales.onClear = proc(availability: Availability, request: StorageRequest) =
|
||||
# TODO: remove data from local storage
|
||||
|
|
|
@ -90,6 +90,52 @@ suite "Test Node":
|
|||
fetched.cid == manifest.cid
|
||||
fetched.blocks == manifest.blocks
|
||||
|
||||
test "Block Batching":
|
||||
var
|
||||
manifest = Manifest.new().tryGet()
|
||||
|
||||
while (
|
||||
let chunk = await chunker.getBytes();
|
||||
chunk.len > 0):
|
||||
|
||||
let blk = bt.Block.new(chunk).tryGet()
|
||||
(await localStore.putBlock(blk)).tryGet()
|
||||
manifest.add(blk.cid)
|
||||
|
||||
let
|
||||
manifestBlock = bt.Block.new(
|
||||
manifest.encode().tryGet(),
|
||||
codec = DagPBCodec
|
||||
).tryGet()
|
||||
|
||||
(await node.fetchBatched(
|
||||
manifest,
|
||||
batchSize = 3,
|
||||
proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
|
||||
check blocks.len > 0 and blocks.len <= 3
|
||||
)).tryGet()
|
||||
|
||||
(await node.fetchBatched(
|
||||
manifest,
|
||||
batchSize = 6,
|
||||
proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
|
||||
check blocks.len > 0 and blocks.len <= 6
|
||||
)).tryGet()
|
||||
|
||||
(await node.fetchBatched(
|
||||
manifest,
|
||||
batchSize = 9,
|
||||
proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
|
||||
check blocks.len > 0 and blocks.len <= 9
|
||||
)).tryGet()
|
||||
|
||||
(await node.fetchBatched(
|
||||
manifest,
|
||||
batchSize = 11,
|
||||
proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
|
||||
check blocks.len > 0 and blocks.len <= 11
|
||||
)).tryGet()
|
||||
|
||||
test "Store Data Stream":
|
||||
let
|
||||
stream = BufferStream.new()
|
||||
|
@ -116,7 +162,7 @@ suite "Test Node":
|
|||
|
||||
var
|
||||
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet().get()
|
||||
localManifest = Manifest.decode(manifestBlock.data).tryGet()
|
||||
localManifest = Manifest.decode(manifestBlock).tryGet()
|
||||
|
||||
check:
|
||||
manifest.len == localManifest.len
|
||||
|
|
Loading…
Reference in New Issue