fix(community)_: handle outdated request to join and request to join response when we have been joined (#5271)

This commit is contained in:
Mykhailo Prakhov 2024-05-31 19:16:45 +02:00 committed by GitHub
parent 64d2860571
commit 2053bd323b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 144 additions and 78 deletions

View File

@ -3999,10 +3999,6 @@ func (m *Manager) CanceledRequestsToJoinForUser(pk *ecdsa.PublicKey) ([]*Request
return m.persistence.CanceledRequestsToJoinForUser(common.PubkeyToHex(pk))
}
func (m *Manager) CanceledRequestToJoinForUserForCommunityID(pk *ecdsa.PublicKey, communityID []byte) (*RequestToJoin, error) {
return m.persistence.CanceledRequestToJoinForUserForCommunityID(common.PubkeyToHex(pk), communityID)
}
func (m *Manager) PendingRequestsToJoin() ([]*RequestToJoin, error) {
return m.persistence.PendingRequestsToJoin()
}

View File

@ -691,26 +691,6 @@ func (p *Persistence) CanceledRequestsToJoinForUser(pk string) ([]*RequestToJoin
return requests, nil
}
func (p *Persistence) CanceledRequestToJoinForUserForCommunityID(pk string, communityID []byte) (*RequestToJoin, error) {
row := p.db.QueryRow(`SELECT id,public_key,clock,ens_name,chat_id,community_id,state
FROM
communities_requests_to_join
WHERE
state = ? AND public_key = ? AND community_id = ?`,
RequestToJoinStateCanceled, pk, communityID)
request := &RequestToJoin{}
err := row.Scan(&request.ID, &request.PublicKey, &request.Clock, &request.ENSName, &request.ChatID, &request.CommunityID, &request.State)
if err == sql.ErrNoRows {
return nil, nil
} else if err != nil {
return nil, err
}
return request, nil
}
func (p *Persistence) RequestsToJoinForUserByState(pk string, state RequestToJoinState) ([]*RequestToJoin, error) {
var requests []*RequestToJoin
rows, err := p.db.Query(`SELECT id,public_key,clock,ens_name,chat_id,community_id,state FROM communities_requests_to_join WHERE state = ? AND public_key = ?`, state, pk)
@ -855,8 +835,8 @@ func (p *Persistence) GetRequestToJoinClockByPkAndCommunityID(pk string, communi
var clock uint64
err := p.db.QueryRow(`
SELECT clock
FROM communities_requests_to_join
SELECT clock
FROM communities_requests_to_join
WHERE public_key = ? AND community_id = ?`, pk, communityID).Scan(&clock)
return clock, err
}

View File

@ -4014,10 +4014,11 @@ func (s *MessengerCommunitiesSuite) TestRequestAndCancelCommunityAdminOffline()
s.Require().Empty(aliceJoinedCommunities)
// pull to make sure it has been saved
cancelRequestsToJoin, err := s.alice.MyCanceledRequestToJoinForCommunityID(community.ID())
myRequestToJoinId := communities.CalculateRequestID(s.alice.IdentityPublicKeyString(), community.ID())
canceledRequestToJoin, err := s.alice.communitiesManager.GetRequestToJoin(myRequestToJoinId)
s.Require().NoError(err)
s.Require().NotNil(cancelRequestsToJoin)
s.Require().Equal(cancelRequestsToJoin.State, communities.RequestToJoinStateCanceled)
s.Require().NotNil(canceledRequestToJoin)
s.Require().Equal(canceledRequestToJoin.State, communities.RequestToJoinStateCanceled)
s.Require().NoError(err)
@ -4029,7 +4030,7 @@ func (s *MessengerCommunitiesSuite) TestRequestAndCancelCommunityAdminOffline()
statusMessage.TransportLayer.Dst = community.PublicKey()
requestToJoinResponseProto := &protobuf.CommunityRequestToJoinResponse{
Clock: cancelRequestsToJoin.Clock,
Clock: canceledRequestToJoin.Clock,
CommunityId: community.ID(),
Accepted: true,
}
@ -4487,3 +4488,65 @@ func (s *MessengerCommunitiesSuite) TestAliceDoesNotReceiveMentionWhenSpectating
)
s.Require().NoError(err)
}
// this test simulate the scenario, when we are leaving the community and after the leave
// receiving outdated COMMUNITY_REQUEST_TO_JOIN_RESPONSE and joining the community again
func (s *MessengerCommunitiesSuite) TestAliceDidNotProcessOutdatedCommunityRequestToJoinResponse() {
community, _ := s.createCommunity()
advertiseCommunityTo(&s.Suite, community, s.owner, s.alice)
request := &requests.RequestToJoinCommunity{CommunityID: community.ID()}
joinCommunity(&s.Suite, community, s.owner, s.alice, request, "")
response, err := s.alice.LeaveCommunity(community.ID())
s.Require().NoError(err)
s.Require().Len(response.Communities(), 1)
s.Require().False(response.Communities()[0].Joined())
// double-check that alice left the community
community, err = s.alice.GetCommunityByID(community.ID())
s.Require().NoError(err)
s.Require().False(community.Joined())
// prepare the same request to join response
community, err = s.owner.GetCommunityByID(community.ID())
s.Require().NoError(err)
grant, err := community.BuildGrant(s.alice.IdentityPublicKey(), "")
s.Require().NoError(err)
var key *ecdsa.PrivateKey
if s.owner.transport.WakuVersion() == 2 {
key, err = s.owner.transport.RetrievePubsubTopicKey(community.PubsubTopic())
s.Require().NoError(err)
}
encryptedDescription, err := community.EncryptedDescription()
s.Require().NoError(err)
requestToJoinResponse := &protobuf.CommunityRequestToJoinResponse{
Clock: community.Clock(),
Accepted: true,
CommunityId: community.ID(),
Community: encryptedDescription,
Grant: grant,
ProtectedTopicPrivateKey: crypto.FromECDSA(key),
Shard: community.Shard().Protobuffer(),
}
// alice handle duplicated request to join response
state := &ReceivedMessageState{
Response: &MessengerResponse{},
CurrentMessageState: &CurrentMessageState{
PublicKey: community.ControlNode(),
},
}
err = s.alice.HandleCommunityRequestToJoinResponse(state, requestToJoinResponse, nil)
s.Require().Error(err, ErrOutdatedCommunityRequestToJoin)
// alice receives new request to join when she's already joined
requestToJoinResponse.Clock = requestToJoinResponse.Clock + 1
err = s.alice.HandleCommunityRequestToJoinResponse(state, requestToJoinResponse, nil)
s.Require().NoError(err)
}

