add storestream and tests

This commit is contained in:
Dmitriy Ryajov 2022-03-16 15:48:10 -06:00
parent cc2cf7c14d
commit b1264a9c87
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
2 changed files with 192 additions and 0 deletions

95
dagger/storestream.nim Normal file
View File

@ -0,0 +1,95 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/libp2p
import pkg/chronos
import pkg/chronicles
import pkg/stew/ptrops
import ./stores
import ./manifest
import ./blocktype
export stores
logScope:
topics = "dagger storestream"
type
ReadPattern* {.pure.} = enum
Sequential,
Grid
StoreStream* = ref object of LPStream
store*: BlockStore
manifest*: Manifest
pattern*: ReadPattern
offset*: int
proc init*(
T: type StoreStream,
store: BlockStore,
manifest: Manifest,
pattern = ReadPattern.Sequential): T =
result = T(
store: store,
manifest: manifest,
pattern: pattern,
offset: 0)
result.initStream()
method readOnce*(
self: StoreStream,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
if self.atEof:
raise newLPStreamEOFError()
var
read = 0
while read < nbytes and self.atEof.not:
let
pos = self.offset div self.manifest.blockSize
let
blk = (await self.store.getBlock(self.manifest[pos])).tryGet()
blockOffset = if self.offset >= self.manifest.blockSize:
self.offset mod self.manifest.blockSize
else:
self.offset
readBytes = if (nbytes - read) >= (self.manifest.blockSize - blockOffset):
self.manifest.blockSize - blockOffset
else:
min(nbytes - read, self.manifest.blockSize)
copyMem(pbytes.offset(read), unsafeAddr blk.data[blockOffset], readBytes)
self.offset += readBytes
read += readBytes
return read
method atEof*(self: StoreStream): bool =
self.offset >= self.manifest.len * self.manifest.blockSize
method closeImpl*(self: StoreStream) {.async.} =
try:
trace "Closing StoreStream", self
self.offset = self.manifest.len * self.manifest.blockSize # set Eof
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "Error closing StoreStream", s, msg = exc.msg
await procCall LPStream(self).closeImpl()

View File

@ -0,0 +1,97 @@
import pkg/chronos
import pkg/asynctest
import pkg/libp2p
import pkg/questionable/results
import ./helpers
import pkg/dagger/storestream
import pkg/dagger/stores
import pkg/dagger/manifest
import pkg/dagger/chunker
import pkg/dagger/rng
suite "StoreStream":
var
manifest: Manifest
store: BlockStore
stream: StoreStream
let
data = [
[byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
[byte 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
[byte 20, 21, 22, 23, 24, 25, 26, 27, 28, 29],
[byte 30, 31, 32, 33, 34, 35, 36, 37, 38, 39],
[byte 40, 41, 42, 43, 44, 45, 46, 47, 48, 49],
[byte 50, 51, 52, 53, 54, 55, 56, 57, 58, 59],
[byte 60, 61, 62, 63, 64, 65, 66, 67, 68, 69],
[byte 70, 71, 72, 73, 74, 75, 76, 77, 78, 79],
[byte 80, 81, 82, 83, 84, 85, 86, 87, 88, 89],
[byte 90, 91, 92, 93, 94, 95, 96, 97, 98, 99],
]
setup:
store = CacheStore.new()
manifest = Manifest.new(blockSize = 10).tryGet()
stream = StoreStream.init(store, manifest)
for d in data:
let
blk = Block.init(d).tryGet()
manifest.add(blk.cid)
if not (await store.putBlock(blk)):
raise newException(CatchableError, "Unable to store block " & $blk.cid)
test "Read all blocks < blockSize":
var
buf = newSeq[byte](8)
while not stream.atEof:
let
read = (await stream.readOnce(addr buf[0], buf.len))
if stream.atEof.not:
check read == 8
else:
check read == 4
test "Read all blocks == blockSize":
var
buf = newSeq[byte](10)
while not stream.atEof:
let
read = (await stream.readOnce(addr buf[0], buf.len))
check read == 10
test "Read all blocks > blockSize":
var
buf = newSeq[byte](11)
while not stream.atEof:
let
read = (await stream.readOnce(addr buf[0], buf.len))
if stream.atEof.not:
check read == 11
else:
check read == 1
test "Read exact bytes within block boundary":
var
buf = newSeq[byte](5)
await stream.readExactly(addr buf[0], 5)
check buf == [byte 0, 1, 2, 3, 4]
test "Read exact bytes outside of block boundary":
var
buf = newSeq[byte](15)
await stream.readExactly(addr buf[0], 15)
check buf == [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]