feat(networking): add backoff period after failed dial (#1462)

* feat(networking): add exponential backoff when dialing relay peers

* feat(networking): fix tests

* revert withTimeout

* feat(networking): refactor tests

* feat(networking): improve logs + ping using switch

* feat(networking): fix backoff bug + fix tests

* feat(networking): fix comments
This commit is contained in:
Alvaro Revuelta 2023-01-23 21:24:46 +01:00 committed by GitHub
parent 9a12872465
commit 028efc8547
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 323 additions and 137 deletions

View File

@ -43,3 +43,6 @@ proc getRng(): ref rand.HmacDrbgContext =
template rng*(): ref rand.HmacDrbgContext =
getRng()
proc generateKey*(): crypto.PrivateKey =
return crypto.PrivateKey.random(Secp256k1, rng[])[]

View File

@ -30,63 +30,51 @@ import
procSuite "Peer Manager":
asyncTest "Peer dialing works":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60800))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60802))
peerInfo2 = node2.switch.peerInfo
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures([node1.start(), node2.start()])
await node1.mountRelay()
await node2.mountRelay()
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
# Dial node2 from node1
let conn = (await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)).get()
let conn = (await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)).get()
# Check connection
check:
conn.activity
conn.peerId == peerInfo2.peerId
conn.peerId == nodes[1].peerInfo.peerId
# Check that node2 is being managed in node1
check:
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].peerInfo.peerId)
# Check connectedness
check:
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connectedness.Connected
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected
await allFutures([node1.stop(), node2.stop()])
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Dialing fails gracefully":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60810))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60812))
peerInfo2 = node2.switch.peerInfo
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await nodes[0].start()
await nodes[0].mountRelay()
await node1.start()
# Purposefully don't start node2
await node1.mountRelay()
await node2.mountRelay()
# Dial node2 from node1
let connOpt = await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
let connOpt = await nodes[1].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
# Check connection failed gracefully
check:
connOpt.isNone()
await node1.stop()
await nodes[0].stop()
asyncTest "Adding, selecting and filtering peers work":
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60820))
node = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))
# Create filter peer
filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get()
@ -128,54 +116,100 @@ procSuite "Peer Manager":
asyncTest "Peer manager keeps track of connections":
let
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60830))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60832))
peerInfo2 = node2.switch.peerInfo
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await node1.start()
await nodes[0].start()
# Do not start node2
await node1.mountRelay()
await node2.mountRelay()
await allFutures(nodes.mapIt(it.mountRelay()))
# Test default connectedness for new peers
node1.peerManager.addPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)
check:
# No information about node2's connectedness
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected
# Purposefully don't start node2
# Attempt dialing node2 from node1
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
check:
# Cannot connect to node2
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CannotConnect
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect
# Successful connection
await node2.start()
discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
await nodes[1].start()
discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds)
check:
# Currently connected to node2
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected
# Stop node. Gracefully disconnect from all peers.
await node1.stop()
await nodes[0].stop()
check:
# Not currently connected to node2, but had recent, successful connection.
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CanConnect
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CanConnect
await node2.stop()
await nodes[1].stop()
asyncTest "Peer manager updates failed peers correctly":
# Create 2 nodes
let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await nodes[0].start()
await nodes[0].mountRelay()
nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)
# Set a low backoff to speed up test: 2, 4, 8, 16
nodes[0].peerManager.initialBackoffInSec = 2
nodes[0].peerManager.backoffFactor = 2
# node2 is not started, so dialing will fail
let conn1 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds)
check:
# Cannot connect to node2
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect
nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].peerInfo.peerId] == CannotConnect
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 1
# And the connection failed
conn1.isNone() == true
# Right after failing there is a backoff period
nodes[0].peerManager.peerStore.canBeConnected(
nodes[1].peerInfo.peerId,
nodes[0].peerManager.initialBackoffInSec,
nodes[0].peerManager.backoffFactor) == false
# We wait the first backoff period
await sleepAsync(2100.milliseconds)
# And backoff period is over
check:
nodes[0].peerManager.peerStore.canBeConnected(
nodes[1].peerInfo.peerId,
nodes[0].peerManager.initialBackoffInSec,
nodes[0].peerManager.backoffFactor) == true
await nodes[1].start()
await nodes[1].mountRelay()
# Now we can connect and failed count is reset
let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds)
check:
conn2.isNone() == false
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 0
await allFutures(nodes.mapIt(it.stop()))
asyncTest "Peer manager can use persistent storage and survive restarts":
let
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60840), peerStorage = storage)
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo
await node1.start()
@ -194,7 +228,7 @@ procSuite "Peer Manager":
# Simulate restart by initialising a new node using the same storage
let
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60844), peerStorage = storage)
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
await node3.start()
check:
@ -219,9 +253,9 @@ procSuite "Peer Manager":
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60850), peerStorage = storage)
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60852))
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0))
peerInfo2 = node2.switch.peerInfo
betaCodec = "/vac/waku/relay/2.0.0-beta2"
stableCodec = "/vac/waku/relay/2.0.0"
@ -245,7 +279,7 @@ procSuite "Peer Manager":
# Simulate restart by initialising a new node using the same storage
let
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60854), peerStorage = storage)
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage)
await node3.mountRelay()
node3.wakuRelay.codec = stableCodec
@ -273,11 +307,7 @@ procSuite "Peer Manager":
asyncTest "Peer manager connects to all peers supporting a given protocol":
# Create 4 nodes
var nodes: seq[WakuNode]
for i in 0..<4:
let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60860 + i))
nodes &= node
let nodes = toSeq(0..<4).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)))
# Start them
await allFutures(nodes.mapIt(it.start()))
@ -317,11 +347,7 @@ procSuite "Peer Manager":
asyncTest "Peer store keeps track of incoming connections":
# Create 4 nodes
var nodes: seq[WakuNode]
for i in 0..<4:
let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60865 + i))
nodes &= node
let nodes = toSeq(0..<4).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)))
# Start them
await allFutures(nodes.mapIt(it.start()))

