From f24ded0f761832161067ba17ddf6fb566ed096d7 Mon Sep 17 00:00:00 2001
From: Bulat-Ziganshin
Date: Wed, 24 Aug 2022 15:15:59 +0300
Subject: [PATCH] Download files without padding (#218)

The initial goal of this patch was to allow downloading a file via the REST
API at exactly the same size as it was uploaded, which required adding the
fields Chunker.offset and Manifest.originalBytes to keep track of that size.
On top of that, we added more integrity checks to operations on Manifest and
reorganized TestNode.nim to test the actual interaction between the
node.store and node.retrieve operations.

Note that the wire format of Manifest was changed, so all BlockStores need to
be recreated.

* Download without padding
* Fixed chunker tests
* Chunker: get rid of RabinChunker
* Verify offset in the chunker tests
* Use manifest.originalBytesPadded in StoreStream.size
* StoreStream: replace emptyBlock with zeroMem
* Manifest.bytes: compute how many bytes the corresponding
  StoreStream(Manifest, pad) will return
* Manifest: verify originalBytes and originalLen on new/encode/decode
  Also set originalBytes in each Manifest creation/update scenario
* Manifest: comments, split code into sections
* Reordered parameters to deal with int64 size in 32-bit builds
* TestNode.nim: combine Store and Retrieve tests
  1. Instead of copy-pasting code from node.nim, the new test calls
     node.store() and node.retrieve() to check that they can correctly
     store and then retrieve data
  2. The new test compares only file contents; manifest contents are
     considered an implementation detail
  3. The new test chunks at an odd chunkSize = BlockSize/1.618 to ensure
     that data is retrieved correctly even when buffer sizes don't match
* TestNode.nim: code refactoring
* Manifest.add: one more test
* Manifest.verify: return Result instead of raising Defect
* Node.store: added blockSize parameter
---
 codex/chunker.nim                     |  52 ++++-----
 codex/manifest/coders.nim             |  11 ++
 codex/manifest/manifest.nim           |  67 ++++++++++--
 codex/manifest/types.nim              |   9 +-
 codex/node.nim                        |  28 +++--
 codex/streams/storestream.nim         |  41 +++----
 codex/utils.nim                       |  11 ++
 tests/codex/helpers/randomchunker.nim |   2 -
 tests/codex/testchunking.nim          |  18 ++-
 tests/codex/testnode.nim              | 151 ++++++++++----------------
 vendor/questionable                   |   2 +-
 11 files changed, 208 insertions(+), 184 deletions(-)

diff --git a/codex/chunker.nim b/codex/chunker.nim
index 6b0b9cc2..4f25214e 100644
--- a/codex/chunker.nim
+++ b/codex/chunker.nim
@@ -29,21 +29,14 @@ const
 type
   # default reader type
   ChunkBuffer* = ptr UncheckedArray[byte]
-  Reader* =
-    proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].}
+  Reader* = proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].}
 
-  ChunkerType* {.pure.} = enum
-    FixedChunker
-    RabinChunker
-
-  Chunker* = object
-    reader*: Reader
-    case kind*: ChunkerType:
-    of FixedChunker:
-      chunkSize*: Natural
-      pad*: bool # pad last block if less than size
-    of RabinChunker:
-      discard
+  # Reader that splits input data into fixed-size chunks
+  Chunker* = ref object
+    reader*: Reader       # Procedure called to actually read the data
+    offset*: int          # Bytes read so far (position in the stream)
+    chunkSize*: Natural   # Size of each chunk
+    pad*: bool            # Pad last chunk to chunkSize?
FileChunker* = Chunker LPStreamChunker* = Chunker @@ -59,6 +52,8 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} = if read <= 0: return @[] + c.offset += read + if not c.pad and buff.len > read: buff.setLen(read) @@ -66,24 +61,18 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} = func new*( T: type Chunker, - kind = ChunkerType.FixedChunker, reader: Reader, chunkSize = DefaultChunkSize, pad = true): T = - var chunker = Chunker( - kind: kind, - reader: reader) - if kind == ChunkerType.FixedChunker: - chunker.pad = pad - chunker.chunkSize = chunkSize - - return chunker + T(reader: reader, + offset: 0, + chunkSize: chunkSize, + pad: pad) proc new*( T: type LPStreamChunker, stream: LPStream, - kind = ChunkerType.FixedChunker, chunkSize = DefaultChunkSize, pad = true): T = ## create the default File chunker @@ -103,16 +92,14 @@ proc new*( return res - Chunker.new( - kind = ChunkerType.FixedChunker, + T.new( reader = reader, - pad = pad, - chunkSize = chunkSize) + chunkSize = chunkSize, + pad = pad) proc new*( T: type FileChunker, file: File, - kind = ChunkerType.FixedChunker, chunkSize = DefaultChunkSize, pad = true): T = ## create the default File chunker @@ -136,8 +123,7 @@ proc new*( return total - Chunker.new( - kind = ChunkerType.FixedChunker, + T.new( reader = reader, - pad = pad, - chunkSize = chunkSize) + chunkSize = chunkSize, + pad = pad) diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index d86f33d2..f8e0a74b 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +# This module implements serialization and deserialization of Manifest + import pkg/upraises push: {.upraises: [].} @@ -29,6 +31,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = ## multicodec container (Dag-pb) for now ## + ? manifest.verify() var pbNode = initProtoBuffer() for c in manifest.blocks: @@ -52,6 +55,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = # optional uint32 blockSize = 2; # size of a single block # optional uint32 blocksLen = 3; # total amount of blocks # optional ErasureInfo erasure = 4; # erasure coding info + # optional uint64 originalBytes = 5;# exact file size # } # ``` # @@ -61,6 +65,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] = header.write(1, cid.data.buffer) header.write(2, manifest.blockSize.uint32) header.write(3, manifest.len.uint32) + header.write(5, manifest.originalBytes.uint64) if manifest.protected: var erasureInfo = initProtoBuffer() erasureInfo.write(1, manifest.K.uint32) @@ -86,6 +91,7 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = pbErasureInfo: ProtoBuffer rootHash: seq[byte] originalCid: seq[byte] + originalBytes: uint64 blockSize: uint32 blocksLen: uint32 originalLen: uint32 @@ -106,6 +112,9 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = if pbHeader.getField(3, blocksLen).isErr: return failure("Unable to decode `blocksLen` from manifest!") + if pbHeader.getField(5, originalBytes).isErr: + return failure("Unable to decode `originalBytes` from manifest!") + if pbHeader.getField(4, pbErasureInfo).isErr: return failure("Unable to decode `erasureInfo` from manifest!") @@ -140,6 +149,7 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = var self = Manifest( rootHash: rootHashCid.some, + originalBytes: originalBytes.int, blockSize: blockSize.int, blocks: blocks, hcodec: (? 
rootHashCid.mhash.mapFailure).mcodec, @@ -153,6 +163,7 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest = self.originalCid = ? Cid.init(originalCid).mapFailure self.originalLen = originalLen.int + ? self.verify() self.success proc encode*( diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index ebfe039a..8be8d4bb 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +# This module defines all operations on Manifest + import pkg/upraises push: {.upraises: [].} @@ -18,16 +20,18 @@ import pkg/questionable/results import pkg/chronicles import ../errors +import ../utils import ../blocktype import ./types import ./coders +############################################################ +# Operations on block list +############################################################ + func len*(self: Manifest): int = self.blocks.len -func size*(self: Manifest): int = - self.blocks.len * self.blockSize - func `[]`*(self: Manifest, i: Natural): Cid = self.blocks[i] @@ -43,9 +47,11 @@ func `[]=`*(self: Manifest, i: BackwardsIndex, item: Cid) = self.blocks[self.len - i.int] = item proc add*(self: Manifest, cid: Cid) = + assert not self.protected # we expect that protected manifests are created with properly-sized self.blocks self.rootHash = Cid.none trace "Adding cid to manifest", cid self.blocks.add(cid) + self.originalBytes = self.blocks.len * self.blockSize iterator items*(self: Manifest): Cid = for b in self.blocks: @@ -58,6 +64,44 @@ iterator pairs*(self: Manifest): tuple[key: int, val: Cid] = func contains*(self: Manifest, cid: Cid): bool = cid in self.blocks + +############################################################ +# Various sizes and verification +############################################################ + +func bytes*(self: Manifest, pad = true): int = + ## Compute how many bytes corresponding StoreStream(Manifest, pad) will return + if pad or self.protected: + self.len * self.blockSize + else: + self.originalBytes + +func rounded*(self: Manifest): int = + ## Number of data blocks in *protected* manifest including padding at the end + roundUp(self.originalLen, self.K) + +func steps*(self: Manifest): int = + ## Number of EC groups in *protected* manifest + divUp(self.originalLen, self.K) + +func verify*(self: Manifest): ?!void = + ## Check manifest correctness + ## + let originalLen = (if self.protected: self.originalLen else: self.len) + + if divUp(self.originalBytes, self.blockSize) != originalLen: + return failure newException(CodexError, "Broken manifest: wrong originalBytes") + + if self.protected and (self.len != self.steps * (self.K + self.M)): + return failure newException(CodexError, "Broken manifest: wrong originalLen") + + return success() + + +############################################################ +# Cid computation +############################################################ + template hashBytes(mh: MultiHash): seq[byte] = ## get the hash bytes of a multihash object ## @@ -95,15 +139,6 @@ proc makeRoot*(self: Manifest): ?!void = success() -func rounded*(self: Manifest): int = - if (self.originalLen mod self.K) != 0: - return self.originalLen + (self.K - (self.originalLen mod self.K)) - - self.originalLen - -func steps*(self: Manifest): int = - self.rounded div self.K # number of blocks per row - proc cid*(self: Manifest): ?!Cid = ## Generate a root hash using the treehash algorithm ## @@ -113,6 +148,11 @@ proc 
cid*(self: Manifest): ?!Cid = (!self.rootHash).success + +############################################################ +# Constructors +############################################################ + proc new*( T: type Manifest, blocks: openArray[Cid] = [], @@ -133,6 +173,7 @@ proc new*( codec: codec, hcodec: hcodec, blockSize: blockSize, + originalBytes: blocks.len * blockSize, protected: protected).success proc new*( @@ -148,6 +189,7 @@ proc new*( version: manifest.version, codec: manifest.codec, hcodec: manifest.hcodec, + originalBytes: manifest.originalBytes, blockSize: manifest.blockSize, protected: true, K: K, M: M, @@ -170,6 +212,7 @@ proc new*( .catch .get() + ? self.verify() self.success proc new*( diff --git a/codex/manifest/types.nim b/codex/manifest/types.nim index c189da0f..11eba72c 100644 --- a/codex/manifest/types.nim +++ b/codex/manifest/types.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +# This module defines Manifest and all related types + import std/tables import pkg/libp2p import pkg/questionable @@ -25,9 +27,10 @@ const type Manifest* = ref object of RootObj - rootHash*: ?Cid # root (tree) hash of the contained data set - blockSize*: int # size of each contained block (might not be needed if blocks are len-prefixed) - blocks*: seq[Cid] # block Cid + rootHash*: ?Cid # Root (tree) hash of the contained data set + originalBytes*: int # Exact size of the original (uploaded) file + blockSize*: int # Size of each contained block (might not be needed if blocks are len-prefixed) + blocks*: seq[Cid] # Block Cid version*: CidVersion # Cid version hcodec*: MultiCodec # Multihash codec codec*: MultiCodec # Data set codec diff --git a/codex/node.nim b/codex/node.nim index bf82243b..620eb61f 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -115,28 +115,33 @@ proc fetchBatched*( proc retrieve*( node: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} = - ## Retrieve a block or manifest + ## Retrieve by Cid a single block or an entire dataset described by manifest ## if manifest =? (await node.fetchManifest(cid)): if manifest.protected: + # Retrieve, decode and save to the local store all EС groups proc erasureJob(): Future[void] {.async.} = try: - without res =? (await node.erasure.decode(manifest)), error: # spawn an erasure decoding job + # Spawn an erasure decoding job + without res =? (await node.erasure.decode(manifest)), error: trace "Unable to erasure decode manifest", cid, exc = error.msg except CatchableError as exc: trace "Exception decoding manifest", cid - + # asyncSpawn erasureJob() else: + # Prefetch the entire dataset into the local store proc prefetchBlocks() {.async, raises: [Defect].} = try: discard await node.fetchBatched(manifest) except CatchableError as exc: trace "Exception prefetching blocks", exc = exc.msg - + # asyncSpawn prefetchBlocks() - return LPStream(StoreStream.new(node.blockStore, manifest)).success + # + # Retrieve all blocks of the dataset sequentially from the local store or network + return LPStream(StoreStream.new(node.blockStore, manifest, pad = false)).success let stream = BufferStream.new() @@ -158,14 +163,18 @@ proc retrieve*( proc store*( node: CodexNodeRef, - stream: LPStream): Future[?!Cid] {.async.} = + stream: LPStream, + blockSize = BlockSize): Future[?!Cid] {.async.} = + ## Save stream contents as dataset with given blockSize + ## to nodes's BlockStore, and return Cid of its manifest + ## trace "Storing data" - without var blockManifest =? 
Manifest.new(): + without var blockManifest =? Manifest.new(blockSize = blockSize): return failure("Unable to create Block Set") - let - chunker = LPStreamChunker.new(stream, chunkSize = BlockSize) + # Manifest and chunker should use the same blockSize + let chunker = LPStreamChunker.new(stream, chunkSize = blockSize) try: while ( @@ -189,6 +198,7 @@ proc store*( await stream.close() # Generate manifest + blockManifest.originalBytes = chunker.offset # store the exact file size without data =? blockManifest.encode(): return failure( newException(CodexError, "Could not generate dataset manifest!")) diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 8d533638..e90d09f1 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -30,25 +30,29 @@ logScope: topics = "dagger storestream" type + # Make SeekableStream from a sequence of blocks stored in Manifest + # (only original file data - see StoreStream.size) StoreStream* = ref object of SeekableStream - store*: BlockStore - manifest*: Manifest - emptyBlock*: seq[byte] + store*: BlockStore # Store where to lookup block contents + manifest*: Manifest # List of block CIDs + pad*: bool # Pad last block to manifest.blockSize? proc new*( T: type StoreStream, store: BlockStore, - manifest: Manifest): T = + manifest: Manifest, + pad = true): T = + result = T( store: store, manifest: manifest, - offset: 0, - emptyBlock: newSeq[byte](manifest.blockSize)) + pad: pad, + offset: 0) result.initStream() method `size`*(self: StoreStream): int = - self.manifest.len * self.manifest.blockSize + bytes(self.manifest, self.pad) proc `size=`*(self: StoreStream, size: int) {.error: "Setting the size is forbidden".} = @@ -78,7 +82,7 @@ method readOnce*( let blockNum = self.offset div self.manifest.blockSize blockOffset = self.offset mod self.manifest.blockSize - readBytes = min(nbytes - read, self.manifest.blockSize - blockOffset) + readBytes = min([self.size - self.offset, nbytes - read, self.manifest.blockSize - blockOffset]) # Read contents of block `blockNum` without blk =? 
await self.store.getBlock(self.manifest[blockNum]), error: @@ -87,13 +91,10 @@ method readOnce*( trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf - copyMem( - pbytes.offset(read), - if blk.isEmpty: - self.emptyBlock[blockOffset].addr - else: - blk.data[blockOffset].addr, - readBytes) + if blk.isEmpty: + zeroMem(pbytes.offset(read), readBytes) + else: + copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes) # Update current positions in the stream and outbuf self.offset += readBytes @@ -102,12 +103,6 @@ method readOnce*( return read method closeImpl*(self: StoreStream) {.async.} = - try: - trace "Closing StoreStream" - self.offset = self.manifest.len * self.manifest.blockSize # set Eof - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "Error closing StoreStream", msg = exc.msg - + trace "Closing StoreStream" + self.offset = self.size # set Eof await procCall LPStream(self).closeImpl() diff --git a/codex/utils.nim b/codex/utils.nim index 8d561d40..e5f21ef2 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -2,3 +2,14 @@ import ./utils/asyncheapqueue import ./utils/fileutils export asyncheapqueue, fileutils + + +func divUp*[T](a, b : T): T = + ## Division with result rounded up (rather than truncated as in 'div') + assert(b != 0) + if a==0: 0 else: ((a - 1) div b) + 1 + +func roundUp*[T](a, b : T): T = + ## Round up 'a' to the next value divisible by 'b' + divUp(a,b) * b + diff --git a/tests/codex/helpers/randomchunker.nim b/tests/codex/helpers/randomchunker.nim index 87726953..0d0b7b4f 100644 --- a/tests/codex/helpers/randomchunker.nim +++ b/tests/codex/helpers/randomchunker.nim @@ -13,7 +13,6 @@ type proc new*( T: type RandomChunker, rng: Rng, - kind = ChunkerType.FixedChunker, chunkSize = DefaultChunkSize, size: int, pad = false): T = @@ -43,7 +42,6 @@ proc new*( return read Chunker.new( - kind = ChunkerType.FixedChunker, reader = reader, pad = pad, chunkSize = chunkSize) diff --git a/tests/codex/testchunking.nim b/tests/codex/testchunking.nim index 626c332c..3a7f1b4f 100644 --- a/tests/codex/testchunking.nim +++ b/tests/codex/testchunking.nim @@ -11,12 +11,14 @@ suite "Chunking": let contents = [1.byte, 2, 3, 4, 5, 6, 7, 8, 9, 0] proc reader(data: ChunkBuffer, len: int): Future[int] {.gcsafe, async, raises: [Defect].} = - if offset >= contents.len: + + let read = min(contents.len - offset, len) + if read == 0: return 0 - copyMem(data, unsafeAddr contents[offset], len) - offset += 2 - return len + copyMem(data, unsafeAddr contents[offset], read) + offset += read + return read let chunker = Chunker.new( reader = reader, @@ -29,9 +31,9 @@ suite "Chunking": (await chunker.getBytes()) == [7.byte, 8] (await chunker.getBytes()) == [9.byte, 0] (await chunker.getBytes()) == [] + chunker.offset == offset test "should chunk LPStream": - var offset = 0 let stream = BufferStream.new() let chunker = LPStreamChunker.new( stream = stream, @@ -51,6 +53,7 @@ suite "Chunking": (await chunker.getBytes()) == [7.byte, 8] (await chunker.getBytes()) == [9.byte, 0] (await chunker.getBytes()) == [] + chunker.offset == 10 await writerFut @@ -69,4 +72,7 @@ suite "Chunking": check buff.len <= fileChunker.chunkSize data.add(buff) - check string.fromBytes(data) == readFile(path) + check: + string.fromBytes(data) == readFile(path) + fileChunker.offset == data.len + diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 
b9eef95c..fb14d8c6 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -1,5 +1,6 @@ import std/os import std/options +import std/math import pkg/asynctest import pkg/chronos @@ -39,6 +40,39 @@ suite "Test Node": pendingBlocks: PendingBlocksManager discovery: DiscoveryEngine + proc fetch(T: type Manifest, chunker: Chunker): Future[Manifest] {.async.} = + # Collect blocks from Chunker into Manifest + var + manifest = Manifest.new().tryGet() + + while ( + let chunk = await chunker.getBytes(); + chunk.len > 0): + + let blk = bt.Block.new(chunk).tryGet() + (await localStore.putBlock(blk)).tryGet() + manifest.add(blk.cid) + + return manifest + + proc retrieve(cid: Cid): Future[seq[byte]] {.async.} = + # Retrieve an entire file contents by file Cid + let + oddChunkSize = math.trunc(BlockSize/1.359).int # Let's check that node.retrieve can correctly rechunk data + stream = (await node.retrieve(cid)).tryGet() + var + data: seq[byte] + + while not stream.atEof: + var + buf = newSeq[byte](oddChunkSize) + res = await stream.readOnce(addr buf[0], oddChunkSize) + check res <= oddChunkSize + buf.setLen(res) + data &= buf + + return data + setup: file = open(path.splitFile().dir /../ "fixtures" / "test.jpg") chunker = FileChunker.new(file = file, chunkSize = BlockSize) @@ -61,18 +95,9 @@ suite "Test Node": await node.stop() test "Fetch Manifest": - var - manifest = Manifest.new().tryGet() - - while ( - let chunk = await chunker.getBytes(); - chunk.len > 0): - - let blk = bt.Block.new(chunk).tryGet() - (await localStore.putBlock(blk)).tryGet() - manifest.add(blk.cid) - let + manifest = await Manifest.fetch(chunker) + manifestBlock = bt.Block.new( manifest.encode().tryGet(), codec = DagPBCodec @@ -88,115 +113,51 @@ suite "Test Node": fetched.blocks == manifest.blocks test "Block Batching": - var - manifest = Manifest.new().tryGet() - - while ( - let chunk = await chunker.getBytes(); - chunk.len > 0): - - let blk = bt.Block.new(chunk).tryGet() - (await localStore.putBlock(blk)).tryGet() - manifest.add(blk.cid) - let - manifestBlock = bt.Block.new( - manifest.encode().tryGet(), - codec = DagPBCodec - ).tryGet() + manifest = await Manifest.fetch(chunker) - (await node.fetchBatched( - manifest, - batchSize = 3, - proc(blocks: seq[bt.Block]) {.gcsafe, async.} = - check blocks.len > 0 and blocks.len <= 3 - )).tryGet() + for batchSize in 1..12: + (await node.fetchBatched( + manifest, + batchSize = batchSize, + proc(blocks: seq[bt.Block]) {.gcsafe, async.} = + check blocks.len > 0 and blocks.len <= batchSize + )).tryGet() - (await node.fetchBatched( - manifest, - batchSize = 6, - proc(blocks: seq[bt.Block]) {.gcsafe, async.} = - check blocks.len > 0 and blocks.len <= 6 - )).tryGet() - - (await node.fetchBatched( - manifest, - batchSize = 9, - proc(blocks: seq[bt.Block]) {.gcsafe, async.} = - check blocks.len > 0 and blocks.len <= 9 - )).tryGet() - - (await node.fetchBatched( - manifest, - batchSize = 11, - proc(blocks: seq[bt.Block]) {.gcsafe, async.} = - check blocks.len > 0 and blocks.len <= 11 - )).tryGet() - - test "Store Data Stream": + test "Store and retrieve Data Stream": let stream = BufferStream.new() storeFut = node.store(stream) - + oddChunkSize = math.trunc(BlockSize/1.618).int # Let's check that node.store can correctly rechunk these odd chunks + oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false) # TODO: doesn't work with pad=tue var - manifest = Manifest.new().tryGet() + original: seq[byte] try: while ( - let chunk = await chunker.getBytes(); + let 
chunk = await oddChunker.getBytes(); chunk.len > 0): + original &= chunk await stream.pushData(chunk) - manifest.add(bt.Block.new(chunk).tryGet().cid) finally: await stream.pushEof() await stream.close() let manifestCid = (await storeFut).tryGet() - check: (await localStore.hasBlock(manifestCid)).tryGet() - var + let manifestBlock = (await localStore.getBlock(manifestCid)).tryGet() localManifest = Manifest.decode(manifestBlock).tryGet() - check: - manifest.len == localManifest.len - manifest.cid == localManifest.cid - - test "Retrieve Data Stream": - var - manifest = Manifest.new().tryGet() - original: seq[byte] - - while ( - let chunk = await chunker.getBytes(); - chunk.len > 0): - - let blk = bt.Block.new(chunk).tryGet() - original &= chunk - (await localStore.putBlock(blk)).tryGet() - manifest.add(blk.cid) - let - manifestBlock = bt.Block.new( - manifest.encode().tryGet(), - codec = DagPBCodec - ).tryGet() - - (await localStore.putBlock(manifestBlock)).tryGet() - - let stream = (await node.retrieve(manifestBlock.cid)).tryGet() - var data: seq[byte] - while not stream.atEof: - var - buf = newSeq[byte](BlockSize) - res = await stream.readOnce(addr buf[0], BlockSize div 2) - buf.setLen(res) - data &= buf - - check data == original + data = await retrieve(manifestCid) + check: + data.len == localManifest.originalBytes + data.len == original.len + sha256.digest(data) == sha256.digest(original) test "Retrieve One Block": let diff --git a/vendor/questionable b/vendor/questionable index 82408a5c..6018fd43 160000 --- a/vendor/questionable +++ b/vendor/questionable @@ -1 +1 @@ -Subproject commit 82408a5ca2c24eca411d6774cf83816877170627 +Subproject commit 6018fd43e033d5a5310faa45bcaa1b44049469a4
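
Reviewer note (not part of the patch): the size bookkeeping introduced here is easiest to see in isolation. The sketch below, in Nim, reuses the divUp/roundUp helpers added to codex/utils.nim and walks through the relation that Manifest.verify() enforces between originalBytes, blockSize and the number of stored blocks, plus the rounded/steps values used for protected manifests. The concrete numbers (a 10 000-byte upload, 4096-byte blocks, K = 2) are made up purely for illustration.

```nim
func divUp[T](a, b: T): T =
  ## Division with the result rounded up (rather than truncated as in `div`)
  assert(b != 0)
  if a == 0: 0 else: ((a - 1) div b) + 1

func roundUp[T](a, b: T): T =
  ## Round `a` up to the next value divisible by `b`
  divUp(a, b) * b

when isMainModule:
  let
    blockSize     = 4096                            # hypothetical block size
    originalBytes = 10_000                          # exact uploaded size, now kept in the manifest
    blocksLen     = divUp(originalBytes, blockSize) # blocks needed, last one padded

  doAssert blocksLen == 3
  # For an unprotected manifest, Manifest.verify() checks exactly this relation:
  doAssert divUp(originalBytes, blockSize) == blocksLen
  # StoreStream(manifest, pad = false) now returns originalBytes (10 000) bytes,
  # whereas pad = true keeps the old behaviour and returns the padded size:
  doAssert blocksLen * blockSize == 12_288

  # For a protected manifest with K data blocks per EC group:
  let K = 2
  doAssert roundUp(blocksLen, K) == 4  # `rounded`: data blocks incl. end padding
  doAssert divUp(blocksLen, K) == 2    # `steps`: number of EC groups
```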