From 8681b27e13278a0831fa1a4b9762b66668f8ada8 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 13 Jun 2024 17:01:31 -0400 Subject: [PATCH] test: peer connection --- waku/v2/node/connectedness.go | 12 ++++++++++++ waku/v2/node/wakunode2.go | 18 ++++++++++++++++++ waku/v2/peermanager/peer_connector.go | 3 +-- waku/v2/peermanager/peer_manager.go | 5 +++-- waku/v2/peermanager/topic_event_handler.go | 9 ++++----- 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index 5a0f89fe..3e4fb008 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -2,6 +2,7 @@ package node import ( "context" + "fmt" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -55,6 +56,11 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr // Connected is called when a connection is opened func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { + fmt.Println("===============") + fmt.Println("===============") + fmt.Println("===============") + fmt.Println("===============") + fmt.Println("PEERS: ", len(c.h.Network().Peers())) c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()), zap.String("direction", cc.Stat().Direction.String())) if c.connNotifCh != nil { select { @@ -76,6 +82,12 @@ func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { // Disconnected is called when a connection closed func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) { c.log.Info("peer disconnected", logging.HostID("peer", cc.RemotePeer())) + fmt.Println("===============") + fmt.Println("===============") + fmt.Println("===============") + fmt.Println("===============") + fmt.Println("PEERS: ", len(c.h.Network().Peers())) + c.metrics.RecordPeerDisconnected() c.DisconnectChan <- cc.RemotePeer() if c.connNotifCh != nil { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 5d68374b..6837eafe 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -2,6 +2,7 @@ package node import ( "context" + "fmt" "math/rand" "net" "sync" @@ -385,6 +386,23 @@ func (w *WakuNode) Start(ctx context.Context) error { } w.metadata.SetHost(host) + + go func() { + t := time.NewTicker(3 * time.Second) + for { + select { + case <-t.C: + fmt.Println("===============") + fmt.Println("===============") + fmt.Println("===============") + fmt.Println("===============") + fmt.Println("PEERS: ", len(host.Network().Peers())) + case <-ctx.Done(): + return + } + } + }() + err = w.metadata.Start(ctx) if err != nil { return err diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 6f40a171..eb6f4045 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -17,7 +17,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" - waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" @@ -127,7 +126,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) { triggerImmediateConnection := false //Not connecting to peer as soon as it is discovered, // rather expecting this to be pushed from PeerManager based on the need. - if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin { + if len(c.host.Network().Peers()) < 300 { triggerImmediateConnection = true } c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID)) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index d9f60b0f..9c637686 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -176,6 +176,7 @@ func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) { // NewPeerManager creates a new peerManager instance. func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, logger *zap.Logger) *PeerManager { + maxConnections = 300 maxRelayPeers, _ := relayAndServicePeers(maxConnections) inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers) @@ -302,7 +303,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { // match those peers that are currently connected curPeerLen := pm.checkAndUpdateTopicHealth(topicInst) - if curPeerLen < waku_proto.GossipSubDMin { + if curPeerLen < 300 { pm.logger.Debug("subscribed topic is not sufficiently healthy, initiating more connections to maintain health", zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), zap.Int("optimumPeers", waku_proto.GossipSubDMin)) @@ -315,7 +316,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { } pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr)) //Connect to eligible peers. - numPeersToConnect := waku_proto.GossipSubDMin - curPeerLen + numPeersToConnect := 300 - curPeerLen if numPeersToConnect > notConnectedPeers.Len() { numPeersToConnect = notConnectedPeers.Len() diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go index 3060bf7b..edbf37e5 100644 --- a/waku/v2/peermanager/topic_event_handler.go +++ b/waku/v2/peermanager/topic_event_handler.go @@ -10,7 +10,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" - waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" "golang.org/x/exp/maps" @@ -48,7 +47,7 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubsubTopic]) - if connectedPeers >= waku_proto.GossipSubDMin { //TODO: Use a config rather than hard-coding. + if connectedPeers >= 300 { //TODO: Use a config rather than hard-coding. // Should we use optimal number or define some sort of a config for the node to choose from? // A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has // or bandwidth it can support. @@ -62,10 +61,10 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic numPeersToConnect := notConnectedPeers.Len() - connectedPeers if numPeersToConnect < 0 { numPeersToConnect = notConnectedPeers.Len() - } else if numPeersToConnect-connectedPeers > waku_proto.GossipSubDMin { - numPeersToConnect = waku_proto.GossipSubDMin - connectedPeers + } else if numPeersToConnect-connectedPeers > 300 { + numPeersToConnect = 300 - connectedPeers } - if numPeersToConnect+connectedPeers < waku_proto.GossipSubDMin { + if numPeersToConnect+connectedPeers < 300 { triggerDiscovery = true } //For now all peers are being given same priority,