feat: add batch information and request ID in history requests (#2397)

This commit is contained in:
Richard Ramos 2021-11-09 17:16:19 -04:00 committed by GitHub
parent d35e0a339d
commit 7440c184c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 22 deletions

View File

@ -26,9 +26,10 @@ type MessengerSignalsHandler interface {
MessageDelivered(chatID string, messageID string) MessageDelivered(chatID string, messageID string)
CommunityInfoFound(community *communities.Community) CommunityInfoFound(community *communities.Community)
MessengerResponse(response *MessengerResponse) MessengerResponse(response *MessengerResponse)
HistoryRequestStarted() HistoryRequestStarted(requestID string, numBatches int)
HistoryRequestCompleted() HistoryRequestBatchProcessed(requestID string, batchIndex int, batchNum int)
HistoryRequestFailed(err error) HistoryRequestCompleted(requestID string)
HistoryRequestFailed(requestID string, err error)
} }
type config struct { type config struct {

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/pborman/uuid"
"github.com/pkg/errors" "github.com/pkg/errors"
"go.uber.org/zap" "go.uber.org/zap"
@ -261,26 +262,34 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
syncedTopics = append(syncedTopics, topicData) syncedTopics = append(syncedTopics, topicData)
} }
m.logger.Debug("syncing topics") requestID := uuid.NewRandom().String()
m.logger.Debug("syncing topics", zap.Any("batches", batches), zap.Any("requestId", requestID))
if m.config.messengerSignalsHandler != nil { if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestStarted() m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, len(batches))
} }
i := 0
for _, batch := range batches { for _, batch := range batches {
i++
err := m.processMailserverBatch(batch) err := m.processMailserverBatch(batch)
if err != nil { if err != nil {
m.logger.Error("error syncing topics", zap.Error(err)) m.logger.Error("error syncing topics", zap.Any("requestId", requestID), zap.Error(err))
if m.config.messengerSignalsHandler != nil { if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestFailed(err) m.config.messengerSignalsHandler.HistoryRequestFailed(requestID, err)
} }
return nil, err return nil, err
} }
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestBatchProcessed(requestID, i, len(batches))
}
} }
m.logger.Debug("topics synced") m.logger.Debug("topics synced")
if m.config.messengerSignalsHandler != nil { if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestCompleted() m.config.messengerSignalsHandler.HistoryRequestCompleted(requestID)
} }
err = m.mailserversDatabase.AddTopics(syncedTopics) err = m.mailserversDatabase.AddTopics(syncedTopics)
@ -435,11 +444,25 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
Topics: topics, Topics: topics,
} }
requestID := uuid.NewRandom().String()
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, 1)
}
err = m.processMailserverBatch(batch) err = m.processMailserverBatch(batch)
if err != nil { if err != nil {
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestFailed(requestID, err)
}
return 0, err return 0, err
} }
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestBatchProcessed(requestID, 1, 1)
m.config.messengerSignalsHandler.HistoryRequestCompleted(requestID)
}
if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From { if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From {
chat.SyncedFrom = batch.From chat.SyncedFrom = batch.From
} }
@ -493,11 +516,25 @@ func (m *Messenger) FillGaps(chatID string, messageIDs []string) error {
Topics: topics, Topics: topics,
} }
requestID := uuid.NewRandom().String()
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, 1)
}
err = m.processMailserverBatch(batch) err = m.processMailserverBatch(batch)
if err != nil { if err != nil {
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestFailed(requestID, err)
}
return err return err
} }
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestBatchProcessed(requestID, 1, 1)
m.config.messengerSignalsHandler.HistoryRequestCompleted(requestID)
}
return m.persistence.DeleteMessages(messageIDs) return m.persistence.DeleteMessages(messageIDs)
} }

View File

