mirror of
https://github.com/logos-messaging/storenode-messages-counter.git
synced 2026-01-03 14:43:05 +00:00
fix: mark message as found
This commit is contained in:
parent
a9d0eaf587
commit
0c3ceeff1c
@ -177,13 +177,13 @@ func Execute(ctx context.Context, options Options) error {
|
||||
runIdLogger := logger.With(zap.String("syncRunId", runId))
|
||||
runIdLogger.Info("rechecking missing messages status")
|
||||
|
||||
err := application.checkMissingMessageStatus(ctx, runId, runIdLogger)
|
||||
err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger)
|
||||
if err != nil {
|
||||
logger.Error("could not recheck the status of missing messages", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
err = application.countMissingMessages()
|
||||
err = application.countMissingMessages(storenodeIDs)
|
||||
if err != nil {
|
||||
logger.Error("could not count missing messages", zap.Error(err))
|
||||
return
|
||||
@ -338,7 +338,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *Application) checkMissingMessageStatus(ctx context.Context, runId string, logger *zap.Logger) error {
|
||||
func (app *Application) checkMissingMessageStatus(ctx context.Context, storenodes []peer.ID, runId string, logger *zap.Logger) error {
|
||||
now := app.node.Timesource().Now()
|
||||
|
||||
// Get all messages whose status is missing or does not exist, and the column found_on_recheck is false
|
||||
@ -349,7 +349,8 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, runId str
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for storenodeID, messageHashes := range missingMessages {
|
||||
|
||||
for _, storenodeID := range storenodes {
|
||||
wg.Add(1)
|
||||
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
|
||||
defer wg.Done()
|
||||
@ -369,15 +370,14 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, runId str
|
||||
|
||||
app.metrics.RecordMissingMessagesPrevHour(peerID, len(messageHashes)-len(foundMissingMessages))
|
||||
|
||||
}(storenodeID, messageHashes)
|
||||
}(storenodeID, missingMessages[storenodeID])
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *Application) countMissingMessages() error {
|
||||
func (app *Application) countMissingMessages(storenodes []peer.ID) error {
|
||||
|
||||
// not including last two hours in now to let sync work
|
||||
now := app.node.Timesource().Now().Add(-2 * time.Hour)
|
||||
@ -396,8 +396,8 @@ func (app *Application) countMissingMessages() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for storenode, cnt := range results {
|
||||
app.metrics.RecordMissingMessagesLastWeek(storenode, cnt)
|
||||
for _, storenodeID := range storenodes {
|
||||
app.metrics.RecordMissingMessagesLastWeek(storenodeID, results[storenodeID])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -539,6 +539,10 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string
|
||||
var result *store.Result
|
||||
var err error
|
||||
|
||||
if len(messageHashes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
peerInfo := app.node.Host().Peerstore().PeerInfo(peerID)
|
||||
|
||||
queryLogger := logger.With(zap.Stringer("storenode", peerID))
|
||||
|
||||
@ -240,7 +240,7 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub
|
||||
}
|
||||
|
||||
func (d *DBStore) GetMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID][]pb.MessageHash, error) {
|
||||
rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist'", from.UnixNano(), to.UnixNano(), clusterID)
|
||||
rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND fundOnRecheck = false", from.UnixNano(), to.UnixNano(), clusterID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -307,6 +307,10 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
|
||||
}
|
||||
|
||||
func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash, clusterID uint) error {
|
||||
if len(messageHashes) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
query := "UPDATE missingMessages SET foundOnRecheck = true WHERE clusterID = $1 AND messageHash IN ("
|
||||
for i := range messageHashes {
|
||||
if i > 0 {
|
||||
@ -343,7 +347,7 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) err
|
||||
}
|
||||
|
||||
func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) {
|
||||
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
|
||||
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user