fix_: community members reevaluation fixes (#5117)
* fix_: prevent publishing older community description * fix_: schedule member reevaluation instead of reevaluating in parallel * fix_: lock community on members reevaluation * fix(TestJoinCommunityAsAdminWithMemberAndAdminPermission)_: setup waitOnCommunitiesEvent in advance * fix(TestEditSharedAddresses)_: remove redundant community description retrieval
This commit is contained in:
parent
522f3288b0
commit
3e4367a7cf
|
@ -951,9 +951,21 @@ func (m *Manager) EditCommunityTokenPermission(request *requests.EditCommunityTo
|
|||
return community, changes, nil
|
||||
}
|
||||
|
||||
func (m *Manager) ReevaluateMembers(community *Community) (map[protobuf.CommunityMember_Roles][]*ecdsa.PublicKey, error) {
|
||||
m.communityLock.Lock(community.ID())
|
||||
defer m.communityLock.Unlock(community.ID())
|
||||
// WARNING: ReevaluateMembers is only public to be used in messenger tests.
|
||||
func (m *Manager) ReevaluateMembers(communityID types.HexBytes) (*Community, map[protobuf.CommunityMember_Roles][]*ecdsa.PublicKey, error) {
|
||||
m.communityLock.Lock(communityID)
|
||||
defer m.communityLock.Unlock(communityID)
|
||||
|
||||
community, err := m.GetByID(communityID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// TODO: Control node needs to be notified to do a permission check if TokenMasters did airdrop
|
||||
// of the token which is using in a community permissions
|
||||
if !community.IsControlNode() {
|
||||
return nil, nil, ErrNotEnoughPermissions
|
||||
}
|
||||
|
||||
becomeMemberPermissions := community.TokenPermissionsByType(protobuf.CommunityTokenPermission_BECOME_MEMBER)
|
||||
becomeAdminPermissions := community.TokenPermissionsByType(protobuf.CommunityTokenPermission_BECOME_ADMIN)
|
||||
|
@ -968,7 +980,7 @@ func (m *Manager) ReevaluateMembers(community *Community) (map[protobuf.Communit
|
|||
for memberKey := range community.Members() {
|
||||
memberPubKey, err := common.HexToPubkey(memberKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if memberKey == common.PubkeyToHex(&m.identity.PublicKey) || community.IsMemberOwner(memberPubKey) {
|
||||
|
@ -980,7 +992,7 @@ func (m *Manager) ReevaluateMembers(community *Community) (map[protobuf.Communit
|
|||
requestID := CalculateRequestID(memberKey, community.ID())
|
||||
revealedAccounts, err := m.persistence.GetRequestToJoinRevealedAddresses(requestID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
memberHasWallet := len(revealedAccounts) > 0
|
||||
|
@ -990,7 +1002,7 @@ func (m *Manager) ReevaluateMembers(community *Community) (map[protobuf.Communit
|
|||
if !memberHasWallet && (hasMemberPermissions || isCurrentRoleTokenMaster || isCurrentRoleAdmin) {
|
||||
_, err = community.RemoveUserFromOrg(memberPubKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -1001,7 +1013,7 @@ func (m *Manager) ReevaluateMembers(community *Community) (map[protobuf.Communit
|
|||
protobuf.CommunityMember_ROLE_TOKEN_MASTER, isCurrentRoleTokenMaster)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if isNewRoleTokenMaster {
|
||||
|
@ -1017,7 +1029,7 @@ func (m *Manager) ReevaluateMembers(community *Community) (map[protobuf.Communit
|
|||
protobuf.CommunityMember_ROLE_ADMIN, isCurrentRoleAdmin)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if isNewRoleAdmin {
|
||||
|
@ -1032,13 +1044,13 @@ func (m *Manager) ReevaluateMembers(community *Community) (map[protobuf.Communit
|
|||
if hasMemberPermissions {
|
||||
permissionResponse, err := m.PermissionChecker.CheckPermissions(becomeMemberPermissions, accountsAndChainIDs, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if !permissionResponse.Satisfied {
|
||||
_, err = community.RemoveUserFromOrg(memberPubKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
// Skip channels validation if user has been removed
|
||||
continue
|
||||
|
@ -1056,14 +1068,14 @@ func (m *Manager) ReevaluateMembers(community *Community) (map[protobuf.Communit
|
|||
// ensure all members are added back if channel permissions were removed
|
||||
_, err = community.PopulateChatWithAllMembers(channelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
response, err := m.checkChannelPermissions(viewOnlyPermissions, viewAndPostPermissions, accountsAndChainIDs, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
isMemberAlreadyInChannel := community.IsMemberInChat(memberPubKey, channelID)
|
||||
|
@ -1077,22 +1089,25 @@ func (m *Manager) ReevaluateMembers(community *Community) (map[protobuf.Communit
|
|||
// Add the member back to the chat member list in case the role changed (it replaces the previous values)
|
||||
_, err := community.AddMemberToChat(channelID, memberPubKey, []protobuf.CommunityMember_Roles{}, channelRole)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
} else if isMemberAlreadyInChannel {
|
||||
_, err := community.RemoveUserFromChat(memberPubKey, channelID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return newPrivilegedRoles, m.saveAndPublish(community)
|
||||
return community, newPrivilegedRoles, m.saveAndPublish(community)
|
||||
}
|
||||
|
||||
func (m *Manager) ReevaluateMembersPeriodically(communityID types.HexBytes) {
|
||||
logger := m.logger.Named("reevaluate members loop").With(zap.String("communityID", communityID.String()))
|
||||
func (m *Manager) StartMembersReevaluationLoop(communityID types.HexBytes, reevaluateOnStart bool) {
|
||||
go m.reevaluateMembersLoop(communityID, reevaluateOnStart)
|
||||
}
|
||||
|
||||
func (m *Manager) reevaluateMembersLoop(communityID types.HexBytes, reevaluateOnStart bool) {
|
||||
|
||||
if _, exists := m.membersReevaluationTasks.Load(communityID.String()); exists {
|
||||
return
|
||||
|
@ -1126,43 +1141,45 @@ func (m *Manager) ReevaluateMembersPeriodically(communityID types.HexBytes) {
|
|||
return nil
|
||||
}
|
||||
|
||||
if task.lastSuccessTime.Before(time.Now().Add(-memberPermissionsCheckInterval)) ||
|
||||
task.lastSuccessTime.Before(task.onDemandRequestTime) {
|
||||
community, err := m.GetByID(communityID)
|
||||
if err != nil {
|
||||
if err == ErrOrgNotFound {
|
||||
return criticalError{
|
||||
error: err,
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.ReevaluateCommunityMembersPermissions(community)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
task.lastSuccessTime = time.Now()
|
||||
if !task.lastSuccessTime.Before(time.Now().Add(-memberPermissionsCheckInterval)) &&
|
||||
!task.lastSuccessTime.Before(task.onDemandRequestTime) {
|
||||
return nil
|
||||
}
|
||||
|
||||
err = m.reevaluateCommunityMembersPermissions(communityID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrOrgNotFound) {
|
||||
return criticalError{
|
||||
error: err,
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
task.lastSuccessTime = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
logger.Debug("loop started")
|
||||
defer logger.Debug("loop stopped")
|
||||
reevaluate := reevaluateOnStart
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if reevaluate {
|
||||
err := reevaluateMembers()
|
||||
if err != nil {
|
||||
logger.Error("reevaluation failed", zap.Error(err))
|
||||
if _, isCritical := err.(*criticalError); isCritical {
|
||||
var criticalError *criticalError
|
||||
if errors.As(err, &criticalError) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ticker.C:
|
||||
reevaluate = true
|
||||
continue
|
||||
|
||||
case <-m.quit:
|
||||
return
|
||||
|
@ -1209,18 +1226,8 @@ func (m *Manager) DeleteCommunityTokenPermission(request *requests.DeleteCommuni
|
|||
return community, changes, nil
|
||||
}
|
||||
|
||||
func (m *Manager) ReevaluateCommunityMembersPermissions(community *Community) error {
|
||||
if community == nil {
|
||||
return ErrOrgNotFound
|
||||
}
|
||||
|
||||
// TODO: Control node needs to be notified to do a permission check if TokenMasters did airdrop
|
||||
// of the token which is using in a community permissions
|
||||
if !community.IsControlNode() {
|
||||
return ErrNotEnoughPermissions
|
||||
}
|
||||
|
||||
newPrivilegedMembers, err := m.ReevaluateMembers(community)
|
||||
func (m *Manager) reevaluateCommunityMembersPermissions(communityID types.HexBytes) error {
|
||||
community, newPrivilegedMembers, err := m.ReevaluateMembers(communityID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -4921,7 +4928,9 @@ func (m *Manager) saveAndPublish(community *Community) error {
|
|||
if community.IsControlNode() {
|
||||
m.publish(&Subscription{Community: community})
|
||||
return nil
|
||||
} else if community.HasPermissionToSendCommunityEvents() {
|
||||
}
|
||||
|
||||
if community.HasPermissionToSendCommunityEvents() {
|
||||
err := m.signEvents(community)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -611,19 +611,6 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() {
|
|||
s.Require().Equal(revealedAccounts[0].Address, aliceAddress1)
|
||||
s.Require().Equal(true, revealedAccounts[0].IsAirdropAddress)
|
||||
|
||||
// Retrieve community description change
|
||||
err = tt.RetryWithBackOff(func() error {
|
||||
response, err := s.alice.RetrieveAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(response.Communities()) == 0 {
|
||||
return errors.New("no communities in response (address change reception)")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
|
||||
alicesRevealedAccounts, err = s.alice.communitiesManager.GetRevealedAddresses(community.ID(), alicePubkey)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(alicesRevealedAccounts, 1)
|
||||
|
@ -996,14 +983,15 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestJoinCommunityAsAdminWith
|
|||
},
|
||||
},
|
||||
}
|
||||
response, err := s.owner.CreateCommunityTokenPermission(&permissionRequestMember)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(response.Communities(), 1)
|
||||
|
||||
waitOnCommunityPermissionCreated := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
|
||||
return sub.Community.HasTokenPermissions()
|
||||
})
|
||||
|
||||
response, err := s.owner.CreateCommunityTokenPermission(&permissionRequestMember)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(response.Communities(), 1)
|
||||
|
||||
err = <-waitOnCommunityPermissionCreated
|
||||
s.Require().NoError(err)
|
||||
|
||||
|
@ -1021,16 +1009,17 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestJoinCommunityAsAdminWith
|
|||
},
|
||||
},
|
||||
}
|
||||
|
||||
waitOnCommunityPermissionCreated = waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
|
||||
return len(sub.Community.TokenPermissions()) == 2
|
||||
})
|
||||
|
||||
response, err = s.owner.CreateCommunityTokenPermission(&permissionRequestAdmin)
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(response.Communities(), 1)
|
||||
s.Require().Len(response.Communities()[0].TokenPermissionsByType(protobuf.CommunityTokenPermission_BECOME_ADMIN), 1)
|
||||
s.Require().Len(response.Communities()[0].TokenPermissions(), 2)
|
||||
|
||||
waitOnCommunityPermissionCreated = waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
|
||||
return len(sub.Community.TokenPermissions()) == 2
|
||||
})
|
||||
|
||||
err = <-waitOnCommunityPermissionCreated
|
||||
s.Require().NoError(err)
|
||||
|
||||
|
@ -1165,9 +1154,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testViewChannelPermissions(v
|
|||
|
||||
// force owner to reevaluate channel members
|
||||
// in production it will happen automatically, by periodic check
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
s.Require().NoError(err)
|
||||
_, err = s.owner.communitiesManager.ReevaluateMembers(community)
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = <-waitOnChannelKeyToBeDistributedToBob
|
||||
|
@ -1338,9 +1325,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestMemberRoleGetUpdatedWhen
|
|||
|
||||
// force owner to reevaluate channel members
|
||||
// in production it will happen automatically, by periodic check
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
s.Require().NoError(err)
|
||||
_, err = s.owner.communitiesManager.ReevaluateMembers(community)
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = <-waitOnChannelKeyToBeDistributedToBob
|
||||
|
@ -1391,9 +1376,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestMemberRoleGetUpdatedWhen
|
|||
|
||||
// force owner to reevaluate channel members
|
||||
// in production it will happen automatically, by periodic check
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
s.Require().NoError(err)
|
||||
roles, err := s.owner.communitiesManager.ReevaluateMembers(community)
|
||||
community, roles, err := s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(roles, 2)
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
|
@ -1493,7 +1476,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().True(checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, community))
|
||||
|
||||
// the control node re-evaluates the roles of the participants, checking that the privileged user has not lost his role
|
||||
_, err = s.owner.communitiesManager.ReevaluateMembers(community)
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
s.Require().NoError(err)
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
@ -1515,7 +1498,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().NoError(err)
|
||||
s.Require().False(community.HasTokenPermissions())
|
||||
|
||||
_, err = s.owner.communitiesManager.ReevaluateMembers(community)
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
|
@ -1608,7 +1591,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().True(checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, community))
|
||||
|
||||
// the control node reevaluates the roles of the participants, checking that the privileged user has not lost his role
|
||||
_, err = s.owner.communitiesManager.ReevaluateMembers(community)
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
|
@ -1631,7 +1614,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().NoError(err)
|
||||
s.Require().Len(response.Communities()[0].TokenPermissions(), 1)
|
||||
|
||||
_, err = s.owner.communitiesManager.ReevaluateMembers(community)
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
|
@ -1655,7 +1638,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().NoError(err)
|
||||
s.Require().Len(response.Communities()[0].TokenPermissions(), 0)
|
||||
|
||||
_, err = s.owner.communitiesManager.ReevaluateMembers(community)
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
|
|
|
@ -820,7 +820,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
|
|||
|
||||
for _, c := range controlledCommunities {
|
||||
if c.Joined() && c.HasTokenPermissions() {
|
||||
go m.communitiesManager.ReevaluateMembersPeriodically(c.ID())
|
||||
go m.communitiesManager.StartMembersReevaluationLoop(c.ID(), false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -292,6 +292,10 @@ func (m *Messenger) handleCommunitiesSubscription(c chan *communities.Subscripti
|
|||
publishOrgAndDistributeEncryptionKeys := func(community *communities.Community) {
|
||||
recentlyPublishedOrg := recentlyPublishedOrgs[community.IDString()]
|
||||
|
||||
if recentlyPublishedOrg != nil && community.Clock() <= recentlyPublishedOrg.Clock() {
|
||||
return
|
||||
}
|
||||
|
||||
// evaluate and distribute encryption keys (if any)
|
||||
encryptionKeyActions := communities.EvaluateCommunityEncryptionKeyActions(recentlyPublishedOrg, community)
|
||||
err := m.communitiesKeyDistributor.Distribute(community, encryptionKeyActions)
|
||||
|
@ -2498,14 +2502,7 @@ func (m *Messenger) CreateCommunityTokenPermission(request *requests.CreateCommu
|
|||
}
|
||||
|
||||
if community.IsControlNode() {
|
||||
// check existing member permission once, then check periodically
|
||||
go func() {
|
||||
if err := m.communitiesManager.ReevaluateCommunityMembersPermissions(community); err != nil {
|
||||
m.logger.Debug("failed to check member permissions", zap.Error(err))
|
||||
}
|
||||
|
||||
m.communitiesManager.ReevaluateMembersPeriodically(community.ID())
|
||||
}()
|
||||
m.communitiesManager.StartMembersReevaluationLoop(community.ID(), true)
|
||||
}
|
||||
|
||||
// ensure HRkeys are synced
|
||||
|
@ -2538,11 +2535,10 @@ func (m *Messenger) EditCommunityTokenPermission(request *requests.EditCommunity
|
|||
//
|
||||
// We do this in a separate routine to not block this function
|
||||
if community.IsControlNode() {
|
||||
go func() {
|
||||
if err := m.communitiesManager.ReevaluateCommunityMembersPermissions(community); err != nil {
|
||||
m.logger.Debug("failed to check member permissions", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
err = m.communitiesManager.ScheduleMembersReevaluation(community.ID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
response := &MessengerResponse{}
|
||||
|
@ -2565,11 +2561,10 @@ func (m *Messenger) DeleteCommunityTokenPermission(request *requests.DeleteCommu
|
|||
// check if members still fulfill the token criteria
|
||||
// We do this in a separate routine to not block this function
|
||||
if community.IsControlNode() {
|
||||
go func() {
|
||||
if err = m.communitiesManager.ReevaluateCommunityMembersPermissions(community); err != nil {
|
||||
m.logger.Debug("failed to check member permissions", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
err = m.communitiesManager.ScheduleMembersReevaluation(community.ID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
response := &MessengerResponse{}
|
||||
|
|
Loading…
Reference in New Issue