Seekable stream (#58)

* cleanup manifest

* adding a "seekable" base stream
This commit is contained in:
Dmitriy Ryajov 2022-03-21 12:09:59 -06:00 committed by GitHub
parent 2c4b1e6906
commit e965f5e0de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 13 deletions

4
dagger/streams.nim Normal file
View File

@ -0,0 +1,4 @@
import ./streams/seekablestream
import ./streams/storestream
export seekablestream, storestream

View File

@ -0,0 +1,27 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/libp2p
import pkg/chronos
import pkg/chronicles
export libp2p, chronos, chronicles
logScope:
topics = "dagger seekablestream"
type
SeekableStream* = ref object of LPStream
offset*: int
method size*(self: SeekableStream): int {.base.} =
raiseAssert("method unimplemented")
proc setPos*(self: SeekableStream, pos: int) =
self.offset = pos

View File

@ -7,18 +7,22 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
{.push raises: [Defect].} import pkg/upraises
push: {.upraises: [].}
import pkg/libp2p import pkg/libp2p
import pkg/chronos import pkg/chronos
import pkg/chronicles import pkg/chronicles
import pkg/stew/ptrops import pkg/stew/ptrops
import ./stores import ../stores
import ./manifest import ../manifest
import ./blocktype import ../blocktype
export stores import ./seekablestream
export stores, blocktype, manifest, chronos
logScope: logScope:
topics = "dagger storestream" topics = "dagger storestream"
@ -28,11 +32,10 @@ type
Sequential, Sequential,
Grid Grid
StoreStream* = ref object of LPStream StoreStream* = ref object of SeekableStream
store*: BlockStore store*: BlockStore
manifest*: Manifest manifest*: Manifest
pattern*: ReadPattern pattern*: ReadPattern
offset*: int
proc init*( proc init*(
T: type StoreStream, T: type StoreStream,
@ -47,6 +50,9 @@ proc init*(
result.initStream() result.initStream()
method size*(self: StoreStream): int =
self.manifest.len * self.manifest.blockSize
method readOnce*( method readOnce*(
self: StoreStream, self: StoreStream,
pbytes: pointer, pbytes: pointer,
@ -61,8 +67,6 @@ method readOnce*(
while read < nbytes and self.atEof.not: while read < nbytes and self.atEof.not:
let let
pos = self.offset div self.manifest.blockSize pos = self.offset div self.manifest.blockSize
let
blk = (await self.store.getBlock(self.manifest[pos])).tryGet() blk = (await self.store.getBlock(self.manifest[pos])).tryGet()
blockOffset = if self.offset >= self.manifest.blockSize: blockOffset = if self.offset >= self.manifest.blockSize:
self.offset mod self.manifest.blockSize self.offset mod self.manifest.blockSize

View File

@ -5,10 +5,9 @@ import pkg/questionable/results
import ./helpers import ./helpers
import pkg/dagger/storestream import pkg/dagger/streams
import pkg/dagger/stores import pkg/dagger/stores
import pkg/dagger/manifest import pkg/dagger/manifest
import pkg/dagger/chunker
import pkg/dagger/rng import pkg/dagger/rng
suite "StoreStream": suite "StoreStream":
@ -85,7 +84,6 @@ suite "StoreStream":
buf = newSeq[byte](5) buf = newSeq[byte](5)
await stream.readExactly(addr buf[0], 5) await stream.readExactly(addr buf[0], 5)
check buf == [byte 0, 1, 2, 3, 4] check buf == [byte 0, 1, 2, 3, 4]
test "Read exact bytes outside of block boundary": test "Read exact bytes outside of block boundary":
@ -93,5 +91,4 @@ suite "StoreStream":
buf = newSeq[byte](15) buf = newSeq[byte](15)
await stream.readExactly(addr buf[0], 15) await stream.readExactly(addr buf[0], 15)
check buf == [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14] check buf == [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]