Contract store (#161)

* split out manifest and block retrieval

* adding test for `fetchManifest`

* raise exceptions in onStore
This commit is contained in:
Dmitriy Ryajov 2022-07-28 11:44:59 -06:00 committed by GitHub
parent 0bfe26440e
commit 7ccde112f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 89 additions and 66 deletions

View File

@ -60,10 +60,11 @@ proc connect*(
addrs: seq[MultiAddress]): Future[void] = addrs: seq[MultiAddress]): Future[void] =
node.switch.connect(peerId, addrs) node.switch.connect(peerId, addrs)
# TODO: move code that retrieves blocks in manifest into blockstore proc fetchManifest*(
proc retrieve*(
node: CodexNodeRef, node: CodexNodeRef,
cid: Cid): Future[?!LPStream] {.async.} = cid: Cid): Future[?!Manifest] {.async.} =
## Fetch and decode a manifest block
##
trace "Received retrieval request", cid trace "Received retrieval request", cid
without blkOrNone =? await node.blockStore.getBlock(cid), error: without blkOrNone =? await node.blockStore.getBlock(cid), error:
@ -73,8 +74,19 @@ proc retrieve*(
trace "Block not found", cid trace "Block not found", cid
return failure("Block not found") return failure("Block not found")
if manifest =? Manifest.decode(blk.data, blk.cid): without manifest =? Manifest.decode(blk.data, blk.cid):
return failure(
newException(CodexError, "Unable to decode as manifest"))
return manifest.success
proc retrieve*(
node: CodexNodeRef,
cid: Cid): Future[?!LPStream] {.async.} =
## Retrieve a block or manifest
##
if manifest =? (await node.fetchManifest(cid)):
if manifest.protected: if manifest.protected:
proc erasureJob(): Future[void] {.async.} = proc erasureJob(): Future[void] {.async.} =
try: try:
@ -84,37 +96,38 @@ proc retrieve*(
trace "Exception decoding manifest", cid trace "Exception decoding manifest", cid
asyncSpawn erasureJob() asyncSpawn erasureJob()
else:
proc fetchBlocksJob() {.async.} =
try:
let batch = max(1, manifest.blocks.len div FetchBatch)
trace "Prefetching in batches of", FetchBatch
for blks in manifest.blocks.distribute(batch, true):
discard await allFinished(
blks.mapIt( node.blockStore.getBlock( it ) ))
except CatchableError as exc:
trace "Exception prefetching blocks", exc = exc.msg
proc prefetchBlocks() {.async.} = asyncSpawn fetchBlocksJob()
## Initiates requests to all blocks in the manifest
##
try:
let
batch = max(1, manifest.blocks.len div FetchBatch)
trace "Prefetching in batches of", FetchBatch
for blks in manifest.blocks.distribute(batch, true):
discard await allFinished(
blks.mapIt( node.blockStore.getBlock( it ) ))
except CatchableError as exc:
trace "Exception prefetching blocks", exc = exc.msg
asyncSpawn prefetchBlocks()
return LPStream(StoreStream.new(node.blockStore, manifest)).success return LPStream(StoreStream.new(node.blockStore, manifest)).success
let let
stream = BufferStream.new() stream = BufferStream.new()
proc streamOneBlock(): Future[void] {.async.} = if blkOrNone =? (await node.blockStore.getBlock(cid)) and blk =? blkOrNone:
try: proc streamOneBlock(): Future[void] {.async.} =
await stream.pushData(blk.data) try:
except CatchableError as exc: await stream.pushData(blk.data)
trace "Unable to send block", cid except CatchableError as exc:
discard trace "Unable to send block", cid
finally: discard
await stream.pushEof() finally:
await stream.pushEof()
asyncSpawn streamOneBlock() asyncSpawn streamOneBlock()
return LPStream(stream).success() return LPStream(stream).success()
return failure("Unable to retrieve Cid!")
proc store*( proc store*(
node: CodexNodeRef, node: CodexNodeRef,
@ -172,43 +185,6 @@ proc store*(
return manifest.cid.success return manifest.cid.success
proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.}
proc store(node: CodexNodeRef, cids: seq[Cid]): Future[?!void] {.async.} =
## Retrieves multiple datasets from the network, and stores them locally
let batches = max(1, cids.len div FetchBatch)
for batch in cids.distribute(batches, true):
let results = await allFinished(cids.mapIt(node.store(it)))
for future in results:
let res = await future
if res.isFailure:
return failure res.error
return success()
proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.} =
## Retrieves dataset from the network, and stores it locally
without present =? await node.blockstore.hasBlock(cid):
return failure newException(CodexError, "Unable to find block " & $cid)
if present:
return success()
without blkOrNone =? await node.blockstore.getBlock(cid):
return failure newException(CodexError, "Unable to retrieve block " & $cid)
without blk =? blkOrNone:
return failure newException(CodexError, "Unable to retrieve block " & $cid)
if isErr (await node.blockstore.putBlock(blk)):
return failure newException(CodexError, "Unable to store block " & $cid)
if manifest =? Manifest.decode(blk.data, blk.cid):
let res = await node.store(manifest.blocks)
if res.isFailure:
return failure res.error
proc requestStorage*(self: CodexNodeRef, proc requestStorage*(self: CodexNodeRef,
cid: Cid, cid: Cid,
duration: UInt256, duration: UInt256,
@ -326,14 +302,34 @@ proc start*(node: CodexNodeRef) {.async.} =
if contracts =? node.contracts: if contracts =? node.contracts:
# TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead # TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead
contracts.sales.onStore = proc(cid: string, _: Availability) {.async.} = contracts.sales.onStore = proc(cid: string, _: Availability) {.async.} =
# store data in local storage ## store data in local storage
(await node.store(Cid.init(cid).tryGet())).tryGet() ##
without cid =? Cid.init(cid):
trace "Unable to parse Cid", cid
raise newException(CodexError, "Unable to parse Cid")
without manifest =? await node.fetchManifest(cid), error:
trace "Unable to fetch manifest for cid", cid
raise error
trace "Fetching block for cid", cid
let batch = max(1, manifest.blocks.len div FetchBatch)
trace "Prefetching in batches of", FetchBatch
for blks in manifest.blocks.distribute(batch, true):
await allFuturesThrowing(
allFinished(blks.mapIt(
node.blockStore.getBlock( it )
)))
contracts.sales.onClear = proc(availability: Availability, request: StorageRequest) = contracts.sales.onClear = proc(availability: Availability, request: StorageRequest) =
# TODO: remove data from local storage # TODO: remove data from local storage
discard discard
contracts.sales.onProve = proc(cid: string): Future[seq[byte]] {.async.} = contracts.sales.onProve = proc(cid: string): Future[seq[byte]] {.async.} =
# TODO: generate proof # TODO: generate proof
return @[42'u8] return @[42'u8]
await contracts.start() await contracts.start()
node.networkId = node.switch.peerInfo.peerId node.networkId = node.switch.peerInfo.peerId

View File

@ -63,6 +63,33 @@ suite "Test Node":
close(file) close(file)
await node.stop() await node.stop()
test "Fetch Manifest":
var
manifest = Manifest.new().tryGet()
while (
let chunk = await chunker.getBytes();
chunk.len > 0):
let blk = bt.Block.new(chunk).tryGet()
(await localStore.putBlock(blk)).tryGet()
manifest.add(blk.cid)
let
manifestBlock = bt.Block.new(
manifest.encode().tryGet(),
codec = DagPBCodec
).tryGet()
(await localStore.putBlock(manifestBlock)).tryGet()
let
fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet()
check:
fetched.cid == manifest.cid
fetched.blocks == manifest.blocks
test "Store Data Stream": test "Store Data Stream":
let let
stream = BufferStream.new() stream = BufferStream.new()