From 0c3ceeff1c46a365bc9b5579478c909a8dbbe012 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 7 Aug 2024 21:11:01 -0400 Subject: [PATCH] fix: mark message as found --- cmd/storemsgcounter/execute.go | 22 +++++++++++++--------- internal/persistence/database.go | 8 ++++++-- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index b2f6281..27d2ba3 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -177,13 +177,13 @@ func Execute(ctx context.Context, options Options) error { runIdLogger := logger.With(zap.String("syncRunId", runId)) runIdLogger.Info("rechecking missing messages status") - err := application.checkMissingMessageStatus(ctx, runId, runIdLogger) + err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger) if err != nil { logger.Error("could not recheck the status of missing messages", zap.Error(err)) return } - err = application.countMissingMessages() + err = application.countMissingMessages(storenodeIDs) if err != nil { logger.Error("could not count missing messages", zap.Error(err)) return @@ -338,7 +338,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno return nil } -func (app *Application) checkMissingMessageStatus(ctx context.Context, runId string, logger *zap.Logger) error { +func (app *Application) checkMissingMessageStatus(ctx context.Context, storenodes []peer.ID, runId string, logger *zap.Logger) error { now := app.node.Timesource().Now() // Get all messages whose status is missing or does not exist, and the column found_on_recheck is false @@ -349,7 +349,8 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, runId str } wg := sync.WaitGroup{} - for storenodeID, messageHashes := range missingMessages { + + for _, storenodeID := range storenodes { wg.Add(1) go func(peerID peer.ID, messageHashes []pb.MessageHash) { defer wg.Done() @@ -369,15 +370,14 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, runId str app.metrics.RecordMissingMessagesPrevHour(peerID, len(messageHashes)-len(foundMissingMessages)) - }(storenodeID, messageHashes) + }(storenodeID, missingMessages[storenodeID]) } - wg.Wait() return nil } -func (app *Application) countMissingMessages() error { +func (app *Application) countMissingMessages(storenodes []peer.ID) error { // not including last two hours in now to let sync work now := app.node.Timesource().Now().Add(-2 * time.Hour) @@ -396,8 +396,8 @@ func (app *Application) countMissingMessages() error { if err != nil { return err } - for storenode, cnt := range results { - app.metrics.RecordMissingMessagesLastWeek(storenode, cnt) + for _, storenodeID := range storenodes { + app.metrics.RecordMissingMessagesLastWeek(storenodeID, results[storenodeID]) } return nil } @@ -539,6 +539,10 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string var result *store.Result var err error + if len(messageHashes) == 0 { + return + } + peerInfo := app.node.Host().Peerstore().PeerInfo(peerID) queryLogger := logger.With(zap.Stringer("storenode", peerID)) diff --git a/internal/persistence/database.go b/internal/persistence/database.go index 61236a9..5ab613e 100644 --- a/internal/persistence/database.go +++ b/internal/persistence/database.go @@ -240,7 +240,7 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub } func (d *DBStore) GetMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID][]pb.MessageHash, error) { - rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist'", from.UnixNano(), to.UnixNano(), clusterID) + rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND fundOnRecheck = false", from.UnixNano(), to.UnixNano(), clusterID) if err != nil { return nil, err } @@ -307,6 +307,10 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, } func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash, clusterID uint) error { + if len(messageHashes) == 0 { + return nil + } + query := "UPDATE missingMessages SET foundOnRecheck = true WHERE clusterID = $1 AND messageHash IN (" for i := range messageHashes { if i > 0 { @@ -343,7 +347,7 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) err } func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) { - rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID) + rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID) if err != nil { return nil, err }