mirror of
https://github.com/logos-messaging/storenode-messages-counter.git
synced 2026-01-04 07:03:11 +00:00
test: more logs
This commit is contained in:
parent
7ebfe40900
commit
1e9ed245f1
@ -11,6 +11,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||||
@ -177,6 +178,7 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no
|
|||||||
_ = tx.Rollback()
|
_ = tx.Rollback()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
log.Info("CHECK_TOPICSYNC - START")
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for topic, lastSyncTimestamp := range topicSyncStatus {
|
for topic, lastSyncTimestamp := range topicSyncStatus {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -185,13 +187,17 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no
|
|||||||
retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
|
retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
|
||||||
}(topic, lastSyncTimestamp)
|
}(topic, lastSyncTimestamp)
|
||||||
}
|
}
|
||||||
|
log.Info("CHECK_TOPICSYNC - BEFORE_WAIT")
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
log.Info("CHECK_TOPICSYNC - AFTER_WAIT")
|
||||||
|
|
||||||
// Verify for each storenode which messages are not available, and query
|
// Verify for each storenode which messages are not available, and query
|
||||||
// for their existence using message hash
|
// for their existence using message hash
|
||||||
// ========================================================================
|
// ========================================================================
|
||||||
msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash
|
msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash
|
||||||
|
log.Info("CHECK_MSG_MAP - BEFORE_LOCK")
|
||||||
msgMapLock.Lock()
|
msgMapLock.Lock()
|
||||||
|
log.Info("CHECK_MSG_MAP - AFTER_LOCK")
|
||||||
for msgHash, nodes := range msgMap {
|
for msgHash, nodes := range msgMap {
|
||||||
for _, node := range storenodes {
|
for _, node := range storenodes {
|
||||||
if nodes[node.ID] != Exists {
|
if nodes[node.ID] != Exists {
|
||||||
@ -202,6 +208,7 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no
|
|||||||
msgMapLock.Unlock()
|
msgMapLock.Unlock()
|
||||||
|
|
||||||
wg = sync.WaitGroup{}
|
wg = sync.WaitGroup{}
|
||||||
|
log.Info("CHECK_VERIF_EXISTENCE - START")
|
||||||
for peerID, messageHashes := range msgsToVerify {
|
for peerID, messageHashes := range msgsToVerify {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
|
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
|
||||||
@ -209,7 +216,9 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no
|
|||||||
verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, logger)
|
verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, logger)
|
||||||
}(peerID, messageHashes)
|
}(peerID, messageHashes)
|
||||||
}
|
}
|
||||||
|
log.Info("CHECK_VERIF_EXISTENCE - BEFORE_WAIT")
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
log.Info("CHECK_VERIF_EXISTENCE - AFTER_WAIT")
|
||||||
|
|
||||||
// If a message is not available, store in DB in which store nodes it wasnt
|
// If a message is not available, store in DB in which store nodes it wasnt
|
||||||
// available and its timestamp
|
// available and its timestamp
|
||||||
@ -307,6 +316,7 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
|
|||||||
iteratorLbl:
|
iteratorLbl:
|
||||||
for !result.IsComplete() {
|
for !result.IsComplete() {
|
||||||
msgMapLock.Lock()
|
msgMapLock.Lock()
|
||||||
|
logger.Info("OBTAINED MESSAGES", zap.Any("len", len(result.Messages())))
|
||||||
for _, mkv := range result.Messages() {
|
for _, mkv := range result.Messages() {
|
||||||
hash := mkv.WakuMessageHash()
|
hash := mkv.WakuMessageHash()
|
||||||
_, ok := msgMap[hash]
|
_, ok := msgMap[hash]
|
||||||
@ -325,6 +335,7 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
|
|||||||
|
|
||||||
nextRetryLbl:
|
nextRetryLbl:
|
||||||
for i := 0; i < maxAttempts; i++ {
|
for i := 0; i < maxAttempts; i++ {
|
||||||
|
logger.Info("EXECUTING NEXT!!!")
|
||||||
err = result.Next(ctx)
|
err = result.Next(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err))
|
logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user