diff --git a/wakuv2/missing_messages.go b/wakuv2/missing_messages.go index 976fe16f8..5ca7fac68 100644 --- a/wakuv2/missing_messages.go +++ b/wakuv2/missing_messages.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "errors" "slices" + "sync" "time" "github.com/libp2p/go-libp2p/core/peer" @@ -236,44 +237,63 @@ func (w *Waku) fetchMessagesBatch(missingHistoryRequest TopicInterest, batchFrom return nil } - result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { - return w.node.Store().QueryByHash(ctx, missingHashes, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, 100)) - }, logger, "retrieving missing messages") - if err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("storenode not available", zap.Error(err)) + wg := sync.WaitGroup{} + // Split into batches + for i := 0; i < len(missingHashes); i += maxHashQueryLength { + j := i + maxHashQueryLength + if j > len(missingHashes) { + j = len(missingHashes) } - return err - } - for !result.IsComplete() { - for _, mkv := range result.Messages() { - envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()) - w.logger.Info("received waku2 store message", - zap.Stringer("envelopeHash", envelope.Hash()), - zap.String("pubsubTopic", mkv.GetPubsubTopic()), - zap.Int64p("timestamp", envelope.Message().Timestamp), - ) + wg.Add(1) + go func(messageHashes []pb.MessageHash) { + defer wg.Wait() - err = w.OnNewEnvelopes(envelope, common.StoreMessageType, false) + result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { + return w.node.Store().QueryByHash(ctx, messageHashes, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, maxHashQueryLength)) + }, logger, "retrieving missing messages") if err != nil { - return err + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return } - } - result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { - if err = result.Next(ctx); err != nil { - return nil, err + for !result.IsComplete() { + for _, mkv := range result.Messages() { + envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()) + w.logger.Info("received waku2 store message", + zap.Stringer("envelopeHash", envelope.Hash()), + zap.String("pubsubTopic", mkv.GetPubsubTopic()), + zap.Int64p("timestamp", envelope.Message().Timestamp), + ) + + err = w.OnNewEnvelopes(envelope, common.StoreMessageType, false) + if err != nil { + logger.Error("could not process envelopes", zap.Error(err)) + return + } + } + + result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) { + if err = result.Next(ctx); err != nil { + return nil, err + } + return result, nil + }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") + if err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("storenode not available", zap.Error(err)) + } + return + } } - return result, nil - }, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") - if err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("storenode not available", zap.Error(err)) - } - return err - } + + }(missingHashes[i:j]) + } + wg.Wait() + return nil } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index f3b3c9f74..5fcd8f146 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -84,7 +84,7 @@ const requestTimeout = 30 * time.Second const bootnodesQueryBackoffMs = 200 const bootnodesMaxRetries = 7 const cacheTTL = 20 * time.Minute -const maxHashQueryLength = 100 +const maxHashQueryLength = 50 const hashQueryInterval = 3 * time.Second const messageSentPeriod = 3 // in seconds const messageExpiredPerid = 10 // in seconds