fix: set default store hash query timeout to 30s (#1204)

This commit is contained in:
kaichao 2024-08-22 22:45:24 +08:00 committed by GitHub
parent 1472b17d39
commit a4f0cae911
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 36 additions and 3 deletions

View File

@ -0,0 +1,5 @@
package common
import "time"
const DefaultStoreQueryTimeout = 30 * time.Second

View File

@ -261,7 +261,9 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
defer wg.Wait() defer wg.Wait()
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
return m.store.QueryByHash(ctx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
defer cancel()
return m.store.QueryByHash(queryCtx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest))
}, logger, "retrieving missing messages") }, logger, "retrieving missing messages")
if err != nil { if err != nil {
if !errors.Is(err, context.Canceled) { if !errors.Is(err, context.Canceled) {

View File

@ -1,11 +1,16 @@
package missing package missing
import "time" import (
"time"
"github.com/waku-org/go-waku/waku/v2/api/common"
)
type missingMessageVerifierParams struct { type missingMessageVerifierParams struct {
delay time.Duration delay time.Duration
interval time.Duration interval time.Duration
maxAttemptsToRetrieveHistory int maxAttemptsToRetrieveHistory int
storeQueryTimeout time.Duration
} }
// MissingMessageVerifierOption is an option that can be used to customize the MissingMessageVerifier behavior // MissingMessageVerifierOption is an option that can be used to customize the MissingMessageVerifier behavior
@ -32,8 +37,16 @@ func WithMaxRetryAttempts(max int) MissingMessageVerifierOption {
} }
} }
// WithStoreQueryTimeout sets the timeout for store query
func WithStoreQueryTimeout(timeout time.Duration) MissingMessageVerifierOption {
return func(params *missingMessageVerifierParams) {
params.storeQueryTimeout = timeout
}
}
var defaultMissingMessagesVerifierOptions = []MissingMessageVerifierOption{ var defaultMissingMessagesVerifierOptions = []MissingMessageVerifierOption{
WithVerificationInterval(time.Minute), WithVerificationInterval(time.Minute),
WithDelay(20 * time.Second), WithDelay(20 * time.Second),
WithMaxRetryAttempts(3), WithMaxRetryAttempts(3),
WithStoreQueryTimeout(common.DefaultStoreQueryTimeout),
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
apicommon "github.com/waku-org/go-waku/waku/v2/api/common"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store"
@ -47,6 +48,7 @@ type MessageSentCheck struct {
hashQueryInterval time.Duration hashQueryInterval time.Duration
messageSentPeriod uint32 messageSentPeriod uint32
messageExpiredPerid uint32 messageExpiredPerid uint32
storeQueryTimeout time.Duration
} }
// NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters
@ -64,6 +66,7 @@ func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource
hashQueryInterval: DefaultHashQueryInterval, hashQueryInterval: DefaultHashQueryInterval,
messageSentPeriod: DefaultMessageSentPeriod, messageSentPeriod: DefaultMessageSentPeriod,
messageExpiredPerid: DefaultMessageExpiredPerid, messageExpiredPerid: DefaultMessageExpiredPerid,
storeQueryTimeout: apicommon.DefaultStoreQueryTimeout,
} }
} }
@ -99,6 +102,14 @@ func WithMessageExpiredPerid(period uint32) MessageSentCheckOption {
} }
} }
// WithStoreQueryTimeout sets the timeout for store query
func WithStoreQueryTimeout(timeout time.Duration) MessageSentCheckOption {
return func(params *MessageSentCheck) error {
params.storeQueryTimeout = timeout
return nil
}
}
// Add adds a message for message sent check // Add adds a message for message sent check
func (m *MessageSentCheck) Add(topic string, messageID common.Hash, sentTime uint32) { func (m *MessageSentCheck) Add(topic string, messageID common.Hash, sentTime uint32) {
m.messageIDsMu.Lock() m.messageIDsMu.Lock()
@ -218,7 +229,9 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes)) m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
result, err := m.store.QueryByHash(ctx, messageHashes, opts...) queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
defer cancel()
result, err := m.store.QueryByHash(queryCtx, messageHashes, opts...)
if err != nil { if err != nil {
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
return []common.Hash{} return []common.Hash{}