diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index a50e6071..8490c966 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -20,6 +20,7 @@ import ( ) const maxContentTopicsPerRequest = 10 +const maxMsgHashesPerRequest = 50 // MessageTracker should keep track of messages it has seen before and // provide a way to determine whether a message exists or not. This @@ -247,38 +248,55 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, return nil } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { - return m.store.QueryByHash(ctx, missingHashes, store.WithPeer(interest.peerID), store.WithPaging(false, 100)) - }, logger, "retrieving missing messages") - if err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("storenode not available", zap.Error(err)) - } - return err - } - - for !result.IsComplete() { - for _, mkv := range result.Messages() { - select { - case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()): - default: - m.logger.Warn("subscriber is too slow!") - } + wg := sync.WaitGroup{} + // Split into batches + for i := 0; i < len(missingHashes); i += maxMsgHashesPerRequest { + j := i + maxMsgHashesPerRequest + if j > len(missingHashes) { + j = len(missingHashes) } - result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { - if err = result.Next(ctx); err != nil { - return nil, err + wg.Add(1) + go func(messageHashes []pb.MessageHash) { + defer wg.Wait() + + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + return m.store.QueryByHash(ctx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest)) + }, logger, "retrieving missing messages") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return } - return result, nil - }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") - if err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("storenode not available", zap.Error(err)) + + for !result.IsComplete() { + for _, mkv := range result.Messages() { + select { + case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()): + default: + m.logger.Warn("subscriber is too slow!") + } + } + + result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { + if err = result.Next(ctx); err != nil { + return nil, err + } + return result, nil + }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return + } } - return err - } + + }(missingHashes[i:j]) } + wg.Wait() + return nil } diff --git a/waku/v2/api/publish/message_check.go b/waku/v2/api/publish/message_check.go index a60a8d91..be22abaa 100644 --- a/waku/v2/api/publish/message_check.go +++ b/waku/v2/api/publish/message_check.go @@ -16,7 +16,7 @@ import ( "go.uber.org/zap" ) -const DefaultMaxHashQueryLength = 100 +const DefaultMaxHashQueryLength = 50 const DefaultHashQueryInterval = 3 * time.Second const DefaultMessageSentPeriod = 3 // in seconds const DefaultMessageExpiredPerid = 10 // in seconds @@ -216,7 +216,7 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c messageHashes[i] = pb.ToMessageHash(hash.Bytes()) } - m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes)) + m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes)) result, err := m.store.QueryByHash(ctx, messageHashes, opts...) if err != nil { @@ -248,8 +248,8 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c } } - m.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes)) - m.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes)) + m.logger.Debug("ack message hashes", zap.Stringers("ackHashes", ackHashes)) + m.logger.Debug("missed message hashes", zap.Stringers("missedHashes", missedHashes)) return append(ackHashes, missedHashes...) }