parent
0221affe98
commit
b7726bf68f
|
@ -28,7 +28,8 @@ method connect*(
|
||||||
peerId: PeerId,
|
peerId: PeerId,
|
||||||
addrs: seq[MultiAddress],
|
addrs: seq[MultiAddress],
|
||||||
forceDial = false,
|
forceDial = false,
|
||||||
reuseConnection = true) {.async, base.} =
|
reuseConnection = true,
|
||||||
|
upgradeDir = Direction.Out) {.async, base.} =
|
||||||
## connect remote peer without negotiating
|
## connect remote peer without negotiating
|
||||||
## a protocol
|
## a protocol
|
||||||
##
|
##
|
||||||
|
|
|
@ -36,7 +36,6 @@ logScope:
|
||||||
declareCounter(libp2p_total_dial_attempts, "total attempted dials")
|
declareCounter(libp2p_total_dial_attempts, "total attempted dials")
|
||||||
declareCounter(libp2p_successful_dials, "dialed successful peers")
|
declareCounter(libp2p_successful_dials, "dialed successful peers")
|
||||||
declareCounter(libp2p_failed_dials, "failed dials")
|
declareCounter(libp2p_failed_dials, "failed dials")
|
||||||
declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades")
|
|
||||||
|
|
||||||
type
|
type
|
||||||
DialFailedError* = object of LPError
|
DialFailedError* = object of LPError
|
||||||
|
@ -53,7 +52,8 @@ proc dialAndUpgrade(
|
||||||
self: Dialer,
|
self: Dialer,
|
||||||
peerId: Opt[PeerId],
|
peerId: Opt[PeerId],
|
||||||
hostname: string,
|
hostname: string,
|
||||||
address: MultiAddress):
|
address: MultiAddress,
|
||||||
|
upgradeDir = Direction.Out):
|
||||||
Future[Muxer] {.async.} =
|
Future[Muxer] {.async.} =
|
||||||
|
|
||||||
for transport in self.transports: # for each transport
|
for transport in self.transports: # for each transport
|
||||||
|
@ -71,27 +71,27 @@ proc dialAndUpgrade(
|
||||||
libp2p_failed_dials.inc()
|
libp2p_failed_dials.inc()
|
||||||
return nil # Try the next address
|
return nil # Try the next address
|
||||||
|
|
||||||
# also keep track of the connection's bottom unsafe transport direction
|
|
||||||
# required by gossipsub scoring
|
|
||||||
dialed.transportDir = Direction.Out
|
|
||||||
|
|
||||||
libp2p_successful_dials.inc()
|
libp2p_successful_dials.inc()
|
||||||
|
|
||||||
let mux =
|
let mux =
|
||||||
try:
|
try:
|
||||||
await transport.upgradeOutgoing(dialed, peerId)
|
dialed.transportDir = upgradeDir
|
||||||
|
await transport.upgrade(dialed, upgradeDir, peerId)
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
# If we failed to establish the connection through one transport,
|
# If we failed to establish the connection through one transport,
|
||||||
# we won't succeeded through another - no use in trying again
|
# we won't succeeded through another - no use in trying again
|
||||||
await dialed.close()
|
await dialed.close()
|
||||||
debug "Upgrade failed", msg = exc.msg, peerId
|
debug "Upgrade failed", msg = exc.msg, peerId
|
||||||
if exc isnot CancelledError:
|
if exc isnot CancelledError:
|
||||||
libp2p_failed_upgrades_outgoing.inc()
|
if upgradeDir == Direction.Out:
|
||||||
|
libp2p_failed_upgrades_outgoing.inc()
|
||||||
|
else:
|
||||||
|
libp2p_failed_upgrades_incoming.inc()
|
||||||
|
|
||||||
# Try other address
|
# Try other address
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
doAssert not isNil(mux), "connection died after upgradeOutgoing"
|
doAssert not isNil(mux), "connection died after upgrade " & $upgradeDir
|
||||||
debug "Dial successful", peerId = mux.connection.peerId
|
debug "Dial successful", peerId = mux.connection.peerId
|
||||||
return mux
|
return mux
|
||||||
return nil
|
return nil
|
||||||
|
@ -127,7 +127,8 @@ proc expandDnsAddr(
|
||||||
proc dialAndUpgrade(
|
proc dialAndUpgrade(
|
||||||
self: Dialer,
|
self: Dialer,
|
||||||
peerId: Opt[PeerId],
|
peerId: Opt[PeerId],
|
||||||
addrs: seq[MultiAddress]):
|
addrs: seq[MultiAddress],
|
||||||
|
upgradeDir = Direction.Out):
|
||||||
Future[Muxer] {.async.} =
|
Future[Muxer] {.async.} =
|
||||||
|
|
||||||
debug "Dialing peer", peerId
|
debug "Dialing peer", peerId
|
||||||
|
@ -145,7 +146,7 @@ proc dialAndUpgrade(
|
||||||
else: await self.nameResolver.resolveMAddress(expandedAddress)
|
else: await self.nameResolver.resolveMAddress(expandedAddress)
|
||||||
|
|
||||||
for resolvedAddress in resolvedAddresses:
|
for resolvedAddress in resolvedAddresses:
|
||||||
result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress)
|
result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress, upgradeDir)
|
||||||
if not isNil(result):
|
if not isNil(result):
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@ -162,7 +163,8 @@ proc internalConnect(
|
||||||
peerId: Opt[PeerId],
|
peerId: Opt[PeerId],
|
||||||
addrs: seq[MultiAddress],
|
addrs: seq[MultiAddress],
|
||||||
forceDial: bool,
|
forceDial: bool,
|
||||||
reuseConnection = true):
|
reuseConnection = true,
|
||||||
|
upgradeDir = Direction.Out):
|
||||||
Future[Muxer] {.async.} =
|
Future[Muxer] {.async.} =
|
||||||
if Opt.some(self.localPeerId) == peerId:
|
if Opt.some(self.localPeerId) == peerId:
|
||||||
raise newException(CatchableError, "can't dial self!")
|
raise newException(CatchableError, "can't dial self!")
|
||||||
|
@ -180,7 +182,7 @@ proc internalConnect(
|
||||||
let slot = self.connManager.getOutgoingSlot(forceDial)
|
let slot = self.connManager.getOutgoingSlot(forceDial)
|
||||||
let muxed =
|
let muxed =
|
||||||
try:
|
try:
|
||||||
await self.dialAndUpgrade(peerId, addrs)
|
await self.dialAndUpgrade(peerId, addrs, upgradeDir)
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
slot.release()
|
slot.release()
|
||||||
raise exc
|
raise exc
|
||||||
|
@ -206,7 +208,8 @@ method connect*(
|
||||||
peerId: PeerId,
|
peerId: PeerId,
|
||||||
addrs: seq[MultiAddress],
|
addrs: seq[MultiAddress],
|
||||||
forceDial = false,
|
forceDial = false,
|
||||||
reuseConnection = true) {.async.} =
|
reuseConnection = true,
|
||||||
|
upgradeDir = Direction.Out) {.async.} =
|
||||||
## connect remote peer without negotiating
|
## connect remote peer without negotiating
|
||||||
## a protocol
|
## a protocol
|
||||||
##
|
##
|
||||||
|
@ -214,7 +217,7 @@ method connect*(
|
||||||
if self.connManager.connCount(peerId) > 0 and reuseConnection:
|
if self.connManager.connCount(peerId) > 0 and reuseConnection:
|
||||||
return
|
return
|
||||||
|
|
||||||
discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection)
|
discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection, upgradeDir)
|
||||||
|
|
||||||
method connect*(
|
method connect*(
|
||||||
self: Dialer,
|
self: Dialer,
|
||||||
|
|
|
@ -221,5 +221,8 @@ proc identify*(
|
||||||
finally:
|
finally:
|
||||||
await stream.closeWithEOF()
|
await stream.closeWithEOF()
|
||||||
|
|
||||||
|
proc getMostObservedProtosAndPorts*(self: PeerStore): seq[MultiAddress] =
|
||||||
|
return self.identify.observedAddrManager.getMostObservedProtosAndPorts()
|
||||||
|
|
||||||
proc guessDialableAddr*(self: PeerStore, ma: MultiAddress): MultiAddress =
|
proc guessDialableAddr*(self: PeerStore, ma: MultiAddress): MultiAddress =
|
||||||
return self.identify.observedAddrManager.guessDialableAddr(ma)
|
return self.identify.observedAddrManager.guessDialableAddr(ma)
|
||||||
|
|
|
@ -0,0 +1,90 @@
|
||||||
|
# Nim-LibP2P
|
||||||
|
# Copyright (c) 2023 Status Research & Development GmbH
|
||||||
|
# Licensed under either of
|
||||||
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
# at your option.
|
||||||
|
# This file may not be copied, modified, or distributed except according to
|
||||||
|
# those terms.
|
||||||
|
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import std/sequtils
|
||||||
|
|
||||||
|
import stew/results
|
||||||
|
import chronos, chronicles
|
||||||
|
|
||||||
|
import core
|
||||||
|
import ../../protocol,
|
||||||
|
../../../stream/connection,
|
||||||
|
../../../switch,
|
||||||
|
../../../utils/future
|
||||||
|
|
||||||
|
type
|
||||||
|
DcutrClient* = ref object
|
||||||
|
connectTimeout: Duration
|
||||||
|
maxDialableAddrs: int
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "libp2p dcutrclient"
|
||||||
|
|
||||||
|
proc new*(T: typedesc[DcutrClient], connectTimeout = 15.seconds, maxDialableAddrs = 8): T =
|
||||||
|
return T(connectTimeout: connectTimeout, maxDialableAddrs: maxDialableAddrs)
|
||||||
|
|
||||||
|
proc startSync*(self: DcutrClient, switch: Switch, remotePeerId: PeerId, addrs: seq[MultiAddress]) {.async.} =
|
||||||
|
logScope:
|
||||||
|
peerId = switch.peerInfo.peerId
|
||||||
|
|
||||||
|
var
|
||||||
|
peerDialableAddrs: seq[MultiAddress]
|
||||||
|
stream: Connection
|
||||||
|
try:
|
||||||
|
var ourDialableAddrs = getHolePunchableAddrs(addrs)
|
||||||
|
if ourDialableAddrs.len == 0:
|
||||||
|
debug "Dcutr initiator has no supported dialable addresses. Aborting Dcutr.", addrs
|
||||||
|
return
|
||||||
|
|
||||||
|
stream = await switch.dial(remotePeerId, DcutrCodec)
|
||||||
|
await stream.send(MsgType.Connect, addrs)
|
||||||
|
debug "Dcutr initiator has sent a Connect message."
|
||||||
|
let rttStart = Moment.now()
|
||||||
|
let connectAnswer = DcutrMsg.decode(await stream.readLp(1024))
|
||||||
|
|
||||||
|
peerDialableAddrs = getHolePunchableAddrs(connectAnswer.addrs)
|
||||||
|
if peerDialableAddrs.len == 0:
|
||||||
|
debug "Dcutr receiver has no supported dialable addresses to connect to. Aborting Dcutr.", addrs=connectAnswer.addrs
|
||||||
|
return
|
||||||
|
|
||||||
|
let rttEnd = Moment.now()
|
||||||
|
debug "Dcutr initiator has received a Connect message back.", connectAnswer
|
||||||
|
let halfRtt = (rttEnd - rttStart) div 2'i64
|
||||||
|
await stream.send(MsgType.Sync, addrs)
|
||||||
|
debug "Dcutr initiator has sent a Sync message."
|
||||||
|
await sleepAsync(halfRtt)
|
||||||
|
|
||||||
|
if peerDialableAddrs.len > self.maxDialableAddrs:
|
||||||
|
peerDialableAddrs = peerDialableAddrs[0..<self.maxDialableAddrs]
|
||||||
|
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false, upgradeDir = Direction.In))
|
||||||
|
try:
|
||||||
|
discard await anyCompleted(futs).wait(self.connectTimeout)
|
||||||
|
debug "Dcutr initiator has directly connected to the remote peer."
|
||||||
|
finally:
|
||||||
|
for fut in futs: fut.cancel()
|
||||||
|
except CancelledError as err:
|
||||||
|
raise err
|
||||||
|
except AllFuturesFailedError as err:
|
||||||
|
debug "Dcutr initiator could not connect to the remote peer, all connect attempts failed", peerDialableAddrs, msg = err.msg
|
||||||
|
raise newException(DcutrError, "Dcutr initiator could not connect to the remote peer, all connect attempts failed", err)
|
||||||
|
except AsyncTimeoutError as err:
|
||||||
|
debug "Dcutr initiator could not connect to the remote peer, all connect attempts timed out", peerDialableAddrs, msg = err.msg
|
||||||
|
raise newException(DcutrError, "Dcutr initiator could not connect to the remote peer, all connect attempts timed out", err)
|
||||||
|
except CatchableError as err:
|
||||||
|
debug "Unexpected error when trying direct conn", err = err.msg
|
||||||
|
raise newException(DcutrError, "Unexpected error when trying a direct conn", err)
|
||||||
|
finally:
|
||||||
|
if stream != nil:
|
||||||
|
await stream.close()
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
# Nim-LibP2P
|
||||||
|
# Copyright (c) 2023 Status Research & Development GmbH
|
||||||
|
# Licensed under either of
|
||||||
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
# at your option.
|
||||||
|
# This file may not be copied, modified, or distributed except according to
|
||||||
|
# those terms.
|
||||||
|
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import std/sequtils
|
||||||
|
|
||||||
|
import chronos
|
||||||
|
import stew/objects
|
||||||
|
|
||||||
|
import ../../../multiaddress,
|
||||||
|
../../../errors,
|
||||||
|
../../../stream/connection
|
||||||
|
|
||||||
|
export multiaddress
|
||||||
|
|
||||||
|
const
|
||||||
|
DcutrCodec* = "/libp2p/dcutr/1.0.0"
|
||||||
|
|
||||||
|
type
|
||||||
|
MsgType* = enum
|
||||||
|
Connect = 100
|
||||||
|
Sync = 300
|
||||||
|
|
||||||
|
DcutrMsg* = object
|
||||||
|
msgType*: MsgType
|
||||||
|
addrs*: seq[MultiAddress]
|
||||||
|
|
||||||
|
DcutrError* = object of LPError
|
||||||
|
|
||||||
|
proc encode*(msg: DcutrMsg): ProtoBuffer =
|
||||||
|
result = initProtoBuffer()
|
||||||
|
result.write(1, msg.msgType.uint)
|
||||||
|
for addr in msg.addrs:
|
||||||
|
result.write(2, addr)
|
||||||
|
result.finish()
|
||||||
|
|
||||||
|
proc decode*(_: typedesc[DcutrMsg], buf: seq[byte]): DcutrMsg {.raises: [Defect, DcutrError].} =
|
||||||
|
var
|
||||||
|
msgTypeOrd: uint32
|
||||||
|
dcutrMsg: DcutrMsg
|
||||||
|
var pb = initProtoBuffer(buf)
|
||||||
|
var r1 = pb.getField(1, msgTypeOrd)
|
||||||
|
let r2 = pb.getRepeatedField(2, dcutrMsg.addrs)
|
||||||
|
if r1.isErr or r2.isErr or not checkedEnumAssign(dcutrMsg.msgType, msgTypeOrd):
|
||||||
|
raise newException(DcutrError, "Received malformed message")
|
||||||
|
return dcutrMsg
|
||||||
|
|
||||||
|
proc send*(conn: Connection, msgType: MsgType, addrs: seq[MultiAddress]) {.async.} =
|
||||||
|
let pb = DcutrMsg(msgType: msgType, addrs: addrs).encode()
|
||||||
|
await conn.writeLp(pb.buffer)
|
||||||
|
|
||||||
|
proc getHolePunchableAddrs*(addrs: seq[MultiAddress]): seq[MultiAddress] =
|
||||||
|
addrs.filterIt(TCP.match(it))
|
|
@ -0,0 +1,83 @@
|
||||||
|
# Nim-LibP2P
|
||||||
|
# Copyright (c) 2023 Status Research & Development GmbH
|
||||||
|
# Licensed under either of
|
||||||
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
# at your option.
|
||||||
|
# This file may not be copied, modified, or distributed except according to
|
||||||
|
# those terms.
|
||||||
|
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import std/[options, sets, sequtils]
|
||||||
|
|
||||||
|
import stew/[results, objects]
|
||||||
|
import chronos, chronicles
|
||||||
|
|
||||||
|
import core
|
||||||
|
import ../../protocol,
|
||||||
|
../../../stream/connection,
|
||||||
|
../../../switch,
|
||||||
|
../../../utils/future
|
||||||
|
|
||||||
|
export chronicles
|
||||||
|
|
||||||
|
type Dcutr* = ref object of LPProtocol
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "libp2p dcutr"
|
||||||
|
|
||||||
|
proc new*(T: typedesc[Dcutr], switch: Switch, connectTimeout = 15.seconds, maxDialableAddrs = 8): T =
|
||||||
|
|
||||||
|
proc handleStream(stream: Connection, proto: string) {.async, gcsafe.} =
|
||||||
|
var peerDialableAddrs: seq[MultiAddress]
|
||||||
|
try:
|
||||||
|
let connectMsg = DcutrMsg.decode(await stream.readLp(1024))
|
||||||
|
debug "Dcutr receiver received a Connect message.", connectMsg
|
||||||
|
|
||||||
|
var ourAddrs = switch.peerStore.getMostObservedProtosAndPorts() # likely empty when the peer is reachable
|
||||||
|
if ourAddrs.len == 0:
|
||||||
|
# this list should be the same as the peer's public addrs when it is reachable
|
||||||
|
ourAddrs = switch.peerInfo.listenAddrs.mapIt(switch.peerStore.guessDialableAddr(it))
|
||||||
|
var ourDialableAddrs = getHolePunchableAddrs(ourAddrs)
|
||||||
|
if ourDialableAddrs.len == 0:
|
||||||
|
debug "Dcutr receiver has no supported dialable addresses. Aborting Dcutr.", ourAddrs
|
||||||
|
return
|
||||||
|
|
||||||
|
await stream.send(MsgType.Connect, ourAddrs)
|
||||||
|
debug "Dcutr receiver has sent a Connect message back."
|
||||||
|
let syncMsg = DcutrMsg.decode(await stream.readLp(1024))
|
||||||
|
debug "Dcutr receiver has received a Sync message.", syncMsg
|
||||||
|
|
||||||
|
peerDialableAddrs = getHolePunchableAddrs(connectMsg.addrs)
|
||||||
|
if peerDialableAddrs.len == 0:
|
||||||
|
debug "Dcutr initiator has no supported dialable addresses to connect to. Aborting Dcutr.", addrs=connectMsg.addrs
|
||||||
|
return
|
||||||
|
|
||||||
|
if peerDialableAddrs.len > maxDialableAddrs:
|
||||||
|
peerDialableAddrs = peerDialableAddrs[0..<maxDialableAddrs]
|
||||||
|
var futs = peerDialableAddrs.mapIt(switch.connect(stream.peerId, @[it], forceDial = true, reuseConnection = false))
|
||||||
|
try:
|
||||||
|
discard await anyCompleted(futs).wait(connectTimeout)
|
||||||
|
debug "Dcutr receiver has directly connected to the remote peer."
|
||||||
|
finally:
|
||||||
|
for fut in futs: fut.cancel()
|
||||||
|
except CancelledError as err:
|
||||||
|
raise err
|
||||||
|
except AllFuturesFailedError as err:
|
||||||
|
debug "Dcutr receiver could not connect to the remote peer, all connect attempts failed", peerDialableAddrs, msg = err.msg
|
||||||
|
raise newException(DcutrError, "Dcutr receiver could not connect to the remote peer, all connect attempts failed", err)
|
||||||
|
except AsyncTimeoutError as err:
|
||||||
|
debug "Dcutr receiver could not connect to the remote peer, all connect attempts timed out", peerDialableAddrs, msg = err.msg
|
||||||
|
raise newException(DcutrError, "Dcutr receiver could not connect to the remote peer, all connect attempts timed out", err)
|
||||||
|
except CatchableError as err:
|
||||||
|
warn "Unexpected error in dcutr handler", msg = err.msg
|
||||||
|
raise newException(DcutrError, "Unexpected error in dcutr handler", err)
|
||||||
|
|
||||||
|
let self = T()
|
||||||
|
self.handler = handleStream
|
||||||
|
self.codec = DcutrCodec
|
||||||
|
self
|
|
@ -58,8 +58,6 @@ logScope:
|
||||||
# and only if the channel has been secured (i.e. if a secure manager has been
|
# and only if the channel has been secured (i.e. if a secure manager has been
|
||||||
# previously provided)
|
# previously provided)
|
||||||
|
|
||||||
declareCounter(libp2p_failed_upgrades_incoming, "incoming connections failed upgrades")
|
|
||||||
|
|
||||||
const
|
const
|
||||||
ConcurrentUpgrades* = 4
|
ConcurrentUpgrades* = 4
|
||||||
|
|
||||||
|
@ -149,10 +147,11 @@ method connect*(
|
||||||
peerId: PeerId,
|
peerId: PeerId,
|
||||||
addrs: seq[MultiAddress],
|
addrs: seq[MultiAddress],
|
||||||
forceDial = false,
|
forceDial = false,
|
||||||
reuseConnection = true): Future[void] {.public.} =
|
reuseConnection = true,
|
||||||
|
upgradeDir = Direction.Out): Future[void] {.public.} =
|
||||||
## Connects to a peer without opening a stream to it
|
## Connects to a peer without opening a stream to it
|
||||||
|
|
||||||
s.dialer.connect(peerId, addrs, forceDial, reuseConnection)
|
s.dialer.connect(peerId, addrs, forceDial, reuseConnection, upgradeDir)
|
||||||
|
|
||||||
method connect*(
|
method connect*(
|
||||||
s: Switch,
|
s: Switch,
|
||||||
|
@ -221,7 +220,7 @@ proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil)
|
||||||
s.peerInfo.protocols.add(proto.codec)
|
s.peerInfo.protocols.add(proto.codec)
|
||||||
|
|
||||||
proc upgrader(switch: Switch, trans: Transport, conn: Connection) {.async.} =
|
proc upgrader(switch: Switch, trans: Transport, conn: Connection) {.async.} =
|
||||||
let muxed = await trans.upgradeIncoming(conn)
|
let muxed = await trans.upgrade(conn, Direction.In, Opt.none(PeerId))
|
||||||
switch.connManager.storeMuxer(muxed)
|
switch.connManager.storeMuxer(muxed)
|
||||||
await switch.peerStore.identify(muxed)
|
await switch.peerStore.identify(muxed)
|
||||||
trace "Connection upgrade succeeded"
|
trace "Connection upgrade succeeded"
|
||||||
|
|
|
@ -83,24 +83,16 @@ proc dial*(
|
||||||
peerId: Opt[PeerId] = Opt.none(PeerId)): Future[Connection] {.gcsafe.} =
|
peerId: Opt[PeerId] = Opt.none(PeerId)): Future[Connection] {.gcsafe.} =
|
||||||
self.dial("", address)
|
self.dial("", address)
|
||||||
|
|
||||||
method upgradeIncoming*(
|
method upgrade*(
|
||||||
self: Transport,
|
|
||||||
conn: Connection): Future[Muxer] {.base, gcsafe.} =
|
|
||||||
## base upgrade method that the transport uses to perform
|
|
||||||
## transport specific upgrades
|
|
||||||
##
|
|
||||||
|
|
||||||
self.upgrader.upgradeIncoming(conn)
|
|
||||||
|
|
||||||
method upgradeOutgoing*(
|
|
||||||
self: Transport,
|
self: Transport,
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
|
direction: Direction,
|
||||||
peerId: Opt[PeerId]): Future[Muxer] {.base, gcsafe.} =
|
peerId: Opt[PeerId]): Future[Muxer] {.base, gcsafe.} =
|
||||||
## base upgrade method that the transport uses to perform
|
## base upgrade method that the transport uses to perform
|
||||||
## transport specific upgrades
|
## transport specific upgrades
|
||||||
##
|
##
|
||||||
|
|
||||||
self.upgrader.upgradeOutgoing(conn, peerId)
|
self.upgrader.upgrade(conn, direction, peerId)
|
||||||
|
|
||||||
method handles*(
|
method handles*(
|
||||||
self: Transport,
|
self: Transport,
|
||||||
|
|
|
@ -62,7 +62,7 @@ proc mux*(
|
||||||
muxer.handler = muxer.handle()
|
muxer.handler = muxer.handle()
|
||||||
return muxer
|
return muxer
|
||||||
|
|
||||||
proc upgrade(
|
method upgrade*(
|
||||||
self: MuxedUpgrade,
|
self: MuxedUpgrade,
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
direction: Direction,
|
direction: Direction,
|
||||||
|
@ -90,17 +90,6 @@ proc upgrade(
|
||||||
trace "Upgraded connection", conn, sconn, direction
|
trace "Upgraded connection", conn, sconn, direction
|
||||||
return muxer
|
return muxer
|
||||||
|
|
||||||
method upgradeOutgoing*(
|
|
||||||
self: MuxedUpgrade,
|
|
||||||
conn: Connection,
|
|
||||||
peerId: Opt[PeerId]): Future[Muxer] {.async, gcsafe.} =
|
|
||||||
return await self.upgrade(conn, Out, peerId)
|
|
||||||
|
|
||||||
method upgradeIncoming*(
|
|
||||||
self: MuxedUpgrade,
|
|
||||||
conn: Connection): Future[Muxer] {.async, gcsafe.} =
|
|
||||||
return await self.upgrade(conn, In, Opt.none(PeerId))
|
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: type MuxedUpgrade,
|
T: type MuxedUpgrade,
|
||||||
muxers: seq[MuxerProvider],
|
muxers: seq[MuxerProvider],
|
||||||
|
|
|
@ -28,7 +28,8 @@ import ../stream/connection,
|
||||||
|
|
||||||
export connmanager, connection, identify, secure, multistream
|
export connmanager, connection, identify, secure, multistream
|
||||||
|
|
||||||
declarePublicCounter(libp2p_failed_upgrade, "peers failed upgrade")
|
declarePublicCounter(libp2p_failed_upgrades_incoming, "incoming connections failed upgrades")
|
||||||
|
declarePublicCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades")
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "libp2p upgrade"
|
topics = "libp2p upgrade"
|
||||||
|
@ -41,14 +42,10 @@ type
|
||||||
connManager*: ConnManager
|
connManager*: ConnManager
|
||||||
secureManagers*: seq[Secure]
|
secureManagers*: seq[Secure]
|
||||||
|
|
||||||
method upgradeIncoming*(
|
method upgrade*(
|
||||||
self: Upgrade,
|
|
||||||
conn: Connection): Future[Muxer] {.base.} =
|
|
||||||
doAssert(false, "Not implemented!")
|
|
||||||
|
|
||||||
method upgradeOutgoing*(
|
|
||||||
self: Upgrade,
|
self: Upgrade,
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
|
direction: Direction,
|
||||||
peerId: Opt[PeerId]): Future[Muxer] {.base.} =
|
peerId: Opt[PeerId]): Future[Muxer] {.base.} =
|
||||||
doAssert(false, "Not implemented!")
|
doAssert(false, "Not implemented!")
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,188 @@
|
||||||
|
# Nim-LibP2P
|
||||||
|
# Copyright (c) 2023 Status Research & Development GmbH
|
||||||
|
# Licensed under either of
|
||||||
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
# at your option.
|
||||||
|
# This file may not be copied, modified, or distributed except according to
|
||||||
|
# those terms.
|
||||||
|
|
||||||
|
import chronos
|
||||||
|
import unittest2
|
||||||
|
|
||||||
|
import ../libp2p/protocols/connectivity/dcutr/core as dcore
|
||||||
|
import ../libp2p/protocols/connectivity/dcutr/[client, server]
|
||||||
|
from ../libp2p/protocols/connectivity/autonat/core import NetworkReachability
|
||||||
|
import ../libp2p/builders
|
||||||
|
import ../libp2p/utils/future
|
||||||
|
import ./helpers
|
||||||
|
|
||||||
|
type
|
||||||
|
SwitchStub* = ref object of Switch
|
||||||
|
switch: Switch
|
||||||
|
connectStub*: proc(): Future[void] {.async.}
|
||||||
|
|
||||||
|
proc new*(T: typedesc[SwitchStub], switch: Switch, connectStub: proc (): Future[void] {.async.} = nil): T =
|
||||||
|
return SwitchStub(
|
||||||
|
switch: switch,
|
||||||
|
peerInfo: switch.peerInfo,
|
||||||
|
ms: switch.ms,
|
||||||
|
transports: switch.transports,
|
||||||
|
connManager: switch.connManager,
|
||||||
|
peerStore: switch.peerStore,
|
||||||
|
dialer: switch.dialer,
|
||||||
|
nameResolver: switch.nameResolver,
|
||||||
|
services: switch.services,
|
||||||
|
connectStub: connectStub)
|
||||||
|
|
||||||
|
method connect*(
|
||||||
|
self: SwitchStub,
|
||||||
|
peerId: PeerId,
|
||||||
|
addrs: seq[MultiAddress],
|
||||||
|
forceDial = false,
|
||||||
|
reuseConnection = true,
|
||||||
|
upgradeDir = Direction.Out) {.async.} =
|
||||||
|
if (self.connectStub != nil):
|
||||||
|
await self.connectStub()
|
||||||
|
else:
|
||||||
|
await self.switch.connect(peerId, addrs, forceDial, reuseConnection, upgradeDir)
|
||||||
|
|
||||||
|
suite "Dcutr":
|
||||||
|
teardown:
|
||||||
|
checkTrackers()
|
||||||
|
|
||||||
|
asyncTest "Connect msg Encode / Decode":
|
||||||
|
let addrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
|
||||||
|
let connectMsg = DcutrMsg(msgType: MsgType.Connect, addrs: addrs)
|
||||||
|
|
||||||
|
let pb = connectMsg.encode()
|
||||||
|
let connectMsgDecoded = DcutrMsg.decode(pb.buffer)
|
||||||
|
|
||||||
|
check connectMsg == connectMsgDecoded
|
||||||
|
|
||||||
|
asyncTest "Sync msg Encode / Decode":
|
||||||
|
let addrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet(), MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
|
||||||
|
let syncMsg = DcutrMsg(msgType: MsgType.Sync, addrs: addrs)
|
||||||
|
|
||||||
|
let pb = syncMsg.encode()
|
||||||
|
let syncMsgDecoded = DcutrMsg.decode(pb.buffer)
|
||||||
|
|
||||||
|
check syncMsg == syncMsgDecoded
|
||||||
|
|
||||||
|
asyncTest "DCUtR establishes a new connection":
|
||||||
|
|
||||||
|
let behindNATSwitch = newStandardSwitch()
|
||||||
|
let publicSwitch = newStandardSwitch()
|
||||||
|
|
||||||
|
let dcutrProto = Dcutr.new(publicSwitch)
|
||||||
|
publicSwitch.mount(dcutrProto)
|
||||||
|
|
||||||
|
await allFutures(behindNATSwitch.start(), publicSwitch.start())
|
||||||
|
|
||||||
|
await publicSwitch.connect(behindNATSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
|
||||||
|
|
||||||
|
for t in behindNATSwitch.transports:
|
||||||
|
t.networkReachability = NetworkReachability.NotReachable
|
||||||
|
|
||||||
|
expect CatchableError:
|
||||||
|
# we can't hole punch when both peers are in the same machine. This means that the simultaneous dialings will result
|
||||||
|
# in two connections attemps, instead of one. This dial is going to fail because the dcutr client is acting as the
|
||||||
|
# tcp simultaneous incoming upgrader in the dialer which works only in the simultaneous open case.
|
||||||
|
await DcutrClient.new().startSync(behindNATSwitch, publicSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
|
||||||
|
.wait(300.millis)
|
||||||
|
|
||||||
|
checkExpiring:
|
||||||
|
# we still expect a new connection to be open by the receiver peer acting as the dcutr server
|
||||||
|
behindNATSwitch.connManager.connCount(publicSwitch.peerInfo.peerId) == 2
|
||||||
|
|
||||||
|
await allFutures(behindNATSwitch.stop(), publicSwitch.stop())
|
||||||
|
|
||||||
|
template ductrClientTest(behindNATSwitch: Switch, publicSwitch: Switch, body: untyped) =
|
||||||
|
let dcutrProto = Dcutr.new(publicSwitch)
|
||||||
|
publicSwitch.mount(dcutrProto)
|
||||||
|
|
||||||
|
await allFutures(behindNATSwitch.start(), publicSwitch.start())
|
||||||
|
|
||||||
|
await publicSwitch.connect(behindNATSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
|
||||||
|
|
||||||
|
for t in behindNATSwitch.transports:
|
||||||
|
t.networkReachability = NetworkReachability.NotReachable
|
||||||
|
|
||||||
|
body
|
||||||
|
|
||||||
|
checkExpiring:
|
||||||
|
# we still expect a new connection to be open by the receiver peer acting as the dcutr server
|
||||||
|
behindNATSwitch.connManager.connCount(publicSwitch.peerInfo.peerId) == 2
|
||||||
|
|
||||||
|
await allFutures(behindNATSwitch.stop(), publicSwitch.stop())
|
||||||
|
|
||||||
|
asyncTest "Client connect timeout":
|
||||||
|
|
||||||
|
proc connectTimeoutProc(): Future[void] {.async.} =
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
let behindNATSwitch = SwitchStub.new(newStandardSwitch(), connectTimeoutProc)
|
||||||
|
let publicSwitch = newStandardSwitch()
|
||||||
|
ductrClientTest(behindNATSwitch, publicSwitch):
|
||||||
|
try:
|
||||||
|
let client = DcutrClient.new(connectTimeout = 5.millis)
|
||||||
|
await client.startSync(behindNATSwitch, publicSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
|
||||||
|
except DcutrError as err:
|
||||||
|
check err.parent of AsyncTimeoutError
|
||||||
|
|
||||||
|
asyncTest "All client connect attempts fail":
|
||||||
|
|
||||||
|
proc connectErrorProc(): Future[void] {.async.} =
|
||||||
|
raise newException(CatchableError, "error")
|
||||||
|
|
||||||
|
let behindNATSwitch = SwitchStub.new(newStandardSwitch(), connectErrorProc)
|
||||||
|
let publicSwitch = newStandardSwitch()
|
||||||
|
ductrClientTest(behindNATSwitch, publicSwitch):
|
||||||
|
try:
|
||||||
|
let client = DcutrClient.new(connectTimeout = 5.millis)
|
||||||
|
await client.startSync(behindNATSwitch, publicSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
|
||||||
|
except DcutrError as err:
|
||||||
|
check err.parent of AllFuturesFailedError
|
||||||
|
|
||||||
|
proc ductrServerTest(connectStub: proc (): Future[void] {.async.}) {.async.} =
|
||||||
|
let behindNATSwitch = newStandardSwitch()
|
||||||
|
let publicSwitch = SwitchStub.new(newStandardSwitch())
|
||||||
|
|
||||||
|
let dcutrProto = Dcutr.new(publicSwitch, connectTimeout = 5.millis)
|
||||||
|
publicSwitch.mount(dcutrProto)
|
||||||
|
|
||||||
|
await allFutures(behindNATSwitch.start(), publicSwitch.start())
|
||||||
|
|
||||||
|
await publicSwitch.connect(behindNATSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
|
||||||
|
|
||||||
|
publicSwitch.connectStub = connectStub
|
||||||
|
|
||||||
|
for t in behindNATSwitch.transports:
|
||||||
|
t.networkReachability = NetworkReachability.NotReachable
|
||||||
|
|
||||||
|
expect CatchableError:
|
||||||
|
# we can't hole punch when both peers are in the same machine. This means that the simultaneous dialings will result
|
||||||
|
# in two connections attemps, instead of one. This dial is going to fail because the dcutr client is acting as the
|
||||||
|
# tcp simultaneous incoming upgrader in the dialer which works only in the simultaneous open case.
|
||||||
|
await DcutrClient.new().startSync(behindNATSwitch, publicSwitch.peerInfo.peerId, behindNATSwitch.peerInfo.addrs)
|
||||||
|
.wait(300.millis)
|
||||||
|
|
||||||
|
checkExpiring:
|
||||||
|
# we still expect a new connection to be open by the receiver peer acting as the dcutr server
|
||||||
|
behindNATSwitch.connManager.connCount(publicSwitch.peerInfo.peerId) == 1
|
||||||
|
|
||||||
|
await allFutures(behindNATSwitch.stop(), publicSwitch.stop())
|
||||||
|
|
||||||
|
asyncTest "DCUtR server timeout when establishing a new connection":
|
||||||
|
|
||||||
|
proc connectProc(): Future[void] {.async.} =
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
|
||||||
|
await ductrServerTest(connectProc)
|
||||||
|
|
||||||
|
asyncTest "DCUtR server error when establishing a new connection":
|
||||||
|
|
||||||
|
proc connectProc(): Future[void] {.async.} =
|
||||||
|
raise newException(CatchableError, "error")
|
||||||
|
|
||||||
|
await ductrServerTest(connectProc)
|
|
@ -43,4 +43,5 @@ import testtcptransport,
|
||||||
testyamux,
|
testyamux,
|
||||||
testautonat,
|
testautonat,
|
||||||
testautonatservice,
|
testautonatservice,
|
||||||
testautorelay
|
testautorelay,
|
||||||
|
testdcutr
|
||||||
|
|
Loading…
Reference in New Issue