diff --git a/Makefile b/Makefile
index 122e7e1..0a163ca 100644
--- a/Makefile
+++ b/Makefile
@@ -14,4 +14,4 @@ lint-install:
 
 lint:
 	@echo "lint"
-	@golangci-lint run ./... --deadline=5m
+	@golangci-lint run ./...
diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go
index cb91c5c..ef58b07 100644
--- a/cmd/storemsgcounter/execute.go
+++ b/cmd/storemsgcounter/execute.go
@@ -40,6 +40,12 @@ const timeInterval = 2 * time.Minute
 const delay = 5 * time.Minute
 const maxAttempts = 3
 
+type Application struct {
+	node    *node.WakuNode
+	metrics metrics.Metrics
+	db      *persistence.DBStore
+}
+
 func Execute(ctx context.Context, options Options) error {
 	// Set encoding for logs (console, json, ...)
 	// Note that libp2p reads the encoding from GOLOG_LOG_FMT env var.
@@ -129,6 +135,12 @@ func Execute(ctx context.Context, options Options) error {
 		return err
 	}
 
+	application := &Application{
+		node:    wakuNode,
+		metrics: metrics,
+		db:      dbStore,
+	}
+
 	timer := time.NewTimer(0)
 	defer timer.Stop()
 	for {
@@ -141,7 +153,7 @@ func Execute(ctx context.Context, options Options) error {
 			runIdLogger := logger.With(zap.String("runId", runId))
 			runIdLogger.Info("verifying message history...")
 
-			err := verifyHistory(ctx, runId, storenodes, wakuNode, dbStore, metrics, runIdLogger)
+			err := application.verifyHistory(ctx, runId, storenodes, runIdLogger)
 			if err != nil {
 				return err
 			}
@@ -156,7 +168,7 @@ var msgMapLock sync.Mutex
 
 var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence
 var msgPubsubTopic map[pb.MessageHash]string
 
-func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, metrics metrics.Metrics, logger *zap.Logger) error {
+func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, logger *zap.Logger) error {
 	// [MessageHash][StoreNode] = exists?
 	msgMapLock.Lock()
@@ -164,12 +176,12 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 	msgPubsubTopic = make(map[pb.MessageHash]string)
 	msgMapLock.Unlock()
 
-	topicSyncStatus, err := dbStore.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value())
+	topicSyncStatus, err := app.db.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value())
 	if err != nil {
 		return err
 	}
 
-	tx, err := dbStore.GetTrx(ctx)
+	tx, err := app.db.GetTrx(ctx)
 	if err != nil {
 		return err
 	}
@@ -188,7 +200,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 		wg.Add(1)
 		go func(topic string, lastSyncTimestamp *time.Time) {
 			defer wg.Done()
-			retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, metrics, logger)
+			app.retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, tx, logger)
 		}(topic, lastSyncTimestamp)
 	}
 	wg.Wait()
@@ -212,7 +224,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 		wg.Add(1)
 		go func(peerID peer.ID, messageHashes []pb.MessageHash) {
 			defer wg.Done()
-			verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, metrics, logger)
+			app.verifyMessageExistence(ctx, runId, peerID, messageHashes, logger)
 		}(peerID, messageHashes)
 	}
 	wg.Wait()
@@ -241,7 +253,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 
 		if len(missingIn) != 0 {
 			logger.Info("missing message identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn)))
-			err := dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], missingIn, "does_not_exist")
+			err := app.db.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], missingIn, "does_not_exist")
 			if err != nil {
 				return err
 			}
@@ -249,7 +261,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 		if len(unknownIn) != 0 {
 			logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn)))
-			err = dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], unknownIn, "unknown")
+			err = app.db.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], unknownIn, "unknown")
 			if err != nil {
 				return err
 			}
 		}
@@ -257,12 +269,12 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 
 	for s, cnt := range missingInSummary {
-		metrics.RecordMissingMessages(s, "does_not_exist", cnt)
+		app.metrics.RecordMissingMessages(s, "does_not_exist", cnt)
 		logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("numMsgs", cnt))
 	}
 
 	for s, cnt := range unknownInSummary {
-		metrics.RecordMissingMessages(s, "unknown", cnt)
+		app.metrics.RecordMissingMessages(s, "unknown", cnt)
 		logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s), zap.Int("numMsgs", cnt))
 	}
 