View File

@ -1037,6 +1037,7 @@ func (m *Messenger) joinCommunity(ctx context.Context, communityID types.HexByte
if err = m.PublishIdentityImage(); err != nil {
return nil, err
}
// Was applicant not a member and successfully joined?
if !isCommunityMember && community.Joined() {
joinedNotification := &localnotifications.Notification{
@ -2906,10 +2907,6 @@ func (m *Messenger) MyCanceledRequestsToJoin() ([]*communities.RequestToJoin, er
return m.communitiesManager.CanceledRequestsToJoinForUser(&m.identity.PublicKey)
}
func (m *Messenger) MyCanceledRequestToJoinForCommunityID(communityID []byte) (*communities.RequestToJoin, error) {
return m.communitiesManager.CanceledRequestToJoinForUserForCommunityID(&m.identity.PublicKey, communityID)
}
func (m *Messenger) MyPendingRequestsToJoin() ([]*communities.RequestToJoin, error) {
return m.communitiesManager.PendingRequestsToJoinForUser(&m.identity.PublicKey)
}

View File

@ -55,6 +55,7 @@ var (
ErrInvalidCommunityID = errors.New("invalid community id")
ErrTryingToApplyOldTokenPreferences = errors.New("trying to apply old token preferences")
ErrTryingToApplyOldCollectiblePreferences = errors.New("trying to apply old collectible preferences")
ErrOutdatedCommunityRequestToJoin = errors.New("outdated community request to join response")
)
// HandleMembershipUpdate updates a Chat instance according to the membership updates.
@ -1643,16 +1644,32 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
return ErrInvalidCommunityID
}
myCancelledRequestToJoin, err := m.MyCanceledRequestToJoinForCommunityID(requestToJoinResponseProto.CommunityId)
myRequestToJoinId := communities.CalculateRequestID(m.IdentityPublicKeyString(), requestToJoinResponseProto.CommunityId)
requestToJoin, err := m.communitiesManager.GetRequestToJoin(myRequestToJoinId)
if err != nil {
return err
}
if myCancelledRequestToJoin != nil {
if requestToJoin.State == communities.RequestToJoinStateCanceled {
return nil
}
community, err := m.communitiesManager.GetByID(requestToJoinResponseProto.CommunityId)
if err != nil {
return err
}
// check if it is outdated approved request to join
if requestToJoin.State != communities.RequestToJoinStatePending && requestToJoinResponseProto.Clock <= community.Clock() {
m.logger.Error(ErrOutdatedCommunityRequestToJoin.Error(),
zap.String("communityId", community.IDString()),
zap.Uint64("communityClock", community.Clock()),
zap.Uint64("requestToJoinClock", requestToJoinResponseProto.Clock),
zap.Uint8("state", uint8(requestToJoin.State)))
return ErrOutdatedCommunityRequestToJoin
}
updatedRequest, err := m.communitiesManager.HandleCommunityRequestToJoinResponse(signer, requestToJoinResponseProto)
if err != nil {
return err
@ -1662,7 +1679,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
state.Response.AddRequestToJoinCommunity(updatedRequest)
}
community, err := m.communitiesManager.GetByID(requestToJoinResponseProto.CommunityId)
community, err = m.communitiesManager.GetByID(requestToJoinResponseProto.CommunityId)
if err != nil {
return err
}
@ -1681,52 +1698,65 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
return err
}
// Note: we can't guarantee that REQUEST_TO_JOIN_RESPONSE msg will be delivered before
// COMMUNITY_DESCRIPTION msg, so this msg can return an ErrOrgAlreadyJoined if we
// have been joined during COMMUNITY_DESCRIPTION
response, err := m.JoinCommunity(context.Background(), requestToJoinResponseProto.CommunityId, false)
if err != nil {
if err != nil && err != communities.ErrOrgAlreadyJoined {
return err
}
// we merge to include chats in response signal to joining a community
err = state.Response.Merge(response)
if err != nil {
return err
}
if len(response.Communities()) > 0 {
communitySettings := response.CommunitiesSettings()[0]
community := response.Communities()[0]
magnetlink := requestToJoinResponseProto.MagnetUri
if m.torrentClientReady() && communitySettings != nil && communitySettings.HistoryArchiveSupportEnabled && magnetlink != "" {
currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(community.IDString())
go func(currentTask *communities.HistoryArchiveDownloadTask) {
// Cancel ongoing download/import task
if currentTask != nil && !currentTask.IsCancelled() {
currentTask.Cancel()
currentTask.Waiter.Wait()
}
task := &communities.HistoryArchiveDownloadTask{
CancelChan: make(chan struct{}),
Waiter: *new(sync.WaitGroup),
Cancelled: false,
}
m.communitiesManager.AddHistoryArchiveDownloadTask(community.IDString(), task)
task.Waiter.Add(1)
defer task.Waiter.Done()
m.shutdownWaitGroup.Add(1)
defer m.shutdownWaitGroup.Done()
m.downloadAndImportHistoryArchives(community.ID(), magnetlink, task.CancelChan)
}(currentTask)
clock := requestToJoinResponseProto.Community.ArchiveMagnetlinkClock
return m.communitiesManager.UpdateMagnetlinkMessageClock(community.ID(), clock)
var communitySettings *communities.CommunitySettings
if response != nil {
// we merge to include chats in response signal to joining a community
err = state.Response.Merge(response)
if err != nil {
return err
}
if len(response.Communities()) > 0 {
communitySettings = response.CommunitiesSettings()[0]
community = response.Communities()[0]
}
}
if communitySettings == nil {
communitySettings, err = m.communitiesManager.GetCommunitySettingsByID(requestToJoinResponseProto.CommunityId)
if err != nil {
return nil
}
}
magnetlink := requestToJoinResponseProto.MagnetUri
if m.torrentClientReady() && communitySettings != nil && communitySettings.HistoryArchiveSupportEnabled && magnetlink != "" {
currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(community.IDString())
go func(currentTask *communities.HistoryArchiveDownloadTask) {
// Cancel ongoing download/import task
if currentTask != nil && !currentTask.IsCancelled() {
currentTask.Cancel()
currentTask.Waiter.Wait()
}
task := &communities.HistoryArchiveDownloadTask{
CancelChan: make(chan struct{}),
Waiter: *new(sync.WaitGroup),
Cancelled: false,
}
m.communitiesManager.AddHistoryArchiveDownloadTask(community.IDString(), task)
task.Waiter.Add(1)
defer task.Waiter.Done()
m.shutdownWaitGroup.Add(1)
defer m.shutdownWaitGroup.Done()
m.downloadAndImportHistoryArchives(community.ID(), magnetlink, task.CancelChan)
}(currentTask)
clock := requestToJoinResponseProto.Community.ArchiveMagnetlinkClock
return m.communitiesManager.UpdateMagnetlinkMessageClock(community.ID(), clock)
}
}