From 1f0fc2935c63334294c045a4370420111455879f Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Tue, 30 Jan 2024 13:43:34 +0000 Subject: [PATCH] fix: clear waku envelopes cache when deleting a chat (#4621) * chore: extract `ErrPermissionToJoinNotSatisfied` * chore: disable resending messages in communities tests * chore: move newTestMessenger extraOptions to testMessengerConfig * chore: `WithTestStoreNode` messenger option * feat: waku `ClearEnvelopesCache` method * fix: call `ClearEnvelopesCache` when deleting chat * chore: `TestBecomeMemberPermissions` checks messages after rejoin --- eth-node/bridge/geth/waku.go | 4 + eth-node/bridge/geth/wakuv2.go | 4 + eth-node/types/waku.go | 3 + protocol/communities/errors.go | 1 + .../communities_messenger_helpers_test.go | 33 ++++- ...nities_messenger_token_permissions_test.go | 130 ++++++++++++++---- protocol/messenger.go | 2 +- protocol/messenger_base_test.go | 7 +- protocol/messenger_builder_test.go | 7 +- protocol/messenger_chats.go | 7 + protocol/messenger_communities.go | 2 +- protocol/messenger_offline_test.go | 3 + protocol/messenger_storenode_request_test.go | 19 +-- protocol/messenger_sync_settings_test.go | 4 +- protocol/messenger_testing_utils.go | 2 +- protocol/transport/transport.go | 3 + waku/waku.go | 6 + wakuv2/waku.go | 6 + 18 files changed, 186 insertions(+), 57 deletions(-) diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 55c72917c..a1a445cca 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -274,6 +274,10 @@ func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byt func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {} +func (w *gethWakuWrapper) ClearEnvelopesCache() { + w.waku.ClearEnvelopesCache() +} + type wakuFilterWrapper struct { filter *wakucommon.Filter id string diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 3449b180a..78acb5f5f 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -289,6 +289,10 @@ func (w *gethWakuV2Wrapper) ConnectionChanged(state connection.State) { w.waku.ConnectionChanged(state) } +func (w *gethWakuV2Wrapper) ClearEnvelopesCache() { + w.waku.ClearEnvelopesCache() +} + type wakuV2FilterWrapper struct { filter *wakucommon.Filter id string diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index 1d737e513..db0ff149d 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -161,4 +161,7 @@ type Waku interface { // ConnectionChanged is called whenever the client knows its connection status has changed ConnectionChanged(connection.State) + + // ClearEnvelopesCache clears waku envelopes cache + ClearEnvelopesCache() } diff --git a/protocol/communities/errors.go b/protocol/communities/errors.go index 1ef53202b..a34a552b9 100644 --- a/protocol/communities/errors.go +++ b/protocol/communities/errors.go @@ -44,3 +44,4 @@ var ErrInvalidManageTokensPermission = errors.New("no privileges to manage token var ErrRevealedAccountsAbsent = errors.New("revealed accounts is absent") var ErrNoRevealedAccountsSignature = errors.New("revealed accounts without the signature") var ErrNoFreeSpaceForHistoryArchives = errors.New("history archive: No free space for downloading history archives") +var ErrPermissionToJoinNotSatisfied = errors.New("permission to join not satisfied") diff --git a/protocol/communities_messenger_helpers_test.go b/protocol/communities_messenger_helpers_test.go index 12da75aba..051ddb6d9 100644 --- a/protocol/communities_messenger_helpers_test.go +++ b/protocol/communities_messenger_helpers_test.go @@ -14,6 +14,7 @@ import ( hexutil "github.com/ethereum/go-ethereum/common/hexutil" "github.com/status-im/status-go/account" + "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/multiaccounts/accounts" @@ -25,7 +26,9 @@ import ( "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/services/communitytokens" + mailserversDB "github.com/status-im/status-go/services/mailservers" walletToken "github.com/status-im/status-go/services/wallet/token" + "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/transactions" ) @@ -223,14 +226,15 @@ func newTestCommunitiesMessenger(s *suite.Suite, waku types.Waku, config testCom } options := []Option{ - WithResendParams(3, 3), WithAccountManager(accountsManagerMock), WithTokenManager(tokenManagerMock), WithCommunityTokensService(config.collectiblesService), WithAppSettings(*config.appSettings, *config.nodeConfig), } - messenger, err := newTestMessenger(waku, config.testMessengerConfig, options) + config.extraOptions = append(config.extraOptions, options...) + + messenger, err := newTestMessenger(waku, config.testMessengerConfig) s.Require().NoError(err) currentDistributorObj, ok := messenger.communitiesKeyDistributor.(*CommunitiesKeyDistributorImpl) @@ -499,7 +503,8 @@ func sendChatMessage(s *suite.Suite, sender *Messenger, chatID string, text stri s.Require().NoError(err) return msg -}*/ +} +*/ func grantPermission(s *suite.Suite, community *communities.Community, controlNode *Messenger, target *Messenger, role protobuf.CommunityMember_Roles) { responseAddRole, err := controlNode.AddRoleToMember(&requests.AddRoleToMember{ @@ -587,3 +592,25 @@ func waitOnCommunitiesEvent(user *Messenger, condition func(*communities.Subscri return errCh } + +func WithTestStoreNode(s *suite.Suite, id string, address string, fleet string, collectiblesServiceMock *CollectiblesServiceMock) Option { + return func(c *config) error { + sqldb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + s.Require().NoError(err) + + db := mailserversDB.NewDB(sqldb) + err = db.Add(mailserversDB.Mailserver{ + ID: id, + Name: id, + Address: address, + Fleet: fleet, + }) + s.Require().NoError(err) + + c.mailserversDatabase = db + c.clusterConfig = params.ClusterConfig{Fleet: fleet} + c.communityTokensService = collectiblesServiceMock + + return nil + } +} diff --git a/protocol/communities_messenger_token_permissions_test.go b/protocol/communities_messenger_token_permissions_test.go index f7d0a5b0d..0082c71d4 100644 --- a/protocol/communities_messenger_token_permissions_test.go +++ b/protocol/communities_messenger_token_permissions_test.go @@ -1,7 +1,6 @@ package protocol import ( - //"bytes" "crypto/ecdsa" "errors" "math/big" @@ -109,7 +108,8 @@ func (tckd *TestCommunitiesKeyDistributor) waitOnKeyDistribution(condition func( }() return errCh -}*/ +} +*/ func TestMessengerCommunitiesTokenPermissionsSuite(t *testing.T) { suite.Run(t, new(MessengerCommunitiesTokenPermissionsSuite)) @@ -136,17 +136,14 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) SetupTest() { wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, false, []string{"owner", "bob", "alice"}) - ownerLogger := s.logger.With(zap.String("name", "owner")) s.ownerWaku = wakuNodes[0] - s.owner = s.newMessenger(ownerPassword, []string{ownerAddress}, s.ownerWaku, ownerLogger) + s.owner = s.newMessenger(ownerPassword, []string{ownerAddress}, s.ownerWaku, "owner", []Option{}) - bobLogger := s.logger.With(zap.String("name", "bob")) s.bobWaku = wakuNodes[1] - s.bob = s.newMessenger(bobPassword, []string{bobAddress}, s.bobWaku, bobLogger) + s.bob = s.newMessenger(bobPassword, []string{bobAddress}, s.bobWaku, "bob", []Option{}) - aliceLogger := s.logger.With(zap.String("name", "alice")) s.aliceWaku = wakuNodes[2] - s.alice = s.newMessenger(alicePassword, []string{aliceAddress1, aliceAddress2}, s.aliceWaku, aliceLogger) + s.alice = s.newMessenger(alicePassword, []string{aliceAddress1, aliceAddress2}, s.aliceWaku, "alice", []Option{}) _, err := s.owner.Start() s.Require().NoError(err) @@ -186,10 +183,11 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TearDownTest() { _ = s.logger.Sync() } -func (s *MessengerCommunitiesTokenPermissionsSuite) newMessenger(password string, walletAddresses []string, waku types.Waku, logger *zap.Logger) *Messenger { +func (s *MessengerCommunitiesTokenPermissionsSuite) newMessenger(password string, walletAddresses []string, waku types.Waku, name string, extraOptions []Option) *Messenger { return newTestCommunitiesMessenger(&s.Suite, waku, testCommunitiesMessengerConfig{ testMessengerConfig: testMessengerConfig{ - logger: logger, + logger: s.logger.Named(name), + extraOptions: extraOptions, }, password: password, walletAddresses: walletAddresses, @@ -224,7 +222,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) createCommunity() (*communit /* func (s *MessengerCommunitiesTokenPermissionsSuite) sendChatMessage(sender *Messenger, chatID string, text string) *common.Message { return sendChatMessage(&s.Suite, sender, chatID, text) -}*/ +} +*/ func (s *MessengerCommunitiesTokenPermissionsSuite) makeAddressSatisfyTheCriteria(chainID uint64, address string, criteria *protobuf.TokenCriteria) { walletAddress := gethcommon.HexToAddress(address) @@ -242,7 +241,8 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) waitOnKeyDistribution(condit testCommunitiesKeyDistributor, ok := s.owner.communitiesKeyDistributor.(*TestCommunitiesKeyDistributor) s.Require().True(ok) return testCommunitiesKeyDistributor.waitOnKeyDistribution(condition) -}*/ +} +*/ func (s *MessengerCommunitiesTokenPermissionsSuite) TestCreateTokenPermission() { community, _ := s.createCommunity() @@ -630,15 +630,68 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() { // NOTE(cammellos): Disabling for now as flaky, the reason it fails is that the community // key sometimes will be coming after the community description, working on a fix in a separate // PR -/* func (s *MessengerCommunitiesTokenPermissionsSuite) TestBecomeMemberPermissions() { +/* +func(s *MessengerCommunitiesTokenPermissionsSuite) TestBecomeMemberPermissions() { + // Create a store node + // This is needed to fetch the messages after rejoining the community + var err error + + storeNodeLogger := s.logger.Named("store-node-waku") + wakuStoreNode := NewWakuV2(&s.Suite, storeNodeLogger, true, true, false, shard.UndefinedShardValue) + + storeNodeListenAddresses := wakuStoreNode.ListenAddresses() + s.Require().LessOrEqual(1, len(storeNodeListenAddresses)) + + storeNodeAddress := storeNodeListenAddresses[0] + s.logger.Info("store node ready", zap.String("address", storeNodeAddress)) + + // Create messengers + + wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, false, []string{"owner", "bob"}) + s.ownerWaku = wakuNodes[0] + s.bobWaku = wakuNodes[1] + + options := []Option{ + WithTestStoreNode(&s.Suite, localMailserverID, storeNodeAddress, localFleet, s.collectiblesServiceMock), + } + + s.owner = s.newMessenger(ownerPassword, []string{ownerAddress}, s.ownerWaku, "owner", options) + s.Require().NoError(err) + + _, err = s.owner.Start() + s.Require().NoError(err) + + s.bob = s.newMessenger(bobPassword, []string{bobAddress}, s.bobWaku, "bob", options) + s.Require().NoError(err) + + _, err = s.bob.Start() + s.Require().NoError(err) + + // Force the owner to use the store node as relay peer + + err = s.owner.DialPeer(storeNodeAddress) + s.Require().NoError(err) + + // Create a community + community, chat := s.createCommunity() // bob joins the community s.advertiseCommunityTo(community, s.bob) - s.joinCommunity(community, s.bob, bobPassword, []string{}) + s.joinCommunityWithAirdropAddress(community, s.bob, bobPassword, []string{bobAddress}, "") + + messages := []string{ + "1-message", // RandomLettersString(10), // successful message on open community + "2-message", // RandomLettersString(11), // failing message on encrypted community + "3-message", // RandomLettersString(12), // successful message on encrypted community + } // send message to the channel - msg := s.sendChatMessage(s.owner, chat.ID, "hello on open community") + msg := s.sendChatMessage(s.owner, chat.ID, messages[0]) + s.logger.Debug("owner sent a message", + zap.String("messageText", msg.Text), + zap.String("messageID", msg.ID), + ) // bob can read the message response, err := WaitOnMessengerResponse( @@ -651,12 +704,17 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() { } return false }, - "no messages", + "first message not received", ) s.Require().NoError(err) s.Require().Len(response.Messages(), 1) s.Require().Equal(msg.Text, response.Messages()[0].Text) + bobMessages, _, err := s.bob.MessageByChatID(msg.ChatId, "", 10) + s.Require().NoError(err) + s.Require().Len(bobMessages, 1) + s.Require().Equal(messages[0], bobMessages[0].Text) + // setup become member permission permissionRequest := requests.CreateCommunityTokenPermission{ CommunityID: community.ID(), @@ -709,7 +767,11 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() { s.Require().NoError(err) // send message to channel - msg = s.sendChatMessage(s.owner, chat.ID, "hello on encrypted community") + msg = s.sendChatMessage(s.owner, chat.ID, messages[1]) + s.logger.Debug("owner sent a message", + zap.String("messageText", msg.Text), + zap.String("messageID", msg.ID), + ) // bob can't read the message _, err = WaitOnMessengerResponse( @@ -730,13 +792,12 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() { // bob tries to join, but he doesn't satisfy so the request isn't sent request := &requests.RequestToJoinCommunity{CommunityID: community.ID(), AddressesToReveal: []string{bobAddress}, AirdropAddress: bobAddress} _, err = s.bob.RequestToJoinCommunity(request) - s.Require().Error(err) - s.Require().ErrorContains(err, "permission to join not satisfied") + s.Require().ErrorIs(err, communities.ErrPermissionToJoinNotSatisfied) // make sure bob does not have a pending request to join - requests, err := s.bob.MyPendingRequestsToJoin() + pendingRequests, err := s.bob.MyPendingRequestsToJoin() s.Require().NoError(err) - s.Require().Len(requests, 0) + s.Require().Len(pendingRequests, 0) // make bob satisfy the criteria s.makeAddressSatisfyTheCriteria(testChainID1, bobAddress, permissionRequest.TokenCriteria[0]) @@ -748,18 +809,25 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() { }) // bob re-joins the community - s.joinCommunity(community, s.bob, bobPassword, []string{}) + s.joinCommunity(community, s.bob, bobPassword, []string{bobAddress}) err = <-waitOnCommunityKeyToBeDistributedToBob s.Require().NoError(err) // send message to channel - msg = s.sendChatMessage(s.owner, chat.ID, "hello on encrypted community 2") + msg = s.sendChatMessage(s.owner, chat.ID, messages[2]) + s.logger.Debug("owner sent a message", + zap.String("messageText", msg.Text), + zap.String("messageID", msg.ID), + ) // bob can read the message _, err = WaitOnMessengerResponse( s.bob, func(r *MessengerResponse) bool { + if len(r.messages) != len(messages) { + return false + } for _, message := range r.messages { if message.Text == msg.Text { return true @@ -767,11 +835,21 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestEditSharedAddresses() { } return false }, - "no messages", + "not all 3 messages received", ) s.Require().NoError(err) -} -*/ + + // Bob should have all 3 messages + bobMessages, _, err = s.bob.MessageByChatID(msg.ChatId, "", 10) + s.Require().NoError(err) + s.Require().Len(bobMessages, 3) + + sort.Slice(bobMessages, func(i, j int) bool { + return bobMessages[i].Text < bobMessages[j].Text + }) + + s.Require().Equal(messages[0], bobMessages[0].Text) +}*/ func (s *MessengerCommunitiesTokenPermissionsSuite) TestJoinCommunityWithAdminPermission() { community, _ := s.createCommunity() diff --git a/protocol/messenger.go b/protocol/messenger.go index e759df962..540a67a0b 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -1549,7 +1549,7 @@ func (m *Messenger) watchExpiredMessages() { if m.Online() { err := m.resendExpiredMessages() if err != nil { - m.logger.Debug("Error when resending expired emoji reactions", zap.Error(err)) + m.logger.Debug("failed to resend expired message", zap.Error(err)) } } case <-m.quit: diff --git a/protocol/messenger_base_test.go b/protocol/messenger_base_test.go index 6b42cfcb7..a1daf1422 100644 --- a/protocol/messenger_base_test.go +++ b/protocol/messenger_base_test.go @@ -72,9 +72,10 @@ func newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey, logger *z options = append(options, extraOptions...) m, err := newTestMessenger(shh, testMessengerConfig{ - privateKey: privateKey, - logger: logger, - }, options) + privateKey: privateKey, + logger: logger, + extraOptions: options, + }) if err != nil { return nil, err } diff --git a/protocol/messenger_builder_test.go b/protocol/messenger_builder_test.go index 4a476d655..1db3c9e79 100644 --- a/protocol/messenger_builder_test.go +++ b/protocol/messenger_builder_test.go @@ -25,6 +25,7 @@ type testMessengerConfig struct { logger *zap.Logger unhandledMessagesTracker *unhandledMessagesTracker + extraOptions []Option } func (tmc *testMessengerConfig) complete() error { @@ -42,13 +43,13 @@ func (tmc *testMessengerConfig) complete() error { if tmc.logger == nil { logger := tt.MustCreateTestLogger() - tmc.logger = logger.With(zap.String("name", tmc.name)) + tmc.logger = logger.Named(tmc.name) } return nil } -func newTestMessenger(waku types.Waku, config testMessengerConfig, extraOptions []Option) (*Messenger, error) { +func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger, error) { err := config.complete() if err != nil { return nil, err @@ -80,7 +81,7 @@ func newTestMessenger(waku types.Waku, config testMessengerConfig, extraOptions WithToplevelDatabaseMigrations(), WithBrowserDatabase(nil), } - options = append(options, extraOptions...) + options = append(options, config.extraOptions...) m, err := NewMessenger( config.name, diff --git a/protocol/messenger_chats.go b/protocol/messenger_chats.go index f1bf681f6..fbb03a53b 100644 --- a/protocol/messenger_chats.go +++ b/protocol/messenger_chats.go @@ -364,6 +364,13 @@ func (m *Messenger) deleteChat(chatID string) error { if err != nil { return err } + + // We clean the cache to be able to receive the messages again later + err = m.transport.ClearProcessedMessageIDsCache() + if err != nil { + return err + } + chat, ok := m.allChats.Load(chatID) if ok && chat.Active && chat.Public() { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 12b37954b..d519388a4 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -1046,7 +1046,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun return nil, err } if !permissions.Satisfied { - return nil, errors.New("permission to join not satisfied") + return nil, communities.ErrPermissionToJoinNotSatisfied } for _, accountAndChainIDs := range permissions.ValidCombinations { diff --git a/protocol/messenger_offline_test.go b/protocol/messenger_offline_test.go index 72ec88b86..d2e4fd74e 100644 --- a/protocol/messenger_offline_test.go +++ b/protocol/messenger_offline_test.go @@ -90,6 +90,9 @@ func (s *MessengerOfflineSuite) newMessenger(waku types.Waku, logger *zap.Logger return newTestCommunitiesMessenger(&s.Suite, waku, testCommunitiesMessengerConfig{ testMessengerConfig: testMessengerConfig{ logger: s.logger, + extraOptions: []Option{ + WithResendParams(3, 3), + }, }, }) } diff --git a/protocol/messenger_storenode_request_test.go b/protocol/messenger_storenode_request_test.go index 985cc5e04..b70c216f6 100644 --- a/protocol/messenger_storenode_request_test.go +++ b/protocol/messenger_storenode_request_test.go @@ -191,24 +191,8 @@ func (s *MessengerStoreNodeRequestSuite) newMessenger(shh types.Waku, logger *za privateKey, err := crypto.GenerateKey() s.Require().NoError(err) - mailserversSQLDb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) - s.Require().NoError(err) - - mailserversDatabase := mailserversDB.NewDB(mailserversSQLDb) - err = mailserversDatabase.Add(mailserversDB.Mailserver{ - ID: localMailserverID, - Name: localMailserverID, - Address: mailserverAddress, - Fleet: localFleet, - }) - s.Require().NoError(err) - options := []Option{ - WithMailserversDatabase(mailserversDatabase), - WithClusterConfig(params.ClusterConfig{ - Fleet: localFleet, - }), - WithCommunityTokensService(s.collectiblesServiceMock), + WithTestStoreNode(&s.Suite, localMailserverID, mailserverAddress, localFleet, s.collectiblesServiceMock), } messenger, err := newMessengerWithKey(shh, privateKey, logger, options) @@ -617,7 +601,6 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() { s.createOwner() s.createBob() - s.waitForAvailableStoreNode(s.owner) community := s.createCommunity(s.owner) expectedShard := &shard.Shard{ diff --git a/protocol/messenger_sync_settings_test.go b/protocol/messenger_sync_settings_test.go index f1dd9551b..37ac60b3f 100644 --- a/protocol/messenger_sync_settings_test.go +++ b/protocol/messenger_sync_settings_test.go @@ -134,7 +134,9 @@ func (s *MessengerSyncSettingsSuite) newMessenger() *Messenger { Currency: "eth", } - m, err := newTestMessenger(s.shh, testMessengerConfig{}, []Option{WithAppSettings(setting, config)}) + m, err := newTestMessenger(s.shh, testMessengerConfig{ + extraOptions: []Option{WithAppSettings(setting, config)}, + }) s.Require().NoError(err) return m } diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index 3c68bdbc1..a6cb233f4 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -313,7 +313,7 @@ func NewWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool, enableStor func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, useShardAsDefaultTopic bool, nodeNames []string) []types.Waku { nodes := make([]*waku2.Waku, len(nodeNames)) for i, name := range nodeNames { - logger := parentLogger.With(zap.String("name", name+"-waku")) + logger := parentLogger.Named(name + "-waku") wakuNode := NewWakuV2(s, logger, true, false, useShardAsDefaultTopic, 0) nodes[i] = wakuNode } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 0fe6fa0f0..32adde08c 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -608,6 +608,7 @@ func (t *Transport) waitForRequestCompleted(ctx context.Context, requestID []byt // ConfirmMessagesProcessed marks the messages as processed in the cache so // they won't be passed to the next layer anymore func (t *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error { + t.logger.Debug("confirming message processed", zap.Any("ids", ids), zap.Any("timestamp", timestamp)) return t.cache.Add(ids, timestamp) } @@ -625,6 +626,8 @@ func (t *Transport) SetEnvelopeEventsHandler(handler EnvelopeEventsHandler) erro } func (t *Transport) ClearProcessedMessageIDsCache() error { + t.logger.Debug("clearing processed messages cache") + t.waku.ClearEnvelopesCache() return t.cache.Clear() } diff --git a/waku/waku.go b/waku/waku.go index 29675b0a5..cf5a23926 100644 --- a/waku/waku.go +++ b/waku/waku.go @@ -1580,6 +1580,12 @@ func (w *Waku) MarkP2PMessageAsProcessed(hash gethcommon.Hash) { delete(w.p2pMsgIDs, hash) } +func (w *Waku) ClearEnvelopesCache() { + w.poolMu.Lock() + defer w.poolMu.Unlock() + w.envelopes = make(map[gethcommon.Hash]*common.Envelope) +} + func (w *Waku) Clean() error { w.poolMu.Lock() defer w.poolMu.Unlock() diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 95e79e83d..bc28228cc 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1510,6 +1510,12 @@ func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool { return exist } +func (w *Waku) ClearEnvelopesCache() { + w.poolMu.Lock() + defer w.poolMu.Unlock() + w.envelopes = make(map[gethcommon.Hash]*common.ReceivedMessage) +} + func (w *Waku) PeerCount() int { return w.node.PeerCount() }