diff --git a/.gitignore b/.gitignore index 1fa1560a..6881707b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ rlnKeystore.json test_onchain.json *.bkp *.log +.vscode # sqlite db *.db diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 747109ff..f9dc443f 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -416,9 +416,9 @@ func (w *WakuNode) Start(ctx context.Context) error { if err != nil { return err } - w.peermanager.Start(ctx) w.registerAndMonitorReachability(ctx) } + w.peermanager.Start(ctx) w.legacyStore = w.storeFactory(w) w.legacyStore.SetHost(host) @@ -752,7 +752,7 @@ func (w *WakuNode) DialPeerWithInfo(ctx context.Context, peerInfo peer.AddrInfo) func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { err := w.host.Connect(ctx, info) if err != nil { - w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info) + w.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(info.ID) return err } @@ -770,7 +770,7 @@ func (w *WakuNode) connect(ctx context.Context, info peer.AddrInfo) error { } } - w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info) + w.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(info.ID) w.metrics.RecordDial() diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index ca58c4b5..bd844b20 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -279,8 +279,9 @@ func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { err := c.host.Connect(ctx, pi) if err != nil && !errors.Is(err, context.Canceled) { c.addConnectionBackoff(pi.ID) - c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi) + c.host.Peerstore().(wps.WakuPeerstore).AddConnFailure(pi.ID) c.logger.Warn("connecting to peer", logging.HostID("peerID", pi.ID), zap.Error(err)) } + c.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(pi.ID) <-sem } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 1441f7f4..e12d3b24 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -101,6 +101,8 @@ const ( // some protocol var ErrNoPeersAvailable = errors.New("no suitable peers found") +const maxFailedAttempts = 5 +const prunePeerStoreInterval = 10 * time.Minute const peerConnectivityLoopSecs = 15 const maxConnsToPeerRatio = 5 @@ -234,13 +236,115 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) { // Start starts the processing to be done by peer manager. func (pm *PeerManager) Start(ctx context.Context) { - pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) - pm.ctx = ctx - if pm.sub != nil && pm.RelayEnabled { - go pm.peerEventLoop(ctx) + if pm.RelayEnabled { + pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + if pm.sub != nil { + go pm.peerEventLoop(ctx) + } + go pm.connectivityLoop(ctx) } - go pm.connectivityLoop(ctx) + go pm.peerStoreLoop(ctx) +} + +func (pm *PeerManager) peerStoreLoop(ctx context.Context) { + t := time.NewTicker(prunePeerStoreInterval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + pm.prunePeerStore() + } + } +} + +func (pm *PeerManager) prunePeerStore() { + peers := pm.host.Peerstore().Peers() + numPeers := len(peers) + if numPeers < pm.maxPeers { + pm.logger.Debug("peerstore size within capacity, not pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", numPeers)) + return + } + peerCntBeforePruning := numPeers + pm.logger.Debug("peerstore capacity exceeded, hence pruning", zap.Int("capacity", pm.maxPeers), zap.Int("numPeers", peerCntBeforePruning)) + + for _, peerID := range peers { + connFailues := pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) + if connFailues > maxFailedAttempts { + // safety check so that we don't end up disconnecting connected peers. + if pm.host.Network().Connectedness(peerID) == network.Connected { + pm.host.Peerstore().(wps.WakuPeerstore).ResetConnFailures(peerID) + continue + } + pm.host.Peerstore().RemovePeer(peerID) + numPeers-- + } + if numPeers < pm.maxPeers { + pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers)) + return + } + } + + notConnectedPeers := pm.getPeersBasedOnconnectionStatus("", network.NotConnected) + peersByTopic := make(map[string]peer.IDSlice) + var prunedPeers peer.IDSlice + + //prune not connected peers without shard + for _, peerID := range notConnectedPeers { + topics, err := pm.host.Peerstore().(wps.WakuPeerstore).PubSubTopics(peerID) + //Prune peers without pubsubtopics. + if err != nil || len(topics) == 0 { + if err != nil { + pm.logger.Error("pruning:failed to fetch pubsub topics", zap.Error(err), zap.Stringer("peer", peerID)) + } + prunedPeers = append(prunedPeers, peerID) + pm.host.Peerstore().RemovePeer(peerID) + numPeers-- + } else { + prunedPeers = append(prunedPeers, peerID) + for topic := range topics { + peersByTopic[topic] = append(peersByTopic[topic], peerID) + } + } + if numPeers < pm.maxPeers { + pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers)) + return + } + } + pm.logger.Debug("pruned notconnected peers", zap.Stringers("prunedPeers", prunedPeers)) + + // calculate the avg peers per shard + total, maxPeerCnt := 0, 0 + for _, peersInTopic := range peersByTopic { + peerLen := len(peersInTopic) + total += peerLen + if peerLen > maxPeerCnt { + maxPeerCnt = peerLen + } + } + avgPerTopic := min(1, total/maxPeerCnt) + // prune peers from shard with higher than avg count + + for topic, peers := range peersByTopic { + count := max(len(peers)-avgPerTopic, 0) + var prunedPeers peer.IDSlice + for i, pID := range peers { + if i > count { + break + } + prunedPeers = append(prunedPeers, pID) + pm.host.Peerstore().RemovePeer(pID) + numPeers-- + if numPeers < pm.maxPeers { + pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers), zap.Stringers("prunedPeers", prunedPeers)) + return + } + } + pm.logger.Debug("pruned peers higher than average", zap.Stringers("prunedPeers", prunedPeers), zap.String("topic", topic)) + } + pm.logger.Debug("finished pruning peer store", zap.Int("capacity", pm.maxPeers), zap.Int("beforeNumPeers", peerCntBeforePruning), zap.Int("afterNumPeers", numPeers)) } // This is a connectivity loop, which currently checks and prunes inbound connections. @@ -444,11 +548,6 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID { // AddDiscoveredPeer to add dynamically discovered peers. // Note that these peers will not be set in service-slots. func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { - //Doing this check again inside addPeer, in order to avoid additional complexity of rollingBack other changes. - if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { - return - } - //Check if the peer is already present, if so skip adding _, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID) if err == nil { @@ -503,10 +602,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { // addPeer adds peer to the peerStore. // It also sets additional metadata such as origin and supported protocols func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error { - if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { - pm.logger.Error("could not add peer as peer store capacity is reached", zap.Stringer("peer", ID), zap.Int("capacity", pm.maxPeers)) - return errors.New("peer store capacity reached") - } + pm.logger.Info("adding peer to peerstore", zap.Stringer("peer", ID)) if origin == wps.Static { pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL) diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index 203b2310..af42a28a 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -51,9 +51,9 @@ type WakuPeerstore interface { PeersByOrigin(origin Origin) peer.IDSlice SetENR(p peer.ID, enr *enode.Node) error ENR(p peer.ID) (*enode.Node, error) - AddConnFailure(p peer.AddrInfo) - ResetConnFailures(p peer.AddrInfo) - ConnFailures(p peer.AddrInfo) int + AddConnFailure(pID peer.ID) + ResetConnFailures(pID peer.ID) + ConnFailures(pID peer.ID) int SetDirection(p peer.ID, direction network.Direction) error Direction(p peer.ID) (network.Direction, error) @@ -136,24 +136,24 @@ func (ps *WakuPeerstoreImpl) ENR(p peer.ID) (*enode.Node, error) { } // AddConnFailure increments connectionFailures for a peer -func (ps *WakuPeerstoreImpl) AddConnFailure(p peer.AddrInfo) { +func (ps *WakuPeerstoreImpl) AddConnFailure(pID peer.ID) { ps.connFailures.Lock() defer ps.connFailures.Unlock() - ps.connFailures.failures[p.ID]++ + ps.connFailures.failures[pID]++ } // ResetConnFailures resets connectionFailures for a peer to 0 -func (ps *WakuPeerstoreImpl) ResetConnFailures(p peer.AddrInfo) { +func (ps *WakuPeerstoreImpl) ResetConnFailures(pID peer.ID) { ps.connFailures.Lock() defer ps.connFailures.Unlock() - ps.connFailures.failures[p.ID] = 0 + ps.connFailures.failures[pID] = 0 } // ConnFailures fetches connectionFailures for a peer -func (ps *WakuPeerstoreImpl) ConnFailures(p peer.AddrInfo) int { +func (ps *WakuPeerstoreImpl) ConnFailures(pID peer.ID) int { ps.connFailures.RLock() defer ps.connFailures.RUnlock() - return ps.connFailures.failures[p.ID] + return ps.connFailures.failures[pID] } // SetDirection sets connection direction for a specific peer. diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index c52c9098..a9d2b496 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -246,7 +246,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, if err != nil { wf.metrics.RecordError(dialFailure) if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + ps.AddConnFailure(peerID) } return err } diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 3d898f89..9a2e25d6 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -275,7 +275,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logge } else { wf.metrics.RecordError(dialFailure) if ps, ok := wf.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + ps.AddConnFailure(peerID) } } logger.Error("opening peer stream", zap.Error(err)) diff --git a/waku/v2/protocol/legacy_store/waku_store_client.go b/waku/v2/protocol/legacy_store/waku_store_client.go index 456dada5..03f7c9b2 100644 --- a/waku/v2/protocol/legacy_store/waku_store_client.go +++ b/waku/v2/protocol/legacy_store/waku_store_client.go @@ -208,7 +208,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, historyRequest *pb.Histor logger.Error("creating stream to peer", zap.Error(err)) store.metrics.RecordError(dialFailure) if ps, ok := store.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + ps.AddConnFailure(selectedPeer) } return nil, err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index c0a72c2e..8200fddf 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -198,7 +198,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) if ps, ok := wakuLP.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + ps.AddConnFailure(peerID) } return nil, err } diff --git a/waku/v2/protocol/metadata/waku_metadata.go b/waku/v2/protocol/metadata/waku_metadata.go index 93a70a2b..47cf088d 100644 --- a/waku/v2/protocol/metadata/waku_metadata.go +++ b/waku/v2/protocol/metadata/waku_metadata.go @@ -105,7 +105,7 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak if err != nil { logger.Error("creating stream to peer", zap.Error(err)) if ps, ok := wakuM.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: peerID}) + ps.AddConnFailure(peerID) } return nil, err } diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 6baf3095..8075e2de 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -77,7 +77,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts stream, err := wakuPX.h.NewStream(ctx, params.selectedPeer, PeerExchangeID_v20alpha1) if err != nil { if ps, ok := wakuPX.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: params.selectedPeer}) + ps.AddConnFailure(params.selectedPeer) } return err } diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 5cda4eef..92c47ff4 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -254,7 +254,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe if err != nil { logger.Error("creating stream to peer", zap.Error(err)) if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(peer.AddrInfo{ID: selectedPeer}) + ps.AddConnFailure(selectedPeer) } return nil, err }