diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 194794ae5..422ee0d6a 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" "golang.org/x/time/rate" @@ -2619,6 +2620,9 @@ func (m *Messenger) RequestCommunityInfoFromMailserverAsync(privateOrPublicKey s // RequestCommunityInfoFromMailserver installs filter for community and requests its details // from mailserver. When response received it will be passed through signals handler func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard *common.Shard, waitForResponse bool) (*communities.Community, error) { + + m.logger.Info("requesting community info", zap.String("communityID", communityID), zap.Any("shard", shard)) + m.requestedCommunitiesLock.Lock() defer m.requestedCommunitiesLock.Unlock() @@ -2653,11 +2657,17 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard from := to - oneMonthInSeconds _, err := m.performMailserverRequest(func() (*MessengerResponse, error) { - batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.ContentTopic}} - m.logger.Info("Requesting historic") + batch := MailserverBatch{ + From: from, + To: to, + Topics: []types.TopicType{filter.ContentTopic}, + PubsubTopic: filter.PubsubTopic, + } + m.logger.Info("requesting historic", zap.Any("batch", batch)) err := m.processMailserverBatch(batch) return nil, err }) + if err != nil { return nil, err } @@ -2682,6 +2692,7 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard } case <-ctx.Done(): + m.logger.Error("failed to request community info", zap.String("communityID", communityID), zap.Error(ctx.Err())) return nil, fmt.Errorf("failed to request community info for id '%s' from mailserver: %w", communityID, ctx.Err()) } } @@ -2693,7 +2704,9 @@ func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.C m.requestedCommunitiesLock.Lock() defer m.requestedCommunitiesLock.Unlock() - var topics []types.TopicType + // we group topics by PubsubTopic + groupedTopics := map[string]map[types.TopicType]struct{}{} + for _, c := range communities { if _, ok := m.requestedCommunities[c.CommunityID]; ok { continue @@ -2721,24 +2734,48 @@ func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.C //we don't remember filter id associated with community because it was already installed m.requestedCommunities[c.CommunityID] = nil } - topics = append(topics, filter.ContentTopic) + + if _, ok := groupedTopics[filter.PubsubTopic]; !ok { + groupedTopics[filter.PubsubTopic] = map[types.TopicType]struct{}{} + } + + groupedTopics[filter.PubsubTopic][filter.ContentTopic] = struct{}{} } + defer func() { + for _, c := range communities { + m.forgetCommunityRequest(c.CommunityID) + } + }() + to := uint32(m.transport.GetCurrentTime() / 1000) from := to - oneMonthInSeconds - _, err := m.performMailserverRequest(func() (*MessengerResponse, error) { - batch := MailserverBatch{From: from, To: to, Topics: topics} - m.logger.Info("Requesting historic") - err := m.processMailserverBatch(batch) - return nil, err - }) + wg := sync.WaitGroup{} - if err != nil { - m.logger.Error("Err performing mailserver request", zap.Error(err)) - return + for pubsubTopic, contentTopics := range groupedTopics { + wg.Add(1) + go func(pubsubTopic string, contentTopics map[types.TopicType]struct{}) { + batch := MailserverBatch{ + From: from, + To: to, + Topics: maps.Keys(contentTopics), + PubsubTopic: pubsubTopic, + } + _, err := m.performMailserverRequest(func() (*MessengerResponse, error) { + m.logger.Info("requesting historic", zap.Any("batch", batch)) + err := m.processMailserverBatch(batch) + return nil, err + }) + if err != nil { + m.logger.Error("error performing mailserver request", zap.Any("batch", batch), zap.Error(err)) + } + wg.Done() + }(pubsubTopic, contentTopics) } + wg.Wait() + ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() @@ -2770,14 +2807,12 @@ func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.C } } - for _, c := range communities { - m.forgetCommunityRequest(c.CommunityID) - } - } // forgetCommunityRequest removes community from requested ones and removes filter func (m *Messenger) forgetCommunityRequest(communityID string) { + m.logger.Info("forgetting community request", zap.String("communityID", communityID)) + filter, ok := m.requestedCommunities[communityID] if !ok { return diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index cbbc50000..ce45efb1b 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -112,8 +112,8 @@ func (m *Messenger) connectToNewMailserverAndWait() error { func (m *Messenger) performMailserverRequest(fn func() (*MessengerResponse, error)) (*MessengerResponse, error) { - m.mailserverCycle.Lock() - defer m.mailserverCycle.Unlock() + m.mailserverCycle.RLock() + defer m.mailserverCycle.RUnlock() var tries uint = 0 for tries < mailserverMaxTries { if !m.isActiveMailserverAvailable() { @@ -135,6 +135,12 @@ func (m *Messenger) performMailserverRequest(fn func() (*MessengerResponse, erro return response, nil } + m.logger.Error("failed to perform mailserver request", + zap.String("mailserverID", activeMailserver.ID), + zap.Uint("tries", tries), + zap.Error(err), + ) + tries++ // Increment failed requests activeMailserver.FailedRequests++