add API to shhext to manage messages

This commit is contained in:
Adam Babik 2019-08-06 23:50:13 +02:00 committed by Andrea Maria Piana
parent 9ae7d2b6d5
commit 8383feea04
11 changed files with 534 additions and 71 deletions

2
go.mod
View File

@ -21,7 +21,7 @@ require (
github.com/status-im/doubleratchet v2.0.0+incompatible
github.com/status-im/migrate/v4 v4.3.1-status
github.com/status-im/rendezvous v1.3.0
github.com/status-im/status-protocol-go v0.0.0-20190701094942-e1f4f17bafc4ac757933fb6a801fe9fed68b6c4d
github.com/status-im/status-protocol-go v0.0.0-20190701094942-c2b7b022b722d7bebe1c6d6f05cdead79f1b57bd
github.com/status-im/whisper v1.4.14
github.com/stretchr/testify v1.3.0
github.com/syndtr/goleveldb v1.0.0

4
go.sum
View File

@ -450,8 +450,8 @@ github.com/status-im/migrate/v4 v4.3.1-status h1:tJwsEYLgbFkvlTSMk89APwRDfpr4yG8
github.com/status-im/migrate/v4 v4.3.1-status/go.mod h1:r8HggRBZ/k7TRwByq/Hp3P/ubFppIna0nvyavVK0pjA=
github.com/status-im/rendezvous v1.3.0 h1:7RK/MXXW+tlm0asKm1u7Qp7Yni6AO29a7j8+E4Lbjg4=
github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcwjvNYt+Gh1W1s=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-e1f4f17bafc4ac757933fb6a801fe9fed68b6c4d h1:3Cnw7SrJ4AeTHFcZf/xQuvQSO+f/vNp3kinIes3nFIs=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-e1f4f17bafc4ac757933fb6a801fe9fed68b6c4d/go.mod h1:thrQ4V0ZUmLZPDf74xVzub1gxgSNFaSTeTQdxtRJnTU=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-c2b7b022b722d7bebe1c6d6f05cdead79f1b57bd h1:ZGCzGQ41kPy5oNpHColf3ZTNN9DXWZATgJoV2cQZaC4=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-c2b7b022b722d7bebe1c6d6f05cdead79f1b57bd/go.mod h1:thrQ4V0ZUmLZPDf74xVzub1gxgSNFaSTeTQdxtRJnTU=
github.com/status-im/whisper v1.4.14 h1:9VHqx4+PUYfhDnYYtDxHkg/3cfVvkHjPNciY4LO83yc=
github.com/status-im/whisper v1.4.14/go.mod h1:WS6z39YJQ8WJa9s+DmTuEM/s2nVF6Iz3B1SZYw5cYf0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@ -633,6 +633,30 @@ func (api *PublicAPI) SetInstallationMetadata(installationID string, data *multi
return api.service.messenger.SetInstallationMetadata(installationID, data)
}
func (api *PublicAPI) MessageByChatID(chatID, cursor string, limit int) ([]*statusproto.Message, string, error) {
return api.service.messenger.MessageByChatID(chatID, cursor, limit)
}
func (api *PublicAPI) MessagesFrom(from string) ([]*statusproto.Message, error) {
return api.service.messenger.MessagesFrom(from)
}
func (api *PublicAPI) SaveMessage(message *statusproto.Message) error {
return api.service.messenger.SaveMessage(message)
}
func (api *PublicAPI) DeleteMessage(id string) error {
return api.service.messenger.DeleteMessage(id)
}
func (api *PublicAPI) MarkMessagesSeen(ids ...string) error {
return api.service.messenger.MarkMessagesSeen(ids...)
}
func (api *PublicAPI) UpdateMessageOutgoingStatus(id, newOutgoingStatus string) error {
return api.service.messenger.UpdateMessageOutgoingStatus(id, newOutgoingStatus)
}
// -----
// HELPER
// -----

View File

@ -0,0 +1,56 @@
package statusproto
import (
"database/sql/driver"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
)
type hexutilSQL hexutil.Bytes
func (h hexutilSQL) Value() (driver.Value, error) {
return []byte(h), nil
}
func (h *hexutilSQL) Scan(value interface{}) error {
if value == nil {
return nil
}
if b, ok := value.([]byte); ok {
*h = hexutilSQL(b)
return nil
}
return errors.New("failed to scan hexutilSQL")
}
// Message represents a message record in the database,
// more specifically in user_messages_legacy table.
// Encoding and decoding of byte blobs should be performed
// using hexutil package.
type Message struct {
// ID calculated as keccak256(compressedAuthorPubKey, data) where data is unencrypted payload.
ID string `json:"id"`
// RawPayloadHash is a Whisper envelope hash.
RawPayloadHash string `json:"rawPayloadHash"`
// WhisperTimestamp is a timestamp of a Whisper envelope.
WhisperTimestamp int64 `json:"whisperTimestamp"`
// From is a public key of the author of the message.
From hexutilSQL `json:"from"`
// To is a public key of the recipient unless it's a public message then it's empty.
To hexutilSQL `json:"to,omitempty"`
// BEGIN: fields from protocol.Message.
Content string `json:"content"`
ContentType string `json:"contentType"`
Timestamp int64 `json:"timestamp"`
ChatID string `json:"chatId"`
MessageType string `json:"messageType,omitempty"`
MessageStatus string `json:"messageStatus,omitempty"`
ClockValue int64 `json:"clockValue"`
// END
Username string `json:"username,omitempty"`
RetryCount int `json:"retryCount"`
Show bool `json:"show"` // default true
Seen bool `json:"seen"`
OutgoingStatus string `json:"outgoingStatus,omitempty"`
}

View File

@ -6,6 +6,8 @@ import (
"database/sql"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"go.uber.org/zap"
"github.com/pkg/errors"
@ -608,3 +610,61 @@ func (m *Messenger) ConfirmMessagesProcessed(messageIDs [][]byte) error {
}
return nil
}
// DEPRECATED: required by status-react.
func (m *Messenger) MessageByID(id string) (*Message, error) {
return m.persistence.MessageByID(id)
}
// DEPRECATED: required by status-react.
func (m *Messenger) MessageExists(id string) (bool, error) {
return m.persistence.MessageExists(id)
}
// DEPRECATED: required by status-react.
func (m *Messenger) MessageByChatID(chatID, cursor string, limit int) ([]*Message, string, error) {
return m.persistence.MessageByChatID(chatID, cursor, limit)
}
// DEPRECATED: required by status-react.
func (m *Messenger) MessagesFrom(from string) ([]*Message, error) {
publicKeyBytes, err := hexutil.Decode(from)
if err != nil {
return nil, errors.Wrap(err, "failed to decode from argument")
}
return m.persistence.MessagesFrom(publicKeyBytes)
}
// DEPRECATED: required by status-react.
func (m *Messenger) UnseenMessageIDs() ([]string, error) {
ids, err := m.persistence.UnseenMessageIDs()
if err != nil {
return nil, err
}
result := make([]string, 0, len(ids))
for _, id := range ids {
result = append(result, hexutil.Encode(id))
}
return result, nil
}
// DEPRECATED: required by status-react.
func (m *Messenger) SaveMessage(message *Message) error {
return m.persistence.SaveMessage(message)
}
// DEPRECATED: required by status-react.
func (m *Messenger) DeleteMessage(id string) error {
return m.persistence.DeleteMessage(id)
}
// DEPRECATED: required by status-react.
func (m *Messenger) MarkMessagesSeen(ids ...string) error {
return m.persistence.MarkMessagesSeen(ids...)
}
// DEPRECATED: required by status-react.
func (m *Messenger) UpdateMessageOutgoingStatus(id, newOutgoingStatus string) error {
return m.persistence.UpdateMessageOutgoingStatus(id, newOutgoingStatus)
}

View File

@ -6,6 +6,8 @@
// 000002_add_chats.up.db.sql (541B)
// 000003_add_contacts.down.db.sql (21B)
// 000003_add_contacts.up.db.sql (251B)
// 000004_user_messages_compatibility.down.sql (33B)
// 000004_user_messages_compatibility.up.sql (945B)
// doc.go (377B)
package migrations
@ -130,7 +132,7 @@ func _000002_add_chatsDownDbSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000002_add_chats.down.db.sql", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1564587343, 0)}
info := bindataFileInfo{name: "000002_add_chats.down.db.sql", size: 74, mode: os.FileMode(0644), modTime: time.Unix(1565597460, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd3, 0xa7, 0xf0, 0x94, 0x7a, 0x9, 0xdc, 0x6c, 0x7b, 0xdc, 0x12, 0x30, 0x55, 0x31, 0x17, 0xf2, 0xcc, 0x6e, 0xfd, 0xbb, 0x70, 0xb9, 0xd8, 0x9f, 0x81, 0x83, 0xdc, 0x1d, 0x1c, 0x3a, 0x8d, 0xce}}
return a, nil
}
@ -150,7 +152,7 @@ func _000002_add_chatsUpDbSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000002_add_chats.up.db.sql", size: 541, mode: os.FileMode(0644), modTime: time.Unix(1564587387, 0)}
info := bindataFileInfo{name: "000002_add_chats.up.db.sql", size: 541, mode: os.FileMode(0644), modTime: time.Unix(1565597460, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd, 0x7f, 0x3a, 0xd7, 0xf6, 0x8b, 0x6e, 0x4d, 0xce, 0x7d, 0x63, 0x1d, 0x30, 0xa2, 0xc1, 0xb, 0xa0, 0x35, 0x2e, 0xfa, 0xef, 0xf0, 0x39, 0xf7, 0x22, 0xdd, 0x31, 0x11, 0xb1, 0xff, 0xbf, 0xb3}}
return a, nil
}
@ -170,7 +172,7 @@ func _000003_add_contactsDownDbSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000003_add_contacts.down.db.sql", size: 21, mode: os.FileMode(0644), modTime: time.Unix(1564721146, 0)}
info := bindataFileInfo{name: "000003_add_contacts.down.db.sql", size: 21, mode: os.FileMode(0644), modTime: time.Unix(1565597570, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xfc, 0x7e, 0xb, 0xec, 0x72, 0xcd, 0x21, 0x3e, 0xa2, 0x38, 0xe0, 0x95, 0x7e, 0xce, 0x4a, 0x17, 0xc8, 0xd0, 0x1c, 0xfa, 0xa3, 0x23, 0x5, 0xab, 0x89, 0xf9, 0xfc, 0x63, 0x7, 0x28, 0xe9, 0x93}}
return a, nil
}
@ -190,11 +192,51 @@ func _000003_add_contactsUpDbSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000003_add_contacts.up.db.sql", size: 251, mode: os.FileMode(0644), modTime: time.Unix(1564721109, 0)}
info := bindataFileInfo{name: "000003_add_contacts.up.db.sql", size: 251, mode: os.FileMode(0644), modTime: time.Unix(1565597570, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x8f, 0x19, 0x9f, 0x5c, 0x9d, 0xa1, 0xe5, 0x99, 0xbe, 0x47, 0xce, 0xa5, 0xd3, 0x51, 0x2f, 0x9b, 0x1d, 0xd9, 0x3f, 0x7a, 0xbf, 0xf, 0x76, 0x6b, 0x4f, 0x82, 0xbd, 0x13, 0x9d, 0x25, 0xdd, 0x60}}
return a, nil
}
var __000004_user_messages_compatibilityDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\x28\x2d\x4e\x2d\x8a\xcf\x4d\x2d\x2e\x4e\x4c\x4f\x2d\x8e\xcf\x49\x4d\x4f\x4c\xae\xb4\xe6\x02\x04\x00\x00\xff\xff\x25\xef\xa4\x66\x21\x00\x00\x00")
func _000004_user_messages_compatibilityDownSqlBytes() ([]byte, error) {
return bindataRead(
__000004_user_messages_compatibilityDownSql,
"000004_user_messages_compatibility.down.sql",
)
}
func _000004_user_messages_compatibilityDownSql() (*asset, error) {
bytes, err := _000004_user_messages_compatibilityDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "000004_user_messages_compatibility.down.sql", size: 33, mode: os.FileMode(0644), modTime: time.Unix(1565597680, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xb9, 0xaf, 0x48, 0x80, 0x3d, 0x54, 0x5e, 0x53, 0xee, 0x98, 0x26, 0xbb, 0x99, 0x6a, 0xd8, 0x37, 0x94, 0xf2, 0xf, 0x82, 0xfa, 0xb7, 0x6a, 0x68, 0xcd, 0x8b, 0xe2, 0xc4, 0x6, 0x25, 0xdc, 0x6}}
return a, nil
}
var __000004_user_messages_compatibilityUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xa4\x92\x41\x6f\x9b\x4e\x10\xc5\xef\xfe\x14\x73\xb3\x2d\x99\xbf\x72\x88\x72\xc9\x09\x3b\xeb\x7f\x51\x29\x44\x18\x57\xc9\x69\xb5\x86\x29\xac\x0a\xbb\x68\x67\x28\x45\xca\x87\xaf\x08\x38\x0a\xae\x7d\x2a\x07\x0e\xfb\x7b\xb3\xf3\xf6\xcd\x78\x1e\x04\xbc\x24\xd0\x75\x63\x1d\x2b\xc3\xc0\xa5\x1a\x7e\x9a\x80\xd5\xa9\x42\x28\x15\x81\xb3\x9d\xce\x41\x11\x74\x08\x0e\xab\x1e\xac\x01\xcd\x0b\xcf\x83\xae\x44\x33\x14\x57\x58\xa3\x61\x6d\x0a\xd0\xe6\x87\x36\x9a\xd1\xa3\xcc\xd9\xaa\xfa\x6f\xb1\x4b\x84\x9f\x0a\x48\xfd\x6d\x28\x20\xd8\x43\x14\xa7\x20\x5e\x82\x43\x7a\x80\x96\xd0\xc9\x1a\x89\x54\x81\x24\x2b\x2c\x54\xd6\xc3\x6a\x01\x00\xa0\x73\xf8\xee\x27\xbb\x2f\x7e\x02\xcf\x49\xf0\xcd\x4f\x5e\xe1\xab\x78\x85\x38\x82\x5d\x1c\xed\xc3\x60\x97\x42\x22\x9e\x43\x7f\x27\x36\xef\x7a\xa7\x3a\xd9\xa8\xbe\xb2\x2a\x97\xa5\xa2\xf2\xa3\x7a\x68\x17\x1d\xc3\x70\x94\x75\xa5\xa6\x06\x9d\x64\x5d\x23\xb1\xaa\x1b\x08\xa2\x54\xfc\x2f\x2e\x75\x64\x5b\x97\x21\x6c\xc3\x78\x7b\x41\x72\x24\xd6\x46\xb1\xb6\xe6\x1d\x8f\xa7\x99\x35\x8c\x86\x6f\x74\x9d\xa8\xe4\xbe\xc1\x1b\x92\x21\x0a\xa3\xea\x0f\x3c\x9e\xce\x6c\x5e\x5e\x5a\x2a\x96\x9f\x62\x9a\x53\x87\xec\x7a\x99\xd9\xd6\xf0\xac\x16\x9e\xc4\xde\x3f\x86\x29\xdc\x8d\xba\x29\xfd\x99\xb5\x39\x21\x56\xdc\xd2\x9c\x65\x95\xcd\x7e\xca\x5f\xaa\x6a\xf1\x8a\x33\x2a\x6d\x07\xdb\x38\x0e\x85\x1f\xfd\xdd\x38\x4d\x8e\xd3\xcc\x08\xd1\xdc\xd6\xed\xfd\xf0\x30\x09\x6d\xcb\x85\xd5\xa6\xb8\xf0\xb2\x58\x3f\x2e\xce\xeb\x15\x44\x4f\xe2\x05\x74\xfe\x5b\x4e\xa3\x8b\xa3\xab\xeb\xb5\x1a\xf1\xfa\xf1\x4a\x21\x2a\x97\x95\xf2\xd4\xcb\x73\xb4\x71\x04\xd7\x2f\x19\xed\xb7\x27\x62\xb7\x5a\xde\xfd\xe3\xb7\x84\xb7\xb7\xcf\x89\x6e\xc0\x7b\xb8\xdf\xc0\xc3\xfd\x7a\x00\x3a\xdf\x9c\x47\x3d\xbc\xf7\x4f\x00\x00\x00\xff\xff\x6b\xae\x37\x6d\xb1\x03\x00\x00")
func _000004_user_messages_compatibilityUpSqlBytes() ([]byte, error) {
return bindataRead(
__000004_user_messages_compatibilityUpSql,
"000004_user_messages_compatibility.up.sql",
)
}
func _000004_user_messages_compatibilityUpSql() (*asset, error) {
bytes, err := _000004_user_messages_compatibilityUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "000004_user_messages_compatibility.up.sql", size: 945, mode: os.FileMode(0644), modTime: time.Unix(1565597764, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x18, 0x87, 0x4e, 0x97, 0xf9, 0x2f, 0x18, 0x2, 0x34, 0x53, 0x24, 0x23, 0x2a, 0x6c, 0xe1, 0xba, 0x34, 0x44, 0x72, 0x24, 0x14, 0xf0, 0x3a, 0x2e, 0x56, 0x77, 0xd0, 0xfe, 0x5f, 0x45, 0x63}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x8f\xbb\x6e\xc3\x30\x0c\x45\x77\x7f\xc5\x45\x96\x2c\xb5\xb4\x74\xea\xd6\xb1\x7b\x7f\x80\x91\x68\x89\x88\x1e\xae\x48\xe7\xf1\xf7\x85\xd3\x02\xcd\xd6\xf5\x00\xe7\xf0\xd2\x7b\x7c\x66\x51\x2c\x52\x18\xa2\x68\x1c\x58\x95\xc6\x1d\x27\x0e\xb4\x29\xe3\x90\xc4\xf2\x76\x72\xa1\x57\xaf\x46\xb6\xe9\x2c\xd5\x57\x49\x83\x8c\xfd\xe5\xf5\x30\x79\x8f\x40\xed\x68\xc8\xd4\x62\xe1\x47\x4b\xa1\x46\xc3\xa4\x25\x5c\xc5\x32\x08\xeb\xe0\x45\x6e\x0e\xef\x86\xc2\xa4\x06\xcb\x64\x47\x85\x65\x46\x20\xe5\x3d\xb3\xf4\x81\xd4\xe7\x93\xb4\x48\x46\x6e\x47\x1f\xcb\x13\xd9\x17\x06\x2a\x85\x23\x96\xd1\xeb\xc3\x55\xaa\x8c\x28\x83\x83\xf5\x71\x7f\x01\xa9\xb2\xa1\x51\x65\xdd\xfd\x4c\x17\x46\xeb\xbf\xe7\x41\x2d\xfe\xff\x11\xae\x7d\x9c\x15\xa4\xe0\xdb\xca\xc1\x38\xba\x69\x5a\x29\x9c\x29\x31\xf4\xab\x88\xf1\x34\x79\x9f\xfa\x5b\xe2\xc6\xbb\xf5\xbc\x71\x5e\xcf\x09\x3f\x35\xe9\x4d\x31\x77\x38\xe7\xff\x80\x4b\x1d\x6e\xfa\x0e\x00\x00\xff\xff\x9d\x60\x3d\x88\x79\x01\x00\x00")
func docGoBytes() ([]byte, error) {
@ -318,6 +360,10 @@ var _bindata = map[string]func() (*asset, error){
"000003_add_contacts.up.db.sql": _000003_add_contactsUpDbSql,
"000004_user_messages_compatibility.down.sql": _000004_user_messages_compatibilityDownSql,
"000004_user_messages_compatibility.up.sql": _000004_user_messages_compatibilityUpSql,
"doc.go": docGo,
}
@ -362,13 +408,15 @@ type bintree struct {
}
var _bintree = &bintree{nil, map[string]*bintree{
"000001_init.down.db.sql": &bintree{_000001_initDownDbSql, map[string]*bintree{}},
"000001_init.up.db.sql": &bintree{_000001_initUpDbSql, map[string]*bintree{}},
"000002_add_chats.down.db.sql": &bintree{_000002_add_chatsDownDbSql, map[string]*bintree{}},
"000002_add_chats.up.db.sql": &bintree{_000002_add_chatsUpDbSql, map[string]*bintree{}},
"000003_add_contacts.down.db.sql": &bintree{_000003_add_contactsDownDbSql, map[string]*bintree{}},
"000003_add_contacts.up.db.sql": &bintree{_000003_add_contactsUpDbSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
"000001_init.down.db.sql": &bintree{_000001_initDownDbSql, map[string]*bintree{}},
"000001_init.up.db.sql": &bintree{_000001_initUpDbSql, map[string]*bintree{}},
"000002_add_chats.down.db.sql": &bintree{_000002_add_chatsDownDbSql, map[string]*bintree{}},
"000002_add_chats.up.db.sql": &bintree{_000002_add_chatsUpDbSql, map[string]*bintree{}},
"000003_add_contacts.down.db.sql": &bintree{_000003_add_contactsDownDbSql, map[string]*bintree{}},
"000003_add_contacts.up.db.sql": &bintree{_000003_add_contactsUpDbSql, map[string]*bintree{}},
"000004_user_messages_compatibility.down.sql": &bintree{_000004_user_messages_compatibilityDownSql, map[string]*bintree{}},
"000004_user_messages_compatibility.up.sql": &bintree{_000004_user_messages_compatibilityUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.

View File

@ -44,59 +44,6 @@ func (db sqlitePersistence) LastMessageClock(chatID string) (int64, error) {
return last.Int64, nil
}
func (db sqlitePersistence) SaveMessages(chatID string, messages []*protocol.Message) (last int64, err error) {
var (
tx *sql.Tx
stmt *sql.Stmt
)
tx, err = db.db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return
}
stmt, err = tx.Prepare(`INSERT INTO user_messages(
id, chat_id, content_type, message_type, text, clock, timestamp, content_chat_id, content_text, public_key, flags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
// don't shadow original error
_ = tx.Rollback()
}()
var rst sql.Result
for _, msg := range messages {
pkey := []byte{}
if msg.SigPubKey != nil {
pkey, err = marshalECDSAPub(msg.SigPubKey)
}
rst, err = stmt.Exec(
msg.ID, chatID, msg.ContentT, msg.MessageT, msg.Text,
msg.Clock, msg.Timestamp, msg.Content.ChatID, msg.Content.Text,
pkey, msg.Flags)
if err != nil {
if err.Error() == uniqueIDContstraint {
// skip duplicated messages
err = nil
continue
}
return
}
last, err = rst.LastInsertId()
if err != nil {
return
}
}
return
}
func formatChatID(chatID string, chatType ChatType) string {
return fmt.Sprintf("%s-%d", chatID, chatType)
}
@ -478,6 +425,59 @@ func (db sqlitePersistence) UnreadMessages(chatID string) ([]*protocol.Message,
return result, nil
}
func (db sqlitePersistence) SaveMessages(chatID string, messages []*protocol.Message) (last int64, err error) {
var (
tx *sql.Tx
stmt *sql.Stmt
)
tx, err = db.db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return
}
stmt, err = tx.Prepare(`INSERT INTO user_messages(
id, chat_id, content_type, message_type, text, clock, timestamp, content_chat_id, content_text, public_key, flags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
// don't shadow original error
_ = tx.Rollback()
}()
var rst sql.Result
for _, msg := range messages {
pkey := []byte{}
if msg.SigPubKey != nil {
pkey, err = marshalECDSAPub(msg.SigPubKey)
}
rst, err = stmt.Exec(
msg.ID, chatID, msg.ContentT, msg.MessageT, msg.Text,
msg.Clock, msg.Timestamp, msg.Content.ChatID, msg.Content.Text,
pkey, msg.Flags)
if err != nil {
if err.Error() == uniqueIDContstraint {
// skip duplicated messages
err = nil
continue
}
return
}
last, err = rst.LastInsertId()
if err != nil {
return
}
}
return
}
func marshalECDSAPub(pub *ecdsa.PublicKey) (rst []byte, err error) {
switch pub.Curve.(type) {
case *secp256k1.BitCurve:

View File

@ -0,0 +1,273 @@
package statusproto
import (
"database/sql"
"fmt"
"strings"
"github.com/pkg/errors"
)
var (
errRecordNotFound = errors.New("record not found")
)
func (db sqlitePersistence) tableUserMessagesLegacyAllFields() string {
return `id,
raw_payload_hash,
whisper_timestamp,
source,
destination,
content,
content_type,
username,
timestamp,
chat_id,
retry_count,
message_type,
message_status,
clock_value,
show,
seen,
outgoing_status`
}
func (db sqlitePersistence) tableUserMessagesLegacyAllFieldsCount() int {
return strings.Count(db.tableUserMessagesLegacyAllFields(), ",") + 1
}
type scanner interface {
Scan(dest ...interface{}) error
}
func (db sqlitePersistence) tableUserMessagesLegacyScanAllFields(row scanner, message *Message, others ...interface{}) error {
args := []interface{}{
&message.ID,
&message.RawPayloadHash,
&message.WhisperTimestamp,
&message.From, // source in table
&message.To, // destination in table
&message.Content,
&message.ContentType,
&message.Username,
&message.Timestamp,
&message.ChatID,
&message.RetryCount,
&message.MessageType,
&message.MessageStatus,
&message.ClockValue,
&message.Show,
&message.Seen,
&message.OutgoingStatus,
}
return row.Scan(append(args, others...)...)
}
func (db sqlitePersistence) tableUserMessagesLegacyAllValues(message *Message) []interface{} {
return []interface{}{
message.ID,
message.RawPayloadHash,
message.WhisperTimestamp,
message.From, // source in table
message.To, // destination in table
message.Content,
message.ContentType,
message.Username,
message.Timestamp,
message.ChatID,
message.RetryCount,
message.MessageType,
message.MessageStatus,
message.ClockValue,
message.Show,
message.Seen,
message.OutgoingStatus,
}
}
func (db sqlitePersistence) MessageByID(id string) (*Message, error) {
var message Message
allFields := db.tableUserMessagesLegacyAllFields()
row := db.db.QueryRow(
fmt.Sprintf(`
SELECT
%s
FROM
user_messages_legacy
WHERE
id = ?
`, allFields),
id,
)
err := db.tableUserMessagesLegacyScanAllFields(row, &message)
switch err {
case sql.ErrNoRows:
return nil, errRecordNotFound
case nil:
return &message, nil
default:
return nil, err
}
}
func (db sqlitePersistence) MessageExists(id string) (bool, error) {
var result bool
err := db.db.QueryRow(`SELECT EXISTS(SELECT 1 FROM user_messages_legacy WHERE id = ?)`, id).Scan(&result)
switch err {
case sql.ErrNoRows:
return false, errRecordNotFound
case nil:
return result, nil
default:
return false, err
}
}
// MessageByChatID returns all messages for a given chatID in descending order.
// Ordering is accomplished using two concatenated values: ClockValue and ID.
// These two values are also used to compose a cursor which is returned to the result.
func (db sqlitePersistence) MessageByChatID(chatID string, currCursor string, limit int) ([]*Message, string, error) {
cursorWhere := ""
if currCursor != "" {
cursorWhere = "AND cursor <= ?"
}
allFields := db.tableUserMessagesLegacyAllFields()
args := []interface{}{chatID}
if currCursor != "" {
args = append(args, currCursor)
}
// Build a new column `cursor` at the query time by having a fixed-sized clock value at the beginning
// concatenated with rowid. Results are sorted using this new column.
// This new column values can also be returned as a cursor for subsequent requests.
rows, err := db.db.Query(
fmt.Sprintf(`
SELECT
%s,
substr('0000000000000000000000000000000000000000000000000000000000000000' || clock_value, -64, 64) || id as cursor
FROM
user_messages_legacy
WHERE
chat_id = ? %s
ORDER BY cursor DESC
LIMIT ?
`, allFields, cursorWhere),
append(args, limit+1)..., // take one more to figure our whether a cursor should be returned
)
if err != nil {
return nil, "", err
}
defer rows.Close()
var (
result []*Message
cursors []string
)
for rows.Next() {
var (
message Message
cursor string
)
if err := db.tableUserMessagesLegacyScanAllFields(rows, &message, &cursor); err != nil {
return nil, "", err
}
result = append(result, &message)
cursors = append(cursors, cursor)
}
var newCursor string
if len(result) > limit {
newCursor = cursors[limit]
result = result[:limit]
}
return result, newCursor, nil
}
func (db sqlitePersistence) MessagesFrom(from []byte) ([]*Message, error) {
allFields := db.tableUserMessagesLegacyAllFields()
rows, err := db.db.Query(
fmt.Sprintf(`
SELECT
%s
FROM
user_messages_legacy
WHERE
source = ?
`, allFields),
from,
)
if err != nil {
return nil, err
}
defer rows.Close()
var result []*Message
for rows.Next() {
var message Message
if err := db.tableUserMessagesLegacyScanAllFields(rows, &message); err != nil {
return nil, err
}
result = append(result, &message)
}
return result, nil
}
func (db sqlitePersistence) UnseenMessageIDs() ([][]byte, error) {
rows, err := db.db.Query(`SELECT id FROM user_messages_legacy WHERE seen = 0`)
if err != nil {
return nil, err
}
defer rows.Close()
var result [][]byte
for rows.Next() {
var id []byte
if err := rows.Scan(&id); err != nil {
return nil, err
}
result = append(result, id)
}
return result, nil
}
func (db sqlitePersistence) SaveMessage(m *Message) error {
allFields := db.tableUserMessagesLegacyAllFields()
valuesVector := strings.Repeat("?, ", db.tableUserMessagesLegacyAllFieldsCount()-1) + "?"
query := fmt.Sprintf(`INSERT INTO user_messages_legacy(%s) VALUES (%s)`, allFields, valuesVector)
_, err := db.db.Exec(
query,
db.tableUserMessagesLegacyAllValues(m)...,
)
return err
}
func (db sqlitePersistence) DeleteMessage(id string) error {
_, err := db.db.Exec(`DELETE FROM user_messages_legacy WHERE id = ?`, id)
return err
}
func (db sqlitePersistence) MarkMessagesSeen(ids ...string) error {
idsArgs := make([]interface{}, 0, len(ids))
for _, id := range ids {
idsArgs = append(idsArgs, id)
}
inVector := strings.Repeat("?, ", len(ids)-1) + "?"
_, err := db.db.Exec(
fmt.Sprintf(`
UPDATE user_messages_legacy
SET seen = 1
WHERE id IN (%s)
`, inVector),
idsArgs...)
return err
}
func (db sqlitePersistence) UpdateMessageOutgoingStatus(id string, newOutgoingStatus string) error {
_, err := db.db.Exec(`
UPDATE user_messages_legacy
SET outgoing_status = ?
WHERE id = ?
`, newOutgoingStatus, id)
return err
}

View File

@ -271,6 +271,7 @@ func (m *EnvelopesMonitor) handleEnvelopeFailure(hash common.Hash, err error) {
m.envelopes[envelopeID] = EnvelopePosted
m.messages[envelopeID] = message
m.attempts[envelopeID] = attempt + 1
m.identifiers[envelopeID] = identifiers
} else {
m.logger.Debug("envelope expired", zap.String("hash", hash.String()))
if m.handler != nil {

View File

@ -5,12 +5,12 @@ import (
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"github.com/pkg/errors"
"strings"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
"strings"
"github.com/pkg/errors"
)
const (
@ -144,7 +144,8 @@ func EncodeMessage(value Message) ([]byte, error) {
return buf.Bytes(), nil
}
// MessageID calculates the messageID, by appending the sha3-256 to the pubkey bytes
// MessageID calculates the messageID from author's compressed public key
// and not encrypted but encoded payload.
func MessageID(author *ecdsa.PublicKey, data []byte) []byte {
keyBytes := crypto.FromECDSAPub(author)
return crypto.Keccak256(append(keyBytes, data...))

2
vendor/modules.txt vendored
View File

@ -317,7 +317,7 @@ github.com/status-im/migrate/v4/database/sqlcipher
github.com/status-im/rendezvous
github.com/status-im/rendezvous/protocol
github.com/status-im/rendezvous/server
# github.com/status-im/status-protocol-go v0.0.0-20190701094942-e1f4f17bafc4ac757933fb6a801fe9fed68b6c4d
# github.com/status-im/status-protocol-go v0.0.0-20190701094942-c2b7b022b722d7bebe1c6d6f05cdead79f1b57bd
github.com/status-im/status-protocol-go/zaputil
github.com/status-im/status-protocol-go
github.com/status-im/status-protocol-go/encryption/multidevice