diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 081acdc2..1b41d2b9 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -88,7 +88,7 @@ template EmptyBlock*: untyped = emptyBlock = [ CIDv0: { multiCodec("sha2-256"): Block( - cid: EmptyCid[CIDv1][multiCodec("sha2-256")]) + cid: EmptyCid[CIDv0][multiCodec("sha2-256")]) }.toTable, CIDv1: { multiCodec("sha2-256"): Block( diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index d6506b13..d86f33d2 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -21,6 +21,7 @@ import pkg/chronos import ./manifest import ../errors +import ../blocktype import ./types func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = @@ -174,8 +175,8 @@ func decode*( decoder.decode(data) -func decode*(_: type Manifest, data: openArray[byte], cid: Cid): ?!Manifest = - without contentType =? cid.contentType() and +func decode*(_: type Manifest, blk: Block): ?!Manifest = + without contentType =? blk.cid.contentType() and containerType =? ManifestContainers.?[$contentType]: return failure "CID has invalid content type for manifest" - Manifest.decode(data, containerType) + Manifest.decode(blk.data, containerType) diff --git a/codex/manifest/types.nim b/codex/manifest/types.nim index 3ae0e497..c189da0f 100644 --- a/codex/manifest/types.nim +++ b/codex/manifest/types.nim @@ -19,7 +19,6 @@ type DagPBCoder* = ManifestCoderType[multiCodec("dag-pb")] const - # TODO: move somewhere better? ManifestContainers* = { $DagPBCodec: DagPBCoder() }.toTable diff --git a/codex/node.nim b/codex/node.nim index 688c1c0f..0dc1305d 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -35,9 +35,11 @@ logScope: topics = "codex node" const - FetchBatch = 100 + FetchBatch = 200 type + BatchProc* = proc(blocks: seq[bt.Block]): Future[void] {.gcsafe, raises: [Defect].} + CodexError = object of CatchableError CodexNodeRef* = ref object @@ -66,6 +68,10 @@ proc fetchManifest*( ## Fetch and decode a manifest block ## + without contentType =? cid.contentType() and + containerType =? ManifestContainers.?[$contentType]: + return failure "CID has invalid content type for manifest" + trace "Received retrieval request", cid without blkOrNone =? await node.blockStore.getBlock(cid), error: return failure(error) @@ -74,12 +80,41 @@ proc fetchManifest*( trace "Block not found", cid return failure("Block not found") - without manifest =? Manifest.decode(blk.data, blk.cid): + without manifest =? Manifest.decode(blk): return failure( newException(CodexError, "Unable to decode as manifest")) return manifest.success +proc fetchBatched*( + node: CodexNodeRef, + manifest: Manifest, + batchSize = FetchBatch, + onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} = + ## Fetch manifest in batches of `batchSize` + ## + + let + batches = + (manifest.blocks.len div batchSize) + + (manifest.blocks.len mod batchSize) + + trace "Fetching blocks in batches of", size = batchSize + for blks in manifest.blocks.distribute(max(1, batches), true): + try: + let + blocks = blks.mapIt(node.blockStore.getBlock( it )) + + await allFuturesThrowing(allFinished(blocks)) + if not onBatch.isNil: + await onBatch(blocks.mapIt( it.read.get.get )) + except CancelledError as exc: + raise exc + except CatchableError as exc: + return failure(exc.msg) + + return success() + proc retrieve*( node: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} = @@ -97,18 +132,13 @@ proc retrieve*( asyncSpawn erasureJob() else: - proc fetchBlocksJob() {.async.} = + proc prefetchBlocks() {.async, raises: [Defect].} = try: - let batch = max(1, manifest.blocks.len div FetchBatch) - trace "Prefetching in batches of", FetchBatch - for blks in manifest.blocks.distribute(batch, true): - discard await allFinished( - blks.mapIt( node.blockStore.getBlock( it ) )) + discard await node.fetchBatched(manifest) except CatchableError as exc: trace "Exception prefetching blocks", exc = exc.msg - asyncSpawn fetchBlocksJob() - + asyncSpawn prefetchBlocks() return LPStream(StoreStream.new(node.blockStore, manifest)).success let @@ -207,26 +237,9 @@ proc requestStorage*(self: CodexNodeRef, trace "Purchasing not available" return failure "Purchasing not available" - without blkOrNone =? (await self.blockStore.getBlock(cid)), error: - trace "Unable to retrieve manifest block", cid - return failure(error) - - without blk =? blkOrNone: - trace "Manifest block not found", cid - return failure("Manifest block not found") - - without mc =? blk.cid.contentType(): - trace "Couldn't identify Cid!", cid - return failure("Couldn't identify Cid! " & $cid) - - # if we got a manifest, stream the blocks - if $mc notin ManifestContainers: - trace "Not a manifest type!", cid, mc = $mc - return failure("Not a manifest type!") - - without var manifest =? Manifest.decode(blk.data), error: - trace "Unable to decode manifest from block", cid - return failure(error) + without manifest =? await self.fetchManifest(cid), error: + trace "Unable to fetch manifest for cid", cid + raise error # Erasure code the dataset according to provided parameters without encoded =? (await self.erasure.encode(manifest, nodes.int, tolerance.int)), error: @@ -313,14 +326,14 @@ proc start*(node: CodexNodeRef) {.async.} = trace "Unable to fetch manifest for cid", cid raise error - trace "Fetching block for cid", cid - let batch = max(1, manifest.blocks.len div FetchBatch) - trace "Prefetching in batches of", FetchBatch - for blks in manifest.blocks.distribute(batch, true): - await allFuturesThrowing( - allFinished(blks.mapIt( - node.blockStore.getBlock( it ) - ))) + trace "Fetching block for manifest", cid + # TODO: This will probably require a call to `getBlock` either way, + # since fetching of blocks will have to be selective according + # to a combination of parameters, such as node slot position + # and dataset geometry + let fetchRes = await node.fetchBatched(manifest) + if fetchRes.isErr: + raise newException(CodexError, "Unable to retrieve blocks") contracts.sales.onClear = proc(availability: Availability, request: StorageRequest) = # TODO: remove data from local storage diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 43572ba7..88b7a3b6 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -90,6 +90,52 @@ suite "Test Node": fetched.cid == manifest.cid fetched.blocks == manifest.blocks + test "Block Batching": + var + manifest = Manifest.new().tryGet() + + while ( + let chunk = await chunker.getBytes(); + chunk.len > 0): + + let blk = bt.Block.new(chunk).tryGet() + (await localStore.putBlock(blk)).tryGet() + manifest.add(blk.cid) + + let + manifestBlock = bt.Block.new( + manifest.encode().tryGet(), + codec = DagPBCodec + ).tryGet() + + (await node.fetchBatched( + manifest, + batchSize = 3, + proc(blocks: seq[bt.Block]) {.gcsafe, async.} = + check blocks.len > 0 and blocks.len <= 3 + )).tryGet() + + (await node.fetchBatched( + manifest, + batchSize = 6, + proc(blocks: seq[bt.Block]) {.gcsafe, async.} = + check blocks.len > 0 and blocks.len <= 6 + )).tryGet() + + (await node.fetchBatched( + manifest, + batchSize = 9, + proc(blocks: seq[bt.Block]) {.gcsafe, async.} = + check blocks.len > 0 and blocks.len <= 9 + )).tryGet() + + (await node.fetchBatched( + manifest, + batchSize = 11, + proc(blocks: seq[bt.Block]) {.gcsafe, async.} = + check blocks.len > 0 and blocks.len <= 11 + )).tryGet() + test "Store Data Stream": let stream = BufferStream.new() @@ -116,7 +162,7 @@ suite "Test Node": var manifestBlock = (await localStore.getBlock(manifestCid)).tryGet().get() - localManifest = Manifest.decode(manifestBlock.data).tryGet() + localManifest = Manifest.decode(manifestBlock).tryGet() check: manifest.len == localManifest.len