diff --git a/tests/test_helpers.nim b/tests/test_helpers.nim index a4d2fd286..5712cb755 100644 --- a/tests/test_helpers.nim +++ b/tests/test_helpers.nim @@ -43,3 +43,6 @@ proc getRng(): ref rand.HmacDrbgContext = template rng*(): ref rand.HmacDrbgContext = getRng() + +proc generateKey*(): crypto.PrivateKey = + return crypto.PrivateKey.random(Secp256k1, rng[])[] diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 75ae2d913..9d85cf06b 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -30,63 +30,51 @@ import procSuite "Peer Manager": asyncTest "Peer dialing works": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60800)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60802)) - peerInfo2 = node2.switch.peerInfo + # Create 2 nodes + let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))) - await allFutures([node1.start(), node2.start()]) - - await node1.mountRelay() - await node2.mountRelay() + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) # Dial node2 from node1 - let conn = (await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec)).get() + let conn = (await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec)).get() # Check connection check: conn.activity - conn.peerId == peerInfo2.peerId + conn.peerId == nodes[1].peerInfo.peerId # Check that node2 is being managed in node1 check: - node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + nodes[0].peerManager.peerStore.peers().anyIt(it.peerId == nodes[1].peerInfo.peerId) # Check connectedness check: - node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connectedness.Connected + 
nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected - await allFutures([node1.stop(), node2.stop()]) + await allFutures(nodes.mapIt(it.stop())) asyncTest "Dialing fails gracefully": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60810)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60812)) - peerInfo2 = node2.switch.peerInfo + # Create 2 nodes + let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))) + + await nodes[0].start() + await nodes[0].mountRelay() - await node1.start() # Purposefully don't start node2 - await node1.mountRelay() - await node2.mountRelay() - # Dial node2 from node1 - let connOpt = await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) + let connOpt = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) # Check connection failed gracefully check: connOpt.isNone() - await node1.stop() + await nodes[0].stop() asyncTest "Adding, selecting and filtering peers work": let - nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60820)) + node = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) # Create filter peer filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get() @@ -128,54 +116,100 @@ procSuite "Peer Manager": asyncTest "Peer manager keeps track of connections": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60830)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60832)) - peerInfo2 = 
node2.switch.peerInfo + # Create 2 nodes + let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))) - await node1.start() + await nodes[0].start() + # Do not start node2 - await node1.mountRelay() - await node2.mountRelay() + await allFutures(nodes.mapIt(it.mountRelay())) # Test default connectedness for new peers - node1.peerManager.addPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec) + nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec) check: # No information about node2's connectedness - node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected + nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected # Purposefully don't start node2 # Attempt dialing node2 from node1 - discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) + discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) check: # Cannot connect to node2 - node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CannotConnect + nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect # Successful connection - await node2.start() - discard await node1.peerManager.dialPeer(peerInfo2.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) + await nodes[1].start() + discard await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 2.seconds) check: # Currently connected to node2 - node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected + nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected # Stop node. Gracefully disconnect from all peers. - await node1.stop() + await nodes[0].stop() check: # Not currently connected to node2, but had recent, successful connection. 
- node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == CanConnect + nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CanConnect - await node2.stop() + await nodes[1].stop() + + asyncTest "Peer manager updates failed peers correctly": + # Create 2 nodes + let nodes = toSeq(0..<2).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))) + + await nodes[0].start() + await nodes[0].mountRelay() + nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec) + + # Set a low backoff to speed up test: 2, 4, 8, 16 + nodes[0].peerManager.initialBackoffInSec = 2 + nodes[0].peerManager.backoffFactor = 2 + + # node2 is not started, so dialing will fail + let conn1 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds) + check: + # Cannot connect to node2 + nodes[0].peerManager.peerStore.connectedness(nodes[1].peerInfo.peerId) == CannotConnect + nodes[0].peerManager.peerStore[ConnectionBook][nodes[1].peerInfo.peerId] == CannotConnect + nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 1 + + # And the connection failed + conn1.isNone() == true + + # Right after failing there is a backoff period + nodes[0].peerManager.peerStore.canBeConnected( + nodes[1].peerInfo.peerId, + nodes[0].peerManager.initialBackoffInSec, + nodes[0].peerManager.backoffFactor) == false + + # We wait the first backoff period + await sleepAsync(2100.milliseconds) + + # And backoff period is over + check: + nodes[0].peerManager.peerStore.canBeConnected( + nodes[1].peerInfo.peerId, + nodes[0].peerManager.initialBackoffInSec, + nodes[0].peerManager.backoffFactor) == true + + await nodes[1].start() + await nodes[1].mountRelay() + + # Now we can connect and failed count is reset + let conn2 = await nodes[0].peerManager.dialPeer(nodes[1].peerInfo.toRemotePeerInfo(), WakuRelayCodec, 1.seconds) + check: + conn2.isNone() == false + 
nodes[0].peerManager.peerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == 0 + + await allFutures(nodes.mapIt(it.stop())) asyncTest "Peer manager can use persistent storage and survive restarts": let database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60840), peerStorage = storage) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0)) peerInfo2 = node2.switch.peerInfo await node1.start() @@ -194,7 +228,7 @@ procSuite "Peer Manager": # Simulate restart by initialising a new node using the same storage let nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60844), peerStorage = storage) + node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) await node3.start() check: @@ -219,9 +253,9 @@ procSuite "Peer Manager": database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60850), peerStorage = storage) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60852)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0)) peerInfo2 = node2.switch.peerInfo betaCodec = "/vac/waku/relay/2.0.0-beta2" stableCodec = "/vac/waku/relay/2.0.0" @@ -245,7 +279,7 @@ procSuite "Peer Manager": # Simulate restart by initialising a 
new node using the same storage let nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60854), peerStorage = storage) + node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0), peerStorage = storage) await node3.mountRelay() node3.wakuRelay.codec = stableCodec @@ -273,11 +307,7 @@ procSuite "Peer Manager": asyncTest "Peer manager connects to all peers supporting a given protocol": # Create 4 nodes - var nodes: seq[WakuNode] - for i in 0..<4: - let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60860 + i)) - nodes &= node + let nodes = toSeq(0..<4).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))) # Start them await allFutures(nodes.mapIt(it.start())) @@ -317,11 +347,7 @@ procSuite "Peer Manager": asyncTest "Peer store keeps track of incoming connections": # Create 4 nodes - var nodes: seq[WakuNode] - for i in 0..<4: - let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - let node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60865 + i)) - nodes &= node + let nodes = toSeq(0..<4).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))) # Start them await allFutures(nodes.mapIt(it.start())) diff --git a/tests/v2/test_peer_store_extended.nim b/tests/v2/test_peer_store_extended.nim index 431a51ca5..708f3919b 100644 --- a/tests/v2/test_peer_store_extended.nim +++ b/tests/v2/test_peer_store_extended.nim @@ -1,8 +1,10 @@ {.used.} import - std/[options,sequtils], + std/[options,sequtils, times], + chronos, libp2p/crypto/crypto, + libp2p/peerid, libp2p/peerstore, libp2p/multiaddress, testutils/unittests @@ -10,8 +12,7 @@ import ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/waku_peer_store, ../../waku/v2/node/waku_node, - ../test_helpers, - ./testlib/testutils + ../test_helpers suite "Extended nim-libp2p Peer 
Store": @@ -45,6 +46,8 @@ suite "Extended nim-libp2p Peer Store": peerStore[DisconnectBook][p1] = 0 peerStore[SourceBook][p1] = Discv5 peerStore[DirectionBook][p1] = Inbound + peerStore[NumberFailedConnBook][p1] = 1 + peerStore[LastFailedConnBook][p1] = Moment.init(1001, Second) # Peer2: Connected peerStore[AddressBook][p2] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/2").tryGet()] @@ -56,6 +59,8 @@ suite "Extended nim-libp2p Peer Store": peerStore[DisconnectBook][p2] = 0 peerStore[SourceBook][p2] = Discv5 peerStore[DirectionBook][p2] = Inbound + peerStore[NumberFailedConnBook][p2] = 2 + peerStore[LastFailedConnBook][p2] = Moment.init(1002, Second) # Peer3: Connected peerStore[AddressBook][p3] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()] @@ -67,6 +72,8 @@ suite "Extended nim-libp2p Peer Store": peerStore[DisconnectBook][p3] = 0 peerStore[SourceBook][p3] = Discv5 peerStore[DirectionBook][p3] = Inbound + peerStore[NumberFailedConnBook][p3] = 3 + peerStore[LastFailedConnBook][p3] = Moment.init(1003, Second) # Peer4: Added but never connected peerStore[AddressBook][p4] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/4").tryGet()] @@ -78,6 +85,8 @@ suite "Extended nim-libp2p Peer Store": peerStore[DisconnectBook][p4] = 0 peerStore[SourceBook][p4] = Discv5 peerStore[DirectionBook][p4] = Inbound + peerStore[NumberFailedConnBook][p4] = 4 + peerStore[LastFailedConnBook][p4] = Moment.init(1004, Second) # Peer5: Connecteed in the past peerStore[AddressBook][p5] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/5").tryGet()] @@ -89,6 +98,8 @@ suite "Extended nim-libp2p Peer Store": peerStore[DisconnectBook][p5] = 1000 peerStore[SourceBook][p5] = Discv5 peerStore[DirectionBook][p5] = Outbound + peerStore[NumberFailedConnBook][p5] = 5 + peerStore[LastFailedConnBook][p5] = Moment.init(1005, Second) test "get() returns the correct StoredInfo for a given PeerId": # When @@ -108,17 +119,21 @@ suite "Extended nim-libp2p Peer Store": storedInfoPeer1.connectedness == Connected 
storedInfoPeer1.disconnectTime == 0 storedInfoPeer1.origin == Discv5 + storedInfoPeer1.numberFailedConn == 1 + storedInfoPeer1.lastFailedConn == Moment.init(1001, Second) check: - # fields are empty + # fields are empty, not part of the peerstore storedInfoPeer6.peerId == p6 storedInfoPeer6.addrs.len == 0 storedInfoPeer6.protos.len == 0 - storedInfoPeer6.agent == "" - storedInfoPeer6.protoVersion == "" - storedInfoPeer6.connectedness == NotConnected - storedInfoPeer6.disconnectTime == 0 - storedInfoPeer6.origin == UnknownOrigin + storedInfoPeer6.agent == default(string) + storedInfoPeer6.protoVersion == default(string) + storedInfoPeer6.connectedness == default(Connectedness) + storedInfoPeer6.disconnectTime == default(int) + storedInfoPeer6.origin == default(PeerOrigin) + storedInfoPeer6.numberFailedConn == default(int) + storedInfoPeer6.lastFailedConn == default(Moment) test "peers() returns all StoredInfo of the PeerStore": # When @@ -146,6 +161,8 @@ suite "Extended nim-libp2p Peer Store": p3.connectedness == Connected p3.disconnectTime == 0 p3.origin == Discv5 + p3.numberFailedConn == 3 + p3.lastFailedConn == Moment.init(1003, Second) test "peers() returns all StoredInfo matching a specific protocol": # When @@ -280,3 +297,76 @@ suite "Extended nim-libp2p Peer Store": disconnedtedPeers.anyIt(it.peerId == p4) disconnedtedPeers.anyIt(it.peerId == p5) not disconnedtedPeers.anyIt(it.connectedness == Connected) + + test "del() successfully deletes waku custom books": + # Given + let peerStore = PeerStore.new(capacity = 5) + var p1: PeerId + require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1") + peerStore[AddressBook][p1] = @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()] + peerStore[ProtoBook][p1] = @["proto"] + peerStore[KeyBook][p1] = KeyPair.random(ECDSA, rng[]).tryGet().pubkey + peerStore[AgentBook][p1] = "agent" + peerStore[ProtoVersionBook][p1] = "version" + peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) + 
peerStore[NumberFailedConnBook][p1] = 1 + peerStore[ConnectionBook][p1] = Connected + peerStore[DisconnectBook][p1] = 0 + peerStore[SourceBook][p1] = Discv5 + peerStore[DirectionBook][p1] = Inbound + + # When + peerStore.del(p1) + + # Then + check: + peerStore[AddressBook][p1] == newSeq[MultiAddress](0) + peerStore[ProtoBook][p1] == newSeq[string](0) + peerStore[KeyBook][p1] == default(PublicKey) + peerStore[AgentBook][p1] == "" + peerStore[ProtoVersionBook][p1] == "" + peerStore[LastFailedConnBook][p1] == default(Moment) + peerStore[NumberFailedConnBook][p1] == 0 + peerStore[ConnectionBook][p1] == default(Connectedness) + peerStore[DisconnectBook][p1] == 0 + peerStore[SourceBook][p1] == default(PeerOrigin) + peerStore[DirectionBook][p1] == default(PeerDirection) + + asyncTest "canBeConnected() returns correct value": + let peerStore = PeerStore.new(capacity = 5) + var p1: PeerId + require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1") + + # with InitialBackoffInSec = 1 backoffs are: 1, 2, 4, 8secs. 
+ let initialBackoffInSec = 1 + let backoffFactor = 2 + + # new peer with no errors can be connected + check: + peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true + + # peer with ONE error that just failed + peerStore[NumberFailedConnBook][p1] = 1 + peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) + # we can't connect right now + check: + peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false + + # but we can after the first backoff of 1 seconds + await sleepAsync(1200) + check: + peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true + + # peer with TWO errors, we can't connect until 2 seconds have passed + peerStore[NumberFailedConnBook][p1] = 2 + peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) + + # can't be connected after 1 second + await sleepAsync(1000) + check: + peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == false + + # can be connected after 2 seconds + await sleepAsync(1200) + check: + peerStore.canBeConnected(p1, initialBackoffInSec, backoffFactor) == true diff --git a/waku/v2/node/discv5/waku_discv5.nim b/waku/v2/node/discv5/waku_discv5.nim index 6fe3167ad..6468e0295 100644 --- a/waku/v2/node/discv5/waku_discv5.nim +++ b/waku/v2/node/discv5/waku_discv5.nim @@ -95,9 +95,6 @@ proc findRandomPeers*(wakuDiscv5: WakuDiscoveryV5): Future[Result[seq[RemotePeer error "Failed to convert ENR to peer info", enr= $node.record, err=res.error() waku_discv5_errors.inc(labelValues = ["peer_info_failure"]) - if discoveredPeers.len > 0: - info "Successfully discovered nodes", count=discoveredPeers.len - waku_discv5_discovered.inc(discoveredPeers.len.int64) return ok(discoveredPeers) diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 087962b59..9f70b197f 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -27,22 +27,34 @@ 
declarePublicGauge waku_connected_peers, "Number of connected peers per directio logScope: topics = "waku node peer_manager" -type - PeerManager* = ref object of RootObj - switch*: Switch - peerStore*: PeerStore - storage: PeerStorage - const # TODO: Make configurable DefaultDialTimeout = chronos.seconds(10) + # Max attempts before removing the peer + MaxFailedAttempts = 5 + + # Time to wait before attempting to dial again is calculated as: + # initialBackoffInSec*(backoffFactor^(failedAttempts-1)) + # 120s, 480s, 1920, 7680s + InitialBackoffInSec = 120 + BackoffFactor = 4 + # limit the amount of paralel dials MaxParalelDials = 10 # delay between consecutive relayConnectivityLoop runs ConnectivityLoopInterval = chronos.seconds(30) +type + PeerManager* = ref object of RootObj + switch*: Switch + peerStore*: PeerStore + initialBackoffInSec*: int + backoffFactor*: int + maxFailedAttempts*: int + storage: PeerStorage + #################### # Helper functions # #################### @@ -61,45 +73,56 @@ proc insertOrReplace(ps: PeerStorage, proc dialPeer(pm: PeerManager, peerId: PeerID, addrs: seq[MultiAddress], proto: string, dialTimeout = DefaultDialTimeout, - source = "api" + source = "api", ): Future[Option[Connection]] {.async.} = # Do not attempt to dial self if peerId == pm.switch.peerInfo.peerId: return none(Connection) - info "Dialing peer from manager", wireAddr = addrs, peerId = peerId + let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] + debug "Dialing peer", wireAddr = addrs, peerId = peerId, failedAttempts=failedAttempts # Dial Peer let dialFut = pm.switch.dial(peerId, addrs, proto) + var reasonFailed = "" try: - # Attempt to dial remote peer - if (await dialFut.withTimeout(DefaultDialTimeout)): + if (await dialFut.withTimeout(dialTimeout)): waku_peers_dials.inc(labelValues = ["successful"]) # TODO: This will be populated from the peerstore info when ready waku_node_conns_initiated.inc(labelValues = [source]) + 
pm.peerStore[NumberFailedConnBook][peerId] = 0 return some(dialFut.read()) else: - # TODO: any redial attempts? - debug "Dialing remote peer timed out" - waku_peers_dials.inc(labelValues = ["timeout"]) + reasonFailed = "timeout" + await cancelAndWait(dialFut) + except CatchableError as exc: + reasonFailed = "failed" - pm.peerStore[ConnectionBook][peerId] = CannotConnect - if not pm.storage.isNil: - pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect) + # Dial failed + pm.peerStore[NumberFailedConnBook][peerId] = pm.peerStore[NumberFailedConnBook][peerId] + 1 + pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second) + pm.peerStore[ConnectionBook][peerId] = CannotConnect - return none(Connection) - except CatchableError as e: - # TODO: any redial attempts? - debug "Dialing remote peer failed", msg = e.msg - waku_peers_dials.inc(labelValues = ["failed"]) + debug "Dialing peer failed", peerId = peerId, reason = reasonFailed, failedAttempts = pm.peerStore[NumberFailedConnBook][peerId] + waku_peers_dials.inc(labelValues = [reasonFailed]) - pm.peerStore[ConnectionBook][peerId] = CannotConnect - if not pm.storage.isNil: - pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect) + # Update storage + if not pm.storage.isNil: + pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CannotConnect) - return none(Connection) + return none(Connection) + +# TODO: To be addressed in nwaku/pull/1473. 
Do not prune service peers +# TODO: Currently unused +proc prunePeerStore(pm: PeerManager) = + # iterate peers in peerstore + # skip service peers + #if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts: + # debug "Removing peer from peer store", peerId = peerId, failedAttempts=failedAttempts + # pm.peerStore.del(peerId) + doAssert(false, "Not implemented!") proc loadFromStorage(pm: PeerManager) = debug "loading peers from storage" @@ -156,10 +179,19 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} = pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), CanConnect, getTime().toUnix) return -proc new*(T: type PeerManager, switch: Switch, storage: PeerStorage = nil): PeerManager = +proc new*(T: type PeerManager, + switch: Switch, + storage: PeerStorage = nil, + initialBackoffInSec = InitialBackoffInSec, + backoffFactor = BackoffFactor, + maxFailedAttempts = MaxFailedAttempts,): PeerManager = + let pm = PeerManager(switch: switch, peerStore: switch.peerStore, - storage: storage) + storage: storage, + initialBackoffInSec: initialBackoffInSec, + backoffFactor: backoffFactor, + maxFailedAttempts: maxFailedAttempts) proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} = onConnEvent(pm, peerId, event) @@ -248,7 +280,8 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, dialTimeout = DefaultDialTimeout, - source = "api"): Future[Option[Connection]] {.async.} = + source = "api", + ): Future[Option[Connection]] {.async.} = # Dial a given peer and add it to the list of known peers # TODO: check peer validity and score before continuing. Limit number of peers to be managed. 
@@ -263,7 +296,7 @@ proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = DefaultDialTimeout, - source = "api" + source = "api", ): Future[Option[Connection]] {.async.} = # Dial an existing peer by looking up it's existing addrs in the switch's peerStore # TODO: check peer validity and score before continuing. Limit number of peers to be managed. @@ -276,7 +309,10 @@ proc connectToNodes*(pm: PeerManager, proto: string, dialTimeout = DefaultDialTimeout, source = "api") {.async.} = - info "connectToNodes", len = nodes.len + if nodes.len == 0: + return + + info "Dialing multiple peers", numOfPeers = nodes.len var futConns: seq[Future[Option[Connection]]] for node in nodes: @@ -285,6 +321,9 @@ proc connectToNodes*(pm: PeerManager, futConns.add(pm.dialPeer(RemotePeerInfo(node), proto, dialTimeout, source)) await allFutures(futConns) + let successfulConns = futConns.mapIt(it.read()).countIt(it.isSome) + + info "Finished dialing multiple peers", successfulConns=successfulConns, attempted=nodes.len # The issue seems to be around peers not being fully connected when # trying to subscribe. 
So what we do is sleep to guarantee nodes are @@ -310,17 +349,18 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} = await sleepAsync(ConnectivityLoopInterval) continue - # TODO: Respect backoff before attempting to connect a relay peer - var disconnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) - shuffle(disconnectedPeers) - - let numPeersToConnect = min(min(maxConnections - numConPeers, disconnectedPeers.len), MaxParalelDials) + let notConnectedPeers = pm.peerStore.getNotConnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) + let outsideBackoffPeers = notConnectedPeers.filterIt(pm.peerStore.canBeConnected(it.peerId, + pm.initialBackoffInSec, + pm.backoffFactor)) + let numPeersToConnect = min(min(maxConnections - numConPeers, outsideBackoffPeers.len), MaxParalelDials) info "Relay connectivity loop", connectedPeers = numConPeers, targetConnectedPeers = maxConnections, - availableDisconnectedPeers = disconnectedPeers.len + notConnectedPeers = notConnectedPeers.len, + outsideBackoffPeers = outsideBackoffPeers.len - await pm.connectToNodes(disconnectedPeers[0..= (lastFailed + backoff): + return true + return false + +proc delete*(peerStore: PeerStore, + peerId: PeerId) = + # Delete all the information of a given peer. + peerStore.del(peerId) + proc get*(peerStore: PeerStore, peerId: PeerID): StoredInfo = ## Get the stored information of a given peer. 
@@ -85,6 +119,8 @@ proc get*(peerStore: PeerStore, disconnectTime: peerStore[DisconnectBook][peerId], origin: peerStore[SourceBook][peerId], direction: peerStore[DirectionBook][peerId], + lastFailedConn: peerStore[LastFailedConnBook][peerId], + numberFailedConn: peerStore[NumberFailedConnBook][peerId] ) # TODO: Rename peers() to getPeersByProtocol() @@ -141,7 +177,7 @@ proc selectPeer*(peerStore: PeerStore, proto: string): Option[RemotePeerInfo] = else: return none(RemotePeerInfo) -proc getPeersByDirection*(peerStore: PeerStore, direction: Direction): seq[StoredInfo] = +proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[StoredInfo] = return peerStore.peers.filterIt(it.direction == direction) proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] = diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 2e4c7c193..41d6f4177 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -879,17 +879,12 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = .filterIt(it.connectedness == Connected) .mapIt(it.toRemotePeerInfo()) - # Attempt to retrieve and ping the active outgoing connection for each peer for peer in peers: - let connOpt = await node.peerManager.dialPeer(peer, PingCodec) - - if connOpt.isNone(): - # TODO: more sophisticated error handling here - debug "failed to connect to remote peer", peer=peer + try: + let conn = await node.switch.dial(peer.peerId, peer.addrs, PingCodec) + let pingDelay = await node.libp2pPing.ping(conn) + except CatchableError as exc: waku_node_errors.inc(labelValues = ["keep_alive_failure"]) - return - - discard await node.libp2pPing.ping(connOpt.get()) # Ping connection await sleepAsync(keepalive) @@ -911,16 +906,15 @@ proc runDiscv5Loop(node: WakuNode) {.async.} = while node.wakuDiscv5.listening: trace "Running discovery loop" - ## Query for a random target and collect all discovered nodes - ## TODO: we could filter nodes here - let 
discoveredPeers = await node.wakuDiscv5.findRandomPeers() - if discoveredPeers.isOk(): - ## Let's attempt to connect to peers we - ## have not encountered before + let discoveredPeersRes = await node.wakuDiscv5.findRandomPeers() - trace "Discovered peers", count=discoveredPeers.get().len() + if discoveredPeersRes.isOk: + let discoveredPeers = discoveredPeersRes.get + let newSeen = discoveredPeers.countIt(not node.peerManager.peerStore[AddressBook].contains(it.peerId)) + info "Discovered peers", discovered=discoveredPeers.len, new=newSeen - for peer in discoveredPeers.get(): + # Add all peers, new ones and already seen (in case their addresses changed) + for peer in discoveredPeers: # TODO: proto: WakuRelayCodec will be removed from add peer node.peerManager.addPeer(peer, WakuRelayCodec)