From 1e9ed245f196a56d126a11b67a0df5a202bfa831 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 29 May 2024 22:44:37 -0400 Subject: [PATCH] test: more logs --- cmd/storemsgcounter/execute.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index d11fe70..60441c4 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -11,6 +11,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -177,6 +178,7 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no _ = tx.Rollback() }() + log.Info("CHECK_TOPICSYNC - START") wg := sync.WaitGroup{} for topic, lastSyncTimestamp := range topicSyncStatus { wg.Add(1) @@ -185,13 +187,17 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger) }(topic, lastSyncTimestamp) } + log.Info("CHECK_TOPICSYNC - BEFORE_WAIT") wg.Wait() + log.Info("CHECK_TOPICSYNC - AFTER_WAIT") // Verify for each storenode which messages are not available, and query // for their existence using message hash // ======================================================================== msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash + log.Info("CHECK_MSG_MAP - BEFORE_LOCK") msgMapLock.Lock() + log.Info("CHECK_MSG_MAP - AFTER_LOCK") for msgHash, nodes := range msgMap { for _, node := range storenodes { if nodes[node.ID] != Exists { @@ -202,6 +208,7 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no msgMapLock.Unlock() wg = sync.WaitGroup{} + log.Info("CHECK_VERIF_EXISTENCE - START") for peerID, messageHashes := range msgsToVerify { wg.Add(1) go func(peerID peer.ID, messageHashes []pb.MessageHash) { @@ -209,7 +216,9 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, logger) }(peerID, messageHashes) } + log.Info("CHECK_VERIF_EXISTENCE - BEFORE_WAIT") wg.Wait() + log.Info("CHECK_VERIF_EXISTENCE - AFTER_WAIT") // If a message is not available, store in DB in which store nodes it wasnt // available and its timestamp @@ -307,6 +316,7 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn iteratorLbl: for !result.IsComplete() { msgMapLock.Lock() + logger.Info("OBTAINED MESSAGES", zap.Any("len", len(result.Messages()))) for _, mkv := range result.Messages() { hash := mkv.WakuMessageHash() _, ok := msgMap[hash] @@ -325,6 +335,7 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn nextRetryLbl: for i := 0; i < maxAttempts; i++ { + logger.Info("EXECUTING NEXT!!!") err = result.Next(ctx) if err != nil { logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err))