View File

@ -1,8 +1,10 @@
{.used.}
import
std/[options,sequtils],
std/[options,sequtils, times],
chronos,
libp2p/crypto/crypto,
libp2p/peerid,
libp2p/peerstore,
libp2p/multiaddress,
testutils/unittests
@ -10,8 +12,7 @@ import
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/peer_manager/waku_peer_store,
../../waku/v2/node/waku_node,
../test_helpers,
./testlib/testutils
../test_helpers
suite "Extended nim-libp2p Peer Store":
@ -45,6 +46,8 @@ suite "Extended nim-libp2p Peer Store":
peerStore[DisconnectBook][p1] = 0
peerStore[SourceBook][p1] = Discv5
peerStore[DirectionBook][p1] = Inbound
peerStore[NumberFailedConnBook][p1] = 1
peerStore[LastFailedConnBook][p1] = Moment.init(1001, Second)
# Peer2: Connected
peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()]
@ -56,6 +59,8 @@ suite "Extended nim-libp2p Peer Store":
peerStore[DisconnectBook][p2] = 0
peerStore[SourceBook][p2] = Discv5
peerStore[DirectionBook][p2] = Inbound
peerStore[NumberFailedConnBook][p2] = 2
peerStore[LastFailedConnBook][p2] = Moment.init(1002, Second)
# Peer3: Connected
peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
@ -67,6 +72,8 @@ suite "Extended nim-libp2p Peer Store":
peerStore[DisconnectBook][p3] = 0
peerStore[SourceBook][p3] = Discv5
peerStore[DirectionBook][p3] = Inbound
peerStore[NumberFailedConnBook][p3] = 3
peerStore[LastFailedConnBook][p3] = Moment.init(1003, Second)
# Peer4: Added but never connected
peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()]
@ -78,6 +85,8 @@ suite "Extended nim-libp2p Peer Store":
peerStore[DisconnectBook][p4] = 0
peerStore[SourceBook][p4] = Discv5
peerStore[DirectionBook][p4] = Inbound
peerStore[NumberFailedConnBook][p4] = 4
peerStore[LastFailedConnBook][p4] = Moment.init(1004, Second)
# Peer5: Connected in the past
peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()]
@ -89,6 +98,8 @@ suite "Extended nim-libp2p Peer Store":
peerStore[DisconnectBook][p5] = 1000
peerStore[SourceBook][p5] = Discv5
peerStore[DirectionBook][p5] = Outbound
peerStore[NumberFailedConnBook][p5] = 5
peerStore[LastFailedConnBook][p5] = Moment.init(1005, Second)
test "get() returns the correct StoredInfo for a given PeerId":
# When
@ -108,17 +119,21 @@ suite "Extended nim-libp2p Peer Store":
storedInfoPeer1.connectedness == Connected
storedInfoPeer1.disconnectTime == 0
storedInfoPeer1.origin == Discv5
storedInfoPeer1.numberFailedConn == 1
storedInfoPeer1.lastFailedConn == Moment.init(1001, Second)
check:
# fields are empty
# fields are empty, not part of the peerstore
storedInfoPeer6.peerId == p6
storedInfoPeer6.addrs.len == 0
storedInfoPeer6.protos.len == 0
storedInfoPeer6.agent == ""
storedInfoPeer6.protoVersion == ""
storedInfoPeer6.connectedness == NotConnected
storedInfoPeer6.disconnectTime == 0
storedInfoPeer6.origin == UnknownOrigin
storedInfoPeer6.agent == default(string)
storedInfoPeer6.protoVersion == default(string)
storedInfoPeer6.connectedness == default(Connectedness)
storedInfoPeer6.disconnectTime == default(int)
storedInfoPeer6.origin == default(PeerOrigin)
storedInfoPeer6.numberFailedConn == default(int)
storedInfoPeer6.lastFailedConn == default(Moment)
test "peers() returns all StoredInfo of the PeerStore":
# When
@ -146,6 +161,8 @@ suite "Extended nim-libp2p Peer Store":
p3.connectedness == Connected
p3.disconnectTime == 0
p3.origin == Discv5
p3.numberFailedConn == 3
p3.lastFailedConn == Moment.init(1003, Second)
test "peers() returns all StoredInfo matching a specific protocol":
# When
@ -280,3 +297,76 @@ suite "Extended nim-libp2p Peer Store":
disconnedtedPeers.anyIt(it.peerId == p4)
disconnedtedPeers.anyIt(it.peerId == p5)
not disconnedtedPeers.anyIt(it.connectedness == Connected)
test "del() successfully deletes waku custom books":
# Given
let peerStore = PeerStore.new(capacity = 5)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
peerStore[AddressBook][p1] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
peerStore[ProtoBook][p1] = @["proto"]
peerStore[KeyBook][p1] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey
peerStore[AgentBook][p1] = "agent"
peerStore[ProtoVersionBook][p1] = "version"
peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
peerStore[NumberFailedConnBook][p1] = 1
peerStore[ConnectionBook][p1] = Connected
peerStore[DisconnectBook][p1] = 0
peerStore[SourceBook][p1] = Discv5
peerStore[DirectionBook][p1] = Inbound
# When
peerStore.del(p1)
# Then
check:
peerStore[AddressBook][p1] == newSeq[MultiAddress](0)
peerStore[ProtoBook][p1] == newSeq[string](0)
peerStore[KeyBook][p1] == default(PublicKey)
peerStore[AgentBook][p1] == ""
peerStore[ProtoVersionBook][p1] == ""
peerStore[LastFailedConnBook][p1] == default(Moment)
peerStore[NumberFailedConnBook][p1] == 0
peerStore[ConnectionBook][p1] == default(Connectedness)
peerStore[DisconnectBook][p1] == 0
peerStore[SourceBook][p1] == default(PeerOrigin)
peerStore[DirectionBook][p1] == default(PeerDirection)
asyncTest "canBeConnected() returns correct value":
let peerStore = PeerStore.new(capacity = 5)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
# with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs.
let initialBackoffInSec = 1
let backoffFactor = 2
# new peer with no errors can be connected
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true
# peer with ONE error that just failed
peerStore[NumberFailedConnBook][p1] = 1
peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
# we can't connect right now
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false
# but we can after the first backoff of 1 seconds
await sleepAsync(1200)
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true
# peer with TWO errors: we can't connect until 2 seconds have passed
peerStore[NumberFailedConnBook][p1] = 2
peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second)
# can't be connected after 1 second
await sleepAsync(1000)
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false
# can be connected after 2 seconds
await sleepAsync(1200)
check:
peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true

