Handle async raw message confirmations
Sometimes confirmation for raw messages are received before the record is actually saved in the database. In this case, the code will preserve the Sent status.
This commit is contained in:
parent
83238283b4
commit
14e3eafaeb
|
@ -32,6 +32,19 @@ func NewRawMessagesPersistence(db *sql.DB) *RawMessagesPersistence {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db RawMessagesPersistence) SaveRawMessage(message *RawMessage) error {
|
func (db RawMessagesPersistence) SaveRawMessage(message *RawMessage) error {
|
||||||
|
tx, err := db.db.BeginTx(context.Background(), &sql.TxOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
|
err = tx.Commit()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// don't shadow original error
|
||||||
|
_ = tx.Rollback()
|
||||||
|
}()
|
||||||
|
|
||||||
var pubKeys [][]byte
|
var pubKeys [][]byte
|
||||||
for _, pk := range message.Recipients {
|
for _, pk := range message.Recipients {
|
||||||
pubKeys = append(pubKeys, crypto.CompressPubkey(pk))
|
pubKeys = append(pubKeys, crypto.CompressPubkey(pk))
|
||||||
|
@ -44,7 +57,19 @@ func (db RawMessagesPersistence) SaveRawMessage(message *RawMessage) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := db.db.Exec(`
|
// If the message is not sent, we check whether there's a record
|
||||||
|
// in the database already and preserve the state
|
||||||
|
if !message.Sent {
|
||||||
|
oldMessage, err := db.rawMessageByID(tx, message.ID)
|
||||||
|
if err != nil && err != sql.ErrNoRows {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if oldMessage != nil {
|
||||||
|
message.Sent = oldMessage.Sent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec(`
|
||||||
INSERT INTO
|
INSERT INTO
|
||||||
raw_messages
|
raw_messages
|
||||||
(
|
(
|
||||||
|
@ -80,13 +105,30 @@ func (db RawMessagesPersistence) SaveRawMessage(message *RawMessage) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db RawMessagesPersistence) RawMessageByID(id string) (*RawMessage, error) {
|
func (db RawMessagesPersistence) RawMessageByID(id string) (*RawMessage, error) {
|
||||||
|
tx, err := db.db.BeginTx(context.Background(), &sql.TxOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err == nil {
|
||||||
|
err = tx.Commit()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// don't shadow original error
|
||||||
|
_ = tx.Rollback()
|
||||||
|
}()
|
||||||
|
|
||||||
|
return db.rawMessageByID(tx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db RawMessagesPersistence) rawMessageByID(tx *sql.Tx, id string) (*RawMessage, error) {
|
||||||
var rawPubKeys [][]byte
|
var rawPubKeys [][]byte
|
||||||
var encodedRecipients []byte
|
var encodedRecipients []byte
|
||||||
var skipGroupMessageWrap sql.NullBool
|
var skipGroupMessageWrap sql.NullBool
|
||||||
var sendOnPersonalTopic sql.NullBool
|
var sendOnPersonalTopic sql.NullBool
|
||||||
message := &RawMessage{}
|
message := &RawMessage{}
|
||||||
|
|
||||||
err := db.db.QueryRow(`
|
err := tx.QueryRow(`
|
||||||
SELECT
|
SELECT
|
||||||
id,
|
id,
|
||||||
local_chat_id,
|
local_chat_id,
|
||||||
|
|
|
@ -574,7 +574,14 @@ func (m *Messenger) processSentMessages(ids []string) error {
|
||||||
|
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
rawMessage, err := m.persistence.RawMessageByID(id)
|
rawMessage, err := m.persistence.RawMessageByID(id)
|
||||||
if err != nil {
|
// If we have no raw message, we create a temporary one, so that
|
||||||
|
// the sent status is preserved
|
||||||
|
if err == sql.ErrNoRows || rawMessage == nil {
|
||||||
|
rawMessage = &common.RawMessage{
|
||||||
|
ID: id,
|
||||||
|
MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
return errors.Wrapf(err, "Can't get raw message with id %v", id)
|
return errors.Wrapf(err, "Can't get raw message with id %v", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -766,6 +766,29 @@ func TestExpiredMessagesIDs(t *testing.T) {
|
||||||
require.Equal(t, 1, len(ids))
|
require.Equal(t, 1, len(ids))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDontOverwriteSentStatus(t *testing.T) {
|
||||||
|
db, err := openTestDB()
|
||||||
|
require.NoError(t, err)
|
||||||
|
p := newSQLitePersistence(db)
|
||||||
|
|
||||||
|
//save rawMessage
|
||||||
|
rawMessage := minimalRawMessage("chat-message-id", protobuf.ApplicationMetadataMessage_CHAT_MESSAGE)
|
||||||
|
rawMessage.Sent = true
|
||||||
|
err = p.SaveRawMessage(rawMessage)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
rawMessage.Sent = false
|
||||||
|
rawMessage.ResendAutomatically = true
|
||||||
|
err = p.SaveRawMessage(rawMessage)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
m, err := p.RawMessageByID(rawMessage.ID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.True(t, m.Sent)
|
||||||
|
require.True(t, m.ResendAutomatically)
|
||||||
|
}
|
||||||
|
|
||||||
func TestPersistenceEmojiReactions(t *testing.T) {
|
func TestPersistenceEmojiReactions(t *testing.T) {
|
||||||
db, err := openTestDB()
|
db, err := openTestDB()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
Loading…
Reference in New Issue