close and cleanup connections

This commit is contained in:
Dmitriy Ryajov 2020-01-07 02:02:37 -06:00
parent c6561b8851
commit cd8961cfb9
3 changed files with 49 additions and 47 deletions

View File

@ -16,7 +16,7 @@ import connection,
logScope: logScope:
topic = "Multistream" topic = "Multistream"
const const
MsgSize* = 64*1024 MsgSize* = 64*1024
Codec* = "/multistream/1.0.0" Codec* = "/multistream/1.0.0"
@ -83,13 +83,13 @@ proc select*(m: MultisteamSelect,
proc select*(m: MultisteamSelect, proc select*(m: MultisteamSelect,
conn: Connection, conn: Connection,
proto: string): Future[bool] {.async.} = proto: string): Future[bool] {.async.} =
if proto.len > 0: if proto.len > 0:
result = (await m.select(conn, @[proto])) == proto result = (await m.select(conn, @[proto])) == proto
else: else:
result = (await m.select(conn, @[])) == Codec result = (await m.select(conn, @[])) == Codec
proc select*(m: MultisteamSelect, conn: Connection): Future[bool] = proc select*(m: MultisteamSelect, conn: Connection): Future[bool] =
m.select(conn, "") m.select(conn, "")
proc list*(m: MultisteamSelect, proc list*(m: MultisteamSelect,
@ -112,10 +112,9 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} =
trace "handle: starting multistream handling" trace "handle: starting multistream handling"
try: try:
while not conn.closed: while not conn.closed:
await sleepAsync(1.millis)
var ms = cast[string]((await conn.readLp())) var ms = cast[string]((await conn.readLp()))
ms.removeSuffix("\n") ms.removeSuffix("\n")
trace "handle: got request for ", ms trace "handle: got request for ", ms
if ms.len() <= 0: if ms.len() <= 0:
trace "handle: invalid proto" trace "handle: invalid proto"
@ -158,9 +157,9 @@ proc addHandler*[T: LPProtocol](m: MultisteamSelect,
protocol: T, protocol: T,
matcher: Matcher = nil) = matcher: Matcher = nil) =
## register a protocol ## register a protocol
# TODO: This is a bug in chronicles, # TODO: This is a bug in chronicles,
# it break if I uncoment this line. # it break if I uncoment this line.
# Which is almost the same as the # Which is almost the same as the
# one on the next override of addHandler # one on the next override of addHandler
# #
# trace "registering protocol", codec = codec # trace "registering protocol", codec = codec

View File

@ -125,7 +125,8 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "exception occurred", exception = exc.msg trace "exception occurred", exception = exc.msg
finally: finally:
trace "stopping mplex main loop" trace "stopping mplex main loop"
await m.connection.close() if not m.connection.closed():
await m.connection.close()
proc newMplex*(conn: Connection, proc newMplex*(conn: Connection,
maxChanns: uint = MaxChannels): Mplex = maxChanns: uint = MaxChannels): Mplex =
@ -136,7 +137,8 @@ proc newMplex*(conn: Connection,
result.local = initTable[uint, LPChannel]() result.local = initTable[uint, LPChannel]()
let m = result let m = result
conn.closeEvent.wait().addCallback do (udata: pointer): conn.closeEvent.wait()
.addCallback do (udata: pointer):
trace "connection closed, cleaning up mplex" trace "connection closed, cleaning up mplex"
asyncCheck m.close() asyncCheck m.close()

View File

@ -7,27 +7,27 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
## This module implements an asynchronous buffer stream ## This module implements an asynchronous buffer stream
## which emulates physical async IO. ## which emulates physical async IO.
## ##
## The stream is based on the standard library's `Deque`, ## The stream is based on the standard library's `Deque`,
## which is itself based on a ring buffer. ## which is itself based on a ring buffer.
## ##
## It works by exposing a regular LPStream interface and ## It works by exposing a regular LPStream interface and
## a method ``pushTo`` to push data to the internal read ## a method ``pushTo`` to push data to the internal read
## buffer; as well as a handler that can be registrered ## buffer; as well as a handler that can be registrered
## that gets triggered on every write to the stream. This ## that gets triggered on every write to the stream. This
## allows using the buffered stream as a sort of proxy, ## allows using the buffered stream as a sort of proxy,
## which can be consumed as a regular LPStream but allows ## which can be consumed as a regular LPStream but allows
## injecting data for reads and intercepting writes. ## injecting data for reads and intercepting writes.
## ##
## Another notable feature is that the stream is fully ## Another notable feature is that the stream is fully
## ordered and asynchronous. Reads are queued up in order ## ordered and asynchronous. Reads are queued up in order
## and are suspended when not enough data available. This ## and are suspended when not enough data available. This
## allows preserving backpressure while maintaining full ## allows preserving backpressure while maintaining full
## asynchrony. Both writting to the internal buffer with ## asynchrony. Both writting to the internal buffer with
## ``pushTo`` as well as reading with ``read*` methods, ## ``pushTo`` as well as reading with ``read*` methods,
## will suspend until either the amount of elements in the ## will suspend until either the amount of elements in the
## buffer goes below ``maxSize`` or more data becomes available. ## buffer goes below ``maxSize`` or more data becomes available.
import deques, math import deques, math
@ -59,7 +59,7 @@ proc newNotWritableError*(): ref Exception {.inline.} =
result = newException(NotWritableError, "stream is not writable") result = newException(NotWritableError, "stream is not writable")
proc requestReadBytes(s: BufferStream): Future[void] = proc requestReadBytes(s: BufferStream): Future[void] =
## create a future that will complete when more ## create a future that will complete when more
## data becomes available in the read buffer ## data becomes available in the read buffer
result = newFuture[void]() result = newFuture[void]()
s.readReqs.addLast(result) s.readReqs.addLast(result)
@ -95,10 +95,10 @@ proc shrink(s: BufferStream, fromFirst = 0, fromLast = 0) =
proc len*(s: BufferStream): int = s.readBuf.len proc len*(s: BufferStream): int = s.readBuf.len
proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} = proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} =
## Write bytes to internal read buffer, use this to fill up the ## Write bytes to internal read buffer, use this to fill up the
## buffer with data. ## buffer with data.
## ##
## This method is async and will wait until all data has been ## This method is async and will wait until all data has been
## written to the internal buffer; this is done so that backpressure ## written to the internal buffer; this is done so that backpressure
## is preserved. ## is preserved.
## ##
@ -123,7 +123,7 @@ proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} =
if index >= data.len: if index >= data.len:
break break
# if we couldn't transfer all the data to the # if we couldn't transfer all the data to the
# internal buf wait on a read event # internal buf wait on a read event
await s.dataReadEvent.wait() await s.dataReadEvent.wait()
s.lock.release() s.lock.release()
@ -159,8 +159,8 @@ method readExactly*(s: BufferStream,
copyMem(pbytes, addr buff[0], nbytes) copyMem(pbytes, addr buff[0], nbytes)
method readLine*(s: BufferStream, method readLine*(s: BufferStream,
limit = 0, limit = 0,
sep = "\r\n"): sep = "\r\n"):
Future[string] {.async.} = Future[string] {.async.} =
## Read one line from read-only stream ``rstream``, where ``"line"`` is a ## Read one line from read-only stream ``rstream``, where ``"line"`` is a
@ -213,9 +213,9 @@ method readOnce*(s: BufferStream,
result = len result = len
method readUntil*(s: BufferStream, method readUntil*(s: BufferStream,
pbytes: pointer, pbytes: pointer,
nbytes: int, nbytes: int,
sep: seq[byte]): sep: seq[byte]):
Future[int] {.async.} = Future[int] {.async.} =
## Read data from the read-only stream ``rstream`` until separator ``sep`` is ## Read data from the read-only stream ``rstream`` until separator ``sep`` is
## found. ## found.
@ -275,8 +275,8 @@ method write*(s: BufferStream,
if not isNil(s.writeHandler): if not isNil(s.writeHandler):
result = s.writeHandler(buf) result = s.writeHandler(buf)
method write*(s: BufferStream, method write*(s: BufferStream,
msg: string, msg: string,
msglen = -1): Future[void] = msglen = -1): Future[void] =
## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``. ## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``.
## ##
@ -291,8 +291,8 @@ method write*(s: BufferStream,
if not isNil(s.writeHandler): if not isNil(s.writeHandler):
result = s.writeHandler(cast[seq[byte]](buf)) result = s.writeHandler(cast[seq[byte]](buf))
method write*(s: BufferStream, method write*(s: BufferStream,
msg: seq[byte], msg: seq[byte],
msglen = -1): Future[void] = msglen = -1): Future[void] =
## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer ## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer
## stream ``wstream``. ## stream ``wstream``.
@ -310,11 +310,11 @@ method write*(s: BufferStream,
proc pipe*(s: BufferStream, proc pipe*(s: BufferStream,
target: BufferStream): BufferStream = target: BufferStream): BufferStream =
## pipe the write end of this stream to ## pipe the write end of this stream to
## be the source of the target stream ## be the source of the target stream
## ##
## Note that this only works with the LPStream ## Note that this only works with the LPStream
## interface methods `read*` and `write` are ## interface methods `read*` and `write` are
## piped. ## piped.
## ##
if s.isPiped: if s.isPiped:
@ -326,15 +326,15 @@ proc pipe*(s: BufferStream,
if not isNil(oldHandler): if not isNil(oldHandler):
await oldHandler(data) await oldHandler(data)
# if we're piping to self, # if we're piping to self,
# then add the data to the # then add the data to the
# buffer directly and fire # buffer directly and fire
# the read event # the read event
if s == target: if s == target:
for b in data: for b in data:
s.readBuf.addLast(b) s.readBuf.addLast(b)
# notify main loop of available # notify main loop of available
# data # data
s.dataReadEvent.fire() s.dataReadEvent.fire()
else: else:
@ -350,7 +350,8 @@ proc `|`*(s: BufferStream, target: BufferStream): BufferStream =
method close*(s: BufferStream) {.async.} = method close*(s: BufferStream) {.async.} =
## close the stream and clear the buffer ## close the stream and clear the buffer
for r in s.readReqs: for r in s.readReqs:
r.cancel() if not(isNil(r)) and not(r.finished()):
r.cancel()
s.dataReadEvent.fire() s.dataReadEvent.fire()
s.readBuf.clear() s.readBuf.clear()
s.closeEvent.fire() s.closeEvent.fire()