mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 14:33:12 +00:00
chore: improve keep alive (#3458)
This commit is contained in:
parent
edf416f9e0
commit
d820976eaf
@ -590,9 +590,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
await chat.readWriteLoop()
|
||||
|
||||
if conf.keepAlive:
|
||||
node.startKeepalive()
|
||||
|
||||
runForever()
|
||||
|
||||
proc main(rng: ref HmacDrbgContext) {.async.} =
|
||||
|
||||
@ -122,14 +122,7 @@ proc process*(
|
||||
await waku.node.peerManager.disconnectNode(peerId)
|
||||
return ok("")
|
||||
of DISCONNECT_ALL_PEERS:
|
||||
let connectedPeers = waku.node.peerManager.switch.peerStore.peers().filterIt(
|
||||
it.connectedness == Connected
|
||||
)
|
||||
|
||||
var futs: seq[Future[void]]
|
||||
for peer in connectedPeers:
|
||||
futs.add(waku.node.peerManager.disconnectNode(peer))
|
||||
await allFutures(futs)
|
||||
await waku.node.peerManager.disconnectAllPeers()
|
||||
return ok("")
|
||||
of DIAL_PEER:
|
||||
let remotePeerInfo = parsePeerInfo($self[].peerMultiAddr).valueOr:
|
||||
|
||||
@ -44,7 +44,10 @@ suite "Waku Keepalive":
|
||||
|
||||
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
node1.startKeepalive(2.seconds)
|
||||
let healthMonitor = NodeHealthMonitor()
|
||||
healthMonitor.setNodeToHealthMonitor(node1)
|
||||
healthMonitor.startKeepalive(2.seconds).isOkOr:
|
||||
assert false, "Failed to start keepalive"
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
|
||||
@ -255,9 +255,6 @@ proc withRelayShardedPeerManagement*(
|
||||
) =
|
||||
b.relayShardedPeerManagement = some(relayShardedPeerManagement)
|
||||
|
||||
proc withKeepAlive*(b: var WakuConfBuilder, keepAlive: bool) =
  ## Records the keep-alive flag on the builder as an explicitly-set option.
  let flag = some(keepAlive)
  b.keepAlive = flag
|
||||
|
||||
proc withP2pReliability*(b: var WakuConfBuilder, p2pReliability: bool) =
  ## Records the p2p-reliability flag on the builder as an explicitly-set option.
  let flag = some(p2pReliability)
  b.p2pReliability = flag
|
||||
|
||||
|
||||
@ -314,12 +314,6 @@ hence would have reachability issues.""",
|
||||
name: "staticnode"
|
||||
.}: seq[string]
|
||||
|
||||
keepAlive* {.
|
||||
desc: "Enable keep-alive for idle connections: true|false",
|
||||
defaultValue: false,
|
||||
name: "keep-alive"
|
||||
.}: bool
|
||||
|
||||
# TODO: This is trying to do too much, this should only be used for autosharding, which itself should be configurable
|
||||
# If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork
|
||||
numShardsInNetwork* {.
|
||||
@ -951,7 +945,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
|
||||
b.withRelayPeerExchange(n.relayPeerExchange)
|
||||
b.withRelayShardedPeerManagement(n.relayShardedPeerManagement)
|
||||
b.withStaticNodes(n.staticNodes)
|
||||
b.withKeepAlive(n.keepAlive)
|
||||
|
||||
if n.numShardsInNetwork != 0:
|
||||
b.withNumShardsInNetwork(n.numShardsInNetwork)
|
||||
|
||||
@ -462,10 +462,6 @@ proc startNode*(
|
||||
if conf.peerExchange and not conf.discv5Conf.isSome():
|
||||
node.startPeerExchangeLoop()
|
||||
|
||||
# Start keepalive, if enabled
|
||||
if conf.keepAlive:
|
||||
node.startKeepalive()
|
||||
|
||||
# Maintain relay connections
|
||||
if conf.relay:
|
||||
node.peerManager.start()
|
||||
|
||||
@ -401,7 +401,8 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
waku[].deliveryMonitor.startDeliveryMonitor()
|
||||
|
||||
## Health Monitor
|
||||
waku[].healthMonitor.startHealthMonitor()
|
||||
waku[].healthMonitor.startHealthMonitor().isOkOr:
|
||||
return err("failed to start health monitor: " & $error)
|
||||
|
||||
if conf.restServerConf.isSome():
|
||||
rest_server_builder.startRestServerProtocolSupport(
|
||||
|
||||
@ -1,6 +1,10 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options, sets, strformat], chronos, chronicles, libp2p/protocols/rendezvous
|
||||
import
|
||||
std/[options, sets, strformat, random, sequtils],
|
||||
chronos,
|
||||
chronicles,
|
||||
libp2p/protocols/rendezvous
|
||||
|
||||
import
|
||||
../waku_node,
|
||||
@ -13,6 +17,10 @@ import
|
||||
|
||||
## This module is aimed to check the state of the "self" Waku Node
|
||||
|
||||
# randomize initializes std/random's random number generator
|
||||
# if not called, the outcome of randomization procedures will be the same in every run
|
||||
randomize()
|
||||
|
||||
type
|
||||
HealthReport* = object
|
||||
nodeHealth*: HealthStatus
|
||||
@ -22,6 +30,7 @@ type
|
||||
nodeHealth: HealthStatus
|
||||
node: WakuNode
|
||||
onlineMonitor*: OnlineMonitor
|
||||
keepAliveFut: Future[void]
|
||||
|
||||
template checkWakuNodeNotNil(node: WakuNode, p: ProtocolHealth): untyped =
|
||||
if node.isNil():
|
||||
@ -224,6 +233,145 @@ proc getRendezvousHealth(hm: NodeHealthMonitor): ProtocolHealth =
|
||||
|
||||
return p.ready()
|
||||
|
||||
proc selectRandomPeersForKeepalive(
    node: WakuNode, outPeers: seq[PeerId], numRandomPeers: int
): Future[seq[PeerId]] {.async.} =
  ## Chooses the peers to ping on a "random peers" keepalive round.
  ##
  ## All relay mesh peers are always included; the remaining slots (up to
  ## `numRandomPeers` in total) are filled with randomly picked peers from
  ## `outPeers` that are not in the mesh. When relay is not mounted, or the
  ## mesh cannot be queried, the selection falls back to `selectRandomPeers`
  ## over `outPeers`.
  ##
  ## Note: the result can be longer than `numRandomPeers` when the mesh alone
  ## holds more peers than that.

  if node.wakuRelay.isNil():
    # No relay, hence no mesh to prioritize
    return selectRandomPeers(outPeers, numRandomPeers)

  let inMesh = node.wakuRelay.getPeersInMesh().valueOr:
    error "Failed getting peers in mesh for ping", error = error
    # Mesh unavailable: pick purely at random among the outgoing peers
    return selectRandomPeers(outPeers, numRandomPeers)

  trace "Mesh peers for keepalive", meshPeers = inMesh

  # Outgoing peers outside the mesh, in random order
  var candidates = outPeers.filterIt(it notin inMesh)
  shuffle(candidates)

  # Keep only as many non-mesh peers as there are slots left after the mesh
  let
    freeSlots = max(0, numRandomPeers - inMesh.len)
    takeCount = min(candidates.len, freeSlots)
  candidates.setLen(takeCount)

  let chosen = inMesh & candidates
  trace "Selected peers for keepalive", selected = chosen
  return chosen
|
||||
|
||||
proc keepAliveLoop(
    node: WakuNode,
    randomPeersKeepalive: chronos.Duration,
    allPeersKeepAlive: chronos.Duration,
    numRandomPeers = 10,
) {.async.} =
  ## Keepalive loop: wakes up every `randomPeersKeepalive` and pings outgoing
  ## peers of `node`.
  ##
  ## Most rounds ping the subset returned by `selectRandomPeersForKeepalive`
  ## (bounded by `numRandomPeers` plus mesh peers); once every
  ## `allPeersKeepAlive / randomPeersKeepalive` rounds every outgoing peer is
  ## pinged instead.
  ##
  ## All connections are dropped (`disconnectAllPeers`) when either
  ## - the gap since the previous round exceeds 3x `randomPeersKeepalive`
  ##   (treated as "sleep detection" - presumably the host was suspended), or
  ## - more than `maxAllowedConsecutiveFailures` consecutive rounds had every
  ##   single ping fail.
  ##
  ## The loop never returns on its own; the owner stops it by cancelling the
  ## returned future.

  # Calculate how many random peer cycles before pinging all peers.
  # The ratio is computed on nanoseconds with integer division:
  # `Duration.seconds()` truncates, so a sub-second `randomPeersKeepalive`
  # would otherwise produce a zero divisor.
  let
    randomIntervalNs = max(1'i64, randomPeersKeepalive.nanoseconds())
    randomToAllRatio = int(allPeersKeepAlive.nanoseconds() div randomIntervalNs)
  var countdownToPingAll = max(0, randomToAllRatio - 1)

  # Sleep detection configuration
  let sleepDetectionInterval = 3 * randomPeersKeepalive

  # Failure tracking
  var consecutiveIterationFailures = 0
  const maxAllowedConsecutiveFailures = 2

  var lastTimeExecuted = Moment.now()

  while true:
    trace "Running keepalive loop"
    await sleepAsync(randomPeersKeepalive)

    if not node.started:
      # The loop itself is still ticking on schedule, so keep the timestamp
      # fresh; otherwise the first round after the node starts would be
      # mistaken for a system sleep and would drop every connection.
      lastTimeExecuted = Moment.now()
      continue

    let currentTime = Moment.now()

    # Check for sleep detection
    if currentTime - lastTimeExecuted > sleepDetectionInterval:
      warn "Keep alive hasn't been executed recently. Killing all connections"
      await node.peerManager.disconnectAllPeers()
      lastTimeExecuted = currentTime
      consecutiveIterationFailures = 0
      continue

    # Check for consecutive failures
    if consecutiveIterationFailures > maxAllowedConsecutiveFailures:
      warn "Too many consecutive ping failures, node likely disconnected. Killing all connections",
        consecutiveIterationFailures, maxAllowedConsecutiveFailures
      await node.peerManager.disconnectAllPeers()
      consecutiveIterationFailures = 0
      lastTimeExecuted = currentTime
      continue

    # Determine which peers to ping (index 1 of connectedPeers() = outgoing)
    let outPeers = node.peerManager.connectedPeers()[1]
    let peersToPing =
      if countdownToPingAll > 0:
        await selectRandomPeersForKeepalive(node, outPeers, numRandomPeers)
      else:
        outPeers

    let numPeersToPing = len(peersToPing)

    if countdownToPingAll > 0:
      trace "Pinging random peers",
        count = numPeersToPing, countdownToPingAll = countdownToPingAll
      countdownToPingAll.dec()
    else:
      trace "Pinging all peers", count = numPeersToPing
      # Restart the countdown towards the next "ping everyone" round
      countdownToPingAll = max(0, randomToAllRatio - 1)

    # Execute keepalive pings
    let successfulPings = await parallelPings(node, peersToPing)

    if successfulPings != numPeersToPing:
      waku_node_errors.inc(
        amount = numPeersToPing - successfulPings, labelValues = ["keep_alive_failure"]
      )

    trace "Keepalive results",
      attemptedPings = numPeersToPing, successfulPings = successfulPings

    # Update failure tracking: only a round in which pings were attempted and
    # none succeeded counts as a failed iteration.
    if numPeersToPing > 0 and successfulPings == 0:
      consecutiveIterationFailures.inc()
      error "All pings failed", consecutiveFailures = consecutiveIterationFailures
    else:
      consecutiveIterationFailures = 0

    lastTimeExecuted = currentTime
|
||||
|
||||
# 2 minutes default - 20% of the default chronosstream timeout duration
|
||||
proc startKeepalive*(
    hm: NodeHealthMonitor,
    randomPeersKeepalive = 10.seconds,
    allPeersKeepalive = 2.minutes,
): Result[void, string] =
  ## Validates the keepalive intervals and launches `keepAliveLoop` for the
  ## node attached to `hm`, storing the loop future in `hm.keepAliveFut`.
  ##
  ## - `randomPeersKeepalive`: period between keepalive rounds.
  ## - `allPeersKeepalive`: period between rounds that ping every peer; must
  ##   not be shorter than `randomPeersKeepalive`.
  ##
  ## Returns `err` (and starts nothing) when an interval is zero, when
  ## `allPeersKeepalive < randomPeersKeepalive`, or when no node has been
  ## attached to the health monitor.

  # Validate input parameters
  if randomPeersKeepalive.isZero() or allPeersKeepalive.isZero():
    error "startKeepalive: allPeersKeepAlive and randomPeersKeepalive must be greater than 0",
      randomPeersKeepalive = $randomPeersKeepalive,
      allPeersKeepAlive = $allPeersKeepalive
    return err(
      "startKeepalive: allPeersKeepAlive and randomPeersKeepalive must be greater than 0"
    )

  if allPeersKeepalive < randomPeersKeepalive:
    error "startKeepalive: allPeersKeepAlive can't be less than randomPeersKeepalive",
      allPeersKeepAlive = $allPeersKeepalive,
      randomPeersKeepalive = $randomPeersKeepalive
    return
      err("startKeepalive: allPeersKeepAlive can't be less than randomPeersKeepalive")

  # Without a node the loop would dereference nil on its first iteration;
  # report it through the Result instead.
  if hm.node.isNil():
    error "startKeepalive: WakuNode is not set in the health monitor"
    return err("startKeepalive: WakuNode is not set in the health monitor")

  info "starting keepalive",
    randomPeersKeepalive = randomPeersKeepalive, allPeersKeepalive = allPeersKeepalive

  # Keep the future so stopHealthMonitor can cancel the loop later
  hm.keepAliveFut = hm.node.keepAliveLoop(randomPeersKeepalive, allPeersKeepalive)
  return ok()
|
||||
|
||||
proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} =
|
||||
var report: HealthReport
|
||||
report.nodeHealth = hm.nodeHealth
|
||||
@ -253,11 +401,15 @@ proc setNodeToHealthMonitor*(hm: NodeHealthMonitor, node: WakuNode) =
|
||||
proc setOverallHealth*(hm: NodeHealthMonitor, health: HealthStatus) =
  ## Overwrites the overall node health stored in the monitor with `health`.
  hm.nodeHealth = health
|
||||
|
||||
proc startHealthMonitor*(hm: NodeHealthMonitor) =
|
||||
proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] =
  ## Starts the online monitor and then the keepalive loop (default intervals).
  ## Returns an error describing the cause when keepalive cannot be started;
  ## the online monitor has already been started at that point.
  hm.onlineMonitor.startOnlineMonitor()

  let keepaliveRes = hm.startKeepalive()
  if keepaliveRes.isErr():
    return
      err("startHealthMonitor: failed starting keep alive: " & keepaliveRes.error)

  return ok()
|
||||
|
||||
proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
  ## Stops the online monitor and cancels the keepalive loop, waiting for the
  ## cancellation to complete.
  await hm.onlineMonitor.stopOnlineMonitor()

  # keepAliveFut is only assigned by a successful startKeepalive; it is still
  # nil when the monitor is stopped before (or without) keepalive running.
  if not hm.keepAliveFut.isNil():
    await hm.keepAliveFut.cancelAndWait()
|
||||
|
||||
proc new*(
|
||||
T: type NodeHealthMonitor,
|
||||
|
||||
@ -53,7 +53,7 @@ proc networkConnectivityLoop(self: OnlineMonitor): Future[void] {.async.} =
|
||||
## and triggers any change that depends on the network connectivity state
|
||||
while true:
|
||||
await self.updateOnlineState()
|
||||
await sleepAsync(15.seconds)
|
||||
await sleepAsync(5.seconds)
|
||||
|
||||
proc startOnlineMonitor*(self: OnlineMonitor) =
  ## Launches the network connectivity loop and keeps its future on the
  ## monitor in `networkConnLoopHandle`.
  let loopFut = self.networkConnectivityLoop()
  self.networkConnLoopHandle = loopFut
|
||||
|
||||
@ -501,6 +501,13 @@ proc connectedPeers*(
|
||||
|
||||
return (inPeers, outPeers)
|
||||
|
||||
proc disconnectAllPeers*(pm: PeerManager) {.async.} =
  ## Disconnects every currently connected peer, inbound and outbound.
  ## The disconnections are issued concurrently and the proc completes once
  ## all of them have finished.
  let (inbound, outbound) = pm.connectedPeers()

  let disconnects = (inbound & outbound).mapIt(pm.disconnectNode(it))
  await allFutures(disconnects)
|
||||
|
||||
proc getStreamByPeerIdAndProtocol*(
|
||||
pm: PeerManager, peerId: PeerId, protocol: string
|
||||
): Future[Result[Connection, string]] {.async.} =
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[hashes, options, sugar, tables, strutils, sequtils, os, net],
|
||||
std/[hashes, options, sugar, tables, strutils, sequtils, os, net, random],
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
@ -69,6 +69,10 @@ declarePublicGauge waku_px_peers,
|
||||
logScope:
|
||||
topics = "waku node"
|
||||
|
||||
# randomize initializes std/random's random number generator
|
||||
# if not called, the outcome of randomization procedures will be the same in every run
|
||||
randomize()
|
||||
|
||||
# TODO: Move to application instance (e.g., `WakuNode2`)
|
||||
# Git version in git describe format (defined compile time)
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
@ -1325,35 +1329,60 @@ proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =
|
||||
except LPError:
|
||||
error "failed to mount libp2pPing", error = getCurrentExceptionMsg()
|
||||
|
||||
# TODO: Move this logic to PeerManager
|
||||
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
|
||||
while true:
|
||||
await sleepAsync(keepalive)
|
||||
if not node.started:
|
||||
proc pingPeer(node: WakuNode, peerId: PeerId): Future[Result[void, string]] {.async.} =
  ## Pings one peer over the libp2p ping protocol.
  ##
  ## Returns `ok()` when the ping completes, or `err` with a description when
  ## the peer cannot be dialed or any catchable error is raised on the way.
  ## The ping stream is closed in every case once it has been opened.

  try:
    # Open a ping stream towards the peer
    let pingConn = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr:
      error "pingPeer: failed dialing peer", peerId = peerId
      return err("pingPeer failed dialing peer peerId: " & $peerId)

    defer:
      # Release the stream whatever the outcome; a failure to close is only
      # logged and never turns a finished ping into an error.
      try:
        await pingConn.close()
      except CatchableError as closeErr:
        debug "Error closing ping connection", peerId = peerId, error = closeErr.msg

    # Run the ping itself
    let rtt = await node.libp2pPing.ping(pingConn)

    trace "Ping successful", peerId = peerId, duration = rtt
    return ok()
  except CatchableError as pingErr:
    error "pingPeer: exception raised pinging peer",
      peerId = peerId, error = pingErr.msg
    return err("pingPeer: exception raised pinging peer: " & pingErr.msg)
|
||||
|
||||
proc selectRandomPeers*(peers: seq[PeerId], numRandomPeers: int): seq[PeerId] =
  ## Returns up to `numRandomPeers` peers picked at random from `peers`.
  ##
  ## The input is not modified. When `numRandomPeers` is greater than
  ## `len(peers)` all peers are returned (in random order); a zero or negative
  ## `numRandomPeers` yields an empty seq.
  var randomPeers = peers
  shuffle(randomPeers)

  # Clamp the count: a negative value would otherwise build an invalid slice
  # and raise a RangeDefect.
  let count = clamp(numRandomPeers, 0, len(randomPeers))
  randomPeers.setLen(count)
  return randomPeers
|
||||
|
||||
# Returns the number of successful pings performed
|
||||
proc parallelPings*(node: WakuNode, peerIds: seq[PeerId]): Future[int] {.async.} =
|
||||
if len(peerIds) == 0:
|
||||
return 0
|
||||
|
||||
var pingFuts: seq[Future[Result[void, string]]]
|
||||
|
||||
# Create ping futures for each peer
|
||||
for i, peerId in peerIds:
|
||||
let fut = pingPeer(node, peerId)
|
||||
pingFuts.add(fut)
|
||||
|
||||
# Wait for all pings to complete
|
||||
discard await allFutures(pingFuts).withTimeout(5.seconds)
|
||||
|
||||
var successCount = 0
|
||||
for fut in pingFuts:
|
||||
if not fut.completed() or fut.failed():
|
||||
continue
|
||||
|
||||
# Keep connected peers alive while running
|
||||
# Each node is responsible of keeping its outgoing connections alive
|
||||
trace "Running keepalive"
|
||||
let res = fut.read()
|
||||
if res.isOk():
|
||||
successCount.inc()
|
||||
|
||||
# First get a list of connected peer infos
|
||||
let outPeers = node.peerManager.connectedPeers()[1]
|
||||
|
||||
for peerId in outPeers:
|
||||
try:
|
||||
let conn = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr:
|
||||
warn "Failed dialing peer for keep alive", peerId = peerId
|
||||
continue
|
||||
let pingDelay = await node.libp2pPing.ping(conn)
|
||||
await conn.close()
|
||||
except CatchableError as exc:
|
||||
waku_node_errors.inc(labelValues = ["keep_alive_failure"])
|
||||
|
||||
# 2 minutes default - 20% of the default chronosstream timeout duration
|
||||
proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) =
|
||||
info "starting keepalive", keepalive = keepalive
|
||||
|
||||
asyncSpawn node.keepaliveLoop(keepalive)
|
||||
return successCount
|
||||
|
||||
proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
|
||||
info "mounting rendezvous discovery protocol"
|
||||
|
||||
@ -332,6 +332,13 @@ proc getPubSubPeersInMesh*(
|
||||
## Returns the list of PubSubPeers in a mesh defined by the passed pubsub topic.
|
||||
## The 'mesh' attribute is defined in the GossipSub ref object.
|
||||
|
||||
# If pubsubTopic is empty, we return all peers in mesh for any pubsub topic
|
||||
if pubsubTopic == "":
|
||||
var allPeers = initHashSet[PubSubPeer]()
|
||||
for topic, topicMesh in w.mesh.pairs:
|
||||
allPeers = allPeers.union(topicMesh)
|
||||
return ok(allPeers)
|
||||
|
||||
if not w.mesh.hasKey(pubsubTopic):
|
||||
debug "getPubSubPeersInMesh - there is no mesh peer for the given pubsub topic",
|
||||
pubsubTopic = pubsubTopic
|
||||
@ -348,7 +355,7 @@ proc getPubSubPeersInMesh*(
|
||||
return ok(peers)
|
||||
|
||||
proc getPeersInMesh*(
|
||||
w: WakuRelay, pubsubTopic: PubsubTopic
|
||||
w: WakuRelay, pubsubTopic: PubsubTopic = ""
|
||||
): Result[seq[PeerId], string] =
|
||||
## Returns the list of peerIds in a mesh defined by the passed pubsub topic.
|
||||
## The 'mesh' attribute is defined in the GossipSub ref object.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user