@@ -270,10 +282,100 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
 
 	return nil
 }
 
-func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, wakuNode *node.WakuNode, dbStore *persistence.DBStore, tx *sql.Tx, metrics metrics.Metrics, logger *zap.Logger) {
+func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string, storenodeID peer.ID, topic string, startTime time.Time, endTime time.Time, logger *zap.Logger) {
+	var result *store.Result
+	var err error
+
+	queryLogger := logger.With(zap.Stringer("storenode", storenodeID), zap.Int64("startTime", startTime.UnixNano()), zap.Int64("endTime", endTime.UnixNano()))
+
+	retry := true
+	success := false
+	count := 1
+	for retry && count <= maxAttempts {
+		queryLogger.Info("retrieving message history for topic!", zap.Int("attempt", count))
+
+		tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+		result, err = app.node.Store().Query(tCtx, store.FilterCriteria{
+			ContentFilter: protocol.NewContentFilter(topic),
+			TimeStart:     proto.Int64(startTime.UnixNano()),
+			TimeEnd:       proto.Int64(endTime.UnixNano()),
+		}, store.WithPeer(storenodeID), store.WithPaging(false, 100), store.IncludeData(false))
+		cancel()
+
+		if err != nil {
+			queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count))
+			time.Sleep(2 * time.Second)
+		} else {
+			queryLogger.Info("messages available", zap.Int("len", len(result.Messages())))
+			retry = false
+			success = true
+		}
+		count++
+	}
+
+	if !success {
+		queryLogger.Error("storenode not available")
+		err := app.db.RecordStorenodeUnavailable(runId, storenodeID)
+		if err != nil {
+			queryLogger.Error("could not store node unavailable", zap.Error(err))
+		}
+		app.metrics.RecordStorenodeAvailability(storenodeID, false)
+		return
+	}
+
+	app.metrics.RecordStorenodeAvailability(storenodeID, true)
+
+	for !result.IsComplete() {
+		msgMapLock.Lock()
+		for _, mkv := range result.Messages() {
+			hash := mkv.WakuMessageHash()
+			_, ok := msgMap[hash]
+			if !ok {
+				msgMap[hash] = make(map[peer.ID]MessageExistence)
+			}
+			msgMap[hash][storenodeID] = Exists
+			msgPubsubTopic[hash] = mkv.GetPubsubTopic()
+		}
+		msgMapLock.Unlock()
+
+		retry := true
+		success := false
+		count := 1
+		cursorLogger := queryLogger.With(zap.String("cursor", hex.EncodeToString(result.Cursor())))
+		for retry && count <= maxAttempts {
+			cursorLogger.Info("retrieving next page")
+			tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+			err = result.Next(tCtx)
+			cancel()
+			if err != nil {
+				cursorLogger.Error("could not query storenode", zap.Error(err))
+				time.Sleep(2 * time.Second)
+			} else {
+				cursorLogger.Info("more messages available", zap.Int("len", len(result.Messages())))
+				retry = false
+				success = true
+			}
+			count++
+		}
+
+		if !success {
+			cursorLogger.Error("storenode not available")
+			err := app.db.RecordStorenodeUnavailable(runId, storenodeID)
+			if err != nil {
+				cursorLogger.Error("could not store recordnode unavailable", zap.Error(err))
+			}
+			app.metrics.RecordStorenodeAvailability(storenodeID, false)
+			return
+		}
+
+		app.metrics.RecordStorenodeAvailability(storenodeID, true)
+	}
+}
+
+func (app *Application) retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, tx *sql.Tx, logger *zap.Logger) {
 	logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp))
-	now := wakuNode.Timesource().Now()
+	now := app.node.Timesource().Now()
 
 	// Query is done with a delay
 	startTime := now.Add(-(timeInterval + delay))
@@ -288,189 +390,111 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
 	}
 
 	// Determine if the messages exist across all nodes
