From 8d7c2f7bfad04e0de9829b42b6ff02a02939d200 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Thu, 27 Jun 2024 10:02:01 -0400 Subject: [PATCH] feat: filter peers stored in cache by cluster-id in peer-exchange (#1139) --- waku/v2/node/wakunode2.go | 2 +- waku/v2/protocol/peer_exchange/enr_cache.go | 21 ++++++++++++++++--- waku/v2/protocol/peer_exchange/protocol.go | 4 ++-- .../peer_exchange/waku_peer_exchange_test.go | 18 ++++++++-------- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index d8c280c1..3e9f137e 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -270,7 +270,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } } - w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.peerConnector, w.peermanager, w.opts.prometheusReg, w.log) + w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.opts.clusterID, w.peerConnector, w.peermanager, w.opts.prometheusReg, w.log) if err != nil { return nil, err } diff --git a/waku/v2/protocol/peer_exchange/enr_cache.go b/waku/v2/protocol/peer_exchange/enr_cache.go index 90fb096a..81f1b89c 100644 --- a/waku/v2/protocol/peer_exchange/enr_cache.go +++ b/waku/v2/protocol/peer_exchange/enr_cache.go @@ -6,25 +6,40 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" ) // simpleLRU internal uses container/list, which is ring buffer(double linked list) type enrCache struct { // using lru, saves us from periodically cleaning the cache to mauintain a certain size - data *shardLRU + data *shardLRU + clusterID uint16 } // err on negative size -func newEnrCache(size int) *enrCache { +func newEnrCache(size int, clusterID uint16) *enrCache { inner := newShardLRU(int(size)) return &enrCache{ - data: inner, + data: inner, + clusterID: clusterID, } } // updating cache func (c *enrCache) updateCache(node *enode.Node) error { + if c.clusterID != 0 { + rs, err := wenr.RelaySharding(node.Record()) + if err != nil || rs == nil { + // Node does not contain valid shard information, ignoring... + return nil + } + + if rs.ClusterID != c.clusterID { + return nil + } + } + currNode := c.data.Get(node.ID()) if currNode == nil || node.Seq() > currNode.Seq() { return c.data.Add(node) diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index a70b34f6..c02cdca6 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -54,12 +54,12 @@ type WakuPeerExchange struct { // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct // Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode. // If using libp2p host, then pass peermanager as nil -func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) (*WakuPeerExchange, error) { +func NewWakuPeerExchange(disc *discv5.DiscoveryV5, clusterID uint16, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) (*WakuPeerExchange, error) { wakuPX := new(WakuPeerExchange) wakuPX.disc = disc wakuPX.metrics = newMetrics(reg) wakuPX.log = log.Named("wakupx") - wakuPX.enrCache = newEnrCache(MaxCacheSize) + wakuPX.enrCache = newEnrCache(MaxCacheSize, clusterID) wakuPX.peerConnector = peerConnector wakuPX.pm = pm wakuPX.CommonService = service.NewCommonService() diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 92280810..dded32e2 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -91,12 +91,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { // mount peer exchange pxPeerConn1 := discv5.NewTestPeerDiscoverer() - px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) + px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px1.SetHost(host1) pxPeerConn3 := discv5.NewTestPeerDiscoverer() - px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger()) + px3, err := NewWakuPeerExchange(nil, 0, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px3.SetHost(host3) @@ -171,12 +171,12 @@ func TestRetrieveFilteredPeerExchangePeers(t *testing.T) { // mount peer exchange pxPeerConn1 := discv5.NewTestPeerDiscoverer() - px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) + px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px1.SetHost(host1) pxPeerConn3 := discv5.NewTestPeerDiscoverer() - px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger()) + px3, err := NewWakuPeerExchange(nil, 0, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px3.SetHost(host3) @@ -234,7 +234,7 @@ func TestPeerExchangeOptions(t *testing.T) { // Mount peer exchange pxPeerConn1 := discv5.NewTestPeerDiscoverer() - px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) + px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px1.SetHost(host1) @@ -302,11 +302,11 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { // mount peer exchange pxPeerConn1 := discv5.NewTestPeerDiscoverer() - px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) + px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px1.SetHost(host1) - px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) + px3, err := NewWakuPeerExchange(nil, 0, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px3.SetHost(host3) @@ -382,11 +382,11 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) { // mount peer exchange pxPeerConn1 := discv5.NewTestPeerDiscoverer() - px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) + px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px1.SetHost(host1) - px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) + px3, err := NewWakuPeerExchange(nil, 0, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px3.SetHost(host3)