View File

@ -95,9 +95,6 @@ proc findRandomPeers*(wakuDiscv5: WakuDiscoveryV5): Future[Result[seq[RemotePeer
error "Failed to convert ENR to peer info", enr= $node.record, err=res.error()
waku_discv5_errors.inc(labelValues = ["peer_info_failure"])
if discoveredPeers.len > 0:
info "Successfully discovered nodes", count=discoveredPeers.len
waku_discv5_discovered.inc(discoveredPeers.len.int64)
return ok(discoveredPeers)

View File

@ -27,22 +27,34 @@ declarePublicGauge waku_connected_peers, "Number of connected peers per directio
logScope:
topics = "waku node peer_manager"
type
PeerManager* = ref object of RootObj
switch*: Switch
peerStore*: PeerStore
storage: PeerStorage
const
# TODO: Make configurable
DefaultDialTimeout = chronos.seconds(10)
# Max attempts before removing the peer
MaxFailedAttempts = 5
# Time to wait before attempting to dial again is calculated as:
# initialBackoffInSec*(backoffFactor^(failedAttempts-1))
# 120s, 480s, 1920s, 7680s
InitialBackoffInSec = 120
BackoffFactor = 4
# limit the number of parallel dials
MaxParalelDials = 10
# delay between consecutive relayConnectivityLoop runs
ConnectivityLoopInterval = chronos.seconds(30)
type
PeerManager* = ref object of RootObj
switch*: Switch
peerStore*: PeerStore
initialBackoffInSec*: int
backoffFactor*: int
maxFailedAttempts*: int
storage: PeerStorage
####################
# Helper functions #
####################
@ -61,45 +73,56 @@ proc insertOrReplace(ps: PeerStorage,
proc dialPeer(pm: PeerManager, peerId: PeerID,
addrs: seq[MultiAddress], proto: string,
dialTimeout = DefaultDialTimeout,
source = "api"
source = "api",
): Future[Option[Connection]] {.async.} =
# Do not attempt to dial self
if peerId == pm.switch.peerInfo.peerId:
return none(Connection)
info "Dialing peer from manager", wireAddr = addrs, peerId = peerId
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
debug "Dialing peer", wireAddr = addrs, peerId = peerId, failedAttempts=failedAttempts
# Dial Peer
let dialFut = pm.switch.dial(peerId, addrs, proto)
var reasonFailed = ""
try:
# Attempt to dial remote peer
if (await dialFut.withTimeout(DefaultDialTimeout)):
if (await dialFut.withTimeout(dialTimeout)):
waku_peers_dials.inc(labelValues = ["successful"])
# TODO: This will be populated from the peerstore info when ready
waku_node_conns_initiated.inc(labelValues = [source])
pm.peerStore[NumberFailedConnBook][peerId] = 0
return some(dialFut.read())
else:
# TODO: any redial attempts?
debug "Dialing remote peer timed out"
waku_peers_dials.inc(labelValues = ["timeout"])
reasonFailed = "timeout"
await cancelAndWait(dialFut)
except CatchableError as exc:
reasonFailed = "failed"
pm.peerStore[ConnectionBook][peerId] = CannotConnect
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
# Dial failed
pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
pm.peerStore[ConnectionBook][peerId] = CannotConnect
return none(Connection)
except CatchableError as e:
# TODO: any redial attempts?
debug "Dialing remote peer failed", msg = e.msg
waku_peers_dials.inc(labelValues = ["failed"])
debug "Dialing peer failed", peerId = peerId, reason = reasonFailed, failedAttempts=failedAttempts
waku_peers_dials.inc(labelValues = [reasonFailed])
pm.peerStore[ConnectionBook][peerId] = CannotConnect
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
# Update storage
if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect)
return none(Connection)
return none(Connection)
# TODO: To be addressed in nwaku/pull/1473. Do not prune service peers
# TODO: Currently unused
proc prunePeerStore(pm: PeerManager) =
# iterate peers in peerstore
# skip service peers
#if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts:
# debug "Removing peer from peer store", peerId = peerId, failedAttempts=failedAttempts
# pm.peerStore.del(peerId)
doAssert(false, "Not implemented!")
proc loadFromStorage(pm: PeerManager) =
debug "loading peers from storage"
@ -156,10 +179,19 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix)
return
proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager =
proc new*(T: type PeerManager,
switch: Switch,
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,): PeerManager =
let pm = PeerManager(switch: switch,
peerStore: switch.peerStore,
storage: storage)
storage: storage,
initialBackoffInSec: initialBackoffInSec,
backoffFactor: backoffFactor,
maxFailedAttempts: maxFailedAttempts)
proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(pm, peerId, event)
@ -248,7 +280,8 @@ proc dialPeer*(pm: PeerManager,
remotePeerInfo: RemotePeerInfo,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api"): Future[Option[Connection]] {.async.} =
source = "api",
): Future[Option[Connection]] {.async.} =
# Dial a given peer and add it to the list of known peers
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
@ -263,7 +296,7 @@ proc dialPeer*(pm: PeerManager,
peerId: PeerID,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api"
source = "api",
): Future[Option[Connection]] {.async.} =
# Dial an existing peer by looking up it's existing addrs in the switch's peerStore
# TODO: check peer validity and score before continuing. Limit number of peers to be managed.
@ -276,7 +309,10 @@ proc connectToNodes*(pm: PeerManager,
proto: string,
dialTimeout = DefaultDialTimeout,
source = "api") {.async.} =
info "connectToNodes", len = nodes.len
if nodes.len == 0:
return
info "Dialing multiple peers", numOfPeers = nodes.len
var futConns: seq[Future[Option[Connection]]]
for node in nodes:
@ -285,6 +321,9 @@ proc connectToNodes*(pm: PeerManager,
futConns.add(pm.dialPeer(RemotePeerInfo(node), proto, dialTimeout, source))
await allFutures(futConns)
let successfulConns = futConns.mapIt(it.read()).countIt(it.isSome)
info "Finished dialing multiple peers", successfulConns=successfulConns, attempted=nodes.len
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
@ -310,17 +349,18 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
await sleepAsync(ConnectivityLoopInterval)
continue
# TODO: Respect backoff before attempting to connect a relay peer
var disconnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
shuffle(disconnectedPeers)
let numPeersToConnect = min(min(maxConnections - numConPeers, disconnectedPeers.len), MaxParalelDials)
let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId,
pm.initialBackoffInSec,
pm.backoffFactor))
let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials)
info "Relay connectivity loop",
connectedPeers = numConPeers,
targetConnectedPeers = maxConnections,
availableDisconnectedPeers = disconnectedPeers.len
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
await pm.connectToNodes(disconnectedPeers[0..<numPeersToConnect], WakuRelayCodec)
await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)
await sleepAsync(ConnectivityLoopInterval)

