diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index 4d842dbb4..7c2dbfa06 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -26,9 +26,10 @@ type MessengerSignalsHandler interface { MessageDelivered(chatID string, messageID string) CommunityInfoFound(community *communities.Community) MessengerResponse(response *MessengerResponse) - HistoryRequestStarted() - HistoryRequestCompleted() - HistoryRequestFailed(err error) + HistoryRequestStarted(requestID string, numBatches int) + HistoryRequestBatchProcessed(requestID string, batchIndex int, batchNum int) + HistoryRequestCompleted(requestID string) + HistoryRequestFailed(requestID string, err error) } type config struct { diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 2144904f2..8b8485b99 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/pborman/uuid" "github.com/pkg/errors" "go.uber.org/zap" @@ -261,26 +262,34 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse syncedTopics = append(syncedTopics, topicData) } - m.logger.Debug("syncing topics") + requestID := uuid.NewRandom().String() + + m.logger.Debug("syncing topics", zap.Any("batches", batches), zap.Any("requestId", requestID)) if m.config.messengerSignalsHandler != nil { - m.config.messengerSignalsHandler.HistoryRequestStarted() + m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, len(batches)) } + i := 0 for _, batch := range batches { + i++ err := m.processMailserverBatch(batch) if err != nil { - m.logger.Error("error syncing topics", zap.Error(err)) + m.logger.Error("error syncing topics", zap.Any("requestId", requestID), zap.Error(err)) if m.config.messengerSignalsHandler != nil { - m.config.messengerSignalsHandler.HistoryRequestFailed(err) + m.config.messengerSignalsHandler.HistoryRequestFailed(requestID, err) } return nil, err } + + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.HistoryRequestBatchProcessed(requestID, i, len(batches)) + } } m.logger.Debug("topics synced") if m.config.messengerSignalsHandler != nil { - m.config.messengerSignalsHandler.HistoryRequestCompleted() + m.config.messengerSignalsHandler.HistoryRequestCompleted(requestID) } err = m.mailserversDatabase.AddTopics(syncedTopics) @@ -435,11 +444,25 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) { Topics: topics, } + requestID := uuid.NewRandom().String() + + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, 1) + } + err = m.processMailserverBatch(batch) if err != nil { + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.HistoryRequestFailed(requestID, err) + } return 0, err } + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.HistoryRequestBatchProcessed(requestID, 1, 1) + m.config.messengerSignalsHandler.HistoryRequestCompleted(requestID) + } + if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From { chat.SyncedFrom = batch.From } @@ -493,11 +516,25 @@ func (m *Messenger) FillGaps(chatID string, messageIDs []string) error { Topics: topics, } + requestID := uuid.NewRandom().String() + + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, 1) + } + err = m.processMailserverBatch(batch) if err != nil { + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.HistoryRequestFailed(requestID, err) + } return err } + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.HistoryRequestBatchProcessed(requestID, 1, 1) + m.config.messengerSignalsHandler.HistoryRequestCompleted(requestID) + } + return m.persistence.DeleteMessages(messageIDs) } diff --git a/services/ext/signal.go b/services/ext/signal.go index 944a1427d..850cd1b84 100644 --- a/services/ext/signal.go +++ b/services/ext/signal.go @@ -66,14 +66,18 @@ func (m *MessengerSignalsHandler) MessengerResponse(response *protocol.Messenger PublisherSignalHandler{}.NewMessages(response) } -func (m *MessengerSignalsHandler) HistoryRequestStarted() { - signal.SendHistoricMessagesRequestStarted() +func (m *MessengerSignalsHandler) HistoryRequestStarted(requestID string, numBatches int) { + signal.SendHistoricMessagesRequestStarted(requestID, numBatches) } -func (m *MessengerSignalsHandler) HistoryRequestFailed(err error) { - signal.SendHistoricMessagesRequestFailed(err) +func (m *MessengerSignalsHandler) HistoryRequestBatchProcessed(requestID string, batchIndex int, numBatches int) { + signal.SendHistoricMessagesRequestBatchProcessed(requestID, batchIndex, numBatches) } -func (m *MessengerSignalsHandler) HistoryRequestCompleted() { - signal.SendHistoricMessagesRequestCompleted() +func (m *MessengerSignalsHandler) HistoryRequestFailed(requestID string, err error) { + signal.SendHistoricMessagesRequestFailed(requestID, err) +} + +func (m *MessengerSignalsHandler) HistoryRequestCompleted(requestID string) { + signal.SendHistoricMessagesRequestCompleted(requestID) } diff --git a/signal/events_shhext.go b/signal/events_shhext.go index d99565384..89f1a0b82 100644 --- a/signal/events_shhext.go +++ b/signal/events_shhext.go @@ -38,7 +38,10 @@ const ( // EventHistoryRequestStarted is triggered before processing a mailserver batch EventHistoryRequestStarted = "history.request.started" - // EventHistoryRequestCompleted is triggered after processing a mailserver batch + // EventHistoryBatchProcessed is triggered after processing a mailserver batch + EventHistoryBatchProcessed = "history.request.batch.processed" + + // EventHistoryRequestCompleted is triggered after processing all mailserver batches EventHistoryRequestCompleted = "history.request.completed" // EventHistoryRequestFailed is triggered when requesting history messages fails @@ -61,7 +64,10 @@ type MailServerResponseSignal struct { } type HistoryMessagesSignal struct { - ErrorMsg string `json:"errorMessage,omitempty"` + RequestID string `json:"requestId"` + BatchIndex int `json:"batchIndex"` + NumBatches int `json:"numBatches,omitempty"` + ErrorMsg string `json:"errorMessage,omitempty"` } // DecryptMessageFailedSignal holds the sender of the message that could not be decrypted @@ -116,16 +122,20 @@ func SendEnvelopeExpired(identifiers [][]byte, err error) { send(EventEnvelopeExpired, EnvelopeSignal{IDs: hexIdentifiers, Message: message}) } -func SendHistoricMessagesRequestStarted() { - send(EventHistoryRequestStarted, HistoryMessagesSignal{}) +func SendHistoricMessagesRequestStarted(requestID string, numBatches int) { + send(EventHistoryRequestStarted, HistoryMessagesSignal{RequestID: requestID, NumBatches: numBatches}) } -func SendHistoricMessagesRequestFailed(err error) { - send(EventHistoryRequestFailed, HistoryMessagesSignal{ErrorMsg: err.Error()}) +func SendHistoricMessagesRequestBatchProcessed(requestID string, batchIndex int, numBatches int) { + send(EventHistoryBatchProcessed, HistoryMessagesSignal{RequestID: requestID, BatchIndex: batchIndex, NumBatches: numBatches}) } -func SendHistoricMessagesRequestCompleted() { - send(EventHistoryRequestCompleted, HistoryMessagesSignal{}) +func SendHistoricMessagesRequestFailed(requestID string, err error) { + send(EventHistoryRequestFailed, HistoryMessagesSignal{RequestID: requestID, ErrorMsg: err.Error()}) +} + +func SendHistoricMessagesRequestCompleted(requestID string) { + send(EventHistoryRequestCompleted, HistoryMessagesSignal{RequestID: requestID}) } // SendMailServerRequestCompleted triggered when mail server response has been received