status-go/protocol/messenger_raw_message_resend.go
2024-05-02 05:40:49 +08:00

206 lines
6.1 KiB
Go

package protocol
import (
"context"
"fmt"
"math"
"time"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/status-im/status-go/protocol/common"
)
// watchExpiredMessages regularly checks for expired emojis and invoke their resending
func (m *Messenger) watchExpiredMessages() {
m.logger.Debug("watching expired messages")
go func() {
for {
select {
case <-time.After(time.Second):
if m.Online() {
err := m.resendExpiredMessages()
if err != nil {
m.logger.Debug("failed to resend expired message", zap.Error(err))
}
}
case <-m.quit:
return
}
}
}()
}
func (m *Messenger) resendExpiredMessages() error {
if m.connectionState.Offline {
return errors.New("offline")
}
ids, err := m.persistence.ExpiredMessagesIDs(m.config.messageResendMaxCount)
if err != nil {
return errors.Wrapf(err, "Can't get expired reactions from db")
}
for _, id := range ids {
message, shouldResend, err := m.processMessageID(id)
if err != nil {
m.logger.Error("Error processing message ID when trying resend raw message", zap.String("id", id), zap.Error(err))
} else if shouldResend {
m.logger.Debug("Resent raw message",
zap.String("id", id),
zap.Any("message type", message.MessageType),
zap.Int("send count", message.SendCount),
)
}
}
return nil
}
func (m *Messenger) processMessageID(id string) (*common.RawMessage, bool, error) {
rawMessage, err := m.persistence.RawMessageByID(id)
if err != nil {
return nil, false, errors.Wrap(err, "Can't get raw message by ID")
}
shouldResend, err := m.shouldResendMessage(rawMessage, m.getTimesource())
if err != nil {
m.logger.Error("Can't check if message should be resent", zap.Error(err))
return rawMessage, false, err
}
if !shouldResend {
return rawMessage, false, nil
}
switch rawMessage.ResendMethod {
case common.ResendMethodSendCommunityMessage:
err = m.handleSendCommunityMessage(rawMessage)
case common.ResendMethodSendPrivate:
err = m.handleSendPrivateMessage(rawMessage)
case common.ResendMethodDynamic:
shouldResend, err = m.handleOtherResendMethods(rawMessage)
default:
err = errors.New("Unknown resend method")
}
return rawMessage, shouldResend, err
}
func (m *Messenger) handleSendCommunityMessage(rawMessage *common.RawMessage) error {
_, err := m.sender.SendCommunityMessage(context.TODO(), rawMessage)
if err != nil {
err = errors.Wrap(err, "Can't resend message with SendCommunityMessage")
}
m.upsertRawMessageToWatch(rawMessage)
return err
}
func (m *Messenger) handleSendPrivateMessage(rawMessage *common.RawMessage) error {
if len(rawMessage.Recipients) == 0 {
m.logger.Error("No recipients to resend message", zap.String("id", rawMessage.ID))
m.upsertRawMessageToWatch(rawMessage)
return errors.New("No recipients to resend message with SendPrivate")
}
var err error
for _, r := range rawMessage.Recipients {
_, err = m.sender.SendPrivate(context.TODO(), r, rawMessage)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("Can't resend message with SendPrivate to %s", common.PubkeyToHex(r)))
}
}
m.upsertRawMessageToWatch(rawMessage)
return err
}
func (m *Messenger) handleOtherResendMethods(rawMessage *common.RawMessage) (bool, error) {
chat, ok := m.allChats.Load(rawMessage.LocalChatID)
if !ok {
m.logger.Error("Can't find chat with id", zap.String("id", rawMessage.LocalChatID))
return false, nil // Continue with next message if chat not found
}
if !(chat.Public() || chat.CommunityChat()) {
return false, nil // Only resend for public or community chats
}
if ok {
err := m.persistence.SaveRawMessage(rawMessage)
if err != nil {
m.logger.Error("Can't save raw message marked as expired", zap.Error(err))
return true, err
}
}
return true, m.reSendRawMessage(context.Background(), rawMessage.ID)
}
func (m *Messenger) shouldResendMessage(message *common.RawMessage, t common.TimeSource) (bool, error) {
if m.featureFlags.ResendRawMessagesDisabled {
return false, nil
}
//exponential backoff depends on how many attempts to send message already made
power := math.Pow(2, float64(message.SendCount-1))
backoff := uint64(power) * uint64(m.config.messageResendMinDelay.Milliseconds())
backoffElapsed := t.GetCurrentTime() > (message.LastSent + backoff)
return backoffElapsed, nil
}
// pull a message from the database and send it again
func (m *Messenger) reSendRawMessage(ctx context.Context, messageID string) error {
message, err := m.persistence.RawMessageByID(messageID)
if err != nil {
return err
}
chat, ok := m.allChats.Load(message.LocalChatID)
if !ok {
return errors.New("chat not found")
}
_, err = m.dispatchMessage(ctx, common.RawMessage{
LocalChatID: chat.ID,
Payload: message.Payload,
PubsubTopic: message.PubsubTopic,
MessageType: message.MessageType,
Recipients: message.Recipients,
ResendType: message.ResendType,
SendCount: message.SendCount,
})
return err
}
// UpsertRawMessageToWatch insert/update the rawMessage to the database, resend it if necessary.
// relate watch method: Messenger#watchExpiredMessages
func (m *Messenger) UpsertRawMessageToWatch(rawMessage *common.RawMessage) (*common.RawMessage, error) {
rawMessage.SendCount++
rawMessage.LastSent = m.getTimesource().GetCurrentTime()
err := m.persistence.SaveRawMessage(rawMessage)
if err != nil {
return nil, err
}
return rawMessage, nil
}
func (m *Messenger) upsertRawMessageToWatch(rawMessage *common.RawMessage) {
_, err := m.UpsertRawMessageToWatch(rawMessage)
if err != nil {
// this is unlikely to happen, but we should log it
m.logger.Error("Can't upsert raw message after SendCommunityMessage", zap.Error(err), zap.String("id", rawMessage.ID))
}
}
func (m *Messenger) RawMessagesIDsByType(t protobuf.ApplicationMetadataMessage_Type) ([]string, error) {
return m.persistence.RawMessagesIDsByType(t)
}
func (m *Messenger) RawMessageByID(id string) (*common.RawMessage, error) {
return m.persistence.RawMessageByID(id)
}
func (m *Messenger) UpdateRawMessageSent(id string, sent bool, lastSent uint64) error {
return m.persistence.UpdateRawMessageSent(id, sent, lastSent)
}