Add logging of message-id

This commit is contained in:
Andrea Maria Piana 2021-10-29 15:29:28 +01:00
parent 9693d59e61
commit 2d13fa1e25
9 changed files with 64 additions and 20 deletions

View File

@ -1 +1 @@
0.89.15
0.89.16

View File

@ -243,6 +243,8 @@ func (s *MessageSender) sendCommunity(
return nil, errors.Wrap(err, "failed to send a message spec")
}
s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash)))
s.transport.Track(messageIDs, hash, newMessage)
return messageID, nil
@ -297,6 +299,8 @@ func (s *MessageSender) sendPrivate(
return nil, errors.Wrap(err, "failed to send a message spec")
}
s.logger.Debug("sent private message skipEncryption", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash)))
s.transport.Track(messageIDs, hash, newMessage)
} else {
@ -322,6 +326,8 @@ func (s *MessageSender) sendPrivate(
return nil, errors.Wrap(err, "failed to send a message spec")
}
s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash)))
s.transport.Track(messageIDs, hash, newMessage)
}
@ -459,6 +465,8 @@ func (s *MessageSender) SendPublic(
return nil, err
}
s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash)))
sentMessage := &SentMessage{
Spec: messageSpec,
MessageIDs: [][]byte{messageID},
@ -640,8 +648,11 @@ func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte
func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, marshalledDatasyncPayload []byte, payload *datasyncproto.Payload) error {
// Calculate the messageIDs
messageIDs := make([][]byte, 0, len(payload.Messages))
hexMessageIDs := make([]string, 0, len(payload.Messages))
for _, payload := range payload.Messages {
messageIDs = append(messageIDs, v1protocol.MessageID(&s.identity.PublicKey, payload.Body))
mid := v1protocol.MessageID(&s.identity.PublicKey, payload.Body)
messageIDs = append(messageIDs, mid)
hexMessageIDs = append(hexMessageIDs, mid.String())
}
messageSpec, err := s.protocol.BuildDirectMessage(s.identity, publicKey, marshalledDatasyncPayload)
@ -665,6 +676,8 @@ func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.Publi
return err
}
s.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.String("hash", types.EncodeHex(hash)))
s.transport.Track(messageIDs, hash, newMessage)
return nil

View File

