diff --git a/logging/logging.go b/logging/logging.go index 5ce22c42..130cc75d 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -7,6 +7,7 @@ package logging import ( + "encoding/hex" "net" "time" @@ -19,6 +20,31 @@ import ( "go.uber.org/zap/zapcore" ) +// List of []byte +type byteArr [][]byte + +// HexArray creates a field with an array of bytes that will be shown as a hexadecimal string in logs +func HexArray(key string, byteVal ...[]byte) zapcore.Field { + return zap.Array(key, byteArr(byteVal)) +} + +func (bArr byteArr) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + for _, b := range bArr { + encoder.AppendString("0x" + hex.EncodeToString(b)) + } + return nil +} + +type hexByte []byte + +func HexString(key string, byteVal []byte) zapcore.Field { + return zap.Stringer(key, hexByte(byteVal)) +} + +func (h hexByte) String() string { + return "0x" + hex.EncodeToString(h) +} + // List of multiaddrs type multiaddrs []multiaddr.Multiaddr diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 3de287a5..36148a7f 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -440,7 +440,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { return errors.New("cannot publish message, relay and lightpush are disabled") } - hash, _ := msg.Hash() + hash, _, _ := msg.Hash() err := try.Do(func(attempt int) (bool, error) { var err error if !w.relay.EnoughPeersToPublish() { diff --git a/waku/v2/protocol/envelope.go b/waku/v2/protocol/envelope.go index 5e146e3d..72102ea6 100644 --- a/waku/v2/protocol/envelope.go +++ b/waku/v2/protocol/envelope.go @@ -20,13 +20,12 @@ type Envelope struct { // It's used as a way to know to which Pubsub topic belongs a WakuMessage // as well as generating a hash based on the bytes that compose the message func NewEnvelope(msg *pb.WakuMessage, receiverTime int64, pubSubTopic string) *Envelope { - data, _ := msg.Marshal() + messageHash, dataLen, _ := msg.Hash() hash := sha256.Sum256(append([]byte(msg.ContentTopic), msg.Payload...)) - return &Envelope{ msg: msg, - size: len(data), - hash: pb.Hash(data), + size: dataLen, + hash: messageHash, index: &pb.Index{ Digest: hash[:], ReceiverTime: receiverTime, diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index c6b315ba..57d6df27 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -225,7 +225,8 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *pb.Wak } if response.IsSuccess { - hash, _ := message.Hash() + hash, _, _ := message.Hash() + wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash)) return hash, nil } else { return nil, errors.New(response.Info) diff --git a/waku/v2/protocol/pb/utils.go b/waku/v2/protocol/pb/utils.go index 80f392a0..51c1cef8 100644 --- a/waku/v2/protocol/pb/utils.go +++ b/waku/v2/protocol/pb/utils.go @@ -7,13 +7,13 @@ import ( ) // Hash calculates the hash of a waku message -func (msg *WakuMessage) Hash() ([]byte, error) { +func (msg *WakuMessage) Hash() ([]byte, int, error) { out, err := proto.Marshal(msg) if err != nil { - return nil, err + return nil, 0, err } - return Hash(out), nil + return Hash(out), len(out), nil } // Hash calculates a hash from a byte slice using sha2-256 for the hashing algorithm diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index b3d78ac3..f88d97b7 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -3,6 +3,7 @@ package relay import ( "context" "crypto/sha256" + "encoding/hex" "errors" "fmt" "sync" @@ -16,6 +17,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" + "github.com/waku-org/go-waku/logging" v2 "github.com/waku-org/go-waku/waku/v2" "github.com/waku-org/go-waku/waku/v2/metrics" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" @@ -184,6 +186,8 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, hash := pb.Hash(out) + w.log.Info("waku.relay published", zap.String("hash", hex.EncodeToString(hash))) + return hash, nil } @@ -341,6 +345,8 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub * envelope := waku_proto.NewEnvelope(wakuMessage, utils.GetUnixEpoch(), string(t)) + w.log.Info("waku.relay received", logging.HexString("hash", envelope.Hash())) + if w.bcaster != nil { w.bcaster.Submit(envelope) } diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 11f5433a..8d4eae10 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -492,6 +492,13 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR return nil, errors.New("invalid cursor") } + var messageIDs [][]byte + for _, m := range response.Messages { + messageID, _, _ := m.Hash() + messageIDs = append(messageIDs, messageID) + } + store.log.Info("waku.store retrieved", logging.HexArray("hashes", messageIDs...)) + result := &Result{ Messages: response.Messages, query: q,