better exception handling and resource cleanup

This commit is contained in:
Dmitriy Ryajov 2020-05-23 11:12:28 -06:00
parent 7ff76d76b6
commit 640c3bdc45
1 changed files with 79 additions and 75 deletions

View File

@ -48,7 +48,7 @@ type
pubSub*: Option[PubSub] pubSub*: Option[PubSub]
dialedPubSubPeers: HashSet[string] dialedPubSubPeers: HashSet[string]
proc newNoPubSubException(): ref Exception {.inline.} = proc newNoPubSubException(): ref CatchableError {.inline.} =
result = newException(NoPubSubException, "no pubsub provided!") result = newException(NoPubSubException, "no pubsub provided!")
proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
@ -134,23 +134,26 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
s.muxed[conn.peerInfo.id] = muxer s.muxed[conn.peerInfo.id] = muxer
proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
if not isNil(conn.peerInfo): try:
let id = conn.peerInfo.id if not isNil(conn.peerInfo):
trace "cleaning up connection for peer", peerId = id let id = conn.peerInfo.id
if id in s.muxed: trace "cleaning up connection for peer", peerId = id
await s.muxed[id].close() if id in s.muxed:
s.muxed.del(id) await s.muxed[id].close()
s.muxed.del(id)
if id in s.connections: if id in s.connections:
if not s.connections[id].closed: if not s.connections[id].closed:
await s.connections[id].close() await s.connections[id].close()
s.connections.del(id) s.connections.del(id)
s.dialedPubSubPeers.excl(id) s.dialedPubSubPeers.excl(id)
# TODO: Investigate cleanupConn() always called twice for one peer. # TODO: Investigate cleanupConn() always called twice for one peer.
if not(conn.peerInfo.isClosed()): if not(conn.peerInfo.isClosed()):
conn.peerInfo.close() conn.peerInfo.close()
except CatchableError as exc:
trace "exception cleaning up connection", exc = exc.msg
proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} = proc disconnect*(s: Switch, peer: PeerInfo) {.async, gcsafe.} =
let conn = s.connections.getOrDefault(peer.id) let conn = s.connections.getOrDefault(peer.id)
@ -167,25 +170,19 @@ proc getMuxedStream(s: Switch, peerInfo: PeerInfo): Future[Connection] {.async,
result = conn result = conn
proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
try: trace "handling connection", conn = $conn
trace "handling connection", conn = $conn result = conn
result = conn
# don't mux/secure twise # don't mux/secure twise
if conn.peerInfo.id in s.muxed: if conn.peerInfo.id in s.muxed:
return return
result = await s.secure(result) # secure the connection result = await s.secure(result) # secure the connection
if isNil(result): if isNil(result):
return return
await s.mux(result) # mux it if possible await s.mux(result) # mux it if possible
s.connections[conn.peerInfo.id] = result s.connections[conn.peerInfo.id] = result
except CancelledError as exc:
raise exc
except CatchableError as exc:
debug "Couldn't upgrade outgoing connection", msg = exc.msg
return nil
proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
trace "upgrading incoming connection", conn = $conn trace "upgrading incoming connection", conn = $conn
@ -207,27 +204,33 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
ms.addHandler(muxer.codec, muxer) ms.addHandler(muxer.codec, muxer)
# handle subsequent requests # handle subsequent requests
await ms.handle(sconn) try:
await sconn.close() await ms.handle(sconn)
finally:
await sconn.close()
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
debug "ending secured handler", err = exc.msg debug "ending secured handler", err = exc.msg
if (await ms.select(conn)): # just handshake
# add the secure handlers
for k in s.secureManagers.keys:
ms.addHandler(k, securedHandler)
try: try:
# handle secured connections try:
await ms.handle(conn) if (await ms.select(conn)): # just handshake
# add the secure handlers
for k in s.secureManagers.keys:
ms.addHandler(k, securedHandler)
# handle secured connections
await ms.handle(conn)
finally:
await conn.close()
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
debug "ending multistream", err = exc.msg debug "ending multistream", err = exc.msg
proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.}
proc internalConnect(s: Switch, proc internalConnect(s: Switch,
peer: PeerInfo): Future[Connection] {.async.} = peer: PeerInfo): Future[Connection] {.async.} =
@ -239,13 +242,7 @@ proc internalConnect(s: Switch,
for a in peer.addrs: # for each address for a in peer.addrs: # for each address
if t.handles(a): # check if it can dial it if t.handles(a): # check if it can dial it
trace "Dialing address", address = $a trace "Dialing address", address = $a
try: conn = await t.dial(a)
conn = await t.dial(a)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "couldn't dial peer, transport failed", exc = exc.msg, address = a
continue
# make sure to assign the peer to the connection # make sure to assign the peer to the connection
conn.peerInfo = peer conn.peerInfo = peer
conn = await s.upgradeOutgoing(conn) conn = await s.upgradeOutgoing(conn)
@ -253,8 +250,9 @@ proc internalConnect(s: Switch,
continue continue
conn.closeEvent.wait() conn.closeEvent.wait()
.addCallback do (udata: pointer): .addCallback do(udata: pointer):
asyncCheck s.cleanupConn(conn) asyncCheck s.cleanupConn(conn)
break break
else: else:
trace "Reusing existing connection" trace "Reusing existing connection"
@ -289,7 +287,7 @@ proc dial*(s: Switch,
if not await s.ms.select(result, proto): if not await s.ms.select(result, proto):
warn "Unable to select sub-protocol", proto = proto warn "Unable to select sub-protocol", proto = proto
raise newException(CatchableError, &"unable to select protocol: {proto}") return nil
proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} =
if isNil(proto.handler): if isNil(proto.handler):
@ -307,14 +305,14 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
try: try:
await s.upgradeIncoming(conn) # perform upgrade on incoming connection try:
await s.upgradeIncoming(conn) # perform upgrade on incoming connection
finally:
await s.cleanupConn(conn)
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
trace "Exception occurred in Switch.start", exc = exc.msg trace "Exception occurred in Switch.start", exc = exc.msg
finally:
await conn.close()
await s.cleanupConn(conn)
var startFuts: seq[Future[void]] var startFuts: seq[Future[void]]
for t in s.transports: # for each transport for t in s.transports: # for each transport
@ -338,27 +336,27 @@ proc stop*(s: Switch) {.async.} =
if s.pubSub.isSome: if s.pubSub.isSome:
await s.pubSub.get().stop() await s.pubSub.get().stop()
checkFutures( await all(
await allFinished( toSeq(s.connections.values)
toSeq(s.connections.values).mapIt(s.cleanupConn(it)))) .mapIt(s.cleanupConn(it)))
checkFutures( await all(
await allFinished( s.transports.mapIt(it.close()))
s.transports.mapIt(it.close())))
trace "switch stopped" trace "switch stopped"
proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
## Subscribe to pub sub peer ## Subscribe to pub sub peer
if s.pubSub.isSome and peerInfo.id notin s.dialedPubSubPeers: if s.pubSub.isSome and peerInfo.id notin s.dialedPubSubPeers:
try: try:
s.dialedPubSubPeers.incl(peerInfo.id) s.dialedPubSubPeers.incl(peerInfo.id)
let conn = await s.dial(peerInfo, s.pubSub.get().codec) let conn = await s.dial(peerInfo, s.pubSub.get().codec)
if isNil(conn):
trace "unable to subscribe to peer"
return
await s.pubSub.get().subscribeToPeer(conn) await s.pubSub.get().subscribeToPeer(conn)
except CancelledError as exc:
raise exc
except CatchableError as exc: except CatchableError as exc:
warn "unable to initiate pubsub", exc = exc.msg trace "exception in subscribe to peer", exc = exc.msg
finally: finally:
s.dialedPubSubPeers.excl(peerInfo.id) s.dialedPubSubPeers.excl(peerInfo.id)
@ -434,19 +432,25 @@ proc newSwitch*(peerInfo: PeerInfo,
for key, val in muxers: for key, val in muxers:
val.streamHandler = result.streamHandler val.streamHandler = result.streamHandler
val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} = val.muxerHandler = proc(muxer: Muxer) {.async, gcsafe.} =
trace "got new muxer" var stream: Connection
let stream = await muxer.newStream() try:
muxer.connection.peerInfo = await s.identify(stream) trace "got new muxer"
await stream.close() stream = await muxer.newStream()
muxer.connection.peerInfo = await s.identify(stream)
# store muxer for connection # store muxer for connection
s.muxed[muxer.connection.peerInfo.id] = muxer s.muxed[muxer.connection.peerInfo.id] = muxer
# store muxed connection # store muxed connection
s.connections[muxer.connection.peerInfo.id] = muxer.connection s.connections[muxer.connection.peerInfo.id] = muxer.connection
# try establishing a pubsub connection # try establishing a pubsub connection
await s.subscribeToPeer(muxer.connection.peerInfo) await s.subscribeToPeer(muxer.connection.peerInfo)
except CatchableError as exc:
trace "exception in muxer handler", exc = exc.msg
finally:
if not(isNil(stream)):
await stream.close()
for proto in secureManagers: for proto in secureManagers:
trace "adding secure manager ", codec = proto.codec trace "adding secure manager ", codec = proto.codec