fix: clear waku envelopes cache when deleting a chat (#4621)

* chore: extract `ErrPermissionToJoinNotSatisfied`
* chore: disable resending messages in communities tests
* chore: move newTestMessenger extraOptions to testMessengerConfig
* chore: `WithTestStoreNode` messenger option
* feat: waku `ClearEnvelopesCache` method
* fix: call `ClearEnvelopesCache` when deleting chat
* chore: `TestBecomeMemberPermissions` checks messages after rejoin
This commit is contained in:
Igor Sirotin 2024-01-30 13:43:34 +00:00 committed by GitHub
parent 69948a7024
commit 1f0fc2935c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 186 additions and 57 deletions

View File

@ -274,6 +274,10 @@ func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byt
func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {}
func (w *gethWakuWrapper) ClearEnvelopesCache() {
w.waku.ClearEnvelopesCache()
}
type wakuFilterWrapper struct {
filter *wakucommon.Filter
id string

View File

@ -289,6 +289,10 @@ func (w *gethWakuV2Wrapper) ConnectionChanged(state connection.State) {
w.waku.ConnectionChanged(state)
}
func (w *gethWakuV2Wrapper) ClearEnvelopesCache() {
w.waku.ClearEnvelopesCache()
}
type wakuV2FilterWrapper struct {
filter *wakucommon.Filter
id string

View File

@ -161,4 +161,7 @@ type Waku interface {
// ConnectionChanged is called whenever the client knows its connection status has changed
ConnectionChanged(connection.State)
// ClearEnvelopesCache clears waku envelopes cache
ClearEnvelopesCache()
}

View File

@ -44,3 +44,4 @@ var ErrInvalidManageTokensPermission = errors.New("no privileges to manage token
var ErrRevealedAccountsAbsent = errors.New("revealed accounts is absent")
var ErrNoRevealedAccountsSignature = errors.New("revealed accounts without the signature")
var ErrNoFreeSpaceForHistoryArchives = errors.New("history archive: No free space for downloading history archives")
var ErrPermissionToJoinNotSatisfied = errors.New("permission to join not satisfied")

View File

@ -14,6 +14,7 @@ import (
hexutil "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/status-im/status-go/account"
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
@ -25,7 +26,9 @@ import (
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/services/communitytokens"
mailserversDB "github.com/status-im/status-go/services/mailservers"
walletToken "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/transactions"
)
@ -223,14 +226,15 @@ func newTestCommunitiesMessenger(s *suite.Suite, waku types.Waku, config testCom
}
options := []Option{
WithResendParams(3, 3),
WithAccountManager(accountsManagerMock),
WithTokenManager(tokenManagerMock),
WithCommunityTokensService(config.collectiblesService),
WithAppSettings(*config.appSettings, *config.nodeConfig),
}
messenger, err := newTestMessenger(waku, config.testMessengerConfig, options)
config.extraOptions = append(config.extraOptions, options...)
messenger, err := newTestMessenger(waku, config.testMessengerConfig)
s.Require().NoError(err)
currentDistributorObj, ok := messenger.communitiesKeyDistributor.(*CommunitiesKeyDistributorImpl)
@ -499,7 +503,8 @@ func sendChatMessage(s *suite.Suite, sender *Messenger, chatID string, text stri
s.Require().NoError(err)
return msg
}*/
}
*/
func grantPermission(s *suite.Suite, community *communities.Community, controlNode *Messenger, target *Messenger, role protobuf.CommunityMember_Roles) {
responseAddRole, err := controlNode.AddRoleToMember(&requests.AddRoleToMember{
@ -587,3 +592,25 @@ func waitOnCommunitiesEvent(user *Messenger, condition func(*communities.Subscri
return errCh
}
func WithTestStoreNode(s *suite.Suite, id string, address string, fleet string, collectiblesServiceMock *CollectiblesServiceMock) Option {
return func(c *config) error {
sqldb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
s.Require().NoError(err)
db := mailserversDB.NewDB(sqldb)
err = db.Add(mailserversDB.Mailserver{
ID: id,
Name: id,
Address: address,
Fleet: fleet,
})
s.Require().NoError(err)
c.mailserversDatabase = db
c.clusterConfig = params.ClusterConfig{Fleet: fleet}
c.communityTokensService = collectiblesServiceMock
return nil
}
}

View File

@ -1,7 +1,6 @@
package protocol
import (
//"bytes"
"crypto/ecdsa"
"errors"
"math/big"
@ -109,7 +108,8 @@ func (tckd *TestCommunitiesKeyDistributor) waitOnKeyDistribution(condition func(
}()
return errCh
}*/
}
*/
func TestMessengerCommunitiesTokenPermissionsSuite(t *testing.T) {
suite.Run(t, new(MessengerCommunitiesTokenPermissionsSuite))
@ -136,17 +136,14 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) SetupTest() {
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, false, []string{"owner", "bob", "alice"})
ownerLogger := s.logger.With(zap.String("name", "owner"))
s.ownerWaku = wakuNodes[0]
s.owner = s.newMessenger(ownerPassword, []string{ownerAddress}, s.ownerWaku, ownerLogger)
s.owner = s.newMessenger(ownerPassword, []string{ownerAddress}, s.ownerWaku, "owner", []Option{})
bobLogger := s.logger.With(zap.String("name", "bob"))
s.bobWaku = wakuNodes[1]
s.bob = s.newMessenger(bobPassword, []string{bobAddress}, s.bobWaku, bobLogger)
s.bob = s.newMessenger(bobPassword, []string{bobAddress}, s.bobWaku, "bob", []Option{})
aliceLogger := s.logger.With(zap.String("name", "alice"))
s.aliceWaku = wakuNodes[2]
s.alice = s.newMessenger(alicePassword, []string{aliceAddress1, aliceAddress2}, s.aliceWaku, aliceLogger)
s.alice = s.newMessenger(alicePassword, []string{aliceAddress1, aliceAddress2}, s.aliceWaku, "alice", []Option{})
_, err := s.owner.Start()
s.Require().NoError(err)
@ -186,10 +183,11 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TearDownTest() {
_ = s.logger.Sync()
}
func (s *MessengerCommunitiesTokenPermissionsSuite) newMessenger(password string, walletAddresses []string, waku types.Waku, logger *zap.Logger) *Messenger {
func (s *MessengerCommunitiesTokenPermissionsSuite) newMessenger(password string, walletAddresses []string, waku types.Waku, name string, extraOptions []Option) *Messenger {
return newTestCommunitiesMessenger(&s.Suite, waku, testCommunitiesMessengerConfig{
testMessengerConfig: testMessengerConfig{
logger: logger,
logger: s.logger.Named(name),
extraOptions: extraOptions,
},
password: password,
walletAddresses: walletAddresses,
@ -224,7 +222,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) createCommunity() (*communit
/*
func (s *MessengerCommunitiesTokenPermissionsSuite) sendChatMessage(sender *Messenger, chatID string, text string) *common.Message {
return sendChatMessage(&s.Suite, sender, chatID, text)
}*/
}
*/
func (s *MessengerCommunitiesTokenPermissionsSuite) makeAddressSatisfyTheCriteria(chainID uint64, address string, criteria *protobuf.TokenCriteria) {
walletAddress := gethcommon.HexToAddress(address)
@ -242,7 +241,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) waitOnKeyDistribution(condit
testCommunitiesKeyDistributor, ok := s.owner.communitiesKeyDistributor.(*TestCommunitiesKeyDistributor)
s.Require().True(ok)
return testCommunitiesKeyDistributor.waitOnKeyDistribution(condition)
}*/
}
*/
func (s *MessengerCommunitiesTokenPermissionsSuite) TestCreateTokenPermission() {
community, _ := s.createCommunity()
@ -630,15 +630,68 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() {
// NOTE(cammellos): Disabling for now as flaky, the reason it fails is that the community
// key sometimes will be coming after the community description, working on a fix in a separate
// PR
/* func (s *MessengerCommunitiesTokenPermissionsSuite) TestBecomeMemberPermissions() {
/*
func(s *MessengerCommunitiesTokenPermissionsSuite) TestBecomeMemberPermissions() {
// Create a store node
// This is needed to fetch the messages after rejoining the community
var err error
storeNodeLogger := s.logger.Named("store-node-waku")
wakuStoreNode := NewWakuV2(&s.Suite, storeNodeLogger, true, true, false, shard.UndefinedShardValue)
storeNodeListenAddresses := wakuStoreNode.ListenAddresses()
s.Require().LessOrEqual(1, len(storeNodeListenAddresses))
storeNodeAddress := storeNodeListenAddresses[0]
s.logger.Info("store node ready", zap.String("address", storeNodeAddress))
// Create messengers
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, false, []string{"owner", "bob"})
s.ownerWaku = wakuNodes[0]
s.bobWaku = wakuNodes[1]
options := []Option{
WithTestStoreNode(&s.Suite, localMailserverID, storeNodeAddress, localFleet, s.collectiblesServiceMock),
}
s.owner = s.newMessenger(ownerPassword, []string{ownerAddress}, s.ownerWaku, "owner", options)
s.Require().NoError(err)
_, err = s.owner.Start()
s.Require().NoError(err)
s.bob = s.newMessenger(bobPassword, []string{bobAddress}, s.bobWaku, "bob", options)
s.Require().NoError(err)
_, err = s.bob.Start()
s.Require().NoError(err)
// Force the owner to use the store node as relay peer
err = s.owner.DialPeer(storeNodeAddress)
s.Require().NoError(err)
// Create a community
community, chat := s.createCommunity()
// bob joins the community
s.advertiseCommunityTo(community, s.bob)
s.joinCommunity(community, s.bob, bobPassword, []string{})
s.joinCommunityWithAirdropAddress(community, s.bob, bobPassword, []string{bobAddress}, "")
messages := []string{
"1-message", // RandomLettersString(10), // successful message on open community
"2-message", // RandomLettersString(11), // failing message on encrypted community
"3-message", // RandomLettersString(12), // successful message on encrypted community
}
// send message to the channel
msg := s.sendChatMessage(s.owner, chat.ID, "hello on open community")
msg := s.sendChatMessage(s.owner, chat.ID, messages[0])
s.logger.Debug("owner sent a message",
zap.String("messageText", msg.Text),
zap.String("messageID", msg.ID),
)
// bob can read the message
response, err := WaitOnMessengerResponse(
@ -651,12 +704,17 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() {
}
return false
},
"no messages",
"first message not received",
)
s.Require().NoError(err)
s.Require().Len(response.Messages(), 1)
s.Require().Equal(msg.Text, response.Messages()[0].Text)
bobMessages, _, err := s.bob.MessageByChatID(msg.ChatId, "", 10)
s.Require().NoError(err)
s.Require().Len(bobMessages, 1)
s.Require().Equal(messages[0], bobMessages[0].Text)
// setup become member permission
permissionRequest := requests.CreateCommunityTokenPermission{
CommunityID: community.ID(),
@ -709,7 +767,11 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() {
s.Require().NoError(err)
// send message to channel
msg = s.sendChatMessage(s.owner, chat.ID, "hello on encrypted community")
msg = s.sendChatMessage(s.owner, chat.ID, messages[1])
s.logger.Debug("owner sent a message",
zap.String("messageText", msg.Text),
zap.String("messageID", msg.ID),
)
// bob can't read the message
_, err = WaitOnMessengerResponse(
@ -730,13 +792,12 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() {
// bob tries to join, but he doesn't satisfy so the request isn't sent
request := &requests.RequestToJoinCommunity{CommunityID: community.ID(), AddressesToReveal: []string{bobAddress}, AirdropAddress: bobAddress}
_, err = s.bob.RequestToJoinCommunity(request)
s.Require().Error(err)
s.Require().ErrorContains(err, "permission to join not satisfied")
s.Require().ErrorIs(err, communities.ErrPermissionToJoinNotSatisfied)
// make sure bob does not have a pending request to join
requests, err := s.bob.MyPendingRequestsToJoin()
pendingRequests, err := s.bob.MyPendingRequestsToJoin()
s.Require().NoError(err)
s.Require().Len(requests, 0)
s.Require().Len(pendingRequests, 0)
// make bob satisfy the criteria
s.makeAddressSatisfyTheCriteria(testChainID1, bobAddress, permissionRequest.TokenCriteria[0])
@ -748,18 +809,25 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() {
})
// bob re-joins the community
s.joinCommunity(community, s.bob, bobPassword, []string{})
s.joinCommunity(community, s.bob, bobPassword, []string{bobAddress})
err = <-waitOnCommunityKeyToBeDistributedToBob
s.Require().NoError(err)
// send message to channel
msg = s.sendChatMessage(s.owner, chat.ID, "hello on encrypted community 2")
msg = s.sendChatMessage(s.owner, chat.ID, messages[2])
s.logger.Debug("owner sent a message",
zap.String("messageText", msg.Text),
zap.String("messageID", msg.ID),
)
// bob can read the message
_, err = WaitOnMessengerResponse(
s.bob,
func(r *MessengerResponse) bool {
if len(r.messages) != len(messages) {
return false
}
for _, message := range r.messages {
if message.Text == msg.Text {
return true
@ -767,11 +835,21 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() {
}
return false
},
"no messages",
"not all 3 messages received",
)
s.Require().NoError(err)
}
*/
// Bob should have all 3 messages
bobMessages, _, err = s.bob.MessageByChatID(msg.ChatId, "", 10)
s.Require().NoError(err)
s.Require().Len(bobMessages, 3)
sort.Slice(bobMessages, func(i, j int) bool {
return bobMessages[i].Text < bobMessages[j].Text
})
s.Require().Equal(messages[0], bobMessages[0].Text)
}*/
func (s *MessengerCommunitiesTokenPermissionsSuite) TestJoinCommunityWithAdminPermission() {
community, _ := s.createCommunity()

View File

@ -1549,7 +1549,7 @@ func (m *Messenger) watchExpiredMessages() {
if m.Online() {
err := m.resendExpiredMessages()
if err != nil {
m.logger.Debug("Error when resending expired emoji reactions", zap.Error(err))
m.logger.Debug("failed to resend expired message", zap.Error(err))
}
}
case <-m.quit:

View File

@ -74,7 +74,8 @@ func newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey, logger *z
m, err := newTestMessenger(shh, testMessengerConfig{
privateKey: privateKey,
logger: logger,
}, options)
extraOptions: options,
})
if err != nil {
return nil, err
}

View File

@ -25,6 +25,7 @@ type testMessengerConfig struct {
logger *zap.Logger
unhandledMessagesTracker *unhandledMessagesTracker
extraOptions []Option
}
func (tmc *testMessengerConfig) complete() error {
@ -42,13 +43,13 @@ func (tmc *testMessengerConfig) complete() error {
if tmc.logger == nil {
logger := tt.MustCreateTestLogger()
tmc.logger = logger.With(zap.String("name", tmc.name))
tmc.logger = logger.Named(tmc.name)
}
return nil
}
func newTestMessenger(waku types.Waku, config testMessengerConfig, extraOptions []Option) (*Messenger, error) {
func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger, error) {
err := config.complete()
if err != nil {
return nil, err
@ -80,7 +81,7 @@ func newTestMessenger(waku types.Waku, config testMessengerConfig, extraOptions
WithToplevelDatabaseMigrations(),
WithBrowserDatabase(nil),
}
options = append(options, extraOptions...)
options = append(options, config.extraOptions...)
m, err := NewMessenger(
config.name,

View File

@ -364,6 +364,13 @@ func (m *Messenger) deleteChat(chatID string) error {
if err != nil {
return err
}
// We clean the cache to be able to receive the messages again later
err = m.transport.ClearProcessedMessageIDsCache()
if err != nil {
return err
}
chat, ok := m.allChats.Load(chatID)
if ok && chat.Active && chat.Public() {

View File

@ -1046,7 +1046,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun
return nil, err
}
if !permissions.Satisfied {
return nil, errors.New("permission to join not satisfied")
return nil, communities.ErrPermissionToJoinNotSatisfied
}
for _, accountAndChainIDs := range permissions.ValidCombinations {

View File

@ -90,6 +90,9 @@ func (s *MessengerOfflineSuite) newMessenger(waku types.Waku, logger *zap.Logger
return newTestCommunitiesMessenger(&s.Suite, waku, testCommunitiesMessengerConfig{
testMessengerConfig: testMessengerConfig{
logger: s.logger,
extraOptions: []Option{
WithResendParams(3, 3),
},
},
})
}

View File

@ -191,24 +191,8 @@ func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *za
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
s.Require().NoError(err)
mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb)
err = mailserversDatabase.Add(mailserversDB.Mailserver{
ID: localMailserverID,
Name: localMailserverID,
Address: mailserverAddress,
Fleet: localFleet,
})
s.Require().NoError(err)
options := []Option{
WithMailserversDatabase(mailserversDatabase),
WithClusterConfig(params.ClusterConfig{
Fleet: localFleet,
}),
WithCommunityTokensService(s.collectiblesServiceMock),
WithTestStoreNode(&s.Suite, localMailserverID, mailserverAddress, localFleet, s.collectiblesServiceMock),
}
messenger, err := newMessengerWithKey(shh, privateKey, logger, options)
@ -617,7 +601,6 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() {
s.createOwner()
s.createBob()
s.waitForAvailableStoreNode(s.owner)
community := s.createCommunity(s.owner)
expectedShard := &shard.Shard{

View File

@ -134,7 +134,9 @@ func (s *MessengerSyncSettingsSuite) newMessenger() *Messenger {
Currency: "eth",
}
m, err := newTestMessenger(s.shh, testMessengerConfig{}, []Option{WithAppSettings(setting, config)})
m, err := newTestMessenger(s.shh, testMessengerConfig{
extraOptions: []Option{WithAppSettings(setting, config)},
})
s.Require().NoError(err)
return m
}

View File

@ -313,7 +313,7 @@ func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStor
func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, useShardAsDefaultTopic bool, nodeNames []string) []types.Waku {
nodes := make([]*waku2.Waku, len(nodeNames))
for i, name := range nodeNames {
logger := parentLogger.With(zap.String("name", name+"-waku"))
logger := parentLogger.Named(name + "-waku")
wakuNode := NewWakuV2(s, logger, true, false, useShardAsDefaultTopic, 0)
nodes[i] = wakuNode
}

View File

@ -608,6 +608,7 @@ func (t *Transport) waitForRequestCompleted(ctx context.Context, requestID []byt
// ConfirmMessagesProcessed marks the messages as processed in the cache so
// they won't be passed to the next layer anymore
func (t *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error {
t.logger.Debug("confirming message processed", zap.Any("ids", ids), zap.Any("timestamp", timestamp))
return t.cache.Add(ids, timestamp)
}
@ -625,6 +626,8 @@ func (t *Transport) SetEnvelopeEventsHandler(handler EnvelopeEventsHandler) erro
}
func (t *Transport) ClearProcessedMessageIDsCache() error {
t.logger.Debug("clearing processed messages cache")
t.waku.ClearEnvelopesCache()
return t.cache.Clear()
}

View File

@ -1580,6 +1580,12 @@ func (w *Waku) MarkP2PMessageAsProcessed(hash gethcommon.Hash) {
delete(w.p2pMsgIDs, hash)
}
func (w *Waku) ClearEnvelopesCache() {
w.poolMu.Lock()
defer w.poolMu.Unlock()
w.envelopes = make(map[gethcommon.Hash]*common.Envelope)
}
func (w *Waku) Clean() error {
w.poolMu.Lock()
defer w.poolMu.Unlock()

View File

@ -1510,6 +1510,12 @@ func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool {
return exist
}
func (w *Waku) ClearEnvelopesCache() {
w.poolMu.Lock()
defer w.poolMu.Unlock()
w.envelopes = make(map[gethcommon.Hash]*common.ReceivedMessage)
}
func (w *Waku) PeerCount() int {
return w.node.PeerCount()
}