diff --git a/dagger/blocktype.nim b/dagger/blocktype.nim index 0f7c37ba..2318228d 100644 --- a/dagger/blocktype.nim +++ b/dagger/blocktype.nim @@ -19,7 +19,7 @@ import pkg/questionable/results import ./errors const - BlockSize* = 4096 # file chunk read size + BlockSize* = 31 * 64 * 4 # file chunk read size type Block* = ref object of RootObj diff --git a/dagger/node.nim b/dagger/node.nim index 312093f6..2bbfb864 100644 --- a/dagger/node.nim +++ b/dagger/node.nim @@ -25,6 +25,7 @@ import ./blocktype as bt import ./manifest import ./stores/blockstore import ./blockexchange +import ./streams logScope: topics = "dagger node" @@ -37,6 +38,7 @@ type networkId*: PeerID blockStore*: BlockStore engine*: BlockExcEngine + erasure*: Erasure proc start*(node: DaggerNodeRef) {.async.} = await node.switch.start() @@ -45,8 +47,13 @@ proc start*(node: DaggerNodeRef) {.async.} = notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs proc stop*(node: DaggerNodeRef) {.async.} = - await node.engine.stop() - await node.switch.stop() + trace "Stopping node" + + if not node.engine.isNil: + await node.engine.stop() + + if not node.switch.isNil: + await node.switch.stop() proc findPeer*( node: DaggerNodeRef, @@ -59,31 +66,9 @@ proc connect*( addrs: seq[MultiAddress]): Future[void] = node.switch.connect(peerId, addrs) -proc streamBlocks*( - node: DaggerNodeRef, - stream: BufferStream, - blockManifest: Manifest) {.async.} = - - try: - # TODO: Read sequentially for now - # to prevent slurping the entire dataset - # since disk IO is blocking - for c in blockManifest: - without blk =? (await node.blockStore.getBlock(c)): - warn "Couldn't retrieve block", cid = c - break # abort if we couldn't get a block - - trace "Streaming block data", cid = blk.cid, bytes = blk.data.len - await stream.pushData(blk.data) - except CatchableError as exc: - trace "Exception retrieving blocks", exc = exc.msg - finally: - await stream.pushEof() - proc retrieve*( node: DaggerNodeRef, - stream: BufferStream, - cid: Cid): Future[?!void] {.async.} = + cid: Cid): Future[?!LPStream] {.async.} = trace "Received retrieval request", cid without blk =? await node.blockStore.getBlock(cid): @@ -94,24 +79,29 @@ proc retrieve*( return failure( newException(DaggerError, "Couldn't identify Cid!")) + # if we got a manifest, stream the blocks if $mc in ManifestContainers: trace "Retrieving data set", cid, mc - without blockManifest =? Manifest.decode(blk.data, ManifestContainers[$mc]): + without manifest =? Manifest.decode(blk.data, ManifestContainers[$mc]): return failure("Unable to construct manifest!") - asyncSpawn node.streamBlocks(stream, blockManifest) - else: - asyncSpawn (proc(): Future[void] {.async.} = - try: - await stream.pushData(blk.data) - except CatchableError as exc: - trace "Unable to send block", cid - discard - finally: - await stream.pushEof())() + return LPStream(StoreStream.new(node.blockStore, manifest)).success - return success() + let + stream = BufferStream.new() + + proc streamOneBlock(): Future[void] {.async.} = + try: + await stream.pushData(blk.data) + except CatchableError as exc: + trace "Unable to send block", cid + discard + finally: + await stream.pushEof() + + asyncSpawn streamOneBlock() + return LPStream(stream).success() proc store*( node: DaggerNodeRef, @@ -122,7 +112,7 @@ proc store*( return failure("Unable to create Block Set") let - chunker = LPStreamChunker.new(stream) + chunker = LPStreamChunker.new(stream, chunkSize = BlockSize) try: while ( @@ -159,13 +149,12 @@ proc store*( trace "Unable to store manifest", cid = manifest.cid return failure("Unable to store manifest " & $manifest.cid) - var cid: ?!Cid - if (cid = blockManifest.cid; cid.isErr): - trace "Unable to generate manifest Cid!", exc = cid.error.msg - return failure(cid.error.msg) + without cid =? blockManifest.cid, error: + trace "Unable to generate manifest Cid!", exc = error.msg + return failure(error.msg) trace "Stored data", manifestCid = manifest.cid, - contentCid = !cid, + contentCid = cid, blocks = blockManifest.len return manifest.cid.success diff --git a/dagger/rest/api.nim b/dagger/rest/api.nim index 8fbb417f..c5a93d49 100644 --- a/dagger/rest/api.nim +++ b/dagger/rest/api.nim @@ -111,18 +111,20 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter = Http400, $id.error()) - let - stream = BufferStream.new() + var + stream: LPStream var bytes = 0 try: if ( - let retr = await node.retrieve(stream, id.get()); + let retr = await node.retrieve(id.get()); retr.isErr): return RestApiResponse.error(Http404, retr.error.msg) resp.addHeader("Content-Type", "application/octet-stream") await resp.prepareChunked() + + stream = retr.get() while not stream.atEof: var buff = newSeqUninitialized[byte](FileChunkSize) @@ -141,7 +143,8 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter = return RestApiResponse.error(Http500) finally: trace "Sent bytes", cid = id.get(), bytes - await stream.close() + if not stream.isNil: + await stream.close() router.rawApi( MethodPost,