+	wg := sync.WaitGroup{}
 	for _, node := range storenodes {
-		storeNodeFailure := false
-
-		var result *store.Result
-		var err error
-
-		queryLogger := logger.With(zap.Stringer("storenode", node.ID), zap.Int64("startTime", startTime.UnixNano()), zap.Int64("endTime", endTime.UnixNano()))
-
-	queryLbl:
-		for i := 0; i < maxAttempts; i++ {
-
-			queryLogger.Info("retrieving message history for topic!", zap.Int("attempt", i))
-
-			tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
-			result, err = wakuNode.Store().Query(tCtx, store.FilterCriteria{
-				ContentFilter: protocol.NewContentFilter(topic),
-				TimeStart:     proto.Int64(startTime.UnixNano()),
-				TimeEnd:       proto.Int64(endTime.UnixNano()),
-			}, store.WithPeer(node.ID), store.WithPaging(false, 100), store.IncludeData(false))
-			cancel()
-			if err != nil {
-				queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", i))
-				storeNodeFailure = true
-				time.Sleep(2 * time.Second)
-			} else {
-				queryLogger.Info("messages available", zap.Int("len", len(result.Messages())))
-				storeNodeFailure = false
-				break queryLbl
-			}
-		}
-
-		if storeNodeFailure {
-			queryLogger.Error("storenode not available")
-			err := dbStore.RecordStorenodeUnavailable(runId, node.ID)
-			if err != nil {
-				queryLogger.Error("could not store node unavailable", zap.Error(err))
-			}
-			metrics.RecordStorenodeAvailability(node.ID, false)
-		} else {
-			metrics.RecordStorenodeAvailability(node.ID, true)
-
-		iteratorLbl:
-			for !result.IsComplete() {
-				msgMapLock.Lock()
-				for _, mkv := range result.Messages() {
-					hash := mkv.WakuMessageHash()
-					_, ok := msgMap[hash]
-					if !ok {
-						msgMap[hash] = make(map[peer.ID]MessageExistence)
-					}
-					msgMap[hash][node.ID] = Exists
-					msgPubsubTopic[hash] = mkv.GetPubsubTopic()
-				}
-				msgMapLock.Unlock()
-
-				storeNodeFailure := false
-
-			nextRetryLbl:
-				for i := 0; i < maxAttempts; i++ {
-					queryLogger.Info("retrieving next page", zap.String("cursor", hex.EncodeToString(result.Cursor())))
-					tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
-					err = result.Next(tCtx)
-					cancel()
-					if err != nil {
-						queryLogger.Error("could not query storenode", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err))
-						storeNodeFailure = true
-						time.Sleep(2 * time.Second)
-					} else {
-						queryLogger.Info("more messages available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages())))
-						storeNodeFailure = false
-						break nextRetryLbl
-					}
-				}
-
-				if storeNodeFailure {
-					queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
-					err := dbStore.RecordStorenodeUnavailable(runId, node.ID)
-					if err != nil {
-						queryLogger.Error("could not store recordnode unavailable", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err))
-					}
-					metrics.RecordStorenodeAvailability(node.ID, false)
-					break iteratorLbl
-				} else {
-					metrics.RecordStorenodeAvailability(node.ID, true)
-				}
-			}
-		}
-
+		wg.Add(1)
+		go func(peerID peer.ID) {
+			defer wg.Done()
+			app.fetchStoreNodeMessages(ctx, runId, peerID, topic, startTime, endTime, logger)
+		}(node.ID)
 	}
+	wg.Wait()
+
 	// Update db with last sync time
-	err := dbStore.UpdateTopicSyncState(tx, options.ClusterID, topic, endTime)
+	err := app.db.UpdateTopicSyncState(tx, options.ClusterID, topic, endTime)
 	if err != nil {
 		logger.Panic("could not update topic sync state", zap.Error(err))
 	}
 }
 
