Better dnsaddr resolving (#753)
This commit is contained in:
parent
a56c3bc296
commit
fa5d102370
|
@ -7,7 +7,7 @@
|
|||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
import std/[sugar, tables]
|
||||
import std/[sugar, tables, sequtils]
|
||||
|
||||
import stew/results
|
||||
import pkg/[chronos,
|
||||
|
@ -17,6 +17,7 @@ import pkg/[chronos,
|
|||
import dial,
|
||||
peerid,
|
||||
peerinfo,
|
||||
multicodec,
|
||||
multistream,
|
||||
connmanager,
|
||||
stream/connection,
|
||||
|
@ -46,56 +47,105 @@ type
|
|||
transports: seq[Transport]
|
||||
nameResolver: NameResolver
|
||||
|
||||
proc dialAndUpgrade(
|
||||
self: Dialer,
|
||||
peerId: Opt[PeerId],
|
||||
hostname: string,
|
||||
address: MultiAddress):
|
||||
Future[Connection] {.async.} =
|
||||
|
||||
for transport in self.transports: # for each transport
|
||||
if transport.handles(address): # check if it can dial it
|
||||
trace "Dialing address", address, peerId, hostname
|
||||
let dialed =
|
||||
try:
|
||||
libp2p_total_dial_attempts.inc()
|
||||
await transport.dial(hostname, address)
|
||||
except CancelledError as exc:
|
||||
debug "Dialing canceled", msg = exc.msg, peerId
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Dialing failed", msg = exc.msg, peerId
|
||||
libp2p_failed_dials.inc()
|
||||
return nil # Try the next address
|
||||
|
||||
# also keep track of the connection's bottom unsafe transport direction
|
||||
# required by gossipsub scoring
|
||||
dialed.transportDir = Direction.Out
|
||||
|
||||
libp2p_successful_dials.inc()
|
||||
|
||||
let conn =
|
||||
try:
|
||||
await transport.upgradeOutgoing(dialed, peerId)
|
||||
except CatchableError as exc:
|
||||
# If we failed to establish the connection through one transport,
|
||||
# we won't succeeded through another - no use in trying again
|
||||
await dialed.close()
|
||||
debug "Upgrade failed", msg = exc.msg, peerId
|
||||
if exc isnot CancelledError:
|
||||
libp2p_failed_upgrades_outgoing.inc()
|
||||
|
||||
# Try other address
|
||||
return nil
|
||||
|
||||
doAssert not isNil(conn), "connection died after upgradeOutgoing"
|
||||
debug "Dial successful", conn, peerId = conn.peerId
|
||||
return conn
|
||||
return nil
|
||||
|
||||
proc expandDnsAddr(
|
||||
self: Dialer,
|
||||
peerId: Opt[PeerId],
|
||||
address: MultiAddress): Future[seq[(MultiAddress, Opt[PeerId])]] {.async.} =
|
||||
|
||||
if not DNSADDR.matchPartial(address): return @[(address, peerId)]
|
||||
if isNil(self.nameResolver):
|
||||
info "Can't resolve DNSADDR without NameResolver", ma=address
|
||||
return @[]
|
||||
|
||||
let
|
||||
toResolve =
|
||||
if peerId.isSome:
|
||||
address & MultiAddress.init(multiCodec("p2p"), peerId.tryGet()).tryGet()
|
||||
else:
|
||||
address
|
||||
resolved = await self.nameResolver.resolveDnsAddr(toResolve)
|
||||
|
||||
for resolvedAddress in resolved:
|
||||
let lastPart = resolvedAddress[^1].tryGet()
|
||||
if lastPart.protoCode == Result[MultiCodec, string].ok(multiCodec("p2p")):
|
||||
let
|
||||
peerIdBytes = lastPart.protoArgument().tryGet()
|
||||
addrPeerId = PeerId.init(peerIdBytes).tryGet()
|
||||
result.add((resolvedAddress[0..^2].tryGet(), Opt.some(addrPeerId)))
|
||||
else:
|
||||
result.add((resolvedAddress, peerId))
|
||||
|
||||
proc dialAndUpgrade(
|
||||
self: Dialer,
|
||||
peerId: Opt[PeerId],
|
||||
addrs: seq[MultiAddress]):
|
||||
Future[Connection] {.async.} =
|
||||
|
||||
debug "Dialing peer", peerId
|
||||
|
||||
for address in addrs: # for each address
|
||||
let
|
||||
hostname = address.getHostname()
|
||||
resolvedAddresses =
|
||||
if isNil(self.nameResolver): @[address]
|
||||
else: await self.nameResolver.resolveMAddress(address)
|
||||
for rawAddress in addrs:
|
||||
# resolve potential dnsaddr
|
||||
let addresses = await self.expandDnsAddr(peerId, rawAddress)
|
||||
|
||||
for a in resolvedAddresses: # for each resolved address
|
||||
for transport in self.transports: # for each transport
|
||||
if transport.handles(a): # check if it can dial it
|
||||
trace "Dialing address", address = $a, peerId, hostname
|
||||
let dialed = try:
|
||||
libp2p_total_dial_attempts.inc()
|
||||
await transport.dial(hostname, a)
|
||||
except CancelledError as exc:
|
||||
debug "Dialing canceled", msg = exc.msg, peerId
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
debug "Dialing failed", msg = exc.msg, peerId
|
||||
libp2p_failed_dials.inc()
|
||||
continue # Try the next address
|
||||
for (expandedAddress, addrPeerId) in addresses:
|
||||
# DNS resolution
|
||||
let
|
||||
hostname = expandedAddress.getHostname()
|
||||
resolvedAddresses =
|
||||
if isNil(self.nameResolver): @[expandedAddress]
|
||||
else: await self.nameResolver.resolveMAddress(expandedAddress)
|
||||
|
||||
# also keep track of the connection's bottom unsafe transport direction
|
||||
# required by gossipsub scoring
|
||||
dialed.transportDir = Direction.Out
|
||||
|
||||
libp2p_successful_dials.inc()
|
||||
|
||||
let conn = try:
|
||||
await transport.upgradeOutgoing(dialed, peerId)
|
||||
except CatchableError as exc:
|
||||
# If we failed to establish the connection through one transport,
|
||||
# we won't succeeded through another - no use in trying again
|
||||
# TODO we should try another address though
|
||||
await dialed.close()
|
||||
debug "Upgrade failed", msg = exc.msg, peerId
|
||||
if exc isnot CancelledError:
|
||||
libp2p_failed_upgrades_outgoing.inc()
|
||||
raise exc
|
||||
|
||||
doAssert not isNil(conn), "connection died after upgradeOutgoing"
|
||||
debug "Dial successful", conn, peerId = conn.peerId
|
||||
return conn
|
||||
for resolvedAddress in resolvedAddresses:
|
||||
result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress)
|
||||
if not isNil(result):
|
||||
return result
|
||||
|
||||
proc internalConnect(
|
||||
self: Dialer,
|
||||
|
|
|
@ -573,7 +573,7 @@ proc protoArgument*(ma: MultiAddress,
|
|||
err("multiaddress: Decoding protocol error")
|
||||
else:
|
||||
ok(res)
|
||||
elif proto.kind in {Length, Path}:
|
||||
elif proto.kind in {MAKind.Length, Path}:
|
||||
if vb.data.readSeq(buffer) == -1:
|
||||
err("multiaddress: Decoding protocol error")
|
||||
else:
|
||||
|
@ -594,6 +594,13 @@ proc protoAddress*(ma: MultiAddress): MaResult[seq[byte]] =
|
|||
buffer.setLen(res)
|
||||
ok(buffer)
|
||||
|
||||
proc protoArgument*(ma: MultiAddress): MaResult[seq[byte]] =
|
||||
## Returns MultiAddress ``ma`` protocol address binary blob.
|
||||
##
|
||||
## If current MultiAddress do not have argument value, then result array will
|
||||
## be empty.
|
||||
ma.protoAddress()
|
||||
|
||||
proc getPart(ma: MultiAddress, index: int): MaResult[MultiAddress] =
|
||||
var header: uint64
|
||||
var data = newSeq[byte]()
|
||||
|
@ -601,6 +608,9 @@ proc getPart(ma: MultiAddress, index: int): MaResult[MultiAddress] =
|
|||
var vb = ma
|
||||
var res: MultiAddress
|
||||
res.data = initVBuffer()
|
||||
|
||||
if index < 0: return err("multiaddress: negative index gived to getPart")
|
||||
|
||||
while offset <= index:
|
||||
if vb.data.readVarint(header) == -1:
|
||||
return err("multiaddress: Malformed binary address!")
|
||||
|
@ -618,7 +628,7 @@ proc getPart(ma: MultiAddress, index: int): MaResult[MultiAddress] =
|
|||
res.data.writeVarint(header)
|
||||
res.data.writeArray(data)
|
||||
res.data.finish()
|
||||
elif proto.kind in {Length, Path}:
|
||||
elif proto.kind in {MAKind.Length, Path}:
|
||||
if vb.data.readSeq(data) == -1:
|
||||
return err("multiaddress: Decoding protocol error")
|
||||
|
||||
|
@ -647,9 +657,13 @@ proc getParts[U, V](ma: MultiAddress, slice: HSlice[U, V]): MaResult[MultiAddres
|
|||
? res.append(? ma[i])
|
||||
ok(res)
|
||||
|
||||
proc `[]`*(ma: MultiAddress, i: int): MaResult[MultiAddress] {.inline.} =
|
||||
proc `[]`*(ma: MultiAddress, i: int | BackwardsIndex): MaResult[MultiAddress] {.inline.} =
|
||||
## Returns part with index ``i`` of MultiAddress ``ma``.
|
||||
ma.getPart(i)
|
||||
when i is BackwardsIndex:
|
||||
let maLength = ? len(ma)
|
||||
ma.getPart(maLength - int(i))
|
||||
else:
|
||||
ma.getPart(i)
|
||||
|
||||
proc `[]`*(ma: MultiAddress, slice: HSlice): MaResult[MultiAddress] {.inline.} =
|
||||
## Returns parts with slice ``slice`` of MultiAddress ``ma``.
|
||||
|
@ -680,7 +694,7 @@ iterator items*(ma: MultiAddress): MaResult[MultiAddress] =
|
|||
|
||||
res.data.writeVarint(header)
|
||||
res.data.writeArray(data)
|
||||
elif proto.kind in {Length, Path}:
|
||||
elif proto.kind in {MAKind.Length, Path}:
|
||||
if vb.data.readSeq(data) == -1:
|
||||
yield err(MaResult[MultiAddress], "Decoding protocol error")
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ else:
|
|||
{.push raises: [].}
|
||||
|
||||
import std/[sugar, sets, sequtils, strutils]
|
||||
import
|
||||
import
|
||||
chronos,
|
||||
chronicles,
|
||||
stew/[endians2, byteutils]
|
||||
|
@ -22,14 +22,14 @@ import ".."/[multiaddress, multicodec]
|
|||
logScope:
|
||||
topics = "libp2p nameresolver"
|
||||
|
||||
type
|
||||
type
|
||||
NameResolver* = ref object of RootObj
|
||||
|
||||
method resolveTxt*(
|
||||
self: NameResolver,
|
||||
address: string): Future[seq[string]] {.async, base.} =
|
||||
## Get TXT record
|
||||
##
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
|
@ -39,16 +39,18 @@ method resolveIp*(
|
|||
port: Port,
|
||||
domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async, base.} =
|
||||
## Resolve the specified address
|
||||
##
|
||||
##
|
||||
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
proc getHostname*(ma: MultiAddress): string =
|
||||
let firstPart = ($ma[0].get()).split('/')
|
||||
if firstPart.len > 1: firstPart[2]
|
||||
let
|
||||
firstPart = ma[0].valueOr: return ""
|
||||
fpSplitted = ($firstPart).split('/', 2)
|
||||
if fpSplitted.len > 2: fpSplitted[2]
|
||||
else: ""
|
||||
|
||||
proc resolveDnsAddress(
|
||||
proc resolveOneAddress(
|
||||
self: NameResolver,
|
||||
ma: MultiAddress,
|
||||
domain: Domain = Domain.AF_UNSPEC,
|
||||
|
@ -64,29 +66,22 @@ proc resolveDnsAddress(
|
|||
let
|
||||
port = Port(fromBytesBE(uint16, pbuf))
|
||||
resolvedAddresses = await self.resolveIp(prefix & dnsval, port, domain)
|
||||
|
||||
|
||||
return collect(newSeqOfCap(4)):
|
||||
for address in resolvedAddresses:
|
||||
var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet()
|
||||
for part in ma:
|
||||
if DNS.match(part.get()): continue
|
||||
if DNS.match(part.tryGet()): continue
|
||||
createdAddress &= part.tryGet()
|
||||
createdAddress
|
||||
|
||||
func matchDnsSuffix(m1, m2: MultiAddress): MaResult[bool] =
|
||||
for partMaybe in m1:
|
||||
let part = ?partMaybe
|
||||
if DNS.match(part): continue
|
||||
let entryProt = ?m2[?part.protoCode()]
|
||||
if entryProt != part:
|
||||
return ok(false)
|
||||
return ok(true)
|
||||
|
||||
proc resolveDnsAddr(
|
||||
proc resolveDnsAddr*(
|
||||
self: NameResolver,
|
||||
ma: MultiAddress,
|
||||
depth: int = 0): Future[seq[MultiAddress]]
|
||||
{.async.} =
|
||||
depth: int = 0): Future[seq[MultiAddress]] {.async.} =
|
||||
|
||||
if not DNSADDR.matchPartial(ma):
|
||||
return @[ma]
|
||||
|
||||
trace "Resolving dnsaddr", ma
|
||||
if depth > 6:
|
||||
|
@ -104,21 +99,17 @@ proc resolveDnsAddr(
|
|||
if not entry.startsWith("dnsaddr="): continue
|
||||
let entryValue = MultiAddress.init(entry[8..^1]).tryGet()
|
||||
|
||||
if not matchDnsSuffix(ma, entryValue).tryGet(): continue
|
||||
if entryValue.contains(multiCodec("p2p")).tryGet() and ma.contains(multiCodec("p2p")).tryGet():
|
||||
if entryValue[multiCodec("p2p")] != ma[multiCodec("p2p")]:
|
||||
continue
|
||||
|
||||
# The spec is not clear wheter only DNSADDR can be recursived
|
||||
# or any DNS addr. Only handling DNSADDR because it's simpler
|
||||
# to avoid infinite recursion
|
||||
if DNSADDR.matchPartial(entryValue):
|
||||
let resolved = await self.resolveDnsAddr(entryValue, depth + 1)
|
||||
for r in resolved:
|
||||
result.add(r)
|
||||
else:
|
||||
result.add(entryValue)
|
||||
let resolved = await self.resolveDnsAddr(entryValue, depth + 1)
|
||||
for r in resolved:
|
||||
result.add(r)
|
||||
|
||||
if result.len == 0:
|
||||
debug "Failed to resolve any DNSADDR", ma
|
||||
return @[ma]
|
||||
debug "Failed to resolve a DNSADDR", ma
|
||||
return @[]
|
||||
return result
|
||||
|
||||
|
||||
|
@ -133,14 +124,15 @@ proc resolveMAddress*(
|
|||
let code = address[0].get().protoCode().get()
|
||||
let seq = case code:
|
||||
of multiCodec("dns"):
|
||||
await self.resolveDnsAddress(address)
|
||||
await self.resolveOneAddress(address)
|
||||
of multiCodec("dns4"):
|
||||
await self.resolveDnsAddress(address, Domain.AF_INET)
|
||||
await self.resolveOneAddress(address, Domain.AF_INET)
|
||||
of multiCodec("dns6"):
|
||||
await self.resolveDnsAddress(address, Domain.AF_INET6)
|
||||
await self.resolveOneAddress(address, Domain.AF_INET6)
|
||||
of multiCodec("dnsaddr"):
|
||||
await self.resolveDnsAddr(address)
|
||||
else:
|
||||
doAssert false
|
||||
@[address]
|
||||
for ad in seq:
|
||||
res.incl(ad)
|
||||
|
|
|
@ -386,6 +386,11 @@ suite "MultiAddress test suite":
|
|||
let ma = MultiAddress.init("/ip4/0.0.0.0/tcp/0/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/p2p-circuit/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSuNEXT/unix/stdio/").get()
|
||||
check:
|
||||
$ma[0..0].get() == "/ip4/0.0.0.0"
|
||||
$ma[^1].get() == "/unix/stdio"
|
||||
ma[-100].isErr()
|
||||
ma[100].isErr()
|
||||
ma[^100].isErr()
|
||||
ma[^0].isErr()
|
||||
$ma[0..1].get() == "/ip4/0.0.0.0/tcp/0"
|
||||
$ma[1..2].get() == "/tcp/0/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC"
|
||||
$ma[^3..^1].get() == "/p2p-circuit/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSuNEXT/unix/stdio"
|
||||
|
|
|
@ -139,7 +139,18 @@ suite "Name resolving":
|
|||
asyncTest "dnsaddr infinite recursion":
|
||||
resolver.txtResponses["_dnsaddr.bootstrap.libp2p.io"] = @["dnsaddr=/dnsaddr/bootstrap.libp2p.io"]
|
||||
|
||||
check testOne("/dnsaddr/bootstrap.libp2p.io/", "/dnsaddr/bootstrap.libp2p.io/")
|
||||
check testOne("/dnsaddr/bootstrap.libp2p.io/", newSeq[string]())
|
||||
|
||||
test "getHostname":
|
||||
check:
|
||||
MultiAddress.init("/dnsaddr/bootstrap.libp2p.io/").tryGet().getHostname == "bootstrap.libp2p.io"
|
||||
MultiAddress.init("").tryGet().getHostname == ""
|
||||
MultiAddress.init("/ip4/147.75.69.143/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN").tryGet().getHostname == "147.75.69.143"
|
||||
MultiAddress.init("/ip6/2604:1380:1000:6000::1/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN").tryGet().getHostname == "2604:1380:1000:6000::1"
|
||||
MultiAddress.init("/dns/localhost/udp/0").tryGet().getHostname == "localhost"
|
||||
MultiAddress.init("/dns4/hello.com/udp/0").tryGet().getHostname == "hello.com"
|
||||
MultiAddress.init("/dns6/hello.com/udp/0").tryGet().getHostname == "hello.com"
|
||||
MultiAddress.init("/wss/").tryGet().getHostname == ""
|
||||
|
||||
suite "DNS Resolving":
|
||||
teardown:
|
||||
|
@ -171,7 +182,7 @@ suite "Name resolving":
|
|||
|
||||
# The test
|
||||
var dnsresolver = DnsResolver.new(@[server.localAddress])
|
||||
|
||||
|
||||
check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_UNSPEC)) ==
|
||||
mapIt(
|
||||
@["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0",
|
||||
|
@ -209,7 +220,7 @@ suite "Name resolving":
|
|||
|
||||
# The test
|
||||
var dnsresolver = DnsResolver.new(@[unresponsiveServer.localAddress, server.localAddress])
|
||||
|
||||
|
||||
check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_INET)) ==
|
||||
mapIt(@["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0"], initTAddress(it))
|
||||
|
||||
|
|
|
@ -295,7 +295,7 @@ suite "Noise":
|
|||
(switch2, peerInfo2) = createSwitch(ma2, true, true) # secio, we want to fail
|
||||
await switch1.start()
|
||||
await switch2.start()
|
||||
expect(UpgradeFailedError):
|
||||
expect(DialFailedError):
|
||||
let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec)
|
||||
|
||||
await allFuturesThrowing(
|
||||
|
|
|
@ -201,13 +201,25 @@ suite "Switch":
|
|||
check not switch1.isConnected(switch2.peerInfo.peerId)
|
||||
check not switch2.isConnected(switch1.peerInfo.peerId)
|
||||
|
||||
asyncTest "e2e connect to peer with unkown PeerId":
|
||||
asyncTest "e2e connect to peer with unknown PeerId":
|
||||
let resolver = MockResolver.new()
|
||||
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
|
||||
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
|
||||
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise], nameResolver = resolver)
|
||||
await switch1.start()
|
||||
await switch2.start()
|
||||
|
||||
# via dnsaddr
|
||||
resolver.txtResponses["_dnsaddr.test.io"] = @[
|
||||
"dnsaddr=" & $switch1.peerInfo.addrs[0] & "/p2p/" & $switch1.peerInfo.peerId,
|
||||
]
|
||||
|
||||
check: (await switch2.connect(@[MultiAddress.init("/dnsaddr/test.io/").tryGet()])) == switch1.peerInfo.peerId
|
||||
await switch2.disconnect(switch1.peerInfo.peerId)
|
||||
|
||||
# via direct ip
|
||||
check not switch2.isConnected(switch1.peerInfo.peerId)
|
||||
check: (await switch2.connect(switch1.peerInfo.addrs)) == switch1.peerInfo.peerId
|
||||
|
||||
await switch2.disconnect(switch1.peerInfo.peerId)
|
||||
|
||||
await allFuturesThrowing(
|
||||
|
@ -665,7 +677,7 @@ suite "Switch":
|
|||
await switch.start()
|
||||
|
||||
var peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get()
|
||||
expect LPStreamClosedError, LPStreamEOFError:
|
||||
expect DialFailedError:
|
||||
await switch.connect(peerId, transport.addrs)
|
||||
|
||||
await handlerWait
|
||||
|
@ -994,9 +1006,10 @@ suite "Switch":
|
|||
await srcWsSwitch.start()
|
||||
|
||||
resolver.txtResponses["_dnsaddr.test.io"] = @[
|
||||
"dnsaddr=" & $destSwitch.peerInfo.addrs[0],
|
||||
"dnsaddr=" & $destSwitch.peerInfo.addrs[1]
|
||||
"dnsaddr=/dns4/localhost" & $destSwitch.peerInfo.addrs[0][1..^1].tryGet() & "/p2p/" & $destSwitch.peerInfo.peerId,
|
||||
"dnsaddr=/dns4/localhost" & $destSwitch.peerInfo.addrs[1][1..^1].tryGet()
|
||||
]
|
||||
resolver.ipResponses[("localhost", false)] = @["127.0.0.1"]
|
||||
|
||||
let testAddr = MultiAddress.init("/dnsaddr/test.io/").tryGet()
|
||||
|
||||
|
|
Loading…
Reference in New Issue