test: peer connection

This commit is contained in:
Richard Ramos 2024-06-13 17:01:31 -04:00
parent 32da07cad9
commit 8681b27e13
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
5 changed files with 38 additions and 9 deletions

View File

@ -2,6 +2,7 @@ package node
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
@ -55,6 +56,11 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m multiaddr.Multiaddr
// Connected is called when a connection is opened
func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
fmt.Println("===============")
fmt.Println("===============")
fmt.Println("===============")
fmt.Println("===============")
fmt.Println("PEERS: ", len(c.h.Network().Peers()))
c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer()), zap.String("direction", cc.Stat().Direction.String()))
if c.connNotifCh != nil {
select {
@ -76,6 +82,12 @@ func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) {
// Disconnected is called when a connection closed
func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) {
c.log.Info("peer disconnected", logging.HostID("peer", cc.RemotePeer()))
fmt.Println("===============")
fmt.Println("===============")
fmt.Println("===============")
fmt.Println("===============")
fmt.Println("PEERS: ", len(c.h.Network().Peers()))
c.metrics.RecordPeerDisconnected()
c.DisconnectChan <- cc.RemotePeer()
if c.connNotifCh != nil {

View File

@ -2,6 +2,7 @@ package node
import (
"context"
"fmt"
"math/rand"
"net"
"sync"
@ -385,6 +386,23 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
w.metadata.SetHost(host)
go func() {
t := time.NewTicker(3 * time.Second)
for {
select {
case <-t.C:
fmt.Println("===============")
fmt.Println("===============")
fmt.Println("===============")
fmt.Println("===============")
fmt.Println("PEERS: ", len(host.Network().Peers()))
case <-ctx.Done():
return
}
}
}()
err = w.metadata.Start(ctx)
if err != nil {
return err

View File

@ -17,7 +17,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/service"
"go.uber.org/zap"
@ -127,7 +126,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
triggerImmediateConnection := false
//Not connecting to peer as soon as it is discovered,
// rather expecting this to be pushed from PeerManager based on the need.
if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin {
if len(c.host.Network().Peers()) < 300 {
triggerImmediateConnection = true
}
c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID))

View File

@ -176,6 +176,7 @@ func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) {
// NewPeerManager creates a new peerManager instance.
func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, logger *zap.Logger) *PeerManager {
maxConnections = 300
maxRelayPeers, _ := relayAndServicePeers(maxConnections)
inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers)
@ -302,7 +303,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
// match those peers that are currently connected
curPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
if curPeerLen < waku_proto.GossipSubDMin {
if curPeerLen < 300 {
pm.logger.Debug("subscribed topic is not sufficiently healthy, initiating more connections to maintain health",
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
zap.Int("optimumPeers", waku_proto.GossipSubDMin))
@ -315,7 +316,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
}
pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
//Connect to eligible peers.
numPeersToConnect := waku_proto.GossipSubDMin - curPeerLen
numPeersToConnect := 300 - curPeerLen
if numPeersToConnect > notConnectedPeers.Len() {
numPeersToConnect = notConnectedPeers.Len()

View File

@ -10,7 +10,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"golang.org/x/exp/maps"
@ -48,7 +47,7 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic
pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubsubTopic])
if connectedPeers >= waku_proto.GossipSubDMin { //TODO: Use a config rather than hard-coding.
if connectedPeers >= 300 { //TODO: Use a config rather than hard-coding.
// Should we use optimal number or define some sort of a config for the node to choose from?
// A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has
// or bandwidth it can support.
@ -62,10 +61,10 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic
numPeersToConnect := notConnectedPeers.Len() - connectedPeers
if numPeersToConnect < 0 {
numPeersToConnect = notConnectedPeers.Len()
} else if numPeersToConnect-connectedPeers > waku_proto.GossipSubDMin {
numPeersToConnect = waku_proto.GossipSubDMin - connectedPeers
} else if numPeersToConnect-connectedPeers > 300 {
numPeersToConnect = 300 - connectedPeers
}
if numPeersToConnect+connectedPeers < waku_proto.GossipSubDMin {
if numPeersToConnect+connectedPeers < 300 {
triggerDiscovery = true
}
//For now all peers are being given same priority,