diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go
index 14790833..2fb68879 100644
--- a/waku/v2/node/wakunode2.go
+++ b/waku/v2/node/wakunode2.go
@@ -243,8 +243,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	if err != nil {
 		w.log.Error("creating localnode", zap.Error(err))
 	}
+	//Initialize peer manager.
-	w.peermanager = peermanager.NewPeerManager(uint(w.opts.maxPeerConnections), w.log)
+	w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
 	maxOutPeers := int(w.peermanager.OutRelayPeersTarget)
 
 	// Setup peer connection strategy
@@ -257,6 +258,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	if err != nil {
 		w.log.Error("creating peer connection strategy", zap.Error(err))
 	}
+	w.peermanager.SetPeerConnector(w.peerConnector)
 
 	if w.opts.enableDiscV5 {
 		err := w.mountDiscV5()
diff --git a/waku/v2/peermanager/discovery_connector.go b/waku/v2/peermanager/peer_connector.go
similarity index 73%
rename from waku/v2/peermanager/discovery_connector.go
rename to waku/v2/peermanager/peer_connector.go
index 362d5cf7..460f6cd4 100644
--- a/waku/v2/peermanager/discovery_connector.go
+++ b/waku/v2/peermanager/peer_connector.go
@@ -29,7 +29,8 @@ type PeerData struct {
 	ENR *enode.Node
 }
 
-// PeerConnectionStrategy is a utility to connect to peers, but only if we have not recently tried connecting to them already
+// PeerConnectionStrategy is a utility to connect to peers,
+// but only if we have not recently tried connecting to them already
 type PeerConnectionStrategy struct {
 	sync.RWMutex
 
@@ -54,12 +55,17 @@ type PeerConnectionStrategy struct {
 	logger *zap.Logger
 }
 
-// NewPeerConnectionStrategy creates a utility to connect to peers, but only if we have not recently tried connecting to them already.
+// NewPeerConnectionStrategy creates a utility to connect to peers,
+// but only if we have not recently tried connecting to them already.
+//
 // cacheSize is the size of a TwoQueueCache
 // dialTimeout is how long we attempt to connect to a peer before giving up
 // minPeers is the minimum number of peers that the node should have
 // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer
-func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) {
+func NewPeerConnectionStrategy(cacheSize int, maxOutPeers int,
+	dialTimeout time.Duration, backoff backoff.BackoffFactory,
+	logger *zap.Logger) (*PeerConnectionStrategy, error) {
+
 	cache, err := lru.New2Q(cacheSize)
 	if err != nil {
 		return nil, err
 	}
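A note on the constructor above: `backoff.BackoffFactory` comes from go-libp2p's discovery backoff package, and each cached peer gets its own strategy whose `Delay()` decides how long to wait before the next dial attempt. A minimal sketch of what the factory argument provides, using an illustrative fixed backoff (the node itself may pass an exponential factory):

```go
package main

import (
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
)

func main() {
	// One factory is handed to NewPeerConnectionStrategy; it is invoked
	// once per peer (see connCacheData.strat in canDialPeer below).
	factory := backoff.NewFixedBackoff(30 * time.Second)

	strat := factory()
	fmt.Println(strat.Delay()) // 30s until the next attempt for this peer
	fmt.Println(strat.Delay()) // still 30s; an exponential strategy would grow
}
```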
@@ -109,16 +115,18 @@ func (c *PeerConnectionStrategy) consumeSubscription(ctx context.Context, ch <-c
 
 }
 
-// Sets the host to be able to mount or consume a protocol
+// SetHost sets the host to be able to mount or consume a protocol
 func (c *PeerConnectionStrategy) SetHost(h host.Host) {
 	c.host = h
}
 
+// SetPeerManager sets the peer manager, which the connector uses to add discovered peers
 func (c *PeerConnectionStrategy) SetPeerManager(pm *PeerManager) {
 	c.pm = pm
 }
 
-// Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period.
+// Start attempts to connect to the peers passed in by peerCh.
+// Will not connect to peers if they are within the backoff period.
 func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
 	if c.cancel != nil {
 		return errors.New("already started")
@@ -139,6 +147,7 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {
 	return nil
 }
 
+// Stop terminates the peer connector
 func (c *PeerConnectionStrategy) Stop() {
 	if c.cancel == nil {
 		return
@@ -176,9 +185,9 @@ func (c *PeerConnectionStrategy) shouldDialPeers(ctx context.Context) {
 			return
 		case <-ticker.C:
 			isPaused := c.isPaused()
-			_, outRelayPeers, err := c.host.Peerstore().(wps.WakuPeerstore).GroupPeersByDirection()
+			_, outRelayPeers, err := c.pm.GroupPeersByDirection()
 			if err != nil {
-				c.logger.Info("Failed to get outRelayPeers from peerstore", zap.Error(err))
+				c.logger.Warn("failed to get outRelayPeers from peerstore", zap.Error(err))
 				continue
 			}
 			numPeers := outRelayPeers.Len()
@@ -245,6 +254,28 @@ func (c *PeerConnectionStrategy) workPublisher(ctx context.Context) {
 
 const maxActiveDials = 5
 
+func (c *PeerConnectionStrategy) canDialPeer(pi peer.AddrInfo) bool {
+	c.mux.Lock()
+	val, ok := c.cache.Get(pi.ID)
+	var cachedPeer *connCacheData
+	if ok {
+		tv := val.(*connCacheData)
+		now := time.Now()
+		if now.Before(tv.nextTry) {
+			c.mux.Unlock()
+			return false
+		}
+
+		tv.nextTry = now.Add(tv.strat.Delay())
+	} else {
+		cachedPeer = &connCacheData{strat: c.backoff()}
+		cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
+		c.cache.Add(pi.ID, cachedPeer)
+	}
+	c.mux.Unlock()
+	return true
+}
+
 func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
 	defer c.wg.Done()
 
@@ -262,51 +293,34 @@ func (c *PeerConnectionStrategy) dialPeers(ctx context.Context) {
 				return
 			}
 
-			if pi.ID == c.host.ID() || pi.ID == "" {
+			if pi.ID == c.host.ID() || pi.ID == "" ||
+				c.host.Network().Connectedness(pi.ID) == network.Connected {
 				continue
 			}
 
-			if c.host.Network().Connectedness(pi.ID) == network.Connected {
-				continue
-			}
-
-			c.mux.Lock()
-			val, ok := c.cache.Get(pi.ID)
-			var cachedPeer *connCacheData
-			if ok {
-				tv := val.(*connCacheData)
-				now := time.Now()
-				if now.Before(tv.nextTry) {
-					c.mux.Unlock()
-					continue
-				}
-
-				tv.nextTry = now.Add(tv.strat.Delay())
+			if c.canDialPeer(pi) {
+				sem <- struct{}{}
+				c.wg.Add(1)
+				go c.dialPeer(pi, sem)
 			} else {
-				cachedPeer = &connCacheData{strat: c.backoff()}
-				cachedPeer.nextTry = time.Now().Add(cachedPeer.strat.Delay())
-				c.cache.Add(pi.ID, cachedPeer)
+				continue
 			}
-			c.mux.Unlock()
-
-			sem <- struct{}{}
-			c.wg.Add(1)
-			go func(pi peer.AddrInfo) {
-				defer c.wg.Done()
-				c.RLock()
-				ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
-				c.RUnlock()
-				defer cancel()
-				err := c.host.Connect(ctx, pi)
-				if err != nil && !errors.Is(err, context.Canceled) {
-					c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
-					c.logger.Info("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
-				}
-				<-sem
-			}(pi)
-
 		case <-ctx.Done():
 			return
 		}
 	}
 }
+
+func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) {
+	defer c.wg.Done()
+	c.RLock()
+	ctx, cancel := context.WithTimeout(c.workerCtx, c.dialTimeout)
+	c.RUnlock()
+	defer cancel()
+	err := c.host.Connect(ctx, pi)
+	if err != nil && !errors.Is(err, context.Canceled) {
+		c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi)
+		c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err))
+	}
+	<-sem
+}
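The refactored `dialPeers` keeps the original concurrency control: a buffered channel acts as a counting semaphore so at most `maxActiveDials` (5) connection attempts run at once, and `dialPeer` releases its slot when it finishes. A self-contained sketch of the same pattern (the names and the sleep stand-in are illustrative, not from the diff):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const maxActiveDials = 5

func main() {
	sem := make(chan struct{}, maxActiveDials)
	var wg sync.WaitGroup

	for i := 0; i < 20; i++ {
		sem <- struct{}{} // blocks once 5 "dials" are in flight
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			time.Sleep(50 * time.Millisecond) // stand-in for host.Connect
			fmt.Println("dialed peer", n)
			<-sem // free a slot for the next dial
		}(i)
	}
	wg.Wait()
}
```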
diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go
index f892e938..466eff55 100644
--- a/waku/v2/peermanager/peer_manager.go
+++ b/waku/v2/peermanager/peer_manager.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/core/peerstore"
 	"github.com/libp2p/go-libp2p/core/protocol"
@@ -22,12 +23,15 @@ const WakuRelayIDv200 = protocol.ID("/vac/waku/relay/2.0.0")
 
 // PeerManager applies various controls and manage connections towards peers.
 type PeerManager struct {
-	maxRelayPeers       uint
+	peerConnector       *PeerConnectionStrategy
+	maxConnections      int
+	maxRelayPeers       int
 	logger              *zap.Logger
-	InRelayPeersTarget  uint
-	OutRelayPeersTarget uint
+	InRelayPeersTarget  int
+	OutRelayPeersTarget int
 	host                host.Host
 	serviceSlots        map[protocol.ID][]peer.ID
+	ctx                 context.Context
 }
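A note on the `uint` to `int` switch in the fields above: the targets below are derived by subtraction, and unsigned arithmetic silently wraps around instead of going negative, which the new code now guards against explicitly (`if inRelayPeersTargetValue < 0`). The diff does not state this motivation, so treat it as a likely reading; a tiny illustration:

```go
package main

import "fmt"

func main() {
	a, b := uint(10), uint(13)
	fmt.Println(a - b)   // 18446744073709551613 on 64-bit: wraparound, not -3
	fmt.Println(10 - 13) // -3 with int, which can then be clamped to 0
}
```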
@@ -35,78 +39,169 @@ const maxRelayPeersShare = 5
 
 // const defaultMaxOutRelayPeersTarget = 10
 const outRelayPeersShare = 3
 const peerConnectivityLoopSecs = 15
+const minOutRelayConns = 10
 
 // NewPeerManager creates a new peerManager instance.
-func NewPeerManager(maxConnections uint, logger *zap.Logger) *PeerManager {
+func NewPeerManager(maxConnections int, logger *zap.Logger) *PeerManager {
 
 	maxRelayPeersValue := maxConnections - (maxConnections / maxRelayPeersShare)
-	outRelayPeersTargetValue := uint(maxRelayPeersValue / outRelayPeersShare)
+	outRelayPeersTargetValue := int(maxRelayPeersValue / outRelayPeersShare)
+	if outRelayPeersTargetValue < minOutRelayConns {
+		outRelayPeersTargetValue = minOutRelayConns
+	}
+	inRelayPeersTargetValue := maxRelayPeersValue - outRelayPeersTargetValue
+	if inRelayPeersTargetValue < 0 {
+		inRelayPeersTargetValue = 0
+	}
 	pm := &PeerManager{
+		maxConnections:      maxConnections,
 		logger:              logger.Named("peer-manager"),
 		maxRelayPeers:       maxRelayPeersValue,
-		InRelayPeersTarget:  maxRelayPeersValue - outRelayPeersTargetValue,
+		InRelayPeersTarget:  inRelayPeersTargetValue,
 		OutRelayPeersTarget: outRelayPeersTargetValue,
 		serviceSlots:        make(map[protocol.ID][]peer.ID),
 	}
-	logger.Info("PeerManager init values", zap.Uint("maxConnections", maxConnections),
-		zap.Uint("maxRelayPeersValue", maxRelayPeersValue), zap.Uint("outRelayPeersTargetValue", outRelayPeersTargetValue),
-		zap.Uint("inRelayPeersTarget", pm.InRelayPeersTarget))
+	logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
+		zap.Int("maxRelayPeersValue", maxRelayPeersValue),
+		zap.Int("outRelayPeersTargetValue", outRelayPeersTargetValue),
+		zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget))
 	return pm
 }
 
+// SetHost sets the host to be used in order to access the peerstore.
 func (pm *PeerManager) SetHost(host host.Host) {
 	pm.host = host
 }
 
+// SetPeerConnector sets the peer connector to be used for establishing relay connections.
+func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {
+	pm.peerConnector = pc
+}
+
 // Start starts the processing to be done by peer manager.
 func (pm *PeerManager) Start(ctx context.Context) {
+	pm.ctx = ctx
 	go pm.connectivityLoop(ctx)
 }
 
 // This is a connectivity loop, which currently checks and prunes inbound connections.
 func (pm *PeerManager) connectivityLoop(ctx context.Context) {
 	t := time.NewTicker(peerConnectivityLoopSecs * time.Second)
+	defer t.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case <-t.C:
-			pm.pruneInRelayConns()
+			pm.connectToRelayPeers()
 		}
 	}
 }
 
-func (pm *PeerManager) pruneInRelayConns() {
+// GroupPeersByDirection returns all the connected peers in the peerstore, grouped by inbound or outbound direction
+func (pm *PeerManager) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
 
-	var inRelayPeers peer.IDSlice
+	for _, p := range pm.host.Network().Peers() {
+		direction, err := pm.host.Peerstore().(wps.WakuPeerstore).Direction(p)
+		if err == nil {
+			if direction == network.DirInbound {
+				inPeers = append(inPeers, p)
+			} else if direction == network.DirOutbound {
+				outPeers = append(outPeers, p)
+			}
+		} else {
+			pm.logger.Error("Failed to retrieve peer direction",
+				logging.HostID("peerID", p), zap.Error(err))
+		}
+	}
+	return inPeers, outPeers, nil
+}
+
+func (pm *PeerManager) getRelayPeers() (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
 	//Group peers by their connected direction inbound or outbound.
-	inPeers, outPeers, err := pm.host.Peerstore().(wps.WakuPeerstore).GroupPeersByDirection()
+	inPeers, outPeers, err := pm.GroupPeersByDirection()
 	if err != nil {
 		return
 	}
-	pm.logger.Info("Number of peers connected", zap.Int("inPeers", inPeers.Len()), zap.Int("outPeers", outPeers.Len()))
+	pm.logger.Info("Number of peers connected", zap.Int("inPeers", inPeers.Len()),
+		zap.Int("outPeers", outPeers.Len()))
 
 	//Need to filter peers to check if they support relay
 	inRelayPeers, _ = utils.FilterPeersByProto(pm.host, inPeers, WakuRelayIDv200)
-	outRelayPeers, _ := utils.FilterPeersByProto(pm.host, outPeers, WakuRelayIDv200)
-	pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("outRelayPeers", outRelayPeers.Len()))
+	outRelayPeers, _ = utils.FilterPeersByProto(pm.host, outPeers, WakuRelayIDv200)
+	pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()),
+		zap.Int("outRelayPeers", outRelayPeers.Len()))
+	return
+}
 
-	if inRelayPeers.Len() > int(pm.InRelayPeersTarget) {
-		//Start disconnecting peers, based on what?
-		//For now, just disconnect most recently connected peers
-		//TODO: Need to have more intelligent way of doing this, maybe peer scores.
-		pm.logger.Info("Number of in peer connections exceed targer relay peers, hence pruning", zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Uint("inRelayPeersTarget", pm.InRelayPeersTarget))
-		for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < uint(inRelayPeers.Len()); pruningStartIndex++ {
-			p := inRelayPeers[pruningStartIndex]
-			err := pm.host.Network().ClosePeer(p)
-			if err != nil {
-				pm.logger.Warn("Failed to disconnect connection towards peer", zap.String("peerID", p.String()))
-			}
-			pm.host.Peerstore().RemovePeer(p) //TODO: Should we remove the peer immediately?
-			pm.logger.Info("Successfully disconnected connection towards peer", zap.String("peerID", p.String()))
+func (pm *PeerManager) connectToRelayPeers() {
+
+	//Check the number of relay connections and establish more outbound connections if needed.
+	inRelayPeers, outRelayPeers := pm.getRelayPeers()
+	if inRelayPeers.Len() > 0 &&
+		inRelayPeers.Len() > pm.InRelayPeersTarget {
+		pm.pruneInRelayConns(inRelayPeers, outRelayPeers)
+	}
+
+	if outRelayPeers.Len() > pm.OutRelayPeersTarget {
+		return
+	}
+	totalRelayPeers := inRelayPeers.Len() + outRelayPeers.Len()
+	// Establish additional connections if there are candidate peers.
+	//What if the not-yet-connected peers in the peerstore are not relay peers?
+	if totalRelayPeers < pm.host.Peerstore().Peers().Len() {
+		//Find not connected peers.
+		notConnectedPeers := pm.getNotConnectedPers()
+		//Figure out which peers are outside the backoff period.
+
+		//Connect to eligible peers.
+		numPeersToConnect := pm.maxRelayPeers - totalRelayPeers
+
+		if numPeersToConnect > notConnectedPeers.Len() {
+			numPeersToConnect = notConnectedPeers.Len() - 1
 		}
+
+		pm.connectToPeers(notConnectedPeers[0:numPeersToConnect])
+	} //Else: Should we raise some sort of unhealthy event?
+}
+
+func (pm *PeerManager) connectToPeers(peers peer.IDSlice) {
+	for _, peerID := range peers {
+		peerInfo := peer.AddrInfo{
+			ID:    peerID,
+			Addrs: pm.host.Peerstore().Addrs(peerID),
+		}
+		pm.peerConnector.publishWork(pm.ctx, peerInfo)
+	}
+}
+
+func (pm *PeerManager) getNotConnectedPers() (notConnectedPeers peer.IDSlice) {
+	for _, peerID := range pm.host.Peerstore().Peers() {
+		if pm.host.Network().Connectedness(peerID) != network.Connected {
+			notConnectedPeers = append(notConnectedPeers, peerID)
+		}
+	}
+	return
+}
+
+func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
+
+	//Start disconnecting peers, based on what?
+	//For now, just disconnect most recently connected peers
+	//TODO: Need to have more intelligent way of doing this, maybe peer scores.
+	pm.logger.Info("Number of inbound connections exceeds target relay peers, hence pruning",
+		zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget))
+	for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ {
+		p := inRelayPeers[pruningStartIndex]
+		err := pm.host.Network().ClosePeer(p)
+		if err != nil {
+			pm.logger.Warn("Failed to disconnect connection towards peer",
+				logging.HostID("peerID", p))
+		}
+		pm.logger.Debug("Successfully disconnected connection towards peer",
+			logging.HostID("peerID", p))
 	}
 }
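To make the new target derivation in `NewPeerManager` concrete, here is a worked example; the `maxConnections` value of 50 is illustrative, while the shares and floor are the constants defined above:

```go
package main

import "fmt"

func main() {
	maxConnections := 50
	// maxRelayPeersShare = 5: reserve a fifth of connections for non-relay use.
	maxRelayPeers := maxConnections - maxConnections/5 // 40
	// outRelayPeersShare = 3: aim a third of relay connections outbound...
	outTarget := maxRelayPeers / 3 // 13
	// ...but never below minOutRelayConns.
	if outTarget < 10 {
		outTarget = 10
	}
	// Inbound gets the remainder, clamped at zero for tiny connection limits.
	inTarget := maxRelayPeers - outTarget // 27
	if inTarget < 0 {
		inTarget = 0
	}
	fmt.Println(maxRelayPeers, outTarget, inTarget) // 40 13 27
}
```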
@@ -120,7 +215,8 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData) {
 	if p.ENR != nil {
 		err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
 		if err != nil {
-			pm.logger.Error("could not store enr", zap.Error(err), logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
+			pm.logger.Error("could not store enr", zap.Error(err),
+				logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
 		}
 	}
 }
@@ -193,7 +289,8 @@ func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID, o
 
 	//For now adding the peer to serviceSlot which means the latest added peer would be given priority.
 	//TODO: Ideally we should sort the peers per service and return best peer based on peer score or RTT etc.
-	pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID), zap.String("service", string(proto)))
+	pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID),
+		zap.String("service", string(proto)))
 	pm.serviceSlots[proto] = append(pm.serviceSlots[proto], peerID)
 }
 
@@ -219,8 +316,7 @@ func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID, lo
 	//Try to fetch from serviceSlot
 	peerIDs, ok := pm.serviceSlots[proto]
 	if ok || len(peerIDs) > 0 {
-		pm.logger.Info("Got peer from service slots", logging.HostID("peer", peerIDs[0]))
-		return peerIDs[0], nil
+		filteredPeers = peerIDs
 	}
 
 	return utils.SelectRandomPeer(filteredPeers, pm.logger)
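The `SelectPeer` change above means a service slot no longer always yields its first entry; slot peers now flow through `utils.SelectRandomPeer` like any other candidates, which is what the relaxed test assertions below exercise. A small sketch of the caller-visible effect (the module path and peer IDs are illustrative assumptions):

```go
package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/waku-org/go-waku/waku/v2/utils"
)

func main() {
	// Hypothetical slot contents; inside SelectPeer these now become
	// filteredPeers rather than short-circuiting to peerIDs[0].
	slot := peer.IDSlice{peer.ID("peerA"), peer.ID("peerB")}

	p, err := utils.SelectRandomPeer(slot, utils.Logger())
	if err != nil {
		panic(err)
	}
	fmt.Println(p) // either entry; callers must not rely on insertion order
}
```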
- pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID), zap.String("service", string(proto))) + pm.logger.Info("Adding peer to service slots", logging.HostID("peer", peerID), + zap.String("service", string(proto))) pm.serviceSlots[proto] = append(pm.serviceSlots[proto], peerID) } @@ -219,8 +316,7 @@ func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID, lo //Try to fetch from serviceSlot peerIDs, ok := pm.serviceSlots[proto] if ok || len(peerIDs) > 0 { - pm.logger.Info("Got peer from service slots", logging.HostID("peer", peerIDs[0])) - return peerIDs[0], nil + filteredPeers = peerIDs } return utils.SelectRandomPeer(filteredPeers, pm.logger) diff --git a/waku/v2/peermanager/test/peer_manager_test.go b/waku/v2/peermanager/test/peer_manager_test.go index c10abfad..bd8c71b4 100644 --- a/waku/v2/peermanager/test/peer_manager_test.go +++ b/waku/v2/peermanager/test/peer_manager_test.go @@ -49,6 +49,12 @@ func TestServiceSlots(t *testing.T) { peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) require.NoError(t, err) + if peerId == h2.ID() || peerId == h1.ID() { + //Test success + t.Log("Random peer selection per protocol successful") + } else { + t.FailNow() + } require.Equal(t, peerId, h2.ID()) h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL) @@ -64,7 +70,12 @@ func TestServiceSlots(t *testing.T) { //Test peer selection from first added peer to serviceSlot peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) require.NoError(t, err) - require.Equal(t, peerId, h2.ID()) + if peerId == h2.ID() || peerId == h3.ID() { + //Test success + t.Log("Random peer selection per protocol successful") + } else { + t.FailNow() + } //Test peer selection for specific protocol peerId, err = pm.SelectPeer(protocol1, nil, utils.Logger()) diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index e1bf4568..e62364bd 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -51,7 +51,6 @@ type WakuPeerstore interface { SetDirection(p peer.ID, direction network.Direction) error Direction(p peer.ID) (network.Direction, error) - GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) } // NewWakuPeerstore creates a new WakuPeerStore object @@ -140,19 +139,3 @@ func (ps *WakuPeerstoreImpl) Direction(p peer.ID) (network.Direction, error) { return result.(network.Direction), nil } - -// GroupPeersByDirection returns all the peers in peer store grouped by Inbound or outBound direction -func (ps *WakuPeerstoreImpl) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) { - - for _, p := range ps.Peers() { - direction, err := ps.Direction(p) - if err == nil { - if direction == network.DirInbound { - inPeers = append(inPeers, p) - } else if direction == network.DirOutbound { - outPeers = append(outPeers, p) - } - } - } - return inPeers, outPeers, nil -}