From 0c989d3d8cf44548927b3a018c3f9e6d099badcb Mon Sep 17 00:00:00 2001 From: Martin Kobetic Date: Fri, 27 May 2022 09:25:06 -0400 Subject: [PATCH] feat: structured logging (#242) --- logging/README.md | 27 +++++ logging/context.go | 20 ++++ logging/logging.go | 112 ++++++++++++++++++ waku/metrics/http_test.go | 2 +- waku/node.go | 38 +++--- waku/persistence/store_test.go | 9 +- waku/v2/discv5/discover_test.go | 6 +- waku/v2/node/connectedness.go | 12 +- waku/v2/node/keepalive.go | 16 +-- waku/v2/node/wakunode2.go | 38 +++--- waku/v2/node/wakuoptions.go | 6 +- waku/v2/node/wakuoptions_test.go | 2 +- waku/v2/protocol/filter/waku_filter_option.go | 4 +- .../filter/waku_filter_option_test.go | 2 +- waku/v2/protocol/filter/waku_filter_test.go | 8 +- .../lightpush/waku_lightpush_option.go | 4 +- .../lightpush/waku_lightpush_option_test.go | 2 +- .../protocol/lightpush/waku_lightpush_test.go | 10 +- waku/v2/protocol/relay/waku_relay_test.go | 2 +- waku/v2/protocol/store/waku_resume_test.go | 14 +-- waku/v2/protocol/store/waku_store.go | 6 +- .../store/waku_store_persistence_test.go | 6 +- .../store/waku_store_protocol_test.go | 8 +- .../protocol/store/waku_store_query_test.go | 16 +-- waku/v2/protocol/swap/waku_swap_test.go | 2 +- waku/v2/rpc/admin_test.go | 4 +- waku/v2/rpc/filter_test.go | 6 +- waku/v2/rpc/private_test.go | 2 +- waku/v2/rpc/relay_test.go | 2 +- waku/v2/rpc/store_test.go | 2 +- waku/v2/rpc/waku_rpc_test.go | 2 +- waku/v2/utils/logger.go | 4 +- waku/v2/utils/peer.go | 7 +- 33 files changed, 284 insertions(+), 117 deletions(-) create mode 100644 logging/README.md create mode 100644 logging/context.go create mode 100644 logging/logging.go diff --git a/logging/README.md b/logging/README.md new file mode 100644 index 00000000..da6424e6 --- /dev/null +++ b/logging/README.md @@ -0,0 +1,27 @@ +# Logging Style Guide + +The goal of the style described here is to yield logs that are amenable to searching and aggregating. Structured logging is the best foundation for that. The log entries should be consistent and predictable to support search efficiency and high fidelity of search results. This style puts forward guidelines that promote this outcome. + +## Log messages + +* Messages should be fixed strings, never interpolate values into the messages. Use log entry fields for values. + +* Message strings should be consistent identification of what action/event was/is happening. Consistent messages makes searching the logs and aggregating correlated events easier. + +* Error messages should look like any other log messages. No need to say "x failed", the log level and error field are sufficient indication of failure. + +## Log entry fields + +* Adding fields to log entries is not free, but the fields are the discriminators that allow distinguishing similar log entries from each other. Insufficient field structure will makes it more difficult to find the entries you are looking for. + +* Create/Use field helpers for commonly used field value types (see logging.go). It promotes consistent formatting and allows changing it easily in one place. + +# Log entry field helpers + +* Make the field creation do as little as possible, i.e. just capture the existing value/object. Postpone any transformation to log emission time by employing generic zap.Stringer, zap.Array, zap.Object fields (see logging.go). It avoids unnecessary transformation for entries that may not even be emitted in the end. + +## Logger management + +* Adorn the logger with fields and reuse the adorned logger rather than repeatedly creating fields with each log entry. + +* Prefer passing the adorned logger down the call chain using Context. It promotes consistent log entry structure, i.e. fields will exist consistently in related entries. diff --git a/logging/context.go b/logging/context.go new file mode 100644 index 00000000..1484efaf --- /dev/null +++ b/logging/context.go @@ -0,0 +1,20 @@ +package logging + +import ( + "context" + + "go.uber.org/zap" +) + +var logKey = &struct{}{} + +// From allows retrieving the Logger from a Context +func From(ctx context.Context) *zap.Logger { + return ctx.Value(logKey).(*zap.Logger) +} + +// With associates a Logger with a Context to allow passing +// a logger down the call chain. +func With(ctx context.Context, log *zap.Logger) context.Context { + return context.WithValue(ctx, logKey, log) +} diff --git a/logging/logging.go b/logging/logging.go new file mode 100644 index 00000000..aa03fd4a --- /dev/null +++ b/logging/logging.go @@ -0,0 +1,112 @@ +// logging implements custom logging field types for commonly +// logged values like host ID or wallet address. +// +// implementation purposely does as little as possible at field creation time, +// and postpones any transformation to output time by relying on the generic +// zap types like zap.Stringer, zap.Array, zap.Object +// +package logging + +import ( + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// List of multiaddrs +type multiaddrs []multiaddr.Multiaddr + +// MultiAddrs creates a field with an array of multiaddrs +func MultiAddrs(key string, addrs ...multiaddr.Multiaddr) zapcore.Field { + return zap.Array(key, multiaddrs(addrs)) +} + +func (addrs multiaddrs) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + for _, addr := range addrs { + encoder.AppendString(addr.String()) + } + return nil +} + +// Host ID/Peer ID +type hostID peer.ID + +// HostID creates a field for a peer.ID +func HostID(key string, id peer.ID) zapcore.Field { + return zap.Stringer(key, hostID(id)) +} + +func (id hostID) String() string { return peer.Encode(peer.ID(id)) } + +// Time - Waku uses Microsecond Unix Time +type timestamp int64 + +// Time creates a field for Waku time value +func Time(key string, time int64) zapcore.Field { + return zap.Stringer(key, timestamp(time)) +} + +func (t timestamp) String() string { + return time.UnixMicro(int64(t)).Format(time.RFC3339) +} + +// History Query Filters +type historyFilters []*pb.ContentFilter + +// Filters creates a field with an array of history query filters. +// The assumption is that log entries won't have more than one of these, +// so the field key/name is hardcoded to be "filters" to promote consistency. +func Filters(filters []*pb.ContentFilter) zapcore.Field { + return zap.Array("filters", historyFilters(filters)) +} + +func (filters historyFilters) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + for _, filter := range filters { + encoder.AppendString(filter.ContentTopic) + } + return nil +} + +// History Paging Info +// Probably too detailed for normal log levels, but useful for debugging. +// Also a good example of nested object value. +type pagingInfo pb.PagingInfo +type index pb.Index + +// PagingInfo creates a field with history query paging info. +func PagingInfo(pi *pb.PagingInfo) zapcore.Field { + return zap.Object("paging_info", (*pagingInfo)(pi)) +} + +func (pi *pagingInfo) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddUint64("page_size", pi.PageSize) + encoder.AddString("direction", pi.Direction.String()) + if pi.Cursor != nil { + return encoder.AddObject("cursor", (*index)(pi.Cursor)) + } + return nil +} + +func (i *index) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddBinary("digest", i.Digest) + encoder.AddTime("sent", time.UnixMicro(i.SenderTime)) + encoder.AddTime("received", time.UnixMicro(i.ReceiverTime)) + return nil +} + +// Hex encoded bytes +type hexBytes []byte + +// HexBytes creates a field for a byte slice that should be emitted as hex encoded string. +func HexBytes(key string, bytes []byte) zap.Field { + return zap.Stringer(key, hexBytes(bytes)) +} + +func (bytes hexBytes) String() string { + return hexutil.Encode(bytes) +} diff --git a/waku/metrics/http_test.go b/waku/metrics/http_test.go index b3641af4..1a3b05c4 100644 --- a/waku/metrics/http_test.go +++ b/waku/metrics/http_test.go @@ -10,7 +10,7 @@ import ( ) func TestStartAndStopMetricsServer(t *testing.T) { - server := NewMetricsServer("0.0.0.0", 9876, utils.Logger()) + server := NewMetricsServer("0.0.0.0", 9876, utils.Logger().Sugar()) go func() { time.Sleep(1 * time.Second) diff --git a/waku/node.go b/waku/node.go index 3a109d7d..ed49ae07 100644 --- a/waku/node.go +++ b/waku/node.go @@ -23,12 +23,14 @@ import ( "github.com/libp2p/go-libp2p" libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p/config" "github.com/libp2p/go-libp2p-peerstore/pstoreds" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/multiformats/go-multiaddr" rendezvous "github.com/status-im/go-waku-rendezvous" + "github.com/status-im/go-waku/logging" "github.com/status-im/go-waku/waku/metrics" "github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/persistence/sqlite" @@ -44,9 +46,6 @@ import ( func failOnErr(err error, msg string) { if err != nil { - if msg != "" { - msg = msg + ": " - } utils.Logger().Fatal(msg, zap.Error(err)) } } @@ -86,6 +85,11 @@ func Execute(options Options) { prvKey, err := getPrivKey(options) failOnErr(err, "nodekey error") + p2pPrvKey := libp2pcrypto.Secp256k1PrivateKey(*prvKey) + id, err := peer.IDFromPublicKey((&p2pPrvKey).GetPublic()) + failOnErr(err, "deriving peer ID from private key") + logger := utils.Logger().With(logging.HostID("node", id)) + if options.DBPath == "" && options.UseDB { failOnErr(errors.New("dbpath can't be null"), "") } @@ -101,12 +105,12 @@ func Execute(options Options) { var metricsServer *metrics.Server if options.Metrics.Enable { - metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port, utils.Logger()) + metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port, logger.Sugar()) go metricsServer.Start() } nodeOpts := []node.WakuNodeOption{ - node.WithLogger(utils.Logger().Desugar()), + node.WithLogger(logger), node.WithPrivateKey(prvKey), node.WithHostAddress(hostAddr), node.WithKeepAlive(time.Duration(options.KeepAlive) * time.Second), @@ -186,7 +190,7 @@ func Execute(options Options) { if options.Store.Enable { nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages)) if options.UseDB { - dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration())) + dbStore, err := persistence.NewDBStore(logger.Sugar(), persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration())) failOnErr(err, "DBStore") nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } else { @@ -205,16 +209,16 @@ func Execute(options Options) { var discoveredNodes []dnsdisc.DiscoveredNode if options.DNSDiscovery.Enable { if options.DNSDiscovery.URL != "" { - utils.Logger().Info("attempting DNS discovery with ", zap.String("URL", options.DNSDiscovery.URL)) + logger.Info("attempting DNS discovery with ", zap.String("URL", options.DNSDiscovery.URL)) nodes, err := dnsdisc.RetrieveNodes(ctx, options.DNSDiscovery.URL, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver)) if err != nil { - utils.Logger().Warn("dns discovery error ", zap.Error(err)) + logger.Warn("dns discovery error ", zap.Error(err)) } else { - utils.Logger().Info("found dns entries ", zap.Any("qty", len(nodes))) + logger.Info("found dns entries ", zap.Any("qty", len(nodes))) discoveredNodes = nodes } } else { - utils.Logger().Fatal("DNS discovery URL is required") + logger.Fatal("DNS discovery URL is required") } } @@ -223,7 +227,7 @@ func Execute(options Options) { for _, addr := range options.DiscV5.Nodes.Value() { bootnode, err := enode.Parse(enode.ValidSchemes, addr) if err != nil { - utils.Logger().Fatal("could not parse enr: ", zap.Error(err)) + logger.Fatal("parsing ENR", zap.Error(err)) } bootnodes = append(bootnodes, bootnode) } @@ -247,12 +251,12 @@ func Execute(options Options) { addPeers(wakuNode, options.Filter.Nodes.Value(), string(filter.FilterID_v20beta1)) if err = wakuNode.Start(); err != nil { - utils.Logger().Fatal(fmt.Errorf("could not start waku node, %w", err).Error()) + logger.Fatal("starting waku node", zap.Error(err)) } if options.DiscV5.Enable { if err = wakuNode.DiscV5().Start(); err != nil { - utils.Logger().Fatal(fmt.Errorf("could not start discovery v5, %w", err).Error()) + logger.Fatal("starting discovery v5", zap.Error(err)) } } @@ -273,7 +277,7 @@ func Execute(options Options) { go func(node string) { err = wakuNode.DialPeer(ctx, node) if err != nil { - utils.Logger().Error("error dialing peer ", zap.Error(err)) + logger.Error("dialing peer", zap.Error(err)) } }(n) } @@ -286,7 +290,7 @@ func Execute(options Options) { defer cancel() err = wakuNode.DialPeerWithMultiAddress(ctx, m) if err != nil { - utils.Logger().Error("error dialing peer ", zap.Error(err)) + logger.Error("dialing peer ", zap.Error(err)) } }(ctx, m) } @@ -295,7 +299,7 @@ func Execute(options Options) { var rpcServer *rpc.WakuRpc if options.RPCServer.Enable { - rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, utils.Logger()) + rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, logger.Sugar()) rpcServer.Start() } @@ -305,7 +309,7 @@ func Execute(options Options) { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) <-ch - utils.Logger().Info("Received signal, shutting down...") + logger.Info("Received signal, shutting down...") // shut the node down wakuNode.Stop() diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index 9ee2354f..edbe2ee9 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -10,12 +10,13 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func NewMock() *sql.DB { db, err := sql.Open("sqlite3", ":memory:") if err != nil { - utils.Logger().Fatalf("an error '%s' was not expected when opening a stub database connection", err) + utils.Logger().Fatal("opening a stub database connection", zap.Error(err)) } return db @@ -32,7 +33,7 @@ func createIndex(digest []byte, receiverTime int64) *pb.Index { func TestDbStore(t *testing.T) { db := NewMock() option := WithDB(db) - store, err := NewDBStore(utils.Logger(), option) + store, err := NewDBStore(utils.Logger().Sugar(), option) require.NoError(t, err) res, err := store.GetAll() @@ -53,7 +54,7 @@ func TestDbStore(t *testing.T) { func TestStoreRetention(t *testing.T) { db := NewMock() - store, err := NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 20*time.Second)) + store, err := NewDBStore(utils.Logger().Sugar(), WithDB(db), WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) insertTime := time.Now() @@ -73,7 +74,7 @@ func TestStoreRetention(t *testing.T) { // This step simulates starting go-waku again from scratch - store, err = NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second)) + store, err = NewDBStore(utils.Logger().Sugar(), WithDB(db), WithRetentionPolicy(5, 40*time.Second)) require.NoError(t, err) dbResults, err = store.GetAll() diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index cb02349a..125b9d1e 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -47,19 +47,19 @@ func TestDiscV5(t *testing.T) { host1, _, prvKey1 := createHost(t) udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort1)) + d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort1)) require.NoError(t, err) host2, _, prvKey2 := createHost(t) udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) host3, _, prvKey3 := createHost(t) udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) + d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) defer d1.Stop() diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index 2c4b36bc..6c9c74d6 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -2,12 +2,12 @@ package node import ( "context" - "fmt" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/logging" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" @@ -33,12 +33,12 @@ type ConnStatus struct { type ConnectionNotifier struct { h host.Host ctx context.Context - log *zap.SugaredLogger + log *zap.Logger DisconnectChan chan peer.ID quit chan struct{} } -func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.SugaredLogger) ConnectionNotifier { +func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.Logger) ConnectionNotifier { return ConnectionNotifier{ h: h, ctx: ctx, @@ -58,13 +58,13 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m ma.Multiaddr) { // Connected is called when a connection is opened func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { - c.log.Info(fmt.Sprintf("Peer %s connected", cc.RemotePeer())) + c.log.Info("peer connected", logging.HostID("peer", cc.RemotePeer())) stats.Record(c.ctx, metrics.Peers.M(1)) } // Disconnected is called when a connection closed func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) { - c.log.Info(fmt.Sprintf("Peer %s disconnected", cc.RemotePeer())) + c.log.Info("peer disconnected", logging.HostID("peer", cc.RemotePeer())) stats.Record(c.ctx, metrics.Peers.M(-1)) c.DisconnectChan <- cc.RemotePeer() } @@ -117,7 +117,7 @@ func (w *WakuNode) Status() (isOnline bool, hasHistory bool) { for _, peer := range w.host.Network().Peers() { protocols, err := w.host.Peerstore().GetProtocols(peer) if err != nil { - w.log.Warn(fmt.Errorf("could not read peer %s protocols", peer)) + w.log.Warn("reading peer protocols", logging.HostID("peer", peer), zap.Error(err)) } for _, protocol := range protocols { diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go index d12721bf..69e9990b 100644 --- a/waku/v2/node/keepalive.go +++ b/waku/v2/node/keepalive.go @@ -2,12 +2,13 @@ package node import ( "context" - "fmt" "time" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/status-im/go-waku/logging" + "go.uber.org/zap" ) const maxAllowedPingFailures = 2 @@ -19,7 +20,7 @@ const maxPublishAttempt = 5 func (w *WakuNode) startKeepAlive(t time.Duration) { go func() { defer w.wg.Done() - w.log.Info("Setting up ping protocol with duration of ", t) + w.log.Info("setting up ping protocol", zap.Duration("duration", t)) ticker := time.NewTicker(t) defer ticker.Stop() for { @@ -52,25 +53,26 @@ func (w *WakuNode) pingPeer(peer peer.ID) { ctx, cancel := context.WithTimeout(w.ctx, 3*time.Second) defer cancel() - w.log.Debug("Pinging ", peer) + logger := w.log.With(logging.HostID("peer", peer)) + logger.Debug("pinging") pr := ping.Ping(ctx, w.host, peer) select { case res := <-pr: if res.Error != nil { w.keepAliveFails[peer]++ - w.log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) + logger.Debug("could not ping", zap.Error(res.Error)) } else { w.keepAliveFails[peer] = 0 } case <-ctx.Done(): w.keepAliveFails[peer]++ - w.log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) + logger.Debug("could not ping (context done)", zap.Error(ctx.Err())) } if w.keepAliveFails[peer] > maxAllowedPingFailures && w.host.Network().Connectedness(peer) == network.Connected { - w.log.Info("Disconnecting peer ", peer) + logger.Info("disconnecting peer") if err := w.host.Network().ClosePeer(peer); err != nil { - w.log.Debug(fmt.Sprintf("Could not close conn to peer %s: %s", peer, err)) + logger.Debug("closing conn to peer", zap.Error(err)) } w.keepAliveFails[peer] = 0 } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index af533efc..224f2f82 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/libp2p/go-libp2p" "go.uber.org/zap" @@ -24,6 +23,7 @@ import ( "go.opencensus.io/stats" rendezvous "github.com/status-im/go-waku-rendezvous" + "github.com/status-im/go-waku/logging" "github.com/status-im/go-waku/waku/try" v2 "github.com/status-im/go-waku/waku/v2" "github.com/status-im/go-waku/waku/v2/discv5" @@ -49,7 +49,7 @@ type storeFactory func(w *WakuNode) store.Store type WakuNode struct { host host.Host opts *WakuNodeParameters - log *zap.SugaredLogger + log *zap.Logger relay *relay.WakuRelay filter *filter.WakuFilter @@ -86,7 +86,7 @@ type WakuNode struct { } func defaultStoreFactory(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log.Sugar()) } // New is used to instantiate a WakuNode using a set of WakuNodeOptions @@ -193,19 +193,19 @@ func (w *WakuNode) onAddrChange() { for m := range w.addrChan { ipStr, err := m.ValueForProtocol(ma.P_IP4) if err != nil { - w.log.Error(fmt.Sprintf("could not extract ip from ma %s: %s", m, err.Error())) + w.log.Error("extracting ip from ma", logging.MultiAddrs("ma", m), zap.Error(err)) continue } portStr, err := m.ValueForProtocol(ma.P_TCP) if err != nil { - w.log.Error(fmt.Sprintf("could not extract port from ma %s: %s", m, err.Error())) + w.log.Error("extracting port from ma", logging.MultiAddrs("ma", m), zap.Error(err)) continue } port, err := strconv.Atoi(portStr) if err != nil { - w.log.Error(fmt.Sprintf("could not convert port to int: %s", err.Error())) + w.log.Error("converting port to int", zap.Error(err)) continue } @@ -218,7 +218,7 @@ func (w *WakuNode) onAddrChange() { if w.opts.enableDiscV5 { err := w.discoveryV5.UpdateAddr(addr) if err != nil { - w.log.Error(fmt.Sprintf("could not update DiscV5 address with IP %s:%d %s", addr.IP, addr.Port, err.Error())) + w.log.Error("updating DiscV5 address with IP", zap.Stringer("ip", addr.IP), zap.Int("port", addr.Port), zap.Error(err)) continue } } @@ -227,15 +227,15 @@ func (w *WakuNode) onAddrChange() { } func (w *WakuNode) logAddress(addr ma.Multiaddr) { - w.log.Info("Listening on ", addr) + logger := w.log.With(logging.MultiAddrs("multiaddr", addr)) // TODO: make this optional depending on DNS Disc being enabled if w.opts.privKey != nil { enr, ip, err := utils.GetENRandIP(addr, w.wakuFlag, w.opts.privKey) if err != nil { - w.log.Error("could not obtain ENR record from multiaddress", err) + logger.Error("obtaining ENR record from multiaddress", zap.Error(err)) } else { - w.log.Info(fmt.Sprintf("DNS: discoverable ENR for IP %s: %s", ip, enr)) + logger.Info("listening", zap.Stringer("ENR", enr), zap.Stringer("ip", ip)) } } } @@ -281,9 +281,9 @@ func (w *WakuNode) checkForAddressChanges() { // Start initializes all the protocols that were setup in the WakuNode func (w *WakuNode) Start() error { - w.log.Info("Version details ", "commit=", GitCommit) + w.log.Info("Version details ", zap.String("commit", GitCommit)) - w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{ + w.swap = swap.NewWakuSwap(w.log.Sugar(), []swap.SwapOption{ swap.WithMode(w.opts.swapMode), swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold), }...) @@ -294,7 +294,7 @@ func (w *WakuNode) Start() error { } if w.opts.enableFilter { - filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log, w.opts.filterOpts...) + filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log.Sugar(), w.opts.filterOpts...) if err != nil { return err } @@ -322,7 +322,7 @@ func (w *WakuNode) Start() error { return err } - w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log) + w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log.Sugar()) if w.opts.enableLightPush { if err := w.lightPush.Start(); err != nil { return err @@ -446,11 +446,11 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { if !w.lightPush.IsStarted() { err = errors.New("not enought peers for relay and lightpush is not yet started") } else { - w.log.Debug("publishing message via lightpush", hexutil.Encode(hash)) + w.log.Debug("publishing message via lightpush", logging.HexBytes("hash", hash)) _, err = w.Lightpush().Publish(ctx, msg) } } else { - w.log.Debug("publishing message via relay", hexutil.Encode(hash)) + w.log.Debug("publishing message via relay", logging.HexBytes("hash", hash)) _, err = w.Relay().Publish(ctx, msg) } @@ -462,7 +462,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error { var err error - w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log, opts...) + w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log.Sugar(), opts...) if err != nil { return err } @@ -492,7 +492,7 @@ func (w *WakuNode) mountDiscV5() error { } var err error - w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log, discV5Options...) + w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log.Sugar(), discV5Options...) return err } @@ -549,7 +549,7 @@ func (w *WakuNode) startStore() { } func (w *WakuNode) addPeer(info *peer.AddrInfo, protocols ...string) error { - w.log.Info(fmt.Sprintf("Adding peer %s to peerstore", info.ID.Pretty())) + w.log.Info("adding peer to peerstore", logging.HostID("peer", info.ID)) w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) err := w.host.Peerstore().AddProtocols(info.ID, protocols...) if err != nil { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 94f8d130..febb4481 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -47,7 +47,7 @@ type WakuNodeParameters struct { wssPort int tlsConfig *tls.Config - logger *zap.SugaredLogger + logger *zap.Logger enableRelay bool enableFilter bool @@ -92,7 +92,7 @@ type WakuNodeOption func(*WakuNodeParameters) error // Default options used in the libp2p node var DefaultWakuNodeOptions = []WakuNodeOption{ - WithLogger(utils.Logger().Desugar()), + WithLogger(utils.Logger()), WithWakuRelay(), } @@ -114,7 +114,7 @@ func (w WakuNodeParameters) AddressFactory() basichost.AddrsFactory { // WithLogger is a WakuNodeOption that adds a custom logger func WithLogger(l *zap.Logger) WakuNodeOption { return func(params *WakuNodeParameters) error { - params.logger = l.Sugar() + params.logger = l return nil } } diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index 158b85a1..2c7bfb2c 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -30,7 +30,7 @@ func TestWakuOptions(t *testing.T) { advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") storeFactory := func(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log.Sugar()) } options := []WakuNodeOption{ diff --git a/waku/v2/protocol/filter/waku_filter_option.go b/waku/v2/protocol/filter/waku_filter_option.go index 135eb7dc..7d95a4af 100644 --- a/waku/v2/protocol/filter/waku_filter_option.go +++ b/waku/v2/protocol/filter/waku_filter_option.go @@ -40,7 +40,7 @@ func WithPeer(p peer.ID) FilterSubscribeOption { func WithAutomaticPeerSelection() FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log) + p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log.Desugar()) if err == nil { params.selectedPeer = *p } else { @@ -51,7 +51,7 @@ func WithAutomaticPeerSelection() FilterSubscribeOption { func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log.Desugar()) if err == nil { params.selectedPeer = *p } else { diff --git a/waku/v2/protocol/filter/waku_filter_option_test.go b/waku/v2/protocol/filter/waku_filter_option_test.go index c21c33d4..34ae5f0d 100644 --- a/waku/v2/protocol/filter/waku_filter_option_test.go +++ b/waku/v2/protocol/filter/waku_filter_option_test.go @@ -25,7 +25,7 @@ func TestFilterOption(t *testing.T) { params := new(FilterSubscribeParameters) params.host = host - params.log = utils.Logger() + params.log = utils.Logger().Sugar() for _, opt := range options { opt(params) diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 2b1370cc..67f4f891 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -23,7 +23,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger()) + relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger().Sugar()) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) @@ -39,7 +39,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger()) + filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger().Sugar()) return filter, host } @@ -69,7 +69,7 @@ func TestWakuFilter(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger()) + node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger().Sugar()) broadcaster.Register(&testTopic, node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) @@ -154,7 +154,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger(), WithTimeout(3*time.Second)) + node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger().Sugar(), WithTimeout(3*time.Second)) broadcaster.Register(&testTopic, node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 5c55af93..8b12aaf7 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -27,7 +27,7 @@ func WithPeer(p peer.ID) LightPushOption { func WithAutomaticPeerSelection(host host.Host) LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeer(host, string(LightPushID_v20beta1), params.log) + p, err := utils.SelectPeer(host, string(LightPushID_v20beta1), params.log.Desugar()) if err == nil { params.selectedPeer = *p } else { @@ -38,7 +38,7 @@ func WithAutomaticPeerSelection(host host.Host) LightPushOption { func WithFastestPeerSelection(ctx context.Context) LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log.Desugar()) if err == nil { params.selectedPeer = *p } else { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index eba2cdd4..bde2262b 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -27,7 +27,7 @@ func TestLightPushOption(t *testing.T) { params := new(LightPushParameters) params.host = host - params.log = utils.Logger() + params.log = utils.Logger().Sugar() for _, opt := range options { opt(params) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index a5aee33b..7930aa70 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -25,7 +25,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger()) + relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger().Sugar()) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) @@ -56,7 +56,7 @@ func TestWakuLightPush(t *testing.T) { defer sub2.Unsubscribe() ctx := context.Background() - lightPushNode2 := NewWakuLightPush(ctx, host2, node2, utils.Logger()) + lightPushNode2 := NewWakuLightPush(ctx, host2, node2, utils.Logger().Sugar()) err := lightPushNode2.Start() require.NoError(t, err) defer lightPushNode2.Stop() @@ -66,7 +66,7 @@ func TestWakuLightPush(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger()) + client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar()) host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200)) @@ -122,7 +122,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger()) + client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar()) err = client.Start() require.Errorf(t, err, "relay is required") @@ -136,7 +136,7 @@ func TestWakuLightPushNoPeers(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger()) + client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar()) _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic) require.Errorf(t, err, "no suitable remote peers") diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 79f17702..d8079451 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -20,7 +20,7 @@ func TestWakuRelay(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger()) + relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger().Sugar()) defer relay.Stop() require.NoError(t, err) diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index 2a85dde6..d683860d 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test") - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) _ = s.storeMessage(msg1) _ = s.storeMessage(msg3) _ = s.storeMessage(msg5) @@ -38,7 +38,7 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1.Start(ctx) defer s1.Stop() @@ -56,7 +56,7 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2.Start(ctx) defer s2.Stop() @@ -88,7 +88,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1.Start(ctx) defer s1.Stop() @@ -99,7 +99,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2.Start(ctx) defer s2.Stop() @@ -121,7 +121,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1.Start(ctx) defer s1.Stop() @@ -132,7 +132,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 044a6653..91ed873c 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -491,7 +491,7 @@ func WithPeer(p peer.ID) HistoryRequestOption { // to request the message history func WithAutomaticPeerSelection() HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log) + p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log.Desugar()) if err == nil { params.selectedPeer = *p } else { @@ -502,7 +502,7 @@ func WithAutomaticPeerSelection() HistoryRequestOption { func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log.Desugar()) if err == nil { params.selectedPeer = *p } else { @@ -774,7 +774,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList } if len(peerList) == 0 { - p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log) + p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log.Desugar()) if err != nil { store.log.Info("Error selecting peer: ", err) return -1, ErrNoPeersAvailable diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index 262cf461..4cd6435c 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -21,10 +21,10 @@ func TestStorePersistence(t *testing.T) { db, err := sqlite.NewDB(":memory:") require.NoError(t, err) - dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db)) + dbStore, err := persistence.NewDBStore(utils.Logger().Sugar(), persistence.WithDB(db)) require.NoError(t, err) - s1 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger()) + s1 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger().Sugar()) s1.fetchDBRecords(ctx) require.Len(t, s1.messageQueue.messages, 0) @@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) { _ = s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) - s2 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger()) + s2 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger().Sugar()) s2.fetchDBRecords(ctx) require.Len(t, s2.messageQueue.messages, 1) require.Equal(t, msg, s2.messageQueue.messages[0].msg) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 7dfdb697..08488f8f 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1.Start(ctx) defer s1.Stop() @@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2.Start(ctx) defer s2.Stop() @@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) s1.Start(ctx) defer s1.Stop() @@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4)) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index 28f12706..05946e8c 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) _ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) _ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) @@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) var messages []*pb.WakuMessage for i := 0; i < 10; i++ { diff --git a/waku/v2/protocol/swap/waku_swap_test.go b/waku/v2/protocol/swap/waku_swap_test.go index 6141ead2..4f268869 100644 --- a/waku/v2/protocol/swap/waku_swap_test.go +++ b/waku/v2/protocol/swap/waku_swap_test.go @@ -8,7 +8,7 @@ import ( ) func TestSwapCreditDebit(t *testing.T) { - swap := NewWakuSwap(utils.Logger(), []SwapOption{ + swap := NewWakuSwap(utils.Logger().Sugar(), []SwapOption{ WithMode(SoftMode), WithThreshold(0, 0), }...) diff --git a/waku/v2/rpc/admin_test.go b/waku/v2/rpc/admin_test.go index 193f0e5f..3a5aa42f 100644 --- a/waku/v2/rpc/admin_test.go +++ b/waku/v2/rpc/admin_test.go @@ -24,7 +24,7 @@ func makeAdminService(t *testing.T) *AdminService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return &AdminService{n, utils.Logger()} + return &AdminService{n, utils.Logger().Sugar()} } func TestV1Peers(t *testing.T) { @@ -33,7 +33,7 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger()) + relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger().Sugar()) require.NoError(t, err) defer relay.Stop() diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 5edaba5b..ca78f25a 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -29,7 +29,7 @@ func makeFilterService(t *testing.T) *FilterService { _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - return NewFilterService(n, utils.Logger()) + return NewFilterService(n, utils.Logger().Sugar()) } func TestFilterSubscription(t *testing.T) { @@ -39,13 +39,13 @@ func TestFilterSubscription(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger()) + node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger().Sugar()) require.NoError(t, err) _, err = node.SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - _, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger()) + _, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger().Sugar()) d := makeFilterService(t) defer d.node.Stop() diff --git a/waku/v2/rpc/private_test.go b/waku/v2/rpc/private_test.go index 41c7a86e..74ec996e 100644 --- a/waku/v2/rpc/private_test.go +++ b/waku/v2/rpc/private_test.go @@ -16,7 +16,7 @@ func makePrivateService(t *testing.T) *PrivateService { err = n.Start() require.NoError(t, err) - return NewPrivateService(n, utils.Logger()) + return NewPrivateService(n, utils.Logger().Sugar()) } func TestGetV1SymmetricKey(t *testing.T) { diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index b6a7c6ff..f0c343bc 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -19,7 +19,7 @@ func makeRelayService(t *testing.T) *RelayService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return NewRelayService(n, utils.Logger()) + return NewRelayService(n, utils.Logger().Sugar()) } func TestPostV1Message(t *testing.T) { diff --git a/waku/v2/rpc/store_test.go b/waku/v2/rpc/store_test.go index f5b28951..29442c99 100644 --- a/waku/v2/rpc/store_test.go +++ b/waku/v2/rpc/store_test.go @@ -15,7 +15,7 @@ func makeStoreService(t *testing.T) *StoreService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return &StoreService{n, utils.Logger()} + return &StoreService{n, utils.Logger().Sugar()} } func TestStoreGetV1Messages(t *testing.T) { diff --git a/waku/v2/rpc/waku_rpc_test.go b/waku/v2/rpc/waku_rpc_test.go index 1ccee7b5..ac0934d7 100644 --- a/waku/v2/rpc/waku_rpc_test.go +++ b/waku/v2/rpc/waku_rpc_test.go @@ -14,7 +14,7 @@ func TestWakuRpc(t *testing.T) { n, err := node.New(context.Background(), options) require.NoError(t, err) - rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger()) + rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger().Sugar()) require.NotNil(t, rpc.server) require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") } diff --git a/waku/v2/utils/logger.go b/waku/v2/utils/logger.go index f7ae0d2e..2ee10c77 100644 --- a/waku/v2/utils/logger.go +++ b/waku/v2/utils/logger.go @@ -20,7 +20,7 @@ func SetLogLevel(level string) error { } // Logger creates a zap.Logger with some reasonable defaults -func Logger() *zap.SugaredLogger { +func Logger() *zap.Logger { if log == nil { cfg := zap.Config{ Encoding: "console", @@ -45,5 +45,5 @@ func Logger() *zap.SugaredLogger { log = logger.Named("gowaku") } - return log.Sugar() + return log } diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 42eb7f55..c58e47ae 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "github.com/status-im/go-waku/logging" "go.uber.org/zap" ) @@ -18,7 +19,7 @@ import ( var ErrNoPeersAvailable = errors.New("no suitable peers found") // SelectPeer is used to return a random peer that supports a given protocol. -func SelectPeer(host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) { +func SelectPeer(host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. // This will require us to check for various factors such as: @@ -29,7 +30,7 @@ func SelectPeer(host host.Host, protocolId string, log *zap.SugaredLogger) (*pee for _, peer := range host.Peerstore().Peers() { protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) if err != nil { - log.Error("error obtaining the protocols supported by peers", zap.Error(err)) + log.Error("obtaining protocols supported by peers", zap.Error(err), logging.HostID("peer", peer)) return nil, err } @@ -52,7 +53,7 @@ type pingResult struct { } // SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time -func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) { +func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) { var peers peer.IDSlice for _, peer := range host.Peerstore().Peers() { protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)