From 54cdbc57be16737897f1bc4cdd58673a107aef13 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 9 Aug 2024 15:10:24 -0400 Subject: [PATCH] chore: limit the max number of message hashes requested --- cmd/storemsgcounter/execute.go | 131 +++++++++++++++++++-------------- 1 file changed, 74 insertions(+), 57 deletions(-) diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 4df1fa5..a4f2909 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -41,6 +41,7 @@ const ( const timeInterval = 2 * time.Minute const delay = 5 * time.Minute const maxAttempts = 3 +const maxMsgHashesPerRequest = 100 type Application struct { node *node.WakuNode @@ -572,69 +573,85 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string queryLogger := logger.With(zap.Stringer("storenode", peerID)) - retry := true - success := false - count := 1 - for retry && count <= maxAttempts { - queryLogger.Info("querying by hash", zap.Int("attempt", count)) - tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) - result, err = app.node.Store().QueryByHash(tCtx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100)) - cancel() - if err != nil { - queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count)) - time.Sleep(2 * time.Second) - } else { - queryLogger.Info("hashes available", zap.Int("len", len(result.Messages()))) - retry = false - success = true + wg := sync.WaitGroup{} + // Split into batches + for i := 0; i < len(messageHashes); i += maxMsgHashesPerRequest { + j := i + maxMsgHashesPerRequest + if j > len(messageHashes) { + j = len(messageHashes) } - count++ - } - if !success { - queryLogger.Error("storenode not available") - err := app.db.RecordStorenodeUnavailable(runId, peerID) - if err != nil { - queryLogger.Error("could not store recordnode unavailable", zap.Error(err)) - } - app.metrics.RecordStorenodeAvailability(peerID, false) - return - } + wg.Add(1) + go func(messageHashes []pb.MessageHash) { + defer wg.Done() - app.metrics.RecordStorenodeAvailability(peerID, true) - - for !result.IsComplete() { - onResult(result) - - retry := true - success := false - count := 1 - for retry && count <= maxAttempts { - queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", count)) - tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) - err = result.Next(tCtx) - cancel() - if err != nil { - queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", count)) - time.Sleep(2 * time.Second) - } else { - queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages()))) - retry = false - success = true + retry := true + success := false + count := 1 + for retry && count <= maxAttempts { + queryLogger.Info("querying by hash", zap.Int("attempt", count)) + tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) + result, err = app.node.Store().QueryByHash(tCtx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100)) + cancel() + if err != nil { + queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count)) + time.Sleep(2 * time.Second) + } else { + queryLogger.Info("hashes available", zap.Int("len", len(result.Messages()))) + retry = false + success = true + } + count++ } - count++ - } - if !success { - queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor()))) - err := app.db.RecordStorenodeUnavailable(runId, peerID) - if err != nil { - logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo)) + if !success { + queryLogger.Error("storenode not available") + err := app.db.RecordStorenodeUnavailable(runId, peerID) + if err != nil { + queryLogger.Error("could not store recordnode unavailable", zap.Error(err)) + } + app.metrics.RecordStorenodeAvailability(peerID, false) + return } - app.metrics.RecordStorenodeAvailability(peerID, false) - return - } - app.metrics.RecordStorenodeAvailability(peerID, true) + app.metrics.RecordStorenodeAvailability(peerID, true) + + for !result.IsComplete() { + onResult(result) + + retry := true + success := false + count := 1 + for retry && count <= maxAttempts { + queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", count)) + tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) + err = result.Next(tCtx) + cancel() + if err != nil { + queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", count)) + time.Sleep(2 * time.Second) + } else { + queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages()))) + retry = false + success = true + } + count++ + } + + if !success { + queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor()))) + err := app.db.RecordStorenodeUnavailable(runId, peerID) + if err != nil { + logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo)) + } + app.metrics.RecordStorenodeAvailability(peerID, false) + return + } + + app.metrics.RecordStorenodeAvailability(peerID, true) + } + }(messageHashes[i:j]) } + + wg.Wait() }