feat_: force members reevaluation (#5139)
* chore_: enable adding community manager options from messenger config * chore_: make `reevaluateMembers` private method * fix(MessengerCommunitiesTokenPermissionsSuite)_: proper waiting * feat_: `ForceMembersReevaluation` method * test_: increate some test timeouts
This commit is contained in:
parent
3b6b38a414
commit
349ea8ad6e
|
@ -106,6 +106,7 @@ type Manager struct {
|
|||
historyArchiveTasksWaitGroup sync.WaitGroup
|
||||
historyArchiveTasks sync.Map // stores `chan struct{}`
|
||||
membersReevaluationTasks sync.Map // stores `membersReevaluationTask`
|
||||
forceMembersReevaluation map[string]chan struct{}
|
||||
torrentTasks map[string]metainfo.Hash
|
||||
historyArchiveDownloadTasks map[string]*HistoryArchiveDownloadTask
|
||||
stopped bool
|
||||
|
@ -191,6 +192,11 @@ type managerOptions struct {
|
|||
walletConfig *params.WalletConfig
|
||||
communityTokensService communitytokens.ServiceInterface
|
||||
permissionChecker PermissionChecker
|
||||
|
||||
// allowForcingCommunityMembersReevaluation indicates whether we should allow forcing community members reevaluation.
|
||||
// This will allow using `force` argument in ScheduleMembersReevaluation.
|
||||
// Should only be used in tests.
|
||||
allowForcingCommunityMembersReevaluation bool
|
||||
}
|
||||
|
||||
type TokenManager interface {
|
||||
|
@ -286,6 +292,12 @@ func WithCommunityTokensService(communityTokensService communitytokens.ServiceIn
|
|||
}
|
||||
}
|
||||
|
||||
func WithAllowForcingCommunityMembersReevaluation(enabled bool) ManagerOption {
|
||||
return func(opts *managerOptions) {
|
||||
opts.allowForcingCommunityMembersReevaluation = enabled
|
||||
}
|
||||
}
|
||||
|
||||
type OwnerVerifier interface {
|
||||
SafeGetSignerPubKey(ctx context.Context, chainID uint64, communityID string) (string, error)
|
||||
}
|
||||
|
@ -376,6 +388,11 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e
|
|||
}
|
||||
}
|
||||
|
||||
if managerConfig.allowForcingCommunityMembersReevaluation {
|
||||
manager.logger.Warn("allowing forcing community members reevaluation, this should only be used in test environment")
|
||||
manager.forceMembersReevaluation = make(map[string]chan struct{}, 10)
|
||||
}
|
||||
|
||||
return manager, nil
|
||||
}
|
||||
|
||||
|
@ -954,8 +971,7 @@ func (m *Manager) EditCommunityTokenPermission(request *requests.EditCommunityTo
|
|||
return community, changes, nil
|
||||
}
|
||||
|
||||
// WARNING: ReevaluateMembers is only public to be used in messenger tests.
|
||||
func (m *Manager) ReevaluateMembers(communityID types.HexBytes) (*Community, map[protobuf.CommunityMember_Roles][]*ecdsa.PublicKey, error) {
|
||||
func (m *Manager) reevaluateMembers(communityID types.HexBytes) (*Community, map[protobuf.CommunityMember_Roles][]*ecdsa.PublicKey, error) {
|
||||
m.communityLock.Lock(communityID)
|
||||
defer m.communityLock.Unlock(communityID)
|
||||
|
||||
|
@ -1119,11 +1135,17 @@ func (m *Manager) reevaluateMembersLoop(communityID types.HexBytes, reevaluateOn
|
|||
m.membersReevaluationTasks.Store(communityID.String(), &membersReevaluationTask{})
|
||||
defer m.membersReevaluationTasks.Delete(communityID.String())
|
||||
|
||||
var forceReevaluation chan struct{}
|
||||
if m.forceMembersReevaluation != nil {
|
||||
forceReevaluation = make(chan struct{}, 10)
|
||||
m.forceMembersReevaluation[communityID.String()] = forceReevaluation
|
||||
}
|
||||
|
||||
type criticalError struct {
|
||||
error
|
||||
}
|
||||
|
||||
reevaluateMembers := func() (err error) {
|
||||
reevaluateMembers := func(force bool) (err error) {
|
||||
t, exists := m.membersReevaluationTasks.Load(communityID.String())
|
||||
if !exists {
|
||||
return criticalError{
|
||||
|
@ -1139,8 +1161,8 @@ func (m *Manager) reevaluateMembersLoop(communityID types.HexBytes, reevaluateOn
|
|||
task.mutex.Lock()
|
||||
defer task.mutex.Unlock()
|
||||
|
||||
// Ensure reevaluation is performed not more often than once per minute.
|
||||
if task.lastSuccessTime.After(time.Now().Add(-1 * time.Minute)) {
|
||||
// Ensure reevaluation is performed not more often than once per minute
|
||||
if !force && task.lastSuccessTime.After(time.Now().Add(-1*time.Minute)) {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1167,10 +1189,11 @@ func (m *Manager) reevaluateMembersLoop(communityID types.HexBytes, reevaluateOn
|
|||
defer ticker.Stop()
|
||||
|
||||
reevaluate := reevaluateOnStart
|
||||
force := false
|
||||
|
||||
for {
|
||||
if reevaluate {
|
||||
err := reevaluateMembers()
|
||||
err := reevaluateMembers(force)
|
||||
if err != nil {
|
||||
var criticalError *criticalError
|
||||
if errors.As(err, &criticalError) {
|
||||
|
@ -1179,18 +1202,37 @@ func (m *Manager) reevaluateMembersLoop(communityID types.HexBytes, reevaluateOn
|
|||
}
|
||||
}
|
||||
|
||||
force = false
|
||||
reevaluate = false
|
||||
|
||||
select {
|
||||
case <-ticker.C:
|
||||
reevaluate = true
|
||||
continue
|
||||
|
||||
case <-forceReevaluation:
|
||||
reevaluate = true
|
||||
force = true
|
||||
continue
|
||||
|
||||
case <-m.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) ForceMembersReevaluation(communityID types.HexBytes) error {
|
||||
if m.forceMembersReevaluation == nil {
|
||||
return errors.New("forcing members reevaluation is not allowed")
|
||||
}
|
||||
return m.scheduleMembersReevaluation(communityID, true)
|
||||
}
|
||||
|
||||
func (m *Manager) ScheduleMembersReevaluation(communityID types.HexBytes) error {
|
||||
return m.scheduleMembersReevaluation(communityID, false)
|
||||
}
|
||||
|
||||
func (m *Manager) scheduleMembersReevaluation(communityID types.HexBytes, forceImmediateReevaluation bool) error {
|
||||
t, exists := m.membersReevaluationTasks.Load(communityID.String())
|
||||
if !exists {
|
||||
return errors.New("reevaluation task doesn't exist")
|
||||
|
@ -1204,6 +1246,10 @@ func (m *Manager) ScheduleMembersReevaluation(communityID types.HexBytes) error
|
|||
defer task.mutex.Unlock()
|
||||
task.onDemandRequestTime = time.Now()
|
||||
|
||||
if forceImmediateReevaluation {
|
||||
m.forceMembersReevaluation[communityID.String()] <- struct{}{}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1233,7 +1279,7 @@ func (m *Manager) reevaluateCommunityMembersPermissions(communityID types.HexByt
|
|||
// Publish when the reevluation started since it can take a while
|
||||
signal.SendCommunityMemberReevaluationStarted(types.EncodeHex(communityID))
|
||||
|
||||
community, newPrivilegedMembers, err := m.ReevaluateMembers(communityID)
|
||||
community, newPrivilegedMembers, err := m.reevaluateMembers(communityID)
|
||||
|
||||
// Publish the reevaluation ending, even if it errored
|
||||
// A possible improvement would be to pass the error here
|
||||
|
|
|
@ -580,7 +580,7 @@ func waitOnCommunitiesEvent(user *Messenger, condition func(*communities.Subscri
|
|||
return
|
||||
}
|
||||
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
case <-time.After(5 * time.Second):
|
||||
errCh <- errors.New("timed out when waiting for communities event")
|
||||
return
|
||||
}
|
||||
|
|
|
@ -104,7 +104,7 @@ func (tckd *TestCommunitiesKeyDistributor) waitOnKeyDistribution(condition func(
|
|||
return
|
||||
}
|
||||
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
case <-time.After(5 * time.Second):
|
||||
errCh <- errors.New("timed out when waiting for key distribution")
|
||||
return
|
||||
}
|
||||
|
@ -184,6 +184,11 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TearDownTest() {
|
|||
}
|
||||
|
||||
func (s *MessengerCommunitiesTokenPermissionsSuite) newMessenger(password string, walletAddresses []string, waku types.Waku, name string, extraOptions []Option) *Messenger {
|
||||
communityManagerOptions := []communities.ManagerOption{
|
||||
communities.WithAllowForcingCommunityMembersReevaluation(true),
|
||||
}
|
||||
extraOptions = append(extraOptions, WithCommunityManagerOptions(communityManagerOptions))
|
||||
|
||||
return newTestCommunitiesMessenger(&s.Suite, waku, testCommunitiesMessengerConfig{
|
||||
testMessengerConfig: testMessengerConfig{
|
||||
logger: s.logger.Named(name),
|
||||
|
@ -1167,7 +1172,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testViewChannelPermissions(v
|
|||
|
||||
// force owner to reevaluate channel members
|
||||
// in production it will happen automatically, by periodic check
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = <-waitOnChannelKeyToBeDistributedToBob
|
||||
|
@ -1338,7 +1343,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestMemberRoleGetUpdatedWhen
|
|||
|
||||
// force owner to reevaluate channel members
|
||||
// in production it will happen automatically, by periodic check
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = <-waitOnChannelKeyToBeDistributedToBob
|
||||
|
@ -1387,11 +1392,26 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestMemberRoleGetUpdatedWhen
|
|||
s.Require().True(s.owner.communitiesManager.IsChannelEncrypted(community.IDString(), chat.ID))
|
||||
s.Require().Len(response.CommunityChanges[0].TokenPermissionsModified, 1)
|
||||
|
||||
waitOnBobAddedToChannelAsPoster := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
|
||||
channel, ok := sub.Community.Chats()[chat.CommunityChatID()]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
member, ok := channel.Members[s.bob.IdentityPublicKeyString()]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return member.ChannelRole == protobuf.CommunityMember_CHANNEL_ROLE_POSTER
|
||||
})
|
||||
|
||||
// force owner to reevaluate channel members
|
||||
// in production it will happen automatically, by periodic check
|
||||
community, roles, err := s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID())
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(roles, 2)
|
||||
|
||||
err = <-waitOnBobAddedToChannelAsPoster
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
|
@ -1488,9 +1508,20 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().NoError(err)
|
||||
s.Require().True(checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, community))
|
||||
|
||||
waitOnPermissionsReevaluated := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
|
||||
if sub.Community == nil {
|
||||
return false
|
||||
}
|
||||
return checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, sub.Community)
|
||||
})
|
||||
|
||||
// the control node re-evaluates the roles of the participants, checking that the privileged user has not lost his role
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = <-waitOnPermissionsReevaluated
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
s.Require().NoError(err)
|
||||
s.Require().True(checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, community))
|
||||
|
@ -1511,7 +1542,17 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().NoError(err)
|
||||
s.Require().False(community.HasTokenPermissions())
|
||||
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
waitOnPermissionsReevaluated = waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
|
||||
if sub.Community == nil {
|
||||
return false
|
||||
}
|
||||
return !checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, sub.Community)
|
||||
})
|
||||
|
||||
err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = <-waitOnPermissionsReevaluated
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
|
@ -1596,6 +1637,13 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.makeAddressSatisfyTheCriteria(testChainID1, aliceAddress1, tokenPermission.TokenCriteria[0])
|
||||
s.makeAddressSatisfyTheCriteria(testChainID1, aliceAddress1, tokenMemberPermission.TokenCriteria[0])
|
||||
|
||||
waitOnAliceAddedToCommunity := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
|
||||
if sub.Community == nil {
|
||||
return false
|
||||
}
|
||||
return checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, sub.Community)
|
||||
})
|
||||
|
||||
// join community as a privileged user
|
||||
s.joinCommunity(community, s.alice, alicePassword, []string{aliceAddress1})
|
||||
|
||||
|
@ -1604,7 +1652,10 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().True(checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, community))
|
||||
|
||||
// the control node reevaluates the roles of the participants, checking that the privileged user has not lost his role
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = <-waitOnAliceAddedToCommunity
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
|
@ -1627,7 +1678,17 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().NoError(err)
|
||||
s.Require().Len(response.Communities()[0].TokenPermissions(), 1)
|
||||
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
waitOnAliceLostPermission := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
|
||||
if sub.Community == nil {
|
||||
return false
|
||||
}
|
||||
return !checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, sub.Community)
|
||||
})
|
||||
|
||||
err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = <-waitOnAliceLostPermission
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
|
@ -1651,7 +1712,17 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) testReevaluateMemberPrivileg
|
|||
s.Require().NoError(err)
|
||||
s.Require().Len(response.Communities()[0].TokenPermissions(), 0)
|
||||
|
||||
community, _, err = s.owner.communitiesManager.ReevaluateMembers(community.ID())
|
||||
waitOnReevaluation := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
|
||||
if sub.Community == nil {
|
||||
return false
|
||||
}
|
||||
return !checkRoleBasedOnThePermissionType(permissionType, &s.alice.identity.PublicKey, sub.Community)
|
||||
})
|
||||
|
||||
err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = <-waitOnReevaluation
|
||||
s.Require().NoError(err)
|
||||
|
||||
community, err = s.owner.communitiesManager.GetByID(community.ID())
|
||||
|
|
|
@ -477,6 +477,8 @@ func NewMessenger(
|
|||
managerOptions = append(managerOptions, communities.WithCommunityTokensService(c.communityTokensService))
|
||||
}
|
||||
|
||||
managerOptions = append(managerOptions, c.communityManagerOptions...)
|
||||
|
||||
communitiesKeyDistributor := &CommunitiesKeyDistributorImpl{
|
||||
sender: sender,
|
||||
encryptor: encryptionProtocol,
|
||||
|
|
|
@ -118,6 +118,8 @@ type config struct {
|
|||
|
||||
messageResendMinDelay time.Duration
|
||||
messageResendMaxCount int
|
||||
|
||||
communityManagerOptions []communities.ManagerOption
|
||||
}
|
||||
|
||||
func messengerDefaultConfig() config {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
"github.com/status-im/status-go/appdatabase"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/services/mailservers"
|
||||
"github.com/status-im/status-go/t/helpers"
|
||||
)
|
||||
|
@ -44,3 +45,10 @@ func WithCuratedCommunitiesUpdateLoop(enabled bool) Option {
|
|||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithCommunityManagerOptions(options []communities.ManagerOption) Option {
|
||||
return func(c *config) error {
|
||||
c.communityManagerOptions = options
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue