chore: add extra logging for message IDs

This commit is contained in:
Richard Ramos 2022-11-03 09:53:33 -04:00 committed by RichΛrd
parent dbe152b8e5
commit a247e8346d
7 changed files with 48 additions and 9 deletions

View File

@ -7,6 +7,7 @@
package logging package logging
import ( import (
"encoding/hex"
"net" "net"
"time" "time"
@ -19,6 +20,31 @@ import (
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
) )
// List of []byte
type byteArr [][]byte
// HexArray creates a field with an array of bytes that will be shown as a hexadecimal string in logs
func HexArray(key string, byteVal ...[]byte) zapcore.Field {
return zap.Array(key, byteArr(byteVal))
}
func (bArr byteArr) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
for _, b := range bArr {
encoder.AppendString("0x" + hex.EncodeToString(b))
}
return nil
}
type hexByte []byte
func HexString(key string, byteVal []byte) zapcore.Field {
return zap.Stringer(key, hexByte(byteVal))
}
func (h hexByte) String() string {
return "0x" + hex.EncodeToString(h)
}
// List of multiaddrs // List of multiaddrs
type multiaddrs []multiaddr.Multiaddr type multiaddrs []multiaddr.Multiaddr

View File

@ -440,7 +440,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error {
return errors.New("cannot publish message, relay and lightpush are disabled") return errors.New("cannot publish message, relay and lightpush are disabled")
} }
hash, _ := msg.Hash() hash, _, _ := msg.Hash()
err := try.Do(func(attempt int) (bool, error) { err := try.Do(func(attempt int) (bool, error) {
var err error var err error
if !w.relay.EnoughPeersToPublish() { if !w.relay.EnoughPeersToPublish() {

View File

@ -20,13 +20,12 @@ type Envelope struct {
// It's used as a way to know to which Pubsub topic belongs a WakuMessage // It's used as a way to know to which Pubsub topic belongs a WakuMessage
// as well as generating a hash based on the bytes that compose the message // as well as generating a hash based on the bytes that compose the message
func NewEnvelope(msg *pb.WakuMessage, receiverTime int64, pubSubTopic string) *Envelope { func NewEnvelope(msg *pb.WakuMessage, receiverTime int64, pubSubTopic string) *Envelope {
data, _ := msg.Marshal() messageHash, dataLen, _ := msg.Hash()
hash := sha256.Sum256(append([]byte(msg.ContentTopic), msg.Payload...)) hash := sha256.Sum256(append([]byte(msg.ContentTopic), msg.Payload...))
return &Envelope{ return &Envelope{
msg: msg, msg: msg,
size: len(data), size: dataLen,
hash: pb.Hash(data), hash: messageHash,
index: &pb.Index{ index: &pb.Index{
Digest: hash[:], Digest: hash[:],
ReceiverTime: receiverTime, ReceiverTime: receiverTime,

View File

@ -225,7 +225,8 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *pb.Wak
} }
if response.IsSuccess { if response.IsSuccess {
hash, _ := message.Hash() hash, _, _ := message.Hash()
wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash))
return hash, nil return hash, nil
} else { } else {
return nil, errors.New(response.Info) return nil, errors.New(response.Info)

View File

@ -7,13 +7,13 @@ import (
) )
// Hash calculates the hash of a waku message // Hash calculates the hash of a waku message
func (msg *WakuMessage) Hash() ([]byte, error) { func (msg *WakuMessage) Hash() ([]byte, int, error) {
out, err := proto.Marshal(msg) out, err := proto.Marshal(msg)
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
return Hash(out), nil return Hash(out), len(out), nil
} }
// Hash calculates a hash from a byte slice using sha2-256 for the hashing algorithm // Hash calculates a hash from a byte slice using sha2-256 for the hashing algorithm

View File

@ -3,6 +3,7 @@ package relay
import ( import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
@ -16,6 +17,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2" v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics" "github.com/waku-org/go-waku/waku/v2/metrics"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
@ -184,6 +186,8 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage,
hash := pb.Hash(out) hash := pb.Hash(out)
w.log.Info("waku.relay published", zap.String("hash", hex.EncodeToString(hash)))
return hash, nil return hash, nil
} }
@ -341,6 +345,8 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *
envelope := waku_proto.NewEnvelope(wakuMessage, utils.GetUnixEpoch(), string(t)) envelope := waku_proto.NewEnvelope(wakuMessage, utils.GetUnixEpoch(), string(t))
w.log.Info("waku.relay received", logging.HexString("hash", envelope.Hash()))
if w.bcaster != nil { if w.bcaster != nil {
w.bcaster.Submit(envelope) w.bcaster.Submit(envelope)
} }

View File

@ -492,6 +492,13 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
return nil, errors.New("invalid cursor") return nil, errors.New("invalid cursor")
} }
var messageIDs [][]byte
for _, m := range response.Messages {
messageID, _, _ := m.Hash()
messageIDs = append(messageIDs, messageID)
}
store.log.Info("waku.store retrieved", logging.HexArray("hashes", messageIDs...))
result := &Result{ result := &Result{
Messages: response.Messages, Messages: response.Messages,
query: q, query: q,