feat: modify peer-manager to consider relay target peers for connecting to peers

Author:  Prem Chaitanya Prathi
Date:    2024-06-24 13:54:06 +05:30
Parent:  8303c592d3
Commit:  e2b87eee7b

13 changed files with 89 additions and 63 deletions


@@ -256,7 +256,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	w.metadata = metadata

 	//Initialize peer manager.
-	w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, w.log)
+	w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, params.enableRelay, w.log)

 	w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
 	if err != nil {


@@ -44,6 +44,8 @@ const UserAgent string = "go-waku"
 const defaultMinRelayPeersToPublish = 0
 const DefaultMaxConnectionsPerIP = 5
+const DefaultMaxConnections = 300
+const DefaultMaxPeerStoreCapacity = 300

 type WakuNodeParameters struct {
 	hostAddr *net.TCPAddr
@@ -124,9 +126,10 @@ type WakuNodeOption func(*WakuNodeParameters) error
 // Default options used in the libp2p node
 var DefaultWakuNodeOptions = []WakuNodeOption{
 	WithPrometheusRegisterer(prometheus.NewRegistry()),
-	WithMaxPeerConnections(50),
+	WithMaxPeerConnections(DefaultMaxConnections),
 	WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP),
 	WithCircuitRelayParams(2*time.Second, 3*time.Minute),
+	WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
 }

 // MultiAddresses return the list of multiaddresses configured in the node
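For context, a minimal usage sketch of the new defaults (not part of the commit; node.New, WithMaxPeerConnections and WithPeerStoreCapacity appear in the diff above, the surrounding program is illustrative). The constants are applied through DefaultWakuNodeOptions unless a caller overrides them:

package main

import "github.com/waku-org/go-waku/waku/v2/node"

func main() {
	// With no explicit options, DefaultWakuNodeOptions applies
	// DefaultMaxConnections (300) and DefaultMaxPeerStoreCapacity (300).
	n, err := node.New()
	if err != nil {
		panic(err)
	}
	_ = n

	// Both defaults can still be overridden per node.
	n2, err := node.New(
		node.WithMaxPeerConnections(150),
		node.WithPeerStoreCapacity(600),
	)
	if err != nil {
		panic(err)
	}
	_ = n2
}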


@@ -17,7 +17,6 @@ import (
 	"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
 	"github.com/waku-org/go-waku/logging"
 	wps "github.com/waku-org/go-waku/waku/v2/peerstore"
-	waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/service"
 	"go.uber.org/zap"
@@ -127,7 +126,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) {
 	triggerImmediateConnection := false
 	//Not connecting to peer as soon as it is discovered,
 	// rather expecting this to be pushed from PeerManager based on the need.
-	if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin {
+	if len(c.host.Network().Peers()) < c.pm.OutPeersTarget {
 		triggerImmediateConnection = true
 	}
 	c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID))
@@ -227,7 +226,7 @@ func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) {
 func (c *PeerConnectionStrategy) dialPeers() {
 	defer c.WaitGroup().Done()

-	maxGoRoutines := c.pm.OutRelayPeersTarget
+	maxGoRoutines := c.pm.OutPeersTarget
 	if maxGoRoutines > maxActiveDials {
 		maxGoRoutines = maxActiveDials
 	}
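Both call sites above key off the same value, so the rule is easy to state in isolation. A minimal sketch, assuming OutPeersTarget is the node's outbound peer target; the helper names below are hypothetical, not go-waku API:

package sketch

// shouldConnectImmediately mirrors the check in consumeSubscription: dial a
// newly discovered peer right away only while the node is still below its
// outbound peer target; otherwise the peer just sits in the peerstore until
// the PeerManager pushes it for connection on demand.
func shouldConnectImmediately(connectedPeers, outPeersTarget int) bool {
	return connectedPeers < outPeersTarget
}

// dialConcurrency mirrors dialPeers: the same outbound target also bounds
// the number of concurrent dial goroutines, capped by maxActiveDials.
func dialConcurrency(outPeersTarget, maxActiveDials int) int {
	if outPeersTarget > maxActiveDials {
		return maxActiveDials
	}
	return outPeersTarget
}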


@@ -73,8 +73,8 @@ type PeerManager struct {
 	maxPeers            int
 	maxRelayPeers       int
 	logger              *zap.Logger
-	InRelayPeersTarget  int
-	OutRelayPeersTarget int
+	InPeersTarget       int
+	OutPeersTarget      int
 	host                host.Host
 	serviceSlots        *ServiceSlots
 	ctx                 context.Context
@@ -85,6 +85,7 @@ type PeerManager struct {
 	wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo
 	TopicHealthNotifCh     chan<- TopicHealthStatus
 	rttCache               *FastestPeerSelector
+	RelayEnabled           bool
 }

 // PeerSelection provides various options based on which Peer is selected from a list of peers.
@@ -143,6 +144,7 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int {
 		}
 	}
 	//Update topic's health
+	//TODO: This should be done based on number of full-mesh peers.
 	oldHealth := topic.healthStatus
 	if healthyPeerCount < 1 { //Ideally this check should be done with minPeersForRelay, but leaving it as is for now.
 		topic.healthStatus = UnHealthy
@@ -174,31 +176,38 @@ func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) {
 }

 // NewPeerManager creates a new peerManager instance.
-func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, logger *zap.Logger) *PeerManager {
-	maxRelayPeers, _ := relayAndServicePeers(maxConnections)
-	inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers)
-
-	if maxPeers == 0 || maxConnections > maxPeers {
-		maxPeers = maxConnsToPeerRatio * maxConnections
-	}
+func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, relayEnabled bool, logger *zap.Logger) *PeerManager {
+	var inPeersTarget, outPeersTarget, maxRelayPeers int
+	if relayEnabled {
+		maxRelayPeers, _ = relayAndServicePeers(maxConnections)
+		inPeersTarget, outPeersTarget = inAndOutRelayPeers(maxRelayPeers)
+
+		if maxPeers == 0 || maxConnections > maxPeers {
+			maxPeers = maxConnsToPeerRatio * maxConnections
+		}
+	} else {
+		maxRelayPeers = 0
+		inPeersTarget = 0
+		//TODO: ideally this should be 2 filter peers per topic, 2 lightpush peers per topic and 2-4 store nodes per topic
+		outPeersTarget = 10
+	}

 	pm := &PeerManager{
 		logger:                 logger.Named("peer-manager"),
 		metadata:               metadata,
 		maxRelayPeers:          maxRelayPeers,
-		InRelayPeersTarget:     inRelayPeersTarget,
-		OutRelayPeersTarget:    outRelayPeersTarget,
+		InPeersTarget:          inPeersTarget,
+		OutPeersTarget:         outPeersTarget,
 		serviceSlots:           NewServiceSlot(),
 		subRelayTopics:         make(map[string]*NodeTopicDetails),
 		maxPeers:               maxPeers,
 		wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{},
 		rttCache:               NewFastestPeerSelector(logger),
+		RelayEnabled:           relayEnabled,
 	}
 	logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
 		zap.Int("maxRelayPeers", maxRelayPeers),
-		zap.Int("outRelayPeersTarget", outRelayPeersTarget),
-		zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget),
+		zap.Int("outPeersTarget", outPeersTarget),
+		zap.Int("inPeersTarget", pm.InPeersTarget),
 		zap.Int("maxPeers", maxPeers))

 	return pm
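To make the effect of the new relayEnabled parameter concrete, a small hypothetical driver (constructor signature and exported fields as in the diff; the actual numbers depend on relayAndServicePeers and inAndOutRelayPeers):

package main

import (
	"github.com/waku-org/go-waku/waku/v2/peermanager"
	"github.com/waku-org/go-waku/waku/v2/utils"
)

func main() {
	log := utils.Logger()

	// Relay node: in/out targets are derived from the relay share of
	// maxConnections via inAndOutRelayPeers.
	relayPM := peermanager.NewPeerManager(300, 300, nil, true, log)

	// Relay disabled (light node): no inbound target, and a small fixed
	// outbound target reserved for service peers (filter/lightpush/store).
	lightPM := peermanager.NewPeerManager(300, 300, nil, false, log)

	log.Sugar().Infof("relay mode: in=%d out=%d", relayPM.InPeersTarget, relayPM.OutPeersTarget)
	log.Sugar().Infof("light mode: in=%d out=%d", lightPM.InPeersTarget, lightPM.OutPeersTarget)
}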
@@ -225,7 +234,7 @@ func (pm *PeerManager) Start(ctx context.Context) {
 	pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)

 	pm.ctx = ctx
-	if pm.sub != nil {
+	if pm.sub != nil && pm.RelayEnabled {
 		go pm.peerEventLoop(ctx)
 	}
 	go pm.connectivityLoop(ctx)
@@ -233,7 +242,7 @@ func (pm *PeerManager) Start(ctx context.Context) {
 // This is a connectivity loop, which currently checks and prunes inbound connections.
 func (pm *PeerManager) connectivityLoop(ctx context.Context) {
-	pm.connectToRelayPeers()
+	pm.connectToPeers()
 	t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
 	defer t.Stop()
 	for {
@@ -241,7 +250,7 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-t.C:
-			pm.connectToRelayPeers()
+			pm.connectToPeers()
 		}
 	}
 }
@@ -302,10 +311,10 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
 		// match those peers that are currently connected
 		curPeerLen := pm.checkAndUpdateTopicHealth(topicInst)
-		if curPeerLen < waku_proto.GossipSubDMin {
-			pm.logger.Debug("subscribed topic is not sufficiently healthy, initiating more connections to maintain health",
+		if curPeerLen < pm.OutPeersTarget {
+			pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh",
 				zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen),
-				zap.Int("optimumPeers", waku_proto.GossipSubDMin))
+				zap.Int("targetPeers", pm.OutPeersTarget))
 			//Find not connected peers.
 			notConnectedPeers := pm.getNotConnectedPers(topicStr)
 			if notConnectedPeers.Len() == 0 {
@@ -315,35 +324,42 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
 			}
 			pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr))
 			//Connect to eligible peers.
-			numPeersToConnect := waku_proto.GossipSubDMin - curPeerLen
+			numPeersToConnect := pm.OutPeersTarget - curPeerLen
 			if numPeersToConnect > notConnectedPeers.Len() {
 				numPeersToConnect = notConnectedPeers.Len()
 			}
-			pm.connectToPeers(notConnectedPeers[0:numPeersToConnect])
+			pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
 		}
 	}
 }
-// connectToRelayPeers ensures minimum D connections are there for each pubSubTopic.
+// connectToPeers ensures minimum D connections are there for each pubSubTopic.
 // If not, initiates connections to additional peers.
 // It also checks for incoming relay connections and prunes once they cross inRelayTarget
-func (pm *PeerManager) connectToRelayPeers() {
-	//Check for out peer connections and connect to more peers.
-	pm.ensureMinRelayConnsPerTopic()
+func (pm *PeerManager) connectToPeers() {
+	if pm.RelayEnabled {
+		//Check for out peer connections and connect to more peers.
+		pm.ensureMinRelayConnsPerTopic()

-	inRelayPeers, outRelayPeers := pm.getRelayPeers()
-	pm.logger.Debug("number of relay peers connected",
-		zap.Int("in", inRelayPeers.Len()),
-		zap.Int("out", outRelayPeers.Len()))
-	if inRelayPeers.Len() > 0 &&
-		inRelayPeers.Len() > pm.InRelayPeersTarget {
-		pm.pruneInRelayConns(inRelayPeers)
+		inRelayPeers, outRelayPeers := pm.getRelayPeers()
+		pm.logger.Debug("number of relay peers connected",
+			zap.Int("in", inRelayPeers.Len()),
+			zap.Int("out", outRelayPeers.Len()))
+		if inRelayPeers.Len() > 0 &&
+			inRelayPeers.Len() > pm.InPeersTarget {
+			pm.pruneInRelayConns(inRelayPeers)
+		}
+	} else {
+		//TODO: Connect to filter peers per topic as of now.
+		//Fetch filter peers from peerStore, TODO: topics for lightNode not available here?
+		//Filter subscribe to notify peerManager whenever a new topic/shard is subscribed to.
+		pm.logger.Debug("light mode..not doing anything")
 	}
 }

-// connectToPeers connects to peers provided in the list if the addresses have not expired.
-func (pm *PeerManager) connectToPeers(peers peer.IDSlice) {
+// connectToSpecifiedPeers connects to peers provided in the list if the addresses have not expired.
+func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) {
 	for _, peerID := range peers {
 		peerData := AddrInfoToPeerData(wps.PeerManager, peerID, pm.host)
 		if peerData == nil {
@@ -377,8 +393,8 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
 	//TODO: Need to have more intelligent way of doing this, maybe peer scores.
 	//TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers.
 	pm.logger.Info("peer connections exceed target relay peers, hence pruning",
-		zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InRelayPeersTarget))
-	for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ {
+		zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InPeersTarget))
+	for pruningStartIndex := pm.InPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ {
 		p := inRelayPeers[pruningStartIndex]
 		err := pm.host.Network().ClosePeer(p)
 		if err != nil {
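The pruning rule itself is compact; a standalone sketch of it, using the libp2p types seen above (pruneInbound is a hypothetical name, not part of the commit):

package sketch

import (
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

// pruneInbound mirrors pruneInRelayConns: keep the first inPeersTarget
// inbound relay peers and close the connections beyond that target.
func pruneInbound(h host.Host, inRelayPeers peer.IDSlice, inPeersTarget int) {
	for i := inPeersTarget; i < inRelayPeers.Len(); i++ {
		// ClosePeer closes all connections to the given peer; errors are
		// only logged in the real implementation.
		_ = h.Network().ClosePeer(inRelayPeers[i])
	}
}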


@@ -30,7 +30,7 @@ func initTest(t *testing.T) (context.Context, *PeerManager, func()) {
 	require.NoError(t, err)
 	// host 1 is used by peer manager
-	pm := NewPeerManager(10, 20, nil, utils.Logger())
+	pm := NewPeerManager(10, 20, nil, true, utils.Logger())
 	pm.SetHost(h1)
 	return ctx, pm, func() {
@@ -228,7 +228,7 @@ func TestConnectToRelayPeers(t *testing.T) {
 	defer deferFn()
-	pm.connectToRelayPeers()
+	pm.connectToPeers()
 }
@@ -252,7 +252,7 @@ func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrF
 	err = wenr.Update(utils.Logger(), localNode, wenr.WithWakuRelaySharding(rs[0]))
 	require.NoError(t, err)
-	pm := NewPeerManager(10, 20, nil, logger)
+	pm := NewPeerManager(10, 20, nil, true, logger)
 	pm.SetHost(host)
 	peerconn, err := NewPeerConnectionStrategy(pm, 30*time.Second, logger)
 	require.NoError(t, err)


@@ -48,7 +48,9 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic
 	pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubsubTopic])
-	if connectedPeers >= waku_proto.GossipSubDMin { //TODO: Use a config rather than hard-coding.
+	//Leaving this logic based on gossipSubDMin as this is a good start for a subscribed topic.
+	// subsequent connectivity loop iteration would initiate more connections which should take it towards a healthy mesh.
+	if connectedPeers >= waku_proto.GossipSubDMin {
 		// Should we use optimal number or define some sort of a config for the node to choose from?
 		// A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has
 		// or bandwidth it can support.
@@ -70,7 +72,7 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic
 		}
 		//For now all peers are being given same priority,
 		// Later we may want to choose peers that have more shards in common over others.
-		pm.connectToPeers(notConnectedPeers[0:numPeersToConnect])
+		pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect])
 	} else {
 		triggerDiscovery = true
 	}
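For orientation, handleNewRelayTopicSubscription is reached through the relay event bus that the tests below wire up. A rough sketch of that wiring, with calls as they appear elsewhere in the diff and error handling mostly elided (host, wakuRelay and ctx are assumed to already exist):

// Assumes host, wakuRelay, and ctx are set up; see the tests further down
// for the full setup.
pm := peermanager.NewPeerManager(10, 20, nil, true, utils.Logger())
pm.SetHost(host)

// Topic subscribe/unsubscribe events published by the relay reach the
// PeerManager through its event loop, which Start launches when relay is enabled.
if err := pm.SubscribeToRelayEvtBus(wakuRelay.Events()); err != nil {
	panic(err)
}
pm.Start(ctx)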


@@ -3,6 +3,9 @@ package peermanager
 import (
 	"context"
 	"crypto/rand"
+	"testing"
+	"time"

 	"github.com/libp2p/go-libp2p/core/event"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
@@ -17,8 +20,6 @@ import (
 	"github.com/waku-org/go-waku/waku/v2/timesource"
 	"github.com/waku-org/go-waku/waku/v2/utils"
 	"go.uber.org/zap"
-	"testing"
-	"time"
 )

 func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) {
@@ -44,7 +45,7 @@ func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host,
 func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) {
 	// Host 1 used by peer manager
-	pm := NewPeerManager(10, 20, nil, utils.Logger())
+	pm := NewPeerManager(10, 20, nil, true, utils.Logger())
 	pm.SetHost(*h)
 	// Create a new relay event bus
@@ -77,7 +78,7 @@ func TestSubscribeToRelayEvtBus(t *testing.T) {
 	r, h1, _ := makeWakuRelay(t, log)
 	// Host 1 used by peer manager
-	pm := NewPeerManager(10, 20, nil, utils.Logger())
+	pm := NewPeerManager(10, 20, nil, true, utils.Logger())
 	pm.SetHost(h1)
 	// Create a new relay event bus


@@ -164,7 +164,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
 	s.Require().NoError(err)
 	b := relay.NewBroadcaster(10)
 	s.Require().NoError(b.Start(context.Background()))
-	pm := peermanager.NewPeerManager(5, 5, nil, s.Log)
+	pm := peermanager.NewPeerManager(5, 5, nil, true, s.Log)
 	filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
 	filterPush.SetHost(host)
 	pm.SetHost(host)


@@ -36,7 +36,7 @@ func TestQueryOptions(t *testing.T) {
 	require.NoError(t, err)
 	// Let peer manager reside at host
-	pm := peermanager.NewPeerManager(5, 5, nil, utils.Logger())
+	pm := peermanager.NewPeerManager(5, 5, nil, true, utils.Logger())
 	pm.SetHost(host)
 	// Add host2 to peerstore


@@ -237,7 +237,7 @@ func TestWakuLightPushCornerCases(t *testing.T) {
 	testContentTopic := "/test/10/my-lp-app/proto"
 	// Prepare peer manager instance to include in test
-	pm := peermanager.NewPeerManager(10, 10, nil, utils.Logger())
+	pm := peermanager.NewPeerManager(10, 10, nil, true, utils.Logger())
 	node1, sub1, host1 := makeWakuRelay(t, testTopic)
 	defer node1.Stop()


@@ -125,27 +125,30 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc
 	writer := pbio.NewDelimitedWriter(stream)
 	reader := pbio.NewDelimitedReader(stream, math.MaxInt32)

+	logger.Debug("sending metadata request")
 	err = writer.WriteMsg(request)
 	if err != nil {
 		logger.Error("writing request", zap.Error(err))
 		if err := stream.Reset(); err != nil {
-			wakuM.log.Error("resetting connection", zap.Error(err))
+			logger.Error("resetting connection", zap.Error(err))
 		}
 		return nil, err
 	}
+	logger.Debug("sent metadata request")

 	response := &pb.WakuMetadataResponse{}
 	err = reader.ReadMsg(response)
 	if err != nil {
 		logger.Error("reading response", zap.Error(err))
 		if err := stream.Reset(); err != nil {
-			wakuM.log.Error("resetting connection", zap.Error(err))
+			logger.Error("resetting connection", zap.Error(err))
 		}
 		return nil, err
 	}

 	stream.Close()
+	logger.Debug("received metadata response")

 	if response.ClusterId == nil {
 		return nil, errors.New("node did not provide a waku clusterid")
@@ -163,6 +166,7 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc
 			rShardIDs = append(rShardIDs, uint16(i))
 		}
 	}
+	logger.Debug("getting remote cluster and shards")

 	rs, err := protocol.NewRelayShards(rClusterID, rShardIDs...)
 	if err != nil {
@@ -176,7 +180,7 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) {
 	return func(stream network.Stream) {
 		logger := wakuM.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
 		request := &pb.WakuMetadataRequest{}
+		logger.Debug("received metadata request from peer")
 		writer := pbio.NewDelimitedWriter(stream)
 		reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
@@ -184,11 +188,10 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) {
 		if err != nil {
 			logger.Error("reading request", zap.Error(err))
 			if err := stream.Reset(); err != nil {
-				wakuM.log.Error("resetting connection", zap.Error(err))
+				logger.Error("resetting connection", zap.Error(err))
 			}
 			return
 		}
 		response := new(pb.WakuMetadataResponse)
 		clusterID, shards, err := wakuM.ClusterAndShards()
@@ -205,10 +208,11 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) {
 		if err != nil {
 			logger.Error("writing response", zap.Error(err))
 			if err := stream.Reset(); err != nil {
-				wakuM.log.Error("resetting connection", zap.Error(err))
+				logger.Error("resetting connection", zap.Error(err))
 			}
 			return
 		}
+		logger.Debug("sent metadata response to peer")
 		stream.Close()
 	}
@@ -248,14 +252,15 @@ func (wakuM *WakuMetadata) disconnectPeer(peerID peer.ID, reason error) {
 // Connected is called when a connection is opened
 func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) {
 	go func() {
+		wakuM.log.Debug("peer connected", zap.Stringer("peer", cc.RemotePeer()))
 		// Metadata verification is done only if a clusterID is specified
 		if wakuM.clusterID == 0 {
 			return
 		}
 		peerID := cc.RemotePeer()
 		shard, err := wakuM.Request(wakuM.ctx, peerID)
 		if err != nil {
 			wakuM.disconnectPeer(peerID, err)
 			return
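The repeated wakuM.log → logger swap matters because logger is the peer-scoped child created at the top of each handler, so every line, including stream-reset errors, carries the peer ID. The underlying zap pattern in isolation (values are illustrative):

package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	base, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	// A child logger created with With(...) attaches its fields to every
	// subsequent call, which is what the handlers above rely on.
	logger := base.With(zap.String("peer", "16Uiu2HAm...example"))
	logger.Debug("sending metadata request")
	logger.Error("resetting connection", zap.Error(errors.New("stream reset")))
}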


@@ -291,7 +291,7 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) {
 	require.NoError(t, err)
 	// Prepare peer manager for host3
-	pm3 := peermanager.NewPeerManager(10, 20, nil, log)
+	pm3 := peermanager.NewPeerManager(10, 20, nil, true, log)
 	pm3.SetHost(host3)
 	pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger())
 	require.NoError(t, err)
@@ -366,7 +366,7 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) {
 	require.NoError(t, err)
 	// Prepare peer manager for host3
-	pm3 := peermanager.NewPeerManager(10, 20, nil, log)
+	pm3 := peermanager.NewPeerManager(10, 20, nil, true, log)
 	pm3.SetHost(host3)
 	pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger())
 	require.NoError(t, err)


@@ -43,7 +43,7 @@ func TestStoreClient(t *testing.T) {
 	err = wakuRelay.Start(context.Background())
 	require.NoError(t, err)
-	pm := peermanager.NewPeerManager(5, 5, nil, utils.Logger())
+	pm := peermanager.NewPeerManager(5, 5, nil, true, utils.Logger())
 	pm.SetHost(host)
 	err = pm.SubscribeToRelayEvtBus(wakuRelay.Events())
 	require.NoError(t, err)