diff --git a/tests/test_waku_keepalive.nim b/tests/test_waku_keepalive.nim index 872994b45..aebee13dc 100644 --- a/tests/test_waku_keepalive.nim +++ b/tests/test_waku_keepalive.nim @@ -44,7 +44,7 @@ suite "Waku Keepalive": await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - node1.startKeepalive() + node1.startKeepalive(2.seconds) check: (await completionFut.withTimeout(5.seconds)) == true diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d4ac364f3..009cceceb 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -422,7 +422,11 @@ proc startNode*( desiredOutDegree = node.wakuRelay.parameters.d.uint64() (await node.fetchPeerExchangePeers(desiredOutDegree)).isOkOr: error "error while fetching peers from peer exchange", error = error - quit(QuitFailure) + + # Use px to periodically get peers if discv5 is disabled, as discv5 nodes have their own + # periodic loop to find peers and px returned peers actually come from discv5 + if conf.peerExchange and not conf.discv5Discovery: + node.startPeerExchangeLoop() # Start keepalive, if enabled if conf.keepAlive: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 7f003b0b9..411af6d32 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1146,7 +1146,7 @@ proc mountPeerExchange*( error "failed to mount wakuPeerExchange", error = getCurrentExceptionMsg() proc fetchPeerExchangePeers*( - node: Wakunode, amount: uint64 + node: Wakunode, amount = DefaultPXNumPeersReq ): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} = if node.wakuPeerExchange.isNil(): error "could not get peers from px, waku peer-exchange is nil" @@ -1175,6 +1175,20 @@ proc fetchPeerExchangePeers*( error = pxPeersRes.error return err(pxPeersRes.error) +proc peerExchangeLoop(node: WakuNode) {.async.} = + while true: + await sleepAsync(1.minutes) + if not node.started: + continue + (await node.fetchPeerExchangePeers()).isOkOr: + warn "Cannot fetch peers from peer exchange", cause = error + +proc startPeerExchangeLoop*(node: WakuNode) = + if node.wakuPeerExchange.isNil(): + error "startPeerExchangeLoop: Peer Exchange is not mounted" + return + node.wakuPeerExchange.pxLoopHandle = node.peerExchangeLoop() + # TODO: Move to application module (e.g., wakunode2.nim) proc setPeerExchangePeer*( node: WakuNode, peer: RemotePeerInfo | MultiAddress | string @@ -1217,7 +1231,11 @@ proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} = # TODO: Move this logic to PeerManager proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = - while node.started: + while true: + await sleepAsync(keepalive) + if not node.started: + continue + # Keep connected peers alive while running # Each node is responsible of keeping its outgoing connections alive trace "Running keepalive" @@ -1235,14 +1253,11 @@ proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} = except CatchableError as exc: waku_node_errors.inc(labelValues = ["keep_alive_failure"]) - await sleepAsync(keepalive) +# 2 minutes default - 20% of the default chronosstream timeout duration +proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = + info "starting keepalive", keepalive = keepalive -proc startKeepalive*(node: WakuNode) = - let defaultKeepalive = 2.minutes # 20% of the default chronosstream timeout duration - - info "starting keepalive", keepalive = defaultKeepalive - - asyncSpawn node.keepaliveLoop(defaultKeepalive) + asyncSpawn node.keepaliveLoop(keepalive) proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" @@ -1372,6 +1387,9 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.stopWait() + if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil(): + await node.wakuPeerExchange.pxLoopHandle.cancelAndWait() + node.started = false proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} = diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index a10d5ef2c..556aa6d3b 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -57,6 +57,7 @@ type cluster*: Option[uint16] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ requestRateLimiter*: RequestRateLimiter + pxLoopHandle*: Future[void] proc request*( wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq, conn: Connection