Mirror of https://github.com/logos-messaging/storenode-messages-counter.git
fix: limit query intervals to two minutes
Commit 54fa01757b (parent 54cdbc57be)
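In short, this commit caps each per-topic store query window at timeInterval (the two minutes of the title) and has verifyHistory report, via a new shouldResetTimer return value, whether older history remains; when it does, the main loop resets its timer to zero so the next verification run starts immediately instead of waiting a full interval. The per-topic window computation also moves out of retrieveHistory and into the loop in verifyHistory.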
@@ -162,13 +162,17 @@ func Execute(ctx context.Context, options Options) error {
 			runIdLogger := logger.With(zap.String("runId", runId))

 			runIdLogger.Info("verifying message history...")
-			err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger)
+			shouldResetTimer, err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger)
 			if err != nil {
 				runIdLogger.Error("could not verify message history", zap.Error(err))
 			}
 			runIdLogger.Info("verification complete")

-			missingMessagesTimer.Reset(timeInterval)
+			if shouldResetTimer {
+				missingMessagesTimer.Reset(0)
+			} else {
+				missingMessagesTimer.Reset(timeInterval)
+			}
 		}
 	}
 }()
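For context on the timer handling above: time.Timer.Reset(0) re-arms the timer to fire effectively immediately, which is how a clamped run schedules its catch-up. A minimal standalone sketch of that pattern, with illustrative values rather than the repository's real loop:

package main

import (
	"fmt"
	"time"
)

func main() {
	const interval = 50 * time.Millisecond // stands in for the real two-minute timeInterval

	backlog := 2              // pretend two clamped windows still need verifying
	timer := time.NewTimer(0) // fire immediately for the first run
	defer timer.Stop()

	for run := 1; run <= 4; run++ {
		<-timer.C // the timer has expired here, so Reset below is safe
		fmt.Println("verification run", run, "backlog:", backlog)
		if backlog > 0 {
			backlog--
			timer.Reset(0) // catch up right away, as the patched loop does
		} else {
			timer.Reset(interval) // normal cadence between runs
		}
	}
}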
@@ -218,7 +222,7 @@ var msgMapLock sync.Mutex
 var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence
 var msgPubsubTopic map[pb.MessageHash]string

-func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes peer.IDSlice, logger *zap.Logger) error {
+func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes peer.IDSlice, logger *zap.Logger) (shouldResetTimer bool, err error) {

 	// [MessageHash][StoreNode] = exists?
 	msgMapLock.Lock()
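A side note on the signature change: Go's named return values let the function assign shouldResetTimer anywhere in its body (here, inside the per-topic loop further down) and carry the latest value back on the final return. A toy illustration with hypothetical names:

package main

import "fmt"

// verify mimics the new signature: shouldResetTimer is a named return, so
// assignments to it anywhere in the body survive until the final return.
func verify(windows []int) (shouldResetTimer bool, err error) {
	const maxWindow = 2
	for _, w := range windows {
		if w > maxWindow {
			shouldResetTimer = true // a window was clamped: ask the caller to re-run soon
		}
	}
	return shouldResetTimer, nil
}

func main() {
	again, _ := verify([]int{1, 5, 2})
	fmt.Println("reset timer immediately:", again)
}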
@@ -228,12 +232,12 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno

 	topicSyncStatus, err := app.db.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value())
 	if err != nil {
-		return err
+		return false, err
 	}

 	tx, err := app.db.GetTrx(ctx)
 	if err != nil {
-		return err
+		return false, err
 	}

 	defer func() {
@@ -248,10 +252,36 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
 	wg := sync.WaitGroup{}
 	for topic, lastSyncTimestamp := range topicSyncStatus {
-		wg.Add(1)
-		go func(topic string, lastSyncTimestamp *time.Time) {
-			defer wg.Done()
-			app.retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, tx, logger)
-		}(topic, lastSyncTimestamp)
+
+		logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp))
+		if lastSyncTimestamp != nil {
+			app.metrics.RecordLastSyncDate(topic, *lastSyncTimestamp)
+		}
+
+		now := app.node.Timesource().Now()
+
+		// Query is done with a delay
+		startTime := now.Add(-(timeInterval + delay))
+		if lastSyncTimestamp != nil {
+			startTime = *lastSyncTimestamp
+		}
+		endTime := now.Add(-delay)
+
+		if startTime.After(endTime) {
+			logger.Warn("too soon to retrieve messages for topic")
+			continue
+		}
+
+		// This avoids extremely large resultsets
+		if endTime.Sub(startTime) > timeInterval {
+			endTime = startTime.Add(timeInterval)
+			shouldResetTimer = true
+		}
+
+		wg.Add(1)
+		go func(topic string, startTime time.Time, endTime time.Time, logger *zap.Logger) {
+			defer wg.Done()
+			app.retrieveHistory(ctx, runId, storenodes, topic, startTime, endTime, tx, logger)
+		}(topic, startTime, endTime, logger)
 	}
 	wg.Wait()

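Read as a whole, the new loop body delays the query window, resumes from the stored sync point when one exists, skips topics whose window would start after its end, and clamps long windows to timeInterval. A self-contained sketch of just that computation, with illustrative constants and a hypothetical helper name (the real values come from the application's options):

package main

import (
	"fmt"
	"time"
)

// Illustrative values; the repository takes these from its options.
const (
	timeInterval = 2 * time.Minute
	delay        = 30 * time.Second
)

// planWindow reproduces the hunk's window arithmetic: delay the query,
// resume from lastSync when known, skip if too soon, clamp to timeInterval.
func planWindow(now time.Time, lastSync *time.Time) (start, end time.Time, clamped, ok bool) {
	start = now.Add(-(timeInterval + delay))
	if lastSync != nil {
		start = *lastSync
	}
	end = now.Add(-delay)
	if start.After(end) {
		return start, end, false, false // too soon to query this topic
	}
	if end.Sub(start) > timeInterval {
		end = start.Add(timeInterval) // cap the window; the caller catches up on the rest
		clamped = true
	}
	return start, end, clamped, true
}

func main() {
	// Worked example: a sync point ten minutes old yields a clamped two-minute window.
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	last := now.Add(-10 * time.Minute) // 11:50:00
	start, end, clamped, ok := planWindow(now, &last)
	fmt.Println(start.Format("15:04:05"), "to", end.Format("15:04:05"), "clamped:", clamped, "ok:", ok)
	// Output: 11:50:00 to 11:52:00 clamped: true ok: true
}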
@@ -327,7 +357,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
 		logger.Info("missing message identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn)))
 		err := app.db.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], missingIn, "does_not_exist")
 		if err != nil {
-			return err
+			return false, err
 		}
 		totalMissingMessages++
 	}
@@ -336,7 +366,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
 		logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn)))
 		err = app.db.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], unknownIn, "unknown")
 		if err != nil {
-			return err
+			return false, err
 		}
 	}
 }
@@ -354,7 +384,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
 	logger.Info("total missing messages", zap.Int("total", totalMissingMessages))
 	app.metrics.RecordTotalMissingMessages(totalMissingMessages)

-	return nil
+	return shouldResetTimer, nil
 }

 func (app *Application) checkMissingMessageStatus(ctx context.Context, storenodes []peer.ID, runId string, logger *zap.Logger) error {
@@ -518,27 +548,7 @@ func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string
 	}
 }

-func (app *Application) retrieveHistory(ctx context.Context, runId string, storenodes peer.IDSlice, topic string, lastSyncTimestamp *time.Time, tx *sql.Tx, logger *zap.Logger) {
-	logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp))
-
-	if lastSyncTimestamp != nil {
-		app.metrics.RecordLastSyncDate(topic, *lastSyncTimestamp)
-	}
-
-	now := app.node.Timesource().Now()
-
-	// Query is done with a delay
-	startTime := now.Add(-(timeInterval + delay))
-	if lastSyncTimestamp != nil {
-		startTime = *lastSyncTimestamp
-	}
-	endTime := now.Add(-delay)
-
-	if startTime.After(endTime) {
-		logger.Warn("too soon to retrieve messages for topic")
-		return
-	}
-
+func (app *Application) retrieveHistory(ctx context.Context, runId string, storenodes peer.IDSlice, topic string, startTime time.Time, endTime time.Time, tx *sql.Tx, logger *zap.Logger) {
 	// Determine if the messages exist across all nodes
 	wg := sync.WaitGroup{}
 	for _, storePeerID := range storenodes {
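After this hunk, retrieveHistory is a plain fetch over an explicit [startTime, endTime] range: all window policy (the query delay, resuming from the last sync point, the two-minute clamp) now lives in verifyHistory, as sketched after the earlier hunk. One function decides what to query; the other only queries it, which also means the "too soon" case is caught before a goroutine is ever spawned.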