From fceccf94b7d5569cf8268f3cbbbe3073035adbd3 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 30 May 2024 15:35:49 -0400 Subject: [PATCH] chore: timeout queries --- cmd/storemsgcounter/execute.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index b4148c9..94c8808 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -154,10 +154,16 @@ var msgAttr map[pb.MessageHash]MessageAttr func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error { // [MessageHash][StoreNode] = exists? + log.Info("LOCKING_MAP - START") msgMapLock.Lock() + log.Info("LOCKING_MAP - END") + msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence) msgAttr = make(map[pb.MessageHash]MessageAttr) + + log.Info("UNLOCKING - START") msgMapLock.Unlock() + log.Info("UNLOCKING - END") topicSyncStatus, err := dbStore.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value()) if err != nil { @@ -286,12 +292,15 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn queryLbl: for i := 0; i < maxAttempts; i++ { - logger.Info("retrieving message history for topic", zap.Stringer("storenode", node), zap.Int64("from", startTime.UnixNano()), zap.Int64("to", endTime.UnixNano()), zap.Int("attempt", i)) - result, err = wakuNode.Store().Query(ctx, store.FilterCriteria{ + logger.Info("retrieving message history for topic!", zap.Stringer("storenode", node), zap.Int64("from", startTime.UnixNano()), zap.Int64("to", endTime.UnixNano()), zap.Int("attempt", i)) + + tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) + result, err = wakuNode.Store().Query(tCtx, store.FilterCriteria{ ContentFilter: protocol.NewContentFilter(topic), TimeStart: proto.Int64(startTime.UnixNano()), TimeEnd: proto.Int64(endTime.UnixNano()), }, store.WithPeer(node.ID), store.WithPaging(false, 100)) + cancel() if err != nil { logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err)) storeNodeFailure = true