From 96c01e5e6952d013cdff50821f5872b5c3019980 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 20 Jan 2021 11:28:32 -0600 Subject: [PATCH] Split upgrade flow (#507) * splitting upgrade flow * bring back master changes * re-export `Upgrade` * export public methods/procs in derived class * style fixes --- libp2p/switch.nim | 245 ++------------------------- libp2p/upgrademngrs/muxedupgrade.nim | 213 +++++++++++++++++++++++ libp2p/upgrademngrs/upgrade.nim | 80 +++++++++ 3 files changed, 305 insertions(+), 233 deletions(-) create mode 100644 libp2p/upgrademngrs/muxedupgrade.nim create mode 100644 libp2p/upgrademngrs/upgrade.nim diff --git a/libp2p/switch.nim b/libp2p/switch.nim index e513c864d..49559eba1 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -19,6 +19,7 @@ import chronos, import stream/connection, transports/transport, + upgrademngrs/[upgrade, muxedupgrade], multistream, multiaddress, protocols/protocol, @@ -31,7 +32,7 @@ import stream/connection, peerid, errors -export connmanager +export connmanager, upgrade logScope: topics = "libp2p switch" @@ -52,21 +53,16 @@ const ConcurrentUpgrades* = 4 type - UpgradeFailedError* = object of CatchableError DialFailedError* = object of CatchableError Switch* = ref object of RootObj peerInfo*: PeerInfo connManager*: ConnManager transports*: seq[Transport] - protocols*: seq[LPProtocol] - muxers*: Table[string, MuxerProvider] ms*: MultistreamSelect - identity*: Identify - streamHandler*: StreamHandler - secureManagers*: seq[Secure] dialLock: Table[PeerID, AsyncLock] acceptFuts: seq[Future[void]] + upgrade: Upgrade proc addConnEventHandler*(s: Switch, handler: ConnEventHandler, @@ -97,178 +93,9 @@ proc isConnected*(s: Switch, peerId: PeerID): bool = peerId in s.connManager -proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = - if s.secureManagers.len <= 0: - raise newException(UpgradeFailedError, "No secure managers registered!") - - let codec = await s.ms.select(conn, s.secureManagers.mapIt(it.codec)) - if codec.len == 0: - raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!") - - trace "Securing connection", conn, codec - let secureProtocol = s.secureManagers.filterIt(it.codec == codec) - - # ms.select should deal with the correctness of this - # let's avoid duplicating checks but detect if it fails to do it properly - doAssert(secureProtocol.len > 0) - - return await secureProtocol[0].secure(conn, true) - -proc identify(s: Switch, conn: Connection) {.async, gcsafe.} = - ## identify the connection - - if (await s.ms.select(conn, s.identity.codec)): - let info = await s.identity.identify(conn, conn.peerInfo) - - if info.pubKey.isNone and isNil(conn): - raise newException(UpgradeFailedError, - "no public key provided and no existing peer identity found") - - if isNil(conn.peerInfo): - conn.peerInfo = PeerInfo.init(info.pubKey.get()) - - if info.addrs.len > 0: - conn.peerInfo.addrs = info.addrs - - if info.agentVersion.isSome: - conn.peerInfo.agentVersion = info.agentVersion.get() - - if info.protoVersion.isSome: - conn.peerInfo.protoVersion = info.protoVersion.get() - - if info.protos.len > 0: - conn.peerInfo.protocols = info.protos - - trace "identified remote peer", conn, peerInfo = shortLog(conn.peerInfo) - -proc identify(s: Switch, muxer: Muxer) {.async, gcsafe.} = - # new stream for identify - var stream = await muxer.newStream() - if stream == nil: - return - - try: - await s.identify(stream) - finally: - await stream.closeWithEOF() - -proc mux(s: Switch, conn: Connection): Future[Muxer] {.async, gcsafe.} = - ## mux incoming connection - - trace "Muxing connection", conn - if s.muxers.len == 0: - warn "no muxers registered, skipping upgrade flow", conn - return - - let muxerName = await s.ms.select(conn, toSeq(s.muxers.keys())) - if muxerName.len == 0 or muxerName == "na": - debug "no muxer available, early exit", conn - return - - trace "Found a muxer", conn, muxerName - - # create new muxer for connection - let muxer = s.muxers[muxerName].newMuxer(conn) - - # install stream handler - muxer.streamHandler = s.streamHandler - - s.connManager.storeOutgoing(conn) - - # store it in muxed connections if we have a peer for it - s.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop - - try: - await s.identify(muxer) - except CatchableError as exc: - # Identify is non-essential, though if it fails, it might indicate that - # the connection was closed already - this will be picked up by the read - # loop - debug "Could not identify connection", conn, msg = exc.msg - - return muxer - proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = s.connManager.dropPeer(peerId) -proc upgradeOutgoing(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = - trace "Upgrading outgoing connection", conn - - let sconn = await s.secure(conn) # secure the connection - if isNil(sconn): - raise newException(UpgradeFailedError, - "unable to secure connection, stopping upgrade") - - if sconn.peerInfo.isNil: - raise newException(UpgradeFailedError, - "current version of nim-libp2p requires that secure protocol negotiates peerid") - - let muxer = await s.mux(sconn) # mux it if possible - if muxer == nil: - # TODO this might be relaxed in the future - raise newException(UpgradeFailedError, - "a muxer is required for outgoing connections") - - if sconn.closed() or isNil(sconn.peerInfo): - await sconn.close() - raise newException(UpgradeFailedError, - "Connection closed or missing peer info, stopping upgrade") - - trace "Upgraded outgoing connection", conn, sconn - - return sconn - -proc upgradeIncoming(s: Switch, incomingConn: Connection) {.async, gcsafe.} = # noraises - trace "Upgrading incoming connection", incomingConn - let ms = newMultistream() - - # secure incoming connections - proc securedHandler(conn: Connection, - proto: string) - {.async, gcsafe, closure.} = - trace "Starting secure handler", conn - let secure = s.secureManagers.filterIt(it.codec == proto)[0] - - var cconn = conn - try: - var sconn = await secure.secure(cconn, false) - if isNil(sconn): - return - - cconn = sconn - # add the muxer - for muxer in s.muxers.values: - ms.addHandler(muxer.codecs, muxer) - - # handle subsequent secure requests - await ms.handle(cconn) - except CatchableError as exc: - debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn - if not cconn.isUpgraded: - cconn.upgrade(exc) - finally: - if not isNil(cconn): - await cconn.close() - - trace "Stopped secure handler", conn - - try: - if (await ms.select(incomingConn)): # just handshake - # add the secure handlers - for k in s.secureManagers: - ms.addHandler(k.codec, securedHandler) - - # handle un-secured connections - # we handshaked above, set this ms handler as active - await ms.handle(incomingConn, active = true) - except CatchableError as exc: - debug "Exception upgrading incoming", exc = exc.msg - if not incomingConn.isUpgraded: - incomingConn.upgrade(exc) - finally: - if not isNil(incomingConn): - await incomingConn.close() - proc dialAndUpgrade(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]): @@ -295,7 +122,7 @@ proc dialAndUpgrade(s: Switch, libp2p_successful_dials.inc() let conn = try: - await s.upgradeOutgoing(dialed) + await s.upgrade.upgradeOutgoing(dialed) except CatchableError as exc: # If we failed to establish the connection through one transport, # we won't succeeded through another - no use in trying again @@ -476,7 +303,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises debug "Accepted an incoming connection", conn asyncSpawn upgradeMonitor(conn, upgrades) - asyncSpawn s.upgradeIncoming(conn) + asyncSpawn s.upgrade.upgradeIncoming(conn) except CancelledError as exc: trace "releasing semaphore on cancellation" upgrades.release() # always release the slot @@ -529,39 +356,6 @@ proc stop*(s: Switch) {.async.} = trace "Switch stopped" -proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = - let - conn = muxer.connection - - if conn.peerInfo.isNil: - warn "This version of nim-libp2p requires secure protocol to negotiate peerid" - await muxer.close() - return - - # store incoming connection - s.connManager.storeIncoming(conn) - - # store muxer and muxed connection - s.connManager.storeMuxer(muxer) - - try: - await s.identify(muxer) - except IdentifyError as exc: - # Identify is non-essential, though if it fails, it might indicate that - # the connection was closed already - this will be picked up by the read - # loop - debug "Could not identify connection", conn, msg = exc.msg - except LPStreamClosedError as exc: - debug "Identify stream closed", conn, msg = exc.msg - except LPStreamEOFError as exc: - debug "Identify stream EOF", conn, msg = exc.msg - except CancelledError as exc: - await muxer.close() - raise exc - except CatchableError as exc: - await muxer.close() - trace "Exception in muxer handler", conn, msg = exc.msg - proc newSwitch*(peerInfo: PeerInfo, transports: seq[Transport], identity: Identify, @@ -570,34 +364,19 @@ proc newSwitch*(peerInfo: PeerInfo, if secureManagers.len == 0: raise (ref CatchableError)(msg: "Provide at least one secure manager") + let ms = newMultistream() + let connManager = ConnManager.init() + let upgrade = MuxedUpgrade.init(identity, muxers, secureManagers, connManager, ms) + let switch = Switch( peerInfo: peerInfo, - ms: newMultistream(), + ms: ms, transports: transports, - connManager: ConnManager.init(), - identity: identity, - muxers: muxers, - secureManagers: @secureManagers, + connManager: connManager, + upgrade: upgrade, ) - switch.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises - trace "Starting stream handler", conn - try: - await switch.ms.handle(conn) # handle incoming connection - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception in stream handler", conn, msg = exc.msg - finally: - await conn.closeWithEOF() - trace "Stream handler done", conn - switch.mount(identity) - for key, val in muxers: - val.streamHandler = switch.streamHandler - val.muxerHandler = proc(muxer: Muxer): Future[void] = - switch.muxerHandler(muxer) - return switch proc isConnected*(s: Switch, peerInfo: PeerInfo): bool diff --git a/libp2p/upgrademngrs/muxedupgrade.nim b/libp2p/upgrademngrs/muxedupgrade.nim new file mode 100644 index 000000000..feddc95e0 --- /dev/null +++ b/libp2p/upgrademngrs/muxedupgrade.nim @@ -0,0 +1,213 @@ +## Nim-LibP2P +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/[tables, sequtils] +import pkg/[chronos, chronicles, metrics] + +import ../upgrademngrs/upgrade, + ../muxers/muxer + +export Upgrade + +type + MuxedUpgrade* = ref object of Upgrade + muxers*: Table[string, MuxerProvider] + streamHandler*: StreamHandler + +proc identify*(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} = + # new stream for identify + var stream = await muxer.newStream() + if stream == nil: + return + + try: + await u.identify(stream) + finally: + await stream.closeWithEOF() + +proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} = + ## mux incoming connection + + trace "Muxing connection", conn + if u.muxers.len == 0: + warn "no muxers registered, skipping upgrade flow", conn + return + + let muxerName = await u.ms.select(conn, toSeq(u.muxers.keys())) + if muxerName.len == 0 or muxerName == "na": + debug "no muxer available, early exit", conn + return + + trace "Found a muxer", conn, muxerName + + # create new muxer for connection + let muxer = u.muxers[muxerName].newMuxer(conn) + + # install stream handler + muxer.streamHandler = u.streamHandler + + u.connManager.storeOutgoing(conn) + + # store it in muxed connections if we have a peer for it + u.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop + + try: + await u.identify(muxer) + except CatchableError as exc: + # Identify is non-essential, though if it fails, it might indicate that + # the connection was closed already - this will be picked up by the read + # loop + debug "Could not identify connection", conn, msg = exc.msg + + return muxer + +method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] {.async, gcsafe.} = + trace "Upgrading outgoing connection", conn + + let sconn = await u.secure(conn) # secure the connection + if isNil(sconn): + raise newException(UpgradeFailedError, + "unable to secure connection, stopping upgrade") + + if sconn.peerInfo.isNil: + raise newException(UpgradeFailedError, + "current version of nim-libp2p requires that secure protocol negotiates peerid") + + let muxer = await u.mux(sconn) # mux it if possible + if muxer == nil: + # TODO this might be relaxed in the future + raise newException(UpgradeFailedError, + "a muxer is required for outgoing connections") + + if sconn.closed() or isNil(sconn.peerInfo): + await sconn.close() + raise newException(UpgradeFailedError, + "Connection closed or missing peer info, stopping upgrade") + + trace "Upgraded outgoing connection", conn, sconn + + return sconn + +method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsafe.} = # noraises + trace "Upgrading incoming connection", incomingConn + let ms = newMultistream() + + # secure incoming connections + proc securedHandler(conn: Connection, + proto: string) + {.async, gcsafe, closure.} = + trace "Starting secure handler", conn + let secure = u.secureManagers.filterIt(it.codec == proto)[0] + + var cconn = conn + try: + var sconn = await secure.secure(cconn, false) + if isNil(sconn): + return + + cconn = sconn + # add the muxer + for muxer in u.muxers.values: + ms.addHandler(muxer.codecs, muxer) + + # handle subsequent secure requests + await ms.handle(cconn) + except CatchableError as exc: + debug "Exception in secure handler during incoming upgrade", msg = exc.msg, conn + if not cconn.isUpgraded: + cconn.upgrade(exc) + finally: + if not isNil(cconn): + await cconn.close() + + trace "Stopped secure handler", conn + + try: + if (await ms.select(incomingConn)): # just handshake + # add the secure handlers + for k in u.secureManagers: + ms.addHandler(k.codec, securedHandler) + + # handle un-secured connections + # we handshaked above, set this ms handler as active + await ms.handle(incomingConn, active = true) + except CatchableError as exc: + debug "Exception upgrading incoming", exc = exc.msg + if not incomingConn.isUpgraded: + incomingConn.upgrade(exc) + finally: + if not isNil(incomingConn): + await incomingConn.close() + +proc muxerHandler(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} = + let + conn = muxer.connection + + if conn.peerInfo.isNil: + warn "This version of nim-libp2p requires secure protocol to negotiate peerid" + await muxer.close() + return + + # store incoming connection + u.connManager.storeIncoming(conn) + + # store muxer and muxed connection + u.connManager.storeMuxer(muxer) + + try: + await u.identify(muxer) + except IdentifyError as exc: + # Identify is non-essential, though if it fails, it might indicate that + # the connection was closed already - this will be picked up by the read + # loop + debug "Could not identify connection", conn, msg = exc.msg + except LPStreamClosedError as exc: + debug "Identify stream closed", conn, msg = exc.msg + except LPStreamEOFError as exc: + debug "Identify stream EOF", conn, msg = exc.msg + except CancelledError as exc: + await muxer.close() + raise exc + except CatchableError as exc: + await muxer.close() + trace "Exception in muxer handler", conn, msg = exc.msg + +proc init*( + T: type MuxedUpgrade, + identity: Identify, + muxers: Table[string, MuxerProvider], + secureManagers: openarray[Secure] = [], + connManager: ConnManager, + ms: MultistreamSelect): T = + + let upgrader = T( + identity: identity, + muxers: muxers, + secureManagers: @secureManagers, + connManager: connManager, + ms: ms) + + upgrader.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises + trace "Starting stream handler", conn + try: + await upgrader.ms.handle(conn) # handle incoming connection + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception in stream handler", conn, msg = exc.msg + finally: + await conn.closeWithEOF() + trace "Stream handler done", conn + + for _, val in muxers: + val.streamHandler = upgrader.streamHandler + val.muxerHandler = proc(muxer: Muxer): Future[void] = + upgrader.muxerHandler(muxer) + + return upgrader diff --git a/libp2p/upgrademngrs/upgrade.nim b/libp2p/upgrademngrs/upgrade.nim new file mode 100644 index 000000000..2af6d1b5b --- /dev/null +++ b/libp2p/upgrademngrs/upgrade.nim @@ -0,0 +1,80 @@ +## Nim-LibP2P +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/[options, sequtils] +import pkg/[chronos, chronicles, metrics] + +import ../stream/connection, + ../protocols/secure/secure, + ../protocols/identify, + ../multistream, + ../connmanager + +export connmanager, connection, identify, secure, multistream + +declarePublicCounter(libp2p_failed_upgrade, "peers failed upgrade") + +type + UpgradeFailedError* = object of CatchableError + + Upgrade* = ref object of RootObj + ms*: MultistreamSelect + identity*: Identify + connManager*: ConnManager + secureManagers*: seq[Secure] + +method upgradeIncoming*(u: Upgrade, conn: Connection): Future[void] {.base.} = + doAssert(false, "Not implemented!") + +method upgradeOutgoing*(u: Upgrade, conn: Connection): Future[Connection] {.base.} = + doAssert(false, "Not implemented!") + +proc secure*(u: Upgrade, conn: Connection): Future[Connection] {.async, gcsafe.} = + if u.secureManagers.len <= 0: + raise newException(UpgradeFailedError, "No secure managers registered!") + + let codec = await u.ms.select(conn, u.secureManagers.mapIt(it.codec)) + if codec.len == 0: + raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!") + + trace "Securing connection", conn, codec + let secureProtocol = u.secureManagers.filterIt(it.codec == codec) + + # ms.select should deal with the correctness of this + # let's avoid duplicating checks but detect if it fails to do it properly + doAssert(secureProtocol.len > 0) + + return await secureProtocol[0].secure(conn, true) + +proc identify*(u: Upgrade, conn: Connection) {.async, gcsafe.} = + ## identify the connection + + if (await u.ms.select(conn, u.identity.codec)): + let info = await u.identity.identify(conn, conn.peerInfo) + + if info.pubKey.isNone and isNil(conn): + raise newException(UpgradeFailedError, + "no public key provided and no existing peer identity found") + + if isNil(conn.peerInfo): + conn.peerInfo = PeerInfo.init(info.pubKey.get()) + + if info.addrs.len > 0: + conn.peerInfo.addrs = info.addrs + + if info.agentVersion.isSome: + conn.peerInfo.agentVersion = info.agentVersion.get() + + if info.protoVersion.isSome: + conn.peerInfo.protoVersion = info.protoVersion.get() + + if info.protos.len > 0: + conn.peerInfo.protocols = info.protos + + trace "identified remote peer", conn, peerInfo = shortLog(conn.peerInfo)