442 lines
11 KiB
Go
442 lines
11 KiB
Go
package protocol
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/protocol/common"
|
|
"github.com/status-im/status-go/protocol/protobuf"
|
|
"github.com/status-im/status-go/protocol/transport"
|
|
"github.com/status-im/status-go/services/mailservers"
|
|
)
|
|
|
|
func (m *Messenger) connectedToMailserver() bool {
|
|
return m.online() && m.mailserver != nil
|
|
}
|
|
|
|
func (m *Messenger) scheduleSyncChat(chat *Chat) {
|
|
useMailservers, err := m.settings.GetUseMailservers()
|
|
if err != nil {
|
|
m.logger.Error("failed to get use mailservers", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if !useMailservers || !m.connectedToMailserver() {
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
response, err := m.syncChat(chat.ID)
|
|
|
|
if err != nil {
|
|
m.logger.Error("failed to sync chat", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if m.config.messengerSignalsHandler != nil {
|
|
m.config.messengerSignalsHandler.MessengerResponse(response)
|
|
}
|
|
|
|
}()
|
|
}
|
|
|
|
func (m *Messenger) scheduleSyncFilter(filter *transport.Filter) {
|
|
m.scheduleSyncFilters([]*transport.Filter{filter})
|
|
|
|
}
|
|
func (m *Messenger) scheduleSyncFilters(filters []*transport.Filter) {
|
|
useMailservers, err := m.settings.GetUseMailservers()
|
|
if err != nil {
|
|
m.logger.Error("failed to get use mailservers", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if !useMailservers || !m.connectedToMailserver() {
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
response, err := m.syncFilters(filters)
|
|
|
|
if err != nil {
|
|
m.logger.Error("failed to sync filter", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
if m.config.messengerSignalsHandler != nil {
|
|
m.config.messengerSignalsHandler.MessengerResponse(response)
|
|
}
|
|
|
|
}()
|
|
}
|
|
|
|
func (m *Messenger) calculateMailserverTo() uint32 {
|
|
return uint32(m.getTimesource().GetCurrentTime() / 1000)
|
|
}
|
|
|
|
func (m *Messenger) filtersForChat(chatID string) ([]*transport.Filter, error) {
|
|
chat, ok := m.allChats.Load(chatID)
|
|
if !ok {
|
|
return nil, ErrChatNotFound
|
|
}
|
|
var filters []*transport.Filter
|
|
|
|
if chat.OneToOne() {
|
|
// We sync our own topic and any eventual negotiated
|
|
publicKeys := []string{common.PubkeyToHex(&m.identity.PublicKey), chatID}
|
|
|
|
filters = m.transport.FiltersByIdentities(publicKeys)
|
|
|
|
} else if chat.PrivateGroupChat() {
|
|
var publicKeys []string
|
|
for _, m := range chat.Members {
|
|
publicKeys = append(publicKeys, m.ID)
|
|
}
|
|
|
|
filters = m.transport.FiltersByIdentities(publicKeys)
|
|
|
|
} else {
|
|
filter := m.transport.FilterByChatID(chatID)
|
|
if filter == nil {
|
|
return nil, errors.New("no filter registered for given chat")
|
|
}
|
|
filters = []*transport.Filter{filter}
|
|
}
|
|
|
|
return filters, nil
|
|
}
|
|
|
|
func (m *Messenger) topicsForChat(chatID string) ([]types.TopicType, error) {
|
|
filters, err := m.filtersForChat(chatID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var topics []types.TopicType
|
|
|
|
for _, filter := range filters {
|
|
topics = append(topics, filter.Topic)
|
|
}
|
|
|
|
return topics, nil
|
|
}
|
|
|
|
// Assume is a public chat for now
|
|
func (m *Messenger) syncChat(chatID string) (*MessengerResponse, error) {
|
|
filters, err := m.filtersForChat(chatID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return m.syncFilters(filters)
|
|
}
|
|
|
|
func (m *Messenger) defaultSyncInterval() uint32 {
|
|
return 60 * 60 * 24
|
|
}
|
|
|
|
func (m *Messenger) defaultSyncPeriod() uint32 {
|
|
return uint32(m.getTimesource().GetCurrentTime()/1000) - m.defaultSyncInterval()
|
|
}
|
|
|
|
// capSyncPeriod caps the sync period to the default
|
|
func (m *Messenger) capSyncPeriod(period uint32) uint32 {
|
|
d := uint32(m.defaultSyncPeriod())
|
|
if d > period {
|
|
return d
|
|
}
|
|
return period
|
|
}
|
|
|
|
// RequestAllHistoricMessages requests all the historic messages for any topic
|
|
func (m *Messenger) RequestAllHistoricMessages() (*MessengerResponse, error) {
|
|
useMailservers, err := m.settings.GetUseMailservers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !useMailservers || !m.connectedToMailserver() {
|
|
return nil, nil
|
|
}
|
|
|
|
return m.syncFilters(m.transport.Filters())
|
|
}
|
|
|
|
func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse, error) {
|
|
response := &MessengerResponse{}
|
|
topicInfo, err := m.mailserversDatabase.Topics()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
topicsData := make(map[string]mailservers.MailserverTopic)
|
|
for _, topic := range topicInfo {
|
|
topicsData[topic.Topic] = topic
|
|
}
|
|
|
|
batches := make(map[int]MailserverBatch)
|
|
|
|
var syncedChatIDs []string
|
|
|
|
to := m.calculateMailserverTo()
|
|
var syncedTopics []mailservers.MailserverTopic
|
|
for _, filter := range filters {
|
|
if !filter.Listen || filter.Ephemeral {
|
|
continue
|
|
}
|
|
|
|
var chatID string
|
|
// If the filter has an identity, we use it as a chatID, otherwise is a public chat/community chat filter
|
|
if len(filter.Identity) != 0 {
|
|
chatID = filter.Identity
|
|
} else {
|
|
chatID = filter.ChatID
|
|
}
|
|
|
|
syncedChatIDs = append(syncedChatIDs, chatID)
|
|
|
|
topicData, ok := topicsData[filter.Topic.String()]
|
|
if !ok {
|
|
topicData = mailservers.MailserverTopic{
|
|
Topic: filter.Topic.String(),
|
|
LastRequest: int(m.defaultSyncPeriod()),
|
|
}
|
|
}
|
|
batch, ok := batches[topicData.LastRequest]
|
|
if !ok {
|
|
from := m.capSyncPeriod(uint32(topicData.LastRequest))
|
|
batch = MailserverBatch{From: from, To: to}
|
|
}
|
|
|
|
batch.ChatIDs = append(batch.ChatIDs, chatID)
|
|
batch.Topics = append(batch.Topics, filter.Topic)
|
|
batches[topicData.LastRequest] = batch
|
|
// Set last request to the new `to`
|
|
topicData.LastRequest = int(to)
|
|
syncedTopics = append(syncedTopics, topicData)
|
|
}
|
|
|
|
m.logger.Info("syncing topics", zap.Any("batches", batches))
|
|
for _, batch := range batches {
|
|
err := m.processMailserverBatch(batch)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
err = m.mailserversDatabase.AddTopics(syncedTopics)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var messagesToBeSaved []*common.Message
|
|
for _, batch := range batches {
|
|
for _, id := range batch.ChatIDs {
|
|
chat, ok := m.allChats.Load(id)
|
|
if !ok || !chat.Active || chat.Timeline() || chat.ProfileUpdates() {
|
|
continue
|
|
}
|
|
gap, err := m.calculateGapForChat(chat, batch.From)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From {
|
|
chat.SyncedFrom = batch.From
|
|
}
|
|
|
|
chat.SyncedTo = to
|
|
|
|
err = m.persistence.SetSyncTimestamps(chat.SyncedFrom, chat.SyncedTo, chat.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response.AddChat(chat)
|
|
if gap != nil {
|
|
response.AddMessage(gap)
|
|
messagesToBeSaved = append(messagesToBeSaved, gap)
|
|
}
|
|
// Calculate gaps
|
|
// If last-synced is 0, no gaps
|
|
// If last-synced < from, create gap from
|
|
}
|
|
}
|
|
|
|
if len(messagesToBeSaved) > 0 {
|
|
err := m.persistence.SaveMessages(messagesToBeSaved)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return response, nil
|
|
}
|
|
|
|
func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Message, error) {
|
|
// Chat was never synced, no gap necessary
|
|
if chat.SyncedTo == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// If we filled the gap, nothing to do
|
|
if chat.SyncedTo >= from {
|
|
return nil, nil
|
|
}
|
|
|
|
timestamp := m.getTimesource().GetCurrentTime()
|
|
|
|
message := &common.Message{
|
|
ChatMessage: protobuf.ChatMessage{
|
|
ChatId: chat.ID,
|
|
Text: "Gap message",
|
|
MessageType: protobuf.MessageType_SYSTEM_MESSAGE_GAP,
|
|
ContentType: protobuf.ChatMessage_SYSTEM_MESSAGE_GAP,
|
|
Clock: uint64(from) * 1000,
|
|
Timestamp: timestamp,
|
|
},
|
|
GapParameters: &common.GapParameters{
|
|
From: chat.SyncedTo,
|
|
To: from,
|
|
},
|
|
From: common.PubkeyToHex(&m.identity.PublicKey),
|
|
WhisperTimestamp: timestamp,
|
|
LocalChatID: chat.ID,
|
|
Seen: true,
|
|
ID: types.EncodeHex(crypto.Keccak256([]byte(fmt.Sprintf("%s-%d-%d", chat.ID, chat.SyncedTo, from)))),
|
|
}
|
|
|
|
return message, m.persistence.SaveMessages([]*common.Message{message})
|
|
}
|
|
|
|
func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
|
|
m.logger.Info("syncing topic", zap.Any("topic", batch.Topics), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To)))
|
|
cursor, err := m.transport.SendMessagesRequestForTopics(context.Background(), m.mailserver, batch.From, batch.To, nil, batch.Topics, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for len(cursor) != 0 {
|
|
m.logger.Info("retrieved cursor", zap.Any("cursor", cursor))
|
|
|
|
cursor, err = m.transport.SendMessagesRequest(context.Background(), m.mailserver, batch.From, batch.To, cursor, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
m.logger.Info("synced topic", zap.Any("topic", batch.Topics), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To)))
|
|
return nil
|
|
}
|
|
|
|
type MailserverBatch struct {
|
|
From uint32
|
|
To uint32
|
|
Cursor string
|
|
Topics []types.TopicType
|
|
ChatIDs []string
|
|
}
|
|
|
|
func (m *Messenger) RequestHistoricMessagesForFilter(
|
|
ctx context.Context,
|
|
from, to uint32,
|
|
cursor []byte,
|
|
filter *transport.Filter,
|
|
waitForResponse bool,
|
|
) ([]byte, error) {
|
|
if m.mailserver == nil {
|
|
return nil, errors.New("no mailserver selected")
|
|
}
|
|
|
|
return m.transport.SendMessagesRequestForFilter(ctx, m.mailserver, from, to, cursor, filter, waitForResponse)
|
|
}
|
|
|
|
func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
|
|
topics, err := m.topicsForChat(chatID)
|
|
if err != nil {
|
|
return 0, nil
|
|
}
|
|
chat, ok := m.allChats.Load(chatID)
|
|
if !ok {
|
|
return 0, ErrChatNotFound
|
|
}
|
|
|
|
batch := MailserverBatch{
|
|
ChatIDs: []string{chatID},
|
|
To: chat.SyncedFrom,
|
|
From: chat.SyncedFrom - m.defaultSyncInterval(),
|
|
Topics: topics,
|
|
}
|
|
|
|
err = m.processMailserverBatch(batch)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From {
|
|
chat.SyncedFrom = batch.From
|
|
}
|
|
|
|
err = m.persistence.SetSyncTimestamps(batch.From, chat.SyncedTo, chat.ID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return batch.From, nil
|
|
}
|
|
|
|
func (m *Messenger) FillGaps(chatID string, messageIDs []string) error {
|
|
messages, err := m.persistence.MessagesByIDs(messageIDs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, ok := m.allChats.Load(chatID)
|
|
if !ok {
|
|
return errors.New("chat not existing")
|
|
}
|
|
|
|
topics, err := m.topicsForChat(chatID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var lowestFrom, highestTo uint32
|
|
|
|
for _, message := range messages {
|
|
if message.GapParameters == nil {
|
|
return errors.New("can't sync non-gap message")
|
|
}
|
|
|
|
if lowestFrom == 0 || lowestFrom > message.GapParameters.From {
|
|
lowestFrom = message.GapParameters.From
|
|
}
|
|
|
|
if highestTo < message.GapParameters.To {
|
|
highestTo = message.GapParameters.To
|
|
}
|
|
}
|
|
|
|
batch := MailserverBatch{
|
|
ChatIDs: []string{chatID},
|
|
To: highestTo,
|
|
From: lowestFrom,
|
|
Topics: topics,
|
|
}
|
|
|
|
err = m.processMailserverBatch(batch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return m.persistence.DeleteMessages(messageIDs)
|
|
}
|
|
|
|
func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
|
|
return m.transport.LoadFilters(filters)
|
|
}
|
|
|
|
func (m *Messenger) RemoveFilters(filters []*transport.Filter) error {
|
|
return m.transport.RemoveFilters(filters)
|
|
}
|