initStream method and better exceptions handling

This commit is contained in:
Dmitriy Ryajov 2020-05-23 11:08:39 -06:00
parent d3b79b002e
commit e3f8f53620
2 changed files with 71 additions and 60 deletions

View File

@ -30,10 +30,13 @@
## will suspend until either the amount of elements in the ## will suspend until either the amount of elements in the
## buffer goes below ``maxSize`` or more data becomes available. ## buffer goes below ``maxSize`` or more data becomes available.
import deques, math, oids import deques, math
import chronos, chronicles, metrics import chronos, chronicles, metrics
import ../stream/lpstream import ../stream/lpstream
when chronicles.enabledLogLevel == LogLevel.TRACE:
import oids
export lpstream export lpstream
logScope: logScope:
@ -93,10 +96,10 @@ type
AlreadyPipedError* = object of CatchableError AlreadyPipedError* = object of CatchableError
NotWritableError* = object of CatchableError NotWritableError* = object of CatchableError
proc newAlreadyPipedError*(): ref Exception {.inline.} = proc newAlreadyPipedError*(): ref CatchableError {.inline.} =
result = newException(AlreadyPipedError, "stream already piped") result = newException(AlreadyPipedError, "stream already piped")
proc newNotWritableError*(): ref Exception {.inline.} = proc newNotWritableError*(): ref CatchableError {.inline.} =
result = newException(NotWritableError, "stream is not writable") result = newException(NotWritableError, "stream is not writable")
proc requestReadBytes(s: BufferStream): Future[void] = proc requestReadBytes(s: BufferStream): Future[void] =
@ -104,19 +107,25 @@ proc requestReadBytes(s: BufferStream): Future[void] =
## data becomes available in the read buffer ## data becomes available in the read buffer
result = newFuture[void]() result = newFuture[void]()
s.readReqs.addLast(result) s.readReqs.addLast(result)
trace "requestReadBytes(): added a future to readReqs", oid = s.oid # trace "requestReadBytes(): added a future to readReqs", oid = s.oid
method initStream(s: BufferStream) =
procCall LPStream(s).initStream()
inc getBufferStreamTracker().opened
libp2p_open_bufferstream.inc()
proc initBufferStream*(s: BufferStream, proc initBufferStream*(s: BufferStream,
handler: WriteHandler = nil, handler: WriteHandler = nil,
size: int = DefaultBufferSize) = size: int = DefaultBufferSize) =
s.initStream()
s.maxSize = if isPowerOfTwo(size): size else: nextPowerOfTwo(size) s.maxSize = if isPowerOfTwo(size): size else: nextPowerOfTwo(size)
s.readBuf = initDeque[byte](s.maxSize) s.readBuf = initDeque[byte](s.maxSize)
s.readReqs = initDeque[Future[void]]() s.readReqs = initDeque[Future[void]]()
s.dataReadEvent = newAsyncEvent() s.dataReadEvent = newAsyncEvent()
s.lock = newAsyncLock() s.lock = newAsyncLock()
s.writeLock = newAsyncLock() s.writeLock = newAsyncLock()
s.closeEvent = newAsyncEvent()
s.isClosed = false
if not(isNil(handler)): if not(isNil(handler)):
s.writeHandler = proc (data: seq[byte]) {.async, gcsafe.} = s.writeHandler = proc (data: seq[byte]) {.async, gcsafe.} =
@ -132,12 +141,7 @@ proc initBufferStream*(s: BufferStream,
finally: finally:
s.writeLock.release() s.writeLock.release()
when chronicles.enabledLogLevel == LogLevel.TRACE:
s.oid = genOid()
trace "created bufferstream", oid = s.oid trace "created bufferstream", oid = s.oid
inc getBufferStreamTracker().opened
libp2p_open_bufferstream.inc()
proc newBufferStream*(handler: WriteHandler = nil, proc newBufferStream*(handler: WriteHandler = nil,
size: int = DefaultBufferSize): BufferStream = size: int = DefaultBufferSize): BufferStream =
@ -177,12 +181,12 @@ method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} =
while index < data.len and s.readBuf.len < s.maxSize: while index < data.len and s.readBuf.len < s.maxSize:
s.readBuf.addLast(data[index]) s.readBuf.addLast(data[index])
inc(index) inc(index)
trace "pushTo()", msg = "added " & $index & " bytes to readBuf", oid = s.oid # trace "pushTo()", msg = "added " & $index & " bytes to readBuf", oid = s.oid
# resolve the next queued read request # resolve the next queued read request
if s.readReqs.len > 0: if s.readReqs.len > 0:
s.readReqs.popFirst().complete() s.readReqs.popFirst().complete()
trace "pushTo(): completed a readReqs future", oid = s.oid # trace "pushTo(): completed a readReqs future", oid = s.oid
if index >= data.len: if index >= data.len:
return return
@ -208,7 +212,7 @@ method readExactly*(s: BufferStream,
if s.atEof: if s.atEof:
raise newLPStreamEOFError() raise newLPStreamEOFError()
trace "readExactly()", requested_bytes = nbytes, oid = s.oid # trace "readExactly()", requested_bytes = nbytes, oid = s.oid
var index = 0 var index = 0
if s.readBuf.len() == 0: if s.readBuf.len() == 0:
@ -219,7 +223,7 @@ method readExactly*(s: BufferStream,
while s.readBuf.len() > 0 and index < nbytes: while s.readBuf.len() > 0 and index < nbytes:
output[index] = s.popFirst() output[index] = s.popFirst()
inc(index) inc(index)
trace "readExactly()", read_bytes = index, oid = s.oid # trace "readExactly()", read_bytes = index, oid = s.oid
if index < nbytes: if index < nbytes:
await s.requestReadBytes() await s.requestReadBytes()
@ -303,6 +307,7 @@ proc `|`*(s: BufferStream, target: BufferStream): BufferStream =
pipe(s, target) pipe(s, target)
method close*(s: BufferStream) {.async, gcsafe.} = method close*(s: BufferStream) {.async, gcsafe.} =
try:
## close the stream and clear the buffer ## close the stream and clear the buffer
if not s.isClosed: if not s.isClosed:
trace "closing bufferstream", oid = s.oid trace "closing bufferstream", oid = s.oid
@ -320,3 +325,5 @@ method close*(s: BufferStream) {.async, gcsafe.} =
trace "bufferstream closed", oid = s.oid trace "bufferstream closed", oid = s.oid
else: else:
trace "attempt to close an already closed bufferstream", trace = getStackTrace() trace "attempt to close an already closed bufferstream", trace = getStackTrace()
except CatchableError as exc:
trace "error closing buffer stream", exc = exc.msg

View File

@ -7,11 +7,13 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
import oids import chronicles, chronos
import chronicles, chronos, strformat
import ../varint, import ../varint,
../vbuffer ../vbuffer
when chronicles.enabledLogLevel == LogLevel.TRACE:
import oids
type type
LPStream* = ref object of RootObj LPStream* = ref object of RootObj
isClosed*: bool isClosed*: bool
@ -25,45 +27,50 @@ type
LPStreamIncorrectDefect* = object of Defect LPStreamIncorrectDefect* = object of Defect
LPStreamLimitError* = object of LPStreamError LPStreamLimitError* = object of LPStreamError
LPStreamReadError* = object of LPStreamError LPStreamReadError* = object of LPStreamError
par*: ref Exception par*: ref CatchableError
LPStreamWriteError* = object of LPStreamError LPStreamWriteError* = object of LPStreamError
par*: ref Exception par*: ref CatchableError
LPStreamEOFError* = object of LPStreamError LPStreamEOFError* = object of LPStreamError
LPStreamClosedError* = object of LPStreamError LPStreamClosedError* = object of LPStreamError
InvalidVarintError* = object of LPStreamError InvalidVarintError* = object of LPStreamError
MaxSizeError* = object of LPStreamError MaxSizeError* = object of LPStreamError
proc newLPStreamReadError*(p: ref Exception): ref Exception = proc newLPStreamReadError*(p: ref CatchableError): ref CatchableError =
var w = newException(LPStreamReadError, "Read stream failed") var w = newException(LPStreamReadError, "Read stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p w.par = p
result = w result = w
proc newLPStreamReadError*(msg: string): ref Exception = proc newLPStreamReadError*(msg: string): ref CatchableError =
newException(LPStreamReadError, msg) newException(LPStreamReadError, msg)
proc newLPStreamWriteError*(p: ref Exception): ref Exception = proc newLPStreamWriteError*(p: ref CatchableError): ref CatchableError =
var w = newException(LPStreamWriteError, "Write stream failed") var w = newException(LPStreamWriteError, "Write stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p w.par = p
result = w result = w
proc newLPStreamIncompleteError*(): ref Exception = proc newLPStreamIncompleteError*(): ref CatchableError =
result = newException(LPStreamIncompleteError, "Incomplete data received") result = newException(LPStreamIncompleteError, "Incomplete data received")
proc newLPStreamLimitError*(): ref Exception = proc newLPStreamLimitError*(): ref CatchableError =
result = newException(LPStreamLimitError, "Buffer limit reached") result = newException(LPStreamLimitError, "Buffer limit reached")
proc newLPStreamIncorrectDefect*(m: string): ref Exception = proc newLPStreamIncorrectDefect*(m: string): ref Defect =
result = newException(LPStreamIncorrectDefect, m) result = newException(LPStreamIncorrectDefect, m)
proc newLPStreamEOFError*(): ref Exception = proc newLPStreamEOFError*(): ref CatchableError =
result = newException(LPStreamEOFError, "Stream EOF!") result = newException(LPStreamEOFError, "Stream EOF!")
proc newLPStreamClosedError*(): ref Exception = proc newLPStreamClosedError*(): ref Exception =
result = newException(LPStreamClosedError, "Stream Closed!") result = newException(LPStreamClosedError, "Stream Closed!")
method initStream*(s: LPStream) {.base.} =
s.closeEvent = newAsyncEvent()
when chronicles.enabledLogLevel == LogLevel.TRACE:
s.oid = genOid()
method closed*(s: LPStream): bool {.base, inline.} = method closed*(s: LPStream): bool {.base, inline.} =
s.isClosed s.isClosed
@ -88,7 +95,6 @@ proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, de
var lim = if limit <= 0: -1 else: limit var lim = if limit <= 0: -1 else: limit
var state = 0 var state = 0
try:
while true: while true:
var ch: char var ch: char
await readExactly(s, addr ch, 1) await readExactly(s, addr ch, 1)
@ -108,8 +114,6 @@ proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, de
result.add(ch) result.add(ch)
if len(result) == lim: if len(result) == lim:
break break
except LPStreamIncompleteError, LPStreamReadError:
discard # EOF, in which case we should return whatever we read so far..
proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} = proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} =
var var