From 6faafdbd70b82822143b7a5abf07742fcfca614e Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 31 May 2024 14:41:54 -0400 Subject: [PATCH] chore: fix logs --- cmd/storemsgcounter/execute.go | 81 +++++++++++----------------------- 1 file changed, 25 insertions(+), 56 deletions(-) diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 78d846f..5318a56 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -11,7 +11,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/log" "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -154,16 +153,10 @@ var msgAttr map[pb.MessageHash]MessageAttr func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error { // [MessageHash][StoreNode] = exists? - log.Info("LOCKING_MAP - START") msgMapLock.Lock() - log.Info("LOCKING_MAP - END") - msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence) msgAttr = make(map[pb.MessageHash]MessageAttr) - - log.Info("UNLOCKING - START") msgMapLock.Unlock() - log.Info("UNLOCKING - END") topicSyncStatus, err := dbStore.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value()) if err != nil { @@ -184,7 +177,6 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo _ = tx.Rollback() }() - log.Info("CHECK_TOPICSYNC - START") wg := sync.WaitGroup{} for topic, lastSyncTimestamp := range topicSyncStatus { wg.Add(1) @@ -193,17 +185,13 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger) }(topic, lastSyncTimestamp) } - log.Info("CHECK_TOPICSYNC - BEFORE_WAIT") wg.Wait() - log.Info("CHECK_TOPICSYNC - AFTER_WAIT") // Verify for each storenode which messages are not available, and query // for their existence using message hash // ======================================================================== msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash - log.Info("CHECK_MSG_MAP - BEFORE_LOCK") msgMapLock.Lock() - log.Info("CHECK_MSG_MAP - AFTER_LOCK") for msgHash, nodes := range msgMap { for _, node := range storenodes { if nodes[node.ID] != Exists { @@ -214,7 +202,6 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo msgMapLock.Unlock() wg = sync.WaitGroup{} - log.Info("CHECK_VERIF_EXISTENCE - START") for peerID, messageHashes := range msgsToVerify { wg.Add(1) go func(peerID peer.ID, messageHashes []pb.MessageHash) { @@ -222,9 +209,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, logger) }(peerID, messageHashes) } - log.Info("CHECK_VERIF_EXISTENCE - BEFORE_WAIT") wg.Wait() - log.Info("CHECK_VERIF_EXISTENCE - AFTER_WAIT") // If a message is not available, store in DB in which store nodes it wasnt // available and its timestamp @@ -290,9 +275,12 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn var result *store.Result var err error + queryLogger := logger.With(zap.Stringer("storenode", node.ID), zap.Int64("startTime", startTime.UnixNano()), zap.Int64("endTime", endTime.UnixNano())) + queryLbl: for i := 0; i < maxAttempts; i++ { - logger.Info("retrieving message history for topic!", zap.Stringer("storenode", node.ID), zap.Int64("startTime", startTime.UnixNano()), zap.Int64("endTime", endTime.UnixNano()), zap.Int("attempt", i)) + + queryLogger.Info("retrieving message history for topic!", zap.Int("attempt", i)) tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) result, err = wakuNode.Store().Query(tCtx, store.FilterCriteria{ @@ -302,38 +290,27 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn }, store.WithPeer(node.ID), store.WithPaging(false, 100)) cancel() if err != nil { - logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err)) + queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", i)) storeNodeFailure = true time.Sleep(2 * time.Second) } else { - logger.Info("messages available?", zap.Int("len", len(result.Messages()))) + queryLogger.Info("messages available", zap.Int("len", len(result.Messages()))) storeNodeFailure = false break queryLbl } } if storeNodeFailure { - logger.Error("storenode not available", zap.Stringer("storenode", node), zap.Time("startTime", startTime), zap.Time("endTime", endTime)) + queryLogger.Error("storenode not available") err := dbStore.RecordStorenodeUnavailable(runId, node) if err != nil { - logger.Error("could not store node unavailable", zap.Error(err), zap.Stringer("storenode", node)) + queryLogger.Error("could not store node unavailable", zap.Error(err)) } - } else { iteratorLbl: for !result.IsComplete() { msgMapLock.Lock() - - oldMessageT := zap.Skip() - newMessageT := zap.Skip() - if len(result.Messages()) > 0 { - oldMessageT = zap.Int64("oldMessageT", result.Messages()[0].Message.GetTimestamp()) - newMessageT = zap.Int64("newMessageT", result.Messages()[len(result.Messages())-1].Message.GetTimestamp()) - } - - logger.Info("OBTAINED MESSAGES", zap.Any("len", len(result.Messages())), newMessageT, oldMessageT) - for _, mkv := range result.Messages() { hash := mkv.WakuMessageHash() _, ok := msgMap[hash] @@ -352,29 +329,24 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn nextRetryLbl: for i := 0; i < maxAttempts; i++ { - logger.Info("EXECUTING NEXT!!!", zap.String("cursor", hex.EncodeToString(result.Cursor()))) + queryLogger.Info("retrieving next page", zap.String("cursor", hex.EncodeToString(result.Cursor()))) err = result.Next(ctx) if err != nil { - logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err)) + queryLogger.Error("could not query storenode", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err)) storeNodeFailure = true time.Sleep(2 * time.Second) } else { + queryLogger.Info("more messages available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages()))) storeNodeFailure = false break nextRetryLbl } } if storeNodeFailure { - logger.Error("storenode not available", - zap.Stringer("storenode", node), - zap.Time("startTime", startTime), - zap.Time("endTime", endTime), - zap.String("topic", topic), - zap.String("cursor", hexutil.Encode(result.Cursor()))) - + queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor()))) err := dbStore.RecordStorenodeUnavailable(runId, node) if err != nil { - logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", node)) + queryLogger.Error("could not store recordnode unavailable", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err)) } break iteratorLbl } @@ -396,28 +368,29 @@ func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, m peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID) + queryLogger := logger.With(zap.Stringer("storenode", peerID), zap.Stringers("hashes", messageHashes)) + queryLbl: for i := 0; i < maxAttempts; i++ { - logger.Info("querying by hash", zap.Stringer("storenode", peerID), zap.Stringers("hashes", messageHashes), zap.Int("attempt", i)) + queryLogger.Info("querying by hash", zap.Int("attempt", i)) result, err = wakuNode.Store().QueryByHash(ctx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100)) if err != nil { - logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err)) + queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", i)) storeNodeFailure = true time.Sleep(2 * time.Second) } else { + queryLogger.Info("hashes available", zap.Int("len", len(result.Messages()))) storeNodeFailure = false break queryLbl } } if storeNodeFailure { - logger.Error("storenode not available", - zap.Stringer("storenode", peerInfo), - zap.Stringers("hashes", messageHashes)) + queryLogger.Error("storenode not available") err := dbStore.RecordStorenodeUnavailable(runId, peerInfo) if err != nil { - logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo)) + queryLogger.Error("could not store recordnode unavailable", zap.Error(err)) } } else { for !result.IsComplete() { @@ -443,30 +416,26 @@ queryLbl: nextRetryLbl: for i := 0; i < maxAttempts; i++ { - logger.Info("executing next while querying hashes", zap.Stringer("storenode", peerID), zap.Int("attempt", i)) + queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", i)) err = result.Next(ctx) if err != nil { - logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err)) + queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", i)) storeNodeFailure = true time.Sleep(2 * time.Second) } else { + queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages()))) storeNodeFailure = false break nextRetryLbl } } if storeNodeFailure { - logger.Error("storenode not available", - zap.Stringer("storenode", peerInfo), - zap.Stringers("hashes", messageHashes), - zap.String("cursor", hexutil.Encode(result.Cursor()))) - + queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor()))) err := dbStore.RecordStorenodeUnavailable(runId, peerInfo) if err != nil { - logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo)) + logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo)) } } - } } }