Add messages logging subsystem

This commit is contained in:
Vitaly Vlasov 2023-11-09 13:40:19 +02:00 committed by Vit∀ly Vlasov
parent 10e32d1059
commit 684c7a46df
11 changed files with 47 additions and 38 deletions

View File

@ -142,7 +142,7 @@ func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNo
env.Message().Meta = append(env.Message().Meta, fwdMetaTag...)
_, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic))
if err != nil {
utils.Logger().Warn("could not bridge message", logging.HexString("hash", env.Hash()),
utils.Logger().Warn("could not bridge message", logging.HexBytes("hash", env.Hash()),
zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic),
zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err))
}

View File

@ -77,14 +77,14 @@ func execute(ctx context.Context) error {
if logger.Level() == zap.DebugLevel {
logger.Info("registered credentials into the membership contract",
logging.HexString("IDCommitment", identityCredential.IDCommitment[:]),
logging.HexString("IDNullifier", identityCredential.IDNullifier[:]),
logging.HexString("IDSecretHash", identityCredential.IDSecretHash[:]),
logging.HexString("IDTrapDoor", identityCredential.IDTrapdoor[:]),
logging.HexBytes("IDCommitment", identityCredential.IDCommitment[:]),
logging.HexBytes("IDNullifier", identityCredential.IDNullifier[:]),
logging.HexBytes("IDSecretHash", identityCredential.IDSecretHash[:]),
logging.HexBytes("IDTrapDoor", identityCredential.IDTrapdoor[:]),
zap.Uint("index", membershipIndex),
)
} else {
logger.Info("registered credentials into the membership contract", logging.HexString("idCommitment", identityCredential.IDCommitment[:]), zap.Uint("index", membershipIndex))
logger.Info("registered credentials into the membership contract", logging.HexBytes("idCommitment", identityCredential.IDCommitment[:]), zap.Uint("index", membershipIndex))
}
web3Config.ETHClient.Close()

View File

@ -124,7 +124,7 @@ func register(ctx context.Context, web3Config *web3.Config, idComm rln.IDCommitm
var eventIDComm rln.IDCommitment = rln.BigIntToBytes32(evt.IdCommitment)
log.Debug("information extracted from tx log", zap.Uint64("blockNumber", evt.Raw.BlockNumber), logging.HexString("idCommitment", eventIDComm[:]), zap.Uint64("index", evt.Index.Uint64()))
log.Debug("information extracted from tx log", zap.Uint64("blockNumber", evt.Raw.BlockNumber), logging.HexBytes("idCommitment", eventIDComm[:]), zap.Uint64("index", evt.Index.Uint64()))
if eventIDComm != idComm {
return 0, errors.New("invalid id commitment key")

View File

@ -160,7 +160,7 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep
}
rpcMsg, err := ProtoToRPC(msg.Message())
if err != nil {
r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err))
r.log.Warn("could not include message in response", logging.HexBytes("hash", msg.Hash()), zap.Error(err))
} else {
*reply = append(*reply, rpcMsg)
}

View File

@ -35,16 +35,6 @@ func (bArr byteArr) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
return nil
}
type hexByte []byte
func HexString(key string, byteVal hexByte) zapcore.Field {
return zap.Stringer(key, hexByte(byteVal))
}
func (h hexByte) String() string {
return "0x" + hex.EncodeToString(h)
}
// List of multiaddrs
type multiaddrs []multiaddr.Multiaddr

View File

