diff --git a/protocol/messenger.go b/protocol/messenger.go index cd38de92e..b979c8815 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -3442,7 +3442,9 @@ func (m *Messenger) SyncVerificationRequest(ctx context.Context, vr *verificatio // RetrieveAll retrieves messages from all filters, processes them and returns a // MessengerResponse to the client func (m *Messenger) RetrieveAll() (*MessengerResponse, error) { + m.logger.Debug("RetrieveAll start") chatWithMessages, err := m.transport.RetrieveRawAll() + m.logger.Debug("RetrieveAll end", zap.Any("chatWithMessages", chatWithMessages)) if err != nil { return nil, err } @@ -3451,6 +3453,7 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) { } func (m *Messenger) StartRetrieveMessagesLoop(tick time.Duration, cancel <-chan struct{}) { + m.logger.Debug("starting RetrieveMessagesLoop") m.shutdownWaitGroup.Add(1) go func() { defer m.shutdownWaitGroup.Done() @@ -3468,6 +3471,7 @@ func (m *Messenger) StartRetrieveMessagesLoop(tick time.Duration, cancel <-chan } func (m *Messenger) ProcessAllMessages() { + m.logger.Debug("ProcessAllMessages") response, err := m.RetrieveAll() if err != nil { m.logger.Error("failed to retrieve raw messages", zap.Error(err)) @@ -3888,6 +3892,9 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte messageState := m.buildMessageState() logger := m.logger.With(zap.String("site", "RetrieveAll")) + logger.Debug("handleRetrievedMessages", + zap.Any("chatWithMessages", chatWithMessages), + ) controlledCommunitiesChatIDs, err := m.communitiesManager.GetOwnedCommunitiesChatIDs() if err != nil { diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index 9b0c90505..201744541 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -2134,6 +2134,13 @@ func (m *Messenger) handleChatMessage(state *ReceivedMessageState, forceSeen boo WhisperTimestamp: state.CurrentMessageState.WhisperTimestamp, } + logger.Debug("received chat message", + zap.String("messageID", receivedMessage.ID), + zap.String("from", receivedMessage.From), + zap.Any("contentType", receivedMessage.ContentType), + zap.Bool("forceSeen", forceSeen), + ) + // is the message coming from us? isSyncMessage := common.IsPubKeyEqual(receivedMessage.SigPubKey, &m.identity.PublicKey) diff --git a/protocol/messenger_store_node_request_manager.go b/protocol/messenger_store_node_request_manager.go index ff770710a..0dbf9bece 100644 --- a/protocol/messenger_store_node_request_manager.go +++ b/protocol/messenger_store_node_request_manager.go @@ -373,7 +373,11 @@ func (r *storeNodeRequest) finalize() { func (r *storeNodeRequest) shouldFetchNextPage(envelopesCount int) (bool, uint32) { logger := r.manager.logger.With( zap.Any("requestID", r.requestID), - zap.Int("envelopesCount", envelopesCount)) + zap.Int("envelopesCount", envelopesCount), + zap.String("contentTopic", r.contentTopic.String()), + ) + + logger.Debug("shouldFetchNextPage") r.result.stats.FetchedEnvelopesCount += envelopesCount r.result.stats.FetchedPagesCount++ diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 37c860f76..bb44f0f32 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -232,8 +232,10 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { logger.Warn("failed to fetch messages", zap.Error(err)) continue } + // Don't pull from filters we don't listen to if !filter.Listen { + logger.Debug("skip filter as not listened", zap.String("filterID", filter.FilterID)) for _, msg := range msgs { t.waku.MarkP2PMessageAsProcessed(common.BytesToHash(msg.Hash)) } @@ -250,6 +252,11 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { ids[i] = id } + logger.Debug("retrieved raw messages", + zap.String("filterID", filter.FilterID), + zap.Any("ids", ids), + ) + hits, err := t.cache.Hits(ids) if err != nil { logger.Error("failed to check messages exists", zap.Error(err)) diff --git a/wakuv2/common/filter.go b/wakuv2/common/filter.go index f828d86c9..20e4e5818 100644 --- a/wakuv2/common/filter.go +++ b/wakuv2/common/filter.go @@ -248,6 +248,7 @@ func (fs *Filters) NotifyWatchers(recvMessage *ReceivedMessage) bool { if matched && decodedMsg != nil { log.Debug("processing message: decrypted", "envelopeHash", recvMessage.Hash().Hex()) if watcher.Src == nil || IsPubKeyEqual(decodedMsg.Src, watcher.Src) { + log.Debug("processing message: triggering watcher", "envelopeHash", recvMessage.Hash().Hex()) watcher.Trigger(decodedMsg) } } @@ -276,6 +277,10 @@ func (f *Filter) Trigger(msg *ReceivedMessage) { // to a filter. func (f *Filter) Retrieve() []*ReceivedMessage { msgs, err := f.Messages.Pop() + log.Debug("Retrieve messages from filter", + "filter", f.id, + "msgsCount", len(msgs), + ) if err != nil { log.Error("failed to retrieve messages from filter store", "error", err) return nil diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 149379805..71a98f30a 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1622,6 +1622,8 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { zap.Int64("timestamp", e.Envelope.Message().GetTimestamp()), ) + logger.Debug("waku.processMessage") + if e.MsgType == common.StoreMessageType { // We need to insert it first, and then remove it if not matched, // as messages are processed asynchronously