nim-codex/dagger/streams/storestream.nim
Dmitriy Ryajov d3dbbc75fa
Extract Discovery engine (#99)
* don't force logging syncs

* Add failing test

* wip discovery engine

* re-add chronicles sinks

* wip

* move network related stuff to own folder

* move peer related stuff to own folder

* extract discovery into its own engine

* update imports

* move pending blocks into engine module

* add top level exports

* update imports

* update import paths

* update imports

* support for inflight request filtering and tests

* use `remove` instead of `del`

* fix sorting in `selectCheapest`

* re-org test file structure

* fix to use discovery engine

* file re-org

* fix compilation

* fixup discovery to use async handlers

* more re-org

* rework with support for discovery engine

* add logging

* use defaults

* wip: reworking with discoveryengine

* wip: more test fixes

* more logging

* use ordered table

* use `bt` for blocktype Block

* fix tests

* make tests work with discovery engine

* expose all node components

* fix to work with discovery engine

* wip

* propagate cancellation in listBlocks

* start/stop disc engine in blockexc engine

* remove disc engine start/stop

* wire up discovery engine

* misc comments and imports

* pass discovery to dagger node

* set sleep timers

* unused imports

* misc

* don't spawn a task, await it

* don't await handlers

* trace logging

* reduce default sleep time

Co-authored-by: Tanguy <tanguy@status.im>
2022-05-18 20:29:15 -06:00

97 lines
2.2 KiB
Nim

## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises
push: {.upraises: [].}
import pkg/libp2p
import pkg/chronos
import pkg/chronicles
import pkg/stew/ptrops
import ../stores
import ../manifest
import ../blocktype
import ./seekablestream
export stores, blocktype, manifest, chronos
# Attach the "dagger storestream" topic to the chronicles log statements
# emitted from this module (the `trace` calls below).
logScope:
  topics = "dagger storestream"
type
  StoreStream* = ref object of SeekableStream
    ## Stream whose contents are read block by block from a `BlockStore`,
    ## in the order given by a `Manifest`. The read position (`offset`) is
    ## not declared here; it is an inherited field.
    store*: BlockStore    ## Store the blocks are fetched from
    manifest*: Manifest   ## Ordered block list; also supplies `blockSize`
proc new*(
    T: type StoreStream,
    store: BlockStore,
    manifest: Manifest): T =
  ## Build a `StoreStream` over `store` and `manifest`, positioned at
  ## offset 0, and run `initStream` on it before returning it.
  result = T(
    offset: 0,
    manifest: manifest,
    store: store)
  initStream(result)
method size*(self: StoreStream): int =
  ## Length of the stream: the number of entries in the manifest multiplied
  ## by the manifest's block size. Every block is counted as a full
  ## `blockSize`.
  let
    blockCount = self.manifest.len
    bytesPerBlock = self.manifest.blockSize
  return blockCount * bytesPerBlock
method readOnce*(
    self: StoreStream,
    pbytes: pointer,
    nbytes: int): Future[int] {.async.} =
  ## Copy up to `nbytes` bytes into the buffer at `pbytes`, starting at the
  ## current `offset` and crossing block boundaries as needed.
  ##
  ## Blocks are fetched one at a time from `self.store`, using the manifest
  ## entry for the block that contains the current offset. The loop stops
  ## once `nbytes` bytes were copied or the stream reaches EOF. `offset` is
  ## advanced by the number of bytes copied.
  ##
  ## Returns the number of bytes written to `pbytes`.
  ##
  ## Raises the error built by `newLPStreamEOFError()` when the stream is
  ## already at EOF on entry; `tryGet()` raises if fetching a block failed.
  ##
  ## NOTE(review): the copy assumes `blk.data` holds at least
  ## `startInBlock + chunk` bytes; nothing here checks the fetched block's
  ## actual length - confirm that stored blocks are always `blockSize` long.
  if self.atEof:
    raise newLPStreamEOFError()

  var copied = 0

  while copied < nbytes and not self.atEof:
    let
      blockIdx = self.offset div self.manifest.blockSize
      blk = (await self.store.getBlock(self.manifest[blockIdx])).tryGet()

    let
      blkSize = self.manifest.blockSize
      wanted = nbytes - copied
      # Position of the current offset inside the fetched block
      startInBlock =
        if self.offset < blkSize:
          self.offset
        else:
          self.offset mod blkSize
      leftInBlock = blkSize - startInBlock
      # Take the rest of the block, unless the caller wants less than that
      chunk =
        if wanted < leftInBlock:
          min(wanted, blkSize)
        else:
          leftInBlock

    copyMem(pbytes.offset(copied), unsafeAddr blk.data[startInBlock], chunk)
    self.offset += chunk
    copied += chunk

  return copied
method atEof*(self: StoreStream): bool =
  ## True once `offset` has reached or passed the end of the stream, i.e.
  ## the manifest's block count multiplied by its block size.
  let endOffset = self.manifest.len * self.manifest.blockSize
  not (self.offset < endOffset)
method closeImpl*(self: StoreStream) {.async.} =
  ## Close the stream: move `offset` to the end so that `atEof` reports
  ## true, then delegate to the `LPStream` implementation of `closeImpl`.
  ##
  ## A `CancelledError` is re-raised; any other `CatchableError` from the
  ## first step is only logged, and the parent `closeImpl` still runs.
  try:
    trace "Closing StoreStream"
    let eofOffset = self.manifest.len * self.manifest.blockSize
    self.offset = eofOffset # set Eof
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as err:
    trace "Error closing StoreStream", msg = err.msg

  await procCall LPStream(self).closeImpl()