diff --git a/libp2p/dial.nim b/libp2p/dial.nim index b4c205af7..089ebdb69 100644 --- a/libp2p/dial.nim +++ b/libp2p/dial.nim @@ -28,7 +28,8 @@ method connect*( peerId: PeerId, addrs: seq[MultiAddress], forceDial = false, - reuseConnection = true) {.async, base.} = + reuseConnection = true, + upgradeDir = Direction.Out) {.async, base.} = ## connect remote peer without negotiating ## a protocol ## diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index a5e7e0c27..a96af4d0e 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -36,7 +36,6 @@ logScope: declareCounter(libp2p_total_dial_attempts, "total attempted dials") declareCounter(libp2p_successful_dials, "dialed successful peers") declareCounter(libp2p_failed_dials, "failed dials") -declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades") type DialFailedError* = object of LPError @@ -53,7 +52,8 @@ proc dialAndUpgrade( self: Dialer, peerId: Opt[PeerId], hostname: string, - address: MultiAddress): + address: MultiAddress, + upgradeDir = Direction.Out): Future[Muxer] {.async.} = for transport in self.transports: # for each transport @@ -71,27 +71,27 @@ proc dialAndUpgrade( libp2p_failed_dials.inc() return nil # Try the next address - # also keep track of the connection's bottom unsafe transport direction - # required by gossipsub scoring - dialed.transportDir = Direction.Out - libp2p_successful_dials.inc() let mux = try: - await transport.upgradeOutgoing(dialed, peerId) + dialed.transportDir = upgradeDir + await transport.upgrade(dialed, upgradeDir, peerId) except CatchableError as exc: # If we failed to establish the connection through one transport, # we won't succeeded through another - no use in trying again await dialed.close() debug "Upgrade failed", msg = exc.msg, peerId if exc isnot CancelledError: - libp2p_failed_upgrades_outgoing.inc() + if upgradeDir == Direction.Out: + libp2p_failed_upgrades_outgoing.inc() + else: + libp2p_failed_upgrades_incoming.inc() # Try other address return nil - doAssert not isNil(mux), "connection died after upgradeOutgoing" + doAssert not isNil(mux), "connection died after upgrade " & $upgradeDir debug "Dial successful", peerId = mux.connection.peerId return mux return nil @@ -127,7 +127,8 @@ proc expandDnsAddr( proc dialAndUpgrade( self: Dialer, peerId: Opt[PeerId], - addrs: seq[MultiAddress]): + addrs: seq[MultiAddress], + upgradeDir = Direction.Out): Future[Muxer] {.async.} = debug "Dialing peer", peerId @@ -145,7 +146,7 @@ proc dialAndUpgrade( else: await self.nameResolver.resolveMAddress(expandedAddress) for resolvedAddress in resolvedAddresses: - result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress) + result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress, upgradeDir) if not isNil(result): return result @@ -162,7 +163,8 @@ proc internalConnect( peerId: Opt[PeerId], addrs: seq[MultiAddress], forceDial: bool, - reuseConnection = true): + reuseConnection = true, + upgradeDir = Direction.Out): Future[Muxer] {.async.} = if Opt.some(self.localPeerId) == peerId: raise newException(CatchableError, "can't dial self!") @@ -180,7 +182,7 @@ proc internalConnect( let slot = self.connManager.getOutgoingSlot(forceDial) let muxed = try: - await self.dialAndUpgrade(peerId, addrs) + await self.dialAndUpgrade(peerId, addrs, upgradeDir) except CatchableError as exc: slot.release() raise exc @@ -206,7 +208,8 @@ method connect*( peerId: PeerId, addrs: seq[MultiAddress], forceDial = false, - reuseConnection = true) {.async.} = + reuseConnection = true, + upgradeDir = Direction.Out) {.async.} = ## connect remote peer without negotiating ## a protocol ## @@ -214,7 +217,7 @@ method connect*( if self.connManager.connCount(peerId) > 0 and reuseConnection: return - discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection) + discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, upgradeDir) method connect*( self: Dialer, diff --git a/libp2p/peerstore.nim b/libp2p/peerstore.nim index 617a40ec6..26a8349c5 100644 --- a/libp2p/peerstore.nim +++ b/libp2p/peerstore.nim @@ -221,5 +221,8 @@ proc identify*( finally: await stream.closeWithEOF() +proc getMostObservedProtosAndPorts*(self: PeerStore): seq[MultiAddress] = + return self.identify.observedAddrManager.getMostObservedProtosAndPorts() + proc guessDialableAddr*(self: PeerStore, ma: MultiAddress): MultiAddress = return self.identify.observedAddrManager.guessDialableAddr(ma) diff --git a/libp2p/protocols/connectivity/dcutr/client.nim b/libp2p/protocols/connectivity/dcutr/client.nim new file mode 100644 index 000000000..560741e6c --- /dev/null +++ b/libp2p/protocols/connectivity/dcutr/client.nim @@ -0,0 +1,90 @@ +# Nim-LibP2P +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/sequtils + +import stew/results +import chronos, chronicles + +import core +import ../../protocol, + ../../../stream/connection, + ../../../switch, + ../../../utils/future + +type + DcutrClient* = ref object + connectTimeout: Duration + maxDialableAddrs: int + +logScope: + topics = "libp2p dcutrclient" + +proc new*(T: typedesc[DcutrClient], connectTimeout = 15.seconds, maxDialableAddrs = 8): T = + return T(connectTimeout: connectTimeout, maxDialableAddrs: maxDialableAddrs) + +proc startSync*(self: DcutrClient, switch: Switch, remotePeerId: PeerId, addrs: seq[MultiAddress]) {.async.} = + logScope: + peerId = switch.peerInfo.peerId + + var + peerDialableAddrs: seq[MultiAddress] + stream: Connection + try: + var ourDialableAddrs = getHolePunchableAddrs(addrs) + if ourDialableAddrs.len == 0: + debug "Dcutr initiator has no supported dialable addresses. Aborting Dcutr.", addrs + return + + stream = await switch.dial(remotePeerId, DcutrCodec) + await stream.send(MsgType.Connect, addrs) + debug "Dcutr initiator has sent a Connect message." + let rttStart = Moment.now() + let connectAnswer = DcutrMsg.decode(await stream.readLp(1024)) + + peerDialableAddrs = getHolePunchableAddrs(connectAnswer.addrs) + if peerDialableAddrs.len == 0: + debug "Dcutr receiver has no supported dialable addresses to connect to. Aborting Dcutr.", addrs=connectAnswer.addrs + return + + let rttEnd = Moment.now() + debug "Dcutr initiator has received a Connect message back.", connectAnswer + let halfRtt = (rttEnd - rttStart) div 2'i64 + await stream.send(MsgType.Sync, addrs) + debug "Dcutr initiator has sent a Sync message." + await sleepAsync(halfRtt) + + if peerDialableAddrs.len > self.maxDialableAddrs: + peerDialableAddrs = peerDialableAddrs[0.. maxDialableAddrs: + peerDialableAddrs = peerDialableAddrs[0..