chore: limit the maximum number of message hashes to request per query (#1190)

This commit is contained in:
richΛrd 2024-08-10 11:13:59 -04:00 committed by GitHub
parent 92d62a7c38
commit 159635e21b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 49 additions and 31 deletions

View File

@ -20,6 +20,7 @@ import (
) )
const maxContentTopicsPerRequest = 10 const maxContentTopicsPerRequest = 10
const maxMsgHashesPerRequest = 50
// MessageTracker should keep track of messages it has seen before and // MessageTracker should keep track of messages it has seen before and
// provide a way to determine whether a message exists or not. This // provide a way to determine whether a message exists or not. This
@ -247,38 +248,55 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
return nil return nil
} }
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { wg := sync.WaitGroup{}
return m.store.QueryByHash(ctx, missingHashes, store.WithPeer(interest.peerID), store.WithPaging(false, 100)) // Split into batches
}, logger, "retrieving missing messages") for i := 0; i < len(missingHashes); i += maxMsgHashesPerRequest {
if err != nil { j := i + maxMsgHashesPerRequest
if !errors.Is(err, context.Canceled) { if j > len(missingHashes) {
logger.Error("storenode not available", zap.Error(err)) j = len(missingHashes)
}
return err
}
for !result.IsComplete() {
for _, mkv := range result.Messages() {
select {
case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()):
default:
m.logger.Warn("subscriber is too slow!")
}
} }
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { wg.Add(1)
if err = result.Next(ctx); err != nil { go func(messageHashes []pb.MessageHash) {
return nil, err defer wg.Wait()
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
return m.store.QueryByHash(ctx, messageHashes, store.WithPeer(interest.peerID), store.WithPaging(false, maxMsgHashesPerRequest))
}, logger, "retrieving missing messages")
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("storenode not available", zap.Error(err))
}
return
} }
return result, nil
}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page") for !result.IsComplete() {
if err != nil { for _, mkv := range result.Messages() {
if !errors.Is(err, context.Canceled) { select {
logger.Error("storenode not available", zap.Error(err)) case c <- protocol.NewEnvelope(mkv.Message, mkv.Message.GetTimestamp(), mkv.GetPubsubTopic()):
default:
m.logger.Warn("subscriber is too slow!")
}
}
result, err = m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) {
if err = result.Next(ctx); err != nil {
return nil, err
}
return result, nil
}, logger.With(zap.String("cursor", hex.EncodeToString(result.Cursor()))), "retrieving next page")
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("storenode not available", zap.Error(err))
}
return
}
} }
return err
} }(missingHashes[i:j])
} }
wg.Wait()
return nil return nil
} }

View File

@ -16,7 +16,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const DefaultMaxHashQueryLength = 100 const DefaultMaxHashQueryLength = 50
const DefaultHashQueryInterval = 3 * time.Second const DefaultHashQueryInterval = 3 * time.Second
const DefaultMessageSentPeriod = 3 // in seconds const DefaultMessageSentPeriod = 3 // in seconds
const DefaultMessageExpiredPerid = 10 // in seconds const DefaultMessageExpiredPerid = 10 // in seconds
@ -216,7 +216,7 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
messageHashes[i] = pb.ToMessageHash(hash.Bytes()) messageHashes[i] = pb.ToMessageHash(hash.Bytes())
} }
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes)) m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
result, err := m.store.QueryByHash(ctx, messageHashes, opts...) result, err := m.store.QueryByHash(ctx, messageHashes, opts...)
if err != nil { if err != nil {
@ -248,8 +248,8 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
} }
} }
m.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes)) m.logger.Debug("ack message hashes", zap.Stringers("ackHashes", ackHashes))
m.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes)) m.logger.Debug("missed message hashes", zap.Stringers("missedHashes", missedHashes))
return append(ackHashes, missedHashes...) return append(ackHashes, missedHashes...)
} }