chore: process message logs
This commit is contained in:
parent
0456dcef8b
commit
92762f4000
|
@ -3442,7 +3442,9 @@ func (m *Messenger) SyncVerificationRequest(ctx context.Context, vr *verificatio
|
|||
// RetrieveAll retrieves messages from all filters, processes them and returns a
|
||||
// MessengerResponse to the client
|
||||
func (m *Messenger) RetrieveAll() (*MessengerResponse, error) {
|
||||
m.logger.Debug("RetrieveAll start")
|
||||
chatWithMessages, err := m.transport.RetrieveRawAll()
|
||||
m.logger.Debug("RetrieveAll end", zap.Any("chatWithMessages", chatWithMessages))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -3451,6 +3453,7 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) {
|
|||
}
|
||||
|
||||
func (m *Messenger) StartRetrieveMessagesLoop(tick time.Duration, cancel <-chan struct{}) {
|
||||
m.logger.Debug("starting RetrieveMessagesLoop")
|
||||
m.shutdownWaitGroup.Add(1)
|
||||
go func() {
|
||||
defer m.shutdownWaitGroup.Done()
|
||||
|
@ -3468,6 +3471,7 @@ func (m *Messenger) StartRetrieveMessagesLoop(tick time.Duration, cancel <-chan
|
|||
}
|
||||
|
||||
func (m *Messenger) ProcessAllMessages() {
|
||||
m.logger.Debug("ProcessAllMessages")
|
||||
response, err := m.RetrieveAll()
|
||||
if err != nil {
|
||||
m.logger.Error("failed to retrieve raw messages", zap.Error(err))
|
||||
|
@ -3888,6 +3892,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
|
|||
messageState := m.buildMessageState()
|
||||
|
||||
logger := m.logger.With(zap.String("site", "RetrieveAll"))
|
||||
logger.Debug("handleRetrievedMessages",
|
||||
zap.Any("chatWithMessages", chatWithMessages),
|
||||
)
|
||||
|
||||
controlledCommunitiesChatIDs, err := m.communitiesManager.GetOwnedCommunitiesChatIDs()
|
||||
if err != nil {
|
||||
|
|
|
@ -2134,6 +2134,13 @@ func (m *Messenger) handleChatMessage(state *ReceivedMessageState, forceSeen boo
|
|||
WhisperTimestamp: state.CurrentMessageState.WhisperTimestamp,
|
||||
}
|
||||
|
||||
logger.Debug("received chat message",
|
||||
zap.String("messageID", receivedMessage.ID),
|
||||
zap.String("from", receivedMessage.From),
|
||||
zap.Any("contentType", receivedMessage.ContentType),
|
||||
zap.Bool("forceSeen", forceSeen),
|
||||
)
|
||||
|
||||
// is the message coming from us?
|
||||
isSyncMessage := common.IsPubKeyEqual(receivedMessage.SigPubKey, &m.identity.PublicKey)
|
||||
|
||||
|
|
|
@ -373,7 +373,11 @@ func (r *storeNodeRequest) finalize() {
|
|||
func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32) {
|
||||
logger := r.manager.logger.With(
|
||||
zap.Any("requestID", r.requestID),
|
||||
zap.Int("envelopesCount", envelopesCount))
|
||||
zap.Int("envelopesCount", envelopesCount),
|
||||
zap.String("contentTopic", r.contentTopic.String()),
|
||||
)
|
||||
|
||||
logger.Debug("shouldFetchNextPage")
|
||||
|
||||
r.result.stats.FetchedEnvelopesCount += envelopesCount
|
||||
r.result.stats.FetchedPagesCount++
|
||||
|
|
|
@ -232,8 +232,10 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
|
|||
logger.Warn("failed to fetch messages", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
// Don't pull from filters we don't listen to
|
||||
if !filter.Listen {
|
||||
logger.Debug("skip filter as not listened", zap.String("filterID", filter.FilterID))
|
||||
for _, msg := range msgs {
|
||||
t.waku.MarkP2PMessageAsProcessed(common.BytesToHash(msg.Hash))
|
||||
}
|
||||
|
@ -250,6 +252,11 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
|
|||
ids[i] = id
|
||||
}
|
||||
|
||||
logger.Debug("retrieved raw messages",
|
||||
zap.String("filterID", filter.FilterID),
|
||||
zap.Any("ids", ids),
|
||||
)
|
||||
|
||||
hits, err := t.cache.Hits(ids)
|
||||
if err != nil {
|
||||
logger.Error("failed to check messages exists", zap.Error(err))
|
||||
|
|
|
@ -248,6 +248,7 @@ func (fs *Filters) NotifyWatchers(recvMessage *ReceivedMessage) bool {
|
|||
if matched && decodedMsg != nil {
|
||||
log.Debug("processing message: decrypted", "envelopeHash", recvMessage.Hash().Hex())
|
||||
if watcher.Src == nil || IsPubKeyEqual(decodedMsg.Src, watcher.Src) {
|
||||
log.Debug("processing message: triggering watcher", "envelopeHash", recvMessage.Hash().Hex())
|
||||
watcher.Trigger(decodedMsg)
|
||||
}
|
||||
}
|
||||
|
@ -276,6 +277,10 @@ func (f *Filter) Trigger(msg *ReceivedMessage) {
|
|||
// to a filter.
|
||||
func (f *Filter) Retrieve() []*ReceivedMessage {
|
||||
msgs, err := f.Messages.Pop()
|
||||
log.Debug("Retrieve messages from filter",
|
||||
"filter", f.id,
|
||||
"msgsCount", len(msgs),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("failed to retrieve messages from filter store", "error", err)
|
||||
return nil
|
||||
|
|
|
@ -1622,6 +1622,8 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) {
|
|||
zap.Int64("timestamp", e.Envelope.Message().GetTimestamp()),
|
||||
)
|
||||
|
||||
logger.Debug("waku.processMessage")
|
||||
|
||||
if e.MsgType == common.StoreMessageType {
|
||||
// We need to insert it first, and then remove it if not matched,
|
||||
// as messages are processed asynchronously
|
||||
|
|
Loading…
Reference in New Issue