feat: introduce `MessagesIterator`

Enables customization of iteration strategy for retrieved messages.
This commit is contained in:
Patryk Osmaczko 2024-02-06 13:43:06 +01:00 committed by osmaczko
parent 9b7926b23b
commit 7e8b61f888
2 changed files with 50 additions and 3 deletions

View File

@ -0,0 +1,40 @@
package protocol
import (
"golang.org/x/exp/maps"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
)
type MessagesIterator interface {
HasNext() bool
Next() (transport.Filter, []*types.Message)
}
type DefaultMessagesIterator struct {
chatWithMessages map[transport.Filter][]*types.Message
keys []transport.Filter
currentIndex int
}
func NewDefaultMessagesIterator(chatWithMessages map[transport.Filter][]*types.Message) MessagesIterator {
return &DefaultMessagesIterator{
chatWithMessages: chatWithMessages,
keys: maps.Keys(chatWithMessages),
currentIndex: 0,
}
}
func (it *DefaultMessagesIterator) HasNext() bool {
return it.currentIndex < len(it.keys)
}
func (it *DefaultMessagesIterator) Next() (transport.Filter, []*types.Message) {
if it.HasNext() {
key := it.keys[it.currentIndex]
it.currentIndex++
return key, it.chatWithMessages[key]
}
return transport.Filter{}, nil
}

View File

@ -181,6 +181,9 @@ type Messenger struct {
// used to track unhandled messages // used to track unhandled messages
unhandledMessagesTracker func(*v1protocol.StatusMessage, error) unhandledMessagesTracker func(*v1protocol.StatusMessage, error)
// enables control over chat messages iteration
retrievedMessagesIteratorFactory func(map[transport.Filter][]*types.Message) MessagesIterator
peersyncing *peersyncing.PeerSyncing peersyncing *peersyncing.PeerSyncing
peersyncingOffers map[string]uint64 peersyncingOffers map[string]uint64
peersyncingRequests map[string]uint64 peersyncingRequests map[string]uint64
@ -579,6 +582,7 @@ func NewMessenger(
}, },
logger: logger, logger: logger,
savedAddressesManager: savedAddressesManager, savedAddressesManager: savedAddressesManager,
retrievedMessagesIteratorFactory: NewDefaultMessagesIterator,
} }
if c.rpcClient != nil { if c.rpcClient != nil {
@ -3735,7 +3739,10 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Info("failed to retrieve admin communities", zap.Error(err)) logger.Info("failed to retrieve admin communities", zap.Error(err))
} }
for filter, messages := range chatWithMessages { iterator := m.retrievedMessagesIteratorFactory(chatWithMessages)
for iterator.HasNext() {
filter, messages := iterator.Next()
var processedMessages []string var processedMessages []string
for _, shhMessage := range messages { for _, shhMessage := range messages {
logger := logger.With(zap.String("hash", types.EncodeHex(shhMessage.Hash))) logger := logger.With(zap.String("hash", types.EncodeHex(shhMessage.Hash)))