diff --git a/libp2p/switch.nim b/libp2p/switch.nim index b12805289..9269898f4 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -44,7 +44,8 @@ declareCounter(libp2p_failed_dials, "failed dials") declareCounter(libp2p_failed_upgrade, "peers failed upgrade") type - NoPubSubException* = object of CatchableError + UpgradeFailedError* = object of CatchableError + DialFailedError* = object of CatchableError ConnEventKind* {.pure.} = enum Connected, # A connection was made and securely upgraded - there may be @@ -111,13 +112,13 @@ proc isConnected*(s: Switch, peerId: PeerID): bool = proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = if s.secureManagers.len <= 0: - raise newException(CatchableError, "No secure managers registered!") + raise newException(UpgradeFailedError, "No secure managers registered!") let manager = await s.ms.select(conn, s.secureManagers.mapIt(it.codec)) if manager.len == 0: - raise newException(CatchableError, "Unable to negotiate a secure channel!") + raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!") - trace "securing connection", codec = manager, conn + trace "Securing connection", codec = manager, conn let secureProtocol = s.secureManagers.filterIt(it.codec == manager) # ms.select should deal with the correctness of this @@ -133,7 +134,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = let info = await s.identity.identify(conn, conn.peerInfo) if info.pubKey.isNone and isNil(conn): - raise newException(CatchableError, + raise newException(UpgradeFailedError, "no public key provided and no existing peer identity found") if isNil(conn.peerInfo): @@ -151,7 +152,7 @@ proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = if info.protos.len > 0: conn.peerInfo.protocols = info.protos - trace "identify: identified remote peer", conn + trace "identified remote peer", conn, peerInfo = shortLog(conn.peerInfo) proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = # new stream for identify @@ -168,7 +169,7 @@ proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = ## mux incoming connection - trace "muxing connection", conn + trace "Muxing connection", conn if s.muxers.len == 0: warn "no muxers registered, skipping upgrade flow", conn return @@ -184,7 +185,8 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = # install stream handler muxer.streamHandler = s.streamHandler - s.connManager.storeOutgoing(muxer.connection) + s.connManager.storeOutgoing(conn) + trace "Storing muxer", conn s.connManager.storeMuxer(muxer) trace "found a muxer", name = muxerName, conn @@ -193,7 +195,7 @@ proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = let handlerFut = muxer.handle() # store it in muxed connections if we have a peer for it - trace "adding muxer for peer", conn + trace "Storing muxer with handler", conn s.connManager.storeMuxer(muxer, handlerFut) # update muxer with handler return muxer @@ -202,20 +204,21 @@ proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = s.connManager.dropPeer(peerId) proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = + trace "Upgrading outgoing connection", conn + let sconn = await s.secure(conn) # secure the connection if isNil(sconn): - raise newException(CatchableError, + raise newException(UpgradeFailedError, "unable to secure connection, stopping upgrade") if sconn.peerInfo.isNil: - raise newException(CatchableError, + raise newException(UpgradeFailedError, "current version of nim-libp2p requires that secure protocol negotiates peerid") - trace "upgrading connection", conn let muxer = await s.mux(sconn) # mux it if possible if muxer == nil: # TODO this might be relaxed in the future - raise newException(CatchableError, + raise newException(UpgradeFailedError, "a muxer is required for outgoing connections") try: @@ -228,28 +231,26 @@ proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, g if isNil(sconn.peerInfo): await sconn.close() - raise newException(CatchableError, - "unable to identify connection, stopping upgrade") + raise newException(UpgradeFailedError, + "No peerInfo for connection, stopping upgrade") - trace "successfully upgraded outgoing connection", conn, sconn + trace "Upgraded outgoing connection", conn, sconn return sconn proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = - trace "upgrading incoming connection", conn + trace "Upgrading incoming connection", conn let ms = newMultistream() # secure incoming connections proc securedHandler (conn: Connection, proto: string) {.async, gcsafe, closure.} = - - var sconn: Connection trace "Securing connection", conn let secure = s.secureManagers.filterIt(it.codec == proto)[0] try: - sconn = await secure.secure(conn, false) + var sconn = await secure.secure(conn, false) if isNil(sconn): return @@ -266,7 +267,9 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = except CancelledError as exc: raise exc except CatchableError as exc: - debug "ending secured handler", err = exc.msg, conn + debug "Exception in secure handler", err = exc.msg, conn + + trace "Ending secured handler", conn if (await ms.select(conn)): # just handshake # add the secure handlers @@ -296,11 +299,12 @@ proc internalConnect(s: Switch, # This connection should already have been removed from the connection # manager - it's essentially a bug that we end up here - we'll fail # for now, hoping that this will clean themselves up later... - warn "dead connection in connection manager", peerId + warn "dead connection in connection manager", conn await conn.close() - raise newException(CatchableError, "Zombie connection encountered") + raise newException(DialFailedError, "Zombie connection encountered") + + trace "Reusing existing connection", conn, direction = $conn.dir - trace "Reusing existing connection", conn return conn trace "Dialing peer", peerId @@ -311,10 +315,10 @@ proc internalConnect(s: Switch, let dialed = try: await t.dial(a) except CancelledError as exc: - trace "dialing canceled", exc = exc.msg, peerId + trace "Dialing canceled", exc = exc.msg, peerId raise exc except CatchableError as exc: - trace "dialing failed", exc = exc.msg, peerId + trace "Dialing failed", exc = exc.msg, peerId libp2p_failed_dials.inc() continue # Try the next address @@ -337,7 +341,7 @@ proc internalConnect(s: Switch, doAssert not isNil(upgraded), "connection died after upgradeOutgoing" conn = upgraded - trace "dial successful", conn, peerInfo = upgraded.peerInfo + trace "Dial successful", conn, peerInfo = conn.peerInfo break finally: if lock.locked(): @@ -371,27 +375,25 @@ proc internalConnect(s: Switch, # All the errors are handled inside `cleanup()` procedure. asyncSpawn peerCleanup() - return conn proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = discard await s.internalConnect(peerId, addrs) -proc negotiateStream(s: Switch, stream: Connection, proto: string): Future[Connection] {.async.} = - trace "Attempting to select remote", proto = proto, stream +proc negotiateStream(s: Switch, conn: Connection, proto: string): Future[Connection] {.async.} = + trace "Negotiating stream", proto = proto, conn + if not await s.ms.select(conn, proto): + await conn.close() + raise newException(DialFailedError, "Unable to select sub-protocol " & proto) - if not await s.ms.select(stream, proto): - await stream.close() - raise newException(CatchableError, "Unable to select sub-protocol" & proto) - - return stream + return conn proc dial*(s: Switch, peerId: PeerID, proto: string): Future[Connection] {.async.} = let stream = await s.connmanager.getMuxedStream(peerId) if stream.isNil: - raise newException(CatchableError, "Couldn't get muxed stream") + raise newException(DialFailedError, "Couldn't get muxed stream") return await s.negotiateStream(stream, proto) @@ -413,7 +415,7 @@ proc dial*(s: Switch, try: if isNil(stream): await conn.close() - raise newException(CatchableError, "Couldn't get muxed stream") + raise newException(DialFailedError, "Couldn't get muxed stream") return await s.negotiateStream(stream, proto) except CancelledError as exc: @@ -421,7 +423,7 @@ proc dial*(s: Switch, await cleanup() raise exc except CatchableError as exc: - trace "error dialing", exc = exc.msg, conn + trace "Error dialing", exc = exc.msg, conn await cleanup() raise exc @@ -440,14 +442,16 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = trace "starting switch for peer", peerInfo = s.peerInfo proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = + trace "Incoming connection", conn try: await s.upgradeIncoming(conn) # perform upgrade on incoming connection except CancelledError as exc: raise exc except CatchableError as exc: - trace "Error in connection handler", exc = exc.msg, conn + trace "Exception occurred in incoming handler", exc = exc.msg, conn finally: await conn.close() + trace "Connection handler done", conn var startFuts: seq[Future[void]] for t in s.transports: # for each transport @@ -457,7 +461,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = s.peerInfo.addrs[i] = t.ma # update peer's address startFuts.add(server) - debug "started libp2p node", peerInfo = s.peerInfo + debug "Started libp2p node", peer = s.peerInfo result = startFuts # listen for incoming connections proc stop*(s: Switch) {.async.} = @@ -477,13 +481,16 @@ proc stop*(s: Switch) {.async.} = trace "switch stopped" proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = - if muxer.connection.peerInfo.isNil: + let + conn = muxer.connection + + if conn.peerInfo.isNil: warn "This version of nim-libp2p requires secure protocol to negotiate peerid" await muxer.close() return # store incoming connection - s.connManager.storeIncoming(muxer.connection) + s.connManager.storeIncoming(conn) # store muxer and muxed connection s.connManager.storeMuxer(muxer) @@ -494,10 +501,10 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = # Identify is non-essential, though if it fails, it might indicate that # the connection was closed already - this will be picked up by the read # loop - debug "Could not identify connection", err = exc.msg, muxer + debug "Could not identify connection", err = exc.msg, conn try: - let peerId = muxer.connection.peerInfo.peerId + let peerId = conn.peerInfo.peerId proc peerCleanup() {.async.} = try: @@ -507,10 +514,10 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - debug "Unexpected cancellation in switch muxer cleanup" + debug "Unexpected cancellation in switch muxer cleanup", conn except CatchableError as exc: debug "Unexpected exception in switch muxer cleanup", - errMsg = exc.msg, muxer + err = exc.msg, conn proc peerStartup() {.async.} = try: @@ -520,10 +527,10 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CancelledError: # This is top-level procedure which will work as separate task, so it # do not need to propogate CancelledError. - debug "Unexpected cancellation in switch muxer startup", muxer + debug "Unexpected cancellation in switch muxer startup", conn except CatchableError as exc: debug "Unexpected exception in switch muxer startup", - errMsg = exc.msg, muxer + err = exc.msg, conn # All the errors are handled inside `peerStartup()` procedure. asyncSpawn peerStartup() @@ -537,7 +544,7 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = except CatchableError as exc: await muxer.close() libp2p_failed_upgrade.inc() - trace "exception in muxer handler", exc = exc.msg, muxer + trace "Exception in muxer handler", exc = exc.msg, conn proc newSwitch*(peerInfo: PeerInfo, transports: seq[Transport], @@ -558,17 +565,17 @@ proc newSwitch*(peerInfo: PeerInfo, ) let s = result # can't capture result - result.streamHandler = proc(stream: Connection) {.async, gcsafe.} = + result.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises + trace "Incoming muxed connection", conn try: - trace "handling connection for", stream - defer: - if not(isNil(stream)): - await stream.close() - await s.ms.handle(stream) # handle incoming connection + await s.ms.handle(conn) # handle incoming connection except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in stream handler", exc = exc.msg, stream + trace "exception in stream handler", exc = exc.msg, conn + finally: + await conn.close() + trace "Muxed connection done", conn result.mount(identity) for key, val in muxers: