status-go/protocol/messenger_peersyncing.go
kaichao 47899fd045
feat_: hash based query for outgoing messages. (#5217)
* feat_: hash based query for outgoing messages.

* chore_: more logs

* chore_: fix comments

* chore_: do not lock when send queries

* chore_: use constant for magic number

* chore_: remove message ids from query queue after ack

* chore_: fix ack clean process

* chore_: fix message resend test

* chore_: add test for waku confirm message sent.

* chore_: fix tests.

* chore_: fix more

* chore_: set store peer id when mailserver updates

* fix_: tests

* chore_: increase max hash query length

* chore_: remove debug log of ack message

* chore_: remove automatic peer selection

* chore_: mark raw message to sent after ack

* chore_: fix test

* chore_: fix test
2024-06-11 15:45:01 +08:00

419 lines
11 KiB
Go

package protocol
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"time"
"github.com/golang/protobuf/proto"
datasyncproto "github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
"github.com/status-im/status-go/protocol/peersyncing"
v1protocol "github.com/status-im/status-go/protocol/v1"
)
var peerSyncingLoopInterval time.Duration = 60 * time.Second
var maxAdvertiseMessages = 40
func (m *Messenger) markDeliveredMessages(acks [][]byte) {
for _, ack := range acks {
//get message ID from database by datasync ID, with at-least-one
// semantic
messageIDBytes, err := m.persistence.MarkAsConfirmed(ack, true)
if err != nil {
m.logger.Info("got datasync acknowledge for message we don't have in db", zap.String("ack", hex.EncodeToString(ack)))
continue
}
messageID := messageIDBytes.String()
//mark messages as delivered
m.logger.Debug("got datasync acknowledge for message", zap.String("ack", hex.EncodeToString(ack)), zap.String("messageID", messageID))
err = m.UpdateMessageOutgoingStatus(messageID, common.OutgoingStatusDelivered)
if err != nil {
m.logger.Debug("Can't set message status as delivered", zap.Error(err))
}
err = m.UpdateRawMessageSent(messageID, true)
if err != nil {
m.logger.Debug("can't set raw message as sent", zap.Error(err))
}
m.transport.ConfirmMessageDelivered(messageID)
//send signal to client that message status updated
if m.config.messengerSignalsHandler != nil {
message, err := m.persistence.MessageByID(messageID)
if err != nil {
m.logger.Debug("Can't get message from database", zap.Error(err))
continue
}
m.config.messengerSignalsHandler.MessageDelivered(message.LocalChatID, messageID)
}
}
}
func (m *Messenger) handleDatasyncMetadata(response *common.HandleMessageResponse) error {
m.OnDatasyncAcks(response.DatasyncSender, response.DatasyncAcks)
if !m.featureFlags.Peersyncing {
return nil
}
isPeerSyncingEnabled, err := m.settings.GetPeerSyncingEnabled()
if err != nil {
return err
}
if !isPeerSyncingEnabled {
return nil
}
err = m.OnDatasyncOffer(response)
if err != nil {
return err
}
err = m.OnDatasyncRequests(response.DatasyncSender, response.DatasyncRequests)
if err != nil {
return err
}
return nil
}
func (m *Messenger) startPeerSyncingLoop() {
logger := m.logger.Named("PeerSyncingLoop")
ticker := time.NewTicker(peerSyncingLoopInterval)
go func() {
for {
select {
case <-ticker.C:
err := m.sendDatasyncOffers()
if err != nil {
m.logger.Warn("failed to send datasync offers", zap.Error(err))
}
case <-m.quit:
ticker.Stop()
logger.Debug("peersyncing loop stopped")
return
}
}
}()
}
func (m *Messenger) sendDatasyncOffers() error {
if !m.featureFlags.Peersyncing {
return nil
}
isPeerSyncingEnabled, err := m.settings.GetPeerSyncingEnabled()
if err != nil {
return err
}
if !isPeerSyncingEnabled {
return nil
}
err = m.sendDatasyncOffersForCommunities()
if err != nil {
return err
}
err = m.sendDatasyncOffersForChats()
if err != nil {
return err
}
// Check all the group ids that need to be on offer
// Get all the messages that need to be offered
// Prepare datasync messages
// Dispatch them to the right group
return nil
}
func (m *Messenger) sendDatasyncOffersForCommunities() error {
joinedCommunities, err := m.communitiesManager.Joined()
if err != nil {
return err
}
for _, community := range joinedCommunities {
var chatIDs [][]byte
for id := range community.Chats() {
chatIDs = append(chatIDs, []byte(community.IDString()+id))
}
if len(chatIDs) == 0 {
continue
}
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs(chatIDs, maxAdvertiseMessages)
if err != nil {
return err
}
datasyncMessage := &datasyncproto.Payload{}
if len(availableMessagesMap) == 0 {
continue
}
for chatID, m := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
return err
}
rawMessage := common.RawMessage{
Payload: payload,
Ephemeral: true,
SkipApplicationWrap: true,
PubsubTopic: community.PubsubTopic(),
}
_, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage)
if err != nil {
return err
}
}
return nil
}
func (m *Messenger) sendDatasyncOffersForChats() error {
for _, chat := range m.Chats() {
chatIDBytes := []byte(chat.ID)
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs([][]byte{chatIDBytes}, maxAdvertiseMessages)
if err != nil {
return err
}
datasyncMessage := &datasyncproto.Payload{}
if len(availableMessagesMap) == 0 {
continue
}
for _, message := range availableMessagesMap {
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: chatIDBytes, MessageIds: message})
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
return err
}
publicKey, err := chat.PublicKey()
if err != nil {
return err
}
rawMessage := common.RawMessage{
Payload: payload,
Ephemeral: true,
SkipApplicationWrap: true,
}
_, err = m.sender.SendPrivate(context.Background(), publicKey, &rawMessage)
if err != nil {
return err
}
}
return nil
}
func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) error {
sender := response.DatasyncSender
offers := response.DatasyncOffers
if len(offers) == 0 {
return nil
}
if common.PubkeyToHex(sender) == m.myHexIdentity() {
return nil
}
var offeredMessages []peersyncing.SyncMessage
for _, o := range offers {
offeredMessages = append(offeredMessages, peersyncing.SyncMessage{ChatID: o.GroupID, ID: o.MessageID})
}
messagesToFetch, err := m.peersyncing.OnOffer(offeredMessages)
if err != nil {
return err
}
if len(messagesToFetch) == 0 {
return nil
}
datasyncMessage := &datasyncproto.Payload{}
for _, msg := range messagesToFetch {
idString := types.Bytes2Hex(msg.ID)
lastOffered := m.peersyncingOffers[idString]
timeNow := m.GetCurrentTimeInMillis() / 1000
if lastOffered+30 < timeNow {
m.peersyncingOffers[idString] = timeNow
datasyncMessage.Requests = append(datasyncMessage.Requests, msg.ID)
}
}
payload, err := proto.Marshal(datasyncMessage)
if err != nil {
return err
}
rawMessage := common.RawMessage{
LocalChatID: common.PubkeyToHex(sender),
Payload: payload,
Ephemeral: true,
SkipApplicationWrap: true,
}
_, err = m.sender.SendPrivate(context.Background(), sender, &rawMessage)
if err != nil {
return err
}
// Check if any of the things need to be added
// Reply if anything needs adding
// Ack any message that is out
return nil
}
// canSyncMessageWith checks the permission of a message
func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) {
switch message.Type {
case peersyncing.SyncMessageCommunityType:
chat, ok := m.allChats.Load(string(message.ChatID))
if !ok {
return false, nil
}
community, err := m.communitiesManager.GetByIDString(chat.CommunityID)
if err != nil {
return false, err
}
return m.canSyncCommunityMessageWith(chat, community, peer)
case peersyncing.SyncMessageOneToOneType:
chat, ok := m.allChats.Load(string(message.ChatID))
if !ok {
return false, nil
}
return m.canSyncOneToOneMessageWith(chat, peer)
default:
return false, nil
}
}
// NOTE: This is not stricly correct. It's possible that you sync a message that has been
// posted after the banning of a user from a community, but before we realized that.
// As an approximation it should be ok, but worth thinking about how to address this.
func (m *Messenger) canSyncCommunityMessageWith(chat *Chat, community *communities.Community, peer *ecdsa.PublicKey) (bool, error) {
return community.IsMemberInChat(peer, chat.CommunityChatID()), nil
}
func (m *Messenger) canSyncOneToOneMessageWith(chat *Chat, peer *ecdsa.PublicKey) (bool, error) {
return chat.HasMember(common.PubkeyToHex(peer)), nil
}
func (m *Messenger) OnDatasyncRequests(requester *ecdsa.PublicKey, messageIDs [][]byte) error {
if len(messageIDs) == 0 {
return nil
}
messages, err := m.peersyncing.MessagesByIDs(messageIDs)
if err != nil {
return err
}
for _, msg := range messages {
canSync, err := m.canSyncMessageWith(msg, requester)
if err != nil {
return err
}
if !canSync {
continue
}
idString := common.PubkeyToHex(requester) + types.Bytes2Hex(msg.ID)
lastRequested := m.peersyncingRequests[idString]
timeNow := m.GetCurrentTimeInMillis() / 1000
if lastRequested+30 < timeNow {
m.peersyncingRequests[idString] = timeNow
// Check permissions
rawMessage := common.RawMessage{
LocalChatID: common.PubkeyToHex(requester),
Payload: msg.Payload,
Ephemeral: true,
SkipApplicationWrap: true,
}
_, err = m.sender.SendPrivate(context.Background(), requester, &rawMessage)
if err != nil {
return err
}
}
}
// no need of group id, since we can derive from message
return nil
}
func (m *Messenger) OnDatasyncAcks(sender *ecdsa.PublicKey, acks [][]byte) {
// we should make sure the sender can acknowledge those messages
m.markDeliveredMessages(acks)
}
// sendDataSync sends a message scheduled by the data sync layer.
// Data Sync layer calls this method "dispatch" function.
func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.Payload) error {
ctx := context.Background()
if !payload.IsValid() {
m.logger.Error("payload is invalid")
return errors.New("payload is invalid")
}
marshalledPayload, err := proto.Marshal(payload)
if err != nil {
m.logger.Error("failed to marshal payload")
return err
}
publicKey, err := datasyncpeer.IDToPublicKey(receiver)
if err != nil {
m.logger.Error("failed to convert id to public key", zap.Error(err))
return err
}
// Calculate the messageIDs
messageIDs := make([][]byte, 0, len(payload.Messages))
hexMessageIDs := make([]string, 0, len(payload.Messages))
for _, payload := range payload.Messages {
mid := v1protocol.MessageID(&m.identity.PublicKey, payload.Body)
messageIDs = append(messageIDs, mid)
hexMessageIDs = append(hexMessageIDs, mid.String())
}
messageSpec, err := m.encryptor.BuildEncryptedMessage(m.identity, publicKey, marshalledPayload)
if err != nil {
return errors.Wrap(err, "failed to encrypt message")
}
// The shared secret needs to be handle before we send a message
// otherwise the topic might not be set up before we receive a message
err = m.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
if err != nil {
return err
}
hashes, newMessages, err := m.sender.SendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
if err != nil {
m.logger.Error("failed to send a datasync message", zap.Error(err))
return err
}
m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes)))
m.transport.TrackMany(messageIDs, hashes, newMessages)
return nil
}