diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim
index 1ba599d78..d18d35674 100644
--- a/apps/chat2/chat2.nim
+++ b/apps/chat2/chat2.nim
@@ -590,9 +590,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
 
   await chat.readWriteLoop()
 
-  if conf.keepAlive:
-    node.startKeepalive()
-
   runForever()
 
 proc main(rng: ref HmacDrbgContext) {.async.} =
diff --git a/library/waku_thread_requests/requests/peer_manager_request.nim b/library/waku_thread_requests/requests/peer_manager_request.nim
index 1acc78595..a7e643a21 100644
--- a/library/waku_thread_requests/requests/peer_manager_request.nim
+++ b/library/waku_thread_requests/requests/peer_manager_request.nim
@@ -122,14 +122,7 @@ proc process*(
     await waku.node.peerManager.disconnectNode(peerId)
     return ok("")
   of DISCONNECT_ALL_PEERS:
-    let connectedPeers = waku.node.peerManager.switch.peerStore.peers().filterIt(
-      it.connectedness == Connected
-    )
-
-    var futs: seq[Future[void]]
-    for peer in connectedPeers:
-      futs.add(waku.node.peerManager.disconnectNode(peer))
-    await allFutures(futs)
+    await waku.node.peerManager.disconnectAllPeers()
     return ok("")
   of DIAL_PEER:
     let remotePeerInfo = parsePeerInfo($self[].peerMultiAddr).valueOr:
diff --git a/tests/test_waku_keepalive.nim b/tests/test_waku_keepalive.nim
index 3fcf01b8e..f6a9e631b 100644
--- a/tests/test_waku_keepalive.nim
+++ b/tests/test_waku_keepalive.nim
@@ -44,7 +44,10 @@ suite "Waku Keepalive":
 
     await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
 
-    node1.startKeepalive(2.seconds)
+    let healthMonitor = NodeHealthMonitor()
+    healthMonitor.setNodeToHealthMonitor(node1)
+    healthMonitor.startKeepalive(2.seconds).isOkOr:
+      assert false, "Failed to start keepalive"
 
     check:
       (await completionFut.withTimeout(5.seconds)) == true
diff --git a/waku/factory/conf_builder/waku_conf_builder.nim b/waku/factory/conf_builder/waku_conf_builder.nim
index 46e303e70..5a3abbba4 100644
--- a/waku/factory/conf_builder/waku_conf_builder.nim
+++ b/waku/factory/conf_builder/waku_conf_builder.nim
@@ -255,9 +255,6 @@ proc withRelayShardedPeerManagement*(
 ) =
   b.relayShardedPeerManagement = some(relayShardedPeerManagement)
 
-proc withKeepAlive*(b: var WakuConfBuilder, keepAlive: bool) =
-  b.keepAlive = some(keepAlive)
-
 proc withP2pReliability*(b: var WakuConfBuilder, p2pReliability: bool) =
   b.p2pReliability = some(p2pReliability)
 
diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim
index 190ce46e7..ecf57afd7 100644
--- a/waku/factory/external_config.nim
+++ b/waku/factory/external_config.nim
@@ -314,12 +314,6 @@ hence would have reachability issues.""",
     name: "staticnode"
   .}: seq[string]
 
-  keepAlive* {.
-    desc: "Enable keep-alive for idle connections: true|false",
-    defaultValue: false,
-    name: "keep-alive"
-  .}: bool
-
   # TODO: This is trying to do too much, this should only be used for autosharding, which itself should be configurable
   # If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork
   numShardsInNetwork* {.
@@ -951,7 +945,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
   b.withRelayPeerExchange(n.relayPeerExchange)
   b.withRelayShardedPeerManagement(n.relayShardedPeerManagement)
   b.withStaticNodes(n.staticNodes)
-  b.withKeepAlive(n.keepAlive)
 
   if n.numShardsInNetwork != 0:
     b.withNumShardsInNetwork(n.numShardsInNetwork)
diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index 2c363c6c4..5298fa2b9 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -462,10 +462,6 @@ proc startNode*(
   if conf.peerExchange and not conf.discv5Conf.isSome():
     node.startPeerExchangeLoop()
 
-  # Start keepalive, if enabled
-  if conf.keepAlive:
-    node.startKeepalive()
-
   # Maintain relay connections
   if conf.relay:
     node.peerManager.start()
diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim
index 2602120d8..faca627a4 100644
--- a/waku/factory/waku.nim
+++ b/waku/factory/waku.nim
@@ -401,7 +401,8 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
     waku[].deliveryMonitor.startDeliveryMonitor()
 
   ## Health Monitor
-  waku[].healthMonitor.startHealthMonitor()
+  waku[].healthMonitor.startHealthMonitor().isOkOr:
+    return err("failed to start health monitor: " & $error)
 
   if conf.restServerConf.isSome():
     rest_server_builder.startRestServerProtocolSupport(
diff --git a/waku/node/health_monitor/node_health_monitor.nim b/waku/node/health_monitor/node_health_monitor.nim
index b13925d66..fa31c0529 100644
--- a/waku/node/health_monitor/node_health_monitor.nim
+++ b/waku/node/health_monitor/node_health_monitor.nim
@@ -1,6 +1,10 @@
 {.push raises: [].}
 
-import std/[options, sets, strformat], chronos, chronicles, libp2p/protocols/rendezvous
+import
+  std/[options, sets, strformat, random, sequtils],
+  chronos,
+  chronicles,
+  libp2p/protocols/rendezvous
 
 import
   ../waku_node,
@@ -13,6 +17,10 @@ import
 
 ## This module is aimed to check the state of the "self" Waku Node
 
+# randomize initializes std/random's random number generator
+# if not called, the outcome of randomization procedures will be the same in every run
+randomize()
+
 type
   HealthReport* = object
     nodeHealth*: HealthStatus
@@ -22,6 +30,7 @@ type
     nodeHealth: HealthStatus
     node: WakuNode
     onlineMonitor*: OnlineMonitor
+    keepAliveFut: Future[void]
 
 template checkWakuNodeNotNil(node: WakuNode, p: ProtocolHealth): untyped =
   if node.isNil():
@@ -224,6 +233,145 @@ proc getRendezvousHealth(hm: NodeHealthMonitor): ProtocolHealth =
 
   return p.ready()
 
+proc selectRandomPeersForKeepalive(
+    node: WakuNode, outPeers: seq[PeerId], numRandomPeers: int
+): Future[seq[PeerId]] {.async.} =
+  ## Select peers for random keepalive, prioritizing mesh peers
+
+  if node.wakuRelay.isNil():
+    return selectRandomPeers(outPeers, numRandomPeers)
+
+  let meshPeers = node.wakuRelay.getPeersInMesh().valueOr:
+    error "Failed getting peers in mesh for ping", error = error
+    # Fallback to random selection from all outgoing peers
+    return selectRandomPeers(outPeers, numRandomPeers)
+
+  trace "Mesh peers for keepalive", meshPeers = meshPeers
+
+  # Get non-mesh peers and shuffle them
+  var nonMeshPeers = outPeers.filterIt(it notin meshPeers)
+  shuffle(nonMeshPeers)
+
+  # Combine mesh peers + random non-mesh peers up to numRandomPeers total
+  let numNonMeshPeers = max(0, numRandomPeers - len(meshPeers))
+  let selectedNonMeshPeers = nonMeshPeers[0 ..< min(len(nonMeshPeers), numNonMeshPeers)]
+
+  let selectedPeers = meshPeers & selectedNonMeshPeers
+  trace "Selected peers for keepalive", selected = selectedPeers
+  return selectedPeers
+
+proc keepAliveLoop(
+    node: WakuNode,
+    randomPeersKeepalive: chronos.Duration,
+    allPeersKeepAlive: chronos.Duration,
+    numRandomPeers = 10,
+) {.async.} =
+  # Calculate how many random peer cycles before pinging all peers
+  let randomToAllRatio =
+    int(allPeersKeepAlive.seconds() / randomPeersKeepalive.seconds())
+  var countdownToPingAll = max(0, randomToAllRatio - 1)
+
+  # Sleep detection configuration
+  let sleepDetectionInterval = 3 * randomPeersKeepalive
+
+  # Failure tracking
+  var consecutiveIterationFailures = 0
+  const maxAllowedConsecutiveFailures = 2
+
+  var lastTimeExecuted = Moment.now()
+
+  while true:
+    trace "Running keepalive loop"
+    await sleepAsync(randomPeersKeepalive)
+
+    if not node.started:
+      continue
+
+    let currentTime = Moment.now()
+
+    # Check for sleep detection
+    if currentTime - lastTimeExecuted > sleepDetectionInterval:
+      warn "Keep alive hasn't been executed recently. Killing all connections"
+      await node.peerManager.disconnectAllPeers()
+      lastTimeExecuted = currentTime
+      consecutiveIterationFailures = 0
+      continue
+
+    # Check for consecutive failures
+    if consecutiveIterationFailures > maxAllowedConsecutiveFailures:
+      warn "Too many consecutive ping failures, node likely disconnected. Killing all connections",
+        consecutiveIterationFailures, maxAllowedConsecutiveFailures
+      await node.peerManager.disconnectAllPeers()
+      consecutiveIterationFailures = 0
+      lastTimeExecuted = currentTime
+      continue
+
+    # Determine which peers to ping
+    let outPeers = node.peerManager.connectedPeers()[1]
+    let peersToPing =
+      if countdownToPingAll > 0:
+        await selectRandomPeersForKeepalive(node, outPeers, numRandomPeers)
+      else:
+        outPeers
+
+    let numPeersToPing = len(peersToPing)
+
+    if countdownToPingAll > 0:
+      trace "Pinging random peers",
+        count = numPeersToPing, countdownToPingAll = countdownToPingAll
+      countdownToPingAll.dec()
+    else:
+      trace "Pinging all peers", count = numPeersToPing
+      countdownToPingAll = max(0, randomToAllRatio - 1)
+
+    # Execute keepalive pings
+    let successfulPings = await parallelPings(node, peersToPing)
+
+    if successfulPings != numPeersToPing:
+      waku_node_errors.inc(
+        amount = numPeersToPing - successfulPings, labelValues = ["keep_alive_failure"]
+      )
+
+    trace "Keepalive results",
+      attemptedPings = numPeersToPing, successfulPings = successfulPings
+
+    # Update failure tracking
+    if numPeersToPing > 0 and successfulPings == 0:
+      consecutiveIterationFailures.inc()
+      error "All pings failed", consecutiveFailures = consecutiveIterationFailures
+    else:
+      consecutiveIterationFailures = 0
+
+    lastTimeExecuted = currentTime
+
+# 2 minutes default - 20% of the default chronosstream timeout duration
+proc startKeepalive*(
+    hm: NodeHealthMonitor,
+    randomPeersKeepalive = 10.seconds,
+    allPeersKeepalive = 2.minutes,
+): Result[void, string] =
+  # Validate input parameters
+  if randomPeersKeepalive.isZero() or allPeersKeepAlive.isZero():
+    error "startKeepalive: allPeersKeepAlive and randomPeersKeepalive must be greater than 0",
+      randomPeersKeepalive = $randomPeersKeepalive,
+      allPeersKeepAlive = $allPeersKeepAlive
+    return err(
+      "startKeepalive: allPeersKeepAlive and randomPeersKeepalive must be greater than 0"
+    )
+
+  if allPeersKeepAlive < randomPeersKeepalive:
+    error "startKeepalive: allPeersKeepAlive can't be less than randomPeersKeepalive",
+      allPeersKeepAlive = $allPeersKeepAlive,
+      randomPeersKeepalive = $randomPeersKeepalive
+    return
+      err("startKeepalive: allPeersKeepAlive can't be less than randomPeersKeepalive")
+
+  info "starting keepalive",
+    randomPeersKeepalive = randomPeersKeepalive, allPeersKeepalive = allPeersKeepalive
+
+  hm.keepAliveFut = hm.node.keepAliveLoop(randomPeersKeepalive, allPeersKeepalive)
+  return ok()
+
 proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} =
   var report: HealthReport
   report.nodeHealth = hm.nodeHealth
@@ -253,11 +401,15 @@ proc setNodeToHealthMonitor*(hm: NodeHealthMonitor, node: WakuNode) =
 proc setOverallHealth*(hm: NodeHealthMonitor, health: HealthStatus) =
   hm.nodeHealth = health
 
-proc startHealthMonitor*(hm: NodeHealthMonitor) =
+proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] =
   hm.onlineMonitor.startOnlineMonitor()
+  hm.startKeepalive().isOkOr:
+    return err("startHealthMonitor: failed starting keep alive: " & error)
+  return ok()
 
 proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
   await hm.onlineMonitor.stopOnlineMonitor()
+  await hm.keepAliveFut.cancelAndWait()
 
 proc new*(
     T: type NodeHealthMonitor,
diff --git a/waku/node/health_monitor/online_monitor.nim b/waku/node/health_monitor/online_monitor.nim
index f3a3013e2..27bd53bc3 100644
--- a/waku/node/health_monitor/online_monitor.nim
+++ b/waku/node/health_monitor/online_monitor.nim
@@ -53,7 +53,7 @@ proc networkConnectivityLoop(self: OnlineMonitor): Future[void] {.async.} =
   ## and triggers any change that depends on the network connectivity state
   while true:
     await self.updateOnlineState()
-    await sleepAsync(15.seconds)
+    await sleepAsync(5.seconds)
 
 proc startOnlineMonitor*(self: OnlineMonitor) =
   self.networkConnLoopHandle = self.networkConnectivityLoop()
diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim
index 0a19d5b2c..7deff0593 100644
--- a/waku/node/peer_manager/peer_manager.nim
+++ b/waku/node/peer_manager/peer_manager.nim
@@ -501,6 +501,13 @@ proc connectedPeers*(
 
   return (inPeers, outPeers)
 
+proc disconnectAllPeers*(pm: PeerManager) {.async.} =
+  let (inPeerIds, outPeerIds) = pm.connectedPeers()
+  let connectedPeers = concat(inPeerIds, outPeerIds)
+
+  let futs = connectedPeers.mapIt(pm.disconnectNode(it))
+  await allFutures(futs)
+
 proc getStreamByPeerIdAndProtocol*(
     pm: PeerManager, peerId: PeerId, protocol: string
 ): Future[Result[Connection, string]] {.async.} =
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index ac72f3e37..6a5c3fdb0 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -1,7 +1,7 @@
 {.push raises: [].}
 
 import
-  std/[hashes, options, sugar, tables, strutils, sequtils, os, net],
+  std/[hashes, options, sugar, tables, strutils, sequtils, os, net, random],
   chronos,
   chronicles,
   metrics,
@@ -69,6 +69,10 @@ declarePublicGauge waku_px_peers,
 logScope:
   topics = "waku node"
 
+# randomize initializes std/random's random number generator
+# if not called, the outcome of randomization procedures will be the same in every run
+randomize()
+
 # TODO: Move to application instance (e.g., `WakuNode2`)
 # Git version in git describe format (defined compile time)
 const git_version* {.strdefine.} = "n/a"
@@ -1325,35 +1329,60 @@ proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =
   except LPError:
     error "failed to mount libp2pPing", error = getCurrentExceptionMsg()
 
-# TODO: Move this logic to PeerManager
-proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
-  while true:
-    await sleepAsync(keepalive)
-    if not node.started:
+proc pingPeer(node: WakuNode, peerId: PeerId): Future[Result[void, string]] {.async.} =
+  ## Ping a single peer and return the result
+
+  try:
+    # Establish a stream
+    let stream = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr:
+      error "pingPeer: failed dialing peer", peerId = peerId
+      return err("pingPeer failed dialing peer peerId: " & $peerId)
+    defer:
+      # Always close the stream
+      try:
+        await stream.close()
+      except CatchableError as e:
+        debug "Error closing ping connection", peerId = peerId, error = e.msg
+
+    # Perform ping
+    let pingDuration = await node.libp2pPing.ping(stream)
+
+    trace "Ping successful", peerId = peerId, duration = pingDuration
+    return ok()
+  except CatchableError as e:
+    error "pingPeer: exception raised pinging peer", peerId = peerId, error = e.msg
+    return err("pingPeer: exception raised pinging peer: " & e.msg)
+
+proc selectRandomPeers*(peers: seq[PeerId], numRandomPeers: int): seq[PeerId] =
+  var randomPeers = peers
+  shuffle(randomPeers)
+  return randomPeers[0 ..< min(len(randomPeers), numRandomPeers)]
+
+# Returns the number of successful pings performed
+proc parallelPings*(node: WakuNode, peerIds: seq[PeerId]): Future[int] {.async.} =
+  if len(peerIds) == 0:
+    return 0
+
+  var pingFuts: seq[Future[Result[void, string]]]
+
+  # Create ping futures for each peer
+  for i, peerId in peerIds:
+    let fut = pingPeer(node, peerId)
+    pingFuts.add(fut)
+
+  # Wait for all pings to complete
+  discard await allFutures(pingFuts).withTimeout(5.seconds)
+
+  var successCount = 0
+  for fut in pingFuts:
+    if not fut.completed() or fut.failed():
       continue
 
-    # Keep connected peers alive while running
-    # Each node is responsible of keeping its outgoing connections alive
-    trace "Running keepalive"
+    let res = fut.read()
+    if res.isOk():
+      successCount.inc()
 
-    # First get a list of connected peer infos
-    let outPeers = node.peerManager.connectedPeers()[1]
-
-    for peerId in outPeers:
-      try:
-        let conn = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr:
-          warn "Failed dialing peer for keep alive", peerId = peerId
-          continue
-        let pingDelay = await node.libp2pPing.ping(conn)
-        await conn.close()
-      except CatchableError as exc:
-        waku_node_errors.inc(labelValues = ["keep_alive_failure"])
-
-# 2 minutes default - 20% of the default chronosstream timeout duration
-proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) =
-  info "starting keepalive", keepalive = keepalive
-
-  asyncSpawn node.keepaliveLoop(keepalive)
+  return successCount
 
 proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
   info "mounting rendezvous discovery protocol"
diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim
index c87519b06..18d60dcef 100644
--- a/waku/waku_relay/protocol.nim
+++ b/waku/waku_relay/protocol.nim
@@ -332,6 +332,13 @@ proc getPubSubPeersInMesh*(
   ## Returns the list of PubSubPeers in a mesh defined by the passed pubsub topic.
   ## The 'mesh' atribute is defined in the GossipSub ref object.
 
+  # If pubsubTopic is empty, we return all peers in mesh for any pubsub topic
+  if pubsubTopic == "":
+    var allPeers = initHashSet[PubSubPeer]()
+    for topic, topicMesh in w.mesh.pairs:
+      allPeers = allPeers.union(topicMesh)
+    return ok(allPeers)
+
   if not w.mesh.hasKey(pubsubTopic):
     debug "getPubSubPeersInMesh - there is no mesh peer for the given pubsub topic",
       pubsubTopic = pubsubTopic
@@ -348,7 +355,7 @@ proc getPubSubPeersInMesh*(
   return ok(peers)
 
 proc getPeersInMesh*(
-    w: WakuRelay, pubsubTopic: PubsubTopic
+    w: WakuRelay, pubsubTopic: PubsubTopic = ""
 ): Result[seq[PeerId], string] =
   ## Returns the list of peerIds in a mesh defined by the passed pubsub topic.
   ## The 'mesh' atribute is defined in the GossipSub ref object.