feat: topic health reporting (#1027)

Co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
Prem Chaitanya Prathi 2024-02-08 15:24:58 +05:30 committed by GitHub
parent c09bd8383b
commit 4f232c40ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 218 additions and 188 deletions

View File

@ -9,10 +9,6 @@ import (
"github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.uber.org/zap" "go.uber.org/zap"
wps "github.com/waku-org/go-waku/waku/v2/peerstore" wps "github.com/waku-org/go-waku/waku/v2/peerstore"
@ -21,14 +17,6 @@ import (
// PeerStats is a map of peer IDs to supported protocols // PeerStats is a map of peer IDs to supported protocols
type PeerStats map[peer.ID][]protocol.ID type PeerStats map[peer.ID][]protocol.ID
// ConnStatus is the snapshot pushed on the connection status channel. It
// tells a consumer whether the node is online, whether it has access to
// history, and which peers the node currently knows about.
type ConnStatus struct {
	// IsOnline is the first value returned by WakuNode.Status.
	IsOnline bool
	// HasHistory is the second value returned by WakuNode.Status.
	HasHistory bool
	// Peers is the result of WakuNode.PeerStats at the time of the snapshot.
	Peers PeerStats
}
type PeerConnection struct { type PeerConnection struct {
PeerID peer.ID PeerID peer.ID
Connected bool Connected bool
@ -112,15 +100,6 @@ func (c ConnectionNotifier) ClosedStream(n network.Network, s network.Stream) {
func (c ConnectionNotifier) Close() { func (c ConnectionNotifier) Close() {
} }
// sendConnStatus evaluates the node status and, when a connection status
// channel was configured, pushes a ConnStatus snapshot onto it.
func (w *WakuNode) sendConnStatus() {
	// Status is evaluated before the channel check, as it always has been,
	// so any logging it performs still happens without a channel.
	online, history := w.Status()
	if w.connStatusChan == nil {
		return
	}
	w.connStatusChan <- ConnStatus{
		IsOnline:   online,
		HasHistory: history,
		Peers:      w.PeerStats(),
	}
}
func (w *WakuNode) connectednessListener(ctx context.Context) { func (w *WakuNode) connectednessListener(ctx context.Context) {
defer w.wg.Done() defer w.wg.Done()
@ -128,53 +107,7 @@ func (w *WakuNode) connectednessListener(ctx context.Context) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-w.protocolEventSub.Out():
case <-w.identificationEventSub.Out():
case <-w.connectionNotif.DisconnectChan: case <-w.connectionNotif.DisconnectChan:
} }
w.sendConnStatus()
} }
} }
// Status returns the current status of the node (online or not)
// and if the node has access to history nodes or not.
//
// It walks the protocols advertised by every peer in the host's network and
// records whether relay, lightpush, store and legacy filter are each offered
// by at least one of them. hasHistory is true when some peer offers store.
// isOnline depends on the node configuration: a filter light node without
// relay needs both lightpush and filter peers; any other node is online with
// a relay peer, or with a lightpush peer plus a store or filter peer.
func (w *WakuNode) Status() (isOnline bool, hasHistory bool) {
	var relayFound, lightPushFound, storeFound, filterFound bool

	for _, pid := range w.host.Network().Peers() {
		supported, err := w.host.Peerstore().GetProtocols(pid)
		if err != nil {
			// Not fatal: whatever protocol list was returned is still inspected.
			w.log.Warn("reading peer protocols", logging.HostID("peer", pid), zap.Error(err))
		}
		for _, proto := range supported {
			relayFound = relayFound || proto == relay.WakuRelayID_v200
			lightPushFound = lightPushFound || proto == lightpush.LightPushID_v20beta1
			storeFound = storeFound || proto == store.StoreID_v20beta4
			filterFound = filterFound || proto == legacy_filter.FilterID_v20beta1
		}
	}

	hasHistory = storeFound

	if w.opts.enableFilterLightNode && !w.opts.enableRelay {
		return lightPushFound && filterFound, hasHistory
	}
	return relayFound || (lightPushFound && (storeFound || filterFound)), hasHistory
}

View File

@ -7,37 +7,33 @@ import (
"testing" "testing"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/persistence/sqlite" "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/protocol"
) )
func goCheckConnectedness(t *testing.T, wg *sync.WaitGroup, connStatusChan chan ConnStatus, clientNode *WakuNode, node *WakuNode, nodeShouldBeConnected bool, shouldBeOnline bool, shouldHaveHistory bool, expectedPeers int) { const pubsubTopic = "/waku/2/rs/16/1000"
func goCheckConnectedness(t *testing.T, wg *sync.WaitGroup, topicHealthStatusChan chan peermanager.TopicHealthStatus,
healthStatus peermanager.TopicHealth) {
wg.Add(1) wg.Add(1)
go checkConnectedness(t, wg, connStatusChan, clientNode, node, nodeShouldBeConnected, shouldBeOnline, shouldHaveHistory, expectedPeers) go checkConnectedness(t, wg, topicHealthStatusChan, healthStatus)
} }
func checkConnectedness(t *testing.T, wg *sync.WaitGroup, connStatusChan chan ConnStatus, clientNode *WakuNode, node *WakuNode, nodeShouldBeConnected bool, shouldBeOnline bool, shouldHaveHistory bool, expectedPeers int) { func checkConnectedness(t *testing.T, wg *sync.WaitGroup, topicHealthStatusChan chan peermanager.TopicHealthStatus,
healthStatus peermanager.TopicHealth) {
defer wg.Done() defer wg.Done()
timeout := time.After(5 * time.Second) timeout := time.After(5 * time.Second)
select { select {
case connStatus := <-connStatusChan: case topicHealthStatus := <-topicHealthStatusChan:
_, ok := connStatus.Peers[node.Host().ID()] require.Equal(t, healthStatus, topicHealthStatus.Health)
if (nodeShouldBeConnected && ok) || (!nodeShouldBeConnected && !ok) { t.Log("received health status update ", topicHealthStatus.Health, "expected is ", healthStatus)
// Only execute the test when the node is connected or disconnected and it does not appear in the map returned by the connection status channel return
require.True(t, connStatus.IsOnline == shouldBeOnline)
require.True(t, connStatus.HasHistory == shouldHaveHistory)
require.Len(t, clientNode.Host().Network().Peers(), expectedPeers)
return
}
case <-timeout: case <-timeout:
require.Fail(t, "node should have connected") require.Fail(t, "health status should have changed")
} }
} }
@ -45,7 +41,7 @@ func TestConnectionStatusChanges(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
connStatusChan := make(chan ConnStatus, 100) topicHealthStatusChan := make(chan peermanager.TopicHealthStatus, 100)
// Node1: Only Relay // Node1: Only Relay
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
@ -53,70 +49,80 @@ func TestConnectionStatusChanges(t *testing.T) {
node1, err := New( node1, err := New(
WithHostAddress(hostAddr1), WithHostAddress(hostAddr1),
WithWakuRelay(), WithWakuRelay(),
WithConnectionStatusChannel(connStatusChan), WithTopicHealthStatusChannel(topicHealthStatusChan),
) )
require.NoError(t, err) require.NoError(t, err)
err = node1.Start(ctx) err = node1.Start(ctx)
require.NoError(t, err) require.NoError(t, err)
_, err = node1.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic))
require.NoError(t, err)
// Node2: Relay // Node2: Relay
hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") node2 := startNodeAndSubscribe(t, ctx)
require.NoError(t, err)
node2, err := New(
WithHostAddress(hostAddr2),
WithWakuRelay(),
)
require.NoError(t, err)
err = node2.Start(ctx)
require.NoError(t, err)
db, err := sqlite.NewDB(":memory:", utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations))
require.NoError(t, err)
// Node3: Relay + Store // Node3: Relay + Store
hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") node3 := startNodeAndSubscribe(t, ctx)
require.NoError(t, err)
node3, err := New( // Node4: Relay
WithHostAddress(hostAddr3), node4 := startNodeAndSubscribe(t, ctx)
WithWakuRelay(),
WithWakuStore(), // Node5: Relay
WithMessageProvider(dbStore), node5 := startNodeAndSubscribe(t, ctx)
)
require.NoError(t, err)
err = node3.Start(ctx)
require.NoError(t, err)
var wg sync.WaitGroup var wg sync.WaitGroup
goCheckConnectedness(t, &wg, connStatusChan, node1, node2, true, true, false, 1) goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.MinimallyHealthy)
err = node1.DialPeer(ctx, node2.ListenAddresses()[0].String()) node1.AddDiscoveredPeer(node2.host.ID(), node2.ListenAddresses(), peerstore.Static, []string{pubsubTopic}, true)
require.NoError(t, err)
wg.Wait() wg.Wait()
goCheckConnectedness(t, &wg, connStatusChan, node1, node3, true, true, true, 2)
err = node1.DialPeer(ctx, node3.ListenAddresses()[0].String()) err = node1.DialPeer(ctx, node3.ListenAddresses()[0].String())
require.NoError(t, err) require.NoError(t, err)
goCheckConnectedness(t, &wg, connStatusChan, node1, node3, false, true, false, 1) err = node1.DialPeer(ctx, node4.ListenAddresses()[0].String())
require.NoError(t, err)
goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.SufficientlyHealthy)
err = node1.DialPeer(ctx, node5.ListenAddresses()[0].String())
require.NoError(t, err)
wg.Wait()
goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.MinimallyHealthy)
node3.Stop() node3.Stop()
wg.Wait() wg.Wait()
goCheckConnectedness(t, &wg, connStatusChan, node1, node2, false, false, false, 0) goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.UnHealthy)
err = node1.ClosePeerById(node2.Host().ID()) err = node1.ClosePeerById(node2.Host().ID())
require.NoError(t, err) require.NoError(t, err)
node4.Stop()
node5.Stop()
wg.Wait() wg.Wait()
goCheckConnectedness(t, &wg, connStatusChan, node1, node2, true, true, false, 1) goCheckConnectedness(t, &wg, topicHealthStatusChan, peermanager.MinimallyHealthy)
err = node1.DialPeerByID(ctx, node2.Host().ID()) err = node1.DialPeerByID(ctx, node2.Host().ID())
require.NoError(t, err) require.NoError(t, err)
wg.Wait() wg.Wait()
} }
// startNodeAndSubscribe creates a relay-enabled node listening on an
// OS-assigned TCP port, starts it, subscribes it to pubsubTopic and returns
// it. Any failure along the way aborts the test.
func startNodeAndSubscribe(t *testing.T, ctx context.Context) *WakuNode {
	addr, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
	require.NoError(t, err)

	n, err := New(WithHostAddress(addr), WithWakuRelay())
	require.NoError(t, err)

	require.NoError(t, n.Start(ctx))

	_, err = n.Relay().Subscribe(ctx, protocol.NewContentFilter(pubsubTopic))
	require.NoError(t, err)

	return n
}

