Extend peersyncing to sync 1-to-1 messages (#4962)

* fix_: extend peersyncing to sync 1-to-1 messages

* fix_: tests
This commit is contained in:
Alexander 2024-05-14 12:20:13 +02:00 committed by GitHub
parent 867cd1f14b
commit c46e395a58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 264 additions and 111 deletions

View File

@ -2431,7 +2431,6 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message
syncMessageType = peersyncing.SyncMessageCommunityType syncMessageType = peersyncing.SyncMessageCommunityType
} else if chat.PrivateGroupChat() { } else if chat.PrivateGroupChat() {
syncMessageType = peersyncing.SyncMessagePrivateGroup syncMessageType = peersyncing.SyncMessagePrivateGroup
} }
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender) wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
@ -2442,7 +2441,7 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message
syncMessage := peersyncing.SyncMessage{ syncMessage := peersyncing.SyncMessage{
Type: syncMessageType, Type: syncMessageType,
ID: types.Hex2Bytes(rawMessage.ID), ID: types.Hex2Bytes(rawMessage.ID),
GroupID: []byte(chat.ID), ChatID: []byte(chat.ID),
Payload: wrappedMessage, Payload: wrappedMessage,
Timestamp: m.transport.GetCurrentTime() / 1000, Timestamp: m.transport.GetCurrentTime() / 1000,
} }

View File

