diff --git a/codex/node.nim b/codex/node.nim index 4bd3550c..688c1c0f 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -60,10 +60,11 @@ proc connect*( addrs: seq[MultiAddress]): Future[void] = node.switch.connect(peerId, addrs) -# TODO: move code that retrieves blocks in manifest into blockstore -proc retrieve*( +proc fetchManifest*( node: CodexNodeRef, - cid: Cid): Future[?!LPStream] {.async.} = + cid: Cid): Future[?!Manifest] {.async.} = + ## Fetch and decode a manifest block + ## trace "Received retrieval request", cid without blkOrNone =? await node.blockStore.getBlock(cid), error: @@ -73,8 +74,19 @@ proc retrieve*( trace "Block not found", cid return failure("Block not found") - if manifest =? Manifest.decode(blk.data, blk.cid): + without manifest =? Manifest.decode(blk.data, blk.cid): + return failure( + newException(CodexError, "Unable to decode as manifest")) + return manifest.success + +proc retrieve*( + node: CodexNodeRef, + cid: Cid): Future[?!LPStream] {.async.} = + ## Retrieve a block or manifest + ## + + if manifest =? (await node.fetchManifest(cid)): if manifest.protected: proc erasureJob(): Future[void] {.async.} = try: @@ -84,37 +96,38 @@ proc retrieve*( trace "Exception decoding manifest", cid asyncSpawn erasureJob() + else: + proc fetchBlocksJob() {.async.} = + try: + let batch = max(1, manifest.blocks.len div FetchBatch) + trace "Prefetching in batches of", FetchBatch + for blks in manifest.blocks.distribute(batch, true): + discard await allFinished( + blks.mapIt( node.blockStore.getBlock( it ) )) + except CatchableError as exc: + trace "Exception prefetching blocks", exc = exc.msg - proc prefetchBlocks() {.async.} = - ## Initiates requests to all blocks in the manifest - ## - try: - let - batch = max(1, manifest.blocks.len div FetchBatch) - trace "Prefetching in batches of", FetchBatch - for blks in manifest.blocks.distribute(batch, true): - discard await allFinished( - blks.mapIt( node.blockStore.getBlock( it ) )) - except CatchableError as exc: - trace "Exception prefetching blocks", exc = exc.msg + asyncSpawn fetchBlocksJob() - asyncSpawn prefetchBlocks() return LPStream(StoreStream.new(node.blockStore, manifest)).success let stream = BufferStream.new() - proc streamOneBlock(): Future[void] {.async.} = - try: - await stream.pushData(blk.data) - except CatchableError as exc: - trace "Unable to send block", cid - discard - finally: - await stream.pushEof() + if blkOrNone =? (await node.blockStore.getBlock(cid)) and blk =? blkOrNone: + proc streamOneBlock(): Future[void] {.async.} = + try: + await stream.pushData(blk.data) + except CatchableError as exc: + trace "Unable to send block", cid + discard + finally: + await stream.pushEof() - asyncSpawn streamOneBlock() - return LPStream(stream).success() + asyncSpawn streamOneBlock() + return LPStream(stream).success() + + return failure("Unable to retrieve Cid!") proc store*( node: CodexNodeRef, @@ -172,43 +185,6 @@ proc store*( return manifest.cid.success -proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.} - -proc store(node: CodexNodeRef, cids: seq[Cid]): Future[?!void] {.async.} = - ## Retrieves multiple datasets from the network, and stores them locally - - let batches = max(1, cids.len div FetchBatch) - for batch in cids.distribute(batches, true): - let results = await allFinished(cids.mapIt(node.store(it))) - for future in results: - let res = await future - if res.isFailure: - return failure res.error - - return success() - -proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = - ## Retrieves dataset from the network, and stores it locally - - without present =? await node.blockstore.hasBlock(cid): - return failure newException(CodexError, "Unable to find block " & $cid) - if present: - return success() - - without blkOrNone =? await node.blockstore.getBlock(cid): - return failure newException(CodexError, "Unable to retrieve block " & $cid) - without blk =? blkOrNone: - return failure newException(CodexError, "Unable to retrieve block " & $cid) - - if isErr (await node.blockstore.putBlock(blk)): - return failure newException(CodexError, "Unable to store block " & $cid) - - if manifest =? Manifest.decode(blk.data, blk.cid): - - let res = await node.store(manifest.blocks) - if res.isFailure: - return failure res.error - proc requestStorage*(self: CodexNodeRef, cid: Cid, duration: UInt256, @@ -326,14 +302,34 @@ proc start*(node: CodexNodeRef) {.async.} = if contracts =? node.contracts: # TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead contracts.sales.onStore = proc(cid: string, _: Availability) {.async.} = - # store data in local storage - (await node.store(Cid.init(cid).tryGet())).tryGet() + ## store data in local storage + ## + + without cid =? Cid.init(cid): + trace "Unable to parse Cid", cid + raise newException(CodexError, "Unable to parse Cid") + + without manifest =? await node.fetchManifest(cid), error: + trace "Unable to fetch manifest for cid", cid + raise error + + trace "Fetching block for cid", cid + let batch = max(1, manifest.blocks.len div FetchBatch) + trace "Prefetching in batches of", FetchBatch + for blks in manifest.blocks.distribute(batch, true): + await allFuturesThrowing( + allFinished(blks.mapIt( + node.blockStore.getBlock( it ) + ))) + contracts.sales.onClear = proc(availability: Availability, request: StorageRequest) = # TODO: remove data from local storage discard + contracts.sales.onProve = proc(cid: string): Future[seq[byte]] {.async.} = # TODO: generate proof return @[42'u8] + await contracts.start() node.networkId = node.switch.peerInfo.peerId diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 71947cd8..43572ba7 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -63,6 +63,33 @@ suite "Test Node": close(file) await node.stop() + test "Fetch Manifest": + var + manifest = Manifest.new().tryGet() + + while ( + let chunk = await chunker.getBytes(); + chunk.len > 0): + + let blk = bt.Block.new(chunk).tryGet() + (await localStore.putBlock(blk)).tryGet() + manifest.add(blk.cid) + + let + manifestBlock = bt.Block.new( + manifest.encode().tryGet(), + codec = DagPBCodec + ).tryGet() + + (await localStore.putBlock(manifestBlock)).tryGet() + + let + fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet() + + check: + fetched.cid == manifest.cid + fetched.blocks == manifest.blocks + test "Store Data Stream": let stream = BufferStream.new()