From 4f232c40caccb20645495748fadaa0f9e7b51fa1 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 8 Feb 2024 15:24:58 +0530 Subject: [PATCH] feat: topic health reporting (#1027) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: richΛrd --- waku/v2/node/connectedness.go | 67 ------------ waku/v2/node/connectedness_test.go | 114 +++++++++++---------- waku/v2/node/wakunode2.go | 26 +---- waku/v2/node/wakuoptions.go | 22 ++-- waku/v2/node/wakuoptions_test.go | 22 ++-- waku/v2/peermanager/peer_connector.go | 2 +- waku/v2/peermanager/peer_manager.go | 97 ++++++++++++++++-- waku/v2/peermanager/topic_event_handler.go | 19 +++- waku/v2/peerstore/waku_peer_store.go | 19 ++++ waku/v2/protocol/relay/config.go | 14 +-- waku/v2/protocol/relay/waku_relay.go | 2 + waku/v2/protocol/utils.go | 2 +- 12 files changed, 218 insertions(+), 188 deletions(-) diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index 7aa50960..5a0f89fe 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -9,10 +9,6 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" wps "github.com/waku-org/go-waku/waku/v2/peerstore" @@ -21,14 +17,6 @@ import ( // PeerStatis is a map of peer IDs to supported protocols type PeerStats map[peer.ID][]protocol.ID -// ConnStatus is used to indicate if the node is online, has access to history -// and also see the list of peers the node is aware of -type ConnStatus struct { - IsOnline bool - HasHistory bool - Peers PeerStats -} - type PeerConnection struct { PeerID peer.ID Connected bool @@ -112,15 +100,6 @@ func (c ConnectionNotifier) ClosedStream(n network.Network, s network.Stream) { func (c ConnectionNotifier) Close() { } -func (w *WakuNode) sendConnStatus() { - isOnline, hasHistory := w.Status() - if w.connStatusChan != nil { - connStatus := ConnStatus{IsOnline: isOnline, HasHistory: hasHistory, Peers: w.PeerStats()} - w.connStatusChan <- connStatus - } - -} - func (w *WakuNode) connectednessListener(ctx context.Context) { defer w.wg.Done() @@ -128,53 +107,7 @@ func (w *WakuNode) connectednessListener(ctx context.Context) { select { case <-ctx.Done(): return - case <-w.protocolEventSub.Out(): - case <-w.identificationEventSub.Out(): case <-w.connectionNotif.DisconnectChan: } - w.sendConnStatus() } } - -// Status returns the current status of the node (online or not) -// and if the node has access to history nodes or not -func (w *WakuNode) Status() (isOnline bool, hasHistory bool) { - hasRelay := false - hasLightPush := false - hasStore := false - hasFilter := false - - for _, peer := range w.host.Network().Peers() { - protocols, err := w.host.Peerstore().GetProtocols(peer) - if err != nil { - w.log.Warn("reading peer protocols", logging.HostID("peer", peer), zap.Error(err)) - } - - for _, protocol := range protocols { - if !hasRelay && protocol == relay.WakuRelayID_v200 { - hasRelay = true - } - if !hasLightPush && protocol == lightpush.LightPushID_v20beta1 { - hasLightPush = true - } - if !hasStore && protocol == store.StoreID_v20beta4 { - hasStore = true - } - if !hasFilter && protocol == legacy_filter.FilterID_v20beta1 { - hasFilter = true - } - } - } - - if hasStore { - hasHistory = true - } - - if w.opts.enableFilterLightNode && !w.opts.enableRelay { - isOnline = hasLightPush && hasFilter - } else { - isOnline = hasRelay || hasLightPush && (hasStore || hasFilter) - } - - return -} diff --git a/waku/v2/node/connectedness_test.go b/waku/v2/node/connectedness_test.go index 6e1aea23..2f061d84 100644 --- a/waku/v2/node/connectedness_test.go +++ b/waku/v2/node/connectedness_test.go @@ -7,37 +7,33 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/persistence" - "github.com/waku-org/go-waku/waku/persistence/sqlite" - "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" ) -func goCheckConnectedness(t *testing.T, wg *sync.WaitGroup, connStatusChan chan ConnStatus, clientNode *WakuNode, node *WakuNode, nodeShouldBeConnected bool, shouldBeOnline bool, shouldHaveHistory bool, expectedPeers int) { +const pubsubTopic = "/waku/2/rs/16/1000" + +func goCheckConnectedness(t *testing.T, wg *sync.WaitGroup, topicHealthStatusChan chan peermanager.TopicHealthStatus, + healthStatus peermanager.TopicHealth) { wg.Add(1) - go checkConnectedness(t, wg, connStatusChan, clientNode, node, nodeShouldBeConnected, shouldBeOnline, shouldHaveHistory, expectedPeers) + go checkConnectedness(t, wg, topicHealthStatusChan, healthStatus) } -func checkConnectedness(t *testing.T, wg *sync.WaitGroup, connStatusChan chan ConnStatus, clientNode *WakuNode, node *WakuNode, nodeShouldBeConnected bool, shouldBeOnline bool, shouldHaveHistory bool, expectedPeers int) { +func checkConnectedness(t *testing.T, wg *sync.WaitGroup, topicHealthStatusChan chan peermanager.TopicHealthStatus, + healthStatus peermanager.TopicHealth) { defer wg.Done() timeout := time.After(5 * time.Second) select { - case connStatus := <-connStatusChan: - _, ok := connStatus.Peers[node.Host().ID()] - if (nodeShouldBeConnected && ok) || (!nodeShouldBeConnected && !ok) { - // Only execute the test when the node is connected or disconnected and it does not appear in the map returned by the connection status channel - require.True(t, connStatus.IsOnline == shouldBeOnline) - require.True(t, connStatus.HasHistory == shouldHaveHistory) - require.Len(t, clientNode.Host().Network().Peers(), expectedPeers) - return - } - + case topicHealthStatus := <-topicHealthStatusChan: + require.Equal(t, healthStatus, topicHealthStatus.Health) + t.Log("received health status update ", topicHealthStatus.Health, "expected is ", healthStatus) + return case <-timeout: - require.Fail(t, "node should have connected") - + require.Fail(t, "health status should have changed") } } @@ -45,7 +41,7 @@ func TestConnectionStatusChanges(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - connStatusChan := make(chan ConnStatus, 100) + topicHealthStatusChan := make(chan peermanager.TopicHealthStatus, 100) // Node1: Only Relay hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") @@ -53,70 +49,80 @@ func TestConnectionStatusChanges(t *testing.T) { node1, err := New( WithHostAddress(hostAddr1), WithWakuRelay(), - WithConnectionStatusChannel(connStatusChan), + WithTopicHealthStatusChannel(topicHealthStatusChan), ) require.NoError(t, err) err = node1.Start(ctx) require.NoError(t, err) + _, err = node1.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic)) + require.NoError(t, err) // Node2: Relay - hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") - require.NoError(t, err) - node2, err := New( - WithHostAddress(hostAddr2), - WithWakuRelay(), - ) - require.NoError(t, err) - err = node2.Start(ctx) - require.NoError(t, err) - - db, err := sqlite.NewDB(":memory:", utils.Logger()) - require.NoError(t, err) - dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations)) - require.NoError(t, err) + node2 := startNodeAndSubscribe(t, ctx) // Node3: Relay + Store - hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") - require.NoError(t, err) - node3, err := New( - WithHostAddress(hostAddr3), - WithWakuRelay(), - WithWakuStore(), - WithMessageProvider(dbStore), - ) - require.NoError(t, err) - err = node3.Start(ctx) - require.NoError(t, err) + node3 := startNodeAndSubscribe(t, ctx) + + // Node4: Relay + node4 := startNodeAndSubscribe(t, ctx) + + // Node5: Relay + node5 := startNodeAndSubscribe(t, ctx) var wg sync.WaitGroup - goCheckConnectedness(t, &wg, connStatusChan, node1, node2, true, true, false, 1) + goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.MinimallyHealthy) - err = node1.DialPeer(ctx, node2.ListenAddresses()[0].String()) - require.NoError(t, err) + node1.AddDiscoveredPeer(node2.host.ID(), node2.ListenAddresses(), peerstore.Static, []string{pubsubTopic}, true) wg.Wait() - goCheckConnectedness(t, &wg, connStatusChan, node1, node3, true, true, true, 2) - err = node1.DialPeer(ctx, node3.ListenAddresses()[0].String()) require.NoError(t, err) - goCheckConnectedness(t, &wg, connStatusChan, node1, node3, false, true, false, 1) + err = node1.DialPeer(ctx, node4.ListenAddresses()[0].String()) + require.NoError(t, err) + goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.SufficientlyHealthy) + + err = node1.DialPeer(ctx, node5.ListenAddresses()[0].String()) + require.NoError(t, err) + + wg.Wait() + + goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.MinimallyHealthy) node3.Stop() wg.Wait() - goCheckConnectedness(t, &wg, connStatusChan, node1, node2, false, false, false, 0) + goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.UnHealthy) err = node1.ClosePeerById(node2.Host().ID()) require.NoError(t, err) + + node4.Stop() + node5.Stop() + wg.Wait() - goCheckConnectedness(t, &wg, connStatusChan, node1, node2, true, true, false, 1) + goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.MinimallyHealthy) err = node1.DialPeerByID(ctx, node2.Host().ID()) require.NoError(t, err) wg.Wait() } + +func startNodeAndSubscribe(t *testing.T, ctx context.Context) *WakuNode { + hostAddr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + node, err := New( + WithHostAddress(hostAddr), + WithWakuRelay(), + ) + require.NoError(t, err) + err = node.Start(ctx) + require.NoError(t, err) + _, err = node.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic)) + require.NoError(t, err) + return node +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 378ac06b..2c89eb9e 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -112,11 +112,9 @@ type WakuNode struct { bcaster relay.Broadcaster - connectionNotif ConnectionNotifier - protocolEventSub event.Subscription - identificationEventSub event.Subscription - addressChangesSub event.Subscription - enrChangeCh chan struct{} + connectionNotif ConnectionNotifier + addressChangesSub event.Subscription + enrChangeCh chan struct{} keepAliveMutex sync.Mutex keepAliveFails map[peer.ID]int @@ -124,10 +122,6 @@ type WakuNode struct { cancel context.CancelFunc wg *sync.WaitGroup - // Channel passed to WakuNode constructor - // receiving connection status notifications - connStatusChan chan<- ConnStatus - storeFactory storeFactory peermanager *peermanager.PeerManager @@ -306,8 +300,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.storeFactory = defaultStoreFactory } - if params.connStatusC != nil { - w.connStatusChan = params.connStatusC + if params.topicHealthNotifCh != nil { + w.peermanager.TopicHealthNotifCh = params.topicHealthNotifCh } return w, nil @@ -364,14 +358,6 @@ func (w *WakuNode) Start(ctx context.Context) error { w.host = host - if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil { - return err - } - - if w.identificationEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted)); err != nil { - return err - } - if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil { return err } @@ -519,8 +505,6 @@ func (w *WakuNode) Stop() { w.bcaster.Stop() defer w.connectionNotif.Close() - defer w.protocolEventSub.Close() - defer w.identificationEventSub.Close() defer w.addressChangesSub.Close() w.host.Network().StopNotify(w.connectionNotif) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 5cbb4f24..50970a1c 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -26,6 +26,7 @@ import ( "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" "github.com/prometheus/client_golang/prometheus" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -115,8 +116,8 @@ type WakuNodeParameters struct { enableLightPush bool - connStatusC chan<- ConnStatus - connNotifCh chan<- PeerConnection + connNotifCh chan<- PeerConnection + topicHealthNotifCh chan<- peermanager.TopicHealthStatus storeFactory storeFactory } @@ -489,16 +490,6 @@ func WithKeepAlive(t time.Duration) WakuNodeOption { } } -// WithConnectionStatusChannel is a WakuNodeOption used to set a channel where the -// connection status changes will be pushed to. It's useful to identify when peer -// connections and disconnections occur -func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption { - return func(params *WakuNodeParameters) error { - params.connStatusC = connStatus - return nil - } -} - func WithConnectionNotification(ch chan<- PeerConnection) WakuNodeOption { return func(params *WakuNodeParameters) error { params.connNotifCh = ch @@ -566,6 +557,13 @@ func WithCircuitRelayParams(minInterval time.Duration, bootDelay time.Duration) } } +func WithTopicHealthStatusChannel(ch chan<- peermanager.TopicHealthStatus) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.topicHealthNotifCh = ch + return nil + } +} + // Default options used in the libp2p node var DefaultLibP2POptions = []libp2p.Option{ libp2p.ChainOptions( diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index abe7a421..addd80a5 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -1,14 +1,16 @@ package node import ( - "github.com/ethereum/go-ethereum/common" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - r "github.com/waku-org/go-zerokit-rln/rln" - "go.uber.org/zap" "net" "testing" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + r "github.com/waku-org/go-zerokit-rln/rln" + "go.uber.org/zap" + "github.com/ethereum/go-ethereum/crypto" "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" @@ -28,7 +30,7 @@ func handleSpam(msg *pb.WakuMessage, topic string) error { } func TestWakuOptions(t *testing.T) { - connStatusChan := make(chan ConnStatus, 100) + topicHealthStatusChan := make(chan peermanager.TopicHealthStatus, 100) key, err := tests.RandomHex(32) require.NoError(t, err) @@ -58,7 +60,7 @@ func TestWakuOptions(t *testing.T) { WithMessageProvider(&persistence.DBStore{}), WithLightPush(), WithKeepAlive(time.Hour), - WithConnectionStatusChannel(connStatusChan), + WithTopicHealthStatusChannel(topicHealthStatusChan), WithWakuStoreFactory(storeFactory), } @@ -70,11 +72,11 @@ func TestWakuOptions(t *testing.T) { require.NotNil(t, params.multiAddr) require.NotNil(t, params.privKey) - require.NotNil(t, params.connStatusC) + require.NotNil(t, params.topicHealthNotifCh) } func TestWakuRLNOptions(t *testing.T) { - connStatusChan := make(chan ConnStatus, 100) + topicHealthStatusChan := make(chan peermanager.TopicHealthStatus, 100) key, err := tests.RandomHex(32) require.NoError(t, err) @@ -108,7 +110,7 @@ func TestWakuRLNOptions(t *testing.T) { WithMessageProvider(&persistence.DBStore{}), WithLightPush(), WithKeepAlive(time.Hour), - WithConnectionStatusChannel(connStatusChan), + WithTopicHealthStatusChannel(topicHealthStatusChan), WithWakuStoreFactory(storeFactory), WithStaticRLNRelay(&index, handleSpam), } @@ -149,7 +151,7 @@ func TestWakuRLNOptions(t *testing.T) { WithMessageProvider(&persistence.DBStore{}), WithLightPush(), WithKeepAlive(time.Hour), - WithConnectionStatusChannel(connStatusChan), + WithTopicHealthStatusChannel(topicHealthStatusChan), WithWakuStoreFactory(storeFactory), WithDynamicRLNRelay(keystorePath, keystorePassword, rlnTreePath, common.HexToAddress(contractAddress), &index, handleSpam, ethClientAddress), } diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 9474ad61..8dc8e603 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -127,7 +127,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) { triggerImmediateConnection := false //Not connecting to peer as soon as it is discovered, // rather expecting this to be pushed from PeerManager based on the need. - if len(c.host.Network().Peers()) < waku_proto.GossipSubOptimalFullMeshSize { + if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin { triggerImmediateConnection = true } c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID)) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 85806d05..54cba27b 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -26,9 +26,36 @@ import ( "go.uber.org/zap" ) +type TopicHealth int + +const ( + UnHealthy = iota + MinimallyHealthy = 1 + SufficientlyHealthy = 2 +) + +func (t TopicHealth) String() string { + switch t { + case UnHealthy: + return "UnHealthy" + case MinimallyHealthy: + return "MinimallyHealthy" + case SufficientlyHealthy: + return "SufficientlyHealthy" + default: + return "" + } +} + +type TopicHealthStatus struct { + Topic string + Health TopicHealth +} + // NodeTopicDetails stores pubSubTopic related data like topicHandle for the node. type NodeTopicDetails struct { - topic *pubsub.Topic + topic *pubsub.Topic + healthStatus TopicHealth } // WakuProtoInfo holds protocol specific info @@ -54,6 +81,7 @@ type PeerManager struct { subRelayTopics map[string]*NodeTopicDetails discoveryService *discv5.DiscoveryV5 wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo + TopicHealthNotifCh chan<- TopicHealthStatus } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -87,6 +115,57 @@ func inAndOutRelayPeers(relayPeers int) (int, int) { return relayPeers - outRelayPeers, outRelayPeers } +// checkAndUpdateTopicHealth finds health of specified topic and updates and notifies of the same. +// Also returns the healthyPeerCount +func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int { + healthyPeerCount := 0 + for _, p := range topic.topic.ListPeers() { + if pm.host.Network().Connectedness(p) == network.Connected { + pThreshold, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p) + if err == nil { + if pThreshold < relay.PeerPublishThreshold { + pm.logger.Debug("peer score below publish threshold", logging.HostID("peer", p), zap.Float64("score", pThreshold)) + } else { + healthyPeerCount++ + } + } else { + pm.logger.Warn("failed to fetch peer score ", zap.Error(err), logging.HostID("peer", p)) + //For now considering peer as healthy if we can't fetch score. + healthyPeerCount++ + } + } + } + //Update topic's health + oldHealth := topic.healthStatus + if healthyPeerCount < 1 { //Ideally this check should be done with minPeersForRelay, but leaving it as is for now. + topic.healthStatus = UnHealthy + } else if healthyPeerCount < waku_proto.GossipSubDMin { + topic.healthStatus = MinimallyHealthy + } else { + topic.healthStatus = SufficientlyHealthy + } + + if oldHealth != topic.healthStatus { + //Check old health, and if there is a change notify of the same. + pm.logger.Debug("topic health has changed", zap.String("pubsubtopic", topic.topic.String()), zap.Stringer("health", topic.healthStatus)) + pm.TopicHealthNotifCh <- TopicHealthStatus{topic.topic.String(), topic.healthStatus} + } + return healthyPeerCount +} + +// TopicHealth can be used to fetch health of a specific pubsubTopic. +// Returns error if topic is not found. +func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) { + pm.topicMutex.RLock() + defer pm.topicMutex.RUnlock() + + topicDetails, ok := pm.subRelayTopics[pubsubTopic] + if !ok { + return UnHealthy, errors.New("topic not found") + } + return topicDetails.healthStatus, nil +} + // NewPeerManager creates a new peerManager instance. func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerManager { @@ -212,16 +291,12 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { // @cammellos reported that ListPeers returned an invalid number of // peers. This will ensure that the peers returned by this function // match those peers that are currently connected - curPeerLen := 0 - for _, p := range topicInst.topic.ListPeers() { - if pm.host.Network().Connectedness(p) == network.Connected { - curPeerLen++ - } - } - if curPeerLen < waku_proto.GossipSubOptimalFullMeshSize { - pm.logger.Debug("subscribed topic is unhealthy, initiating more connections to maintain health", + + curPeerLen := pm.checkAndUpdateTopicHealth(topicInst) + if curPeerLen < waku_proto.GossipSubDMin { + pm.logger.Debug("subscribed topic is not sufficiently healthy, initiating more connections to maintain health", zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), - zap.Int("optimumPeers", waku_proto.GossipSubOptimalFullMeshSize)) + zap.Int("optimumPeers", waku_proto.GossipSubDMin)) //Find not connected peers. notConnectedPeers := pm.getNotConnectedPers(topicStr) if notConnectedPeers.Len() == 0 { @@ -231,7 +306,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { } pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr)) //Connect to eligible peers. - numPeersToConnect := waku_proto.GossipSubOptimalFullMeshSize - curPeerLen + numPeersToConnect := waku_proto.GossipSubDMin - curPeerLen if numPeersToConnect > notConnectedPeers.Len() { numPeersToConnect = notConnectedPeers.Len() diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go index 9e66ec0f..1a39fee2 100644 --- a/waku/v2/peermanager/topic_event_handler.go +++ b/waku/v2/peermanager/topic_event_handler.go @@ -30,7 +30,7 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic //Nothing to be done, as we are already subscribed to this topic. return } - pm.subRelayTopics[pubsubTopic] = &NodeTopicDetails{topicInst} + pm.subRelayTopics[pubsubTopic] = &NodeTopicDetails{topicInst, UnHealthy} //Check how many relay peers we are connected to that subscribe to this topic, if less than D find peers in peerstore and connect. //If no peers in peerStore, trigger discovery for this topic? relevantPeersForPubSubTopic := pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic) @@ -44,7 +44,9 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic } } - if connectedPeers >= waku_proto.GossipSubOptimalFullMeshSize { //TODO: Use a config rather than hard-coding. + pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubsubTopic]) + + if connectedPeers >= waku_proto.GossipSubDMin { //TODO: Use a config rather than hard-coding. // Should we use optimal number or define some sort of a config for the node to choose from? // A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has // or bandwidth it can support. @@ -58,10 +60,10 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic numPeersToConnect := notConnectedPeers.Len() - connectedPeers if numPeersToConnect < 0 { numPeersToConnect = notConnectedPeers.Len() - } else if numPeersToConnect-connectedPeers > waku_proto.GossipSubOptimalFullMeshSize { - numPeersToConnect = waku_proto.GossipSubOptimalFullMeshSize - connectedPeers + } else if numPeersToConnect-connectedPeers > waku_proto.GossipSubDMin { + numPeersToConnect = waku_proto.GossipSubDMin - connectedPeers } - if numPeersToConnect+connectedPeers < waku_proto.GossipSubOptimalFullMeshSize { + if numPeersToConnect+connectedPeers < waku_proto.GossipSubDMin { triggerDiscovery = true } //For now all peers are being given same priority, @@ -123,12 +125,19 @@ func (pm *PeerManager) handlerPeerTopicEvent(peerEvt relay.EvtPeerTopic) { pm.logger.Error("failed to add pubSubTopic for peer", logging.HostID("peerID", peerID), zap.String("topic", peerEvt.PubsubTopic), zap.Error(err)) } + pm.topicMutex.RLock() + defer pm.topicMutex.RUnlock() + pm.checkAndUpdateTopicHealth(pm.subRelayTopics[peerEvt.PubsubTopic]) + } else if peerEvt.State == relay.PEER_LEFT { err := wps.RemovePubSubTopic(peerID, peerEvt.PubsubTopic) if err != nil { pm.logger.Error("failed to remove pubSubTopic for peer", logging.HostID("peerID", peerID), zap.Error(err)) } + pm.topicMutex.RLock() + defer pm.topicMutex.RUnlock() + pm.checkAndUpdateTopicHealth(pm.subRelayTopics[peerEvt.PubsubTopic]) } else { pm.logger.Error("unknown peer event received", zap.Int("eventState", int(peerEvt.State))) } diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index 5495a577..402d265a 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -28,6 +28,7 @@ const peerOrigin = "origin" const peerENR = "enr" const peerDirection = "direction" const peerPubSubTopics = "pubSubTopics" +const peerScore = "score" // ConnectionFailures contains connection failure information towards all peers type ConnectionFailures struct { @@ -61,6 +62,9 @@ type WakuPeerstore interface { SetPubSubTopics(p peer.ID, topics []string) error PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice + + SetScore(peer.ID, float64) error + Score(peer.ID) (float64, error) } // NewWakuPeerstore creates a new WakuPeerStore object @@ -88,6 +92,21 @@ func (ps *WakuPeerstoreImpl) Origin(p peer.ID) (Origin, error) { return result.(Origin), nil } +// SetScore sets score for a specific peer. +func (ps *WakuPeerstoreImpl) SetScore(p peer.ID, score float64) error { + return ps.peerStore.Put(p, peerScore, score) +} + +// Score fetches the peerScore for a specific peer. +func (ps *WakuPeerstoreImpl) Score(p peer.ID) (float64, error) { + result, err := ps.peerStore.Get(p, peerScore) + if err != nil { + return -1, err + } + + return result.(float64), nil +} + // PeersByOrigin returns the list of peers for a specific origin func (ps *WakuPeerstoreImpl) PeersByOrigin(expectedOrigin Origin) peer.IDSlice { var result peer.IDSlice diff --git a/waku/v2/protocol/relay/config.go b/waku/v2/protocol/relay/config.go index 16799863..f0f41f80 100644 --- a/waku/v2/protocol/relay/config.go +++ b/waku/v2/protocol/relay/config.go @@ -40,6 +40,8 @@ func msgIDFn(pmsg *pubsub_pb.Message) string { return string(hash.SHA256(pmsg.Data)) } +const PeerPublishThreshold = -1000 + func (w *WakuRelay) setDefaultPeerScoreParams() { w.peerScoreParams = &pubsub.PeerScoreParams{ Topics: make(map[string]*pubsub.TopicScoreParams), @@ -59,10 +61,10 @@ func (w *WakuRelay) setDefaultPeerScoreParams() { BehaviourPenaltyDecay: 0.986, } w.peerScoreThresholds = &pubsub.PeerScoreThresholds{ - GossipThreshold: -100, // no gossip is sent to peers below this score - PublishThreshold: -1000, // no self-published msgs are sent to peers below this score - GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score - OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset. + GossipThreshold: -100, // no gossip is sent to peers below this score + PublishThreshold: PeerPublishThreshold, // no self-published msgs are sent to peers below this score + GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score + OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset. } } @@ -72,11 +74,11 @@ func (w *WakuRelay) defaultPubsubOptions() []pubsub.Option { cfg.PruneBackoff = time.Minute cfg.UnsubscribeBackoff = 5 * time.Second cfg.GossipFactor = 0.25 - cfg.D = waku_proto.GossipSubOptimalFullMeshSize + cfg.D = waku_proto.GossipSubDMin cfg.Dlo = 4 cfg.Dhi = 8 cfg.Dout = 3 - cfg.Dlazy = waku_proto.GossipSubOptimalFullMeshSize + cfg.Dlazy = waku_proto.GossipSubDMin cfg.HeartbeatInterval = time.Second cfg.HistoryLength = 6 cfg.HistoryGossip = 3 diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 060f161c..a6f9a169 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -16,6 +16,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/waku-org/go-waku/logging" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/service" @@ -114,6 +115,7 @@ func (w *WakuRelay) peerScoreInspector(peerScoresSnapshots map[peer.ID]*pubsub.P w.log.Error("could not disconnect peer", logging.HostID("peer", pid), zap.Error(err)) } } + _ = w.host.Peerstore().(wps.WakuPeerstore).SetScore(pid, snap.Score) } } diff --git a/waku/v2/protocol/utils.go b/waku/v2/protocol/utils.go index eaf82093..c30b7f9c 100644 --- a/waku/v2/protocol/utils.go +++ b/waku/v2/protocol/utils.go @@ -6,7 +6,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" ) -const GossipSubOptimalFullMeshSize = 6 +const GossipSubDMin = 4 // FulltextMatch is the default matching function used for checking if a peer // supports a protocol or not