diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 39934eb22..f0a7bea42 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -30,10 +30,13 @@ ## will suspend until either the amount of elements in the ## buffer goes below ``maxSize`` or more data becomes available. -import deques, math, oids +import deques, math import chronos, chronicles, metrics import ../stream/lpstream +when chronicles.enabledLogLevel == LogLevel.TRACE: + import oids + export lpstream logScope: @@ -93,10 +96,10 @@ type AlreadyPipedError* = object of CatchableError NotWritableError* = object of CatchableError -proc newAlreadyPipedError*(): ref Exception {.inline.} = +proc newAlreadyPipedError*(): ref CatchableError {.inline.} = result = newException(AlreadyPipedError, "stream already piped") -proc newNotWritableError*(): ref Exception {.inline.} = +proc newNotWritableError*(): ref CatchableError {.inline.} = result = newException(NotWritableError, "stream is not writable") proc requestReadBytes(s: BufferStream): Future[void] = @@ -104,19 +107,25 @@ proc requestReadBytes(s: BufferStream): Future[void] = ## data becomes available in the read buffer result = newFuture[void]() s.readReqs.addLast(result) - trace "requestReadBytes(): added a future to readReqs", oid = s.oid + # trace "requestReadBytes(): added a future to readReqs", oid = s.oid + +method initStream(s: BufferStream) = + procCall LPStream(s).initStream() + + inc getBufferStreamTracker().opened + libp2p_open_bufferstream.inc() proc initBufferStream*(s: BufferStream, handler: WriteHandler = nil, size: int = DefaultBufferSize) = + s.initStream() + s.maxSize = if isPowerOfTwo(size): size else: nextPowerOfTwo(size) s.readBuf = initDeque[byte](s.maxSize) s.readReqs = initDeque[Future[void]]() s.dataReadEvent = newAsyncEvent() s.lock = newAsyncLock() s.writeLock = newAsyncLock() - s.closeEvent = newAsyncEvent() - s.isClosed = false if not(isNil(handler)): s.writeHandler = proc (data: seq[byte]) {.async, gcsafe.} = @@ -132,12 +141,7 @@ proc initBufferStream*(s: BufferStream, finally: s.writeLock.release() - when chronicles.enabledLogLevel == LogLevel.TRACE: - s.oid = genOid() - trace "created bufferstream", oid = s.oid - inc getBufferStreamTracker().opened - libp2p_open_bufferstream.inc() proc newBufferStream*(handler: WriteHandler = nil, size: int = DefaultBufferSize): BufferStream = @@ -177,12 +181,12 @@ method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} = while index < data.len and s.readBuf.len < s.maxSize: s.readBuf.addLast(data[index]) inc(index) - trace "pushTo()", msg = "added " & $index & " bytes to readBuf", oid = s.oid + # trace "pushTo()", msg = "added " & $index & " bytes to readBuf", oid = s.oid # resolve the next queued read request if s.readReqs.len > 0: s.readReqs.popFirst().complete() - trace "pushTo(): completed a readReqs future", oid = s.oid + # trace "pushTo(): completed a readReqs future", oid = s.oid if index >= data.len: return @@ -208,7 +212,7 @@ method readExactly*(s: BufferStream, if s.atEof: raise newLPStreamEOFError() - trace "readExactly()", requested_bytes = nbytes, oid = s.oid + # trace "readExactly()", requested_bytes = nbytes, oid = s.oid var index = 0 if s.readBuf.len() == 0: @@ -219,7 +223,7 @@ method readExactly*(s: BufferStream, while s.readBuf.len() > 0 and index < nbytes: output[index] = s.popFirst() inc(index) - trace "readExactly()", read_bytes = index, oid = s.oid + # trace "readExactly()", read_bytes = index, oid = s.oid if index < nbytes: await s.requestReadBytes() @@ -303,20 +307,23 @@ proc `|`*(s: BufferStream, target: BufferStream): BufferStream = pipe(s, target) method close*(s: BufferStream) {.async, gcsafe.} = - ## close the stream and clear the buffer - if not s.isClosed: - trace "closing bufferstream", oid = s.oid - for r in s.readReqs: - if not(isNil(r)) and not(r.finished()): - r.fail(newLPStreamEOFError()) - s.dataReadEvent.fire() - s.readBuf.clear() - s.closeEvent.fire() - s.isClosed = true + try: + ## close the stream and clear the buffer + if not s.isClosed: + trace "closing bufferstream", oid = s.oid + for r in s.readReqs: + if not(isNil(r)) and not(r.finished()): + r.fail(newLPStreamEOFError()) + s.dataReadEvent.fire() + s.readBuf.clear() + s.closeEvent.fire() + s.isClosed = true - inc getBufferStreamTracker().closed - libp2p_open_bufferstream.dec() + inc getBufferStreamTracker().closed + libp2p_open_bufferstream.dec() - trace "bufferstream closed", oid = s.oid - else: - trace "attempt to close an already closed bufferstream", trace = getStackTrace() + trace "bufferstream closed", oid = s.oid + else: + trace "attempt to close an already closed bufferstream", trace = getStackTrace() + except CatchableError as exc: + trace "error closing buffer stream", exc = exc.msg diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index 899691c66..211c79488 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -7,11 +7,13 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import oids -import chronicles, chronos, strformat +import chronicles, chronos import ../varint, ../vbuffer +when chronicles.enabledLogLevel == LogLevel.TRACE: + import oids + type LPStream* = ref object of RootObj isClosed*: bool @@ -25,45 +27,50 @@ type LPStreamIncorrectDefect* = object of Defect LPStreamLimitError* = object of LPStreamError LPStreamReadError* = object of LPStreamError - par*: ref Exception + par*: ref CatchableError LPStreamWriteError* = object of LPStreamError - par*: ref Exception + par*: ref CatchableError LPStreamEOFError* = object of LPStreamError LPStreamClosedError* = object of LPStreamError InvalidVarintError* = object of LPStreamError MaxSizeError* = object of LPStreamError -proc newLPStreamReadError*(p: ref Exception): ref Exception = +proc newLPStreamReadError*(p: ref CatchableError): ref CatchableError = var w = newException(LPStreamReadError, "Read stream failed") w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg w.par = p result = w -proc newLPStreamReadError*(msg: string): ref Exception = +proc newLPStreamReadError*(msg: string): ref CatchableError = newException(LPStreamReadError, msg) -proc newLPStreamWriteError*(p: ref Exception): ref Exception = +proc newLPStreamWriteError*(p: ref CatchableError): ref CatchableError = var w = newException(LPStreamWriteError, "Write stream failed") w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg w.par = p result = w -proc newLPStreamIncompleteError*(): ref Exception = +proc newLPStreamIncompleteError*(): ref CatchableError = result = newException(LPStreamIncompleteError, "Incomplete data received") -proc newLPStreamLimitError*(): ref Exception = +proc newLPStreamLimitError*(): ref CatchableError = result = newException(LPStreamLimitError, "Buffer limit reached") -proc newLPStreamIncorrectDefect*(m: string): ref Exception = +proc newLPStreamIncorrectDefect*(m: string): ref Defect = result = newException(LPStreamIncorrectDefect, m) -proc newLPStreamEOFError*(): ref Exception = +proc newLPStreamEOFError*(): ref CatchableError = result = newException(LPStreamEOFError, "Stream EOF!") proc newLPStreamClosedError*(): ref Exception = result = newException(LPStreamClosedError, "Stream Closed!") +method initStream*(s: LPStream) {.base.} = + s.closeEvent = newAsyncEvent() + when chronicles.enabledLogLevel == LogLevel.TRACE: + s.oid = genOid() + method closed*(s: LPStream): bool {.base, inline.} = s.isClosed @@ -88,28 +95,25 @@ proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, de var lim = if limit <= 0: -1 else: limit var state = 0 - try: - while true: - var ch: char - await readExactly(s, addr ch, 1) + while true: + var ch: char + await readExactly(s, addr ch, 1) - if sep[state] == ch: - inc(state) - if state == len(sep): - break + if sep[state] == ch: + inc(state) + if state == len(sep): + break + else: + state = 0 + if limit > 0: + let missing = min(state, lim - len(result) - 1) + result.add(sep[0 ..< missing]) else: - state = 0 - if limit > 0: - let missing = min(state, lim - len(result) - 1) - result.add(sep[0 ..< missing]) - else: - result.add(sep[0 ..< state]) + result.add(sep[0 ..< state]) - result.add(ch) - if len(result) == lim: - break - except LPStreamIncompleteError, LPStreamReadError: - discard # EOF, in which case we should return whatever we read so far.. + result.add(ch) + if len(result) == lim: + break proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} = var