From b1264a9c8712e3439bd47f23fcc16d1b521c5797 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 16 Mar 2022 15:48:10 -0600 Subject: [PATCH] add storestream and tests --- dagger/storestream.nim | 95 +++++++++++++++++++++++++++++++ tests/dagger/teststorestream.nim | 97 ++++++++++++++++++++++++++++++++ 2 files changed, 192 insertions(+) create mode 100644 dagger/storestream.nim create mode 100644 tests/dagger/teststorestream.nim diff --git a/dagger/storestream.nim b/dagger/storestream.nim new file mode 100644 index 00000000..82e1a062 --- /dev/null +++ b/dagger/storestream.nim @@ -0,0 +1,95 @@ +## Nim-Dagger +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import pkg/libp2p +import pkg/chronos +import pkg/chronicles +import pkg/stew/ptrops + +import ./stores +import ./manifest +import ./blocktype + +export stores + +logScope: + topics = "dagger storestream" + +type + ReadPattern* {.pure.} = enum + Sequential, + Grid + + StoreStream* = ref object of LPStream + store*: BlockStore + manifest*: Manifest + pattern*: ReadPattern + offset*: int + +proc init*( + T: type StoreStream, + store: BlockStore, + manifest: Manifest, + pattern = ReadPattern.Sequential): T = + result = T( + store: store, + manifest: manifest, + pattern: pattern, + offset: 0) + + result.initStream() + +method readOnce*( + self: StoreStream, + pbytes: pointer, + nbytes: int): Future[int] {.async.} = + + if self.atEof: + raise newLPStreamEOFError() + + var + read = 0 + + while read < nbytes and self.atEof.not: + let + pos = self.offset div self.manifest.blockSize + + let + blk = (await self.store.getBlock(self.manifest[pos])).tryGet() + blockOffset = if self.offset >= self.manifest.blockSize: + self.offset mod self.manifest.blockSize + else: + self.offset + + readBytes = if (nbytes - read) >= (self.manifest.blockSize - blockOffset): + self.manifest.blockSize - blockOffset + else: + min(nbytes - read, self.manifest.blockSize) + + copyMem(pbytes.offset(read), unsafeAddr blk.data[blockOffset], readBytes) + self.offset += readBytes + read += readBytes + + return read + +method atEof*(self: StoreStream): bool = + self.offset >= self.manifest.len * self.manifest.blockSize + +method closeImpl*(self: StoreStream) {.async.} = + try: + trace "Closing StoreStream", self + self.offset = self.manifest.len * self.manifest.blockSize # set Eof + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "Error closing StoreStream", s, msg = exc.msg + + await procCall LPStream(self).closeImpl() diff --git a/tests/dagger/teststorestream.nim b/tests/dagger/teststorestream.nim new file mode 100644 index 00000000..12636522 --- /dev/null +++ b/tests/dagger/teststorestream.nim @@ -0,0 +1,97 @@ +import pkg/chronos +import pkg/asynctest +import pkg/libp2p +import pkg/questionable/results + +import ./helpers + +import pkg/dagger/storestream +import pkg/dagger/stores +import pkg/dagger/manifest +import pkg/dagger/chunker +import pkg/dagger/rng + +suite "StoreStream": + var + manifest: Manifest + store: BlockStore + stream: StoreStream + + let + data = [ + [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + [byte 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], + [byte 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], + [byte 30, 31, 32, 33, 34, 35, 36, 37, 38, 39], + [byte 40, 41, 42, 43, 44, 45, 46, 47, 48, 49], + [byte 50, 51, 52, 53, 54, 55, 56, 57, 58, 59], + [byte 60, 61, 62, 63, 64, 65, 66, 67, 68, 69], + [byte 70, 71, 72, 73, 74, 75, 76, 77, 78, 79], + [byte 80, 81, 82, 83, 84, 85, 86, 87, 88, 89], + [byte 90, 91, 92, 93, 94, 95, 96, 97, 98, 99], + ] + + setup: + store = CacheStore.new() + manifest = Manifest.new(blockSize = 10).tryGet() + stream = StoreStream.init(store, manifest) + + for d in data: + let + blk = Block.init(d).tryGet() + + manifest.add(blk.cid) + if not (await store.putBlock(blk)): + raise newException(CatchableError, "Unable to store block " & $blk.cid) + + test "Read all blocks < blockSize": + var + buf = newSeq[byte](8) + + while not stream.atEof: + let + read = (await stream.readOnce(addr buf[0], buf.len)) + + if stream.atEof.not: + check read == 8 + else: + check read == 4 + + test "Read all blocks == blockSize": + var + buf = newSeq[byte](10) + + while not stream.atEof: + let + read = (await stream.readOnce(addr buf[0], buf.len)) + + check read == 10 + + test "Read all blocks > blockSize": + var + buf = newSeq[byte](11) + + while not stream.atEof: + let + read = (await stream.readOnce(addr buf[0], buf.len)) + + if stream.atEof.not: + check read == 11 + else: + check read == 1 + + test "Read exact bytes within block boundary": + var + buf = newSeq[byte](5) + + await stream.readExactly(addr buf[0], 5) + + check buf == [byte 0, 1, 2, 3, 4] + + test "Read exact bytes outside of block boundary": + var + buf = newSeq[byte](15) + + await stream.readExactly(addr buf[0], 15) + + check buf == [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]