diff --git a/waku/v2/api/common/utils.go b/waku/v2/api/common/utils.go new file mode 100644 index 00000000..506d6814 --- /dev/null +++ b/waku/v2/api/common/utils.go @@ -0,0 +1,5 @@ +package common + +import "time" + +const DefaultStoreQueryTimeout = 30 * time.Second diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index 6de6e760..1b3f782f 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -261,7 +261,9 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, defer wg.Wait() result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { - return m.store.QueryByHash(ctx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) + queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout) + defer cancel() + return m.store.QueryByHash(queryCtx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) }, logger, "retrieving missing messages") if err != nil { if !errors.Is(err, context.Canceled) { diff --git a/waku/v2/api/missing/options.go b/waku/v2/api/missing/options.go index b16abbc7..ebd53e71 100644 --- a/waku/v2/api/missing/options.go +++ b/waku/v2/api/missing/options.go @@ -1,11 +1,16 @@ package missing -import "time" +import ( + "time" + + "github.com/waku-org/go-waku/waku/v2/api/common" +) type missingMessageVerifierParams struct { delay time.Duration interval time.Duration maxAttemptsToRetrieveHistory int + storeQueryTimeout time.Duration } // MissingMessageVerifierOption is an option that can be used to customize the MissingMessageVerifier behavior @@ -32,8 +37,16 @@ func WithMaxRetryAttempts(max int) MissingMessageVerifierOption { } } +// WithStoreQueryTimeout sets the timeout for store query +func WithStoreQueryTimeout(timeout time.Duration) MissingMessageVerifierOption { + return func(params *missingMessageVerifierParams) { + params.storeQueryTimeout = timeout + } +} + var defaultMissingMessagesVerifierOptions = []MissingMessageVerifierOption{ WithVerificationInterval(time.Minute), WithDelay(20 * time.Second), WithMaxRetryAttempts(3), + WithStoreQueryTimeout(common.DefaultStoreQueryTimeout), } diff --git a/waku/v2/api/publish/message_check.go b/waku/v2/api/publish/message_check.go index be22abaa..67a67c91 100644 --- a/waku/v2/api/publish/message_check.go +++ b/waku/v2/api/publish/message_check.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/libp2p/go-libp2p/core/peer" + apicommon "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" @@ -47,6 +48,7 @@ type MessageSentCheck struct { hashQueryInterval time.Duration messageSentPeriod uint32 messageExpiredPerid uint32 + storeQueryTimeout time.Duration } // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters @@ -64,6 +66,7 @@ func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource hashQueryInterval: DefaultHashQueryInterval, messageSentPeriod: DefaultMessageSentPeriod, messageExpiredPerid: DefaultMessageExpiredPerid, + storeQueryTimeout: apicommon.DefaultStoreQueryTimeout, } } @@ -99,6 +102,14 @@ func WithMessageExpiredPerid(period uint32) MessageSentCheckOption { } } +// WithStoreQueryTimeout sets the timeout for store query +func WithStoreQueryTimeout(timeout time.Duration) MessageSentCheckOption { + return func(params *MessageSentCheck) error { + params.storeQueryTimeout = timeout + return nil + } +} + // Add adds a message for message sent check func (m *MessageSentCheck) Add(topic string, messageID common.Hash, sentTime uint32) { m.messageIDsMu.Lock() @@ -218,7 +229,9 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes)) - result, err := m.store.QueryByHash(ctx, messageHashes, opts...) + queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout) + defer cancel() + result, err := m.store.QueryByHash(queryCtx, messageHashes, opts...) if err != nil { m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) return []common.Hash{}