From 7c44369def09dcc4f2e9f46fa3fd50a4514e0e12 Mon Sep 17 00:00:00 2001 From: Martin Kobetic Date: Mon, 30 May 2022 11:55:30 -0400 Subject: [PATCH] feat: structured logging followup (#248) --- logging/logging.go | 17 +++++ tests/utils.go | 12 +--- waku/metrics/http.go | 14 ++--- waku/metrics/http_test.go | 2 +- waku/node.go | 6 +- waku/persistence/store.go | 15 +++-- waku/persistence/store_test.go | 6 +- waku/v2/discv5/discover.go | 32 +++++----- waku/v2/discv5/discover_test.go | 6 +- waku/v2/node/wakunode2.go | 16 ++--- waku/v2/node/wakuoptions_test.go | 2 +- waku/v2/protocol/filter/waku_filter.go | 57 ++++++++--------- waku/v2/protocol/filter/waku_filter_option.go | 10 +-- .../filter/waku_filter_option_test.go | 2 +- waku/v2/protocol/filter/waku_filter_test.go | 8 +-- waku/v2/protocol/lightpush/waku_lightpush.go | 34 +++++----- .../lightpush/waku_lightpush_option.go | 10 +-- .../lightpush/waku_lightpush_option_test.go | 2 +- .../protocol/lightpush/waku_lightpush_test.go | 10 +-- waku/v2/protocol/relay/waku_relay.go | 18 +++--- waku/v2/protocol/relay/waku_relay_test.go | 2 +- waku/v2/protocol/store/waku_resume_test.go | 14 ++--- waku/v2/protocol/store/waku_store.go | 62 ++++++++++--------- .../store/waku_store_persistence_test.go | 6 +- .../store/waku_store_protocol_test.go | 8 +-- .../protocol/store/waku_store_query_test.go | 16 ++--- waku/v2/protocol/swap/waku_swap.go | 9 +-- waku/v2/protocol/swap/waku_swap_test.go | 2 +- waku/v2/rpc/admin.go | 8 +-- waku/v2/rpc/admin_test.go | 4 +- waku/v2/rpc/filter.go | 8 +-- waku/v2/rpc/filter_test.go | 6 +- waku/v2/rpc/private.go | 4 +- waku/v2/rpc/private_test.go | 2 +- waku/v2/rpc/relay.go | 10 +-- waku/v2/rpc/relay_test.go | 2 +- waku/v2/rpc/store.go | 4 +- waku/v2/rpc/store_test.go | 2 +- waku/v2/rpc/waku_rpc.go | 22 +++---- waku/v2/rpc/waku_rpc_test.go | 2 +- 40 files changed, 245 insertions(+), 227 deletions(-) diff --git a/logging/logging.go b/logging/logging.go index aa03fd4a..af0ce986 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -8,9 +8,11 @@ package logging import ( + "net" "time" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/protocol/pb" @@ -110,3 +112,18 @@ func HexBytes(key string, bytes []byte) zap.Field { func (bytes hexBytes) String() string { return hexutil.Encode(bytes) } + +// ENode creates a field for ENR node. +func ENode(key string, node *enode.Node) zap.Field { + return zap.Stringer(key, node) +} + +// TCPAddr creates a field for TCP v4/v6 address and port +func TCPAddr(key string, ip net.IP, port int) zap.Field { + return zap.Stringer(key, &net.TCPAddr{IP: ip, Port: port}) +} + +// UDPAddr creates a field for UDP v4/v6 address and port +func UDPAddr(key string, ip net.IP, port int) zap.Field { + return zap.Stringer(key, &net.UDPAddr{IP: ip, Port: port}) +} diff --git a/tests/utils.go b/tests/utils.go index 94ac6e0f..b494add2 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -9,25 +9,15 @@ import ( "net" "testing" + "github.com/ethereum/go-ethereum/log" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/protocol/pb" - "go.uber.org/zap" ) -var log *zap.SugaredLogger = nil - -func Logger() *zap.SugaredLogger { - if log == nil { - l, _ := zap.NewDevelopment() - log = l.Sugar() - } - return log -} - // GetHostAddress returns the first listen address used by a host func GetHostAddress(ha host.Host) ma.Multiaddr { return ha.Addrs()[0] diff --git a/waku/metrics/http.go b/waku/metrics/http.go index dad80721..8494c7fb 100644 --- a/waku/metrics/http.go +++ b/waku/metrics/http.go @@ -16,11 +16,11 @@ import ( // Server runs and controls a HTTP pprof interface. type Server struct { server *http.Server - log *zap.SugaredLogger + log *zap.Logger } // NewMetricsServer creates a prometheus server on a particular interface and port -func NewMetricsServer(address string, port int, log *zap.SugaredLogger) *Server { +func NewMetricsServer(address string, port int, log *zap.Logger) *Server { p := Server{ log: log.Named("metrics"), } @@ -32,12 +32,12 @@ func NewMetricsServer(address string, port int, log *zap.SugaredLogger) *Server pe, err := prometheus.NewExporter(prometheus.Options{}) if err != nil { - p.log.Fatalf("Failed to create the Prometheus stats exporter: %v", err) + p.log.Fatal("creating Prometheus stats exporter", zap.Error(err)) } view.RegisterExporter(pe) - p.log.Info(fmt.Sprintf("Starting server at %s:%d", address, port)) + p.log.Info("starting server", zap.String("address", address), zap.Int("port", port)) mux := http.NewServeMux() mux.Handle("/metrics", pe) @@ -58,7 +58,7 @@ func NewMetricsServer(address string, port int, log *zap.SugaredLogger) *Server metrics.PeersView, metrics.DialsView, ); err != nil { - p.log.Fatalf("Failed to register views: %v", err) + p.log.Fatal("registering views", zap.Error(err)) } p.server = &http.Server{ @@ -71,14 +71,14 @@ func NewMetricsServer(address string, port int, log *zap.SugaredLogger) *Server // Start executes the HTTP server in the background. func (p *Server) Start() { - p.log.Info("server stopped ", p.server.ListenAndServe()) + p.log.Info("server stopped ", zap.Error(p.server.ListenAndServe())) } // Stop shuts down the prometheus server func (p *Server) Stop(ctx context.Context) error { err := p.server.Shutdown(ctx) if err != nil { - p.log.Error("error while stopping server", err) + p.log.Error("stopping server", zap.Error(err)) return err } diff --git a/waku/metrics/http_test.go b/waku/metrics/http_test.go index 1a3b05c4..b3641af4 100644 --- a/waku/metrics/http_test.go +++ b/waku/metrics/http_test.go @@ -10,7 +10,7 @@ import ( ) func TestStartAndStopMetricsServer(t *testing.T) { - server := NewMetricsServer("0.0.0.0", 9876, utils.Logger().Sugar()) + server := NewMetricsServer("0.0.0.0", 9876, utils.Logger()) go func() { time.Sleep(1 * time.Second) diff --git a/waku/node.go b/waku/node.go index ed49ae07..743fe443 100644 --- a/waku/node.go +++ b/waku/node.go @@ -105,7 +105,7 @@ func Execute(options Options) { var metricsServer *metrics.Server if options.Metrics.Enable { - metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port, logger.Sugar()) + metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port, logger) go metricsServer.Start() } @@ -190,7 +190,7 @@ func Execute(options Options) { if options.Store.Enable { nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages)) if options.UseDB { - dbStore, err := persistence.NewDBStore(logger.Sugar(), persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration())) + dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration())) failOnErr(err, "DBStore") nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } else { @@ -299,7 +299,7 @@ func Execute(options Options) { var rpcServer *rpc.WakuRpc if options.RPCServer.Enable { - rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, logger.Sugar()) + rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.RPCServer.Private, logger) rpcServer.Start() } diff --git a/waku/persistence/store.go b/waku/persistence/store.go index c66c028a..d9e1dfc5 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -2,7 +2,6 @@ package persistence import ( "database/sql" - "fmt" "time" "github.com/status-im/go-waku/waku/v2/protocol/pb" @@ -20,7 +19,7 @@ type MessageProvider interface { type DBStore struct { MessageProvider db *sql.DB - log *zap.SugaredLogger + log *zap.Logger maxMessages int maxDuration time.Duration @@ -67,7 +66,7 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption { // Creates a new DB store using the db specified via options. // It will create a messages table if it does not exist and // clean up records according to the retention policy used -func NewDBStore(log *zap.SugaredLogger, options ...DBOption) (*DBStore, error) { +func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { result := new(DBStore) result.log = log.Named("dbstore") @@ -124,7 +123,7 @@ func (d *DBStore) cleanOlderRecords() error { return err } elapsed := time.Since(start) - d.log.Debug(fmt.Sprintf("Deleting older records from the DB took %s", elapsed)) + d.log.Debug("deleting older records from the DB", zap.Duration("duration", elapsed)) } // Limit number of records to a max N @@ -136,7 +135,7 @@ func (d *DBStore) cleanOlderRecords() error { return err } elapsed := time.Since(start) - d.log.Debug(fmt.Sprintf("Deleting excess records from the DB took %s", elapsed)) + d.log.Debug("deleting excess records from the DB", zap.Duration("duration", elapsed)) } return nil @@ -171,7 +170,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { start := time.Now() defer func() { elapsed := time.Since(start) - d.log.Info(fmt.Sprintf("Loading records from the DB took %s", elapsed)) + d.log.Info("loading records from the DB", zap.Duration("duration", elapsed)) }() rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC") @@ -194,7 +193,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { err = rows.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version) if err != nil { - d.log.Fatal(err) + d.log.Fatal("scanning next row", zap.Error(err)) } msg := new(pb.WakuMessage) @@ -213,7 +212,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { result = append(result, record) } - d.log.Info(fmt.Sprintf("DB returned %d records", len(result))) + d.log.Info("DB returned records", zap.Int("count", len(result))) err = rows.Err() if err != nil { diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index edbe2ee9..52600b01 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -33,7 +33,7 @@ func createIndex(digest []byte, receiverTime int64) *pb.Index { func TestDbStore(t *testing.T) { db := NewMock() option := WithDB(db) - store, err := NewDBStore(utils.Logger().Sugar(), option) + store, err := NewDBStore(utils.Logger(), option) require.NoError(t, err) res, err := store.GetAll() @@ -54,7 +54,7 @@ func TestDbStore(t *testing.T) { func TestStoreRetention(t *testing.T) { db := NewMock() - store, err := NewDBStore(utils.Logger().Sugar(), WithDB(db), WithRetentionPolicy(5, 20*time.Second)) + store, err := NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) insertTime := time.Now() @@ -74,7 +74,7 @@ func TestStoreRetention(t *testing.T) { // This step simulates starting go-waku again from scratch - store, err = NewDBStore(utils.Logger().Sugar(), WithDB(db), WithRetentionPolicy(5, 40*time.Second)) + store, err = NewDBStore(utils.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second)) require.NoError(t, err) dbResults, err = store.GetAll() diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 2458f0c2..60d3acaa 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -4,7 +4,6 @@ import ( "context" "crypto/ecdsa" "errors" - "fmt" "math" "math/rand" "net" @@ -20,6 +19,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" "github.com/status-im/go-discover/discover" + "github.com/status-im/go-waku/logging" "github.com/status-im/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -38,7 +38,7 @@ type DiscoveryV5 struct { NAT nat.Interface quit chan struct{} - log *zap.SugaredLogger + log *zap.Logger wg *sync.WaitGroup @@ -101,7 +101,7 @@ func DefaultOptions() []DiscoveryV5Option { } } -func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.PrivateKey, wakuFlags utils.WakuEnrBitfield, log *zap.SugaredLogger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { +func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.PrivateKey, wakuFlags utils.WakuEnrBitfield, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { params := new(discV5Parameters) optList := DefaultOptions() optList = append(optList, opts...) @@ -156,7 +156,7 @@ func NewDiscoveryV5(host host.Host, addresses []ma.Multiaddr, priv *ecdsa.Privat }, nil } -func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.SugaredLogger) (*enode.LocalNode, error) { +func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.Logger) (*enode.LocalNode, error) { db, err := enode.OpenDB("") if err != nil { return nil, err @@ -170,13 +170,13 @@ func newLocalnode(priv *ecdsa.PrivateKey, ipAddr *net.TCPAddr, udpPort int, waku if udpPort > 0 && udpPort <= math.MaxUint16 { localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] } else { - log.Error("could not set udpPort ", zap.Int("port", udpPort)) + log.Error("setting udpPort", zap.Int("port", udpPort)) } if ipAddr.Port > 0 && ipAddr.Port <= math.MaxUint16 { localnode.Set(enr.TCP(uint16(ipAddr.Port))) // lgtm [go/incorrect-integer-conversion] } else { - log.Error("could not set tcpPort ", zap.Int("port", ipAddr.Port)) + log.Error("setting tcpPort", zap.Int("port", ipAddr.Port)) } if advertiseAddr != nil { @@ -212,8 +212,10 @@ func (d *DiscoveryV5) listen() error { d.listener = listener - d.log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.localnode.Node().TCP())) - d.log.Info("Discv5: discoverable ENR ", d.localnode.Node()) + d.log.Info("started Discovery V5", + zap.Stringer("listening", d.udpAddr), + logging.TCPAddr("advertising", d.localnode.Node().IP(), d.localnode.Node().TCP())) + d.log.Info("Discovery V5: discoverable ENR ", logging.ENode("enr", d.localnode.Node())) return nil } @@ -243,7 +245,7 @@ func (d *DiscoveryV5) Stop() { d.listener.Close() d.listener = nil - d.log.Info("Stopped Discovery V5") + d.log.Info("stopped Discovery V5") d.wg.Wait() } @@ -270,8 +272,8 @@ func (d *DiscoveryV5) UpdateAddr(addr *net.TCPAddr) error { d.localnode.SetStaticIP(addr.IP) d.localnode.Set(enr.TCP(uint16(addr.Port))) // lgtm [go/incorrect-integer-conversion] - d.log.Info(fmt.Sprintf("Updated Discovery V5 node: %s:%d", d.localnode.Node().IP(), d.localnode.Node().TCP())) - d.log.Info("Discovery V5 ", d.localnode.Node()) + d.log.Info("updated Discovery V5 node address", logging.TCPAddr("address", d.localnode.Node().IP(), d.localnode.Node().TCP())) + d.log.Info("Discovery V5", logging.ENode("enr", d.localnode.Node())) return nil } @@ -298,7 +300,7 @@ func hasTCPPort(node *enode.Node) bool { enrTCP := new(enr.TCP) if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil { if !enr.IsNotFound(err) { - utils.Logger().Named("discv5").Error("could not retrieve port for enr ", zap.Any("node", node)) + utils.Logger().Named("discv5").Error("retrieving port for enr", logging.ENode("enr", node)) } return false } @@ -319,7 +321,7 @@ func evaluateNode(node *enode.Node) bool { _, err := utils.EnodeToPeerInfo(node) if err != nil { - utils.Logger().Named("discv5").Error("could not obtain peer info from enode:", zap.Error(err)) + utils.Logger().Named("discv5").Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err)) return false } @@ -358,13 +360,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi addresses, err := utils.Multiaddress(iterator.Node()) if err != nil { - d.log.Error(err) + d.log.Error("extracting multiaddrs from enr", zap.Error(err)) continue } peerAddrs, err := peer.AddrInfosFromP2pAddrs(addresses...) if err != nil { - d.log.Error(err) + d.log.Error("converting multiaddrs to addrinfos", zap.Error(err)) continue } diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index 125b9d1e..cb02349a 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -47,19 +47,19 @@ func TestDiscV5(t *testing.T) { host1, _, prvKey1 := createHost(t) udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort1)) + d1, err := NewDiscoveryV5(host1, host1.Addrs(), prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort1)) require.NoError(t, err) host2, _, prvKey2 := createHost(t) udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + d2, err := NewDiscoveryV5(host2, host2.Addrs(), prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) host3, _, prvKey3 := createHost(t) udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger().Sugar(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) + d3, err := NewDiscoveryV5(host3, host3.Addrs(), prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), utils.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) defer d1.Stop() diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 224f2f82..a9e42387 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -86,7 +86,7 @@ type WakuNode struct { } func defaultStoreFactory(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log.Sugar()) + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) } // New is used to instantiate a WakuNode using a set of WakuNodeOptions @@ -218,7 +218,7 @@ func (w *WakuNode) onAddrChange() { if w.opts.enableDiscV5 { err := w.discoveryV5.UpdateAddr(addr) if err != nil { - w.log.Error("updating DiscV5 address with IP", zap.Stringer("ip", addr.IP), zap.Int("port", addr.Port), zap.Error(err)) + w.log.Error("updating DiscV5 address with IP", zap.Stringer("address", addr), zap.Error(err)) continue } } @@ -235,7 +235,7 @@ func (w *WakuNode) logAddress(addr ma.Multiaddr) { if err != nil { logger.Error("obtaining ENR record from multiaddress", zap.Error(err)) } else { - logger.Info("listening", zap.Stringer("ENR", enr), zap.Stringer("ip", ip)) + logger.Info("listening", logging.ENode("enr", enr), zap.Stringer("ip", ip)) } } } @@ -283,7 +283,7 @@ func (w *WakuNode) checkForAddressChanges() { func (w *WakuNode) Start() error { w.log.Info("Version details ", zap.String("commit", GitCommit)) - w.swap = swap.NewWakuSwap(w.log.Sugar(), []swap.SwapOption{ + w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{ swap.WithMode(w.opts.swapMode), swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold), }...) @@ -294,7 +294,7 @@ func (w *WakuNode) Start() error { } if w.opts.enableFilter { - filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log.Sugar(), w.opts.filterOpts...) + filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log, w.opts.filterOpts...) if err != nil { return err } @@ -322,7 +322,7 @@ func (w *WakuNode) Start() error { return err } - w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log.Sugar()) + w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log) if w.opts.enableLightPush { if err := w.lightPush.Start(); err != nil { return err @@ -462,7 +462,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error { var err error - w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log.Sugar(), opts...) + w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log, opts...) if err != nil { return err } @@ -492,7 +492,7 @@ func (w *WakuNode) mountDiscV5() error { } var err error - w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log.Sugar(), discV5Options...) + w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.ListenAddresses(), w.opts.privKey, w.wakuFlag, w.log, discV5Options...) return err } diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index 2c7bfb2c..158b85a1 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -30,7 +30,7 @@ func TestWakuOptions(t *testing.T) { advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") storeFactory := func(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log.Sugar()) + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) } options := []WakuNodeOption{ diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 001ff34b..135c6f4b 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "errors" - "fmt" "math" "sync" @@ -13,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-msgio/protoio" + "github.com/status-im/go-waku/logging" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" @@ -50,7 +50,7 @@ type ( isFullNode bool MsgC chan *protocol.Envelope wg *sync.WaitGroup - log *zap.SugaredLogger + log *zap.Logger filters *FilterMap subscribers *Subscribers @@ -62,13 +62,13 @@ type ( // relay protocol. const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") -func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.SugaredLogger, opts ...Option) (*WakuFilter, error) { +func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.Logger, opts ...Option) (*WakuFilter, error) { wf := new(WakuFilter) - wf.log = log.Named("filter") + wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode)) ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) if err != nil { - wf.log.Error(err) + wf.log.Error("creating tag map", zap.Error(err)) return nil, errors.New("could not start waku filter") } @@ -92,17 +92,13 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *za wf.wg.Add(1) go wf.FilterListener() - if wf.isFullNode { - wf.log.Info("Filter protocol started") - } else { - wf.log.Info("Filter protocol started (only client mode)") - } - + wf.log.Info("filter protocol started") return wf, nil } func (wf *WakuFilter) onRequest(s network.Stream) { defer s.Close() + logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer())) filterRPCRequest := &pb.FilterRPC{} @@ -110,11 +106,11 @@ func (wf *WakuFilter) onRequest(s network.Stream) { err := reader.ReadMsg(filterRPCRequest) if err != nil { - wf.log.Error("error reading request", err) + logger.Error("reading request", zap.Error(err)) return } - wf.log.Info(fmt.Sprintf("%s: received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + logger.Info("received request") if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 { // We're on a light node. @@ -123,7 +119,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) { wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node } - wf.log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") + logger.Info("received a message push", zap.Int("messages", len(filterRPCRequest.Push.Messages))) stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages)))) } else if filterRPCRequest.Request != nil && wf.isFullNode { // We're on a full node. @@ -132,29 +128,30 @@ func (wf *WakuFilter) onRequest(s network.Stream) { subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request} len := wf.subscribers.Append(subscriber) - wf.log.Info("filter full node, add a filter subscriber: ", subscriber.peer) + logger.Info("adding subscriber") stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len))) } else { peerId := s.Conn().RemotePeer() wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.Request.ContentFilters) - wf.log.Info("filter full node, remove a filter subscriber: ", peerId.Pretty()) + logger.Info("removing subscriber") stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length()))) } } else { - wf.log.Error("can't serve request") + logger.Error("can't serve request") return } } func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error { pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}} + logger := wf.log.With(logging.HostID("peer", subscriber.peer)) // We connect first so dns4 addresses are resolved (NewStream does not do it) err := wf.h.Connect(wf.ctx, wf.h.Peerstore().PeerInfo(subscriber.peer)) if err != nil { wf.subscribers.FlagAsFailure(subscriber.peer) - wf.log.Error("failed to connect to peer", err) + logger.Error("connecting to peer", zap.Error(err)) return err } @@ -162,7 +159,7 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er if err != nil { wf.subscribers.FlagAsFailure(subscriber.peer) - wf.log.Error("failed to open peer stream", err) + logger.Error("opening peer stream", zap.Error(err)) //waku_filter_errors.inc(labelValues = [dialFailure]) return err } @@ -171,7 +168,7 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er writer := protoio.NewDelimitedWriter(conn) err = writer.WriteMsg(pushRPC) if err != nil { - wf.log.Error("failed to push messages to remote peer", err) + logger.Error("pushing messages to peer", zap.Error(err)) wf.subscribers.FlagAsFailure(subscriber.peer) return nil } @@ -188,25 +185,29 @@ func (wf *WakuFilter) FilterListener() { handle := func(envelope *protocol.Envelope) error { // async msg := envelope.Message() topic := envelope.PubsubTopic() + logger := wf.log.With(zap.Stringer("message", msg)) g := new(errgroup.Group) // Each subscriber is a light node that earlier on invoked // a FilterRequest on this node for subscriber := range wf.subscribers.Items() { + logger := logger.With(logging.HostID("subscriber", subscriber.peer)) subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { - wf.log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic) + logger.Info("pubsub topic mismatch", + zap.String("subscriberTopic", subscriber.filter.Topic), + zap.String("messageTopic", topic)) continue } for _, filter := range subscriber.filter.ContentFilters { if msg.ContentTopic == filter.ContentTopic { - wf.log.Info("found matching contentTopic ", filter, msg) + logger.Info("found matching content topic", zap.Stringer("filter", filter)) // Do a message push to light node - wf.log.Info("pushing messages to light node: ", subscriber.peer) + logger.Info("pushing message to light node") g.Go(func() (err error) { err = wf.pushMessage(subscriber, msg) if err != nil { - wf.log.Error(err) + logger.Error("pushing message", zap.Error(err)) } return err }) @@ -221,7 +222,7 @@ func (wf *WakuFilter) FilterListener() { for m := range wf.MsgC { if err := handle(m); err != nil { - wf.log.Error("failed to handle message", err) + wf.log.Error("handling message", zap.Error(err)) } } } @@ -274,10 +275,10 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil writer := protoio.NewDelimitedWriter(conn) filterRPC := &pb.FilterRPC{RequestId: requestID, Request: &request} - wf.log.Info("sending filterRPC: ", filterRPC) + wf.log.Info("sending filterRPC", zap.Stringer("rpc", filterRPC)) err = writer.WriteMsg(filterRPC) if err != nil { - wf.log.Error("failed to write message", err) + wf.log.Error("sending filterRPC", zap.Error(err)) return } @@ -342,7 +343,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi remoteSubs, err := wf.requestSubscription(ctx, f, opts...) if err != nil || remoteSubs.RequestID == "" { // Failed to subscribe - wf.log.Error("remote subscription to filter failed", err) + wf.log.Error("requesting subscription", zap.Error(err)) return } diff --git a/waku/v2/protocol/filter/waku_filter_option.go b/waku/v2/protocol/filter/waku_filter_option.go index 7d95a4af..3c870a18 100644 --- a/waku/v2/protocol/filter/waku_filter_option.go +++ b/waku/v2/protocol/filter/waku_filter_option.go @@ -14,7 +14,7 @@ type ( FilterSubscribeParameters struct { host host.Host selectedPeer peer.ID - log *zap.SugaredLogger + log *zap.Logger } FilterSubscribeOption func(*FilterSubscribeParameters) @@ -40,22 +40,22 @@ func WithPeer(p peer.ID) FilterSubscribeOption { func WithAutomaticPeerSelection() FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log.Desugar()) + p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log) if err == nil { params.selectedPeer = *p } else { - params.log.Info("Error selecting peer: ", err) + params.log.Info("selecting peer", zap.Error(err)) } } } func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log.Desugar()) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log) if err == nil { params.selectedPeer = *p } else { - params.log.Info("Error selecting peer: ", err) + params.log.Info("selecting peer", zap.Error(err)) } } } diff --git a/waku/v2/protocol/filter/waku_filter_option_test.go b/waku/v2/protocol/filter/waku_filter_option_test.go index 34ae5f0d..c21c33d4 100644 --- a/waku/v2/protocol/filter/waku_filter_option_test.go +++ b/waku/v2/protocol/filter/waku_filter_option_test.go @@ -25,7 +25,7 @@ func TestFilterOption(t *testing.T) { params := new(FilterSubscribeParameters) params.host = host - params.log = utils.Logger().Sugar() + params.log = utils.Logger() for _, opt := range options { opt(params) diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 67f4f891..2b1370cc 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -23,7 +23,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger().Sugar()) + relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, utils.Logger()) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) @@ -39,7 +39,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger().Sugar()) + filter, _ := NewWakuFilter(context.Background(), host, false, utils.Logger()) return filter, host } @@ -69,7 +69,7 @@ func TestWakuFilter(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger().Sugar()) + node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger()) broadcaster.Register(&testTopic, node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) @@ -154,7 +154,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger().Sugar(), WithTimeout(3*time.Second)) + node2Filter, _ := NewWakuFilter(ctx, host2, true, utils.Logger(), WithTimeout(3*time.Second)) broadcaster.Register(&testTopic, node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 1f0e458b..f836f105 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -4,13 +4,13 @@ import ( "context" "encoding/hex" "errors" - "fmt" "math" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-msgio/protoio" + "github.com/status-im/go-waku/logging" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" @@ -30,13 +30,13 @@ type WakuLightPush struct { relay *relay.WakuRelay ctx context.Context - log *zap.SugaredLogger + log *zap.Logger started bool } // NewWakuRelay returns a new instance of Waku Lightpush struct -func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay, log *zap.SugaredLogger) *WakuLightPush { +func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush { wakuLP := new(WakuLightPush) wakuLP.relay = relay wakuLP.ctx = ctx @@ -66,7 +66,7 @@ func (wakuLp *WakuLightPush) IsClientOnly() bool { func (wakuLP *WakuLightPush) onRequest(s network.Stream) { defer s.Close() - + logger := wakuLP.log.With(logging.HostID("peer", s.Conn().RemotePeer())) requestPushRPC := &pb.PushRPC{} writer := protoio.NewDelimitedWriter(s) @@ -74,15 +74,15 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { err := reader.ReadMsg(requestPushRPC) if err != nil { - wakuLP.log.Error("error reading request", err) + logger.Error("reading request", zap.Error(err)) metrics.RecordLightpushError(wakuLP.ctx, "decodeRpcFailure") return } - wakuLP.log.Info(fmt.Sprintf("%s: lightpush message received from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + logger.Info("request received") if requestPushRPC.Query != nil { - wakuLP.log.Info("lightpush push request") + logger.Info("push request") response := new(pb.PushResponse) if !wakuLP.IsClientOnly() { pubSubTopic := requestPushRPC.Query.PubsubTopic @@ -101,7 +101,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { response.Info = "Totally" // TODO: ask about this } } else { - wakuLP.log.Debug("no relay protocol present, unsuccessful push") + logger.Debug("no relay protocol present, unsuccessful push") response.IsSuccess = false response.Info = "No relay protocol" } @@ -112,18 +112,18 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { err = writer.WriteMsg(responsePushRPC) if err != nil { - wakuLP.log.Error("error writing response", err) + logger.Error("writing response", zap.Error(err)) _ = s.Reset() } else { - wakuLP.log.Info(fmt.Sprintf("%s: response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) + logger.Info("response sent") } } if requestPushRPC.Response != nil { if requestPushRPC.Response.IsSuccess { - wakuLP.log.Info("lightpush message success") + logger.Info("request success") } else { - wakuLP.log.Info(fmt.Sprintf("lightpush message failure. info=%s", requestPushRPC.Response.Info)) + logger.Info("request failure", zap.String("info=", requestPushRPC.Response.Info)) } } } @@ -148,15 +148,17 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o return nil, ErrInvalidId } + logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer)) // We connect first so dns4 addresses are resolved (NewStream does not do it) err := wakuLP.h.Connect(ctx, wakuLP.h.Peerstore().PeerInfo(params.selectedPeer)) if err != nil { + logger.Error("connecting peer", zap.Error(err)) return nil, err } connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) if err != nil { - wakuLP.log.Info("failed to connect to remote peer", err) + logger.Error("creating stream to peer", zap.Error(err)) metrics.RecordLightpushError(wakuLP.ctx, "dialError") return nil, err } @@ -166,7 +168,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o err := connOpt.Reset() if err != nil { metrics.RecordLightpushError(wakuLP.ctx, "dialError") - wakuLP.log.Error("failed to reset connection", err) + logger.Error("resetting connection", zap.Error(err)) } }() @@ -177,14 +179,14 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o err = writer.WriteMsg(pushRequestRPC) if err != nil { - wakuLP.log.Error("could not write request", err) + logger.Error("writing request", zap.Error(err)) return nil, err } pushResponseRPC := &pb.PushRPC{} err = reader.ReadMsg(pushResponseRPC) if err != nil { - wakuLP.log.Error("could not read response", err) + logger.Error("reading response", zap.Error(err)) metrics.RecordLightpushError(wakuLP.ctx, "decodeRPCFailure") return nil, err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 8b12aaf7..94a402ed 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -14,7 +14,7 @@ type LightPushParameters struct { host host.Host selectedPeer peer.ID requestId []byte - log *zap.SugaredLogger + log *zap.Logger } type LightPushOption func(*LightPushParameters) @@ -27,22 +27,22 @@ func WithPeer(p peer.ID) LightPushOption { func WithAutomaticPeerSelection(host host.Host) LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeer(host, string(LightPushID_v20beta1), params.log.Desugar()) + p, err := utils.SelectPeer(host, string(LightPushID_v20beta1), params.log) if err == nil { params.selectedPeer = *p } else { - params.log.Info("Error selecting peer: ", err) + params.log.Info("selecting peer", zap.Error(err)) } } } func WithFastestPeerSelection(ctx context.Context) LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log.Desugar()) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log) if err == nil { params.selectedPeer = *p } else { - params.log.Info("Error selecting peer: ", err) + params.log.Info("selecting peer", zap.Error(err)) } } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index bde2262b..eba2cdd4 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -27,7 +27,7 @@ func TestLightPushOption(t *testing.T) { params := new(LightPushParameters) params.host = host - params.log = utils.Logger().Sugar() + params.log = utils.Logger() for _, opt := range options { opt(params) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 7930aa70..a5aee33b 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -25,7 +25,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger().Sugar()) + relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger()) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) @@ -56,7 +56,7 @@ func TestWakuLightPush(t *testing.T) { defer sub2.Unsubscribe() ctx := context.Background() - lightPushNode2 := NewWakuLightPush(ctx, host2, node2, utils.Logger().Sugar()) + lightPushNode2 := NewWakuLightPush(ctx, host2, node2, utils.Logger()) err := lightPushNode2.Start() require.NoError(t, err) defer lightPushNode2.Stop() @@ -66,7 +66,7 @@ func TestWakuLightPush(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar()) + client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger()) host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200)) @@ -122,7 +122,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar()) + client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger()) err = client.Start() require.Errorf(t, err, "relay is required") @@ -136,7 +136,7 @@ func TestWakuLightPushNoPeers(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger().Sugar()) + client := NewWakuLightPush(ctx, clientHost, nil, utils.Logger()) _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic) require.Errorf(t, err, "no suitable remote peers") diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 0e186edd..1b05dc0e 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -30,7 +30,7 @@ type WakuRelay struct { host host.Host pubsub *pubsub.PubSub - log *zap.SugaredLogger + log *zap.Logger bcaster v2.Broadcaster @@ -52,7 +52,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { } // NewWakuRelay returns a new instance of a WakuRelay struct -func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, log *zap.SugaredLogger, opts ...pubsub.Option) (*WakuRelay, error) { +func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, log *zap.Logger, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h w.wakuRelayTopics = make(map[string]*pubsub.Topic) @@ -109,7 +109,7 @@ func (w *WakuRelay) Topics() []string { return result } -// SetPubSub is used to set an aimplementation of the pubsub system +// SetPubSub is used to set an implementation of the pubsub system func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) { w.pubsub = pubSub } @@ -144,7 +144,7 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro } w.relaySubs[topic] = sub - w.log.Info("Subscribing to topic ", topic) + w.log.Info("subscribing to topic", zap.String("topic", topic)) } return sub, nil @@ -162,7 +162,7 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, } if !w.EnoughPeersToPublishToTopic(topic) { - return nil, errors.New("not enougth peers to publish") + return nil, errors.New("not enough peers to publish") } pubSubTopic, err := w.upsertTopic(topic) @@ -255,7 +255,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { if _, ok := w.relaySubs[topic]; !ok { return fmt.Errorf("topics %s is not subscribed", (string)(topic)) } - w.log.Info("Unsubscribing from topic ", topic) + w.log.Info("unsubscribing from topic", zap.String("topic", topic)) for _, sub := range w.subscriptions[topic] { sub.Unsubscribe() @@ -285,7 +285,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < for { msg, err := sub.Next(ctx) if err != nil { - w.log.Error(fmt.Errorf("subscription failed: %w", err)) + w.log.Error("getting message from subscription", zap.Error(err)) sub.Cancel() close(msgChannel) for _, subscription := range w.subscriptions[sub.Topic()] { @@ -302,7 +302,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) { ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay")) if err != nil { - w.log.Error(err) + w.log.Error("creating tag map", zap.Error(err)) return } @@ -333,7 +333,7 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub * stats.Record(ctx, metrics.Messages.M(1)) wakuMessage := &pb.WakuMessage{} if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { - w.log.Error("could not decode message", err) + w.log.Error("decoding message", zap.Error(err)) return } diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index d8079451..79f17702 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -20,7 +20,7 @@ func TestWakuRelay(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger().Sugar()) + relay, err := NewWakuRelay(context.Background(), host, nil, 0, utils.Logger()) defer relay.Stop() require.NoError(t, err) diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index d683860d..2a85dde6 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test") - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) _ = s.storeMessage(msg1) _ = s.storeMessage(msg3) _ = s.storeMessage(msg5) @@ -38,7 +38,7 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -56,7 +56,7 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) s2.Start(ctx) defer s2.Stop() @@ -88,7 +88,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -99,7 +99,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) s2.Start(ctx) defer s2.Stop() @@ -121,7 +121,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -132,7 +132,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 91ed873c..1f02752a 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -5,7 +5,6 @@ import ( "context" "encoding/hex" "errors" - "fmt" "math" "sort" "strings" @@ -19,6 +18,7 @@ import ( "github.com/libp2p/go-msgio/protoio" "go.uber.org/zap" + "github.com/status-im/go-waku/logging" "github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" @@ -239,7 +239,7 @@ type WakuStore struct { MsgC chan *protocol.Envelope wg *sync.WaitGroup - log *zap.SugaredLogger + log *zap.Logger started bool @@ -259,7 +259,7 @@ type Store interface { } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.SugaredLogger) *WakuStore { +func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.Logger) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p wakuStore.h = host @@ -308,12 +308,14 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { start := time.Now() defer func() { elapsed := time.Since(start) - store.log.Info(fmt.Sprintf("Store initialization took %s", elapsed)) + store.log.Info("Store initialization complete", + zap.Duration("duration", elapsed), + zap.Int("messages", store.messageQueue.Length())) }() storedMessages, err := (store.msgProvider).GetAll() if err != nil { - store.log.Error("could not load DBProvider messages", err) + store.log.Error("loading DBProvider messages", zap.Error(err)) metrics.RecordStoreError(ctx, "store_load_failure") return } @@ -328,8 +330,6 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { } metrics.RecordMessage(ctx, "stored", store.messageQueue.Length()) - - store.log.Info(fmt.Sprintf("%d messages available in waku store", store.messageQueue.Length())) } func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) error { @@ -339,7 +339,7 @@ func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg func (store *WakuStore) storeMessage(env *protocol.Envelope) error { index, err := computeIndex(env) if err != nil { - store.log.Error("could not calculate message index", err) + store.log.Error("creating message index", zap.Error(err)) return err } @@ -356,7 +356,7 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) error { // TODO: Move this to a separate go routine if DB writes becomes a bottleneck err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored? if err != nil { - store.log.Error("could not store message", err) + store.log.Error("storing message", zap.Error(err)) metrics.RecordStoreError(store.ctx, "store_failure") return err } @@ -374,7 +374,7 @@ func (store *WakuStore) storeIncomingMessages(ctx context.Context) { func (store *WakuStore) onRequest(s network.Stream) { defer s.Close() - + logger := store.log.With(logging.HostID("peer", s.Conn().RemotePeer())) historyRPCRequest := &pb.HistoryRPC{} writer := protoio.NewDelimitedWriter(s) @@ -382,23 +382,27 @@ func (store *WakuStore) onRequest(s network.Stream) { err := reader.ReadMsg(historyRPCRequest) if err != nil { - store.log.Error("error reading request", err) + logger.Error("reading request", zap.Error(err)) metrics.RecordStoreError(store.ctx, "decodeRPCFailure") return } - - store.log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + logger = logger.With(zap.String("id", historyRPCRequest.RequestId)) + if query := historyRPCRequest.Query; query != nil { + logger = logger.With(logging.Filters(query.GetContentFilters())) + } + logger.Info("received query") historyResponseRPC := &pb.HistoryRPC{} historyResponseRPC.RequestId = historyRPCRequest.RequestId historyResponseRPC.Response = store.FindMessages(historyRPCRequest.Query) + logger = logger.With(zap.Int("messages", len(historyResponseRPC.Response.Messages))) err = writer.WriteMsg(historyResponseRPC) if err != nil { - store.log.Error("error writing response", err) + logger.Error("writing response", zap.Error(err), logging.PagingInfo(historyResponseRPC.Response.PagingInfo)) _ = s.Reset() } else { - store.log.Info(fmt.Sprintf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) + logger.Info("response sent") } } @@ -491,22 +495,22 @@ func WithPeer(p peer.ID) HistoryRequestOption { // to request the message history func WithAutomaticPeerSelection() HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log.Desugar()) + p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log) if err == nil { params.selectedPeer = *p } else { - params.s.log.Info("Error selecting peer: ", err) + params.s.log.Info("selecting peer", zap.Error(err)) } } } func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log.Desugar()) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log) if err == nil { params.selectedPeer = *p } else { - params.s.log.Info("Error selecting peer: ", err) + params.s.log.Info("selecting peer", zap.Error(err)) } } } @@ -547,17 +551,19 @@ func DefaultOptions() []HistoryRequestOption { } func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { - store.log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer)) + logger := store.log.With(logging.HostID("peer", selectedPeer)) + logger.Info("querying message history") // We connect first so dns4 addresses are resolved (NewStream does not do it) err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer)) if err != nil { + logger.Error("connecting to peer", zap.Error(err)) return nil, err } connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4) if err != nil { - store.log.Error("Failed to connect to remote peer", err) + logger.Error("creating stream to peer", zap.Error(err)) return nil, err } @@ -573,14 +579,14 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec err = writer.WriteMsg(historyRequest) if err != nil { - store.log.Error("could not write request", err) + logger.Error("writing request", zap.Error(err)) return nil, err } historyResponseRPC := &pb.HistoryRPC{} err = reader.ReadMsg(historyResponseRPC) if err != nil { - store.log.Error("could not read response", err) + logger.Error("reading response", zap.Error(err)) metrics.RecordStoreError(store.ctx, "decodeRPCFailure") return nil, err } @@ -705,7 +711,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c resultChan <- result return } - store.log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err)) + store.log.Error("resuming history", logging.HostID("peer", peer), zap.Error(err)) }() } @@ -774,9 +780,9 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList } if len(peerList) == 0 { - p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log.Desugar()) + p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log) if err != nil { - store.log.Info("Error selecting peer: ", err) + store.log.Info("selecting peer", zap.Error(err)) return -1, ErrNoPeersAvailable } @@ -785,7 +791,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList messages, err := store.queryLoop(ctx, rpc, peerList) if err != nil { - store.log.Error("failed to resume history", err) + store.log.Error("resuming history", zap.Error(err)) return -1, ErrFailedToResumeHistory } @@ -796,7 +802,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList } } - store.log.Info("Retrieved messages since the last online time: ", len(messages)) + store.log.Info("retrieved messages since the last online time", zap.Int("messages", len(messages))) return msgCount, nil } diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index 4cd6435c..262cf461 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -21,10 +21,10 @@ func TestStorePersistence(t *testing.T) { db, err := sqlite.NewDB(":memory:") require.NoError(t, err) - dbStore, err := persistence.NewDBStore(utils.Logger().Sugar(), persistence.WithDB(db)) + dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db)) require.NoError(t, err) - s1 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger().Sugar()) + s1 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger()) s1.fetchDBRecords(ctx) require.Len(t, s1.messageQueue.messages, 0) @@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) { _ = s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) - s2 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger().Sugar()) + s2 := NewWakuStore(nil, nil, dbStore, 0, 0, utils.Logger()) s2.fetchDBRecords(ctx) require.Len(t, s2.messageQueue.messages, 1) require.Equal(t, msg, s2.messageQueue.messages[0].msg) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 08488f8f..7dfdb697 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) s2.Start(ctx) defer s2.Stop() @@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger().Sugar()) + s1 := NewWakuStore(host1, nil, nil, 0, 0, utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4)) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger().Sugar()) + s2 := NewWakuStore(host2, nil, nil, 0, 0, utils.Logger()) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index 05946e8c..28f12706 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) @@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger().Sugar()) + s := NewWakuStore(nil, nil, nil, 0, 0, utils.Logger()) var messages []*pb.WakuMessage for i := 0; i < 10; i++ { diff --git a/waku/v2/protocol/swap/waku_swap.go b/waku/v2/protocol/swap/waku_swap.go index 07f55e2b..be7ca5f5 100644 --- a/waku/v2/protocol/swap/waku_swap.go +++ b/waku/v2/protocol/swap/waku_swap.go @@ -18,13 +18,13 @@ const WakuSwapID_v200 = protocol.ID("/vac/waku/swap/2.0.0-beta1") type WakuSwap struct { params *SwapParameters - log *zap.SugaredLogger + log *zap.Logger Accounting map[string]int accountingMutex sync.RWMutex } -func NewWakuSwap(log *zap.SugaredLogger, opts ...SwapOption) *WakuSwap { +func NewWakuSwap(log *zap.Logger, opts ...SwapOption) *WakuSwap { params := &SwapParameters{} optList := DefaultOptions() @@ -45,12 +45,13 @@ func (s *WakuSwap) sendCheque(peerId string) { } func (s *WakuSwap) applyPolicy(peerId string) { + logger := s.log.With(zap.String("peer", peerId)) if s.Accounting[peerId] <= s.params.disconnectThreshold { - s.log.Warnf("Disconnect threshold has been reached for %s at %d", peerId, s.Accounting[peerId]) + logger.Warn("disconnect threshold reached", zap.Int("value", s.Accounting[peerId])) } if s.Accounting[peerId] >= s.params.paymentThreshold { - s.log.Warnf("Disconnect threshold has been reached for %s at %d", peerId, s.Accounting[peerId]) + logger.Warn("payment threshold reached", zap.Int("value", s.Accounting[peerId])) if s.params.mode != HardMode { s.sendCheque(peerId) } diff --git a/waku/v2/protocol/swap/waku_swap_test.go b/waku/v2/protocol/swap/waku_swap_test.go index 4f268869..6141ead2 100644 --- a/waku/v2/protocol/swap/waku_swap_test.go +++ b/waku/v2/protocol/swap/waku_swap_test.go @@ -8,7 +8,7 @@ import ( ) func TestSwapCreditDebit(t *testing.T) { - swap := NewWakuSwap(utils.Logger().Sugar(), []SwapOption{ + swap := NewWakuSwap(utils.Logger(), []SwapOption{ WithMode(SoftMode), WithThreshold(0, 0), }...) diff --git a/waku/v2/rpc/admin.go b/waku/v2/rpc/admin.go index 2636e54e..9f25b17f 100644 --- a/waku/v2/rpc/admin.go +++ b/waku/v2/rpc/admin.go @@ -15,7 +15,7 @@ import ( type AdminService struct { node *node.WakuNode - log *zap.SugaredLogger + log *zap.Logger } type GetPeersArgs struct { @@ -39,7 +39,7 @@ func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *Su for _, peer := range args.Peers { addr, err := ma.NewMultiaddr(peer) if err != nil { - a.log.Error("Error building multiaddr", zap.Error(err)) + a.log.Error("building multiaddr", zap.Error(err)) reply.Success = false reply.Error = err.Error() return nil @@ -47,7 +47,7 @@ func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *Su err = a.node.DialPeerWithMultiAddress(req.Context(), addr) if err != nil { - a.log.Error("Error dialing peers", zap.Error(err)) + a.log.Error("dialing peers", zap.Error(err)) reply.Success = false reply.Error = err.Error() return nil @@ -65,7 +65,7 @@ func isWakuProtocol(protocol string) bool { func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *PeersReply) error { peers, err := a.node.Peers() if err != nil { - a.log.Error("Error getting peers", zap.Error(err)) + a.log.Error("getting peers", zap.Error(err)) return nil } for _, peer := range peers { diff --git a/waku/v2/rpc/admin_test.go b/waku/v2/rpc/admin_test.go index 3a5aa42f..193f0e5f 100644 --- a/waku/v2/rpc/admin_test.go +++ b/waku/v2/rpc/admin_test.go @@ -24,7 +24,7 @@ func makeAdminService(t *testing.T) *AdminService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return &AdminService{n, utils.Logger().Sugar()} + return &AdminService{n, utils.Logger()} } func TestV1Peers(t *testing.T) { @@ -33,7 +33,7 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger().Sugar()) + relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, utils.Logger()) require.NoError(t, err) defer relay.Stop() diff --git a/waku/v2/rpc/filter.go b/waku/v2/rpc/filter.go index 76b5a8f5..4a9ab262 100644 --- a/waku/v2/rpc/filter.go +++ b/waku/v2/rpc/filter.go @@ -14,7 +14,7 @@ import ( type FilterService struct { node *node.WakuNode - log *zap.SugaredLogger + log *zap.Logger messages map[string][]*pb.WakuMessage messagesMutex sync.RWMutex @@ -31,7 +31,7 @@ type ContentTopicArgs struct { ContentTopic string `json:"contentTopic,omitempty"` } -func NewFilterService(node *node.WakuNode, log *zap.SugaredLogger) *FilterService { +func NewFilterService(node *node.WakuNode, log *zap.Logger) *FilterService { s := &FilterService{ node: node, log: log.Named("filter"), @@ -80,7 +80,7 @@ func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterConten filter.WithAutomaticPeerSelection(), ) if err != nil { - f.log.Error("Error subscribing to topic:", args.Topic, "err:", err) + f.log.Error("subscribing to topic", zap.String("topic", args.Topic), zap.Error(err)) reply.Success = false reply.Error = err.Error() return nil @@ -98,7 +98,7 @@ func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterCont makeContentFilter(args), ) if err != nil { - f.log.Error("Error unsubscribing to topic:", args.Topic, "err:", err) + f.log.Error("unsubscribing from topic", zap.String("topic", args.Topic), zap.Error(err)) reply.Success = false reply.Error = err.Error() return nil diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index ca78f25a..5edaba5b 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -29,7 +29,7 @@ func makeFilterService(t *testing.T) *FilterService { _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - return NewFilterService(n, utils.Logger().Sugar()) + return NewFilterService(n, utils.Logger()) } func TestFilterSubscription(t *testing.T) { @@ -39,13 +39,13 @@ func TestFilterSubscription(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger().Sugar()) + node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, utils.Logger()) require.NoError(t, err) _, err = node.SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - _, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger().Sugar()) + _, _ = filter.NewWakuFilter(context.Background(), host, false, utils.Logger()) d := makeFilterService(t) defer d.node.Stop() diff --git a/waku/v2/rpc/private.go b/waku/v2/rpc/private.go index 33a91dc8..1f7b678e 100644 --- a/waku/v2/rpc/private.go +++ b/waku/v2/rpc/private.go @@ -16,7 +16,7 @@ import ( type PrivateService struct { node *node.WakuNode - log *zap.SugaredLogger + log *zap.Logger symmetricMessages map[string][]*pb.WakuMessage symmetricMessagesMutex sync.RWMutex @@ -56,7 +56,7 @@ type AsymmetricMessagesArgs struct { PrivateKey string `json:"privateKey"` } -func NewPrivateService(node *node.WakuNode, log *zap.SugaredLogger) *PrivateService { +func NewPrivateService(node *node.WakuNode, log *zap.Logger) *PrivateService { return &PrivateService{ node: node, symmetricMessages: make(map[string][]*pb.WakuMessage), diff --git a/waku/v2/rpc/private_test.go b/waku/v2/rpc/private_test.go index 74ec996e..41c7a86e 100644 --- a/waku/v2/rpc/private_test.go +++ b/waku/v2/rpc/private_test.go @@ -16,7 +16,7 @@ func makePrivateService(t *testing.T) *PrivateService { err = n.Start() require.NoError(t, err) - return NewPrivateService(n, utils.Logger().Sugar()) + return NewPrivateService(n, utils.Logger()) } func TestGetV1SymmetricKey(t *testing.T) { diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 30204504..610b1dc1 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -14,7 +14,7 @@ import ( type RelayService struct { node *node.WakuNode - log *zap.SugaredLogger + log *zap.Logger messages map[string][]*pb.WakuMessage messagesMutex sync.RWMutex @@ -35,7 +35,7 @@ type TopicArgs struct { Topic string `json:"topic,omitempty"` } -func NewRelayService(node *node.WakuNode, log *zap.SugaredLogger) *RelayService { +func NewRelayService(node *node.WakuNode, log *zap.Logger) *RelayService { s := &RelayService{ node: node, log: log.Named("relay"), @@ -73,7 +73,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, _, err = r.node.Relay().PublishToTopic(req.Context(), &args.Message, args.Topic) } if err != nil { - r.log.Error("Error publishing message: ", err) + r.log.Error("publishing message", zap.Error(err)) reply.Success = false reply.Error = err.Error() } else { @@ -93,7 +93,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r _, err = r.node.Relay().SubscribeToTopic(ctx, topic) } if err != nil { - r.log.Error("Error subscribing to topic:", topic, "err:", err) + r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err)) reply.Success = false reply.Error = err.Error() return nil @@ -109,7 +109,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, for _, topic := range args.Topics { err := r.node.Relay().Unsubscribe(ctx, topic) if err != nil { - r.log.Error("Error unsubscribing from topic", topic, "err:", err) + r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err)) reply.Success = false reply.Error = err.Error() return nil diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index f0c343bc..b6a7c6ff 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -19,7 +19,7 @@ func makeRelayService(t *testing.T) *RelayService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return NewRelayService(n, utils.Logger().Sugar()) + return NewRelayService(n, utils.Logger()) } func TestPostV1Message(t *testing.T) { diff --git a/waku/v2/rpc/store.go b/waku/v2/rpc/store.go index b9f2f8dd..690dcc20 100644 --- a/waku/v2/rpc/store.go +++ b/waku/v2/rpc/store.go @@ -11,7 +11,7 @@ import ( type StoreService struct { node *node.WakuNode - log *zap.SugaredLogger + log *zap.Logger } // cursor *pb.Index @@ -56,7 +56,7 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, options..., ) if err != nil { - s.log.Error("Error querying messages:", zap.Error(err)) + s.log.Error("querying messages", zap.Error(err)) reply.Error = err.Error() return nil } diff --git a/waku/v2/rpc/store_test.go b/waku/v2/rpc/store_test.go index 29442c99..f5b28951 100644 --- a/waku/v2/rpc/store_test.go +++ b/waku/v2/rpc/store_test.go @@ -15,7 +15,7 @@ func makeStoreService(t *testing.T) *StoreService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return &StoreService{n, utils.Logger().Sugar()} + return &StoreService{n, utils.Logger()} } func TestStoreGetV1Messages(t *testing.T) { diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index c4e77632..affde951 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -15,7 +15,7 @@ type WakuRpc struct { node *node.WakuNode server *http.Server - log *zap.SugaredLogger + log *zap.Logger relayService *RelayService filterService *FilterService @@ -23,7 +23,7 @@ type WakuRpc struct { adminService *AdminService } -func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, enablePrivate bool, log *zap.SugaredLogger) *WakuRpc { +func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, enablePrivate bool, log *zap.Logger) *WakuRpc { wrpc := new(WakuRpc) wrpc.log = log.Named("rpc") @@ -33,25 +33,25 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, err := s.RegisterService(&DebugService{node}, "Debug") if err != nil { - wrpc.log.Error(err) + wrpc.log.Error("registering debug service", zap.Error(err)) } relayService := NewRelayService(node, log) err = s.RegisterService(relayService, "Relay") if err != nil { - wrpc.log.Error(err) + wrpc.log.Error("registering relay service", zap.Error(err)) } err = s.RegisterService(&StoreService{node, log}, "Store") if err != nil { - wrpc.log.Error(err) + wrpc.log.Error("registering store service", zap.Error(err)) } if enableAdmin { adminService := &AdminService{node, log.Named("admin")} err = s.RegisterService(adminService, "Admin") if err != nil { - wrpc.log.Error(err) + wrpc.log.Error("registering admin service", zap.Error(err)) } wrpc.adminService = adminService } @@ -59,14 +59,14 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, filterService := NewFilterService(node, log) err = s.RegisterService(filterService, "Filter") if err != nil { - wrpc.log.Error(err) + wrpc.log.Error("registering filter service", zap.Error(err)) } if enablePrivate { privateService := NewPrivateService(node, log) err = s.RegisterService(privateService, "Private") if err != nil { - wrpc.log.Error(err) + wrpc.log.Error("registering private service", zap.Error(err)) } wrpc.privateService = privateService } @@ -75,7 +75,7 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { t := time.Now() s.ServeHTTP(w, r) - wrpc.log.Infof("RPC request at %s took %s", r.URL.Path, time.Since(t)) + wrpc.log.Info("served request", zap.String("path", r.URL.Path), zap.Duration("duration", time.Since(t))) }) listenAddr := fmt.Sprintf("%s:%d", address, port) @@ -104,10 +104,10 @@ func (r *WakuRpc) Start() { go func() { _ = r.server.ListenAndServe() }() - r.log.Info("Rpc server started at ", r.server.Addr) + r.log.Info("server started", zap.String("addr", r.server.Addr)) } func (r *WakuRpc) Stop(ctx context.Context) error { - r.log.Info("Shutting down rpc server") + r.log.Info("shutting down server") return r.server.Shutdown(ctx) } diff --git a/waku/v2/rpc/waku_rpc_test.go b/waku/v2/rpc/waku_rpc_test.go index ac0934d7..1ccee7b5 100644 --- a/waku/v2/rpc/waku_rpc_test.go +++ b/waku/v2/rpc/waku_rpc_test.go @@ -14,7 +14,7 @@ func TestWakuRpc(t *testing.T) { n, err := node.New(context.Background(), options) require.NoError(t, err) - rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger().Sugar()) + rpc := NewWakuRpc(n, "127.0.0.1", 8080, true, true, utils.Logger()) require.NotNil(t, rpc.server) require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") }