adding a "seekable" base stream

This commit is contained in:
Dmitriy Ryajov 2022-03-18 12:18:34 -06:00
parent 1b601ccd1b
commit ac01ff66b2
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
5 changed files with 45 additions and 59 deletions

View File

@ -1,46 +0,0 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/libp2p
import pkg/questionable
template EmptyDigests*: untyped =
var
emptyDigests {.global, threadvar.}:
array[CIDv0..CIDv1, Table[MultiCodec, MultiHash]]
once:
emptyDigests = [
CIDv0: {
multiCodec("sha2-256"): Cid
.init("bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku")
.get()
.mhash
.get()
}.toTable,
CIDv1: {
multiCodec("sha2-256"): Cid
.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
.get()
.mhash
.get()
}.toTable,
]
emptyDigests
type
Manifest* = object of RootObj
rootHash*: ?Cid
blocks*: seq[Cid]
version*: CidVersion
hcodec*: MultiCodec
codec*: MultiCodec

4
dagger/streams.nim Normal file
View File

@ -0,0 +1,4 @@
import ./streams/seekablestream
import ./streams/storestream
export seekablestream, storestream

View File

@ -0,0 +1,27 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/libp2p
import pkg/chronos
import pkg/chronicles
export libp2p, chronos, chronicles
logScope:
topics = "dagger seekablestream"
type
SeekableStream* = ref object of LPStream
offset*: int
method size*(self: SeekableStream): int {.base.} =
raiseAssert("method unimplemented")
proc setPos*(self: SeekableStream, pos: int) =
self.offset = pos

View File

@ -7,18 +7,22 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
{.push raises: [Defect].}
import pkg/upraises
push: {.upraises: [].}
import pkg/libp2p
import pkg/chronos
import pkg/chronicles
import pkg/stew/ptrops
import ./stores
import ./manifest
import ./blocktype
import ../stores
import ../manifest
import ../blocktype
export stores
import ./seekablestream
export stores, blocktype, manifest, chronos
logScope:
topics = "dagger storestream"
@ -28,11 +32,10 @@ type
Sequential,
Grid
StoreStream* = ref object of LPStream
StoreStream* = ref object of SeekableStream
store*: BlockStore
manifest*: Manifest
pattern*: ReadPattern
offset*: int
proc init*(
T: type StoreStream,
@ -47,6 +50,9 @@ proc init*(
result.initStream()
method size*(self: StoreStream): int =
self.manifest.len * self.manifest.blockSize
method readOnce*(
self: StoreStream,
pbytes: pointer,
@ -61,8 +67,6 @@ method readOnce*(
while read < nbytes and self.atEof.not:
let
pos = self.offset div self.manifest.blockSize
let
blk = (await self.store.getBlock(self.manifest[pos])).tryGet()
blockOffset = if self.offset >= self.manifest.blockSize:
self.offset mod self.manifest.blockSize

View File

@ -5,10 +5,9 @@ import pkg/questionable/results
import ./helpers
import pkg/dagger/storestream
import pkg/dagger/streams
import pkg/dagger/stores
import pkg/dagger/manifest
import pkg/dagger/chunker
import pkg/dagger/rng
suite "StoreStream":
@ -85,7 +84,6 @@ suite "StoreStream":
buf = newSeq[byte](5)
await stream.readExactly(addr buf[0], 5)
check buf == [byte 0, 1, 2, 3, 4]
test "Read exact bytes outside of block boundary":
@ -93,5 +91,4 @@ suite "StoreStream":
buf = newSeq[byte](15)
await stream.readExactly(addr buf[0], 15)
check buf == [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]