From 28ebb16767cbf43dce2738fd4d9cfdf210b309dc Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 14 Jun 2022 09:19:35 -0600 Subject: [PATCH] adding AsyncStreamWrapper (#110) * Adding AsyncStreamWrapper type * use asyncstreamwrapper to avoid copies * make size an operator * export asyncstreamwrapper --- codex/rest/api.nim | 27 ++---- codex/streams.nim | 3 +- codex/streams/asyncstreamwrapper.nim | 130 +++++++++++++++++++++++++++ codex/streams/seekablestream.nim | 2 +- codex/streams/storestream.nim | 6 +- 5 files changed, 143 insertions(+), 25 deletions(-) create mode 100644 codex/streams/asyncstreamwrapper.nim diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 4b6efac9..5e0df30d 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -30,6 +30,7 @@ import ../node import ../blocktype import ../conf import ../contracts +import ../streams import ./json @@ -234,38 +235,20 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = let reader = bodyReader.get() - stream = BufferStream.new() - storeFut = node.store(stream) - var bytes = 0 try: - while not reader.atEof: - var - buff = newSeqUninitialized[byte](BlockSize) - len = await reader.readOnce(addr buff[0], buff.len) - - buff.setLen(len) - if len <= 0: - break - - trace "Got chunk from endpoint", len = buff.len - await stream.pushData(buff) - bytes += len - - await stream.pushEof() - without cid =? (await storeFut), error: + without cid =? ( + await node.store(AsyncStreamWrapper.new(reader = AsyncStreamReader(reader)))), error: + trace "Error uploading file", exc = error.msg return RestApiResponse.error(Http500, error.msg) - trace "Uploaded file", bytes, cid = $cid + trace "Uploaded file", cid = $cid return RestApiResponse.response($cid) except CancelledError as exc: - await reader.closeWait() return RestApiResponse.error(Http500) except AsyncStreamError: - await reader.closeWait() return RestApiResponse.error(Http500) finally: - await stream.close() await reader.closeWait() # if we got here something went wrong? diff --git a/codex/streams.nim b/codex/streams.nim index e110adbd..0c3f573a 100644 --- a/codex/streams.nim +++ b/codex/streams.nim @@ -1,4 +1,5 @@ import ./streams/seekablestream import ./streams/storestream +import ./streams/asyncstreamwrapper -export seekablestream, storestream +export seekablestream, storestream, asyncstreamwrapper diff --git a/codex/streams/asyncstreamwrapper.nim b/codex/streams/asyncstreamwrapper.nim new file mode 100644 index 00000000..81406bc3 --- /dev/null +++ b/codex/streams/asyncstreamwrapper.nim @@ -0,0 +1,130 @@ +## Nim-LibP2P +## Copyright (c) 2019 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [Defect].} + +import pkg/chronos +import pkg/chronicles +import pkg/libp2p + +logScope: + topics = "libp2p asyncstreamwrapper" + +const + AsyncStreamWrapperName* = "AsyncStreamWrapper" + +type + AsyncStreamWrapper* = ref object of LPStream + reader*: AsyncStreamReader + writer*: AsyncStreamWriter + +method initStream*(self: AsyncStreamWrapper) = + if self.objName.len == 0: + self.objName = AsyncStreamWrapperName + + procCall LPStream(self).initStream() + +proc new*( + C: type AsyncStreamWrapper, + reader: AsyncStreamReader = nil, + writer: AsyncStreamWriter = nil): AsyncStreamWrapper = + let + stream = C(reader: reader, writer: writer) + + stream.initStream() + return stream + +template withExceptions(body: untyped) = + try: + body + except CancelledError as exc: + raise exc + except AsyncStreamIncompleteError: + # for all intents and purposes this is an EOF + raise newLPStreamIncompleteError() + except AsyncStreamLimitError: + raise newLPStreamLimitError() + except AsyncStreamUseClosedError: + raise newLPStreamEOFError() + except AsyncStreamError as exc: + raise newException(LPStreamError, exc.msg) + +method readOnce*( + self: AsyncStreamWrapper, + pbytes: pointer, + nbytes: int): Future[int] {.async.} = + + trace "Reading bytes from reader", bytes = nbytes + if isNil(self.reader): + error "Async stream wrapper reader nil" + raiseAssert("Async stream wrapper reader nil") + + if self.atEof: + raise newLPStreamEOFError() + + withExceptions: + return await self.reader.readOnce(pbytes, nbytes) + +proc completeWrite( + self: AsyncStreamWrapper, + fut: Future[void], + msgLen: int): Future[void] {.async.} = + + withExceptions: + await fut + +method write*(self: AsyncStreamWrapper, msg: seq[byte]): Future[void] = + # Avoid a copy of msg being kept in the closure created by `{.async.}` as this + # drives up memory usage + + trace "Writing bytes to writer", bytes = msg.len + if isNil(self.writer): + error "Async stream wrapper writer nil" + raiseAssert("Async stream wrapper writer nil") + + if self.closed: + let fut = newFuture[void]("asyncstreamwrapper.write.closed") + fut.fail(newLPStreamClosedError()) + return fut + + self.completeWrite(self.writer.write(msg, msg.len), msg.len) + +method closed*(self: AsyncStreamWrapper): bool = + var + readerClosed = true + writerClosed = true + + if not isNil(self.reader): + readerClosed = self.reader.closed + + if not isNil(self.writer): + writerClosed = self.writer.closed + + return readerClosed and writerClosed + +method atEof*(self: AsyncStreamWrapper): bool = + self.reader.atEof() + +method closeImpl*(self: AsyncStreamWrapper) {.async.} = + try: + trace "Shutting down async chronos stream" + if not self.closed(): + if not isNil(self.reader) and not self.reader.closed(): + await self.reader.closeWait() + + if not isNil(self.writer) and not self.writer.closed(): + await self.writer.closeWait() + + trace "Shutdown async chronos stream" + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "Error closing async chronos stream", msg = exc.msg + + await procCall LPStream(self).closeImpl() diff --git a/codex/streams/seekablestream.nim b/codex/streams/seekablestream.nim index ab5ae0fd..54c13380 100644 --- a/codex/streams/seekablestream.nim +++ b/codex/streams/seekablestream.nim @@ -20,7 +20,7 @@ type SeekableStream* = ref object of LPStream offset*: int -method size*(self: SeekableStream): int {.base.} = +method `size`*(self: SeekableStream): int {.base.} = raiseAssert("method unimplemented") proc setPos*(self: SeekableStream, pos: int) = diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index cece19a5..83958af9 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -45,9 +45,13 @@ proc new*( result.initStream() -method size*(self: StoreStream): int = +method `size`*(self: StoreStream): int = self.manifest.len * self.manifest.blockSize +proc `size=`*(self: StoreStream, size: int) + {.error: "Setting the size is forbidden".} = + discard + method readOnce*( self: StoreStream, pbytes: pointer,