View File

@ -112,11 +112,9 @@ type WakuNode struct {
bcaster relay.Broadcaster bcaster relay.Broadcaster
connectionNotif ConnectionNotifier connectionNotif ConnectionNotifier
protocolEventSub event.Subscription addressChangesSub event.Subscription
identificationEventSub event.Subscription enrChangeCh chan struct{}
addressChangesSub event.Subscription
enrChangeCh chan struct{}
keepAliveMutex sync.Mutex keepAliveMutex sync.Mutex
keepAliveFails map[peer.ID]int keepAliveFails map[peer.ID]int
@ -124,10 +122,6 @@ type WakuNode struct {
cancel context.CancelFunc cancel context.CancelFunc
wg *sync.WaitGroup wg *sync.WaitGroup
// Channel passed to WakuNode constructor
// receiving connection status notifications
connStatusChan chan<- ConnStatus
storeFactory storeFactory storeFactory storeFactory
peermanager *peermanager.PeerManager peermanager *peermanager.PeerManager
@ -306,8 +300,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.storeFactory = defaultStoreFactory w.storeFactory = defaultStoreFactory
} }
if params.connStatusC != nil { if params.topicHealthNotifCh != nil {
w.connStatusChan = params.connStatusC w.peermanager.TopicHealthNotifCh = params.topicHealthNotifCh
} }
return w, nil return w, nil
@ -364,14 +358,6 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.host = host w.host = host
if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil {
return err
}
if w.identificationEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted)); err != nil {
return err
}
if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil { if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil {
return err return err
} }
@ -519,8 +505,6 @@ func (w *WakuNode) Stop() {
w.bcaster.Stop() w.bcaster.Stop()
defer w.connectionNotif.Close() defer w.connectionNotif.Close()
defer w.protocolEventSub.Close()
defer w.identificationEventSub.Close()
defer w.addressChangesSub.Close() defer w.addressChangesSub.Close()
w.host.Network().StopNotify(w.connectionNotif) w.host.Network().StopNotify(w.connectionNotif)

View File

@ -26,6 +26,7 @@ import (
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net" manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -115,8 +116,8 @@ type WakuNodeParameters struct {
enableLightPush bool enableLightPush bool
connStatusC chan<- ConnStatus connNotifCh chan<- PeerConnection
connNotifCh chan<- PeerConnection topicHealthNotifCh chan<- peermanager.TopicHealthStatus
storeFactory storeFactory storeFactory storeFactory
} }
@ -489,16 +490,6 @@ func WithKeepAlive(t time.Duration) WakuNodeOption {
} }
} }
// WithConnectionStatusChannel is a WakuNodeOption that registers the channel
// on which connection status changes will be pushed. Consumers can read from
// it to learn when peer connections and disconnections occur.
func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption {
	apply := func(params *WakuNodeParameters) error {
		params.connStatusC = connStatus
		return nil
	}
	return apply
}
func WithConnectionNotification(ch chan<- PeerConnection) WakuNodeOption { func WithConnectionNotification(ch chan<- PeerConnection) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.connNotifCh = ch params.connNotifCh = ch
@ -566,6 +557,13 @@ func WithCircuitRelayParams(minInterval time.Duration, bootDelay time.Duration)
} }
} }
// WithTopicHealthStatusChannel is a WakuNodeOption that stores ch as the
// topic health notification channel in the node parameters. The option
// itself never fails.
func WithTopicHealthStatusChannel(ch chan<- peermanager.TopicHealthStatus) WakuNodeOption {
	apply := func(params *WakuNodeParameters) error {
		params.topicHealthNotifCh = ch
		return nil
	}
	return apply
}
// Default options used in the libp2p node // Default options used in the libp2p node
var DefaultLibP2POptions = []libp2p.Option{ var DefaultLibP2POptions = []libp2p.Option{
libp2p.ChainOptions( libp2p.ChainOptions(

View File

@ -1,14 +1,16 @@
package node package node
import ( import (
"github.com/ethereum/go-ethereum/common"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
"net" "net"
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -28,7 +30,7 @@ func handleSpam(msg *pb.WakuMessage, topic string) error {
} }
func TestWakuOptions(t *testing.T) { func TestWakuOptions(t *testing.T) {
connStatusChan := make(chan ConnStatus, 100) topicHealthStatusChan := make(chan peermanager.TopicHealthStatus, 100)
key, err := tests.RandomHex(32) key, err := tests.RandomHex(32)
require.NoError(t, err) require.NoError(t, err)
@ -58,7 +60,7 @@ func TestWakuOptions(t *testing.T) {
WithMessageProvider(&persistence.DBStore{}), WithMessageProvider(&persistence.DBStore{}),
WithLightPush(), WithLightPush(),
WithKeepAlive(time.Hour), WithKeepAlive(time.Hour),
WithConnectionStatusChannel(connStatusChan), WithTopicHealthStatusChannel(topicHealthStatusChan),
WithWakuStoreFactory(storeFactory), WithWakuStoreFactory(storeFactory),
} }
@ -70,11 +72,11 @@ func TestWakuOptions(t *testing.T) {
require.NotNil(t, params.multiAddr) require.NotNil(t, params.multiAddr)
require.NotNil(t, params.privKey) require.NotNil(t, params.privKey)
require.NotNil(t, params.connStatusC) require.NotNil(t, params.topicHealthNotifCh)
} }
func TestWakuRLNOptions(t *testing.T) { func TestWakuRLNOptions(t *testing.T) {
connStatusChan := make(chan ConnStatus, 100) topicHealthStatusChan := make(chan peermanager.TopicHealthStatus, 100)
key, err := tests.RandomHex(32) key, err := tests.RandomHex(32)
require.NoError(t, err) require.NoError(t, err)
@ -108,7 +110,7 @@ func TestWakuRLNOptions(t *testing.T) {
WithMessageProvider(&persistence.DBStore{}), WithMessageProvider(&persistence.DBStore{}),
WithLightPush(), WithLightPush(),
WithKeepAlive(time.Hour), WithKeepAlive(time.Hour),
WithConnectionStatusChannel(connStatusChan), WithTopicHealthStatusChannel(topicHealthStatusChan),
WithWakuStoreFactory(storeFactory), WithWakuStoreFactory(storeFactory),
WithStaticRLNRelay(&index, handleSpam), WithStaticRLNRelay(&index, handleSpam),
} }
@ -149,7 +151,7 @@ func TestWakuRLNOptions(t *testing.T) {
WithMessageProvider(&persistence.DBStore{}), WithMessageProvider(&persistence.DBStore{}),
WithLightPush(), WithLightPush(),
WithKeepAlive(time.Hour), WithKeepAlive(time.Hour),
WithConnectionStatusChannel(connStatusChan), WithTopicHealthStatusChannel(topicHealthStatusChan),
WithWakuStoreFactory(storeFactory), WithWakuStoreFactory(storeFactory),
WithDynamicRLNRelay(keystorePath, keystorePassword, rlnTreePath, common.HexToAddress(contractAddress), &index, handleSpam, ethClientAddress), WithDynamicRLNRelay(keystorePath, keystorePassword, rlnTreePath, common.HexToAddress(contractAddress), &index, handleSpam, ethClientAddress),
} }

View File

@ -127,7 +127,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
triggerImmediateConnection := false triggerImmediateConnection := false
//Not connecting to peer as soon as it is discovered, //Not connecting to peer as soon as it is discovered,
// rather expecting this to be pushed from PeerManager based on the need. // rather expecting this to be pushed from PeerManager based on the need.
if len(c.host.Network().Peers()) < waku_proto.GossipSubOptimalFullMeshSize { if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin {
triggerImmediateConnection = true triggerImmediateConnection = true
} }
c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID)) c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID))

View File

@ -26,9 +26,36 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// TopicHealth classifies how well connected the node is on a pubsub topic.
type TopicHealth int

// Health levels, ordered from worst to best. They are declared with the
// TopicHealth type so that they carry its String method and cannot be mixed
// up with plain integers by accident. The numeric values are 0, 1 and 2.
const (
	// UnHealthy means the topic has no healthy peer.
	UnHealthy TopicHealth = iota
	// MinimallyHealthy means the topic has at least one healthy peer but
	// not enough of them to be considered sufficiently healthy.
	MinimallyHealthy
	// SufficientlyHealthy means the topic has enough healthy peers.
	SufficientlyHealthy
)

// String returns the name of the health level, or an empty string for a
// value that is not one of the declared levels.
func (t TopicHealth) String() string {
	switch t {
	case UnHealthy:
		return "UnHealthy"
	case MinimallyHealthy:
		return "MinimallyHealthy"
	case SufficientlyHealthy:
		return "SufficientlyHealthy"
	default:
		return ""
	}
}

// TopicHealthStatus is the notification emitted when the health of a pubsub
// topic changes.
type TopicHealthStatus struct {
	// Topic is the pubsub topic the notification refers to.
	Topic string
	// Health is the new health level of that topic.
	Health TopicHealth
}
// NodeTopicDetails stores pubSubTopic related data like topicHandle for the node. // NodeTopicDetails stores pubSubTopic related data like topicHandle for the node.
type NodeTopicDetails struct { type NodeTopicDetails struct {
topic *pubsub.Topic topic *pubsub.Topic
healthStatus TopicHealth
} }
// WakuProtoInfo holds protocol specific info // WakuProtoInfo holds protocol specific info
@ -54,6 +81,7 @@ type PeerManager struct {
subRelayTopics map[string]*NodeTopicDetails subRelayTopics map[string]*NodeTopicDetails
discoveryService *discv5.DiscoveryV5 discoveryService *discv5.DiscoveryV5
wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo
TopicHealthNotifCh chan<- TopicHealthStatus
} }
// PeerSelection provides various options based on which Peer is selected from a list of peers. // PeerSelection provides various options based on which Peer is selected from a list of peers.
@ -87,6 +115,57 @@ func inAndOutRelayPeers(relayPeers int) (int, int) {
return relayPeers - outRelayPeers, outRelayPeers return relayPeers - outRelayPeers, outRelayPeers
} }
// checkAndUpdateTopicHealth recomputes the health of the given topic, stores
// it on the topic and, when it differs from the previously stored value,
// publishes a TopicHealthStatus on TopicHealthNotifCh.
//
// A peer counts as healthy when it is connected and its stored score is not
// below relay.PeerPublishThreshold. Peers whose score cannot be read are
// counted as healthy for now.
//
// It returns the number of healthy peers. A nil topic yields 0 and no
// notification.
//
// NOTE(review): handlerPeerTopicEvent calls this while holding only
// topicMutex.RLock, yet this function writes topic.healthStatus and performs
// a channel send that blocks until the receiver takes it — confirm whether a
// write lock and/or a non-blocking send is wanted.
func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
	if topic == nil {
		// Callers index subRelayTopics without checking the lookup result,
		// so an unknown topic arrives here as nil.
		return 0
	}

	healthyPeerCount := 0
	for _, p := range topic.topic.ListPeers() {
		if pm.host.Network().Connectedness(p) != network.Connected {
			continue
		}
		score, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p)
		if err != nil {
			pm.logger.Warn("failed to fetch peer score ", zap.Error(err), logging.HostID("peer", p))
			//For now considering peer as healthy if we can't fetch score.
			healthyPeerCount++
			continue
		}
		if score < relay.PeerPublishThreshold {
			pm.logger.Debug("peer score below publish threshold", logging.HostID("peer", p), zap.Float64("score", score))
			continue
		}
		healthyPeerCount++
	}

	//Update topic's health
	oldHealth := topic.healthStatus
	switch {
	case healthyPeerCount < 1: //Ideally this check should be done with minPeersForRelay, but leaving it as is for now.
		topic.healthStatus = UnHealthy
	case healthyPeerCount < waku_proto.GossipSubDMin:
		topic.healthStatus = MinimallyHealthy
	default:
		topic.healthStatus = SufficientlyHealthy
	}

	if oldHealth != topic.healthStatus {
		//Check old health, and if there is a change notify of the same.
		pm.logger.Debug("topic health has changed", zap.String("pubsubtopic", topic.topic.String()), zap.Stringer("health", topic.healthStatus))
		// The channel is optional; a send on a nil channel would block forever.
		if pm.TopicHealthNotifCh != nil {
			pm.TopicHealthNotifCh <- TopicHealthStatus{Topic: topic.topic.String(), Health: topic.healthStatus}
		}
	}
	return healthyPeerCount
}
// TopicHealth returns the last recorded health of the given pubsubTopic.
// It returns UnHealthy together with an error when the node has no entry for
// that topic among its subscribed relay topics.
func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) {
	pm.topicMutex.RLock()
	defer pm.topicMutex.RUnlock()

	if details, found := pm.subRelayTopics[pubsubTopic]; found {
		return details.healthStatus, nil
	}
	return UnHealthy, errors.New("topic not found")
}
// NewPeerManager creates a new peerManager instance. // NewPeerManager creates a new peerManager instance.
func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerManager { func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerManager {
@ -212,16 +291,12 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
// @cammellos reported that ListPeers returned an invalid number of // @cammellos reported that ListPeers returned an invalid number of
// peers. This will ensure that the peers returned by this function // peers. This will ensure that the peers returned by this function
// match those peers that are currently connected // match those peers that are currently connected
curPeerLen := 0
for _, p := range topicInst.topic.ListPeers() { curPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
if pm.host.Network().Connectedness(p) == network.Connected { if curPeerLen < waku_proto.GossipSubDMin {
curPeerLen++ pm.logger.Debug("subscribed topic is not sufficiently healthy, initiating more connections to maintain health",
}
}
if curPeerLen < waku_proto.GossipSubOptimalFullMeshSize {
pm.logger.Debug("subscribed topic is unhealthy, initiating more connections to maintain health",
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
zap.Int("optimumPeers", waku_proto.GossipSubOptimalFullMeshSize)) zap.Int("optimumPeers", waku_proto.GossipSubDMin))
//Find not connected peers. //Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers(topicStr) notConnectedPeers := pm.getNotConnectedPers(topicStr)
if notConnectedPeers.Len() == 0 { if notConnectedPeers.Len() == 0 {
@ -231,7 +306,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
} }
pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr)) pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
//Connect to eligible peers. //Connect to eligible peers.
numPeersToConnect := waku_proto.GossipSubOptimalFullMeshSize - curPeerLen numPeersToConnect := waku_proto.GossipSubDMin - curPeerLen
if numPeersToConnect > notConnectedPeers.Len() { if numPeersToConnect > notConnectedPeers.Len() {
numPeersToConnect = notConnectedPeers.Len() numPeersToConnect = notConnectedPeers.Len()

View File

@ -30,7 +30,7 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic
//Nothing to be done, as we are already subscribed to this topic. //Nothing to be done, as we are already subscribed to this topic.
return return
} }
pm.subRelayTopics[pubsubTopic] = &NodeTopicDetails{topicInst} pm.subRelayTopics[pubsubTopic] = &NodeTopicDetails{topicInst, UnHealthy}
//Check how many relay peers we are connected to that subscribe to this topic, if less than D find peers in peerstore and connect. //Check how many relay peers we are connected to that subscribe to this topic, if less than D find peers in peerstore and connect.
//If no peers in peerStore, trigger discovery for this topic? //If no peers in peerStore, trigger discovery for this topic?
relevantPeersForPubSubTopic := pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic) relevantPeersForPubSubTopic := pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic)
@ -44,7 +44,9 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic
} }
} }
if connectedPeers >= waku_proto.GossipSubOptimalFullMeshSize { //TODO: Use a config rather than hard-coding. pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubsubTopic])
if connectedPeers >= waku_proto.GossipSubDMin { //TODO: Use a config rather than hard-coding.
// Should we use optimal number or define some sort of a config for the node to choose from? // Should we use optimal number or define some sort of a config for the node to choose from?
// A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has // A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has
// or bandwidth it can support. // or bandwidth it can support.
@ -58,10 +60,10 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic
numPeersToConnect := notConnectedPeers.Len() - connectedPeers numPeersToConnect := notConnectedPeers.Len() - connectedPeers
if numPeersToConnect < 0 { if numPeersToConnect < 0 {
numPeersToConnect = notConnectedPeers.Len() numPeersToConnect = notConnectedPeers.Len()
} else if numPeersToConnect-connectedPeers > waku_proto.GossipSubOptimalFullMeshSize { } else if numPeersToConnect-connectedPeers > waku_proto.GossipSubDMin {
numPeersToConnect = waku_proto.GossipSubOptimalFullMeshSize - connectedPeers numPeersToConnect = waku_proto.GossipSubDMin - connectedPeers
} }
if numPeersToConnect+connectedPeers < waku_proto.GossipSubOptimalFullMeshSize { if numPeersToConnect+connectedPeers < waku_proto.GossipSubDMin {
triggerDiscovery = true triggerDiscovery = true
} }
//For now all peers are being given same priority, //For now all peers are being given same priority,
@ -123,12 +125,19 @@ func (pm *PeerManager) handlerPeerTopicEvent(peerEvt relay.EvtPeerTopic) {
pm.logger.Error("failed to add pubSubTopic for peer", pm.logger.Error("failed to add pubSubTopic for peer",
logging.HostID("peerID", peerID), zap.String("topic", peerEvt.PubsubTopic), zap.Error(err)) logging.HostID("peerID", peerID), zap.String("topic", peerEvt.PubsubTopic), zap.Error(err))
} }
pm.topicMutex.RLock()
defer pm.topicMutex.RUnlock()
pm.checkAndUpdateTopicHealth(pm.subRelayTopics[peerEvt.PubsubTopic])
} else if peerEvt.State == relay.PEER_LEFT { } else if peerEvt.State == relay.PEER_LEFT {
err := wps.RemovePubSubTopic(peerID, peerEvt.PubsubTopic) err := wps.RemovePubSubTopic(peerID, peerEvt.PubsubTopic)
if err != nil { if err != nil {
pm.logger.Error("failed to remove pubSubTopic for peer", pm.logger.Error("failed to remove pubSubTopic for peer",
logging.HostID("peerID", peerID), zap.Error(err)) logging.HostID("peerID", peerID), zap.Error(err))
} }
pm.topicMutex.RLock()
defer pm.topicMutex.RUnlock()
pm.checkAndUpdateTopicHealth(pm.subRelayTopics[peerEvt.PubsubTopic])
} else { } else {
pm.logger.Error("unknown peer event received", zap.Int("eventState", int(peerEvt.State))) pm.logger.Error("unknown peer event received", zap.Int("eventState", int(peerEvt.State)))
} }

