fix: simple backoff strategy after 3 subscribe failures (#1238)

This commit is contained in:
Prem Chaitanya Prathi 2024-10-04 11:10:19 +05:30 committed by GitHub
parent 15b4aee808
commit 0ed94ce0b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 40 additions and 22 deletions

View File

@ -3,10 +3,12 @@ package filter
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
@ -29,6 +31,7 @@ func (fc FilterConfig) String() string {
} }
const filterSubLoopInterval = 5 * time.Second const filterSubLoopInterval = 5 * time.Second
const filterSubMaxErrCnt = 3
type Sub struct { type Sub struct {
ContentFilter protocol.ContentFilter ContentFilter protocol.ContentFilter
@ -43,6 +46,7 @@ type Sub struct {
onlineChecker onlinechecker.OnlineChecker onlineChecker onlinechecker.OnlineChecker
resubscribeInProgress bool resubscribeInProgress bool
id string id string
errcnt int
} }
type subscribeParameters struct { type subscribeParameters struct {
@ -107,13 +111,14 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
} }
} }
func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) { func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) {
defer utils.LogOnPanic() defer utils.LogOnPanic()
ticker := time.NewTicker(batchInterval) ticker := time.NewTicker(loopInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
apiSub.errcnt = 0 //reset errorCount
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers && if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers &&
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers { !apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- "" apiSub.closing <- ""
@ -123,11 +128,13 @@ func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) {
apiSub.cleanup() apiSub.cleanup()
return return
case subId := <-apiSub.closing: case subId := <-apiSub.closing:
if apiSub.errcnt < filterSubMaxErrCnt {
apiSub.resubscribeInProgress = true apiSub.resubscribeInProgress = true
//trigger resubscribe flow for subscription. //trigger resubscribe flow for subscription.
apiSub.checkAndResubscribe(subId) apiSub.checkAndResubscribe(subId)
} }
} }
}
} }
func (apiSub *Sub) checkAndResubscribe(subId string) { func (apiSub *Sub) checkAndResubscribe(subId string) {
@ -181,6 +188,10 @@ func (apiSub *Sub) resubscribe(failedPeer peer.ID) {
apiSub.multiplex(subs) apiSub.multiplex(subs)
} }
// possibleRecursiveError reports whether err matches one of the two failures
// that the subscribe path counts toward its error counter:
// utils.ErrNoPeersAvailable or swarm.ErrDialBackoff.
// NOTE(review): presumably named "recursive" because an immediate retry would
// hit the same failure again — confirm against the resubscribe flow.
func possibleRecursiveError(err error) bool {
	if errors.Is(err, utils.ErrNoPeersAvailable) {
		return true
	}
	return errors.Is(err, swarm.ErrDialBackoff)
}
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) { func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails // Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.FilterSubscribeOption, 0) options := make([]filter.FilterSubscribeOption, 0)
@ -195,6 +206,9 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...) subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...)
if err != nil { if err != nil {
if possibleRecursiveError(err) {
apiSub.errcnt++
}
//Inform of error, so that resubscribe can be triggered if required //Inform of error, so that resubscribe can be triggered if required
if len(apiSub.closing) < apiSub.Config.MaxPeers { if len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- "" apiSub.closing <- ""

View File

@ -139,7 +139,7 @@ func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlic
} }
} }
return "", ErrNoPeersAvailable return "", utils.ErrNoPeersAvailable
} }
type pingResult struct { type pingResult struct {

View File

@ -34,13 +34,13 @@ func TestRTT(t *testing.T) {
h3.Close() h3.Close()
_, err = rtt.FastestPeer(ctx, peer.IDSlice{h3.ID()}) _, err = rtt.FastestPeer(ctx, peer.IDSlice{h3.ID()})
require.ErrorIs(t, err, ErrNoPeersAvailable) require.ErrorIs(t, err, utils.ErrNoPeersAvailable)
// H3 should never return // H3 should never return
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
p, err := rtt.FastestPeer(ctx, peer.IDSlice{h2.ID(), h3.ID()}) p, err := rtt.FastestPeer(ctx, peer.IDSlice{h2.ID(), h3.ID()})
if err != nil { if err != nil {
require.ErrorIs(t, err, ErrNoPeersAvailable) require.ErrorIs(t, err, utils.ErrNoPeersAvailable)
} else { } else {
require.NotEqual(t, h3.ID(), p) require.NotEqual(t, h3.ID(), p)
} }

View File

@ -98,10 +98,6 @@ const (
LowestRTT LowestRTT
) )
// ErrNoPeersAvailable is emitted when no suitable peers are found for
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")
const maxFailedAttempts = 5 const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15 const peerConnectivityLoopSecs = 15

View File

@ -93,7 +93,7 @@ func TestServiceSlots(t *testing.T) {
defer h4.Close() defer h4.Close()
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
require.Error(t, err, ErrNoPeersAvailable) require.Error(t, err, utils.ErrNoPeersAvailable)
// add h4 peer for protocol1 // add h4 peer for protocol1
_, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) _, err = pm.AddPeer(tests.GetAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
@ -138,7 +138,7 @@ func TestPeerSelection(t *testing.T) {
require.Equal(t, h2.ID(), peerIDs[0]) require.Equal(t, h2.ID(), peerIDs[0])
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}}) _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, ErrNoPeersAvailable, err) require.Error(t, utils.ErrNoPeersAvailable, err)
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err) require.NoError(t, err)
@ -175,7 +175,7 @@ func TestDefaultProtocol(t *testing.T) {
/////////////// ///////////////
//Test empty peer selection for relay protocol //Test empty peer selection for relay protocol
_, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) _, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
require.Error(t, err, ErrNoPeersAvailable) require.Error(t, err, utils.ErrNoPeersAvailable)
/////////////// ///////////////
// getting peer for default protocol // getting peer for default protocol
@ -215,7 +215,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
pm.RemovePeer(peers[0]) pm.RemovePeer(peers[0])
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
require.Error(t, err, ErrNoPeersAvailable) require.Error(t, err, utils.ErrNoPeersAvailable)
} }
func TestConnectToRelayPeers(t *testing.T) { func TestConnectToRelayPeers(t *testing.T) {

View File

@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/protocol"
wps "github.com/waku-org/go-waku/waku/v2/peerstore" wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
) )
@ -59,7 +60,7 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
peerIDs, err := pm.selectServicePeer(criteria) peerIDs, err := pm.selectServicePeer(criteria)
if err == nil && len(peerIDs) == criteria.MaxPeers { if err == nil && len(peerIDs) == criteria.MaxPeers {
return maps.Keys(peerIDs), nil return maps.Keys(peerIDs), nil
} else if !errors.Is(err, ErrNoPeersAvailable) { } else if !errors.Is(err, utils.ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err)) zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
return nil, err return nil, err
@ -101,7 +102,7 @@ func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error)
} }
} }
if len(selectedPeers) == 0 { if len(selectedPeers) == 0 {
return nil, ErrNoPeersAvailable return nil, utils.ErrNoPeersAvailable
} }
return selectedPeers, nil return selectedPeers, nil
} }
@ -157,7 +158,7 @@ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSe
if len(peers) == 0 { if len(peers) == 0 {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err)) pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
} }
return peers, ErrNoPeersAvailable return peers, utils.ErrNoPeersAvailable
} }
// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. // PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers.

