feat: filter peers stored in cache by cluster-id in peer-exchange (#1139)

This commit is contained in:
richΛrd 2024-06-27 10:02:01 -04:00 committed by GitHub
parent 19a47a1ac1
commit 8d7c2f7bfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 30 additions and 15 deletions

View File

@ -270,7 +270,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
} }
} }
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.peerConnector, w.peermanager, w.opts.prometheusReg, w.log) w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.opts.clusterID, w.peerConnector, w.peermanager, w.opts.prometheusReg, w.log)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enode"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
) )
@ -13,18 +14,32 @@ import (
type enrCache struct { type enrCache struct {
// using lru, saves us from periodically cleaning the cache to mauintain a certain size // using lru, saves us from periodically cleaning the cache to mauintain a certain size
data *shardLRU data *shardLRU
clusterID uint16
} }
// err on negative size // err on negative size
func newEnrCache(size int) *enrCache { func newEnrCache(size int, clusterID uint16) *enrCache {
inner := newShardLRU(int(size)) inner := newShardLRU(int(size))
return &enrCache{ return &enrCache{
data: inner, data: inner,
clusterID: clusterID,
} }
} }
// updating cache // updating cache
func (c *enrCache) updateCache(node *enode.Node) error { func (c *enrCache) updateCache(node *enode.Node) error {
if c.clusterID != 0 {
rs, err := wenr.RelaySharding(node.Record())
if err != nil || rs == nil {
// Node does not contain valid shard information, ignoring...
return nil
}
if rs.ClusterID != c.clusterID {
return nil
}
}
currNode := c.data.Get(node.ID()) currNode := c.data.Get(node.ID())
if currNode == nil || node.Seq() > currNode.Seq() { if currNode == nil || node.Seq() > currNode.Seq() {
return c.data.Add(node) return c.data.Add(node)

View File

@ -54,12 +54,12 @@ type WakuPeerExchange struct {
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
// Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode. // Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil // If using libp2p host, then pass peermanager as nil
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) (*WakuPeerExchange, error) { func NewWakuPeerExchange(disc *discv5.DiscoveryV5, clusterID uint16, peerConnector PeerConnector, pm *peermanager.PeerManager, reg prometheus.Registerer, log *zap.Logger, opts ...Option) (*WakuPeerExchange, error) {
wakuPX := new(WakuPeerExchange) wakuPX := new(WakuPeerExchange)
wakuPX.disc = disc wakuPX.disc = disc
wakuPX.metrics = newMetrics(reg) wakuPX.metrics = newMetrics(reg)
wakuPX.log = log.Named("wakupx") wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = newEnrCache(MaxCacheSize) wakuPX.enrCache = newEnrCache(MaxCacheSize, clusterID)
wakuPX.peerConnector = peerConnector wakuPX.peerConnector = peerConnector
wakuPX.pm = pm wakuPX.pm = pm
wakuPX.CommonService = service.NewCommonService() wakuPX.CommonService = service.NewCommonService()

View File

@ -91,12 +91,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
// mount peer exchange // mount peer exchange
pxPeerConn1 := discv5.NewTestPeerDiscoverer() pxPeerConn1 := discv5.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
px1.SetHost(host1) px1.SetHost(host1)
pxPeerConn3 := discv5.NewTestPeerDiscoverer() pxPeerConn3 := discv5.NewTestPeerDiscoverer()
px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger()) px3, err := NewWakuPeerExchange(nil, 0, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
px3.SetHost(host3) px3.SetHost(host3)
@ -171,12 +171,12 @@ func TestRetrieveFilteredPeerExchangePeers(t *testing.T) {
// mount peer exchange // mount peer exchange
pxPeerConn1 := discv5.NewTestPeerDiscoverer() pxPeerConn1 := discv5.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
px1.SetHost(host1) px1.SetHost(host1)
pxPeerConn3 := discv5.NewTestPeerDiscoverer() pxPeerConn3 := discv5.NewTestPeerDiscoverer()
px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger()) px3, err := NewWakuPeerExchange(nil, 0, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
px3.SetHost(host3) px3.SetHost(host3)
@ -234,7 +234,7 @@ func TestPeerExchangeOptions(t *testing.T) {
// Mount peer exchange // Mount peer exchange
pxPeerConn1 := discv5.NewTestPeerDiscoverer() pxPeerConn1 := discv5.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
px1.SetHost(host1) px1.SetHost(host1)
@ -302,11 +302,11 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) {
// mount peer exchange // mount peer exchange
pxPeerConn1 := discv5.NewTestPeerDiscoverer() pxPeerConn1 := discv5.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
px1.SetHost(host1) px1.SetHost(host1)
px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) px3, err := NewWakuPeerExchange(nil, 0, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
px3.SetHost(host3) px3.SetHost(host3)
@ -382,11 +382,11 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) {
// mount peer exchange // mount peer exchange
pxPeerConn1 := discv5.NewTestPeerDiscoverer() pxPeerConn1 := discv5.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) px1, err := NewWakuPeerExchange(d1, 0, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
px1.SetHost(host1) px1.SetHost(host1)
px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) px3, err := NewWakuPeerExchange(nil, 0, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err) require.NoError(t, err)
px3.SetHost(host3) px3.SetHost(host3)