fix(store): generate signals per storenode request with custom requestIDs

This commit is contained in:
Richard Ramos 2022-11-24 17:09:17 -04:00 committed by RichΛrd
parent ee6bda5bcc
commit eaced1c1e9
6 changed files with 43 additions and 67 deletions

View File

@ -185,7 +185,6 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(peerID []byte, r types.Messages
return nil, err
}
options = []store.HistoryRequestOption{
store.WithPeer(peer),
store.WithPaging(false, uint64(r.Limit)),
}
@ -203,7 +202,7 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(peerID []byte, r types.Messages
topics = append(topics, wakucommon.BytesToTopic(topic))
}
pbCursor, err := w.waku.Query(topics, uint64(r.From), uint64(r.To), options)
pbCursor, err := w.waku.Query(peer, topics, uint64(r.From), uint64(r.To), options)
if err != nil {
return nil, err
}

View File

@ -32,10 +32,9 @@ type MessengerSignalsHandler interface {
MessageDelivered(chatID string, messageID string)
CommunityInfoFound(community *communities.Community)
MessengerResponse(response *MessengerResponse)
HistoryRequestStarted(requestID string, numBatches int)
HistoryRequestBatchProcessed(requestID string, batchIndex int, batchNum int)
HistoryRequestCompleted(requestID string)
HistoryRequestFailed(requestID string, err error)
HistoryRequestStarted(numBatches int)
HistoryRequestCompleted()
BackupPerformed(uint64)
HistoryArchivesProtocolEnabled()
HistoryArchivesProtocolDisabled()

View File

@ -6,7 +6,6 @@ import (
"sort"
"time"
"github.com/pborman/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -440,12 +439,9 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
syncedTopics = append(syncedTopics, topicData)
}
requestID := uuid.NewRandom().String()
m.logger.Debug("syncing topics", zap.Any("batches", batches), zap.Any("requestId", requestID))
m.logger.Debug("syncing topics", zap.Any("batches", batches))
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, len(batches))
m.config.messengerSignalsHandler.HistoryRequestStarted(len(batches))
}
batchKeys := make([]int, 0, len(batches))
@ -460,21 +456,14 @@ func (m *Messenger) syncFiltersFrom(filters []*transport.Filter, lastRequest uin
i++
err := m.processMailserverBatch(batch)
if err != nil {
m.logger.Error("error syncing topics", zap.Any("requestId", requestID), zap.Error(err))
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestFailed(requestID, err)
}
m.logger.Error("error syncing topics", zap.Error(err))
return nil, err
}
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestBatchProcessed(requestID, i, len(batches))
}
}
m.logger.Debug("topics synced")
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestCompleted(requestID)
m.config.messengerSignalsHandler.HistoryRequestCompleted()
}
err = m.mailserversDatabase.AddTopics(syncedTopics)
@ -675,27 +664,18 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
From: chat.SyncedFrom - defaultSyncPeriod,
Topics: topics,
}
requestID := uuid.NewRandom().String()
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, 1)
m.config.messengerSignalsHandler.HistoryRequestStarted(1)
}
err = m.processMailserverBatch(batch)
if err != nil {
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestFailed(requestID, err)
}
return nil, err
}
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestBatchProcessed(requestID, 1, 1)
m.config.messengerSignalsHandler.HistoryRequestCompleted(requestID)
m.config.messengerSignalsHandler.HistoryRequestCompleted()
}
if chat.SyncedFrom == 0 || chat.SyncedFrom > batch.From {
chat.SyncedFrom = batch.From
}
@ -752,23 +732,17 @@ func (m *Messenger) FillGaps(chatID string, messageIDs []string) error {
Topics: topics,
}
requestID := uuid.NewRandom().String()
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestStarted(requestID, 1)
m.config.messengerSignalsHandler.HistoryRequestStarted(1)
}
err = m.processMailserverBatch(batch)
if err != nil {
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestFailed(requestID, err)
}
return err
}
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestBatchProcessed(requestID, 1, 1)
m.config.messengerSignalsHandler.HistoryRequestCompleted(requestID)
m.config.messengerSignalsHandler.HistoryRequestCompleted()
}
return m.persistence.DeleteMessages(messageIDs)

View File

