From 472e1e6fc3a8b78a354b86f1bc070e86c89d57b0 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 17 Mar 2022 07:56:46 -0600 Subject: [PATCH] Store stream (#55) * make manifest a ref object * capitalize tests * add storestream and tests * fix tests * run storestream tests --- dagger/manifest/manifest.nim | 14 ++--- dagger/node.nim | 2 +- dagger/stores/cachestore.nim | 3 +- dagger/storestream.nim | 95 ++++++++++++++++++++++++++++ tests/dagger/testasyncheapqueue.nim | 36 +++++------ tests/dagger/testmanifest.nim | 7 +-- tests/dagger/testnode.nim | 4 +- tests/dagger/teststorestream.nim | 97 +++++++++++++++++++++++++++++ tests/testAll.nim | 1 + 9 files changed, 225 insertions(+), 34 deletions(-) create mode 100644 dagger/storestream.nim create mode 100644 tests/dagger/teststorestream.nim diff --git a/dagger/manifest/manifest.nim b/dagger/manifest/manifest.nim index f4268d36..154c7960 100644 --- a/dagger/manifest/manifest.nim +++ b/dagger/manifest/manifest.nim @@ -45,7 +45,7 @@ template EmptyDigests: untyped = emptyDigests type - Manifest* = object of RootObj + Manifest* = ref object of RootObj rootHash*: ?Cid # root (tree) hash of the contained data set blockSize*: int # size of each contained block (might not be needed if blocks are len-prefixed) blocks*: seq[Cid] # block Cid @@ -69,11 +69,11 @@ func `[]=`*(self: var Manifest, i: Natural, item: Cid) = func `[]`*(self: Manifest, i: BackwardsIndex): Cid = self.blocks[self.len - i.int] -func `[]=`*(self: var Manifest, i: BackwardsIndex, item: Cid) = +func `[]=`*(self: Manifest, i: BackwardsIndex, item: Cid) = self.rootHash = Cid.none self.blocks[self.len - i.int] = item -proc add*(self: var Manifest, cid: Cid) = +proc add*(self: Manifest, cid: Cid) = self.rootHash = Cid.none trace "Adding cid to manifest", cid self.blocks.add(cid) @@ -91,7 +91,7 @@ template hashBytes(mh: MultiHash): seq[byte] = mh.data.buffer[mh.dpos..(mh.dpos + mh.size - 1)] -proc makeRoot*(self: var Manifest): ?!void = +proc makeRoot*(self: 
Manifest): ?!void = ## Create a tree hash root of the contained ## block hashes ## @@ -122,7 +122,7 @@ proc makeRoot*(self: var Manifest): ?!void = ok() -proc cid*(self: var Manifest): ?!Cid = +proc cid*(self: Manifest): ?!Cid = ## Generate a root hash using the treehash algorithm ## @@ -131,7 +131,7 @@ proc cid*(self: var Manifest): ?!Cid = (!self.rootHash).success -proc init*( +proc new*( T: type Manifest, blocks: openArray[Cid] = [], version = CIDv1, @@ -152,7 +152,7 @@ proc init*( blockSize: blockSize ).success -proc init*( +proc new*( T: type Manifest, data: openArray[byte]): ?!T = Manifest.decode(data) diff --git a/dagger/node.nim b/dagger/node.nim index 13d0c876..7a7ad3ea 100644 --- a/dagger/node.nim +++ b/dagger/node.nim @@ -118,7 +118,7 @@ proc store*( stream: LPStream): Future[?!Cid] {.async.} = trace "Storing data" - without var blockManifest =? Manifest.init(): + without var blockManifest =? Manifest.new(): return failure("Unable to create Block Set") let diff --git a/dagger/stores/cachestore.nim b/dagger/stores/cachestore.nim index cd5b20dd..2bf5ab58 100644 --- a/dagger/stores/cachestore.nim +++ b/dagger/stores/cachestore.nim @@ -115,8 +115,7 @@ func new*( store = CacheStore( cache: cache, currentSize: currentSize, - size: cacheSize - ) + size: cacheSize) for blk in blocks: discard store.putBlockSync(blk) diff --git a/dagger/storestream.nim b/dagger/storestream.nim new file mode 100644 index 00000000..82e1a062 --- /dev/null +++ b/dagger/storestream.nim @@ -0,0 +1,95 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. 
+
+{.push raises: [Defect].}
+
+import pkg/libp2p
+import pkg/chronos
+import pkg/chronicles
+import pkg/stew/ptrops
+
+import ./stores
+import ./manifest
+import ./blocktype
+
+export stores
+
+logScope:
+  topics = "dagger storestream"
+
+type
+  ReadPattern* {.pure.} = enum
+    Sequential,
+    Grid
+
+  StoreStream* = ref object of LPStream
+    store*: BlockStore      # block store the manifest's cids are fetched from
+    manifest*: Manifest     # ordered block cids and the block size
+    pattern*: ReadPattern   # set by `init`; not consulted by the procs in this file
+    offset*: int            # current absolute read position, in bytes
+
+proc init*(
+  T: type StoreStream,
+  store: BlockStore,
+  manifest: Manifest,
+  pattern = ReadPattern.Sequential): T =
+  ## Create a stream over the blocks of `manifest`, positioned at offset 0.
+  result = T(
+    store: store,
+    manifest: manifest,
+    pattern: pattern,
+    offset: 0)
+
+  result.initStream()
+
+method readOnce*(
+  self: StoreStream,
+  pbytes: pointer,
+  nbytes: int): Future[int] {.async.} =
+  ## Copy bytes into `pbytes` starting at `self.offset`, crossing block
+  ## boundaries until `nbytes` are copied or EOF is hit; returns the count.
+  ## Raises `newLPStreamEOFError()` if the stream is already at EOF on entry.
+
+  if self.atEof:
+    raise newLPStreamEOFError()
+
+  var
+    read = 0
+
+  while read < nbytes and self.atEof.not:
+    let
+      pos = self.offset div self.manifest.blockSize
+
+    let
+      blk = (await self.store.getBlock(self.manifest[pos])).tryGet()
+      blockOffset = self.offset mod self.manifest.blockSize
+      # never copy past the end of the current block or of the caller's buffer
+      readBytes = min(nbytes - read, self.manifest.blockSize - blockOffset)
+
+    copyMem(pbytes.offset(read), unsafeAddr blk.data[blockOffset], readBytes)
+    self.offset += readBytes
+    read += readBytes
+
+  return read
+
+method atEof*(self: StoreStream): bool =
+  ## True once `offset` has reached `manifest.len * manifest.blockSize`.
+  self.offset >= self.manifest.len * self.manifest.blockSize
+
+method closeImpl*(self: StoreStream) {.async.} =
+  ## Move `offset` to the end so `atEof` is true, then run `LPStream`'s `closeImpl`.
+  try:
+    trace "Closing StoreStream", self
+    self.offset = self.manifest.len * self.manifest.blockSize # set Eof
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    trace "Error closing StoreStream", self, msg = exc.msg
+
+  await procCall LPStream(self).closeImpl()
diff --git a/tests/dagger/testasyncheapqueue.nim b/tests/dagger/testasyncheapqueue.nim
index 645cfea5..7837e2e0 100644
---
a/tests/dagger/testasyncheapqueue.nim +++ b/tests/dagger/testasyncheapqueue.nim @@ -21,8 +21,8 @@ proc toSortedSeq[T](h: AsyncHeapQueue[T], queueType = QueueType.Min): seq[T] = while tmp.len > 0: result.add(popNoWait(tmp).tryGet()) -suite "synchronous tests": - test "test pushNoWait - Min": +suite "Synchronous tests": + test "Test pushNoWait - Min": var heap = newAsyncHeapQueue[int]() let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] for item in data: @@ -31,7 +31,7 @@ suite "synchronous tests": check heap[0] == 0 check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - test "test pushNoWait - Max": + test "Test pushNoWait - Max": var heap = newAsyncHeapQueue[int](queueType = QueueType.Max) let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] for item in data: @@ -40,7 +40,7 @@ suite "synchronous tests": check heap[0] == 9 check heap.toSortedSeq(QueueType.Max) == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0] - test "test popNoWait": + test "Test popNoWait": var heap = newAsyncHeapQueue[int]() let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] for item in data: @@ -54,7 +54,7 @@ suite "synchronous tests": check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - test "test popNoWait - Max": + test "Test popNoWait - Max": var heap = newAsyncHeapQueue[int](queueType = QueueType.Max) let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] for item in data: @@ -68,7 +68,7 @@ suite "synchronous tests": check res == @[9, 8, 7, 6, 5, 4, 3, 2, 1, 0] - test "test del": # Test del + test "Test del": # Test del var heap = newAsyncHeapQueue[int]() let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] for item in data: @@ -89,7 +89,7 @@ suite "synchronous tests": heap.del(heap.find(2)) check heap.toSortedSeq == @[1, 3, 4, 8, 9] - test "del last": # Test del last + test "Test del last": # Test del last var heap = newAsyncHeapQueue[int]() let data = [1, 2, 3] for item in data: @@ -104,20 +104,20 @@ suite "synchronous tests": heap.del(0) check heap.toSortedSeq == newSeq[int]() # empty seq has no type - test "should throw popping from an empty queue": + 
test "Should throw popping from an empty queue": var heap = newAsyncHeapQueue[int]() let err = heap.popNoWait() check err.isErr check err.error == AsyncHQErrors.Empty - test "should throw pushing to an full queue": + test "Should throw pushing to an full queue": var heap = newAsyncHeapQueue[int](1) check heap.pushNoWait(1).isOk let err = heap.pushNoWait(2) check err.isErr check err.error == AsyncHQErrors.Full - test "test clear": + test "Test clear": var heap = newAsyncHeapQueue[int]() let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] for item in data: @@ -127,9 +127,9 @@ suite "synchronous tests": heap.clear() check heap.len == 0 -suite "asynchronous tests": +suite "Asynchronous Tests": - test "test push": + test "Test push": var heap = newAsyncHeapQueue[int]() let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] for item in data: @@ -137,7 +137,7 @@ suite "asynchronous tests": check heap[0] == 0 check heap.toSortedSeq == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - test "test push and pop with maxSize": + test "Test push and pop with maxSize": var heap = newAsyncHeapQueue[int](5) let data = [1, 9, 5, 3, 7, 4, 2] @@ -160,7 +160,7 @@ suite "asynchronous tests": check (await heap.pop) == 2 check (await heap.pop) == 4 - test "test update": + test "Test update": var heap = newAsyncHeapQueue[Task](5) for item in [("a", 4), ("b", 3), ("c", 2)]: @@ -170,7 +170,7 @@ suite "asynchronous tests": check heap.update((name: "a", priority: 1)) check heap[0] == (name: "a", priority: 1) - test "test pushOrUpdate - update": + test "Test pushOrUpdate - update": var heap = newAsyncHeapQueue[Task](3) for item in [("a", 4), ("b", 3), ("c", 2)]: @@ -180,7 +180,7 @@ suite "asynchronous tests": await heap.pushOrUpdate((name: "a", priority: 1)) check heap[0] == (name: "a", priority: 1) - test "test pushOrUpdate - push": + test "Test pushOrUpdate - push": var heap = newAsyncHeapQueue[Task](2) for item in [("a", 4), ("b", 3)]: @@ -194,7 +194,7 @@ suite "asynchronous tests": check heap[0] == (name: "c", priority: 2) # 
check order again - test "test pop": + test "Test pop": var heap = newAsyncHeapQueue[int]() let data = [1, 3, 5, 7, 9, 2, 4, 6, 8, 0] for item in data: @@ -206,7 +206,7 @@ suite "asynchronous tests": check res == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - test "test delete": + test "Test delete": var heap = newAsyncHeapQueue[Task]() let data = ["d", "b", "c", "a", "h", "e", "f", "g"] diff --git a/tests/dagger/testmanifest.nim b/tests/dagger/testmanifest.nim index 820a7522..9ad8c30e 100644 --- a/tests/dagger/testmanifest.nim +++ b/tests/dagger/testmanifest.nim @@ -15,7 +15,7 @@ import ./helpers suite "Manifest": test "Should produce valid tree hash checksum": - without var manifest =? Manifest.init( + var manifest = Manifest.new( blocks = @[ Block.init("Block 1".toBytes).tryGet().cid, Block.init("Block 2".toBytes).tryGet().cid, @@ -24,8 +24,7 @@ suite "Manifest": Block.init("Block 5".toBytes).tryGet().cid, Block.init("Block 6".toBytes).tryGet().cid, Block.init("Block 7".toBytes).tryGet().cid, - ]): - fail() + ]).tryGet() let checksum = @[18.byte, 32, 227, 176, 196, 66, 152, @@ -47,7 +46,7 @@ suite "Manifest": ) var - blocksManifest = Manifest.init(blocks).tryGet() + blocksManifest = Manifest.new(blocks).tryGet() let e = blocksManifest.encode().tryGet() diff --git a/tests/dagger/testnode.nim b/tests/dagger/testnode.nim index 557549fa..3536d9e3 100644 --- a/tests/dagger/testnode.nim +++ b/tests/dagger/testnode.nim @@ -56,7 +56,7 @@ suite "Test Node": storeFut = node.store(stream) var - manifest = Manifest.init().tryGet() + manifest = Manifest.new().tryGet() try: while ( @@ -84,7 +84,7 @@ suite "Test Node": test "Retrieve Data Stream": var - manifest = Manifest.init().tryGet() + manifest = Manifest.new().tryGet() original: seq[byte] while ( diff --git a/tests/dagger/teststorestream.nim b/tests/dagger/teststorestream.nim new file mode 100644 index 00000000..12636522 --- /dev/null +++ b/tests/dagger/teststorestream.nim @@ -0,0 +1,97 @@ +import pkg/chronos +import pkg/asynctest 
+import pkg/libp2p
+import pkg/questionable/results
+
+import ./helpers
+
+import pkg/dagger/storestream
+import pkg/dagger/stores
+import pkg/dagger/manifest
+import pkg/dagger/chunker
+import pkg/dagger/rng
+
+suite "StoreStream":  # reads a 100-byte data set (10 blocks x 10 bytes) back through StoreStream
+  var
+    manifest: Manifest
+    store: BlockStore
+    stream: StoreStream
+
+  let
+    data = [
+      [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
+      [byte 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
+      [byte 20, 21, 22, 23, 24, 25, 26, 27, 28, 29],
+      [byte 30, 31, 32, 33, 34, 35, 36, 37, 38, 39],
+      [byte 40, 41, 42, 43, 44, 45, 46, 47, 48, 49],
+      [byte 50, 51, 52, 53, 54, 55, 56, 57, 58, 59],
+      [byte 60, 61, 62, 63, 64, 65, 66, 67, 68, 69],
+      [byte 70, 71, 72, 73, 74, 75, 76, 77, 78, 79],
+      [byte 80, 81, 82, 83, 84, 85, 86, 87, 88, 89],
+      [byte 90, 91, 92, 93, 94, 95, 96, 97, 98, 99],
+    ]
+
+  setup:  # rebuilds store, manifest and stream, then stores one block per row of `data`
+    store = CacheStore.new()
+    manifest = Manifest.new(blockSize = 10).tryGet()  # blockSize equals the row length of `data`
+    stream = StoreStream.init(store, manifest)  # manifest is still empty here; Manifest is a ref object, so the stream sees the cids added below
+
+    for d in data:
+      let
+        blk = Block.init(d).tryGet()
+
+      manifest.add(blk.cid)
+      if not (await store.putBlock(blk)):
+        raise newException(CatchableError, "Unable to store block " & $blk.cid)
+
+  test "Read all blocks < blockSize":
+    var
+      buf = newSeq[byte](8)  # 100 bytes total: twelve 8-byte reads (96 bytes), then a 4-byte tail
+
+    while not stream.atEof:
+      let
+        read = (await stream.readOnce(addr buf[0], buf.len))
+
+      if stream.atEof.not:
+        check read == 8
+      else:  # atEof is already true right after the final read
+        check read == 4  # 100 - 12 * 8
+
+  test "Read all blocks == blockSize":
+    var
+      buf = newSeq[byte](10)  # buffer matches the block size, so every read returns one whole block
+
+    while not stream.atEof:
+      let
+        read = (await stream.readOnce(addr buf[0], buf.len))
+
+      check read == 10
+
+  test "Read all blocks > blockSize":
+    var
+      buf = newSeq[byte](11)  # each read spans two blocks: nine 11-byte reads (99 bytes), then a 1-byte tail
+
+    while not stream.atEof:
+      let
+        read = (await stream.readOnce(addr buf[0], buf.len))
+
+      if stream.atEof.not:
+        check read == 11
+      else:  # atEof is already true right after the final read
+        check read == 1  # 100 - 9 * 11
+
+  test "Read exact bytes within block boundary":
+    var
+      buf = newSeq[byte](5)  # 5 bytes fit inside the first 10-byte block
+
+    await stream.readExactly(addr buf[0], 5)
+
+    check buf == [byte 0, 1, 2, 3, 4]
+
+  test "Read exact bytes outside of block boundary":
+    var
+      buf = newSeq[byte](15)  # 15 bytes need all of block 0 and the first half of block 1
+
+    await stream.readExactly(addr buf[0], 15)
+
+    check buf == [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
diff --git a/tests/testAll.nim b/tests/testAll.nim
index f9e7167e..37914987 100644
--- a/tests/testAll.nim
+++ b/tests/testAll.nim
@@ -4,5 +4,6 @@ import ./dagger/testasyncheapqueue
 import ./dagger/testchunking
 import ./dagger/testmanifest
 import ./dagger/testnode
+import ./dagger/teststorestream
 
 {.warning[UnusedImport]: off.}