mirror of https://github.com/waku-org/nwaku.git
bug: move canBeConnected to PeerManager and check for potential overflow (#1670)
This commit is contained in:
parent
73cbafa658
commit
d5c2770c54
|
@ -1,7 +1,7 @@
|
||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, sequtils],
|
std/[options, sequtils, times],
|
||||||
stew/shims/net as stewNet,
|
stew/shims/net as stewNet,
|
||||||
testutils/unittests,
|
testutils/unittests,
|
||||||
chronos,
|
chronos,
|
||||||
|
@ -195,20 +195,14 @@ procSuite "Peer Manager":
|
||||||
conn1Ok == false
|
conn1Ok == false
|
||||||
|
|
||||||
# Right after failing there is a backoff period
|
# Right after failing there is a backoff period
|
||||||
nodes[0].peerManager.peerStore.canBeConnected(
|
nodes[0].peerManager.canBeConnected(nonExistentPeer.peerId) == false
|
||||||
nonExistentPeer.peerId,
|
|
||||||
nodes[0].peerManager.initialBackoffInSec,
|
|
||||||
nodes[0].peerManager.backoffFactor) == false
|
|
||||||
|
|
||||||
# We wait the first backoff period
|
# We wait the first backoff period
|
||||||
await sleepAsync(2100.milliseconds)
|
await sleepAsync(chronos.milliseconds(2100))
|
||||||
|
|
||||||
# And backoff period is over
|
# And backoff period is over
|
||||||
check:
|
check:
|
||||||
nodes[0].peerManager.peerStore.canBeConnected(
|
nodes[0].peerManager.canBeConnected(nodes[1].peerInfo.peerId) == true
|
||||||
nodes[1].peerInfo.peerId,
|
|
||||||
nodes[0].peerManager.initialBackoffInSec,
|
|
||||||
nodes[0].peerManager.backoffFactor) == true
|
|
||||||
|
|
||||||
# After a successful connection, the number of failed connections is reset
|
# After a successful connection, the number of failed connections is reset
|
||||||
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = 4
|
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = 4
|
||||||
|
@ -655,3 +649,81 @@ procSuite "Peer Manager":
|
||||||
not pm.peerStore.peers.anyIt(it.peerId == peers[0].peerId)
|
not pm.peerStore.peers.anyIt(it.peerId == peers[0].peerId)
|
||||||
not pm.peerStore.peers.anyIt(it.peerId == peers[1].peerId)
|
not pm.peerStore.peers.anyIt(it.peerId == peers[1].peerId)
|
||||||
not pm.peerStore.peers.anyIt(it.peerId == peers[2].peerId)
|
not pm.peerStore.peers.anyIt(it.peerId == peers[2].peerId)
|
||||||
|
|
||||||
|
asyncTest "canBeConnected() returns correct value":
|
||||||
|
let pm = PeerManager.new(
|
||||||
|
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
|
||||||
|
.withPeerStore(10)
|
||||||
|
.withMaxConnections(5)
|
||||||
|
.build(),
|
||||||
|
initialBackoffInSec = 1, # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
|
||||||
|
backoffFactor = 2,
|
||||||
|
maxFailedAttempts = 10,
|
||||||
|
storage = nil)
|
||||||
|
var p1: PeerId
|
||||||
|
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
|
||||||
|
|
||||||
|
|
||||||
|
# new peer with no errors can be connected
|
||||||
|
check:
|
||||||
|
pm.canBeConnected(p1) == true
|
||||||
|
|
||||||
|
# peer with ONE error that just failed
|
||||||
|
pm.peerStore[NumberFailedConnBook][p1] = 1
|
||||||
|
pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
|
||||||
|
# we cant connect right now
|
||||||
|
check:
|
||||||
|
pm.canBeConnected(p1) == false
|
||||||
|
|
||||||
|
# but we can after the first backoff of 1 seconds
|
||||||
|
await sleepAsync(chronos.milliseconds(1200))
|
||||||
|
check:
|
||||||
|
pm.canBeConnected(p1) == true
|
||||||
|
|
||||||
|
# peer with TWO errors, we can connect until 2 seconds have passed
|
||||||
|
pm.peerStore[NumberFailedConnBook][p1] = 2
|
||||||
|
pm.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
|
||||||
|
|
||||||
|
# cant be connected after 1 second
|
||||||
|
await sleepAsync(chronos.milliseconds(1000))
|
||||||
|
check:
|
||||||
|
pm.canBeConnected(p1) == false
|
||||||
|
|
||||||
|
# can be connected after 2 seconds
|
||||||
|
await sleepAsync(chronos.milliseconds(1200))
|
||||||
|
check:
|
||||||
|
pm.canBeConnected(p1) == true
|
||||||
|
|
||||||
|
# can't be connected if failed attempts are equal to maxFailedAttempts
|
||||||
|
pm.maxFailedAttempts = 2
|
||||||
|
check:
|
||||||
|
pm.canBeConnected(p1) == false
|
||||||
|
|
||||||
|
test "peer manager must fail if max backoff is over a week":
|
||||||
|
# Should result in overflow exception
|
||||||
|
expect(Defect):
|
||||||
|
let pm = PeerManager.new(
|
||||||
|
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
|
||||||
|
.withPeerStore(10)
|
||||||
|
.withMaxConnections(5)
|
||||||
|
.build(),
|
||||||
|
maxFailedAttempts = 150,
|
||||||
|
storage = nil)
|
||||||
|
|
||||||
|
# Should result in backoff > 1 week
|
||||||
|
expect(Defect):
|
||||||
|
let pm = PeerManager.new(
|
||||||
|
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
|
||||||
|
.withPeerStore(10)
|
||||||
|
.withMaxConnections(5)
|
||||||
|
.build(),
|
||||||
|
maxFailedAttempts = 10,
|
||||||
|
storage = nil)
|
||||||
|
|
||||||
|
let pm = PeerManager.new(
|
||||||
|
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
|
||||||
|
.withPeerStore(10)
|
||||||
|
.withMaxConnections(5)
|
||||||
|
.build(),
|
||||||
|
maxFailedAttempts = 5,
|
||||||
|
storage = nil)
|
||||||
|
|
|
@ -318,42 +318,3 @@ suite "Extended nim-libp2p Peer Store":
|
||||||
peerStore[DisconnectBook][p1] == 0
|
peerStore[DisconnectBook][p1] == 0
|
||||||
peerStore[SourceBook][p1] == default(PeerOrigin)
|
peerStore[SourceBook][p1] == default(PeerOrigin)
|
||||||
peerStore[DirectionBook][p1] == default(PeerDirection)
|
peerStore[DirectionBook][p1] == default(PeerDirection)
|
||||||
|
|
||||||
asyncTest "canBeConnected() returns correct value":
|
|
||||||
let peerStore = PeerStore.new(nil, capacity = 5)
|
|
||||||
var p1: PeerId
|
|
||||||
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
|
|
||||||
|
|
||||||
# with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
|
|
||||||
let initialBackoffInSec = 1
|
|
||||||
let backoffFactor = 2
|
|
||||||
|
|
||||||
# new peer with no errors can be connected
|
|
||||||
check:
|
|
||||||
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true
|
|
||||||
|
|
||||||
# peer with ONE error that just failed
|
|
||||||
peerStore[NumberFailedConnBook][p1] = 1
|
|
||||||
peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
|
|
||||||
# we cant connect right now
|
|
||||||
check:
|
|
||||||
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false
|
|
||||||
|
|
||||||
# but we can after the first backoff of 1 seconds
|
|
||||||
await sleepAsync(1200)
|
|
||||||
check:
|
|
||||||
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true
|
|
||||||
|
|
||||||
# peer with TWO errors, we can connect until 2 seconds have passed
|
|
||||||
peerStore[NumberFailedConnBook][p1] = 2
|
|
||||||
peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
|
|
||||||
|
|
||||||
# cant be connected after 1 second
|
|
||||||
await sleepAsync(1000)
|
|
||||||
check:
|
|
||||||
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false
|
|
||||||
|
|
||||||
# can be connected after 2 seconds
|
|
||||||
await sleepAsync(1200)
|
|
||||||
check:
|
|
||||||
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ else:
|
||||||
|
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, sets, sequtils, times, strutils],
|
std/[options, sets, sequtils, times, strutils, math],
|
||||||
chronos,
|
chronos,
|
||||||
chronicles,
|
chronicles,
|
||||||
metrics,
|
metrics,
|
||||||
|
@ -78,6 +78,13 @@ proc protocolMatcher*(codec: string): Matcher =
|
||||||
|
|
||||||
return match
|
return match
|
||||||
|
|
||||||
|
proc calculateBackoff(initialBackoffInSec: int,
|
||||||
|
backoffFactor: int,
|
||||||
|
failedAttempts: int): timer.Duration =
|
||||||
|
if failedAttempts == 0:
|
||||||
|
return chronos.seconds(0)
|
||||||
|
return chronos.seconds(initialBackoffInSec*(backoffFactor^(failedAttempts-1)))
|
||||||
|
|
||||||
####################
|
####################
|
||||||
# Helper functions #
|
# Helper functions #
|
||||||
####################
|
####################
|
||||||
|
@ -235,6 +242,30 @@ proc loadFromStorage(pm: PeerManager) =
|
||||||
else:
|
else:
|
||||||
debug "successfully queried peer storage"
|
debug "successfully queried peer storage"
|
||||||
|
|
||||||
|
proc canBeConnected*(pm: PeerManager,
|
||||||
|
peerId: PeerId): bool =
|
||||||
|
# Returns if we can try to connect to this peer, based on past failed attempts
|
||||||
|
# It uses an exponential backoff. Each connection attempt makes us
|
||||||
|
# wait more before trying again.
|
||||||
|
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
|
||||||
|
|
||||||
|
# if it never errored, we can try to connect
|
||||||
|
if failedAttempts == 0:
|
||||||
|
return true
|
||||||
|
|
||||||
|
# if there are too many failed attempts, do not reconnect
|
||||||
|
if failedAttempts >= pm.maxFailedAttempts:
|
||||||
|
return false
|
||||||
|
|
||||||
|
# If it errored we wait an exponential backoff from last connection
|
||||||
|
# the more failed attempts, the greater the backoff since last attempt
|
||||||
|
let now = Moment.init(getTime().toUnix, Second)
|
||||||
|
let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
|
||||||
|
let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
|
||||||
|
if now >= (lastFailed + backoff):
|
||||||
|
return true
|
||||||
|
return false
|
||||||
|
|
||||||
##################
|
##################
|
||||||
# Initialisation #
|
# Initialisation #
|
||||||
##################
|
##################
|
||||||
|
@ -286,12 +317,20 @@ proc new*(T: type PeerManager,
|
||||||
maxConnections = maxConnections
|
maxConnections = maxConnections
|
||||||
raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")
|
raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")
|
||||||
|
|
||||||
|
# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
|
||||||
|
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
|
||||||
|
if backoff.weeks() > 1:
|
||||||
|
error "Max backoff time can't be over 1 week",
|
||||||
|
maxBackoff=backoff
|
||||||
|
raise newException(Defect, "Max backoff time can't be over 1 week")
|
||||||
|
|
||||||
let pm = PeerManager(switch: switch,
|
let pm = PeerManager(switch: switch,
|
||||||
peerStore: switch.peerStore,
|
peerStore: switch.peerStore,
|
||||||
storage: storage,
|
storage: storage,
|
||||||
initialBackoffInSec: initialBackoffInSec,
|
initialBackoffInSec: initialBackoffInSec,
|
||||||
backoffFactor: backoffFactor,
|
backoffFactor: backoffFactor,
|
||||||
maxFailedAttempts: maxFailedAttempts)
|
maxFailedAttempts: maxFailedAttempts)
|
||||||
|
|
||||||
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
proc connHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
|
||||||
onConnEvent(pm, peerId, event)
|
onConnEvent(pm, peerId, event)
|
||||||
|
|
||||||
|
@ -467,9 +506,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =
|
||||||
|
|
||||||
# TODO: Track only relay connections (nwaku/issues/1566)
|
# TODO: Track only relay connections (nwaku/issues/1566)
|
||||||
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
|
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
|
||||||
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
|
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
||||||
pm.initialBackoffInSec,
|
|
||||||
pm.backoffFactor))
|
|
||||||
let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials)
|
let numPeersToConnect = min(min(maxConnections - totalRelayPeers, outsideBackoffPeers.len), MaxParalelDials)
|
||||||
|
|
||||||
info "Relay peer connections",
|
info "Relay peer connections",
|
||||||
|
|
|
@ -43,28 +43,6 @@ type
|
||||||
# Peer Store API #
|
# Peer Store API #
|
||||||
##################
|
##################
|
||||||
|
|
||||||
proc canBeConnected*(peerStore: PeerStore,
|
|
||||||
peerId: PeerId,
|
|
||||||
initialBackoffInSec: int,
|
|
||||||
backoffFactor: int): bool =
|
|
||||||
# Returns if we can try to connect to this peer, based on past failed attempts
|
|
||||||
# It uses an exponential backoff. Each connection attempt makes us
|
|
||||||
# wait more before trying again.
|
|
||||||
let failedAttempts = peerStore[NumberFailedConnBook][peerId]
|
|
||||||
|
|
||||||
# if it never errored, we can try to connect
|
|
||||||
if failedAttempts == 0:
|
|
||||||
return true
|
|
||||||
|
|
||||||
# If it errored we wait an exponential backoff from last connection
|
|
||||||
# the more failed attemps, the greater the backoff since last attempt
|
|
||||||
let now = Moment.init(getTime().toUnix, Second)
|
|
||||||
let lastFailed = peerStore[LastFailedConnBook][peerId]
|
|
||||||
let backoff = chronos.seconds(initialBackoffInSec*(backoffFactor^(failedAttempts-1)))
|
|
||||||
if now >= (lastFailed + backoff):
|
|
||||||
return true
|
|
||||||
return false
|
|
||||||
|
|
||||||
proc delete*(peerStore: PeerStore,
|
proc delete*(peerStore: PeerStore,
|
||||||
peerId: PeerId) =
|
peerId: PeerId) =
|
||||||
# Delete all the information of a given peer.
|
# Delete all the information of a given peer.
|
||||||
|
|
Loading…
Reference in New Issue