View File

@ -4,7 +4,8 @@ else:
{.push raises: [].}
import
std/[tables, sequtils, sets, options],
std/[tables, sequtils, sets, options, times, math],
chronos,
libp2p/builders,
libp2p/peerstore
@ -13,8 +14,6 @@ import
export peerstore, builders
# TODO rename to peer_store_extended to emphasize it's a nim-libp2p extension
type
Connectedness* = enum
# NotConnected: default state for a new peer. No connection and no further information on connectedness.
@ -32,7 +31,7 @@ type
Static,
Dns
Direction* = enum
PeerDirection* = enum
UnknownDirection,
Inbound,
Outbound
@ -40,6 +39,12 @@ type
# Keeps track of the Connectedness state of a peer
ConnectionBook* = ref object of PeerBook[Connectedness]
# Last failed connection attempt timestamp
LastFailedConnBook* = ref object of PeerBook[Moment]
# Failed connection attempts
NumberFailedConnBook* = ref object of PeerBook[int]
# Keeps track of when peers were disconnected in Unix timestamps
DisconnectBook* = ref object of PeerBook[int64]
@ -47,7 +52,7 @@ type
SourceBook* = ref object of PeerBook[PeerOrigin]
# Direction
DirectionBook* = ref object of PeerBook[Direction]
DirectionBook* = ref object of PeerBook[PeerDirection]
StoredInfo* = object
# Taken from nim-libp2p
@ -62,12 +67,41 @@ type
connectedness*: Connectedness
disconnectTime*: int64
origin*: PeerOrigin
direction*: Direction
direction*: PeerDirection
lastFailedConn*: Moment
numberFailedConn*: int
##################
# Peer Store API #
##################
proc canBeConnected*(peerStore: PeerStore,
peerId: PeerId,
initialBackoffInSec: int,
backoffFactor: int): bool =
# Returns if we can try to connect to this peer, based on past failed attempts
# It uses an exponential backoff. Each connection attempt makes us
# wait more before trying again.
let failedAttempts = peerStore[NumberFailedConnBook][peerId]
# if it never errored, we can try to connect
if failedAttempts == 0:
return true
# If it errored we wait an exponential backoff from last connection
# the more failed attempts, the greater the backoff since the last attempt
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = peerStore[LastFailedConnBook][peerId]
let backoff = chronos.seconds(initialBackoffInSec*(backoffFactor^(failedAttempts-1)))
if now >= (lastFailed + backoff):
return true
return false
proc delete*(peerStore: PeerStore,
peerId: PeerId) =
# Delete all the information of a given peer.
peerStore.del(peerId)
proc get*(peerStore: PeerStore,
peerId: PeerID): StoredInfo =
## Get the stored information of a given peer.
@ -85,6 +119,8 @@ proc get*(peerStore: PeerStore,
disconnectTime: peerStore[DisconnectBook][peerId],
origin: peerStore[SourceBook][peerId],
direction: peerStore[DirectionBook][peerId],
lastFailedConn: peerStore[LastFailedConnBook][peerId],
numberFailedConn: peerStore[NumberFailedConnBook][peerId]
)
# TODO: Rename peers() to getPeersByProtocol()
@ -141,7 +177,7 @@ proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] =
else:
return none(RemotePeerInfo)
proc getPeersByDirection*(peerStore: PeerStore, direction: Direction): seq[StoredInfo] =
proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[StoredInfo] =
return peerStore.peers.filterIt(it.direction == direction)
proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] =

