feat: add signals that are emitted before/after history messages are requested (#2378)

* fix: sqlite variable limit

* feat: add signal for history messages
This commit is contained in:
Richard Ramos 2021-09-28 14:41:35 -04:00 committed by GitHub
parent 5c55ab5264
commit 3b05d379bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 54 additions and 0 deletions

View File

@ -22,6 +22,9 @@ type MessengerSignalsHandler interface {
MessageDelivered(chatID string, messageID string) MessageDelivered(chatID string, messageID string)
CommunityInfoFound(community *communities.Community) CommunityInfoFound(community *communities.Community)
MessengerResponse(response *MessengerResponse) MessengerResponse(response *MessengerResponse)
HistoryRequestStarted()
HistoryRequestCompleted()
HistoryRequestFailed(err error)
} }
type config struct { type config struct {

View File

@ -262,13 +262,27 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
} }
m.logger.Info("syncing topics", zap.Any("batches", batches)) m.logger.Info("syncing topics", zap.Any("batches", batches))
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestStarted()
}
for _, batch := range batches { for _, batch := range batches {
err := m.processMailserverBatch(batch) err := m.processMailserverBatch(batch)
if err != nil { if err != nil {
m.logger.Info("error syncing topics", zap.Any("error", err))
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestFailed(err)
}
return nil, err return nil, err
} }
} }
m.logger.Info("topics synced")
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestCompleted()
}
err = m.mailserversDatabase.AddTopics(syncedTopics) err = m.mailserversDatabase.AddTopics(syncedTopics)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -65,3 +65,15 @@ func (m MessengerSignalsHandler) CommunityInfoFound(community *communities.Commu
func (m *MessengerSignalsHandler) MessengerResponse(response *protocol.MessengerResponse) { func (m *MessengerSignalsHandler) MessengerResponse(response *protocol.MessengerResponse) {
PublisherSignalHandler{}.NewMessages(response) PublisherSignalHandler{}.NewMessages(response)
} }
func (m *MessengerSignalsHandler) HistoryRequestStarted() {
signal.SendHistoricMessagesRequestStarted()
}
func (m *MessengerSignalsHandler) HistoryRequestFailed(err error) {
signal.SendHistoricMessagesRequestFailed(err)
}
func (m *MessengerSignalsHandler) HistoryRequestCompleted() {
signal.SendHistoricMessagesRequestCompleted()
}

View File

@ -34,6 +34,15 @@ const (
// EventNewMessages is triggered when we receive new messages // EventNewMessages is triggered when we receive new messages
EventNewMessages = "messages.new" EventNewMessages = "messages.new"
// EventHistoryRequestStarted is triggered before processing a mailserver batch
EventHistoryRequestStarted = "history.request.started"
// EventHistoryRequestCompleted is triggered after processing a mailserver batch
EventHistoryRequestCompleted = "history.request.completed"
// EventHistoryRequestFailed is triggered when requesting history messages fails
EventHistoryRequestFailed = "history.request.failed"
) )
// EnvelopeSignal includes hash of the envelope. // EnvelopeSignal includes hash of the envelope.
@ -51,6 +60,10 @@ type MailServerResponseSignal struct {
ErrorMsg string `json:"errorMessage"` ErrorMsg string `json:"errorMessage"`
} }
type HistoryMessagesSignal struct {
ErrorMsg string `json:"errorMessage,omitempty"`
}
// DecryptMessageFailedSignal holds the sender of the message that could not be decrypted // DecryptMessageFailedSignal holds the sender of the message that could not be decrypted
type DecryptMessageFailedSignal struct { type DecryptMessageFailedSignal struct {
Sender string `json:"sender"` Sender string `json:"sender"`
@ -103,6 +116,18 @@ func SendEnvelopeExpired(identifiers [][]byte, err error) {
send(EventEnvelopeExpired, EnvelopeSignal{IDs: hexIdentifiers, Message: message}) send(EventEnvelopeExpired, EnvelopeSignal{IDs: hexIdentifiers, Message: message})
} }
func SendHistoricMessagesRequestStarted() {
send(EventHistoryRequestStarted, HistoryMessagesSignal{})
}
func SendHistoricMessagesRequestFailed(err error) {
send(EventHistoryRequestFailed, HistoryMessagesSignal{ErrorMsg: err.Error()})
}
func SendHistoricMessagesRequestCompleted() {
send(EventHistoryRequestCompleted, HistoryMessagesSignal{})
}
// SendMailServerRequestCompleted triggered when mail server response has been received // SendMailServerRequestCompleted triggered when mail server response has been received
func SendMailServerRequestCompleted(requestID types.Hash, lastEnvelopeHash types.Hash, cursor []byte, err error) { func SendMailServerRequestCompleted(requestID types.Hash, lastEnvelopeHash types.Hash, cursor []byte, err error) {
errorMsg := "" errorMsg := ""