feat: support serviceslots in peermanager (#631)

* feat: support peermanager serviceslots and update store protocol to use serviceslots

* fix: lint errors in test code

* fix: error in nix build due to vendor sha change

* fix: set host in peermanager even if relay is disabled

* chore: fix codeclimate issues

* chore: using common filterPeer function to avoid duplication

* feat: use service slots in other service protocols

* chore: fix codeclimate issues

* chore: move AddPeer to peermanager

* Apply suggestions from code review

Co-authored-by: richΛrd <info@richardramos.me>

* chore: address review comments

* feat: implement RemovePeer #638

* chore: fix test failure

* Support for multiple slots for service peers
Adding discovered peers is also moved to the peer manager

---------

Co-authored-by: richΛrd <info@richardramos.me>
Prem Chaitanya Prathi 2023-08-10 18:28:22 +05:30 committed by GitHub
parent 299801d4ad
commit 9f45d271ac
27 changed files with 422 additions and 150 deletions
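Taken together, the change routes peer bookkeeping through the new PeerManager: AddPeer writes a service peer into the peerstore and into a per-protocol service slot, and the service protocols later retrieve it via SelectPeer. A minimal usage sketch against the node-level API added below (hypothetical wiring, not part of the diff; the multiaddress must include a /p2p/<peerID> component):

package example

import (
	"github.com/libp2p/go-libp2p/core/peer"
	ma "github.com/multiformats/go-multiaddr"
	"github.com/waku-org/go-waku/waku/v2/node"
	wps "github.com/waku-org/go-waku/waku/v2/peerstore"
	"github.com/waku-org/go-waku/waku/v2/protocol/store"
)

// addStorePeer registers addr as a store service peer. WakuNode.AddPeer now
// delegates to peermanager.AddPeer: the peer lands in the peerstore and,
// because store is a service protocol (not relay), in its service slot.
func addStorePeer(w *node.WakuNode, addr ma.Multiaddr) (peer.ID, error) {
	return w.AddPeer(addr, wps.Static, store.StoreID_v20beta4)
}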


@ -34,7 +34,6 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/config"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
@ -304,7 +303,7 @@ func Execute(options Options) {
}
for _, d := range discoveredNodes {
wakuNode.Host().Peerstore().AddAddrs(d.PeerID, d.PeerInfo.Addrs, peerstore.PermanentAddrTTL)
wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DnsDiscovery)
}
addStaticPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4)


@ -28,7 +28,7 @@
];
doCheck = false;
# FIXME: This needs to be manually changed when updating modules.
vendorSha256 = "sha256-1sioJgLOO5erkjeAIkTWMLZglJERvMo7OzFNvKHwJXA=";
vendorSha256 = "sha256-JhbZJV0SG7QdKR386Pfg7CWi5bNg+MOKwrzClEzKruw=";
# Fix for 'nix run' trying to execute 'go-waku'.
meta = { mainProgram = "waku"; };
};

go.mod (6 changes)

@ -18,7 +18,7 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/libp2p/go-msgio v0.3.0
github.com/mattn/go-sqlite3 v1.14.17
github.com/multiformats/go-multiaddr v0.9.0
github.com/multiformats/go-multiaddr v0.10.1
github.com/stretchr/testify v1.8.4
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
github.com/urfave/cli/v2 v2.24.4
@ -164,8 +164,8 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect

go.sum (12 changes)

@ -1176,8 +1176,8 @@ github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9
github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4=
github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo=
github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
github.com/multiformats/go-multiaddr v0.9.0 h1:3h4V1LHIk5w4hJHekMKWALPXErDfz/sggzwC/NcqbDQ=
github.com/multiformats/go-multiaddr v0.9.0/go.mod h1:mI67Lb1EeTOYb8GQfL/7wpIZwc46ElrvzhYnoJOmTT0=
github.com/multiformats/go-multiaddr v0.10.1 h1:HghtFrWyZEPrpTvgAMFJi6gFdgHfs2cb0pyfDsk+lqU=
github.com/multiformats/go-multiaddr v0.10.1/go.mod h1:jLEZsA61rwWNZQTHHnqq2HNa+4os/Hz54eqiRnsRqYQ=
github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A=
github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk=
github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E=
@ -1712,8 +1712,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
@ -1748,8 +1748,8 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=


@ -33,7 +33,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peermanager"
peerstore1 "github.com/waku-org/go-waku/waku/v2/peerstore"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
@ -124,7 +124,7 @@ type WakuNode struct {
}
func defaultStoreFactory(w *WakuNode) store.Store {
return store.NewWakuStore(w.opts.messageProvider, w.timesource, w.log)
return store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, w.log)
}
// New is used to instantiate a WakuNode using a set of WakuNodeOptions
@ -195,14 +195,14 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
// Setup peerstore wrapper
if params.peerstore != nil {
w.peerstore = peerstore1.NewWakuPeerstore(params.peerstore)
w.peerstore = wps.NewWakuPeerstore(params.peerstore)
params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore))
} else {
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}
w.peerstore = peerstore1.NewWakuPeerstore(ps)
w.peerstore = wps.NewWakuPeerstore(ps)
params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore))
}
@ -265,7 +265,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}
}
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.peerConnector, w.log)
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.peerConnector, w.peermanager, w.log)
if err != nil {
return nil, err
}
@ -274,8 +274,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...)
w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...)
w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.log)
w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.peermanager, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.log)
if params.storeFactory != nil {
w.storeFactory = params.storeFactory
@ -382,6 +382,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
w.peerConnector.SetHost(host)
w.peerConnector.SetPeerManager(w.peermanager)
err = w.peerConnector.Start(ctx)
if err != nil {
return err
@ -395,12 +396,13 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
w.relay.SetHost(host)
w.peermanager.SetHost(host)
if w.opts.enableRelay {
err := w.relay.Start(ctx)
if err != nil {
return err
}
w.peermanager.SetHost(host)
w.peermanager.Start(ctx)
}
@ -676,30 +678,21 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error
return nil
}
func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peerstore1.Origin, protocols ...protocol.ID) error {
w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID))
err := w.host.Peerstore().(peerstore1.WakuPeerstore).SetOrigin(info.ID, origin)
if err != nil {
return err
}
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.AddressTTL)
err = w.host.Peerstore().AddProtocols(info.ID, protocols...)
if err != nil {
return err
}
return nil
// AddPeer is used to add a peer and the protocols it supports to the node peerstore
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, protocols ...protocol.ID) (peer.ID, error) {
return w.peermanager.AddPeer(address, origin, protocols...)
}
// AddPeer is used to add a peer and the protocols it support to the node peerstore
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin peerstore1.Origin, protocols ...protocol.ID) (peer.ID, error) {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return "", err
// AddDiscoveredPeer adds a discovered peer to the node peerStore
func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin) {
p := peermanager.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: ID,
Addrs: addrs,
},
}
return info.ID, w.addPeer(info, origin, protocols...)
w.peermanager.AddDiscoveredPeer(p)
}
// DialPeerWithMultiAddress is used to connect to a peer using a multiaddress
@ -735,11 +728,11 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo)
func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error {
err := w.host.Connect(ctx, info)
if err != nil {
w.host.Peerstore().(peerstore1.WakuPeerstore).AddConnFailure(info)
w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info)
return err
}
w.host.Peerstore().(peerstore1.WakuPeerstore).ResetConnFailures(info)
w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info)
stats.Record(ctx, metrics.Dials.M(1))
return nil
}


@ -28,7 +28,7 @@ func TestWakuOptions(t *testing.T) {
require.NoError(t, err)
storeFactory := func(w *WakuNode) store.Store {
return store.NewWakuStore(w.opts.messageProvider, w.timesource, w.log)
return store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, w.log)
}
options := []WakuNodeOption{


@ -12,7 +12,6 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
@ -36,6 +35,7 @@ type PeerConnectionStrategy struct {
cache *lru.TwoQueueCache
host host.Host
pm *PeerManager
cancel context.CancelFunc
paused bool
@ -114,6 +114,10 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) {
c.host = h
}
func (c *PeerConnectionStrategy) SetPeerManager(pm *PeerManager) {
c.pm = pm
}
// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period.
func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
if c.cancel != nil {
@ -225,19 +229,7 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
case <-ctx.Done():
return
case p := <-c.peerCh:
c.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL)
err := c.host.Peerstore().(wps.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin)
if err != nil {
c.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID))
}
if p.ENR != nil {
err = c.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
if err != nil {
c.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
}
}
c.pm.AddDiscoveredPeer(p)
c.publishWork(ctx, p.AddrInfo)
case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine

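With the peerstore bookkeeping moved out of the publisher, discovery code now only builds a PeerData and hands it to the peer manager. A sketch of that handoff, assuming pid, addrs and pm come from an already-running discovery round and node:

p := peermanager.PeerData{
	Origin: wps.DnsDiscovery, // origin constant as used elsewhere in this diff
	AddrInfo: peer.AddrInfo{
		ID:    pid,
		Addrs: addrs,
	},
	// ENR may also be set (e.g. for discv5 results); AddDiscoveredPeer
	// persists it into the waku peerstore when non-nil.
}
pm.AddDiscoveredPeer(p)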

@ -6,8 +6,12 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -23,6 +27,7 @@ type PeerManager struct {
InRelayPeersTarget uint
OutRelayPeersTarget uint
host host.Host
serviceSlots map[protocol.ID][]peer.ID
}
const maxRelayPeersShare = 5
@ -42,6 +47,7 @@ func NewPeerManager(maxConnections uint, logger *zap.Logger) *PeerManager {
maxRelayPeers: maxRelayPeersValue,
InRelayPeersTarget: maxRelayPeersValue - outRelayPeersTargetValue,
OutRelayPeersTarget: outRelayPeersTargetValue,
serviceSlots: make(map[protocol.ID][]peer.ID),
}
logger.Info("PeerManager init values", zap.Uint("maxConnections", maxConnections),
zap.Uint("maxRelayPeersValue", maxRelayPeersValue), zap.Uint("outRelayPeersTargetValue", outRelayPeersTargetValue),
@ -72,23 +78,6 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) {
}
}
func (pm *PeerManager) filterPeersByProto(peers peer.IDSlice, proto ...protocol.ID) peer.IDSlice {
var filteredPeers peer.IDSlice
//TODO: This can be optimized once we have waku's own peerStore
for _, p := range peers {
supportedProtocols, err := pm.host.Peerstore().SupportsProtocols(p, proto...)
if err != nil {
pm.logger.Warn("Failed to get supported protocols for peer", zap.String("peerID", p.String()))
continue
}
if len(supportedProtocols) != 0 {
filteredPeers = append(filteredPeers, p)
}
}
return filteredPeers
}
func (pm *PeerManager) pruneInRelayConns() {
var inRelayPeers peer.IDSlice
@ -100,8 +89,8 @@ func (pm *PeerManager) pruneInRelayConns() {
pm.logger.Info("Number of peers connected", zap.Int("inPeers", inPeers.Len()), zap.Int("outPeers", outPeers.Len()))
//Need to filter peers to check if they support relay
inRelayPeers = pm.filterPeersByProto(inPeers, WakuRelayIDv200)
outRelayPeers := pm.filterPeersByProto(outPeers, WakuRelayIDv200)
inRelayPeers, _ = utils.FilterPeersByProto(pm.host, inPeers, WakuRelayIDv200)
outRelayPeers, _ := utils.FilterPeersByProto(pm.host, outPeers, WakuRelayIDv200)
pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("outRelayPeers", outRelayPeers.Len()))
if inRelayPeers.Len() > int(pm.InRelayPeersTarget) {
@ -120,3 +109,119 @@ func (pm *PeerManager) pruneInRelayConns() {
}
}
}
// AddDiscoveredPeer adds dynamically discovered peers.
// Note that these peers will not be set in service-slots.
// TODO: It may be good to set service-slots based on the services supported in the ENR
func (pm *PeerManager) AddDiscoveredPeer(p PeerData) {
_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin)
if p.ENR != nil {
err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
if err != nil {
pm.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
}
}
}
// addPeer adds a peer only to the peerStore.
// It also sets additional metadata such as origin and supported protocols
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, protocols ...protocol.ID) error {
pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID))
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin)
if err != nil {
pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID))
return err
}
if len(protocols) > 0 {
err = pm.host.Peerstore().AddProtocols(ID, protocols...)
if err != nil {
return err
}
}
return nil
}
// AddPeer adds peer to the peerStore and also to service slots
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocols ...protocol.ID) (peer.ID, error) {
//Assuming all addresses have peerId
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return "", err
}
//Add Service peers to serviceSlots.
for _, proto := range protocols {
pm.AddPeerToServiceSlot(proto, info.ID, origin)
}
//Add to the peer-store
err = pm.addPeer(info.ID, info.Addrs, origin)
if err != nil {
return "", err
}
return info.ID, nil
}
// RemovePeer deletes the peer from the peerStore.
// It also removes the peer from its serviceSlot, if any.
func (pm *PeerManager) RemovePeer(peerID peer.ID) {
pm.host.Peerstore().RemovePeer(peerID)
//Search if this peer is in serviceSlot and if so, remove it from there
// TODO: Add another statically configured peer to the serviceSlot.
for proto, peers := range pm.serviceSlots {
for i, peer := range peers {
if peer == peerID {
pm.serviceSlots[proto][i] = ""
}
}
}
}
// AddPeerToServiceSlot adds a peerID to serviceSlot.
// Adding to the peerStore is expected to have already been done by the caller.
// If relay proto is passed, it is not added to serviceSlot.
func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID, origin wps.Origin) {
if proto == WakuRelayIDv200 {
pm.logger.Warn("Cannot add Relay peer to service peer slots")
return
}
//For now the peer is appended to the serviceSlot; the first peer added is the one given priority.
//TODO: Ideally we should sort the peers per service and return the best peer based on peer score or RTT etc.
pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID), zap.String("service", string(proto)))
pm.serviceSlots[proto] = append(pm.serviceSlots[proto], peerID)
}
// SelectPeer is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will choose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from the node peerstore
func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID, logger *zap.Logger) (peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?
filteredPeers, err := utils.FilterPeersByProto(pm.host, specificPeers, proto)
if err != nil {
return "", err
}
if proto == WakuRelayIDv200 {
return utils.SelectRandomPeer(filteredPeers, pm.logger)
}
//Try to fetch from serviceSlot
peerIDs, ok := pm.serviceSlots[proto]
// Require both: a missing slot yields ok == false, and an empty slot would make peerIDs[0] panic.
if ok && len(peerIDs) > 0 {
pm.logger.Info("Got peer from service slots", logging.HostID("peer", peerIDs[0]))
return peerIDs[0], nil
}
return utils.SelectRandomPeer(filteredPeers, pm.logger)
}
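The selection order is therefore: relay always gets a random protocol-capable peer; any other protocol returns the head of its service slot when one is filled, and only falls back to a random peerstore peer otherwise. A one-line caller sketch (pm and the store protocol ID assumed wired up as above):

// Slot head if a store peer was registered; otherwise a random peerstore
// peer that advertises the store protocol.
peerID, err := pm.SelectPeer(store.StoreID_v20beta4, nil, utils.Logger())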


@ -0,0 +1,112 @@
package test
import (
"context"
"crypto/rand"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/peerstore"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/peermanager"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func TestServiceSlots(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
h1, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h1.Close()
h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()
h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()
protocol := libp2pProtocol.ID("test/protocol")
protocol1 := libp2pProtocol.ID("test/protocol1")
pm := peermanager.NewPeerManager(10, utils.Logger())
pm.SetHost(h1)
h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
err = h1.Peerstore().AddProtocols(h2.ID(), libp2pProtocol.ID(protocol))
require.NoError(t, err)
//Test selection from peerStore.
peerId, err := pm.SelectPeer(protocol, nil, utils.Logger())
require.NoError(t, err)
require.Equal(t, peerId, h2.ID())
//Test addition and selection from service-slot
pm.AddPeerToServiceSlot(protocol, h2.ID(), wps.Static)
peerId, err = pm.SelectPeer(protocol, nil, utils.Logger())
require.NoError(t, err)
require.Equal(t, peerId, h2.ID())
h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
pm.AddPeerToServiceSlot(protocol, h3.ID(), wps.Static)
h4, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h4.Close()
h1.Peerstore().AddAddrs(h4.ID(), h4.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
pm.AddPeerToServiceSlot(protocol1, h4.ID(), wps.Static)
//Test peer selection from first added peer to serviceSlot
peerId, err = pm.SelectPeer(protocol, nil, utils.Logger())
require.NoError(t, err)
require.Equal(t, peerId, h2.ID())
//Test peer selection for specific protocol
peerId, err = pm.SelectPeer(protocol1, nil, utils.Logger())
require.NoError(t, err)
require.Equal(t, peerId, h4.ID())
h5, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h5.Close()
//Test empty peer selection for relay protocol
_, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger())
require.Error(t, err, utils.ErrNoPeersAvailable)
//Test peer selection for relay protocol from peer store
h1.Peerstore().AddAddrs(h5.ID(), h5.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
pm.AddPeerToServiceSlot(peermanager.WakuRelayIDv200, h5.ID(), wps.Static)
_, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger())
require.Error(t, err, utils.ErrNoPeersAvailable)
err = h1.Peerstore().AddProtocols(h5.ID(), peermanager.WakuRelayIDv200)
require.NoError(t, err)
peerId, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger())
require.NoError(t, err)
require.Equal(t, peerId, h5.ID())
//Test random peer selection
protocol2 := libp2pProtocol.ID("test/protocol2")
h6, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h6.Close()
h1.Peerstore().AddAddrs(h6.ID(), h6.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
err = h1.Peerstore().AddProtocols(h6.ID(), libp2pProtocol.ID(protocol2))
require.NoError(t, err)
peerId, err = pm.SelectPeer(protocol2, nil, utils.Logger())
require.NoError(t, err)
require.Equal(t, peerId, h6.ID())
pm.RemovePeer(peerId)
_, err = pm.SelectPeer(protocol2, nil, utils.Logger())
require.Error(t, err, utils.ErrNoPeersAvailable)
}


@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -42,6 +43,7 @@ type WakuFilterLightnode struct {
wg *sync.WaitGroup
log *zap.Logger
subscriptions *SubscriptionsMap
pm *peermanager.PeerManager
}
type ContentFilter struct {
@ -54,13 +56,17 @@ type WakuFilterPushResult struct {
PeerID peer.ID
}
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterLightnode(broadcaster relay.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode {
// NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
// If using a libp2p host directly, pass nil as the peermanager.
func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager,
timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode {
wf := new(WakuFilterLightnode)
wf.log = log.Named("filterv2-lightnode")
wf.broadcaster = broadcaster
wf.timesource = timesource
wf.wg = &sync.WaitGroup{}
wf.pm = pm
return wf
}
@ -220,6 +226,7 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont
params := new(FilterSubscribeParameters)
params.log = wf.log
params.host = wf.h
params.pm = wf.pm
optList := DefaultSubscriptionOptions()
optList = append(optList, opts...)


@ -44,7 +44,7 @@ func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) {
b := relay.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
filterPush := NewWakuFilterLightnode(b, timesource.NewDefaultClock(), utils.Logger())
filterPush := NewWakuFilterLightnode(b, nil, timesource.NewDefaultClock(), utils.Logger())
filterPush.SetHost(host)
err = filterPush.Start(context.Background())
require.NoError(t, err)


@ -6,6 +6,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
@ -15,6 +16,7 @@ type (
FilterSubscribeParameters struct {
host host.Host
selectedPeer peer.ID
pm *peermanager.PeerManager
requestID []byte
log *zap.Logger
}
@ -54,7 +56,13 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
// supports the chosen protocol, otherwise it will choose a peer from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log)
var p peer.ID
var err error
if params.pm == nil {
p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, fromThesePeers, params.log)
}
if err == nil {
params.selectedPeer = p
} else {


@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -32,16 +33,19 @@ type WakuLightPush struct {
h host.Host
relay *relay.WakuRelay
cancel context.CancelFunc
pm *peermanager.PeerManager
log *zap.Logger
}
// NewWakuLightPush returns a new instance of Waku Lightpush struct
func NewWakuLightPush(relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush {
// Takes an optional peermanager if WakuLightPush is being created along with WakuNode.
// If using a libp2p host directly, pass nil as the peermanager.
func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, log *zap.Logger) *WakuLightPush {
wakuLP := new(WakuLightPush)
wakuLP.relay = relay
wakuLP.log = log.Named("lightpush")
wakuLP.pm = pm
return wakuLP
}
@ -142,6 +146,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
params := new(lightPushParameters)
params.host = wakuLP.h
params.log = wakuLP.log
params.pm = wakuLP.pm
optList := append(DefaultOptions(wakuLP.h), opts...)
for _, opt := range optList {

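A request-only lightpush client needs neither relay nor peermanager, as the updated tests below exercise; condensed (h, ctx, msg and topic assumed):

client := NewWakuLightPush(nil, nil, utils.Logger()) // no relay, nil peermanager
client.SetHost(h)
// With a nil peermanager, automatic peer selection reads the host's peerstore.
_, err := client.PublishToTopic(ctx, msg, topic)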

@ -5,6 +5,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
@ -14,6 +15,7 @@ type lightPushParameters struct {
host host.Host
selectedPeer peer.ID
requestID []byte
pm *peermanager.PeerManager
log *zap.Logger
}
@ -33,7 +35,13 @@ func WithPeer(p peer.ID) Option {
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
return func(params *lightPushParameters) {
p, err := utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log)
var p peer.ID
var err error
if params.pm == nil {
p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(LightPushID_v20beta1, fromThesePeers, params.log)
}
if err == nil {
params.selectedPeer = p
} else {


@ -61,7 +61,7 @@ func TestWakuLightPush(t *testing.T) {
defer sub2.Unsubscribe()
ctx := context.Background()
lightPushNode2 := NewWakuLightPush(node2, utils.Logger())
lightPushNode2 := NewWakuLightPush(node2, nil, utils.Logger())
lightPushNode2.SetHost(host2)
err := lightPushNode2.Start(ctx)
require.NoError(t, err)
@ -72,7 +72,7 @@ func TestWakuLightPush(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(nil, utils.Logger())
client := NewWakuLightPush(nil, nil, utils.Logger())
client.SetHost(clientHost)
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
@ -129,7 +129,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(nil, utils.Logger())
client := NewWakuLightPush(nil, nil, utils.Logger())
client.SetHost(clientHost)
err = client.Start(ctx)
@ -144,7 +144,7 @@ func TestWakuLightPushNoPeers(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(nil, utils.Logger())
client := NewWakuLightPush(nil, nil, utils.Logger())
client.SetHost(clientHost)
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), testTopic)
require.Errorf(t, err, "no suitable remote peers")


@ -22,6 +22,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
params := new(PeerExchangeParameters)
params.host = wakuPX.h
params.log = wakuPX.log
params.pm = wakuPX.pm
optList := DefaultOptions(wakuPX.h)
optList = append(optList, opts...)


@ -39,8 +39,8 @@ type PeerConnector interface {
type WakuPeerExchange struct {
h host.Host
disc *discv5.DiscoveryV5
log *zap.Logger
pm *peermanager.PeerManager
log *zap.Logger
cancel context.CancelFunc
@ -50,7 +50,9 @@ type WakuPeerExchange struct {
}
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) {
// Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode.
// If using a libp2p host directly, pass nil as the peermanager.
func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, log *zap.Logger) (*WakuPeerExchange, error) {
newEnrCache, err := newEnrCache(MaxCacheSize)
if err != nil {
return nil, err
@ -60,6 +62,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
wakuPX.log = log.Named("wakupx")
wakuPX.enrCache = newEnrCache
wakuPX.peerConnector = peerConnector
wakuPX.pm = pm
return wakuPX, nil
}


@ -5,6 +5,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -12,6 +13,7 @@ import (
type PeerExchangeParameters struct {
host host.Host
selectedPeer peer.ID
pm *peermanager.PeerManager
log *zap.Logger
}
@ -30,7 +32,13 @@ func WithPeer(p peer.ID) PeerExchangeOption {
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
p, err := utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log)
var p peer.ID
var err error
if params.pm == nil {
p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, fromThesePeers, params.log)
}
if err == nil {
params.selectedPeer = p
} else {


@ -142,12 +142,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
// mount peer exchange
pxPeerConn1 := tests.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, utils.Logger())
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, utils.Logger())
require.NoError(t, err)
px1.SetHost(host1)
pxPeerConn3 := tests.NewTestPeerDiscoverer()
px3, err := NewWakuPeerExchange(nil, pxPeerConn3, utils.Logger())
px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, utils.Logger())
require.NoError(t, err)
px3.SetHost(host3)


@ -24,7 +24,7 @@ func TestFindLastSeenMessage(t *testing.T) {
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", now+4), utils.GetUnixEpoch(), "test")
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", now+5), utils.GetUnixEpoch(), "test")
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(msg1)
_ = s.storeMessage(msg3)
_ = s.storeMessage(msg5)
@ -44,7 +44,7 @@ func TestResume(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
@ -66,7 +66,7 @@ func TestResume(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
@ -104,7 +104,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
@ -118,7 +118,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
@ -145,7 +145,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
err = s1.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
@ -159,7 +159,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)


@ -107,7 +107,13 @@ func WithPeer(p peer.ID) HistoryRequestOption {
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log)
var p peer.ID
var err error
if params.s.pm == nil {
p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log)
} else {
p, err = params.s.pm.SelectPeer(StoreID_v20beta4, fromThesePeers, params.s.log)
}
if err == nil {
params.selectedPeer = p
} else {


@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
@ -58,15 +59,19 @@ type WakuStore struct {
msgProvider MessageProvider
h host.Host
pm *peermanager.PeerManager
}
// NewWakuStore creates a WakuStore using a specific MessageProvider for storing the messages
func NewWakuStore(p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
// Takes an optional peermanager if WakuStore is being created along with WakuNode.
// If using a libp2p host directly, pass nil as the peermanager.
func NewWakuStore(p MessageProvider, pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.wg = &sync.WaitGroup{}
wakuStore.log = log.Named("store")
wakuStore.timesource = timesource
wakuStore.pm = pm
return wakuStore
}


@ -14,7 +14,7 @@ import (
func TestStorePersistence(t *testing.T) {
db := MemoryDB(t)
s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(db, nil, timesource.NewDefaultClock(), utils.Logger())
defaultPubSubTopic := "test"
defaultContentTopic := "1"


@ -23,7 +23,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
topic1 := "1"
@ -42,7 +42,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
require.NoError(t, err)
defer s1.Stop()
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2.SetHost(host2)
@ -73,7 +73,7 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
topic1 := "1"
@ -113,7 +113,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
require.NoError(t, err)
db := MemoryDB(t)
s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(db, nil, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
topic1 := "1"
@ -143,7 +143,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4)
require.NoError(t, err)
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
@ -188,7 +188,7 @@ func TestWakuStoreResult(t *testing.T) {
require.NoError(t, err)
db := MemoryDB(t)
s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(db, nil, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
topic1 := "1"
@ -218,7 +218,7 @@ func TestWakuStoreResult(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4)
require.NoError(t, err)
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
@ -278,7 +278,7 @@ func TestWakuStoreProtocolFind(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s1.SetHost(host1)
topic1 := "1"
@ -317,7 +317,7 @@ func TestWakuStoreProtocolFind(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4)
require.NoError(t, err)
s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
s2.SetHost(host2)
err = s2.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)


@ -21,7 +21,7 @@ func TestStoreQuery(t *testing.T) {
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic))
@ -47,7 +47,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic))
@ -80,7 +80,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2))
@ -112,7 +112,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2))
@ -134,7 +134,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1))
@ -153,7 +153,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
for i := 0; i < 10; i++ {
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
msg.Payload = []byte{byte(i)}
@ -177,7 +177,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
for i := 0; i < 10; i++ {
msg := &wpb.WakuMessage{
Payload: []byte{byte(i)},
@ -203,7 +203,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
}
func TestTemporalHistoryQueries(t *testing.T) {
s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger())
var messages []*wpb.WakuMessage
now := utils.GetUnixEpoch()


@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/logging"
"go.uber.org/zap"
)
@ -33,7 +34,42 @@ func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
return peerID, nil
}
// FilterPeersByProto filters a list of peers down to those that support the specified protocols.
// If specificPeers is nil, all peers in the host's peerStore are considered for filtering.
func FilterPeersByProto(host host.Host, specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) {
peerSet := specificPeers
if len(peerSet) == 0 {
peerSet = host.Peerstore().Peers()
}
var peers peer.IDSlice
for _, peer := range peerSet {
protocols, err := host.Peerstore().SupportsProtocols(peer, proto...)
if err != nil {
return nil, err
}
if len(protocols) > 0 {
peers = append(peers, peer)
}
}
return peers, nil
}
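This helper now backs both the peer manager's relay pruning above and the SelectPeer fallback below. Standalone use, with h an assumed host.Host (the protocol ID literal is the usual waku store ID, shown for illustration):

// All of h's peerstore peers that advertise the waku store protocol.
peers, err := FilterPeersByProto(h, nil, protocol.ID("/vac/waku/store/2.0.0-beta4"))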
// SelectRandomPeer selects a random peer from the list of peers passed.
func SelectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) {
if len(peers) >= 1 {
peerID := peers[rand.Intn(len(peers))]
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned
log.Info("Got random peer from peerstore", logging.HostID("peer", peerID))
return peerID, nil // nolint: gosec
}
return "", ErrNoPeersAvailable
}
// SelectPeer is used to return a random peer that supports a given protocol.
// Note: Use this method only when a WakuNode is not being used; otherwise use peermanager.SelectPeer.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will choose a peer from the node peerstore
func SelectPeer(host host.Host, protocolId protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) {
@ -44,29 +80,12 @@ func SelectPeer(host host.Host, protocolId protocol.ID, specificPeers []peer.ID,
// - latency?
// - default store peer?
peerSet := specificPeers
if len(peerSet) == 0 {
peerSet = host.Peerstore().Peers()
peers, err := FilterPeersByProto(host, specificPeers, protocolId)
if err != nil {
return "", err
}
var peers peer.IDSlice
for _, peer := range peerSet {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
if err != nil {
return "", err
}
if len(protocols) > 0 {
peers = append(peers, peer)
}
}
if len(peers) >= 1 {
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned
return peers[rand.Intn(len(peers))], nil // nolint: gosec
}
return "", ErrNoPeersAvailable
return SelectRandomPeer(peers, log)
}
type pingResult struct {


@ -1,4 +1,4 @@
package utils
package tests
import (
"context"
@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func TestSelectPeer(t *testing.T) {
@ -31,17 +32,17 @@ func TestSelectPeer(t *testing.T) {
proto := protocol.ID("test/protocol")
h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
// No peers with selected protocol
_, err = SelectPeer(h1, proto, nil, Logger())
require.Error(t, ErrNoPeersAvailable, err)
_, err = utils.SelectPeer(h1, proto, nil, utils.Logger())
require.Error(t, utils.ErrNoPeersAvailable, err)
// Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)
_, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger())
_, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger())
require.NoError(t, err)
}
@ -70,13 +71,13 @@ func TestSelectPeerWithLowestRTT(t *testing.T) {
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
// No peers with selected protocol
_, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger())
require.Error(t, ErrNoPeersAvailable, err)
_, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger())
require.Error(t, utils.ErrNoPeersAvailable, err)
// Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)
_, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger())
_, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger())
require.NoError(t, err)
}