From e68186373be09c0c9698c5c1dcdca0e0b9e47a97 Mon Sep 17 00:00:00 2001 From: diegomrsantos Date: Tue, 7 Feb 2023 18:51:17 +0100 Subject: [PATCH] Dialing addrs concurrently in autonat (#855) --- .../protocols/connectivity/autonat/client.nim | 3 +- .../protocols/connectivity/autonat/server.nim | 28 +++++++++++++++---- .../connectivity/autonat/service.nim | 25 ++++++++++------- tests/testautonat.nim | 2 +- 4 files changed, 40 insertions(+), 18 deletions(-) diff --git a/libp2p/protocols/connectivity/autonat/client.nim b/libp2p/protocols/connectivity/autonat/client.nim index 0e5c3415a..ef1279c0b 100644 --- a/libp2p/protocols/connectivity/autonat/client.nim +++ b/libp2p/protocols/connectivity/autonat/client.nim @@ -62,12 +62,13 @@ method dialMe*(self: AutonatClient, switch: Switch, pid: PeerId, addrs: seq[Mult defer: await conn.close() incomingConnection.cancel() # Safer to always try to cancel cause we aren't sure if the peer dialled us or not + trace "sending Dial", addrs = switch.peerInfo.addrs await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs) let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024))) return case response.status: of ResponseStatus.Ok: response.ma.get() of ResponseStatus.DialError: - raise newException(AutonatUnreachableError, "Peer could not dial us back") + raise newException(AutonatUnreachableError, "Peer could not dial us back: " & response.text.get("")) else: raise newException(AutonatError, "Bad status " & $response.status & " " & response.text.get("")) diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim index 688bf3c9c..a987ed023 100644 --- a/libp2p/protocols/connectivity/autonat/server.nim +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -20,7 +20,7 @@ import ../../protocol, ../../../multiaddress, ../../../multicodec, ../../../peerid, - ../../../utils/semaphore, + ../../../utils/[semaphore, future], ../../../errors import core @@ -60,23 +60,36 @@ proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} = proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} = await autonat.sem.acquire() + var futs: seq[Future[Opt[MultiAddress]]] try: # This is to bypass the per peer max connections limit let outgoingConnection = autonat.switch.connManager.expectConnection(conn.peerId) # Safer to always try to cancel cause we aren't sure if the connection was established defer: outgoingConnection.cancel() - # This is to bypass the global max connections limit - let ma = await autonat.switch.dialer.tryDial(conn.peerId, addrs).wait(autonat.dialTimeout) + # tryDial is to bypass the global max connections limit + futs = addrs.mapIt(autonat.switch.dialer.tryDial(conn.peerId, @[it])) + let fut = await anyCompleted(futs).wait(autonat.dialTimeout) + let ma = await fut if ma.isSome: await conn.sendResponseOk(ma.get()) else: await conn.sendResponseError(DialError, "Missing observed address") except CancelledError as exc: raise exc + except AllFuturesFailedError as exc: + debug "All dial attempts failed", addrs, exc = exc.msg + await conn.sendResponseError(DialError, "All dial attempts failed") + except AsyncTimeoutError as exc: + debug "Dial timeout", addrs, exc = exc.msg + await conn.sendResponseError(DialError, "Dial timeout") except CatchableError as exc: - await conn.sendResponseError(DialError, exc.msg) + debug "Unexpected error", addrs, exc = exc.msg + await conn.sendResponseError(DialError, "Unexpected error") finally: autonat.sem.release() + for f in futs: + if not f.finished(): + f.cancel() proc handleDial(autonat: Autonat, conn: Connection, msg: AutonatMsg): Future[void] = if msg.dial.isNone() or msg.dial.get().peerInfo.isNone(): @@ -98,6 +111,7 @@ proc handleDial(autonat: Autonat, conn: Connection, msg: AutonatMsg): Future[voi return conn.sendResponseError(InternalError, "Expected an IP address") var addrs = initHashSet[MultiAddress]() addrs.incl(observedAddr) + trace "addrs received", addrs = peerInfo.addrs for ma in peerInfo.addrs: isRelayed = ma.contains(multiCodec("p2p-circuit")) if isRelayed.isErr() or isRelayed.get(): @@ -122,7 +136,9 @@ proc handleDial(autonat: Autonat, conn: Connection, msg: AutonatMsg): Future[voi if len(addrs) == 0: return conn.sendResponseError(DialRefused, "No dialable address") - return autonat.tryDial(conn, toSeq(addrs)) + let addrsSeq = toSeq(addrs) + trace "trying to dial", addrs = addrsSeq + return autonat.tryDial(conn, addrsSeq) proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = 15.seconds): T = let autonat = T(switch: switch, sem: newAsyncSemaphore(semSize), dialTimeout: dialTimeout) @@ -136,7 +152,7 @@ proc new*(T: typedesc[Autonat], switch: Switch, semSize: int = 1, dialTimeout = except CancelledError as exc: raise exc except CatchableError as exc: - trace "exception in autonat handler", exc = exc.msg, conn + debug "exception in autonat handler", exc = exc.msg, conn finally: trace "exiting autonat handler", conn await conn.close() diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 94fb9ac07..6b223babc 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -99,28 +99,29 @@ proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} = self.networkReachability = reachability self.confidence = some(confidence) - trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence + debug "Current status", currentStats = $self.networkReachability, confidence = $self.confidence, answers = self.answers proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} = logScope: peerId = $peerId if not hasEnoughIncomingSlots(switch): - trace "No incoming slots available, not asking peer", incomingSlotsAvailable=switch.connManager.slotsAvailable(In) + debug "No incoming slots available, not asking peer", incomingSlotsAvailable=switch.connManager.slotsAvailable(In) return Unknown - trace "Asking for reachability" + trace "Asking peer for reachability" let ans = try: discard await self.autonatClient.dialMe(switch, peerId).wait(self.dialTimeout) + debug "dialMe answer is reachable" Reachable - except AutonatUnreachableError: - trace "dialMe answer is not reachable" + except AutonatUnreachableError as error: + debug "dialMe answer is not reachable", msg = error.msg NotReachable - except AsyncTimeoutError: - trace "dialMe timed out" + except AsyncTimeoutError as error: + debug "dialMe timed out", msg = error.msg Unknown - except CatchableError as err: - trace "dialMe unexpected error", errMsg = $err.msg + except CatchableError as error: + debug "dialMe unexpected error", msg = error.msg Unknown await self.handleAnswer(ans) if not isNil(self.statusAndConfidenceHandler): @@ -128,6 +129,7 @@ proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[Netwo return ans proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} = + trace "Asking peers for reachability" var peers = switch.connectedPeers(Direction.Out) self.rng.shuffle(peers) var answersFromPeers = 0 @@ -141,10 +143,11 @@ proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} = answersFromPeers.inc() proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.async.} = - heartbeat "Schedule AutonatService run", interval: + heartbeat "Scheduling AutonatService run", interval: await service.run(switch) method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} = + info "Setting up AutonatService" let hasBeenSetup = await procCall Service(self).setup(switch) if hasBeenSetup: if self.askNewConnectedPeers: @@ -157,11 +160,13 @@ method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} = return hasBeenSetup method run*(self: AutonatService, switch: Switch) {.async, public.} = + trace "Running AutonatService" await askConnectedPeers(self, switch) await self.callHandler() method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} = + info "Stopping AutonatService" let hasBeenStopped = await procCall Service(self).stop(switch) if hasBeenStopped: if not isNil(self.scheduleHandle): diff --git a/tests/testautonat.nim b/tests/testautonat.nim index a945afa89..19a6f2c12 100644 --- a/tests/testautonat.nim +++ b/tests/testautonat.nim @@ -85,6 +85,6 @@ suite "Autonat": let response = AutonatMsg.decode(await conn.readLp(1024)).get().response.get() check: response.status == DialError - response.text.get() == "Timeout exceeded!" + response.text.get() == "Dial timeout" response.ma.isNone() await allFutures(doesNothingListener.stop(), src.stop(), dst.stop())