mirror of
https://github.com/logos-messaging/storenode-messages-counter.git
synced 2026-01-07 16:43:08 +00:00
chore: timeout queries
This commit is contained in:
parent
2f9bbfe337
commit
fceccf94b7
@ -154,10 +154,16 @@ var msgAttr map[pb.MessageHash]MessageAttr
|
|||||||
func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
|
func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
|
||||||
|
|
||||||
// [MessageHash][StoreNode] = exists?
|
// [MessageHash][StoreNode] = exists?
|
||||||
|
log.Info("LOCKING_MAP - START")
|
||||||
msgMapLock.Lock()
|
msgMapLock.Lock()
|
||||||
|
log.Info("LOCKING_MAP - END")
|
||||||
|
|
||||||
msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence)
|
msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence)
|
||||||
msgAttr = make(map[pb.MessageHash]MessageAttr)
|
msgAttr = make(map[pb.MessageHash]MessageAttr)
|
||||||
|
|
||||||
|
log.Info("UNLOCKING - START")
|
||||||
msgMapLock.Unlock()
|
msgMapLock.Unlock()
|
||||||
|
log.Info("UNLOCKING - END")
|
||||||
|
|
||||||
topicSyncStatus, err := dbStore.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value())
|
topicSyncStatus, err := dbStore.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -286,12 +292,15 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
|
|||||||
|
|
||||||
queryLbl:
|
queryLbl:
|
||||||
for i := 0; i < maxAttempts; i++ {
|
for i := 0; i < maxAttempts; i++ {
|
||||||
logger.Info("retrieving message history for topic", zap.Stringer("storenode", node), zap.Int64("from", startTime.UnixNano()), zap.Int64("to", endTime.UnixNano()), zap.Int("attempt", i))
|
logger.Info("retrieving message history for topic!", zap.Stringer("storenode", node), zap.Int64("from", startTime.UnixNano()), zap.Int64("to", endTime.UnixNano()), zap.Int("attempt", i))
|
||||||
result, err = wakuNode.Store().Query(ctx, store.FilterCriteria{
|
|
||||||
|
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||||
|
result, err = wakuNode.Store().Query(tCtx, store.FilterCriteria{
|
||||||
ContentFilter: protocol.NewContentFilter(topic),
|
ContentFilter: protocol.NewContentFilter(topic),
|
||||||
TimeStart: proto.Int64(startTime.UnixNano()),
|
TimeStart: proto.Int64(startTime.UnixNano()),
|
||||||
TimeEnd: proto.Int64(endTime.UnixNano()),
|
TimeEnd: proto.Int64(endTime.UnixNano()),
|
||||||
}, store.WithPeer(node.ID), store.WithPaging(false, 100))
|
}, store.WithPeer(node.ID), store.WithPaging(false, 100))
|
||||||
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err))
|
logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err))
|
||||||
storeNodeFailure = true
|
storeNodeFailure = true
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user