From ff689343547290d701e944f3c7666c353ed51617 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Thu, 4 Jan 2024 10:41:11 -0400 Subject: [PATCH] chore: improve light client logging (#992) --- waku/v2/protocol/filter/client.go | 24 ++++++++++++-------- waku/v2/protocol/lightpush/waku_lightpush.go | 5 ++++ waku/v2/protocol/pb/utils.go | 16 +++++++++++++ waku/v2/utils/logger.go | 2 +- 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index b992d74a..b6fc3092 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -136,7 +136,9 @@ func (wf *WakuFilterLightNode) Stop() { func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { peerID := stream.Conn().RemotePeer() - logger := wf.log.With(logging.HostID("peer", peerID)) + + logger := wf.log.With(logging.HostID("peerID", peerID)) + if !wf.subscriptions.IsSubscribedTo(peerID) { logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) wf.metrics.RecordError(unknownPeerMessagePush) @@ -181,10 +183,11 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea } else { pubSubTopic = *messagePush.PubsubTopic } + + logger = messagePush.WakuMessage.Logger(logger, pubSubTopic) + if !wf.subscriptions.Has(peerID, protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) { - logger.Warn("received messagepush with invalid subscription parameters", - zap.String("topic", pubSubTopic), - zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) + logger.Warn("received messagepush with invalid subscription parameters") wf.metrics.RecordError(invalidSubscriptionMessage) return } @@ -222,6 +225,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return err } + logger := wf.log.With(logging.HostID("peerID", params.selectedPeer)) + stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) @@ -231,13 +236,13 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr writer := pbio.NewDelimitedWriter(stream) reader := pbio.NewDelimitedReader(stream, math.MaxInt32) - wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request)) + logger.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request)) err = writer.WriteMsg(request) if err != nil { wf.metrics.RecordError(writeRequestFailure) - wf.log.Error("sending FilterSubscribeRequest", zap.Error(err)) + logger.Error("sending FilterSubscribeRequest", zap.Error(err)) if err := stream.Reset(); err != nil { - wf.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return err } @@ -245,10 +250,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr filterSubscribeResponse := &pb.FilterSubscribeResponse{} err = reader.ReadMsg(filterSubscribeResponse) if err != nil { - wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err)) + logger.Error("receiving FilterSubscribeResponse", zap.Error(err)) wf.metrics.RecordError(decodeRPCFailure) if err := stream.Reset(); err != nil { - wf.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return err } @@ -257,6 +262,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr if err = filterSubscribeResponse.Validate(); err != nil { wf.metrics.RecordError(decodeRPCFailure) + logger.Error("validating response", zap.Error(err)) return err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 361f12ba..322b4d9c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -300,8 +300,13 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa req.Message = message req.PubsubTopic = params.pubsubTopic + logger := message.Logger(wakuLP.log, params.pubsubTopic).With(logging.HostID("peerID", params.selectedPeer)) + + logger.Debug("publishing message") + response, err := wakuLP.request(ctx, req, params) if err != nil { + logger.Error("could not publish message", zap.Error(err)) return nil, err } diff --git a/waku/v2/protocol/pb/utils.go b/waku/v2/protocol/pb/utils.go index d21f527c..70de2e0d 100644 --- a/waku/v2/protocol/pb/utils.go +++ b/waku/v2/protocol/pb/utils.go @@ -3,7 +3,10 @@ package pb import ( "encoding/binary" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/waku-org/go-waku/waku/v2/hash" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // Hash calculates the hash of a waku message @@ -16,3 +19,16 @@ func toBytes(i int64) []byte { binary.BigEndian.PutUint64(b, uint64(i)) return b } + +func (msg *WakuMessage) LogFields(pubsubTopic string) []zapcore.Field { + return []zapcore.Field{ + zap.String("hash", hexutil.Encode(msg.Hash(pubsubTopic))), + zap.String("pubsubTopic", pubsubTopic), + zap.String("contentTopic", msg.ContentTopic), + zap.Int64("timestamp", msg.GetTimestamp()), + } +} + +func (msg *WakuMessage) Logger(logger *zap.Logger, pubsubTopic string) *zap.Logger { + return logger.With(msg.LogFields(pubsubTopic)...) +} diff --git a/waku/v2/utils/logger.go b/waku/v2/utils/logger.go index 22e3a7ff..4616dfa2 100644 --- a/waku/v2/utils/logger.go +++ b/waku/v2/utils/logger.go @@ -19,7 +19,7 @@ func Logger() *zap.Logger { return log } -// MessagesLogger returns a logger used for debug logging of receivent/sent messages +// MessagesLogger returns a logger used for debug logging of sent/received messages func MessagesLogger(prefix string) *zap.Logger { if messageLoggers == nil { messageLoggers = make(map[string]*zap.Logger)