fix: filter stats mismatch and add bad peer check for light mode (#1241)

This commit is contained in:
Prem Chaitanya Prathi 2024-10-28 11:46:40 +05:30 committed by GitHub
parent fdb3c3d0b3
commit c78b09d4ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 1 deletions

View File

@ -230,6 +230,7 @@ func (mgr *FilterManager) UnsubscribeFilter(filterID string) {
} }
if len(af.sub.ContentFilter.ContentTopics) == 0 { if len(af.sub.ContentFilter.ContentTopics) == 0 {
af.cancel() af.cancel()
delete(mgr.filterSubscriptions, filterConfig.ID)
} else { } else {
go af.sub.Unsubscribe(filterConfig.contentFilter) go af.sub.Unsubscribe(filterConfig.contentFilter)
} }

View File

@ -101,7 +101,9 @@ const (
const maxFailedAttempts = 5 const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15 const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 5 const maxConnsToPeerRatio = 3
const badPeersCleanupInterval = 1 * time.Minute
const maxDialFailures = 2
// 80% relay peers 20% service peers // 80% relay peers 20% service peers
func relayAndServicePeers(maxConnections int) (int, int) { func relayAndServicePeers(maxConnections int) (int, int) {
@ -256,16 +258,32 @@ func (pm *PeerManager) Start(ctx context.Context) {
} }
} }
func (pm *PeerManager) removeBadPeers() {
if !pm.RelayEnabled {
for _, peerID := range pm.host.Peerstore().Peers() {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
}
}
func (pm *PeerManager) peerStoreLoop(ctx context.Context) { func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
defer utils.LogOnPanic() defer utils.LogOnPanic()
t := time.NewTicker(prunePeerStoreInterval) t := time.NewTicker(prunePeerStoreInterval)
t1 := time.NewTicker(badPeersCleanupInterval)
defer t.Stop() defer t.Stop()
defer t1.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-t.C: case <-t.C:
pm.prunePeerStore() pm.prunePeerStore()
case <-t1.C:
pm.removeBadPeers()
} }
} }
} }
@ -744,4 +762,9 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr)) pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
} }
} }
if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
} }