Cache waku messages

This commit re-introduces a feature that we lost during the migration to
status-go.
Messages are cached for a couple of days if processed correctly by
status-go, to avoid performance issues.
This commit is contained in:
Andrea Maria Piana 2021-01-08 16:21:25 +01:00
parent fbec17af18
commit e3969a7752
9 changed files with 209 additions and 5 deletions

View File

@ -1 +1 @@
0.68.5
0.68.6

View File

@ -736,7 +736,7 @@ func _0018_profile_pictures_visibilityUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "0018_profile_pictures_visibility.up.sql", size: 84, mode: os.FileMode(0644), modTime: time.Unix(1610637901, 0)}
info := bindataFileInfo{name: "0018_profile_pictures_visibility.up.sql", size: 84, mode: os.FileMode(0644), modTime: time.Unix(1610715642, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xc9, 0xe3, 0xc5, 0xec, 0x83, 0x55, 0x45, 0x57, 0x7a, 0xaa, 0xd2, 0xa7, 0x59, 0xa7, 0x87, 0xef, 0x63, 0x19, 0x9c, 0x46, 0x9c, 0xc5, 0x32, 0x89, 0xa4, 0x68, 0x70, 0xd8, 0x83, 0x43, 0xa4, 0x72}}
return a, nil
}

View File

