fix_: limit the maximum number of message hashes by query hash
This commit is contained in:
parent
d4c6734a44
commit
1915ab9000
|
@ -5,6 +5,7 @@ import (
|
|||
"encoding/hex"
|
||||
"errors"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
|
@ -236,44 +237,63 @@ func (w *Waku) fetchMessagesBatch(missingHistoryRequest TopicInterest, batchFrom
|
|||
return nil
|
||||
}
|
||||
|
||||
result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) {
|
||||
return w.node.Store().QueryByHash(ctx, missingHashes, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, 100))
|
||||
}, logger, "retrieving missing messages")
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Error("storenode not available", zap.Error(err))
|
||||
wg := sync.WaitGroup{}
|
||||
// Split into batches
|
||||
for i := 0; i < len(missingHashes); i += maxHashQueryLength {
|
||||
j := i + maxHashQueryLength
|
||||
if j > len(missingHashes) {
|
||||
j = len(missingHashes)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for !result.IsComplete() {
|
||||
for _, mkv := range result.Messages() {
|
||||
envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic())
|
||||
w.logger.Info("received waku2 store message",
|
||||
zap.Stringer("envelopeHash", envelope.Hash()),
|
||||
zap.String("pubsubTopic", mkv.GetPubsubTopic()),
|
||||
zap.Int64p("timestamp", envelope.Message().Timestamp),
|
||||
)
|
||||
wg.Add(1)
|
||||
go func(messageHashes []pb.MessageHash) {
|
||||
defer wg.Wait()
|
||||
|
||||
err = w.OnNewEnvelopes(envelope, common.StoreMessageType, false)
|
||||
result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) {
|
||||
return w.node.Store().QueryByHash(ctx, messageHashes, store.WithPeer(missingHistoryRequest.peerID), store.WithPaging(false, maxHashQueryLength))
|
||||
}, logger, "retrieving missing messages")
|
||||
if err != nil {
|
||||
return err
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Error("storenode not available", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) {
|
||||
if err = result.Next(ctx); err != nil {
|
||||
return nil, err
|
||||
for !result.IsComplete() {
|
||||
for _, mkv := range result.Messages() {
|
||||
envelope := protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic())
|
||||
w.logger.Info("received waku2 store message",
|
||||
zap.Stringer("envelopeHash", envelope.Hash()),
|
||||
zap.String("pubsubTopic", mkv.GetPubsubTopic()),
|
||||
zap.Int64p("timestamp", envelope.Message().Timestamp),
|
||||
)
|
||||
|
||||
err = w.OnNewEnvelopes(envelope, common.StoreMessageType, false)
|
||||
if err != nil {
|
||||
logger.Error("could not process envelopes", zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
result, err = w.storeQueryWithRetry(missingHistoryRequest.ctx, func(ctx context.Context) (*store.Result, error) {
|
||||
if err = result.Next(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page")
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Error("storenode not available", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page")
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Error("storenode not available", zap.Error(err))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
}(missingHashes[i:j])
|
||||
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -84,7 +84,7 @@ const requestTimeout = 30 * time.Second
|
|||
const bootnodesQueryBackoffMs = 200
|
||||
const bootnodesMaxRetries = 7
|
||||
const cacheTTL = 20 * time.Minute
|
||||
const maxHashQueryLength = 100
|
||||
const maxHashQueryLength = 50
|
||||
const hashQueryInterval = 3 * time.Second
|
||||
const messageSentPeriod = 3 // in seconds
|
||||
const messageExpiredPerid = 10 // in seconds
|
||||
|
|
Loading…
Reference in New Issue