@ -72,20 +72,12 @@ func (m *MessengerSignalsHandler) MessengerResponse(response *protocol.Messenger
PublisherSignalHandler{}.NewMessages(response)
}
func (m *MessengerSignalsHandler) HistoryRequestStarted(requestID string, numBatches int) {
signal.SendHistoricMessagesRequestStarted(requestID, numBatches)
func (m *MessengerSignalsHandler) HistoryRequestStarted(numBatches int) {
signal.SendHistoricMessagesRequestStarted(numBatches)
}
func (m *MessengerSignalsHandler) HistoryRequestBatchProcessed(requestID string, batchIndex int, numBatches int) {
signal.SendHistoricMessagesRequestBatchProcessed(requestID, batchIndex, numBatches)
}
func (m *MessengerSignalsHandler) HistoryRequestFailed(requestID string, err error) {
signal.SendHistoricMessagesRequestFailed(requestID, err)
}
func (m *MessengerSignalsHandler) HistoryRequestCompleted(requestID string) {
signal.SendHistoricMessagesRequestCompleted(requestID)
func (m *MessengerSignalsHandler) HistoryRequestCompleted() {
signal.SendHistoricMessagesRequestCompleted()
}
func (m *MessengerSignalsHandler) HistoryArchivesProtocolEnabled() {

View File

@ -4,6 +4,8 @@ import (
"encoding/hex"
"encoding/json"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/status-im/status-go/eth-node/types"
)
@ -34,18 +36,18 @@ const (
// EventNewMessages is triggered when we receive new messages
EventNewMessages = "messages.new"
// EventHistoryRequestStarted is triggered before processing a mailserver batch
// EventHistoryRequestStarted is triggered before processing a store request
EventHistoryRequestStarted = "history.request.started"
// EventHistoryBatchProcessed is triggered after processing a mailserver batch
EventHistoryBatchProcessed = "history.request.batch.processed"
// EventHistoryRequestCompleted is triggered after processing all mailserver batches
// EventHistoryRequestCompleted is triggered after processing all storenode requests
EventHistoryRequestCompleted = "history.request.completed"
// EventHistoryRequestFailed is triggered when requesting history messages fails
EventHistoryRequestFailed = "history.request.failed"
// EventHistoryRequestFailed is triggered when requesting history messages succeeds
EventHistoryRequestSuccess = "history.request.success"
// EventBackupPerformed is triggered when a backup has been performed
EventBackupPerformed = "backup.performed"
@ -79,6 +81,7 @@ type MailServerResponseSignal struct {
type HistoryMessagesSignal struct {
RequestID string `json:"requestId"`
PeerID string `json:"peerId"`
BatchIndex int `json:"batchIndex"`
NumBatches int `json:"numBatches,omitempty"`
ErrorMsg string `json:"errorMessage,omitempty"`
@ -147,20 +150,20 @@ func SendEnvelopeExpired(identifiers [][]byte, err error) {
send(EventEnvelopeExpired, EnvelopeSignal{IDs: hexIdentifiers, Message: message})
}
func SendHistoricMessagesRequestStarted(requestID string, numBatches int) {
send(EventHistoryRequestStarted, HistoryMessagesSignal{RequestID: requestID, NumBatches: numBatches})
func SendHistoricMessagesRequestStarted(numBatches int) {
send(EventHistoryRequestStarted, HistoryMessagesSignal{NumBatches: numBatches})
}
func SendHistoricMessagesRequestBatchProcessed(requestID string, batchIndex int, numBatches int) {
send(EventHistoryBatchProcessed, HistoryMessagesSignal{RequestID: requestID, BatchIndex: batchIndex, NumBatches: numBatches})
func SendHistoricMessagesRequestFailed(requestID []byte, peerID peer.ID, err error) {
send(EventHistoryRequestFailed, HistoryMessagesSignal{RequestID: hex.EncodeToString(requestID), PeerID: peerID.String(), ErrorMsg: err.Error()})
}
func SendHistoricMessagesRequestFailed(requestID string, err error) {
send(EventHistoryRequestFailed, HistoryMessagesSignal{RequestID: requestID, ErrorMsg: err.Error()})
func SendHistoricMessagesRequestSuccess(requestID []byte, peerID peer.ID) {
send(EventHistoryRequestSuccess, HistoryMessagesSignal{RequestID: hex.EncodeToString(requestID), PeerID: peerID.String()})
}
func SendHistoricMessagesRequestCompleted(requestID string) {
send(EventHistoryRequestCompleted, HistoryMessagesSignal{RequestID: requestID})
func SendHistoricMessagesRequestCompleted() {
send(EventHistoryRequestCompleted, HistoryMessagesSignal{})
}
func SendUpdateAvailable(available bool, latestVersion string, url string) {

View File

@ -931,12 +931,17 @@ func (w *Waku) Send(msg *pb.WakuMessage) ([]byte, error) {
return hash, nil
}
func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *pb.Index, err error) {
func (w *Waku) Query(peerID peer.ID, topics []common.TopicType, from uint64, to uint64, opts []store.HistoryRequestOption) (cursor *pb.Index, err error) {
strTopics := make([]string, len(topics))
for i, t := range topics {
strTopics[i] = t.ContentTopic()
}
requestID := protocol.GenerateRequestId()
opts = append(opts, store.WithRequestId(requestID))
opts = append(opts, store.WithPeer(peerID))
query := store.Query{
StartTime: int64(from) * int64(time.Second),
EndTime: int64(to) * int64(time.Second),
@ -949,9 +954,13 @@ func (w *Waku) Query(topics []common.TopicType, from uint64, to uint64, opts []s
result, err := w.node.Store().Query(ctx, query, opts...)
if err != nil {
return
w.logger.Error("error querying storenode", zap.String("peerID", peerID.String()), zap.Error(err))
signal.SendHistoricMessagesRequestFailed(requestID, peerID, err)
return nil, err
}
signal.SendHistoricMessagesRequestSuccess(requestID, peerID)
for _, msg := range result.Messages {
envelope := protocol.NewEnvelope(msg, msg.Timestamp, relay.DefaultWakuTopic)
w.logger.Info("received waku2 store message", zap.Any("envelopeHash", hexutil.Encode(envelope.Hash())))