chore: limit the max number of message hashes requested

This commit is contained in:
Richard Ramos 2024-08-09 15:10:24 -04:00
parent 51ff465587
commit 54cdbc57be
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760

View File

@@ -41,6 +41,7 @@ const (
const (
	// timeInterval and delay are durations whose users are outside this view.
	// NOTE(review): presumably the polling period and a settling delay —
	// confirm against the code that reads them.
	timeInterval = 2 * time.Minute
	delay        = 5 * time.Minute

	// maxAttempts is the upper bound on tries for a single store query
	// (the initial query by hash, or advancing the result cursor) before
	// the storenode is recorded as unavailable.
	maxAttempts = 3

	// maxMsgHashesPerRequest is the size of each batch the message hashes
	// are split into, so that no single store request asks for more than
	// this many hashes.
	maxMsgHashesPerRequest = 100
)
type Application struct {
node *node.WakuNode
@@ -572,69 +573,85 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string
queryLogger := logger.With(zap.Stringer("storenode", peerID))
retry := true
success := false
count := 1
for retry && count <= maxAttempts {
queryLogger.Info("querying by hash", zap.Int("attempt", count))
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
result, err = app.node.Store().QueryByHash(tCtx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100))
cancel()
if err != nil {
queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count))
time.Sleep(2 * time.Second)
} else {
queryLogger.Info("hashes available", zap.Int("len", len(result.Messages())))
retry = false
success = true
wg := sync.WaitGroup{}
// Split into batches
for i := 0; i < len(messageHashes); i += maxMsgHashesPerRequest {
j := i + maxMsgHashesPerRequest
if j > len(messageHashes) {
j = len(messageHashes)
}
count++
}
if !success {
queryLogger.Error("storenode not available")
err := app.db.RecordStorenodeUnavailable(runId, peerID)
if err != nil {
queryLogger.Error("could not store recordnode unavailable", zap.Error(err))
}
app.metrics.RecordStorenodeAvailability(peerID, false)
return
}
wg.Add(1)
go func(messageHashes []pb.MessageHash) {
defer wg.Done()
app.metrics.RecordStorenodeAvailability(peerID, true)
for !result.IsComplete() {
onResult(result)
retry := true
success := false
count := 1
for retry && count <= maxAttempts {
queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", count))
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
err = result.Next(tCtx)
cancel()
if err != nil {
queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", count))
time.Sleep(2 * time.Second)
} else {
queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages())))
retry = false
success = true
retry := true
success := false
count := 1
for retry && count <= maxAttempts {
queryLogger.Info("querying by hash", zap.Int("attempt", count))
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
result, err = app.node.Store().QueryByHash(tCtx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100))
cancel()
if err != nil {
queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count))
time.Sleep(2 * time.Second)
} else {
queryLogger.Info("hashes available", zap.Int("len", len(result.Messages())))
retry = false
success = true
}
count++
}
count++
}
if !success {
queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
err := app.db.RecordStorenodeUnavailable(runId, peerID)
if err != nil {
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
if !success {
queryLogger.Error("storenode not available")
err := app.db.RecordStorenodeUnavailable(runId, peerID)
if err != nil {
queryLogger.Error("could not store recordnode unavailable", zap.Error(err))
}
app.metrics.RecordStorenodeAvailability(peerID, false)
return
}
app.metrics.RecordStorenodeAvailability(peerID, false)
return
}
app.metrics.RecordStorenodeAvailability(peerID, true)
app.metrics.RecordStorenodeAvailability(peerID, true)
for !result.IsComplete() {
onResult(result)
retry := true
success := false
count := 1
for retry && count <= maxAttempts {
queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", count))
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
err = result.Next(tCtx)
cancel()
if err != nil {
queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", count))
time.Sleep(2 * time.Second)
} else {
queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages())))
retry = false
success = true
}
count++
}
if !success {
queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
err := app.db.RecordStorenodeUnavailable(runId, peerID)
if err != nil {
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
}
app.metrics.RecordStorenodeAvailability(peerID, false)
return
}
app.metrics.RecordStorenodeAvailability(peerID, true)
}
}(messageHashes[i:j])
}
wg.Wait()
}