diff --git a/VERSION b/VERSION index 32aac53c3..b0019c4d9 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.89.15 +0.89.16 diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 1268eba6b..2c34c7757 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -243,6 +243,8 @@ func (s *MessageSender) sendCommunity( return nil, errors.Wrap(err, "failed to send a message spec") } + s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) + s.transport.Track(messageIDs, hash, newMessage) return messageID, nil @@ -297,6 +299,8 @@ func (s *MessageSender) sendPrivate( return nil, errors.Wrap(err, "failed to send a message spec") } + s.logger.Debug("sent private message skipEncryption", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) + s.transport.Track(messageIDs, hash, newMessage) } else { @@ -322,6 +326,8 @@ func (s *MessageSender) sendPrivate( return nil, errors.Wrap(err, "failed to send a message spec") } + s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) + s.transport.Track(messageIDs, hash, newMessage) } @@ -459,6 +465,8 @@ func (s *MessageSender) SendPublic( return nil, err } + s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) + sentMessage := &SentMessage{ Spec: messageSpec, MessageIDs: [][]byte{messageID}, @@ -640,8 +648,11 @@ func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, marshalledDatasyncPayload []byte, payload *datasyncproto.Payload) error { // Calculate the messageIDs messageIDs := make([][]byte, 0, len(payload.Messages)) + hexMessageIDs := make([]string, 0, len(payload.Messages)) for _, payload := range payload.Messages { - messageIDs = append(messageIDs, v1protocol.MessageID(&s.identity.PublicKey, payload.Body)) + mid := v1protocol.MessageID(&s.identity.PublicKey, payload.Body) + messageIDs = append(messageIDs, mid) + hexMessageIDs = append(hexMessageIDs, mid.String()) } messageSpec, err := s.protocol.BuildDirectMessage(s.identity, publicKey, marshalledDatasyncPayload) @@ -665,6 +676,8 @@ func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.Publi return err } + s.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.String("hash", types.EncodeHex(hash))) + s.transport.Track(messageIDs, hash, newMessage) return nil diff --git a/protocol/encryption/protocol.go b/protocol/encryption/protocol.go index 95812795c..093fc9509 100644 --- a/protocol/encryption/protocol.go +++ b/protocol/encryption/protocol.go @@ -357,7 +357,7 @@ func (p *Protocol) GetPublicBundle(theirIdentityKey *ecdsa.PublicKey) (*Bundle, // ConfirmMessageProcessed confirms and deletes message keys for the given messages func (p *Protocol) ConfirmMessageProcessed(messageID []byte) error { logger := p.logger.With(zap.String("site", "ConfirmMessageProcessed")) - logger.Debug("confirming message", zap.Binary("message-id", messageID)) + logger.Debug("confirming message", zap.String("messageID", types.EncodeHex(messageID))) return p.encryptor.ConfirmMessageProcessed(messageID) } @@ -381,7 +381,7 @@ func (p *Protocol) HandleMessage( zap.String("sender-public-key", types.EncodeHex(crypto.FromECDSAPub(theirPublicKey))), zap.String("my-installation-id", p.encryptor.config.InstallationID), - zap.String("message-id", types.EncodeHex(messageID))) + zap.String("messageID", types.EncodeHex(messageID))) if p.encryptor == nil { return nil, errors.New("encryption service not initialized") diff --git a/protocol/messenger.go b/protocol/messenger.go index 8b6ec554a..146547874 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -2237,6 +2237,7 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message response.SetMessages(msg) response.AddChat(chat) + m.logger.Debug("sent message", zap.String("id", message.ID)) return &response, m.saveChat(chat) } @@ -2677,6 +2678,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte for _, messages := range chatWithMessages { var processedMessages []string for _, shhMessage := range messages { + logger := logger.With(zap.String("hash", types.EncodeHex(shhMessage.Hash))) // Indicates tha all messages in the batch have been processed correctly allMessagesProcessed := true statusMessages, acks, err := m.sender.HandleMessages(shhMessage, true) @@ -2689,6 +2691,8 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte logger.Debug("processing messages further", zap.Int("count", len(statusMessages))) for _, msg := range statusMessages { + logger := logger.With(zap.String("message-id", msg.ID.String())) + logger.Debug("processing message") publicKey := msg.SigPubKey() m.handleInstallations(msg.Installations) diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 8c32020ce..2144904f2 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -261,7 +261,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse syncedTopics = append(syncedTopics, topicData) } - m.logger.Info("syncing topics", zap.Any("batches", batches)) + m.logger.Debug("syncing topics") if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestStarted() @@ -270,7 +270,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse for _, batch := range batches { err := m.processMailserverBatch(batch) if err != nil { - m.logger.Info("error syncing topics", zap.Any("error", err)) + m.logger.Error("error syncing topics", zap.Error(err)) if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestFailed(err) } @@ -278,7 +278,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse } } - m.logger.Info("topics synced") + m.logger.Debug("topics synced") if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.HistoryRequestCompleted() } @@ -364,7 +364,12 @@ func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Messag } func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { - m.logger.Info("syncing topic", zap.Any("topic", batch.Topics), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To))) + var topicStrings []string + for _, t := range batch.Topics { + topicStrings = append(topicStrings, t.String()) + } + logger := m.logger.With(zap.Any("chatIDs", batch.ChatIDs), zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)), zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)), zap.Any("topic", topicStrings), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To))) + logger.Info("syncing topic") ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout) defer cancel() @@ -373,7 +378,7 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { return err } for len(cursor) != 0 || storeCursor != nil { - m.logger.Info("retrieved cursor", zap.Any("cursor", cursor)) + logger.Info("retrieved cursor", zap.String("cursor", types.EncodeHex(cursor))) ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout) defer cancel() @@ -382,7 +387,7 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { return err } } - m.logger.Info("synced topic", zap.Any("topic", batch.Topics), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To))) + logger.Info("synced topic") return nil } @@ -439,6 +444,8 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) { chat.SyncedFrom = batch.From } + m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID)) + err = m.persistence.SetSyncTimestamps(batch.From, chat.SyncedTo, chat.ID) if err != nil { return 0, err diff --git a/protocol/pushnotificationclient/client.go b/protocol/pushnotificationclient/client.go index 1845ad006..293513e69 100644 --- a/protocol/pushnotificationclient/client.go +++ b/protocol/pushnotificationclient/client.go @@ -551,7 +551,7 @@ func (c *Client) HandleContactCodeAdvertisement(clientPublicKey *ecdsa.PublicKey // HandlePushNotificationResponse should set the request as processed func (c *Client) HandlePushNotificationResponse(serverKey *ecdsa.PublicKey, response protobuf.PushNotificationResponse) error { messageID := response.MessageId - c.config.Logger.Debug("received response for", zap.Binary("message-id", messageID)) + c.config.Logger.Debug("received response for", zap.String("messageID", types.EncodeHex(messageID))) for _, report := range response.Reports { c.config.Logger.Debug("received response", zap.Any("report", report)) err := c.persistence.UpdateNotificationResponse(messageID, report) @@ -985,7 +985,7 @@ func (c *Client) handleDirectMessageSent(sentMessage *common.SentMessage) error return nil } - c.config.Logger.Debug("actionable messages", zap.Any("message-ids", trackedMessageIDs), zap.Any("installation-ids", installationIDs)) + c.config.Logger.Debug("actionable messages", zap.Any("messageIDs", trackedMessageIDs), zap.Any("installation-ids", installationIDs)) // Get message to check chatID. Again we use the first message for simplicity, but we should send one for each chatID. Messages though are very rarely batched. message, err := c.getMessage(types.EncodeHex(trackedMessageIDs[0])) diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 86de35e9a..74bb09495 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -215,6 +215,7 @@ func (t *Transport) GetStats() types.StatsSummary { func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { result := make(map[Filter][]*types.Message) + logger := t.logger.With(zap.String("site", "retrieveRawAll")) allFilters := t.filters.Filters() for _, filter := range allFilters { @@ -224,7 +225,7 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { } msgs, err := t.api.GetFilterMessages(filter.FilterID) if err != nil { - t.logger.Warn("failed to fetch messages", zap.Error(err)) + logger.Warn("failed to fetch messages", zap.Error(err)) continue } if len(msgs) == 0 { @@ -239,7 +240,7 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { hits, err := t.cache.Hits(ids) if err != nil { - t.logger.Error("failed to check messages exists", zap.Error(err)) + logger.Error("failed to check messages exists", zap.Error(err)) return nil, err } @@ -247,6 +248,10 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { // Exclude anything that is a cache hit if !hits[types.EncodeHex(msgs[i].Hash)] { result[*filter] = append(result[*filter], msgs[i]) + logger.Debug("message not cached", zap.String("hash", types.EncodeHex(msgs[i].Hash))) + } else { + logger.Debug("message cached", zap.String("hash", types.EncodeHex(msgs[i].Hash))) + } } diff --git a/waku/api.go b/waku/api.go index 3f8226051..c6be44c70 100644 --- a/waku/api.go +++ b/waku/api.go @@ -26,6 +26,8 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/status-im/status-go/waku/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -481,6 +483,8 @@ func toMessage(messages []*common.ReceivedMessage) []*Message { // GetFilterMessages returns the messages that match the filter criteria and // are received between the last poll and now. func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*Message, error) { + logger := api.w.logger.With(zap.String("site", "getFilterMessages"), zap.String("filterId", id)) + logger.Debug("retrieving filter messages") api.mu.Lock() f := api.w.GetFilter(id) if f == nil { @@ -493,6 +497,8 @@ func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*Message, error) { receivedMessages := f.Retrieve() messages := make([]*Message, 0, len(receivedMessages)) for _, msg := range receivedMessages { + + logger.Debug("retrieved filter message", zap.String("hash", msg.EnvelopeHash.String()), zap.Bool("isP2P", msg.P2P), zap.String("topic", msg.Topic.String())) messages = append(messages, ToWakuMessage(msg)) } diff --git a/waku/waku.go b/waku/waku.go index ae2345f15..94601b398 100644 --- a/waku/waku.go +++ b/waku/waku.go @@ -1053,6 +1053,7 @@ func (w *Waku) UnsubscribeMany(ids []string) error { // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(envelope *common.Envelope) error { + w.logger.Debug("send: sending envelope", zap.String("hash", envelope.Hash().String())) ok, err := w.add(envelope, false) if err == nil && !ok { return fmt.Errorf("failed to add envelope") @@ -1326,12 +1327,15 @@ func (w *Waku) addEnvelope(envelope *common.Envelope) { func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool) (bool, error) { now := uint32(w.timeSource().Unix()) sent := envelope.Expiry - envelope.TTL + logger := w.logger.With(zap.String("hash", envelope.Hash().String()), zap.String("site", "addAndBridge"), zap.String("topic", envelope.Topic.String()), zap.Bool("isP2P", isP2P)) + + logger.Debug("addAndBridge: processing envelope") common.EnvelopesReceivedCounter.Inc() if sent > now { if sent-common.DefaultSyncAllowance > now { common.EnvelopesCacheFailedCounter.WithLabelValues("in_future").Inc() - w.logger.Warn("envelope created in the future", zap.ByteString("hash", envelope.Hash().Bytes())) + logger.Warn("envelope created in the future") return false, common.TimeSyncError(errors.New("envelope from future")) } // recalculate PoW, adjusted for the time difference, plus one second for latency @@ -1341,17 +1345,17 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool) if envelope.Expiry < now { if envelope.Expiry+common.DefaultSyncAllowance*2 < now { common.EnvelopesCacheFailedCounter.WithLabelValues("very_old").Inc() - w.logger.Warn("very old envelope", zap.ByteString("hash", envelope.Hash().Bytes())) + logger.Warn("very old envelope") return false, common.TimeSyncError(errors.New("very old envelope")) } - w.logger.Debug("expired envelope dropped", zap.ByteString("hash", envelope.Hash().Bytes())) + logger.Debug("expired envelope dropped") common.EnvelopesCacheFailedCounter.WithLabelValues("expired").Inc() return false, nil // drop envelope without error } if uint32(envelope.Size()) > w.MaxMessageSize() { common.EnvelopesCacheFailedCounter.WithLabelValues("oversized").Inc() - return false, fmt.Errorf("huge messages are not allowed [%x][%d][%d]", envelope.Hash(), envelope.Size(), w.MaxMessageSize()) + return false, fmt.Errorf("huge messages are not allowed [%s][%d][%d]", envelope.Hash().String(), envelope.Size(), w.MaxMessageSize()) } if envelope.PoW() < w.MinPow() { @@ -1360,7 +1364,7 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool) // for a short period of peer synchronization. if envelope.PoW() < w.MinPowTolerance() { common.EnvelopesCacheFailedCounter.WithLabelValues("low_pow").Inc() - return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) + return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%s]", envelope.PoW(), envelope.Hash().String()) } } @@ -1370,6 +1374,7 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool) } if !match { + logger.Debug("addAndBridge: no matches for envelope") return false, nil } @@ -1379,10 +1384,12 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool) _, alreadyCached := w.envelopes[hash] w.poolMu.Unlock() if !alreadyCached { + logger.Debug("addAndBridge: adding envelope") w.addEnvelope(envelope) } if alreadyCached { + logger.Debug("addAndBridge: already cached") common.EnvelopesCachedCounter.WithLabelValues("hit").Inc() } else { common.EnvelopesCachedCounter.WithLabelValues("miss").Inc() @@ -1400,7 +1407,7 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool) // In particular, if a node is a lightweight node, // it should not bridge any envelopes. if !isP2P && !bridged && w.bridge != nil { - w.logger.Debug("bridging envelope from Waku", zap.ByteString("hash", envelope.Hash().Bytes())) + logger.Debug("bridging envelope from Waku") _, in := w.bridge.Pipe() in <- envelope common.BridgeSent.Inc() @@ -1483,12 +1490,14 @@ func (w *Waku) update() { func (w *Waku) expire() { w.poolMu.Lock() defer w.poolMu.Unlock() + logger := w.logger.With(zap.String("site", "expire")) now := uint32(w.timeSource().Unix()) for expiry, hashSet := range w.expirations { if expiry < now { // Dump all expired messages and remove timestamp hashSet.Each(func(v interface{}) bool { + logger.Debug("expiring envelope", zap.String("hash", v.(gethcommon.Hash).String())) delete(w.envelopes, v.(gethcommon.Hash)) common.EnvelopesCachedCounter.WithLabelValues("clear").Inc() w.envelopeFeed.Send(common.EnvelopeEvent{