chore: improve StoreNodeRequestManager for community custom storenodes (#4860)

This commit is contained in:
Igor Sirotin 2024-03-04 20:46:25 +00:00 committed by GitHub
parent 440779fc8c
commit c217692c76
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 28 additions and 11 deletions

View File

@ -33,6 +33,17 @@ type storeNodeRequestID struct {
DataID string `json:"dataID"`
}
func (r *storeNodeRequestID) getCommunityID() string {
switch r.RequestType {
case storeNodeCommunityRequest:
return r.DataID
case storeNodeShardRequest:
return strings.TrimSuffix(r.DataID, transport.CommunityShardInfoTopicPrefix())
default:
return ""
}
}
type StoreNodeRequestManager struct {
messenger *Messenger
logger *zap.Logger
@ -502,24 +513,29 @@ func (r *storeNodeRequest) routine() {
r.finalize()
}()
communityIDStr := strings.TrimSuffix(r.requestID.DataID, transport.CommunityShardInfoTopicPrefix())
if !r.manager.messenger.communityStorenodes.HasStorenodeSetup(communityIDStr) && !r.manager.messenger.waitForAvailableStoreNode(storeNodeAvailableTimeout) {
r.result.err = fmt.Errorf("store node is not available")
return
communityID := r.requestID.getCommunityID()
if r.requestID.RequestType != storeNodeCommunityRequest || !r.manager.messenger.communityStorenodes.HasStorenodeSetup(communityID) {
if !r.manager.messenger.waitForAvailableStoreNode(storeNodeAvailableTimeout) {
r.result.err = fmt.Errorf("store node is not available")
return
}
}
ms := r.manager.messenger.getActiveMailserver(communityIDStr)
storeNode := r.manager.messenger.getActiveMailserver(communityID)
// Check if community already exists locally and get Clock.
localCommunity, _ := r.manager.messenger.communitiesManager.GetByIDString(r.requestID.DataID)
if localCommunity != nil {
r.minimumDataClock = localCommunity.Clock()
if r.requestID.RequestType == storeNodeCommunityRequest {
localCommunity, _ := r.manager.messenger.communitiesManager.GetByIDString(communityID)
if localCommunity != nil {
r.minimumDataClock = localCommunity.Clock()
}
}
// Start store node request
from, to := r.manager.messenger.calculateMailserverTimeBounds(oneMonthDuration)
_, err := r.manager.messenger.performMailserverRequest(ms, func(ms mailservers.Mailserver) (*MessengerResponse, error) {
_, err := r.manager.messenger.performMailserverRequest(storeNode, func(ms mailservers.Mailserver) (*MessengerResponse, error) {
batch := MailserverBatch{
From: from,
To: to,

View File

@ -583,6 +583,7 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestProfileInfo() {
s.Require().NoError(err)
s.createBob()
s.waitForAvailableStoreNode(s.bob)
s.fetchProfile(s.bob, s.owner.selfContact.ID, s.owner.selfContact)
}