diff --git a/dagger/blocktype.nim b/dagger/blocktype.nim index 0f7c37ba..2318228d 100644 --- a/dagger/blocktype.nim +++ b/dagger/blocktype.nim @@ -19,7 +19,7 @@ import pkg/questionable/results import ./errors const - BlockSize* = 4096 # file chunk read size + BlockSize* = 31 * 64 * 4 # file chunk read size type Block* = ref object of RootObj diff --git a/dagger/chunker.nim b/dagger/chunker.nim index 0889f29e..1ecb5cb2 100644 --- a/dagger/chunker.nim +++ b/dagger/chunker.nim @@ -24,7 +24,7 @@ import ./blocktype export blocktype const - DefaultChunkSize*: Positive = 1024 * 256 + DefaultChunkSize* = BlockSize type # default reader type @@ -69,7 +69,7 @@ func new*( kind = ChunkerType.FixedChunker, reader: Reader, chunkSize = DefaultChunkSize, - pad = false): T = + pad = true): T = var chunker = Chunker( kind: kind, reader: reader) @@ -85,7 +85,7 @@ proc new*( stream: LPStream, kind = ChunkerType.FixedChunker, chunkSize = DefaultChunkSize, - pad = false): T = + pad = true): T = ## create the default File chunker ## @@ -114,7 +114,7 @@ proc new*( file: File, kind = ChunkerType.FixedChunker, chunkSize = DefaultChunkSize, - pad = false): T = + pad = true): T = ## create the default File chunker ## diff --git a/dagger/manifest/coders.nim b/dagger/manifest/coders.nim index 045dbdd5..1189f677 100644 --- a/dagger/manifest/coders.nim +++ b/dagger/manifest/coders.nim @@ -64,7 +64,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = var header = initProtoBuffer() header.write(1, cid.data.buffer) header.write(2, manifest.blockSize.uint32) - header.write(3, manifest.blocks.len.uint32) + header.write(3, manifest.len.uint32) pbNode.write(1, header.buffer) # set the rootHash Cid as the data field pbNode.finish() diff --git a/dagger/node.nim b/dagger/node.nim index 312093f6..a5d417ec 100644 --- a/dagger/node.nim +++ b/dagger/node.nim @@ -25,6 +25,7 @@ import ./blocktype as bt import ./manifest import ./stores/blockstore import ./blockexchange +import ./streams logScope: topics = "dagger node" @@ -45,8 +46,13 @@ proc start*(node: DaggerNodeRef) {.async.} = notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs proc stop*(node: DaggerNodeRef) {.async.} = - await node.engine.stop() - await node.switch.stop() + trace "Stopping node" + + if not node.engine.isNil: + await node.engine.stop() + + if not node.switch.isNil: + await node.switch.stop() proc findPeer*( node: DaggerNodeRef, @@ -59,31 +65,9 @@ proc connect*( addrs: seq[MultiAddress]): Future[void] = node.switch.connect(peerId, addrs) -proc streamBlocks*( - node: DaggerNodeRef, - stream: BufferStream, - blockManifest: Manifest) {.async.} = - - try: - # TODO: Read sequentially for now - # to prevent slurping the entire dataset - # since disk IO is blocking - for c in blockManifest: - without blk =? (await node.blockStore.getBlock(c)): - warn "Couldn't retrieve block", cid = c - break # abort if we couldn't get a block - - trace "Streaming block data", cid = blk.cid, bytes = blk.data.len - await stream.pushData(blk.data) - except CatchableError as exc: - trace "Exception retrieving blocks", exc = exc.msg - finally: - await stream.pushEof() - proc retrieve*( node: DaggerNodeRef, - stream: BufferStream, - cid: Cid): Future[?!void] {.async.} = + cid: Cid): Future[?!LPStream] {.async.} = trace "Received retrieval request", cid without blk =? await node.blockStore.getBlock(cid): @@ -94,24 +78,29 @@ proc retrieve*( return failure( newException(DaggerError, "Couldn't identify Cid!")) + # if we got a manifest, stream the blocks if $mc in ManifestContainers: trace "Retrieving data set", cid, mc - without blockManifest =? Manifest.decode(blk.data, ManifestContainers[$mc]): + without manifest =? Manifest.decode(blk.data, ManifestContainers[$mc]): return failure("Unable to construct manifest!") - asyncSpawn node.streamBlocks(stream, blockManifest) - else: - asyncSpawn (proc(): Future[void] {.async.} = - try: - await stream.pushData(blk.data) - except CatchableError as exc: - trace "Unable to send block", cid - discard - finally: - await stream.pushEof())() + return LPStream(StoreStream.new(node.blockStore, manifest)).success - return success() + let + stream = BufferStream.new() + + proc streamOneBlock(): Future[void] {.async.} = + try: + await stream.pushData(blk.data) + except CatchableError as exc: + trace "Unable to send block", cid + discard + finally: + await stream.pushEof() + + asyncSpawn streamOneBlock() + return LPStream(stream).success() proc store*( node: DaggerNodeRef, @@ -122,7 +111,7 @@ proc store*( return failure("Unable to create Block Set") let - chunker = LPStreamChunker.new(stream) + chunker = LPStreamChunker.new(stream, chunkSize = BlockSize) try: while ( @@ -159,13 +148,12 @@ proc store*( trace "Unable to store manifest", cid = manifest.cid return failure("Unable to store manifest " & $manifest.cid) - var cid: ?!Cid - if (cid = blockManifest.cid; cid.isErr): - trace "Unable to generate manifest Cid!", exc = cid.error.msg - return failure(cid.error.msg) + without cid =? blockManifest.cid, error: + trace "Unable to generate manifest Cid!", exc = error.msg + return failure(error.msg) trace "Stored data", manifestCid = manifest.cid, - contentCid = !cid, + contentCid = cid, blocks = blockManifest.len return manifest.cid.success diff --git a/dagger/rest/api.nim b/dagger/rest/api.nim index 8fbb417f..c5a93d49 100644 --- a/dagger/rest/api.nim +++ b/dagger/rest/api.nim @@ -111,18 +111,20 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter = Http400, $id.error()) - let - stream = BufferStream.new() + var + stream: LPStream var bytes = 0 try: if ( - let retr = await node.retrieve(stream, id.get()); + let retr = await node.retrieve(id.get()); retr.isErr): return RestApiResponse.error(Http404, retr.error.msg) resp.addHeader("Content-Type", "application/octet-stream") await resp.prepareChunked() + + stream = retr.get() while not stream.atEof: var buff = newSeqUninitialized[byte](FileChunkSize) @@ -141,7 +143,8 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter = return RestApiResponse.error(Http500) finally: trace "Sent bytes", cid = id.get(), bytes - await stream.close() + if not stream.isNil: + await stream.close() router.rawApi( MethodPost, diff --git a/dagger/streams/storestream.nim b/dagger/streams/storestream.nim index 28b6d944..58990a63 100644 --- a/dagger/streams/storestream.nim +++ b/dagger/streams/storestream.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/strformat + import pkg/upraises push: {.upraises: [].} @@ -28,24 +30,17 @@ logScope: topics = "dagger storestream" type - ReadPattern* {.pure.} = enum - Sequential, - Grid - StoreStream* = ref object of SeekableStream store*: BlockStore manifest*: Manifest - pattern*: ReadPattern -proc init*( +proc new*( T: type StoreStream, store: BlockStore, - manifest: Manifest, - pattern = ReadPattern.Sequential): T = + manifest: Manifest): T = result = T( store: store, manifest: manifest, - pattern: pattern, offset: 0) result.initStream() @@ -64,16 +59,20 @@ method readOnce*( var read = 0 - while read < nbytes and self.atEof.not: + while read < nbytes and not self.atEof: let pos = self.offset div self.manifest.blockSize blk = (await self.store.getBlock(self.manifest[pos])).tryGet() - blockOffset = if self.offset >= self.manifest.blockSize: + + let + blockOffset = + if self.offset >= self.manifest.blockSize: self.offset mod self.manifest.blockSize else: self.offset - readBytes = if (nbytes - read) >= (self.manifest.blockSize - blockOffset): + readBytes = + if (nbytes - read) >= (self.manifest.blockSize - blockOffset): self.manifest.blockSize - blockOffset else: min(nbytes - read, self.manifest.blockSize) @@ -89,11 +88,11 @@ method atEof*(self: StoreStream): bool = method closeImpl*(self: StoreStream) {.async.} = try: - trace "Closing StoreStream", self + trace "Closing StoreStream" self.offset = self.manifest.len * self.manifest.blockSize # set Eof except CancelledError as exc: raise exc except CatchableError as exc: - trace "Error closing StoreStream", s, msg = exc.msg + trace "Error closing StoreStream", msg = exc.msg await procCall LPStream(self).closeImpl() diff --git a/tests/dagger/testchunking.nim b/tests/dagger/testchunking.nim index ad759d95..4d4668a8 100644 --- a/tests/dagger/testchunking.nim +++ b/tests/dagger/testchunking.nim @@ -58,7 +58,7 @@ suite "Chunking": let (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name file = open(path) - fileChunker = FileChunker.new(file = file, chunkSize = 256) + fileChunker = FileChunker.new(file = file, chunkSize = 256, pad = false) var data: seq[byte] while true: diff --git a/tests/dagger/testnode.nim b/tests/dagger/testnode.nim index 0c4ffcf3..22e8197c 100644 --- a/tests/dagger/testnode.nim +++ b/tests/dagger/testnode.nim @@ -35,7 +35,7 @@ suite "Test Node": setup: file = open(path.splitFile().dir /../ "fixtures" / "test.jpg") - chunker = FileChunker.new(file = file) + chunker = FileChunker.new(file = file, chunkSize = BlockSize) switch = newStandardSwitch() wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) @@ -106,19 +106,15 @@ suite "Test Node": check await localStore.putBlock(manifestBlock) - let stream = BufferStream.new() - check (await node.retrieve(stream, manifestBlock.cid)).isOk - + let stream = (await node.retrieve(manifestBlock.cid)).tryGet() var data: seq[byte] - while true: + while not stream.atEof: var buf = newSeq[byte](BlockSize) - res = await stream.readOnce(addr buf[0], buf.len) - - if res <= 0: - break + res = await stream.readOnce(addr buf[0], BlockSize div 2) buf.setLen(res) + data &= buf check data == original @@ -128,11 +124,8 @@ suite "Test Node": testString = "Block 1" blk = bt.Block.new(testString.toBytes).tryGet() - var - stream = BufferStream.new() - check (await localStore.putBlock(blk)) - check (await node.retrieve(stream, blk.cid)).isOk + let stream = (await node.retrieve(blk.cid)).tryGet() var data = newSeq[byte](testString.len) await stream.readExactly(addr data[0], data.len) diff --git a/tests/dagger/teststorestream.nim b/tests/dagger/teststorestream.nim index 415a9846..9d2d41fb 100644 --- a/tests/dagger/teststorestream.nim +++ b/tests/dagger/teststorestream.nim @@ -33,7 +33,7 @@ suite "StoreStream": setup: store = CacheStore.new() manifest = Manifest.new(blockSize = 10).tryGet() - stream = StoreStream.init(store, manifest) + stream = StoreStream.new(store, manifest) for d in data: let