diff --git a/codex/chunker.nim b/codex/chunker.nim
index 6b0b9cc2..4f25214e 100644
--- a/codex/chunker.nim
+++ b/codex/chunker.nim
@@ -29,21 +29,14 @@ const
 type
   # default reader type
   ChunkBuffer* = ptr UncheckedArray[byte]
-  Reader* =
-    proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].}
+  Reader* = proc(data: ChunkBuffer, len: int): Future[int] {.gcsafe, raises: [Defect].}

-  ChunkerType* {.pure.} = enum
-    FixedChunker
-    RabinChunker
-
-  Chunker* = object
-    reader*: Reader
-    case kind*: ChunkerType:
-    of FixedChunker:
-      chunkSize*: Natural
-      pad*: bool # pad last block if less than size
-    of RabinChunker:
-      discard
+  # Reader that splits input data into fixed-size chunks
+  Chunker* = ref object
+    reader*: Reader        # Procedure called to actually read the data
+    offset*: int           # Bytes read so far (position in the stream)
+    chunkSize*: Natural    # Size of each chunk
+    pad*: bool             # Pad last chunk to chunkSize?

   FileChunker* = Chunker
   LPStreamChunker* = Chunker
@@ -59,6 +52,8 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} =
   if read <= 0:
     return @[]

+  c.offset += read
+
   if not c.pad and buff.len > read:
     buff.setLen(read)

@@ -66,24 +61,18 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} =
 func new*(
   T: type Chunker,
-  kind = ChunkerType.FixedChunker,
   reader: Reader,
   chunkSize = DefaultChunkSize,
   pad = true): T =
-  var chunker = Chunker(
-    kind: kind,
-    reader: reader)
-  if kind == ChunkerType.FixedChunker:
-    chunker.pad = pad
-    chunker.chunkSize = chunkSize
-
-  return chunker
+  T(reader: reader,
+    offset: 0,
+    chunkSize: chunkSize,
+    pad: pad)

 proc new*(
   T: type LPStreamChunker,
   stream: LPStream,
-  kind = ChunkerType.FixedChunker,
   chunkSize = DefaultChunkSize,
   pad = true): T =
   ## create the default LPStream chunker
@@ -103,16 +92,14 @@ proc new*(

     return res

-  Chunker.new(
-    kind = ChunkerType.FixedChunker,
+  T.new(
     reader = reader,
-    pad = pad,
-    chunkSize = chunkSize)
+    chunkSize = chunkSize,
+    pad = pad)

 proc new*(
   T: type FileChunker,
   file: File,
-  kind = ChunkerType.FixedChunker,
   chunkSize = DefaultChunkSize,
   pad = true): T =
   ## create the default File chunker
@@ -136,8 +123,7 @@ proc new*(

     return total

-  Chunker.new(
-    kind = ChunkerType.FixedChunker,
+  T.new(
     reader = reader,
-    pad = pad,
-    chunkSize = chunkSize)
+    chunkSize = chunkSize,
+    pad = pad)
diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim
index d86f33d2..f8e0a74b 100644
--- a/codex/manifest/coders.nim
+++ b/codex/manifest/coders.nim
@@ -7,6 +7,8 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.

+# This module implements serialization and deserialization of Manifest
+
 import pkg/upraises

 push: {.upraises: [].}
@@ -29,6 +31,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
   ## multicodec container (Dag-pb) for now
   ##

+  ? manifest.verify()
   var pbNode = initProtoBuffer()

   for c in manifest.blocks:
@@ -52,6 +55,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
   #     optional uint32 blockSize = 2;    # size of a single block
   #     optional uint32 blocksLen = 3;    # total amount of blocks
   #     optional ErasureInfo erasure = 4; # erasure coding info
+  #     optional uint64 originalBytes = 5; # exact file size
   #   }
   # ```
   #
@@ -61,6 +65,7 @@ func encode*(_: DagPBCoder, manifest: Manifest): ?!seq[byte] =
   header.write(1, cid.data.buffer)
   header.write(2, manifest.blockSize.uint32)
   header.write(3, manifest.len.uint32)
+  header.write(5, manifest.originalBytes.uint64)
   if manifest.protected:
     var erasureInfo = initProtoBuffer()
     erasureInfo.write(1, manifest.K.uint32)
@@ -86,6 +91,7 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
     pbErasureInfo: ProtoBuffer
     rootHash: seq[byte]
     originalCid: seq[byte]
+    originalBytes: uint64
     blockSize: uint32
     blocksLen: uint32
     originalLen: uint32
@@ -106,6 +112,9 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
   if pbHeader.getField(3, blocksLen).isErr:
     return failure("Unable to decode `blocksLen` from manifest!")

+  if pbHeader.getField(5, originalBytes).isErr:
+    return failure("Unable to decode `originalBytes` from manifest!")
+
   if pbHeader.getField(4, pbErasureInfo).isErr:
     return failure("Unable to decode `erasureInfo` from manifest!")

@@ -140,6 +149,7 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =

   var self = Manifest(
     rootHash: rootHashCid.some,
+    originalBytes: originalBytes.int,
     blockSize: blockSize.int,
     blocks: blocks,
     hcodec: (? rootHashCid.mhash.mapFailure).mcodec,
@@ -153,6 +163,7 @@ func decode*(_: DagPBCoder, data: openArray[byte]): ?!Manifest =
     self.originalCid = ? Cid.init(originalCid).mapFailure
     self.originalLen = originalLen.int

+  ? self.verify()
   self.success

 proc encode*(
diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim
index ebfe039a..8be8d4bb 100644
--- a/codex/manifest/manifest.nim
+++ b/codex/manifest/manifest.nim
@@ -7,6 +7,8 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.

+# This module defines all operations on Manifest
+
 import pkg/upraises

 push: {.upraises: [].}
@@ -18,16 +20,18 @@ import pkg/questionable/results
 import pkg/chronicles

 import ../errors
+import ../utils
 import ../blocktype
 import ./types
 import ./coders

+############################################################
+# Operations on block list
+############################################################
+
 func len*(self: Manifest): int =
   self.blocks.len

-func size*(self: Manifest): int =
-  self.blocks.len * self.blockSize
-
 func `[]`*(self: Manifest, i: Natural): Cid =
   self.blocks[i]

@@ -43,9 +47,11 @@ func `[]=`*(self: Manifest, i: BackwardsIndex, item: Cid) =
   self.blocks[self.len - i.int] = item

 proc add*(self: Manifest, cid: Cid) =
+  assert not self.protected  # we expect that protected manifests are created with properly-sized self.blocks
   self.rootHash = Cid.none
   trace "Adding cid to manifest", cid
   self.blocks.add(cid)
+  self.originalBytes = self.blocks.len * self.blockSize

 iterator items*(self: Manifest): Cid =
   for b in self.blocks:
@@ -58,6 +64,44 @@ iterator pairs*(self: Manifest): tuple[key: int, val: Cid] =
 func contains*(self: Manifest, cid: Cid): bool =
   cid in self.blocks

+
+############################################################
+# Various sizes and verification
+############################################################
+
+func bytes*(self: Manifest, pad = true): int =
+  ## Compute how many bytes the corresponding StoreStream(Manifest, pad) will return
+  if pad or self.protected:
+    self.len * self.blockSize
+  else:
+    self.originalBytes
+
+func rounded*(self: Manifest): int =
+  ## Number of data blocks in a *protected* manifest, including padding at the end
+  roundUp(self.originalLen, self.K)
+
+func steps*(self: Manifest): int =
+  ## Number of EC groups in a *protected* manifest
+  divUp(self.originalLen, self.K)
+
+func verify*(self: Manifest): ?!void =
+  ## Check manifest correctness
+  ##
+  let originalLen = (if self.protected: self.originalLen else: self.len)

+  if divUp(self.originalBytes, self.blockSize) != originalLen:
+    return failure newException(CodexError, "Broken manifest: wrong originalBytes")
+
+  if self.protected and (self.len != self.steps * (self.K + self.M)):
+    return failure newException(CodexError, "Broken manifest: wrong originalLen")
+
+  return success()
+
+
+############################################################
+# Cid computation
+############################################################
+
 template hashBytes(mh: MultiHash): seq[byte] =
   ## get the hash bytes of a multihash object
   ##
@@ -95,15 +139,6 @@ proc makeRoot*(self: Manifest): ?!void =

   success()

-func rounded*(self: Manifest): int =
-  if (self.originalLen mod self.K) != 0:
-    return self.originalLen + (self.K - (self.originalLen mod self.K))
-
-  self.originalLen
-
-func steps*(self: Manifest): int =
-  self.rounded div self.K # number of blocks per row
-
 proc cid*(self: Manifest): ?!Cid =
   ## Generate a root hash using the treehash algorithm
   ##
@@ -113,6 +148,11 @@ proc cid*(self: Manifest): ?!Cid =

   (!self.rootHash).success

+
+############################################################
+# Constructors
+############################################################
+
 proc new*(
   T: type Manifest,
   blocks: openArray[Cid] = [],
@@ -133,6 +173,7 @@ proc new*(
     codec: codec,
     hcodec: hcodec,
     blockSize: blockSize,
+    originalBytes: blocks.len * blockSize,
     protected: protected).success

 proc new*(
@@ -148,6 +189,7 @@ proc new*(
     version: manifest.version,
     codec: manifest.codec,
     hcodec: manifest.hcodec,
+    originalBytes: manifest.originalBytes,
     blockSize: manifest.blockSize,
     protected: true,
     K: K, M: M,
@@ -170,6 +212,7 @@ proc new*(
       .catch
       .get()

+  ? self.verify()
   self.success

 proc new*(
diff --git a/codex/manifest/types.nim b/codex/manifest/types.nim
index c189da0f..11eba72c 100644
--- a/codex/manifest/types.nim
+++ b/codex/manifest/types.nim
@@ -7,6 +7,8 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.

+# This module defines Manifest and all related types
+
 import std/tables
 import pkg/libp2p
 import pkg/questionable
@@ -25,9 +27,10 @@ const

 type
   Manifest* = ref object of RootObj
-    rootHash*: ?Cid          # root (tree) hash of the contained data set
-    blockSize*: int          # size of each contained block (might not be needed if blocks are len-prefixed)
-    blocks*: seq[Cid]        # block Cid
+    rootHash*: ?Cid          # Root (tree) hash of the contained data set
+    originalBytes*: int      # Exact size of the original (uploaded) file
+    blockSize*: int          # Size of each contained block (might not be needed if blocks are len-prefixed)
+    blocks*: seq[Cid]        # Block Cid
     version*: CidVersion     # Cid version
     hcodec*: MultiCodec      # Multihash codec
     codec*: MultiCodec       # Data set codec
diff --git a/codex/node.nim b/codex/node.nim
index bf82243b..620eb61f 100644
--- a/codex/node.nim
+++ b/codex/node.nim
@@ -115,28 +115,33 @@ proc fetchBatched*(
 proc retrieve*(
   node: CodexNodeRef,
   cid: Cid): Future[?!LPStream] {.async.} =
-  ## Retrieve a block or manifest
+  ## Retrieve by Cid a single block or an entire dataset described by a manifest
   ##

   if manifest =? (await node.fetchManifest(cid)):
     if manifest.protected:
+      # Retrieve, decode and save to the local store all EC groups
       proc erasureJob(): Future[void] {.async.} =
         try:
-          without res =? (await node.erasure.decode(manifest)), error: # spawn an erasure decoding job
+          # Spawn an erasure decoding job
+          without res =? (await node.erasure.decode(manifest)), error:
             trace "Unable to erasure decode manifest", cid, exc = error.msg
         except CatchableError as exc:
           trace "Exception decoding manifest", cid
-
+      #
       asyncSpawn erasureJob()
     else:
+      # Prefetch the entire dataset into the local store
      proc prefetchBlocks() {.async, raises: [Defect].} =
         try:
           discard await node.fetchBatched(manifest)
         except CatchableError as exc:
           trace "Exception prefetching blocks", exc = exc.msg
-
+      #
       asyncSpawn prefetchBlocks()
-    return LPStream(StoreStream.new(node.blockStore, manifest)).success
+    #
+    # Retrieve all blocks of the dataset sequentially from the local store or network
+    return LPStream(StoreStream.new(node.blockStore, manifest, pad = false)).success

   let
     stream = BufferStream.new()
@@ -158,14 +163,18 @@ proc retrieve*(

 proc store*(
   node: CodexNodeRef,
-  stream: LPStream): Future[?!Cid] {.async.} =
+  stream: LPStream,
+  blockSize = BlockSize): Future[?!Cid] {.async.} =
+  ## Save stream contents as a dataset with the given blockSize
+  ## to the node's BlockStore, and return the Cid of its manifest
+  ##
   trace "Storing data"

-  without var blockManifest =? Manifest.new():
+  without var blockManifest =? Manifest.new(blockSize = blockSize):
     return failure("Unable to create Block Set")

-  let
-    chunker = LPStreamChunker.new(stream, chunkSize = BlockSize)
+  # Manifest and chunker should use the same blockSize
+  let chunker = LPStreamChunker.new(stream, chunkSize = blockSize)

   try:
     while (
@@ -189,6 +198,7 @@ proc store*(
     await stream.close()

   # Generate manifest
+  blockManifest.originalBytes = chunker.offset  # store the exact file size
   without data =? blockManifest.encode():
     return failure(
       newException(CodexError, "Could not generate dataset manifest!"))
diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim
index 8d533638..e90d09f1 100644
--- a/codex/streams/storestream.nim
+++ b/codex/streams/storestream.nim
@@ -30,25 +30,29 @@ logScope:
   topics = "dagger storestream"

 type
+  # Make a SeekableStream from a sequence of blocks stored in a Manifest
+  # (only original file data; see StoreStream.size)
   StoreStream* = ref object of SeekableStream
-    store*: BlockStore
-    manifest*: Manifest
-    emptyBlock*: seq[byte]
+    store*: BlockStore       # Store where to look up block contents
+    manifest*: Manifest      # List of block CIDs
+    pad*: bool               # Pad last block to manifest.blockSize?

 proc new*(
   T: type StoreStream,
   store: BlockStore,
-  manifest: Manifest): T =
+  manifest: Manifest,
+  pad = true): T =
+
   result = T(
     store: store,
     manifest: manifest,
-    offset: 0,
-    emptyBlock: newSeq[byte](manifest.blockSize))
+    pad: pad,
+    offset: 0)

   result.initStream()

 method `size`*(self: StoreStream): int =
-  self.manifest.len * self.manifest.blockSize
+  bytes(self.manifest, self.pad)

 proc `size=`*(self: StoreStream, size: int)
   {.error: "Setting the size is forbidden".} =
@@ -78,7 +82,7 @@ method readOnce*(
     let
       blockNum    = self.offset div self.manifest.blockSize
       blockOffset = self.offset mod self.manifest.blockSize
-      readBytes   = min(nbytes - read, self.manifest.blockSize - blockOffset)
+      readBytes   = min([self.size - self.offset, nbytes - read, self.manifest.blockSize - blockOffset])

     # Read contents of block `blockNum`
     without blk =? await self.store.getBlock(self.manifest[blockNum]), error:
@@ -87,13 +91,10 @@ method readOnce*(
     trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset

     # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
-    copyMem(
-      pbytes.offset(read),
-      if blk.isEmpty:
-        self.emptyBlock[blockOffset].addr
-      else:
-        blk.data[blockOffset].addr,
-      readBytes)
+    if blk.isEmpty:
+      zeroMem(pbytes.offset(read), readBytes)
+    else:
+      copyMem(pbytes.offset(read), blk.data[blockOffset].addr, readBytes)

     # Update current positions in the stream and outbuf
     self.offset += readBytes
@@ -102,12 +103,6 @@ method readOnce*(
   return read

 method closeImpl*(self: StoreStream) {.async.} =
-  try:
-    trace "Closing StoreStream"
-    self.offset = self.manifest.len * self.manifest.blockSize # set Eof
-  except CancelledError as exc:
-    raise exc
-  except CatchableError as exc:
-    trace "Error closing StoreStream", msg = exc.msg
-
+  trace "Closing StoreStream"
+  self.offset = self.size  # set Eof
   await procCall LPStream(self).closeImpl()
diff --git a/codex/utils.nim b/codex/utils.nim
index 8d561d40..e5f21ef2 100644
--- a/codex/utils.nim
+++ b/codex/utils.nim
@@ -2,3 +2,14 @@ import ./utils/asyncheapqueue
 import ./utils/fileutils

 export asyncheapqueue, fileutils
+
+
+func divUp*[T](a, b: T): T =
+  ## Division with the result rounded up (rather than truncated as in `div`)
+  assert(b != 0)
+  if a == 0: 0 else: ((a - 1) div b) + 1
+
+func roundUp*[T](a, b: T): T =
+  ## Round up `a` to the next value divisible by `b`
+  divUp(a, b) * b
+
diff --git a/tests/codex/helpers/randomchunker.nim b/tests/codex/helpers/randomchunker.nim
index 87726953..0d0b7b4f 100644
--- a/tests/codex/helpers/randomchunker.nim
+++ b/tests/codex/helpers/randomchunker.nim
@@ -13,7 +13,6 @@ type
 proc new*(
   T: type RandomChunker,
   rng: Rng,
-  kind = ChunkerType.FixedChunker,
   chunkSize = DefaultChunkSize,
   size: int,
   pad = false): T =
@@ -43,7 +42,6 @@ proc new*(
     return read

   Chunker.new(
-    kind = ChunkerType.FixedChunker,
     reader = reader,
     pad = pad,
     chunkSize = chunkSize)
diff --git a/tests/codex/testchunking.nim b/tests/codex/testchunking.nim
index 626c332c..3a7f1b4f 100644
--- a/tests/codex/testchunking.nim
+++ b/tests/codex/testchunking.nim
@@ -11,12 +11,14 @@ suite "Chunking":
     let contents = [1.byte, 2, 3, 4, 5, 6, 7, 8, 9, 0]

     proc reader(data: ChunkBuffer, len: int): Future[int] {.gcsafe, async, raises: [Defect].} =
-      if offset >= contents.len:
+
+      let read = min(contents.len - offset, len)
+      if read == 0:
         return 0

-      copyMem(data, unsafeAddr contents[offset], len)
-      offset += 2
-      return len
+      copyMem(data, unsafeAddr contents[offset], read)
+      offset += read
+      return read

     let chunker = Chunker.new(
       reader = reader,
@@ -29,9 +31,9 @@ suite "Chunking":
       (await chunker.getBytes()) == [7.byte, 8]
       (await chunker.getBytes()) == [9.byte, 0]
       (await chunker.getBytes()) == []
+      chunker.offset == offset

   test "should chunk LPStream":
-    var offset = 0
     let stream = BufferStream.new()
     let chunker = LPStreamChunker.new(
       stream = stream,
@@ -51,6 +53,7 @@ suite "Chunking":
       (await chunker.getBytes()) == [7.byte, 8]
       (await chunker.getBytes()) == [9.byte, 0]
       (await chunker.getBytes()) == []
+      chunker.offset == 10

     await writerFut

@@ -69,4 +72,7 @@ suite "Chunking":
       check buff.len <= fileChunker.chunkSize
       data.add(buff)

-    check string.fromBytes(data) == readFile(path)
+    check:
+      string.fromBytes(data) == readFile(path)
+      fileChunker.offset == data.len
+
diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim
index b9eef95c..fb14d8c6 100644
--- a/tests/codex/testnode.nim
+++ b/tests/codex/testnode.nim
@@ -1,5 +1,6 @@
 import std/os
 import std/options
+import std/math

 import pkg/asynctest
 import pkg/chronos
@@ -39,6 +40,39 @@ suite "Test Node":
     pendingBlocks: PendingBlocksManager
     discovery: DiscoveryEngine

+  proc fetch(T: type Manifest, chunker: Chunker): Future[Manifest] {.async.} =
+    # Collect blocks from the Chunker into a Manifest
+    var
+      manifest = Manifest.new().tryGet()
+
+    while (
+      let chunk = await chunker.getBytes();
+      chunk.len > 0):
+
+      let blk = bt.Block.new(chunk).tryGet()
+      (await localStore.putBlock(blk)).tryGet()
+      manifest.add(blk.cid)
+
+    return manifest
+
+  proc retrieve(cid: Cid): Future[seq[byte]] {.async.} =
+    # Retrieve the entire file contents by the file's Cid
+    let
+      oddChunkSize = math.trunc(BlockSize/1.359).int  # Let's check that node.retrieve can correctly rechunk data
+      stream = (await node.retrieve(cid)).tryGet()
+    var
+      data: seq[byte]
+
+    while not stream.atEof:
+      var
+        buf = newSeq[byte](oddChunkSize)
+        res = await stream.readOnce(addr buf[0], oddChunkSize)
+      check res <= oddChunkSize
+      buf.setLen(res)
+      data &= buf
+
+    return data
+
   setup:
     file = open(path.splitFile().dir /../ "fixtures" / "test.jpg")
     chunker = FileChunker.new(file = file, chunkSize = BlockSize)
@@ -61,18 +95,9 @@ suite "Test Node":
     await node.stop()

   test "Fetch Manifest":
-    var
-      manifest = Manifest.new().tryGet()
-
-    while (
-      let chunk = await chunker.getBytes();
-      chunk.len > 0):
-
-      let blk = bt.Block.new(chunk).tryGet()
-      (await localStore.putBlock(blk)).tryGet()
-      manifest.add(blk.cid)
-
     let
+      manifest = await Manifest.fetch(chunker)
+
       manifestBlock = bt.Block.new(
         manifest.encode().tryGet(),
         codec = DagPBCodec
       ).tryGet()
@@ -88,115 +113,51 @@
       fetched.blocks == manifest.blocks

   test "Block Batching":
-    var
-      manifest = Manifest.new().tryGet()
-
-    while (
-      let chunk = await chunker.getBytes();
-      chunk.len > 0):
-
-      let blk = bt.Block.new(chunk).tryGet()
-      (await localStore.putBlock(blk)).tryGet()
-      manifest.add(blk.cid)
-
     let
-      manifestBlock = bt.Block.new(
-        manifest.encode().tryGet(),
-        codec = DagPBCodec
-      ).tryGet()
+      manifest = await Manifest.fetch(chunker)

-    (await node.fetchBatched(
-      manifest,
-      batchSize = 3,
-      proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
-        check blocks.len > 0 and blocks.len <= 3
-    )).tryGet()
+    for batchSize in 1..12:
+      (await node.fetchBatched(
+        manifest,
+        batchSize = batchSize,
+        proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
+          check blocks.len > 0 and blocks.len <= batchSize
+      )).tryGet()

-    (await node.fetchBatched(
-      manifest,
-      batchSize = 6,
-      proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
-        check blocks.len > 0 and blocks.len <= 6
-    )).tryGet()
-
-    (await node.fetchBatched(
-      manifest,
-      batchSize = 9,
-      proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
-        check blocks.len > 0 and blocks.len <= 9
-    )).tryGet()
-
-    (await node.fetchBatched(
-      manifest,
-      batchSize = 11,
-      proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
-        check blocks.len > 0 and blocks.len <= 11
-    )).tryGet()
-
-  test "Store Data Stream":
+  test "Store and retrieve Data Stream":
     let
       stream = BufferStream.new()
       storeFut = node.store(stream)
-
+      oddChunkSize = math.trunc(BlockSize/1.618).int  # Let's check that node.store can correctly rechunk these odd chunks
+      oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false)  # TODO: doesn't work with pad = true
     var
-      manifest = Manifest.new().tryGet()
+      original: seq[byte]

     try:
       while (
-        let chunk = await chunker.getBytes();
+        let chunk = await oddChunker.getBytes();
         chunk.len > 0):
+        original &= chunk
         await stream.pushData(chunk)
-        manifest.add(bt.Block.new(chunk).tryGet().cid)
     finally:
       await stream.pushEof()
       await stream.close()

     let
       manifestCid = (await storeFut).tryGet()
-
     check:
       (await localStore.hasBlock(manifestCid)).tryGet()

-    var
+    let
       manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
       localManifest = Manifest.decode(manifestBlock).tryGet()

-    check:
-      manifest.len == localManifest.len
-      manifest.cid == localManifest.cid
-
-  test "Retrieve Data Stream":
-    var
-      manifest = Manifest.new().tryGet()
-      original: seq[byte]
-
-    while (
-      let chunk = await chunker.getBytes();
-      chunk.len > 0):
-
-      let blk = bt.Block.new(chunk).tryGet()
-      original &= chunk
-      (await localStore.putBlock(blk)).tryGet()
-      manifest.add(blk.cid)
-
-    let
-      manifestBlock = bt.Block.new(
-        manifest.encode().tryGet(),
-        codec = DagPBCodec
-      ).tryGet()
-
-    (await localStore.putBlock(manifestBlock)).tryGet()
-
-    let stream = (await node.retrieve(manifestBlock.cid)).tryGet()
-    var data: seq[byte]
-    while not stream.atEof:
-      var
-        buf = newSeq[byte](BlockSize)
-        res = await stream.readOnce(addr buf[0], BlockSize div 2)
-      buf.setLen(res)
-      data &= buf
-
-    check data == original
+    let
+      data = await retrieve(manifestCid)
+
+    check:
+      data.len == localManifest.originalBytes
+      data.len == original.len
+      sha256.digest(data) == sha256.digest(original)

   test "Retrieve One Block":
     let
diff --git a/vendor/questionable b/vendor/questionable
index 82408a5c..6018fd43 160000
--- a/vendor/questionable
+++ b/vendor/questionable
@@ -1 +1 @@
-Subproject commit 82408a5ca2c24eca411d6774cf83816877170627
+Subproject commit 6018fd43e033d5a5310faa45bcaa1b44049469a4
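Addendum: a self-contained sketch of the size arithmetic this patch relies on. `divUp`/`roundUp` mirror the helpers added to `codex/utils.nim` above; the concrete byte counts below are illustrative only, not values used anywhere in the tests.

```nim
# Sketch of the size bookkeeping introduced by this patch. divUp/roundUp
# match codex/utils.nim; originalBytes/blockSize values here are made up.

func divUp[T](a, b: T): T =
  ## Division with the result rounded up (rather than truncated as in `div`)
  assert(b != 0)
  if a == 0: 0 else: ((a - 1) div b) + 1

func roundUp[T](a, b: T): T =
  ## Round up `a` to the next value divisible by `b`
  divUp(a, b) * b

when isMainModule:
  let
    originalBytes = 10  # exact uploaded size, as tracked by Manifest.originalBytes
    blockSize = 4       # Manifest.blockSize

  # A 10-byte upload occupies 3 blocks; the last block is only partially filled.
  assert divUp(originalBytes, blockSize) == 3

  # A StoreStream with pad = true exposes whole blocks (12 bytes here),
  # while pad = false stops at the exact file size (10 bytes).
  assert roundUp(originalBytes, blockSize) == 12
```

The same arithmetic backs `Manifest.verify`: a manifest is considered consistent when `divUp(originalBytes, blockSize)` equals its stored block count.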