2021-12-08 14:21:30 +00:00
package node
import (
"context"
2024-07-11 12:02:52 -04:00
"errors"
"math/rand"
2023-01-26 16:08:27 -04:00
"sync"
2021-12-08 14:21:30 +00:00
"time"
2024-07-21 20:43:22 -04:00
"github.com/libp2p/go-libp2p/core/host"
2022-10-19 15:39:32 -04:00
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
2021-12-08 14:21:30 +00:00
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
2022-11-09 15:53:01 -04:00
"github.com/waku-org/go-waku/logging"
2024-09-25 17:15:20 +08:00
"github.com/waku-org/go-waku/waku/v2/utils"
2022-05-27 09:25:06 -04:00
"go.uber.org/zap"
2024-07-11 12:02:52 -04:00
"golang.org/x/exp/maps"
2021-12-08 14:21:30 +00:00
)
const (
	// maxAllowedPingFailures is the number of ping attempts made against a
	// single peer before that peer is treated as dead and disconnected.
	maxAllowedPingFailures = 2

	// sleepDetectionIntervalFactor is used to detect that the process was
	// suspended: if the time elapsed since the keep alive code last ran is
	// greater than sleepDetectionIntervalFactor * randomPeersPingDuration,
	// all peer connections are closed instead of being pinged.
	sleepDetectionIntervalFactor = 3

	// maxPeersToPingPerProtocol caps how many relay peers are selected on a
	// random-peers tick when topping up the mesh peers with random relay peers.
	maxPeersToPingPerProtocol = 10

	// maxAllowedSubsequentPingFailures is the number of consecutive keep alive
	// iterations in which every ping failed after which all peer connections
	// are closed.
	maxAllowedSubsequentPingFailures = 2
)
// disconnectAllPeers closes the connection to every peer the host's network
// currently reports. Failures to close an individual peer are logged at debug
// level and do not stop the remaining peers from being closed.
func disconnectAllPeers(host host.Host, logger *zap.Logger) {
	nw := host.Network()
	for _, peerID := range nw.Peers() {
		if err := nw.ClosePeer(peerID); err != nil {
			logger.Debug("closing conn to peer", zap.Error(err))
		}
	}
}
2021-12-08 14:21:30 +00:00
// startKeepAlive creates a go routine that periodically pings connected peers.
// This is necessary because TCP connections are automatically closed due to inactivity,
// and doing a ping will avoid this (with a small bandwidth cost)
2024-07-11 12:02:52 -04:00
func ( w * WakuNode ) startKeepAlive ( ctx context . Context , randomPeersPingDuration time . Duration , allPeersPingDuration time . Duration ) {
2024-09-25 17:15:20 +08:00
defer utils . LogOnPanic ( )
2023-01-08 14:33:30 -04:00
defer w . wg . Done ( )
2024-07-11 12:02:52 -04:00
if ! w . opts . enableRelay {
return
}
w . log . Info ( "setting up ping protocol" , zap . Duration ( "randomPeersPingDuration" , randomPeersPingDuration ) , zap . Duration ( "allPeersPingDuration" , allPeersPingDuration ) )
randomPeersTickerC := make ( <- chan time . Time )
if randomPeersPingDuration != 0 {
randomPeersTicker := time . NewTicker ( randomPeersPingDuration )
defer randomPeersTicker . Stop ( )
randomPeersTickerC = randomPeersTicker . C
}
allPeersTickerC := make ( <- chan time . Time )
2024-08-09 11:51:14 -04:00
if allPeersPingDuration != 0 {
allPeersTicker := time . NewTicker ( allPeersPingDuration )
2024-07-11 12:02:52 -04:00
defer allPeersTicker . Stop ( )
2024-09-04 19:14:17 +05:30
allPeersTickerC = allPeersTicker . C
2024-07-11 12:02:52 -04:00
}
2022-12-05 15:06:33 -04:00
2023-01-08 14:33:30 -04:00
lastTimeExecuted := w . timesource . Now ( )
2022-12-08 15:48:16 -04:00
2024-07-11 12:02:52 -04:00
sleepDetectionInterval := int64 ( randomPeersPingDuration ) * sleepDetectionIntervalFactor
2022-12-05 15:06:33 -04:00
2024-07-21 20:43:22 -04:00
var iterationFailure int
2023-01-08 14:33:30 -04:00
for {
2024-07-11 12:02:52 -04:00
peersToPing := [ ] peer . ID { }
2023-01-08 14:33:30 -04:00
select {
2024-07-11 12:02:52 -04:00
case <- allPeersTickerC :
2024-08-09 11:51:14 -04:00
if w . opts . enableRelay {
relayPeersSet := make ( map [ peer . ID ] struct { } )
for _ , t := range w . Relay ( ) . Topics ( ) {
for _ , p := range w . Relay ( ) . PubSub ( ) . ListPeers ( t ) {
relayPeersSet [ p ] = struct { } { }
}
2024-07-11 12:02:52 -04:00
}
2024-08-09 11:51:14 -04:00
peersToPing = append ( peersToPing , maps . Keys ( relayPeersSet ) ... )
2024-07-11 12:02:52 -04:00
}
case <- randomPeersTickerC :
2023-01-08 14:33:30 -04:00
difference := w . timesource . Now ( ) . UnixNano ( ) - lastTimeExecuted . UnixNano ( )
if difference > sleepDetectionInterval {
lastTimeExecuted = w . timesource . Now ( )
2024-07-11 12:02:52 -04:00
w . log . Warn ( "keep alive hasnt been executed recently. Killing all connections" )
2024-07-21 20:43:22 -04:00
disconnectAllPeers ( w . host , w . log )
continue
} else if iterationFailure >= maxAllowedSubsequentPingFailures {
iterationFailure = 0
w . log . Warn ( "Pinging random peers failed, node is likely disconnected. Killing all connections" )
disconnectAllPeers ( w . host , w . log )
2023-01-08 14:33:30 -04:00
continue
}
2022-12-05 15:06:33 -04:00
2024-08-09 11:51:14 -04:00
if w . opts . enableRelay {
// Priorize mesh peers
meshPeersSet := make ( map [ peer . ID ] struct { } )
2024-07-11 12:02:52 -04:00
for _ , t := range w . Relay ( ) . Topics ( ) {
2024-08-09 11:51:14 -04:00
for _ , p := range w . Relay ( ) . PubSub ( ) . MeshPeers ( t ) {
meshPeersSet [ p ] = struct { } { }
2024-07-11 12:02:52 -04:00
}
2021-12-08 14:21:30 +00:00
}
2024-08-09 11:51:14 -04:00
peersToPing = append ( peersToPing , maps . Keys ( meshPeersSet ) ... )
// Ping also some random relay peers
if maxPeersToPingPerProtocol - len ( peersToPing ) > 0 {
relayPeersSet := make ( map [ peer . ID ] struct { } )
for _ , t := range w . Relay ( ) . Topics ( ) {
for _ , p := range w . Relay ( ) . PubSub ( ) . ListPeers ( t ) {
if _ , ok := meshPeersSet [ p ] ; ! ok {
relayPeersSet [ p ] = struct { } { }
}
}
}
2024-07-11 12:02:52 -04:00
2024-08-09 11:51:14 -04:00
relayPeers := maps . Keys ( relayPeersSet )
rand . Shuffle ( len ( relayPeers ) , func ( i , j int ) { relayPeers [ i ] , relayPeers [ j ] = relayPeers [ j ] , relayPeers [ i ] } )
2024-07-11 12:02:52 -04:00
2024-08-09 11:51:14 -04:00
peerLen := maxPeersToPingPerProtocol - len ( peersToPing )
if peerLen > len ( relayPeers ) {
peerLen = len ( relayPeers )
}
peersToPing = append ( peersToPing , relayPeers [ 0 : peerLen ] ... )
2024-07-11 12:02:52 -04:00
}
2021-12-08 14:21:30 +00:00
}
2023-01-08 14:33:30 -04:00
2024-08-09 11:51:14 -04:00
if w . opts . enableFilterLightNode {
// We also ping all filter nodes
filterPeersSet := make ( map [ peer . ID ] struct { } )
for _ , s := range w . FilterLightnode ( ) . Subscriptions ( ) {
filterPeersSet [ s . PeerID ] = struct { } { }
}
peersToPing = append ( peersToPing , maps . Keys ( filterPeersSet ) ... )
}
2023-01-08 14:33:30 -04:00
case <- ctx . Done ( ) :
w . log . Info ( "stopping ping protocol" )
return
2021-12-08 14:21:30 +00:00
}
2024-07-11 12:02:52 -04:00
pingWg := sync . WaitGroup { }
pingWg . Add ( len ( peersToPing ) )
2024-07-21 20:43:22 -04:00
pingResultChan := make ( chan bool , len ( peersToPing ) )
2024-07-11 12:02:52 -04:00
for _ , p := range peersToPing {
2024-07-21 20:43:22 -04:00
go w . pingPeer ( ctx , & pingWg , p , pingResultChan )
2024-07-11 12:02:52 -04:00
}
pingWg . Wait ( )
2024-07-21 20:43:22 -04:00
close ( pingResultChan )
failureCounter := 0
for couldPing := range pingResultChan {
if ! couldPing {
failureCounter ++
}
}
if len ( peersToPing ) > 0 && failureCounter == len ( peersToPing ) {
iterationFailure ++
} else {
iterationFailure = 0
}
2024-07-11 12:02:52 -04:00
lastTimeExecuted = w . timesource . Now ( )
2023-01-08 14:33:30 -04:00
}
2021-12-08 14:21:30 +00:00
}
2024-07-21 20:43:22 -04:00
func ( w * WakuNode ) pingPeer ( ctx context . Context , wg * sync . WaitGroup , peerID peer . ID , resultChan chan bool ) {
2024-09-25 17:15:20 +08:00
defer utils . LogOnPanic ( )
2023-01-26 16:08:27 -04:00
defer wg . Done ( )
2021-12-08 14:21:30 +00:00
2024-07-11 12:02:52 -04:00
logger := w . log . With ( logging . HostID ( "peer" , peerID ) )
for i := 0 ; i < maxAllowedPingFailures ; i ++ {
if w . host . Network ( ) . Connectedness ( peerID ) != network . Connected {
// Peer is no longer connected. No need to ping
2024-07-21 20:43:22 -04:00
resultChan <- false
2024-07-11 12:02:52 -04:00
return
}
logger . Debug ( "pinging" )
if w . tryPing ( ctx , peerID , logger ) {
2024-07-21 20:43:22 -04:00
resultChan <- true
2024-07-11 12:02:52 -04:00
return
}
}
if w . host . Network ( ) . Connectedness ( peerID ) != network . Connected {
2024-07-21 20:43:22 -04:00
resultChan <- false
2024-07-11 12:02:52 -04:00
return
}
logger . Info ( "disconnecting dead peer" )
if err := w . host . Network ( ) . ClosePeer ( peerID ) ; err != nil {
logger . Debug ( "closing conn to peer" , zap . Error ( err ) )
}
2024-07-21 20:43:22 -04:00
resultChan <- false
2024-07-11 12:02:52 -04:00
}
func ( w * WakuNode ) tryPing ( ctx context . Context , peerID peer . ID , logger * zap . Logger ) bool {
2023-01-06 18:37:57 -04:00
ctx , cancel := context . WithTimeout ( ctx , 7 * time . Second )
2021-12-08 14:21:30 +00:00
defer cancel ( )
2023-08-08 10:44:38 -04:00
pr := ping . Ping ( ctx , w . host , peerID )
2021-12-08 14:21:30 +00:00
select {
case res := <- pr :
if res . Error != nil {
2022-05-27 09:25:06 -04:00
logger . Debug ( "could not ping" , zap . Error ( res . Error ) )
2024-07-11 12:02:52 -04:00
return false
2021-12-08 14:21:30 +00:00
}
case <- ctx . Done ( ) :
2024-07-11 12:02:52 -04:00
if ! errors . Is ( ctx . Err ( ) , context . Canceled ) {
logger . Debug ( "could not ping (context)" , zap . Error ( ctx . Err ( ) ) )
2021-12-08 14:21:30 +00:00
}
2024-07-11 12:02:52 -04:00
return false
2021-12-08 14:21:30 +00:00
}
2024-07-11 12:02:52 -04:00
return true
2021-12-08 14:21:30 +00:00
}