Allow sending messages offline (non-datasync messages)
This commit is contained in:
parent
83c3849899
commit
3564630c33
|
@ -64,8 +64,8 @@ const (
|
||||||
privateChat chatContext = "private-chat"
|
privateChat chatContext = "private-chat"
|
||||||
)
|
)
|
||||||
|
|
||||||
const emojiResendMinDelay = 30
|
const messageResendMinDelay = 30
|
||||||
const emojiResendMaxCount = 3
|
const messageResendMaxCount = 3
|
||||||
|
|
||||||
var communityAdvertiseIntervalSecond int64 = 60 * 60
|
var communityAdvertiseIntervalSecond int64 = 60 * 60
|
||||||
|
|
||||||
|
@ -140,9 +140,16 @@ func (interceptor EnvelopeEventsInterceptor) EnvelopeSent(identifiers [][]byte)
|
||||||
err := interceptor.Messenger.processSentMessages(ids)
|
err := interceptor.Messenger.processSentMessages(ids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
interceptor.Messenger.logger.Info("Messenger failed to process sent messages", zap.Error(err))
|
interceptor.Messenger.logger.Info("Messenger failed to process sent messages", zap.Error(err))
|
||||||
|
} else {
|
||||||
|
interceptor.EnvelopeEventsHandler.EnvelopeSent(identifiers)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// NOTE(rasom): In case if interceptor.Messenger is not nil and
|
||||||
|
// some error occurred on processing sent message we don't want
|
||||||
|
// to send envelop.sent signal to the client, thus `else` cause
|
||||||
|
// is necessary.
|
||||||
|
interceptor.EnvelopeEventsHandler.EnvelopeSent(identifiers)
|
||||||
}
|
}
|
||||||
interceptor.EnvelopeEventsHandler.EnvelopeSent(identifiers)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnvelopeExpired triggered when envelope is expired but wasn't delivered to any peer.
|
// EnvelopeExpired triggered when envelope is expired but wasn't delivered to any peer.
|
||||||
|
@ -404,6 +411,10 @@ func NewMessenger(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Messenger) processSentMessages(ids []string) error {
|
func (m *Messenger) processSentMessages(ids []string) error {
|
||||||
|
if m.connectionState.Offline {
|
||||||
|
return errors.New("Can't mark message as sent while offline")
|
||||||
|
}
|
||||||
|
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
rawMessage, err := m.persistence.RawMessageByID(id)
|
rawMessage, err := m.persistence.RawMessageByID(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -416,32 +427,42 @@ func (m *Messenger) processSentMessages(ids []string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "Can't save raw message marked as sent")
|
return errors.Wrapf(err, "Can't save raw message marked as sent")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = m.UpdateMessageOutgoingStatus(id, "sent")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldResendEmojiReaction(message *common.RawMessage, t common.TimeSource) (bool, error) {
|
func shouldResendMessage(message *common.RawMessage, t common.TimeSource) (bool, error) {
|
||||||
if message.MessageType != protobuf.ApplicationMetadataMessage_EMOJI_REACTION {
|
if !(message.MessageType == protobuf.ApplicationMetadataMessage_EMOJI_REACTION ||
|
||||||
return false, errors.New("Should resend only emoji reactions")
|
message.MessageType == protobuf.ApplicationMetadataMessage_CHAT_MESSAGE) {
|
||||||
|
return false, errors.Errorf("Should resend only specific types of messages, can't resend %v", message.MessageType)
|
||||||
}
|
}
|
||||||
|
|
||||||
if message.Sent {
|
if message.Sent {
|
||||||
return false, errors.New("Should resend only non-sent messages")
|
return false, errors.New("Should resend only non-sent messages")
|
||||||
}
|
}
|
||||||
|
|
||||||
if message.SendCount > emojiResendMaxCount {
|
if message.SendCount > messageResendMaxCount {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//exponential backoff depends on how many attempts to send message already made
|
//exponential backoff depends on how many attempts to send message already made
|
||||||
backoff := uint64(math.Pow(2, float64(message.SendCount-1))) * emojiResendMinDelay * uint64(time.Second)
|
backoff := uint64(math.Pow(2, float64(message.SendCount-1))) * messageResendMinDelay * uint64(time.Second.Milliseconds())
|
||||||
backoffElapsed := t.GetCurrentTime() > (message.LastSent + backoff)
|
backoffElapsed := t.GetCurrentTime() > (message.LastSent + backoff)
|
||||||
return backoffElapsed, nil
|
return backoffElapsed, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Messenger) resendExpiredEmojiReactions() error {
|
func (m *Messenger) resendExpiredMessages() error {
|
||||||
ids, err := m.persistence.ExpiredEmojiReactionsIDs(emojiResendMaxCount)
|
if m.connectionState.Offline {
|
||||||
|
return errors.New("offline")
|
||||||
|
}
|
||||||
|
|
||||||
|
ids, err := m.persistence.ExpiredMessagesIDs(messageResendMaxCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "Can't get expired reactions from db")
|
return errors.Wrapf(err, "Can't get expired reactions from db")
|
||||||
}
|
}
|
||||||
|
@ -452,7 +473,21 @@ func (m *Messenger) resendExpiredEmojiReactions() error {
|
||||||
return errors.Wrapf(err, "Can't get raw message with id %v", id)
|
return errors.Wrapf(err, "Can't get raw message with id %v", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ok, err := shouldResendEmojiReaction(rawMessage, m.getTimesource()); ok {
|
chat, ok := m.allChats.Load(rawMessage.LocalChatID)
|
||||||
|
if !ok {
|
||||||
|
return ErrChatNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
if !(chat.Public() || chat.CommunityChat()) {
|
||||||
|
return errors.New("Only public chats and community chats messages are resent")
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, err = shouldResendMessage(rawMessage, m.getTimesource())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok {
|
||||||
err = m.persistence.SaveRawMessage(rawMessage)
|
err = m.persistence.SaveRawMessage(rawMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "Can't save raw message marked as non-expired")
|
return errors.Wrapf(err, "Can't save raw message marked as non-expired")
|
||||||
|
@ -462,8 +497,6 @@ func (m *Messenger) resendExpiredEmojiReactions() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "Can't resend expired message with id %v", rawMessage.ID)
|
return errors.Wrapf(err, "Can't resend expired message with id %v", rawMessage.ID)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -524,7 +557,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
|
||||||
m.handleConnectionChange(m.online())
|
m.handleConnectionChange(m.online())
|
||||||
m.handleENSVerificationSubscription(ensSubscription)
|
m.handleENSVerificationSubscription(ensSubscription)
|
||||||
m.watchConnectionChange()
|
m.watchConnectionChange()
|
||||||
m.watchExpiredEmojis()
|
m.watchExpiredMessages()
|
||||||
m.watchIdentityImageChanges()
|
m.watchIdentityImageChanges()
|
||||||
m.broadcastLatestUserStatus()
|
m.broadcastLatestUserStatus()
|
||||||
m.startBackupLoop()
|
m.startBackupLoop()
|
||||||
|
@ -998,15 +1031,15 @@ func (m *Messenger) watchConnectionChange() {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchExpiredEmojis regularly checks for expired emojis and invoke their resending
|
// watchExpiredMessages regularly checks for expired emojis and invoke their resending
|
||||||
func (m *Messenger) watchExpiredEmojis() {
|
func (m *Messenger) watchExpiredMessages() {
|
||||||
m.logger.Debug("watching expired emojis")
|
m.logger.Debug("watching expired messages")
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
if m.online() {
|
if m.online() {
|
||||||
err := m.resendExpiredEmojiReactions()
|
err := m.resendExpiredMessages()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.logger.Debug("Error when resending expired emoji reactions", zap.Error(err))
|
m.logger.Debug("Error when resending expired emoji reactions", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -2242,7 +2242,7 @@ func (s *MessengerSuite) TestLastSentField() {
|
||||||
|
|
||||||
func (s *MessengerSuite) TestShouldResendEmoji() {
|
func (s *MessengerSuite) TestShouldResendEmoji() {
|
||||||
// shouldn't try to resend non-emoji messages.
|
// shouldn't try to resend non-emoji messages.
|
||||||
ok, err := shouldResendEmojiReaction(&common.RawMessage{
|
ok, err := shouldResendMessage(&common.RawMessage{
|
||||||
MessageType: protobuf.ApplicationMetadataMessage_CONTACT_UPDATE,
|
MessageType: protobuf.ApplicationMetadataMessage_CONTACT_UPDATE,
|
||||||
Sent: false,
|
Sent: false,
|
||||||
SendCount: 2,
|
SendCount: 2,
|
||||||
|
@ -2251,7 +2251,7 @@ func (s *MessengerSuite) TestShouldResendEmoji() {
|
||||||
s.False(ok)
|
s.False(ok)
|
||||||
|
|
||||||
// shouldn't try to resend already sent message
|
// shouldn't try to resend already sent message
|
||||||
ok, err = shouldResendEmojiReaction(&common.RawMessage{
|
ok, err = shouldResendMessage(&common.RawMessage{
|
||||||
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
||||||
Sent: true,
|
Sent: true,
|
||||||
SendCount: 1,
|
SendCount: 1,
|
||||||
|
@ -2260,50 +2260,50 @@ func (s *MessengerSuite) TestShouldResendEmoji() {
|
||||||
s.False(ok)
|
s.False(ok)
|
||||||
|
|
||||||
// messages that already sent to many times shouldn't be resend
|
// messages that already sent to many times shouldn't be resend
|
||||||
ok, err = shouldResendEmojiReaction(&common.RawMessage{
|
ok, err = shouldResendMessage(&common.RawMessage{
|
||||||
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
||||||
Sent: false,
|
Sent: false,
|
||||||
SendCount: emojiResendMaxCount + 1,
|
SendCount: messageResendMaxCount + 1,
|
||||||
}, s.m.getTimesource())
|
}, s.m.getTimesource())
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
s.False(ok)
|
s.False(ok)
|
||||||
|
|
||||||
// message sent one time CAN'T be resend in 15 seconds (only after 30)
|
// message sent one time CAN'T be resend in 15 seconds (only after 30)
|
||||||
ok, err = shouldResendEmojiReaction(&common.RawMessage{
|
ok, err = shouldResendMessage(&common.RawMessage{
|
||||||
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
||||||
Sent: false,
|
Sent: false,
|
||||||
SendCount: 1,
|
SendCount: 1,
|
||||||
LastSent: s.m.getTimesource().GetCurrentTime() - 15*uint64(time.Second),
|
LastSent: s.m.getTimesource().GetCurrentTime() - 15*uint64(time.Second.Milliseconds()),
|
||||||
}, s.m.getTimesource())
|
}, s.m.getTimesource())
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
s.False(ok)
|
s.False(ok)
|
||||||
|
|
||||||
// message sent one time CAN be resend in 35 seconds
|
// message sent one time CAN be resend in 35 seconds
|
||||||
ok, err = shouldResendEmojiReaction(&common.RawMessage{
|
ok, err = shouldResendMessage(&common.RawMessage{
|
||||||
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
||||||
Sent: false,
|
Sent: false,
|
||||||
SendCount: 1,
|
SendCount: 1,
|
||||||
LastSent: s.m.getTimesource().GetCurrentTime() - 35*uint64(time.Second),
|
LastSent: s.m.getTimesource().GetCurrentTime() - 35*uint64(time.Second.Milliseconds()),
|
||||||
}, s.m.getTimesource())
|
}, s.m.getTimesource())
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
s.True(ok)
|
s.True(ok)
|
||||||
|
|
||||||
// message sent three times CAN'T be resend in 100 seconds (only after 120)
|
// message sent three times CAN'T be resend in 100 seconds (only after 120)
|
||||||
ok, err = shouldResendEmojiReaction(&common.RawMessage{
|
ok, err = shouldResendMessage(&common.RawMessage{
|
||||||
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
||||||
Sent: false,
|
Sent: false,
|
||||||
SendCount: 3,
|
SendCount: 3,
|
||||||
LastSent: s.m.getTimesource().GetCurrentTime() - 100*uint64(time.Second),
|
LastSent: s.m.getTimesource().GetCurrentTime() - 100*uint64(time.Second.Milliseconds()),
|
||||||
}, s.m.getTimesource())
|
}, s.m.getTimesource())
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
s.False(ok)
|
s.False(ok)
|
||||||
|
|
||||||
// message sent tow times CAN be resend in 65 seconds
|
// message sent tow times CAN be resend in 65 seconds
|
||||||
ok, err = shouldResendEmojiReaction(&common.RawMessage{
|
ok, err = shouldResendMessage(&common.RawMessage{
|
||||||
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
MessageType: protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
||||||
Sent: false,
|
Sent: false,
|
||||||
SendCount: 3,
|
SendCount: 3,
|
||||||
LastSent: s.m.getTimesource().GetCurrentTime() - 125*uint64(time.Second),
|
LastSent: s.m.getTimesource().GetCurrentTime() - 125*uint64(time.Second.Milliseconds()),
|
||||||
}, s.m.getTimesource())
|
}, s.m.getTimesource())
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
s.True(ok)
|
s.True(ok)
|
||||||
|
@ -2359,7 +2359,7 @@ func (s *MessengerSuite) TestResendExpiredEmojis() {
|
||||||
s.Equal(1, rawMessage.SendCount)
|
s.Equal(1, rawMessage.SendCount)
|
||||||
|
|
||||||
//imitate that more than 30 seconds passed since message was sent
|
//imitate that more than 30 seconds passed since message was sent
|
||||||
rawMessage.LastSent = rawMessage.LastSent - 35*uint64(time.Second)
|
rawMessage.LastSent = rawMessage.LastSent - 35*uint64(time.Second.Milliseconds())
|
||||||
err = s.m.persistence.SaveRawMessage(rawMessage)
|
err = s.m.persistence.SaveRawMessage(rawMessage)
|
||||||
s.NoError(err)
|
s.NoError(err)
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
|
@ -643,7 +643,7 @@ func (db sqlitePersistence) SaveContactChatIdentity(contactID string, chatIdenti
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db sqlitePersistence) ExpiredEmojiReactionsIDs(maxSendCount int) ([]string, error) {
|
func (db sqlitePersistence) ExpiredMessagesIDs(maxSendCount int) ([]string, error) {
|
||||||
ids := []string{}
|
ids := []string{}
|
||||||
|
|
||||||
rows, err := db.db.Query(`
|
rows, err := db.db.Query(`
|
||||||
|
@ -652,8 +652,11 @@ func (db sqlitePersistence) ExpiredEmojiReactionsIDs(maxSendCount int) ([]string
|
||||||
FROM
|
FROM
|
||||||
raw_messages
|
raw_messages
|
||||||
WHERE
|
WHERE
|
||||||
message_type = ? AND sent = ? AND send_count <= ?`,
|
message_type IN (?, ?) AND sent = ? AND send_count <= ?`,
|
||||||
protobuf.ApplicationMetadataMessage_EMOJI_REACTION, false, maxSendCount)
|
protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
|
||||||
|
protobuf.ApplicationMetadataMessage_EMOJI_REACTION,
|
||||||
|
false,
|
||||||
|
maxSendCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ids, err
|
return ids, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -533,23 +533,12 @@ func TestMessagesIDsByType(t *testing.T) {
|
||||||
require.Equal(t, "emoji-message-id", ids[0])
|
require.Equal(t, "emoji-message-id", ids[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestExpiredEmojiReactionsIDs(t *testing.T) {
|
func TestExpiredMessagesIDs(t *testing.T) {
|
||||||
db, err := openTestDB()
|
db, err := openTestDB()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
p := NewSQLitePersistence(db)
|
p := NewSQLitePersistence(db)
|
||||||
|
|
||||||
ids, err := p.ExpiredEmojiReactionsIDs(emojiResendMaxCount)
|
ids, err := p.ExpiredMessagesIDs(messageResendMaxCount)
|
||||||
require.NoError(t, err)
|
|
||||||
require.Empty(t, ids)
|
|
||||||
|
|
||||||
//save expired chat message
|
|
||||||
rawChatMessage := minimalRawMessage("chat-message-id", protobuf.ApplicationMetadataMessage_CHAT_MESSAGE)
|
|
||||||
rawChatMessage.Sent = false
|
|
||||||
err = p.SaveRawMessage(rawChatMessage)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
//make sure it doesn't apper in an expired emoji reactions list
|
|
||||||
ids, err = p.ExpiredEmojiReactionsIDs(emojiResendMaxCount)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Empty(t, ids)
|
require.Empty(t, ids)
|
||||||
|
|
||||||
|
@ -560,7 +549,7 @@ func TestExpiredEmojiReactionsIDs(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
//make sure it appered in expired emoji reactions list
|
//make sure it appered in expired emoji reactions list
|
||||||
ids, err = p.ExpiredEmojiReactionsIDs(emojiResendMaxCount)
|
ids, err = p.ExpiredMessagesIDs(messageResendMaxCount)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 1, len(ids))
|
require.Equal(t, 1, len(ids))
|
||||||
|
|
||||||
|
@ -571,7 +560,7 @@ func TestExpiredEmojiReactionsIDs(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
//make sure it didn't appear in expired emoji reactions list
|
//make sure it didn't appear in expired emoji reactions list
|
||||||
ids, err = p.ExpiredEmojiReactionsIDs(emojiResendMaxCount)
|
ids, err = p.ExpiredMessagesIDs(messageResendMaxCount)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 1, len(ids))
|
require.Equal(t, 1, len(ids))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue