Node storestream (#62)

* use storestream for retrieval

* use `len` prop on manifest

* remove read pattern

* make blocksize and chunksize consistent

* fix tests

* fix chunker tests - now padded by default
This commit is contained in:
Dmitriy Ryajov 2022-03-29 20:43:35 -06:00 committed by GitHub
parent 03c8ceccf9
commit 43cea1743a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 65 additions and 82 deletions

View File

@ -19,7 +19,7 @@ import pkg/questionable/results
import ./errors import ./errors
const const
BlockSize* = 4096 # file chunk read size BlockSize* = 31 * 64 * 4 # file chunk read size
type type
Block* = ref object of RootObj Block* = ref object of RootObj

View File

@ -24,7 +24,7 @@ import ./blocktype
export blocktype export blocktype
const const
DefaultChunkSize*: Positive = 1024 * 256 DefaultChunkSize* = BlockSize
type type
# default reader type # default reader type
@ -69,7 +69,7 @@ func new*(
kind = ChunkerType.FixedChunker, kind = ChunkerType.FixedChunker,
reader: Reader, reader: Reader,
chunkSize = DefaultChunkSize, chunkSize = DefaultChunkSize,
pad = false): T = pad = true): T =
var chunker = Chunker( var chunker = Chunker(
kind: kind, kind: kind,
reader: reader) reader: reader)
@ -85,7 +85,7 @@ proc new*(
stream: LPStream, stream: LPStream,
kind = ChunkerType.FixedChunker, kind = ChunkerType.FixedChunker,
chunkSize = DefaultChunkSize, chunkSize = DefaultChunkSize,
pad = false): T = pad = true): T =
## create the default LPStream chunker ## create the default LPStream chunker
## ##
@ -114,7 +114,7 @@ proc new*(
file: File, file: File,
kind = ChunkerType.FixedChunker, kind = ChunkerType.FixedChunker,
chunkSize = DefaultChunkSize, chunkSize = DefaultChunkSize,
pad = false): T = pad = true): T =
## create the default File chunker ## create the default File chunker
## ##

View File

@ -64,7 +64,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
var header = initProtoBuffer() var header = initProtoBuffer()
header.write(1, cid.data.buffer) header.write(1, cid.data.buffer)
header.write(2, manifest.blockSize.uint32) header.write(2, manifest.blockSize.uint32)
header.write(3, manifest.blocks.len.uint32) header.write(3, manifest.len.uint32)
pbNode.write(1, header.buffer) # set the rootHash Cid as the data field pbNode.write(1, header.buffer) # set the rootHash Cid as the data field
pbNode.finish() pbNode.finish()

View File

