Autonat service: handle connections limits (#846)
Co-authored-by: diegomrsantos <diego@status.im>
This commit is contained in:
parent
351bda2b56
commit
ca19f8fdbf
|
@ -444,6 +444,13 @@ proc getOutgoingSlot*(c: ConnManager, forceDial = false): ConnectionSlot {.raise
|
|||
raise newTooManyConnectionsError()
|
||||
return ConnectionSlot(connManager: c, direction: Out)
|
||||
|
||||
proc slotsAvailable*(c: ConnManager, dir: Direction): int =
|
||||
case dir:
|
||||
of Direction.In:
|
||||
return c.inSema.count
|
||||
of Direction.Out:
|
||||
return c.outSema.count
|
||||
|
||||
proc release*(cs: ConnectionSlot) =
|
||||
if cs.direction == In:
|
||||
cs.connManager.inSema.release()
|
||||
|
|
|
@ -57,7 +57,11 @@ method dialMe*(self: AutonatClient, switch: Switch, pid: PeerId, addrs: seq[Mult
|
|||
except CatchableError as err:
|
||||
raise newException(AutonatError, "Unexpected error when dialling", err)
|
||||
|
||||
defer: await conn.close()
|
||||
# To bypass maxConnectionsPerPeer
|
||||
let incomingConnection = switch.connManager.expectConnection(pid)
|
||||
defer:
|
||||
await conn.close()
|
||||
incomingConnection.cancel() # Safer to always try to cancel cause we aren't sure if the peer dialled us or not
|
||||
await conn.sendDial(switch.peerInfo.peerId, switch.peerInfo.addrs)
|
||||
let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024)))
|
||||
return case response.status:
|
||||
|
|
|
@ -59,8 +59,13 @@ proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} =
|
|||
await conn.writeLp(pb.buffer)
|
||||
|
||||
proc tryDial(autonat: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} =
|
||||
try:
|
||||
await autonat.sem.acquire()
|
||||
try:
|
||||
# This is to bypass the per peer max connections limit
|
||||
let outgoingConnection = autonat.switch.connManager.expectConnection(conn.peerId)
|
||||
# Safer to always try to cancel cause we aren't sure if the connection was established
|
||||
defer: outgoingConnection.cancel()
|
||||
# This is to bypass the global max connections limit
|
||||
let ma = await autonat.switch.dialer.tryDial(conn.peerId, addrs).wait(autonat.dialTimeout)
|
||||
if ma.isSome:
|
||||
await conn.sendResponseOk(ma.get())
|
||||
|
|
|
@ -76,6 +76,10 @@ proc callHandler(self: AutonatService) {.async.} =
|
|||
if not isNil(self.statusAndConfidenceHandler):
|
||||
await self.statusAndConfidenceHandler(self.networkReachability, self.confidence)
|
||||
|
||||
proc hasEnoughIncomingSlots(switch: Switch): bool =
|
||||
# we leave some margin instead of comparing to 0 as a peer could connect to us while we are asking for the dial back
|
||||
return switch.connManager.slotsAvailable(In) >= 2
|
||||
|
||||
proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} =
|
||||
|
||||
if ans == Unknown:
|
||||
|
@ -98,19 +102,25 @@ proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} =
|
|||
trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence
|
||||
|
||||
proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} =
|
||||
trace "Asking for reachability", peerId = $peerId
|
||||
logScope:
|
||||
peerId = $peerId
|
||||
if not hasEnoughIncomingSlots(switch):
|
||||
trace "No incoming slots available, not asking peer", incomingSlotsAvailable=switch.connManager.slotsAvailable(In)
|
||||
return Unknown
|
||||
|
||||
trace "Asking for reachability"
|
||||
let ans =
|
||||
try:
|
||||
discard await self.autonatClient.dialMe(switch, peerId).wait(self.dialTimeout)
|
||||
Reachable
|
||||
except AutonatUnreachableError:
|
||||
trace "dialMe answer is not reachable", peerId = $peerId
|
||||
trace "dialMe answer is not reachable"
|
||||
NotReachable
|
||||
except AsyncTimeoutError:
|
||||
trace "dialMe timed out", peerId = $peerId
|
||||
trace "dialMe timed out"
|
||||
Unknown
|
||||
except CatchableError as err:
|
||||
trace "dialMe unexpected error", peerId = $peerId, errMsg = $err.msg
|
||||
trace "dialMe unexpected error", errMsg = $err.msg
|
||||
Unknown
|
||||
await self.handleAnswer(ans)
|
||||
if not isNil(self.statusAndConfidenceHandler):
|
||||
|
@ -124,7 +134,10 @@ proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} =
|
|||
for peer in peers:
|
||||
if answersFromPeers >= self.numPeersToAsk:
|
||||
break
|
||||
elif (await askPeer(self, switch, peer)) != Unknown:
|
||||
if not hasEnoughIncomingSlots(switch):
|
||||
debug "No incoming slots available, not asking peers", incomingSlotsAvailable=switch.connManager.slotsAvailable(In)
|
||||
break
|
||||
if (await askPeer(self, switch, peer)) != Unknown:
|
||||
answersFromPeers.inc()
|
||||
|
||||
proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.async.} =
|
||||
|
|
|
@ -17,11 +17,13 @@ import ../libp2p/[builders,
|
|||
import ./helpers
|
||||
import stubs/autonatclientstub
|
||||
|
||||
proc createSwitch(autonatSvc: Service = nil, withAutonat = true): Switch =
|
||||
proc createSwitch(autonatSvc: Service = nil, withAutonat = true, maxConnsPerPeer = 1, maxConns = 100): Switch =
|
||||
var builder = SwitchBuilder.new()
|
||||
.withRng(newRng())
|
||||
.withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ])
|
||||
.withTcpTransport()
|
||||
.withMaxConnsPerPeer(maxConnsPerPeer)
|
||||
.withMaxConnections(maxConns)
|
||||
.withMplex()
|
||||
.withNoise()
|
||||
|
||||
|
@ -244,3 +246,77 @@ suite "Autonat Service":
|
|||
check (await autonatService.stop(switch)) == false
|
||||
|
||||
await allFuturesThrowing(switch.stop())
|
||||
|
||||
asyncTest "Must bypass maxConnectionsPerPeer limit":
|
||||
let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds), maxQueueSize = 1)
|
||||
|
||||
let switch1 = createSwitch(autonatService, maxConnsPerPeer = 0)
|
||||
let switch2 = createSwitch(maxConnsPerPeer = 0)
|
||||
|
||||
let awaiter = newFuture[void]()
|
||||
|
||||
proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} =
|
||||
if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1:
|
||||
if not awaiter.finished:
|
||||
awaiter.complete()
|
||||
|
||||
check autonatService.networkReachability() == NetworkReachability.Unknown
|
||||
|
||||
autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)
|
||||
|
||||
await switch1.start()
|
||||
await switch2.start()
|
||||
|
||||
await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs)
|
||||
|
||||
await awaiter
|
||||
|
||||
check autonatService.networkReachability() == NetworkReachability.Reachable
|
||||
check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 1
|
||||
|
||||
await allFuturesThrowing(
|
||||
switch1.stop(), switch2.stop())
|
||||
|
||||
asyncTest "Must work with low maxConnections":
|
||||
let autonatService = AutonatService.new(AutonatClient.new(), newRng(), some(1.seconds), maxQueueSize = 1)
|
||||
|
||||
let switch1 = createSwitch(autonatService, maxConns = 4)
|
||||
let switch2 = createSwitch()
|
||||
let switch3 = createSwitch()
|
||||
let switch4 = createSwitch()
|
||||
let switch5 = createSwitch()
|
||||
|
||||
var awaiter = newFuture[void]()
|
||||
|
||||
proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} =
|
||||
if networkReachability == NetworkReachability.Reachable and confidence.isSome() and confidence.get() == 1:
|
||||
if not awaiter.finished:
|
||||
awaiter.complete()
|
||||
|
||||
check autonatService.networkReachability() == NetworkReachability.Unknown
|
||||
|
||||
autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)
|
||||
|
||||
await switch1.start()
|
||||
await switch2.start()
|
||||
await switch3.start()
|
||||
await switch4.start()
|
||||
await switch5.start()
|
||||
|
||||
await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs)
|
||||
|
||||
await awaiter
|
||||
|
||||
await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs)
|
||||
await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs)
|
||||
await switch5.connect(switch1.peerInfo.peerId, switch1.peerInfo.addrs)
|
||||
# switch1 is now full, should stick to last observation
|
||||
awaiter = newFuture[void]()
|
||||
await autonatService.run(switch1)
|
||||
await awaiter
|
||||
|
||||
check autonatService.networkReachability() == NetworkReachability.Reachable
|
||||
check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 1
|
||||
|
||||
await allFuturesThrowing(
|
||||
switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop(), switch5.stop())
|
||||
|
|
Loading…
Reference in New Issue