diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 30ff15e27..d3c0826dd 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -7,7 +7,7 @@ # This file may not be copied, modified, or distributed except according to # those terms. -import std/[sugar, tables] +import std/[sugar, tables, sequtils] import stew/results import pkg/[chronos, @@ -17,6 +17,7 @@ import pkg/[chronos, import dial, peerid, peerinfo, + multicodec, multistream, connmanager, stream/connection, @@ -46,56 +47,105 @@ type transports: seq[Transport] nameResolver: NameResolver +proc dialAndUpgrade( + self: Dialer, + peerId: Opt[PeerId], + hostname: string, + address: MultiAddress): + Future[Connection] {.async.} = + + for transport in self.transports: # for each transport + if transport.handles(address): # check if it can dial it + trace "Dialing address", address, peerId, hostname + let dialed = + try: + libp2p_total_dial_attempts.inc() + await transport.dial(hostname, address) + except CancelledError as exc: + debug "Dialing canceled", msg = exc.msg, peerId + raise exc + except CatchableError as exc: + debug "Dialing failed", msg = exc.msg, peerId + libp2p_failed_dials.inc() + return nil # Try the next address + + # also keep track of the connection's bottom unsafe transport direction + # required by gossipsub scoring + dialed.transportDir = Direction.Out + + libp2p_successful_dials.inc() + + let conn = + try: + await transport.upgradeOutgoing(dialed, peerId) + except CatchableError as exc: + # If we failed to establish the connection through one transport, + # we won't succeeded through another - no use in trying again + await dialed.close() + debug "Upgrade failed", msg = exc.msg, peerId + if exc isnot CancelledError: + libp2p_failed_upgrades_outgoing.inc() + + # Try other address + return nil + + doAssert not isNil(conn), "connection died after upgradeOutgoing" + debug "Dial successful", conn, peerId = conn.peerId + return conn + return nil + +proc expandDnsAddr( + self: Dialer, + peerId: Opt[PeerId], + address: MultiAddress): Future[seq[(MultiAddress, Opt[PeerId])]] {.async.} = + + if not DNSADDR.matchPartial(address): return @[(address, peerId)] + if isNil(self.nameResolver): + info "Can't resolve DNSADDR without NameResolver", ma=address + return @[] + + let + toResolve = + if peerId.isSome: + address & MultiAddress.init(multiCodec("p2p"), peerId.tryGet()).tryGet() + else: + address + resolved = await self.nameResolver.resolveDnsAddr(toResolve) + + for resolvedAddress in resolved: + let lastPart = resolvedAddress[^1].tryGet() + if lastPart.protoCode == Result[MultiCodec, string].ok(multiCodec("p2p")): + let + peerIdBytes = lastPart.protoArgument().tryGet() + addrPeerId = PeerId.init(peerIdBytes).tryGet() + result.add((resolvedAddress[0..^2].tryGet(), Opt.some(addrPeerId))) + else: + result.add((resolvedAddress, peerId)) + proc dialAndUpgrade( self: Dialer, peerId: Opt[PeerId], addrs: seq[MultiAddress]): Future[Connection] {.async.} = + debug "Dialing peer", peerId - for address in addrs: # for each address - let - hostname = address.getHostname() - resolvedAddresses = - if isNil(self.nameResolver): @[address] - else: await self.nameResolver.resolveMAddress(address) + for rawAddress in addrs: + # resolve potential dnsaddr + let addresses = await self.expandDnsAddr(peerId, rawAddress) - for a in resolvedAddresses: # for each resolved address - for transport in self.transports: # for each transport - if transport.handles(a): # check if it can dial it - trace "Dialing address", address = $a, peerId, hostname - let dialed = try: - libp2p_total_dial_attempts.inc() - await transport.dial(hostname, a) - except CancelledError as exc: - debug "Dialing canceled", msg = exc.msg, peerId - raise exc - except CatchableError as exc: - debug "Dialing failed", msg = exc.msg, peerId - libp2p_failed_dials.inc() - continue # Try the next address + for (expandedAddress, addrPeerId) in addresses: + # DNS resolution + let + hostname = expandedAddress.getHostname() + resolvedAddresses = + if isNil(self.nameResolver): @[expandedAddress] + else: await self.nameResolver.resolveMAddress(expandedAddress) - # also keep track of the connection's bottom unsafe transport direction - # required by gossipsub scoring - dialed.transportDir = Direction.Out - - libp2p_successful_dials.inc() - - let conn = try: - await transport.upgradeOutgoing(dialed, peerId) - except CatchableError as exc: - # If we failed to establish the connection through one transport, - # we won't succeeded through another - no use in trying again - # TODO we should try another address though - await dialed.close() - debug "Upgrade failed", msg = exc.msg, peerId - if exc isnot CancelledError: - libp2p_failed_upgrades_outgoing.inc() - raise exc - - doAssert not isNil(conn), "connection died after upgradeOutgoing" - debug "Dial successful", conn, peerId = conn.peerId - return conn + for resolvedAddress in resolvedAddresses: + result = await self.dialAndUpgrade(addrPeerId, hostname, resolvedAddress) + if not isNil(result): + return result proc internalConnect( self: Dialer, diff --git a/libp2p/multiaddress.nim b/libp2p/multiaddress.nim index 054fb00e8..fd663f94a 100644 --- a/libp2p/multiaddress.nim +++ b/libp2p/multiaddress.nim @@ -573,7 +573,7 @@ proc protoArgument*(ma: MultiAddress, err("multiaddress: Decoding protocol error") else: ok(res) - elif proto.kind in {Length, Path}: + elif proto.kind in {MAKind.Length, Path}: if vb.data.readSeq(buffer) == -1: err("multiaddress: Decoding protocol error") else: @@ -594,6 +594,13 @@ proc protoAddress*(ma: MultiAddress): MaResult[seq[byte]] = buffer.setLen(res) ok(buffer) +proc protoArgument*(ma: MultiAddress): MaResult[seq[byte]] = + ## Returns MultiAddress ``ma`` protocol address binary blob. + ## + ## If current MultiAddress do not have argument value, then result array will + ## be empty. + ma.protoAddress() + proc getPart(ma: MultiAddress, index: int): MaResult[MultiAddress] = var header: uint64 var data = newSeq[byte]() @@ -601,6 +608,9 @@ proc getPart(ma: MultiAddress, index: int): MaResult[MultiAddress] = var vb = ma var res: MultiAddress res.data = initVBuffer() + + if index < 0: return err("multiaddress: negative index gived to getPart") + while offset <= index: if vb.data.readVarint(header) == -1: return err("multiaddress: Malformed binary address!") @@ -618,7 +628,7 @@ proc getPart(ma: MultiAddress, index: int): MaResult[MultiAddress] = res.data.writeVarint(header) res.data.writeArray(data) res.data.finish() - elif proto.kind in {Length, Path}: + elif proto.kind in {MAKind.Length, Path}: if vb.data.readSeq(data) == -1: return err("multiaddress: Decoding protocol error") @@ -647,9 +657,13 @@ proc getParts[U, V](ma: MultiAddress, slice: HSlice[U, V]): MaResult[MultiAddres ? res.append(? ma[i]) ok(res) -proc `[]`*(ma: MultiAddress, i: int): MaResult[MultiAddress] {.inline.} = +proc `[]`*(ma: MultiAddress, i: int | BackwardsIndex): MaResult[MultiAddress] {.inline.} = ## Returns part with index ``i`` of MultiAddress ``ma``. - ma.getPart(i) + when i is BackwardsIndex: + let maLength = ? len(ma) + ma.getPart(maLength - int(i)) + else: + ma.getPart(i) proc `[]`*(ma: MultiAddress, slice: HSlice): MaResult[MultiAddress] {.inline.} = ## Returns parts with slice ``slice`` of MultiAddress ``ma``. @@ -680,7 +694,7 @@ iterator items*(ma: MultiAddress): MaResult[MultiAddress] = res.data.writeVarint(header) res.data.writeArray(data) - elif proto.kind in {Length, Path}: + elif proto.kind in {MAKind.Length, Path}: if vb.data.readSeq(data) == -1: yield err(MaResult[MultiAddress], "Decoding protocol error") diff --git a/libp2p/nameresolving/nameresolver.nim b/libp2p/nameresolving/nameresolver.nim index 0fdd6a029..b53e0b4ed 100644 --- a/libp2p/nameresolving/nameresolver.nim +++ b/libp2p/nameresolving/nameresolver.nim @@ -13,7 +13,7 @@ else: {.push raises: [].} import std/[sugar, sets, sequtils, strutils] -import +import chronos, chronicles, stew/[endians2, byteutils] @@ -22,14 +22,14 @@ import ".."/[multiaddress, multicodec] logScope: topics = "libp2p nameresolver" -type +type NameResolver* = ref object of RootObj method resolveTxt*( self: NameResolver, address: string): Future[seq[string]] {.async, base.} = ## Get TXT record - ## + ## doAssert(false, "Not implemented!") @@ -39,16 +39,18 @@ method resolveIp*( port: Port, domain: Domain = Domain.AF_UNSPEC): Future[seq[TransportAddress]] {.async, base.} = ## Resolve the specified address - ## + ## doAssert(false, "Not implemented!") proc getHostname*(ma: MultiAddress): string = - let firstPart = ($ma[0].get()).split('/') - if firstPart.len > 1: firstPart[2] + let + firstPart = ma[0].valueOr: return "" + fpSplitted = ($firstPart).split('/', 2) + if fpSplitted.len > 2: fpSplitted[2] else: "" -proc resolveDnsAddress( +proc resolveOneAddress( self: NameResolver, ma: MultiAddress, domain: Domain = Domain.AF_UNSPEC, @@ -64,29 +66,22 @@ proc resolveDnsAddress( let port = Port(fromBytesBE(uint16, pbuf)) resolvedAddresses = await self.resolveIp(prefix & dnsval, port, domain) - + return collect(newSeqOfCap(4)): for address in resolvedAddresses: var createdAddress = MultiAddress.init(address).tryGet()[0].tryGet() for part in ma: - if DNS.match(part.get()): continue + if DNS.match(part.tryGet()): continue createdAddress &= part.tryGet() createdAddress -func matchDnsSuffix(m1, m2: MultiAddress): MaResult[bool] = - for partMaybe in m1: - let part = ?partMaybe - if DNS.match(part): continue - let entryProt = ?m2[?part.protoCode()] - if entryProt != part: - return ok(false) - return ok(true) - -proc resolveDnsAddr( +proc resolveDnsAddr*( self: NameResolver, ma: MultiAddress, - depth: int = 0): Future[seq[MultiAddress]] - {.async.} = + depth: int = 0): Future[seq[MultiAddress]] {.async.} = + + if not DNSADDR.matchPartial(ma): + return @[ma] trace "Resolving dnsaddr", ma if depth > 6: @@ -104,21 +99,17 @@ proc resolveDnsAddr( if not entry.startsWith("dnsaddr="): continue let entryValue = MultiAddress.init(entry[8..^1]).tryGet() - if not matchDnsSuffix(ma, entryValue).tryGet(): continue + if entryValue.contains(multiCodec("p2p")).tryGet() and ma.contains(multiCodec("p2p")).tryGet(): + if entryValue[multiCodec("p2p")] != ma[multiCodec("p2p")]: + continue - # The spec is not clear wheter only DNSADDR can be recursived - # or any DNS addr. Only handling DNSADDR because it's simpler - # to avoid infinite recursion - if DNSADDR.matchPartial(entryValue): - let resolved = await self.resolveDnsAddr(entryValue, depth + 1) - for r in resolved: - result.add(r) - else: - result.add(entryValue) + let resolved = await self.resolveDnsAddr(entryValue, depth + 1) + for r in resolved: + result.add(r) if result.len == 0: - debug "Failed to resolve any DNSADDR", ma - return @[ma] + debug "Failed to resolve a DNSADDR", ma + return @[] return result @@ -133,14 +124,15 @@ proc resolveMAddress*( let code = address[0].get().protoCode().get() let seq = case code: of multiCodec("dns"): - await self.resolveDnsAddress(address) + await self.resolveOneAddress(address) of multiCodec("dns4"): - await self.resolveDnsAddress(address, Domain.AF_INET) + await self.resolveOneAddress(address, Domain.AF_INET) of multiCodec("dns6"): - await self.resolveDnsAddress(address, Domain.AF_INET6) + await self.resolveOneAddress(address, Domain.AF_INET6) of multiCodec("dnsaddr"): await self.resolveDnsAddr(address) else: + doAssert false @[address] for ad in seq: res.incl(ad) diff --git a/tests/testmultiaddress.nim b/tests/testmultiaddress.nim index 2c05e212e..c180db7f2 100644 --- a/tests/testmultiaddress.nim +++ b/tests/testmultiaddress.nim @@ -386,6 +386,11 @@ suite "MultiAddress test suite": let ma = MultiAddress.init("/ip4/0.0.0.0/tcp/0/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC/p2p-circuit/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSuNEXT/unix/stdio/").get() check: $ma[0..0].get() == "/ip4/0.0.0.0" + $ma[^1].get() == "/unix/stdio" + ma[-100].isErr() + ma[100].isErr() + ma[^100].isErr() + ma[^0].isErr() $ma[0..1].get() == "/ip4/0.0.0.0/tcp/0" $ma[1..2].get() == "/tcp/0/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC" $ma[^3..^1].get() == "/p2p-circuit/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSuNEXT/unix/stdio" diff --git a/tests/testnameresolve.nim b/tests/testnameresolve.nim index fa9b88ffa..4a4dd40b0 100644 --- a/tests/testnameresolve.nim +++ b/tests/testnameresolve.nim @@ -139,7 +139,18 @@ suite "Name resolving": asyncTest "dnsaddr infinite recursion": resolver.txtResponses["_dnsaddr.bootstrap.libp2p.io"] = @["dnsaddr=/dnsaddr/bootstrap.libp2p.io"] - check testOne("/dnsaddr/bootstrap.libp2p.io/", "/dnsaddr/bootstrap.libp2p.io/") + check testOne("/dnsaddr/bootstrap.libp2p.io/", newSeq[string]()) + + test "getHostname": + check: + MultiAddress.init("/dnsaddr/bootstrap.libp2p.io/").tryGet().getHostname == "bootstrap.libp2p.io" + MultiAddress.init("").tryGet().getHostname == "" + MultiAddress.init("/ip4/147.75.69.143/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN").tryGet().getHostname == "147.75.69.143" + MultiAddress.init("/ip6/2604:1380:1000:6000::1/tcp/4001/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN").tryGet().getHostname == "2604:1380:1000:6000::1" + MultiAddress.init("/dns/localhost/udp/0").tryGet().getHostname == "localhost" + MultiAddress.init("/dns4/hello.com/udp/0").tryGet().getHostname == "hello.com" + MultiAddress.init("/dns6/hello.com/udp/0").tryGet().getHostname == "hello.com" + MultiAddress.init("/wss/").tryGet().getHostname == "" suite "DNS Resolving": teardown: @@ -171,7 +182,7 @@ suite "Name resolving": # The test var dnsresolver = DnsResolver.new(@[server.localAddress]) - + check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_UNSPEC)) == mapIt( @["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0", @@ -209,7 +220,7 @@ suite "Name resolving": # The test var dnsresolver = DnsResolver.new(@[unresponsiveServer.localAddress, server.localAddress]) - + check await(dnsresolver.resolveIp("status.im", 0.Port, Domain.AF_INET)) == mapIt(@["104.22.24.181:0", "172.67.10.161:0", "104.22.25.181:0"], initTAddress(it)) diff --git a/tests/testnoise.nim b/tests/testnoise.nim index b0e785be3..a94714ce0 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -295,7 +295,7 @@ suite "Noise": (switch2, peerInfo2) = createSwitch(ma2, true, true) # secio, we want to fail await switch1.start() await switch2.start() - expect(UpgradeFailedError): + expect(DialFailedError): let conn = await switch2.dial(switch1.peerInfo.peerId, switch1.peerInfo.addrs, TestCodec) await allFuturesThrowing( diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 608266c09..961b6dc3c 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -201,13 +201,25 @@ suite "Switch": check not switch1.isConnected(switch2.peerInfo.peerId) check not switch2.isConnected(switch1.peerInfo.peerId) - asyncTest "e2e connect to peer with unkown PeerId": + asyncTest "e2e connect to peer with unknown PeerId": + let resolver = MockResolver.new() let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) - let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) + let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise], nameResolver = resolver) await switch1.start() await switch2.start() + # via dnsaddr + resolver.txtResponses["_dnsaddr.test.io"] = @[ + "dnsaddr=" & $switch1.peerInfo.addrs[0] & "/p2p/" & $switch1.peerInfo.peerId, + ] + + check: (await switch2.connect(@[MultiAddress.init("/dnsaddr/test.io/").tryGet()])) == switch1.peerInfo.peerId + await switch2.disconnect(switch1.peerInfo.peerId) + + # via direct ip + check not switch2.isConnected(switch1.peerInfo.peerId) check: (await switch2.connect(switch1.peerInfo.addrs)) == switch1.peerInfo.peerId + await switch2.disconnect(switch1.peerInfo.peerId) await allFuturesThrowing( @@ -665,7 +677,7 @@ suite "Switch": await switch.start() var peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get() - expect LPStreamClosedError, LPStreamEOFError: + expect DialFailedError: await switch.connect(peerId, transport.addrs) await handlerWait @@ -994,9 +1006,10 @@ suite "Switch": await srcWsSwitch.start() resolver.txtResponses["_dnsaddr.test.io"] = @[ - "dnsaddr=" & $destSwitch.peerInfo.addrs[0], - "dnsaddr=" & $destSwitch.peerInfo.addrs[1] + "dnsaddr=/dns4/localhost" & $destSwitch.peerInfo.addrs[0][1..^1].tryGet() & "/p2p/" & $destSwitch.peerInfo.peerId, + "dnsaddr=/dns4/localhost" & $destSwitch.peerInfo.addrs[1][1..^1].tryGet() ] + resolver.ipResponses[("localhost", false)] = @["127.0.0.1"] let testAddr = MultiAddress.init("/dnsaddr/test.io/").tryGet()