From 7b5259dbc713851f5fd14d2024fedf2cee4f95c7 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 2 Nov 2020 14:35:26 -0600 Subject: [PATCH] Move triggers (#416) * move event triggers to connmanager * use base error type * avoid deadlocks * handle eof and closed when identifying incoming * use `closeWait` --- libp2p/connmanager.nim | 42 +++++++++++++-- libp2p/protocols/identify.nim | 8 ++- libp2p/switch.nim | 85 +++++++----------------------- libp2p/transports/tcptransport.nim | 11 +++- 4 files changed, 67 insertions(+), 79 deletions(-) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index cd49e56dd..04aded274 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -124,19 +124,20 @@ proc triggerPeerEvents*(c: ConnManager, peerId: PeerID, event: PeerEvent) {.async, gcsafe.} = + trace "About to trigger peer events", peer = peerId if event notin c.peerEvents: return try: let count = c.connCount(peerId) if event == PeerEvent.Joined and count != 1: - trace "peer already joined", peerId, event + trace "peer already joined", peerId, event = $event return elif event == PeerEvent.Left and count != 0: - trace "peer still connected or already left", peerId, event + trace "peer still connected or already left", peerId, event = $event return - trace "triggering peer events", peerId, event + trace "triggering peer events", peerId, event = $event var peerEvents: seq[Future[void]] for h in c.peerEvents[event]: @@ -146,7 +147,7 @@ proc triggerPeerEvents*(c: ConnManager, except CancelledError as exc: raise exc except CatchableError as exc: # handlers should not raise! - warn "exception in triggerPeerEvents", exc = exc.msg, peerId + warn "Exception in triggerPeerEvents", exc = exc.msg, peerId proc contains*(c: ConnManager, conn: Connection): bool = ## checks if a connection is being tracked by the @@ -224,6 +225,32 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} = trace "Connection cleaned up", conn +proc peerStartup(c: ConnManager, conn: Connection) {.async.} = + try: + trace "Triggering peer and connection events on connect", conn + let peerId = conn.peerInfo.peerId + await c.triggerPeerEvents(peerId, PeerEvent.Joined) + await c.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: conn.dir == Direction.In)) + except CatchableError as exc: + # This is top-level procedure which will work as separate task, so it + # do not need to propagate CancelledError and should handle other errors + warn "Unexpected exception in switch peer connection cleanup", + conn, msg = exc.msg + +proc peerCleanup(c: ConnManager, conn: Connection) {.async.} = + try: + trace "Triggering peer and connection events on disconnect", conn + let peerId = conn.peerInfo.peerId + await c.triggerConnEvent( + peerId, ConnEvent(kind: ConnEventKind.Disconnected)) + await c.triggerPeerEvents(peerId, PeerEvent.Left) + except CatchableError as exc: + # This is top-level procedure which will work as separate task, so it + # do not need to propagate CancelledError and should handle other errors + warn "Unexpected exception peer cleanup handler", + conn, msg = exc.msg + proc onClose(c: ConnManager, conn: Connection) {.async.} = ## connection close even handler ## @@ -235,11 +262,14 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} = await c.cleanupConn(conn) except CancelledError: # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. + # do not need to propagate CancelledError. debug "Unexpected cancellation in connection manager's cleanup", conn except CatchableError as exc: debug "Unexpected exception in connection manager's cleanup", errMsg = exc.msg, conn + finally: + trace "Triggering peerCleanup", conn + asyncSpawn c.peerCleanup(conn) proc selectConn*(c: ConnManager, peerId: PeerID, @@ -335,6 +365,8 @@ proc storeMuxer*(c: ConnManager, trace "Stored muxer", muxer, handle = not handle.isNil, connections = c.conns.len + asyncSpawn c.peerStartup(muxer.connection) + proc getMuxedStream*(c: ConnManager, peerId: PeerID, dir: Direction): Future[Connection] {.async, gcsafe.} = diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 9af145eae..76d1f67a9 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -30,8 +30,9 @@ const #TODO: implement push identify, leaving out for now as it is not essential type - IdentityNoMatchError* = object of CatchableError - IdentityInvalidMsgError* = object of CatchableError + IdentifyError* = object of CatchableError + IdentityNoMatchError* = object of IdentifyError + IdentityInvalidMsgError* = object of IdentifyError IdentifyInfo* = object pubKey*: Option[PublicKey] @@ -138,9 +139,6 @@ proc identify*(p: Identify, if peer.isErr: raise newException(IdentityInvalidMsgError, $peer.error) else: - # do a string comaprison of the ids, - # because that is the only thing we - # have in most cases if peer.get() != remotePeerInfo.peerId: trace "Peer ids don't match", remote = peer, diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 70e998f9a..211ac9808 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -167,13 +167,9 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = muxer.streamHandler = s.streamHandler s.connManager.storeOutgoing(conn) - s.connManager.storeMuxer(muxer) - - # start muxer read loop - the future will complete when loop ends - let handlerFut = muxer.handle() # store it in muxed connections if we have a peer for it - s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler + s.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop return muxer @@ -308,7 +304,7 @@ proc internalConnect(s: Switch, await s.upgradeOutgoing(dialed) except CatchableError as exc: # If we failed to establish the connection through one transport, - # we won't succeeed through another - no use in trying again + # we won't succeeded through another - no use in trying again await dialed.close() debug "Upgrade failed", msg = exc.msg, peerId if exc isnot CancelledError: @@ -327,30 +323,13 @@ proc internalConnect(s: Switch, if isNil(conn): # None of the addresses connected raise newException(CatchableError, "Unable to establish outgoing link") - if conn.closed(): - # This can happen if one of the peer event handlers deems the peer - # unworthy and disconnects it + if conn.closed() or conn.atEof(): + # This can happen when the other ends drops us + # before we get a chance to return the connection + # back to the dialer. + trace "Connection dead on arrival", conn raise newLPStreamClosedError() - await s.connManager.triggerPeerEvents(peerId, PeerEvent.Joined) - await s.connManager.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) - - proc peerCleanup() {.async.} = - try: - await conn.closeEvent.wait() - await s.connManager.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Disconnected)) - await s.connManager.triggerPeerEvents(peerId, PeerEvent.Left) - except CatchableError as exc: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError and should handle other errors - warn "Unexpected exception in switch peer connect cleanup", - conn, msg = exc.msg - - # All the errors are handled inside `cleanup()` procedure. - asyncSpawn peerCleanup() - return conn proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = @@ -486,45 +465,17 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = s.connManager.storeMuxer(muxer) try: - await s.identify(muxer) - except CatchableError as exc: - # Identify is non-essential, though if it fails, it might indicate that - # the connection was closed already - this will be picked up by the read - # loop - debug "Could not identify connection", conn, msg = exc.msg - - try: - let peerId = conn.peerInfo.peerId - - proc peerCleanup() {.async.} = - try: - await muxer.connection.join() - await s.connManager.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Disconnected)) - await s.connManager.triggerPeerEvents(peerId, PeerEvent.Left) - except CatchableError as exc: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError and shouldn't leak others - debug "Unexpected exception in switch muxer cleanup", - conn, msg = exc.msg - - proc peerStartup() {.async.} = - try: - await s.connManager.triggerPeerEvents(peerId, PeerEvent.Joined) - await s.connManager.triggerConnEvent(peerId, - ConnEvent(kind: ConnEventKind.Connected, incoming: true)) - except CatchableError as exc: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError and shouldn't leak others - debug "Unexpected exception in switch muxer startup", - conn, msg = exc.msg - - # All the errors are handled inside `peerStartup()` procedure. - asyncSpawn peerStartup() - - # All the errors are handled inside `peerCleanup()` procedure. - asyncSpawn peerCleanup() - + try: + await s.identify(muxer) + except IdentifyError as exc: + # Identify is non-essential, though if it fails, it might indicate that + # the connection was closed already - this will be picked up by the read + # loop + debug "Could not identify connection", conn, msg = exc.msg + except LPStreamClosedError as exc: + debug "Identify stream closed", conn, msg = exc.msg + except LPStreamEOFError as exc: + debug "Identify stream EOF", conn, msg = exc.msg except CancelledError as exc: await muxer.close() raise exc diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index eda2e18aa..7783cedb7 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -72,7 +72,6 @@ proc connHandler*(t: TcpTransport, else: Direction.In)) - conn.observedAddr = MultiAddress.init(client.remoteAddress).tryGet() if not initiator: if not isNil(t.handler): t.handlers &= t.handler(conn) @@ -94,7 +93,15 @@ proc connHandler*(t: TcpTransport, t.clients.add(client) # All the errors are handled inside `cleanup()` procedure. asyncSpawn cleanup() - result = conn + + try: + conn.observedAddr = MultiAddress.init(client.remoteAddress).tryGet() + except CatchableError as exc: + trace "Connection setup failed", exc = exc.msg + if not(isNil(client)): + client.close() + + return conn proc connCb(server: StreamServer, client: StreamTransport) {.async, gcsafe.} =