-func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, wakuNode *node.WakuNode, dbStore *persistence.DBStore, metrics metrics.Metrics, logger *zap.Logger) {
-	storeNodeFailure := false
+func (app *Application) verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, logger *zap.Logger) {
 	var result *store.Result
 	var err error
 
-	peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
+	peerInfo := app.node.Host().Peerstore().PeerInfo(peerID)
 
 	queryLogger := logger.With(zap.Stringer("storenode", peerID))
 
-queryLbl:
-	for i := 0; i < maxAttempts; i++ {
-		queryLogger.Info("querying by hash", zap.Int("attempt", i))
+	retry := true
+	success := false
+	count := 1
+	for retry && count <= maxAttempts {
+		queryLogger.Info("querying by hash", zap.Int("attempt", count))
 
 		tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
-		result, err = wakuNode.Store().QueryByHash(tCtx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100))
+		result, err = app.node.Store().QueryByHash(tCtx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100))
 		cancel()
 		if err != nil {
-			queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", i))
-			storeNodeFailure = true
+			queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count))
 			time.Sleep(2 * time.Second)
 		} else {
 			queryLogger.Info("hashes available", zap.Int("len", len(result.Messages())))
-			storeNodeFailure = false
-			break queryLbl
+			retry = false
+			success = true
 		}
+		count++
 	}
 
-	if storeNodeFailure {
+	if !success {
 		queryLogger.Error("storenode not available")
-
-		err := dbStore.RecordStorenodeUnavailable(runId, peerID)
+		err := app.db.RecordStorenodeUnavailable(runId, peerID)
 		if err != nil {
 			queryLogger.Error("could not store recordnode unavailable", zap.Error(err))
 		}
-		metrics.RecordStorenodeAvailability(peerID, false)
+		app.metrics.RecordStorenodeAvailability(peerID, false)
+		return
+	}
 
-	} else {
-		metrics.RecordStorenodeAvailability(peerID, true)
+	app.metrics.RecordStorenodeAvailability(peerID, true)
 
-		for !result.IsComplete() {
-			msgMapLock.Lock()
-			for _, mkv := range result.Messages() {
-				hash := mkv.WakuMessageHash()
-				_, ok := msgMap[hash]
-				if !ok {
-					msgMap[hash] = make(map[peer.ID]MessageExistence)
-				}
-				msgMap[hash][peerInfo.ID] = Exists
+	for !result.IsComplete() {
+		msgMapLock.Lock()
+		for _, mkv := range result.Messages() {
+			hash := mkv.WakuMessageHash()
+			_, ok := msgMap[hash]
+			if !ok {
+				msgMap[hash] = make(map[peer.ID]MessageExistence)
 			}
+			msgMap[hash][peerInfo.ID] = Exists
+		}
 
-			for _, msgHash := range messageHashes {
-				if msgMap[msgHash][peerInfo.ID] != Exists {
-					msgMap[msgHash][peerInfo.ID] = DoesNotExist
-				}
-			}
-
-			msgMapLock.Unlock()
-
-			storeNodeFailure = false
-
-		nextRetryLbl:
-			for i := 0; i < maxAttempts; i++ {
-				queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", i))
-				tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
-				err = result.Next(tCtx)
-				cancel()
-				if err != nil {
-					queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", i))
-					storeNodeFailure = true
-					time.Sleep(2 * time.Second)
-				} else {
-					queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages())))
-					storeNodeFailure = false
-					break nextRetryLbl
-				}
-			}
-
-			if storeNodeFailure {
-				queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
-				err := dbStore.RecordStorenodeUnavailable(runId, peerID)
-				if err != nil {
-					logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
-				}
-				metrics.RecordStorenodeAvailability(peerID, false)
-			} else {
-				metrics.RecordStorenodeAvailability(peerID, true)
+		for _, msgHash := range messageHashes {
+			if msgMap[msgHash][peerInfo.ID] != Exists {
+				msgMap[msgHash][peerInfo.ID] = DoesNotExist
 			}
 		}
+
+		msgMapLock.Unlock()
+
+		retry := true
+		success := false
+		count := 1
+		for retry && count <= maxAttempts {
+			queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", count))
+			tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
+			err = result.Next(tCtx)
+			cancel()
+			if err != nil {
+				queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", count))
+				time.Sleep(2 * time.Second)
+			} else {
+				queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages())))
+				retry = false
+				success = true
+			}
+			count++
+		}
+
+		if !success {
+			queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
+			err := app.db.RecordStorenodeUnavailable(runId, peerID)
+			if err != nil {
+				logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
+			}
+			app.metrics.RecordStorenodeAvailability(peerID, false)
+			return
+		}
+
+		app.metrics.RecordStorenodeAvailability(peerID, true)
 	}
 }