reduse usssage of asyncCheck

This commit is contained in:
Dmitriy Ryajov 2020-04-07 09:49:43 -06:00
parent bd49a35e0a
commit 6cbcc7859e
3 changed files with 35 additions and 30 deletions

View File

@ -35,26 +35,28 @@ proc newInvalidVarintException*(): ref InvalidVarintException =
proc newInvalidVarintSizeException*(): ref InvalidVarintSizeException =
newException(InvalidVarintSizeException, "Wrong varint size")
proc init*[T: Connection](self: var T, stream: LPStream) =
proc bindStreamClose(conn: Connection) {.async.} =
# bind stream's close event to connection's close
# to ensure correct close propagation
if not isNil(conn.stream.closeEvent):
await conn.stream.closeEvent.wait()
trace "wrapped stream closed, about to close conn", closed = this.isClosed,
peer = if not isNil(this.peerInfo):
this.peerInfo.id else: ""
if not conn.isClosed:
trace "wrapped stream closed, closing conn", closed = this.isClosed,
peer = if not isNil(this.peerInfo):
this.peerInfo.id else: ""
asyncCheck conn.close()
proc init*[T: Connection](self: var T, stream: LPStream): T =
## create a new Connection for the specified async reader/writer
new self
self.stream = stream
self.closeEvent = newAsyncEvent()
asyncCheck self.bindStreamClose()
# bind stream's close event to connection's close
# to ensure correct close propagation
let this = self
if not isNil(stream.closeEvent):
stream.closeEvent.wait().
addCallback do (udata: pointer):
trace "wrapped stream closed, about to close conn", closed = this.isClosed,
peer = if not isNil(this.peerInfo):
this.peerInfo.id else: ""
if not this.isClosed:
trace "wrapped stream closed, closing conn", closed = this.isClosed,
peer = if not isNil(this.peerInfo):
this.peerInfo.id else: ""
asyncCheck this.close()
return self
proc newConnection*(stream: LPStream): Connection =
## create a new Connection for the specified async reader/writer
@ -119,9 +121,9 @@ method close*(s: Connection) {.async, gcsafe.} =
if not s.closed:
if not isNil(s.stream) and not s.stream.closed:
trace "closing connection", closed = s.closed,
peer = if not isNil(s.peerInfo):
s.peerInfo.id else: ""
trace "closing child stream", closed = s.closed,
peer = if not isNil(s.peerInfo):
s.peerInfo.id else: ""
await s.stream.close()
s.closeEvent.fire()

View File

@ -128,6 +128,11 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "stopping mplex main loop"
await m.close()
proc internalCleanup(m: Mplex, conn: Connection) {.async.} =
await conn.closeEvent.wait()
trace "connection closed, cleaning up mplex"
await m.close()
proc newMplex*(conn: Connection,
maxChanns: uint = MaxChannels): Mplex =
new result
@ -136,11 +141,7 @@ proc newMplex*(conn: Connection,
result.remote = initTable[uint64, LPChannel]()
result.local = initTable[uint64, LPChannel]()
let m = result
conn.closeEvent.wait()
.addCallback do (udata: pointer):
trace "connection closed, cleaning up mplex"
asyncCheck m.close()
asyncCheck result.internalCleanup(conn)
method newStream*(m: Mplex,
name: string = "",

View File

@ -138,6 +138,14 @@ method handleConn*(p: PubSub,
trace "pubsub peer handler ended, cleaning up"
await p.cleanUpHelper(peer)
proc internalClenaup(p: PubSub, conn: Connection) {.async.} =
# handle connection close
var peer = p.getPeer(conn.peerInfo, p.codec)
await conn.closeEvent.wait()
trace "connection closed, cleaning up peer", peer = conn.peerInfo.id
await p.cleanUpHelper(peer)
method subscribeToPeer*(p: PubSub,
conn: Connection) {.base, async.} =
var peer = p.getPeer(conn.peerInfo, p.codec)
@ -145,13 +153,7 @@ method subscribeToPeer*(p: PubSub,
if not peer.isConnected:
peer.conn = conn
# handle connection close
conn.closeEvent.wait()
.addCallback do (udata: pointer = nil):
trace "connection closed, cleaning up peer",
peer = conn.peerInfo.id
asyncCheck p.cleanUpHelper(peer)
asyncCheck p.internalClenaup(conn)
method unsubscribe*(p: PubSub,
topics: seq[TopicPair]) {.base, async.} =