@ -357,7 +357,7 @@ func (p *Protocol) GetPublicBundle(theirIdentityKey *ecdsa.PublicKey) (*Bundle,
// ConfirmMessageProcessed confirms and deletes message keys for the given messages
func (p *Protocol) ConfirmMessageProcessed(messageID []byte) error {
logger := p.logger.With(zap.String("site", "ConfirmMessageProcessed"))
logger.Debug("confirming message", zap.Binary("message-id", messageID))
logger.Debug("confirming message", zap.String("messageID", types.EncodeHex(messageID)))
return p.encryptor.ConfirmMessageProcessed(messageID)
}
@ -381,7 +381,7 @@ func (p *Protocol) HandleMessage(
zap.String("sender-public-key",
types.EncodeHex(crypto.FromECDSAPub(theirPublicKey))),
zap.String("my-installation-id", p.encryptor.config.InstallationID),
zap.String("message-id", types.EncodeHex(messageID)))
zap.String("messageID", types.EncodeHex(messageID)))
if p.encryptor == nil {
return nil, errors.New("encryption service not initialized")

View File

@ -2237,6 +2237,7 @@ func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message
response.SetMessages(msg)
response.AddChat(chat)
m.logger.Debug("sent message", zap.String("id", message.ID))
return &response, m.saveChat(chat)
}
@ -2677,6 +2678,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
for _, messages := range chatWithMessages {
var processedMessages []string
for _, shhMessage := range messages {
logger := logger.With(zap.String("hash", types.EncodeHex(shhMessage.Hash)))
// Indicates tha all messages in the batch have been processed correctly
allMessagesProcessed := true
statusMessages, acks, err := m.sender.HandleMessages(shhMessage, true)
@ -2689,6 +2691,8 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
logger.Debug("processing messages further", zap.Int("count", len(statusMessages)))
for _, msg := range statusMessages {
logger := logger.With(zap.String("message-id", msg.ID.String()))
logger.Debug("processing message")
publicKey := msg.SigPubKey()
m.handleInstallations(msg.Installations)

View File

@ -261,7 +261,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
syncedTopics = append(syncedTopics, topicData)
}
m.logger.Info("syncing topics", zap.Any("batches", batches))
m.logger.Debug("syncing topics")
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestStarted()
@ -270,7 +270,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
for _, batch := range batches {
err := m.processMailserverBatch(batch)
if err != nil {
m.logger.Info("error syncing topics", zap.Any("error", err))
m.logger.Error("error syncing topics", zap.Error(err))
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestFailed(err)
}
@ -278,7 +278,7 @@ func (m *Messenger) syncFilters(filters []*transport.Filter) (*MessengerResponse
}
}
m.logger.Info("topics synced")
m.logger.Debug("topics synced")
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryRequestCompleted()
}
@ -364,7 +364,12 @@ func (m *Messenger) calculateGapForChat(chat *Chat, from uint32) (*common.Messag
}
func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
m.logger.Info("syncing topic", zap.Any("topic", batch.Topics), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To)))
var topicStrings []string
for _, t := range batch.Topics {
topicStrings = append(topicStrings, t.String())
}
logger := m.logger.With(zap.Any("chatIDs", batch.ChatIDs), zap.String("fromString", time.Unix(int64(batch.From), 0).Format(time.RFC3339)), zap.String("toString", time.Unix(int64(batch.To), 0).Format(time.RFC3339)), zap.Any("topic", topicStrings), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To)))
logger.Info("syncing topic")
ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout)
defer cancel()
@ -373,7 +378,7 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
return err
}
for len(cursor) != 0 || storeCursor != nil {
m.logger.Info("retrieved cursor", zap.Any("cursor", cursor))
logger.Info("retrieved cursor", zap.String("cursor", types.EncodeHex(cursor)))
ctx, cancel := context.WithTimeout(context.Background(), mailserverRequestTimeout)
defer cancel()
@ -382,7 +387,7 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
return err
}
}
m.logger.Info("synced topic", zap.Any("topic", batch.Topics), zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(batch.To)))
logger.Info("synced topic")
return nil
}
@ -439,6 +444,8 @@ func (m *Messenger) SyncChatFromSyncedFrom(chatID string) (uint32, error) {
chat.SyncedFrom = batch.From
}
m.logger.Debug("setting sync timestamps", zap.Int64("from", int64(batch.From)), zap.Int64("to", int64(chat.SyncedTo)), zap.String("chatID", chatID))
err = m.persistence.SetSyncTimestamps(batch.From, chat.SyncedTo, chat.ID)
if err != nil {
return 0, err

View File

@ -551,7 +551,7 @@ func (c *Client) HandleContactCodeAdvertisement(clientPublicKey *ecdsa.PublicKey
// HandlePushNotificationResponse should set the request as processed
func (c *Client) HandlePushNotificationResponse(serverKey *ecdsa.PublicKey, response protobuf.PushNotificationResponse) error {
messageID := response.MessageId
c.config.Logger.Debug("received response for", zap.Binary("message-id", messageID))
c.config.Logger.Debug("received response for", zap.String("messageID", types.EncodeHex(messageID)))
for _, report := range response.Reports {
c.config.Logger.Debug("received response", zap.Any("report", report))
err := c.persistence.UpdateNotificationResponse(messageID, report)
@ -985,7 +985,7 @@ func (c *Client) handleDirectMessageSent(sentMessage *common.SentMessage) error
return nil
}
c.config.Logger.Debug("actionable messages", zap.Any("message-ids", trackedMessageIDs), zap.Any("installation-ids", installationIDs))
c.config.Logger.Debug("actionable messages", zap.Any("messageIDs", trackedMessageIDs), zap.Any("installation-ids", installationIDs))
// Get message to check chatID. Again we use the first message for simplicity, but we should send one for each chatID. Messages though are very rarely batched.
message, err := c.getMessage(types.EncodeHex(trackedMessageIDs[0]))

View File

@ -215,6 +215,7 @@ func (t *Transport) GetStats() types.StatsSummary {
func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
result := make(map[Filter][]*types.Message)
logger := t.logger.With(zap.String("site", "retrieveRawAll"))
allFilters := t.filters.Filters()
for _, filter := range allFilters {
@ -224,7 +225,7 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
}
msgs, err := t.api.GetFilterMessages(filter.FilterID)
if err != nil {
t.logger.Warn("failed to fetch messages", zap.Error(err))
logger.Warn("failed to fetch messages", zap.Error(err))
continue
}
if len(msgs) == 0 {
@ -239,7 +240,7 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
hits, err := t.cache.Hits(ids)
if err != nil {
t.logger.Error("failed to check messages exists", zap.Error(err))
logger.Error("failed to check messages exists", zap.Error(err))
return nil, err
}
@ -247,6 +248,10 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
// Exclude anything that is a cache hit
if !hits[types.EncodeHex(msgs[i].Hash)] {
result[*filter] = append(result[*filter], msgs[i])
logger.Debug("message not cached", zap.String("hash", types.EncodeHex(msgs[i].Hash)))
} else {
logger.Debug("message cached", zap.String("hash", types.EncodeHex(msgs[i].Hash)))
}
}

View File

@ -26,6 +26,8 @@ import (
"sync"
"time"
"go.uber.org/zap"
"github.com/status-im/status-go/waku/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -481,6 +483,8 @@ func toMessage(messages []*common.ReceivedMessage) []*Message {
// GetFilterMessages returns the messages that match the filter criteria and
// are received between the last poll and now.
func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*Message, error) {
logger := api.w.logger.With(zap.String("site", "getFilterMessages"), zap.String("filterId", id))
logger.Debug("retrieving filter messages")
api.mu.Lock()
f := api.w.GetFilter(id)
if f == nil {
@ -493,6 +497,8 @@ func (api *PublicWakuAPI) GetFilterMessages(id string) ([]*Message, error) {
receivedMessages := f.Retrieve()
messages := make([]*Message, 0, len(receivedMessages))
for _, msg := range receivedMessages {
logger.Debug("retrieved filter message", zap.String("hash", msg.EnvelopeHash.String()), zap.Bool("isP2P", msg.P2P), zap.String("topic", msg.Topic.String()))
messages = append(messages, ToWakuMessage(msg))
}

View File

@ -1053,6 +1053,7 @@ func (w *Waku) UnsubscribeMany(ids []string) error {
// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(envelope *common.Envelope) error {
w.logger.Debug("send: sending envelope", zap.String("hash", envelope.Hash().String()))
ok, err := w.add(envelope, false)
if err == nil && !ok {
return fmt.Errorf("failed to add envelope")
@ -1326,12 +1327,15 @@ func (w *Waku) addEnvelope(envelope *common.Envelope) {
func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool) (bool, error) {
now := uint32(w.timeSource().Unix())
sent := envelope.Expiry - envelope.TTL
logger := w.logger.With(zap.String("hash", envelope.Hash().String()), zap.String("site", "addAndBridge"), zap.String("topic", envelope.Topic.String()), zap.Bool("isP2P", isP2P))
logger.Debug("addAndBridge: processing envelope")
common.EnvelopesReceivedCounter.Inc()
if sent > now {
if sent-common.DefaultSyncAllowance > now {
common.EnvelopesCacheFailedCounter.WithLabelValues("in_future").Inc()
w.logger.Warn("envelope created in the future", zap.ByteString("hash", envelope.Hash().Bytes()))
logger.Warn("envelope created in the future")
return false, common.TimeSyncError(errors.New("envelope from future"))
}
// recalculate PoW, adjusted for the time difference, plus one second for latency
@ -1341,17 +1345,17 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool)
if envelope.Expiry < now {
if envelope.Expiry+common.DefaultSyncAllowance*2 < now {
common.EnvelopesCacheFailedCounter.WithLabelValues("very_old").Inc()
w.logger.Warn("very old envelope", zap.ByteString("hash", envelope.Hash().Bytes()))
logger.Warn("very old envelope")
return false, common.TimeSyncError(errors.New("very old envelope"))
}
w.logger.Debug("expired envelope dropped", zap.ByteString("hash", envelope.Hash().Bytes()))
logger.Debug("expired envelope dropped")
common.EnvelopesCacheFailedCounter.WithLabelValues("expired").Inc()
return false, nil // drop envelope without error
}
if uint32(envelope.Size()) > w.MaxMessageSize() {
common.EnvelopesCacheFailedCounter.WithLabelValues("oversized").Inc()
return false, fmt.Errorf("huge messages are not allowed [%x][%d][%d]", envelope.Hash(), envelope.Size(), w.MaxMessageSize())
return false, fmt.Errorf("huge messages are not allowed [%s][%d][%d]", envelope.Hash().String(), envelope.Size(), w.MaxMessageSize())
}
if envelope.PoW() < w.MinPow() {
@ -1360,7 +1364,7 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool)
// for a short period of peer synchronization.
if envelope.PoW() < w.MinPowTolerance() {
common.EnvelopesCacheFailedCounter.WithLabelValues("low_pow").Inc()
return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex())
return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%s]", envelope.PoW(), envelope.Hash().String())
}
}
@ -1370,6 +1374,7 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool)
}
if !match {
logger.Debug("addAndBridge: no matches for envelope")
return false, nil
}
@ -1379,10 +1384,12 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool)
_, alreadyCached := w.envelopes[hash]
w.poolMu.Unlock()
if !alreadyCached {
logger.Debug("addAndBridge: adding envelope")
w.addEnvelope(envelope)
}
if alreadyCached {
logger.Debug("addAndBridge: already cached")
common.EnvelopesCachedCounter.WithLabelValues("hit").Inc()
} else {
common.EnvelopesCachedCounter.WithLabelValues("miss").Inc()
@ -1400,7 +1407,7 @@ func (w *Waku) addAndBridge(envelope *common.Envelope, isP2P bool, bridged bool)
// In particular, if a node is a lightweight node,
// it should not bridge any envelopes.
if !isP2P && !bridged && w.bridge != nil {
w.logger.Debug("bridging envelope from Waku", zap.ByteString("hash", envelope.Hash().Bytes()))
logger.Debug("bridging envelope from Waku")
_, in := w.bridge.Pipe()
in <- envelope
common.BridgeSent.Inc()
@ -1483,12 +1490,14 @@ func (w *Waku) update() {
func (w *Waku) expire() {
w.poolMu.Lock()
defer w.poolMu.Unlock()
logger := w.logger.With(zap.String("site", "expire"))
now := uint32(w.timeSource().Unix())
for expiry, hashSet := range w.expirations {
if expiry < now {
// Dump all expired messages and remove timestamp
hashSet.Each(func(v interface{}) bool {
logger.Debug("expiring envelope", zap.String("hash", v.(gethcommon.Hash).String()))
delete(w.envelopes, v.(gethcommon.Hash))
common.EnvelopesCachedCounter.WithLabelValues("clear").Inc()
w.envelopeFeed.Send(common.EnvelopeEvent{