From 684c7a46dfbe05fc73b95baaebc9553e7d9e0d4c Mon Sep 17 00:00:00 2001 From: Vitaly Vlasov Date: Thu, 9 Nov 2023 13:40:19 +0200 Subject: [PATCH] Add messages logging subsystem --- cmd/waku/relay.go | 2 +- cmd/waku/rlngenerate/command_rln.go | 10 ++++----- cmd/waku/rlngenerate/web3.go | 2 +- cmd/waku/server/rpc/relay.go | 2 +- logging/logging.go | 10 --------- waku/v2/protocol/filter/server.go | 22 +++++++++----------- waku/v2/protocol/lightpush/waku_lightpush.go | 3 ++- waku/v2/protocol/relay/metrics.go | 2 +- waku/v2/protocol/relay/waku_relay.go | 9 +++++--- waku/v2/protocol/relay/waku_relay_test.go | 2 +- waku/v2/utils/logger.go | 21 +++++++++++++++++-- 11 files changed, 47 insertions(+), 38 deletions(-) diff --git a/cmd/waku/relay.go b/cmd/waku/relay.go index 513582e3..6b59f941 100644 --- a/cmd/waku/relay.go +++ b/cmd/waku/relay.go @@ -142,7 +142,7 @@ func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNo env.Message().Meta = append(env.Message().Meta, fwdMetaTag...) _, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic)) if err != nil { - utils.Logger().Warn("could not bridge message", logging.HexString("hash", env.Hash()), + utils.Logger().Warn("could not bridge message", logging.HexBytes("hash", env.Hash()), zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic), zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err)) } diff --git a/cmd/waku/rlngenerate/command_rln.go b/cmd/waku/rlngenerate/command_rln.go index 8a05cea9..c88767f8 100644 --- a/cmd/waku/rlngenerate/command_rln.go +++ b/cmd/waku/rlngenerate/command_rln.go @@ -77,14 +77,14 @@ func execute(ctx context.Context) error { if logger.Level() == zap.DebugLevel { logger.Info("registered credentials into the membership contract", - logging.HexString("IDCommitment", identityCredential.IDCommitment[:]), - logging.HexString("IDNullifier", identityCredential.IDNullifier[:]), - logging.HexString("IDSecretHash", identityCredential.IDSecretHash[:]), - logging.HexString("IDTrapDoor", identityCredential.IDTrapdoor[:]), + logging.HexBytes("IDCommitment", identityCredential.IDCommitment[:]), + logging.HexBytes("IDNullifier", identityCredential.IDNullifier[:]), + logging.HexBytes("IDSecretHash", identityCredential.IDSecretHash[:]), + logging.HexBytes("IDTrapDoor", identityCredential.IDTrapdoor[:]), zap.Uint("index", membershipIndex), ) } else { - logger.Info("registered credentials into the membership contract", logging.HexString("idCommitment", identityCredential.IDCommitment[:]), zap.Uint("index", membershipIndex)) + logger.Info("registered credentials into the membership contract", logging.HexBytes("idCommitment", identityCredential.IDCommitment[:]), zap.Uint("index", membershipIndex)) } web3Config.ETHClient.Close() diff --git a/cmd/waku/rlngenerate/web3.go b/cmd/waku/rlngenerate/web3.go index 5990c961..7c40d841 100644 --- a/cmd/waku/rlngenerate/web3.go +++ b/cmd/waku/rlngenerate/web3.go @@ -124,7 +124,7 @@ func register(ctx context.Context, web3Config *web3.Config, idComm rln.IDCommitm var eventIDComm rln.IDCommitment = rln.BigIntToBytes32(evt.IdCommitment) - log.Debug("information extracted from tx log", zap.Uint64("blockNumber", evt.Raw.BlockNumber), logging.HexString("idCommitment", eventIDComm[:]), zap.Uint64("index", evt.Index.Uint64())) + log.Debug("information extracted from tx log", zap.Uint64("blockNumber", evt.Raw.BlockNumber), logging.HexBytes("idCommitment", eventIDComm[:]), zap.Uint64("index", evt.Index.Uint64())) if eventIDComm != idComm { return 0, errors.New("invalid id commitment key") diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 5c7245b4..ddc221bf 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -160,7 +160,7 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep } rpcMsg, err := ProtoToRPC(msg.Message()) if err != nil { - r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err)) + r.log.Warn("could not include message in response", logging.HexBytes("hash", msg.Hash()), zap.Error(err)) } else { *reply = append(*reply, rpcMsg) } diff --git a/logging/logging.go b/logging/logging.go index 51763d6c..fb0295ae 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -35,16 +35,6 @@ func (bArr byteArr) MarshalLogArray(encoder zapcore.ArrayEncoder) error { return nil } -type hexByte []byte - -func HexString(key string, byteVal hexByte) zapcore.Field { - return zap.Stringer(key, hexByte(byteVal)) -} - -func (h hexByte) String() string { - return "0x" + hex.EncodeToString(h) -} - // List of multiaddrs type multiaddrs []multiaddr.Multiaddr diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 376f80b9..cea5efab 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -217,20 +218,23 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) { handle := func(envelope *protocol.Envelope) error { msg := envelope.Message() pubsubTopic := envelope.PubsubTopic() - logger := wf.log.With(logging.HexBytes("envelopeHash", envelope.Hash())) + logger := utils.MessagesLogger("filter").With(logging.HexBytes("hash", envelope.Hash()), + zap.String("pubsubTopic", envelope.PubsubTopic()), + zap.String("contentTopic", envelope.Message().ContentTopic), + ) + logger.Debug("push message to filter subscribers") // Each subscriber is a light node that earlier on invoked // a FilterRequest on this node for subscriber := range wf.subscriptions.Items(pubsubTopic, msg.ContentTopic) { - logger := logger.With(logging.HostID("subscriber", subscriber)) - subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines + logger := logger.With(logging.HostID("peer", subscriber)) // Do a message push to light node - logger.Info("pushing message to light node") + logger.Debug("pushing message to light node") wf.WaitGroup().Add(1) go func(subscriber peer.ID) { defer wf.WaitGroup().Done() start := time.Now() - err := wf.pushMessage(ctx, subscriber, envelope) + err := wf.pushMessage(ctx, logger, subscriber, envelope) if err != nil { logger.Error("pushing message", zap.Error(err)) return @@ -249,13 +253,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) { } } -func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error { - logger := wf.log.With( - logging.HostID("peer", peerID), - logging.HexBytes("envelopeHash", env.Hash()), - zap.String("pubsubTopic", env.PubsubTopic()), - zap.String("contentTopic", env.Message().ContentTopic), - ) +func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logger, peerID peer.ID, env *protocol.Envelope) error { pubSubTopic := env.PubsubTopic() messagePush := &pb.MessagePush{ PubsubTopic: &pubSubTopic, diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 469d9d07..10e78662 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -18,6 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -292,7 +293,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa if response.IsSuccess { hash := message.Hash(params.pubsubTopic) - wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash)) + utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash)) return hash, nil } diff --git a/waku/v2/protocol/relay/metrics.go b/waku/v2/protocol/relay/metrics.go index a506d719..35cddbc7 100644 --- a/waku/v2/protocol/relay/metrics.go +++ b/waku/v2/protocol/relay/metrics.go @@ -53,6 +53,6 @@ func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) { messageSize.Observe(payloadSizeInKb) pubsubTopic := envelope.PubsubTopic() messages.WithLabelValues(pubsubTopic).Inc() - m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexString("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes)) + m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexBytes("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes)) }() } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 88726028..e6e8ec7f 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -20,6 +20,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" ) // WakuRelayID_v200 is the current protocol ID used for WakuRelay @@ -41,7 +42,8 @@ type WakuRelay struct { timesource timesource.Timesource metrics Metrics - log *zap.Logger + log *zap.Logger + logMessages *zap.Logger bcaster Broadcaster @@ -80,8 +82,9 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.minPeersToPublish = minPeersToPublish w.CommonService = service.NewCommonService() w.log = log.Named("relay") + w.logMessages = utils.MessagesLogger("relay") w.events = eventbus.NewBus() - w.metrics = newMetrics(reg, w.log) + w.metrics = newMetrics(reg, w.logMessages) // default options required by WakuRelay w.opts = append(w.defaultPubsubOptions(), opts...) @@ -276,7 +279,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts . hash := message.Hash(params.pubsubTopic) - w.log.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload))) + w.logMessages.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexBytes("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload))) return hash, nil } diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 158432f4..b31c0981 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -53,7 +53,7 @@ func TestWakuRelay(t *testing.T) { go func() { defer cancel() env := <-subs[0].Ch - t.Log("received msg", logging.HexString("hash", env.Hash())) + t.Log("received msg", logging.HexBytes("hash", env.Hash())) }() msg := &pb.WakuMessage{ diff --git a/waku/v2/utils/logger.go b/waku/v2/utils/logger.go index 754e9a22..22e3a7ff 100644 --- a/waku/v2/utils/logger.go +++ b/waku/v2/utils/logger.go @@ -9,6 +9,7 @@ import ( ) var log *zap.Logger +var messageLoggers map[string]*zap.Logger // Logger creates a zap.Logger with some reasonable defaults func Logger() *zap.Logger { @@ -18,6 +19,20 @@ func Logger() *zap.Logger { return log } +// MessagesLogger returns a logger used for debug logging of receivent/sent messages +func MessagesLogger(prefix string) *zap.Logger { + if messageLoggers == nil { + messageLoggers = make(map[string]*zap.Logger) + } + logger := messageLoggers[prefix] + if logger == nil { + logger = logging.Logger(prefix + ".messages").Desugar() + messageLoggers[prefix] = logger + } + + return logger +} + // InitLogger initializes a global logger using an specific encoding func InitLogger(encoding string, output string) { cfg := logging.GetConfig() @@ -50,10 +65,12 @@ func InitLogger(encoding string, output string) { cfg.File = "./waku.log" } } + if cfg.Level == logging.LevelError { + // Override default level setting + cfg.Level = logging.LevelInfo + } logging.SetupLogging(cfg) log = logging.Logger("gowaku").Desugar() - - logging.SetAllLoggers(logging.LevelInfo) }