package peermanager

import (
	"context"
	"errors"
	"sync"
	"time"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/libp2p/go-libp2p/core/protocol"
	ma "github.com/multiformats/go-multiaddr"
	"go.uber.org/zap"

	"github.com/waku-org/go-waku/logging"
	"github.com/waku-org/go-waku/waku/v2/discv5"
	wps "github.com/waku-org/go-waku/waku/v2/peerstore"
	waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
	wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
	"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
	"github.com/waku-org/go-waku/waku/v2/service"
	"github.com/waku-org/go-waku/waku/v2/utils"
)

// TopicHealth represents the health of a pubsub topic, based on the number of
// healthy mesh peers available for it.
type TopicHealth int

const (
	UnHealthy           TopicHealth = iota
	MinimallyHealthy    TopicHealth = 1
	SufficientlyHealthy TopicHealth = 2
)

func (t TopicHealth) String() string {
	switch t {
	case UnHealthy:
		return "UnHealthy"
	case MinimallyHealthy:
		return "MinimallyHealthy"
	case SufficientlyHealthy:
		return "SufficientlyHealthy"
	default:
		return ""
	}
}

// TopicHealthStatus is emitted on TopicHealthNotifCh whenever a topic's health changes.
type TopicHealthStatus struct {
	Topic  string
	Health TopicHealth
}

// NodeTopicDetails stores pubSubTopic related data, such as the topic handle and health, for the node.
type NodeTopicDetails struct {
	topic        *pubsub.Topic
	healthStatus TopicHealth
}

// WakuProtoInfo holds protocol-specific info.
// To be used at a later stage to set various config, such as criteria for peer management, specific to each Waku protocol.
// This should make the peer manager agnostic to the protocol.
type WakuProtoInfo struct {
	waku2ENRBitField uint8
}

// PeerManager applies various controls and manages connections towards peers.
type PeerManager struct {
	peerConnector          *PeerConnectionStrategy
	metadata               *metadata.WakuMetadata
	relay                  *relay.WakuRelay
	maxPeers               int
	maxRelayPeers          int
	logger                 *zap.Logger
	InPeersTarget          int
	OutPeersTarget         int
	host                   host.Host
	serviceSlots           *ServiceSlots
	ctx                    context.Context
	sub                    event.Subscription
	topicMutex             sync.RWMutex
	subRelayTopics         map[string]*NodeTopicDetails
	discoveryService       *discv5.DiscoveryV5
	wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo
	TopicHealthNotifCh     chan<- TopicHealthStatus
	rttCache               *FastestPeerSelector
	RelayEnabled           bool
	evtDialError           event.Emitter
}

// PeerSelection provides various options based on which a peer is selected from a list of peers.
type PeerSelection int

const (
	Automatic PeerSelection = iota
	LowestRTT
)

const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 3
const maxDialFailures = 2

// relayAndServicePeers splits the connection budget as 80% relay peers and 20% service peers.
func relayAndServicePeers(maxConnections int) (int, int) {
	return maxConnections - maxConnections/5, maxConnections / 5
}

// inAndOutRelayPeers splits relay peers as roughly 66% inbound and 33% outbound,
// while guaranteeing a minimum number of outbound connections.
func inAndOutRelayPeers(relayPeers int) (int, int) {
	outRelayPeers := relayPeers / 3
	const minOutRelayConns = 10
	if outRelayPeers < minOutRelayConns {
		outRelayPeers = minOutRelayConns
	}
	return relayPeers - outRelayPeers, outRelayPeers
}

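// A worked example (illustrative only): with maxConnections = 50,
//
//	relayPeers, servicePeers := relayAndServicePeers(50) // 40, 10
//	inRelay, outRelay := inAndOutRelayPeers(relayPeers)  // 27, 13 (40/3 = 13, already >= minOutRelayConns)
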
// checkAndUpdateTopicHealth computes the health of the specified topic, updates it and notifies of any change.
// It also returns the healthy peer count.
func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
	if topic == nil {
		return 0
	}

	healthyPeerCount := 0

	for _, p := range pm.relay.PubSub().MeshPeers(topic.topic.String()) {
		if pm.host.Network().Connectedness(p) == network.Connected {
			score, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p)
			if err == nil {
				if score < relay.PeerPublishThreshold {
					pm.logger.Debug("peer score below publish threshold", zap.Stringer("peer", p), zap.Float64("score", score))
				} else {
					healthyPeerCount++
				}
			} else {
				if errors.Is(err, peerstore.ErrNotFound) {
					// For now considering the peer as healthy if we can't fetch its score.
					healthyPeerCount++
					pm.logger.Debug("peer score is not available yet", zap.Stringer("peer", p))
				} else {
					pm.logger.Warn("failed to fetch peer score", zap.Error(err), zap.Stringer("peer", p))
				}
			}
		}
	}

	// Update the topic's health and, if it changed, notify of the same.
	oldHealth := topic.healthStatus
	if healthyPeerCount < 1 { // Ideally this check should be done with minPeersForRelay, but leaving it as is for now.
		topic.healthStatus = UnHealthy
	} else if healthyPeerCount < waku_proto.GossipSubDMin {
		topic.healthStatus = MinimallyHealthy
	} else {
		topic.healthStatus = SufficientlyHealthy
	}
	if oldHealth != topic.healthStatus {
		pm.logger.Debug("topic health has changed", zap.String("pubsubtopic", topic.topic.String()), zap.Stringer("health", topic.healthStatus))
		pm.TopicHealthNotifCh <- TopicHealthStatus{topic.topic.String(), topic.healthStatus}
	}
	return healthyPeerCount
}

// TopicHealth can be used to fetch the health of a specific pubsubTopic.
// Returns an error if the topic is not found.
func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) {
	pm.topicMutex.RLock()
	defer pm.topicMutex.RUnlock()

	topicDetails, ok := pm.subRelayTopics[pubsubTopic]
	if !ok {
		return UnHealthy, errors.New("topic not found")
	}
	return topicDetails.healthStatus, nil
}

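// A minimal sketch of consuming health updates (illustrative; the channel wiring below is
// an assumption about the embedding node, not code from this package):
//
//	healthCh := make(chan TopicHealthStatus, 100)
//	pm.TopicHealthNotifCh = healthCh
//	go func() {
//		for st := range healthCh {
//			logger.Info("topic health changed", zap.String("topic", st.Topic), zap.Stringer("health", st.Health))
//		}
//	}()
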
2023-08-03 21:51:15 +05:30
// NewPeerManager creates a new peerManager instance.
2024-07-03 16:35:39 -04:00
func NewPeerManager ( maxConnections int , maxPeers int , metadata * metadata . WakuMetadata , relay * relay . WakuRelay , relayEnabled bool , logger * zap . Logger ) * PeerManager {
2024-06-26 06:18:44 +05:30
var inPeersTarget , outPeersTarget , maxRelayPeers int
if relayEnabled {
maxRelayPeers , _ := relayAndServicePeers ( maxConnections )
inPeersTarget , outPeersTarget = inAndOutRelayPeers ( maxRelayPeers )
if maxPeers == 0 || maxConnections > maxPeers {
maxPeers = maxConnsToPeerRatio * maxConnections
}
} else {
maxRelayPeers = 0
inPeersTarget = 0
//TODO: ideally this should be 2 filter peers per topic, 2 lightpush peers per topic and 2-4 store nodes per topic
outPeersTarget = 10
2023-09-27 12:16:37 +05:30
}
2023-08-03 21:51:15 +05:30
pm := & PeerManager {
2023-11-07 22:43:19 +05:30
logger : logger . Named ( "peer-manager" ) ,
2024-03-14 10:21:47 -04:00
metadata : metadata ,
2024-07-03 16:35:39 -04:00
relay : relay ,
2023-11-07 22:43:19 +05:30
maxRelayPeers : maxRelayPeers ,
2024-06-26 06:18:44 +05:30
InPeersTarget : inPeersTarget ,
OutPeersTarget : outPeersTarget ,
2023-11-07 22:43:19 +05:30
serviceSlots : NewServiceSlot ( ) ,
subRelayTopics : make ( map [ string ] * NodeTopicDetails ) ,
maxPeers : maxPeers ,
wakuprotoToENRFieldMap : map [ protocol . ID ] WakuProtoInfo { } ,
2024-05-13 14:56:34 -04:00
rttCache : NewFastestPeerSelector ( logger ) ,
2024-06-26 06:18:44 +05:30
RelayEnabled : relayEnabled ,
2023-08-03 21:51:15 +05:30
}
2023-08-15 06:57:51 +05:30
logger . Info ( "PeerManager init values" , zap . Int ( "maxConnections" , maxConnections ) ,
2023-08-30 18:57:22 +07:00
zap . Int ( "maxRelayPeers" , maxRelayPeers ) ,
2024-06-26 06:18:44 +05:30
zap . Int ( "outPeersTarget" , outPeersTarget ) ,
zap . Int ( "inPeersTarget" , pm . InPeersTarget ) ,
2023-09-27 12:16:37 +05:30
zap . Int ( "maxPeers" , maxPeers ) )
2023-08-03 21:51:15 +05:30
return pm
}
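// Typical wiring, as a hedged sketch (the host, relay and connection-strategy setup are
// assumed to exist elsewhere in the node; the values are examples):
//
//	pm := NewPeerManager(50, 150, wakuMetadata, wakuRelay, true, logger)
//	pm.SetHost(host)
//	pm.SetPeerConnector(connStrategy)
//	pm.Start(ctx)
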
// SetDiscv5 sets the discoveryv5 service to be used for peer discovery.
func (pm *PeerManager) SetDiscv5(discv5 *discv5.DiscoveryV5) {
	pm.discoveryService = discv5
}

// SetHost sets the host to be used in order to access the peerStore.
func (pm *PeerManager) SetHost(host host.Host) {
	pm.host = host
	pm.rttCache.SetHost(host)
}

2023-08-15 06:57:51 +05:30
// SetPeerConnector sets the peer connector to be used for establishing relay connections.
func ( pm * PeerManager ) SetPeerConnector ( pc * PeerConnectionStrategy ) {
pm . peerConnector = pc
}
2023-08-03 21:51:15 +05:30
// Start starts the processing to be done by peer manager.
func ( pm * PeerManager ) Start ( ctx context . Context ) {
2023-08-15 06:57:51 +05:30
pm . ctx = ctx
2024-08-21 18:08:11 +05:30
if pm . RelayEnabled {
pm . RegisterWakuProtocol ( relay . WakuRelayID_v200 , relay . WakuRelayENRField )
if pm . sub != nil {
go pm . peerEventLoop ( ctx )
}
go pm . connectivityLoop ( ctx )
}
go pm . peerStoreLoop ( ctx )
2024-09-23 14:41:07 -07:00
if pm . host != nil {
var err error
pm . evtDialError , err = pm . host . EventBus ( ) . Emitter ( new ( utils . DialError ) )
if err != nil {
pm . logger . Error ( "failed to create dial error emitter" , zap . Error ( err ) )
}
}
2024-08-21 18:08:11 +05:30
}
// CheckAndRemoveBadPeer removes a peer from the peerstore if it has accumulated too many
// dial failures while the node itself is online, unless the peer was configured statically.
func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) {
	if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures &&
		pm.peerConnector.onlineChecker.IsOnline() {
		if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static {
			// Delete the peer from the peerstore only if it was discovered and not configured statically.
			pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
			pm.RemovePeer(peerID)
		}
	}
}

func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
	defer utils.LogOnPanic()
	t := time.NewTicker(prunePeerStoreInterval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			pm.prunePeerStore()
		}
	}
}

// prunePeerStore brings the peerstore back under maxPeers capacity. It removes, in order:
// peers with too many connection failures, then not-connected peers without pubsub topics,
// and finally peers from the most crowded topics.
func (pm *PeerManager) prunePeerStore() {
	peers := pm.host.Peerstore().Peers()
	numPeers := len(peers)
	if numPeers < pm.maxPeers {
		pm.logger.Debug("peerstore size within capacity, not pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", numPeers))
		return
	}
	peerCntBeforePruning := numPeers
	pm.logger.Debug("peerstore capacity exceeded, hence pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", peerCntBeforePruning))

	for _, peerID := range peers {
		connFailures := pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID)
		if connFailures > maxFailedAttempts {
			// Safety check so that we don't end up disconnecting connected peers.
			if pm.host.Network().Connectedness(peerID) == network.Connected {
				pm.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(peerID)
				continue
			}
			pm.host.Peerstore().RemovePeer(peerID)
			numPeers--
		}
		if numPeers < pm.maxPeers {
			pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
			return
		}
	}

	notConnectedPeers := pm.getPeersBasedOnconnectionStatus("", network.NotConnected)
	peersByTopic := make(map[string]peer.IDSlice)
	var prunedPeers peer.IDSlice

	// Prune not-connected peers without pubsub topics; group the remaining ones by topic.
	for _, peerID := range notConnectedPeers {
		topics, err := pm.host.Peerstore().(wps.WakuPeerstore).PubSubTopics(peerID)
		if err != nil || len(topics) == 0 {
			if err != nil {
				pm.logger.Error("pruning: failed to fetch pubsub topics", zap.Error(err), zap.Stringer("peer", peerID))
			}
			prunedPeers = append(prunedPeers, peerID)
			pm.host.Peerstore().RemovePeer(peerID)
			numPeers--
		} else {
			for topic := range topics {
				peersByTopic[topic] = append(peersByTopic[topic], peerID)
			}
		}
		if numPeers < pm.maxPeers {
			pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers))
			return
		}
	}
	pm.logger.Debug("pruned not-connected peers", zap.Stringers("prunedPeers", prunedPeers))
	// Calculate the average number of peers per topic.
	total, maxPeerCnt := 0, 0
	for _, peersInTopic := range peersByTopic {
		peerLen := len(peersInTopic)
		total += peerLen
		if peerLen > maxPeerCnt {
			maxPeerCnt = peerLen
		}
	}
	if maxPeerCnt == 0 {
		// No topic-tagged candidates left to prune.
		pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
		return
	}
	// Average peers per topic, floored at 1.
	avgPerTopic := max(1, total/maxPeerCnt)
	// Prune peers from topics with a higher-than-average count.
	for topic, peers := range peersByTopic {
		count := max(len(peers)-avgPerTopic, 0)
		var prunedPeers peer.IDSlice
		for i, pID := range peers {
			if i >= count {
				break
			}
			prunedPeers = append(prunedPeers, pID)
			pm.host.Peerstore().RemovePeer(pID)
			numPeers--
			if numPeers < pm.maxPeers {
				pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers))
				return
			}
		}
		pm.logger.Debug("pruned peers higher than average", zap.Stringers("prunedPeers", prunedPeers), zap.String("topic", topic))
	}
	pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers))
}

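// A worked example of the per-topic pruning above (illustrative only): with
// peersByTopic sizes {topicA: 6, topicB: 2}, total = 8 and maxPeerCnt = 6, so
// avgPerTopic = max(1, 8/6) = 1; topicA then yields 5 prune candidates and topicB
// yields 1, leaving roughly avgPerTopic peers per topic unless capacity is reached first.
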
// connectivityLoop periodically ensures sufficient outbound connections per topic
// and prunes excess inbound relay connections.
func (pm *PeerManager) connectivityLoop(ctx context.Context) {
	defer utils.LogOnPanic()
	pm.connectToPeers()
	t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			pm.connectToPeers()
		}
	}
}

// GroupPeersByDirection returns all the connected peers in the peerstore grouped by inbound or outbound direction.
func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
	if len(specificPeers) == 0 {
		specificPeers = pm.host.Network().Peers()
	}

	for _, p := range specificPeers {
		direction, err := pm.host.Peerstore().(wps.WakuPeerstore).Direction(p)
		if err == nil {
			if direction == network.DirInbound {
				inPeers = append(inPeers, p)
			} else if direction == network.DirOutbound {
				outPeers = append(outPeers, p)
			}
		} else {
			pm.logger.Error("failed to retrieve peer direction",
				zap.Stringer("peerID", p), zap.Error(err))
		}
	}
	return inPeers, outPeers, nil
}

// getRelayPeers returns the lists of inbound and outbound peers supporting the WakuRelay protocol within specificPeers.
// If specificPeers is empty, it checks within all peers in the peerstore.
func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
	// Group peers by their connection direction, inbound or outbound.
	inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers...)
	if err != nil {
		return
	}
	pm.logger.Debug("number of peers connected", zap.Int("inPeers", inPeers.Len()),
		zap.Int("outPeers", outPeers.Len()))

	// Filter peers to check if they support relay.
	if inPeers.Len() != 0 {
		inRelayPeers, _ = pm.FilterPeersByProto(inPeers, nil, relay.WakuRelayID_v200)
	}
	if outPeers.Len() != 0 {
		outRelayPeers, _ = pm.FilterPeersByProto(outPeers, nil, relay.WakuRelayID_v200)
	}
	return
}

// ensureMinRelayConnsPerTopic makes sure there is a minimum of D connections per pubsubTopic.
// If not, it will look into the peerstore to initiate more connections.
// If the peerstore doesn't have enough peers, it will wait for discv5 to find more and try in the next cycle.
func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
	pm.topicMutex.RLock()
	defer pm.topicMutex.RUnlock()
	for topicStr, topicInst := range pm.subRelayTopics {
		meshPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
		curConnectedPeerLen := pm.getPeersBasedOnconnectionStatus(topicStr, network.Connected).Len()
		if meshPeerLen < waku_proto.GossipSubDMin || curConnectedPeerLen < pm.OutPeersTarget {
			pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh",
				zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curConnectedPeerLen),
				zap.Int("targetPeers", pm.OutPeersTarget))
			// Find not-connected peers.
			notConnectedPeers := pm.getPeersBasedOnconnectionStatus(topicStr, network.NotConnected)
			if notConnectedPeers.Len() == 0 {
				pm.logger.Debug("could not find any peers in peerstore to connect to, discovering more", zap.String("pubSubTopic", topicStr))
				go pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2)
				continue
			}
			pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
			// Connect to eligible peers.
			numPeersToConnect := pm.OutPeersTarget - curConnectedPeerLen
			if numPeersToConnect > 0 {
				if numPeersToConnect > notConnectedPeers.Len() {
					numPeersToConnect = notConnectedPeers.Len()
				}
				pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
			}
		}
	}
}

// connectToPeers ensures a minimum of D connections exist for each pubSubTopic.
// If not, it initiates connections to additional peers.
// It also checks for incoming relay connections and prunes them once they cross InPeersTarget.
func (pm *PeerManager) connectToPeers() {
	if pm.RelayEnabled {
		// Check for out-peer connections and connect to more peers.
		pm.ensureMinRelayConnsPerTopic()

		inRelayPeers, outRelayPeers := pm.getRelayPeers()
		pm.logger.Debug("number of relay peers connected",
			zap.Int("in", inRelayPeers.Len()),
			zap.Int("out", outRelayPeers.Len()))
		if inRelayPeers.Len() > 0 &&
			inRelayPeers.Len() > pm.InPeersTarget {
			pm.pruneInRelayConns(inRelayPeers)
		}
	} else {
		// TODO: Connect to filter peers per topic as of now.
		// Fetch filter peers from peerStore. TODO: topics for lightNode not available here?
		// Filter subscribe to notify peerManager whenever a new topic/shard is subscribed to.
		pm.logger.Debug("light mode, not doing anything")
	}
}

// connectToSpecifiedPeers connects to the peers provided in the list if their addresses have not expired.
func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) {
	for _, peerID := range peers {
		peerData := AddrInfoToPeerData(wps.PeerManager, peerID, pm.host)
		if peerData == nil {
			continue
		}
		pm.peerConnector.PushToChan(*peerData)
	}
}

// getPeersBasedOnconnectionStatus returns peers for a pubSubTopic that are either connected or not-connected, based on the status passed.
func (pm *PeerManager) getPeersBasedOnconnectionStatus(pubsubTopic string, connected network.Connectedness) (filteredPeers peer.IDSlice) {
	var peerList peer.IDSlice
	if pubsubTopic == "" {
		peerList = pm.host.Peerstore().Peers()
	} else {
		peerList = pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic)
	}
	for _, peerID := range peerList {
		if pm.host.Network().Connectedness(peerID) == connected {
			filteredPeers = append(filteredPeers, peerID)
		}
	}
	return
}

// pruneInRelayConns prunes incoming relay connections that exceed the derived InPeersTarget.
func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
	// Start disconnecting peers, based on what?
	// For now no preference is used.
	// TODO: Need a more intelligent way of doing this, maybe peer scores.
	// TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers.
	pm.logger.Info("peer connections exceed target relay peers, hence pruning",
		zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InPeersTarget))
	for pruningStartIndex := pm.InPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ {
		p := inRelayPeers[pruningStartIndex]
		err := pm.host.Network().ClosePeer(p)
		if err != nil {
			pm.logger.Warn("failed to disconnect connection towards peer",
				zap.Stringer("peerID", p), zap.Error(err))
			continue
		}
		pm.logger.Debug("successfully disconnected connection towards peer",
			zap.Stringer("peerID", p))
	}
}

// processPeerENR derives the peer's pubsub topics and supported protocols from its ENR record.
// Matched service protocols are also registered in serviceSlots.
func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
	shards, err := wenr.RelaySharding(p.ENR.Record())
	if err != nil {
		pm.logger.Error("could not derive relayShards from ENR", zap.Error(err),
			zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
	} else {
		if shards != nil {
			p.PubsubTopics = make([]string, 0)
			topics := shards.Topics()
			for _, topic := range topics {
				topicStr := topic.String()
				p.PubsubTopics = append(p.PubsubTopics, topicStr)
			}
		} else {
			pm.logger.Debug("ENR doesn't have relay shards", zap.Stringer("peer", p.AddrInfo.ID))
		}
	}
	supportedProtos := []protocol.ID{}
	// Identify and specify protocols supported by the peer based on the discovered peer's ENR.
	enrField, err := wenr.GetWakuEnrBitField(p.ENR)
	if err == nil {
		for proto, protoENR := range pm.wakuprotoToENRFieldMap {
			protoENRField := protoENR.waku2ENRBitField
			if protoENRField&enrField != 0 {
				supportedProtos = append(supportedProtos, proto)
				// Add service peers to serviceSlots.
				pm.addPeerToServiceSlot(proto, p.AddrInfo.ID)
			}
		}
	}
	return supportedProtos
}

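// Illustrative example of the bitfield matching above (the bit values are assumptions,
// not the normative Waku ENR layout): if a peer's waku2 ENR bitfield is 0b00000011 and a
// protocol was registered via RegisterWakuProtocol with bit 0b00000010, then
// protoENRField&enrField != 0 and that protocol is reported as supported.
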
// AddDiscoveredPeer adds dynamically discovered peers.
// Note that these peers will not be set in service-slots.
func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
	// Check if the peer is already present; if so, skip adding it.
	_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
	if err == nil {
		// Add addresses if the existing addresses have expired.
		existingAddrs := pm.host.Peerstore().Addrs(p.AddrInfo.ID)
		if len(existingAddrs) == 0 {
			pm.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL)
		}
		enr, err := pm.host.Peerstore().(wps.WakuPeerstore).ENR(p.AddrInfo.ID)
		// Verify whether the ENR record is more recent (discv5 and peer exchange can return peers already seen).
		if err == nil {
			if p.ENR != nil {
				if enr.Record().Seq() >= p.ENR.Seq() {
					return
				}
				// Peer is already in the peerstore, but the stored ENR is older than the discovered one.
				pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored",
					zap.Stringer("peer", p.AddrInfo.ID), logging.Uint64("newENRSeq", p.ENR.Record().Seq()), logging.Uint64("storedENRSeq", enr.Record().Seq()))
			} else {
				pm.logger.Info("peer already found in peerstore, but no new ENR", zap.Stringer("peer", p.AddrInfo.ID))
			}
		} else {
			// Peer is in the peerstore, but it doesn't have an ENR.
			pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding",
				zap.Stringer("peer", p.AddrInfo.ID))
		}
	}
	pm.logger.Debug("adding discovered peer", zap.Stringer("peerID", p.AddrInfo.ID))

	supportedProtos := []protocol.ID{}
	if len(p.PubsubTopics) == 0 && p.ENR != nil {
		// Try to fetch shard info and supported protocols from the ENR to arrive at pubsub topics.
		supportedProtos = pm.processPeerENR(&p)
	}

	_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...)

	if p.ENR != nil {
		pm.logger.Debug("setting ENR for peer", zap.Stringer("peerID", p.AddrInfo.ID), zap.Stringer("enr", p.ENR))
		err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
		if err != nil {
			pm.logger.Error("could not store enr", zap.Error(err),
				zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
		}
	}
	if connectNow {
		pm.logger.Debug("connecting now to discovered peer", zap.Stringer("peer", p.AddrInfo.ID))
		go pm.peerConnector.PushToChan(p)
	}
}

// addPeer adds a peer to the peerstore.
// It also sets additional metadata such as origin, supported protocols and pubsub topics.
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
	pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID))
	if origin == wps.Static {
		pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)
	} else {
		// Need to re-evaluate the address expiry.
		// For now expiring them with the default AddressTTL, which is an hour.
		pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
	}
	err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin)
	if err != nil {
		pm.logger.Error("could not set origin", zap.Error(err), zap.Stringer("peer", ID))
		return err
	}

	if len(protocols) > 0 {
		err = pm.host.Peerstore().AddProtocols(ID, protocols...)
		if err != nil {
			pm.logger.Error("could not set protocols", zap.Error(err), zap.Stringer("peer", ID))
			return err
		}
	}
	if len(pubSubTopics) == 0 {
		// Probably the peer was discovered via DNS discovery (for which we don't have pubSubTopic info).
		// If pubSubTopic and ENR are empty, or there is no shard info in the ENR, then set to the default pubsub topic.
		pubSubTopics = []string{relay.DefaultWakuTopic}
	}
	err = pm.host.Peerstore().(wps.WakuPeerstore).SetPubSubTopics(ID, pubSubTopics)
	if err != nil {
		pm.logger.Error("could not store pubSubTopic", zap.Error(err),
			zap.Stringer("peer", ID), zap.Strings("topics", pubSubTopics))
	}
	return nil
}

// AddrInfoToPeerData returns the PeerData for a peer whose addresses are known, or nil
// (after removing the peer from the peerstore) if its addresses have expired.
func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsubTopics ...string) *service.PeerData {
	addrs := host.Peerstore().Addrs(peerID)
	if len(addrs) == 0 {
		// Addresses expired, remove the peer from the peerstore.
		host.Peerstore().RemovePeer(peerID)
		return nil
	}
	return &service.PeerData{
		Origin: origin,
		AddrInfo: peer.AddrInfo{
			ID:    peerID,
			Addrs: addrs,
		},
		PubsubTopics: pubsubTopics,
	}
}

// AddPeer adds a peer to the peerstore and also to service slots.
func (pm *PeerManager) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
	// Assuming all addresses have the same peerID.
	infoArr, err := peer.AddrInfosFromP2pAddrs(addresses...)
	if err != nil {
		return nil, err
	}
	if len(infoArr) != 1 {
		return nil, errors.New("only a single peerID is expected in AddPeer")
	}
	info := infoArr[0]

	// Add service peers to serviceSlots.
	for _, proto := range protocols {
		pm.addPeerToServiceSlot(proto, info.ID)
	}

	// Add to the peerstore.
	err = pm.addPeer(info.ID, info.Addrs, origin, pubsubTopics, protocols...)
	if err != nil {
		return nil, err
	}

	pData := &service.PeerData{
		Origin:       origin,
		AddrInfo:     info,
		PubsubTopics: pubsubTopics,
	}
	return pData, nil
}

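// Example usage, as an illustrative sketch (the multiaddr is a placeholder; a real
// address would end in a valid /p2p/<peer-id> component):
//
//	addr, _ := ma.NewMultiaddr("/ip4/192.0.2.1/tcp/60000/p2p/<peer-id>")
//	pData, err := pm.AddPeer([]ma.Multiaddr{addr}, wps.Static, []string{relay.DefaultWakuTopic})
//	if err == nil {
//		pm.Connect(pData)
//	}
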
// Connect establishes a connection to the peer by pushing it to the peer connector.
func (pm *PeerManager) Connect(pData *service.PeerData) {
	go pm.peerConnector.PushToChan(*pData)
}

// RemovePeer deletes a peer from the peerstore after disconnecting it.
// It also removes the peer from the serviceSlot.
func (pm *PeerManager) RemovePeer(peerID peer.ID) {
	pm.host.Peerstore().RemovePeer(peerID)
	// Search if this peer is in a serviceSlot and, if so, remove it from there.
	// TODO: Add another peer which is statically configured to the serviceSlot.
	pm.serviceSlots.removePeer(peerID)
}

// addPeerToServiceSlot adds a peerID to a serviceSlot.
// Adding to the peerstore is expected to have already been done by the caller.
// If the relay protocol is passed, the peer is not added to a serviceSlot.
func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
	if proto == relay.WakuRelayID_v200 {
		pm.logger.Debug("cannot add Relay peer to service peer slots")
		return
	}
	// For now, adding the peer to the serviceSlot means the latest added peer is given priority.
	// TODO: Ideally we should sort the peers per service and return the best peer based on peer score, RTT, etc.
	pm.logger.Info("adding peer to service slots", zap.Stringer("peer", peerID),
		zap.String("service", string(proto)))
	// getPeers returns nil for the WakuRelayID_v200 protocol, but we don't run this ServiceSlot code for WakuRelayID_v200.
	pm.serviceSlots.getPeers(proto).add(peerID)
}

// HandleDialError records a dial failure: it backs off further connection attempts,
// increments the peer's connection-failure count and emits a DialError event.
func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
	if err == nil || errors.Is(err, context.Canceled) {
		return
	}

	if pm.peerConnector != nil {
		pm.peerConnector.addConnectionBackoff(peerID)
	}
	if pm.host != nil {
		pm.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(peerID)
	}
	pm.logger.Warn("failed to connect to peer", logging.HostID("peerID", peerID), zap.Error(err))
	if pm.evtDialError != nil {
		emitterErr := pm.evtDialError.Emit(utils.DialError{Err: err, PeerID: peerID})
		if emitterErr != nil {
			pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
		}
	}
}