View File

@ -6,6 +6,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
"golang.org/x/exp/maps" "golang.org/x/exp/maps"
) )
@ -26,7 +27,7 @@ func TestServiceSlot(t *testing.T) {
slots.getPeers(protocol).remove(peerID) slots.getPeers(protocol).remove(peerID)
// //
_, err = slots.getPeers(protocol).getRandom(1, nil) _, err = slots.getPeers(protocol).getRandom(1, nil)
require.Equal(t, err, ErrNoPeersAvailable) require.Equal(t, err, utils.ErrNoPeersAvailable)
// Test with more peers // Test with more peers
peerID2 := peer.ID("peerId2") peerID2 := peer.ID("peerId2")
@ -74,7 +75,7 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) {
slots.removePeer(peerID) slots.removePeer(peerID)
// //
_, err = slots.getPeers(protocol).getRandom(1, nil) _, err = slots.getPeers(protocol).getRandom(1, nil)
require.Equal(t, err, ErrNoPeersAvailable) require.Equal(t, err, utils.ErrNoPeersAvailable)
_, err = slots.getPeers(protocol1).getRandom(1, nil) _, err = slots.getPeers(protocol1).getRandom(1, nil)
require.Equal(t, err, ErrNoPeersAvailable) require.Equal(t, err, utils.ErrNoPeersAvailable)
} }

View File

@ -1,6 +1,8 @@
package utils package utils
import ( import (
"errors"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
) )
@ -10,6 +12,10 @@ type DialError struct {
PeerID peer.ID PeerID peer.ID
} }
// ErrNoPeersAvailable is emitted when no suitable peers are found for
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")
// GetPeerID is used to extract the peerID from a multiaddress // GetPeerID is used to extract the peerID from a multiaddress
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P) peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)