use storestream for retrieval

This commit is contained in:
Dmitriy Ryajov 2022-03-29 17:24:59 -06:00
parent 03c8ceccf9
commit 014db9cee4
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
3 changed files with 40 additions and 48 deletions

View File

@ -19,7 +19,7 @@ import pkg/questionable/results
import ./errors
const
BlockSize* = 4096 # file chunk read size
BlockSize* = 31 * 64 * 4 # file chunk read size
type
Block* = ref object of RootObj

View File

@ -25,6 +25,7 @@ import ./blocktype as bt
import ./manifest
import ./stores/blockstore
import ./blockexchange
import ./streams
logScope:
topics = "dagger node"
@ -37,6 +38,7 @@ type
networkId*: PeerID
blockStore*: BlockStore
engine*: BlockExcEngine
erasure*: Erasure
proc start*(node: DaggerNodeRef) {.async.} =
await node.switch.start()
@ -45,8 +47,13 @@ proc start*(node: DaggerNodeRef) {.async.} =
notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
proc stop*(node: DaggerNodeRef) {.async.} =
await node.engine.stop()
await node.switch.stop()
trace "Stopping node"
if not node.engine.isNil:
await node.engine.stop()
if not node.switch.isNil:
await node.switch.stop()
proc findPeer*(
node: DaggerNodeRef,
@ -59,31 +66,9 @@ proc connect*(
addrs: seq[MultiAddress]): Future[void] =
node.switch.connect(peerId, addrs)
proc streamBlocks*(
node: DaggerNodeRef,
stream: BufferStream,
blockManifest: Manifest) {.async.} =
try:
# TODO: Read sequentially for now
# to prevent slurping the entire dataset
# since disk IO is blocking
for c in blockManifest:
without blk =? (await node.blockStore.getBlock(c)):
warn "Couldn't retrieve block", cid = c
break # abort if we couldn't get a block
trace "Streaming block data", cid = blk.cid, bytes = blk.data.len
await stream.pushData(blk.data)
except CatchableError as exc:
trace "Exception retrieving blocks", exc = exc.msg
finally:
await stream.pushEof()
proc retrieve*(
node: DaggerNodeRef,
stream: BufferStream,
cid: Cid): Future[?!void] {.async.} =
cid: Cid): Future[?!LPStream] {.async.} =
trace "Received retrieval request", cid
without blk =? await node.blockStore.getBlock(cid):
@ -94,24 +79,29 @@ proc retrieve*(
return failure(
newException(DaggerError, "Couldn't identify Cid!"))
# if we got a manifest, stream the blocks
if $mc in ManifestContainers:
trace "Retrieving data set", cid, mc
without blockManifest =? Manifest.decode(blk.data, ManifestContainers[$mc]):
without manifest =? Manifest.decode(blk.data, ManifestContainers[$mc]):
return failure("Unable to construct manifest!")
asyncSpawn node.streamBlocks(stream, blockManifest)
else:
asyncSpawn (proc(): Future[void] {.async.} =
try:
await stream.pushData(blk.data)
except CatchableError as exc:
trace "Unable to send block", cid
discard
finally:
await stream.pushEof())()
return LPStream(StoreStream.new(node.blockStore, manifest)).success
return success()
let
stream = BufferStream.new()
proc streamOneBlock(): Future[void] {.async.} =
try:
await stream.pushData(blk.data)
except CatchableError as exc:
trace "Unable to send block", cid
discard
finally:
await stream.pushEof()
asyncSpawn streamOneBlock()
return LPStream(stream).success()
proc store*(
node: DaggerNodeRef,
@ -122,7 +112,7 @@ proc store*(
return failure("Unable to create Block Set")
let
chunker = LPStreamChunker.new(stream)
chunker = LPStreamChunker.new(stream, chunkSize = BlockSize)
try:
while (
@ -159,13 +149,12 @@ proc store*(
trace "Unable to store manifest", cid = manifest.cid
return failure("Unable to store manifest " & $manifest.cid)
var cid: ?!Cid
if (cid = blockManifest.cid; cid.isErr):
trace "Unable to generate manifest Cid!", exc = cid.error.msg
return failure(cid.error.msg)
without cid =? blockManifest.cid, error:
trace "Unable to generate manifest Cid!", exc = error.msg
return failure(error.msg)
trace "Stored data", manifestCid = manifest.cid,
contentCid = !cid,
contentCid = cid,
blocks = blockManifest.len
return manifest.cid.success

View File

@ -111,18 +111,20 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
Http400,
$id.error())
let
stream = BufferStream.new()
var
stream: LPStream
var bytes = 0
try:
if (
let retr = await node.retrieve(stream, id.get());
let retr = await node.retrieve(id.get());
retr.isErr):
return RestApiResponse.error(Http404, retr.error.msg)
resp.addHeader("Content-Type", "application/octet-stream")
await resp.prepareChunked()
stream = retr.get()
while not stream.atEof:
var
buff = newSeqUninitialized[byte](FileChunkSize)
@ -141,7 +143,8 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
return RestApiResponse.error(Http500)
finally:
trace "Sent bytes", cid = id.get(), bytes
await stream.close()
if not stream.isNil:
await stream.close()
router.rawApi(
MethodPost,