diff --git a/protocol/messages_iterator.go b/protocol/messages_iterator.go new file mode 100644 index 000000000..ff761a8c3 --- /dev/null +++ b/protocol/messages_iterator.go @@ -0,0 +1,40 @@ +package protocol + +import ( + "golang.org/x/exp/maps" + + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/transport" +) + +type MessagesIterator interface { + HasNext() bool + Next() (transport.Filter, []*types.Message) +} + +type DefaultMessagesIterator struct { + chatWithMessages map[transport.Filter][]*types.Message + keys []transport.Filter + currentIndex int +} + +func NewDefaultMessagesIterator(chatWithMessages map[transport.Filter][]*types.Message) MessagesIterator { + return &DefaultMessagesIterator{ + chatWithMessages: chatWithMessages, + keys: maps.Keys(chatWithMessages), + currentIndex: 0, + } +} + +func (it *DefaultMessagesIterator) HasNext() bool { + return it.currentIndex < len(it.keys) +} + +func (it *DefaultMessagesIterator) Next() (transport.Filter, []*types.Message) { + if it.HasNext() { + key := it.keys[it.currentIndex] + it.currentIndex++ + return key, it.chatWithMessages[key] + } + return transport.Filter{}, nil +} diff --git a/protocol/messenger.go b/protocol/messenger.go index 9b5f2f5fc..a26d0e84e 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -181,6 +181,9 @@ type Messenger struct { // used to track unhandled messages unhandledMessagesTracker func(*v1protocol.StatusMessage, error) + // enables control over chat messages iteration + retrievedMessagesIteratorFactory func(map[transport.Filter][]*types.Message) MessagesIterator + peersyncing *peersyncing.PeerSyncing peersyncingOffers map[string]uint64 peersyncingRequests map[string]uint64 @@ -577,8 +580,9 @@ func NewMessenger( func() error { _ = logger.Sync; return nil }, database.Close, }, - logger: logger, - savedAddressesManager: savedAddressesManager, + logger: logger, + savedAddressesManager: savedAddressesManager, + retrievedMessagesIteratorFactory: NewDefaultMessagesIterator, } if c.rpcClient != nil { @@ -3735,7 +3739,10 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Info("failed to retrieve admin communities", zap.Error(err)) } - for filter, messages := range chatWithMessages { + iterator := m.retrievedMessagesIteratorFactory(chatWithMessages) + for iterator.HasNext() { + filter, messages := iterator.Next() + var processedMessages []string for _, shhMessage := range messages { logger := logger.With(zap.String("hash", types.EncodeHex(shhMessage.Hash)))