Refactor dialer and upgrade

This commit is contained in:
Diego 2023-01-05 17:29:25 +01:00
parent e304ad0f7e
commit 45311cefe9
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
1 changed files with 81 additions and 47 deletions

View File

@ -47,52 +47,87 @@ type
transports: seq[Transport] transports: seq[Transport]
nameResolver: NameResolver nameResolver: NameResolver
proc dialAndUpgrade( proc tryDial(
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress,
transport: Transport):
Future[Connection] {.async.} =
trace "Dialing address", address, peerId, hostname
result =
try:
libp2p_total_dial_attempts.inc()
await transport.dial(hostname, address)
except CancelledError as exc:
debug "Dialing canceled", msg = exc.msg, peerId
raise exc
except CatchableError as exc:
debug "Dialing failed", msg = exc.msg, peerId
libp2p_failed_dials.inc()
raise exc
proc tryUpgrade(
peerId: Opt[PeerId],
hostname: string,
transport: Transport,
conn: Connection):
Future[Connection] {.async.} =
result =
try:
await transport.upgradeOutgoing(conn, peerId)
except CatchableError as exc:
# If we failed to upgrade the connection through one transport,
# we won't succeeded through another - no use in trying again
await conn.close()
debug "Upgrade failed", msg = exc.msg, peerId
if exc isnot CancelledError:
libp2p_failed_upgrades_outgoing.inc()
raise exc
proc tryDialAndUpgrade(
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress,
transport: Transport):
Future[Connection] {.async.} =
if transport.handles(address): # check if it can dial it
let conn = await tryDial(peerId, hostname, address, transport)
# also keep track of the connection's bottom unsafe transport direction
# required by gossipsub scoring
conn.transportDir = Direction.Out
libp2p_successful_dials.inc()
let upgradedConn = await tryUpgrade(peerId, hostname, transport, conn)
doAssert not isNil(upgradedConn), "connection died after upgradeOutgoing"
debug "Dial successful", upgradedConn, peerId = upgradedConn.peerId
return upgradedConn
raise newException(DialFailedError, "Transport does not handle the address")
proc tryDialAndUpgrade(
peerId: Opt[PeerId],
hostname: string,
address: MultiAddress,
transports: seq[Transport]):
Future[Connection] {.async.} =
for transport in transports:
try:
return await tryDialAndUpgrade(peerId, hostname, address, transport)
except CatchableError:
continue
raise newException(DialFailedError, "No transport succeeded")
proc tryDialAndUpgrade(
self: Dialer, self: Dialer,
peerId: Opt[PeerId], peerId: Opt[PeerId],
hostname: string, hostname: string,
address: MultiAddress): addresses: seq[MultiAddress]):
Future[Connection] {.async.} = Future[Connection] {.async.} =
for address in addresses:
for transport in self.transports: # for each transport try:
if transport.handles(address): # check if it can dial it return await tryDialAndUpgrade(peerId, hostname, address, self.transports)
trace "Dialing address", address, peerId, hostname except CatchableError:
let dialed = continue
try: raise newException(DialFailedError, "No address succeeded")
libp2p_total_dial_attempts.inc()
await transport.dial(hostname, address)
except CancelledError as exc:
debug "Dialing canceled", msg = exc.msg, peerId
raise exc
except CatchableError as exc:
debug "Dialing failed", msg = exc.msg, peerId
libp2p_failed_dials.inc()
return nil # Try the next address
# also keep track of the connection's bottom unsafe transport direction
# required by gossipsub scoring
dialed.transportDir = Direction.Out
libp2p_successful_dials.inc()
let conn =
try:
await transport.upgradeOutgoing(dialed, peerId)
except CatchableError as exc:
# If we failed to establish the connection through one transport,
# we won't succeeded through another - no use in trying again
await dialed.close()
debug "Upgrade failed", msg = exc.msg, peerId
if exc isnot CancelledError:
libp2p_failed_upgrades_outgoing.inc()
# Try other address
return nil
doAssert not isNil(conn), "connection died after upgradeOutgoing"
debug "Dial successful", conn, peerId = conn.peerId
return conn
return nil
proc expandDnsAddr( proc expandDnsAddr(
self: Dialer, self: Dialer,
@ -141,11 +176,10 @@ proc dialAndUpgrade(
resolvedAddresses = resolvedAddresses =
if isNil(self.nameResolver): @[expandedAddress] if isNil(self.nameResolver): @[expandedAddress]
else: await self.nameResolver.resolveMAddress(expandedAddress) else: await self.nameResolver.resolveMAddress(expandedAddress)
try:
for resolvedAddress in resolvedAddresses: return await self.tryDialAndUpgrade(addrPeerId, hostname, resolvedAddresses)
result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress) except CatchableError:
if not isNil(result): continue
return result
proc internalConnect( proc internalConnect(
self: Dialer, self: Dialer,