View File

@ -879,17 +879,12 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
.filterIt(it.connectedness == Connected)
.mapIt(it.toRemotePeerInfo())
# Attempt to retrieve and ping the active outgoing connection for each peer
for peer in peers:
let connOpt = await node.peerManager.dialPeer(peer, PingCodec)
if connOpt.isNone():
# TODO: more sophisticated error handling here
debug "failed to connect to remote peer", peer=peer
try:
let conn = await node.switch.dial(peer.peerId, peer.addrs, PingCodec)
let pingDelay = await node.libp2pPing.ping(conn)
except CatchableError as exc:
waku_node_errors.inc(labelValues = ["keep_alive_failure"])
return
discard await node.libp2pPing.ping(connOpt.get()) # Ping connection
await sleepAsync(keepalive)
@ -911,16 +906,15 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
while node.wakuDiscv5.listening:
trace "Running discovery loop"
## Query for a random target and collect all discovered nodes
## TODO: we could filter nodes here
let discoveredPeers = await node.wakuDiscv5.findRandomPeers()
if discoveredPeers.isOk():
## Let's attempt to connect to peers we
## have not encountered before
let discoveredPeersRes = await node.wakuDiscv5.findRandomPeers()
trace "Discovered peers", count=discoveredPeers.get().len()
if discoveredPeersRes.isOk:
let discoveredPeers = discoveredPeersRes.get
let newSeen = discoveredPeers.countIt(not node.peerManager.peerStore[AddressBook].contains(it.peerId))
info "Discovered peers", discovered=discoveredPeers.len, new=newSeen
for peer in discoveredPeers.get():
# Add all peers, new ones and already seen (in case their addresses changed)
for peer in discoveredPeers:
# TODO: proto: WakuRelayCodec will be removed from add peer
node.peerManager.addPeer(peer, WakuRelayCodec)