From 11d1f8fb0d43a94e2409f3c18525b42880fb05b4 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 18 Jan 2022 14:17:06 -0400 Subject: [PATCH] feat: result aggregation in resume and enforce max page size (#183) * feat: result aggregation in resume and enforce max page size * feat: add WithLogger option to wakunode (#184) * fix: rebase issues --- go.mod | 1 + tests/utils.go | 12 +- waku.go | 10 ++ waku/metrics/http.go | 29 ++--- waku/metrics/http_test.go | 3 +- waku/node.go | 35 +++--- waku/persistence/store.go | 10 +- waku/persistence/store_test.go | 9 +- waku/v2/discv5/discover.go | 37 +++--- waku/v2/discv5/discover_test.go | 6 +- waku/v2/dnsdisc/enr_test.go | 2 +- waku/v2/metrics/metrics.go | 11 +- waku/v2/node/connectedness.go | 11 +- waku/v2/node/keepalive.go | 12 +- waku/v2/node/keepalive_test.go | 2 + waku/v2/node/wakunode2.go | 48 ++++---- waku/v2/node/wakuoptions.go | 13 ++ waku/v2/protocol/filter/filter_subscribers.go | 2 - waku/v2/protocol/filter/waku_filter.go | 48 ++++---- waku/v2/protocol/filter/waku_filter_option.go | 10 +- .../filter/waku_filter_option_test.go | 1 + waku/v2/protocol/filter/waku_filter_test.go | 8 +- waku/v2/protocol/lightpush/waku_lightpush.go | 36 +++--- .../lightpush/waku_lightpush_option.go | 10 +- .../lightpush/waku_lightpush_option_test.go | 1 + .../protocol/lightpush/waku_lightpush_test.go | 10 +- waku/v2/protocol/relay/waku_relay.go | 23 ++-- waku/v2/protocol/relay/waku_relay_test.go | 2 +- waku/v2/protocol/requestId.go | 13 +- waku/v2/protocol/store/message_queue.go | 9 +- waku/v2/protocol/store/message_queue_test.go | 21 +++- waku/v2/protocol/store/waku_resume_test.go | 14 +-- waku/v2/protocol/store/waku_store.go | 111 +++++++++++------- .../store/waku_store_persistence_test.go | 7 +- .../store/waku_store_protocol_test.go | 8 +- .../protocol/store/waku_store_query_test.go | 16 +-- waku/v2/protocol/swap/waku_swap.go | 16 +-- waku/v2/protocol/swap/waku_swap_test.go | 3 +- waku/v2/rpc/admin.go | 8 +- waku/v2/rpc/admin_test.go | 4 +- waku/v2/rpc/filter.go | 9 +- waku/v2/rpc/filter_test.go | 6 +- waku/v2/rpc/private.go | 5 +- waku/v2/rpc/private_test.go | 3 +- waku/v2/rpc/relay.go | 12 +- waku/v2/rpc/relay_test.go | 3 +- waku/v2/rpc/store.go | 4 +- waku/v2/rpc/store_test.go | 3 +- waku/v2/rpc/waku_rpc.go | 53 +++++---- waku/v2/rpc/waku_rpc_test.go | 3 +- waku/v2/utils/enr.go | 3 +- waku/v2/utils/logger.go | 47 ++++++++ waku/v2/utils/peer.go | 8 +- waku/v2/utils/peer_test.go | 8 +- 54 files changed, 481 insertions(+), 318 deletions(-) create mode 100644 waku/v2/utils/logger.go diff --git a/go.mod b/go.mod index e658dfc4..a2bb2195 100644 --- a/go.mod +++ b/go.mod @@ -28,4 +28,5 @@ require ( github.com/stretchr/testify v1.7.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 go.opencensus.io v0.23.0 + go.uber.org/zap v1.19.0 ) diff --git a/tests/utils.go b/tests/utils.go index d078b6bc..030692ac 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -9,15 +9,25 @@ import ( "net" "testing" - "github.com/ethereum/go-ethereum/log" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" ) +var log *zap.SugaredLogger = nil + +func Logger() *zap.SugaredLogger { + if log == nil { + l, _ := zap.NewDevelopment() + log = l.Sugar() + } + return log +} + func GetHostAddress(ha host.Host) ma.Multiaddr { return ha.Addrs()[0] } diff --git a/waku.go b/waku.go index 1d39b9b5..63a81641 100644 --- a/waku.go +++ b/waku.go @@ -1,11 +1,13 @@ package main import ( + "fmt" "os" logging "github.com/ipfs/go-log" "github.com/jessevdk/go-flags" "github.com/status-im/go-waku/waku" + "github.com/status-im/go-waku/waku/v2/utils" ) var options waku.Options @@ -17,11 +19,19 @@ func main() { os.Exit(1) } + // for go-libp2p loggers lvl, err := logging.LevelFromString(options.LogLevel) if err != nil { os.Exit(1) } logging.SetAllLoggers(lvl) + // go-waku logger + fmt.Println(options.LogLevel) + err = utils.SetLogLevel(options.LogLevel) + if err != nil { + os.Exit(1) + } + waku.Execute(options) } diff --git a/waku/metrics/http.go b/waku/metrics/http.go index 4280a7a1..dad80721 100644 --- a/waku/metrics/http.go +++ b/waku/metrics/http.go @@ -6,22 +6,25 @@ import ( "net/http" "contrib.go.opencensus.io/exporter/prometheus" - logging "github.com/ipfs/go-log" "github.com/status-im/go-waku/waku/v2/metrics" "go.opencensus.io/plugin/ochttp" "go.opencensus.io/plugin/runmetrics" "go.opencensus.io/stats/view" + "go.uber.org/zap" ) -var log = logging.Logger("metrics") - // Server runs and controls a HTTP pprof interface. type Server struct { server *http.Server + log *zap.SugaredLogger } // NewMetricsServer creates a prometheus server on a particular interface and port -func NewMetricsServer(address string, port int) *Server { +func NewMetricsServer(address string, port int, log *zap.SugaredLogger) *Server { + p := Server{ + log: log.Named("metrics"), + } + _ = runmetrics.Enable(runmetrics.RunMetricOptions{ EnableCPU: true, EnableMemory: true, @@ -29,12 +32,12 @@ func NewMetricsServer(address string, port int) *Server { pe, err := prometheus.NewExporter(prometheus.Options{}) if err != nil { - log.Fatalf("Failed to create the Prometheus stats exporter: %v", err) + p.log.Fatalf("Failed to create the Prometheus stats exporter: %v", err) } view.RegisterExporter(pe) - log.Info(fmt.Sprintf("Starting server at %s:%d", address, port)) + p.log.Info(fmt.Sprintf("Starting server at %s:%d", address, port)) mux := http.NewServeMux() mux.Handle("/metrics", pe) @@ -55,14 +58,12 @@ func NewMetricsServer(address string, port int) *Server { metrics.PeersView, metrics.DialsView, ); err != nil { - log.Fatalf("Failed to register views: %v", err) + p.log.Fatalf("Failed to register views: %v", err) } - p := Server{ - server: &http.Server{ - Addr: fmt.Sprintf("%s:%d", address, port), - Handler: h, - }, + p.server = &http.Server{ + Addr: fmt.Sprintf("%s:%d", address, port), + Handler: h, } return &p @@ -70,14 +71,14 @@ func NewMetricsServer(address string, port int) *Server { // Start executes the HTTP server in the background. func (p *Server) Start() { - log.Info("server stopped ", p.server.ListenAndServe()) + p.log.Info("server stopped ", p.server.ListenAndServe()) } // Stop shuts down the prometheus server func (p *Server) Stop(ctx context.Context) error { err := p.server.Shutdown(ctx) if err != nil { - log.Error("error while stopping server", err) + p.log.Error("error while stopping server", err) return err } diff --git a/waku/metrics/http_test.go b/waku/metrics/http_test.go index 9600bbcf..034178ee 100644 --- a/waku/metrics/http_test.go +++ b/waku/metrics/http_test.go @@ -5,11 +5,12 @@ import ( "testing" "time" + "github.com/status-im/go-waku/tests" "github.com/stretchr/testify/require" ) func TestStartAndStopMetricsServer(t *testing.T) { - server := NewMetricsServer("0.0.0.0", 9876) + server := NewMetricsServer("0.0.0.0", 9876, tests.Logger()) go func() { time.Sleep(1 * time.Second) diff --git a/waku/node.go b/waku/node.go index e057215a..727c8804 100644 --- a/waku/node.go +++ b/waku/node.go @@ -17,8 +17,8 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" dssql "github.com/ipfs/go-ds-sql" + "go.uber.org/zap" - logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/discovery" @@ -29,6 +29,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/multiformats/go-multiaddr" rendezvous "github.com/status-im/go-waku-rendezvous" + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/metrics" "github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/persistence/sqlite" @@ -39,16 +40,15 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/status-im/go-waku/waku/v2/rpc" + "github.com/status-im/go-waku/waku/v2/utils" ) -var log = logging.Logger("wakunode") - func failOnErr(err error, msg string) { if err != nil { if msg != "" { msg = msg + ": " } - log.Fatal(msg, err) + utils.Logger().Fatal(msg, zap.Error(err)) } } @@ -102,11 +102,12 @@ func Execute(options Options) { var metricsServer *metrics.Server if options.Metrics.Enable { - metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port) + metricsServer = metrics.NewMetricsServer(options.Metrics.Address, options.Metrics.Port, tests.Logger()) go metricsServer.Start() } nodeOpts := []node.WakuNodeOption{ + node.WithLogger(utils.Logger()), node.WithPrivateKey(prvKey), node.WithHostAddress(hostAddr), node.WithKeepAlive(time.Duration(options.KeepAlive) * time.Second), @@ -179,7 +180,7 @@ func Execute(options Options) { if options.Store.Enable { nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxDaysDuration(), options.Store.RetentionMaxMessages)) if options.UseDB { - dbStore, err := persistence.NewDBStore(persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration())) + dbStore, err := persistence.NewDBStore(tests.Logger(), persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxDaysDuration())) failOnErr(err, "DBStore") nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } else { @@ -200,7 +201,7 @@ func Execute(options Options) { for _, addr := range options.DiscV5.Nodes { bootnode, err := enode.Parse(enode.ValidSchemes, addr) if err != nil { - log.Fatal("could not parse enr: ", err) + utils.Logger().Fatal("could not parse enr: ", zap.Error(err)) } bootnodes = append(bootnodes, bootnode) } @@ -217,12 +218,12 @@ func Execute(options Options) { addPeers(wakuNode, options.Filter.Nodes, filter.FilterID_v20beta1) if err = wakuNode.Start(); err != nil { - log.Fatal(fmt.Errorf("could not start waku node, %w", err)) + utils.Logger().Fatal(fmt.Errorf("could not start waku node, %w", err).Error()) } if options.DiscV5.Enable { if err = wakuNode.DiscV5().Start(); err != nil { - log.Fatal(fmt.Errorf("could not start discovery v5, %w", err)) + utils.Logger().Fatal(fmt.Errorf("could not start discovery v5, %w", err).Error()) } } @@ -241,38 +242,38 @@ func Execute(options Options) { go func(node string) { err = wakuNode.DialPeer(ctx, node) if err != nil { - log.Error("error dialing peer ", err) + utils.Logger().Error("error dialing peer ", zap.Error(err)) } }(n) } if options.DNSDiscovery.Enable { if options.DNSDiscovery.URL != "" { - log.Info("attempting DNS discovery with ", options.DNSDiscovery.URL) + utils.Logger().Info("attempting DNS discovery with ", zap.String("URL", options.DNSDiscovery.URL)) multiaddresses, err := dnsdisc.RetrieveNodes(ctx, options.DNSDiscovery.URL, dnsdisc.WithNameserver(options.DNSDiscovery.Nameserver)) if err != nil { - log.Warn("dns discovery error ", err) + utils.Logger().Warn("dns discovery error ", zap.Error(err)) } else { - log.Info("found dns entries ", multiaddresses) + utils.Logger().Info("found dns entries ", zap.Any("multiaddresses", multiaddresses)) for _, m := range multiaddresses { go func(ctx context.Context, m multiaddr.Multiaddr) { ctx, cancel := context.WithTimeout(ctx, time.Duration(3)*time.Second) defer cancel() err = wakuNode.DialPeerWithMultiAddress(ctx, m) if err != nil { - log.Error("error dialing peer ", err) + utils.Logger().Error("error dialing peer ", zap.Error(err)) } }(ctx, m) } } } else { - log.Fatal("DNS discovery URL is required") + utils.Logger().Fatal("DNS discovery URL is required") } } var rpcServer *rpc.WakuRpc if options.RPCServer.Enable { - rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port) + rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, tests.Logger()) rpcServer.Start() } @@ -280,7 +281,7 @@ func Execute(options Options) { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) <-ch - log.Info("Received signal, shutting down...") + utils.Logger().Info("Received signal, shutting down...") // shut the node down wakuNode.Stop() diff --git a/waku/persistence/store.go b/waku/persistence/store.go index e1a4fb2e..affc3947 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -2,11 +2,11 @@ package persistence import ( "database/sql" - "log" "time" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/utils" + "go.uber.org/zap" ) type MessageProvider interface { @@ -18,7 +18,8 @@ type MessageProvider interface { // DBStore is a MessageProvider that has a *sql.DB connection type DBStore struct { MessageProvider - db *sql.DB + db *sql.DB + log *zap.SugaredLogger maxMessages int maxDuration time.Duration @@ -65,8 +66,9 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption { // Creates a new DB store using the db specified via options. // It will create a messages table if it does not exist and // clean up records according to the retention policy used -func NewDBStore(options ...DBOption) (*DBStore, error) { +func NewDBStore(log *zap.SugaredLogger, options ...DBOption) (*DBStore, error) { result := new(DBStore) + result.log = log.Named("dbstore") for _, opt := range options { err := opt(result) @@ -168,7 +170,7 @@ func (d *DBStore) GetAll() ([]StoredMessage, error) { err = rows.Scan(&id, &receiverTimestamp, &senderTimestamp, &contentTopic, &pubsubTopic, &payload, &version) if err != nil { - log.Fatal(err) + d.log.Fatal(err) } msg := new(pb.WakuMessage) diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index fabdc6fe..cbe20d8e 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -2,7 +2,6 @@ package persistence import ( "database/sql" - "log" "testing" "time" @@ -15,7 +14,7 @@ import ( func NewMock() *sql.DB { db, err := sql.Open("sqlite3", ":memory:") if err != nil { - log.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + tests.Logger().Fatalf("an error '%s' was not expected when opening a stub database connection", err) } return db @@ -32,7 +31,7 @@ func createIndex(digest []byte, receiverTime float64) *pb.Index { func TestDbStore(t *testing.T) { db := NewMock() option := WithDB(db) - store, err := NewDBStore(option) + store, err := NewDBStore(tests.Logger(), option) require.NoError(t, err) res, err := store.GetAll() @@ -53,7 +52,7 @@ func TestDbStore(t *testing.T) { func TestStoreRetention(t *testing.T) { db := NewMock() - store, err := NewDBStore(WithDB(db), WithRetentionPolicy(5, 20*time.Second)) + store, err := NewDBStore(tests.Logger(), WithDB(db), WithRetentionPolicy(5, 20*time.Second)) require.NoError(t, err) insertTime := time.Now() @@ -73,7 +72,7 @@ func TestStoreRetention(t *testing.T) { // This step simulates starting go-waku again from scratch - store, err = NewDBStore(WithDB(db), WithRetentionPolicy(5, 40*time.Second)) + store, err = NewDBStore(tests.Logger(), WithDB(db), WithRetentionPolicy(5, 40*time.Second)) require.NoError(t, err) dbResults, err = store.GetAll() diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 2185ca79..b7a4a396 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -13,16 +13,14 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/nat" - logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/status-im/go-discover/discover" "github.com/status-im/go-waku/waku/v2/utils" + "go.uber.org/zap" ) -var log = logging.Logger("waku_discv5") - type DiscoveryV5 struct { sync.Mutex @@ -37,6 +35,8 @@ type DiscoveryV5 struct { NAT nat.Interface quit chan struct{} + log *zap.SugaredLogger + wg *sync.WaitGroup peerCache peerCache @@ -93,7 +93,7 @@ func DefaultOptions() []DiscoveryV5Option { } } -func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.PrivateKey, wakuFlags utils.WakuEnrBitfield, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { +func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.PrivateKey, wakuFlags utils.WakuEnrBitfield, log *zap.SugaredLogger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { params := new(discV5Parameters) optList := DefaultOptions() optList = append(optList, opts...) @@ -101,9 +101,11 @@ func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.Priv opt(params) } + logger := log.Named("discv5") + params.tcpPort = tcpPort - localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddr) + localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddr, logger) if err != nil { return nil, err } @@ -135,10 +137,11 @@ func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.Priv IP: net.IPv4zero, Port: params.udpPort, }, + log: logger, }, nil } -func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP) (*enode.LocalNode, error) { +func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP, log *zap.SugaredLogger) (*enode.LocalNode, error) { db, err := enode.OpenDB("") if err != nil { return nil, err @@ -194,8 +197,8 @@ func (d *DiscoveryV5) listen() error { d.listener = listener - log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.params.tcpPort)) - log.Info("Discovery V5 ", d.localnode.Node()) + d.log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.params.tcpPort)) + d.log.Info("Discovery V5 ", d.localnode.Node()) return nil } @@ -225,7 +228,7 @@ func (d *DiscoveryV5) Stop() { d.listener.Close() d.listener = nil - log.Info("Stopped Discovery V5") + d.log.Info("Stopped Discovery V5") d.wg.Wait() } @@ -273,8 +276,8 @@ func (d *DiscoveryV5) UpdateAddr(addr net.IP) error { d.localnode.Set(enr.IP(addr)) - log.Info(fmt.Sprintf("Updated Discovery V5 node IP: %s", d.localnode.Node().IP())) - log.Info("Discovery V5 ", d.localnode.Node()) + d.log.Info(fmt.Sprintf("Updated Discovery V5 node IP: %s", d.localnode.Node().IP())) + d.log.Info("Discovery V5 ", d.localnode.Node()) return nil } @@ -283,7 +286,7 @@ func isWakuNode(node *enode.Node) bool { enrField := new(utils.WakuEnrBitfield) if err := node.Record().Load(enr.WithEntry(utils.WakuENRField, &enrField)); err != nil { if !enr.IsNotFound(err) { - log.Error("could not retrieve port for enr ", node) + utils.Logger().Named("discv5").Error("could not retrieve port for enr ", zap.Any("node", node)) } return false } @@ -299,7 +302,7 @@ func hasTCPPort(node *enode.Node) bool { enrTCP := new(enr.TCP) if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil { if !enr.IsNotFound(err) { - log.Error("could not retrieve port for enr ", node) + utils.Logger().Named("discv5").Error("could not retrieve port for enr ", zap.Any("node", node)) } return false } @@ -319,14 +322,14 @@ func evaluateNode(node *enode.Node) bool { _, err := utils.EnodeToPeerInfo(node) if err != nil { - log.Error("could not obtain peer info from enode:", err) + utils.Logger().Named("discv5").Error("could not obtain peer info from enode:", zap.Error(err)) return false } return true } -func (c *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { +func (d *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { // Get options var options discovery.Options err := options.Apply(opts...) @@ -358,13 +361,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi address, err := utils.EnodeToMultiAddr(iterator.Node()) if err != nil { - log.Error(err) + d.log.Error(err) continue } peerInfo, err := peer.AddrInfoFromP2pAddr(address) if err != nil { - log.Error(err) + d.log.Error(err) continue } diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index 5b1ff636..90fccf71 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -49,19 +49,19 @@ func TestDiscV5(t *testing.T) { host1, tcpPort1, prvKey1 := createHost(t) udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d1, err := NewDiscoveryV5(host1, net.IPv4(127, 0, 0, 1), tcpPort1, prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), WithUDPPort(udpPort1)) + d1, err := NewDiscoveryV5(host1, net.IPv4(127, 0, 0, 1), tcpPort1, prvKey1, utils.NewWakuEnrBitfield(true, true, true, true), tests.Logger(), WithUDPPort(udpPort1)) require.NoError(t, err) host2, tcpPort2, prvKey2 := createHost(t) udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d2, err := NewDiscoveryV5(host2, net.IPv4(127, 0, 0, 1), tcpPort2, prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + d2, err := NewDiscoveryV5(host2, net.IPv4(127, 0, 0, 1), tcpPort2, prvKey2, utils.NewWakuEnrBitfield(true, true, true, true), tests.Logger(), WithUDPPort(udpPort2), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) host3, tcpPort3, prvKey3 := createHost(t) udpPort3, err := tests.FindFreePort(t, "127.0.0.1", 3) require.NoError(t, err) - d3, err := NewDiscoveryV5(host3, net.IPv4(127, 0, 0, 1), tcpPort3, prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) + d3, err := NewDiscoveryV5(host3, net.IPv4(127, 0, 0, 1), tcpPort3, prvKey3, utils.NewWakuEnrBitfield(true, true, true, true), tests.Logger(), WithUDPPort(udpPort3), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) defer d1.Stop() diff --git a/waku/v2/dnsdisc/enr_test.go b/waku/v2/dnsdisc/enr_test.go index 98551452..cccf3429 100644 --- a/waku/v2/dnsdisc/enr_test.go +++ b/waku/v2/dnsdisc/enr_test.go @@ -10,7 +10,7 @@ import ( // TestRetrieveNodes uses a live connection, so it could be // flaky, it should though pay for itself and should be fairly stable func TestRetrieveNodes(t *testing.T) { - url := "enrtree://AOFTICU2XWDULNLZGRMQS4RIZPAZEHYMV4FYHAPW563HNRAOERP7C@test.nodes.vac.dev" + url := "enrtree://AOFTICU2XWDULNLZGRMQS4RIZPAZEHYMV4FYHAPW563HNRAOERP7C@test.waku.nodes.status.im" nodes, err := RetrieveNodes(context.Background(), url) require.NoError(t, err) diff --git a/waku/v2/metrics/metrics.go b/waku/v2/metrics/metrics.go index 3a3d2a1d..dcb519a7 100644 --- a/waku/v2/metrics/metrics.go +++ b/waku/v2/metrics/metrics.go @@ -3,14 +3,13 @@ package metrics import ( "context" - logging "github.com/ipfs/go-log" + "github.com/status-im/go-waku/waku/v2/utils" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + "go.uber.org/zap" ) -var log = logging.Logger("metrics") - var ( Messages = stats.Int64("node_messages", "Number of messages received", stats.UnitDimensionless) Peers = stats.Int64("peers", "Number of connected peers", stats.UnitDimensionless) @@ -76,18 +75,18 @@ var ( func RecordLightpushError(ctx context.Context, tagType string) { if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(tag.Key(ErrorType), tagType)}, LightpushErrors.M(1)); err != nil { - log.Error("failed to record with tags", err) + utils.Logger().Error("failed to record with tags", zap.Error(err)) } } func RecordMessage(ctx context.Context, tagType string, len int) { if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(KeyType, tagType)}, StoreMessages.M(int64(len))); err != nil { - log.Error("failed to record with tags", err) + utils.Logger().Error("failed to record with tags", zap.Error(err)) } } func RecordStoreError(ctx context.Context, tagType string) { if err := stats.RecordWithTags(ctx, []tag.Mutator{tag.Insert(ErrorType, tagType)}, StoreErrors.M(1)); err != nil { - log.Error("failed to record with tags", err) + utils.Logger().Error("failed to record with tags", zap.Error(err)) } } diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index 503024f0..62e5499f 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -14,6 +14,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" "go.opencensus.io/stats" + "go.uber.org/zap" ) // A map of peer IDs to supported protocols @@ -28,16 +29,18 @@ type ConnStatus struct { type ConnectionNotifier struct { h host.Host ctx context.Context + log *zap.SugaredLogger DisconnectChan chan peer.ID quit chan struct{} } -func NewConnectionNotifier(ctx context.Context, h host.Host) ConnectionNotifier { +func NewConnectionNotifier(ctx context.Context, h host.Host, log *zap.SugaredLogger) ConnectionNotifier { return ConnectionNotifier{ h: h, ctx: ctx, DisconnectChan: make(chan peer.ID, 100), quit: make(chan struct{}), + log: log, } } @@ -51,13 +54,13 @@ func (c ConnectionNotifier) ListenClose(n network.Network, m ma.Multiaddr) { func (c ConnectionNotifier) Connected(n network.Network, cc network.Conn) { // called when a connection opened - log.Info(fmt.Sprintf("Peer %s connected", cc.RemotePeer())) + c.log.Info(fmt.Sprintf("Peer %s connected", cc.RemotePeer())) stats.Record(c.ctx, metrics.Peers.M(1)) } func (c ConnectionNotifier) Disconnected(n network.Network, cc network.Conn) { // called when a connection closed - log.Info(fmt.Sprintf("Peer %s disconnected", cc.RemotePeer())) + c.log.Info(fmt.Sprintf("Peer %s disconnected", cc.RemotePeer())) stats.Record(c.ctx, metrics.Peers.M(-1)) c.DisconnectChan <- cc.RemotePeer() } @@ -107,7 +110,7 @@ func (w *WakuNode) Status() (isOnline bool, hasHistory bool) { for _, peer := range w.host.Network().Peers() { protocols, err := w.host.Peerstore().GetProtocols(peer) if err != nil { - log.Warn(fmt.Errorf("could not read peer %s protocols", peer)) + w.log.Warn(fmt.Errorf("could not read peer %s protocols", peer)) } for _, protocol := range protocols { diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go index a5c5e979..d12721bf 100644 --- a/waku/v2/node/keepalive.go +++ b/waku/v2/node/keepalive.go @@ -19,7 +19,7 @@ const maxPublishAttempt = 5 func (w *WakuNode) startKeepAlive(t time.Duration) { go func() { defer w.wg.Done() - log.Info("Setting up ping protocol with duration of ", t) + w.log.Info("Setting up ping protocol with duration of ", t) ticker := time.NewTicker(t) defer ticker.Stop() for { @@ -52,25 +52,25 @@ func (w *WakuNode) pingPeer(peer peer.ID) { ctx, cancel := context.WithTimeout(w.ctx, 3*time.Second) defer cancel() - log.Debug("Pinging ", peer) + w.log.Debug("Pinging ", peer) pr := ping.Ping(ctx, w.host, peer) select { case res := <-pr: if res.Error != nil { w.keepAliveFails[peer]++ - log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) + w.log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) } else { w.keepAliveFails[peer] = 0 } case <-ctx.Done(): w.keepAliveFails[peer]++ - log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) + w.log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) } if w.keepAliveFails[peer] > maxAllowedPingFailures && w.host.Network().Connectedness(peer) == network.Connected { - log.Info("Disconnecting peer ", peer) + w.log.Info("Disconnecting peer ", peer) if err := w.host.Network().ClosePeer(peer); err != nil { - log.Debug(fmt.Sprintf("Could not close conn to peer %s: %s", peer, err)) + w.log.Debug(fmt.Sprintf("Could not close conn to peer %s: %s", peer, err)) } w.keepAliveFails[peer] = 0 } diff --git a/waku/v2/node/keepalive_test.go b/waku/v2/node/keepalive_test.go index f9300e9a..f37abd33 100644 --- a/waku/v2/node/keepalive_test.go +++ b/waku/v2/node/keepalive_test.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/status-im/go-waku/tests" "github.com/stretchr/testify/require" ) @@ -36,6 +37,7 @@ func TestKeepAlive(t *testing.T) { host: host1, ctx: ctx2, wg: wg, + log: tests.Logger(), keepAliveMutex: sync.Mutex{}, keepAliveFails: make(map[peer.ID]int), } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 1fe175fa..6de72f9a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -10,8 +10,8 @@ import ( "time" "github.com/ethereum/go-ethereum/common/hexutil" - logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" + "go.uber.org/zap" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" @@ -37,8 +37,6 @@ import ( "github.com/status-im/go-waku/waku/v2/utils" ) -var log = logging.Logger("wakunode") - type Peer struct { ID peer.ID Protocols []string @@ -49,6 +47,7 @@ type Peer struct { type WakuNode struct { host host.Host opts *WakuNodeParameters + log *zap.SugaredLogger relay *relay.WakuRelay filter *filter.WakuFilter @@ -130,6 +129,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.cancel = cancel w.ctx = ctx w.opts = params + w.log = params.logger.Named("node2") w.quit = make(chan struct{}) w.wg = &sync.WaitGroup{} w.addrChan = make(chan ma.Multiaddr, 1024) @@ -152,7 +152,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.connStatusChan = params.connStatusC } - w.connectionNotif = NewConnectionNotifier(ctx, host) + w.connectionNotif = NewConnectionNotifier(ctx, host, w.log) w.host.Network().Notify(w.connectionNotif) w.wg.Add(2) @@ -172,7 +172,7 @@ func (w *WakuNode) onAddrChange() { for m := range w.addrChan { ipStr, err := m.ValueForProtocol(ma.P_IP4) if err != nil { - log.Error(fmt.Sprintf("could not extract ip from ma %s: %s", m, err.Error())) + w.log.Error(fmt.Sprintf("could not extract ip from ma %s: %s", m, err.Error())) continue } ip := net.ParseIP(ipStr) @@ -180,7 +180,7 @@ func (w *WakuNode) onAddrChange() { if w.opts.enableDiscV5 { err := w.discoveryV5.UpdateAddr(ip) if err != nil { - log.Error(fmt.Sprintf("could not update DiscV5 address with IP %s: %s", ip, err.Error())) + w.log.Error(fmt.Sprintf("could not update DiscV5 address with IP %s: %s", ip, err.Error())) continue } } @@ -189,15 +189,15 @@ func (w *WakuNode) onAddrChange() { } func (w *WakuNode) logAddress(addr ma.Multiaddr) { - log.Info("Listening on ", addr) + w.log.Info("Listening on ", addr) // TODO: make this optional depending on DNS Disc being enabled if w.opts.privKey != nil { enr, ip, err := utils.GetENRandIP(addr, w.wakuFlag, w.opts.privKey) if err != nil { - log.Error("could not obtain ENR record from multiaddress", err) + w.log.Error("could not obtain ENR record from multiaddress", err) } else { - log.Info(fmt.Sprintf("ENR for IP %s: %s", ip, enr)) + w.log.Info(fmt.Sprintf("ENR for IP %s: %s", ip, enr)) } } } @@ -231,7 +231,7 @@ func (w *WakuNode) checkForAddressChanges() { } if print { addrs = newAddrs - log.Warn("Change in host multiaddresses") + w.log.Warn("Change in host multiaddresses") for _, addr := range newAddrs { w.addrChan <- addr w.logAddress(addr) @@ -242,18 +242,18 @@ func (w *WakuNode) checkForAddressChanges() { } func (w *WakuNode) Start() error { - w.swap = swap.NewWakuSwap([]swap.SwapOption{ + w.swap = swap.NewWakuSwap(w.log, []swap.SwapOption{ swap.WithMode(w.opts.swapMode), swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold), }...) - w.store = store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) + w.store = store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) if w.opts.enableStore { w.startStore() } if w.opts.enableFilter { - filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.opts.filterOpts...) + filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.log, w.opts.filterOpts...) if err != nil { return err } @@ -281,7 +281,7 @@ func (w *WakuNode) Start() error { return err } - w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay) + w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay, w.log) if w.opts.enableLightPush { if err := w.lightPush.Start(); err != nil { return err @@ -297,12 +297,12 @@ func (w *WakuNode) Start() error { // Subscribe store to topic if w.opts.storeMsgs { - log.Info("Subscribing store to broadcaster") + w.log.Info("Subscribing store to broadcaster") w.bcaster.Register(w.store.MsgC) } if w.filter != nil { - log.Info("Subscribing filter to broadcaster") + w.log.Info("Subscribing filter to broadcaster") w.bcaster.Register(w.filter.MsgC) } @@ -392,11 +392,11 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { if !w.lightPush.IsStarted() { err = errors.New("not enought peers for relay and lightpush is not yet started") } else { - log.Debug("publishing message via lightpush", hexutil.Encode(hash)) + w.log.Debug("publishing message via lightpush", hexutil.Encode(hash)) _, err = w.Lightpush().Publish(ctx, msg) } } else { - log.Debug("publishing message via relay", hexutil.Encode(hash)) + w.log.Debug("publishing message via relay", hexutil.Encode(hash)) _, err = w.Relay().Publish(ctx, msg) } @@ -408,7 +408,7 @@ func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error { var err error - w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, opts...) + w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, w.log, opts...) if err != nil { return err } @@ -449,7 +449,7 @@ func (w *WakuNode) mountDiscV5() error { return err } - w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), net.ParseIP(ipStr), port, w.opts.privKey, w.wakuFlag, discV5Options...) + w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), net.ParseIP(ipStr), port, w.opts.privKey, w.wakuFlag, w.log, discV5Options...) return err } @@ -461,7 +461,7 @@ func (w *WakuNode) mountRendezvous() error { return err } - log.Info("Rendezvous service started") + w.log.Info("Rendezvous service started") return nil } @@ -485,7 +485,7 @@ func (w *WakuNode) startStore() { case <-w.quit: return case <-ticker.C: - _, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta3)) + _, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta3), w.log) if err == nil { break peerVerif } @@ -495,7 +495,7 @@ func (w *WakuNode) startStore() { ctxWithTimeout, ctxCancel := context.WithTimeout(w.ctx, 20*time.Second) defer ctxCancel() if _, err := w.store.Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), nil); err != nil { - log.Info("Retrying in 10s...") + w.log.Info("Retrying in 10s...") time.Sleep(10 * time.Second) } else { break @@ -506,7 +506,7 @@ func (w *WakuNode) startStore() { } func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error { - log.Info(fmt.Sprintf("Adding peer %s to peerstore", info.ID.Pretty())) + w.log.Info(fmt.Sprintf("Adding peer %s to peerstore", info.ID.Pretty())) w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) err := w.host.Peerstore().AddProtocols(info.ID, string(protocolID)) if err != nil { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 8b3860b3..0615c281 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -19,6 +19,8 @@ import ( rendezvous "github.com/status-im/go-waku-rendezvous" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/store" + "github.com/status-im/go-waku/waku/v2/utils" + "go.uber.org/zap" ) // Default clientId @@ -35,6 +37,8 @@ type WakuNodeParameters struct { privKey *ecdsa.PrivateKey libP2POpts []libp2p.Option + logger *zap.SugaredLogger + enableRelay bool enableFilter bool isFilterFullNode bool @@ -76,6 +80,7 @@ type WakuNodeOption func(*WakuNodeParameters) error // Default options used in the libp2p node var DefaultWakuNodeOptions = []WakuNodeOption{ + WithLogger(utils.Logger()), WithWakuRelay(), } @@ -93,6 +98,14 @@ func (w WakuNodeParameters) AddressFactory() basichost.AddrsFactory { return w.addressFactory } +// WithLogger is a WakuNodeOption that adds a custom logger +func WithLogger(l *zap.Logger) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.logger = l.Sugar() + return nil + } +} + // WithHostAddress is a WakuNodeOption that configures libp2p to listen on a specific address func WithHostAddress(hostAddr *net.TCPAddr) WakuNodeOption { return func(params *WakuNodeParameters) error { diff --git a/waku/v2/protocol/filter/filter_subscribers.go b/waku/v2/protocol/filter/filter_subscribers.go index 82c17f1c..0c9adf85 100644 --- a/waku/v2/protocol/filter/filter_subscribers.go +++ b/waku/v2/protocol/filter/filter_subscribers.go @@ -77,8 +77,6 @@ func (sub *Subscribers) FlagAsFailure(peerID peer.ID) { if ok { elapsedTime := time.Since(lastFailure) if elapsedTime > sub.timeout { - log.Debug("filter timeout reached for peer:", peerID) - var tmpSubs []Subscriber for _, s := range sub.subscribers { if s.peer != peerID { diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 72d7c3f4..795bf676 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -8,7 +8,6 @@ import ( "math" "sync" - logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -19,10 +18,9 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/pb" "go.opencensus.io/stats" "go.opencensus.io/tag" + "go.uber.org/zap" ) -var log = logging.Logger("wakufilter") - var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") ) @@ -51,6 +49,7 @@ type ( isFullNode bool MsgC chan *protocol.Envelope wg *sync.WaitGroup + log *zap.SugaredLogger filters *FilterMap subscribers *Subscribers @@ -62,10 +61,13 @@ type ( // relay protocol. const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") -func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, opts ...Option) (*WakuFilter, error) { +func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.SugaredLogger, opts ...Option) (*WakuFilter, error) { + wf := new(WakuFilter) + wf.log = log.Named("filter") + ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) if err != nil { - log.Error(err) + wf.log.Error(err) return nil, errors.New("could not start waku filter") } @@ -76,7 +78,6 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, opts .. opt(params) } - wf := new(WakuFilter) wf.ctx = ctx wf.wg = &sync.WaitGroup{} wf.MsgC = make(chan *protocol.Envelope, 1024) @@ -91,9 +92,9 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, opts .. go wf.FilterListener() if wf.isFullNode { - log.Info("Filter protocol started") + wf.log.Info("Filter protocol started") } else { - log.Info("Filter protocol started (only client mode)") + wf.log.Info("Filter protocol started (only client mode)") } return wf, nil @@ -108,11 +109,11 @@ func (wf *WakuFilter) onRequest(s network.Stream) { err := reader.ReadMsg(filterRPCRequest) if err != nil { - log.Error("error reading request", err) + wf.log.Error("error reading request", err) return } - log.Info(fmt.Sprintf("%s: received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + wf.log.Info(fmt.Sprintf("%s: received request from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 { // We're on a light node. @@ -121,7 +122,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) { wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node } - log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") + wf.log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages)))) } else if filterRPCRequest.Request != nil && wf.isFullNode { // We're on a full node. @@ -130,17 +131,17 @@ func (wf *WakuFilter) onRequest(s network.Stream) { subscriber := Subscriber{peer: s.Conn().RemotePeer(), requestId: filterRPCRequest.RequestId, filter: *filterRPCRequest.Request} len := wf.subscribers.Append(subscriber) - log.Info("filter full node, add a filter subscriber: ", subscriber.peer) + wf.log.Info("filter full node, add a filter subscriber: ", subscriber.peer) stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len))) } else { peerId := s.Conn().RemotePeer() wf.subscribers.RemoveContentFilters(peerId, filterRPCRequest.Request.ContentFilters) - log.Info("filter full node, remove a filter subscriber: ", peerId.Pretty()) + wf.log.Info("filter full node, remove a filter subscriber: ", peerId.Pretty()) stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(wf.subscribers.Length()))) } } else { - log.Error("can't serve request") + wf.log.Error("can't serve request") return } } @@ -152,7 +153,7 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er if err != nil { wf.subscribers.FlagAsFailure(subscriber.peer) - log.Error("failed to open peer stream", err) + wf.log.Error("failed to open peer stream", err) //waku_filter_errors.inc(labelValues = [dialFailure]) return err } @@ -161,7 +162,7 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er writer := protoio.NewDelimitedWriter(conn) err = writer.WriteMsg(pushRPC) if err != nil { - log.Error("failed to push messages to remote peer", err) + wf.log.Error("failed to push messages to remote peer", err) wf.subscribers.FlagAsFailure(subscriber.peer) return nil } @@ -182,15 +183,15 @@ func (wf *WakuFilter) FilterListener() { // a FilterRequest on this node for subscriber := range wf.subscribers.Items() { if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { - log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic) + wf.log.Info("Subscriber's filter pubsubTopic does not match message topic", subscriber.filter.Topic, topic) continue } for _, filter := range subscriber.filter.ContentFilters { if msg.ContentTopic == filter.ContentTopic { - log.Info("found matching contentTopic ", filter, msg) + wf.log.Info("found matching contentTopic ", filter, msg) // Do a message push to light node - log.Info("pushing messages to light node: ", subscriber.peer) + wf.log.Info("pushing messages to light node: ", subscriber.peer) if err := wf.pushMessage(subscriber, msg); err != nil { return err } @@ -204,7 +205,7 @@ func (wf *WakuFilter) FilterListener() { for m := range wf.MsgC { if err := handle(m); err != nil { - log.Error("failed to handle message", err) + wf.log.Error("failed to handle message", err) } } } @@ -214,6 +215,7 @@ func (wf *WakuFilter) FilterListener() { // and submit FilterRequest wrapped in FilterRPC func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) { params := new(FilterSubscribeParameters) + params.log = wf.log params.host = wf.h optList := DefaultSubscribtionOptions() @@ -250,10 +252,10 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil writer := protoio.NewDelimitedWriter(conn) filterRPC := &pb.FilterRPC{RequestId: requestID, Request: &request} - log.Info("sending filterRPC: ", filterRPC) + wf.log.Info("sending filterRPC: ", filterRPC) err = writer.WriteMsg(filterRPC) if err != nil { - log.Error("failed to write message", err) + wf.log.Error("failed to write message", err) return } @@ -313,7 +315,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi remoteSubs, err := wf.requestSubscription(ctx, f, opts...) if err != nil || remoteSubs.RequestID == "" { // Failed to subscribe - log.Error("remote subscription to filter failed", err) + wf.log.Error("remote subscription to filter failed", err) return } diff --git a/waku/v2/protocol/filter/waku_filter_option.go b/waku/v2/protocol/filter/waku_filter_option.go index 8a6a07b7..135eb7dc 100644 --- a/waku/v2/protocol/filter/waku_filter_option.go +++ b/waku/v2/protocol/filter/waku_filter_option.go @@ -7,12 +7,14 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/status-im/go-waku/waku/v2/utils" + "go.uber.org/zap" ) type ( FilterSubscribeParameters struct { host host.Host selectedPeer peer.ID + log *zap.SugaredLogger } FilterSubscribeOption func(*FilterSubscribeParameters) @@ -38,22 +40,22 @@ func WithPeer(p peer.ID) FilterSubscribeOption { func WithAutomaticPeerSelection() FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1)) + p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log) if err == nil { params.selectedPeer = *p } else { - log.Info("Error selecting peer: ", err) + params.log.Info("Error selecting peer: ", err) } } } func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1)) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log) if err == nil { params.selectedPeer = *p } else { - log.Info("Error selecting peer: ", err) + params.log.Info("Error selecting peer: ", err) } } } diff --git a/waku/v2/protocol/filter/waku_filter_option_test.go b/waku/v2/protocol/filter/waku_filter_option_test.go index c324f749..2a2d8fd4 100644 --- a/waku/v2/protocol/filter/waku_filter_option_test.go +++ b/waku/v2/protocol/filter/waku_filter_option_test.go @@ -24,6 +24,7 @@ func TestFilterOption(t *testing.T) { params := new(FilterSubscribeParameters) params.host = host + params.log = tests.Logger() for _, opt := range options { opt(params) diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 5c1af5c4..ab7e101a 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -22,7 +22,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0) + relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0, tests.Logger()) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) @@ -38,7 +38,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - filter, _ := NewWakuFilter(context.Background(), host, false) + filter, _ := NewWakuFilter(context.Background(), host, false, tests.Logger()) return filter, host } @@ -68,7 +68,7 @@ func TestWakuFilter(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter, _ := NewWakuFilter(ctx, host2, true) + node2Filter, _ := NewWakuFilter(ctx, host2, true, tests.Logger()) broadcaster.Register(node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) @@ -153,7 +153,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter, _ := NewWakuFilter(ctx, host2, true, WithTimeout(3*time.Second)) + node2Filter, _ := NewWakuFilter(ctx, host2, true, tests.Logger(), WithTimeout(3*time.Second)) broadcaster.Register(node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index e83f50dd..1106ec25 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -7,7 +7,6 @@ import ( "fmt" "math" - logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" @@ -16,10 +15,9 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" + "go.uber.org/zap" ) -var log = logging.Logger("waku_lightpush") - const LightPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/lightpush/2.0.0-beta1") var ( @@ -32,14 +30,17 @@ type WakuLightPush struct { relay *relay.WakuRelay ctx context.Context + log *zap.SugaredLogger + started bool } -func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) *WakuLightPush { +func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay, log *zap.SugaredLogger) *WakuLightPush { wakuLP := new(WakuLightPush) wakuLP.relay = relay wakuLP.ctx = ctx wakuLP.h = h + wakuLP.log = log.Named("lightpush") return wakuLP } @@ -50,7 +51,7 @@ func (wakuLP *WakuLightPush) Start() error { } wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) - log.Info("Light Push protocol started") + wakuLP.log.Info("Light Push protocol started") wakuLP.started = true return nil @@ -70,15 +71,15 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { err := reader.ReadMsg(requestPushRPC) if err != nil { - log.Error("error reading request", err) + wakuLP.log.Error("error reading request", err) metrics.RecordLightpushError(wakuLP.ctx, "decodeRpcFailure") return } - log.Info(fmt.Sprintf("%s: lightpush message received from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + wakuLP.log.Info(fmt.Sprintf("%s: lightpush message received from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) if requestPushRPC.Query != nil { - log.Info("lightpush push request") + wakuLP.log.Info("lightpush push request") response := new(pb.PushResponse) if !wakuLP.IsClientOnly() { pubSubTopic := requestPushRPC.Query.PubsubTopic @@ -97,7 +98,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { response.Info = "Totally" // TODO: ask about this } } else { - log.Debug("no relay protocol present, unsuccessful push") + wakuLP.log.Debug("no relay protocol present, unsuccessful push") response.IsSuccess = false response.Info = "No relay protocol" } @@ -108,18 +109,18 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { err = writer.WriteMsg(responsePushRPC) if err != nil { - log.Error("error writing response", err) + wakuLP.log.Error("error writing response", err) _ = s.Reset() } else { - log.Info(fmt.Sprintf("%s: response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) + wakuLP.log.Info(fmt.Sprintf("%s: response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) } } if requestPushRPC.Response != nil { if requestPushRPC.Response.IsSuccess { - log.Info("lightpush message success") + wakuLP.log.Info("lightpush message success") } else { - log.Info(fmt.Sprintf("lightpush message failure. info=%s", requestPushRPC.Response.Info)) + wakuLP.log.Info(fmt.Sprintf("lightpush message failure. info=%s", requestPushRPC.Response.Info)) } } } @@ -127,6 +128,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) { params := new(LightPushParameters) params.host = wakuLP.h + params.log = wakuLP.log optList := DefaultOptions(wakuLP.h) optList = append(optList, opts...) @@ -145,7 +147,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o connOpt, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) if err != nil { - log.Info("failed to connect to remote peer", err) + wakuLP.log.Info("failed to connect to remote peer", err) metrics.RecordLightpushError(wakuLP.ctx, "dialError") return nil, err } @@ -155,7 +157,7 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o err := connOpt.Reset() if err != nil { metrics.RecordLightpushError(wakuLP.ctx, "dialError") - log.Error("failed to reset connection", err) + wakuLP.log.Error("failed to reset connection", err) } }() @@ -166,14 +168,14 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o err = writer.WriteMsg(pushRequestRPC) if err != nil { - log.Error("could not write request", err) + wakuLP.log.Error("could not write request", err) return nil, err } pushResponseRPC := &pb.PushRPC{} err = reader.ReadMsg(pushResponseRPC) if err != nil { - log.Error("could not read response", err) + wakuLP.log.Error("could not read response", err) metrics.RecordLightpushError(wakuLP.ctx, "decodeRPCFailure") return nil, err } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index d880f597..5c55af93 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -7,12 +7,14 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/utils" + "go.uber.org/zap" ) type LightPushParameters struct { host host.Host selectedPeer peer.ID requestId []byte + log *zap.SugaredLogger } type LightPushOption func(*LightPushParameters) @@ -25,22 +27,22 @@ func WithPeer(p peer.ID) LightPushOption { func WithAutomaticPeerSelection(host host.Host) LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeer(host, string(LightPushID_v20beta1)) + p, err := utils.SelectPeer(host, string(LightPushID_v20beta1), params.log) if err == nil { params.selectedPeer = *p } else { - log.Info("Error selecting peer: ", err) + params.log.Info("Error selecting peer: ", err) } } } func WithFastestPeerSelection(ctx context.Context) LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1)) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log) if err == nil { params.selectedPeer = *p } else { - log.Info("Error selecting peer: ", err) + params.log.Info("Error selecting peer: ", err) } } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index 98fdf3c0..a0cee488 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -26,6 +26,7 @@ func TestLightPushOption(t *testing.T) { params := new(LightPushParameters) params.host = host + params.log = tests.Logger() for _, opt := range options { opt(params) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index c928be4f..795affd8 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -24,7 +24,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0) + relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, tests.Logger()) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) @@ -55,7 +55,7 @@ func TestWakuLightPush(t *testing.T) { defer sub2.Unsubscribe() ctx := context.Background() - lightPushNode2 := NewWakuLightPush(ctx, host2, node2) + lightPushNode2 := NewWakuLightPush(ctx, host2, node2, tests.Logger()) err := lightPushNode2.Start() require.NoError(t, err) defer lightPushNode2.Stop() @@ -65,7 +65,7 @@ func TestWakuLightPush(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil) + client := NewWakuLightPush(ctx, clientHost, nil, tests.Logger()) host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200)) @@ -121,7 +121,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil) + client := NewWakuLightPush(ctx, clientHost, nil, tests.Logger()) err = client.Start() require.Errorf(t, err, "relay is required") @@ -135,7 +135,7 @@ func TestWakuLightPushNoPeers(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil) + client := NewWakuLightPush(ctx, clientHost, nil, tests.Logger()) _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", float64(0)), testTopic) require.Errorf(t, err, "no suitable remote peers") diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index e61086d6..abd09b2f 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -8,11 +8,11 @@ import ( "sync" proto "github.com/golang/protobuf/proto" - logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/protocol" "go.opencensus.io/stats" "go.opencensus.io/tag" + "go.uber.org/zap" pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" @@ -22,8 +22,6 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/pb" ) -var log = logging.Logger("wakurelay") - const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String() @@ -32,6 +30,8 @@ type WakuRelay struct { host host.Host pubsub *pubsub.PubSub + log *zap.SugaredLogger + bcaster v2.Broadcaster minPeersToPublish int @@ -52,7 +52,7 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { return string(hash[:]) } -func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, opts ...pubsub.Option) (*WakuRelay, error) { +func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, log *zap.SugaredLogger, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h w.wakuRelayTopics = make(map[string]*pubsub.Topic) @@ -60,6 +60,7 @@ func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minP w.subscriptions = make(map[string][]*Subscription) w.bcaster = bcaster w.minPeersToPublish = minPeersToPublish + w.log = log.Named("relay") // default options required by WakuRelay opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) @@ -86,7 +87,7 @@ func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minP } w.pubsub = ps - log.Info("Relay protocol started") + w.log.Info("Relay protocol started") return w, nil } @@ -140,7 +141,7 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro } w.relaySubs[topic] = sub - log.Info("Subscribing to topic ", topic) + w.log.Info("Subscribing to topic ", topic) } return sub, nil @@ -243,7 +244,7 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { if _, ok := w.relaySubs[topic]; !ok { return fmt.Errorf("topics %s is not subscribed", (string)(topic)) } - log.Info("Unsubscribing from topic ", topic) + w.log.Info("Unsubscribing from topic ", topic) for _, sub := range w.subscriptions[topic] { sub.Unsubscribe() @@ -266,14 +267,14 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < go func(msgChannel chan *pubsub.Message) { defer func() { if r := recover(); r != nil { - log.Debug("recovered msgChannel") + w.log.Debug("recovered msgChannel") } }() for { msg, err := sub.Next(ctx) if err != nil { - log.Error(fmt.Errorf("subscription failed: %w", err)) + w.log.Error(fmt.Errorf("subscription failed: %w", err)) sub.Cancel() close(msgChannel) for _, subscription := range w.subscriptions[sub.Topic()] { @@ -290,7 +291,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) { ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay")) if err != nil { - log.Error(err) + w.log.Error(err) return } @@ -310,7 +311,7 @@ func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub * stats.Record(ctx, metrics.Messages.M(1)) wakuMessage := &pb.WakuMessage{} if err := proto.Unmarshal(msg.Data, wakuMessage); err != nil { - log.Error("could not decode message", err) + w.log.Error("could not decode message", err) return } diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 8021efd2..90963386 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -19,7 +19,7 @@ func TestWakuRelay(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := NewWakuRelay(context.Background(), host, nil, 0) + relay, err := NewWakuRelay(context.Background(), host, nil, 0, tests.Logger()) defer relay.Stop() require.NoError(t, err) diff --git a/waku/v2/protocol/requestId.go b/waku/v2/protocol/requestId.go index 3a626af5..f0fcbdab 100644 --- a/waku/v2/protocol/requestId.go +++ b/waku/v2/protocol/requestId.go @@ -5,16 +5,15 @@ import ( "sync" "github.com/cruxic/go-hmac-drbg/hmacdrbg" - logging "github.com/ipfs/go-log" + "github.com/status-im/go-waku/waku/v2/utils" + "go.uber.org/zap" ) -var log = logging.Logger("request-gen") - var brHmacDrbgPool = sync.Pool{New: func() interface{} { seed := make([]byte, 48) _, err := rand.Read(seed) if err != nil { - log.Fatal(err) + utils.Logger().Fatal("rand.Read err", zap.Error(err)) } return hmacdrbg.NewHmacDrbg(256, seed, nil) }} @@ -31,16 +30,16 @@ func GenerateRequestId() []byte { seed := make([]byte, 48) _, err := rand.Read(seed) if err != nil { - log.Fatal(err) + utils.Logger().Fatal("rand.Read err", zap.Error(err)) } err = rng.Reseed(seed) if err != nil { //only happens if seed < security-level - log.Fatal(err) + utils.Logger().Fatal("rng.Reseed err", zap.Error(err)) } if !rng.Generate(randData) { - log.Error("could not generate random request id") + utils.Logger().Error("could not generate random request id") } } return randData diff --git a/waku/v2/protocol/store/message_queue.go b/waku/v2/protocol/store/message_queue.go index fa7a52d6..71333111 100644 --- a/waku/v2/protocol/store/message_queue.go +++ b/waku/v2/protocol/store/message_queue.go @@ -1,6 +1,7 @@ package store import ( + "errors" "sync" "time" @@ -19,14 +20,16 @@ type MessageQueue struct { wg *sync.WaitGroup } -func (self *MessageQueue) Push(msg IndexedWakuMessage) { +var ErrDuplicatedMessage = errors.New("duplicated message") + +func (self *MessageQueue) Push(msg IndexedWakuMessage) error { self.Lock() defer self.Unlock() var k [32]byte copy(k[:], msg.index.Digest) if _, ok := self.seen[k]; ok { - return + return ErrDuplicatedMessage } self.seen[k] = struct{}{} @@ -36,6 +39,8 @@ func (self *MessageQueue) Push(msg IndexedWakuMessage) { numToPop := len(self.messages) - self.maxMessages self.messages = self.messages[numToPop:len(self.messages)] } + + return nil } func (self *MessageQueue) Messages() <-chan IndexedWakuMessage { diff --git a/waku/v2/protocol/store/message_queue_test.go b/waku/v2/protocol/store/message_queue_test.go index 2c3bc735..0afa3f73 100644 --- a/waku/v2/protocol/store/message_queue_test.go +++ b/waku/v2/protocol/store/message_queue_test.go @@ -18,13 +18,20 @@ func TestMessageQueue(t *testing.T) { msg5 := tests.CreateWakuMessage("3", 3) msgQ := NewMessageQueue(3, 1*time.Minute) - msgQ.Push(IndexedWakuMessage{msg: msg1, index: &pb.Index{Digest: []byte{1}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-20 * time.Second))}, pubsubTopic: "test"}) - msgQ.Push(IndexedWakuMessage{msg: msg2, index: &pb.Index{Digest: []byte{2}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-15 * time.Second))}, pubsubTopic: "test"}) - msgQ.Push(IndexedWakuMessage{msg: msg3, index: &pb.Index{Digest: []byte{3}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-10 * time.Second))}, pubsubTopic: "test"}) + + err := msgQ.Push(IndexedWakuMessage{msg: msg1, index: &pb.Index{Digest: []byte{1}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-20 * time.Second))}, pubsubTopic: "test"}) + require.NoError(t, err) + + err = msgQ.Push(IndexedWakuMessage{msg: msg2, index: &pb.Index{Digest: []byte{2}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-15 * time.Second))}, pubsubTopic: "test"}) + require.NoError(t, err) + + err = msgQ.Push(IndexedWakuMessage{msg: msg3, index: &pb.Index{Digest: []byte{3}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-10 * time.Second))}, pubsubTopic: "test"}) + require.NoError(t, err) require.Equal(t, msgQ.Length(), 3) - msgQ.Push(IndexedWakuMessage{msg: msg4, index: &pb.Index{Digest: []byte{4}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-3 * time.Second))}, pubsubTopic: "test"}) + err = msgQ.Push(IndexedWakuMessage{msg: msg4, index: &pb.Index{Digest: []byte{4}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(-3 * time.Second))}, pubsubTopic: "test"}) + require.NoError(t, err) require.Len(t, msgQ.messages, 3) require.Equal(t, msg2.Payload, msgQ.messages[0].msg.Payload) @@ -32,14 +39,16 @@ func TestMessageQueue(t *testing.T) { indexedMsg5 := IndexedWakuMessage{msg: msg5, index: &pb.Index{Digest: []byte{5}, ReceiverTime: utils.GetUnixEpochFrom(time.Now().Add(0 * time.Second))}, pubsubTopic: "test"} - msgQ.Push(indexedMsg5) + err = msgQ.Push(indexedMsg5) + require.NoError(t, err) require.Len(t, msgQ.messages, 3) require.Equal(t, msg3.Payload, msgQ.messages[0].msg.Payload) require.Equal(t, msg5.Payload, msgQ.messages[2].msg.Payload) // Test duplication - msgQ.Push(indexedMsg5) + err = msgQ.Push(indexedMsg5) + require.ErrorIs(t, err, ErrDuplicatedMessage) require.Len(t, msgQ.messages, 3) require.Equal(t, msg3.Payload, msgQ.messages[0].msg.Payload) diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index 65b3e733..fd0f01a2 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test") - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) s.storeMessage(msg1) s.storeMessage(msg3) s.storeMessage(msg5) @@ -38,7 +38,7 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0, tests.Logger()) s1.Start(ctx) defer s1.Stop() @@ -55,7 +55,7 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0, tests.Logger()) s2.Start(ctx) defer s2.Stop() @@ -87,7 +87,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0, tests.Logger()) s1.Start(ctx) defer s1.Stop() @@ -98,7 +98,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0, tests.Logger()) s2.Start(ctx) defer s2.Stop() @@ -120,7 +120,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0, tests.Logger()) s1.Start(ctx) defer s1.Stop() @@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0, tests.Logger()) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 52efee9b..e4807cbc 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -11,12 +11,12 @@ import ( "sync" "time" - logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-msgio/protoio" + "go.uber.org/zap" "github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/v2/metrics" @@ -26,8 +26,6 @@ import ( "github.com/status-im/go-waku/waku/v2/utils" ) -var log = logging.Logger("wakustore") - // StoreID_v20beta3 is the current Waku Store protocol identifier const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3") @@ -64,14 +62,14 @@ func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMess pageSize := pinfo.PageSize dir := pinfo.Direction - if pageSize == 0 { // pageSize being zero indicates that no pagination is required - return list, pinfo - } - if len(list) == 0 { // no pagination is needed for an empty list return list, &pb.PagingInfo{PageSize: 0, Cursor: pinfo.Cursor, Direction: pinfo.Direction} } + if pageSize == 0 { + pageSize = MaxPageSize + } + msgList := make([]IndexedWakuMessage, len(list)) _ = copy(msgList, list) // makes a copy of the list @@ -231,6 +229,8 @@ type WakuStore struct { MsgC chan *protocol.Envelope wg *sync.WaitGroup + log *zap.SugaredLogger + started bool messageQueue *MessageQueue @@ -240,12 +240,13 @@ type WakuStore struct { } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { +func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.SugaredLogger) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p wakuStore.h = host wakuStore.swap = swap wakuStore.wg = &sync.WaitGroup{} + wakuStore.log = log.Named("store") wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration) return wakuStore } @@ -271,13 +272,13 @@ func (store *WakuStore) Start(ctx context.Context) { go store.storeIncomingMessages(ctx) if store.msgProvider == nil { - log.Info("Store protocol started (no message provider)") + store.log.Info("Store protocol started (no message provider)") return } store.fetchDBRecords(ctx) - log.Info("Store protocol started") + store.log.Info("Store protocol started") } func (store *WakuStore) fetchDBRecords(ctx context.Context) { @@ -287,7 +288,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { storedMessages, err := (store.msgProvider).GetAll() if err != nil { - log.Error("could not load DBProvider messages", err) + store.log.Error("could not load DBProvider messages", err) metrics.RecordStoreError(ctx, "store_load_failure") return } @@ -298,24 +299,27 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { ReceiverTime: float64(storedMessage.ReceiverTime), } - store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message) + _ = store.addToMessageQueue(storedMessage.PubsubTopic, idx, storedMessage.Message) metrics.RecordMessage(ctx, "stored", store.messageQueue.Length()) } } -func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) { - store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) +func (store *WakuStore) addToMessageQueue(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) error { + return store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) } func (store *WakuStore) storeMessage(env *protocol.Envelope) { index, err := computeIndex(env) if err != nil { - log.Error("could not calculate message index", err) + store.log.Error("could not calculate message index", err) return } - store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message()) + err = store.addToMessageQueue(env.PubsubTopic(), index, env.Message()) + if err == ErrDuplicatedMessage { + return + } if store.msgProvider == nil { metrics.RecordMessage(store.ctx, "stored", store.messageQueue.Length()) @@ -325,7 +329,7 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) { // TODO: Move this to a separate go routine if DB writes becomes a bottleneck err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored? if err != nil { - log.Error("could not store message", err) + store.log.Error("could not store message", err) metrics.RecordStoreError(store.ctx, "store_failure") return } @@ -350,12 +354,12 @@ func (store *WakuStore) onRequest(s network.Stream) { err := reader.ReadMsg(historyRPCRequest) if err != nil { - log.Error("error reading request", err) + store.log.Error("error reading request", err) metrics.RecordStoreError(store.ctx, "decodeRPCFailure") return } - log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) + store.log.Info(fmt.Sprintf("%s: Received query from %s", s.Conn().LocalPeer(), s.Conn().RemotePeer())) historyResponseRPC := &pb.HistoryRPC{} historyResponseRPC.RequestId = historyRPCRequest.RequestId @@ -363,10 +367,10 @@ func (store *WakuStore) onRequest(s network.Stream) { err = writer.WriteMsg(historyResponseRPC) if err != nil { - log.Error("error writing response", err) + store.log.Error("error writing response", err) _ = s.Reset() } else { - log.Info(fmt.Sprintf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) + store.log.Info(fmt.Sprintf("%s: Response sent to %s", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())) } } @@ -441,22 +445,22 @@ func WithPeer(p peer.ID) HistoryRequestOption { // to request the message history func WithAutomaticPeerSelection() HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3)) + p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3), params.s.log) if err == nil { params.selectedPeer = *p } else { - log.Info("Error selecting peer: ", err) + params.s.log.Info("Error selecting peer: ", err) } } } func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3)) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3), params.s.log) if err == nil { params.selectedPeer = *p } else { - log.Info("Error selecting peer: ", err) + params.s.log.Info("Error selecting peer: ", err) } } } @@ -497,11 +501,11 @@ func DefaultOptions() []HistoryRequestOption { } func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) { - log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer)) + store.log.Info(fmt.Sprintf("Querying message history with peer %s", selectedPeer)) connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta3) if err != nil { - log.Error("Failed to connect to remote peer", err) + store.log.Error("Failed to connect to remote peer", err) return nil, err } @@ -517,14 +521,14 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec err = writer.WriteMsg(historyRequest) if err != nil { - log.Error("could not write request", err) + store.log.Error("could not write request", err) return nil, err } historyResponseRPC := &pb.HistoryRPC{} err = reader.ReadMsg(historyResponseRPC) if err != nil { - log.Error("could not read response", err) + store.log.Error("could not read response", err) metrics.RecordStoreError(store.ctx, "decodeRPCFailure") return nil, err } @@ -631,15 +635,38 @@ func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) { }, nil } -func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID) (*pb.HistoryResponse, error) { +func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID) ([]*pb.WakuMessage, error) { // loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully // returns the number of retrieved messages, or error if all the requests fail + + queryWg := sync.WaitGroup{} + queryWg.Add(len(candidateList)) + + resultChan := make(chan *pb.HistoryResponse, len(candidateList)) + for _, peer := range candidateList { - result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId()) - if err == nil { - return result, nil - } - log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err)) + func() { + defer queryWg.Done() + result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId()) + if err == nil { + resultChan <- result + } + store.log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err)) + }() + } + + queryWg.Wait() + close(resultChan) + + var messages []*pb.WakuMessage + hasResults := false + for result := range resultChan { + hasResults = true + messages = append(messages, result.Messages...) + } + + if hasResults { + return messages, nil } return nil, ErrFailedQuery @@ -686,28 +713,28 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList } if len(peerList) == 0 { - p, err := utils.SelectPeer(store.h, string(StoreID_v20beta3)) + p, err := utils.SelectPeer(store.h, string(StoreID_v20beta3), store.log) if err != nil { - log.Info("Error selecting peer: ", err) + store.log.Info("Error selecting peer: ", err) return -1, ErrNoPeersAvailable } peerList = append(peerList, *p) } - response, err := store.queryLoop(ctx, rpc, peerList) + messages, err := store.queryLoop(ctx, rpc, peerList) if err != nil { - log.Error("failed to resume history", err) + store.log.Error("failed to resume history", err) return -1, ErrFailedToResumeHistory } - for _, msg := range response.Messages { + for _, msg := range messages { store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic)) } - log.Info("Retrieved messages since the last online time: ", len(response.Messages)) + store.log.Info("Retrieved messages since the last online time: ", len(messages)) - return len(response.Messages), nil + return len(messages), nil } // TODO: queryWithAccounting diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index 4d484d35..f4628b2b 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -5,6 +5,7 @@ import ( "database/sql" "testing" + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/persistence/sqlite" "github.com/status-im/go-waku/waku/v2/protocol" @@ -21,10 +22,10 @@ func TestStorePersistence(t *testing.T) { db, err := sqlite.NewDB(":memory:") require.NoError(t, err) - dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) + dbStore, err := persistence.NewDBStore(tests.Logger(), persistence.WithDB(db)) require.NoError(t, err) - s1 := NewWakuStore(nil, nil, dbStore, 0, 0) + s1 := NewWakuStore(nil, nil, dbStore, 0, 0, tests.Logger()) s1.fetchDBRecords(ctx) require.Len(t, s1.messageQueue.messages, 0) @@ -39,7 +40,7 @@ func TestStorePersistence(t *testing.T) { s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) - s2 := NewWakuStore(nil, nil, dbStore, 0, 0) + s2 := NewWakuStore(nil, nil, dbStore, 0, 0, tests.Logger()) s2.fetchDBRecords(ctx) require.Len(t, s2.messageQueue.messages, 1) require.Equal(t, msg, s2.messageQueue.messages[0].msg) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 65dbf085..15f53c4f 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0, tests.Logger()) s1.Start(ctx) defer s1.Stop() @@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0, tests.Logger()) s2.Start(ctx) defer s2.Stop() @@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0, tests.Logger()) s1.Start(ctx) defer s1.Stop() @@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0, tests.Logger()) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index f4411c74..e47b4bea 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) @@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0, tests.Logger()) var messages []*pb.WakuMessage for i := 0; i < 10; i++ { diff --git a/waku/v2/protocol/swap/waku_swap.go b/waku/v2/protocol/swap/waku_swap.go index 66ed00a9..75c205d9 100644 --- a/waku/v2/protocol/swap/waku_swap.go +++ b/waku/v2/protocol/swap/waku_swap.go @@ -3,9 +3,8 @@ package swap import ( "sync" - logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-core/protocol" + "go.uber.org/zap" ) const ( @@ -16,16 +15,16 @@ const ( const WakuSwapID_v200 = protocol.ID("/vac/waku/swap/2.0.0-beta1") -var log = logging.Logger("wakuswap") - type WakuSwap struct { params *SwapParameters + log *zap.SugaredLogger + Accounting map[string]int accountingMutex sync.RWMutex } -func NewWakuSwap(opts ...SwapOption) *WakuSwap { +func NewWakuSwap(log *zap.SugaredLogger, opts ...SwapOption) *WakuSwap { params := &SwapParameters{} optList := DefaultOptions() @@ -36,21 +35,22 @@ func NewWakuSwap(opts ...SwapOption) *WakuSwap { return &WakuSwap{ params: params, + log: log.Named("swap"), Accounting: make(map[string]int), } } func (s *WakuSwap) sendCheque(peerId string) { - log.Debug("not yet implemented") + s.log.Debug("not yet implemented") } func (s *WakuSwap) applyPolicy(peerId string) { if s.Accounting[peerId] <= s.params.disconnectThreshold { - log.Warnf("Disconnect threshhold has been reached for %s at %d", peerId, s.Accounting[peerId]) + s.log.Warnf("Disconnect threshhold has been reached for %s at %d", peerId, s.Accounting[peerId]) } if s.Accounting[peerId] >= s.params.paymentThreshold { - log.Warnf("Disconnect threshhold has been reached for %s at %d", peerId, s.Accounting[peerId]) + s.log.Warnf("Disconnect threshhold has been reached for %s at %d", peerId, s.Accounting[peerId]) if s.params.mode != HardMode { s.sendCheque(peerId) } diff --git a/waku/v2/protocol/swap/waku_swap_test.go b/waku/v2/protocol/swap/waku_swap_test.go index 34e7d255..e0899bdb 100644 --- a/waku/v2/protocol/swap/waku_swap_test.go +++ b/waku/v2/protocol/swap/waku_swap_test.go @@ -3,11 +3,12 @@ package swap import ( "testing" + "github.com/status-im/go-waku/tests" "github.com/stretchr/testify/require" ) func TestSwapCreditDebit(t *testing.T) { - swap := NewWakuSwap([]SwapOption{ + swap := NewWakuSwap(tests.Logger(), []SwapOption{ WithMode(SoftMode), WithThreshold(0, 0), }...) diff --git a/waku/v2/rpc/admin.go b/waku/v2/rpc/admin.go index 4f3f016c..7230730b 100644 --- a/waku/v2/rpc/admin.go +++ b/waku/v2/rpc/admin.go @@ -4,6 +4,7 @@ import ( "net/http" ma "github.com/multiformats/go-multiaddr" + "go.uber.org/zap" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/filter" @@ -14,6 +15,7 @@ import ( type AdminService struct { node *node.WakuNode + log *zap.SugaredLogger } type GetPeersArgs struct { @@ -37,7 +39,7 @@ func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *Su for _, peer := range args.Peers { addr, err := ma.NewMultiaddr(peer) if err != nil { - log.Error("Error building multiaddr", err) + a.log.Error("Error building multiaddr", err) reply.Success = false reply.Error = err.Error() return nil @@ -45,7 +47,7 @@ func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *Su err = a.node.DialPeerWithMultiAddress(req.Context(), addr) if err != nil { - log.Error("Error dialing peers", err) + a.log.Error("Error dialing peers", err) reply.Success = false reply.Error = err.Error() return nil @@ -63,7 +65,7 @@ func isWakuProtocol(protocol string) bool { func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *PeersReply) error { peers, err := a.node.Peers() if err != nil { - log.Error("Error getting peers", err) + a.log.Error("Error getting peers", err) return nil } for _, peer := range peers { diff --git a/waku/v2/rpc/admin_test.go b/waku/v2/rpc/admin_test.go index 5b46cc81..311dd99f 100644 --- a/waku/v2/rpc/admin_test.go +++ b/waku/v2/rpc/admin_test.go @@ -23,7 +23,7 @@ func makeAdminService(t *testing.T) *AdminService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return &AdminService{n} + return &AdminService{n, tests.Logger()} } func TestV1Peers(t *testing.T) { @@ -32,7 +32,7 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0) + relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0, tests.Logger()) require.NoError(t, err) defer relay.Stop() diff --git a/waku/v2/rpc/filter.go b/waku/v2/rpc/filter.go index cfafdd7b..76b5a8f5 100644 --- a/waku/v2/rpc/filter.go +++ b/waku/v2/rpc/filter.go @@ -9,10 +9,12 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" ) type FilterService struct { node *node.WakuNode + log *zap.SugaredLogger messages map[string][]*pb.WakuMessage messagesMutex sync.RWMutex @@ -29,9 +31,10 @@ type ContentTopicArgs struct { ContentTopic string `json:"contentTopic,omitempty"` } -func NewFilterService(node *node.WakuNode) *FilterService { +func NewFilterService(node *node.WakuNode, log *zap.SugaredLogger) *FilterService { s := &FilterService{ node: node, + log: log.Named("filter"), messages: make(map[string][]*pb.WakuMessage), } s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) @@ -77,7 +80,7 @@ func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterConten filter.WithAutomaticPeerSelection(), ) if err != nil { - log.Error("Error subscribing to topic:", args.Topic, "err:", err) + f.log.Error("Error subscribing to topic:", args.Topic, "err:", err) reply.Success = false reply.Error = err.Error() return nil @@ -95,7 +98,7 @@ func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterCont makeContentFilter(args), ) if err != nil { - log.Error("Error unsubscribing to topic:", args.Topic, "err:", err) + f.log.Error("Error unsubscribing to topic:", args.Topic, "err:", err) reply.Success = false reply.Error = err.Error() return nil diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 72be6aad..28499aa2 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -28,7 +28,7 @@ func makeFilterService(t *testing.T) *FilterService { _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - return NewFilterService(n) + return NewFilterService(n, tests.Logger()) } func TestFilterSubscription(t *testing.T) { @@ -38,13 +38,13 @@ func TestFilterSubscription(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0) + node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0, tests.Logger()) require.NoError(t, err) _, err = node.SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - _, _ = filter.NewWakuFilter(context.Background(), host, false) + _, _ = filter.NewWakuFilter(context.Background(), host, false, tests.Logger()) d := makeFilterService(t) defer d.node.Stop() diff --git a/waku/v2/rpc/private.go b/waku/v2/rpc/private.go index cd8273bc..33a91dc8 100644 --- a/waku/v2/rpc/private.go +++ b/waku/v2/rpc/private.go @@ -11,10 +11,12 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" ) type PrivateService struct { node *node.WakuNode + log *zap.SugaredLogger symmetricMessages map[string][]*pb.WakuMessage symmetricMessagesMutex sync.RWMutex @@ -54,11 +56,12 @@ type AsymmetricMessagesArgs struct { PrivateKey string `json:"privateKey"` } -func NewPrivateService(node *node.WakuNode) *PrivateService { +func NewPrivateService(node *node.WakuNode, log *zap.SugaredLogger) *PrivateService { return &PrivateService{ node: node, symmetricMessages: make(map[string][]*pb.WakuMessage), asymmetricMessages: make(map[string][]*pb.WakuMessage), + log: log.Named("private"), } } diff --git a/waku/v2/rpc/private_test.go b/waku/v2/rpc/private_test.go index c697e6c1..ff39e7e1 100644 --- a/waku/v2/rpc/private_test.go +++ b/waku/v2/rpc/private_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/stretchr/testify/require" @@ -15,7 +16,7 @@ func makePrivateService(t *testing.T) *PrivateService { err = n.Start() require.NoError(t, err) - return NewPrivateService(n) + return NewPrivateService(n, tests.Logger()) } func TestGetV1SymmetricKey(t *testing.T) { diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 708e91f7..30204504 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -8,11 +8,14 @@ import ( "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "go.uber.org/zap" ) type RelayService struct { node *node.WakuNode + log *zap.SugaredLogger + messages map[string][]*pb.WakuMessage messagesMutex sync.RWMutex @@ -32,9 +35,10 @@ type TopicArgs struct { Topic string `json:"topic,omitempty"` } -func NewRelayService(node *node.WakuNode) *RelayService { +func NewRelayService(node *node.WakuNode, log *zap.SugaredLogger) *RelayService { s := &RelayService{ node: node, + log: log.Named("relay"), messages: make(map[string][]*pb.WakuMessage), } @@ -69,7 +73,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, _, err = r.node.Relay().PublishToTopic(req.Context(), &args.Message, args.Topic) } if err != nil { - log.Error("Error publishing message:", err) + r.log.Error("Error publishing message: ", err) reply.Success = false reply.Error = err.Error() } else { @@ -89,7 +93,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r _, err = r.node.Relay().SubscribeToTopic(ctx, topic) } if err != nil { - log.Error("Error subscribing to topic:", topic, "err:", err) + r.log.Error("Error subscribing to topic:", topic, "err:", err) reply.Success = false reply.Error = err.Error() return nil @@ -105,7 +109,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, for _, topic := range args.Topics { err := r.node.Relay().Unsubscribe(ctx, topic) if err != nil { - log.Error("Error unsubscribing from topic:", topic, "err:", err) + r.log.Error("Error unsubscribing from topic", topic, "err:", err) reply.Success = false reply.Error = err.Error() return nil diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index 8bcbcb6c..56f7e7b7 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/stretchr/testify/require" @@ -18,7 +19,7 @@ func makeRelayService(t *testing.T) *RelayService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return NewRelayService(n) + return NewRelayService(n, tests.Logger()) } func TestPostV1Message(t *testing.T) { diff --git a/waku/v2/rpc/store.go b/waku/v2/rpc/store.go index 7354f123..7ce5465f 100644 --- a/waku/v2/rpc/store.go +++ b/waku/v2/rpc/store.go @@ -6,10 +6,12 @@ import ( "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/store" + "go.uber.org/zap" ) type StoreService struct { node *node.WakuNode + log *zap.SugaredLogger } // cursor *pb.Index @@ -54,7 +56,7 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, options..., ) if err != nil { - log.Error("Error querying messages:", err) + s.log.Error("Error querying messages:", err) reply.Error = err.Error() return nil } diff --git a/waku/v2/rpc/store_test.go b/waku/v2/rpc/store_test.go index 68d1924e..a8c4c991 100644 --- a/waku/v2/rpc/store_test.go +++ b/waku/v2/rpc/store_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/node" "github.com/stretchr/testify/require" ) @@ -14,7 +15,7 @@ func makeStoreService(t *testing.T) *StoreService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return &StoreService{n} + return &StoreService{n, tests.Logger()} } func TestStoreGetV1Messages(t *testing.T) { diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index bbcb847e..f3ea9aeb 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -7,64 +7,67 @@ import ( "time" "github.com/gorilla/rpc/v2" - logging "github.com/ipfs/go-log" "github.com/status-im/go-waku/waku/v2/node" + "go.uber.org/zap" ) -var log = logging.Logger("wakurpc") - type WakuRpc struct { node *node.WakuNode server *http.Server + log *zap.SugaredLogger + relayService *RelayService filterService *FilterService privateService *PrivateService } -func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { +func NewWakuRpc(node *node.WakuNode, address string, port int, log *zap.SugaredLogger) *WakuRpc { + wrpc := new(WakuRpc) + wrpc.log = log.Named("rpc") + s := rpc.NewServer() s.RegisterCodec(NewSnakeCaseCodec(), "application/json") s.RegisterCodec(NewSnakeCaseCodec(), "application/json;charset=UTF-8") err := s.RegisterService(&DebugService{node}, "Debug") if err != nil { - log.Error(err) + wrpc.log.Error(err) } - relayService := NewRelayService(node) + relayService := NewRelayService(node, log) err = s.RegisterService(relayService, "Relay") if err != nil { - log.Error(err) + wrpc.log.Error(err) } - err = s.RegisterService(&StoreService{node}, "Store") + err = s.RegisterService(&StoreService{node, log.Named("store")}, "Store") if err != nil { - log.Error(err) + wrpc.log.Error(err) } - err = s.RegisterService(&AdminService{node}, "Admin") + err = s.RegisterService(&AdminService{node, log.Named("admin")}, "Admin") if err != nil { - log.Error(err) + wrpc.log.Error(err) } - filterService := NewFilterService(node) + filterService := NewFilterService(node, log) err = s.RegisterService(filterService, "Filter") if err != nil { - log.Error(err) + wrpc.log.Error(err) } - privateService := NewPrivateService(node) + privateService := NewPrivateService(node, log) err = s.RegisterService(privateService, "Private") if err != nil { - log.Error(err) + wrpc.log.Error(err) } mux := http.NewServeMux() mux.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) { t := time.Now() s.ServeHTTP(w, r) - log.Infof("RPC request at %s took %s", r.URL.Path, time.Since(t)) + wrpc.log.Infof("RPC request at %s took %s", r.URL.Path, time.Since(t)) }) listenAddr := fmt.Sprintf("%s:%d", address, port) @@ -79,13 +82,13 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { relayService.Stop() }) - return &WakuRpc{ - node: node, - server: server, - relayService: relayService, - filterService: filterService, - privateService: privateService, - } + wrpc.node = node + wrpc.server = server + wrpc.relayService = relayService + wrpc.filterService = filterService + wrpc.privateService = privateService + + return wrpc } func (r *WakuRpc) Start() { @@ -94,10 +97,10 @@ func (r *WakuRpc) Start() { go func() { _ = r.server.ListenAndServe() }() - log.Info("Rpc server started at ", r.server.Addr) + r.log.Info("Rpc server started at ", r.server.Addr) } func (r *WakuRpc) Stop(ctx context.Context) error { - log.Info("Shutting down rpc server") + r.log.Info("Shutting down rpc server") return r.server.Shutdown(ctx) } diff --git a/waku/v2/rpc/waku_rpc_test.go b/waku/v2/rpc/waku_rpc_test.go index 1cec40e8..9f6f44d5 100644 --- a/waku/v2/rpc/waku_rpc_test.go +++ b/waku/v2/rpc/waku_rpc_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/node" "github.com/stretchr/testify/require" ) @@ -13,7 +14,7 @@ func TestWakuRpc(t *testing.T) { n, err := node.New(context.Background(), options) require.NoError(t, err) - rpc := NewWakuRpc(n, "127.0.0.1", 8080) + rpc := NewWakuRpc(n, "127.0.0.1", 8080, tests.Logger()) require.NotNil(t, rpc.server) require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") } diff --git a/waku/v2/utils/enr.go b/waku/v2/utils/enr.go index 2c8d64f6..c5366b08 100644 --- a/waku/v2/utils/enr.go +++ b/waku/v2/utils/enr.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" + "go.uber.org/zap" ) const WakuENRField = "waku2" @@ -141,7 +142,7 @@ func Multiaddress(node *enode.Node) ([]ma.Multiaddr, error) { var multiaddrRaw []byte if err := node.Record().Load(enr.WithEntry(MultiaddrENRField, &multiaddrRaw)); err != nil { if !enr.IsNotFound(err) { - log.Error("could not retrieve multiaddress field for node ", node) + Logger().Error("could not retrieve multiaddress field for node ", zap.Any("enode", node)) } return nil, err } diff --git a/waku/v2/utils/logger.go b/waku/v2/utils/logger.go new file mode 100644 index 00000000..40ed2704 --- /dev/null +++ b/waku/v2/utils/logger.go @@ -0,0 +1,47 @@ +package utils + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var log *zap.Logger = nil +var atom = zap.NewAtomicLevel() + +func SetLogLevel(level string) error { + lvl := zapcore.InfoLevel // zero value + err := lvl.Set(level) + if err != nil { + return err + } + atom.SetLevel(lvl) + return nil +} + +func Logger() *zap.Logger { + if log == nil { + cfg := zap.Config{ + Encoding: "console", + Level: atom, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + EncoderConfig: zapcore.EncoderConfig{ + MessageKey: "message", + LevelKey: "level", + EncodeLevel: zapcore.CapitalLevelEncoder, + TimeKey: "time", + EncodeTime: zapcore.ISO8601TimeEncoder, + NameKey: "caller", + EncodeCaller: zapcore.ShortCallerEncoder, + }, + } + + logger, err := cfg.Build() + if err != nil { + panic("could not create logger") + } + + log = logger.Named("gowaku") + } + return log +} diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 66acbb64..831a35c8 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -7,19 +7,17 @@ import ( "sync" "time" - logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" + "go.uber.org/zap" ) -var log = logging.Logger("utils") - var ErrNoPeersAvailable = errors.New("no suitable peers found") var PingServiceNotAvailable = errors.New("ping service not available") // SelectPeer is used to return a random peer that supports a given protocol. -func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) { +func SelectPeer(host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. // This will require us to check for various factors such as: @@ -52,7 +50,7 @@ type pingResult struct { rtt time.Duration } -func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string) (*peer.ID, error) { +func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.SugaredLogger) (*peer.ID, error) { var peers peer.IDSlice for _, peer := range host.Peerstore().Peers() { protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) diff --git a/waku/v2/utils/peer_test.go b/waku/v2/utils/peer_test.go index f32ae92b..e6ead63c 100644 --- a/waku/v2/utils/peer_test.go +++ b/waku/v2/utils/peer_test.go @@ -33,14 +33,14 @@ func TestSelectPeer(t *testing.T) { h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) // No peers with selected protocol - _, err = SelectPeer(h1, proto) + _, err = SelectPeer(h1, proto, tests.Logger()) require.Error(t, ErrNoPeersAvailable, err) // Peers with selected protocol _ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - _, err = SelectPeerWithLowestRTT(ctx, h1, proto) + _, err = SelectPeerWithLowestRTT(ctx, h1, proto, tests.Logger()) require.NoError(t, err) } @@ -69,13 +69,13 @@ func TestSelectPeerWithLowestRTT(t *testing.T) { h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) // No peers with selected protocol - _, err = SelectPeerWithLowestRTT(ctx, h1, proto) + _, err = SelectPeerWithLowestRTT(ctx, h1, proto, tests.Logger()) require.Error(t, ErrNoPeersAvailable, err) // Peers with selected protocol _ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - _, err = SelectPeerWithLowestRTT(ctx, h1, proto) + _, err = SelectPeerWithLowestRTT(ctx, h1, proto, tests.Logger()) require.NoError(t, err) }