@ -25,6 +25,7 @@ import ./blocktype as bt
import ./manifest import ./manifest
import ./stores/blockstore import ./stores/blockstore
import ./blockexchange import ./blockexchange
import ./streams
logScope: logScope:
topics = "dagger node" topics = "dagger node"
@ -45,7 +46,12 @@ proc start*(node: DaggerNodeRef) {.async.} =
notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs notice "Started dagger node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
proc stop*(node: DaggerNodeRef) {.async.} = proc stop*(node: DaggerNodeRef) {.async.} =
trace "Stopping node"
if not node.engine.isNil:
await node.engine.stop() await node.engine.stop()
if not node.switch.isNil:
await node.switch.stop() await node.switch.stop()
proc findPeer*( proc findPeer*(
@ -59,31 +65,9 @@ proc connect*(
addrs: seq[MultiAddress]): Future[void] = addrs: seq[MultiAddress]): Future[void] =
node.switch.connect(peerId, addrs) node.switch.connect(peerId, addrs)
proc streamBlocks*(
node: DaggerNodeRef,
stream: BufferStream,
blockManifest: Manifest) {.async.} =
try:
# TODO: Read sequentially for now
# to prevent slurping the entire dataset
# since disk IO is blocking
for c in blockManifest:
without blk =? (await node.blockStore.getBlock(c)):
warn "Couldn't retrieve block", cid = c
break # abort if we couldn't get a block
trace "Streaming block data", cid = blk.cid, bytes = blk.data.len
await stream.pushData(blk.data)
except CatchableError as exc:
trace "Exception retrieving blocks", exc = exc.msg
finally:
await stream.pushEof()
proc retrieve*( proc retrieve*(
node: DaggerNodeRef, node: DaggerNodeRef,
stream: BufferStream, cid: Cid): Future[?!LPStream] {.async.} =
cid: Cid): Future[?!void] {.async.} =
trace "Received retrieval request", cid trace "Received retrieval request", cid
without blk =? await node.blockStore.getBlock(cid): without blk =? await node.blockStore.getBlock(cid):
@ -94,24 +78,29 @@ proc retrieve*(
return failure( return failure(
newException(DaggerError, "Couldn't identify Cid!")) newException(DaggerError, "Couldn't identify Cid!"))
# if we got a manifest, stream the blocks
if $mc in ManifestContainers: if $mc in ManifestContainers:
trace "Retrieving data set", cid, mc trace "Retrieving data set", cid, mc
without blockManifest =? Manifest.decode(blk.data, ManifestContainers[$mc]): without manifest =? Manifest.decode(blk.data, ManifestContainers[$mc]):
return failure("Unable to construct manifest!") return failure("Unable to construct manifest!")
asyncSpawn node.streamBlocks(stream, blockManifest) return LPStream(StoreStream.new(node.blockStore, manifest)).success
else:
asyncSpawn (proc(): Future[void] {.async.} = let
stream = BufferStream.new()
proc streamOneBlock(): Future[void] {.async.} =
try: try:
await stream.pushData(blk.data) await stream.pushData(blk.data)
except CatchableError as exc: except CatchableError as exc:
trace "Unable to send block", cid trace "Unable to send block", cid
discard discard
finally: finally:
await stream.pushEof())() await stream.pushEof()
return success() asyncSpawn streamOneBlock()
return LPStream(stream).success()
proc store*( proc store*(
node: DaggerNodeRef, node: DaggerNodeRef,
@ -122,7 +111,7 @@ proc store*(
return failure("Unable to create Block Set") return failure("Unable to create Block Set")
let let
chunker = LPStreamChunker.new(stream) chunker = LPStreamChunker.new(stream, chunkSize = BlockSize)
try: try:
while ( while (
@ -159,13 +148,12 @@ proc store*(
trace "Unable to store manifest", cid = manifest.cid trace "Unable to store manifest", cid = manifest.cid
return failure("Unable to store manifest " & $manifest.cid) return failure("Unable to store manifest " & $manifest.cid)
var cid: ?!Cid without cid =? blockManifest.cid, error:
if (cid = blockManifest.cid; cid.isErr): trace "Unable to generate manifest Cid!", exc = error.msg
trace "Unable to generate manifest Cid!", exc = cid.error.msg return failure(error.msg)
return failure(cid.error.msg)
trace "Stored data", manifestCid = manifest.cid, trace "Stored data", manifestCid = manifest.cid,
contentCid = !cid, contentCid = cid,
blocks = blockManifest.len blocks = blockManifest.len
return manifest.cid.success return manifest.cid.success

View File

@ -111,18 +111,20 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
Http400, Http400,
$id.error()) $id.error())
let var
stream = BufferStream.new() stream: LPStream
var bytes = 0 var bytes = 0
try: try:
if ( if (
let retr = await node.retrieve(stream, id.get()); let retr = await node.retrieve(id.get());
retr.isErr): retr.isErr):
return RestApiResponse.error(Http404, retr.error.msg) return RestApiResponse.error(Http404, retr.error.msg)
resp.addHeader("Content-Type", "application/octet-stream") resp.addHeader("Content-Type", "application/octet-stream")
await resp.prepareChunked() await resp.prepareChunked()
stream = retr.get()
while not stream.atEof: while not stream.atEof:
var var
buff = newSeqUninitialized[byte](FileChunkSize) buff = newSeqUninitialized[byte](FileChunkSize)
@ -141,6 +143,7 @@ proc initRestApi*(node: DaggerNodeRef): RestRouter =
return RestApiResponse.error(Http500) return RestApiResponse.error(Http500)
finally: finally:
trace "Sent bytes", cid = id.get(), bytes trace "Sent bytes", cid = id.get(), bytes
if not stream.isNil:
await stream.close() await stream.close()
router.rawApi( router.rawApi(

View File

@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import std/strformat
import pkg/upraises import pkg/upraises
push: {.upraises: [].} push: {.upraises: [].}
@ -28,24 +30,17 @@ logScope:
topics = "dagger storestream" topics = "dagger storestream"
type type
ReadPattern* {.pure.} = enum
Sequential,
Grid
StoreStream* = ref object of SeekableStream StoreStream* = ref object of SeekableStream
store*: BlockStore store*: BlockStore
manifest*: Manifest manifest*: Manifest
pattern*: ReadPattern
proc init*( proc new*(
T: type StoreStream, T: type StoreStream,
store: BlockStore, store: BlockStore,
manifest: Manifest, manifest: Manifest): T =
pattern = ReadPattern.Sequential): T =
result = T( result = T(
store: store, store: store,
manifest: manifest, manifest: manifest,
pattern: pattern,
offset: 0) offset: 0)
result.initStream() result.initStream()
@ -64,16 +59,20 @@ method readOnce*(
var var
read = 0 read = 0
while read < nbytes and self.atEof.not: while read < nbytes and not self.atEof:
let let
pos = self.offset div self.manifest.blockSize pos = self.offset div self.manifest.blockSize
blk = (await self.store.getBlock(self.manifest[pos])).tryGet() blk = (await self.store.getBlock(self.manifest[pos])).tryGet()
blockOffset = if self.offset >= self.manifest.blockSize:
let
blockOffset =
if self.offset >= self.manifest.blockSize:
self.offset mod self.manifest.blockSize self.offset mod self.manifest.blockSize
else: else:
self.offset self.offset
readBytes = if (nbytes - read) >= (self.manifest.blockSize - blockOffset): readBytes =
if (nbytes - read) >= (self.manifest.blockSize - blockOffset):
self.manifest.blockSize - blockOffset self.manifest.blockSize - blockOffset
else: else:
min(nbytes - read, self.manifest.blockSize) min(nbytes - read, self.manifest.blockSize)
@ -89,11 +88,11 @@ method atEof*(self: StoreStream): bool =
method closeImpl*(self: StoreStream) {.async.} = method closeImpl*(self: StoreStream) {.async.} =
try: try:
trace "Closing StoreStream", self trace "Closing StoreStream"
self.offset = self.manifest.len * self.manifest.blockSize # set Eof self.offset = self.manifest.len * self.manifest.blockSize # set Eof
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "Error closing StoreStream", s, msg = exc.msg trace "Error closing StoreStream", msg = exc.msg
await procCall LPStream(self).closeImpl() await procCall LPStream(self).closeImpl()

View File

@ -58,7 +58,7 @@ suite "Chunking":
let let
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
file = open(path) file = open(path)
fileChunker = FileChunker.new(file = file, chunkSize = 256) fileChunker = FileChunker.new(file = file, chunkSize = 256, pad = false)
var data: seq[byte] var data: seq[byte]
while true: while true:

View File

@ -35,7 +35,7 @@ suite "Test Node":
setup: setup:
file = open(path.splitFile().dir /../ "fixtures" / "test.jpg") file = open(path.splitFile().dir /../ "fixtures" / "test.jpg")
chunker = FileChunker.new(file = file) chunker = FileChunker.new(file = file, chunkSize = BlockSize)
switch = newStandardSwitch() switch = newStandardSwitch()
wallet = WalletRef.new(EthPrivateKey.random()) wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)
@ -106,19 +106,15 @@ suite "Test Node":
check await localStore.putBlock(manifestBlock) check await localStore.putBlock(manifestBlock)
let stream = BufferStream.new() let stream = (await node.retrieve(manifestBlock.cid)).tryGet()
check (await node.retrieve(stream, manifestBlock.cid)).isOk
var data: seq[byte] var data: seq[byte]
while true: while not stream.atEof:
var var
buf = newSeq[byte](BlockSize) buf = newSeq[byte](BlockSize)
res = await stream.readOnce(addr buf[0], buf.len) res = await stream.readOnce(addr buf[0], BlockSize div 2)
if res <= 0:
break
buf.setLen(res) buf.setLen(res)
data &= buf data &= buf
check data == original check data == original
@ -128,11 +124,8 @@ suite "Test Node":
testString = "Block 1" testString = "Block 1"
blk = bt.Block.new(testString.toBytes).tryGet() blk = bt.Block.new(testString.toBytes).tryGet()
var
stream = BufferStream.new()
check (await localStore.putBlock(blk)) check (await localStore.putBlock(blk))
check (await node.retrieve(stream, blk.cid)).isOk let stream = (await node.retrieve(blk.cid)).tryGet()
var data = newSeq[byte](testString.len) var data = newSeq[byte](testString.len)
await stream.readExactly(addr data[0], data.len) await stream.readExactly(addr data[0], data.len)

View File

@ -33,7 +33,7 @@ suite "StoreStream":
setup: setup:
store = CacheStore.new() store = CacheStore.new()
manifest = Manifest.new(blockSize = 10).tryGet() manifest = Manifest.new(blockSize = 10).tryGet()
stream = StoreStream.init(store, manifest) stream = StoreStream.new(store, manifest)
for d in data: for d in data:
let let