fix: set pubsubTopic in request community info from mailserver (#4194)

This commit is contained in:
Igor Sirotin 2023-10-25 11:13:35 +01:00 committed by GitHub
parent b08125890e
commit c6ff315dfc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 19 deletions

View File

@ -12,6 +12,7 @@ import (
"sync"
"time"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"
@ -2619,6 +2620,9 @@ func (m *Messenger) RequestCommunityInfoFromMailserverAsync(privateOrPublicKey s
// RequestCommunityInfoFromMailserver installs filter for community and requests its details
// from mailserver. When response received it will be passed through signals handler
func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard *common.Shard, waitForResponse bool) (*communities.Community, error) {
m.logger.Info("requesting community info", zap.String("communityID", communityID), zap.Any("shard", shard))
m.requestedCommunitiesLock.Lock()
defer m.requestedCommunitiesLock.Unlock()
@ -2653,11 +2657,17 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard
from := to - oneMonthInSeconds
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.ContentTopic}}
m.logger.Info("Requesting historic")
batch := MailserverBatch{
From: from,
To: to,
Topics: []types.TopicType{filter.ContentTopic},
PubsubTopic: filter.PubsubTopic,
}
m.logger.Info("requesting historic", zap.Any("batch", batch))
err := m.processMailserverBatch(batch)
return nil, err
})
if err != nil {
return nil, err
}
@ -2682,6 +2692,7 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard
}
case <-ctx.Done():
m.logger.Error("failed to request community info", zap.String("communityID", communityID), zap.Error(ctx.Err()))
return nil, fmt.Errorf("failed to request community info for id '%s' from mailserver: %w", communityID, ctx.Err())
}
}
@ -2693,7 +2704,9 @@ func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.C
m.requestedCommunitiesLock.Lock()
defer m.requestedCommunitiesLock.Unlock()
var topics []types.TopicType
// we group topics by PubsubTopic
groupedTopics := map[string]map[types.TopicType]struct{}{}
for _, c := range communities {
if _, ok := m.requestedCommunities[c.CommunityID]; ok {
continue
@ -2721,24 +2734,48 @@ func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.C
//we don't remember filter id associated with community because it was already installed
m.requestedCommunities[c.CommunityID] = nil
}
topics = append(topics, filter.ContentTopic)
if _, ok := groupedTopics[filter.PubsubTopic]; !ok {
groupedTopics[filter.PubsubTopic] = map[types.TopicType]struct{}{}
}
groupedTopics[filter.PubsubTopic][filter.ContentTopic] = struct{}{}
}
defer func() {
for _, c := range communities {
m.forgetCommunityRequest(c.CommunityID)
}
}()
to := uint32(m.transport.GetCurrentTime() / 1000)
from := to - oneMonthInSeconds
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
batch := MailserverBatch{From: from, To: to, Topics: topics}
m.logger.Info("Requesting historic")
err := m.processMailserverBatch(batch)
return nil, err
})
wg := sync.WaitGroup{}
if err != nil {
m.logger.Error("Err performing mailserver request", zap.Error(err))
return
for pubsubTopic, contentTopics := range groupedTopics {
wg.Add(1)
go func(pubsubTopic string, contentTopics map[types.TopicType]struct{}) {
batch := MailserverBatch{
From: from,
To: to,
Topics: maps.Keys(contentTopics),
PubsubTopic: pubsubTopic,
}
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) {
m.logger.Info("requesting historic", zap.Any("batch", batch))
err := m.processMailserverBatch(batch)
return nil, err
})
if err != nil {
m.logger.Error("error performing mailserver request", zap.Any("batch", batch), zap.Error(err))
}
wg.Done()
}(pubsubTopic, contentTopics)
}
wg.Wait()
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
@ -2770,14 +2807,12 @@ func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.C
}
}
for _, c := range communities {
m.forgetCommunityRequest(c.CommunityID)
}
}
// forgetCommunityRequest removes community from requested ones and removes filter
func (m *Messenger) forgetCommunityRequest(communityID string) {
m.logger.Info("forgetting community request", zap.String("communityID", communityID))
filter, ok := m.requestedCommunities[communityID]
if !ok {
return

View File

@ -112,8 +112,8 @@ func (m *Messenger) connectToNewMailserverAndWait() error {
func (m *Messenger) performMailserverRequest(fn func() (*MessengerResponse, error)) (*MessengerResponse, error) {
m.mailserverCycle.Lock()
defer m.mailserverCycle.Unlock()
m.mailserverCycle.RLock()
defer m.mailserverCycle.RUnlock()
var tries uint = 0
for tries < mailserverMaxTries {
if !m.isActiveMailserverAvailable() {
@ -135,6 +135,12 @@ func (m *Messenger) performMailserverRequest(fn func() (*MessengerResponse, erro
return response, nil
}
m.logger.Error("failed to perform mailserver request",
zap.String("mailserverID", activeMailserver.ID),
zap.Uint("tries", tries),
zap.Error(err),
)
tries++
// Increment failed requests
activeMailserver.FailedRequests++