diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index a82c424..2ce4d72 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -444,6 +444,13 @@ proc getOutgoingSlot*(c: ConnManager, forceDial = false): ConnectionSlot {.raise raise newTooManyConnectionsError() return ConnectionSlot(connManager: c, direction: Out) +proc slotsAvailable*(c: ConnManager, dir: Direction): int = + case dir: + of Direction.In: + return c.inSema.count + of Direction.Out: + return c.outSema.count + proc release*(cs: ConnectionSlot) = if cs.direction == In: cs.connManager.inSema.release() diff --git a/libp2p/protocols/connectivity/autonat/client.nim b/libp2p/protocols/connectivity/autonat/client.nim index b9e0a67..0e5c341 100644 --- a/libp2p/protocols/connectivity/autonat/client.nim +++ b/libp2p/protocols/connectivity/autonat/client.nim @@ -57,7 +57,11 @@ method dialMe*(self: AutonatClient, switch: Switch, pid: PeerId, addrs: seq[Mult except CatchableError as err: raise newException(AutonatError, "Unexpected error when dialling", err) - defer: await conn.close() + # To bypass maxConnectionsPerPeer + let incomingConnection = switch.connManager.expectConnection(pid) + defer: + await conn.close() + incomingConnection.cancel() # Safer to always try to cancel cause we aren't sure if the peer dialled us or not await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs) let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024))) return case response.status: diff --git a/libp2p/protocols/connectivity/autonat/server.nim b/libp2p/protocols/connectivity/autonat/server.nim index 5d429d6..688bf3c 100644 --- a/libp2p/protocols/connectivity/autonat/server.nim +++ b/libp2p/protocols/connectivity/autonat/server.nim @@ -59,8 +59,13 @@ proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} = await conn.writeLp(pb.buffer) proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} = + await autonat.sem.acquire() try: - await autonat.sem.acquire() + # This is to bypass the per peer max connections limit + let outgoingConnection = autonat.switch.connManager.expectConnection(conn.peerId) + # Safer to always try to cancel cause we aren't sure if the connection was established + defer: outgoingConnection.cancel() + # This is to bypass the global max connections limit let ma = await autonat.switch.dialer.tryDial(conn.peerId, addrs).wait(autonat.dialTimeout) if ma.isSome: await conn.sendResponseOk(ma.get()) diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 7c47543..94fb9ac 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -76,6 +76,10 @@ proc callHandler(self: AutonatService) {.async.} = if not isNil(self.statusAndConfidenceHandler): await self.statusAndConfidenceHandler(self.networkReachability, self.confidence) +proc hasEnoughIncomingSlots(switch: Switch): bool = + # we leave some margin instead of comparing to 0 as a peer could connect to us while we are asking for the dial back + return switch.connManager.slotsAvailable(In) >= 2 + proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} = if ans == Unknown: @@ -98,19 +102,25 @@ proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} = trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} = - trace "Asking for reachability", peerId = $peerId + logScope: + peerId = $peerId + if not hasEnoughIncomingSlots(switch): + trace "No incoming slots available, not asking peer", incomingSlotsAvailable=switch.connManager.slotsAvailable(In) + return Unknown + + trace "Asking for reachability" let ans = try: discard await self.autonatClient.dialMe(switch, peerId).wait(self.dialTimeout) Reachable except AutonatUnreachableError: - trace "dialMe answer is not reachable", peerId = $peerId + trace "dialMe answer is not reachable" NotReachable except AsyncTimeoutError: - trace "dialMe timed out", peerId = $peerId + trace "dialMe timed out" Unknown except CatchableError as err: - trace "dialMe unexpected error", peerId = $peerId, errMsg = $err.msg + trace "dialMe unexpected error", errMsg = $err.msg Unknown await self.handleAnswer(ans) if not isNil(self.statusAndConfidenceHandler): @@ -124,7 +134,10 @@ proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} = for peer in peers: if answersFromPeers >= self.numPeersToAsk: break - elif (await askPeer(self, switch, peer)) != Unknown: + if not hasEnoughIncomingSlots(switch): + debug "No incoming slots available, not asking peers", incomingSlotsAvailable=switch.connManager.slotsAvailable(In) + break + if (await askPeer(self, switch, peer)) != Unknown: answersFromPeers.inc() proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.async.} = diff --git a/tests/testautonatservice.nim b/tests/testautonatservice.nim index 0bc8a64..c40504b 100644 --- a/tests/testautonatservice.nim +++ b/tests/testautonatservice.nim @@ -17,11 +17,13 @@ import ../libp2p/[builders, import ./helpers import stubs/autonatclientstub -proc createSwitch(autonatSvc: Service = nil, withAutonat = true): Switch = +proc createSwitch(autonatSvc: Service = nil, withAutonat = true, maxConnsPerPeer = 1, maxConns = 100): Switch = var builder = SwitchBuilder.new() .withRng(newRng()) .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) .withTcpTransport() + .withMaxConnsPerPeer(maxConnsPerPeer) + .withMaxConnections(maxConns) .withMplex() .withNoise() @@ -244,3 +246,77 @@ suite "Autonat Service": check (await autonatService.stop(switch)) == false await allFuturesThrowing(switch.stop()) + + asyncTest "Must bypass maxConnectionsPerPeer limit": + let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds), maxQueueSize = 1) + + let switch1 = createSwitch(autonatService, maxConnsPerPeer = 0) + let switch2 = createSwitch(maxConnsPerPeer = 0) + + let awaiter = newFuture[void]() + + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: + if not awaiter.finished: + awaiter.complete() + + check autonatService.networkReachability() == NetworkReachability.Unknown + + autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) + + await switch1.start() + await switch2.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + + await awaiter + + check autonatService.networkReachability() == NetworkReachability.Reachable + check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 1 + + await allFuturesThrowing( + switch1.stop(), switch2.stop()) + + asyncTest "Must work with low maxConnections": + let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds), maxQueueSize = 1) + + let switch1 = createSwitch(autonatService, maxConns = 4) + let switch2 = createSwitch() + let switch3 = createSwitch() + let switch4 = createSwitch() + let switch5 = createSwitch() + + var awaiter = newFuture[void]() + + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1: + if not awaiter.finished: + awaiter.complete() + + check autonatService.networkReachability() == NetworkReachability.Unknown + + autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) + + await switch1.start() + await switch2.start() + await switch3.start() + await switch4.start() + await switch5.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + + await awaiter + + await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) + await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) + await switch5.connect(switch1.peerInfo.peerId, switch1.peerInfo.addrs) + # switch1 is now full, should stick to last observation + awaiter = newFuture[void]() + await autonatService.run(switch1) + await awaiter + + check autonatService.networkReachability() == NetworkReachability.Reachable + check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 1 + + await allFuturesThrowing( + switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop(), switch5.stop())