From 9f45d271acf28f7803dde5b21c55646353c80d7d Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 10 Aug 2023 18:28:22 +0530 Subject: [PATCH] feat: support serviceslots in peermanager (#631) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: support peermanager serviceslots and update store protocol to use serviceslots * fix: lint errors in test code * fix: error in nix build due to vendor sha change * fix: set host in peermanager even if relay is disabled * chore: fix codeclimate issues * chore: using common filterPeer function to avoid duplication * feat:use service slots in other service protocols * chore: fix codeclimate issues * chore: move AddPeer to peermanager * Apply suggestions from code review Co-authored-by: richΛrd * chore:address review comments * feat: implement RemovePeer #638 * chore: fix test failure * Support for multiple slots for service peers Adding discovered peers also moved to peer manager --------- Co-authored-by: richΛrd --- cmd/waku/node.go | 3 +- flake.nix | 2 +- go.mod | 6 +- go.sum | 12 +- waku/v2/node/wakunode2.go | 55 +++---- waku/v2/node/wakuoptions_test.go | 2 +- waku/v2/peermanager/discovery_connector.go | 20 +-- waku/v2/peermanager/peer_manager.go | 143 +++++++++++++++--- waku/v2/peermanager/test/peer_manager_test.go | 112 ++++++++++++++ waku/v2/protocol/filter/client.go | 11 +- waku/v2/protocol/filter/filter_test.go | 2 +- waku/v2/protocol/filter/options.go | 10 +- waku/v2/protocol/lightpush/waku_lightpush.go | 9 +- .../lightpush/waku_lightpush_option.go | 10 +- .../protocol/lightpush/waku_lightpush_test.go | 8 +- waku/v2/protocol/peer_exchange/client.go | 1 + waku/v2/protocol/peer_exchange/protocol.go | 9 +- .../waku_peer_exchange_option.go | 10 +- .../peer_exchange/waku_peer_exchange_test.go | 4 +- waku/v2/protocol/store/waku_resume_test.go | 14 +- waku/v2/protocol/store/waku_store_client.go | 8 +- waku/v2/protocol/store/waku_store_common.go | 7 +- .../store/waku_store_persistence_test.go | 2 +- .../store/waku_store_protocol_test.go | 18 +-- .../protocol/store/waku_store_query_test.go | 16 +- waku/v2/utils/peer.go | 61 +++++--- waku/v2/utils/{ => test}/peer_test.go | 17 ++- 27 files changed, 422 insertions(+), 150 deletions(-) create mode 100644 waku/v2/peermanager/test/peer_manager_test.go rename waku/v2/utils/{ => test}/peer_test.go (78%) diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 62af3cec..f837a27a 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -34,7 +34,6 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -304,7 +303,7 @@ func Execute(options Options) { } for _, d := range discoveredNodes { - wakuNode.Host().Peerstore().AddAddrs(d.PeerID, d.PeerInfo.Addrs, peerstore.PermanentAddrTTL) + wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DnsDiscovery) } addStaticPeers(wakuNode, options.Store.Nodes, store.StoreID_v20beta4) diff --git a/flake.nix b/flake.nix index 03934ac1..7bfe7309 100644 --- a/flake.nix +++ b/flake.nix @@ -28,7 +28,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorSha256 = "sha256-1sioJgLOO5erkjeAIkTWMLZglJERvMo7OzFNvKHwJXA="; + vendorSha256 = "sha256-JhbZJV0SG7QdKR386Pfg7CWi5bNg+MOKwrzClEzKruw="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/go.mod b/go.mod index 07674b2e..fa062948 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/libp2p/go-libp2p-pubsub v0.9.3 github.com/libp2p/go-msgio v0.3.0 github.com/mattn/go-sqlite3 v1.14.17 - github.com/multiformats/go-multiaddr v0.9.0 + github.com/multiformats/go-multiaddr v0.10.1 github.com/stretchr/testify v1.8.4 github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect github.com/urfave/cli/v2 v2.24.4 @@ -164,8 +164,8 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.9.0 // indirect - golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect - golang.org/x/mod v0.10.0 // indirect + golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect + golang.org/x/mod v0.11.0 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/sys v0.8.0 golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect diff --git a/go.sum b/go.sum index 4374a349..4e3fc03f 100644 --- a/go.sum +++ b/go.sum @@ -1176,8 +1176,8 @@ github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9 github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo= github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4= -github.com/multiformats/go-multiaddr v0.9.0 h1:3h4V1LHIk5w4hJHekMKWALPXErDfz/sggzwC/NcqbDQ= -github.com/multiformats/go-multiaddr v0.9.0/go.mod h1:mI67Lb1EeTOYb8GQfL/7wpIZwc46ElrvzhYnoJOmTT0= +github.com/multiformats/go-multiaddr v0.10.1 h1:HghtFrWyZEPrpTvgAMFJi6gFdgHfs2cb0pyfDsk+lqU= +github.com/multiformats/go-multiaddr v0.10.1/go.mod h1:jLEZsA61rwWNZQTHHnqq2HNa+4os/Hz54eqiRnsRqYQ= github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A= github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk= github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E= @@ -1712,8 +1712,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug= -golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1748,8 +1748,8 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= -golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= +golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 0f554e1d..db73c680 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -33,7 +33,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/discv5" "github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/peermanager" - peerstore1 "github.com/waku-org/go-waku/waku/v2/peerstore" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" @@ -124,7 +124,7 @@ type WakuNode struct { } func defaultStoreFactory(w *WakuNode) store.Store { - return store.NewWakuStore(w.opts.messageProvider, w.timesource, w.log) + return store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, w.log) } // New is used to instantiate a WakuNode using a set of WakuNodeOptions @@ -195,14 +195,14 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { // Setup peerstore wrapper if params.peerstore != nil { - w.peerstore = peerstore1.NewWakuPeerstore(params.peerstore) + w.peerstore = wps.NewWakuPeerstore(params.peerstore) params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore)) } else { ps, err := pstoremem.NewPeerstore() if err != nil { return nil, err } - w.peerstore = peerstore1.NewWakuPeerstore(ps) + w.peerstore = wps.NewWakuPeerstore(ps) params.libP2POpts = append(params.libP2POpts, libp2p.Peerstore(w.peerstore)) } @@ -265,7 +265,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } } - w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.peerConnector, w.log) + w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.peerConnector, w.peermanager, w.log) if err != nil { return nil, err } @@ -274,8 +274,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...) w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...) - w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.timesource, w.log) - w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.log) + w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.peermanager, w.timesource, w.log) + w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.log) if params.storeFactory != nil { w.storeFactory = params.storeFactory @@ -382,6 +382,7 @@ func (w *WakuNode) Start(ctx context.Context) error { } w.peerConnector.SetHost(host) + w.peerConnector.SetPeerManager(w.peermanager) err = w.peerConnector.Start(ctx) if err != nil { return err @@ -395,12 +396,13 @@ func (w *WakuNode) Start(ctx context.Context) error { } w.relay.SetHost(host) + w.peermanager.SetHost(host) + if w.opts.enableRelay { err := w.relay.Start(ctx) if err != nil { return err } - w.peermanager.SetHost(host) w.peermanager.Start(ctx) } @@ -676,30 +678,21 @@ func (w *WakuNode) startStore(ctx context.Context, sub relay.Subscription) error return nil } -func (w *WakuNode) addPeer(info *peer.AddrInfo, origin peerstore1.Origin, protocols ...protocol.ID) error { - w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID)) - err := w.host.Peerstore().(peerstore1.WakuPeerstore).SetOrigin(info.ID, origin) - if err != nil { - return err - } - - w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.AddressTTL) - err = w.host.Peerstore().AddProtocols(info.ID, protocols...) - if err != nil { - return err - } - - return nil +// AddPeer is used to add a peer and the protocols it support to the node peerstore +func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, protocols ...protocol.ID) (peer.ID, error) { + return w.peermanager.AddPeer(address, origin, protocols...) } -// AddPeer is used to add a peer and the protocols it support to the node peerstore -func (w *WakuNode) AddPeer(address ma.Multiaddr, origin peerstore1.Origin, protocols ...protocol.ID) (peer.ID, error) { - info, err := peer.AddrInfoFromP2pAddr(address) - if err != nil { - return "", err +// AddDiscoveredPeer to add a discovered peer to the node peerStore +func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin) { + p := peermanager.PeerData{ + Origin: origin, + AddrInfo: peer.AddrInfo{ + ID: ID, + Addrs: addrs, + }, } - - return info.ID, w.addPeer(info, origin, protocols...) + w.peermanager.AddDiscoveredPeer(p) } // DialPeerWithMultiAddress is used to connect to a peer using a multiaddress @@ -735,11 +728,11 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { - w.host.Peerstore().(peerstore1.WakuPeerstore).AddConnFailure(info) + w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info) return err } - w.host.Peerstore().(peerstore1.WakuPeerstore).ResetConnFailures(info) + w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info) stats.Record(ctx, metrics.Dials.M(1)) return nil } diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index ba83d8c5..80b51bd1 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -28,7 +28,7 @@ func TestWakuOptions(t *testing.T) { require.NoError(t, err) storeFactory := func(w *WakuNode) store.Store { - return store.NewWakuStore(w.opts.messageProvider, w.timesource, w.log) + return store.NewWakuStore(w.opts.messageProvider, w.peermanager, w.timesource, w.log) } options := []WakuNodeOption{ diff --git a/waku/v2/peermanager/discovery_connector.go b/waku/v2/peermanager/discovery_connector.go index 040b53e0..362d5cf7 100644 --- a/waku/v2/peermanager/discovery_connector.go +++ b/waku/v2/peermanager/discovery_connector.go @@ -12,7 +12,6 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/waku-org/go-waku/logging" @@ -36,6 +35,7 @@ type PeerConnectionStrategy struct { cache *lru.TwoQueueCache host host.Host + pm *PeerManager cancel context.CancelFunc paused bool @@ -114,6 +114,10 @@ func (c *PeerConnectionStrategy) SetHost(h host.Host) { c.host = h } +func (c *PeerConnectionStrategy) SetPeerManager(pm *PeerManager) { + c.pm = pm +} + // Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period. func (c *PeerConnectionStrategy) Start(ctx context.Context) error { if c.cancel != nil { @@ -225,19 +229,7 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) { case <-ctx.Done(): return case p := <-c.peerCh: - c.host.Peerstore().AddAddrs(p.AddrInfo.ID, p.AddrInfo.Addrs, peerstore.AddressTTL) - err := c.host.Peerstore().(wps.WakuPeerstore).SetOrigin(p.AddrInfo.ID, p.Origin) - if err != nil { - c.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID)) - } - - if p.ENR != nil { - err = c.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) - if err != nil { - c.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) - } - } - + c.pm.AddDiscoveredPeer(p) c.publishWork(ctx, p.AddrInfo) case <-time.After(1 * time.Second): // This timeout is to not lock the goroutine diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 0f070a3a..f892e938 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -6,8 +6,12 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" + ma "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -23,6 +27,7 @@ type PeerManager struct { InRelayPeersTarget uint OutRelayPeersTarget uint host host.Host + serviceSlots map[protocol.ID][]peer.ID } const maxRelayPeersShare = 5 @@ -42,6 +47,7 @@ func NewPeerManager(maxConnections uint, logger *zap.Logger) *PeerManager { maxRelayPeers: maxRelayPeersValue, InRelayPeersTarget: maxRelayPeersValue - outRelayPeersTargetValue, OutRelayPeersTarget: outRelayPeersTargetValue, + serviceSlots: make(map[protocol.ID][]peer.ID), } logger.Info("PeerManager init values", zap.Uint("maxConnections", maxConnections), zap.Uint("maxRelayPeersValue", maxRelayPeersValue), zap.Uint("outRelayPeersTargetValue", outRelayPeersTargetValue), @@ -72,23 +78,6 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) { } } -func (pm *PeerManager) filterPeersByProto(peers peer.IDSlice, proto ...protocol.ID) peer.IDSlice { - var filteredPeers peer.IDSlice - //TODO: This can be optimized once we have waku's own peerStore - - for _, p := range peers { - supportedProtocols, err := pm.host.Peerstore().SupportsProtocols(p, proto...) - if err != nil { - pm.logger.Warn("Failed to get supported protocols for peer", zap.String("peerID", p.String())) - continue - } - if len(supportedProtocols) != 0 { - filteredPeers = append(filteredPeers, p) - } - } - return filteredPeers -} - func (pm *PeerManager) pruneInRelayConns() { var inRelayPeers peer.IDSlice @@ -100,8 +89,8 @@ func (pm *PeerManager) pruneInRelayConns() { pm.logger.Info("Number of peers connected", zap.Int("inPeers", inPeers.Len()), zap.Int("outPeers", outPeers.Len())) //Need to filter peers to check if they support relay - inRelayPeers = pm.filterPeersByProto(inPeers, WakuRelayIDv200) - outRelayPeers := pm.filterPeersByProto(outPeers, WakuRelayIDv200) + inRelayPeers, _ = utils.FilterPeersByProto(pm.host, inPeers, WakuRelayIDv200) + outRelayPeers, _ := utils.FilterPeersByProto(pm.host, outPeers, WakuRelayIDv200) pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("outRelayPeers", outRelayPeers.Len())) if inRelayPeers.Len() > int(pm.InRelayPeersTarget) { @@ -120,3 +109,119 @@ func (pm *PeerManager) pruneInRelayConns() { } } } + +// AddDiscoveredPeer to add dynamically discovered peers. +// Note that these peers will not be set in service-slots. +// TODO: It maybe good to set in service-slots based on services supported in the ENR +func (pm *PeerManager) AddDiscoveredPeer(p PeerData) { + + _ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin) + + if p.ENR != nil { + err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR) + if err != nil { + pm.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) + } + } +} + +// addPeer adds peer to only the peerStore. +// It also sets additional metadata such as origin, ENR and supported protocols +func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, protocols ...protocol.ID) error { + pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID)) + pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL) + err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin) + if err != nil { + pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID)) + return err + } + + if len(protocols) > 0 { + err = pm.host.Peerstore().AddProtocols(ID, protocols...) + if err != nil { + return err + } + } + return nil +} + +// AddPeer adds peer to the peerStore and also to service slots +func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocols ...protocol.ID) (peer.ID, error) { + //Assuming all addresses have peerId + info, err := peer.AddrInfoFromP2pAddr(address) + if err != nil { + return "", err + } + + //Add Service peers to serviceSlots. + for _, proto := range protocols { + pm.AddPeerToServiceSlot(proto, info.ID, origin) + } + + //Add to the peer-store + err = pm.addPeer(info.ID, info.Addrs, origin) + if err != nil { + return "", err + } + + return info.ID, nil +} + +// RemovePeer deletes peer from the peerStore after disconnecting it. +// It also removes the peer from serviceSlot. +func (pm *PeerManager) RemovePeer(peerID peer.ID) { + pm.host.Peerstore().RemovePeer(peerID) + //Search if this peer is in serviceSlot and if so, remove it from there + // TODO:Add another peer which is statically configured to the serviceSlot. + for proto, peers := range pm.serviceSlots { + for i, peer := range peers { + if peer == peerID { + pm.serviceSlots[proto][i] = "" + } + } + } +} + +// AddPeerToServiceSlot adds a peerID to serviceSlot. +// Adding to peerStore is expected to be already done by caller. +// If relay proto is passed, it is not added to serviceSlot. +func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID, origin wps.Origin) { + if proto == WakuRelayIDv200 { + pm.logger.Warn("Cannot add Relay peer to service peer slots") + return + } + + //For now adding the peer to serviceSlot which means the latest added peer would be given priority. + //TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc. + pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID), zap.String("service", string(proto))) + pm.serviceSlots[proto] = append(pm.serviceSlots[proto], peerID) +} + +// SelectPeer is used to return a random peer that supports a given protocol. +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol, otherwise it will chose a peer from the service slot. +// If a peer cannot be found in the service slot, a peer will be selected from node peerstore +func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID, logger *zap.Logger) (peer.ID, error) { + // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. + // Ideally depending on the query and our set of peers we take a subset of ideal peers. + // This will require us to check for various factors such as: + // - which topics they track + // - latency? + + filteredPeers, err := utils.FilterPeersByProto(pm.host, specificPeers, proto) + if err != nil { + return "", err + } + if proto == WakuRelayIDv200 { + return utils.SelectRandomPeer(filteredPeers, pm.logger) + } + + //Try to fetch from serviceSlot + peerIDs, ok := pm.serviceSlots[proto] + if ok || len(peerIDs) > 0 { + pm.logger.Info("Got peer from service slots", logging.HostID("peer", peerIDs[0])) + return peerIDs[0], nil + } + + return utils.SelectRandomPeer(filteredPeers, pm.logger) +} diff --git a/waku/v2/peermanager/test/peer_manager_test.go b/waku/v2/peermanager/test/peer_manager_test.go new file mode 100644 index 00000000..c10abfad --- /dev/null +++ b/waku/v2/peermanager/test/peer_manager_test.go @@ -0,0 +1,112 @@ +package test + +import ( + "context" + "crypto/rand" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peerstore" + libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/peermanager" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func TestServiceSlots(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + h1, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h1.Close() + + h2, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h2.Close() + + h3, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h3.Close() + protocol := libp2pProtocol.ID("test/protocol") + protocol1 := libp2pProtocol.ID("test/protocol1") + + pm := peermanager.NewPeerManager(10, utils.Logger()) + pm.SetHost(h1) + + h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + err = h1.Peerstore().AddProtocols(h2.ID(), libp2pProtocol.ID(protocol)) + require.NoError(t, err) + + //Test selection from peerStore. + peerId, err := pm.SelectPeer(protocol, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerId, h2.ID()) + + //Test addition and selection from service-slot + pm.AddPeerToServiceSlot(protocol, h2.ID(), wps.Static) + + peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerId, h2.ID()) + + h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + pm.AddPeerToServiceSlot(protocol, h3.ID(), wps.Static) + + h4, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h4.Close() + + h1.Peerstore().AddAddrs(h4.ID(), h4.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + pm.AddPeerToServiceSlot(protocol1, h4.ID(), wps.Static) + + //Test peer selection from first added peer to serviceSlot + peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerId, h2.ID()) + + //Test peer selection for specific protocol + peerId, err = pm.SelectPeer(protocol1, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerId, h4.ID()) + + h5, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h5.Close() + + //Test empty peer selection for relay protocol + _, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger()) + require.Error(t, err, utils.ErrNoPeersAvailable) + //Test peer selection for relay protocol from peer store + h1.Peerstore().AddAddrs(h5.ID(), h5.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + pm.AddPeerToServiceSlot(peermanager.WakuRelayIDv200, h5.ID(), wps.Static) + + _, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger()) + require.Error(t, err, utils.ErrNoPeersAvailable) + + err = h1.Peerstore().AddProtocols(h5.ID(), peermanager.WakuRelayIDv200) + require.NoError(t, err) + + peerId, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerId, h5.ID()) + + //Test random peer selection + protocol2 := libp2pProtocol.ID("test/protocol2") + h6, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h6.Close() + + h1.Peerstore().AddAddrs(h6.ID(), h6.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + err = h1.Peerstore().AddProtocols(h6.ID(), libp2pProtocol.ID(protocol2)) + require.NoError(t, err) + + peerId, err = pm.SelectPeer(protocol2, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerId, h6.ID()) + + pm.RemovePeer(peerId) + _, err = pm.SelectPeer(protocol2, nil, utils.Logger()) + require.Error(t, err, utils.ErrNoPeersAvailable) +} diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 9763b3de..da0ab8b6 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -42,6 +43,7 @@ type WakuFilterLightnode struct { wg *sync.WaitGroup log *zap.Logger subscriptions *SubscriptionsMap + pm *peermanager.PeerManager } type ContentFilter struct { @@ -54,13 +56,17 @@ type WakuFilterPushResult struct { PeerID peer.ID } -// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterLightnode(broadcaster relay.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode { +// NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options +// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode. +// If using libp2p host, then pass peermanager as nil +func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager, + timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode { wf := new(WakuFilterLightnode) wf.log = log.Named("filterv2-lightnode") wf.broadcaster = broadcaster wf.timesource = timesource wf.wg = &sync.WaitGroup{} + wf.pm = pm return wf } @@ -220,6 +226,7 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont params := new(FilterSubscribeParameters) params.log = wf.log params.host = wf.h + params.pm = wf.pm optList := DefaultSubscriptionOptions() optList = append(optList, opts...) diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 4204502e..be9d1382 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -44,7 +44,7 @@ func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) { b := relay.NewBroadcaster(10) require.NoError(t, b.Start(context.Background())) - filterPush := NewWakuFilterLightnode(b, timesource.NewDefaultClock(), utils.Logger()) + filterPush := NewWakuFilterLightnode(b, nil, timesource.NewDefaultClock(), utils.Logger()) filterPush.SetHost(host) err = filterPush.Start(context.Background()) require.NoError(t, err) diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index f3c65c90..188638b9 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -6,6 +6,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -15,6 +16,7 @@ type ( FilterSubscribeParameters struct { host host.Host selectedPeer peer.ID + pm *peermanager.PeerManager requestID []byte log *zap.Logger } @@ -54,7 +56,13 @@ func WithPeer(p peer.ID) FilterSubscribeOption { // supports the chosen protocol, otherwise it will chose a peer from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log) + var p peer.ID + var err error + if params.pm == nil { + p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log) + } else { + p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, fromThesePeers, params.log) + } if err == nil { params.selectedPeer = p } else { diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 446c1ea2..6bc6f0a5 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -32,16 +33,19 @@ type WakuLightPush struct { h host.Host relay *relay.WakuRelay cancel context.CancelFunc + pm *peermanager.PeerManager log *zap.Logger } // NewWakuLightPush returns a new instance of Waku Lightpush struct -func NewWakuLightPush(relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush { +// Takes an optional peermanager if WakuLightPush is being created along with WakuNode. +// If using libp2p host, then pass peermanager as nil +func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, log *zap.Logger) *WakuLightPush { wakuLP := new(WakuLightPush) wakuLP.relay = relay wakuLP.log = log.Named("lightpush") - + wakuLP.pm = pm return wakuLP } @@ -142,6 +146,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o params := new(lightPushParameters) params.host = wakuLP.h params.log = wakuLP.log + params.pm = wakuLP.pm optList := append(DefaultOptions(wakuLP.h), opts...) for _, opt := range optList { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 61f80752..935c4559 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -5,6 +5,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -14,6 +15,7 @@ type lightPushParameters struct { host host.Host selectedPeer peer.ID requestID []byte + pm *peermanager.PeerManager log *zap.Logger } @@ -33,7 +35,13 @@ func WithPeer(p peer.ID) Option { // from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { - p, err := utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log) + var p peer.ID + var err error + if params.pm == nil { + p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log) + } else { + p, err = params.pm.SelectPeer(LightPushID_v20beta1, fromThesePeers, params.log) + } if err == nil { params.selectedPeer = p } else { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 107b586f..08cb1609 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -61,7 +61,7 @@ func TestWakuLightPush(t *testing.T) { defer sub2.Unsubscribe() ctx := context.Background() - lightPushNode2 := NewWakuLightPush(node2, utils.Logger()) + lightPushNode2 := NewWakuLightPush(node2, nil, utils.Logger()) lightPushNode2.SetHost(host2) err := lightPushNode2.Start(ctx) require.NoError(t, err) @@ -72,7 +72,7 @@ func TestWakuLightPush(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(nil, utils.Logger()) + client := NewWakuLightPush(nil, nil, utils.Logger()) client.SetHost(clientHost) host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) @@ -129,7 +129,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(nil, utils.Logger()) + client := NewWakuLightPush(nil, nil, utils.Logger()) client.SetHost(clientHost) err = client.Start(ctx) @@ -144,7 +144,7 @@ func TestWakuLightPushNoPeers(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(nil, utils.Logger()) + client := NewWakuLightPush(nil, nil, utils.Logger()) client.SetHost(clientHost) _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), testTopic) require.Errorf(t, err, "no suitable remote peers") diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 0f1470a1..ff1b6c09 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -22,6 +22,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts params := new(PeerExchangeParameters) params.host = wakuPX.h params.log = wakuPX.log + params.pm = wakuPX.pm optList := DefaultOptions(wakuPX.h) optList = append(optList, opts...) diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 7c6a05f3..832f3198 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -39,8 +39,8 @@ type PeerConnector interface { type WakuPeerExchange struct { h host.Host disc *discv5.DiscoveryV5 - - log *zap.Logger + pm *peermanager.PeerManager + log *zap.Logger cancel context.CancelFunc @@ -50,7 +50,9 @@ type WakuPeerExchange struct { } // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct -func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) { +// Takes an optional peermanager if WakuPeerExchange is being created along with WakuNode. +// If using libp2p host, then pass peermanager as nil +func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, pm *peermanager.PeerManager, log *zap.Logger) (*WakuPeerExchange, error) { newEnrCache, err := newEnrCache(MaxCacheSize) if err != nil { return nil, err @@ -60,6 +62,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, wakuPX.log = log.Named("wakupx") wakuPX.enrCache = newEnrCache wakuPX.peerConnector = peerConnector + wakuPX.pm = pm return wakuPX, nil } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index 30d1a3bc..bba3ba37 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -5,6 +5,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -12,6 +13,7 @@ import ( type PeerExchangeParameters struct { host host.Host selectedPeer peer.ID + pm *peermanager.PeerManager log *zap.Logger } @@ -30,7 +32,13 @@ func WithPeer(p peer.ID) PeerExchangeOption { // from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { return func(params *PeerExchangeParameters) { - p, err := utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log) + var p peer.ID + var err error + if params.pm == nil { + p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log) + } else { + p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, fromThesePeers, params.log) + } if err == nil { params.selectedPeer = p } else { diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 8db5db8f..9f6adc20 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -142,12 +142,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { // mount peer exchange pxPeerConn1 := tests.NewTestPeerDiscoverer() - px1, err := NewWakuPeerExchange(d1, pxPeerConn1, utils.Logger()) + px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, utils.Logger()) require.NoError(t, err) px1.SetHost(host1) pxPeerConn3 := tests.NewTestPeerDiscoverer() - px3, err := NewWakuPeerExchange(nil, pxPeerConn3, utils.Logger()) + px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, utils.Logger()) require.NoError(t, err) px3.SetHost(host3) diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index c7fa7382..8f31d8c3 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -24,7 +24,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", now+4), utils.GetUnixEpoch(), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", now+5), utils.GetUnixEpoch(), "test") - s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(msg1) _ = s.storeMessage(msg3) _ = s.storeMessage(msg5) @@ -44,7 +44,7 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) err = s1.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) @@ -66,7 +66,7 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) @@ -104,7 +104,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) err = s1.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) @@ -118,7 +118,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) @@ -145,7 +145,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) err = s1.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) @@ -159,7 +159,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index b3a8df09..17251cbd 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -107,7 +107,13 @@ func WithPeer(p peer.ID) HistoryRequestOption { // from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log) + var p peer.ID + var err error + if params.s.pm == nil { + p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log) + } else { + p, err = params.s.pm.SelectPeer(StoreID_v20beta4, fromThesePeers, params.s.log) + } if err == nil { params.selectedPeer = p } else { diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index ffbe7cf8..537bb03e 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -7,6 +7,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" @@ -58,15 +59,19 @@ type WakuStore struct { msgProvider MessageProvider h host.Host + pm *peermanager.PeerManager } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore { +// Takes an optional peermanager if WakuStore is being created along with WakuNode. +// If using libp2p host, then pass peermanager as nil +func NewWakuStore(p MessageProvider, pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p wakuStore.wg = &sync.WaitGroup{} wakuStore.log = log.Named("store") wakuStore.timesource = timesource + wakuStore.pm = pm return wakuStore } diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index b1ee845a..929e20f8 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -14,7 +14,7 @@ import ( func TestStorePersistence(t *testing.T) { db := MemoryDB(t) - s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(db, nil, timesource.NewDefaultClock(), utils.Logger()) defaultPubSubTopic := "test" defaultContentTopic := "1" diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 53176c64..1b1f9175 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -23,7 +23,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) topic1 := "1" @@ -42,7 +42,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { require.NoError(t, err) defer s1.Stop() - s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) s2.SetHost(host2) @@ -73,7 +73,7 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) topic1 := "1" @@ -113,7 +113,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { require.NoError(t, err) db := MemoryDB(t) - s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(db, nil, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) topic1 := "1" @@ -143,7 +143,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4) require.NoError(t, err) - s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) @@ -188,7 +188,7 @@ func TestWakuStoreResult(t *testing.T) { require.NoError(t, err) db := MemoryDB(t) - s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(db, nil, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) topic1 := "1" @@ -218,7 +218,7 @@ func TestWakuStoreResult(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4) require.NoError(t, err) - s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) @@ -278,7 +278,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s1.SetHost(host1) topic1 := "1" @@ -317,7 +317,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4) require.NoError(t, err) - s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) s2.SetHost(host2) err = s2.Start(ctx, relay.NoopSubscription()) require.NoError(t, err) diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index b358ed51..6be3031b 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -21,7 +21,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic)) @@ -47,7 +47,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic)) @@ -80,7 +80,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2)) @@ -112,7 +112,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2)) @@ -134,7 +134,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)) @@ -153,7 +153,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -177,7 +177,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) for i := 0; i < 10; i++ { msg := &wpb.WakuMessage{ Payload: []byte{byte(i)}, @@ -203,7 +203,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), nil, timesource.NewDefaultClock(), utils.Logger()) var messages []*wpb.WakuMessage now := utils.GetUnixEpoch() diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index a475a0d4..66864aa1 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/logging" "go.uber.org/zap" ) @@ -33,7 +34,42 @@ func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { return peerID, nil } +// FilterPeersByProto filters list of peers that support specified protocols. +// If specificPeers is nil, all peers in the host's peerStore are considered for filtering. +func FilterPeersByProto(host host.Host, specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { + peerSet := specificPeers + if len(peerSet) == 0 { + peerSet = host.Peerstore().Peers() + } + + var peers peer.IDSlice + for _, peer := range peerSet { + protocols, err := host.Peerstore().SupportsProtocols(peer, proto...) + if err != nil { + return nil, err + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + return peers, nil +} + +// SelectRandomPeer selects randomly a peer from the list of peers passed. +func SelectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { + if len(peers) >= 1 { + peerID := peers[rand.Intn(len(peers))] + // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned + log.Info("Got random peer from peerstore", logging.HostID("peer", peerID)) + return peerID, nil // nolint: gosec + } + + return "", ErrNoPeersAvailable +} + // SelectPeer is used to return a random peer that supports a given protocol. +// Note: Use this method only if WakuNode is not being initialized, otherwise use peermanager.SelectPeer. // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the node peerstore func SelectPeer(host host.Host, protocolId protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) { @@ -44,29 +80,12 @@ func SelectPeer(host host.Host, protocolId protocol.ID, specificPeers []peer.ID, // - latency? // - default store peer? - peerSet := specificPeers - if len(peerSet) == 0 { - peerSet = host.Peerstore().Peers() + peers, err := FilterPeersByProto(host, specificPeers, protocolId) + if err != nil { + return "", err } - var peers peer.IDSlice - for _, peer := range peerSet { - protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) - if err != nil { - return "", err - } - - if len(protocols) > 0 { - peers = append(peers, peer) - } - } - - if len(peers) >= 1 { - // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned - return peers[rand.Intn(len(peers))], nil // nolint: gosec - } - - return "", ErrNoPeersAvailable + return SelectRandomPeer(peers, log) } type pingResult struct { diff --git a/waku/v2/utils/peer_test.go b/waku/v2/utils/test/peer_test.go similarity index 78% rename from waku/v2/utils/peer_test.go rename to waku/v2/utils/test/peer_test.go index d961a25c..44463c7d 100644 --- a/waku/v2/utils/peer_test.go +++ b/waku/v2/utils/test/peer_test.go @@ -1,4 +1,4 @@ -package utils +package tests import ( "context" @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/utils" ) func TestSelectPeer(t *testing.T) { @@ -31,17 +32,17 @@ func TestSelectPeer(t *testing.T) { proto := protocol.ID("test/protocol") h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL) // No peers with selected protocol - _, err = SelectPeer(h1, proto, nil, Logger()) - require.Error(t, ErrNoPeersAvailable, err) + _, err = utils.SelectPeer(h1, proto, nil, utils.Logger()) + require.Error(t, utils.ErrNoPeersAvailable, err) // Peers with selected protocol _ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - _, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger()) + _, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger()) require.NoError(t, err) } @@ -70,13 +71,13 @@ func TestSelectPeerWithLowestRTT(t *testing.T) { h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) // No peers with selected protocol - _, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger()) - require.Error(t, ErrNoPeersAvailable, err) + _, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger()) + require.Error(t, utils.ErrNoPeersAvailable, err) // Peers with selected protocol _ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - _, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger()) + _, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger()) require.NoError(t, err) }