mirror of
https://github.com/logos-messaging/storenode-messages-counter.git
synced 2026-01-07 00:23:09 +00:00
chore: fix logs
This commit is contained in:
parent
b82c5c1a3a
commit
6faafdbd70
@ -11,7 +11,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||||
@ -154,16 +153,10 @@ var msgAttr map[pb.MessageHash]MessageAttr
|
|||||||
func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
|
func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
|
||||||
|
|
||||||
// [MessageHash][StoreNode] = exists?
|
// [MessageHash][StoreNode] = exists?
|
||||||
log.Info("LOCKING_MAP - START")
|
|
||||||
msgMapLock.Lock()
|
msgMapLock.Lock()
|
||||||
log.Info("LOCKING_MAP - END")
|
|
||||||
|
|
||||||
msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence)
|
msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence)
|
||||||
msgAttr = make(map[pb.MessageHash]MessageAttr)
|
msgAttr = make(map[pb.MessageHash]MessageAttr)
|
||||||
|
|
||||||
log.Info("UNLOCKING - START")
|
|
||||||
msgMapLock.Unlock()
|
msgMapLock.Unlock()
|
||||||
log.Info("UNLOCKING - END")
|
|
||||||
|
|
||||||
topicSyncStatus, err := dbStore.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value())
|
topicSyncStatus, err := dbStore.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -184,7 +177,6 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
|
|||||||
_ = tx.Rollback()
|
_ = tx.Rollback()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
log.Info("CHECK_TOPICSYNC - START")
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for topic, lastSyncTimestamp := range topicSyncStatus {
|
for topic, lastSyncTimestamp := range topicSyncStatus {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -193,17 +185,13 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
|
|||||||
retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
|
retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
|
||||||
}(topic, lastSyncTimestamp)
|
}(topic, lastSyncTimestamp)
|
||||||
}
|
}
|
||||||
log.Info("CHECK_TOPICSYNC - BEFORE_WAIT")
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
log.Info("CHECK_TOPICSYNC - AFTER_WAIT")
|
|
||||||
|
|
||||||
// Verify for each storenode which messages are not available, and query
|
// Verify for each storenode which messages are not available, and query
|
||||||
// for their existence using message hash
|
// for their existence using message hash
|
||||||
// ========================================================================
|
// ========================================================================
|
||||||
msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash
|
msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash
|
||||||
log.Info("CHECK_MSG_MAP - BEFORE_LOCK")
|
|
||||||
msgMapLock.Lock()
|
msgMapLock.Lock()
|
||||||
log.Info("CHECK_MSG_MAP - AFTER_LOCK")
|
|
||||||
for msgHash, nodes := range msgMap {
|
for msgHash, nodes := range msgMap {
|
||||||
for _, node := range storenodes {
|
for _, node := range storenodes {
|
||||||
if nodes[node.ID] != Exists {
|
if nodes[node.ID] != Exists {
|
||||||
@ -214,7 +202,6 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
|
|||||||
msgMapLock.Unlock()
|
msgMapLock.Unlock()
|
||||||
|
|
||||||
wg = sync.WaitGroup{}
|
wg = sync.WaitGroup{}
|
||||||
log.Info("CHECK_VERIF_EXISTENCE - START")
|
|
||||||
for peerID, messageHashes := range msgsToVerify {
|
for peerID, messageHashes := range msgsToVerify {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
|
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
|
||||||
@ -222,9 +209,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
|
|||||||
verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, logger)
|
verifyMessageExistence(ctx, runId, peerID, messageHashes, wakuNode, dbStore, logger)
|
||||||
}(peerID, messageHashes)
|
}(peerID, messageHashes)
|
||||||
}
|
}
|
||||||
log.Info("CHECK_VERIF_EXISTENCE - BEFORE_WAIT")
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
log.Info("CHECK_VERIF_EXISTENCE - AFTER_WAIT")
|
|
||||||
|
|
||||||
// If a message is not available, store in DB in which store nodes it wasnt
|
// If a message is not available, store in DB in which store nodes it wasnt
|
||||||
// available and its timestamp
|
// available and its timestamp
|
||||||
@ -290,9 +275,12 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
|
|||||||
var result *store.Result
|
var result *store.Result
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
queryLogger := logger.With(zap.Stringer("storenode", node.ID), zap.Int64("startTime", startTime.UnixNano()), zap.Int64("endTime", endTime.UnixNano()))
|
||||||
|
|
||||||
queryLbl:
|
queryLbl:
|
||||||
for i := 0; i < maxAttempts; i++ {
|
for i := 0; i < maxAttempts; i++ {
|
||||||
logger.Info("retrieving message history for topic!", zap.Stringer("storenode", node.ID), zap.Int64("startTime", startTime.UnixNano()), zap.Int64("endTime", endTime.UnixNano()), zap.Int("attempt", i))
|
|
||||||
|
queryLogger.Info("retrieving message history for topic!", zap.Int("attempt", i))
|
||||||
|
|
||||||
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||||
result, err = wakuNode.Store().Query(tCtx, store.FilterCriteria{
|
result, err = wakuNode.Store().Query(tCtx, store.FilterCriteria{
|
||||||
@ -302,38 +290,27 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
|
|||||||
}, store.WithPeer(node.ID), store.WithPaging(false, 100))
|
}, store.WithPeer(node.ID), store.WithPaging(false, 100))
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err))
|
queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", i))
|
||||||
storeNodeFailure = true
|
storeNodeFailure = true
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
} else {
|
} else {
|
||||||
logger.Info("messages available?", zap.Int("len", len(result.Messages())))
|
queryLogger.Info("messages available", zap.Int("len", len(result.Messages())))
|
||||||
storeNodeFailure = false
|
storeNodeFailure = false
|
||||||
break queryLbl
|
break queryLbl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if storeNodeFailure {
|
if storeNodeFailure {
|
||||||
logger.Error("storenode not available", zap.Stringer("storenode", node), zap.Time("startTime", startTime), zap.Time("endTime", endTime))
|
queryLogger.Error("storenode not available")
|
||||||
err := dbStore.RecordStorenodeUnavailable(runId, node)
|
err := dbStore.RecordStorenodeUnavailable(runId, node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not store node unavailable", zap.Error(err), zap.Stringer("storenode", node))
|
queryLogger.Error("could not store node unavailable", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
iteratorLbl:
|
iteratorLbl:
|
||||||
for !result.IsComplete() {
|
for !result.IsComplete() {
|
||||||
msgMapLock.Lock()
|
msgMapLock.Lock()
|
||||||
|
|
||||||
oldMessageT := zap.Skip()
|
|
||||||
newMessageT := zap.Skip()
|
|
||||||
if len(result.Messages()) > 0 {
|
|
||||||
oldMessageT = zap.Int64("oldMessageT", result.Messages()[0].Message.GetTimestamp())
|
|
||||||
newMessageT = zap.Int64("newMessageT", result.Messages()[len(result.Messages())-1].Message.GetTimestamp())
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("OBTAINED MESSAGES", zap.Any("len", len(result.Messages())), newMessageT, oldMessageT)
|
|
||||||
|
|
||||||
for _, mkv := range result.Messages() {
|
for _, mkv := range result.Messages() {
|
||||||
hash := mkv.WakuMessageHash()
|
hash := mkv.WakuMessageHash()
|
||||||
_, ok := msgMap[hash]
|
_, ok := msgMap[hash]
|
||||||
@ -352,29 +329,24 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
|
|||||||
|
|
||||||
nextRetryLbl:
|
nextRetryLbl:
|
||||||
for i := 0; i < maxAttempts; i++ {
|
for i := 0; i < maxAttempts; i++ {
|
||||||
logger.Info("EXECUTING NEXT!!!", zap.String("cursor", hex.EncodeToString(result.Cursor())))
|
queryLogger.Info("retrieving next page", zap.String("cursor", hex.EncodeToString(result.Cursor())))
|
||||||
err = result.Next(ctx)
|
err = result.Next(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err))
|
queryLogger.Error("could not query storenode", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err))
|
||||||
storeNodeFailure = true
|
storeNodeFailure = true
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
} else {
|
} else {
|
||||||
|
queryLogger.Info("more messages available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages())))
|
||||||
storeNodeFailure = false
|
storeNodeFailure = false
|
||||||
break nextRetryLbl
|
break nextRetryLbl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if storeNodeFailure {
|
if storeNodeFailure {
|
||||||
logger.Error("storenode not available",
|
queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
|
||||||
zap.Stringer("storenode", node),
|
|
||||||
zap.Time("startTime", startTime),
|
|
||||||
zap.Time("endTime", endTime),
|
|
||||||
zap.String("topic", topic),
|
|
||||||
zap.String("cursor", hexutil.Encode(result.Cursor())))
|
|
||||||
|
|
||||||
err := dbStore.RecordStorenodeUnavailable(runId, node)
|
err := dbStore.RecordStorenodeUnavailable(runId, node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", node))
|
queryLogger.Error("could not store recordnode unavailable", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Error(err))
|
||||||
}
|
}
|
||||||
break iteratorLbl
|
break iteratorLbl
|
||||||
}
|
}
|
||||||
@ -396,28 +368,29 @@ func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, m
|
|||||||
|
|
||||||
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
|
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
|
||||||
|
|
||||||
|
queryLogger := logger.With(zap.Stringer("storenode", peerID), zap.Stringers("hashes", messageHashes))
|
||||||
|
|
||||||
queryLbl:
|
queryLbl:
|
||||||
for i := 0; i < maxAttempts; i++ {
|
for i := 0; i < maxAttempts; i++ {
|
||||||
logger.Info("querying by hash", zap.Stringer("storenode", peerID), zap.Stringers("hashes", messageHashes), zap.Int("attempt", i))
|
queryLogger.Info("querying by hash", zap.Int("attempt", i))
|
||||||
result, err = wakuNode.Store().QueryByHash(ctx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100))
|
result, err = wakuNode.Store().QueryByHash(ctx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err))
|
queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", i))
|
||||||
storeNodeFailure = true
|
storeNodeFailure = true
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
} else {
|
} else {
|
||||||
|
queryLogger.Info("hashes available", zap.Int("len", len(result.Messages())))
|
||||||
storeNodeFailure = false
|
storeNodeFailure = false
|
||||||
break queryLbl
|
break queryLbl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if storeNodeFailure {
|
if storeNodeFailure {
|
||||||
logger.Error("storenode not available",
|
queryLogger.Error("storenode not available")
|
||||||
zap.Stringer("storenode", peerInfo),
|
|
||||||
zap.Stringers("hashes", messageHashes))
|
|
||||||
|
|
||||||
err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
|
err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo))
|
queryLogger.Error("could not store recordnode unavailable", zap.Error(err))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for !result.IsComplete() {
|
for !result.IsComplete() {
|
||||||
@ -443,30 +416,26 @@ queryLbl:
|
|||||||
|
|
||||||
nextRetryLbl:
|
nextRetryLbl:
|
||||||
for i := 0; i < maxAttempts; i++ {
|
for i := 0; i < maxAttempts; i++ {
|
||||||
logger.Info("executing next while querying hashes", zap.Stringer("storenode", peerID), zap.Int("attempt", i))
|
queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", i))
|
||||||
err = result.Next(ctx)
|
err = result.Next(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err))
|
queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", i))
|
||||||
storeNodeFailure = true
|
storeNodeFailure = true
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
} else {
|
} else {
|
||||||
|
queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages())))
|
||||||
storeNodeFailure = false
|
storeNodeFailure = false
|
||||||
break nextRetryLbl
|
break nextRetryLbl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if storeNodeFailure {
|
if storeNodeFailure {
|
||||||
logger.Error("storenode not available",
|
queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
|
||||||
zap.Stringer("storenode", peerInfo),
|
|
||||||
zap.Stringers("hashes", messageHashes),
|
|
||||||
zap.String("cursor", hexutil.Encode(result.Cursor())))
|
|
||||||
|
|
||||||
err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
|
err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo))
|
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user