feat: modify peer-manager to consider relay target peers (#1135)

This commit is contained in:
Prem Chaitanya Prathi 2024-06-26 06:18:44 +05:30 committed by GitHub
parent ee33baa283
commit 19a47a1ac1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 120 additions and 86 deletions

View File

@ -8,6 +8,7 @@ package logging
import (
"encoding/hex"
"fmt"
"net"
"time"
@ -147,3 +148,8 @@ func TCPAddr(key string, ip net.IP, port int) zap.Field {
func UDPAddr(key string, ip net.IP, port int) zap.Field {
return zap.Stringer(key, &net.UDPAddr{IP: ip, Port: port})
}
func Uint64(key string, value uint64) zap.Field {
valueStr := fmt.Sprintf("%v", value)
return zap.String(key, valueStr)
}

View File

@ -256,7 +256,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.metadata = metadata
//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, w.log)
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, params.enableRelay, w.log)
w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, w.opts.onlineChecker, discoveryConnectTimeout, w.log)
if err != nil {
@ -554,9 +554,9 @@ func (w *WakuNode) watchENRChanges(ctx context.Context) {
currNodeVal := w.localNode.Node().String()
if prevNodeVal != currNodeVal {
if prevNodeVal == "" {
w.log.Info("enr record", logging.ENode("enr", w.localNode.Node()))
w.log.Info("local node enr record", logging.ENode("enr", w.localNode.Node()))
} else {
w.log.Info("new enr record", logging.ENode("enr", w.localNode.Node()))
w.log.Info("local node new enr record", logging.ENode("enr", w.localNode.Node()))
}
prevNodeVal = currNodeVal
}

View File

@ -45,6 +45,8 @@ const UserAgent string = "go-waku"
const defaultMinRelayPeersToPublish = 0
const DefaultMaxConnectionsPerIP = 5
const DefaultMaxConnections = 300
const DefaultMaxPeerStoreCapacity = 300
type WakuNodeParameters struct {
hostAddr *net.TCPAddr
@ -127,9 +129,10 @@ type WakuNodeOption func(*WakuNodeParameters) error
// Default options used in the libp2p node
var DefaultWakuNodeOptions = []WakuNodeOption{
WithPrometheusRegisterer(prometheus.NewRegistry()),
WithMaxPeerConnections(50),
WithMaxPeerConnections(DefaultMaxConnections),
WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP),
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
}

View File

@ -18,7 +18,6 @@ import (
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/service"
"go.uber.org/zap"
@ -134,10 +133,9 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
triggerImmediateConnection := false
//Not connecting to peer as soon as it is discovered,
// rather expecting this to be pushed from PeerManager based on the need.
if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin {
if len(c.host.Network().Peers()) < c.pm.OutPeersTarget {
triggerImmediateConnection = true
}
c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID))
c.pm.AddDiscoveredPeer(p, triggerImmediateConnection)
case <-time.After(1 * time.Second):
@ -238,7 +236,7 @@ func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) {
func (c *PeerConnectionStrategy) dialPeers() {
defer c.WaitGroup().Done()
maxGoRoutines := c.pm.OutRelayPeersTarget
maxGoRoutines := c.pm.OutPeersTarget
if maxGoRoutines > maxActiveDials {
maxGoRoutines = maxActiveDials
}

View File

@ -55,7 +55,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,
wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol]
if !ok {
pm.logger.Error("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol)))
pm.logger.Warn("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol)))
return nil, errors.New("cannot do on demand discovery for non-waku protocol")
}
iterator, err := pm.discoveryService.PeerIterator(

View File

@ -73,8 +73,8 @@ type PeerManager struct {
maxPeers int
maxRelayPeers int
logger *zap.Logger
InRelayPeersTarget int
OutRelayPeersTarget int
InPeersTarget int
OutPeersTarget int
host host.Host
serviceSlots *ServiceSlots
ctx context.Context
@ -85,6 +85,7 @@ type PeerManager struct {
wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo
TopicHealthNotifCh chan<- TopicHealthStatus
rttCache *FastestPeerSelector
RelayEnabled bool
}
// PeerSelection provides various options based on which Peer is selected from a list of peers.
@ -127,7 +128,7 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
pThreshold, err := pm.host.Peerstore().(wps.WakuPeerstore).Score(p)
if err == nil {
if pThreshold < relay.PeerPublishThreshold {
pm.logger.Debug("peer score below publish threshold", logging.HostID("peer", p), zap.Float64("score", pThreshold))
pm.logger.Debug("peer score below publish threshold", zap.Stringer("peer", p), zap.Float64("score", pThreshold))
} else {
healthyPeerCount++
}
@ -135,14 +136,15 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
if errors.Is(err, peerstore.ErrNotFound) {
// For now considering peer as healthy if we can't fetch score.
healthyPeerCount++
pm.logger.Debug("peer score is not available yet", logging.HostID("peer", p))
pm.logger.Debug("peer score is not available yet", zap.Stringer("peer", p))
} else {
pm.logger.Warn("failed to fetch peer score ", zap.Error(err), logging.HostID("peer", p))
pm.logger.Warn("failed to fetch peer score ", zap.Error(err), zap.Stringer("peer", p))
}
}
}
}
//Update topic's health
//TODO: This should be done based on number of full-mesh peers.
oldHealth := topic.healthStatus
if healthyPeerCount < 1 { //Ideally this check should be done with minPeersForRelay, but leaving it as is for now.
topic.healthStatus = UnHealthy
@ -174,31 +176,38 @@ func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) {
}
// NewPeerManager creates a new peerManager instance.
func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, logger *zap.Logger) *PeerManager {
func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, relayEnabled bool, logger *zap.Logger) *PeerManager {
var inPeersTarget, outPeersTarget, maxRelayPeers int
if relayEnabled {
maxRelayPeers, _ := relayAndServicePeers(maxConnections)
inPeersTarget, outPeersTarget = inAndOutRelayPeers(maxRelayPeers)
maxRelayPeers, _ := relayAndServicePeers(maxConnections)
inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers)
if maxPeers == 0 || maxConnections > maxPeers {
maxPeers = maxConnsToPeerRatio * maxConnections
if maxPeers == 0 || maxConnections > maxPeers {
maxPeers = maxConnsToPeerRatio * maxConnections
}
} else {
maxRelayPeers = 0
inPeersTarget = 0
//TODO: ideally this should be 2 filter peers per topic, 2 lightpush peers per topic and 2-4 store nodes per topic
outPeersTarget = 10
}
pm := &PeerManager{
logger: logger.Named("peer-manager"),
metadata: metadata,
maxRelayPeers: maxRelayPeers,
InRelayPeersTarget: inRelayPeersTarget,
OutRelayPeersTarget: outRelayPeersTarget,
InPeersTarget: inPeersTarget,
OutPeersTarget: outPeersTarget,
serviceSlots: NewServiceSlot(),
subRelayTopics: make(map[string]*NodeTopicDetails),
maxPeers: maxPeers,
wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{},
rttCache: NewFastestPeerSelector(logger),
RelayEnabled: relayEnabled,
}
logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
zap.Int("maxRelayPeers", maxRelayPeers),
zap.Int("outRelayPeersTarget", outRelayPeersTarget),
zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget),
zap.Int("outPeersTarget", outPeersTarget),
zap.Int("inPeersTarget", pm.InPeersTarget),
zap.Int("maxPeers", maxPeers))
return pm
@ -225,7 +234,7 @@ func (pm *PeerManager) Start(ctx context.Context) {
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)
pm.ctx = ctx
if pm.sub != nil {
if pm.sub != nil && pm.RelayEnabled {
go pm.peerEventLoop(ctx)
}
go pm.connectivityLoop(ctx)
@ -233,7 +242,7 @@ func (pm *PeerManager) Start(ctx context.Context) {
// This is a connectivity loop, which currently checks and prunes inbound connections.
func (pm *PeerManager) connectivityLoop(ctx context.Context) {
pm.connectToRelayPeers()
pm.connectToPeers()
t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
defer t.Stop()
for {
@ -241,7 +250,7 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) {
case <-ctx.Done():
return
case <-t.C:
pm.connectToRelayPeers()
pm.connectToPeers()
}
}
}
@ -262,7 +271,7 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers
}
} else {
pm.logger.Error("failed to retrieve peer direction",
logging.HostID("peerID", p), zap.Error(err))
zap.Stringer("peerID", p), zap.Error(err))
}
}
return inPeers, outPeers, nil
@ -302,10 +311,10 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
// match those peers that are currently connected
curPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
if curPeerLen < waku_proto.GossipSubDMin {
pm.logger.Debug("subscribed topic is not sufficiently healthy, initiating more connections to maintain health",
if curPeerLen < pm.OutPeersTarget {
pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh",
zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
zap.Int("optimumPeers", waku_proto.GossipSubDMin))
zap.Int("targetPeers", pm.OutPeersTarget))
//Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers(topicStr)
if notConnectedPeers.Len() == 0 {
@ -315,35 +324,42 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
}
pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
//Connect to eligible peers.
numPeersToConnect := waku_proto.GossipSubDMin - curPeerLen
numPeersToConnect := pm.OutPeersTarget - curPeerLen
if numPeersToConnect > notConnectedPeers.Len() {
numPeersToConnect = notConnectedPeers.Len()
}
pm.connectToPeers(notConnectedPeers[0:numPeersToConnect])
pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
}
}
}
// connectToRelayPeers ensures minimum D connections are there for each pubSubTopic.
// connectToPeers ensures minimum D connections are there for each pubSubTopic.
// If not, initiates connections to additional peers.
// It also checks for incoming relay connections and prunes once they cross inRelayTarget
func (pm *PeerManager) connectToRelayPeers() {
//Check for out peer connections and connect to more peers.
pm.ensureMinRelayConnsPerTopic()
func (pm *PeerManager) connectToPeers() {
if pm.RelayEnabled {
//Check for out peer connections and connect to more peers.
pm.ensureMinRelayConnsPerTopic()
inRelayPeers, outRelayPeers := pm.getRelayPeers()
pm.logger.Debug("number of relay peers connected",
zap.Int("in", inRelayPeers.Len()),
zap.Int("out", outRelayPeers.Len()))
if inRelayPeers.Len() > 0 &&
inRelayPeers.Len() > pm.InRelayPeersTarget {
pm.pruneInRelayConns(inRelayPeers)
inRelayPeers, outRelayPeers := pm.getRelayPeers()
pm.logger.Debug("number of relay peers connected",
zap.Int("in", inRelayPeers.Len()),
zap.Int("out", outRelayPeers.Len()))
if inRelayPeers.Len() > 0 &&
inRelayPeers.Len() > pm.InPeersTarget {
pm.pruneInRelayConns(inRelayPeers)
}
} else {
//TODO: Connect to filter peers per topic as of now.
//Fetch filter peers from peerStore, TODO: topics for lightNode not available here?
//Filter subscribe to notify peerManager whenever a new topic/shard is subscribed to.
pm.logger.Debug("light mode..not doing anything")
}
}
// connectToPeers connects to peers provided in the list if the addresses have not expired.
func (pm *PeerManager) connectToPeers(peers peer.IDSlice) {
// connectToSpecifiedPeers connects to peers provided in the list if the addresses have not expired.
func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) {
for _, peerID := range peers {
peerData := AddrInfoToPeerData(wps.PeerManager, peerID, pm.host)
if peerData == nil {
@ -377,16 +393,16 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
//TODO: Need to have more intelligent way of doing this, maybe peer scores.
//TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers.
pm.logger.Info("peer connections exceed target relay peers, hence pruning",
zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InRelayPeersTarget))
for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ {
zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InPeersTarget))
for pruningStartIndex := pm.InPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ {
p := inRelayPeers[pruningStartIndex]
err := pm.host.Network().ClosePeer(p)
if err != nil {
pm.logger.Warn("failed to disconnect connection towards peer",
logging.HostID("peerID", p))
zap.Stringer("peerID", p))
}
pm.logger.Debug("successfully disconnected connection towards peer",
logging.HostID("peerID", p))
zap.Stringer("peerID", p))
}
}
@ -394,7 +410,7 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
shards, err := wenr.RelaySharding(p.ENR.Record())
if err != nil {
pm.logger.Error("could not derive relayShards from ENR", zap.Error(err),
logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
} else {
if shards != nil {
p.PubsubTopics = make([]string, 0)
@ -404,7 +420,7 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
p.PubsubTopics = append(p.PubsubTopics, topicStr)
}
} else {
pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID))
pm.logger.Debug("ENR doesn't have relay shards", zap.Stringer("peer", p.AddrInfo.ID))
}
}
supportedProtos := []protocol.ID{}
@ -430,6 +446,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
return
}
//Check if the peer is already present, if so skip adding
_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
if err == nil {
@ -447,16 +464,17 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
}
//Peer is already in peer-store but stored ENR is older than discovered one.
pm.logger.Info("peer already found in peerstore, but re-adding it as ENR sequence is higher than locally stored",
logging.HostID("peer", p.AddrInfo.ID), zap.Uint64("newENRSeq", p.ENR.Seq()), zap.Uint64("storedENRSeq", enr.Record().Seq()))
zap.Stringer("peer", p.AddrInfo.ID), logging.Uint64("newENRSeq", p.ENR.Record().Seq()), logging.Uint64("storedENRSeq", enr.Record().Seq()))
} else {
pm.logger.Info("peer already found in peerstore, but no new ENR", logging.HostID("peer", p.AddrInfo.ID))
pm.logger.Info("peer already found in peerstore, but no new ENR", zap.Stringer("peer", p.AddrInfo.ID))
}
} else {
//Peer is in peer-store but it doesn't have an enr
pm.logger.Info("peer already found in peerstore, but doesn't have an ENR record, re-adding",
logging.HostID("peer", p.AddrInfo.ID))
zap.Stringer("peer", p.AddrInfo.ID))
}
}
pm.logger.Debug("adding discovered peer", zap.Stringer("peerID", p.AddrInfo.ID))
supportedProtos := []protocol.ID{}
if len(p.PubsubTopics) == 0 && p.ENR != nil {
@ -467,14 +485,15 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...)
if p.ENR != nil {
pm.logger.Debug("setting ENR for peer", zap.Stringer("peerID", p.AddrInfo.ID), zap.Stringer("enr", p.ENR))
err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
if err != nil {
pm.logger.Error("could not store enr", zap.Error(err),
logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
zap.Stringer("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
}
}
if connectNow {
pm.logger.Debug("connecting now to discovered peer", logging.HostID("peer", p.AddrInfo.ID))
pm.logger.Debug("connecting now to discovered peer", zap.Stringer("peer", p.AddrInfo.ID))
go pm.peerConnector.PushToChan(p)
}
}
@ -483,10 +502,10 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
// It also sets additional metadata such as origin and supported protocols
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
pm.logger.Error("could not add peer as peer store capacity is reached", logging.HostID("peer", ID), zap.Int("capacity", pm.maxPeers))
pm.logger.Error("could not add peer as peer store capacity is reached", zap.Stringer("peer", ID), zap.Int("capacity", pm.maxPeers))
return errors.New("peer store capacity reached")
}
pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID))
pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID))
if origin == wps.Static {
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)
} else {
@ -496,14 +515,14 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
}
err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin)
if err != nil {
pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID))
pm.logger.Error("could not set origin", zap.Error(err), zap.Stringer("peer", ID))
return err
}
if len(protocols) > 0 {
err = pm.host.Peerstore().AddProtocols(ID, protocols...)
if err != nil {
pm.logger.Error("could not set protocols", zap.Error(err), logging.HostID("peer", ID))
pm.logger.Error("could not set protocols", zap.Error(err), zap.Stringer("peer", ID))
return err
}
}
@ -515,7 +534,7 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
err = pm.host.Peerstore().(wps.WakuPeerstore).SetPubSubTopics(ID, pubSubTopics)
if err != nil {
pm.logger.Error("could not store pubSubTopic", zap.Error(err),
logging.HostID("peer", ID), zap.Strings("topics", pubSubTopics))
zap.Stringer("peer", ID), zap.Strings("topics", pubSubTopics))
}
return nil
}
@ -593,7 +612,7 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
//For now adding the peer to serviceSlot which means the latest added peer would be given priority.
//TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc.
pm.logger.Info("adding peer to service slots", logging.HostID("peer", peerID),
pm.logger.Info("adding peer to service slots", zap.Stringer("peer", peerID),
zap.String("service", string(proto)))
// getPeers returns nil for WakuRelayIDv200 protocol, but we don't run this ServiceSlot code for WakuRelayIDv200 protocol
pm.serviceSlots.getPeers(proto).add(peerID)

View File

@ -31,7 +31,7 @@ func initTest(t *testing.T) (context.Context, *PeerManager, func()) {
require.NoError(t, err)
// host 1 is used by peer manager
pm := NewPeerManager(10, 20, nil, utils.Logger())
pm := NewPeerManager(10, 20, nil, true, utils.Logger())
pm.SetHost(h1)
return ctx, pm, func() {
@ -229,7 +229,7 @@ func TestConnectToRelayPeers(t *testing.T) {
defer deferFn()
pm.connectToRelayPeers()
pm.connectToPeers()
}
@ -253,7 +253,7 @@ func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrF
err = wenr.Update(utils.Logger(), localNode, wenr.WithWakuRelaySharding(rs[0]))
require.NoError(t, err)
pm := NewPeerManager(10, 20, nil, logger)
pm := NewPeerManager(10, 20, nil, true, logger)
pm.SetHost(host)
peerconn, err := NewPeerConnectionStrategy(pm, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, logger)
require.NoError(t, err)

View File

@ -48,7 +48,9 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic
pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubsubTopic])
if connectedPeers >= waku_proto.GossipSubDMin { //TODO: Use a config rather than hard-coding.
//Leaving this logic based on gossipSubDMin as this is a good start for a subscribed topic.
// subsequent connectivity loop iteration would initiate more connections which should take it towards a healthy mesh.
if connectedPeers >= waku_proto.GossipSubDMin {
// Should we use optimal number or define some sort of a config for the node to choose from?
// A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has
// or bandwidth it can support.
@ -70,7 +72,7 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic
}
//For now all peers are being given same priority,
// Later we may want to choose peers that have more shards in common over others.
pm.connectToPeers(notConnectedPeers[0:numPeersToConnect])
pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
} else {
triggerDiscovery = true
}

View File

@ -3,6 +3,9 @@ package peermanager
import (
"context"
"crypto/rand"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
@ -17,8 +20,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"testing"
"time"
)
func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) {
@ -44,7 +45,7 @@ func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host,
func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) {
// Host 1 used by peer manager
pm := NewPeerManager(10, 20, nil, utils.Logger())
pm := NewPeerManager(10, 20, nil, true, utils.Logger())
pm.SetHost(*h)
// Create a new relay event bus
@ -77,7 +78,7 @@ func TestSubscribeToRelayEvtBus(t *testing.T) {
r, h1, _ := makeWakuRelay(t, log)
// Host 1 used by peer manager
pm := NewPeerManager(10, 20, nil, utils.Logger())
pm := NewPeerManager(10, 20, nil, true, utils.Logger())
pm.SetHost(h1)
// Create a new relay event bus

View File

@ -165,7 +165,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
s.Require().NoError(err)
b := relay.NewBroadcaster(10)
s.Require().NoError(b.Start(context.Background()))
pm := peermanager.NewPeerManager(5, 5, nil, s.Log)
pm := peermanager.NewPeerManager(5, 5, nil, true, s.Log)
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
filterPush.SetHost(host)
pm.SetHost(host)

View File

@ -36,7 +36,7 @@ func TestQueryOptions(t *testing.T) {
require.NoError(t, err)
// Let peer manager reside at host
pm := peermanager.NewPeerManager(5, 5, nil, utils.Logger())
pm := peermanager.NewPeerManager(5, 5, nil, true, utils.Logger())
pm.SetHost(host)
// Add host2 to peerstore

View File

@ -237,7 +237,7 @@ func TestWakuLightPushCornerCases(t *testing.T) {
testContentTopic := "/test/10/my-lp-app/proto"
// Prepare peer manager instance to include in test
pm := peermanager.NewPeerManager(10, 10, nil, utils.Logger())
pm := peermanager.NewPeerManager(10, 10, nil, true, utils.Logger())
node1, sub1, host1 := makeWakuRelay(t, testTopic)
defer node1.Stop()

View File

@ -125,27 +125,30 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc
writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
logger.Debug("sending metadata request")
err = writer.WriteMsg(request)
if err != nil {
logger.Error("writing request", zap.Error(err))
if err := stream.Reset(); err != nil {
wakuM.log.Error("resetting connection", zap.Error(err))
logger.Error("resetting connection", zap.Error(err))
}
return nil, err
}
logger.Debug("sent metadata request")
response := &pb.WakuMetadataResponse{}
err = reader.ReadMsg(response)
if err != nil {
logger.Error("reading response", zap.Error(err))
if err := stream.Reset(); err != nil {
wakuM.log.Error("resetting connection", zap.Error(err))
logger.Error("resetting connection", zap.Error(err))
}
return nil, err
}
stream.Close()
logger.Debug("received metadata response")
if response.ClusterId == nil {
return nil, errors.New("node did not provide a waku clusterid")
@ -163,6 +166,7 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc
rShardIDs = append(rShardIDs, uint16(i))
}
}
logger.Debug("getting remote cluster and shards")
rs, err := protocol.NewRelayShards(rClusterID, rShardIDs...)
if err != nil {
@ -176,7 +180,7 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) {
return func(stream network.Stream) {
logger := wakuM.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
request := &pb.WakuMetadataRequest{}
logger.Debug("received metadata request from peer")
writer := pbio.NewDelimitedWriter(stream)
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
@ -184,11 +188,10 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) {
if err != nil {
logger.Error("reading request", zap.Error(err))
if err := stream.Reset(); err != nil {
wakuM.log.Error("resetting connection", zap.Error(err))
logger.Error("resetting connection", zap.Error(err))
}
return
}
response := new(pb.WakuMetadataResponse)
clusterID, shards, err := wakuM.ClusterAndShards()
@ -205,10 +208,11 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) {
if err != nil {
logger.Error("writing response", zap.Error(err))
if err := stream.Reset(); err != nil {
wakuM.log.Error("resetting connection", zap.Error(err))
logger.Error("resetting connection", zap.Error(err))
}
return
}
logger.Debug("sent metadata response to peer")
stream.Close()
}
@ -248,14 +252,15 @@ func (wakuM *WakuMetadata) disconnectPeer(peerID peer.ID, reason error) {
// Connected is called when a connection is opened
func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) {
go func() {
wakuM.log.Debug("peer connected", zap.Stringer("peer", cc.RemotePeer()))
// Metadata verification is done only if a clusterID is specified
if wakuM.clusterID == 0 {
return
}
peerID := cc.RemotePeer()
shard, err := wakuM.Request(wakuM.ctx, peerID)
if err != nil {
wakuM.disconnectPeer(peerID, err)
return

View File

@ -292,7 +292,7 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) {
require.NoError(t, err)
// Prepare peer manager for host3
pm3 := peermanager.NewPeerManager(10, 20, nil, log)
pm3 := peermanager.NewPeerManager(10, 20, nil, true, log)
pm3.SetHost(host3)
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, utils.Logger())
require.NoError(t, err)
@ -367,7 +367,7 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) {
require.NoError(t, err)
// Prepare peer manager for host3
pm3 := peermanager.NewPeerManager(10, 20, nil, log)
pm3 := peermanager.NewPeerManager(10, 20, nil, true, log)
pm3.SetHost(host3)
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, onlinechecker.NewDefaultOnlineChecker(true), 30*time.Second, utils.Logger())
require.NoError(t, err)

View File

@ -43,7 +43,7 @@ func TestStoreClient(t *testing.T) {
err = wakuRelay.Start(context.Background())
require.NoError(t, err)
pm := peermanager.NewPeerManager(5, 5, nil, utils.Logger())
pm := peermanager.NewPeerManager(5, 5, nil, true, utils.Logger())
pm.SetHost(host)
err = pm.SubscribeToRelayEvtBus(wakuRelay.Events())
require.NoError(t, err)