@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -217,20 +218,23 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
handle := func(envelope *protocol.Envelope) error {
msg := envelope.Message()
pubsubTopic := envelope.PubsubTopic()
logger := wf.log.With(logging.HexBytes("envelopeHash", envelope.Hash()))
logger := utils.MessagesLogger("filter").With(logging.HexBytes("hash", envelope.Hash()),
zap.String("pubsubTopic", envelope.PubsubTopic()),
zap.String("contentTopic", envelope.Message().ContentTopic),
)
logger.Debug("push message to filter subscribers")
// Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node
for subscriber := range wf.subscriptions.Items(pubsubTopic, msg.ContentTopic) {
logger := logger.With(logging.HostID("subscriber", subscriber))
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
logger := logger.With(logging.HostID("peer", subscriber))
// Do a message push to light node
logger.Info("pushing message to light node")
logger.Debug("pushing message to light node")
wf.WaitGroup().Add(1)
go func(subscriber peer.ID) {
defer wf.WaitGroup().Done()
start := time.Now()
err := wf.pushMessage(ctx, subscriber, envelope)
err := wf.pushMessage(ctx, logger, subscriber, envelope)
if err != nil {
logger.Error("pushing message", zap.Error(err))
return
@ -249,13 +253,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
}
}
func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error {
logger := wf.log.With(
logging.HostID("peer", peerID),
logging.HexBytes("envelopeHash", env.Hash()),
zap.String("pubsubTopic", env.PubsubTopic()),
zap.String("contentTopic", env.Message().ContentTopic),
)
func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, logger *zap.Logger, peerID peer.ID, env *protocol.Envelope) error {
pubSubTopic := env.PubsubTopic()
messagePush := &pb.MessagePush{
PubsubTopic: &pubSubTopic,

View File

@ -18,6 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -292,7 +293,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
if response.IsSuccess {
hash := message.Hash(params.pubsubTopic)
wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash))
utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash))
return hash, nil
}

View File

@ -53,6 +53,6 @@ func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) {
messageSize.Observe(payloadSizeInKb)
pubsubTopic := envelope.PubsubTopic()
messages.WithLabelValues(pubsubTopic).Inc()
m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexString("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes))
m.log.Debug("waku.relay received", zap.String("pubsubTopic", pubsubTopic), logging.HexBytes("hash", envelope.Hash()), zap.Int64("receivedTime", envelope.Index().ReceiverTime), zap.Int("payloadSizeBytes", payloadSizeInBytes))
}()
}

View File

@ -20,6 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
// WakuRelayID_v200 is the current protocol ID used for WakuRelay
@ -41,7 +42,8 @@ type WakuRelay struct {
timesource timesource.Timesource
metrics Metrics
log *zap.Logger
log *zap.Logger
logMessages *zap.Logger
bcaster Broadcaster
@ -80,8 +82,9 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.minPeersToPublish = minPeersToPublish
w.CommonService = service.NewCommonService()
w.log = log.Named("relay")
w.logMessages = utils.MessagesLogger("relay")
w.events = eventbus.NewBus()
w.metrics = newMetrics(reg, w.log)
w.metrics = newMetrics(reg, w.logMessages)
// default options required by WakuRelay
w.opts = append(w.defaultPubsubOptions(), opts...)
@ -276,7 +279,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
hash := message.Hash(params.pubsubTopic)
w.log.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload)))
w.logMessages.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexBytes("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload)))
return hash, nil
}

View File

@ -53,7 +53,7 @@ func TestWakuRelay(t *testing.T) {
go func() {
defer cancel()
env := <-subs[0].Ch
t.Log("received msg", logging.HexString("hash", env.Hash()))
t.Log("received msg", logging.HexBytes("hash", env.Hash()))
}()
msg := &pb.WakuMessage{

View File

@ -9,6 +9,7 @@ import (
)
var log *zap.Logger
var messageLoggers map[string]*zap.Logger
// Logger creates a zap.Logger with some reasonable defaults
func Logger() *zap.Logger {
@ -18,6 +19,20 @@ func Logger() *zap.Logger {
return log
}
// MessagesLogger returns a logger used for debug logging of receivent/sent messages
func MessagesLogger(prefix string) *zap.Logger {
if messageLoggers == nil {
messageLoggers = make(map[string]*zap.Logger)
}
logger := messageLoggers[prefix]
if logger == nil {
logger = logging.Logger(prefix + ".messages").Desugar()
messageLoggers[prefix] = logger
}
return logger
}
// InitLogger initializes a global logger using an specific encoding
func InitLogger(encoding string, output string) {
cfg := logging.GetConfig()
@ -50,10 +65,12 @@ func InitLogger(encoding string, output string) {
cfg.File = "./waku.log"
}
}
if cfg.Level == logging.LevelError {
// Override default level setting
cfg.Level = logging.LevelInfo
}
logging.SetupLogging(cfg)
log = logging.Logger("gowaku").Desugar()
logging.SetAllLoggers(logging.LevelInfo)
}