diff --git a/libp2p/dial.nim b/libp2p/dial.nim new file mode 100644 index 000000000..b4625400b --- /dev/null +++ b/libp2p/dial.nim @@ -0,0 +1,46 @@ +## Nim-LibP2P +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import chronos +import peerid, + stream/connection + +type + Dial* = ref object of RootObj + +method connect*( + self: Dial, + peerId: PeerID, + addrs: seq[MultiAddress]) {.async, base.} = + ## connect remote peer without negotiating + ## a protocol + ## + + doAssert(false, "Not implemented!") + +method dial*( + self: Dial, + peerId: PeerID, + protos: seq[string]): Future[Connection] {.async, base.} = + ## create a protocol stream over an + ## existing connection + ## + + doAssert(false, "Not implemented!") + +method dial*( + self: Dial, + peerId: PeerID, + addrs: seq[MultiAddress], + protos: seq[string]): Future[Connection] {.async, base.} = + ## create a protocol stream and establish + ## a connection if one doesn't exist already + ## + + doAssert(false, "Not implemented!") diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim new file mode 100644 index 000000000..e871aea38 --- /dev/null +++ b/libp2p/dialer.nim @@ -0,0 +1,241 @@ +## Nim-LibP2P +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/[sugar, tables] + +import pkg/[chronos, + chronicles, + metrics] + +import dial, + peerid, + peerinfo, + multistream, + connmanager, + stream/connection, + transports/transport + +export dial + +logScope: + topics = "libp2p dialer" + +declareCounter(libp2p_total_dial_attempts, "total attempted dials") +declareCounter(libp2p_successful_dials, "dialed successful peers") +declareCounter(libp2p_failed_dials, "failed dials") +declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades") + +type + DialFailedError* = object of CatchableError + + Dialer* = ref object of Dial + peerInfo*: PeerInfo + ms: MultistreamSelect + connManager: ConnManager + dialLock: Table[PeerID, AsyncLock] + transports: seq[Transport] + +proc dialAndUpgrade( + self: Dialer, + peerId: PeerID, + addrs: seq[MultiAddress]): + Future[Connection] {.async.} = + debug "Dialing peer", peerId + + # Avoid "cannot be captured as it would violate memory safety" errors in Nim-1.4.x. + var + transport: Transport + address: MultiAddress + + for t in self.transports: # for each transport + transport = t + for a in addrs: # for each address + address = a + if t.handles(a): # check if it can dial it + trace "Dialing address", address = $a, peerId + let dialed = try: + libp2p_total_dial_attempts.inc() + # await a connection slot when the total + # connection count is equal to `maxConns` + await self.connManager.trackOutgoingConn( + () => transport.dial(address) + ) + except TooManyConnectionsError as exc: + trace "Connection limit reached!" 
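+          # out of connection slots: re-raising makes this dial fail fast instead of trying the remaining addresses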
+          raise exc + except CancelledError as exc: + debug "Dialing canceled", msg = exc.msg, peerId + raise exc + except CatchableError as exc: + debug "Dialing failed", msg = exc.msg, peerId + libp2p_failed_dials.inc() + continue # Try the next address + + # make sure to assign the peer to the connection + dialed.peerInfo = PeerInfo.init(peerId, addrs) + + # also keep track of the connection's bottom unsafe transport direction + # required by gossipsub scoring + dialed.transportDir = Direction.Out + + libp2p_successful_dials.inc() + + let conn = try: + await transport.upgradeOutgoing(dialed) + except CatchableError as exc: + # If we failed to establish the connection through one transport, + # we won't succeed through another - no use in trying again + await dialed.close() + debug "Upgrade failed", msg = exc.msg, peerId + if not(exc of CancelledError): # `of` is the runtime check; `isnot` only compares static types + libp2p_failed_upgrades_outgoing.inc() + raise exc + + doAssert not isNil(conn), "connection died after upgradeOutgoing" + debug "Dial successful", conn, peerInfo = conn.peerInfo + return conn + +proc internalConnect( + self: Dialer, + peerId: PeerID, + addrs: seq[MultiAddress]): + Future[Connection] {.async.} = + if self.peerInfo.peerId == peerId: + raise newException(CatchableError, "can't dial self!") + + # Ensure there's only one in-flight attempt per peer + let lock = self.dialLock.mgetOrPut(peerId, newAsyncLock()) + try: + await lock.acquire() + + # Check if we have a connection already and try to reuse it + var conn = self.connManager.selectConn(peerId) + if conn != nil: + if conn.atEof or conn.closed: + # This connection should already have been removed from the connection + # manager - it's essentially a bug that we end up here - we'll fail + # for now, hoping that it will clean itself up later... + warn "dead connection in connection manager", conn + await conn.close() + raise newException(DialFailedError, "Zombie connection encountered") + + trace "Reusing existing connection", conn, direction = $conn.dir + return conn + + conn = await self.dialAndUpgrade(peerId, addrs) + if isNil(conn): # None of the addresses connected + raise newException(DialFailedError, "Unable to establish outgoing link") + + # We already check for this in the connection manager + # but a disconnect could have happened right after + # we've added the connection, so we check again + # to prevent races due to that. + if conn.closed() or conn.atEof(): + # This can happen when the other end drops us + # before we get a chance to return the connection + # back to the dialer.
+ trace "Connection dead on arrival", conn + raise newLPStreamClosedError() + + return conn + finally: + if lock.locked(): + lock.release() + +method connect*( + self: Dialer, + peerId: PeerID, + addrs: seq[MultiAddress]) {.async.} = + ## connect remote peer without negotiating + ## a protocol + ## + + if self.connManager.connCount(peerId) > 0: + return + + discard await self.internalConnect(peerId, addrs) + +proc negotiateStream( + self: Dialer, + conn: Connection, + protos: seq[string]): Future[Connection] {.async.} = + trace "Negotiating stream", conn, protos + let selected = await self.ms.select(conn, protos) + if not protos.contains(selected): + await conn.closeWithEOF() + raise newException(DialFailedError, "Unable to select sub-protocol " & $protos) + + return conn + +method dial*( + self: Dialer, + peerId: PeerID, + protos: seq[string]): Future[Connection] {.async.} = + ## create a protocol stream over an + ## existing connection + ## + + trace "Dialing (existing)", peerId, protos + let stream = await self.connManager.getStream(peerId) + if stream.isNil: + raise newException(DialFailedError, "Couldn't get muxed stream") + + return await self.negotiateStream(stream, protos) + +method dial*( + self: Dialer, + peerId: PeerID, + addrs: seq[MultiAddress], + protos: seq[string]): Future[Connection] {.async.} = + ## create a protocol stream and establish + ## a connection if one doesn't exist already + ## + + var + conn: Connection + stream: Connection + + proc cleanup() {.async.} = + if not(isNil(stream)): + await stream.closeWithEOF() + + if not(isNil(conn)): + await conn.close() + + try: + trace "Dialing (new)", peerId, protos + conn = await self.internalConnect(peerId, addrs) + trace "Opening stream", conn + stream = await self.connManager.getStream(conn) + + if isNil(stream): + raise newException(DialFailedError, + "Couldn't get muxed stream") + + return await self.negotiateStream(stream, protos) + except CancelledError as exc: + trace "Dial canceled", conn + await cleanup() + raise exc + except CatchableError as exc: + debug "Error dialing", conn, msg = exc.msg + await cleanup() + raise exc + +proc new*( + T: type Dialer, + peerInfo: PeerInfo, + connManager: ConnManager, + transports: seq[Transport], + ms: MultistreamSelect): Dialer = + + T(peerInfo: peerInfo, + connManager: connManager, + transports: transports, + ms: ms) diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim new file mode 100644 index 000000000..b3a2d3c53 --- /dev/null +++ b/libp2p/standard_setup.nim @@ -0,0 +1,71 @@ +import + options, tables, chronos, bearssl, + switch, peerid, peerinfo, stream/connection, multiaddress, + crypto/crypto, transports/[transport, tcptransport], + muxers/[muxer, mplex/mplex], + protocols/[identify, secure/secure, secure/noise], + upgrademngrs/[upgrade, muxedupgrade], connmanager + +export + switch, peerid, peerinfo, connection, multiaddress, crypto + +type + SecureProtocol* {.pure.} = enum + Noise, + Secio {.deprecated.} + +proc newStandardSwitch*(privKey = none(PrivateKey), + address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), + secureManagers: openarray[SecureProtocol] = [ + SecureProtocol.Noise, + ], + transportFlags: set[ServerFlags] = {}, + rng = newRng(), + inTimeout: Duration = 5.minutes, + outTimeout: Duration = 5.minutes, + maxConnections = MaxConnections, + maxIn = -1, + maxOut = -1, + maxConnsPerPeer = MaxConnectionsPerPeer): Switch = + proc createMplex(conn: Connection): Muxer = + Mplex.init( + conn, + inTimeout = inTimeout, + outTimeout = outTimeout) 
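+
+  # createMplex is registered with newMuxerProvider below; every upgraded
+  # connection gets a fresh Mplex instance with these timeouts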
+ + if rng == nil: # newRng could fail + raise (ref CatchableError)(msg: "Cannot initialize RNG") + + let + seckey = privKey.get(otherwise = PrivateKey.random(rng[]).tryGet()) + peerInfo = PeerInfo.init(seckey, [address]) + + var + secureManagerInstances: seq[Secure] + + for sec in secureManagers: + case sec + of SecureProtocol.Noise: + secureManagerInstances &= newNoise(rng, seckey).Secure + of SecureProtocol.Secio: + quit("Secio is deprecated!") # use of secio is unsafe + + let + mplexProvider = newMuxerProvider(createMplex, MplexCodec) + ms = newMultistream() + identify = newIdentify(peerInfo) + muxers = {MplexCodec: mplexProvider}.toTable + connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut) + muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagerInstances, connManager, ms) + transports = @[Transport(TcpTransport.init(transportFlags, muxedUpgrade))] + + let switch = newSwitch( + peerInfo, + transports, + identify, + muxers, + secureManagers = secureManagerInstances, + connManager = connManager, + ms = ms) + + return switch diff --git a/libp2p/switch.nim b/libp2p/switch.nim index d8cd7f99e..22443959b 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -33,9 +33,10 @@ import stream/connection, utils/semaphore, connmanager, peerid, - errors + errors, + dialer -export connmanager, upgrade +export connmanager, upgrade, dialer logScope: topics = "libp2p switch" @@ -46,26 +47,19 @@ logScope: # and only if the channel has been secured (i.e. if a secure manager has been # previously provided) -declareCounter(libp2p_total_dial_attempts, "total attempted dials") -declareCounter(libp2p_successful_dials, "dialed successful peers") -declareCounter(libp2p_failed_dials, "failed dials") declareCounter(libp2p_failed_upgrades_incoming, "incoming connections failed upgrades") -declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades") const ConcurrentUpgrades* = 4 type - DialFailedError* = object of LPError - - Switch* = ref object of RootObj + Switch* = ref object of Dial peerInfo*: PeerInfo connManager*: ConnManager transports*: seq[Transport] ms*: MultistreamSelect - dialLock: Table[PeerID, AsyncLock] acceptFuts: seq[Future[void]] - upgrade: Upgrade + dialer*: Dial proc addConnEventHandler*(s: Switch, handler: ConnEventHandler, @@ -97,186 +91,36 @@ proc isConnected*(s: Switch, peerId: PeerID): bool = proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} = s.connManager.dropPeer(peerId) -proc dialAndUpgrade(s: Switch, - peerId: PeerID, - addrs: seq[MultiAddress]): - Future[Connection] {.async.} = - debug "Dialing peer", peerId +method connect*( + s: Switch, + peerId: PeerID, + addrs: seq[MultiAddress]): Future[void] = + s.dialer.connect(peerId, addrs) - # Avoid "cannot be captured as it would violate memory safety" errors in Nim-1.4.x. - var - transport: Transport - address: MultiAddress - - for t in s.transports: # for each transport - transport = t - for a in addrs: # for each address - address = a - if t.handles(a): # check if it can dial it - trace "Dialing address", address = $a, peerId - let dialed = try: - libp2p_total_dial_attempts.inc() - # await a connection slot when the total - # connection count is equal to `maxConns` - await s.connManager.trackOutgoingConn( - () => transport.dial(address) - ) - except TooManyConnectionsError as exc: - trace "Connection limit reached!" 
- raise exc - except CancelledError as exc: - debug "Dialing canceled", msg = exc.msg, peerId - raise exc - except CatchableError as exc: - debug "Dialing failed", msg = exc.msg, peerId - libp2p_failed_dials.inc() - continue # Try the next address - - # make sure to assign the peer to the connection - dialed.peerInfo = PeerInfo.init(peerId, addrs) - - # also keep track of the connection's bottom unsafe transport direction - # required by gossipsub scoring - dialed.transportDir = Direction.Out - - libp2p_successful_dials.inc() - - let conn = try: - await s.upgrade.upgradeOutgoing(dialed) - except CatchableError as exc: - # If we failed to establish the connection through one transport, - # we won't succeeded through another - no use in trying again - await dialed.close() - debug "Upgrade failed", msg = exc.msg, peerId - if exc isnot CancelledError: - libp2p_failed_upgrades_outgoing.inc() - raise exc - - doAssert not isNil(conn), "connection died after upgradeOutgoing" - debug "Dial successful", conn, peerInfo = conn.peerInfo - return conn - -proc internalConnect(s: Switch, - peerId: PeerID, - addrs: seq[MultiAddress]): - Future[Connection] {.async.} = - if s.peerInfo.peerId == peerId: - raise newException(CatchableError, "can't dial self!") - - # Ensure there's only one in-flight attempt per peer - let lock = s.dialLock.mgetOrPut(peerId, newAsyncLock()) - try: - await lock.acquire() - - # Check if we have a connection already and try to reuse it - var conn = s.connManager.selectConn(peerId) - if conn != nil: - if conn.atEof or conn.closed: - # This connection should already have been removed from the connection - # manager - it's essentially a bug that we end up here - we'll fail - # for now, hoping that this will clean themselves up later... - warn "dead connection in connection manager", conn - await conn.close() - raise newException(DialFailedError, "Zombie connection encountered") - - trace "Reusing existing connection", conn, direction = $conn.dir - return conn - - conn = await s.dialAndUpgrade(peerId, addrs) - if isNil(conn): # None of the addresses connected - raise newException(DialFailedError, "Unable to establish outgoing link") - - # We already check for this in Connection manager - # but a disconnect could have happened right after - # we've added the connection so we check again - # to prevent races due to that. - if conn.closed() or conn.atEof(): - # This can happen when the other ends drops us - # before we get a chance to return the connection - # back to the dialer. 
- trace "Connection dead on arrival", conn - raise newLPStreamClosedError() - - return conn - finally: - if lock.locked(): - lock.release() - -proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} = - ## attempt to create establish a connection - ## with a remote peer - ## - - if s.connManager.connCount(peerId) > 0: - return - - discard await s.internalConnect(peerId, addrs) - -proc negotiateStream(s: Switch, conn: Connection, protos: seq[string]): Future[Connection] {.async.} = - trace "Negotiating stream", conn, protos - let selected = await s.ms.select(conn, protos) - if not protos.contains(selected): - await conn.closeWithEOF() - raise newException(DialFailedError, "Unable to select sub-protocol " & $protos) - - return conn - -proc dial*(s: Switch, - peerId: PeerID, - protos: seq[string]): Future[Connection] {.async.} = - trace "Dialing (existing)", peerId, protos - let stream = await s.connManager.getStream(peerId) - if stream.isNil: - raise newException(DialFailedError, "Couldn't get muxed stream") - - return await s.negotiateStream(stream, protos) +method dial*( + s: Switch, + peerId: PeerID, + protos: seq[string]): Future[Connection] = + s.dialer.dial(peerId, protos) proc dial*(s: Switch, peerId: PeerID, proto: string): Future[Connection] = dial(s, peerId, @[proto]) -proc dial*(s: Switch, - peerId: PeerID, - addrs: seq[MultiAddress], - protos: seq[string]): - Future[Connection] {.async.} = - var - conn: Connection - stream: Connection +method dial*( + s: Switch, + peerId: PeerID, + addrs: seq[MultiAddress], + protos: seq[string]): Future[Connection] = + s.dialer.dial(peerId, addrs, protos) - proc cleanup() {.async.} = - if not(isNil(stream)): - await stream.closeWithEOF() - - if not(isNil(conn)): - await conn.close() - - try: - trace "Dialing (new)", peerId, protos - conn = await s.internalConnect(peerId, addrs) - trace "Opening stream", conn - stream = await s.connManager.getStream(conn) - - if isNil(stream): - raise newException(DialFailedError, - "Couldn't get muxed stream") - - return await s.negotiateStream(stream, protos) - except CancelledError as exc: - trace "Dial canceled", conn - await cleanup() - raise exc - except CatchableError as exc: - debug "Error dialing", conn, msg = exc.msg - await cleanup() - raise exc - -proc dial*(s: Switch, - peerId: PeerID, - addrs: seq[MultiAddress], - proto: string): - Future[Connection] = dial(s, peerId, addrs, @[proto]) +proc dial*( + s: Switch, + peerId: PeerID, + addrs: seq[MultiAddress], + proto: string): Future[Connection] = + dial(s, peerId, addrs, @[proto]) proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil) {.gcsafe, raises: [Defect, LPError].} = @@ -346,7 +190,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises debug "Accepted an incoming connection", conn asyncSpawn upgradeMonitor(conn, upgrades) - asyncSpawn s.upgrade.upgradeIncoming(conn) + asyncSpawn transport.upgradeIncoming(conn) except CancelledError as exc: trace "releasing semaphore on cancellation" upgrades.release() # always release the slot @@ -404,26 +248,17 @@ proc newSwitch*(peerInfo: PeerInfo, identity: Identify, muxers: Table[string, MuxerProvider], secureManagers: openarray[Secure] = [], - maxConnections = MaxConnections, - maxIn = -1, - maxOut = -1, - maxConnsPerPeer = MaxConnectionsPerPeer): Switch - {.raises: [Defect, LPError].} = - + connManager: ConnManager, + ms: MultistreamSelect): Switch = if secureManagers.len == 0: raise (ref LPError)(msg: "Provide at least one secure manager") - let ms = 
newMultistream() - let connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut) - let upgrade = MuxedUpgrade.init(identity, muxers, secureManagers, connManager, ms) - let switch = Switch( peerInfo: peerInfo, ms: ms, transports: transports, connManager: connManager, - upgrade: upgrade, - ) + dialer: Dialer.new(peerInfo, connManager, transports, ms)) switch.mount(identity) return switch diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index a3b2648ad..ebb55c636 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -14,10 +14,13 @@ import chronos, chronicles import transport, ../errors, ../wire, - ../multiaddress, ../multicodec, + ../multistream, + ../connmanager, + ../multiaddress, ../stream/connection, - ../stream/chronosstream + ../stream/chronosstream, + ../upgrademngrs/upgrade logScope: topics = "libp2p tcptransport" @@ -61,7 +64,7 @@ proc setupTcpTransportTracker(): TcpTransportTracker = result.isLeaked = leakTransport addTracker(TcpTransportTrackerName, result) -proc connHandler*(t: TcpTransport, +proc connHandler*(self: TcpTransport, client: StreamTransport, dir: Direction): Future[Connection] {.async.} = var observedAddr: MultiAddress = MultiAddress() @@ -75,8 +78,8 @@ proc connHandler*(t: TcpTransport, trace "Handling tcp connection", address = $observedAddr, dir = $dir, - clients = t.clients[Direction.In].len + - t.clients[Direction.Out].len + clients = self.clients[Direction.In].len + + self.clients[Direction.Out].len let conn = Connection( ChronosStream.init( @@ -95,7 +98,7 @@ proc connHandler*(t: TcpTransport, trace "Cleaning up client", addrs = $client.remoteAddress, conn - t.clients[dir].keepItIf( it != client ) + self.clients[dir].keepItIf( it != client ) await allFuturesThrowing( conn.close(), client.closeWait()) @@ -106,82 +109,108 @@ proc connHandler*(t: TcpTransport, let useExc {.used.} = exc debug "Error cleaning up client", errMsg = exc.msg, conn - t.clients[dir].add(client) + self.clients[dir].add(client) asyncSpawn onClose() return conn -proc init*(T: type TcpTransport, - flags: set[ServerFlags] = {}): T = - result = T(flags: flags) +func init*( + T: type TcpTransport, + flags: set[ServerFlags] = {}, + upgrade: Upgrade): T = + + result = T( + flags: flags, + upgrader: upgrade + ) result.initTransport() -method initTransport*(t: TcpTransport) = - t.multicodec = multiCodec("tcp") +method initTransport*(self: TcpTransport) = + self.multicodec = multiCodec("tcp") inc getTcpTransportTracker().opened -method start*(t: TcpTransport, ma: MultiAddress) {.async.} = +method start*( + self: TcpTransport, + ma: MultiAddress) {.async.} = ## listen on the transport ## - if t.running: + if self.running: trace "TCP transport already running" return - await procCall Transport(t).start(ma) + await procCall Transport(self).start(ma) trace "Starting TCP transport" - t.server = createStreamServer( - ma = t.ma, - flags = t.flags, - udata = t) + self.server = createStreamServer( + ma = self.ma, + flags = self.flags, + udata = self) # always get the resolved address in case we're bound to 0.0.0.0:0 - t.ma = MultiAddress.init(t.server.sock.getLocalAddress()).tryGet() - t.running = true + self.ma = MultiAddress.init(self.server.sock.getLocalAddress()).tryGet() + self.running = true - trace "Listening on", address = t.ma + trace "Listening on", address = self.ma -method stop*(t: TcpTransport) {.async, gcsafe.} = +method stop*(self: TcpTransport) {.async, gcsafe.} = ## stop the transport ## - t.running 
= false # mark stopped as soon as possible + self.running = false # mark stopped as soon as possible try: trace "Stopping TCP transport" - await procCall Transport(t).stop() # call base + await procCall Transport(self).stop() # call base checkFutures( await allFinished( - t.clients[Direction.In].mapIt(it.closeWait()) & - t.clients[Direction.Out].mapIt(it.closeWait()))) + self.clients[Direction.In].mapIt(it.closeWait()) & + self.clients[Direction.Out].mapIt(it.closeWait()))) # server can be nil - if not isNil(t.server): - await t.server.closeWait() + if not isNil(self.server): + await self.server.closeWait() - t.server = nil + self.server = nil trace "Transport stopped" inc getTcpTransportTracker().closed except CatchableError as exc: trace "Error shutting down tcp transport", exc = exc.msg -method accept*(t: TcpTransport): Future[Connection] {.async, gcsafe.} = +method upgradeIncoming*( + self: TcpTransport, + conn: Connection): Future[void] {.gcsafe.} = + ## upgrade an incoming connection using this + ## transport's upgrader + ## + + self.upgrader.upgradeIncoming(conn) + +method upgradeOutgoing*( + self: TcpTransport, + conn: Connection): Future[Connection] {.gcsafe.} = + ## upgrade an outgoing connection using this + ## transport's upgrader + ## + + self.upgrader.upgradeOutgoing(conn) + +method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} = ## accept a new TCP connection ## - if not t.running: + if not self.running: raise newTransportClosedError() try: - let transp = await t.server.accept() - return await t.connHandler(transp, Direction.In) + let transp = await self.server.accept() + return await self.connHandler(transp, Direction.In) except TransportOsError as exc: # TODO: it doesn't sound like all OS errors # can be ignored, we should re-raise those # that can't.
debug "OS Error", exc = exc.msg except TransportTooManyError as exc: debug "Too many files opened", exc = exc.msg @@ -192,16 +221,16 @@ method accept*(t: TcpTransport): Future[Connection] {.async, gcsafe.} = warn "Unexpected error creating connection", exc = exc.msg raise exc -method dial*(t: TcpTransport, - address: MultiAddress): - Future[Connection] {.async, gcsafe.} = +method dial*( + self: TcpTransport, + address: MultiAddress): Future[Connection] {.async, gcsafe.} = ## dial a peer ## trace "Dialing remote peer", address = $address let transp = await connect(address) - return await t.connHandler(transp, Direction.Out) + return await self.connHandler(transp, Direction.Out) method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} = if procCall Transport(t).handles(address): diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 5eed89e69..c92df5a4f 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -14,7 +14,8 @@ import sequtils import chronos, chronicles import ../stream/connection, ../multiaddress, - ../multicodec + ../multicodec, + ../upgrademngrs/upgrade logScope: topics = "libp2p transport" @@ -25,56 +26,72 @@ type Transport* = ref object of RootObj ma*: Multiaddress - multicodec*: MultiCodec running*: bool + upgrader*: Upgrade + multicodec*: MultiCodec proc newTransportClosedError*(parent: ref Exception = nil): ref LPError = newException(TransportClosedError, "Transport closed, no more connections!", parent) -method initTransport*(t: Transport) {.base, gcsafe, locks: "unknown".} = +method initTransport*(self: Transport) {.base, gcsafe, locks: "unknown".} = ## perform protocol initialization ## discard -method start*(t: Transport, ma: MultiAddress) {.base, async.} = +method start*( + self: Transport, + ma: MultiAddress): Future[void] {.base, async.} = ## start the transport ## - t.ma = ma + self.ma = ma trace "starting transport", address = $ma -method stop*(t: Transport) {.base, async.} = +method stop*(self: Transport): Future[void] {.base, async.} = ## stop and cleanup the transport ## including all outstanding connections ## discard -method accept*(t: Transport): Future[Connection] - {.base, async, gcsafe.} = +method accept*(self: Transport): Future[Connection] + {.base, gcsafe.} = ## accept incoming connections ## discard -method dial*(t: Transport, - address: MultiAddress): Future[Connection] - {.base, async, gcsafe.} = +method dial*( + self: Transport, + address: MultiAddress): Future[Connection] {.base, gcsafe.} = ## dial a peer ## discard -method upgrade*(t: Transport) {.base, async, gcsafe.} = +method upgradeIncoming*( + self: Transport, + conn: Connection): Future[void] {.base, gcsafe.} = ## base upgrade method that the transport uses to perform ## transport specific upgrades ## - discard + doAssert(false, "Not implemented!") -method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} = +method upgradeOutgoing*( + self: Transport, + conn: Connection): Future[Connection] {.base, gcsafe.} = + ## base upgrade method that the transport uses to perform + ## transport specific upgrades + ## + + doAssert(false, "Not implemented!") + +method handles*( + self: Transport, + address: MultiAddress): bool {.base, gcsafe.} = ## check if transport supports the multiaddress ## @@ -83,7 +100,7 @@ method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} = if address.protocols.isOk: return address.protocols.get().filterIt( it == multiCodec("p2p-circuit") ).len == 0 -method 
localAddress*(t: Transport): MultiAddress {.base, gcsafe.} = +method localAddress*(self: Transport): MultiAddress {.base, gcsafe.} = ## get the local address of the transport in case started with 0.0.0.0:0 ## diff --git a/libp2p/upgrademngrs/muxedupgrade.nim b/libp2p/upgrademngrs/muxedupgrade.nim index 5b5a3257c..ece680729 100644 --- a/libp2p/upgrademngrs/muxedupgrade.nim +++ b/libp2p/upgrademngrs/muxedupgrade.nim @@ -25,26 +25,30 @@ type muxers*: Table[string, MuxerProvider] streamHandler*: StreamHandler -proc identify*(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} = +proc identify*( + self: MuxedUpgrade, + muxer: Muxer) {.async, gcsafe.} = # new stream for identify var stream = await muxer.newStream() if stream == nil: return try: - await u.identify(stream) + await self.identify(stream) finally: await stream.closeWithEOF() -proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} = +proc mux*( + self: MuxedUpgrade, + conn: Connection): Future[Muxer] {.async, gcsafe.} = ## mux incoming connection trace "Muxing connection", conn - if u.muxers.len == 0: + if self.muxers.len == 0: warn "no muxers registered, skipping upgrade flow", conn return - let muxerName = await u.ms.select(conn, toSeq(u.muxers.keys())) + let muxerName = await self.ms.select(conn, toSeq(self.muxers.keys())) if muxerName.len == 0 or muxerName == "na": debug "no muxer available, early exit", conn return @@ -52,18 +56,18 @@ proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} = trace "Found a muxer", conn, muxerName # create new muxer for connection - let muxer = u.muxers[muxerName].newMuxer(conn) + let muxer = self.muxers[muxerName].newMuxer(conn) # install stream handler - muxer.streamHandler = u.streamHandler + muxer.streamHandler = self.streamHandler - u.connManager.storeConn(conn) + self.connManager.storeConn(conn) # store it in muxed connections if we have a peer for it - u.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop + self.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop try: - await u.identify(muxer) + await self.identify(muxer) except CatchableError as exc: # Identify is non-essential, though if it fails, it might indicate that # the connection was closed already - this will be picked up by the read @@ -72,10 +76,12 @@ proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} = return muxer -method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] {.async, gcsafe.} = +method upgradeOutgoing*( + self: MuxedUpgrade, + conn: Connection): Future[Connection] {.async, gcsafe.} = trace "Upgrading outgoing connection", conn - let sconn = await u.secure(conn) # secure the connection + let sconn = await self.secure(conn) # secure the connection if isNil(sconn): raise newException(UpgradeFailedError, "unable to secure connection, stopping upgrade") @@ -84,7 +90,7 @@ method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] { raise newException(UpgradeFailedError, "current version of nim-libp2p requires that secure protocol negotiates peerid") - let muxer = await u.mux(sconn) # mux it if possible + let muxer = await self.mux(sconn) # mux it if possible if muxer == nil: # TODO this might be relaxed in the future raise newException(UpgradeFailedError, @@ -99,7 +105,9 @@ method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] { return sconn -method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsafe.} = # 
noraises +method upgradeIncoming*( + self: MuxedUpgrade, + incomingConn: Connection): Future[void] {.async, gcsafe.} = # noraises trace "Upgrading incoming connection", incomingConn let ms = newMultistream() @@ -108,7 +116,7 @@ method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsa proto: string) {.async, gcsafe, closure.} = trace "Starting secure handler", conn - let secure = u.secureManagers.filterIt(it.codec == proto)[0] + let secure = self.secureManagers.filterIt(it.codec == proto)[0] var cconn = conn try: @@ -118,7 +126,7 @@ method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsa cconn = sconn # add the muxer - for muxer in u.muxers.values: + for muxer in self.muxers.values: ms.addHandler(muxer.codecs, muxer) # handle subsequent secure requests @@ -136,7 +144,7 @@ method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsa try: if (await ms.select(incomingConn)): # just handshake # add the secure handlers - for k in u.secureManagers: + for k in self.secureManagers: ms.addHandler(k.codec, securedHandler) # handle un-secured connections @@ -150,7 +158,9 @@ method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsa if not isNil(incomingConn): await incomingConn.close() -proc muxerHandler(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} = +proc muxerHandler( + self: MuxedUpgrade, + muxer: Muxer) {.async, gcsafe.} = let conn = muxer.connection @@ -160,13 +170,13 @@ proc muxerHandler(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} = return # store incoming connection - u.connManager.storeConn(conn) + self.connManager.storeConn(conn) # store muxer and muxed connection - u.connManager.storeMuxer(muxer) + self.connManager.storeMuxer(muxer) try: - await u.identify(muxer) + await self.identify(muxer) except IdentifyError as exc: # Identify is non-essential, though if it fails, it might indicate that # the connection was closed already - this will be picked up by the read @@ -198,17 +208,19 @@ proc init*( connManager: connManager, ms: ms) - upgrader.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises - trace "Starting stream handler", conn - try: - await upgrader.ms.handle(conn) # handle incoming connection - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception in stream handler", conn, msg = exc.msg - finally: - await conn.closeWithEOF() - trace "Stream handler done", conn + proc streamHandler(conn: Connection) {.async, gcsafe.} = # noraises + trace "Starting stream handler", conn + try: + await upgrader.ms.handle(conn) # handle incoming connection + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception in stream handler", conn, msg = exc.msg + finally: + await conn.closeWithEOF() + trace "Stream handler done", conn + + upgrader.streamHandler = streamHandler for _, val in muxers: val.streamHandler = upgrader.streamHandler diff --git a/libp2p/upgrademngrs/upgrade.nim index 475febe71..68da6d074 100644 --- a/libp2p/upgrademngrs/upgrade.nim +++ b/libp2p/upgrademngrs/upgrade.nim @@ -35,22 +35,28 @@ type connManager*: ConnManager secureManagers*: seq[Secure] -method upgradeIncoming*(u: Upgrade, conn: Connection): Future[void] {.base.} = +method upgradeIncoming*( + self: Upgrade, + conn: Connection): Future[void] {.base.} = doAssert(false, "Not implemented!") -method upgradeOutgoing*(u: Upgrade, conn: Connection): Future[Connection] {.base.} = +method 
upgradeOutgoing*( + self: Upgrade, + conn: Connection): Future[Connection] {.base.} = doAssert(false, "Not implemented!") -proc secure*(u: Upgrade, conn: Connection): Future[Connection] {.async, gcsafe.} = - if u.secureManagers.len <= 0: +proc secure*( + self: Upgrade, + conn: Connection): Future[Connection] {.async, gcsafe.} = + if self.secureManagers.len <= 0: raise newException(UpgradeFailedError, "No secure managers registered!") - let codec = await u.ms.select(conn, u.secureManagers.mapIt(it.codec)) + let codec = await self.ms.select(conn, self.secureManagers.mapIt(it.codec)) if codec.len == 0: raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!") trace "Securing connection", conn, codec - let secureProtocol = u.secureManagers.filterIt(it.codec == codec) + let secureProtocol = self.secureManagers.filterIt(it.codec == codec) # ms.select should deal with the correctness of this # let's avoid duplicating checks but detect if it fails to do it properly @@ -58,11 +64,13 @@ proc secure*(u: Upgrade, conn: Connection): Future[Connection] {.async, gcsafe.} return await secureProtocol[0].secure(conn, true) -proc identify*(u: Upgrade, conn: Connection) {.async, gcsafe.} = +proc identify*( + self: Upgrade, + conn: Connection) {.async, gcsafe.} = ## identify the connection - if (await u.ms.select(conn, u.identity.codec)): - let info = await u.identity.identify(conn, conn.peerInfo) + if (await self.ms.select(conn, self.identity.codec)): + let info = await self.identity.identify(conn, conn.peerInfo) if info.pubKey.isNone and isNil(conn): raise newException(UpgradeFailedError, diff --git a/tests/testidentify.nim b/tests/testidentify.nim index cbdd9b236..583661b35 100644 --- a/tests/testidentify.nim +++ b/tests/testidentify.nim @@ -8,7 +8,8 @@ import ../libp2p/[protocols/identify, multistream, transports/transport, transports/tcptransport, - crypto/crypto] + crypto/crypto, + upgrademngrs/upgrade] import ./helpers when defined(nimHasUsed): {.used.} @@ -38,8 +39,8 @@ suite "Identify": remotePeerInfo = PeerInfo.init( remoteSecKey, [ma], ["/test/proto1/1.0.0", "/test/proto2/1.0.0"]) - transport1 = TcpTransport.init() - transport2 = TcpTransport.init() + transport1 = TcpTransport.init(upgrade = Upgrade()) + transport2 = TcpTransport.init(upgrade = Upgrade()) identifyProto1 = newIdentify(remotePeerInfo) identifyProto2 = newIdentify(remotePeerInfo) diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 1f830d2ea..a1192e546 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -9,6 +9,7 @@ import ../libp2p/[errors, muxers/mplex/mplex, muxers/mplex/coder, muxers/mplex/lpchannel, + upgrademngrs/upgrade, vbuffer, varint] @@ -379,7 +380,7 @@ suite "Mplex": asyncTest "read/write receiver": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - let transport1: TcpTransport = TcpTransport.init() + let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async, gcsafe.} = @@ -395,7 +396,7 @@ suite "Mplex": await mplexListen.close() let acceptFut = acceptHandler() - let transport2: TcpTransport = TcpTransport.init() + let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let conn = await transport2.dial(transport1.ma) let mplexDial = Mplex.init(conn) @@ -416,7 +417,7 @@ suite "Mplex": asyncTest "read/write receiver lazy": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - let transport1: TcpTransport = TcpTransport.init() + let transport1: 
TcpTransport = TcpTransport.init(upgrade = Upgrade()) let listenFut = transport1.start(ma) proc acceptHandler() {.async, gcsafe.} = @@ -432,7 +433,7 @@ suite "Mplex": await mplexListen.close() let acceptFut = acceptHandler() - let transport2: TcpTransport = TcpTransport.init() + let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade()) let conn = await transport2.dial(transport1.ma) let mplexDial = Mplex.init(conn) @@ -460,7 +461,7 @@ suite "Mplex": for _ in 0..
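
Note for reviewers: the public Switch surface is unchanged by this refactor; connect and dial now forward to the embedded Dialer, which owns the per-peer dial lock, the address iteration and the upgrade flow. A minimal end-to-end sketch of the resulting API (assumptions: Switch.start still returns the transports' acceptor futures, as the tests above do, and "/ipfs/id/1.0.0" is identify's codec, which newSwitch mounts on every switch):

import chronos
import libp2p/standard_setup

proc demo() {.async.} =
  let
    src = newStandardSwitch()
    dst = newStandardSwitch()

  # start both switches; start returns the transports' acceptor futures
  discard await src.start()
  discard await dst.start()

  # dial forwards to Dialer.dial: reuse or establish a connection,
  # upgrade it (secure channel + muxer), then negotiate the codec
  # over a fresh muxed stream
  let conn = await src.dial(
    dst.peerInfo.peerId,
    dst.peerInfo.addrs,
    "/ipfs/id/1.0.0") # identify is mounted on every standard switch

  await conn.close()
  await src.stop()
  await dst.stop()

waitFor demo()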