mirror of
https://github.com/status-im/status-go.git
synced 2025-01-12 07:35:02 +00:00
38308d48f2
* feat_: log error and stacktrace when panic in goroutine * test_: add test TestSafeGo * chore_: rename logAndCall to call * chore_: rename SafeGo to Go * chore_: make lint-fix * chore_: use t.Cleanup * chore_: Revert "chore_: use t.Cleanup" This reverts commit 4eb420d179cc0e208e84c13cb941e6b3d1ed9819. * chore_: Revert "chore_: make lint-fix" This reverts commit fcc995f157e671a4229b47419c3a0e4004b5fdab. * chore_: Revert "chore_: rename SafeGo to Go" This reverts commit a6d73d6df583f313032d79aac62f66328039cb55. * chore_: Revert "chore_: rename logAndCall to call" This reverts commit 8fbe993bedb9fbba67349a44f151e2dd5e3bc4cc. * chore_: Revert "test_: add test TestSafeGo" This reverts commit a1fa91839f3960398980c6bf456e6462ec944819. * chore_: Revert "feat_: log error and stacktrace when panic in goroutine" This reverts commit f612dd828fa2ce410d0e806fe773ecbe3e86a68a. * feat_: log error and stacktrace when panic in goroutine * chore_: make lint-fix * chore_: rename logAndCall to call * chore_: renaming LogOnPanic * chore_: update rest goroutine function calls * chore_: make lint-fix
422 lines
12 KiB
Go
422 lines
12 KiB
Go
package protocol
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"encoding/hex"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
datasyncproto "github.com/status-im/mvds/protobuf"
|
|
"github.com/status-im/mvds/state"
|
|
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
|
|
gocommon "github.com/status-im/status-go/common"
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/protocol/common"
|
|
"github.com/status-im/status-go/protocol/communities"
|
|
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
|
|
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
|
|
"github.com/status-im/status-go/protocol/peersyncing"
|
|
v1protocol "github.com/status-im/status-go/protocol/v1"
|
|
)
|
|
|
|
var peerSyncingLoopInterval time.Duration = 60 * time.Second
|
|
var maxAdvertiseMessages = 40
|
|
|
|
func (m *Messenger) markDeliveredMessages(acks [][]byte) {
|
|
for _, ack := range acks {
|
|
//get message ID from database by datasync ID, with at-least-one
|
|
// semantic
|
|
messageIDBytes, err := m.persistence.MarkAsConfirmed(ack, true)
|
|
if err != nil {
|
|
m.logger.Info("got datasync acknowledge for message we don't have in db", zap.String("ack", hex.EncodeToString(ack)))
|
|
continue
|
|
}
|
|
|
|
messageID := messageIDBytes.String()
|
|
//mark messages as delivered
|
|
|
|
m.logger.Debug("got datasync acknowledge for message", zap.String("ack", hex.EncodeToString(ack)), zap.String("messageID", messageID))
|
|
|
|
err = m.UpdateMessageOutgoingStatus(messageID, common.OutgoingStatusDelivered)
|
|
if err != nil {
|
|
m.logger.Debug("Can't set message status as delivered", zap.Error(err))
|
|
}
|
|
|
|
err = m.UpdateRawMessageSent(messageID, true)
|
|
if err != nil {
|
|
m.logger.Debug("can't set raw message as sent", zap.Error(err))
|
|
}
|
|
|
|
m.transport.ConfirmMessageDelivered(messageID)
|
|
|
|
//send signal to client that message status updated
|
|
if m.config.messengerSignalsHandler != nil {
|
|
message, err := m.persistence.MessageByID(messageID)
|
|
if err != nil {
|
|
m.logger.Debug("Can't get message from database", zap.Error(err))
|
|
continue
|
|
}
|
|
m.config.messengerSignalsHandler.MessageDelivered(message.LocalChatID, messageID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Messenger) handleDatasyncMetadata(response *common.HandleMessageResponse) error {
|
|
m.OnDatasyncAcks(response.DatasyncSender, response.DatasyncAcks)
|
|
|
|
if !m.featureFlags.Peersyncing {
|
|
return nil
|
|
}
|
|
|
|
isPeerSyncingEnabled, err := m.settings.GetPeerSyncingEnabled()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !isPeerSyncingEnabled {
|
|
return nil
|
|
}
|
|
|
|
err = m.OnDatasyncOffer(response)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = m.OnDatasyncRequests(response.DatasyncSender, response.DatasyncRequests)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) startPeerSyncingLoop() {
|
|
logger := m.logger.Named("PeerSyncingLoop")
|
|
|
|
ticker := time.NewTicker(peerSyncingLoopInterval)
|
|
go func() {
|
|
defer gocommon.LogOnPanic()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
err := m.sendDatasyncOffers()
|
|
if err != nil {
|
|
m.logger.Warn("failed to send datasync offers", zap.Error(err))
|
|
}
|
|
|
|
case <-m.quit:
|
|
ticker.Stop()
|
|
logger.Debug("peersyncing loop stopped")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (m *Messenger) sendDatasyncOffers() error {
|
|
if !m.featureFlags.Peersyncing {
|
|
return nil
|
|
}
|
|
|
|
isPeerSyncingEnabled, err := m.settings.GetPeerSyncingEnabled()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !isPeerSyncingEnabled {
|
|
return nil
|
|
}
|
|
|
|
err = m.sendDatasyncOffersForCommunities()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = m.sendDatasyncOffersForChats()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check all the group ids that need to be on offer
|
|
// Get all the messages that need to be offered
|
|
// Prepare datasync messages
|
|
// Dispatch them to the right group
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) sendDatasyncOffersForCommunities() error {
|
|
joinedCommunities, err := m.communitiesManager.Joined()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, community := range joinedCommunities {
|
|
var chatIDs [][]byte
|
|
for id := range community.Chats() {
|
|
chatIDs = append(chatIDs, []byte(community.IDString()+id))
|
|
}
|
|
if len(chatIDs) == 0 {
|
|
continue
|
|
}
|
|
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs(chatIDs, maxAdvertiseMessages)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
datasyncMessage := &datasyncproto.Payload{}
|
|
if len(availableMessagesMap) == 0 {
|
|
continue
|
|
}
|
|
for chatID, m := range availableMessagesMap {
|
|
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: types.Hex2Bytes(chatID), MessageIds: m})
|
|
}
|
|
payload, err := proto.Marshal(datasyncMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rawMessage := common.RawMessage{
|
|
Payload: payload,
|
|
Ephemeral: true,
|
|
SkipApplicationWrap: true,
|
|
PubsubTopic: community.PubsubTopic(),
|
|
Priority: &common.LowPriority,
|
|
}
|
|
_, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) sendDatasyncOffersForChats() error {
|
|
for _, chat := range m.Chats() {
|
|
chatIDBytes := []byte(chat.ID)
|
|
availableMessagesMap, err := m.peersyncing.AvailableMessagesMapByChatIDs([][]byte{chatIDBytes}, maxAdvertiseMessages)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
datasyncMessage := &datasyncproto.Payload{}
|
|
if len(availableMessagesMap) == 0 {
|
|
continue
|
|
}
|
|
for _, message := range availableMessagesMap {
|
|
datasyncMessage.GroupOffers = append(datasyncMessage.GroupOffers, &datasyncproto.Offer{GroupId: chatIDBytes, MessageIds: message})
|
|
}
|
|
payload, err := proto.Marshal(datasyncMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
publicKey, err := chat.PublicKey()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rawMessage := common.RawMessage{
|
|
Payload: payload,
|
|
Ephemeral: true,
|
|
SkipApplicationWrap: true,
|
|
}
|
|
_, err = m.sender.SendPrivate(context.Background(), publicKey, &rawMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) OnDatasyncOffer(response *common.HandleMessageResponse) error {
|
|
sender := response.DatasyncSender
|
|
offers := response.DatasyncOffers
|
|
|
|
if len(offers) == 0 {
|
|
return nil
|
|
}
|
|
|
|
if common.PubkeyToHex(sender) == m.myHexIdentity() {
|
|
return nil
|
|
}
|
|
|
|
var offeredMessages []peersyncing.SyncMessage
|
|
|
|
for _, o := range offers {
|
|
offeredMessages = append(offeredMessages, peersyncing.SyncMessage{ChatID: o.GroupID, ID: o.MessageID})
|
|
}
|
|
|
|
messagesToFetch, err := m.peersyncing.OnOffer(offeredMessages)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(messagesToFetch) == 0 {
|
|
return nil
|
|
}
|
|
|
|
datasyncMessage := &datasyncproto.Payload{}
|
|
for _, msg := range messagesToFetch {
|
|
idString := types.Bytes2Hex(msg.ID)
|
|
lastOffered := m.peersyncingOffers[idString]
|
|
timeNow := m.GetCurrentTimeInMillis() / 1000
|
|
if lastOffered+30 < timeNow {
|
|
m.peersyncingOffers[idString] = timeNow
|
|
datasyncMessage.Requests = append(datasyncMessage.Requests, msg.ID)
|
|
}
|
|
}
|
|
payload, err := proto.Marshal(datasyncMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rawMessage := common.RawMessage{
|
|
LocalChatID: common.PubkeyToHex(sender),
|
|
Payload: payload,
|
|
Ephemeral: true,
|
|
SkipApplicationWrap: true,
|
|
}
|
|
_, err = m.sender.SendPrivate(context.Background(), sender, &rawMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check if any of the things need to be added
|
|
// Reply if anything needs adding
|
|
// Ack any message that is out
|
|
return nil
|
|
}
|
|
|
|
// canSyncMessageWith checks the permission of a message
|
|
func (m *Messenger) canSyncMessageWith(message peersyncing.SyncMessage, peer *ecdsa.PublicKey) (bool, error) {
|
|
switch message.Type {
|
|
case peersyncing.SyncMessageCommunityType:
|
|
chat, ok := m.allChats.Load(string(message.ChatID))
|
|
if !ok {
|
|
return false, nil
|
|
}
|
|
community, err := m.communitiesManager.GetByIDString(chat.CommunityID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return m.canSyncCommunityMessageWith(chat, community, peer)
|
|
case peersyncing.SyncMessageOneToOneType:
|
|
chat, ok := m.allChats.Load(string(message.ChatID))
|
|
if !ok {
|
|
return false, nil
|
|
}
|
|
return m.canSyncOneToOneMessageWith(chat, peer)
|
|
default:
|
|
return false, nil
|
|
}
|
|
}
|
|
|
|
// NOTE: This is not stricly correct. It's possible that you sync a message that has been
|
|
// posted after the banning of a user from a community, but before we realized that.
|
|
// As an approximation it should be ok, but worth thinking about how to address this.
|
|
func (m *Messenger) canSyncCommunityMessageWith(chat *Chat, community *communities.Community, peer *ecdsa.PublicKey) (bool, error) {
|
|
return community.IsMemberInChat(peer, chat.CommunityChatID()), nil
|
|
}
|
|
|
|
func (m *Messenger) canSyncOneToOneMessageWith(chat *Chat, peer *ecdsa.PublicKey) (bool, error) {
|
|
return chat.HasMember(common.PubkeyToHex(peer)), nil
|
|
}
|
|
|
|
func (m *Messenger) OnDatasyncRequests(requester *ecdsa.PublicKey, messageIDs [][]byte) error {
|
|
if len(messageIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
messages, err := m.peersyncing.MessagesByIDs(messageIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, msg := range messages {
|
|
canSync, err := m.canSyncMessageWith(msg, requester)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !canSync {
|
|
continue
|
|
}
|
|
idString := common.PubkeyToHex(requester) + types.Bytes2Hex(msg.ID)
|
|
lastRequested := m.peersyncingRequests[idString]
|
|
timeNow := m.GetCurrentTimeInMillis() / 1000
|
|
if lastRequested+30 < timeNow {
|
|
m.peersyncingRequests[idString] = timeNow
|
|
|
|
// Check permissions
|
|
rawMessage := common.RawMessage{
|
|
LocalChatID: common.PubkeyToHex(requester),
|
|
Payload: msg.Payload,
|
|
Ephemeral: true,
|
|
SkipApplicationWrap: true,
|
|
}
|
|
_, err = m.sender.SendPrivate(context.Background(), requester, &rawMessage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
}
|
|
// no need of group id, since we can derive from message
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) OnDatasyncAcks(sender *ecdsa.PublicKey, acks [][]byte) {
|
|
// we should make sure the sender can acknowledge those messages
|
|
m.markDeliveredMessages(acks)
|
|
}
|
|
|
|
// sendDataSync sends a message scheduled by the data sync layer.
|
|
// Data Sync layer calls this method "dispatch" function.
|
|
func (m *Messenger) sendDataSync(receiver state.PeerID, payload *datasyncproto.Payload) error {
|
|
ctx := context.Background()
|
|
if !payload.IsValid() {
|
|
m.logger.Error("payload is invalid")
|
|
return errors.New("payload is invalid")
|
|
}
|
|
|
|
marshalledPayload, err := proto.Marshal(payload)
|
|
if err != nil {
|
|
m.logger.Error("failed to marshal payload")
|
|
return err
|
|
}
|
|
|
|
publicKey, err := datasyncpeer.IDToPublicKey(receiver)
|
|
if err != nil {
|
|
m.logger.Error("failed to convert id to public key", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
// Calculate the messageIDs
|
|
messageIDs := make([][]byte, 0, len(payload.Messages))
|
|
hexMessageIDs := make([]string, 0, len(payload.Messages))
|
|
for _, payload := range payload.Messages {
|
|
mid := v1protocol.MessageID(&m.identity.PublicKey, payload.Body)
|
|
messageIDs = append(messageIDs, mid)
|
|
hexMessageIDs = append(hexMessageIDs, mid.String())
|
|
}
|
|
|
|
messageSpec, err := m.encryptor.BuildEncryptedMessage(m.identity, publicKey, marshalledPayload)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to encrypt message")
|
|
}
|
|
|
|
// The shared secret needs to be handle before we send a message
|
|
// otherwise the topic might not be set up before we receive a message
|
|
err = m.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hashes, newMessages, err := m.sender.SendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
|
|
if err != nil {
|
|
m.logger.Error("failed to send a datasync message", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
m.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.Strings("hashes", types.EncodeHexes(hashes)))
|
|
m.transport.TrackMany(messageIDs, hashes, newMessages)
|
|
|
|
return nil
|
|
}
|