diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 4b60afa30..3c4354192 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, sequtils], + std/[options, sequtils, times], stew/shims/net as stewNet, testutils/unittests, chronos, @@ -195,20 +195,14 @@ procSuite "Peer Manager": conn1Ok == false # Right after failing there is a backoff period - nodes[0].peerManager.peerStore.canBeConnected( - nonExistentPeer.peerId, - nodes[0].peerManager.initialBackoffInSec, - nodes[0].peerManager.backoffFactor) == false + nodes[0].peerManager.canBeConnected(nonExistentPeer.peerId) == false # We wait the first backoff period - await sleepAsync(2100.milliseconds) + await sleepAsync(chronos.milliseconds(2100)) # And backoff period is over check: - nodes[0].peerManager.peerStore.canBeConnected( - nodes[1].peerInfo.peerId, - nodes[0].peerManager.initialBackoffInSec, - nodes[0].peerManager.backoffFactor) == true + nodes[0].peerManager.canBeConnected(nodes[1].peerInfo.peerId) == true # After a successful connection, the number of failed connections is reset nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = 4 @@ -655,3 +649,81 @@ procSuite "Peer Manager": not pm.peerStore.peers.anyIt(it.peerId == peers[0].peerId) not pm.peerStore.peers.anyIt(it.peerId == peers[1].peerId) not pm.peerStore.peers.anyIt(it.peerId == peers[2].peerId) + + asyncTest "canBeConnected() returns correct value": + let pm = PeerManager.new( + switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise() + .withPeerStore(10) + .withMaxConnections(5) + .build(), + initialBackoffInSec = 1, # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs. + backoffFactor = 2, + maxFailedAttempts = 10, + storage = nil) + var p1: PeerId + require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1") + + + # new peer with no errors can be connected + check: + pm.canBeConnected(p1) == true + + # peer with ONE error that just failed + pm.peerStore[NumberFailedConnBook][p1] = 1 + pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) + # we cant connect right now + check: + pm.canBeConnected(p1) == false + + # but we can after the first backoff of 1 seconds + await sleepAsync(chronos.milliseconds(1200)) + check: + pm.canBeConnected(p1) == true + + # peer with TWO errors, we can connect until 2 seconds have passed + pm.peerStore[NumberFailedConnBook][p1] = 2 + pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) + + # cant be connected after 1 second + await sleepAsync(chronos.milliseconds(1000)) + check: + pm.canBeConnected(p1) == false + + # can be connected after 2 seconds + await sleepAsync(chronos.milliseconds(1200)) + check: + pm.canBeConnected(p1) == true + + # can't be connected if failed attempts are equal to maxFailedAttempts + pm.maxFailedAttempts = 2 + check: + pm.canBeConnected(p1) == false + + test "peer manager must fail if max backoff is over a week": + # Should result in overflow exception + expect(Defect): + let pm = PeerManager.new( + switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise() + .withPeerStore(10) + .withMaxConnections(5) + .build(), + maxFailedAttempts = 150, + storage = nil) + + # Should result in backoff > 1 week + expect(Defect): + let pm = PeerManager.new( + switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise() + .withPeerStore(10) + .withMaxConnections(5) + .build(), + maxFailedAttempts = 10, + storage = nil) + + let pm = PeerManager.new( + switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise() + .withPeerStore(10) + .withMaxConnections(5) + .build(), + maxFailedAttempts = 5, + storage = nil) diff --git a/tests/v2/test_peer_store_extended.nim b/tests/v2/test_peer_store_extended.nim index 2bafac6c5..64c6552b5 100644 --- a/tests/v2/test_peer_store_extended.nim +++ b/tests/v2/test_peer_store_extended.nim @@ -318,42 +318,3 @@ suite "Extended nim-libp2p Peer Store": peerStore[DisconnectBook][p1] == 0 peerStore[SourceBook][p1] == default(PeerOrigin) peerStore[DirectionBook][p1] == default(PeerDirection) - - asyncTest "canBeConnected() returns correct value": - let peerStore = PeerStore.new(nil, capacity = 5) - var p1: PeerId - require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1") - - # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs. - let initialBackoffInSec = 1 - let backoffFactor = 2 - - # new peer with no errors can be connected - check: - peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true - - # peer with ONE error that just failed - peerStore[NumberFailedConnBook][p1] = 1 - peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) - # we cant connect right now - check: - peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false - - # but we can after the first backoff of 1 seconds - await sleepAsync(1200) - check: - peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true - - # peer with TWO errors, we can connect until 2 seconds have passed - peerStore[NumberFailedConnBook][p1] = 2 - peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) - - # cant be connected after 1 second - await sleepAsync(1000) - check: - peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false - - # can be connected after 2 seconds - await sleepAsync(1200) - check: - peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 36d25f740..be4669771 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -5,7 +5,7 @@ else: import - std/[options, sets, sequtils, times, strutils], + std/[options, sets, sequtils, times, strutils, math], chronos, chronicles, metrics, @@ -78,6 +78,13 @@ proc protocolMatcher*(codec: string): Matcher = return match +proc calculateBackoff(initialBackoffInSec: int, + backoffFactor: int, + failedAttempts: int): timer.Duration = + if failedAttempts == 0: + return chronos.seconds(0) + return chronos.seconds(initialBackoffInSec*(backoffFactor^(failedAttempts-1))) + #################### # Helper functions # #################### @@ -235,6 +242,30 @@ proc loadFromStorage(pm: PeerManager) = else: debug "successfully queried peer storage" +proc canBeConnected*(pm: PeerManager, + peerId: PeerId): bool = + # Returns if we can try to connect to this peer, based on past failed attempts + # It uses an exponential backoff. Each connection attempt makes us + # wait more before trying again. + let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] + + # if it never errored, we can try to connect + if failedAttempts == 0: + return true + + # if there are too many failed attempts, do not reconnect + if failedAttempts >= pm.maxFailedAttempts: + return false + + # If it errored we wait an exponential backoff from last connection + # the more failed attempts, the greater the backoff since last attempt + let now = Moment.init(getTime().toUnix, Second) + let lastFailed = pm.peerStore[LastFailedConnBook][peerId] + let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts) + if now >= (lastFailed + backoff): + return true + return false + ################## # Initialisation # ################## @@ -286,12 +317,20 @@ proc new*(T: type PeerManager, maxConnections = maxConnections raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity") + # attempt to calculate max backoff to prevent potential overflows or unreasonably high values + let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts) + if backoff.weeks() > 1: + error "Max backoff time can't be over 1 week", + maxBackoff=backoff + raise newException(Defect, "Max backoff time can't be over 1 week") + let pm = PeerManager(switch: switch, peerStore: switch.peerStore, storage: storage, initialBackoffInSec: initialBackoffInSec, backoffFactor: backoffFactor, maxFailedAttempts: maxFailedAttempts) + proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = onConnEvent(pm, peerId, event) @@ -467,9 +506,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} = # TODO: Track only relay connections (nwaku/issues/1566) let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) - let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId, - pm.initialBackoffInSec, - pm.backoffFactor)) + let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials) info "Relay peer connections", diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index b5ee166e2..28baa8cfe 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -43,28 +43,6 @@ type # Peer Store API # ################## -proc canBeConnected*(peerStore: PeerStore, - peerId: PeerId, - initialBackoffInSec: int, - backoffFactor: int): bool = - # Returns if we can try to connect to this peer, based on past failed attempts - # It uses an exponential backoff. Each connection attempt makes us - # wait more before trying again. - let failedAttempts = peerStore[NumberFailedConnBook][peerId] - - # if it never errored, we can try to connect - if failedAttempts == 0: - return true - - # If it errored we wait an exponential backoff from last connection - # the more failed attemps, the greater the backoff since last attempt - let now = Moment.init(getTime().toUnix, Second) - let lastFailed = peerStore[LastFailedConnBook][peerId] - let backoff = chronos.seconds(initialBackoffInSec*(backoffFactor^(failedAttempts-1))) - if now >= (lastFailed + backoff): - return true - return false - proc delete*(peerStore: PeerStore, peerId: PeerId) = # Delete all the information of a given peer.