@ -66,14 +66,18 @@ func (m *MessengerSignalsHandler) MessengerResponse(response *protocol.Messenger
PublisherSignalHandler{}.NewMessages(response) PublisherSignalHandler{}.NewMessages(response)
} }
func (m *MessengerSignalsHandler) HistoryRequestStarted() { func (m *MessengerSignalsHandler) HistoryRequestStarted(requestID string, numBatches int) {
signal.SendHistoricMessagesRequestStarted() signal.SendHistoricMessagesRequestStarted(requestID, numBatches)
} }
func (m *MessengerSignalsHandler) HistoryRequestFailed(err error) { func (m *MessengerSignalsHandler) HistoryRequestBatchProcessed(requestID string, batchIndex int, numBatches int) {
signal.SendHistoricMessagesRequestFailed(err) signal.SendHistoricMessagesRequestBatchProcessed(requestID, batchIndex, numBatches)
} }
func (m *MessengerSignalsHandler) HistoryRequestCompleted() { func (m *MessengerSignalsHandler) HistoryRequestFailed(requestID string, err error) {
signal.SendHistoricMessagesRequestCompleted() signal.SendHistoricMessagesRequestFailed(requestID, err)
}
func (m *MessengerSignalsHandler) HistoryRequestCompleted(requestID string) {
signal.SendHistoricMessagesRequestCompleted(requestID)
} }

View File

@ -38,7 +38,10 @@ const (
// EventHistoryRequestStarted is triggered before processing a mailserver batch // EventHistoryRequestStarted is triggered before processing a mailserver batch
EventHistoryRequestStarted = "history.request.started" EventHistoryRequestStarted = "history.request.started"
// EventHistoryRequestCompleted is triggered after processing a mailserver batch // EventHistoryBatchProcessed is triggered after processing a mailserver batch
EventHistoryBatchProcessed = "history.request.batch.processed"
// EventHistoryRequestCompleted is triggered after processing all mailserver batches
EventHistoryRequestCompleted = "history.request.completed" EventHistoryRequestCompleted = "history.request.completed"
// EventHistoryRequestFailed is triggered when requesting history messages fails // EventHistoryRequestFailed is triggered when requesting history messages fails
@ -61,7 +64,10 @@ type MailServerResponseSignal struct {
} }
type HistoryMessagesSignal struct { type HistoryMessagesSignal struct {
ErrorMsg string `json:"errorMessage,omitempty"` RequestID string `json:"requestId"`
BatchIndex int `json:"batchIndex"`
NumBatches int `json:"numBatches,omitempty"`
ErrorMsg string `json:"errorMessage,omitempty"`
} }
// DecryptMessageFailedSignal holds the sender of the message that could not be decrypted // DecryptMessageFailedSignal holds the sender of the message that could not be decrypted
@ -116,16 +122,20 @@ func SendEnvelopeExpired(identifiers [][]byte, err error) {
send(EventEnvelopeExpired, EnvelopeSignal{IDs: hexIdentifiers, Message: message}) send(EventEnvelopeExpired, EnvelopeSignal{IDs: hexIdentifiers, Message: message})
} }
func SendHistoricMessagesRequestStarted() { func SendHistoricMessagesRequestStarted(requestID string, numBatches int) {
send(EventHistoryRequestStarted, HistoryMessagesSignal{}) send(EventHistoryRequestStarted, HistoryMessagesSignal{RequestID: requestID, NumBatches: numBatches})
} }
func SendHistoricMessagesRequestFailed(err error) { func SendHistoricMessagesRequestBatchProcessed(requestID string, batchIndex int, numBatches int) {
send(EventHistoryRequestFailed, HistoryMessagesSignal{ErrorMsg: err.Error()}) send(EventHistoryBatchProcessed, HistoryMessagesSignal{RequestID: requestID, BatchIndex: batchIndex, NumBatches: numBatches})
} }
func SendHistoricMessagesRequestCompleted() { func SendHistoricMessagesRequestFailed(requestID string, err error) {
send(EventHistoryRequestCompleted, HistoryMessagesSignal{}) send(EventHistoryRequestFailed, HistoryMessagesSignal{RequestID: requestID, ErrorMsg: err.Error()})
}
func SendHistoricMessagesRequestCompleted(requestID string) {
send(EventHistoryRequestCompleted, HistoryMessagesSignal{RequestID: requestID})
} }
// SendMailServerRequestCompleted triggered when mail server response has been received // SendMailServerRequestCompleted triggered when mail server response has been received