@ -61,6 +61,9 @@ const emojiResendMaxCount = 3
var communityAdvertiseIntervalSecond int64 = 60 * 60
// messageCacheIntervalMs is how long we should keep processed messages in the cache, in ms
var messageCacheIntervalMs uint64 = 1000 * 60 * 60 * 48
// Messenger is a entity managing chats and messages.
// It acts as a bridge between the application and encryption
// layers.
@ -722,6 +725,10 @@ func (m *Messenger) handleEncryptionLayerSubscriptions(subscriptions *encryption
if err := m.handleSendContactCode(); err != nil {
m.logger.Error("failed to publish contact code", zap.Error(err))
}
// we also piggy-back to clean up cached messages
if err := m.transport.CleanMessagesProcessed(m.getTimesource().GetCurrentTime() - messageCacheIntervalMs); err != nil {
m.logger.Error("failed to clean processed messages", zap.Error(err))
}
case <-subscriptions.Quit:
m.logger.Debug("quitting encryption subscription loop")
@ -2709,7 +2716,10 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger := m.logger.With(zap.String("site", "RetrieveAll"))
for _, messages := range chatWithMessages {
var processedMessages []string
for _, shhMessage := range messages {
// Indicates tha all messages in the batch have been processed correctly
allMessagesProcessed := true
statusMessages, err := m.processor.HandleMessages(shhMessage, true)
if err != nil {
logger.Info("failed to decode messages", zap.Error(err))
@ -2751,6 +2761,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
c, err := buildContact(senderID, publicKey)
if err != nil {
logger.Info("failed to build contact", zap.Error(err))
allMessagesProcessed = false
continue
}
contact = c
@ -2775,6 +2786,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleMembershipUpdate(messageState, messageState.AllChats[rawMembershipUpdate.ChatId], rawMembershipUpdate, m.systemMessagesTranslations)
if err != nil {
logger.Warn("failed to handle MembershipUpdate", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2784,6 +2796,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleChatMessage(messageState)
if err != nil {
logger.Warn("failed to handle ChatMessage", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2797,6 +2810,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandlePairInstallation(messageState, p)
if err != nil {
logger.Warn("failed to handle PairInstallation", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2811,6 +2825,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleSyncInstallationContact(messageState, p)
if err != nil {
logger.Warn("failed to handle SyncInstallationContact", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2830,6 +2845,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err := m.reregisterForPushNotifications()
if err != nil {
allMessagesProcessed = false
logger.Warn("could not re-register for push notifications", zap.Error(err))
continue
}
@ -2841,6 +2857,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleRequestAddressForTransaction(messageState, command)
if err != nil {
logger.Warn("failed to handle RequestAddressForTransaction", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2850,6 +2867,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleSendTransaction(messageState, command)
if err != nil {
logger.Warn("failed to handle SendTransaction", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2859,6 +2877,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleAcceptRequestAddressForTransaction(messageState, command)
if err != nil {
logger.Warn("failed to handle AcceptRequestAddressForTransaction", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2868,6 +2887,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleDeclineRequestAddressForTransaction(messageState, command)
if err != nil {
logger.Warn("failed to handle DeclineRequestAddressForTransaction", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2877,6 +2897,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleDeclineRequestTransaction(messageState, command)
if err != nil {
logger.Warn("failed to handle DeclineRequestTransaction", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2886,6 +2907,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleRequestTransaction(messageState, command)
if err != nil {
logger.Warn("failed to handle RequestTransaction", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -2895,6 +2917,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleContactUpdate(messageState, contactUpdate)
if err != nil {
logger.Warn("failed to handle ContactUpdate", zap.Error(err))
allMessagesProcessed = false
continue
}
case protobuf.PushNotificationQuery:
@ -2904,6 +2927,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
logger.Debug("Handling PushNotificationQuery")
if err := m.pushNotificationServer.HandlePushNotificationQuery(publicKey, msg.ID, msg.ParsedMessage.Interface().(protobuf.PushNotificationQuery)); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationQuery", zap.Error(err))
}
// We continue in any case, no changes to messenger
@ -2915,6 +2939,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
logger.Debug("Handling PushNotificationRegistrationResponse")
if err := m.pushNotificationClient.HandlePushNotificationRegistrationResponse(publicKey, msg.ParsedMessage.Interface().(protobuf.PushNotificationRegistrationResponse)); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationRegistrationResponse", zap.Error(err))
}
// We continue in any case, no changes to messenger
@ -2928,6 +2953,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Debug("Received ContactCodeAdvertisement ChatIdentity")
err = m.handler.HandleChatIdentity(messageState, *cca.ChatIdentity)
if err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle ContactCodeAdvertisement ChatIdentity", zap.Error(err))
// No continue as Chat Identity may fail but the rest of the cca may process fine.
}
@ -2938,6 +2964,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
logger.Debug("Handling ContactCodeAdvertisement")
if err := m.pushNotificationClient.HandleContactCodeAdvertisement(publicKey, cca); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle ContactCodeAdvertisement", zap.Error(err))
}
@ -2951,6 +2978,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
logger.Debug("Handling PushNotificationResponse")
if err := m.pushNotificationClient.HandlePushNotificationResponse(publicKey, msg.ParsedMessage.Interface().(protobuf.PushNotificationResponse)); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationResponse", zap.Error(err))
}
// We continue in any case, no changes to messenger
@ -2963,6 +2991,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
logger.Debug("Handling PushNotificationQueryResponse")
if err := m.pushNotificationClient.HandlePushNotificationQueryResponse(publicKey, msg.ParsedMessage.Interface().(protobuf.PushNotificationQueryResponse)); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationQueryResponse", zap.Error(err))
}
// We continue in any case, no changes to messenger
@ -2975,6 +3004,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
logger.Debug("Handling PushNotificationRequest")
if err := m.pushNotificationServer.HandlePushNotificationRequest(publicKey, msg.ID, msg.ParsedMessage.Interface().(protobuf.PushNotificationRequest)); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationRequest", zap.Error(err))
}
// We continue in any case, no changes to messenger
@ -2984,6 +3014,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleEmojiReaction(messageState, msg.ParsedMessage.Interface().(protobuf.EmojiReaction))
if err != nil {
logger.Warn("failed to handle EmojiReaction", zap.Error(err))
allMessagesProcessed = false
continue
}
case protobuf.GroupChatInvitation:
@ -2991,6 +3022,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleGroupChatInvitation(messageState, msg.ParsedMessage.Interface().(protobuf.GroupChatInvitation))
if err != nil {
logger.Warn("failed to handle GroupChatInvitation", zap.Error(err))
allMessagesProcessed = false
continue
}
case protobuf.ChatIdentity:
@ -2998,6 +3030,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleChatIdentity(messageState, msg.ParsedMessage.Interface().(protobuf.ChatIdentity))
if err != nil {
logger.Warn("failed to handle ChatIdentity", zap.Error(err))
allMessagesProcessed = false
continue
}
@ -3006,6 +3039,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleCommunityDescription(messageState, publicKey, msg.ParsedMessage.Interface().(protobuf.CommunityDescription), msg.DecryptedPayload)
if err != nil {
logger.Warn("failed to handle CommunityDescription", zap.Error(err))
allMessagesProcessed = false
continue
}
case protobuf.CommunityInvitation:
@ -3014,6 +3048,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
err = m.handler.HandleCommunityInvitation(messageState, publicKey, invitation, invitation.CommunityDescription)
if err != nil {
logger.Warn("failed to handle CommunityDescription", zap.Error(err))
allMessagesProcessed = false
continue
}
default:
@ -3025,6 +3060,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
}
logger.Debug("Handling PushNotificationRegistration")
if err := m.pushNotificationServer.HandlePushNotificationRegistration(publicKey, msg.ParsedMessage.Interface().([]byte)); err != nil {
allMessagesProcessed = false
logger.Warn("failed to handle PushNotificationRegistration", zap.Error(err))
}
// We continue in any case, no changes to messenger
@ -3038,6 +3074,16 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Debug("parsed message is nil")
}
}
if allMessagesProcessed {
processedMessages = append(processedMessages, types.EncodeHex(shhMessage.Hash))
}
}
if len(processedMessages) != 0 {
if err := m.transport.ConfirmMessagesProcessed(processedMessages, m.getTimesource().GetCurrentTime()); err != nil {
logger.Warn("failed to confirm processed messages", zap.Error(err))
}
}
}

View File

@ -20,6 +20,7 @@
// 1603816533_add_links.up.sql (48B)
// 1603888149_create_chat_identity_last_published_table.up.sql (407B)
// 1605075346_add_communities.up.sql (6.971kB)
// 1610117927_add_message_cache.up.sql (142B)
// README.md (554B)
// doc.go (850B)
@ -485,11 +486,31 @@ func _1605075346_add_communitiesUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1605075346_add_communities.up.sql", size: 6971, mode: os.FileMode(0644), modTime: time.Unix(1610363715, 0)}
info := bindataFileInfo{name: "1605075346_add_communities.up.sql", size: 6971, mode: os.FileMode(0644), modTime: time.Unix(1610638422, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x1f, 0x64, 0xea, 0xb4, 0xae, 0x9e, 0xdb, 0x9, 0x58, 0xb6, 0x5c, 0x7a, 0x50, 0xc5, 0xfe, 0x93, 0x5d, 0x36, 0x85, 0x5d, 0x6a, 0xba, 0xc9, 0x7e, 0x84, 0xd7, 0xbf, 0x2a, 0x53, 0xf3, 0x97, 0xf1}}
return a, nil
}
var __1610117927_add_message_cacheUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x3c\xcb\xb1\xaa\xc2\x30\x14\x06\xe0\xbd\xd0\x77\xf8\xc7\x7b\xc1\x37\x70\x3a\x86\x53\x0c\xc6\xb4\x9c\x1e\xc5\x4e\x25\xd4\xa0\x1d\xa2\xa5\xc9\xfb\x23\x75\x70\xfe\xf8\x8c\x30\x29\x43\xe9\xe0\x18\xb6\x81\x6f\x15\x7c\xb3\xbd\xf6\x28\x6b\x78\xe5\xe5\xbd\x96\x31\xc5\x9c\xc3\x23\x8e\x53\x98\x9e\x11\x7f\x75\x05\xcc\x77\x5c\x49\xcc\x91\xe4\x5b\xfc\xc5\x39\x74\x62\xcf\x24\x03\x4e\x3c\xa0\xf5\x30\xad\x6f\x9c\x35\x0a\xe1\xce\x91\xe1\xdd\xf6\xca\x9c\x62\x2e\x21\x2d\xb0\x5e\x7f\x75\x93\xff\x7d\x5d\x7d\x02\x00\x00\xff\xff\xcd\x45\x6b\x55\x8e\x00\x00\x00")
func _1610117927_add_message_cacheUpSqlBytes() ([]byte, error) {
return bindataRead(
__1610117927_add_message_cacheUpSql,
"1610117927_add_message_cache.up.sql",
)
}
func _1610117927_add_message_cacheUpSql() (*asset, error) {
bytes, err := _1610117927_add_message_cacheUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1610117927_add_message_cache.up.sql", size: 142, mode: os.FileMode(0644), modTime: time.Unix(1610715642, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x34, 0xf1, 0xf0, 0x82, 0x79, 0x28, 0x19, 0xc2, 0x39, 0x6a, 0xa5, 0x96, 0x59, 0x23, 0xa0, 0xed, 0x60, 0x58, 0x86, 0x9, 0xb9, 0xad, 0xfb, 0xa, 0xe3, 0x47, 0x6e, 0xa1, 0x18, 0xe8, 0x39, 0x2c}}
return a, nil
}
var _readmeMd = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\x91\xc1\xce\xd3\x30\x10\x84\xef\x7e\x8a\x91\x7a\x01\xa9\x2a\x8f\xc0\x0d\x71\x82\x03\x48\x1c\xc9\x36\x9e\x36\x96\x1c\x6f\xf0\xae\x93\xe6\xed\x91\xa3\xc2\xdf\xff\x66\xed\xd8\x33\xdf\x78\x4f\xa7\x13\xbe\xea\x06\x57\x6c\x35\x39\x31\xa7\x7b\x15\x4f\x5a\xec\x73\x08\xbf\x08\x2d\x79\x7f\x4a\x43\x5b\x86\x17\xfd\x8c\x21\xea\x56\x5e\x47\x90\x4a\x14\x75\x48\xde\x64\x37\x2c\x6a\x96\xae\x99\x48\x05\xf6\x27\x77\x13\xad\x08\xae\x8a\x51\xe7\x25\xf3\xf1\xa9\x9f\xf9\x58\x58\x2c\xad\xbc\xe0\x8b\x56\xf0\x21\x5d\xeb\x4c\x95\xb3\xae\x84\x60\xd4\xdc\xe6\x82\x5d\x1b\x36\x6d\x39\x62\x92\xf5\xb8\x11\xdb\x92\xd3\x28\xce\xe0\x13\xe1\x72\xcd\x3c\x63\xd4\x65\x87\xae\xac\xe8\xc3\x28\x2e\x67\x44\x66\x3a\x21\x25\xa2\x72\xac\x14\x67\xbc\x84\x9f\x53\x32\x8c\x52\x70\x25\x56\xd6\xfd\x8d\x05\x37\xad\x30\x9d\x9f\xa6\x86\x0f\xcd\x58\x7f\xcf\x34\x93\x3b\xed\x90\x9f\xa4\x1f\xcf\x30\x85\x4d\x07\x58\xaf\x7f\x25\xc4\x9d\xf3\x72\x64\x84\xd0\x7f\xf9\x9b\x3a\x2d\x84\xef\x85\x48\x66\x8d\xd8\x88\x9b\x8c\x8c\x98\x5b\xf6\x74\x14\x4e\x33\x0d\xc9\xe0\x93\x38\xda\x12\xc5\x69\xbd\xe4\xf0\x2e\x7a\x78\x07\x1c\xfe\x13\x9f\x91\x29\x31\x95\x7b\x7f\x62\x59\x37\xb4\xe5\x5e\x25\xfe\x33\xee\xd5\x53\x71\xd6\xda\x3a\xd8\xcb\xde\x2e\xf8\xa1\x90\x55\x53\x0c\xc7\xaa\x0d\xe9\x76\x14\x29\x1c\x7b\x68\xdd\x2f\xe1\x6f\x00\x00\x00\xff\xff\x3c\x0a\xc2\xfe\x2a\x02\x00\x00")
func readmeMdBytes() ([]byte, error) {
@ -505,7 +526,7 @@ func readmeMd() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "README.md", size: 554, mode: os.FileMode(0644), modTime: time.Unix(1610363715, 0)}
info := bindataFileInfo{name: "README.md", size: 554, mode: os.FileMode(0644), modTime: time.Unix(1610638422, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x1c, 0x6e, 0xfb, 0xcc, 0x81, 0x94, 0x4d, 0x8c, 0xa0, 0x3b, 0x5, 0xb0, 0x18, 0xd6, 0xbb, 0xb3, 0x79, 0xc8, 0x8f, 0xff, 0xc1, 0x10, 0xf9, 0xf, 0x20, 0x1b, 0x4a, 0x74, 0x96, 0x42, 0xd7, 0xa8}}
return a, nil
}
@ -661,6 +682,8 @@ var _bindata = map[string]func() (*asset, error){
"1605075346_add_communities.up.sql": _1605075346_add_communitiesUpSql,
"1610117927_add_message_cache.up.sql": _1610117927_add_message_cacheUpSql,
"README.md": readmeMd,
"doc.go": docGo,
@ -727,6 +750,7 @@ var _bintree = &bintree{nil, map[string]*bintree{
"1603816533_add_links.up.sql": &bintree{_1603816533_add_linksUpSql, map[string]*bintree{}},
"1603888149_create_chat_identity_last_published_table.up.sql": &bintree{_1603888149_create_chat_identity_last_published_tableUpSql, map[string]*bintree{}},
"1605075346_add_communities.up.sql": &bintree{_1605075346_add_communitiesUpSql, map[string]*bintree{}},
"1610117927_add_message_cache.up.sql": &bintree{_1610117927_add_message_cacheUpSql, map[string]*bintree{}},
"README.md": &bintree{readmeMd, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}

View File

@ -0,0 +1,4 @@
CREATE TABLE IF NOT EXISTS transport_message_cache (
id VARCHAR NOT NULL PRIMARY KEY ON CONFLICT REPLACE,
timestamp INT NOT NULL
);

View File

@ -0,0 +1,81 @@
package transport
import (
"context"
"database/sql"
"strings"
)
type ProcessedMessageIDsCache struct {
db *sql.DB
}
func NewProcessedMessageIDsCache(db *sql.DB) *ProcessedMessageIDsCache {
return &ProcessedMessageIDsCache{db: db}
}
func (c *ProcessedMessageIDsCache) Hits(ids []string) (map[string]bool, error) {
hits := make(map[string]bool)
idsArgs := make([]interface{}, 0, len(ids))
for _, id := range ids {
idsArgs = append(idsArgs, id)
}
inVector := strings.Repeat("?, ", len(ids)-1) + "?"
query := "SELECT id FROM transport_message_cache WHERE id IN (" + inVector + ")" // nolint: gosec
rows, err := c.db.Query(query, idsArgs...)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var id string
err := rows.Scan(&id)
if err != nil {
return nil, err
}
hits[id] = true
}
return hits, nil
}
func (c *ProcessedMessageIDsCache) Add(ids []string, timestamp uint64) (err error) {
var tx *sql.Tx
tx, err = c.db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
// don't shadow original error
_ = tx.Rollback()
}()
for _, id := range ids {
var stmt *sql.Stmt
stmt, err = tx.Prepare(`INSERT INTO transport_message_cache(id,timestamp) VALUES (?, ?)`)
if err != nil {
return
}
_, err = stmt.Exec(id, timestamp)
if err != nil {
return
}
}
return
}
func (c *ProcessedMessageIDsCache) Clean(timestamp uint64) error {
_, err := c.db.Exec(`REMOVE FROM transport_message_cache WHERE timestamp < ?`, timestamp)
return err
}

View File

@ -42,5 +42,8 @@ type Transport interface {
ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Filter, error)
RetrieveRawAll() (map[Filter][]*types.Message, error)
ConfirmMessagesProcessed([]string, uint64) error
CleanMessagesProcessed(uint64) error
SetEnvelopeEventsHandler(handler EnvelopeEventsHandler) error
}

View File

@ -67,6 +67,7 @@ type Transport struct {
keysManager *wakuServiceKeysManager
filters *transport.FiltersManager
logger *zap.Logger
cache *transport.ProcessedMessageIDsCache
mailservers []string
envelopesMonitor *EnvelopesMonitor
@ -103,6 +104,7 @@ func NewTransport(
t := &Transport{
waku: waku,
api: api,
cache: transport.NewProcessedMessageIDsCache(db),
envelopesMonitor: envelopesMonitor,
keysManager: &wakuServiceKeysManager{
waku: waku,
@ -217,9 +219,32 @@ func (a *Transport) RetrieveRawAll() (map[transport.Filter][]*types.Message, err
for _, filter := range allFilters {
msgs, err := a.api.GetFilterMessages(filter.FilterID)
if err != nil {
a.logger.Warn("failed to fetch messages", zap.Error(err))
continue
}
result[*filter] = append(result[*filter], msgs...)
if len(msgs) == 0 {
continue
}
ids := make([]string, len(msgs))
for i := range msgs {
id := types.EncodeHex(msgs[i].Hash)
ids[i] = id
}
hits, err := a.cache.Hits(ids)
if err != nil {
a.logger.Error("failed to check messages exists", zap.Error(err))
return nil, err
}
for i := range msgs {
// Exclude anything that is a cache hit
if !hits[types.EncodeHex(msgs[i].Hash)] {
result[*filter] = append(result[*filter], msgs[i])
}
}
}
return result, nil
@ -390,6 +415,17 @@ func (a *Transport) waitForRequestCompleted(ctx context.Context, requestID []byt
}
}
// ConfirmMessagesProcessed marks the messages as processed in the cache so
// they won't be passed to the next layer anymore
func (a *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error {
return a.cache.Add(ids, timestamp)
}
// CleanMessagesProcessed clears the messages that are older than timestamp
func (a *Transport) CleanMessagesProcessed(timestamp uint64) error {
return a.cache.Clean(timestamp)
}
func (a *Transport) SetEnvelopeEventsHandler(handler transport.EnvelopeEventsHandler) error {
if a.envelopesMonitor == nil {
return errors.New("Current transport has no envelopes monitor")

View File

@ -445,6 +445,16 @@ func (a *Transport) waitForRequestCompleted(ctx context.Context, requestID []byt
}
}
// NOTE: currently not used as whisper is not maintained anymore
func (a *Transport) ConfirmMessagesProcessed(ids []string, timestamp uint64) error {
return nil
}
// NOTE: currently not used as whisper is not maintained anymore
func (a *Transport) CleanMessagesProcessed(timestamp uint64) error {
return nil
}
func (a *Transport) SetEnvelopeEventsHandler(handler transport.EnvelopeEventsHandler) error {
if a.envelopesMonitor == nil {
return errors.New("Current transport has no envelopes monitor")