View File

@ -28,6 +28,7 @@ const peerOrigin = "origin"
const peerENR = "enr" const peerENR = "enr"
const peerDirection = "direction" const peerDirection = "direction"
const peerPubSubTopics = "pubSubTopics" const peerPubSubTopics = "pubSubTopics"
const peerScore = "score"
// ConnectionFailures contains connection failure information towards all peers // ConnectionFailures contains connection failure information towards all peers
type ConnectionFailures struct { type ConnectionFailures struct {
@ -61,6 +62,9 @@ type WakuPeerstore interface {
SetPubSubTopics(p peer.ID, topics []string) error SetPubSubTopics(p peer.ID, topics []string) error
PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopics(pubSubTopics []string, specificPeers ...peer.ID) peer.IDSlice
PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice
SetScore(peer.ID, float64) error
Score(peer.ID) (float64, error)
} }
// NewWakuPeerstore creates a new WakuPeerStore object // NewWakuPeerstore creates a new WakuPeerStore object
@ -88,6 +92,21 @@ func (ps *WakuPeerstoreImpl) Origin(p peer.ID) (Origin, error) {
return result.(Origin), nil return result.(Origin), nil
} }
// SetScore records the given score for peer p in the underlying peerstore
// under the peerScore key, returning whatever error the store reports.
func (ps *WakuPeerstoreImpl) SetScore(p peer.ID, score float64) error {
	err := ps.peerStore.Put(p, peerScore, score)
	return err
}
// Score returns the score stored for peer p under the peerScore key.
// When the lookup fails it returns -1 together with the lookup error.
func (ps *WakuPeerstoreImpl) Score(p peer.ID) (float64, error) {
	stored, err := ps.peerStore.Get(p, peerScore)
	if err == nil {
		// The value is asserted to be a float64, the type SetScore stores.
		return stored.(float64), nil
	}
	return -1, err
}
// PeersByOrigin returns the list of peers for a specific origin // PeersByOrigin returns the list of peers for a specific origin
func (ps *WakuPeerstoreImpl) PeersByOrigin(expectedOrigin Origin) peer.IDSlice { func (ps *WakuPeerstoreImpl) PeersByOrigin(expectedOrigin Origin) peer.IDSlice {
var result peer.IDSlice var result peer.IDSlice

View File

@ -40,6 +40,8 @@ func msgIDFn(pmsg *pubsub_pb.Message) string {
return string(hash.SHA256(pmsg.Data)) return string(hash.SHA256(pmsg.Data))
} }
// PeerPublishThreshold is the value used as PublishThreshold in the default
// peer score thresholds: no self-published messages are sent to peers whose
// score is below it.
const PeerPublishThreshold = -1000
func (w *WakuRelay) setDefaultPeerScoreParams() { func (w *WakuRelay) setDefaultPeerScoreParams() {
w.peerScoreParams = &pubsub.PeerScoreParams{ w.peerScoreParams = &pubsub.PeerScoreParams{
Topics: make(map[string]*pubsub.TopicScoreParams), Topics: make(map[string]*pubsub.TopicScoreParams),
@ -59,10 +61,10 @@ func (w *WakuRelay) setDefaultPeerScoreParams() {
BehaviourPenaltyDecay: 0.986, BehaviourPenaltyDecay: 0.986,
} }
w.peerScoreThresholds = &pubsub.PeerScoreThresholds{ w.peerScoreThresholds = &pubsub.PeerScoreThresholds{
GossipThreshold: -100, // no gossip is sent to peers below this score GossipThreshold: -100, // no gossip is sent to peers below this score
PublishThreshold: -1000, // no self-published msgs are sent to peers below this score PublishThreshold: PeerPublishThreshold, // no self-published msgs are sent to peers below this score
GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score GraylistThreshold: -10000, // used to trigger disconnections + ignore peer if below this score
OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset. OpportunisticGraftThreshold: 0, // grafts better peers if the mesh median score drops below this. unset.
} }
} }
@ -72,11 +74,11 @@ func (w *WakuRelay) defaultPubsubOptions() []pubsub.Option {
cfg.PruneBackoff = time.Minute cfg.PruneBackoff = time.Minute
cfg.UnsubscribeBackoff = 5 * time.Second cfg.UnsubscribeBackoff = 5 * time.Second
cfg.GossipFactor = 0.25 cfg.GossipFactor = 0.25
cfg.D = waku_proto.GossipSubOptimalFullMeshSize cfg.D = waku_proto.GossipSubDMin
cfg.Dlo = 4 cfg.Dlo = 4
cfg.Dhi = 8 cfg.Dhi = 8
cfg.Dout = 3 cfg.Dout = 3
cfg.Dlazy = waku_proto.GossipSubOptimalFullMeshSize cfg.Dlazy = waku_proto.GossipSubDMin
cfg.HeartbeatInterval = time.Second cfg.HeartbeatInterval = time.Second
cfg.HistoryLength = 6 cfg.HistoryLength = 6
cfg.HistoryGossip = 3 cfg.HistoryGossip = 3

View File

@ -16,6 +16,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/service"
@ -114,6 +115,7 @@ func (w *WakuRelay) peerScoreInspector(peerScoresSnapshots map[peer.ID]*pubsub.P
w.log.Error("could not disconnect peer", logging.HostID("peer", pid), zap.Error(err)) w.log.Error("could not disconnect peer", logging.HostID("peer", pid), zap.Error(err))
} }
} }
_ = w.host.Peerstore().(wps.WakuPeerstore).SetScore(pid, snap.Score)
} }
} }

View File

@ -6,7 +6,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/protocol"
) )
const GossipSubOptimalFullMeshSize = 6 const GossipSubDMin = 4
// FulltextMatch is the default matching function used for checking if a peer // FulltextMatch is the default matching function used for checking if a peer
// supports a protocol or not // supports a protocol or not