@ -2457,7 +2457,7 @@ func (m *Messenger) addPeersyncingMessage(chat *Chat, msg *v1protocol.StatusMess
syncMessage := peersyncing.SyncMessage{ syncMessage := peersyncing.SyncMessage{
Type: syncMessageType, Type: syncMessageType,
ID: msg.ApplicationLayer.ID, ID: msg.ApplicationLayer.ID,
GroupID: []byte(chat.ID), ChatID: []byte(chat.ID),
Payload: msg.EncryptionLayer.Payload, Payload: msg.EncryptionLayer.Payload,
Timestamp: uint64(msg.TransportLayer.Message.Timestamp), Timestamp: uint64(msg.TransportLayer.Message.Timestamp),
} }

View File

@ -118,37 +118,47 @@ func (m *Messenger) sendDatasyncOffers() error {
return nil return nil
} }
communities, err := m.communitiesManager.Joined() err = m.sendDatasyncOffersForCommunities()
if err != nil { if err != nil {
return err return err
} }
for _, community := range communities { err = m.sendDatasyncOffersForChats()
if err != nil {
return err
}
// Check all the group ids that need to be on offer
// Get all the messages that need to be offered
// Prepare datasync messages
// Dispatch them to the right group
return nil
}
func (m *Messenger) sendDatasyncOffersForCommunities() error {
joinedCommunities, err := m.communitiesManager.Joined()
if err != nil {
return err
}
for _, community := range joinedCommunities {
var chatIDs [][]byte var chatIDs [][]byte
for id := range community.Chats() { for id := range community.Chats() {
chatIDs = append(chatIDs, []byte(community.IDString()+id)) chatIDs = append(chatIDs, []byte(community.IDString()+id))
} }
if len(chatIDs) == 0 { if len(chatIDs) == 0 {
continue continue
} }
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs(chatIDs, maxAdvertiseMessages)
availableMessages, err := m.peersyncing.AvailableMessagesByGroupIDs(chatIDs, maxAdvertiseMessages)
if err != nil { if err != nil {
return err return err
} }
availableMessagesMap := make(map[string][][]byte)
for _, m := range availableMessages {
groupID := types.Bytes2Hex(m.GroupID)
availableMessagesMap[groupID] = append(availableMessagesMap[groupID], m.ID)
}
datasyncMessage := &datasyncproto.Payload{} datasyncMessage := &datasyncproto.Payload{}
if len(availableMessages) == 0 { if len(availableMessagesMap) == 0 {
continue continue
} }
for groupID, m := range availableMessagesMap { for chatID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(groupID), MessageIds: m}) datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m})
} }
payload, err := proto.Marshal(datasyncMessage) payload, err := proto.Marshal(datasyncMessage)
if err != nil { if err != nil {
@ -164,12 +174,43 @@ func (m *Messenger) sendDatasyncOffers() error {
if err != nil { if err != nil {
return err return err
} }
} }
// Check all the group ids that need to be on offer return nil
// Get all the messages that need to be offered }
// Prepare datasync messages
// Dispatch them to the right group func (m *Messenger) sendDatasyncOffersForChats() error {
for _, chat := range m.Chats() {
chatIDBytes := []byte(chat.ID)
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs([][]byte{chatIDBytes}, maxAdvertiseMessages)
if err != nil {
return err
}
datasyncMessage := &datasyncproto.Payload{}
if len(availableMessagesMap) == 0 {
continue
}
for _, message := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: chatIDBytes, MessageIds: message})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
return err
}
publicKey, err := chat.PublicKey()
if err != nil {
return err
}
rawMessage := common.RawMessage{
Payload: payload,
Ephemeral: true,
SkipApplicationWrap: true,
}
_, err = m.sender.SendPrivate(context.Background(), publicKey, &rawMessage)
if err != nil {
return err
}
}
return nil return nil
} }
@ -188,7 +229,7 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro
var offeredMessages []peersyncing.SyncMessage var offeredMessages []peersyncing.SyncMessage
for _, o := range offers { for _, o := range offers {
offeredMessages = append(offeredMessages, peersyncing.SyncMessage{GroupID: o.GroupID, ID: o.MessageID}) offeredMessages = append(offeredMessages, peersyncing.SyncMessage{ChatID: o.GroupID, ID: o.MessageID})
} }
messagesToFetch, err := m.peersyncing.OnOffer(offeredMessages) messagesToFetch, err := m.peersyncing.OnOffer(offeredMessages)
@ -235,7 +276,7 @@ func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) erro
func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) { func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) {
switch message.Type { switch message.Type {
case peersyncing.SyncMessageCommunityType: case peersyncing.SyncMessageCommunityType:
chat, ok := m.allChats.Load(string(message.GroupID)) chat, ok := m.allChats.Load(string(message.ChatID))
if !ok { if !ok {
return false, nil return false, nil
} }
@ -245,7 +286,12 @@ func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ec
} }
return m.canSyncCommunityMessageWith(chat, community, peer) return m.canSyncCommunityMessageWith(chat, community, peer)
case peersyncing.SyncMessageOneToOneType:
chat, ok := m.allChats.Load(string(message.ChatID))
if !ok {
return false, nil
}
return m.canSyncOneToOneMessageWith(chat, peer)
default: default:
return false, nil return false, nil
} }
@ -258,6 +304,10 @@ func (m *Messenger) canSyncCommunityMessageWith(chat *Chat, community *communiti
return community.IsMemberInChat(peer, chat.CommunityChatID()), nil return community.IsMemberInChat(peer, chat.CommunityChatID()), nil
} }
func (m *Messenger) canSyncOneToOneMessageWith(chat *Chat, peer *ecdsa.PublicKey) (bool, error) {
return chat.HasMember(common.PubkeyToHex(peer)), nil
}
func (m *Messenger) OnDatasyncRequests(requester *ecdsa.PublicKey, messageIDs [][]byte) error { func (m *Messenger) OnDatasyncRequests(requester *ecdsa.PublicKey, messageIDs [][]byte) error {
if len(messageIDs) == 0 { if len(messageIDs) == 0 {
return nil return nil

View File

@ -2,6 +2,7 @@ package protocol
import ( import (
"context" "context"
"encoding/hex"
"testing" "testing"
"time" "time"
@ -9,6 +10,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
@ -243,7 +245,7 @@ func (s *MessengerPeersyncingSuite) TestCanSyncMessageWith() {
syncMessage := peersyncing.SyncMessage{ syncMessage := peersyncing.SyncMessage{
ID: []byte("test-id"), ID: []byte("test-id"),
GroupID: []byte(chat.ID), ChatID: []byte(chat.ID),
Type: peersyncing.SyncMessageCommunityType, Type: peersyncing.SyncMessageCommunityType,
Payload: []byte("some-payload"), Payload: []byte("some-payload"),
Timestamp: 1, Timestamp: 1,
@ -261,3 +263,97 @@ func (s *MessengerPeersyncingSuite) TestCanSyncMessageWith() {
s.Require().NoError(err) s.Require().NoError(err)
s.Require().True(canSyncWithAlice) s.Require().True(canSyncWithAlice)
} }
func (s *MessengerPeersyncingSuite) TestSyncOneToOne() {
s.alice.featureFlags.Peersyncing = true
s.owner.featureFlags.Peersyncing = true
pkString := hex.EncodeToString(crypto.FromECDSAPub(&s.alice.identity.PublicKey))
chat := CreateOneToOneChat(pkString, &s.alice.identity.PublicKey, s.owner.transport)
chat.LastClockValue = uint64(100000000000000)
err := s.owner.SaveChat(chat)
s.NoError(err)
_, err = s.alice.Join(chat)
s.NoError(err)
chatID := chat.ID
inputMessage := common.NewMessage()
inputMessage.ChatId = chatID
inputMessage.ContentType = protobuf.ChatMessage_TEXT_PLAIN
inputMessage.Text = "some text"
ctx := context.Background()
// Send message, it should be received
response, err := s.alice.SendChatMessage(ctx, inputMessage)
s.Require().NoError(err)
s.Require().Len(response.Messages(), 1)
messageID := response.Messages()[0].ID
// Make sure the message makes it to the owner
response, err = WaitOnMessengerResponse(
s.owner,
func(r *MessengerResponse) bool {
return len(r.Messages()) == 1 && r.Messages()[0].ID == messageID
},
"message not received",
)
s.Require().NoError(err)
s.Require().NotNil(response)
msg, err := s.owner.peersyncing.AvailableMessages()
s.Require().NoError(err)
s.Require().Len(msg, 1)
// Alice should now send an offer
_, err = WaitOnMessengerResponse(
s.alice,
func(r *MessengerResponse) bool {
return s.alice.peersyncingOffers[messageID[2:]] != 0
},
"offer not sent",
)
s.Require().NoError(err)
// Owner should now reply to the offer
_, err = WaitOnMessengerResponse(
s.owner,
func(r *MessengerResponse) bool {
return s.owner.peersyncingRequests[s.alice.myHexIdentity()+messageID[2:]] != 0
},
"request not sent",
)
s.Require().NoError(err)
}
func (s *MessengerPeersyncingSuite) TestCanSyncOneToOneMessageWith() {
s.alice.featureFlags.Peersyncing = true
s.owner.featureFlags.Peersyncing = true
pkString := hex.EncodeToString(crypto.FromECDSAPub(&s.alice.identity.PublicKey))
chat := CreateOneToOneChat(pkString, &s.alice.identity.PublicKey, s.owner.transport)
chat.LastClockValue = uint64(100000000000000)
err := s.owner.SaveChat(chat)
s.NoError(err)
_, err = s.alice.Join(chat)
s.NoError(err)
syncMessage := peersyncing.SyncMessage{
ID: []byte("test-id"),
ChatID: []byte(chat.ID),
Type: peersyncing.SyncMessageOneToOneType,
Payload: []byte("some-payload"),
Timestamp: chat.LastClockValue,
}
s.Require().NoError(s.owner.peersyncing.Add(syncMessage))
canSyncWithBob, err := s.owner.canSyncOneToOneMessageWith(chat, &s.bob.identity.PublicKey)
s.Require().NoError(err)
s.Require().False(canSyncWithBob)
canSyncWithAlice, err := s.owner.canSyncOneToOneMessageWith(chat, &s.alice.identity.PublicKey)
s.Require().NoError(err)
s.Require().True(canSyncWithAlice)
}

View File

@ -137,6 +137,7 @@
// 1712905223_add_parity_to_message_segments.up.sql (792B) // 1712905223_add_parity_to_message_segments.up.sql (792B)
// 1713169458_update_raw_messages_with_resend_features.up.sql (608B) // 1713169458_update_raw_messages_with_resend_features.up.sql (608B)
// 1715163152_remove_status_community.up.sql (354B) // 1715163152_remove_status_community.up.sql (354B)
// 1715163262_rename_peersyncing_group_id_field.up.sql (212B)
// README.md (554B) // README.md (554B)
// doc.go (870B) // doc.go (870B)
@ -2947,6 +2948,26 @@ func _1715163152_remove_status_communityUpSql() (*asset, error) {
return a, nil return a, nil
} }
var __1715163262_rename_peersyncing_group_id_fieldUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x7c\xcd\x31\x0a\xc3\x20\x18\x47\xf1\xdd\x53\xfc\xc7\x16\x7a\x83\x4c\x36\xf9\x0a\x01\xa3\xc5\x58\xc8\x26\x92\x88\x75\x30\x95\x68\x87\xde\xbe\x4b\xe8\x14\xba\xff\x78\xaf\xd3\xea\x8e\x5e\x76\x34\xa1\xbf\x81\xa6\x7e\x34\x23\xb2\xf7\x5b\xf9\xac\x73\x5c\x83\x4d\xbe\x14\x17\x7c\xb1\x35\x26\x5f\xaa\x4b\xb9\x61\x8c\x0b\x43\x1a\x86\x5f\x05\x1d\x62\x68\x92\x7c\x20\xb4\x4a\x3c\x06\x89\xb0\xbd\xde\xd9\xc6\x05\x46\x61\x7e\xba\x6a\xe3\xd2\x30\xd6\x6a\xe2\x86\xf6\xf9\xff\x25\x94\x3c\x14\xa7\xbd\x76\xc1\x8f\x9e\x1b\xf6\x0d\x00\x00\xff\xff\xf3\xaa\x82\x37\xd4\x00\x00\x00")
func _1715163262_rename_peersyncing_group_id_fieldUpSqlBytes() ([]byte, error) {
return bindataRead(
__1715163262_rename_peersyncing_group_id_fieldUpSql,
"1715163262_rename_peersyncing_group_id_field.up.sql",
)
}
func _1715163262_rename_peersyncing_group_id_fieldUpSql() (*asset, error) {
bytes, err := _1715163262_rename_peersyncing_group_id_fieldUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1715163262_rename_peersyncing_group_id_field.up.sql", size: 212, mode: os.FileMode(0644), modTime: time.Unix(1700000000, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7b, 0x8d, 0x51, 0xe3, 0x46, 0xc, 0x1e, 0xf1, 0xe, 0xc2, 0xb3, 0x43, 0xfb, 0xed, 0xb0, 0x30, 0x45, 0x63, 0x18, 0x59, 0x19, 0x9d, 0x7e, 0x3f, 0x54, 0x87, 0xbf, 0x9f, 0x6, 0x9, 0x78, 0xec}}
return a, nil
}
var _readmeMd = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\x91\xc1\xce\xd3\x30\x10\x84\xef\x7e\x8a\x91\x7a\x01\xa9\x2a\x8f\xc0\x0d\x71\x82\x03\x48\x1c\xc9\x36\x9e\x36\x96\x1c\x6f\xf0\xae\x93\xe6\xed\x91\xa3\xc2\xdf\xff\x66\xed\xd8\x33\xdf\x78\x4f\xa7\x13\xbe\xea\x06\x57\x6c\x35\x39\x31\xa7\x7b\x15\x4f\x5a\xec\x73\x08\xbf\x08\x2d\x79\x7f\x4a\x43\x5b\x86\x17\xfd\x8c\x21\xea\x56\x5e\x47\x90\x4a\x14\x75\x48\xde\x64\x37\x2c\x6a\x96\xae\x99\x48\x05\xf6\x27\x77\x13\xad\x08\xae\x8a\x51\xe7\x25\xf3\xf1\xa9\x9f\xf9\x58\x58\x2c\xad\xbc\xe0\x8b\x56\xf0\x21\x5d\xeb\x4c\x95\xb3\xae\x84\x60\xd4\xdc\xe6\x82\x5d\x1b\x36\x6d\x39\x62\x92\xf5\xb8\x11\xdb\x92\xd3\x28\xce\xe0\x13\xe1\x72\xcd\x3c\x63\xd4\x65\x87\xae\xac\xe8\xc3\x28\x2e\x67\x44\x66\x3a\x21\x25\xa2\x72\xac\x14\x67\xbc\x84\x9f\x53\x32\x8c\x52\x70\x25\x56\xd6\xfd\x8d\x05\x37\xad\x30\x9d\x9f\xa6\x86\x0f\xcd\x58\x7f\xcf\x34\x93\x3b\xed\x90\x9f\xa4\x1f\xcf\x30\x85\x4d\x07\x58\xaf\x7f\x25\xc4\x9d\xf3\x72\x64\x84\xd0\x7f\xf9\x9b\x3a\x2d\x84\xef\x85\x48\x66\x8d\xd8\x88\x9b\x8c\x8c\x98\x5b\xf6\x74\x14\x4e\x33\x0d\xc9\xe0\x93\x38\xda\x12\xc5\x69\xbd\xe4\xf0\x2e\x7a\x78\x07\x1c\xfe\x13\x9f\x91\x29\x31\x95\x7b\x7f\x62\x59\x37\xb4\xe5\x5e\x25\xfe\x33\xee\xd5\x53\x71\xd6\xda\x3a\xd8\xcb\xde\x2e\xf8\xa1\x90\x55\x53\x0c\xc7\xaa\x0d\xe9\x76\x14\x29\x1c\x7b\x68\xdd\x2f\xe1\x6f\x00\x00\x00\xff\xff\x3c\x0a\xc2\xfe\x2a\x02\x00\x00") var _readmeMd = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\x91\xc1\xce\xd3\x30\x10\x84\xef\x7e\x8a\x91\x7a\x01\xa9\x2a\x8f\xc0\x0d\x71\x82\x03\x48\x1c\xc9\x36\x9e\x36\x96\x1c\x6f\xf0\xae\x93\xe6\xed\x91\xa3\xc2\xdf\xff\x66\xed\xd8\x33\xdf\x78\x4f\xa7\x13\xbe\xea\x06\x57\x6c\x35\x39\x31\xa7\x7b\x15\x4f\x5a\xec\x73\x08\xbf\x08\x2d\x79\x7f\x4a\x43\x5b\x86\x17\xfd\x8c\x21\xea\x56\x5e\x47\x90\x4a\x14\x75\x48\xde\x64\x37\x2c\x6a\x96\xae\x99\x48\x05\xf6\x27\x77\x13\xad\x08\xae\x8a\x51\xe7\x25\xf3\xf1\xa9\x9f\xf9\x58\x58\x2c\xad\xbc\xe0\x8b\x56\xf0\x21\x5d\xeb\x4c\x95\xb3\xae\x84\x60\xd4\xdc\xe6\x82\x5d\x1b\x36\x6d\x39\x62\x92\xf5\xb8\x11\xdb\x92\xd3\x28\xce\xe0\x13\xe1\x72\xcd\x3c\x63\xd4\x65\x87\xae\xac\xe8\xc3\x28\x2e\x67\x44\x66\x3a\x21\x25\xa2\x72\xac\x14\x67\xbc\x84\x9f\x53\x32\x8c\x52\x70\x25\x56\xd6\xfd\x8d\x05\x37\xad\x30\x9d\x9f\xa6\x86\x0f\xcd\x58\x7f\xcf\x34\x93\x3b\xed\x90\x9f\xa4\x1f\xcf\x30\x85\x4d\x07\x58\xaf\x7f\x25\xc4\x9d\xf3\x72\x64\x84\xd0\x7f\xf9\x9b\x3a\x2d\x84\xef\x85\x48\x66\x8d\xd8\x88\x9b\x8c\x8c\x98\x5b\xf6\x74\x14\x4e\x33\x0d\xc9\xe0\x93\x38\xda\x12\xc5\x69\xbd\xe4\xf0\x2e\x7a\x78\x07\x1c\xfe\x13\x9f\x91\x29\x31\x95\x7b\x7f\x62\x59\x37\xb4\xe5\x5e\x25\xfe\x33\xee\xd5\x53\x71\xd6\xda\x3a\xd8\xcb\xde\x2e\xf8\xa1\x90\x55\x53\x0c\xc7\xaa\x0d\xe9\x76\x14\x29\x1c\x7b\x68\xdd\x2f\xe1\x6f\x00\x00\x00\xff\xff\x3c\x0a\xc2\xfe\x2a\x02\x00\x00")
func readmeMdBytes() ([]byte, error) { func readmeMdBytes() ([]byte, error) {
@ -3352,6 +3373,8 @@ var _bindata = map[string]func() (*asset, error){
"1715163152_remove_status_community.up.sql": _1715163152_remove_status_communityUpSql, "1715163152_remove_status_community.up.sql": _1715163152_remove_status_communityUpSql,
"1715163262_rename_peersyncing_group_id_field.up.sql": _1715163262_rename_peersyncing_group_id_fieldUpSql,
"README.md": readmeMd, "README.md": readmeMd,
"doc.go": docGo, "doc.go": docGo,
@ -3361,11 +3384,13 @@ var _bindata = map[string]func() (*asset, error){
// directory embedded in the file by go-bindata. // directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the // For example if you run go-bindata on data/... and data contains the
// following hierarchy: // following hierarchy:
// data/ //
// foo.txt // data/
// img/ // foo.txt
// a.png // img/
// b.png // a.png
// b.png
//
// then AssetDir("data") would return []string{"foo.txt", "img"}, // then AssetDir("data") would return []string{"foo.txt", "img"},
// AssetDir("data/img") would return []string{"a.png", "b.png"}, // AssetDir("data/img") would return []string{"a.png", "b.png"},
// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and // AssetDir("foo.txt") and AssetDir("notexist") would return an error, and
@ -3535,6 +3560,7 @@ var _bintree = &bintree{nil, map[string]*bintree{
"1712905223_add_parity_to_message_segments.up.sql": &bintree{_1712905223_add_parity_to_message_segmentsUpSql, map[string]*bintree{}}, "1712905223_add_parity_to_message_segments.up.sql": &bintree{_1712905223_add_parity_to_message_segmentsUpSql, map[string]*bintree{}},
"1713169458_update_raw_messages_with_resend_features.up.sql": &bintree{_1713169458_update_raw_messages_with_resend_featuresUpSql, map[string]*bintree{}}, "1713169458_update_raw_messages_with_resend_features.up.sql": &bintree{_1713169458_update_raw_messages_with_resend_featuresUpSql, map[string]*bintree{}},
"1715163152_remove_status_community.up.sql": &bintree{_1715163152_remove_status_communityUpSql, map[string]*bintree{}}, "1715163152_remove_status_community.up.sql": &bintree{_1715163152_remove_status_communityUpSql, map[string]*bintree{}},
"1715163262_rename_peersyncing_group_id_field.up.sql": &bintree{_1715163262_rename_peersyncing_group_id_fieldUpSql, map[string]*bintree{}},
"README.md": &bintree{readmeMd, map[string]*bintree{}}, "README.md": &bintree{readmeMd, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}},
}} }}

View File

@ -0,0 +1,5 @@
DROP INDEX IF EXISTS peersyncing_messages_timestamp;
ALTER TABLE peersyncing_messages RENAME COLUMN group_id TO chat_id;
CREATE INDEX peersyncing_messages_timestamp ON peersyncing_messages(chat_id, timestamp);

View File

@ -1,5 +1,7 @@
package peersyncing package peersyncing
import "github.com/status-im/status-go/eth-node/types"
type PeerSyncing struct { type PeerSyncing struct {
persistence SyncMessagePersistence persistence SyncMessagePersistence
config Config config Config
@ -25,12 +27,17 @@ func (p *PeerSyncing) AvailableMessages() ([]SyncMessage, error) {
return p.persistence.All() return p.persistence.All()
} }
func (p *PeerSyncing) AvailableMessagesByGroupID(groupID []byte, limit int) ([]SyncMessage, error) { func (p *PeerSyncing) AvailableMessagesMapByChatIDs(groupIDs [][]byte, limit int) (map[string][][]byte, error) {
return p.persistence.ByGroupID(groupID, limit) availableMessages, err := p.persistence.ByChatIDs(groupIDs, limit)
} if err != nil {
return nil, err
func (p *PeerSyncing) AvailableMessagesByGroupIDs(groupIDs [][]byte, limit int) ([]SyncMessage, error) { }
return p.persistence.ByGroupIDs(groupIDs, limit) availableMessagesMap := make(map[string][][]byte)
for _, m := range availableMessages {
chatID := types.Bytes2Hex(m.ChatID)
availableMessagesMap[chatID] = append(availableMessagesMap[chatID], m.ID)
}
return availableMessagesMap, err
} }
func (p *PeerSyncing) MessagesByIDs(messageIDs [][]byte) ([]SyncMessage, error) { func (p *PeerSyncing) MessagesByIDs(messageIDs [][]byte) ([]SyncMessage, error) {

View File

@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/sqlite" "github.com/status-im/status-go/protocol/sqlite"
"github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/t/helpers"
) )
@ -29,13 +30,13 @@ func (s *PeerSyncingSuite) SetupTest() {
s.p = New(Config{Database: db}) s.p = New(Config{Database: db})
} }
var testGroupID = []byte("group-id") var testCommunityID = []byte("community-id")
func (s *PeerSyncingSuite) TestBasic() { func (s *PeerSyncingSuite) TestBasic() {
syncMessage := SyncMessage{ syncMessage := SyncMessage{
ID: []byte("test-id"), ID: []byte("test-id"),
GroupID: testGroupID, ChatID: testCommunityID,
Type: SyncMessageCommunityType, Type: SyncMessageCommunityType,
Payload: []byte("test"), Payload: []byte("test"),
Timestamp: 1, Timestamp: 1,
@ -48,19 +49,19 @@ func (s *PeerSyncingSuite) TestBasic() {
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Len(allMessages, 1) s.Require().Len(allMessages, 1)
byGroupID, err := s.p.AvailableMessagesByGroupID(syncMessage.GroupID, 10) byChatID, err := s.p.AvailableMessagesMapByChatIDs([][]byte{syncMessage.ChatID}, 10)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Len(byGroupID, 1) s.Require().Len(byChatID, 1)
byGroupID, err = s.p.AvailableMessagesByGroupID([]byte("random-group-id"), 10) byChatID, err = s.p.AvailableMessagesMapByChatIDs([][]byte{[]byte("random-group-id")}, 10)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Len(byGroupID, 0) s.Require().Len(byChatID, 0)
newSyncMessage := SyncMessage{ newSyncMessage := SyncMessage{
ID: []byte("test-id-2"), ID: []byte("test-id-2"),
GroupID: testGroupID, ChatID: testCommunityID,
Type: SyncMessageCommunityType, Type: SyncMessageCommunityType,
Payload: []byte("test-2"), Payload: []byte("test-2"),
Timestamp: 2, Timestamp: 2,
@ -77,7 +78,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {
syncMessage1 := SyncMessage{ syncMessage1 := SyncMessage{
ID: []byte("test-id-1"), ID: []byte("test-id-1"),
GroupID: testGroupID, ChatID: testCommunityID,
Type: SyncMessageCommunityType, Type: SyncMessageCommunityType,
Payload: []byte("test"), Payload: []byte("test"),
Timestamp: 1, Timestamp: 1,
@ -85,7 +86,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {
syncMessage2 := SyncMessage{ syncMessage2 := SyncMessage{
ID: []byte("test-id-2"), ID: []byte("test-id-2"),
GroupID: testGroupID, ChatID: testCommunityID,
Type: SyncMessageCommunityType, Type: SyncMessageCommunityType,
Payload: []byte("test"), Payload: []byte("test"),
Timestamp: 2, Timestamp: 2,
@ -93,7 +94,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {
syncMessage3 := SyncMessage{ syncMessage3 := SyncMessage{
ID: []byte("test-id-3"), ID: []byte("test-id-3"),
GroupID: testGroupID, ChatID: testCommunityID,
Type: SyncMessageCommunityType, Type: SyncMessageCommunityType,
Payload: []byte("test"), Payload: []byte("test"),
Timestamp: 3, Timestamp: 3,
@ -101,7 +102,7 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {
syncMessage4 := SyncMessage{ syncMessage4 := SyncMessage{
ID: []byte("test-id-4"), ID: []byte("test-id-4"),
GroupID: testGroupID, ChatID: testCommunityID,
Type: SyncMessageCommunityType, Type: SyncMessageCommunityType,
Payload: []byte("test"), Payload: []byte("test"),
Timestamp: 4, Timestamp: 4,
@ -112,22 +113,15 @@ func (s *PeerSyncingSuite) TestOrderAndLimit() {
s.Require().NoError(s.p.Add(syncMessage3)) s.Require().NoError(s.p.Add(syncMessage3))
s.Require().NoError(s.p.Add(syncMessage4)) s.Require().NoError(s.p.Add(syncMessage4))
byGroupID, err := s.p.AvailableMessagesByGroupID(testGroupID, 10) byChatID, err := s.p.AvailableMessagesMapByChatIDs([][]byte{testCommunityID}, 10)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Len(byGroupID, 4) s.Require().Len(byChatID, 1)
s.Require().Len(byChatID[types.Bytes2Hex(testCommunityID)], 4)
s.Require().Equal(syncMessage1.ID, byGroupID[3].ID) byChatID, err = s.p.AvailableMessagesMapByChatIDs([][]byte{testCommunityID}, 3)
s.Require().Equal(syncMessage2.ID, byGroupID[2].ID)
s.Require().Equal(syncMessage3.ID, byGroupID[1].ID)
s.Require().Equal(syncMessage4.ID, byGroupID[0].ID)
byGroupID, err = s.p.AvailableMessagesByGroupID(testGroupID, 3)
s.Require().NoError(err) s.Require().NoError(err)
s.Require().Len(byGroupID, 3) s.Require().Len(byChatID, 1)
s.Require().Len(byChatID[types.Bytes2Hex(testCommunityID)], 3)
s.Require().Equal(syncMessage2.ID, byGroupID[2].ID)
s.Require().Equal(syncMessage3.ID, byGroupID[1].ID)
s.Require().Equal(syncMessage4.ID, byGroupID[0].ID)
} }

View File

@ -7,7 +7,7 @@ type SyncMessageType int
type SyncMessage struct { type SyncMessage struct {
ID []byte ID []byte
Type SyncMessageType Type SyncMessageType
GroupID []byte ChatID []byte
Payload []byte Payload []byte
Timestamp uint64 Timestamp uint64
} }
@ -15,7 +15,7 @@ type SyncMessage struct {
var ErrSyncMessageNotValid = errors.New("sync message not valid") var ErrSyncMessageNotValid = errors.New("sync message not valid")
func (s *SyncMessage) Valid() error { func (s *SyncMessage) Valid() error {
valid := len(s.ID) != 0 && s.Type != SyncMessageNoType && len(s.GroupID) != 0 && len(s.Payload) != 0 && s.Timestamp != 0 valid := len(s.ID) != 0 && s.Type != SyncMessageNoType && len(s.ChatID) != 0 && len(s.Payload) != 0 && s.Timestamp != 0
if !valid { if !valid {
return ErrSyncMessageNotValid return ErrSyncMessageNotValid
} }

View File

@ -11,8 +11,7 @@ type SyncMessagePersistence interface {
Add(SyncMessage) error Add(SyncMessage) error
All() ([]SyncMessage, error) All() ([]SyncMessage, error)
Complement([]SyncMessage) ([]SyncMessage, error) Complement([]SyncMessage) ([]SyncMessage, error)
ByGroupID([]byte, int) ([]SyncMessage, error) ByChatIDs([][]byte, int) ([]SyncMessage, error)
ByGroupIDs([][]byte, int) ([]SyncMessage, error)
ByMessageIDs([][]byte) ([]SyncMessage, error) ByMessageIDs([][]byte) ([]SyncMessage, error)
} }
@ -28,13 +27,13 @@ func (p *SyncMessageSQLitePersistence) Add(message SyncMessage) error {
if err := message.Valid(); err != nil { if err := message.Valid(); err != nil {
return err return err
} }
_, err := p.db.Exec(`INSERT INTO peersyncing_messages (id, type, group_id, payload, timestamp) VALUES (?, ?, ?, ?, ?)`, message.ID, message.Type, message.GroupID, message.Payload, message.Timestamp) _, err := p.db.Exec(`INSERT INTO peersyncing_messages (id, type, chat_id, payload, timestamp) VALUES (?, ?, ?, ?, ?)`, message.ID, message.Type, message.ChatID, message.Payload, message.Timestamp)
return err return err
} }
func (p *SyncMessageSQLitePersistence) All() ([]SyncMessage, error) { func (p *SyncMessageSQLitePersistence) All() ([]SyncMessage, error) {
var messages []SyncMessage var messages []SyncMessage
rows, err := p.db.Query(`SELECT id, type, group_id, payload, timestamp FROM peersyncing_messages`) rows, err := p.db.Query(`SELECT id, type, chat_id, payload, timestamp FROM peersyncing_messages`)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -44,7 +43,7 @@ func (p *SyncMessageSQLitePersistence) All() ([]SyncMessage, error) {
for rows.Next() { for rows.Next() {
var m SyncMessage var m SyncMessage
err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp) err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -54,9 +53,22 @@ func (p *SyncMessageSQLitePersistence) All() ([]SyncMessage, error) {
return messages, nil return messages, nil
} }
func (p *SyncMessageSQLitePersistence) ByGroupID(groupID []byte, limit int) ([]SyncMessage, error) { func (p *SyncMessageSQLitePersistence) ByChatIDs(ids [][]byte, limit int) ([]SyncMessage, error) {
if len(ids) == 0 {
return nil, nil
}
queryArgs := make([]interface{}, 0, len(ids))
for _, id := range ids {
queryArgs = append(queryArgs, id)
}
queryArgs = append(queryArgs, limit)
inVector := strings.Repeat("?, ", len(ids)-1) + "?"
query := "SELECT id, type, chat_id, payload, timestamp FROM peersyncing_messages WHERE chat_id IN (" + inVector + ") ORDER BY timestamp DESC LIMIT ?" // nolint: gosec
var messages []SyncMessage var messages []SyncMessage
rows, err := p.db.Query(`SELECT id, type, group_id, payload, timestamp FROM peersyncing_messages WHERE group_id = ? ORDER BY timestamp DESC LIMIT ?`, groupID, limit) rows, err := p.db.Query(query, queryArgs...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -66,7 +78,7 @@ func (p *SyncMessageSQLitePersistence) ByGroupID(groupID []byte, limit int) ([]S
for rows.Next() { for rows.Next() {
var m SyncMessage var m SyncMessage
err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp) err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -87,7 +99,7 @@ func (p *SyncMessageSQLitePersistence) Complement(messages []SyncMessage) ([]Syn
} }
inVector := strings.Repeat("?, ", len(ids)-1) + "?" inVector := strings.Repeat("?, ", len(ids)-1) + "?"
query := "SELECT id, type, group_id, payload, timestamp FROM peersyncing_messages WHERE id IN (" + inVector + ")" // nolint: gosec query := "SELECT id, type, chat_id, payload, timestamp FROM peersyncing_messages WHERE id IN (" + inVector + ")" // nolint: gosec
availableMessages := make(map[string]SyncMessage) availableMessages := make(map[string]SyncMessage)
rows, err := p.db.Query(query, ids...) rows, err := p.db.Query(query, ids...)
@ -100,7 +112,7 @@ func (p *SyncMessageSQLitePersistence) Complement(messages []SyncMessage) ([]Syn
for rows.Next() { for rows.Next() {
var m SyncMessage var m SyncMessage
err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp) err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -120,42 +132,6 @@ func (p *SyncMessageSQLitePersistence) Complement(messages []SyncMessage) ([]Syn
return complement, nil return complement, nil
} }
func (p *SyncMessageSQLitePersistence) ByGroupIDs(ids [][]byte, limit int) ([]SyncMessage, error) {
if len(ids) == 0 {
return nil, nil
}
queryArgs := make([]interface{}, 0, len(ids))
for _, id := range ids {
queryArgs = append(queryArgs, id)
}
queryArgs = append(queryArgs, limit)
inVector := strings.Repeat("?, ", len(ids)-1) + "?"
query := "SELECT id, type, group_id, payload, timestamp FROM peersyncing_messages WHERE group_id IN (" + inVector + ") ORDER BY timestamp DESC LIMIT ?" // nolint: gosec
var messages []SyncMessage
rows, err := p.db.Query(query, queryArgs...)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var m SyncMessage
err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp)
if err != nil {
return nil, err
}
messages = append(messages, m)
}
return messages, nil
}
func (p *SyncMessageSQLitePersistence) ByMessageIDs(ids [][]byte) ([]SyncMessage, error) { func (p *SyncMessageSQLitePersistence) ByMessageIDs(ids [][]byte) ([]SyncMessage, error) {
if len(ids) == 0 { if len(ids) == 0 {
return nil, nil return nil, nil
@ -167,7 +143,7 @@ func (p *SyncMessageSQLitePersistence) ByMessageIDs(ids [][]byte) ([]SyncMessage
} }
inVector := strings.Repeat("?, ", len(ids)-1) + "?" inVector := strings.Repeat("?, ", len(ids)-1) + "?"
query := "SELECT id, type, group_id, payload, timestamp FROM peersyncing_messages WHERE id IN (" + inVector + ")" // nolint: gosec query := "SELECT id, type, chat_id, payload, timestamp FROM peersyncing_messages WHERE id IN (" + inVector + ")" // nolint: gosec
var messages []SyncMessage var messages []SyncMessage
rows, err := p.db.Query(query, queryArgs...) rows, err := p.db.Query(query, queryArgs...)
@ -180,7 +156,7 @@ func (p *SyncMessageSQLitePersistence) ByMessageIDs(ids [][]byte) ([]SyncMessage
for rows.Next() { for rows.Next() {
var m SyncMessage var m SyncMessage
err := rows.Scan(&m.ID, &m.Type, &m.GroupID, &m.Payload, &m.Timestamp) err := rows.Scan(&m.ID, &m.Type, &m.ChatID, &m.Payload, &m.Timestamp)
if err != nil { if err != nil {
return nil, err return nil, err
} }