Close peers (#201)

* wip

* exceptions and resource cleanup

* correct peerlifetime on disconnect

* emulate defered

* remove comment
This commit is contained in:
Dmitriy Ryajov 2020-06-02 11:32:42 -06:00 committed by GitHub
parent c7298f34f4
commit 285884c20c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 12 deletions

View File

@ -200,16 +200,26 @@ method close*(m: Mplex) {.async, gcsafe.} =
try: try:
trace "closing mplex muxer", oid = m.oid trace "closing mplex muxer", oid = m.oid
await all( let channs = toSeq(m.remote.values) &
toSeq(m.remote.values).mapIt(it.reset()) & toSeq(m.local.values)
toSeq(m.local.values).mapIt(it.reset()))
for chann in channs:
try:
await chann.reset()
except CatchableError as exc:
warn "error resetting channel", exc = exc.msg
for conn in m.conns:
try:
await conn.close()
except CatchableError as exc:
warn "error closing channel's connection"
checkFutures(
await allFinished(m.handlerFuts))
await all(m.conns.mapIt(it.close())) # dispose of channel's connections
await all(m.handlerFuts)
except CatchableError as exc:
trace "exception in mplex close", exc = exc.msg
finally:
await m.connection.close() await m.connection.close()
finally:
m.remote.clear() m.remote.clear()
m.local.clear() m.local.clear()
m.conns = @[] m.conns = @[]

View File

@ -60,7 +60,7 @@ method init(c: MuxerProvider) =
if not isNil(c.muxerHandler): if not isNil(c.muxerHandler):
futs &= c.muxerHandler(muxer) futs &= c.muxerHandler(muxer)
await all(futs) checkFutures(await allFinished(futs))
except CatchableError as exc: except CatchableError as exc:
trace "exception in muxer handler", exc = exc.msg trace "exception in muxer handler", exc = exc.msg

View File

@ -140,9 +140,8 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
s.muxed.del(id) s.muxed.del(id)
if id in s.connections: if id in s.connections:
if not s.connections[id].closed:
await s.connections[id].close()
s.connections.del(id) s.connections.del(id)
await conn.close()
s.dialedPubSubPeers.excl(id) s.dialedPubSubPeers.excl(id)
@ -438,7 +437,7 @@ proc newSwitch*(peerInfo: PeerInfo,
if not(stream.closed): if not(stream.closed):
await stream.close() await stream.close()
except CatchableError as exc: except CatchableError as exc:
trace "excepton in stream handler", exc = exc.msg trace "exception in stream handler", exc = exc.msg
result.mount(identity) result.mount(identity)
for key, val in muxers: for key, val in muxers:
@ -448,6 +447,8 @@ proc newSwitch*(peerInfo: PeerInfo,
try: try:
trace "got new muxer" trace "got new muxer"
stream = await muxer.newStream() stream = await muxer.newStream()
# once we got a muxed connection, attempt to
# identify it
muxer.connection.peerInfo = await s.identify(stream) muxer.connection.peerInfo = await s.identify(stream)
# store muxer for connection # store muxer for connection
@ -456,6 +457,10 @@ proc newSwitch*(peerInfo: PeerInfo,
# store muxed connection # store muxed connection
s.connections[muxer.connection.peerInfo.id] = muxer.connection s.connections[muxer.connection.peerInfo.id] = muxer.connection
muxer.connection.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.cleanupConn(muxer.connection)
# try establishing a pubsub connection # try establishing a pubsub connection
await s.subscribeToPeer(muxer.connection.peerInfo) await s.subscribeToPeer(muxer.connection.peerInfo)
except CatchableError as exc: except CatchableError as exc:

View File

@ -192,6 +192,9 @@ suite "Switch":
awaiters.add(await switch2.start()) awaiters.add(await switch2.start())
await switch2.connect(switch1.peerInfo) await switch2.connect(switch1.peerInfo)
check switch1.connections.len > 0
check switch2.connections.len > 0
await sleepAsync(100.millis) await sleepAsync(100.millis)
await switch2.disconnect(switch1.peerInfo) await switch2.disconnect(switch1.peerInfo)
@ -204,6 +207,9 @@ suite "Switch":
# echo connTracker.dump() # echo connTracker.dump()
check connTracker.isLeaked() == false check connTracker.isLeaked() == false
check switch1.connections.len == 0
check switch2.connections.len == 0
await all( await all(
switch1.stop(), switch1.stop(),
switch2.stop() switch2.stop()